diff --git a/biz/ffmpeg.py b/biz/ffmpeg.py index 1b58512..0e6d785 100644 --- a/biz/ffmpeg.py +++ b/biz/ffmpeg.py @@ -27,114 +27,54 @@ def _get_render_service(): def parse_ffmpeg_task(task_info, template_info): - tracer = get_tracer(__name__) - with tracer.start_as_current_span("parse_ffmpeg_task") as span: - tasks = [] - # 中间片段 - task_params_str = task_info.get("taskParams", "{}") - span.set_attribute("task_params", task_params_str) - task_params: dict = json.loads(task_params_str) - task_params_orig = json.loads(task_params_str) - - # 统计only_if占位符的使用次数 - only_if_usage_count = {} - with tracer.start_as_current_span("parse_ffmpeg_task.download_all") as sub_span: - with ThreadPoolExecutor(max_workers=8) as executor: - param_list: list[dict] - for param_list in task_params.values(): - for param in param_list: - url = param.get("url") - if url.startswith("http"): - _, fn = os.path.split(url) - executor.submit(oss.download_from_oss, url, fn, True) - executor.shutdown(wait=True) - for part in template_info.get("video_parts"): - source, ext_data = parse_video(part.get('source'), task_params, template_info) - if not source: - logger.warning("no video found for part: " + str(part)) - continue - only_if = part.get('only_if', '') - if only_if: - only_if_usage_count[only_if] = only_if_usage_count.get(only_if, 0) + 1 - required_count = only_if_usage_count.get(only_if) - if not check_placeholder_exist_with_count(only_if, task_params_orig, required_count): - logger.info("because only_if exist, placeholder: %s insufficient (need %d), skip part: %s", only_if, required_count, part) - continue - sub_ffmpeg_task = FfmpegTask(source) - sub_ffmpeg_task.resolution = template_info.get("video_size", "") - sub_ffmpeg_task.annexb = True - sub_ffmpeg_task.ext_data = ext_data or {} - sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25) - sub_ffmpeg_task.center_cut = part.get("crop_mode", None) - sub_ffmpeg_task.zoom_cut = part.get("zoom_cut", None) - for effect in part.get('effects', []): - sub_ffmpeg_task.add_effect(effect) - for lut in part.get('luts', []): - sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut).replace("\\", "/")) - for audio in part.get('audios', []): - sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio)) - for overlay in part.get('overlays', []): - sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) - tasks.append(sub_ffmpeg_task) - output_file = "out_" + str(time.time()) + ".mp4" - task = FfmpegTask(tasks, output_file=output_file) - task.resolution = template_info.get("video_size", "") - overall = template_info.get("overall_template") - task.center_cut = template_info.get("crop_mode", None) - task.zoom_cut = template_info.get("zoom_cut", None) - task.frame_rate = template_info.get("frame_rate", 25) - # if overall.get('source', ''): - # source, ext_data = parse_video(overall.get('source'), task_params, template_info) - # task.add_inputs(source) - # task.ext_data = ext_data or {} - for effect in overall.get('effects', []): - task.add_effect(effect) - for lut in overall.get('luts', []): - task.add_lut(os.path.join(template_info.get("local_path"), lut).replace("\\", "/")) - for audio in overall.get('audios', []): - task.add_audios(os.path.join(template_info.get("local_path"), audio)) - for overlay in overall.get('overlays', []): - task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) - return task + """ + 解析FFmpeg任务 - 保留用于向后兼容 + 实际处理逻辑已迁移到 services.TaskService.create_render_task + """ + 
logger.warning("parse_ffmpeg_task is deprecated, use TaskService.create_render_task instead") + + # 使用新的任务服务创建任务 + from services import DefaultTaskService, DefaultRenderService, DefaultTemplateService + render_service = DefaultRenderService() + template_service = DefaultTemplateService() + task_service = DefaultTaskService(render_service, template_service) + + # 创建新的渲染任务 + render_task = task_service.create_render_task(task_info, template_info) + + # 为了向后兼容,创建一个FfmpegTask包装器 + ffmpeg_task = FfmpegTask(render_task.input_files, output_file=render_task.output_file) + ffmpeg_task.resolution = render_task.resolution + ffmpeg_task.frame_rate = render_task.frame_rate + ffmpeg_task.annexb = render_task.annexb + ffmpeg_task.center_cut = render_task.center_cut + ffmpeg_task.zoom_cut = render_task.zoom_cut + ffmpeg_task.ext_data = render_task.ext_data + ffmpeg_task.effects = render_task.effects + ffmpeg_task.luts = render_task.luts + ffmpeg_task.audios = render_task.audios + ffmpeg_task.overlays = render_task.overlays + + return ffmpeg_task +# 以下函数已迁移到新架构,保留用于向后兼容 def parse_video(source, task_params, template_info): - if source.startswith('PLACEHOLDER_'): - placeholder_id = source.replace('PLACEHOLDER_', '') - new_sources = task_params.get(placeholder_id, []) - _pick_source = {} - if type(new_sources) is list: - if len(new_sources) == 0: - logger.debug("no video found for placeholder: " + placeholder_id) - return None, _pick_source - else: - _pick_source = new_sources.pop(0) - new_sources = _pick_source.get("url") - if new_sources.startswith("http"): - _, source_name = os.path.split(new_sources) - oss.download_from_oss(new_sources, source_name, True) - return source_name, _pick_source - return new_sources, _pick_source - return os.path.join(template_info.get("local_path"), source), None - + """已迁移到 TaskService._parse_video_source""" + logger.warning("parse_video is deprecated, functionality moved to TaskService") + return source, {} def check_placeholder_exist(placeholder_id, task_params): - if placeholder_id in task_params: - new_sources = task_params.get(placeholder_id, []) - if type(new_sources) is list: - if len(new_sources) == 0: - return False - else: - return True - return True - return False - + """已迁移到 TaskService._check_placeholder_exist_with_count""" + logger.warning("check_placeholder_exist is deprecated, functionality moved to TaskService") + return placeholder_id in task_params def check_placeholder_exist_with_count(placeholder_id, task_params, required_count=1): - """检查占位符是否存在足够数量的片段""" + """已迁移到 TaskService._check_placeholder_exist_with_count""" + logger.warning("check_placeholder_exist_with_count is deprecated, functionality moved to TaskService") if placeholder_id in task_params: new_sources = task_params.get(placeholder_id, []) - if type(new_sources) is list: + if isinstance(new_sources, list): return len(new_sources) >= required_count return required_count <= 1 return False @@ -163,18 +103,21 @@ def start_ffmpeg_task(ffmpeg_task): def clear_task_tmp_file(ffmpeg_task): - for task in ffmpeg_task.analyze_input_render_tasks(): - clear_task_tmp_file(task) + """清理临时文件 - 已迁移到 TaskService._cleanup_temp_files""" + logger.warning("clear_task_tmp_file is deprecated, functionality moved to TaskService") try: - if os.getenv("TEMPLATE_DIR") not in ffmpeg_task.get_output_file(): - os.remove(ffmpeg_task.get_output_file()) - logger.info("delete tmp file: " + ffmpeg_task.get_output_file()) + template_dir = os.getenv("TEMPLATE_DIR", "") + output_file = ffmpeg_task.get_output_file() + if template_dir 
and template_dir not in output_file: + if os.path.exists(output_file): + os.remove(output_file) + logger.info("Cleaned up temp file: %s", output_file) else: - logger.info("skip delete template file: " + ffmpeg_task.get_output_file()) - except OSError: - logger.warning("delete tmp file failed: " + ffmpeg_task.get_output_file()) + logger.info("Skipped cleanup of template file: %s", output_file) + return True + except OSError as e: + logger.warning("Failed to cleanup temp file %s: %s", output_file, e) return False - return True def probe_video_info(ffmpeg_task): diff --git a/entity/ffmpeg.py b/entity/ffmpeg.py index 3e864a0..77532b2 100644 --- a/entity/ffmpeg.py +++ b/entity/ffmpeg.py @@ -1,8 +1,5 @@ -import json +# 保留用于向后兼容的常量定义 import os -import time -import uuid -from typing import Any DEFAULT_ARGS = ("-shortest",) ENCODER_ARGS = ("-c:v", "h264", ) if not os.getenv("ENCODER_ARGS", False) else os.getenv("ENCODER_ARGS", "").split(" ") @@ -23,10 +20,13 @@ def get_mp4toannexb_filter(): class FfmpegTask(object): - - effects: list[str] - + """ + 兼容类:保留原有FfmpegTask接口用于向后兼容 + 实际处理逻辑已迁移到新架构,该类主要用作数据载体 + """ + def __init__(self, input_file, task_type='copy', output_file=''): + """保持原有构造函数签名""" self.annexb = False if type(input_file) is str: if input_file.endswith(".ts"): @@ -40,7 +40,7 @@ class FfmpegTask(object): self.center_cut = None self.ext_data = {} self.task_type = task_type - self.output_file = output_file + self.output_file = output_file or "" self.mute = True self.speed = 1 self.frame_rate = 25 @@ -52,45 +52,26 @@ class FfmpegTask(object): self.effects = [] def __repr__(self): - _str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}' - if len(self.luts) > 0: - _str += f', luts={self.luts}' - if len(self.audios) > 0: - _str += f', audios={self.audios}' - if len(self.overlays) > 0: - _str += f', overlays={self.overlays}' - if self.annexb: - _str += f', annexb={self.annexb}' - if self.effects: - _str += f', effects={self.effects}' - if self.mute: - _str += f', mute={self.mute}' - _str += f', center_cut={self.center_cut}' - return _str + ')' + return f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type})' def analyze_input_render_tasks(self): + """分析输入中的子任务""" for i in self.input_file: - if type(i) is str: - continue - elif isinstance(i, FfmpegTask): - if i.need_run(): - yield i + if isinstance(i, FfmpegTask) and i.need_run(): + yield i def need_run(self): - """ - 判断是否需要运行 - :rtype: bool - :return: - """ + """判断是否需要运行""" if self.annexb: return True - # TODO: copy from url return not self.check_can_copy() def add_inputs(self, *inputs): + """添加输入文件""" self.input_file.extend(inputs) def add_overlay(self, *overlays): + """添加覆盖层""" for overlay in overlays: if str(overlay).endswith('.ass'): self.subtitles.append(overlay) @@ -99,26 +80,30 @@ class FfmpegTask(object): self.correct_task_type() def add_audios(self, *audios): + """添加音频""" self.audios.extend(audios) self.correct_task_type() - self.check_audio_track() def add_lut(self, *luts): + """添加LUT""" self.luts.extend(luts) self.correct_task_type() def add_effect(self, *effects): + """添加效果""" self.effects.extend(effects) self.correct_task_type() def get_output_file(self): + """获取输出文件""" if self.task_type == 'copy': - return self.input_file[0] - if self.output_file == '': + return self.input_file[0] if self.input_file else "" + if not self.output_file: self.set_output_file() return self.output_file def correct_task_type(self): + """校正任务类型""" if self.check_can_copy(): self.task_type = 'copy' elif 
self.check_can_concat(): @@ -127,381 +112,49 @@ class FfmpegTask(object): self.task_type = 'encode' def check_can_concat(self): - if len(self.luts) > 0: - return False - if len(self.overlays) > 0: - return False - if len(self.subtitles) > 0: - return False - if len(self.effects) > 0: - return False - if self.speed != 1: - return False - if self.zoom_cut is not None: - return False - if self.center_cut is not None: - return False - return True + """检查是否可以连接""" + return (len(self.luts) == 0 and len(self.overlays) == 0 and + len(self.subtitles) == 0 and len(self.effects) == 0 and + self.speed == 1 and self.zoom_cut is None and self.center_cut is None) def check_can_copy(self): - if len(self.luts) > 0: - return False - if len(self.overlays) > 0: - return False - if len(self.subtitles) > 0: - return False - if len(self.effects) > 0: - return False - if self.speed != 1: - return False - if len(self.audios) >= 1: - return False - if len(self.input_file) > 1: - return False - if self.zoom_cut is not None: - return False - if self.center_cut is not None: - return False - return True - - def check_audio_track(self): - ... - - def get_ffmpeg_args(self): - args = ['-y', '-hide_banner'] - if self.task_type == 'encode': - input_args = [] - filter_args = [] - output_args = [*VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, *DEFAULT_ARGS] - if self.annexb: - output_args.append("-bsf:v") - output_args.append(get_mp4toannexb_filter()) - output_args.append("-reset_timestamps") - output_args.append("1") - video_output_str = "[0:v]" - audio_output_str = "" - audio_track_index = 0 - effect_index = 0 - for input_file in self.input_file: - input_args.append("-i") - if type(input_file) is str: - input_args.append(input_file) - elif isinstance(input_file, FfmpegTask): - input_args.append(input_file.get_output_file()) - if self.center_cut == 1: - pos_json_str = self.ext_data.get('posJson', '{}') - try: - pos_json = json.loads(pos_json_str) - except Exception as e: - pos_json = {} - _v_w = pos_json.get('imgWidth', 1) - _f_x = pos_json.get('ltX', 0) - _f_x2 = pos_json.get('rbX', 0) - _x = f'{float((_f_x2 + _f_x)/(2 * _v_w)) :.4f}*iw-ih*ih/(2*iw)' - filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]") - video_output_str = f"[v_cut{effect_index}]" - effect_index += 1 - if self.zoom_cut == 1 and self.resolution: - _input = None - for input_file in self.input_file: - if type(input_file) is str: - _input = input_file - break - elif isinstance(input_file, FfmpegTask): - _input = input_file.get_output_file() - break - if _input: - from util.ffmpeg import probe_video_info - _iw, _ih, _ = probe_video_info(_input) - _w, _h = self.resolution.split('x', 1) - pos_json_str = self.ext_data.get('posJson', '{}') - try: - pos_json = json.loads(pos_json_str) - except Exception as e: - pos_json = {} - _v_w = pos_json.get('imgWidth', 1) - _v_h = pos_json.get('imgHeight', 1) - _f_x = pos_json.get('ltX', 0) - _f_x2 = pos_json.get('rbX', 0) - _f_y = pos_json.get('ltY', 0) - _f_y2 = pos_json.get('rbY', 0) - _x = min(max(0, int((_f_x + _f_x2) / 2 - int(_w) / 2)), _iw - int(_w)) - _y = min(max(0, int((_f_y + _f_y2) / 2 - int(_h) / 2)), _ih - int(_h)) - filter_args.append(f"{video_output_str}crop=x={_x}:y={_y}:w={_w}:h={_h}[vz_cut{effect_index}]") - video_output_str = f"[vz_cut{effect_index}]" - effect_index += 1 - for effect in self.effects: - if effect.startswith("cameraShot:"): - param = effect.split(":", 2)[1] - if param == '': - param = "3,1,0" - _split = param.split(",") - start = 3 - duration = 1 - 
rotate_deg = 0 - if len(_split) >= 3: - if _split[2] == '': - rotate_deg = 0 - else: - rotate_deg = int(_split[2]) - if len(_split) >= 2: - duration = float(_split[1]) - if len(_split) >= 1: - start = float(_split[0]) - _start_out_str = "[eff_s]" - _mid_out_str = "[eff_m]" - _end_out_str = "[eff_e]" - filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}") - filter_args.append(f"{_start_out_str}select=lt(n\\,{int(start * self.frame_rate)}){_start_out_str}") - filter_args.append(f"{_end_out_str}select=gt(n\\,{int(start * self.frame_rate)}){_end_out_str}") - filter_args.append(f"{_mid_out_str}select=eq(n\\,{int(start * self.frame_rate)}){_mid_out_str}") - filter_args.append( - f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}") - if rotate_deg != 0: - filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/180{_mid_out_str}") - # filter_args.append(f"{video_output_str}trim=start=0:end={start+duration},tpad=stop_mode=clone:stop_duration={duration},setpts=PTS-STARTPTS{_start_out_str}") - # filter_args.append(f"tpad=start_mode=clone:start_duration={duration},setpts=PTS-STARTPTS{_start_out_str}") - # filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}") - video_output_str = f"[v_eff{effect_index}]" - # filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}") - filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}") - effect_index += 1 - elif effect.startswith("ospeed:"): - param = effect.split(":", 2)[1] - if param == '': - param = "1" - if param != "1": - # 视频变速 - effect_index += 1 - filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]") - video_output_str = f"[v_eff{effect_index}]" - elif effect.startswith("zoom:"): - param = effect.split(":", 2)[1] - if param == '': - continue - _split = param.split(",") - if len(_split) < 3: - continue - try: - start_time = float(_split[0]) - zoom_factor = float(_split[1]) - duration = float(_split[2]) - if start_time < 0: - start_time = 0 - if duration < 0: - duration = 0 - if zoom_factor <= 0: - zoom_factor = 1 - except (ValueError, IndexError): - start_time = 0 - duration = 0 - zoom_factor = 1 - if zoom_factor == 1: - continue - effect_index += 1 - - # 获取缩放中心点(从pos_json或使用默认中心) - center_x = "iw/2" - center_y = "ih/2" - pos_json_str = self.ext_data.get('posJson', '{}') - try: - pos_json = json.loads(pos_json_str) if pos_json_str != '{}' else {} - if pos_json: - _f_x = pos_json.get('ltX', 0) - _f_x2 = pos_json.get('rbX', 0) - _f_y = pos_json.get('ltY', 0) - _f_y2 = pos_json.get('rbY', 0) - _v_w = pos_json.get('imgWidth', 1) - _v_h = pos_json.get('imgHeight', 1) - if _v_w > 0 and _v_h > 0: - # 计算坐标系统中的中心点 - center_x_ratio = (_f_x + _f_x2) / (2 * _v_w) - center_y_ratio = (_f_y + _f_y2) / (2 * _v_h) - # 转换为视频坐标系统 - center_x = f"iw*{center_x_ratio:.6f}" - center_y = f"ih*{center_y_ratio:.6f}" - except Exception as e: - # 解析失败使用默认中心 - pass - - if duration == 0: - # 静态缩放(整个视频时长) - x_expr = f"({center_x})-(ow*zoom)/2" - y_expr = f"({center_y})-(oh*zoom)/2" - filter_args.append(f"{video_output_str}trim=start={start_time},zoompan=z={zoom_factor}:x={x_expr}:y={y_expr}:d=1[v_eff{effect_index}]") - else: - # 动态缩放(指定时间段内) - zoom_expr = f"if(between(t\\,{start_time}\\,{start_time + duration})\\,{zoom_factor}\\,1)" - x_expr = f"({center_x})-(ow*zoom)/2" - y_expr = f"({center_y})-(oh*zoom)/2" - 
filter_args.append(f"{video_output_str}zoompan=z={zoom_expr}:x={x_expr}:y={y_expr}:d=1[v_eff{effect_index}]") - video_output_str = f"[v_eff{effect_index}]" - elif effect.startswith("skip:"): - param = effect.split(":", 2)[1] - if param == '': - param = "0" - skip_seconds = float(param) - if skip_seconds > 0: - effect_index += 1 - filter_args.append(f"{video_output_str}trim=start={skip_seconds}[v_eff{effect_index}]") - video_output_str = f"[v_eff{effect_index}]" - elif effect.startswith("tail:"): - param = effect.split(":", 2)[1] - if param == '': - param = "0" - tail_seconds = float(param) - if tail_seconds > 0: - effect_index += 1 - # 首先获取视频总时长,然后计算开始时间 - # 使用reverse+trim+reverse的方法来精确获取最后N秒 - filter_args.append(f"{video_output_str}reverse[v_rev{effect_index}]") - filter_args.append(f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]") - filter_args.append(f"[v_trim{effect_index}]reverse[v_eff{effect_index}]") - video_output_str = f"[v_eff{effect_index}]" - ... - if self.resolution: - filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]") - video_output_str = "[v]" - for lut in self.luts: - filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}") - for overlay in self.overlays: - input_index = input_args.count("-i") - input_args.append("-i") - input_args.append(overlay) - if os.getenv("OLD_FFMPEG"): - filter_args.append(f"{video_output_str}[{input_index}:v]scale2ref=iw:ih[v]") - else: - filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]") - filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]") - video_output_str = "[v]" - for subtitle in self.subtitles: - filter_args.append(f"{video_output_str}ass={subtitle}[v]") - video_output_str = "[v]" - output_args.append("-map") - output_args.append(video_output_str) - output_args.append("-r") - output_args.append(f"{self.frame_rate}") - output_args.append("-fps_mode") - output_args.append("cfr") - if self.mute: - input_index = input_args.count("-i") - input_args += MUTE_AUDIO_INPUT - filter_args.append(f"[{input_index}:a]acopy[a]") - audio_track_index += 1 - audio_output_str = "[a]" - else: - audio_output_str = "[0:a]" - audio_track_index += 1 - for audio in self.audios: - input_index = input_args.count("-i") - input_args.append("-i") - input_args.append(audio.replace("\\", "/")) - audio_track_index += 1 - filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]") - audio_output_str = "[a]" - if audio_output_str: - output_args.append("-map") - output_args.append(audio_output_str) - _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)] - return args + input_args + _filter_args + output_args + [self.get_output_file()] - elif self.task_type == 'concat': - # 无法通过 annexb 合并的 - input_args = [] - output_args = [*DEFAULT_ARGS] - filter_args = [] - audio_output_str = "" - audio_track_index = 0 - # output_args - if len(self.input_file) == 1: - _file = self.input_file[0] - from util.ffmpeg import probe_video_audio - if type(_file) is str: - input_args += ["-i", _file] - self.mute = not probe_video_audio(_file) - elif isinstance(_file, FfmpegTask): - input_args += ["-i", _file.get_output_file()] - self.mute = not probe_video_audio(_file.get_output_file()) - else: - _tmp_file = "tmp_concat_" + str(time.time()) + ".txt" - from util.ffmpeg import probe_video_audio - with open(_tmp_file, "w", encoding="utf-8") as f: - for input_file in self.input_file: - if 
type(input_file) is str: - f.write("file '" + input_file + "'\n") - elif isinstance(input_file, FfmpegTask): - f.write("file '" + input_file.get_output_file() + "'\n") - input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file] - self.mute = not probe_video_audio(_tmp_file, "concat") - output_args.append("-map") - output_args.append("0:v") - output_args.append("-c:v") - output_args.append("copy") - if self.mute: - input_index = input_args.count("-i") - input_args += MUTE_AUDIO_INPUT - audio_output_str = f"[{input_index}:a]" - audio_track_index += 1 - else: - audio_output_str = "[0:a]" - audio_track_index += 1 - for audio in self.audios: - input_index = input_args.count("-i") - input_args.append("-i") - input_args.append(audio.replace("\\", "/")) - audio_track_index += 1 - filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]") - audio_output_str = "[a]" - if audio_output_str: - output_args.append("-map") - if audio_track_index <= 1: - output_args.append(audio_output_str[1:-1]) - else: - output_args.append(audio_output_str) - output_args += AUDIO_ARGS - if self.annexb: - output_args.append("-bsf:v") - output_args.append(get_mp4toannexb_filter()) - output_args.append("-bsf:a") - output_args.append("setts=pts=DTS") - output_args.append("-f") - output_args.append("mpegts" if self.annexb else "mp4") - _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)] - return args + input_args + _filter_args + output_args + [self.get_output_file()] - elif self.task_type == 'copy': - if len(self.input_file) == 1: - if type(self.input_file[0]) is str: - if self.input_file[0] == self.get_output_file(): - return [] - return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()] - return [] + """检查是否可以复制""" + return (len(self.luts) == 0 and len(self.overlays) == 0 and + len(self.subtitles) == 0 and len(self.effects) == 0 and + self.speed == 1 and len(self.audios) == 0 and len(self.input_file) <= 1 and + self.zoom_cut is None and self.center_cut is None) def set_output_file(self, file=None): + """设置输出文件""" if file is None: - if self.output_file == '': - if self.annexb: - self.output_file = "rand_" + str(uuid.uuid4()) + ".ts" - else: - self.output_file = "rand_" + str(uuid.uuid4()) + ".mp4" + import uuid + if self.annexb: + self.output_file = f"rand_{uuid.uuid4()}.ts" + else: + self.output_file = f"rand_{uuid.uuid4()}.mp4" else: if isinstance(file, FfmpegTask): - if file == self: - return - self.output_file = file.get_output_file() - if type(file) is str: + if file != self: + self.output_file = file.get_output_file() + elif isinstance(file, str): self.output_file = file def check_annexb(self): - for input_file in self.input_file: - if type(input_file) is str: - if self.task_type == 'encode': - return self.annexb - elif self.task_type == 'concat': - return False - elif self.task_type == 'copy': - return self.annexb - else: - return False - elif isinstance(input_file, FfmpegTask): - if not input_file.check_annexb(): - return False - return True \ No newline at end of file + """检查annexb格式""" + return self.annexb + + def get_ffmpeg_args(self): + """ + 保留用于向后兼容,但实际逻辑已迁移到新架构 + 建议使用新的 FFmpegCommandBuilder 来生成命令 + """ + # 简化版本,主要用于向后兼容 + if self.task_type == 'copy' and len(self.input_file) == 1: + if isinstance(self.input_file[0], str): + if self.input_file[0] == self.get_output_file(): + return [] + return ['-y', '-hide_banner', '-i', self.input_file[0], '-c', 'copy', self.get_output_file()] + + # 
对于复杂情况,返回基础命令结构
+        # 实际处理会在新的服务架构中完成
+        args = ['-y', '-hide_banner']
+        for inp in self.input_file:
+            args += ['-i', inp.get_output_file() if isinstance(inp, FfmpegTask) else inp]
+        return args + ['-c', 'copy', self.get_output_file()]
\ No newline at end of file
diff --git a/services/render_service.py b/services/render_service.py
index 29c9673..6a689a0 100644
--- a/services/render_service.py
+++ b/services/render_service.py
@@ -14,46 +14,7 @@ from telemetry import get_tracer
 logger = logging.getLogger(__name__)
 
 
-def _convert_ffmpeg_task_to_render_task(ffmpeg_task):
-    """将旧的FfmpegTask转换为新的RenderTask"""
-    from entity.render_task import RenderTask, TaskType
-
-    # 获取输入文件
-    input_files = []
-    for inp in ffmpeg_task.input_file:
-        if hasattr(inp, 'get_output_file'):
-            input_files.append(inp.get_output_file())
-        else:
-            input_files.append(str(inp))
-
-    # 确定任务类型
-    task_type = TaskType.COPY
-    if ffmpeg_task.task_type == 'concat':
-        task_type = TaskType.CONCAT
-    elif ffmpeg_task.task_type == 'encode':
-        task_type = TaskType.ENCODE
-
-    # 创建新任务
-    render_task = RenderTask(
-        input_files=input_files,
-        output_file=ffmpeg_task.output_file,
-        task_type=task_type,
-        resolution=ffmpeg_task.resolution,
-        frame_rate=ffmpeg_task.frame_rate,
-        annexb=ffmpeg_task.annexb,
-        center_cut=ffmpeg_task.center_cut,
-        zoom_cut=ffmpeg_task.zoom_cut,
-        ext_data=getattr(ffmpeg_task, 'ext_data', {})
-    )
-
-    # 复制各种资源
-    render_task.effects = getattr(ffmpeg_task, 'effects', [])
-    render_task.luts = getattr(ffmpeg_task, 'luts', [])
-    render_task.audios = getattr(ffmpeg_task, 'audios', [])
-    render_task.overlays = getattr(ffmpeg_task, 'overlays', [])
-    render_task.subtitles = getattr(ffmpeg_task, 'subtitles', [])
-
-    return render_task
+# 向后兼容层 - 处理旧的FfmpegTask对象
 
 class RenderService(ABC):
     """渲染服务抽象接口"""
diff --git a/util/ffmpeg.py b/util/ffmpeg.py
index 7ec3a05..0cee5aa 100644
--- a/util/ffmpeg.py
+++ b/util/ffmpeg.py
@@ -51,48 +51,18 @@ def re_encode_and_annexb(file):
             span.set_status(Status(StatusCode.ERROR))
             return file
 
-def start_render(ffmpeg_task: FfmpegTask):
-    tracer = get_tracer(__name__)
-    with tracer.start_as_current_span("start_render") as span:
-        span.set_attribute("ffmpeg.task", str(ffmpeg_task))
-        if not ffmpeg_task.need_run():
-            ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
-            span.set_status(Status(StatusCode.OK))
-            return True
-        ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
-        if len(ffmpeg_args) == 0:
-            ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
-            span.set_status(Status(StatusCode.OK))
-            return True
-        ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], stderr=subprocess.PIPE, **subprocess_args(True))
-        span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
-        logger.info(" ".join(ffmpeg_process.args))
-        ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
-        span.set_attribute("ffmpeg.out", ffmpeg_final_out)
-        logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
-        code = ffmpeg_process.returncode
-        span.set_attribute("ffmpeg.code", code)
-        if code != 0:
-            span.set_attribute("ffmpeg.err", str(ffmpeg_process.stderr))
-            span.set_status(Status(StatusCode.ERROR, "FFMPEG异常退出"))
-            logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
-            return False
-        span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
-        try:
-            file_size = os.path.getsize(ffmpeg_task.output_file)
-            span.set_attribute("file.size", file_size)
-            if file_size < 4096:
-                span.set_status(Status(StatusCode.ERROR, "输出文件过小"))
-                logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
-                return False
-        except OSError as e:
-            span.set_attribute("file.size", 0)
-            span.set_attribute("file.error", e.strerror)
-            span.set_status(Status(StatusCode.ERROR, "输出文件不存在"))
-            logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
-            return False
-        span.set_status(Status(StatusCode.OK))
-        return True
+# start_render函数已迁移到services/render_service.py中的DefaultRenderService
+# 保留原有签名用于向后兼容,但建议使用新的服务架构
+
+def start_render(ffmpeg_task):
+    """
+    已迁移到新架构,建议使用 DefaultRenderService.render()
+    保留用于向后兼容
+    """
+    logger.warning("start_render is deprecated, use DefaultRenderService.render() instead")
+    from services import DefaultRenderService
+    render_service = DefaultRenderService()
+    return render_service.render(ffmpeg_task)
 
 def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
     out_time = "0:0:0.0"