Compare commits

..

10 Commits

Author SHA1 Message Date
e5c5a181d3 feat(config): 添加环境变量加载功能
- 集成 python-dotenv 库以支持 .env 文件
- 在主函数中添加 load_dotenv() 调用
- 实现环境配置的自动加载机制
2026-01-18 18:16:19 +08:00
f27490e9e1 feat(task): 支持图片素材类型的视频渲染
- 添加 IMAGE_EXTENSIONS 常量定义支持的图片格式
- 实现 get_material_type 方法优先使用服务端类型或根据URL后缀推断
- 添加 is_image_material 方法判断素材是否为图片类型
- 修改 RenderSegmentVideoHandler 支持图片转视频流程
- 实现 _convert_image_to_video 方法将静态图片转换为视频
- 更新下载步骤为先检测素材类型再确定输入文件扩展名
- 添加图片素材转换为视频的处理逻辑
- 重构步骤编号以匹配新的处理流程
- 优化错误提示信息支持HTTP/HTTPS协议检查
2026-01-18 13:52:46 +08:00
10c57a387f feat(config): 更新环境配置文件模板
- 修改 API_ENDPOINT 默认地址为本地开发地址
- 添加 WORKER_ID 配置项
- 新增硬件加速配置选项 HW_ACCEL
- 添加素材缓存配置 CACHE_ENABLED、CACHE_DIR、CACHE_MAX_SIZE_GB
- 新增下载 URL 映射配置 HTTP_REPLACE_MAP
- 更新上传方式配置选项和相关参数
- 重新组织配置项分组和注释说明
2026-01-17 17:43:36 +08:00
a72e1ef1a1 fix(video): 解决LUT路径中冒号转义问题
- 在LUT路径处理中添加冒号转义功能,避免FFmpeg filter语法冲突
- 保留原有的反斜杠转换逻辑
- 确保LUT文件路径在FFmpeg命令中正确解析
2026-01-17 16:57:16 +08:00
095e203fe6 feat(task): 增强素材URL处理和验证逻辑
- 添加详细的get_material_url方法文档说明优先级逻辑
- 新增get_source_ref方法用于获取素材源引用
- 新增get_bound_material_url方法用于获取绑定素材URL
- 在视频渲染处理器中添加HTTP URL格式验证检查
- 当素材URL格式无效时返回详细错误信息和调试日志
- 验证失败时返回E_SPEC_INVALID错误码并提示服务器需提供有效的boundMaterialUrl
2026-01-17 16:22:01 +08:00
fe757408b6 feat(cache): 添加素材缓存功能以避免重复下载
- 新增素材缓存配置选项包括启用状态、缓存目录和最大缓存大小
- 实现 MaterialCache 类提供缓存存储和检索功能
- 修改 download_file 方法支持缓存下载模式
- 添加缓存清理机制使用 LRU 策略管理磁盘空间
- 配置默认值优化本地开发体验
- 实现缓存统计和监控功能
2026-01-17 15:07:12 +08:00
d5cd0dca03 fix(api): 修复任务列表解析中的空值错误
- 将 data.get('data', {}).get('tasks', []) 修改为 data.get('data', {}).get('tasks') or []
- 防止当 tasks 字段为 None 时导致的解析异常
- 确保即使返回数据中没有 tasks 字段也能正常处理
2026-01-17 14:35:58 +08:00
2bded11a03 feat(task): 添加转场效果相关属性和方法
- 新增 get_transition_type、get_transition_ms、has_transition 方法用于处理转场类型和时长
- 新增 get_overlap_tail_ms、get_transition_in_type、get_transition_in_ms 等方法处理入场转场
- 新增 get_transition_out_type、get_transition_out_ms、has_transition_out 等方法处理出场转场
- 新增 get_overlap_head_ms、get_overlap_tail_ms_v2 方法计算头部和尾部重叠时长
- 更新渲染视频处理器中使用新的转场相关方法计算 overlap 时长
2026-01-14 09:30:09 +08:00
71bd2e59f9 feat(video): 添加硬件加速支持
- 定义硬件加速类型常量(none、qsv、cuda)
- 配置QSV和CUDA编码参数及预设
- 在WorkerConfig中添加硬件加速配置选项
- 实现基于硬件加速类型的编码参数动态获取
- 添加FFmpeg硬件加速解码和滤镜参数
- 检测并报告系统硬件加速支持信息
- 在API客户端中上报硬件加速配置和支持状态
2026-01-13 13:34:27 +08:00
a26c44a3cd feat(video): 添加视频特效处理功能
- 在常量模块中定义支持的特效类型(相机定格、缩放、模糊)
- 在任务域中创建Effect数据类,支持从字符串解析特效配置
- 实现cameraShot特效参数解析和默认值处理
- 扩展RenderSpec类,添加获取特效列表的方法
- 修改视频渲染处理器,集成特效滤镜构建逻辑
- 实现cameraShot特效的filter_complex滤镜图构建
- 添加fps参数支持和overlay检测逻辑优化
- 完成特效与转场overlap的兼容处理
2026-01-13 09:31:39 +08:00
11 changed files with 1238 additions and 56 deletions

View File

@@ -1,13 +1,59 @@
TEMPLATE_DIR=template/ # ===================
API_ENDPOINT=https://zhentuai.com/task/v1 # API 配置
# ===================
API_ENDPOINT=http://127.0.0.1:18084/api
ACCESS_KEY=TEST_ACCESS_KEY ACCESS_KEY=TEST_ACCESS_KEY
WORKER_ID=1
# ===================
# 目录配置
# ===================
TEMP_DIR=tmp/ TEMP_DIR=tmp/
#REDIRECT_TO_URL=https://renderworker-deuvulkhes.cn-shanghai.fcapp.run/
# QSV # ===================
ENCODER_ARGS="-c:v h264_qsv -global_quality 28 -look_ahead 1" # 并发与调度
# NVENC # ===================
#ENCODER_ARGS="-c:v h264_nvenc -cq:v 24 -preset:v p7 -tune:v hq -profile:v high" #MAX_CONCURRENCY=4 # 最大并发任务数
# HEVC #HEARTBEAT_INTERVAL=5 # 心跳间隔(秒)
#VIDEO_ARGS="-profile:v main #LEASE_EXTENSION_THRESHOLD=60 # 租约续期阈值(秒),提前多久续期
UPLOAD_METHOD="rclone" #LEASE_EXTENSION_DURATION=300 # 租约续期时长(秒)
RCLONE_REPLACE_MAP="https://oss.zhentuai.com|alioss://frametour-assets,https://frametour-assets.oss-cn-shanghai.aliyuncs.com|alioss://frametour-assets"
# ===================
# 能力配置
# ===================
# 支持的任务类型,逗号分隔,默认全部支持
#CAPABILITIES=RENDER_SEGMENT_VIDEO,PREPARE_JOB_AUDIO,PACKAGE_SEGMENT_TS,FINALIZE_MP4
# ===================
# 超时配置
# ===================
#FFMPEG_TIMEOUT=3600 # FFmpeg 执行超时(秒)
#DOWNLOAD_TIMEOUT=300 # 下载超时(秒)
#UPLOAD_TIMEOUT=600 # 上传超时(秒)
# ===================
# 硬件加速
# ===================
# 可选值: none, qsv, cuda
HW_ACCEL=none
# ===================
# 素材缓存
# ===================
#CACHE_ENABLED=true # 是否启用素材缓存
#CACHE_DIR= # 缓存目录,默认 TEMP_DIR/cache
#CACHE_MAX_SIZE_GB=0 # 最大缓存大小(GB),0 表示不限制
# ===================
# URL 映射(内网下载加速)
# ===================
# 格式: src1|dst1,src2|dst2
#HTTP_REPLACE_MAP="https://cdcdn.zhentuai.com|http://192.168.10.254:9000"
# ===================
# 上传配置
# ===================
# 上传方式: 默认 HTTP,可选 rclone
#UPLOAD_METHOD=rclone
#RCLONE_CONFIG_FILE= # rclone 配置文件路径
#RCLONE_REPLACE_MAP="https://oss.example.com|alioss://bucket"

