"""Helpers that run ffmpeg / ffprobe as subprocesses and record tracing spans."""
import json
import logging
import os
import subprocess
from datetime import datetime
from typing import Optional, IO

from opentelemetry.trace import Status, StatusCode

from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT
from telemetry import get_tracer

logger = logging.getLogger(__name__)

# Rendered outputs smaller than this many bytes are treated as failed renders.
_MIN_OUTPUT_SIZE = 4096


def re_encode_and_annexb(file):
    """Re-encode ``file`` into an MPEG-TS file written next to it.

    The output path is ``file + ".ts"``.  The video stream is passed through
    the ``h264_mp4toannexb`` bitstream filter.  When ``probe_video_audio``
    reports no audio stream, the ``MUTE_AUDIO_INPUT`` arguments are added and
    the audio is mapped from input 1 instead of input 0.

    Args:
        file: Path of the media file to convert.

    Returns:
        ``file + ".ts"`` when ffmpeg exits with code 0; otherwise the original
        ``file`` (also returned unchanged when ``file`` does not exist).
    """
    with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span:
        span.set_attribute("file.path", file)
        if not os.path.exists(file):
            span.set_status(Status(StatusCode.ERROR))
            return file
        logger.info("ReEncodeAndAnnexb: %s", file)
        has_audio = bool(probe_video_audio(file))
        ts_file = file + ".ts"
        # Only add the extra (mute) input when the source has no audio stream.
        extra_inputs = [] if has_audio else MUTE_AUDIO_INPUT
        command = [
            "ffmpeg", "-y", "-hide_banner", "-vsync", "cfr", "-i", file,
            *extra_inputs,
            "-map", "0:v",
            "-map", "0:a" if has_audio else "1:a",
            *VIDEO_ARGS, "-bsf:v", "h264_mp4toannexb",
            *AUDIO_ARGS, "-bsf:a", "setts=pts=DTS",
            *ENCODER_ARGS,
            "-shortest", "-fflags", "+genpts",
            "-f", "mpegts", ts_file,
        ]
        ffmpeg_process = subprocess.run(command)
        logger.info(" ".join(ffmpeg_process.args))
        span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
        logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
        span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
        if ffmpeg_process.returncode != 0:
            span.set_status(Status(StatusCode.ERROR))
            return file
        span.set_status(Status(StatusCode.OK))
        span.set_attribute("file.size", os.path.getsize(ts_file))
        # NOTE: the source file is left in place; it is not removed here.
        return ts_file


def start_render(ffmpeg_task: FfmpegTask):
    """Run ffmpeg for ``ffmpeg_task`` and validate the produced file.

    If the task reports that it does not need to run, or yields no ffmpeg
    arguments, its first input file is set as the output and no process is
    started.

    Args:
        ffmpeg_task: The task describing inputs, arguments and output file.

    Returns:
        ``True`` when nothing had to be run or when ffmpeg exited with code 0
        and the output file exists and is at least ``_MIN_OUTPUT_SIZE`` bytes;
        ``False`` otherwise.
    """
    tracer = get_tracer(__name__)
    with tracer.start_as_current_span("start_render") as span:
        span.set_attribute("ffmpeg.task", str(ffmpeg_task))
        if not ffmpeg_task.need_run():
            ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
            span.set_status(Status(StatusCode.OK))
            return True
        ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
        if len(ffmpeg_args) == 0:
            ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
            span.set_status(Status(StatusCode.OK))
            return True
        # "-progress -" makes ffmpeg write key=value progress records to
        # stdout, which handle_ffmpeg_output() parses below.
        ffmpeg_process = subprocess.run(
            ["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args],
            stderr=subprocess.PIPE,
            **subprocess_args(True)
        )
        span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
        logger.info(" ".join(ffmpeg_process.args))
        ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
        span.set_attribute("ffmpeg.out", ffmpeg_final_out)
        logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
        code = ffmpeg_process.returncode
        span.set_attribute("ffmpeg.code", code)
        if code != 0:
            span.set_attribute("ffmpeg.err", str(ffmpeg_process.stderr))
            span.set_status(Status(StatusCode.ERROR, "FFMPEG异常退出"))
            logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
            return False
        span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
        try:
            file_size = os.path.getsize(ffmpeg_task.output_file)
        except OSError as e:
            span.set_attribute("file.size", 0)
            # strerror may be None for some OSError instances; fall back to
            # the full message so the attribute always carries a string.
            span.set_attribute("file.error", e.strerror or str(e))
            span.set_status(Status(StatusCode.ERROR, "输出文件不存在"))
            logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
            return False
        span.set_attribute("file.size", file_size)
        if file_size < _MIN_OUTPUT_SIZE:
            span.set_status(Status(StatusCode.ERROR, "输出文件过小"))
            logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
            return False
        span.set_status(Status(StatusCode.OK))
        return True


def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
    """Summarise captured ``ffmpeg -progress`` output.

    Scans the ``key=value`` lines, remembering the latest ``out_time`` and
    ``speed`` values, and stops at the first empty line or at
    ``progress=end``.

    Args:
        stdout: Raw bytes captured from ffmpeg's stdout, or ``None``.

    Returns:
        ``"<out_time>@<speed>"``; or just ``"0:0:0.0"`` when ``stdout`` is
        ``None``.
    """
    out_time = "0:0:0.0"
    if stdout is None:
        print("[!]STDOUT is null")
        return out_time
    speed = "0"
    out_time_key = b"out_time="
    speed_key = b"speed="
    for line in stdout.split(b"\n"):
        if line == b"":
            break
        if line.strip() == b"progress=end":
            # Processing finished.
            break
        if line.startswith(out_time_key):
            out_time = line[len(out_time_key):].decode().strip()
        if line.startswith(speed_key):
            speed = line[len(speed_key):].decode().strip()
            print("[ ]Speed:", out_time, "@", speed)
    return out_time + "@" + speed


def duration_str_to_float(duration_str: str) -> float:
    """Convert an ``HH:MM:SS.ffffff`` duration string to seconds.

    Args:
        duration_str: Text matching the ``"%H:%M:%S.%f"`` strptime format.

    Returns:
        The duration in seconds.

    Raises:
        ValueError: If ``duration_str`` does not match the format.
    """
    # strptime fills the missing date with 1900-01-01, so subtracting that
    # date leaves only the time-of-day part as a timedelta.
    _duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1)
    return _duration.total_seconds()


