diff --git a/.gitignore b/.gitignore index 5a6bc28..a870de9 100644 --- a/.gitignore +++ b/.gitignore @@ -32,4 +32,5 @@ target/ venv/ cython_debug/ .env -.serena \ No newline at end of file +.serena +.claude \ No newline at end of file diff --git a/app.py b/app.py deleted file mode 100644 index 041dae7..0000000 --- a/app.py +++ /dev/null @@ -1,40 +0,0 @@ -import time - -import flask - -import config -import biz.task -import template -from telemetry import init_opentelemetry -from template import load_local_template -from util import api - -load_local_template() -import logging - -LOGGER = logging.getLogger(__name__) -init_opentelemetry(batch=False) -app = flask.Flask(__name__) - -@app.get('/health/check') -def health_check(): - return api.sync_center() - -@app.post('/') -def do_nothing(): - return "NOOP" - -@app.post('/') -def do_task(task_id): - task_info = api.get_task_info(task_id) - local_template_info = template.get_template_def(task_info.get("templateId")) - template_info = api.get_template_info(task_info.get("templateId")) - if local_template_info: - if local_template_info.get("updateTime") != template_info.get("updateTime"): - template.download_template(task_info.get("templateId")) - biz.task.start_task(task_info) - return "OK" - - -if __name__ == '__main__': - app.run(host="0.0.0.0", port=9998) diff --git a/biz/ffmpeg.py b/biz/ffmpeg.py deleted file mode 100644 index 5b7dcd9..0000000 --- a/biz/ffmpeg.py +++ /dev/null @@ -1,192 +0,0 @@ -import json -import os.path -import time -from concurrent.futures import ThreadPoolExecutor, as_completed - -from opentelemetry.trace import Status, StatusCode - -from entity.ffmpeg import FfmpegTask -import logging - -from util import ffmpeg, oss -from util.ffmpeg import fade_out_audio -from telemetry import get_tracer - -logger = logging.getLogger('biz/ffmpeg') - - -def parse_ffmpeg_task(task_info, template_info): - tracer = get_tracer(__name__) - with tracer.start_as_current_span("parse_ffmpeg_task") as span: - tasks = [] - # 中间片段 - task_params_str = task_info.get("taskParams", "{}") - span.set_attribute("task_params", task_params_str) - task_params: dict = json.loads(task_params_str) - task_params_orig = json.loads(task_params_str) - - # 统计only_if占位符的使用次数 - only_if_usage_count = {} - with tracer.start_as_current_span("parse_ffmpeg_task.download_all") as sub_span: - with ThreadPoolExecutor(max_workers=8) as executor: - param_list: list[dict] - for param_list in task_params.values(): - for param in param_list: - url = param.get("url") - if url.startswith("http"): - _, fn = os.path.split(url) - executor.submit(oss.download_from_oss, url, fn, True) - executor.shutdown(wait=True) - for part in template_info.get("video_parts"): - source, ext_data = parse_video(part.get('source'), task_params, template_info) - if not source: - logger.warning("no video found for part: " + str(part)) - continue - only_if = part.get('only_if', '') - if only_if: - only_if_usage_count[only_if] = only_if_usage_count.get(only_if, 0) + 1 - required_count = only_if_usage_count.get(only_if) - if not check_placeholder_exist_with_count(only_if, task_params_orig, required_count): - logger.info("because only_if exist, placeholder: %s insufficient (need %d), skip part: %s", only_if, required_count, part) - continue - sub_ffmpeg_task = FfmpegTask(source) - sub_ffmpeg_task.resolution = template_info.get("video_size", "") - sub_ffmpeg_task.annexb = True - sub_ffmpeg_task.ext_data = ext_data or {} - sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25) - sub_ffmpeg_task.center_cut = part.get("crop_mode", None) - sub_ffmpeg_task.zoom_cut = part.get("zoom_cut", None) - for effect in part.get('effects', []): - sub_ffmpeg_task.add_effect(effect) - for lut in part.get('luts', []): - sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut).replace("\\", "/")) - for audio in part.get('audios', []): - sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio)) - for overlay in part.get('overlays', []): - sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) - tasks.append(sub_ffmpeg_task) - output_file = "out_" + str(time.time()) + ".mp4" - task = FfmpegTask(tasks, output_file=output_file) - task.resolution = template_info.get("video_size", "") - overall = template_info.get("overall_template") - task.center_cut = template_info.get("crop_mode", None) - task.zoom_cut = template_info.get("zoom_cut", None) - task.frame_rate = template_info.get("frame_rate", 25) - # if overall.get('source', ''): - # source, ext_data = parse_video(overall.get('source'), task_params, template_info) - # task.add_inputs(source) - # task.ext_data = ext_data or {} - for effect in overall.get('effects', []): - task.add_effect(effect) - for lut in overall.get('luts', []): - task.add_lut(os.path.join(template_info.get("local_path"), lut).replace("\\", "/")) - for audio in overall.get('audios', []): - task.add_audios(os.path.join(template_info.get("local_path"), audio)) - for overlay in overall.get('overlays', []): - task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) - return task - - -def parse_video(source, task_params, template_info): - if source.startswith('PLACEHOLDER_'): - placeholder_id = source.replace('PLACEHOLDER_', '') - new_sources = task_params.get(placeholder_id, []) - _pick_source = {} - if type(new_sources) is list: - if len(new_sources) == 0: - logger.debug("no video found for placeholder: " + placeholder_id) - return None, _pick_source - else: - _pick_source = new_sources.pop(0) - new_sources = _pick_source.get("url") - if new_sources.startswith("http"): - _, source_name = os.path.split(new_sources) - oss.download_from_oss(new_sources, source_name, True) - return source_name, _pick_source - return new_sources, _pick_source - return os.path.join(template_info.get("local_path"), source), None - - -def check_placeholder_exist(placeholder_id, task_params): - if placeholder_id in task_params: - new_sources = task_params.get(placeholder_id, []) - if type(new_sources) is list: - if len(new_sources) == 0: - return False - else: - return True - return True - return False - - -def check_placeholder_exist_with_count(placeholder_id, task_params, required_count=1): - """检查占位符是否存在足够数量的片段""" - if placeholder_id in task_params: - new_sources = task_params.get(placeholder_id, []) - if type(new_sources) is list: - return len(new_sources) >= required_count - return required_count <= 1 - return False - - -def start_ffmpeg_task(ffmpeg_task, max_workers=None): - if max_workers is None: - max_workers = int(os.environ.get("FFMPEG_MAX_WORKERS", 4)) - tracer = get_tracer(__name__) - with tracer.start_as_current_span("start_ffmpeg_task") as span: - sub_tasks = list(ffmpeg_task.analyze_input_render_tasks()) - - if sub_tasks: - span.set_attribute("sub_tasks.count", len(sub_tasks)) - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = {executor.submit(start_ffmpeg_task, task, max_workers): task - for task in sub_tasks} - for future in as_completed(futures): - try: - if not future.result(): - # 快速失败:取消剩余任务 - for f in futures: - f.cancel() - span.set_status(Status(StatusCode.ERROR)) - return False - except Exception as e: - logger.error("子任务执行失败: %s", e) - for f in futures: - f.cancel() - span.set_status(Status(StatusCode.ERROR)) - return False - - ffmpeg_task.correct_task_type() - span.set_attribute("task.type", ffmpeg_task.task_type) - span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut)) - span.set_attribute("task.frame_rate", ffmpeg_task.frame_rate) - span.set_attribute("task.resolution", str(ffmpeg_task.resolution)) - span.set_attribute("task.ext_data", json.dumps(ffmpeg_task.ext_data)) - result = ffmpeg.start_render(ffmpeg_task) - if not result: - span.set_status(Status(StatusCode.ERROR)) - return False - span.set_status(Status(StatusCode.OK)) - return True - - -def clear_task_tmp_file(ffmpeg_task): - for task in ffmpeg_task.analyze_input_render_tasks(): - clear_task_tmp_file(task) - try: - if os.getenv("TEMPLATE_DIR") not in ffmpeg_task.get_output_file(): - os.remove(ffmpeg_task.get_output_file()) - logger.info("delete tmp file: " + ffmpeg_task.get_output_file()) - else: - logger.info("skip delete template file: " + ffmpeg_task.get_output_file()) - except OSError: - logger.warning("delete tmp file failed: " + ffmpeg_task.get_output_file()) - return False - return True - - -def probe_video_info(ffmpeg_task): - # 获取视频长度宽度和时长 - return ffmpeg.probe_video_info(ffmpeg_task.get_output_file()) - - diff --git a/biz/render.py b/biz/render.py deleted file mode 100644 index e69de29..0000000 diff --git a/biz/task.py b/biz/task.py deleted file mode 100644 index 12548be..0000000 --- a/biz/task.py +++ /dev/null @@ -1,44 +0,0 @@ -import json - -from opentelemetry.trace import Status, StatusCode - -from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info, fade_out_audio -from telemetry import get_tracer -from template import get_template_def -from util import api - - -def start_task(task_info): - tracer = get_tracer(__name__) - with tracer.start_as_current_span("start_task") as span: - task_info = api.normalize_task(task_info) - span.set_attribute("task", json.dumps(task_info)) - span.set_attribute("scenicId", task_info.get("scenicId", "?")) - span.set_attribute("templateId", task_info.get("templateId")) - template_info = get_template_def(task_info.get("templateId")) - api.report_task_start(task_info) - ffmpeg_task = parse_ffmpeg_task(task_info, template_info) - result = start_ffmpeg_task(ffmpeg_task) - if not result: - span.set_status(Status(StatusCode.ERROR)) - return api.report_task_failed(task_info) - width, height, duration = probe_video_info(ffmpeg_task) - span.set_attribute("probe.width", width) - span.set_attribute("probe.height", height) - span.set_attribute("probe.duration", duration) - # 音频淡出 - new_fn = fade_out_audio(ffmpeg_task.get_output_file(), duration) - ffmpeg_task.set_output_file(new_fn) - oss_result = api.upload_task_file(task_info, ffmpeg_task) - if not oss_result: - span.set_status(Status(StatusCode.ERROR)) - return api.report_task_failed(task_info) - # 获取视频长度宽度和时长 - clear_task_tmp_file(ffmpeg_task) - api.report_task_success(task_info, videoInfo={ - "width": width, - "height": height, - "duration": duration - }) - span.set_status(Status(StatusCode.OK)) - return None diff --git a/config/__init__.py b/config/__init__.py deleted file mode 100644 index f749288..0000000 --- a/config/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -import datetime -import logging -from logging.handlers import TimedRotatingFileHandler -from dotenv import load_dotenv - -load_dotenv() -logging.basicConfig(level=logging.INFO) -root_logger = logging.getLogger() -rf_handler = TimedRotatingFileHandler('all_log.log', when='midnight') -rf_handler.setFormatter(logging.Formatter("[%(asctime)s][%(name)s]%(levelname)s - %(message)s")) -rf_handler.setLevel(logging.DEBUG) -f_handler = TimedRotatingFileHandler('error.log', when='midnight') -f_handler.setLevel(logging.ERROR) -f_handler.setFormatter(logging.Formatter("[%(asctime)s][%(name)s][:%(lineno)d]%(levelname)s - - %(message)s")) -root_logger.addHandler(rf_handler) -root_logger.addHandler(f_handler) \ No newline at end of file diff --git a/constant/__init__.py b/constant/__init__.py index f9f1aaf..c723b63 100644 --- a/constant/__init__.py +++ b/constant/__init__.py @@ -1,34 +1,26 @@ # -*- coding: utf-8 -*- """ 常量定义 + +v2 版本常量,用于 Render Worker v2 API。 """ -# v1 支持的特性 -SUPPORT_FEATURE = ( - 'simple_render_algo', - 'gpu_accelerate', - 'hevc_encode', - 'rapid_download', - 'rclone_upload', - 'custom_re_encode', -) - # 软件版本 -SOFTWARE_VERSION = '0.0.9' +SOFTWARE_VERSION = '2.0.0' -# v2 支持的任务类型 -V2_TASK_TYPES = ( +# 支持的任务类型 +TASK_TYPES = ( 'RENDER_SEGMENT_VIDEO', 'PREPARE_JOB_AUDIO', 'PACKAGE_SEGMENT_TS', 'FINALIZE_MP4', ) -# v2 默认能力 -V2_DEFAULT_CAPABILITIES = list(V2_TASK_TYPES) +# 默认能力 +DEFAULT_CAPABILITIES = list(TASK_TYPES) -# v2 统一视频编码参数 -V2_VIDEO_ENCODE_PARAMS = { +# 统一视频编码参数(来自集成文档) +VIDEO_ENCODE_PARAMS = { 'codec': 'libx264', 'preset': 'medium', 'profile': 'main', @@ -37,10 +29,20 @@ V2_VIDEO_ENCODE_PARAMS = { 'pix_fmt': 'yuv420p', } -# v2 统一音频编码参数 -V2_AUDIO_ENCODE_PARAMS = { +# 统一音频编码参数 +AUDIO_ENCODE_PARAMS = { 'codec': 'aac', 'bitrate': '128k', 'sample_rate': '48000', 'channels': '2', } + +# 错误码 +ERROR_CODES = { + 'E_INPUT_UNAVAILABLE': '素材不可访问', + 'E_FFMPEG_FAILED': 'FFmpeg 执行失败', + 'E_UPLOAD_FAILED': '上传失败', + 'E_SPEC_INVALID': '渲染规格非法', + 'E_TIMEOUT': '执行超时', + 'E_UNKNOWN': '未知错误', +} diff --git a/entity/ffmpeg.py b/entity/ffmpeg.py deleted file mode 100644 index fd195d3..0000000 --- a/entity/ffmpeg.py +++ /dev/null @@ -1,538 +0,0 @@ -import json -import os -import time -import uuid -from typing import Any - -DEFAULT_ARGS = ("-shortest",) -ENCODER_ARGS = ("-c:v", "h264", ) if not os.getenv("ENCODER_ARGS", False) else os.getenv("ENCODER_ARGS", "").split(" ") -VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", ) if not os.getenv("VIDEO_ARGS", False) else os.getenv("VIDEO_ARGS", "").split(" ") -AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", ) -MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", ) - - -def get_mp4toannexb_filter(): - """ - Determine which mp4toannexb filter to use based on ENCODER_ARGS. - Returns 'hevc_mp4toannexb' if ENCODER_ARGS contains 'hevc', otherwise 'h264_mp4toannexb'. - """ - encoder_args_str = os.getenv("ENCODER_ARGS", "").lower() - if "hevc" in encoder_args_str: - return "hevc_mp4toannexb" - return "h264_mp4toannexb" - - -class FfmpegTask(object): - - effects: list[str] - - def __init__(self, input_file, task_type='copy', output_file=''): - self.annexb = False - if type(input_file) is str: - if input_file.endswith(".ts"): - self.annexb = True - self.input_file = [input_file] - elif type(input_file) is list: - self.input_file = input_file - else: - self.input_file = [] - self.zoom_cut = None - self.center_cut = None - self.ext_data = {} - self.task_type = task_type - self.output_file = output_file - self.mute = True - self.speed = 1 - self.frame_rate = 25 - self.resolution = None - self.subtitles = [] - self.luts = [] - self.audios = [] - self.overlays = [] - self.effects = [] - - def __repr__(self): - _str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}' - if len(self.luts) > 0: - _str += f', luts={self.luts}' - if len(self.audios) > 0: - _str += f', audios={self.audios}' - if len(self.overlays) > 0: - _str += f', overlays={self.overlays}' - if self.annexb: - _str += f', annexb={self.annexb}' - if self.effects: - _str += f', effects={self.effects}' - if self.mute: - _str += f', mute={self.mute}' - _str += f', center_cut={self.center_cut}' - return _str + ')' - - def analyze_input_render_tasks(self): - for i in self.input_file: - if type(i) is str: - continue - elif isinstance(i, FfmpegTask): - if i.need_run(): - yield i - - def need_run(self): - """ - 判断是否需要运行 - :rtype: bool - :return: - """ - if self.annexb: - return True - # TODO: copy from url - return not self.check_can_copy() - - def add_inputs(self, *inputs): - self.input_file.extend(inputs) - - def add_overlay(self, *overlays): - for overlay in overlays: - if str(overlay).endswith('.ass'): - self.subtitles.append(overlay) - else: - self.overlays.append(overlay) - self.correct_task_type() - - def add_audios(self, *audios): - self.audios.extend(audios) - self.correct_task_type() - self.check_audio_track() - - def add_lut(self, *luts): - self.luts.extend(luts) - self.correct_task_type() - - def add_effect(self, *effects): - self.effects.extend(effects) - self.correct_task_type() - - def get_output_file(self): - if self.task_type == 'copy': - return self.input_file[0] - if self.output_file == '': - self.set_output_file() - return self.output_file - - def correct_task_type(self): - if self.check_can_copy(): - self.task_type = 'copy' - elif self.check_can_concat(): - self.task_type = 'concat' - else: - self.task_type = 'encode' - - def check_can_concat(self): - if len(self.luts) > 0: - return False - if len(self.overlays) > 0: - return False - if len(self.subtitles) > 0: - return False - if len(self.effects) > 0: - return False - if self.speed != 1: - return False - if self.zoom_cut is not None: - return False - if self.center_cut is not None: - return False - return True - - def check_can_copy(self): - if len(self.luts) > 0: - return False - if len(self.overlays) > 0: - return False - if len(self.subtitles) > 0: - return False - if len(self.effects) > 0: - return False - if self.speed != 1: - return False - if len(self.audios) >= 1: - return False - if len(self.input_file) > 1: - return False - if self.zoom_cut is not None: - return False - if self.center_cut is not None: - return False - return True - - def check_audio_track(self): - ... - - def get_ffmpeg_args(self): - args = ['-y', '-hide_banner'] - if self.task_type == 'encode': - input_args = [] - filter_args = [] - output_args = [*VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, *DEFAULT_ARGS] - if self.annexb: - output_args.append("-bsf:v") - output_args.append(get_mp4toannexb_filter()) - output_args.append("-reset_timestamps") - output_args.append("1") - video_output_str = "[0:v]" - audio_output_str = "" - audio_track_index = 0 - effect_index = 0 - for input_file in self.input_file: - input_args.append("-i") - if type(input_file) is str: - input_args.append(input_file) - elif isinstance(input_file, FfmpegTask): - input_args.append(input_file.get_output_file()) - if self.center_cut == 1: - pos_json_str = self.ext_data.get('posJson', '{}') - try: - pos_json = json.loads(pos_json_str) - except Exception as e: - pos_json = {} - _v_w = pos_json.get('imgWidth', 1) - _f_x = pos_json.get('ltX', 0) - _f_x2 = pos_json.get('rbX', 0) - _x = f'{float((_f_x2 + _f_x)/(2 * _v_w)) :.4f}*iw-ih*ih/(2*iw)' - filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]") - video_output_str = f"[v_cut{effect_index}]" - effect_index += 1 - if self.zoom_cut == 1 and self.resolution: - _input = None - for input_file in self.input_file: - if type(input_file) is str: - _input = input_file - break - elif isinstance(input_file, FfmpegTask): - _input = input_file.get_output_file() - break - if _input: - from util.ffmpeg import probe_video_info - _iw, _ih, _ = probe_video_info(_input) - _w, _h = self.resolution.split('x', 1) - pos_json_str = self.ext_data.get('posJson', '{}') - try: - pos_json = json.loads(pos_json_str) - except Exception as e: - pos_json = {} - _v_w = pos_json.get('imgWidth', 1) - _v_h = pos_json.get('imgHeight', 1) - _f_x = pos_json.get('ltX', 0) - _f_x2 = pos_json.get('rbX', 0) - _f_y = pos_json.get('ltY', 0) - _f_y2 = pos_json.get('rbY', 0) - _x = min(max(0, int((_f_x + _f_x2) / 2 - int(_w) / 2)), _iw - int(_w)) - _y = min(max(0, int((_f_y + _f_y2) / 2 - int(_h) / 2)), _ih - int(_h)) - filter_args.append(f"{video_output_str}crop=x={_x}:y={_y}:w={_w}:h={_h}[vz_cut{effect_index}]") - video_output_str = f"[vz_cut{effect_index}]" - effect_index += 1 - for effect in self.effects: - if effect.startswith("cameraShot:"): - param = effect.split(":", 2)[1] - if param == '': - param = "3,1,0" - _split = param.split(",") - start = 3 - duration = 1 - rotate_deg = 0 - if len(_split) >= 3: - if _split[2] == '': - rotate_deg = 0 - else: - rotate_deg = int(_split[2]) - if len(_split) >= 2: - duration = float(_split[1]) - if len(_split) >= 1: - start = float(_split[0]) - _start_out_str = "[eff_s]" - _mid_out_str = "[eff_m]" - _end_out_str = "[eff_e]" - filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}") - filter_args.append(f"{_start_out_str}select=lt(n\\,{int(start * self.frame_rate)}){_start_out_str}") - filter_args.append(f"{_end_out_str}select=gt(n\\,{int(start * self.frame_rate)}){_end_out_str}") - filter_args.append(f"{_mid_out_str}select=eq(n\\,{int(start * self.frame_rate)}){_mid_out_str}") - filter_args.append( - f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}") - if rotate_deg != 0: - filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/180{_mid_out_str}") - # filter_args.append(f"{video_output_str}trim=start=0:end={start+duration},tpad=stop_mode=clone:stop_duration={duration},setpts=PTS-STARTPTS{_start_out_str}") - # filter_args.append(f"tpad=start_mode=clone:start_duration={duration},setpts=PTS-STARTPTS{_start_out_str}") - # filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}") - video_output_str = f"[v_eff{effect_index}]" - # filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}") - filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}") - effect_index += 1 - elif effect.startswith("ospeed:"): - param = effect.split(":", 2)[1] - if param == '': - param = "1" - if param != "1": - # 视频变速:使用fps实现,避免PTS冲突 - effect_index += 1 - filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]") - video_output_str = f"[v_eff{effect_index}]" - elif effect.startswith("zoom:"): - param = effect.split(":", 2)[1] - if param == '': - continue - _split = param.split(",") - if len(_split) < 2: - continue - try: - start_time = float(_split[0]) - zoom_factor = float(_split[1]) - if start_time < 0: - start_time = 0 - if zoom_factor <= 0: - zoom_factor = 1 - except (ValueError, IndexError): - start_time = 0 - zoom_factor = 1 - if zoom_factor == 1: - continue - effect_index += 1 - - left_x = f"iw/(2*{zoom_factor})" - top_y = f"ih/(2*{zoom_factor})" - pos_json_str = self.ext_data.get('posJson', '{}') - try: - pos_json = json.loads(pos_json_str) if pos_json_str != '{}' else {} - if pos_json: - _f_x = pos_json.get('ltX', 0) - _f_x2 = pos_json.get('rbX', 0) - _f_y = pos_json.get('ltY', 0) - _f_y2 = pos_json.get('rbY', 0) - _v_w = pos_json.get('imgWidth', 1) - _v_h = pos_json.get('imgHeight', 1) - if _v_w > 0 and _v_h > 0: - # 计算坐标系统中的中心点 - center_x_ratio = (_f_x + _f_x2) / (2 * _v_w) - center_y_ratio = (_f_y + _f_y2) / (2 * _v_h) - # 转换为视频坐标系统 - left_x = f"iw*({center_x_ratio:.6f}-1/(2*{zoom_factor}))" - top_y = f"ih*({center_y_ratio:.6f}-1/(2*{zoom_factor}))" - except Exception as e: - # 解析失败使用默认中心 - pass - - # 静态缩放(整个视频时长) - filter_args.append(f"{video_output_str}scale={zoom_factor}*iw:{zoom_factor}*ih,crop=iw/{zoom_factor}:ih/{zoom_factor}:{left_x}:{top_y}[v_eff{effect_index}]") - video_output_str = f"[v_eff{effect_index}]" - elif effect.startswith("skip:"): - param = effect.split(":", 2)[1] - if param == '': - param = "0" - skip_seconds = float(param) - if skip_seconds > 0: - effect_index += 1 - filter_args.append(f"{video_output_str}trim=start={skip_seconds},setpts=PTS-STARTPTS[v_eff{effect_index}]") - video_output_str = f"[v_eff{effect_index}]" - elif effect.startswith("tail:"): - param = effect.split(":", 2)[1] - if param == '': - param = "0" - tail_seconds = float(param) - if tail_seconds > 0: - effect_index += 1 - # 首先获取视频总时长,然后计算开始时间 - # 使用reverse+trim+reverse的方法来精确获取最后N秒 - filter_args.append(f"{video_output_str}reverse[v_rev{effect_index}]") - filter_args.append(f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]") - filter_args.append(f"[v_trim{effect_index}]reverse,setpts=PTS-STARTPTS[v_eff{effect_index}]") - video_output_str = f"[v_eff{effect_index}]" - elif effect.startswith("show:"): - param = effect.split(":", 2)[1] - if param == '': - param = "0" - show_seconds = float(param) - if show_seconds > 0: - effect_index += 1 - filter_args.append(f"{video_output_str}trim=end={show_seconds},setpts=PTS-STARTPTS[v_eff{effect_index}]") - video_output_str = f"[v_eff{effect_index}]" - elif effect.startswith("grid4:"): - param = effect.split(":", 2)[1] - if param == '': - param = "1" - delay_seconds = float(param) - effect_index += 1 - - # 获取分辨率,如果没有设置则使用默认值 - if self.resolution: - width, height = self.resolution.split('x') - else: - width, height = "1920", "1080" # 默认分辨率 - - # 计算四宫格中每个象限的大小 - grid_width = int(width) // 2 - grid_height = int(height) // 2 - - # 分割视频流为4份 - filter_args.append(f"{video_output_str}split=4[grid_v1_{effect_index}][grid_v2_{effect_index}][grid_v3_{effect_index}][grid_v4_{effect_index}]") - - # 创建黑色背景 - filter_args.append(f"color=black:size={width}x{height}:duration=1[bg_{effect_index}]") - - # 缩放每个视频流到绝对尺寸 - filter_args.append(f"[grid_v1_{effect_index}]scale={grid_width}:{grid_height}[v1_scaled_{effect_index}]") - filter_args.append(f"[grid_v2_{effect_index}]scale={grid_width}:{grid_height},tpad=start_duration={delay_seconds}[v2_scaled_{effect_index}]") - filter_args.append(f"[grid_v3_{effect_index}]scale={grid_width}:{grid_height},tpad=start_duration={delay_seconds*2}[v3_scaled_{effect_index}]") - filter_args.append(f"[grid_v4_{effect_index}]scale={grid_width}:{grid_height},tpad=start_duration={delay_seconds*3}[v4_scaled_{effect_index}]") - - # 使用overlay将四个视频流叠加到四个位置 - filter_args.append(f"[bg_{effect_index}][v1_scaled_{effect_index}]overlay=0:0:shortest=1[grid_step1_{effect_index}]") - filter_args.append(f"[grid_step1_{effect_index}][v2_scaled_{effect_index}]overlay=w/2:0:shortest=1[grid_step2_{effect_index}]") - filter_args.append(f"[grid_step2_{effect_index}][v3_scaled_{effect_index}]overlay=0:h/2:shortest=1[grid_step3_{effect_index}]") - filter_args.append(f"[grid_step3_{effect_index}][v4_scaled_{effect_index}]overlay=w/2:h/2:shortest=1[v_eff{effect_index}]") - - video_output_str = f"[v_eff{effect_index}]" - ... - if self.resolution: - filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]") - video_output_str = "[v]" - for lut in self.luts: - filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}") - for overlay in self.overlays: - input_index = input_args.count("-i") - input_args.append("-i") - input_args.append(overlay) - if os.getenv("OLD_FFMPEG"): - filter_args.append(f"{video_output_str}[{input_index}:v]scale2ref=iw:ih[v]") - else: - filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]") - filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]") - video_output_str = "[v]" - for subtitle in self.subtitles: - filter_args.append(f"{video_output_str}ass={subtitle}[v]") - video_output_str = "[v]" - output_args.append("-map") - output_args.append(video_output_str) - output_args.append("-r") - output_args.append(f"{self.frame_rate}") - output_args.append("-fps_mode") - output_args.append("cfr") - if self.mute: - input_index = input_args.count("-i") - input_args += MUTE_AUDIO_INPUT - filter_args.append(f"[{input_index}:a]acopy[a]") - audio_track_index += 1 - audio_output_str = "[a]" - else: - audio_output_str = "[0:a]" - audio_track_index += 1 - for audio in self.audios: - input_index = input_args.count("-i") - input_args.append("-i") - input_args.append(audio.replace("\\", "/")) - audio_track_index += 1 - filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]") - audio_output_str = "[a]" - if audio_output_str: - output_args.append("-map") - output_args.append(audio_output_str) - _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)] - return args + input_args + _filter_args + output_args + [self.get_output_file()] - elif self.task_type == 'concat': - # 无法通过 annexb 合并的 - input_args = [] - output_args = [*DEFAULT_ARGS] - filter_args = [] - audio_output_str = "" - audio_track_index = 0 - # output_args - if len(self.input_file) == 1: - _file = self.input_file[0] - from util.ffmpeg import probe_video_audio - if type(_file) is str: - input_args += ["-i", _file] - self.mute = not probe_video_audio(_file) - elif isinstance(_file, FfmpegTask): - input_args += ["-i", _file.get_output_file()] - self.mute = not probe_video_audio(_file.get_output_file()) - else: - _tmp_file = "tmp_concat_" + str(time.time()) + ".txt" - from util.ffmpeg import probe_video_audio - with open(_tmp_file, "w", encoding="utf-8") as f: - for input_file in self.input_file: - if type(input_file) is str: - f.write("file '" + input_file + "'\n") - elif isinstance(input_file, FfmpegTask): - f.write("file '" + input_file.get_output_file() + "'\n") - input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file] - self.mute = not probe_video_audio(_tmp_file, "concat") - output_args.append("-map") - output_args.append("0:v") - output_args.append("-c:v") - output_args.append("copy") - if self.mute: - input_index = input_args.count("-i") - input_args += MUTE_AUDIO_INPUT - audio_output_str = f"[{input_index}:a]" - audio_track_index += 1 - else: - audio_output_str = "[0:a]" - audio_track_index += 1 - for audio in self.audios: - input_index = input_args.count("-i") - input_args.append("-i") - input_args.append(audio.replace("\\", "/")) - audio_track_index += 1 - filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]") - audio_output_str = "[a]" - if audio_output_str: - output_args.append("-map") - if audio_track_index <= 1: - output_args.append(audio_output_str[1:-1]) - else: - output_args.append(audio_output_str) - output_args += AUDIO_ARGS - if self.annexb: - output_args.append("-bsf:v") - output_args.append(get_mp4toannexb_filter()) - output_args.append("-bsf:a") - output_args.append("setts=pts=DTS") - output_args.append("-f") - output_args.append("mpegts" if self.annexb else "mp4") - _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)] - return args + input_args + _filter_args + output_args + [self.get_output_file()] - elif self.task_type == 'copy': - if len(self.input_file) == 1: - if type(self.input_file[0]) is str: - if self.input_file[0] == self.get_output_file(): - return [] - return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()] - return [] - - def set_output_file(self, file=None): - if file is None: - if self.output_file == '': - if self.annexb: - self.output_file = "rand_" + str(uuid.uuid4()) + ".ts" - else: - self.output_file = "rand_" + str(uuid.uuid4()) + ".mp4" - else: - if isinstance(file, FfmpegTask): - if file == self: - return - self.output_file = file.get_output_file() - if type(file) is str: - self.output_file = file - - def check_annexb(self): - for input_file in self.input_file: - if type(input_file) is str: - if self.task_type == 'encode': - return self.annexb - elif self.task_type == 'concat': - return False - elif self.task_type == 'copy': - return self.annexb - else: - return False - elif isinstance(input_file, FfmpegTask): - if not input_file.check_annexb(): - return False - return True \ No newline at end of file diff --git a/handlers/base.py b/handlers/base.py index 064e8ed..0d8f4ab 100644 --- a/handlers/base.py +++ b/handlers/base.py @@ -6,19 +6,19 @@ """ import os +import json import logging import shutil import tempfile import subprocess from abc import ABC -from typing import Optional, List, TYPE_CHECKING +from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING from core.handler import TaskHandler from domain.task import Task from domain.result import TaskResult, ErrorCode from domain.config import WorkerConfig -from util import oss -from util.ffmpeg import subprocess_args +from services import storage if TYPE_CHECKING: from services.api_client import APIClientV2 @@ -45,6 +45,117 @@ AUDIO_ENCODE_ARGS = [ ] +def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]: + """ + 创建跨平台的 subprocess 参数 + + 在 Windows 上使用 Pyinstaller --noconsole 打包时,需要特殊处理以避免弹出命令行窗口。 + + Args: + include_stdout: 是否包含 stdout 捕获 + + Returns: + subprocess.run 使用的参数字典 + """ + ret: Dict[str, Any] = {} + + # Windows 特殊处理 + if hasattr(subprocess, 'STARTUPINFO'): + si = subprocess.STARTUPINFO() + si.dwFlags |= subprocess.STARTF_USESHOWWINDOW + ret['startupinfo'] = si + ret['env'] = os.environ + + # 重定向 stdin 避免 "handle is invalid" 错误 + ret['stdin'] = subprocess.PIPE + + if include_stdout: + ret['stdout'] = subprocess.PIPE + + return ret + + +def probe_video_info(video_file: str) -> Tuple[int, int, float]: + """ + 探测视频信息(宽度、高度、时长) + + Args: + video_file: 视频文件路径 + + Returns: + (width, height, duration) 元组,失败返回 (0, 0, 0) + """ + try: + result = subprocess.run( + [ + 'ffprobe', '-v', 'error', + '-select_streams', 'v:0', + '-show_entries', 'stream=width,height:format=duration', + '-of', 'csv=s=x:p=0', + video_file + ], + capture_output=True, + timeout=30, + **subprocess_args(False) + ) + + if result.returncode != 0: + logger.warning(f"ffprobe failed for {video_file}") + return 0, 0, 0 + + output = result.stdout.decode('utf-8').strip() + if not output: + return 0, 0, 0 + + lines = output.split('\n') + if len(lines) >= 2: + wh = lines[0].strip() + duration_str = lines[1].strip() + width, height = wh.split('x') + return int(width), int(height), float(duration_str) + + return 0, 0, 0 + + except Exception as e: + logger.warning(f"probe_video_info error: {e}") + return 0, 0, 0 + + +def probe_duration_json(file_path: str) -> Optional[float]: + """ + 使用 ffprobe JSON 输出探测媒体时长 + + Args: + file_path: 媒体文件路径 + + Returns: + 时长(秒),失败返回 None + """ + try: + result = subprocess.run( + [ + 'ffprobe', '-v', 'error', + '-show_entries', 'format=duration', + '-of', 'json', + file_path + ], + capture_output=True, + timeout=30, + **subprocess_args(False) + ) + + if result.returncode != 0: + return None + + data = json.loads(result.stdout.decode('utf-8')) + duration = data.get('format', {}).get('duration') + return float(duration) if duration else None + + except Exception as e: + logger.warning(f"probe_duration_json error: {e}") + return None + + class BaseHandler(TaskHandler, ABC): """ 任务处理器基类 @@ -128,7 +239,7 @@ class BaseHandler(TaskHandler, ABC): timeout = self.config.download_timeout try: - result = oss.download_from_oss(url, dest) + result = storage.download_file(url, dest, timeout=timeout) if result: file_size = os.path.getsize(dest) if os.path.exists(dest) else 0 logger.debug(f"Downloaded: {url} -> {dest} ({file_size} bytes)") @@ -171,7 +282,7 @@ class BaseHandler(TaskHandler, ABC): # 上传文件 try: - result = oss.upload_to_oss(upload_url, file_path) + result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout) if result: file_size = os.path.getsize(file_path) logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)") @@ -241,8 +352,13 @@ class BaseHandler(TaskHandler, ABC): Returns: 时长(秒),失败返回 None """ + # 首先尝试 JSON 输出方式 + duration = probe_duration_json(file_path) + if duration is not None: + return duration + + # 回退到旧方式 try: - from util.ffmpeg import probe_video_info _, _, duration = probe_video_info(file_path) return float(duration) if duration else None except Exception as e: diff --git a/index.py b/index.py index f70013f..b02ddf1 100644 --- a/index.py +++ b/index.py @@ -1,52 +1,177 @@ -from time import sleep +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +RenderWorker v2 入口 + +支持 v2 API 协议的渲染 Worker,处理以下任务类型: +- RENDER_SEGMENT_VIDEO: 渲染视频片段 +- PREPARE_JOB_AUDIO: 生成全局音频 +- PACKAGE_SEGMENT_TS: 封装 TS 分片 +- FINALIZE_MP4: 产出最终 MP4 + +使用方法: + python index.py + +环境变量: + API_ENDPOINT_V2: v2 API 端点(或使用 API_ENDPOINT) + ACCESS_KEY: Worker 认证密钥 + WORKER_ID: Worker ID(默认 100001) + MAX_CONCURRENCY: 最大并发数(默认 4) + HEARTBEAT_INTERVAL: 心跳间隔秒数(默认 5) + TEMP_DIR: 临时文件目录 +""" + import sys - -import config -import biz.task -from telemetry import init_opentelemetry -from template import load_local_template, download_template, TEMPLATES -from util import api - -import os -import glob - -load_local_template() - -# Check for redownload parameter -if 'redownload' in sys.argv: - print("Redownloading all templates...") - for template_name in TEMPLATES.keys(): - print(f"Redownloading template: {template_name}") - download_template(template_name) - print("All templates redownloaded successfully!") - sys.exit(0) +import time +import signal import logging -LOGGER = logging.getLogger(__name__) -init_opentelemetry() +from domain.config import WorkerConfig +from services.api_client import APIClientV2 +from services.task_executor import TaskExecutor +from constant import SOFTWARE_VERSION -while True: - # print(get_sys_info()) - print("waiting for task...") - try: - task_list = api.sync_center() - except Exception as e: - LOGGER.error("sync_center error", exc_info=e) - sleep(5) - continue - if len(task_list) == 0: - # 删除当前文件夹下所有以.mp4、.ts结尾的文件 - for file_globs in ['*.mp4', '*.ts', 'tmp_concat*.txt']: - for file_path in glob.glob(file_globs): - try: - os.remove(file_path) - print(f"Deleted file: {file_path}") - except Exception as e: - LOGGER.error(f"Error deleting file {file_path}", exc_info=e) - sleep(5) - for task in task_list: - print("start task:", task) +# 日志配置 +logging.basicConfig( + level=logging.INFO, + format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +logger = logging.getLogger('worker') + + +class WorkerV2: + """ + v2 渲染 Worker 主类 + + 负责: + - 配置加载 + - API 客户端初始化 + - 任务执行器管理 + - 主循环运行 + - 优雅退出处理 + """ + + def __init__(self): + """初始化 Worker""" + # 加载配置 try: - biz.task.start_task(task) - except Exception as e: - LOGGER.error("task_start error", exc_info=e) + self.config = WorkerConfig.from_env() + except ValueError as e: + logger.error(f"Configuration error: {e}") + sys.exit(1) + + # 初始化 API 客户端 + self.api_client = APIClientV2(self.config) + + # 初始化任务执行器 + self.task_executor = TaskExecutor(self.config, self.api_client) + + # 运行状态 + self.running = True + + # 确保临时目录存在 + self.config.ensure_temp_dir() + + # 注册信号处理器 + self._setup_signal_handlers() + + def _setup_signal_handlers(self): + """设置信号处理器""" + # Windows 不支持 SIGTERM + signal.signal(signal.SIGINT, self._signal_handler) + if hasattr(signal, 'SIGTERM'): + signal.signal(signal.SIGTERM, self._signal_handler) + + def _signal_handler(self, signum, frame): + """ + 信号处理,优雅退出 + + Args: + signum: 信号编号 + frame: 当前栈帧 + """ + signal_name = signal.Signals(signum).name + logger.info(f"Received signal {signal_name}, initiating shutdown...") + self.running = False + + def run(self): + """主循环""" + logger.info("=" * 60) + logger.info("RenderWorker v2 Starting") + logger.info("=" * 60) + logger.info(f"Worker ID: {self.config.worker_id}") + logger.info(f"API Endpoint: {self.config.api_endpoint}") + logger.info(f"Max Concurrency: {self.config.max_concurrency}") + logger.info(f"Heartbeat Interval: {self.config.heartbeat_interval}s") + logger.info(f"Capabilities: {', '.join(self.config.capabilities)}") + logger.info(f"Temp Directory: {self.config.temp_dir}") + logger.info("=" * 60) + + consecutive_errors = 0 + max_consecutive_errors = 10 + + while self.running: + try: + # 心跳同步并拉取任务 + current_task_ids = self.task_executor.get_current_task_ids() + tasks = self.api_client.sync(current_task_ids) + + # 提交新任务 + for task in tasks: + if self.task_executor.submit_task(task): + logger.info(f"Submitted task: {task.task_id} ({task.task_type.value})") + + # 重置错误计数 + consecutive_errors = 0 + + # 等待下次心跳 + time.sleep(self.config.heartbeat_interval) + + except KeyboardInterrupt: + logger.info("Keyboard interrupt received") + self.running = False + except Exception as e: + consecutive_errors += 1 + logger.error(f"Worker loop error ({consecutive_errors}/{max_consecutive_errors}): {e}", exc_info=True) + + # 连续错误过多,增加等待时间 + if consecutive_errors >= max_consecutive_errors: + logger.error("Too many consecutive errors, waiting 30 seconds...") + time.sleep(30) + consecutive_errors = 0 + else: + time.sleep(5) + + # 优雅关闭 + self._shutdown() + + def _shutdown(self): + """优雅关闭""" + logger.info("Shutting down...") + + # 等待当前任务完成 + current_count = self.task_executor.get_current_task_count() + if current_count > 0: + logger.info(f"Waiting for {current_count} running task(s) to complete...") + + # 关闭执行器 + self.task_executor.shutdown(wait=True) + + # 关闭 API 客户端 + self.api_client.close() + + logger.info("Worker stopped") + + +def main(): + """主函数""" + logger.info(f"RenderWorker v{SOFTWARE_VERSION}") + + # 创建并运行 Worker + worker = WorkerV2() + worker.run() + + +if __name__ == '__main__': + main() diff --git a/index_v2.py b/index_v2.py deleted file mode 100644 index 63fc660..0000000 --- a/index_v2.py +++ /dev/null @@ -1,188 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -RenderWorker v2 入口 - -支持 v2 API 协议的渲染 Worker,处理以下任务类型: -- RENDER_SEGMENT_VIDEO: 渲染视频片段 -- PREPARE_JOB_AUDIO: 生成全局音频 -- PACKAGE_SEGMENT_TS: 封装 TS 分片 -- FINALIZE_MP4: 产出最终 MP4 - -使用方法: - python index_v2.py - -环境变量: - API_ENDPOINT_V2: v2 API 端点(或使用 API_ENDPOINT) - ACCESS_KEY: Worker 认证密钥 - WORKER_ID: Worker ID(默认 100001) - MAX_CONCURRENCY: 最大并发数(默认 4) - HEARTBEAT_INTERVAL: 心跳间隔秒数(默认 5) - TEMP_DIR: 临时文件目录 -""" - -import os -import sys -import time -import signal -import logging - -# 加载配置(必须在其他模块之前) -import config - -from domain.config import WorkerConfig -from services.api_client import APIClientV2 -from services.task_executor import TaskExecutor - -# 日志配置 -logging.basicConfig( - level=logging.INFO, - format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' -) -logger = logging.getLogger('worker_v2') - - -class WorkerV2: - """ - v2 渲染 Worker 主类 - - 负责: - - 配置加载 - - API 客户端初始化 - - 任务执行器管理 - - 主循环运行 - - 优雅退出处理 - """ - - def __init__(self): - """初始化 Worker""" - # 加载配置 - try: - self.config = WorkerConfig.from_env() - except ValueError as e: - logger.error(f"Configuration error: {e}") - sys.exit(1) - - # 初始化 API 客户端 - self.api_client = APIClientV2(self.config) - - # 初始化任务执行器 - self.task_executor = TaskExecutor(self.config, self.api_client) - - # 运行状态 - self.running = True - - # 确保临时目录存在 - self.config.ensure_temp_dir() - - # 注册信号处理器 - self._setup_signal_handlers() - - def _setup_signal_handlers(self): - """设置信号处理器""" - # Windows 不支持 SIGTERM - signal.signal(signal.SIGINT, self._signal_handler) - if hasattr(signal, 'SIGTERM'): - signal.signal(signal.SIGTERM, self._signal_handler) - - def _signal_handler(self, signum, frame): - """ - 信号处理,优雅退出 - - Args: - signum: 信号编号 - frame: 当前栈帧 - """ - signal_name = signal.Signals(signum).name - logger.info(f"Received signal {signal_name}, initiating shutdown...") - self.running = False - - def run(self): - """主循环""" - logger.info("=" * 60) - logger.info("RenderWorker v2 Starting") - logger.info("=" * 60) - logger.info(f"Worker ID: {self.config.worker_id}") - logger.info(f"API Endpoint: {self.config.api_endpoint}") - logger.info(f"Max Concurrency: {self.config.max_concurrency}") - logger.info(f"Heartbeat Interval: {self.config.heartbeat_interval}s") - logger.info(f"Capabilities: {', '.join(self.config.capabilities)}") - logger.info(f"Temp Directory: {self.config.temp_dir}") - logger.info("=" * 60) - - consecutive_errors = 0 - max_consecutive_errors = 10 - - while self.running: - try: - # 心跳同步并拉取任务 - current_task_ids = self.task_executor.get_current_task_ids() - tasks = self.api_client.sync(current_task_ids) - - # 提交新任务 - for task in tasks: - if self.task_executor.submit_task(task): - logger.info(f"Submitted task: {task.task_id} ({task.task_type.value})") - - # 重置错误计数 - consecutive_errors = 0 - - # 等待下次心跳 - time.sleep(self.config.heartbeat_interval) - - except KeyboardInterrupt: - logger.info("Keyboard interrupt received") - self.running = False - except Exception as e: - consecutive_errors += 1 - logger.error(f"Worker loop error ({consecutive_errors}/{max_consecutive_errors}): {e}", exc_info=True) - - # 连续错误过多,增加等待时间 - if consecutive_errors >= max_consecutive_errors: - logger.error("Too many consecutive errors, waiting 30 seconds...") - time.sleep(30) - consecutive_errors = 0 - else: - time.sleep(5) - - # 优雅关闭 - self._shutdown() - - def _shutdown(self): - """优雅关闭""" - logger.info("Shutting down...") - - # 等待当前任务完成 - current_count = self.task_executor.get_current_task_count() - if current_count > 0: - logger.info(f"Waiting for {current_count} running task(s) to complete...") - - # 关闭执行器 - self.task_executor.shutdown(wait=True) - - # 关闭 API 客户端 - self.api_client.close() - - logger.info("Worker stopped") - - -def main(): - """主函数""" - # 初始化 OpenTelemetry(如果可用) - try: - from telemetry import init_opentelemetry - init_opentelemetry() - logger.info("OpenTelemetry initialized") - except ImportError: - logger.debug("OpenTelemetry not available, skipping initialization") - except Exception as e: - logger.warning(f"Failed to initialize OpenTelemetry: {e}") - - # 创建并运行 Worker - worker = WorkerV2() - worker.run() - - -if __name__ == '__main__': - main() diff --git a/services/__init__.py b/services/__init__.py index 61e70d7..6503d46 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -2,15 +2,17 @@ """ 服务层 -包含 API 客户端、任务执行器、租约服务等服务组件。 +包含 API 客户端、任务执行器、租约服务、存储服务等组件。 """ from services.api_client import APIClientV2 from services.lease_service import LeaseService from services.task_executor import TaskExecutor +from services import storage __all__ = [ 'APIClientV2', 'LeaseService', 'TaskExecutor', + 'storage', ] diff --git a/services/storage.py b/services/storage.py new file mode 100644 index 0000000..c807b21 --- /dev/null +++ b/services/storage.py @@ -0,0 +1,200 @@ +# -*- coding: utf-8 -*- +""" +存储服务 + +提供文件上传/下载功能,支持 OSS 签名 URL 和 HTTP_REPLACE_MAP 环境变量。 +""" + +import os +import logging +from typing import Optional + +import requests + +logger = logging.getLogger(__name__) + + +def _apply_http_replace_map(url: str) -> str: + """ + 应用 HTTP_REPLACE_MAP 环境变量替换 URL + + Args: + url: 原始 URL + + Returns: + 替换后的 URL + """ + replace_map = os.getenv("HTTP_REPLACE_MAP", "") + if not replace_map: + return url + + new_url = url + replace_list = [i.split("|", 1) for i in replace_map.split(",") if "|" in i] + for src, dst in replace_list: + new_url = new_url.replace(src, dst) + + if new_url != url: + logger.debug(f"HTTP_REPLACE_MAP: {url} -> {new_url}") + + return new_url + + +def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 60) -> bool: + """ + 使用签名 URL 上传文件到 OSS + + Args: + url: 签名 URL + file_path: 本地文件路径 + max_retries: 最大重试次数 + timeout: 超时时间(秒) + + Returns: + 是否成功 + """ + if not os.path.exists(file_path): + logger.error(f"File not found: {file_path}") + return False + + file_size = os.path.getsize(file_path) + logger.info(f"Uploading: {file_path} ({file_size} bytes)") + + # 检查是否使用 rclone 上传 + if os.getenv("UPLOAD_METHOD") == "rclone": + result = _upload_with_rclone(url, file_path) + if result: + return True + # rclone 失败时回退到 HTTP + + # 应用 HTTP_REPLACE_MAP 替换 URL + http_url = _apply_http_replace_map(url) + + retries = 0 + while retries < max_retries: + try: + with open(file_path, 'rb') as f: + response = requests.put( + http_url, + data=f, + stream=True, + timeout=timeout, + headers={"Content-Type": "application/octet-stream"} + ) + response.raise_for_status() + logger.info(f"Upload succeeded: {file_path}") + return True + + except requests.exceptions.Timeout: + retries += 1 + logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...") + except requests.exceptions.RequestException as e: + retries += 1 + logger.warning(f"Upload failed ({e}). Retrying {retries}/{max_retries}...") + + logger.error(f"Upload failed after {max_retries} retries: {file_path}") + return False + + +def _upload_with_rclone(url: str, file_path: str) -> bool: + """ + 使用 rclone 上传文件 + + Args: + url: 目标 URL + file_path: 本地文件路径 + + Returns: + 是否成功 + """ + replace_map = os.getenv("RCLONE_REPLACE_MAP", "") + if not replace_map: + return False + + config_file = os.getenv("RCLONE_CONFIG_FILE", "") + rclone_config = f"--config {config_file}" if config_file else "" + + # 替换 URL + new_url = url + replace_list = [i.split("|", 1) for i in replace_map.split(",") if "|" in i] + for src, dst in replace_list: + new_url = new_url.replace(src, dst) + new_url = new_url.split("?", 1)[0] # 移除查询参数 + + if new_url == url: + return False + + cmd = ( + f"rclone copyto --no-check-dest --ignore-existing " + f"--multi-thread-chunk-size 8M --multi-thread-streams 8 " + f"{rclone_config} {file_path} {new_url}" + ) + logger.debug(f"rclone command: {cmd}") + + result = os.system(cmd) + if result == 0: + logger.info(f"rclone upload succeeded: {file_path}") + return True + + logger.warning(f"rclone upload failed (code={result}): {file_path}") + return False + + +def download_file( + url: str, + file_path: str, + max_retries: int = 5, + timeout: int = 30, + skip_if_exist: bool = False +) -> bool: + """ + 使用签名 URL 下载文件 + + Args: + url: 签名 URL + file_path: 本地文件路径 + max_retries: 最大重试次数 + timeout: 超时时间(秒) + skip_if_exist: 如果文件存在则跳过 + + Returns: + 是否成功 + """ + # 如果文件已存在且跳过 + if skip_if_exist and os.path.exists(file_path): + logger.debug(f"File exists, skipping download: {file_path}") + return True + + logger.info(f"Downloading: {url}") + + # 确保目标目录存在 + file_dir = os.path.dirname(file_path) + if file_dir: + os.makedirs(file_dir, exist_ok=True) + + # 应用 HTTP_REPLACE_MAP 替换 URL + http_url = _apply_http_replace_map(url) + + retries = 0 + while retries < max_retries: + try: + response = requests.get(http_url, timeout=timeout, stream=True) + response.raise_for_status() + + with open(file_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + file_size = os.path.getsize(file_path) + logger.info(f"Download succeeded: {file_path} ({file_size} bytes)") + return True + + except requests.exceptions.Timeout: + retries += 1 + logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...") + except requests.exceptions.RequestException as e: + retries += 1 + logger.warning(f"Download failed ({e}). Retrying {retries}/{max_retries}...") + + logger.error(f"Download failed after {max_retries} retries: {url}") + return False diff --git a/telemetry/__init__.py b/telemetry/__init__.py deleted file mode 100644 index d532378..0000000 --- a/telemetry/__init__.py +++ /dev/null @@ -1,37 +0,0 @@ -import os - -from constant import SOFTWARE_VERSION -from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter -from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor -from opentelemetry.instrumentation.threading import ThreadingInstrumentor - -ThreadingInstrumentor().instrument() - -def get_tracer(name): - return trace.get_tracer(name) - -# 初始化 OpenTelemetry -def init_opentelemetry(batch=True): - # 设置服务名、主机名 - resource = Resource(attributes={ - SERVICE_NAME: "RENDER_WORKER", - SERVICE_VERSION: SOFTWARE_VERSION, - DEPLOYMENT_ENVIRONMENT: "Python", - HOST_NAME: os.getenv("ACCESS_KEY"), - }) - - # 使用HTTP协议上报 - if batch: - span_processor = BatchSpanProcessor(OTLPSpanHttpExporter( - endpoint="https://oltp.jerryyan.top/v1/traces", - )) - else: - span_processor = SimpleSpanProcessor(OTLPSpanHttpExporter( - endpoint="https://oltp.jerryyan.top/v1/traces", - )) - - trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor) - trace.set_tracer_provider(trace_provider) diff --git a/template/.gitignore b/template/.gitignore deleted file mode 100644 index 01b7e33..0000000 --- a/template/.gitignore +++ /dev/null @@ -1 +0,0 @@ -**/* \ No newline at end of file diff --git a/template/__init__.py b/template/__init__.py deleted file mode 100644 index de98076..0000000 --- a/template/__init__.py +++ /dev/null @@ -1,128 +0,0 @@ -import json -import os -import logging - -from telemetry import get_tracer -from util import api, oss - -TEMPLATES = {} -logger = logging.getLogger("template") - -def check_local_template(local_name): - template_def = TEMPLATES[local_name] - base_dir = template_def.get("local_path") - for video_part in template_def.get("video_parts", []): - source_file = video_part.get("source", "") - if str(source_file).startswith("http"): - # download file - ... - elif str(source_file).startswith("PLACEHOLDER_"): - continue - else: - if not os.path.isabs(source_file): - source_file = os.path.join(base_dir, source_file) - if not os.path.exists(source_file): - logger.error(f"{source_file} not found, please check the template definition") - raise Exception(f"{source_file} not found, please check the template definition") - for audio in video_part.get("audios", []): - if not os.path.isabs(audio): - audio = os.path.join(base_dir, audio) - if not os.path.exists(audio): - logger.error(f"{audio} not found, please check the template definition") - raise Exception(f"{audio} not found, please check the template definition") - for lut in video_part.get("luts", []): - if not os.path.isabs(lut): - lut = os.path.join(base_dir, lut) - if not os.path.exists(lut): - logger.error(f"{lut} not found, please check the template definition") - raise Exception(f"{lut} not found, please check the template definition") - for mask in video_part.get("overlays", []): - if not os.path.isabs(mask): - mask = os.path.join(base_dir, mask) - if not os.path.exists(mask): - logger.error(f"{mask} not found, please check the template definition") - raise Exception(f"{mask} not found, please check the template definition") - - -def load_template(template_name, local_path): - global TEMPLATES - logger.info(f"加载视频模板定义:【{template_name}({local_path})】") - template_def_file = os.path.join(local_path, "template.json") - if os.path.exists(template_def_file): - TEMPLATES[template_name] = json.load(open(template_def_file, 'rb')) - TEMPLATES[template_name]["local_path"] = local_path - try: - check_local_template(template_name) - logger.info(f"完成加载【{template_name}】模板") - except Exception as e: - logger.error(f"模板定义文件【{template_def_file}】有误,正在尝试重新下载模板", exc_info=e) - download_template(template_name) - - -def load_local_template(): - for template_name in os.listdir(os.getenv("TEMPLATE_DIR")): - if template_name.startswith("_"): - continue - if template_name.startswith("."): - continue - target_path = os.path.join(os.getenv("TEMPLATE_DIR"), template_name) - if os.path.isdir(target_path): - load_template(template_name, target_path) - - -def get_template_def(template_id): - if template_id not in TEMPLATES: - download_template(template_id) - return TEMPLATES.get(template_id) - -def download_template(template_id): - tracer = get_tracer(__name__) - with tracer.start_as_current_span("download_template"): - template_info = api.get_template_info(template_id) - if template_info is None: - return - if not os.path.isdir(template_info['local_path']): - os.makedirs(template_info['local_path']) - # download template assets - overall_template = template_info['overall_template'] - video_parts = template_info['video_parts'] - def _download_assets(_template): - if 'source' in _template: - if str(_template['source']).startswith("http"): - _, _fn = os.path.split(_template['source']) - new_fp = os.path.join(template_info['local_path'], _fn) - oss.download_from_oss(_template['source'], new_fp) - if _fn.endswith(".mp4"): - from util.ffmpeg import re_encode_and_annexb - new_fp = re_encode_and_annexb(new_fp) - _template['source'] = os.path.relpath(new_fp, template_info['local_path']) - if 'overlays' in _template: - for i in range(len(_template['overlays'])): - overlay = _template['overlays'][i] - if str(overlay).startswith("http"): - _, _fn = os.path.split(overlay) - oss.download_from_oss(overlay, os.path.join(template_info['local_path'], _fn)) - _template['overlays'][i] = _fn - if 'luts' in _template: - for i in range(len(_template['luts'])): - lut = _template['luts'][i] - if str(lut).startswith("http"): - _, _fn = os.path.split(lut) - oss.download_from_oss(lut, os.path.join(template_info['local_path'], _fn)) - _template['luts'][i] = _fn - if 'audios' in _template: - for i in range(len(_template['audios'])): - if str(_template['audios'][i]).startswith("http"): - _, _fn = os.path.split(_template['audios'][i]) - oss.download_from_oss(_template['audios'][i], os.path.join(template_info['local_path'], _fn)) - _template['audios'][i] = _fn - _download_assets(overall_template) - for video_part in video_parts: - _download_assets(video_part) - with open(os.path.join(template_info['local_path'], 'template.json'), 'w', encoding='utf-8') as f: - json.dump(template_info, f) - load_template(template_id, template_info['local_path']) - - -def analyze_template(template_id): - ... diff --git a/util/__init__.py b/util/__init__.py new file mode 100644 index 0000000..b9e4d83 --- /dev/null +++ b/util/__init__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +""" +工具模块 + +提供系统信息采集等工具函数。 +""" + +from util.system import get_sys_info, get_capabilities, get_gpu_info, get_ffmpeg_version + +__all__ = [ + 'get_sys_info', + 'get_capabilities', + 'get_gpu_info', + 'get_ffmpeg_version', +] diff --git a/util/api.py b/util/api.py deleted file mode 100644 index 78d165f..0000000 --- a/util/api.py +++ /dev/null @@ -1,260 +0,0 @@ -import json -import logging -import os -import threading - -import requests -from opentelemetry.trace import Status, StatusCode - -import util.system -from telemetry import get_tracer -from util import oss - -session = requests.Session() -logger = logging.getLogger(__name__) - - -def normalize_task(task_info): - ... - return task_info - - -def sync_center(): - """ - 通过接口获取任务 - :return: 任务列表 - """ - from template import TEMPLATES, download_template - try: - response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ - 'accessKey': os.getenv('ACCESS_KEY'), - 'clientStatus': util.system.get_sys_info(), - 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in - TEMPLATES.values()] - }, timeout=10) - response.raise_for_status() - except requests.RequestException as e: - logger.error("请求失败!", e) - return [] - data = response.json() - logger.debug("获取任务结果:【%s】", data) - if data.get('code', 0) == 200: - templates = data.get('data', {}).get('templates', []) - tasks = data.get('data', {}).get('tasks', []) - else: - tasks = [] - templates = [] - logger.warning("获取任务失败") - if os.getenv("REDIRECT_TO_URL", False) != False: - for task in tasks: - _sess = requests.Session() - logger.info("重定向任务【%s】至配置的地址:%s", task.get("id"), os.getenv("REDIRECT_TO_URL")) - url = f"{os.getenv('REDIRECT_TO_URL')}{task.get('id')}" - threading.Thread(target=requests.post, args=(url,)).start() - return [] - for template in templates: - template_id = template.get('id', '') - if template_id: - logger.info("更新模板:【%s】", template_id) - download_template(template_id) - return tasks - - -def get_template_info(template_id): - """ - 通过接口获取模板信息 - :rtype: Template - :param template_id: 模板id - :type template_id: str - :return: 模板信息 - """ - tracer = get_tracer(__name__) - with tracer.start_as_current_span("get_template_info"): - with tracer.start_as_current_span("get_template_info.request") as req_span: - try: - req_span.set_attribute("http.method", "POST") - req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id)) - response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - }, timeout=10) - req_span.set_attribute("http.status_code", response.status_code) - req_span.set_attribute("http.response", response.text) - response.raise_for_status() - except requests.RequestException as e: - req_span.set_attribute("api.error", str(e)) - logger.error("请求失败!", e) - return None - data = response.json() - logger.debug("获取模板信息结果:【%s】", data) - remote_template_info = data.get('data', {}) - if not remote_template_info: - logger.warning("获取模板信息结果为空", data) - return None - template = { - 'id': template_id, - 'updateTime': remote_template_info.get('updateTime', template_id), - 'scenic_name': remote_template_info.get('scenicName', '景区'), - 'name': remote_template_info.get('name', '模版'), - 'video_size': remote_template_info.get('resolution', '1920x1080'), - 'frame_rate': 25, - 'overall_duration': 30, - 'video_parts': [ - - ] - } - - def _template_normalizer(template_info): - _template = {} - _placeholder_type = template_info.get('isPlaceholder', -1) - if _placeholder_type == 0: - # 固定视频 - _template['source'] = template_info.get('sourceUrl', '') - elif _placeholder_type == 1: - # 占位符 - _template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '') - _template['mute'] = template_info.get('mute', True) - _template['crop_mode'] = template_info.get('cropEnable', None) - _template['zoom_cut'] = template_info.get('zoomCut', None) - else: - _template['source'] = None - _overlays = template_info.get('overlays', '') - if _overlays: - _template['overlays'] = _overlays.split(",") - _audios = template_info.get('audios', '') - if _audios: - _template['audios'] = _audios.split(",") - _luts = template_info.get('luts', '') - if _luts: - _template['luts'] = _luts.split(",") - _only_if = template_info.get('onlyIf', '') - if _only_if: - _template['only_if'] = _only_if - _effects = template_info.get('effects', '') - if _effects: - _template['effects'] = _effects.split("|") - return _template - - # outer template definition - overall_template = _template_normalizer(remote_template_info) - template['overall_template'] = overall_template - # inter template definition - inter_template_list = remote_template_info.get('children', []) - for children_template in inter_template_list: - parts = _template_normalizer(children_template) - template['video_parts'].append(parts) - template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id)) - with get_tracer("api").start_as_current_span("get_template_info.template") as res_span: - res_span.set_attribute("normalized.response", json.dumps(template)) - return template - - -def report_task_success(task_info, **kwargs): - tracer = get_tracer(__name__) - with tracer.start_as_current_span("report_task_success"): - with tracer.start_as_current_span("report_task_success.request") as req_span: - try: - req_span.set_attribute("http.method", "POST") - req_span.set_attribute("http.url", - '{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) - response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - **kwargs - }, timeout=10) - req_span.set_attribute("http.status_code", response.status_code) - req_span.set_attribute("http.response", response.text) - response.raise_for_status() - req_span.set_status(Status(StatusCode.OK)) - except requests.RequestException as e: - req_span.set_attribute("api.error", str(e)) - logger.error("请求失败!", e) - return None - - -def report_task_start(task_info): - tracer = get_tracer(__name__) - with tracer.start_as_current_span("report_task_start"): - with tracer.start_as_current_span("report_task_start.request") as req_span: - try: - req_span.set_attribute("http.method", "POST") - req_span.set_attribute("http.url", - '{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) - response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - }, timeout=10) - req_span.set_attribute("http.status_code", response.status_code) - req_span.set_attribute("http.response", response.text) - response.raise_for_status() - req_span.set_status(Status(StatusCode.OK)) - except requests.RequestException as e: - req_span.set_attribute("api.error", str(e)) - logger.error("请求失败!", e) - return None - - -def report_task_failed(task_info, reason=''): - tracer = get_tracer(__name__) - with tracer.start_as_current_span("report_task_failed") as span: - span.set_attribute("task_id", task_info.get("id")) - span.set_attribute("reason", reason) - with tracer.start_as_current_span("report_task_failed.request") as req_span: - try: - req_span.set_attribute("http.method", "POST") - req_span.set_attribute("http.url", - '{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) - response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - 'reason': reason - }, timeout=10) - req_span.set_attribute("http.status_code", response.status_code) - req_span.set_attribute("http.response", response.text) - response.raise_for_status() - req_span.set_status(Status(StatusCode.OK)) - except requests.RequestException as e: - req_span.set_attribute("api.error", str(e)) - req_span.set_status(Status(StatusCode.ERROR)) - logger.error("请求失败!", e) - return None - - -def upload_task_file(task_info, ffmpeg_task): - tracer = get_tracer(__name__) - with get_tracer("api").start_as_current_span("upload_task_file") as span: - logger.info("开始上传文件: %s", task_info.get("id")) - span.set_attribute("file.id", task_info.get("id")) - with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span: - try: - req_span.set_attribute("http.method", "POST") - req_span.set_attribute("http.url", - '{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) - response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), - json={ - 'accessKey': os.getenv('ACCESS_KEY'), - }, timeout=10) - req_span.set_attribute("http.status_code", response.status_code) - req_span.set_attribute("http.response", response.text) - response.raise_for_status() - req_span.set_status(Status(StatusCode.OK)) - except requests.RequestException as e: - span.set_attribute("api.error", str(e)) - req_span.set_status(Status(StatusCode.ERROR)) - logger.error("请求失败!", e) - return False - data = response.json() - url = data.get('data', "") - logger.info("开始上传文件: %s 至 %s", task_info.get("id"), url) - return oss.upload_to_oss(url, ffmpeg_task.get_output_file()) - - -def get_task_info(id): - try: - response = session.get(os.getenv('API_ENDPOINT') + "/" + id + "/info", params={ - 'accessKey': os.getenv('ACCESS_KEY'), - }, timeout=10) - response.raise_for_status() - except requests.RequestException as e: - logger.error("请求失败!", e) - return [] - data = response.json() - logger.debug("获取任务结果:【%s】", data) - if data.get('code', 0) == 200: - return data.get('data', {}) \ No newline at end of file diff --git a/util/ffmpeg.py b/util/ffmpeg.py deleted file mode 100644 index 1fd8fbf..0000000 --- a/util/ffmpeg.py +++ /dev/null @@ -1,257 +0,0 @@ -import json -import logging -import os -import subprocess -from datetime import datetime -from typing import Optional, IO - -from opentelemetry.trace import Status, StatusCode - -from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT, get_mp4toannexb_filter -from telemetry import get_tracer - -logger = logging.getLogger(__name__) - -def re_encode_and_annexb(file): - with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span: - span.set_attribute("file.path", file) - if not os.path.exists(file): - span.set_status(Status(StatusCode.ERROR)) - return file - logger.info("ReEncodeAndAnnexb: %s", file) - has_audio = not not probe_video_audio(file) - # 优先使用RE_ENCODE_VIDEO_ARGS环境变量,其次使用默认的VIDEO_ARGS - if os.getenv("RE_ENCODE_VIDEO_ARGS", False): - _video_args = tuple(os.getenv("RE_ENCODE_VIDEO_ARGS", "").split(" ")) - else: - _video_args = VIDEO_ARGS - # 优先使用RE_ENCODE_ENCODER_ARGS环境变量,其次使用默认的ENCODER_ARGS - if os.getenv("RE_ENCODE_ENCODER_ARGS", False): - _encoder_args = tuple(os.getenv("RE_ENCODE_ENCODER_ARGS", "").split(" ")) - else: - _encoder_args = ENCODER_ARGS - ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, - *(set() if has_audio else MUTE_AUDIO_INPUT), - "-fps_mode", "cfr", - "-map", "0:v", "-map", "0:a" if has_audio else "1:a", - *_video_args, "-bsf:v", get_mp4toannexb_filter(), - *AUDIO_ARGS, "-bsf:a", "setts=pts=DTS", - *_encoder_args, "-shortest", "-fflags", "+genpts", - "-f", "mpegts", file + ".ts"]) - logger.info(" ".join(ffmpeg_process.args)) - span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args)) - logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) - span.set_attribute("ffmpeg.code", ffmpeg_process.returncode) - if ffmpeg_process.returncode == 0: - span.set_status(Status(StatusCode.OK)) - span.set_attribute("file.size", os.path.getsize(file+".ts")) - # os.remove(file) - return file+".ts" - else: - span.set_status(Status(StatusCode.ERROR)) - return file - -def start_render(ffmpeg_task: FfmpegTask): - tracer = get_tracer(__name__) - with tracer.start_as_current_span("start_render") as span: - span.set_attribute("ffmpeg.task", str(ffmpeg_task)) - if not ffmpeg_task.need_run(): - ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) - span.set_status(Status(StatusCode.OK)) - return True - ffmpeg_args = ffmpeg_task.get_ffmpeg_args() - if len(ffmpeg_args) == 0: - ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) - span.set_status(Status(StatusCode.OK)) - return True - # 通过环境变量传入通用FFmpeg参数 - common_args = os.getenv("FFMPEG_COMMON_ARGS", "").split() if os.getenv("FFMPEG_COMMON_ARGS") else [] - ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *common_args, *ffmpeg_args], stderr=subprocess.PIPE, **subprocess_args(True)) - span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args)) - logger.info(" ".join(ffmpeg_process.args)) - ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout) - span.set_attribute("ffmpeg.out", ffmpeg_final_out) - logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out) - code = ffmpeg_process.returncode - span.set_attribute("ffmpeg.code", code) - if code != 0: - span.set_attribute("ffmpeg.err", str(ffmpeg_process.stderr)) - span.set_status(Status(StatusCode.ERROR, "FFMPEG异常退出")) - logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr) - return False - span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file) - try: - file_size = os.path.getsize(ffmpeg_task.output_file) - span.set_attribute("file.size", file_size) - if file_size < 4096: - span.set_status(Status(StatusCode.ERROR, "输出文件过小")) - logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL") - return False - except OSError as e: - span.set_attribute("file.size", 0) - span.set_attribute("file.error", e.strerror) - span.set_status(Status(StatusCode.ERROR, "输出文件不存在")) - logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND") - return False - span.set_status(Status(StatusCode.OK)) - return True - -def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: - out_time = "0:0:0.0" - if stdout is None: - print("[!]STDOUT is null") - return out_time - speed = "0" - for line in stdout.split(b"\n"): - if line == b"": - break - if line.strip() == b"progress=end": - # 处理完毕 - break - if line.startswith(b"out_time="): - out_time = line.replace(b"out_time=", b"").decode().strip() - if line.startswith(b"speed="): - speed = line.replace(b"speed=", b"").decode().strip() - print("[ ]Speed:", out_time, "@", speed) - return out_time+"@"+speed - -def duration_str_to_float(duration_str: str) -> float: - _duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1) - return _duration.total_seconds() - - -def probe_video_info(video_file): - tracer = get_tracer(__name__) - with tracer.start_as_current_span("probe_video_info") as span: - span.set_attribute("video.file", video_file) - # 获取宽度和高度 - result = subprocess.run( - ["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of', - 'csv=s=x:p=0', video_file], - stderr=subprocess.STDOUT, - **subprocess_args(True) - ) - span.set_attribute("ffprobe.args", json.dumps(result.args)) - span.set_attribute("ffprobe.code", result.returncode) - if result.returncode != 0: - span.set_status(Status(StatusCode.ERROR)) - return 0, 0, 0 - all_result = result.stdout.decode('utf-8').strip() - span.set_attribute("ffprobe.out", all_result) - if all_result == '': - span.set_status(Status(StatusCode.ERROR)) - return 0, 0, 0 - span.set_status(Status(StatusCode.OK)) - wh, duration = all_result.split('\n') - width, height = wh.strip().split('x') - return int(width), int(height), float(duration) - - -def probe_video_audio(video_file, type=None): - tracer = get_tracer(__name__) - with tracer.start_as_current_span("probe_video_audio") as span: - span.set_attribute("video.file", video_file) - args = ["ffprobe", "-hide_banner", "-v", "error", "-select_streams", "a", "-show_entries", "stream=index", "-of", "csv=p=0"] - if type == 'concat': - args.append("-safe") - args.append("0") - args.append("-f") - args.append("concat") - args.append(video_file) - logger.info(" ".join(args)) - result = subprocess.run(args, stderr=subprocess.STDOUT, **subprocess_args(True)) - span.set_attribute("ffprobe.args", json.dumps(result.args)) - span.set_attribute("ffprobe.code", result.returncode) - logger.info("probe_video_audio: %s", result.stdout.decode('utf-8').strip()) - if result.returncode != 0: - return False - if result.stdout.decode('utf-8').strip() == '': - return False - return True - - -# 音频淡出2秒 -def fade_out_audio(file, duration, fade_out_sec = 2): - if type(duration) == str: - try: - duration = float(duration) - except Exception as e: - logger.error("duration is not float: %s", e) - return file - tracer = get_tracer(__name__) - with tracer.start_as_current_span("fade_out_audio") as span: - span.set_attribute("audio.file", file) - if duration <= fade_out_sec: - return file - else: - new_fn = file + "_.mp4" - if os.path.exists(new_fn): - os.remove(new_fn) - logger.info("delete tmp file: " + new_fn) - try: - process = subprocess.run(["ffmpeg", "-i", file, "-c:v", "copy", "-c:a", "aac", "-af", "afade=t=out:st=" + str(duration - fade_out_sec) + ":d=" + str(fade_out_sec), "-y", new_fn], **subprocess_args(True)) - span.set_attribute("ffmpeg.args", json.dumps(process.args)) - logger.info(" ".join(process.args)) - if process.returncode != 0: - span.set_status(Status(StatusCode.ERROR)) - logger.error("FFMPEG ERROR: %s", process.stderr) - return file - else: - span.set_status(Status(StatusCode.OK)) - return new_fn - except Exception as e: - span.set_status(Status(StatusCode.ERROR)) - logger.error("FFMPEG ERROR: %s", e) - return file - - - -# Create a set of arguments which make a ``subprocess.Popen`` (and -# variants) call work with or without Pyinstaller, ``--noconsole`` or -# not, on Windows and Linux. Typical use:: -# -# subprocess.call(['program_to_run', 'arg_1'], **subprocess_args()) -# -# When calling ``check_output``:: -# -# subprocess.check_output(['program_to_run', 'arg_1'], -# **subprocess_args(False)) -def subprocess_args(include_stdout=True): - # The following is true only on Windows. - if hasattr(subprocess, 'STARTUPINFO'): - # On Windows, subprocess calls will pop up a command window by default - # when run from Pyinstaller with the ``--noconsole`` option. Avoid this - # distraction. - si = subprocess.STARTUPINFO() - si.dwFlags |= subprocess.STARTF_USESHOWWINDOW - # Windows doesn't search the path by default. Pass it an environment so - # it will. - env = os.environ - else: - si = None - env = None - - # ``subprocess.check_output`` doesn't allow specifying ``stdout``:: - # - # Traceback (most recent call last): - # File "test_subprocess.py", line 58, in - # **subprocess_args(stdout=None)) - # File "C:\Python27\lib\subprocess.py", line 567, in check_output - # raise ValueError('stdout argument not allowed, it will be overridden.') - # ValueError: stdout argument not allowed, it will be overridden. - # - # So, add it only if it's needed. - if include_stdout: - ret = {'stdout': subprocess.PIPE} - else: - ret = {} - - # On Windows, running this from the binary produced by Pyinstaller - # with the ``--noconsole`` option requires redirecting everything - # (stdin, stdout, stderr) to avoid an OSError exception - # "[Error 6] the handle is invalid." - ret.update({'stdin': subprocess.PIPE, - 'startupinfo': si, - 'env': env}) - return ret - diff --git a/util/oss.py b/util/oss.py deleted file mode 100644 index cc0e293..0000000 --- a/util/oss.py +++ /dev/null @@ -1,155 +0,0 @@ -import logging -import os -import sys - -import requests -from opentelemetry.trace import Status, StatusCode - -from telemetry import get_tracer - -logger = logging.getLogger(__name__) - - -def _apply_http_replace_map(url): - """ - 应用 HTTP_REPLACE_MAP 环境变量替换 URL - :param str url: 原始 URL - :return str: 替换后的 URL - """ - replace_map = os.getenv("HTTP_REPLACE_MAP", "") - if not replace_map: - return url - replace_list = [i.split("|", 1) for i in replace_map.split(",")] - new_url = url - for (_src, _dst) in replace_list: - new_url = new_url.replace(_src, _dst) - if new_url != url: - logger.debug(f"HTTP_REPLACE_MAP: {url} -> {new_url}") - return new_url - - -def upload_to_oss(url, file_path): - """ - 使用签名URL上传文件到OSS - :param str url: 签名URL - :param str file_path: 文件路径 - :return bool: 是否成功 - """ - tracer = get_tracer(__name__) - with tracer.start_as_current_span("upload_to_oss") as span: - span.set_attribute("file.url", url) - span.set_attribute("file.path", file_path) - span.set_attribute("file.size", os.path.getsize(file_path)) - max_retries = 5 - retries = 0 - if os.getenv("UPLOAD_METHOD") == "rclone": - with tracer.start_as_current_span("rclone_to_oss") as r_span: - replace_map = os.getenv("RCLONE_REPLACE_MAP") - config_file = os.getenv("RCLONE_CONFIG_FILE") - rclone_config = "" - if config_file != "": - rclone_config = f"--config {config_file}" - r_span.set_attribute("rclone.replace_map", replace_map) - if replace_map != "": - replace_list = [i.split("|", 1) for i in replace_map.split(",")] - new_url = url - for (_src, _dst) in replace_list: - new_url = new_url.replace(_src, _dst) - new_url = new_url.split("?", 1)[0] - r_span.set_attribute("rclone.target_dir", new_url) - if new_url != url: - result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 8M --multi-thread-streams 8 {rclone_config} {file_path} {new_url}") - r_span.set_attribute("rclone.result", result) - if result == 0: - span.set_status(Status(StatusCode.OK)) - return True - else: - span.set_status(Status(StatusCode.ERROR)) - # 应用 HTTP_REPLACE_MAP 替换 URL - http_url = _apply_http_replace_map(url) - span.set_attribute("file.http_url", http_url) - while retries < max_retries: - with tracer.start_as_current_span("upload_to_oss.request") as req_span: - req_span.set_attribute("http.retry_count", retries) - try: - req_span.set_attribute("http.method", "PUT") - req_span.set_attribute("http.url", http_url) - with open(file_path, 'rb') as f: - response = requests.put(http_url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"}) - req_span.set_attribute("http.status_code", response.status_code) - req_span.set_attribute("http.response", response.text) - response.raise_for_status() - req_span.set_status(Status(StatusCode.OK)) - span.set_status(Status(StatusCode.OK)) - return True - except requests.exceptions.Timeout: - req_span.set_attribute("http.error", "Timeout") - req_span.set_status(Status(StatusCode.ERROR)) - retries += 1 - logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...") - except Exception as e: - req_span.set_attribute("http.error", str(e)) - req_span.set_status(Status(StatusCode.ERROR)) - retries += 1 - logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...") - span.set_status(Status(StatusCode.ERROR)) - return False - - -def download_from_oss(url, file_path, skip_if_exist=None): - """ - 使用签名URL下载文件到OSS - :param skip_if_exist: 如果存在就不下载了 - :param str url: 签名URL - :param Union[LiteralString, str, bytes] file_path: 文件路径 - :return bool: 是否成功 - """ - tracer = get_tracer(__name__) - with tracer.start_as_current_span("download_from_oss") as span: - span.set_attribute("file.url", url) - span.set_attribute("file.path", file_path) - - # 如果skip_if_exist为None,则从启动参数中读取 - if skip_if_exist is None: - skip_if_exist = 'skip_if_exist' in sys.argv - - if skip_if_exist and os.path.exists(file_path): - span.set_attribute("file.exist", True) - span.set_attribute("file.size", os.path.getsize(file_path)) - return True - logging.info("download_from_oss: %s", url) - file_dir, file_name = os.path.split(file_path) - if file_dir: - if not os.path.exists(file_dir): - os.makedirs(file_dir) - # 应用 HTTP_REPLACE_MAP 替换 URL - http_url = _apply_http_replace_map(url) - span.set_attribute("file.http_url", http_url) - max_retries = 5 - retries = 0 - while retries < max_retries: - with tracer.start_as_current_span("download_from_oss.request") as req_span: - req_span.set_attribute("http.retry_count", retries) - try: - req_span.set_attribute("http.method", "GET") - req_span.set_attribute("http.url", http_url) - response = requests.get(http_url, timeout=15) # 设置超时时间 - req_span.set_attribute("http.status_code", response.status_code) - with open(file_path, 'wb') as f: - f.write(response.content) - req_span.set_attribute("file.size", os.path.getsize(file_path)) - req_span.set_status(Status(StatusCode.OK)) - span.set_status(Status(StatusCode.OK)) - return True - except requests.exceptions.Timeout: - req_span.set_attribute("http.error", "Timeout") - req_span.set_status(Status(StatusCode.ERROR)) - retries += 1 - logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...") - except Exception as e: - req_span.set_attribute("http.error", str(e)) - req_span.set_status(Status(StatusCode.ERROR)) - retries += 1 - logger.warning(f"Download failed. Retrying {retries}/{max_retries}...") - span.set_status(Status(StatusCode.ERROR)) - return False diff --git a/util/system.py b/util/system.py index b7f28a6..433307d 100644 --- a/util/system.py +++ b/util/system.py @@ -7,40 +7,20 @@ import os import platform -from datetime import datetime +import subprocess +from typing import Optional import psutil -from constant import SUPPORT_FEATURE, SOFTWARE_VERSION, V2_DEFAULT_CAPABILITIES +from constant import SOFTWARE_VERSION, DEFAULT_CAPABILITIES def get_sys_info(): """ - 获取系统信息(v1 格式) + 获取系统信息 Returns: dict: 系统信息字典 """ - info = { - 'version': SOFTWARE_VERSION, - 'client_datetime': datetime.now().isoformat(), - 'platform': platform.system(), - 'runtime_version': 'Python ' + platform.python_version(), - 'cpu_count': os.cpu_count(), - 'cpu_usage': psutil.cpu_percent(), - 'memory_total': psutil.virtual_memory().total, - 'memory_available': psutil.virtual_memory().available, - 'support_feature': SUPPORT_FEATURE - } - return info - - -def get_sys_info_v2(): - """ - 获取系统信息(v2 格式) - - Returns: - dict: v2 API 所需的系统信息字典 - """ mem = psutil.virtual_memory() info = { @@ -51,10 +31,11 @@ def get_sys_info_v2(): 'memoryAvailable': f"{mem.available // (1024**3)}GB", 'platform': platform.system(), 'pythonVersion': platform.python_version(), + 'version': SOFTWARE_VERSION, } # 尝试获取 GPU 信息 - gpu_info = _get_gpu_info() + gpu_info = get_gpu_info() if gpu_info: info['gpu'] = gpu_info @@ -68,10 +49,10 @@ def get_capabilities(): Returns: list: 能力列表 """ - return V2_DEFAULT_CAPABILITIES.copy() + return DEFAULT_CAPABILITIES.copy() -def _get_gpu_info(): +def get_gpu_info() -> Optional[str]: """ 尝试获取 GPU 信息 @@ -79,7 +60,6 @@ def _get_gpu_info(): str: GPU 信息,失败返回 None """ try: - import subprocess # 尝试使用 nvidia-smi result = subprocess.run( ['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'], @@ -94,3 +74,30 @@ def _get_gpu_info(): pass return None + + +def get_ffmpeg_version() -> str: + """ + 获取 FFmpeg 版本 + + Returns: + str: FFmpeg 版本号 + """ + try: + result = subprocess.run( + ['ffmpeg', '-version'], + capture_output=True, + text=True, + timeout=5 + ) + if result.returncode == 0: + first_line = result.stdout.split('\n')[0] + # 解析版本号,例如 "ffmpeg version 6.0 ..." + parts = first_line.split() + for i, part in enumerate(parts): + if part == 'version' and i + 1 < len(parts): + return parts[i + 1] + except Exception: + pass + + return 'unknown'