You've already forked FrameTour-RenderWorker
- 修改了 maxrate 参数的处理逻辑,当 maxrate 有效时设置 -b:v 为 maxrate - 调整了 bufsize 计算方式,从 2 倍改为 1 倍 maxrate,适合短视频严格码率控制 - 添加了硬件加速 QSV 编码参数支持 - 修复了 NVENC VBV 模型生效条件判断逻辑 - 更新了 CRF/CQ 模式下的峰值码率限制实现
982 lines
35 KiB
Python
982 lines
35 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
任务处理器基类
|
|
|
|
提供所有处理器共用的基础功能。
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
import shutil
|
|
import tempfile
|
|
import subprocess
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from abc import ABC
|
|
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
|
|
|
|
from opentelemetry.trace import SpanKind
|
|
|
|
from core.handler import TaskHandler
|
|
from domain.task import Task
|
|
from domain.result import TaskResult, ErrorCode
|
|
from domain.config import WorkerConfig
|
|
from services import storage
|
|
from services.cache import MaterialCache
|
|
from util.tracing import (
|
|
bind_trace_context,
|
|
capture_otel_context,
|
|
get_current_task_context,
|
|
mark_span_error,
|
|
start_span,
|
|
)
|
|
from constant import (
|
|
HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA,
|
|
VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from services.api_client import APIClientV2
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE, maxrate: Optional[int] = None) -> List[str]:
    """
    Build the FFmpeg video-encoding argument list for a hardware-accel mode.

    Args:
        hw_accel: hardware acceleration type (none, qsv, cuda)
        maxrate: peak bitrate cap in bps used to bound CRF/CQ output,
            e.g. 4000000 for 4Mbps. bufsize is set to 1x maxrate
            (a 1-second window), suited to strict rate control on
            short (<10s) clips.

    Returns:
        List of FFmpeg video-encoding arguments.
    """
    cap_enabled = maxrate is not None and maxrate > 0
    cap_value = f'{maxrate // 1000}k' if cap_enabled else None

    if hw_accel == HW_ACCEL_QSV:
        qsv = VIDEO_ENCODE_PARAMS_QSV
        args = ['-c:v', qsv['codec']]
        args += ['-preset', qsv['preset']]
        args += ['-profile:v', qsv['profile']]
        args += ['-level', qsv['level']]
        args += ['-global_quality', qsv['global_quality']]
        args += ['-look_ahead', qsv['look_ahead']]
        # No B-frames: independent TS segments must not exhibit PTS/DTS
        # rollback at HLS boundaries.
        args += ['-bf', '0']
    elif hw_accel == HW_ACCEL_CUDA:
        cuda = VIDEO_ENCODE_PARAMS_CUDA
        args = ['-c:v', cuda['codec']]
        args += ['-preset', cuda['preset']]
        args += ['-profile:v', cuda['profile']]
        args += ['-level', cuda['level']]
        args += ['-rc', cuda['rc']]
        args += ['-cq', cuda['cq']]
        # With a cap, set -b:v to maxrate so the NVENC VBV model actually
        # engages; without one, keep -b:v 0 (pure CQ quality mode).
        args += ['-b:v', cap_value if cap_enabled else '0']
        # No B-frames: independent TS segments must not exhibit PTS/DTS
        # rollback at HLS boundaries.
        args += ['-bf', '0']
    else:
        # Software encoding (default)
        sw = VIDEO_ENCODE_PARAMS
        args = ['-c:v', sw['codec']]
        args += ['-preset', sw['preset']]
        args += ['-profile:v', sw['profile']]
        args += ['-level', sw['level']]
        args += ['-crf', sw['crf']]
        args += ['-pix_fmt', sw['pix_fmt']]
        # No B-frames: independent TS segments must not exhibit PTS/DTS
        # rollback at HLS boundaries.
        args += ['-bf', '0']

    # CRF/CQ + maxrate cap: keep quality-based control while bounding peaks.
    # bufsize = 1x maxrate (1-second window) keeps the VBV constraint tight
    # for short clips and converges faster than a 2x buffer.
    if cap_enabled:
        args += ['-maxrate', cap_value, '-bufsize', cap_value]

    return args
