7 Commits

Author SHA1 Message Date
a70573395b feat(cache): 增强缓存锁机制支持进程存活检测
- 添加了锁元数据写入和读取功能,记录进程ID和启动时间
- 实现了进程存活检查机制,防止PID复用导致的死锁
- 引入了过期锁检测和自动清理机制
- 集成了psutil库进行系统进程监控
- 优化了缓存清理逻辑,支持跳过活跃锁文件
- 使用JSON格式存储锁元数据信息
2026-01-28 23:41:53 +08:00
ffb9d5390e feat(video): 添加视频渲染的宽高参数支持
- 在 render_video 函数中添加 width 和 height 参数传递
- 为 overlay 功能添加 scale 滤镜支持
- 更新 filter_complex 字符串以包含尺寸缩放逻辑
- 修改 overlay 处理流程以正确应用指定尺寸
- 添加相关参数的文档说明
2026-01-27 17:03:56 +08:00
6126856361 feat(cache): 实现上传文件缓存功能
- 在文件上传成功后将文件加入缓存系统
- 添加 add_to_cache 方法支持本地文件缓存
- 实现原子操作确保缓存写入安全
- 集成锁机制防止并发冲突
- 自动触发缓存清理策略
- 记录详细的缓存操作日志
2026-01-26 10:41:26 +08:00
a6263398ed fix(storage): 解决URL编码字符处理问题
- 添加了URL解码功能以处理编码字符(如%2F转换为/)
- 修复了URL匹配逻辑中的编码问题
- 确保替换操作正确处理已编码的路径字符
2026-01-24 23:33:50 +08:00
885b69233a feat(storage): 添加上传日志记录功能
- 导入 urllib.parse.unquote 模块用于 URL 解码
- 在使用 rclone 上传时添加上传目标 URL 的日志记录
- 便于调试和监控文件上传过程
2026-01-24 23:29:37 +08:00
9158854411 fix(video): 修复MP4合并时的路径处理问题
- 修改concat.txt中TS文件路径为相对路径,只写文件名
- 移除不必要的反斜杠替换逻辑
- 确保FFmpeg concat协议能正确识别文件路径
2026-01-24 22:59:35 +08:00
634dc6c855 fix(cache): 解决缓存清理时删除正在使用的文件问题
- 添加文件锁定检查机制避免删除正在使用的缓存文件
- 实现基于文件名提取cache_key的锁定状态检测
- 在删除前验证锁文件是否存在以确保安全清理
- 添加调试日志记录跳过的锁定文件信息
2026-01-24 22:57:57 +08:00
5 changed files with 195 additions and 12 deletions

View File

@@ -464,6 +464,11 @@ class BaseHandler(TaskHandler, ABC):
if result:
file_size = os.path.getsize(file_path)
logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
# 将上传成功的文件加入缓存
if access_url:
self.material_cache.add_to_cache(access_url, file_path)
return access_url
else:
logger.error(f"[task:{task_id}] Upload failed: {file_path}")

View File

