# -*- coding: utf-8 -*-
"""
Task handler base class.

Provides the infrastructure shared by all task handlers: FFmpeg/ffprobe
argument builders and helpers, plus the ``BaseHandler`` base class.
"""

import os
import json
import logging
import shutil
import tempfile
import subprocess
import threading
from abc import ABC
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING

from core.handler import TaskHandler
from domain.task import Task
from domain.result import TaskResult, ErrorCode
from domain.config import WorkerConfig
from services import storage
from services.cache import MaterialCache
from constant import (
    HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA,
    VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA
)

if TYPE_CHECKING:
    from services.api_client import APIClientV2

logger = logging.getLogger(__name__)

# Maximum number of characters of FFmpeg stderr included in a failure log line.
_FFMPEG_STDERR_LOG_LIMIT = 1000


def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
    """
    Build the FFmpeg video-encoding arguments for a hardware-acceleration mode.

    Args:
        hw_accel: Hardware acceleration type (none, qsv, cuda). Any value other
            than QSV/CUDA selects software encoding.

    Returns:
        List of FFmpeg video encoding arguments.
    """
    if hw_accel == HW_ACCEL_QSV:
        params = VIDEO_ENCODE_PARAMS_QSV
        return [
            '-c:v', params['codec'],
            '-preset', params['preset'],
            '-profile:v', params['profile'],
            '-level', params['level'],
            '-global_quality', params['global_quality'],
            '-look_ahead', params['look_ahead'],
        ]
    elif hw_accel == HW_ACCEL_CUDA:
        params = VIDEO_ENCODE_PARAMS_CUDA
        return [
            '-c:v', params['codec'],
            '-preset', params['preset'],
            '-profile:v', params['profile'],
            '-level', params['level'],
            '-rc', params['rc'],
            '-cq', params['cq'],
            '-b:v', '0',  # used together with vbr mode so that -cq drives quality
        ]
    else:
        # Software encoding (default).
        params = VIDEO_ENCODE_PARAMS
        return [
            '-c:v', params['codec'],
            '-preset', params['preset'],
            '-profile:v', params['profile'],
            '-level', params['level'],
            '-crf', params['crf'],
            '-pix_fmt', params['pix_fmt'],
        ]


def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE, device_index: Optional[int] = None) -> List[str]:
    """
    Build hardware-accelerated decoding arguments (placed before the input file).

    Args:
        hw_accel: Hardware acceleration type (none, qsv, cuda).
        device_index: GPU device index, used for multi-GPU scheduling.

    Returns:
        List of FFmpeg hwaccel decoding arguments; empty for software decoding.
    """
    if hw_accel == HW_ACCEL_CUDA:
        # CUDA hardware decoding.
        args = ['-hwaccel', 'cuda']
        # In multi-GPU mode, pin decoding to the selected device.
        if device_index is not None:
            args.extend(['-hwaccel_device', str(device_index)])
        args.extend(['-hwaccel_output_format', 'cuda'])
        return args
    elif hw_accel == HW_ACCEL_QSV:
        # QSV hardware decoding.
        args = ['-hwaccel', 'qsv']
        # The device is selected with -qsv_device (original note: as used on Windows).
        if device_index is not None:
            args.extend(['-qsv_device', str(device_index)])
        args.extend(['-hwaccel_output_format', 'qsv'])
        return args
    else:
        return []


def get_hwaccel_filter_prefix(hw_accel: str = HW_ACCEL_NONE) -> str:
    """
    Return the filter-chain prefix that moves hardware frames to system memory.

    Most complex filters (e.g. lut3d, overlay, crop) do not accept hardware
    surfaces, so hardware frames are downloaded at the start of the chain.
    CUDA/QSV ``hwdownload`` only outputs nv12, hence a two-step conversion:

    1. ``hwdownload,format=nv12`` - download from GPU to CPU.
    2. ``format=yuv420p`` - convert to a standard format (keeps colors correct
       when blending with RGBA/YUVA overlays).

    Args:
        hw_accel: Hardware acceleration type.

    Returns:
        The hwdownload filter string to prepend (with a trailing comma), or an
        empty string for software decoding.
    """
    if hw_accel in (HW_ACCEL_CUDA, HW_ACCEL_QSV):
        return 'hwdownload,format=nv12,format=yuv420p,'
    return ''


# v2 unified video encoding args (kept for legacy callers; software encoding).
VIDEO_ENCODE_ARGS = get_video_encode_args(HW_ACCEL_NONE)