|
|
|
|
|
|
def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE, device_index: Optional[int] = None) -> List[str]:
    """
    Build hardware-accelerated decode arguments (placed before the input file).

    Args:
        hw_accel: hardware acceleration type (none, qsv, cuda)
        device_index: GPU device index, used for multi-GPU scheduling

    Returns:
        List of FFmpeg hardware-accelerated decode arguments.
    """
    # accel name -> device-selection flag (QSV uses -qsv_device on Windows).
    device_flag_by_accel = {
        HW_ACCEL_CUDA: ('cuda', '-hwaccel_device'),
        HW_ACCEL_QSV: ('qsv', '-qsv_device'),
    }
    selected = device_flag_by_accel.get(hw_accel)
    if selected is None:
        return []

    accel_name, device_flag = selected
    decode_args = ['-hwaccel', accel_name]
    # Pin a specific device in multi-GPU mode.
    if device_index is not None:
        decode_args += [device_flag, str(device_index)]
    decode_args += ['-hwaccel_output_format', accel_name]
    return decode_args
|
|
|
|
|
|
def get_hwaccel_filter_prefix(hw_accel: str = HW_ACCEL_NONE) -> str:
    """
    Return the filter prefix that downloads frames from the GPU to the CPU.

    Most complex filters (lut3d, overlay, crop, ...) do not accept hardware
    surfaces, so the filter chain must start by pulling frames into system
    memory. CUDA/QSV hwdownload only emits nv12, hence the two-step form:

    1. hwdownload,format=nv12 - transfer from GPU to CPU
    2. format=yuv420p - normalize (keeps colors correct when blending with
       RGBA/YUVA overlays)

    Args:
        hw_accel: hardware acceleration type

    Returns:
        hwdownload filter string to prepend to the filter chain.
    """
    # CUDA and QSV need the identical two-step download/convert prefix.
    if hw_accel in (HW_ACCEL_CUDA, HW_ACCEL_QSV):
        return 'hwdownload,format=nv12,format=yuv420p,'
    return ''
|
|
|
|
|
|
# v2 unified video-encoding arguments (legacy compatibility; software encoding)
VIDEO_ENCODE_ARGS = get_video_encode_args(HW_ACCEL_NONE)

# v2 unified audio-encoding arguments
AUDIO_ENCODE_ARGS = [
    '-c:a', 'aac',
    '-b:a', '128k',
    '-ar', '48000',
    '-ac', '2',
]

# Default FFmpeg verbosity; injected by run_ffmpeg when the command lacks -loglevel
FFMPEG_LOGLEVEL = 'error'
|
|
|
|
|
|
def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]:
    """
    Build cross-platform keyword arguments for subprocess calls.

    On Windows builds packaged with PyInstaller --noconsole, a STARTUPINFO
    carrying STARTF_USESHOWWINDOW is needed to keep console windows from
    popping up.

    Args:
        include_stdout: whether to capture stdout via a pipe

    Returns:
        Keyword-argument dict for subprocess.run
    """
    run_kwargs: Dict[str, Any] = {}

    # Windows-only: subprocess.STARTUPINFO does not exist on POSIX.
    if hasattr(subprocess, 'STARTUPINFO'):
        startup_info = subprocess.STARTUPINFO()
        startup_info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        run_kwargs['startupinfo'] = startup_info
        run_kwargs['env'] = os.environ

    # Redirect stdin to avoid "handle is invalid" errors in windowed mode.
    run_kwargs['stdin'] = subprocess.PIPE

    if include_stdout:
        run_kwargs['stdout'] = subprocess.PIPE

    return run_kwargs
|
|
|
|
|
|
def probe_video_info(video_file: str) -> Tuple[int, int, float]:
    """
    Probe a video file for its width, height and duration.

    Args:
        video_file: path to the video file

    Returns:
        (width, height, duration) tuple; (0, 0, 0) on any failure
    """
    probe_cmd = [
        'ffprobe', '-v', 'error',
        '-select_streams', 'v:0',
        '-show_entries', 'stream=width,height:format=duration',
        '-of', 'csv=s=x:p=0',
        video_file
    ]
    try:
        proc = subprocess.run(
            probe_cmd,
            capture_output=True,
            timeout=30,
            **subprocess_args(False)
        )

        if proc.returncode != 0:
            logger.warning(f"ffprobe failed for {video_file}")
            return 0, 0, 0

        text = proc.stdout.decode('utf-8').strip()
        if text:
            parts = text.split('\n')
            if len(parts) >= 2:
                # First line is "WIDTHxHEIGHT", second line the container duration.
                width_str, height_str = parts[0].strip().split('x')
                return int(width_str), int(height_str), float(parts[1].strip())

        return 0, 0, 0

    except Exception as e:
        logger.warning(f"probe_video_info error: {e}")
        return 0, 0, 0
