# -*- coding: utf-8 -*-
"""
Task handler base class.

Provides the common functionality shared by all task handlers.
"""
import os
import json
import logging
import shutil
import tempfile
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from abc import ABC
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING

from opentelemetry.trace import SpanKind

from core.handler import TaskHandler
from domain.task import Task
from domain.result import TaskResult, ErrorCode
from domain.config import WorkerConfig
from services import storage
from services.cache import MaterialCache
from util.tracing import (
    bind_trace_context,
    capture_otel_context,
    get_current_task_context,
    mark_span_error,
    start_span,
)
from constant import (
    HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA,
    VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA
)

if TYPE_CHECKING:
    from services.api_client import APIClientV2

logger = logging.getLogger(__name__)


def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
    """
    Build the video-encoding arguments for the configured hardware acceleration.

    Args:
        hw_accel: Hardware acceleration type (none, qsv, cuda).

    Returns:
        FFmpeg video-encoding argument list.
    """
    if hw_accel == HW_ACCEL_QSV:
        params = VIDEO_ENCODE_PARAMS_QSV
        return [
            '-c:v', params['codec'],
            '-preset', params['preset'],
            '-profile:v', params['profile'],
            '-level', params['level'],
            '-global_quality', params['global_quality'],
            '-look_ahead', params['look_ahead'],
        ]
    elif hw_accel == HW_ACCEL_CUDA:
        params = VIDEO_ENCODE_PARAMS_CUDA
        return [
            '-c:v', params['codec'],
            '-preset', params['preset'],
            '-profile:v', params['profile'],
            '-level', params['level'],
            '-rc', params['rc'],
            '-cq', params['cq'],
            '-b:v', '0',  # used together with vbr rate control so -cq takes effect
        ]
    else:
        # Software encoding (default).
        params = VIDEO_ENCODE_PARAMS
        return [
            '-c:v', params['codec'],
            '-preset', params['preset'],
            '-profile:v', params['profile'],
            '-level', params['level'],
            '-crf', params['crf'],
            '-pix_fmt', params['pix_fmt'],
        ]


def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE, device_index: Optional[int] = None) -> List[str]:
    """
    Build the hardware-accelerated decode arguments (placed before the input file).

    Args:
        hw_accel: Hardware acceleration type (none, qsv, cuda).
        device_index: GPU device index, used for multi-GPU scheduling.

    Returns:
        FFmpeg hardware-accelerated decode argument list.
    """
    if hw_accel == HW_ACCEL_CUDA:
        # CUDA hardware-accelerated decoding.
        args = ['-hwaccel', 'cuda']
        # Pin a specific device in multi-GPU mode.
        if device_index is not None:
            args.extend(['-hwaccel_device', str(device_index)])
        args.extend(['-hwaccel_output_format', 'cuda'])
        return args
    elif hw_accel == HW_ACCEL_QSV:
        # QSV hardware-accelerated decoding.
        args = ['-hwaccel', 'qsv']
        # QSV uses -qsv_device on Windows.
        if device_index is not None:
            args.extend(['-qsv_device', str(device_index)])
        args.extend(['-hwaccel_output_format', 'qsv'])
        return args
    else:
        return []


def get_hwaccel_filter_prefix(hw_accel: str = HW_ACCEL_NONE) -> str:
    """
    Build the hardware-acceleration filter prefix (hwdownload from GPU to CPU).

    Note: most complex filters (lut3d, overlay, crop, ...) do not accept
    hardware surfaces, so the surface must be downloaded to system memory at
    the start of the filter chain.

    CUDA/QSV hwdownload only outputs nv12, hence the two-step conversion:
    1. hwdownload,format=nv12 - download from GPU to CPU
    2. format=yuv420p - convert to a standard format (keeps colors correct
       when blending with RGBA/YUVA overlays)

    Args:
        hw_accel: Hardware acceleration type.

    Returns:
        hwdownload filter string to prepend to the filter chain.
    """
    if hw_accel == HW_ACCEL_CUDA:
        return 'hwdownload,format=nv12,format=yuv420p,'
    elif hw_accel == HW_ACCEL_QSV:
        return 'hwdownload,format=nv12,format=yuv420p,'
    else:
        return ''


