5 Commits

Author SHA1 Message Date
a70573395b feat(cache): 增强缓存锁机制支持进程存活检测
- 添加了锁元数据写入和读取功能,记录进程ID和启动时间
- 实现了进程存活检查机制,防止PID复用导致的死锁
- 引入了过期锁检测和自动清理机制
- 集成了psutil库进行系统进程监控
- 优化了缓存清理逻辑,支持跳过活跃锁文件
- 使用JSON格式存储锁元数据信息
2026-01-28 23:41:53 +08:00
ffb9d5390e feat(video): 添加视频渲染的宽高参数支持
- 在 render_video 函数中添加 width 和 height 参数传递
- 为 overlay 功能添加 scale 滤镜支持
- 更新 filter_complex 字符串以包含尺寸缩放逻辑
- 修改 overlay 处理流程以正确应用指定尺寸
- 添加相关参数的文档说明
2026-01-27 17:03:56 +08:00
6126856361 feat(cache): 实现上传文件缓存功能
- 在文件上传成功后将文件加入缓存系统
- 添加 add_to_cache 方法支持本地文件缓存
- 实现原子操作确保缓存写入安全
- 集成锁机制防止并发冲突
- 自动触发缓存清理策略
- 记录详细的缓存操作日志
2026-01-26 10:41:26 +08:00
a6263398ed fix(storage): 解决URL编码字符处理问题
- 添加了URL解码功能以处理编码字符(如%2F转换为/)
- 修复了URL匹配逻辑中的编码问题
- 确保替换操作正确处理已编码的路径字符
2026-01-24 23:33:50 +08:00
885b69233a feat(storage): 添加上传日志记录功能
- 导入 urllib.parse.unquote 模块用于 URL 解码
- 在使用 rclone 上传时添加上传目标 URL 的日志记录
- 便于调试和监控文件上传过程
2026-01-24 23:29:37 +08:00
4 changed files with 187 additions and 12 deletions

View File

@@ -464,6 +464,11 @@ class BaseHandler(TaskHandler, ABC):
if result: if result:
file_size = os.path.getsize(file_path) file_size = os.path.getsize(file_path)
logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)") logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
# 将上传成功的文件加入缓存
if access_url:
self.material_cache.add_to_cache(access_url, file_path)
return access_url return access_url
else: else:
logger.error(f"[task:{task_id}] Upload failed: {file_path}") logger.error(f"[task:{task_id}] Upload failed: {file_path}")

View File

@@ -466,6 +466,8 @@ class RenderSegmentVideoHandler(BaseHandler):
base_filters=filters, base_filters=filters,
effects=effects, effects=effects,
fps=fps, fps=fps,
width=width,
height=height,
has_overlay=has_overlay, has_overlay=has_overlay,
is_video_overlay=is_video_overlay, is_video_overlay=is_video_overlay,
overlap_head_ms=overlap_head_ms, overlap_head_ms=overlap_head_ms,
@@ -492,12 +494,13 @@ class RenderSegmentVideoHandler(BaseHandler):
if has_overlay: if has_overlay:
# 使用 filter_complex 格式 # 使用 filter_complex 格式
base_filters = ','.join(filters) if filters else 'copy' base_filters = ','.join(filters) if filters else 'copy'
overlay_scale = f"scale={width}:{height}"
# 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示) # 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示)
overlay_params = 'eof_action=pass' if is_video_overlay else '' overlay_params = 'eof_action=pass' if is_video_overlay else ''
overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0' overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
# 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc # 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc
range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else '' range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
return f"[0:v]{base_filters}[base];[base][1:v]{overlay_filter}{range_fix}" return f"[0:v]{base_filters}[base];[1:v]{overlay_scale}[overlay];[base][overlay]{overlay_filter}{range_fix}"
else: else:
return ','.join(filters) if filters else '' return ','.join(filters) if filters else ''
@@ -506,6 +509,8 @@ class RenderSegmentVideoHandler(BaseHandler):
base_filters: List[str], base_filters: List[str],
effects: List[Effect], effects: List[Effect],
fps: int, fps: int,
width: int,
height: int,
has_overlay: bool = False, has_overlay: bool = False,
is_video_overlay: bool = False, is_video_overlay: bool = False,
overlap_head_ms: int = 0, overlap_head_ms: int = 0,
@@ -521,6 +526,8 @@ class RenderSegmentVideoHandler(BaseHandler):
base_filters: 基础滤镜列表 base_filters: 基础滤镜列表
effects: 特效列表 effects: 特效列表
fps: 帧率 fps: 帧率
width: 输出宽度
height: 输出高度
has_overlay: 是否有叠加层 has_overlay: 是否有叠加层
is_video_overlay: 叠加层是否为视频格式(如 .mov) is_video_overlay: 叠加层是否为视频格式(如 .mov)
overlap_head_ms: 头部 overlap 时长 overlap_head_ms: 头部 overlap 时长
@@ -604,9 +611,12 @@ class RenderSegmentVideoHandler(BaseHandler):
# 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示) # 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示)
overlay_params = 'eof_action=pass' if is_video_overlay else '' overlay_params = 'eof_action=pass' if is_video_overlay else ''
overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0' overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
overlay_scale = f"scale={width}:{height}"
overlay_output = '[v_overlay]'
# 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc # 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc
range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else '' range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
filter_parts.append(f"{current_output}[1:v]{overlay_filter}{range_fix}") filter_parts.append(f"[1:v]{overlay_scale}{overlay_output}")
filter_parts.append(f"{current_output}{overlay_output}{overlay_filter}{range_fix}")
else: else:
# 移除最后一个标签,直接输出 # 移除最后一个标签,直接输出
# 将最后一个滤镜的输出标签替换为空(直接输出) # 将最后一个滤镜的输出标签替换为空(直接输出)

