# -*- coding: utf-8 -*-
"""
v2 API client.

Implements the communication with the render server's v2 endpoints.
"""

import logging
import subprocess
from typing import Dict, List, Optional, Any

import requests

from domain.task import Task
from domain.config import WorkerConfig

logger = logging.getLogger(__name__)


class APIClientV2:
    """
    v2 API client.

    Handles all HTTP communication with the render server: heartbeat/task
    sync, task status reports, upload URL requests, lease extension and
    task lookup. Every public method swallows transport errors, logs them
    and returns a "failed" value (``[]``, ``False`` or ``None``) instead of
    raising.

    The client can be used as a context manager; the underlying HTTP
    session is closed on exit.
    """

    # Timeout (seconds) applied to every HTTP request.
    REQUEST_TIMEOUT = 10
    # Timeout (seconds) applied to local probe commands (ffmpeg, nvidia-smi).
    PROBE_TIMEOUT = 5

    # Per-instance caches for successful local probes. They are declared at
    # class level so they exist even if __init__ was bypassed. Failed probes
    # are not cached, so they are retried on the next call.
    _ffmpeg_version_cache: Optional[str] = None
    _codec_info_cache: Optional[str] = None
    _gpu_name_cache: Optional[str] = None

    def __init__(self, config: WorkerConfig):
        """
        Initialize the API client.

        Args:
            config: Worker configuration. ``api_endpoint``, ``access_key``
                and ``worker_id`` are read here; other fields are read
                lazily by the individual methods.
        """
        self.config = config
        self.base_url = config.api_endpoint.rstrip('/')
        self.access_key = config.access_key
        self.worker_id = config.worker_id

        self.session = requests.Session()

        # Default request headers.
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })

    def __enter__(self) -> "APIClientV2":
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the HTTP session when leaving the ``with`` block."""
        self.close()

    @staticmethod
    def _numeric_task_ids(task_ids) -> List[int]:
        """
        Convert task IDs to integers, dropping the non-numeric ones.

        Only IDs made solely of ASCII digits are kept: ``str.isdigit()``
        alone also accepts characters (e.g. superscript digits) that
        ``int()`` rejects. Non-string IDs are stringified first so an
        integer ID does not break the conversion.
        """
        numeric_ids = []
        for task_id in task_ids:
            text = str(task_id)
            if text.isascii() and text.isdigit():
                numeric_ids.append(int(text))
        return numeric_ids

    def sync(self, current_task_ids: List[str]) -> List[Task]:
        """
        Send a heartbeat and pull newly assigned tasks.

        Args:
            current_task_ids: IDs of the tasks currently being executed.
                ``None`` is treated as an empty list.

        Returns:
            List[Task]: Newly assigned tasks. An empty list is returned on
            any failure (timeout, transport error, non-200 business code,
            malformed response). Tasks that fail to parse are logged and
            skipped.
        """
        url = f"{self.base_url}/render/v2/worker/sync"
        current_task_ids = current_task_ids or []

        payload = {
            'accessKey': self.access_key,
            'workerId': self.worker_id,
            'capabilities': self.config.capabilities,
            'maxConcurrency': self.config.max_concurrency,
            'currentTaskCount': len(current_task_ids),
            # Sent as integers (the original note says the server expects
            # a list of int64).
            'currentTaskIds': self._numeric_task_ids(current_task_ids),
            'ffmpegVersion': self._get_ffmpeg_version(),
            'codecInfo': self._get_codec_info(),
            'systemInfo': self._get_system_info()
        }

        try:
            resp = self.session.post(
                url, json=payload, timeout=self.REQUEST_TIMEOUT
            )
            resp.raise_for_status()

            data = resp.json()
            if data.get('code') != 200:
                logger.warning(f"Sync failed: {data.get('message')}")
                return []

            # 'data' or 'tasks' may be present but null; treat both as
            # "no tasks" instead of failing on attribute access/iteration.
            task_items = (data.get('data') or {}).get('tasks') or []

            tasks = []
            for task_data in task_items:
                try:
                    tasks.append(Task.from_dict(task_data))
                except Exception as e:
                    # One malformed task must not discard the others.
                    logger.error(f"Failed to parse task: {e}")

            if tasks:
                logger.info(f"Received {len(tasks)} new tasks")

            return tasks

        except requests.exceptions.Timeout:
            logger.warning("Sync timeout")
            return []
        except requests.exceptions.RequestException as e:
            logger.error(f"Sync request error: {e}")
            return []
        except Exception as e:
            logger.error(f"Sync error: {e}")
            return []

    def _post_task_event(
        self,
        task_id: str,
        endpoint: str,
        action: str,
        done_message: str,
        **request_kwargs: Any
    ) -> bool:
        """
        POST to a task sub-endpoint and report whether the server returned
        HTTP 200.

        Args:
            task_id: Task ID, used in the URL and in log prefixes.
            endpoint: Last URL path segment (e.g. ``start``).
            action: Human-readable action name used in failure logs.
            done_message: Message logged at debug level on success.
            **request_kwargs: Passed through to ``session.post``
                (``json=...`` or ``params=...``).

        Returns:
            bool: ``True`` on HTTP 200, ``False`` otherwise or on error.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}/{endpoint}"
        try:
            resp = self.session.post(
                url, timeout=self.REQUEST_TIMEOUT, **request_kwargs
            )
        except Exception as e:
            # Deliberately broad: callers rely on a bool, never an
            # exception (this also covers payload serialization errors).
            logger.error(f"[task:{task_id}] {action} error: {e}")
            return False

        if resp.status_code == 200:
            logger.debug(f"[task:{task_id}] {done_message}")
            return True

        logger.warning(f"[task:{task_id}] {action} failed: {resp.status_code}")
        return False

    def report_start(self, task_id: str) -> bool:
        """
        Report that a task has started.

        Args:
            task_id: Task ID.

        Returns:
            bool: Whether the report succeeded (HTTP 200).
        """
        return self._post_task_event(
            task_id,
            'start',
            'Report start',
            'Start reported',
            json={'workerId': self.worker_id}
        )

    def report_success(self, task_id: str, result: Dict[str, Any]) -> bool:
        """
        Report that a task completed successfully.

        Args:
            task_id: Task ID.
            result: Task result data, sent as the ``result`` field.

        Returns:
            bool: Whether the report succeeded (HTTP 200).
        """
        return self._post_task_event(
            task_id,
            'success',
            'Report success',
            'Success reported',
            json={
                'workerId': self.worker_id,
                'result': result
            }
        )

    def report_fail(self, task_id: str, error_code: str, error_message: str) -> bool:
        """
        Report that a task failed.

        Args:
            task_id: Task ID.
            error_code: Error code.
            error_message: Error message. It is stringified and truncated
                to 1000 characters; ``None`` is sent as an empty string so
                the failure report itself is never lost.

        Returns:
            bool: Whether the report succeeded (HTTP 200).
        """
        return self._post_task_event(
            task_id,
            'fail',
            'Report fail',
            'Failure reported',
            json={
                'workerId': self.worker_id,
                'errorCode': error_code,
                'errorMessage': str(error_message or '')[:1000]  # length cap
            }
        )

    def get_upload_url(
        self,
        task_id: str,
        file_type: str,
        file_name: Optional[str] = None
    ) -> Optional[Dict[str, str]]:
        """
        Request an upload URL for a task output file.

        Args:
            task_id: Task ID.
            file_type: File type (video/audio/ts/mp4).
            file_name: Optional file name; omitted from the request when
                empty.

        Returns:
            The ``data`` object of the response (per the original
            documentation it contains ``uploadUrl`` and ``accessUrl``), or
            ``None`` on failure.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}/uploadUrl"

        payload = {'fileType': file_type}
        if file_name:
            payload['fileName'] = file_name

        try:
            resp = self.session.post(
                url, json=payload, timeout=self.REQUEST_TIMEOUT
            )
            if resp.status_code != 200:
                logger.warning(
                    f"[task:{task_id}] Get upload URL failed: {resp.status_code}"
                )
                return None

            data = resp.json()
            if data.get('code') == 200:
                return data.get('data')

            # HTTP succeeded but the business code did not: log the
            # business-level details rather than the (200) HTTP status.
            logger.warning(
                f"[task:{task_id}] Get upload URL failed: "
                f"code={data.get('code')}, message={data.get('message')}"
            )
            return None

        except Exception as e:
            logger.error(f"[task:{task_id}] Get upload URL error: {e}")
            return None

    def extend_lease(self, task_id: str, extension: Optional[int] = None) -> bool:
        """
        Extend the lease of a task.

        Args:
            task_id: Task ID.
            extension: Number of seconds to extend by. Defaults to
                ``config.lease_extension_duration``.

        Returns:
            bool: Whether the extension succeeded (HTTP 200).
        """
        if extension is None:
            extension = self.config.lease_extension_duration

        # Unlike the report endpoints, the arguments travel as query
        # parameters rather than as a JSON body.
        return self._post_task_event(
            task_id,
            'extend-lease',
            'Extend lease',
            f"Lease extended by {extension}s",
            params={
                'workerId': self.worker_id,
                'extension': extension
            }
        )

    def get_task_info(self, task_id: str) -> Optional[Dict]:
        """
        Fetch the details of a task.

        Args:
            task_id: Task ID.

        Returns:
            The ``data`` object of the response, or ``None`` on failure.
        """
        url = f"{self.base_url}/render/v2/task/{task_id}"

        try:
            resp = self.session.get(url, timeout=self.REQUEST_TIMEOUT)
            if resp.status_code != 200:
                logger.warning(
                    f"[task:{task_id}] Get task info failed: {resp.status_code}"
                )
                return None

            data = resp.json()
            if data.get('code') == 200:
                return data.get('data')

            logger.warning(
                f"[task:{task_id}] Get task info failed: "
                f"code={data.get('code')}, message={data.get('message')}"
            )
            return None

        except Exception as e:
            logger.error(f"[task:{task_id}] Get task info error: {e}")
            return None

    def _get_ffmpeg_version(self) -> str:
        """
        Return the FFmpeg version, or ``'unknown'`` if it cannot be found.

        The token following the word ``version`` on the first line of
        ``ffmpeg -version`` is used. A successful result is cached so the
        command is not re-run on every heartbeat.
        """
        if self._ffmpeg_version_cache is not None:
            return self._ffmpeg_version_cache

        try:
            result = subprocess.run(
                ['ffmpeg', '-version'],
                capture_output=True,
                text=True,
                timeout=self.PROBE_TIMEOUT
            )
            tokens = result.stdout.split('\n')[0].split()
            # Skip the last token: it has no successor to return.
            for index, token in enumerate(tokens[:-1]):
                if token == 'version':
                    self._ffmpeg_version_cache = tokens[index + 1]
                    return self._ffmpeg_version_cache
            return 'unknown'
        except Exception as e:
            # Best-effort probe: a missing binary or timeout is not fatal.
            logger.debug(f"FFmpeg version probe failed: {e}")
            return 'unknown'

    def _get_codec_info(self) -> str:
        """
        Return a comma-separated list of detected codecs, or ``'unknown'``.

        Detection is a plain substring search in the output of
        ``ffmpeg -codecs``. A successful result is cached so the command is
        not re-run on every heartbeat.
        """
        if self._codec_info_cache is not None:
            return self._codec_info_cache

        # (reported name, substrings that count as a match), in report order.
        # NOTE(review): 'libx265' is reported when 'hevc' appears anywhere
        # in the output, and 'aac' also matches inside 'libfdk_aac' —
        # behaviour kept as-is; confirm whether this is intended.
        codec_checks = (
            ('libx264', ('libx264',)),
            ('libx265', ('libx265', 'hevc')),
            ('aac', ('aac',)),
            ('libfdk_aac', ('libfdk_aac',)),
        )

        try:
            result = subprocess.run(
                ['ffmpeg', '-codecs'],
                capture_output=True,
                text=True,
                timeout=self.PROBE_TIMEOUT
            )
            output = result.stdout
            codecs = [
                name
                for name, needles in codec_checks
                if any(needle in output for needle in needles)
            ]
            if not codecs:
                return 'unknown'

            self._codec_info_cache = ', '.join(codecs)
            return self._codec_info_cache
        except Exception as e:
            # Best-effort probe: a missing binary or timeout is not fatal.
            logger.debug(f"FFmpeg codec probe failed: {e}")
            return 'unknown'

    def _get_system_info(self) -> Dict[str, Any]:
        """
        Collect basic system information for the heartbeat payload.

        Returns:
            A dict with ``os`` and, when ``psutil`` is usable, ``cpu``,
            ``memory``, ``cpuUsage`` and ``memoryAvailable``; plus ``gpu``
            when a GPU name could be determined. Missing ``psutil`` only
            drops the psutil-derived fields.
        """
        import platform

        info: Dict[str, Any] = {'os': platform.system()}

        try:
            import psutil

            # Take a single snapshot so total/available are consistent.
            memory = psutil.virtual_memory()
            info.update({
                'cpu': f"{psutil.cpu_count()} cores",
                'memory': f"{memory.total // (1024**3)}GB",
                'cpuUsage': f"{psutil.cpu_percent()}%",
                'memoryAvailable': f"{memory.available // (1024**3)}GB"
            })
        except Exception as e:
            logger.debug(f"System stats unavailable: {e}")

        # Try to add GPU information.
        gpu_info = self._get_gpu_info()
        if gpu_info:
            info['gpu'] = gpu_info

        return info

    def _get_gpu_info(self) -> Optional[str]:
        """
        Return the name of the first GPU listed by ``nvidia-smi``.

        Returns:
            The GPU name, or ``None`` when ``nvidia-smi`` is unavailable,
            fails, or prints nothing. A found name is cached.
        """
        if self._gpu_name_cache is not None:
            return self._gpu_name_cache

        try:
            result = subprocess.run(
                ['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'],
                capture_output=True,
                text=True,
                timeout=self.PROBE_TIMEOUT
            )
        except Exception as e:
            # No NVIDIA tooling installed (or it hung): simply no GPU info.
            logger.debug(f"GPU probe failed: {e}")
            return None

        if result.returncode != 0:
            return None

        gpu_name = result.stdout.strip().split('\n')[0]
        if not gpu_name:
            return None

        self._gpu_name_cache = gpu_name
        return gpu_name

    def close(self):
        """Close the underlying HTTP session."""
        self.session.close()