Compare commits

..

2 Commits

Author SHA1 Message Date
b291f33486 feat(material-cache): 添加缓存锁机制防止并发冲突
- 实现跨进程缓存锁获取和释放功能
- 在下载过程中使用UUID生成唯一的临时文件名避免并发覆盖
- 添加超时机制和轮询间隔控制锁等待时间
- 修改清理逻辑跳过锁文件和下载中的临时文件
- 添加测试验证缓存锁功能正常工作

fix(ffmpeg): 优化FFmpeg命令执行和错误处理

- 添加默认日志级别为error减少冗余输出
- 修复subprocess运行参数传递方式
- 改进错误信息截取避免空值解码异常

refactor(system-info): 优化系统信息获取和缓存机制

- 实现FFmpeg版本、编解码器信息缓存避免重复查询
- 添加系统信息TTL缓存机制提升性能
- 实现GPU信息检查状态缓存避免重复检测
- 整合静态系统信息和动态信息分离处理

refactor(storage): 优化HTTP上传下载资源管理

- 使用上下文管理器确保请求连接正确关闭
- 修改rclone命令构建方式从字符串改为列表形式
- 改进错误处理截取stderr输出长度限制
- 优化响应处理避免资源泄露
2026-01-19 20:03:18 +08:00
0cc96a968b feat(gpu): 添加多显卡调度支持
- 新增 GPUDevice 数据类定义 GPU 设备信息
- 扩展 WorkerConfig 添加 gpu_devices 配置项
- 从环境变量 GPU_DEVICES 读取多显卡设备配置
- 实现 GPUScheduler 提供轮询调度功能
- 修改 FFmpeg 参数生成支持设备指定
- 添加线程本地存储管理当前 GPU 设备
- 更新任务执行器集成 GPU 设备分配
- 实现 GPU 设备自动检测和验证功能
- 添加相关日志记录和状态监控
2026-01-19 18:34:03 +08:00
11 changed files with 630 additions and 107 deletions

View File

@@ -32,11 +32,17 @@ TEMP_DIR=tmp/
#UPLOAD_TIMEOUT=600 # 上传超时(秒)
# ===================
# 硬件加速
# 硬件加速与多显卡
# ===================
# 可选值: none, qsv, cuda
# 硬件加速类型: none, qsv, cuda
HW_ACCEL=none
# GPU 设备列表(逗号分隔的设备索引)
# 不配置时:自动检测所有设备
# 单设备示例:GPU_DEVICES=0
# 多设备示例:GPU_DEVICES=0,1,2
#GPU_DEVICES=0,1
# ===================
# 素材缓存
# ===================

View File

