# Constant definitions retained for backward compatibility.
import os
import uuid


def _args_from_env(name, default):
    """Return ffmpeg arguments read from environment variable *name*.

    The value is split on any run of whitespace, so repeated, leading or
    trailing spaces never produce empty-string arguments. When the variable
    is unset, empty or whitespace-only, *default* is returned unchanged.

    Returns:
        A list of tokens when the variable supplies any, otherwise *default*.
    """
    tokens = os.getenv(name, "").split()
    return tokens if tokens else default


DEFAULT_ARGS = ("-shortest",)

# Video encoder selection; overridable through the ENCODER_ARGS env variable.
ENCODER_ARGS = _args_from_env("ENCODER_ARGS", ("-c:v", "h264"))

# Video profile/level options; overridable through the VIDEO_ARGS env variable.
VIDEO_ARGS = _args_from_env(
    "VIDEO_ARGS",
    (
        "-profile:v",
        "high",
        "-level:v",
        "4",
    ),
)

AUDIO_ARGS = (
    "-c:a",
    "aac",
    "-b:a",
    "128k",
    "-ar",
    "48000",
    "-ac",
    "2",
)

MUTE_AUDIO_INPUT = (
    "-f",
    "lavfi",
    "-i",
    "anullsrc=cl=stereo:r=48000",
)

# Substrings of ENCODER_ARGS that identify an HEVC/H.265 encoder
# (e.g. "hevc_nvenc", "libx265").
_HEVC_MARKERS = ("hevc", "h265", "x265")


def get_mp4toannexb_filter():
    """Pick the mp4toannexb bitstream filter matching the configured encoder.

    The ENCODER_ARGS environment variable is read at call time and compared
    case-insensitively.

    Returns:
        'hevc_mp4toannexb' if ENCODER_ARGS mentions an HEVC encoder
        ('hevc', 'h265' or 'x265'), otherwise 'h264_mp4toannexb'.
    """
    encoder_args_str = os.getenv("ENCODER_ARGS", "").lower()
    if any(marker in encoder_args_str for marker in _HEVC_MARKERS):
        return "hevc_mp4toannexb"
    return "h264_mp4toannexb"