def probe_video_info(video_file):
    """Read width, height and duration of ``video_file`` with ffprobe.

    Args:
        video_file: Path of the video to inspect.

    Returns:
        A ``(width, height, duration)`` tuple of ``(int, int, float)``, or
        ``(0, 0, 0)`` when ffprobe fails, prints nothing, or prints output
        that cannot be parsed.
    """
    tracer = get_tracer(__name__)
    with tracer.start_as_current_span("probe_video_info") as span:
        span.set_attribute("video.file", video_file)
        # Get width and height (plus the container duration).
        result = subprocess.run(
            ["ffprobe", '-v', 'error', '-select_streams', 'v:0',
             '-show_entries', 'stream=width,height:format=duration',
             '-of', 'csv=s=x:p=0', video_file],
            stderr=subprocess.STDOUT,
            **subprocess_args(True)
        )
        span.set_attribute("ffprobe.args", json.dumps(result.args))
        span.set_attribute("ffprobe.code", result.returncode)
        if result.returncode != 0:
            span.set_status(Status(StatusCode.ERROR))
            return 0, 0, 0
        all_result = result.stdout.decode('utf-8').strip()
        span.set_attribute("ffprobe.out", all_result)
        if all_result == '':
            span.set_status(Status(StatusCode.ERROR))
            return 0, 0, 0
        # Expected shape: "<width>x<height>\n<duration>".  Anything else
        # (extra lines, "N/A" values, ...) is reported as a probe failure
        # instead of raising out of the span.
        try:
            wh, duration = all_result.split('\n')
            width, height = wh.strip().split('x')
            info = int(width), int(height), float(duration)
        except ValueError:
            span.set_status(Status(StatusCode.ERROR))
            logger.error("probe_video_info: unexpected ffprobe output: %s", all_result)
            return 0, 0, 0
        span.set_status(Status(StatusCode.OK))
        return info


def probe_video_audio(video_file, type=None):
    """Check with ffprobe whether ``video_file`` has at least one audio stream.

    Args:
        video_file: Path of the media file (or concat list) to inspect.
        type: When equal to ``'concat'``, ``-safe 0 -f concat`` is added so
            ffprobe reads ``video_file`` with the concat demuxer.

    Returns:
        ``True`` when ffprobe exits with code 0 and prints any output;
        ``False`` otherwise.
    """
    tracer = get_tracer(__name__)
    with tracer.start_as_current_span("probe_video_audio") as span:
        span.set_attribute("video.file", video_file)
        args = ["ffprobe", "-hide_banner", "-v", "error", "-select_streams", "a",
                "-show_entries", "stream=index", "-of", "csv=p=0"]
        if type == 'concat':
            args.extend(["-safe", "0", "-f", "concat"])
        args.append(video_file)
        logger.info(" ".join(args))
        result = subprocess.run(args, stderr=subprocess.STDOUT, **subprocess_args(True))
        span.set_attribute("ffprobe.args", json.dumps(result.args))
        span.set_attribute("ffprobe.code", result.returncode)
        probe_out = result.stdout.decode('utf-8').strip()
        logger.info("probe_video_audio: %s", probe_out)
        if result.returncode != 0:
            return False
        return probe_out != ''


