6 Commits

Author SHA1 Message Date
dd2d40c55b feat(logger): 重构日志系统配置
- 添加RotatingFileHandler支持日志轮转
- 配置多个日志处理器分别输出到控制台、全部日志文件和错误日志文件
- 设置不同日志级别的输出过滤
- 确保日志文件目录存在并正确初始化日志系统
- 移除原有的基础日志配置方式
2026-02-04 18:06:06 +08:00
c57524f174 feat(video): 添加源视频时长检测和帧冻结补足功能
- 探测源视频实际时长并计算变速后的有效时长
- 检测源视频时长不足的情况并记录警告日志
- 计算时长短缺并自动冻结最后一帧进行补足
- 更新 FFmpeg 命令构建逻辑以支持时长补足
- 合并转场 overlap 冻结和时长不足冻结的处理
- 添加必要的参数传递以支持时长检测功能
2026-02-04 17:59:46 +08:00
eeb21cada3 perf(render_video): 优化视频编码的关键帧间隔策略
- 根据视频总帧数动态计算 GOP 大小,避免关键帧过多
- 短视频使用全部帧数作为 GOP,确保只在开头有关键帧
- 正常视频保持 2 秒一个关键帧的策略
- 添加强制第一帧为关键帧的设置
- 优化关键帧最小间隔参数,提升编码效率
2026-02-04 17:46:01 +08:00
a70573395b feat(cache): 增强缓存锁机制支持进程存活检测
- 添加了锁元数据写入和读取功能,记录进程ID和启动时间
- 实现了进程存活检查机制,防止PID复用导致的死锁
- 引入了过期锁检测和自动清理机制
- 集成了psutil库进行系统进程监控
- 优化了缓存清理逻辑,支持跳过活跃锁文件
- 使用JSON格式存储锁元数据信息
2026-01-28 23:41:53 +08:00
ffb9d5390e feat(video): 添加视频渲染的宽高参数支持
- 在 render_video 函数中添加 width 和 height 参数传递
- 为 overlay 功能添加 scale 滤镜支持
- 更新 filter_complex 字符串以包含尺寸缩放逻辑
- 修改 overlay 处理流程以正确应用指定尺寸
- 添加相关参数的文档说明
2026-01-27 17:03:56 +08:00
6126856361 feat(cache): 实现上传文件缓存功能
- 在文件上传成功后将文件加入缓存系统
- 添加 add_to_cache 方法支持本地文件缓存
- 实现原子操作确保缓存写入安全
- 集成锁机制防止并发冲突
- 自动触发缓存清理策略
- 记录详细的缓存操作日志
2026-01-26 10:41:26 +08:00
4 changed files with 359 additions and 44 deletions

View File

@@ -464,6 +464,11 @@ class BaseHandler(TaskHandler, ABC):
if result: if result:
file_size = os.path.getsize(file_path) file_size = os.path.getsize(file_path)
logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)") logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
# 将上传成功的文件加入缓存
if access_url:
self.material_cache.add_to_cache(access_url, file_path)
return access_url return access_url
else: else:
logger.error(f"[task:{task_id}] Upload failed: {file_path}") logger.error(f"[task:{task_id}] Upload failed: {file_path}")

View File

