You've already forked lubo_toolkit
							
							
		
			
				
	
	
		
			671 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			671 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # 工作流
 | |
| import os.path
 | |
| import platform
 | |
| import subprocess
 | |
| import sys
 | |
| import threading
 | |
| from hashlib import md5
 | |
| from typing import Optional, IO, Union
 | |
| 
 | |
| from datetime import datetime, timedelta
 | |
| from PyQt5 import QtGui
 | |
| from PyQt5.QtCore import Qt, QThread, pyqtSignal
 | |
| from PyQt5.QtWidgets import QWidget, QLabel, QApplication, QFrame, QVBoxLayout, QPushButton, \
 | |
|     QSizePolicy, QMessageBox
 | |
| from danmaku_xml_helper import get_file_start, diff_danmaku_files, NoDanmakuException
 | |
| from config import load_config, FFMPEG_EXEC, DANMAKU_FACTORY_EXEC, FFMPEG_USE_INTEL_GPU, FFMPEG_USE_NVIDIA_GPU, \
 | |
|     VIDEO_BITRATE, VIDEO_CLIP_EACH_SEC, VIDEO_CLIP_OVERFLOW_SEC, VIDEO_RESOLUTION, DANMAKU_SPEED, DEFAULT_FONT_NAME, \
 | |
|     VIDEO_OUTPUT_DIR
 | |
| 
 | |
| 
 | |
class Job:
    """Plain record describing one unit of work taken from a label.

    ``type`` selects what the worker does with ``video``: ``DANMAKU_ENCODE``
    routes it through the danmaku encoding path, ``PURE_SPLIT`` only cuts it
    into clips.
    """

    # Job kinds understood by the worker thread.
    DANMAKU_ENCODE = 0
    PURE_SPLIT = 1

    def __init__(self):
        super().__init__()
        self.video = None
        self.type = self.PURE_SPLIT
        # Danmaku input files and the subtitle files generated from them.
        self.danmaku: list[str] = []
        self.subtitles: list[str] = []

    def __repr__(self):
        return f"Job对象:Video[{self.video}];Type[{self.type}]"
 | |
| 
 | |
| 
 | |
