From 6d37e7c23c17dc045028f8dbec7fa5de506bf5d3 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Wed, 24 Sep 2025 04:51:12 +0800 Subject: [PATCH] refactor --- app.py | 48 +++++++++--- biz/task.py | 29 ++----- config/settings.py | 23 +++++- entity/ffmpeg_command_builder.py | 46 +++++------ index.py | 89 +++++++++++++++------- services/__init__.py | 12 ++- services/render_service.py | 4 +- services/service_container.py | 117 ++++++++++++++++++++++++++++ services/task_service.py | 36 +++++++-- util/api.py | 26 +++++++ util/ffmpeg_utils.py | 127 +++++++++++++++++++++++++++++++ util/json_utils.py | 92 ++++++++++++++++++++++ 12 files changed, 548 insertions(+), 101 deletions(-) create mode 100644 services/service_container.py create mode 100644 util/ffmpeg_utils.py create mode 100644 util/json_utils.py diff --git a/app.py b/app.py index fe1acfb..df50743 100644 --- a/app.py +++ b/app.py @@ -8,9 +8,12 @@ from services import DefaultTemplateService from telemetry import init_opentelemetry from util import api -# 使用新的服务架构 -template_service = DefaultTemplateService() -template_service.load_local_templates() +# 使用新的服务容器架构 +from services.service_container import get_template_service, register_default_services + +# 确保服务已注册 +register_default_services() +template_service = get_template_service() import logging @@ -28,14 +31,37 @@ def do_nothing(): @app.post('/') def do_task(task_id): - task_info = api.get_task_info(task_id) - local_template_info = template_service.get_template(task_info.get("templateId")) - template_info = api.get_template_info(task_info.get("templateId")) - if local_template_info: - if local_template_info.get("updateTime") != template_info.get("updateTime"): - template_service.download_template(task_info.get("templateId")) - biz.task.start_task(task_info) - return "OK" + try: + task_info = api.get_task_info(task_id) + if not task_info: + LOGGER.error("Failed to get task info for task: %s", task_id) + return "Failed to get task info", 400 + + template_id = task_info.get("templateId") + if not template_id: + LOGGER.error("Task %s missing templateId", task_id) + return "Missing templateId", 400 + + local_template_info = template_service.get_template(template_id) + template_info = api.get_template_info(template_id) + + if not template_info: + LOGGER.error("Failed to get template info for template: %s", template_id) + return "Failed to get template info", 400 + + if local_template_info: + if local_template_info.get("updateTime") != template_info.get("updateTime"): + LOGGER.info("Template %s needs update, downloading...", template_id) + if not template_service.download_template(template_id): + LOGGER.error("Failed to download template: %s", template_id) + return "Failed to download template", 500 + + biz.task.start_task(task_info) + return "OK" + + except Exception as e: + LOGGER.error("Error processing task %s: %s", task_id, e, exc_info=True) + return "Internal server error", 500 if __name__ == '__main__': diff --git a/biz/task.py b/biz/task.py index a99c54d..58b2baa 100644 --- a/biz/task.py +++ b/biz/task.py @@ -3,39 +3,22 @@ import logging from opentelemetry.trace import Status, StatusCode -# 使用新的服务架构 -from services import DefaultTaskService, DefaultRenderService, DefaultTemplateService +# 使用新的服务容器架构 +from services.service_container import get_task_service, register_default_services from telemetry import get_tracer logger = logging.getLogger(__name__) -# 创建服务实例(单例模式) -_render_service = None -_template_service = None -_task_service = None - -def _get_services(): - """获取服务实例(懒加载)""" - global _render_service, _template_service, _task_service - - if _render_service is None: - _render_service = DefaultRenderService() - - if _template_service is None: - _template_service = DefaultTemplateService() - _template_service.load_local_templates() # 加载本地模板 - - if _task_service is None: - _task_service = DefaultTaskService(_render_service, _template_service) - - return _task_service, _render_service, _template_service +# 确保服务已注册 +register_default_services() def start_task(task_info): """启动任务处理(保持向后兼容的接口)""" tracer = get_tracer(__name__) with tracer.start_as_current_span("start_task_legacy") as span: try: - task_service, _, _ = _get_services() + # 使用服务容器获取任务服务 + task_service = get_task_service() # 使用新的任务服务处理 result = task_service.process_task(task_info) diff --git a/config/settings.py b/config/settings.py index a070af3..481d33d 100644 --- a/config/settings.py +++ b/config/settings.py @@ -17,6 +17,14 @@ class FFmpegConfig: re_encode_video_args: Optional[List[str]] = None re_encode_encoder_args: Optional[List[str]] = None + # 新增配置选项,消除硬编码 + max_download_workers: int = 8 + progress_args: List[str] = None + loglevel_args: List[str] = None + null_audio_args: List[str] = None + overlay_scale_mode: str = "scale2ref" # 新版本使用scale2ref,旧版本使用scale + amix_args: List[str] = None + @classmethod def from_env(cls) -> 'FFmpegConfig': encoder_args = os.getenv("ENCODER_ARGS", "-c:v h264").split(" ") @@ -32,6 +40,13 @@ class FFmpegConfig: if os.getenv("RE_ENCODE_ENCODER_ARGS"): re_encode_encoder_args = os.getenv("RE_ENCODE_ENCODER_ARGS").split(" ") + # 新增配置项的默认值 + progress_args = ["-progress", "-"] + loglevel_args = ["-loglevel", "error"] + null_audio_args = ["-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000"] + amix_args = ["amix=duration=shortest:dropout_transition=0:normalize=0"] + overlay_scale_mode = "scale" if bool(os.getenv("OLD_FFMPEG", False)) else "scale2ref" + return cls( encoder_args=encoder_args, video_args=video_args, @@ -39,7 +54,13 @@ class FFmpegConfig: default_args=default_args, old_ffmpeg=bool(os.getenv("OLD_FFMPEG", False)), re_encode_video_args=re_encode_video_args, - re_encode_encoder_args=re_encode_encoder_args + re_encode_encoder_args=re_encode_encoder_args, + max_download_workers=int(os.getenv("MAX_DOWNLOAD_WORKERS", "8")), + progress_args=progress_args, + loglevel_args=loglevel_args, + null_audio_args=null_audio_args, + overlay_scale_mode=overlay_scale_mode, + amix_args=amix_args ) @dataclass diff --git a/entity/ffmpeg_command_builder.py b/entity/ffmpeg_command_builder.py index b2025f0..16fc3bb 100644 --- a/entity/ffmpeg_command_builder.py +++ b/entity/ffmpeg_command_builder.py @@ -1,4 +1,3 @@ -import json import os import time from typing import List, Optional @@ -8,6 +7,11 @@ from entity.render_task import RenderTask, TaskType from entity.effects import registry as effect_registry from util.exceptions import FFmpegError from util.ffmpeg import probe_video_info, probe_video_audio +from util.ffmpeg_utils import ( + build_base_ffmpeg_args, build_null_audio_input, build_amix_filter, + build_overlay_scale_filter, get_annexb_filter, build_standard_output_args +) +from util.json_utils import safe_json_loads import logging logger = logging.getLogger(__name__) @@ -90,19 +94,15 @@ class FFmpegCommandBuilder: def _build_encode_command(self) -> List[str]: """构建编码命令""" - args = ["ffmpeg", "-y", "-hide_banner"] + args = build_base_ffmpeg_args() + input_args = [] filter_args = [] - output_args = [ - *self.config.video_args, - *self.config.audio_args, - *self.config.encoder_args, - *self.config.default_args - ] + output_args = build_standard_output_args() # annexb处理 if self.task.annexb: - output_args.extend(["-bsf:v", self._get_mp4toannexb_filter()]) + output_args.extend(["-bsf:v", get_annexb_filter()]) output_args.extend(["-reset_timestamps", "1"]) # 处理输入文件 @@ -158,10 +158,7 @@ class FFmpegCommandBuilder: def _add_center_cut(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]: """添加中心裁剪""" pos_json = self.task.ext_data.get('posJson', '{}') - try: - pos_data = json.loads(pos_json) if pos_json != '{}' else {} - except: - pos_data = {} + pos_data = safe_json_loads(pos_json, {}) _v_w = pos_data.get('imgWidth', 1) _f_x = pos_data.get('ltX', 0) @@ -179,10 +176,7 @@ class FFmpegCommandBuilder: _w, _h = self.task.resolution.split('x', 1) pos_json = self.task.ext_data.get('posJson', '{}') - try: - pos_data = json.loads(pos_json) if pos_json != '{}' else {} - except: - pos_data = {} + pos_data = safe_json_loads(pos_json, {}) _v_w = pos_data.get('imgWidth', 1) _v_h = pos_data.get('imgHeight', 1) @@ -224,10 +218,10 @@ class FFmpegCommandBuilder: input_index = input_args.count("-i") // 2 # 每个输入占两个参数 -i filename input_args.extend(["-i", overlay]) - if self.config.old_ffmpeg: - filter_args.append(f"{current_input}[{input_index}:v]scale2ref=iw:ih[v]") + if self.config.overlay_scale_mode == "scale": + filter_args.append(f"{current_input}[{input_index}:v]scale=iw:ih[v]") else: - filter_args.append(f"{current_input}[{input_index}:v]scale=rw:rh[v]") + filter_args.append(f"{current_input}[{input_index}:v]{self.config.overlay_scale_mode}=iw:ih[v]") filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]") current_input = "[v]" @@ -240,7 +234,7 @@ class FFmpegCommandBuilder: if self.task.mute: input_index = input_args.count("-i") // 2 - input_args.extend(["-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000"]) + input_args.extend(build_null_audio_input()) audio_output_str = f"[{input_index}:a]" else: audio_output_str = "[0:a]" @@ -248,7 +242,7 @@ class FFmpegCommandBuilder: for audio in self.task.audios: input_index = input_args.count("-i") // 2 input_args.extend(["-i", audio.replace("\\", "/")]) - filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]") + filter_args.append(f"{audio_output_str}[{input_index}:a]{self.config.amix_args[0]}[a]") audio_output_str = "[a]" return audio_output_str.strip("[]") if audio_output_str else None @@ -268,14 +262,8 @@ class FFmpegCommandBuilder: for audio in self.task.audios: input_index = input_args.count("-i") // 2 input_args.extend(["-i", audio.replace("\\", "/")]) - filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]") + filter_args.append(f"{audio_output_str}[{input_index}:a]{self.config.amix_args[0]}[a]") audio_output_str = "[a]" return audio_output_str if audio_output_str else None - def _get_mp4toannexb_filter(self) -> str: - """获取mp4toannexb滤镜""" - encoder_args_str = " ".join(self.config.encoder_args).lower() - if "hevc" in encoder_args_str: - return "hevc_mp4toannexb" - return "h264_mp4toannexb" \ No newline at end of file diff --git a/index.py b/index.py index 398182e..10f2937 100644 --- a/index.py +++ b/index.py @@ -10,45 +10,82 @@ from util import api import os import glob -# 使用新的服务架构 -template_service = DefaultTemplateService() -template_service.load_local_templates() +# 使用新的服务容器架构 +from services.service_container import get_template_service, register_default_services + +# 确保服务已注册 +register_default_services() +template_service = get_template_service() # Check for redownload parameter if 'redownload' in sys.argv: print("Redownloading all templates...") - for template_name in template_service.templates.keys(): - print(f"Redownloading template: {template_name}") - template_service.download_template(template_name) - print("All templates redownloaded successfully!") + try: + for template_name in template_service.templates.keys(): + print(f"Redownloading template: {template_name}") + if not template_service.download_template(template_name): + print(f"Failed to download template: {template_name}") + print("Template redownload process completed!") + except Exception as e: + print(f"Error during template redownload: {e}") + sys.exit(1) sys.exit(0) import logging LOGGER = logging.getLogger(__name__) init_opentelemetry() -while True: - # print(get_sys_info()) - print("waiting for task...") - try: - task_list = api.sync_center() - except Exception as e: - LOGGER.error("sync_center error", exc_info=e) - sleep(5) - continue - if len(task_list) == 0: - # 删除当前文件夹下所有以.mp4、.ts结尾的文件 +def cleanup_temp_files(): + """清理临时文件 - 异步执行避免阻塞主循环""" + import threading + + def _cleanup(): for file_globs in ['*.mp4', '*.ts', 'tmp_concat*.txt']: for file_path in glob.glob(file_globs): try: - os.remove(file_path) - print(f"Deleted file: {file_path}") + if os.path.exists(file_path): + os.remove(file_path) + LOGGER.debug(f"Deleted temp file: {file_path}") except Exception as e: - LOGGER.error(f"Error deleting file {file_path}", exc_info=e) - sleep(5) - for task in task_list: - print("start task:", task) + LOGGER.warning(f"Error deleting file {file_path}: {e}") + + # 在后台线程中执行清理 + threading.Thread(target=_cleanup, daemon=True).start() + +def main_loop(): + """主处理循环""" + while True: try: - biz.task.start_task(task) + print("waiting for task...") + task_list = api.sync_center() + + if len(task_list) == 0: + # 异步清理临时文件 + cleanup_temp_files() + sleep(5) + continue + + for task in task_list: + task_id = task.get("id", "unknown") + print(f"Processing task: {task_id}") + + try: + biz.task.start_task(task) + LOGGER.info(f"Task {task_id} completed successfully") + except Exception as e: + LOGGER.error(f"Task {task_id} failed: {e}", exc_info=True) + # 继续处理下一个任务而不是崩溃 + + except KeyboardInterrupt: + LOGGER.info("Received shutdown signal, exiting...") + break except Exception as e: - LOGGER.error("task_start error", exc_info=e) + LOGGER.error("Unexpected error in main loop", exc_info=e) + sleep(5) # 避免快速循环消耗CPU + +if __name__ == "__main__": + try: + main_loop() + except Exception as e: + LOGGER.critical("Critical error in main process", exc_info=e) + sys.exit(1) diff --git a/services/__init__.py b/services/__init__.py index 9410dd3..8d38cd9 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -1,6 +1,10 @@ from .render_service import RenderService, DefaultRenderService from .task_service import TaskService, DefaultTaskService from .template_service import TemplateService, DefaultTemplateService +from .service_container import ( + ServiceContainer, get_container, register_default_services, + get_render_service, get_template_service, get_task_service +) __all__ = [ 'RenderService', @@ -8,5 +12,11 @@ __all__ = [ 'TaskService', 'DefaultTaskService', 'TemplateService', - 'DefaultTemplateService' + 'DefaultTemplateService', + 'ServiceContainer', + 'get_container', + 'register_default_services', + 'get_render_service', + 'get_template_service', + 'get_task_service' ] \ No newline at end of file diff --git a/services/render_service.py b/services/render_service.py index 6a689a0..6776b1c 100644 --- a/services/render_service.py +++ b/services/render_service.py @@ -111,9 +111,9 @@ class DefaultRenderService(RenderService): logger.info("Executing FFmpeg: %s", " ".join(args)) try: - # 执行FFmpeg进程 + # 执行FFmpeg进程 (使用构建器已经包含的参数) process = subprocess.run( - ["ffmpeg", "-progress", "-", "-loglevel", "error"] + args[1:], + args, stderr=subprocess.PIPE, **subprocess_args(True) ) diff --git a/services/service_container.py b/services/service_container.py new file mode 100644 index 0000000..7af7374 --- /dev/null +++ b/services/service_container.py @@ -0,0 +1,117 @@ +""" +服务容器模块 - 提供线程安全的服务实例管理 +""" +import threading +from typing import Dict, Type, TypeVar, Optional +import logging + +logger = logging.getLogger(__name__) + +T = TypeVar('T') + +class ServiceContainer: + """线程安全的服务容器,实现依赖注入和单例管理""" + + def __init__(self): + self._services: Dict[Type, object] = {} + self._factories: Dict[Type, callable] = {} + self._lock = threading.RLock() + + def register_singleton(self, service_type: Type[T], factory: callable) -> None: + """注册单例服务工厂""" + with self._lock: + self._factories[service_type] = factory + logger.debug(f"Registered singleton factory for {service_type.__name__}") + + def get_service(self, service_type: Type[T]) -> T: + """获取服务实例(懒加载单例)""" + with self._lock: + # 检查是否已存在实例 + if service_type in self._services: + return self._services[service_type] + + # 检查是否有工厂方法 + if service_type not in self._factories: + raise ValueError(f"No factory registered for service type: {service_type}") + + # 创建新实例 + factory = self._factories[service_type] + try: + instance = factory() + self._services[service_type] = instance + logger.debug(f"Created new instance of {service_type.__name__}") + return instance + except Exception as e: + logger.error(f"Failed to create instance of {service_type.__name__}: {e}") + raise + + def has_service(self, service_type: Type[T]) -> bool: + """检查是否有服务注册""" + with self._lock: + return service_type in self._factories + + def clear_cache(self, service_type: Optional[Type[T]] = None) -> None: + """清理服务缓存""" + with self._lock: + if service_type: + self._services.pop(service_type, None) + logger.debug(f"Cleared cache for {service_type.__name__}") + else: + self._services.clear() + logger.debug("Cleared all service cache") + +# 全局服务容器实例 +_container: Optional[ServiceContainer] = None +_container_lock = threading.Lock() + +def get_container() -> ServiceContainer: + """获取全局服务容器实例""" + global _container + if _container is None: + with _container_lock: + if _container is None: + _container = ServiceContainer() + return _container + +def register_default_services(): + """注册默认的服务实现""" + from .render_service import DefaultRenderService, RenderService + from .template_service import DefaultTemplateService, TemplateService + from .task_service import DefaultTaskService, TaskService + + container = get_container() + + # 注册渲染服务 + container.register_singleton(RenderService, lambda: DefaultRenderService()) + + # 注册模板服务 + def create_template_service(): + service = DefaultTemplateService() + service.load_local_templates() + return service + container.register_singleton(TemplateService, create_template_service) + + # 注册任务服务(依赖其他服务) + def create_task_service(): + render_service = container.get_service(RenderService) + template_service = container.get_service(TemplateService) + return DefaultTaskService(render_service, template_service) + container.register_singleton(TaskService, create_task_service) + + logger.info("Default services registered successfully") + +# 便捷函数 +def get_render_service() -> 'RenderService': + """获取渲染服务实例""" + from .render_service import RenderService + return get_container().get_service(RenderService) + +def get_template_service() -> 'TemplateService': + """获取模板服务实例""" + from .template_service import TemplateService + return get_container().get_service(TemplateService) + +def get_task_service() -> 'TaskService': + """获取任务服务实例""" + from .task_service import TaskService + return get_container().get_service(TaskService) \ No newline at end of file diff --git a/services/task_service.py b/services/task_service.py index 7a5ff9e..486546b 100644 --- a/services/task_service.py +++ b/services/task_service.py @@ -1,4 +1,3 @@ -import json import logging import os from abc import ABC, abstractmethod @@ -12,6 +11,7 @@ from services.render_service import RenderService from services.template_service import TemplateService from util.exceptions import TaskError, TaskValidationError from util import api, oss +from util.json_utils import safe_json_loads from telemetry import get_tracer logger = logging.getLogger(__name__) @@ -133,11 +133,11 @@ class DefaultTaskService(TaskService): task_params_str = task_info.get("taskParams", "{}") span.set_attribute("task_params", task_params_str) - try: - task_params = json.loads(task_params_str) - task_params_orig = json.loads(task_params_str) - except json.JSONDecodeError as e: - raise TaskValidationError(f"Invalid task params JSON: {e}") + task_params = safe_json_loads(task_params_str, {}) + task_params_orig = safe_json_loads(task_params_str, {}) + + if not task_params: + raise TaskValidationError("Invalid or empty task params JSON") # 并行下载资源 self._download_resources(task_params) @@ -192,14 +192,34 @@ class DefaultTaskService(TaskService): def _download_resources(self, task_params: Dict[str, Any]): """并行下载资源""" - with ThreadPoolExecutor(max_workers=8) as executor: + from config.settings import get_ffmpeg_config + config = get_ffmpeg_config() + + download_futures = [] + + with ThreadPoolExecutor(max_workers=config.max_download_workers) as executor: for param_list in task_params.values(): if isinstance(param_list, list): for param in param_list: url = param.get("url", "") if url.startswith("http"): _, filename = os.path.split(url) - executor.submit(oss.download_from_oss, url, filename, True) + future = executor.submit(oss.download_from_oss, url, filename, True) + download_futures.append((future, url, filename)) + + # 等待所有下载完成,并记录失败的下载 + failed_downloads = [] + for future, url, filename in download_futures: + try: + result = future.result(timeout=30) # 30秒超时 + if not result: + failed_downloads.append((url, filename)) + except Exception as e: + logger.warning(f"Failed to download {url}: {e}") + failed_downloads.append((url, filename)) + + if failed_downloads: + logger.warning(f"Failed to download {len(failed_downloads)} resources: {[f[1] for f in failed_downloads]}") def _parse_video_source(self, source: str, task_params: Dict[str, Any], template_info: Dict[str, Any]) -> tuple[Optional[str], Dict[str, Any]]: diff --git a/util/api.py b/util/api.py index 5d75583..b94e3a7 100644 --- a/util/api.py +++ b/util/api.py @@ -2,6 +2,9 @@ import json import logging import os import threading +import time +from urllib3.util.retry import Retry +from requests.adapters import HTTPAdapter import requests from opentelemetry.trace import Status, StatusCode @@ -10,7 +13,30 @@ import util.system from telemetry import get_tracer from util import oss +# 创建带有连接池和重试策略的会话 session = requests.Session() + +# 配置重试策略 +retry_strategy = Retry( + total=3, + status_forcelist=[429, 500, 502, 503, 504], + backoff_factor=1, + respect_retry_after_header=True +) + +# 配置HTTP适配器(连接池) +adapter = HTTPAdapter( + pool_connections=10, + pool_maxsize=20, + max_retries=retry_strategy +) + +session.mount("http://", adapter) +session.mount("https://", adapter) + +# 设置默认超时 +session.timeout = 30 + logger = logging.getLogger(__name__) diff --git a/util/ffmpeg_utils.py b/util/ffmpeg_utils.py new file mode 100644 index 0000000..c1256f5 --- /dev/null +++ b/util/ffmpeg_utils.py @@ -0,0 +1,127 @@ +""" +FFmpeg工具模块 - 提供FFmpeg命令构建和处理的公共函数 +""" +import logging +from typing import List, Tuple, Optional +from config.settings import get_ffmpeg_config + +logger = logging.getLogger(__name__) + +def build_base_ffmpeg_args() -> List[str]: + """ + 构建基础FFmpeg参数 + + Returns: + 基础参数列表 + """ + config = get_ffmpeg_config() + args = ["ffmpeg", "-y", "-hide_banner"] + args.extend(config.progress_args) + args.extend(config.loglevel_args) + return args + +def build_null_audio_input() -> List[str]: + """ + 构建空音频输入参数 + + Returns: + 空音频输入参数列表 + """ + config = get_ffmpeg_config() + return config.null_audio_args + +def build_amix_filter(input1: str, input2: str, output: str) -> str: + """ + 构建音频混合滤镜 + + Args: + input1: 第一个音频输入 + input2: 第二个音频输入 + output: 输出流名称 + + Returns: + 混合滤镜字符串 + """ + config = get_ffmpeg_config() + return f"{input1}[{input2}]{config.amix_args[0]}[{output}]" + +def build_overlay_scale_filter(video_input: str, overlay_input: str, output: str) -> str: + """ + 构建覆盖层缩放滤镜 + + Args: + video_input: 视频输入流 + overlay_input: 覆盖层输入流 + output: 输出流名称 + + Returns: + 缩放滤镜字符串 + """ + config = get_ffmpeg_config() + if config.overlay_scale_mode == "scale": + return f"{video_input}[{overlay_input}]scale=iw:ih[{output}]" + else: + return f"{video_input}[{overlay_input}]{config.overlay_scale_mode}=iw:ih[{output}]" + +def get_annexb_filter() -> str: + """ + 获取annexb转换滤镜 + + Returns: + annexb滤镜名称 + """ + config = get_ffmpeg_config() + encoder_args_str = " ".join(config.encoder_args).lower() + if "hevc" in encoder_args_str: + return "hevc_mp4toannexb" + return "h264_mp4toannexb" + +def build_standard_output_args() -> List[str]: + """ + 构建标准输出参数 + + Returns: + 输出参数列表 + """ + config = get_ffmpeg_config() + return [ + *config.video_args, + *config.audio_args, + *config.encoder_args, + *config.default_args + ] + +def validate_ffmpeg_file_extensions(file_path: str) -> bool: + """ + 验证文件扩展名是否为FFmpeg支持的格式 + + Args: + file_path: 文件路径 + + Returns: + 是否为支持的格式 + """ + supported_extensions = { + '.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm', + '.ts', '.m2ts', '.mts', '.m4v', '.3gp', '.asf', '.rm', + '.mp3', '.wav', '.aac', '.flac', '.ogg', '.m4a', '.wma' + } + + import os + _, ext = os.path.splitext(file_path.lower()) + return ext in supported_extensions + +def estimate_processing_time(input_duration: float, complexity_factor: float = 1.0) -> float: + """ + 估算处理时间 + + Args: + input_duration: 输入文件时长(秒) + complexity_factor: 复杂度因子(1.0为普通处理) + + Returns: + 预估处理时间(秒) + """ + # 基础处理速度假设为实时的0.5倍(即处理1秒视频需要2秒) + base_processing_ratio = 2.0 + return input_duration * base_processing_ratio * complexity_factor \ No newline at end of file diff --git a/util/json_utils.py b/util/json_utils.py new file mode 100644 index 0000000..e9f79e6 --- /dev/null +++ b/util/json_utils.py @@ -0,0 +1,92 @@ +""" +JSON处理工具模块 - 提供安全的JSON解析和处理功能 +""" +import json +import logging +from typing import Dict, Any, Optional, Union + +logger = logging.getLogger(__name__) + +def safe_json_loads(json_str: Union[str, bytes], default: Any = None) -> Any: + """ + 安全解析JSON字符串 + + Args: + json_str: JSON字符串 + default: 解析失败时返回的默认值 + + Returns: + 解析后的对象,或默认值 + """ + if not json_str or json_str == '{}': + return default or {} + + try: + return json.loads(json_str) + except (json.JSONDecodeError, TypeError) as e: + logger.warning(f"Failed to parse JSON: {e}, input: {json_str}") + return default or {} + +def safe_json_dumps(obj: Any, indent: Optional[int] = None, ensure_ascii: bool = False) -> str: + """ + 安全序列化对象为JSON字符串 + + Args: + obj: 要序列化的对象 + indent: 缩进空格数 + ensure_ascii: 是否确保ASCII编码 + + Returns: + JSON字符串 + """ + try: + return json.dumps(obj, indent=indent, ensure_ascii=ensure_ascii) + except (TypeError, ValueError) as e: + logger.error(f"Failed to serialize to JSON: {e}") + return "{}" + +def get_nested_value(data: Dict[str, Any], key_path: str, default: Any = None) -> Any: + """ + 从嵌套字典中安全获取值 + + Args: + data: 字典数据 + key_path: 键路径,用点分隔(如 "user.profile.name") + default: 默认值 + + Returns: + 找到的值或默认值 + """ + if not isinstance(data, dict): + return default + + try: + keys = key_path.split('.') + current = data + + for key in keys: + if isinstance(current, dict) and key in current: + current = current[key] + else: + return default + + return current + except Exception as e: + logger.warning(f"Failed to get nested value for path '{key_path}': {e}") + return default + +def merge_dicts(*dicts: Dict[str, Any]) -> Dict[str, Any]: + """ + 合并多个字典,后面的字典会覆盖前面的字典中相同的键 + + Args: + *dicts: 要合并的字典 + + Returns: + 合并后的字典 + """ + result = {} + for d in dicts: + if isinstance(d, dict): + result.update(d) + return result \ No newline at end of file