diff --git a/.env.example b/.env.example index d2f662f..351a3eb 100644 --- a/.env.example +++ b/.env.example @@ -32,11 +32,17 @@ TEMP_DIR=tmp/ #UPLOAD_TIMEOUT=600 # 上传超时(秒) # =================== -# 硬件加速 +# 硬件加速与多显卡 # =================== -# 可选值: none, qsv, cuda +# 硬件加速类型: none, qsv, cuda HW_ACCEL=none +# GPU 设备列表(逗号分隔的设备索引) +# 不配置时:自动检测所有设备 +# 单设备示例:GPU_DEVICES=0 +# 多设备示例:GPU_DEVICES=0,1,2 +#GPU_DEVICES=0,1 + # =================== # 素材缓存 # =================== diff --git a/domain/config.py b/domain/config.py index 7715479..9ed95ab 100644 --- a/domain/config.py +++ b/domain/config.py @@ -5,12 +5,15 @@ Worker 配置模型 定义 Worker 运行时的配置参数。 """ +import logging import os from dataclasses import dataclass, field from typing import List, Optional from constant import HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA, HW_ACCEL_TYPES +logger = logging.getLogger(__name__) + # 默认支持的任务类型 DEFAULT_CAPABILITIES = [ @@ -59,6 +62,9 @@ class WorkerConfig: # 硬件加速配置 hw_accel: str = HW_ACCEL_NONE # 硬件加速类型: none, qsv, cuda + # GPU 设备配置(多显卡调度) + gpu_devices: List[int] = field(default_factory=list) # 空列表表示使用默认设备 + # 素材缓存配置 cache_enabled: bool = True # 是否启用素材缓存 cache_dir: str = "" # 缓存目录,默认为 temp_dir/cache @@ -113,6 +119,16 @@ class WorkerConfig: if hw_accel not in HW_ACCEL_TYPES: hw_accel = HW_ACCEL_NONE + # GPU 设备列表(用于多显卡调度) + gpu_devices_str = os.getenv('GPU_DEVICES', '') + gpu_devices: List[int] = [] + if gpu_devices_str: + try: + gpu_devices = [int(d.strip()) for d in gpu_devices_str.split(',') if d.strip()] + except ValueError: + logger.warning(f"Invalid GPU_DEVICES value: {gpu_devices_str}, using auto-detect") + gpu_devices = [] + # 素材缓存配置 cache_enabled = os.getenv('CACHE_ENABLED', 'true').lower() in ('true', '1', 'yes') cache_dir = os.getenv('CACHE_DIR', '') # 空字符串表示使用默认路径 @@ -132,6 +148,7 @@ class WorkerConfig: download_timeout=download_timeout, upload_timeout=upload_timeout, hw_accel=hw_accel, + gpu_devices=gpu_devices, cache_enabled=cache_enabled, cache_dir=cache_dir if cache_dir else 
# -*- coding: utf-8 -*-
"""
GPU device model.

Defines the data structure describing a single GPU device.
"""

from dataclasses import dataclass
from typing import Optional


@dataclass
class GPUDevice:
    """
    GPU device information.

    Attributes:
        index: Device index (matches the GPU ID shown by nvidia-smi).
        name: Device name (e.g. "NVIDIA GeForce RTX 3090").
        memory_total: Total memory in MB; None when unknown.
        available: Whether the device is usable.
    """
    index: int
    name: str
    memory_total: Optional[int] = None
    available: bool = True

    def __str__(self) -> str:
        status = "available" if self.available else "unavailable"
        # Explicit None check: a 0 MB reading must still be displayed
        # instead of being silently dropped by truthiness.
        mem_info = f", {self.memory_total}MB" if self.memory_total is not None else ""
        return f"GPU[{self.index}]: {self.name}{mem_info} ({status})"
