17 Commits

Author SHA1 Message Date
affe933fec fix(video): fix video timestamp handling and encoding parameter issues
- Normalize the video start timestamp to zero, so a non-zero starting PTS in the source no longer freezes the first frame after muxing
- Change the setpts filter expression to the setpts=PTS-STARTPTS form
- Apply the normalized timestamp handling to all speed-adjustment scenarios
- Add a video encoding parameter test file to ensure B-frames are disabled under every hardware acceleration mode
- Add B-frame-disabled test cases for software, QSV, and CUDA hardware acceleration
2026-03-06 17:30:34 +08:00
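The timestamp normalization described in this commit can be sketched as a small helper that builds the setpts expression; a minimal sketch, assuming a speed multiplier is applied after rebasing (the helper name is hypothetical, not from the repo):

```python
def build_setpts_filter(speed: float = 1.0) -> str:
    """Build a setpts expression that first zeroes the start PTS.

    Subtracting STARTPTS rebases the first frame to t=0, so a source
    whose PTS starts at e.g. 1.4s no longer yields a frozen first
    frame after muxing. A speed factor divides the rebased timeline.
    """
    if speed <= 0:
        raise ValueError("speed must be > 0")
    if speed == 1.0:
        return "setpts=PTS-STARTPTS"
    # Dividing PTS compresses the timeline: 2x speed -> PTS/2
    return f"setpts=(PTS-STARTPTS)/{speed}"

build_setpts_filter()     # 'setpts=PTS-STARTPTS'
build_setpts_filter(2.0)  # 'setpts=(PTS-STARTPTS)/2.0'
```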
379e0bf999 fix(video): refine video encoding parameter configuration
- Change the maxrate handling: set -b:v to maxrate when maxrate is valid
- Change the bufsize calculation from 2x to 1x maxrate, suiting strict bitrate control for short videos
- Add hardware-accelerated QSV encoding parameter support
- Fix the condition that determines when the NVENC VBV model takes effect
- Update the peak-bitrate cap implementation in CRF/CQ mode
2026-03-04 18:05:00 +08:00
d54e6e948f fix(video): fix PTS/DTS regressions caused by B-frames in HLS streams
- Add -bf 0 to the hardware-accelerated encoding parameters to disable B-frames
- Add the B-frame-disable setting in CUDA hardware acceleration mode
- Add the B-frame-disable parameter in software encoding mode
- Add a comment explaining that B-frames are disabled to avoid issues at TS segment boundaries
2026-03-04 11:25:08 +08:00
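The idea behind this fix: with B-frames, decode order differs from presentation order, so the first packets of an independently muxed TS segment can carry DTS values behind the previous segment's timestamps, which players see as a regression at the HLS boundary. A minimal sketch of the `-bf 0` injection (the helper name is an illustration, not the repo's API):

```python
def disable_b_frames(encode_args: list) -> list:
    """Append '-bf 0' unless a B-frame setting is already present.

    Disabling B-frames keeps decode order equal to presentation order,
    so independently packaged TS segments cannot produce a PTS/DTS
    step-back at HLS segment boundaries.
    """
    if '-bf' in encode_args:
        return encode_args
    return encode_args + ['-bf', '0']

args = disable_b_frames(['-c:v', 'libx264', '-crf', '23'])
# args ends with ['-bf', '0']; calling it again is a no-op
```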
ca90336905 feat(video): add a maximum bitrate cap for video encoding
- Add a maxrate parameter to get_video_encode_args to cap the peak bitrate
- Control both quality and peak bitrate simultaneously in CRF/CQ mode
- Compute bufsize automatically as 2x maxrate
- Update the encoding-parameter method in the VideoHandler class to pass the bitrate cap through
- Update the video composition and rendering modules to apply the bitrate settings from the output spec
- Remove the static VIDEO_ENCODE_ARGS constant in favor of dynamic parameter generation
2026-03-04 10:03:33 +08:00
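The VBV arithmetic in this commit can be sketched as below. The bufsize ratio defines the rate-control window: 2x maxrate (this commit) tolerates roughly two seconds of burst above the cap, while the later commit 379e0bf999 tightens it to 1x for short clips (helper name is hypothetical):

```python
def vbv_args(maxrate_bps: int, bufsize_ratio: float = 2.0) -> list:
    """Build -maxrate/-bufsize arguments for CRF-plus-cap encoding.

    bufsize is the VBV buffer: larger values let the encoder exceed
    maxrate briefly; 1x maxrate enforces the cap over a ~1s window.
    """
    if maxrate_bps <= 0:
        raise ValueError("maxrate must be positive")
    maxrate_k = maxrate_bps // 1000
    bufsize_k = int(maxrate_k * bufsize_ratio)
    return ['-maxrate', f'{maxrate_k}k', '-bufsize', f'{bufsize_k}k']

vbv_args(4_000_000)       # ['-maxrate', '4000k', '-bufsize', '8000k']
vbv_args(4_000_000, 1.0)  # ['-maxrate', '4000k', '-bufsize', '4000k']
```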
34e7d84d52 refactor(video): rework the video cropping implementation
- Replace the crop_size field with a crop_scale float field that controls the zoom factor
- Rename the face_pos field to crop_pos to unify crop-position control
- Remove the zoom_cut and crop_size fields to simplify the cropping parameters
- Add a _build_crop_filter static method that centralizes crop-filter construction
- Improve the cropping algorithm to crop precisely by target aspect ratio and zoom factor
- Unify the cropping logic for images and videos, removing duplicated code
- Parse the cropScale parameter defensively so invalid values cannot cause errors
- Improve crop-position parsing with float coordinates and exception handling
2026-02-27 13:37:42 +08:00
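`_build_crop_filter` itself is not shown in this changeset, so the following is only a hedged reconstruction of a ratio-plus-scale crop: the function name, the fractional-center `pos` convention, and the even-dimension rounding are all assumptions:

```python
def build_crop_filter(src_w: int, src_h: int, target_ratio: float,
                      crop_scale: float = 1.0,
                      pos: tuple = (0.5, 0.5)) -> str:
    """Sketch: crop by target aspect ratio and zoom factor.

    The largest window matching target_ratio is shrunk by crop_scale
    (scale > 1 zooms in), then placed by fractional center coordinates
    clamped into the frame. Dimensions are forced even for yuv420p.
    """
    if crop_scale <= 0:
        crop_scale = 1.0  # defensive fallback, mirroring the commit
    # Largest window with the target aspect ratio that fits the source
    if src_w / src_h > target_ratio:
        base_h, base_w = src_h, int(src_h * target_ratio)
    else:
        base_w, base_h = src_w, int(src_w / target_ratio)
    w = max(2, int(base_w / crop_scale)) // 2 * 2
    h = max(2, int(base_h / crop_scale)) // 2 * 2
    cx, cy = pos
    x = min(max(int(cx * src_w - w / 2), 0), src_w - w)
    y = min(max(int(cy * src_h - h / 2), 0), src_h - h)
    return f"crop={w}:{h}:{x}:{y}"

build_crop_filter(1920, 1080, 9 / 16)  # 'crop=606:1080:657:0'
```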
9dd5b6237d refactor(worker): merge the render and TS-packaging tasks into a single pipeline
- Merge the RENDER_SEGMENT_VIDEO and PACKAGE_SEGMENT_TS task types into RENDER_SEGMENT_TS
- Remove the standalone PackageSegmentTsHandler and fold its functionality into RenderSegmentTsHandler
- Update the GPU resource allocation configuration in the task executor
- Update the unit tests for the new task type name
- Keep compatibility markers for the historical task types in the TaskType enum
- Update the task-type references in the constant definitions and default capability configuration
- Add precise video cropping and TS packaging to the render handler
2026-02-11 14:30:24 +08:00
c2ece02ecf feat(audio): add global audio fade-in/fade-out
- Add get_global_audio_fade_in_ms and get_global_audio_fade_out_ms methods to the Task class
- Update the audio mixing logic in prepare_audio.py to support global fade parameters
- Add a _build_global_fade_filters static method that builds the global fade filters
- Update the mix-command construction to apply the global fades after mixing
- Support the global fade filters in the no-BGM and BGM-only cases
- Append the global fades after amix, kept independent of segment-level audio effects
2026-02-11 11:53:34 +08:00
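A global fade appended after amix maps naturally onto FFmpeg's afade filter; the sketch below assumes the mixed track's total duration is known (or probed) so the fade-out can be anchored to the end. The function name mirrors the commit's `_build_global_fade_filters` but the exact expressions are an assumption:

```python
def build_global_fade_filters(fade_in_ms: int, fade_out_ms: int,
                              total_ms: int) -> list:
    """Sketch of global fade filters applied after mixing.

    The fade-in starts at t=0; the fade-out starts total - fade_out,
    so the whole-mix duration must be known before building it.
    """
    filters = []
    if fade_in_ms > 0:
        filters.append(f"afade=t=in:st=0:d={fade_in_ms / 1000:.3f}")
    if fade_out_ms > 0 and total_ms > fade_out_ms:
        start = (total_ms - fade_out_ms) / 1000
        filters.append(
            f"afade=t=out:st={start:.3f}:d={fade_out_ms / 1000:.3f}"
        )
    return filters

build_global_fade_filters(500, 800, 10_000)
# ['afade=t=in:st=0:d=0.500', 'afade=t=out:st=9.200:d=0.800']
```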
952b8f5c01 feat(video): add original-speed (ospeed) effect support
- Add the ospeed effect type to the constants for compatibility with legacy templates
- Implement a get_ospeed_params method in the task domain to parse the speed parameter
- Update the video render handler to combine the speed and ospeed effect calculations
- Update the duration calculation to correctly account for the ospeed change
- Add ospeed parameter validation and boundary handling
- Add full ospeed unit-test coverage for the various scenarios
2026-02-10 12:20:20 +08:00
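The ospeed value is a PTS multiplier (a multiplier of 0.5 halves every timestamp, i.e. plays twice as fast), so the combined duration math from this commit can be sketched as follows; `effective_duration_ms` is a hypothetical name for how the render handler might combine the two effects:

```python
from math import isfinite

def parse_ospeed(params: str) -> float:
    """Parse an ospeed param into a PTS multiplier (>0); 1.0 on bad input."""
    try:
        factor = float(params.strip())
    except (ValueError, AttributeError):
        return 1.0
    if not isfinite(factor) or factor <= 0:
        return 1.0
    return factor

def effective_duration_ms(source_ms: int, speed: float, ospeed: str) -> int:
    """Combine the speed effect with the legacy ospeed PTS multiplier.

    Output duration scales by the ospeed multiplier and divides by the
    playback speed factor.
    """
    return int(source_ms * parse_ospeed(ospeed) / speed)

effective_duration_ms(10_000, 2.0, "0.5")  # 2500
```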
3cb2f8d02a test(handlers): add unit tests for the base handler's parallel transfers
- Test concurrent downloading through the download_files_parallel method
- Verify that parallel upload correctly collects the result URLs
- Test that downloads set the lock-wait-time span attribute
- Verify that the lock wait time is zero for uncached downloads
- Test that uploads set the detailed span attributes
- Add parameter-parsing tests for the render video effects
- Implement storage-service upload metrics
2026-02-07 18:29:54 +08:00
ef4cf549c4 feat(storage): enhance file upload with detailed metrics tracking
- Add an upload_file_with_metrics method to the storage service that returns the upload result plus detailed metrics
- Collect full metrics for uploads, including HTTP attempt count, retry count, status code, etc.
- Integrate OpenTelemetry tracing to record key upload attributes and error marks
- Improve the cache write-back logic and log cache-write failures
- Track metrics for Rclone uploads, including fallbacks to HTTP
- Optimize the local file-size check to avoid redundant filesystem calls
- Add more detailed error logs covering the upload method, status code, and error type
2026-02-07 18:29:20 +08:00
16ea45ad1c perf(cache): optimize the cache download logic and add performance metrics
- Implement cache-lock acquisition with wait-time measurement
- Add a get_or_download_with_metrics method that returns detailed performance metrics
- Record the lock wait time, lock acquisition status, and cache path used on the tracing span
- Optimize the cache-hit path to skip unnecessary lock acquisition
- Add standalone methods for the cache-file readiness check and copy
- Handle the case where the cache lock times out but a ready cache file is still usable
- Add unit tests covering cache locking and metrics reporting
2026-02-07 03:45:52 +08:00
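The timed lock acquisition behind `get_or_download_with_metrics` presumably looks something like the sketch below; the function name and return shape are guesses at the mechanism, not the repo's actual API:

```python
import threading
import time

def acquire_with_wait_metrics(lock, timeout_s: float):
    """Acquire a lock while measuring how long the wait took.

    Returns (acquired, wait_ms) so the caller can record the wait on a
    tracing span, and can still fall back to an already-ready cache
    file when acquisition times out.
    """
    start = time.monotonic()
    acquired = lock.acquire(timeout=timeout_s)
    wait_ms = int((time.monotonic() - start) * 1000)
    return acquired, wait_ms

lock = threading.Lock()
ok, wait_ms = acquire_with_wait_metrics(lock, 1.0)  # uncontended: near-zero wait
```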
ad4a9cc869 feat(video): add video zoom effect support
- Add the zoom effect type and its parameter parsing to EffectConfig
- Implement a get_zoom_params method that returns the zoom effect parameters
- Update the docstrings with zoom effect usage examples
- Update the render logic to handle the zoom effect in filter_complex
- Implement the video-filter construction for the zoom effect
- Unify the stacking logic for the cameraShot and zoom effects
2026-02-07 01:25:20 +08:00
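A zoom effect like this is often built on FFmpeg's zoompan filter. The repo's actual filter construction is not shown, so the mapping below from the parsed (start, scale, duration) tuple is only a sketch; it assumes an FFmpeg build where zoompan exposes the `it` (input time) expression variable, and the function name is hypothetical:

```python
def build_zoom_filter(start_sec: float, scale: float, duration_sec: float,
                      fps: int = 25, width: int = 1080,
                      height: int = 1920) -> str:
    """Sketch: map parsed zoom params onto a zoompan expression.

    Zoom ramps linearly from 1.0 to `scale` over `duration_sec`,
    starting at `start_sec`, keeping the frame centered.
    """
    ramp = f"1+({scale}-1)*min(max(it-{start_sec},0)/{duration_sec},1)"
    return (
        f"zoompan=z='{ramp}'"
        f":x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)'"
        f":d=1:s={width}x{height}:fps={fps}"
    )

build_zoom_filter(1.5, 1.2, 2.0)  # zoompan expression string
```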
88aa3adca1 feat(base): add per-task concurrent file transfers
- Introduce ThreadPoolExecutor for parallel downloads and uploads
- Add download_files_parallel and upload_files_parallel methods
- Add TASK_DOWNLOAD_CONCURRENCY and TASK_UPLOAD_CONCURRENCY configuration options
- Implement environment-variable parsing and validation for the concurrency settings
- Apply parallel downloads in multiple handlers to speed up file fetching
- Update the .env.example configuration template
- Remove the FFmpeg command log length limit
2026-02-07 00:38:43 +08:00
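Stripped of the tracing and validation detail visible in the diff further down, the fan-out/fan-in pattern behind download_files_parallel and upload_files_parallel reduces to the following minimal sketch (generic names; the real methods also propagate trace context into each worker thread):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def transfer_parallel(jobs, worker, max_workers=4):
    """Run transfer jobs concurrently and collect results by key."""
    if not jobs:
        return {}
    results = {}
    with ThreadPoolExecutor(max_workers=min(max_workers, len(jobs))) as pool:
        futures = {pool.submit(worker, job): job for job in jobs}
        for fut in as_completed(futures):
            job = futures[fut]
            try:
                results[job["key"]] = {"success": bool(fut.result())}
            except Exception:
                # A failed transfer is recorded, never re-raised, so one
                # bad file cannot abort the sibling transfers
                results[job["key"]] = {"success": False}
    return results

res = transfer_parallel([{"key": "a"}, {"key": "b"}], worker=lambda job: True)
# {'a': {'success': True}, 'b': {'success': True}}
```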
d955def63c feat(tracing): improve logging and tracing for file downloads and uploads
- Add task context to the log prefixes so individual tasks are traceable
- Record the file source URL and upload URL as span attributes
- Lower the storage service's info-level logs to debug to reduce noisy output
- Add debug logging of the file access addresses
- Adjust the root log level so DEBUG logs reach the handlers
- Fix the error-log format used after retries are exhausted
2026-02-07 00:26:01 +08:00
9d16d3c6af feat(gpu): add QSV hardware acceleration support
- Implement the QSV device initialization logic with Intel integrated-GPU support
- Separate the QSV and CUDA device initialization flows
- Add QSV device validation and configuration handling
- Update the device detection logic to support the different hardware acceleration types
- Implement QSV device-name formatting and availability flags
2026-02-07 00:25:43 +08:00
9b373dea34 feat(tracing): integrate OpenTelemetry distributed tracing
- Add tracing for file downloads, uploads, and FFmpeg execution in base.py
- Add API-request tracing and error marking in api_client.py
- Add lease-renewal tracing in lease_service.py
- Integrate end-to-end task-execution tracing in task_executor.py
- Add a util/tracing.py module that provides unified trace-context management
- Add the OTEL configuration options to .env.example
- Initialize and shut down tracing in index.py
2026-02-07 00:11:01 +08:00
c9a6133be9 fix(logger): fix the log directory path after PyInstaller packaging
- Check sys.frozen to distinguish packaged builds from the development environment
- In packaged builds, use the directory containing sys.executable as the log directory
- In development, keep using the directory of the current file as the log directory
- Prevents log files from being lost along with the temporary extraction directory after packaging
2026-02-06 14:02:14 +08:00
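PyInstaller one-file builds unpack to a temporary directory that is deleted on exit, so a `__file__`-relative log path silently loses the logs. The fix described above can be sketched like this (the function name and `logs` subdirectory are illustrative; the logger module would call it with its own `__file__`):

```python
import os
import sys

def resolve_log_dir(module_file: str) -> str:
    """Pick a log directory that survives PyInstaller one-file builds.

    sys.frozen is set by PyInstaller at runtime; in that case the
    directory of the executable itself is stable across runs, while
    module paths point into a throwaway extraction directory.
    """
    if getattr(sys, 'frozen', False):
        base = os.path.dirname(sys.executable)
    else:
        base = os.path.dirname(os.path.abspath(module_file))
    return os.path.join(base, 'logs')
```

In the logger module this would be used as `resolve_log_dir(__file__)`.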
25 changed files with 2486 additions and 713 deletions

View File

@@ -30,6 +30,8 @@ TEMP_DIR=tmp/
#FFMPEG_TIMEOUT=3600 # FFmpeg 执行超时(秒)
#DOWNLOAD_TIMEOUT=300 # 下载超时(秒)
#UPLOAD_TIMEOUT=600 # 上传超时(秒)
#TASK_DOWNLOAD_CONCURRENCY=4 # 单任务内并行下载数(1-16)
#TASK_UPLOAD_CONCURRENCY=2 # 单任务内并行上传数(1-16)
# ===================
# 硬件加速与多显卡
@@ -63,3 +65,9 @@ HW_ACCEL=none
#UPLOAD_METHOD=rclone
#RCLONE_CONFIG_FILE= # rclone 配置文件路径
#RCLONE_REPLACE_MAP="https://oss.example.com|alioss://bucket"
# ===================
# OTel 链路追踪
# ===================
# 是否启用 OTel 追踪(默认 true)
#OTEL_ENABLED=true

View File

@@ -10,10 +10,9 @@ SOFTWARE_VERSION = '2.0.0'
# 支持的任务类型
TASK_TYPES = (
'RENDER_SEGMENT_VIDEO',
'RENDER_SEGMENT_TS',
'COMPOSE_TRANSITION',
'PREPARE_JOB_AUDIO',
'PACKAGE_SEGMENT_TS',
'FINALIZE_MP4',
)
@@ -39,6 +38,7 @@ EFFECT_TYPES = (
'cameraShot', # 相机定格效果:在指定时间点冻结画面
'zoom', # 缩放效果(预留)
'blur', # 模糊效果(预留)
'ospeed', # 原始变速效果(兼容旧模板)
)
# 硬件加速类型

View File

@@ -17,9 +17,8 @@ logger = logging.getLogger(__name__)
# 默认支持的任务类型
DEFAULT_CAPABILITIES = [
"RENDER_SEGMENT_VIDEO",
"RENDER_SEGMENT_TS",
"PREPARE_JOB_AUDIO",
"PACKAGE_SEGMENT_TS",
"FINALIZE_MP4"
]

View File

@@ -5,12 +5,13 @@
定义任务类型、任务实体、渲染规格、输出规格等数据结构。
"""
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List
from datetime import datetime
from urllib.parse import urlparse, unquote
import os
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from math import isfinite
from typing import Dict, Any, Optional, List
from urllib.parse import urlparse, unquote
# 支持的图片扩展名
@@ -19,12 +20,15 @@ IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.webp', '.bmp', '.gif'}
class TaskType(Enum):
"""任务类型枚举"""
RENDER_SEGMENT_VIDEO = "RENDER_SEGMENT_VIDEO" # 渲染视频片段
RENDER_SEGMENT_TS = "RENDER_SEGMENT_TS" # 渲染+封装 TS(合并原 RENDER_SEGMENT_VIDEO + PACKAGE_SEGMENT_TS)
COMPOSE_TRANSITION = "COMPOSE_TRANSITION" # 合成转场效果
PREPARE_JOB_AUDIO = "PREPARE_JOB_AUDIO" # 生成全局音频
PACKAGE_SEGMENT_TS = "PACKAGE_SEGMENT_TS" # 封装 TS 分片
FINALIZE_MP4 = "FINALIZE_MP4" # 产出最终 MP4
# Deprecated: 历史任务类型,保留枚举值供兼容
RENDER_SEGMENT_VIDEO = "RENDER_SEGMENT_VIDEO"
PACKAGE_SEGMENT_TS = "PACKAGE_SEGMENT_TS"
# 支持的转场类型(对应 FFmpeg xfade 参数)
TRANSITION_TYPES = {
@@ -45,6 +49,7 @@ EFFECT_TYPES = {
'cameraShot', # 相机定格效果
'zoom', # 缩放效果(预留)
'blur', # 模糊效果(预留)
'ospeed', # 原始变速效果(兼容旧模板)
}
@@ -95,7 +100,9 @@ class Effect:
特效配置
格式:type:params
例如:cameraShot:3,1 表示在第3秒定格1秒
例如:
- cameraShot:3,1 表示在第3秒定格1秒
- zoom:1.5,1.2,2 表示从第1.5秒开始放大 1.2 倍并持续 2 秒
"""
effect_type: str # 效果类型
params: str = "" # 参数字符串
@@ -122,7 +129,7 @@ class Effect:
解析效果字符串
格式:effect1|effect2|effect3
例如:cameraShot:3,1|blur:5
例如:cameraShot:3,1|zoom:1.5,1.2,2
"""
if not effects_str:
return []
@@ -152,6 +159,54 @@ class Effect:
except ValueError:
return (3, 1)
def get_zoom_params(self) -> tuple:
"""
获取 zoom 效果参数
Returns:
(start_sec, scale_factor, duration_sec): 起始时间、放大倍数、持续时长(秒)
"""
if self.effect_type != 'zoom':
return (0.0, 1.2, 1.0)
default_start_sec = 0.0
default_scale_factor = 1.2
default_duration_sec = 1.0
if not self.params:
return (default_start_sec, default_scale_factor, default_duration_sec)
parts = [part.strip() for part in self.params.split(',')]
try:
start_sec = float(parts[0]) if len(parts) >= 1 and parts[0] else default_start_sec
scale_factor = float(parts[1]) if len(parts) >= 2 and parts[1] else default_scale_factor
duration_sec = float(parts[2]) if len(parts) >= 3 and parts[2] else default_duration_sec
except ValueError:
return (default_start_sec, default_scale_factor, default_duration_sec)
if not isfinite(start_sec) or start_sec < 0:
start_sec = default_start_sec
if not isfinite(scale_factor) or scale_factor <= 1.0:
scale_factor = default_scale_factor
if not isfinite(duration_sec) or duration_sec <= 0:
duration_sec = default_duration_sec
return (start_sec, scale_factor, duration_sec)
def get_ospeed_params(self) -> float:
"""获取 ospeed 效果参数,返回 PTS 乘数(>0),无效时返回 1.0"""
if self.effect_type != 'ospeed':
return 1.0
if not self.params:
return 1.0
try:
factor = float(self.params.strip())
except ValueError:
return 1.0
if not isfinite(factor) or factor <= 0:
return 1.0
return factor
@dataclass
class RenderSpec:
@@ -161,14 +216,13 @@ class RenderSpec:
用于 RENDER_SEGMENT_VIDEO 任务,定义视频渲染参数。
"""
crop_enable: bool = False
crop_size: Optional[str] = None
crop_scale: float = 1.0
speed: str = "1.0"
lut_url: Optional[str] = None
overlay_url: Optional[str] = None
effects: Optional[str] = None
zoom_cut: bool = False
video_crop: Optional[str] = None
face_pos: Optional[str] = None
crop_pos: Optional[str] = None
transitions: Optional[str] = None
# 转场配置(PRD v2 新增)
transition_in: Optional[TransitionConfig] = None # 入场转场
@@ -179,16 +233,24 @@ class RenderSpec:
"""从字典创建 RenderSpec"""
if not data:
return cls()
# 安全解析 cropScale:接受浮点数或字符串浮点数,非法值回退到 1.0
try:
crop_scale = float(data.get('cropScale', 1.0))
if crop_scale <= 0 or not isfinite(crop_scale):
crop_scale = 1.0
except (ValueError, TypeError):
crop_scale = 1.0
return cls(
crop_enable=data.get('cropEnable', False),
crop_size=data.get('cropSize'),
crop_scale=crop_scale,
speed=str(data.get('speed', '1.0')),
lut_url=data.get('lutUrl'),
overlay_url=data.get('overlayUrl'),
effects=data.get('effects'),
zoom_cut=data.get('zoomCut', False),
video_crop=data.get('videoCrop'),
face_pos=data.get('facePos'),
crop_pos=data.get('cropPos'),
transitions=data.get('transitions'),
transition_in=TransitionConfig.from_dict(data.get('transitionIn')),
transition_out=TransitionConfig.from_dict(data.get('transitionOut'))
@@ -489,6 +551,14 @@ class Task:
"""获取片段列表"""
return self.payload.get('segments', [])
def get_global_audio_fade_in_ms(self) -> int:
"""获取全局音频淡入时长(毫秒),0 表示不淡入"""
return int(self.payload.get('globalAudioFadeInMs', 0))
def get_global_audio_fade_out_ms(self) -> int:
"""获取全局音频淡出时长(毫秒),0 表示不淡出"""
return int(self.payload.get('globalAudioFadeOutMs', 0))
def get_audio_profile(self) -> AudioProfile:
"""获取音频配置"""
return AudioProfile.from_dict(self.payload.get('audioProfile'))

View File

@@ -6,17 +6,15 @@
"""
from handlers.base import BaseHandler
from handlers.render_video import RenderSegmentVideoHandler
from handlers.render_video import RenderSegmentTsHandler
from handlers.compose_transition import ComposeTransitionHandler
from handlers.prepare_audio import PrepareJobAudioHandler
from handlers.package_ts import PackageSegmentTsHandler
from handlers.finalize_mp4 import FinalizeMp4Handler
__all__ = [
'BaseHandler',
'RenderSegmentVideoHandler',
'RenderSegmentTsHandler',
'ComposeTransitionHandler',
'PrepareJobAudioHandler',
'PackageSegmentTsHandler',
'FinalizeMp4Handler',
]

View File

@@ -12,15 +12,25 @@ import shutil
import tempfile
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from abc import ABC
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
from opentelemetry.trace import SpanKind
from core.handler import TaskHandler
from domain.task import Task
from domain.result import TaskResult, ErrorCode
from domain.config import WorkerConfig
from services import storage
from services.cache import MaterialCache
from util.tracing import (
bind_trace_context,
capture_otel_context,
get_current_task_context,
mark_span_error,
start_span,
)
from constant import (
HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA,
VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA
@@ -32,49 +42,71 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE, maxrate: Optional[int] = None) -> List[str]:
"""
根据硬件加速配置获取视频编码参数
Args:
hw_accel: 硬件加速类型 (none, qsv, cuda)
maxrate: 最大码率(bps),用于限制 CRF/CQ 模式的峰值码率。
例如 4000000 表示 4Mbps。
bufsize 设为 1x maxrate(1 秒窗口),适合短视频(<10s)的严格码率控制。
Returns:
FFmpeg 视频编码参数列表
"""
has_maxrate = maxrate is not None and maxrate > 0
if hw_accel == HW_ACCEL_QSV:
params = VIDEO_ENCODE_PARAMS_QSV
return [
args = [
'-c:v', params['codec'],
'-preset', params['preset'],
'-profile:v', params['profile'],
'-level', params['level'],
'-global_quality', params['global_quality'],
'-look_ahead', params['look_ahead'],
# 禁用 B 帧,避免独立 TS 分片在 HLS 边界出现 PTS/DTS 回退
'-bf', '0',
]
elif hw_accel == HW_ACCEL_CUDA:
params = VIDEO_ENCODE_PARAMS_CUDA
return [
args = [
'-c:v', params['codec'],
'-preset', params['preset'],
'-profile:v', params['profile'],
'-level', params['level'],
'-rc', params['rc'],
'-cq', params['cq'],
'-b:v', '0', # 配合 vbr 模式使用 cq
# 有 maxrate 时设置 -b:v 为 maxrate,让 NVENC VBV 模型真正生效;
# 无 maxrate 时保持 -b:v 0(纯 CQ 质量模式)
'-b:v', f'{maxrate // 1000}k' if has_maxrate else '0',
# 禁用 B 帧,避免独立 TS 分片在 HLS 边界出现 PTS/DTS 回退
'-bf', '0',
]
else:
# 软件编码(默认)
params = VIDEO_ENCODE_PARAMS
return [
args = [
'-c:v', params['codec'],
'-preset', params['preset'],
'-profile:v', params['profile'],
'-level', params['level'],
'-crf', params['crf'],
'-pix_fmt', params['pix_fmt'],
# 禁用 B 帧,避免独立 TS 分片在 HLS 边界出现 PTS/DTS 回退
'-bf', '0',
]
# CRF/CQ + maxrate 上限:保留质量控制的同时限制峰值码率
# bufsize = 1x maxrate(1 秒窗口),对短视频 VBV 约束更紧,避免缓冲区过大导致码率失控
if has_maxrate:
maxrate_k = f'{maxrate // 1000}k'
bufsize_k = maxrate_k # 1x maxrate,短视频下收敛更快
args.extend(['-maxrate', maxrate_k, '-bufsize', bufsize_k])
return args
def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE, device_index: Optional[int] = None) -> List[str]:
"""
@@ -271,6 +303,9 @@ class BaseHandler(TaskHandler, ABC):
# 线程本地存储:用于存储当前线程的 GPU 设备索引
_thread_local = threading.local()
DEFAULT_TASK_DOWNLOAD_CONCURRENCY = 4
DEFAULT_TASK_UPLOAD_CONCURRENCY = 2
MAX_TASK_TRANSFER_CONCURRENCY = 16
def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
"""
@@ -287,6 +322,251 @@ class BaseHandler(TaskHandler, ABC):
enabled=config.cache_enabled,
max_size_gb=config.cache_max_size_gb
)
self.task_download_concurrency = self._resolve_task_transfer_concurrency(
"TASK_DOWNLOAD_CONCURRENCY",
self.DEFAULT_TASK_DOWNLOAD_CONCURRENCY
)
self.task_upload_concurrency = self._resolve_task_transfer_concurrency(
"TASK_UPLOAD_CONCURRENCY",
self.DEFAULT_TASK_UPLOAD_CONCURRENCY
)
def _resolve_task_transfer_concurrency(self, env_name: str, default_value: int) -> int:
"""读取并规范化任务内传输并发数配置。"""
raw_value = os.getenv(env_name)
if raw_value is None or not raw_value.strip():
return default_value
try:
parsed_value = int(raw_value.strip())
except ValueError:
logger.warning(
f"Invalid {env_name} value '{raw_value}', using default {default_value}"
)
return default_value
if parsed_value < 1:
logger.warning(f"{env_name} must be >= 1, forcing to 1")
return 1
if parsed_value > self.MAX_TASK_TRANSFER_CONCURRENCY:
logger.warning(
f"{env_name}={parsed_value} exceeds limit {self.MAX_TASK_TRANSFER_CONCURRENCY}, "
f"using {self.MAX_TASK_TRANSFER_CONCURRENCY}"
)
return self.MAX_TASK_TRANSFER_CONCURRENCY
return parsed_value
def download_files_parallel(
self,
download_jobs: List[Dict[str, Any]],
timeout: Optional[int] = None
) -> Dict[str, Dict[str, Any]]:
"""
单任务内并行下载多个文件。
Args:
download_jobs: 下载任务列表。每项字段:
- key: 唯一标识
- url: 下载地址
- dest: 目标文件路径
- required: 是否关键文件(可选,默认 True)
- use_cache: 是否使用缓存(可选,默认 True)
timeout: 单文件下载超时(秒)
Returns:
key -> 结果字典:
- success: 是否成功
- url: 原始 URL
- dest: 目标文件路径
- required: 是否关键文件
"""
if not download_jobs:
return {}
normalized_jobs: List[Dict[str, Any]] = []
seen_keys = set()
for download_job in download_jobs:
job_key = str(download_job.get("key", "")).strip()
job_url = str(download_job.get("url", "")).strip()
job_dest = str(download_job.get("dest", "")).strip()
if not job_key or not job_url or not job_dest:
raise ValueError("Each download job must include non-empty key/url/dest")
if job_key in seen_keys:
raise ValueError(f"Duplicate download job key: {job_key}")
seen_keys.add(job_key)
normalized_jobs.append({
"key": job_key,
"url": job_url,
"dest": job_dest,
"required": bool(download_job.get("required", True)),
"use_cache": bool(download_job.get("use_cache", True)),
})
if timeout is None:
timeout = self.config.download_timeout
parent_otel_context = capture_otel_context()
task_context = get_current_task_context()
task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
results: Dict[str, Dict[str, Any]] = {}
def _run_download_job(download_job: Dict[str, Any]) -> bool:
with bind_trace_context(parent_otel_context, task_context):
return self.download_file(
download_job["url"],
download_job["dest"],
timeout=timeout,
use_cache=download_job["use_cache"],
)
max_workers = min(self.task_download_concurrency, len(normalized_jobs))
if max_workers <= 1:
for download_job in normalized_jobs:
is_success = _run_download_job(download_job)
results[download_job["key"]] = {
"success": is_success,
"url": download_job["url"],
"dest": download_job["dest"],
"required": download_job["required"],
}
else:
with ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix="TaskDownload",
) as executor:
future_to_job = {
executor.submit(_run_download_job, download_job): download_job
for download_job in normalized_jobs
}
for completed_future in as_completed(future_to_job):
download_job = future_to_job[completed_future]
is_success = False
try:
is_success = bool(completed_future.result())
except Exception as exc:
logger.error(
f"{task_prefix}Parallel download raised exception for "
f"key={download_job['key']}: {exc}"
)
results[download_job["key"]] = {
"success": is_success,
"url": download_job["url"],
"dest": download_job["dest"],
"required": download_job["required"],
}
success_count = sum(1 for item in results.values() if item["success"])
logger.debug(
f"{task_prefix}Parallel download completed: {success_count}/{len(normalized_jobs)}"
)
return results
def upload_files_parallel(
self,
upload_jobs: List[Dict[str, Any]]
) -> Dict[str, Dict[str, Any]]:
"""
单任务内并行上传多个文件。
Args:
upload_jobs: 上传任务列表。每项字段:
- key: 唯一标识
- task_id: 任务 ID
- file_type: 文件类型(video/audio/ts/mp4)
- file_path: 本地文件路径
- file_name: 文件名(可选)
- required: 是否关键文件(可选,默认 True)
Returns:
key -> 结果字典:
- success: 是否成功
- url: 上传后的访问 URL(失败为 None)
- file_path: 本地文件路径
- required: 是否关键文件
"""
if not upload_jobs:
return {}
normalized_jobs: List[Dict[str, Any]] = []
seen_keys = set()
for upload_job in upload_jobs:
job_key = str(upload_job.get("key", "")).strip()
task_id = str(upload_job.get("task_id", "")).strip()
file_type = str(upload_job.get("file_type", "")).strip()
file_path = str(upload_job.get("file_path", "")).strip()
if not job_key or not task_id or not file_type or not file_path:
raise ValueError(
"Each upload job must include non-empty key/task_id/file_type/file_path"
)
if job_key in seen_keys:
raise ValueError(f"Duplicate upload job key: {job_key}")
seen_keys.add(job_key)
normalized_jobs.append({
"key": job_key,
"task_id": task_id,
"file_type": file_type,
"file_path": file_path,
"file_name": upload_job.get("file_name"),
"required": bool(upload_job.get("required", True)),
})
parent_otel_context = capture_otel_context()
task_context = get_current_task_context()
task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
results: Dict[str, Dict[str, Any]] = {}
def _run_upload_job(upload_job: Dict[str, Any]) -> Optional[str]:
with bind_trace_context(parent_otel_context, task_context):
return self.upload_file(
upload_job["task_id"],
upload_job["file_type"],
upload_job["file_path"],
upload_job.get("file_name")
)
max_workers = min(self.task_upload_concurrency, len(normalized_jobs))
if max_workers <= 1:
for upload_job in normalized_jobs:
result_url = _run_upload_job(upload_job)
results[upload_job["key"]] = {
"success": bool(result_url),
"url": result_url,
"file_path": upload_job["file_path"],
"required": upload_job["required"],
}
else:
with ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix="TaskUpload",
) as executor:
future_to_job = {
executor.submit(_run_upload_job, upload_job): upload_job
for upload_job in normalized_jobs
}
for completed_future in as_completed(future_to_job):
upload_job = future_to_job[completed_future]
result_url = None
try:
result_url = completed_future.result()
except Exception as exc:
logger.error(
f"{task_prefix}Parallel upload raised exception for "
f"key={upload_job['key']}: {exc}"
)
results[upload_job["key"]] = {
"success": bool(result_url),
"url": result_url,
"file_path": upload_job["file_path"],
"required": upload_job["required"],
}
success_count = sum(1 for item in results.values() if item["success"])
logger.debug(
f"{task_prefix}Parallel upload completed: {success_count}/{len(normalized_jobs)}"
)
return results
# ========== GPU 设备管理 ==========
@@ -321,14 +601,17 @@ class BaseHandler(TaskHandler, ABC):
# ========== FFmpeg 参数生成 ==========
def get_video_encode_args(self) -> List[str]:
def get_video_encode_args(self, maxrate: Optional[int] = None) -> List[str]:
"""
获取当前配置的视频编码参数
Args:
maxrate: 最大码率(bps),用于限制峰值码率
Returns:
FFmpeg 视频编码参数列表
"""
return get_video_encode_args(self.config.hw_accel)
return get_video_encode_args(self.config.hw_accel, maxrate=maxrate)
def get_hwaccel_decode_args(self) -> List[str]:
"""
@@ -410,21 +693,52 @@ class BaseHandler(TaskHandler, ABC):
if timeout is None:
timeout = self.config.download_timeout
try:
if use_cache:
# 使用缓存下载
result = self.material_cache.get_or_download(url, dest, timeout=timeout)
else:
# 直接下载(不走缓存)
result = storage.download_file(url, dest, timeout=timeout)
task_context = get_current_task_context()
task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
logger.debug(f"{task_prefix}Downloading from: {url} -> {dest}")
if result:
file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
logger.debug(f"Downloaded: {url} -> {dest} ({file_size} bytes)")
return result
except Exception as e:
logger.error(f"Download failed: {url} -> {e}")
return False
with start_span(
"render.task.file.download",
kind=SpanKind.CLIENT,
attributes={
"render.file.source_url": url,
"render.file.destination": dest,
"render.file.use_cache": use_cache,
},
) as span:
try:
lock_wait_ms = 0
lock_acquired = False
cache_path_used = "unknown"
if use_cache:
result, cache_metrics = self.material_cache.get_or_download_with_metrics(
url,
dest,
timeout=timeout
)
lock_wait_ms = int(cache_metrics.get("lock_wait_ms", 0))
lock_acquired = bool(cache_metrics.get("lock_acquired", False))
cache_path_used = str(cache_metrics.get("cache_path_used", "unknown"))
else:
result = storage.download_file(url, dest, timeout=timeout)
cache_path_used = "direct"
if span is not None:
span.set_attribute("render.file.lock_wait_ms", lock_wait_ms)
span.set_attribute("render.file.lock_acquired", lock_acquired)
span.set_attribute("render.file.cache_path_used", cache_path_used)
if result:
file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
logger.debug(f"{task_prefix}Downloaded: {url} -> {dest} ({file_size} bytes)")
if span is not None:
span.set_attribute("render.file.size_bytes", file_size)
return result
except Exception as e:
mark_span_error(span, str(e), ErrorCode.E_INPUT_UNAVAILABLE.value)
logger.error(f"{task_prefix}Download failed: {e}")
logger.debug(f"{task_prefix}Download source address: {url}")
return False
def upload_file(
self,
@@ -445,37 +759,108 @@ class BaseHandler(TaskHandler, ABC):
Returns:
访问 URL,失败返回 None
"""
# 获取上传 URL
upload_info = self.api_client.get_upload_url(task_id, file_type, file_name)
if not upload_info:
logger.error(f"[task:{task_id}] Failed to get upload URL")
return None
upload_url = upload_info.get('uploadUrl')
access_url = upload_info.get('accessUrl')
if not upload_url:
logger.error(f"[task:{task_id}] Invalid upload URL response")
return None
# 上传文件
try:
result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout)
if result:
file_size = os.path.getsize(file_path)
logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
# 将上传成功的文件加入缓存
if access_url:
self.material_cache.add_to_cache(access_url, file_path)
return access_url
else:
logger.error(f"[task:{task_id}] Upload failed: {file_path}")
local_file_exists = os.path.exists(file_path)
local_file_size = os.path.getsize(file_path) if local_file_exists else 0
with start_span(
"render.task.file.upload",
kind=SpanKind.CLIENT,
attributes={
"render.file.type": file_type,
"render.file.path": file_path,
"render.file.timeout_seconds": self.config.upload_timeout,
"render.file.local_exists": local_file_exists,
"render.file.local_size_bytes": local_file_size,
},
) as span:
upload_info = self.api_client.get_upload_url(task_id, file_type, file_name)
if not upload_info:
mark_span_error(span, "get upload url failed", ErrorCode.E_UPLOAD_FAILED.value)
logger.error(f"[task:{task_id}] Failed to get upload URL")
return None
upload_url = upload_info.get('uploadUrl')
access_url = upload_info.get('accessUrl')
if not upload_url:
mark_span_error(span, "invalid upload url response", ErrorCode.E_UPLOAD_FAILED.value)
logger.error(f"[task:{task_id}] Invalid upload URL response")
return None
logger.debug(
f"[task:{task_id}] Upload target address: uploadUrl={upload_url}, accessUrl={access_url}"
)
if span is not None:
span.set_attribute("render.file.upload_url", upload_url)
if access_url:
span.set_attribute("render.file.access_url", access_url)
try:
result, upload_metrics = storage.upload_file_with_metrics(
upload_url,
file_path,
timeout=self.config.upload_timeout,
)
upload_method = str(upload_metrics.get("upload_method", "unknown"))
http_attempts = int(upload_metrics.get("http_attempts", 0))
http_retry_count = int(upload_metrics.get("http_retry_count", 0))
http_status_code = int(upload_metrics.get("http_status_code", 0))
http_replace_applied = bool(upload_metrics.get("http_replace_applied", False))
content_type = str(upload_metrics.get("content_type", ""))
error_type = str(upload_metrics.get("error_type", ""))
rclone_attempted = bool(upload_metrics.get("rclone_attempted", False))
rclone_succeeded = bool(upload_metrics.get("rclone_succeeded", False))
rclone_fallback_http = bool(upload_metrics.get("rclone_fallback_http", False))
if span is not None:
span.set_attribute("render.file.upload_success", bool(result))
span.set_attribute("render.file.upload_method", upload_method)
span.set_attribute("render.file.http_attempts", http_attempts)
span.set_attribute("render.file.http_retry_count", http_retry_count)
span.set_attribute("render.file.http_replace_applied", http_replace_applied)
span.set_attribute("render.file.rclone_attempted", rclone_attempted)
span.set_attribute("render.file.rclone_succeeded", rclone_succeeded)
span.set_attribute("render.file.rclone_fallback_http", rclone_fallback_http)
if content_type:
span.set_attribute("render.file.content_type", content_type)
if http_status_code > 0:
span.set_attribute("render.file.http_status_code", http_status_code)
if error_type:
span.set_attribute("render.file.error_type", error_type)
if result:
file_size = local_file_size if local_file_size > 0 else os.path.getsize(file_path)
logger.info(
f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)"
)
logger.debug(f"[task:{task_id}] Uploaded access address: {access_url or upload_url}")
if span is not None:
span.set_attribute("render.file.size_bytes", file_size)
cache_write_back = "skipped"
if access_url:
cache_added = self.material_cache.add_to_cache(access_url, file_path)
cache_write_back = "success" if cache_added else "failed"
if not cache_added:
logger.warning(f"[task:{task_id}] Upload cache write back failed: {file_path}")
if span is not None:
span.set_attribute("render.file.cache_write_back", cache_write_back)
return access_url
mark_span_error(
span,
f"upload failed(method={upload_method}, status={http_status_code}, retries={http_retry_count}, error={error_type})",
ErrorCode.E_UPLOAD_FAILED.value
)
logger.error(
f"[task:{task_id}] Upload failed: {file_path}, method={upload_method}, "
f"http_status={http_status_code}, retries={http_retry_count}, error_type={error_type}"
)
return None
except Exception as e:
mark_span_error(span, str(e), ErrorCode.E_UPLOAD_FAILED.value)
logger.error(f"[task:{task_id}] Upload error: {e}")
return None
except Exception as e:
logger.error(f"[task:{task_id}] Upload error: {e}")
return None
def run_ffmpeg(
self,
@@ -501,35 +886,46 @@ class BaseHandler(TaskHandler, ABC):
if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run:
cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL]
# 日志记录命令(限制长度)
cmd_str = ' '.join(cmd_to_run)
if len(cmd_str) > 500:
cmd_str = cmd_str[:500] + '...'
logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")
try:
run_args = subprocess_args(False)
run_args['stdout'] = subprocess.DEVNULL
run_args['stderr'] = subprocess.PIPE
result = subprocess.run(
cmd_to_run,
timeout=timeout,
**run_args
)
with start_span(
"render.task.ffmpeg.run",
attributes={
"render.ffmpeg.timeout_seconds": timeout,
"render.ffmpeg.command": cmd_str,
},
) as span:
try:
run_args = subprocess_args(False)
run_args['stdout'] = subprocess.DEVNULL
run_args['stderr'] = subprocess.PIPE
result = subprocess.run(
cmd_to_run,
timeout=timeout,
**run_args
)
if result.returncode != 0:
stderr = (result.stderr or b'').decode('utf-8', errors='replace')[:1000]
logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
if span is not None:
span.set_attribute("render.ffmpeg.return_code", result.returncode)
if result.returncode != 0:
stderr = (result.stderr or b'').decode('utf-8', errors='replace')[:1000]
logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
mark_span_error(span, stderr or "ffmpeg failed", ErrorCode.E_FFMPEG_FAILED.value)
return False
return True
except subprocess.TimeoutExpired:
logger.error(f"[task:{task_id}] FFmpeg timeout after {timeout}s")
mark_span_error(span, f"timeout after {timeout}s", ErrorCode.E_TIMEOUT.value)
return False
except Exception as e:
logger.error(f"[task:{task_id}] FFmpeg error: {e}")
mark_span_error(span, str(e), ErrorCode.E_FFMPEG_FAILED.value)
return False
return True
except subprocess.TimeoutExpired:
logger.error(f"[task:{task_id}] FFmpeg timeout after {timeout}s")
return False
except Exception as e:
logger.error(f"[task:{task_id}] FFmpeg error: {e}")
return False
def probe_duration(self, file_path: str) -> Optional[float]:
"""

View File

@@ -91,23 +91,37 @@ class ComposeTransitionHandler(BaseHandler):
f"overlap_tail={overlap_tail_ms}ms, overlap_head={overlap_head_ms}ms"
)
# 1. 下载前一个片段视频
# 1. 并行下载前片段视频
prev_video_file = os.path.join(work_dir, 'prev_segment.mp4')
if not self.download_file(prev_segment['videoUrl'], prev_video_file):
next_video_file = os.path.join(work_dir, 'next_segment.mp4')
download_results = self.download_files_parallel([
{
'key': 'prev_video',
'url': prev_segment['videoUrl'],
'dest': prev_video_file,
'required': True
},
{
'key': 'next_video',
'url': next_segment['videoUrl'],
'dest': next_video_file,
'required': True
}
])
prev_result = download_results.get('prev_video')
if not prev_result or not prev_result['success']:
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download prev segment video: {prev_segment['videoUrl']}"
)
# 2. 下载后一个片段视频
next_video_file = os.path.join(work_dir, 'next_segment.mp4')
if not self.download_file(next_segment['videoUrl'], next_video_file):
next_result = download_results.get('next_video')
if not next_result or not next_result['success']:
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download next segment video: {next_segment['videoUrl']}"
)
# 3. 获取前一个片段的实际时长
# 2. 获取前一个片段的实际时长
prev_duration = self.probe_duration(prev_video_file)
if not prev_duration:
return TaskResult.fail(
@@ -115,7 +129,7 @@ class ComposeTransitionHandler(BaseHandler):
"Failed to probe prev segment duration"
)
# 3. Build the transition composition command
output_file = os.path.join(work_dir, 'transition.mp4')
cmd = self._build_command(
prev_video_file=prev_video_file,
@@ -128,25 +142,25 @@ class ComposeTransitionHandler(BaseHandler):
output_spec=output_spec
)
# 4. Run FFmpeg
if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"FFmpeg transition composition failed"
)
# 5. Validate the output file
if not self.ensure_file_exists(output_file, min_size=1024):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Transition output file is missing or too small"
)
# 6. Probe the actual duration
actual_duration = self.probe_duration(output_file)
actual_duration_ms = int(actual_duration * 1000) if actual_duration else transition_duration_ms
# 7. Upload the artifact
transition_video_url = self.upload_file(task.task_id, 'video', output_file)
if not transition_video_url:
return TaskResult.fail(
@@ -236,7 +250,7 @@ class ComposeTransitionHandler(BaseHandler):
]
# 编码参数(根据硬件加速配置动态获取)
cmd.extend(self.get_video_encode_args(maxrate=output_spec.bitrate))
# 帧率
fps = output_spec.fps
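The parallel download pattern used by this handler can be sketched with `concurrent.futures`. This is an assumed shape for the `download_files_parallel` helper (its implementation is not shown in this diff); the single-file primitive is injected here as `fetch` so the sketch stays testable:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_files_parallel(jobs, fetch, max_workers=4):
    """Run each job ({'key', 'url', 'dest', 'required'}) concurrently.

    Returns {key: {'success': bool, 'dest': str}}. `fetch(url, dest)` stands
    in for the handler's download_file; a sketch, not the production helper.
    """
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(fetch, job['url'], job['dest']): job for job in jobs
        }
        for future in as_completed(futures):
            job = futures[future]
            try:
                ok = bool(future.result())
            except Exception:
                ok = False  # report a failed download instead of raising
            results[job['key']] = {'success': ok, 'dest': job['dest']}
    return results
```

Callers then check each key's `success` flag, exactly as the handler does for `prev_video` and `next_video` above.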

View File

@@ -110,16 +110,26 @@ class FinalizeMp4Handler(BaseHandler):
Returns:
TaskResult
"""
# 1. Download all TS segments in parallel
download_jobs = []
for i, ts_url in enumerate(ts_list):
download_jobs.append({
'key': str(i),
'url': ts_url,
'dest': os.path.join(work_dir, f'seg_{i}.ts'),
'required': True
})
download_results = self.download_files_parallel(download_jobs)
ts_files = []
for i, ts_url in enumerate(ts_list):
result = download_results.get(str(i))
if not result or not result['success']:
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download TS segment {i}: {ts_url}"
)
ts_files.append(result['dest'])
logger.info(f"[task:{task.task_id}] Downloaded {len(ts_files)} TS segments")
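The hunk ends before the concatenation step, but a FINALIZE_MP4 handler would typically feed the downloaded segments to FFmpeg's concat demuxer. A sketch of that assumed next step (`build_concat_list` is a hypothetical helper):

```python
import os

def build_concat_list(ts_files, list_path):
    """Write an FFmpeg concat-demuxer list for the downloaded TS segments.

    The handler would then run something like:
    ffmpeg -f concat -safe 0 -i list.txt -c copy final.mp4
    """
    with open(list_path, 'w', encoding='utf-8') as f:
        for ts_file in ts_files:
            # Each entry needs the `file` directive; quote the absolute path.
            f.write(f"file '{os.path.abspath(ts_file)}'\n")
    return list_path
```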

View File

@@ -1,304 +0,0 @@
# -*- coding: utf-8 -*-
"""
TS 分片封装处理器
处理 PACKAGE_SEGMENT_TS 任务,将视频片段和对应时间区间的音频封装为 TS 分片。
支持转场相关的 overlap 裁剪和转场分片封装。
"""
import os
import logging
from typing import List, Optional
from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS
from domain.task import Task, TaskType
from domain.result import TaskResult, ErrorCode
logger = logging.getLogger(__name__)
class PackageSegmentTsHandler(BaseHandler):
"""
TS 分片封装处理器
职责:
- 下载视频片段
- 下载全局音频
- 截取对应时间区间的音频
- 封装为 TS 分片
- 上传 TS 产物
关键约束:
- TS 必须包含音视频同轨
- 使用 output_ts_offset 保证时间戳连续
- 输出 extinfDurationSec 供 m3u8 使用
转场相关:
- 普通片段 TS:需要裁剪掉 overlap 区域(已被转场分片使用)
- 转场分片 TS:直接封装转场视频产物,无需裁剪
- 无转场时:走原有逻辑,不做裁剪
精确裁剪:
- 当需要裁剪 overlap 区域时,必须使用重编码方式(-vf trim)才能精确切割
- 使用 -c copy 只能从关键帧切割,会导致不精确
"""
def get_supported_type(self) -> TaskType:
return TaskType.PACKAGE_SEGMENT_TS
def handle(self, task: Task) -> TaskResult:
"""处理 TS 封装任务"""
work_dir = self.create_work_dir(task.task_id)
try:
# 解析参数
video_url = task.get_video_url()
audio_url = task.get_audio_url()
start_time_ms = task.get_start_time_ms()
duration_ms = task.get_duration_ms()
output_spec = task.get_output_spec()
# 转场相关参数
is_transition_segment = task.is_transition_segment()
trim_head = task.should_trim_head()
trim_tail = task.should_trim_tail()
trim_head_ms = task.get_trim_head_ms()
trim_tail_ms = task.get_trim_tail_ms()
if not video_url:
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
"Missing videoUrl"
)
if not audio_url:
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
"Missing audioUrl"
)
# 计算时间参数
start_sec = start_time_ms / 1000.0
duration_sec = duration_ms / 1000.0
# 1. 下载视频片段
video_file = os.path.join(work_dir, 'video.mp4')
if not self.download_file(video_url, video_file):
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download video: {video_url}"
)
# 2. 下载全局音频
audio_file = os.path.join(work_dir, 'audio.aac')
if not self.download_file(audio_url, audio_file):
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download audio: {audio_url}"
)
# 3. 判断是否需要精确裁剪视频
needs_video_trim = not is_transition_segment and (
(trim_head and trim_head_ms > 0) or
(trim_tail and trim_tail_ms > 0)
)
# 4. 如果需要裁剪,先重编码裁剪视频
processed_video_file = video_file
if needs_video_trim:
processed_video_file = os.path.join(work_dir, 'trimmed_video.mp4')
trim_cmd = self._build_trim_command(
video_file=video_file,
output_file=processed_video_file,
trim_head_ms=trim_head_ms if trim_head else 0,
trim_tail_ms=trim_tail_ms if trim_tail else 0,
output_spec=output_spec
)
logger.info(f"[task:{task.task_id}] Trimming video: head={trim_head_ms}ms, tail={trim_tail_ms}ms")
if not self.run_ffmpeg(trim_cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Video trim failed"
)
if not self.ensure_file_exists(processed_video_file, min_size=1024):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Trimmed video file is missing or too small"
)
# 5. 构建 TS 封装命令
output_file = os.path.join(work_dir, 'segment.ts')
cmd = self._build_package_command(
video_file=processed_video_file,
audio_file=audio_file,
output_file=output_file,
start_sec=start_sec,
duration_sec=duration_sec
)
# 6. 执行 FFmpeg
if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"TS packaging failed"
)
# 7. 验证输出文件
if not self.ensure_file_exists(output_file, min_size=1024):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"TS output file is missing or too small"
)
# 8. 获取实际时长(用于 EXTINF)
actual_duration = self.probe_duration(output_file)
extinf_duration = actual_duration if actual_duration else duration_sec
# 9. 上传产物
ts_url = self.upload_file(task.task_id, 'ts', output_file)
if not ts_url:
return TaskResult.fail(
ErrorCode.E_UPLOAD_FAILED,
"Failed to upload TS"
)
return TaskResult.ok({
'tsUrl': ts_url,
'extinfDurationSec': extinf_duration
})
except Exception as e:
logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e))
finally:
self.cleanup_work_dir(work_dir)
def _build_trim_command(
self,
video_file: str,
output_file: str,
trim_head_ms: int,
trim_tail_ms: int,
output_spec
) -> List[str]:
"""
构建视频精确裁剪命令(重编码方式)
使用 trim 滤镜进行精确帧级裁剪,而非 -ss/-t 参数的关键帧裁剪。
Args:
video_file: 输入视频路径
output_file: 输出视频路径
trim_head_ms: 头部裁剪时长(毫秒)
trim_tail_ms: 尾部裁剪时长(毫秒)
output_spec: 输出规格
Returns:
FFmpeg 命令参数列表
"""
# 获取原视频时长
original_duration = self.probe_duration(video_file)
if not original_duration:
original_duration = 10.0 # fallback when probing fails, keeps the trim math valid
trim_head_sec = trim_head_ms / 1000.0
trim_tail_sec = trim_tail_ms / 1000.0
# 计算裁剪后的起止时间
start_time = trim_head_sec
end_time = original_duration - trim_tail_sec
# 构建 trim 滤镜
vf_filter = f"trim=start={start_time}:end={end_time},setpts=PTS-STARTPTS"
cmd = [
'ffmpeg', '-y', '-hide_banner',
'-i', video_file,
'-vf', vf_filter,
]
# 编码参数
cmd.extend(VIDEO_ENCODE_ARGS)
# 帧率
fps = output_spec.fps
cmd.extend(['-r', str(fps)])
# 计算输出视频帧数,动态调整 GOP
output_duration_sec = end_time - start_time
total_frames = int(output_duration_sec * fps)
# 动态 GOP:短视频使用较小的 GOP
if total_frames <= 1:
gop_size = 1
elif total_frames < fps:
gop_size = total_frames
else:
gop_size = fps # 每秒一个关键帧
cmd.extend(['-g', str(gop_size)])
cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))])
# 强制第一帧为关键帧
cmd.extend(['-force_key_frames', 'expr:eq(n,0)'])
# 无音频(音频单独处理)
cmd.append('-an')
cmd.append(output_file)
return cmd
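The dynamic GOP selection in the trim command above can be isolated into one function: clips shorter than a second get a GOP spanning all their frames, and anything longer falls back to one keyframe per second.

```python
def compute_gop(output_duration_sec, fps):
    """GOP size for a trimmed clip, as in _build_trim_command above."""
    total_frames = int(output_duration_sec * fps)
    if total_frames <= 1:
        return 1          # degenerate clip: every frame is a keyframe
    if total_frames < fps:
        return total_frames  # sub-second clip: one GOP covers it all
    return fps            # one keyframe per second
```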
def _build_package_command(
self,
video_file: str,
audio_file: str,
output_file: str,
start_sec: float,
duration_sec: float
) -> List[str]:
"""
构建 TS 封装命令
将视频和对应时间区间的音频封装为 TS 分片。
视频使用 copy 模式(已经过精确裁剪或无需裁剪)。
Args:
video_file: 视频文件路径(已处理)
audio_file: 音频文件路径
output_file: 输出文件路径
start_sec: 音频开始时间(秒)
duration_sec: 音频时长(秒)
Returns:
FFmpeg 命令参数列表
"""
cmd = [
'ffmpeg', '-y', '-hide_banner',
# 视频输入
'-i', video_file,
# 音频输入(从 start_sec 开始截取 duration_sec)
'-ss', str(start_sec),
'-t', str(duration_sec),
'-i', audio_file,
# 映射流
'-map', '0:v:0', # 使用第一个输入的视频流
'-map', '1:a:0', # 使用第二个输入的音频流
# 复制编码(视频已处理,无需重编码)
'-c:v', 'copy',
'-c:a', 'copy',
# 关键:时间戳偏移,保证整体连续
'-output_ts_offset', str(start_sec),
# 复用参数
'-muxdelay', '0',
'-muxpreload', '0',
# 输出格式
'-f', 'mpegts',
output_file
]
return cmd
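The `-output_ts_offset` value in the command above is what keeps the HLS timeline continuous: every TS is encoded from a local t=0, and the mux offset shifts its timestamps to the segment's global start. A sketch of how those offsets accumulate across a playlist:

```python
def segment_offsets(durations_sec):
    """Cumulative start offsets, one per segment, for -output_ts_offset."""
    offsets = []
    t = 0.0
    for d in durations_sec:
        offsets.append(round(t, 3))
        t += d  # next segment starts where this one ends
    return offsets
```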

View File

@@ -55,56 +55,88 @@ class PrepareJobAudioHandler(BaseHandler):
bgm_url = task.get_bgm_url()
segments = task.get_segments()
# 1. Download BGM and overlay SFX in parallel
bgm_file = os.path.join(work_dir, 'bgm.mp3') if bgm_url else None
download_jobs = []
if bgm_url and bgm_file:
download_jobs.append({
'key': 'bgm',
'url': bgm_url,
'dest': bgm_file,
'required': False
})
sfx_download_candidates = []
for i, seg in enumerate(segments):
audio_spec_data = seg.get('audioSpecJson')
if not audio_spec_data:
continue
audio_spec = AudioSpec.from_dict(audio_spec_data)
if not audio_spec or not audio_spec.audio_url:
continue
sfx_file = os.path.join(work_dir, f'sfx_{i}.mp3')
job_key = f'sfx_{i}'
sfx_download_candidates.append({
'key': job_key,
'file': sfx_file,
'spec': audio_spec,
'segment': seg
})
download_jobs.append({
'key': job_key,
'url': audio_spec.audio_url,
'dest': sfx_file,
'required': False
})
download_results = self.download_files_parallel(download_jobs)
if bgm_url:
bgm_result = download_results.get('bgm')
if not bgm_result or not bgm_result['success']:
logger.warning(f"[task:{task.task_id}] Failed to download BGM")
bgm_file = None
sfx_files = []
for sfx_candidate in sfx_download_candidates:
sfx_result = download_results.get(sfx_candidate['key'])
if sfx_result and sfx_result['success']:
sfx_files.append({
'file': sfx_candidate['file'],
'spec': sfx_candidate['spec'],
'segment': sfx_candidate['segment']
})
else:
logger.warning(f"[task:{task.task_id}] Failed to download SFX {sfx_candidate['key']}")
# 2. Build the audio mixing command
output_file = os.path.join(work_dir, 'audio_full.aac')
global_fade_in_ms = task.get_global_audio_fade_in_ms()
global_fade_out_ms = task.get_global_audio_fade_out_ms()
cmd = self._build_audio_command(
bgm_file=bgm_file,
sfx_files=sfx_files,
output_file=output_file,
total_duration_sec=total_duration_sec,
audio_profile=audio_profile,
global_fade_in_ms=global_fade_in_ms,
global_fade_out_ms=global_fade_out_ms
)
# 3. Run FFmpeg
if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Audio mixing failed"
)
# 4. Validate the output file
if not self.ensure_file_exists(output_file, min_size=1024):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Audio output file is missing or too small"
)
# 5. Upload the artifact
audio_url = self.upload_file(task.task_id, 'audio', output_file)
if not audio_url:
return TaskResult.fail(
@@ -129,7 +161,9 @@ class PrepareJobAudioHandler(BaseHandler):
sfx_files: List[Dict],
output_file: str,
total_duration_sec: float,
audio_profile: AudioProfile,
global_fade_in_ms: int = 0,
global_fade_out_ms: int = 0
) -> List[str]:
"""
构建音频混音命令
@@ -140,6 +174,8 @@ class PrepareJobAudioHandler(BaseHandler):
output_file: 输出文件路径
total_duration_sec: 总时长(秒)
audio_profile: 音频配置
global_fade_in_ms: 全局音频淡入时长(毫秒),0 不应用
global_fade_out_ms: 全局音频淡出时长(毫秒),0 不应用
Returns:
FFmpeg 命令参数列表
@@ -147,8 +183,23 @@ class PrepareJobAudioHandler(BaseHandler):
sample_rate = audio_profile.sample_rate
channels = audio_profile.channels
# 构建全局 afade 滤镜(作用于最终混合音频,在 amix 之后)
global_fade_filters = self._build_global_fade_filters(
total_duration_sec, global_fade_in_ms, global_fade_out_ms
)
# 情况1:无 BGM 也无叠加音效 -> 生成静音
if not bgm_file and not sfx_files:
if global_fade_filters:
return [
'ffmpeg', '-y', '-hide_banner',
'-f', 'lavfi',
'-i', f'anullsrc=r={sample_rate}:cl=stereo',
'-t', str(total_duration_sec),
'-af', ','.join(global_fade_filters),
'-c:a', 'aac', '-b:a', '128k',
output_file
]
return [
'ffmpeg', '-y', '-hide_banner',
'-f', 'lavfi',
@@ -160,10 +211,14 @@ class PrepareJobAudioHandler(BaseHandler):
# 情况2:仅 BGM,无叠加音效
if not sfx_files:
af_arg = []
if global_fade_filters:
af_arg = ['-af', ','.join(global_fade_filters)]
return [
'ffmpeg', '-y', '-hide_banner',
'-i', bgm_file,
'-t', str(total_duration_sec),
*af_arg,
'-c:a', 'aac', '-b:a', '128k',
'-ar', str(sample_rate), '-ac', str(channels),
output_file
@@ -233,10 +288,21 @@ class PrepareJobAudioHandler(BaseHandler):
# dropout_transition=0 表示输入结束时不做渐变
mix_inputs = "[bgm]" + "".join(sfx_labels)
num_inputs = 1 + len(sfx_files)
# 如果有全局 fade,amix 输出到中间标签后再追加 afade;否则直接输出到 [out]
if global_fade_filters:
filter_parts.append(
f"{mix_inputs}amix=inputs={num_inputs}:duration=first:"
f"dropout_transition=0:normalize=0[mixed]"
)
filter_parts.append(
f"[mixed]{','.join(global_fade_filters)}[out]"
)
else:
filter_parts.append(
f"{mix_inputs}amix=inputs={num_inputs}:duration=first:"
f"dropout_transition=0:normalize=0[out]"
)
filter_complex = ';'.join(filter_parts)
@@ -249,3 +315,33 @@ class PrepareJobAudioHandler(BaseHandler):
]
return cmd
@staticmethod
def _build_global_fade_filters(
total_duration_sec: float,
global_fade_in_ms: int,
global_fade_out_ms: int
) -> List[str]:
"""
构建全局音频淡入/淡出滤镜列表
在 amix 混音输出之后追加,作用于最终混合音频。
与片段级 audioSpecJson 中的 fadeInMs/fadeOutMs(仅作用于单个叠加音效)独立。
Args:
total_duration_sec: 总时长(秒)
global_fade_in_ms: 全局淡入时长(毫秒),0 不应用
global_fade_out_ms: 全局淡出时长(毫秒),0 不应用
Returns:
afade 滤镜字符串列表,可能为空
"""
filters = []
if global_fade_in_ms > 0:
fade_in_sec = global_fade_in_ms / 1000.0
filters.append(f"afade=t=in:st=0:d={fade_in_sec}")
if global_fade_out_ms > 0:
fade_out_sec = global_fade_out_ms / 1000.0
fade_out_start = total_duration_sec - fade_out_sec
filters.append(f"afade=t=out:st={fade_out_start}:d={fade_out_sec}")
return filters
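The fade-filter construction above is self-contained enough to restate as a standalone function, which makes the emitted `afade` strings easy to verify:

```python
def build_global_fade_filters(total_duration_sec, fade_in_ms, fade_out_ms):
    """Standalone version of _build_global_fade_filters above."""
    filters = []
    if fade_in_ms > 0:
        # Fade in from t=0 over the requested duration
        filters.append(f"afade=t=in:st=0:d={fade_in_ms / 1000.0}")
    if fade_out_ms > 0:
        fade_out_sec = fade_out_ms / 1000.0
        # Fade out so the ramp ends exactly at the total duration
        filters.append(
            f"afade=t=out:st={total_duration_sec - fade_out_sec}:d={fade_out_sec}"
        )
    return filters
```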

View File

@@ -1,9 +1,9 @@
# -*- coding: utf-8 -*-
"""
Render + TS packaging handler
Handles RENDER_SEGMENT_TS tasks: renders source material into a spec-compliant video and packages it as a TS segment.
Supports frame-freeze generation for transition overlap regions and precise trimming.
"""
import os
@@ -26,21 +26,24 @@ def _get_extension_from_url(url: str) -> str:
return ext.lower() if ext else ''
class RenderSegmentTsHandler(BaseHandler):
"""
Render + TS packaging handler
职责:
- 下载素材文件
- 下载 LUT 文件(如有)
- 下载叠加层(如有)
- 下载音频(如有)
- 构建 FFmpeg 渲染命令
- 执行渲染(支持帧冻结生成 overlap 区域)
- 裁剪 overlap 区域(如需要)
- 封装为 TS 分片
- 上传产物
"""
def get_supported_type(self) -> TaskType:
return TaskType.RENDER_SEGMENT_TS
def handle(self, task: Task) -> TaskResult:
"""Handle a render + TS packaging task"""
@@ -85,13 +88,82 @@ class RenderSegmentVideoHandler(BaseHandler):
else:
input_file = os.path.join(work_dir, 'input.mp4')
# 2. Build parallel download jobs (main material + optional LUT + optional overlay + optional audio)
audio_url = task.get_audio_url()
audio_file = None
lut_file = os.path.join(work_dir, 'lut.cube') if render_spec.lut_url else None
overlay_file = None
if render_spec.overlay_url:
# 根据 URL 后缀确定文件扩展名
overlay_url_lower = render_spec.overlay_url.lower()
if overlay_url_lower.endswith('.jpg') or overlay_url_lower.endswith('.jpeg'):
overlay_ext = '.jpg'
elif overlay_url_lower.endswith('.mov'):
overlay_ext = '.mov'
else:
overlay_ext = '.png'
overlay_file = os.path.join(work_dir, f'overlay{overlay_ext}')
download_jobs = [
{
'key': 'material',
'url': material_url,
'dest': input_file,
'required': True
}
]
if render_spec.lut_url and lut_file:
download_jobs.append({
'key': 'lut',
'url': render_spec.lut_url,
'dest': lut_file,
'required': False
})
if render_spec.overlay_url and overlay_file:
download_jobs.append({
'key': 'overlay',
'url': render_spec.overlay_url,
'dest': overlay_file,
'required': False
})
if audio_url:
audio_file = os.path.join(work_dir, 'audio.aac')
download_jobs.append({
'key': 'audio',
'url': audio_url,
'dest': audio_file,
'required': True
})
download_results = self.download_files_parallel(download_jobs)
material_result = download_results.get('material')
if not material_result or not material_result['success']:
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download material: {material_url}"
)
if render_spec.lut_url:
lut_result = download_results.get('lut')
if not lut_result or not lut_result['success']:
logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it")
lut_file = None
if render_spec.overlay_url:
overlay_result = download_results.get('overlay')
if not overlay_result or not overlay_result['success']:
logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
overlay_file = None
if audio_url:
audio_dl = download_results.get('audio')
if not audio_dl or not audio_dl['success']:
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download audio: {audio_url}"
)
# 3. 图片素材转换为视频
if is_image:
video_input_file = os.path.join(work_dir, 'input_video.mp4')
@@ -111,31 +183,7 @@ class RenderSegmentVideoHandler(BaseHandler):
input_file = video_input_file
logger.info(f"[task:{task.task_id}] Image converted to video successfully")
# 4. Probe the source video duration (video materials only)
# Used to detect shortfalls and pad them by freezing the last frame
source_duration_sec = None
if not is_image:
@@ -158,13 +206,13 @@ class RenderSegmentVideoHandler(BaseHandler):
f"will freeze last frame for {shortage_sec:.2f}s"
)
# 5. Compute overlap durations (for transition frame freezes)
# Head overlap: from the previous segment's exit transition
overlap_head_ms = task.get_overlap_head_ms()
# Tail overlap: this segment's own exit transition
overlap_tail_ms = task.get_overlap_tail_ms_v2()
# 6. Build the FFmpeg command
output_file = os.path.join(work_dir, 'output.mp4')
cmd = self._build_command(
input_file=input_file,
@@ -179,41 +227,96 @@ class RenderSegmentVideoHandler(BaseHandler):
source_duration_sec=source_duration_sec
)
# 7. Run FFmpeg
if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"FFmpeg rendering failed"
)
# 8. Validate the output file
if not self.ensure_file_exists(output_file, min_size=4096):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Output file is missing or too small"
)
# 9. Trim overlap regions (only for non-transition segments that have overlap to remove)
is_transition_seg = task.is_transition_segment()
trim_head = task.should_trim_head()
trim_tail = task.should_trim_tail()
trim_head_ms = task.get_trim_head_ms()
trim_tail_ms = task.get_trim_tail_ms()
needs_video_trim = not is_transition_seg and (
(trim_head and trim_head_ms > 0) or
(trim_tail and trim_tail_ms > 0)
)
processed_video = output_file
if needs_video_trim:
processed_video = os.path.join(work_dir, 'trimmed_video.mp4')
trim_cmd = self._build_trim_command(
video_file=output_file,
output_file=processed_video,
trim_head_ms=trim_head_ms if trim_head else 0,
trim_tail_ms=trim_tail_ms if trim_tail else 0,
output_spec=output_spec
)
logger.info(f"[task:{task.task_id}] Trimming video: head={trim_head_ms}ms, tail={trim_tail_ms}ms")
if not self.run_ffmpeg(trim_cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Video trim failed"
)
if not self.ensure_file_exists(processed_video, min_size=1024):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Trimmed video file is missing or too small"
)
# 10. Package the TS segment
start_time_ms = task.get_start_time_ms()
start_sec = start_time_ms / 1000.0
duration_sec = duration_ms / 1000.0
ts_output = os.path.join(work_dir, 'segment.ts')
ts_cmd = self._build_ts_package_command(
video_file=processed_video,
audio_file=audio_file,
output_file=ts_output,
start_sec=start_sec,
duration_sec=duration_sec
)
if not self.run_ffmpeg(ts_cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"TS packaging failed"
)
if not self.ensure_file_exists(ts_output, min_size=1024):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"TS output file is missing or too small"
)
# 11. Probe the EXTINF duration and upload the TS
actual_duration = self.probe_duration(ts_output)
extinf_duration = actual_duration if actual_duration else duration_sec
ts_url = self.upload_file(task.task_id, 'ts', ts_output)
if not ts_url:
return TaskResult.fail(
ErrorCode.E_UPLOAD_FAILED,
"Failed to upload TS"
)
return TaskResult.ok({
'tsUrl': ts_url,
'extinfDurationSec': extinf_duration
})
except Exception as e:
logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
@@ -222,6 +325,43 @@ class RenderSegmentVideoHandler(BaseHandler):
finally:
self.cleanup_work_dir(work_dir)
@staticmethod
def _build_crop_filter(
render_spec: 'RenderSpec',
width: int,
height: int,
task_id: str = ''
) -> Optional[str]:
"""
构建裁切滤镜
crop_enable 时:以目标比例为基准,按 crop_scale 倍率裁切,crop_pos 控制位置(默认居中)。
Returns:
crop 滤镜字符串,无需裁切时返回 None
"""
if render_spec.crop_enable:
scale = render_spec.crop_scale
target_ratio = width / height
# 解析裁切位置,默认居中
fx, fy = 0.5, 0.5
if render_spec.crop_pos:
try:
fx, fy = map(float, render_spec.crop_pos.split(','))
except ValueError:
logger.warning(f"[task:{task_id}] Invalid crop position: {render_spec.crop_pos}, using center")
fx, fy = 0.5, 0.5
# 基准:源中最大的目标比例矩形,再除以倍率
return (
f"crop='min(iw,ih*{target_ratio})/{scale}':'min(ih,iw/{target_ratio})/{scale}':"
f"'(iw-min(iw,ih*{target_ratio})/{scale})*{fx}':"
f"'(ih-min(ih,iw/{target_ratio})/{scale})*{fy}'"
)
return None
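The crop expression returned by `_build_crop_filter` is easiest to sanity-check numerically. The sketch below evaluates the same arithmetic FFmpeg performs per frame: take the largest rectangle of the target aspect ratio that fits in the source, shrink it by `crop_scale`, and position it by the fractional offsets from `crop_pos`:

```python
def crop_rect(src_w, src_h, target_w, target_h, scale=1.0, fx=0.5, fy=0.5):
    """Evaluate the crop filter's expressions for one frame (a sketch)."""
    target_ratio = target_w / target_h
    # Largest target-ratio rectangle inside the source, divided by the scale
    cw = min(src_w, src_h * target_ratio) / scale
    ch = min(src_h, src_w / target_ratio) / scale
    # Fractional positioning within the leftover margins (0.5 = centered)
    x = (src_w - cw) * fx
    y = (src_h - ch) * fy
    return cw, ch, x, y
```

For a 1920x1080 source cropped to a 9:16 portrait target at scale 1.0, this yields a 607.5x1080 window centered horizontally.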
def _convert_image_to_video(
self,
image_file: str,
@@ -270,23 +410,10 @@ class RenderSegmentVideoHandler(BaseHandler):
# 构建滤镜:缩放填充到目标尺寸
filters = []
# 裁切处理(与视频相同逻辑)
if render_spec.crop_enable and render_spec.face_pos:
try:
fx, fy = map(float, render_spec.face_pos.split(','))
target_ratio = width / height
filters.append(
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})':"
f"'(iw-min(iw,ih*{target_ratio}))*{fx}':"
f"'(ih-min(ih,iw/{target_ratio}))*{fy}'"
)
except (ValueError, ZeroDivisionError):
logger.warning(f"[task:{task_id}] Invalid face position: {render_spec.face_pos}")
elif render_spec.zoom_cut:
target_ratio = width / height
filters.append(
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})'"
)
# 裁切处理
crop_filter = self._build_crop_filter(render_spec, width, height, task_id)
if crop_filter:
filters.append(crop_filter)
# 缩放填充
filters.append(
@@ -324,6 +451,116 @@ class RenderSegmentVideoHandler(BaseHandler):
logger.info(f"[task:{task_id}] Converting image to video: {actual_duration_sec:.2f}s at {fps}fps")
return self.run_ffmpeg(cmd, task_id)
def _build_trim_command(
self,
video_file: str,
output_file: str,
trim_head_ms: int,
trim_tail_ms: int,
output_spec
) -> List[str]:
"""
构建视频精确裁剪命令(重编码方式)
使用 trim 滤镜进行精确帧级裁剪,而非 -ss/-t 参数的关键帧裁剪。
Args:
video_file: 输入视频路径
output_file: 输出视频路径
trim_head_ms: 头部裁剪时长(毫秒)
trim_tail_ms: 尾部裁剪时长(毫秒)
output_spec: 输出规格
Returns:
FFmpeg 命令参数列表
"""
original_duration = self.probe_duration(video_file)
if not original_duration:
original_duration = 10.0
trim_head_sec = trim_head_ms / 1000.0
trim_tail_sec = trim_tail_ms / 1000.0
start_time = trim_head_sec
end_time = original_duration - trim_tail_sec
vf_filter = f"trim=start={start_time}:end={end_time},setpts=PTS-STARTPTS"
cmd = [
'ffmpeg', '-y', '-hide_banner',
'-i', video_file,
'-vf', vf_filter,
]
cmd.extend(self.get_video_encode_args(maxrate=output_spec.bitrate))
fps = output_spec.fps
cmd.extend(['-r', str(fps)])
output_duration_sec = end_time - start_time
total_frames = int(output_duration_sec * fps)
if total_frames <= 1:
gop_size = 1
elif total_frames < fps:
gop_size = total_frames
else:
gop_size = fps
cmd.extend(['-g', str(gop_size)])
cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))])
cmd.extend(['-force_key_frames', 'expr:eq(n,0)'])
cmd.append('-an')
cmd.append(output_file)
return cmd
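`get_video_encode_args` itself is defined outside this diff. The sketch below is an assumed software-encoding shape reconstructed from the commit messages only (CRF mode, B-frames disabled to avoid PTS/DTS rollback at TS boundaries, `-b:v` pinned to `maxrate` with a 1x `bufsize` for strict rate control); the flag values are illustrative, not the production defaults:

```python
def get_video_encode_args(maxrate=None):
    """Hedged sketch of the handler's encode-arg builder (software path)."""
    # -bf 0: no B-frames, so segment boundaries never reorder PTS/DTS
    args = ['-c:v', 'libx264', '-preset', 'fast', '-crf', '23', '-bf', '0']
    if maxrate:
        args.extend([
            '-b:v', maxrate,
            '-maxrate', maxrate,
            '-bufsize', maxrate,  # 1x maxrate: tight VBV for short clips
        ])
    return args
```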
def _build_ts_package_command(
self,
video_file: str,
audio_file: Optional[str],
output_file: str,
start_sec: float,
duration_sec: float
) -> List[str]:
"""
构建 TS 封装命令
将视频和对应时间区间的音频封装为 TS 分片。
视频使用 copy 模式(已经过渲染/裁剪)。
支持无音频模式(video-only TS)。
Args:
video_file: 视频文件路径(已处理)
audio_file: 音频文件路径(可选,None 时生成 video-only TS)
output_file: 输出文件路径
start_sec: 音频开始时间(秒)
duration_sec: 音频时长(秒)
Returns:
FFmpeg 命令参数列表
"""
cmd = [
'ffmpeg', '-y', '-hide_banner',
'-i', video_file,
]
if audio_file:
cmd.extend(['-ss', str(start_sec), '-t', str(duration_sec), '-i', audio_file])
cmd.extend(['-map', '0:v:0', '-map', '1:a:0', '-c:v', 'copy', '-c:a', 'copy'])
else:
cmd.extend(['-c:v', 'copy'])
cmd.extend([
'-output_ts_offset', str(start_sec),
'-muxdelay', '0',
'-muxpreload', '0',
'-f', 'mpegts',
output_file
])
return cmd
def _build_command(
self,
input_file: str,
@@ -391,7 +628,7 @@ class RenderSegmentVideoHandler(BaseHandler):
cmd.extend(['-vf', filters])
# 编码参数(根据硬件加速配置动态获取)
cmd.extend(self.get_video_encode_args())
cmd.extend(self.get_video_encode_args(maxrate=output_spec.bitrate))
# 帧率
fps = output_spec.fps
@@ -465,7 +702,10 @@ class RenderSegmentVideoHandler(BaseHandler):
# 解析 effects
effects = render_spec.get_effects()
has_complex_effect = any(
effect.effect_type in {'cameraShot', 'zoom'}
for effect in effects
)
# 硬件加速时需要先 hwdownload(将 GPU 表面下载到系统内存)
hwaccel_prefix = self.get_hwaccel_filter_prefix()
@@ -473,12 +713,23 @@ class RenderSegmentVideoHandler(BaseHandler):
# 去掉末尾的逗号,作为第一个滤镜
filters.append(hwaccel_prefix.rstrip(','))
# 1. Speed adjustment (combine RenderSpec.speed with the ospeed effect)
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed <= 0:
speed = 1.0
ospeed_factor = 1.0
for effect in effects:
if effect.effect_type == 'ospeed':
ospeed_factor = effect.get_ospeed_params()
break
combined_pts_factor = (1.0 / speed) * ospeed_factor
# Normalize the start timestamp to zero: a non-zero source start PTS would freeze the first frame after muxing
if combined_pts_factor != 1.0:
filters.append(f"setpts={combined_pts_factor}*(PTS-STARTPTS)")
else:
filters.append("setpts=PTS-STARTPTS")
# 2. LUT 调色
if lut_file:
@@ -487,26 +738,9 @@ class RenderSegmentVideoHandler(BaseHandler):
filters.append(f"lut3d='{lut_path}'")
# 3. Cropping
crop_filter = self._build_crop_filter(render_spec, width, height)
if crop_filter:
filters.append(crop_filter)
# 4. 缩放和填充
scale_filter = (
@@ -515,9 +749,8 @@ class RenderSegmentVideoHandler(BaseHandler):
)
filters.append(scale_filter)
# 5. Effects (cameraShot / zoom require filter_complex)
if has_complex_effect:
return self._build_filter_complex_with_effects(
base_filters=filters,
effects=effects,
@@ -541,15 +774,13 @@ class RenderSegmentVideoHandler(BaseHandler):
# 计算是否需要额外的尾部冻结(源视频时长不足)
extra_tail_freeze_sec = 0.0
if source_duration_sec is not None:
# Reuse the previously computed combined_pts_factor
effective_duration_sec = source_duration_sec * combined_pts_factor
required_duration_sec = duration_ms / 1000.0
# If the source is too short, freeze the last frame to cover the shortfall
if effective_duration_sec < required_duration_sec:
extra_tail_freeze_sec = required_duration_sec - effective_duration_sec
if overlap_head_ms > 0:
# 头部冻结:将第一帧冻结指定时长
@@ -598,7 +829,7 @@ class RenderSegmentVideoHandler(BaseHandler):
"""
构建包含特效的 filter_complex 滤镜图
cameraShot / zoom effects are handled here uniformly and applied in effects order
Args:
base_filters: 基础滤镜列表
@@ -669,6 +900,31 @@ class RenderSegmentVideoHandler(BaseHandler):
f"{frozen_out}{rest_out}concat=n=2:v=1:a=0{effect_output}"
)
current_output = effect_output
effect_idx += 1
elif effect.effect_type == 'zoom':
start_sec, scale_factor, duration_sec = effect.get_zoom_params()
if start_sec < 0 or scale_factor <= 1.0 or duration_sec <= 0:
continue
zoom_end_sec = start_sec + duration_sec
base_out = f'[eff{effect_idx}_base]'
zoom_source_out = f'[eff{effect_idx}_zoom_src]'
zoom_scaled_out = f'[eff{effect_idx}_zoom_scaled]'
effect_output = f'[v_eff{effect_idx}]'
zoom_enable = f"'between(t,{start_sec},{zoom_end_sec})'"
filter_parts.append(
f"{current_output}split=2{base_out}{zoom_source_out}"
)
filter_parts.append(
f"{zoom_source_out}scale=iw*{scale_factor}:ih*{scale_factor},"
f"crop={width}:{height}:(in_w-{width})/2:(in_h-{height})/2{zoom_scaled_out}"
)
filter_parts.append(
f"{base_out}{zoom_scaled_out}overlay=0:0:enable={zoom_enable}{effect_output}"
)
current_output = effect_output
effect_idx += 1
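The zoom branch above splits the stream, upscales one copy, center-crops it back to the frame, and overlays it only inside the zoom window. That chain can be reproduced as a standalone builder to verify the emitted filter strings:

```python
def build_zoom_filter(current, idx, width, height, start_sec, scale_factor, duration_sec):
    """Build the filter_complex parts for one zoom effect (mirrors the branch above)."""
    end_sec = start_sec + duration_sec
    base = f'[eff{idx}_base]'
    src = f'[eff{idx}_zoom_src]'
    scaled = f'[eff{idx}_zoom_scaled]'
    out = f'[v_eff{idx}]'
    enable = f"'between(t,{start_sec},{end_sec})'"
    parts = [
        # Duplicate the stream: one untouched base, one to be zoomed
        f"{current}split=2{base}{src}",
        # Upscale the copy, then center-crop back to the frame size
        f"{src}scale=iw*{scale_factor}:ih*{scale_factor},"
        f"crop={width}:{height}:(in_w-{width})/2:(in_h-{height})/2{scaled}",
        # Overlay the zoomed copy only while the zoom window is active
        f"{base}{scaled}overlay=0:0:enable={enable}{out}",
    ]
    return parts, out
```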
@@ -679,14 +935,20 @@ class RenderSegmentVideoHandler(BaseHandler):
extra_tail_freeze_sec = 0.0
if source_duration_sec is not None and render_spec is not None and duration_ms > 0:
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed <= 0:
speed = 1.0
ospeed_factor = 1.0
for effect in effects:
if effect.effect_type == 'ospeed':
ospeed_factor = effect.get_ospeed_params()
break
combined_pts_factor = (1.0 / speed) * ospeed_factor
effective_duration_sec = source_duration_sec * combined_pts_factor
required_duration_sec = duration_ms / 1000.0
# 如果源视频时长不足,需要冻结最后一帧来补足
if effective_duration_sec < required_duration_sec:
extra_tail_freeze_sec = required_duration_sec - effective_duration_sec
if overlap_head_ms > 0:
head_duration_sec = overlap_head_ms / 1000.0
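The speed arithmetic repeated in both filter builders can be factored into two small helpers. Because `setpts` multiplies timestamps, the rendered duration of a clip is `source_duration * factor`, and a factor below 1.0 speeds the clip up:

```python
def combined_pts_factor(speed, ospeed_factor=1.0):
    """Merge RenderSpec.speed with the ospeed effect into one setpts factor."""
    if speed is None or speed <= 0:
        speed = 1.0  # guard against invalid specs
    return (1.0 / speed) * ospeed_factor

def tail_freeze_needed(source_duration_sec, required_duration_sec, factor):
    """Seconds of last-frame freeze needed when the sped clip is too short."""
    effective = source_duration_sec * factor
    return max(0.0, required_duration_sec - effective)
```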

View File

@@ -4,9 +4,8 @@
RenderWorker v2 入口
支持 v2 API 协议的渲染 Worker,处理以下任务类型:
- RENDER_SEGMENT_TS: render a video segment and package it as a TS
- PREPARE_JOB_AUDIO: generate the global audio track
- FINALIZE_MP4: produce the final MP4
使用方法:
@@ -34,6 +33,7 @@ from domain.config import WorkerConfig
from services.api_client import APIClientV2
from services.task_executor import TaskExecutor
from constant import SOFTWARE_VERSION
from util.tracing import initialize_tracing, shutdown_tracing
# Logging configuration
def setup_logging():
@@ -45,7 +45,8 @@ def setup_logging():
# Get the root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# Allow DEBUG logs to reach every handler (each handler's level decides what is persisted)
root_logger.setLevel(logging.DEBUG)
# Clear existing handlers (avoid duplicates)
root_logger.handlers.clear()
@@ -56,8 +57,12 @@ def setup_logging():
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# Ensure the directory containing the log file exists
log_dir = os.path.dirname(os.path.abspath(__file__))
# Determine the log directory: after PyInstaller bundling, __file__ points to a temporary extraction directory, so logs would be lost with it
# Use sys.frozen to detect a bundled environment; when bundled, use the directory of the executable
if getattr(sys, 'frozen', False):
log_dir = os.path.dirname(sys.executable)
else:
log_dir = os.path.dirname(os.path.abspath(__file__))
# 2. File handler for all logs (all_log.log)
all_log_path = os.path.join(log_dir, 'all_log.log')
@@ -109,6 +114,9 @@ class WorkerV2:
logger.error(f"Configuration error: {e}")
sys.exit(1)
tracing_enabled = initialize_tracing(self.config.worker_id, SOFTWARE_VERSION)
logger.info("OTel tracing %s", "enabled" if tracing_enabled else "disabled")
# Initialize the API client
self.api_client = APIClientV2(self.config)
@@ -208,6 +216,7 @@ class WorkerV2:
# Close the API client
self.api_client.close()
shutdown_tracing()
logger.info("Worker stopped")

View File

@@ -10,10 +10,14 @@ import subprocess
import time
import requests
from typing import Dict, List, Optional, Any
from urllib.parse import urlparse
from opentelemetry.trace import SpanKind, Status, StatusCode
from domain.task import Task
from domain.config import WorkerConfig
from util.system import get_hw_accel_info_str
from util.tracing import inject_trace_headers, mark_span_error, start_span
logger = logging.getLogger(__name__)
@@ -55,6 +59,45 @@ class APIClientV2:
'Accept': 'application/json'
})
def _request_with_trace(
self,
method: str,
url: str,
*,
task_id: Optional[str] = None,
span_name: str = "",
**kwargs: Any,
) -> requests.Response:
request_kwargs = dict(kwargs)
headers = request_kwargs.pop("headers", None)
if task_id:
request_kwargs["headers"] = inject_trace_headers(headers)
elif headers:
request_kwargs["headers"] = headers
parsed_url = urlparse(url)
attributes = {
"http.request.method": method.upper(),
"url.path": parsed_url.path,
"server.address": parsed_url.hostname or "",
}
if parsed_url.port:
attributes["server.port"] = parsed_url.port
name = span_name or f"render.api.{method.lower()}"
with start_span(name, task_id=task_id, kind=SpanKind.CLIENT, attributes=attributes) as span:
try:
response = self.session.request(method=method, url=url, **request_kwargs)
except Exception as exc:
mark_span_error(span, str(exc), "HTTP_REQUEST_ERROR")
raise
if span is not None:
span.set_attribute("http.response.status_code", response.status_code)
if response.status_code >= 400:
span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status_code}"))
return response
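The wrapper above injects trace headers before the request goes out. `inject_trace_headers` is presumably based on W3C Trace Context propagation; a minimal sketch of that header format (not the project's actual helper, all values illustrative) looks like:

```python
def make_traceparent(trace_id_hex: str, span_id_hex: str,
                     sampled: bool = True) -> str:
    # W3C Trace Context: version "00", 16-byte trace id, 8-byte span id, flags.
    flags = "01" if sampled else "00"
    return f"00-{trace_id_hex}-{span_id_hex}-{flags}"

headers = {"Accept": "application/json"}
headers["traceparent"] = make_traceparent(
    "0af7651916cd43dd8448eb211c80319c", "b7ad6b7169203331")
print(headers["traceparent"])
```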
def sync(self, current_task_ids: List[str]) -> List[Task]:
"""
Heartbeat sync and task pull
@@ -128,10 +171,13 @@ class APIClientV2:
url = f"{self.base_url}/render/v2/task/{task_id}/start"
try:
resp = self.session.post(
url,
resp = self._request_with_trace(
method="POST",
url=url,
task_id=task_id,
span_name="render.task.api.report_start",
json={'workerId': self.worker_id},
timeout=10
timeout=10,
)
if resp.status_code == 200:
logger.debug(f"[task:{task_id}] Start reported")
@@ -157,13 +203,16 @@ class APIClientV2:
url = f"{self.base_url}/render/v2/task/{task_id}/success"
try:
resp = self.session.post(
url,
resp = self._request_with_trace(
method="POST",
url=url,
task_id=task_id,
span_name="render.task.api.report_success",
json={
'workerId': self.worker_id,
'result': result
},
timeout=10
timeout=10,
)
if resp.status_code == 200:
logger.debug(f"[task:{task_id}] Success reported")
@@ -190,14 +239,17 @@ class APIClientV2:
url = f"{self.base_url}/render/v2/task/{task_id}/fail"
try:
resp = self.session.post(
url,
resp = self._request_with_trace(
method="POST",
url=url,
task_id=task_id,
span_name="render.task.api.report_fail",
json={
'workerId': self.worker_id,
'errorCode': error_code,
'errorMessage': error_message[:1000] # limit length
},
timeout=10
timeout=10,
)
if resp.status_code == 200:
logger.debug(f"[task:{task_id}] Failure reported")
@@ -228,7 +280,14 @@ class APIClientV2:
payload['fileName'] = file_name
try:
resp = self.session.post(url, json=payload, timeout=10)
resp = self._request_with_trace(
method="POST",
url=url,
task_id=task_id,
span_name="render.task.api.get_upload_url",
json=payload,
timeout=10,
)
if resp.status_code == 200:
data = resp.json()
if data.get('code') == 200:
@@ -256,13 +315,16 @@ class APIClientV2:
url = f"{self.base_url}/render/v2/task/{task_id}/extend-lease"
try:
resp = self.session.post(
url,
resp = self._request_with_trace(
method="POST",
url=url,
task_id=task_id,
span_name="render.task.api.extend_lease",
params={
'workerId': self.worker_id,
'extension': extension
},
timeout=10
timeout=10,
)
if resp.status_code == 200:
logger.debug(f"[task:{task_id}] Lease extended by {extension}s")
@@ -287,7 +349,13 @@ class APIClientV2:
url = f"{self.base_url}/render/v2/task/{task_id}"
try:
resp = self.session.get(url, timeout=10)
resp = self._request_with_trace(
method="GET",
url=url,
task_id=task_id,
span_name="render.task.api.get_task_info",
timeout=10,
)
if resp.status_code == 200:
data = resp.json()
if data.get('code') == 200:

View File

@@ -12,7 +12,7 @@ import logging
import shutil
import time
import uuid
from typing import Optional, Tuple
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlparse, unquote
import psutil
@@ -66,6 +66,7 @@ class MaterialCache:
LOCK_TIMEOUT_SEC = 30.0
LOCK_POLL_INTERVAL_SEC = 0.1
LOCK_STALE_SECONDS = 24 * 60 * 60
DOWNLOAD_LOCK_TIMEOUT_SEC = 5.0
def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
"""
@@ -194,13 +195,14 @@ class MaterialCache:
logger.warning(f"Cache lock remove error: {e}")
return False
def _acquire_lock(self, cache_key: str) -> Optional[str]:
def _acquire_lock(self, cache_key: str, timeout_sec: Optional[float] = None) -> Optional[str]:
"""Acquire the cache lock (cross-process safe)"""
if not self.enabled:
return None
wait_timeout_sec = self.LOCK_TIMEOUT_SEC if timeout_sec is None else max(float(timeout_sec), 0.0)
lock_path = self._get_lock_path(cache_key)
deadline = time.monotonic() + self.LOCK_TIMEOUT_SEC
deadline = time.monotonic() + wait_timeout_sec
while True:
try:
@@ -214,13 +216,24 @@ class MaterialCache:
if removed:
continue
if time.monotonic() >= deadline:
logger.warning(f"Cache lock timeout: {lock_path}")
logger.warning(f"Cache lock timeout ({wait_timeout_sec:.1f}s): {lock_path}")
return None
time.sleep(self.LOCK_POLL_INTERVAL_SEC)
except Exception as e:
logger.warning(f"Cache lock error: {e}")
return None
def _acquire_lock_with_wait(
self,
cache_key: str,
timeout_sec: Optional[float] = None
) -> Tuple[Optional[str], int]:
"""Acquire the cache lock and return the wait time in milliseconds"""
start_time = time.monotonic()
lock_path = self._acquire_lock(cache_key, timeout_sec=timeout_sec)
lock_wait_ms = max(int((time.monotonic() - start_time) * 1000), 0)
return lock_path, lock_wait_ms
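A cross-process lock of this kind is typically built on exclusive file creation; below is a minimal, self-contained sketch (not the exact implementation) with the same timed-wait behavior the code above exposes.

```python
import os
import time
import tempfile

def acquire_file_lock(lock_path: str, timeout_sec: float,
                      poll_interval: float = 0.05):
    """Cross-process lock via exclusive file creation (sketch)."""
    deadline = time.monotonic() + timeout_sec
    while True:
        try:
            # O_CREAT | O_EXCL fails atomically if the lock file already exists.
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.close(fd)
            return lock_path          # acquired; caller removes the file to release
        except FileExistsError:
            if time.monotonic() >= deadline:
                return None           # timed out; caller can fall back
            time.sleep(poll_interval)

lock = os.path.join(tempfile.mkdtemp(), "demo.lock")
print(acquire_file_lock(lock, timeout_sec=1.0) == lock)  # -> True
```

With a short timeout (like `DOWNLOAD_LOCK_TIMEOUT_SEC` above), a contended worker gives up quickly and downloads without the cache instead of stalling.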
def _release_lock(self, lock_path: Optional[str]) -> None:
"""Release the cache lock"""
if not lock_path:
@@ -244,6 +257,27 @@ class MaterialCache:
exists = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
return exists, cache_path
def _is_cache_file_ready(self, cache_path: str) -> bool:
"""Whether the cache file is ready (exists and is larger than 0 bytes)"""
try:
return os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
except Exception:
return False
def _copy_cache_to_dest(self, cache_path: str, dest: str) -> Tuple[bool, int]:
"""Copy the cache file to the destination and return the result plus file size"""
try:
shutil.copy2(cache_path, dest)
try:
os.utime(cache_path, None)
except Exception as e:
logger.debug(f"Failed to update cache access time: {e}")
file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
return True, file_size
except Exception as e:
logger.warning(f"Failed to copy from cache: {e}")
return False, 0
def get_or_download(
self,
url: str,
@@ -251,8 +285,24 @@ class MaterialCache:
timeout: int = 300,
max_retries: int = 5
) -> bool:
"""Legacy-compatible wrapper: returns whether the download succeeded."""
result, _ = self.get_or_download_with_metrics(
url=url,
dest=dest,
timeout=timeout,
max_retries=max_retries,
)
return result
def get_or_download_with_metrics(
self,
url: str,
dest: str,
timeout: int = 300,
max_retries: int = 5
) -> Tuple[bool, Dict[str, Any]]:
"""
Fetch material from the cache; download and cache it when missing
Fetch material from the cache; download and cache it when missing, returning key metrics.
Args:
url: material URL
@@ -261,8 +311,14 @@ class MaterialCache:
max_retries: maximum number of retries
Returns:
whether it succeeded
(success flag, metrics dict)
"""
metrics: Dict[str, Any] = {
"lock_wait_ms": 0,
"lock_acquired": False,
"cache_path_used": "unknown",
}
# Ensure the destination directory exists
dest_dir = os.path.dirname(dest)
if dest_dir:
@@ -270,34 +326,49 @@ class MaterialCache:
# Download directly when the cache is disabled
if not self.enabled:
return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
result = storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
metrics["cache_path_used"] = "direct"
return result, metrics
cache_key = _extract_cache_key(url)
lock_path = self._acquire_lock(cache_key)
cache_path = self.get_cache_path(url)
def _try_serve_from_cache(log_prefix: str, delete_on_failure: bool = False) -> bool:
if not self._is_cache_file_ready(cache_path):
return False
copied, file_size = self._copy_cache_to_dest(cache_path, dest)
if copied:
metrics["cache_path_used"] = "cache"
logger.info(f"{log_prefix}: {url[:80]}... -> {dest} ({file_size} bytes)")
return True
if delete_on_failure:
try:
os.remove(cache_path)
except Exception:
pass
return False
if _try_serve_from_cache("Cache hit"):
return True, metrics
lock_path, lock_wait_ms = self._acquire_lock_with_wait(
cache_key,
timeout_sec=self.DOWNLOAD_LOCK_TIMEOUT_SEC,
)
metrics["lock_wait_ms"] = lock_wait_ms
if not lock_path:
if _try_serve_from_cache("Cache hit after lock timeout"):
return True, metrics
logger.warning(f"Cache lock unavailable, downloading without cache: {url[:80]}...")
return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
result = storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
metrics["cache_path_used"] = "direct"
return result, metrics
metrics["lock_acquired"] = True
try:
cache_path = self.get_cache_path(url)
cached = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
if cached:
# Cache hit: copy to the destination path
try:
shutil.copy2(cache_path, dest)
# Update the access time (used for LRU cleanup)
os.utime(cache_path, None)
file_size = os.path.getsize(dest)
logger.info(f"Cache hit: {url[:80]}... -> {dest} ({file_size} bytes)")
return True
except Exception as e:
logger.warning(f"Failed to copy from cache: {e}, will re-download")
# Cache copy failed: delete the possibly corrupted cache file
try:
os.remove(cache_path)
except Exception:
pass
if _try_serve_from_cache("Cache hit", delete_on_failure=True):
return True, metrics
# Cache miss: download into the cache directory
logger.debug(f"Cache miss: {url[:80]}...")
@@ -312,26 +383,25 @@ class MaterialCache:
# Download failed: clean up the temp file
if os.path.exists(temp_cache_path):
os.remove(temp_cache_path)
return False
return False, metrics
if not os.path.exists(temp_cache_path) or os.path.getsize(temp_cache_path) <= 0:
if os.path.exists(temp_cache_path):
os.remove(temp_cache_path)
return False
return False, metrics
# Download succeeded: atomically replace the cache file
os.replace(temp_cache_path, cache_path)
# Copy to the destination path
shutil.copy2(cache_path, dest)
file_size = os.path.getsize(dest)
logger.info(f"Downloaded and cached: {url[:80]}... ({file_size} bytes)")
if not _try_serve_from_cache("Downloaded and cached", delete_on_failure=False):
return False, metrics
# Check whether cache cleanup is needed
if self.max_size_bytes > 0:
self._cleanup_if_needed()
return True
return True, metrics
except Exception as e:
logger.error(f"Cache download error: {e}")
@@ -341,7 +411,7 @@ class MaterialCache:
os.remove(temp_cache_path)
except Exception:
pass
return False
return False, metrics
finally:
self._release_lock(lock_path)
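The download path above stages into a temp file, atomically publishes it with `os.replace`, then copies to the destination. A condensed sketch of that write-through flow (hypothetical names, a bytes payload standing in for the real download):

```python
import os
import shutil
import tempfile

def cache_then_copy(data: bytes, cache_path: str, dest: str) -> bool:
    """Stage to a temp file, atomically publish to the cache, copy to dest."""
    tmp = cache_path + ".part"
    with open(tmp, "wb") as f:
        f.write(data)
    if os.path.getsize(tmp) <= 0:
        os.remove(tmp)          # reject empty downloads
        return False
    # Atomic publish: concurrent readers never observe a partial cache file.
    os.replace(tmp, cache_path)
    shutil.copy2(cache_path, dest)
    return os.path.getsize(dest) > 0

d = tempfile.mkdtemp()
ok = cache_then_copy(b"payload", os.path.join(d, "c.bin"),
                     os.path.join(d, "out.bin"))
print(ok)  # -> True
```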

View File

@@ -61,11 +61,14 @@ class GPUScheduler:
configured_devices = self._config.gpu_devices
if configured_devices:
# Use devices specified by the configuration
if self._config.hw_accel == HW_ACCEL_QSV:
# QSV uses the Intel iGPU and has no nvidia-smi; initialize from config or the default device
self._devices = self._init_qsv_devices(configured_devices)
elif configured_devices:
# CUDA: use the configured devices and validate them via nvidia-smi
self._devices = self._validate_configured_devices(configured_devices)
else:
# Auto-detect all devices
# CUDA: auto-detect all NVIDIA devices
self._devices = self._auto_detect_devices()
if self._devices:
@@ -75,6 +78,25 @@ class GPUScheduler:
else:
logger.warning("No GPU devices available, scheduler disabled")
def _init_qsv_devices(self, configured_indices: List[int]) -> List[GPUDevice]:
"""
Initialize the QSV device list
QSV uses the Intel iGPU; there is no nvidia-smi available for detection.
If GPU_DEVICES is configured, trust it directly; otherwise fall back to device 0.
Args:
configured_indices: configured device index list (may be empty)
Returns:
list of QSV devices
"""
indices = configured_indices if configured_indices else [0]
return [
GPUDevice(index=idx, name=f"QSV-{idx}", available=True)
for idx in indices
]
def _validate_configured_devices(self, indices: List[int]) -> List[GPUDevice]:
"""
Validate the configured device list

View File

@@ -8,10 +8,13 @@
import logging
import threading
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Optional
if TYPE_CHECKING:
from services.api_client import APIClientV2
from util.tracing import TaskTraceContext
from util.tracing import bind_trace_context, start_span
logger = logging.getLogger(__name__)
@@ -29,7 +32,9 @@ class LeaseService:
api_client: 'APIClientV2',
task_id: str,
interval: int = 60,
extension: int = 300
extension: int = 300,
parent_otel_context: Any = None,
task_trace_context: Optional['TaskTraceContext'] = None,
):
"""
Initialize the lease service
@@ -44,6 +49,8 @@ class LeaseService:
self.task_id = task_id
self.interval = interval
self.extension = extension
self.parent_otel_context = parent_otel_context
self.task_trace_context = task_trace_context
self.running = False
self.thread: threading.Thread = None
self._stop_event = threading.Event()
@@ -79,25 +86,29 @@ class LeaseService:
def _run(self):
"""Main loop of the renewal thread"""
while self.running:
# Wait for the interval or a stop signal
if self._stop_event.wait(timeout=self.interval):
# Stop signal received
break
with bind_trace_context(self.parent_otel_context, self.task_trace_context):
while self.running:
if self._stop_event.wait(timeout=self.interval):
break
if self.running:
self._extend_lease()
if self.running:
self._extend_lease()
def _extend_lease(self):
"""Perform the lease extension"""
try:
success = self.api_client.extend_lease(self.task_id, self.extension)
if success:
logger.debug(f"[task:{self.task_id}] Lease extended by {self.extension}s")
else:
logger.warning(f"[task:{self.task_id}] Failed to extend lease")
except Exception as e:
logger.warning(f"[task:{self.task_id}] Lease extension error: {e}")
with start_span(
"render.task.lease.extend",
task_id=self.task_id,
attributes={"render.lease.extension_seconds": self.extension},
):
try:
success = self.api_client.extend_lease(self.task_id, self.extension)
if success:
logger.debug(f"[task:{self.task_id}] Lease extended by {self.extension}s")
else:
logger.warning(f"[task:{self.task_id}] Failed to extend lease")
except Exception as e:
logger.warning(f"[task:{self.task_id}] Lease extension error: {e}")
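The renewal loop relies on `threading.Event.wait` as an interruptible sleep: `stop()` wakes the thread immediately instead of waiting out the full interval. A standalone sketch of the pattern (class and names illustrative):

```python
import threading
import time

class PeriodicWorker:
    """Periodic background loop where stop() interrupts the sleep at once."""
    def __init__(self, interval: float, action):
        self.interval = interval
        self.action = action
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # Event.wait doubles as an interruptible sleep: returns True once set.
        while not self._stop.wait(timeout=self.interval):
            self.action()

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

ticks = []
w = PeriodicWorker(0.05, lambda: ticks.append(1))
w.start()
time.sleep(0.2)
w.stop()
print(len(ticks) >= 2)  # -> True
```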
def __enter__(self):
"""Context manager entry"""

View File

@@ -8,7 +8,7 @@
import os
import logging
import subprocess
from typing import Optional
from typing import Any, Dict, Optional, Tuple
from urllib.parse import unquote
import requests
@@ -65,6 +65,22 @@ def _apply_http_replace_map(url: str) -> str:
def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 60) -> bool:
"""Legacy-compatible wrapper: returns only whether the upload succeeded."""
result, _ = upload_file_with_metrics(
url=url,
file_path=file_path,
max_retries=max_retries,
timeout=timeout,
)
return result
def upload_file_with_metrics(
url: str,
file_path: str,
max_retries: int = 5,
timeout: int = 60
) -> Tuple[bool, Dict[str, Any]]:
"""
Upload a file to OSS using a signed URL
@@ -75,30 +91,54 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
timeout: timeout in seconds
Returns:
whether it succeeded
(success flag, upload metrics)
"""
metrics: Dict[str, Any] = {
"upload_method": "none",
"file_size_bytes": 0,
"content_type": "",
"http_attempts": 0,
"http_retry_count": 0,
"http_status_code": 0,
"http_replace_applied": False,
"rclone_attempted": False,
"rclone_succeeded": False,
"rclone_fallback_http": False,
"error_type": "",
}
if not os.path.exists(file_path):
logger.error(f"File not found: {file_path}")
return False
metrics["error_type"] = "file_not_found"
return False, metrics
file_size = os.path.getsize(file_path)
metrics["file_size_bytes"] = file_size
logger.info(f"Uploading: {file_path} ({file_size} bytes)")
# Check whether to upload via rclone
if os.getenv("UPLOAD_METHOD") == "rclone":
logger.info(f"Uploading to: {url}")
metrics["rclone_attempted"] = True
logger.debug(f"Uploading to: {url}")
result = _upload_with_rclone(url, file_path)
metrics["rclone_succeeded"] = result
if result:
return True
metrics["upload_method"] = "rclone"
return True, metrics
# Fall back to HTTP when rclone fails
metrics["rclone_fallback_http"] = True
# Apply HTTP_REPLACE_MAP to rewrite the URL
http_url = _apply_http_replace_map(url)
metrics["http_replace_applied"] = http_url != url
content_type = _get_content_type(file_path)
logger.info(f"Uploading to: {http_url} (Content-Type: {content_type})")
metrics["content_type"] = content_type
metrics["upload_method"] = "rclone_fallback_http" if metrics["rclone_fallback_http"] else "http"
logger.debug(f"Uploading to: {http_url} (Content-Type: {content_type})")
retries = 0
while retries < max_retries:
metrics["http_attempts"] = retries + 1
try:
with open(file_path, 'rb') as f:
with requests.put(
@@ -108,19 +148,30 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
timeout=timeout,
headers={"Content-Type": content_type}
) as response:
status_code = int(getattr(response, 'status_code', 0) or 0)
metrics["http_status_code"] = status_code
response.raise_for_status()
logger.info(f"Upload succeeded: {file_path}")
return True
metrics["error_type"] = ""
return True, metrics
except requests.exceptions.Timeout:
retries += 1
metrics["http_retry_count"] = retries
metrics["error_type"] = "timeout"
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
except requests.exceptions.RequestException as e:
retries += 1
metrics["http_retry_count"] = retries
metrics["error_type"] = "request_exception"
response_obj = getattr(e, 'response', None)
status_code = getattr(response_obj, 'status_code', 0) if response_obj is not None else 0
if isinstance(status_code, int) and status_code > 0:
metrics["http_status_code"] = status_code
logger.warning(f"Upload failed ({e}). Retrying {retries}/{max_retries}...")
logger.error(f"Upload failed after {max_retries} retries: {file_path}")
return False
return False, metrics
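The retry loop and the metrics fields it fills in can be distilled into a small sketch; `put` stands in for the real HTTP PUT, and every name here is illustrative rather than the project's API.

```python
def upload_with_retry(put, data, max_retries: int = 3):
    """Retry wrapper that records attempt/status metrics (sketch)."""
    metrics = {"http_attempts": 0, "http_retry_count": 0,
               "http_status_code": 0, "error_type": ""}
    retries = 0
    while retries < max_retries:
        metrics["http_attempts"] = retries + 1
        try:
            status = put(data)                 # stand-in for requests.put(...)
            metrics["http_status_code"] = status
            if status < 400:
                metrics["error_type"] = ""
                return True, metrics
            raise RuntimeError(f"HTTP {status}")
        except Exception:
            retries += 1
            metrics["http_retry_count"] = retries
            metrics["error_type"] = "request_exception"
    return False, metrics

# Fails twice with 500, succeeds on the third attempt.
codes = iter([500, 500, 200])
ok, m = upload_with_retry(lambda _: next(codes), b"x")
print(ok, m["http_attempts"], m["http_retry_count"])  # -> True 3 2
```

Note that `http_attempts` counts every try while `http_retry_count` counts only the failed ones, matching the distinction in the metrics dict above.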
def _upload_with_rclone(url: str, file_path: str) -> bool:
@@ -151,6 +202,11 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
if new_url == url:
return False
if new_url.startswith(("http://", "https://")):
logger.warning("rclone upload skipped: URL still starts with http after replace")
logger.debug(f"rclone upload skipped address: {new_url}")
return False
cmd = [
"rclone",
"copyto",
@@ -203,7 +259,7 @@ def download_file(
logger.debug(f"File exists, skipping download: {file_path}")
return True
logger.info(f"Downloading: {url}")
logger.debug(f"Downloading: {url}")
# Ensure the destination directory exists
file_dir = os.path.dirname(file_path)
@@ -235,5 +291,6 @@ def download_file(
retries += 1
logger.warning(f"Download failed ({e}). Retrying {retries}/{max_retries}...")
logger.error(f"Download failed after {max_retries} retries: {url}")
logger.error(f"Download failed after {max_retries} retries")
logger.debug(f"Download failed source address: {url}")
return False

View File

@@ -11,17 +11,23 @@ from concurrent.futures import ThreadPoolExecutor, Future
from typing import Dict, Optional, TYPE_CHECKING
from domain.task import Task, TaskType
from domain.result import TaskResult, ErrorCode
# Task types that require GPU acceleration
GPU_REQUIRED_TASK_TYPES = {
TaskType.RENDER_SEGMENT_VIDEO,
TaskType.RENDER_SEGMENT_TS,
TaskType.COMPOSE_TRANSITION,
}
from domain.config import WorkerConfig
from core.handler import TaskHandler
from services.lease_service import LeaseService
from services.gpu_scheduler import GPUScheduler
from util.tracing import (
capture_otel_context,
get_current_task_context,
mark_span_error,
start_span,
task_trace_scope,
)
if TYPE_CHECKING:
from services.api_client import APIClientV2
@@ -79,17 +85,15 @@ class TaskExecutor:
def _register_handlers(self):
"""注册所有任务处理器"""
# Lazy imports to avoid circular dependencies
from handlers.render_video import RenderSegmentVideoHandler
from handlers.render_video import RenderSegmentTsHandler
from handlers.compose_transition import ComposeTransitionHandler
from handlers.prepare_audio import PrepareJobAudioHandler
from handlers.package_ts import PackageSegmentTsHandler
from handlers.finalize_mp4 import FinalizeMp4Handler
handlers = [
RenderSegmentVideoHandler(self.config, self.api_client),
RenderSegmentTsHandler(self.config, self.api_client),
ComposeTransitionHandler(self.config, self.api_client),
PrepareJobAudioHandler(self.config, self.api_client),
PackageSegmentTsHandler(self.config, self.api_client),
FinalizeMp4Handler(self.config, self.api_client),
]
@@ -174,77 +178,84 @@ class TaskExecutor:
task: the task entity
"""
task_id = task.task_id
logger.info(f"[task:{task_id}] Starting {task.task_type.value}")
# Start the lease renewal service
lease_service = LeaseService(
self.api_client,
task_id,
interval=self.config.lease_extension_threshold,
extension=self.config.lease_extension_duration
)
lease_service.start()
# Acquire a GPU device (only for task types that need one)
device_index = None
needs_gpu = task.task_type in GPU_REQUIRED_TASK_TYPES
if needs_gpu and self.gpu_scheduler.enabled:
device_index = self.gpu_scheduler.acquire()
if device_index is not None:
logger.info(f"[task:{task_id}] Assigned to GPU device {device_index}")
# Get the handler (must happen before assigning the GPU device)
handler = self.handlers.get(task.task_type)
device_index = None
lease_service = None
try:
# Report task start
self.api_client.report_start(task_id)
with task_trace_scope(task, span_name="render.task.execute") as task_span:
logger.info(f"[task:{task_id}] Starting {task.task_type.value}")
if not handler:
raise ValueError(f"No handler for task type: {task.task_type}")
lease_service = LeaseService(
self.api_client,
task_id,
interval=self.config.lease_extension_threshold,
extension=self.config.lease_extension_duration,
parent_otel_context=capture_otel_context(),
task_trace_context=get_current_task_context(),
)
with start_span("render.task.lease.start"):
lease_service.start()
# Set the GPU device (thread-local storage)
if device_index is not None:
handler.set_gpu_device(device_index)
needs_gpu = task.task_type in GPU_REQUIRED_TASK_TYPES
if needs_gpu and self.gpu_scheduler.enabled:
with start_span("render.task.gpu.acquire"):
device_index = self.gpu_scheduler.acquire()
if device_index is not None:
logger.info(f"[task:{task_id}] Assigned to GPU device {device_index}")
# Pre-execution hook
handler.before_handle(task)
try:
with start_span("render.task.report.start"):
self.api_client.report_start(task_id)
# Execute the task
result = handler.handle(task)
if not handler:
raise ValueError(f"No handler for task type: {task.task_type}")
# Post-execution hook
handler.after_handle(task, result)
if device_index is not None:
handler.set_gpu_device(device_index)
# Report the result
if result.success:
self.api_client.report_success(task_id, result.data)
logger.info(f"[task:{task_id}] Completed successfully")
else:
error_code = result.error_code.value if result.error_code else 'E_UNKNOWN'
self.api_client.report_fail(task_id, error_code, result.error_message or '')
logger.error(f"[task:{task_id}] Failed: {result.error_message}")
with start_span("render.task.handler.before"):
handler.before_handle(task)
except Exception as e:
logger.error(f"[task:{task_id}] Exception: {e}", exc_info=True)
self.api_client.report_fail(task_id, 'E_UNKNOWN', str(e))
with start_span("render.task.handler.execute"):
result = handler.handle(task)
finally:
# Clear the GPU device setting
if handler:
handler.clear_gpu_device()
with start_span("render.task.handler.after"):
handler.after_handle(task, result)
# Release the GPU device (only when one was actually assigned)
if device_index is not None:
self.gpu_scheduler.release(device_index)
if result.success:
with start_span("render.task.report.success"):
self.api_client.report_success(task_id, result.data)
if task_span is not None:
task_span.set_attribute("render.task.result", "success")
logger.info(f"[task:{task_id}] Completed successfully")
else:
error_code = result.error_code.value if result.error_code else 'E_UNKNOWN'
with start_span("render.task.report.fail"):
self.api_client.report_fail(task_id, error_code, result.error_message or '')
mark_span_error(task_span, result.error_message or "task failed", error_code)
logger.error(f"[task:{task_id}] Failed: {result.error_message}")
# Stop lease renewal
lease_service.stop()
except Exception as e:
mark_span_error(task_span, str(e), "E_UNKNOWN")
logger.error(f"[task:{task_id}] Exception: {e}", exc_info=True)
with start_span("render.task.report.exception"):
self.api_client.report_fail(task_id, 'E_UNKNOWN', str(e))
# Remove from current tasks
with self.lock:
self.current_tasks.pop(task_id, None)
self.current_futures.pop(task_id, None)
finally:
if handler:
handler.clear_gpu_device()
if device_index is not None:
with start_span("render.task.gpu.release"):
self.gpu_scheduler.release(device_index)
if lease_service is not None:
with start_span("render.task.lease.stop"):
lease_service.stop()
with self.lock:
self.current_tasks.pop(task_id, None)
self.current_futures.pop(task_id, None)
def shutdown(self, wait: bool = True):
"""

View File

@@ -0,0 +1,238 @@
# -*- coding: utf-8 -*-
import os
from contextlib import contextmanager
from types import SimpleNamespace
import pytest
from domain.config import WorkerConfig
from domain.result import TaskResult
from domain.task import TaskType
from handlers.base import BaseHandler
class _DummyApiClient:
pass
class _DummyHandler(BaseHandler):
def handle(self, task):
return TaskResult.ok({})
def get_supported_type(self):
return TaskType.RENDER_SEGMENT_TS
def _create_handler(tmp_path):
config = WorkerConfig(
api_endpoint='http://127.0.0.1:18084/api',
access_key='TEST_ACCESS_KEY',
worker_id='test-worker',
temp_dir=str(tmp_path),
cache_enabled=False,
cache_dir=str(tmp_path / 'cache')
)
return _DummyHandler(config, _DummyApiClient())
def test_download_files_parallel_collects_success_and_failure(tmp_path, monkeypatch):
handler = _create_handler(tmp_path)
handler.task_download_concurrency = 3
captured_calls = []
def _fake_download(url, dest, timeout=None, use_cache=True):
captured_calls.append((url, dest, timeout, use_cache))
os.makedirs(os.path.dirname(dest), exist_ok=True)
with open(dest, 'wb') as file_obj:
file_obj.write(b'1')
return not url.endswith('/fail')
monkeypatch.setattr(handler, 'download_file', _fake_download)
results = handler.download_files_parallel(
[
{'key': 'first', 'url': 'https://example.com/first', 'dest': str(tmp_path / 'first.bin')},
{'key': 'second', 'url': 'https://example.com/fail', 'dest': str(tmp_path / 'second.bin')},
{'key': 'third', 'url': 'https://example.com/third', 'dest': str(tmp_path / 'third.bin'), 'use_cache': False},
],
timeout=15,
)
assert len(captured_calls) == 3
assert results['first']['success'] is True
assert results['second']['success'] is False
assert results['third']['success'] is True
assert any(call_item[3] is False for call_item in captured_calls)
def test_download_files_parallel_rejects_duplicate_key(tmp_path):
handler = _create_handler(tmp_path)
with pytest.raises(ValueError, match='Duplicate download job key'):
handler.download_files_parallel(
[
{'key': 'dup', 'url': 'https://example.com/1', 'dest': str(tmp_path / '1.bin')},
{'key': 'dup', 'url': 'https://example.com/2', 'dest': str(tmp_path / '2.bin')},
]
)
def test_upload_files_parallel_collects_urls(tmp_path, monkeypatch):
handler = _create_handler(tmp_path)
handler.task_upload_concurrency = 2
def _fake_upload(task_id, file_type, file_path, file_name=None):
if file_type == 'video':
return f'https://cdn.example.com/{task_id}/{file_name or "video.mp4"}'
return None
monkeypatch.setattr(handler, 'upload_file', _fake_upload)
results = handler.upload_files_parallel(
[
{
'key': 'video_output',
'task_id': 'task-1',
'file_type': 'video',
'file_path': str(tmp_path / 'video.mp4'),
'file_name': 'output.mp4',
},
{
'key': 'audio_output',
'task_id': 'task-1',
'file_type': 'audio',
'file_path': str(tmp_path / 'audio.aac'),
},
]
)
assert results['video_output']['success'] is True
assert results['video_output']['url'] == 'https://cdn.example.com/task-1/output.mp4'
assert results['audio_output']['success'] is False
assert results['audio_output']['url'] is None
def test_download_file_sets_lock_wait_ms_span_attribute(tmp_path, monkeypatch):
handler = _create_handler(tmp_path)
destination = tmp_path / "download.bin"
class _FakeSpan:
def __init__(self):
self.attributes = {}
def set_attribute(self, key, value):
self.attributes[key] = value
fake_span = _FakeSpan()
@contextmanager
def _fake_start_span(name, kind=None, attributes=None):
if attributes:
fake_span.attributes.update(attributes)
yield fake_span
def _fake_get_or_download_with_metrics(url, dest, timeout=300, max_retries=5):
os.makedirs(os.path.dirname(dest), exist_ok=True)
with open(dest, 'wb') as file_obj:
file_obj.write(b'abc')
return True, {"lock_wait_ms": 1234, "lock_acquired": True, "cache_path_used": "cache"}
monkeypatch.setattr("handlers.base.start_span", _fake_start_span)
monkeypatch.setattr(
handler.material_cache,
"get_or_download_with_metrics",
_fake_get_or_download_with_metrics
)
assert handler.download_file("https://example.com/file.bin", str(destination), timeout=1, use_cache=True)
assert fake_span.attributes["render.file.lock_wait_ms"] == 1234
assert fake_span.attributes["render.file.lock_acquired"] is True
assert fake_span.attributes["render.file.cache_path_used"] == "cache"
def test_download_file_without_cache_sets_lock_wait_ms_zero(tmp_path, monkeypatch):
handler = _create_handler(tmp_path)
destination = tmp_path / "download-no-cache.bin"
class _FakeSpan:
def __init__(self):
self.attributes = {}
def set_attribute(self, key, value):
self.attributes[key] = value
fake_span = _FakeSpan()
@contextmanager
def _fake_start_span(name, kind=None, attributes=None):
if attributes:
fake_span.attributes.update(attributes)
yield fake_span
def _fake_storage_download(url, dest, timeout=30):
os.makedirs(os.path.dirname(dest), exist_ok=True)
with open(dest, 'wb') as file_obj:
file_obj.write(b'def')
return True
monkeypatch.setattr("handlers.base.start_span", _fake_start_span)
monkeypatch.setattr("handlers.base.storage.download_file", _fake_storage_download)
assert handler.download_file("https://example.com/file.bin", str(destination), timeout=1, use_cache=False)
assert fake_span.attributes["render.file.lock_wait_ms"] == 0
assert fake_span.attributes["render.file.lock_acquired"] is False
assert fake_span.attributes["render.file.cache_path_used"] == "direct"
def test_upload_file_sets_detailed_span_attributes(tmp_path, monkeypatch):
handler = _create_handler(tmp_path)
source_path = tmp_path / "upload.mp4"
source_path.write_bytes(b"abc123")
fake_span_attributes = {}
class _FakeSpan:
def set_attribute(self, key, value):
fake_span_attributes[key] = value
@contextmanager
def _fake_start_span(name, kind=None, attributes=None):
if attributes:
fake_span_attributes.update(attributes)
yield _FakeSpan()
handler.api_client = SimpleNamespace(
get_upload_url=lambda *args, **kwargs: {
"uploadUrl": "https://example.com/upload",
"accessUrl": "https://cdn.example.com/output.mp4",
}
)
monkeypatch.setattr("handlers.base.start_span", _fake_start_span)
monkeypatch.setattr(
"handlers.base.storage.upload_file_with_metrics",
lambda *args, **kwargs: (
True,
{
"upload_method": "http",
"http_attempts": 2,
"http_retry_count": 1,
"http_status_code": 200,
"http_replace_applied": True,
"content_type": "video/mp4",
"error_type": "",
"rclone_attempted": False,
"rclone_succeeded": False,
"rclone_fallback_http": False,
},
)
)
monkeypatch.setattr(handler.material_cache, "add_to_cache", lambda *args, **kwargs: True)
access_url = handler.upload_file("task-1", "video", str(source_path), "output.mp4")
assert access_url == "https://cdn.example.com/output.mp4"
assert fake_span_attributes["render.file.upload_success"] is True
assert fake_span_attributes["render.file.upload_method"] == "http"
assert fake_span_attributes["render.file.http_attempts"] == 2
assert fake_span_attributes["render.file.http_retry_count"] == 1
assert fake_span_attributes["render.file.http_status_code"] == 200
assert fake_span_attributes["render.file.http_replace_applied"] is True
assert fake_span_attributes["render.file.content_type"] == "video/mp4"
assert fake_span_attributes["render.file.cache_write_back"] == "success"
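The `render.file.*` attribute names asserted above suggest a small helper that namespaces the upload metrics onto the span. A minimal sketch (the helper name `set_file_span_attributes` is hypothetical, not taken from the handler code):

```python
def set_file_span_attributes(span, metrics, prefix="render.file."):
    # Hypothetical helper: copy every upload metric onto the span
    # under a common "render.file." namespace, as the test expects.
    for key, value in metrics.items():
        span.set_attribute(prefix + key, value)
```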


@@ -13,3 +13,89 @@ def test_cache_lock_acquire_release(tmp_path):
assert os.path.exists(lock_path)
cache._release_lock(lock_path)
assert not os.path.exists(lock_path)
def test_get_or_download_cache_hit_does_not_wait_lock(tmp_path, monkeypatch):
cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
url = "https://example.com/path/video.mp4?token=abc"
cache_path = cache.get_cache_path(url)
with open(cache_path, 'wb') as file_obj:
file_obj.write(b'cached-data')
destination = tmp_path / "result.bin"
def _unexpected_acquire(*args, **kwargs):
raise AssertionError("cache hit path should not acquire lock")
monkeypatch.setattr(cache, "_acquire_lock", _unexpected_acquire)
assert cache.get_or_download(url, str(destination), timeout=1) is True
assert destination.read_bytes() == b'cached-data'
def test_get_or_download_lock_timeout_can_still_use_ready_cache(tmp_path, monkeypatch):
cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
url = "https://example.com/path/audio.aac?token=abc"
cache_path = cache.get_cache_path(url)
with open(cache_path, 'wb') as file_obj:
file_obj.write(b'audio-cache')
destination = tmp_path / "audio.aac"
download_called = {"value": False}
monkeypatch.setattr(cache, "_acquire_lock", lambda *args, **kwargs: None)
def _fake_download(*args, **kwargs):
download_called["value"] = True
return False
monkeypatch.setattr("services.cache.storage.download_file", _fake_download)
assert cache.get_or_download(url, str(destination), timeout=1) is True
assert destination.read_bytes() == b'audio-cache'
assert download_called["value"] is False
def test_get_or_download_uses_short_lock_timeout(tmp_path, monkeypatch):
cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
url = "https://example.com/path/segment.ts?token=abc"
destination = tmp_path / "segment.ts"
captured = {"timeout_sec": None}
def _fake_acquire(cache_key, timeout_sec=None):
captured["timeout_sec"] = timeout_sec
return None
monkeypatch.setattr(cache, "_acquire_lock", _fake_acquire)
monkeypatch.setattr("services.cache.storage.download_file", lambda *args, **kwargs: True)
assert cache.get_or_download(url, str(destination), timeout=1) is True
assert captured["timeout_sec"] == cache.DOWNLOAD_LOCK_TIMEOUT_SEC
def test_get_or_download_with_metrics_cache_hit_wait_zero(tmp_path):
cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
url = "https://example.com/path/hit.mp4?token=abc"
cache_path = cache.get_cache_path(url)
with open(cache_path, 'wb') as file_obj:
file_obj.write(b'hit-data')
destination = tmp_path / "hit.mp4"
success, metrics = cache.get_or_download_with_metrics(url, str(destination), timeout=1)
assert success is True
assert metrics["lock_wait_ms"] == 0
assert metrics["lock_acquired"] is False
assert metrics["cache_path_used"] == "cache"
def test_get_or_download_with_metrics_reports_lock_wait_ms(tmp_path, monkeypatch):
cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
url = "https://example.com/path/miss.mp4?token=abc"
destination = tmp_path / "miss.mp4"
monkeypatch.setattr(cache, "_acquire_lock_with_wait", lambda *args, **kwargs: (None, 4321))
monkeypatch.setattr("services.cache.storage.download_file", lambda *args, **kwargs: True)
success, metrics = cache.get_or_download_with_metrics(url, str(destination), timeout=1)
assert success is True
assert metrics["lock_wait_ms"] == 4321
assert metrics["lock_acquired"] is False
assert metrics["cache_path_used"] == "direct"

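The `lock_wait_ms` / `lock_acquired` / `cache_path_used` fields asserted above form a small metrics contract: a cache hit copies the file without ever touching the download lock, while a miss downloads to the destination ("direct" path). A stdlib-only sketch of that contract (function name and control flow are illustrative, not the real `MaterialCache` implementation):

```python
import os
import shutil

def get_or_download_with_metrics_sketch(cache_path, dest_path, download):
    """Hypothetical mirror of the metrics contract the tests above assert."""
    metrics = {"lock_wait_ms": 0, "lock_acquired": False, "cache_path_used": "direct"}
    if os.path.exists(cache_path):
        # Cache hit: copy straight from the cache, no lock acquisition.
        shutil.copyfile(cache_path, dest_path)
        metrics["cache_path_used"] = "cache"
        return True, metrics
    # Cache miss: download directly to the destination.
    return download(dest_path), metrics
```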

@@ -0,0 +1,235 @@
# -*- coding: utf-8 -*-
import pytest
from domain.config import WorkerConfig
from domain.task import Effect, OutputSpec, RenderSpec
from handlers.render_video import RenderSegmentTsHandler
class _DummyApiClient:
pass
def _create_handler(tmp_path):
config = WorkerConfig(
api_endpoint='http://127.0.0.1:18084/api',
access_key='TEST_ACCESS_KEY',
worker_id='test-worker',
temp_dir=str(tmp_path),
cache_enabled=False,
cache_dir=str(tmp_path / 'cache')
)
return RenderSegmentTsHandler(config, _DummyApiClient())
def test_get_zoom_params_with_valid_values():
effect = Effect.from_string('zoom:1.5,1.35,2')
assert effect is not None
start_sec, scale_factor, duration_sec = effect.get_zoom_params()
assert start_sec == pytest.approx(1.5)
assert scale_factor == pytest.approx(1.35)
assert duration_sec == pytest.approx(2.0)
@pytest.mark.parametrize(
'effect_str',
[
'zoom:-1,0.9,-2',
'zoom:nan,inf,0',
'zoom:bad,value,data',
'zoom:,,',
],
)
def test_get_zoom_params_invalid_values_fallback_to_default(effect_str):
effect = Effect.from_string(effect_str)
assert effect is not None
assert effect.get_zoom_params() == (0.0, 1.2, 1.0)
def test_build_command_with_zoom_uses_filter_complex(tmp_path):
handler = _create_handler(tmp_path)
render_spec = RenderSpec(effects='zoom:1.5,1.4,2')
output_spec = OutputSpec(width=1080, height=1920, fps=30)
command = handler._build_command(
input_file='input.mp4',
output_file='output.mp4',
render_spec=render_spec,
output_spec=output_spec,
duration_ms=6000,
)
assert '-filter_complex' in command
assert '-vf' not in command
def test_build_video_filters_zoom_and_camera_shot_stack_in_order(tmp_path):
handler = _create_handler(tmp_path)
render_spec = RenderSpec(effects='cameraShot:3,1|zoom:1,1.2,2')
output_spec = OutputSpec(width=1080, height=1920, fps=30)
filters = handler._build_video_filters(
render_spec=render_spec,
output_spec=output_spec,
duration_ms=8000,
source_duration_sec=10.0,
)
camera_shot_marker = 'concat=n=2:v=1:a=0'
zoom_marker = "overlay=0:0:enable='between(t,1.0,3.0)'"
assert camera_shot_marker in filters
assert zoom_marker in filters
assert filters.index(camera_shot_marker) < filters.index(zoom_marker)
# ---------- ospeed tests ----------
def test_get_ospeed_params_with_valid_values():
effect = Effect.from_string('ospeed:2')
assert effect is not None
assert effect.get_ospeed_params() == pytest.approx(2.0)
effect2 = Effect.from_string('ospeed:0.5')
assert effect2 is not None
assert effect2.get_ospeed_params() == pytest.approx(0.5)
@pytest.mark.parametrize(
'effect_str',
[
'ospeed:0',
'ospeed:-1',
'ospeed:nan',
'ospeed:inf',
'ospeed:abc',
'ospeed:',
],
)
def test_get_ospeed_params_invalid_values_fallback(effect_str):
effect = Effect.from_string(effect_str)
assert effect is not None
assert effect.get_ospeed_params() == 1.0
def test_get_ospeed_params_no_params():
effect = Effect.from_string('ospeed')
assert effect is not None
assert effect.get_ospeed_params() == 1.0
def test_ospeed_does_not_trigger_filter_complex(tmp_path):
handler = _create_handler(tmp_path)
render_spec = RenderSpec(effects='ospeed:2')
output_spec = OutputSpec(width=1080, height=1920, fps=30)
command = handler._build_command(
input_file='input.mp4',
output_file='output.mp4',
render_spec=render_spec,
output_spec=output_spec,
duration_ms=6000,
)
assert '-vf' in command
assert '-filter_complex' not in command
# Verify the setpts filter
vf_idx = command.index('-vf')
vf_value = command[vf_idx + 1]
assert 'setpts=2.0*(PTS-STARTPTS)' in vf_value
def test_default_speed_always_normalizes_pts(tmp_path):
handler = _create_handler(tmp_path)
render_spec = RenderSpec()
output_spec = OutputSpec(width=1080, height=1920, fps=30)
filters = handler._build_video_filters(
render_spec=render_spec,
output_spec=output_spec,
duration_ms=6000,
source_duration_sec=10.0,
)
assert 'setpts=PTS-STARTPTS' in filters
def test_ospeed_combined_with_speed(tmp_path):
handler = _create_handler(tmp_path)
# speed=2 → 1/2=0.5, ospeed=3 → 0.5*3=1.5
render_spec = RenderSpec(speed='2', effects='ospeed:3')
output_spec = OutputSpec(width=1080, height=1920, fps=30)
filters = handler._build_video_filters(
render_spec=render_spec,
output_spec=output_spec,
duration_ms=6000,
source_duration_sec=10.0,
)
assert 'setpts=1.5*(PTS-STARTPTS)' in filters
def test_ospeed_with_complex_effects(tmp_path):
handler = _create_handler(tmp_path)
render_spec = RenderSpec(effects='ospeed:2|zoom:1,1.2,2')
output_spec = OutputSpec(width=1080, height=1920, fps=30)
filters = handler._build_video_filters(
render_spec=render_spec,
output_spec=output_spec,
duration_ms=8000,
source_duration_sec=10.0,
)
# zoom triggers filter_complex
assert '[v_base]' in filters
# setpts lives in the base chain
assert 'setpts=2.0*(PTS-STARTPTS)' in filters
# zoom is still applied as usual
assert "overlay=0:0:enable='between(t,1.0,3.0)'" in filters
def test_only_first_ospeed_is_used(tmp_path):
handler = _create_handler(tmp_path)
render_spec = RenderSpec(effects='ospeed:2|ospeed:5')
output_spec = OutputSpec(width=1080, height=1920, fps=30)
filters = handler._build_video_filters(
render_spec=render_spec,
output_spec=output_spec,
duration_ms=6000,
source_duration_sec=10.0,
)
assert 'setpts=2.0*(PTS-STARTPTS)' in filters
assert 'setpts=5.0*(PTS-STARTPTS)' not in filters
def test_ospeed_affects_tpad_calculation(tmp_path):
handler = _create_handler(tmp_path)
# ospeed:2 stretches a 10s source to 20s of effective duration; a 6s target needs no tpad
render_spec = RenderSpec(effects='ospeed:2')
output_spec = OutputSpec(width=1080, height=1920, fps=30)
filters = handler._build_video_filters(
render_spec=render_spec,
output_spec=output_spec,
duration_ms=6000,
source_duration_sec=10.0,
)
assert 'tpad' not in filters
# Contrast: without ospeed, a 10s source with a 15s target needs 5s of tpad
render_spec_no_ospeed = RenderSpec()
filters_no_ospeed = handler._build_video_filters(
render_spec=render_spec_no_ospeed,
output_spec=output_spec,
duration_ms=15000,
source_duration_sec=10.0,
)
assert 'tpad' in filters_no_ospeed
assert 'stop_duration=5.0' in filters_no_ospeed


@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
import requests
from services import storage
class _FakeResponse:
def __init__(self, status_code=200):
self.status_code = status_code
def raise_for_status(self):
return None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
return False
def test_upload_file_with_metrics_file_not_found(tmp_path):
missing_path = tmp_path / "missing.mp4"
success, metrics = storage.upload_file_with_metrics(
"https://example.com/upload",
str(missing_path),
max_retries=1,
timeout=1,
)
assert success is False
assert metrics["error_type"] == "file_not_found"
assert metrics["upload_method"] == "none"
def test_upload_file_with_metrics_http_success(tmp_path, monkeypatch):
source_path = tmp_path / "video.mp4"
source_path.write_bytes(b"content")
monkeypatch.setattr("services.storage.requests.put", lambda *args, **kwargs: _FakeResponse(200))
success, metrics = storage.upload_file_with_metrics(
"https://example.com/upload",
str(source_path),
max_retries=3,
timeout=1,
)
assert success is True
assert metrics["upload_method"] == "http"
assert metrics["http_attempts"] == 1
assert metrics["http_retry_count"] == 0
assert metrics["http_status_code"] == 200
assert metrics["content_type"] == "video/mp4"
def test_upload_file_with_metrics_retry_then_success(tmp_path, monkeypatch):
source_path = tmp_path / "audio.aac"
source_path.write_bytes(b"audio-bytes")
call_counter = {"count": 0}
def _fake_put(*args, **kwargs):
call_counter["count"] += 1
if call_counter["count"] == 1:
raise requests.exceptions.Timeout()
return _FakeResponse(200)
monkeypatch.setattr("services.storage.requests.put", _fake_put)
success, metrics = storage.upload_file_with_metrics(
"https://example.com/upload",
str(source_path),
max_retries=3,
timeout=1,
)
assert success is True
assert metrics["http_attempts"] == 2
assert metrics["http_retry_count"] == 1
assert metrics["http_status_code"] == 200
assert metrics["error_type"] == ""

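The `content_type` value asserted above (`video/mp4` for a `.mp4` file) matches what the stdlib `mimetypes` module guesses from the extension; a hedged sketch, assuming the `storage` module derives the upload Content-Type from the filename:

```python
import mimetypes

def guess_content_type(path, default="application/octet-stream"):
    # Derive a Content-Type for the PUT upload from the file extension;
    # fall back to a generic binary type for unknown extensions.
    content_type, _ = mimetypes.guess_type(path)
    return content_type or default
```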

@@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
import importlib
from types import SimpleNamespace
import util.tracing as tracing_module
def _create_task_stub():
task_type = SimpleNamespace(value="RENDER_SEGMENT_TS")
return SimpleNamespace(
task_id="task-1001",
task_type=task_type,
get_job_id=lambda: "job-2002",
get_segment_id=lambda: "seg-3003",
)
def test_task_trace_scope_sets_and_resets_context(monkeypatch):
monkeypatch.setenv("OTEL_ENABLED", "false")
tracing = importlib.reload(tracing_module)
assert tracing.initialize_tracing("worker-1", "2.0.0") is False
assert tracing.get_current_task_context() is None
with tracing.task_trace_scope(_create_task_stub()) as span:
assert span is None
context = tracing.get_current_task_context()
assert context is not None
assert context.task_id == "task-1001"
assert context.task_type == "RENDER_SEGMENT_TS"
assert context.job_id == "job-2002"
assert context.segment_id == "seg-3003"
with tracing.start_span("render.task.sample.step") as child_span:
assert child_span is None
assert tracing.get_current_task_context() is None
def test_bind_trace_context_restores_previous_context(monkeypatch):
monkeypatch.setenv("OTEL_ENABLED", "false")
tracing = importlib.reload(tracing_module)
tracing.initialize_tracing("worker-1", "2.0.0")
context = tracing.TaskTraceContext(task_id="task-1", task_type="FINALIZE_MP4")
assert tracing.get_current_task_context() is None
with tracing.bind_trace_context(None, context):
assert tracing.get_current_task_context() == context
assert tracing.get_current_task_context() is None


@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
from constant import HW_ACCEL_CUDA, HW_ACCEL_NONE, HW_ACCEL_QSV
from handlers.base import get_video_encode_args
def _assert_bframe_disabled(args):
assert '-bf' in args
bf_index = args.index('-bf')
assert args[bf_index + 1] == '0'
def test_get_video_encode_args_disable_b_frames_for_software():
args = get_video_encode_args(HW_ACCEL_NONE)
_assert_bframe_disabled(args)
def test_get_video_encode_args_disable_b_frames_for_qsv():
args = get_video_encode_args(HW_ACCEL_QSV)
_assert_bframe_disabled(args)
def test_get_video_encode_args_disable_b_frames_for_cuda():
args = get_video_encode_args(HW_ACCEL_CUDA)
_assert_bframe_disabled(args)
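The property these tests pin down — `-bf 0` present in every hardware-acceleration path — can be illustrated with a hypothetical arg builder (encoder names and the maxrate/bufsize handling follow the commit notes, not the actual `get_video_encode_args` body):

```python
def encode_args_sketch(hw_accel, maxrate_kbps=None):
    # Hypothetical builder; encoder names mirror common ffmpeg encoders.
    if hw_accel == "cuda":
        args = ["-c:v", "h264_nvenc"]
    elif hw_accel == "qsv":
        args = ["-c:v", "h264_qsv"]
    else:
        args = ["-c:v", "libx264"]
    # B-frames reorder PTS relative to DTS; disabling them keeps timestamps
    # monotonic at TS segment boundaries (the property the tests verify).
    args += ["-bf", "0"]
    if maxrate_kbps:
        # Per the commit notes: bufsize = 1x maxrate for strict rate control.
        args += ["-maxrate", f"{maxrate_kbps}k", "-bufsize", f"{maxrate_kbps}k"]
    return args
```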

util/tracing.py Normal file

@@ -0,0 +1,260 @@
# -*- coding: utf-8 -*-
"""
OTel distributed tracing utilities.
Provides unified tracing initialization, task-context management, and span creation.
"""
import logging
import os
from contextlib import contextmanager, nullcontext
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Any, Dict, Iterator, Mapping, Optional
from opentelemetry import context as otel_context
from opentelemetry import propagate, trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Span, SpanKind, Status, StatusCode
logger = logging.getLogger(__name__)
_DEFAULT_SERVICE_NAME = "RenderWorkerNext"
_DEFAULT_TRACER_NAME = "render.worker"
_OTEL_EXPORTER_OTLP_ENDPOINT = "https://oltp.jerryyan.top/v1/traces"
_TASK_ID_ATTR = "render.task.id"
_TASK_TYPE_ATTR = "render.task.type"
_JOB_ID_ATTR = "render.job.id"
_SEGMENT_ID_ATTR = "render.segment.id"
_ERROR_CODE_ATTR = "render.error.code"
_ERROR_MESSAGE_ATTR = "render.error.message"
_TRUE_VALUES = {"1", "true", "yes", "on"}
_TRACING_INITIALIZED = False
_TRACING_ENABLED = False
_TRACER_PROVIDER: Optional[TracerProvider] = None
_CURRENT_TASK_CONTEXT: ContextVar[Optional["TaskTraceContext"]] = ContextVar(
"render_worker_task_trace_context",
default=None,
)
@dataclass(frozen=True)
class TaskTraceContext:
"""Per-task tracing context."""
task_id: str
task_type: str
job_id: str = ""
segment_id: str = ""
def to_attributes(self) -> Dict[str, str]:
attributes = {
_TASK_ID_ATTR: self.task_id,
_TASK_TYPE_ATTR: self.task_type,
}
if self.job_id:
attributes[_JOB_ID_ATTR] = self.job_id
if self.segment_id:
attributes[_SEGMENT_ID_ATTR] = self.segment_id
return attributes
def _parse_bool(value: Optional[str], default: bool) -> bool:
if value is None:
return default
return value.strip().lower() in _TRUE_VALUES
def is_tracing_enabled() -> bool:
return _TRACING_ENABLED
def initialize_tracing(worker_id: str, service_version: str) -> bool:
"""Initialize OTel tracing."""
global _TRACING_INITIALIZED
global _TRACING_ENABLED
global _TRACER_PROVIDER
if _TRACING_INITIALIZED:
return _TRACING_ENABLED
_TRACING_INITIALIZED = True
if not _parse_bool(os.getenv("OTEL_ENABLED"), default=True):
logger.info("OTel tracing disabled by OTEL_ENABLED")
_TRACING_ENABLED = False
return False
service_name = _DEFAULT_SERVICE_NAME
attributes: Dict[str, str] = {
SERVICE_NAME: service_name,
SERVICE_VERSION: service_version,
"render.worker.id": str(worker_id),
}
resource = Resource.create(attributes)
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(
BatchSpanProcessor(
OTLPSpanExporter(endpoint=_OTEL_EXPORTER_OTLP_ENDPOINT)
)
)
trace.set_tracer_provider(tracer_provider)
_TRACING_ENABLED = True
if trace.get_tracer_provider() is tracer_provider:
_TRACER_PROVIDER = tracer_provider
logger.info("OTel tracing initialized (service=%s, worker=%s)", service_name, worker_id)
return True
def shutdown_tracing() -> None:
"""Gracefully shut down the tracing provider, flushing any remaining spans."""
global _TRACING_ENABLED
if not _TRACING_ENABLED:
return
provider = _TRACER_PROVIDER
if provider is not None:
try:
provider.shutdown()
except Exception as exc:
logger.warning("Failed to shutdown tracing provider: %s", exc)
_TRACING_ENABLED = False
def build_task_trace_context(task: Any) -> TaskTraceContext:
task_id = str(getattr(task, "task_id", ""))
task_type_obj = getattr(task, "task_type", "")
task_type = str(getattr(task_type_obj, "value", task_type_obj))
job_id = ""
if hasattr(task, "get_job_id"):
job_id = str(task.get_job_id() or "")
segment_id = ""
if hasattr(task, "get_segment_id"):
segment_value = task.get_segment_id()
segment_id = str(segment_value) if segment_value is not None else ""
return TaskTraceContext(
task_id=task_id,
task_type=task_type,
job_id=job_id,
segment_id=segment_id,
)
def get_current_task_context() -> Optional[TaskTraceContext]:
return _CURRENT_TASK_CONTEXT.get()
def capture_otel_context() -> Any:
return otel_context.get_current()
@contextmanager
def bind_trace_context(parent_otel_context: Any, task_context: Optional[TaskTraceContext]) -> Iterator[None]:
"""
Bind the parent OTel context and the task context on the current thread.
Used to continue a task's trace across threads (e.g. the lease-renewal thread).
"""
otel_token = None
task_token = None
if parent_otel_context is not None:
otel_token = otel_context.attach(parent_otel_context)
if task_context is not None:
task_token = _CURRENT_TASK_CONTEXT.set(task_context)
try:
yield
finally:
if task_token is not None:
_CURRENT_TASK_CONTEXT.reset(task_token)
if otel_token is not None:
otel_context.detach(otel_token)
@contextmanager
def task_trace_scope(task: Any, span_name: str = "render.task.process") -> Iterator[Optional[Span]]:
"""Create the task root span and bind the task context."""
task_context = build_task_trace_context(task)
task_token = _CURRENT_TASK_CONTEXT.set(task_context)
span_cm = nullcontext(None)
if _TRACING_ENABLED:
tracer = trace.get_tracer(_DEFAULT_TRACER_NAME)
span_cm = tracer.start_as_current_span(span_name, kind=SpanKind.CONSUMER)
try:
with span_cm as span:
if span is not None:
for key, value in task_context.to_attributes().items():
span.set_attribute(key, value)
yield span
finally:
_CURRENT_TASK_CONTEXT.reset(task_token)
@contextmanager
def start_span(
name: str,
*,
attributes: Optional[Mapping[str, Any]] = None,
kind: SpanKind = SpanKind.INTERNAL,
task_id: Optional[str] = None,
) -> Iterator[Optional[Span]]:
"""
Create a child span within the current task.
Returns a null context when tracing is disabled, or when there is no task
context and no explicit task_id was given.
"""
task_context = get_current_task_context()
should_trace = _TRACING_ENABLED and (task_context is not None or bool(task_id))
if not should_trace:
with nullcontext(None) as span:
yield span
return
tracer = trace.get_tracer(_DEFAULT_TRACER_NAME)
with tracer.start_as_current_span(name, kind=kind) as span:
if task_context is not None:
for key, value in task_context.to_attributes().items():
span.set_attribute(key, value)
if task_id and (task_context is None or task_context.task_id != task_id):
span.set_attribute(_TASK_ID_ATTR, task_id)
if attributes:
for key, value in attributes.items():
if value is not None:
span.set_attribute(key, value)
yield span
def mark_span_error(span: Optional[Span], message: str, error_code: str = "") -> None:
"""Mark the span as errored."""
if span is None:
return
if error_code:
span.set_attribute(_ERROR_CODE_ATTR, error_code)
if message:
span.set_attribute(_ERROR_MESSAGE_ATTR, message[:500])
span.set_status(Status(StatusCode.ERROR, message[:200]))
def inject_trace_headers(headers: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
"""Inject the current trace context into HTTP headers."""
carrier = dict(headers) if headers else {}
if _TRACING_ENABLED:
propagate.inject(carrier)
return carrier