class WorkLabel(QLabel):
    """Label that accepts dropped files and holds the inputs of one job.

    A dropped ``.mp4``/``.flv`` file becomes the work video and every dropped
    ``.xml`` file is appended to the danmaku list.  The label text shows the
    current inputs; the background colour shows the idle/running/finished
    state.
    """

    def __init__(self, *args, **kwargs):
        super(WorkLabel, self).__init__(*args, **kwargs)
        self.workVideo = None
        self.workDanmaku: list[str] = []
        self.running: bool = False
        self.finished: bool = False
        self.init_ui()

    def _update_text(self):
        """Rebuild the label text from the current video and danmaku list."""
        if self.workVideo is None:
            text = "请拖入视频"
        else:
            text = "工作视频:{}".format(self.workVideo)
        if len(self.workDanmaku) == 0:
            self.setText("{}\n分割视频模式".format(text))
        else:
            self.setText("{}\n压制弹幕模式\n基准弹幕:{}".format(text, "\n附加弹幕:".join(self.workDanmaku)))
        self.adjustSize()

    def mouseDoubleClickEvent(self, a0: QtGui.QMouseEvent) -> None:
        """Reset the label step by step on double click.

        A finished label becomes idle again, a label with a video is cleared,
        and an already empty label removes itself from its parent.
        """
        if self.finished:
            self.finished = False
        elif self.workVideo is None:
            self.parent().labelDestroy.emit(self)
            return self.deleteLater()
        else:
            self.workVideo = None
            self.workDanmaku = []
        self.init_ui()

    def init_ui(self):
        """Apply the idle look (white box) and enable drops."""
        font = QtGui.QFont()
        font.setPixelSize(12)
        self.setFont(font)
        self.setFrameShape(QFrame.Box)
        self.setStyleSheet("""QLabel {
            background-color: white;
        }""")
        self.setAlignment(Qt.AlignTop)
        self.setWordWrap(True)
        self.setMinimumHeight(64)
        self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
        self._update_text()
        self.setAcceptDrops(True)

    def dragEnterEvent(self, a0: QtGui.QDragEnterEvent) -> None:
        """Accept the drag only when it carries URLs."""
        if a0.mimeData().hasUrls():
            a0.accept()
        else:
            a0.ignore()
        print("Enter Label", a0.mimeData().urls())

    def dropEvent(self, a0: QtGui.QDropEvent) -> None:
        """Register every dropped file as the video or as a danmaku file."""
        for url in a0.mimeData().urls():
            # toLocalFile() yields a usable native path on every platform;
            # stripping "/" from url.path() made absolute POSIX paths relative.
            path = url.toLocalFile()
            lowered = path.lower()
            try:
                # Decide by extension whether this is a video or a danmaku file.
                if lowered.endswith((".mp4", ".flv")):
                    self.set_video(path)
                elif lowered.endswith(".xml"):
                    self.add_danmaku(path)
                else:
                    print("Unknown File", path)
            except FileNotFoundError as err:
                # Do not let an exception escape from a Qt event handler.
                print("File Not Found", err)
        self._update_text()

    def set_video(self, file):
        """Set the work video.

        Raises:
            FileNotFoundError: if ``file`` is not an existing file.
        """
        if not os.path.isfile(file):
            raise FileNotFoundError(file)
        if file == self.workVideo:
            return print("Same Video", file)
        self.workVideo = file
        self._update_text()

    def add_danmaku(self, file: str) -> None:
        """Append a danmaku file unless it is already listed.

        Raises:
            FileNotFoundError: if ``file`` is not an existing file.
        """
        if not os.path.isfile(file):
            raise FileNotFoundError(file)
        if file in self.workDanmaku:
            return print("Already Added File", file)
        self.workDanmaku.append(file)
        self._update_text()

    def have_job(self) -> bool:
        """Return True when a video is set and the label is neither running nor finished."""
        return self.workVideo is not None and not self.running and not self.finished

    def get_job(self) -> Job:
        """Build a ``Job`` snapshot from the label's current inputs."""
        job = Job()
        job.video = self.workVideo
        if len(self.workDanmaku) > 0:
            job.type = Job.DANMAKU_ENCODE
            # Hand out a copy so the worker cannot alter the label's own list.
            job.danmaku = list(self.workDanmaku)
        else:
            job.type = Job.PURE_SPLIT
            job.danmaku = []
        return job

    def start_running(self) -> None:
        """Mark the label as running (red background)."""
        self.running = True
        self.finished = False
        self.setStyleSheet("""QLabel {
            color: white;
            background-color: red;
        }""")

    def stop_running(self) -> None:
        """Mark the label as finished (green background)."""
        self.running = False
        self.finished = True
        self.setStyleSheet("""QLabel {
            color: white;
            background-color: green;
        }""")
 | |
| 
 | |
| 
 | |
