# Workflow: burn danmaku (bullet comments) into recorded videos and split them.
import contextlib
import datetime
import os.path
import subprocess
import sys
from typing import Optional, IO, AnyStr

from PyQt5 import QtGui
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from PyQt5.QtWidgets import QWidget, QLabel, QApplication, QFrame, QVBoxLayout, QPushButton, \
    QSizePolicy, QMessageBox

from danmaku_xml_helper import get_file_start, diff_danmaku_files

DANMAKU_PATH = "workspace"
DANMAKU_FACTORY_CLI = "DFCLI.exe"
FFMPEG_TOOL = "ffmpeg.exe"
SPLIT_PATH = "split"
SPLIT_TOOL = "quick_split_video.exe"
HOME_PATH = os.path.realpath(".")

# Extensions (lower case) that a dropped file must have to be accepted.
_VIDEO_SUFFIXES = (".mp4", ".flv")
_DANMAKU_SUFFIXES = (".xml",)


@contextlib.contextmanager
def _working_directory(path):
    """Temporarily ``chdir`` into *path*; always return to ``HOME_PATH``.

    The external tools are started by bare file name and are expected to live
    in their own sub-directory, so the process working directory has to be
    switched while they run. The ``finally`` guarantees that a failure does
    not leave the whole application stuck in the wrong directory.
    """
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(HOME_PATH)


class Job:
    """Snapshot of the work described by one ``WorkLabel``.

    Attributes are plain data:
      video     -- path of the video to process (``None`` until set)
      type      -- ``DANMAKU_ENCODE`` or ``PURE_SPLIT``
      danmaku   -- danmaku XML paths; the first one is the timing reference
      subtitles -- names of the .ass files generated while the job runs
    """

    DANMAKU_ENCODE = 0
    PURE_SPLIT = 1

    def __init__(self):
        super(Job, self).__init__()
        self.video = None
        self.type = self.PURE_SPLIT
        self.danmaku: list[str] = []
        self.subtitles: list[str] = []

    def __repr__(self):
        return "Job对象:Video[{}];Type[{}]".format(self.video, self.type)


