# -*- coding: utf-8 -*-
"""
Material cache service.

Provides caching for downloaded materials so that the same material is not
downloaded more than once.
"""
import json
import os
import hashlib
import logging
import shutil
import time
import uuid
from typing import Optional, Tuple
from urllib.parse import urlparse, unquote

import psutil

from services import storage

logger = logging.getLogger(__name__)


def _extract_cache_key(url: str) -> str:
    """
    Derive a cache key from a URL.

    Query parameters (e.g. signatures) are stripped; the scheme, host and
    path are kept as the unique identifier.

    Args:
        url: Full material URL.

    Returns:
        Cache key (MD5 hash of the normalized URL).
    """
    parsed = urlparse(url)
    # Use scheme + host + path as the identifier (ignore query parameters
    # such as signatures, which change between requests)
    cache_key_source = f"{parsed.scheme}://{parsed.netloc}{unquote(parsed.path)}"
    return hashlib.md5(cache_key_source.encode('utf-8')).hexdigest()


def _get_file_extension(url: str) -> str:
    """
    Extract the file extension from a URL.

    Args:
        url: Material URL.

    Returns:
        File extension (e.g. .mp4, .png); empty string if none can be found.
    """
    parsed = urlparse(url)
    path = unquote(parsed.path)
    _, ext = os.path.splitext(path)
    return ext.lower() if ext else ''
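
# Illustration (hypothetical URLs): the two calls below yield the same cache
# key, because only the signed query string differs and the query is ignored:
#
#   _extract_cache_key("https://cdn.example.com/assets/intro.mp4?sig=aaa&exp=1")
#   _extract_cache_key("https://cdn.example.com/assets/intro.mp4?sig=bbb&exp=2")
#
# _get_file_extension returns ".mp4" for either URL, so both resolve to the
# same cache file name.
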

class MaterialCache:
    """
    Material cache manager.

    Responsible for storing and retrieving cached material files.
    """

    LOCK_TIMEOUT_SEC = 30.0
    LOCK_POLL_INTERVAL_SEC = 0.1
    LOCK_STALE_SECONDS = 24 * 60 * 60

    def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
        """
        Initialize the cache manager.

        Args:
            cache_dir: Cache directory path.
            enabled: Whether caching is enabled.
            max_size_gb: Maximum cache size in GB; 0 means unlimited.
        """
        self.cache_dir = cache_dir
        self.enabled = enabled
        self.max_size_bytes = int(max_size_gb * 1024 * 1024 * 1024) if max_size_gb > 0 else 0
        if self.enabled:
            os.makedirs(self.cache_dir, exist_ok=True)
            logger.info(f"Material cache initialized: {cache_dir}")

    def get_cache_path(self, url: str) -> str:
        """
        Return the cache file path for a material URL.

        Args:
            url: Material URL.

        Returns:
            Full path of the cache file.
        """
        cache_key = _extract_cache_key(url)
        ext = _get_file_extension(url)
        filename = f"{cache_key}{ext}"
        return os.path.join(self.cache_dir, filename)

    def _get_lock_path(self, cache_key: str) -> str:
        """Return the lock file path for a cache key."""
        assert self.cache_dir
        return os.path.join(self.cache_dir, f"{cache_key}.lock")

    def _write_lock_metadata(self, lock_fd: int, lock_path: str) -> bool:
        """Write lock metadata; clean up the lock file on failure."""
        try:
            try:
                process_start_time = psutil.Process(os.getpid()).create_time()
            except Exception as e:
                process_start_time = None
                logger.warning(f"Cache lock process start time error: {e}")
            metadata = {
                'pid': os.getpid(),
                'process_start_time': process_start_time,
                'created_at': time.time()
            }
            with os.fdopen(lock_fd, 'w', encoding='utf-8') as lock_file:
                json.dump(metadata, lock_file)
            return True
        except Exception as e:
            try:
                os.close(lock_fd)
            except Exception:
                pass
            self._remove_lock_file(lock_path, f"write metadata failed: {e}")
            return False

    def _read_lock_metadata(self, lock_path: str) -> Optional[dict]:
        """Read lock metadata; return None on failure (tolerates legacy empty lock files)."""
        try:
            with open(lock_path, 'r', encoding='utf-8') as lock_file:
                data = json.load(lock_file)
            return data if isinstance(data, dict) else None
        except Exception:
            return None

    def _is_process_alive(self, pid: int, expected_start_time: Optional[float]) -> bool:
        """Check whether the process is alive, validating its start time to guard against PID reuse."""
        try:
            process = psutil.Process(pid)
            if expected_start_time is None:
                return process.is_running()
            actual_start_time = process.create_time()
            return abs(actual_start_time - expected_start_time) < 1.0
        except psutil.NoSuchProcess:
            return False
        except Exception as e:
            logger.warning(f"Cache lock process check error: {e}")
            return True

    def _is_lock_stale(self, lock_path: str) -> bool:
        """Check whether a lock is stale (owner exited, or lock exceeded its maximum lifetime)."""
        if not os.path.exists(lock_path):
            return False
        now = time.time()
        metadata = self._read_lock_metadata(lock_path)
        if metadata:
            created_at = metadata.get('created_at')
            if isinstance(created_at, (int, float)) and now - created_at > self.LOCK_STALE_SECONDS:
                return True
            pid = metadata.get('pid')
            pid_value = int(pid) if isinstance(pid, int) or (isinstance(pid, str) and pid.isdigit()) else None
            expected_start_time = metadata.get('process_start_time')
            expected_start_time_value = (
                expected_start_time if isinstance(expected_start_time, (int, float)) else None
            )
            if pid_value is not None and not self._is_process_alive(pid_value, expected_start_time_value):
                return True
            return self._is_lock_stale_by_mtime(lock_path, now)
        return self._is_lock_stale_by_mtime(lock_path, now)

    def _is_lock_stale_by_mtime(self, lock_path: str, now: float) -> bool:
        """Check lock staleness based on file modification time."""
        try:
            mtime = os.path.getmtime(lock_path)
            return now - mtime > self.LOCK_STALE_SECONDS
        except Exception as e:
            logger.warning(f"Cache lock stat error: {e}")
            return False

    def _remove_lock_file(self, lock_path: str, reason: str = "") -> bool:
        """Remove the lock file."""
        try:
            os.remove(lock_path)
            if reason:
                logger.info(f"Cache lock removed: {lock_path} ({reason})")
            return True
        except FileNotFoundError:
            return True
        except Exception as e:
            logger.warning(f"Cache lock remove error: {e}")
            return False

    def _acquire_lock(self, cache_key: str) -> Optional[str]:
        """Acquire the cache lock (cross-process safe)."""
        if not self.enabled:
            return None
        lock_path = self._get_lock_path(cache_key)
        deadline = time.monotonic() + self.LOCK_TIMEOUT_SEC
        while True:
            try:
                fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                if not self._write_lock_metadata(fd, lock_path):
                    return None
                return lock_path
            except FileExistsError:
                if self._is_lock_stale(lock_path):
                    removed = self._remove_lock_file(lock_path, "stale lock")
                    if removed:
                        continue
                if time.monotonic() >= deadline:
                    logger.warning(f"Cache lock timeout: {lock_path}")
                    return None
                time.sleep(self.LOCK_POLL_INTERVAL_SEC)
            except Exception as e:
                logger.warning(f"Cache lock error: {e}")
                return None

    def _release_lock(self, lock_path: Optional[str]) -> None:
        """Release the cache lock."""
        if not lock_path:
            return
        self._remove_lock_file(lock_path)
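
    # Lock protocol sketch (illustrative only; get_or_download and
    # add_to_cache below already follow this pattern):
    #
    #   lock_path = self._acquire_lock(cache_key)
    #   if lock_path:
    #       try:
    #           ...  # read or write the cache entry for cache_key
    #       finally:
    #           self._release_lock(lock_path)
    #
    # _acquire_lock returns None on timeout or error, in which case callers
    # degrade gracefully (direct download, or skipping the cache write)
    # rather than blocking indefinitely.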

    def is_cached(self, url: str) -> Tuple[bool, str]:
        """
        Check whether a material is already cached.

        Args:
            url: Material URL.

        Returns:
            (whether cached, cache file path)
        """
        if not self.enabled:
            return False, ''
        cache_path = self.get_cache_path(url)
        exists = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
        return exists, cache_path

    def get_or_download(
        self,
        url: str,
        dest: str,
        timeout: int = 300,
        max_retries: int = 5
    ) -> bool:
        """
        Fetch a material from the cache, downloading and caching it on a miss.

        Args:
            url: Material URL.
            dest: Destination path (inside the task working directory).
            timeout: Download timeout in seconds.
            max_retries: Maximum number of retries.

        Returns:
            Whether the operation succeeded.
        """
        # Make sure the destination directory exists
        dest_dir = os.path.dirname(dest)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)

        # Download directly when caching is disabled
        if not self.enabled:
            return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)

        cache_key = _extract_cache_key(url)
        lock_path = self._acquire_lock(cache_key)
        if not lock_path:
            logger.warning(f"Cache lock unavailable, downloading without cache: {url[:80]}...")
            return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)

        try:
            cache_path = self.get_cache_path(url)
            cached = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
            if cached:
                # Cache hit: copy to the destination path
                try:
                    shutil.copy2(cache_path, dest)
                    # Refresh the access time (used for LRU cleanup)
                    os.utime(cache_path, None)
                    file_size = os.path.getsize(dest)
                    logger.info(f"Cache hit: {url[:80]}... -> {dest} ({file_size} bytes)")
                    return True
                except Exception as e:
                    logger.warning(f"Failed to copy from cache: {e}, will re-download")
                    # Copy failed: drop the possibly corrupted cache file
                    try:
                        os.remove(cache_path)
                    except Exception:
                        pass

            # Cache miss: download into the cache directory
            logger.debug(f"Cache miss: {url[:80]}...")

            # Download to a temporary file first (unique name to avoid
            # concurrent overwrites)
            temp_cache_path = os.path.join(
                self.cache_dir,
                f"{cache_key}.{uuid.uuid4().hex}.downloading"
            )
            try:
                if not storage.download_file(url, temp_cache_path, max_retries=max_retries, timeout=timeout):
                    # Download failed: clean up the temporary file
                    if os.path.exists(temp_cache_path):
                        os.remove(temp_cache_path)
                    return False

                if not os.path.exists(temp_cache_path) or os.path.getsize(temp_cache_path) <= 0:
                    if os.path.exists(temp_cache_path):
                        os.remove(temp_cache_path)
                    return False

                # Download succeeded: atomically replace the cache file
                os.replace(temp_cache_path, cache_path)

                # Copy to the destination path
                shutil.copy2(cache_path, dest)
                file_size = os.path.getsize(dest)
                logger.info(f"Downloaded and cached: {url[:80]}... ({file_size} bytes)")

                # Check whether the cache needs cleanup
                if self.max_size_bytes > 0:
                    self._cleanup_if_needed()

                return True
            except Exception as e:
                logger.error(f"Cache download error: {e}")
                # Clean up the temporary file
                if os.path.exists(temp_cache_path):
                    try:
                        os.remove(temp_cache_path)
                    except Exception:
                        pass
                return False
        finally:
            self._release_lock(lock_path)
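
    # Typical call site for get_or_download (hypothetical paths and URL):
    #
    #   cache = MaterialCache('/data/material_cache', enabled=True, max_size_gb=10)
    #   ok = cache.get_or_download(
    #       'https://cdn.example.com/assets/intro.mp4?sig=abc',
    #       '/tmp/task-123/materials/intro.mp4',
    #   )
    #
    # On a hit the file is copied out of the cache; on a miss it is downloaded
    # once, atomically published into the cache, then copied to dest.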

    def add_to_cache(self, url: str, source_path: str) -> bool:
        """
        Add a local file to the cache.

        Args:
            url: Corresponding URL (used to derive the cache key).
            source_path: Local file path.

        Returns:
            Whether the operation succeeded.
        """
        if not self.enabled:
            return False
        if not os.path.exists(source_path):
            logger.warning(f"Source file not found for cache: {source_path}")
            return False

        cache_key = _extract_cache_key(url)
        lock_path = self._acquire_lock(cache_key)
        if not lock_path:
            logger.warning(f"Cache lock unavailable for adding: {url[:80]}...")
            return False

        temp_cache_path = None
        try:
            cache_path = self.get_cache_path(url)
            # Copy to a temporary file first
            temp_cache_path = os.path.join(
                self.cache_dir,
                f"{cache_key}.{uuid.uuid4().hex}.adding"
            )
            shutil.copy2(source_path, temp_cache_path)
            # Atomic replace
            os.replace(temp_cache_path, cache_path)
            # Refresh the access time
            os.utime(cache_path, None)
            logger.info(f"Added to cache: {url[:80]}... <- {source_path}")

            # Check whether cleanup is needed
            if self.max_size_bytes > 0:
                self._cleanup_if_needed()
            return True
        except Exception as e:
            logger.error(f"Failed to add to cache: {e}")
            if temp_cache_path and os.path.exists(temp_cache_path):
                try:
                    os.remove(temp_cache_path)
                except Exception:
                    pass
            return False
        finally:
            self._release_lock(lock_path)

    def _cleanup_if_needed(self) -> None:
        """
        Check the cache size and clean up if needed (LRU policy).

        When the cache grows beyond the limit, the least recently accessed
        files are deleted first.
        """
        if self.max_size_bytes <= 0:
            return

        try:
            # Collect all cache files and their stats
            cache_files = []
            total_size = 0
            for filename in os.listdir(self.cache_dir):
                # Skip in-flight temporary files and lock files
                if filename.endswith(('.downloading', '.adding', '.lock')):
                    continue
                file_path = os.path.join(self.cache_dir, filename)
                if os.path.isfile(file_path):
                    stat = os.stat(file_path)
                    cache_files.append({
                        'path': file_path,
                        'size': stat.st_size,
                        'atime': stat.st_atime
                    })
                    total_size += stat.st_size

            # Nothing to do while under the limit
            if total_size <= self.max_size_bytes:
                return

            # Sort by access time (least recently accessed first)
            cache_files.sort(key=lambda x: x['atime'])

            # Delete files until the size drops below 80% of the limit
            target_size = int(self.max_size_bytes * 0.8)
            deleted_count = 0
            for file_info in cache_files:
                if total_size <= target_size:
                    break
                # Derive the cache key from the file name and check for a
                # lock (which means the entry is currently in use)
                filename = os.path.basename(file_info['path'])
                cache_key = os.path.splitext(filename)[0]
                lock_path = self._get_lock_path(cache_key)
                if os.path.exists(lock_path):
                    if self._is_lock_stale(lock_path):
                        self._remove_lock_file(lock_path, "cleanup stale lock")
                    else:
                        # The file is in use by another task; skip it
                        logger.debug(f"Cache cleanup: skipping locked file {filename}")
                        continue
                try:
                    os.remove(file_info['path'])
                    total_size -= file_info['size']
                    deleted_count += 1
                except Exception as e:
                    logger.warning(f"Failed to delete cache file: {e}")

            if deleted_count > 0:
                logger.info(
                    f"Cache cleanup: deleted {deleted_count} files, "
                    f"current size: {total_size / (1024 * 1024 * 1024):.2f} GB"
                )
        except Exception as e:
            logger.warning(f"Cache cleanup error: {e}")

    def clear(self) -> None:
        """Remove all cached files."""
        if not self.enabled:
            return
        try:
            if os.path.exists(self.cache_dir):
                shutil.rmtree(self.cache_dir)
            os.makedirs(self.cache_dir, exist_ok=True)
            logger.info("Cache cleared")
        except Exception as e:
            logger.error(f"Failed to clear cache: {e}")

    def get_stats(self) -> dict:
        """
        Return cache statistics.

        Returns:
            Dict containing cache statistics.
        """
        if not self.enabled or not os.path.exists(self.cache_dir):
            return {'enabled': False, 'file_count': 0, 'total_size_mb': 0}

        file_count = 0
        total_size = 0
        for filename in os.listdir(self.cache_dir):
            # Skip in-flight temporary files and lock files
            if filename.endswith(('.downloading', '.adding', '.lock')):
                continue
            file_path = os.path.join(self.cache_dir, filename)
            if os.path.isfile(file_path):
                file_count += 1
                total_size += os.path.getsize(file_path)

        return {
            'enabled': True,
            'cache_dir': self.cache_dir,
            'file_count': file_count,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
            'max_size_gb': self.max_size_bytes / (1024 * 1024 * 1024) if self.max_size_bytes > 0 else 0
        }
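

if __name__ == '__main__':
    # Minimal smoke test (a sketch, not part of the service): exercises the
    # cache with a temporary directory and a fake local file via add_to_cache,
    # so no network access or real storage backend is needed. The URL below is
    # hypothetical; only its scheme/host/path feed the cache key.
    import tempfile

    logging.basicConfig(level=logging.INFO)
    with tempfile.TemporaryDirectory() as tmp:
        cache = MaterialCache(os.path.join(tmp, 'cache'), enabled=True, max_size_gb=1)
        url = 'https://cdn.example.com/assets/demo.mp4?sig=abc123'
        source = os.path.join(tmp, 'demo.mp4')
        with open(source, 'wb') as f:
            f.write(b'fake video bytes')
        assert cache.add_to_cache(url, source)
        hit, path = cache.is_cached(url)
        print(f"cached={hit} path={path}")
        print(cache.get_stats())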