View File

@@ -34,7 +34,21 @@ TRANSITION_TYPES = (
'slidedown', # 向下滑动 'slidedown', # 向下滑动
) )
# 统一视频编码参数(来自集成文档) # 支持的特效类型
EFFECT_TYPES = (
'cameraShot', # 相机定格效果:在指定时间点冻结画面
'zoom', # 缩放效果(预留)
'blur', # 模糊效果(预留)
)
# 硬件加速类型
HW_ACCEL_NONE = 'none' # 纯软件编解码
HW_ACCEL_QSV = 'qsv' # Intel Quick Sync Video (核显/独显)
HW_ACCEL_CUDA = 'cuda' # NVIDIA NVENC/NVDEC
HW_ACCEL_TYPES = (HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA)
# 统一视频编码参数(软件编码,来自集成文档)
VIDEO_ENCODE_PARAMS = { VIDEO_ENCODE_PARAMS = {
'codec': 'libx264', 'codec': 'libx264',
'preset': 'medium', 'preset': 'medium',
@@ -44,6 +58,28 @@ VIDEO_ENCODE_PARAMS = {
'pix_fmt': 'yuv420p', 'pix_fmt': 'yuv420p',
} }
# QSV 硬件加速视频编码参数(Intel Quick Sync)
VIDEO_ENCODE_PARAMS_QSV = {
'codec': 'h264_qsv',
'preset': 'medium', # QSV 支持: veryfast, faster, fast, medium, slow, slower, veryslow
'profile': 'main',
'level': '4.0',
'global_quality': '23', # QSV 使用 global_quality 代替 crf(1-51,值越低质量越高)
'look_ahead': '1', # 启用前瞻分析提升质量
'pix_fmt': 'nv12', # QSV 硬件表面格式
}
# CUDA 硬件加速视频编码参数(NVIDIA NVENC)
VIDEO_ENCODE_PARAMS_CUDA = {
'codec': 'h264_nvenc',
'preset': 'p4', # NVENC 预设 p1-p7(p1 最快,p7 最慢/质量最高),p4 ≈ medium
'profile': 'main',
'level': '4.0',
'rc': 'vbr', # 码率控制模式:vbr 可变码率
'cq': '23', # 恒定质量模式的质量值(0-51)
'pix_fmt': 'yuv420p', # NVENC 输入格式(会自动转换)
}
# 统一音频编码参数 # 统一音频编码参数
AUDIO_ENCODE_PARAMS = { AUDIO_ENCODE_PARAMS = {
'codec': 'aac', 'codec': 'aac',

View File

@@ -9,6 +9,8 @@ import os
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import List, Optional from typing import List, Optional
from constant import HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA, HW_ACCEL_TYPES
# 默认支持的任务类型 # 默认支持的任务类型
DEFAULT_CAPABILITIES = [ DEFAULT_CAPABILITIES = [
@@ -54,6 +56,14 @@ class WorkerConfig:
download_timeout: int = 300 # 秒,下载超时 download_timeout: int = 300 # 秒,下载超时
upload_timeout: int = 600 # 秒,上传超时 upload_timeout: int = 600 # 秒,上传超时
# 硬件加速配置
hw_accel: str = HW_ACCEL_NONE # 硬件加速类型: none, qsv, cuda
# 素材缓存配置
cache_enabled: bool = True # 是否启用素材缓存
cache_dir: str = "" # 缓存目录,默认为 temp_dir/cache
cache_max_size_gb: float = 0 # 最大缓存大小(GB),0 表示不限制
@classmethod @classmethod
def from_env(cls) -> 'WorkerConfig': def from_env(cls) -> 'WorkerConfig':
"""从环境变量创建配置""" """从环境变量创建配置"""
@@ -98,6 +108,16 @@ class WorkerConfig:
download_timeout = int(os.getenv('DOWNLOAD_TIMEOUT', '300')) download_timeout = int(os.getenv('DOWNLOAD_TIMEOUT', '300'))
upload_timeout = int(os.getenv('UPLOAD_TIMEOUT', '600')) upload_timeout = int(os.getenv('UPLOAD_TIMEOUT', '600'))
# 硬件加速配置
hw_accel = os.getenv('HW_ACCEL', HW_ACCEL_NONE).lower()
if hw_accel not in HW_ACCEL_TYPES:
hw_accel = HW_ACCEL_NONE
# 素材缓存配置
cache_enabled = os.getenv('CACHE_ENABLED', 'true').lower() in ('true', '1', 'yes')
cache_dir = os.getenv('CACHE_DIR', '') # 空字符串表示使用默认路径
cache_max_size_gb = float(os.getenv('CACHE_MAX_SIZE_GB', '0'))
return cls( return cls(
api_endpoint=api_endpoint, api_endpoint=api_endpoint,
access_key=access_key, access_key=access_key,
@@ -110,7 +130,11 @@ class WorkerConfig:
capabilities=capabilities, capabilities=capabilities,
ffmpeg_timeout=ffmpeg_timeout, ffmpeg_timeout=ffmpeg_timeout,
download_timeout=download_timeout, download_timeout=download_timeout,
upload_timeout=upload_timeout upload_timeout=upload_timeout,
hw_accel=hw_accel,
cache_enabled=cache_enabled,
cache_dir=cache_dir if cache_dir else os.path.join(temp_dir, 'cache'),
cache_max_size_gb=cache_max_size_gb
) )
def get_work_dir_path(self, task_id: str) -> str: def get_work_dir_path(self, task_id: str) -> str:
@@ -120,3 +144,15 @@ class WorkerConfig:
def ensure_temp_dir(self) -> None: def ensure_temp_dir(self) -> None:
"""确保临时目录存在""" """确保临时目录存在"""
os.makedirs(self.temp_dir, exist_ok=True) os.makedirs(self.temp_dir, exist_ok=True)
def is_hw_accel_enabled(self) -> bool:
"""是否启用了硬件加速"""
return self.hw_accel != HW_ACCEL_NONE
def is_qsv(self) -> bool:
"""是否使用 QSV 硬件加速"""
return self.hw_accel == HW_ACCEL_QSV
def is_cuda(self) -> bool:
"""是否使用 CUDA 硬件加速"""
return self.hw_accel == HW_ACCEL_CUDA

View File

@@ -9,6 +9,12 @@ from enum import Enum
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List from typing import Dict, Any, Optional, List
from datetime import datetime from datetime import datetime
from urllib.parse import urlparse, unquote
import os
# 支持的图片扩展名
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.webp', '.bmp', '.gif'}
class TaskType(Enum): class TaskType(Enum):
@@ -34,6 +40,13 @@ TRANSITION_TYPES = {
'slidedown': 'slidedown', # 向下滑动 'slidedown': 'slidedown', # 向下滑动
} }
# 支持的特效类型
EFFECT_TYPES = {
'cameraShot', # 相机定格效果
'zoom', # 缩放效果(预留)
'blur', # 模糊效果(预留)
}
class TaskStatus(Enum): class TaskStatus(Enum):
"""任务状态枚举""" """任务状态枚举"""
@@ -76,6 +89,70 @@ class TransitionConfig:
return TRANSITION_TYPES.get(self.type, 'fade') return TRANSITION_TYPES.get(self.type, 'fade')
@dataclass
class Effect:
"""
特效配置
格式:type:params
例如:cameraShot:3,1 表示在第3秒定格1秒
"""
effect_type: str # 效果类型
params: str = "" # 参数字符串
@classmethod
def from_string(cls, effect_str: str) -> Optional['Effect']:
"""
从字符串解析 Effect
格式:type:params 或 type(无参数时)
"""
if not effect_str:
return None
parts = effect_str.split(':', 1)
effect_type = parts[0].strip()
if effect_type not in EFFECT_TYPES:
return None
params = parts[1].strip() if len(parts) > 1 else ""
return cls(effect_type=effect_type, params=params)
@classmethod
def parse_effects(cls, effects_str: Optional[str]) -> List['Effect']:
"""
解析效果字符串
格式:effect1|effect2|effect3
例如:cameraShot:3,1|blur:5
"""
if not effects_str:
return []
effects = []
for part in effects_str.split('|'):
effect = cls.from_string(part.strip())
if effect:
effects.append(effect)
return effects
def get_camera_shot_params(self) -> tuple:
"""
获取 cameraShot 效果参数
Returns:
(start_sec, duration_sec): 开始时间和持续时间(秒)
"""
if self.effect_type != 'cameraShot':
return (0, 0)
if not self.params:
return (3, 1) # 默认值
parts = self.params.split(',')
try:
start = int(parts[0]) if len(parts) >= 1 else 3
duration = int(parts[1]) if len(parts) >= 2 else 1
return (start, duration)
except ValueError:
return (3, 1)
@dataclass @dataclass
class RenderSpec: class RenderSpec:
""" """
@@ -137,6 +214,10 @@ class RenderSpec:
return self.transition_out.get_overlap_ms() return self.transition_out.get_overlap_ms()
return 0 return 0
def get_effects(self) -> List['Effect']:
"""获取解析后的特效列表"""
return Effect.parse_effects(self.effects)
@dataclass @dataclass
class OutputSpec: class OutputSpec:
@@ -275,9 +356,56 @@ class Task:
return int(self.payload.get('durationMs', 5000)) return int(self.payload.get('durationMs', 5000))
def get_material_url(self) -> Optional[str]: def get_material_url(self) -> Optional[str]:
"""获取素材 URL""" """
获取素材 URL
优先使用 boundMaterialUrl(实际可下载的 HTTP URL),
如果不存在则回退到 sourceRef(可能是 slot 引用)。
Returns:
素材 URL,如果都不存在返回 None
"""
return self.payload.get('boundMaterialUrl') or self.payload.get('sourceRef') return self.payload.get('boundMaterialUrl') or self.payload.get('sourceRef')
def get_source_ref(self) -> Optional[str]:
"""获取素材源引用(slot 标识符,如 device:xxx)"""
return self.payload.get('sourceRef')
def get_bound_material_url(self) -> Optional[str]:
"""获取绑定的素材 URL(实际可下载的 HTTP URL)"""
return self.payload.get('boundMaterialUrl')
def get_material_type(self) -> str:
"""
获取素材类型
优先使用服务端下发的 materialType 字段,
如果不存在则根据 URL 后缀自动推断。
Returns:
素材类型:"video""image"
"""
# 优先使用服务端下发的类型
material_type = self.payload.get('materialType')
if material_type in ('video', 'image'):
return material_type
# 降级:根据 URL 后缀推断
material_url = self.get_material_url()
if material_url:
parsed = urlparse(material_url)
path = unquote(parsed.path)
_, ext = os.path.splitext(path)
if ext.lower() in IMAGE_EXTENSIONS:
return 'image'
# 默认视频类型
return 'video'
def is_image_material(self) -> bool:
"""判断素材是否为图片类型"""
return self.get_material_type() == 'image'
def get_render_spec(self) -> RenderSpec: def get_render_spec(self) -> RenderSpec:
"""获取渲染规格""" """获取渲染规格"""
return RenderSpec.from_dict(self.payload.get('renderSpec')) return RenderSpec.from_dict(self.payload.get('renderSpec'))
@@ -286,6 +414,69 @@ class Task:
"""获取输出规格""" """获取输出规格"""
return OutputSpec.from_dict(self.payload.get('output')) return OutputSpec.from_dict(self.payload.get('output'))
def get_transition_type(self) -> Optional[str]:
"""获取转场类型(来自 TaskPayload 顶层)"""
return self.payload.get('transitionType')
def get_transition_ms(self) -> int:
"""获取转场时长(毫秒,来自 TaskPayload 顶层)"""
return int(self.payload.get('transitionMs', 0))
def has_transition(self) -> bool:
"""是否有转场效果"""
return self.get_transition_ms() > 0
def get_overlap_tail_ms(self) -> int:
"""
获取尾部 overlap 时长(毫秒)
转场发生在当前片段与下一片段之间,当前片段需要在尾部多渲染 overlap 帧。
overlap = transitionMs / 2
"""
return self.get_transition_ms() // 2
def get_transition_in_type(self) -> Optional[str]:
"""获取入场转场类型(来自前一片段的出场转场)"""
return self.payload.get('transitionInType')
def get_transition_in_ms(self) -> int:
"""获取入场转场时长(毫秒)"""
return int(self.payload.get('transitionInMs', 0))
def get_transition_out_type(self) -> Optional[str]:
"""获取出场转场类型(当前片段的转场配置)"""
return self.payload.get('transitionOutType')
def get_transition_out_ms(self) -> int:
"""获取出场转场时长(毫秒)"""
return int(self.payload.get('transitionOutMs', 0))
def has_transition_in(self) -> bool:
"""是否有入场转场"""
return self.get_transition_in_ms() > 0
def has_transition_out(self) -> bool:
"""是否有出场转场"""
return self.get_transition_out_ms() > 0
def get_overlap_head_ms(self) -> int:
"""
获取头部 overlap 时长(毫秒)
入场转场来自前一个片段,当前片段需要在头部多渲染 overlap 帧。
overlap = transitionInMs / 2
"""
return self.get_transition_in_ms() // 2
def get_overlap_tail_ms_v2(self) -> int:
"""
获取尾部 overlap 时长(毫秒)- 使用新的字段名
出场转场用于当前片段与下一片段之间,当前片段需要在尾部多渲染 overlap 帧。
overlap = transitionOutMs / 2
"""
return self.get_transition_out_ms() // 2
def get_bgm_url(self) -> Optional[str]: def get_bgm_url(self) -> Optional[str]:
"""获取 BGM URL""" """获取 BGM URL"""
return self.payload.get('bgmUrl') return self.payload.get('bgmUrl')

View File

@@ -19,6 +19,11 @@ from domain.task import Task
from domain.result import TaskResult, ErrorCode from domain.result import TaskResult, ErrorCode
from domain.config import WorkerConfig from domain.config import WorkerConfig
from services import storage from services import storage
from services.cache import MaterialCache
from constant import (
HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA,
VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA
)
if TYPE_CHECKING: if TYPE_CHECKING:
from services.api_client import APIClientV2 from services.api_client import APIClientV2
@@ -26,15 +31,94 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# v2 统一视频编码参数(来自集成文档) def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
VIDEO_ENCODE_ARGS = [ """
'-c:v', 'libx264', 根据硬件加速配置获取视频编码参数
'-preset', 'medium',
'-profile:v', 'main', Args:
'-level', '4.0', hw_accel: 硬件加速类型 (none, qsv, cuda)
'-crf', '23',
'-pix_fmt', 'yuv420p', Returns:
FFmpeg 视频编码参数列表
"""
if hw_accel == HW_ACCEL_QSV:
params = VIDEO_ENCODE_PARAMS_QSV
return [
'-c:v', params['codec'],
'-preset', params['preset'],
'-profile:v', params['profile'],
'-level', params['level'],
'-global_quality', params['global_quality'],
'-look_ahead', params['look_ahead'],
] ]
elif hw_accel == HW_ACCEL_CUDA:
params = VIDEO_ENCODE_PARAMS_CUDA
return [
'-c:v', params['codec'],
'-preset', params['preset'],
'-profile:v', params['profile'],
'-level', params['level'],
'-rc', params['rc'],
'-cq', params['cq'],
'-b:v', '0', # 配合 vbr 模式使用 cq
]
else:
# 软件编码(默认)
params = VIDEO_ENCODE_PARAMS
return [
'-c:v', params['codec'],
'-preset', params['preset'],
'-profile:v', params['profile'],
'-level', params['level'],
'-crf', params['crf'],
'-pix_fmt', params['pix_fmt'],
]
def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
"""
获取硬件加速解码参数(输入文件之前使用)
Args:
hw_accel: 硬件加速类型 (none, qsv, cuda)
Returns:
FFmpeg 硬件加速解码参数列表
"""
if hw_accel == HW_ACCEL_CUDA:
# CUDA 硬件加速解码
# 注意:使用 cuda 作为 hwaccel,但输出到系统内存以便 CPU 滤镜处理
return ['-hwaccel', 'cuda', '-hwaccel_output_format', 'cuda']
elif hw_accel == HW_ACCEL_QSV:
# QSV 硬件加速解码
return ['-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv']
else:
return []
def get_hwaccel_filter_prefix(hw_accel: str = HW_ACCEL_NONE) -> str:
"""
获取硬件加速滤镜前缀(用于 hwdownload 从 GPU 到 CPU)
注意:由于大多数复杂滤镜(如 lut3d, overlay, crop 等)不支持硬件表面,
我们需要在滤镜链开始时将硬件表面下载到系统内存。
Args:
hw_accel: 硬件加速类型
Returns:
需要添加到滤镜链开头的 hwdownload 滤镜字符串
"""
if hw_accel == HW_ACCEL_CUDA:
return 'hwdownload,format=nv12,'
elif hw_accel == HW_ACCEL_QSV:
return 'hwdownload,format=nv12,'
else:
return ''
# v2 统一视频编码参数(兼容旧代码,使用软件编码)
VIDEO_ENCODE_ARGS = get_video_encode_args(HW_ACCEL_NONE)
# v2 统一音频编码参数 # v2 统一音频编码参数
AUDIO_ENCODE_ARGS = [ AUDIO_ENCODE_ARGS = [
@@ -177,6 +261,38 @@ class BaseHandler(TaskHandler, ABC):
""" """
self.config = config self.config = config
self.api_client = api_client self.api_client = api_client
self.material_cache = MaterialCache(
cache_dir=config.cache_dir,
enabled=config.cache_enabled,
max_size_gb=config.cache_max_size_gb
)
def get_video_encode_args(self) -> List[str]:
"""
获取当前配置的视频编码参数
Returns:
FFmpeg 视频编码参数列表
"""
return get_video_encode_args(self.config.hw_accel)
def get_hwaccel_decode_args(self) -> List[str]:
"""
获取硬件加速解码参数(在输入文件之前使用)
Returns:
FFmpeg 硬件加速解码参数列表
"""
return get_hwaccel_decode_args(self.config.hw_accel)
def get_hwaccel_filter_prefix(self) -> str:
"""
获取硬件加速滤镜前缀
Returns:
需要添加到滤镜链开头的 hwdownload 滤镜字符串
"""
return get_hwaccel_filter_prefix(self.config.hw_accel)
def before_handle(self, task: Task) -> None: def before_handle(self, task: Task) -> None:
"""处理前钩子""" """处理前钩子"""
@@ -223,14 +339,15 @@ class BaseHandler(TaskHandler, ABC):
except Exception as e: except Exception as e:
logger.warning(f"Failed to cleanup work directory {work_dir}: {e}") logger.warning(f"Failed to cleanup work directory {work_dir}: {e}")
def download_file(self, url: str, dest: str, timeout: int = None) -> bool: def download_file(self, url: str, dest: str, timeout: int = None, use_cache: bool = True) -> bool:
""" """
下载文件 下载文件(支持缓存)
Args: Args:
url: 文件 URL url: 文件 URL
dest: 目标路径 dest: 目标路径
timeout: 超时时间(秒) timeout: 超时时间(秒)
use_cache: 是否使用缓存(默认 True)
Returns: Returns:
是否成功 是否成功
@@ -239,7 +356,13 @@ class BaseHandler(TaskHandler, ABC):
timeout = self.config.download_timeout timeout = self.config.download_timeout
try: try:
if use_cache:
# 使用缓存下载
result = self.material_cache.get_or_download(url, dest, timeout=timeout)
else:
# 直接下载(不走缓存)
result = storage.download_file(url, dest, timeout=timeout) result = storage.download_file(url, dest, timeout=timeout)
if result: if result:
file_size = os.path.getsize(dest) if os.path.exists(dest) else 0 file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
logger.debug(f"Downloaded: {url} -> {dest} ({file_size} bytes)") logger.debug(f"Downloaded: {url} -> {dest} ({file_size} bytes)")

View File

@@ -10,7 +10,7 @@ import os
import logging import logging
from typing import List, Optional from typing import List, Optional
from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS from handlers.base import BaseHandler
from domain.task import Task, TaskType, TransitionConfig, TRANSITION_TYPES from domain.task import Task, TaskType, TransitionConfig, TRANSITION_TYPES
from domain.result import TaskResult, ErrorCode from domain.result import TaskResult, ErrorCode
@@ -235,8 +235,8 @@ class ComposeTransitionHandler(BaseHandler):
'-map', '[outv]', '-map', '[outv]',
] ]
# 编码参数(与片段视频一致 # 编码参数(根据硬件加速配置动态获取
cmd.extend(VIDEO_ENCODE_ARGS) cmd.extend(self.get_video_encode_args())
# 帧率 # 帧率
fps = output_spec.fps fps = output_spec.fps

View File

@@ -9,14 +9,23 @@
import os import os
import logging import logging
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
from urllib.parse import urlparse, unquote
from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS from handlers.base import BaseHandler
from domain.task import Task, TaskType, RenderSpec, OutputSpec from domain.task import Task, TaskType, RenderSpec, OutputSpec, Effect, IMAGE_EXTENSIONS
from domain.result import TaskResult, ErrorCode from domain.result import TaskResult, ErrorCode
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _get_extension_from_url(url: str) -> str:
"""从 URL 提取文件扩展名"""
parsed = urlparse(url)
path = unquote(parsed.path)
_, ext = os.path.splitext(path)
return ext.lower() if ext else ''
class RenderSegmentVideoHandler(BaseHandler): class RenderSegmentVideoHandler(BaseHandler):
""" """
视频片段渲染处理器 视频片段渲染处理器
@@ -46,19 +55,63 @@ class RenderSegmentVideoHandler(BaseHandler):
"Missing material URL (boundMaterialUrl or sourceRef)" "Missing material URL (boundMaterialUrl or sourceRef)"
) )
# 检查 URL 格式:必须是 HTTP 或 HTTPS 协议
if not material_url.startswith(('http://', 'https://')):
source_ref = task.get_source_ref()
bound_url = task.get_bound_material_url()
logger.error(
f"[task:{task.task_id}] Invalid material URL format: '{material_url}'. "
f"boundMaterialUrl={bound_url}, sourceRef={source_ref}. "
f"Server should provide boundMaterialUrl with HTTP/HTTPS URL."
)
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
f"Invalid material URL: '{material_url}' is not a valid HTTP/HTTPS URL. "
f"Server must provide boundMaterialUrl."
)
render_spec = task.get_render_spec() render_spec = task.get_render_spec()
output_spec = task.get_output_spec() output_spec = task.get_output_spec()
duration_ms = task.get_duration_ms() duration_ms = task.get_duration_ms()
# 1. 下载素材 # 1. 检测素材类型并确定输入文件扩展名
is_image = task.is_image_material()
if is_image:
# 图片素材:根据 URL 确定扩展名
ext = _get_extension_from_url(material_url)
if not ext or ext not in IMAGE_EXTENSIONS:
ext = '.jpg' # 默认扩展名
input_file = os.path.join(work_dir, f'input{ext}')
else:
input_file = os.path.join(work_dir, 'input.mp4') input_file = os.path.join(work_dir, 'input.mp4')
# 2. 下载素材
if not self.download_file(material_url, input_file): if not self.download_file(material_url, input_file):
return TaskResult.fail( return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE, ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download material: {material_url}" f"Failed to download material: {material_url}"
) )
# 2. 下载 LUT(如有) # 3. 图片素材转换为视频
if is_image:
video_input_file = os.path.join(work_dir, 'input_video.mp4')
if not self._convert_image_to_video(
image_file=input_file,
output_file=video_input_file,
duration_ms=duration_ms,
output_spec=output_spec,
render_spec=render_spec,
task_id=task.task_id
):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Failed to convert image to video"
)
# 使用转换后的视频作为输入
input_file = video_input_file
logger.info(f"[task:{task.task_id}] Image converted to video successfully")
# 4. 下载 LUT(如有)
lut_file = None lut_file = None
if render_spec.lut_url: if render_spec.lut_url:
lut_file = os.path.join(work_dir, 'lut.cube') lut_file = os.path.join(work_dir, 'lut.cube')
@@ -66,7 +119,7 @@ class RenderSegmentVideoHandler(BaseHandler):
logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it") logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it")
lut_file = None lut_file = None
# 3. 下载叠加层(如有) # 5. 下载叠加层(如有)
overlay_file = None overlay_file = None
if render_spec.overlay_url: if render_spec.overlay_url:
# 根据 URL 后缀确定文件扩展名 # 根据 URL 后缀确定文件扩展名
@@ -78,11 +131,13 @@ class RenderSegmentVideoHandler(BaseHandler):
logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it") logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
overlay_file = None overlay_file = None
# 4. 计算 overlap 时长 # 6. 计算 overlap 时长(用于转场帧冻结)
overlap_head_ms = render_spec.get_overlap_head_ms() # 头部 overlap: 来自前一片段的出场转场
overlap_tail_ms = render_spec.get_overlap_tail_ms() overlap_head_ms = task.get_overlap_head_ms()
# 尾部 overlap: 当前片段的出场转场
overlap_tail_ms = task.get_overlap_tail_ms_v2()
# 5. 构建 FFmpeg 命令 # 7. 构建 FFmpeg 命令
output_file = os.path.join(work_dir, 'output.mp4') output_file = os.path.join(work_dir, 'output.mp4')
cmd = self._build_command( cmd = self._build_command(
input_file=input_file, input_file=input_file,
@@ -96,25 +151,25 @@ class RenderSegmentVideoHandler(BaseHandler):
overlap_tail_ms=overlap_tail_ms overlap_tail_ms=overlap_tail_ms
) )
# 6. 执行 FFmpeg # 8. 执行 FFmpeg
if not self.run_ffmpeg(cmd, task.task_id): if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail( return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED, ErrorCode.E_FFMPEG_FAILED,
"FFmpeg rendering failed" "FFmpeg rendering failed"
) )
# 7. 验证输出文件 # 9. 验证输出文件
if not self.ensure_file_exists(output_file, min_size=4096): if not self.ensure_file_exists(output_file, min_size=4096):
return TaskResult.fail( return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED, ErrorCode.E_FFMPEG_FAILED,
"Output file is missing or too small" "Output file is missing or too small"
) )
# 8. 获取实际时长 # 10. 获取实际时长
actual_duration = self.probe_duration(output_file) actual_duration = self.probe_duration(output_file)
actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms
# 9. 上传产物 # 11. 上传产物
video_url = self.upload_file(task.task_id, 'video', output_file) video_url = self.upload_file(task.task_id, 'video', output_file)
if not video_url: if not video_url:
return TaskResult.fail( return TaskResult.fail(
@@ -122,7 +177,7 @@ class RenderSegmentVideoHandler(BaseHandler):
"Failed to upload video" "Failed to upload video"
) )
# 10. 构建结果(包含 overlap 信息) # 12. 构建结果(包含 overlap 信息)
result_data = { result_data = {
'videoUrl': video_url, 'videoUrl': video_url,
'actualDurationMs': actual_duration_ms, 'actualDurationMs': actual_duration_ms,
@@ -139,6 +194,96 @@ class RenderSegmentVideoHandler(BaseHandler):
finally: finally:
self.cleanup_work_dir(work_dir) self.cleanup_work_dir(work_dir)
def _convert_image_to_video(
self,
image_file: str,
output_file: str,
duration_ms: int,
output_spec: OutputSpec,
render_spec: RenderSpec,
task_id: str
) -> bool:
"""
将图片转换为视频
使用 FFmpeg 将静态图片转换为指定时长的视频,
同时应用缩放填充和变速处理。
Args:
image_file: 输入图片文件路径
output_file: 输出视频文件路径
duration_ms: 目标时长(毫秒)
output_spec: 输出规格
render_spec: 渲染规格
task_id: 任务 ID(用于日志)
Returns:
是否成功
"""
width = output_spec.width
height = output_spec.height
fps = output_spec.fps
# 计算实际时长(考虑变速)
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed <= 0:
speed = 1.0
# 变速后的实际播放时长
actual_duration_sec = (duration_ms / 1000.0) / speed
# 构建 FFmpeg 命令
cmd = [
'ffmpeg', '-y', '-hide_banner',
'-loop', '1', # 循环输入图片
'-i', image_file,
'-t', str(actual_duration_sec), # 输出时长
]
# 构建滤镜:缩放填充到目标尺寸
filters = []
# 裁切处理(与视频相同逻辑)
if render_spec.crop_enable and render_spec.face_pos:
try:
fx, fy = map(float, render_spec.face_pos.split(','))
target_ratio = width / height
filters.append(
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})':"
f"'(iw-min(iw,ih*{target_ratio}))*{fx}':"
f"'(ih-min(ih,iw/{target_ratio}))*{fy}'"
)
except (ValueError, ZeroDivisionError):
logger.warning(f"[task:{task_id}] Invalid face position: {render_spec.face_pos}")
elif render_spec.zoom_cut:
target_ratio = width / height
filters.append(
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})'"
)
# 缩放填充
filters.append(
f"scale={width}:{height}:force_original_aspect_ratio=decrease,"
f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black"
)
# 格式转换(确保兼容性)
filters.append("format=yuv420p")
cmd.extend(['-vf', ','.join(filters)])
# 编码参数
cmd.extend([
'-c:v', 'libx264',
'-preset', 'fast',
'-crf', '18',
'-r', str(fps),
'-an', # 无音频
output_file
])
logger.info(f"[task:{task_id}] Converting image to video: {actual_duration_sec:.2f}s at {fps}fps")
return self.run_ffmpeg(cmd, task_id)
def _build_command( def _build_command(
self, self,
input_file: str, input_file: str,
@@ -170,6 +315,11 @@ class RenderSegmentVideoHandler(BaseHandler):
""" """
cmd = ['ffmpeg', '-y', '-hide_banner'] cmd = ['ffmpeg', '-y', '-hide_banner']
# 硬件加速解码参数(在输入文件之前)
hwaccel_args = self.get_hwaccel_decode_args()
if hwaccel_args:
cmd.extend(hwaccel_args)
# 输入文件 # 输入文件
cmd.extend(['-i', input_file]) cmd.extend(['-i', input_file])
@@ -188,14 +338,16 @@ class RenderSegmentVideoHandler(BaseHandler):
) )
# 应用滤镜 # 应用滤镜
if overlay_file: # 检测是否为 filter_complex 格式(包含分号或方括号标签)
# 使用 filter_complex 处理叠加 is_filter_complex = ';' in filters or (filters.startswith('[') and ']' in filters)
if is_filter_complex or overlay_file:
# 使用 filter_complex 处理
cmd.extend(['-filter_complex', filters]) cmd.extend(['-filter_complex', filters])
elif filters: elif filters:
cmd.extend(['-vf', filters]) cmd.extend(['-vf', filters])
# 编码参数(v2 统一参数 # 编码参数(根据硬件加速配置动态获取
cmd.extend(VIDEO_ENCODE_ARGS) cmd.extend(self.get_video_encode_args())
# 帧率 # 帧率
fps = output_spec.fps fps = output_spec.fps
@@ -245,6 +397,17 @@ class RenderSegmentVideoHandler(BaseHandler):
filters = [] filters = []
width = output_spec.width width = output_spec.width
height = output_spec.height height = output_spec.height
fps = output_spec.fps
# 解析 effects
effects = render_spec.get_effects()
has_camera_shot = any(e.effect_type == 'cameraShot' for e in effects)
# 硬件加速时需要先 hwdownload(将 GPU 表面下载到系统内存)
hwaccel_prefix = self.get_hwaccel_filter_prefix()
if hwaccel_prefix:
# 去掉末尾的逗号,作为第一个滤镜
filters.append(hwaccel_prefix.rstrip(','))
# 1. 变速处理 # 1. 变速处理
speed = float(render_spec.speed) if render_spec.speed else 1.0 speed = float(render_spec.speed) if render_spec.speed else 1.0
@@ -255,8 +418,8 @@ class RenderSegmentVideoHandler(BaseHandler):
# 2. LUT 调色 # 2. LUT 调色
if lut_file: if lut_file:
# 路径中的反斜杠需要转 # 路径中的反斜杠需要转换,冒号需要转义(FFmpeg filter语法中冒号是特殊字符)
lut_path = lut_file.replace('\\', '/') lut_path = lut_file.replace('\\', '/').replace(':', r'\:')
filters.append(f"lut3d='{lut_path}'") filters.append(f"lut3d='{lut_path}'")
# 3. 裁切处理 # 3. 裁切处理
@@ -288,7 +451,20 @@ class RenderSegmentVideoHandler(BaseHandler):
) )
filters.append(scale_filter) filters.append(scale_filter)
# 5. 帧冻结(tpad)- 用于转场 overlap 区域 # 5. 特效处理(cameraShot 需要特殊处理)
if has_camera_shot:
# cameraShot 需要使用 filter_complex 格式
return self._build_filter_complex_with_effects(
base_filters=filters,
effects=effects,
fps=fps,
has_overlay=has_overlay,
overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms,
use_hwdownload=bool(hwaccel_prefix)
)
# 6. 帧冻结(tpad)- 用于转场 overlap 区域
# 注意:tpad 必须在缩放之后应用 # 注意:tpad 必须在缩放之后应用
tpad_parts = [] tpad_parts = []
if overlap_head_ms > 0: if overlap_head_ms > 0:
@@ -303,10 +479,122 @@ class RenderSegmentVideoHandler(BaseHandler):
if tpad_parts: if tpad_parts:
filters.append(f"tpad={':'.join(tpad_parts)}") filters.append(f"tpad={':'.join(tpad_parts)}")
# 6. 构建最终滤镜 # 7. 构建最终滤镜
if has_overlay: if has_overlay:
# 使用 filter_complex 格式 # 使用 filter_complex 格式
base_filters = ','.join(filters) if filters else 'copy' base_filters = ','.join(filters) if filters else 'copy'
return f"[0:v]{base_filters}[base];[base][1:v]overlay=0:0" return f"[0:v]{base_filters}[base];[base][1:v]overlay=0:0"
else: else:
return ','.join(filters) if filters else '' return ','.join(filters) if filters else ''
def _build_filter_complex_with_effects(
self,
base_filters: List[str],
effects: List[Effect],
fps: int,
has_overlay: bool = False,
overlap_head_ms: int = 0,
overlap_tail_ms: int = 0,
use_hwdownload: bool = False
) -> str:
"""
构建包含特效的 filter_complex 滤镜图
cameraShot 效果需要使用 split/freezeframes/concat 滤镜组合。
Args:
base_filters: 基础滤镜列表
effects: 特效列表
fps: 帧率
has_overlay: 是否有叠加层
overlap_head_ms: 头部 overlap 时长
overlap_tail_ms: 尾部 overlap 时长
use_hwdownload: 是否使用了硬件加速解码(已在 base_filters 中包含 hwdownload)
Returns:
filter_complex 格式的滤镜字符串
"""
filter_parts = []
# 基础滤镜链
base_chain = ','.join(base_filters) if base_filters else 'copy'
# 当前输出标签
current_output = '[v_base]'
filter_parts.append(f"[0:v]{base_chain}{current_output}")
# 处理每个特效
effect_idx = 0
for effect in effects:
if effect.effect_type == 'cameraShot':
start_sec, duration_sec = effect.get_camera_shot_params()
if start_sec <= 0 or duration_sec <= 0:
continue
# cameraShot 实现:
# 1. fps + split 分割
# 2. 第一路:trim(0, start+duration) + freezeframes
# 3. 第二路:trim(start, end)
# 4. concat 拼接
start_frame = start_sec * fps
split_out_a = f'[eff{effect_idx}_a]'
split_out_b = f'[eff{effect_idx}_b]'
effect_output = f'[v_eff{effect_idx}]'
# fps + split
filter_parts.append(
f"{current_output}fps=fps={fps},split{split_out_a}{split_out_b}"
)
# 第一路:trim + freezeframes(在 start 帧处冻结 duration 秒)
# freezeframes: 从 first 帧开始,用 replace 帧替换后续帧
# 这样实现定格效果:在 start_frame 位置冻结
filter_parts.append(
f"{split_out_a}trim=start=0:end={start_sec + duration_sec},"
f"setpts=PTS-STARTPTS,"
f"freezeframes=first={start_frame}:last={start_frame + duration_sec * fps - 1}:replace={start_frame}"
f"{split_out_a}"
)
# 第二路:trim 从 start 开始
filter_parts.append(
f"{split_out_b}trim=start={start_sec},setpts=PTS-STARTPTS{split_out_b}"
)
# concat 拼接
filter_parts.append(
f"{split_out_a}{split_out_b}concat=n=2:v=1:a=0{effect_output}"
)
current_output = effect_output
effect_idx += 1
# 帧冻结(tpad)- 用于转场 overlap 区域
tpad_parts = []
if overlap_head_ms > 0:
head_duration_sec = overlap_head_ms / 1000.0
tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}")
if overlap_tail_ms > 0:
tail_duration_sec = overlap_tail_ms / 1000.0
tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}")
if tpad_parts:
tpad_output = '[v_tpad]'
filter_parts.append(f"{current_output}tpad={':'.join(tpad_parts)}{tpad_output}")
current_output = tpad_output
# 最终输出
if has_overlay:
# 叠加层处理
filter_parts.append(f"{current_output}[1:v]overlay=0:0")
else:
# 移除最后一个标签,直接输出
# 将最后一个滤镜的输出标签替换为空(直接输出)
if filter_parts:
last_filter = filter_parts[-1]
# 移除末尾的输出标签
if last_filter.endswith(current_output):
filter_parts[-1] = last_filter[:-len(current_output)]
return ';'.join(filter_parts)

