lubo_toolkit/auto_merge_video.py
2021-09-02 13:19:50 +08:00

130 lines
4.7 KiB
Python

import os
import subprocess
import re
import copy
import time
# Blueprint for one source segment of a recording; deep-copied per segment.
template_video_part = dict(
    filename="",      # segment file name without its extension
    file_ext="flv",   # extension used when the job list does not give one
    start="",         # optional trim start, passed to ffmpeg as -ss
    end="",           # optional trim end, passed to ffmpeg as -to
    part_start="",    # "start" timestamp reported by ffprobe for the file
    duration="",      # length to keep (ffmpeg -t); "" means compute it later
)
# Blueprint for one output video; deep-copied per record so that the
# mutable "parts" list is never shared between records.
template_current_video = dict(
    parts=[],           # segment dicts shaped like template_video_part
    pack_ext="mp4",     # container format handed to ffmpeg via -f
    out_ext="mp4",      # extension of the merged output file
    suffix="",          # appended to target_name in the output file name
    target_name="",     # base name of the merged output file
)
def _run_ffmpeg(*args):
    """Run ``ffmpeg -y`` with *args* and raise if ffmpeg reports failure."""
    # An argument list (no shell) keeps file names containing spaces or shell
    # metacharacters intact and rules out command injection.
    subprocess.run(["ffmpeg", "-y", *args], check=True)


def _trim_options(part):
    """Return the optional ``-ss`` / ``-to`` output options for *part*."""
    options = []
    if part["start"] != "":
        options += ["-ss", part["start"]]
    if part["end"] != "":
        options += ["-to", part["end"]]
    return options