|
|
|
|
|
|
def probe_duration_json(file_path: str) -> Optional[float]:
    """
    Probe a media file's duration via ffprobe's JSON output.

    Args:
        file_path: path to the media file

    Returns:
        Duration in seconds, or None on failure
    """
    probe_cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'json',
        file_path
    ]
    try:
        proc = subprocess.run(
            probe_cmd,
            capture_output=True,
            timeout=30,
            **subprocess_args(False)
        )

        if proc.returncode != 0:
            return None

        payload = json.loads(proc.stdout.decode('utf-8'))
        raw_duration = payload.get('format', {}).get('duration')
        # ffprobe may omit the field; an empty/missing value maps to None.
        return float(raw_duration) if raw_duration else None

    except Exception as e:
        logger.warning(f"probe_duration_json error: {e}")
        return None
|
|
|
|
|
|
class BaseHandler(TaskHandler, ABC):
    """
    Base class for task handlers.

    Provides functionality shared by all handlers, including:
    - temporary work-directory management
    - file download/upload
    - FFmpeg command execution
    - GPU device management (multi-GPU scheduling)
    - logging
    """

    # Thread-local storage: holds the GPU device index for the current thread
    _thread_local = threading.local()
    # Default per-task download/upload concurrency, and the hard upper bound
    # enforced by _resolve_task_transfer_concurrency
    DEFAULT_TASK_DOWNLOAD_CONCURRENCY = 4
    DEFAULT_TASK_UPLOAD_CONCURRENCY = 2
    MAX_TASK_TRANSFER_CONCURRENCY = 16

    def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
        """
        Initialize the handler.

        Args:
            config: worker configuration
            api_client: API client
        """
        self.config = config
        self.api_client = api_client
        self.material_cache = MaterialCache(
            cache_dir=config.cache_dir,
            enabled=config.cache_enabled,
            max_size_gb=config.cache_max_size_gb
        )
        # Concurrency limits may be overridden via environment variables.
        self.task_download_concurrency = self._resolve_task_transfer_concurrency(
            "TASK_DOWNLOAD_CONCURRENCY",
            self.DEFAULT_TASK_DOWNLOAD_CONCURRENCY
        )
        self.task_upload_concurrency = self._resolve_task_transfer_concurrency(
            "TASK_UPLOAD_CONCURRENCY",
            self.DEFAULT_TASK_UPLOAD_CONCURRENCY
        )