class HomePage(QWidget):
    """Main window: a column of ``WorkLabel`` jobs plus a start button.

    Jobs are processed one at a time; when a worker finishes, the next label
    that still has a job is picked up automatically.  The ``process*`` signals
    carry progress text from the worker and are shown on the button.
    """

    showMessageBox = pyqtSignal(str, str)
    labelDestroy = pyqtSignal(WorkLabel)
    processCurTime = pyqtSignal(str)
    processSpeed = pyqtSignal(str)
    processCurTime2 = pyqtSignal(str)
    processSpeed2 = pyqtSignal(str)

    def __init__(self):
        super(HomePage, self).__init__()
        self.layout = None
        self.labels: list[WorkLabel] = []
        # Currently running worker, or None when idle.
        self.worker: Optional[WorkerThread] = None
        self.btn_start: QPushButton
        self.showMessageBox.connect(self.on_show_message_box_info)
        self.labelDestroy.connect(self.on_label_destroy)
        self.processCurTime.connect(self.on_process_cur_time_change)
        self.processSpeed.connect(self.on_process_speed_change)
        self.processCurTime2.connect(self.on_process_cur_time2_change)
        self.processSpeed2.connect(self.on_process_speed2_change)
        self.process_cur_time = "-"
        self.process_speed = "-"
        self.process_cur_time2 = ""
        self.process_speed2 = ""
        self.cur_clip_duration = 0
        self.init_ui()

    def init_ui(self):
        """Create the label container and the start button, then show the window."""
        layout = QVBoxLayout()
        self.layout = QVBoxLayout()
        layout.addLayout(self.layout)

        btn_start = QPushButton(self)
        btn_start.setText("开始")
        btn_start.clicked.connect(self.handle_do)
        btn_start.setGeometry(0, 560, 400, 40)
        layout.addWidget(btn_start, 1, Qt.AlignBottom)
        self.setLayout(layout)

        self.btn_start = btn_start
        self.resize(400, 600)
        self.setWindowTitle("录播工作流")
        self.setAcceptDrops(True)
        self.show()

    def add_label(self):
        """Append a new empty ``WorkLabel`` and return it."""
        label1 = WorkLabel(self)
        self.layout.addWidget(label1)
        self.labels.append(label1)
        return label1

    def handle_do(self):
        """Start a worker for the first label that has a pending job."""
        if self.worker is not None and self.worker.isRunning():
            # Only one job is processed at a time.
            return
        if len(self.labels) == 0:
            return self.on_show_message_box_warn("提示", "请添加任务")
        for label in self.labels:
            if not label.have_job():
                continue
            _thread = WorkerThread(self, label)
            # Connect before starting, otherwise ``started`` (or even
            # ``finished``) may fire before a slot is attached.
            _thread.started.connect(self.on_worker_start)
            _thread.finished.connect(self.on_worker_stop)
            self.worker = _thread
            _thread.start()
            break

    def dragEnterEvent(self, a0: QtGui.QDragEnterEvent) -> None:
        """Accept the drag only when it carries URLs."""
        if a0.mimeData().hasUrls():
            a0.accept()
        else:
            a0.ignore()

    def dropEvent(self, a0: QtGui.QDropEvent) -> None:
        """Create a new label and let it consume the dropped files."""
        label1 = self.add_label()
        label1.dropEvent(a0)

    def on_show_message_box_info(self, title: str, content: str):
        QMessageBox.information(self, title, content, QMessageBox.Yes)

    def on_show_message_box_warn(self, title: str, content: str):
        QMessageBox.warning(self, title, content, QMessageBox.Yes)

    def on_label_destroy(self, label: WorkLabel):
        """Forget a label that is removing itself."""
        if label in self.labels:
            self.labels.remove(label)

    def on_worker_start(self):
        """Lock the start button while a worker is running."""
        self.btn_start.setDisabled(True)
        self.btn_start.setText("正在处理")

    def on_process_cur_time_change(self, s: str) -> None:
        if self.process_cur_time == s:
            return
        self.process_cur_time = s
        self.update_btn_process_text()

    def on_process_speed_change(self, s: str) -> None:
        if self.process_speed == s:
            return
        self.process_speed = s
        self.update_btn_process_text()

    def on_process_cur_time2_change(self, s: str) -> None:
        if self.process_cur_time2 == s:
            return
        self.process_cur_time2 = s
        self.update_btn_process_text()

    def on_process_speed2_change(self, s: str) -> None:
        if self.process_speed2 == s:
            return
        self.process_speed2 = s
        self.update_btn_process_text()

    def update_btn_process_text(self):
        """Show ``time@speed`` for each progress channel that has both values."""
        _t = []
        if self.process_cur_time != "" and self.process_speed != "":
            _t.append("{}@{}".format(self.process_cur_time, self.process_speed))
        if self.process_cur_time2 != "" and self.process_speed2 != "":
            _t.append("{}@{}".format(self.process_cur_time2, self.process_speed2))
        if len(_t) == 0:
            self.btn_start.setText("Working")
        else:
            self.btn_start.setText("|".join(_t))

    def on_worker_stop(self):
        """Unlock the button and continue with the next pending job, if any."""
        self.btn_start.setDisabled(False)
        self.btn_start.setText("开始")
        self.worker = None
        self.handle_do()
 | |
