You've already forked FrameTour-RenderWorker
160 lines
5.6 KiB
Python
160 lines
5.6 KiB
Python
# Constant definitions kept for backward compatibility
|
|
import os
|
|
|
|
def _env_args(name, default):
    """Return ffmpeg arguments taken from environment variable *name*.

    The value is split on runs of whitespace, so repeated, leading or
    trailing spaces never produce empty-string arguments. When the variable
    is unset, empty or whitespace-only, *default* is returned unchanged.

    An override is returned as a list and the default as passed in (a tuple
    for the constants below), which matches the types these constants had
    before this helper existed.
    """
    tokens = os.getenv(name, "").split()
    return tokens if tokens else default


# Flags kept for callers of the legacy interface; "-shortest" is ffmpeg's
# option to stop when the shortest stream ends.
DEFAULT_ARGS = ("-shortest",)

# Video encoder selection; override with the ENCODER_ARGS environment variable
# (whitespace-separated ffmpeg arguments).
ENCODER_ARGS = _env_args("ENCODER_ARGS", ("-c:v", "h264"))

# Video profile/level flags; override with the VIDEO_ARGS environment variable
# (whitespace-separated ffmpeg arguments).
VIDEO_ARGS = _env_args("VIDEO_ARGS", ("-profile:v", "high", "-level:v", "4"))

# Audio output flags: AAC codec, 128k bitrate, 48000 Hz sample rate, 2 channels.
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2")

# Extra ffmpeg input using the lavfi "anullsrc" source (stereo, 48000 Hz).
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000")
|
|
|
|
|
|
def get_mp4toannexb_filter():
    """Pick the mp4toannexb bitstream filter matching the configured encoder.

    The ENCODER_ARGS environment variable is read at call time and compared
    case-insensitively. If it mentions an HEVC encoder -- any name containing
    "hevc" (e.g. hevc_nvenc), "x265" (libx265) or "h265" -- the HEVC filter
    is chosen; in every other case, including an unset variable, the H.264
    filter is chosen.

    Returns:
        "hevc_mp4toannexb" or "h264_mp4toannexb".
    """
    encoder_args = os.getenv("ENCODER_ARGS", "").lower()
    # "hevc" alone misses libx265, which also produces an HEVC stream.
    hevc_markers = ("hevc", "x265", "h265")
    if any(marker in encoder_args for marker in hevc_markers):
        return "hevc_mp4toannexb"
    return "h264_mp4toannexb"