|
|
|
|
def _resolve_task_transfer_concurrency(self, env_name: str, default_value: int) -> int:
|
|
"""读取并规范化任务内传输并发数配置。"""
|
|
raw_value = os.getenv(env_name)
|
|
if raw_value is None or not raw_value.strip():
|
|
return default_value
|
|
|
|
try:
|
|
parsed_value = int(raw_value.strip())
|
|
except ValueError:
|
|
logger.warning(
|
|
f"Invalid {env_name} value '{raw_value}', using default {default_value}"
|
|
)
|
|
return default_value
|
|
|
|
if parsed_value < 1:
|
|
logger.warning(f"{env_name} must be >= 1, forcing to 1")
|
|
return 1
|
|
|
|
if parsed_value > self.MAX_TASK_TRANSFER_CONCURRENCY:
|
|
logger.warning(
|
|
f"{env_name}={parsed_value} exceeds limit {self.MAX_TASK_TRANSFER_CONCURRENCY}, "
|
|
f"using {self.MAX_TASK_TRANSFER_CONCURRENCY}"
|
|
)
|
|
return self.MAX_TASK_TRANSFER_CONCURRENCY
|
|
|
|
return parsed_value
|
|
|
|
    def download_files_parallel(
        self,
        download_jobs: List[Dict[str, Any]],
        timeout: Optional[int] = None
    ) -> Dict[str, Dict[str, Any]]:
        """
        Download multiple files in parallel within a single task.

        Args:
            download_jobs: list of download jobs. Fields per item:
                - key: unique identifier
                - url: download address
                - dest: destination file path
                - required: whether the file is critical (optional, default True)
                - use_cache: whether to use the cache (optional, default True)
            timeout: per-file download timeout in seconds

        Returns:
            key -> result dict:
                - success: whether the download succeeded
                - url: original URL
                - dest: destination file path
                - required: whether the file is critical

        Raises:
            ValueError: when a job is missing key/url/dest or a key repeats.
        """
        if not download_jobs:
            return {}

        # Validate and normalize every job up front so failures surface
        # before any download starts.
        normalized_jobs: List[Dict[str, Any]] = []
        seen_keys = set()
        for download_job in download_jobs:
            job_key = str(download_job.get("key", "")).strip()
            job_url = str(download_job.get("url", "")).strip()
            job_dest = str(download_job.get("dest", "")).strip()
            if not job_key or not job_url or not job_dest:
                raise ValueError("Each download job must include non-empty key/url/dest")
            if job_key in seen_keys:
                raise ValueError(f"Duplicate download job key: {job_key}")
            seen_keys.add(job_key)
            normalized_jobs.append({
                "key": job_key,
                "url": job_url,
                "dest": job_dest,
                "required": bool(download_job.get("required", True)),
                "use_cache": bool(download_job.get("use_cache", True)),
            })

        if timeout is None:
            timeout = self.config.download_timeout

        # Capture the tracing/task context here so worker threads can rebind it.
        parent_otel_context = capture_otel_context()
        task_context = get_current_task_context()
        task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
        results: Dict[str, Dict[str, Any]] = {}

        def _run_download_job(download_job: Dict[str, Any]) -> bool:
            # Rebind the parent trace context inside the worker thread.
            with bind_trace_context(parent_otel_context, task_context):
                return self.download_file(
                    download_job["url"],
                    download_job["dest"],
                    timeout=timeout,
                    use_cache=download_job["use_cache"],
                )

        max_workers = min(self.task_download_concurrency, len(normalized_jobs))
        if max_workers <= 1:
            # Sequential fallback: no thread pool overhead for a single job.
            for download_job in normalized_jobs:
                is_success = _run_download_job(download_job)
                results[download_job["key"]] = {
                    "success": is_success,
                    "url": download_job["url"],
                    "dest": download_job["dest"],
                    "required": download_job["required"],
                }
        else:
            with ThreadPoolExecutor(
                max_workers=max_workers,
                thread_name_prefix="TaskDownload",
            ) as executor:
                future_to_job = {
                    executor.submit(_run_download_job, download_job): download_job
                    for download_job in normalized_jobs
                }
                for completed_future in as_completed(future_to_job):
                    download_job = future_to_job[completed_future]
                    is_success = False
                    try:
                        is_success = bool(completed_future.result())
                    except Exception as exc:
                        # A raised exception counts as failure for that key only.
                        logger.error(
                            f"{task_prefix}Parallel download raised exception for "
                            f"key={download_job['key']}: {exc}"
                        )
                    results[download_job["key"]] = {
                        "success": is_success,
                        "url": download_job["url"],
                        "dest": download_job["dest"],
                        "required": download_job["required"],
                    }

        success_count = sum(1 for item in results.values() if item["success"])
        logger.debug(
            f"{task_prefix}Parallel download completed: {success_count}/{len(normalized_jobs)}"
        )
        return results
