Files
FrameTour-RenderWorker/handlers/base.py
Jerry Yan fe757408b6 feat(cache): 添加素材缓存功能以避免重复下载
- 新增素材缓存配置选项包括启用状态、缓存目录和最大缓存大小
- 实现 MaterialCache 类提供缓存存储和检索功能
- 修改 download_file 方法支持缓存下载模式
- 添加缓存清理机制使用 LRU 策略管理磁盘空间
- 配置默认值优化本地开发体验
- 实现缓存统计和监控功能
2026-01-17 15:07:12 +08:00

520 lines
15 KiB
Python

# -*- coding: utf-8 -*-
"""
任务处理器基类
提供所有处理器共用的基础功能。
"""
import os
import json
import logging
import shutil
import tempfile
import subprocess
from abc import ABC
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
from core.handler import TaskHandler
from domain.task import Task
from domain.result import TaskResult, ErrorCode
from domain.config import WorkerConfig
from services import storage
from services.cache import MaterialCache
from constant import (
HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA,
VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA
)
if TYPE_CHECKING:
from services.api_client import APIClientV2
logger = logging.getLogger(__name__)
def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
"""
根据硬件加速配置获取视频编码参数
Args:
hw_accel: 硬件加速类型 (none, qsv, cuda)
Returns:
FFmpeg 视频编码参数列表
"""
if hw_accel == HW_ACCEL_QSV:
params = VIDEO_ENCODE_PARAMS_QSV
return [
'-c:v', params['codec'],
'-preset', params['preset'],
'-profile:v', params['profile'],
'-level', params['level'],
'-global_quality', params['global_quality'],
'-look_ahead', params['look_ahead'],
]
elif hw_accel == HW_ACCEL_CUDA:
params = VIDEO_ENCODE_PARAMS_CUDA
return [
'-c:v', params['codec'],
'-preset', params['preset'],
'-profile:v', params['profile'],
'-level', params['level'],
'-rc', params['rc'],
'-cq', params['cq'],
'-b:v', '0', # 配合 vbr 模式使用 cq
]
else:
# 软件编码(默认)
params = VIDEO_ENCODE_PARAMS
return [
'-c:v', params['codec'],
'-preset', params['preset'],
'-profile:v', params['profile'],
'-level', params['level'],
'-crf', params['crf'],
'-pix_fmt', params['pix_fmt'],
]
def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
"""
获取硬件加速解码参数(输入文件之前使用)
Args:
hw_accel: 硬件加速类型 (none, qsv, cuda)
Returns:
FFmpeg 硬件加速解码参数列表
"""
if hw_accel == HW_ACCEL_CUDA:
# CUDA 硬件加速解码
# 注意:使用 cuda 作为 hwaccel,但输出到系统内存以便 CPU 滤镜处理
return ['-hwaccel', 'cuda', '-hwaccel_output_format', 'cuda']
elif hw_accel == HW_ACCEL_QSV:
# QSV 硬件加速解码
return ['-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv']
else:
return []
def get_hwaccel_filter_prefix(hw_accel: str = HW_ACCEL_NONE) -> str:
"""
获取硬件加速滤镜前缀(用于 hwdownload 从 GPU 到 CPU)
注意:由于大多数复杂滤镜(如 lut3d, overlay, crop 等)不支持硬件表面,
我们需要在滤镜链开始时将硬件表面下载到系统内存。
Args:
hw_accel: 硬件加速类型
Returns:
需要添加到滤镜链开头的 hwdownload 滤镜字符串
"""
if hw_accel == HW_ACCEL_CUDA:
return 'hwdownload,format=nv12,'
elif hw_accel == HW_ACCEL_QSV:
return 'hwdownload,format=nv12,'
else:
return ''
# v2 统一视频编码参数(兼容旧代码,使用软件编码)
VIDEO_ENCODE_ARGS = get_video_encode_args(HW_ACCEL_NONE)
# v2 统一音频编码参数
AUDIO_ENCODE_ARGS = [
'-c:a', 'aac',
'-b:a', '128k',
'-ar', '48000',
'-ac', '2',
]
def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]:
"""
创建跨平台的 subprocess 参数
在 Windows 上使用 Pyinstaller --noconsole 打包时,需要特殊处理以避免弹出命令行窗口。
Args:
include_stdout: 是否包含 stdout 捕获
Returns:
subprocess.run 使用的参数字典
"""
ret: Dict[str, Any] = {}
# Windows 特殊处理
if hasattr(subprocess, 'STARTUPINFO'):
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
ret['startupinfo'] = si
ret['env'] = os.environ
# 重定向 stdin 避免 "handle is invalid" 错误
ret['stdin'] = subprocess.PIPE
if include_stdout:
ret['stdout'] = subprocess.PIPE
return ret
def probe_video_info(video_file: str) -> Tuple[int, int, float]:
"""
探测视频信息(宽度、高度、时长)
Args:
video_file: 视频文件路径
Returns:
(width, height, duration) 元组,失败返回 (0, 0, 0)
"""
try:
result = subprocess.run(
[
'ffprobe', '-v', 'error',
'-select_streams', 'v:0',
'-show_entries', 'stream=width,height:format=duration',
'-of', 'csv=s=x:p=0',
video_file
],
capture_output=True,
timeout=30,
**subprocess_args(False)
)
if result.returncode != 0:
logger.warning(f"ffprobe failed for {video_file}")
return 0, 0, 0
output = result.stdout.decode('utf-8').strip()
if not output:
return 0, 0, 0
lines = output.split('\n')
if len(lines) >= 2:
wh = lines[0].strip()
duration_str = lines[1].strip()
width, height = wh.split('x')
return int(width), int(height), float(duration_str)
return 0, 0, 0
except Exception as e:
logger.warning(f"probe_video_info error: {e}")
return 0, 0, 0
def probe_duration_json(file_path: str) -> Optional[float]:
"""
使用 ffprobe JSON 输出探测媒体时长
Args:
file_path: 媒体文件路径
Returns:
时长(秒),失败返回 None
"""
try:
result = subprocess.run(
[
'ffprobe', '-v', 'error',
'-show_entries', 'format=duration',
'-of', 'json',
file_path
],
capture_output=True,
timeout=30,
**subprocess_args(False)
)
if result.returncode != 0:
return None
data = json.loads(result.stdout.decode('utf-8'))
duration = data.get('format', {}).get('duration')
return float(duration) if duration else None
except Exception as e:
logger.warning(f"probe_duration_json error: {e}")
return None
class BaseHandler(TaskHandler, ABC):
"""
任务处理器基类
提供所有处理器共用的基础功能,包括:
- 临时目录管理
- 文件下载/上传
- FFmpeg 命令执行
- 日志记录
"""
def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
"""
初始化处理器
Args:
config: Worker 配置
api_client: API 客户端
"""
self.config = config
self.api_client = api_client
self.material_cache = MaterialCache(
cache_dir=config.cache_dir,
enabled=config.cache_enabled,
max_size_gb=config.cache_max_size_gb
)
def get_video_encode_args(self) -> List[str]:
"""
获取当前配置的视频编码参数
Returns:
FFmpeg 视频编码参数列表
"""
return get_video_encode_args(self.config.hw_accel)
def get_hwaccel_decode_args(self) -> List[str]:
"""
获取硬件加速解码参数(在输入文件之前使用)
Returns:
FFmpeg 硬件加速解码参数列表
"""
return get_hwaccel_decode_args(self.config.hw_accel)
def get_hwaccel_filter_prefix(self) -> str:
"""
获取硬件加速滤镜前缀
Returns:
需要添加到滤镜链开头的 hwdownload 滤镜字符串
"""
return get_hwaccel_filter_prefix(self.config.hw_accel)
def before_handle(self, task: Task) -> None:
"""处理前钩子"""
logger.debug(f"[task:{task.task_id}] Before handle: {task.task_type.value}")
def after_handle(self, task: Task, result: TaskResult) -> None:
"""处理后钩子"""
status = "success" if result.success else "failed"
logger.debug(f"[task:{task.task_id}] After handle: {status}")
def create_work_dir(self, task_id: str = None) -> str:
"""
创建临时工作目录
Args:
task_id: 任务 ID(用于目录命名)
Returns:
工作目录路径
"""
# 确保临时根目录存在
os.makedirs(self.config.temp_dir, exist_ok=True)
# 创建唯一的工作目录
prefix = f"task_{task_id}_" if task_id else "task_"
work_dir = tempfile.mkdtemp(dir=self.config.temp_dir, prefix=prefix)
logger.debug(f"Created work directory: {work_dir}")
return work_dir
def cleanup_work_dir(self, work_dir: str) -> None:
"""
清理临时工作目录
Args:
work_dir: 工作目录路径
"""
if not work_dir or not os.path.exists(work_dir):
return
try:
shutil.rmtree(work_dir)
logger.debug(f"Cleaned up work directory: {work_dir}")
except Exception as e:
logger.warning(f"Failed to cleanup work directory {work_dir}: {e}")
def download_file(self, url: str, dest: str, timeout: int = None, use_cache: bool = True) -> bool:
"""
下载文件(支持缓存)
Args:
url: 文件 URL
dest: 目标路径
timeout: 超时时间(秒)
use_cache: 是否使用缓存(默认 True)
Returns:
是否成功
"""
if timeout is None:
timeout = self.config.download_timeout
try:
if use_cache:
# 使用缓存下载
result = self.material_cache.get_or_download(url, dest, timeout=timeout)
else:
# 直接下载(不走缓存)
result = storage.download_file(url, dest, timeout=timeout)
if result:
file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
logger.debug(f"Downloaded: {url} -> {dest} ({file_size} bytes)")
return result
except Exception as e:
logger.error(f"Download failed: {url} -> {e}")
return False
def upload_file(
self,
task_id: str,
file_type: str,
file_path: str,
file_name: str = None
) -> Optional[str]:
"""
上传文件并返回访问 URL
Args:
task_id: 任务 ID
file_type: 文件类型(video/audio/ts/mp4)
file_path: 本地文件路径
file_name: 文件名(可选)
Returns:
访问 URL,失败返回 None
"""
# 获取上传 URL
upload_info = self.api_client.get_upload_url(task_id, file_type, file_name)
if not upload_info:
logger.error(f"[task:{task_id}] Failed to get upload URL")
return None
upload_url = upload_info.get('uploadUrl')
access_url = upload_info.get('accessUrl')
if not upload_url:
logger.error(f"[task:{task_id}] Invalid upload URL response")
return None
# 上传文件
try:
result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout)
if result:
file_size = os.path.getsize(file_path)
logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
return access_url
else:
logger.error(f"[task:{task_id}] Upload failed: {file_path}")
return None
except Exception as e:
logger.error(f"[task:{task_id}] Upload error: {e}")
return None
def run_ffmpeg(
self,
cmd: List[str],
task_id: str,
timeout: int = None
) -> bool:
"""
执行 FFmpeg 命令
Args:
cmd: FFmpeg 命令参数列表
task_id: 任务 ID(用于日志)
timeout: 超时时间(秒)
Returns:
是否成功
"""
if timeout is None:
timeout = self.config.ffmpeg_timeout
# 日志记录命令(限制长度)
cmd_str = ' '.join(cmd)
if len(cmd_str) > 500:
cmd_str = cmd_str[:500] + '...'
logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")
try:
result = subprocess.run(
cmd,
capture_output=True,
timeout=timeout,
**subprocess_args(False)
)
if result.returncode != 0:
stderr = result.stderr.decode('utf-8', errors='replace')[:1000]
logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
return False
return True
except subprocess.TimeoutExpired:
logger.error(f"[task:{task_id}] FFmpeg timeout after {timeout}s")
return False
except Exception as e:
logger.error(f"[task:{task_id}] FFmpeg error: {e}")
return False
def probe_duration(self, file_path: str) -> Optional[float]:
"""
探测媒体文件时长
Args:
file_path: 文件路径
Returns:
时长(秒),失败返回 None
"""
# 首先尝试 JSON 输出方式
duration = probe_duration_json(file_path)
if duration is not None:
return duration
# 回退到旧方式
try:
_, _, duration = probe_video_info(file_path)
return float(duration) if duration else None
except Exception as e:
logger.warning(f"Failed to probe duration: {file_path} -> {e}")
return None
def get_file_size(self, file_path: str) -> int:
"""
获取文件大小
Args:
file_path: 文件路径
Returns:
文件大小(字节)
"""
try:
return os.path.getsize(file_path)
except Exception:
return 0
def ensure_file_exists(self, file_path: str, min_size: int = 0) -> bool:
"""
确保文件存在且大小满足要求
Args:
file_path: 文件路径
min_size: 最小大小(字节)
Returns:
是否满足要求
"""
if not os.path.exists(file_path):
return False
return os.path.getsize(file_path) >= min_size