class WorkLabel(QLabel):
    """Drop target that collects one video plus optional danmaku files."""

    def __init__(self, *args, **kwargs):
        super(WorkLabel, self).__init__(*args, **kwargs)
        self.workVideo = None
        self.workDanmaku: list[str] = []
        self.running: bool = False
        self.finished: bool = False
        self.init_ui()

    def _update_text(self):
        """Refresh the label text from the current video / danmaku selection."""
        if self.workVideo is None:
            text = "请拖入视频"
        else:
            text = "工作视频:{}".format(self.workVideo)
        if len(self.workDanmaku) == 0:
            self.setText("{}\n分割视频模式".format(text))
        else:
            self.setText("{}\n压制弹幕模式\n基准弹幕:{}".format(text, "\n附加弹幕:".join(self.workDanmaku)))
        self.adjustSize()

    def mouseDoubleClickEvent(self, a0: QtGui.QMouseEvent) -> None:
        """Clear the label on double click; remove it if empty or finished."""
        if self.running:
            # A worker thread still references this label; resetting or
            # deleting it mid-run would corrupt the job state.
            return
        if self.workVideo is None or self.finished:
            self.parent().labelDestroy.emit(self)
            return self.deleteLater()
        self.workVideo = None
        self.workDanmaku = []
        self.init_ui()

    def init_ui(self):
        """Apply the idle look (white box) and enable drag-and-drop."""
        font = QtGui.QFont()
        font.setPixelSize(12)
        self.setFont(font)
        self.setFrameShape(QFrame.Box)
        self.setStyleSheet("""QLabel { background-color: white; }""")
        self.setAlignment(Qt.AlignTop)
        self.setWordWrap(True)
        self.setMinimumHeight(64)
        self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
        self._update_text()
        self.setAcceptDrops(True)

    def dragEnterEvent(self, a0: QtGui.QDragEnterEvent) -> None:
        """Accept the drag only when it carries URLs (files)."""
        if a0.mimeData().hasUrls():
            a0.accept()
        else:
            a0.ignore()
        print("Enter Label", a0.mimeData().urls())

    def dropEvent(self, a0: QtGui.QDropEvent) -> None:
        """Register every dropped file as video or danmaku by its extension."""
        for url in a0.mimeData().urls():
            # toLocalFile() is the Qt API for turning a file:// URL into a
            # native path; it is empty for non-local URLs, which then fall
            # through to the "Unknown File" branch.
            path = url.toLocalFile()
            lowered = path.lower()
            # Check whether the file is a video.
            if lowered.endswith(_VIDEO_SUFFIXES):
                self.set_video(path)
            elif lowered.endswith(_DANMAKU_SUFFIXES):
                self.add_danmaku(path)
            else:
                print("Unknown File", path)
        self._update_text()

    def set_video(self, file):
        """Select *file* as the video to process.

        Raises:
            FileNotFoundError: if *file* is not an existing regular file.
        """
        if not os.path.isfile(file):
            raise FileNotFoundError(file)
        if file == self.workVideo:
            return print("Same Video", file)
        self.workVideo = file
        self._update_text()

    def add_danmaku(self, file: str) -> None:
        """Append *file* to the danmaku list (duplicates are ignored).

        Raises:
            FileNotFoundError: if *file* is not an existing regular file.
        """
        if not os.path.isfile(file):
            raise FileNotFoundError(file)
        if file in self.workDanmaku:
            return print("Already Added File", file)
        self.workDanmaku.append(file)
        self._update_text()

    def have_job(self) -> bool:
        """Return True when a video is set and the label has not been run yet."""
        return self.workVideo is not None and not self.running and not self.finished

    def get_job(self) -> Job:
        """Build a ``Job`` describing the current selection."""
        job = Job()
        job.video = self.workVideo
        if len(self.workDanmaku) > 0:
            job.type = Job.DANMAKU_ENCODE
            # Hand out a copy so that processing the job can never alter
            # what this label displays.
            job.danmaku = list(self.workDanmaku)
        else:
            job.type = Job.PURE_SPLIT
            job.danmaku = []
        return job

    def start_running(self) -> None:
        """Mark the label as being processed (red). Call from the GUI thread."""
        self.running = True
        self.finished = False
        self.setStyleSheet("""QLabel { color: white; background-color: red; }""")

    def stop_running(self) -> None:
        """Mark the label as done (green). Call from the GUI thread."""
        self.running = False
        self.finished = True
        self.setStyleSheet("""QLabel { color: white; background-color: green; }""")