def _remove_if_present(path):
    """Delete *path*, ignoring the case where it was never created."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def merge_video_parts_and_split(video):
    """Cut each segment of *video* to length and join them into one file.

    Every segment except the last is first copied to a scratch MPEG-TS file
    limited to its ``duration`` (``-t``), then copied again with the optional
    ``start`` / ``end`` trim applied. The last segment is only trimmed. The
    resulting ``<filename>.ts`` files are joined with ffmpeg's ``concat:``
    protocol into ``<target_name><suffix>.<out_ext>``.

    A segment whose ``duration`` is ``""`` gets it computed as the difference
    between the next segment's ``part_start`` and its own; the value is
    written back into the segment dict. The ``parts`` list itself is not
    modified.

    Args:
        video: dict shaped like ``template_current_video``; ``parts`` holds
            dicts shaped like ``template_video_part``.

    Raises:
        Exception: fewer than two segments, a segment whose ``part_start`` is
            ``None``, or a computed duration of 60 seconds or less. All of
            these are detected before any ffmpeg process is started.
        subprocess.CalledProcessError: an ffmpeg invocation exited non-zero.
    """
    parts = list(video["parts"])
    if len(parts) <= 1:
        raise Exception("视频片段不够")
    if any(part["part_start"] is None for part in parts):
        raise Exception("视频片段part_start读取异常")

    # Resolve every missing duration up front so that bad input is rejected
    # before any (slow) transcoding work has been done.
    for current, following in zip(parts, parts[1:]):
        if current["duration"] == "":
            duration = float(following["part_start"]) - float(current["part_start"])
            if duration <= 60:
                raise Exception("视频片段duration过小")
            current["duration"] = str(duration)

    ts_part = []
    try:
        for part in parts[:-1]:
            source = "{filename}.{file_ext}".format_map(part)
            trimmed = "{filename}.ts".format_map(part)
            scratch = "{}.ts".format(time.time())
            # Registered before creation so a half-written file is cleaned up.
            ts_part.append(trimmed)
            try:
                # Pass 1: drop everything past this segment's duration.
                _run_ffmpeg(
                    "-i", source, "-c", "copy", "-f", "mpegts",
                    "-t", str(part["duration"]), scratch,
                )
                # Pass 2: apply the optional start/end trim.
                _run_ffmpeg(
                    "-i", scratch, "-c", "copy", "-f", "mpegts",
                    *_trim_options(part), trimmed,
                )
            finally:
                _remove_if_present(scratch)

        # The last segment has no successor, so it is only trimmed.
        last = parts[-1]
        last_ts = "{filename}.ts".format_map(last)
        ts_part.append(last_ts)
        _run_ffmpeg(
            "-i", "{filename}.{file_ext}".format_map(last),
            "-c", "copy", "-f", "mpegts",
            *_trim_options(last), last_ts,
        )

        _run_ffmpeg(
            "-i", "concat:{}".format("|".join(ts_part)),
            "-c", "copy", "-f", video["pack_ext"],
            "{target_name}{suffix}.{out_ext}".format_map(video),
        )
    finally:
        # Intermediate files are removed on success and on failure alike.
        for leftover in ts_part:
            _remove_if_present(leftover)
def get_video_part_start(file):
    """Return the ``start`` timestamp ffprobe reports for *file*.

    ffprobe prints its stream summary on stderr; the first occurrence of
    ``", start: <seconds>"`` in that output is extracted.

    Args:
        file: path of the media file to probe.

    Returns:
        The start time as the decimal string printed by ffprobe, or ``None``
        when the output contains no start time (including when ffprobe fails,
        e.g. because the file does not exist).
    """
    # run() reads stderr to EOF and waits for the process, so a failing
    # ffprobe simply yields output without a match instead of being polled
    # in a loop that can never end.
    probe = subprocess.run(
        ["ffprobe", file],
        stderr=subprocess.PIPE,
        universal_newlines=True,
        # ffprobe echoes file metadata, which may not be valid in the locale
        # encoding; undecodable bytes must not abort the probe.
        errors="replace",
    )
    match = re.search(r", start: (\d+\.\d+)", probe.stderr)
    return match.group(1) if match else None
# Driver: read the job list and merge every record it describes.
#
# Records are groups of lines closed by a blank line:
#   * a line containing "~" lists the segments, separated by "~"; each entry
#     is "<name>[.<ext>][+<time>]". The "+<time>" of the first segment becomes
#     its trim start, that of any later segment becomes its trim end.
#   * any other line is the output base name; it must follow the segment line,
#     otherwise the record is skipped.
#   * a line containing "-" marks the whole record as skipped.
#   * a line starting with "=====" ends processing.
with open("../a.txt", "r", encoding="gbk") as f:
    skip_current = False
    current_video = copy.deepcopy(template_current_video)
    for current_line in f:
        current_line = current_line.strip()

        if current_line == "":
            # A blank line closes the record collected so far.
            if skip_current:
                skip_current = False
            elif current_video["parts"]:
                merge_video_parts_and_split(current_video)
            # else: nothing was collected (leading or repeated blank line);
            # there is no record to merge, so do not abort the batch.
            current_video = copy.deepcopy(template_current_video)
            continue

        if current_line.startswith("=" * 5):
            break

        if "-" in current_line:
            skip_current = True
            continue

        if "~" in current_line:
            for file_param in current_line.split("~"):
                video_part = copy.deepcopy(template_video_part)
                bias_time = ""
                if "+" in file_param:
                    filename, bias_time = file_param.split("+", 1)
                else:
                    filename = file_param
                if "." in filename:
                    filename, file_ext = filename.split(".", 1)
                    video_part["file_ext"] = file_ext
                video_part["filename"] = filename

                if current_video["suffix"] == "":
                    # Suffix = text before the first "_" of the first segment
                    # name, minus its first four characters. partition() also
                    # accepts names with several (or no) underscores, which a
                    # two-way unpack of split("_") rejects with ValueError.
                    prefix = filename.partition("_")[0]
                    current_video["suffix"] = "_" + prefix[4:]

                if not current_video["parts"]:
                    video_part["start"] = bias_time
                else:
                    video_part["end"] = bias_time
                video_part["part_start"] = get_video_part_start(
                    "{filename}.{file_ext}".format_map(video_part)
                )
                current_video["parts"].append(video_part)
            continue

        # Remaining case: the output name line.
        if not current_video["parts"]:
            # A name that precedes its segment line invalidates the record.
            skip_current = True
            continue
        current_video["target_name"] = current_line
    # NOTE(review): a record that is not followed by a blank line (at end of
    # file or right before "=====") is never merged; confirm this is intended.