# v2 unified video-encoding args (kept for legacy callers; software encoding).
VIDEO_ENCODE_ARGS = get_video_encode_args(HW_ACCEL_NONE)

# v2 unified audio-encoding args.
AUDIO_ENCODE_ARGS = [
    '-c:a', 'aac',
    '-b:a', '128k',
    '-ar', '48000',
    '-ac', '2',
]

FFMPEG_LOGLEVEL = 'error'


def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]:
    """
    Create cross-platform subprocess keyword arguments.

    On Windows, when packaged with PyInstaller --noconsole, special handling
    is required to avoid popping up a console window.

    Args:
        include_stdout: Whether to capture stdout.

    Returns:
        Keyword-argument dict for subprocess.run.
    """
    ret: Dict[str, Any] = {}
    # Windows-specific handling (subprocess.STARTUPINFO only exists on Windows).
    if hasattr(subprocess, 'STARTUPINFO'):
        si = subprocess.STARTUPINFO()
        si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        ret['startupinfo'] = si
        ret['env'] = os.environ
    # Redirect stdin to avoid "handle is invalid" errors.
    # NOTE(review): indentation reconstructed from a flattened source — stdin
    # redirection is assumed to apply on all platforms; confirm against VCS history.
    ret['stdin'] = subprocess.PIPE
    if include_stdout:
        ret['stdout'] = subprocess.PIPE
    return ret


def probe_video_info(video_file: str) -> Tuple[int, int, float]:
    """
    Probe video information (width, height, duration).

    Args:
        video_file: Video file path.

    Returns:
        (width, height, duration) tuple; (0, 0, 0) on failure.
    """
    try:
        result = subprocess.run(
            [
                'ffprobe', '-v', 'error',
                '-select_streams', 'v:0',
                '-show_entries', 'stream=width,height:format=duration',
                '-of', 'csv=s=x:p=0',
                video_file
            ],
            capture_output=True,
            timeout=30,
            **subprocess_args(False)
        )
        if result.returncode != 0:
            logger.warning(f"ffprobe failed for {video_file}")
            return 0, 0, 0
        output = result.stdout.decode('utf-8').strip()
        if not output:
            return 0, 0, 0
        # With csv=s=x:p=0 the stream line is "WIDTHxHEIGHT" and the format
        # line carries the duration.
        lines = output.split('\n')
        if len(lines) >= 2:
            wh = lines[0].strip()
            duration_str = lines[1].strip()
            width, height = wh.split('x')
            return int(width), int(height), float(duration_str)
        return 0, 0, 0
    except Exception as e:
        logger.warning(f"probe_video_info error: {e}")
        return 0, 0, 0


def probe_duration_json(file_path: str) -> Optional[float]:
    """
    Probe media duration using ffprobe JSON output.

    Args:
        file_path: Media file path.

    Returns:
        Duration in seconds; None on failure.
    """
    try:
        result = subprocess.run(
            [
                'ffprobe', '-v', 'error',
                '-show_entries', 'format=duration',
                '-of', 'json',
                file_path
            ],
            capture_output=True,
            timeout=30,
            **subprocess_args(False)
        )
        if result.returncode != 0:
            return None
        data = json.loads(result.stdout.decode('utf-8'))
        duration = data.get('format', {}).get('duration')
        return float(duration) if duration else None
    except Exception as e:
        logger.warning(f"probe_duration_json error: {e}")
        return None


