From d770d849274b40dd786ea87ab390d839124290ab Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Fri, 12 Sep 2025 14:41:58 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E9=87=8D=E6=9E=84):=20=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E6=96=B0=E7=9A=84=E6=B8=B2=E6=9F=93=E6=9C=8D=E5=8A=A1=E6=9E=B6?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 RenderTask --- app.py | 12 +- biz/ffmpeg.py | 45 +++-- biz/task.py | 79 +++++---- config/__init__.py | 3 + config/settings.py | 139 +++++++++++++++ entity/effects/__init__.py | 25 +++ entity/effects/base.py | 94 ++++++++++ entity/effects/camera_shot.py | 79 +++++++++ entity/effects/skip.py | 38 ++++ entity/effects/speed.py | 35 ++++ entity/effects/tail.py | 42 +++++ entity/effects/zoom.py | 89 ++++++++++ entity/ffmpeg_command_builder.py | 281 ++++++++++++++++++++++++++++++ entity/render_task.py | 146 ++++++++++++++++ index.py | 10 +- services/__init__.py | 12 ++ services/render_service.py | 237 +++++++++++++++++++++++++ services/task_service.py | 289 +++++++++++++++++++++++++++++++ services/template_service.py | 266 ++++++++++++++++++++++++++++ template/__init__.py | 157 ++++++----------- util/api.py | 7 +- util/exceptions.py | 72 ++++++++ 22 files changed, 1987 insertions(+), 170 deletions(-) create mode 100644 config/settings.py create mode 100644 entity/effects/__init__.py create mode 100644 entity/effects/base.py create mode 100644 entity/effects/camera_shot.py create mode 100644 entity/effects/skip.py create mode 100644 entity/effects/speed.py create mode 100644 entity/effects/tail.py create mode 100644 entity/effects/zoom.py create mode 100644 entity/ffmpeg_command_builder.py create mode 100644 entity/render_task.py create mode 100644 services/__init__.py create mode 100644 services/render_service.py create mode 100644 services/task_service.py create mode 100644 services/template_service.py create mode 100644 util/exceptions.py diff --git a/app.py b/app.py index 041dae7..fe1acfb 100644 --- a/app.py +++ b/app.py @@ -4,12 +4,14 @@ import flask import config import biz.task -import template +from services import DefaultTemplateService from telemetry import init_opentelemetry -from template import load_local_template from util import api -load_local_template() +# 使用新的服务架构 +template_service = DefaultTemplateService() +template_service.load_local_templates() + import logging LOGGER = logging.getLogger(__name__) @@ -27,11 +29,11 @@ def do_nothing(): @app.post('/') def do_task(task_id): task_info = api.get_task_info(task_id) - local_template_info = template.get_template_def(task_info.get("templateId")) + local_template_info = template_service.get_template(task_info.get("templateId")) template_info = api.get_template_info(task_info.get("templateId")) if local_template_info: if local_template_info.get("updateTime") != template_info.get("updateTime"): - template.download_template(task_info.get("templateId")) + template_service.download_template(task_info.get("templateId")) biz.task.start_task(task_info) return "OK" diff --git a/biz/ffmpeg.py b/biz/ffmpeg.py index 220d598..1b58512 100644 --- a/biz/ffmpeg.py +++ b/biz/ffmpeg.py @@ -5,7 +5,10 @@ from concurrent.futures import ThreadPoolExecutor from opentelemetry.trace import Status, StatusCode +# 使用新架构组件,保持对旧FfmpegTask的兼容 from entity.ffmpeg import FfmpegTask +from entity.render_task import RenderTask, TaskType +from services import DefaultRenderService import logging from util import ffmpeg, oss @@ -13,6 +16,14 @@ from util.ffmpeg import fade_out_audio from telemetry import get_tracer logger = logging.getLogger('biz/ffmpeg') +_render_service = None + +def _get_render_service(): + """获取渲染服务实例""" + global _render_service + if _render_service is None: + _render_service = DefaultRenderService() + return _render_service def parse_ffmpeg_task(task_info, template_info): @@ -130,24 +141,25 @@ def check_placeholder_exist_with_count(placeholder_id, task_params, required_cou def start_ffmpeg_task(ffmpeg_task): + """启动FFmpeg任务 - 使用新的渲染服务""" tracer = get_tracer(__name__) with tracer.start_as_current_span("start_ffmpeg_task") as span: - for task in ffmpeg_task.analyze_input_render_tasks(): - result = start_ffmpeg_task(task) - if not result: - return False - ffmpeg_task.correct_task_type() - span.set_attribute("task.type", ffmpeg_task.task_type) - span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut)) - span.set_attribute("task.frame_rate", ffmpeg_task.frame_rate) - span.set_attribute("task.resolution", str(ffmpeg_task.resolution)) - span.set_attribute("task.ext_data", json.dumps(ffmpeg_task.ext_data)) - result = ffmpeg.start_render(ffmpeg_task) - if not result: + try: + # 使用新的渲染服务 + render_service = _get_render_service() + result = render_service.render(ffmpeg_task) + + if result: + span.set_status(Status(StatusCode.OK)) + else: + span.set_status(Status(StatusCode.ERROR)) + + return result + + except Exception as e: span.set_status(Status(StatusCode.ERROR)) + logger.error(f"FFmpeg task failed: {e}", exc_info=True) return False - span.set_status(Status(StatusCode.OK)) - return True def clear_task_tmp_file(ffmpeg_task): @@ -166,7 +178,8 @@ def clear_task_tmp_file(ffmpeg_task): def probe_video_info(ffmpeg_task): - # 获取视频长度宽度和时长 - return ffmpeg.probe_video_info(ffmpeg_task.get_output_file()) + """获取视频长度宽度和时长 - 使用新的渲染服务""" + render_service = _get_render_service() + return render_service.get_video_info(ffmpeg_task.get_output_file()) diff --git a/biz/task.py b/biz/task.py index 12548be..a99c54d 100644 --- a/biz/task.py +++ b/biz/task.py @@ -1,44 +1,55 @@ import json +import logging from opentelemetry.trace import Status, StatusCode -from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info, fade_out_audio +# 使用新的服务架构 +from services import DefaultTaskService, DefaultRenderService, DefaultTemplateService from telemetry import get_tracer -from template import get_template_def -from util import api +logger = logging.getLogger(__name__) + +# 创建服务实例(单例模式) +_render_service = None +_template_service = None +_task_service = None + +def _get_services(): + """获取服务实例(懒加载)""" + global _render_service, _template_service, _task_service + + if _render_service is None: + _render_service = DefaultRenderService() + + if _template_service is None: + _template_service = DefaultTemplateService() + _template_service.load_local_templates() # 加载本地模板 + + if _task_service is None: + _task_service = DefaultTaskService(_render_service, _template_service) + + return _task_service, _render_service, _template_service def start_task(task_info): + """启动任务处理(保持向后兼容的接口)""" tracer = get_tracer(__name__) - with tracer.start_as_current_span("start_task") as span: - task_info = api.normalize_task(task_info) - span.set_attribute("task", json.dumps(task_info)) - span.set_attribute("scenicId", task_info.get("scenicId", "?")) - span.set_attribute("templateId", task_info.get("templateId")) - template_info = get_template_def(task_info.get("templateId")) - api.report_task_start(task_info) - ffmpeg_task = parse_ffmpeg_task(task_info, template_info) - result = start_ffmpeg_task(ffmpeg_task) - if not result: + with tracer.start_as_current_span("start_task_legacy") as span: + try: + task_service, _, _ = _get_services() + + # 使用新的任务服务处理 + result = task_service.process_task(task_info) + + if result: + span.set_status(Status(StatusCode.OK)) + logger.info("Task completed successfully: %s", task_info.get("id")) + else: + span.set_status(Status(StatusCode.ERROR)) + logger.error("Task failed: %s", task_info.get("id")) + + return None # 保持原有返回值格式 + + except Exception as e: span.set_status(Status(StatusCode.ERROR)) - return api.report_task_failed(task_info) - width, height, duration = probe_video_info(ffmpeg_task) - span.set_attribute("probe.width", width) - span.set_attribute("probe.height", height) - span.set_attribute("probe.duration", duration) - # 音频淡出 - new_fn = fade_out_audio(ffmpeg_task.get_output_file(), duration) - ffmpeg_task.set_output_file(new_fn) - oss_result = api.upload_task_file(task_info, ffmpeg_task) - if not oss_result: - span.set_status(Status(StatusCode.ERROR)) - return api.report_task_failed(task_info) - # 获取视频长度宽度和时长 - clear_task_tmp_file(ffmpeg_task) - api.report_task_success(task_info, videoInfo={ - "width": width, - "height": height, - "duration": duration - }) - span.set_status(Status(StatusCode.OK)) - return None + logger.error("Task processing failed: %s", e, exc_info=True) + return None diff --git a/config/__init__.py b/config/__init__.py index f749288..6831a28 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -3,6 +3,9 @@ import logging from logging.handlers import TimedRotatingFileHandler from dotenv import load_dotenv +# 导入新的配置系统,保持向后兼容 +from .settings import get_config, get_ffmpeg_config, get_api_config, get_storage_config, get_server_config + load_dotenv() logging.basicConfig(level=logging.INFO) root_logger = logging.getLogger() diff --git a/config/settings.py b/config/settings.py new file mode 100644 index 0000000..a070af3 --- /dev/null +++ b/config/settings.py @@ -0,0 +1,139 @@ +import os +from dataclasses import dataclass +from typing import Dict, List, Optional, Union +import logging +from dotenv import load_dotenv + +load_dotenv() + +@dataclass +class FFmpegConfig: + """FFmpeg相关配置""" + encoder_args: List[str] + video_args: List[str] + audio_args: List[str] + default_args: List[str] + old_ffmpeg: bool = False + re_encode_video_args: Optional[List[str]] = None + re_encode_encoder_args: Optional[List[str]] = None + + @classmethod + def from_env(cls) -> 'FFmpegConfig': + encoder_args = os.getenv("ENCODER_ARGS", "-c:v h264").split(" ") + video_args = os.getenv("VIDEO_ARGS", "-profile:v high -level:v 4").split(" ") + audio_args = ["-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2"] + default_args = ["-shortest"] + + re_encode_video_args = None + if os.getenv("RE_ENCODE_VIDEO_ARGS"): + re_encode_video_args = os.getenv("RE_ENCODE_VIDEO_ARGS").split(" ") + + re_encode_encoder_args = None + if os.getenv("RE_ENCODE_ENCODER_ARGS"): + re_encode_encoder_args = os.getenv("RE_ENCODE_ENCODER_ARGS").split(" ") + + return cls( + encoder_args=encoder_args, + video_args=video_args, + audio_args=audio_args, + default_args=default_args, + old_ffmpeg=bool(os.getenv("OLD_FFMPEG", False)), + re_encode_video_args=re_encode_video_args, + re_encode_encoder_args=re_encode_encoder_args + ) + +@dataclass +class APIConfig: + """API相关配置""" + endpoint: str + access_key: str + timeout: int = 10 + redirect_to_url: Optional[str] = None + + @classmethod + def from_env(cls) -> 'APIConfig': + endpoint = os.getenv('API_ENDPOINT', '') + if not endpoint: + raise ValueError("API_ENDPOINT environment variable is required") + + access_key = os.getenv('ACCESS_KEY', '') + if not access_key: + raise ValueError("ACCESS_KEY environment variable is required") + + return cls( + endpoint=endpoint, + access_key=access_key, + timeout=int(os.getenv('API_TIMEOUT', '10')), + redirect_to_url=os.getenv("REDIRECT_TO_URL") or None + ) + +@dataclass +class StorageConfig: + """存储相关配置""" + template_dir: str + + @classmethod + def from_env(cls) -> 'StorageConfig': + template_dir = os.getenv('TEMPLATE_DIR', './template') + return cls(template_dir=template_dir) + +@dataclass +class ServerConfig: + """服务器相关配置""" + host: str = "0.0.0.0" + port: int = 9998 + debug: bool = False + + @classmethod + def from_env(cls) -> 'ServerConfig': + return cls( + host=os.getenv('HOST', '0.0.0.0'), + port=int(os.getenv('PORT', '9998')), + debug=bool(os.getenv('DEBUG', False)) + ) + +@dataclass +class AppConfig: + """应用总配置""" + ffmpeg: FFmpegConfig + api: APIConfig + storage: StorageConfig + server: ServerConfig + + @classmethod + def from_env(cls) -> 'AppConfig': + return cls( + ffmpeg=FFmpegConfig.from_env(), + api=APIConfig.from_env(), + storage=StorageConfig.from_env(), + server=ServerConfig.from_env() + ) + +# 全局配置实例 +_config: Optional[AppConfig] = None + +def get_config() -> AppConfig: + """获取全局配置实例""" + global _config + if _config is None: + _config = AppConfig.from_env() + return _config + +def reload_config() -> AppConfig: + """重新加载配置""" + global _config + _config = AppConfig.from_env() + return _config + +# 向后兼容的配置获取函数 +def get_ffmpeg_config() -> FFmpegConfig: + return get_config().ffmpeg + +def get_api_config() -> APIConfig: + return get_config().api + +def get_storage_config() -> StorageConfig: + return get_config().storage + +def get_server_config() -> ServerConfig: + return get_config().server \ No newline at end of file diff --git a/entity/effects/__init__.py b/entity/effects/__init__.py new file mode 100644 index 0000000..93e89de --- /dev/null +++ b/entity/effects/__init__.py @@ -0,0 +1,25 @@ +from .base import EffectProcessor, EffectRegistry +from .camera_shot import CameraShotEffect +from .speed import SpeedEffect +from .zoom import ZoomEffect +from .skip import SkipEffect +from .tail import TailEffect + +# 注册所有效果处理器 +registry = EffectRegistry() +registry.register('cameraShot', CameraShotEffect) +registry.register('ospeed', SpeedEffect) +registry.register('zoom', ZoomEffect) +registry.register('skip', SkipEffect) +registry.register('tail', TailEffect) + +__all__ = [ + 'EffectProcessor', + 'EffectRegistry', + 'registry', + 'CameraShotEffect', + 'SpeedEffect', + 'ZoomEffect', + 'SkipEffect', + 'TailEffect' +] \ No newline at end of file diff --git a/entity/effects/base.py b/entity/effects/base.py new file mode 100644 index 0000000..579753c --- /dev/null +++ b/entity/effects/base.py @@ -0,0 +1,94 @@ +from abc import ABC, abstractmethod +from typing import Dict, List, Type, Any, Optional +import json +import logging + +logger = logging.getLogger(__name__) + +class EffectProcessor(ABC): + """效果处理器抽象基类""" + + def __init__(self, params: str = "", ext_data: Optional[Dict[str, Any]] = None): + self.params = params + self.ext_data = ext_data or {} + self.frame_rate = 25 # 默认帧率 + + @abstractmethod + def validate_params(self) -> bool: + """验证参数是否有效""" + pass + + @abstractmethod + def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]: + """ + 生成FFmpeg滤镜参数 + + Args: + video_input: 输入视频流标识符 (例如: "[0:v]", "[v_eff1]") + effect_index: 效果索引,用于生成唯一的输出标识符 + + Returns: + tuple: (filter_args_list, output_stream_identifier) + """ + pass + + @abstractmethod + def get_effect_name(self) -> str: + """获取效果名称""" + pass + + def parse_params(self) -> List[str]: + """解析参数字符串为列表""" + if not self.params: + return [] + return self.params.split(',') + + def get_pos_json(self) -> Dict[str, Any]: + """获取位置JSON数据""" + pos_json_str = self.ext_data.get('posJson', '{}') + try: + return json.loads(pos_json_str) if pos_json_str != '{}' else {} + except Exception as e: + logger.warning(f"Failed to parse posJson: {e}") + return {} + +class EffectRegistry: + """效果处理器注册表""" + + def __init__(self): + self._processors: Dict[str, Type[EffectProcessor]] = {} + + def register(self, name: str, processor_class: Type[EffectProcessor]): + """注册效果处理器""" + if not issubclass(processor_class, EffectProcessor): + raise ValueError(f"{processor_class} must be a subclass of EffectProcessor") + self._processors[name] = processor_class + logger.debug(f"Registered effect processor: {name}") + + def get_processor(self, effect_name: str, params: str = "", ext_data: Optional[Dict[str, Any]] = None) -> Optional[EffectProcessor]: + """获取效果处理器实例""" + if effect_name not in self._processors: + logger.warning(f"Unknown effect: {effect_name}") + return None + + processor_class = self._processors[effect_name] + return processor_class(params, ext_data) + + def list_effects(self) -> List[str]: + """列出所有注册的效果""" + return list(self._processors.keys()) + + def parse_effect_string(self, effect_string: str) -> tuple[str, str]: + """ + 解析效果字符串 + + Args: + effect_string: 效果字符串,格式为 "effect_name:params" + + Returns: + tuple: (effect_name, params) + """ + if ':' in effect_string: + parts = effect_string.split(':', 2) + return parts[0], parts[1] if len(parts) > 1 else "" + return effect_string, "" \ No newline at end of file diff --git a/entity/effects/camera_shot.py b/entity/effects/camera_shot.py new file mode 100644 index 0000000..6fbddfb --- /dev/null +++ b/entity/effects/camera_shot.py @@ -0,0 +1,79 @@ +from typing import List, Dict, Any +from .base import EffectProcessor + +class CameraShotEffect(EffectProcessor): + """相机镜头效果处理器""" + + def validate_params(self) -> bool: + """验证参数:start_time,duration,rotate_deg""" + params = self.parse_params() + if not params: + return True # 使用默认参数 + + # 参数格式: "start_time,duration,rotate_deg" + if len(params) > 3: + return False + + try: + for i, param in enumerate(params): + if param == '': + continue + if i == 2: # rotate_deg + int(param) + else: # start_time, duration + float(param) + return True + except ValueError: + return False + + def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]: + """生成相机镜头效果的滤镜参数""" + if not self.validate_params(): + return [], video_input + + params = self.parse_params() + + # 设置默认值 + start = 3.0 + duration = 1.0 + rotate_deg = 0 + + if len(params) >= 1 and params[0] != '': + start = float(params[0]) + if len(params) >= 2 and params[1] != '': + duration = float(params[1]) + if len(params) >= 3 and params[2] != '': + rotate_deg = int(params[2]) + + filter_args = [] + + # 生成输出流标识符 + start_out_str = "[eff_s]" + mid_out_str = "[eff_m]" + end_out_str = "[eff_e]" + final_output = f"[v_eff{effect_index}]" + + # 分割视频流为三部分 + filter_args.append(f"{video_input}split=3{start_out_str}{mid_out_str}{end_out_str}") + + # 选择开始部分帧 + filter_args.append(f"{start_out_str}select=lt(n\\,{int(start * self.frame_rate)}){start_out_str}") + + # 选择结束部分帧 + filter_args.append(f"{end_out_str}select=gt(n\\,{int(start * self.frame_rate)}){end_out_str}") + + # 选择中间特定帧并扩展 + filter_args.append(f"{mid_out_str}select=eq(n\\,{int(start * self.frame_rate)}){mid_out_str}") + filter_args.append(f"{mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{mid_out_str}") + + # 如果需要旋转 + if rotate_deg != 0: + filter_args.append(f"{mid_out_str}rotate=PI*{rotate_deg}/180{mid_out_str}") + + # 连接三部分 + filter_args.append(f"{start_out_str}{mid_out_str}{end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{final_output}") + + return filter_args, final_output + + def get_effect_name(self) -> str: + return "cameraShot" \ No newline at end of file diff --git a/entity/effects/skip.py b/entity/effects/skip.py new file mode 100644 index 0000000..6e85040 --- /dev/null +++ b/entity/effects/skip.py @@ -0,0 +1,38 @@ +from typing import List +from .base import EffectProcessor + +class SkipEffect(EffectProcessor): + """跳过开头效果处理器""" + + def validate_params(self) -> bool: + """验证参数:跳过的秒数""" + if not self.params: + return True # 默认不跳过 + + try: + skip_seconds = float(self.params) + return skip_seconds >= 0 + except ValueError: + return False + + def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]: + """生成跳过开头效果的滤镜参数""" + if not self.validate_params(): + return [], video_input + + if not self.params: + return [], video_input + + skip_seconds = float(self.params) + if skip_seconds <= 0: + return [], video_input + + output_stream = f"[v_eff{effect_index}]" + + # 使用trim滤镜跳过开头 + filter_args = [f"{video_input}trim=start={skip_seconds}{output_stream}"] + + return filter_args, output_stream + + def get_effect_name(self) -> str: + return "skip" \ No newline at end of file diff --git a/entity/effects/speed.py b/entity/effects/speed.py new file mode 100644 index 0000000..608320f --- /dev/null +++ b/entity/effects/speed.py @@ -0,0 +1,35 @@ +from typing import List +from .base import EffectProcessor + +class SpeedEffect(EffectProcessor): + """视频变速效果处理器""" + + def validate_params(self) -> bool: + """验证参数:速度倍数""" + if not self.params: + return True # 默认不变速 + + try: + speed = float(self.params) + return speed > 0 + except ValueError: + return False + + def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]: + """生成变速效果的滤镜参数""" + if not self.validate_params(): + return [], video_input + + if not self.params or self.params == "1": + return [], video_input # 不需要变速 + + speed = float(self.params) + output_stream = f"[v_eff{effect_index}]" + + # 使用setpts进行变速 + filter_args = [f"{video_input}setpts={speed}*PTS{output_stream}"] + + return filter_args, output_stream + + def get_effect_name(self) -> str: + return "ospeed" \ No newline at end of file diff --git a/entity/effects/tail.py b/entity/effects/tail.py new file mode 100644 index 0000000..e06d98e --- /dev/null +++ b/entity/effects/tail.py @@ -0,0 +1,42 @@ +from typing import List +from .base import EffectProcessor + +class TailEffect(EffectProcessor): + """保留末尾效果处理器""" + + def validate_params(self) -> bool: + """验证参数:保留的秒数""" + if not self.params: + return True # 默认不截取 + + try: + tail_seconds = float(self.params) + return tail_seconds >= 0 + except ValueError: + return False + + def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]: + """生成保留末尾效果的滤镜参数""" + if not self.validate_params(): + return [], video_input + + if not self.params: + return [], video_input + + tail_seconds = float(self.params) + if tail_seconds <= 0: + return [], video_input + + output_stream = f"[v_eff{effect_index}]" + + # 使用reverse+trim+reverse的方法来精确获取最后N秒 + filter_args = [ + f"{video_input}reverse[v_rev{effect_index}]", + f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]", + f"[v_trim{effect_index}]reverse{output_stream}" + ] + + return filter_args, output_stream + + def get_effect_name(self) -> str: + return "tail" \ No newline at end of file diff --git a/entity/effects/zoom.py b/entity/effects/zoom.py new file mode 100644 index 0000000..4c2c28f --- /dev/null +++ b/entity/effects/zoom.py @@ -0,0 +1,89 @@ +from typing import List +import json +from .base import EffectProcessor + +class ZoomEffect(EffectProcessor): + """缩放效果处理器""" + + def validate_params(self) -> bool: + """验证参数:start_time,zoom_factor,duration""" + params = self.parse_params() + if len(params) < 3: + return False + + try: + start_time = float(params[0]) + zoom_factor = float(params[1]) + duration = float(params[2]) + + return (start_time >= 0 and + zoom_factor > 0 and + duration >= 0) + except (ValueError, IndexError): + return False + + def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]: + """生成缩放效果的滤镜参数""" + if not self.validate_params(): + return [], video_input + + params = self.parse_params() + start_time = float(params[0]) + zoom_factor = float(params[1]) + duration = float(params[2]) + + if zoom_factor == 1: + return [], video_input # 不需要缩放 + + output_stream = f"[v_eff{effect_index}]" + + # 获取缩放中心点 + center_x, center_y = self._get_zoom_center() + + filter_args = [] + + if duration == 0: + # 静态缩放(整个视频时长) + x_expr = f"({center_x})-(ow*zoom)/2" + y_expr = f"({center_y})-(oh*zoom)/2" + filter_args.append( + f"{video_input}trim=start={start_time},zoompan=z={zoom_factor}:x={x_expr}:y={y_expr}:d=1{output_stream}" + ) + else: + # 动态缩放(指定时间段内) + zoom_expr = f"if(between(t\\,{start_time}\\,{start_time + duration})\\,{zoom_factor}\\,1)" + x_expr = f"({center_x})-(ow*zoom)/2" + y_expr = f"({center_y})-(oh*zoom)/2" + filter_args.append( + f"{video_input}zoompan=z={zoom_expr}:x={x_expr}:y={y_expr}:d=1{output_stream}" + ) + + return filter_args, output_stream + + def _get_zoom_center(self) -> tuple[str, str]: + """获取缩放中心点坐标表达式""" + # 默认中心点 + center_x = "iw/2" + center_y = "ih/2" + + pos_json = self.get_pos_json() + if pos_json: + _f_x = pos_json.get('ltX', 0) + _f_x2 = pos_json.get('rbX', 0) + _f_y = pos_json.get('ltY', 0) + _f_y2 = pos_json.get('rbY', 0) + _v_w = pos_json.get('imgWidth', 1) + _v_h = pos_json.get('imgHeight', 1) + + if _v_w > 0 and _v_h > 0: + # 计算坐标系统中的中心点 + center_x_ratio = (_f_x + _f_x2) / (2 * _v_w) + center_y_ratio = (_f_y + _f_y2) / (2 * _v_h) + # 转换为视频坐标系统 + center_x = f"iw*{center_x_ratio:.6f}" + center_y = f"ih*{center_y_ratio:.6f}" + + return center_x, center_y + + def get_effect_name(self) -> str: + return "zoom" \ No newline at end of file diff --git a/entity/ffmpeg_command_builder.py b/entity/ffmpeg_command_builder.py new file mode 100644 index 0000000..b2025f0 --- /dev/null +++ b/entity/ffmpeg_command_builder.py @@ -0,0 +1,281 @@ +import json +import os +import time +from typing import List, Optional + +from config.settings import get_ffmpeg_config +from entity.render_task import RenderTask, TaskType +from entity.effects import registry as effect_registry +from util.exceptions import FFmpegError +from util.ffmpeg import probe_video_info, probe_video_audio +import logging + +logger = logging.getLogger(__name__) + +class FFmpegCommandBuilder: + """FFmpeg命令构建器""" + + def __init__(self, task: RenderTask): + self.task = task + self.config = get_ffmpeg_config() + + def build_command(self) -> List[str]: + """构建FFmpeg命令""" + self.task.update_task_type() + + if self.task.task_type == TaskType.COPY: + return self._build_copy_command() + elif self.task.task_type == TaskType.CONCAT: + return self._build_concat_command() + elif self.task.task_type == TaskType.ENCODE: + return self._build_encode_command() + else: + raise FFmpegError(f"Unsupported task type: {self.task.task_type}") + + def _build_copy_command(self) -> List[str]: + """构建复制命令""" + if len(self.task.input_files) == 1: + input_file = self.task.input_files[0] + if input_file == self.task.output_file: + return [] # 不需要处理 + + return [ + "ffmpeg", "-y", "-hide_banner", + "-i", self.task.input_files[0], + "-c", "copy", + self.task.output_file + ] + + def _build_concat_command(self) -> List[str]: + """构建拼接命令""" + args = ["ffmpeg", "-y", "-hide_banner"] + input_args = [] + output_args = [*self.config.default_args] + filter_args = [] + + if len(self.task.input_files) == 1: + # 单个文件 + file = self.task.input_files[0] + input_args.extend(["-i", file]) + self.task.mute = not probe_video_audio(file) + else: + # 多个文件使用concat协议 + tmp_file = f"tmp_concat_{time.time()}.txt" + with open(tmp_file, "w", encoding="utf-8") as f: + for input_file in self.task.input_files: + f.write(f"file '{input_file}'\n") + input_args.extend(["-f", "concat", "-safe", "0", "-i", tmp_file]) + self.task.mute = not probe_video_audio(tmp_file, "concat") + + # 视频流映射 + output_args.extend(["-map", "0:v", "-c:v", "copy"]) + + # 音频处理 + audio_output_str = self._handle_audio_concat(input_args, filter_args) + if audio_output_str: + output_args.extend(["-map", audio_output_str]) + output_args.extend(self.config.audio_args) + + # annexb处理 + if self.task.annexb: + output_args.extend(["-bsf:v", self._get_mp4toannexb_filter()]) + output_args.extend(["-bsf:a", "setts=pts=DTS"]) + output_args.extend(["-f", "mpegts"]) + else: + output_args.extend(["-f", "mp4"]) + + filter_complex = ["-filter_complex", ";".join(filter_args)] if filter_args else [] + + return args + input_args + filter_complex + output_args + [self.task.output_file] + + def _build_encode_command(self) -> List[str]: + """构建编码命令""" + args = ["ffmpeg", "-y", "-hide_banner"] + input_args = [] + filter_args = [] + output_args = [ + *self.config.video_args, + *self.config.audio_args, + *self.config.encoder_args, + *self.config.default_args + ] + + # annexb处理 + if self.task.annexb: + output_args.extend(["-bsf:v", self._get_mp4toannexb_filter()]) + output_args.extend(["-reset_timestamps", "1"]) + + # 处理输入文件 + for input_file in self.task.input_files: + input_args.extend(["-i", input_file]) + + # 处理视频流 + video_output_str = "[0:v]" + effect_index = 0 + + # 处理中心裁剪 + if self.task.center_cut == 1: + video_output_str, effect_index = self._add_center_cut(filter_args, video_output_str, effect_index) + + # 处理缩放裁剪 + if self.task.zoom_cut == 1 and self.task.resolution: + video_output_str, effect_index = self._add_zoom_cut(filter_args, video_output_str, effect_index) + + # 处理效果 + video_output_str, effect_index = self._add_effects(filter_args, video_output_str, effect_index) + + # 处理分辨率 + if self.task.resolution: + filter_args.append(f"{video_output_str}scale={self.task.resolution.replace('x', ':')}[v]") + video_output_str = "[v]" + + # 处理LUT + for lut in self.task.luts: + filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}") + + # 处理覆盖层 + video_output_str = self._add_overlays(input_args, filter_args, video_output_str) + + # 处理字幕 + for subtitle in self.task.subtitles: + filter_args.append(f"{video_output_str}ass={subtitle}[v]") + video_output_str = "[v]" + + # 映射视频流 + output_args.extend(["-map", video_output_str]) + output_args.extend(["-r", str(self.task.frame_rate)]) + output_args.extend(["-fps_mode", "cfr"]) + + # 处理音频 + audio_output_str = self._handle_audio_encode(input_args, filter_args) + if audio_output_str: + output_args.extend(["-map", audio_output_str]) + + filter_complex = ["-filter_complex", ";".join(filter_args)] if filter_args else [] + + return args + input_args + filter_complex + output_args + [self.task.output_file] + + def _add_center_cut(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]: + """添加中心裁剪""" + pos_json = self.task.ext_data.get('posJson', '{}') + try: + pos_data = json.loads(pos_json) if pos_json != '{}' else {} + except: + pos_data = {} + + _v_w = pos_data.get('imgWidth', 1) + _f_x = pos_data.get('ltX', 0) + _f_x2 = pos_data.get('rbX', 0) + _x = f'{float((_f_x2 + _f_x)/(2 * _v_w)):.4f}*iw-ih*ih/(2*iw)' + + filter_args.append(f"{video_input}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]") + return f"[v_cut{effect_index}]", effect_index + 1 + + def _add_zoom_cut(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]: + """添加缩放裁剪""" + # 获取输入视频尺寸 + input_file = self.task.input_files[0] + _iw, _ih, _ = probe_video_info(input_file) + + _w, _h = self.task.resolution.split('x', 1) + pos_json = self.task.ext_data.get('posJson', '{}') + try: + pos_data = json.loads(pos_json) if pos_json != '{}' else {} + except: + pos_data = {} + + _v_w = pos_data.get('imgWidth', 1) + _v_h = pos_data.get('imgHeight', 1) + _f_x = pos_data.get('ltX', 0) + _f_x2 = pos_data.get('rbX', 0) + _f_y = pos_data.get('ltY', 0) + _f_y2 = pos_data.get('rbY', 0) + + _x = min(max(0, int((_f_x + _f_x2) / 2 - int(_w) / 2)), _iw - int(_w)) + _y = min(max(0, int((_f_y + _f_y2) / 2 - int(_h) / 2)), _ih - int(_h)) + + filter_args.append(f"{video_input}crop=x={_x}:y={_y}:w={_w}:h={_h}[vz_cut{effect_index}]") + return f"[vz_cut{effect_index}]", effect_index + 1 + + def _add_effects(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]: + """添加效果处理""" + current_input = video_input + + for effect_str in self.task.effects: + effect_name, params = effect_registry.parse_effect_string(effect_str) + processor = effect_registry.get_processor(effect_name, params, self.task.ext_data) + + if processor: + processor.frame_rate = self.task.frame_rate + effect_filters, output_stream = processor.generate_filter_args(current_input, effect_index) + + if effect_filters: + filter_args.extend(effect_filters) + current_input = output_stream + effect_index += 1 + + return current_input, effect_index + + def _add_overlays(self, input_args: List[str], filter_args: List[str], video_input: str) -> str: + """添加覆盖层""" + current_input = video_input + + for overlay in self.task.overlays: + input_index = input_args.count("-i") // 2 # 每个输入占两个参数 -i filename + input_args.extend(["-i", overlay]) + + if self.config.old_ffmpeg: + filter_args.append(f"{current_input}[{input_index}:v]scale2ref=iw:ih[v]") + else: + filter_args.append(f"{current_input}[{input_index}:v]scale=rw:rh[v]") + + filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]") + current_input = "[v]" + + return current_input + + def _handle_audio_concat(self, input_args: List[str], filter_args: List[str]) -> Optional[str]: + """处理concat模式的音频""" + audio_output_str = "" + + if self.task.mute: + input_index = input_args.count("-i") // 2 + input_args.extend(["-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000"]) + audio_output_str = f"[{input_index}:a]" + else: + audio_output_str = "[0:a]" + + for audio in self.task.audios: + input_index = input_args.count("-i") // 2 + input_args.extend(["-i", audio.replace("\\", "/")]) + filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]") + audio_output_str = "[a]" + + return audio_output_str.strip("[]") if audio_output_str else None + + def _handle_audio_encode(self, input_args: List[str], filter_args: List[str]) -> Optional[str]: + """处理encode模式的音频""" + audio_output_str = "" + + if self.task.mute: + input_index = input_args.count("-i") // 2 + input_args.extend(["-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000"]) + filter_args.append(f"[{input_index}:a]acopy[a]") + audio_output_str = "[a]" + else: + audio_output_str = "[0:a]" + + for audio in self.task.audios: + input_index = input_args.count("-i") // 2 + input_args.extend(["-i", audio.replace("\\", "/")]) + filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]") + audio_output_str = "[a]" + + return audio_output_str if audio_output_str else None + + def _get_mp4toannexb_filter(self) -> str: + """获取mp4toannexb滤镜""" + encoder_args_str = " ".join(self.config.encoder_args).lower() + if "hevc" in encoder_args_str: + return "hevc_mp4toannexb" + return "h264_mp4toannexb" \ No newline at end of file diff --git a/entity/render_task.py b/entity/render_task.py new file mode 100644 index 0000000..764292b --- /dev/null +++ b/entity/render_task.py @@ -0,0 +1,146 @@ +import os +import uuid +from typing import List, Optional, Dict, Any +from dataclasses import dataclass, field +from enum import Enum + +from config.settings import get_ffmpeg_config +from util.exceptions import TaskValidationError, EffectError +from entity.effects import registry as effect_registry + +class TaskType(Enum): + COPY = "copy" + CONCAT = "concat" + ENCODE = "encode" + +@dataclass +class RenderTask: + """渲染任务数据类,只包含任务数据,不包含处理逻辑""" + input_files: List[str] = field(default_factory=list) + output_file: str = "" + task_type: TaskType = TaskType.COPY + + # 视频参数 + resolution: Optional[str] = None + frame_rate: int = 25 + speed: float = 1.0 + mute: bool = True + annexb: bool = False + + # 裁剪参数 + zoom_cut: Optional[int] = None + center_cut: Optional[int] = None + + # 资源列表 + subtitles: List[str] = field(default_factory=list) + luts: List[str] = field(default_factory=list) + audios: List[str] = field(default_factory=list) + overlays: List[str] = field(default_factory=list) + effects: List[str] = field(default_factory=list) + + # 扩展数据 + ext_data: Dict[str, Any] = field(default_factory=dict) + + def __post_init__(self): + """初始化后处理""" + # 检测annexb格式 + for input_file in self.input_files: + if isinstance(input_file, str) and input_file.endswith(".ts"): + self.annexb = True + break + + # 自动生成输出文件名 + if not self.output_file: + self._generate_output_filename() + + def _generate_output_filename(self): + """生成输出文件名""" + if self.annexb: + self.output_file = f"rand_{uuid.uuid4()}.ts" + else: + self.output_file = f"rand_{uuid.uuid4()}.mp4" + + def add_input_file(self, file_path: str): + """添加输入文件""" + self.input_files.append(file_path) + if file_path.endswith(".ts"): + self.annexb = True + + def add_overlay(self, *overlays: str): + """添加覆盖层""" + for overlay in overlays: + if overlay.endswith('.ass'): + self.subtitles.append(overlay) + else: + self.overlays.append(overlay) + + def add_audios(self, *audios: str): + """添加音频""" + self.audios.extend(audios) + + def add_lut(self, *luts: str): + """添加LUT""" + self.luts.extend(luts) + + def add_effect(self, *effects: str): + """添加效果""" + self.effects.extend(effects) + + def validate(self) -> bool: + """验证任务参数""" + if not self.input_files: + raise TaskValidationError("No input files specified") + + # 验证所有效果 + for effect_str in self.effects: + effect_name, params = effect_registry.parse_effect_string(effect_str) + processor = effect_registry.get_processor(effect_name, params, self.ext_data) + if processor and not processor.validate_params(): + raise EffectError(f"Invalid parameters for effect {effect_name}: {params}", effect_name, params) + + return True + + def can_copy(self) -> bool: + """检查是否可以直接复制""" + return (len(self.luts) == 0 and + len(self.overlays) == 0 and + len(self.subtitles) == 0 and + len(self.effects) == 0 and + self.speed == 1 and + len(self.audios) == 0 and + len(self.input_files) == 1 and + self.zoom_cut is None and + self.center_cut is None) + + def can_concat(self) -> bool: + """检查是否可以使用concat模式""" + return (len(self.luts) == 0 and + len(self.overlays) == 0 and + len(self.subtitles) == 0 and + len(self.effects) == 0 and + self.speed == 1 and + self.zoom_cut is None and + self.center_cut is None) + + def determine_task_type(self) -> TaskType: + """自动确定任务类型""" + if self.can_copy(): + return TaskType.COPY + elif self.can_concat(): + return TaskType.CONCAT + else: + return TaskType.ENCODE + + def update_task_type(self): + """更新任务类型""" + self.task_type = self.determine_task_type() + + def need_processing(self) -> bool: + """检查是否需要处理""" + if self.annexb: + return True + return not self.can_copy() + + def get_output_extension(self) -> str: + """获取输出文件扩展名""" + return ".ts" if self.annexb else ".mp4" \ No newline at end of file diff --git a/index.py b/index.py index f70013f..398182e 100644 --- a/index.py +++ b/index.py @@ -4,20 +4,22 @@ import sys import config import biz.task from telemetry import init_opentelemetry -from template import load_local_template, download_template, TEMPLATES +from services import DefaultTemplateService from util import api import os import glob -load_local_template() +# 使用新的服务架构 +template_service = DefaultTemplateService() +template_service.load_local_templates() # Check for redownload parameter if 'redownload' in sys.argv: print("Redownloading all templates...") - for template_name in TEMPLATES.keys(): + for template_name in template_service.templates.keys(): print(f"Redownloading template: {template_name}") - download_template(template_name) + template_service.download_template(template_name) print("All templates redownloaded successfully!") sys.exit(0) import logging diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..9410dd3 --- /dev/null +++ b/services/__init__.py @@ -0,0 +1,12 @@ +from .render_service import RenderService, DefaultRenderService +from .task_service import TaskService, DefaultTaskService +from .template_service import TemplateService, DefaultTemplateService + +__all__ = [ + 'RenderService', + 'DefaultRenderService', + 'TaskService', + 'DefaultTaskService', + 'TemplateService', + 'DefaultTemplateService' +] \ No newline at end of file diff --git a/services/render_service.py b/services/render_service.py new file mode 100644 index 0000000..29c9673 --- /dev/null +++ b/services/render_service.py @@ -0,0 +1,237 @@ +import subprocess +import os +import logging +from abc import ABC, abstractmethod +from typing import Optional, Union + +from opentelemetry.trace import Status, StatusCode + +from entity.render_task import RenderTask +from entity.ffmpeg_command_builder import FFmpegCommandBuilder +from util.exceptions import RenderError, FFmpegError +from util.ffmpeg import probe_video_info, fade_out_audio, handle_ffmpeg_output, subprocess_args +from telemetry import get_tracer + +logger = logging.getLogger(__name__) + +def _convert_ffmpeg_task_to_render_task(ffmpeg_task): + """将旧的FfmpegTask转换为新的RenderTask""" + from entity.render_task import RenderTask, TaskType + + # 获取输入文件 + input_files = [] + for inp in ffmpeg_task.input_file: + if hasattr(inp, 'get_output_file'): + input_files.append(inp.get_output_file()) + else: + input_files.append(str(inp)) + + # 确定任务类型 + task_type = TaskType.COPY + if ffmpeg_task.task_type == 'concat': + task_type = TaskType.CONCAT + elif ffmpeg_task.task_type == 'encode': + task_type = TaskType.ENCODE + + # 创建新任务 + render_task = RenderTask( + input_files=input_files, + output_file=ffmpeg_task.output_file, + task_type=task_type, + resolution=ffmpeg_task.resolution, + frame_rate=ffmpeg_task.frame_rate, + annexb=ffmpeg_task.annexb, + center_cut=ffmpeg_task.center_cut, + zoom_cut=ffmpeg_task.zoom_cut, + ext_data=getattr(ffmpeg_task, 'ext_data', {}) + ) + + # 复制各种资源 + render_task.effects = getattr(ffmpeg_task, 'effects', []) + render_task.luts = getattr(ffmpeg_task, 'luts', []) + render_task.audios = getattr(ffmpeg_task, 'audios', []) + render_task.overlays = getattr(ffmpeg_task, 'overlays', []) + render_task.subtitles = getattr(ffmpeg_task, 'subtitles', []) + + return render_task + +class RenderService(ABC): + """渲染服务抽象接口""" + + @abstractmethod + def render(self, task: Union[RenderTask, 'FfmpegTask']) -> bool: + """ + 执行渲染任务 + + Args: + task: 渲染任务 + + Returns: + bool: 渲染是否成功 + """ + pass + + @abstractmethod + def get_video_info(self, file_path: str) -> tuple[int, int, float]: + """ + 获取视频信息 + + Args: + file_path: 视频文件路径 + + Returns: + tuple: (width, height, duration) + """ + pass + + @abstractmethod + def fade_out_audio(self, file_path: str, duration: float, fade_seconds: float = 2.0) -> str: + """ + 音频淡出处理 + + Args: + file_path: 音频文件路径 + duration: 音频总时长 + fade_seconds: 淡出时长 + + Returns: + str: 处理后的文件路径 + """ + pass + +class DefaultRenderService(RenderService): + """默认渲染服务实现""" + + def render(self, task: Union[RenderTask, 'FfmpegTask']) -> bool: + """执行渲染任务""" + # 兼容旧的FfmpegTask + if hasattr(task, 'get_ffmpeg_args'): # 这是FfmpegTask + # 使用旧的方式执行 + return self._render_legacy_ffmpeg_task(task) + + tracer = get_tracer(__name__) + with tracer.start_as_current_span("render_task") as span: + try: + # 验证任务 + task.validate() + span.set_attribute("task.type", task.task_type.value) + span.set_attribute("task.input_files", len(task.input_files)) + span.set_attribute("task.output_file", task.output_file) + + # 检查是否需要处理 + if not task.need_processing(): + if len(task.input_files) == 1: + task.output_file = task.input_files[0] + span.set_status(Status(StatusCode.OK)) + return True + + # 构建FFmpeg命令 + builder = FFmpegCommandBuilder(task) + ffmpeg_args = builder.build_command() + + if not ffmpeg_args: + # 不需要处理,直接返回 + if len(task.input_files) == 1: + task.output_file = task.input_files[0] + span.set_status(Status(StatusCode.OK)) + return True + + # 执行FFmpeg命令 + return self._execute_ffmpeg(ffmpeg_args, span) + + except Exception as e: + span.set_status(Status(StatusCode.ERROR)) + logger.error(f"Render failed: {e}", exc_info=True) + raise RenderError(f"Render failed: {e}") from e + + def _execute_ffmpeg(self, args: list[str], span) -> bool: + """执行FFmpeg命令""" + span.set_attribute("ffmpeg.args", " ".join(args)) + logger.info("Executing FFmpeg: %s", " ".join(args)) + + try: + # 执行FFmpeg进程 + process = subprocess.run( + ["ffmpeg", "-progress", "-", "-loglevel", "error"] + args[1:], + stderr=subprocess.PIPE, + **subprocess_args(True) + ) + + span.set_attribute("ffmpeg.return_code", process.returncode) + + # 处理输出 + if process.stdout: + output = handle_ffmpeg_output(process.stdout) + span.set_attribute("ffmpeg.output", output) + logger.info("FFmpeg output: %s", output) + + # 检查返回码 + if process.returncode != 0: + error_msg = process.stderr.decode() if process.stderr else "Unknown error" + span.set_attribute("ffmpeg.error", error_msg) + span.set_status(Status(StatusCode.ERROR)) + logger.error("FFmpeg failed with return code %d: %s", process.returncode, error_msg) + raise FFmpegError( + f"FFmpeg execution failed", + command=args, + return_code=process.returncode, + stderr=error_msg + ) + + # 检查输出文件 + output_file = args[-1] # 输出文件总是最后一个参数 + if not os.path.exists(output_file): + span.set_status(Status(StatusCode.ERROR)) + raise RenderError(f"Output file not created: {output_file}") + + # 检查文件大小 + file_size = os.path.getsize(output_file) + span.set_attribute("output.file_size", file_size) + + if file_size < 4096: # 文件过小 + span.set_status(Status(StatusCode.ERROR)) + raise RenderError(f"Output file too small: {file_size} bytes") + + span.set_status(Status(StatusCode.OK)) + logger.info("FFmpeg execution completed successfully") + return True + + except subprocess.SubprocessError as e: + span.set_status(Status(StatusCode.ERROR)) + logger.error("Subprocess error: %s", e) + raise FFmpegError(f"Subprocess error: {e}") from e + + def get_video_info(self, file_path: str) -> tuple[int, int, float]: + """获取视频信息""" + return probe_video_info(file_path) + + def fade_out_audio(self, file_path: str, duration: float, fade_seconds: float = 2.0) -> str: + """音频淡出处理""" + return fade_out_audio(file_path, duration, fade_seconds) + + def _render_legacy_ffmpeg_task(self, ffmpeg_task) -> bool: + """兼容处理旧的FfmpegTask""" + tracer = get_tracer(__name__) + with tracer.start_as_current_span("render_legacy_ffmpeg_task") as span: + try: + # 处理依赖任务 + for sub_task in ffmpeg_task.analyze_input_render_tasks(): + if not self.render(sub_task): + span.set_status(Status(StatusCode.ERROR)) + return False + + # 获取FFmpeg参数 + ffmpeg_args = ffmpeg_task.get_ffmpeg_args() + + if not ffmpeg_args: + # 不需要处理,直接返回 + span.set_status(Status(StatusCode.OK)) + return True + + # 执行FFmpeg命令 + return self._execute_ffmpeg(ffmpeg_args, span) + + except Exception as e: + span.set_status(Status(StatusCode.ERROR)) + logger.error(f"Legacy FFmpeg task render failed: {e}", exc_info=True) + raise RenderError(f"Legacy render failed: {e}") from e \ No newline at end of file diff --git a/services/task_service.py b/services/task_service.py new file mode 100644 index 0000000..7a5ff9e --- /dev/null +++ b/services/task_service.py @@ -0,0 +1,289 @@ +import json +import logging +import os +from abc import ABC, abstractmethod +from concurrent.futures import ThreadPoolExecutor +from typing import Dict, Any, List, Optional + +from opentelemetry.trace import Status, StatusCode + +from entity.render_task import RenderTask +from services.render_service import RenderService +from services.template_service import TemplateService +from util.exceptions import TaskError, TaskValidationError +from util import api, oss +from telemetry import get_tracer + +logger = logging.getLogger(__name__) + +class TaskService(ABC): + """任务服务抽象接口""" + + @abstractmethod + def process_task(self, task_info: Dict[str, Any]) -> bool: + """ + 处理任务 + + Args: + task_info: 任务信息 + + Returns: + bool: 处理是否成功 + """ + pass + + @abstractmethod + def create_render_task(self, task_info: Dict[str, Any], template_info: Dict[str, Any]) -> RenderTask: + """ + 创建渲染任务 + + Args: + task_info: 任务信息 + template_info: 模板信息 + + Returns: + RenderTask: 渲染任务对象 + """ + pass + +class DefaultTaskService(TaskService): + """默认任务服务实现""" + + def __init__(self, render_service: RenderService, template_service: TemplateService): + self.render_service = render_service + self.template_service = template_service + + def process_task(self, task_info: Dict[str, Any]) -> bool: + """处理任务""" + tracer = get_tracer(__name__) + with tracer.start_as_current_span("process_task") as span: + try: + # 标准化任务信息 + task_info = api.normalize_task(task_info) + span.set_attribute("task.id", task_info.get("id", "unknown")) + span.set_attribute("task.template_id", task_info.get("templateId", "unknown")) + + # 获取模板信息 + template_id = task_info.get("templateId") + template_info = self.template_service.get_template(template_id) + if not template_info: + raise TaskError(f"Template not found: {template_id}") + + # 报告任务开始 + api.report_task_start(task_info) + + # 创建渲染任务 + render_task = self.create_render_task(task_info, template_info) + + # 执行渲染 + success = self.render_service.render(render_task) + if not success: + span.set_status(Status(StatusCode.ERROR)) + api.report_task_failed(task_info, "Render failed") + return False + + # 获取视频信息 + width, height, duration = self.render_service.get_video_info(render_task.output_file) + span.set_attribute("video.width", width) + span.set_attribute("video.height", height) + span.set_attribute("video.duration", duration) + + # 音频淡出 + new_file = self.render_service.fade_out_audio(render_task.output_file, duration) + render_task.output_file = new_file + + # 上传文件 - 创建一个兼容对象 + class TaskCompat: + def __init__(self, output_file): + self.output_file = output_file + def get_output_file(self): + return self.output_file + + task_compat = TaskCompat(render_task.output_file) + upload_success = api.upload_task_file(task_info, task_compat) + if not upload_success: + span.set_status(Status(StatusCode.ERROR)) + api.report_task_failed(task_info, "Upload failed") + return False + + # 清理临时文件 + self._cleanup_temp_files(render_task) + + # 报告任务成功 + api.report_task_success(task_info, videoInfo={ + "width": width, + "height": height, + "duration": duration + }) + + span.set_status(Status(StatusCode.OK)) + return True + + except Exception as e: + span.set_status(Status(StatusCode.ERROR)) + logger.error(f"Task processing failed: {e}", exc_info=True) + api.report_task_failed(task_info, str(e)) + return False + + def create_render_task(self, task_info: Dict[str, Any], template_info: Dict[str, Any]) -> RenderTask: + """创建渲染任务""" + tracer = get_tracer(__name__) + with tracer.start_as_current_span("create_render_task") as span: + # 解析任务参数 + task_params_str = task_info.get("taskParams", "{}") + span.set_attribute("task_params", task_params_str) + + try: + task_params = json.loads(task_params_str) + task_params_orig = json.loads(task_params_str) + except json.JSONDecodeError as e: + raise TaskValidationError(f"Invalid task params JSON: {e}") + + # 并行下载资源 + self._download_resources(task_params) + + # 创建子任务列表 + sub_tasks = [] + only_if_usage_count = {} + + for part in template_info.get("video_parts", []): + source, ext_data = self._parse_video_source( + part.get('source'), task_params, template_info + ) + if not source: + logger.warning("No video found for part: %s", part) + continue + + # 检查only_if条件 + only_if = part.get('only_if', '') + if only_if: + only_if_usage_count[only_if] = only_if_usage_count.get(only_if, 0) + 1 + required_count = only_if_usage_count[only_if] + if not self._check_placeholder_exist_with_count(only_if, task_params_orig, required_count): + logger.info("Skipping part due to only_if condition: %s (need %d)", only_if, required_count) + continue + + # 创建子任务 + sub_task = self._create_sub_task(part, source, ext_data, template_info) + sub_tasks.append(sub_task) + + # 创建主任务 + output_file = f"out_{task_info.get('id', 'unknown')}.mp4" + main_task = RenderTask( + input_files=[task.output_file for task in sub_tasks], + output_file=output_file, + resolution=template_info.get("video_size", ""), + frame_rate=template_info.get("frame_rate", 25), + center_cut=template_info.get("crop_mode"), + zoom_cut=template_info.get("zoom_cut") + ) + + # 应用整体模板设置 + overall_template = template_info.get("overall_template", {}) + self._apply_template_settings(main_task, overall_template, template_info) + + # 设置扩展数据 + main_task.ext_data = task_info + + span.set_attribute("render_task.sub_tasks", len(sub_tasks)) + span.set_attribute("render_task.effects", len(main_task.effects)) + + return main_task + + def _download_resources(self, task_params: Dict[str, Any]): + """并行下载资源""" + with ThreadPoolExecutor(max_workers=8) as executor: + for param_list in task_params.values(): + if isinstance(param_list, list): + for param in param_list: + url = param.get("url", "") + if url.startswith("http"): + _, filename = os.path.split(url) + executor.submit(oss.download_from_oss, url, filename, True) + + def _parse_video_source(self, source: str, task_params: Dict[str, Any], + template_info: Dict[str, Any]) -> tuple[Optional[str], Dict[str, Any]]: + """解析视频源""" + if source.startswith('PLACEHOLDER_'): + placeholder_id = source.replace('PLACEHOLDER_', '') + new_sources = task_params.get(placeholder_id, []) + pick_source = {} + + if isinstance(new_sources, list): + if len(new_sources) == 0: + logger.debug("No video found for placeholder: %s", placeholder_id) + return None, pick_source + else: + pick_source = new_sources.pop(0) + new_sources = pick_source.get("url", "") + + if new_sources.startswith("http"): + _, source_name = os.path.split(new_sources) + oss.download_from_oss(new_sources, source_name, True) + return source_name, pick_source + return new_sources, pick_source + + return os.path.join(template_info.get("local_path", ""), source), {} + + def _check_placeholder_exist_with_count(self, placeholder_id: str, task_params: Dict[str, Any], + required_count: int = 1) -> bool: + """检查占位符是否存在足够数量的片段""" + if placeholder_id in task_params: + new_sources = task_params.get(placeholder_id, []) + if isinstance(new_sources, list): + return len(new_sources) >= required_count + return required_count <= 1 + return False + + def _create_sub_task(self, part: Dict[str, Any], source: str, ext_data: Dict[str, Any], + template_info: Dict[str, Any]) -> RenderTask: + """创建子任务""" + sub_task = RenderTask( + input_files=[source], + resolution=template_info.get("video_size", ""), + frame_rate=template_info.get("frame_rate", 25), + annexb=True, + center_cut=part.get("crop_mode"), + zoom_cut=part.get("zoom_cut"), + ext_data=ext_data + ) + + # 应用部分模板设置 + self._apply_template_settings(sub_task, part, template_info) + + return sub_task + + def _apply_template_settings(self, task: RenderTask, template_part: Dict[str, Any], + template_info: Dict[str, Any]): + """应用模板设置到任务""" + # 添加效果 + for effect in template_part.get('effects', []): + task.add_effect(effect) + + # 添加LUT + for lut in template_part.get('luts', []): + full_path = os.path.join(template_info.get("local_path", ""), lut) + task.add_lut(full_path.replace("\\", "/")) + + # 添加音频 + for audio in template_part.get('audios', []): + full_path = os.path.join(template_info.get("local_path", ""), audio) + task.add_audios(full_path) + + # 添加覆盖层 + for overlay in template_part.get('overlays', []): + full_path = os.path.join(template_info.get("local_path", ""), overlay) + task.add_overlay(full_path) + + def _cleanup_temp_files(self, task: RenderTask): + """清理临时文件""" + try: + template_dir = os.getenv("TEMPLATE_DIR", "") + if template_dir and template_dir not in task.output_file: + if os.path.exists(task.output_file): + os.remove(task.output_file) + logger.info("Cleaned up temp file: %s", task.output_file) + else: + logger.info("Skipped cleanup of template file: %s", task.output_file) + except OSError as e: + logger.warning("Failed to cleanup temp file %s: %s", task.output_file, e) \ No newline at end of file diff --git a/services/template_service.py b/services/template_service.py new file mode 100644 index 0000000..0092546 --- /dev/null +++ b/services/template_service.py @@ -0,0 +1,266 @@ +import json +import os +import logging +from abc import ABC, abstractmethod +from typing import Dict, Any, Optional + +from opentelemetry.trace import Status, StatusCode + +from util.exceptions import TemplateError, TemplateNotFoundError, TemplateValidationError +from util import api, oss +from config.settings import get_storage_config +from telemetry import get_tracer + +logger = logging.getLogger(__name__) + +class TemplateService(ABC): + """模板服务抽象接口""" + + @abstractmethod + def get_template(self, template_id: str) -> Optional[Dict[str, Any]]: + """ + 获取模板信息 + + Args: + template_id: 模板ID + + Returns: + Dict[str, Any]: 模板信息,如果不存在则返回None + """ + pass + + @abstractmethod + def load_local_templates(self): + """加载本地模板""" + pass + + @abstractmethod + def download_template(self, template_id: str) -> bool: + """ + 下载模板 + + Args: + template_id: 模板ID + + Returns: + bool: 下载是否成功 + """ + pass + + @abstractmethod + def validate_template(self, template_info: Dict[str, Any]) -> bool: + """ + 验证模板 + + Args: + template_info: 模板信息 + + Returns: + bool: 验证是否通过 + """ + pass + +class DefaultTemplateService(TemplateService): + """默认模板服务实现""" + + def __init__(self): + self.templates: Dict[str, Dict[str, Any]] = {} + self.storage_config = get_storage_config() + + def get_template(self, template_id: str) -> Optional[Dict[str, Any]]: + """获取模板信息""" + if template_id not in self.templates: + # 尝试下载模板 + if not self.download_template(template_id): + return None + return self.templates.get(template_id) + + def load_local_templates(self): + """加载本地模板""" + template_dir = self.storage_config.template_dir + if not os.path.exists(template_dir): + logger.warning("Template directory does not exist: %s", template_dir) + return + + for template_name in os.listdir(template_dir): + if template_name.startswith("_") or template_name.startswith("."): + continue + + target_path = os.path.join(template_dir, template_name) + if os.path.isdir(target_path): + try: + self._load_template(template_name, target_path) + except Exception as e: + logger.error("Failed to load template %s: %s", template_name, e) + + def download_template(self, template_id: str) -> bool: + """下载模板""" + tracer = get_tracer(__name__) + with tracer.start_as_current_span("download_template") as span: + try: + span.set_attribute("template.id", template_id) + + # 获取远程模板信息 + template_info = api.get_template_info(template_id) + if template_info is None: + logger.warning("Failed to get template info: %s", template_id) + return False + + local_path = template_info.get('local_path') + if not local_path: + local_path = os.path.join(self.storage_config.template_dir, str(template_id)) + template_info['local_path'] = local_path + + # 创建本地目录 + if not os.path.isdir(local_path): + os.makedirs(local_path) + + # 下载模板资源 + overall_template = template_info.get('overall_template', {}) + video_parts = template_info.get('video_parts', []) + + self._download_template_assets(overall_template, template_info) + for video_part in video_parts: + self._download_template_assets(video_part, template_info) + + # 保存模板定义文件 + template_file = os.path.join(local_path, 'template.json') + with open(template_file, 'w', encoding='utf-8') as f: + json.dump(template_info, f, ensure_ascii=False, indent=2) + + # 加载到内存 + self._load_template(template_id, local_path) + + span.set_status(Status(StatusCode.OK)) + logger.info("Template downloaded successfully: %s", template_id) + return True + + except Exception as e: + span.set_status(Status(StatusCode.ERROR)) + logger.error("Failed to download template %s: %s", template_id, e) + return False + + def validate_template(self, template_info: Dict[str, Any]) -> bool: + """验证模板""" + try: + local_path = template_info.get("local_path") + if not local_path: + raise TemplateValidationError("Template missing local_path") + + # 验证视频部分 + for video_part in template_info.get("video_parts", []): + self._validate_template_part(video_part, local_path) + + # 验证整体模板 + overall_template = template_info.get("overall_template", {}) + if overall_template: + self._validate_template_part(overall_template, local_path) + + return True + + except TemplateValidationError: + raise + except Exception as e: + raise TemplateValidationError(f"Template validation failed: {e}") + + def _load_template(self, template_name: str, local_path: str): + """加载单个模板""" + logger.info("Loading template: %s (%s)", template_name, local_path) + + template_def_file = os.path.join(local_path, "template.json") + if not os.path.exists(template_def_file): + raise TemplateNotFoundError(f"Template definition file not found: {template_def_file}") + + try: + with open(template_def_file, 'r', encoding='utf-8') as f: + template_info = json.load(f) + except json.JSONDecodeError as e: + raise TemplateError(f"Invalid template JSON: {e}") + + template_info["local_path"] = local_path + + try: + self.validate_template(template_info) + self.templates[template_name] = template_info + logger.info("Template loaded successfully: %s", template_name) + except TemplateValidationError as e: + logger.error("Template validation failed for %s: %s. Attempting to re-download.", template_name, e) + # 模板验证失败,尝试重新下载 + if self.download_template(template_name): + logger.info("Template re-downloaded successfully: %s", template_name) + else: + logger.error("Failed to re-download template: %s", template_name) + raise + + def _download_template_assets(self, template_part: Dict[str, Any], template_info: Dict[str, Any]): + """下载模板资源""" + local_path = template_info['local_path'] + + # 下载源文件 + if 'source' in template_part: + source = template_part['source'] + if isinstance(source, str) and source.startswith("http"): + _, filename = os.path.split(source) + new_file_path = os.path.join(local_path, filename) + oss.download_from_oss(source, new_file_path) + + if filename.endswith(".mp4"): + from util.ffmpeg import re_encode_and_annexb + new_file_path = re_encode_and_annexb(new_file_path) + + template_part['source'] = os.path.relpath(new_file_path, local_path) + + # 下载覆盖层 + if 'overlays' in template_part: + for i, overlay in enumerate(template_part['overlays']): + if isinstance(overlay, str) and overlay.startswith("http"): + _, filename = os.path.split(overlay) + oss.download_from_oss(overlay, os.path.join(local_path, filename)) + template_part['overlays'][i] = filename + + # 下载LUT + if 'luts' in template_part: + for i, lut in enumerate(template_part['luts']): + if isinstance(lut, str) and lut.startswith("http"): + _, filename = os.path.split(lut) + oss.download_from_oss(lut, os.path.join(local_path, filename)) + template_part['luts'][i] = filename + + # 下载音频 + if 'audios' in template_part: + for i, audio in enumerate(template_part['audios']): + if isinstance(audio, str) and audio.startswith("http"): + _, filename = os.path.split(audio) + oss.download_from_oss(audio, os.path.join(local_path, filename)) + template_part['audios'][i] = filename + + def _validate_template_part(self, template_part: Dict[str, Any], base_dir: str): + """验证模板部分""" + # 验证源文件 + source_file = template_part.get("source", "") + if source_file and not source_file.startswith("http") and not source_file.startswith("PLACEHOLDER_"): + if not os.path.isabs(source_file): + source_file = os.path.join(base_dir, source_file) + if not os.path.exists(source_file): + raise TemplateValidationError(f"Source file not found: {source_file}") + + # 验证音频文件 + for audio in template_part.get("audios", []): + if not os.path.isabs(audio): + audio = os.path.join(base_dir, audio) + if not os.path.exists(audio): + raise TemplateValidationError(f"Audio file not found: {audio}") + + # 验证LUT文件 + for lut in template_part.get("luts", []): + if not os.path.isabs(lut): + lut = os.path.join(base_dir, lut) + if not os.path.exists(lut): + raise TemplateValidationError(f"LUT file not found: {lut}") + + # 验证覆盖层文件 + for overlay in template_part.get("overlays", []): + if not os.path.isabs(overlay): + overlay = os.path.join(base_dir, overlay) + if not os.path.exists(overlay): + raise TemplateValidationError(f"Overlay file not found: {overlay}") \ No newline at end of file diff --git a/template/__init__.py b/template/__init__.py index de98076..f16905b 100644 --- a/template/__init__.py +++ b/template/__init__.py @@ -4,125 +4,66 @@ import logging from telemetry import get_tracer from util import api, oss +from services.template_service import DefaultTemplateService -TEMPLATES = {} logger = logging.getLogger("template") -def check_local_template(local_name): - template_def = TEMPLATES[local_name] - base_dir = template_def.get("local_path") - for video_part in template_def.get("video_parts", []): - source_file = video_part.get("source", "") - if str(source_file).startswith("http"): - # download file - ... - elif str(source_file).startswith("PLACEHOLDER_"): - continue - else: - if not os.path.isabs(source_file): - source_file = os.path.join(base_dir, source_file) - if not os.path.exists(source_file): - logger.error(f"{source_file} not found, please check the template definition") - raise Exception(f"{source_file} not found, please check the template definition") - for audio in video_part.get("audios", []): - if not os.path.isabs(audio): - audio = os.path.join(base_dir, audio) - if not os.path.exists(audio): - logger.error(f"{audio} not found, please check the template definition") - raise Exception(f"{audio} not found, please check the template definition") - for lut in video_part.get("luts", []): - if not os.path.isabs(lut): - lut = os.path.join(base_dir, lut) - if not os.path.exists(lut): - logger.error(f"{lut} not found, please check the template definition") - raise Exception(f"{lut} not found, please check the template definition") - for mask in video_part.get("overlays", []): - if not os.path.isabs(mask): - mask = os.path.join(base_dir, mask) - if not os.path.exists(mask): - logger.error(f"{mask} not found, please check the template definition") - raise Exception(f"{mask} not found, please check the template definition") +# 全局模板服务实例 +_template_service = None +def _get_template_service(): + """获取模板服务实例""" + global _template_service + if _template_service is None: + _template_service = DefaultTemplateService() + return _template_service + +# 向后兼容的全局变量和函数 +TEMPLATES = {} + +def _update_templates_dict(): + """更新全局TEMPLATES字典以保持向后兼容""" + service = _get_template_service() + TEMPLATES.clear() + TEMPLATES.update(service.templates) + +def check_local_template(local_name): + """向后兼容函数""" + service = _get_template_service() + template_def = service.templates.get(local_name) + if template_def: + try: + service.validate_template(template_def) + except Exception as e: + logger.error(f"Template validation failed: {e}") + raise def load_template(template_name, local_path): - global TEMPLATES - logger.info(f"加载视频模板定义:【{template_name}({local_path})】") - template_def_file = os.path.join(local_path, "template.json") - if os.path.exists(template_def_file): - TEMPLATES[template_name] = json.load(open(template_def_file, 'rb')) - TEMPLATES[template_name]["local_path"] = local_path - try: - check_local_template(template_name) - logger.info(f"完成加载【{template_name}】模板") - except Exception as e: - logger.error(f"模板定义文件【{template_def_file}】有误,正在尝试重新下载模板", exc_info=e) - download_template(template_name) - + """向后兼容函数""" + service = _get_template_service() + service._load_template(template_name, local_path) + _update_templates_dict() def load_local_template(): - for template_name in os.listdir(os.getenv("TEMPLATE_DIR")): - if template_name.startswith("_"): - continue - if template_name.startswith("."): - continue - target_path = os.path.join(os.getenv("TEMPLATE_DIR"), template_name) - if os.path.isdir(target_path): - load_template(template_name, target_path) - + """加载本地模板(向后兼容函数)""" + service = _get_template_service() + service.load_local_templates() + _update_templates_dict() def get_template_def(template_id): - if template_id not in TEMPLATES: - download_template(template_id) - return TEMPLATES.get(template_id) + """获取模板定义(向后兼容函数)""" + service = _get_template_service() + template = service.get_template(template_id) + _update_templates_dict() + return template def download_template(template_id): - tracer = get_tracer(__name__) - with tracer.start_as_current_span("download_template"): - template_info = api.get_template_info(template_id) - if template_info is None: - return - if not os.path.isdir(template_info['local_path']): - os.makedirs(template_info['local_path']) - # download template assets - overall_template = template_info['overall_template'] - video_parts = template_info['video_parts'] - def _download_assets(_template): - if 'source' in _template: - if str(_template['source']).startswith("http"): - _, _fn = os.path.split(_template['source']) - new_fp = os.path.join(template_info['local_path'], _fn) - oss.download_from_oss(_template['source'], new_fp) - if _fn.endswith(".mp4"): - from util.ffmpeg import re_encode_and_annexb - new_fp = re_encode_and_annexb(new_fp) - _template['source'] = os.path.relpath(new_fp, template_info['local_path']) - if 'overlays' in _template: - for i in range(len(_template['overlays'])): - overlay = _template['overlays'][i] - if str(overlay).startswith("http"): - _, _fn = os.path.split(overlay) - oss.download_from_oss(overlay, os.path.join(template_info['local_path'], _fn)) - _template['overlays'][i] = _fn - if 'luts' in _template: - for i in range(len(_template['luts'])): - lut = _template['luts'][i] - if str(lut).startswith("http"): - _, _fn = os.path.split(lut) - oss.download_from_oss(lut, os.path.join(template_info['local_path'], _fn)) - _template['luts'][i] = _fn - if 'audios' in _template: - for i in range(len(_template['audios'])): - if str(_template['audios'][i]).startswith("http"): - _, _fn = os.path.split(_template['audios'][i]) - oss.download_from_oss(_template['audios'][i], os.path.join(template_info['local_path'], _fn)) - _template['audios'][i] = _fn - _download_assets(overall_template) - for video_part in video_parts: - _download_assets(video_part) - with open(os.path.join(template_info['local_path'], 'template.json'), 'w', encoding='utf-8') as f: - json.dump(template_info, f) - load_template(template_id, template_info['local_path']) - + """下载模板(向后兼容函数)""" + service = _get_template_service() + success = service.download_template(template_id) + _update_templates_dict() + return success def analyze_template(template_id): - ... + """分析模板(占位符函数)""" + pass diff --git a/util/api.py b/util/api.py index 78d165f..5d75583 100644 --- a/util/api.py +++ b/util/api.py @@ -24,13 +24,14 @@ def sync_center(): 通过接口获取任务 :return: 任务列表 """ - from template import TEMPLATES, download_template + from services import DefaultTemplateService + template_service = DefaultTemplateService() try: response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ 'accessKey': os.getenv('ACCESS_KEY'), 'clientStatus': util.system.get_sys_info(), 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in - TEMPLATES.values()] + template_service.templates.values()] }, timeout=10) response.raise_for_status() except requests.RequestException as e: @@ -56,7 +57,7 @@ def sync_center(): template_id = template.get('id', '') if template_id: logger.info("更新模板:【%s】", template_id) - download_template(template_id) + template_service.download_template(template_id) return tasks diff --git a/util/exceptions.py b/util/exceptions.py new file mode 100644 index 0000000..9b05185 --- /dev/null +++ b/util/exceptions.py @@ -0,0 +1,72 @@ +class RenderWorkerError(Exception): + """RenderWorker基础异常类""" + def __init__(self, message: str, error_code: str = None): + super().__init__(message) + self.message = message + self.error_code = error_code or self.__class__.__name__ + +class ConfigurationError(RenderWorkerError): + """配置错误""" + pass + +class TemplateError(RenderWorkerError): + """模板相关错误""" + pass + +class TemplateNotFoundError(TemplateError): + """模板未找到错误""" + pass + +class TemplateValidationError(TemplateError): + """模板验证错误""" + pass + +class TaskError(RenderWorkerError): + """任务处理错误""" + pass + +class TaskValidationError(TaskError): + """任务参数验证错误""" + pass + +class RenderError(RenderWorkerError): + """渲染处理错误""" + pass + +class FFmpegError(RenderError): + """FFmpeg执行错误""" + def __init__(self, message: str, command: list = None, return_code: int = None, stderr: str = None): + super().__init__(message) + self.command = command + self.return_code = return_code + self.stderr = stderr + +class EffectError(RenderError): + """效果处理错误""" + def __init__(self, message: str, effect_name: str = None, effect_params: str = None): + super().__init__(message) + self.effect_name = effect_name + self.effect_params = effect_params + +class StorageError(RenderWorkerError): + """存储相关错误""" + pass + +class APIError(RenderWorkerError): + """API调用错误""" + def __init__(self, message: str, status_code: int = None, response_body: str = None): + super().__init__(message) + self.status_code = status_code + self.response_body = response_body + +class ResourceError(RenderWorkerError): + """资源相关错误""" + pass + +class ResourceNotFoundError(ResourceError): + """资源未找到错误""" + pass + +class DownloadError(ResourceError): + """下载错误""" + pass \ No newline at end of file