Compare commits

...

2 Commits

Author SHA1 Message Date
b291f33486 feat(material-cache): 添加缓存锁机制防止并发冲突
- 实现跨进程缓存锁获取和释放功能
- 在下载过程中使用UUID生成唯一的临时文件名避免并发覆盖
- 添加超时机制和轮询间隔控制锁等待时间
- 修改清理逻辑跳过锁文件和下载中的临时文件
- 添加测试验证缓存锁功能正常工作

fix(ffmpeg): 优化FFmpeg命令执行和错误处理

- 添加默认日志级别为error减少冗余输出
- 修复subprocess运行参数传递方式
- 改进错误信息截取避免空值解码异常

refactor(system-info): 优化系统信息获取和缓存机制

- 实现FFmpeg版本、编解码器信息缓存避免重复查询
- 添加系统信息TTL缓存机制提升性能
- 实现GPU信息检查状态缓存避免重复检测
- 整合静态系统信息和动态信息分离处理

refactor(storage): 优化HTTP上传下载资源管理

- 使用上下文管理器确保请求连接正确关闭
- 修改rclone命令构建方式从字符串改为列表形式
- 改进错误处理截取stderr输出长度限制
- 优化响应处理避免资源泄露
2026-01-19 20:03:18 +08:00
0cc96a968b feat(gpu): 添加多显卡调度支持
- 新增 GPUDevice 数据类定义 GPU 设备信息
- 扩展 WorkerConfig 添加 gpu_devices 配置项
- 从环境变量 GPU_DEVICES 读取多显卡设备配置
- 实现 GPUScheduler 提供轮询调度功能
- 修改 FFmpeg 参数生成支持设备指定
- 添加线程本地存储管理当前 GPU 设备
- 更新任务执行器集成 GPU 设备分配
- 实现 GPU 设备自动检测和验证功能
- 添加相关日志记录和状态监控
2026-01-19 18:34:03 +08:00
11 changed files with 630 additions and 107 deletions

View File

@@ -32,11 +32,17 @@ TEMP_DIR=tmp/
#UPLOAD_TIMEOUT=600 # 上传超时(秒) #UPLOAD_TIMEOUT=600 # 上传超时(秒)
# =================== # ===================
# 硬件加速 # 硬件加速与多显卡
# =================== # ===================
# 可选值: none, qsv, cuda # 硬件加速类型: none, qsv, cuda
HW_ACCEL=none HW_ACCEL=none
# GPU 设备列表(逗号分隔的设备索引)
# 不配置时:自动检测所有设备
# 单设备示例:GPU_DEVICES=0
# 多设备示例:GPU_DEVICES=0,1,2
#GPU_DEVICES=0,1
# =================== # ===================
# 素材缓存 # 素材缓存
# =================== # ===================

View File

