Files
FrameTour-RenderWorker/entity/ffmpeg.py
2025-09-24 10:17:11 +08:00

217 lines
6.1 KiB
Python

# Constant definitions kept for backward compatibility
import os
def _env_args(name, default):
    """
    Read a whitespace-separated ffmpeg argument list from an environment variable.

    :param name: name of the environment variable to read.
    :param default: value returned when the variable is unset, empty, or
        contains only whitespace.
    :return: a list of argument tokens when the variable supplies at least
        one token, otherwise ``default`` unchanged.
    """
    # str.split() without a separator collapses runs of whitespace, so values
    # such as "-c:v  libx264 " never produce empty-string arguments.
    tokens = os.getenv(name, "").split()
    return tokens if tokens else default


# Default extra arguments.
DEFAULT_ARGS = ("-shortest",)

# Video encoder selection; overridable through the ENCODER_ARGS environment
# variable (a list when overridden, a tuple when the default is used).
ENCODER_ARGS = _env_args(
    "ENCODER_ARGS",
    (
        "-c:v",
        "h264",
    ),
)

# Video profile/level options; overridable through the VIDEO_ARGS environment
# variable (a list when overridden, a tuple when the default is used).
VIDEO_ARGS = _env_args(
    "VIDEO_ARGS",
    (
        "-profile:v",
        "high",
        "-level:v",
        "4",
    ),
)

# Audio encoding options: AAC codec, 128k bitrate, 48000 Hz, 2 channels.
AUDIO_ARGS = (
    "-c:a",
    "aac",
    "-b:a",
    "128k",
    "-ar",
    "48000",
    "-ac",
    "2",
)

# Extra input arguments selecting the lavfi "anullsrc" source
# (stereo layout, 48000 Hz).
MUTE_AUDIO_INPUT = (
    "-f",
    "lavfi",
    "-i",
    "anullsrc=cl=stereo:r=48000",
)
def get_mp4toannexb_filter():
    """
    Pick the mp4toannexb bitstream filter matching the configured encoder.

    The ENCODER_ARGS environment variable is inspected case-insensitively at
    call time.

    :return: ``"hevc_mp4toannexb"`` when ENCODER_ARGS mentions an HEVC
        encoder (any name containing ``hevc``, or ``x265`` as in
        ``libx265``), otherwise ``"h264_mp4toannexb"``.
    """
    encoder_args = os.getenv("ENCODER_ARGS", "").lower()
    # "libx265" produces HEVC too but does not contain the substring "hevc".
    hevc_markers = ("hevc", "x265")
    if any(marker in encoder_args for marker in hevc_markers):
        return "hevc_mp4toannexb"
    return "h264_mp4toannexb"
