FrameTour-RenderWorker/services/cache.py
Jerry Yan a70573395b feat(cache): harden cache lock mechanism with process liveness detection
- Added lock metadata write/read support, recording the process ID and start time
- Implemented a process liveness check to prevent deadlocks caused by PID reuse
- Introduced stale-lock detection and automatic cleanup
- Integrated the psutil library for system process monitoring
- Improved cache cleanup logic to skip files with active locks
- Store lock metadata as JSON
2026-01-28 23:41:53 +08:00


# -*- coding: utf-8 -*-
"""
Material cache service.

Provides download caching for materials so that identical assets are not
downloaded repeatedly.
"""
import json
import os
import hashlib
import logging
import shutil
import time
import uuid
from typing import Optional, Tuple
from urllib.parse import urlparse, unquote
import psutil
from services import storage
logger = logging.getLogger(__name__)


def _extract_cache_key(url: str) -> str:
    """
    Derive a cache key from a URL.

    Strips query parameters such as signatures and keeps only the path as the
    unique identifier.

    Args:
        url: Full material URL.

    Returns:
        Cache key (MD5 hash of the normalized URL path).
    """
    parsed = urlparse(url)
    # Use scheme + host + path as the identity (ignore query params such as signatures).
    cache_key_source = f"{parsed.scheme}://{parsed.netloc}{unquote(parsed.path)}"
    return hashlib.md5(cache_key_source.encode('utf-8')).hexdigest()
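
# Illustrative only (hypothetical URLs): both of the following map to the same
# cache key, because the query string (here a signature) is ignored:
#   https://cdn.example.com/materials/clip.mp4?sig=aaa
#   https://cdn.example.com/materials/clip.mp4?sig=bbb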


def _get_file_extension(url: str) -> str:
    """
    Extract the file extension from a URL.

    Args:
        url: Material URL.

    Returns:
        File extension (e.g. .mp4, .png), or an empty string if it cannot
        be determined.
    """
    parsed = urlparse(url)
    path = unquote(parsed.path)
    _, ext = os.path.splitext(path)
    return ext.lower() if ext else ''


class MaterialCache:
    """
    Material cache manager.

    Handles cached storage and retrieval of material files.
    """
    LOCK_TIMEOUT_SEC = 30.0
    LOCK_POLL_INTERVAL_SEC = 0.1
    LOCK_STALE_SECONDS = 24 * 60 * 60

    def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
        """
        Initialize the cache manager.

        Args:
            cache_dir: Cache directory path.
            enabled: Whether caching is enabled.
            max_size_gb: Maximum cache size in GB; 0 means unlimited.
        """
        self.cache_dir = cache_dir
        self.enabled = enabled
        self.max_size_bytes = int(max_size_gb * 1024 * 1024 * 1024) if max_size_gb > 0 else 0
        if self.enabled:
            os.makedirs(self.cache_dir, exist_ok=True)
            logger.info(f"Material cache initialized: {cache_dir}")

    def get_cache_path(self, url: str) -> str:
        """
        Get the cache file path for a material.

        Args:
            url: Material URL.

        Returns:
            Full path of the cache file.
        """
        cache_key = _extract_cache_key(url)
        ext = _get_file_extension(url)
        filename = f"{cache_key}{ext}"
        return os.path.join(self.cache_dir, filename)

    def _get_lock_path(self, cache_key: str) -> str:
        """Get the path of the cache lock file."""
        assert self.cache_dir
        return os.path.join(self.cache_dir, f"{cache_key}.lock")

    def _write_lock_metadata(self, lock_fd: int, lock_path: str) -> bool:
        """Write lock metadata; on failure, clean up the lock file."""
        try:
            try:
                process_start_time = psutil.Process(os.getpid()).create_time()
            except Exception as e:
                process_start_time = None
                logger.warning(f"Cache lock process start time error: {e}")
            metadata = {
                'pid': os.getpid(),
                'process_start_time': process_start_time,
                'created_at': time.time()
            }
            with os.fdopen(lock_fd, 'w', encoding='utf-8') as lock_file:
                json.dump(metadata, lock_file)
            return True
        except Exception as e:
            try:
                os.close(lock_fd)
            except Exception:
                pass
            self._remove_lock_file(lock_path, f"write metadata failed: {e}")
            return False
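
    # Example of the lock metadata on disk (values are illustrative):
    #   {"pid": 12345, "process_start_time": 1769610000.0, "created_at": 1769610001.2}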

    def _read_lock_metadata(self, lock_path: str) -> Optional[dict]:
        """Read lock metadata; return None on failure (tolerates legacy empty lock files)."""
        try:
            with open(lock_path, 'r', encoding='utf-8') as lock_file:
                data = json.load(lock_file)
            return data if isinstance(data, dict) else None
        except Exception:
            return None

    def _is_process_alive(self, pid: int, expected_start_time: Optional[float]) -> bool:
        """Check whether a process is alive, validating its start time (guards against PID reuse)."""
        try:
            process = psutil.Process(pid)
            if expected_start_time is None:
                return process.is_running()
            actual_start_time = process.create_time()
            # Tolerate small clock/rounding differences when comparing start times.
            return abs(actual_start_time - expected_start_time) < 1.0
        except psutil.NoSuchProcess:
            return False
        except Exception as e:
            logger.warning(f"Cache lock process check error: {e}")
            return True
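
    # PID-reuse guard, illustrated: if the lock holder (say PID 4242) dies and the
    # OS later hands 4242 to an unrelated process, that process's create_time()
    # will not match the recorded start time, so the lock is still treated as stale.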

    def _is_lock_stale(self, lock_path: str) -> bool:
        """Check whether a lock is stale (holder exited, or the lock exceeded its maximum age)."""
        if not os.path.exists(lock_path):
            return False
        now = time.time()
        metadata = self._read_lock_metadata(lock_path)
        if metadata:
            created_at = metadata.get('created_at')
            if isinstance(created_at, (int, float)) and now - created_at > self.LOCK_STALE_SECONDS:
                return True
            pid = metadata.get('pid')
            pid_value = int(pid) if isinstance(pid, int) or (isinstance(pid, str) and pid.isdigit()) else None
            expected_start_time = metadata.get('process_start_time')
            expected_start_time_value = (
                expected_start_time if isinstance(expected_start_time, (int, float)) else None
            )
            if pid_value is not None and not self._is_process_alive(pid_value, expected_start_time_value):
                return True
            return self._is_lock_stale_by_mtime(lock_path, now)
        return self._is_lock_stale_by_mtime(lock_path, now)

    def _is_lock_stale_by_mtime(self, lock_path: str, now: float) -> bool:
        """Check lock staleness based on the file's modification time."""
        try:
            mtime = os.path.getmtime(lock_path)
            return now - mtime > self.LOCK_STALE_SECONDS
        except Exception as e:
            logger.warning(f"Cache lock stat error: {e}")
            return False

    def _remove_lock_file(self, lock_path: str, reason: str = "") -> bool:
        """Remove the lock file."""
        try:
            os.remove(lock_path)
            if reason:
                logger.info(f"Cache lock removed: {lock_path} ({reason})")
            return True
        except FileNotFoundError:
            return True
        except Exception as e:
            logger.warning(f"Cache lock remove error: {e}")
            return False

    def _acquire_lock(self, cache_key: str) -> Optional[str]:
        """Acquire the cache lock (safe across processes)."""
        if not self.enabled:
            return None
        lock_path = self._get_lock_path(cache_key)
        deadline = time.monotonic() + self.LOCK_TIMEOUT_SEC
        while True:
            try:
                # O_CREAT | O_EXCL makes creation atomic: exactly one process wins.
                fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                if not self._write_lock_metadata(fd, lock_path):
                    return None
                return lock_path
            except FileExistsError:
                if self._is_lock_stale(lock_path):
                    removed = self._remove_lock_file(lock_path, "stale lock")
                    if removed:
                        continue
                if time.monotonic() >= deadline:
                    logger.warning(f"Cache lock timeout: {lock_path}")
                    return None
                time.sleep(self.LOCK_POLL_INTERVAL_SEC)
            except Exception as e:
                logger.warning(f"Cache lock error: {e}")
                return None
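
    # Typical lock lifecycle, as used by get_or_download / add_to_cache below:
    #   lock_path = self._acquire_lock(cache_key)
    #   if lock_path:
    #       try:
    #           ...  # critical section
    #       finally:
    #           self._release_lock(lock_path)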

    def _release_lock(self, lock_path: Optional[str]) -> None:
        """Release the cache lock."""
        if not lock_path:
            return
        self._remove_lock_file(lock_path)

    def is_cached(self, url: str) -> Tuple[bool, str]:
        """
        Check whether a material is already cached.

        Args:
            url: Material URL.

        Returns:
            (whether it is cached, cache file path)
        """
        if not self.enabled:
            return False, ''
        cache_path = self.get_cache_path(url)
        exists = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
        return exists, cache_path

    def get_or_download(
        self,
        url: str,
        dest: str,
        timeout: int = 300,
        max_retries: int = 5
    ) -> bool:
        """
        Fetch a material from the cache, downloading and caching it on a miss.

        Args:
            url: Material URL.
            dest: Destination file path (inside the task working directory).
            timeout: Download timeout in seconds.
            max_retries: Maximum number of retries.

        Returns:
            Whether the operation succeeded.
        """
        # Make sure the destination directory exists.
        dest_dir = os.path.dirname(dest)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)
        # Download directly when the cache is disabled.
        if not self.enabled:
            return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
        cache_key = _extract_cache_key(url)
        lock_path = self._acquire_lock(cache_key)
        if not lock_path:
            logger.warning(f"Cache lock unavailable, downloading without cache: {url[:80]}...")
            return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
        try:
            cache_path = self.get_cache_path(url)
            cached = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
            if cached:
                # Cache hit: copy to the destination path.
                try:
                    shutil.copy2(cache_path, dest)
                    # Refresh the access time (used for LRU cleanup).
                    os.utime(cache_path, None)
                    file_size = os.path.getsize(dest)
                    logger.info(f"Cache hit: {url[:80]}... -> {dest} ({file_size} bytes)")
                    return True
                except Exception as e:
                    logger.warning(f"Failed to copy from cache: {e}, will re-download")
                    # The copy failed; remove the possibly corrupted cache file.
                    try:
                        os.remove(cache_path)
                    except Exception:
                        pass
            # Cache miss: download into the cache directory.
            logger.debug(f"Cache miss: {url[:80]}...")
            # Download to a temporary file first (unique name avoids concurrent overwrites).
            temp_cache_path = os.path.join(
                self.cache_dir,
                f"{cache_key}.{uuid.uuid4().hex}.downloading"
            )
            try:
                if not storage.download_file(url, temp_cache_path, max_retries=max_retries, timeout=timeout):
                    # Download failed; clean up the temporary file.
                    if os.path.exists(temp_cache_path):
                        os.remove(temp_cache_path)
                    return False
                if not os.path.exists(temp_cache_path) or os.path.getsize(temp_cache_path) <= 0:
                    if os.path.exists(temp_cache_path):
                        os.remove(temp_cache_path)
                    return False
                # Download succeeded; atomically replace the cache file.
                os.replace(temp_cache_path, cache_path)
                # Copy to the destination path.
                shutil.copy2(cache_path, dest)
                file_size = os.path.getsize(dest)
                logger.info(f"Downloaded and cached: {url[:80]}... ({file_size} bytes)")
                # Check whether the cache needs cleanup.
                if self.max_size_bytes > 0:
                    self._cleanup_if_needed()
                return True
            except Exception as e:
                logger.error(f"Cache download error: {e}")
                # Clean up the temporary file.
                if os.path.exists(temp_cache_path):
                    try:
                        os.remove(temp_cache_path)
                    except Exception:
                        pass
                return False
        finally:
            self._release_lock(lock_path)
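
    # Usage sketch (hypothetical paths and URL):
    #   cache = MaterialCache('/data/material-cache', enabled=True, max_size_gb=50)
    #   cache.get_or_download('https://cdn.example.com/v/clip.mp4?sig=abc',
    #                         '/tmp/job-1/clip.mp4')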

    def add_to_cache(self, url: str, source_path: str) -> bool:
        """
        Add a local file to the cache.

        Args:
            url: The corresponding URL (used to derive the cache key).
            source_path: Local file path.

        Returns:
            Whether the operation succeeded.
        """
        if not self.enabled:
            return False
        if not os.path.exists(source_path):
            logger.warning(f"Source file not found for cache: {source_path}")
            return False
        cache_key = _extract_cache_key(url)
        lock_path = self._acquire_lock(cache_key)
        if not lock_path:
            logger.warning(f"Cache lock unavailable for adding: {url[:80]}...")
            return False
        try:
            cache_path = self.get_cache_path(url)
            # Copy to a temporary file first.
            temp_cache_path = os.path.join(
                self.cache_dir,
                f"{cache_key}.{uuid.uuid4().hex}.adding"
            )
            shutil.copy2(source_path, temp_cache_path)
            # Atomically replace the cache file.
            os.replace(temp_cache_path, cache_path)
            # Refresh the access time.
            os.utime(cache_path, None)
            logger.info(f"Added to cache: {url[:80]}... <- {source_path}")
            # Check whether cleanup is needed.
            if self.max_size_bytes > 0:
                self._cleanup_if_needed()
            return True
        except Exception as e:
            logger.error(f"Failed to add to cache: {e}")
            if 'temp_cache_path' in locals() and os.path.exists(temp_cache_path):
                try:
                    os.remove(temp_cache_path)
                except Exception:
                    pass
            return False
        finally:
            self._release_lock(lock_path)

    def _cleanup_if_needed(self) -> None:
        """
        Check the cache size and clean up if needed (LRU policy).

        When the cache exceeds its size limit, the least recently accessed
        files are deleted first.
        """
        if self.max_size_bytes <= 0:
            return
        try:
            # Collect all cache files along with their metadata.
            cache_files = []
            total_size = 0
            for filename in os.listdir(self.cache_dir):
                if filename.endswith(('.downloading', '.adding', '.lock')):
                    continue
                file_path = os.path.join(self.cache_dir, filename)
                if os.path.isfile(file_path):
                    stat = os.stat(file_path)
                    cache_files.append({
                        'path': file_path,
                        'size': stat.st_size,
                        'atime': stat.st_atime
                    })
                    total_size += stat.st_size
            # No cleanup needed when under the limit.
            if total_size <= self.max_size_bytes:
                return
            # Sort by access time (least recently accessed first).
            cache_files.sort(key=lambda x: x['atime'])
            # Delete files until the size drops below 80% of the limit.
            target_size = int(self.max_size_bytes * 0.8)
            deleted_count = 0
            for file_info in cache_files:
                if total_size <= target_size:
                    break
                # Derive the cache_key from the filename and check for a lock
                # (a lock means the file is currently in use).
                filename = os.path.basename(file_info['path'])
                cache_key = os.path.splitext(filename)[0]
                lock_path = self._get_lock_path(cache_key)
                if os.path.exists(lock_path):
                    if self._is_lock_stale(lock_path):
                        self._remove_lock_file(lock_path, "cleanup stale lock")
                    else:
                        # The file is in use by another task; skip deletion.
                        logger.debug(f"Cache cleanup: skipping locked file {filename}")
                        continue
                try:
                    os.remove(file_info['path'])
                    total_size -= file_info['size']
                    deleted_count += 1
                except Exception as e:
                    logger.warning(f"Failed to delete cache file: {e}")
            if deleted_count > 0:
                logger.info(f"Cache cleanup: deleted {deleted_count} files, current size: {total_size / (1024*1024*1024):.2f} GB")
        except Exception as e:
            logger.warning(f"Cache cleanup error: {e}")

    def clear(self) -> None:
        """Clear the entire cache."""
        if not self.enabled:
            return
        try:
            if os.path.exists(self.cache_dir):
                shutil.rmtree(self.cache_dir)
            os.makedirs(self.cache_dir, exist_ok=True)
            logger.info("Cache cleared")
        except Exception as e:
            logger.error(f"Failed to clear cache: {e}")

    def get_stats(self) -> dict:
        """
        Get cache statistics.

        Returns:
            A dict with cache statistics.
        """
        if not self.enabled or not os.path.exists(self.cache_dir):
            return {'enabled': False, 'file_count': 0, 'total_size_mb': 0}
        file_count = 0
        total_size = 0
        for filename in os.listdir(self.cache_dir):
            if filename.endswith(('.downloading', '.adding', '.lock')):
                continue
            file_path = os.path.join(self.cache_dir, filename)
            if os.path.isfile(file_path):
                file_count += 1
                total_size += os.path.getsize(file_path)
        return {
            'enabled': True,
            'cache_dir': self.cache_dir,
            'file_count': file_count,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
            'max_size_gb': self.max_size_bytes / (1024 * 1024 * 1024) if self.max_size_bytes > 0 else 0
        }
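

if __name__ == '__main__':
    # Minimal smoke test, a sketch only: the directory and URL below are
    # hypothetical, and actual downloads depend on services.storage being
    # configured; this exercises only the key/path/stats helpers.
    logging.basicConfig(level=logging.INFO)
    demo_cache = MaterialCache(cache_dir='/tmp/material-cache-demo', enabled=True, max_size_gb=1)
    demo_url = 'https://cdn.example.com/materials/demo.mp4?sig=placeholder'
    print('cache path:', demo_cache.get_cache_path(demo_url))
    print('is cached:', demo_cache.is_cached(demo_url))
    print('stats:', demo_cache.get_stats())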