| 
 | |
| 
 | |
class WorkerThread(QThread):
    """Background thread that runs one label's job through ffmpeg.

    Progress reported by ffmpeg (``-progress -``) is forwarded to the
    ``HomePage`` through its ``processCurTime*``/``processSpeed*`` signals.
    """

    def __init__(self, app: HomePage, label: WorkLabel):
        super(WorkerThread, self).__init__()
        self.app = app
        self.label = label

    def get_video_real_duration(self, filename) -> float:
        """Return the duration of ``filename`` in seconds.

        The file is stream-copied to the null muxer and the last ``out_time``
        reported by ffmpeg is taken as the duration.
        """
        ffmpeg_process = subprocess.Popen([
            FFMPEG_EXEC, "-hide_banner", "-progress", "-", "-loglevel", "error", "-i", filename, "-c", "copy", "-f",
            "null", "-"
        ], **subprocess_args(True))
        _duration_str = self.handle_ffmpeg_output(ffmpeg_process.stdout)
        ffmpeg_process.wait()
        return duration_str_to_float(_duration_str)

    def run(self) -> None:
        """Execute the label's job according to its type."""
        # NOTE(review): start_running()/stop_running() change the label's style
        # sheet from this worker thread - confirm this is safe for the GUI.
        self.label.start_running()
        job = self.label.get_job()
        if job.type == Job.DANMAKU_ENCODE:
            self.run_danmaku_encode(job)
        elif job.type == Job.PURE_SPLIT:
            self.quick_split_video(job.video)
        self.label.stop_running()

    def run_danmaku_encode(self, job: Job):
        """Convert the job's danmaku files to subtitles, encode and split the video.

        The first danmaku file is the base; every further file is shifted by
        its offset to the base.  Nothing is done when the base file contains
        no danmaku.
        """
        if not job.danmaku:
            return
        # Unpack instead of pop(0) so the caller's list is left untouched.
        base_danmaku, *extra_danmaku = job.danmaku
        time_shift = 0
        try:
            base_start_ts = get_file_start(base_danmaku)
        except NoDanmakuException:
            return
        job.subtitles.append(danmaku_to_subtitle(base_danmaku, time_shift))
        for danmaku in extra_danmaku:
            try:
                time_shift = diff_danmaku_files(base_danmaku, danmaku)
            except NoDanmakuException:
                continue
            job.subtitles.append(danmaku_to_subtitle(danmaku, time_shift))
        try:
            # Encode
            split_files = self.multi_gpu_encode_video_with_subtitles(job.video, job.subtitles, base_start_ts)
            for file in split_files:
                self.quick_split_video(file)
        finally:
            # Temporary subtitle files are removed even when encoding fails.
            for _f in job.subtitles:
                if os.path.isfile(_f):
                    os.remove(_f)

    def encode_video_with_subtitles(self, orig_filename: str, subtitles: list[str], new_filename: str):
        """Burn ``subtitles`` into ``orig_filename`` and write ``new_filename``.

        The encoder is chosen from the configuration: NVENC, Intel QSV
        (Windows) / VAAPI (elsewhere), or the CPU ``h264`` encoder.

        Returns:
            The ffmpeg exit code.
        """
        subtitle_filter = ",".join("subtitles=%s" % i for i in subtitles)
        hwaccel_args = []            # options that must precede "-i"
        tune_args = ["-tune:v", "hq"]
        if FFMPEG_USE_NVIDIA_GPU:
            print("[+]Use Nvidia NvEnc Acceleration")
            video_filter = subtitle_filter + ",hwupload_cuda"
            video_codec = "h264_nvenc"
        elif FFMPEG_USE_INTEL_GPU:
            if platform.system().lower() == "windows":
                print("[+]Use Intel QSV Acceleration")
                hwaccel_args = ["-hwaccel", "qsv"]
                video_filter = subtitle_filter
                video_codec = "h264_qsv"
            else:
                print("[+]Use Intel VAAPI Acceleration")
                hwaccel_args = ["-hwaccel", "vaapi"]
                video_filter = subtitle_filter + ",hwupload"
                video_codec = "h264_vaapi"
        else:
            print("[+]Use CPU Encode")
            video_filter = subtitle_filter
            video_codec = "h264"
            tune_args = []           # the CPU command line carries no -tune:v
        encode_process = subprocess.Popen([
            FFMPEG_EXEC, "-hide_banner", "-progress", "-", "-loglevel", "error", "-y",
            *hwaccel_args, "-i", orig_filename, "-vf", video_filter,
            "-c:a", "copy", "-c:v", video_codec,
            "-f", "mp4", "-preset:v", "fast", "-profile:v", "high", "-level", "4.1",
            "-b:v", VIDEO_BITRATE, "-rc:v", "vbr", *tune_args,
            "-qmin", "10", "-qmax", "32", "-crf", "16",
            "-fflags", "+genpts", "-shortest", "-movflags", "faststart",
            new_filename
        ], **subprocess_args(True))
        self.handle_ffmpeg_output(encode_process.stdout)
        return encode_process.wait()

    def _dual_gpu_encode(self, orig_filename: str, subtitles: list[str], tail_start, head_skip,
                         tail_fullpath: str, head_fullpath: str) -> None:
        """Run a QSV and an NVENC ffmpeg process side by side and wait for both."""
        subtitle_filter = ",".join("subtitles=%s" % i for i in subtitles)
        print("[+]Use Intel QSV Acceleration")
        qsv_process = subprocess.Popen([
            FFMPEG_EXEC, "-hide_banner", "-progress", "-", "-loglevel", "error", "-y",
            "-hwaccel", "qsv", "-ss", str(tail_start),
            "-copyts", "-i", orig_filename, "-vf",
            subtitle_filter,
            "-c:a", "copy", "-c:v", "h264_qsv",
            "-f", "mp4", "-b:v", VIDEO_BITRATE, "-rc:v", "vbr", "-tune:v", "hq",
            *_common_ffmpeg_params(),
            tail_fullpath
        ], **subprocess_args(True))
        print("[+]Use Nvidia NvEnc Acceleration")
        nvenc_process = subprocess.Popen([
            FFMPEG_EXEC, "-hide_banner", "-progress", "-", "-loglevel", "error", "-y",
            "-t", str(tail_start + (VIDEO_CLIP_OVERFLOW_SEC * 0.8)),
            "-i", orig_filename, "-vf",
            subtitle_filter,
            "-c:a", "copy", "-c:v", "h264_nvenc", "-ss", str(head_skip),
            "-f", "mp4", "-b:v", VIDEO_BITRATE, "-rc:v", "vbr", "-tune:v", "hq",
            *_common_ffmpeg_params(),
            head_fullpath
        ], **subprocess_args(True))
        # Both processes write progress to a pipe; each pipe has to be drained
        # or the process blocks once the pipe buffer is full.
        qsv_reader = threading.Thread(
            target=self.handle_ffmpeg_output, args=(qsv_process.stdout, True), daemon=True
        )
        qsv_reader.start()
        self.handle_ffmpeg_output(nvenc_process.stdout)
        qsv_reader.join()
        nvenc_process.wait()
        qsv_process.wait()

    def multi_gpu_encode_video_with_subtitles(self, orig_filename: str, subtitles: list[str], base_ts: float):
        """Encode with both GPUs when enabled and the video is long enough.

        Returns:
            A list of paths (inside ``VIDEO_OUTPUT_DIR``) that still have to be
            split into clips.
        """
        new_filename = base_ts_to_filename(base_ts)
        new_fullpath = os.path.join(VIDEO_OUTPUT_DIR, new_filename)
        if not (FFMPEG_USE_NVIDIA_GPU and FFMPEG_USE_INTEL_GPU):
            print("[!]Not Enabled Both GPU")
            self.encode_video_with_subtitles(orig_filename, subtitles, new_fullpath)
            return [new_fullpath]
        # get_video_real_duration() already returns seconds as a float.
        duration = self.get_video_real_duration(orig_filename)
        if duration > (VIDEO_CLIP_EACH_SEC * 5):
            # Original note: QSV encodes the first 2 segments, NVENC the rest.
            # NOTE(review): the command lines below seek QSV to the tail of the
            # video instead - confirm which split is intended.
            _slices = int(duration / VIDEO_CLIP_EACH_SEC)
            tail_start = (_slices - 1) * VIDEO_CLIP_EACH_SEC
            new_fullpath0 = os.path.join(VIDEO_OUTPUT_DIR, base_ts_to_filename(base_ts + tail_start))
            new_fullpath1 = os.path.join(VIDEO_OUTPUT_DIR, base_ts_to_filename(base_ts))
            self._dual_gpu_encode(orig_filename, subtitles, tail_start, VIDEO_CLIP_EACH_SEC * 2,
                                  new_fullpath0, new_fullpath1)
            # Return the paths that were actually written, not bare file names.
            return [new_fullpath0, new_fullpath1]
        elif duration > (VIDEO_CLIP_EACH_SEC * 3):
            # Dual-GPU encoding is only used when at least 2 slices can be cut.
            _slices = int(duration / VIDEO_CLIP_EACH_SEC)
            tail_start = _slices * VIDEO_CLIP_EACH_SEC
            new_fullpath0 = os.path.join(VIDEO_OUTPUT_DIR, base_ts_to_filename(base_ts + tail_start, True))
            new_fullpath1 = os.path.join(VIDEO_OUTPUT_DIR, base_ts_to_filename(base_ts))
            self._dual_gpu_encode(orig_filename, subtitles, tail_start, VIDEO_CLIP_EACH_SEC,
                                  new_fullpath0, new_fullpath1)
            # Only the NVENC output is handed on for splitting, as before.
            return [new_fullpath1]
        else:
            print("[-]VideoClip to short,", duration)
            print("[-]Fallback to normal")
            self.encode_video_with_subtitles(orig_filename, subtitles, new_fullpath)
            return [new_fullpath]

    def quick_split_video(self, file):
        """Cut ``file`` into clips by stream copy.

        The file name (without extension) must be a ``%Y%m%d_%H%M`` timestamp;
        each clip is named after the timestamp of its start and written to the
        current working directory.

        Raises:
            FileNotFoundError: if ``file`` does not exist.
            ValueError: if the file name is not a ``%Y%m%d_%H%M`` timestamp.
        """
        if not os.path.isfile(file):
            raise FileNotFoundError(file)
        file_name = os.path.split(file)[-1]
        _create_dt = os.path.splitext(file_name)[0]
        create_dt = datetime.strptime(_create_dt, "%Y%m%d_%H%M")
        duration = self.get_video_real_duration(file)
        current_sec = 0
        while current_sec < duration:
            if (current_sec + VIDEO_CLIP_OVERFLOW_SEC * 2) > duration:
                print("[-]Less than 2 overflow sec, skip")
                break
            current_dt = (create_dt + timedelta(seconds=current_sec)).strftime("%Y%m%d_%H%M_")
            print("CUR_DT", current_dt)
            print("BIAS_T", current_sec)
            split_process = subprocess.Popen([
                FFMPEG_EXEC, "-y", "-hide_banner", "-progress", "-", "-loglevel", "error",
                "-ss", str(current_sec),
                # Use the path that was checked above, not just its base name.
                "-i", file, "-c", "copy", "-f", "mp4",
                "-t", str(VIDEO_CLIP_EACH_SEC + VIDEO_CLIP_OVERFLOW_SEC),
                "-fflags", "+genpts", "-shortest", "-movflags", "faststart",
                "{}.mp4".format(current_dt)
            ], **subprocess_args(True))
            self.handle_ffmpeg_output(split_process.stdout)
            # Reap the process before starting the next clip.
            split_process.wait()
            current_sec += VIDEO_CLIP_EACH_SEC

    def handle_ffmpeg_output(self, stdout: Optional[IO[bytes]], second=False) -> str:
        """Read ffmpeg ``-progress`` output and forward it to the window.

        Args:
            stdout: The process's stdout pipe, or None.
            second: Emit on the ``*2`` signals instead of the primary ones.

        Returns:
            The last ``out_time`` value seen (``"0:0:0.0"`` when none was).
        """
        out_time = "0:0:0.0"
        if stdout is None:
            return out_time
        if second:
            time_signal, speed_signal = self.app.processCurTime2, self.app.processSpeed2
        else:
            time_signal, speed_signal = self.app.processCurTime, self.app.processSpeed
        while True:
            line = stdout.readline()
            if line == b"":
                break
            if line.strip() == b"progress=end":
                # Processing finished
                break
            if line.startswith(b"out_time="):
                value = line.replace(b"out_time=", b"").decode().strip()
                # Keep the last valid timestamp when no time is reported.
                if value != "N/A":
                    out_time = value
                    time_signal.emit(out_time)
            if line.startswith(b"speed="):
                speed = line.replace(b"speed=", b"").decode().strip()
                speed_signal.emit(speed)
        # Clear this channel's progress display.
        speed_signal.emit("")
        time_signal.emit("")
        return out_time
 | |
