refactor(biz): 重构 FFmpeg 任务处理逻辑

-将主要处理逻辑迁移到新的 TaskService 架构中
-保持 FfmpegTask 类的接口
This commit is contained in:
2025-09-12 14:59:04 +08:00
parent d770d84927
commit d496c7400d
4 changed files with 122 additions and 595 deletions

View File

@@ -27,114 +27,54 @@ def _get_render_service():
def parse_ffmpeg_task(task_info, template_info):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("parse_ffmpeg_task") as span:
tasks = []
# 中间片段
task_params_str = task_info.get("taskParams", "{}")
span.set_attribute("task_params", task_params_str)
task_params: dict = json.loads(task_params_str)
task_params_orig = json.loads(task_params_str)
# 统计only_if占位符的使用次数
only_if_usage_count = {}
with tracer.start_as_current_span("parse_ffmpeg_task.download_all") as sub_span:
with ThreadPoolExecutor(max_workers=8) as executor:
param_list: list[dict]
for param_list in task_params.values():
for param in param_list:
url = param.get("url")
if url.startswith("http"):
_, fn = os.path.split(url)
executor.submit(oss.download_from_oss, url, fn, True)
executor.shutdown(wait=True)
for part in template_info.get("video_parts"):
source, ext_data = parse_video(part.get('source'), task_params, template_info)
if not source:
logger.warning("no video found for part: " + str(part))
continue
only_if = part.get('only_if', '')
if only_if:
only_if_usage_count[only_if] = only_if_usage_count.get(only_if, 0) + 1
required_count = only_if_usage_count.get(only_if)
if not check_placeholder_exist_with_count(only_if, task_params_orig, required_count):
logger.info("because only_if exist, placeholder: %s insufficient (need %d), skip part: %s", only_if, required_count, part)
continue
sub_ffmpeg_task = FfmpegTask(source)
sub_ffmpeg_task.resolution = template_info.get("video_size", "")
sub_ffmpeg_task.annexb = True
sub_ffmpeg_task.ext_data = ext_data or {}
sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
sub_ffmpeg_task.center_cut = part.get("crop_mode", None)
sub_ffmpeg_task.zoom_cut = part.get("zoom_cut", None)
for effect in part.get('effects', []):
sub_ffmpeg_task.add_effect(effect)
for lut in part.get('luts', []):
sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut).replace("\\", "/"))
for audio in part.get('audios', []):
sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio))
for overlay in part.get('overlays', []):
sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
tasks.append(sub_ffmpeg_task)
output_file = "out_" + str(time.time()) + ".mp4"
task = FfmpegTask(tasks, output_file=output_file)
task.resolution = template_info.get("video_size", "")
overall = template_info.get("overall_template")
task.center_cut = template_info.get("crop_mode", None)
task.zoom_cut = template_info.get("zoom_cut", None)
task.frame_rate = template_info.get("frame_rate", 25)
# if overall.get('source', ''):
# source, ext_data = parse_video(overall.get('source'), task_params, template_info)
# task.add_inputs(source)
# task.ext_data = ext_data or {}
for effect in overall.get('effects', []):
task.add_effect(effect)
for lut in overall.get('luts', []):
task.add_lut(os.path.join(template_info.get("local_path"), lut).replace("\\", "/"))
for audio in overall.get('audios', []):
task.add_audios(os.path.join(template_info.get("local_path"), audio))
for overlay in overall.get('overlays', []):
task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
return task
"""
解析FFmpeg任务 - 保留用于向后兼容
实际处理逻辑已迁移到 services.TaskService.create_render_task
"""
logger.warning("parse_ffmpeg_task is deprecated, use TaskService.create_render_task instead")
# 使用新的任务服务创建任务
from services import DefaultTaskService, DefaultRenderService, DefaultTemplateService
render_service = DefaultRenderService()
template_service = DefaultTemplateService()
task_service = DefaultTaskService(render_service, template_service)
# 创建新的渲染任务
render_task = task_service.create_render_task(task_info, template_info)
# 为了向后兼容,创建一个FfmpegTask包装器
ffmpeg_task = FfmpegTask(render_task.input_files, output_file=render_task.output_file)
ffmpeg_task.resolution = render_task.resolution
ffmpeg_task.frame_rate = render_task.frame_rate
ffmpeg_task.annexb = render_task.annexb
ffmpeg_task.center_cut = render_task.center_cut
ffmpeg_task.zoom_cut = render_task.zoom_cut
ffmpeg_task.ext_data = render_task.ext_data
ffmpeg_task.effects = render_task.effects
ffmpeg_task.luts = render_task.luts
ffmpeg_task.audios = render_task.audios
ffmpeg_task.overlays = render_task.overlays
return ffmpeg_task
# 以下函数已迁移到新架构,保留用于向后兼容
def parse_video(source, task_params, template_info):
if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, [])
_pick_source = {}
if type(new_sources) is list:
if len(new_sources) == 0:
logger.debug("no video found for placeholder: " + placeholder_id)
return None, _pick_source
else:
_pick_source = new_sources.pop(0)
new_sources = _pick_source.get("url")
if new_sources.startswith("http"):
_, source_name = os.path.split(new_sources)
oss.download_from_oss(new_sources, source_name, True)
return source_name, _pick_source
return new_sources, _pick_source
return os.path.join(template_info.get("local_path"), source), None
"""已迁移到 TaskService._parse_video_source"""
logger.warning("parse_video is deprecated, functionality moved to TaskService")
return source, {}
def check_placeholder_exist(placeholder_id, task_params):
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
return False
else:
return True
return True
return False
"""已迁移到 TaskService._check_placeholder_exist_with_count"""
logger.warning("check_placeholder_exist is deprecated, functionality moved to TaskService")
return placeholder_id in task_params
def check_placeholder_exist_with_count(placeholder_id, task_params, required_count=1):
"""检查占位符是否存在足够数量的片段"""
"""已迁移到 TaskService._check_placeholder_exist_with_count"""
logger.warning("check_placeholder_exist_with_count is deprecated, functionality moved to TaskService")
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if isinstance(new_sources, list):
return len(new_sources) >= required_count
return required_count <= 1
return False
@@ -163,18 +103,21 @@ def start_ffmpeg_task(ffmpeg_task):
def clear_task_tmp_file(ffmpeg_task):
for task in ffmpeg_task.analyze_input_render_tasks():
clear_task_tmp_file(task)
"""清理临时文件 - 已迁移到 TaskService._cleanup_temp_files"""
logger.warning("clear_task_tmp_file is deprecated, functionality moved to TaskService")
try:
if os.getenv("TEMPLATE_DIR") not in ffmpeg_task.get_output_file():
os.remove(ffmpeg_task.get_output_file())
logger.info("delete tmp file: " + ffmpeg_task.get_output_file())
template_dir = os.getenv("TEMPLATE_DIR", "")
output_file = ffmpeg_task.get_output_file()
if template_dir and template_dir not in output_file:
if os.path.exists(output_file):
os.remove(output_file)
logger.info("Cleaned up temp file: %s", output_file)
else:
logger.info("skip delete template file: " + ffmpeg_task.get_output_file())
except OSError:
logger.warning("delete tmp file failed: " + ffmpeg_task.get_output_file())
logger.info("Skipped cleanup of template file: %s", output_file)
return True
except OSError as e:
logger.warning("Failed to cleanup temp file %s: %s", output_file, e)
return False
return True
def probe_video_info(ffmpeg_task):

