feat(video): 添加视频特效处理功能

- 在常量模块中定义支持的特效类型(相机定格、缩放、模糊)
- 在任务域中创建Effect数据类,支持从字符串解析特效配置
- 实现cameraShot特效参数解析和默认值处理
- 扩展RenderSpec类,添加获取特效列表的方法
- 修改视频渲染处理器,集成特效滤镜构建逻辑
- 实现cameraShot特效的filter_complex滤镜图构建
- 添加fps参数支持和overlay检测逻辑优化
- 完成特效与转场overlap的兼容处理
This commit is contained in:
2026-01-13 09:31:39 +08:00
parent 9c6186ecd3
commit a26c44a3cd
3 changed files with 216 additions and 5 deletions

View File

@@ -34,6 +34,13 @@ TRANSITION_TYPES = (
'slidedown', # 向下滑动 'slidedown', # 向下滑动
) )
# 支持的特效类型
EFFECT_TYPES = (
'cameraShot', # 相机定格效果:在指定时间点冻结画面
'zoom', # 缩放效果(预留)
'blur', # 模糊效果(预留)
)
# 统一视频编码参数(来自集成文档) # 统一视频编码参数(来自集成文档)
VIDEO_ENCODE_PARAMS = { VIDEO_ENCODE_PARAMS = {
'codec': 'libx264', 'codec': 'libx264',

View File

@@ -34,6 +34,13 @@ TRANSITION_TYPES = {
'slidedown': 'slidedown', # 向下滑动 'slidedown': 'slidedown', # 向下滑动
} }
# 支持的特效类型
EFFECT_TYPES = {
'cameraShot', # 相机定格效果
'zoom', # 缩放效果(预留)
'blur', # 模糊效果(预留)
}
class TaskStatus(Enum): class TaskStatus(Enum):
"""任务状态枚举""" """任务状态枚举"""
@@ -76,6 +83,70 @@ class TransitionConfig:
return TRANSITION_TYPES.get(self.type, 'fade') return TRANSITION_TYPES.get(self.type, 'fade')
@dataclass
class Effect:
"""
特效配置
格式:type:params
例如:cameraShot:3,1 表示在第3秒定格1秒
"""
effect_type: str # 效果类型
params: str = "" # 参数字符串
@classmethod
def from_string(cls, effect_str: str) -> Optional['Effect']:
"""
从字符串解析 Effect
格式:type:params 或 type(无参数时)
"""
if not effect_str:
return None
parts = effect_str.split(':', 1)
effect_type = parts[0].strip()
if effect_type not in EFFECT_TYPES:
return None
params = parts[1].strip() if len(parts) > 1 else ""
return cls(effect_type=effect_type, params=params)
@classmethod
def parse_effects(cls, effects_str: Optional[str]) -> List['Effect']:
"""
解析效果字符串
格式:effect1|effect2|effect3
例如:cameraShot:3,1|blur:5
"""
if not effects_str:
return []
effects = []
for part in effects_str.split('|'):
effect = cls.from_string(part.strip())
if effect:
effects.append(effect)
return effects
def get_camera_shot_params(self) -> tuple:
"""
获取 cameraShot 效果参数
Returns:
(start_sec, duration_sec): 开始时间和持续时间(秒)
"""
if self.effect_type != 'cameraShot':
return (0, 0)
if not self.params:
return (3, 1) # 默认值
parts = self.params.split(',')
try:
start = int(parts[0]) if len(parts) >= 1 else 3
duration = int(parts[1]) if len(parts) >= 2 else 1
return (start, duration)
except ValueError:
return (3, 1)
@dataclass @dataclass
class RenderSpec: class RenderSpec:
""" """
@@ -137,6 +208,10 @@ class RenderSpec:
return self.transition_out.get_overlap_ms() return self.transition_out.get_overlap_ms()
return 0 return 0
def get_effects(self) -> List['Effect']:
"""获取解析后的特效列表"""
return Effect.parse_effects(self.effects)
@dataclass @dataclass
class OutputSpec: class OutputSpec:

View File

@@ -11,7 +11,7 @@ import logging
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS
from domain.task import Task, TaskType, RenderSpec, OutputSpec from domain.task import Task, TaskType, RenderSpec, OutputSpec, Effect
from domain.result import TaskResult, ErrorCode from domain.result import TaskResult, ErrorCode
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -188,8 +188,10 @@ class RenderSegmentVideoHandler(BaseHandler):
) )
# 应用滤镜 # 应用滤镜
if overlay_file: # 检测是否为 filter_complex 格式(包含分号或方括号标签)
# 使用 filter_complex 处理叠加 is_filter_complex = ';' in filters or (filters.startswith('[') and ']' in filters)
if is_filter_complex or overlay_file:
# 使用 filter_complex 处理
cmd.extend(['-filter_complex', filters]) cmd.extend(['-filter_complex', filters])
elif filters: elif filters:
cmd.extend(['-vf', filters]) cmd.extend(['-vf', filters])
@@ -245,6 +247,11 @@ class RenderSegmentVideoHandler(BaseHandler):
filters = [] filters = []
width = output_spec.width width = output_spec.width
height = output_spec.height height = output_spec.height
fps = output_spec.fps
# 解析 effects
effects = render_spec.get_effects()
has_camera_shot = any(e.effect_type == 'cameraShot' for e in effects)
# 1. 变速处理 # 1. 变速处理
speed = float(render_spec.speed) if render_spec.speed else 1.0 speed = float(render_spec.speed) if render_spec.speed else 1.0
@@ -288,7 +295,19 @@ class RenderSegmentVideoHandler(BaseHandler):
) )
filters.append(scale_filter) filters.append(scale_filter)
# 5. 帧冻结(tpad)- 用于转场 overlap 区域 # 5. 特效处理(cameraShot 需要特殊处理)
if has_camera_shot:
# cameraShot 需要使用 filter_complex 格式
return self._build_filter_complex_with_effects(
base_filters=filters,
effects=effects,
fps=fps,
has_overlay=has_overlay,
overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms
)
# 6. 帧冻结(tpad)- 用于转场 overlap 区域
# 注意:tpad 必须在缩放之后应用 # 注意:tpad 必须在缩放之后应用
tpad_parts = [] tpad_parts = []
if overlap_head_ms > 0: if overlap_head_ms > 0:
@@ -303,10 +322,120 @@ class RenderSegmentVideoHandler(BaseHandler):
if tpad_parts: if tpad_parts:
filters.append(f"tpad={':'.join(tpad_parts)}") filters.append(f"tpad={':'.join(tpad_parts)}")
# 6. 构建最终滤镜 # 7. 构建最终滤镜
if has_overlay: if has_overlay:
# 使用 filter_complex 格式 # 使用 filter_complex 格式
base_filters = ','.join(filters) if filters else 'copy' base_filters = ','.join(filters) if filters else 'copy'
return f"[0:v]{base_filters}[base];[base][1:v]overlay=0:0" return f"[0:v]{base_filters}[base];[base][1:v]overlay=0:0"
else: else:
return ','.join(filters) if filters else '' return ','.join(filters) if filters else ''
def _build_filter_complex_with_effects(
self,
base_filters: List[str],
effects: List[Effect],
fps: int,
has_overlay: bool = False,
overlap_head_ms: int = 0,
overlap_tail_ms: int = 0
) -> str:
"""
构建包含特效的 filter_complex 滤镜图
cameraShot 效果需要使用 split/freezeframes/concat 滤镜组合。
Args:
base_filters: 基础滤镜列表
effects: 特效列表
fps: 帧率
has_overlay: 是否有叠加层
overlap_head_ms: 头部 overlap 时长
overlap_tail_ms: 尾部 overlap 时长
Returns:
filter_complex 格式的滤镜字符串
"""
filter_parts = []
# 基础滤镜链
base_chain = ','.join(base_filters) if base_filters else 'copy'
# 当前输出标签
current_output = '[v_base]'
filter_parts.append(f"[0:v]{base_chain}{current_output}")
# 处理每个特效
effect_idx = 0
for effect in effects:
if effect.effect_type == 'cameraShot':
start_sec, duration_sec = effect.get_camera_shot_params()
if start_sec <= 0 or duration_sec <= 0:
continue
# cameraShot 实现:
# 1. fps + split 分割
# 2. 第一路:trim(0, start+duration) + freezeframes
# 3. 第二路:trim(start, end)
# 4. concat 拼接
start_frame = start_sec * fps
split_out_a = f'[eff{effect_idx}_a]'
split_out_b = f'[eff{effect_idx}_b]'
effect_output = f'[v_eff{effect_idx}]'
# fps + split
filter_parts.append(
f"{current_output}fps=fps={fps},split{split_out_a}{split_out_b}"
)
# 第一路:trim + freezeframes(在 start 帧处冻结 duration 秒)
# freezeframes: 从 first 帧开始,用 replace 帧替换后续帧
# 这样实现定格效果:在 start_frame 位置冻结
filter_parts.append(
f"{split_out_a}trim=start=0:end={start_sec + duration_sec},"
f"setpts=PTS-STARTPTS,"
f"freezeframes=first={start_frame}:last={start_frame + duration_sec * fps - 1}:replace={start_frame}"
f"{split_out_a}"
)
# 第二路:trim 从 start 开始
filter_parts.append(
f"{split_out_b}trim=start={start_sec},setpts=PTS-STARTPTS{split_out_b}"
)
# concat 拼接
filter_parts.append(
f"{split_out_a}{split_out_b}concat=n=2:v=1:a=0{effect_output}"
)
current_output = effect_output
effect_idx += 1
# 帧冻结(tpad)- 用于转场 overlap 区域
tpad_parts = []
if overlap_head_ms > 0:
head_duration_sec = overlap_head_ms / 1000.0
tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}")
if overlap_tail_ms > 0:
tail_duration_sec = overlap_tail_ms / 1000.0
tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}")
if tpad_parts:
tpad_output = '[v_tpad]'
filter_parts.append(f"{current_output}tpad={':'.join(tpad_parts)}{tpad_output}")
current_output = tpad_output
# 最终输出
if has_overlay:
# 叠加层处理
filter_parts.append(f"{current_output}[1:v]overlay=0:0")
else:
# 移除最后一个标签,直接输出
# 将最后一个滤镜的输出标签替换为空(直接输出)
if filter_parts:
last_filter = filter_parts[-1]
# 移除末尾的输出标签
if last_filter.endswith(current_output):
filter_parts[-1] = last_filter[:-len(current_output)]
return ';'.join(filter_parts)