|
|
|
|
    def upload_files_parallel(
        self,
        upload_jobs: List[Dict[str, Any]]
    ) -> Dict[str, Dict[str, Any]]:
        """
        Upload multiple files in parallel within a single task.

        Args:
            upload_jobs: list of upload jobs. Fields per item:
                - key: unique identifier
                - task_id: task ID
                - file_type: file type (video/audio/ts/mp4)
                - file_path: local file path
                - file_name: file name (optional)
                - required: whether the file is critical (optional, default True)

        Returns:
            key -> result dict:
                - success: whether the upload succeeded
                - url: access URL after upload (None on failure)
                - file_path: local file path
                - required: whether the file is critical

        Raises:
            ValueError: when a job is missing key/task_id/file_type/file_path
                or a key repeats.
        """
        if not upload_jobs:
            return {}

        # Validate and normalize every job up front so failures surface
        # before any upload starts.
        normalized_jobs: List[Dict[str, Any]] = []
        seen_keys = set()
        for upload_job in upload_jobs:
            job_key = str(upload_job.get("key", "")).strip()
            task_id = str(upload_job.get("task_id", "")).strip()
            file_type = str(upload_job.get("file_type", "")).strip()
            file_path = str(upload_job.get("file_path", "")).strip()
            if not job_key or not task_id or not file_type or not file_path:
                raise ValueError(
                    "Each upload job must include non-empty key/task_id/file_type/file_path"
                )
            if job_key in seen_keys:
                raise ValueError(f"Duplicate upload job key: {job_key}")
            seen_keys.add(job_key)
            normalized_jobs.append({
                "key": job_key,
                "task_id": task_id,
                "file_type": file_type,
                "file_path": file_path,
                "file_name": upload_job.get("file_name"),
                "required": bool(upload_job.get("required", True)),
            })

        # Capture the tracing/task context here so worker threads can rebind it.
        parent_otel_context = capture_otel_context()
        task_context = get_current_task_context()
        task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
        results: Dict[str, Dict[str, Any]] = {}

        def _run_upload_job(upload_job: Dict[str, Any]) -> Optional[str]:
            # Rebind the parent trace context inside the worker thread.
            with bind_trace_context(parent_otel_context, task_context):
                return self.upload_file(
                    upload_job["task_id"],
                    upload_job["file_type"],
                    upload_job["file_path"],
                    upload_job.get("file_name")
                )

        max_workers = min(self.task_upload_concurrency, len(normalized_jobs))
        if max_workers <= 1:
            # Sequential fallback: no thread pool overhead for a single job.
            for upload_job in normalized_jobs:
                result_url = _run_upload_job(upload_job)
                results[upload_job["key"]] = {
                    "success": bool(result_url),
                    "url": result_url,
                    "file_path": upload_job["file_path"],
                    "required": upload_job["required"],
                }
        else:
            with ThreadPoolExecutor(
                max_workers=max_workers,
                thread_name_prefix="TaskUpload",
            ) as executor:
                future_to_job = {
                    executor.submit(_run_upload_job, upload_job): upload_job
                    for upload_job in normalized_jobs
                }
                for completed_future in as_completed(future_to_job):
                    upload_job = future_to_job[completed_future]
                    result_url = None
                    try:
                        result_url = completed_future.result()
                    except Exception as exc:
                        # A raised exception counts as failure for that key only.
                        logger.error(
                            f"{task_prefix}Parallel upload raised exception for "
                            f"key={upload_job['key']}: {exc}"
                        )
                    results[upload_job["key"]] = {
                        "success": bool(result_url),
                        "url": result_url,
                        "file_path": upload_job["file_path"],
                        "required": upload_job["required"],
                    }

        success_count = sum(1 for item in results.values() if item["success"])
        logger.debug(
            f"{task_prefix}Parallel upload completed: {success_count}/{len(normalized_jobs)}"
        )
        return results
|
|
|
|
    # ========== GPU device management ==========
|
|
|
|
def set_gpu_device(self, device_index: int) -> None:
|
|
"""
|
|
设置当前线程的 GPU 设备索引
|
|
|
|
由 TaskExecutor 在任务执行前调用。
|
|
|
|
Args:
|
|
device_index: GPU 设备索引
|
|
"""
|
|
self._thread_local.gpu_device = device_index
|
|
|
|
def get_gpu_device(self) -> Optional[int]:
|
|
"""
|
|
获取当前线程的 GPU 设备索引
|
|
|
|
Returns:
|
|
GPU 设备索引,未设置则返回 None
|
|
"""
|
|
return getattr(self._thread_local, 'gpu_device', None)
|
|
|
|
def clear_gpu_device(self) -> None:
|
|
"""
|
|
清除当前线程的 GPU 设备索引
|
|
|
|
由 TaskExecutor 在任务执行后调用。
|
|
"""
|
|
if hasattr(self._thread_local, 'gpu_device'):
|
|
del self._thread_local.gpu_device
|
|
|
|
    # ========== FFmpeg argument generation ==========