class FfmpegTask:
    """Compatibility class that keeps the original FfmpegTask interface.

    The actual processing logic has moved to the new architecture; this class
    is mainly used as a data carrier.
    """

    def __init__(self, input_file, task_type="copy", output_file=""):
        """Create a task; the original constructor signature is preserved.

        Args:
            input_file: A single path (str) or a list of inputs. List items
                may be paths or nested FfmpegTask objects. Any other type
                results in an empty input list.
            task_type: Initial task type ("copy", "concat" or "encode").
            output_file: Output path; an empty value means "not decided yet".
        """
        self.annexb = False
        if isinstance(input_file, str):
            # A single .ts input marks the task as Annex B.
            self.annexb = input_file.endswith(".ts")
            self.input_file = [input_file]
        elif isinstance(input_file, list):
            # Copy so that add_inputs() never mutates the caller's list.
            self.input_file = list(input_file)
        else:
            self.input_file = []
        self.zoom_cut = None
        self.center_cut = None
        self.ext_data = {}
        self.task_type = task_type
        self.output_file = output_file or ""
        self.mute = True
        self.speed = 1
        self.frame_rate = 25
        self.resolution = None
        self.subtitles = []
        self.luts = []
        self.audios = []
        self.overlays = []
        self.effects = []

    def __repr__(self):
        return f"FfmpegTask(input_file={self.input_file}, task_type={self.task_type})"

    @staticmethod
    def _input_path(source):
        """Return the file path for an input: a nested task maps to its output."""
        if isinstance(source, FfmpegTask):
            return source.get_output_file()
        return source

    def analyze_input_render_tasks(self):
        """Yield every input that is a nested FfmpegTask which needs to run."""
        for item in self.input_file:
            if isinstance(item, FfmpegTask) and item.need_run():
                yield item

    def need_run(self):
        """Return True when the task is Annex B or cannot be a plain copy."""
        if self.annexb:
            return True
        return not self.check_can_copy()

    def add_inputs(self, *inputs):
        """Append input files.

        Unlike the other add_* methods, this does not re-derive task_type.
        """
        self.input_file.extend(inputs)

    def add_overlay(self, *overlays):
        """Add overlays; items whose name ends with ".ass" become subtitles."""
        for overlay in overlays:
            if str(overlay).endswith(".ass"):
                self.subtitles.append(overlay)
            else:
                self.overlays.append(overlay)
        self.correct_task_type()

    def add_audios(self, *audios):
        """Add audio tracks and re-derive the task type."""
        self.audios.extend(audios)
        self.correct_task_type()

    def add_lut(self, *luts):
        """Add LUTs and re-derive the task type."""
        self.luts.extend(luts)
        self.correct_task_type()

    def add_effect(self, *effects):
        """Add effects and re-derive the task type."""
        self.effects.extend(effects)
        self.correct_task_type()

    def get_output_file(self):
        """Return the output file path.

        In "copy" mode the output is the first input itself (a nested task is
        resolved to its own output file), or "" when there are no inputs.
        Otherwise a random output name is generated on first use if none
        has been set.
        """
        if self.task_type == "copy":
            if not self.input_file:
                return ""
            return self._input_path(self.input_file[0])
        if not self.output_file:
            self.set_output_file()
        return self.output_file

    def correct_task_type(self):
        """Set task_type to "copy", "concat" or "encode" from current state."""
        if self.check_can_copy():
            self.task_type = "copy"
        elif self.check_can_concat():
            self.task_type = "concat"
        else:
            self.task_type = "encode"

    def check_can_concat(self):
        """Return True when nothing requires re-encoding the video."""
        return (
            len(self.luts) == 0
            and len(self.overlays) == 0
            and len(self.subtitles) == 0
            and len(self.effects) == 0
            and self.speed == 1
            and self.zoom_cut is None
            and self.center_cut is None
        )

    def check_can_copy(self):
        """Return True when the single input can be used as-is.

        This is the concat condition plus: no extra audio tracks and at most
        one input.
        """
        return (
            self.check_can_concat()
            and len(self.audios) == 0
            and len(self.input_file) <= 1
        )

    def set_output_file(self, file=None):
        """Set the output file.

        Args:
            file: None generates a random name (".ts" for Annex B tasks,
                ".mp4" otherwise); another FfmpegTask reuses that task's
                output file; a str is used verbatim. Passing this task
                itself, or any other type, leaves the output unchanged.
        """
        if file is None:
            extension = "ts" if self.annexb else "mp4"
            self.output_file = f"rand_{uuid.uuid4()}.{extension}"
        elif isinstance(file, FfmpegTask):
            if file is not self:
                self.output_file = file.get_output_file()
        elif isinstance(file, str):
            self.output_file = file

    def check_annexb(self):
        """Return the Annex B flag."""
        return self.annexb

    def get_ffmpeg_args(self):
        """Build a basic stream-copy ffmpeg argument list.

        Kept for backward compatibility; the real logic has moved to the new
        architecture and the new FFmpegCommandBuilder is the recommended way
        to generate commands.

        Returns:
            [] when there is nothing to run (no inputs, or a single-input
            "copy" task whose output is the input itself); otherwise
            ["-y", "-hide_banner", "-i", <in>, ..., "-c", "copy", <out>]
            with one "-i" per input.
        """
        sources = [self._input_path(item) for item in self.input_file]
        if not sources:
            return []
        target = self.get_output_file()
        if self.task_type == "copy" and len(sources) == 1 and sources[0] == target:
            return []
        args = ["-y", "-hide_banner"]
        for source in sources:
            # Each input needs its own "-i"; a bare path would be parsed by
            # ffmpeg as an output file.
            args += ["-i", source]
        args += ["-c", "copy", target]
        return args