View File

@@ -26,6 +26,8 @@ import time
import signal import signal
import logging import logging
from dotenv import load_dotenv
from domain.config import WorkerConfig from domain.config import WorkerConfig
from services.api_client import APIClientV2 from services.api_client import APIClientV2
from services.task_executor import TaskExecutor from services.task_executor import TaskExecutor
@@ -166,6 +168,9 @@ class WorkerV2:
def main(): def main():
"""主函数""" """主函数"""
# 加载 .env 文件(如果存在)
load_dotenv()
logger.info(f"RenderWorker v{SOFTWARE_VERSION}") logger.info(f"RenderWorker v{SOFTWARE_VERSION}")
# 创建并运行 Worker # 创建并运行 Worker

View File

@@ -12,6 +12,7 @@ from typing import Dict, List, Optional, Any
from domain.task import Task from domain.task import Task
from domain.config import WorkerConfig from domain.config import WorkerConfig
from util.system import get_hw_accel_info_str
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -80,7 +81,7 @@ class APIClientV2:
# 解析任务列表 # 解析任务列表
tasks = [] tasks = []
for task_data in data.get('data', {}).get('tasks', []): for task_data in data.get('data', {}).get('tasks') or []:
try: try:
task = Task.from_dict(task_data) task = Task.from_dict(task_data)
tasks.append(task) tasks.append(task)
@@ -338,7 +339,9 @@ class APIClientV2:
'cpu': f"{psutil.cpu_count()} cores", 'cpu': f"{psutil.cpu_count()} cores",
'memory': f"{psutil.virtual_memory().total // (1024**3)}GB", 'memory': f"{psutil.virtual_memory().total // (1024**3)}GB",
'cpuUsage': f"{psutil.cpu_percent()}%", 'cpuUsage': f"{psutil.cpu_percent()}%",
'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB" 'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB",
'hwAccelConfig': self.config.hw_accel, # 当前配置的硬件加速
'hwAccelSupport': get_hw_accel_info_str(), # 系统支持的硬件加速
} }
# 尝试获取 GPU 信息 # 尝试获取 GPU 信息