|
|
|
|
def get_video_encode_args(self, maxrate: Optional[int] = None) -> List[str]:
|
|
"""
|
|
获取当前配置的视频编码参数
|
|
|
|
Args:
|
|
maxrate: 最大码率(bps),用于限制峰值码率
|
|
|
|
Returns:
|
|
FFmpeg 视频编码参数列表
|
|
"""
|
|
return get_video_encode_args(self.config.hw_accel, maxrate=maxrate)
|
|
|
|
def get_hwaccel_decode_args(self) -> List[str]:
|
|
"""
|
|
获取硬件加速解码参数(支持设备指定)
|
|
|
|
Returns:
|
|
FFmpeg 硬件加速解码参数列表
|
|
"""
|
|
device_index = self.get_gpu_device()
|
|
return get_hwaccel_decode_args(self.config.hw_accel, device_index)
|
|
|
|
def get_hwaccel_filter_prefix(self) -> str:
|
|
"""
|
|
获取硬件加速滤镜前缀
|
|
|
|
Returns:
|
|
需要添加到滤镜链开头的 hwdownload 滤镜字符串
|
|
"""
|
|
return get_hwaccel_filter_prefix(self.config.hw_accel)
|
|
|
|
def before_handle(self, task: Task) -> None:
|
|
"""处理前钩子"""
|
|
logger.debug(f"[task:{task.task_id}] Before handle: {task.task_type.value}")
|
|
|
|
def after_handle(self, task: Task, result: TaskResult) -> None:
|
|
"""处理后钩子"""
|
|
status = "success" if result.success else "failed"
|
|
logger.debug(f"[task:{task.task_id}] After handle: {status}")
|
|
|
|
def create_work_dir(self, task_id: str = None) -> str:
|
|
"""
|
|
创建临时工作目录
|
|
|
|
Args:
|
|
task_id: 任务 ID(用于目录命名)
|
|
|
|
Returns:
|
|
工作目录路径
|
|
"""
|
|
# 确保临时根目录存在
|
|
os.makedirs(self.config.temp_dir, exist_ok=True)
|
|
|
|
# 创建唯一的工作目录
|
|
prefix = f"task_{task_id}_" if task_id else "task_"
|
|
work_dir = tempfile.mkdtemp(dir=self.config.temp_dir, prefix=prefix)
|
|
|
|
logger.debug(f"Created work directory: {work_dir}")
|
|
return work_dir
|
|
|
|
def cleanup_work_dir(self, work_dir: str) -> None:
|
|
"""
|
|
清理临时工作目录
|
|
|
|
Args:
|
|
work_dir: 工作目录路径
|
|
"""
|
|
if not work_dir or not os.path.exists(work_dir):
|
|
return
|
|
|
|
try:
|
|
shutil.rmtree(work_dir)
|
|
logger.debug(f"Cleaned up work directory: {work_dir}")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to cleanup work directory {work_dir}: {e}")
|
|
|
|
    def download_file(self, url: str, dest: str, timeout: Optional[int] = None, use_cache: bool = True) -> bool:
        """
        Download a file (with optional cache support).

        Args:
            url: file URL
            dest: destination path
            timeout: timeout in seconds (defaults to config.download_timeout)
            use_cache: whether to use the material cache (default True)

        Returns:
            Whether the download succeeded
        """
        if timeout is None:
            timeout = self.config.download_timeout

        task_context = get_current_task_context()
        task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
        logger.debug(f"{task_prefix}Downloading from: {url} -> {dest}")

        with start_span(
            "render.task.file.download",
            kind=SpanKind.CLIENT,
            attributes={
                "render.file.source_url": url,
                "render.file.destination": dest,
                "render.file.use_cache": use_cache,
            },
        ) as span:
            try:
                # Defaults used when the direct (non-cache) path is taken.
                lock_wait_ms = 0
                lock_acquired = False
                cache_path_used = "unknown"
                if use_cache:
                    result, cache_metrics = self.material_cache.get_or_download_with_metrics(
                        url,
                        dest,
                        timeout=timeout
                    )
                    lock_wait_ms = int(cache_metrics.get("lock_wait_ms", 0))
                    lock_acquired = bool(cache_metrics.get("lock_acquired", False))
                    cache_path_used = str(cache_metrics.get("cache_path_used", "unknown"))
                else:
                    result = storage.download_file(url, dest, timeout=timeout)
                    cache_path_used = "direct"

                # Span may be None when tracing is disabled.
                if span is not None:
                    span.set_attribute("render.file.lock_wait_ms", lock_wait_ms)
                    span.set_attribute("render.file.lock_acquired", lock_acquired)
                    span.set_attribute("render.file.cache_path_used", cache_path_used)

                if result:
                    file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
                    logger.debug(f"{task_prefix}Downloaded: {url} -> {dest} ({file_size} bytes)")
                    if span is not None:
                        span.set_attribute("render.file.size_bytes", file_size)
                return result
            except Exception as e:
                mark_span_error(span, str(e), ErrorCode.E_INPUT_UNAVAILABLE.value)
                logger.error(f"{task_prefix}Download failed: {e}")
                logger.debug(f"{task_prefix}Download source address: {url}")
                return False