|
|
|
|
|
|
class FfmpegTask(object):
    """Compatibility class that keeps the original FfmpegTask interface.

    The real processing logic has moved to the new architecture; this class
    mainly serves as a data carrier. Entries of ``input_file`` are either
    path strings or nested ``FfmpegTask`` instances.
    """

    def __init__(self, input_file, task_type='copy', output_file=''):
        """Create a task; the original constructor signature is preserved.

        Args:
            input_file: A single path string, or a list/tuple of inputs
                (paths and/or nested FfmpegTask objects). A sequence is
                copied, so later ``add_inputs`` calls never mutate the
                caller's own list. Any other value results in no inputs.
            task_type: Initial task type ('copy', 'concat' or 'encode').
            output_file: Output path; a falsy value is stored as "".
        """
        # Only a single string path ending in ".ts" sets the annexb flag.
        self.annexb = False
        if isinstance(input_file, str):
            if input_file.endswith(".ts"):
                self.annexb = True
            self.input_file = [input_file]
        elif isinstance(input_file, (list, tuple)):
            self.input_file = list(input_file)
        else:
            self.input_file = []
        self.zoom_cut = None
        self.center_cut = None
        self.ext_data = {}
        self.task_type = task_type
        self.output_file = output_file or ""
        self.mute = True
        self.speed = 1
        self.frame_rate = 25
        self.resolution = None
        self.subtitles = []
        self.luts = []
        self.audios = []
        self.overlays = []
        self.effects = []

    def __repr__(self):
        return f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type})'

    def analyze_input_render_tasks(self):
        """Yield every nested FfmpegTask input that still needs to run."""
        for item in self.input_file:
            if isinstance(item, FfmpegTask) and item.need_run():
                yield item

    def need_run(self):
        """Return True when annexb is set or the task cannot be a plain copy."""
        if self.annexb:
            return True
        return not self.check_can_copy()

    def add_inputs(self, *inputs):
        """Append the given inputs to ``input_file``."""
        self.input_file.extend(inputs)

    def add_overlay(self, *overlays):
        """Add overlays; items whose string form ends in '.ass' go to subtitles.

        The task type is re-derived afterwards.
        """
        for overlay in overlays:
            if str(overlay).endswith('.ass'):
                self.subtitles.append(overlay)
            else:
                self.overlays.append(overlay)
        self.correct_task_type()

    def add_audios(self, *audios):
        """Add audio entries and re-derive the task type."""
        self.audios.extend(audios)
        self.correct_task_type()

    def add_lut(self, *luts):
        """Add LUT entries and re-derive the task type."""
        self.luts.extend(luts)
        self.correct_task_type()

    def add_effect(self, *effects):
        """Add effect entries and re-derive the task type."""
        self.effects.extend(effects)
        self.correct_task_type()

    def get_output_file(self):
        """Return the output of this task.

        For a 'copy' task this is the first input unchanged ("" when there
        are no inputs). Otherwise it is ``output_file``, which is generated
        via ``set_output_file()`` first when still empty.
        """
        if self.task_type == 'copy':
            return self.input_file[0] if self.input_file else ""
        if not self.output_file:
            self.set_output_file()
        return self.output_file

    def correct_task_type(self):
        """Set task_type to 'copy', 'concat' or 'encode' from current state."""
        if self.check_can_copy():
            self.task_type = 'copy'
        elif self.check_can_concat():
            self.task_type = 'concat'
        else:
            self.task_type = 'encode'

    def check_can_concat(self):
        """Return True when no LUT, overlay, subtitle, effect, speed change,
        zoom cut or center cut is configured."""
        return (len(self.luts) == 0 and len(self.overlays) == 0 and
                len(self.subtitles) == 0 and len(self.effects) == 0 and
                self.speed == 1 and self.zoom_cut is None and
                self.center_cut is None)

    def check_can_copy(self):
        """Return True when the concat conditions hold and, in addition,
        there are no extra audios and at most one input."""
        return (self.check_can_concat() and len(self.audios) == 0 and
                len(self.input_file) <= 1)

    def set_output_file(self, file=None):
        """Set the output file.

        Args:
            file: None generates a random "rand_<uuid4>" name with a ".ts"
                suffix when annexb is set and ".mp4" otherwise. Another
                FfmpegTask (not this one) supplies its own output file.
                A string is stored as is. Anything else is ignored.
        """
        if file is None:
            import uuid
            suffix = "ts" if self.annexb else "mp4"
            self.output_file = f"rand_{uuid.uuid4()}.{suffix}"
        elif isinstance(file, FfmpegTask):
            # Taking our own output as the source would be meaningless.
            if file is not self:
                self.output_file = file.get_output_file()
        elif isinstance(file, str):
            self.output_file = file

    def check_annexb(self):
        """Return the annexb flag."""
        return self.annexb

    @staticmethod
    def _resolve_path(item):
        """Follow nested FfmpegTask objects down to their output file.

        Non-task values are returned unchanged. A task that resolves back to
        an already visited task stops the walk instead of looping forever.
        """
        visited = set()
        while isinstance(item, FfmpegTask) and id(item) not in visited:
            visited.add(id(item))
            item = item.get_output_file()
        return item

    def get_ffmpeg_args(self):
        """Build a basic stream-copy ffmpeg argument list.

        Kept for backward compatibility; the real logic has moved to the new
        architecture and the new FFmpegCommandBuilder is the recommended way
        to generate commands.

        Returns:
            [] when a single-input 'copy' task would write onto its own
            input (nothing to do). Otherwise
            ['-y', '-hide_banner', '-i', <in>, ..., '-c', 'copy', <out>]
            with one '-i' per input; nested task inputs are replaced by
            their output file.
        """
        sources = [self._resolve_path(item) for item in self.input_file]
        target = self._resolve_path(self.get_output_file())
        if self.task_type == 'copy' and len(sources) == 1 and sources[0] == target:
            return []
        args = ['-y', '-hide_banner']
        # Every input needs its own "-i"; a bare path after the first input
        # would be taken by ffmpeg as an output file and overwritten by "-y".
        for source in sources:
            args.extend(('-i', source))
        args.extend(('-c', 'copy', target))
        return args