import uuid


class FfmpegTask(object):
    """Describe one ffmpeg invocation and build its command-line arguments.

    A task holds a list of inputs (file paths or nested ``FfmpegTask``
    objects) plus optional LUTs, overlays, subtitles and audio tracks.
    ``task_type`` is one of ``'copy'``, ``'concat'`` or ``'encode'`` and
    selects which argument layout ``get_ffmpeg_args`` produces.
    """

    def __init__(self, input_file, task_type='copy', output_file=''):
        """Create a task.

        :param input_file: a single path (``str``) or a ``list`` of inputs;
            any other type results in an empty input list.
        :param task_type: initial task type, ``'copy'`` by default.
        :param output_file: output path; ``''`` means "not chosen yet".
        """
        if isinstance(input_file, str):
            self.input_file = [input_file]
        elif isinstance(input_file, list):
            # The caller's list is stored as-is (not copied).
            self.input_file = input_file
        else:
            self.input_file = []
        self.task_type = task_type
        self.output_file = output_file
        self.mute = True
        self.speed = 1
        self.frame_rate = 25
        self.subtitles = []
        self.luts = []
        self.audios = []
        self.overlays = []
        self.annexb = False

    def __repr__(self):
        """Return a compact description listing only the non-default parts."""
        parts = [f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}']
        if self.luts:
            parts.append(f', luts={self.luts}')
        if self.audios:
            parts.append(f', audios={self.audios}')
        if self.overlays:
            parts.append(f', overlays={self.overlays}')
        if self.annexb:
            parts.append(f', annexb={self.annexb}')
        if self.mute:
            parts.append(f', mute={self.mute}')
        parts.append(')')
        return ''.join(parts)

    def analyze_input_render_tasks(self):
        """Yield every nested ``FfmpegTask`` input whose ``need_run()`` is true."""
        for candidate in self.input_file:
            if isinstance(candidate, FfmpegTask) and candidate.need_run():
                yield candidate

    def need_run(self) -> bool:
        """Tell whether this task has to be executed.

        :rtype: bool
        :return: ``True`` when ``annexb`` is set or the task cannot be
            satisfied by a plain copy.
        """
        if self.annexb:
            return True
        # TODO: copy from url
        return not self.check_can_copy()

    def add_inputs(self, *inputs):
        """Append more inputs to the input list."""
        self.input_file.extend(inputs)

    def add_overlay(self, *overlays):
        """Register overlays; entries ending in ``.ass`` are stored as subtitles."""
        for overlay in overlays:
            if str(overlay).endswith('.ass'):
                self.subtitles.append(overlay)
            else:
                self.overlays.append(overlay)
        self.correct_task_type()

    def add_audios(self, *audios):
        """Register audio files, re-derive the task type and un-mute."""
        self.audios.extend(audios)
        self.correct_task_type()
        self.check_audio_track()

    def add_lut(self, *luts):
        """Register LUT files and re-derive the task type."""
        self.luts.extend(luts)
        self.correct_task_type()

    def get_output_file(self):
        """Return the output of this task.

        For a ``'copy'`` task this is the first input itself; otherwise it is
        ``output_file``, generated on first use when still empty.
        """
        if self.task_type == 'copy':
            # NOTE(review): if the first input is a nested FfmpegTask, that
            # object (not a path) is returned -- confirm callers expect this.
            return self.input_file[0]
        if self.output_file == '':
            self.set_output_file()
        return self.output_file

    def correct_task_type(self):
        """Set ``task_type`` to the cheapest mode the current settings allow."""
        if self.check_can_copy():
            self.task_type = 'copy'
        elif self.check_can_concat():
            self.task_type = 'concat'
        else:
            self.task_type = 'encode'

    def check_can_concat(self) -> bool:
        """Return ``True`` when no LUT, overlay, subtitle or speed change is set."""
        return not (self.luts or self.overlays or self.subtitles) and self.speed == 1

    def check_can_copy(self) -> bool:
        """Return ``True`` when concat is possible with at most one audio and one input."""
        return (
            self.check_can_concat()
            and len(self.audios) <= 1
            and len(self.input_file) <= 1
        )

    def check_audio_track(self):
        """Clear ``mute`` as soon as at least one audio file is registered."""
        if self.audios:
            self.mute = False

    def _append_audio_args(self, input_args, filter_args, output_args):
        """Add the audio inputs, ``amix`` filters and audio ``-map`` in place.

        Shared by the encode and concat builders. When ``mute`` is set only
        ``-an`` is appended to ``output_args``.
        """
        if self.mute:
            output_args.append("-an")
            return
        # NOTE(review): with no audio files this placeholder label is what
        # gets mapped -- confirm whether unmuted-without-audios can occur.
        audio_label = "[0:v]"
        audio_index = 0
        for position, audio in enumerate(self.audios):
            # The next ffmpeg input index equals the number of "-i" so far.
            audio_index = input_args.count("-i")
            input_args.extend(["-i", audio.replace("\\", "/")])
            if position > 0:
                filter_args.append(f"{audio_label}[{audio_index}:a]amix[a]")
                audio_label = "[a]"
            else:
                audio_label = f"[{audio_index}:a]"
        if len(self.audios) == 1:
            # A single audio needs no filter; map its input index directly.
            audio_label = f"{audio_index}"
        output_args.extend(["-map", audio_label])

    def _build_encode_args(self, base_args):
        """Build the argument list for an ``'encode'`` task."""
        input_args = []
        filter_args = []
        # Option name and value are separate argv entries, as in the concat
        # builder (previously emitted as the single string "-c:v h264_qsv").
        output_args = ["-shortest", "-c:v", "h264_qsv"]
        video_label = "[0:v]"
        for source in self.input_file:
            input_args.append("-i")
            if isinstance(source, str):
                input_args.append(source)
            elif isinstance(source, FfmpegTask):
                input_args.append(source.get_output_file())
        for lut in self.luts:
            filter_args.append("[0:v]lut3d=file=" + lut + "[0:v]")
        for overlay in self.overlays:
            overlay_index = input_args.count("-i")
            input_args.extend(["-i", overlay])
            filter_args.append(f"{video_label}[{overlay_index}:v]scale=rw:rh[v]")
            filter_args.append(f"[v][{overlay_index}:v]overlay=1:eof_action=endall[v]")
            video_label = "[v]"
        for subtitle in self.subtitles:
            filter_args.append(f"{video_label}ass={subtitle}[v]")
            video_label = "[v]"
        output_args.extend(["-map", video_label, "-r", f"{self.frame_rate}"])
        self._append_audio_args(input_args, filter_args, output_args)
        # NOTE(review): "-filter_complex" is emitted even when filter_args is
        # empty -- confirm ffmpeg tolerates an empty graph here.
        return (base_args + input_args
                + ["-filter_complex", ";".join(filter_args)]
                + output_args + [self.get_output_file()])

    def _build_concat_args(self, base_args):
        """Build the argument list for a ``'concat'`` task."""
        input_args = []
        filter_args = []
        # Uses the task's frame rate (default 25), like the encode builder.
        output_args = ["-shortest", "-c:v", "h264_qsv", "-r", f"{self.frame_rate}"]
        video_label = "[0:v]"
        for position, source in enumerate(self.input_file):
            source_index = input_args.count("-i")
            input_args.append("-i")
            if isinstance(source, str):
                input_args.append(source.replace("\\", "/"))
            elif isinstance(source, FfmpegTask):
                input_args.append(source.get_output_file().replace("\\", "/"))
            if position > 0:
                # Chain pairwise: previous result + this input -> [v].
                filter_args.append(f"{video_label}[{source_index}:v]concat=n=2:v=1:a=0[v]")
                video_label = "[v]"
            else:
                video_label = f"[{source_index}:v]"
        output_args.extend(["-map", video_label])
        self._append_audio_args(input_args, filter_args, output_args)
        return (base_args + input_args
                + ["-filter_complex", ";".join(filter_args)]
                + output_args + [self.get_output_file()])

    def get_ffmpeg_args(self):
        """Return the ffmpeg argument list for this task.

        :return: a list of arguments (without the ``ffmpeg`` executable
            itself), or ``None`` when nothing has to be run: a copy whose
            single string input already is its output, a copy task without
            exactly one input, or an unknown ``task_type``.
        """
        args = ['-y', '-hide_banner']
        if self.task_type == 'encode':
            return self._build_encode_args(args)
        if self.task_type == 'concat':
            return self._build_concat_args(args)
        if self.task_type == 'copy' and len(self.input_file) == 1:
            source = self.input_file[0]
            if isinstance(source, str) and source == self.get_output_file():
                return None
            return args + ["-i", source] + ["-c", "copy", self.get_output_file()]
        return None

    def set_output_file(self, file=None):
        """Set the output path.

        With ``file=None`` a random ``rand_<uuid4>.mp4`` name is generated,
        but only if no output path has been set yet.
        """
        if file is not None:
            self.output_file = file
        elif self.output_file == '':
            self.output_file = "rand_" + str(uuid.uuid4()) + ".mp4"