# -*- coding: utf-8 -*- """ 任务领域模型 定义任务类型、任务实体、渲染规格、输出规格等数据结构。 """ from enum import Enum from dataclasses import dataclass, field from typing import Dict, Any, Optional, List from datetime import datetime class TaskType(Enum): """任务类型枚举""" RENDER_SEGMENT_VIDEO = "RENDER_SEGMENT_VIDEO" # 渲染视频片段 COMPOSE_TRANSITION = "COMPOSE_TRANSITION" # 合成转场效果 PREPARE_JOB_AUDIO = "PREPARE_JOB_AUDIO" # 生成全局音频 PACKAGE_SEGMENT_TS = "PACKAGE_SEGMENT_TS" # 封装 TS 分片 FINALIZE_MP4 = "FINALIZE_MP4" # 产出最终 MP4 # 支持的转场类型(对应 FFmpeg xfade 参数) TRANSITION_TYPES = { 'fade': 'fade', # 淡入淡出(默认) 'dissolve': 'dissolve', # 溶解过渡 'wipeleft': 'wipeleft', # 向左擦除 'wiperight': 'wiperight', # 向右擦除 'wipeup': 'wipeup', # 向上擦除 'wipedown': 'wipedown', # 向下擦除 'slideleft': 'slideleft', # 向左滑动 'slideright': 'slideright', # 向右滑动 'slideup': 'slideup', # 向上滑动 'slidedown': 'slidedown', # 向下滑动 } # 支持的特效类型 EFFECT_TYPES = { 'cameraShot', # 相机定格效果 'zoom', # 缩放效果(预留) 'blur', # 模糊效果(预留) } class TaskStatus(Enum): """任务状态枚举""" PENDING = "PENDING" RUNNING = "RUNNING" SUCCESS = "SUCCESS" FAILED = "FAILED" @dataclass class TransitionConfig: """ 转场配置 用于 RENDER_SEGMENT_VIDEO 任务的入场/出场转场配置。 """ type: str = "fade" # 转场类型 duration_ms: int = 500 # 转场时长(毫秒) @classmethod def from_dict(cls, data: Optional[Dict]) -> Optional['TransitionConfig']: """从字典创建 TransitionConfig""" if not data: return None trans_type = data.get('type', 'fade') # 验证转场类型是否支持 if trans_type not in TRANSITION_TYPES: trans_type = 'fade' return cls( type=trans_type, duration_ms=int(data.get('durationMs', 500)) ) def get_overlap_ms(self) -> int: """获取 overlap 时长(单边,为转场时长的一半)""" return self.duration_ms // 2 def get_ffmpeg_transition(self) -> str: """获取 FFmpeg xfade 参数""" return TRANSITION_TYPES.get(self.type, 'fade') @dataclass class Effect: """ 特效配置 格式:type:params 例如:cameraShot:3,1 表示在第3秒定格1秒 """ effect_type: str # 效果类型 params: str = "" # 参数字符串 @classmethod def from_string(cls, effect_str: str) -> Optional['Effect']: """ 从字符串解析 Effect 格式:type:params 或 type(无参数时) """ if not effect_str: return None parts = effect_str.split(':', 1) effect_type = parts[0].strip() if effect_type not in EFFECT_TYPES: return None params = parts[1].strip() if len(parts) > 1 else "" return cls(effect_type=effect_type, params=params) @classmethod def parse_effects(cls, effects_str: Optional[str]) -> List['Effect']: """ 解析效果字符串 格式:effect1|effect2|effect3 例如:cameraShot:3,1|blur:5 """ if not effects_str: return [] effects = [] for part in effects_str.split('|'): effect = cls.from_string(part.strip()) if effect: effects.append(effect) return effects def get_camera_shot_params(self) -> tuple: """ 获取 cameraShot 效果参数 Returns: (start_sec, duration_sec): 开始时间和持续时间(秒) """ if self.effect_type != 'cameraShot': return (0, 0) if not self.params: return (3, 1) # 默认值 parts = self.params.split(',') try: start = int(parts[0]) if len(parts) >= 1 else 3 duration = int(parts[1]) if len(parts) >= 2 else 1 return (start, duration) except ValueError: return (3, 1) @dataclass class RenderSpec: """ 渲染规格 用于 RENDER_SEGMENT_VIDEO 任务,定义视频渲染参数。 """ crop_enable: bool = False crop_size: Optional[str] = None speed: str = "1.0" lut_url: Optional[str] = None overlay_url: Optional[str] = None effects: Optional[str] = None zoom_cut: bool = False video_crop: Optional[str] = None face_pos: Optional[str] = None transitions: Optional[str] = None # 转场配置(PRD v2 新增) transition_in: Optional[TransitionConfig] = None # 入场转场 transition_out: Optional[TransitionConfig] = None # 出场转场 @classmethod def from_dict(cls, data: Optional[Dict]) -> 'RenderSpec': """从字典创建 RenderSpec""" if not data: return cls() return cls( crop_enable=data.get('cropEnable', False), crop_size=data.get('cropSize'), speed=str(data.get('speed', '1.0')), lut_url=data.get('lutUrl'), overlay_url=data.get('overlayUrl'), effects=data.get('effects'), zoom_cut=data.get('zoomCut', False), video_crop=data.get('videoCrop'), face_pos=data.get('facePos'), transitions=data.get('transitions'), transition_in=TransitionConfig.from_dict(data.get('transitionIn')), transition_out=TransitionConfig.from_dict(data.get('transitionOut')) ) def has_transition_in(self) -> bool: """是否有入场转场""" return self.transition_in is not None and self.transition_in.duration_ms > 0 def has_transition_out(self) -> bool: """是否有出场转场""" return self.transition_out is not None and self.transition_out.duration_ms > 0 def get_overlap_head_ms(self) -> int: """获取头部 overlap 时长(毫秒)""" if self.has_transition_in(): return self.transition_in.get_overlap_ms() return 0 def get_overlap_tail_ms(self) -> int: """获取尾部 overlap 时长(毫秒)""" if self.has_transition_out(): return self.transition_out.get_overlap_ms() return 0 def get_effects(self) -> List['Effect']: """获取解析后的特效列表""" return Effect.parse_effects(self.effects) @dataclass class OutputSpec: """ 输出规格 用于 RENDER_SEGMENT_VIDEO 任务,定义视频输出参数。 """ width: int = 1080 height: int = 1920 fps: int = 30 bitrate: int = 4000000 codec: str = "h264" @classmethod def from_dict(cls, data: Optional[Dict]) -> 'OutputSpec': """从字典创建 OutputSpec""" if not data: return cls() return cls( width=data.get('width', 1080), height=data.get('height', 1920), fps=data.get('fps', 30), bitrate=data.get('bitrate', 4000000), codec=data.get('codec', 'h264') ) @dataclass class AudioSpec: """ 音频规格 用于 PREPARE_JOB_AUDIO 任务中的片段叠加音效。 """ audio_url: Optional[str] = None volume: float = 1.0 fade_in_ms: int = 10 fade_out_ms: int = 10 start_ms: int = 0 delay_ms: int = 0 loop_enable: bool = False @classmethod def from_dict(cls, data: Optional[Dict]) -> Optional['AudioSpec']: """从字典创建 AudioSpec""" if not data: return None return cls( audio_url=data.get('audioUrl'), volume=float(data.get('volume', 1.0)), fade_in_ms=int(data.get('fadeInMs', 10)), fade_out_ms=int(data.get('fadeOutMs', 10)), start_ms=int(data.get('startMs', 0)), delay_ms=int(data.get('delayMs', 0)), loop_enable=data.get('loopEnable', False) ) @dataclass class AudioProfile: """ 音频配置 用于 PREPARE_JOB_AUDIO 任务的全局音频参数。 """ sample_rate: int = 48000 channels: int = 2 codec: str = "aac" @classmethod def from_dict(cls, data: Optional[Dict]) -> 'AudioProfile': """从字典创建 AudioProfile""" if not data: return cls() return cls( sample_rate=data.get('sampleRate', 48000), channels=data.get('channels', 2), codec=data.get('codec', 'aac') ) @dataclass class Task: """ 任务实体 表示一个待执行的渲染任务。 """ task_id: str task_type: TaskType priority: int lease_expire_time: datetime payload: Dict[str, Any] @classmethod def from_dict(cls, data: Dict) -> 'Task': """从 API 响应字典创建 Task""" lease_time_str = data.get('leaseExpireTime', '') # 解析 ISO 8601 时间格式 if lease_time_str: if lease_time_str.endswith('Z'): lease_time_str = lease_time_str[:-1] + '+00:00' try: lease_expire_time = datetime.fromisoformat(lease_time_str) except ValueError: # 解析失败时使用当前时间 + 5分钟 lease_expire_time = datetime.now() else: lease_expire_time = datetime.now() return cls( task_id=str(data['taskId']), task_type=TaskType(data['taskType']), priority=data.get('priority', 0), lease_expire_time=lease_expire_time, payload=data.get('payload', {}) ) def get_job_id(self) -> str: """获取作业 ID""" return str(self.payload.get('jobId', '')) def get_segment_id(self) -> Optional[str]: """获取片段 ID(如果有)""" segment_id = self.payload.get('segmentId') return str(segment_id) if segment_id else None def get_plan_segment_index(self) -> int: """获取计划片段索引""" return int(self.payload.get('planSegmentIndex', 0)) def get_duration_ms(self) -> int: """获取时长(毫秒)""" return int(self.payload.get('durationMs', 5000)) def get_material_url(self) -> Optional[str]: """ 获取素材 URL 优先使用 boundMaterialUrl(实际可下载的 HTTP URL), 如果不存在则回退到 sourceRef(可能是 slot 引用)。 Returns: 素材 URL,如果都不存在返回 None """ return self.payload.get('boundMaterialUrl') or self.payload.get('sourceRef') def get_source_ref(self) -> Optional[str]: """获取素材源引用(slot 标识符,如 device:xxx)""" return self.payload.get('sourceRef') def get_bound_material_url(self) -> Optional[str]: """获取绑定的素材 URL(实际可下载的 HTTP URL)""" return self.payload.get('boundMaterialUrl') def get_render_spec(self) -> RenderSpec: """获取渲染规格""" return RenderSpec.from_dict(self.payload.get('renderSpec')) def get_output_spec(self) -> OutputSpec: """获取输出规格""" return OutputSpec.from_dict(self.payload.get('output')) def get_transition_type(self) -> Optional[str]: """获取转场类型(来自 TaskPayload 顶层)""" return self.payload.get('transitionType') def get_transition_ms(self) -> int: """获取转场时长(毫秒,来自 TaskPayload 顶层)""" return int(self.payload.get('transitionMs', 0)) def has_transition(self) -> bool: """是否有转场效果""" return self.get_transition_ms() > 0 def get_overlap_tail_ms(self) -> int: """ 获取尾部 overlap 时长(毫秒) 转场发生在当前片段与下一片段之间,当前片段需要在尾部多渲染 overlap 帧。 overlap = transitionMs / 2 """ return self.get_transition_ms() // 2 def get_transition_in_type(self) -> Optional[str]: """获取入场转场类型(来自前一片段的出场转场)""" return self.payload.get('transitionInType') def get_transition_in_ms(self) -> int: """获取入场转场时长(毫秒)""" return int(self.payload.get('transitionInMs', 0)) def get_transition_out_type(self) -> Optional[str]: """获取出场转场类型(当前片段的转场配置)""" return self.payload.get('transitionOutType') def get_transition_out_ms(self) -> int: """获取出场转场时长(毫秒)""" return int(self.payload.get('transitionOutMs', 0)) def has_transition_in(self) -> bool: """是否有入场转场""" return self.get_transition_in_ms() > 0 def has_transition_out(self) -> bool: """是否有出场转场""" return self.get_transition_out_ms() > 0 def get_overlap_head_ms(self) -> int: """ 获取头部 overlap 时长(毫秒) 入场转场来自前一个片段,当前片段需要在头部多渲染 overlap 帧。 overlap = transitionInMs / 2 """ return self.get_transition_in_ms() // 2 def get_overlap_tail_ms_v2(self) -> int: """ 获取尾部 overlap 时长(毫秒)- 使用新的字段名 出场转场用于当前片段与下一片段之间,当前片段需要在尾部多渲染 overlap 帧。 overlap = transitionOutMs / 2 """ return self.get_transition_out_ms() // 2 def get_bgm_url(self) -> Optional[str]: """获取 BGM URL""" return self.payload.get('bgmUrl') def get_total_duration_ms(self) -> int: """获取总时长(毫秒)""" return int(self.payload.get('totalDurationMs', 0)) def get_segments(self) -> List[Dict]: """获取片段列表""" return self.payload.get('segments', []) def get_audio_profile(self) -> AudioProfile: """获取音频配置""" return AudioProfile.from_dict(self.payload.get('audioProfile')) def get_video_url(self) -> Optional[str]: """获取视频 URL(用于 PACKAGE_SEGMENT_TS)""" return self.payload.get('videoUrl') def get_audio_url(self) -> Optional[str]: """获取音频 URL(用于 PACKAGE_SEGMENT_TS)""" return self.payload.get('audioUrl') def get_start_time_ms(self) -> int: """获取开始时间(毫秒)""" return int(self.payload.get('startTimeMs', 0)) def get_m3u8_url(self) -> Optional[str]: """获取 m3u8 URL(用于 FINALIZE_MP4)""" return self.payload.get('m3u8Url') def get_ts_list(self) -> List[str]: """获取 TS 列表(用于 FINALIZE_MP4)""" return self.payload.get('tsList', []) # ========== COMPOSE_TRANSITION 相关方法 ========== def get_transition_id(self) -> Optional[str]: """获取转场 ID(用于 COMPOSE_TRANSITION)""" return self.payload.get('transitionId') def get_prev_segment(self) -> Optional[Dict]: """获取前一个片段信息(用于 COMPOSE_TRANSITION)""" return self.payload.get('prevSegment') def get_next_segment(self) -> Optional[Dict]: """获取后一个片段信息(用于 COMPOSE_TRANSITION)""" return self.payload.get('nextSegment') def get_transition_config(self) -> Optional[TransitionConfig]: """获取转场配置(用于 COMPOSE_TRANSITION)""" return TransitionConfig.from_dict(self.payload.get('transition')) # ========== PACKAGE_SEGMENT_TS 转场相关方法 ========== def is_transition_segment(self) -> bool: """是否为转场分片(用于 PACKAGE_SEGMENT_TS)""" return self.payload.get('isTransitionSegment', False) def should_trim_head(self) -> bool: """是否需要裁剪头部 overlap(用于 PACKAGE_SEGMENT_TS)""" return self.payload.get('trimHead', False) def should_trim_tail(self) -> bool: """是否需要裁剪尾部 overlap(用于 PACKAGE_SEGMENT_TS)""" return self.payload.get('trimTail', False) def get_trim_head_ms(self) -> int: """获取头部裁剪时长(毫秒)""" return int(self.payload.get('trimHeadMs', 0)) def get_trim_tail_ms(self) -> int: """获取尾部裁剪时长(毫秒)""" return int(self.payload.get('trimTailMs', 0))