# -*- coding: utf-8 -*- """ 任务领域模型 定义任务类型、任务实体、渲染规格、输出规格等数据结构。 """ import os from dataclasses import dataclass, field from datetime import datetime from enum import Enum from math import isfinite from typing import Dict, Any, Optional, List from urllib.parse import urlparse, unquote # 支持的图片扩展名 IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.webp', '.bmp', '.gif'} class TaskType(Enum): """任务类型枚举""" RENDER_SEGMENT_VIDEO = "RENDER_SEGMENT_VIDEO" # 渲染视频片段 COMPOSE_TRANSITION = "COMPOSE_TRANSITION" # 合成转场效果 PREPARE_JOB_AUDIO = "PREPARE_JOB_AUDIO" # 生成全局音频 PACKAGE_SEGMENT_TS = "PACKAGE_SEGMENT_TS" # 封装 TS 分片 FINALIZE_MP4 = "FINALIZE_MP4" # 产出最终 MP4 # 支持的转场类型(对应 FFmpeg xfade 参数) TRANSITION_TYPES = { 'fade': 'fade', # 淡入淡出(默认) 'dissolve': 'dissolve', # 溶解过渡 'wipeleft': 'wipeleft', # 向左擦除 'wiperight': 'wiperight', # 向右擦除 'wipeup': 'wipeup', # 向上擦除 'wipedown': 'wipedown', # 向下擦除 'slideleft': 'slideleft', # 向左滑动 'slideright': 'slideright', # 向右滑动 'slideup': 'slideup', # 向上滑动 'slidedown': 'slidedown', # 向下滑动 } # 支持的特效类型 EFFECT_TYPES = { 'cameraShot', # 相机定格效果 'zoom', # 缩放效果(预留) 'blur', # 模糊效果(预留) 'ospeed', # 原始变速效果(兼容旧模板) } class TaskStatus(Enum): """任务状态枚举""" PENDING = "PENDING" RUNNING = "RUNNING" SUCCESS = "SUCCESS" FAILED = "FAILED" @dataclass class TransitionConfig: """ 转场配置 用于 RENDER_SEGMENT_VIDEO 任务的入场/出场转场配置。 """ type: str = "fade" # 转场类型 duration_ms: int = 500 # 转场时长(毫秒) @classmethod def from_dict(cls, data: Optional[Dict]) -> Optional['TransitionConfig']: """从字典创建 TransitionConfig""" if not data: return None trans_type = data.get('type', 'fade') # 验证转场类型是否支持 if trans_type not in TRANSITION_TYPES: trans_type = 'fade' return cls( type=trans_type, duration_ms=int(data.get('durationMs', 500)) ) def get_overlap_ms(self) -> int: """获取 overlap 时长(单边,为转场时长的一半)""" return self.duration_ms // 2 def get_ffmpeg_transition(self) -> str: """获取 FFmpeg xfade 参数""" return TRANSITION_TYPES.get(self.type, 'fade') @dataclass class Effect: """ 特效配置 格式:type:params 例如: - cameraShot:3,1 表示在第3秒定格1秒 - zoom:1.5,1.2,2 表示从第1.5秒开始放大 1.2 倍并持续 2 秒 """ effect_type: str # 效果类型 params: str = "" # 参数字符串 @classmethod def from_string(cls, effect_str: str) -> Optional['Effect']: """ 从字符串解析 Effect 格式:type:params 或 type(无参数时) """ if not effect_str: return None parts = effect_str.split(':', 1) effect_type = parts[0].strip() if effect_type not in EFFECT_TYPES: return None params = parts[1].strip() if len(parts) > 1 else "" return cls(effect_type=effect_type, params=params) @classmethod def parse_effects(cls, effects_str: Optional[str]) -> List['Effect']: """ 解析效果字符串 格式:effect1|effect2|effect3 例如:cameraShot:3,1|zoom:1.5,1.2,2 """ if not effects_str: return [] effects = [] for part in effects_str.split('|'): effect = cls.from_string(part.strip()) if effect: effects.append(effect) return effects def get_camera_shot_params(self) -> tuple: """ 获取 cameraShot 效果参数 Returns: (start_sec, duration_sec): 开始时间和持续时间(秒) """ if self.effect_type != 'cameraShot': return (0, 0) if not self.params: return (3, 1) # 默认值 parts = self.params.split(',') try: start = int(parts[0]) if len(parts) >= 1 else 3 duration = int(parts[1]) if len(parts) >= 2 else 1 return (start, duration) except ValueError: return (3, 1) def get_zoom_params(self) -> tuple: """ 获取 zoom 效果参数 Returns: (start_sec, scale_factor, duration_sec): 起始时间、放大倍数、持续时长(秒) """ if self.effect_type != 'zoom': return (0.0, 1.2, 1.0) default_start_sec = 0.0 default_scale_factor = 1.2 default_duration_sec = 1.0 if not self.params: return (default_start_sec, default_scale_factor, default_duration_sec) parts = [part.strip() for part in self.params.split(',')] try: start_sec = float(parts[0]) if len(parts) >= 1 and parts[0] else default_start_sec scale_factor = float(parts[1]) if len(parts) >= 2 and parts[1] else default_scale_factor duration_sec = float(parts[2]) if len(parts) >= 3 and parts[2] else default_duration_sec except ValueError: return (default_start_sec, default_scale_factor, default_duration_sec) if not isfinite(start_sec) or start_sec < 0: start_sec = default_start_sec if not isfinite(scale_factor) or scale_factor <= 1.0: scale_factor = default_scale_factor if not isfinite(duration_sec) or duration_sec <= 0: duration_sec = default_duration_sec return (start_sec, scale_factor, duration_sec) def get_ospeed_params(self) -> float: """获取 ospeed 效果参数,返回 PTS 乘数(>0),无效时返回 1.0""" if self.effect_type != 'ospeed': return 1.0 if not self.params: return 1.0 try: factor = float(self.params.strip()) except ValueError: return 1.0 if not isfinite(factor) or factor <= 0: return 1.0 return factor @dataclass class RenderSpec: """ 渲染规格 用于 RENDER_SEGMENT_VIDEO 任务,定义视频渲染参数。 """ crop_enable: bool = False crop_size: Optional[str] = None speed: str = "1.0" lut_url: Optional[str] = None overlay_url: Optional[str] = None effects: Optional[str] = None zoom_cut: bool = False video_crop: Optional[str] = None face_pos: Optional[str] = None transitions: Optional[str] = None # 转场配置(PRD v2 新增) transition_in: Optional[TransitionConfig] = None # 入场转场 transition_out: Optional[TransitionConfig] = None # 出场转场 @classmethod def from_dict(cls, data: Optional[Dict]) -> 'RenderSpec': """从字典创建 RenderSpec""" if not data: return cls() return cls( crop_enable=data.get('cropEnable', False), crop_size=data.get('cropSize'), speed=str(data.get('speed', '1.0')), lut_url=data.get('lutUrl'), overlay_url=data.get('overlayUrl'), effects=data.get('effects'), zoom_cut=data.get('zoomCut', False), video_crop=data.get('videoCrop'), face_pos=data.get('facePos'), transitions=data.get('transitions'), transition_in=TransitionConfig.from_dict(data.get('transitionIn')), transition_out=TransitionConfig.from_dict(data.get('transitionOut')) ) def has_transition_in(self) -> bool: """是否有入场转场""" return self.transition_in is not None and self.transition_in.duration_ms > 0 def has_transition_out(self) -> bool: """是否有出场转场""" return self.transition_out is not None and self.transition_out.duration_ms > 0 def get_overlap_head_ms(self) -> int: """获取头部 overlap 时长(毫秒)""" if self.has_transition_in(): return self.transition_in.get_overlap_ms() return 0 def get_overlap_tail_ms(self) -> int: """获取尾部 overlap 时长(毫秒)""" if self.has_transition_out(): return self.transition_out.get_overlap_ms() return 0 def get_effects(self) -> List['Effect']: """获取解析后的特效列表""" return Effect.parse_effects(self.effects) @dataclass class OutputSpec: """ 输出规格 用于 RENDER_SEGMENT_VIDEO 任务,定义视频输出参数。 """ width: int = 1080 height: int = 1920 fps: int = 30 bitrate: int = 4000000 codec: str = "h264" @classmethod def from_dict(cls, data: Optional[Dict]) -> 'OutputSpec': """从字典创建 OutputSpec""" if not data: return cls() return cls( width=data.get('width', 1080), height=data.get('height', 1920), fps=data.get('fps', 30), bitrate=data.get('bitrate', 4000000), codec=data.get('codec', 'h264') ) @dataclass class AudioSpec: """ 音频规格 用于 PREPARE_JOB_AUDIO 任务中的片段叠加音效。 """ audio_url: Optional[str] = None volume: float = 1.0 fade_in_ms: int = 10 fade_out_ms: int = 10 start_ms: int = 0 delay_ms: int = 0 loop_enable: bool = False @classmethod def from_dict(cls, data: Optional[Dict]) -> Optional['AudioSpec']: """从字典创建 AudioSpec""" if not data: return None return cls( audio_url=data.get('audioUrl'), volume=float(data.get('volume', 1.0)), fade_in_ms=int(data.get('fadeInMs', 10)), fade_out_ms=int(data.get('fadeOutMs', 10)), start_ms=int(data.get('startMs', 0)), delay_ms=int(data.get('delayMs', 0)), loop_enable=data.get('loopEnable', False) ) @dataclass class AudioProfile: """ 音频配置 用于 PREPARE_JOB_AUDIO 任务的全局音频参数。 """ sample_rate: int = 48000 channels: int = 2 codec: str = "aac" @classmethod def from_dict(cls, data: Optional[Dict]) -> 'AudioProfile': """从字典创建 AudioProfile""" if not data: return cls() return cls( sample_rate=data.get('sampleRate', 48000), channels=data.get('channels', 2), codec=data.get('codec', 'aac') ) @dataclass class Task: """ 任务实体 表示一个待执行的渲染任务。 """ task_id: str task_type: TaskType priority: int lease_expire_time: datetime payload: Dict[str, Any] @classmethod def from_dict(cls, data: Dict) -> 'Task': """从 API 响应字典创建 Task""" lease_time_str = data.get('leaseExpireTime', '') # 解析 ISO 8601 时间格式 if lease_time_str: if lease_time_str.endswith('Z'): lease_time_str = lease_time_str[:-1] + '+00:00' try: lease_expire_time = datetime.fromisoformat(lease_time_str) except ValueError: # 解析失败时使用当前时间 + 5分钟 lease_expire_time = datetime.now() else: lease_expire_time = datetime.now() return cls( task_id=str(data['taskId']), task_type=TaskType(data['taskType']), priority=data.get('priority', 0), lease_expire_time=lease_expire_time, payload=data.get('payload', {}) ) def get_job_id(self) -> str: """获取作业 ID""" return str(self.payload.get('jobId', '')) def get_segment_id(self) -> Optional[str]: """获取片段 ID(如果有)""" segment_id = self.payload.get('segmentId') return str(segment_id) if segment_id else None def get_plan_segment_index(self) -> int: """获取计划片段索引""" return int(self.payload.get('planSegmentIndex', 0)) def get_duration_ms(self) -> int: """获取时长(毫秒)""" return int(self.payload.get('durationMs', 5000)) def get_material_url(self) -> Optional[str]: """ 获取素材 URL 优先使用 boundMaterialUrl(实际可下载的 HTTP URL), 如果不存在则回退到 sourceRef(可能是 slot 引用)。 Returns: 素材 URL,如果都不存在返回 None """ return self.payload.get('boundMaterialUrl') or self.payload.get('sourceRef') def get_source_ref(self) -> Optional[str]: """获取素材源引用(slot 标识符,如 device:xxx)""" return self.payload.get('sourceRef') def get_bound_material_url(self) -> Optional[str]: """获取绑定的素材 URL(实际可下载的 HTTP URL)""" return self.payload.get('boundMaterialUrl') def get_material_type(self) -> str: """ 获取素材类型 优先使用服务端下发的 materialType 字段, 如果不存在则根据 URL 后缀自动推断。 Returns: 素材类型:"video" 或 "image" """ # 优先使用服务端下发的类型 material_type = self.payload.get('materialType') if material_type in ('video', 'image'): return material_type # 降级:根据 URL 后缀推断 material_url = self.get_material_url() if material_url: parsed = urlparse(material_url) path = unquote(parsed.path) _, ext = os.path.splitext(path) if ext.lower() in IMAGE_EXTENSIONS: return 'image' # 默认视频类型 return 'video' def is_image_material(self) -> bool: """判断素材是否为图片类型""" return self.get_material_type() == 'image' def get_render_spec(self) -> RenderSpec: """获取渲染规格""" return RenderSpec.from_dict(self.payload.get('renderSpec')) def get_output_spec(self) -> OutputSpec: """获取输出规格""" return OutputSpec.from_dict(self.payload.get('output')) def get_transition_type(self) -> Optional[str]: """获取转场类型(来自 TaskPayload 顶层)""" return self.payload.get('transitionType') def get_transition_ms(self) -> int: """获取转场时长(毫秒,来自 TaskPayload 顶层)""" return int(self.payload.get('transitionMs', 0)) def has_transition(self) -> bool: """是否有转场效果""" return self.get_transition_ms() > 0 def get_overlap_tail_ms(self) -> int: """ 获取尾部 overlap 时长(毫秒) 转场发生在当前片段与下一片段之间,当前片段需要在尾部多渲染 overlap 帧。 overlap = transitionMs / 2 """ return self.get_transition_ms() // 2 def get_transition_in_type(self) -> Optional[str]: """获取入场转场类型(来自前一片段的出场转场)""" return self.payload.get('transitionInType') def get_transition_in_ms(self) -> int: """获取入场转场时长(毫秒)""" return int(self.payload.get('transitionInMs', 0)) def get_transition_out_type(self) -> Optional[str]: """获取出场转场类型(当前片段的转场配置)""" return self.payload.get('transitionOutType') def get_transition_out_ms(self) -> int: """获取出场转场时长(毫秒)""" return int(self.payload.get('transitionOutMs', 0)) def has_transition_in(self) -> bool: """是否有入场转场""" return self.get_transition_in_ms() > 0 def has_transition_out(self) -> bool: """是否有出场转场""" return self.get_transition_out_ms() > 0 def get_overlap_head_ms(self) -> int: """ 获取头部 overlap 时长(毫秒) 入场转场来自前一个片段,当前片段需要在头部多渲染 overlap 帧。 overlap = transitionInMs / 2 """ return self.get_transition_in_ms() // 2 def get_overlap_tail_ms_v2(self) -> int: """ 获取尾部 overlap 时长(毫秒)- 使用新的字段名 出场转场用于当前片段与下一片段之间,当前片段需要在尾部多渲染 overlap 帧。 overlap = transitionOutMs / 2 """ return self.get_transition_out_ms() // 2 def get_bgm_url(self) -> Optional[str]: """获取 BGM URL""" return self.payload.get('bgmUrl') def get_total_duration_ms(self) -> int: """获取总时长(毫秒)""" return int(self.payload.get('totalDurationMs', 0)) def get_segments(self) -> List[Dict]: """获取片段列表""" return self.payload.get('segments', []) def get_audio_profile(self) -> AudioProfile: """获取音频配置""" return AudioProfile.from_dict(self.payload.get('audioProfile')) def get_video_url(self) -> Optional[str]: """获取视频 URL(用于 PACKAGE_SEGMENT_TS)""" return self.payload.get('videoUrl') def get_audio_url(self) -> Optional[str]: """获取音频 URL(用于 PACKAGE_SEGMENT_TS)""" return self.payload.get('audioUrl') def get_start_time_ms(self) -> int: """获取开始时间(毫秒)""" return int(self.payload.get('startTimeMs', 0)) def get_m3u8_url(self) -> Optional[str]: """获取 m3u8 URL(用于 FINALIZE_MP4)""" return self.payload.get('m3u8Url') def get_ts_list(self) -> List[str]: """获取 TS 列表(用于 FINALIZE_MP4)""" return self.payload.get('tsList', []) # ========== COMPOSE_TRANSITION 相关方法 ========== def get_transition_id(self) -> Optional[str]: """获取转场 ID(用于 COMPOSE_TRANSITION)""" return self.payload.get('transitionId') def get_prev_segment(self) -> Optional[Dict]: """获取前一个片段信息(用于 COMPOSE_TRANSITION)""" return self.payload.get('prevSegment') def get_next_segment(self) -> Optional[Dict]: """获取后一个片段信息(用于 COMPOSE_TRANSITION)""" return self.payload.get('nextSegment') def get_transition_config(self) -> Optional[TransitionConfig]: """获取转场配置(用于 COMPOSE_TRANSITION)""" return TransitionConfig.from_dict(self.payload.get('transition')) # ========== PACKAGE_SEGMENT_TS 转场相关方法 ========== def is_transition_segment(self) -> bool: """是否为转场分片(用于 PACKAGE_SEGMENT_TS)""" return self.payload.get('isTransitionSegment', False) def should_trim_head(self) -> bool: """是否需要裁剪头部 overlap(用于 PACKAGE_SEGMENT_TS)""" return self.payload.get('trimHead', False) def should_trim_tail(self) -> bool: """是否需要裁剪尾部 overlap(用于 PACKAGE_SEGMENT_TS)""" return self.payload.get('trimTail', False) def get_trim_head_ms(self) -> int: """获取头部裁剪时长(毫秒)""" return int(self.payload.get('trimHeadMs', 0)) def get_trim_tail_ms(self) -> int: """获取尾部裁剪时长(毫秒)""" return int(self.payload.get('trimTailMs', 0))