From 24de32e6bb67e9f5ebc2605ac6906af4a7f50f79 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Sun, 11 Jan 2026 21:14:02 +0800 Subject: [PATCH] =?UTF-8?q?feat(render):=20=E5=AE=9E=E7=8E=B0=E6=B8=B2?= =?UTF-8?q?=E6=9F=93=E7=B3=BB=E7=BB=9Fv2=E6=A0=B8=E5=BF=83=E6=9E=B6?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加v2支持的任务类型常量定义 - 更新软件版本至0.0.9 - 定义v2统一音视频编码参数 - 实现系统信息工具get_sys_info_v2方法 - 新增get_capabilities和_get_gpu_info功能 - 创建core模块及TaskHandler抽象基类 - 添加渲染系统设计文档包括集群架构、v2 PRD和Worker PRD - 实现任务处理器抽象基类及接口规范 --- constant/__init__.py | 39 +++- core/__init__.py | 12 ++ core/handler.py | 79 ++++++++ domain/__init__.py | 24 +++ domain/config.py | 122 +++++++++++++ domain/result.py | 105 +++++++++++ domain/task.py | 249 +++++++++++++++++++++++++ handlers/__init__.py | 20 ++ handlers/base.py | 280 ++++++++++++++++++++++++++++ handlers/finalize_mp4.py | 190 +++++++++++++++++++ handlers/package_ts.py | 175 ++++++++++++++++++ handlers/prepare_audio.py | 251 ++++++++++++++++++++++++++ handlers/render_video.py | 274 ++++++++++++++++++++++++++++ index_v2.py | 188 +++++++++++++++++++ services/__init__.py | 16 ++ services/api_client.py | 371 ++++++++++++++++++++++++++++++++++++++ services/lease_service.py | 110 +++++++++++ services/task_executor.py | 234 ++++++++++++++++++++++++ util/system.py | 76 +++++++- 19 files changed, 2812 insertions(+), 3 deletions(-) create mode 100644 core/__init__.py create mode 100644 core/handler.py create mode 100644 domain/__init__.py create mode 100644 domain/config.py create mode 100644 domain/result.py create mode 100644 domain/task.py create mode 100644 handlers/__init__.py create mode 100644 handlers/base.py create mode 100644 handlers/finalize_mp4.py create mode 100644 handlers/package_ts.py create mode 100644 handlers/prepare_audio.py create mode 100644 handlers/render_video.py create mode 100644 index_v2.py create mode 100644 services/__init__.py create mode 
100644 services/api_client.py create mode 100644 services/lease_service.py create mode 100644 services/task_executor.py diff --git a/constant/__init__.py b/constant/__init__.py index 2ee3d22..f9f1aaf 100644 --- a/constant/__init__.py +++ b/constant/__init__.py @@ -1,3 +1,9 @@ +# -*- coding: utf-8 -*- +""" +常量定义 +""" + +# v1 支持的特性 SUPPORT_FEATURE = ( 'simple_render_algo', 'gpu_accelerate', @@ -6,4 +12,35 @@ SUPPORT_FEATURE = ( 'rclone_upload', 'custom_re_encode', ) -SOFTWARE_VERSION = '0.0.8' + +# 软件版本 +SOFTWARE_VERSION = '0.0.9' + +# v2 支持的任务类型 +V2_TASK_TYPES = ( + 'RENDER_SEGMENT_VIDEO', + 'PREPARE_JOB_AUDIO', + 'PACKAGE_SEGMENT_TS', + 'FINALIZE_MP4', +) + +# v2 默认能力 +V2_DEFAULT_CAPABILITIES = list(V2_TASK_TYPES) + +# v2 统一视频编码参数 +V2_VIDEO_ENCODE_PARAMS = { + 'codec': 'libx264', + 'preset': 'medium', + 'profile': 'main', + 'level': '4.0', + 'crf': '23', + 'pix_fmt': 'yuv420p', +} + +# v2 统一音频编码参数 +V2_AUDIO_ENCODE_PARAMS = { + 'codec': 'aac', + 'bitrate': '128k', + 'sample_rate': '48000', + 'channels': '2', +} diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..7e43d60 --- /dev/null +++ b/core/__init__.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +""" +核心抽象层 + +包含任务处理器抽象基类等核心接口定义。 +""" + +from core.handler import TaskHandler + +__all__ = [ + 'TaskHandler', +] diff --git a/core/handler.py b/core/handler.py new file mode 100644 index 0000000..41d5d70 --- /dev/null +++ b/core/handler.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +""" +任务处理器抽象基类 + +定义任务处理器的接口规范。 +""" + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from domain.task import Task, TaskType + from domain.result import TaskResult + + +class TaskHandler(ABC): + """ + 任务处理器抽象基类 + + 所有任务处理器都必须继承此类并实现相应方法。 + """ + + @abstractmethod + def handle(self, task: 'Task') -> 'TaskResult': + """ + 处理任务的主方法 + + Args: + task: 任务实体 + + Returns: + TaskResult: 任务结果(成功或失败) + """ + pass + + @abstractmethod + def get_supported_type(self) -> 
'TaskType': + """ + 返回此处理器支持的任务类型 + + Returns: + TaskType: 支持的任务类型枚举值 + """ + pass + + def before_handle(self, task: 'Task') -> None: + """ + 处理前钩子(可选重写) + + 用于任务执行前的准备工作,如日志记录、资源检查等。 + + Args: + task: 任务实体 + """ + pass + + def after_handle(self, task: 'Task', result: 'TaskResult') -> None: + """ + 处理后钩子(可选重写) + + 用于任务执行后的清理工作,如资源释放、统计记录等。 + + Args: + task: 任务实体 + result: 任务结果 + """ + pass + + def validate_task(self, task: 'Task') -> bool: + """ + 验证任务是否有效(可选重写) + + Args: + task: 任务实体 + + Returns: + bool: 任务是否有效 + """ + return True diff --git a/domain/__init__.py b/domain/__init__.py new file mode 100644 index 0000000..968e6df --- /dev/null +++ b/domain/__init__.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +""" +领域模型层 + +包含任务实体、结果、配置等核心数据结构。 +""" + +from domain.task import Task, TaskType, TaskStatus, RenderSpec, OutputSpec, AudioSpec, AudioProfile +from domain.result import TaskResult, ErrorCode, RETRY_CONFIG +from domain.config import WorkerConfig + +__all__ = [ + 'Task', + 'TaskType', + 'TaskStatus', + 'RenderSpec', + 'OutputSpec', + 'AudioSpec', + 'AudioProfile', + 'TaskResult', + 'ErrorCode', + 'RETRY_CONFIG', + 'WorkerConfig', +] diff --git a/domain/config.py b/domain/config.py new file mode 100644 index 0000000..59b450e --- /dev/null +++ b/domain/config.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- +""" +Worker 配置模型 + +定义 Worker 运行时的配置参数。 +""" + +import os +from dataclasses import dataclass, field +from typing import List, Optional + + +# 默认支持的任务类型 +DEFAULT_CAPABILITIES = [ + "RENDER_SEGMENT_VIDEO", + "PREPARE_JOB_AUDIO", + "PACKAGE_SEGMENT_TS", + "FINALIZE_MP4" +] + + +@dataclass +class WorkerConfig: + """ + Worker 配置 + + 包含 Worker 运行所需的所有配置参数。 + """ + # API 配置 + api_endpoint: str + access_key: str + worker_id: str + + # 并发控制 + max_concurrency: int = 4 + + # 心跳配置 + heartbeat_interval: int = 5 # 秒 + + # 租约配置 + lease_extension_threshold: int = 60 # 秒,提前多久续期 + lease_extension_duration: int = 300 # 秒,每次续期时长 + + # 目录配置 + temp_dir: str = "/tmp/render_worker" + + # 
能力配置 + capabilities: List[str] = field(default_factory=lambda: DEFAULT_CAPABILITIES.copy()) + + # FFmpeg 配置 + ffmpeg_timeout: int = 3600 # 秒,FFmpeg 执行超时 + + # 下载/上传配置 + download_timeout: int = 300 # 秒,下载超时 + upload_timeout: int = 600 # 秒,上传超时 + + @classmethod + def from_env(cls) -> 'WorkerConfig': + """从环境变量创建配置""" + + # API 端点,优先使用 V2 版本 + api_endpoint = os.getenv('API_ENDPOINT_V2') or os.getenv('API_ENDPOINT', '') + if not api_endpoint: + raise ValueError("API_ENDPOINT_V2 or API_ENDPOINT environment variable is required") + + # Access Key + access_key = os.getenv('ACCESS_KEY', '') + if not access_key: + raise ValueError("ACCESS_KEY environment variable is required") + + # Worker ID + worker_id = os.getenv('WORKER_ID', '100001') + + # 并发数 + max_concurrency = int(os.getenv('MAX_CONCURRENCY', '4')) + + # 心跳间隔 + heartbeat_interval = int(os.getenv('HEARTBEAT_INTERVAL', '5')) + + # 租约配置 + lease_extension_threshold = int(os.getenv('LEASE_EXTENSION_THRESHOLD', '60')) + lease_extension_duration = int(os.getenv('LEASE_EXTENSION_DURATION', '300')) + + # 临时目录 + temp_dir = os.getenv('TEMP_DIR', os.getenv('TEMP', '/tmp/render_worker')) + + # 能力列表 + capabilities_str = os.getenv('CAPABILITIES', '') + if capabilities_str: + capabilities = [c.strip() for c in capabilities_str.split(',') if c.strip()] + else: + capabilities = DEFAULT_CAPABILITIES.copy() + + # FFmpeg 超时 + ffmpeg_timeout = int(os.getenv('FFMPEG_TIMEOUT', '3600')) + + # 下载/上传超时 + download_timeout = int(os.getenv('DOWNLOAD_TIMEOUT', '300')) + upload_timeout = int(os.getenv('UPLOAD_TIMEOUT', '600')) + + return cls( + api_endpoint=api_endpoint, + access_key=access_key, + worker_id=worker_id, + max_concurrency=max_concurrency, + heartbeat_interval=heartbeat_interval, + lease_extension_threshold=lease_extension_threshold, + lease_extension_duration=lease_extension_duration, + temp_dir=temp_dir, + capabilities=capabilities, + ffmpeg_timeout=ffmpeg_timeout, + download_timeout=download_timeout, + upload_timeout=upload_timeout 
+ ) + + def get_work_dir_path(self, task_id: str) -> str: + """获取任务工作目录路径""" + return os.path.join(self.temp_dir, f"task_{task_id}") + + def ensure_temp_dir(self) -> None: + """确保临时目录存在""" + os.makedirs(self.temp_dir, exist_ok=True) diff --git a/domain/result.py b/domain/result.py new file mode 100644 index 0000000..ef05ea5 --- /dev/null +++ b/domain/result.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +""" +任务结果模型 + +定义错误码、重试配置、任务结果等数据结构。 +""" + +from enum import Enum +from dataclasses import dataclass +from typing import Optional, Dict, Any, List + + +class ErrorCode(Enum): + """错误码枚举""" + E_INPUT_UNAVAILABLE = "E_INPUT_UNAVAILABLE" # 素材不可访问/404 + E_FFMPEG_FAILED = "E_FFMPEG_FAILED" # FFmpeg 执行失败 + E_UPLOAD_FAILED = "E_UPLOAD_FAILED" # 上传失败 + E_SPEC_INVALID = "E_SPEC_INVALID" # renderSpec 非法 + E_TIMEOUT = "E_TIMEOUT" # 执行超时 + E_UNKNOWN = "E_UNKNOWN" # 未知错误 + + +# 重试配置 +RETRY_CONFIG: Dict[ErrorCode, Dict[str, Any]] = { + ErrorCode.E_INPUT_UNAVAILABLE: { + 'max_retries': 3, + 'backoff': [1, 2, 5] # 重试间隔(秒) + }, + ErrorCode.E_FFMPEG_FAILED: { + 'max_retries': 2, + 'backoff': [1, 3] + }, + ErrorCode.E_UPLOAD_FAILED: { + 'max_retries': 3, + 'backoff': [1, 2, 5] + }, + ErrorCode.E_SPEC_INVALID: { + 'max_retries': 0, # 不重试 + 'backoff': [] + }, + ErrorCode.E_TIMEOUT: { + 'max_retries': 2, + 'backoff': [5, 10] + }, + ErrorCode.E_UNKNOWN: { + 'max_retries': 1, + 'backoff': [2] + }, +} + + +@dataclass +class TaskResult: + """ + 任务结果 + + 封装任务执行的结果,包括成功数据或失败信息。 + """ + success: bool + data: Optional[Dict[str, Any]] = None + error_code: Optional[ErrorCode] = None + error_message: Optional[str] = None + + @classmethod + def ok(cls, data: Dict[str, Any]) -> 'TaskResult': + """创建成功结果""" + return cls(success=True, data=data) + + @classmethod + def fail(cls, error_code: ErrorCode, error_message: str) -> 'TaskResult': + """创建失败结果""" + return cls( + success=False, + error_code=error_code, + error_message=error_message + ) + + def to_report_dict(self) -> Dict[str, Any]: + """ + 
转换为上报格式 + + 用于 API 上报时的数据格式转换。 + """ + if self.success: + return {'result': self.data} + else: + return { + 'errorCode': self.error_code.value if self.error_code else 'E_UNKNOWN', + 'errorMessage': self.error_message or 'Unknown error' + } + + def can_retry(self) -> bool: + """是否可以重试""" + if self.success: + return False + if not self.error_code: + return True + config = RETRY_CONFIG.get(self.error_code, {}) + return config.get('max_retries', 0) > 0 + + def get_retry_config(self) -> Dict[str, Any]: + """获取重试配置""" + if not self.error_code: + return {'max_retries': 1, 'backoff': [2]} + return RETRY_CONFIG.get(self.error_code, {'max_retries': 1, 'backoff': [2]}) diff --git a/domain/task.py b/domain/task.py new file mode 100644 index 0000000..d39a679 --- /dev/null +++ b/domain/task.py @@ -0,0 +1,249 @@ +# -*- coding: utf-8 -*- +""" +任务领域模型 + +定义任务类型、任务实体、渲染规格、输出规格等数据结构。 +""" + +from enum import Enum +from dataclasses import dataclass, field +from typing import Dict, Any, Optional, List +from datetime import datetime + + +class TaskType(Enum): + """任务类型枚举""" + RENDER_SEGMENT_VIDEO = "RENDER_SEGMENT_VIDEO" # 渲染视频片段 + PREPARE_JOB_AUDIO = "PREPARE_JOB_AUDIO" # 生成全局音频 + PACKAGE_SEGMENT_TS = "PACKAGE_SEGMENT_TS" # 封装 TS 分片 + FINALIZE_MP4 = "FINALIZE_MP4" # 产出最终 MP4 + + +class TaskStatus(Enum): + """任务状态枚举""" + PENDING = "PENDING" + RUNNING = "RUNNING" + SUCCESS = "SUCCESS" + FAILED = "FAILED" + + +@dataclass +class RenderSpec: + """ + 渲染规格 + + 用于 RENDER_SEGMENT_VIDEO 任务,定义视频渲染参数。 + """ + crop_enable: bool = False + crop_size: Optional[str] = None + speed: str = "1.0" + lut_url: Optional[str] = None + overlay_url: Optional[str] = None + effects: Optional[str] = None + zoom_cut: bool = False + video_crop: Optional[str] = None + face_pos: Optional[str] = None + transitions: Optional[str] = None + + @classmethod + def from_dict(cls, data: Optional[Dict]) -> 'RenderSpec': + """从字典创建 RenderSpec""" + if not data: + return cls() + return cls( + crop_enable=data.get('cropEnable', 
False), + crop_size=data.get('cropSize'), + speed=str(data.get('speed', '1.0')), + lut_url=data.get('lutUrl'), + overlay_url=data.get('overlayUrl'), + effects=data.get('effects'), + zoom_cut=data.get('zoomCut', False), + video_crop=data.get('videoCrop'), + face_pos=data.get('facePos'), + transitions=data.get('transitions') + ) + + +@dataclass +class OutputSpec: + """ + 输出规格 + + 用于 RENDER_SEGMENT_VIDEO 任务,定义视频输出参数。 + """ + width: int = 1080 + height: int = 1920 + fps: int = 30 + bitrate: int = 4000000 + codec: str = "h264" + + @classmethod + def from_dict(cls, data: Optional[Dict]) -> 'OutputSpec': + """从字典创建 OutputSpec""" + if not data: + return cls() + return cls( + width=data.get('width', 1080), + height=data.get('height', 1920), + fps=data.get('fps', 30), + bitrate=data.get('bitrate', 4000000), + codec=data.get('codec', 'h264') + ) + + +@dataclass +class AudioSpec: + """ + 音频规格 + + 用于 PREPARE_JOB_AUDIO 任务中的片段叠加音效。 + """ + audio_url: Optional[str] = None + volume: float = 1.0 + fade_in_ms: int = 10 + fade_out_ms: int = 10 + start_ms: int = 0 + delay_ms: int = 0 + loop_enable: bool = False + + @classmethod + def from_dict(cls, data: Optional[Dict]) -> Optional['AudioSpec']: + """从字典创建 AudioSpec""" + if not data: + return None + return cls( + audio_url=data.get('audioUrl'), + volume=float(data.get('volume', 1.0)), + fade_in_ms=int(data.get('fadeInMs', 10)), + fade_out_ms=int(data.get('fadeOutMs', 10)), + start_ms=int(data.get('startMs', 0)), + delay_ms=int(data.get('delayMs', 0)), + loop_enable=data.get('loopEnable', False) + ) + + +@dataclass +class AudioProfile: + """ + 音频配置 + + 用于 PREPARE_JOB_AUDIO 任务的全局音频参数。 + """ + sample_rate: int = 48000 + channels: int = 2 + codec: str = "aac" + + @classmethod + def from_dict(cls, data: Optional[Dict]) -> 'AudioProfile': + """从字典创建 AudioProfile""" + if not data: + return cls() + return cls( + sample_rate=data.get('sampleRate', 48000), + channels=data.get('channels', 2), + codec=data.get('codec', 'aac') + ) + + +@dataclass 
+class Task: + """ + 任务实体 + + 表示一个待执行的渲染任务。 + """ + task_id: str + task_type: TaskType + priority: int + lease_expire_time: datetime + payload: Dict[str, Any] + + @classmethod + def from_dict(cls, data: Dict) -> 'Task': + """从 API 响应字典创建 Task""" + lease_time_str = data.get('leaseExpireTime', '') + + # 解析 ISO 8601 时间格式 + if lease_time_str: + if lease_time_str.endswith('Z'): + lease_time_str = lease_time_str[:-1] + '+00:00' + try: + lease_expire_time = datetime.fromisoformat(lease_time_str) + except ValueError: + # 解析失败时使用当前时间 + 5分钟 + lease_expire_time = datetime.now() + else: + lease_expire_time = datetime.now() + + return cls( + task_id=str(data['taskId']), + task_type=TaskType(data['taskType']), + priority=data.get('priority', 0), + lease_expire_time=lease_expire_time, + payload=data.get('payload', {}) + ) + + def get_job_id(self) -> str: + """获取作业 ID""" + return str(self.payload.get('jobId', '')) + + def get_segment_id(self) -> Optional[str]: + """获取片段 ID(如果有)""" + segment_id = self.payload.get('segmentId') + return str(segment_id) if segment_id else None + + def get_plan_segment_index(self) -> int: + """获取计划片段索引""" + return int(self.payload.get('planSegmentIndex', 0)) + + def get_duration_ms(self) -> int: + """获取时长(毫秒)""" + return int(self.payload.get('durationMs', 5000)) + + def get_material_url(self) -> Optional[str]: + """获取素材 URL""" + return self.payload.get('boundMaterialUrl') or self.payload.get('sourceRef') + + def get_render_spec(self) -> RenderSpec: + """获取渲染规格""" + return RenderSpec.from_dict(self.payload.get('renderSpec')) + + def get_output_spec(self) -> OutputSpec: + """获取输出规格""" + return OutputSpec.from_dict(self.payload.get('output')) + + def get_bgm_url(self) -> Optional[str]: + """获取 BGM URL""" + return self.payload.get('bgmUrl') + + def get_total_duration_ms(self) -> int: + """获取总时长(毫秒)""" + return int(self.payload.get('totalDurationMs', 0)) + + def get_segments(self) -> List[Dict]: + """获取片段列表""" + return self.payload.get('segments', []) + + 
def get_audio_profile(self) -> AudioProfile: + """获取音频配置""" + return AudioProfile.from_dict(self.payload.get('audioProfile')) + + def get_video_url(self) -> Optional[str]: + """获取视频 URL(用于 PACKAGE_SEGMENT_TS)""" + return self.payload.get('videoUrl') + + def get_audio_url(self) -> Optional[str]: + """获取音频 URL(用于 PACKAGE_SEGMENT_TS)""" + return self.payload.get('audioUrl') + + def get_start_time_ms(self) -> int: + """获取开始时间(毫秒)""" + return int(self.payload.get('startTimeMs', 0)) + + def get_m3u8_url(self) -> Optional[str]: + """获取 m3u8 URL(用于 FINALIZE_MP4)""" + return self.payload.get('m3u8Url') + + def get_ts_list(self) -> List[str]: + """获取 TS 列表(用于 FINALIZE_MP4)""" + return self.payload.get('tsList', []) diff --git a/handlers/__init__.py b/handlers/__init__.py new file mode 100644 index 0000000..b905644 --- /dev/null +++ b/handlers/__init__.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +""" +任务处理器层 + +包含各种任务类型的具体处理器实现。 +""" + +from handlers.base import BaseHandler +from handlers.render_video import RenderSegmentVideoHandler +from handlers.prepare_audio import PrepareJobAudioHandler +from handlers.package_ts import PackageSegmentTsHandler +from handlers.finalize_mp4 import FinalizeMp4Handler + +__all__ = [ + 'BaseHandler', + 'RenderSegmentVideoHandler', + 'PrepareJobAudioHandler', + 'PackageSegmentTsHandler', + 'FinalizeMp4Handler', +] diff --git a/handlers/base.py b/handlers/base.py new file mode 100644 index 0000000..064e8ed --- /dev/null +++ b/handlers/base.py @@ -0,0 +1,280 @@ +# -*- coding: utf-8 -*- +""" +任务处理器基类 + +提供所有处理器共用的基础功能。 +""" + +import os +import logging +import shutil +import tempfile +import subprocess +from abc import ABC +from typing import Optional, List, TYPE_CHECKING + +from core.handler import TaskHandler +from domain.task import Task +from domain.result import TaskResult, ErrorCode +from domain.config import WorkerConfig +from util import oss +from util.ffmpeg import subprocess_args + +if TYPE_CHECKING: + from services.api_client import 
APIClientV2 + +logger = logging.getLogger(__name__) + + +# v2 统一视频编码参数(来自集成文档) +VIDEO_ENCODE_ARGS = [ + '-c:v', 'libx264', + '-preset', 'medium', + '-profile:v', 'main', + '-level', '4.0', + '-crf', '23', + '-pix_fmt', 'yuv420p', +] + +# v2 统一音频编码参数 +AUDIO_ENCODE_ARGS = [ + '-c:a', 'aac', + '-b:a', '128k', + '-ar', '48000', + '-ac', '2', +] + + +class BaseHandler(TaskHandler, ABC): + """ + 任务处理器基类 + + 提供所有处理器共用的基础功能,包括: + - 临时目录管理 + - 文件下载/上传 + - FFmpeg 命令执行 + - 日志记录 + """ + + def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'): + """ + 初始化处理器 + + Args: + config: Worker 配置 + api_client: API 客户端 + """ + self.config = config + self.api_client = api_client + + def before_handle(self, task: Task) -> None: + """处理前钩子""" + logger.debug(f"[task:{task.task_id}] Before handle: {task.task_type.value}") + + def after_handle(self, task: Task, result: TaskResult) -> None: + """处理后钩子""" + status = "success" if result.success else "failed" + logger.debug(f"[task:{task.task_id}] After handle: {status}") + + def create_work_dir(self, task_id: str = None) -> str: + """ + 创建临时工作目录 + + Args: + task_id: 任务 ID(用于目录命名) + + Returns: + 工作目录路径 + """ + # 确保临时根目录存在 + os.makedirs(self.config.temp_dir, exist_ok=True) + + # 创建唯一的工作目录 + prefix = f"task_{task_id}_" if task_id else "task_" + work_dir = tempfile.mkdtemp(dir=self.config.temp_dir, prefix=prefix) + + logger.debug(f"Created work directory: {work_dir}") + return work_dir + + def cleanup_work_dir(self, work_dir: str) -> None: + """ + 清理临时工作目录 + + Args: + work_dir: 工作目录路径 + """ + if not work_dir or not os.path.exists(work_dir): + return + + try: + shutil.rmtree(work_dir) + logger.debug(f"Cleaned up work directory: {work_dir}") + except Exception as e: + logger.warning(f"Failed to cleanup work directory {work_dir}: {e}") + + def download_file(self, url: str, dest: str, timeout: int = None) -> bool: + """ + 下载文件 + + Args: + url: 文件 URL + dest: 目标路径 + timeout: 超时时间(秒) + + Returns: + 是否成功 + """ + if timeout is None: + timeout = 
self.config.download_timeout + + try: + result = oss.download_from_oss(url, dest) + if result: + file_size = os.path.getsize(dest) if os.path.exists(dest) else 0 + logger.debug(f"Downloaded: {url} -> {dest} ({file_size} bytes)") + return result + except Exception as e: + logger.error(f"Download failed: {url} -> {e}") + return False + + def upload_file( + self, + task_id: str, + file_type: str, + file_path: str, + file_name: str = None + ) -> Optional[str]: + """ + 上传文件并返回访问 URL + + Args: + task_id: 任务 ID + file_type: 文件类型(video/audio/ts/mp4) + file_path: 本地文件路径 + file_name: 文件名(可选) + + Returns: + 访问 URL,失败返回 None + """ + # 获取上传 URL + upload_info = self.api_client.get_upload_url(task_id, file_type, file_name) + if not upload_info: + logger.error(f"[task:{task_id}] Failed to get upload URL") + return None + + upload_url = upload_info.get('uploadUrl') + access_url = upload_info.get('accessUrl') + + if not upload_url: + logger.error(f"[task:{task_id}] Invalid upload URL response") + return None + + # 上传文件 + try: + result = oss.upload_to_oss(upload_url, file_path) + if result: + file_size = os.path.getsize(file_path) + logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)") + return access_url + else: + logger.error(f"[task:{task_id}] Upload failed: {file_path}") + return None + except Exception as e: + logger.error(f"[task:{task_id}] Upload error: {e}") + return None + + def run_ffmpeg( + self, + cmd: List[str], + task_id: str, + timeout: int = None + ) -> bool: + """ + 执行 FFmpeg 命令 + + Args: + cmd: FFmpeg 命令参数列表 + task_id: 任务 ID(用于日志) + timeout: 超时时间(秒) + + Returns: + 是否成功 + """ + if timeout is None: + timeout = self.config.ffmpeg_timeout + + # 日志记录命令(限制长度) + cmd_str = ' '.join(cmd) + if len(cmd_str) > 500: + cmd_str = cmd_str[:500] + '...' 
+ logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}") + + try: + result = subprocess.run( + cmd, + capture_output=True, + timeout=timeout, + **subprocess_args(False) + ) + + if result.returncode != 0: + stderr = result.stderr.decode('utf-8', errors='replace')[:1000] + logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}") + return False + + return True + + except subprocess.TimeoutExpired: + logger.error(f"[task:{task_id}] FFmpeg timeout after {timeout}s") + return False + except Exception as e: + logger.error(f"[task:{task_id}] FFmpeg error: {e}") + return False + + def probe_duration(self, file_path: str) -> Optional[float]: + """ + 探测媒体文件时长 + + Args: + file_path: 文件路径 + + Returns: + 时长(秒),失败返回 None + """ + try: + from util.ffmpeg import probe_video_info + _, _, duration = probe_video_info(file_path) + return float(duration) if duration else None + except Exception as e: + logger.warning(f"Failed to probe duration: {file_path} -> {e}") + return None + + def get_file_size(self, file_path: str) -> int: + """ + 获取文件大小 + + Args: + file_path: 文件路径 + + Returns: + 文件大小(字节) + """ + try: + return os.path.getsize(file_path) + except Exception: + return 0 + + def ensure_file_exists(self, file_path: str, min_size: int = 0) -> bool: + """ + 确保文件存在且大小满足要求 + + Args: + file_path: 文件路径 + min_size: 最小大小(字节) + + Returns: + 是否满足要求 + """ + if not os.path.exists(file_path): + return False + return os.path.getsize(file_path) >= min_size diff --git a/handlers/finalize_mp4.py b/handlers/finalize_mp4.py new file mode 100644 index 0000000..fe9f0cf --- /dev/null +++ b/handlers/finalize_mp4.py @@ -0,0 +1,190 @@ +# -*- coding: utf-8 -*- +""" +最终 MP4 合并处理器 + +处理 FINALIZE_MP4 任务,将所有 TS 分片合并为最终可下载的 MP4 文件。 +""" + +import os +import logging +from typing import List + +from handlers.base import BaseHandler +from domain.task import Task, TaskType +from domain.result import TaskResult, ErrorCode + +logger = logging.getLogger(__name__) + + +class 
FinalizeMp4Handler(BaseHandler): + """ + 最终 MP4 合并处理器 + + 职责: + - 下载所有 TS 分片 + - 使用 concat demuxer 合并 + - 产出最终 MP4(remux,不重编码) + - 上传 MP4 产物 + + 关键约束: + - 优先使用 remux(复制流,不重新编码) + - 使用 aac_adtstoasc bitstream filter 处理音频 + """ + + def get_supported_type(self) -> TaskType: + return TaskType.FINALIZE_MP4 + + def handle(self, task: Task) -> TaskResult: + """处理 MP4 合并任务""" + work_dir = self.create_work_dir(task.task_id) + + try: + # 获取 TS 列表 + ts_list = task.get_ts_list() + m3u8_url = task.get_m3u8_url() + + if not ts_list and not m3u8_url: + return TaskResult.fail( + ErrorCode.E_SPEC_INVALID, + "Missing tsList or m3u8Url" + ) + + output_file = os.path.join(work_dir, 'final.mp4') + + if ts_list: + # 方式1:使用 TS 列表 + result = self._process_ts_list(task, work_dir, ts_list, output_file) + else: + # 方式2:使用 m3u8 URL + result = self._process_m3u8(task, work_dir, m3u8_url, output_file) + + if not result.success: + return result + + # 验证输出文件 + if not self.ensure_file_exists(output_file, min_size=4096): + return TaskResult.fail( + ErrorCode.E_FFMPEG_FAILED, + "MP4 output file is missing or too small" + ) + + # 获取文件大小 + file_size = self.get_file_size(output_file) + + # 上传产物 + mp4_url = self.upload_file(task.task_id, 'mp4', output_file) + if not mp4_url: + return TaskResult.fail( + ErrorCode.E_UPLOAD_FAILED, + "Failed to upload MP4" + ) + + return TaskResult.ok({ + 'mp4Url': mp4_url, + 'fileSizeBytes': file_size + }) + + except Exception as e: + logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True) + return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e)) + + finally: + self.cleanup_work_dir(work_dir) + + def _process_ts_list( + self, + task: Task, + work_dir: str, + ts_list: List[str], + output_file: str + ) -> TaskResult: + """ + 使用 TS 列表处理 + + Args: + task: 任务实体 + work_dir: 工作目录 + ts_list: TS URL 列表 + output_file: 输出文件路径 + + Returns: + TaskResult + """ + # 1. 
下载所有 TS 分片 + ts_files = [] + for i, ts_url in enumerate(ts_list): + ts_file = os.path.join(work_dir, f'seg_{i}.ts') + if not self.download_file(ts_url, ts_file): + return TaskResult.fail( + ErrorCode.E_INPUT_UNAVAILABLE, + f"Failed to download TS segment {i}: {ts_url}" + ) + ts_files.append(ts_file) + + logger.info(f"[task:{task.task_id}] Downloaded {len(ts_files)} TS segments") + + # 2. 创建 concat 文件列表 + concat_file = os.path.join(work_dir, 'concat.txt') + with open(concat_file, 'w', encoding='utf-8') as f: + for ts_file in ts_files: + # 路径中的反斜杠需要转义或使用正斜杠 + ts_path = ts_file.replace('\\', '/') + f.write(f"file '{ts_path}'\n") + + # 3. 构建合并命令(remux,不重编码) + cmd = [ + 'ffmpeg', '-y', '-hide_banner', + '-f', 'concat', + '-safe', '0', + '-i', concat_file, + '-c', 'copy', # 复制流,不重编码 + '-bsf:a', 'aac_adtstoasc', # 音频 bitstream filter + output_file + ] + + # 4. 执行 FFmpeg + if not self.run_ffmpeg(cmd, task.task_id): + return TaskResult.fail( + ErrorCode.E_FFMPEG_FAILED, + "MP4 concatenation failed" + ) + + return TaskResult.ok({}) + + def _process_m3u8( + self, + task: Task, + work_dir: str, + m3u8_url: str, + output_file: str + ) -> TaskResult: + """ + 使用 m3u8 URL 处理 + + Args: + task: 任务实体 + work_dir: 工作目录 + m3u8_url: m3u8 URL + output_file: 输出文件路径 + + Returns: + TaskResult + """ + # 构建命令 + cmd = [ + 'ffmpeg', '-y', '-hide_banner', + '-protocol_whitelist', 'file,http,https,tcp,tls', + '-i', m3u8_url, + '-c', 'copy', + '-bsf:a', 'aac_adtstoasc', + output_file + ] + + # 执行 FFmpeg + if not self.run_ffmpeg(cmd, task.task_id): + return TaskResult.fail( + ErrorCode.E_FFMPEG_FAILED, + "MP4 conversion from m3u8 failed" + ) + + return TaskResult.ok({}) diff --git a/handlers/package_ts.py b/handlers/package_ts.py new file mode 100644 index 0000000..d219823 --- /dev/null +++ b/handlers/package_ts.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- +""" +TS 分片封装处理器 + +处理 PACKAGE_SEGMENT_TS 任务,将视频片段和对应时间区间的音频封装为 TS 分片。 +""" + +import os +import logging +from typing import List + +from 
handlers.base import BaseHandler +from domain.task import Task, TaskType +from domain.result import TaskResult, ErrorCode + +logger = logging.getLogger(__name__) + + +class PackageSegmentTsHandler(BaseHandler): + """ + TS 分片封装处理器 + + 职责: + - 下载视频片段 + - 下载全局音频 + - 截取对应时间区间的音频 + - 封装为 TS 分片 + - 上传 TS 产物 + + 关键约束: + - TS 必须包含音视频同轨 + - 使用 output_ts_offset 保证时间戳连续 + - 输出 extinfDurationSec 供 m3u8 使用 + """ + + def get_supported_type(self) -> TaskType: + return TaskType.PACKAGE_SEGMENT_TS + + def handle(self, task: Task) -> TaskResult: + """处理 TS 封装任务""" + work_dir = self.create_work_dir(task.task_id) + + try: + # 解析参数 + video_url = task.get_video_url() + audio_url = task.get_audio_url() + start_time_ms = task.get_start_time_ms() + duration_ms = task.get_duration_ms() + + if not video_url: + return TaskResult.fail( + ErrorCode.E_SPEC_INVALID, + "Missing videoUrl" + ) + + if not audio_url: + return TaskResult.fail( + ErrorCode.E_SPEC_INVALID, + "Missing audioUrl" + ) + + # 计算时间参数 + start_sec = start_time_ms / 1000.0 + duration_sec = duration_ms / 1000.0 + + # 1. 下载视频片段 + video_file = os.path.join(work_dir, 'video.mp4') + if not self.download_file(video_url, video_file): + return TaskResult.fail( + ErrorCode.E_INPUT_UNAVAILABLE, + f"Failed to download video: {video_url}" + ) + + # 2. 下载全局音频 + audio_file = os.path.join(work_dir, 'audio.aac') + if not self.download_file(audio_url, audio_file): + return TaskResult.fail( + ErrorCode.E_INPUT_UNAVAILABLE, + f"Failed to download audio: {audio_url}" + ) + + # 3. 构建 TS 封装命令 + output_file = os.path.join(work_dir, 'segment.ts') + cmd = self._build_command( + video_file=video_file, + audio_file=audio_file, + output_file=output_file, + start_sec=start_sec, + duration_sec=duration_sec + ) + + # 4. 执行 FFmpeg + if not self.run_ffmpeg(cmd, task.task_id): + return TaskResult.fail( + ErrorCode.E_FFMPEG_FAILED, + "TS packaging failed" + ) + + # 5. 
验证输出文件 + if not self.ensure_file_exists(output_file, min_size=1024): + return TaskResult.fail( + ErrorCode.E_FFMPEG_FAILED, + "TS output file is missing or too small" + ) + + # 6. 获取实际时长(用于 EXTINF) + actual_duration = self.probe_duration(output_file) + extinf_duration = actual_duration if actual_duration else duration_sec + + # 7. 上传产物 + ts_url = self.upload_file(task.task_id, 'ts', output_file) + if not ts_url: + return TaskResult.fail( + ErrorCode.E_UPLOAD_FAILED, + "Failed to upload TS" + ) + + return TaskResult.ok({ + 'tsUrl': ts_url, + 'extinfDurationSec': extinf_duration + }) + + except Exception as e: + logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True) + return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e)) + + finally: + self.cleanup_work_dir(work_dir) + + def _build_command( + self, + video_file: str, + audio_file: str, + output_file: str, + start_sec: float, + duration_sec: float + ) -> List[str]: + """ + 构建 TS 封装命令 + + Args: + video_file: 视频文件路径 + audio_file: 音频文件路径 + output_file: 输出文件路径 + start_sec: 开始时间(秒) + duration_sec: 时长(秒) + + Returns: + FFmpeg 命令参数列表 + """ + cmd = [ + 'ffmpeg', '-y', '-hide_banner', + # 视频输入 + '-i', video_file, + # 音频输入(从 start_sec 开始截取 duration_sec) + '-ss', str(start_sec), + '-t', str(duration_sec), + '-i', audio_file, + # 映射流 + '-map', '0:v:0', # 使用第一个输入的视频流 + '-map', '1:a:0', # 使用第二个输入的音频流 + # 复制编码(不重新编码) + '-c:v', 'copy', + '-c:a', 'copy', + # 关键:时间戳偏移,保证整体连续 + '-output_ts_offset', str(start_sec), + # 复用参数 + '-muxdelay', '0', + '-muxpreload', '0', + # 输出格式 + '-f', 'mpegts', + output_file + ] + + return cmd diff --git a/handlers/prepare_audio.py b/handlers/prepare_audio.py new file mode 100644 index 0000000..27d1263 --- /dev/null +++ b/handlers/prepare_audio.py @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +""" +全局音频准备处理器 + +处理 PREPARE_JOB_AUDIO 任务,生成整个视频的连续音频轨道。 +""" + +import os +import logging +from typing import List, Dict, Optional + +from handlers.base import BaseHandler, AUDIO_ENCODE_ARGS +from 
class PrepareJobAudioHandler(BaseHandler):
    """
    Handler for PREPARE_JOB_AUDIO tasks: produces one continuous audio track
    for the whole job.

    Responsibilities:
    - download the global BGM
    - download the per-segment overlay sound effects
    - build the FFmpeg mixing command
    - run the mix
    - upload the resulting audio file

    Key constraints:
    - the global BGM is generated once and spans the whole duration
    - `amix normalize=1` must never be used
    - only overlay tracks get (very short, 5-20 ms) fades
    - the BGM gets no boundary fades
    """

    def get_supported_type(self) -> TaskType:
        """Return the task type served by this handler."""
        return TaskType.PREPARE_JOB_AUDIO

    def handle(self, task: Task) -> TaskResult:
        """
        Process one audio preparation task.

        Args:
            task: The task entity carrying duration, audio profile, BGM URL
                and segment list.

        Returns:
            TaskResult.ok with `audioUrl` on success, otherwise
            TaskResult.fail with the matching error code.
        """
        work_dir = self.create_work_dir(task.task_id)

        try:
            # Parse parameters
            total_duration_ms = task.get_total_duration_ms()
            if total_duration_ms <= 0:
                return TaskResult.fail(
                    ErrorCode.E_SPEC_INVALID,
                    "Invalid totalDurationMs"
                )

            total_duration_sec = total_duration_ms / 1000.0
            audio_profile = task.get_audio_profile()
            bgm_url = task.get_bgm_url()
            segments = task.get_segments()

            # 1. Download the BGM (if any); a failed download degrades to
            #    "no BGM" instead of failing the task.
            bgm_file = None
            if bgm_url:
                bgm_file = os.path.join(work_dir, 'bgm.mp3')
                if not self.download_file(bgm_url, bgm_file):
                    logger.warning(f"[task:{task.task_id}] Failed to download BGM")
                    bgm_file = None

            # 2. Download overlay sound effects; failed ones are skipped.
            sfx_files = []
            for i, seg in enumerate(segments):
                audio_spec_data = seg.get('audioSpecJson')
                if audio_spec_data:
                    audio_spec = AudioSpec.from_dict(audio_spec_data)
                    if audio_spec and audio_spec.audio_url:
                        sfx_file = os.path.join(work_dir, f'sfx_{i}.mp3')
                        if self.download_file(audio_spec.audio_url, sfx_file):
                            sfx_files.append({
                                'file': sfx_file,
                                'spec': audio_spec,
                                'segment': seg
                            })
                        else:
                            logger.warning(f"[task:{task.task_id}] Failed to download SFX {i}")

            # 3. Build the mixing command
            output_file = os.path.join(work_dir, 'audio_full.aac')
            cmd = self._build_audio_command(
                bgm_file=bgm_file,
                sfx_files=sfx_files,
                output_file=output_file,
                total_duration_sec=total_duration_sec,
                audio_profile=audio_profile
            )

            # 4. Run FFmpeg
            if not self.run_ffmpeg(cmd, task.task_id):
                return TaskResult.fail(
                    ErrorCode.E_FFMPEG_FAILED,
                    "Audio mixing failed"
                )

            # 5. Validate the output file
            if not self.ensure_file_exists(output_file, min_size=1024):
                return TaskResult.fail(
                    ErrorCode.E_FFMPEG_FAILED,
                    "Audio output file is missing or too small"
                )

            # 6. Upload the artifact
            audio_url = self.upload_file(task.task_id, 'audio', output_file)
            if not audio_url:
                return TaskResult.fail(
                    ErrorCode.E_UPLOAD_FAILED,
                    "Failed to upload audio"
                )

            return TaskResult.ok({
                'audioUrl': audio_url
            })

        except Exception as e:
            logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
            return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e))

        finally:
            self.cleanup_work_dir(work_dir)

    def _build_audio_command(
        self,
        bgm_file: Optional[str],
        sfx_files: List[Dict],
        output_file: str,
        total_duration_sec: float,
        audio_profile: AudioProfile
    ) -> List[str]:
        """
        Build the FFmpeg command that produces the full-length audio track.

        Three cases are handled: silence only (no BGM, no effects), BGM only,
        and a filter graph mixing a base track (BGM or silence) with the
        delayed overlay effects.

        Args:
            bgm_file: Path of the BGM file, or None.
            sfx_files: Overlay effects; each item has 'file', 'spec' and
                'segment' keys as assembled by `handle`.
            output_file: Path of the audio file to write.
            total_duration_sec: Total duration in seconds.
            audio_profile: Audio settings (sample rate, channel count).

        Returns:
            The FFmpeg command as a list of arguments.
        """
        sample_rate = audio_profile.sample_rate
        channels = audio_profile.channels
        # anullsrc takes a layout name; follow the configured channel count
        # instead of always generating stereo silence.
        silence_layout = 'mono' if str(channels) == '1' else 'stereo'

        # Case 1: no BGM and no overlay effects -> generate silence
        if not bgm_file and not sfx_files:
            return [
                'ffmpeg', '-y', '-hide_banner',
                '-f', 'lavfi',
                '-i', f'anullsrc=r={sample_rate}:cl={silence_layout}',
                '-t', str(total_duration_sec),
                '-c:a', 'aac', '-b:a', '128k',
                '-ar', str(sample_rate), '-ac', str(channels),
                output_file
            ]

        # Case 2: BGM only. Pad with silence so a BGM shorter than the job
        # still yields a track of the full length, as the mixing case does.
        if not sfx_files:
            return [
                'ffmpeg', '-y', '-hide_banner',
                '-i', bgm_file,
                '-af', f'apad=whole_dur={total_duration_sec}',
                '-t', str(total_duration_sec),
                '-c:a', 'aac', '-b:a', '128k',
                '-ar', str(sample_rate), '-ac', str(channels),
                output_file
            ]

        # Case 3: overlay effects on top of BGM (or silence) -> filter graph
        inputs = []
        if bgm_file:
            inputs.extend(['-i', bgm_file])
        for sfx in sfx_files:
            inputs.extend(['-i', sfx['file']])

        filter_parts = []

        # Base track: trimmed and padded BGM, or generated silence
        if bgm_file:
            filter_parts.append(
                f"[0:a]atrim=0:{total_duration_sec},asetpts=PTS-STARTPTS,"
                f"apad=whole_dur={total_duration_sec}[bgm]"
            )
            input_idx = 1
        else:
            filter_parts.append(
                f"anullsrc=r={sample_rate}:cl={silence_layout},"
                f"atrim=0:{total_duration_sec}[bgm]"
            )
            input_idx = 0

        # Overlay effects
        sfx_labels = []
        for i, sfx in enumerate(sfx_files):
            idx = input_idx + i
            spec = sfx['spec']
            seg = sfx['segment']

            # Timing: the effect starts at segment start plus its own delay.
            # Clamp at zero because adelay rejects negative delays.
            start_time_ms = seg.get('startTimeMs', 0)
            duration_ms = seg.get('durationMs', 5000)
            delay_ms = max(0, start_time_ms + spec.delay_ms)
            delay_sec = delay_ms / 1000.0
            duration_sec = duration_ms / 1000.0

            # Fade lengths (expected to be very short, 5-20 ms)
            fade_in_sec = spec.fade_in_ms / 1000.0
            fade_out_sec = spec.fade_out_ms / 1000.0
            # Never start the fade-out before the effect itself starts.
            fade_out_start = max(delay_sec, delay_sec + duration_sec - fade_out_sec)

            volume = spec.volume

            label = f"sfx{i}"
            sfx_labels.append(f"[{label}]")

            # delay + fade in/out + volume; fades apply to overlays only,
            # never to the BGM.
            sfx_filter = (
                f"[{idx}:a]"
                f"adelay={int(delay_ms)}|{int(delay_ms)},"
                f"afade=t=in:st={delay_sec}:d={fade_in_sec},"
                f"afade=t=out:st={fade_out_start}:d={fade_out_sec},"
                f"volume={volume}"
                f"[{label}]"
            )
            filter_parts.append(sfx_filter)

        # Mix. normalize=0 disables level normalization (required);
        # dropout_transition=0 avoids a level ramp when an input ends;
        # duration=first keeps the length of the base track.
        mix_inputs = "[bgm]" + "".join(sfx_labels)
        num_inputs = 1 + len(sfx_files)
        filter_parts.append(
            f"{mix_inputs}amix=inputs={num_inputs}:duration=first:"
            f"dropout_transition=0:normalize=0[out]"
        )

        filter_complex = ';'.join(filter_parts)

        cmd = ['ffmpeg', '-y', '-hide_banner'] + inputs + [
            '-filter_complex', filter_complex,
            '-map', '[out]',
            '-c:a', 'aac', '-b:a', '128k',
            '-ar', str(sample_rate), '-ac', str(channels),
            output_file
        ]

        return cmd
class RenderSegmentVideoHandler(BaseHandler):
    """
    Handler for RENDER_SEGMENT_VIDEO tasks: renders the source material into
    a video segment that matches the output spec.

    Responsibilities:
    - download the material file
    - download the LUT file (if any)
    - download the overlay image (if any)
    - build the FFmpeg render command
    - run the render
    - upload the artifact
    """

    def get_supported_type(self) -> TaskType:
        """Return the task type served by this handler."""
        return TaskType.RENDER_SEGMENT_VIDEO

    def handle(self, task: Task) -> TaskResult:
        """
        Process one video render task.

        Args:
            task: The task entity carrying material URL, render spec,
                output spec and target duration.

        Returns:
            TaskResult.ok with `videoUrl` and `actualDurationMs` on success,
            otherwise TaskResult.fail with the matching error code.
        """
        work_dir = self.create_work_dir(task.task_id)

        try:
            # Parse parameters
            material_url = task.get_material_url()
            if not material_url:
                return TaskResult.fail(
                    ErrorCode.E_SPEC_INVALID,
                    "Missing material URL (boundMaterialUrl or sourceRef)"
                )

            render_spec = task.get_render_spec()
            output_spec = task.get_output_spec()
            duration_ms = task.get_duration_ms()

            # 1. Download the material (mandatory)
            input_file = os.path.join(work_dir, 'input.mp4')
            if not self.download_file(material_url, input_file):
                return TaskResult.fail(
                    ErrorCode.E_INPUT_UNAVAILABLE,
                    f"Failed to download material: {material_url}"
                )

            # 2. Download the LUT (optional; render continues without it)
            lut_file = None
            if render_spec.lut_url:
                lut_file = os.path.join(work_dir, 'lut.cube')
                if not self.download_file(render_spec.lut_url, lut_file):
                    logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it")
                    lut_file = None

            # 3. Download the overlay (optional; render continues without it)
            overlay_file = None
            if render_spec.overlay_url:
                # Pick the local extension from the URL suffix; default PNG
                is_jpeg = render_spec.overlay_url.lower().endswith(('.jpg', '.jpeg'))
                ext = '.jpg' if is_jpeg else '.png'
                overlay_file = os.path.join(work_dir, f'overlay{ext}')
                if not self.download_file(render_spec.overlay_url, overlay_file):
                    logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
                    overlay_file = None

            # 4. Build the FFmpeg command
            output_file = os.path.join(work_dir, 'output.mp4')
            cmd = self._build_command(
                input_file=input_file,
                output_file=output_file,
                render_spec=render_spec,
                output_spec=output_spec,
                duration_ms=duration_ms,
                lut_file=lut_file,
                overlay_file=overlay_file
            )

            # 5. Run FFmpeg
            if not self.run_ffmpeg(cmd, task.task_id):
                return TaskResult.fail(
                    ErrorCode.E_FFMPEG_FAILED,
                    "FFmpeg rendering failed"
                )

            # 6. Validate the output file
            if not self.ensure_file_exists(output_file, min_size=4096):
                return TaskResult.fail(
                    ErrorCode.E_FFMPEG_FAILED,
                    "Output file is missing or too small"
                )

            # 7. Probe the real duration; fall back to the requested one
            actual_duration = self.probe_duration(output_file)
            actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms

            # 8. Upload the artifact
            video_url = self.upload_file(task.task_id, 'video', output_file)
            if not video_url:
                return TaskResult.fail(
                    ErrorCode.E_UPLOAD_FAILED,
                    "Failed to upload video"
                )

            return TaskResult.ok({
                'videoUrl': video_url,
                'actualDurationMs': actual_duration_ms
            })

        except Exception as e:
            logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
            return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e))

        finally:
            self.cleanup_work_dir(work_dir)

    def _build_command(
        self,
        input_file: str,
        output_file: str,
        render_spec: RenderSpec,
        output_spec: OutputSpec,
        duration_ms: int,
        lut_file: Optional[str] = None,
        overlay_file: Optional[str] = None
    ) -> List[str]:
        """
        Build the FFmpeg render command.

        Args:
            input_file: Path of the input material.
            output_file: Path of the file to write.
            render_spec: Render spec (speed, crop, LUT, overlay settings).
            output_spec: Output spec (width, height, fps).
            duration_ms: Target duration in milliseconds.
            lut_file: Path of the LUT file, or None.
            overlay_file: Path of the overlay image, or None.

        Returns:
            The FFmpeg command as a list of arguments.
        """
        cmd = ['ffmpeg', '-y', '-hide_banner']

        # Main input
        cmd.extend(['-i', input_file])

        # Overlay input (becomes input 1 in the filter graph)
        if overlay_file:
            cmd.extend(['-i', overlay_file])

        # Video filter chain
        filters = self._build_video_filters(
            render_spec=render_spec,
            output_spec=output_spec,
            lut_file=lut_file,
            has_overlay=overlay_file is not None
        )

        # With an overlay the chain is a two-input graph (-filter_complex);
        # otherwise it is a simple single-input chain (-vf).
        if overlay_file:
            cmd.extend(['-filter_complex', filters])
        elif filters:
            cmd.extend(['-vf', filters])

        # Encoding parameters shared by all v2 renders
        cmd.extend(VIDEO_ENCODE_ARGS)

        # Frame rate
        fps = output_spec.fps
        cmd.extend(['-r', str(fps)])

        # GOP size: one keyframe every two seconds. -g expects an integer,
        # so round in case the frame rate is fractional.
        gop_size = max(1, int(round(float(fps) * 2)))
        cmd.extend(['-g', str(gop_size)])
        cmd.extend(['-keyint_min', str(gop_size)])

        # Duration
        duration_sec = duration_ms / 1000.0
        cmd.extend(['-t', str(duration_sec)])

        # Video segments carry no audio
        cmd.append('-an')

        cmd.append(output_file)

        return cmd

    def _build_video_filters(
        self,
        render_spec: RenderSpec,
        output_spec: OutputSpec,
        lut_file: Optional[str] = None,
        has_overlay: bool = False
    ) -> str:
        """
        Build the video filter string.

        Args:
            render_spec: Render spec (speed, crop, zoom settings).
            output_spec: Output spec (width, height).
            lut_file: Path of the LUT file, or None.
            has_overlay: Whether an overlay image is supplied as input 1.

        Returns:
            A `-vf` style chain, or a `-filter_complex` graph when
            `has_overlay` is true.
        """
        filters = []
        width = output_spec.width
        height = output_spec.height

        # 1. Speed change: setpts factor is 1 / speed
        speed = float(render_spec.speed) if render_spec.speed else 1.0
        if speed != 1.0 and speed > 0:
            pts_factor = 1.0 / speed
            filters.append(f"setpts={pts_factor}*PTS")

        # 2. LUT color grading
        if lut_file:
            # Use forward slashes and escape ':' so a Windows drive letter
            # is not parsed as a filter option separator.
            lut_path = lut_file.replace('\\', '/').replace(':', '\\:')
            filters.append(f"lut3d='{lut_path}'")

        # 3. Cropping
        if render_spec.crop_enable and render_spec.face_pos:
            # Crop to the output aspect ratio, positioned by the face
            # coordinates given as "fx,fy".
            try:
                fx, fy = map(float, render_spec.face_pos.split(','))
                target_ratio = width / height
                filters.append(
                    f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})':"
                    f"'(iw-min(iw,ih*{target_ratio}))*{fx}':"
                    f"'(ih-min(ih,iw/{target_ratio}))*{fy}'"
                )
            except (ValueError, ZeroDivisionError):
                logger.warning(f"Invalid face position: {render_spec.face_pos}")
        elif render_spec.zoom_cut:
            # Crop to the output aspect ratio (no explicit offsets given)
            target_ratio = width / height
            filters.append(
                f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})'"
            )

        # 4. Scale to fit, then pad to the exact output size
        scale_filter = (
            f"scale={width}:{height}:force_original_aspect_ratio=decrease,"
            f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black"
        )
        filters.append(scale_filter)

        # 5. Final filter string
        if has_overlay:
            # filter_complex form: process input 0, then overlay input 1
            base_filters = ','.join(filters) if filters else 'copy'
            return f"[0:v]{base_filters}[base];[base][1:v]overlay=0:0"
        else:
            return ','.join(filters) if filters else ''
def _setup_signal_handlers(self):
    """Register `_signal_handler` for SIGINT and, where available, SIGTERM."""
    # SIGTERM may be missing on some platforms, hence the hasattr guard
    signal.signal(signal.SIGINT, self._signal_handler)
    if hasattr(signal, 'SIGTERM'):
        signal.signal(signal.SIGTERM, self._signal_handler)

def _signal_handler(self, signum, frame):
    """
    Handle a termination signal by requesting a graceful stop.

    Args:
        signum: Signal number.
        frame: Current stack frame (unused).
    """
    signal_name = signal.Signals(signum).name
    logger.info(f"Received signal {signal_name}, initiating shutdown...")
    self.running = False

def _sleep_while_running(self, seconds):
    """
    Sleep for up to `seconds`, returning early once `self.running` is False.

    A plain `time.sleep` resumes after the signal handler returns, which
    would delay shutdown by the full interval; sleeping in short slices
    lets the loop notice the stop request promptly.

    Args:
        seconds: Maximum time to wait, in seconds.
    """
    deadline = time.monotonic() + seconds
    while self.running:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(remaining, 0.5))

def run(self):
    """Run the heartbeat loop until a stop is requested, then shut down."""
    logger.info("=" * 60)
    logger.info("RenderWorker v2 Starting")
    logger.info("=" * 60)
    logger.info(f"Worker ID: {self.config.worker_id}")
    logger.info(f"API Endpoint: {self.config.api_endpoint}")
    logger.info(f"Max Concurrency: {self.config.max_concurrency}")
    logger.info(f"Heartbeat Interval: {self.config.heartbeat_interval}s")
    logger.info(f"Capabilities: {', '.join(self.config.capabilities)}")
    logger.info(f"Temp Directory: {self.config.temp_dir}")
    logger.info("=" * 60)

    consecutive_errors = 0
    max_consecutive_errors = 10

    while self.running:
        try:
            # Heartbeat: report running tasks and pull newly assigned ones
            current_task_ids = self.task_executor.get_current_task_ids()
            tasks = self.api_client.sync(current_task_ids)

            # Hand new tasks to the executor
            for task in tasks:
                if self.task_executor.submit_task(task):
                    logger.info(f"Submitted task: {task.task_id} ({task.task_type.value})")

            # A successful round resets the error counter
            consecutive_errors = 0

            # Wait for the next heartbeat
            self._sleep_while_running(self.config.heartbeat_interval)

        except KeyboardInterrupt:
            logger.info("Keyboard interrupt received")
            self.running = False
        except Exception as e:
            consecutive_errors += 1
            logger.error(f"Worker loop error ({consecutive_errors}/{max_consecutive_errors}): {e}", exc_info=True)

            # Back off longer after too many consecutive errors
            if consecutive_errors >= max_consecutive_errors:
                logger.error("Too many consecutive errors, waiting 30 seconds...")
                self._sleep_while_running(30)
                consecutive_errors = 0
            else:
                self._sleep_while_running(5)

    # Graceful shutdown
    self._shutdown()
class APIClientV2:
    """
    v2 API client.

    Performs all HTTP communication with the render server.
    """

    def __init__(self, config: WorkerConfig):
        """
        Initialize the API client.

        Args:
            config: Worker configuration.
        """
        self.config = config
        self.base_url = config.api_endpoint.rstrip('/')
        self.access_key = config.access_key
        self.worker_id = config.worker_id
        self.session = requests.Session()

        # Default request headers
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })

        # Results of the static environment probes. They are sent with every
        # heartbeat, so they are cached instead of spawning subprocesses on
        # each sync.
        self._ffmpeg_version: Optional[str] = None
        self._codec_info: Optional[str] = None
        self._gpu_info: Optional[str] = None
        self._gpu_probed = False

    def sync(self, current_task_ids: List[str]) -> List[Task]:
        """
        Send a heartbeat and pull newly assigned tasks.

        Args:
            current_task_ids: IDs of the tasks currently being executed.

        Returns:
            List[Task]: Newly assigned tasks; empty on any failure.
        """
        url = f"{self.base_url}/render/v2/worker/sync"

        # The server expects integer task IDs; non-numeric IDs are dropped
        task_ids_int = [int(tid) for tid in current_task_ids if str(tid).isdigit()]

        payload = {
            'accessKey': self.access_key,
            'workerId': self.worker_id,
            'capabilities': self.config.capabilities,
            'maxConcurrency': self.config.max_concurrency,
            'currentTaskCount': len(current_task_ids),
            'currentTaskIds': task_ids_int,
            'ffmpegVersion': self._get_ffmpeg_version(),
            'codecInfo': self._get_codec_info(),
            'systemInfo': self._get_system_info()
        }

        try:
            resp = self.session.post(url, json=payload, timeout=10)
            resp.raise_for_status()
            data = resp.json()

            if data.get('code') != 200:
                logger.warning(f"Sync failed: {data.get('message')}")
                return []

            # Parse the task list; tolerate JSON null for `data` or `tasks`
            task_list = (data.get('data') or {}).get('tasks') or []
            tasks = []
            for task_data in task_list:
                try:
                    task = Task.from_dict(task_data)
                    tasks.append(task)
                except Exception as e:
                    # One malformed task must not discard the others
                    logger.error(f"Failed to parse task: {e}")

            if tasks:
                logger.info(f"Received {len(tasks)} new tasks")

            return tasks

        except requests.exceptions.Timeout:
            logger.warning("Sync timeout")
            return []
        except requests.exceptions.RequestException as e:
            logger.error(f"Sync request error: {e}")
            return []
        except Exception as e:
            logger.error(f"Sync error: {e}")
            return []

    def report_start(self, task_id: str) -> bool:
        """
        Report that a task has started.

        Args:
            task_id: Task ID.

        Returns:
            bool: True when the server answered with HTTP 200.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}/start"

        try:
            resp = self.session.post(
                url,
                json={'workerId': self.worker_id},
                timeout=10
            )
            if resp.status_code == 200:
                logger.debug(f"[task:{task_id}] Start reported")
                return True
            else:
                logger.warning(f"[task:{task_id}] Report start failed: {resp.status_code}")
                return False
        except Exception as e:
            logger.error(f"[task:{task_id}] Report start error: {e}")
            return False

    def report_success(self, task_id: str, result: Dict[str, Any]) -> bool:
        """
        Report that a task succeeded.

        Args:
            task_id: Task ID.
            result: Task result data.

        Returns:
            bool: True when the server answered with HTTP 200.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}/success"

        try:
            resp = self.session.post(
                url,
                json={
                    'workerId': self.worker_id,
                    'result': result
                },
                timeout=10
            )
            if resp.status_code == 200:
                logger.debug(f"[task:{task_id}] Success reported")
                return True
            else:
                logger.warning(f"[task:{task_id}] Report success failed: {resp.status_code}")
                return False
        except Exception as e:
            logger.error(f"[task:{task_id}] Report success error: {e}")
            return False

    def report_fail(self, task_id: str, error_code: str, error_message: str) -> bool:
        """
        Report that a task failed.

        Args:
            task_id: Task ID.
            error_code: Error code.
            error_message: Error message (truncated to 1000 characters).

        Returns:
            bool: True when the server answered with HTTP 200.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}/fail"

        try:
            resp = self.session.post(
                url,
                json={
                    'workerId': self.worker_id,
                    'errorCode': error_code,
                    'errorMessage': error_message[:1000]  # cap the length
                },
                timeout=10
            )
            if resp.status_code == 200:
                logger.debug(f"[task:{task_id}] Failure reported")
                return True
            else:
                logger.warning(f"[task:{task_id}] Report fail failed: {resp.status_code}")
                return False
        except Exception as e:
            logger.error(f"[task:{task_id}] Report fail error: {e}")
            return False

    def get_upload_url(
        self,
        task_id: str,
        file_type: str,
        file_name: Optional[str] = None
    ) -> Optional[Dict[str, str]]:
        """
        Request an upload URL for a task artifact.

        Args:
            task_id: Task ID.
            file_type: File type (video/audio/ts/mp4).
            file_name: File name (optional).

        Returns:
            The `data` object of the response (documented to contain
            uploadUrl and accessUrl), or None on failure.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}/uploadUrl"

        payload = {'fileType': file_type}
        if file_name:
            payload['fileName'] = file_name

        try:
            resp = self.session.post(url, json=payload, timeout=10)
            if resp.status_code == 200:
                data = resp.json()
                if data.get('code') == 200:
                    return data.get('data')
            logger.warning(f"[task:{task_id}] Get upload URL failed: {resp.status_code}")
            return None
        except Exception as e:
            logger.error(f"[task:{task_id}] Get upload URL error: {e}")
            return None

    def extend_lease(self, task_id: str, extension: Optional[int] = None) -> bool:
        """
        Extend the lease of a task.

        Args:
            task_id: Task ID.
            extension: Seconds to extend by (defaults to the configured
                `lease_extension_duration`).

        Returns:
            bool: True when the server answered with HTTP 200.
        """
        if extension is None:
            extension = self.config.lease_extension_duration

        url = f"{self.base_url}/render/v2/task/{task_id}/extend-lease"

        try:
            resp = self.session.post(
                url,
                params={
                    'workerId': self.worker_id,
                    'extension': extension
                },
                timeout=10
            )
            if resp.status_code == 200:
                logger.debug(f"[task:{task_id}] Lease extended by {extension}s")
                return True
            else:
                logger.warning(f"[task:{task_id}] Extend lease failed: {resp.status_code}")
                return False
        except Exception as e:
            logger.error(f"[task:{task_id}] Extend lease error: {e}")
            return False

    def get_task_info(self, task_id: str) -> Optional[Dict]:
        """
        Fetch the details of a task.

        Args:
            task_id: Task ID.

        Returns:
            The task detail dictionary, or None on failure.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}"

        try:
            resp = self.session.get(url, timeout=10)
            if resp.status_code == 200:
                data = resp.json()
                if data.get('code') == 200:
                    return data.get('data')
            return None
        except Exception as e:
            logger.error(f"[task:{task_id}] Get task info error: {e}")
            return None

    def _get_ffmpeg_version(self) -> str:
        """Return the FFmpeg version string, or 'unknown' if it cannot be read."""
        if self._ffmpeg_version is not None:
            return self._ffmpeg_version

        version = 'unknown'
        try:
            result = subprocess.run(
                ['ffmpeg', '-version'],
                capture_output=True,
                text=True,
                timeout=5
            )
            # The version is the token following the word "version" on the
            # first output line.
            parts = result.stdout.split('\n')[0].split()
            for i, part in enumerate(parts):
                if part == 'version' and i + 1 < len(parts):
                    version = parts[i + 1]
                    break
        except Exception:
            return 'unknown'

        # Cache real answers only, so a transient failure is retried later
        if version != 'unknown':
            self._ffmpeg_version = version
        return version

    def _get_codec_info(self) -> str:
        """Return a comma-separated list of detected codecs, or 'unknown'."""
        if self._codec_info is not None:
            return self._codec_info

        try:
            result = subprocess.run(
                ['ffmpeg', '-codecs'],
                capture_output=True,
                text=True,
                timeout=5
            )
            # Look for the commonly used codecs in the listing
            codecs = []
            output = result.stdout
            if 'libx264' in output:
                codecs.append('libx264')
            if 'libx265' in output or 'hevc' in output:
                codecs.append('libx265')
            if 'aac' in output:
                codecs.append('aac')
            if 'libfdk_aac' in output:
                codecs.append('libfdk_aac')
        except Exception:
            return 'unknown'

        if not codecs:
            return 'unknown'
        # Cache real answers only, so a transient failure is retried later
        self._codec_info = ', '.join(codecs)
        return self._codec_info

    def _get_system_info(self) -> Dict[str, Any]:
        """Return a snapshot of system information; empty dict on failure."""
        try:
            import platform
            import psutil

            info = {
                'os': platform.system(),
                'cpu': f"{psutil.cpu_count()} cores",
                'memory': f"{psutil.virtual_memory().total // (1024**3)}GB",
                'cpuUsage': f"{psutil.cpu_percent()}%",
                'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB"
            }

            # GPU name is optional
            gpu_info = self._get_gpu_info()
            if gpu_info:
                info['gpu'] = gpu_info

            return info
        except Exception:
            return {}

    def _get_gpu_info(self) -> Optional[str]:
        """Return the name of the first GPU reported by nvidia-smi, or None."""
        # Probe once: repeating it on every heartbeat only costs a
        # subprocess spawn.
        if self._gpu_probed:
            return self._gpu_info

        gpu_name = None
        try:
            result = subprocess.run(
                ['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'],
                capture_output=True,
                text=True,
                timeout=5
            )
            if result.returncode == 0:
                gpu_name = result.stdout.strip().split('\n')[0]
        except Exception:
            # Best effort: no nvidia-smi simply means no GPU info
            pass

        self._gpu_info = gpu_name
        self._gpu_probed = True
        return gpu_name

    def close(self):
        """Close the underlying HTTP session."""
        self.session.close()
