From 16ea45ad1ce734f159bef2add8c76587716e5874 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Sat, 7 Feb 2026 03:45:52 +0800 Subject: [PATCH] =?UTF-8?q?perf(cache):=20=E4=BC=98=E5=8C=96=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E4=B8=8B=E8=BD=BD=E9=80=BB=E8=BE=91=E5=B9=B6=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E6=80=A7=E8=83=BD=E6=8C=87=E6=A0=87=E8=BF=BD=E8=B8=AA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 实现了带等待时间统计的缓存锁获取功能 - 新增 get_or_download_with_metrics 方法返回详细的性能指标 - 在 tracing span 中记录锁等待时间、锁获取状态和缓存路径使用情况 - 优化缓存命中路径避免不必要的锁获取操作 - 添加了缓存文件就绪检查和复制功能的独立方法 - 增加了针对缓存锁超时但仍可使用就绪缓存的处理逻辑 - 新增了多个单元测试验证缓存锁定和指标报告功能 --- handlers/base.py | 18 +++- services/cache.py | 140 ++++++++++++++++++------- tests/unit/test_material_cache_lock.py | 86 +++++++++++++++ 3 files changed, 208 insertions(+), 36 deletions(-) diff --git a/handlers/base.py b/handlers/base.py index 22cf223..fd71626 100644 --- a/handlers/base.py +++ b/handlers/base.py @@ -682,10 +682,26 @@ class BaseHandler(TaskHandler, ABC): }, ) as span: try: + lock_wait_ms = 0 + lock_acquired = False + cache_path_used = "unknown" if use_cache: - result = self.material_cache.get_or_download(url, dest, timeout=timeout) + result, cache_metrics = self.material_cache.get_or_download_with_metrics( + url, + dest, + timeout=timeout + ) + lock_wait_ms = int(cache_metrics.get("lock_wait_ms", 0)) + lock_acquired = bool(cache_metrics.get("lock_acquired", False)) + cache_path_used = str(cache_metrics.get("cache_path_used", "unknown")) else: result = storage.download_file(url, dest, timeout=timeout) + cache_path_used = "direct" + + if span is not None: + span.set_attribute("render.file.lock_wait_ms", lock_wait_ms) + span.set_attribute("render.file.lock_acquired", lock_acquired) + span.set_attribute("render.file.cache_path_used", cache_path_used) if result: file_size = os.path.getsize(dest) if os.path.exists(dest) else 0 diff --git a/services/cache.py b/services/cache.py index 30b6225..e7a5b41 100644 --- a/services/cache.py +++ b/services/cache.py @@ -12,7 +12,7 @@ import logging import shutil import time import uuid -from typing import Optional, Tuple +from typing import Any, Dict, Optional, Tuple from urllib.parse import urlparse, unquote import psutil @@ -66,6 +66,7 @@ class MaterialCache: LOCK_TIMEOUT_SEC = 30.0 LOCK_POLL_INTERVAL_SEC = 0.1 LOCK_STALE_SECONDS = 24 * 60 * 60 + DOWNLOAD_LOCK_TIMEOUT_SEC = 5.0 def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0): """ @@ -194,13 +195,14 @@ class MaterialCache: logger.warning(f"Cache lock remove error: {e}") return False - def _acquire_lock(self, cache_key: str) -> Optional[str]: + def _acquire_lock(self, cache_key: str, timeout_sec: Optional[float] = None) -> Optional[str]: """获取缓存锁(跨进程安全)""" if not self.enabled: return None + wait_timeout_sec = self.LOCK_TIMEOUT_SEC if timeout_sec is None else max(float(timeout_sec), 0.0) lock_path = self._get_lock_path(cache_key) - deadline = time.monotonic() + self.LOCK_TIMEOUT_SEC + deadline = time.monotonic() + wait_timeout_sec while True: try: @@ -214,13 +216,24 @@ class MaterialCache: if removed: continue if time.monotonic() >= deadline: - logger.warning(f"Cache lock timeout: {lock_path}") + logger.warning(f"Cache lock timeout ({wait_timeout_sec:.1f}s): {lock_path}") return None time.sleep(self.LOCK_POLL_INTERVAL_SEC) except Exception as e: logger.warning(f"Cache lock error: {e}") return None + def _acquire_lock_with_wait( + self, + cache_key: str, + timeout_sec: Optional[float] = None + ) -> Tuple[Optional[str], int]: + """获取缓存锁并返回等待时长(毫秒)""" + start_time = time.monotonic() + lock_path = self._acquire_lock(cache_key, timeout_sec=timeout_sec) + lock_wait_ms = max(int((time.monotonic() - start_time) * 1000), 0) + return lock_path, lock_wait_ms + def _release_lock(self, lock_path: Optional[str]) -> None: """释放缓存锁""" if not lock_path: @@ -244,6 +257,27 @@ class MaterialCache: exists = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0 return exists, cache_path + def _is_cache_file_ready(self, cache_path: str) -> bool: + """缓存文件是否已就绪(存在且大小大于 0)""" + try: + return os.path.exists(cache_path) and os.path.getsize(cache_path) > 0 + except Exception: + return False + + def _copy_cache_to_dest(self, cache_path: str, dest: str) -> Tuple[bool, int]: + """将缓存文件复制到目标路径并返回结果与文件大小""" + try: + shutil.copy2(cache_path, dest) + try: + os.utime(cache_path, None) + except Exception as e: + logger.debug(f"Failed to update cache access time: {e}") + file_size = os.path.getsize(dest) if os.path.exists(dest) else 0 + return True, file_size + except Exception as e: + logger.warning(f"Failed to copy from cache: {e}") + return False, 0 + def get_or_download( self, url: str, @@ -251,8 +285,24 @@ class MaterialCache: timeout: int = 300, max_retries: int = 5 ) -> bool: + """兼容旧接口:返回下载是否成功。""" + result, _ = self.get_or_download_with_metrics( + url=url, + dest=dest, + timeout=timeout, + max_retries=max_retries, + ) + return result + + def get_or_download_with_metrics( + self, + url: str, + dest: str, + timeout: int = 300, + max_retries: int = 5 + ) -> Tuple[bool, Dict[str, Any]]: """ - 从缓存获取素材,若未缓存则下载并缓存 + 从缓存获取素材,若未缓存则下载并缓存,并返回关键指标。 Args: url: 素材 URL @@ -261,8 +311,14 @@ class MaterialCache: max_retries: 最大重试次数 Returns: - 是否成功 + (是否成功, 指标字典) """ + metrics: Dict[str, Any] = { + "lock_wait_ms": 0, + "lock_acquired": False, + "cache_path_used": "unknown", + } + # 确保目标目录存在 dest_dir = os.path.dirname(dest) if dest_dir: @@ -270,34 +326,49 @@ class MaterialCache: # 缓存未启用时直接下载 if not self.enabled: - return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout) + result = storage.download_file(url, dest, max_retries=max_retries, timeout=timeout) + metrics["cache_path_used"] = "direct" + return result, metrics cache_key = _extract_cache_key(url) - lock_path = self._acquire_lock(cache_key) + cache_path = self.get_cache_path(url) + + def _try_serve_from_cache(log_prefix: str, delete_on_failure: bool = False) -> bool: + if not self._is_cache_file_ready(cache_path): + return False + copied, file_size = self._copy_cache_to_dest(cache_path, dest) + if copied: + metrics["cache_path_used"] = "cache" + logger.info(f"{log_prefix}: {url[:80]}... -> {dest} ({file_size} bytes)") + return True + if delete_on_failure: + try: + os.remove(cache_path) + except Exception: + pass + return False + + if _try_serve_from_cache("Cache hit"): + return True, metrics + + lock_path, lock_wait_ms = self._acquire_lock_with_wait( + cache_key, + timeout_sec=self.DOWNLOAD_LOCK_TIMEOUT_SEC, + ) + metrics["lock_wait_ms"] = lock_wait_ms if not lock_path: + if _try_serve_from_cache("Cache hit after lock timeout"): + return True, metrics logger.warning(f"Cache lock unavailable, downloading without cache: {url[:80]}...") - return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout) + result = storage.download_file(url, dest, max_retries=max_retries, timeout=timeout) + metrics["cache_path_used"] = "direct" + return result, metrics + + metrics["lock_acquired"] = True try: - cache_path = self.get_cache_path(url) - cached = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0 - - if cached: - # 命中缓存,复制到目标路径 - try: - shutil.copy2(cache_path, dest) - # 更新访问时间(用于 LRU 清理) - os.utime(cache_path, None) - file_size = os.path.getsize(dest) - logger.info(f"Cache hit: {url[:80]}... -> {dest} ({file_size} bytes)") - return True - except Exception as e: - logger.warning(f"Failed to copy from cache: {e}, will re-download") - # 缓存复制失败,删除可能损坏的缓存文件 - try: - os.remove(cache_path) - except Exception: - pass + if _try_serve_from_cache("Cache hit", delete_on_failure=True): + return True, metrics # 未命中缓存,下载到缓存目录 logger.debug(f"Cache miss: {url[:80]}...") @@ -312,26 +383,25 @@ class MaterialCache: # 下载失败,清理临时文件 if os.path.exists(temp_cache_path): os.remove(temp_cache_path) - return False + return False, metrics if not os.path.exists(temp_cache_path) or os.path.getsize(temp_cache_path) <= 0: if os.path.exists(temp_cache_path): os.remove(temp_cache_path) - return False + return False, metrics # 下载成功,原子替换缓存文件 os.replace(temp_cache_path, cache_path) # 复制到目标路径 - shutil.copy2(cache_path, dest) - file_size = os.path.getsize(dest) - logger.info(f"Downloaded and cached: {url[:80]}... ({file_size} bytes)") + if not _try_serve_from_cache("Downloaded and cached", delete_on_failure=False): + return False, metrics # 检查是否需要清理缓存 if self.max_size_bytes > 0: self._cleanup_if_needed() - return True + return True, metrics except Exception as e: logger.error(f"Cache download error: {e}") @@ -341,7 +411,7 @@ class MaterialCache: os.remove(temp_cache_path) except Exception: pass - return False + return False, metrics finally: self._release_lock(lock_path) diff --git a/tests/unit/test_material_cache_lock.py b/tests/unit/test_material_cache_lock.py index be01535..fae1199 100644 --- a/tests/unit/test_material_cache_lock.py +++ b/tests/unit/test_material_cache_lock.py @@ -13,3 +13,89 @@ def test_cache_lock_acquire_release(tmp_path): assert os.path.exists(lock_path) cache._release_lock(lock_path) assert not os.path.exists(lock_path) + + +def test_get_or_download_cache_hit_does_not_wait_lock(tmp_path, monkeypatch): + cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0) + url = "https://example.com/path/video.mp4?token=abc" + cache_path = cache.get_cache_path(url) + with open(cache_path, 'wb') as file_obj: + file_obj.write(b'cached-data') + destination = tmp_path / "result.bin" + + def _unexpected_acquire(*args, **kwargs): + raise AssertionError("cache hit path should not acquire lock") + + monkeypatch.setattr(cache, "_acquire_lock", _unexpected_acquire) + + assert cache.get_or_download(url, str(destination), timeout=1) is True + assert destination.read_bytes() == b'cached-data' + + +def test_get_or_download_lock_timeout_can_still_use_ready_cache(tmp_path, monkeypatch): + cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0) + url = "https://example.com/path/audio.aac?token=abc" + cache_path = cache.get_cache_path(url) + with open(cache_path, 'wb') as file_obj: + file_obj.write(b'audio-cache') + destination = tmp_path / "audio.aac" + download_called = {"value": False} + + monkeypatch.setattr(cache, "_acquire_lock", lambda *args, **kwargs: None) + + def _fake_download(*args, **kwargs): + download_called["value"] = True + return False + + monkeypatch.setattr("services.cache.storage.download_file", _fake_download) + + assert cache.get_or_download(url, str(destination), timeout=1) is True + assert destination.read_bytes() == b'audio-cache' + assert download_called["value"] is False + + +def test_get_or_download_uses_short_lock_timeout(tmp_path, monkeypatch): + cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0) + url = "https://example.com/path/segment.ts?token=abc" + destination = tmp_path / "segment.ts" + captured = {"timeout_sec": None} + + def _fake_acquire(cache_key, timeout_sec=None): + captured["timeout_sec"] = timeout_sec + return None + + monkeypatch.setattr(cache, "_acquire_lock", _fake_acquire) + monkeypatch.setattr("services.cache.storage.download_file", lambda *args, **kwargs: True) + + assert cache.get_or_download(url, str(destination), timeout=1) is True + assert captured["timeout_sec"] == cache.DOWNLOAD_LOCK_TIMEOUT_SEC + + +def test_get_or_download_with_metrics_cache_hit_wait_zero(tmp_path): + cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0) + url = "https://example.com/path/hit.mp4?token=abc" + cache_path = cache.get_cache_path(url) + with open(cache_path, 'wb') as file_obj: + file_obj.write(b'hit-data') + destination = tmp_path / "hit.mp4" + + success, metrics = cache.get_or_download_with_metrics(url, str(destination), timeout=1) + assert success is True + assert metrics["lock_wait_ms"] == 0 + assert metrics["lock_acquired"] is False + assert metrics["cache_path_used"] == "cache" + + +def test_get_or_download_with_metrics_reports_lock_wait_ms(tmp_path, monkeypatch): + cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0) + url = "https://example.com/path/miss.mp4?token=abc" + destination = tmp_path / "miss.mp4" + + monkeypatch.setattr(cache, "_acquire_lock_with_wait", lambda *args, **kwargs: (None, 4321)) + monkeypatch.setattr("services.cache.storage.download_file", lambda *args, **kwargs: True) + + success, metrics = cache.get_or_download_with_metrics(url, str(destination), timeout=1) + assert success is True + assert metrics["lock_wait_ms"] == 4321 + assert metrics["lock_acquired"] is False + assert metrics["cache_path_used"] == "direct"