音轨叠加

This commit is contained in:
2025-03-06 10:34:28 +08:00
parent 94373cee72
commit 56bdad7ad1
4 changed files with 102 additions and 109 deletions

View File

@ -3,8 +3,11 @@ import time
import uuid
from typing import Any
ENCODER_ARGS = ("-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1",)
PROFILE_LEVEL_ARGS = ("-profile:v", "high", "-level:v", "4")
DEFAULT_ARGS = ("-shortest",)
ENCODER_ARGS = ("-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1", )
VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", )
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", )
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", )
class FfmpegTask(object):
@ -149,23 +152,22 @@ class FfmpegTask(object):
return True
def check_audio_track(self):
if len(self.audios) > 0:
self.mute = False
...
def get_ffmpeg_args(self):
args = ['-y', '-hide_banner']
if self.task_type == 'encode':
input_args = []
filter_args = []
output_args = [*PROFILE_LEVEL_ARGS,"-shortest", *ENCODER_ARGS]
output_args = [*VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, *DEFAULT_ARGS]
if self.annexb:
output_args.append("-bsf:v")
output_args.append("h264_mp4toannexb")
output_args.append("-reset_timestamps")
output_args.append("1")
video_output_str = "[0:v]"
audio_output_str = "[0:v]"
audio_input_count = 0
audio_output_str = ""
effect_index = 0
for input_file in self.input_file:
input_args.append("-i")
if type(input_file) is str:
@ -179,8 +181,9 @@ class FfmpegTask(object):
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_x = f'{float((_f_x2 - _f_x)/(2 * _v_w)) :.4f}*iw'
filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut]")
video_output_str = "[v_cut]"
filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
video_output_str = f"[v_cut{effect_index}]"
effect_index += 1
for effect in self.effects:
if effect.startswith("cameraShot:"):
param = effect.split(":", 2)[1]
@ -203,18 +206,19 @@ class FfmpegTask(object):
_mid_out_str = "[eff_m]"
_end_out_str = "[eff_e]"
filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}")
filter_args.append(f"{video_output_str}[0:v]select=lt(n\,{int(start*self.frame_rate)}){_start_out_str}")
filter_args.append(f"{video_output_str}[0:v]select=gt(n\,{int(start*self.frame_rate)}){_end_out_str}")
filter_args.append(f"{video_output_str}[0:v]select=eq(n\,{int(start*self.frame_rate)}){_mid_out_str}")
filter_args.append(f"{_start_out_str}select=lt(n\,{int(start*self.frame_rate)}){_start_out_str}")
filter_args.append(f"{_end_out_str}select=gt(n\,{int(start*self.frame_rate)}){_end_out_str}")
filter_args.append(f"{_mid_out_str}select=eq(n\,{int(start*self.frame_rate)}){_mid_out_str}")
filter_args.append(f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}")
if rotate_deg != 0:
filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/360{_mid_out_str}")
# filter_args.append(f"{video_output_str}trim=start=0:end={start+duration},tpad=stop_mode=clone:stop_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"tpad=start_mode=clone:start_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}")
video_output_str = "[v_eff]"
video_output_str = f"[v_eff{effect_index}]"
# filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
effect_index += 1
elif effect.startswith("zoom:"):
...
...
@ -235,90 +239,73 @@ class FfmpegTask(object):
output_args.append("-r")
output_args.append(f"{self.frame_rate}")
if self.mute:
output_args.append("-an")
input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
filter_args.append(f"[{input_index}:a]acopy[a]")
audio_output_str = "[a]"
else:
input_index = 0
for audio in self.audios:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0:
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]"
else:
audio_output_str = f"[{input_index}:a]"
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
audio_output_str = "[0:a]"
for audio in self.audios:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(audio.replace("\\", "/"))
if audio_output_str == "":
filter_args.append(f"[{input_index}:a]acopy[a]")
audio_output_str = "[a]"
else:
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]"
if audio_output_str:
output_args.append("-map")
output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()]
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'concat':
# 无法通过 annexb 合并的
input_args = []
output_args = ["-shortest"]
if self.check_annexb() and len(self.audios) <= 1:
# output_args
if len(self.audios) > 0:
input_args.append("-an")
_tmp_file = "tmp_concat_"+str(time.time())+".txt"
with open(_tmp_file, "w", encoding="utf-8") as f:
for input_file in self.input_file:
if type(input_file) is str:
f.write("file '"+input_file+"'\n")
elif isinstance(input_file, FfmpegTask):
f.write("file '" + input_file.get_output_file() + "'\n")
input_args += ("-f", "concat", "-safe", "0", "-i", _tmp_file)
output_args.append("-c:v")
output_args.append("copy")
if len(self.audios) > 0:
input_args.append("-i")
input_args.append(self.audios[0])
output_args.append("-c:a")
output_args.append("copy")
output_args.append("-f")
output_args.append("mp4")
return args + input_args + output_args + [self.get_output_file()]
output_args += ["-r", f"{self.frame_rate}", *PROFILE_LEVEL_ARGS, *ENCODER_ARGS]
output_args = [*DEFAULT_ARGS]
filter_args = []
video_output_str = "[0:v]"
audio_output_str = "[0:a]"
video_input_count = 0
audio_input_count = 0
for input_file in self.input_file:
audio_output_str = ""
audio_track_index = 0
# output_args
_tmp_file = "tmp_concat_"+str(time.time())+".txt"
with open(_tmp_file, "w", encoding="utf-8") as f:
for input_file in self.input_file:
if type(input_file) is str:
f.write("file '"+input_file+"'\n")
elif isinstance(input_file, FfmpegTask):
f.write("file '" + input_file.get_output_file() + "'\n")
input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file]
output_args.append("-map")
output_args.append("0:v")
output_args.append("-c:v")
output_args.append("copy")
if self.mute:
input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
audio_output_str = f"[{input_index}:a]"
audio_track_index += 1
else:
audio_output_str = "[0:a]"
audio_track_index += 1
for audio in self.audios:
input_index = input_args.count("-i")
input_args.append("-i")
if type(input_file) is str:
input_args.append(input_file.replace("\\", "/"))
elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file().replace("\\", "/"))
if video_input_count > 0:
filter_args.append(f"{video_output_str}[{input_index}:v]concat=n=2:v=1:a=0[v]")
video_output_str = "[v]"
input_args.append(audio.replace("\\", "/"))
audio_track_index += 1
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]"
if audio_output_str:
output_args.append("-map")
if audio_track_index <= 1:
output_args.append(audio_output_str[1:-1])
else:
video_output_str = f"[{input_index}:v]"
video_input_count += 1
output_args.append("-map")
output_args.append(video_output_str)
if self.mute:
output_args.append("-an")
else:
input_index = 0
for audio in self.audios:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0:
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]"
else:
audio_output_str = f"[{input_index}:a]"
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()]
output_args.append(audio_output_str)
output_args += AUDIO_ARGS
output_args.append("-f")
output_args.append("mp4")
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'copy':
if len(self.input_file) == 1:
if type(self.input_file[0]) is str: