import json import os import subprocess import warnings from datetime import datetime, timedelta from typing import IO from config import VIDEO_CLIP_EACH_SEC, VIDEO_CLIP_OVERFLOW_SEC, VIDEO_OUTPUT_DIR, \ FFMPEG_EXEC, HANDBRAKE_EXEC, HANDBRAKE_PRESET_FILE, HANDBRAKE_PRESET, HANDBRAKE_ENCOPT from . import LOGGER def base_ts_to_filename(start_ts: float, is_mp4=False) -> str: base_start = datetime.fromtimestamp(start_ts) if is_mp4: return base_start.strftime("%Y%m%d_%H%M.mp4") else: return base_start.strftime("%Y%m%d_%H%M.flv") def get_video_real_duration(filename): ffmpeg_process = subprocess.Popen([ FFMPEG_EXEC, *_common_ffmpeg_setting(), "-i", filename, "-c", "copy", "-f", "null", "-" ], stdout=subprocess.PIPE) return handle_ffmpeg_output(ffmpeg_process.stdout) def encode_video_with_subtitles(orig_filename: str, subtitles: list[str], base_ts: float): create_dt = datetime.fromtimestamp(base_ts) current_dt = (create_dt).strftime("%Y%m%d_%H%M_") process = get_encode_process_use_handbrake(orig_filename, subtitles, os.path.join(VIDEO_OUTPUT_DIR, "{}.mp4".format(current_dt))) handle_handbrake_output(process.stdout) process.wait() return [{ "base_path": VIDEO_OUTPUT_DIR, "file": "{}.mp4".format(current_dt), }] def get_encode_process_use_handbrake(orig_filename: str, subtitles: list[str], new_filename: str): print("[+]Use HandBrakeCli") encode_process = subprocess.Popen([ HANDBRAKE_EXEC, *_common_handbrake_setting(), "--preset-import-file", HANDBRAKE_PRESET_FILE, "--preset", HANDBRAKE_PRESET, "-i", orig_filename, "-x", HANDBRAKE_ENCOPT, "--ssa-file", ",".join(i for i in subtitles), "--ssa-burn", ",".join("%d" % (i+1) for i in range(len(subtitles))), "-o", new_filename ], stdout=subprocess.PIPE) return encode_process def handle_handbrake_output(stdout: IO[bytes]): out_time = "0:0:0.0" speed = "0" if stdout is None: print("[!]STDOUT is null") return json_body = "" json_start = False _i = 0 while True: line = stdout.readline() if line == b"": break if json_start: json_body += line.strip().decode("UTF-8") if line.startswith(b"}"): json_start = False status_payload = json.loads(json_body) if status_payload["State"] == "WORKING": out_time = "ETA: {Hours:02d}:{Minutes:02d}:{Seconds:02d}".format_map(status_payload["Working"]) speed = "{Rate:.2f}FPS".format_map(status_payload["Working"]) _i += 1 if _i % 300 == 150: LOGGER.debug("[>]Speed:{}@{}".format(out_time, speed)) elif status_payload["State"] == "WORKDONE": break continue if line.startswith(b"Progress:"): json_start = True json_body = "{" LOGGER.debug("[ ]Speed:{}@{}".format(out_time, speed)) def handle_ffmpeg_output(stdout: IO[bytes]) -> str: out_time = "0:0:0.0" speed = "0" if stdout is None: print("[!]STDOUT is null") return out_time _i = 0 while True: line = stdout.readline() if line == b"": break if line.strip() == b"progress=end": # 处理完毕 break if line.startswith(b"out_time="): out_time = line.replace(b"out_time=", b"").decode().strip() if line.startswith(b"speed="): speed = line.replace(b"speed=", b"").decode().strip() _i += 1 if _i % 300 == 150: LOGGER.debug("[>]Speed:{}@{}".format(out_time, speed)) LOGGER.debug("[ ]Speed:{}@{}".format(out_time, speed)) return out_time def duration_str_to_float(duration_str) -> float: _duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1) return _duration.total_seconds() def quick_split_video(file): warnings.warn("已过时", DeprecationWarning) if not os.path.isfile(file): raise FileNotFoundError(file) file_name = os.path.split(file)[-1] _create_dt = os.path.splitext(file_name)[0] create_dt = datetime.strptime(_create_dt[:13], "%Y%m%d_%H%M") _duration_str = get_video_real_duration(file) duration = duration_str_to_float(_duration_str) current_sec = 0 _video_parts = [] while current_sec < duration: if (current_sec + VIDEO_CLIP_OVERFLOW_SEC * 2) > duration: print("[-]Less than 2 overflow sec, skip") break current_dt = (create_dt + timedelta(seconds=current_sec)).strftime("%Y%m%d_%H%M_") print("CUR_DT", current_dt) print("BIAS_T", current_sec) split_process = subprocess.Popen([ FFMPEG_EXEC, *_common_ffmpeg_setting(), "-ss", str(current_sec), "-i", file, "-c:v", "copy", "-c:a", "copy", "-f", "mp4", "-t", str(VIDEO_CLIP_EACH_SEC + VIDEO_CLIP_OVERFLOW_SEC), "-fflags", "+genpts", "-shortest", "-movflags", "faststart", os.path.join(VIDEO_OUTPUT_DIR, "{}.mp4".format(current_dt)) ], stdout=subprocess.PIPE) handle_ffmpeg_output(split_process.stdout) split_process.wait() _video_parts.append({ "base_path": VIDEO_OUTPUT_DIR, "file": "{}.mp4".format(current_dt), }) current_sec += VIDEO_CLIP_EACH_SEC return _video_parts def _common_ffmpeg_setting(): return ( "-y", "-hide_banner", "-progress", "-", "-loglevel", "error", ) def _common_handbrake_setting(): return ( "--json", "--crop-mode", "none", "--no-comb-detect", "--no-bwdif", "--no-decomb", "--no-detelecine", "--no-hqdn3d", "--no-nlmeans", "--no-chroma-smooth", "--no-unsharp", "--no-lapsharp", "--no-deblock", "--align-av" )