"""Helpers for running ffmpeg / ffprobe as child processes.

The module wraps a handful of ffmpeg invocations (remuxing to MPEG-TS,
re-encoding, running a prepared ``FfmpegTask``) and the parsing of the
machine-readable output those tools produce.
"""
import logging
import os
import subprocess
from datetime import datetime
from typing import Optional, IO

from entity.ffmpeg import FfmpegTask

logger = logging.getLogger(__name__)


def _decode_output(raw: Optional[bytes]) -> str:
    """Decode captured child-process output, never raising on bad bytes."""
    if not raw:
        return ""
    return raw.decode("utf-8", errors="replace").strip()


def _convert_to_ts(file, label, codec_args):
    """Run ffmpeg to turn *file* into an Annex-B MPEG-TS file ``file + ".ts"``.

    :param file: path of the input file.
    :param label: prefix used in the log messages.
    :param codec_args: ffmpeg arguments placed between the input and the
        bitstream-filter / muxer arguments (codec selection and options).
    :return: ``file + ".ts"`` when ffmpeg exits with code 0 (the input file
        is removed in that case); otherwise *file* unchanged. *file* is also
        returned unchanged when it does not exist.
    """
    if not os.path.exists(file):
        return file
    output_file = file + ".ts"
    logger.info("%s: %s", label, file)
    ffmpeg_process = subprocess.run([
        "ffmpeg", "-y", "-hide_banner", "-i", file,
        *codec_args,
        "-bsf:v", "h264_mp4toannexb", "-f", "mpegts", output_file,
    ])
    logger.info("%s: %s, returned: %s", label, file, ffmpeg_process.returncode)
    if ffmpeg_process.returncode != 0:
        return file
    # Only drop the source once ffmpeg reported success.
    os.remove(file)
    return output_file


def to_annexb(file):
    """Remux *file* to MPEG-TS with stream copy and the Annex-B filter.

    :return: the new ``.ts`` path on success, otherwise *file*.
    """
    return _convert_to_ts(file, "ToAnnexb", ["-c", "copy"])


def re_encode_and_annexb(file):
    """Re-encode *file* with ``h264_qsv`` and mux it to Annex-B MPEG-TS.

    :return: the new ``.ts`` path on success, otherwise *file*.
    """
    return _convert_to_ts(
        file,
        "ReEncodeAndAnnexb",
        ["-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1"],
    )