@@ -5,12 +5,15 @@ Worker 配置模型
定义 Worker 运行时的配置参数。 定义 Worker 运行时的配置参数。
""" """
import logging
import os import os
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import List, Optional from typing import List, Optional
from constant import HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA, HW_ACCEL_TYPES from constant import HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA, HW_ACCEL_TYPES
logger = logging.getLogger(__name__)
# 默认支持的任务类型 # 默认支持的任务类型
DEFAULT_CAPABILITIES = [ DEFAULT_CAPABILITIES = [
@@ -59,6 +62,9 @@ class WorkerConfig:
# 硬件加速配置 # 硬件加速配置
hw_accel: str = HW_ACCEL_NONE # 硬件加速类型: none, qsv, cuda hw_accel: str = HW_ACCEL_NONE # 硬件加速类型: none, qsv, cuda
# GPU 设备配置(多显卡调度)
gpu_devices: List[int] = field(default_factory=list) # 空列表表示使用默认设备
# 素材缓存配置 # 素材缓存配置
cache_enabled: bool = True # 是否启用素材缓存 cache_enabled: bool = True # 是否启用素材缓存
cache_dir: str = "" # 缓存目录,默认为 temp_dir/cache cache_dir: str = "" # 缓存目录,默认为 temp_dir/cache
@@ -113,6 +119,16 @@ class WorkerConfig:
if hw_accel not in HW_ACCEL_TYPES: if hw_accel not in HW_ACCEL_TYPES:
hw_accel = HW_ACCEL_NONE hw_accel = HW_ACCEL_NONE
# GPU 设备列表(用于多显卡调度)
gpu_devices_str = os.getenv('GPU_DEVICES', '')
gpu_devices: List[int] = []
if gpu_devices_str:
try:
gpu_devices = [int(d.strip()) for d in gpu_devices_str.split(',') if d.strip()]
except ValueError:
logger.warning(f"Invalid GPU_DEVICES value: {gpu_devices_str}, using auto-detect")
gpu_devices = []
# 素材缓存配置 # 素材缓存配置
cache_enabled = os.getenv('CACHE_ENABLED', 'true').lower() in ('true', '1', 'yes') cache_enabled = os.getenv('CACHE_ENABLED', 'true').lower() in ('true', '1', 'yes')
cache_dir = os.getenv('CACHE_DIR', '') # 空字符串表示使用默认路径 cache_dir = os.getenv('CACHE_DIR', '') # 空字符串表示使用默认路径
@@ -132,6 +148,7 @@ class WorkerConfig:
download_timeout=download_timeout, download_timeout=download_timeout,
upload_timeout=upload_timeout, upload_timeout=upload_timeout,
hw_accel=hw_accel, hw_accel=hw_accel,
gpu_devices=gpu_devices,
cache_enabled=cache_enabled, cache_enabled=cache_enabled,
cache_dir=cache_dir if cache_dir else os.path.join(temp_dir, 'cache'), cache_dir=cache_dir if cache_dir else os.path.join(temp_dir, 'cache'),
cache_max_size_gb=cache_max_size_gb cache_max_size_gb=cache_max_size_gb
@@ -156,3 +173,11 @@ class WorkerConfig:
def is_cuda(self) -> bool: def is_cuda(self) -> bool:
"""是否使用 CUDA 硬件加速""" """是否使用 CUDA 硬件加速"""
return self.hw_accel == HW_ACCEL_CUDA return self.hw_accel == HW_ACCEL_CUDA
def has_multi_gpu(self) -> bool:
    """Return True when more than one GPU device index is configured."""
    configured_count = len(self.gpu_devices)
    return configured_count > 1
def get_gpu_devices(self) -> List[int]:
    """Return a new list holding the configured GPU device indices."""
    # Hand back a shallow copy so callers cannot mutate the configuration.
    return list(self.gpu_devices)

31
domain/gpu.py Normal file
View File

@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
"""
GPU 设备模型
定义 GPU 设备的数据结构。
"""
from dataclasses import dataclass
from typing import Optional
@dataclass
class GPUDevice:
    """
    Information about one GPU device.

    Attributes:
        index: Device index (the GPU ID as listed by nvidia-smi).
        name: Device name, e.g. "NVIDIA GeForce RTX 3090".
        memory_total: Total video memory in MB; optional.
        available: Whether the device is usable.
    """

    index: int
    name: str
    memory_total: Optional[int] = None
    available: bool = True

    def __str__(self) -> str:
        # A missing (None) or zero memory_total is left out of the text.
        memory_part = ""
        if self.memory_total:
            memory_part = f", {self.memory_total}MB"

        state = "unavailable"
        if self.available:
            state = "available"

        return f"GPU[{self.index}]: {self.name}{memory_part} ({state})"

View File

@@ -11,6 +11,7 @@ import logging
import shutil import shutil
import tempfile import tempfile
import subprocess import subprocess
import threading
from abc import ABC from abc import ABC
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
@@ -75,23 +76,33 @@ def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
] ]
def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]: def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE, device_index: Optional[int] = None) -> List[str]:
""" """
获取硬件加速解码参数(输入文件之前使用) 获取硬件加速解码参数(输入文件之前使用)
Args: Args:
hw_accel: 硬件加速类型 (none, qsv, cuda) hw_accel: 硬件加速类型 (none, qsv, cuda)
device_index: GPU 设备索引,用于多显卡调度
Returns: Returns:
FFmpeg 硬件加速解码参数列表 FFmpeg 硬件加速解码参数列表
""" """
if hw_accel == HW_ACCEL_CUDA: if hw_accel == HW_ACCEL_CUDA:
# CUDA 硬件加速解码 # CUDA 硬件加速解码
# 注意:使用 cuda 作为 hwaccel,但输出到系统内存以便 CPU 滤镜处理 args = ['-hwaccel', 'cuda']
return ['-hwaccel', 'cuda', '-hwaccel_output_format', 'cuda'] # 多显卡模式下指定设备
if device_index is not None:
args.extend(['-hwaccel_device', str(device_index)])
args.extend(['-hwaccel_output_format', 'cuda'])
return args
elif hw_accel == HW_ACCEL_QSV: elif hw_accel == HW_ACCEL_QSV:
# QSV 硬件加速解码 # QSV 硬件加速解码
return ['-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv'] args = ['-hwaccel', 'qsv']
# QSV 在 Windows 上使用 -qsv_device
if device_index is not None:
args.extend(['-qsv_device', str(device_index)])
args.extend(['-hwaccel_output_format', 'qsv'])
return args
else: else:
return [] return []
@@ -128,6 +139,8 @@ AUDIO_ENCODE_ARGS = [
'-ac', '2', '-ac', '2',
] ]
FFMPEG_LOGLEVEL = 'error'
def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]: def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]:
""" """
@@ -248,9 +261,13 @@ class BaseHandler(TaskHandler, ABC):
- 临时目录管理 - 临时目录管理
- 文件下载/上传 - 文件下载/上传
- FFmpeg 命令执行 - FFmpeg 命令执行
- GPU 设备管理(多显卡调度)
- 日志记录 - 日志记录
""" """
# 线程本地存储:用于存储当前线程的 GPU 设备索引
_thread_local = threading.local()
def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'): def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
""" """
初始化处理器 初始化处理器
@@ -267,6 +284,39 @@ class BaseHandler(TaskHandler, ABC):
max_size_gb=config.cache_max_size_gb max_size_gb=config.cache_max_size_gb
) )
# ========== GPU 设备管理 ==========
def set_gpu_device(self, device_index: int) -> None:
    """
    Record the GPU device index for the calling thread.

    The value is stored on the handler's thread-local object, so each
    thread keeps its own device index. Intended to be called by
    TaskExecutor before a task runs.

    Args:
        device_index: GPU device index
    """
    setattr(self._thread_local, 'gpu_device', device_index)
