You've already forked FrameTour-RenderWorker
Compare commits
16 Commits
b291f33486
...
next
| Author | SHA1 | Date | |
|---|---|---|---|
| eeb21cada3 | |||
| a70573395b | |||
| ffb9d5390e | |||
| 6126856361 | |||
| a6263398ed | |||
| 885b69233a | |||
| 9158854411 | |||
| 634dc6c855 | |||
| ca9093504f | |||
| ceba9a17a4 | |||
| 7acae2f708 | |||
| ed8dca543e | |||
| 0a7a0dac89 | |||
| 797507d24b | |||
| f7ca07b9db | |||
| 4d5e57f61b |
@@ -114,6 +114,10 @@ def get_hwaccel_filter_prefix(hw_accel: str = HW_ACCEL_NONE) -> str:
|
||||
注意:由于大多数复杂滤镜(如 lut3d, overlay, crop 等)不支持硬件表面,
|
||||
我们需要在滤镜链开始时将硬件表面下载到系统内存。
|
||||
|
||||
CUDA/QSV hwdownload 只支持 nv12 格式输出,因此需要两步转换:
|
||||
1. hwdownload,format=nv12 - 从 GPU 下载到 CPU
|
||||
2. format=yuv420p - 转换为标准格式(确保与 RGBA/YUVA overlay 混合时颜色正确)
|
||||
|
||||
Args:
|
||||
hw_accel: 硬件加速类型
|
||||
|
||||
@@ -121,9 +125,9 @@ def get_hwaccel_filter_prefix(hw_accel: str = HW_ACCEL_NONE) -> str:
|
||||
需要添加到滤镜链开头的 hwdownload 滤镜字符串
|
||||
"""
|
||||
if hw_accel == HW_ACCEL_CUDA:
|
||||
return 'hwdownload,format=nv12,'
|
||||
return 'hwdownload,format=nv12,format=yuv420p,'
|
||||
elif hw_accel == HW_ACCEL_QSV:
|
||||
return 'hwdownload,format=nv12,'
|
||||
return 'hwdownload,format=nv12,format=yuv420p,'
|
||||
else:
|
||||
return ''
|
||||
|
||||
@@ -460,6 +464,11 @@ class BaseHandler(TaskHandler, ABC):
|
||||
if result:
|
||||
file_size = os.path.getsize(file_path)
|
||||
logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
|
||||
|
||||
# 将上传成功的文件加入缓存
|
||||
if access_url:
|
||||
self.material_cache.add_to_cache(access_url, file_path)
|
||||
|
||||
return access_url
|
||||
else:
|
||||
logger.error(f"[task:{task_id}] Upload failed: {file_path}")
|
||||
|
||||
@@ -127,9 +127,9 @@ class FinalizeMp4Handler(BaseHandler):
|
||||
concat_file = os.path.join(work_dir, 'concat.txt')
|
||||
with open(concat_file, 'w', encoding='utf-8') as f:
|
||||
for ts_file in ts_files:
|
||||
# 路径中的反斜杠需要转义或使用正斜杠
|
||||
ts_path = ts_file.replace('\\', '/')
|
||||
f.write(f"file '{ts_path}'\n")
|
||||
# FFmpeg concat 路径相对于 concat.txt 所在目录,只需写文件名
|
||||
ts_filename = os.path.basename(ts_file)
|
||||
f.write(f"file '{ts_filename}'\n")
|
||||
|
||||
# 3. 构建合并命令(remux,不重编码)
|
||||
cmd = [
|
||||
|
||||
@@ -123,9 +123,13 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
overlay_file = None
|
||||
if render_spec.overlay_url:
|
||||
# 根据 URL 后缀确定文件扩展名
|
||||
ext = '.png'
|
||||
if render_spec.overlay_url.lower().endswith('.jpg') or render_spec.overlay_url.lower().endswith('.jpeg'):
|
||||
url_lower = render_spec.overlay_url.lower()
|
||||
if url_lower.endswith('.jpg') or url_lower.endswith('.jpeg'):
|
||||
ext = '.jpg'
|
||||
elif url_lower.endswith('.mov'):
|
||||
ext = '.mov'
|
||||
else:
|
||||
ext = '.png' # 默认
|
||||
overlay_file = os.path.join(work_dir, f'overlay{ext}')
|
||||
if not self.download_file(render_spec.overlay_url, overlay_file):
|
||||
logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
|
||||
@@ -271,12 +275,24 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
|
||||
cmd.extend(['-vf', ','.join(filters)])
|
||||
|
||||
# 计算总帧数,动态调整 GOP
|
||||
total_frames = int(actual_duration_sec * fps)
|
||||
if total_frames <= 1:
|
||||
gop_size = 1
|
||||
elif total_frames < fps:
|
||||
gop_size = total_frames
|
||||
else:
|
||||
gop_size = fps * 2 # 正常情况,2 秒一个关键帧
|
||||
|
||||
# 编码参数
|
||||
cmd.extend([
|
||||
'-c:v', 'libx264',
|
||||
'-preset', 'fast',
|
||||
'-crf', '18',
|
||||
'-r', str(fps),
|
||||
'-g', str(gop_size),
|
||||
'-keyint_min', str(min(gop_size, fps // 2 or 1)),
|
||||
'-force_key_frames', 'expr:eq(n,0)',
|
||||
'-an', # 无音频
|
||||
output_file
|
||||
])
|
||||
@@ -332,7 +348,7 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
render_spec=render_spec,
|
||||
output_spec=output_spec,
|
||||
lut_file=lut_file,
|
||||
has_overlay=overlay_file is not None,
|
||||
overlay_file=overlay_file,
|
||||
overlap_head_ms=overlap_head_ms,
|
||||
overlap_tail_ms=overlap_tail_ms
|
||||
)
|
||||
@@ -353,16 +369,28 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
fps = output_spec.fps
|
||||
cmd.extend(['-r', str(fps)])
|
||||
|
||||
# GOP 大小(关键帧间隔)
|
||||
gop_size = fps * 2 # 2秒一个关键帧
|
||||
cmd.extend(['-g', str(gop_size)])
|
||||
cmd.extend(['-keyint_min', str(gop_size)])
|
||||
|
||||
# 时长(包含 overlap 区域)
|
||||
total_duration_ms = duration_ms + overlap_head_ms + overlap_tail_ms
|
||||
duration_sec = total_duration_ms / 1000.0
|
||||
cmd.extend(['-t', str(duration_sec)])
|
||||
|
||||
# 动态调整 GOP 大小:对于短视频,GOP 不能大于总帧数
|
||||
total_frames = int(duration_sec * fps)
|
||||
if total_frames <= 1:
|
||||
gop_size = 1
|
||||
elif total_frames < fps:
|
||||
# 短于 1 秒的视频,使用全部帧数作为 GOP(整个视频只有开头一个关键帧)
|
||||
gop_size = total_frames
|
||||
else:
|
||||
# 正常情况,2 秒一个关键帧
|
||||
gop_size = fps * 2
|
||||
|
||||
cmd.extend(['-g', str(gop_size)])
|
||||
cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))])
|
||||
|
||||
# 强制第一帧为关键帧
|
||||
cmd.extend(['-force_key_frames', 'expr:eq(n,0)'])
|
||||
|
||||
# 无音频(视频片段不包含音频)
|
||||
cmd.append('-an')
|
||||
|
||||
@@ -376,7 +404,7 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
render_spec: RenderSpec,
|
||||
output_spec: OutputSpec,
|
||||
lut_file: Optional[str] = None,
|
||||
has_overlay: bool = False,
|
||||
overlay_file: Optional[str] = None,
|
||||
overlap_head_ms: int = 0,
|
||||
overlap_tail_ms: int = 0
|
||||
) -> str:
|
||||
@@ -387,7 +415,7 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
render_spec: 渲染规格
|
||||
output_spec: 输出规格
|
||||
lut_file: LUT 文件路径
|
||||
has_overlay: 是否有叠加层
|
||||
overlay_file: 叠加层文件路径(支持图片 png/jpg 和视频 mov)
|
||||
overlap_head_ms: 头部 overlap 时长(毫秒)
|
||||
overlap_tail_ms: 尾部 overlap 时长(毫秒)
|
||||
|
||||
@@ -399,6 +427,10 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
height = output_spec.height
|
||||
fps = output_spec.fps
|
||||
|
||||
# 判断 overlay 类型
|
||||
has_overlay = overlay_file is not None
|
||||
is_video_overlay = has_overlay and overlay_file.lower().endswith('.mov')
|
||||
|
||||
# 解析 effects
|
||||
effects = render_spec.get_effects()
|
||||
has_camera_shot = any(e.effect_type == 'cameraShot' for e in effects)
|
||||
@@ -458,7 +490,10 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
base_filters=filters,
|
||||
effects=effects,
|
||||
fps=fps,
|
||||
width=width,
|
||||
height=height,
|
||||
has_overlay=has_overlay,
|
||||
is_video_overlay=is_video_overlay,
|
||||
overlap_head_ms=overlap_head_ms,
|
||||
overlap_tail_ms=overlap_tail_ms,
|
||||
use_hwdownload=bool(hwaccel_prefix)
|
||||
@@ -483,7 +518,13 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
if has_overlay:
|
||||
# 使用 filter_complex 格式
|
||||
base_filters = ','.join(filters) if filters else 'copy'
|
||||
return f"[0:v]{base_filters}[base];[base][1:v]overlay=0:0"
|
||||
overlay_scale = f"scale={width}:{height}"
|
||||
# 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示)
|
||||
overlay_params = 'eof_action=pass' if is_video_overlay else ''
|
||||
overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
|
||||
# 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc
|
||||
range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
|
||||
return f"[0:v]{base_filters}[base];[1:v]{overlay_scale}[overlay];[base][overlay]{overlay_filter}{range_fix}"
|
||||
else:
|
||||
return ','.join(filters) if filters else ''
|
||||
|
||||
@@ -492,7 +533,10 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
base_filters: List[str],
|
||||
effects: List[Effect],
|
||||
fps: int,
|
||||
width: int,
|
||||
height: int,
|
||||
has_overlay: bool = False,
|
||||
is_video_overlay: bool = False,
|
||||
overlap_head_ms: int = 0,
|
||||
overlap_tail_ms: int = 0,
|
||||
use_hwdownload: bool = False
|
||||
@@ -506,7 +550,10 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
base_filters: 基础滤镜列表
|
||||
effects: 特效列表
|
||||
fps: 帧率
|
||||
width: 输出宽度
|
||||
height: 输出高度
|
||||
has_overlay: 是否有叠加层
|
||||
is_video_overlay: 叠加层是否为视频格式(如 .mov)
|
||||
overlap_head_ms: 头部 overlap 时长
|
||||
overlap_tail_ms: 尾部 overlap 时长
|
||||
use_hwdownload: 是否使用了硬件加速解码(已在 base_filters 中包含 hwdownload)
|
||||
@@ -531,15 +578,16 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
if start_sec <= 0 or duration_sec <= 0:
|
||||
continue
|
||||
|
||||
# cameraShot 实现:
|
||||
# cameraShot 实现(定格效果):
|
||||
# 1. fps + split 分割
|
||||
# 2. 第一路:trim(0, start+duration) + freezeframes
|
||||
# 2. 第一路:trim(0, start) + tpad冻结duration秒
|
||||
# 3. 第二路:trim(start, end)
|
||||
# 4. concat 拼接
|
||||
|
||||
start_frame = start_sec * fps
|
||||
split_out_a = f'[eff{effect_idx}_a]'
|
||||
split_out_b = f'[eff{effect_idx}_b]'
|
||||
frozen_out = f'[eff{effect_idx}_frozen]'
|
||||
rest_out = f'[eff{effect_idx}_rest]'
|
||||
effect_output = f'[v_eff{effect_idx}]'
|
||||
|
||||
# fps + split
|
||||
@@ -547,24 +595,21 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
f"{current_output}fps=fps={fps},split{split_out_a}{split_out_b}"
|
||||
)
|
||||
|
||||
# 第一路:trim + freezeframes(在 start 帧处冻结 duration 秒)
|
||||
# freezeframes: 从 first 帧开始,用 replace 帧替换后续帧
|
||||
# 这样实现定格效果:在 start_frame 位置冻结
|
||||
# 第一路:trim(0, start) + tpad冻结
|
||||
# tpad=stop_mode=clone 将最后一帧冻结指定时长
|
||||
filter_parts.append(
|
||||
f"{split_out_a}trim=start=0:end={start_sec + duration_sec},"
|
||||
f"setpts=PTS-STARTPTS,"
|
||||
f"freezeframes=first={start_frame}:last={start_frame + duration_sec * fps - 1}:replace={start_frame}"
|
||||
f"{split_out_a}"
|
||||
f"{split_out_a}trim=start=0:end={start_sec},setpts=PTS-STARTPTS,"
|
||||
f"tpad=stop_mode=clone:stop_duration={duration_sec}{frozen_out}"
|
||||
)
|
||||
|
||||
# 第二路:trim 从 start 开始
|
||||
filter_parts.append(
|
||||
f"{split_out_b}trim=start={start_sec},setpts=PTS-STARTPTS{split_out_b}"
|
||||
f"{split_out_b}trim=start={start_sec},setpts=PTS-STARTPTS{rest_out}"
|
||||
)
|
||||
|
||||
# concat 拼接
|
||||
filter_parts.append(
|
||||
f"{split_out_a}{split_out_b}concat=n=2:v=1:a=0{effect_output}"
|
||||
f"{frozen_out}{rest_out}concat=n=2:v=1:a=0{effect_output}"
|
||||
)
|
||||
|
||||
current_output = effect_output
|
||||
@@ -587,7 +632,15 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
# 最终输出
|
||||
if has_overlay:
|
||||
# 叠加层处理
|
||||
filter_parts.append(f"{current_output}[1:v]overlay=0:0")
|
||||
# 视频 overlay 使用 eof_action=pass(结束后消失),图片 overlay 使用默认行为(保持显示)
|
||||
overlay_params = 'eof_action=pass' if is_video_overlay else ''
|
||||
overlay_filter = f"overlay=0:0:{overlay_params}" if overlay_params else 'overlay=0:0'
|
||||
overlay_scale = f"scale={width}:{height}"
|
||||
overlay_output = '[v_overlay]'
|
||||
# 视频 overlay 需要在末尾统一颜色范围,避免 overlay 结束后 range 从 tv 变为 pc
|
||||
range_fix = ',format=yuv420p,setrange=tv' if is_video_overlay else ''
|
||||
filter_parts.append(f"[1:v]{overlay_scale}{overlay_output}")
|
||||
filter_parts.append(f"{current_output}{overlay_output}{overlay_filter}{range_fix}")
|
||||
else:
|
||||
# 移除最后一个标签,直接输出
|
||||
# 将最后一个滤镜的输出标签替换为空(直接输出)
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
提供素材下载缓存功能,避免相同素材重复下载。
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import hashlib
|
||||
import logging
|
||||
@@ -14,6 +15,8 @@ import uuid
|
||||
from typing import Optional, Tuple
|
||||
from urllib.parse import urlparse, unquote
|
||||
|
||||
import psutil
|
||||
|
||||
from services import storage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -62,6 +65,7 @@ class MaterialCache:
|
||||
|
||||
LOCK_TIMEOUT_SEC = 30.0
|
||||
LOCK_POLL_INTERVAL_SEC = 0.1
|
||||
LOCK_STALE_SECONDS = 24 * 60 * 60
|
||||
|
||||
def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
|
||||
"""
|
||||
@@ -100,6 +104,96 @@ class MaterialCache:
|
||||
assert self.cache_dir
|
||||
return os.path.join(self.cache_dir, f"{cache_key}.lock")
|
||||
|
||||
def _write_lock_metadata(self, lock_fd: int, lock_path: str) -> bool:
|
||||
"""写入锁元数据,失败则清理锁文件"""
|
||||
try:
|
||||
try:
|
||||
process_start_time = psutil.Process(os.getpid()).create_time()
|
||||
except Exception as e:
|
||||
process_start_time = None
|
||||
logger.warning(f"Cache lock process start time error: {e}")
|
||||
metadata = {
|
||||
'pid': os.getpid(),
|
||||
'process_start_time': process_start_time,
|
||||
'created_at': time.time()
|
||||
}
|
||||
with os.fdopen(lock_fd, 'w', encoding='utf-8') as lock_file:
|
||||
json.dump(metadata, lock_file)
|
||||
return True
|
||||
except Exception as e:
|
||||
try:
|
||||
os.close(lock_fd)
|
||||
except Exception:
|
||||
pass
|
||||
self._remove_lock_file(lock_path, f"write metadata failed: {e}")
|
||||
return False
|
||||
|
||||
def _read_lock_metadata(self, lock_path: str) -> Optional[dict]:
|
||||
"""读取锁元数据,失败返回 None(兼容历史空锁文件)"""
|
||||
try:
|
||||
with open(lock_path, 'r', encoding='utf-8') as lock_file:
|
||||
data = json.load(lock_file)
|
||||
return data if isinstance(data, dict) else None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _is_process_alive(self, pid: int, expected_start_time: Optional[float]) -> bool:
|
||||
"""判断进程是否存活并校验启动时间(防止 PID 复用)"""
|
||||
try:
|
||||
process = psutil.Process(pid)
|
||||
if expected_start_time is None:
|
||||
return process.is_running()
|
||||
actual_start_time = process.create_time()
|
||||
return abs(actual_start_time - expected_start_time) < 1.0
|
||||
except psutil.NoSuchProcess:
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache lock process check error: {e}")
|
||||
return True
|
||||
|
||||
def _is_lock_stale(self, lock_path: str) -> bool:
|
||||
"""判断锁是否过期(进程已退出或超过最大存活时长)"""
|
||||
if not os.path.exists(lock_path):
|
||||
return False
|
||||
now = time.time()
|
||||
metadata = self._read_lock_metadata(lock_path)
|
||||
if metadata:
|
||||
created_at = metadata.get('created_at')
|
||||
if isinstance(created_at, (int, float)) and now - created_at > self.LOCK_STALE_SECONDS:
|
||||
return True
|
||||
pid = metadata.get('pid')
|
||||
pid_value = int(pid) if isinstance(pid, int) or (isinstance(pid, str) and pid.isdigit()) else None
|
||||
expected_start_time = metadata.get('process_start_time')
|
||||
expected_start_time_value = (
|
||||
expected_start_time if isinstance(expected_start_time, (int, float)) else None
|
||||
)
|
||||
if pid_value is not None and not self._is_process_alive(pid_value, expected_start_time_value):
|
||||
return True
|
||||
return self._is_lock_stale_by_mtime(lock_path, now)
|
||||
return self._is_lock_stale_by_mtime(lock_path, now)
|
||||
|
||||
def _is_lock_stale_by_mtime(self, lock_path: str, now: float) -> bool:
|
||||
"""基于文件时间判断锁是否过期"""
|
||||
try:
|
||||
mtime = os.path.getmtime(lock_path)
|
||||
return now - mtime > self.LOCK_STALE_SECONDS
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache lock stat error: {e}")
|
||||
return False
|
||||
|
||||
def _remove_lock_file(self, lock_path: str, reason: str = "") -> bool:
|
||||
"""删除锁文件"""
|
||||
try:
|
||||
os.remove(lock_path)
|
||||
if reason:
|
||||
logger.info(f"Cache lock removed: {lock_path} ({reason})")
|
||||
return True
|
||||
except FileNotFoundError:
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache lock remove error: {e}")
|
||||
return False
|
||||
|
||||
def _acquire_lock(self, cache_key: str) -> Optional[str]:
|
||||
"""获取缓存锁(跨进程安全)"""
|
||||
if not self.enabled:
|
||||
@@ -111,9 +205,14 @@ class MaterialCache:
|
||||
while True:
|
||||
try:
|
||||
fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
|
||||
os.close(fd)
|
||||
if not self._write_lock_metadata(fd, lock_path):
|
||||
return None
|
||||
return lock_path
|
||||
except FileExistsError:
|
||||
if self._is_lock_stale(lock_path):
|
||||
removed = self._remove_lock_file(lock_path, "stale lock")
|
||||
if removed:
|
||||
continue
|
||||
if time.monotonic() >= deadline:
|
||||
logger.warning(f"Cache lock timeout: {lock_path}")
|
||||
return None
|
||||
@@ -126,12 +225,7 @@ class MaterialCache:
|
||||
"""释放缓存锁"""
|
||||
if not lock_path:
|
||||
return
|
||||
try:
|
||||
os.remove(lock_path)
|
||||
except FileNotFoundError:
|
||||
return
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache lock release error: {e}")
|
||||
self._remove_lock_file(lock_path)
|
||||
|
||||
def is_cached(self, url: str) -> Tuple[bool, str]:
|
||||
"""
|
||||
@@ -251,6 +345,66 @@ class MaterialCache:
|
||||
finally:
|
||||
self._release_lock(lock_path)
|
||||
|
||||
def add_to_cache(self, url: str, source_path: str) -> bool:
|
||||
"""
|
||||
将本地文件添加到缓存
|
||||
|
||||
Args:
|
||||
url: 对应的 URL(用于生成缓存键)
|
||||
source_path: 本地文件路径
|
||||
|
||||
Returns:
|
||||
是否成功
|
||||
"""
|
||||
if not self.enabled:
|
||||
return False
|
||||
|
||||
if not os.path.exists(source_path):
|
||||
logger.warning(f"Source file not found for cache: {source_path}")
|
||||
return False
|
||||
|
||||
cache_key = _extract_cache_key(url)
|
||||
lock_path = self._acquire_lock(cache_key)
|
||||
if not lock_path:
|
||||
logger.warning(f"Cache lock unavailable for adding: {url[:80]}...")
|
||||
return False
|
||||
|
||||
try:
|
||||
cache_path = self.get_cache_path(url)
|
||||
|
||||
# 先复制到临时文件
|
||||
temp_cache_path = os.path.join(
|
||||
self.cache_dir,
|
||||
f"{cache_key}.{uuid.uuid4().hex}.adding"
|
||||
)
|
||||
|
||||
shutil.copy2(source_path, temp_cache_path)
|
||||
|
||||
# 原子替换
|
||||
os.replace(temp_cache_path, cache_path)
|
||||
|
||||
# 更新访问时间
|
||||
os.utime(cache_path, None)
|
||||
|
||||
logger.info(f"Added to cache: {url[:80]}... <- {source_path}")
|
||||
|
||||
# 检查清理
|
||||
if self.max_size_bytes > 0:
|
||||
self._cleanup_if_needed()
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to add to cache: {e}")
|
||||
if 'temp_cache_path' in locals() and os.path.exists(temp_cache_path):
|
||||
try:
|
||||
os.remove(temp_cache_path)
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
finally:
|
||||
self._release_lock(lock_path)
|
||||
|
||||
def _cleanup_if_needed(self) -> None:
|
||||
"""
|
||||
检查并清理缓存(LRU 策略)
|
||||
@@ -292,6 +446,17 @@ class MaterialCache:
|
||||
for file_info in cache_files:
|
||||
if total_size <= target_size:
|
||||
break
|
||||
# 从文件名提取 cache_key,检查是否有锁(说明正在被使用)
|
||||
filename = os.path.basename(file_info['path'])
|
||||
cache_key = os.path.splitext(filename)[0]
|
||||
lock_path = self._get_lock_path(cache_key)
|
||||
if os.path.exists(lock_path):
|
||||
if self._is_lock_stale(lock_path):
|
||||
self._remove_lock_file(lock_path, "cleanup stale lock")
|
||||
else:
|
||||
# 该文件正在被其他任务使用,跳过删除
|
||||
logger.debug(f"Cache cleanup: skipping locked file {filename}")
|
||||
continue
|
||||
try:
|
||||
os.remove(file_info['path'])
|
||||
total_size -= file_info['size']
|
||||
|
||||
@@ -9,12 +9,36 @@ import os
|
||||
import logging
|
||||
import subprocess
|
||||
from typing import Optional
|
||||
from urllib.parse import unquote
|
||||
|
||||
import requests
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# 文件扩展名到 Content-Type 的映射
|
||||
_CONTENT_TYPE_MAP = {
|
||||
'.mp4': 'video/mp4',
|
||||
'.aac': 'audio/aac',
|
||||
'.ts': 'video/mp2t',
|
||||
'.m4a': 'audio/mp4',
|
||||
}
|
||||
|
||||
|
||||
def _get_content_type(file_path: str) -> str:
|
||||
"""
|
||||
根据文件扩展名获取 Content-Type
|
||||
|
||||
Args:
|
||||
file_path: 文件路径
|
||||
|
||||
Returns:
|
||||
Content-Type 字符串
|
||||
"""
|
||||
ext = os.path.splitext(file_path)[1].lower()
|
||||
return _CONTENT_TYPE_MAP.get(ext, 'application/octet-stream')
|
||||
|
||||
|
||||
def _apply_http_replace_map(url: str) -> str:
|
||||
"""
|
||||
应用 HTTP_REPLACE_MAP 环境变量替换 URL
|
||||
@@ -62,6 +86,7 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
|
||||
|
||||
# 检查是否使用 rclone 上传
|
||||
if os.getenv("UPLOAD_METHOD") == "rclone":
|
||||
logger.info(f"Uploading to: {url}")
|
||||
result = _upload_with_rclone(url, file_path)
|
||||
if result:
|
||||
return True
|
||||
@@ -69,6 +94,8 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
|
||||
|
||||
# 应用 HTTP_REPLACE_MAP 替换 URL
|
||||
http_url = _apply_http_replace_map(url)
|
||||
content_type = _get_content_type(file_path)
|
||||
logger.info(f"Uploading to: {http_url} (Content-Type: {content_type})")
|
||||
|
||||
retries = 0
|
||||
while retries < max_retries:
|
||||
@@ -79,7 +106,7 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
|
||||
data=f,
|
||||
stream=True,
|
||||
timeout=timeout,
|
||||
headers={"Content-Type": "application/octet-stream"}
|
||||
headers={"Content-Type": content_type}
|
||||
) as response:
|
||||
response.raise_for_status()
|
||||
logger.info(f"Upload succeeded: {file_path}")
|
||||
@@ -119,6 +146,7 @@ def _upload_with_rclone(url: str, file_path: str) -> bool:
|
||||
for src, dst in replace_list:
|
||||
new_url = new_url.replace(src, dst)
|
||||
new_url = new_url.split("?", 1)[0] # 移除查询参数
|
||||
new_url = unquote(new_url) # 解码 URL 编码的字符(如 %2F -> /)
|
||||
|
||||
if new_url == url:
|
||||
return False
|
||||
|
||||
@@ -12,6 +12,12 @@ from typing import Dict, Optional, TYPE_CHECKING
|
||||
|
||||
from domain.task import Task, TaskType
|
||||
from domain.result import TaskResult, ErrorCode
|
||||
|
||||
# 需要 GPU 加速的任务类型
|
||||
GPU_REQUIRED_TASK_TYPES = {
|
||||
TaskType.RENDER_SEGMENT_VIDEO,
|
||||
TaskType.COMPOSE_TRANSITION,
|
||||
}
|
||||
from domain.config import WorkerConfig
|
||||
from core.handler import TaskHandler
|
||||
from services.lease_service import LeaseService
|
||||
@@ -179,9 +185,10 @@ class TaskExecutor:
|
||||
)
|
||||
lease_service.start()
|
||||
|
||||
# 获取 GPU 设备
|
||||
# 获取 GPU 设备(仅对需要 GPU 的任务类型)
|
||||
device_index = None
|
||||
if self.gpu_scheduler.enabled:
|
||||
needs_gpu = task.task_type in GPU_REQUIRED_TASK_TYPES
|
||||
if needs_gpu and self.gpu_scheduler.enabled:
|
||||
device_index = self.gpu_scheduler.acquire()
|
||||
if device_index is not None:
|
||||
logger.info(f"[task:{task_id}] Assigned to GPU device {device_index}")
|
||||
@@ -227,8 +234,8 @@ class TaskExecutor:
|
||||
if handler:
|
||||
handler.clear_gpu_device()
|
||||
|
||||
# 释放 GPU 设备
|
||||
if self.gpu_scheduler.enabled:
|
||||
# 释放 GPU 设备(仅当实际分配了设备时)
|
||||
if device_index is not None:
|
||||
self.gpu_scheduler.release(device_index)
|
||||
|
||||
# 停止租约续期
|
||||
|
||||
Reference in New Issue
Block a user