class HomePage(QWidget):
    """Main window: a column of ``WorkLabel`` tasks and a start button."""

    showMessageBox = pyqtSignal(str, str)
    labelDestroy = pyqtSignal(WorkLabel)
    processCurTime = pyqtSignal(str)
    processSpeed = pyqtSignal(str)

    def __init__(self):
        super(HomePage, self).__init__()
        self.layout = None
        self.labels: list[WorkLabel] = []
        self.worker: Optional[WorkerThread] = None
        self.btn_start: Optional[QPushButton] = None
        self.showMessageBox.connect(self.on_show_message_box_info)
        self.labelDestroy.connect(self.on_label_destroy)
        self.processCurTime.connect(self.on_process_cur_time_change)
        self.processSpeed.connect(self.on_process_speed_change)
        self.process_cur_time = "-"
        self.process_speed = "-"
        self.init_ui()

    def init_ui(self):
        """Create the task area and the start button, then show the window."""
        layout = QVBoxLayout()
        self.layout = QVBoxLayout()
        layout.addLayout(self.layout)
        btn_start = QPushButton(self)
        btn_start.setText("开始")
        btn_start.clicked.connect(self.handle_do)
        btn_start.setGeometry(0, 560, 400, 40)
        layout.addWidget(btn_start, 1, Qt.AlignBottom)
        self.setLayout(layout)
        self.btn_start = btn_start
        self.resize(400, 600)
        self.setWindowTitle("录播工作流")
        self.setAcceptDrops(True)
        self.show()

    def add_label(self):
        """Append a new empty ``WorkLabel`` to the task area and return it."""
        label1 = WorkLabel(self)
        self.layout.addWidget(label1)
        self.labels.append(label1)
        return label1

    def handle_do(self):
        """Start a worker for the first label that still has a pending job."""
        if len(self.labels) == 0:
            return self.on_show_message_box_warn("提示", "请添加任务")
        if self.worker is not None:
            # A job is already in progress; only one worker runs at a time.
            return
        for label in self.labels:
            if not label.have_job():
                continue
            # Widget state is changed here, on the GUI thread, because Qt
            # widgets must not be touched from the worker thread.
            label.start_running()
            _thread = WorkerThread(self, label)
            self.worker = _thread
            # Connect before start() so the ``started`` signal cannot be
            # emitted before its slot is attached.
            _thread.started.connect(self.on_worker_start)
            _thread.finished.connect(self.on_worker_stop)
            _thread.start()
            break

    def dragEnterEvent(self, a0: QtGui.QDragEnterEvent) -> None:
        """Accept the drag only when it carries URLs (files)."""
        if a0.mimeData().hasUrls():
            a0.accept()
        else:
            a0.ignore()

    def dropEvent(self, a0: QtGui.QDropEvent) -> None:
        """Create a new task label and let it consume the dropped files."""
        label1 = self.add_label()
        label1.dropEvent(a0)

    def on_show_message_box_info(self, title: str, content: str):
        QMessageBox.information(self, title, content, QMessageBox.Yes)

    def on_show_message_box_warn(self, title: str, content: str):
        QMessageBox.warning(self, title, content, QMessageBox.Yes)

    def on_label_destroy(self, label: WorkLabel):
        """Forget a label that is about to delete itself."""
        if label in self.labels:
            self.labels.remove(label)

    def on_worker_start(self):
        self.btn_start.setDisabled(True)
        self.btn_start.setText("正在处理")

    def on_process_cur_time_change(self, s: str) -> None:
        self.process_cur_time = s
        self.update_btn_process_text()

    def on_process_speed_change(self, s: str) -> None:
        self.process_speed = s
        self.update_btn_process_text()

    def update_btn_process_text(self):
        """Show "<current time>@<speed>" on the start button."""
        self.btn_start.setText("{}@{}".format(self.process_cur_time, self.process_speed))

    def on_worker_stop(self):
        """Mark the processed label as finished and re-enable the button."""
        worker = self.worker
        if worker is not None:
            # Runs on the GUI thread (slot of the ``finished`` signal).
            worker.label.stop_running()
        self.btn_start.setDisabled(False)
        self.btn_start.setText("开始")
        self.worker = None