| 
 | |
| 
 | |
class NvencWorkerThread(QThread):
    """Empty placeholder; no behaviour is implemented here."""
 | |
| 
 | |
| 
 | |
class IntelWorkerThread(QThread):
    """Empty placeholder; no behaviour is implemented here."""
 | |
| 
 | |
| 
 | |
class SplitWorkerThread(QThread):
    """Empty placeholder; no behaviour is implemented here."""
 | |
| 
 | |
| 
 | |
def duration_str_to_float(duration_str) -> float:
    """Convert an ``H:M:S[.fraction]`` duration string to seconds.

    Unlike a ``strptime("%H:%M:%S.%f")`` based parser, the hour field is not
    limited to 0-23, so durations of a day or more are supported.  A leading
    ``-`` yields a negative result.

    Raises:
        ValueError: if the string is not of the form ``H:M:S``.
    """
    text = duration_str.strip()
    negative = text.startswith("-")
    text = text.lstrip("+-")
    try:
        hours, minutes, seconds = text.split(":")
        total = int(hours) * 3600 + int(minutes) * 60 + float(seconds)
    except ValueError as err:
        raise ValueError("invalid duration string: {!r}".format(duration_str)) from err
    return -total if negative else total
 | |
| 
 | |
| 
 | |
def base_ts_to_filename(start_ts: float, is_mp4=False) -> str:
    """Format a timestamp as a ``YYYYmmdd_HHMM`` file name.

    The timestamp is interpreted by ``datetime.fromtimestamp``; the extension
    is ``.mp4`` when ``is_mp4`` is true and ``.flv`` otherwise.
    """
    pattern = "%Y%m%d_%H%M.mp4" if is_mp4 else "%Y%m%d_%H%M.flv"
    return datetime.fromtimestamp(start_ts).strftime(pattern)
 | |