'cuda', '-hwaccel_output_format', 'cuda'] + args = ['-hwaccel', 'cuda'] + # 多显卡模式下指定设备 + if device_index is not None: + args.extend(['-hwaccel_device', str(device_index)]) + args.extend(['-hwaccel_output_format', 'cuda']) + return args elif hw_accel == HW_ACCEL_QSV: # QSV 硬件加速解码 - return ['-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv'] + args = ['-hwaccel', 'qsv'] + # QSV 在 Windows 上使用 -qsv_device + if device_index is not None: + args.extend(['-qsv_device', str(device_index)]) + args.extend(['-hwaccel_output_format', 'qsv']) + return args else: return [] @@ -248,9 +259,13 @@ class BaseHandler(TaskHandler, ABC): - 临时目录管理 - 文件下载/上传 - FFmpeg 命令执行 + - GPU 设备管理(多显卡调度) - 日志记录 """ + # 线程本地存储:用于存储当前线程的 GPU 设备索引 + _thread_local = threading.local() + def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'): """ 初始化处理器 @@ -267,6 +282,39 @@ class BaseHandler(TaskHandler, ABC): max_size_gb=config.cache_max_size_gb ) + # ========== GPU 设备管理 ========== + + def set_gpu_device(self, device_index: int) -> None: + """ + 设置当前线程的 GPU 设备索引 + + 由 TaskExecutor 在任务执行前调用。 + + Args: + device_index: GPU 设备索引 + """ + self._thread_local.gpu_device = device_index + + def get_gpu_device(self) -> Optional[int]: + """ + 获取当前线程的 GPU 设备索引 + + Returns: + GPU 设备索引,未设置则返回 None + """ + return getattr(self._thread_local, 'gpu_device', None) + + def clear_gpu_device(self) -> None: + """ + 清除当前线程的 GPU 设备索引 + + 由 TaskExecutor 在任务执行后调用。 + """ + if hasattr(self._thread_local, 'gpu_device'): + del self._thread_local.gpu_device + + # ========== FFmpeg 参数生成 ========== + def get_video_encode_args(self) -> List[str]: """ 获取当前配置的视频编码参数 @@ -278,12 +326,13 @@ class BaseHandler(TaskHandler, ABC): def get_hwaccel_decode_args(self) -> List[str]: """ - 获取硬件加速解码参数(在输入文件之前使用) + 获取硬件加速解码参数(支持设备指定) Returns: FFmpeg 硬件加速解码参数列表 """ - return get_hwaccel_decode_args(self.config.hw_accel) + device_index = self.get_gpu_device() + return get_hwaccel_decode_args(self.config.hw_accel, device_index) def 
class GPUScheduler:
    """
    GPU scheduler.

    Hands out GPU device indices in round-robin order for multi-GPU
    deployments. Thread-safe, so concurrent task threads may call it.

    Usage:
        scheduler = GPUScheduler(config)

        device_index = scheduler.acquire()
        try:
            # run the task
            pass
        finally:
            scheduler.release(device_index)
    """

    def __init__(self, config: WorkerConfig):
        """
        Initialize the scheduler and build the device list.

        Args:
            config: Worker configuration.
        """
        self._config = config
        self._devices: List[GPUDevice] = []
        self._next_index: int = 0
        # Guards _next_index so concurrent acquire() calls stay round-robin.
        self._lock = threading.Lock()
        self._enabled = False
        self._init_devices()

    def _init_devices(self) -> None:
        """Populate the device list from config or auto-detection."""
        # The scheduler only makes sense when hardware acceleration is on.
        if self._config.hw_accel not in (HW_ACCEL_CUDA, HW_ACCEL_QSV):
            logger.info("Hardware acceleration not enabled, GPU scheduler disabled")
            return

        explicit = self._config.gpu_devices
        self._devices = (
            self._validate_configured_devices(explicit)
            if explicit
            else self._auto_detect_devices()
        )

        if not self._devices:
            logger.warning("No GPU devices available, scheduler disabled")
            return

        self._enabled = True
        device_info = ', '.join(str(d) for d in self._devices)
        logger.info(f"GPU scheduler initialized with {len(self._devices)} device(s): {device_info}")

    def _validate_configured_devices(self, indices: List[int]) -> List[GPUDevice]:
        """
        Keep only the configured device indices that pass validation.

        Args:
            indices: Device indices from configuration.

        Returns:
            Devices that validated successfully.
        """
        usable: List[GPUDevice] = []
        for idx in indices:
            if not validate_gpu_device(idx):
                logger.warning(f"GPU device {idx} is not available, skipping")
                continue
            usable.append(GPUDevice(index=idx, name=f"GPU-{idx}", available=True))
        return usable

    def _auto_detect_devices(self) -> List[GPUDevice]:
        """
        Detect every usable GPU on the host.

        Returns:
            All detected devices reported as available.
        """
        return [dev for dev in get_all_gpu_info() if dev.available]

    @property
    def enabled(self) -> bool:
        """Whether the scheduler is active."""
        return self._enabled

    @property
    def device_count(self) -> int:
        """Number of managed devices."""
        return len(self._devices)

    def acquire(self) -> Optional[int]:
        """
        Pick the next GPU device in round-robin order.

        Returns:
            A device index, or None when the scheduler is disabled or empty.
        """
        if not self._enabled or not self._devices:
            return None

        with self._lock:
            chosen = self._devices[self._next_index]
            self._next_index = (self._next_index + 1) % len(self._devices)
            logger.debug(f"Acquired GPU device: {chosen.index}")
            return chosen.index

    def release(self, device_index: Optional[int]) -> None:
        """
        Release a GPU device.

        The round-robin scheduler is stateless, so this only logs.

        Args:
            device_index: Index previously returned by acquire().
        """
        if device_index is not None:
            logger.debug(f"Released GPU device: {device_index}")

    def get_status(self) -> dict:
        """
        Report the scheduler's current state.

        Returns:
            Status dictionary with device and config details.
        """
        return {
            'enabled': self._enabled,
            'device_count': len(self._devices),
            'devices': [
                {'index': d.index, 'name': d.name, 'available': d.available}
                for d in self._devices
            ],
            'hw_accel': self._config.hw_accel,
        }
