import json
import os
import time
import uuid
from typing import Any

DEFAULT_ARGS = ("-shortest",)
ENCODER_ARGS = ("-c:v", "h264",) if not os.getenv("ENCODER_ARGS", False) else os.getenv("ENCODER_ARGS", "").split(" ")
VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4",)
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2",)
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000",)


class FfmpegTask(object):
    effects: list[str]

    def __init__(self, input_file, task_type='copy', output_file=''):
        self.annexb = False
        if type(input_file) is str:
            if input_file.endswith(".ts"):
                self.annexb = True
            self.input_file = [input_file]
        elif type(input_file) is list:
            self.input_file = input_file
        else:
            self.input_file = []
        self.zoom_cut = None
        self.center_cut = None
        self.ext_data = {}
        self.task_type = task_type
        self.output_file = output_file
        self.mute = True
        self.speed = 1
        self.frame_rate = 25
        self.resolution = None
        self.subtitles = []
        self.luts = []
        self.audios = []
        self.overlays = []
        self.effects = []

    def __repr__(self):
        _str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}'
        if len(self.luts) > 0:
            _str += f', luts={self.luts}'
        if len(self.audios) > 0:
            _str += f', audios={self.audios}'
        if len(self.overlays) > 0:
            _str += f', overlays={self.overlays}'
        if self.annexb:
            _str += f', annexb={self.annexb}'
        if self.effects:
            _str += f', effects={self.effects}'
        if self.mute:
            _str += f', mute={self.mute}'
        _str += f', center_cut={self.center_cut}'
        return _str + ')'

    def analyze_input_render_tasks(self):
        for i in self.input_file:
            if type(i) is str:
                continue
            elif isinstance(i, FfmpegTask):
                if i.need_run():
                    yield i

    def need_run(self):
        """
        Decide whether this task actually needs to be executed.

        :rtype: bool
        :return:
        """
        if self.annexb:
            return True
        # TODO: copy from url
        return not self.check_can_copy()

    def add_inputs(self, *inputs):
        self.input_file.extend(inputs)

    def add_overlay(self, *overlays):
        for overlay in overlays:
            if str(overlay).endswith('.ass'):
                self.subtitles.append(overlay)
            else:
                self.overlays.append(overlay)
        self.correct_task_type()

    def add_audios(self, *audios):
        self.audios.extend(audios)
        self.correct_task_type()
        self.check_audio_track()

    def add_lut(self, *luts):
        self.luts.extend(luts)
        self.correct_task_type()

    def add_effect(self, *effects):
        self.effects.extend(effects)
        self.correct_task_type()

    def get_output_file(self):
        if self.task_type == 'copy':
            return self.input_file[0]
        if self.output_file == '':
            self.set_output_file()
        return self.output_file

    def correct_task_type(self):
        if self.check_can_copy():
            self.task_type = 'copy'
        elif self.check_can_concat():
            self.task_type = 'concat'
        else:
            self.task_type = 'encode'

    def check_can_concat(self):
        if len(self.luts) > 0:
            return False
        if len(self.overlays) > 0:
            return False
        if len(self.subtitles) > 0:
            return False
        if len(self.effects) > 0:
            return False
        if self.speed != 1:
            return False
        if self.zoom_cut is not None:
            return False
        if self.center_cut is not None:
            return False
        return True

    def check_can_copy(self):
        if len(self.luts) > 0:
            return False
        if len(self.overlays) > 0:
            return False
        if len(self.subtitles) > 0:
            return False
        if len(self.effects) > 0:
            return False
        if self.speed != 1:
            return False
        if len(self.audios) >= 1:
            return False
        if len(self.input_file) > 1:
            return False
        if self.zoom_cut is not None:
            return False
        if self.center_cut is not None:
            return False
        return True

    def check_audio_track(self):
        ...
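    # get_ffmpeg_args() below builds the full ffmpeg argv for this task:
    # 'encode' assembles a -filter_complex graph (center crop, cameraShot /
    # ospeed effects, LUTs, overlays, subtitles, audio mixing), 'concat' uses
    # the concat demuxer with video stream copy, and 'copy' emits a plain
    # "-c copy" command (or nothing when input and output are the same file).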
    def get_ffmpeg_args(self):
        args = ['-y', '-hide_banner']
        if self.task_type == 'encode':
            input_args = []
            filter_args = []
            output_args = [*VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, *DEFAULT_ARGS]
            if self.annexb:
                output_args.append("-bsf:v")
                output_args.append("h264_mp4toannexb")
                output_args.append("-reset_timestamps")
                output_args.append("1")
            video_output_str = "[0:v]"
            audio_output_str = ""
            audio_track_index = 0
            effect_index = 0
            for input_file in self.input_file:
                input_args.append("-i")
                if type(input_file) is str:
                    input_args.append(input_file)
                elif isinstance(input_file, FfmpegTask):
                    input_args.append(input_file.get_output_file())
            if self.center_cut == 1:
                pos_json_str = self.ext_data.get('posJson', '{}')
                pos_json = json.loads(pos_json_str)
                _v_w = pos_json.get('imgWidth', 1)
                _f_x = pos_json.get('ltX', 0)
                _f_x2 = pos_json.get('rbX', 0)
                _x = f'{float((_f_x2 + _f_x)/(2 * _v_w)) :.4f}*iw-ih*ih/(2*iw)'
                filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
                video_output_str = f"[v_cut{effect_index}]"
                effect_index += 1
            for effect in self.effects:
                if effect.startswith("cameraShot:"):
                    param = effect.split(":", 2)[1]
                    if param == '':
                        param = "3,1,0"
                    _split = param.split(",")
                    start = 3
                    duration = 1
                    rotate_deg = 0
                    if len(_split) >= 3:
                        if _split[2] == '':
                            rotate_deg = 0
                        else:
                            rotate_deg = int(_split[2])
                    if len(_split) >= 2:
                        duration = float(_split[1])
                    if len(_split) >= 1:
                        start = float(_split[0])
                    _start_out_str = "[eff_s]"
                    _mid_out_str = "[eff_m]"
                    _end_out_str = "[eff_e]"
                    filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}")
                    filter_args.append(f"{_start_out_str}select=lt(n\\,{int(start*self.frame_rate)}){_start_out_str}")
                    filter_args.append(f"{_end_out_str}select=gt(n\\,{int(start*self.frame_rate)}){_end_out_str}")
                    filter_args.append(f"{_mid_out_str}select=eq(n\\,{int(start*self.frame_rate)}){_mid_out_str}")
                    filter_args.append(f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}")
                    if rotate_deg != 0:
                        filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/360{_mid_out_str}")
                    # filter_args.append(f"{video_output_str}trim=start=0:end={start+duration},tpad=stop_mode=clone:stop_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
                    # filter_args.append(f"tpad=start_mode=clone:start_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
                    # filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}")
                    video_output_str = f"[v_eff{effect_index}]"
                    # filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
                    filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
                    effect_index += 1
                elif effect.startswith("ospeed:"):
                    param = effect.split(":", 2)[1]
                    if param == '':
                        param = "1"
                    if param != "1":
                        # change video speed
                        effect_index += 1
                        filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]")
                        video_output_str = f"[v_eff{effect_index}]"
                elif effect.startswith("zoom:"):
                    ...
                ...
            for lut in self.luts:
                filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}")
            for overlay in self.overlays:
                input_index = input_args.count("-i")
                input_args.append("-i")
                input_args.append(overlay)
                if self.resolution:
                    filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]")
                else:
                    if os.getenv("OLD_FFMPEG"):
                        filter_args.append(f"{video_output_str}[{input_index}:v]scale2ref=iw:ih[v]")
                    else:
                        filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]")
                filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
                video_output_str = "[v]"
            for subtitle in self.subtitles:
                filter_args.append(f"{video_output_str}ass={subtitle}[v]")
                video_output_str = "[v]"
            output_args.append("-map")
            output_args.append(video_output_str)
            output_args.append("-r")
            output_args.append(f"{self.frame_rate}")
            if self.mute:
                input_index = input_args.count("-i")
                input_args += MUTE_AUDIO_INPUT
                filter_args.append(f"[{input_index}:a]acopy[a]")
                audio_track_index += 1
                audio_output_str = "[a]"
            else:
                audio_output_str = "[0:a]"
                audio_track_index += 1
            for audio in self.audios:
                input_index = input_args.count("-i")
                input_args.append("-i")
                input_args.append(audio.replace("\\", "/"))
                audio_track_index += 1
                filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
                audio_output_str = "[a]"
            if audio_output_str:
                output_args.append("-map")
                output_args.append(audio_output_str)
            _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
            return args + input_args + _filter_args + output_args + [self.get_output_file()]
        elif self.task_type == 'concat':
            # inputs that cannot be merged via annexb
            input_args = []
            output_args = [*DEFAULT_ARGS]
            filter_args = []
            audio_output_str = ""
            audio_track_index = 0
            # output_args
            if len(self.input_file) == 1:
                _file = self.input_file[0]
                from util.ffmpeg import probe_video_audio
                if type(_file) is str:
                    input_args += ["-i", _file]
                    self.mute = not probe_video_audio(_file)
                elif isinstance(_file, FfmpegTask):
                    input_args += ["-i", _file.get_output_file()]
                    self.mute = not probe_video_audio(_file.get_output_file())
            else:
                _tmp_file = "tmp_concat_" + str(time.time()) + ".txt"
                from util.ffmpeg import probe_video_audio
                with open(_tmp_file, "w", encoding="utf-8") as f:
                    for input_file in self.input_file:
                        if type(input_file) is str:
                            f.write("file '" + input_file + "'\n")
                        elif isinstance(input_file, FfmpegTask):
                            f.write("file '" + input_file.get_output_file() + "'\n")
                input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file]
                self.mute = not probe_video_audio(_tmp_file, "concat")
            output_args.append("-map")
            output_args.append("0:v")
            output_args.append("-c:v")
            output_args.append("copy")
            if self.mute:
                input_index = input_args.count("-i")
                input_args += MUTE_AUDIO_INPUT
                audio_output_str = f"[{input_index}:a]"
                audio_track_index += 1
            else:
                audio_output_str = "[0:a]"
                audio_track_index += 1
            for audio in self.audios:
                input_index = input_args.count("-i")
                input_args.append("-i")
                input_args.append(audio.replace("\\", "/"))
                audio_track_index += 1
                filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
                audio_output_str = "[a]"
            if audio_output_str:
                output_args.append("-map")
                if audio_track_index <= 1:
                    output_args.append(audio_output_str[1:-1])
                else:
                    output_args.append(audio_output_str)
            output_args += AUDIO_ARGS
            if self.annexb:
                output_args.append("-bsf:v")
                output_args.append("h264_mp4toannexb")
                output_args.append("-bsf:a")
output_args.append("setts=pts=DTS") output_args.append("-f") output_args.append("mpegts" if self.annexb else "mp4") _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)] return args + input_args + _filter_args + output_args + [self.get_output_file()] elif self.task_type == 'copy': if len(self.input_file) == 1: if type(self.input_file[0]) is str: if self.input_file[0] == self.get_output_file(): return [] return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()] return [] def set_output_file(self, file=None): if file is None: if self.output_file == '': if self.annexb: self.output_file = "rand_" + str(uuid.uuid4()) + ".ts" else: self.output_file = "rand_" + str(uuid.uuid4()) + ".mp4" else: if isinstance(file, FfmpegTask): if file == self: return self.output_file = file.get_output_file() if type(file) is str: self.output_file = file def check_annexb(self): for input_file in self.input_file: if type(input_file) is str: if self.task_type == 'encode': return self.annexb elif self.task_type == 'concat': return False elif self.task_type == 'copy': return self.annexb else: return False elif isinstance(input_file, FfmpegTask): if not input_file.check_annexb(): return False return True