|
|
|
|
    def upload_file(
        self,
        task_id: str,
        file_type: str,
        file_path: str,
        file_name: Optional[str] = None
    ) -> Optional[str]:
        """
        Upload a file and return its access URL.

        Args:
            task_id: task ID
            file_type: file type (video/audio/ts/mp4)
            file_path: local file path
            file_name: file name (optional)

        Returns:
            Access URL, or None on failure
        """
        # Capture local file state before the span so attributes are accurate
        # even if the file changes during the upload.
        local_file_exists = os.path.exists(file_path)
        local_file_size = os.path.getsize(file_path) if local_file_exists else 0
        with start_span(
            "render.task.file.upload",
            kind=SpanKind.CLIENT,
            attributes={
                "render.file.type": file_type,
                "render.file.path": file_path,
                "render.file.timeout_seconds": self.config.upload_timeout,
                "render.file.local_exists": local_file_exists,
                "render.file.local_size_bytes": local_file_size,
            },
        ) as span:
            # Ask the API for a pre-signed upload target first.
            upload_info = self.api_client.get_upload_url(task_id, file_type, file_name)
            if not upload_info:
                mark_span_error(span, "get upload url failed", ErrorCode.E_UPLOAD_FAILED.value)
                logger.error(f"[task:{task_id}] Failed to get upload URL")
                return None

            upload_url = upload_info.get('uploadUrl')
            access_url = upload_info.get('accessUrl')

            if not upload_url:
                mark_span_error(span, "invalid upload url response", ErrorCode.E_UPLOAD_FAILED.value)
                logger.error(f"[task:{task_id}] Invalid upload URL response")
                return None

            logger.debug(
                f"[task:{task_id}] Upload target address: uploadUrl={upload_url}, accessUrl={access_url}"
            )
            # Span may be None when tracing is disabled.
            if span is not None:
                span.set_attribute("render.file.upload_url", upload_url)
                if access_url:
                    span.set_attribute("render.file.access_url", access_url)

            try:
                result, upload_metrics = storage.upload_file_with_metrics(
                    upload_url,
                    file_path,
                    timeout=self.config.upload_timeout,
                )
                # Normalize metrics defensively; the storage layer may omit keys.
                upload_method = str(upload_metrics.get("upload_method", "unknown"))
                http_attempts = int(upload_metrics.get("http_attempts", 0))
                http_retry_count = int(upload_metrics.get("http_retry_count", 0))
                http_status_code = int(upload_metrics.get("http_status_code", 0))
                http_replace_applied = bool(upload_metrics.get("http_replace_applied", False))
                content_type = str(upload_metrics.get("content_type", ""))
                error_type = str(upload_metrics.get("error_type", ""))
                rclone_attempted = bool(upload_metrics.get("rclone_attempted", False))
                rclone_succeeded = bool(upload_metrics.get("rclone_succeeded", False))
                rclone_fallback_http = bool(upload_metrics.get("rclone_fallback_http", False))

                if span is not None:
                    span.set_attribute("render.file.upload_success", bool(result))
                    span.set_attribute("render.file.upload_method", upload_method)
                    span.set_attribute("render.file.http_attempts", http_attempts)
                    span.set_attribute("render.file.http_retry_count", http_retry_count)
                    span.set_attribute("render.file.http_replace_applied", http_replace_applied)
                    span.set_attribute("render.file.rclone_attempted", rclone_attempted)
                    span.set_attribute("render.file.rclone_succeeded", rclone_succeeded)
                    span.set_attribute("render.file.rclone_fallback_http", rclone_fallback_http)
                    if content_type:
                        span.set_attribute("render.file.content_type", content_type)
                    if http_status_code > 0:
                        span.set_attribute("render.file.http_status_code", http_status_code)
                    if error_type:
                        span.set_attribute("render.file.error_type", error_type)

                if result:
                    file_size = local_file_size if local_file_size > 0 else os.path.getsize(file_path)
                    logger.info(
                        f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)"
                    )
                    logger.debug(f"[task:{task_id}] Uploaded access address: {access_url or upload_url}")
                    if span is not None:
                        span.set_attribute("render.file.size_bytes", file_size)

                    # Best-effort: seed the material cache with the uploaded
                    # file so later downloads of access_url hit locally.
                    cache_write_back = "skipped"
                    if access_url:
                        cache_added = self.material_cache.add_to_cache(access_url, file_path)
                        cache_write_back = "success" if cache_added else "failed"
                        if not cache_added:
                            logger.warning(f"[task:{task_id}] Upload cache write back failed: {file_path}")
                    if span is not None:
                        span.set_attribute("render.file.cache_write_back", cache_write_back)

                    return access_url

                mark_span_error(
                    span,
                    f"upload failed(method={upload_method}, status={http_status_code}, retries={http_retry_count}, error={error_type})",
                    ErrorCode.E_UPLOAD_FAILED.value
                )
                logger.error(
                    f"[task:{task_id}] Upload failed: {file_path}, method={upload_method}, "
                    f"http_status={http_status_code}, retries={http_retry_count}, error_type={error_type}"
                )
                return None
            except Exception as e:
                mark_span_error(span, str(e), ErrorCode.E_UPLOAD_FAILED.value)
                logger.error(f"[task:{task_id}] Upload error: {e}")
                return None
