From 9c6186ecd3654aaf3ad7ddd513ffc7dc2671377b Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Mon, 12 Jan 2026 22:41:22 +0800 Subject: [PATCH] =?UTF-8?q?feat(video):=20=E6=B7=BB=E5=8A=A0=E8=A7=86?= =?UTF-8?q?=E9=A2=91=E8=BD=AC=E5=9C=BA=E5=8A=9F=E8=83=BD=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 TASK_TYPES 中新增 COMPOSE_TRANSITION 类型 - 定义 TRANSITION_TYPES 常量支持多种转场效果 - 在 TaskType 枚举中添加 COMPOSE_TRANSITION - 创建 TransitionConfig 数据类处理转场配置 - 为 RenderSpec 添加 transition_in 和 transition_out 属性 - 在 Task 类中添加转场相关的方法 - 新增 ComposeTransitionHandler 处理转场合成任务 - 修改 PackageSegmentTsHandler 支持转场分片封装 - 修改 RenderSegmentVideoHandler 支持 overlap 区域生成 - 在 TaskExecutor 中注册转场处理器 --- constant/__init__.py | 15 ++ domain/task.py | 116 +++++++++++++- handlers/__init__.py | 2 + handlers/compose_transition.py | 273 +++++++++++++++++++++++++++++++++ handlers/package_ts.py | 157 +++++++++++++++++-- handlers/render_video.py | 72 +++++++-- services/task_executor.py | 2 + 7 files changed, 605 insertions(+), 32 deletions(-) create mode 100644 handlers/compose_transition.py diff --git a/constant/__init__.py b/constant/__init__.py index c723b63..54a353a 100644 --- a/constant/__init__.py +++ b/constant/__init__.py @@ -11,6 +11,7 @@ SOFTWARE_VERSION = '2.0.0' # 支持的任务类型 TASK_TYPES = ( 'RENDER_SEGMENT_VIDEO', + 'COMPOSE_TRANSITION', 'PREPARE_JOB_AUDIO', 'PACKAGE_SEGMENT_TS', 'FINALIZE_MP4', @@ -19,6 +20,20 @@ TASK_TYPES = ( # 默认能力 DEFAULT_CAPABILITIES = list(TASK_TYPES) +# 支持的转场类型(对应 FFmpeg xfade 参数) +TRANSITION_TYPES = ( + 'fade', # 淡入淡出(默认) + 'dissolve', # 溶解过渡 + 'wipeleft', # 向左擦除 + 'wiperight', # 向右擦除 + 'wipeup', # 向上擦除 + 'wipedown', # 向下擦除 + 'slideleft', # 向左滑动 + 'slideright', # 向右滑动 + 'slideup', # 向上滑动 + 'slidedown', # 向下滑动 +) + # 统一视频编码参数(来自集成文档) VIDEO_ENCODE_PARAMS = { 'codec': 'libx264', diff --git a/domain/task.py b/domain/task.py index d39a679..95bd058 100644 --- a/domain/task.py +++ b/domain/task.py @@ -14,11 +14,27 @@ from datetime import datetime class TaskType(Enum): """任务类型枚举""" RENDER_SEGMENT_VIDEO = "RENDER_SEGMENT_VIDEO" # 渲染视频片段 + COMPOSE_TRANSITION = "COMPOSE_TRANSITION" # 合成转场效果 PREPARE_JOB_AUDIO = "PREPARE_JOB_AUDIO" # 生成全局音频 PACKAGE_SEGMENT_TS = "PACKAGE_SEGMENT_TS" # 封装 TS 分片 FINALIZE_MP4 = "FINALIZE_MP4" # 产出最终 MP4 +# 支持的转场类型(对应 FFmpeg xfade 参数) +TRANSITION_TYPES = { + 'fade': 'fade', # 淡入淡出(默认) + 'dissolve': 'dissolve', # 溶解过渡 + 'wipeleft': 'wipeleft', # 向左擦除 + 'wiperight': 'wiperight', # 向右擦除 + 'wipeup': 'wipeup', # 向上擦除 + 'wipedown': 'wipedown', # 向下擦除 + 'slideleft': 'slideleft', # 向左滑动 + 'slideright': 'slideright', # 向右滑动 + 'slideup': 'slideup', # 向上滑动 + 'slidedown': 'slidedown', # 向下滑动 +} + + class TaskStatus(Enum): """任务状态枚举""" PENDING = "PENDING" @@ -27,6 +43,39 @@ class TaskStatus(Enum): FAILED = "FAILED" +@dataclass +class TransitionConfig: + """ + 转场配置 + + 用于 RENDER_SEGMENT_VIDEO 任务的入场/出场转场配置。 + """ + type: str = "fade" # 转场类型 + duration_ms: int = 500 # 转场时长(毫秒) + + @classmethod + def from_dict(cls, data: Optional[Dict]) -> Optional['TransitionConfig']: + """从字典创建 TransitionConfig""" + if not data: + return None + trans_type = data.get('type', 'fade') + # 验证转场类型是否支持 + if trans_type not in TRANSITION_TYPES: + trans_type = 'fade' + return cls( + type=trans_type, + duration_ms=int(data.get('durationMs', 500)) + ) + + def get_overlap_ms(self) -> int: + """获取 overlap 时长(单边,为转场时长的一半)""" + return self.duration_ms // 2 + + def get_ffmpeg_transition(self) -> str: + """获取 FFmpeg xfade 参数""" + return TRANSITION_TYPES.get(self.type, 'fade') + + @dataclass class RenderSpec: """ @@ -44,6 +93,9 @@ class RenderSpec: video_crop: Optional[str] = None face_pos: Optional[str] = None transitions: Optional[str] = None + # 转场配置(PRD v2 新增) + transition_in: Optional[TransitionConfig] = None # 入场转场 + transition_out: Optional[TransitionConfig] = None # 出场转场 @classmethod def from_dict(cls, data: Optional[Dict]) -> 'RenderSpec': @@ -60,9 +112,31 @@ class RenderSpec: zoom_cut=data.get('zoomCut', False), video_crop=data.get('videoCrop'), face_pos=data.get('facePos'), - transitions=data.get('transitions') + transitions=data.get('transitions'), + transition_in=TransitionConfig.from_dict(data.get('transitionIn')), + transition_out=TransitionConfig.from_dict(data.get('transitionOut')) ) + def has_transition_in(self) -> bool: + """是否有入场转场""" + return self.transition_in is not None and self.transition_in.duration_ms > 0 + + def has_transition_out(self) -> bool: + """是否有出场转场""" + return self.transition_out is not None and self.transition_out.duration_ms > 0 + + def get_overlap_head_ms(self) -> int: + """获取头部 overlap 时长(毫秒)""" + if self.has_transition_in(): + return self.transition_in.get_overlap_ms() + return 0 + + def get_overlap_tail_ms(self) -> int: + """获取尾部 overlap 时长(毫秒)""" + if self.has_transition_out(): + return self.transition_out.get_overlap_ms() + return 0 + @dataclass class OutputSpec: @@ -247,3 +321,43 @@ class Task: def get_ts_list(self) -> List[str]: """获取 TS 列表(用于 FINALIZE_MP4)""" return self.payload.get('tsList', []) + + # ========== COMPOSE_TRANSITION 相关方法 ========== + + def get_transition_id(self) -> Optional[str]: + """获取转场 ID(用于 COMPOSE_TRANSITION)""" + return self.payload.get('transitionId') + + def get_prev_segment(self) -> Optional[Dict]: + """获取前一个片段信息(用于 COMPOSE_TRANSITION)""" + return self.payload.get('prevSegment') + + def get_next_segment(self) -> Optional[Dict]: + """获取后一个片段信息(用于 COMPOSE_TRANSITION)""" + return self.payload.get('nextSegment') + + def get_transition_config(self) -> Optional[TransitionConfig]: + """获取转场配置(用于 COMPOSE_TRANSITION)""" + return TransitionConfig.from_dict(self.payload.get('transition')) + + # ========== PACKAGE_SEGMENT_TS 转场相关方法 ========== + + def is_transition_segment(self) -> bool: + """是否为转场分片(用于 PACKAGE_SEGMENT_TS)""" + return self.payload.get('isTransitionSegment', False) + + def should_trim_head(self) -> bool: + """是否需要裁剪头部 overlap(用于 PACKAGE_SEGMENT_TS)""" + return self.payload.get('trimHead', False) + + def should_trim_tail(self) -> bool: + """是否需要裁剪尾部 overlap(用于 PACKAGE_SEGMENT_TS)""" + return self.payload.get('trimTail', False) + + def get_trim_head_ms(self) -> int: + """获取头部裁剪时长(毫秒)""" + return int(self.payload.get('trimHeadMs', 0)) + + def get_trim_tail_ms(self) -> int: + """获取尾部裁剪时长(毫秒)""" + return int(self.payload.get('trimTailMs', 0)) diff --git a/handlers/__init__.py b/handlers/__init__.py index b905644..13dab17 100644 --- a/handlers/__init__.py +++ b/handlers/__init__.py @@ -7,6 +7,7 @@ from handlers.base import BaseHandler from handlers.render_video import RenderSegmentVideoHandler +from handlers.compose_transition import ComposeTransitionHandler from handlers.prepare_audio import PrepareJobAudioHandler from handlers.package_ts import PackageSegmentTsHandler from handlers.finalize_mp4 import FinalizeMp4Handler @@ -14,6 +15,7 @@ from handlers.finalize_mp4 import FinalizeMp4Handler __all__ = [ 'BaseHandler', 'RenderSegmentVideoHandler', + 'ComposeTransitionHandler', 'PrepareJobAudioHandler', 'PackageSegmentTsHandler', 'FinalizeMp4Handler', diff --git a/handlers/compose_transition.py b/handlers/compose_transition.py new file mode 100644 index 0000000..e6ea62e --- /dev/null +++ b/handlers/compose_transition.py @@ -0,0 +1,273 @@ +# -*- coding: utf-8 -*- +""" +转场合成处理器 + +处理 COMPOSE_TRANSITION 任务,将相邻两个片段的 overlap 区域进行混合,生成转场效果。 +使用 FFmpeg xfade 滤镜实现多种转场效果。 +""" + +import os +import logging +from typing import List, Optional + +from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS +from domain.task import Task, TaskType, TransitionConfig, TRANSITION_TYPES +from domain.result import TaskResult, ErrorCode + +logger = logging.getLogger(__name__) + + +class ComposeTransitionHandler(BaseHandler): + """ + 转场合成处理器 + + 职责: + - 下载前一个片段的视频(含尾部 overlap) + - 下载后一个片段的视频(含头部 overlap) + - 使用 xfade 滤镜合成转场效果 + - 上传转场视频产物 + + 关键约束: + - 转场任务必须等待前后两个片段的 RENDER_SEGMENT_VIDEO 都完成后才能执行 + - 输出编码参数必须与片段视频一致,确保后续 TS 封装兼容 + - 转场视频不含音频轨道(音频由 PREPARE_JOB_AUDIO 统一处理) + """ + + def get_supported_type(self) -> TaskType: + return TaskType.COMPOSE_TRANSITION + + def handle(self, task: Task) -> TaskResult: + """处理转场合成任务""" + work_dir = self.create_work_dir(task.task_id) + + try: + # 解析参数 + transition_id = task.get_transition_id() + prev_segment = task.get_prev_segment() + next_segment = task.get_next_segment() + transition_config = task.get_transition_config() + output_spec = task.get_output_spec() + + # 参数验证 + if not transition_id: + return TaskResult.fail( + ErrorCode.E_SPEC_INVALID, + "Missing transitionId" + ) + + if not prev_segment or not prev_segment.get('videoUrl'): + return TaskResult.fail( + ErrorCode.E_SPEC_INVALID, + "Missing prevSegment.videoUrl" + ) + + if not next_segment or not next_segment.get('videoUrl'): + return TaskResult.fail( + ErrorCode.E_SPEC_INVALID, + "Missing nextSegment.videoUrl" + ) + + if not transition_config: + return TaskResult.fail( + ErrorCode.E_SPEC_INVALID, + "Missing transition config" + ) + + # 获取 overlap 时长 + overlap_tail_ms = prev_segment.get('overlapTailMs', 0) + overlap_head_ms = next_segment.get('overlapHeadMs', 0) + transition_duration_ms = transition_config.duration_ms + + # 验证 overlap 时长 + if overlap_tail_ms <= 0 or overlap_head_ms <= 0: + return TaskResult.fail( + ErrorCode.E_SPEC_INVALID, + f"Invalid overlap duration: tail={overlap_tail_ms}ms, head={overlap_head_ms}ms" + ) + + logger.info( + f"[task:{task.task_id}] Composing transition: {transition_config.type}, " + f"duration={transition_duration_ms}ms, " + f"overlap_tail={overlap_tail_ms}ms, overlap_head={overlap_head_ms}ms" + ) + + # 1. 下载前一个片段视频 + prev_video_file = os.path.join(work_dir, 'prev_segment.mp4') + if not self.download_file(prev_segment['videoUrl'], prev_video_file): + return TaskResult.fail( + ErrorCode.E_INPUT_UNAVAILABLE, + f"Failed to download prev segment video: {prev_segment['videoUrl']}" + ) + + # 2. 下载后一个片段视频 + next_video_file = os.path.join(work_dir, 'next_segment.mp4') + if not self.download_file(next_segment['videoUrl'], next_video_file): + return TaskResult.fail( + ErrorCode.E_INPUT_UNAVAILABLE, + f"Failed to download next segment video: {next_segment['videoUrl']}" + ) + + # 3. 获取前一个片段的实际时长 + prev_duration = self.probe_duration(prev_video_file) + if not prev_duration: + return TaskResult.fail( + ErrorCode.E_FFMPEG_FAILED, + "Failed to probe prev segment duration" + ) + + # 4. 构建转场合成命令 + output_file = os.path.join(work_dir, 'transition.mp4') + cmd = self._build_command( + prev_video_file=prev_video_file, + next_video_file=next_video_file, + output_file=output_file, + prev_duration_sec=prev_duration, + overlap_tail_ms=overlap_tail_ms, + overlap_head_ms=overlap_head_ms, + transition_config=transition_config, + output_spec=output_spec + ) + + # 5. 执行 FFmpeg + if not self.run_ffmpeg(cmd, task.task_id): + return TaskResult.fail( + ErrorCode.E_FFMPEG_FAILED, + "FFmpeg transition composition failed" + ) + + # 6. 验证输出文件 + if not self.ensure_file_exists(output_file, min_size=1024): + return TaskResult.fail( + ErrorCode.E_FFMPEG_FAILED, + "Transition output file is missing or too small" + ) + + # 7. 获取实际时长 + actual_duration = self.probe_duration(output_file) + actual_duration_ms = int(actual_duration * 1000) if actual_duration else transition_duration_ms + + # 8. 上传产物 + transition_video_url = self.upload_file(task.task_id, 'video', output_file) + if not transition_video_url: + return TaskResult.fail( + ErrorCode.E_UPLOAD_FAILED, + "Failed to upload transition video" + ) + + return TaskResult.ok({ + 'transitionVideoUrl': transition_video_url, + 'actualDurationMs': actual_duration_ms + }) + + except Exception as e: + logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True) + return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e)) + + finally: + self.cleanup_work_dir(work_dir) + + def _build_command( + self, + prev_video_file: str, + next_video_file: str, + output_file: str, + prev_duration_sec: float, + overlap_tail_ms: int, + overlap_head_ms: int, + transition_config: TransitionConfig, + output_spec + ) -> List[str]: + """ + 构建转场合成命令 + + 使用 xfade 滤镜合成转场效果: + 1. 从前一个片段截取尾部 overlap 区域 + 2. 从后一个片段截取头部 overlap 区域 + 3. 使用 xfade 进行混合 + + 注意: + - 转场视频时长很短,需要特别处理 GOP 大小 + - 确保第一帧是关键帧以便后续 TS 封装 + + Args: + prev_video_file: 前一个片段视频路径 + next_video_file: 后一个片段视频路径 + output_file: 输出文件路径 + prev_duration_sec: 前一个片段总时长(秒) + overlap_tail_ms: 尾部 overlap 时长(毫秒) + overlap_head_ms: 头部 overlap 时长(毫秒) + transition_config: 转场配置 + output_spec: 输出规格 + + Returns: + FFmpeg 命令参数列表 + """ + # 计算时间参数 + overlap_tail_sec = overlap_tail_ms / 1000.0 + overlap_head_sec = overlap_head_ms / 1000.0 + + # 前一个片段的尾部 overlap 起始位置 + tail_start_sec = prev_duration_sec - overlap_tail_sec + + # 转场时长(使用两个 overlap 区域的总和,xfade 会将两段合成为此时长) + # 注意:xfade 的输出时长 = overlap_tail + overlap_head - duration + # 当 duration = overlap_tail + overlap_head 时,输出时长约等于 duration + transition_duration_sec = min(overlap_tail_sec, overlap_head_sec) + + # 获取 xfade 转场类型 + xfade_transition = transition_config.get_ffmpeg_transition() + + # 构建滤镜 + # [0:v] trim 截取前一个片段的尾部 overlap + # [1:v] trim 截取后一个片段的头部 overlap + # xfade 混合两段视频 + filter_complex = ( + f"[0:v]trim=start={tail_start_sec},setpts=PTS-STARTPTS[v0];" + f"[1:v]trim=end={overlap_head_sec},setpts=PTS-STARTPTS[v1];" + f"[v0][v1]xfade=transition={xfade_transition}:duration={transition_duration_sec}:offset=0[outv]" + ) + + cmd = [ + 'ffmpeg', '-y', '-hide_banner', + '-i', prev_video_file, + '-i', next_video_file, + '-filter_complex', filter_complex, + '-map', '[outv]', + ] + + # 编码参数(与片段视频一致) + cmd.extend(VIDEO_ENCODE_ARGS) + + # 帧率 + fps = output_spec.fps + + # 计算输出视频的预估帧数 + # xfade 输出时长 ≈ overlap_tail + overlap_head - transition_duration + output_duration_sec = overlap_tail_sec + overlap_head_sec - transition_duration_sec + total_frames = int(output_duration_sec * fps) + + # 动态调整 GOP 大小:对于短视频,GOP 不能大于总帧数 + # 确保至少有 1 个关键帧(第一帧),最小 GOP = 1 + if total_frames <= 1: + gop_size = 1 + elif total_frames < fps: + # 短于 1 秒的视频,使用全部帧数作为 GOP(整个视频只有开头一个关键帧) + gop_size = total_frames + else: + # 正常情况,每秒一个关键帧(比标准的 2 秒更密集,适合短视频) + gop_size = fps + + cmd.extend(['-r', str(fps)]) + cmd.extend(['-g', str(gop_size)]) + cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))]) + + # 强制第一帧为关键帧 + cmd.extend(['-force_key_frames', 'expr:eq(n,0)']) + + # 无音频 + cmd.append('-an') + + # 输出文件 + cmd.append(output_file) + + return cmd diff --git a/handlers/package_ts.py b/handlers/package_ts.py index d219823..0d6864a 100644 --- a/handlers/package_ts.py +++ b/handlers/package_ts.py @@ -3,13 +3,14 @@ TS 分片封装处理器 处理 PACKAGE_SEGMENT_TS 任务,将视频片段和对应时间区间的音频封装为 TS 分片。 +支持转场相关的 overlap 裁剪和转场分片封装。 """ import os import logging -from typing import List +from typing import List, Optional -from handlers.base import BaseHandler +from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS from domain.task import Task, TaskType from domain.result import TaskResult, ErrorCode @@ -31,6 +32,15 @@ class PackageSegmentTsHandler(BaseHandler): - TS 必须包含音视频同轨 - 使用 output_ts_offset 保证时间戳连续 - 输出 extinfDurationSec 供 m3u8 使用 + + 转场相关: + - 普通片段 TS:需要裁剪掉 overlap 区域(已被转场分片使用) + - 转场分片 TS:直接封装转场视频产物,无需裁剪 + - 无转场时:走原有逻辑,不做裁剪 + + 精确裁剪: + - 当需要裁剪 overlap 区域时,必须使用重编码方式(-vf trim)才能精确切割 + - 使用 -c copy 只能从关键帧切割,会导致不精确 """ def get_supported_type(self) -> TaskType: @@ -46,6 +56,14 @@ class PackageSegmentTsHandler(BaseHandler): audio_url = task.get_audio_url() start_time_ms = task.get_start_time_ms() duration_ms = task.get_duration_ms() + output_spec = task.get_output_spec() + + # 转场相关参数 + is_transition_segment = task.is_transition_segment() + trim_head = task.should_trim_head() + trim_tail = task.should_trim_tail() + trim_head_ms = task.get_trim_head_ms() + trim_tail_ms = task.get_trim_tail_ms() if not video_url: return TaskResult.fail( @@ -79,35 +97,67 @@ class PackageSegmentTsHandler(BaseHandler): f"Failed to download audio: {audio_url}" ) - # 3. 构建 TS 封装命令 + # 3. 判断是否需要精确裁剪视频 + needs_video_trim = not is_transition_segment and ( + (trim_head and trim_head_ms > 0) or + (trim_tail and trim_tail_ms > 0) + ) + + # 4. 如果需要裁剪,先重编码裁剪视频 + processed_video_file = video_file + if needs_video_trim: + processed_video_file = os.path.join(work_dir, 'trimmed_video.mp4') + trim_cmd = self._build_trim_command( + video_file=video_file, + output_file=processed_video_file, + trim_head_ms=trim_head_ms if trim_head else 0, + trim_tail_ms=trim_tail_ms if trim_tail else 0, + output_spec=output_spec + ) + + logger.info(f"[task:{task.task_id}] Trimming video: head={trim_head_ms}ms, tail={trim_tail_ms}ms") + + if not self.run_ffmpeg(trim_cmd, task.task_id): + return TaskResult.fail( + ErrorCode.E_FFMPEG_FAILED, + "Video trim failed" + ) + + if not self.ensure_file_exists(processed_video_file, min_size=1024): + return TaskResult.fail( + ErrorCode.E_FFMPEG_FAILED, + "Trimmed video file is missing or too small" + ) + + # 5. 构建 TS 封装命令 output_file = os.path.join(work_dir, 'segment.ts') - cmd = self._build_command( - video_file=video_file, + cmd = self._build_package_command( + video_file=processed_video_file, audio_file=audio_file, output_file=output_file, start_sec=start_sec, duration_sec=duration_sec ) - # 4. 执行 FFmpeg + # 6. 执行 FFmpeg if not self.run_ffmpeg(cmd, task.task_id): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "TS packaging failed" ) - # 5. 验证输出文件 + # 7. 验证输出文件 if not self.ensure_file_exists(output_file, min_size=1024): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "TS output file is missing or too small" ) - # 6. 获取实际时长(用于 EXTINF) + # 8. 获取实际时长(用于 EXTINF) actual_duration = self.probe_duration(output_file) extinf_duration = actual_duration if actual_duration else duration_sec - # 7. 上传产物 + # 9. 上传产物 ts_url = self.upload_file(task.task_id, 'ts', output_file) if not ts_url: return TaskResult.fail( @@ -127,7 +177,83 @@ class PackageSegmentTsHandler(BaseHandler): finally: self.cleanup_work_dir(work_dir) - def _build_command( + def _build_trim_command( + self, + video_file: str, + output_file: str, + trim_head_ms: int, + trim_tail_ms: int, + output_spec + ) -> List[str]: + """ + 构建视频精确裁剪命令(重编码方式) + + 使用 trim 滤镜进行精确帧级裁剪,而非 -ss/-t 参数的关键帧裁剪。 + + Args: + video_file: 输入视频路径 + output_file: 输出视频路径 + trim_head_ms: 头部裁剪时长(毫秒) + trim_tail_ms: 尾部裁剪时长(毫秒) + output_spec: 输出规格 + + Returns: + FFmpeg 命令参数列表 + """ + # 获取原视频时长 + original_duration = self.probe_duration(video_file) + if not original_duration: + original_duration = 10.0 # 默认值,避免除零 + + trim_head_sec = trim_head_ms / 1000.0 + trim_tail_sec = trim_tail_ms / 1000.0 + + # 计算裁剪后的起止时间 + start_time = trim_head_sec + end_time = original_duration - trim_tail_sec + + # 构建 trim 滤镜 + vf_filter = f"trim=start={start_time}:end={end_time},setpts=PTS-STARTPTS" + + cmd = [ + 'ffmpeg', '-y', '-hide_banner', + '-i', video_file, + '-vf', vf_filter, + ] + + # 编码参数 + cmd.extend(VIDEO_ENCODE_ARGS) + + # 帧率 + fps = output_spec.fps + cmd.extend(['-r', str(fps)]) + + # 计算输出视频帧数,动态调整 GOP + output_duration_sec = end_time - start_time + total_frames = int(output_duration_sec * fps) + + # 动态 GOP:短视频使用较小的 GOP + if total_frames <= 1: + gop_size = 1 + elif total_frames < fps: + gop_size = total_frames + else: + gop_size = fps # 每秒一个关键帧 + + cmd.extend(['-g', str(gop_size)]) + cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))]) + + # 强制第一帧为关键帧 + cmd.extend(['-force_key_frames', 'expr:eq(n,0)']) + + # 无音频(音频单独处理) + cmd.append('-an') + + cmd.append(output_file) + + return cmd + + def _build_package_command( self, video_file: str, audio_file: str, @@ -138,12 +264,15 @@ class PackageSegmentTsHandler(BaseHandler): """ 构建 TS 封装命令 + 将视频和对应时间区间的音频封装为 TS 分片。 + 视频使用 copy 模式(已经过精确裁剪或无需裁剪)。 + Args: - video_file: 视频文件路径 + video_file: 视频文件路径(已处理) audio_file: 音频文件路径 output_file: 输出文件路径 - start_sec: 开始时间(秒) - duration_sec: 时长(秒) + start_sec: 音频开始时间(秒) + duration_sec: 音频时长(秒) Returns: FFmpeg 命令参数列表 @@ -159,7 +288,7 @@ class PackageSegmentTsHandler(BaseHandler): # 映射流 '-map', '0:v:0', # 使用第一个输入的视频流 '-map', '1:a:0', # 使用第二个输入的音频流 - # 复制编码(不重新编码) + # 复制编码(视频已处理,无需重编码) '-c:v', 'copy', '-c:a', 'copy', # 关键:时间戳偏移,保证整体连续 diff --git a/handlers/render_video.py b/handlers/render_video.py index a91b622..02c1927 100644 --- a/handlers/render_video.py +++ b/handlers/render_video.py @@ -3,11 +3,12 @@ 视频片段渲染处理器 处理 RENDER_SEGMENT_VIDEO 任务,将原素材渲染为符合输出规格的视频片段。 +支持转场 overlap 区域的帧冻结生成。 """ import os import logging -from typing import List, Optional +from typing import List, Optional, Tuple from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS from domain.task import Task, TaskType, RenderSpec, OutputSpec @@ -25,7 +26,7 @@ class RenderSegmentVideoHandler(BaseHandler): - 下载 LUT 文件(如有) - 下载叠加层(如有) - 构建 FFmpeg 渲染命令 - - 执行渲染 + - 执行渲染(支持帧冻结生成 overlap 区域) - 上传产物 """ @@ -77,7 +78,11 @@ class RenderSegmentVideoHandler(BaseHandler): logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it") overlay_file = None - # 4. 构建 FFmpeg 命令 + # 4. 计算 overlap 时长 + overlap_head_ms = render_spec.get_overlap_head_ms() + overlap_tail_ms = render_spec.get_overlap_tail_ms() + + # 5. 构建 FFmpeg 命令 output_file = os.path.join(work_dir, 'output.mp4') cmd = self._build_command( input_file=input_file, @@ -86,28 +91,30 @@ class RenderSegmentVideoHandler(BaseHandler): output_spec=output_spec, duration_ms=duration_ms, lut_file=lut_file, - overlay_file=overlay_file + overlay_file=overlay_file, + overlap_head_ms=overlap_head_ms, + overlap_tail_ms=overlap_tail_ms ) - # 5. 执行 FFmpeg + # 6. 执行 FFmpeg if not self.run_ffmpeg(cmd, task.task_id): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "FFmpeg rendering failed" ) - # 6. 验证输出文件 + # 7. 验证输出文件 if not self.ensure_file_exists(output_file, min_size=4096): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "Output file is missing or too small" ) - # 7. 获取实际时长 + # 8. 获取实际时长 actual_duration = self.probe_duration(output_file) actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms - # 8. 上传产物 + # 9. 上传产物 video_url = self.upload_file(task.task_id, 'video', output_file) if not video_url: return TaskResult.fail( @@ -115,10 +122,15 @@ class RenderSegmentVideoHandler(BaseHandler): "Failed to upload video" ) - return TaskResult.ok({ + # 10. 构建结果(包含 overlap 信息) + result_data = { 'videoUrl': video_url, - 'actualDurationMs': actual_duration_ms - }) + 'actualDurationMs': actual_duration_ms, + 'overlapHeadMs': overlap_head_ms, + 'overlapTailMs': overlap_tail_ms + } + + return TaskResult.ok(result_data) except Exception as e: logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True) @@ -135,7 +147,9 @@ class RenderSegmentVideoHandler(BaseHandler): output_spec: OutputSpec, duration_ms: int, lut_file: Optional[str] = None, - overlay_file: Optional[str] = None + overlay_file: Optional[str] = None, + overlap_head_ms: int = 0, + overlap_tail_ms: int = 0 ) -> List[str]: """ 构建 FFmpeg 渲染命令 @@ -148,6 +162,8 @@ class RenderSegmentVideoHandler(BaseHandler): duration_ms: 目标时长(毫秒) lut_file: LUT 文件路径(可选) overlay_file: 叠加层文件路径(可选) + overlap_head_ms: 头部 overlap 时长(毫秒) + overlap_tail_ms: 尾部 overlap 时长(毫秒) Returns: FFmpeg 命令参数列表 @@ -166,7 +182,9 @@ class RenderSegmentVideoHandler(BaseHandler): render_spec=render_spec, output_spec=output_spec, lut_file=lut_file, - has_overlay=overlay_file is not None + has_overlay=overlay_file is not None, + overlap_head_ms=overlap_head_ms, + overlap_tail_ms=overlap_tail_ms ) # 应用滤镜 @@ -188,8 +206,9 @@ class RenderSegmentVideoHandler(BaseHandler): cmd.extend(['-g', str(gop_size)]) cmd.extend(['-keyint_min', str(gop_size)]) - # 时长 - duration_sec = duration_ms / 1000.0 + # 时长(包含 overlap 区域) + total_duration_ms = duration_ms + overlap_head_ms + overlap_tail_ms + duration_sec = total_duration_ms / 1000.0 cmd.extend(['-t', str(duration_sec)]) # 无音频(视频片段不包含音频) @@ -205,7 +224,9 @@ class RenderSegmentVideoHandler(BaseHandler): render_spec: RenderSpec, output_spec: OutputSpec, lut_file: Optional[str] = None, - has_overlay: bool = False + has_overlay: bool = False, + overlap_head_ms: int = 0, + overlap_tail_ms: int = 0 ) -> str: """ 构建视频滤镜链 @@ -215,6 +236,8 @@ class RenderSegmentVideoHandler(BaseHandler): output_spec: 输出规格 lut_file: LUT 文件路径 has_overlay: 是否有叠加层 + overlap_head_ms: 头部 overlap 时长(毫秒) + overlap_tail_ms: 尾部 overlap 时长(毫秒) Returns: 滤镜字符串 @@ -265,7 +288,22 @@ class RenderSegmentVideoHandler(BaseHandler): ) filters.append(scale_filter) - # 5. 构建最终滤镜 + # 5. 帧冻结(tpad)- 用于转场 overlap 区域 + # 注意:tpad 必须在缩放之后应用 + tpad_parts = [] + if overlap_head_ms > 0: + # 头部冻结:将第一帧冻结指定时长 + head_duration_sec = overlap_head_ms / 1000.0 + tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}") + if overlap_tail_ms > 0: + # 尾部冻结:将最后一帧冻结指定时长 + tail_duration_sec = overlap_tail_ms / 1000.0 + tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}") + + if tpad_parts: + filters.append(f"tpad={':'.join(tpad_parts)}") + + # 6. 构建最终滤镜 if has_overlay: # 使用 filter_complex 格式 base_filters = ','.join(filters) if filters else 'copy' diff --git a/services/task_executor.py b/services/task_executor.py index 25def77..15df864 100644 --- a/services/task_executor.py +++ b/services/task_executor.py @@ -67,12 +67,14 @@ class TaskExecutor: """注册所有任务处理器""" # 延迟导入以避免循环依赖 from handlers.render_video import RenderSegmentVideoHandler + from handlers.compose_transition import ComposeTransitionHandler from handlers.prepare_audio import PrepareJobAudioHandler from handlers.package_ts import PackageSegmentTsHandler from handlers.finalize_mp4 import FinalizeMp4Handler handlers = [ RenderSegmentVideoHandler(self.config, self.api_client), + ComposeTransitionHandler(self.config, self.api_client), PrepareJobAudioHandler(self.config, self.api_client), PackageSegmentTsHandler(self.config, self.api_client), FinalizeMp4Handler(self.config, self.api_client),