# -*- coding: utf-8 -*- """ 任务处理器基类 提供所有处理器共用的基础功能。 """ import os import json import logging import shutil import tempfile import subprocess from abc import ABC from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING from core.handler import TaskHandler from domain.task import Task from domain.result import TaskResult, ErrorCode from domain.config import WorkerConfig from services import storage from constant import ( HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA, VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA ) if TYPE_CHECKING: from services.api_client import APIClientV2 logger = logging.getLogger(__name__) def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]: """ 根据硬件加速配置获取视频编码参数 Args: hw_accel: 硬件加速类型 (none, qsv, cuda) Returns: FFmpeg 视频编码参数列表 """ if hw_accel == HW_ACCEL_QSV: params = VIDEO_ENCODE_PARAMS_QSV return [ '-c:v', params['codec'], '-preset', params['preset'], '-profile:v', params['profile'], '-level', params['level'], '-global_quality', params['global_quality'], '-look_ahead', params['look_ahead'], ] elif hw_accel == HW_ACCEL_CUDA: params = VIDEO_ENCODE_PARAMS_CUDA return [ '-c:v', params['codec'], '-preset', params['preset'], '-profile:v', params['profile'], '-level', params['level'], '-rc', params['rc'], '-cq', params['cq'], '-b:v', '0', # 配合 vbr 模式使用 cq ] else: # 软件编码(默认) params = VIDEO_ENCODE_PARAMS return [ '-c:v', params['codec'], '-preset', params['preset'], '-profile:v', params['profile'], '-level', params['level'], '-crf', params['crf'], '-pix_fmt', params['pix_fmt'], ] def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]: """ 获取硬件加速解码参数(输入文件之前使用) Args: hw_accel: 硬件加速类型 (none, qsv, cuda) Returns: FFmpeg 硬件加速解码参数列表 """ if hw_accel == HW_ACCEL_CUDA: # CUDA 硬件加速解码 # 注意:使用 cuda 作为 hwaccel,但输出到系统内存以便 CPU 滤镜处理 return ['-hwaccel', 'cuda', '-hwaccel_output_format', 'cuda'] elif hw_accel == HW_ACCEL_QSV: # QSV 硬件加速解码 return ['-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv'] else: return [] def get_hwaccel_filter_prefix(hw_accel: str = HW_ACCEL_NONE) -> str: """ 获取硬件加速滤镜前缀(用于 hwdownload 从 GPU 到 CPU) 注意:由于大多数复杂滤镜(如 lut3d, overlay, crop 等)不支持硬件表面, 我们需要在滤镜链开始时将硬件表面下载到系统内存。 Args: hw_accel: 硬件加速类型 Returns: 需要添加到滤镜链开头的 hwdownload 滤镜字符串 """ if hw_accel == HW_ACCEL_CUDA: return 'hwdownload,format=nv12,' elif hw_accel == HW_ACCEL_QSV: return 'hwdownload,format=nv12,' else: return '' # v2 统一视频编码参数(兼容旧代码,使用软件编码) VIDEO_ENCODE_ARGS = get_video_encode_args(HW_ACCEL_NONE) # v2 统一音频编码参数 AUDIO_ENCODE_ARGS = [ '-c:a', 'aac', '-b:a', '128k', '-ar', '48000', '-ac', '2', ] def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]: """ 创建跨平台的 subprocess 参数 在 Windows 上使用 Pyinstaller --noconsole 打包时,需要特殊处理以避免弹出命令行窗口。 Args: include_stdout: 是否包含 stdout 捕获 Returns: subprocess.run 使用的参数字典 """ ret: Dict[str, Any] = {} # Windows 特殊处理 if hasattr(subprocess, 'STARTUPINFO'): si = subprocess.STARTUPINFO() si.dwFlags |= subprocess.STARTF_USESHOWWINDOW ret['startupinfo'] = si ret['env'] = os.environ # 重定向 stdin 避免 "handle is invalid" 错误 ret['stdin'] = subprocess.PIPE if include_stdout: ret['stdout'] = subprocess.PIPE return ret def probe_video_info(video_file: str) -> Tuple[int, int, float]: """ 探测视频信息(宽度、高度、时长) Args: video_file: 视频文件路径 Returns: (width, height, duration) 元组,失败返回 (0, 0, 0) """ try: result = subprocess.run( [ 'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of', 'csv=s=x:p=0', video_file ], capture_output=True, timeout=30, **subprocess_args(False) ) if result.returncode != 0: logger.warning(f"ffprobe failed for {video_file}") return 0, 0, 0 output = result.stdout.decode('utf-8').strip() if not output: return 0, 0, 0 lines = output.split('\n') if len(lines) >= 2: wh = lines[0].strip() duration_str = lines[1].strip() width, height = wh.split('x') return int(width), int(height), float(duration_str) return 0, 0, 0 except Exception as e: logger.warning(f"probe_video_info error: {e}") return 0, 0, 0 def probe_duration_json(file_path: str) -> Optional[float]: """ 使用 ffprobe JSON 输出探测媒体时长 Args: file_path: 媒体文件路径 Returns: 时长(秒),失败返回 None """ try: result = subprocess.run( [ 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'json', file_path ], capture_output=True, timeout=30, **subprocess_args(False) ) if result.returncode != 0: return None data = json.loads(result.stdout.decode('utf-8')) duration = data.get('format', {}).get('duration') return float(duration) if duration else None except Exception as e: logger.warning(f"probe_duration_json error: {e}") return None class BaseHandler(TaskHandler, ABC): """ 任务处理器基类 提供所有处理器共用的基础功能,包括: - 临时目录管理 - 文件下载/上传 - FFmpeg 命令执行 - 日志记录 """ def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'): """ 初始化处理器 Args: config: Worker 配置 api_client: API 客户端 """ self.config = config self.api_client = api_client def get_video_encode_args(self) -> List[str]: """ 获取当前配置的视频编码参数 Returns: FFmpeg 视频编码参数列表 """ return get_video_encode_args(self.config.hw_accel) def get_hwaccel_decode_args(self) -> List[str]: """ 获取硬件加速解码参数(在输入文件之前使用) Returns: FFmpeg 硬件加速解码参数列表 """ return get_hwaccel_decode_args(self.config.hw_accel) def get_hwaccel_filter_prefix(self) -> str: """ 获取硬件加速滤镜前缀 Returns: 需要添加到滤镜链开头的 hwdownload 滤镜字符串 """ return get_hwaccel_filter_prefix(self.config.hw_accel) def before_handle(self, task: Task) -> None: """处理前钩子""" logger.debug(f"[task:{task.task_id}] Before handle: {task.task_type.value}") def after_handle(self, task: Task, result: TaskResult) -> None: """处理后钩子""" status = "success" if result.success else "failed" logger.debug(f"[task:{task.task_id}] After handle: {status}") def create_work_dir(self, task_id: str = None) -> str: """ 创建临时工作目录 Args: task_id: 任务 ID(用于目录命名) Returns: 工作目录路径 """ # 确保临时根目录存在 os.makedirs(self.config.temp_dir, exist_ok=True) # 创建唯一的工作目录 prefix = f"task_{task_id}_" if task_id else "task_" work_dir = tempfile.mkdtemp(dir=self.config.temp_dir, prefix=prefix) logger.debug(f"Created work directory: {work_dir}") return work_dir def cleanup_work_dir(self, work_dir: str) -> None: """ 清理临时工作目录 Args: work_dir: 工作目录路径 """ if not work_dir or not os.path.exists(work_dir): return try: shutil.rmtree(work_dir) logger.debug(f"Cleaned up work directory: {work_dir}") except Exception as e: logger.warning(f"Failed to cleanup work directory {work_dir}: {e}") def download_file(self, url: str, dest: str, timeout: int = None) -> bool: """ 下载文件 Args: url: 文件 URL dest: 目标路径 timeout: 超时时间(秒) Returns: 是否成功 """ if timeout is None: timeout = self.config.download_timeout try: result = storage.download_file(url, dest, timeout=timeout) if result: file_size = os.path.getsize(dest) if os.path.exists(dest) else 0 logger.debug(f"Downloaded: {url} -> {dest} ({file_size} bytes)") return result except Exception as e: logger.error(f"Download failed: {url} -> {e}") return False def upload_file( self, task_id: str, file_type: str, file_path: str, file_name: str = None ) -> Optional[str]: """ 上传文件并返回访问 URL Args: task_id: 任务 ID file_type: 文件类型(video/audio/ts/mp4) file_path: 本地文件路径 file_name: 文件名(可选) Returns: 访问 URL,失败返回 None """ # 获取上传 URL upload_info = self.api_client.get_upload_url(task_id, file_type, file_name) if not upload_info: logger.error(f"[task:{task_id}] Failed to get upload URL") return None upload_url = upload_info.get('uploadUrl') access_url = upload_info.get('accessUrl') if not upload_url: logger.error(f"[task:{task_id}] Invalid upload URL response") return None # 上传文件 try: result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout) if result: file_size = os.path.getsize(file_path) logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)") return access_url else: logger.error(f"[task:{task_id}] Upload failed: {file_path}") return None except Exception as e: logger.error(f"[task:{task_id}] Upload error: {e}") return None def run_ffmpeg( self, cmd: List[str], task_id: str, timeout: int = None ) -> bool: """ 执行 FFmpeg 命令 Args: cmd: FFmpeg 命令参数列表 task_id: 任务 ID(用于日志) timeout: 超时时间(秒) Returns: 是否成功 """ if timeout is None: timeout = self.config.ffmpeg_timeout # 日志记录命令(限制长度) cmd_str = ' '.join(cmd) if len(cmd_str) > 500: cmd_str = cmd_str[:500] + '...' logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}") try: result = subprocess.run( cmd, capture_output=True, timeout=timeout, **subprocess_args(False) ) if result.returncode != 0: stderr = result.stderr.decode('utf-8', errors='replace')[:1000] logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}") return False return True except subprocess.TimeoutExpired: logger.error(f"[task:{task_id}] FFmpeg timeout after {timeout}s") return False except Exception as e: logger.error(f"[task:{task_id}] FFmpeg error: {e}") return False def probe_duration(self, file_path: str) -> Optional[float]: """ 探测媒体文件时长 Args: file_path: 文件路径 Returns: 时长(秒),失败返回 None """ # 首先尝试 JSON 输出方式 duration = probe_duration_json(file_path) if duration is not None: return duration # 回退到旧方式 try: _, _, duration = probe_video_info(file_path) return float(duration) if duration else None except Exception as e: logger.warning(f"Failed to probe duration: {file_path} -> {e}") return None def get_file_size(self, file_path: str) -> int: """ 获取文件大小 Args: file_path: 文件路径 Returns: 文件大小(字节) """ try: return os.path.getsize(file_path) except Exception: return 0 def ensure_file_exists(self, file_path: str, min_size: int = 0) -> bool: """ 确保文件存在且大小满足要求 Args: file_path: 文件路径 min_size: 最小大小(字节) Returns: 是否满足要求 """ if not os.path.exists(file_path): return False return os.path.getsize(file_path) >= min_size