# v2 unified audio encoding args.
AUDIO_ENCODE_ARGS = [
    '-c:a', 'aac',
    '-b:a', '128k',
    '-ar', '48000',
    '-ac', '2',
]

FFMPEG_LOGLEVEL = 'error'


def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]:
    """
    Build cross-platform keyword arguments for ``subprocess.run``.

    On Windows (detected via ``subprocess.STARTUPINFO``) a STARTUPINFO with
    STARTF_USESHOWWINDOW is supplied so that a PyInstaller ``--noconsole``
    build does not pop up console windows.

    Args:
        include_stdout: Whether to capture stdout via a pipe.

    Returns:
        Dict of keyword arguments for ``subprocess.run``.
    """
    ret: Dict[str, Any] = {}

    # Windows-specific handling.
    if hasattr(subprocess, 'STARTUPINFO'):
        si = subprocess.STARTUPINFO()
        si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        ret['startupinfo'] = si
        ret['env'] = os.environ

    # Redirect stdin to avoid "handle is invalid" errors.
    ret['stdin'] = subprocess.PIPE

    if include_stdout:
        ret['stdout'] = subprocess.PIPE

    return ret


def _parse_probe_csv(output: str) -> Tuple[int, int, float]:
    """
    Parse ``ffprobe -of csv=s=x:p=0`` output of ``stream=width,height:format=duration``.

    The first non-empty line holds ``WIDTHxHEIGHT``; the duration is read from
    the last non-empty line (assumed to be the format section). Empty fields
    are ignored so a trailing separator (e.g. ``1920x1080x``) does not break
    parsing.

    Returns:
        (width, height, duration), or (0, 0, 0.0) when fewer than two lines exist.

    Raises:
        ValueError / IndexError: if a field is missing or not numeric.
    """
    lines = [line.strip() for line in output.splitlines() if line.strip()]
    if len(lines) < 2:
        return 0, 0, 0.0
    dims = [part for part in lines[0].split('x') if part]
    return int(dims[0]), int(dims[1]), float(lines[-1])


def probe_video_info(video_file: str) -> Tuple[int, int, float]:
    """
    Probe the width, height and duration of a video file.

    Args:
        video_file: Path to the video file.

    Returns:
        (width, height, duration) tuple; (0, 0, 0) on any failure.
    """
    try:
        result = subprocess.run(
            [
                'ffprobe', '-v', 'error',
                '-select_streams', 'v:0',
                '-show_entries', 'stream=width,height:format=duration',
                '-of', 'csv=s=x:p=0',
                video_file
            ],
            capture_output=True,
            timeout=30,
            **subprocess_args(False)
        )

        if result.returncode != 0:
            logger.warning(f"ffprobe failed for {video_file}")
            return 0, 0, 0

        output = result.stdout.decode('utf-8').strip()
        if not output:
            return 0, 0, 0

        return _parse_probe_csv(output)

    except Exception as e:
        logger.warning(f"probe_video_info error: {e}")
        return 0, 0, 0


def probe_duration_json(file_path: str) -> Optional[float]:
    """
    Probe media duration using ffprobe's JSON output.

    Args:
        file_path: Path to the media file.

    Returns:
        Duration in seconds, or None on failure / missing duration.
    """
    try:
        result = subprocess.run(
            [
                'ffprobe', '-v', 'error',
                '-show_entries', 'format=duration',
                '-of', 'json',
                file_path
            ],
            capture_output=True,
            timeout=30,
            **subprocess_args(False)
        )

        if result.returncode != 0:
            return None

        data = json.loads(result.stdout.decode('utf-8'))
        duration = data.get('format', {}).get('duration')
        return float(duration) if duration else None

    except Exception as e:
        logger.warning(f"probe_duration_json error: {e}")
        return None