@@ -5,12 +5,15 @@ Worker 配置模型
定义 Worker 运行时的配置参数。
"""
import logging
import os
from dataclasses import dataclass, field
from typing import List, Optional
from constant import HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA, HW_ACCEL_TYPES
logger = logging.getLogger(__name__)
# 默认支持的任务类型
DEFAULT_CAPABILITIES = [
@@ -59,6 +62,9 @@ class WorkerConfig:
# 硬件加速配置
hw_accel: str = HW_ACCEL_NONE # 硬件加速类型: none, qsv, cuda
# GPU 设备配置(多显卡调度)
gpu_devices: List[int] = field(default_factory=list) # 空列表表示使用默认设备
# 素材缓存配置
cache_enabled: bool = True # 是否启用素材缓存
cache_dir: str = "" # 缓存目录,默认为 temp_dir/cache
@@ -113,6 +119,16 @@ class WorkerConfig:
if hw_accel not in HW_ACCEL_TYPES:
hw_accel = HW_ACCEL_NONE
# GPU 设备列表(用于多显卡调度)
gpu_devices_str = os.getenv('GPU_DEVICES', '')
gpu_devices: List[int] = []
if gpu_devices_str:
try:
gpu_devices = [int(d.strip()) for d in gpu_devices_str.split(',') if d.strip()]
except ValueError:
logger.warning(f"Invalid GPU_DEVICES value: {gpu_devices_str}, using auto-detect")
gpu_devices = []
# 素材缓存配置
cache_enabled = os.getenv('CACHE_ENABLED', 'true').lower() in ('true', '1', 'yes')
cache_dir = os.getenv('CACHE_DIR', '') # 空字符串表示使用默认路径
@@ -132,6 +148,7 @@ class WorkerConfig:
download_timeout=download_timeout,
upload_timeout=upload_timeout,
hw_accel=hw_accel,
gpu_devices=gpu_devices,
cache_enabled=cache_enabled,
cache_dir=cache_dir if cache_dir else os.path.join(temp_dir, 'cache'),
cache_max_size_gb=cache_max_size_gb
@@ -156,3 +173,11 @@ class WorkerConfig:
def is_cuda(self) -> bool:
"""是否使用 CUDA 硬件加速"""
return self.hw_accel == HW_ACCEL_CUDA
def has_multi_gpu(self) -> bool:
"""是否配置了多 GPU"""
return len(self.gpu_devices) > 1
def get_gpu_devices(self) -> List[int]:
"""获取 GPU 设备列表"""
return self.gpu_devices.copy()

31
domain/gpu.py Normal file
View File

@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
"""
GPU 设备模型
定义 GPU 设备的数据结构。
"""
from dataclasses import dataclass
from typing import Optional
@dataclass
class GPUDevice:
"""
GPU 设备信息
Attributes:
index: 设备索引(对应 nvidia-smi 中的 GPU ID)
name: 设备名称(如 "NVIDIA GeForce RTX 3090"
memory_total: 显存总量(MB),可选
available: 设备是否可用
"""
index: int
name: str
memory_total: Optional[int] = None
available: bool = True
def __str__(self) -> str:
status = "available" if self.available else "unavailable"
mem_info = f", {self.memory_total}MB" if self.memory_total else ""
return f"GPU[{self.index}]: {self.name}{mem_info} ({status})"

View File

@@ -11,6 +11,7 @@ import logging
import shutil
import tempfile
import subprocess
import threading
from abc import ABC
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
@@ -75,23 +76,33 @@ def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
]
def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE, device_index: Optional[int] = None) -> List[str]:
"""
获取硬件加速解码参数(输入文件之前使用)
Args:
hw_accel: 硬件加速类型 (none, qsv, cuda)
device_index: GPU 设备索引,用于多显卡调度
Returns:
FFmpeg 硬件加速解码参数列表
"""
if hw_accel == HW_ACCEL_CUDA:
# CUDA 硬件加速解码
# 注意:使用 cuda 作为 hwaccel,但输出到系统内存以便 CPU 滤镜处理
return ['-hwaccel', 'cuda', '-hwaccel_output_format', 'cuda']
args = ['-hwaccel', 'cuda']
# 多显卡模式下指定设备
if device_index is not None:
args.extend(['-hwaccel_device', str(device_index)])
args.extend(['-hwaccel_output_format', 'cuda'])
return args
elif hw_accel == HW_ACCEL_QSV:
# QSV 硬件加速解码
return ['-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv']
args = ['-hwaccel', 'qsv']
# QSV 在 Windows 上使用 -qsv_device
if device_index is not None:
args.extend(['-qsv_device', str(device_index)])
args.extend(['-hwaccel_output_format', 'qsv'])
return args
else:
return []
@@ -128,6 +139,8 @@ AUDIO_ENCODE_ARGS = [
'-ac', '2',
]
FFMPEG_LOGLEVEL = 'error'
def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]:
"""
@@ -248,9 +261,13 @@ class BaseHandler(TaskHandler, ABC):
- 临时目录管理
- 文件下载/上传
- FFmpeg 命令执行
- GPU 设备管理(多显卡调度)
- 日志记录
"""
# 线程本地存储:用于存储当前线程的 GPU 设备索引
_thread_local = threading.local()
def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
"""
初始化处理器
@@ -267,6 +284,39 @@ class BaseHandler(TaskHandler, ABC):
max_size_gb=config.cache_max_size_gb
)
# ========== GPU 设备管理 ==========
def set_gpu_device(self, device_index: int) -> None:
"""
设置当前线程的 GPU 设备索引
由 TaskExecutor 在任务执行前调用。
Args:
device_index: GPU 设备索引
"""
self._thread_local.gpu_device = device_index
def get_gpu_device(self) -> Optional[int]:
"""
获取当前线程的 GPU 设备索引
Returns:
GPU 设备索引,未设置则返回 None
"""
return getattr(self._thread_local, 'gpu_device', None)
def clear_gpu_device(self) -> None:
"""
清除当前线程的 GPU 设备索引
由 TaskExecutor 在任务执行后调用。
"""
if hasattr(self._thread_local, 'gpu_device'):
del self._thread_local.gpu_device
# ========== FFmpeg 参数生成 ==========
def get_video_encode_args(self) -> List[str]:
"""
获取当前配置的视频编码参数
@@ -278,12 +328,13 @@ class BaseHandler(TaskHandler, ABC):
def get_hwaccel_decode_args(self) -> List[str]:
"""
获取硬件加速解码参数(在输入文件之前使用
获取硬件加速解码参数(支持设备指定
Returns:
FFmpeg 硬件加速解码参数列表
"""
return get_hwaccel_decode_args(self.config.hw_accel)
device_index = self.get_gpu_device()
return get_hwaccel_decode_args(self.config.hw_accel, device_index)
def get_hwaccel_filter_prefix(self) -> str:
"""
@@ -437,22 +488,28 @@ class BaseHandler(TaskHandler, ABC):
if timeout is None:
timeout = self.config.ffmpeg_timeout
cmd_to_run = list(cmd)
if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run:
cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL]
# 日志记录命令(限制长度)
cmd_str = ' '.join(cmd)
cmd_str = ' '.join(cmd_to_run)
if len(cmd_str) > 500:
cmd_str = cmd_str[:500] + '...'
logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")
try:
run_args = subprocess_args(False)
run_args['stdout'] = subprocess.DEVNULL
run_args['stderr'] = subprocess.PIPE
result = subprocess.run(
cmd,
capture_output=True,
cmd_to_run,
timeout=timeout,
**subprocess_args(False)
**run_args
)
if result.returncode != 0:
stderr = result.stderr.decode('utf-8', errors='replace')[:1000]
stderr = (result.stderr or b'').decode('utf-8', errors='replace')[:1000]
logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
return False

View File

@@ -7,6 +7,7 @@ v2 API 客户端
import logging
import subprocess
import time
import requests
from typing import Dict, List, Optional, Any
@@ -24,6 +25,8 @@ class APIClientV2:
负责与渲染服务端的所有 HTTP 通信。
"""
SYSTEM_INFO_TTL_SECONDS = 30
def __init__(self, config: WorkerConfig):
"""
初始化 API 客户端
@@ -37,6 +40,15 @@ class APIClientV2:
self.worker_id = config.worker_id
self.session = requests.Session()
self._ffmpeg_version: Optional[str] = None
self._codec_info: Optional[str] = None
self._hw_accel_info: Optional[str] = None
self._gpu_info: Optional[str] = None
self._gpu_info_checked = False
self._static_system_info: Optional[Dict[str, Any]] = None
self._system_info_cache: Optional[Dict[str, Any]] = None
self._system_info_cache_ts = 0.0
# 设置默认请求头
self.session.headers.update({
'Content-Type': 'application/json',
@@ -287,6 +299,8 @@ class APIClientV2:
def _get_ffmpeg_version(self) -> str:
"""获取 FFmpeg 版本"""
if self._ffmpeg_version is not None:
return self._ffmpeg_version
try:
result = subprocess.run(
['ffmpeg', '-version'],
@@ -299,13 +313,18 @@ class APIClientV2:
parts = first_line.split()
for i, part in enumerate(parts):
if part == 'version' and i + 1 < len(parts):
return parts[i + 1]
return 'unknown'
self._ffmpeg_version = parts[i + 1]
return self._ffmpeg_version
self._ffmpeg_version = 'unknown'
return self._ffmpeg_version
except Exception:
return 'unknown'
self._ffmpeg_version = 'unknown'
return self._ffmpeg_version
def _get_codec_info(self) -> str:
"""获取支持的编解码器信息"""
if self._codec_info is not None:
return self._codec_info
try:
result = subprocess.run(
['ffmpeg', '-codecs'],
@@ -324,37 +343,60 @@ class APIClientV2:
codecs.append('aac')
if 'libfdk_aac' in output:
codecs.append('libfdk_aac')
return ', '.join(codecs) if codecs else 'unknown'
self._codec_info = ', '.join(codecs) if codecs else 'unknown'
return self._codec_info
except Exception:
return 'unknown'
self._codec_info = 'unknown'
return self._codec_info
def _get_system_info(self) -> Dict[str, Any]:
"""获取系统信息"""
try:
now = time.monotonic()
if (
self._system_info_cache
and now - self._system_info_cache_ts < self.SYSTEM_INFO_TTL_SECONDS
):
return self._system_info_cache
import platform
import psutil
info = {
if self._hw_accel_info is None:
self._hw_accel_info = get_hw_accel_info_str()
if self._static_system_info is None:
self._static_system_info = {
'os': platform.system(),
'cpu': f"{psutil.cpu_count()} cores",
'memory': f"{psutil.virtual_memory().total // (1024**3)}GB",
'hwAccelConfig': self.config.hw_accel, # 当前配置的硬件加速
'hwAccelSupport': self._hw_accel_info, # 系统支持的硬件加速
}
info = dict(self._static_system_info)
info.update({
'cpuUsage': f"{psutil.cpu_percent()}%",
'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB",
'hwAccelConfig': self.config.hw_accel, # 当前配置的硬件加速
'hwAccelSupport': get_hw_accel_info_str(), # 系统支持的硬件加速
}
})
# 尝试获取 GPU 信息
gpu_info = self._get_gpu_info()
if gpu_info:
info['gpu'] = gpu_info
self._system_info_cache = info
self._system_info_cache_ts = now
return info
except Exception:
return {}
def _get_gpu_info(self) -> Optional[str]:
"""获取 GPU 信息"""
if self._gpu_info_checked:
return self._gpu_info
self._gpu_info_checked = True
try:
result = subprocess.run(
['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'],
@@ -364,10 +406,11 @@ class APIClientV2:
)
if result.returncode == 0:
gpu_name = result.stdout.strip().split('\n')[0]
return gpu_name
self._gpu_info = gpu_name
except Exception:
pass
return None
self._gpu_info = None
return self._gpu_info
def close(self):
"""关闭会话"""

View File

@@ -10,6 +10,7 @@ import hashlib
import logging
import shutil
import time
import uuid
from typing import Optional, Tuple
from urllib.parse import urlparse, unquote
@@ -59,6 +60,9 @@ class MaterialCache:
负责素材文件的缓存存储和检索。
"""
LOCK_TIMEOUT_SEC = 30.0
LOCK_POLL_INTERVAL_SEC = 0.1
def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
"""
初始化缓存管理器
@@ -91,6 +95,44 @@ class MaterialCache:
filename = f"{cache_key}{ext}"
return os.path.join(self.cache_dir, filename)
def _get_lock_path(self, cache_key: str) -> str:
"""获取缓存锁文件路径"""
assert self.cache_dir
return os.path.join(self.cache_dir, f"{cache_key}.lock")
def _acquire_lock(self, cache_key: str) -> Optional[str]:
"""获取缓存锁(跨进程安全)"""
if not self.enabled:
return None
lock_path = self._get_lock_path(cache_key)
deadline = time.monotonic() + self.LOCK_TIMEOUT_SEC
while True:
try:
fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.close(fd)
return lock_path
except FileExistsError:
if time.monotonic() >= deadline:
logger.warning(f"Cache lock timeout: {lock_path}")
return None
time.sleep(self.LOCK_POLL_INTERVAL_SEC)
except Exception as e:
logger.warning(f"Cache lock error: {e}")
return None
def _release_lock(self, lock_path: Optional[str]) -> None:
"""释放缓存锁"""
if not lock_path:
return
try:
os.remove(lock_path)
except FileNotFoundError:
return
except Exception as e:
logger.warning(f"Cache lock release error: {e}")
def is_cached(self, url: str) -> Tuple[bool, str]:
"""
检查素材是否已缓存
@@ -136,8 +178,15 @@ class MaterialCache:
if not self.enabled:
return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
# 检查缓存
cached, cache_path = self.is_cached(url)
cache_key = _extract_cache_key(url)
lock_path = self._acquire_lock(cache_key)
if not lock_path:
logger.warning(f"Cache lock unavailable, downloading without cache: {url[:80]}...")
return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
try:
cache_path = self.get_cache_path(url)
cached = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
if cached:
# 命中缓存,复制到目标路径
@@ -159,8 +208,11 @@ class MaterialCache:
# 未命中缓存,下载到缓存目录
logger.debug(f"Cache miss: {url[:80]}...")
# 先下载到临时文件
temp_cache_path = cache_path + '.downloading'
# 先下载到临时文件(唯一文件名,避免并发覆盖)
temp_cache_path = os.path.join(
self.cache_dir,
f"{cache_key}.{uuid.uuid4().hex}.downloading"
)
try:
if not storage.download_file(url, temp_cache_path, max_retries=max_retries, timeout=timeout):
# 下载失败,清理临时文件
@@ -168,10 +220,13 @@ class MaterialCache:
os.remove(temp_cache_path)
return False
# 下载成功,移动到正式缓存路径
if os.path.exists(cache_path):
os.remove(cache_path)
os.rename(temp_cache_path, cache_path)
if not os.path.exists(temp_cache_path) or os.path.getsize(temp_cache_path) <= 0:
if os.path.exists(temp_cache_path):
os.remove(temp_cache_path)
return False
# 下载成功,原子替换缓存文件
os.replace(temp_cache_path, cache_path)
# 复制到目标路径
shutil.copy2(cache_path, dest)
@@ -193,6 +248,8 @@ class MaterialCache:
except Exception:
pass
return False
finally:
self._release_lock(lock_path)
def _cleanup_if_needed(self) -> None:
"""
@@ -209,7 +266,7 @@ class MaterialCache:
total_size = 0
for filename in os.listdir(self.cache_dir):
if filename.endswith('.downloading'):
if filename.endswith('.downloading') or filename.endswith('.lock'):
continue
file_path = os.path.join(self.cache_dir, filename)
if os.path.isfile(file_path):
@@ -275,7 +332,7 @@ class MaterialCache:
total_size = 0
for filename in os.listdir(self.cache_dir):
if filename.endswith('.downloading'):
if filename.endswith('.downloading') or filename.endswith('.lock'):
continue
file_path = os.path.join(self.cache_dir, filename)
if os.path.isfile(file_path):

164
services/gpu_scheduler.py Normal file
View File

@@ -0,0 +1,164 @@
# -*- coding: utf-8 -*-
"""
GPU 调度器
提供多 GPU 设备的轮询调度功能。
"""
import logging
import threading
from typing import List, Optional
from domain.config import WorkerConfig
from domain.gpu import GPUDevice
from util.system import get_all_gpu_info, validate_gpu_device
from constant import HW_ACCEL_CUDA, HW_ACCEL_QSV
logger = logging.getLogger(__name__)
class GPUScheduler:
"""
GPU 调度器
实现多 GPU 设备的轮询(Round Robin)调度。
线程安全,支持并发任务执行。
使用方式:
scheduler = GPUScheduler(config)
# 在任务执行时
device_index = scheduler.acquire()
try:
# 执行任务
pass
finally:
scheduler.release(device_index)
"""
def __init__(self, config: WorkerConfig):
"""
初始化调度器
Args:
config: Worker 配置
"""
self._config = config
self._devices: List[GPUDevice] = []
self._next_index: int = 0
self._lock = threading.Lock()
self._enabled = False
# 初始化设备列表
self._init_devices()
def _init_devices(self) -> None:
"""初始化 GPU 设备列表"""
# 仅在启用硬件加速时才初始化
if self._config.hw_accel not in (HW_ACCEL_CUDA, HW_ACCEL_QSV):
logger.info("Hardware acceleration not enabled, GPU scheduler disabled")
return
configured_devices = self._config.gpu_devices
if configured_devices:
# 使用配置指定的设备
self._devices = self._validate_configured_devices(configured_devices)
else:
# 自动检测所有设备
self._devices = self._auto_detect_devices()
if self._devices:
self._enabled = True
device_info = ', '.join(str(d) for d in self._devices)
logger.info(f"GPU scheduler initialized with {len(self._devices)} device(s): {device_info}")
else:
logger.warning("No GPU devices available, scheduler disabled")
def _validate_configured_devices(self, indices: List[int]) -> List[GPUDevice]:
"""
验证配置的设备列表
Args:
indices: 配置的设备索引列表
Returns:
验证通过的设备列表
"""
devices = []
for index in indices:
if validate_gpu_device(index):
devices.append(GPUDevice(
index=index,
name=f"GPU-{index}",
available=True
))
else:
logger.warning(f"GPU device {index} is not available, skipping")
return devices
def _auto_detect_devices(self) -> List[GPUDevice]:
"""
自动检测所有可用 GPU
Returns:
检测到的设备列表
"""
all_devices = get_all_gpu_info()
# 过滤不可用设备
return [d for d in all_devices if d.available]
@property
def enabled(self) -> bool:
"""调度器是否启用"""
return self._enabled
@property
def device_count(self) -> int:
"""设备数量"""
return len(self._devices)
def acquire(self) -> Optional[int]:
"""
获取下一个可用的 GPU 设备(轮询调度)
Returns:
GPU 设备索引,如果调度器未启用或无设备则返回 None
"""
if not self._enabled or not self._devices:
return None
with self._lock:
device = self._devices[self._next_index]
self._next_index = (self._next_index + 1) % len(self._devices)
logger.debug(f"Acquired GPU device: {device.index}")
return device.index
def release(self, device_index: Optional[int]) -> None:
"""
释放 GPU 设备
当前实现为无状态轮询,此方法仅用于日志记录。
Args:
device_index: 设备索引
"""
if device_index is not None:
logger.debug(f"Released GPU device: {device_index}")
def get_status(self) -> dict:
"""
获取调度器状态信息
Returns:
状态字典
"""
return {
'enabled': self._enabled,
'device_count': len(self._devices),
'devices': [
{'index': d.index, 'name': d.name, 'available': d.available}
for d in self._devices
],
'hw_accel': self._config.hw_accel,
}

View File

@@ -7,6 +7,7 @@
import os
import logging
import subprocess
from typing import Optional
import requests
@@ -73,13 +74,13 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
while retries < max_retries:
try:
with open(file_path, 'rb') as f:
response = requests.put(
with requests.put(
http_url,
data=f,
stream=True,
timeout=timeout,
headers={"Content-Type": "application/octet-stream"}
)
) as response:
response.raise_for_status()
logger.info(f"Upload succeeded: {file_path}")
return True
@@ -111,7 +112,6 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
return False
config_file = os.getenv("RCLONE_CONFIG_FILE", "")
rclone_config = f"--config {config_file}" if config_file else ""
# 替换 URL
new_url = url
@@ -123,19 +123,30 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
if new_url == url:
return False
cmd = (
f"rclone copyto --no-check-dest --ignore-existing "
f"--multi-thread-chunk-size 8M --multi-thread-streams 8 "
f"{rclone_config} {file_path} {new_url}"
)
logger.debug(f"rclone command: {cmd}")
cmd = [
"rclone",
"copyto",
"--no-check-dest",
"--ignore-existing",
"--multi-thread-chunk-size",
"8M",
"--multi-thread-streams",
"8",
]
if config_file:
cmd.extend(["--config", config_file])
cmd.extend([file_path, new_url])
result = os.system(cmd)
if result == 0:
logger.debug(f"rclone command: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"rclone upload succeeded: {file_path}")
return True
logger.warning(f"rclone upload failed (code={result}): {file_path}")
stderr = (result.stderr or '').strip()
stderr = stderr[:500] if stderr else ""
logger.warning(f"rclone upload failed (code={result.returncode}): {file_path} {stderr}")
return False
@@ -177,7 +188,7 @@ def download_file(
retries = 0
while retries < max_retries:
try:
response = requests.get(http_url, timeout=timeout, stream=True)
with requests.get(http_url, timeout=timeout, stream=True) as response:
response.raise_for_status()
with open(file_path, 'wb') as f:

View File

@@ -15,6 +15,7 @@ from domain.result import TaskResult, ErrorCode
from domain.config import WorkerConfig
from core.handler import TaskHandler
from services.lease_service import LeaseService
from services.gpu_scheduler import GPUScheduler
if TYPE_CHECKING:
from services.api_client import APIClientV2
@@ -60,6 +61,12 @@ class TaskExecutor:
# 线程安全锁
self.lock = threading.Lock()
# GPU 调度器(如果启用硬件加速)
self.gpu_scheduler = GPUScheduler(config)
if self.gpu_scheduler.enabled:
logger.info(f"GPU scheduler enabled with {self.gpu_scheduler.device_count} device(s)")
# 注册处理器
self._register_handlers()
@@ -130,6 +137,14 @@ class TaskExecutor:
logger.warning(f"[task:{task.task_id}] Task already running, skipping")
return False
# 检查并发上限
if len(self.current_tasks) >= self.config.max_concurrency:
logger.info(
f"[task:{task.task_id}] Max concurrency reached "
f"({self.config.max_concurrency}), rejecting task"
)
return False
# 检查是否有对应的处理器
if task.task_type not in self.handlers:
logger.error(f"[task:{task.task_id}] No handler for type: {task.task_type.value}")
@@ -164,15 +179,27 @@ class TaskExecutor:
)
lease_service.start()
# 获取 GPU 设备
device_index = None
if self.gpu_scheduler.enabled:
device_index = self.gpu_scheduler.acquire()
if device_index is not None:
logger.info(f"[task:{task_id}] Assigned to GPU device {device_index}")
# 获取处理器(需要在设置 GPU 设备前获取)
handler = self.handlers.get(task.task_type)
try:
# 报告任务开始
self.api_client.report_start(task_id)
# 获取处理器
handler = self.handlers.get(task.task_type)
if not handler:
raise ValueError(f"No handler for task type: {task.task_type}")
# 设置 GPU 设备(线程本地存储)
if device_index is not None:
handler.set_gpu_device(device_index)
# 执行前钩子
handler.before_handle(task)
@@ -196,6 +223,14 @@ class TaskExecutor:
self.api_client.report_fail(task_id, 'E_UNKNOWN', str(e))
finally:
# 清除 GPU 设备设置
if handler:
handler.clear_gpu_device()
# 释放 GPU 设备
if self.gpu_scheduler.enabled:
self.gpu_scheduler.release(device_index)
# 停止租约续期
lease_service.stop()

View File

@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
import os
from services.cache import MaterialCache, _extract_cache_key
def test_cache_lock_acquire_release(tmp_path):
cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
cache_key = _extract_cache_key("https://example.com/path/file.mp4?token=abc")
lock_path = cache._acquire_lock(cache_key)
assert lock_path
assert os.path.exists(lock_path)
cache._release_lock(lock_path)
assert not os.path.exists(lock_path)

View File

@@ -5,13 +5,17 @@
提供系统信息采集功能。
"""
import logging
import os
import platform
import subprocess
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, List
import psutil
from constant import SOFTWARE_VERSION, DEFAULT_CAPABILITIES, HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA
from domain.gpu import GPUDevice
logger = logging.getLogger(__name__)
def get_sys_info():
@@ -264,3 +268,78 @@ def get_hw_accel_info_str() -> str:
return "No hardware acceleration available"
return ', '.join(parts) + f" [recommended: {support['recommended']}]"
def get_all_gpu_info() -> List[GPUDevice]:
"""
获取所有 NVIDIA GPU 信息
使用 nvidia-smi 查询所有 GPU 设备。
Returns:
GPU 设备列表,失败返回空列表
"""
try:
result = subprocess.run(
[
'nvidia-smi',
'--query-gpu=index,name,memory.total',
'--format=csv,noheader,nounits'
],
capture_output=True,
text=True,
timeout=10
)
if result.returncode != 0:
return []
devices = []
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
parts = [p.strip() for p in line.split(',')]
if len(parts) >= 2:
index = int(parts[0])
name = parts[1]
memory = int(parts[2]) if len(parts) >= 3 else None
devices.append(GPUDevice(
index=index,
name=name,
memory_total=memory,
available=True
))
return devices
except Exception as e:
logger.warning(f"Failed to detect GPUs: {e}")
return []
def validate_gpu_device(index: int) -> bool:
"""
验证指定索引的 GPU 设备是否可用
Args:
index: GPU 设备索引
Returns:
设备是否可用
"""
try:
result = subprocess.run(
[
'nvidia-smi',
'-i', str(index),
'--query-gpu=name',
'--format=csv,noheader'
],
capture_output=True,
text=True,
timeout=5
)
return result.returncode == 0 and bool(result.stdout.strip())
except Exception:
return False