class LeaseService:
    """
    Background lease renewal for a single task.

    A daemon thread periodically asks the API client to extend the task's
    lease until the service is stopped. Usable as a context manager.
    """

    def __init__(
        self,
        api_client: 'APIClientV2',
        task_id: str,
        interval: int = 60,
        extension: int = 300
    ):
        """
        Set up the service without starting it.

        Args:
            api_client: API client used to extend the lease.
            task_id: ID of the task whose lease is renewed.
            interval: Seconds between renewals (default 60).
            extension: Seconds added by each renewal (default 300).
        """
        self._stop_event = threading.Event()
        self.thread: threading.Thread = None
        self.running = False
        self.extension = extension
        self.interval = interval
        self.task_id = task_id
        self.api_client = api_client

    def start(self):
        """Start the renewal thread; warn and do nothing if already running."""
        if self.running:
            logger.warning(f"[task:{self.task_id}] Lease service already running")
            return

        self.running = True
        self._stop_event.clear()

        worker = threading.Thread(
            target=self._run,
            name=f"LeaseService-{self.task_id}",
            daemon=True
        )
        self.thread = worker
        worker.start()
        logger.debug(f"[task:{self.task_id}] Lease service started (interval={self.interval}s)")

    def stop(self):
        """Stop the renewal thread, waiting up to 5 seconds for it to exit."""
        if not self.running:
            return

        self.running = False
        self._stop_event.set()

        worker = self.thread
        if worker and worker.is_alive():
            worker.join(timeout=5)

        logger.debug(f"[task:{self.task_id}] Lease service stopped")

    def _run(self):
        """Thread body: renew once per interval until asked to stop."""
        while self.running:
            # wait() returns True only when the stop event was set
            stop_requested = self._stop_event.wait(timeout=self.interval)
            if stop_requested:
                return
            if not self.running:
                continue
            self._extend_lease()

    def _extend_lease(self):
        """Perform one renewal; failures are logged, never raised."""
        try:
            if self.api_client.extend_lease(self.task_id, self.extension):
                logger.debug(f"[task:{self.task_id}] Lease extended by {self.extension}s")
            else:
                logger.warning(f"[task:{self.task_id}] Failed to extend lease")
        except Exception as e:
            logger.warning(f"[task:{self.task_id}] Lease extension error: {e}")

    def __enter__(self):
        """Start the service and return it."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the service; exceptions from the block are not suppressed."""
        self.stop()
        return False
