From a26c44a3cd0af39cae9982d4bac2e0aedef36ac4 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Tue, 13 Jan 2026 09:31:39 +0800 Subject: [PATCH] =?UTF-8?q?feat(video):=20=E6=B7=BB=E5=8A=A0=E8=A7=86?= =?UTF-8?q?=E9=A2=91=E7=89=B9=E6=95=88=E5=A4=84=E7=90=86=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在常量模块中定义支持的特效类型(相机定格、缩放、模糊) - 在任务域中创建Effect数据类,支持从字符串解析特效配置 - 实现cameraShot特效参数解析和默认值处理 - 扩展RenderSpec类,添加获取特效列表的方法 - 修改视频渲染处理器,集成特效滤镜构建逻辑 - 实现cameraShot特效的filter_complex滤镜图构建 - 添加fps参数支持和overlay检测逻辑优化 - 完成特效与转场overlap的兼容处理 --- constant/__init__.py | 7 ++ domain/task.py | 75 +++++++++++++++++++++ handlers/render_video.py | 139 +++++++++++++++++++++++++++++++++++++-- 3 files changed, 216 insertions(+), 5 deletions(-) diff --git a/constant/__init__.py b/constant/__init__.py index 54a353a..93a44a8 100644 --- a/constant/__init__.py +++ b/constant/__init__.py @@ -34,6 +34,13 @@ TRANSITION_TYPES = ( 'slidedown', # 向下滑动 ) +# 支持的特效类型 +EFFECT_TYPES = ( + 'cameraShot', # 相机定格效果:在指定时间点冻结画面 + 'zoom', # 缩放效果(预留) + 'blur', # 模糊效果(预留) +) + # 统一视频编码参数(来自集成文档) VIDEO_ENCODE_PARAMS = { 'codec': 'libx264', diff --git a/domain/task.py b/domain/task.py index 95bd058..2fed94f 100644 --- a/domain/task.py +++ b/domain/task.py @@ -34,6 +34,13 @@ TRANSITION_TYPES = { 'slidedown': 'slidedown', # 向下滑动 } +# 支持的特效类型 +EFFECT_TYPES = { + 'cameraShot', # 相机定格效果 + 'zoom', # 缩放效果(预留) + 'blur', # 模糊效果(预留) +} + class TaskStatus(Enum): """任务状态枚举""" @@ -76,6 +83,70 @@ class TransitionConfig: return TRANSITION_TYPES.get(self.type, 'fade') +@dataclass +class Effect: + """ + 特效配置 + + 格式:type:params + 例如:cameraShot:3,1 表示在第3秒定格1秒 + """ + effect_type: str # 效果类型 + params: str = "" # 参数字符串 + + @classmethod + def from_string(cls, effect_str: str) -> Optional['Effect']: + """ + 从字符串解析 Effect + + 格式:type:params 或 type(无参数时) + """ + if not effect_str: + return None + parts = effect_str.split(':', 1) + effect_type = parts[0].strip() + if effect_type not in EFFECT_TYPES: + return None + params = parts[1].strip() if len(parts) > 1 else "" + return cls(effect_type=effect_type, params=params) + + @classmethod + def parse_effects(cls, effects_str: Optional[str]) -> List['Effect']: + """ + 解析效果字符串 + + 格式:effect1|effect2|effect3 + 例如:cameraShot:3,1|blur:5 + """ + if not effects_str: + return [] + effects = [] + for part in effects_str.split('|'): + effect = cls.from_string(part.strip()) + if effect: + effects.append(effect) + return effects + + def get_camera_shot_params(self) -> tuple: + """ + 获取 cameraShot 效果参数 + + Returns: + (start_sec, duration_sec): 开始时间和持续时间(秒) + """ + if self.effect_type != 'cameraShot': + return (0, 0) + if not self.params: + return (3, 1) # 默认值 + parts = self.params.split(',') + try: + start = int(parts[0]) if len(parts) >= 1 else 3 + duration = int(parts[1]) if len(parts) >= 2 else 1 + return (start, duration) + except ValueError: + return (3, 1) + + @dataclass class RenderSpec: """ @@ -137,6 +208,10 @@ class RenderSpec: return self.transition_out.get_overlap_ms() return 0 + def get_effects(self) -> List['Effect']: + """获取解析后的特效列表""" + return Effect.parse_effects(self.effects) + @dataclass class OutputSpec: diff --git a/handlers/render_video.py b/handlers/render_video.py index 02c1927..6a76299 100644 --- a/handlers/render_video.py +++ b/handlers/render_video.py @@ -11,7 +11,7 @@ import logging from typing import List, Optional, Tuple from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS -from domain.task import Task, TaskType, RenderSpec, OutputSpec +from domain.task import Task, TaskType, RenderSpec, OutputSpec, Effect from domain.result import TaskResult, ErrorCode logger = logging.getLogger(__name__) @@ -188,8 +188,10 @@ class RenderSegmentVideoHandler(BaseHandler): ) # 应用滤镜 - if overlay_file: - # 使用 filter_complex 处理叠加 + # 检测是否为 filter_complex 格式(包含分号或方括号标签) + is_filter_complex = ';' in filters or (filters.startswith('[') and ']' in filters) + if is_filter_complex or overlay_file: + # 使用 filter_complex 处理 cmd.extend(['-filter_complex', filters]) elif filters: cmd.extend(['-vf', filters]) @@ -245,6 +247,11 @@ class RenderSegmentVideoHandler(BaseHandler): filters = [] width = output_spec.width height = output_spec.height + fps = output_spec.fps + + # 解析 effects + effects = render_spec.get_effects() + has_camera_shot = any(e.effect_type == 'cameraShot' for e in effects) # 1. 变速处理 speed = float(render_spec.speed) if render_spec.speed else 1.0 @@ -288,7 +295,19 @@ class RenderSegmentVideoHandler(BaseHandler): ) filters.append(scale_filter) - # 5. 帧冻结(tpad)- 用于转场 overlap 区域 + # 5. 特效处理(cameraShot 需要特殊处理) + if has_camera_shot: + # cameraShot 需要使用 filter_complex 格式 + return self._build_filter_complex_with_effects( + base_filters=filters, + effects=effects, + fps=fps, + has_overlay=has_overlay, + overlap_head_ms=overlap_head_ms, + overlap_tail_ms=overlap_tail_ms + ) + + # 6. 帧冻结(tpad)- 用于转场 overlap 区域 # 注意:tpad 必须在缩放之后应用 tpad_parts = [] if overlap_head_ms > 0: @@ -303,10 +322,120 @@ class RenderSegmentVideoHandler(BaseHandler): if tpad_parts: filters.append(f"tpad={':'.join(tpad_parts)}") - # 6. 构建最终滤镜 + # 7. 构建最终滤镜 if has_overlay: # 使用 filter_complex 格式 base_filters = ','.join(filters) if filters else 'copy' return f"[0:v]{base_filters}[base];[base][1:v]overlay=0:0" else: return ','.join(filters) if filters else '' + + def _build_filter_complex_with_effects( + self, + base_filters: List[str], + effects: List[Effect], + fps: int, + has_overlay: bool = False, + overlap_head_ms: int = 0, + overlap_tail_ms: int = 0 + ) -> str: + """ + 构建包含特效的 filter_complex 滤镜图 + + cameraShot 效果需要使用 split/freezeframes/concat 滤镜组合。 + + Args: + base_filters: 基础滤镜列表 + effects: 特效列表 + fps: 帧率 + has_overlay: 是否有叠加层 + overlap_head_ms: 头部 overlap 时长 + overlap_tail_ms: 尾部 overlap 时长 + + Returns: + filter_complex 格式的滤镜字符串 + """ + filter_parts = [] + + # 基础滤镜链 + base_chain = ','.join(base_filters) if base_filters else 'copy' + + # 当前输出标签 + current_output = '[v_base]' + filter_parts.append(f"[0:v]{base_chain}{current_output}") + + # 处理每个特效 + effect_idx = 0 + for effect in effects: + if effect.effect_type == 'cameraShot': + start_sec, duration_sec = effect.get_camera_shot_params() + if start_sec <= 0 or duration_sec <= 0: + continue + + # cameraShot 实现: + # 1. fps + split 分割 + # 2. 第一路:trim(0, start+duration) + freezeframes + # 3. 第二路:trim(start, end) + # 4. concat 拼接 + + start_frame = start_sec * fps + split_out_a = f'[eff{effect_idx}_a]' + split_out_b = f'[eff{effect_idx}_b]' + effect_output = f'[v_eff{effect_idx}]' + + # fps + split + filter_parts.append( + f"{current_output}fps=fps={fps},split{split_out_a}{split_out_b}" + ) + + # 第一路:trim + freezeframes(在 start 帧处冻结 duration 秒) + # freezeframes: 从 first 帧开始,用 replace 帧替换后续帧 + # 这样实现定格效果:在 start_frame 位置冻结 + filter_parts.append( + f"{split_out_a}trim=start=0:end={start_sec + duration_sec}," + f"setpts=PTS-STARTPTS," + f"freezeframes=first={start_frame}:last={start_frame + duration_sec * fps - 1}:replace={start_frame}" + f"{split_out_a}" + ) + + # 第二路:trim 从 start 开始 + filter_parts.append( + f"{split_out_b}trim=start={start_sec},setpts=PTS-STARTPTS{split_out_b}" + ) + + # concat 拼接 + filter_parts.append( + f"{split_out_a}{split_out_b}concat=n=2:v=1:a=0{effect_output}" + ) + + current_output = effect_output + effect_idx += 1 + + # 帧冻结(tpad)- 用于转场 overlap 区域 + tpad_parts = [] + if overlap_head_ms > 0: + head_duration_sec = overlap_head_ms / 1000.0 + tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}") + if overlap_tail_ms > 0: + tail_duration_sec = overlap_tail_ms / 1000.0 + tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}") + + if tpad_parts: + tpad_output = '[v_tpad]' + filter_parts.append(f"{current_output}tpad={':'.join(tpad_parts)}{tpad_output}") + current_output = tpad_output + + # 最终输出 + if has_overlay: + # 叠加层处理 + filter_parts.append(f"{current_output}[1:v]overlay=0:0") + else: + # 移除最后一个标签,直接输出 + # 将最后一个滤镜的输出标签替换为空(直接输出) + if filter_parts: + last_filter = filter_parts[-1] + # 移除末尾的输出标签 + if last_filter.endswith(current_output): + filter_parts[-1] = last_filter[:-len(current_output)] + + return ';'.join(filter_parts)