| 
 | |
| 
 | |
def danmaku_to_subtitle(file: Union[os.PathLike[str], str], time_shift: float):
    """Convert a danmaku file to an ``.ass`` subtitle with the external converter.

    Args:
        file: Path of the danmaku input file.
        time_shift: Value passed to the converter's ``-t`` option.

    Returns:
        The output file name: the MD5 hex digest of the input path plus
        ``.ass``.  It is a bare name, passed as-is to the converter.
    """
    # Accept path-like objects as the annotation promises.
    file = os.fspath(file)
    new_subtitle_name = md5(file.encode("utf-8")).hexdigest() + ".ass"
    process = subprocess.Popen((
        DANMAKU_FACTORY_EXEC, "--ignore-warnings",
        "-r", str(VIDEO_RESOLUTION), "-s", str(DANMAKU_SPEED), "-f", "5",
        "-S", "40", "-N", str(DEFAULT_FONT_NAME), "--showmsgbox", "FALSE",
        "-O", "255", "-L", "1", "-D", "0",
        "-o", "ass", new_subtitle_name, "-i", file, "-t", str(time_shift)
    ), **subprocess_args(True))
    # communicate() drains stdout/stderr; a plain wait() can block forever
    # once the child has filled one of the pipes.
    process.communicate()
    return new_subtitle_name
 | |
