You've already forked FrameTour-RenderWorker
Compare commits
2 Commits
9b373dea34
...
d955def63c
| Author | SHA1 | Date | |
|---|---|---|---|
| d955def63c | |||
| 9d16d3c6af |
@@ -23,7 +23,7 @@ from domain.result import TaskResult, ErrorCode
|
||||
from domain.config import WorkerConfig
|
||||
from services import storage
|
||||
from services.cache import MaterialCache
|
||||
from util.tracing import mark_span_error, start_span
|
||||
from util.tracing import get_current_task_context, mark_span_error, start_span
|
||||
from constant import (
|
||||
HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA,
|
||||
VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA
|
||||
@@ -413,10 +413,15 @@ class BaseHandler(TaskHandler, ABC):
|
||||
if timeout is None:
|
||||
timeout = self.config.download_timeout
|
||||
|
||||
task_context = get_current_task_context()
|
||||
task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
|
||||
logger.debug(f"{task_prefix}Downloading from: {url} -> {dest}")
|
||||
|
||||
with start_span(
|
||||
"render.task.file.download",
|
||||
kind=SpanKind.CLIENT,
|
||||
attributes={
|
||||
"render.file.source_url": url,
|
||||
"render.file.destination": dest,
|
||||
"render.file.use_cache": use_cache,
|
||||
},
|
||||
@@ -429,13 +434,14 @@ class BaseHandler(TaskHandler, ABC):
|
||||
|
||||
if result:
|
||||
file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
|
||||
logger.debug(f"Downloaded: {url} -> {dest} ({file_size} bytes)")
|
||||
logger.debug(f"{task_prefix}Downloaded: {url} -> {dest} ({file_size} bytes)")
|
||||
if span is not None:
|
||||
span.set_attribute("render.file.size_bytes", file_size)
|
||||
return result
|
||||
except Exception as e:
|
||||
mark_span_error(span, str(e), ErrorCode.E_INPUT_UNAVAILABLE.value)
|
||||
logger.error(f"Download failed: {url} -> {e}")
|
||||
logger.error(f"{task_prefix}Download failed: {e}")
|
||||
logger.debug(f"{task_prefix}Download source address: {url}")
|
||||
return False
|
||||
|
||||
def upload_file(
|
||||
@@ -477,11 +483,22 @@ class BaseHandler(TaskHandler, ABC):
|
||||
logger.error(f"[task:{task_id}] Invalid upload URL response")
|
||||
return None
|
||||
|
||||
logger.debug(
|
||||
f"[task:{task_id}] Upload target address: uploadUrl={upload_url}, accessUrl={access_url}"
|
||||
)
|
||||
if span is not None:
|
||||
span.set_attribute("render.file.upload_url", upload_url)
|
||||
if access_url:
|
||||
span.set_attribute("render.file.access_url", access_url)
|
||||
|
||||
try:
|
||||
result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout)
|
||||
if result:
|
||||
file_size = os.path.getsize(file_path)
|
||||
logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
|
||||
logger.info(
|
||||
f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)"
|
||||
)
|
||||
logger.debug(f"[task:{task_id}] Uploaded access address: {access_url or upload_url}")
|
||||
if span is not None:
|
||||
span.set_attribute("render.file.size_bytes", file_size)
|
||||
|
||||
|
||||
3
index.py
3
index.py
@@ -46,7 +46,8 @@ def setup_logging():
|
||||
|
||||
# 获取根logger
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(logging.INFO)
|
||||
# 允许 DEBUG 日志流入各 handler(具体是否落盘由 handler 级别决定)
|
||||
root_logger.setLevel(logging.DEBUG)
|
||||
|
||||
# 清除已有的handlers(避免重复)
|
||||
root_logger.handlers.clear()
|
||||
|
||||
@@ -61,11 +61,14 @@ class GPUScheduler:
|
||||
|
||||
configured_devices = self._config.gpu_devices
|
||||
|
||||
if configured_devices:
|
||||
# 使用配置指定的设备
|
||||
if self._config.hw_accel == HW_ACCEL_QSV:
|
||||
# QSV 使用 Intel 核显,无 nvidia-smi,直接按配置或默认设备初始化
|
||||
self._devices = self._init_qsv_devices(configured_devices)
|
||||
elif configured_devices:
|
||||
# CUDA:使用配置指定的设备并通过 nvidia-smi 验证
|
||||
self._devices = self._validate_configured_devices(configured_devices)
|
||||
else:
|
||||
# 自动检测所有设备
|
||||
# CUDA:自动检测所有 NVIDIA 设备
|
||||
self._devices = self._auto_detect_devices()
|
||||
|
||||
if self._devices:
|
||||
@@ -75,6 +78,25 @@ class GPUScheduler:
|
||||
else:
|
||||
logger.warning("No GPU devices available, scheduler disabled")
|
||||
|
||||
def _init_qsv_devices(self, configured_indices: List[int]) -> List[GPUDevice]:
|
||||
"""
|
||||
初始化 QSV 设备列表
|
||||
|
||||
QSV 使用 Intel 核显,没有 nvidia-smi 可用于检测。
|
||||
若配置了 GPU_DEVICES 则直接信任配置,否则使用默认设备 0。
|
||||
|
||||
Args:
|
||||
configured_indices: 配置的设备索引列表(可为空)
|
||||
|
||||
Returns:
|
||||
QSV 设备列表
|
||||
"""
|
||||
indices = configured_indices if configured_indices else [0]
|
||||
return [
|
||||
GPUDevice(index=idx, name=f"QSV-{idx}", available=True)
|
||||
for idx in indices
|
||||
]
|
||||
|
||||
def _validate_configured_devices(self, indices: List[int]) -> List[GPUDevice]:
|
||||
"""
|
||||
验证配置的设备列表
|
||||
|
||||
@@ -86,7 +86,7 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
|
||||
|
||||
# 检查是否使用 rclone 上传
|
||||
if os.getenv("UPLOAD_METHOD") == "rclone":
|
||||
logger.info(f"Uploading to: {url}")
|
||||
logger.debug(f"Uploading to: {url}")
|
||||
result = _upload_with_rclone(url, file_path)
|
||||
if result:
|
||||
return True
|
||||
@@ -95,7 +95,7 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
|
||||
# 应用 HTTP_REPLACE_MAP 替换 URL
|
||||
http_url = _apply_http_replace_map(url)
|
||||
content_type = _get_content_type(file_path)
|
||||
logger.info(f"Uploading to: {http_url} (Content-Type: {content_type})")
|
||||
logger.debug(f"Uploading to: {http_url} (Content-Type: {content_type})")
|
||||
|
||||
retries = 0
|
||||
while retries < max_retries:
|
||||
@@ -152,7 +152,8 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
|
||||
return False
|
||||
|
||||
if new_url.startswith(("http://", "https://")):
|
||||
logger.warning(f"rclone upload skipped: URL still starts with http after replace: {new_url}")
|
||||
logger.warning("rclone upload skipped: URL still starts with http after replace")
|
||||
logger.debug(f"rclone upload skipped address: {new_url}")
|
||||
return False
|
||||
|
||||
cmd = [
|
||||
@@ -207,7 +208,7 @@ def download_file(
|
||||
logger.debug(f"File exists, skipping download: {file_path}")
|
||||
return True
|
||||
|
||||
logger.info(f"Downloading: {url}")
|
||||
logger.debug(f"Downloading: {url}")
|
||||
|
||||
# 确保目标目录存在
|
||||
file_dir = os.path.dirname(file_path)
|
||||
@@ -239,5 +240,6 @@ def download_file(
|
||||
retries += 1
|
||||
logger.warning(f"Download failed ({e}). Retrying {retries}/{max_retries}...")
|
||||
|
||||
logger.error(f"Download failed after {max_retries} retries: {url}")
|
||||
logger.error(f"Download failed after {max_retries} retries")
|
||||
logger.debug(f"Download failed source address: {url}")
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user