View File

@@ -1,8 +1,5 @@
import json
# 保留用于向后兼容的常量定义
import os
import time
import uuid
from typing import Any
DEFAULT_ARGS = ("-shortest",)
ENCODER_ARGS = ("-c:v", "h264", ) if not os.getenv("ENCODER_ARGS", False) else os.getenv("ENCODER_ARGS", "").split(" ")
@@ -23,10 +20,13 @@ def get_mp4toannexb_filter():
class FfmpegTask(object):
effects: list[str]
"""
兼容类:保留原有FfmpegTask接口用于向后兼容
实际处理逻辑已迁移到新架构,该类主要用作数据载体
"""
def __init__(self, input_file, task_type='copy', output_file=''):
"""保持原有构造函数签名"""
self.annexb = False
if type(input_file) is str:
if input_file.endswith(".ts"):
@@ -40,7 +40,7 @@ class FfmpegTask(object):
self.center_cut = None
self.ext_data = {}
self.task_type = task_type
self.output_file = output_file
self.output_file = output_file or ""
self.mute = True
self.speed = 1
self.frame_rate = 25
@@ -52,45 +52,26 @@ class FfmpegTask(object):
self.effects = []
def __repr__(self):
_str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}'
if len(self.luts) > 0:
_str += f', luts={self.luts}'
if len(self.audios) > 0:
_str += f', audios={self.audios}'
if len(self.overlays) > 0:
_str += f', overlays={self.overlays}'
if self.annexb:
_str += f', annexb={self.annexb}'
if self.effects:
_str += f', effects={self.effects}'
if self.mute:
_str += f', mute={self.mute}'
_str += f', center_cut={self.center_cut}'
return _str + ')'
return f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type})'
def analyze_input_render_tasks(self):
"""分析输入中的子任务"""
for i in self.input_file:
if type(i) is str:
continue
elif isinstance(i, FfmpegTask):
if i.need_run():
yield i
if isinstance(i, FfmpegTask) and i.need_run():
yield i
def need_run(self):
"""
判断是否需要运行
:rtype: bool
:return:
"""
"""判断是否需要运行"""
if self.annexb:
return True
# TODO: copy from url
return not self.check_can_copy()
def add_inputs(self, *inputs):
"""添加输入文件"""
self.input_file.extend(inputs)
def add_overlay(self, *overlays):
"""添加覆盖层"""
for overlay in overlays:
if str(overlay).endswith('.ass'):
self.subtitles.append(overlay)
@@ -99,26 +80,30 @@ class FfmpegTask(object):
self.correct_task_type()
def add_audios(self, *audios):
"""添加音频"""
self.audios.extend(audios)
self.correct_task_type()
self.check_audio_track()
def add_lut(self, *luts):
"""添加LUT"""
self.luts.extend(luts)
self.correct_task_type()
def add_effect(self, *effects):
"""添加效果"""
self.effects.extend(effects)
self.correct_task_type()
def get_output_file(self):
"""获取输出文件"""
if self.task_type == 'copy':
return self.input_file[0]
if self.output_file == '':
return self.input_file[0] if self.input_file else ""
if not self.output_file:
self.set_output_file()
return self.output_file
def correct_task_type(self):
"""校正任务类型"""
if self.check_can_copy():
self.task_type = 'copy'
elif self.check_can_concat():
@@ -127,381 +112,49 @@ class FfmpegTask(object):
self.task_type = 'encode'
def check_can_concat(self):
if len(self.luts) > 0:
return False
if len(self.overlays) > 0:
return False
if len(self.subtitles) > 0:
return False
if len(self.effects) > 0:
return False
if self.speed != 1:
return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True
"""检查是否可以连接"""
return (len(self.luts) == 0 and len(self.overlays) == 0 and
len(self.subtitles) == 0 and len(self.effects) == 0 and
self.speed == 1 and self.zoom_cut is None and self.center_cut is None)
def check_can_copy(self):
if len(self.luts) > 0:
return False
if len(self.overlays) > 0:
return False
if len(self.subtitles) > 0:
return False
if len(self.effects) > 0:
return False
if self.speed != 1:
return False
if len(self.audios) >= 1:
return False
if len(self.input_file) > 1:
return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True
def check_audio_track(self):
...
def get_ffmpeg_args(self):
args = ['-y', '-hide_banner']
if self.task_type == 'encode':
input_args = []
filter_args = []
output_args = [*VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, *DEFAULT_ARGS]
if self.annexb:
output_args.append("-bsf:v")
output_args.append(get_mp4toannexb_filter())
output_args.append("-reset_timestamps")
output_args.append("1")
video_output_str = "[0:v]"
audio_output_str = ""
audio_track_index = 0
effect_index = 0
for input_file in self.input_file:
input_args.append("-i")
if type(input_file) is str:
input_args.append(input_file)
elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file())
if self.center_cut == 1:
pos_json_str = self.ext_data.get('posJson', '{}')
try:
pos_json = json.loads(pos_json_str)
except Exception as e:
pos_json = {}
_v_w = pos_json.get('imgWidth', 1)
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_x = f'{float((_f_x2 + _f_x)/(2 * _v_w)) :.4f}*iw-ih*ih/(2*iw)'
filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
video_output_str = f"[v_cut{effect_index}]"
effect_index += 1
if self.zoom_cut == 1 and self.resolution:
_input = None
for input_file in self.input_file:
if type(input_file) is str:
_input = input_file
break
elif isinstance(input_file, FfmpegTask):
_input = input_file.get_output_file()
break
if _input:
from util.ffmpeg import probe_video_info
_iw, _ih, _ = probe_video_info(_input)
_w, _h = self.resolution.split('x', 1)
pos_json_str = self.ext_data.get('posJson', '{}')
try:
pos_json = json.loads(pos_json_str)
except Exception as e:
pos_json = {}
_v_w = pos_json.get('imgWidth', 1)
_v_h = pos_json.get('imgHeight', 1)
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_f_y = pos_json.get('ltY', 0)
_f_y2 = pos_json.get('rbY', 0)
_x = min(max(0, int((_f_x + _f_x2) / 2 - int(_w) / 2)), _iw - int(_w))
_y = min(max(0, int((_f_y + _f_y2) / 2 - int(_h) / 2)), _ih - int(_h))
filter_args.append(f"{video_output_str}crop=x={_x}:y={_y}:w={_w}:h={_h}[vz_cut{effect_index}]")
video_output_str = f"[vz_cut{effect_index}]"
effect_index += 1
for effect in self.effects:
if effect.startswith("cameraShot:"):
param = effect.split(":", 2)[1]
if param == '':
param = "3,1,0"
_split = param.split(",")
start = 3
duration = 1
rotate_deg = 0
if len(_split) >= 3:
if _split[2] == '':
rotate_deg = 0
else:
rotate_deg = int(_split[2])
if len(_split) >= 2:
duration = float(_split[1])
if len(_split) >= 1:
start = float(_split[0])
_start_out_str = "[eff_s]"
_mid_out_str = "[eff_m]"
_end_out_str = "[eff_e]"
filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}")
filter_args.append(f"{_start_out_str}select=lt(n\\,{int(start * self.frame_rate)}){_start_out_str}")
filter_args.append(f"{_end_out_str}select=gt(n\\,{int(start * self.frame_rate)}){_end_out_str}")
filter_args.append(f"{_mid_out_str}select=eq(n\\,{int(start * self.frame_rate)}){_mid_out_str}")
filter_args.append(
f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}")
if rotate_deg != 0:
filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/180{_mid_out_str}")
# filter_args.append(f"{video_output_str}trim=start=0:end={start+duration},tpad=stop_mode=clone:stop_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"tpad=start_mode=clone:start_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}")
video_output_str = f"[v_eff{effect_index}]"
# filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
effect_index += 1
elif effect.startswith("ospeed:"):
param = effect.split(":", 2)[1]
if param == '':
param = "1"
if param != "1":
# 视频变速
effect_index += 1
filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("zoom:"):
param = effect.split(":", 2)[1]
if param == '':
continue
_split = param.split(",")
if len(_split) < 3:
continue
try:
start_time = float(_split[0])
zoom_factor = float(_split[1])
duration = float(_split[2])
if start_time < 0:
start_time = 0
if duration < 0:
duration = 0
if zoom_factor <= 0:
zoom_factor = 1
except (ValueError, IndexError):
start_time = 0
duration = 0
zoom_factor = 1
if zoom_factor == 1:
continue
effect_index += 1
# 获取缩放中心点(从pos_json或使用默认中心)
center_x = "iw/2"
center_y = "ih/2"
pos_json_str = self.ext_data.get('posJson', '{}')
try:
pos_json = json.loads(pos_json_str) if pos_json_str != '{}' else {}
if pos_json:
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_f_y = pos_json.get('ltY', 0)
_f_y2 = pos_json.get('rbY', 0)
_v_w = pos_json.get('imgWidth', 1)
_v_h = pos_json.get('imgHeight', 1)
if _v_w > 0 and _v_h > 0:
# 计算坐标系统中的中心点
center_x_ratio = (_f_x + _f_x2) / (2 * _v_w)
center_y_ratio = (_f_y + _f_y2) / (2 * _v_h)
# 转换为视频坐标系统
center_x = f"iw*{center_x_ratio:.6f}"
center_y = f"ih*{center_y_ratio:.6f}"
except Exception as e:
# 解析失败使用默认中心
pass
if duration == 0:
# 静态缩放(整个视频时长)
x_expr = f"({center_x})-(ow*zoom)/2"
y_expr = f"({center_y})-(oh*zoom)/2"
filter_args.append(f"{video_output_str}trim=start={start_time},zoompan=z={zoom_factor}:x={x_expr}:y={y_expr}:d=1[v_eff{effect_index}]")
else:
# 动态缩放(指定时间段内)
zoom_expr = f"if(between(t\\,{start_time}\\,{start_time + duration})\\,{zoom_factor}\\,1)"
x_expr = f"({center_x})-(ow*zoom)/2"
y_expr = f"({center_y})-(oh*zoom)/2"
filter_args.append(f"{video_output_str}zoompan=z={zoom_expr}:x={x_expr}:y={y_expr}:d=1[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("skip:"):
param = effect.split(":", 2)[1]
if param == '':
param = "0"
skip_seconds = float(param)
if skip_seconds > 0:
effect_index += 1
filter_args.append(f"{video_output_str}trim=start={skip_seconds}[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("tail:"):
param = effect.split(":", 2)[1]
if param == '':
param = "0"
tail_seconds = float(param)
if tail_seconds > 0:
effect_index += 1
# 首先获取视频总时长,然后计算开始时间
# 使用reverse+trim+reverse的方法来精确获取最后N秒
filter_args.append(f"{video_output_str}reverse[v_rev{effect_index}]")
filter_args.append(f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]")
filter_args.append(f"[v_trim{effect_index}]reverse[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
...
if self.resolution:
filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]")
video_output_str = "[v]"
for lut in self.luts:
filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}")
for overlay in self.overlays:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(overlay)
if os.getenv("OLD_FFMPEG"):
filter_args.append(f"{video_output_str}[{input_index}:v]scale2ref=iw:ih[v]")
else:
filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]")
filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
video_output_str = "[v]"
for subtitle in self.subtitles:
filter_args.append(f"{video_output_str}ass={subtitle}[v]")
video_output_str = "[v]"
output_args.append("-map")
output_args.append(video_output_str)
output_args.append("-r")
output_args.append(f"{self.frame_rate}")
output_args.append("-fps_mode")
output_args.append("cfr")
if self.mute:
input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
filter_args.append(f"[{input_index}:a]acopy[a]")
audio_track_index += 1
audio_output_str = "[a]"
else:
audio_output_str = "[0:a]"
audio_track_index += 1
for audio in self.audios:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(audio.replace("\\", "/"))
audio_track_index += 1
filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
audio_output_str = "[a]"
if audio_output_str:
output_args.append("-map")
output_args.append(audio_output_str)
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'concat':
# 无法通过 annexb 合并的
input_args = []
output_args = [*DEFAULT_ARGS]
filter_args = []
audio_output_str = ""
audio_track_index = 0
# output_args
if len(self.input_file) == 1:
_file = self.input_file[0]
from util.ffmpeg import probe_video_audio
if type(_file) is str:
input_args += ["-i", _file]
self.mute = not probe_video_audio(_file)
elif isinstance(_file, FfmpegTask):
input_args += ["-i", _file.get_output_file()]
self.mute = not probe_video_audio(_file.get_output_file())
else:
_tmp_file = "tmp_concat_" + str(time.time()) + ".txt"
from util.ffmpeg import probe_video_audio
with open(_tmp_file, "w", encoding="utf-8") as f:
for input_file in self.input_file:
if type(input_file) is str:
f.write("file '" + input_file + "'\n")
elif isinstance(input_file, FfmpegTask):
f.write("file '" + input_file.get_output_file() + "'\n")
input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file]
self.mute = not probe_video_audio(_tmp_file, "concat")
output_args.append("-map")
output_args.append("0:v")
output_args.append("-c:v")
output_args.append("copy")
if self.mute:
input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
audio_output_str = f"[{input_index}:a]"
audio_track_index += 1
else:
audio_output_str = "[0:a]"
audio_track_index += 1
for audio in self.audios:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(audio.replace("\\", "/"))
audio_track_index += 1
filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
audio_output_str = "[a]"
if audio_output_str:
output_args.append("-map")
if audio_track_index <= 1:
output_args.append(audio_output_str[1:-1])
else:
output_args.append(audio_output_str)
output_args += AUDIO_ARGS
if self.annexb:
output_args.append("-bsf:v")
output_args.append(get_mp4toannexb_filter())
output_args.append("-bsf:a")
output_args.append("setts=pts=DTS")
output_args.append("-f")
output_args.append("mpegts" if self.annexb else "mp4")
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'copy':
if len(self.input_file) == 1:
if type(self.input_file[0]) is str:
if self.input_file[0] == self.get_output_file():
return []
return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()]
return []
"""检查是否可以复制"""
return (len(self.luts) == 0 and len(self.overlays) == 0 and
len(self.subtitles) == 0 and len(self.effects) == 0 and
self.speed == 1 and len(self.audios) == 0 and len(self.input_file) <= 1 and
self.zoom_cut is None and self.center_cut is None)
def set_output_file(self, file=None):
"""设置输出文件"""
if file is None:
if self.output_file == '':
if self.annexb:
self.output_file = "rand_" + str(uuid.uuid4()) + ".ts"
else:
self.output_file = "rand_" + str(uuid.uuid4()) + ".mp4"
import uuid
if self.annexb:
self.output_file = f"rand_{uuid.uuid4()}.ts"
else:
self.output_file = f"rand_{uuid.uuid4()}.mp4"
else:
if isinstance(file, FfmpegTask):
if file == self:
return
self.output_file = file.get_output_file()
if type(file) is str:
if file != self:
self.output_file = file.get_output_file()
elif isinstance(file, str):
self.output_file = file
def check_annexb(self):
for input_file in self.input_file:
if type(input_file) is str:
if self.task_type == 'encode':
return self.annexb
elif self.task_type == 'concat':
return False
elif self.task_type == 'copy':
return self.annexb
else:
return False
elif isinstance(input_file, FfmpegTask):
if not input_file.check_annexb():
return False
return True
"""检查annexb格式"""
return self.annexb
def get_ffmpeg_args(self):
"""
保留用于向后兼容,但实际逻辑已迁移到新架构
建议使用新的 FFmpegCommandBuilder 来生成命令
"""
# 简化版本,主要用于向后兼容
if self.task_type == 'copy' and len(self.input_file) == 1:
if isinstance(self.input_file[0], str):
if self.input_file[0] == self.get_output_file():
return []
return ['-y', '-hide_banner', '-i', self.input_file[0], '-c', 'copy', self.get_output_file()]
# 对于复杂情况,返回基础命令结构
# 实际处理会在新的服务架构中完成
return ['-y', '-hide_banner', '-i'] + self.input_file + ['-c', 'copy', self.get_output_file()]