def get_gpu_device(self) -> Optional[int]:
    """
    Read the GPU device index stored for the calling thread.

    Returns:
        The GPU device index, or None when nothing has been set on
        this thread.
    """
    try:
        return self._thread_local.gpu_device
    except AttributeError:
        # Nothing was stored for this thread.
        return None
def clear_gpu_device(self) -> None:
    """
    Remove the GPU device index stored for the calling thread.

    Does nothing when no index is currently stored. Intended to be
    called by TaskExecutor after a task finishes.
    """
    try:
        del self._thread_local.gpu_device
    except AttributeError:
        # Nothing stored for this thread; clearing is a no-op.
        pass
# ========== FFmpeg 参数生成 ==========
def get_video_encode_args(self) -> List[str]: def get_video_encode_args(self) -> List[str]:
""" """
获取当前配置的视频编码参数 获取当前配置的视频编码参数
@@ -278,12 +328,13 @@ class BaseHandler(TaskHandler, ABC):
def get_hwaccel_decode_args(self) -> List[str]: def get_hwaccel_decode_args(self) -> List[str]:
""" """
获取硬件加速解码参数(在输入文件之前使用 获取硬件加速解码参数(支持设备指定
Returns: Returns:
FFmpeg 硬件加速解码参数列表 FFmpeg 硬件加速解码参数列表
""" """
return get_hwaccel_decode_args(self.config.hw_accel) device_index = self.get_gpu_device()
return get_hwaccel_decode_args(self.config.hw_accel, device_index)
def get_hwaccel_filter_prefix(self) -> str: def get_hwaccel_filter_prefix(self) -> str:
""" """
@@ -437,22 +488,28 @@ class BaseHandler(TaskHandler, ABC):
if timeout is None: if timeout is None:
timeout = self.config.ffmpeg_timeout timeout = self.config.ffmpeg_timeout
cmd_to_run = list(cmd)
if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run:
cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL]
# 日志记录命令(限制长度) # 日志记录命令(限制长度)
cmd_str = ' '.join(cmd) cmd_str = ' '.join(cmd_to_run)
if len(cmd_str) > 500: if len(cmd_str) > 500:
cmd_str = cmd_str[:500] + '...' cmd_str = cmd_str[:500] + '...'
logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}") logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")
try: try:
run_args = subprocess_args(False)
run_args['stdout'] = subprocess.DEVNULL
run_args['stderr'] = subprocess.PIPE
result = subprocess.run( result = subprocess.run(
cmd, cmd_to_run,
capture_output=True,
timeout=timeout, timeout=timeout,
**subprocess_args(False) **run_args
) )
if result.returncode != 0: if result.returncode != 0:
stderr = result.stderr.decode('utf-8', errors='replace')[:1000] stderr = (result.stderr or b'').decode('utf-8', errors='replace')[:1000]
logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}") logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
return False return False

View File