class WorkerThread(QThread):
    """Background thread that runs the external tools for one label's job.

    The thread only reads plain Python attributes of the label and reports
    progress through the ``HomePage`` signals; all widget updates are left
    to the GUI thread.
    """

    def __init__(self, app: HomePage, label: WorkLabel):
        super(WorkerThread, self).__init__()
        self.app = app
        self.label = label

    def run(self) -> None:
        job = self.label.get_job()
        if job.type == Job.DANMAKU_ENCODE:
            self.run_danmaku_encode(job)
        elif job.type == Job.PURE_SPLIT:
            self.run_pure_split(job)

    def run_danmaku_encode(self, job: Job):
        """Convert danmaku to .ass, burn them into the video, then split it.

        ``job.danmaku[0]`` is the reference file; every other danmaku file
        is shifted by ``diff_danmaku_files(reference, other)``. Generated
        subtitle names are appended to ``job.subtitles``.
        """
        base_danmaku, *extra_danmaku = job.danmaku
        with _working_directory(os.path.join(HOME_PATH, DANMAKU_PATH)):
            base_start_ts = get_file_start(base_danmaku)
            job.subtitles.append(self._convert_danmaku(base_danmaku, 0))
            for danmaku in extra_danmaku:
                time_shift = diff_danmaku_files(base_danmaku, danmaku)
                job.subtitles.append(self._convert_danmaku(danmaku, time_shift))
            # Encode: burn the subtitles into the video.
            base_start = datetime.datetime.fromtimestamp(base_start_ts)
            new_file_name = base_start.strftime("%Y%m%d_%H%M.flv")
            try:
                self._run_with_progress([
                    FFMPEG_TOOL, "-hide_banner", "-progress", "-", "-v", "0", "-y",
                    "-i", job.video,
                    "-vf", ",".join("subtitles=%s" % i for i in job.subtitles) + ",hwupload_cuda",
                    "-c:a", "copy", "-c:v", "h264_nvenc", "-f", "mp4",
                    "-preset:v", "fast", "-profile:v", "high", "-level", "4.1",
                    "-b:v", "2.5M", "-bufsize:v", "5M", "-rc:v", "vbr_hq", "-bf:v", "3",
                    "-qmin", "10", "-qmax", "52", "-crf", "16",
                    # "-t", "10",  (debug aid: encode only the first 10 seconds)
                    os.path.join(HOME_PATH, SPLIT_PATH, new_file_name)
                ])
            finally:
                # The .ass files are intermediates; remove them even when
                # the encoder could not be started.
                for _f in job.subtitles:
                    if os.path.isfile(_f):
                        os.remove(_f)
        with _working_directory(os.path.join(HOME_PATH, SPLIT_PATH)):
            self._run_with_progress([SPLIT_TOOL, new_file_name])

    def run_pure_split(self, job: Job):
        """Run the split tool directly on the job's video."""
        with _working_directory(os.path.join(HOME_PATH, SPLIT_PATH)):
            self._run_with_progress([SPLIT_TOOL, job.video])

    def _convert_danmaku(self, danmaku: str, time_shift) -> str:
        """Convert one danmaku XML to .ass in the current directory.

        Returns the generated subtitle file name (``<basename>.ass``).
        """
        new_subtitle_name = os.path.basename(danmaku) + ".ass"
        with subprocess.Popen([
            DANMAKU_FACTORY_CLI, "-r", "1280x720", "-s", "10", "-f", "5", "-S", "40",
            "-N", "\"Sarasa Term SC\"", "--showmsgbox", "FALSE",
            "-O", "255", "-L", "1", "-D", "0",
            "-o", "ass", new_subtitle_name, "-i", danmaku, "-t", str(time_shift)
        ], **subprocess_args()) as process:
            # stdout/stderr are pipes: drain them while waiting, otherwise a
            # talkative child blocks on a full pipe and wait() never returns.
            process.communicate()
        print(new_subtitle_name)
        return new_subtitle_name

    def _run_with_progress(self, command) -> None:
        """Run *command*, forwarding its progress lines, and wait for exit."""
        popen_kwargs = subprocess_args(True)
        # Only stdout is consumed below; an unread stderr pipe could fill up
        # and stall the child, so it is discarded instead.
        popen_kwargs["stderr"] = subprocess.DEVNULL
        with subprocess.Popen(command, **popen_kwargs) as process:
            self.handle_ffmpeg_output(process.stdout)
            process.wait()

    def handle_ffmpeg_output(self, stderr: IO[bytes]) -> None:
        """Read ``key=value`` progress lines until EOF and emit them.

        ``out_time=`` lines go to ``processCurTime`` and ``speed=`` lines to
        ``processSpeed``; every other line is ignored.
        """
        routes = (
            (b"out_time=", self.app.processCurTime),
            (b"speed=", self.app.processSpeed),
        )
        for line in iter(stderr.readline, b""):
            for prefix, signal in routes:
                if line.startswith(prefix):
                    signal.emit(line.replace(prefix, b"").decode().strip())