291
services/cache.py Normal file
View File

@@ -0,0 +1,291 @@
# -*- coding: utf-8 -*-
"""
素材缓存服务
提供素材下载缓存功能,避免相同素材重复下载。
"""
import os
import hashlib
import logging
import shutil
import time
from typing import Optional, Tuple
from urllib.parse import urlparse, unquote
from services import storage
logger = logging.getLogger(__name__)
def _extract_cache_key(url: str) -> str:
"""
从 URL 提取缓存键
去除签名等查询参数,保留路径作为唯一标识。
Args:
url: 完整的素材 URL
Returns:
缓存键(URL 路径的 MD5 哈希)
"""
parsed = urlparse(url)
# 使用 scheme + host + path 作为唯一标识(忽略签名等查询参数)
cache_key_source = f"{parsed.scheme}://{parsed.netloc}{unquote(parsed.path)}"
return hashlib.md5(cache_key_source.encode('utf-8')).hexdigest()
def _get_file_extension(url: str) -> str:
"""
从 URL 提取文件扩展名
Args:
url: 素材 URL
Returns:
文件扩展名(如 .mp4, .png),无法识别时返回空字符串
"""
parsed = urlparse(url)
path = unquote(parsed.path)
_, ext = os.path.splitext(path)
return ext.lower() if ext else ''
class MaterialCache:
"""
素材缓存管理器
负责素材文件的缓存存储和检索。
"""
def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
"""
初始化缓存管理器
Args:
cache_dir: 缓存目录路径
enabled: 是否启用缓存
max_size_gb: 最大缓存大小(GB),0 表示不限制
"""
self.cache_dir = cache_dir
self.enabled = enabled
self.max_size_bytes = int(max_size_gb * 1024 * 1024 * 1024) if max_size_gb > 0 else 0
if self.enabled:
os.makedirs(self.cache_dir, exist_ok=True)
logger.info(f"Material cache initialized: {cache_dir}")
def get_cache_path(self, url: str) -> str:
"""
获取素材的缓存文件路径
Args:
url: 素材 URL
Returns:
缓存文件的完整路径
"""
cache_key = _extract_cache_key(url)
ext = _get_file_extension(url)
filename = f"{cache_key}{ext}"
return os.path.join(self.cache_dir, filename)
def is_cached(self, url: str) -> Tuple[bool, str]:
"""
检查素材是否已缓存
Args:
url: 素材 URL
Returns:
(是否已缓存, 缓存文件路径)
"""
if not self.enabled:
return False, ''
cache_path = self.get_cache_path(url)
exists = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
return exists, cache_path
def get_or_download(
self,
url: str,
dest: str,
timeout: int = 300,
max_retries: int = 5
) -> bool:
"""
从缓存获取素材,若未缓存则下载并缓存
Args:
url: 素材 URL
dest: 目标文件路径(任务工作目录中的路径)
timeout: 下载超时时间(秒)
max_retries: 最大重试次数
Returns:
是否成功
"""
# 确保目标目录存在
dest_dir = os.path.dirname(dest)
if dest_dir:
os.makedirs(dest_dir, exist_ok=True)
# 缓存未启用时直接下载
if not self.enabled:
return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
# 检查缓存
cached, cache_path = self.is_cached(url)
if cached:
# 命中缓存,复制到目标路径
try:
shutil.copy2(cache_path, dest)
# 更新访问时间(用于 LRU 清理)
os.utime(cache_path, None)
file_size = os.path.getsize(dest)
logger.info(f"Cache hit: {url[:80]}... -> {dest} ({file_size} bytes)")
return True
except Exception as e:
logger.warning(f"Failed to copy from cache: {e}, will re-download")
# 缓存复制失败,删除可能损坏的缓存文件
try:
os.remove(cache_path)
except Exception:
pass
# 未命中缓存,下载到缓存目录
logger.debug(f"Cache miss: {url[:80]}...")
# 先下载到临时文件
temp_cache_path = cache_path + '.downloading'
try:
if not storage.download_file(url, temp_cache_path, max_retries=max_retries, timeout=timeout):
# 下载失败,清理临时文件
if os.path.exists(temp_cache_path):
os.remove(temp_cache_path)
return False
# 下载成功,移动到正式缓存路径
if os.path.exists(cache_path):
os.remove(cache_path)
os.rename(temp_cache_path, cache_path)
# 复制到目标路径
shutil.copy2(cache_path, dest)
file_size = os.path.getsize(dest)
logger.info(f"Downloaded and cached: {url[:80]}... ({file_size} bytes)")
# 检查是否需要清理缓存
if self.max_size_bytes > 0:
self._cleanup_if_needed()
return True
except Exception as e:
logger.error(f"Cache download error: {e}")
# 清理临时文件
if os.path.exists(temp_cache_path):
try:
os.remove(temp_cache_path)
except Exception:
pass
return False
def _cleanup_if_needed(self) -> None:
"""
检查并清理缓存(LRU 策略)
当缓存大小超过限制时,删除最久未访问的文件。
"""
if self.max_size_bytes <= 0:
return
try:
# 获取所有缓存文件及其信息
cache_files = []
total_size = 0
for filename in os.listdir(self.cache_dir):
if filename.endswith('.downloading'):
continue
file_path = os.path.join(self.cache_dir, filename)
if os.path.isfile(file_path):
stat = os.stat(file_path)
cache_files.append({
'path': file_path,
'size': stat.st_size,
'atime': stat.st_atime
})
total_size += stat.st_size
# 如果未超过限制,无需清理
if total_size <= self.max_size_bytes:
return
# 按访问时间排序(最久未访问的在前)
cache_files.sort(key=lambda x: x['atime'])
# 删除文件直到低于限制的 80%
target_size = int(self.max_size_bytes * 0.8)
deleted_count = 0
for file_info in cache_files:
if total_size <= target_size:
break
try:
os.remove(file_info['path'])
total_size -= file_info['size']
deleted_count += 1
except Exception as e:
logger.warning(f"Failed to delete cache file: {e}")
if deleted_count > 0:
logger.info(f"Cache cleanup: deleted {deleted_count} files, current size: {total_size / (1024*1024*1024):.2f} GB")
except Exception as e:
logger.warning(f"Cache cleanup error: {e}")
def clear(self) -> None:
"""清空所有缓存"""
if not self.enabled:
return
try:
if os.path.exists(self.cache_dir):
shutil.rmtree(self.cache_dir)
os.makedirs(self.cache_dir, exist_ok=True)
logger.info("Cache cleared")
except Exception as e:
logger.error(f"Failed to clear cache: {e}")
def get_stats(self) -> dict:
"""
获取缓存统计信息
Returns:
包含缓存统计的字典
"""
if not self.enabled or not os.path.exists(self.cache_dir):
return {'enabled': False, 'file_count': 0, 'total_size_mb': 0}
file_count = 0
total_size = 0
for filename in os.listdir(self.cache_dir):
if filename.endswith('.downloading'):
continue
file_path = os.path.join(self.cache_dir, filename)
if os.path.isfile(file_path):
file_count += 1
total_size += os.path.getsize(file_path)
return {
'enabled': True,
'cache_dir': self.cache_dir,
'file_count': file_count,
'total_size_mb': round(total_size / (1024 * 1024), 2),
'max_size_gb': self.max_size_bytes / (1024 * 1024 * 1024) if self.max_size_bytes > 0 else 0
}

