18 Commits

Author SHA1 Message Date
dd2d40c55b feat(logger): 重构日志系统配置
- 添加RotatingFileHandler支持日志轮转
- 配置多个日志处理器分别输出到控制台、全部日志文件和错误日志文件
- 设置不同日志级别的输出过滤
- 确保日志文件目录存在并正确初始化日志系统
- 移除原有的基础日志配置方式
2026-02-04 18:06:06 +08:00
c57524f174 feat(video): 添加源视频时长检测和帧冻结补足功能
- 探测源视频实际时长并计算变速后的有效时长
- 检测源视频时长不足的情况并记录警告日志
- 计算时长短缺并自动冻结最后一帧进行补足
- 更新 FFmpeg 命令构建逻辑以支持时长补足
- 合并转场 overlap 冻结和时长不足冻结的处理
- 添加必要的参数传递以支持时长检测功能
2026-02-04 17:59:46 +08:00
eeb21cada3 perf(render_video): 优化视频编码的关键帧间隔策略
- 根据视频总帧数动态计算 GOP 大小,避免关键帧过多
- 短视频使用全部帧数作为 GOP,确保只在开头有关键帧
- 正常视频保持 2 秒一个关键帧的策略
- 添加强制第一帧为关键帧的设置
- 优化关键帧最小间隔参数,提升编码效率
2026-02-04 17:46:01 +08:00
a70573395b feat(cache): 增强缓存锁机制支持进程存活检测
- 添加了锁元数据写入和读取功能,记录进程ID和启动时间
- 实现了进程存活检查机制,防止PID复用导致的死锁
- 引入了过期锁检测和自动清理机制
- 集成了psutil库进行系统进程监控
- 优化了缓存清理逻辑,支持跳过活跃锁文件
- 使用JSON格式存储锁元数据信息
2026-01-28 23:41:53 +08:00
ffb9d5390e feat(video): 添加视频渲染的宽高参数支持
- 在 render_video 函数中添加 width 和 height 参数传递
- 为 overlay 功能添加 scale 滤镜支持
- 更新 filter_complex 字符串以包含尺寸缩放逻辑
- 修改 overlay 处理流程以正确应用指定尺寸
- 添加相关参数的文档说明
2026-01-27 17:03:56 +08:00
6126856361 feat(cache): 实现上传文件缓存功能
- 在文件上传成功后将文件加入缓存系统
- 添加 add_to_cache 方法支持本地文件缓存
- 实现原子操作确保缓存写入安全
- 集成锁机制防止并发冲突
- 自动触发缓存清理策略
- 记录详细的缓存操作日志
2026-01-26 10:41:26 +08:00
a6263398ed fix(storage): 解决URL编码字符处理问题
- 添加了URL解码功能以处理编码字符(如%2F转换为/)
- 修复了URL匹配逻辑中的编码问题
- 确保替换操作正确处理已编码的路径字符
2026-01-24 23:33:50 +08:00
885b69233a feat(storage): 添加上传日志记录功能
- 导入 urllib.parse.unquote 模块用于 URL 解码
- 在使用 rclone 上传时添加上传目标 URL 的日志记录
- 便于调试和监控文件上传过程
2026-01-24 23:29:37 +08:00
9158854411 fix(video): 修复MP4合并时的路径处理问题
- 修改concat.txt中TS文件路径为相对路径,只写文件名
- 移除不必要的反斜杠替换逻辑
- 确保FFmpeg concat协议能正确识别文件路径
2026-01-24 22:59:35 +08:00
634dc6c855 fix(cache): 解决缓存清理时删除正在使用的文件问题
- 添加文件锁定检查机制避免删除正在使用的缓存文件
- 实现基于文件名提取cache_key的锁定状态检测
- 在删除前验证锁文件是否存在以确保安全清理
- 添加调试日志记录跳过的锁定文件信息
2026-01-24 22:57:57 +08:00
ca9093504f fix(video): 修复视频定格效果实现逻辑
- 修改cameraShot实现注释,明确标注定格效果功能
- 使用tpad代替freezeframes实现更准确的定格效果
- 更新滤镜链参数配置,确保定格时长正确应用
- 优化变量命名,提高代码可读性
- 调整concat拼接输入源,确保视频流正确连接
2026-01-21 16:48:05 +08:00
ceba9a17a4 fix(video): 解决视频overlay结束后颜色范围变化问题
- 视频overlay需要在末尾统一颜色范围,避免overlay结束后range从tv变为pc
- 添加format=yuv420p和setrange=tv参数来保持一致的颜色范围
- 确保视频overlay结束后的显示效果保持稳定
2026-01-21 16:24:38 +08:00
7acae2f708 fix(video): 修复硬件加速视频处理的颜色空间转换问题
- 修正CUDA/QSV硬件下载仅支持nv12格式输出的问题
- 实现两步转换流程:先下载到nv12格式再转为yuv420p
- 确保与RGBA/YUVA混合时颜色空间转换正确
- 更新文档说明硬件加速滤镜链的格式
2026-01-21 16:14:40 +08:00
ed8dca543e fix(video): 修复硬件加速滤镜中的颜色空间转换问题
- 将硬件下载后的格式从 nv12 改为 yuv420p
- 确保与 RGBA/YUVA 格式的 overlay 混合时颜色空间转换正确
- 解决复杂滤镜(如 lut3d, overlay, crop 等)在硬件表面的颜色显示问题
2026-01-21 16:12:53 +08:00
0a7a0dac89 feat(video): 支持视频格式叠加层渲染
- 添加对 .mov 视频格式叠加层的支持
- 实现视频叠加层结束后自动消失的功能
- 修改参数传递方式从 has_overlay 改为 overlay_file
- 添加 is_video_overlay 参数区分图片和视频叠加层
- 优化 overlay 滤镜参数根据文件类型动态设置
- 更新函数签名和文档注释以支持新的叠加层功能
2026-01-21 15:24:58 +08:00
797507d24b feat(storage): 添加文件上传的 Content-Type 检测功能
- 添加文件扩展名到 Content-Type 的映射表
- 实现根据文件扩展名获取对应 Content-Type 的函数
- 将上传日志中的调试信息改为信息级别并显示 Content-Type
- 使用正确的 Content-Type 替换默认的 application/octet-stream
- 支持 mp
2026-01-21 15:01:22 +08:00
f7ca07b9db debug(storage): 添加上传URL调试日志
- 在上传过程中添加HTTP URL的调试日志输出
2026-01-21 14:56:55 +08:00
4d5e57f61b feat(task): 优化 GPU 调度以支持特定任务类型
- 添加 GPU_REQUIRED_TASK_TYPES 集合定义需要 GPU 加速的任务类型
- 修改任务执行逻辑仅对需要 GPU 的任务类型获取 GPU 设备
- 更新 GPU 设备释放逻辑确保仅在实际分配设备时进行释放
- 改进日志记录和资源管理流程
2026-01-21 14:54:58 +08:00
7 changed files with 449 additions and 68 deletions