services.api_client import APIClientV2 @@ -60,6 +61,12 @@ class TaskExecutor: # 线程安全锁 self.lock = threading.Lock() + # GPU 调度器(如果启用硬件加速) + self.gpu_scheduler = GPUScheduler(config) + + if self.gpu_scheduler.enabled: + logger.info(f"GPU scheduler enabled with {self.gpu_scheduler.device_count} device(s)") + # 注册处理器 self._register_handlers() @@ -164,15 +171,27 @@ class TaskExecutor: ) lease_service.start() + # 获取 GPU 设备 + device_index = None + if self.gpu_scheduler.enabled: + device_index = self.gpu_scheduler.acquire() + if device_index is not None: + logger.info(f"[task:{task_id}] Assigned to GPU device {device_index}") + + # 获取处理器(需要在设置 GPU 设备前获取) + handler = self.handlers.get(task.task_type) + try: # 报告任务开始 self.api_client.report_start(task_id) - # 获取处理器 - handler = self.handlers.get(task.task_type) if not handler: raise ValueError(f"No handler for task type: {task.task_type}") + # 设置 GPU 设备(线程本地存储) + if device_index is not None: + handler.set_gpu_device(device_index) + # 执行前钩子 handler.before_handle(task) @@ -196,6 +215,14 @@ class TaskExecutor: self.api_client.report_fail(task_id, 'E_UNKNOWN', str(e)) finally: + # 清除 GPU 设备设置 + if handler: + handler.clear_gpu_device() + + # 释放 GPU 设备 + if self.gpu_scheduler.enabled: + self.gpu_scheduler.release(device_index) + # 停止租约续期 lease_service.stop() diff --git a/util/system.py b/util/system.py index efd24c1..be6cfd1 100644 --- a/util/system.py +++ b/util/system.py @@ -5,13 +5,17 @@ 提供系统信息采集功能。 """ +import logging import os import platform import subprocess -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, List import psutil from constant import SOFTWARE_VERSION, DEFAULT_CAPABILITIES, HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA +from domain.gpu import GPUDevice + +logger = logging.getLogger(__name__) def get_sys_info(): @@ -264,3 +268,78 @@ def get_hw_accel_info_str() -> str: return "No hardware acceleration available" return ', '.join(parts) + f" [recommended: {support['recommended']}]" + + 
+def get_all_gpu_info() -> List[GPUDevice]: + """ + 获取所有 NVIDIA GPU 信息 + + 使用 nvidia-smi 查询所有 GPU 设备。 + + Returns: + GPU 设备列表,失败返回空列表 + """ + try: + result = subprocess.run( + [ + 'nvidia-smi', + '--query-gpu=index,name,memory.total', + '--format=csv,noheader,nounits' + ], + capture_output=True, + text=True, + timeout=10 + ) + + if result.returncode != 0: + return [] + + devices = [] + for line in result.stdout.strip().split('\n'): + if not line.strip(): + continue + parts = [p.strip() for p in line.split(',')] + if len(parts) >= 2: + index = int(parts[0]) + name = parts[1] + memory = int(parts[2]) if len(parts) >= 3 else None + devices.append(GPUDevice( + index=index, + name=name, + memory_total=memory, + available=True + )) + + return devices + + except Exception as e: + logger.warning(f"Failed to detect GPUs: {e}") + return [] + + +def validate_gpu_device(index: int) -> bool: + """ + 验证指定索引的 GPU 设备是否可用 + + Args: + index: GPU 设备索引 + + Returns: + 设备是否可用 + """ + try: + result = subprocess.run( + [ + 'nvidia-smi', + '-i', str(index), + '--query-gpu=name', + '--format=csv,noheader' + ], + capture_output=True, + text=True, + timeout=5 + ) + return result.returncode == 0 and bool(result.stdout.strip()) + except Exception: + return False +