"""Cut and concatenate recorded video segments with ffmpeg.

The script reads a GBK-encoded list file (``../a.txt`` by default) that
describes, entry by entry, which recorded segment files belong together and
what the merged output should be called, then drives ``ffprobe``/``ffmpeg``
to produce one merged file per entry.
"""

import os
import subprocess
import re
import copy
import time

# Template for one source segment. Copied (deep) for every parsed segment.
template_video_part = {
    "filename": "",      # segment file name without its extension
    "file_ext": "flv",   # segment file extension (without the dot)
    "start": "",         # value passed to ffmpeg ``-ss``; "" means not set
    "end": "",           # value passed to ffmpeg ``-to``; "" means not set
    "part_start": "",    # container start time reported by ffprobe (string)
    "duration": "",      # value passed to ffmpeg ``-t``; "" means "derive it"
}

# Template for one merged output. Copied (deep) for every list-file entry.
template_current_video = {
    "parts": [],         # list of segment dicts shaped like template_video_part
    "pack_ext": "mp4",   # value passed to ffmpeg ``-f`` for the merged file
    "out_ext": "mp4",    # file extension of the merged file
    "suffix": "",        # appended to target_name in the output file name
    "target_name": "",   # base name of the merged file
}

# Matches the "start" field ffprobe prints on its "Duration: ..." line.
_START_TIME_RE = re.compile(r", start: (\d+\.\d+)")


def _run_ffmpeg(args):
    """Run ``ffmpeg -y`` with *args* (a list), raising if ffmpeg fails.

    Arguments are passed as a list, not through a shell, so file names that
    contain spaces or shell metacharacters are handled verbatim.
    """
    subprocess.check_call(["ffmpeg", "-y"] + list(args))


def _trim_args(part):
    """Return the optional ``-ss``/``-to`` arguments for *part*."""
    args = []
    if part["start"] != "":
        args += ["-ss", str(part["start"])]
    if part["end"] != "":
        args += ["-to", str(part["end"])]
    return args


def _resolve_durations(parts):
    """Validate *parts* and return the ``-t`` duration of all but the last.

    A segment whose ``duration`` is "" gets the difference between the next
    segment's ``part_start`` and its own. The checks run in segment order so
    the first problem encountered is the one reported.
    """
    if parts[0]["part_start"] is None:
        raise Exception("视频片段part_start读取异常")
    durations = []
    for prev_part, part in zip(parts, parts[1:]):
        if part["part_start"] is None:
            raise Exception("视频片段part_start读取异常")
        duration = prev_part["duration"]
        if duration == "":
            seconds = float(part["part_start"]) - float(prev_part["part_start"])
            if seconds <= 60:
                raise Exception("视频片段duration过小")
            duration = str(seconds)
        durations.append(str(duration))
    return durations


def merge_video_parts_and_split(video):
    """Trim every segment of *video* to MPEG-TS and concatenate the results.

    Every segment except the last is first limited to its duration (``-t``)
    into a scratch ``.ts`` file and then trimmed with its ``start``/``end``;
    the last segment is only trimmed. The resulting ``<filename>.ts`` files
    are joined with ffmpeg's ``concat:`` protocol into
    ``<target_name><suffix>.<out_ext>`` and then deleted.

    All validation happens before ffmpeg is started, and *video* (including
    its ``parts`` list and the segment dicts) is not modified.

    Args:
        video: dict shaped like ``template_current_video``.

    Raises:
        Exception: fewer than two segments, a segment whose ``part_start`` is
            None, or a derived duration of 60 seconds or less.
        subprocess.CalledProcessError: an ffmpeg invocation failed.
    """
    parts = list(video["parts"])
    if len(parts) <= 1:
        raise Exception("视频片段不够")
    durations = _resolve_durations(parts)

    ts_files = []
    for part, duration in zip(parts, durations):
        source = "{filename}.{file_ext}".format_map(part)
        trimmed = "{filename}.ts".format_map(part)
        scratch = "{}.ts".format(str(time.time()))
        try:
            # Pass 1: cut the segment where the next one begins.
            _run_ffmpeg(["-i", source, "-c", "copy", "-f", "mpegts",
                         "-t", duration, scratch])
            # Pass 2: apply the user-requested start/end trim.
            _run_ffmpeg(["-i", scratch, "-c", "copy", "-f", "mpegts"]
                        + _trim_args(part) + [trimmed])
        finally:
            if os.path.exists(scratch):
                os.remove(scratch)
        ts_files.append(trimmed)

    last_part = parts[-1]
    last_trimmed = "{filename}.ts".format_map(last_part)
    _run_ffmpeg(["-i", "{filename}.{file_ext}".format_map(last_part),
                 "-c", "copy", "-f", "mpegts"]
                + _trim_args(last_part) + [last_trimmed])
    ts_files.append(last_trimmed)

    _run_ffmpeg(["-i", "concat:{}".format("|".join(ts_files)),
                 "-c", "copy", "-f", str(video["pack_ext"]),
                 "{target_name}{suffix}.{out_ext}".format_map(video)])
    for ts_file in ts_files:
        os.remove(ts_file)