@@ -135,13 +135,36 @@ class RenderSegmentVideoHandler(BaseHandler):
logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it") logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
overlay_file = None overlay_file = None
# 6. 计算 overlap 时长(用于转场帧冻结 # 6. 探测源视频时长(仅对视频素材
# 用于检测时长不足并通过冻结最后一帧补足
source_duration_sec = None
if not is_image:
source_duration = self.probe_duration(input_file)
if source_duration:
source_duration_sec = source_duration
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed > 0:
# 计算变速后的有效时长
effective_duration_sec = source_duration_sec / speed
required_duration_sec = duration_ms / 1000.0
# 如果源视频时长不足,记录日志
if effective_duration_sec < required_duration_sec:
shortage_sec = required_duration_sec - effective_duration_sec
logger.warning(
f"[task:{task.task_id}] Source video duration insufficient: "
f"effective={effective_duration_sec:.2f}s (speed={speed}), "
f"required={required_duration_sec:.2f}s, "
f"will freeze last frame for {shortage_sec:.2f}s"
)
# 7. 计算 overlap 时长(用于转场帧冻结)
# 头部 overlap: 来自前一片段的出场转场 # 头部 overlap: 来自前一片段的出场转场
overlap_head_ms = task.get_overlap_head_ms() overlap_head_ms = task.get_overlap_head_ms()
# 尾部 overlap: 当前片段的出场转场 # 尾部 overlap: 当前片段的出场转场
overlap_tail_ms = task.get_overlap_tail_ms_v2() overlap_tail_ms = task.get_overlap_tail_ms_v2()
# 7. 构建 FFmpeg 命令 # 8. 构建 FFmpeg 命令
output_file = os.path.join(work_dir, 'output.mp4') output_file = os.path.join(work_dir, 'output.mp4')
cmd = self._build_command( cmd = self._build_command(
input_file=input_file, input_file=input_file,
@@ -152,28 +175,29 @@ class RenderSegmentVideoHandler(BaseHandler):
lut_file=lut_file, lut_file=lut_file,
overlay_file=overlay_file, overlay_file=overlay_file,
overlap_head_ms=overlap_head_ms, overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms overlap_tail_ms=overlap_tail_ms,
source_duration_sec=source_duration_sec
) )
# 8. 执行 FFmpeg # 9. 执行 FFmpeg
if not self.run_ffmpeg(cmd, task.task_id): if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail( return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED, ErrorCode.E_FFMPEG_FAILED,
"FFmpeg rendering failed" "FFmpeg rendering failed"
) )
# 9. 验证输出文件 # 10. 验证输出文件
if not self.ensure_file_exists(output_file, min_size=4096): if not self.ensure_file_exists(output_file, min_size=4096):
return TaskResult.fail( return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED, ErrorCode.E_FFMPEG_FAILED,
"Output file is missing or too small" "Output file is missing or too small"
) )
# 10. 获取实际时长 # 11. 获取实际时长
actual_duration = self.probe_duration(output_file) actual_duration = self.probe_duration(output_file)
actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms
# 11. 上传产物 # 12. 上传产物
video_url = self.upload_file(task.task_id, 'video', output_file) video_url = self.upload_file(task.task_id, 'video', output_file)
if not video_url: if not video_url:
return TaskResult.fail( return TaskResult.fail(
@@ -181,7 +205,7 @@ class RenderSegmentVideoHandler(BaseHandler):
"Failed to upload video" "Failed to upload video"
) )
# 12. 构建结果(包含 overlap 信息) # 13. 构建结果(包含 overlap 信息)
result_data = { result_data = {
'videoUrl': video_url, 'videoUrl': video_url,
'actualDurationMs': actual_duration_ms, 'actualDurationMs': actual_duration_ms,
@@ -275,12 +299,24 @@ class RenderSegmentVideoHandler(BaseHandler):
cmd.extend(['-vf', ','.join(filters)]) cmd.extend(['-vf', ','.join(filters)])
# 计算总帧数,动态调整 GOP
total_frames = int(actual_duration_sec * fps)
if total_frames <= 1:
gop_size = 1
elif total_frames < fps:
gop_size = total_frames
else:
gop_size = fps * 2 # 正常情况,2 秒一个关键帧
# 编码参数 # 编码参数
cmd.extend([ cmd.extend([
'-c:v', 'libx264', '-c:v', 'libx264',
'-preset', 'fast', '-preset', 'fast',
'-crf', '18', '-crf', '18',
'-r', str(fps), '-r', str(fps),
'-g', str(gop_size),
'-keyint_min', str(min(gop_size, fps // 2 or 1)),
'-force_key_frames', 'expr:eq(n,0)',
'-an', # 无音频 '-an', # 无音频
output_file output_file
]) ])
@@ -298,7 +334,8 @@ class RenderSegmentVideoHandler(BaseHandler):
lut_file: Optional[str] = None, lut_file: Optional[str] = None,
overlay_file: Optional[str] = None, overlay_file: Optional[str] = None,
overlap_head_ms: int = 0, overlap_head_ms: int = 0,
overlap_tail_ms: int = 0 overlap_tail_ms: int = 0,
source_duration_sec: Optional[float] = None
) -> List[str]: ) -> List[str]:
""" """
构建 FFmpeg 渲染命令 构建 FFmpeg 渲染命令
@@ -313,6 +350,7 @@ class RenderSegmentVideoHandler(BaseHandler):
overlay_file: 叠加层文件路径(可选) overlay_file: 叠加层文件路径(可选)
overlap_head_ms: 头部 overlap 时长(毫秒) overlap_head_ms: 头部 overlap 时长(毫秒)
overlap_tail_ms: 尾部 overlap 时长(毫秒) overlap_tail_ms: 尾部 overlap 时长(毫秒)
source_duration_sec: 源视频实际时长(秒),用于检测时长不足
Returns: Returns:
FFmpeg 命令参数列表 FFmpeg 命令参数列表
@@ -335,10 +373,12 @@ class RenderSegmentVideoHandler(BaseHandler):
filters = self._build_video_filters( filters = self._build_video_filters(
render_spec=render_spec, render_spec=render_spec,
output_spec=output_spec, output_spec=output_spec,
duration_ms=duration_ms,
lut_file=lut_file, lut_file=lut_file,
overlay_file=overlay_file, overlay_file=overlay_file,
overlap_head_ms=overlap_head_ms, overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms overlap_tail_ms=overlap_tail_ms,
source_duration_sec=source_duration_sec
) )
# 应用滤镜 # 应用滤镜
@@ -357,16 +397,28 @@ class RenderSegmentVideoHandler(BaseHandler):
fps = output_spec.fps fps = output_spec.fps
cmd.extend(['-r', str(fps)]) cmd.extend(['-r', str(fps)])
# GOP 大小(关键帧间隔)
gop_size = fps * 2 # 2秒一个关键帧
cmd.extend(['-g', str(gop_size)])
cmd.extend(['-keyint_min', str(gop_size)])
# 时长(包含 overlap 区域) # 时长(包含 overlap 区域)
total_duration_ms = duration_ms + overlap_head_ms + overlap_tail_ms total_duration_ms = duration_ms + overlap_head_ms + overlap_tail_ms
duration_sec = total_duration_ms / 1000.0 duration_sec = total_duration_ms / 1000.0
cmd.extend(['-t', str(duration_sec)]) cmd.extend(['-t', str(duration_sec)])
# 动态调整 GOP 大小:对于短视频,GOP 不能大于总帧数
total_frames = int(duration_sec * fps)
if total_frames <= 1:
gop_size = 1
elif total_frames < fps:
# 短于 1 秒的视频,使用全部帧数作为 GOP(整个视频只有开头一个关键帧)
gop_size = total_frames
else:
# 正常情况,2 秒一个关键帧
gop_size = fps * 2
cmd.extend(['-g', str(gop_size)])
cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))])
# 强制第一帧为关键帧
cmd.extend(['-force_key_frames', 'expr:eq(n,0)'])
# 无音频(视频片段不包含音频) # 无音频(视频片段不包含音频)
cmd.append('-an') cmd.append('-an')
@@ -379,10 +431,12 @@ class RenderSegmentVideoHandler(BaseHandler):
self, self,
render_spec: RenderSpec, render_spec: RenderSpec,
output_spec: OutputSpec, output_spec: OutputSpec,
duration_ms: int,
lut_file: Optional[str] = None, lut_file: Optional[str] = None,
overlay_file: Optional[str] = None, overlay_file: Optional[str] = None,
overlap_head_ms: int = 0, overlap_head_ms: int = 0,
overlap_tail_ms: int = 0 overlap_tail_ms: int = 0,
source_duration_sec: Optional[float] = None
) -> str: ) -> str:
""" """
构建视频滤镜链 构建视频滤镜链
@@ -390,10 +444,12 @@ class RenderSegmentVideoHandler(BaseHandler):
Args: Args:
render_spec: 渲染规格 render_spec: 渲染规格
output_spec: 输出规格 output_spec: 输出规格
duration_ms: 目标时长(毫秒)
lut_file: LUT 文件路径 lut_file: LUT 文件路径
overlay_file: 叠加层文件路径(支持图片 png/jpg 和视频 mov) overlay_file: 叠加层文件路径(支持图片 png/jpg 和视频 mov)
overlap_head_ms: 头部 overlap 时长(毫秒) overlap_head_ms: 头部 overlap 时长(毫秒)
overlap_tail_ms: 尾部 overlap 时长(毫秒) overlap_tail_ms: 尾部 overlap 时长(毫秒)
source_duration_sec: 源视频实际时长(秒),用于检测时长不足
Returns: Returns:
滤镜字符串 滤镜字符串
@@ -466,24 +522,45 @@ class RenderSegmentVideoHandler(BaseHandler):
base_filters=filters, base_filters=filters,
effects=effects, effects=effects,
fps=fps, fps=fps,
width=width,
height=height,
has_overlay=has_overlay, has_overlay=has_overlay,
is_video_overlay=is_video_overlay, is_video_overlay=is_video_overlay,
overlap_head_ms=overlap_head_ms, overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms, overlap_tail_ms=overlap_tail_ms,
use_hwdownload=bool(hwaccel_prefix) use_hwdownload=bool(hwaccel_prefix),
duration_ms=duration_ms,
render_spec=render_spec,
source_duration_sec=source_duration_sec
) )
# 6. 帧冻结(tpad)- 用于转场 overlap 区域 # 6. 帧冻结(tpad)- 用于转场 overlap 区域和时长不足补足
# 注意:tpad 必须在缩放之后应用 # 注意:tpad 必须在缩放之后应用
tpad_parts = [] tpad_parts = []
# 计算是否需要额外的尾部冻结(源视频时长不足)
extra_tail_freeze_sec = 0.0
if source_duration_sec is not None:
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed > 0:
# 计算变速后的有效时长
effective_duration_sec = source_duration_sec / speed
required_duration_sec = duration_ms / 1000.0
# 如果源视频时长不足,需要冻结最后一帧来补足
if effective_duration_sec < required_duration_sec:
extra_tail_freeze_sec = required_duration_sec - effective_duration_sec
if overlap_head_ms > 0: if overlap_head_ms > 0:
# 头部冻结:将第一帧冻结指定时长 # 头部冻结:将第一帧冻结指定时长
head_duration_sec = overlap_head_ms / 1000.0 head_duration_sec = overlap_head_ms / 1000.0
tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}") tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}")
if overlap_tail_ms > 0:
# 尾部冻结:将最后一帧冻结指定时长 # 尾部冻结:合并 overlap 和时长不足的冻结
tail_duration_sec = overlap_tail_ms / 1000.0 total_tail_freeze_sec = (overlap_tail_ms / 1000.0) + extra_tail_freeze_sec
tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}") if total_tail_freeze_sec > 0:
# 将最后一帧冻结指定时长
tpad_parts.append(f"stop_mode=clone:stop_duration={total_tail_freeze_sec}")
if tpad_parts: if tpad_parts:
filters.append(f"tpad={':'.join(tpad_parts)}") filters.append(f"tpad={':'.join(tpad_parts)}")
@@ -492,12 +569,13 @@ class RenderSegmentVideoHandler(BaseHandler):
if has_overlay: if has_overlay:
# 使用 filter_complex 格式 # 使用 filter_complex 格式
base_filters = ','.join(filters) if filters else 'copy' base_filters = ','.join(filters) if filters else 'copy'
overlay_scale = f"scale={width}:{height}"
# 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示) # 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示)
overlay_params = 'eof_action=pass' if is_video_overlay else '' overlay_params = 'eof_action=pass' if is_video_overlay else ''
overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0' overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
# 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc # 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc
range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else '' range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
return f"[0:v]{base_filters}[base];[base][1:v]{overlay_filter}{range_fix}" return f"[0:v]{base_filters}[base];[1:v]{overlay_scale}[overlay];[base][overlay]{overlay_filter}{range_fix}"
else: else:
return ','.join(filters) if filters else '' return ','.join(filters) if filters else ''
@@ -506,11 +584,16 @@ class RenderSegmentVideoHandler(BaseHandler):
base_filters: List[str], base_filters: List[str],
effects: List[Effect], effects: List[Effect],
fps: int, fps: int,
width: int,
height: int,
has_overlay: bool = False, has_overlay: bool = False,
is_video_overlay: bool = False, is_video_overlay: bool = False,
overlap_head_ms: int = 0, overlap_head_ms: int = 0,
overlap_tail_ms: int = 0, overlap_tail_ms: int = 0,
use_hwdownload: bool = False use_hwdownload: bool = False,
duration_ms: int = 0,
render_spec: Optional[RenderSpec] = None,
source_duration_sec: Optional[float] = None
) -> str: ) -> str:
""" """
构建包含特效的 filter_complex 滤镜图 构建包含特效的 filter_complex 滤镜图
@@ -521,11 +604,16 @@ class RenderSegmentVideoHandler(BaseHandler):
base_filters: 基础滤镜列表 base_filters: 基础滤镜列表
effects: 特效列表 effects: 特效列表
fps: 帧率 fps: 帧率
width: 输出宽度
height: 输出高度
has_overlay: 是否有叠加层 has_overlay: 是否有叠加层
is_video_overlay: 叠加层是否为视频格式(如 .mov) is_video_overlay: 叠加层是否为视频格式(如 .mov)
overlap_head_ms: 头部 overlap 时长 overlap_head_ms: 头部 overlap 时长
overlap_tail_ms: 尾部 overlap 时长 overlap_tail_ms: 尾部 overlap 时长
use_hwdownload: 是否使用了硬件加速解码(已在 base_filters 中包含 hwdownload) use_hwdownload: 是否使用了硬件加速解码(已在 base_filters 中包含 hwdownload)
duration_ms: 目标时长(毫秒)
render_spec: 渲染规格(用于获取变速参数)
source_duration_sec: 源视频实际时长(秒),用于检测时长不足
Returns: Returns:
filter_complex 格式的滤镜字符串 filter_complex 格式的滤镜字符串
@@ -584,14 +672,30 @@ class RenderSegmentVideoHandler(BaseHandler):
current_output = effect_output current_output = effect_output
effect_idx += 1 effect_idx += 1
# 帧冻结(tpad)- 用于转场 overlap 区域 # 帧冻结(tpad)- 用于转场 overlap 区域和时长不足补足
tpad_parts = [] tpad_parts = []
# 计算是否需要额外的尾部冻结(源视频时长不足)
extra_tail_freeze_sec = 0.0
if source_duration_sec is not None and render_spec is not None and duration_ms > 0:
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed > 0:
# 计算变速后的有效时长
effective_duration_sec = source_duration_sec / speed
required_duration_sec = duration_ms / 1000.0
# 如果源视频时长不足,需要冻结最后一帧来补足
if effective_duration_sec < required_duration_sec:
extra_tail_freeze_sec = required_duration_sec - effective_duration_sec
if overlap_head_ms > 0: if overlap_head_ms > 0:
head_duration_sec = overlap_head_ms / 1000.0 head_duration_sec = overlap_head_ms / 1000.0
tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}") tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}")
if overlap_tail_ms > 0:
tail_duration_sec = overlap_tail_ms / 1000.0 # 尾部冻结:合并 overlap 和时长不足的冻结
tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}") total_tail_freeze_sec = (overlap_tail_ms / 1000.0) + extra_tail_freeze_sec
if total_tail_freeze_sec > 0:
tpad_parts.append(f"stop_mode=clone:stop_duration={total_tail_freeze_sec}")
if tpad_parts: if tpad_parts:
tpad_output = '[v_tpad]' tpad_output = '[v_tpad]'
@@ -604,9 +708,12 @@ class RenderSegmentVideoHandler(BaseHandler):
# 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示) # 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示)
overlay_params = 'eof_action=pass' if is_video_overlay else '' overlay_params = 'eof_action=pass' if is_video_overlay else ''
overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0' overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
overlay_scale = f"scale={width}:{height}"
overlay_output = '[v_overlay]'
# 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc # 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc
range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else '' range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
filter_parts.append(f"{current_output}[1:v]{overlay_filter}{range_fix}") filter_parts.append(f"[1:v]{overlay_scale}{overlay_output}")
filter_parts.append(f"{current_output}{overlay_output}{overlay_filter}{range_fix}")
else: else:
# 移除最后一个标签,直接输出 # 移除最后一个标签,直接输出
# 将最后一个滤镜的输出标签替换为空(直接输出) # 将最后一个滤镜的输出标签替换为空(直接输出)

