# -*- coding: utf-8 -*-
"""
Render + TS packaging handler.

Processes RENDER_SEGMENT_TS tasks: renders the raw material into a video
and packages it as a TS segment. Supports frame-freeze generation for
transition overlap regions and precise (frame-accurate) trimming.
"""
import os
import logging
from typing import List, Optional, Tuple
from urllib.parse import urlparse, unquote

from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS
from domain.task import Task, TaskType, RenderSpec, OutputSpec, Effect, IMAGE_EXTENSIONS
from domain.result import TaskResult, ErrorCode

logger = logging.getLogger(__name__)


def _get_extension_from_url(url: str) -> str:
    """Extract the lower-cased file extension (with dot) from a URL's path.

    Returns an empty string when the path has no extension. Query strings
    and fragments are ignored because only ``parsed.path`` is inspected.
    """
    parsed = urlparse(url)
    path = unquote(parsed.path)
    _, ext = os.path.splitext(path)
    return ext.lower() if ext else ''


class RenderSegmentTsHandler(BaseHandler):
    """
    Render + TS packaging handler.

    Responsibilities:
    - Download the material file
    - Download the LUT file (if any)
    - Download the overlay (if any)
    - Download the audio (if any)
    - Build the FFmpeg render command
    - Run the render (supports frame freezing for overlap regions)
    - Trim overlap regions (when needed)
    - Package the result as a TS segment
    - Upload the artifacts
    """

    def get_supported_type(self) -> TaskType:
        # This handler only serves RENDER_SEGMENT_TS tasks.
        return TaskType.RENDER_SEGMENT_TS

    def handle(self, task: Task) -> TaskResult:
        """Process one video render task end-to-end.

        Downloads inputs, renders, optionally trims overlap regions,
        packages the video as a TS segment and uploads it.

        Returns:
            TaskResult.ok with ``tsUrl`` and ``extinfDurationSec`` on
            success; TaskResult.fail with a specific ErrorCode otherwise.
        """
        work_dir = self.create_work_dir(task.task_id)
        try:
            # Parse parameters
            material_url = task.get_material_url()
            if not material_url:
                return TaskResult.fail(
                    ErrorCode.E_SPEC_INVALID,
                    "Missing material URL (boundMaterialUrl or sourceRef)"
                )

            # URL format check: must be an HTTP or HTTPS URL
            if not material_url.startswith(('http://', 'https://')):
                source_ref = task.get_source_ref()
                bound_url = task.get_bound_material_url()
                logger.error(
                    f"[task:{task.task_id}] Invalid material URL format: '{material_url}'. "
                    f"boundMaterialUrl={bound_url}, sourceRef={source_ref}. "
                    f"Server should provide boundMaterialUrl with HTTP/HTTPS URL."
                )
                return TaskResult.fail(
                    ErrorCode.E_SPEC_INVALID,
                    f"Invalid material URL: '{material_url}' is not a valid HTTP/HTTPS URL. "
                    f"Server must provide boundMaterialUrl."
                )

            render_spec = task.get_render_spec()
            output_spec = task.get_output_spec()
            duration_ms = task.get_duration_ms()

            # 1. Detect the material type and choose the input file extension
            is_image = task.is_image_material()
            if is_image:
                # Image material: derive the extension from the URL
                ext = _get_extension_from_url(material_url)
                if not ext or ext not in IMAGE_EXTENSIONS:
                    ext = '.jpg'  # fallback extension
                input_file = os.path.join(work_dir, f'input{ext}')
            else:
                input_file = os.path.join(work_dir, 'input.mp4')

            # 2. Build the parallel download jobs
            #    (main material + optional LUT + optional overlay + optional audio)
            audio_url = task.get_audio_url()
            audio_file = None
            lut_file = os.path.join(work_dir, 'lut.cube') if render_spec.lut_url else None
            overlay_file = None
            if render_spec.overlay_url:
                # Choose the overlay file extension from the URL suffix
                overlay_url_lower = render_spec.overlay_url.lower()
                if overlay_url_lower.endswith('.jpg') or overlay_url_lower.endswith('.jpeg'):
                    overlay_ext = '.jpg'
                elif overlay_url_lower.endswith('.mov'):
                    overlay_ext = '.mov'
                else:
                    overlay_ext = '.png'
                overlay_file = os.path.join(work_dir, f'overlay{overlay_ext}')

            download_jobs = [
                {
                    'key': 'material',
                    'url': material_url,
                    'dest': input_file,
                    'required': True
                }
            ]
            if render_spec.lut_url and lut_file:
                download_jobs.append({
                    'key': 'lut',
                    'url': render_spec.lut_url,
                    'dest': lut_file,
                    'required': False
                })
            if render_spec.overlay_url and overlay_file:
                download_jobs.append({
                    'key': 'overlay',
                    'url': render_spec.overlay_url,
                    'dest': overlay_file,
                    'required': False
                })
            if audio_url:
                # NOTE(review): audio is always saved as .aac regardless of the
                # URL's actual extension — presumably the server always provides
                # AAC; confirm against the producer of audioUrl.
                audio_file = os.path.join(work_dir, 'audio.aac')
                download_jobs.append({
                    'key': 'audio',
                    'url': audio_url,
                    'dest': audio_file,
                    'required': True
                })

            download_results = self.download_files_parallel(download_jobs)

            # Material is mandatory; LUT and overlay are best-effort
            # (rendering continues without them); audio is mandatory when requested.
            material_result = download_results.get('material')
            if not material_result or not material_result['success']:
                return TaskResult.fail(
                    ErrorCode.E_INPUT_UNAVAILABLE,
                    f"Failed to download material: {material_url}"
                )
            if render_spec.lut_url:
                lut_result = download_results.get('lut')
                if not lut_result or not lut_result['success']:
                    logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it")
                    lut_file = None
            if render_spec.overlay_url:
                overlay_result = download_results.get('overlay')
                if not overlay_result or not overlay_result['success']:
                    logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
                    overlay_file = None
            if audio_url:
                audio_dl = download_results.get('audio')
                if not audio_dl or not audio_dl['success']:
                    return TaskResult.fail(
                        ErrorCode.E_INPUT_UNAVAILABLE,
                        f"Failed to download audio: {audio_url}"
                    )

            # 3. Convert image material to a video
            if is_image:
                video_input_file = os.path.join(work_dir, 'input_video.mp4')
                if not self._convert_image_to_video(
                    image_file=input_file,
                    output_file=video_input_file,
                    duration_ms=duration_ms,
                    output_spec=output_spec,
                    render_spec=render_spec,
                    task_id=task.task_id
                ):
                    return TaskResult.fail(
                        ErrorCode.E_FFMPEG_FAILED,
                        "Failed to convert image to video"
                    )
                # Use the converted video as the render input
                input_file = video_input_file
                logger.info(f"[task:{task.task_id}] Image converted to video successfully")

            # 4. Probe the source video duration (video material only).
            #    Used to detect an insufficient duration, which is later padded
            #    by freezing the last frame.
            source_duration_sec = None
            if not is_image:
                source_duration = self.probe_duration(input_file)
                if source_duration:
                    source_duration_sec = source_duration
                    speed = float(render_spec.speed) if render_spec.speed else 1.0
                    if speed > 0:
                        # Effective duration after the speed change
                        effective_duration_sec = source_duration_sec / speed
                        required_duration_sec = duration_ms / 1000.0
                        # Log when the source video is too short
                        if effective_duration_sec < required_duration_sec:
                            shortage_sec = required_duration_sec - effective_duration_sec
                            logger.warning(
                                f"[task:{task.task_id}] Source video duration insufficient: "
                                f"effective={effective_duration_sec:.2f}s (speed={speed}), "
                                f"required={required_duration_sec:.2f}s, "
                                f"will freeze last frame for {shortage_sec:.2f}s"
                            )

            # 5. Compute overlap durations (used for transition frame freezing)
            # Head overlap: comes from the previous segment's exit transition
            overlap_head_ms = task.get_overlap_head_ms()
            # Tail overlap: this segment's own exit transition
            overlap_tail_ms = task.get_overlap_tail_ms_v2()

            # 6. Build the FFmpeg command
            output_file = os.path.join(work_dir, 'output.mp4')
            cmd = self._build_command(
                input_file=input_file,
                output_file=output_file,
                render_spec=render_spec,
                output_spec=output_spec,
                duration_ms=duration_ms,
                lut_file=lut_file,
                overlay_file=overlay_file,
                overlap_head_ms=overlap_head_ms,
                overlap_tail_ms=overlap_tail_ms,
                source_duration_sec=source_duration_sec
            )

            # 7. Run FFmpeg
            if not self.run_ffmpeg(cmd, task.task_id):
                return TaskResult.fail(
                    ErrorCode.E_FFMPEG_FAILED,
                    "FFmpeg rendering failed"
                )

            # 8. Validate the output file
            if not self.ensure_file_exists(output_file, min_size=4096):
                return TaskResult.fail(
                    ErrorCode.E_FFMPEG_FAILED,
                    "Output file is missing or too small"
                )

            # 9. Overlap trim (only for non-transition segments that actually
            #    have overlap to trim)
            is_transition_seg = task.is_transition_segment()
            trim_head = task.should_trim_head()
            trim_tail = task.should_trim_tail()
            trim_head_ms = task.get_trim_head_ms()
            trim_tail_ms = task.get_trim_tail_ms()
            needs_video_trim = not is_transition_seg and (
                (trim_head and trim_head_ms > 0) or (trim_tail and trim_tail_ms > 0)
            )

            processed_video = output_file
            if needs_video_trim:
                processed_video = os.path.join(work_dir, 'trimmed_video.mp4')
                trim_cmd = self._build_trim_command(
                    video_file=output_file,
                    output_file=processed_video,
                    trim_head_ms=trim_head_ms if trim_head else 0,
                    trim_tail_ms=trim_tail_ms if trim_tail else 0,
                    output_spec=output_spec
                )
                logger.info(f"[task:{task.task_id}] Trimming video: head={trim_head_ms}ms, tail={trim_tail_ms}ms")
                if not self.run_ffmpeg(trim_cmd, task.task_id):
                    return TaskResult.fail(
                        ErrorCode.E_FFMPEG_FAILED,
                        "Video trim failed"
                    )
                if not self.ensure_file_exists(processed_video, min_size=1024):
                    return TaskResult.fail(
                        ErrorCode.E_FFMPEG_FAILED,
                        "Trimmed video file is missing or too small"
                    )

            # 10. Package as TS
            start_time_ms = task.get_start_time_ms()
            start_sec = start_time_ms / 1000.0
            duration_sec = duration_ms / 1000.0
            ts_output = os.path.join(work_dir, 'segment.ts')
            ts_cmd = self._build_ts_package_command(
                video_file=processed_video,
                audio_file=audio_file,
                output_file=ts_output,
                start_sec=start_sec,
                duration_sec=duration_sec
            )
            if not self.run_ffmpeg(ts_cmd, task.task_id):
                return TaskResult.fail(
                    ErrorCode.E_FFMPEG_FAILED,
                    "TS packaging failed"
                )
            if not self.ensure_file_exists(ts_output, min_size=1024):
                return TaskResult.fail(
                    ErrorCode.E_FFMPEG_FAILED,
                    "TS output file is missing or too small"
                )

            # 11. Get the EXTINF duration + upload the TS.
            #     Falls back to the requested duration when probing fails.
            actual_duration = self.probe_duration(ts_output)
            extinf_duration = actual_duration if actual_duration else duration_sec

            ts_url = self.upload_file(task.task_id, 'ts', ts_output)
            if not ts_url:
                return TaskResult.fail(
                    ErrorCode.E_UPLOAD_FAILED,
                    "Failed to upload TS"
                )

            return TaskResult.ok({
                'tsUrl': ts_url,
                'extinfDurationSec': extinf_duration
            })

        except Exception as e:
            logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
            return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e))
        finally:
            # Always remove the work directory, even on failure
            self.cleanup_work_dir(work_dir)

    @staticmethod
    def _build_crop_filter(
        render_spec: 'RenderSpec',
        width: int,
        height: int,
        task_id: str = ''
    ) -> Optional[str]:
        """
        Build the crop filter.

        When crop_enable is set: the crop is based on the target aspect
        ratio, scaled down by crop_scale, with crop_pos controlling the
        position (centered by default).

        Returns:
            crop filter string, or None when no cropping is needed
        """
        if render_spec.crop_enable:
            scale = render_spec.crop_scale
            target_ratio = width / height
            # Parse the crop position; defaults to center
            fx, fy = 0.5, 0.5
            if render_spec.crop_pos:
                try:
                    fx, fy = map(float, render_spec.crop_pos.split(','))
                except ValueError:
                    logger.warning(f"[task:{task_id}] Invalid crop position: {render_spec.crop_pos}, using center")
                    fx, fy = 0.5, 0.5
            # Base: the largest target-ratio rectangle that fits in the source,
            # then divided by the scale factor
            return (
                f"crop='min(iw,ih*{target_ratio})/{scale}':'min(ih,iw/{target_ratio})/{scale}':"
                f"'(iw-min(iw,ih*{target_ratio})/{scale})*{fx}':"
                f"'(ih-min(ih,iw/{target_ratio})/{scale})*{fy}'"
            )
        return None

    def _convert_image_to_video(
        self,
        image_file: str,
        output_file: str,
        duration_ms: int,
        output_spec: OutputSpec,
        render_spec: RenderSpec,
        task_id: str
    ) -> bool:
        """
        Convert an image into a video.

        Uses FFmpeg to turn a static image into a video of the given
        duration, applying scale/pad and speed handling along the way.

        Args:
            image_file: input image file path
            output_file: output video file path
            duration_ms: target duration (milliseconds)
            output_spec: output specification
            render_spec: render specification
            task_id: task id (for logging)

        Returns:
            True on success
        """
        width = output_spec.width
        height = output_spec.height
        fps = output_spec.fps

        # Compute the actual duration (accounting for the speed change)
        speed = float(render_spec.speed) if render_spec.speed else 1.0
        if speed <= 0:
            speed = 1.0
        # Actual playback duration after the speed change
        actual_duration_sec = (duration_ms / 1000.0) / speed

        # Build the FFmpeg command
        cmd = [
            'ffmpeg', '-y', '-hide_banner',
            '-loop', '1',  # loop the input image
            '-i', image_file,
            '-t', str(actual_duration_sec),  # output duration
        ]

        # Build the filters: scale/pad to the target size
        filters = []

        # Crop handling
        crop_filter = self._build_crop_filter(render_spec, width, height, task_id)
        if crop_filter:
            filters.append(crop_filter)

        # Scale and pad
        filters.append(
            f"scale={width}:{height}:force_original_aspect_ratio=decrease,"
            f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black"
        )

        # Pixel format conversion (for compatibility)
        filters.append("format=yuv420p")

        cmd.extend(['-vf', ','.join(filters)])

        # Compute the total frame count and adjust the GOP dynamically
        total_frames = int(actual_duration_sec * fps)
        if total_frames <= 1:
            gop_size = 1
        elif total_frames < fps:
            gop_size = total_frames
        else:
            gop_size = fps * 2  # normal case: one keyframe every 2 seconds

        # Encoding parameters
        cmd.extend([
            '-c:v', 'libx264',
            '-preset', 'fast',
            '-crf', '18',
            '-r', str(fps),
            '-g', str(gop_size),
            '-keyint_min', str(min(gop_size, fps // 2 or 1)),
            '-force_key_frames', 'expr:eq(n,0)',
            '-an',  # no audio
            output_file
        ])

        logger.info(f"[task:{task_id}] Converting image to video: {actual_duration_sec:.2f}s at {fps}fps")
        return self.run_ffmpeg(cmd, task_id)

    def _build_trim_command(
        self,
        video_file: str,
        output_file: str,
        trim_head_ms: int,
        trim_tail_ms: int,
        output_spec
    ) -> List[str]:
        """
        Build the precise video trim command (re-encoding).

        Uses the trim filter for frame-accurate trimming instead of the
        keyframe-aligned -ss/-t arguments.

        Args:
            video_file: input video path
            output_file: output video path
            trim_head_ms: head trim duration (milliseconds)
            trim_tail_ms: tail trim duration (milliseconds)
            output_spec: output specification

        Returns:
            FFmpeg command argument list
        """
        original_duration = self.probe_duration(video_file)
        if not original_duration:
            # NOTE(review): 10.0s is an arbitrary fallback when probing fails;
            # the resulting end point may not match the real duration.
            original_duration = 10.0

        trim_head_sec = trim_head_ms / 1000.0
        trim_tail_sec = trim_tail_ms / 1000.0

        start_time = trim_head_sec
        end_time = original_duration - trim_tail_sec

        # trim + setpts resets timestamps so the output starts at t=0
        vf_filter = f"trim=start={start_time}:end={end_time},setpts=PTS-STARTPTS"

        cmd = [
            'ffmpeg', '-y', '-hide_banner',
            '-i', video_file,
            '-vf', vf_filter,
        ]
        cmd.extend(VIDEO_ENCODE_ARGS)
        fps = output_spec.fps
        cmd.extend(['-r', str(fps)])
        # Dynamic GOP: must not exceed the trimmed clip's frame count
        output_duration_sec = end_time - start_time
        total_frames = int(output_duration_sec * fps)
        if total_frames <= 1:
            gop_size = 1
        elif total_frames < fps:
            gop_size = total_frames
        else:
            gop_size = fps
        cmd.extend(['-g', str(gop_size)])
        cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))])
        cmd.extend(['-force_key_frames', 'expr:eq(n,0)'])
        cmd.append('-an')
        cmd.append(output_file)
        return cmd

    def _build_ts_package_command(
        self,
        video_file: str,
        audio_file: Optional[str],
        output_file: str,
        start_sec: float,
        duration_sec: float
    ) -> List[str]:
        """
        Build the TS packaging command.

        Muxes the video together with the matching time range of the audio
        into a TS segment. Video uses copy mode (it has already been
        rendered/trimmed). Supports an audio-less mode (video-only TS).

        Args:
            video_file: video file path (already processed)
            audio_file: audio file path (optional; None produces a video-only TS)
            output_file: output file path
            start_sec: audio start time (seconds)
            duration_sec: audio duration (seconds)

        Returns:
            FFmpeg command argument list
        """
        cmd = [
            'ffmpeg', '-y', '-hide_banner',
            '-i', video_file,
        ]
        if audio_file:
            # -ss/-t before the audio input: input-side seek of the audio range
            cmd.extend(['-ss', str(start_sec), '-t', str(duration_sec), '-i', audio_file])
            cmd.extend(['-map', '0:v:0', '-map', '1:a:0', '-c:v', 'copy', '-c:a', 'copy'])
        else:
            cmd.extend(['-c:v', 'copy'])
        cmd.extend([
            # Shift the TS timestamps so segments line up in the playlist
            '-output_ts_offset', str(start_sec),
            '-muxdelay', '0',
            '-muxpreload', '0',
            '-f', 'mpegts',
            output_file
        ])
        return cmd

    def _build_command(
        self,
        input_file: str,
        output_file: str,
        render_spec: RenderSpec,
        output_spec: OutputSpec,
        duration_ms: int,
        lut_file: Optional[str] = None,
        overlay_file: Optional[str] = None,
        overlap_head_ms: int = 0,
        overlap_tail_ms: int = 0,
        source_duration_sec: Optional[float] = None
    ) -> List[str]:
        """
        Build the FFmpeg render command.

        Args:
            input_file: input file path
            output_file: output file path
            render_spec: render specification
            output_spec: output specification
            duration_ms: target duration (milliseconds)
            lut_file: LUT file path (optional)
            overlay_file: overlay file path (optional)
            overlap_head_ms: head overlap duration (milliseconds)
            overlap_tail_ms: tail overlap duration (milliseconds)
            source_duration_sec: actual source video duration (seconds),
                used to detect insufficient duration

        Returns:
            FFmpeg command argument list
        """
        cmd = ['ffmpeg', '-y', '-hide_banner']

        # Hardware-accelerated decode arguments (must precede the input file)
        hwaccel_args = self.get_hwaccel_decode_args()
        if hwaccel_args:
            cmd.extend(hwaccel_args)

        # Input file
        cmd.extend(['-i', input_file])

        # Overlay input
        if overlay_file:
            cmd.extend(['-i', overlay_file])

        # Build the video filter chain
        filters = self._build_video_filters(
            render_spec=render_spec,
            output_spec=output_spec,
            duration_ms=duration_ms,
            lut_file=lut_file,
            overlay_file=overlay_file,
            overlap_head_ms=overlap_head_ms,
            overlap_tail_ms=overlap_tail_ms,
            source_duration_sec=source_duration_sec
        )

        # Apply the filters.
        # Detect the filter_complex format (contains semicolons or bracketed labels)
        is_filter_complex = ';' in filters or (filters.startswith('[') and ']' in filters)
        if is_filter_complex or overlay_file:
            # Use filter_complex
            cmd.extend(['-filter_complex', filters])
        elif filters:
            cmd.extend(['-vf', filters])

        # Encoding arguments (resolved dynamically from the hwaccel configuration)
        cmd.extend(self.get_video_encode_args())

        # Frame rate
        fps = output_spec.fps
        cmd.extend(['-r', str(fps)])

        # Duration (including the overlap regions)
        total_duration_ms = duration_ms + overlap_head_ms + overlap_tail_ms
        duration_sec = total_duration_ms / 1000.0
        cmd.extend(['-t', str(duration_sec)])

        # Dynamic GOP size: for short videos the GOP must not exceed the
        # total frame count
        total_frames = int(duration_sec * fps)
        if total_frames <= 1:
            gop_size = 1
        elif total_frames < fps:
            # Shorter than 1 second: use all frames as the GOP
            # (the whole video has a single keyframe at the start)
            gop_size = total_frames
        else:
            # Normal case: one keyframe every 2 seconds
            gop_size = fps * 2
        cmd.extend(['-g', str(gop_size)])
        cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))])
        # Force the first frame to be a keyframe
        cmd.extend(['-force_key_frames', 'expr:eq(n,0)'])

        # No audio (video segments carry no audio)
        cmd.append('-an')

        # Output file
        cmd.append(output_file)

        return cmd

    def _build_video_filters(
        self,
        render_spec: RenderSpec,
        output_spec: OutputSpec,
        duration_ms: int,
        lut_file: Optional[str] = None,
        overlay_file: Optional[str] = None,
        overlap_head_ms: int = 0,
        overlap_tail_ms: int = 0,
        source_duration_sec: Optional[float] = None
    ) -> str:
        """
        Build the video filter chain.

        Args:
            render_spec: render specification
            output_spec: output specification
            duration_ms: target duration (milliseconds)
            lut_file: LUT file path
            overlay_file: overlay file path (supports png/jpg images and mov videos)
            overlap_head_ms: head overlap duration (milliseconds)
            overlap_tail_ms: tail overlap duration (milliseconds)
            source_duration_sec: actual source video duration (seconds),
                used to detect insufficient duration

        Returns:
            filter string (either a simple -vf chain or a filter_complex graph)
        """
        filters = []
        width = output_spec.width
        height = output_spec.height
        fps = output_spec.fps

        # Determine the overlay type
        has_overlay = overlay_file is not None
        is_video_overlay = has_overlay and overlay_file.lower().endswith('.mov')

        # Parse effects
        effects = render_spec.get_effects()
        has_complex_effect = any(
            effect.effect_type in {'cameraShot', 'zoom'} for effect in effects
        )

        # With hardware acceleration, hwdownload must come first
        # (copies the GPU surface down to system memory)
        hwaccel_prefix = self.get_hwaccel_filter_prefix()
        if hwaccel_prefix:
            # Strip the trailing comma and use it as the first filter
            filters.append(hwaccel_prefix.rstrip(','))

        # 1. Speed handling (merges RenderSpec.speed with the ospeed effect)
        speed = float(render_spec.speed) if render_spec.speed else 1.0
        if speed <= 0:
            speed = 1.0
        ospeed_factor = 1.0
        for effect in effects:
            if effect.effect_type == 'ospeed':
                ospeed_factor = effect.get_ospeed_params()
                break
        combined_pts_factor = (1.0 / speed) * ospeed_factor
        if combined_pts_factor != 1.0:
            filters.append(f"setpts={combined_pts_factor}*PTS")

        # 2. LUT color grading
        if lut_file:
            # Backslashes in the path must be converted and colons escaped
            # (colons are special characters in FFmpeg filter syntax)
            lut_path = lut_file.replace('\\', '/').replace(':', r'\:')
            filters.append(f"lut3d='{lut_path}'")

        # 3. Crop handling
        crop_filter = self._build_crop_filter(render_spec, width, height)
        if crop_filter:
            filters.append(crop_filter)

        # 4. Scale and pad
        scale_filter = (
            f"scale={width}:{height}:force_original_aspect_ratio=decrease,"
            f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black"
        )
        filters.append(scale_filter)

        # 5. Effects handling (cameraShot / zoom require filter_complex)
        if has_complex_effect:
            return self._build_filter_complex_with_effects(
                base_filters=filters,
                effects=effects,
                fps=fps,
                width=width,
                height=height,
                has_overlay=has_overlay,
                is_video_overlay=is_video_overlay,
                overlap_head_ms=overlap_head_ms,
                overlap_tail_ms=overlap_tail_ms,
                use_hwdownload=bool(hwaccel_prefix),
                duration_ms=duration_ms,
                render_spec=render_spec,
                source_duration_sec=source_duration_sec
            )

        # 6. Frame freeze (tpad) - for transition overlap regions and for
        #    padding a too-short source.
        #    Note: tpad must be applied after scaling.
        tpad_parts = []

        # Compute whether an extra tail freeze is needed (source too short)
        extra_tail_freeze_sec = 0.0
        if source_duration_sec is not None:
            # Reuse the already-computed combined_pts_factor
            effective_duration_sec = source_duration_sec * combined_pts_factor
            required_duration_sec = duration_ms / 1000.0
            # If the source is too short, freeze the last frame to pad
            if effective_duration_sec < required_duration_sec:
                extra_tail_freeze_sec = required_duration_sec - effective_duration_sec

        if overlap_head_ms > 0:
            # Head freeze: hold the first frame for the given duration
            head_duration_sec = overlap_head_ms / 1000.0
            tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}")

        # Tail freeze: merge the overlap and the short-source padding
        total_tail_freeze_sec = (overlap_tail_ms / 1000.0) + extra_tail_freeze_sec
        if total_tail_freeze_sec > 0:
            # Hold the last frame for the given duration
            tpad_parts.append(f"stop_mode=clone:stop_duration={total_tail_freeze_sec}")

        if tpad_parts:
            filters.append(f"tpad={':'.join(tpad_parts)}")

        # 7. Build the final filter string
        if has_overlay:
            # Use the filter_complex format
            base_filters = ','.join(filters) if filters else 'copy'
            overlay_scale = f"scale={width}:{height}"
            # Video overlays use eof_action=pass (disappear when they end);
            # image overlays use the default behavior (stay visible)
            overlay_params = 'eof_action=pass' if is_video_overlay else ''
            overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
            # Video overlays need a color-range fix at the end so the range
            # does not flip from tv to pc after the overlay finishes
            range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
            return f"[0:v]{base_filters}[base];[1:v]{overlay_scale}[overlay];[base][overlay]{overlay_filter}{range_fix}"
        else:
            return ','.join(filters) if filters else ''

    def _build_filter_complex_with_effects(
        self,
        base_filters: List[str],
        effects: List[Effect],
        fps: int,
        width: int,
        height: int,
        has_overlay: bool = False,
        is_video_overlay: bool = False,
        overlap_head_ms: int = 0,
        overlap_tail_ms: int = 0,
        use_hwdownload: bool = False,
        duration_ms: int = 0,
        render_spec: Optional[RenderSpec] = None,
        source_duration_sec: Optional[float] = None
    ) -> str:
        """
        Build a filter_complex graph that includes effects.

        cameraShot / zoom effects are all handled here, stacked in the
        order they appear in ``effects``.

        Args:
            base_filters: base filter list
            effects: effect list
            fps: frame rate
            width: output width
            height: output height
            has_overlay: whether an overlay is present
            is_video_overlay: whether the overlay is a video format (e.g. .mov)
            overlap_head_ms: head overlap duration
            overlap_tail_ms: tail overlap duration
            use_hwdownload: whether hardware-accelerated decode is in use
                (hwdownload is already included in base_filters)
            duration_ms: target duration (milliseconds)
            render_spec: render specification (used for the speed parameters)
            source_duration_sec: actual source video duration (seconds),
                used to detect insufficient duration

        Returns:
            filter string in filter_complex format
        """
        filter_parts = []

        # Base filter chain
        base_chain = ','.join(base_filters) if base_filters else 'copy'

        # Current output label (threaded through each effect stage)
        current_output = '[v_base]'
        filter_parts.append(f"[0:v]{base_chain}{current_output}")

        # Process each effect
        effect_idx = 0
        for effect in effects:
            if effect.effect_type == 'cameraShot':
                start_sec, duration_sec = effect.get_camera_shot_params()
                if start_sec <= 0 or duration_sec <= 0:
                    continue
                # cameraShot implementation (freeze-frame effect):
                # 1. fps + split into two branches
                # 2. branch A: trim(0, start) + tpad freeze for duration seconds
                # 3. branch B: trim(start, end)
                # 4. concat the branches back together
                split_out_a = f'[eff{effect_idx}_a]'
                split_out_b = f'[eff{effect_idx}_b]'
                frozen_out = f'[eff{effect_idx}_frozen]'
                rest_out = f'[eff{effect_idx}_rest]'
                effect_output = f'[v_eff{effect_idx}]'
                # fps + split
                filter_parts.append(
                    f"{current_output}fps=fps={fps},split{split_out_a}{split_out_b}"
                )
                # Branch A: trim(0, start) + tpad freeze.
                # tpad=stop_mode=clone holds the last frame for the given duration
                filter_parts.append(
                    f"{split_out_a}trim=start=0:end={start_sec},setpts=PTS-STARTPTS,"
                    f"tpad=stop_mode=clone:stop_duration={duration_sec}{frozen_out}"
                )
                # Branch B: trim from `start` onwards
                filter_parts.append(
                    f"{split_out_b}trim=start={start_sec},setpts=PTS-STARTPTS{rest_out}"
                )
                # concat the two branches
                filter_parts.append(
                    f"{frozen_out}{rest_out}concat=n=2:v=1:a=0{effect_output}"
                )
                current_output = effect_output
                effect_idx += 1
            elif effect.effect_type == 'zoom':
                start_sec, scale_factor, duration_sec = effect.get_zoom_params()
                if start_sec < 0 or scale_factor <= 1.0 or duration_sec <= 0:
                    continue
                zoom_end_sec = start_sec + duration_sec
                base_out = f'[eff{effect_idx}_base]'
                zoom_source_out = f'[eff{effect_idx}_zoom_src]'
                zoom_scaled_out = f'[eff{effect_idx}_zoom_scaled]'
                effect_output = f'[v_eff{effect_idx}]'
                # Enable the zoomed branch only within [start, end]
                zoom_enable = f"'between(t,{start_sec},{zoom_end_sec})'"
                filter_parts.append(
                    f"{current_output}split=2{base_out}{zoom_source_out}"
                )
                # Scale up then center-crop back to the output size
                filter_parts.append(
                    f"{zoom_source_out}scale=iw*{scale_factor}:ih*{scale_factor},"
                    f"crop={width}:{height}:(in_w-{width})/2:(in_h-{height})/2{zoom_scaled_out}"
                )
                filter_parts.append(
                    f"{base_out}{zoom_scaled_out}overlay=0:0:enable={zoom_enable}{effect_output}"
                )
                current_output = effect_output
                effect_idx += 1

        # Frame freeze (tpad) - for transition overlap regions and for
        # padding a too-short source
        tpad_parts = []

        # Compute whether an extra tail freeze is needed (source too short)
        extra_tail_freeze_sec = 0.0
        if source_duration_sec is not None and render_spec is not None and duration_ms > 0:
            speed = float(render_spec.speed) if render_spec.speed else 1.0
            if speed <= 0:
                speed = 1.0
            ospeed_factor = 1.0
            for effect in effects:
                if effect.effect_type == 'ospeed':
                    ospeed_factor = effect.get_ospeed_params()
                    break
            combined_pts_factor = (1.0 / speed) * ospeed_factor
            effective_duration_sec = source_duration_sec * combined_pts_factor
            required_duration_sec = duration_ms / 1000.0
            # If the source is too short, freeze the last frame to pad
            if effective_duration_sec < required_duration_sec:
                extra_tail_freeze_sec = required_duration_sec - effective_duration_sec

        if overlap_head_ms > 0:
            head_duration_sec = overlap_head_ms / 1000.0
            tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}")

        # Tail freeze: merge the overlap and the short-source padding
        total_tail_freeze_sec = (overlap_tail_ms / 1000.0) + extra_tail_freeze_sec
        if total_tail_freeze_sec > 0:
            tpad_parts.append(f"stop_mode=clone:stop_duration={total_tail_freeze_sec}")

        if tpad_parts:
            tpad_output = '[v_tpad]'
            filter_parts.append(f"{current_output}tpad={':'.join(tpad_parts)}{tpad_output}")
            current_output = tpad_output

        # Final output
        if has_overlay:
            # Overlay handling.
            # Video overlays use eof_action=pass (disappear when they end);
            # image overlays use the default behavior (stay visible)
            overlay_params = 'eof_action=pass' if is_video_overlay else ''
            overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
            overlay_scale = f"scale={width}:{height}"
            overlay_output = '[v_overlay]'
            # Video overlays need a color-range fix at the end so the range
            # does not flip from tv to pc after the overlay finishes
            range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
            filter_parts.append(f"[1:v]{overlay_scale}{overlay_output}")
            filter_parts.append(f"{current_output}{overlay_output}{overlay_filter}{range_fix}")
        else:
            # Remove the last output label so the graph feeds the default output.
            # Replaces the final filter's output label with nothing (direct output)
            if filter_parts:
                last_filter = filter_parts[-1]
                # Strip the trailing output label
                if last_filter.endswith(current_output):
                    filter_parts[-1] = last_filter[:-len(current_output)]

        return ';'.join(filter_parts)