Files
Jerry Yan 6126856361 feat(cache): 实现上传文件缓存功能
- 在文件上传成功后将文件加入缓存系统
- 添加 add_to_cache 方法支持本地文件缓存
- 实现原子操作确保缓存写入安全
- 集成锁机制防止并发冲突
- 自动触发缓存清理策略
- 记录详细的缓存操作日志
2026-01-26 10:41:26 +08:00

586 lines
17 KiB
Python

# -*- coding: utf-8 -*-
"""
任务处理器基类
提供所有处理器共用的基础功能。
"""
import os
import json
import logging
import shutil
import tempfile
import subprocess
import threading
from abc import ABC
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
from core.handler import TaskHandler
from domain.task import Task
from domain.result import TaskResult, ErrorCode
from domain.config import WorkerConfig
from services import storage
from services.cache import MaterialCache
from constant import (
HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA,
VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA
)
if TYPE_CHECKING:
from services.api_client import APIClientV2
logger = logging.getLogger(__name__)
def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
"""
根据硬件加速配置获取视频编码参数
Args:
hw_accel: 硬件加速类型 (none, qsv, cuda)
Returns:
FFmpeg 视频编码参数列表
"""
if hw_accel == HW_ACCEL_QSV:
params = VIDEO_ENCODE_PARAMS_QSV
return [
'-c:v', params['codec'],
'-preset', params['preset'],
'-profile:v', params['profile'],
'-level', params['level'],
'-global_quality', params['global_quality'],
'-look_ahead', params['look_ahead'],
]
elif hw_accel == HW_ACCEL_CUDA:
params = VIDEO_ENCODE_PARAMS_CUDA
return [
'-c:v', params['codec'],
'-preset', params['preset'],
'-profile:v', params['profile'],
'-level', params['level'],
'-rc', params['rc'],
'-cq', params['cq'],
'-b:v', '0', # 配合 vbr 模式使用 cq
]
else:
# 软件编码(默认)
params = VIDEO_ENCODE_PARAMS
return [
'-c:v', params['codec'],
'-preset', params['preset'],
'-profile:v', params['profile'],
'-level', params['level'],
'-crf', params['crf'],
'-pix_fmt', params['pix_fmt'],
]
def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE, device_index: Optional[int] = None) -> List[str]:
"""
获取硬件加速解码参数(输入文件之前使用)
Args:
hw_accel: 硬件加速类型 (none, qsv, cuda)
device_index: GPU 设备索引,用于多显卡调度
Returns:
FFmpeg 硬件加速解码参数列表
"""
if hw_accel == HW_ACCEL_CUDA:
# CUDA 硬件加速解码
args = ['-hwaccel', 'cuda']
# 多显卡模式下指定设备
if device_index is not None:
args.extend(['-hwaccel_device', str(device_index)])
args.extend(['-hwaccel_output_format', 'cuda'])
return args
elif hw_accel == HW_ACCEL_QSV:
# QSV 硬件加速解码
args = ['-hwaccel', 'qsv']
# QSV 在 Windows 上使用 -qsv_device
if device_index is not None:
args.extend(['-qsv_device', str(device_index)])
args.extend(['-hwaccel_output_format', 'qsv'])
return args
else:
return []
def get_hwaccel_filter_prefix(hw_accel: str = HW_ACCEL_NONE) -> str:
"""
获取硬件加速滤镜前缀(用于 hwdownload 从 GPU 到 CPU)
注意:由于大多数复杂滤镜(如 lut3d, overlay, crop 等)不支持硬件表面,
我们需要在滤镜链开始时将硬件表面下载到系统内存。
CUDA/QSV hwdownload 只支持 nv12 格式输出,因此需要两步转换:
1. hwdownload,format=nv12 - 从 GPU 下载到 CPU
2. format=yuv420p - 转换为标准格式(确保与 RGBA/YUVA overlay 混合时颜色正确)
Args:
hw_accel: 硬件加速类型
Returns:
需要添加到滤镜链开头的 hwdownload 滤镜字符串
"""
if hw_accel == HW_ACCEL_CUDA:
return 'hwdownload,format=nv12,format=yuv420p,'
elif hw_accel == HW_ACCEL_QSV:
return 'hwdownload,format=nv12,format=yuv420p,'
else:
return ''
# v2 统一视频编码参数(兼容旧代码,使用软件编码)
VIDEO_ENCODE_ARGS = get_video_encode_args(HW_ACCEL_NONE)
# v2 统一音频编码参数
AUDIO_ENCODE_ARGS = [
'-c:a', 'aac',
'-b:a', '128k',
'-ar', '48000',
'-ac', '2',
]
FFMPEG_LOGLEVEL = 'error'
def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]:
"""
创建跨平台的 subprocess 参数
在 Windows 上使用 Pyinstaller --noconsole 打包时,需要特殊处理以避免弹出命令行窗口。
Args:
include_stdout: 是否包含 stdout 捕获
Returns:
subprocess.run 使用的参数字典
"""
ret: Dict[str, Any] = {}
# Windows 特殊处理
if hasattr(subprocess, 'STARTUPINFO'):
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
ret['startupinfo'] = si
ret['env'] = os.environ
# 重定向 stdin 避免 "handle is invalid" 错误
ret['stdin'] = subprocess.PIPE
if include_stdout:
ret['stdout'] = subprocess.PIPE
return ret
def probe_video_info(video_file: str) -> Tuple[int, int, float]:
"""
探测视频信息(宽度、高度、时长)
Args:
video_file: 视频文件路径
Returns:
(width, height, duration) 元组,失败返回 (0, 0, 0)
"""
try:
result = subprocess.run(
[
'ffprobe', '-v', 'error',
'-select_streams', 'v:0',
'-show_entries', 'stream=width,height:format=duration',
'-of', 'csv=s=x:p=0',
video_file
],
capture_output=True,
timeout=30,
**subprocess_args(False)
)
if result.returncode != 0:
logger.warning(f"ffprobe failed for {video_file}")
return 0, 0, 0
output = result.stdout.decode('utf-8').strip()
if not output:
return 0, 0, 0
lines = output.split('\n')
if len(lines) >= 2:
wh = lines[0].strip()
duration_str = lines[1].strip()
width, height = wh.split('x')
return int(width), int(height), float(duration_str)
return 0, 0, 0
except Exception as e:
logger.warning(f"probe_video_info error: {e}")
return 0, 0, 0
def probe_duration_json(file_path: str) -> Optional[float]:
"""
使用 ffprobe JSON 输出探测媒体时长
Args:
file_path: 媒体文件路径
Returns:
时长(秒),失败返回 None
"""
try:
result = subprocess.run(
[
'ffprobe', '-v', 'error',
'-show_entries', 'format=duration',
'-of', 'json',
file_path
],
capture_output=True,
timeout=30,
**subprocess_args(False)
)
if result.returncode != 0:
return None
data = json.loads(result.stdout.decode('utf-8'))
duration = data.get('format', {}).get('duration')
return float(duration) if duration else None
except Exception as e:
logger.warning(f"probe_duration_json error: {e}")
return None
class BaseHandler(TaskHandler, ABC):
"""
任务处理器基类
提供所有处理器共用的基础功能,包括:
- 临时目录管理
- 文件下载/上传
- FFmpeg 命令执行
- GPU 设备管理(多显卡调度)
- 日志记录
"""
# 线程本地存储:用于存储当前线程的 GPU 设备索引
_thread_local = threading.local()
def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
"""
初始化处理器
Args:
config: Worker 配置
api_client: API 客户端
"""
self.config = config
self.api_client = api_client
self.material_cache = MaterialCache(
cache_dir=config.cache_dir,
enabled=config.cache_enabled,
max_size_gb=config.cache_max_size_gb
)
# ========== GPU 设备管理 ==========
def set_gpu_device(self, device_index: int) -> None:
"""
设置当前线程的 GPU 设备索引
由 TaskExecutor 在任务执行前调用。
Args:
device_index: GPU 设备索引
"""
self._thread_local.gpu_device = device_index
def get_gpu_device(self) -> Optional[int]:
"""
获取当前线程的 GPU 设备索引
Returns:
GPU 设备索引,未设置则返回 None
"""
return getattr(self._thread_local, 'gpu_device', None)
def clear_gpu_device(self) -> None:
"""
清除当前线程的 GPU 设备索引
由 TaskExecutor 在任务执行后调用。
"""
if hasattr(self._thread_local, 'gpu_device'):
del self._thread_local.gpu_device
# ========== FFmpeg 参数生成 ==========
def get_video_encode_args(self) -> List[str]:
"""
获取当前配置的视频编码参数
Returns:
FFmpeg 视频编码参数列表
"""
return get_video_encode_args(self.config.hw_accel)
def get_hwaccel_decode_args(self) -> List[str]:
"""
获取硬件加速解码参数(支持设备指定)
Returns:
FFmpeg 硬件加速解码参数列表
"""
device_index = self.get_gpu_device()
return get_hwaccel_decode_args(self.config.hw_accel, device_index)
def get_hwaccel_filter_prefix(self) -> str:
"""
获取硬件加速滤镜前缀
Returns:
需要添加到滤镜链开头的 hwdownload 滤镜字符串
"""
return get_hwaccel_filter_prefix(self.config.hw_accel)
def before_handle(self, task: Task) -> None:
"""处理前钩子"""
logger.debug(f"[task:{task.task_id}] Before handle: {task.task_type.value}")
def after_handle(self, task: Task, result: TaskResult) -> None:
"""处理后钩子"""
status = "success" if result.success else "failed"
logger.debug(f"[task:{task.task_id}] After handle: {status}")
def create_work_dir(self, task_id: str = None) -> str:
"""
创建临时工作目录
Args:
task_id: 任务 ID(用于目录命名)
Returns:
工作目录路径
"""
# 确保临时根目录存在
os.makedirs(self.config.temp_dir, exist_ok=True)
# 创建唯一的工作目录
prefix = f"task_{task_id}_" if task_id else "task_"
work_dir = tempfile.mkdtemp(dir=self.config.temp_dir, prefix=prefix)
logger.debug(f"Created work directory: {work_dir}")
return work_dir
def cleanup_work_dir(self, work_dir: str) -> None:
"""
清理临时工作目录
Args:
work_dir: 工作目录路径
"""
if not work_dir or not os.path.exists(work_dir):
return
try:
shutil.rmtree(work_dir)
logger.debug(f"Cleaned up work directory: {work_dir}")
except Exception as e:
logger.warning(f"Failed to cleanup work directory {work_dir}: {e}")
def download_file(self, url: str, dest: str, timeout: int = None, use_cache: bool = True) -> bool:
"""
下载文件(支持缓存)
Args:
url: 文件 URL
dest: 目标路径
timeout: 超时时间(秒)
use_cache: 是否使用缓存(默认 True)
Returns:
是否成功
"""
if timeout is None:
timeout = self.config.download_timeout
try:
if use_cache:
# 使用缓存下载
result = self.material_cache.get_or_download(url, dest, timeout=timeout)
else:
# 直接下载(不走缓存)
result = storage.download_file(url, dest, timeout=timeout)
if result:
file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
logger.debug(f"Downloaded: {url} -> {dest} ({file_size} bytes)")
return result
except Exception as e:
logger.error(f"Download failed: {url} -> {e}")
return False
def upload_file(
self,
task_id: str,
file_type: str,
file_path: str,
file_name: str = None
) -> Optional[str]:
"""
上传文件并返回访问 URL
Args:
task_id: 任务 ID
file_type: 文件类型(video/audio/ts/mp4)
file_path: 本地文件路径
file_name: 文件名(可选)
Returns:
访问 URL,失败返回 None
"""
# 获取上传 URL
upload_info = self.api_client.get_upload_url(task_id, file_type, file_name)
if not upload_info:
logger.error(f"[task:{task_id}] Failed to get upload URL")
return None
upload_url = upload_info.get('uploadUrl')
access_url = upload_info.get('accessUrl')
if not upload_url:
logger.error(f"[task:{task_id}] Invalid upload URL response")
return None
# 上传文件
try:
result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout)
if result:
file_size = os.path.getsize(file_path)
logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
# 将上传成功的文件加入缓存
if access_url:
self.material_cache.add_to_cache(access_url, file_path)
return access_url
else:
logger.error(f"[task:{task_id}] Upload failed: {file_path}")
return None
except Exception as e:
logger.error(f"[task:{task_id}] Upload error: {e}")
return None
def run_ffmpeg(
self,
cmd: List[str],
task_id: str,
timeout: int = None
) -> bool:
"""
执行 FFmpeg 命令
Args:
cmd: FFmpeg 命令参数列表
task_id: 任务 ID(用于日志)
timeout: 超时时间(秒)
Returns:
是否成功
"""
if timeout is None:
timeout = self.config.ffmpeg_timeout
cmd_to_run = list(cmd)
if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run:
cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL]
# 日志记录命令(限制长度)
cmd_str = ' '.join(cmd_to_run)
if len(cmd_str) > 500:
cmd_str = cmd_str[:500] + '...'
logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")
try:
run_args = subprocess_args(False)
run_args['stdout'] = subprocess.DEVNULL
run_args['stderr'] = subprocess.PIPE
result = subprocess.run(
cmd_to_run,
timeout=timeout,
**run_args
)
if result.returncode != 0:
stderr = (result.stderr or b'').decode('utf-8', errors='replace')[:1000]
logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
return False
return True
except subprocess.TimeoutExpired:
logger.error(f"[task:{task_id}] FFmpeg timeout after {timeout}s")
return False
except Exception as e:
logger.error(f"[task:{task_id}] FFmpeg error: {e}")
return False
def probe_duration(self, file_path: str) -> Optional[float]:
"""
探测媒体文件时长
Args:
file_path: 文件路径
Returns:
时长(秒),失败返回 None
"""
# 首先尝试 JSON 输出方式
duration = probe_duration_json(file_path)
if duration is not None:
return duration
# 回退到旧方式
try:
_, _, duration = probe_video_info(file_path)
return float(duration) if duration else None
except Exception as e:
logger.warning(f"Failed to probe duration: {file_path} -> {e}")
return None
def get_file_size(self, file_path: str) -> int:
"""
获取文件大小
Args:
file_path: 文件路径
Returns:
文件大小(字节)
"""
try:
return os.path.getsize(file_path)
except Exception:
return 0
def ensure_file_exists(self, file_path: str, min_size: int = 0) -> bool:
"""
确保文件存在且大小满足要求
Args:
file_path: 文件路径
min_size: 最小大小(字节)
Returns:
是否满足要求
"""
if not os.path.exists(file_path):
return False
return os.path.getsize(file_path) >= min_size