You've already forked FrameTour-RenderWorker
- 新增素材缓存配置选项包括启用状态、缓存目录和最大缓存大小 - 实现 MaterialCache 类提供缓存存储和检索功能 - 修改 download_file 方法支持缓存下载模式 - 添加缓存清理机制使用 LRU 策略管理磁盘空间 - 配置默认值优化本地开发体验 - 实现缓存统计和监控功能
159 lines
5.0 KiB
Python
159 lines
5.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Worker 配置模型
|
|
|
|
定义 Worker 运行时的配置参数。
|
|
"""
|
|
|
|
import os
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Optional
|
|
|
|
from constant import HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA, HW_ACCEL_TYPES
|
|
|
|
|
|
# 默认支持的任务类型
|
|
DEFAULT_CAPABILITIES = [
|
|
"RENDER_SEGMENT_VIDEO",
|
|
"PREPARE_JOB_AUDIO",
|
|
"PACKAGE_SEGMENT_TS",
|
|
"FINALIZE_MP4"
|
|
]
|
|
|
|
|
|
@dataclass
|
|
class WorkerConfig:
|
|
"""
|
|
Worker 配置
|
|
|
|
包含 Worker 运行所需的所有配置参数。
|
|
"""
|
|
# API 配置
|
|
api_endpoint: str
|
|
access_key: str
|
|
worker_id: str
|
|
|
|
# 并发控制
|
|
max_concurrency: int = 4
|
|
|
|
# 心跳配置
|
|
heartbeat_interval: int = 5 # 秒
|
|
|
|
# 租约配置
|
|
lease_extension_threshold: int = 60 # 秒,提前多久续期
|
|
lease_extension_duration: int = 300 # 秒,每次续期时长
|
|
|
|
# 目录配置
|
|
temp_dir: str = "/tmp/render_worker"
|
|
|
|
# 能力配置
|
|
capabilities: List[str] = field(default_factory=lambda: DEFAULT_CAPABILITIES.copy())
|
|
|
|
# FFmpeg 配置
|
|
ffmpeg_timeout: int = 3600 # 秒,FFmpeg 执行超时
|
|
|
|
# 下载/上传配置
|
|
download_timeout: int = 300 # 秒,下载超时
|
|
upload_timeout: int = 600 # 秒,上传超时
|
|
|
|
# 硬件加速配置
|
|
hw_accel: str = HW_ACCEL_NONE # 硬件加速类型: none, qsv, cuda
|
|
|
|
# 素材缓存配置
|
|
cache_enabled: bool = True # 是否启用素材缓存
|
|
cache_dir: str = "" # 缓存目录,默认为 temp_dir/cache
|
|
cache_max_size_gb: float = 0 # 最大缓存大小(GB),0 表示不限制
|
|
|
|
@classmethod
|
|
def from_env(cls) -> 'WorkerConfig':
|
|
"""从环境变量创建配置"""
|
|
|
|
# API 端点,优先使用 V2 版本
|
|
api_endpoint = os.getenv('API_ENDPOINT_V2') or os.getenv('API_ENDPOINT', '')
|
|
if not api_endpoint:
|
|
raise ValueError("API_ENDPOINT_V2 or API_ENDPOINT environment variable is required")
|
|
|
|
# Access Key
|
|
access_key = os.getenv('ACCESS_KEY', '')
|
|
if not access_key:
|
|
raise ValueError("ACCESS_KEY environment variable is required")
|
|
|
|
# Worker ID
|
|
worker_id = os.getenv('WORKER_ID', '100001')
|
|
|
|
# 并发数
|
|
max_concurrency = int(os.getenv('MAX_CONCURRENCY', '4'))
|
|
|
|
# 心跳间隔
|
|
heartbeat_interval = int(os.getenv('HEARTBEAT_INTERVAL', '5'))
|
|
|
|
# 租约配置
|
|
lease_extension_threshold = int(os.getenv('LEASE_EXTENSION_THRESHOLD', '60'))
|
|
lease_extension_duration = int(os.getenv('LEASE_EXTENSION_DURATION', '300'))
|
|
|
|
# 临时目录
|
|
temp_dir = os.getenv('TEMP_DIR', os.getenv('TEMP', '/tmp/render_worker'))
|
|
|
|
# 能力列表
|
|
capabilities_str = os.getenv('CAPABILITIES', '')
|
|
if capabilities_str:
|
|
capabilities = [c.strip() for c in capabilities_str.split(',') if c.strip()]
|
|
else:
|
|
capabilities = DEFAULT_CAPABILITIES.copy()
|
|
|
|
# FFmpeg 超时
|
|
ffmpeg_timeout = int(os.getenv('FFMPEG_TIMEOUT', '3600'))
|
|
|
|
# 下载/上传超时
|
|
download_timeout = int(os.getenv('DOWNLOAD_TIMEOUT', '300'))
|
|
upload_timeout = int(os.getenv('UPLOAD_TIMEOUT', '600'))
|
|
|
|
# 硬件加速配置
|
|
hw_accel = os.getenv('HW_ACCEL', HW_ACCEL_NONE).lower()
|
|
if hw_accel not in HW_ACCEL_TYPES:
|
|
hw_accel = HW_ACCEL_NONE
|
|
|
|
# 素材缓存配置
|
|
cache_enabled = os.getenv('CACHE_ENABLED', 'true').lower() in ('true', '1', 'yes')
|
|
cache_dir = os.getenv('CACHE_DIR', '') # 空字符串表示使用默认路径
|
|
cache_max_size_gb = float(os.getenv('CACHE_MAX_SIZE_GB', '0'))
|
|
|
|
return cls(
|
|
api_endpoint=api_endpoint,
|
|
access_key=access_key,
|
|
worker_id=worker_id,
|
|
max_concurrency=max_concurrency,
|
|
heartbeat_interval=heartbeat_interval,
|
|
lease_extension_threshold=lease_extension_threshold,
|
|
lease_extension_duration=lease_extension_duration,
|
|
temp_dir=temp_dir,
|
|
capabilities=capabilities,
|
|
ffmpeg_timeout=ffmpeg_timeout,
|
|
download_timeout=download_timeout,
|
|
upload_timeout=upload_timeout,
|
|
hw_accel=hw_accel,
|
|
cache_enabled=cache_enabled,
|
|
cache_dir=cache_dir if cache_dir else os.path.join(temp_dir, 'cache'),
|
|
cache_max_size_gb=cache_max_size_gb
|
|
)
|
|
|
|
def get_work_dir_path(self, task_id: str) -> str:
|
|
"""获取任务工作目录路径"""
|
|
return os.path.join(self.temp_dir, f"task_{task_id}")
|
|
|
|
def ensure_temp_dir(self) -> None:
|
|
"""确保临时目录存在"""
|
|
os.makedirs(self.temp_dir, exist_ok=True)
|
|
|
|
def is_hw_accel_enabled(self) -> bool:
|
|
"""是否启用了硬件加速"""
|
|
return self.hw_accel != HW_ACCEL_NONE
|
|
|
|
def is_qsv(self) -> bool:
|
|
"""是否使用 QSV 硬件加速"""
|
|
return self.hw_accel == HW_ACCEL_QSV
|
|
|
|
def is_cuda(self) -> bool:
|
|
"""是否使用 CUDA 硬件加速"""
|
|
return self.hw_accel == HW_ACCEL_CUDA
|