class TaskExecutor:
    """
    Task executor.

    Schedules and runs tasks concurrently:
    - registers and looks up task handlers
    - tracks the tasks currently executing
    - coordinates lease renewal
    - reports execution results
    """

    def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
        """
        Initialize the executor.

        Args:
            config: Worker configuration.
            api_client: API client.
        """
        self.config = config
        self.api_client = api_client

        # Handler registry, keyed by task type
        self.handlers: Dict[TaskType, TaskHandler] = {}

        # Tasks currently tracked, keyed by task ID
        self.current_tasks: Dict[str, Task] = {}
        self.current_futures: Dict[str, Future] = {}

        # Worker pool
        self.executor = ThreadPoolExecutor(
            max_workers=config.max_concurrency,
            thread_name_prefix="TaskWorker"
        )

        # Guards current_tasks / current_futures
        self.lock = threading.Lock()

        self._register_handlers()

    def _register_handlers(self):
        """Instantiate every task handler and register it by its task type."""
        # Imported here to avoid a circular dependency at module import time
        from handlers.render_video import RenderSegmentVideoHandler
        from handlers.prepare_audio import PrepareJobAudioHandler
        from handlers.package_ts import PackageSegmentTsHandler
        from handlers.finalize_mp4 import FinalizeMp4Handler

        handlers = [
            RenderSegmentVideoHandler(self.config, self.api_client),
            PrepareJobAudioHandler(self.config, self.api_client),
            PackageSegmentTsHandler(self.config, self.api_client),
            FinalizeMp4Handler(self.config, self.api_client),
        ]

        for handler in handlers:
            task_type = handler.get_supported_type()
            self.handlers[task_type] = handler
            logger.debug(f"Registered handler for {task_type.value}")

    def get_current_task_ids(self) -> list:
        """
        Return the IDs of the tasks currently tracked.

        Returns:
            List of task IDs.
        """
        with self.lock:
            return list(self.current_tasks.keys())

    def get_current_task_count(self) -> int:
        """
        Return the number of tasks currently tracked.

        Returns:
            Task count.
        """
        with self.lock:
            return len(self.current_tasks)

    def can_accept_task(self) -> bool:
        """
        Tell whether the executor is below its concurrency limit.

        Returns:
            True when another task can be accepted.
        """
        return self.get_current_task_count() < self.config.max_concurrency

    def submit_task(self, task: Task) -> bool:
        """
        Submit a task to the worker pool.

        Args:
            task: Task entity.

        Returns:
            True when the task was submitted; False when it is already
            tracked, has no handler, or the pool rejected it.
        """
        with self.lock:
            # Skip tasks that are already tracked
            if task.task_id in self.current_tasks:
                logger.warning(f"[task:{task.task_id}] Task already running, skipping")
                return False

            # A handler must exist for the task type
            if task.task_type not in self.handlers:
                logger.error(f"[task:{task.task_id}] No handler for type: {task.task_type.value}")
                return False

            # Track the task before submitting it
            self.current_tasks[task.task_id] = task

            try:
                future = self.executor.submit(self._process_task, task)
            except RuntimeError as e:
                # The pool refuses work once shut down; undo the tracking so
                # the task is not reported as running forever.
                self.current_tasks.pop(task.task_id, None)
                logger.error(f"[task:{task.task_id}] Submit rejected: {e}")
                return False

            self.current_futures[task.task_id] = future

        logger.info(f"[task:{task.task_id}] Submitted ({task.task_type.value})")
        return True

    def _process_task(self, task: Task):
        """
        Process one task (runs on a pool thread).

        Args:
            task: Task entity.
        """
        task_id = task.task_id
        logger.info(f"[task:{task_id}] Starting {task.task_type.value}")

        lease_service = LeaseService(
            self.api_client,
            task_id,
            interval=self.config.lease_extension_threshold,
            extension=self.config.lease_extension_duration
        )

        try:
            # Started inside the try so that a failure to start still runs
            # the cleanup in `finally`.
            lease_service.start()

            # Report that the task has started
            self.api_client.report_start(task_id)

            handler = self.handlers.get(task.task_type)
            if not handler:
                raise ValueError(f"No handler for task type: {task.task_type}")

            # Pre-hook, the task itself, post-hook
            handler.before_handle(task)
            result = handler.handle(task)
            handler.after_handle(task, result)

            # Report the outcome
            if result.success:
                if not self.api_client.report_success(task_id, result.data):
                    logger.warning(f"[task:{task_id}] Success could not be reported to server")
                logger.info(f"[task:{task_id}] Completed successfully")
            else:
                error_code = result.error_code.value if result.error_code else 'E_UNKNOWN'
                if not self.api_client.report_fail(task_id, error_code, result.error_message or ''):
                    logger.warning(f"[task:{task_id}] Failure could not be reported to server")
                logger.error(f"[task:{task_id}] Failed: {result.error_message}")

        except Exception as e:
            logger.error(f"[task:{task_id}] Exception: {e}", exc_info=True)
            if not self.api_client.report_fail(task_id, 'E_UNKNOWN', str(e)):
                logger.warning(f"[task:{task_id}] Failure could not be reported to server")

        finally:
            # Stop lease renewal (no-op if it never started)
            lease_service.stop()

            # Drop the task from the tracking tables
            with self.lock:
                self.current_tasks.pop(task_id, None)
                self.current_futures.pop(task_id, None)

    def shutdown(self, wait: bool = True):
        """
        Shut the executor down.

        Args:
            wait: Whether to wait for running tasks to finish.
        """
        logger.info("Shutting down task executor...")

        self.executor.shutdown(wait=wait)

        # Clear the tracking tables
        with self.lock:
            self.current_tasks.clear()
            self.current_futures.clear()

        logger.info("Task executor shutdown complete")

    def get_handler(self, task_type: TaskType) -> Optional[TaskHandler]:
        """
        Look up the handler for a task type.

        Args:
            task_type: Task type.

        Returns:
            The handler instance, or None when none is registered.
        """
        return self.handlers.get(task_type)