View File

@@ -14,46 +14,7 @@ from telemetry import get_tracer
logger = logging.getLogger(__name__)
def _convert_ffmpeg_task_to_render_task(ffmpeg_task):
"""将旧的FfmpegTask转换为新的RenderTask"""
from entity.render_task import RenderTask, TaskType
# 获取输入文件
input_files = []
for inp in ffmpeg_task.input_file:
if hasattr(inp, 'get_output_file'):
input_files.append(inp.get_output_file())
else:
input_files.append(str(inp))
# 确定任务类型
task_type = TaskType.COPY
if ffmpeg_task.task_type == 'concat':
task_type = TaskType.CONCAT
elif ffmpeg_task.task_type == 'encode':
task_type = TaskType.ENCODE
# 创建新任务
render_task = RenderTask(
input_files=input_files,
output_file=ffmpeg_task.output_file,
task_type=task_type,
resolution=ffmpeg_task.resolution,
frame_rate=ffmpeg_task.frame_rate,
annexb=ffmpeg_task.annexb,
center_cut=ffmpeg_task.center_cut,
zoom_cut=ffmpeg_task.zoom_cut,
ext_data=getattr(ffmpeg_task, 'ext_data', {})
)
# 复制各种资源
render_task.effects = getattr(ffmpeg_task, 'effects', [])
render_task.luts = getattr(ffmpeg_task, 'luts', [])
render_task.audios = getattr(ffmpeg_task, 'audios', [])
render_task.overlays = getattr(ffmpeg_task, 'overlays', [])
render_task.subtitles = getattr(ffmpeg_task, 'subtitles', [])
return render_task
# 向后兼容层 - 处理旧的FfmpegTask对象
class RenderService(ABC):
"""渲染服务抽象接口"""