class BaseHandler(TaskHandler, ABC):
    """
    Task handler base class.

    Provides common functionality for all handlers, including:
    - Temporary work-directory management
    - File download/upload
    - FFmpeg command execution
    - GPU device management (multi-GPU scheduling)
    - Logging
    """

    # Thread-local storage: holds the GPU device index for the current thread.
    _thread_local = threading.local()

    DEFAULT_TASK_DOWNLOAD_CONCURRENCY = 4
    DEFAULT_TASK_UPLOAD_CONCURRENCY = 2
    MAX_TASK_TRANSFER_CONCURRENCY = 16

    def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
        """
        Initialize the handler.

        Args:
            config: Worker configuration.
            api_client: API client.
        """
        self.config = config
        self.api_client = api_client
        self.material_cache = MaterialCache(
            cache_dir=config.cache_dir,
            enabled=config.cache_enabled,
            max_size_gb=config.cache_max_size_gb
        )
        self.task_download_concurrency = self._resolve_task_transfer_concurrency(
            "TASK_DOWNLOAD_CONCURRENCY",
            self.DEFAULT_TASK_DOWNLOAD_CONCURRENCY
        )
        self.task_upload_concurrency = self._resolve_task_transfer_concurrency(
            "TASK_UPLOAD_CONCURRENCY",
            self.DEFAULT_TASK_UPLOAD_CONCURRENCY
        )

    def _resolve_task_transfer_concurrency(self, env_name: str, default_value: int) -> int:
        """Read and normalize the per-task transfer concurrency setting from the environment."""
        raw_value = os.getenv(env_name)
        if raw_value is None or not raw_value.strip():
            return default_value
        try:
            parsed_value = int(raw_value.strip())
        except ValueError:
            logger.warning(
                f"Invalid {env_name} value '{raw_value}', using default {default_value}"
            )
            return default_value
        # Clamp to the [1, MAX_TASK_TRANSFER_CONCURRENCY] range.
        if parsed_value < 1:
            logger.warning(f"{env_name} must be >= 1, forcing to 1")
            return 1
        if parsed_value > self.MAX_TASK_TRANSFER_CONCURRENCY:
            logger.warning(
                f"{env_name}={parsed_value} exceeds limit {self.MAX_TASK_TRANSFER_CONCURRENCY}, "
                f"using {self.MAX_TASK_TRANSFER_CONCURRENCY}"
            )
            return self.MAX_TASK_TRANSFER_CONCURRENCY
        return parsed_value

    def download_files_parallel(
        self,
        download_jobs: List[Dict[str, Any]],
        timeout: Optional[int] = None
    ) -> Dict[str, Dict[str, Any]]:
        """
        Download multiple files in parallel within a single task.

        Args:
            download_jobs: Download job list. Fields per item:
                - key: unique identifier
                - url: download URL
                - dest: destination file path
                - required: whether the file is critical (optional, default True)
                - use_cache: whether to use the cache (optional, default True)
            timeout: Per-file download timeout in seconds.

        Returns:
            key -> result dict:
                - success: whether the download succeeded
                - url: original URL
                - dest: destination file path
                - required: whether the file is critical
        """
        if not download_jobs:
            return {}
        normalized_jobs: List[Dict[str, Any]] = []
        seen_keys = set()
        for download_job in download_jobs:
            job_key = str(download_job.get("key", "")).strip()
            job_url = str(download_job.get("url", "")).strip()
            job_dest = str(download_job.get("dest", "")).strip()
            if not job_key or not job_url or not job_dest:
                raise ValueError("Each download job must include non-empty key/url/dest")
            if job_key in seen_keys:
                raise ValueError(f"Duplicate download job key: {job_key}")
            seen_keys.add(job_key)
            normalized_jobs.append({
                "key": job_key,
                "url": job_url,
                "dest": job_dest,
                "required": bool(download_job.get("required", True)),
                "use_cache": bool(download_job.get("use_cache", True)),
            })
        if timeout is None:
            timeout = self.config.download_timeout
        # Capture the caller's tracing context so worker threads inherit it.
        parent_otel_context = capture_otel_context()
        task_context = get_current_task_context()
        task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
        results: Dict[str, Dict[str, Any]] = {}

        def _run_download_job(download_job: Dict[str, Any]) -> bool:
            # Re-bind the parent trace context inside the worker thread.
            with bind_trace_context(parent_otel_context, task_context):
                return self.download_file(
                    download_job["url"],
                    download_job["dest"],
                    timeout=timeout,
                    use_cache=download_job["use_cache"],
                )

        max_workers = min(self.task_download_concurrency, len(normalized_jobs))
        if max_workers <= 1:
            # Sequential fallback when concurrency is 1 or there is one job.
            for download_job in normalized_jobs:
                is_success = _run_download_job(download_job)
                results[download_job["key"]] = {
                    "success": is_success,
                    "url": download_job["url"],
                    "dest": download_job["dest"],
                    "required": download_job["required"],
                }
        else:
            with ThreadPoolExecutor(
                max_workers=max_workers,
                thread_name_prefix="TaskDownload",
            ) as executor:
                future_to_job = {
                    executor.submit(_run_download_job, download_job): download_job
                    for download_job in normalized_jobs
                }
                for completed_future in as_completed(future_to_job):
                    download_job = future_to_job[completed_future]
                    is_success = False
                    try:
                        is_success = bool(completed_future.result())
                    except Exception as exc:
                        # A failed job is recorded as success=False, not raised.
                        logger.error(
                            f"{task_prefix}Parallel download raised exception for "
                            f"key={download_job['key']}: {exc}"
                        )
                    results[download_job["key"]] = {
                        "success": is_success,
                        "url": download_job["url"],
                        "dest": download_job["dest"],
                        "required": download_job["required"],
                    }
        success_count = sum(1 for item in results.values() if item["success"])
        logger.debug(
            f"{task_prefix}Parallel download completed: {success_count}/{len(normalized_jobs)}"
        )
        return results

    def upload_files_parallel(
        self,
        upload_jobs: List[Dict[str, Any]]
    ) -> Dict[str, Dict[str, Any]]:
        """
        Upload multiple files in parallel within a single task.

        Args:
            upload_jobs: Upload job list. Fields per item:
                - key: unique identifier
                - task_id: task ID
                - file_type: file type (video/audio/ts/mp4)
                - file_path: local file path
                - file_name: file name (optional)
                - required: whether the file is critical (optional, default True)

        Returns:
            key -> result dict:
                - success: whether the upload succeeded
                - url: access URL after upload (None on failure)
                - file_path: local file path
                - required: whether the file is critical
        """
        if not upload_jobs:
            return {}
        normalized_jobs: List[Dict[str, Any]] = []
        seen_keys = set()
        for upload_job in upload_jobs:
            job_key = str(upload_job.get("key", "")).strip()
            task_id = str(upload_job.get("task_id", "")).strip()
            file_type = str(upload_job.get("file_type", "")).strip()
            file_path = str(upload_job.get("file_path", "")).strip()
            if not job_key or not task_id or not file_type or not file_path:
                raise ValueError(
                    "Each upload job must include non-empty key/task_id/file_type/file_path"
                )
            if job_key in seen_keys:
                raise ValueError(f"Duplicate upload job key: {job_key}")
            seen_keys.add(job_key)
            normalized_jobs.append({
                "key": job_key,
                "task_id": task_id,
                "file_type": file_type,
                "file_path": file_path,
                "file_name": upload_job.get("file_name"),
                "required": bool(upload_job.get("required", True)),
            })
        # Capture the caller's tracing context so worker threads inherit it.
        parent_otel_context = capture_otel_context()
        task_context = get_current_task_context()
        task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
        results: Dict[str, Dict[str, Any]] = {}

        def _run_upload_job(upload_job: Dict[str, Any]) -> Optional[str]:
            # Re-bind the parent trace context inside the worker thread.
            with bind_trace_context(parent_otel_context, task_context):
                return self.upload_file(
                    upload_job["task_id"],
                    upload_job["file_type"],
                    upload_job["file_path"],
                    upload_job.get("file_name")
                )

        max_workers = min(self.task_upload_concurrency, len(normalized_jobs))
        if max_workers <= 1:
            # Sequential fallback when concurrency is 1 or there is one job.
            for upload_job in normalized_jobs:
                result_url = _run_upload_job(upload_job)
                results[upload_job["key"]] = {
                    "success": bool(result_url),
                    "url": result_url,
                    "file_path": upload_job["file_path"],
                    "required": upload_job["required"],
                }
        else:
            with ThreadPoolExecutor(
                max_workers=max_workers,
                thread_name_prefix="TaskUpload",
            ) as executor:
                future_to_job = {
                    executor.submit(_run_upload_job, upload_job): upload_job
                    for upload_job in normalized_jobs
                }
                for completed_future in as_completed(future_to_job):
                    upload_job = future_to_job[completed_future]
                    result_url = None
                    try:
                        result_url = completed_future.result()
                    except Exception as exc:
                        # A failed job is recorded as success=False, not raised.
                        logger.error(
                            f"{task_prefix}Parallel upload raised exception for "
                            f"key={upload_job['key']}: {exc}"
                        )
                    results[upload_job["key"]] = {
                        "success": bool(result_url),
                        "url": result_url,
                        "file_path": upload_job["file_path"],
                        "required": upload_job["required"],
                    }
        success_count = sum(1 for item in results.values() if item["success"])
        logger.debug(
            f"{task_prefix}Parallel upload completed: {success_count}/{len(normalized_jobs)}"
        )
        return results

    # ========== GPU device management ==========

    def set_gpu_device(self, device_index: int) -> None:
        """
        Set the GPU device index for the current thread.

        Called by the TaskExecutor before task execution.

        Args:
            device_index: GPU device index.
        """
        self._thread_local.gpu_device = device_index

    def get_gpu_device(self) -> Optional[int]:
        """
        Get the GPU device index for the current thread.

        Returns:
            GPU device index, or None if not set.
        """
        return getattr(self._thread_local, 'gpu_device', None)

    def clear_gpu_device(self) -> None:
        """
        Clear the GPU device index for the current thread.

        Called by the TaskExecutor after task execution.
        """
        if hasattr(self._thread_local, 'gpu_device'):
            del self._thread_local.gpu_device

    # ========== FFmpeg argument generation ==========

    def get_video_encode_args(self) -> List[str]:
        """
        Get the video-encoding arguments for the current configuration.

        Returns:
            FFmpeg video-encoding argument list.
        """
        return get_video_encode_args(self.config.hw_accel)

    def get_hwaccel_decode_args(self) -> List[str]:
        """
        Get the hardware-accelerated decode arguments (device-aware).

        Returns:
            FFmpeg hardware-accelerated decode argument list.
        """
        device_index = self.get_gpu_device()
        return get_hwaccel_decode_args(self.config.hw_accel, device_index)

    def get_hwaccel_filter_prefix(self) -> str:
        """
        Get the hardware-acceleration filter prefix.

        Returns:
            hwdownload filter string to prepend to the filter chain.
        """
        return get_hwaccel_filter_prefix(self.config.hw_accel)

    def before_handle(self, task: Task) -> None:
        """Pre-handle hook."""
        logger.debug(f"[task:{task.task_id}] Before handle: {task.task_type.value}")

    def after_handle(self, task: Task, result: TaskResult) -> None:
        """Post-handle hook."""
        status = "success" if result.success else "failed"
        logger.debug(f"[task:{task.task_id}] After handle: {status}")

    def create_work_dir(self, task_id: Optional[str] = None) -> str:
        """
        Create a temporary work directory.

        Args:
            task_id: Task ID (used for directory naming).

        Returns:
            Work directory path.
        """
        # Make sure the temporary root directory exists.
        os.makedirs(self.config.temp_dir, exist_ok=True)
        # Create a unique work directory.
        prefix = f"task_{task_id}_" if task_id else "task_"
        work_dir = tempfile.mkdtemp(dir=self.config.temp_dir, prefix=prefix)
        logger.debug(f"Created work directory: {work_dir}")
        return work_dir

    def cleanup_work_dir(self, work_dir: str) -> None:
        """
        Clean up a temporary work directory.

        Args:
            work_dir: Work directory path.
        """
        if not work_dir or not os.path.exists(work_dir):
            return
        try:
            shutil.rmtree(work_dir)
            logger.debug(f"Cleaned up work directory: {work_dir}")
        except Exception as e:
            # Best-effort cleanup: never fail the task on a leftover directory.
            logger.warning(f"Failed to cleanup work directory {work_dir}: {e}")

    def download_file(self, url: str, dest: str, timeout: Optional[int] = None, use_cache: bool = True) -> bool:
        """
        Download a file (with optional caching).

        Args:
            url: File URL.
            dest: Destination path.
            timeout: Timeout in seconds.
            use_cache: Whether to use the cache (default True).

        Returns:
            Whether the download succeeded.
        """
        if timeout is None:
            timeout = self.config.download_timeout
        task_context = get_current_task_context()
        task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
        logger.debug(f"{task_prefix}Downloading from: {url} -> {dest}")
        with start_span(
            "render.task.file.download",
            kind=SpanKind.CLIENT,
            attributes={
                "render.file.source_url": url,
                "render.file.destination": dest,
                "render.file.use_cache": use_cache,
            },
        ) as span:
            try:
                if use_cache:
                    result = self.material_cache.get_or_download(url, dest, timeout=timeout)
                else:
                    result = storage.download_file(url, dest, timeout=timeout)
                if result:
                    file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
                    logger.debug(f"{task_prefix}Downloaded: {url} -> {dest} ({file_size} bytes)")
                    if span is not None:
                        span.set_attribute("render.file.size_bytes", file_size)
                # NOTE(review): indentation reconstructed from a flattened source —
                # this return is assumed to sit at try-level so a falsy result is
                # propagated as failure; confirm against VCS history.
                return result
            except Exception as e:
                mark_span_error(span, str(e), ErrorCode.E_INPUT_UNAVAILABLE.value)
                logger.error(f"{task_prefix}Download failed: {e}")
                logger.debug(f"{task_prefix}Download source address: {url}")
                return False

    def upload_file(
        self,
        task_id: str,
        file_type: str,
        file_path: str,
        file_name: Optional[str] = None
    ) -> Optional[str]:
        """
        Upload a file and return its access URL.

        Args:
            task_id: Task ID.
            file_type: File type (video/audio/ts/mp4).
            file_path: Local file path.
            file_name: File name (optional).

        Returns:
            Access URL, or None on failure.
        """
        with start_span(
            "render.task.file.upload",
            kind=SpanKind.CLIENT,
            attributes={
                "render.file.type": file_type,
                "render.file.path": file_path,
            },
        ) as span:
            # Ask the API for a presigned upload URL before transferring bytes.
            upload_info = self.api_client.get_upload_url(task_id, file_type, file_name)
            if not upload_info:
                logger.error(f"[task:{task_id}] Failed to get upload URL")
                return None
            upload_url = upload_info.get('uploadUrl')
            access_url = upload_info.get('accessUrl')
            if not upload_url:
                logger.error(f"[task:{task_id}] Invalid upload URL response")
                return None
            logger.debug(
                f"[task:{task_id}] Upload target address: uploadUrl={upload_url}, accessUrl={access_url}"
            )
            if span is not None:
                span.set_attribute("render.file.upload_url", upload_url)
                if access_url:
                    span.set_attribute("render.file.access_url", access_url)
            try:
                result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout)
                if result:
                    file_size = os.path.getsize(file_path)
                    logger.info(
                        f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)"
                    )
                    logger.debug(f"[task:{task_id}] Uploaded access address: {access_url or upload_url}")
                    if span is not None:
                        span.set_attribute("render.file.size_bytes", file_size)
                    # Seed the material cache so later downloads of this URL hit locally.
                    if access_url:
                        self.material_cache.add_to_cache(access_url, file_path)
                    return access_url
                logger.error(f"[task:{task_id}] Upload failed: {file_path}")
                return None
            except Exception as e:
                mark_span_error(span, str(e), ErrorCode.E_UPLOAD_FAILED.value)
                logger.error(f"[task:{task_id}] Upload error: {e}")
                return None

    def run_ffmpeg(
        self,
        cmd: List[str],
        task_id: str,
        timeout: Optional[int] = None
    ) -> bool:
        """
        Execute an FFmpeg command.

        Args:
            cmd: FFmpeg command argument list.
            task_id: Task ID (for logging).
            timeout: Timeout in seconds.

        Returns:
            Whether the command succeeded.
        """
        if timeout is None:
            timeout = self.config.ffmpeg_timeout
        cmd_to_run = list(cmd)
        # Inject the default loglevel right after "ffmpeg" unless the caller
        # already set one.
        if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run:
            cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL]
        # Log the full command (no length limit).
        cmd_str = ' '.join(cmd_to_run)
        logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")
        with start_span(
            "render.task.ffmpeg.run",
            attributes={
                "render.ffmpeg.timeout_seconds": timeout,
                "render.ffmpeg.command": cmd_str,
            },
        ) as span:
            try:
                run_args = subprocess_args(False)
                # Discard stdout; keep stderr for error reporting.
                run_args['stdout'] = subprocess.DEVNULL
                run_args['stderr'] = subprocess.PIPE
                result = subprocess.run(
                    cmd_to_run,
                    timeout=timeout,
                    **run_args
                )
                if span is not None:
                    span.set_attribute("render.ffmpeg.return_code", result.returncode)
                if result.returncode != 0:
                    # Truncate stderr to keep log/span sizes bounded.
                    stderr = (result.stderr or b'').decode('utf-8', errors='replace')[:1000]
                    logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
                    mark_span_error(span, stderr or "ffmpeg failed", ErrorCode.E_FFMPEG_FAILED.value)
                    return False
                return True
            except subprocess.TimeoutExpired:
                logger.error(f"[task:{task_id}] FFmpeg timeout after {timeout}s")
                mark_span_error(span, f"timeout after {timeout}s", ErrorCode.E_TIMEOUT.value)
                return False
            except Exception as e:
                logger.error(f"[task:{task_id}] FFmpeg error: {e}")
                mark_span_error(span, str(e), ErrorCode.E_FFMPEG_FAILED.value)
                return False

    def probe_duration(self, file_path: str) -> Optional[float]:
        """
        Probe a media file's duration.

        Args:
            file_path: File path.

        Returns:
            Duration in seconds; None on failure.
        """
        # Try the JSON output path first.
        duration = probe_duration_json(file_path)
        if duration is not None:
            return duration
        # Fall back to the legacy CSV probe.
        try:
            _, _, duration = probe_video_info(file_path)
            return float(duration) if duration else None
        except Exception as e:
            logger.warning(f"Failed to probe duration: {file_path} -> {e}")
            return None

    def get_file_size(self, file_path: str) -> int:
        """
        Get a file's size.

        Args:
            file_path: File path.

        Returns:
            File size in bytes (0 on any error).
        """
        try:
            return os.path.getsize(file_path)
        except Exception:
            return 0

    def ensure_file_exists(self, file_path: str, min_size: int = 0) -> bool:
        """
        Ensure a file exists and meets the size requirement.

        Args:
            file_path: File path.
            min_size: Minimum size in bytes.

        Returns:
            Whether the requirement is met.
        """
        if not os.path.exists(file_path):
            return False
        return os.path.getsize(file_path) >= min_size