# Fade out the audio over the last 2 seconds (by default).
def fade_out_audio(file, duration, fade_out_sec = 2):
    """Write a copy of ``file`` whose audio fades out at the end.

    The video stream is copied, the audio is re-encoded as AAC with an
    ``afade`` filter starting at ``duration - fade_out_sec``.  The result is
    written to ``file + "_.mp4"``; an existing file at that path is deleted
    first.

    Args:
        file: Path of the input media file.
        duration: Length of the media in seconds, as a number or a numeric
            string.
        fade_out_sec: Length of the fade in seconds.

    Returns:
        The path of the new file on success.  The original ``file`` when
        ``duration`` is a non-numeric string, when ``duration`` is not longer
        than ``fade_out_sec``, or when ffmpeg fails.
    """
    if isinstance(duration, str):
        try:
            duration = float(duration)
        except ValueError as e:
            logger.error("duration is not float: %s", e)
            return file
    tracer = get_tracer(__name__)
    with tracer.start_as_current_span("fade_out_audio") as span:
        span.set_attribute("audio.file", file)
        if duration <= fade_out_sec:
            return file
        new_fn = file + "_.mp4"
        if os.path.exists(new_fn):
            os.remove(new_fn)
            logger.info("delete tmp file: %s", new_fn)
        fade_filter = "afade=t=out:st=" + str(duration - fade_out_sec) + ":d=" + str(fade_out_sec)
        try:
            # stderr is captured so the failure branch below has something to
            # log; without it process.stderr is always None.
            process = subprocess.run(
                ["ffmpeg", "-i", file, "-c:v", "copy", "-c:a", "aac",
                 "-af", fade_filter, "-y", new_fn],
                stderr=subprocess.PIPE,
                **subprocess_args(True)
            )
            span.set_attribute("ffmpeg.args", json.dumps(process.args))
            logger.info(" ".join(process.args))
            if process.returncode != 0:
                span.set_status(Status(StatusCode.ERROR))
                logger.error("FFMPEG ERROR: %s", process.stderr)
                return file
            span.set_status(Status(StatusCode.OK))
            return new_fn
        except Exception as e:
            # Best effort: any failure to run ffmpeg falls back to the input.
            span.set_status(Status(StatusCode.ERROR))
            logger.error("FFMPEG ERROR: %s", e)
            return file


# Create a set of arguments which make a ``subprocess.Popen`` (and
# variants) call work with or without Pyinstaller, ``--noconsole`` or
# not, on Windows and Linux. Typical use::
#
#   subprocess.call(['program_to_run', 'arg_1'], **subprocess_args())
#
# When calling ``check_output``::
#
#   subprocess.check_output(['program_to_run', 'arg_1'],
#                           **subprocess_args(False))
def subprocess_args(include_stdout=True):
    """Build keyword arguments for ``subprocess`` calls.

    Args:
        include_stdout: When true, ``stdout=subprocess.PIPE`` is included.
            Pass ``False`` for ``subprocess.check_output``, which rejects an
            explicit ``stdout`` argument.

    Returns:
        A dict with ``stdin``, ``startupinfo`` and ``env`` keys, plus
        ``stdout`` when requested.  ``startupinfo`` and ``env`` are ``None``
        when ``subprocess.STARTUPINFO`` is unavailable.
    """
    # The following is true only on Windows.
    if hasattr(subprocess, 'STARTUPINFO'):
        # On Windows, subprocess calls will pop up a command window by default
        # when run from Pyinstaller with the ``--noconsole`` option. Avoid this
        # distraction.
        si = subprocess.STARTUPINFO()
        si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        # Windows doesn't search the path by default. Pass it an environment so
        # it will.
        env = os.environ
    else:
        si = None
        env = None

    # ``subprocess.check_output`` doesn't allow specifying ``stdout``; it
    # raises:
    #
    #   ValueError: stdout argument not allowed, it will be overridden.
    #
    # So, add it only if it's needed.
    ret = {'stdout': subprocess.PIPE} if include_stdout else {}

    # On Windows, running this from the binary produced by Pyinstaller
    # with the ``--noconsole`` option requires redirecting everything
    # (stdin, stdout, stderr) to avoid an OSError exception
    # "[Error 6] the handle is invalid."
    ret.update({'stdin': subprocess.PIPE,
                'startupinfo': si,
                'env': env})
    return ret