You've already forked FrameTour-RenderWorker
mypy
This commit is contained in:
102
entity/ffmpeg.py
102
entity/ffmpeg.py
@@ -2,10 +2,40 @@
|
||||
import os
|
||||
|
||||
DEFAULT_ARGS = ("-shortest",)
|
||||
ENCODER_ARGS = ("-c:v", "h264", ) if not os.getenv("ENCODER_ARGS", False) else os.getenv("ENCODER_ARGS", "").split(" ")
|
||||
VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", ) if not os.getenv("VIDEO_ARGS", False) else os.getenv("VIDEO_ARGS", "").split(" ")
|
||||
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", )
|
||||
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", )
|
||||
ENCODER_ARGS = (
|
||||
(
|
||||
"-c:v",
|
||||
"h264",
|
||||
)
|
||||
if not os.getenv("ENCODER_ARGS", False)
|
||||
else os.getenv("ENCODER_ARGS", "").split(" ")
|
||||
)
|
||||
VIDEO_ARGS = (
|
||||
(
|
||||
"-profile:v",
|
||||
"high",
|
||||
"-level:v",
|
||||
"4",
|
||||
)
|
||||
if not os.getenv("VIDEO_ARGS", False)
|
||||
else os.getenv("VIDEO_ARGS", "").split(" ")
|
||||
)
|
||||
AUDIO_ARGS = (
|
||||
"-c:a",
|
||||
"aac",
|
||||
"-b:a",
|
||||
"128k",
|
||||
"-ar",
|
||||
"48000",
|
||||
"-ac",
|
||||
"2",
|
||||
)
|
||||
MUTE_AUDIO_INPUT = (
|
||||
"-f",
|
||||
"lavfi",
|
||||
"-i",
|
||||
"anullsrc=cl=stereo:r=48000",
|
||||
)
|
||||
|
||||
|
||||
def get_mp4toannexb_filter():
|
||||
@@ -24,8 +54,8 @@ class FfmpegTask(object):
|
||||
兼容类:保留原有FfmpegTask接口用于向后兼容
|
||||
实际处理逻辑已迁移到新架构,该类主要用作数据载体
|
||||
"""
|
||||
|
||||
def __init__(self, input_file, task_type='copy', output_file=''):
|
||||
|
||||
def __init__(self, input_file, task_type="copy", output_file=""):
|
||||
"""保持原有构造函数签名"""
|
||||
self.annexb = False
|
||||
if type(input_file) is str:
|
||||
@@ -52,7 +82,7 @@ class FfmpegTask(object):
|
||||
self.effects = []
|
||||
|
||||
def __repr__(self):
|
||||
return f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type})'
|
||||
return f"FfmpegTask(input_file={self.input_file}, task_type={self.task_type})"
|
||||
|
||||
def analyze_input_render_tasks(self):
|
||||
"""分析输入中的子任务"""
|
||||
@@ -73,7 +103,7 @@ class FfmpegTask(object):
|
||||
def add_overlay(self, *overlays):
|
||||
"""添加覆盖层"""
|
||||
for overlay in overlays:
|
||||
if str(overlay).endswith('.ass'):
|
||||
if str(overlay).endswith(".ass"):
|
||||
self.subtitles.append(overlay)
|
||||
else:
|
||||
self.overlays.append(overlay)
|
||||
@@ -96,7 +126,7 @@ class FfmpegTask(object):
|
||||
|
||||
def get_output_file(self):
|
||||
"""获取输出文件"""
|
||||
if self.task_type == 'copy':
|
||||
if self.task_type == "copy":
|
||||
return self.input_file[0] if self.input_file else ""
|
||||
if not self.output_file:
|
||||
self.set_output_file()
|
||||
@@ -105,29 +135,43 @@ class FfmpegTask(object):
|
||||
def correct_task_type(self):
|
||||
"""校正任务类型"""
|
||||
if self.check_can_copy():
|
||||
self.task_type = 'copy'
|
||||
self.task_type = "copy"
|
||||
elif self.check_can_concat():
|
||||
self.task_type = 'concat'
|
||||
self.task_type = "concat"
|
||||
else:
|
||||
self.task_type = 'encode'
|
||||
self.task_type = "encode"
|
||||
|
||||
def check_can_concat(self):
|
||||
"""检查是否可以连接"""
|
||||
return (len(self.luts) == 0 and len(self.overlays) == 0 and
|
||||
len(self.subtitles) == 0 and len(self.effects) == 0 and
|
||||
self.speed == 1 and self.zoom_cut is None and self.center_cut is None)
|
||||
return (
|
||||
len(self.luts) == 0
|
||||
and len(self.overlays) == 0
|
||||
and len(self.subtitles) == 0
|
||||
and len(self.effects) == 0
|
||||
and self.speed == 1
|
||||
and self.zoom_cut is None
|
||||
and self.center_cut is None
|
||||
)
|
||||
|
||||
def check_can_copy(self):
|
||||
"""检查是否可以复制"""
|
||||
return (len(self.luts) == 0 and len(self.overlays) == 0 and
|
||||
len(self.subtitles) == 0 and len(self.effects) == 0 and
|
||||
self.speed == 1 and len(self.audios) == 0 and len(self.input_file) <= 1 and
|
||||
self.zoom_cut is None and self.center_cut is None)
|
||||
return (
|
||||
len(self.luts) == 0
|
||||
and len(self.overlays) == 0
|
||||
and len(self.subtitles) == 0
|
||||
and len(self.effects) == 0
|
||||
and self.speed == 1
|
||||
and len(self.audios) == 0
|
||||
and len(self.input_file) <= 1
|
||||
and self.zoom_cut is None
|
||||
and self.center_cut is None
|
||||
)
|
||||
|
||||
def set_output_file(self, file=None):
|
||||
"""设置输出文件"""
|
||||
if file is None:
|
||||
import uuid
|
||||
|
||||
if self.annexb:
|
||||
self.output_file = f"rand_{uuid.uuid4()}.ts"
|
||||
else:
|
||||
@@ -149,12 +193,24 @@ class FfmpegTask(object):
|
||||
建议使用新的 FFmpegCommandBuilder 来生成命令
|
||||
"""
|
||||
# 简化版本,主要用于向后兼容
|
||||
if self.task_type == 'copy' and len(self.input_file) == 1:
|
||||
if self.task_type == "copy" and len(self.input_file) == 1:
|
||||
if isinstance(self.input_file[0], str):
|
||||
if self.input_file[0] == self.get_output_file():
|
||||
return []
|
||||
return ['-y', '-hide_banner', '-i', self.input_file[0], '-c', 'copy', self.get_output_file()]
|
||||
|
||||
return [
|
||||
"-y",
|
||||
"-hide_banner",
|
||||
"-i",
|
||||
self.input_file[0],
|
||||
"-c",
|
||||
"copy",
|
||||
self.get_output_file(),
|
||||
]
|
||||
|
||||
# 对于复杂情况,返回基础命令结构
|
||||
# 实际处理会在新的服务架构中完成
|
||||
return ['-y', '-hide_banner', '-i'] + self.input_file + ['-c', 'copy', self.get_output_file()]
|
||||
return (
|
||||
["-y", "-hide_banner", "-i"]
|
||||
+ self.input_file
|
||||
+ ["-c", "copy", self.get_output_file()]
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user