import os import re import subprocess from datetime import datetime, timedelta from typing import IO from config import FFMPEG_EXEC, VIDEO_BITRATE, FFMPEG_USE_GPU, VIDEO_CLIP_EACH_SEC, VIDEO_CLIP_OVERFLOW_SEC, PROD_ENV def get_video_real_duration(filename): ffmpeg_process = subprocess.Popen([ "ffmpeg", "-hide_banner", "-i", filename, "-c", "copy", "-f", "null", "-" ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) result = "0:0:0.0" for line in ffmpeg_process.stderr.readlines(): match_result = re.findall("(?<= time=).+?(?= )", line.decode()) if len(match_result) == 0: continue result = match_result.pop() return result def encode_video_with_subtitles(orig_filename: str, subtitles: list[str], new_filename: str): encode_process = subprocess.Popen([ FFMPEG_EXEC, "-hide_banner", "-progress", "-", "-v", "0", "-y", "-i", orig_filename, "-vf", ",".join("subtitles=%s" % i for i in subtitles) + (",hwupload_cuda" if FFMPEG_USE_GPU else ""), "-c:a", "copy", "-c:v", "h264_nvenc" if FFMPEG_USE_GPU else "h264", "-f", "mp4", "-preset:v", "fast", "-profile:v", "high", "-level", "4.1", "-b:v", VIDEO_BITRATE, "-rc:v", "vbr", *(["-tune:v", "hq"] if FFMPEG_USE_GPU else []), "-qmin", "10", "-qmax", "32", "-crf", "16", # "-t", "10", new_filename ], stdout=subprocess.PIPE) handle_ffmpeg_output(encode_process.stdout) return encode_process.wait() def handle_ffmpeg_output(stderr: IO[bytes]) -> None: while True: line = stderr.readline() if line == b"": break if PROD_ENV: # 正式环境不要输出一大堆的东西了 continue if line.startswith(b"out_time="): cur_time = line.replace(b"out_time=", b"").decode() print("CurTime", cur_time.strip()) if line.startswith(b"speed="): speed = line.replace(b"speed=", b"").decode() print("Speed", speed.strip()) def quick_split_video(file): if not os.path.isfile(file): raise FileNotFoundError(file) file_name = os.path.split(file)[-1] _create_dt = os.path.splitext(file_name)[0] create_dt = datetime.strptime(_create_dt, "%Y%m%d_%H%M") _duration_str = get_video_real_duration(file) _duration = datetime.strptime(_duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1) duration = _duration.total_seconds() current_sec = 0 while current_sec < duration: current_dt = (create_dt + timedelta(seconds=current_sec)).strftime("%Y%m%d_%H%M_") print("CUR_DT", current_dt) print("BIAS_T", current_sec) split_process = subprocess.Popen([ "ffmpeg", "-y", "-hide_banner", "-progress", "-", "-v", "0", "-ss", str(current_sec), "-i", file_name, "-c", "copy", "-f", "mp4", "-t", str(VIDEO_CLIP_EACH_SEC + VIDEO_CLIP_OVERFLOW_SEC), "{}.mp4".format(current_dt) ], stdout=subprocess.PIPE) handle_ffmpeg_output(split_process.stdout) current_sec += VIDEO_CLIP_EACH_SEC