# Create a set of arguments which make a ``subprocess.Popen`` (and
# variants) call work with or without Pyinstaller, ``--noconsole`` or
# not, on Windows and Linux. Typical use::
#
#   subprocess.call(['program_to_run', 'arg_1'], **subprocess_args())
#
# When calling ``check_output``::
#
#   subprocess.check_output(['program_to_run', 'arg_1'],
#                           **subprocess_args(False))
def subprocess_args(include_stdout=True):
    """Return keyword arguments for ``subprocess.Popen`` and friends.

    Args:
        include_stdout: add ``stdout=subprocess.PIPE``. Pass ``False`` for
            ``subprocess.check_output``, which rejects a ``stdout`` argument.

    Returns:
        A new dict with ``stdin``, ``stderr``, ``startupinfo``, ``env`` and,
        optionally, ``stdout``.
    """
    # The following is true only on Windows.
    if hasattr(subprocess, 'STARTUPINFO'):
        # On Windows, subprocess calls will pop up a command window by default
        # when run from Pyinstaller with the ``--noconsole`` option. Avoid this
        # distraction.
        si = subprocess.STARTUPINFO()
        si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        # Windows doesn't search the path by default. Pass it an environment so
        # it will.
        env = os.environ
    else:
        si = None
        env = None

    # ``subprocess.check_output`` raises
    # ``ValueError: stdout argument not allowed, it will be overridden.``
    # when given ``stdout``, so add it only if it's needed.
    ret = {'stdout': subprocess.PIPE} if include_stdout else {}

    # On Windows, running this from the binary produced by Pyinstaller
    # with the ``--noconsole`` option requires redirecting everything
    # (stdin, stdout, stderr) to avoid an OSError exception
    # "[Error 6] the handle is invalid."
    ret.update({'stdin': subprocess.PIPE,
                'stderr': subprocess.PIPE,
                'startupinfo': si,
                'env': env})
    return ret


def _tool_available(tool: str, directory: str) -> bool:
    """Return True when ``where.exe`` run inside *directory* finds *tool*."""
    with _working_directory(directory):
        with subprocess.Popen(["where.exe", tool], **subprocess_args(True)) as process:
            found, _ = process.communicate()
    return len(found) != 0


def _abort_missing_tool(message: str) -> None:
    """Show *message*, wait for Enter when a console exists, then exit(1)."""
    try:
        input(message)
    except (EOFError, RuntimeError):
        # No usable stdin (e.g. a windowed build): nothing to wait for.
        pass
    # ``sys.exit`` instead of the ``exit`` builtin: the latter is injected by
    # the ``site`` module and is not guaranteed to exist in frozen builds.
    sys.exit(1)


def check_all_prerequisite():
    """Create the working directories and verify the external tools exist.

    Exits the process with status 1 when a tool cannot be located.
    """
    danmaku_dir = os.path.join(HOME_PATH, DANMAKU_PATH)
    split_dir = os.path.join(HOME_PATH, SPLIT_PATH)
    for directory in (danmaku_dir, split_dir):
        if not os.path.isdir(directory):
            os.mkdir(directory)

    requirements = (
        (DANMAKU_FACTORY_CLI, danmaku_dir, "弹幕处理工具不存在"),
        (SPLIT_TOOL, split_dir, "视频分割工具不存在"),
        (FFMPEG_TOOL, HOME_PATH, "FFMPEG工具不存在"),
    )
    for tool, directory, message in requirements:
        if not _tool_available(tool, directory):
            _abort_missing_tool(message)


def main():
    check_all_prerequisite()
    app = QApplication(sys.argv)
    page = HomePage()  # keep a reference so the window stays alive
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()