You've already forked my-video-workflow
初次提交
This commit is contained in:
0
workflow/__init__.py
Normal file
0
workflow/__init__.py
Normal file
61
workflow/danmaku.py
Normal file
61
workflow/danmaku.py
Normal file
@ -0,0 +1,61 @@
|
||||
import datetime
|
||||
import os
|
||||
import argparse
|
||||
import subprocess
|
||||
from hashlib import md5
|
||||
from typing import Union
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from config import DANMAKU_FACTORY_EXEC, VIDEO_RESOLUTION, DANMAKU_SPEED, DEFAULT_FONT_NAME
|
||||
from exception.danmaku import NoDanmakuException, DanmakuFormatErrorException
|
||||
from util.file import check_file_exist
|
||||
|
||||
|
||||
def get_file_start(file: Union[os.PathLike[str], str]) -> float:
|
||||
with open(file, "r", encoding="utf-8") as f:
|
||||
soup = BeautifulSoup("".join(f.readlines()), "lxml")
|
||||
danmaku_item = soup.find("d")
|
||||
if danmaku_item is None:
|
||||
# 没有弹幕?
|
||||
raise NoDanmakuException()
|
||||
danmaku_info = danmaku_item["p"]
|
||||
split_info = danmaku_info.split(",")
|
||||
if len(split_info) < 5:
|
||||
raise DanmakuFormatErrorException()
|
||||
bias_sec = float(split_info[0])
|
||||
bias_ts_ms = int(split_info[4])
|
||||
return bias_ts_ms / 1000 - bias_sec
|
||||
|
||||
|
||||
def diff_danmaku_files(base_file: Union[os.PathLike[str], str], file: Union[os.PathLike[str], str]) -> float:
|
||||
return get_file_start(file) - get_file_start(base_file)
|
||||
|
||||
|
||||
def danmaku_to_subtitle(file: Union[os.PathLike[str], str], time_shift: float):
|
||||
new_subtitle_name = md5(file).digest().hex() + ".ass"
|
||||
process = subprocess.Popen((
|
||||
DANMAKU_FACTORY_EXEC,
|
||||
"-r", str(VIDEO_RESOLUTION), "-s", str(DANMAKU_SPEED), "-f", "5",
|
||||
"-S", "40", "-N", str(DEFAULT_FONT_NAME), "--showmsgbox", "FALSE",
|
||||
"-O", "255", "-L", "1", "-D", "0",
|
||||
"-o", "ass", new_subtitle_name, "-i", file, "-t", str(time_shift)
|
||||
))
|
||||
process.wait()
|
||||
return new_subtitle_name
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("base", help="以此为标准")
|
||||
parser.add_argument("file", nargs="+", help="需要对齐的文件")
|
||||
args = parser.parse_args()
|
||||
check_file_exist(args.base)
|
||||
base_start_ts = get_file_start(args.base)
|
||||
base_start = datetime.datetime.fromtimestamp(base_start_ts)
|
||||
print("[+] 基准文件[{:s}],开始时间:{:.3f},时间:{}".format(args.base, base_start_ts, base_start))
|
||||
for _file in args.file:
|
||||
check_file_exist(_file)
|
||||
file_start_ts = get_file_start(_file)
|
||||
diff_sec = file_start_ts - base_start_ts
|
||||
print("[+] 待调整文件[{:s}],开始时间:{:.3f},偏差:{:.3f}".format(_file, file_start_ts, diff_sec))
|
13
workflow/queues.py
Normal file
13
workflow/queues.py
Normal file
@ -0,0 +1,13 @@
|
||||
from multiprocessing import Queue
|
||||
|
||||
from dto.DanmakuJobItem import DanmakuJobItem
|
||||
from dto.EncodeJobItem import EncodeJobItem
|
||||
from dto.SplitJobItem import SplitJobItem
|
||||
from dto.TypeJobResult import TypeJobResult
|
||||
from entity.WorkflowItem import WorkflowItem
|
||||
|
||||
JOB_EVENT_QUEUE: "Queue[TypeJobResult]" = Queue()
|
||||
JOB_QUEUE: "Queue[WorkflowItem]" = Queue()
|
||||
DANMAKU_PROCESSING_QUEUE: "Queue[DanmakuJobItem]" = Queue()
|
||||
VIDEO_ENCODING_QUEUE: "Queue[EncodeJobItem]" = Queue()
|
||||
VIDEO_SPLITING_QUEUE: "Queue[SplitJobItem]" = Queue()
|
16
workflow/video.py
Normal file
16
workflow/video.py
Normal file
@ -0,0 +1,16 @@
|
||||
import re
|
||||
import subprocess
|
||||
|
||||
|
||||
def get_video_real_duration(filename):
|
||||
ffmpeg_process = subprocess.Popen([
|
||||
"ffmpeg", "-hide_banner", "-i", filename, "-c", "copy", "-f", "null", "-"
|
||||
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
result = "0:0:0.0"
|
||||
for line in ffmpeg_process.stderr.readlines():
|
||||
match_result = re.findall("(?<= time=).+?(?= )", line.decode())
|
||||
if len(match_result) == 0:
|
||||
continue
|
||||
result = match_result.pop()
|
||||
return result
|
||||
|
Reference in New Issue
Block a user