diff --git a/services/cache.py b/services/cache.py
index 913705b..30b6225 100644
--- a/services/cache.py
+++ b/services/cache.py
@@ -5,6 +5,7 @@
 提供素材下载缓存功能,避免相同素材重复下载。
 """
 
+import json
 import os
 import hashlib
 import logging
@@ -14,6 +15,8 @@ import uuid
 from typing import Optional, Tuple
 from urllib.parse import urlparse, unquote
 
+import psutil
+
 from services import storage
 
 logger = logging.getLogger(__name__)
@@ -62,6 +65,7 @@ class MaterialCache:
 
     LOCK_TIMEOUT_SEC = 30.0
     LOCK_POLL_INTERVAL_SEC = 0.1
+    LOCK_STALE_SECONDS = 24 * 60 * 60
 
     def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
         """
@@ -100,6 +104,96 @@ class MaterialCache:
         assert self.cache_dir
         return os.path.join(self.cache_dir, f"{cache_key}.lock")
 
+    def _write_lock_metadata(self, lock_fd: int, lock_path: str) -> bool:
+        """写入锁元数据,失败则清理锁文件"""
+        try:
+            try:
+                process_start_time = psutil.Process(os.getpid()).create_time()
+            except Exception as e:
+                process_start_time = None
+                logger.warning(f"Cache lock process start time error: {e}")
+            metadata = {
+                'pid': os.getpid(),
+                'process_start_time': process_start_time,
+                'created_at': time.time()
+            }
+            with os.fdopen(lock_fd, 'w', encoding='utf-8') as lock_file:
+                json.dump(metadata, lock_file)
+            return True
+        except Exception as e:
+            try:
+                os.close(lock_fd)
+            except Exception:
+                pass
+            self._remove_lock_file(lock_path, f"write metadata failed: {e}")
+            return False
+
+    def _read_lock_metadata(self, lock_path: str) -> Optional[dict]:
+        """读取锁元数据,失败返回 None(兼容历史空锁文件)"""
+        try:
+            with open(lock_path, 'r', encoding='utf-8') as lock_file:
+                data = json.load(lock_file)
+            return data if isinstance(data, dict) else None
+        except Exception:
+            return None
+
+    def _is_process_alive(self, pid: int, expected_start_time: Optional[float]) -> bool:
+        """判断进程是否存活并校验启动时间(防止 PID 复用)"""
+        try:
+            process = psutil.Process(pid)
+            if expected_start_time is None:
+                return process.is_running()
+            actual_start_time = process.create_time()
+            return abs(actual_start_time - expected_start_time) < 1.0
+        except psutil.NoSuchProcess:
+            return False
+        except Exception as e:
+            logger.warning(f"Cache lock process check error: {e}")
+            return True
+
+    def _is_lock_stale(self, lock_path: str) -> bool:
+        """判断锁是否过期(进程已退出或超过最大存活时长)"""
+        if not os.path.exists(lock_path):
+            return False
+        now = time.time()
+        metadata = self._read_lock_metadata(lock_path)
+        if metadata:
+            created_at = metadata.get('created_at')
+            if isinstance(created_at, (int, float)) and now - created_at > self.LOCK_STALE_SECONDS:
+                return True
+            pid = metadata.get('pid')
+            pid_value = int(pid) if isinstance(pid, int) or (isinstance(pid, str) and pid.isdigit()) else None
+            expected_start_time = metadata.get('process_start_time')
+            expected_start_time_value = (
+                expected_start_time if isinstance(expected_start_time, (int, float)) else None
+            )
+            if pid_value is not None and not self._is_process_alive(pid_value, expected_start_time_value):
+                return True
+            return self._is_lock_stale_by_mtime(lock_path, now)
+        return self._is_lock_stale_by_mtime(lock_path, now)
+
+    def _is_lock_stale_by_mtime(self, lock_path: str, now: float) -> bool:
+        """基于文件时间判断锁是否过期"""
+        try:
+            mtime = os.path.getmtime(lock_path)
+            return now - mtime > self.LOCK_STALE_SECONDS
+        except Exception as e:
+            logger.warning(f"Cache lock stat error: {e}")
+            return False
+
+    def _remove_lock_file(self, lock_path: str, reason: str = "") -> bool:
+        """删除锁文件"""
+        try:
+            os.remove(lock_path)
+            if reason:
+                logger.info(f"Cache lock removed: {lock_path} ({reason})")
+            return True
+        except FileNotFoundError:
+            return True
+        except Exception as e:
+            logger.warning(f"Cache lock remove error: {e}")
+            return False
+
     def _acquire_lock(self, cache_key: str) -> Optional[str]:
         """获取缓存锁(跨进程安全)"""
         if not self.enabled:
@@ -111,9 +205,14 @@ class MaterialCache:
         while True:
             try:
                 fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
-                os.close(fd)
+                if not self._write_lock_metadata(fd, lock_path):
+                    return None
                 return lock_path
             except FileExistsError:
+                if self._is_lock_stale(lock_path):
+                    removed = self._remove_lock_file(lock_path, "stale lock")
+                    if removed:
+                        continue
                 if time.monotonic() >= deadline:
                     logger.warning(f"Cache lock timeout: {lock_path}")
                     return None
@@ -126,12 +225,7 @@ class MaterialCache:
         """释放缓存锁"""
         if not lock_path:
             return
-        try:
-            os.remove(lock_path)
-        except FileNotFoundError:
-            return
-        except Exception as e:
-            logger.warning(f"Cache lock release error: {e}")
+        self._remove_lock_file(lock_path)
 
     def is_cached(self, url: str) -> Tuple[bool, str]:
         """
@@ -357,9 +451,12 @@ class MaterialCache:
             cache_key = os.path.splitext(filename)[0]
             lock_path = self._get_lock_path(cache_key)
             if os.path.exists(lock_path):
-                # 该文件正在被其他任务使用,跳过删除
-                logger.debug(f"Cache cleanup: skipping locked file (unknown)")
-                continue
+                if self._is_lock_stale(lock_path):
+                    self._remove_lock_file(lock_path, "cleanup stale lock")
+                else:
+                    # 该文件正在被其他任务使用,跳过删除
+                    logger.debug(f"Cache cleanup: skipping locked file (unknown)")
+                    continue
             try:
                 os.remove(file_info['path'])
                 total_size -= file_info['size']