diff --git a/handlers/base.py b/handlers/base.py
index ada5e18..f39247d 100644
--- a/handlers/base.py
+++ b/handlers/base.py
@@ -139,6 +139,8 @@ AUDIO_ENCODE_ARGS = [
     '-ac', '2',
 ]
 
+FFMPEG_LOGLEVEL = 'error'
+
 
 def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]:
     """
@@ -486,22 +488,28 @@ class BaseHandler(TaskHandler, ABC):
         if timeout is None:
             timeout = self.config.ffmpeg_timeout
 
+        cmd_to_run = list(cmd)
+        if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run:
+            cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL]
+
         # Log the command (length-limited)
-        cmd_str = ' '.join(cmd)
+        cmd_str = ' '.join(cmd_to_run)
         if len(cmd_str) > 500:
             cmd_str = cmd_str[:500] + '...'
         logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")
 
         try:
+            run_args = subprocess_args(False)
+            run_args['stdout'] = subprocess.DEVNULL
+            run_args['stderr'] = subprocess.PIPE
             result = subprocess.run(
-                cmd,
-                capture_output=True,
+                cmd_to_run,
                 timeout=timeout,
-                **subprocess_args(False)
+                **run_args
             )
 
             if result.returncode != 0:
-                stderr = result.stderr.decode('utf-8', errors='replace')[:1000]
+                stderr = (result.stderr or b'').decode('utf-8', errors='replace')[:1000]
                 logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
                 return False
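# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): how the -loglevel injection
# added in handlers/base.py behaves. FFMPEG_LOGLEVEL mirrors the new constant;
# with_loglevel() is a hypothetical standalone helper for demonstration only.
# ---------------------------------------------------------------------------
FFMPEG_LOGLEVEL = 'error'


def with_loglevel(cmd):
    # Copy first so the caller's list is never mutated, then splice the flag
    # in right after the executable -- but only for ffmpeg commands that do
    # not already carry an explicit -loglevel.
    cmd_to_run = list(cmd)
    if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run:
        cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL]
    return cmd_to_run


# -> ['ffmpeg', '-loglevel', 'error', '-i', 'in.mp4', 'out.mp4']
print(with_loglevel(['ffmpeg', '-i', 'in.mp4', 'out.mp4']))
# -> unchanged: an explicit -loglevel always wins
print(with_loglevel(['ffmpeg', '-loglevel', 'info', '-i', 'in.mp4', 'out.mp4']))
# ---------------------------------------------------------------------------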
diff --git a/services/api_client.py b/services/api_client.py
index 014274f..7b99fb6 100644
--- a/services/api_client.py
+++ b/services/api_client.py
@@ -7,6 +7,7 @@ v2 API client
 
 import logging
 import subprocess
+import time
 
 import requests
 from typing import Dict, List, Optional, Any
@@ -24,6 +25,8 @@ class APIClientV2:
     Handles all HTTP communication with the render server.
     """
 
+    SYSTEM_INFO_TTL_SECONDS = 30
+
     def __init__(self, config: WorkerConfig):
         """
         Initialize the API client
@@ -37,6 +40,15 @@ class APIClientV2:
         self.worker_id = config.worker_id
         self.session = requests.Session()
 
+        self._ffmpeg_version: Optional[str] = None
+        self._codec_info: Optional[str] = None
+        self._hw_accel_info: Optional[str] = None
+        self._gpu_info: Optional[str] = None
+        self._gpu_info_checked = False
+        self._static_system_info: Optional[Dict[str, Any]] = None
+        self._system_info_cache: Optional[Dict[str, Any]] = None
+        self._system_info_cache_ts = 0.0
+
         # Set default request headers
         self.session.headers.update({
             'Content-Type': 'application/json',
@@ -287,6 +299,8 @@ class APIClientV2:
 
     def _get_ffmpeg_version(self) -> str:
         """Get the FFmpeg version."""
+        if self._ffmpeg_version is not None:
+            return self._ffmpeg_version
         try:
             result = subprocess.run(
                 ['ffmpeg', '-version'],
@@ -299,13 +313,18 @@ class APIClientV2:
             parts = first_line.split()
             for i, part in enumerate(parts):
                 if part == 'version' and i + 1 < len(parts):
-                    return parts[i + 1]
-            return 'unknown'
+                    self._ffmpeg_version = parts[i + 1]
+                    return self._ffmpeg_version
+            self._ffmpeg_version = 'unknown'
+            return self._ffmpeg_version
         except Exception:
-            return 'unknown'
+            self._ffmpeg_version = 'unknown'
+            return self._ffmpeg_version
 
     def _get_codec_info(self) -> str:
         """Get supported codec information."""
+        if self._codec_info is not None:
+            return self._codec_info
         try:
             result = subprocess.run(
                 ['ffmpeg', '-codecs'],
@@ -324,37 +343,60 @@ class APIClientV2:
                 codecs.append('aac')
             if 'libfdk_aac' in output:
                 codecs.append('libfdk_aac')
-            return ', '.join(codecs) if codecs else 'unknown'
+            self._codec_info = ', '.join(codecs) if codecs else 'unknown'
+            return self._codec_info
         except Exception:
-            return 'unknown'
+            self._codec_info = 'unknown'
+            return self._codec_info
 
     def _get_system_info(self) -> Dict[str, Any]:
         """Get system information."""
         try:
+            now = time.monotonic()
+            if (
+                self._system_info_cache
+                and now - self._system_info_cache_ts < self.SYSTEM_INFO_TTL_SECONDS
+            ):
+                return self._system_info_cache
+
             import platform
             import psutil
 
-            info = {
-                'os': platform.system(),
-                'cpu': f"{psutil.cpu_count()} cores",
-                'memory': f"{psutil.virtual_memory().total // (1024**3)}GB",
+            if self._hw_accel_info is None:
+                self._hw_accel_info = get_hw_accel_info_str()
+
+            if self._static_system_info is None:
+                self._static_system_info = {
+                    'os': platform.system(),
+                    'cpu': f"{psutil.cpu_count()} cores",
+                    'memory': f"{psutil.virtual_memory().total // (1024**3)}GB",
+                    'hwAccelConfig': self.config.hw_accel,  # currently configured hardware acceleration
+                    'hwAccelSupport': self._hw_accel_info,  # hardware acceleration supported by the system
+                }
+
+            info = dict(self._static_system_info)
+            info.update({
                 'cpuUsage': f"{psutil.cpu_percent()}%",
                 'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB",
-                'hwAccelConfig': self.config.hw_accel,  # currently configured hardware acceleration
-                'hwAccelSupport': get_hw_accel_info_str(),  # hardware acceleration supported by the system
-            }
+            })
 
             # Try to fetch GPU info
             gpu_info = self._get_gpu_info()
             if gpu_info:
                 info['gpu'] = gpu_info
 
+            self._system_info_cache = info
+            self._system_info_cache_ts = now
             return info
         except Exception:
             return {}
 
     def _get_gpu_info(self) -> Optional[str]:
         """Get GPU information."""
+        if self._gpu_info_checked:
+            return self._gpu_info
+
+        self._gpu_info_checked = True
         try:
             result = subprocess.run(
                 ['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'],
@@ -364,10 +406,11 @@ class APIClientV2:
             )
             if result.returncode == 0:
                 gpu_name = result.stdout.strip().split('\n')[0]
-                return gpu_name
+                self._gpu_info = gpu_name
         except Exception:
-            pass
-        return None
+            self._gpu_info = None
+
+        return self._gpu_info
 
     def close(self):
         """Close the session."""
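# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): the TTL-cache pattern that
# _get_system_info now uses, reduced to its core. All names below are made up
# for the example; only the 30-second TTL and the monotonic clock come from
# the diff above.
# ---------------------------------------------------------------------------
import time
from typing import Any, Dict, Optional


class TtlCached:
    TTL_SECONDS = 30

    def __init__(self) -> None:
        self._cache: Optional[Dict[str, Any]] = None
        self._cache_ts = 0.0

    def get(self) -> Dict[str, Any]:
        # time.monotonic() never jumps when the wall clock is adjusted,
        # which makes it the right choice for measuring elapsed time.
        now = time.monotonic()
        if self._cache is not None and now - self._cache_ts < self.TTL_SECONDS:
            return self._cache  # still fresh: skip the expensive probe
        self._cache = self._probe()
        self._cache_ts = now
        return self._cache

    def _probe(self) -> Dict[str, Any]:
        # Stand-in for the real work (psutil calls, subprocess probes, ...).
        return {'probed_at': time.monotonic()}
# ---------------------------------------------------------------------------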
diff --git a/services/cache.py b/services/cache.py
index 97499f1..73d63f7 100644
--- a/services/cache.py
+++ b/services/cache.py
@@ -10,6 +10,7 @@ import hashlib
 import logging
 import shutil
 import time
+import uuid
 
 from typing import Optional, Tuple
 from urllib.parse import urlparse, unquote
@@ -59,6 +60,9 @@ class MaterialCache:
     Handles cached storage and retrieval of material files.
     """
 
+    LOCK_TIMEOUT_SEC = 30.0
+    LOCK_POLL_INTERVAL_SEC = 0.1
+
     def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
         """
         Initialize the cache manager
@@ -91,6 +95,44 @@ class MaterialCache:
         filename = f"{cache_key}{ext}"
         return os.path.join(self.cache_dir, filename)
 
+    def _get_lock_path(self, cache_key: str) -> str:
+        """Get the cache lock file path."""
+        assert self.cache_dir
+        return os.path.join(self.cache_dir, f"{cache_key}.lock")
+
+    def _acquire_lock(self, cache_key: str) -> Optional[str]:
+        """Acquire the cache lock (cross-process safe)."""
+        if not self.enabled:
+            return None
+
+        lock_path = self._get_lock_path(cache_key)
+        deadline = time.monotonic() + self.LOCK_TIMEOUT_SEC
+
+        while True:
+            try:
+                fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
+                os.close(fd)
+                return lock_path
+            except FileExistsError:
+                if time.monotonic() >= deadline:
+                    logger.warning(f"Cache lock timeout: {lock_path}")
+                    return None
+                time.sleep(self.LOCK_POLL_INTERVAL_SEC)
+            except Exception as e:
+                logger.warning(f"Cache lock error: {e}")
+                return None
+
+    def _release_lock(self, lock_path: Optional[str]) -> None:
+        """Release the cache lock."""
+        if not lock_path:
+            return
+        try:
+            os.remove(lock_path)
+        except FileNotFoundError:
+            return
+        except Exception as e:
+            logger.warning(f"Cache lock release error: {e}")
+
     def is_cached(self, url: str) -> Tuple[bool, str]:
         """
         Check whether a material is already cached
@@ -136,63 +178,78 @@ class MaterialCache:
         if not self.enabled:
             return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
 
-        # Check the cache
-        cached, cache_path = self.is_cached(url)
+        cache_key = _extract_cache_key(url)
+        lock_path = self._acquire_lock(cache_key)
+        if not lock_path:
+            logger.warning(f"Cache lock unavailable, downloading without cache: {url[:80]}...")
+            return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
 
-        if cached:
-            # Cache hit: copy to the destination path
-            try:
-                shutil.copy2(cache_path, dest)
-                # Update the access time (used for LRU cleanup)
-                os.utime(cache_path, None)
-                file_size = os.path.getsize(dest)
-                logger.info(f"Cache hit: {url[:80]}... -> {dest} ({file_size} bytes)")
-                return True
-            except Exception as e:
-                logger.warning(f"Failed to copy from cache: {e}, will re-download")
-                # Copy from cache failed; remove the possibly corrupted cache file
-                try:
-                    os.remove(cache_path)
-                except Exception:
-                    pass
-
-        # Cache miss: download into the cache directory
-        logger.debug(f"Cache miss: {url[:80]}...")
-
-        # Download to a temporary file first
-        temp_cache_path = cache_path + '.downloading'
         try:
-            if not storage.download_file(url, temp_cache_path, max_retries=max_retries, timeout=timeout):
-                # Download failed; clean up the temp file
-                if os.path.exists(temp_cache_path):
-                    os.remove(temp_cache_path)
-                return False
+            cache_path = self.get_cache_path(url)
+            cached = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
 
-            # Download succeeded; move it to the final cache path
-            if os.path.exists(cache_path):
-                os.remove(cache_path)
-            os.rename(temp_cache_path, cache_path)
-
-            # Copy to the destination path
-            shutil.copy2(cache_path, dest)
-            file_size = os.path.getsize(dest)
-            logger.info(f"Downloaded and cached: {url[:80]}... ({file_size} bytes)")
-
-            # Check whether cache cleanup is needed
-            if self.max_size_bytes > 0:
-                self._cleanup_if_needed()
-
-            return True
-
-        except Exception as e:
-            logger.error(f"Cache download error: {e}")
-            # Clean up the temp file
-            if os.path.exists(temp_cache_path):
+            if cached:
+                # Cache hit: copy to the destination path
                 try:
-                    os.remove(temp_cache_path)
-                except Exception:
-                    pass
-            return False
+                    shutil.copy2(cache_path, dest)
+                    # Update the access time (used for LRU cleanup)
+                    os.utime(cache_path, None)
+                    file_size = os.path.getsize(dest)
+                    logger.info(f"Cache hit: {url[:80]}... -> {dest} ({file_size} bytes)")
+                    return True
+                except Exception as e:
+                    logger.warning(f"Failed to copy from cache: {e}, will re-download")
+                    # Copy from cache failed; remove the possibly corrupted cache file
+                    try:
+                        os.remove(cache_path)
+                    except Exception:
+                        pass
+
+            # Cache miss: download into the cache directory
+            logger.debug(f"Cache miss: {url[:80]}...")
+
+            # Download to a temp file first (unique name, to avoid concurrent overwrites)
+            temp_cache_path = os.path.join(
+                self.cache_dir,
+                f"{cache_key}.{uuid.uuid4().hex}.downloading"
+            )
+            try:
+                if not storage.download_file(url, temp_cache_path, max_retries=max_retries, timeout=timeout):
+                    # Download failed; clean up the temp file
+                    if os.path.exists(temp_cache_path):
+                        os.remove(temp_cache_path)
+                    return False
+
+                if not os.path.exists(temp_cache_path) or os.path.getsize(temp_cache_path) <= 0:
+                    if os.path.exists(temp_cache_path):
+                        os.remove(temp_cache_path)
+                    return False
+
+                # Download succeeded; atomically replace the cache file
+                os.replace(temp_cache_path, cache_path)
+
+                # Copy to the destination path
+                shutil.copy2(cache_path, dest)
+                file_size = os.path.getsize(dest)
+                logger.info(f"Downloaded and cached: {url[:80]}... ({file_size} bytes)")
+
+                # Check whether cache cleanup is needed
+                if self.max_size_bytes > 0:
+                    self._cleanup_if_needed()
+
+                return True
+
+            except Exception as e:
+                logger.error(f"Cache download error: {e}")
+                # Clean up the temp file
+                if os.path.exists(temp_cache_path):
+                    try:
+                        os.remove(temp_cache_path)
+                    except Exception:
+                        pass
+                return False
+        finally:
+            self._release_lock(lock_path)
 
     def _cleanup_if_needed(self) -> None:
         """
@@ -209,7 +266,7 @@ class MaterialCache:
         total_size = 0
 
         for filename in os.listdir(self.cache_dir):
-            if filename.endswith('.downloading'):
+            if filename.endswith('.downloading') or filename.endswith('.lock'):
                 continue
             file_path = os.path.join(self.cache_dir, filename)
             if os.path.isfile(file_path):
@@ -275,7 +332,7 @@ class MaterialCache:
         total_size = 0
 
         for filename in os.listdir(self.cache_dir):
-            if filename.endswith('.downloading'):
+            if filename.endswith('.downloading') or filename.endswith('.lock'):
                 continue
             file_path = os.path.join(self.cache_dir, filename)
             if os.path.isfile(file_path):
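# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): why os.open() with
# O_CREAT | O_EXCL serves as a cross-process lock in _acquire_lock. The
# temporary directory and lock name below are made up for the demo.
# ---------------------------------------------------------------------------
import os
import tempfile

lock_path = os.path.join(tempfile.mkdtemp(), 'demo.lock')

# O_EXCL turns "create" into "create only if absent", and the kernel performs
# the existence check and the creation as one atomic step, so exactly one
# process can win the race for a given path.
fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.close(fd)
try:
    os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
except FileExistsError:
    print('second acquire fails while the lock file exists')

# Releasing the lock is simply deleting the file. A crashed holder leaves the
# file behind, which is why _acquire_lock pairs this with a poll timeout.
os.remove(lock_path)
# ---------------------------------------------------------------------------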
diff --git a/services/storage.py b/services/storage.py
index c807b21..99bafc0 100644
--- a/services/storage.py
+++ b/services/storage.py
@@ -7,6 +7,7 @@
 
 import os
 import logging
+import subprocess
 from typing import Optional
 
 import requests
@@ -73,16 +74,16 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
     while retries < max_retries:
         try:
             with open(file_path, 'rb') as f:
-                response = requests.put(
+                with requests.put(
                     http_url,
                     data=f,
                     stream=True,
                     timeout=timeout,
                     headers={"Content-Type": "application/octet-stream"}
-                )
-                response.raise_for_status()
-                logger.info(f"Upload succeeded: {file_path}")
-                return True
+                ) as response:
+                    response.raise_for_status()
+                    logger.info(f"Upload succeeded: {file_path}")
+                    return True
 
         except requests.exceptions.Timeout:
             retries += 1
@@ -111,7 +112,6 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
         return False
 
     config_file = os.getenv("RCLONE_CONFIG_FILE", "")
-    rclone_config = f"--config {config_file}" if config_file else ""
 
     # Rewrite the URL
     new_url = url
@@ -123,19 +123,30 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
     if new_url == url:
         return False
 
-    cmd = (
-        f"rclone copyto --no-check-dest --ignore-existing "
-        f"--multi-thread-chunk-size 8M --multi-thread-streams 8 "
-        f"{rclone_config} {file_path} {new_url}"
-    )
-    logger.debug(f"rclone command: {cmd}")
+    cmd = [
+        "rclone",
+        "copyto",
+        "--no-check-dest",
+        "--ignore-existing",
+        "--multi-thread-chunk-size",
+        "8M",
+        "--multi-thread-streams",
+        "8",
+    ]
+    if config_file:
+        cmd.extend(["--config", config_file])
+    cmd.extend([file_path, new_url])
 
-    result = os.system(cmd)
-    if result == 0:
+    logger.debug(f"rclone command: {' '.join(cmd)}")
+
+    result = subprocess.run(cmd, capture_output=True, text=True)
+    if result.returncode == 0:
         logger.info(f"rclone upload succeeded: {file_path}")
         return True
-    logger.warning(f"rclone upload failed (code={result}): {file_path}")
+    stderr = (result.stderr or '').strip()
+    stderr = stderr[:500] if stderr else ""
+    logger.warning(f"rclone upload failed (code={result.returncode}): {file_path} {stderr}")
     return False
 
@@ -177,13 +188,13 @@ def download_file(
     retries = 0
     while retries < max_retries:
         try:
-            response = requests.get(http_url, timeout=timeout, stream=True)
-            response.raise_for_status()
+            with requests.get(http_url, timeout=timeout, stream=True) as response:
+                response.raise_for_status()
 
-            with open(file_path, 'wb') as f:
-                for chunk in response.iter_content(chunk_size=8192):
-                    if chunk:
-                        f.write(chunk)
+                with open(file_path, 'wb') as f:
+                    for chunk in response.iter_content(chunk_size=8192):
+                        if chunk:
+                            f.write(chunk)
 
             file_size = os.path.getsize(file_path)
             logger.info(f"Download succeeded: {file_path} ({file_size} bytes)")
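# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): what replacing os.system with
# an argv-list subprocess.run buys in _upload_with_rclone. The path with a
# space and the ls stand-in are hypothetical.
# ---------------------------------------------------------------------------
import subprocess

file_path = '/tmp/my render.mp4'  # a space would break the old f-string command

# With a shell string, this path splits into two arguments and the exit code
# is all you get back. With an argv list, no quoting is needed and stderr is
# captured, so failures can be logged with the actual error text.
result = subprocess.run(
    ['ls', '-l', file_path],  # stand-in argv; same shape as the rclone call
    capture_output=True,
    text=True,
)
if result.returncode != 0:
    print(f"failed (code={result.returncode}): {(result.stderr or '').strip()[:500]}")
# ---------------------------------------------------------------------------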
diff --git a/services/task_executor.py b/services/task_executor.py
index 917735a..efb8883 100644
--- a/services/task_executor.py
+++ b/services/task_executor.py
@@ -137,6 +137,14 @@ class TaskExecutor:
             logger.warning(f"[task:{task.task_id}] Task already running, skipping")
             return False
 
+        # Check the concurrency cap
+        if len(self.current_tasks) >= self.config.max_concurrency:
+            logger.info(
+                f"[task:{task.task_id}] Max concurrency reached "
+                f"({self.config.max_concurrency}), rejecting task"
+            )
+            return False
+
         # Check whether a handler exists for this task type
         if task.task_type not in self.handlers:
             logger.error(f"[task:{task.task_id}] No handler for type: {task.task_type.value}")
diff --git a/tests/unit/test_material_cache_lock.py b/tests/unit/test_material_cache_lock.py
new file mode 100644
index 0000000..be01535
--- /dev/null
+++ b/tests/unit/test_material_cache_lock.py
@@ -0,0 +1,15 @@
+# -*- coding: utf-8 -*-
+
+import os
+
+from services.cache import MaterialCache, _extract_cache_key
+
+
+def test_cache_lock_acquire_release(tmp_path):
+    cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
+    cache_key = _extract_cache_key("https://example.com/path/file.mp4?token=abc")
+    lock_path = cache._acquire_lock(cache_key)
+    assert lock_path
+    assert os.path.exists(lock_path)
+    cache._release_lock(lock_path)
+    assert not os.path.exists(lock_path)
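# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): a contention test that could
# sit next to the one above, exercising the timeout path. It assumes the same
# MaterialCache API; the shortened timeout is set as an instance attribute so
# the class default stays untouched and the test stays fast.
# ---------------------------------------------------------------------------
from services.cache import MaterialCache, _extract_cache_key


def test_cache_lock_contention(tmp_path):
    cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
    cache.LOCK_TIMEOUT_SEC = 0.3  # instance attribute shadows the class default
    cache_key = _extract_cache_key("https://example.com/path/file.mp4")

    held = cache._acquire_lock(cache_key)
    assert held
    # While the lock file exists, a second acquire polls until the deadline
    # passes and then gives up with None.
    assert cache._acquire_lock(cache_key) is None

    cache._release_lock(held)
    # Once released, the lock can be taken again immediately.
    again = cache._acquire_lock(cache_key)
    assert again
    cache._release_lock(again)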