View File

@@ -8,10 +8,10 @@
import os import os
import platform import platform
import subprocess import subprocess
from typing import Optional from typing import Optional, Dict, Any
import psutil import psutil
from constant import SOFTWARE_VERSION, DEFAULT_CAPABILITIES from constant import SOFTWARE_VERSION, DEFAULT_CAPABILITIES, HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA
def get_sys_info(): def get_sys_info():
@@ -101,3 +101,166 @@ def get_ffmpeg_version() -> str:
pass pass
return 'unknown' return 'unknown'
def check_ffmpeg_encoder(encoder: str) -> bool:
"""
检查 FFmpeg 是否支持指定的编码器
Args:
encoder: 编码器名称,如 'h264_nvenc', 'h264_qsv'
Returns:
bool: 是否支持该编码器
"""
try:
result = subprocess.run(
['ffmpeg', '-hide_banner', '-encoders'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
return encoder in result.stdout
except Exception:
pass
return False
def check_ffmpeg_decoder(decoder: str) -> bool:
"""
检查 FFmpeg 是否支持指定的解码器
Args:
decoder: 解码器名称,如 'h264_cuvid', 'h264_qsv'
Returns:
bool: 是否支持该解码器
"""
try:
result = subprocess.run(
['ffmpeg', '-hide_banner', '-decoders'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
return decoder in result.stdout
except Exception:
pass
return False
def check_ffmpeg_hwaccel(hwaccel: str) -> bool:
"""
检查 FFmpeg 是否支持指定的硬件加速方法
Args:
hwaccel: 硬件加速方法,如 'cuda', 'qsv', 'dxva2', 'd3d11va'
Returns:
bool: 是否支持该硬件加速方法
"""
try:
result = subprocess.run(
['ffmpeg', '-hide_banner', '-hwaccels'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
return hwaccel in result.stdout
except Exception:
pass
return False
def detect_hw_accel_support() -> Dict[str, Any]:
"""
检测系统的硬件加速支持情况
Returns:
dict: 硬件加速支持信息
{
'cuda': {
'available': bool,
'gpu': str or None,
'encoder': bool, # h264_nvenc
'decoder': bool, # h264_cuvid
},
'qsv': {
'available': bool,
'encoder': bool, # h264_qsv
'decoder': bool, # h264_qsv
},
'recommended': str # 推荐的加速方式: 'cuda', 'qsv', 'none'
}
"""
result = {
'cuda': {
'available': False,
'gpu': None,
'encoder': False,
'decoder': False,
},
'qsv': {
'available': False,
'encoder': False,
'decoder': False,
},
'recommended': HW_ACCEL_NONE
}
# 检测 CUDA/NVENC 支持
gpu_info = get_gpu_info()
if gpu_info:
result['cuda']['gpu'] = gpu_info
result['cuda']['available'] = check_ffmpeg_hwaccel('cuda')
result['cuda']['encoder'] = check_ffmpeg_encoder('h264_nvenc')
result['cuda']['decoder'] = check_ffmpeg_decoder('h264_cuvid')
# 检测 QSV 支持
result['qsv']['available'] = check_ffmpeg_hwaccel('qsv')
result['qsv']['encoder'] = check_ffmpeg_encoder('h264_qsv')
result['qsv']['decoder'] = check_ffmpeg_decoder('h264_qsv')
# 推荐硬件加速方式(优先 CUDA,其次 QSV)
if result['cuda']['available'] and result['cuda']['encoder']:
result['recommended'] = HW_ACCEL_CUDA
elif result['qsv']['available'] and result['qsv']['encoder']:
result['recommended'] = HW_ACCEL_QSV
return result
def get_hw_accel_info_str() -> str:
"""
获取硬件加速支持信息的可读字符串
Returns:
str: 硬件加速支持信息描述
"""
support = detect_hw_accel_support()
parts = []
if support['cuda']['available']:
gpu = support['cuda']['gpu'] or 'Unknown GPU'
status = 'encoder+decoder' if support['cuda']['encoder'] and support['cuda']['decoder'] else (
'encoder only' if support['cuda']['encoder'] else 'decoder only' if support['cuda']['decoder'] else 'hwaccel only'
)
parts.append(f"CUDA({gpu}, {status})")
if support['qsv']['available']:
status = 'encoder+decoder' if support['qsv']['encoder'] and support['qsv']['decoder'] else (
'encoder only' if support['qsv']['encoder'] else 'decoder only' if support['qsv']['decoder'] else 'hwaccel only'
)
parts.append(f"QSV({status})")
if not parts:
return "No hardware acceleration available"
return ', '.join(parts) + f" [recommended: {support['recommended']}]"