feat(material-cache): 添加缓存锁机制防止并发冲突

- 实现跨进程缓存锁获取和释放功能
- 在下载过程中使用UUID生成唯一的临时文件名避免并发覆盖
- 添加超时机制和轮询间隔控制锁等待时间
- 修改清理逻辑跳过锁文件和下载中的临时文件
- 添加测试验证缓存锁功能正常工作

fix(ffmpeg): 优化FFmpeg命令执行和错误处理

- 添加默认日志级别为error减少冗余输出
- 修复subprocess运行参数传递方式
- 改进错误信息截取逻辑,避免空值解码异常

refactor(system-info): 优化系统信息获取和缓存机制

- 实现FFmpeg版本、编解码器信息缓存避免重复查询
- 添加系统信息TTL缓存机制提升性能
- 实现GPU信息检查状态缓存避免重复检测
- 整合静态系统信息和动态信息分离处理

refactor(storage): 优化HTTP上传下载资源管理

- 使用上下文管理器确保请求连接正确关闭
- 修改rclone命令构建方式从字符串改为列表形式
- 改进错误处理,截取 stderr 输出并限制长度
- 优化响应处理避免资源泄露
This commit is contained in:
2026-01-19 20:03:18 +08:00
parent 0cc96a968b
commit b291f33486
6 changed files with 238 additions and 96 deletions

View File

@@ -139,6 +139,8 @@ AUDIO_ENCODE_ARGS = [
'-ac', '2', '-ac', '2',
] ]
FFMPEG_LOGLEVEL = 'error'
def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]: def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]:
""" """
@@ -486,22 +488,28 @@ class BaseHandler(TaskHandler, ABC):
if timeout is None: if timeout is None:
timeout = self.config.ffmpeg_timeout timeout = self.config.ffmpeg_timeout
cmd_to_run = list(cmd)
if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run:
cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL]
# 日志记录命令(限制长度) # 日志记录命令(限制长度)
cmd_str = ' '.join(cmd) cmd_str = ' '.join(cmd_to_run)
if len(cmd_str) > 500: if len(cmd_str) > 500:
cmd_str = cmd_str[:500] + '...' cmd_str = cmd_str[:500] + '...'
logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}") logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")
try: try:
run_args = subprocess_args(False)
run_args['stdout'] = subprocess.DEVNULL
run_args['stderr'] = subprocess.PIPE
result = subprocess.run( result = subprocess.run(
cmd, cmd_to_run,
capture_output=True,
timeout=timeout, timeout=timeout,
**subprocess_args(False) **run_args
) )
if result.returncode != 0: if result.returncode != 0:
stderr = result.stderr.decode('utf-8', errors='replace')[:1000] stderr = (result.stderr or b'').decode('utf-8', errors='replace')[:1000]
logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}") logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
return False return False

View File

