7 Commits

Author SHA1 Message Date
a70573395b feat(cache): harden the cache lock with process liveness detection
- Write and read lock metadata recording the owning process ID and start time
- Check whether the lock owner is still alive, preventing deadlocks caused by PID reuse
- Detect stale locks and clean them up automatically
- Use the psutil library for process monitoring
- Update cache cleanup so files held by active locks are skipped
- Store the lock metadata as JSON (see the sketch after this entry)
2026-01-28 23:41:53 +08:00
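
A minimal sketch of the liveness check this commit describes, assuming the lock file holds the JSON metadata listed above; helper names like is_lock_owner_alive are illustrative, not the repository's actual identifiers:

import json
import os
import time
from typing import Optional

import psutil  # used, as in the commit, to inspect the lock owner

def write_lock_metadata(lock_fd: int) -> None:
    # Record who owns the lock: the PID plus the process start time,
    # so a recycled PID can be told apart from the original owner.
    meta = {
        'pid': os.getpid(),
        'process_start_time': psutil.Process(os.getpid()).create_time(),
        'created_at': time.time(),
    }
    with os.fdopen(lock_fd, 'w', encoding='utf-8') as f:
        json.dump(meta, f)

def is_lock_owner_alive(pid: int, expected_start: Optional[float]) -> bool:
    try:
        proc = psutil.Process(pid)
    except psutil.NoSuchProcess:
        return False
    if expected_start is None:
        return proc.is_running()
    # PID reuse shows up as the same PID with a different start time.
    return abs(proc.create_time() - expected_start) < 1.0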
ffb9d5390e feat(video): add width/height parameter support to video rendering
- Pass width and height through in render_video
- Add a scale filter for the overlay input
- Update the filter_complex string to include the scaling logic (sketched below)
- Fix the overlay pipeline so the requested dimensions are applied
- Document the new parameters
2026-01-27 17:03:56 +08:00
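
A short sketch of the resulting filter graph, assuming the two-input layout from the diff below (input 0 is the base video, input 1 the overlay); build_overlay_filter is an illustrative name:

def build_overlay_filter(filters, width, height, is_video_overlay):
    # Scale the overlay input to the target frame size before compositing.
    base = ','.join(filters) if filters else 'copy'
    overlay = 'overlay=0:0:eof_action=pass' if is_video_overlay else 'overlay=0:0'
    # Video overlays also pin the color range back to tv at the end.
    range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
    return (f"[0:v]{base}[base];"
            f"[1:v]scale={width}:{height}[overlay];"
            f"[base][overlay]{overlay}{range_fix}")

# With width=1080, height=1920, no base filters and an image overlay, this yields:
# [0:v]copy[base];[1:v]scale=1080:1920[overlay];[base][overlay]overlay=0:0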
6126856361 feat(cache): cache uploaded files
- Add files to the cache system after a successful upload
- Add an add_to_cache method for caching local files
- Write cache entries atomically so concurrent reads stay safe (see the sketch below)
- Take the cache lock to prevent concurrent conflicts
- Trigger the cache cleanup policy automatically
- Log cache operations in detail
2026-01-26 10:41:26 +08:00
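
The atomic write reduces to copy-then-rename; a minimal sketch, assuming a flat cache directory keyed by cache_key (locking and error handling omitted):

import os
import shutil
import uuid

def add_to_cache(cache_dir: str, cache_key: str, source_path: str) -> str:
    cache_path = os.path.join(cache_dir, cache_key)
    # Copy into a uniquely named temp file first, so concurrent readers
    # never observe a partially written cache entry.
    temp_path = os.path.join(cache_dir, f"{cache_key}.{uuid.uuid4().hex}.adding")
    shutil.copy2(source_path, temp_path)
    # os.replace is atomic when source and target share a filesystem.
    os.replace(temp_path, cache_path)
    # Refresh the mtime so LRU cleanup sees the entry as freshly used.
    os.utime(cache_path, None)
    return cache_path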
a6263398ed fix(storage): handle URL-encoded characters
- Add URL decoding so encoded characters are handled (e.g. %2F becomes /)
- Fix the encoding issue in the URL matching logic
- Ensure the replace step handles encoded path characters correctly (illustrated below)
2026-01-24 23:33:50 +08:00
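
The fix relies on urllib.parse.unquote; a quick illustration with a made-up URL:

from urllib.parse import unquote

url = "https://example.com/bucket/videos%2F2026%2Fclip.mp4?sig=abc"
url = url.split("?", 1)[0]  # drop query parameters, as the uploader does
print(unquote(url))         # https://example.com/bucket/videos/2026/clip.mp4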
885b69233a feat(storage): log the upload destination
- Import urllib.parse.unquote for URL decoding
- Log the upload target URL when uploading with rclone
- Makes the upload process easier to debug and monitor
2026-01-24 23:29:37 +08:00
9158854411 fix(video): fix path handling when merging MP4 segments
- Write TS file paths in concat.txt as file names only (relative paths)
- Remove the now-unnecessary backslash replacement logic
- Ensure FFmpeg's concat demuxer resolves the file paths correctly (see the sketch below)
2026-01-24 22:59:35 +08:00
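
A minimal sketch of the fixed merge step, assuming concat.txt sits in the same directory as the TS segments (the concat demuxer resolves list entries relative to the list file):

import os
import subprocess

def merge_ts_to_mp4(work_dir: str, ts_files: list, output_path: str) -> None:
    concat_file = os.path.join(work_dir, 'concat.txt')
    with open(concat_file, 'w', encoding='utf-8') as f:
        for ts_file in ts_files:
            # File name only: no backslash escaping needed, and the entry
            # resolves relative to the directory containing concat.txt.
            f.write(f"file '{os.path.basename(ts_file)}'\n")
    subprocess.run([
        'ffmpeg', '-y', '-f', 'concat', '-safe', '0',
        '-i', concat_file, '-c', 'copy', output_path,  # remux, no re-encode
    ], check=True)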
634dc6c855 fix(cache): stop cache cleanup from deleting files that are in use
- Check for a file lock before deleting a cached file, so in-use entries survive cleanup
- Derive the cache_key from the file name to look up the lock state
- Verify whether a lock file exists before deleting, keeping cleanup safe
- Log skipped locked files at debug level (see the sketch below)
2026-01-24 22:57:57 +08:00
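
A condensed sketch of the cleanup guard, assuming lock files live next to cache entries as <cache_key>.lock; should_skip is an illustrative name:

import logging
import os

logger = logging.getLogger(__name__)

def should_skip(cache_file_path: str, cache_dir: str) -> bool:
    # The cache_key is the cached file's name without its extension.
    filename = os.path.basename(cache_file_path)
    cache_key = os.path.splitext(filename)[0]
    lock_path = os.path.join(cache_dir, f"{cache_key}.lock")
    if os.path.exists(lock_path):
        # Another task holds this entry; leave the file alone.
        logger.debug(f"Cache cleanup: skipping locked file {filename}")
        return True
    return False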
5 changed files with 195 additions and 12 deletions

View File

@@ -464,6 +464,11 @@ class BaseHandler(TaskHandler, ABC):
        if result:
            file_size = os.path.getsize(file_path)
            logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
+           # Add the successfully uploaded file to the cache
+           if access_url:
+               self.material_cache.add_to_cache(access_url, file_path)
            return access_url
        else:
            logger.error(f"[task:{task_id}] Upload failed: {file_path}")

View File

@@ -127,9 +127,9 @@ class FinalizeMp4Handler(BaseHandler):
        concat_file = os.path.join(work_dir, 'concat.txt')
        with open(concat_file, 'w', encoding='utf-8') as f:
            for ts_file in ts_files:
-               # Backslashes in the path must be escaped, or forward slashes used
-               ts_path = ts_file.replace('\\', '/')
-               f.write(f"file '{ts_path}'\n")
+               # FFmpeg concat paths are relative to the directory of concat.txt; only the file name is needed
+               ts_filename = os.path.basename(ts_file)
+               f.write(f"file '{ts_filename}'\n")

        # 3. Build the merge command (remux, no re-encode)
        cmd = [

View File

@@ -466,6 +466,8 @@ class RenderSegmentVideoHandler(BaseHandler):
            base_filters=filters,
            effects=effects,
            fps=fps,
+           width=width,
+           height=height,
            has_overlay=has_overlay,
            is_video_overlay=is_video_overlay,
            overlap_head_ms=overlap_head_ms,

@@ -492,12 +494,13 @@ class RenderSegmentVideoHandler(BaseHandler):
        if has_overlay:
            # Use the filter_complex format
            base_filters = ','.join(filters) if filters else 'copy'
+           overlay_scale = f"scale={width}:{height}"
            # Video overlays use eof_action=pass (disappear when the overlay ends); image overlays keep the default behavior (stay visible)
            overlay_params = 'eof_action=pass' if is_video_overlay else ''
            overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
            # Video overlays need the color range unified at the end, so it does not flip from tv to pc once the overlay ends
            range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
-           return f"[0:v]{base_filters}[base];[base][1:v]{overlay_filter}{range_fix}"
+           return f"[0:v]{base_filters}[base];[1:v]{overlay_scale}[overlay];[base][overlay]{overlay_filter}{range_fix}"
        else:
            return ','.join(filters) if filters else ''

@@ -506,6 +509,8 @@ class RenderSegmentVideoHandler(BaseHandler):
        base_filters: List[str],
        effects: List[Effect],
        fps: int,
+       width: int,
+       height: int,
        has_overlay: bool = False,
        is_video_overlay: bool = False,
        overlap_head_ms: int = 0,

@@ -521,6 +526,8 @@ class RenderSegmentVideoHandler(BaseHandler):
            base_filters: list of base filters
            effects: list of effects
            fps: frame rate
+           width: output width
+           height: output height
            has_overlay: whether an overlay is present
            is_video_overlay: whether the overlay is a video format (e.g. .mov)
            overlap_head_ms: head overlap duration

@@ -604,9 +611,12 @@ class RenderSegmentVideoHandler(BaseHandler):
            # Video overlays use eof_action=pass (disappear when the overlay ends); image overlays keep the default behavior (stay visible)
            overlay_params = 'eof_action=pass' if is_video_overlay else ''
            overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
+           overlay_scale = f"scale={width}:{height}"
+           overlay_output = '[v_overlay]'
            # Video overlays need the color range unified at the end, so it does not flip from tv to pc once the overlay ends
            range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
-           filter_parts.append(f"{current_output}[1:v]{overlay_filter}{range_fix}")
+           filter_parts.append(f"[1:v]{overlay_scale}{overlay_output}")
+           filter_parts.append(f"{current_output}{overlay_output}{overlay_filter}{range_fix}")
        else:
            # Drop the last label and output directly
            # Replace the output label of the last filter with an empty label (direct output)

View File

@@ -5,6 +5,7 @@
 Provides material download caching so identical materials are not downloaded twice.
 """
+import json
 import os
 import hashlib
 import logging

@@ -14,6 +15,8 @@ import uuid
 from typing import Optional, Tuple
 from urllib.parse import urlparse, unquote
+
+import psutil
 from services import storage

 logger = logging.getLogger(__name__)

@@ -62,6 +65,7 @@ class MaterialCache:
     LOCK_TIMEOUT_SEC = 30.0
     LOCK_POLL_INTERVAL_SEC = 0.1
+    LOCK_STALE_SECONDS = 24 * 60 * 60

     def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
         """

@@ -100,6 +104,96 @@ class MaterialCache:
         assert self.cache_dir
         return os.path.join(self.cache_dir, f"{cache_key}.lock")

+    def _write_lock_metadata(self, lock_fd: int, lock_path: str) -> bool:
+        """Write the lock metadata; clean up the lock file on failure"""
+        try:
+            try:
+                process_start_time = psutil.Process(os.getpid()).create_time()
+            except Exception as e:
+                process_start_time = None
+                logger.warning(f"Cache lock process start time error: {e}")
+            metadata = {
+                'pid': os.getpid(),
+                'process_start_time': process_start_time,
+                'created_at': time.time()
+            }
+            with os.fdopen(lock_fd, 'w', encoding='utf-8') as lock_file:
+                json.dump(metadata, lock_file)
+            return True
+        except Exception as e:
+            try:
+                os.close(lock_fd)
+            except Exception:
+                pass
+            self._remove_lock_file(lock_path, f"write metadata failed: {e}")
+            return False
+
+    def _read_lock_metadata(self, lock_path: str) -> Optional[dict]:
+        """Read the lock metadata; return None on failure (compatible with legacy empty lock files)"""
+        try:
+            with open(lock_path, 'r', encoding='utf-8') as lock_file:
+                data = json.load(lock_file)
+                return data if isinstance(data, dict) else None
+        except Exception:
+            return None
+
+    def _is_process_alive(self, pid: int, expected_start_time: Optional[float]) -> bool:
+        """Check whether the process is alive and verify its start time (guards against PID reuse)"""
+        try:
+            process = psutil.Process(pid)
+            if expected_start_time is None:
+                return process.is_running()
+            actual_start_time = process.create_time()
+            return abs(actual_start_time - expected_start_time) < 1.0
+        except psutil.NoSuchProcess:
+            return False
+        except Exception as e:
+            logger.warning(f"Cache lock process check error: {e}")
+            return True
+
+    def _is_lock_stale(self, lock_path: str) -> bool:
+        """Check whether the lock is stale (owner has exited, or the lock exceeded its maximum lifetime)"""
+        if not os.path.exists(lock_path):
+            return False
+        now = time.time()
+        metadata = self._read_lock_metadata(lock_path)
+        if metadata:
+            created_at = metadata.get('created_at')
+            if isinstance(created_at, (int, float)) and now - created_at > self.LOCK_STALE_SECONDS:
+                return True
+            pid = metadata.get('pid')
+            pid_value = int(pid) if isinstance(pid, int) or (isinstance(pid, str) and pid.isdigit()) else None
+            expected_start_time = metadata.get('process_start_time')
+            expected_start_time_value = (
+                expected_start_time if isinstance(expected_start_time, (int, float)) else None
+            )
+            if pid_value is not None and not self._is_process_alive(pid_value, expected_start_time_value):
+                return True
+            return self._is_lock_stale_by_mtime(lock_path, now)
+        return self._is_lock_stale_by_mtime(lock_path, now)
+
+    def _is_lock_stale_by_mtime(self, lock_path: str, now: float) -> bool:
+        """Check lock staleness based on the file modification time"""
+        try:
+            mtime = os.path.getmtime(lock_path)
+            return now - mtime > self.LOCK_STALE_SECONDS
+        except Exception as e:
+            logger.warning(f"Cache lock stat error: {e}")
+            return False
+
+    def _remove_lock_file(self, lock_path: str, reason: str = "") -> bool:
+        """Remove the lock file"""
+        try:
+            os.remove(lock_path)
+            if reason:
+                logger.info(f"Cache lock removed: {lock_path} ({reason})")
+            return True
+        except FileNotFoundError:
+            return True
+        except Exception as e:
+            logger.warning(f"Cache lock remove error: {e}")
+            return False
+
     def _acquire_lock(self, cache_key: str) -> Optional[str]:
         """Acquire the cache lock (cross-process safe)"""
         if not self.enabled:

@@ -111,9 +205,14 @@ class MaterialCache:
         while True:
             try:
                 fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
-                os.close(fd)
+                if not self._write_lock_metadata(fd, lock_path):
+                    return None
                 return lock_path
             except FileExistsError:
+                if self._is_lock_stale(lock_path):
+                    removed = self._remove_lock_file(lock_path, "stale lock")
+                    if removed:
+                        continue
                 if time.monotonic() >= deadline:
                     logger.warning(f"Cache lock timeout: {lock_path}")
                     return None

@@ -126,12 +225,7 @@ class MaterialCache:
         """Release the cache lock"""
         if not lock_path:
             return
-        try:
-            os.remove(lock_path)
-        except FileNotFoundError:
-            return
-        except Exception as e:
-            logger.warning(f"Cache lock release error: {e}")
+        self._remove_lock_file(lock_path)

     def is_cached(self, url: str) -> Tuple[bool, str]:
         """

@@ -251,6 +345,66 @@ class MaterialCache:
         finally:
             self._release_lock(lock_path)

+    def add_to_cache(self, url: str, source_path: str) -> bool:
+        """
+        Add a local file to the cache
+
+        Args:
+            url: the corresponding URL (used to derive the cache key)
+            source_path: path of the local source file
+
+        Returns:
+            whether the operation succeeded
+        """
+        if not self.enabled:
+            return False
+        if not os.path.exists(source_path):
+            logger.warning(f"Source file not found for cache: {source_path}")
+            return False
+
+        cache_key = _extract_cache_key(url)
+        lock_path = self._acquire_lock(cache_key)
+        if not lock_path:
+            logger.warning(f"Cache lock unavailable for adding: {url[:80]}...")
+            return False
+        try:
+            cache_path = self.get_cache_path(url)
+            # Copy to a temporary file first
+            temp_cache_path = os.path.join(
+                self.cache_dir,
+                f"{cache_key}.{uuid.uuid4().hex}.adding"
+            )
+            shutil.copy2(source_path, temp_cache_path)
+            # Atomic replace
+            os.replace(temp_cache_path, cache_path)
+            # Refresh the access time
+            os.utime(cache_path, None)
+            logger.info(f"Added to cache: {url[:80]}... <- {source_path}")
+            # Check whether cleanup is needed
+            if self.max_size_bytes > 0:
+                self._cleanup_if_needed()
+            return True
+        except Exception as e:
+            logger.error(f"Failed to add to cache: {e}")
+            if 'temp_cache_path' in locals() and os.path.exists(temp_cache_path):
+                try:
+                    os.remove(temp_cache_path)
+                except Exception:
+                    pass
+            return False
+        finally:
+            self._release_lock(lock_path)
+
     def _cleanup_if_needed(self) -> None:
         """
         Check and clean up the cache (LRU policy)

@@ -292,6 +446,17 @@ class MaterialCache:
         for file_info in cache_files:
             if total_size <= target_size:
                 break
+            # Derive the cache_key from the file name and check for a lock (a lock means the file is in use)
+            filename = os.path.basename(file_info['path'])
+            cache_key = os.path.splitext(filename)[0]
+            lock_path = self._get_lock_path(cache_key)
+            if os.path.exists(lock_path):
+                if self._is_lock_stale(lock_path):
+                    self._remove_lock_file(lock_path, "cleanup stale lock")
+                else:
+                    # The file is in use by another task; skip deletion
+                    logger.debug(f"Cache cleanup: skipping locked file {filename}")
+                    continue
             try:
                 os.remove(file_info['path'])
                 total_size -= file_info['size']

View File

@@ -9,6 +9,7 @@ import os
 import logging
 import subprocess
 from typing import Optional
+from urllib.parse import unquote

 import requests

@@ -85,6 +86,7 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
     # Check whether to upload with rclone
     if os.getenv("UPLOAD_METHOD") == "rclone":
+        logger.info(f"Uploading to: {url}")
         result = _upload_with_rclone(url, file_path)
         if result:
             return True

@@ -144,6 +146,7 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
     for src, dst in replace_list:
         new_url = new_url.replace(src, dst)
     new_url = new_url.split("?", 1)[0]  # strip the query parameters
+    new_url = unquote(new_url)  # decode URL-encoded characters (e.g. %2F -> /)

     if new_url == url:
         return False