You've already forked FrameTour-RenderWorker
- 在 base.py 中添加文件下载、上传和 FFmpeg 执行的链路追踪 - 在 api_client.py 中实现 API 请求的链路追踪和错误标记 - 在 lease_service.py 中添加租约续期的链路追踪支持 - 在 task_executor.py 中集成任务执行的完整链路追踪 - 新增 util/tracing.py 工具模块提供统一的追踪上下文管理 - 在 .env.example 中添加 OTEL 配置选项 - 在 index.py 中初始化和关闭链路追踪功能
486 lines
16 KiB
Python
486 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
v2 API 客户端
|
|
|
|
实现与渲染服务端 v2 接口的通信。
|
|
"""
|
|
|
|
import logging
|
|
import subprocess
|
|
import time
|
|
import requests
|
|
from typing import Dict, List, Optional, Any
|
|
from urllib.parse import urlparse
|
|
|
|
from opentelemetry.trace import SpanKind, Status, StatusCode
|
|
|
|
from domain.task import Task
|
|
from domain.config import WorkerConfig
|
|
from util.system import get_hw_accel_info_str
|
|
from util.tracing import inject_trace_headers, mark_span_error, start_span
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class APIClientV2:
    """
    v2 API client.

    Handles the HTTP communication with the render server: heartbeat/sync,
    task status reporting, upload URL retrieval, lease extension and task
    lookup. Task-scoped requests go through ``_request_with_trace`` so each
    call is wrapped in a client span.
    """

    # Seconds a collected system-info snapshot is reused before it is rebuilt.
    SYSTEM_INFO_TTL_SECONDS = 30

    def __init__(self, config: "WorkerConfig"):
        """
        Initialize the API client.

        Args:
            config: Worker configuration (endpoint, access key, worker id, ...).
        """
        self.config = config
        # Strip trailing slashes so URLs can be built as f"{base_url}/path".
        self.base_url = config.api_endpoint.rstrip('/')
        self.access_key = config.access_key
        self.worker_id = config.worker_id
        self.session = requests.Session()

        # Lazily populated caches for values reported on every sync.
        self._ffmpeg_version: Optional[str] = None
        self._codec_info: Optional[str] = None
        self._hw_accel_info: Optional[str] = None
        self._gpu_info: Optional[str] = None
        # Separate flag because "no GPU" (None) is itself a cacheable result.
        self._gpu_info_checked = False
        self._static_system_info: Optional[Dict[str, Any]] = None
        self._system_info_cache: Optional[Dict[str, Any]] = None
        self._system_info_cache_ts = 0.0

        # Default request headers.
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })

    def __enter__(self) -> "APIClientV2":
        """Allow ``with APIClientV2(config) as client:`` usage."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the underlying session when leaving a ``with`` block."""
        self.close()

    def _request_with_trace(
        self,
        method: str,
        url: str,
        *,
        task_id: Optional[str] = None,
        span_name: str = "",
        **kwargs: Any,
    ) -> "requests.Response":
        """
        Send an HTTP request through the session inside a client span.

        Args:
            method: HTTP method name.
            url: Absolute request URL.
            task_id: Task the request belongs to. When set, the request
                headers are passed through ``inject_trace_headers``.
            span_name: Span name; defaults to ``render.api.<method>``.
            **kwargs: Forwarded unchanged to ``requests.Session.request``.

        Returns:
            requests.Response: The response, whatever its status code.

        Raises:
            Exception: Any error raised by the session is marked on the span
                and re-raised unchanged.
        """
        request_kwargs = dict(kwargs)
        headers = request_kwargs.pop("headers", None)
        if task_id:
            # Only task-scoped requests get the injected trace headers.
            request_kwargs["headers"] = inject_trace_headers(headers)
        elif headers:
            request_kwargs["headers"] = headers

        parsed_url = urlparse(url)
        attributes = {
            "http.request.method": method.upper(),
            "url.path": parsed_url.path,
            "server.address": parsed_url.hostname or "",
        }
        if parsed_url.port:
            attributes["server.port"] = parsed_url.port

        name = span_name or f"render.api.{method.lower()}"
        with start_span(name, task_id=task_id, kind=SpanKind.CLIENT, attributes=attributes) as span:
            try:
                response = self.session.request(method=method, url=url, **request_kwargs)
            except Exception as exc:
                mark_span_error(span, str(exc), "HTTP_REQUEST_ERROR")
                raise

            # The span may be None, so guard before touching it.
            if span is not None:
                span.set_attribute("http.response.status_code", response.status_code)
                if response.status_code >= 400:
                    span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status_code}"))
            return response

    @staticmethod
    def _to_int_task_ids(task_ids: List[str]) -> List[int]:
        """
        Convert task IDs to integers, dropping IDs that are not plain decimals.

        ``str.isdecimal`` is used instead of ``str.isdigit`` because the
        latter also accepts characters such as superscript digits, which
        ``int()`` rejects with ``ValueError``.
        """
        numeric_ids = []
        for tid in task_ids:
            text = str(tid)
            if text.isdecimal():
                numeric_ids.append(int(text))
        return numeric_ids

    def sync(self, current_task_ids: List[str]) -> List["Task"]:
        """
        Send a heartbeat and pull newly assigned tasks.

        Args:
            current_task_ids: IDs of the tasks currently being executed.

        Returns:
            List[Task]: Newly assigned tasks; empty on any failure.
        """
        url = f"{self.base_url}/render/v2/worker/sync"

        # The server expects []int64 task IDs; non-numeric IDs are dropped.
        task_ids_int = self._to_int_task_ids(current_task_ids)

        payload = {
            'accessKey': self.access_key,
            'workerId': self.worker_id,
            'capabilities': self.config.capabilities,
            'maxConcurrency': self.config.max_concurrency,
            'currentTaskCount': len(current_task_ids),
            'currentTaskIds': task_ids_int,
            'ffmpegVersion': self._get_ffmpeg_version(),
            'codecInfo': self._get_codec_info(),
            'systemInfo': self._get_system_info()
        }

        try:
            resp = self.session.post(url, json=payload, timeout=10)
            resp.raise_for_status()
            data = resp.json()

            if data.get('code') != 200:
                logger.warning(f"Sync failed: {data.get('message')}")
                return []

            # 'data' may be present but null; treat that the same as "no tasks".
            task_items = (data.get('data') or {}).get('tasks') or []

            # Parse the task list; one malformed task does not drop the rest.
            tasks = []
            for task_data in task_items:
                try:
                    tasks.append(Task.from_dict(task_data))
                except Exception as e:
                    logger.error(f"Failed to parse task: {e}")

            if tasks:
                logger.info(f"Received {len(tasks)} new tasks")

            return tasks

        except requests.exceptions.Timeout:
            logger.warning("Sync timeout")
            return []
        except requests.exceptions.RequestException as e:
            logger.error(f"Sync request error: {e}")
            return []
        except Exception as e:
            logger.error(f"Sync error: {e}")
            return []

    def _post_task_action(
        self,
        task_id: str,
        endpoint: str,
        span_name: str,
        action: str,
        success_message: str,
        **request_kwargs: Any,
    ) -> bool:
        """
        POST to a task-scoped endpoint and reduce the outcome to a bool.

        Args:
            task_id: Task ID, used in the URL, the span and log prefixes.
            endpoint: Last path segment under ``/render/v2/task/<task_id>/``.
            span_name: Name of the span wrapping the request.
            action: Label used in the failure/error log lines.
            success_message: Text logged at debug level on HTTP 200.
            **request_kwargs: Extra arguments for the request (json, params).

        Returns:
            bool: True only when the server answered with HTTP 200.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}/{endpoint}"

        try:
            resp = self._request_with_trace(
                method="POST",
                url=url,
                task_id=task_id,
                span_name=span_name,
                timeout=10,
                **request_kwargs,
            )
            if resp.status_code == 200:
                logger.debug(f"[task:{task_id}] {success_message}")
                return True
            logger.warning(f"[task:{task_id}] {action} failed: {resp.status_code}")
            return False
        except Exception as e:
            logger.error(f"[task:{task_id}] {action} error: {e}")
            return False

    def report_start(self, task_id: str) -> bool:
        """
        Report that a task has started.

        Args:
            task_id: Task ID.

        Returns:
            bool: Whether the report succeeded.
        """
        return self._post_task_action(
            task_id,
            "start",
            "render.task.api.report_start",
            "Report start",
            "Start reported",
            json={'workerId': self.worker_id},
        )

    def report_success(self, task_id: str, result: Dict[str, Any]) -> bool:
        """
        Report that a task finished successfully.

        Args:
            task_id: Task ID.
            result: Task result data.

        Returns:
            bool: Whether the report succeeded.
        """
        return self._post_task_action(
            task_id,
            "success",
            "render.task.api.report_success",
            "Report success",
            "Success reported",
            json={
                'workerId': self.worker_id,
                'result': result
            },
        )

    def report_fail(self, task_id: str, error_code: str, error_message: str) -> bool:
        """
        Report that a task failed.

        Args:
            task_id: Task ID.
            error_code: Error code.
            error_message: Error message; truncated to 1000 characters.
                ``None`` or non-string values are tolerated so a failure
                report is never lost because of a malformed message.

        Returns:
            bool: Whether the report succeeded.
        """
        return self._post_task_action(
            task_id,
            "fail",
            "render.task.api.report_fail",
            "Report fail",
            "Failure reported",
            json={
                'workerId': self.worker_id,
                'errorCode': error_code,
                'errorMessage': str(error_message or "")[:1000]  # length limit
            },
        )

    def get_upload_url(
        self,
        task_id: str,
        file_type: str,
        file_name: Optional[str] = None,
    ) -> Optional[Dict[str, str]]:
        """
        Request an upload URL for a task output file.

        Args:
            task_id: Task ID.
            file_type: File type (video/audio/ts/mp4).
            file_name: File name (optional).

        Returns:
            The ``data`` object of the response (documented as containing
            uploadUrl and accessUrl), or None on failure.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}/uploadUrl"

        payload = {'fileType': file_type}
        if file_name:
            payload['fileName'] = file_name

        try:
            resp = self._request_with_trace(
                method="POST",
                url=url,
                task_id=task_id,
                span_name="render.task.api.get_upload_url",
                json=payload,
                timeout=10,
            )
            if resp.status_code == 200:
                data = resp.json()
                if data.get('code') == 200:
                    return data.get('data')
            logger.warning(f"[task:{task_id}] Get upload URL failed: {resp.status_code}")
            return None
        except Exception as e:
            logger.error(f"[task:{task_id}] Get upload URL error: {e}")
            return None

    def extend_lease(self, task_id: str, extension: Optional[int] = None) -> bool:
        """
        Extend the lease of a task.

        Args:
            task_id: Task ID.
            extension: Seconds to extend by (defaults to the configured
                ``lease_extension_duration``).

        Returns:
            bool: Whether the extension succeeded.
        """
        if extension is None:
            extension = self.config.lease_extension_duration

        # This endpoint takes its arguments as query parameters, not JSON.
        return self._post_task_action(
            task_id,
            "extend-lease",
            "render.task.api.extend_lease",
            "Extend lease",
            f"Lease extended by {extension}s",
            params={
                'workerId': self.worker_id,
                'extension': extension
            },
        )

    def get_task_info(self, task_id: str) -> Optional[Dict]:
        """
        Fetch the details of a task.

        Args:
            task_id: Task ID.

        Returns:
            The task detail dict, or None on failure.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}"

        try:
            resp = self._request_with_trace(
                method="GET",
                url=url,
                task_id=task_id,
                span_name="render.task.api.get_task_info",
                timeout=10,
            )
            if resp.status_code == 200:
                data = resp.json()
                if data.get('code') == 200:
                    return data.get('data')
            return None
        except Exception as e:
            logger.error(f"[task:{task_id}] Get task info error: {e}")
            return None

    def _get_ffmpeg_version(self) -> str:
        """Return the FFmpeg version string, probing ``ffmpeg -version`` once."""
        if self._ffmpeg_version is not None:
            return self._ffmpeg_version

        version = 'unknown'
        try:
            result = subprocess.run(
                ['ffmpeg', '-version'],
                capture_output=True,
                text=True,
                timeout=5
            )
            # The version is the token right after the first "version" word
            # on the first output line.
            tokens = result.stdout.split('\n')[0].split()
            if 'version' in tokens[:-1]:
                version = tokens[tokens.index('version') + 1]
        except Exception:
            # Best effort: a missing or hanging ffmpeg is reported as unknown.
            version = 'unknown'

        self._ffmpeg_version = version
        return self._ffmpeg_version

    def _get_codec_info(self) -> str:
        """Return a comma-separated list of supported codecs, probed once."""
        if self._codec_info is not None:
            return self._codec_info

        # (reported label, substrings whose presence in the output counts).
        # NOTE(review): 'hevc' alone reports 'libx265' and 'aac' also matches
        # 'libfdk_aac'; kept as-is because the server may rely on it - confirm.
        probes = (
            ('libx264', ('libx264',)),
            ('libx265', ('libx265', 'hevc')),
            ('aac', ('aac',)),
            ('libfdk_aac', ('libfdk_aac',)),
        )
        try:
            result = subprocess.run(
                ['ffmpeg', '-codecs'],
                capture_output=True,
                text=True,
                timeout=5
            )
            output = result.stdout
            codecs = [
                label
                for label, needles in probes
                if any(needle in output for needle in needles)
            ]
            self._codec_info = ', '.join(codecs) if codecs else 'unknown'
        except Exception:
            # Best effort: any probe failure is reported as unknown.
            self._codec_info = 'unknown'
        return self._codec_info

    def _get_system_info(self) -> Dict[str, Any]:
        """
        Return system information for the sync payload.

        Static facts (OS, CPU count, total memory, hardware acceleration) are
        collected once. The full snapshot, including CPU usage and available
        memory, is reused for ``SYSTEM_INFO_TTL_SECONDS`` seconds.

        Returns:
            Dict[str, Any]: The snapshot, or an empty dict if collection fails.
        """
        try:
            now = time.monotonic()
            if (
                self._system_info_cache
                and now - self._system_info_cache_ts < self.SYSTEM_INFO_TTL_SECONDS
            ):
                return self._system_info_cache

            # Imported inside the try so an import failure yields {} instead
            # of propagating.
            import platform
            import psutil

            if self._hw_accel_info is None:
                self._hw_accel_info = get_hw_accel_info_str()

            if self._static_system_info is None:
                self._static_system_info = {
                    'os': platform.system(),
                    'cpu': f"{psutil.cpu_count()} cores",
                    'memory': f"{psutil.virtual_memory().total // (1024**3)}GB",
                    'hwAccelConfig': self.config.hw_accel,  # configured hardware acceleration
                    'hwAccelSupport': self._hw_accel_info,  # hardware acceleration the system supports
                }

            info = dict(self._static_system_info)
            info.update({
                'cpuUsage': f"{psutil.cpu_percent()}%",
                'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB",
            })

            # Add GPU information when available.
            gpu_info = self._get_gpu_info()
            if gpu_info:
                info['gpu'] = gpu_info

            self._system_info_cache = info
            self._system_info_cache_ts = now
            return info
        except Exception:
            # Still best effort, but leave a trace instead of failing silently.
            logger.debug("Failed to collect system info", exc_info=True)
            return {}

    def _get_gpu_info(self) -> Optional[str]:
        """Return the first GPU name reported by ``nvidia-smi``, probed once."""
        if self._gpu_info_checked:
            return self._gpu_info

        # Set before probing so a failing probe is not retried on every sync.
        self._gpu_info_checked = True
        try:
            result = subprocess.run(
                ['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'],
                capture_output=True,
                text=True,
                timeout=5
            )
            if result.returncode == 0:
                gpu_name = result.stdout.strip().split('\n')[0]
                # Empty output means "no GPU name"; keep the Optional contract.
                self._gpu_info = gpu_name or None
        except Exception:
            self._gpu_info = None

        return self._gpu_info

    def close(self):
        """Close the HTTP session."""
        self.session.close()
|