Files
FrameTour-RenderWorker/domain/task.py
Jerry Yan 2bded11a03 feat(task): 添加转场效果相关属性和方法
- 新增 get_transition_type、get_transition_ms、has_transition 方法用于处理转场类型和时长
- 新增 get_overlap_tail_ms、get_transition_in_type、get_transition_in_ms 等方法处理入场转场
- 新增 get_transition_out_type、get_transition_out_ms、has_transition_out 等方法处理出场转场
- 新增 get_overlap_head_ms、get_overlap_tail_ms_v2 方法计算头部和尾部重叠时长
- 更新渲染视频处理器中使用新的转场相关方法计算 overlap 时长
2026-01-14 09:30:09 +08:00

502 lines
16 KiB
Python

# -*- coding: utf-8 -*-
"""
任务领域模型
定义任务类型、任务实体、渲染规格、输出规格等数据结构。
"""
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List
from datetime import datetime
class TaskType(Enum):
"""任务类型枚举"""
RENDER_SEGMENT_VIDEO = "RENDER_SEGMENT_VIDEO" # 渲染视频片段
COMPOSE_TRANSITION = "COMPOSE_TRANSITION" # 合成转场效果
PREPARE_JOB_AUDIO = "PREPARE_JOB_AUDIO" # 生成全局音频
PACKAGE_SEGMENT_TS = "PACKAGE_SEGMENT_TS" # 封装 TS 分片
FINALIZE_MP4 = "FINALIZE_MP4" # 产出最终 MP4
# 支持的转场类型(对应 FFmpeg xfade 参数)
TRANSITION_TYPES = {
'fade': 'fade', # 淡入淡出(默认)
'dissolve': 'dissolve', # 溶解过渡
'wipeleft': 'wipeleft', # 向左擦除
'wiperight': 'wiperight', # 向右擦除
'wipeup': 'wipeup', # 向上擦除
'wipedown': 'wipedown', # 向下擦除
'slideleft': 'slideleft', # 向左滑动
'slideright': 'slideright', # 向右滑动
'slideup': 'slideup', # 向上滑动
'slidedown': 'slidedown', # 向下滑动
}
# 支持的特效类型
EFFECT_TYPES = {
'cameraShot', # 相机定格效果
'zoom', # 缩放效果(预留)
'blur', # 模糊效果(预留)
}
class TaskStatus(Enum):
"""任务状态枚举"""
PENDING = "PENDING"
RUNNING = "RUNNING"
SUCCESS = "SUCCESS"
FAILED = "FAILED"
@dataclass
class TransitionConfig:
"""
转场配置
用于 RENDER_SEGMENT_VIDEO 任务的入场/出场转场配置。
"""
type: str = "fade" # 转场类型
duration_ms: int = 500 # 转场时长(毫秒)
@classmethod
def from_dict(cls, data: Optional[Dict]) -> Optional['TransitionConfig']:
"""从字典创建 TransitionConfig"""
if not data:
return None
trans_type = data.get('type', 'fade')
# 验证转场类型是否支持
if trans_type not in TRANSITION_TYPES:
trans_type = 'fade'
return cls(
type=trans_type,
duration_ms=int(data.get('durationMs', 500))
)
def get_overlap_ms(self) -> int:
"""获取 overlap 时长(单边,为转场时长的一半)"""
return self.duration_ms // 2
def get_ffmpeg_transition(self) -> str:
"""获取 FFmpeg xfade 参数"""
return TRANSITION_TYPES.get(self.type, 'fade')
@dataclass
class Effect:
"""
特效配置
格式:type:params
例如:cameraShot:3,1 表示在第3秒定格1秒
"""
effect_type: str # 效果类型
params: str = "" # 参数字符串
@classmethod
def from_string(cls, effect_str: str) -> Optional['Effect']:
"""
从字符串解析 Effect
格式:type:params 或 type(无参数时)
"""
if not effect_str:
return None
parts = effect_str.split(':', 1)
effect_type = parts[0].strip()
if effect_type not in EFFECT_TYPES:
return None
params = parts[1].strip() if len(parts) > 1 else ""
return cls(effect_type=effect_type, params=params)
@classmethod
def parse_effects(cls, effects_str: Optional[str]) -> List['Effect']:
"""
解析效果字符串
格式:effect1|effect2|effect3
例如:cameraShot:3,1|blur:5
"""
if not effects_str:
return []
effects = []
for part in effects_str.split('|'):
effect = cls.from_string(part.strip())
if effect:
effects.append(effect)
return effects
def get_camera_shot_params(self) -> tuple:
"""
获取 cameraShot 效果参数
Returns:
(start_sec, duration_sec): 开始时间和持续时间(秒)
"""
if self.effect_type != 'cameraShot':
return (0, 0)
if not self.params:
return (3, 1) # 默认值
parts = self.params.split(',')
try:
start = int(parts[0]) if len(parts) >= 1 else 3
duration = int(parts[1]) if len(parts) >= 2 else 1
return (start, duration)
except ValueError:
return (3, 1)
@dataclass
class RenderSpec:
"""
渲染规格
用于 RENDER_SEGMENT_VIDEO 任务,定义视频渲染参数。
"""
crop_enable: bool = False
crop_size: Optional[str] = None
speed: str = "1.0"
lut_url: Optional[str] = None
overlay_url: Optional[str] = None
effects: Optional[str] = None
zoom_cut: bool = False
video_crop: Optional[str] = None
face_pos: Optional[str] = None
transitions: Optional[str] = None
# 转场配置(PRD v2 新增)
transition_in: Optional[TransitionConfig] = None # 入场转场
transition_out: Optional[TransitionConfig] = None # 出场转场
@classmethod
def from_dict(cls, data: Optional[Dict]) -> 'RenderSpec':
"""从字典创建 RenderSpec"""
if not data:
return cls()
return cls(
crop_enable=data.get('cropEnable', False),
crop_size=data.get('cropSize'),
speed=str(data.get('speed', '1.0')),
lut_url=data.get('lutUrl'),
overlay_url=data.get('overlayUrl'),
effects=data.get('effects'),
zoom_cut=data.get('zoomCut', False),
video_crop=data.get('videoCrop'),
face_pos=data.get('facePos'),
transitions=data.get('transitions'),
transition_in=TransitionConfig.from_dict(data.get('transitionIn')),
transition_out=TransitionConfig.from_dict(data.get('transitionOut'))
)
def has_transition_in(self) -> bool:
"""是否有入场转场"""
return self.transition_in is not None and self.transition_in.duration_ms > 0
def has_transition_out(self) -> bool:
"""是否有出场转场"""
return self.transition_out is not None and self.transition_out.duration_ms > 0
def get_overlap_head_ms(self) -> int:
"""获取头部 overlap 时长(毫秒)"""
if self.has_transition_in():
return self.transition_in.get_overlap_ms()
return 0
def get_overlap_tail_ms(self) -> int:
"""获取尾部 overlap 时长(毫秒)"""
if self.has_transition_out():
return self.transition_out.get_overlap_ms()
return 0
def get_effects(self) -> List['Effect']:
"""获取解析后的特效列表"""
return Effect.parse_effects(self.effects)
@dataclass
class OutputSpec:
"""
输出规格
用于 RENDER_SEGMENT_VIDEO 任务,定义视频输出参数。
"""
width: int = 1080
height: int = 1920
fps: int = 30
bitrate: int = 4000000
codec: str = "h264"
@classmethod
def from_dict(cls, data: Optional[Dict]) -> 'OutputSpec':
"""从字典创建 OutputSpec"""
if not data:
return cls()
return cls(
width=data.get('width', 1080),
height=data.get('height', 1920),
fps=data.get('fps', 30),
bitrate=data.get('bitrate', 4000000),
codec=data.get('codec', 'h264')
)
@dataclass
class AudioSpec:
"""
音频规格
用于 PREPARE_JOB_AUDIO 任务中的片段叠加音效。
"""
audio_url: Optional[str] = None
volume: float = 1.0
fade_in_ms: int = 10
fade_out_ms: int = 10
start_ms: int = 0
delay_ms: int = 0
loop_enable: bool = False
@classmethod
def from_dict(cls, data: Optional[Dict]) -> Optional['AudioSpec']:
"""从字典创建 AudioSpec"""
if not data:
return None
return cls(
audio_url=data.get('audioUrl'),
volume=float(data.get('volume', 1.0)),
fade_in_ms=int(data.get('fadeInMs', 10)),
fade_out_ms=int(data.get('fadeOutMs', 10)),
start_ms=int(data.get('startMs', 0)),
delay_ms=int(data.get('delayMs', 0)),
loop_enable=data.get('loopEnable', False)
)
@dataclass
class AudioProfile:
"""
音频配置
用于 PREPARE_JOB_AUDIO 任务的全局音频参数。
"""
sample_rate: int = 48000
channels: int = 2
codec: str = "aac"
@classmethod
def from_dict(cls, data: Optional[Dict]) -> 'AudioProfile':
"""从字典创建 AudioProfile"""
if not data:
return cls()
return cls(
sample_rate=data.get('sampleRate', 48000),
channels=data.get('channels', 2),
codec=data.get('codec', 'aac')
)
@dataclass
class Task:
"""
任务实体
表示一个待执行的渲染任务。
"""
task_id: str
task_type: TaskType
priority: int
lease_expire_time: datetime
payload: Dict[str, Any]
@classmethod
def from_dict(cls, data: Dict) -> 'Task':
"""从 API 响应字典创建 Task"""
lease_time_str = data.get('leaseExpireTime', '')
# 解析 ISO 8601 时间格式
if lease_time_str:
if lease_time_str.endswith('Z'):
lease_time_str = lease_time_str[:-1] + '+00:00'
try:
lease_expire_time = datetime.fromisoformat(lease_time_str)
except ValueError:
# 解析失败时使用当前时间 + 5分钟
lease_expire_time = datetime.now()
else:
lease_expire_time = datetime.now()
return cls(
task_id=str(data['taskId']),
task_type=TaskType(data['taskType']),
priority=data.get('priority', 0),
lease_expire_time=lease_expire_time,
payload=data.get('payload', {})
)
def get_job_id(self) -> str:
"""获取作业 ID"""
return str(self.payload.get('jobId', ''))
def get_segment_id(self) -> Optional[str]:
"""获取片段 ID(如果有)"""
segment_id = self.payload.get('segmentId')
return str(segment_id) if segment_id else None
def get_plan_segment_index(self) -> int:
"""获取计划片段索引"""
return int(self.payload.get('planSegmentIndex', 0))
def get_duration_ms(self) -> int:
"""获取时长(毫秒)"""
return int(self.payload.get('durationMs', 5000))
def get_material_url(self) -> Optional[str]:
"""获取素材 URL"""
return self.payload.get('boundMaterialUrl') or self.payload.get('sourceRef')
def get_render_spec(self) -> RenderSpec:
"""获取渲染规格"""
return RenderSpec.from_dict(self.payload.get('renderSpec'))
def get_output_spec(self) -> OutputSpec:
"""获取输出规格"""
return OutputSpec.from_dict(self.payload.get('output'))
def get_transition_type(self) -> Optional[str]:
"""获取转场类型(来自 TaskPayload 顶层)"""
return self.payload.get('transitionType')
def get_transition_ms(self) -> int:
"""获取转场时长(毫秒,来自 TaskPayload 顶层)"""
return int(self.payload.get('transitionMs', 0))
def has_transition(self) -> bool:
"""是否有转场效果"""
return self.get_transition_ms() > 0
def get_overlap_tail_ms(self) -> int:
"""
获取尾部 overlap 时长(毫秒)
转场发生在当前片段与下一片段之间,当前片段需要在尾部多渲染 overlap 帧。
overlap = transitionMs / 2
"""
return self.get_transition_ms() // 2
def get_transition_in_type(self) -> Optional[str]:
"""获取入场转场类型(来自前一片段的出场转场)"""
return self.payload.get('transitionInType')
def get_transition_in_ms(self) -> int:
"""获取入场转场时长(毫秒)"""
return int(self.payload.get('transitionInMs', 0))
def get_transition_out_type(self) -> Optional[str]:
"""获取出场转场类型(当前片段的转场配置)"""
return self.payload.get('transitionOutType')
def get_transition_out_ms(self) -> int:
"""获取出场转场时长(毫秒)"""
return int(self.payload.get('transitionOutMs', 0))
def has_transition_in(self) -> bool:
"""是否有入场转场"""
return self.get_transition_in_ms() > 0
def has_transition_out(self) -> bool:
"""是否有出场转场"""
return self.get_transition_out_ms() > 0
def get_overlap_head_ms(self) -> int:
"""
获取头部 overlap 时长(毫秒)
入场转场来自前一个片段,当前片段需要在头部多渲染 overlap 帧。
overlap = transitionInMs / 2
"""
return self.get_transition_in_ms() // 2
def get_overlap_tail_ms_v2(self) -> int:
"""
获取尾部 overlap 时长(毫秒)- 使用新的字段名
出场转场用于当前片段与下一片段之间,当前片段需要在尾部多渲染 overlap 帧。
overlap = transitionOutMs / 2
"""
return self.get_transition_out_ms() // 2
def get_bgm_url(self) -> Optional[str]:
"""获取 BGM URL"""
return self.payload.get('bgmUrl')
def get_total_duration_ms(self) -> int:
"""获取总时长(毫秒)"""
return int(self.payload.get('totalDurationMs', 0))
def get_segments(self) -> List[Dict]:
"""获取片段列表"""
return self.payload.get('segments', [])
def get_audio_profile(self) -> AudioProfile:
"""获取音频配置"""
return AudioProfile.from_dict(self.payload.get('audioProfile'))
def get_video_url(self) -> Optional[str]:
"""获取视频 URL(用于 PACKAGE_SEGMENT_TS)"""
return self.payload.get('videoUrl')
def get_audio_url(self) -> Optional[str]:
"""获取音频 URL(用于 PACKAGE_SEGMENT_TS)"""
return self.payload.get('audioUrl')
def get_start_time_ms(self) -> int:
"""获取开始时间(毫秒)"""
return int(self.payload.get('startTimeMs', 0))
def get_m3u8_url(self) -> Optional[str]:
"""获取 m3u8 URL(用于 FINALIZE_MP4)"""
return self.payload.get('m3u8Url')
def get_ts_list(self) -> List[str]:
"""获取 TS 列表(用于 FINALIZE_MP4)"""
return self.payload.get('tsList', [])
# ========== COMPOSE_TRANSITION 相关方法 ==========
def get_transition_id(self) -> Optional[str]:
"""获取转场 ID(用于 COMPOSE_TRANSITION)"""
return self.payload.get('transitionId')
def get_prev_segment(self) -> Optional[Dict]:
"""获取前一个片段信息(用于 COMPOSE_TRANSITION)"""
return self.payload.get('prevSegment')
def get_next_segment(self) -> Optional[Dict]:
"""获取后一个片段信息(用于 COMPOSE_TRANSITION)"""
return self.payload.get('nextSegment')
def get_transition_config(self) -> Optional[TransitionConfig]:
"""获取转场配置(用于 COMPOSE_TRANSITION)"""
return TransitionConfig.from_dict(self.payload.get('transition'))
# ========== PACKAGE_SEGMENT_TS 转场相关方法 ==========
def is_transition_segment(self) -> bool:
"""是否为转场分片(用于 PACKAGE_SEGMENT_TS)"""
return self.payload.get('isTransitionSegment', False)
def should_trim_head(self) -> bool:
"""是否需要裁剪头部 overlap(用于 PACKAGE_SEGMENT_TS)"""
return self.payload.get('trimHead', False)
def should_trim_tail(self) -> bool:
"""是否需要裁剪尾部 overlap(用于 PACKAGE_SEGMENT_TS)"""
return self.payload.get('trimTail', False)
def get_trim_head_ms(self) -> int:
"""获取头部裁剪时长(毫秒)"""
return int(self.payload.get('trimHeadMs', 0))
def get_trim_tail_ms(self) -> int:
"""获取尾部裁剪时长(毫秒)"""
return int(self.payload.get('trimTailMs', 0))