View File

@@ -25,6 +25,8 @@ import sys
import time import time
import signal import signal
import logging import logging
import os
from logging.handlers import RotatingFileHandler
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -34,11 +36,55 @@ from services.task_executor import TaskExecutor
from constant import SOFTWARE_VERSION from constant import SOFTWARE_VERSION
# 日志配置 # 日志配置
logging.basicConfig( def setup_logging():
level=logging.INFO, """配置日志系统,输出到控制台和文件"""
format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s', # 日志格式
datefmt='%Y-%m-%d %H:%M:%S' log_format = '[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s'
) date_format = '%Y-%m-%d %H:%M:%S'
formatter = logging.Formatter(log_format, date_format)
# 获取根logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# 清除已有的handlers(避免重复)
root_logger.handlers.clear()
# 1. 控制台handler(只输出WARNING及以上级别)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.WARNING)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# 确保日志文件所在目录存在
log_dir = os.path.dirname(os.path.abspath(__file__))
# 2. 所有日志文件handler(all_log.log)
all_log_path = os.path.join(log_dir, 'all_log.log')
all_log_handler = RotatingFileHandler(
all_log_path,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
all_log_handler.setLevel(logging.DEBUG) # 记录所有级别
all_log_handler.setFormatter(formatter)
root_logger.addHandler(all_log_handler)
# 3. 错误日志文件handler(error.log)
error_log_path = os.path.join(log_dir, 'error.log')
error_log_handler = RotatingFileHandler(
error_log_path,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
error_log_handler.setLevel(logging.ERROR) # 只记录ERROR及以上
error_log_handler.setFormatter(formatter)
root_logger.addHandler(error_log_handler)
# 初始化日志系统
setup_logging()
logger = logging.getLogger('worker') logger = logging.getLogger('worker')

View File

@@ -5,6 +5,7 @@
提供素材下载缓存功能,避免相同素材重复下载。 提供素材下载缓存功能,避免相同素材重复下载。
""" """
import json
import os import os
import hashlib import hashlib
import logging import logging
@@ -14,6 +15,8 @@ import uuid
from typing import Optional, Tuple from typing import Optional, Tuple
from urllib.parse import urlparse, unquote from urllib.parse import urlparse, unquote
import psutil
from services import storage from services import storage
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -62,6 +65,7 @@ class MaterialCache:
LOCK_TIMEOUT_SEC = 30.0 LOCK_TIMEOUT_SEC = 30.0
LOCK_POLL_INTERVAL_SEC = 0.1 LOCK_POLL_INTERVAL_SEC = 0.1
LOCK_STALE_SECONDS = 24 * 60 * 60
def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0): def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
""" """
@@ -100,6 +104,96 @@ class MaterialCache:
assert self.cache_dir assert self.cache_dir
return os.path.join(self.cache_dir, f"{cache_key}.lock") return os.path.join(self.cache_dir, f"{cache_key}.lock")
def _write_lock_metadata(self, lock_fd: int, lock_path: str) -> bool:
"""写入锁元数据,失败则清理锁文件"""
try:
try:
process_start_time = psutil.Process(os.getpid()).create_time()
except Exception as e:
process_start_time = None
logger.warning(f"Cache lock process start time error: {e}")
metadata = {
'pid': os.getpid(),
'process_start_time': process_start_time,
'created_at': time.time()
}
with os.fdopen(lock_fd, 'w', encoding='utf-8') as lock_file:
json.dump(metadata, lock_file)
return True
except Exception as e:
try:
os.close(lock_fd)
except Exception:
pass
self._remove_lock_file(lock_path, f"write metadata failed: {e}")
return False
def _read_lock_metadata(self, lock_path: str) -> Optional[dict]:
"""读取锁元数据,失败返回 None(兼容历史空锁文件)"""
try:
with open(lock_path, 'r', encoding='utf-8') as lock_file:
data = json.load(lock_file)
return data if isinstance(data, dict) else None
except Exception:
return None
def _is_process_alive(self, pid: int, expected_start_time: Optional[float]) -> bool:
"""判断进程是否存活并校验启动时间(防止 PID 复用)"""
try:
process = psutil.Process(pid)
if expected_start_time is None:
return process.is_running()
actual_start_time = process.create_time()
return abs(actual_start_time - expected_start_time) < 1.0
except psutil.NoSuchProcess:
return False
except Exception as e:
logger.warning(f"Cache lock process check error: {e}")
return True
def _is_lock_stale(self, lock_path: str) -> bool:
"""判断锁是否过期(进程已退出或超过最大存活时长)"""
if not os.path.exists(lock_path):
return False
now = time.time()
metadata = self._read_lock_metadata(lock_path)
if metadata:
created_at = metadata.get('created_at')
if isinstance(created_at, (int, float)) and now - created_at > self.LOCK_STALE_SECONDS:
return True
pid = metadata.get('pid')
pid_value = int(pid) if isinstance(pid, int) or (isinstance(pid, str) and pid.isdigit()) else None
expected_start_time = metadata.get('process_start_time')
expected_start_time_value = (
expected_start_time if isinstance(expected_start_time, (int, float)) else None
)
if pid_value is not None and not self._is_process_alive(pid_value, expected_start_time_value):
return True
return self._is_lock_stale_by_mtime(lock_path, now)
return self._is_lock_stale_by_mtime(lock_path, now)
def _is_lock_stale_by_mtime(self, lock_path: str, now: float) -> bool:
"""基于文件时间判断锁是否过期"""
try:
mtime = os.path.getmtime(lock_path)
return now - mtime > self.LOCK_STALE_SECONDS
except Exception as e:
logger.warning(f"Cache lock stat error: {e}")
return False
def _remove_lock_file(self, lock_path: str, reason: str = "") -> bool:
"""删除锁文件"""
try:
os.remove(lock_path)
if reason:
logger.info(f"Cache lock removed: {lock_path} ({reason})")
return True
except FileNotFoundError:
return True
except Exception as e:
logger.warning(f"Cache lock remove error: {e}")
return False
def _acquire_lock(self, cache_key: str) -> Optional[str]: def _acquire_lock(self, cache_key: str) -> Optional[str]:
"""获取缓存锁(跨进程安全)""" """获取缓存锁(跨进程安全)"""
if not self.enabled: if not self.enabled:
@@ -111,9 +205,14 @@ class MaterialCache:
while True: while True:
try: try:
fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY) fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.close(fd) if not self._write_lock_metadata(fd, lock_path):
return None
return lock_path return lock_path
except FileExistsError: except FileExistsError:
if self._is_lock_stale(lock_path):
removed = self._remove_lock_file(lock_path, "stale lock")
if removed:
continue
if time.monotonic() >= deadline: if time.monotonic() >= deadline:
logger.warning(f"Cache lock timeout: {lock_path}") logger.warning(f"Cache lock timeout: {lock_path}")
return None return None
@@ -126,12 +225,7 @@ class MaterialCache:
"""释放缓存锁""" """释放缓存锁"""
if not lock_path: if not lock_path:
return return
try: self._remove_lock_file(lock_path)
os.remove(lock_path)
except FileNotFoundError:
return
except Exception as e:
logger.warning(f"Cache lock release error: {e}")
def is_cached(self, url: str) -> Tuple[bool, str]: def is_cached(self, url: str) -> Tuple[bool, str]:
""" """
@@ -251,6 +345,66 @@ class MaterialCache:
finally: finally:
self._release_lock(lock_path) self._release_lock(lock_path)
def add_to_cache(self, url: str, source_path: str) -> bool:
"""
将本地文件添加到缓存
Args:
url: 对应的 URL(用于生成缓存键)
source_path: 本地文件路径
Returns:
是否成功
"""
if not self.enabled:
return False
if not os.path.exists(source_path):
logger.warning(f"Source file not found for cache: {source_path}")
return False
cache_key = _extract_cache_key(url)
lock_path = self._acquire_lock(cache_key)
if not lock_path:
logger.warning(f"Cache lock unavailable for adding: {url[:80]}...")
return False
try:
cache_path = self.get_cache_path(url)
# 先复制到临时文件
temp_cache_path = os.path.join(
self.cache_dir,
f"{cache_key}.{uuid.uuid4().hex}.adding"
)
shutil.copy2(source_path, temp_cache_path)
# 原子替换
os.replace(temp_cache_path, cache_path)
# 更新访问时间
os.utime(cache_path, None)
logger.info(f"Added to cache: {url[:80]}... <- {source_path}")
# 检查清理
if self.max_size_bytes > 0:
self._cleanup_if_needed()
return True
except Exception as e:
logger.error(f"Failed to add to cache: {e}")
if 'temp_cache_path' in locals() and os.path.exists(temp_cache_path):
try:
os.remove(temp_cache_path)
except Exception:
pass
return False
finally:
self._release_lock(lock_path)
def _cleanup_if_needed(self) -> None: def _cleanup_if_needed(self) -> None:
""" """
检查并清理缓存(LRU 策略) 检查并清理缓存(LRU 策略)
@@ -297,9 +451,12 @@ class MaterialCache:
cache_key = os.path.splitext(filename)[0] cache_key = os.path.splitext(filename)[0]
lock_path = self._get_lock_path(cache_key) lock_path = self._get_lock_path(cache_key)
if os.path.exists(lock_path): if os.path.exists(lock_path):
# 该文件正在被其他任务使用,跳过删除 if self._is_lock_stale(lock_path):
logger.debug(f"Cache cleanup: skipping locked file {filename}") self._remove_lock_file(lock_path, "cleanup stale lock")
continue else:
# 该文件正在被其他任务使用,跳过删除
logger.debug(f"Cache cleanup: skipping locked file {filename}")
continue
try: try:
os.remove(file_info['path']) os.remove(file_info['path'])
total_size -= file_info['size'] total_size -= file_info['size']