@@ -7,6 +7,7 @@ v2 API 客户端
import logging import logging
import subprocess import subprocess
import time
import requests import requests
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any
@@ -24,6 +25,8 @@ class APIClientV2:
负责与渲染服务端的所有 HTTP 通信。 负责与渲染服务端的所有 HTTP 通信。
""" """
SYSTEM_INFO_TTL_SECONDS = 30
def __init__(self, config: WorkerConfig): def __init__(self, config: WorkerConfig):
""" """
初始化 API 客户端 初始化 API 客户端
@@ -37,6 +40,15 @@ class APIClientV2:
self.worker_id = config.worker_id self.worker_id = config.worker_id
self.session = requests.Session() self.session = requests.Session()
self._ffmpeg_version: Optional[str] = None
self._codec_info: Optional[str] = None
self._hw_accel_info: Optional[str] = None
self._gpu_info: Optional[str] = None
self._gpu_info_checked = False
self._static_system_info: Optional[Dict[str, Any]] = None
self._system_info_cache: Optional[Dict[str, Any]] = None
self._system_info_cache_ts = 0.0
# 设置默认请求头 # 设置默认请求头
self.session.headers.update({ self.session.headers.update({
'Content-Type': 'application/json', 'Content-Type': 'application/json',
@@ -287,6 +299,8 @@ class APIClientV2:
def _get_ffmpeg_version(self) -> str: def _get_ffmpeg_version(self) -> str:
"""获取 FFmpeg 版本""" """获取 FFmpeg 版本"""
if self._ffmpeg_version is not None:
return self._ffmpeg_version
try: try:
result = subprocess.run( result = subprocess.run(
['ffmpeg', '-version'], ['ffmpeg', '-version'],
@@ -299,13 +313,18 @@ class APIClientV2:
parts = first_line.split() parts = first_line.split()
for i, part in enumerate(parts): for i, part in enumerate(parts):
if part == 'version' and i + 1 < len(parts): if part == 'version' and i + 1 < len(parts):
return parts[i + 1] self._ffmpeg_version = parts[i + 1]
return 'unknown' return self._ffmpeg_version
self._ffmpeg_version = 'unknown'
return self._ffmpeg_version
except Exception: except Exception:
return 'unknown' self._ffmpeg_version = 'unknown'
return self._ffmpeg_version
def _get_codec_info(self) -> str: def _get_codec_info(self) -> str:
"""获取支持的编解码器信息""" """获取支持的编解码器信息"""
if self._codec_info is not None:
return self._codec_info
try: try:
result = subprocess.run( result = subprocess.run(
['ffmpeg', '-codecs'], ['ffmpeg', '-codecs'],
@@ -324,37 +343,60 @@ class APIClientV2:
codecs.append('aac') codecs.append('aac')
if 'libfdk_aac' in output: if 'libfdk_aac' in output:
codecs.append('libfdk_aac') codecs.append('libfdk_aac')
return ', '.join(codecs) if codecs else 'unknown' self._codec_info = ', '.join(codecs) if codecs else 'unknown'
return self._codec_info
except Exception: except Exception:
return 'unknown' self._codec_info = 'unknown'
return self._codec_info
def _get_system_info(self) -> Dict[str, Any]: def _get_system_info(self) -> Dict[str, Any]:
"""获取系统信息""" """获取系统信息"""
try: try:
now = time.monotonic()
if (
self._system_info_cache
and now - self._system_info_cache_ts < self.SYSTEM_INFO_TTL_SECONDS
):
return self._system_info_cache
import platform import platform
import psutil import psutil
info = { if self._hw_accel_info is None:
self._hw_accel_info = get_hw_accel_info_str()
if self._static_system_info is None:
self._static_system_info = {
'os': platform.system(), 'os': platform.system(),
'cpu': f"{psutil.cpu_count()} cores", 'cpu': f"{psutil.cpu_count()} cores",
'memory': f"{psutil.virtual_memory().total // (1024**3)}GB", 'memory': f"{psutil.virtual_memory().total // (1024**3)}GB",
'hwAccelConfig': self.config.hw_accel, # 当前配置的硬件加速
'hwAccelSupport': self._hw_accel_info, # 系统支持的硬件加速
}
info = dict(self._static_system_info)
info.update({
'cpuUsage': f"{psutil.cpu_percent()}%", 'cpuUsage': f"{psutil.cpu_percent()}%",
'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB", 'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB",
'hwAccelConfig': self.config.hw_accel, # 当前配置的硬件加速 })
'hwAccelSupport': get_hw_accel_info_str(), # 系统支持的硬件加速
}
# 尝试获取 GPU 信息 # 尝试获取 GPU 信息
gpu_info = self._get_gpu_info() gpu_info = self._get_gpu_info()
if gpu_info: if gpu_info:
info['gpu'] = gpu_info info['gpu'] = gpu_info
self._system_info_cache = info
self._system_info_cache_ts = now
return info return info
except Exception: except Exception:
return {} return {}
def _get_gpu_info(self) -> Optional[str]: def _get_gpu_info(self) -> Optional[str]:
"""获取 GPU 信息""" """获取 GPU 信息"""
if self._gpu_info_checked:
return self._gpu_info
self._gpu_info_checked = True
try: try:
result = subprocess.run( result = subprocess.run(
['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'], ['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'],
@@ -364,10 +406,11 @@ class APIClientV2:
) )
if result.returncode == 0: if result.returncode == 0:
gpu_name = result.stdout.strip().split('\n')[0] gpu_name = result.stdout.strip().split('\n')[0]
return gpu_name self._gpu_info = gpu_name
except Exception: except Exception:
pass self._gpu_info = None
return None
return self._gpu_info
def close(self): def close(self):
"""关闭会话""" """关闭会话"""

View File

@@ -10,6 +10,7 @@ import hashlib
import logging import logging
import shutil import shutil
import time import time
import uuid
from typing import Optional, Tuple from typing import Optional, Tuple
from urllib.parse import urlparse, unquote from urllib.parse import urlparse, unquote
@@ -59,6 +60,9 @@ class MaterialCache:
负责素材文件的缓存存储和检索。 负责素材文件的缓存存储和检索。
""" """
LOCK_TIMEOUT_SEC = 30.0
LOCK_POLL_INTERVAL_SEC = 0.1
def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0): def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
""" """
初始化缓存管理器 初始化缓存管理器
@@ -91,6 +95,44 @@ class MaterialCache:
filename = f"{cache_key}{ext}" filename = f"{cache_key}{ext}"
return os.path.join(self.cache_dir, filename) return os.path.join(self.cache_dir, filename)
def _get_lock_path(self, cache_key: str) -> str:
"""获取缓存锁文件路径"""
assert self.cache_dir
return os.path.join(self.cache_dir, f"{cache_key}.lock")
def _acquire_lock(self, cache_key: str) -> Optional[str]:
"""获取缓存锁(跨进程安全)"""
if not self.enabled:
return None
lock_path = self._get_lock_path(cache_key)
deadline = time.monotonic() + self.LOCK_TIMEOUT_SEC
while True:
try:
fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.close(fd)
return lock_path
except FileExistsError:
if time.monotonic() >= deadline:
logger.warning(f"Cache lock timeout: {lock_path}")
return None
time.sleep(self.LOCK_POLL_INTERVAL_SEC)
except Exception as e:
logger.warning(f"Cache lock error: {e}")
return None
def _release_lock(self, lock_path: Optional[str]) -> None:
"""释放缓存锁"""
if not lock_path:
return
try:
os.remove(lock_path)
except FileNotFoundError:
return
except Exception as e:
logger.warning(f"Cache lock release error: {e}")
def is_cached(self, url: str) -> Tuple[bool, str]: def is_cached(self, url: str) -> Tuple[bool, str]:
""" """
检查素材是否已缓存 检查素材是否已缓存
@@ -136,8 +178,15 @@ class MaterialCache:
if not self.enabled: if not self.enabled:
return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout) return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
# 检查缓存 cache_key = _extract_cache_key(url)
cached, cache_path = self.is_cached(url) lock_path = self._acquire_lock(cache_key)
if not lock_path:
logger.warning(f"Cache lock unavailable, downloading without cache: {url[:80]}...")
return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
try:
cache_path = self.get_cache_path(url)
cached = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
if cached: if cached:
# 命中缓存,复制到目标路径 # 命中缓存,复制到目标路径
@@ -159,8 +208,11 @@ class MaterialCache:
# 未命中缓存,下载到缓存目录 # 未命中缓存,下载到缓存目录
logger.debug(f"Cache miss: {url[:80]}...") logger.debug(f"Cache miss: {url[:80]}...")
# 先下载到临时文件 # 先下载到临时文件(唯一文件名,避免并发覆盖)
temp_cache_path = cache_path + '.downloading' temp_cache_path = os.path.join(
self.cache_dir,
f"{cache_key}.{uuid.uuid4().hex}.downloading"
)
try: try:
if not storage.download_file(url, temp_cache_path, max_retries=max_retries, timeout=timeout): if not storage.download_file(url, temp_cache_path, max_retries=max_retries, timeout=timeout):
# 下载失败,清理临时文件 # 下载失败,清理临时文件
@@ -168,10 +220,13 @@ class MaterialCache:
os.remove(temp_cache_path) os.remove(temp_cache_path)
return False return False
# 下载成功,移动到正式缓存路径 if not os.path.exists(temp_cache_path) or os.path.getsize(temp_cache_path) <= 0:
if os.path.exists(cache_path): if os.path.exists(temp_cache_path):
os.remove(cache_path) os.remove(temp_cache_path)
os.rename(temp_cache_path, cache_path) return False
# 下载成功,原子替换缓存文件
os.replace(temp_cache_path, cache_path)
# 复制到目标路径 # 复制到目标路径
shutil.copy2(cache_path, dest) shutil.copy2(cache_path, dest)
@@ -193,6 +248,8 @@ class MaterialCache:
except Exception: except Exception:
pass pass
return False return False
finally:
self._release_lock(lock_path)
def _cleanup_if_needed(self) -> None: def _cleanup_if_needed(self) -> None:
""" """
@@ -209,7 +266,7 @@ class MaterialCache:
total_size = 0 total_size = 0
for filename in os.listdir(self.cache_dir): for filename in os.listdir(self.cache_dir):
if filename.endswith('.downloading'): if filename.endswith('.downloading') or filename.endswith('.lock'):
continue continue
file_path = os.path.join(self.cache_dir, filename) file_path = os.path.join(self.cache_dir, filename)
if os.path.isfile(file_path): if os.path.isfile(file_path):
@@ -275,7 +332,7 @@ class MaterialCache:
total_size = 0 total_size = 0
for filename in os.listdir(self.cache_dir): for filename in os.listdir(self.cache_dir):
if filename.endswith('.downloading'): if filename.endswith('.downloading') or filename.endswith('.lock'):
continue continue
file_path = os.path.join(self.cache_dir, filename) file_path = os.path.join(self.cache_dir, filename)
if os.path.isfile(file_path): if os.path.isfile(file_path):

View File

@@ -7,6 +7,7 @@
import os import os
import logging import logging
import subprocess
from typing import Optional from typing import Optional
import requests import requests
@@ -73,13 +74,13 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
while retries < max_retries: while retries < max_retries:
try: try:
with open(file_path, 'rb') as f: with open(file_path, 'rb') as f:
response = requests.put( with requests.put(
http_url, http_url,
data=f, data=f,
stream=True, stream=True,
timeout=timeout, timeout=timeout,
headers={"Content-Type": "application/octet-stream"} headers={"Content-Type": "application/octet-stream"}
) ) as response:
response.raise_for_status() response.raise_for_status()
logger.info(f"Upload succeeded: {file_path}") logger.info(f"Upload succeeded: {file_path}")
return True return True
@@ -111,7 +112,6 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
return False return False
config_file = os.getenv("RCLONE_CONFIG_FILE", "") config_file = os.getenv("RCLONE_CONFIG_FILE", "")
rclone_config = f"--config {config_file}" if config_file else ""
# 替换 URL # 替换 URL
new_url = url new_url = url
@@ -123,19 +123,30 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
if new_url == url: if new_url == url:
return False return False
cmd = ( cmd = [
f"rclone copyto --no-check-dest --ignore-existing " "rclone",
f"--multi-thread-chunk-size 8M --multi-thread-streams 8 " "copyto",
f"{rclone_config} {file_path} {new_url}" "--no-check-dest",
) "--ignore-existing",
logger.debug(f"rclone command: {cmd}") "--multi-thread-chunk-size",
"8M",
"--multi-thread-streams",
"8",
]
if config_file:
cmd.extend(["--config", config_file])
cmd.extend([file_path, new_url])
result = os.system(cmd) logger.debug(f"rclone command: {' '.join(cmd)}")
if result == 0:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"rclone upload succeeded: {file_path}") logger.info(f"rclone upload succeeded: {file_path}")
return True return True
logger.warning(f"rclone upload failed (code={result}): {file_path}") stderr = (result.stderr or '').strip()
stderr = stderr[:500] if stderr else ""
logger.warning(f"rclone upload failed (code={result.returncode}): {file_path} {stderr}")
return False return False
@@ -177,7 +188,7 @@ def download_file(
retries = 0 retries = 0
while retries < max_retries: while retries < max_retries:
try: try:
response = requests.get(http_url, timeout=timeout, stream=True) with requests.get(http_url, timeout=timeout, stream=True) as response:
response.raise_for_status() response.raise_for_status()
with open(file_path, 'wb') as f: with open(file_path, 'wb') as f:

View File

@@ -137,6 +137,14 @@ class TaskExecutor:
logger.warning(f"[task:{task.task_id}] Task already running, skipping") logger.warning(f"[task:{task.task_id}] Task already running, skipping")
return False return False
# 检查并发上限
if len(self.current_tasks) >= self.config.max_concurrency:
logger.info(
f"[task:{task.task_id}] Max concurrency reached "
f"({self.config.max_concurrency}), rejecting task"
)
return False
# 检查是否有对应的处理器 # 检查是否有对应的处理器
if task.task_type not in self.handlers: if task.task_type not in self.handlers:
logger.error(f"[task:{task.task_id}] No handler for type: {task.task_type.value}") logger.error(f"[task:{task.task_id}] No handler for type: {task.task_type.value}")

View File

@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
import os
from services.cache import MaterialCache, _extract_cache_key
def test_cache_lock_acquire_release(tmp_path):
cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
cache_key = _extract_cache_key("https://example.com/path/file.mp4?token=abc")
lock_path = cache._acquire_lock(cache_key)
assert lock_path
assert os.path.exists(lock_path)
cache._release_lock(lock_path)
assert not os.path.exists(lock_path)