@@ -127,9 +127,9 @@ class FinalizeMp4Handler(BaseHandler):
concat_file = os.path.join(work_dir, 'concat.txt')
with open(concat_file, 'w', encoding='utf-8') as f:
for ts_file in ts_files:
# 路径中的反斜杠需要转义或使用正斜杠
ts_path = ts_file.replace('\\', '/')
f.write(f"file '{ts_path}'\n")
# FFmpeg concat 路径相对于 concat.txt 所在目录,只需写文件名
ts_filename = os.path.basename(ts_file)
f.write(f"file '{ts_filename}'\n")
# 3. 构建合并命令(remux,不重编码)
cmd = [

View File

@@ -466,6 +466,8 @@ class RenderSegmentVideoHandler(BaseHandler):
base_filters=filters,
effects=effects,
fps=fps,
width=width,
height=height,
has_overlay=has_overlay,
is_video_overlay=is_video_overlay,
overlap_head_ms=overlap_head_ms,
@@ -492,12 +494,13 @@ class RenderSegmentVideoHandler(BaseHandler):
if has_overlay:
# 使用 filter_complex 格式
base_filters = ','.join(filters) if filters else 'copy'
overlay_scale = f"scale={width}:{height}"
# 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示)
overlay_params = 'eof_action=pass' if is_video_overlay else ''
overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
# 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc
range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
return f"[0:v]{base_filters}[base];[base][1:v]{overlay_filter}{range_fix}"
return f"[0:v]{base_filters}[base];[1:v]{overlay_scale}[overlay];[base][overlay]{overlay_filter}{range_fix}"
else:
return ','.join(filters) if filters else ''
@@ -506,6 +509,8 @@ class RenderSegmentVideoHandler(BaseHandler):
base_filters: List[str],
effects: List[Effect],
fps: int,
width: int,
height: int,
has_overlay: bool = False,
is_video_overlay: bool = False,
overlap_head_ms: int = 0,
@@ -521,6 +526,8 @@ class RenderSegmentVideoHandler(BaseHandler):
base_filters: 基础滤镜列表
effects: 特效列表
fps: 帧率
width: 输出宽度
height: 输出高度
has_overlay: 是否有叠加层
is_video_overlay: 叠加层是否为视频格式(如 .mov)
overlap_head_ms: 头部 overlap 时长
@@ -604,9 +611,12 @@ class RenderSegmentVideoHandler(BaseHandler):
# 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示)
overlay_params = 'eof_action=pass' if is_video_overlay else ''
overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
overlay_scale = f"scale={width}:{height}"
overlay_output = '[v_overlay]'
# 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc
range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
filter_parts.append(f"{current_output}[1:v]{overlay_filter}{range_fix}")
filter_parts.append(f"[1:v]{overlay_scale}{overlay_output}")
filter_parts.append(f"{current_output}{overlay_output}{overlay_filter}{range_fix}")
else:
# 移除最后一个标签,直接输出
# 将最后一个滤镜的输出标签替换为空(直接输出)

View File

@@ -5,6 +5,7 @@
提供素材下载缓存功能,避免相同素材重复下载。
"""
import json
import os
import hashlib
import logging
@@ -14,6 +15,8 @@ import uuid
from typing import Optional, Tuple
from urllib.parse import urlparse, unquote
import psutil
from services import storage
logger = logging.getLogger(__name__)
@@ -62,6 +65,7 @@ class MaterialCache:
LOCK_TIMEOUT_SEC = 30.0
LOCK_POLL_INTERVAL_SEC = 0.1
LOCK_STALE_SECONDS = 24 * 60 * 60
def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
"""
@@ -100,6 +104,96 @@ class MaterialCache:
assert self.cache_dir
return os.path.join(self.cache_dir, f"{cache_key}.lock")
def _write_lock_metadata(self, lock_fd: int, lock_path: str) -> bool:
"""写入锁元数据,失败则清理锁文件"""
try:
try:
process_start_time = psutil.Process(os.getpid()).create_time()
except Exception as e:
process_start_time = None
logger.warning(f"Cache lock process start time error: {e}")
metadata = {
'pid': os.getpid(),
'process_start_time': process_start_time,
'created_at': time.time()
}
with os.fdopen(lock_fd, 'w', encoding='utf-8') as lock_file:
json.dump(metadata, lock_file)
return True
except Exception as e:
try:
os.close(lock_fd)
except Exception:
pass
self._remove_lock_file(lock_path, f"write metadata failed: {e}")
return False
def _read_lock_metadata(self, lock_path: str) -> Optional[dict]:
"""读取锁元数据,失败返回 None(兼容历史空锁文件)"""
try:
with open(lock_path, 'r', encoding='utf-8') as lock_file:
data = json.load(lock_file)
return data if isinstance(data, dict) else None
except Exception:
return None
def _is_process_alive(self, pid: int, expected_start_time: Optional[float]) -> bool:
"""判断进程是否存活并校验启动时间(防止 PID 复用)"""
try:
process = psutil.Process(pid)
if expected_start_time is None:
return process.is_running()
actual_start_time = process.create_time()
return abs(actual_start_time - expected_start_time) < 1.0
except psutil.NoSuchProcess:
return False
except Exception as e:
logger.warning(f"Cache lock process check error: {e}")
return True
def _is_lock_stale(self, lock_path: str) -> bool:
"""判断锁是否过期(进程已退出或超过最大存活时长)"""
if not os.path.exists(lock_path):
return False
now = time.time()
metadata = self._read_lock_metadata(lock_path)
if metadata:
created_at = metadata.get('created_at')
if isinstance(created_at, (int, float)) and now - created_at > self.LOCK_STALE_SECONDS:
return True
pid = metadata.get('pid')
pid_value = int(pid) if isinstance(pid, int) or (isinstance(pid, str) and pid.isdigit()) else None
expected_start_time = metadata.get('process_start_time')
expected_start_time_value = (
expected_start_time if isinstance(expected_start_time, (int, float)) else None
)
if pid_value is not None and not self._is_process_alive(pid_value, expected_start_time_value):
return True
return self._is_lock_stale_by_mtime(lock_path, now)
return self._is_lock_stale_by_mtime(lock_path, now)
def _is_lock_stale_by_mtime(self, lock_path: str, now: float) -> bool:
"""基于文件时间判断锁是否过期"""
try:
mtime = os.path.getmtime(lock_path)
return now - mtime > self.LOCK_STALE_SECONDS
except Exception as e:
logger.warning(f"Cache lock stat error: {e}")
return False
def _remove_lock_file(self, lock_path: str, reason: str = "") -> bool:
"""删除锁文件"""
try:
os.remove(lock_path)
if reason:
logger.info(f"Cache lock removed: {lock_path} ({reason})")
return True
except FileNotFoundError:
return True
except Exception as e:
logger.warning(f"Cache lock remove error: {e}")
return False
def _acquire_lock(self, cache_key: str) -> Optional[str]:
"""获取缓存锁(跨进程安全)"""
if not self.enabled:
@@ -111,9 +205,14 @@ class MaterialCache:
while True:
try:
fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.close(fd)
if not self._write_lock_metadata(fd, lock_path):
return None
return lock_path
except FileExistsError:
if self._is_lock_stale(lock_path):
removed = self._remove_lock_file(lock_path, "stale lock")
if removed:
continue
if time.monotonic() >= deadline:
logger.warning(f"Cache lock timeout: {lock_path}")
return None
@@ -126,12 +225,7 @@ class MaterialCache:
"""释放缓存锁"""
if not lock_path:
return
try:
os.remove(lock_path)
except FileNotFoundError:
return
except Exception as e:
logger.warning(f"Cache lock release error: {e}")
self._remove_lock_file(lock_path)
def is_cached(self, url: str) -> Tuple[bool, str]:
"""
@@ -251,6 +345,66 @@ class MaterialCache:
finally:
self._release_lock(lock_path)
def add_to_cache(self, url: str, source_path: str) -> bool:
"""
将本地文件添加到缓存
Args:
url: 对应的 URL(用于生成缓存键)
source_path: 本地文件路径
Returns:
是否成功
"""
if not self.enabled:
return False
if not os.path.exists(source_path):
logger.warning(f"Source file not found for cache: {source_path}")
return False
cache_key = _extract_cache_key(url)
lock_path = self._acquire_lock(cache_key)
if not lock_path:
logger.warning(f"Cache lock unavailable for adding: {url[:80]}...")
return False
try:
cache_path = self.get_cache_path(url)
# 先复制到临时文件
temp_cache_path = os.path.join(
self.cache_dir,
f"{cache_key}.{uuid.uuid4().hex}.adding"
)
shutil.copy2(source_path, temp_cache_path)
# 原子替换
os.replace(temp_cache_path, cache_path)
# 更新访问时间
os.utime(cache_path, None)
logger.info(f"Added to cache: {url[:80]}... <- {source_path}")
# 检查清理
if self.max_size_bytes > 0:
self._cleanup_if_needed()
return True
except Exception as e:
logger.error(f"Failed to add to cache: {e}")
if 'temp_cache_path' in locals() and os.path.exists(temp_cache_path):
try:
os.remove(temp_cache_path)
except Exception:
pass
return False
finally:
self._release_lock(lock_path)
def _cleanup_if_needed(self) -> None:
"""
检查并清理缓存(LRU 策略)
@@ -292,6 +446,17 @@ class MaterialCache:
for file_info in cache_files:
if total_size <= target_size:
break
# 从文件名提取 cache_key,检查是否有锁(说明正在被使用)
filename = os.path.basename(file_info['path'])
cache_key = os.path.splitext(filename)[0]
lock_path = self._get_lock_path(cache_key)
if os.path.exists(lock_path):
if self._is_lock_stale(lock_path):
self._remove_lock_file(lock_path, "cleanup stale lock")
else:
# 该文件正在被其他任务使用,跳过删除
logger.debug(f"Cache cleanup: skipping locked file {filename}")
continue
try:
os.remove(file_info['path'])
total_size -= file_info['size']

View File

@@ -9,6 +9,7 @@ import os
import logging
import subprocess
from typing import Optional
from urllib.parse import unquote
import requests
@@ -85,6 +86,7 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
# 检查是否使用 rclone 上传
if os.getenv("UPLOAD_METHOD") == "rclone":
logger.info(f"Uploading to: {url}")
result = _upload_with_rclone(url, file_path)
if result:
return True
@@ -144,6 +146,7 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
for src, dst in replace_list:
new_url = new_url.replace(src, dst)
new_url = new_url.split("?", 1)[0] # 移除查询参数
new_url = unquote(new_url) # 解码 URL 编码的字符(如 %2F -> /)
if new_url == url:
return False