|
|
|
|
    def run_ffmpeg(
        self,
        cmd: List[str],
        task_id: str,
        timeout: Optional[int] = None
    ) -> bool:
        """
        Execute an FFmpeg command.

        Args:
            cmd: FFmpeg command argument list
            task_id: task ID (for logging)
            timeout: timeout in seconds (defaults to config.ffmpeg_timeout)

        Returns:
            Whether the command succeeded
        """
        if timeout is None:
            timeout = self.config.ffmpeg_timeout

        # Work on a copy; inject the default loglevel right after the binary
        # name unless the caller already set one.
        cmd_to_run = list(cmd)
        if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run:
            cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL]

        # Log the full command (not truncated)
        cmd_str = ' '.join(cmd_to_run)
        logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")

        with start_span(
            "render.task.ffmpeg.run",
            attributes={
                "render.ffmpeg.timeout_seconds": timeout,
                "render.ffmpeg.command": cmd_str,
            },
        ) as span:
            try:
                # Discard stdout; keep stderr to report failures.
                run_args = subprocess_args(False)
                run_args['stdout'] = subprocess.DEVNULL
                run_args['stderr'] = subprocess.PIPE
                result = subprocess.run(
                    cmd_to_run,
                    timeout=timeout,
                    **run_args
                )

                if span is not None:
                    span.set_attribute("render.ffmpeg.return_code", result.returncode)

                if result.returncode != 0:
                    # Cap stderr at 1000 chars to keep logs/spans bounded.
                    stderr = (result.stderr or b'').decode('utf-8', errors='replace')[:1000]
                    logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
                    mark_span_error(span, stderr or "ffmpeg failed", ErrorCode.E_FFMPEG_FAILED.value)
                    return False

                return True

            except subprocess.TimeoutExpired:
                logger.error(f"[task:{task_id}] FFmpeg timeout after {timeout}s")
                mark_span_error(span, f"timeout after {timeout}s", ErrorCode.E_TIMEOUT.value)
                return False
            except Exception as e:
                logger.error(f"[task:{task_id}] FFmpeg error: {e}")
                mark_span_error(span, str(e), ErrorCode.E_FFMPEG_FAILED.value)
                return False
|
|
|
|
def probe_duration(self, file_path: str) -> Optional[float]:
|
|
"""
|
|
探测媒体文件时长
|
|
|
|
Args:
|
|
file_path: 文件路径
|
|
|
|
Returns:
|
|
时长(秒),失败返回 None
|
|
"""
|
|
# 首先尝试 JSON 输出方式
|
|
duration = probe_duration_json(file_path)
|
|
if duration is not None:
|
|
return duration
|
|
|
|
# 回退到旧方式
|
|
try:
|
|
_, _, duration = probe_video_info(file_path)
|
|
return float(duration) if duration else None
|
|
except Exception as e:
|
|
logger.warning(f"Failed to probe duration: {file_path} -> {e}")
|
|
return None
|
|
|
|
def get_file_size(self, file_path: str) -> int:
|
|
"""
|
|
获取文件大小
|
|
|
|
Args:
|
|
file_path: 文件路径
|
|
|
|
Returns:
|
|
文件大小(字节)
|
|
"""
|
|
try:
|
|
return os.path.getsize(file_path)
|
|
except Exception:
|
|
return 0
|
|
|
|
def ensure_file_exists(self, file_path: str, min_size: int = 0) -> bool:
|
|
"""
|
|
确保文件存在且大小满足要求
|
|
|
|
Args:
|
|
file_path: 文件路径
|
|
min_size: 最小大小(字节)
|
|
|
|
Returns:
|
|
是否满足要求
|
|
"""
|
|
if not os.path.exists(file_path):
|
|
return False
|
|
return os.path.getsize(file_path) >= min_size
|