| 
 | |
| 
 | |
| # Create a set of arguments which make a ``subprocess.Popen`` (and
 | |
| # variants) call work with or without Pyinstaller, ``--noconsole`` or
 | |
| # not, on Windows and Linux. Typical use::
 | |
| #
 | |
| #   subprocess.call(['program_to_run', 'arg_1'], **subprocess_args())
 | |
| #
 | |
| # When calling ``check_output``::
 | |
| #
 | |
| #   subprocess.check_output(['program_to_run', 'arg_1'],
 | |
| #                           **subprocess_args(False))
 | |
def subprocess_args(include_stdout=True):
    """Return keyword arguments for ``subprocess.Popen`` and its variants.

    stdin and stderr are always redirected to pipes; stdout is redirected
    only when ``include_stdout`` is true, because ``subprocess.check_output``
    rejects an explicit ``stdout`` argument.
    """
    startup_info = None
    environment = None
    # STARTUPINFO exists only on Windows.
    if hasattr(subprocess, 'STARTUPINFO'):
        # Suppress the console window that would otherwise pop up when the
        # program runs from a PyInstaller ``--noconsole`` build.
        startup_info = subprocess.STARTUPINFO()
        startup_info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        # Windows doesn't search the path by default; pass the environment
        # so it will.
        environment = os.environ

    kwargs = {'stdout': subprocess.PIPE} if include_stdout else {}

    # A ``--noconsole`` PyInstaller binary on Windows needs every standard
    # stream redirected, otherwise Popen fails with
    # "[Error 6] the handle is invalid."
    kwargs['stdin'] = subprocess.PIPE
    kwargs['stderr'] = subprocess.PIPE
    kwargs['startupinfo'] = startup_info
    kwargs['env'] = environment
    return kwargs
 | |