def start_render(ffmpeg_task: FfmpegTask) -> bool:
    """Execute *ffmpeg_task* with ffmpeg and validate the produced file.

    When the task reports that it does not need to run, or yields no ffmpeg
    arguments, its first input file is set as the output and ``True`` is
    returned without starting ffmpeg.

    :return: ``True`` on success; ``False`` when ffmpeg exits non-zero, or
        when the output file is missing or smaller than 4096 bytes.
    """
    logger.info(ffmpeg_task)
    if not ffmpeg_task.need_run():
        ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
        return True
    ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
    logger.info(ffmpeg_args)
    if len(ffmpeg_args) == 0:
        ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
        return True
    # stderr must be captured explicitly: without it ``.stderr`` is always
    # None and the error log below would carry no diagnostics.
    ffmpeg_process = subprocess.run(
        ["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args],
        stderr=subprocess.PIPE,
        **subprocess_args(True)
    )
    logger.info("FINISH TASK, OUTPUT IS %s", handle_ffmpeg_output(ffmpeg_process.stdout))
    stderr_text = _decode_output(ffmpeg_process.stderr)
    if ffmpeg_process.returncode != 0:
        logger.error("FFMPEG ERROR: %s", stderr_text)
        return False
    if stderr_text:
        # ffmpeg succeeded but still reported something at error level.
        logger.warning("FFMPEG STDERR: %s", stderr_text)
    try:
        out_file_stat = os.stat(ffmpeg_task.output_file)
    except OSError:
        logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
        return False
    if out_file_stat.st_size < 4096:
        logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
        return False
    return True


def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
    """Summarise the ``-progress`` output of ffmpeg.

    Scans the ``key=value`` lines up to the first empty line or the
    ``progress=end`` marker and keeps the last ``out_time`` and ``speed``
    values seen.

    :param stdout: raw bytes captured from ffmpeg, or ``None``.
    :return: ``"<out_time>@<speed>"``; just ``"0:0:0.0"`` when *stdout* is
        ``None``.
    """
    out_time = "0:0:0.0"
    if stdout is None:
        print("[!]STDOUT is null")
        return out_time
    speed = "0"
    out_time_key = b"out_time="
    speed_key = b"speed="
    for line in stdout.split(b"\n"):
        if line == b"":
            break
        if line.strip() == b"progress=end":
            # Processing finished.
            break
        if line.startswith(out_time_key):
            out_time = line[len(out_time_key):].decode().strip()
        if line.startswith(speed_key):
            speed = line[len(speed_key):].decode().strip()
    print("[ ]Speed:", out_time, "@", speed)
    return out_time + "@" + speed


def duration_str_to_float(duration_str: str) -> float:
    """Convert an ``H:M:S[.ffffff]`` timestamp to seconds.

    Unlike ``datetime.strptime`` with ``%H``, the hour field is not limited
    to 0-23 and a leading ``-`` is accepted. Fraction digits beyond
    microsecond precision are truncated.

    :raises ValueError: if *duration_str* is not a well-formed timestamp.
    """
    text = duration_str.strip()
    negative = text.startswith("-")
    if negative:
        text = text[1:]
    fields = text.split(":")
    if len(fields) != 3:
        raise ValueError("invalid duration: %r" % (duration_str,))
    hours_str, minutes_str, seconds_str = fields
    whole_str, _, fraction_str = seconds_str.partition(".")
    numeric_fields = (hours_str, minutes_str, whole_str)
    if not all(field.isdigit() for field in numeric_fields):
        raise ValueError("invalid duration: %r" % (duration_str,))
    if fraction_str and not fraction_str.isdigit():
        raise ValueError("invalid duration: %r" % (duration_str,))
    hours, minutes, seconds = (int(field) for field in numeric_fields)
    if minutes >= 60 or seconds >= 60:
        raise ValueError("invalid duration: %r" % (duration_str,))
    # Pad / cut the fraction to exactly six digits (microseconds).
    microseconds = int((fraction_str + "000000")[:6])
    # Integer microseconds first, one division last: the same arithmetic as
    # ``timedelta.total_seconds()``, so results match the strptime version.
    total_microseconds = ((hours * 60 + minutes) * 60 + seconds) * 10 ** 6 + microseconds
    total_seconds = total_microseconds / 10 ** 6
    return -total_seconds if negative else total_seconds


def _parse_probe_output(output: str):
    """Parse ffprobe ``csv=s=x:p=0`` output into ``(width, height, duration)``.

    Expects a ``<width>x<height>`` line followed by a duration line; blank
    lines are ignored.

    :return: the parsed tuple, or ``None`` when the text has another shape.
    """
    lines = [line.strip() for line in output.splitlines() if line.strip()]
    if len(lines) != 2:
        return None
    size_line, duration_line = lines
    # Dropping empty fields tolerates a dangling separator such as
    # "1920x1080x".
    dimensions = [part for part in size_line.split("x") if part]
    if len(dimensions) != 2:
        return None
    try:
        return int(dimensions[0]), int(dimensions[1]), float(duration_line)
    except ValueError:
        # Non-numeric value, e.g. a duration of "N/A".
        return None


def probe_video_info(video_file):
    """Query ffprobe for the first video stream's size and the duration.

    :return: ``(width, height, duration_seconds)``; ``(0, 0, 0)`` when
        ffprobe fails or its output cannot be parsed.
    """
    # Get the width and height (plus the container duration).
    # stderr is captured separately so diagnostics cannot leak into the
    # text that is parsed below.
    result = subprocess.run(
        ["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries',
         'stream=width,height:format=duration', '-of', 'csv=s=x:p=0', video_file],
        stderr=subprocess.PIPE,
        **subprocess_args(True)
    )
    if result.returncode != 0:
        logger.warning("FFPROBE ERROR: %s", _decode_output(result.stderr))
        return 0, 0, 0
    all_result = _decode_output(result.stdout)
    parsed = _parse_probe_output(all_result)
    if parsed is None:
        logger.warning("FFPROBE ERROR: UNEXPECTED OUTPUT %r", all_result)
        return 0, 0, 0
    return parsed


# Create a set of arguments which make a ``subprocess.Popen`` (and
# variants) call work with or without Pyinstaller, ``--noconsole`` or
# not, on Windows and Linux. Typical use::
#
#   subprocess.call(['program_to_run', 'arg_1'], **subprocess_args())
#
# When calling ``check_output``::
#
#   subprocess.check_output(['program_to_run', 'arg_1'],
#                           **subprocess_args(False))
def subprocess_args(include_stdout=True):
    """Build keyword arguments for ``subprocess`` calls.

    :param include_stdout: when true, add ``stdout=subprocess.PIPE``.
    :return: a dict with ``stdin``, ``startupinfo`` and ``env`` entries,
        plus ``stdout`` when requested.
    """
    # The following is true only on Windows.
    if hasattr(subprocess, 'STARTUPINFO'):
        # On Windows, subprocess calls will pop up a command window by default
        # when run from Pyinstaller with the ``--noconsole`` option. Avoid this
        # distraction.
        si = subprocess.STARTUPINFO()
        si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        # Windows doesn't search the path by default. Pass it an environment so
        # it will.
        env = os.environ
    else:
        si = None
        env = None

    # ``subprocess.check_output`` doesn't allow specifying ``stdout``::
    #
    #   Traceback (most recent call last):
    #     File "test_subprocess.py", line 58, in
    #       **subprocess_args(stdout=None))
    #     File "C:\Python27\lib\subprocess.py", line 567, in check_output
    #       raise ValueError('stdout argument not allowed, it will be overridden.')
    #   ValueError: stdout argument not allowed, it will be overridden.
    #
    # So, add it only if it's needed.
    ret = {'stdout': subprocess.PIPE} if include_stdout else {}

    # On Windows, running this from the binary produced by Pyinstaller
    # with the ``--noconsole`` option requires redirecting everything
    # (stdin, stdout, stderr) to avoid an OSError exception
    # "[Error 6] the handle is invalid."
    ret.update({'stdin': subprocess.PIPE,
                'startupinfo': si,
                'env': env})
    return ret