class FfmpegTask(object):
    """
    Compatibility class that keeps the original FfmpegTask interface.

    Per the original notes, the actual processing logic has moved to a newer
    architecture and this class is mainly used as a data carrier.
    """

    def __init__(self, input_file, task_type="copy", output_file=""):
        """
        Create a task; the constructor signature is kept for compatibility.

        :param input_file: a single path string, or a list/tuple of inputs
            (path strings and/or nested FfmpegTask objects). Any other value
            results in an empty input list.
        :param task_type: initial task type, e.g. "copy", "concat" or
            "encode".
        :param output_file: explicit output path; a falsy value means it is
            generated lazily by get_output_file().
        """
        # annexb is only enabled for a single string input ending in ".ts".
        self.annexb = False
        if isinstance(input_file, str):
            self.annexb = input_file.endswith(".ts")
            self.input_file = [input_file]
        elif isinstance(input_file, (list, tuple)):
            # Copy so add_inputs() never mutates the caller's sequence.
            self.input_file = list(input_file)
        else:
            self.input_file = []
        # zoom_cut / center_cut / ext_data are stored here but not
        # interpreted by this class beyond the None checks below.
        self.zoom_cut = None
        self.center_cut = None
        self.ext_data = {}
        self.task_type = task_type
        self.output_file = output_file or ""
        self.mute = True
        self.speed = 1
        self.frame_rate = 25
        self.resolution = None
        self.subtitles = []
        self.luts = []
        self.audios = []
        self.overlays = []
        self.effects = []

    def __repr__(self):
        return f"FfmpegTask(input_file={self.input_file}, task_type={self.task_type})"

    @staticmethod
    def _resolve_path(item):
        """Follow nested FfmpegTask objects down to the file they produce."""
        while isinstance(item, FfmpegTask):
            item = item.get_output_file()
        return item

    def analyze_input_render_tasks(self):
        """Yield every nested FfmpegTask input that still needs to run."""
        for item in self.input_file:
            if isinstance(item, FfmpegTask) and item.need_run():
                yield item

    def need_run(self):
        """Return True when the task is annexb or cannot be a plain copy."""
        return self.annexb or not self.check_can_copy()

    def add_inputs(self, *inputs):
        """Append inputs; the task type is not re-evaluated here."""
        self.input_file.extend(inputs)

    def add_overlay(self, *overlays):
        """
        Add overlays and re-evaluate the task type.

        Items whose string form ends with ".ass" are stored as subtitles,
        everything else as overlays.
        """
        for overlay in overlays:
            if str(overlay).endswith(".ass"):
                self.subtitles.append(overlay)
            else:
                self.overlays.append(overlay)
        self.correct_task_type()

    def add_audios(self, *audios):
        """Add audio tracks and re-evaluate the task type."""
        self.audios.extend(audios)
        self.correct_task_type()

    def add_lut(self, *luts):
        """Add LUTs and re-evaluate the task type."""
        self.luts.extend(luts)
        self.correct_task_type()

    def add_effect(self, *effects):
        """Add effects and re-evaluate the task type."""
        self.effects.extend(effects)
        self.correct_task_type()

    def get_output_file(self):
        """
        Return the output of this task.

        A "copy" task returns its first input unchanged ("" when there are
        no inputs). Otherwise the configured output file is returned, and a
        random name is generated first if none has been set.
        """
        if self.task_type == "copy":
            return self.input_file[0] if self.input_file else ""
        if not self.output_file:
            self.set_output_file()
        return self.output_file

    def correct_task_type(self):
        """Set task_type to "copy", "concat" or "encode" from current state."""
        if self.check_can_copy():
            self.task_type = "copy"
        elif self.check_can_concat():
            self.task_type = "concat"
        else:
            self.task_type = "encode"

    def check_can_concat(self):
        """Return True when no LUT, overlay, subtitle, effect, speed change or cut is set."""
        return (
            not self.luts
            and not self.overlays
            and not self.subtitles
            and not self.effects
            and self.speed == 1
            and self.zoom_cut is None
            and self.center_cut is None
        )

    def check_can_copy(self):
        """Return True when the task can concat, has no audios and at most one input."""
        return (
            self.check_can_concat()
            and not self.audios
            and len(self.input_file) <= 1
        )

    def set_output_file(self, file=None):
        """
        Set the output file.

        :param file: ``None`` generates a random "rand_<uuid>" name (".ts"
            for annexb tasks, ".mp4" otherwise); another FfmpegTask makes
            this task adopt that task's output; a string is used verbatim.
            Any other value is ignored.
        """
        if file is None:
            import uuid

            extension = "ts" if self.annexb else "mp4"
            self.output_file = f"rand_{uuid.uuid4()}.{extension}"
        elif isinstance(file, FfmpegTask):
            # Adopting our own output would be circular, so skip it.
            if file is not self:
                self.output_file = file.get_output_file()
        elif isinstance(file, str):
            self.output_file = file

    def check_annexb(self):
        """Return the annexb flag."""
        return self.annexb

    def get_ffmpeg_args(self):
        """
        Build a simplified stream-copy argument list (backward compatibility).

        The original notes recommend the newer FFmpegCommandBuilder for
        generating commands; this method only produces a basic structure.

        :return: ``[]`` when a single-input "copy" task would write onto its
            own input; otherwise ``-y -hide_banner``, one ``-i <path>`` pair
            per input (nested tasks resolved to their output file), then
            ``-c copy <output>``.
        """
        output = self._resolve_path(self.get_output_file())
        inputs = [self._resolve_path(item) for item in self.input_file]
        # With this class's get_output_file(), a "copy" task resolves to its
        # own first input, so there is nothing to run.
        if self.task_type == "copy" and len(inputs) == 1 and inputs[0] == output:
            return []
        args = ["-y", "-hide_banner"]
        for path in inputs:
            # Every input needs its own -i; a bare path would be parsed by
            # ffmpeg as an output file.
            args.extend(("-i", path))
        args.extend(("-c", "copy", output))
        return args