View File

@@ -114,6 +114,10 @@ def get_hwaccel_filter_prefix(hw_accel: str = HW_ACCEL_NONE) -> str:
注意:由于大多数复杂滤镜(如 lut3d, overlay, crop 等)不支持硬件表面,
我们需要在滤镜链开始时将硬件表面下载到系统内存。
CUDA/QSV hwdownload 只支持 nv12 格式输出,因此需要两步转换:
1. hwdownload,format=nv12 - 从 GPU 下载到 CPU
2. format=yuv420p - 转换为标准格式(确保与 RGBA/YUVA overlay 混合时颜色正确)
Args:
hw_accel: 硬件加速类型
@@ -121,9 +125,9 @@ def get_hwaccel_filter_prefix(hw_accel: str = HW_ACCEL_NONE) -> str:
需要添加到滤镜链开头的 hwdownload 滤镜字符串
"""
if hw_accel == HW_ACCEL_CUDA:
return 'hwdownload,format=nv12,'
return 'hwdownload,format=nv12,format=yuv420p,'
elif hw_accel == HW_ACCEL_QSV:
return 'hwdownload,format=nv12,'
return 'hwdownload,format=nv12,format=yuv420p,'
else:
return ''
@@ -460,6 +464,11 @@ class BaseHandler(TaskHandler, ABC):
if result:
file_size = os.path.getsize(file_path)
logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
# 将上传成功的文件加入缓存
if access_url:
self.material_cache.add_to_cache(access_url, file_path)
return access_url
else:
logger.error(f"[task:{task_id}] Upload failed: {file_path}")

View File

@@ -127,9 +127,9 @@ class FinalizeMp4Handler(BaseHandler):
concat_file = os.path.join(work_dir, 'concat.txt')
with open(concat_file, 'w', encoding='utf-8') as f:
for ts_file in ts_files:
# 路径中的反斜杠需要转义或使用正斜杠
ts_path = ts_file.replace('\\', '/')
f.write(f"file '{ts_path}'\n")
# FFmpeg concat 路径相对于 concat.txt 所在目录,只需写文件名
ts_filename = os.path.basename(ts_file)
f.write(f"file '{ts_filename}'\n")
# 3. 构建合并命令(remux,不重编码)
cmd = [

View File

@@ -123,21 +123,48 @@ class RenderSegmentVideoHandler(BaseHandler):
overlay_file = None
if render_spec.overlay_url:
# 根据 URL 后缀确定文件扩展名
ext = '.png'
if render_spec.overlay_url.lower().endswith('.jpg') or render_spec.overlay_url.lower().endswith('.jpeg'):
url_lower = render_spec.overlay_url.lower()
if url_lower.endswith('.jpg') or url_lower.endswith('.jpeg'):
ext = '.jpg'
elif url_lower.endswith('.mov'):
ext = '.mov'
else:
ext = '.png' # 默认
overlay_file = os.path.join(work_dir, f'overlay{ext}')
if not self.download_file(render_spec.overlay_url, overlay_file):
logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
overlay_file = None
# 6. 计算 overlap 时长(用于转场帧冻结
# 6. 探测源视频时长(仅对视频素材
# 用于检测时长不足并通过冻结最后一帧补足
source_duration_sec = None
if not is_image:
source_duration = self.probe_duration(input_file)
if source_duration:
source_duration_sec = source_duration
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed > 0:
# 计算变速后的有效时长
effective_duration_sec = source_duration_sec / speed
required_duration_sec = duration_ms / 1000.0
# 如果源视频时长不足,记录日志
if effective_duration_sec < required_duration_sec:
shortage_sec = required_duration_sec - effective_duration_sec
logger.warning(
f"[task:{task.task_id}] Source video duration insufficient: "
f"effective={effective_duration_sec:.2f}s (speed={speed}), "
f"required={required_duration_sec:.2f}s, "
f"will freeze last frame for {shortage_sec:.2f}s"
)
# 7. 计算 overlap 时长(用于转场帧冻结)
# 头部 overlap: 来自前一片段的出场转场
overlap_head_ms = task.get_overlap_head_ms()
# 尾部 overlap: 当前片段的出场转场
overlap_tail_ms = task.get_overlap_tail_ms_v2()
# 7. 构建 FFmpeg 命令
# 8. 构建 FFmpeg 命令
output_file = os.path.join(work_dir, 'output.mp4')
cmd = self._build_command(
input_file=input_file,
@@ -148,28 +175,29 @@ class RenderSegmentVideoHandler(BaseHandler):
lut_file=lut_file,
overlay_file=overlay_file,
overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms
overlap_tail_ms=overlap_tail_ms,
source_duration_sec=source_duration_sec
)
# 8. 执行 FFmpeg
# 9. 执行 FFmpeg
if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"FFmpeg rendering failed"
)
# 9. 验证输出文件
# 10. 验证输出文件
if not self.ensure_file_exists(output_file, min_size=4096):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Output file is missing or too small"
)
# 10. 获取实际时长
# 11. 获取实际时长
actual_duration = self.probe_duration(output_file)
actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms
# 11. 上传产物
# 12. 上传产物
video_url = self.upload_file(task.task_id, 'video', output_file)
if not video_url:
return TaskResult.fail(
@@ -177,7 +205,7 @@ class RenderSegmentVideoHandler(BaseHandler):
"Failed to upload video"
)
# 12. 构建结果(包含 overlap 信息)
# 13. 构建结果(包含 overlap 信息)
result_data = {
'videoUrl': video_url,
'actualDurationMs': actual_duration_ms,
@@ -271,12 +299,24 @@ class RenderSegmentVideoHandler(BaseHandler):
cmd.extend(['-vf', ','.join(filters)])
# 计算总帧数,动态调整 GOP
total_frames = int(actual_duration_sec * fps)
if total_frames <= 1:
gop_size = 1
elif total_frames < fps:
gop_size = total_frames
else:
gop_size = fps * 2 # 正常情况,2 秒一个关键帧
# 编码参数
cmd.extend([
'-c:v', 'libx264',
'-preset', 'fast',
'-crf', '18',
'-r', str(fps),
'-g', str(gop_size),
'-keyint_min', str(min(gop_size, fps // 2 or 1)),
'-force_key_frames', 'expr:eq(n,0)',
'-an', # 无音频
output_file
])
@@ -294,7 +334,8 @@ class RenderSegmentVideoHandler(BaseHandler):
lut_file: Optional[str] = None,
overlay_file: Optional[str] = None,
overlap_head_ms: int = 0,
overlap_tail_ms: int = 0
overlap_tail_ms: int = 0,
source_duration_sec: Optional[float] = None
) -> List[str]:
"""
构建 FFmpeg 渲染命令
@@ -309,6 +350,7 @@ class RenderSegmentVideoHandler(BaseHandler):
overlay_file: 叠加层文件路径(可选)
overlap_head_ms: 头部 overlap 时长(毫秒)
overlap_tail_ms: 尾部 overlap 时长(毫秒)
source_duration_sec: 源视频实际时长(秒),用于检测时长不足
Returns:
FFmpeg 命令参数列表
@@ -331,10 +373,12 @@ class RenderSegmentVideoHandler(BaseHandler):
filters = self._build_video_filters(
render_spec=render_spec,
output_spec=output_spec,
duration_ms=duration_ms,
lut_file=lut_file,
has_overlay=overlay_file is not None,
overlay_file=overlay_file,
overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms
overlap_tail_ms=overlap_tail_ms,
source_duration_sec=source_duration_sec
)
# 应用滤镜
@@ -353,16 +397,28 @@ class RenderSegmentVideoHandler(BaseHandler):
fps = output_spec.fps
cmd.extend(['-r', str(fps)])
# GOP 大小(关键帧间隔)
gop_size = fps * 2 # 2秒一个关键帧
cmd.extend(['-g', str(gop_size)])
cmd.extend(['-keyint_min', str(gop_size)])
# 时长(包含 overlap 区域)
total_duration_ms = duration_ms + overlap_head_ms + overlap_tail_ms
duration_sec = total_duration_ms / 1000.0
cmd.extend(['-t', str(duration_sec)])
# 动态调整 GOP 大小:对于短视频,GOP 不能大于总帧数
total_frames = int(duration_sec * fps)
if total_frames <= 1:
gop_size = 1
elif total_frames < fps:
# 短于 1 秒的视频,使用全部帧数作为 GOP(整个视频只有开头一个关键帧)
gop_size = total_frames
else:
# 正常情况,2 秒一个关键帧
gop_size = fps * 2
cmd.extend(['-g', str(gop_size)])
cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))])
# 强制第一帧为关键帧
cmd.extend(['-force_key_frames', 'expr:eq(n,0)'])
# 无音频(视频片段不包含音频)
cmd.append('-an')
@@ -375,10 +431,12 @@ class RenderSegmentVideoHandler(BaseHandler):
self,
render_spec: RenderSpec,
output_spec: OutputSpec,
duration_ms: int,
lut_file: Optional[str] = None,
has_overlay: bool = False,
overlay_file: Optional[str] = None,
overlap_head_ms: int = 0,
overlap_tail_ms: int = 0
overlap_tail_ms: int = 0,
source_duration_sec: Optional[float] = None
) -> str:
"""
构建视频滤镜链
@@ -386,10 +444,12 @@ class RenderSegmentVideoHandler(BaseHandler):
Args:
render_spec: 渲染规格
output_spec: 输出规格
duration_ms: 目标时长(毫秒)
lut_file: LUT 文件路径
has_overlay: 是否有叠加层
overlay_file: 叠加层文件路径(支持图片 png/jpg 和视频 mov)
overlap_head_ms: 头部 overlap 时长(毫秒)
overlap_tail_ms: 尾部 overlap 时长(毫秒)
source_duration_sec: 源视频实际时长(秒),用于检测时长不足
Returns:
滤镜字符串
@@ -399,6 +459,10 @@ class RenderSegmentVideoHandler(BaseHandler):
height = output_spec.height
fps = output_spec.fps
# 判断 overlay 类型
has_overlay = overlay_file is not None
is_video_overlay = has_overlay and overlay_file.lower().endswith('.mov')
# 解析 effects
effects = render_spec.get_effects()
has_camera_shot = any(e.effect_type == 'cameraShot' for e in effects)
@@ -458,23 +522,45 @@ class RenderSegmentVideoHandler(BaseHandler):
base_filters=filters,
effects=effects,
fps=fps,
width=width,
height=height,
has_overlay=has_overlay,
is_video_overlay=is_video_overlay,
overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms,
use_hwdownload=bool(hwaccel_prefix)
use_hwdownload=bool(hwaccel_prefix),
duration_ms=duration_ms,
render_spec=render_spec,
source_duration_sec=source_duration_sec
)
# 6. 帧冻结(tpad)- 用于转场 overlap 区域
# 6. 帧冻结(tpad)- 用于转场 overlap 区域和时长不足补足
# 注意:tpad 必须在缩放之后应用
tpad_parts = []
# 计算是否需要额外的尾部冻结(源视频时长不足)
extra_tail_freeze_sec = 0.0
if source_duration_sec is not None:
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed > 0:
# 计算变速后的有效时长
effective_duration_sec = source_duration_sec / speed
required_duration_sec = duration_ms / 1000.0
# 如果源视频时长不足,需要冻结最后一帧来补足
if effective_duration_sec < required_duration_sec:
extra_tail_freeze_sec = required_duration_sec - effective_duration_sec
if overlap_head_ms > 0:
# 头部冻结:将第一帧冻结指定时长
head_duration_sec = overlap_head_ms / 1000.0
tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}")
if overlap_tail_ms > 0:
# 尾部冻结:将最后一帧冻结指定时长
tail_duration_sec = overlap_tail_ms / 1000.0
tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}")
# 尾部冻结:合并 overlap 和时长不足的冻结
total_tail_freeze_sec = (overlap_tail_ms / 1000.0) + extra_tail_freeze_sec
if total_tail_freeze_sec > 0:
# 将最后一帧冻结指定时长
tpad_parts.append(f"stop_mode=clone:stop_duration={total_tail_freeze_sec}")
if tpad_parts:
filters.append(f"tpad={':'.join(tpad_parts)}")
@@ -483,7 +569,13 @@ class RenderSegmentVideoHandler(BaseHandler):
if has_overlay:
# 使用 filter_complex 格式
base_filters = ','.join(filters) if filters else 'copy'
return f"[0:v]{base_filters}[base];[base][1:v]overlay=0:0"
overlay_scale = f"scale={width}:{height}"
# 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示)
overlay_params = 'eof_action=pass' if is_video_overlay else ''
overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
# 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc
range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
return f"[0:v]{base_filters}[base];[1:v]{overlay_scale}[overlay];[base][overlay]{overlay_filter}{range_fix}"
else:
return ','.join(filters) if filters else ''
@@ -492,10 +584,16 @@ class RenderSegmentVideoHandler(BaseHandler):
base_filters: List[str],
effects: List[Effect],
fps: int,
width: int,
height: int,
has_overlay: bool = False,
is_video_overlay: bool = False,
overlap_head_ms: int = 0,
overlap_tail_ms: int = 0,
use_hwdownload: bool = False
use_hwdownload: bool = False,
duration_ms: int = 0,
render_spec: Optional[RenderSpec] = None,
source_duration_sec: Optional[float] = None
) -> str:
"""
构建包含特效的 filter_complex 滤镜图
@@ -506,10 +604,16 @@ class RenderSegmentVideoHandler(BaseHandler):
base_filters: 基础滤镜列表
effects: 特效列表
fps: 帧率
width: 输出宽度
height: 输出高度
has_overlay: 是否有叠加层
is_video_overlay: 叠加层是否为视频格式(如 .mov)
overlap_head_ms: 头部 overlap 时长
overlap_tail_ms: 尾部 overlap 时长
use_hwdownload: 是否使用了硬件加速解码(已在 base_filters 中包含 hwdownload)
duration_ms: 目标时长(毫秒)
render_spec: 渲染规格(用于获取变速参数)
source_duration_sec: 源视频实际时长(秒),用于检测时长不足
Returns:
filter_complex 格式的滤镜字符串
@@ -531,15 +635,16 @@ class RenderSegmentVideoHandler(BaseHandler):
if start_sec <= 0 or duration_sec <= 0:
continue
# cameraShot 实现:
# cameraShot 实现(定格效果)
# 1. fps + split 分割
# 2. 第一路:trim(0, start+duration) + freezeframes
# 2. 第一路:trim(0, start) + tpad冻结duration秒
# 3. 第二路:trim(start, end)
# 4. concat 拼接
start_frame = start_sec * fps
split_out_a = f'[eff{effect_idx}_a]'
split_out_b = f'[eff{effect_idx}_b]'
frozen_out = f'[eff{effect_idx}_frozen]'
rest_out = f'[eff{effect_idx}_rest]'
effect_output = f'[v_eff{effect_idx}]'
# fps + split
@@ -547,37 +652,50 @@ class RenderSegmentVideoHandler(BaseHandler):
f"{current_output}fps=fps={fps},split{split_out_a}{split_out_b}"
)
# 第一路:trim + freezeframes(在 start 帧处冻结 duration 秒)
# freezeframes: 从 first 帧开始,用 replace 帧替换后续帧
# 这样实现定格效果:在 start_frame 位置冻结
# 第一路:trim(0, start) + tpad冻结
# tpad=stop_mode=clone 将最后一帧冻结指定时长
filter_parts.append(
f"{split_out_a}trim=start=0:end={start_sec + duration_sec},"
f"setpts=PTS-STARTPTS,"
f"freezeframes=first={start_frame}:last={start_frame + duration_sec * fps - 1}:replace={start_frame}"
f"{split_out_a}"
f"{split_out_a}trim=start=0:end={start_sec},setpts=PTS-STARTPTS,"
f"tpad=stop_mode=clone:stop_duration={duration_sec}{frozen_out}"
)
# 第二路:trim 从 start 开始
filter_parts.append(
f"{split_out_b}trim=start={start_sec},setpts=PTS-STARTPTS{split_out_b}"
f"{split_out_b}trim=start={start_sec},setpts=PTS-STARTPTS{rest_out}"
)
# concat 拼接
filter_parts.append(
f"{split_out_a}{split_out_b}concat=n=2:v=1:a=0{effect_output}"
f"{frozen_out}{rest_out}concat=n=2:v=1:a=0{effect_output}"
)
current_output = effect_output
effect_idx += 1
# 帧冻结(tpad)- 用于转场 overlap 区域
# 帧冻结(tpad)- 用于转场 overlap 区域和时长不足补足
tpad_parts = []
# 计算是否需要额外的尾部冻结(源视频时长不足)
extra_tail_freeze_sec = 0.0
if source_duration_sec is not None and render_spec is not None and duration_ms > 0:
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed > 0:
# 计算变速后的有效时长
effective_duration_sec = source_duration_sec / speed
required_duration_sec = duration_ms / 1000.0
# 如果源视频时长不足,需要冻结最后一帧来补足
if effective_duration_sec < required_duration_sec:
extra_tail_freeze_sec = required_duration_sec - effective_duration_sec
if overlap_head_ms > 0:
head_duration_sec = overlap_head_ms / 1000.0
tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}")
if overlap_tail_ms > 0:
tail_duration_sec = overlap_tail_ms / 1000.0
tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}")
# 尾部冻结:合并 overlap 和时长不足的冻结
total_tail_freeze_sec = (overlap_tail_ms / 1000.0) + extra_tail_freeze_sec
if total_tail_freeze_sec > 0:
tpad_parts.append(f"stop_mode=clone:stop_duration={total_tail_freeze_sec}")
if tpad_parts:
tpad_output = '[v_tpad]'
@@ -587,7 +705,15 @@ class RenderSegmentVideoHandler(BaseHandler):
# 最终输出
if has_overlay:
# 叠加层处理
filter_parts.append(f"{current_output}[1:v]overlay=0:0")
# 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示)
overlay_params = 'eof_action=pass' if is_video_overlay else ''
overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
overlay_scale = f"scale={width}:{height}"
overlay_output = '[v_overlay]'
# 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc
range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
filter_parts.append(f"[1:v]{overlay_scale}{overlay_output}")
filter_parts.append(f"{current_output}{overlay_output}{overlay_filter}{range_fix}")
else:
# 移除最后一个标签,直接输出
# 将最后一个滤镜的输出标签替换为空(直接输出)

