import os
import uuid
from typing import List, Optional, Dict, Any
from dataclasses import dataclass, field
from enum import Enum

from config.settings import get_ffmpeg_config
from util.exceptions import TaskValidationError, EffectError
from entity.effects import registry as effect_registry


class TaskType(Enum):
    """Processing strategies a render task can resolve to."""

    COPY = "copy"
    CONCAT = "concat"
    ENCODE = "encode"


@dataclass
class RenderTask:
    """Plain data holder describing one render job.

    The class stores the task parameters and offers small helpers to
    populate, validate and classify them; it performs no rendering itself.
    """

    input_files: List[str] = field(default_factory=list)
    output_file: str = ""
    task_type: TaskType = TaskType.COPY

    # Video parameters
    resolution: Optional[str] = None
    frame_rate: int = 25
    speed: float = 1.0
    mute: bool = True
    # Switched on automatically as soon as any input path ends with ".ts".
    annexb: bool = False

    # Cropping parameters
    zoom_cut: Optional[int] = None
    center_cut: Optional[int] = None

    # Resource lists
    subtitles: List[str] = field(default_factory=list)
    luts: List[str] = field(default_factory=list)
    audios: List[str] = field(default_factory=list)
    overlays: List[str] = field(default_factory=list)
    effects: List[str] = field(default_factory=list)

    # Free-form extension data; handed to effect processors in validate().
    ext_data: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Derive ``annexb`` from the inputs and fill in a missing output name."""
        if any(self._is_ts_file(path) for path in self.input_files):
            self.annexb = True

        # Only generate a name when the caller did not supply one.
        if not self.output_file:
            self._generate_output_filename()

    @staticmethod
    def _is_ts_file(path: Any) -> bool:
        """Return True when *path* is a string ending with ``.ts``.

        Non-string entries are treated as "not .ts" instead of raising, so
        the constructor and ``add_input_file`` behave the same way.
        """
        return isinstance(path, str) and path.endswith(".ts")

    def _generate_output_filename(self):
        """Set ``output_file`` to ``rand_<uuid4>`` plus the current extension."""
        self.output_file = f"rand_{uuid.uuid4()}{self.get_output_extension()}"

    def add_input_file(self, file_path: str):
        """Append an input file and enable ``annexb`` if it is a ``.ts`` file.

        An already assigned ``output_file`` is left untouched.
        """
        self.input_files.append(file_path)
        if self._is_ts_file(file_path):
            self.annexb = True

    def add_overlay(self, *overlays: str):
        """Register overlays; paths ending in ``.ass`` go to ``subtitles``."""
        for overlay in overlays:
            target = self.subtitles if overlay.endswith('.ass') else self.overlays
            target.append(overlay)

    def add_audios(self, *audios: str):
        """Append the given audio resources."""
        self.audios.extend(audios)

    def add_lut(self, *luts: str):
        """Append the given LUT resources."""
        self.luts.extend(luts)

    def add_effect(self, *effects: str):
        """Append the given effect strings (checked later by ``validate``)."""
        self.effects.extend(effects)

    def validate(self) -> bool:
        """Check that the task is runnable.

        Returns:
            True when every check passes.

        Raises:
            TaskValidationError: if no input file is configured.
            EffectError: if a resolved effect processor rejects its parameters.
        """
        if not self.input_files:
            raise TaskValidationError("No input files specified")

        for effect_str in self.effects:
            effect_name, params = effect_registry.parse_effect_string(effect_str)
            processor = effect_registry.get_processor(effect_name, params, self.ext_data)
            # A falsy processor is not treated as an error here; only a
            # resolved processor can fail validation.
            if processor and not processor.validate_params():
                raise EffectError(
                    f"Invalid parameters for effect {effect_name}: {params}",
                    effect_name,
                    params,
                )

        return True

    def can_copy(self) -> bool:
        """Return True when the task qualifies for ``TaskType.COPY``.

        That is everything ``can_concat`` requires, plus no extra audio
        tracks and exactly one input file.
        """
        return (
            self.can_concat()
            and not self.audios
            and len(self.input_files) == 1
        )

    def can_concat(self) -> bool:
        """Return True when the task qualifies for ``TaskType.CONCAT``.

        Requires no LUTs, overlays, subtitles or effects, a speed of 1 and
        no cropping parameters.
        """
        return (
            not self.luts
            and not self.overlays
            and not self.subtitles
            and not self.effects
            and self.speed == 1
            and self.zoom_cut is None
            and self.center_cut is None
        )

    def determine_task_type(self) -> TaskType:
        """Return the first matching type, checked as COPY, CONCAT, ENCODE."""
        if self.can_copy():
            return TaskType.COPY
        if self.can_concat():
            return TaskType.CONCAT
        return TaskType.ENCODE

    def update_task_type(self):
        """Store the result of ``determine_task_type`` in ``task_type``."""
        self.task_type = self.determine_task_type()

    def need_processing(self) -> bool:
        """Return True if ``annexb`` is set or the task cannot be copied."""
        if self.annexb:
            return True
        return not self.can_copy()

    def get_output_extension(self) -> str:
        """Return ``.ts`` when ``annexb`` is set, otherwise ``.mp4``."""
        return ".ts" if self.annexb else ".mp4"