| 
 | |
| 
 | |
def check_exec(name: Union[os.PathLike[str], str]) -> bool:
    """Return True when ``name`` resolves to an executable.

    ``shutil.which`` performs the lookup on every platform, so no helper
    process is spawned and nothing is written to the console.
    """
    import shutil  # local import: only this function needs it

    return shutil.which(os.fspath(name)) is not None
 | |
| 
 | |
| 
 | |
def is_windows() -> bool:
    """Tell whether ``platform.system()`` reports Windows."""
    system_name = platform.system()
    return system_name.lower() == "windows"
 | |
| 
 | |
| 
 | |
def is_linux() -> bool:
    """Tell whether ``platform.system()`` reports Linux."""
    system_name = platform.system()
    return system_name.lower() == "linux"
 | |
| 
 | |
| 
 | |
def check_all_prerequisite():
    """Exit with status 1 when a required external tool cannot be found.

    The message is shown with ``input()`` so it stays visible until the user
    presses Enter.
    """
    if not check_exec(DANMAKU_FACTORY_EXEC):
        input("弹幕处理工具不存在")
        # sys.exit is always available; the ``exit`` builtin is injected by
        # the ``site`` module and is meant for interactive use only.
        sys.exit(1)
    if not check_exec(FFMPEG_EXEC):
        input("FFMPEG工具不存在")
        sys.exit(1)
 | |
| 
 | |
| 
 | |
| def _common_ffmpeg_params():
 | |
|     return (
 | |
|         "-preset:v", "fast", "-profile:v", "high", "-level", "4.1",
 | |
|         "-qmin", "10", "-qmax", "48", "-crf", "26",
 | |
|         "-fflags", "+genpts", "-shortest"
 | |
|     )
 | |
| 
 | |
| 
 | |
def main():
    """Check the external tools, then run the Qt application until it exits."""
    check_all_prerequisite()
    qt_app = QApplication(sys.argv)
    # Keep a reference to the window for the lifetime of the event loop.
    window = HomePage()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Script entry point: load the configuration, then start the application.
    load_config()
    main()
 |