Compare commits

..

2 Commits

Author SHA1 Message Date
d955def63c feat(tracing): 增强文件下载上传的日志记录和追踪功能
- 添加任务上下文信息到日志前缀,便于追踪具体任务
- 在跨度中增加文件源URL和上传URL的属性记录
- 将存储服务中的info级别日志调整为debug级别以减少冗余输出
- 添加文件访问地址的调试日志输出
- 优化根日志级别设置允许DEBUG日志流入处理器
- 修复重试失败后的错误日志格式问题
2026-02-07 00:26:01 +08:00
9d16d3c6af feat(gpu): 添加 QSV 硬件加速支持
- 实现 QSV 设备初始化逻辑,支持 Intel 核显
- 区分 QSV 和 CUDA 设备初始化流程
- 添加 QSV 设备验证和配置处理
- 更新设备检测逻辑以支持不同硬件加速类型
- 实现 QSV 设备名称格式化和可用性设置
2026-02-07 00:25:43 +08:00
4 changed files with 55 additions and 13 deletions

View File

@@ -23,7 +23,7 @@ from domain.result import TaskResult, ErrorCode
 from domain.config import WorkerConfig
 from services import storage
 from services.cache import MaterialCache
-from util.tracing import mark_span_error, start_span
+from util.tracing import get_current_task_context, mark_span_error, start_span
 from constant import (
     HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA,
     VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA
@@ -413,10 +413,15 @@ class BaseHandler(TaskHandler, ABC):
         if timeout is None:
             timeout = self.config.download_timeout
+        task_context = get_current_task_context()
+        task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
+        logger.debug(f"{task_prefix}Downloading from: {url} -> {dest}")
         with start_span(
             "render.task.file.download",
             kind=SpanKind.CLIENT,
             attributes={
+                "render.file.source_url": url,
                 "render.file.destination": dest,
                 "render.file.use_cache": use_cache,
             },
@@ -429,13 +434,14 @@ class BaseHandler(TaskHandler, ABC):
             if result:
                 file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
-                logger.debug(f"Downloaded: {url} -> {dest} ({file_size} bytes)")
+                logger.debug(f"{task_prefix}Downloaded: {url} -> {dest} ({file_size} bytes)")
                 if span is not None:
                     span.set_attribute("render.file.size_bytes", file_size)
                 return result
         except Exception as e:
             mark_span_error(span, str(e), ErrorCode.E_INPUT_UNAVAILABLE.value)
-            logger.error(f"Download failed: {url} -> {e}")
+            logger.error(f"{task_prefix}Download failed: {e}")
+            logger.debug(f"{task_prefix}Download source address: {url}")
             return False

     def upload_file(
@@ -477,11 +483,22 @@ class BaseHandler(TaskHandler, ABC):
             logger.error(f"[task:{task_id}] Invalid upload URL response")
             return None
+        logger.debug(
+            f"[task:{task_id}] Upload target address: uploadUrl={upload_url}, accessUrl={access_url}"
+        )
+        if span is not None:
+            span.set_attribute("render.file.upload_url", upload_url)
+            if access_url:
+                span.set_attribute("render.file.access_url", access_url)
         try:
             result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout)
             if result:
                 file_size = os.path.getsize(file_path)
-                logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
+                logger.info(
+                    f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)"
+                )
+                logger.debug(f"[task:{task_id}] Uploaded access address: {access_url or upload_url}")
                 if span is not None:
                     span.set_attribute("render.file.size_bytes", file_size)

View File

@@ -46,7 +46,8 @@ def setup_logging():
     # 获取根logger
     root_logger = logging.getLogger()
-    root_logger.setLevel(logging.INFO)
+    # 允许 DEBUG 日志流入各 handler(具体是否落盘由 handler 级别决定)
+    root_logger.setLevel(logging.DEBUG)
     # 清除已有的handlers(避免重复)
     root_logger.handlers.clear()

View File

@@ -61,11 +61,14 @@ class GPUScheduler:
         configured_devices = self._config.gpu_devices
-        if configured_devices:
-            # 使用配置指定的设备
+        if self._config.hw_accel == HW_ACCEL_QSV:
+            # QSV 使用 Intel 核显,无 nvidia-smi,直接按配置或默认设备初始化
+            self._devices = self._init_qsv_devices(configured_devices)
+        elif configured_devices:
+            # CUDA:使用配置指定的设备并通过 nvidia-smi 验证
             self._devices = self._validate_configured_devices(configured_devices)
         else:
-            # 自动检测所有设备
+            # CUDA:自动检测所有 NVIDIA 设备
             self._devices = self._auto_detect_devices()

         if self._devices:
@@ -75,6 +78,25 @@ class GPUScheduler:
         else:
             logger.warning("No GPU devices available, scheduler disabled")

+    def _init_qsv_devices(self, configured_indices: List[int]) -> List[GPUDevice]:
+        """
+        初始化 QSV 设备列表
+
+        QSV 使用 Intel 核显,没有 nvidia-smi 可用于检测。
+        若配置了 GPU_DEVICES 则直接信任配置,否则使用默认设备 0。
+
+        Args:
+            configured_indices: 配置的设备索引列表(可为空)
+
+        Returns:
+            QSV 设备列表
+        """
+        indices = configured_indices if configured_indices else [0]
+        return [
+            GPUDevice(index=idx, name=f"QSV-{idx}", available=True)
+            for idx in indices
+        ]
+
     def _validate_configured_devices(self, indices: List[int]) -> List[GPUDevice]:
         """
         验证配置的设备列表

View File

@@ -86,7 +86,7 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
     # 检查是否使用 rclone 上传
     if os.getenv("UPLOAD_METHOD") == "rclone":
-        logger.info(f"Uploading to: {url}")
+        logger.debug(f"Uploading to: {url}")
         result = _upload_with_rclone(url, file_path)
         if result:
             return True
@@ -95,7 +95,7 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
     # 应用 HTTP_REPLACE_MAP 替换 URL
     http_url = _apply_http_replace_map(url)
     content_type = _get_content_type(file_path)
-    logger.info(f"Uploading to: {http_url} (Content-Type: {content_type})")
+    logger.debug(f"Uploading to: {http_url} (Content-Type: {content_type})")

    retries = 0
    while retries < max_retries:
@@ -152,7 +152,8 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
         return False

     if new_url.startswith(("http://", "https://")):
-        logger.warning(f"rclone upload skipped: URL still starts with http after replace: {new_url}")
+        logger.warning("rclone upload skipped: URL still starts with http after replace")
+        logger.debug(f"rclone upload skipped address: {new_url}")
         return False

     cmd = [
@@ -207,7 +208,7 @@ def download_file(
         logger.debug(f"File exists, skipping download: {file_path}")
         return True

-    logger.info(f"Downloading: {url}")
+    logger.debug(f"Downloading: {url}")

     # 确保目标目录存在
     file_dir = os.path.dirname(file_path)
@@ -239,5 +240,6 @@ def download_file(
         retries += 1
         logger.warning(f"Download failed ({e}). Retrying {retries}/{max_retries}...")

-    logger.error(f"Download failed after {max_retries} retries: {url}")
+    logger.error(f"Download failed after {max_retries} retries")
+    logger.debug(f"Download failed source address: {url}")
     return False