# -*- coding: utf-8 -*-
"""
Task domain model.

Defines the task types, the task entity, and the render / output / audio
specification data structures.
"""

from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta


# Lease granted locally when the server-supplied lease time cannot be parsed.
_FALLBACK_LEASE = timedelta(minutes=5)


class TaskType(Enum):
    """Task type enumeration."""

    RENDER_SEGMENT_VIDEO = "RENDER_SEGMENT_VIDEO"  # Render a video segment
    PREPARE_JOB_AUDIO = "PREPARE_JOB_AUDIO"        # Generate the job-wide audio
    PACKAGE_SEGMENT_TS = "PACKAGE_SEGMENT_TS"      # Package a TS segment
    FINALIZE_MP4 = "FINALIZE_MP4"                  # Produce the final MP4


class TaskStatus(Enum):
    """Task status enumeration."""

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


@dataclass
class RenderSpec:
    """
    Render specification.

    Used by RENDER_SEGMENT_VIDEO tasks to carry video rendering parameters.
    Each field mirrors the camelCase key of the same name in the payload's
    ``renderSpec`` object (see :meth:`from_dict`).
    """

    crop_enable: bool = False
    crop_size: Optional[str] = None
    speed: str = "1.0"
    lut_url: Optional[str] = None
    overlay_url: Optional[str] = None
    effects: Optional[str] = None
    zoom_cut: bool = False
    video_crop: Optional[str] = None
    face_pos: Optional[str] = None
    transitions: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Optional[Dict]) -> 'RenderSpec':
        """Build a RenderSpec from a camelCase dictionary.

        Args:
            data: The ``renderSpec`` dictionary, or ``None`` / empty.

        Returns:
            A RenderSpec; all-default when ``data`` is ``None`` or empty.
        """
        if not data:
            return cls()

        # An explicit null speed must fall back to the default instead of
        # being stringified to the literal 'None'.
        speed = data.get('speed')

        return cls(
            crop_enable=data.get('cropEnable', False),
            crop_size=data.get('cropSize'),
            speed='1.0' if speed is None else str(speed),
            lut_url=data.get('lutUrl'),
            overlay_url=data.get('overlayUrl'),
            effects=data.get('effects'),
            zoom_cut=data.get('zoomCut', False),
            video_crop=data.get('videoCrop'),
            face_pos=data.get('facePos'),
            transitions=data.get('transitions'),
        )


@dataclass
class OutputSpec:
    """
    Output specification.

    Used by RENDER_SEGMENT_VIDEO tasks to carry video output parameters.
    """

    width: int = 1080
    height: int = 1920
    fps: int = 30
    bitrate: int = 4000000
    codec: str = "h264"

    @classmethod
    def from_dict(cls, data: Optional[Dict]) -> 'OutputSpec':
        """Build an OutputSpec from a dictionary.

        Args:
            data: The output dictionary, or ``None`` / empty.

        Returns:
            An OutputSpec; all-default when ``data`` is ``None`` or empty.
            Values are taken as-is (no type coercion is applied).
        """
        if not data:
            return cls()
        return cls(
            width=data.get('width', 1080),
            height=data.get('height', 1920),
            fps=data.get('fps', 30),
            bitrate=data.get('bitrate', 4000000),
            codec=data.get('codec', 'h264'),
        )


@dataclass
class AudioSpec:
    """
    Audio specification.

    Used for the per-segment overlay sound effects of PREPARE_JOB_AUDIO tasks.
    """

    audio_url: Optional[str] = None
    volume: float = 1.0
    fade_in_ms: int = 10
    fade_out_ms: int = 10
    start_ms: int = 0
    delay_ms: int = 0
    loop_enable: bool = False

    @classmethod
    def from_dict(cls, data: Optional[Dict]) -> Optional['AudioSpec']:
        """Build an AudioSpec from a camelCase dictionary.

        Unlike the other specs in this module, an absent or empty dictionary
        yields ``None`` rather than a default instance.

        Args:
            data: The audio spec dictionary, or ``None`` / empty.

        Returns:
            An AudioSpec, or ``None`` when ``data`` is ``None`` or empty.

        Raises:
            TypeError, ValueError: If a numeric field holds a value that
                ``float()`` / ``int()`` cannot convert.
        """
        if not data:
            return None
        return cls(
            audio_url=data.get('audioUrl'),
            volume=float(data.get('volume', 1.0)),
            fade_in_ms=int(data.get('fadeInMs', 10)),
            fade_out_ms=int(data.get('fadeOutMs', 10)),
            start_ms=int(data.get('startMs', 0)),
            delay_ms=int(data.get('delayMs', 0)),
            loop_enable=data.get('loopEnable', False),
        )


@dataclass
class AudioProfile:
    """
    Audio profile.

    Global audio parameters for PREPARE_JOB_AUDIO tasks.
    """

    sample_rate: int = 48000
    channels: int = 2
    codec: str = "aac"

    @classmethod
    def from_dict(cls, data: Optional[Dict]) -> 'AudioProfile':
        """Build an AudioProfile from a camelCase dictionary.

        Args:
            data: The ``audioProfile`` dictionary, or ``None`` / empty.

        Returns:
            An AudioProfile; all-default when ``data`` is ``None`` or empty.
        """
        if not data:
            return cls()
        return cls(
            sample_rate=data.get('sampleRate', 48000),
            channels=data.get('channels', 2),
            codec=data.get('codec', 'aac'),
        )


