# Python source file (339 lines, 13 KiB).
import time
import uuid
from typing import Any
|
|
|
|
# Video encoder selection passed to ffmpeg for every re-encoding task.
ENCODER_ARGS = ("-c:v", "h264_nvenc")
# H.264 profile/level options passed alongside the encoder selection.
PROFILE_LEVEL_ARGS = ("-profile:v", "high", "-level:v", "4")
|
|
|
|
|
|
class FfmpegTask(object):
    """Describe one ffmpeg invocation and build its command-line arguments.

    A task holds one or more inputs (file paths or nested ``FfmpegTask``
    objects whose output file is consumed) plus optional LUTs, overlays,
    ASS subtitles, extra audio tracks and effects.  ``task_type`` selects
    which command :meth:`get_ffmpeg_args` builds: ``'copy'``, ``'concat'``
    or ``'encode'``.
    """

    effects: list[str]

    def __init__(self, input_file, task_type='copy', output_file=''):
        """Create a task.

        :param input_file: a single path, or a list of paths / nested tasks.
            Any other value results in an empty input list.  A list is
            stored as-is (not copied), so :meth:`add_inputs` also extends
            the caller's list.
        :param task_type: initial task type (``'copy'`` by default).
        :param output_file: output path; when empty a random name is
            generated on demand by :meth:`set_output_file`.
        """
        # annexb is only switched on for a single string input ending in ".ts".
        self.annexb = False
        if isinstance(input_file, str):
            if input_file.endswith(".ts"):
                self.annexb = True
            self.input_file = [input_file]
        elif isinstance(input_file, list):
            self.input_file = input_file
        else:
            self.input_file = []
        self.zoom_cut = None
        self.center_cut = None
        self.ext_data = {}
        self.task_type = task_type
        self.output_file = output_file
        self.mute = True
        self.speed = 1
        self.frame_rate = 25
        self.subtitles = []
        self.luts = []
        self.audios = []
        self.overlays = []
        self.effects = []

    def __repr__(self):
        """Return a compact description listing only the populated options."""
        _str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}'
        if len(self.luts) > 0:
            _str += f', luts={self.luts}'
        if len(self.audios) > 0:
            _str += f', audios={self.audios}'
        if len(self.overlays) > 0:
            _str += f', overlays={self.overlays}'
        if self.annexb:
            _str += f', annexb={self.annexb}'
        if self.effects:
            _str += f', effects={self.effects}'
        if self.mute:
            _str += f', mute={self.mute}'
        return _str + ')'

    def analyze_input_render_tasks(self):
        """Yield every directly nested ``FfmpegTask`` input that needs to run.

        String inputs and inputs of any other type are skipped.
        """
        for i in self.input_file:
            if isinstance(i, FfmpegTask) and i.need_run():
                yield i

    def need_run(self):
        """
        Determine whether this task needs to run.

        :rtype: bool
        :return: ``True`` when ``annexb`` is set or the task cannot be
            satisfied by a plain copy.
        """
        if self.annexb:
            return True
        # TODO: copy from url
        return not self.check_can_copy()

    def add_inputs(self, *inputs):
        """Append inputs (paths or nested tasks); the task type is left untouched."""
        self.input_file.extend(inputs)

    def add_overlay(self, *overlays):
        """Register overlays; names ending in ``.ass`` are stored as subtitles."""
        for overlay in overlays:
            if str(overlay).endswith('.ass'):
                self.subtitles.append(overlay)
            else:
                self.overlays.append(overlay)
        self.correct_task_type()

    def add_audios(self, *audios):
        """Register extra audio tracks, re-derive the task type and unmute."""
        self.audios.extend(audios)
        self.correct_task_type()
        self.check_audio_track()

    def add_lut(self, *luts):
        """Register LUT files and re-derive the task type."""
        self.luts.extend(luts)
        self.correct_task_type()

    def add_effect(self, *effects):
        """Register effect descriptors and re-derive the task type."""
        self.effects.extend(effects)
        self.correct_task_type()

    def get_output_file(self):
        """Return the path of the file this task produces.

        A ``'copy'`` task produces nothing new, so its first input is
        returned; when that input is itself a task, that task's output path
        is returned so callers always receive a path rather than a task
        object.  Otherwise the configured output path is returned, generated
        first if it is still empty.
        """
        if self.task_type == 'copy':
            first = self.input_file[0]
            if isinstance(first, FfmpegTask):
                return first.get_output_file()
            return first
        if self.output_file == '':
            self.set_output_file()
        return self.output_file

    def correct_task_type(self):
        """Set ``task_type`` to the cheapest mode the current options allow."""
        if self.check_can_copy():
            self.task_type = 'copy'
        elif self.check_can_concat():
            self.task_type = 'concat'
        else:
            self.task_type = 'encode'

    def _needs_video_processing(self):
        """Return True when any option that alters the picture is set."""
        return bool(
            self.luts
            or self.overlays
            or self.subtitles
            or self.effects
            or self.speed != 1
            or self.zoom_cut is not None
            or self.center_cut is not None
        )

    def check_can_concat(self):
        """Return True when no picture-altering option is set."""
        return not self._needs_video_processing()

    def check_can_copy(self):
        """Return True when nothing alters the picture and there is at most
        one audio track and at most one input."""
        if self._needs_video_processing():
            return False
        if len(self.audios) > 1:
            return False
        if len(self.input_file) > 1:
            return False
        return True

    def check_audio_track(self):
        """Unmute the task as soon as at least one audio track is registered."""
        if len(self.audios) > 0:
            self.mute = False

    @staticmethod
    def _input_path(source):
        """Return the path for a string or nested-task input, else ``None``."""
        if isinstance(source, str):
            return source
        if isinstance(source, FfmpegTask):
            return source.get_output_file()
        return None

    @staticmethod
    def _map_target(label):
        """Convert a label into a value usable after ``-map``.

        Labels produced by the filter graph (``[v]``, ``[a]``, ...) are
        returned unchanged.  A bracketed raw input stream such as ``[0:v]``
        is not an output of the filter graph, so it is returned without
        brackets (``0:v``), which is the plain stream-specifier form.
        """
        if label.startswith("[") and label.endswith("]"):
            inner = label[1:-1]
            index, sep, kind = inner.partition(":")
            if sep and index.isdigit() and kind in ("v", "a"):
                return inner
        return label

    @staticmethod
    def _concat_list_entry(path):
        """Format one ``file`` line of a concat list, escaping single quotes."""
        escaped = path.replace("'", "'\\''")
        return "file '" + escaped + "'\n"

    def _map_audio(self, input_args, filter_args, output_args):
        """Append the audio inputs, mixing filters and audio mapping.

        Mutates the three lists in place.  A muted task only gets ``-an``.
        With a single extra audio track the whole input is mapped by its
        index; with several, they are chained through ``amix`` into ``[a]``.
        """
        if self.mute:
            output_args.append("-an")
            return
        audio_label = "[0:a]"
        input_index = 0
        for position, audio in enumerate(self.audios):
            input_index = input_args.count("-i")
            input_args.append("-i")
            input_args.append(audio.replace("\\", "/"))
            if position > 0:
                filter_args.append(f"{audio_label}[{input_index}:a]amix[a]")
                audio_label = "[a]"
            else:
                audio_label = f"[{input_index}:a]"
        if len(self.audios) == 1:
            audio_label = f"{input_index}"
        output_args.append("-map")
        output_args.append(self._map_target(audio_label))

    def _assemble(self, base_args, input_args, filter_args, output_args):
        """Join the argument groups; ``-filter_complex`` is omitted when empty."""
        graph_args = ["-filter_complex", ";".join(filter_args)] if filter_args else []
        return base_args + input_args + graph_args + output_args + [self.get_output_file()]

    def _build_encode_args(self, base_args):
        """Build the arguments for a full re-encode with filters."""
        input_args = []
        filter_args = []
        output_args = [*PROFILE_LEVEL_ARGS, "-shortest", *ENCODER_ARGS]
        if self.annexb:
            output_args += ["-bsf:v", "h264_mp4toannexb", "-reset_timestamps", "1"]
        video_label = "[0:v]"
        for source in self.input_file:
            path = self._input_path(source)
            if path is None:
                # Unsupported entry: skip it rather than emit a dangling "-i".
                continue
            input_args.append("-i")
            input_args.append(path)
        if self.center_cut == 1:
            pos_json = self.ext_data.get('posJson', {})
            # Guard against a zero width so the ratio below cannot divide by zero.
            image_width = pos_json.get('imgWidth', 1) or 1
            left_x = pos_json.get('ltX', 0)
            crop_x = f'{float(left_x / image_width):.5f}*iw'
            filter_args.append(f"{video_label}crop=x={crop_x}:y=0:w=ih*ih/iw:h=ih[v_cut]")
            video_label = "[v_cut]"
        for effect in self.effects:
            if effect.startswith("cameraShot:"):
                # Descriptor form: "cameraShot:<start>,<duration>"; empty means "3,1".
                param = effect.split(":", 2)[1]
                if param == '':
                    param = "3,1"
                parts = param.split(",")
                start = int(parts[0])
                duration = int(parts[1]) if len(parts) >= 2 else 1
                start_label = "[eff_s]"
                end_label = "[eff_e]"
                filter_args.append(f"{video_label}fps=fps={self.frame_rate},split{start_label}{end_label}")
                filter_args.append(f"{start_label}trim=start={0}:end={start+duration},freezeframes=first={start*self.frame_rate}:replace={start*self.frame_rate}{start_label}")
                filter_args.append(f"{end_label}trim=start={start}{end_label}")
                video_label = "[v_eff]"
                filter_args.append(f"{start_label}{end_label}concat=n=2:v=1:a=0{video_label}")
            elif effect.startswith("zoom:"):
                # Placeholder: the zoom effect has no implementation yet.
                pass
        for lut_index, lut in enumerate(self.luts):
            # Each LUT gets its own output label so the filters chain correctly.
            lut_label = f"[v_lut{lut_index}]"
            filter_args.append(f"{video_label}lut3d=file={lut}{lut_label}")
            video_label = lut_label
        for overlay in self.overlays:
            input_index = input_args.count("-i")
            input_args.append("-i")
            input_args.append(overlay)
            filter_args.append(f"{video_label}[{input_index}:v]scale=rw:rh[v]")
            filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
            video_label = "[v]"
        for subtitle in self.subtitles:
            filter_args.append(f"{video_label}ass={subtitle}[v]")
            video_label = "[v]"
        output_args.append("-map")
        output_args.append(self._map_target(video_label))
        output_args.append("-r")
        output_args.append(f"{self.frame_rate}")
        self._map_audio(input_args, filter_args, output_args)
        return self._assemble(base_args, input_args, filter_args, output_args)

    def _build_concat_args(self, base_args):
        """Build the arguments for joining the inputs.

        When :meth:`check_annexb` holds and there is at most one audio track,
        a concat list file is written and the video is stream-copied.
        Otherwise the inputs are joined with the ``concat`` filter and
        re-encoded.
        """
        input_args = []
        output_args = ["-shortest"]
        if self.check_annexb() and len(self.audios) <= 1:
            if len(self.audios) > 0:
                input_args.append("-an")
            # Timestamp plus uuid keeps list files from colliding between tasks.
            list_file = "tmp_concat_" + str(time.time()) + "_" + uuid.uuid4().hex + ".txt"
            with open(list_file, "w", encoding="utf-8") as f:
                for source in self.input_file:
                    path = self._input_path(source)
                    if path is not None:
                        f.write(self._concat_list_entry(path))
            input_args += ("-f", "concat", "-safe", "0", "-i", list_file)
            output_args.append("-c:v")
            output_args.append("copy")
            if len(self.audios) > 0:
                input_args.append("-i")
                input_args.append(self.audios[0])
                output_args.append("-c:a")
                output_args.append("copy")
            output_args.append("-f")
            output_args.append("mp4")
            return base_args + input_args + output_args + [self.get_output_file()]
        # Inputs that cannot be merged via annexb are joined and re-encoded.
        output_args += ["-r", f"{self.frame_rate}", *PROFILE_LEVEL_ARGS, *ENCODER_ARGS]
        filter_args = []
        video_label = "[0:v]"
        video_count = 0
        for source in self.input_file:
            path = self._input_path(source)
            if path is None:
                # Unsupported entry: skip it rather than emit a dangling "-i".
                continue
            input_index = input_args.count("-i")
            input_args.append("-i")
            input_args.append(path.replace("\\", "/"))
            if video_count > 0:
                filter_args.append(f"{video_label}[{input_index}:v]concat=n=2:v=1:a=0[v]")
                video_label = "[v]"
            else:
                video_label = f"[{input_index}:v]"
            video_count += 1
        output_args.append("-map")
        output_args.append(self._map_target(video_label))
        self._map_audio(input_args, filter_args, output_args)
        return self._assemble(base_args, input_args, filter_args, output_args)

    def _build_copy_args(self, base_args):
        """Build the arguments for a stream copy of a single string input.

        Returns an empty list when the input already is the output file or
        when the input is not a single string path.
        """
        if len(self.input_file) == 1 and isinstance(self.input_file[0], str):
            source = self.input_file[0]
            if source == self.get_output_file():
                return []
            return base_args + ["-i", source] + ["-c", "copy", self.get_output_file()]
        return []

    def get_ffmpeg_args(self):
        """Return the ffmpeg argument list for this task.

        The list does not include the ffmpeg executable itself.  An empty
        list is returned when there is nothing to run or the task type is
        not recognised.  Note that the stream-copy concat route writes a
        ``tmp_concat_*.txt`` list file into the current working directory.

        :rtype: list
        """
        base_args = ['-y', '-hide_banner']
        if self.task_type == 'encode':
            return self._build_encode_args(base_args)
        if self.task_type == 'concat':
            return self._build_concat_args(base_args)
        if self.task_type == 'copy':
            return self._build_copy_args(base_args)
        return []

    def set_output_file(self, file=None):
        """Set the output path.

        :param file: ``None`` generates a random ``rand_<uuid>`` name (``.ts``
            when ``annexb`` is set, ``.mp4`` otherwise) unless a path is
            already configured; a string is used as the path; another task
            contributes its own output path.  Passing the task itself is a
            no-op.
        """
        if file is None:
            if self.output_file == '':
                extension = ".ts" if self.annexb else ".mp4"
                self.output_file = "rand_" + str(uuid.uuid4()) + extension
            return
        if isinstance(file, FfmpegTask):
            if file is self:
                return
            self.output_file = file.get_output_file()
        if isinstance(file, str):
            self.output_file = file

    def check_annexb(self):
        """Report whether the inputs qualify for the annexb (stream-copy) route.

        The first string input decides the result immediately: ``annexb``
        for ``'encode'`` and ``'copy'`` tasks, ``False`` for ``'concat'`` and
        unknown task types.  A nested task that does not qualify makes the
        result ``False``.  With no deciding input the result is ``True``.
        """
        for source in self.input_file:
            if isinstance(source, str):
                if self.task_type in ('encode', 'copy'):
                    return self.annexb
                return False
            elif isinstance(source, FfmpegTask):
                if not source.check_annexb():
                    return False
        return True