# -*- coding: utf-8 -*- """ 转场合成处理器 处理 COMPOSE_TRANSITION 任务,将相邻两个片段的 overlap 区域进行混合,生成转场效果。 使用 FFmpeg xfade 滤镜实现多种转场效果。 """ import os import logging from typing import List, Optional from handlers.base import BaseHandler from domain.task import Task, TaskType, TransitionConfig, TRANSITION_TYPES from domain.result import TaskResult, ErrorCode logger = logging.getLogger(__name__) class ComposeTransitionHandler(BaseHandler): """ 转场合成处理器 职责: - 下载前一个片段的视频(含尾部 overlap) - 下载后一个片段的视频(含头部 overlap) - 使用 xfade 滤镜合成转场效果 - 上传转场视频产物 关键约束: - 转场任务必须等待前后两个片段的 RENDER_SEGMENT_VIDEO 都完成后才能执行 - 输出编码参数必须与片段视频一致,确保后续 TS 封装兼容 - 转场视频不含音频轨道(音频由 PREPARE_JOB_AUDIO 统一处理) """ def get_supported_type(self) -> TaskType: return TaskType.COMPOSE_TRANSITION def handle(self, task: Task) -> TaskResult: """处理转场合成任务""" work_dir = self.create_work_dir(task.task_id) try: # 解析参数 transition_id = task.get_transition_id() prev_segment = task.get_prev_segment() next_segment = task.get_next_segment() transition_config = task.get_transition_config() output_spec = task.get_output_spec() # 参数验证 if not transition_id: return TaskResult.fail( ErrorCode.E_SPEC_INVALID, "Missing transitionId" ) if not prev_segment or not prev_segment.get('videoUrl'): return TaskResult.fail( ErrorCode.E_SPEC_INVALID, "Missing prevSegment.videoUrl" ) if not next_segment or not next_segment.get('videoUrl'): return TaskResult.fail( ErrorCode.E_SPEC_INVALID, "Missing nextSegment.videoUrl" ) if not transition_config: return TaskResult.fail( ErrorCode.E_SPEC_INVALID, "Missing transition config" ) # 获取 overlap 时长 overlap_tail_ms = prev_segment.get('overlapTailMs', 0) overlap_head_ms = next_segment.get('overlapHeadMs', 0) transition_duration_ms = transition_config.duration_ms # 验证 overlap 时长 if overlap_tail_ms <= 0 or overlap_head_ms <= 0: return TaskResult.fail( ErrorCode.E_SPEC_INVALID, f"Invalid overlap duration: tail={overlap_tail_ms}ms, head={overlap_head_ms}ms" ) logger.info( f"[task:{task.task_id}] Composing transition: {transition_config.type}, " f"duration={transition_duration_ms}ms, " f"overlap_tail={overlap_tail_ms}ms, overlap_head={overlap_head_ms}ms" ) # 1. 下载前一个片段视频 prev_video_file = os.path.join(work_dir, 'prev_segment.mp4') if not self.download_file(prev_segment['videoUrl'], prev_video_file): return TaskResult.fail( ErrorCode.E_INPUT_UNAVAILABLE, f"Failed to download prev segment video: {prev_segment['videoUrl']}" ) # 2. 下载后一个片段视频 next_video_file = os.path.join(work_dir, 'next_segment.mp4') if not self.download_file(next_segment['videoUrl'], next_video_file): return TaskResult.fail( ErrorCode.E_INPUT_UNAVAILABLE, f"Failed to download next segment video: {next_segment['videoUrl']}" ) # 3. 获取前一个片段的实际时长 prev_duration = self.probe_duration(prev_video_file) if not prev_duration: return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "Failed to probe prev segment duration" ) # 4. 构建转场合成命令 output_file = os.path.join(work_dir, 'transition.mp4') cmd = self._build_command( prev_video_file=prev_video_file, next_video_file=next_video_file, output_file=output_file, prev_duration_sec=prev_duration, overlap_tail_ms=overlap_tail_ms, overlap_head_ms=overlap_head_ms, transition_config=transition_config, output_spec=output_spec ) # 5. 执行 FFmpeg if not self.run_ffmpeg(cmd, task.task_id): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "FFmpeg transition composition failed" ) # 6. 验证输出文件 if not self.ensure_file_exists(output_file, min_size=1024): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "Transition output file is missing or too small" ) # 7. 获取实际时长 actual_duration = self.probe_duration(output_file) actual_duration_ms = int(actual_duration * 1000) if actual_duration else transition_duration_ms # 8. 上传产物 transition_video_url = self.upload_file(task.task_id, 'video', output_file) if not transition_video_url: return TaskResult.fail( ErrorCode.E_UPLOAD_FAILED, "Failed to upload transition video" ) return TaskResult.ok({ 'transitionVideoUrl': transition_video_url, 'actualDurationMs': actual_duration_ms }) except Exception as e: logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True) return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e)) finally: self.cleanup_work_dir(work_dir) def _build_command( self, prev_video_file: str, next_video_file: str, output_file: str, prev_duration_sec: float, overlap_tail_ms: int, overlap_head_ms: int, transition_config: TransitionConfig, output_spec ) -> List[str]: """ 构建转场合成命令 使用 xfade 滤镜合成转场效果: 1. 从前一个片段截取尾部 overlap 区域 2. 从后一个片段截取头部 overlap 区域 3. 使用 xfade 进行混合 注意: - 转场视频时长很短,需要特别处理 GOP 大小 - 确保第一帧是关键帧以便后续 TS 封装 Args: prev_video_file: 前一个片段视频路径 next_video_file: 后一个片段视频路径 output_file: 输出文件路径 prev_duration_sec: 前一个片段总时长(秒) overlap_tail_ms: 尾部 overlap 时长(毫秒) overlap_head_ms: 头部 overlap 时长(毫秒) transition_config: 转场配置 output_spec: 输出规格 Returns: FFmpeg 命令参数列表 """ # 计算时间参数 overlap_tail_sec = overlap_tail_ms / 1000.0 overlap_head_sec = overlap_head_ms / 1000.0 # 前一个片段的尾部 overlap 起始位置 tail_start_sec = prev_duration_sec - overlap_tail_sec # 转场时长(使用两个 overlap 区域的总和,xfade 会将两段合成为此时长) # 注意:xfade 的输出时长 = overlap_tail + overlap_head - duration # 当 duration = overlap_tail + overlap_head 时,输出时长约等于 duration transition_duration_sec = min(overlap_tail_sec, overlap_head_sec) # 获取 xfade 转场类型 xfade_transition = transition_config.get_ffmpeg_transition() # 构建滤镜 # [0:v] trim 截取前一个片段的尾部 overlap # [1:v] trim 截取后一个片段的头部 overlap # xfade 混合两段视频 filter_complex = ( f"[0:v]trim=start={tail_start_sec},setpts=PTS-STARTPTS[v0];" f"[1:v]trim=end={overlap_head_sec},setpts=PTS-STARTPTS[v1];" f"[v0][v1]xfade=transition={xfade_transition}:duration={transition_duration_sec}:offset=0[outv]" ) cmd = [ 'ffmpeg', '-y', '-hide_banner', '-i', prev_video_file, '-i', next_video_file, '-filter_complex', filter_complex, '-map', '[outv]', ] # 编码参数(根据硬件加速配置动态获取) cmd.extend(self.get_video_encode_args()) # 帧率 fps = output_spec.fps # 计算输出视频的预估帧数 # xfade 输出时长 ≈ overlap_tail + overlap_head - transition_duration output_duration_sec = overlap_tail_sec + overlap_head_sec - transition_duration_sec total_frames = int(output_duration_sec * fps) # 动态调整 GOP 大小:对于短视频,GOP 不能大于总帧数 # 确保至少有 1 个关键帧(第一帧),最小 GOP = 1 if total_frames <= 1: gop_size = 1 elif total_frames < fps: # 短于 1 秒的视频,使用全部帧数作为 GOP(整个视频只有开头一个关键帧) gop_size = total_frames else: # 正常情况,每秒一个关键帧(比标准的 2 秒更密集,适合短视频) gop_size = fps cmd.extend(['-r', str(fps)]) cmd.extend(['-g', str(gop_size)]) cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))]) # 强制第一帧为关键帧 cmd.extend(['-force_key_frames', 'expr:eq(n,0)']) # 无音频 cmd.append('-an') # 输出文件 cmd.append(output_file) return cmd