# -*- coding: utf-8 -*-
"""
Material cache service.

Caches downloaded materials so that identical materials are not downloaded
more than once.
"""
import os
import hashlib
import logging
import shutil
import time
import uuid
from typing import Optional, Tuple
from urllib.parse import urlparse, unquote

from services import storage

logger = logging.getLogger(__name__)


def _extract_cache_key(url: str) -> str:
    """
    Derive a cache key from a URL.

    Query parameters (e.g. signatures) are stripped; the path serves as the
    unique identifier.

    Args:
        url: Full material URL

    Returns:
        Cache key (MD5 hash of the scheme, host, and path)
    """
    parsed = urlparse(url)
    # Use scheme + host + path as the unique identifier (ignore query
    # parameters such as signatures).
    cache_key_source = f"{parsed.scheme}://{parsed.netloc}{unquote(parsed.path)}"
    return hashlib.md5(cache_key_source.encode('utf-8')).hexdigest()


def _get_file_extension(url: str) -> str:
    """
    Extract the file extension from a URL.

    Args:
        url: Material URL

    Returns:
        File extension (e.g. .mp4, .png); empty string if it cannot be
        determined
    """
    parsed = urlparse(url)
    path = unquote(parsed.path)
    _, ext = os.path.splitext(path)
    return ext.lower() if ext else ''
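
# Illustrative examples (hypothetical URLs): two signed variants of the same
# object resolve to the same cache key, and the extension survives into the
# cache filename:
#
#   _extract_cache_key("https://cdn.example.com/a/v.mp4?sig=abc") \
#       == _extract_cache_key("https://cdn.example.com/a/v.mp4?sig=xyz")
#   _get_file_extension("https://cdn.example.com/a/v.mp4?Expires=1") == ".mp4"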


class MaterialCache:
    """
    Material cache manager.

    Handles cached storage and retrieval of material files.
    """

    LOCK_TIMEOUT_SEC = 30.0
    LOCK_POLL_INTERVAL_SEC = 0.1

    def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
        """
        Initialize the cache manager.

        Args:
            cache_dir: Cache directory path
            enabled: Whether caching is enabled
            max_size_gb: Maximum cache size in GB; 0 means unlimited
        """
        self.cache_dir = cache_dir
        self.enabled = enabled
        self.max_size_bytes = int(max_size_gb * 1024 * 1024 * 1024) if max_size_gb > 0 else 0
        if self.enabled:
            os.makedirs(self.cache_dir, exist_ok=True)
            logger.info(f"Material cache initialized: {cache_dir}")

    def get_cache_path(self, url: str) -> str:
        """
        Get the cache file path for a material.

        Args:
            url: Material URL

        Returns:
            Full path of the cache file
        """
        cache_key = _extract_cache_key(url)
        ext = _get_file_extension(url)
        filename = f"{cache_key}{ext}"
        return os.path.join(self.cache_dir, filename)

    def _get_lock_path(self, cache_key: str) -> str:
        """Get the path of the lock file for a cache entry."""
        assert self.cache_dir
        return os.path.join(self.cache_dir, f"{cache_key}.lock")

    def _acquire_lock(self, cache_key: str) -> Optional[str]:
        """Acquire the cache lock (safe across processes)."""
        if not self.enabled:
            return None
        lock_path = self._get_lock_path(cache_key)
        deadline = time.monotonic() + self.LOCK_TIMEOUT_SEC
        while True:
            try:
                # O_CREAT | O_EXCL fails if the lock file already exists,
                # which makes lock creation atomic across processes.
                fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                os.close(fd)
                return lock_path
            except FileExistsError:
                if time.monotonic() >= deadline:
                    logger.warning(f"Cache lock timeout: {lock_path}")
                    return None
                time.sleep(self.LOCK_POLL_INTERVAL_SEC)
            except Exception as e:
                logger.warning(f"Cache lock error: {e}")
                return None

    def _release_lock(self, lock_path: Optional[str]) -> None:
        """Release the cache lock."""
        if not lock_path:
            return
        try:
            os.remove(lock_path)
        except FileNotFoundError:
            return
        except Exception as e:
            logger.warning(f"Cache lock release error: {e}")

    def is_cached(self, url: str) -> Tuple[bool, str]:
        """
        Check whether a material is already cached.

        Args:
            url: Material URL

        Returns:
            (whether it is cached, cache file path)
        """
        if not self.enabled:
            return False, ''
        cache_path = self.get_cache_path(url)
        exists = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
        return exists, cache_path

    def get_or_download(
        self,
        url: str,
        dest: str,
        timeout: int = 300,
        max_retries: int = 5
    ) -> bool:
        """
        Fetch a material from the cache, downloading and caching it on a miss.

        Args:
            url: Material URL
            dest: Destination file path (inside the task working directory)
            timeout: Download timeout in seconds
            max_retries: Maximum number of retries

        Returns:
            Whether the operation succeeded
        """
        # Make sure the destination directory exists.
        dest_dir = os.path.dirname(dest)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)

        # Caching disabled: download directly.
        if not self.enabled:
            return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)

        cache_key = _extract_cache_key(url)
        lock_path = self._acquire_lock(cache_key)
        if not lock_path:
            logger.warning(f"Cache lock unavailable, downloading without cache: {url[:80]}...")
            return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)

        try:
            cache_path = self.get_cache_path(url)
            cached = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0

            if cached:
                # Cache hit: copy to the destination path.
                try:
                    shutil.copy2(cache_path, dest)
                    # Refresh the access time (used for LRU cleanup).
                    os.utime(cache_path, None)
                    file_size = os.path.getsize(dest)
                    logger.info(f"Cache hit: {url[:80]}... -> {dest} ({file_size} bytes)")
                    return True
                except Exception as e:
                    logger.warning(f"Failed to copy from cache: {e}, will re-download")
                    # The copy failed; remove the possibly corrupted cache file.
                    try:
                        os.remove(cache_path)
                    except Exception:
                        pass

            # Cache miss: download into the cache directory.
            logger.debug(f"Cache miss: {url[:80]}...")

            # Download to a temporary file first (unique name, so concurrent
            # downloads cannot overwrite each other).
            temp_cache_path = os.path.join(
                self.cache_dir,
                f"{cache_key}.{uuid.uuid4().hex}.downloading"
            )

            try:
                if not storage.download_file(url, temp_cache_path, max_retries=max_retries, timeout=timeout):
                    # Download failed: clean up the temporary file.
                    if os.path.exists(temp_cache_path):
                        os.remove(temp_cache_path)
                    return False

                if not os.path.exists(temp_cache_path) or os.path.getsize(temp_cache_path) <= 0:
                    if os.path.exists(temp_cache_path):
                        os.remove(temp_cache_path)
                    return False

                # Download succeeded: atomically replace the cache file.
                os.replace(temp_cache_path, cache_path)

                # Copy to the destination path.
                shutil.copy2(cache_path, dest)
                file_size = os.path.getsize(dest)
                logger.info(f"Downloaded and cached: {url[:80]}... ({file_size} bytes)")

                # Check whether the cache needs cleaning up.
                if self.max_size_bytes > 0:
                    self._cleanup_if_needed()

                return True
            except Exception as e:
                logger.error(f"Cache download error: {e}")
                # Clean up the temporary file.
                if os.path.exists(temp_cache_path):
                    try:
                        os.remove(temp_cache_path)
                    except Exception:
                        pass
                return False
        finally:
            self._release_lock(lock_path)

    def add_to_cache(self, url: str, source_path: str) -> bool:
        """
        Add a local file to the cache.

        Args:
            url: Corresponding URL (used to derive the cache key)
            source_path: Local file path

        Returns:
            Whether the operation succeeded
        """
        if not self.enabled:
            return False
        if not os.path.exists(source_path):
            logger.warning(f"Source file not found for cache: {source_path}")
            return False

        cache_key = _extract_cache_key(url)
        lock_path = self._acquire_lock(cache_key)
        if not lock_path:
            logger.warning(f"Cache lock unavailable for adding: {url[:80]}...")
            return False

        try:
            cache_path = self.get_cache_path(url)
            # Copy to a temporary file first.
            temp_cache_path = os.path.join(
                self.cache_dir,
                f"{cache_key}.{uuid.uuid4().hex}.adding"
            )
            shutil.copy2(source_path, temp_cache_path)
            # Atomic replace.
            os.replace(temp_cache_path, cache_path)
            # Refresh the access time.
            os.utime(cache_path, None)
            logger.info(f"Added to cache: {url[:80]}... <- {source_path}")

            # Check whether cleanup is needed.
            if self.max_size_bytes > 0:
                self._cleanup_if_needed()
            return True
        except Exception as e:
            logger.error(f"Failed to add to cache: {e}")
            if 'temp_cache_path' in locals() and os.path.exists(temp_cache_path):
                try:
                    os.remove(temp_cache_path)
                except Exception:
                    pass
            return False
        finally:
            self._release_lock(lock_path)
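
    # Worked example of the cleanup policy implemented below (illustrative
    # numbers): with max_size_gb=10, cleanup triggers once the cache exceeds
    # 10 GiB and evicts the least recently accessed files until the total
    # drops to 8 GiB (80% of the limit), skipping any entry whose lock file
    # currently exists.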

    def _cleanup_if_needed(self) -> None:
        """
        Check the cache size and clean up if needed (LRU policy).

        When the cache grows beyond the limit, the least recently accessed
        files are deleted first.
        """
        if self.max_size_bytes <= 0:
            return

        try:
            # Collect all cache files and their metadata.
            cache_files = []
            total_size = 0
            for filename in os.listdir(self.cache_dir):
                # Skip in-progress temp files and lock files.
                if filename.endswith(('.downloading', '.adding', '.lock')):
                    continue
                file_path = os.path.join(self.cache_dir, filename)
                if os.path.isfile(file_path):
                    stat = os.stat(file_path)
                    cache_files.append({
                        'path': file_path,
                        'size': stat.st_size,
                        'atime': stat.st_atime
                    })
                    total_size += stat.st_size

            # Under the limit: nothing to clean up.
            if total_size <= self.max_size_bytes:
                return

            # Sort by access time (least recently accessed first).
            cache_files.sort(key=lambda x: x['atime'])

            # Delete files until the total drops below 80% of the limit.
            target_size = int(self.max_size_bytes * 0.8)
            deleted_count = 0
            for file_info in cache_files:
                if total_size <= target_size:
                    break
                # Derive the cache_key from the file name and check for a
                # lock (a lock means the file is currently in use).
                filename = os.path.basename(file_info['path'])
                cache_key = os.path.splitext(filename)[0]
                lock_path = self._get_lock_path(cache_key)
                if os.path.exists(lock_path):
                    # The file is in use by another task; skip it.
                    logger.debug(f"Cache cleanup: skipping locked file {filename}")
                    continue
                try:
                    os.remove(file_info['path'])
                    total_size -= file_info['size']
                    deleted_count += 1
                except Exception as e:
                    logger.warning(f"Failed to delete cache file: {e}")

            if deleted_count > 0:
                logger.info(f"Cache cleanup: deleted {deleted_count} files, current size: {total_size / (1024*1024*1024):.2f} GB")
        except Exception as e:
            logger.warning(f"Cache cleanup error: {e}")

    def clear(self) -> None:
        """Clear the entire cache."""
        if not self.enabled:
            return
        try:
            if os.path.exists(self.cache_dir):
                shutil.rmtree(self.cache_dir)
            os.makedirs(self.cache_dir, exist_ok=True)
            logger.info("Cache cleared")
        except Exception as e:
            logger.error(f"Failed to clear cache: {e}")

    def get_stats(self) -> dict:
        """
        Get cache statistics.

        Returns:
            A dict with cache statistics
        """
        if not self.enabled or not os.path.exists(self.cache_dir):
            return {'enabled': False, 'file_count': 0, 'total_size_mb': 0}

        file_count = 0
        total_size = 0
        for filename in os.listdir(self.cache_dir):
            # Skip in-progress temp files and lock files.
            if filename.endswith(('.downloading', '.adding', '.lock')):
                continue
            file_path = os.path.join(self.cache_dir, filename)
            if os.path.isfile(file_path):
                file_count += 1
                total_size += os.path.getsize(file_path)

        return {
            'enabled': True,
            'cache_dir': self.cache_dir,
            'file_count': file_count,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
            'max_size_gb': self.max_size_bytes / (1024 * 1024 * 1024) if self.max_size_bytes > 0 else 0
        }
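

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the service API). It seeds the
# cache via add_to_cache() with a local stand-in file, so no network access
# is needed; the URLs are hypothetical, and only their scheme/host/path feed
# the cache key. Assumes the `services.storage` module imported above is
# available on the path.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import tempfile

    logging.basicConfig(level=logging.INFO)

    with tempfile.TemporaryDirectory() as tmp:
        cache = MaterialCache(os.path.join(tmp, "cache"), enabled=True, max_size_gb=1)

        # Create a stand-in "material" file.
        source = os.path.join(tmp, "clip.mp4")
        with open(source, "wb") as f:
            f.write(b"\x00" * 1024)

        # Hypothetical signed URL; the signature is ignored by the cache key.
        cache.add_to_cache("https://cdn.example.com/materials/clip.mp4?sig=abc", source)

        # A differently signed variant of the same URL still hits the cache.
        hit, path = cache.is_cached("https://cdn.example.com/materials/clip.mp4?sig=xyz")
        print(f"cached={hit}, path={path}")
        print(cache.get_stats())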