import time
import uuid


class FfmpegTask(object):
    """Describe one ffmpeg job and build the argument list needed to run it.

    A task has one of three modes, stored in ``task_type``:

    * ``'copy'``   - the single input is passed through untouched;
    * ``'concat'`` - the inputs are joined (and external audio tracks mixed);
    * ``'encode'`` - the input is re-encoded with LUTs, overlays or subtitles.

    Inputs are either file paths (``str``) or nested ``FfmpegTask`` objects
    whose output file feeds this task.
    """

    def __init__(self, input_file, task_type='copy', output_file=''):
        """Create a task.

        :param input_file: a path, or a list of paths / ``FfmpegTask``
            objects. Any other value yields an empty input list. A list is
            stored as given (not copied).
        :param task_type: ``'copy'``, ``'concat'`` or ``'encode'``.
        :param output_file: target path; when empty a random name is chosen
            the first time the output file is requested.
        """
        # Only a single ".ts" path switches the task to Annex B output;
        # list inputs are not inspected.
        self.annexb = False
        if isinstance(input_file, str):
            if input_file.endswith(".ts"):
                self.annexb = True
            self.input_file = [input_file]
        elif isinstance(input_file, list):
            self.input_file = input_file
        else:
            self.input_file = []
        self.task_type = task_type
        self.output_file = output_file
        self.mute = True
        self.speed = 1
        self.frame_rate = 25
        self.subtitles = []
        self.luts = []
        self.audios = []
        self.overlays = []

    def __repr__(self):
        parts = [
            f'input_file={self.input_file}',
            f'task_type={self.task_type}',
        ]
        if self.luts:
            parts.append(f'luts={self.luts}')
        if self.audios:
            parts.append(f'audios={self.audios}')
        if self.overlays:
            parts.append(f'overlays={self.overlays}')
        if self.annexb:
            parts.append(f'annexb={self.annexb}')
        if self.mute:
            parts.append(f'mute={self.mute}')
        return 'FfmpegTask(' + ', '.join(parts) + ')'

    def analyze_input_render_tasks(self):
        """Yield every nested input task that has to be run first."""
        for item in self.input_file:
            if isinstance(item, FfmpegTask) and item.need_run():
                yield item

    def need_run(self):
        """
        Determine whether this task needs to run.

        :rtype: bool
        :return: ``True`` for Annex B tasks and for every task that cannot
            be handled as a plain copy.
        """
        if self.annexb:
            return True
        # TODO: copy from url
        return not self.check_can_copy()

    def add_inputs(self, *inputs):
        """Append further inputs (paths or nested tasks)."""
        self.input_file.extend(inputs)

    def add_overlay(self, *overlays):
        """Register overlays; entries ending in ``.ass`` become subtitles."""
        for overlay in overlays:
            if str(overlay).endswith('.ass'):
                self.subtitles.append(overlay)
            else:
                self.overlays.append(overlay)
        self.correct_task_type()

    def add_audios(self, *audios):
        """Register external audio tracks and un-mute the task."""
        self.audios.extend(audios)
        self.correct_task_type()
        self.check_audio_track()

    def add_lut(self, *luts):
        """Register 3D LUT files to apply to the video."""
        self.luts.extend(luts)
        self.correct_task_type()

    def get_output_file(self):
        """Return the path this task produces.

        A ``'copy'`` task produces nothing new, so its output is its first
        input; a nested task is resolved to that task's own output path so
        the result is always a path rather than a task object. Otherwise
        the configured output file is returned, generating a random name
        on first use.
        """
        if self.task_type == 'copy' and self.input_file:
            first = self.input_file[0]
            if isinstance(first, FfmpegTask):
                return first.get_output_file()
            return first
        if self.output_file == '':
            self.set_output_file()
        return self.output_file

    def correct_task_type(self):
        """Re-derive ``task_type`` from the currently registered work."""
        if self.check_can_copy():
            self.task_type = 'copy'
        elif self.check_can_concat():
            self.task_type = 'concat'
        else:
            self.task_type = 'encode'

    def check_can_concat(self):
        """Return ``True`` when no LUT, overlay, subtitle or speed change is set."""
        if self.luts or self.overlays or self.subtitles:
            return False
        return self.speed == 1

    def check_can_copy(self):
        """Return ``True`` when the task is a pass-through of a single input."""
        if self.luts or self.overlays or self.subtitles:
            return False
        if self.speed != 1:
            return False
        return len(self.audios) <= 1 and len(self.input_file) <= 1

    def check_audio_track(self):
        """Un-mute the task as soon as an external audio track exists."""
        if self.audios:
            self.mute = False

    @staticmethod
    def _input_path(item):
        """Return the file path behind an input entry, or ``None`` if unsupported."""
        if isinstance(item, str):
            return item
        if isinstance(item, FfmpegTask):
            return item.get_output_file()
        return None

    @staticmethod
    def _map_target(pad):
        """Turn a filter pad reference into a valid ``-map`` argument.

        ``-map [label]`` refers to a filtergraph output. The only labels the
        filters built here produce are ``[v]`` and ``[a]``; any other pad is
        a raw input stream and must be mapped without brackets.
        """
        if pad in ("[v]", "[a]"):
            return pad
        return pad.strip("[]")

    @staticmethod
    def _assemble(args, input_args, filter_args, output_args, target):
        """Join the argument groups, omitting an empty ``-filter_complex``."""
        filter_part = []
        if filter_args:
            filter_part = ["-filter_complex", ";".join(filter_args)]
        return args + input_args + filter_part + output_args + [target]

    def _append_audio_args(self, input_args, filter_args, output_args, input_count):
        """Add the audio inputs, the amix chain and the audio ``-map``/``-an``.

        ``input_count`` is the number of ``-i`` inputs already present, i.e.
        the index the first audio input will get.
        """
        if self.mute:
            output_args.append("-an")
            return
        indices = []
        for audio in self.audios:
            indices.append(input_count)
            input_args.extend(["-i", audio.replace("\\", "/")])
            input_count += 1
        if not indices:
            # Un-muted without external audio: keep the audio of the first
            # input when it has any ("?" makes the mapping optional).
            target = "0:a?"
        elif len(indices) == 1:
            # A single track needs no filter; map that input directly.
            target = f"{indices[0]}"
        else:
            current = f"[{indices[0]}:a]"
            for index in indices[1:]:
                filter_args.append(f"{current}[{index}:a]amix[a]")
                current = "[a]"
            target = current
        output_args.extend(["-map", target])

    def _encode_args(self, args):
        """Build the arguments for an ``'encode'`` task."""
        input_args = []
        filter_args = []
        output_args = ["-shortest", "-c:v", "h264_qsv"]
        if self.annexb:
            output_args.extend(["-bsf:v", "h264_mp4toannexb"])
        input_count = 0
        for item in self.input_file:
            path = self._input_path(item)
            if path is None:
                # Unsupported entry: skip it instead of emitting a bare "-i".
                continue
            input_args.extend(["-i", path])
            input_count += 1
        video_pad = "[0:v]"
        for lut in self.luts:
            filter_args.append(f"{video_pad}lut3d=file={lut}[v]")
            video_pad = "[v]"
        for overlay in self.overlays:
            input_args.extend(["-i", overlay])
            filter_args.append(f"{video_pad}[{input_count}:v]scale=rw:rh[v]")
            filter_args.append(f"[v][{input_count}:v]overlay=1:eof_action=endall[v]")
            video_pad = "[v]"
            input_count += 1
        for subtitle in self.subtitles:
            filter_args.append(f"{video_pad}ass={subtitle}[v]")
            video_pad = "[v]"
        output_args.extend(["-map", self._map_target(video_pad)])
        output_args.extend(["-r", f"{self.frame_rate}"])
        self._append_audio_args(input_args, filter_args, output_args, input_count)
        return self._assemble(args, input_args, filter_args, output_args,
                              self.get_output_file())

    def _concat_demuxer_args(self, args):
        """Build a stream-copy concat via ffmpeg's concat demuxer.

        Writes a temporary list file ``tmp_concat_<timestamp>.txt`` into the
        current working directory; the file is not removed here.
        """
        input_args = []
        output_args = ["-shortest"]
        if self.audios:
            # Drop the audio of the concatenated video; the external track
            # added below replaces it.
            input_args.append("-an")
        list_file = "tmp_concat_" + str(time.time()) + ".txt"
        with open(list_file, "w", encoding="utf-8") as f:
            for item in self.input_file:
                path = self._input_path(item)
                if path is None:
                    continue
                # Inside a single-quoted concat entry a quote is written '\''.
                escaped = path.replace("'", "'\\''")
                f.write("file '" + escaped + "'\n")
        input_args.extend(["-f", "concat", "-safe", "0", "-i", list_file])
        output_args.extend(["-c:v", "copy"])
        if self.audios:
            input_args.extend(["-i", self.audios[0]])
        output_args.extend(["-c:a", "copy", "-f", "mp4"])
        return args + input_args + output_args + [self.get_output_file()]

    def _concat_args(self, args):
        """Build the arguments for a ``'concat'`` task."""
        if self.check_annexb() and len(self.audios) <= 1:
            return self._concat_demuxer_args(args)
        # Inputs that cannot be merged through the Annex B stream copy are
        # joined with the concat filter and re-encoded.
        input_args = []
        filter_args = []
        output_args = ["-shortest", "-c:v", "h264_qsv", "-r", f"{self.frame_rate}"]
        video_pad = "[0:v]"
        input_count = 0
        for item in self.input_file:
            path = self._input_path(item)
            if path is None:
                continue
            input_args.extend(["-i", path.replace("\\", "/")])
            if input_count > 0:
                filter_args.append(
                    f"{video_pad}[{input_count}:v]concat=n=2:v=1:a=0[v]")
                video_pad = "[v]"
            else:
                video_pad = f"[{input_count}:v]"
            input_count += 1
        output_args.extend(["-map", self._map_target(video_pad)])
        self._append_audio_args(input_args, filter_args, output_args, input_count)
        return self._assemble(args, input_args, filter_args, output_args,
                              self.get_output_file())

    def _copy_args(self, args):
        """Build the arguments for a ``'copy'`` task.

        Returns ``[]`` when the output is the input itself (nothing to run)
        and ``None`` when the task does not have exactly one usable input.
        """
        if len(self.input_file) != 1:
            return None
        source = self._input_path(self.input_file[0])
        if source is None:
            return None
        target = self.get_output_file()
        if source == target:
            return []
        return args + ["-i", source, "-c", "copy", target]

    def get_ffmpeg_args(self):
        """Return the ffmpeg argument list (without the executable name).

        :return: a list of arguments; ``[]`` when a copy task needs no
            ffmpeg run; ``None`` for an unknown ``task_type`` or a copy task
            without exactly one usable input.
        """
        args = ['-y', '-hide_banner']
        if self.task_type == 'encode':
            return self._encode_args(args)
        if self.task_type == 'concat':
            return self._concat_args(args)
        if self.task_type == 'copy':
            return self._copy_args(args)
        return None

    def set_output_file(self, file=None):
        """Set the output path.

        With ``file`` given it is stored as is. Without it, a random
        ``rand_<uuid>`` name (``.ts`` for Annex B tasks, ``.mp4`` otherwise)
        is generated, but only if no output file is set yet.
        """
        if file is not None:
            self.output_file = file
            return
        if self.output_file == '':
            suffix = ".ts" if self.annexb else ".mp4"
            self.output_file = "rand_" + str(uuid.uuid4()) + suffix

    def check_annexb(self):
        """Report whether the inputs can be treated as Annex B streams.

        Inputs are checked in order. A nested task that is not Annex B makes
        the result ``False``. The first plain path decides the result
        immediately: ``self.annexb`` for ``'encode'`` and ``'copy'`` tasks,
        ``False`` for any other task type. Entries of other types are
        ignored. With no deciding input the result is ``True``.
        """
        for item in self.input_file:
            if isinstance(item, str):
                if self.task_type in ('encode', 'copy'):
                    return self.annexb
                return False
            if isinstance(item, FfmpegTask) and not item.check_annexb():
                return False
        return True