You've already forked FrameTour-RenderWorker
- 实现跨进程缓存锁获取和释放功能 - 在下载过程中使用UUID生成唯一的临时文件名避免并发覆盖 - 添加超时机制和轮询间隔控制锁等待时间 - 修改清理逻辑跳过锁文件和下载中的临时文件 - 添加测试验证缓存锁功能正常工作 fix(ffmpeg): 优化FFmpeg命令执行和错误处理 - 添加默认日志级别为error减少冗余输出 - 修复subprocess运行参数传递方式 - 改进错误信息截取避免空值解码异常 refactor(system-info): 优化系统信息获取和缓存机制 - 实现FFmpeg版本、编解码器信息缓存避免重复查询 - 添加系统信息TTL缓存机制提升性能 - 实现GPU信息检查状态缓存避免重复检测 - 整合静态系统信息和动态信息分离处理 refactor(storage): 优化HTTP上传下载资源管理 - 使用上下文管理器确保请求连接正确关闭 - 修改rclone命令构建方式从字符串改为列表形式 - 改进错误处理截取stderr输出长度限制 - 优化响应处理避免资源泄露
418 lines
13 KiB
Python
418 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
v2 API 客户端
|
|
|
|
实现与渲染服务端 v2 接口的通信。
|
|
"""
|
|
|
|
import logging
|
|
import subprocess
|
|
import time
|
|
import requests
|
|
from typing import Dict, List, Optional, Any
|
|
|
|
from domain.task import Task
|
|
from domain.config import WorkerConfig
|
|
from util.system import get_hw_accel_info_str
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class APIClientV2:
    """
    Client for the render server's v2 HTTP API.

    Wraps every HTTP call made through this object: heartbeat/task sync,
    task status reports, upload-URL requests, lease extension and task
    lookup.  Environment probes (FFmpeg version, codec list, GPU name,
    system info) are cached on the instance so they are not re-run for
    every heartbeat.
    """

    # Lifetime, in seconds, of the cached system-info snapshot.
    SYSTEM_INFO_TTL_SECONDS = 30

    # Timeout, in seconds, applied to every HTTP request.
    _REQUEST_TIMEOUT = 10

    # Timeout, in seconds, applied to every external probe command.
    _PROBE_TIMEOUT = 5

    def __init__(self, config: WorkerConfig):
        """
        Initialize the API client.

        Args:
            config: Worker configuration; supplies the API endpoint,
                access key and worker ID used by the requests.
        """
        self.config = config
        self.base_url = config.api_endpoint.rstrip('/')
        self.access_key = config.access_key
        self.worker_id = config.worker_id
        self.session = requests.Session()

        # Probe results, filled in lazily by the _get_* helpers below.
        self._ffmpeg_version: Optional[str] = None
        self._codec_info: Optional[str] = None
        self._hw_accel_info: Optional[str] = None
        self._gpu_info: Optional[str] = None
        self._gpu_info_checked = False
        self._static_system_info: Optional[Dict[str, Any]] = None
        self._system_info_cache: Optional[Dict[str, Any]] = None
        self._system_info_cache_ts = 0.0

        # Default request headers.
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })

    @staticmethod
    def _to_int_task_ids(task_ids: List[Any]) -> List[int]:
        """
        Convert task IDs to integers, dropping the non-numeric ones.

        ``str.isdecimal`` is used instead of ``str.isdigit`` because
        ``isdigit`` also accepts characters (e.g. superscript digits)
        that ``int()`` rejects with ``ValueError``.  IDs are passed
        through ``str()`` first so integer IDs are accepted as well.
        """
        return [int(text) for text in map(str, task_ids) if text.isdecimal()]

    def sync(self, current_task_ids: List[str]) -> List[Task]:
        """
        Send a heartbeat and pull newly assigned tasks.

        Args:
            current_task_ids: IDs of the tasks currently being executed.

        Returns:
            List[Task]: Newly assigned tasks; empty on any failure.
        """
        url = f"{self.base_url}/render/v2/worker/sync"

        payload = {
            'accessKey': self.access_key,
            'workerId': self.worker_id,
            'capabilities': self.config.capabilities,
            'maxConcurrency': self.config.max_concurrency,
            'currentTaskCount': len(current_task_ids),
            # The server expects integer IDs ([]int64).
            'currentTaskIds': self._to_int_task_ids(current_task_ids),
            'ffmpegVersion': self._get_ffmpeg_version(),
            'codecInfo': self._get_codec_info(),
            'systemInfo': self._get_system_info()
        }

        try:
            resp = self.session.post(
                url, json=payload, timeout=self._REQUEST_TIMEOUT
            )
            resp.raise_for_status()
            data = resp.json()

            if data.get('code') != 200:
                logger.warning("Sync failed: %s", data.get('message'))
                return []

            # "data" may be missing or null when there is nothing to
            # hand out; treat both as "no tasks" rather than an error.
            task_items = (data.get('data') or {}).get('tasks') or []

            tasks = []
            for task_data in task_items:
                # One malformed task must not discard the others.
                try:
                    tasks.append(Task.from_dict(task_data))
                except Exception as e:
                    logger.error("Failed to parse task: %s", e)

            if tasks:
                logger.info("Received %d new tasks", len(tasks))

            return tasks

        except requests.exceptions.Timeout:
            logger.warning("Sync timeout")
            return []
        except requests.exceptions.RequestException as e:
            logger.error("Sync request error: %s", e)
            return []
        except Exception as e:
            logger.error("Sync error: %s", e)
            return []

    def _post_task_action(self, task_id: str, endpoint: str, action: str,
                          ok_message: str, **request_kwargs: Any) -> bool:
        """
        POST to a per-task endpoint and report whether it returned 200.

        Args:
            task_id: Task ID, used in the URL and in log prefixes.
            endpoint: Last URL path segment (e.g. ``start``).
            action: Human-readable action name used in log messages.
            ok_message: Debug message logged on success.
            **request_kwargs: Forwarded to ``session.post`` (``json=``,
                ``params=``...).

        Returns:
            bool: True only when the response status code is 200.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}/{endpoint}"

        try:
            resp = self.session.post(
                url, timeout=self._REQUEST_TIMEOUT, **request_kwargs
            )
        except Exception as e:
            logger.error("[task:%s] %s error: %s", task_id, action, e)
            return False

        if resp.status_code == 200:
            logger.debug("[task:%s] %s", task_id, ok_message)
            return True

        logger.warning(
            "[task:%s] %s failed: %s", task_id, action, resp.status_code
        )
        return False

    def report_start(self, task_id: str) -> bool:
        """
        Report that a task has started.

        Args:
            task_id: Task ID.

        Returns:
            bool: Whether the report succeeded.
        """
        return self._post_task_action(
            task_id, 'start', 'Report start', 'Start reported',
            json={'workerId': self.worker_id},
        )

    def report_success(self, task_id: str, result: Dict[str, Any]) -> bool:
        """
        Report that a task finished successfully.

        Args:
            task_id: Task ID.
            result: Task result data.

        Returns:
            bool: Whether the report succeeded.
        """
        return self._post_task_action(
            task_id, 'success', 'Report success', 'Success reported',
            json={
                'workerId': self.worker_id,
                'result': result
            },
        )

    def report_fail(self, task_id: str, error_code: str, error_message: str) -> bool:
        """
        Report that a task failed.

        Args:
            task_id: Task ID.
            error_code: Error code.
            error_message: Error message; truncated to 1000 characters.
                ``None`` or non-string values are coerced to a string so
                the failure is still reported.

        Returns:
            bool: Whether the report succeeded.
        """
        message = str(error_message or '')[:1000]  # cap the length
        return self._post_task_action(
            task_id, 'fail', 'Report fail', 'Failure reported',
            json={
                'workerId': self.worker_id,
                'errorCode': error_code,
                'errorMessage': message
            },
        )

    def get_upload_url(self, task_id: str, file_type: str,
                       file_name: Optional[str] = None) -> Optional[Dict[str, str]]:
        """
        Request an upload URL for a task artifact.

        Args:
            task_id: Task ID.
            file_type: File type (video/audio/ts/mp4).
            file_name: File name (optional).

        Returns:
            Dict containing uploadUrl and accessUrl, or None on failure.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}/uploadUrl"

        payload = {'fileType': file_type}
        if file_name:
            payload['fileName'] = file_name

        try:
            resp = self.session.post(
                url, json=payload, timeout=self._REQUEST_TIMEOUT
            )
            if resp.status_code != 200:
                logger.warning(
                    "[task:%s] Get upload URL failed: %s",
                    task_id, resp.status_code,
                )
                return None

            body = resp.json()
            if body.get('code') == 200:
                return body.get('data')

            # HTTP succeeded but the API rejected the request: log the
            # business code instead of a misleading HTTP 200.
            logger.warning(
                "[task:%s] Get upload URL failed: code=%s, message=%s",
                task_id, body.get('code'), body.get('message'),
            )
            return None
        except Exception as e:
            logger.error("[task:%s] Get upload URL error: %s", task_id, e)
            return None

    def extend_lease(self, task_id: str, extension: Optional[int] = None) -> bool:
        """
        Extend the lease of a task.

        Args:
            task_id: Task ID.
            extension: Seconds to extend by (defaults to the configured
                ``lease_extension_duration``).

        Returns:
            bool: Whether the extension succeeded.
        """
        if extension is None:
            extension = self.config.lease_extension_duration

        # This endpoint takes its arguments as query parameters.
        return self._post_task_action(
            task_id, 'extend-lease', 'Extend lease',
            f"Lease extended by {extension}s",
            params={
                'workerId': self.worker_id,
                'extension': extension
            },
        )

    def get_task_info(self, task_id: str) -> Optional[Dict[str, Any]]:
        """
        Fetch the details of a task.

        Args:
            task_id: Task ID.

        Returns:
            Task detail dict, or None on failure.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}"

        try:
            resp = self.session.get(url, timeout=self._REQUEST_TIMEOUT)
            if resp.status_code != 200:
                return None
            body = resp.json()
            if body.get('code') == 200:
                return body.get('data')
            return None
        except Exception as e:
            logger.error("[task:%s] Get task info error: %s", task_id, e)
            return None

    @staticmethod
    def _parse_ffmpeg_version(stdout: str) -> str:
        """Return the token following 'version' on the first line, else 'unknown'."""
        tokens = stdout.partition('\n')[0].split()
        # The last token is skipped: a trailing 'version' has no value.
        for idx, token in enumerate(tokens[:-1]):
            if token == 'version':
                return tokens[idx + 1]
        return 'unknown'

    def _get_ffmpeg_version(self) -> str:
        """
        Return the FFmpeg version string, probing ``ffmpeg -version`` once.

        The result (including 'unknown' when the probe fails) is cached
        on the instance.
        """
        if self._ffmpeg_version is not None:
            return self._ffmpeg_version

        try:
            result = subprocess.run(
                ['ffmpeg', '-version'],
                capture_output=True,
                text=True,
                timeout=self._PROBE_TIMEOUT
            )
            self._ffmpeg_version = self._parse_ffmpeg_version(result.stdout)
        except Exception:
            # Best effort: ffmpeg missing, timed out or printed garbage.
            self._ffmpeg_version = 'unknown'
        return self._ffmpeg_version

    @staticmethod
    def _summarize_codecs(output: str) -> str:
        """Build the comma-separated codec summary from ``ffmpeg -codecs`` output."""
        # (reported name, substrings that indicate support), in report order.
        checks = (
            ('libx264', ('libx264',)),
            ('libx265', ('libx265', 'hevc')),
            ('aac', ('aac',)),
            ('libfdk_aac', ('libfdk_aac',)),
        )
        found = [
            name for name, needles in checks
            if any(needle in output for needle in needles)
        ]
        return ', '.join(found) if found else 'unknown'

    def _get_codec_info(self) -> str:
        """
        Return a summary of the supported common codecs.

        Probes ``ffmpeg -codecs`` once; the result (including 'unknown'
        when the probe fails) is cached on the instance.
        """
        if self._codec_info is not None:
            return self._codec_info

        try:
            result = subprocess.run(
                ['ffmpeg', '-codecs'],
                capture_output=True,
                text=True,
                timeout=self._PROBE_TIMEOUT
            )
            self._codec_info = self._summarize_codecs(result.stdout)
        except Exception:
            # Best effort: ffmpeg missing, timed out or printed garbage.
            self._codec_info = 'unknown'
        return self._codec_info

    def _get_system_info(self) -> Dict[str, Any]:
        """
        Return a snapshot of system information.

        Static facts (OS, CPU count, total memory, hardware-acceleration
        settings) are collected once; dynamic values (CPU usage,
        available memory) are refreshed when the cached snapshot is
        older than ``SYSTEM_INFO_TTL_SECONDS``.

        Returns:
            Dict with the collected fields, or an empty dict when
            collection fails.
        """
        try:
            now = time.monotonic()
            cache_age = now - self._system_info_cache_ts
            if self._system_info_cache and cache_age < self.SYSTEM_INFO_TTL_SECONDS:
                return self._system_info_cache

            # Imported here so a failing import is handled by the except
            # below and yields {} instead of propagating.
            import platform
            import psutil

            if self._hw_accel_info is None:
                self._hw_accel_info = get_hw_accel_info_str()

            if self._static_system_info is None:
                self._static_system_info = {
                    'os': platform.system(),
                    'cpu': f"{psutil.cpu_count()} cores",
                    'memory': f"{psutil.virtual_memory().total // (1024**3)}GB",
                    'hwAccelConfig': self.config.hw_accel,  # configured hardware acceleration
                    'hwAccelSupport': self._hw_accel_info,  # hardware acceleration the system supports
                }

            info = dict(self._static_system_info)
            info.update({
                'cpuUsage': f"{psutil.cpu_percent()}%",
                'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB",
            })

            # Add GPU information when available.
            gpu_info = self._get_gpu_info()
            if gpu_info:
                info['gpu'] = gpu_info

            self._system_info_cache = info
            self._system_info_cache_ts = now
            return info
        except Exception:
            # Still best effort, but leave a trace for troubleshooting.
            logger.debug("Failed to collect system info", exc_info=True)
            return {}

    def _get_gpu_info(self) -> Optional[str]:
        """
        Return the name of the first GPU reported by ``nvidia-smi``.

        The probe runs at most once per instance; later calls return the
        cached result, which is None when no GPU name was obtained.
        """
        if self._gpu_info_checked:
            return self._gpu_info

        # Mark as checked up front so a failing probe is not retried.
        self._gpu_info_checked = True
        try:
            result = subprocess.run(
                ['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'],
                capture_output=True,
                text=True,
                timeout=self._PROBE_TIMEOUT
            )
            if result.returncode == 0:
                # One GPU per line; report the first.
                self._gpu_info = result.stdout.strip().split('\n')[0]
        except Exception:
            self._gpu_info = None

        return self._gpu_info

    def close(self):
        """Close the underlying HTTP session."""
        self.session.close()
|