diff --git a/app.py b/app.py index df50743..077c694 100644 --- a/app.py +++ b/app.py @@ -21,48 +21,51 @@ LOGGER = logging.getLogger(__name__) init_opentelemetry(batch=False) app = flask.Flask(__name__) -@app.get('/health/check') + +@app.get("/health/check") def health_check(): return api.sync_center() -@app.post('/') + +@app.post("/") def do_nothing(): return "NOOP" -@app.post('/') + +@app.post("/") def do_task(task_id): try: task_info = api.get_task_info(task_id) if not task_info: LOGGER.error("Failed to get task info for task: %s", task_id) return "Failed to get task info", 400 - + template_id = task_info.get("templateId") if not template_id: LOGGER.error("Task %s missing templateId", task_id) return "Missing templateId", 400 - + local_template_info = template_service.get_template(template_id) template_info = api.get_template_info(template_id) - + if not template_info: LOGGER.error("Failed to get template info for template: %s", template_id) return "Failed to get template info", 400 - + if local_template_info: if local_template_info.get("updateTime") != template_info.get("updateTime"): LOGGER.info("Template %s needs update, downloading...", template_id) if not template_service.download_template(template_id): LOGGER.error("Failed to download template: %s", template_id) return "Failed to download template", 500 - + biz.task.start_task(task_info) return "OK" - + except Exception as e: LOGGER.error("Error processing task %s: %s", task_id, e, exc_info=True) return "Internal server error", 500 -if __name__ == '__main__': +if __name__ == "__main__": app.run(host="0.0.0.0", port=9998) diff --git a/biz/ffmpeg.py b/biz/ffmpeg.py index 0e6d785..27d2d9f 100644 --- a/biz/ffmpeg.py +++ b/biz/ffmpeg.py @@ -15,9 +15,10 @@ from util import ffmpeg, oss from util.ffmpeg import fade_out_audio from telemetry import get_tracer -logger = logging.getLogger('biz/ffmpeg') +logger = logging.getLogger("biz/ffmpeg") _render_service = None + def _get_render_service(): """获取渲染服务实例""" global _render_service @@ -31,19 +32,28 @@ def parse_ffmpeg_task(task_info, template_info): 解析FFmpeg任务 - 保留用于向后兼容 实际处理逻辑已迁移到 services.TaskService.create_render_task """ - logger.warning("parse_ffmpeg_task is deprecated, use TaskService.create_render_task instead") - + logger.warning( + "parse_ffmpeg_task is deprecated, use TaskService.create_render_task instead" + ) + # 使用新的任务服务创建任务 - from services import DefaultTaskService, DefaultRenderService, DefaultTemplateService + from services import ( + DefaultTaskService, + DefaultRenderService, + DefaultTemplateService, + ) + render_service = DefaultRenderService() template_service = DefaultTemplateService() task_service = DefaultTaskService(render_service, template_service) - + # 创建新的渲染任务 render_task = task_service.create_render_task(task_info, template_info) - + # 为了向后兼容,创建一个FfmpegTask包装器 - ffmpeg_task = FfmpegTask(render_task.input_files, output_file=render_task.output_file) + ffmpeg_task = FfmpegTask( + render_task.input_files, output_file=render_task.output_file + ) ffmpeg_task.resolution = render_task.resolution ffmpeg_task.frame_rate = render_task.frame_rate ffmpeg_task.annexb = render_task.annexb @@ -54,7 +64,7 @@ def parse_ffmpeg_task(task_info, template_info): ffmpeg_task.luts = render_task.luts ffmpeg_task.audios = render_task.audios ffmpeg_task.overlays = render_task.overlays - + return ffmpeg_task @@ -64,14 +74,20 @@ def parse_video(source, task_params, template_info): logger.warning("parse_video is deprecated, functionality moved to TaskService") return source, {} + def check_placeholder_exist(placeholder_id, task_params): - """已迁移到 TaskService._check_placeholder_exist_with_count""" - logger.warning("check_placeholder_exist is deprecated, functionality moved to TaskService") + """已迁移到 TaskService._check_placeholder_exist_with_count""" + logger.warning( + "check_placeholder_exist is deprecated, functionality moved to TaskService" + ) return placeholder_id in task_params + def check_placeholder_exist_with_count(placeholder_id, task_params, required_count=1): """已迁移到 TaskService._check_placeholder_exist_with_count""" - logger.warning("check_placeholder_exist_with_count is deprecated, functionality moved to TaskService") + logger.warning( + "check_placeholder_exist_with_count is deprecated, functionality moved to TaskService" + ) if placeholder_id in task_params: new_sources = task_params.get(placeholder_id, []) if isinstance(new_sources, list): @@ -88,14 +104,14 @@ def start_ffmpeg_task(ffmpeg_task): # 使用新的渲染服务 render_service = _get_render_service() result = render_service.render(ffmpeg_task) - + if result: span.set_status(Status(StatusCode.OK)) else: span.set_status(Status(StatusCode.ERROR)) - + return result - + except Exception as e: span.set_status(Status(StatusCode.ERROR)) logger.error(f"FFmpeg task failed: {e}", exc_info=True) @@ -104,7 +120,9 @@ def start_ffmpeg_task(ffmpeg_task): def clear_task_tmp_file(ffmpeg_task): """清理临时文件 - 已迁移到 TaskService._cleanup_temp_files""" - logger.warning("clear_task_tmp_file is deprecated, functionality moved to TaskService") + logger.warning( + "clear_task_tmp_file is deprecated, functionality moved to TaskService" + ) try: template_dir = os.getenv("TEMPLATE_DIR", "") output_file = ffmpeg_task.get_output_file() @@ -124,5 +142,3 @@ def probe_video_info(ffmpeg_task): """获取视频长度宽度和时长 - 使用新的渲染服务""" render_service = _get_render_service() return render_service.get_video_info(ffmpeg_task.get_output_file()) - - diff --git a/biz/task.py b/biz/task.py index 58b2baa..e2ca867 100644 --- a/biz/task.py +++ b/biz/task.py @@ -12,6 +12,7 @@ logger = logging.getLogger(__name__) # 确保服务已注册 register_default_services() + def start_task(task_info): """启动任务处理(保持向后兼容的接口)""" tracer = get_tracer(__name__) @@ -19,19 +20,19 @@ def start_task(task_info): try: # 使用服务容器获取任务服务 task_service = get_task_service() - + # 使用新的任务服务处理 result = task_service.process_task(task_info) - + if result: span.set_status(Status(StatusCode.OK)) logger.info("Task completed successfully: %s", task_info.get("id")) else: span.set_status(Status(StatusCode.ERROR)) logger.error("Task failed: %s", task_info.get("id")) - + return None # 保持原有返回值格式 - + except Exception as e: span.set_status(Status(StatusCode.ERROR)) logger.error("Task processing failed: %s", e, exc_info=True) diff --git a/config/__init__.py b/config/__init__.py index 6831a28..d96efc2 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -4,16 +4,28 @@ from logging.handlers import TimedRotatingFileHandler from dotenv import load_dotenv # 导入新的配置系统,保持向后兼容 -from .settings import get_config, get_ffmpeg_config, get_api_config, get_storage_config, get_server_config +from .settings import ( + get_config, + get_ffmpeg_config, + get_api_config, + get_storage_config, + get_server_config, +) load_dotenv() logging.basicConfig(level=logging.INFO) root_logger = logging.getLogger() -rf_handler = TimedRotatingFileHandler('all_log.log', when='midnight') -rf_handler.setFormatter(logging.Formatter("[%(asctime)s][%(name)s]%(levelname)s - %(message)s")) +rf_handler = TimedRotatingFileHandler("all_log.log", when="midnight") +rf_handler.setFormatter( + logging.Formatter("[%(asctime)s][%(name)s]%(levelname)s - %(message)s") +) rf_handler.setLevel(logging.DEBUG) -f_handler = TimedRotatingFileHandler('error.log', when='midnight') +f_handler = TimedRotatingFileHandler("error.log", when="midnight") f_handler.setLevel(logging.ERROR) -f_handler.setFormatter(logging.Formatter("[%(asctime)s][%(name)s][:%(lineno)d]%(levelname)s - - %(message)s")) +f_handler.setFormatter( + logging.Formatter( + "[%(asctime)s][%(name)s][:%(lineno)d]%(levelname)s - - %(message)s" + ) +) root_logger.addHandler(rf_handler) -root_logger.addHandler(f_handler) \ No newline at end of file +root_logger.addHandler(f_handler) diff --git a/config/settings.py b/config/settings.py index 481d33d..ab762a0 100644 --- a/config/settings.py +++ b/config/settings.py @@ -6,17 +6,19 @@ from dotenv import load_dotenv load_dotenv() + @dataclass class FFmpegConfig: """FFmpeg相关配置""" + encoder_args: List[str] - video_args: List[str] + video_args: List[str] audio_args: List[str] default_args: List[str] old_ffmpeg: bool = False re_encode_video_args: Optional[List[str]] = None re_encode_encoder_args: Optional[List[str]] = None - + # 新增配置选项,消除硬编码 max_download_workers: int = 8 progress_args: List[str] = None @@ -24,29 +26,31 @@ class FFmpegConfig: null_audio_args: List[str] = None overlay_scale_mode: str = "scale2ref" # 新版本使用scale2ref,旧版本使用scale amix_args: List[str] = None - + @classmethod - def from_env(cls) -> 'FFmpegConfig': + def from_env(cls) -> "FFmpegConfig": encoder_args = os.getenv("ENCODER_ARGS", "-c:v h264").split(" ") video_args = os.getenv("VIDEO_ARGS", "-profile:v high -level:v 4").split(" ") audio_args = ["-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2"] default_args = ["-shortest"] - + re_encode_video_args = None if os.getenv("RE_ENCODE_VIDEO_ARGS"): re_encode_video_args = os.getenv("RE_ENCODE_VIDEO_ARGS").split(" ") - - re_encode_encoder_args = None + + re_encode_encoder_args = None if os.getenv("RE_ENCODE_ENCODER_ARGS"): re_encode_encoder_args = os.getenv("RE_ENCODE_ENCODER_ARGS").split(" ") - + # 新增配置项的默认值 progress_args = ["-progress", "-"] - loglevel_args = ["-loglevel", "error"] + loglevel_args = ["-loglevel", "error"] null_audio_args = ["-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000"] amix_args = ["amix=duration=shortest:dropout_transition=0:normalize=0"] - overlay_scale_mode = "scale" if bool(os.getenv("OLD_FFMPEG", False)) else "scale2ref" - + overlay_scale_mode = ( + "scale" if bool(os.getenv("OLD_FFMPEG", False)) else "scale2ref" + ) + return cls( encoder_args=encoder_args, video_args=video_args, @@ -60,79 +64,89 @@ class FFmpegConfig: loglevel_args=loglevel_args, null_audio_args=null_audio_args, overlay_scale_mode=overlay_scale_mode, - amix_args=amix_args + amix_args=amix_args, ) + @dataclass class APIConfig: """API相关配置""" + endpoint: str access_key: str timeout: int = 10 redirect_to_url: Optional[str] = None - + @classmethod - def from_env(cls) -> 'APIConfig': - endpoint = os.getenv('API_ENDPOINT', '') + def from_env(cls) -> "APIConfig": + endpoint = os.getenv("API_ENDPOINT", "") if not endpoint: raise ValueError("API_ENDPOINT environment variable is required") - - access_key = os.getenv('ACCESS_KEY', '') + + access_key = os.getenv("ACCESS_KEY", "") if not access_key: raise ValueError("ACCESS_KEY environment variable is required") - + return cls( endpoint=endpoint, access_key=access_key, - timeout=int(os.getenv('API_TIMEOUT', '10')), - redirect_to_url=os.getenv("REDIRECT_TO_URL") or None + timeout=int(os.getenv("API_TIMEOUT", "10")), + redirect_to_url=os.getenv("REDIRECT_TO_URL") or None, ) + @dataclass class StorageConfig: """存储相关配置""" + template_dir: str - + @classmethod - def from_env(cls) -> 'StorageConfig': - template_dir = os.getenv('TEMPLATE_DIR', './template') + def from_env(cls) -> "StorageConfig": + template_dir = os.getenv("TEMPLATE_DIR", "./template") return cls(template_dir=template_dir) + @dataclass class ServerConfig: """服务器相关配置""" + host: str = "0.0.0.0" port: int = 9998 debug: bool = False - + @classmethod - def from_env(cls) -> 'ServerConfig': + def from_env(cls) -> "ServerConfig": return cls( - host=os.getenv('HOST', '0.0.0.0'), - port=int(os.getenv('PORT', '9998')), - debug=bool(os.getenv('DEBUG', False)) + host=os.getenv("HOST", "0.0.0.0"), + port=int(os.getenv("PORT", "9998")), + debug=bool(os.getenv("DEBUG", False)), ) + @dataclass class AppConfig: """应用总配置""" + ffmpeg: FFmpegConfig api: APIConfig storage: StorageConfig server: ServerConfig - + @classmethod - def from_env(cls) -> 'AppConfig': + def from_env(cls) -> "AppConfig": return cls( ffmpeg=FFmpegConfig.from_env(), api=APIConfig.from_env(), storage=StorageConfig.from_env(), - server=ServerConfig.from_env() + server=ServerConfig.from_env(), ) + # 全局配置实例 _config: Optional[AppConfig] = None + def get_config() -> AppConfig: """获取全局配置实例""" global _config @@ -140,21 +154,26 @@ def get_config() -> AppConfig: _config = AppConfig.from_env() return _config + def reload_config() -> AppConfig: """重新加载配置""" global _config _config = AppConfig.from_env() return _config + # 向后兼容的配置获取函数 def get_ffmpeg_config() -> FFmpegConfig: return get_config().ffmpeg + def get_api_config() -> APIConfig: return get_config().api + def get_storage_config() -> StorageConfig: return get_config().storage + def get_server_config() -> ServerConfig: - return get_config().server \ No newline at end of file + return get_config().server diff --git a/constant/__init__.py b/constant/__init__.py index e9bfcca..f4fa80e 100644 --- a/constant/__init__.py +++ b/constant/__init__.py @@ -1,9 +1,9 @@ SUPPORT_FEATURE = ( - 'simple_render_algo', - 'gpu_accelerate', - 'hevc_encode', - 'rapid_download', - 'rclone_upload', - 'custom_re_encode', + "simple_render_algo", + "gpu_accelerate", + "hevc_encode", + "rapid_download", + "rclone_upload", + "custom_re_encode", ) -SOFTWARE_VERSION = '0.0.5' +SOFTWARE_VERSION = "0.0.5" diff --git a/entity/__init__.py b/entity/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/entity/effects/__init__.py b/entity/effects/__init__.py index 93e89de..badc387 100644 --- a/entity/effects/__init__.py +++ b/entity/effects/__init__.py @@ -1,25 +1,25 @@ from .base import EffectProcessor, EffectRegistry from .camera_shot import CameraShotEffect -from .speed import SpeedEffect +from .speed import SpeedEffect from .zoom import ZoomEffect from .skip import SkipEffect from .tail import TailEffect # 注册所有效果处理器 registry = EffectRegistry() -registry.register('cameraShot', CameraShotEffect) -registry.register('ospeed', SpeedEffect) -registry.register('zoom', ZoomEffect) -registry.register('skip', SkipEffect) -registry.register('tail', TailEffect) +registry.register("cameraShot", CameraShotEffect) +registry.register("ospeed", SpeedEffect) +registry.register("zoom", ZoomEffect) +registry.register("skip", SkipEffect) +registry.register("tail", TailEffect) __all__ = [ - 'EffectProcessor', - 'EffectRegistry', - 'registry', - 'CameraShotEffect', - 'SpeedEffect', - 'ZoomEffect', - 'SkipEffect', - 'TailEffect' -] \ No newline at end of file + "EffectProcessor", + "EffectRegistry", + "registry", + "CameraShotEffect", + "SpeedEffect", + "ZoomEffect", + "SkipEffect", + "TailEffect", +] diff --git a/entity/effects/base.py b/entity/effects/base.py index 579753c..4b42f93 100644 --- a/entity/effects/base.py +++ b/entity/effects/base.py @@ -5,90 +5,99 @@ import logging logger = logging.getLogger(__name__) + class EffectProcessor(ABC): """效果处理器抽象基类""" - + def __init__(self, params: str = "", ext_data: Optional[Dict[str, Any]] = None): self.params = params self.ext_data = ext_data or {} self.frame_rate = 25 # 默认帧率 - + @abstractmethod def validate_params(self) -> bool: """验证参数是否有效""" pass - + @abstractmethod - def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]: + def generate_filter_args( + self, video_input: str, effect_index: int + ) -> tuple[List[str], str]: """ 生成FFmpeg滤镜参数 - + Args: video_input: 输入视频流标识符 (例如: "[0:v]", "[v_eff1]") effect_index: 效果索引,用于生成唯一的输出标识符 - + Returns: tuple: (filter_args_list, output_stream_identifier) """ pass - + @abstractmethod def get_effect_name(self) -> str: """获取效果名称""" pass - + def parse_params(self) -> List[str]: """解析参数字符串为列表""" if not self.params: return [] - return self.params.split(',') - + return self.params.split(",") + def get_pos_json(self) -> Dict[str, Any]: """获取位置JSON数据""" - pos_json_str = self.ext_data.get('posJson', '{}') + pos_json_str = self.ext_data.get("posJson", "{}") try: - return json.loads(pos_json_str) if pos_json_str != '{}' else {} + return json.loads(pos_json_str) if pos_json_str != "{}" else {} except Exception as e: logger.warning(f"Failed to parse posJson: {e}") return {} + class EffectRegistry: """效果处理器注册表""" - + def __init__(self): self._processors: Dict[str, Type[EffectProcessor]] = {} - + def register(self, name: str, processor_class: Type[EffectProcessor]): """注册效果处理器""" if not issubclass(processor_class, EffectProcessor): raise ValueError(f"{processor_class} must be a subclass of EffectProcessor") self._processors[name] = processor_class logger.debug(f"Registered effect processor: {name}") - - def get_processor(self, effect_name: str, params: str = "", ext_data: Optional[Dict[str, Any]] = None) -> Optional[EffectProcessor]: + + def get_processor( + self, + effect_name: str, + params: str = "", + ext_data: Optional[Dict[str, Any]] = None, + ) -> Optional[EffectProcessor]: """获取效果处理器实例""" if effect_name not in self._processors: logger.warning(f"Unknown effect: {effect_name}") return None - + processor_class = self._processors[effect_name] return processor_class(params, ext_data) - + def list_effects(self) -> List[str]: """列出所有注册的效果""" return list(self._processors.keys()) - + def parse_effect_string(self, effect_string: str) -> tuple[str, str]: """ 解析效果字符串 - + Args: effect_string: 效果字符串,格式为 "effect_name:params" - + Returns: tuple: (effect_name, params) """ - if ':' in effect_string: - parts = effect_string.split(':', 2) + if ":" in effect_string: + parts = effect_string.split(":", 2) return parts[0], parts[1] if len(parts) > 1 else "" - return effect_string, "" \ No newline at end of file + return effect_string, "" diff --git a/entity/effects/camera_shot.py b/entity/effects/camera_shot.py index 6fbddfb..20036e4 100644 --- a/entity/effects/camera_shot.py +++ b/entity/effects/camera_shot.py @@ -1,22 +1,23 @@ from typing import List, Dict, Any from .base import EffectProcessor + class CameraShotEffect(EffectProcessor): """相机镜头效果处理器""" - + def validate_params(self) -> bool: """验证参数:start_time,duration,rotate_deg""" params = self.parse_params() if not params: return True # 使用默认参数 - + # 参数格式: "start_time,duration,rotate_deg" if len(params) > 3: return False - + try: for i, param in enumerate(params): - if param == '': + if param == "": continue if i == 2: # rotate_deg int(param) @@ -25,55 +26,69 @@ class CameraShotEffect(EffectProcessor): return True except ValueError: return False - - def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]: + + def generate_filter_args( + self, video_input: str, effect_index: int + ) -> tuple[List[str], str]: """生成相机镜头效果的滤镜参数""" if not self.validate_params(): return [], video_input - + params = self.parse_params() - + # 设置默认值 start = 3.0 duration = 1.0 rotate_deg = 0 - - if len(params) >= 1 and params[0] != '': + + if len(params) >= 1 and params[0] != "": start = float(params[0]) - if len(params) >= 2 and params[1] != '': + if len(params) >= 2 and params[1] != "": duration = float(params[1]) - if len(params) >= 3 and params[2] != '': + if len(params) >= 3 and params[2] != "": rotate_deg = int(params[2]) - + filter_args = [] - + # 生成输出流标识符 start_out_str = "[eff_s]" - mid_out_str = "[eff_m]" + mid_out_str = "[eff_m]" end_out_str = "[eff_e]" final_output = f"[v_eff{effect_index}]" - + # 分割视频流为三部分 - filter_args.append(f"{video_input}split=3{start_out_str}{mid_out_str}{end_out_str}") - + filter_args.append( + f"{video_input}split=3{start_out_str}{mid_out_str}{end_out_str}" + ) + # 选择开始部分帧 - filter_args.append(f"{start_out_str}select=lt(n\\,{int(start * self.frame_rate)}){start_out_str}") - + filter_args.append( + f"{start_out_str}select=lt(n\\,{int(start * self.frame_rate)}){start_out_str}" + ) + # 选择结束部分帧 - filter_args.append(f"{end_out_str}select=gt(n\\,{int(start * self.frame_rate)}){end_out_str}") - + filter_args.append( + f"{end_out_str}select=gt(n\\,{int(start * self.frame_rate)}){end_out_str}" + ) + # 选择中间特定帧并扩展 - filter_args.append(f"{mid_out_str}select=eq(n\\,{int(start * self.frame_rate)}){mid_out_str}") - filter_args.append(f"{mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{mid_out_str}") - + filter_args.append( + f"{mid_out_str}select=eq(n\\,{int(start * self.frame_rate)}){mid_out_str}" + ) + filter_args.append( + f"{mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{mid_out_str}" + ) + # 如果需要旋转 if rotate_deg != 0: filter_args.append(f"{mid_out_str}rotate=PI*{rotate_deg}/180{mid_out_str}") - + # 连接三部分 - filter_args.append(f"{start_out_str}{mid_out_str}{end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{final_output}") - + filter_args.append( + f"{start_out_str}{mid_out_str}{end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{final_output}" + ) + return filter_args, final_output - + def get_effect_name(self) -> str: - return "cameraShot" \ No newline at end of file + return "cameraShot" diff --git a/entity/effects/skip.py b/entity/effects/skip.py index 6e85040..46c23c3 100644 --- a/entity/effects/skip.py +++ b/entity/effects/skip.py @@ -1,38 +1,41 @@ from typing import List from .base import EffectProcessor + class SkipEffect(EffectProcessor): """跳过开头效果处理器""" - + def validate_params(self) -> bool: """验证参数:跳过的秒数""" if not self.params: return True # 默认不跳过 - + try: skip_seconds = float(self.params) return skip_seconds >= 0 except ValueError: return False - - def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]: + + def generate_filter_args( + self, video_input: str, effect_index: int + ) -> tuple[List[str], str]: """生成跳过开头效果的滤镜参数""" if not self.validate_params(): return [], video_input - + if not self.params: return [], video_input - + skip_seconds = float(self.params) if skip_seconds <= 0: return [], video_input - + output_stream = f"[v_eff{effect_index}]" - + # 使用trim滤镜跳过开头 filter_args = [f"{video_input}trim=start={skip_seconds}{output_stream}"] - + return filter_args, output_stream - + def get_effect_name(self) -> str: - return "skip" \ No newline at end of file + return "skip" diff --git a/entity/effects/speed.py b/entity/effects/speed.py index 608320f..0fdc220 100644 --- a/entity/effects/speed.py +++ b/entity/effects/speed.py @@ -1,35 +1,38 @@ from typing import List from .base import EffectProcessor + class SpeedEffect(EffectProcessor): """视频变速效果处理器""" - + def validate_params(self) -> bool: """验证参数:速度倍数""" if not self.params: return True # 默认不变速 - + try: speed = float(self.params) return speed > 0 except ValueError: return False - - def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]: + + def generate_filter_args( + self, video_input: str, effect_index: int + ) -> tuple[List[str], str]: """生成变速效果的滤镜参数""" if not self.validate_params(): return [], video_input - + if not self.params or self.params == "1": return [], video_input # 不需要变速 - + speed = float(self.params) output_stream = f"[v_eff{effect_index}]" - + # 使用setpts进行变速 filter_args = [f"{video_input}setpts={speed}*PTS{output_stream}"] - + return filter_args, output_stream - + def get_effect_name(self) -> str: - return "ospeed" \ No newline at end of file + return "ospeed" diff --git a/entity/effects/tail.py b/entity/effects/tail.py index e06d98e..befc098 100644 --- a/entity/effects/tail.py +++ b/entity/effects/tail.py @@ -1,42 +1,45 @@ from typing import List from .base import EffectProcessor + class TailEffect(EffectProcessor): """保留末尾效果处理器""" - + def validate_params(self) -> bool: """验证参数:保留的秒数""" if not self.params: return True # 默认不截取 - + try: tail_seconds = float(self.params) return tail_seconds >= 0 except ValueError: return False - - def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]: + + def generate_filter_args( + self, video_input: str, effect_index: int + ) -> tuple[List[str], str]: """生成保留末尾效果的滤镜参数""" if not self.validate_params(): return [], video_input - + if not self.params: return [], video_input - + tail_seconds = float(self.params) if tail_seconds <= 0: return [], video_input - + output_stream = f"[v_eff{effect_index}]" - + # 使用reverse+trim+reverse的方法来精确获取最后N秒 filter_args = [ f"{video_input}reverse[v_rev{effect_index}]", f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]", - f"[v_trim{effect_index}]reverse{output_stream}" + f"[v_trim{effect_index}]reverse{output_stream}", ] - + return filter_args, output_stream - + def get_effect_name(self) -> str: - return "tail" \ No newline at end of file + return "tail" diff --git a/entity/effects/zoom.py b/entity/effects/zoom.py index 4c2c28f..714f8ac 100644 --- a/entity/effects/zoom.py +++ b/entity/effects/zoom.py @@ -2,46 +2,47 @@ from typing import List import json from .base import EffectProcessor + class ZoomEffect(EffectProcessor): """缩放效果处理器""" - + def validate_params(self) -> bool: """验证参数:start_time,zoom_factor,duration""" params = self.parse_params() if len(params) < 3: return False - + try: start_time = float(params[0]) zoom_factor = float(params[1]) duration = float(params[2]) - - return (start_time >= 0 and - zoom_factor > 0 and - duration >= 0) + + return start_time >= 0 and zoom_factor > 0 and duration >= 0 except (ValueError, IndexError): return False - - def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]: + + def generate_filter_args( + self, video_input: str, effect_index: int + ) -> tuple[List[str], str]: """生成缩放效果的滤镜参数""" if not self.validate_params(): return [], video_input - + params = self.parse_params() start_time = float(params[0]) - zoom_factor = float(params[1]) + zoom_factor = float(params[1]) duration = float(params[2]) - + if zoom_factor == 1: return [], video_input # 不需要缩放 - + output_stream = f"[v_eff{effect_index}]" - + # 获取缩放中心点 center_x, center_y = self._get_zoom_center() - + filter_args = [] - + if duration == 0: # 静态缩放(整个视频时长) x_expr = f"({center_x})-(ow*zoom)/2" @@ -57,24 +58,24 @@ class ZoomEffect(EffectProcessor): filter_args.append( f"{video_input}zoompan=z={zoom_expr}:x={x_expr}:y={y_expr}:d=1{output_stream}" ) - + return filter_args, output_stream - + def _get_zoom_center(self) -> tuple[str, str]: """获取缩放中心点坐标表达式""" # 默认中心点 center_x = "iw/2" center_y = "ih/2" - + pos_json = self.get_pos_json() if pos_json: - _f_x = pos_json.get('ltX', 0) - _f_x2 = pos_json.get('rbX', 0) - _f_y = pos_json.get('ltY', 0) - _f_y2 = pos_json.get('rbY', 0) - _v_w = pos_json.get('imgWidth', 1) - _v_h = pos_json.get('imgHeight', 1) - + _f_x = pos_json.get("ltX", 0) + _f_x2 = pos_json.get("rbX", 0) + _f_y = pos_json.get("ltY", 0) + _f_y2 = pos_json.get("rbY", 0) + _v_w = pos_json.get("imgWidth", 1) + _v_h = pos_json.get("imgHeight", 1) + if _v_w > 0 and _v_h > 0: # 计算坐标系统中的中心点 center_x_ratio = (_f_x + _f_x2) / (2 * _v_w) @@ -82,8 +83,8 @@ class ZoomEffect(EffectProcessor): # 转换为视频坐标系统 center_x = f"iw*{center_x_ratio:.6f}" center_y = f"ih*{center_y_ratio:.6f}" - + return center_x, center_y - + def get_effect_name(self) -> str: - return "zoom" \ No newline at end of file + return "zoom" diff --git a/entity/ffmpeg.py b/entity/ffmpeg.py index 77532b2..daecb5b 100644 --- a/entity/ffmpeg.py +++ b/entity/ffmpeg.py @@ -2,10 +2,40 @@ import os DEFAULT_ARGS = ("-shortest",) -ENCODER_ARGS = ("-c:v", "h264", ) if not os.getenv("ENCODER_ARGS", False) else os.getenv("ENCODER_ARGS", "").split(" ") -VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", ) if not os.getenv("VIDEO_ARGS", False) else os.getenv("VIDEO_ARGS", "").split(" ") -AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", ) -MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", ) +ENCODER_ARGS = ( + ( + "-c:v", + "h264", + ) + if not os.getenv("ENCODER_ARGS", False) + else os.getenv("ENCODER_ARGS", "").split(" ") +) +VIDEO_ARGS = ( + ( + "-profile:v", + "high", + "-level:v", + "4", + ) + if not os.getenv("VIDEO_ARGS", False) + else os.getenv("VIDEO_ARGS", "").split(" ") +) +AUDIO_ARGS = ( + "-c:a", + "aac", + "-b:a", + "128k", + "-ar", + "48000", + "-ac", + "2", +) +MUTE_AUDIO_INPUT = ( + "-f", + "lavfi", + "-i", + "anullsrc=cl=stereo:r=48000", +) def get_mp4toannexb_filter(): @@ -24,8 +54,8 @@ class FfmpegTask(object): 兼容类:保留原有FfmpegTask接口用于向后兼容 实际处理逻辑已迁移到新架构,该类主要用作数据载体 """ - - def __init__(self, input_file, task_type='copy', output_file=''): + + def __init__(self, input_file, task_type="copy", output_file=""): """保持原有构造函数签名""" self.annexb = False if type(input_file) is str: @@ -52,7 +82,7 @@ class FfmpegTask(object): self.effects = [] def __repr__(self): - return f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type})' + return f"FfmpegTask(input_file={self.input_file}, task_type={self.task_type})" def analyze_input_render_tasks(self): """分析输入中的子任务""" @@ -73,7 +103,7 @@ class FfmpegTask(object): def add_overlay(self, *overlays): """添加覆盖层""" for overlay in overlays: - if str(overlay).endswith('.ass'): + if str(overlay).endswith(".ass"): self.subtitles.append(overlay) else: self.overlays.append(overlay) @@ -96,7 +126,7 @@ class FfmpegTask(object): def get_output_file(self): """获取输出文件""" - if self.task_type == 'copy': + if self.task_type == "copy": return self.input_file[0] if self.input_file else "" if not self.output_file: self.set_output_file() @@ -105,29 +135,43 @@ class FfmpegTask(object): def correct_task_type(self): """校正任务类型""" if self.check_can_copy(): - self.task_type = 'copy' + self.task_type = "copy" elif self.check_can_concat(): - self.task_type = 'concat' + self.task_type = "concat" else: - self.task_type = 'encode' + self.task_type = "encode" def check_can_concat(self): """检查是否可以连接""" - return (len(self.luts) == 0 and len(self.overlays) == 0 and - len(self.subtitles) == 0 and len(self.effects) == 0 and - self.speed == 1 and self.zoom_cut is None and self.center_cut is None) + return ( + len(self.luts) == 0 + and len(self.overlays) == 0 + and len(self.subtitles) == 0 + and len(self.effects) == 0 + and self.speed == 1 + and self.zoom_cut is None + and self.center_cut is None + ) def check_can_copy(self): """检查是否可以复制""" - return (len(self.luts) == 0 and len(self.overlays) == 0 and - len(self.subtitles) == 0 and len(self.effects) == 0 and - self.speed == 1 and len(self.audios) == 0 and len(self.input_file) <= 1 and - self.zoom_cut is None and self.center_cut is None) + return ( + len(self.luts) == 0 + and len(self.overlays) == 0 + and len(self.subtitles) == 0 + and len(self.effects) == 0 + and self.speed == 1 + and len(self.audios) == 0 + and len(self.input_file) <= 1 + and self.zoom_cut is None + and self.center_cut is None + ) def set_output_file(self, file=None): """设置输出文件""" if file is None: import uuid + if self.annexb: self.output_file = f"rand_{uuid.uuid4()}.ts" else: @@ -149,12 +193,24 @@ class FfmpegTask(object): 建议使用新的 FFmpegCommandBuilder 来生成命令 """ # 简化版本,主要用于向后兼容 - if self.task_type == 'copy' and len(self.input_file) == 1: + if self.task_type == "copy" and len(self.input_file) == 1: if isinstance(self.input_file[0], str): if self.input_file[0] == self.get_output_file(): return [] - return ['-y', '-hide_banner', '-i', self.input_file[0], '-c', 'copy', self.get_output_file()] - + return [ + "-y", + "-hide_banner", + "-i", + self.input_file[0], + "-c", + "copy", + self.get_output_file(), + ] + # 对于复杂情况,返回基础命令结构 # 实际处理会在新的服务架构中完成 - return ['-y', '-hide_banner', '-i'] + self.input_file + ['-c', 'copy', self.get_output_file()] \ No newline at end of file + return ( + ["-y", "-hide_banner", "-i"] + + self.input_file + + ["-c", "copy", self.get_output_file()] + ) diff --git a/entity/ffmpeg_command_builder.py b/entity/ffmpeg_command_builder.py index 16fc3bb..fe5332b 100644 --- a/entity/ffmpeg_command_builder.py +++ b/entity/ffmpeg_command_builder.py @@ -8,25 +8,30 @@ from entity.effects import registry as effect_registry from util.exceptions import FFmpegError from util.ffmpeg import probe_video_info, probe_video_audio from util.ffmpeg_utils import ( - build_base_ffmpeg_args, build_null_audio_input, build_amix_filter, - build_overlay_scale_filter, get_annexb_filter, build_standard_output_args + build_base_ffmpeg_args, + build_null_audio_input, + build_amix_filter, + build_overlay_scale_filter, + get_annexb_filter, + build_standard_output_args, ) from util.json_utils import safe_json_loads import logging logger = logging.getLogger(__name__) + class FFmpegCommandBuilder: """FFmpeg命令构建器""" - + def __init__(self, task: RenderTask): self.task = task self.config = get_ffmpeg_config() - + def build_command(self) -> List[str]: """构建FFmpeg命令""" self.task.update_task_type() - + if self.task.task_type == TaskType.COPY: return self._build_copy_command() elif self.task.task_type == TaskType.CONCAT: @@ -35,28 +40,32 @@ class FFmpegCommandBuilder: return self._build_encode_command() else: raise FFmpegError(f"Unsupported task type: {self.task.task_type}") - + def _build_copy_command(self) -> List[str]: """构建复制命令""" if len(self.task.input_files) == 1: input_file = self.task.input_files[0] if input_file == self.task.output_file: return [] # 不需要处理 - + return [ - "ffmpeg", "-y", "-hide_banner", - "-i", self.task.input_files[0], - "-c", "copy", - self.task.output_file + "ffmpeg", + "-y", + "-hide_banner", + "-i", + self.task.input_files[0], + "-c", + "copy", + self.task.output_file, ] - + def _build_concat_command(self) -> List[str]: """构建拼接命令""" args = ["ffmpeg", "-y", "-hide_banner"] input_args = [] output_args = [*self.config.default_args] filter_args = [] - + if len(self.task.input_files) == 1: # 单个文件 file = self.task.input_files[0] @@ -70,16 +79,16 @@ class FFmpegCommandBuilder: f.write(f"file '{input_file}'\n") input_args.extend(["-f", "concat", "-safe", "0", "-i", tmp_file]) self.task.mute = not probe_video_audio(tmp_file, "concat") - + # 视频流映射 output_args.extend(["-map", "0:v", "-c:v", "copy"]) - + # 音频处理 audio_output_str = self._handle_audio_concat(input_args, filter_args) if audio_output_str: output_args.extend(["-map", audio_output_str]) output_args.extend(self.config.audio_args) - + # annexb处理 if self.task.annexb: output_args.extend(["-bsf:v", self._get_mp4toannexb_filter()]) @@ -87,170 +96,210 @@ class FFmpegCommandBuilder: output_args.extend(["-f", "mpegts"]) else: output_args.extend(["-f", "mp4"]) - - filter_complex = ["-filter_complex", ";".join(filter_args)] if filter_args else [] - - return args + input_args + filter_complex + output_args + [self.task.output_file] - + + filter_complex = ( + ["-filter_complex", ";".join(filter_args)] if filter_args else [] + ) + + return ( + args + input_args + filter_complex + output_args + [self.task.output_file] + ) + def _build_encode_command(self) -> List[str]: """构建编码命令""" args = build_base_ffmpeg_args() - + input_args = [] filter_args = [] output_args = build_standard_output_args() - + # annexb处理 if self.task.annexb: output_args.extend(["-bsf:v", get_annexb_filter()]) output_args.extend(["-reset_timestamps", "1"]) - + # 处理输入文件 for input_file in self.task.input_files: input_args.extend(["-i", input_file]) - + # 处理视频流 video_output_str = "[0:v]" effect_index = 0 - + # 处理中心裁剪 if self.task.center_cut == 1: - video_output_str, effect_index = self._add_center_cut(filter_args, video_output_str, effect_index) - - # 处理缩放裁剪 + video_output_str, effect_index = self._add_center_cut( + filter_args, video_output_str, effect_index + ) + + # 处理缩放裁剪 if self.task.zoom_cut == 1 and self.task.resolution: - video_output_str, effect_index = self._add_zoom_cut(filter_args, video_output_str, effect_index) - + video_output_str, effect_index = self._add_zoom_cut( + filter_args, video_output_str, effect_index + ) + # 处理效果 - video_output_str, effect_index = self._add_effects(filter_args, video_output_str, effect_index) - + video_output_str, effect_index = self._add_effects( + filter_args, video_output_str, effect_index + ) + # 处理分辨率 if self.task.resolution: - filter_args.append(f"{video_output_str}scale={self.task.resolution.replace('x', ':')}[v]") + filter_args.append( + f"{video_output_str}scale={self.task.resolution.replace('x', ':')}[v]" + ) video_output_str = "[v]" - + # 处理LUT for lut in self.task.luts: filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}") - + # 处理覆盖层 video_output_str = self._add_overlays(input_args, filter_args, video_output_str) - + # 处理字幕 for subtitle in self.task.subtitles: filter_args.append(f"{video_output_str}ass={subtitle}[v]") video_output_str = "[v]" - + # 映射视频流 output_args.extend(["-map", video_output_str]) output_args.extend(["-r", str(self.task.frame_rate)]) output_args.extend(["-fps_mode", "cfr"]) - + # 处理音频 audio_output_str = self._handle_audio_encode(input_args, filter_args) if audio_output_str: output_args.extend(["-map", audio_output_str]) - - filter_complex = ["-filter_complex", ";".join(filter_args)] if filter_args else [] - - return args + input_args + filter_complex + output_args + [self.task.output_file] - - def _add_center_cut(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]: + + filter_complex = ( + ["-filter_complex", ";".join(filter_args)] if filter_args else [] + ) + + return ( + args + input_args + filter_complex + output_args + [self.task.output_file] + ) + + def _add_center_cut( + self, filter_args: List[str], video_input: str, effect_index: int + ) -> tuple[str, int]: """添加中心裁剪""" - pos_json = self.task.ext_data.get('posJson', '{}') + pos_json = self.task.ext_data.get("posJson", "{}") pos_data = safe_json_loads(pos_json, {}) - - _v_w = pos_data.get('imgWidth', 1) - _f_x = pos_data.get('ltX', 0) - _f_x2 = pos_data.get('rbX', 0) - _x = f'{float((_f_x2 + _f_x)/(2 * _v_w)):.4f}*iw-ih*ih/(2*iw)' - - filter_args.append(f"{video_input}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]") + + _v_w = pos_data.get("imgWidth", 1) + _f_x = pos_data.get("ltX", 0) + _f_x2 = pos_data.get("rbX", 0) + _x = f"{float((_f_x2 + _f_x)/(2 * _v_w)):.4f}*iw-ih*ih/(2*iw)" + + filter_args.append( + f"{video_input}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]" + ) return f"[v_cut{effect_index}]", effect_index + 1 - - def _add_zoom_cut(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]: + + def _add_zoom_cut( + self, filter_args: List[str], video_input: str, effect_index: int + ) -> tuple[str, int]: """添加缩放裁剪""" # 获取输入视频尺寸 input_file = self.task.input_files[0] _iw, _ih, _ = probe_video_info(input_file) - - _w, _h = self.task.resolution.split('x', 1) - pos_json = self.task.ext_data.get('posJson', '{}') + + _w, _h = self.task.resolution.split("x", 1) + pos_json = self.task.ext_data.get("posJson", "{}") pos_data = safe_json_loads(pos_json, {}) - - _v_w = pos_data.get('imgWidth', 1) - _v_h = pos_data.get('imgHeight', 1) - _f_x = pos_data.get('ltX', 0) - _f_x2 = pos_data.get('rbX', 0) - _f_y = pos_data.get('ltY', 0) - _f_y2 = pos_data.get('rbY', 0) - + + _v_w = pos_data.get("imgWidth", 1) + _v_h = pos_data.get("imgHeight", 1) + _f_x = pos_data.get("ltX", 0) + _f_x2 = pos_data.get("rbX", 0) + _f_y = pos_data.get("ltY", 0) + _f_y2 = pos_data.get("rbY", 0) + _x = min(max(0, int((_f_x + _f_x2) / 2 - int(_w) / 2)), _iw - int(_w)) _y = min(max(0, int((_f_y + _f_y2) / 2 - int(_h) / 2)), _ih - int(_h)) - - filter_args.append(f"{video_input}crop=x={_x}:y={_y}:w={_w}:h={_h}[vz_cut{effect_index}]") + + filter_args.append( + f"{video_input}crop=x={_x}:y={_y}:w={_w}:h={_h}[vz_cut{effect_index}]" + ) return f"[vz_cut{effect_index}]", effect_index + 1 - - def _add_effects(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]: + + def _add_effects( + self, filter_args: List[str], video_input: str, effect_index: int + ) -> tuple[str, int]: """添加效果处理""" current_input = video_input - + for effect_str in self.task.effects: effect_name, params = effect_registry.parse_effect_string(effect_str) - processor = effect_registry.get_processor(effect_name, params, self.task.ext_data) - + processor = effect_registry.get_processor( + effect_name, params, self.task.ext_data + ) + if processor: processor.frame_rate = self.task.frame_rate - effect_filters, output_stream = processor.generate_filter_args(current_input, effect_index) - + effect_filters, output_stream = processor.generate_filter_args( + current_input, effect_index + ) + if effect_filters: filter_args.extend(effect_filters) current_input = output_stream effect_index += 1 - + return current_input, effect_index - - def _add_overlays(self, input_args: List[str], filter_args: List[str], video_input: str) -> str: + + def _add_overlays( + self, input_args: List[str], filter_args: List[str], video_input: str + ) -> str: """添加覆盖层""" current_input = video_input - + for overlay in self.task.overlays: input_index = input_args.count("-i") // 2 # 每个输入占两个参数 -i filename input_args.extend(["-i", overlay]) - + if self.config.overlay_scale_mode == "scale": filter_args.append(f"{current_input}[{input_index}:v]scale=iw:ih[v]") else: - filter_args.append(f"{current_input}[{input_index}:v]{self.config.overlay_scale_mode}=iw:ih[v]") - + filter_args.append( + f"{current_input}[{input_index}:v]{self.config.overlay_scale_mode}=iw:ih[v]" + ) + filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]") current_input = "[v]" - + return current_input - - def _handle_audio_concat(self, input_args: List[str], filter_args: List[str]) -> Optional[str]: + + def _handle_audio_concat( + self, input_args: List[str], filter_args: List[str] + ) -> Optional[str]: """处理concat模式的音频""" audio_output_str = "" - + if self.task.mute: input_index = input_args.count("-i") // 2 input_args.extend(build_null_audio_input()) audio_output_str = f"[{input_index}:a]" else: audio_output_str = "[0:a]" - + for audio in self.task.audios: input_index = input_args.count("-i") // 2 input_args.extend(["-i", audio.replace("\\", "/")]) - filter_args.append(f"{audio_output_str}[{input_index}:a]{self.config.amix_args[0]}[a]") + filter_args.append( + f"{audio_output_str}[{input_index}:a]{self.config.amix_args[0]}[a]" + ) audio_output_str = "[a]" - + return audio_output_str.strip("[]") if audio_output_str else None - - def _handle_audio_encode(self, input_args: List[str], filter_args: List[str]) -> Optional[str]: + + def _handle_audio_encode( + self, input_args: List[str], filter_args: List[str] + ) -> Optional[str]: """处理encode模式的音频""" audio_output_str = "" - + if self.task.mute: input_index = input_args.count("-i") // 2 input_args.extend(["-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000"]) @@ -258,12 +307,13 @@ class FFmpegCommandBuilder: audio_output_str = "[a]" else: audio_output_str = "[0:a]" - + for audio in self.task.audios: input_index = input_args.count("-i") // 2 input_args.extend(["-i", audio.replace("\\", "/")]) - filter_args.append(f"{audio_output_str}[{input_index}:a]{self.config.amix_args[0]}[a]") + filter_args.append( + f"{audio_output_str}[{input_index}:a]{self.config.amix_args[0]}[a]" + ) audio_output_str = "[a]" - + return audio_output_str if audio_output_str else None - diff --git a/entity/render_task.py b/entity/render_task.py index 764292b..e4f132f 100644 --- a/entity/render_task.py +++ b/entity/render_task.py @@ -8,39 +8,42 @@ from config.settings import get_ffmpeg_config from util.exceptions import TaskValidationError, EffectError from entity.effects import registry as effect_registry + class TaskType(Enum): COPY = "copy" - CONCAT = "concat" + CONCAT = "concat" ENCODE = "encode" + @dataclass class RenderTask: """渲染任务数据类,只包含任务数据,不包含处理逻辑""" + input_files: List[str] = field(default_factory=list) output_file: str = "" task_type: TaskType = TaskType.COPY - + # 视频参数 resolution: Optional[str] = None frame_rate: int = 25 speed: float = 1.0 mute: bool = True annexb: bool = False - + # 裁剪参数 zoom_cut: Optional[int] = None center_cut: Optional[int] = None - + # 资源列表 subtitles: List[str] = field(default_factory=list) luts: List[str] = field(default_factory=list) audios: List[str] = field(default_factory=list) overlays: List[str] = field(default_factory=list) effects: List[str] = field(default_factory=list) - + # 扩展数据 ext_data: Dict[str, Any] = field(default_factory=dict) - + def __post_init__(self): """初始化后处理""" # 检测annexb格式 @@ -48,80 +51,90 @@ class RenderTask: if isinstance(input_file, str) and input_file.endswith(".ts"): self.annexb = True break - + # 自动生成输出文件名 if not self.output_file: self._generate_output_filename() - + def _generate_output_filename(self): """生成输出文件名""" if self.annexb: self.output_file = f"rand_{uuid.uuid4()}.ts" else: self.output_file = f"rand_{uuid.uuid4()}.mp4" - + def add_input_file(self, file_path: str): """添加输入文件""" self.input_files.append(file_path) if file_path.endswith(".ts"): self.annexb = True - + def add_overlay(self, *overlays: str): """添加覆盖层""" for overlay in overlays: - if overlay.endswith('.ass'): + if overlay.endswith(".ass"): self.subtitles.append(overlay) else: self.overlays.append(overlay) - + def add_audios(self, *audios: str): """添加音频""" self.audios.extend(audios) - + def add_lut(self, *luts: str): """添加LUT""" self.luts.extend(luts) - + def add_effect(self, *effects: str): """添加效果""" self.effects.extend(effects) - + def validate(self) -> bool: """验证任务参数""" if not self.input_files: raise TaskValidationError("No input files specified") - + # 验证所有效果 for effect_str in self.effects: effect_name, params = effect_registry.parse_effect_string(effect_str) - processor = effect_registry.get_processor(effect_name, params, self.ext_data) + processor = effect_registry.get_processor( + effect_name, params, self.ext_data + ) if processor and not processor.validate_params(): - raise EffectError(f"Invalid parameters for effect {effect_name}: {params}", effect_name, params) - + raise EffectError( + f"Invalid parameters for effect {effect_name}: {params}", + effect_name, + params, + ) + return True - + def can_copy(self) -> bool: """检查是否可以直接复制""" - return (len(self.luts) == 0 and - len(self.overlays) == 0 and - len(self.subtitles) == 0 and - len(self.effects) == 0 and - self.speed == 1 and - len(self.audios) == 0 and - len(self.input_files) == 1 and - self.zoom_cut is None and - self.center_cut is None) - + return ( + len(self.luts) == 0 + and len(self.overlays) == 0 + and len(self.subtitles) == 0 + and len(self.effects) == 0 + and self.speed == 1 + and len(self.audios) == 0 + and len(self.input_files) == 1 + and self.zoom_cut is None + and self.center_cut is None + ) + def can_concat(self) -> bool: """检查是否可以使用concat模式""" - return (len(self.luts) == 0 and - len(self.overlays) == 0 and - len(self.subtitles) == 0 and - len(self.effects) == 0 and - self.speed == 1 and - self.zoom_cut is None and - self.center_cut is None) - + return ( + len(self.luts) == 0 + and len(self.overlays) == 0 + and len(self.subtitles) == 0 + and len(self.effects) == 0 + and self.speed == 1 + and self.zoom_cut is None + and self.center_cut is None + ) + def determine_task_type(self) -> TaskType: """自动确定任务类型""" if self.can_copy(): @@ -130,17 +143,17 @@ class RenderTask: return TaskType.CONCAT else: return TaskType.ENCODE - + def update_task_type(self): """更新任务类型""" self.task_type = self.determine_task_type() - + def need_processing(self) -> bool: """检查是否需要处理""" if self.annexb: return True return not self.can_copy() - + def get_output_extension(self) -> str: """获取输出文件扩展名""" - return ".ts" if self.annexb else ".mp4" \ No newline at end of file + return ".ts" if self.annexb else ".mp4" diff --git a/index.py b/index.py index 10f2937..5466f40 100644 --- a/index.py +++ b/index.py @@ -18,7 +18,7 @@ register_default_services() template_service = get_template_service() # Check for redownload parameter -if 'redownload' in sys.argv: +if "redownload" in sys.argv: print("Redownloading all templates...") try: for template_name in template_service.templates.keys(): @@ -35,12 +35,13 @@ import logging LOGGER = logging.getLogger(__name__) init_opentelemetry() + def cleanup_temp_files(): """清理临时文件 - 异步执行避免阻塞主循环""" import threading - + def _cleanup(): - for file_globs in ['*.mp4', '*.ts', 'tmp_concat*.txt']: + for file_globs in ["*.mp4", "*.ts", "tmp_concat*.txt"]: for file_path in glob.glob(file_globs): try: if os.path.exists(file_path): @@ -48,34 +49,35 @@ def cleanup_temp_files(): LOGGER.debug(f"Deleted temp file: {file_path}") except Exception as e: LOGGER.warning(f"Error deleting file {file_path}: {e}") - + # 在后台线程中执行清理 threading.Thread(target=_cleanup, daemon=True).start() + def main_loop(): """主处理循环""" while True: try: print("waiting for task...") task_list = api.sync_center() - + if len(task_list) == 0: # 异步清理临时文件 cleanup_temp_files() sleep(5) continue - + for task in task_list: task_id = task.get("id", "unknown") print(f"Processing task: {task_id}") - + try: biz.task.start_task(task) LOGGER.info(f"Task {task_id} completed successfully") except Exception as e: LOGGER.error(f"Task {task_id} failed: {e}", exc_info=True) # 继续处理下一个任务而不是崩溃 - + except KeyboardInterrupt: LOGGER.info("Received shutdown signal, exiting...") break @@ -83,6 +85,7 @@ def main_loop(): LOGGER.error("Unexpected error in main loop", exc_info=e) sleep(5) # 避免快速循环消耗CPU + if __name__ == "__main__": try: main_loop() diff --git a/services/__init__.py b/services/__init__.py index 8d38cd9..d40ccb5 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -2,21 +2,25 @@ from .render_service import RenderService, DefaultRenderService from .task_service import TaskService, DefaultTaskService from .template_service import TemplateService, DefaultTemplateService from .service_container import ( - ServiceContainer, get_container, register_default_services, - get_render_service, get_template_service, get_task_service + ServiceContainer, + get_container, + register_default_services, + get_render_service, + get_template_service, + get_task_service, ) __all__ = [ - 'RenderService', - 'DefaultRenderService', - 'TaskService', - 'DefaultTaskService', - 'TemplateService', - 'DefaultTemplateService', - 'ServiceContainer', - 'get_container', - 'register_default_services', - 'get_render_service', - 'get_template_service', - 'get_task_service' -] \ No newline at end of file + "RenderService", + "DefaultRenderService", + "TaskService", + "DefaultTaskService", + "TemplateService", + "DefaultTemplateService", + "ServiceContainer", + "get_container", + "register_default_services", + "get_render_service", + "get_template_service", + "get_task_service", +] diff --git a/services/render_service.py b/services/render_service.py index 6776b1c..628532e 100644 --- a/services/render_service.py +++ b/services/render_service.py @@ -9,67 +9,76 @@ from opentelemetry.trace import Status, StatusCode from entity.render_task import RenderTask from entity.ffmpeg_command_builder import FFmpegCommandBuilder from util.exceptions import RenderError, FFmpegError -from util.ffmpeg import probe_video_info, fade_out_audio, handle_ffmpeg_output, subprocess_args +from util.ffmpeg import ( + probe_video_info, + fade_out_audio, + handle_ffmpeg_output, + subprocess_args, +) from telemetry import get_tracer logger = logging.getLogger(__name__) # 向后兼容层 - 处理旧的FfmpegTask对象 + class RenderService(ABC): """渲染服务抽象接口""" - + @abstractmethod - def render(self, task: Union[RenderTask, 'FfmpegTask']) -> bool: + def render(self, task: Union[RenderTask, "FfmpegTask"]) -> bool: """ 执行渲染任务 - + Args: task: 渲染任务 - + Returns: bool: 渲染是否成功 """ pass - + @abstractmethod def get_video_info(self, file_path: str) -> tuple[int, int, float]: """ 获取视频信息 - + Args: file_path: 视频文件路径 - + Returns: tuple: (width, height, duration) """ pass - + @abstractmethod - def fade_out_audio(self, file_path: str, duration: float, fade_seconds: float = 2.0) -> str: + def fade_out_audio( + self, file_path: str, duration: float, fade_seconds: float = 2.0 + ) -> str: """ 音频淡出处理 - + Args: file_path: 音频文件路径 duration: 音频总时长 fade_seconds: 淡出时长 - + Returns: str: 处理后的文件路径 """ pass + class DefaultRenderService(RenderService): """默认渲染服务实现""" - - def render(self, task: Union[RenderTask, 'FfmpegTask']) -> bool: + + def render(self, task: Union[RenderTask, "FfmpegTask"]) -> bool: """执行渲染任务""" # 兼容旧的FfmpegTask - if hasattr(task, 'get_ffmpeg_args'): # 这是FfmpegTask + if hasattr(task, "get_ffmpeg_args"): # 这是FfmpegTask # 使用旧的方式执行 return self._render_legacy_ffmpeg_task(task) - + tracer = get_tracer(__name__) with tracer.start_as_current_span("render_task") as span: try: @@ -78,98 +87,104 @@ class DefaultRenderService(RenderService): span.set_attribute("task.type", task.task_type.value) span.set_attribute("task.input_files", len(task.input_files)) span.set_attribute("task.output_file", task.output_file) - + # 检查是否需要处理 if not task.need_processing(): if len(task.input_files) == 1: task.output_file = task.input_files[0] span.set_status(Status(StatusCode.OK)) return True - + # 构建FFmpeg命令 builder = FFmpegCommandBuilder(task) ffmpeg_args = builder.build_command() - + if not ffmpeg_args: # 不需要处理,直接返回 if len(task.input_files) == 1: task.output_file = task.input_files[0] span.set_status(Status(StatusCode.OK)) return True - + # 执行FFmpeg命令 return self._execute_ffmpeg(ffmpeg_args, span) - + except Exception as e: span.set_status(Status(StatusCode.ERROR)) logger.error(f"Render failed: {e}", exc_info=True) raise RenderError(f"Render failed: {e}") from e - + def _execute_ffmpeg(self, args: list[str], span) -> bool: """执行FFmpeg命令""" span.set_attribute("ffmpeg.args", " ".join(args)) logger.info("Executing FFmpeg: %s", " ".join(args)) - + try: # 执行FFmpeg进程 (使用构建器已经包含的参数) process = subprocess.run( - args, - stderr=subprocess.PIPE, - **subprocess_args(True) + args, stderr=subprocess.PIPE, **subprocess_args(True) ) - + span.set_attribute("ffmpeg.return_code", process.returncode) - + # 处理输出 if process.stdout: output = handle_ffmpeg_output(process.stdout) span.set_attribute("ffmpeg.output", output) logger.info("FFmpeg output: %s", output) - + # 检查返回码 if process.returncode != 0: - error_msg = process.stderr.decode() if process.stderr else "Unknown error" + error_msg = ( + process.stderr.decode() if process.stderr else "Unknown error" + ) span.set_attribute("ffmpeg.error", error_msg) span.set_status(Status(StatusCode.ERROR)) - logger.error("FFmpeg failed with return code %d: %s", process.returncode, error_msg) + logger.error( + "FFmpeg failed with return code %d: %s", + process.returncode, + error_msg, + ) raise FFmpegError( f"FFmpeg execution failed", command=args, return_code=process.returncode, - stderr=error_msg + stderr=error_msg, ) - + # 检查输出文件 output_file = args[-1] # 输出文件总是最后一个参数 if not os.path.exists(output_file): span.set_status(Status(StatusCode.ERROR)) raise RenderError(f"Output file not created: {output_file}") - + # 检查文件大小 file_size = os.path.getsize(output_file) span.set_attribute("output.file_size", file_size) - + if file_size < 4096: # 文件过小 span.set_status(Status(StatusCode.ERROR)) raise RenderError(f"Output file too small: {file_size} bytes") - + span.set_status(Status(StatusCode.OK)) logger.info("FFmpeg execution completed successfully") return True - + except subprocess.SubprocessError as e: span.set_status(Status(StatusCode.ERROR)) logger.error("Subprocess error: %s", e) raise FFmpegError(f"Subprocess error: {e}") from e - + def get_video_info(self, file_path: str) -> tuple[int, int, float]: """获取视频信息""" return probe_video_info(file_path) - - def fade_out_audio(self, file_path: str, duration: float, fade_seconds: float = 2.0) -> str: + + def fade_out_audio( + self, file_path: str, duration: float, fade_seconds: float = 2.0 + ) -> str: """音频淡出处理""" return fade_out_audio(file_path, duration, fade_seconds) - + def _render_legacy_ffmpeg_task(self, ffmpeg_task) -> bool: """兼容处理旧的FfmpegTask""" tracer = get_tracer(__name__) @@ -180,19 +195,19 @@ class DefaultRenderService(RenderService): if not self.render(sub_task): span.set_status(Status(StatusCode.ERROR)) return False - + # 获取FFmpeg参数 ffmpeg_args = ffmpeg_task.get_ffmpeg_args() - + if not ffmpeg_args: # 不需要处理,直接返回 span.set_status(Status(StatusCode.OK)) return True - + # 执行FFmpeg命令 return self._execute_ffmpeg(ffmpeg_args, span) - + except Exception as e: span.set_status(Status(StatusCode.ERROR)) logger.error(f"Legacy FFmpeg task render failed: {e}", exc_info=True) - raise RenderError(f"Legacy render failed: {e}") from e \ No newline at end of file + raise RenderError(f"Legacy render failed: {e}") from e diff --git a/services/service_container.py b/services/service_container.py index 7af7374..cc96dc6 100644 --- a/services/service_container.py +++ b/services/service_container.py @@ -1,39 +1,43 @@ """ 服务容器模块 - 提供线程安全的服务实例管理 """ + import threading from typing import Dict, Type, TypeVar, Optional import logging logger = logging.getLogger(__name__) -T = TypeVar('T') +T = TypeVar("T") + class ServiceContainer: """线程安全的服务容器,实现依赖注入和单例管理""" - + def __init__(self): self._services: Dict[Type, object] = {} self._factories: Dict[Type, callable] = {} self._lock = threading.RLock() - + def register_singleton(self, service_type: Type[T], factory: callable) -> None: """注册单例服务工厂""" with self._lock: self._factories[service_type] = factory logger.debug(f"Registered singleton factory for {service_type.__name__}") - + def get_service(self, service_type: Type[T]) -> T: """获取服务实例(懒加载单例)""" with self._lock: # 检查是否已存在实例 if service_type in self._services: return self._services[service_type] - + # 检查是否有工厂方法 if service_type not in self._factories: - raise ValueError(f"No factory registered for service type: {service_type}") - + raise ValueError( + f"No factory registered for service type: {service_type}" + ) + # 创建新实例 factory = self._factories[service_type] try: @@ -42,14 +46,16 @@ class ServiceContainer: logger.debug(f"Created new instance of {service_type.__name__}") return instance except Exception as e: - logger.error(f"Failed to create instance of {service_type.__name__}: {e}") + logger.error( + f"Failed to create instance of {service_type.__name__}: {e}" + ) raise - + def has_service(self, service_type: Type[T]) -> bool: """检查是否有服务注册""" with self._lock: return service_type in self._factories - + def clear_cache(self, service_type: Optional[Type[T]] = None) -> None: """清理服务缓存""" with self._lock: @@ -60,10 +66,12 @@ class ServiceContainer: self._services.clear() logger.debug("Cleared all service cache") + # 全局服务容器实例 _container: Optional[ServiceContainer] = None _container_lock = threading.Lock() + def get_container() -> ServiceContainer: """获取全局服务容器实例""" global _container @@ -73,45 +81,54 @@ def get_container() -> ServiceContainer: _container = ServiceContainer() return _container + def register_default_services(): """注册默认的服务实现""" from .render_service import DefaultRenderService, RenderService from .template_service import DefaultTemplateService, TemplateService from .task_service import DefaultTaskService, TaskService - + container = get_container() - + # 注册渲染服务 container.register_singleton(RenderService, lambda: DefaultRenderService()) - + # 注册模板服务 def create_template_service(): service = DefaultTemplateService() service.load_local_templates() return service + container.register_singleton(TemplateService, create_template_service) - + # 注册任务服务(依赖其他服务) def create_task_service(): render_service = container.get_service(RenderService) template_service = container.get_service(TemplateService) return DefaultTaskService(render_service, template_service) + container.register_singleton(TaskService, create_task_service) - + logger.info("Default services registered successfully") + # 便捷函数 -def get_render_service() -> 'RenderService': +def get_render_service() -> "RenderService": """获取渲染服务实例""" from .render_service import RenderService + return get_container().get_service(RenderService) -def get_template_service() -> 'TemplateService': + +def get_template_service() -> "TemplateService": """获取模板服务实例""" from .template_service import TemplateService + return get_container().get_service(TemplateService) -def get_task_service() -> 'TaskService': + +def get_task_service() -> "TaskService": """获取任务服务实例""" from .task_service import TaskService - return get_container().get_service(TaskService) \ No newline at end of file + + return get_container().get_service(TaskService) diff --git a/services/task_service.py b/services/task_service.py index 486546b..da656e0 100644 --- a/services/task_service.py +++ b/services/task_service.py @@ -16,43 +16,49 @@ from telemetry import get_tracer logger = logging.getLogger(__name__) + class TaskService(ABC): """任务服务抽象接口""" - + @abstractmethod def process_task(self, task_info: Dict[str, Any]) -> bool: """ 处理任务 - + Args: task_info: 任务信息 - + Returns: bool: 处理是否成功 """ pass - + @abstractmethod - def create_render_task(self, task_info: Dict[str, Any], template_info: Dict[str, Any]) -> RenderTask: + def create_render_task( + self, task_info: Dict[str, Any], template_info: Dict[str, Any] + ) -> RenderTask: """ 创建渲染任务 - + Args: task_info: 任务信息 template_info: 模板信息 - + Returns: RenderTask: 渲染任务对象 """ pass + class DefaultTaskService(TaskService): """默认任务服务实现""" - - def __init__(self, render_service: RenderService, template_service: TemplateService): + + def __init__( + self, render_service: RenderService, template_service: TemplateService + ): self.render_service = render_service self.template_service = template_service - + def process_task(self, task_info: Dict[str, Any]) -> bool: """处理任务""" tracer = get_tracer(__name__) @@ -61,112 +67,128 @@ class DefaultTaskService(TaskService): # 标准化任务信息 task_info = api.normalize_task(task_info) span.set_attribute("task.id", task_info.get("id", "unknown")) - span.set_attribute("task.template_id", task_info.get("templateId", "unknown")) - + span.set_attribute( + "task.template_id", task_info.get("templateId", "unknown") + ) + # 获取模板信息 template_id = task_info.get("templateId") template_info = self.template_service.get_template(template_id) if not template_info: raise TaskError(f"Template not found: {template_id}") - + # 报告任务开始 api.report_task_start(task_info) - + # 创建渲染任务 render_task = self.create_render_task(task_info, template_info) - + # 执行渲染 success = self.render_service.render(render_task) if not success: span.set_status(Status(StatusCode.ERROR)) api.report_task_failed(task_info, "Render failed") return False - + # 获取视频信息 - width, height, duration = self.render_service.get_video_info(render_task.output_file) + width, height, duration = self.render_service.get_video_info( + render_task.output_file + ) span.set_attribute("video.width", width) span.set_attribute("video.height", height) span.set_attribute("video.duration", duration) - + # 音频淡出 - new_file = self.render_service.fade_out_audio(render_task.output_file, duration) + new_file = self.render_service.fade_out_audio( + render_task.output_file, duration + ) render_task.output_file = new_file - + # 上传文件 - 创建一个兼容对象 class TaskCompat: def __init__(self, output_file): self.output_file = output_file + def get_output_file(self): return self.output_file - + task_compat = TaskCompat(render_task.output_file) upload_success = api.upload_task_file(task_info, task_compat) if not upload_success: span.set_status(Status(StatusCode.ERROR)) api.report_task_failed(task_info, "Upload failed") return False - + # 清理临时文件 self._cleanup_temp_files(render_task) - + # 报告任务成功 - api.report_task_success(task_info, videoInfo={ - "width": width, - "height": height, - "duration": duration - }) - + api.report_task_success( + task_info, + videoInfo={"width": width, "height": height, "duration": duration}, + ) + span.set_status(Status(StatusCode.OK)) return True - + except Exception as e: span.set_status(Status(StatusCode.ERROR)) logger.error(f"Task processing failed: {e}", exc_info=True) api.report_task_failed(task_info, str(e)) return False - - def create_render_task(self, task_info: Dict[str, Any], template_info: Dict[str, Any]) -> RenderTask: + + def create_render_task( + self, task_info: Dict[str, Any], template_info: Dict[str, Any] + ) -> RenderTask: """创建渲染任务""" tracer = get_tracer(__name__) with tracer.start_as_current_span("create_render_task") as span: # 解析任务参数 task_params_str = task_info.get("taskParams", "{}") span.set_attribute("task_params", task_params_str) - + task_params = safe_json_loads(task_params_str, {}) task_params_orig = safe_json_loads(task_params_str, {}) - + if not task_params: raise TaskValidationError("Invalid or empty task params JSON") - + # 并行下载资源 self._download_resources(task_params) - + # 创建子任务列表 sub_tasks = [] only_if_usage_count = {} - + for part in template_info.get("video_parts", []): source, ext_data = self._parse_video_source( - part.get('source'), task_params, template_info + part.get("source"), task_params, template_info ) if not source: logger.warning("No video found for part: %s", part) continue - + # 检查only_if条件 - only_if = part.get('only_if', '') + only_if = part.get("only_if", "") if only_if: - only_if_usage_count[only_if] = only_if_usage_count.get(only_if, 0) + 1 + only_if_usage_count[only_if] = ( + only_if_usage_count.get(only_if, 0) + 1 + ) required_count = only_if_usage_count[only_if] - if not self._check_placeholder_exist_with_count(only_if, task_params_orig, required_count): - logger.info("Skipping part due to only_if condition: %s (need %d)", only_if, required_count) + if not self._check_placeholder_exist_with_count( + only_if, task_params_orig, required_count + ): + logger.info( + "Skipping part due to only_if condition: %s (need %d)", + only_if, + required_count, + ) continue - + # 创建子任务 sub_task = self._create_sub_task(part, source, ext_data, template_info) sub_tasks.append(sub_task) - + # 创建主任务 output_file = f"out_{task_info.get('id', 'unknown')}.mp4" main_task = RenderTask( @@ -175,28 +197,29 @@ class DefaultTaskService(TaskService): resolution=template_info.get("video_size", ""), frame_rate=template_info.get("frame_rate", 25), center_cut=template_info.get("crop_mode"), - zoom_cut=template_info.get("zoom_cut") + zoom_cut=template_info.get("zoom_cut"), ) - + # 应用整体模板设置 overall_template = template_info.get("overall_template", {}) self._apply_template_settings(main_task, overall_template, template_info) - + # 设置扩展数据 main_task.ext_data = task_info - + span.set_attribute("render_task.sub_tasks", len(sub_tasks)) span.set_attribute("render_task.effects", len(main_task.effects)) - + return main_task - + def _download_resources(self, task_params: Dict[str, Any]): """并行下载资源""" from config.settings import get_ffmpeg_config + config = get_ffmpeg_config() - + download_futures = [] - + with ThreadPoolExecutor(max_workers=config.max_download_workers) as executor: for param_list in task_params.values(): if isinstance(param_list, list): @@ -204,9 +227,11 @@ class DefaultTaskService(TaskService): url = param.get("url", "") if url.startswith("http"): _, filename = os.path.split(url) - future = executor.submit(oss.download_from_oss, url, filename, True) + future = executor.submit( + oss.download_from_oss, url, filename, True + ) download_futures.append((future, url, filename)) - + # 等待所有下载完成,并记录失败的下载 failed_downloads = [] for future, url, filename in download_futures: @@ -217,18 +242,21 @@ class DefaultTaskService(TaskService): except Exception as e: logger.warning(f"Failed to download {url}: {e}") failed_downloads.append((url, filename)) - + if failed_downloads: - logger.warning(f"Failed to download {len(failed_downloads)} resources: {[f[1] for f in failed_downloads]}") - - def _parse_video_source(self, source: str, task_params: Dict[str, Any], - template_info: Dict[str, Any]) -> tuple[Optional[str], Dict[str, Any]]: + logger.warning( + f"Failed to download {len(failed_downloads)} resources: {[f[1] for f in failed_downloads]}" + ) + + def _parse_video_source( + self, source: str, task_params: Dict[str, Any], template_info: Dict[str, Any] + ) -> tuple[Optional[str], Dict[str, Any]]: """解析视频源""" - if source.startswith('PLACEHOLDER_'): - placeholder_id = source.replace('PLACEHOLDER_', '') + if source.startswith("PLACEHOLDER_"): + placeholder_id = source.replace("PLACEHOLDER_", "") new_sources = task_params.get(placeholder_id, []) pick_source = {} - + if isinstance(new_sources, list): if len(new_sources) == 0: logger.debug("No video found for placeholder: %s", placeholder_id) @@ -236,17 +264,18 @@ class DefaultTaskService(TaskService): else: pick_source = new_sources.pop(0) new_sources = pick_source.get("url", "") - + if new_sources.startswith("http"): _, source_name = os.path.split(new_sources) oss.download_from_oss(new_sources, source_name, True) return source_name, pick_source return new_sources, pick_source - + return os.path.join(template_info.get("local_path", ""), source), {} - - def _check_placeholder_exist_with_count(self, placeholder_id: str, task_params: Dict[str, Any], - required_count: int = 1) -> bool: + + def _check_placeholder_exist_with_count( + self, placeholder_id: str, task_params: Dict[str, Any], required_count: int = 1 + ) -> bool: """检查占位符是否存在足够数量的片段""" if placeholder_id in task_params: new_sources = task_params.get(placeholder_id, []) @@ -254,9 +283,14 @@ class DefaultTaskService(TaskService): return len(new_sources) >= required_count return required_count <= 1 return False - - def _create_sub_task(self, part: Dict[str, Any], source: str, ext_data: Dict[str, Any], - template_info: Dict[str, Any]) -> RenderTask: + + def _create_sub_task( + self, + part: Dict[str, Any], + source: str, + ext_data: Dict[str, Any], + template_info: Dict[str, Any], + ) -> RenderTask: """创建子任务""" sub_task = RenderTask( input_files=[source], @@ -265,36 +299,40 @@ class DefaultTaskService(TaskService): annexb=True, center_cut=part.get("crop_mode"), zoom_cut=part.get("zoom_cut"), - ext_data=ext_data + ext_data=ext_data, ) - + # 应用部分模板设置 self._apply_template_settings(sub_task, part, template_info) - + return sub_task - - def _apply_template_settings(self, task: RenderTask, template_part: Dict[str, Any], - template_info: Dict[str, Any]): + + def _apply_template_settings( + self, + task: RenderTask, + template_part: Dict[str, Any], + template_info: Dict[str, Any], + ): """应用模板设置到任务""" # 添加效果 - for effect in template_part.get('effects', []): + for effect in template_part.get("effects", []): task.add_effect(effect) - + # 添加LUT - for lut in template_part.get('luts', []): + for lut in template_part.get("luts", []): full_path = os.path.join(template_info.get("local_path", ""), lut) task.add_lut(full_path.replace("\\", "/")) - + # 添加音频 - for audio in template_part.get('audios', []): + for audio in template_part.get("audios", []): full_path = os.path.join(template_info.get("local_path", ""), audio) task.add_audios(full_path) - + # 添加覆盖层 - for overlay in template_part.get('overlays', []): + for overlay in template_part.get("overlays", []): full_path = os.path.join(template_info.get("local_path", ""), overlay) task.add_overlay(full_path) - + def _cleanup_temp_files(self, task: RenderTask): """清理临时文件""" try: @@ -306,4 +344,4 @@ class DefaultTaskService(TaskService): else: logger.info("Skipped cleanup of template file: %s", task.output_file) except OSError as e: - logger.warning("Failed to cleanup temp file %s: %s", task.output_file, e) \ No newline at end of file + logger.warning("Failed to cleanup temp file %s: %s", task.output_file, e) diff --git a/services/template_service.py b/services/template_service.py index 0092546..f824fe6 100644 --- a/services/template_service.py +++ b/services/template_service.py @@ -6,67 +6,73 @@ from typing import Dict, Any, Optional from opentelemetry.trace import Status, StatusCode -from util.exceptions import TemplateError, TemplateNotFoundError, TemplateValidationError +from util.exceptions import ( + TemplateError, + TemplateNotFoundError, + TemplateValidationError, +) from util import api, oss from config.settings import get_storage_config from telemetry import get_tracer logger = logging.getLogger(__name__) + class TemplateService(ABC): """模板服务抽象接口""" - + @abstractmethod def get_template(self, template_id: str) -> Optional[Dict[str, Any]]: """ 获取模板信息 - + Args: template_id: 模板ID - + Returns: Dict[str, Any]: 模板信息,如果不存在则返回None """ pass - + @abstractmethod def load_local_templates(self): """加载本地模板""" pass - + @abstractmethod def download_template(self, template_id: str) -> bool: """ 下载模板 - + Args: template_id: 模板ID - + Returns: bool: 下载是否成功 """ pass - + @abstractmethod def validate_template(self, template_info: Dict[str, Any]) -> bool: """ 验证模板 - + Args: template_info: 模板信息 - + Returns: bool: 验证是否通过 """ pass + class DefaultTemplateService(TemplateService): """默认模板服务实现""" - + def __init__(self): self.templates: Dict[str, Dict[str, Any]] = {} self.storage_config = get_storage_config() - + def get_template(self, template_id: str) -> Optional[Dict[str, Any]]: """获取模板信息""" if template_id not in self.templates: @@ -74,193 +80,208 @@ class DefaultTemplateService(TemplateService): if not self.download_template(template_id): return None return self.templates.get(template_id) - + def load_local_templates(self): """加载本地模板""" template_dir = self.storage_config.template_dir if not os.path.exists(template_dir): logger.warning("Template directory does not exist: %s", template_dir) return - + for template_name in os.listdir(template_dir): if template_name.startswith("_") or template_name.startswith("."): continue - + target_path = os.path.join(template_dir, template_name) if os.path.isdir(target_path): try: self._load_template(template_name, target_path) except Exception as e: logger.error("Failed to load template %s: %s", template_name, e) - + def download_template(self, template_id: str) -> bool: """下载模板""" tracer = get_tracer(__name__) with tracer.start_as_current_span("download_template") as span: try: span.set_attribute("template.id", template_id) - + # 获取远程模板信息 template_info = api.get_template_info(template_id) if template_info is None: logger.warning("Failed to get template info: %s", template_id) return False - - local_path = template_info.get('local_path') + + local_path = template_info.get("local_path") if not local_path: - local_path = os.path.join(self.storage_config.template_dir, str(template_id)) - template_info['local_path'] = local_path - + local_path = os.path.join( + self.storage_config.template_dir, str(template_id) + ) + template_info["local_path"] = local_path + # 创建本地目录 if not os.path.isdir(local_path): os.makedirs(local_path) - + # 下载模板资源 - overall_template = template_info.get('overall_template', {}) - video_parts = template_info.get('video_parts', []) - + overall_template = template_info.get("overall_template", {}) + video_parts = template_info.get("video_parts", []) + self._download_template_assets(overall_template, template_info) for video_part in video_parts: self._download_template_assets(video_part, template_info) - + # 保存模板定义文件 - template_file = os.path.join(local_path, 'template.json') - with open(template_file, 'w', encoding='utf-8') as f: + template_file = os.path.join(local_path, "template.json") + with open(template_file, "w", encoding="utf-8") as f: json.dump(template_info, f, ensure_ascii=False, indent=2) - + # 加载到内存 self._load_template(template_id, local_path) - + span.set_status(Status(StatusCode.OK)) logger.info("Template downloaded successfully: %s", template_id) return True - + except Exception as e: span.set_status(Status(StatusCode.ERROR)) logger.error("Failed to download template %s: %s", template_id, e) return False - + def validate_template(self, template_info: Dict[str, Any]) -> bool: """验证模板""" try: local_path = template_info.get("local_path") if not local_path: raise TemplateValidationError("Template missing local_path") - + # 验证视频部分 for video_part in template_info.get("video_parts", []): self._validate_template_part(video_part, local_path) - + # 验证整体模板 overall_template = template_info.get("overall_template", {}) if overall_template: self._validate_template_part(overall_template, local_path) - + return True - + except TemplateValidationError: raise except Exception as e: raise TemplateValidationError(f"Template validation failed: {e}") - + def _load_template(self, template_name: str, local_path: str): """加载单个模板""" logger.info("Loading template: %s (%s)", template_name, local_path) - + template_def_file = os.path.join(local_path, "template.json") if not os.path.exists(template_def_file): - raise TemplateNotFoundError(f"Template definition file not found: {template_def_file}") - + raise TemplateNotFoundError( + f"Template definition file not found: {template_def_file}" + ) + try: - with open(template_def_file, 'r', encoding='utf-8') as f: + with open(template_def_file, "r", encoding="utf-8") as f: template_info = json.load(f) except json.JSONDecodeError as e: raise TemplateError(f"Invalid template JSON: {e}") - + template_info["local_path"] = local_path - + try: self.validate_template(template_info) self.templates[template_name] = template_info logger.info("Template loaded successfully: %s", template_name) except TemplateValidationError as e: - logger.error("Template validation failed for %s: %s. Attempting to re-download.", template_name, e) + logger.error( + "Template validation failed for %s: %s. Attempting to re-download.", + template_name, + e, + ) # 模板验证失败,尝试重新下载 if self.download_template(template_name): logger.info("Template re-downloaded successfully: %s", template_name) else: logger.error("Failed to re-download template: %s", template_name) raise - - def _download_template_assets(self, template_part: Dict[str, Any], template_info: Dict[str, Any]): + + def _download_template_assets( + self, template_part: Dict[str, Any], template_info: Dict[str, Any] + ): """下载模板资源""" - local_path = template_info['local_path'] - + local_path = template_info["local_path"] + # 下载源文件 - if 'source' in template_part: - source = template_part['source'] + if "source" in template_part: + source = template_part["source"] if isinstance(source, str) and source.startswith("http"): _, filename = os.path.split(source) new_file_path = os.path.join(local_path, filename) oss.download_from_oss(source, new_file_path) - + if filename.endswith(".mp4"): from util.ffmpeg import re_encode_and_annexb + new_file_path = re_encode_and_annexb(new_file_path) - - template_part['source'] = os.path.relpath(new_file_path, local_path) - + + template_part["source"] = os.path.relpath(new_file_path, local_path) + # 下载覆盖层 - if 'overlays' in template_part: - for i, overlay in enumerate(template_part['overlays']): + if "overlays" in template_part: + for i, overlay in enumerate(template_part["overlays"]): if isinstance(overlay, str) and overlay.startswith("http"): _, filename = os.path.split(overlay) oss.download_from_oss(overlay, os.path.join(local_path, filename)) - template_part['overlays'][i] = filename - + template_part["overlays"][i] = filename + # 下载LUT - if 'luts' in template_part: - for i, lut in enumerate(template_part['luts']): + if "luts" in template_part: + for i, lut in enumerate(template_part["luts"]): if isinstance(lut, str) and lut.startswith("http"): _, filename = os.path.split(lut) oss.download_from_oss(lut, os.path.join(local_path, filename)) - template_part['luts'][i] = filename - + template_part["luts"][i] = filename + # 下载音频 - if 'audios' in template_part: - for i, audio in enumerate(template_part['audios']): + if "audios" in template_part: + for i, audio in enumerate(template_part["audios"]): if isinstance(audio, str) and audio.startswith("http"): _, filename = os.path.split(audio) oss.download_from_oss(audio, os.path.join(local_path, filename)) - template_part['audios'][i] = filename - + template_part["audios"][i] = filename + def _validate_template_part(self, template_part: Dict[str, Any], base_dir: str): """验证模板部分""" # 验证源文件 source_file = template_part.get("source", "") - if source_file and not source_file.startswith("http") and not source_file.startswith("PLACEHOLDER_"): + if ( + source_file + and not source_file.startswith("http") + and not source_file.startswith("PLACEHOLDER_") + ): if not os.path.isabs(source_file): source_file = os.path.join(base_dir, source_file) if not os.path.exists(source_file): raise TemplateValidationError(f"Source file not found: {source_file}") - + # 验证音频文件 for audio in template_part.get("audios", []): if not os.path.isabs(audio): audio = os.path.join(base_dir, audio) if not os.path.exists(audio): raise TemplateValidationError(f"Audio file not found: {audio}") - + # 验证LUT文件 for lut in template_part.get("luts", []): if not os.path.isabs(lut): lut = os.path.join(base_dir, lut) if not os.path.exists(lut): raise TemplateValidationError(f"LUT file not found: {lut}") - + # 验证覆盖层文件 for overlay in template_part.get("overlays", []): if not os.path.isabs(overlay): overlay = os.path.join(base_dir, overlay) if not os.path.exists(overlay): - raise TemplateValidationError(f"Overlay file not found: {overlay}") \ No newline at end of file + raise TemplateValidationError(f"Overlay file not found: {overlay}") diff --git a/telemetry/__init__.py b/telemetry/__init__.py index d532378..6dd0883 100644 --- a/telemetry/__init__.py +++ b/telemetry/__init__.py @@ -2,36 +2,54 @@ import os from constant import SOFTWARE_VERSION from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter -from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION +from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter as OTLPSpanHttpExporter, +) +from opentelemetry.sdk.resources import ( + DEPLOYMENT_ENVIRONMENT, + HOST_NAME, + Resource, + SERVICE_NAME, + SERVICE_VERSION, +) from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor from opentelemetry.instrumentation.threading import ThreadingInstrumentor ThreadingInstrumentor().instrument() + def get_tracer(name): return trace.get_tracer(name) + # 初始化 OpenTelemetry def init_opentelemetry(batch=True): # 设置服务名、主机名 - resource = Resource(attributes={ - SERVICE_NAME: "RENDER_WORKER", - SERVICE_VERSION: SOFTWARE_VERSION, - DEPLOYMENT_ENVIRONMENT: "Python", - HOST_NAME: os.getenv("ACCESS_KEY"), - }) + resource = Resource( + attributes={ + SERVICE_NAME: "RENDER_WORKER", + SERVICE_VERSION: SOFTWARE_VERSION, + DEPLOYMENT_ENVIRONMENT: "Python", + HOST_NAME: os.getenv("ACCESS_KEY"), + } + ) # 使用HTTP协议上报 if batch: - span_processor = BatchSpanProcessor(OTLPSpanHttpExporter( - endpoint="https://oltp.jerryyan.top/v1/traces", - )) + span_processor = BatchSpanProcessor( + OTLPSpanHttpExporter( + endpoint="https://oltp.jerryyan.top/v1/traces", + ) + ) else: - span_processor = SimpleSpanProcessor(OTLPSpanHttpExporter( - endpoint="https://oltp.jerryyan.top/v1/traces", - )) + span_processor = SimpleSpanProcessor( + OTLPSpanHttpExporter( + endpoint="https://oltp.jerryyan.top/v1/traces", + ) + ) - trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor) + trace_provider = TracerProvider( + resource=resource, active_span_processor=span_processor + ) trace.set_tracer_provider(trace_provider) diff --git a/util/api.py b/util/api.py index b94e3a7..70ce93f 100644 --- a/util/api.py +++ b/util/api.py @@ -21,15 +21,11 @@ retry_strategy = Retry( total=3, status_forcelist=[429, 500, 502, 503, 504], backoff_factor=1, - respect_retry_after_header=True + respect_retry_after_header=True, ) # 配置HTTP适配器(连接池) -adapter = HTTPAdapter( - pool_connections=10, - pool_maxsize=20, - max_retries=retry_strategy -) +adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20, max_retries=retry_strategy) session.mount("http://", adapter) session.mount("https://", adapter) @@ -51,23 +47,30 @@ def sync_center(): :return: 任务列表 """ from services import DefaultTemplateService + template_service = DefaultTemplateService() try: - response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ - 'accessKey': os.getenv('ACCESS_KEY'), - 'clientStatus': util.system.get_sys_info(), - 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in - template_service.templates.values()] - }, timeout=10) + response = session.post( + os.getenv("API_ENDPOINT") + "/sync", + json={ + "accessKey": os.getenv("ACCESS_KEY"), + "clientStatus": util.system.get_sys_info(), + "templateList": [ + {"id": t.get("id", ""), "updateTime": t.get("updateTime", "")} + for t in template_service.templates.values() + ], + }, + timeout=10, + ) response.raise_for_status() except requests.RequestException as e: logger.error("请求失败!", e) return [] data = response.json() logger.debug("获取任务结果:【%s】", data) - if data.get('code', 0) == 200: - templates = data.get('data', {}).get('templates', []) - tasks = data.get('data', {}).get('tasks', []) + if data.get("code", 0) == 200: + templates = data.get("data", {}).get("templates", []) + tasks = data.get("data", {}).get("tasks", []) else: tasks = [] templates = [] @@ -75,12 +78,16 @@ def sync_center(): if os.getenv("REDIRECT_TO_URL", False) != False: for task in tasks: _sess = requests.Session() - logger.info("重定向任务【%s】至配置的地址:%s", task.get("id"), os.getenv("REDIRECT_TO_URL")) + logger.info( + "重定向任务【%s】至配置的地址:%s", + task.get("id"), + os.getenv("REDIRECT_TO_URL"), + ) url = f"{os.getenv('REDIRECT_TO_URL')}{task.get('id')}" threading.Thread(target=requests.post, args=(url,)).start() return [] for template in templates: - template_id = template.get('id', '') + template_id = template.get("id", "") if template_id: logger.info("更新模板:【%s】", template_id) template_service.download_template(template_id) @@ -100,10 +107,17 @@ def get_template_info(template_id): with tracer.start_as_current_span("get_template_info.request") as req_span: try: req_span.set_attribute("http.method", "POST") - req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id)) - response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - }, timeout=10) + req_span.set_attribute( + "http.url", + "{0}/template/{1}".format(os.getenv("API_ENDPOINT"), template_id), + ) + response = session.post( + "{0}/template/{1}".format(os.getenv("API_ENDPOINT"), template_id), + json={ + "accessKey": os.getenv("ACCESS_KEY"), + }, + timeout=10, + ) req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() @@ -113,64 +127,68 @@ def get_template_info(template_id): return None data = response.json() logger.debug("获取模板信息结果:【%s】", data) - remote_template_info = data.get('data', {}) + remote_template_info = data.get("data", {}) if not remote_template_info: logger.warning("获取模板信息结果为空", data) return None template = { - 'id': template_id, - 'updateTime': remote_template_info.get('updateTime', template_id), - 'scenic_name': remote_template_info.get('scenicName', '景区'), - 'name': remote_template_info.get('name', '模版'), - 'video_size': remote_template_info.get('resolution', '1920x1080'), - 'frame_rate': 25, - 'overall_duration': 30, - 'video_parts': [ - - ] + "id": template_id, + "updateTime": remote_template_info.get("updateTime", template_id), + "scenic_name": remote_template_info.get("scenicName", "景区"), + "name": remote_template_info.get("name", "模版"), + "video_size": remote_template_info.get("resolution", "1920x1080"), + "frame_rate": 25, + "overall_duration": 30, + "video_parts": [], } def _template_normalizer(template_info): _template = {} - _placeholder_type = template_info.get('isPlaceholder', -1) + _placeholder_type = template_info.get("isPlaceholder", -1) if _placeholder_type == 0: # 固定视频 - _template['source'] = template_info.get('sourceUrl', '') + _template["source"] = template_info.get("sourceUrl", "") elif _placeholder_type == 1: # 占位符 - _template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '') - _template['mute'] = template_info.get('mute', True) - _template['crop_mode'] = template_info.get('cropEnable', None) - _template['zoom_cut'] = template_info.get('zoomCut', None) + _template["source"] = "PLACEHOLDER_" + template_info.get( + "sourceUrl", "" + ) + _template["mute"] = template_info.get("mute", True) + _template["crop_mode"] = template_info.get("cropEnable", None) + _template["zoom_cut"] = template_info.get("zoomCut", None) else: - _template['source'] = None - _overlays = template_info.get('overlays', '') + _template["source"] = None + _overlays = template_info.get("overlays", "") if _overlays: - _template['overlays'] = _overlays.split(",") - _audios = template_info.get('audios', '') + _template["overlays"] = _overlays.split(",") + _audios = template_info.get("audios", "") if _audios: - _template['audios'] = _audios.split(",") - _luts = template_info.get('luts', '') + _template["audios"] = _audios.split(",") + _luts = template_info.get("luts", "") if _luts: - _template['luts'] = _luts.split(",") - _only_if = template_info.get('onlyIf', '') + _template["luts"] = _luts.split(",") + _only_if = template_info.get("onlyIf", "") if _only_if: - _template['only_if'] = _only_if - _effects = template_info.get('effects', '') + _template["only_if"] = _only_if + _effects = template_info.get("effects", "") if _effects: - _template['effects'] = _effects.split("|") + _template["effects"] = _effects.split("|") return _template # outer template definition overall_template = _template_normalizer(remote_template_info) - template['overall_template'] = overall_template + template["overall_template"] = overall_template # inter template definition - inter_template_list = remote_template_info.get('children', []) + inter_template_list = remote_template_info.get("children", []) for children_template in inter_template_list: parts = _template_normalizer(children_template) - template['video_parts'].append(parts) - template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id)) - with get_tracer("api").start_as_current_span("get_template_info.template") as res_span: + template["video_parts"].append(parts) + template["local_path"] = os.path.join( + os.getenv("TEMPLATE_DIR"), str(template_id) + ) + with get_tracer("api").start_as_current_span( + "get_template_info.template" + ) as res_span: res_span.set_attribute("normalized.response", json.dumps(template)) return template @@ -181,12 +199,19 @@ def report_task_success(task_info, **kwargs): with tracer.start_as_current_span("report_task_success.request") as req_span: try: req_span.set_attribute("http.method", "POST") - req_span.set_attribute("http.url", - '{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) - response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - **kwargs - }, timeout=10) + req_span.set_attribute( + "http.url", + "{0}/{1}/success".format( + os.getenv("API_ENDPOINT"), task_info.get("id") + ), + ) + response = session.post( + "{0}/{1}/success".format( + os.getenv("API_ENDPOINT"), task_info.get("id") + ), + json={"accessKey": os.getenv("ACCESS_KEY"), **kwargs}, + timeout=10, + ) req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() @@ -203,11 +228,21 @@ def report_task_start(task_info): with tracer.start_as_current_span("report_task_start.request") as req_span: try: req_span.set_attribute("http.method", "POST") - req_span.set_attribute("http.url", - '{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) - response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - }, timeout=10) + req_span.set_attribute( + "http.url", + "{0}/{1}/start".format( + os.getenv("API_ENDPOINT"), task_info.get("id") + ), + ) + response = session.post( + "{0}/{1}/start".format( + os.getenv("API_ENDPOINT"), task_info.get("id") + ), + json={ + "accessKey": os.getenv("ACCESS_KEY"), + }, + timeout=10, + ) req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() @@ -218,7 +253,7 @@ def report_task_start(task_info): return None -def report_task_failed(task_info, reason=''): +def report_task_failed(task_info, reason=""): tracer = get_tracer(__name__) with tracer.start_as_current_span("report_task_failed") as span: span.set_attribute("task_id", task_info.get("id")) @@ -226,12 +261,19 @@ def report_task_failed(task_info, reason=''): with tracer.start_as_current_span("report_task_failed.request") as req_span: try: req_span.set_attribute("http.method", "POST") - req_span.set_attribute("http.url", - '{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) - response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - 'reason': reason - }, timeout=10) + req_span.set_attribute( + "http.url", + "{0}/{1}/fail".format( + os.getenv("API_ENDPOINT"), task_info.get("id") + ), + ) + response = session.post( + "{0}/{1}/fail".format( + os.getenv("API_ENDPOINT"), task_info.get("id") + ), + json={"accessKey": os.getenv("ACCESS_KEY"), "reason": reason}, + timeout=10, + ) req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() @@ -248,15 +290,26 @@ def upload_task_file(task_info, ffmpeg_task): with get_tracer("api").start_as_current_span("upload_task_file") as span: logger.info("开始上传文件: %s", task_info.get("id")) span.set_attribute("file.id", task_info.get("id")) - with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span: + with tracer.start_as_current_span( + "upload_task_file.request_upload_url" + ) as req_span: try: req_span.set_attribute("http.method", "POST") - req_span.set_attribute("http.url", - '{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) - response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), - json={ - 'accessKey': os.getenv('ACCESS_KEY'), - }, timeout=10) + req_span.set_attribute( + "http.url", + "{0}/{1}/uploadUrl".format( + os.getenv("API_ENDPOINT"), task_info.get("id") + ), + ) + response = session.post( + "{0}/{1}/uploadUrl".format( + os.getenv("API_ENDPOINT"), task_info.get("id") + ), + json={ + "accessKey": os.getenv("ACCESS_KEY"), + }, + timeout=10, + ) req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() @@ -267,21 +320,25 @@ def upload_task_file(task_info, ffmpeg_task): logger.error("请求失败!", e) return False data = response.json() - url = data.get('data', "") + url = data.get("data", "") logger.info("开始上传文件: %s 至 %s", task_info.get("id"), url) return oss.upload_to_oss(url, ffmpeg_task.get_output_file()) def get_task_info(id): try: - response = session.get(os.getenv('API_ENDPOINT') + "/" + id + "/info", params={ - 'accessKey': os.getenv('ACCESS_KEY'), - }, timeout=10) + response = session.get( + os.getenv("API_ENDPOINT") + "/" + id + "/info", + params={ + "accessKey": os.getenv("ACCESS_KEY"), + }, + timeout=10, + ) response.raise_for_status() except requests.RequestException as e: logger.error("请求失败!", e) return [] data = response.json() logger.debug("获取任务结果:【%s】", data) - if data.get('code', 0) == 200: - return data.get('data', {}) \ No newline at end of file + if data.get("code", 0) == 200: + return data.get("data", {}) diff --git a/util/exceptions.py b/util/exceptions.py index 9b05185..64a9156 100644 --- a/util/exceptions.py +++ b/util/exceptions.py @@ -1,72 +1,111 @@ class RenderWorkerError(Exception): """RenderWorker基础异常类""" + def __init__(self, message: str, error_code: str = None): super().__init__(message) self.message = message self.error_code = error_code or self.__class__.__name__ + class ConfigurationError(RenderWorkerError): """配置错误""" + pass + class TemplateError(RenderWorkerError): """模板相关错误""" + pass + class TemplateNotFoundError(TemplateError): """模板未找到错误""" + pass + class TemplateValidationError(TemplateError): """模板验证错误""" + pass + class TaskError(RenderWorkerError): """任务处理错误""" + pass + class TaskValidationError(TaskError): """任务参数验证错误""" + pass + class RenderError(RenderWorkerError): """渲染处理错误""" + pass + class FFmpegError(RenderError): """FFmpeg执行错误""" - def __init__(self, message: str, command: list = None, return_code: int = None, stderr: str = None): + + def __init__( + self, + message: str, + command: list = None, + return_code: int = None, + stderr: str = None, + ): super().__init__(message) self.command = command self.return_code = return_code self.stderr = stderr + class EffectError(RenderError): """效果处理错误""" - def __init__(self, message: str, effect_name: str = None, effect_params: str = None): + + def __init__( + self, message: str, effect_name: str = None, effect_params: str = None + ): super().__init__(message) self.effect_name = effect_name self.effect_params = effect_params + class StorageError(RenderWorkerError): """存储相关错误""" + pass + class APIError(RenderWorkerError): """API调用错误""" - def __init__(self, message: str, status_code: int = None, response_body: str = None): + + def __init__( + self, message: str, status_code: int = None, response_body: str = None + ): super().__init__(message) self.status_code = status_code self.response_body = response_body + class ResourceError(RenderWorkerError): """资源相关错误""" + pass + class ResourceNotFoundError(ResourceError): """资源未找到错误""" + pass + class DownloadError(ResourceError): """下载错误""" - pass \ No newline at end of file + + pass diff --git a/util/ffmpeg.py b/util/ffmpeg.py index 0cee5aa..dbe83d4 100644 --- a/util/ffmpeg.py +++ b/util/ffmpeg.py @@ -7,11 +7,19 @@ from typing import Optional, IO from opentelemetry.trace import Status, StatusCode -from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT, get_mp4toannexb_filter +from entity.ffmpeg import ( + FfmpegTask, + ENCODER_ARGS, + VIDEO_ARGS, + AUDIO_ARGS, + MUTE_AUDIO_INPUT, + get_mp4toannexb_filter, +) from telemetry import get_tracer logger = logging.getLogger(__name__) + def re_encode_and_annexb(file): with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span: span.set_attribute("file.path", file) @@ -30,40 +38,69 @@ def re_encode_and_annexb(file): _encoder_args = tuple(os.getenv("RE_ENCODE_ENCODER_ARGS", "").split(" ")) else: _encoder_args = ENCODER_ARGS - ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, - *(set() if has_audio else MUTE_AUDIO_INPUT), - "-fps_mode", "cfr", - "-map", "0:v", "-map", "0:a" if has_audio else "1:a", - *_video_args, "-bsf:v", get_mp4toannexb_filter(), - *AUDIO_ARGS, "-bsf:a", "setts=pts=DTS", - *_encoder_args, "-shortest", "-fflags", "+genpts", - "-f", "mpegts", file + ".ts"]) + ffmpeg_process = subprocess.run( + [ + "ffmpeg", + "-y", + "-hide_banner", + "-i", + file, + *(set() if has_audio else MUTE_AUDIO_INPUT), + "-fps_mode", + "cfr", + "-map", + "0:v", + "-map", + "0:a" if has_audio else "1:a", + *_video_args, + "-bsf:v", + get_mp4toannexb_filter(), + *AUDIO_ARGS, + "-bsf:a", + "setts=pts=DTS", + *_encoder_args, + "-shortest", + "-fflags", + "+genpts", + "-f", + "mpegts", + file + ".ts", + ] + ) logger.info(" ".join(ffmpeg_process.args)) span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args)) - logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) + logger.info( + "ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode + ) span.set_attribute("ffmpeg.code", ffmpeg_process.returncode) if ffmpeg_process.returncode == 0: span.set_status(Status(StatusCode.OK)) - span.set_attribute("file.size", os.path.getsize(file+".ts")) + span.set_attribute("file.size", os.path.getsize(file + ".ts")) # os.remove(file) - return file+".ts" + return file + ".ts" else: span.set_status(Status(StatusCode.ERROR)) return file + # start_render函数已迁移到services/render_service.py中的DefaultRenderService # 保留原有签名用于向后兼容,但建议使用新的服务架构 + def start_render(ffmpeg_task): """ - 已迁移到新架构,建议使用 DefaultRenderService.render() + 已迁移到新架构,建议使用 DefaultRenderService.render() 保留用于向后兼容 """ - logger.warning("start_render is deprecated, use DefaultRenderService.render() instead") + logger.warning( + "start_render is deprecated, use DefaultRenderService.render() instead" + ) from services import DefaultRenderService + render_service = DefaultRenderService() return render_service.render(ffmpeg_task) + def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: out_time = "0:0:0.0" if stdout is None: @@ -81,7 +118,8 @@ def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: if line.startswith(b"speed="): speed = line.replace(b"speed=", b"").decode().strip() print("[ ]Speed:", out_time, "@", speed) - return out_time+"@"+speed + return out_time + "@" + speed + def duration_str_to_float(duration_str: str) -> float: _duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1) @@ -94,8 +132,18 @@ def probe_video_info(video_file): span.set_attribute("video.file", video_file) # 获取宽度和高度 result = subprocess.run( - ["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of', - 'csv=s=x:p=0', video_file], + [ + "ffprobe", + "-v", + "error", + "-select_streams", + "v:0", + "-show_entries", + "stream=width,height:format=duration", + "-of", + "csv=s=x:p=0", + video_file, + ], stderr=subprocess.STDOUT, **subprocess_args(True) ) @@ -104,14 +152,14 @@ def probe_video_info(video_file): if result.returncode != 0: span.set_status(Status(StatusCode.ERROR)) return 0, 0, 0 - all_result = result.stdout.decode('utf-8').strip() + all_result = result.stdout.decode("utf-8").strip() span.set_attribute("ffprobe.out", all_result) - if all_result == '': + if all_result == "": span.set_status(Status(StatusCode.ERROR)) return 0, 0, 0 span.set_status(Status(StatusCode.OK)) - wh, duration = all_result.split('\n') - width, height = wh.strip().split('x') + wh, duration = all_result.split("\n") + width, height = wh.strip().split("x") return int(width), int(height), float(duration) @@ -119,8 +167,19 @@ def probe_video_audio(video_file, type=None): tracer = get_tracer(__name__) with tracer.start_as_current_span("probe_video_audio") as span: span.set_attribute("video.file", video_file) - args = ["ffprobe", "-hide_banner", "-v", "error", "-select_streams", "a", "-show_entries", "stream=index", "-of", "csv=p=0"] - if type == 'concat': + args = [ + "ffprobe", + "-hide_banner", + "-v", + "error", + "-select_streams", + "a", + "-show_entries", + "stream=index", + "-of", + "csv=p=0", + ] + if type == "concat": args.append("-safe") args.append("0") args.append("-f") @@ -130,16 +189,16 @@ def probe_video_audio(video_file, type=None): result = subprocess.run(args, stderr=subprocess.STDOUT, **subprocess_args(True)) span.set_attribute("ffprobe.args", json.dumps(result.args)) span.set_attribute("ffprobe.code", result.returncode) - logger.info("probe_video_audio: %s", result.stdout.decode('utf-8').strip()) + logger.info("probe_video_audio: %s", result.stdout.decode("utf-8").strip()) if result.returncode != 0: return False - if result.stdout.decode('utf-8').strip() == '': + if result.stdout.decode("utf-8").strip() == "": return False return True # 音频淡出2秒 -def fade_out_audio(file, duration, fade_out_sec = 2): +def fade_out_audio(file, duration, fade_out_sec=2): if type(duration) == str: try: duration = float(duration) @@ -157,7 +216,25 @@ def fade_out_audio(file, duration, fade_out_sec = 2): os.remove(new_fn) logger.info("delete tmp file: " + new_fn) try: - process = subprocess.run(["ffmpeg", "-i", file, "-c:v", "copy", "-c:a", "aac", "-af", "afade=t=out:st=" + str(duration - fade_out_sec) + ":d=" + str(fade_out_sec), "-y", new_fn], **subprocess_args(True)) + process = subprocess.run( + [ + "ffmpeg", + "-i", + file, + "-c:v", + "copy", + "-c:a", + "aac", + "-af", + "afade=t=out:st=" + + str(duration - fade_out_sec) + + ":d=" + + str(fade_out_sec), + "-y", + new_fn, + ], + **subprocess_args(True) + ) span.set_attribute("ffmpeg.args", json.dumps(process.args)) logger.info(" ".join(process.args)) if process.returncode != 0: @@ -173,7 +250,6 @@ def fade_out_audio(file, duration, fade_out_sec = 2): return file - # Create a set of arguments which make a ``subprocess.Popen`` (and # variants) call work with or without Pyinstaller, ``--noconsole`` or # not, on Windows and Linux. Typical use:: @@ -186,7 +262,7 @@ def fade_out_audio(file, duration, fade_out_sec = 2): # **subprocess_args(False)) def subprocess_args(include_stdout=True): # The following is true only on Windows. - if hasattr(subprocess, 'STARTUPINFO'): + if hasattr(subprocess, "STARTUPINFO"): # On Windows, subprocess calls will pop up a command window by default # when run from Pyinstaller with the ``--noconsole`` option. Avoid this # distraction. @@ -210,7 +286,7 @@ def subprocess_args(include_stdout=True): # # So, add it only if it's needed. if include_stdout: - ret = {'stdout': subprocess.PIPE} + ret = {"stdout": subprocess.PIPE} else: ret = {} @@ -218,8 +294,5 @@ def subprocess_args(include_stdout=True): # with the ``--noconsole`` option requires redirecting everything # (stdin, stdout, stderr) to avoid an OSError exception # "[Error 6] the handle is invalid." - ret.update({'stdin': subprocess.PIPE, - 'startupinfo': si, - 'env': env}) + ret.update({"stdin": subprocess.PIPE, "startupinfo": si, "env": env}) return ret - diff --git a/util/ffmpeg_utils.py b/util/ffmpeg_utils.py index c1256f5..74a2f8b 100644 --- a/util/ffmpeg_utils.py +++ b/util/ffmpeg_utils.py @@ -1,16 +1,18 @@ """ FFmpeg工具模块 - 提供FFmpeg命令构建和处理的公共函数 """ + import logging from typing import List, Tuple, Optional from config.settings import get_ffmpeg_config logger = logging.getLogger(__name__) + def build_base_ffmpeg_args() -> List[str]: """ 构建基础FFmpeg参数 - + Returns: 基础参数列表 """ @@ -20,40 +22,45 @@ def build_base_ffmpeg_args() -> List[str]: args.extend(config.loglevel_args) return args + def build_null_audio_input() -> List[str]: """ 构建空音频输入参数 - + Returns: 空音频输入参数列表 """ config = get_ffmpeg_config() return config.null_audio_args + def build_amix_filter(input1: str, input2: str, output: str) -> str: """ 构建音频混合滤镜 - + Args: input1: 第一个音频输入 - input2: 第二个音频输入 + input2: 第二个音频输入 output: 输出流名称 - + Returns: 混合滤镜字符串 """ config = get_ffmpeg_config() return f"{input1}[{input2}]{config.amix_args[0]}[{output}]" -def build_overlay_scale_filter(video_input: str, overlay_input: str, output: str) -> str: + +def build_overlay_scale_filter( + video_input: str, overlay_input: str, output: str +) -> str: """ 构建覆盖层缩放滤镜 - + Args: video_input: 视频输入流 overlay_input: 覆盖层输入流 output: 输出流名称 - + Returns: 缩放滤镜字符串 """ @@ -61,12 +68,15 @@ def build_overlay_scale_filter(video_input: str, overlay_input: str, output: str if config.overlay_scale_mode == "scale": return f"{video_input}[{overlay_input}]scale=iw:ih[{output}]" else: - return f"{video_input}[{overlay_input}]{config.overlay_scale_mode}=iw:ih[{output}]" + return ( + f"{video_input}[{overlay_input}]{config.overlay_scale_mode}=iw:ih[{output}]" + ) + def get_annexb_filter() -> str: """ 获取annexb转换滤镜 - + Returns: annexb滤镜名称 """ @@ -76,10 +86,11 @@ def get_annexb_filter() -> str: return "hevc_mp4toannexb" return "h264_mp4toannexb" + def build_standard_output_args() -> List[str]: """ 构建标准输出参数 - + Returns: 输出参数列表 """ @@ -88,40 +99,63 @@ def build_standard_output_args() -> List[str]: *config.video_args, *config.audio_args, *config.encoder_args, - *config.default_args + *config.default_args, ] + def validate_ffmpeg_file_extensions(file_path: str) -> bool: """ 验证文件扩展名是否为FFmpeg支持的格式 - + Args: file_path: 文件路径 - + Returns: 是否为支持的格式 """ supported_extensions = { - '.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm', - '.ts', '.m2ts', '.mts', '.m4v', '.3gp', '.asf', '.rm', - '.mp3', '.wav', '.aac', '.flac', '.ogg', '.m4a', '.wma' + ".mp4", + ".avi", + ".mov", + ".mkv", + ".flv", + ".wmv", + ".webm", + ".ts", + ".m2ts", + ".mts", + ".m4v", + ".3gp", + ".asf", + ".rm", + ".mp3", + ".wav", + ".aac", + ".flac", + ".ogg", + ".m4a", + ".wma", } - + import os + _, ext = os.path.splitext(file_path.lower()) return ext in supported_extensions -def estimate_processing_time(input_duration: float, complexity_factor: float = 1.0) -> float: + +def estimate_processing_time( + input_duration: float, complexity_factor: float = 1.0 +) -> float: """ 估算处理时间 - + Args: input_duration: 输入文件时长(秒) complexity_factor: 复杂度因子(1.0为普通处理) - + Returns: 预估处理时间(秒) """ # 基础处理速度假设为实时的0.5倍(即处理1秒视频需要2秒) base_processing_ratio = 2.0 - return input_duration * base_processing_ratio * complexity_factor \ No newline at end of file + return input_duration * base_processing_ratio * complexity_factor diff --git a/util/json_utils.py b/util/json_utils.py index e9f79e6..603e531 100644 --- a/util/json_utils.py +++ b/util/json_utils.py @@ -1,41 +1,46 @@ """ JSON处理工具模块 - 提供安全的JSON解析和处理功能 """ + import json import logging from typing import Dict, Any, Optional, Union logger = logging.getLogger(__name__) + def safe_json_loads(json_str: Union[str, bytes], default: Any = None) -> Any: """ 安全解析JSON字符串 - + Args: json_str: JSON字符串 default: 解析失败时返回的默认值 - + Returns: 解析后的对象,或默认值 """ - if not json_str or json_str == '{}': + if not json_str or json_str == "{}": return default or {} - + try: return json.loads(json_str) except (json.JSONDecodeError, TypeError) as e: logger.warning(f"Failed to parse JSON: {e}, input: {json_str}") return default or {} -def safe_json_dumps(obj: Any, indent: Optional[int] = None, ensure_ascii: bool = False) -> str: + +def safe_json_dumps( + obj: Any, indent: Optional[int] = None, ensure_ascii: bool = False +) -> str: """ 安全序列化对象为JSON字符串 - + Args: obj: 要序列化的对象 indent: 缩进空格数 ensure_ascii: 是否确保ASCII编码 - + Returns: JSON字符串 """ @@ -45,43 +50,45 @@ def safe_json_dumps(obj: Any, indent: Optional[int] = None, ensure_ascii: bool = logger.error(f"Failed to serialize to JSON: {e}") return "{}" + def get_nested_value(data: Dict[str, Any], key_path: str, default: Any = None) -> Any: """ 从嵌套字典中安全获取值 - + Args: data: 字典数据 key_path: 键路径,用点分隔(如 "user.profile.name") default: 默认值 - + Returns: 找到的值或默认值 """ if not isinstance(data, dict): return default - + try: - keys = key_path.split('.') + keys = key_path.split(".") current = data - + for key in keys: if isinstance(current, dict) and key in current: current = current[key] else: return default - + return current except Exception as e: logger.warning(f"Failed to get nested value for path '{key_path}': {e}") return default + def merge_dicts(*dicts: Dict[str, Any]) -> Dict[str, Any]: """ 合并多个字典,后面的字典会覆盖前面的字典中相同的键 - + Args: *dicts: 要合并的字典 - + Returns: 合并后的字典 """ @@ -89,4 +96,4 @@ def merge_dicts(*dicts: Dict[str, Any]) -> Dict[str, Any]: for d in dicts: if isinstance(d, dict): result.update(d) - return result \ No newline at end of file + return result diff --git a/util/oss.py b/util/oss.py index d3832c6..2f8c30a 100644 --- a/util/oss.py +++ b/util/oss.py @@ -31,12 +31,14 @@ def upload_to_oss(url, file_path): if replace_map != "": replace_list = [i.split("|", 1) for i in replace_map.split(",")] new_url = url - for (_src, _dst) in replace_list: + for _src, _dst in replace_list: new_url = new_url.replace(_src, _dst) new_url = new_url.split("?", 1)[0] r_span.set_attribute("rclone.target_dir", new_url) if new_url != url: - result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 8M --multi-thread-streams 8 {file_path} {new_url}") + result = os.system( + f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 8M --multi-thread-streams 8 {file_path} {new_url}" + ) r_span.set_attribute("rclone.result", result) if result == 0: span.set_status(Status(StatusCode.OK)) @@ -49,8 +51,14 @@ def upload_to_oss(url, file_path): try: req_span.set_attribute("http.method", "PUT") req_span.set_attribute("http.url", url) - with open(file_path, 'rb') as f: - response = requests.put(url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"}) + with open(file_path, "rb") as f: + response = requests.put( + url, + data=f, + stream=True, + timeout=60, + headers={"Content-Type": "video/mp4"}, + ) req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() @@ -61,12 +69,16 @@ def upload_to_oss(url, file_path): req_span.set_attribute("http.error", "Timeout") req_span.set_status(Status(StatusCode.ERROR)) retries += 1 - logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...") + logger.warning( + f"Upload timed out. Retrying {retries}/{max_retries}..." + ) except Exception as e: req_span.set_attribute("http.error", str(e)) req_span.set_status(Status(StatusCode.ERROR)) retries += 1 - logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...") + logger.warning( + f"Upload failed. Retrying {retries}/{max_retries}..." + ) span.set_status(Status(StatusCode.ERROR)) return False @@ -83,11 +95,11 @@ def download_from_oss(url, file_path, skip_if_exist=None): with tracer.start_as_current_span("download_from_oss") as span: span.set_attribute("file.url", url) span.set_attribute("file.path", file_path) - + # 如果skip_if_exist为None,则从启动参数中读取 if skip_if_exist is None: - skip_if_exist = 'skip_if_exist' in sys.argv - + skip_if_exist = "skip_if_exist" in sys.argv + if skip_if_exist and os.path.exists(file_path): span.set_attribute("file.exist", True) span.set_attribute("file.size", os.path.getsize(file_path)) @@ -107,7 +119,7 @@ def download_from_oss(url, file_path, skip_if_exist=None): req_span.set_attribute("http.url", url) response = requests.get(url, timeout=15) # 设置超时时间 req_span.set_attribute("http.status_code", response.status_code) - with open(file_path, 'wb') as f: + with open(file_path, "wb") as f: f.write(response.content) req_span.set_attribute("file.size", os.path.getsize(file_path)) req_span.set_status(Status(StatusCode.OK)) @@ -117,11 +129,15 @@ def download_from_oss(url, file_path, skip_if_exist=None): req_span.set_attribute("http.error", "Timeout") req_span.set_status(Status(StatusCode.ERROR)) retries += 1 - logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...") + logger.warning( + f"Download timed out. Retrying {retries}/{max_retries}..." + ) except Exception as e: req_span.set_attribute("http.error", str(e)) req_span.set_status(Status(StatusCode.ERROR)) retries += 1 - logger.warning(f"Download failed. Retrying {retries}/{max_retries}...") + logger.warning( + f"Download failed. Retrying {retries}/{max_retries}..." + ) span.set_status(Status(StatusCode.ERROR)) return False diff --git a/util/system.py b/util/system.py index 7596d53..476634b 100644 --- a/util/system.py +++ b/util/system.py @@ -11,14 +11,14 @@ def get_sys_info(): Returns a dictionary with system information. """ info = { - 'version': SOFTWARE_VERSION, - 'client_datetime': datetime.now().isoformat(), - 'platform': platform.system(), - 'runtime_version': 'Python ' + platform.python_version(), - 'cpu_count': os.cpu_count(), - 'cpu_usage': psutil.cpu_percent(), - 'memory_total': psutil.virtual_memory().total, - 'memory_available': psutil.virtual_memory().available, - 'support_feature': SUPPORT_FEATURE + "version": SOFTWARE_VERSION, + "client_datetime": datetime.now().isoformat(), + "platform": platform.system(), + "runtime_version": "Python " + platform.python_version(), + "cpu_count": os.cpu_count(), + "cpu_usage": psutil.cpu_percent(), + "memory_total": psutil.virtual_memory().total, + "memory_available": psutil.virtual_memory().available, + "support_feature": SUPPORT_FEATURE, } return info