def get_video_part_start(file):
    """Return the start time ffprobe reports for *file*, or None.

    ffprobe writes its report to stderr; the first ``, start: <number>``
    occurrence is returned as a string. None is returned when there is no
    such field, which includes ffprobe failing on the file.
    """
    with subprocess.Popen(["ffprobe", file], stderr=subprocess.PIPE,
                          universal_newlines=True) as probe:
        # communicate() reads to EOF and waits for exit, so this cannot spin
        # on an already-finished process whatever its exit status was.
        _, report = probe.communicate()
    match = _START_TIME_RE.search(report or "")
    return match.group(1) if match else None


def _parse_part_spec(file_param, is_first_part):
    """Build a segment dict from one ``name[.ext][+time]`` specification.

    The time after ``+`` becomes ``start`` for the first segment of an entry
    and ``end`` for every other one. ``part_start`` is left for the caller.
    """
    part = copy.deepcopy(template_video_part)
    filename, _, bias_time = file_param.partition("+")
    if "." in filename:
        filename, part["file_ext"] = filename.split(".", 1)
    part["filename"] = filename
    part["start" if is_first_part else "end"] = bias_time
    return part


def _suffix_from_filename(filename):
    """Derive the output suffix from a segment name of the form ``a_b``.

    The text before the underscore, minus its first four characters, is
    prefixed with ``_``. Names without exactly one underscore raise
    ValueError.
    """
    head, _ = filename.split("_")
    return "_" + head[4:]


def main(list_path="../a.txt"):
    """Process every entry of the list file at *list_path*.

    Line meaning, in order of precedence:
      * blank line      - ends the entry: merge it unless it was marked to be
                          skipped, then start a fresh entry;
      * ``=====...``    - stop reading;
      * contains ``-``  - mark the current entry to be skipped;
      * contains ``~``  - ``~``-separated segment specifications;
      * anything else   - the target name, or (when no segment has been seen
                          yet) a marker that the entry is to be skipped.

    An entry is only merged when a blank line terminates it.
    """
    with open(list_path, "r", encoding="gbk") as list_file:
        skip_current = False
        current_video = copy.deepcopy(template_current_video)
        for raw_line in list_file:
            line = raw_line.strip()
            if line == "":
                if skip_current:
                    skip_current = False
                else:
                    merge_video_parts_and_split(current_video)
                current_video = copy.deepcopy(template_current_video)
                continue
            if line.startswith("=" * 5):
                break
            if "-" in line:
                skip_current = True
                continue
            if "~" in line:
                for file_param in line.split("~"):
                    part = _parse_part_spec(
                        file_param, len(current_video["parts"]) == 0)
                    if current_video["suffix"] == "":
                        current_video["suffix"] = _suffix_from_filename(
                            part["filename"])
                    part["part_start"] = get_video_part_start(
                        "{filename}.{file_ext}".format_map(part))
                    current_video["parts"].append(part)
                continue
            if len(current_video["parts"]) == 0:
                skip_current = True
                continue
            current_video["target_name"] = line


if __name__ == "__main__":
    main()