Files
FrameTour-RenderWorker/handlers/render_video.py
Jerry Yan 9c6186ecd3 feat(video): 添加视频转场功能支持
- 在 TASK_TYPES 中新增 COMPOSE_TRANSITION 类型
- 定义 TRANSITION_TYPES 常量支持多种转场效果
- 在 TaskType 枚举中添加 COMPOSE_TRANSITION
- 创建 TransitionConfig 数据类处理转场配置
- 为 RenderSpec 添加 transition_in 和 transition_out 属性
- 在 Task 类中添加转场相关的方法
- 新增 ComposeTransitionHandler 处理转场合成任务
- 修改 PackageSegmentTsHandler 支持转场分片封装
- 修改 RenderSegmentVideoHandler 支持 overlap 区域生成
- 在 TaskExecutor 中注册转场处理器
2026-01-12 22:41:22 +08:00

313 lines
11 KiB
Python

# -*- coding: utf-8 -*-
"""
视频片段渲染处理器
处理 RENDER_SEGMENT_VIDEO 任务,将原素材渲染为符合输出规格的视频片段。
支持转场 overlap 区域的帧冻结生成。
"""
import os
import logging
from typing import List, Optional, Tuple
from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS
from domain.task import Task, TaskType, RenderSpec, OutputSpec
from domain.result import TaskResult, ErrorCode
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class RenderSegmentVideoHandler(BaseHandler):
    """
    Handler for RENDER_SEGMENT_VIDEO tasks.

    Responsibilities:
    - download the source material
    - download the LUT file (if any)
    - download the overlay image (if any)
    - build the FFmpeg render command
    - run the render (freeze frames are used to generate overlap regions)
    - upload the rendered artifact
    """

    def get_supported_type(self) -> TaskType:
        """Return the task type handled by this class."""
        return TaskType.RENDER_SEGMENT_VIDEO

    def handle(self, task: Task) -> TaskResult:
        """
        Process one video render task.

        Args:
            task: The task to render; material URL, render spec, output spec
                and duration are read from it.

        Returns:
            ``TaskResult.ok`` with ``videoUrl``, ``actualDurationMs``,
            ``overlapHeadMs`` and ``overlapTailMs`` on success, otherwise a
            ``TaskResult.fail`` carrying the matching ``ErrorCode``. Unexpected
            exceptions are logged and reported as ``E_UNKNOWN``.
        """
        work_dir = self.create_work_dir(task.task_id)
        try:
            # Parse parameters.
            material_url = task.get_material_url()
            if not material_url:
                return TaskResult.fail(
                    ErrorCode.E_SPEC_INVALID,
                    "Missing material URL (boundMaterialUrl or sourceRef)"
                )

            render_spec = task.get_render_spec()
            output_spec = task.get_output_spec()
            duration_ms = task.get_duration_ms()

            # 1. Download the source material (mandatory).
            input_file = os.path.join(work_dir, 'input.mp4')
            if not self.download_file(material_url, input_file):
                return TaskResult.fail(
                    ErrorCode.E_INPUT_UNAVAILABLE,
                    f"Failed to download material: {material_url}"
                )

            # 2. Download the LUT (optional, best effort).
            lut_file = None
            if render_spec.lut_url:
                lut_file = os.path.join(work_dir, 'lut.cube')
                if not self.download_file(render_spec.lut_url, lut_file):
                    logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it")
                    lut_file = None

            # 3. Download the overlay (optional, best effort).
            overlay_file = None
            if render_spec.overlay_url:
                ext = self._overlay_extension(render_spec.overlay_url)
                overlay_file = os.path.join(work_dir, f'overlay{ext}')
                if not self.download_file(render_spec.overlay_url, overlay_file):
                    logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
                    overlay_file = None

            # 4. Overlap durations used for transitions.
            overlap_head_ms = render_spec.get_overlap_head_ms()
            overlap_tail_ms = render_spec.get_overlap_tail_ms()

            # 5. Build the FFmpeg command.
            output_file = os.path.join(work_dir, 'output.mp4')
            cmd = self._build_command(
                input_file=input_file,
                output_file=output_file,
                render_spec=render_spec,
                output_spec=output_spec,
                duration_ms=duration_ms,
                lut_file=lut_file,
                overlay_file=overlay_file,
                overlap_head_ms=overlap_head_ms,
                overlap_tail_ms=overlap_tail_ms
            )

            # 6. Run FFmpeg.
            if not self.run_ffmpeg(cmd, task.task_id):
                return TaskResult.fail(
                    ErrorCode.E_FFMPEG_FAILED,
                    "FFmpeg rendering failed"
                )

            # 7. Validate the output file.
            if not self.ensure_file_exists(output_file, min_size=4096):
                return TaskResult.fail(
                    ErrorCode.E_FFMPEG_FAILED,
                    "Output file is missing or too small"
                )

            # 8. Determine the actual duration. The rendered file is cut to
            # duration + head overlap + tail overlap, so when probing yields
            # nothing the fallback must include the overlap as well.
            rendered_duration_ms = duration_ms + overlap_head_ms + overlap_tail_ms
            actual_duration = self.probe_duration(output_file)
            if actual_duration:
                actual_duration_ms = int(actual_duration * 1000)
            else:
                actual_duration_ms = rendered_duration_ms

            # 9. Upload the artifact.
            video_url = self.upload_file(task.task_id, 'video', output_file)
            if not video_url:
                return TaskResult.fail(
                    ErrorCode.E_UPLOAD_FAILED,
                    "Failed to upload video"
                )

            # 10. Build the result (including overlap information).
            result_data = {
                'videoUrl': video_url,
                'actualDurationMs': actual_duration_ms,
                'overlapHeadMs': overlap_head_ms,
                'overlapTailMs': overlap_tail_ms
            }
            return TaskResult.ok(result_data)
        except Exception as e:
            logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
            return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e))
        finally:
            self.cleanup_work_dir(work_dir)

    @staticmethod
    def _overlay_extension(overlay_url: str) -> str:
        """Pick the local overlay file extension from the URL path.

        The query string and fragment are ignored so that URLs such as
        ``.../a.jpg?sign=...`` are still recognised as JPEG. Anything that is
        not ``.jpg``/``.jpeg`` is stored as ``.png``.
        """
        url_path = overlay_url.split('?', 1)[0].split('#', 1)[0].lower()
        return '.jpg' if url_path.endswith(('.jpg', '.jpeg')) else '.png'

    @staticmethod
    def _escape_filter_path(path: str) -> str:
        """Make a file path safe to embed in a single-quoted FFmpeg filter option.

        Backslashes become forward slashes, and ``:`` (e.g. a Windows drive
        letter) is escaped as ``\\:`` because FFmpeg treats a bare colon as
        the filter option separator.
        """
        return path.replace('\\', '/').replace(':', '\\:')

    def _build_command(
        self,
        input_file: str,
        output_file: str,
        render_spec: RenderSpec,
        output_spec: OutputSpec,
        duration_ms: int,
        lut_file: Optional[str] = None,
        overlay_file: Optional[str] = None,
        overlap_head_ms: int = 0,
        overlap_tail_ms: int = 0
    ) -> List[str]:
        """
        Build the FFmpeg render command.

        Args:
            input_file: Path of the input file.
            output_file: Path of the output file.
            render_spec: Render specification.
            output_spec: Output specification.
            duration_ms: Target duration in milliseconds.
            lut_file: Path of the LUT file (optional).
            overlay_file: Path of the overlay file (optional).
            overlap_head_ms: Head overlap duration in milliseconds.
            overlap_tail_ms: Tail overlap duration in milliseconds.

        Returns:
            The FFmpeg command as a list of arguments.
        """
        cmd = ['ffmpeg', '-y', '-hide_banner']

        # Main input.
        cmd.extend(['-i', input_file])

        # Overlay input (becomes stream 1).
        if overlay_file:
            cmd.extend(['-i', overlay_file])

        # Build the video filter chain.
        filters = self._build_video_filters(
            render_spec=render_spec,
            output_spec=output_spec,
            lut_file=lut_file,
            has_overlay=overlay_file is not None,
            overlap_head_ms=overlap_head_ms,
            overlap_tail_ms=overlap_tail_ms
        )

        # Apply the filters; an overlay needs two inputs, hence filter_complex.
        if overlay_file:
            cmd.extend(['-filter_complex', filters])
        elif filters:
            cmd.extend(['-vf', filters])

        # Encoding parameters (v2 unified parameters).
        cmd.extend(VIDEO_ENCODE_ARGS)

        # Frame rate.
        fps = output_spec.fps
        cmd.extend(['-r', str(fps)])

        # GOP size (keyframe interval): one keyframe every 2 seconds.
        gop_size = fps * 2
        cmd.extend(['-g', str(gop_size)])
        cmd.extend(['-keyint_min', str(gop_size)])

        # Duration (including the overlap regions).
        total_duration_ms = duration_ms + overlap_head_ms + overlap_tail_ms
        duration_sec = total_duration_ms / 1000.0
        cmd.extend(['-t', str(duration_sec)])

        # No audio: video segments do not carry an audio track.
        cmd.append('-an')

        # Output file.
        cmd.append(output_file)
        return cmd

    def _build_video_filters(
        self,
        render_spec: RenderSpec,
        output_spec: OutputSpec,
        lut_file: Optional[str] = None,
        has_overlay: bool = False,
        overlap_head_ms: int = 0,
        overlap_tail_ms: int = 0
    ) -> str:
        """
        Build the video filter chain.

        Args:
            render_spec: Render specification.
            output_spec: Output specification.
            lut_file: Path of the LUT file.
            has_overlay: Whether an overlay input is present.
            overlap_head_ms: Head overlap duration in milliseconds.
            overlap_tail_ms: Tail overlap duration in milliseconds.

        Returns:
            The filter string: a plain comma-joined chain, or a
            ``filter_complex`` graph when ``has_overlay`` is true.
        """
        filters = []
        width = output_spec.width
        height = output_spec.height

        # 1. Speed change.
        speed = float(render_spec.speed) if render_spec.speed else 1.0
        if speed != 1.0 and speed > 0:
            # setpts formula: PTS / speed
            pts_factor = 1.0 / speed
            filters.append(f"setpts={pts_factor}*PTS")

        # 2. LUT colour grading.
        if lut_file:
            # Normalise backslashes and escape ':' so the path survives
            # FFmpeg's filter option parsing.
            lut_path = self._escape_filter_path(lut_file)
            filters.append(f"lut3d='{lut_path}'")

        # 3. Cropping.
        if render_spec.crop_enable and render_spec.face_pos:
            # Crop to the output aspect ratio, positioned by the face position.
            try:
                fx, fy = map(float, render_spec.face_pos.split(','))
                target_ratio = width / height
                filters.append(
                    f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})':"
                    f"'(iw-min(iw,ih*{target_ratio}))*{fx}':"
                    f"'(ih-min(ih,iw/{target_ratio}))*{fy}'"
                )
            except (ValueError, ZeroDivisionError):
                logger.warning(f"Invalid face position: {render_spec.face_pos}")
        elif render_spec.zoom_cut:
            # Centre crop to the output aspect ratio.
            target_ratio = width / height
            filters.append(
                f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})'"
            )

        # 4. Scale and pad to the output size.
        scale_filter = (
            f"scale={width}:{height}:force_original_aspect_ratio=decrease,"
            f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black"
        )
        filters.append(scale_filter)

        # 5. Frame freeze (tpad) for the transition overlap regions.
        # Note: tpad must be applied after scaling.
        tpad_parts = []
        if overlap_head_ms > 0:
            # Head freeze: hold the first frame for the given duration.
            head_duration_sec = overlap_head_ms / 1000.0
            tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}")
        if overlap_tail_ms > 0:
            # Tail freeze: hold the last frame for the given duration.
            tail_duration_sec = overlap_tail_ms / 1000.0
            tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}")
        if tpad_parts:
            filters.append(f"tpad={':'.join(tpad_parts)}")

        # 6. Assemble the final filter string.
        if has_overlay:
            # filter_complex form: process stream 0, then overlay stream 1.
            base_filters = ','.join(filters) if filters else 'copy'
            return f"[0:v]{base_filters}[base];[base][1:v]overlay=0:0"
        return ','.join(filters) if filters else ''