@dataclass
class Task:
    """
    Task entity.

    Represents a render task waiting to be executed. Type-specific parameters
    live in ``payload`` and are exposed through the ``get_*`` accessors.
    """

    task_id: str
    task_type: TaskType
    priority: int
    lease_expire_time: datetime
    payload: Dict[str, Any]

    @classmethod
    def from_dict(cls, data: Dict) -> 'Task':
        """Build a Task from an API response dictionary.

        Args:
            data: Response dictionary with camelCase keys. ``taskId`` and
                ``taskType`` are required; ``priority``, ``leaseExpireTime``
                and ``payload`` are optional.

        Returns:
            The populated Task. A null or missing ``payload`` becomes ``{}``.

        Raises:
            KeyError: If ``taskId`` or ``taskType`` is missing.
            ValueError: If ``taskType`` is not a known TaskType value.
        """
        lease_time_str = data.get('leaseExpireTime', '')

        if lease_time_str:
            # datetime.fromisoformat() only accepts a trailing 'Z' from
            # Python 3.11 on, so rewrite it as an explicit UTC offset.
            if lease_time_str.endswith('Z'):
                lease_time_str = lease_time_str[:-1] + '+00:00'
            try:
                lease_expire_time = datetime.fromisoformat(lease_time_str)
            except ValueError:
                # Unparseable lease time: grant a short local lease so the
                # task is not considered expired the moment it is received.
                # NOTE(review): this fallback is timezone-naive while a parsed
                # value carrying an offset is aware — confirm how callers
                # compare lease_expire_time.
                lease_expire_time = datetime.now() + _FALLBACK_LEASE
        else:
            # NOTE(review): a missing lease time still resolves to "now"
            # (unchanged behavior) — confirm whether it should also receive
            # the fallback lease.
            lease_expire_time = datetime.now()

        # Normalise an explicit null payload so the accessors below can
        # always call .get() on it.
        payload = data.get('payload')
        if payload is None:
            payload = {}

        return cls(
            task_id=str(data['taskId']),
            task_type=TaskType(data['taskType']),
            priority=data.get('priority', 0),
            lease_expire_time=lease_expire_time,
            payload=payload,
        )

    def get_job_id(self) -> str:
        """Return the job ID as a string ('' when absent)."""
        return str(self.payload.get('jobId', ''))

    def get_segment_id(self) -> Optional[str]:
        """Return the segment ID as a string, or None when absent/falsy."""
        segment_id = self.payload.get('segmentId')
        return str(segment_id) if segment_id else None

    def get_plan_segment_index(self) -> int:
        """Return the plan segment index (0 when absent)."""
        return int(self.payload.get('planSegmentIndex', 0))

    def get_duration_ms(self) -> int:
        """Return the duration in milliseconds (5000 when absent)."""
        return int(self.payload.get('durationMs', 5000))

    def get_material_url(self) -> Optional[str]:
        """Return the material URL.

        Prefers ``boundMaterialUrl`` and falls back to ``sourceRef`` when the
        former is absent or empty.
        """
        return self.payload.get('boundMaterialUrl') or self.payload.get('sourceRef')

    def get_render_spec(self) -> RenderSpec:
        """Return the render specification parsed from ``renderSpec``."""
        return RenderSpec.from_dict(self.payload.get('renderSpec'))

    def get_output_spec(self) -> OutputSpec:
        """Return the output specification parsed from ``output``."""
        return OutputSpec.from_dict(self.payload.get('output'))

    def get_bgm_url(self) -> Optional[str]:
        """Return the BGM URL, or None when absent."""
        return self.payload.get('bgmUrl')

    def get_total_duration_ms(self) -> int:
        """Return the total duration in milliseconds (0 when absent)."""
        return int(self.payload.get('totalDurationMs', 0))

    def get_segments(self) -> List[Dict]:
        """Return the segment list ([] when the key is absent)."""
        return self.payload.get('segments', [])

    def get_audio_profile(self) -> AudioProfile:
        """Return the audio profile parsed from ``audioProfile``."""
        return AudioProfile.from_dict(self.payload.get('audioProfile'))

    def get_video_url(self) -> Optional[str]:
        """Return the video URL (used by PACKAGE_SEGMENT_TS)."""
        return self.payload.get('videoUrl')

    def get_audio_url(self) -> Optional[str]:
        """Return the audio URL (used by PACKAGE_SEGMENT_TS)."""
        return self.payload.get('audioUrl')

    def get_start_time_ms(self) -> int:
        """Return the start time in milliseconds (0 when absent)."""
        return int(self.payload.get('startTimeMs', 0))

    def get_m3u8_url(self) -> Optional[str]:
        """Return the m3u8 URL (used by FINALIZE_MP4)."""
        return self.payload.get('m3u8Url')

    def get_ts_list(self) -> List[str]:
        """Return the TS list (used by FINALIZE_MP4; [] when absent)."""
        return self.payload.get('tsList', [])