View File

@@ -5,6 +5,7 @@
提供素材下载缓存功能,避免相同素材重复下载。 提供素材下载缓存功能,避免相同素材重复下载。
""" """
import json
import os import os
import hashlib import hashlib
import logging import logging
@@ -14,6 +15,8 @@ import uuid
from typing import Optional, Tuple from typing import Optional, Tuple
from urllib.parse import urlparse, unquote from urllib.parse import urlparse, unquote
import psutil
from services import storage from services import storage
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -62,6 +65,7 @@ class MaterialCache:
LOCK_TIMEOUT_SEC = 30.0 LOCK_TIMEOUT_SEC = 30.0
LOCK_POLL_INTERVAL_SEC = 0.1 LOCK_POLL_INTERVAL_SEC = 0.1
LOCK_STALE_SECONDS = 24 * 60 * 60
def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0): def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
""" """
@@ -100,6 +104,96 @@ class MaterialCache:
assert self.cache_dir assert self.cache_dir
return os.path.join(self.cache_dir, f"{cache_key}.lock") return os.path.join(self.cache_dir, f"{cache_key}.lock")
def _write_lock_metadata(self, lock_fd: int, lock_path: str) -> bool:
"""写入锁元数据,失败则清理锁文件"""
try:
try:
process_start_time = psutil.Process(os.getpid()).create_time()
except Exception as e:
process_start_time = None
logger.warning(f"Cache lock process start time error: {e}")
metadata = {
'pid': os.getpid(),
'process_start_time': process_start_time,
'created_at': time.time()
}
with os.fdopen(lock_fd, 'w', encoding='utf-8') as lock_file:
json.dump(metadata, lock_file)
return True
except Exception as e:
try:
os.close(lock_fd)
except Exception:
pass
self._remove_lock_file(lock_path, f"write metadata failed: {e}")
return False
def _read_lock_metadata(self, lock_path: str) -> Optional[dict]:
"""读取锁元数据,失败返回 None(兼容历史空锁文件)"""
try:
with open(lock_path, 'r', encoding='utf-8') as lock_file:
data = json.load(lock_file)
return data if isinstance(data, dict) else None
except Exception:
return None
def _is_process_alive(self, pid: int, expected_start_time: Optional[float]) -> bool:
"""判断进程是否存活并校验启动时间(防止 PID 复用)"""
try:
process = psutil.Process(pid)
if expected_start_time is None:
return process.is_running()
actual_start_time = process.create_time()
return abs(actual_start_time - expected_start_time) < 1.0
except psutil.NoSuchProcess:
return False
except Exception as e:
logger.warning(f"Cache lock process check error: {e}")
return True
def _is_lock_stale(self, lock_path: str) -> bool:
"""判断锁是否过期(进程已退出或超过最大存活时长)"""
if not os.path.exists(lock_path):
return False
now = time.time()
metadata = self._read_lock_metadata(lock_path)
if metadata:
created_at = metadata.get('created_at')
if isinstance(created_at, (int, float)) and now - created_at > self.LOCK_STALE_SECONDS:
return True
pid = metadata.get('pid')
pid_value = int(pid) if isinstance(pid, int) or (isinstance(pid, str) and pid.isdigit()) else None
expected_start_time = metadata.get('process_start_time')
expected_start_time_value = (
expected_start_time if isinstance(expected_start_time, (int, float)) else None
)
if pid_value is not None and not self._is_process_alive(pid_value, expected_start_time_value):
return True
return self._is_lock_stale_by_mtime(lock_path, now)
return self._is_lock_stale_by_mtime(lock_path, now)
def _is_lock_stale_by_mtime(self, lock_path: str, now: float) -> bool:
"""基于文件时间判断锁是否过期"""
try:
mtime = os.path.getmtime(lock_path)
return now - mtime > self.LOCK_STALE_SECONDS
except Exception as e:
logger.warning(f"Cache lock stat error: {e}")
return False
def _remove_lock_file(self, lock_path: str, reason: str = "") -> bool:
"""删除锁文件"""
try:
os.remove(lock_path)
if reason:
logger.info(f"Cache lock removed: {lock_path} ({reason})")
return True
except FileNotFoundError:
return True
except Exception as e:
logger.warning(f"Cache lock remove error: {e}")
return False
def _acquire_lock(self, cache_key: str) -> Optional[str]: def _acquire_lock(self, cache_key: str) -> Optional[str]:
"""获取缓存锁(跨进程安全)""" """获取缓存锁(跨进程安全)"""
if not self.enabled: if not self.enabled:
@@ -111,9 +205,14 @@ class MaterialCache:
while True: while True:
try: try:
fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY) fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.close(fd) if not self._write_lock_metadata(fd, lock_path):
return None
return lock_path return lock_path
except FileExistsError: except FileExistsError:
if self._is_lock_stale(lock_path):
removed = self._remove_lock_file(lock_path, "stale lock")
if removed:
continue
if time.monotonic() >= deadline: if time.monotonic() >= deadline:
logger.warning(f"Cache lock timeout: {lock_path}") logger.warning(f"Cache lock timeout: {lock_path}")
return None return None
@@ -126,12 +225,7 @@ class MaterialCache:
"""释放缓存锁""" """释放缓存锁"""
if not lock_path: if not lock_path:
return return
try: self._remove_lock_file(lock_path)
os.remove(lock_path)
except FileNotFoundError:
return
except Exception as e:
logger.warning(f"Cache lock release error: {e}")
def is_cached(self, url: str) -> Tuple[bool, str]: def is_cached(self, url: str) -> Tuple[bool, str]:
""" """
@@ -251,6 +345,66 @@ class MaterialCache:
finally: finally:
self._release_lock(lock_path) self._release_lock(lock_path)
def add_to_cache(self, url: str, source_path: str) -> bool:
"""
将本地文件添加到缓存
Args:
url: 对应的 URL(用于生成缓存键)
source_path: 本地文件路径
Returns:
是否成功
"""
if not self.enabled:
return False
if not os.path.exists(source_path):
logger.warning(f"Source file not found for cache: {source_path}")
return False
cache_key = _extract_cache_key(url)
lock_path = self._acquire_lock(cache_key)
if not lock_path:
logger.warning(f"Cache lock unavailable for adding: {url[:80]}...")
return False
try:
cache_path = self.get_cache_path(url)
# 先复制到临时文件
temp_cache_path = os.path.join(
self.cache_dir,
f"{cache_key}.{uuid.uuid4().hex}.adding"
)
shutil.copy2(source_path, temp_cache_path)
# 原子替换
os.replace(temp_cache_path, cache_path)
# 更新访问时间
os.utime(cache_path, None)
logger.info(f"Added to cache: {url[:80]}... <- {source_path}")
# 检查清理
if self.max_size_bytes > 0:
self._cleanup_if_needed()
return True
except Exception as e:
logger.error(f"Failed to add to cache: {e}")
if 'temp_cache_path' in locals() and os.path.exists(temp_cache_path):
try:
os.remove(temp_cache_path)
except Exception:
pass
return False
finally:
self._release_lock(lock_path)
def _cleanup_if_needed(self) -> None: def _cleanup_if_needed(self) -> None:
""" """
检查并清理缓存(LRU 策略) 检查并清理缓存(LRU 策略)
@@ -297,9 +451,12 @@ class MaterialCache:
cache_key = os.path.splitext(filename)[0] cache_key = os.path.splitext(filename)[0]
lock_path = self._get_lock_path(cache_key) lock_path = self._get_lock_path(cache_key)
if os.path.exists(lock_path): if os.path.exists(lock_path):
# 该文件正在被其他任务使用,跳过删除 if self._is_lock_stale(lock_path):
logger.debug(f"Cache cleanup: skipping locked file {filename}") self._remove_lock_file(lock_path, "cleanup stale lock")
continue else:
# 该文件正在被其他任务使用,跳过删除
logger.debug(f"Cache cleanup: skipping locked file {filename}")
continue
try: try:
os.remove(file_info['path']) os.remove(file_info['path'])
total_size -= file_info['size'] total_size -= file_info['size']

View File

@@ -9,6 +9,7 @@ import os
import logging import logging
import subprocess import subprocess
from typing import Optional from typing import Optional
from urllib.parse import unquote
import requests import requests
@@ -85,6 +86,7 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
# 检查是否使用 rclone 上传 # 检查是否使用 rclone 上传
if os.getenv("UPLOAD_METHOD") == "rclone": if os.getenv("UPLOAD_METHOD") == "rclone":
logger.info(f"Uploading to: {url}")
result = _upload_with_rclone(url, file_path) result = _upload_with_rclone(url, file_path)
if result: if result:
return True return True
@@ -144,6 +146,7 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
for src, dst in replace_list: for src, dst in replace_list:
new_url = new_url.replace(src, dst) new_url = new_url.replace(src, dst)
new_url = new_url.split("?", 1)[0] # 移除查询参数 new_url = new_url.split("?", 1)[0] # 移除查询参数
new_url = unquote(new_url) # 解码 URL 编码的字符(如 %2F -> /)
if new_url == url: if new_url == url:
return False return False