View File

@@ -51,48 +51,18 @@ def re_encode_and_annexb(file):
span.set_status(Status(StatusCode.ERROR))
return file
def start_render(ffmpeg_task: FfmpegTask):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_render") as span:
span.set_attribute("ffmpeg.task", str(ffmpeg_task))
if not ffmpeg_task.need_run():
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
if len(ffmpeg_args) == 0:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], stderr=subprocess.PIPE, **subprocess_args(True))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info(" ".join(ffmpeg_process.args))
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
span.set_attribute("ffmpeg.out", ffmpeg_final_out)
logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
code = ffmpeg_process.returncode
span.set_attribute("ffmpeg.code", code)
if code != 0:
span.set_attribute("ffmpeg.err", str(ffmpeg_process.stderr))
span.set_status(Status(StatusCode.ERROR, "FFMPEG异常退出"))
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
return False
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
try:
file_size = os.path.getsize(ffmpeg_task.output_file)
span.set_attribute("file.size", file_size)
if file_size < 4096:
span.set_status(Status(StatusCode.ERROR, "输出文件过小"))
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
return False
except OSError as e:
span.set_attribute("file.size", 0)
span.set_attribute("file.error", e.strerror)
span.set_status(Status(StatusCode.ERROR, "输出文件不存在"))
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
return False
span.set_status(Status(StatusCode.OK))
return True
# start_render函数已迁移到services/render_service.py中的DefaultRenderService
# 保留原有签名用于向后兼容,但建议使用新的服务架构
def start_render(ffmpeg_task):
"""
已迁移到新架构,建议使用 DefaultRenderService.render()
保留用于向后兼容
"""
logger.warning("start_render is deprecated, use DefaultRenderService.render() instead")
from services import DefaultRenderService
render_service = DefaultRenderService()
return render_service.render(ffmpeg_task)
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
out_time = "0:0:0.0"