+ 获取系统信息(v1 格式) + + Returns: + dict: 系统信息字典 """ info = { 'version': SOFTWARE_VERSION, @@ -22,3 +32,65 @@ def get_sys_info(): 'support_feature': SUPPORT_FEATURE } return info + + +def get_sys_info_v2(): + """ + 获取系统信息(v2 格式) + + Returns: + dict: v2 API 所需的系统信息字典 + """ + mem = psutil.virtual_memory() + + info = { + 'os': platform.system(), + 'cpu': f"{os.cpu_count()} cores", + 'memory': f"{mem.total // (1024**3)}GB", + 'cpuUsage': f"{psutil.cpu_percent()}%", + 'memoryAvailable': f"{mem.available // (1024**3)}GB", + 'platform': platform.system(), + 'pythonVersion': platform.python_version(), + } + + # 尝试获取 GPU 信息 + gpu_info = _get_gpu_info() + if gpu_info: + info['gpu'] = gpu_info + + return info + + +def get_capabilities(): + """ + 获取 Worker 支持的能力列表 + + Returns: + list: 能力列表 + """ + return V2_DEFAULT_CAPABILITIES.copy() + + +def _get_gpu_info(): + """ + 尝试获取 GPU 信息 + + Returns: + str: GPU 信息,失败返回 None + """ + try: + import subprocess + # 尝试使用 nvidia-smi + result = subprocess.run( + ['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'], + capture_output=True, + text=True, + timeout=5 + ) + if result.returncode == 0: + gpu_name = result.stdout.strip().split('\n')[0] + return gpu_name + except Exception: + pass + + return None