@@ -7,6 +7,7 @@ v2 API 客户端
import logging import logging
import subprocess import subprocess
import time
import requests import requests
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any
@@ -24,6 +25,8 @@ class APIClientV2:
负责与渲染服务端的所有 HTTP 通信。 负责与渲染服务端的所有 HTTP 通信。
""" """
SYSTEM_INFO_TTL_SECONDS = 30
def __init__(self, config: WorkerConfig): def __init__(self, config: WorkerConfig):
""" """
初始化 API 客户端 初始化 API 客户端
@@ -37,6 +40,15 @@ class APIClientV2:
self.worker_id = config.worker_id self.worker_id = config.worker_id
self.session = requests.Session() self.session = requests.Session()
self._ffmpeg_version: Optional[str] = None
self._codec_info: Optional[str] = None
self._hw_accel_info: Optional[str] = None
self._gpu_info: Optional[str] = None
self._gpu_info_checked = False
self._static_system_info: Optional[Dict[str, Any]] = None
self._system_info_cache: Optional[Dict[str, Any]] = None
self._system_info_cache_ts = 0.0
# 设置默认请求头 # 设置默认请求头
self.session.headers.update({ self.session.headers.update({
'Content-Type': 'application/json', 'Content-Type': 'application/json',
@@ -287,6 +299,8 @@ class APIClientV2:
def _get_ffmpeg_version(self) -> str: def _get_ffmpeg_version(self) -> str:
"""获取 FFmpeg 版本""" """获取 FFmpeg 版本"""
if self._ffmpeg_version is not None:
return self._ffmpeg_version
try: try:
result = subprocess.run( result = subprocess.run(
['ffmpeg', '-version'], ['ffmpeg', '-version'],
@@ -299,13 +313,18 @@ class APIClientV2:
parts = first_line.split() parts = first_line.split()
for i, part in enumerate(parts): for i, part in enumerate(parts):
if part == 'version' and i + 1 < len(parts): if part == 'version' and i + 1 < len(parts):
return parts[i + 1] self._ffmpeg_version = parts[i + 1]
return 'unknown' return self._ffmpeg_version
self._ffmpeg_version = 'unknown'
return self._ffmpeg_version
except Exception: except Exception:
return 'unknown' self._ffmpeg_version = 'unknown'
return self._ffmpeg_version
def _get_codec_info(self) -> str: def _get_codec_info(self) -> str:
"""获取支持的编解码器信息""" """获取支持的编解码器信息"""
if self._codec_info is not None:
return self._codec_info
try: try:
result = subprocess.run( result = subprocess.run(
['ffmpeg', '-codecs'], ['ffmpeg', '-codecs'],
@@ -324,37 +343,60 @@ class APIClientV2:
codecs.append('aac') codecs.append('aac')
if 'libfdk_aac' in output: if 'libfdk_aac' in output:
codecs.append('libfdk_aac') codecs.append('libfdk_aac')
return ', '.join(codecs) if codecs else 'unknown' self._codec_info = ', '.join(codecs) if codecs else 'unknown'
return self._codec_info
except Exception: except Exception:
return 'unknown' self._codec_info = 'unknown'
return self._codec_info
def _get_system_info(self) -> Dict[str, Any]: def _get_system_info(self) -> Dict[str, Any]:
"""获取系统信息""" """获取系统信息"""
try: try:
now = time.monotonic()
if (
self._system_info_cache
and now - self._system_info_cache_ts < self.SYSTEM_INFO_TTL_SECONDS
):
return self._system_info_cache
import platform import platform
import psutil import psutil
info = { if self._hw_accel_info is None:
self._hw_accel_info = get_hw_accel_info_str()
if self._static_system_info is None:
self._static_system_info = {
'os': platform.system(), 'os': platform.system(),
'cpu': f"{psutil.cpu_count()} cores", 'cpu': f"{psutil.cpu_count()} cores",
'memory': f"{psutil.virtual_memory().total // (1024**3)}GB", 'memory': f"{psutil.virtual_memory().total // (1024**3)}GB",
'hwAccelConfig': self.config.hw_accel, # 当前配置的硬件加速
'hwAccelSupport': self._hw_accel_info, # 系统支持的硬件加速
}
info = dict(self._static_system_info)
info.update({
'cpuUsage': f"{psutil.cpu_percent()}%", 'cpuUsage': f"{psutil.cpu_percent()}%",
'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB", 'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB",
'hwAccelConfig': self.config.hw_accel, # 当前配置的硬件加速 })
'hwAccelSupport': get_hw_accel_info_str(), # 系统支持的硬件加速
}
# 尝试获取 GPU 信息 # 尝试获取 GPU 信息
gpu_info = self._get_gpu_info() gpu_info = self._get_gpu_info()
if gpu_info: if gpu_info:
info['gpu'] = gpu_info info['gpu'] = gpu_info
self._system_info_cache = info
self._system_info_cache_ts = now
return info return info
except Exception: except Exception:
return {} return {}
def _get_gpu_info(self) -> Optional[str]: def _get_gpu_info(self) -> Optional[str]:
"""获取 GPU 信息""" """获取 GPU 信息"""
if self._gpu_info_checked:
return self._gpu_info
self._gpu_info_checked = True
try: try:
result = subprocess.run( result = subprocess.run(
['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'], ['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'],
@@ -364,10 +406,11 @@ class APIClientV2:
) )
if result.returncode == 0: if result.returncode == 0:
gpu_name = result.stdout.strip().split('\n')[0] gpu_name = result.stdout.strip().split('\n')[0]
return gpu_name self._gpu_info = gpu_name
except Exception: except Exception:
pass self._gpu_info = None
return None
return self._gpu_info
def close(self): def close(self):
"""关闭会话""" """关闭会话"""

View File

@@ -10,6 +10,7 @@ import hashlib
import logging import logging
import shutil import shutil
import time import time
import uuid
from typing import Optional, Tuple from typing import Optional, Tuple
from urllib.parse import urlparse, unquote from urllib.parse import urlparse, unquote
@@ -59,6 +60,9 @@ class MaterialCache:
负责素材文件的缓存存储和检索。 负责素材文件的缓存存储和检索。
""" """
LOCK_TIMEOUT_SEC = 30.0
LOCK_POLL_INTERVAL_SEC = 0.1
def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0): def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
""" """
初始化缓存管理器 初始化缓存管理器
@@ -91,6 +95,44 @@ class MaterialCache:
filename = f"{cache_key}{ext}" filename = f"{cache_key}{ext}"
return os.path.join(self.cache_dir, filename) return os.path.join(self.cache_dir, filename)
def _get_lock_path(self, cache_key: str) -> str:
"""获取缓存锁文件路径"""
assert self.cache_dir
return os.path.join(self.cache_dir, f"{cache_key}.lock")
def _acquire_lock(self, cache_key: str) -> Optional[str]:
"""获取缓存锁(跨进程安全)"""
if not self.enabled:
return None
lock_path = self._get_lock_path(cache_key)
deadline = time.monotonic() + self.LOCK_TIMEOUT_SEC
while True:
try:
fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.close(fd)
return lock_path
except FileExistsError:
if time.monotonic() >= deadline:
logger.warning(f"Cache lock timeout: {lock_path}")
return None
time.sleep(self.LOCK_POLL_INTERVAL_SEC)
except Exception as e:
logger.warning(f"Cache lock error: {e}")
return None
def _release_lock(self, lock_path: Optional[str]) -> None:
"""释放缓存锁"""
if not lock_path:
return
try:
os.remove(lock_path)
except FileNotFoundError:
return
except Exception as e:
logger.warning(f"Cache lock release error: {e}")
def is_cached(self, url: str) -> Tuple[bool, str]: def is_cached(self, url: str) -> Tuple[bool, str]:
""" """
检查素材是否已缓存 检查素材是否已缓存
@@ -136,8 +178,15 @@ class MaterialCache:
if not self.enabled: if not self.enabled:
return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout) return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
# 检查缓存 cache_key = _extract_cache_key(url)
cached, cache_path = self.is_cached(url) lock_path = self._acquire_lock(cache_key)
if not lock_path:
logger.warning(f"Cache lock unavailable, downloading without cache: {url[:80]}...")
return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
try:
cache_path = self.get_cache_path(url)
cached = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
if cached: if cached:
# 命中缓存,复制到目标路径 # 命中缓存,复制到目标路径
@@ -159,8 +208,11 @@ class MaterialCache:
# 未命中缓存,下载到缓存目录 # 未命中缓存,下载到缓存目录
logger.debug(f"Cache miss: {url[:80]}...") logger.debug(f"Cache miss: {url[:80]}...")
# 先下载到临时文件 # 先下载到临时文件(唯一文件名,避免并发覆盖)
temp_cache_path = cache_path + '.downloading' temp_cache_path = os.path.join(
self.cache_dir,
f"{cache_key}.{uuid.uuid4().hex}.downloading"
)
try: try:
if not storage.download_file(url, temp_cache_path, max_retries=max_retries, timeout=timeout): if not storage.download_file(url, temp_cache_path, max_retries=max_retries, timeout=timeout):
# 下载失败,清理临时文件 # 下载失败,清理临时文件
@@ -168,10 +220,13 @@ class MaterialCache:
os.remove(temp_cache_path) os.remove(temp_cache_path)
return False return False
# 下载成功,移动到正式缓存路径 if not os.path.exists(temp_cache_path) or os.path.getsize(temp_cache_path) <= 0:
if os.path.exists(cache_path): if os.path.exists(temp_cache_path):
os.remove(cache_path) os.remove(temp_cache_path)
os.rename(temp_cache_path, cache_path) return False
# 下载成功,原子替换缓存文件
os.replace(temp_cache_path, cache_path)
# 复制到目标路径 # 复制到目标路径
shutil.copy2(cache_path, dest) shutil.copy2(cache_path, dest)
@@ -193,6 +248,8 @@ class MaterialCache:
except Exception: except Exception:
pass pass
return False return False
finally:
self._release_lock(lock_path)
def _cleanup_if_needed(self) -> None: def _cleanup_if_needed(self) -> None:
""" """
@@ -209,7 +266,7 @@ class MaterialCache:
total_size = 0 total_size = 0
for filename in os.listdir(self.cache_dir): for filename in os.listdir(self.cache_dir):
if filename.endswith('.downloading'): if filename.endswith('.downloading') or filename.endswith('.lock'):
continue continue
file_path = os.path.join(self.cache_dir, filename) file_path = os.path.join(self.cache_dir, filename)
if os.path.isfile(file_path): if os.path.isfile(file_path):
@@ -275,7 +332,7 @@ class MaterialCache:
total_size = 0 total_size = 0
for filename in os.listdir(self.cache_dir): for filename in os.listdir(self.cache_dir):
if filename.endswith('.downloading'): if filename.endswith('.downloading') or filename.endswith('.lock'):
continue continue
file_path = os.path.join(self.cache_dir, filename) file_path = os.path.join(self.cache_dir, filename)
if os.path.isfile(file_path): if os.path.isfile(file_path):

164
services/gpu_scheduler.py Normal file
View File

@@ -0,0 +1,164 @@
# -*- coding: utf-8 -*-
"""
GPU 调度器
提供多 GPU 设备的轮询调度功能。
"""
import logging
import threading
from typing import List, Optional
from domain.config import WorkerConfig
from domain.gpu import GPUDevice
from util.system import get_all_gpu_info, validate_gpu_device
from constant import HW_ACCEL_CUDA, HW_ACCEL_QSV
logger = logging.getLogger(__name__)
class GPUScheduler:
    """
    GPU scheduler.

    Hands out GPU device indices in round-robin order. The rotation cursor
    is read and advanced while holding a lock, so concurrent acquire()
    calls each see a consistent cursor.

    Usage:
        scheduler = GPUScheduler(config)

        # while running a task
        device_index = scheduler.acquire()
        try:
            ...  # run the task
        finally:
            scheduler.release(device_index)
    """

    def __init__(self, config: WorkerConfig):
        """
        Create the scheduler and build its device list.

        Args:
            config: Worker configuration
        """
        self._config = config
        self._devices: List[GPUDevice] = []
        self._next_index: int = 0
        self._lock = threading.Lock()
        self._enabled = False

        # Populate the device list (may leave the scheduler disabled).
        self._init_devices()

    def _init_devices(self) -> None:
        """Build the device list and enable the scheduler if any device is usable."""
        # NOTE(review): validation and detection go through
        # validate_gpu_device / get_all_gpu_info for both CUDA and QSV;
        # confirm those helpers are appropriate for QSV devices.
        hw_accel = self._config.hw_accel
        if hw_accel != HW_ACCEL_CUDA and hw_accel != HW_ACCEL_QSV:
            # Without hardware acceleration there is nothing to schedule.
            logger.info("Hardware acceleration not enabled, GPU scheduler disabled")
            return

        requested = self._config.gpu_devices
        # Explicit configuration wins; otherwise fall back to auto-detection.
        self._devices = (
            self._validate_configured_devices(requested)
            if requested
            else self._auto_detect_devices()
        )

        if not self._devices:
            logger.warning("No GPU devices available, scheduler disabled")
            return

        self._enabled = True
        device_info = ', '.join(str(d) for d in self._devices)
        logger.info(f"GPU scheduler initialized with {len(self._devices)} device(s): {device_info}")

    def _validate_configured_devices(self, indices: List[int]) -> List[GPUDevice]:
        """
        Keep only the configured device indices that pass validation.

        Args:
            indices: Configured device indices

        Returns:
            Devices whose index passed validate_gpu_device, in input order
        """
        accepted: List[GPUDevice] = []
        for index in indices:
            if not validate_gpu_device(index):
                logger.warning(f"GPU device {index} is not available, skipping")
                continue
            # Only the index is known here, so a placeholder name is used.
            accepted.append(GPUDevice(
                index=index,
                name=f"GPU-{index}",
                available=True
            ))
        return accepted

    def _auto_detect_devices(self) -> List[GPUDevice]:
        """
        Detect GPUs via get_all_gpu_info and drop the unavailable ones.

        Returns:
            Detected devices whose ``available`` flag is truthy
        """
        return list(filter(lambda d: d.available, get_all_gpu_info()))

    @property
    def enabled(self) -> bool:
        """Whether the scheduler is enabled."""
        return self._enabled

    @property
    def device_count(self) -> int:
        """Number of devices managed by the scheduler."""
        return len(self._devices)

    def acquire(self) -> Optional[int]:
        """
        Pick the next GPU device in round-robin order.

        Returns:
            The GPU device index, or None when the scheduler is disabled
            or has no devices
        """
        if not (self._enabled and self._devices):
            return None

        with self._lock:
            cursor = self._next_index
            device = self._devices[cursor]
            # Advance and wrap around for the next caller.
            self._next_index = (cursor + 1) % len(self._devices)
            logger.debug(f"Acquired GPU device: {device.index}")
            return device.index

    def release(self, device_index: Optional[int]) -> None:
        """
        Release a GPU device.

        The round-robin rotation keeps no per-device state, so this only
        writes a debug log entry.

        Args:
            device_index: Device index (None is ignored)
        """
        if device_index is None:
            return
        logger.debug(f"Released GPU device: {device_index}")

    def get_status(self) -> dict:
        """
        Report the scheduler state.

        Returns:
            Dict with the enabled flag, device count, per-device details
            and the configured hardware acceleration type
        """
        device_entries = []
        for d in self._devices:
            device_entries.append(
                {'index': d.index, 'name': d.name, 'available': d.available}
            )
        return {
            'enabled': self._enabled,
            'device_count': len(self._devices),
            'devices': device_entries,
            'hw_accel': self._config.hw_accel,
        }

View File

@@ -7,6 +7,7 @@
import os import os
import logging import logging
import subprocess
from typing import Optional from typing import Optional
import requests import requests
@@ -73,13 +74,13 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
while retries < max_retries: while retries < max_retries:
try: try:
with open(file_path, 'rb') as f: with open(file_path, 'rb') as f:
response = requests.put( with requests.put(
http_url, http_url,
data=f, data=f,
stream=True, stream=True,
timeout=timeout, timeout=timeout,
headers={"Content-Type": "application/octet-stream"} headers={"Content-Type": "application/octet-stream"}
) ) as response:
response.raise_for_status() response.raise_for_status()
logger.info(f"Upload succeeded: {file_path}") logger.info(f"Upload succeeded: {file_path}")
return True return True
@@ -111,7 +112,6 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
return False return False
config_file = os.getenv("RCLONE_CONFIG_FILE", "") config_file = os.getenv("RCLONE_CONFIG_FILE", "")
rclone_config = f"--config {config_file}" if config_file else ""
# 替换 URL # 替换 URL
new_url = url new_url = url
@@ -123,19 +123,30 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
if new_url == url: if new_url == url:
return False return False
cmd = ( cmd = [
f"rclone copyto --no-check-dest --ignore-existing " "rclone",
f"--multi-thread-chunk-size 8M --multi-thread-streams 8 " "copyto",
f"{rclone_config} {file_path} {new_url}" "--no-check-dest",
) "--ignore-existing",
logger.debug(f"rclone command: {cmd}") "--multi-thread-chunk-size",
"8M",
"--multi-thread-streams",
"8",
]
if config_file:
cmd.extend(["--config", config_file])
cmd.extend([file_path, new_url])
result = os.system(cmd) logger.debug(f"rclone command: {' '.join(cmd)}")
if result == 0:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"rclone upload succeeded: {file_path}") logger.info(f"rclone upload succeeded: {file_path}")
return True return True
logger.warning(f"rclone upload failed (code={result}): {file_path}") stderr = (result.stderr or '').strip()
stderr = stderr[:500] if stderr else ""
logger.warning(f"rclone upload failed (code={result.returncode}): {file_path} {stderr}")
return False return False
@@ -177,7 +188,7 @@ def download_file(
retries = 0 retries = 0
while retries < max_retries: while retries < max_retries:
try: try:
response = requests.get(http_url, timeout=timeout, stream=True) with requests.get(http_url, timeout=timeout, stream=True) as response:
response.raise_for_status() response.raise_for_status()
with open(file_path, 'wb') as f: with open(file_path, 'wb') as f:

View File

@@ -15,6 +15,7 @@ from domain.result import TaskResult, ErrorCode
from domain.config import WorkerConfig from domain.config import WorkerConfig
from core.handler import TaskHandler from core.handler import TaskHandler
from services.lease_service import LeaseService from services.lease_service import LeaseService
from services.gpu_scheduler import GPUScheduler
if TYPE_CHECKING: if TYPE_CHECKING:
from services.api_client import APIClientV2 from services.api_client import APIClientV2
@@ -60,6 +61,12 @@ class TaskExecutor:
# 线程安全锁 # 线程安全锁
self.lock = threading.Lock() self.lock = threading.Lock()
# GPU 调度器(如果启用硬件加速)
self.gpu_scheduler = GPUScheduler(config)
if self.gpu_scheduler.enabled:
logger.info(f"GPU scheduler enabled with {self.gpu_scheduler.device_count} device(s)")
# 注册处理器 # 注册处理器
self._register_handlers() self._register_handlers()
@@ -130,6 +137,14 @@ class TaskExecutor:
logger.warning(f"[task:{task.task_id}] Task already running, skipping") logger.warning(f"[task:{task.task_id}] Task already running, skipping")
return False return False
# 检查并发上限
if len(self.current_tasks) >= self.config.max_concurrency:
logger.info(
f"[task:{task.task_id}] Max concurrency reached "
f"({self.config.max_concurrency}), rejecting task"
)
return False
# 检查是否有对应的处理器 # 检查是否有对应的处理器
if task.task_type not in self.handlers: if task.task_type not in self.handlers:
logger.error(f"[task:{task.task_id}] No handler for type: {task.task_type.value}") logger.error(f"[task:{task.task_id}] No handler for type: {task.task_type.value}")
@@ -164,15 +179,27 @@ class TaskExecutor:
) )
lease_service.start() lease_service.start()
# 获取 GPU 设备
device_index = None
if self.gpu_scheduler.enabled:
device_index = self.gpu_scheduler.acquire()
if device_index is not None:
logger.info(f"[task:{task_id}] Assigned to GPU device {device_index}")
# 获取处理器(需要在设置 GPU 设备前获取)
handler = self.handlers.get(task.task_type)
try: try:
# 报告任务开始 # 报告任务开始
self.api_client.report_start(task_id) self.api_client.report_start(task_id)
# 获取处理器
handler = self.handlers.get(task.task_type)
if not handler: if not handler:
raise ValueError(f"No handler for task type: {task.task_type}") raise ValueError(f"No handler for task type: {task.task_type}")
# 设置 GPU 设备(线程本地存储)
if device_index is not None:
handler.set_gpu_device(device_index)
# 执行前钩子 # 执行前钩子
handler.before_handle(task) handler.before_handle(task)
@@ -196,6 +223,14 @@ class TaskExecutor:
self.api_client.report_fail(task_id, 'E_UNKNOWN', str(e)) self.api_client.report_fail(task_id, 'E_UNKNOWN', str(e))
finally: finally:
# 清除 GPU 设备设置
if handler:
handler.clear_gpu_device()
# 释放 GPU 设备
if self.gpu_scheduler.enabled:
self.gpu_scheduler.release(device_index)
# 停止租约续期 # 停止租约续期
lease_service.stop() lease_service.stop()

View File

@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
import os
from services.cache import MaterialCache, _extract_cache_key
def test_cache_lock_acquire_release(tmp_path):
    """Acquiring a cache lock creates the lock file; releasing it removes the file."""
    material_cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
    key = _extract_cache_key("https://example.com/path/file.mp4?token=abc")

    acquired_path = material_cache._acquire_lock(key)
    assert acquired_path
    assert os.path.exists(acquired_path)

    material_cache._release_lock(acquired_path)
    assert not os.path.exists(acquired_path)

View File

@@ -5,13 +5,17 @@
提供系统信息采集功能。 提供系统信息采集功能。
""" """
import logging
import os import os
import platform import platform
import subprocess import subprocess
from typing import Optional, Dict, Any from typing import Optional, Dict, Any, List
import psutil import psutil
from constant import SOFTWARE_VERSION, DEFAULT_CAPABILITIES, HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA from constant import SOFTWARE_VERSION, DEFAULT_CAPABILITIES, HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA
from domain.gpu import GPUDevice
logger = logging.getLogger(__name__)
def get_sys_info(): def get_sys_info():
@@ -264,3 +268,78 @@ def get_hw_accel_info_str() -> str:
return "No hardware acceleration available" return "No hardware acceleration available"
return ', '.join(parts) + f" [recommended: {support['recommended']}]" return ', '.join(parts) + f" [recommended: {support['recommended']}]"
def get_all_gpu_info() -> List[GPUDevice]:
    """
    Query nvidia-smi for all NVIDIA GPU devices.

    Each output line is parsed on its own: a line whose index is not an
    integer is skipped, and a memory.total field that is not an integer is
    recorded as None. One odd line therefore no longer discards the devices
    that were reported correctly.

    Returns:
        List of detected GPU devices; an empty list when nvidia-smi cannot
        be run, exits with a non-zero code, or any unexpected error occurs
    """
    try:
        result = subprocess.run(
            [
                'nvidia-smi',
                '--query-gpu=index,name,memory.total',
                '--format=csv,noheader,nounits'
            ],
            capture_output=True,
            text=True,
            timeout=10
        )

        if result.returncode != 0:
            return []

        devices = []
        for line in result.stdout.splitlines():
            parts = [p.strip() for p in line.split(',')]
            # Blank or truncated lines carry no usable device record.
            if len(parts) < 2:
                continue

            try:
                index = int(parts[0])
            except ValueError:
                logger.warning(f"Skipping unparsable nvidia-smi line: {line!r}")
                continue

            # The memory field may be absent or a non-numeric placeholder;
            # treat both as "unknown" instead of failing the whole scan.
            memory = None
            if len(parts) >= 3:
                try:
                    memory = int(parts[2])
                except ValueError:
                    memory = None

            devices.append(GPUDevice(
                index=index,
                name=parts[1],
                memory_total=memory,
                available=True
            ))

        return devices

    except Exception as e:
        # Best-effort detection: never propagate failures to the caller.
        logger.warning(f"Failed to detect GPUs: {e}")
        return []
def validate_gpu_device(index: int) -> bool:
    """
    Check whether nvidia-smi reports a GPU at the given index.

    Args:
        index: GPU device index

    Returns:
        True when nvidia-smi exits with code 0 and prints a non-empty
        name for the device; False otherwise, including when nvidia-smi
        cannot be run or times out
    """
    try:
        query = [
            'nvidia-smi',
            '-i', str(index),
            '--query-gpu=name',
            '--format=csv,noheader'
        ]
        proc = subprocess.run(query, capture_output=True, text=True, timeout=5)
        if proc.returncode != 0:
            return False
        reported_name = proc.stdout.strip()
        return bool(reported_name)
    except Exception:
        return False