Files
FrameTour-RenderWorker/handlers/render_video.py
Jerry Yan eeb21cada3 perf(render_video): 优化视频编码的关键帧间隔策略
- 根据视频总帧数动态计算 GOP 大小,避免关键帧过多
- 短视频使用全部帧数作为 GOP,确保只在开头有关键帧
- 正常视频保持 2 秒一个关键帧的策略
- 添加强制第一帧为关键帧的设置
- 优化关键帧最小间隔参数,提升编码效率
2026-02-04 17:46:01 +08:00

654 lines
24 KiB
Python

# -*- coding: utf-8 -*-
"""
视频片段渲染处理器
处理 RENDER_SEGMENT_VIDEO 任务,将原素材渲染为符合输出规格的视频片段。
支持转场 overlap 区域的帧冻结生成。
"""
import os
import logging
from typing import List, Optional, Tuple
from urllib.parse import urlparse, unquote
from handlers.base import BaseHandler
from domain.task import Task, TaskType, RenderSpec, OutputSpec, Effect, IMAGE_EXTENSIONS
from domain.result import TaskResult, ErrorCode
logger = logging.getLogger(__name__)
def _get_extension_from_url(url: str) -> str:
"""从 URL 提取文件扩展名"""
parsed = urlparse(url)
path = unquote(parsed.path)
_, ext = os.path.splitext(path)
return ext.lower() if ext else ''
class RenderSegmentVideoHandler(BaseHandler):
"""
视频片段渲染处理器
职责:
- 下载素材文件
- 下载 LUT 文件(如有)
- 下载叠加层(如有)
- 构建 FFmpeg 渲染命令
- 执行渲染(支持帧冻结生成 overlap 区域)
- 上传产物
"""
def get_supported_type(self) -> TaskType:
return TaskType.RENDER_SEGMENT_VIDEO
def handle(self, task: Task) -> TaskResult:
"""处理视频渲染任务"""
work_dir = self.create_work_dir(task.task_id)
try:
# 解析参数
material_url = task.get_material_url()
if not material_url:
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
"Missing material URL (boundMaterialUrl or sourceRef)"
)
# 检查 URL 格式:必须是 HTTP 或 HTTPS 协议
if not material_url.startswith(('http://', 'https://')):
source_ref = task.get_source_ref()
bound_url = task.get_bound_material_url()
logger.error(
f"[task:{task.task_id}] Invalid material URL format: '{material_url}'. "
f"boundMaterialUrl={bound_url}, sourceRef={source_ref}. "
f"Server should provide boundMaterialUrl with HTTP/HTTPS URL."
)
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
f"Invalid material URL: '{material_url}' is not a valid HTTP/HTTPS URL. "
f"Server must provide boundMaterialUrl."
)
render_spec = task.get_render_spec()
output_spec = task.get_output_spec()
duration_ms = task.get_duration_ms()
# 1. 检测素材类型并确定输入文件扩展名
is_image = task.is_image_material()
if is_image:
# 图片素材:根据 URL 确定扩展名
ext = _get_extension_from_url(material_url)
if not ext or ext not in IMAGE_EXTENSIONS:
ext = '.jpg' # 默认扩展名
input_file = os.path.join(work_dir, f'input{ext}')
else:
input_file = os.path.join(work_dir, 'input.mp4')
# 2. 下载素材
if not self.download_file(material_url, input_file):
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download material: {material_url}"
)
# 3. 图片素材转换为视频
if is_image:
video_input_file = os.path.join(work_dir, 'input_video.mp4')
if not self._convert_image_to_video(
image_file=input_file,
output_file=video_input_file,
duration_ms=duration_ms,
output_spec=output_spec,
render_spec=render_spec,
task_id=task.task_id
):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Failed to convert image to video"
)
# 使用转换后的视频作为输入
input_file = video_input_file
logger.info(f"[task:{task.task_id}] Image converted to video successfully")
# 4. 下载 LUT(如有)
lut_file = None
if render_spec.lut_url:
lut_file = os.path.join(work_dir, 'lut.cube')
if not self.download_file(render_spec.lut_url, lut_file):
logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it")
lut_file = None
# 5. 下载叠加层(如有)
overlay_file = None
if render_spec.overlay_url:
# 根据 URL 后缀确定文件扩展名
url_lower = render_spec.overlay_url.lower()
if url_lower.endswith('.jpg') or url_lower.endswith('.jpeg'):
ext = '.jpg'
elif url_lower.endswith('.mov'):
ext = '.mov'
else:
ext = '.png' # 默认
overlay_file = os.path.join(work_dir, f'overlay{ext}')
if not self.download_file(render_spec.overlay_url, overlay_file):
logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
overlay_file = None
# 6. 计算 overlap 时长(用于转场帧冻结)
# 头部 overlap: 来自前一片段的出场转场
overlap_head_ms = task.get_overlap_head_ms()
# 尾部 overlap: 当前片段的出场转场
overlap_tail_ms = task.get_overlap_tail_ms_v2()
# 7. 构建 FFmpeg 命令
output_file = os.path.join(work_dir, 'output.mp4')
cmd = self._build_command(
input_file=input_file,
output_file=output_file,
render_spec=render_spec,
output_spec=output_spec,
duration_ms=duration_ms,
lut_file=lut_file,
overlay_file=overlay_file,
overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms
)
# 8. 执行 FFmpeg
if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"FFmpeg rendering failed"
)
# 9. 验证输出文件
if not self.ensure_file_exists(output_file, min_size=4096):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Output file is missing or too small"
)
# 10. 获取实际时长
actual_duration = self.probe_duration(output_file)
actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms
# 11. 上传产物
video_url = self.upload_file(task.task_id, 'video', output_file)
if not video_url:
return TaskResult.fail(
ErrorCode.E_UPLOAD_FAILED,
"Failed to upload video"
)
# 12. 构建结果(包含 overlap 信息)
result_data = {
'videoUrl': video_url,
'actualDurationMs': actual_duration_ms,
'overlapHeadMs': overlap_head_ms,
'overlapTailMs': overlap_tail_ms
}
return TaskResult.ok(result_data)
except Exception as e:
logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e))
finally:
self.cleanup_work_dir(work_dir)
def _convert_image_to_video(
self,
image_file: str,
output_file: str,
duration_ms: int,
output_spec: OutputSpec,
render_spec: RenderSpec,
task_id: str
) -> bool:
"""
将图片转换为视频
使用 FFmpeg 将静态图片转换为指定时长的视频,
同时应用缩放填充和变速处理。
Args:
image_file: 输入图片文件路径
output_file: 输出视频文件路径
duration_ms: 目标时长(毫秒)
output_spec: 输出规格
render_spec: 渲染规格
task_id: 任务 ID(用于日志)
Returns:
是否成功
"""
width = output_spec.width
height = output_spec.height
fps = output_spec.fps
# 计算实际时长(考虑变速)
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed <= 0:
speed = 1.0
# 变速后的实际播放时长
actual_duration_sec = (duration_ms / 1000.0) / speed
# 构建 FFmpeg 命令
cmd = [
'ffmpeg', '-y', '-hide_banner',
'-loop', '1', # 循环输入图片
'-i', image_file,
'-t', str(actual_duration_sec), # 输出时长
]
# 构建滤镜:缩放填充到目标尺寸
filters = []
# 裁切处理(与视频相同逻辑)
if render_spec.crop_enable and render_spec.face_pos:
try:
fx, fy = map(float, render_spec.face_pos.split(','))
target_ratio = width / height
filters.append(
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})':"
f"'(iw-min(iw,ih*{target_ratio}))*{fx}':"
f"'(ih-min(ih,iw/{target_ratio}))*{fy}'"
)
except (ValueError, ZeroDivisionError):
logger.warning(f"[task:{task_id}] Invalid face position: {render_spec.face_pos}")
elif render_spec.zoom_cut:
target_ratio = width / height
filters.append(
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})'"
)
# 缩放填充
filters.append(
f"scale={width}:{height}:force_original_aspect_ratio=decrease,"
f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black"
)
# 格式转换(确保兼容性)
filters.append("format=yuv420p")
cmd.extend(['-vf', ','.join(filters)])
# 计算总帧数,动态调整 GOP
total_frames = int(actual_duration_sec * fps)
if total_frames <= 1:
gop_size = 1
elif total_frames < fps:
gop_size = total_frames
else:
gop_size = fps * 2 # 正常情况,2 秒一个关键帧
# 编码参数
cmd.extend([
'-c:v', 'libx264',
'-preset', 'fast',
'-crf', '18',
'-r', str(fps),
'-g', str(gop_size),
'-keyint_min', str(min(gop_size, fps // 2 or 1)),
'-force_key_frames', 'expr:eq(n,0)',
'-an', # 无音频
output_file
])
logger.info(f"[task:{task_id}] Converting image to video: {actual_duration_sec:.2f}s at {fps}fps")
return self.run_ffmpeg(cmd, task_id)
def _build_command(
self,
input_file: str,
output_file: str,
render_spec: RenderSpec,
output_spec: OutputSpec,
duration_ms: int,
lut_file: Optional[str] = None,
overlay_file: Optional[str] = None,
overlap_head_ms: int = 0,
overlap_tail_ms: int = 0
) -> List[str]:
"""
构建 FFmpeg 渲染命令
Args:
input_file: 输入文件路径
output_file: 输出文件路径
render_spec: 渲染规格
output_spec: 输出规格
duration_ms: 目标时长(毫秒)
lut_file: LUT 文件路径(可选)
overlay_file: 叠加层文件路径(可选)
overlap_head_ms: 头部 overlap 时长(毫秒)
overlap_tail_ms: 尾部 overlap 时长(毫秒)
Returns:
FFmpeg 命令参数列表
"""
cmd = ['ffmpeg', '-y', '-hide_banner']
# 硬件加速解码参数(在输入文件之前)
hwaccel_args = self.get_hwaccel_decode_args()
if hwaccel_args:
cmd.extend(hwaccel_args)
# 输入文件
cmd.extend(['-i', input_file])
# 叠加层输入
if overlay_file:
cmd.extend(['-i', overlay_file])
# 构建视频滤镜链
filters = self._build_video_filters(
render_spec=render_spec,
output_spec=output_spec,
lut_file=lut_file,
overlay_file=overlay_file,
overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms
)
# 应用滤镜
# 检测是否为 filter_complex 格式(包含分号或方括号标签)
is_filter_complex = ';' in filters or (filters.startswith('[') and ']' in filters)
if is_filter_complex or overlay_file:
# 使用 filter_complex 处理
cmd.extend(['-filter_complex', filters])
elif filters:
cmd.extend(['-vf', filters])
# 编码参数(根据硬件加速配置动态获取)
cmd.extend(self.get_video_encode_args())
# 帧率
fps = output_spec.fps
cmd.extend(['-r', str(fps)])
# 时长(包含 overlap 区域)
total_duration_ms = duration_ms + overlap_head_ms + overlap_tail_ms
duration_sec = total_duration_ms / 1000.0
cmd.extend(['-t', str(duration_sec)])
# 动态调整 GOP 大小:对于短视频,GOP 不能大于总帧数
total_frames = int(duration_sec * fps)
if total_frames <= 1:
gop_size = 1
elif total_frames < fps:
# 短于 1 秒的视频,使用全部帧数作为 GOP(整个视频只有开头一个关键帧)
gop_size = total_frames
else:
# 正常情况,2 秒一个关键帧
gop_size = fps * 2
cmd.extend(['-g', str(gop_size)])
cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))])
# 强制第一帧为关键帧
cmd.extend(['-force_key_frames', 'expr:eq(n,0)'])
# 无音频(视频片段不包含音频)
cmd.append('-an')
# 输出文件
cmd.append(output_file)
return cmd
def _build_video_filters(
self,
render_spec: RenderSpec,
output_spec: OutputSpec,
lut_file: Optional[str] = None,
overlay_file: Optional[str] = None,
overlap_head_ms: int = 0,
overlap_tail_ms: int = 0
) -> str:
"""
构建视频滤镜链
Args:
render_spec: 渲染规格
output_spec: 输出规格
lut_file: LUT 文件路径
overlay_file: 叠加层文件路径(支持图片 png/jpg 和视频 mov)
overlap_head_ms: 头部 overlap 时长(毫秒)
overlap_tail_ms: 尾部 overlap 时长(毫秒)
Returns:
滤镜字符串
"""
filters = []
width = output_spec.width
height = output_spec.height
fps = output_spec.fps
# 判断 overlay 类型
has_overlay = overlay_file is not None
is_video_overlay = has_overlay and overlay_file.lower().endswith('.mov')
# 解析 effects
effects = render_spec.get_effects()
has_camera_shot = any(e.effect_type == 'cameraShot' for e in effects)
# 硬件加速时需要先 hwdownload(将 GPU 表面下载到系统内存)
hwaccel_prefix = self.get_hwaccel_filter_prefix()
if hwaccel_prefix:
# 去掉末尾的逗号,作为第一个滤镜
filters.append(hwaccel_prefix.rstrip(','))
# 1. 变速处理
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed != 1.0 and speed > 0:
# setpts 公式:PTS / speed
pts_factor = 1.0 / speed
filters.append(f"setpts={pts_factor}*PTS")
# 2. LUT 调色
if lut_file:
# 路径中的反斜杠需要转换,冒号需要转义(FFmpeg filter语法中冒号是特殊字符)
lut_path = lut_file.replace('\\', '/').replace(':', r'\:')
filters.append(f"lut3d='{lut_path}'")
# 3. 裁切处理
if render_spec.crop_enable and render_spec.face_pos:
# 根据人脸位置进行智能裁切
try:
fx, fy = map(float, render_spec.face_pos.split(','))
# 计算裁切区域(保持输出比例)
target_ratio = width / height
# 假设裁切到目标比例
filters.append(
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})':"
f"'(iw-min(iw,ih*{target_ratio}))*{fx}':"
f"'(ih-min(ih,iw/{target_ratio}))*{fy}'"
)
except (ValueError, ZeroDivisionError):
logger.warning(f"Invalid face position: {render_spec.face_pos}")
elif render_spec.zoom_cut:
# 中心缩放裁切
target_ratio = width / height
filters.append(
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})'"
)
# 4. 缩放和填充
scale_filter = (
f"scale={width}:{height}:force_original_aspect_ratio=decrease,"
f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black"
)
filters.append(scale_filter)
# 5. 特效处理(cameraShot 需要特殊处理)
if has_camera_shot:
# cameraShot 需要使用 filter_complex 格式
return self._build_filter_complex_with_effects(
base_filters=filters,
effects=effects,
fps=fps,
width=width,
height=height,
has_overlay=has_overlay,
is_video_overlay=is_video_overlay,
overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms,
use_hwdownload=bool(hwaccel_prefix)
)
# 6. 帧冻结(tpad)- 用于转场 overlap 区域
# 注意:tpad 必须在缩放之后应用
tpad_parts = []
if overlap_head_ms > 0:
# 头部冻结:将第一帧冻结指定时长
head_duration_sec = overlap_head_ms / 1000.0
tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}")
if overlap_tail_ms > 0:
# 尾部冻结:将最后一帧冻结指定时长
tail_duration_sec = overlap_tail_ms / 1000.0
tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}")
if tpad_parts:
filters.append(f"tpad={':'.join(tpad_parts)}")
# 7. 构建最终滤镜
if has_overlay:
# 使用 filter_complex 格式
base_filters = ','.join(filters) if filters else 'copy'
overlay_scale = f"scale={width}:{height}"
# 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示)
overlay_params = 'eof_action=pass' if is_video_overlay else ''
overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
# 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc
range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
return f"[0:v]{base_filters}[base];[1:v]{overlay_scale}[overlay];[base][overlay]{overlay_filter}{range_fix}"
else:
return ','.join(filters) if filters else ''
def _build_filter_complex_with_effects(
self,
base_filters: List[str],
effects: List[Effect],
fps: int,
width: int,
height: int,
has_overlay: bool = False,
is_video_overlay: bool = False,
overlap_head_ms: int = 0,
overlap_tail_ms: int = 0,
use_hwdownload: bool = False
) -> str:
"""
构建包含特效的 filter_complex 滤镜图
cameraShot 效果需要使用 split/freezeframes/concat 滤镜组合。
Args:
base_filters: 基础滤镜列表
effects: 特效列表
fps: 帧率
width: 输出宽度
height: 输出高度
has_overlay: 是否有叠加层
is_video_overlay: 叠加层是否为视频格式(如 .mov)
overlap_head_ms: 头部 overlap 时长
overlap_tail_ms: 尾部 overlap 时长
use_hwdownload: 是否使用了硬件加速解码(已在 base_filters 中包含 hwdownload)
Returns:
filter_complex 格式的滤镜字符串
"""
filter_parts = []
# 基础滤镜链
base_chain = ','.join(base_filters) if base_filters else 'copy'
# 当前输出标签
current_output = '[v_base]'
filter_parts.append(f"[0:v]{base_chain}{current_output}")
# 处理每个特效
effect_idx = 0
for effect in effects:
if effect.effect_type == 'cameraShot':
start_sec, duration_sec = effect.get_camera_shot_params()
if start_sec <= 0 or duration_sec <= 0:
continue
# cameraShot 实现(定格效果):
# 1. fps + split 分割
# 2. 第一路:trim(0, start) + tpad冻结duration秒
# 3. 第二路:trim(start, end)
# 4. concat 拼接
split_out_a = f'[eff{effect_idx}_a]'
split_out_b = f'[eff{effect_idx}_b]'
frozen_out = f'[eff{effect_idx}_frozen]'
rest_out = f'[eff{effect_idx}_rest]'
effect_output = f'[v_eff{effect_idx}]'
# fps + split
filter_parts.append(
f"{current_output}fps=fps={fps},split{split_out_a}{split_out_b}"
)
# 第一路:trim(0, start) + tpad冻结
# tpad=stop_mode=clone 将最后一帧冻结指定时长
filter_parts.append(
f"{split_out_a}trim=start=0:end={start_sec},setpts=PTS-STARTPTS,"
f"tpad=stop_mode=clone:stop_duration={duration_sec}{frozen_out}"
)
# 第二路:trim 从 start 开始
filter_parts.append(
f"{split_out_b}trim=start={start_sec},setpts=PTS-STARTPTS{rest_out}"
)
# concat 拼接
filter_parts.append(
f"{frozen_out}{rest_out}concat=n=2:v=1:a=0{effect_output}"
)
current_output = effect_output
effect_idx += 1
# 帧冻结(tpad)- 用于转场 overlap 区域
tpad_parts = []
if overlap_head_ms > 0:
head_duration_sec = overlap_head_ms / 1000.0
tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}")
if overlap_tail_ms > 0:
tail_duration_sec = overlap_tail_ms / 1000.0
tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}")
if tpad_parts:
tpad_output = '[v_tpad]'
filter_parts.append(f"{current_output}tpad={':'.join(tpad_parts)}{tpad_output}")
current_output = tpad_output
# 最终输出
if has_overlay:
# 叠加层处理
# 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示)
overlay_params = 'eof_action=pass' if is_video_overlay else ''
overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
overlay_scale = f"scale={width}:{height}"
overlay_output = '[v_overlay]'
# 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc
range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
filter_parts.append(f"[1:v]{overlay_scale}{overlay_output}")
filter_parts.append(f"{current_output}{overlay_output}{overlay_filter}{range_fix}")
else:
# 移除最后一个标签,直接输出
# 将最后一个滤镜的输出标签替换为空(直接输出)
if filter_parts:
last_filter = filter_parts[-1]
# 移除末尾的输出标签
if last_filter.endswith(current_output):
filter_parts[-1] = last_filter[:-len(current_output)]
return ';'.join(filter_parts)