# -*- coding: utf-8 -*- """ v2 API 客户端 实现与渲染服务端 v2 接口的通信。 """ import logging import subprocess import requests from typing import Dict, List, Optional, Any from domain.task import Task from domain.config import WorkerConfig from util.system import get_hw_accel_info_str logger = logging.getLogger(__name__) class APIClientV2: """ v2 API 客户端 负责与渲染服务端的所有 HTTP 通信。 """ def __init__(self, config: WorkerConfig): """ 初始化 API 客户端 Args: config: Worker 配置 """ self.config = config self.base_url = config.api_endpoint.rstrip('/') self.access_key = config.access_key self.worker_id = config.worker_id self.session = requests.Session() # 设置默认请求头 self.session.headers.update({ 'Content-Type': 'application/json', 'Accept': 'application/json' }) def sync(self, current_task_ids: List[str]) -> List[Task]: """ 心跳同步并拉取任务 Args: current_task_ids: 当前正在执行的任务 ID 列表 Returns: List[Task]: 新分配的任务列表 """ url = f"{self.base_url}/render/v2/worker/sync" # 将 task_id 转换为整数(服务端期望 []int64) task_ids_int = [int(tid) for tid in current_task_ids if tid.isdigit()] payload = { 'accessKey': self.access_key, 'workerId': self.worker_id, 'capabilities': self.config.capabilities, 'maxConcurrency': self.config.max_concurrency, 'currentTaskCount': len(current_task_ids), 'currentTaskIds': task_ids_int, 'ffmpegVersion': self._get_ffmpeg_version(), 'codecInfo': self._get_codec_info(), 'systemInfo': self._get_system_info() } try: resp = self.session.post(url, json=payload, timeout=10) resp.raise_for_status() data = resp.json() if data.get('code') != 200: logger.warning(f"Sync failed: {data.get('message')}") return [] # 解析任务列表 tasks = [] for task_data in data.get('data', {}).get('tasks') or []: try: task = Task.from_dict(task_data) tasks.append(task) except Exception as e: logger.error(f"Failed to parse task: {e}") if tasks: logger.info(f"Received {len(tasks)} new tasks") return tasks except requests.exceptions.Timeout: logger.warning("Sync timeout") return [] except requests.exceptions.RequestException as e: logger.error(f"Sync request error: {e}") return [] except Exception as e: logger.error(f"Sync error: {e}") return [] def report_start(self, task_id: str) -> bool: """ 报告任务开始 Args: task_id: 任务 ID Returns: bool: 是否成功 """ url = f"{self.base_url}/render/v2/task/{task_id}/start" try: resp = self.session.post( url, json={'workerId': self.worker_id}, timeout=10 ) if resp.status_code == 200: logger.debug(f"[task:{task_id}] Start reported") return True else: logger.warning(f"[task:{task_id}] Report start failed: {resp.status_code}") return False except Exception as e: logger.error(f"[task:{task_id}] Report start error: {e}") return False def report_success(self, task_id: str, result: Dict[str, Any]) -> bool: """ 报告任务成功 Args: task_id: 任务 ID result: 任务结果数据 Returns: bool: 是否成功 """ url = f"{self.base_url}/render/v2/task/{task_id}/success" try: resp = self.session.post( url, json={ 'workerId': self.worker_id, 'result': result }, timeout=10 ) if resp.status_code == 200: logger.debug(f"[task:{task_id}] Success reported") return True else: logger.warning(f"[task:{task_id}] Report success failed: {resp.status_code}") return False except Exception as e: logger.error(f"[task:{task_id}] Report success error: {e}") return False def report_fail(self, task_id: str, error_code: str, error_message: str) -> bool: """ 报告任务失败 Args: task_id: 任务 ID error_code: 错误码 error_message: 错误信息 Returns: bool: 是否成功 """ url = f"{self.base_url}/render/v2/task/{task_id}/fail" try: resp = self.session.post( url, json={ 'workerId': self.worker_id, 'errorCode': error_code, 'errorMessage': error_message[:1000] # 限制长度 }, timeout=10 ) if resp.status_code == 200: logger.debug(f"[task:{task_id}] Failure reported") return True else: logger.warning(f"[task:{task_id}] Report fail failed: {resp.status_code}") return False except Exception as e: logger.error(f"[task:{task_id}] Report fail error: {e}") return False def get_upload_url(self, task_id: str, file_type: str, file_name: str = None) -> Optional[Dict[str, str]]: """ 获取上传 URL Args: task_id: 任务 ID file_type: 文件类型(video/audio/ts/mp4) file_name: 文件名(可选) Returns: Dict 包含 uploadUrl 和 accessUrl,失败返回 None """ url = f"{self.base_url}/render/v2/task/{task_id}/uploadUrl" payload = {'fileType': file_type} if file_name: payload['fileName'] = file_name try: resp = self.session.post(url, json=payload, timeout=10) if resp.status_code == 200: data = resp.json() if data.get('code') == 200: return data.get('data') logger.warning(f"[task:{task_id}] Get upload URL failed: {resp.status_code}") return None except Exception as e: logger.error(f"[task:{task_id}] Get upload URL error: {e}") return None def extend_lease(self, task_id: str, extension: int = None) -> bool: """ 延长租约 Args: task_id: 任务 ID extension: 延长秒数(默认使用配置值) Returns: bool: 是否成功 """ if extension is None: extension = self.config.lease_extension_duration url = f"{self.base_url}/render/v2/task/{task_id}/extend-lease" try: resp = self.session.post( url, params={ 'workerId': self.worker_id, 'extension': extension }, timeout=10 ) if resp.status_code == 200: logger.debug(f"[task:{task_id}] Lease extended by {extension}s") return True else: logger.warning(f"[task:{task_id}] Extend lease failed: {resp.status_code}") return False except Exception as e: logger.error(f"[task:{task_id}] Extend lease error: {e}") return False def get_task_info(self, task_id: str) -> Optional[Dict]: """ 获取任务详情 Args: task_id: 任务 ID Returns: 任务详情字典,失败返回 None """ url = f"{self.base_url}/render/v2/task/{task_id}" try: resp = self.session.get(url, timeout=10) if resp.status_code == 200: data = resp.json() if data.get('code') == 200: return data.get('data') return None except Exception as e: logger.error(f"[task:{task_id}] Get task info error: {e}") return None def _get_ffmpeg_version(self) -> str: """获取 FFmpeg 版本""" try: result = subprocess.run( ['ffmpeg', '-version'], capture_output=True, text=True, timeout=5 ) first_line = result.stdout.split('\n')[0] if 'version' in first_line: parts = first_line.split() for i, part in enumerate(parts): if part == 'version' and i + 1 < len(parts): return parts[i + 1] return 'unknown' except Exception: return 'unknown' def _get_codec_info(self) -> str: """获取支持的编解码器信息""" try: result = subprocess.run( ['ffmpeg', '-codecs'], capture_output=True, text=True, timeout=5 ) # 检查常用编解码器 codecs = [] output = result.stdout if 'libx264' in output: codecs.append('libx264') if 'libx265' in output or 'hevc' in output: codecs.append('libx265') if 'aac' in output: codecs.append('aac') if 'libfdk_aac' in output: codecs.append('libfdk_aac') return ', '.join(codecs) if codecs else 'unknown' except Exception: return 'unknown' def _get_system_info(self) -> Dict[str, Any]: """获取系统信息""" try: import platform import psutil info = { 'os': platform.system(), 'cpu': f"{psutil.cpu_count()} cores", 'memory': f"{psutil.virtual_memory().total // (1024**3)}GB", 'cpuUsage': f"{psutil.cpu_percent()}%", 'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB", 'hwAccelConfig': self.config.hw_accel, # 当前配置的硬件加速 'hwAccelSupport': get_hw_accel_info_str(), # 系统支持的硬件加速 } # 尝试获取 GPU 信息 gpu_info = self._get_gpu_info() if gpu_info: info['gpu'] = gpu_info return info except Exception: return {} def _get_gpu_info(self) -> Optional[str]: """获取 GPU 信息""" try: result = subprocess.run( ['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: gpu_name = result.stdout.strip().split('\n')[0] return gpu_name except Exception: pass return None def close(self): """关闭会话""" self.session.close()