class BaseHandler(TaskHandler, ABC):
    """
    Base class for task handlers.

    Shared functionality:
    - temporary work-directory management
    - file download / upload
    - FFmpeg command execution
    - GPU device management (multi-GPU scheduling)
    - logging
    """

    # Thread-local storage holding the current thread's GPU device index.
    # NOTE: this is a class attribute, so it is shared by every handler
    # instance; the stored value is per-thread, not per-handler.
    _thread_local = threading.local()

    def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
        """
        Initialize the handler.

        Args:
            config: Worker configuration.
            api_client: API client.
        """
        self.config = config
        self.api_client = api_client
        self.material_cache = MaterialCache(
            cache_dir=config.cache_dir,
            enabled=config.cache_enabled,
            max_size_gb=config.cache_max_size_gb
        )

    # ========== GPU device management ==========

    def set_gpu_device(self, device_index: int) -> None:
        """
        Set the GPU device index for the current thread.

        Called by TaskExecutor before a task runs.

        Args:
            device_index: GPU device index.
        """
        self._thread_local.gpu_device = device_index

    def get_gpu_device(self) -> Optional[int]:
        """
        Get the GPU device index for the current thread.

        Returns:
            GPU device index, or None if not set.
        """
        return getattr(self._thread_local, 'gpu_device', None)

    def clear_gpu_device(self) -> None:
        """
        Clear the GPU device index for the current thread.

        Called by TaskExecutor after a task runs.
        """
        if hasattr(self._thread_local, 'gpu_device'):
            del self._thread_local.gpu_device

    # ========== FFmpeg argument builders ==========

    def get_video_encode_args(self) -> List[str]:
        """
        Video encoding arguments for the configured hardware acceleration.

        Returns:
            List of FFmpeg video encoding arguments.
        """
        return get_video_encode_args(self.config.hw_accel)

    def get_hwaccel_decode_args(self) -> List[str]:
        """
        Hardware decoding arguments, pinned to the current thread's GPU if set.

        Returns:
            List of FFmpeg hwaccel decoding arguments.
        """
        device_index = self.get_gpu_device()
        return get_hwaccel_decode_args(self.config.hw_accel, device_index)

    def get_hwaccel_filter_prefix(self) -> str:
        """
        Filter-chain prefix for the configured hardware acceleration.

        Returns:
            hwdownload filter string to prepend to the filter chain.
        """
        return get_hwaccel_filter_prefix(self.config.hw_accel)

    def before_handle(self, task: Task) -> None:
        """Hook run before handling a task."""
        logger.debug(f"[task:{task.task_id}] Before handle: {task.task_type.value}")

    def after_handle(self, task: Task, result: TaskResult) -> None:
        """Hook run after handling a task."""
        status = "success" if result.success else "failed"
        logger.debug(f"[task:{task.task_id}] After handle: {status}")

    def create_work_dir(self, task_id: Optional[str] = None) -> str:
        """
        Create a unique temporary work directory under ``config.temp_dir``.

        Args:
            task_id: Task ID, used in the directory name prefix.

        Returns:
            Path of the created work directory.
        """
        # Ensure the temp root exists.
        os.makedirs(self.config.temp_dir, exist_ok=True)

        # Create a unique work directory.
        prefix = f"task_{task_id}_" if task_id else "task_"
        work_dir = tempfile.mkdtemp(dir=self.config.temp_dir, prefix=prefix)

        logger.debug(f"Created work directory: {work_dir}")
        return work_dir

    def cleanup_work_dir(self, work_dir: str) -> None:
        """
        Remove a temporary work directory (best effort; failures are logged).

        Args:
            work_dir: Work directory path.
        """
        if not work_dir or not os.path.exists(work_dir):
            return

        try:
            shutil.rmtree(work_dir)
            logger.debug(f"Cleaned up work directory: {work_dir}")
        except Exception as e:
            logger.warning(f"Failed to cleanup work directory {work_dir}: {e}")

    def download_file(self, url: str, dest: str, timeout: Optional[int] = None, use_cache: bool = True) -> bool:
        """
        Download a file, optionally through the material cache.

        Args:
            url: File URL.
            dest: Destination path.
            timeout: Timeout in seconds (defaults to ``config.download_timeout``).
            use_cache: Whether to go through the material cache (default True).

        Returns:
            Whether the download succeeded.
        """
        if timeout is None:
            timeout = self.config.download_timeout

        try:
            if use_cache:
                # Download via the cache.
                result = self.material_cache.get_or_download(url, dest, timeout=timeout)
            else:
                # Direct download, bypassing the cache.
                result = storage.download_file(url, dest, timeout=timeout)

            if result:
                file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
                logger.debug(f"Downloaded: {url} -> {dest} ({file_size} bytes)")
            return result
        except Exception as e:
            logger.error(f"Download failed: {url} -> {e}")
            return False

    def upload_file(
        self,
        task_id: str,
        file_type: str,
        file_path: str,
        file_name: Optional[str] = None
    ) -> Optional[str]:
        """
        Upload a file and return its access URL.

        Args:
            task_id: Task ID.
            file_type: File type (video/audio/ts/mp4).
            file_path: Local file path.
            file_name: Optional file name.

        Returns:
            Access URL, or None on failure.
        """
        # Request upload / access URLs from the API.
        upload_info = self.api_client.get_upload_url(task_id, file_type, file_name)
        if not upload_info:
            logger.error(f"[task:{task_id}] Failed to get upload URL")
            return None

        upload_url = upload_info.get('uploadUrl')
        access_url = upload_info.get('accessUrl')

        # Without an access URL the upload could never be reported as success,
        # so refuse before transferring any data.
        if not upload_url or not access_url:
            logger.error(f"[task:{task_id}] Invalid upload URL response")
            return None

        try:
            result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout)
        except Exception as e:
            logger.error(f"[task:{task_id}] Upload error: {e}")
            return None

        if not result:
            logger.error(f"[task:{task_id}] Upload failed: {file_path}")
            return None

        # get_file_size never raises, so a stat failure cannot mask a successful upload.
        file_size = self.get_file_size(file_path)
        logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
        return access_url

    def run_ffmpeg(
        self,
        cmd: List[str],
        task_id: str,
        timeout: Optional[int] = None
    ) -> bool:
        """
        Run an FFmpeg command.

        If the command starts with ``ffmpeg`` and has no ``-loglevel``, the
        module-level FFMPEG_LOGLEVEL is injected right after the executable.

        Args:
            cmd: FFmpeg command argument list (not mutated).
            task_id: Task ID, for logging.
            timeout: Timeout in seconds (defaults to ``config.ffmpeg_timeout``).

        Returns:
            Whether the command exited with status 0.
        """
        if timeout is None:
            timeout = self.config.ffmpeg_timeout

        # Work on a copy so the caller's list is never modified.
        cmd_to_run = list(cmd)
        if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run:
            cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL]

        # Log the command, truncated to keep log lines bounded.
        cmd_str = ' '.join(cmd_to_run)
        if len(cmd_str) > 500:
            cmd_str = cmd_str[:500] + '...'
        logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")

        try:
            run_args = subprocess_args(False)
            run_args['stdout'] = subprocess.DEVNULL
            run_args['stderr'] = subprocess.PIPE

            result = subprocess.run(
                cmd_to_run,
                timeout=timeout,
                **run_args
            )

            if result.returncode != 0:
                stderr = (result.stderr or b'').decode('utf-8', errors='replace').strip()
                if len(stderr) > _FFMPEG_STDERR_LOG_LIMIT:
                    # FFmpeg reports the decisive error last: keep the tail, not the head.
                    stderr = '...' + stderr[-_FFMPEG_STDERR_LOG_LIMIT:]
                logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
                return False

            return True

        except subprocess.TimeoutExpired:
            logger.error(f"[task:{task_id}] FFmpeg timeout after {timeout}s")
            return False
        except Exception as e:
            logger.error(f"[task:{task_id}] FFmpeg error: {e}")
            return False

    def probe_duration(self, file_path: str) -> Optional[float]:
        """
        Probe the duration of a media file.

        Tries the JSON-based probe first, then falls back to the CSV-based one.

        Args:
            file_path: File path.

        Returns:
            Duration in seconds, or None on failure.
        """
        # Preferred: JSON output.
        duration = probe_duration_json(file_path)
        if duration is not None:
            return duration

        # Fallback: legacy CSV probe (returns 0 on failure, mapped to None).
        try:
            _, _, duration = probe_video_info(file_path)
            return float(duration) if duration else None
        except Exception as e:
            logger.warning(f"Failed to probe duration: {file_path} -> {e}")
            return None

    def get_file_size(self, file_path: str) -> int:
        """
        Return the file size in bytes, or 0 if it cannot be determined.

        Args:
            file_path: File path.

        Returns:
            File size in bytes.
        """
        try:
            return os.path.getsize(file_path)
        except (OSError, TypeError, ValueError):
            # Missing file, permission error, None path or embedded NUL byte.
            return 0

    def ensure_file_exists(self, file_path: str, min_size: int = 0) -> bool:
        """
        Check that a regular file exists and is at least ``min_size`` bytes.

        Args:
            file_path: File path.
            min_size: Minimum size in bytes.

        Returns:
            True if the path is a regular file meeting the size requirement.
        """
        try:
            # isfile rejects directories; the try guards against the file
            # disappearing between the two checks.
            return os.path.isfile(file_path) and os.path.getsize(file_path) >= min_size
        except OSError:
            return False