View File

@@ -25,6 +25,8 @@ import sys
import time
import signal
import logging
import os
from logging.handlers import RotatingFileHandler
from dotenv import load_dotenv
@@ -34,11 +36,55 @@ from services.task_executor import TaskExecutor
from constant import SOFTWARE_VERSION
# 日志配置
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
def setup_logging():
"""配置日志系统,输出到控制台和文件"""
# 日志格式
log_format = '[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s'
date_format = '%Y-%m-%d %H:%M:%S'
formatter = logging.Formatter(log_format, date_format)
# 获取根logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# 清除已有的handlers(避免重复)
root_logger.handlers.clear()
# 1. 控制台handler(只输出WARNING及以上级别)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.WARNING)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# 确保日志文件所在目录存在
log_dir = os.path.dirname(os.path.abspath(__file__))
# 2. 所有日志文件handler(all_log.log)
all_log_path = os.path.join(log_dir, 'all_log.log')
all_log_handler = RotatingFileHandler(
all_log_path,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
all_log_handler.setLevel(logging.DEBUG) # 记录所有级别
all_log_handler.setFormatter(formatter)
root_logger.addHandler(all_log_handler)
# 3. 错误日志文件handler(error.log)
error_log_path = os.path.join(log_dir, 'error.log')
error_log_handler = RotatingFileHandler(
error_log_path,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
error_log_handler.setLevel(logging.ERROR) # 只记录ERROR及以上
error_log_handler.setFormatter(formatter)
root_logger.addHandler(error_log_handler)
# 初始化日志系统
setup_logging()
logger = logging.getLogger('worker')

View File

@@ -5,6 +5,7 @@
提供素材下载缓存功能,避免相同素材重复下载。
"""
import json
import os
import hashlib
import logging
@@ -14,6 +15,8 @@ import uuid
from typing import Optional, Tuple
from urllib.parse import urlparse, unquote
import psutil
from services import storage
logger = logging.getLogger(__name__)
@@ -62,6 +65,7 @@ class MaterialCache:
LOCK_TIMEOUT_SEC = 30.0
LOCK_POLL_INTERVAL_SEC = 0.1
LOCK_STALE_SECONDS = 24 * 60 * 60
def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
"""
@@ -100,6 +104,96 @@ class MaterialCache:
assert self.cache_dir
return os.path.join(self.cache_dir, f"{cache_key}.lock")
def _write_lock_metadata(self, lock_fd: int, lock_path: str) -> bool:
"""写入锁元数据,失败则清理锁文件"""
try:
try:
process_start_time = psutil.Process(os.getpid()).create_time()
except Exception as e:
process_start_time = None
logger.warning(f"Cache lock process start time error: {e}")
metadata = {
'pid': os.getpid(),
'process_start_time': process_start_time,
'created_at': time.time()
}
with os.fdopen(lock_fd, 'w', encoding='utf-8') as lock_file:
json.dump(metadata, lock_file)
return True
except Exception as e:
try:
os.close(lock_fd)
except Exception:
pass
self._remove_lock_file(lock_path, f"write metadata failed: {e}")
return False
def _read_lock_metadata(self, lock_path: str) -> Optional[dict]:
"""读取锁元数据,失败返回 None(兼容历史空锁文件)"""
try:
with open(lock_path, 'r', encoding='utf-8') as lock_file:
data = json.load(lock_file)
return data if isinstance(data, dict) else None
except Exception:
return None
def _is_process_alive(self, pid: int, expected_start_time: Optional[float]) -> bool:
"""判断进程是否存活并校验启动时间(防止 PID 复用)"""
try:
process = psutil.Process(pid)
if expected_start_time is None:
return process.is_running()
actual_start_time = process.create_time()
return abs(actual_start_time - expected_start_time) < 1.0
except psutil.NoSuchProcess:
return False
except Exception as e:
logger.warning(f"Cache lock process check error: {e}")
return True
def _is_lock_stale(self, lock_path: str) -> bool:
"""判断锁是否过期(进程已退出或超过最大存活时长)"""
if not os.path.exists(lock_path):
return False
now = time.time()
metadata = self._read_lock_metadata(lock_path)
if metadata:
created_at = metadata.get('created_at')
if isinstance(created_at, (int, float)) and now - created_at > self.LOCK_STALE_SECONDS:
return True
pid = metadata.get('pid')
pid_value = int(pid) if isinstance(pid, int) or (isinstance(pid, str) and pid.isdigit()) else None
expected_start_time = metadata.get('process_start_time')
expected_start_time_value = (
expected_start_time if isinstance(expected_start_time, (int, float)) else None
)
if pid_value is not None and not self._is_process_alive(pid_value, expected_start_time_value):
return True
return self._is_lock_stale_by_mtime(lock_path, now)
return self._is_lock_stale_by_mtime(lock_path, now)
def _is_lock_stale_by_mtime(self, lock_path: str, now: float) -> bool:
"""基于文件时间判断锁是否过期"""
try:
mtime = os.path.getmtime(lock_path)
return now - mtime > self.LOCK_STALE_SECONDS
except Exception as e:
logger.warning(f"Cache lock stat error: {e}")
return False
def _remove_lock_file(self, lock_path: str, reason: str = "") -> bool:
"""删除锁文件"""
try:
os.remove(lock_path)
if reason:
logger.info(f"Cache lock removed: {lock_path} ({reason})")
return True
except FileNotFoundError:
return True
except Exception as e:
logger.warning(f"Cache lock remove error: {e}")
return False
def _acquire_lock(self, cache_key: str) -> Optional[str]:
"""获取缓存锁(跨进程安全)"""
if not self.enabled:
@@ -111,9 +205,14 @@ class MaterialCache:
while True:
try:
fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.close(fd)
if not self._write_lock_metadata(fd, lock_path):
return None
return lock_path
except FileExistsError:
if self._is_lock_stale(lock_path):
removed = self._remove_lock_file(lock_path, "stale lock")
if removed:
continue
if time.monotonic() >= deadline:
logger.warning(f"Cache lock timeout: {lock_path}")
return None
@@ -126,12 +225,7 @@ class MaterialCache:
"""释放缓存锁"""
if not lock_path:
return
try:
os.remove(lock_path)
except FileNotFoundError:
return
except Exception as e:
logger.warning(f"Cache lock release error: {e}")
self._remove_lock_file(lock_path)
def is_cached(self, url: str) -> Tuple[bool, str]:
"""
@@ -251,6 +345,66 @@ class MaterialCache:
finally:
self._release_lock(lock_path)
def add_to_cache(self, url: str, source_path: str) -> bool:
"""
将本地文件添加到缓存
Args:
url: 对应的 URL(用于生成缓存键)
source_path: 本地文件路径
Returns:
是否成功
"""
if not self.enabled:
return False
if not os.path.exists(source_path):
logger.warning(f"Source file not found for cache: {source_path}")
return False
cache_key = _extract_cache_key(url)
lock_path = self._acquire_lock(cache_key)
if not lock_path:
logger.warning(f"Cache lock unavailable for adding: {url[:80]}...")
return False
try:
cache_path = self.get_cache_path(url)
# 先复制到临时文件
temp_cache_path = os.path.join(
self.cache_dir,
f"{cache_key}.{uuid.uuid4().hex}.adding"
)
shutil.copy2(source_path, temp_cache_path)
# 原子替换
os.replace(temp_cache_path, cache_path)
# 更新访问时间
os.utime(cache_path, None)
logger.info(f"Added to cache: {url[:80]}... <- {source_path}")
# 检查清理
if self.max_size_bytes > 0:
self._cleanup_if_needed()
return True
except Exception as e:
logger.error(f"Failed to add to cache: {e}")
if 'temp_cache_path' in locals() and os.path.exists(temp_cache_path):
try:
os.remove(temp_cache_path)
except Exception:
pass
return False
finally:
self._release_lock(lock_path)
def _cleanup_if_needed(self) -> None:
"""
检查并清理缓存(LRU 策略)
@@ -292,6 +446,17 @@ class MaterialCache:
for file_info in cache_files:
if total_size <= target_size:
break
# 从文件名提取 cache_key,检查是否有锁(说明正在被使用)
filename = os.path.basename(file_info['path'])
cache_key = os.path.splitext(filename)[0]
lock_path = self._get_lock_path(cache_key)
if os.path.exists(lock_path):
if self._is_lock_stale(lock_path):
self._remove_lock_file(lock_path, "cleanup stale lock")
else:
# 该文件正在被其他任务使用,跳过删除
logger.debug(f"Cache cleanup: skipping locked file {filename}")
continue
try:
os.remove(file_info['path'])
total_size -= file_info['size']

View File

@@ -9,12 +9,36 @@ import os
import logging
import subprocess
from typing import Optional
from urllib.parse import unquote
import requests
logger = logging.getLogger(__name__)
# 文件扩展名到 Content-Type 的映射
_CONTENT_TYPE_MAP = {
'.mp4': 'video/mp4',
'.aac': 'audio/aac',
'.ts': 'video/mp2t',
'.m4a': 'audio/mp4',
}
def _get_content_type(file_path: str) -> str:
"""
根据文件扩展名获取 Content-Type
Args:
file_path: 文件路径
Returns:
Content-Type 字符串
"""
ext = os.path.splitext(file_path)[1].lower()
return _CONTENT_TYPE_MAP.get(ext, 'application/octet-stream')
def _apply_http_replace_map(url: str) -> str:
"""
应用 HTTP_REPLACE_MAP 环境变量替换 URL
@@ -62,6 +86,7 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
# 检查是否使用 rclone 上传
if os.getenv("UPLOAD_METHOD") == "rclone":
logger.info(f"Uploading to: {url}")
result = _upload_with_rclone(url, file_path)
if result:
return True
@@ -69,6 +94,8 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
# 应用 HTTP_REPLACE_MAP 替换 URL
http_url = _apply_http_replace_map(url)
content_type = _get_content_type(file_path)
logger.info(f"Uploading to: {http_url} (Content-Type: {content_type})")
retries = 0
while retries < max_retries:
@@ -79,7 +106,7 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
data=f,
stream=True,
timeout=timeout,
headers={"Content-Type": "application/octet-stream"}
headers={"Content-Type": content_type}
) as response:
response.raise_for_status()
logger.info(f"Upload succeeded: {file_path}")
@@ -119,6 +146,7 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
for src, dst in replace_list:
new_url = new_url.replace(src, dst)
new_url = new_url.split("?", 1)[0] # 移除查询参数
new_url = unquote(new_url) # 解码 URL 编码的字符(如 %2F -> /)
if new_url == url:
return False

View File

@@ -12,6 +12,12 @@ from typing import Dict, Optional, TYPE_CHECKING
from domain.task import Task, TaskType
from domain.result import TaskResult, ErrorCode
# 需要 GPU 加速的任务类型
GPU_REQUIRED_TASK_TYPES = {
TaskType.RENDER_SEGMENT_VIDEO,
TaskType.COMPOSE_TRANSITION,
}
from domain.config import WorkerConfig
from core.handler import TaskHandler
from services.lease_service import LeaseService
@@ -179,9 +185,10 @@ class TaskExecutor:
)
lease_service.start()
# 获取 GPU 设备
# 获取 GPU 设备(仅对需要 GPU 的任务类型)
device_index = None
if self.gpu_scheduler.enabled:
needs_gpu = task.task_type in GPU_REQUIRED_TASK_TYPES
if needs_gpu and self.gpu_scheduler.enabled:
device_index = self.gpu_scheduler.acquire()
if device_index is not None:
logger.info(f"[task:{task_id}] Assigned to GPU device {device_index}")
@@ -227,8 +234,8 @@ class TaskExecutor:
if handler:
handler.clear_gpu_device()
# 释放 GPU 设备
if self.gpu_scheduler.enabled:
# 释放 GPU 设备(仅当实际分配了设备时)
if device_index is not None:
self.gpu_scheduler.release(device_index)
# 停止租约续期