# -*- coding: utf-8 -*- """ 视频片段渲染处理器 处理 RENDER_SEGMENT_VIDEO 任务,将原素材渲染为符合输出规格的视频片段。 支持转场 overlap 区域的帧冻结生成。 """ import os import logging from typing import List, Optional, Tuple from handlers.base import BaseHandler from domain.task import Task, TaskType, RenderSpec, OutputSpec, Effect from domain.result import TaskResult, ErrorCode logger = logging.getLogger(__name__) class RenderSegmentVideoHandler(BaseHandler): """ 视频片段渲染处理器 职责: - 下载素材文件 - 下载 LUT 文件(如有) - 下载叠加层(如有) - 构建 FFmpeg 渲染命令 - 执行渲染(支持帧冻结生成 overlap 区域) - 上传产物 """ def get_supported_type(self) -> TaskType: return TaskType.RENDER_SEGMENT_VIDEO def handle(self, task: Task) -> TaskResult: """处理视频渲染任务""" work_dir = self.create_work_dir(task.task_id) try: # 解析参数 material_url = task.get_material_url() if not material_url: return TaskResult.fail( ErrorCode.E_SPEC_INVALID, "Missing material URL (boundMaterialUrl or sourceRef)" ) render_spec = task.get_render_spec() output_spec = task.get_output_spec() duration_ms = task.get_duration_ms() # 1. 下载素材 input_file = os.path.join(work_dir, 'input.mp4') if not self.download_file(material_url, input_file): return TaskResult.fail( ErrorCode.E_INPUT_UNAVAILABLE, f"Failed to download material: {material_url}" ) # 2. 下载 LUT(如有) lut_file = None if render_spec.lut_url: lut_file = os.path.join(work_dir, 'lut.cube') if not self.download_file(render_spec.lut_url, lut_file): logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it") lut_file = None # 3. 下载叠加层(如有) overlay_file = None if render_spec.overlay_url: # 根据 URL 后缀确定文件扩展名 ext = '.png' if render_spec.overlay_url.lower().endswith('.jpg') or render_spec.overlay_url.lower().endswith('.jpeg'): ext = '.jpg' overlay_file = os.path.join(work_dir, f'overlay{ext}') if not self.download_file(render_spec.overlay_url, overlay_file): logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it") overlay_file = None # 4. 计算 overlap 时长 overlap_head_ms = render_spec.get_overlap_head_ms() overlap_tail_ms = render_spec.get_overlap_tail_ms() # 5. 构建 FFmpeg 命令 output_file = os.path.join(work_dir, 'output.mp4') cmd = self._build_command( input_file=input_file, output_file=output_file, render_spec=render_spec, output_spec=output_spec, duration_ms=duration_ms, lut_file=lut_file, overlay_file=overlay_file, overlap_head_ms=overlap_head_ms, overlap_tail_ms=overlap_tail_ms ) # 6. 执行 FFmpeg if not self.run_ffmpeg(cmd, task.task_id): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "FFmpeg rendering failed" ) # 7. 验证输出文件 if not self.ensure_file_exists(output_file, min_size=4096): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "Output file is missing or too small" ) # 8. 获取实际时长 actual_duration = self.probe_duration(output_file) actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms # 9. 上传产物 video_url = self.upload_file(task.task_id, 'video', output_file) if not video_url: return TaskResult.fail( ErrorCode.E_UPLOAD_FAILED, "Failed to upload video" ) # 10. 构建结果(包含 overlap 信息) result_data = { 'videoUrl': video_url, 'actualDurationMs': actual_duration_ms, 'overlapHeadMs': overlap_head_ms, 'overlapTailMs': overlap_tail_ms } return TaskResult.ok(result_data) except Exception as e: logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True) return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e)) finally: self.cleanup_work_dir(work_dir) def _build_command( self, input_file: str, output_file: str, render_spec: RenderSpec, output_spec: OutputSpec, duration_ms: int, lut_file: Optional[str] = None, overlay_file: Optional[str] = None, overlap_head_ms: int = 0, overlap_tail_ms: int = 0 ) -> List[str]: """ 构建 FFmpeg 渲染命令 Args: input_file: 输入文件路径 output_file: 输出文件路径 render_spec: 渲染规格 output_spec: 输出规格 duration_ms: 目标时长(毫秒) lut_file: LUT 文件路径(可选) overlay_file: 叠加层文件路径(可选) overlap_head_ms: 头部 overlap 时长(毫秒) overlap_tail_ms: 尾部 overlap 时长(毫秒) Returns: FFmpeg 命令参数列表 """ cmd = ['ffmpeg', '-y', '-hide_banner'] # 硬件加速解码参数(在输入文件之前) hwaccel_args = self.get_hwaccel_decode_args() if hwaccel_args: cmd.extend(hwaccel_args) # 输入文件 cmd.extend(['-i', input_file]) # 叠加层输入 if overlay_file: cmd.extend(['-i', overlay_file]) # 构建视频滤镜链 filters = self._build_video_filters( render_spec=render_spec, output_spec=output_spec, lut_file=lut_file, has_overlay=overlay_file is not None, overlap_head_ms=overlap_head_ms, overlap_tail_ms=overlap_tail_ms ) # 应用滤镜 # 检测是否为 filter_complex 格式(包含分号或方括号标签) is_filter_complex = ';' in filters or (filters.startswith('[') and ']' in filters) if is_filter_complex or overlay_file: # 使用 filter_complex 处理 cmd.extend(['-filter_complex', filters]) elif filters: cmd.extend(['-vf', filters]) # 编码参数(根据硬件加速配置动态获取) cmd.extend(self.get_video_encode_args()) # 帧率 fps = output_spec.fps cmd.extend(['-r', str(fps)]) # GOP 大小(关键帧间隔) gop_size = fps * 2 # 2秒一个关键帧 cmd.extend(['-g', str(gop_size)]) cmd.extend(['-keyint_min', str(gop_size)]) # 时长(包含 overlap 区域) total_duration_ms = duration_ms + overlap_head_ms + overlap_tail_ms duration_sec = total_duration_ms / 1000.0 cmd.extend(['-t', str(duration_sec)]) # 无音频(视频片段不包含音频) cmd.append('-an') # 输出文件 cmd.append(output_file) return cmd def _build_video_filters( self, render_spec: RenderSpec, output_spec: OutputSpec, lut_file: Optional[str] = None, has_overlay: bool = False, overlap_head_ms: int = 0, overlap_tail_ms: int = 0 ) -> str: """ 构建视频滤镜链 Args: render_spec: 渲染规格 output_spec: 输出规格 lut_file: LUT 文件路径 has_overlay: 是否有叠加层 overlap_head_ms: 头部 overlap 时长(毫秒) overlap_tail_ms: 尾部 overlap 时长(毫秒) Returns: 滤镜字符串 """ filters = [] width = output_spec.width height = output_spec.height fps = output_spec.fps # 解析 effects effects = render_spec.get_effects() has_camera_shot = any(e.effect_type == 'cameraShot' for e in effects) # 硬件加速时需要先 hwdownload(将 GPU 表面下载到系统内存) hwaccel_prefix = self.get_hwaccel_filter_prefix() if hwaccel_prefix: # 去掉末尾的逗号,作为第一个滤镜 filters.append(hwaccel_prefix.rstrip(',')) # 1. 变速处理 speed = float(render_spec.speed) if render_spec.speed else 1.0 if speed != 1.0 and speed > 0: # setpts 公式:PTS / speed pts_factor = 1.0 / speed filters.append(f"setpts={pts_factor}*PTS") # 2. LUT 调色 if lut_file: # 路径中的反斜杠需要转义 lut_path = lut_file.replace('\\', '/') filters.append(f"lut3d='{lut_path}'") # 3. 裁切处理 if render_spec.crop_enable and render_spec.face_pos: # 根据人脸位置进行智能裁切 try: fx, fy = map(float, render_spec.face_pos.split(',')) # 计算裁切区域(保持输出比例) target_ratio = width / height # 假设裁切到目标比例 filters.append( f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})':" f"'(iw-min(iw,ih*{target_ratio}))*{fx}':" f"'(ih-min(ih,iw/{target_ratio}))*{fy}'" ) except (ValueError, ZeroDivisionError): logger.warning(f"Invalid face position: {render_spec.face_pos}") elif render_spec.zoom_cut: # 中心缩放裁切 target_ratio = width / height filters.append( f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})'" ) # 4. 缩放和填充 scale_filter = ( f"scale={width}:{height}:force_original_aspect_ratio=decrease," f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black" ) filters.append(scale_filter) # 5. 特效处理(cameraShot 需要特殊处理) if has_camera_shot: # cameraShot 需要使用 filter_complex 格式 return self._build_filter_complex_with_effects( base_filters=filters, effects=effects, fps=fps, has_overlay=has_overlay, overlap_head_ms=overlap_head_ms, overlap_tail_ms=overlap_tail_ms, use_hwdownload=bool(hwaccel_prefix) ) # 6. 帧冻结(tpad)- 用于转场 overlap 区域 # 注意:tpad 必须在缩放之后应用 tpad_parts = [] if overlap_head_ms > 0: # 头部冻结:将第一帧冻结指定时长 head_duration_sec = overlap_head_ms / 1000.0 tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}") if overlap_tail_ms > 0: # 尾部冻结:将最后一帧冻结指定时长 tail_duration_sec = overlap_tail_ms / 1000.0 tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}") if tpad_parts: filters.append(f"tpad={':'.join(tpad_parts)}") # 7. 构建最终滤镜 if has_overlay: # 使用 filter_complex 格式 base_filters = ','.join(filters) if filters else 'copy' return f"[0:v]{base_filters}[base];[base][1:v]overlay=0:0" else: return ','.join(filters) if filters else '' def _build_filter_complex_with_effects( self, base_filters: List[str], effects: List[Effect], fps: int, has_overlay: bool = False, overlap_head_ms: int = 0, overlap_tail_ms: int = 0, use_hwdownload: bool = False ) -> str: """ 构建包含特效的 filter_complex 滤镜图 cameraShot 效果需要使用 split/freezeframes/concat 滤镜组合。 Args: base_filters: 基础滤镜列表 effects: 特效列表 fps: 帧率 has_overlay: 是否有叠加层 overlap_head_ms: 头部 overlap 时长 overlap_tail_ms: 尾部 overlap 时长 use_hwdownload: 是否使用了硬件加速解码(已在 base_filters 中包含 hwdownload) Returns: filter_complex 格式的滤镜字符串 """ filter_parts = [] # 基础滤镜链 base_chain = ','.join(base_filters) if base_filters else 'copy' # 当前输出标签 current_output = '[v_base]' filter_parts.append(f"[0:v]{base_chain}{current_output}") # 处理每个特效 effect_idx = 0 for effect in effects: if effect.effect_type == 'cameraShot': start_sec, duration_sec = effect.get_camera_shot_params() if start_sec <= 0 or duration_sec <= 0: continue # cameraShot 实现: # 1. fps + split 分割 # 2. 第一路:trim(0, start+duration) + freezeframes # 3. 第二路:trim(start, end) # 4. concat 拼接 start_frame = start_sec * fps split_out_a = f'[eff{effect_idx}_a]' split_out_b = f'[eff{effect_idx}_b]' effect_output = f'[v_eff{effect_idx}]' # fps + split filter_parts.append( f"{current_output}fps=fps={fps},split{split_out_a}{split_out_b}" ) # 第一路:trim + freezeframes(在 start 帧处冻结 duration 秒) # freezeframes: 从 first 帧开始,用 replace 帧替换后续帧 # 这样实现定格效果:在 start_frame 位置冻结 filter_parts.append( f"{split_out_a}trim=start=0:end={start_sec + duration_sec}," f"setpts=PTS-STARTPTS," f"freezeframes=first={start_frame}:last={start_frame + duration_sec * fps - 1}:replace={start_frame}" f"{split_out_a}" ) # 第二路:trim 从 start 开始 filter_parts.append( f"{split_out_b}trim=start={start_sec},setpts=PTS-STARTPTS{split_out_b}" ) # concat 拼接 filter_parts.append( f"{split_out_a}{split_out_b}concat=n=2:v=1:a=0{effect_output}" ) current_output = effect_output effect_idx += 1 # 帧冻结(tpad)- 用于转场 overlap 区域 tpad_parts = [] if overlap_head_ms > 0: head_duration_sec = overlap_head_ms / 1000.0 tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}") if overlap_tail_ms > 0: tail_duration_sec = overlap_tail_ms / 1000.0 tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}") if tpad_parts: tpad_output = '[v_tpad]' filter_parts.append(f"{current_output}tpad={':'.join(tpad_parts)}{tpad_output}") current_output = tpad_output # 最终输出 if has_overlay: # 叠加层处理 filter_parts.append(f"{current_output}[1:v]overlay=0:0") else: # 移除最后一个标签,直接输出 # 将最后一个滤镜的输出标签替换为空(直接输出) if filter_parts: last_filter = filter_parts[-1] # 移除末尾的输出标签 if last_filter.endswith(current_output): filter_parts[-1] = last_filter[:-len(current_output)] return ';'.join(filter_parts)