Files
FrameTour-RenderWorker/handlers/base.py
Jerry Yan 16ea45ad1c perf(cache): 优化缓存下载逻辑并添加性能指标追踪
- 实现了带等待时间统计的缓存锁获取功能
- 新增 get_or_download_with_metrics 方法返回详细的性能指标
- 在 tracing span 中记录锁等待时间、锁获取状态和缓存路径使用情况
- 优化缓存命中路径避免不必要的锁获取操作
- 添加了缓存文件就绪检查和复制功能的独立方法
- 增加了针对缓存锁超时但仍可使用就绪缓存的处理逻辑
- 新增了多个单元测试验证缓存锁定和指标报告功能
2026-02-07 03:45:52 +08:00

905 lines
30 KiB
Python

# -*- coding: utf-8 -*-
"""
任务处理器基类
提供所有处理器共用的基础功能。
"""
import os
import json
import logging
import shutil
import tempfile
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from abc import ABC
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
from opentelemetry.trace import SpanKind
from core.handler import TaskHandler
from domain.task import Task
from domain.result import TaskResult, ErrorCode
from domain.config import WorkerConfig
from services import storage
from services.cache import MaterialCache
from util.tracing import (
bind_trace_context,
capture_otel_context,
get_current_task_context,
mark_span_error,
start_span,
)
from constant import (
HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA,
VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA
)
if TYPE_CHECKING:
from services.api_client import APIClientV2
logger = logging.getLogger(__name__)
def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
    """
    Build the FFmpeg video-encode argument list for a hardware-acceleration mode.

    Args:
        hw_accel: Hardware acceleration type (none, qsv, cuda).

    Returns:
        FFmpeg video-encode argument list.
    """
    # Select the parameter table and the encoder-specific tail arguments.
    if hw_accel == HW_ACCEL_QSV:
        p = VIDEO_ENCODE_PARAMS_QSV
        tail = [
            '-global_quality', p['global_quality'],
            '-look_ahead', p['look_ahead'],
        ]
    elif hw_accel == HW_ACCEL_CUDA:
        p = VIDEO_ENCODE_PARAMS_CUDA
        tail = [
            '-rc', p['rc'],
            '-cq', p['cq'],
            '-b:v', '0',  # with vbr rate control, let cq drive quality
        ]
    else:
        # Software encoding (default).
        p = VIDEO_ENCODE_PARAMS
        tail = [
            '-crf', p['crf'],
            '-pix_fmt', p['pix_fmt'],
        ]
    # Codec/preset/profile/level are common to all three modes.
    head = [
        '-c:v', p['codec'],
        '-preset', p['preset'],
        '-profile:v', p['profile'],
        '-level', p['level'],
    ]
    return head + tail
def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE, device_index: Optional[int] = None) -> List[str]:
    """
    Build hardware-accelerated decode arguments (placed before the input file).

    Args:
        hw_accel: Hardware acceleration type (none, qsv, cuda).
        device_index: GPU device index for multi-GPU scheduling.

    Returns:
        FFmpeg hardware decode argument list (empty for software decode).
    """
    if hw_accel == HW_ACCEL_CUDA:
        args = ['-hwaccel', 'cuda']
        if device_index is not None:
            # Pin decoding to a specific GPU in multi-GPU setups.
            args += ['-hwaccel_device', str(device_index)]
        args += ['-hwaccel_output_format', 'cuda']
        return args
    if hw_accel == HW_ACCEL_QSV:
        args = ['-hwaccel', 'qsv']
        if device_index is not None:
            # QSV selects the adapter via -qsv_device (notably on Windows).
            args += ['-qsv_device', str(device_index)]
        args += ['-hwaccel_output_format', 'qsv']
        return args
    # Software decode: no extra arguments.
    return []
def get_hwaccel_filter_prefix(hw_accel: str = HW_ACCEL_NONE) -> str:
    """
    Return the filter prefix that moves frames off the GPU (hwdownload).

    Most complex filters (lut3d, overlay, crop, ...) cannot operate on
    hardware surfaces, so the filter chain has to start by downloading
    frames into system memory. CUDA/QSV hwdownload only emits nv12, hence
    the two-step conversion:
      1. hwdownload,format=nv12 - copy from GPU to CPU
      2. format=yuv420p - normalize (keeps colors correct when blending
         with RGBA/YUVA overlays)

    Args:
        hw_accel: Hardware acceleration type.

    Returns:
        Filter string to prepend to the chain ('' for software decode).
    """
    # CUDA and QSV need the identical download/normalize prefix.
    if hw_accel in (HW_ACCEL_CUDA, HW_ACCEL_QSV):
        return 'hwdownload,format=nv12,format=yuv420p,'
    return ''
# Unified v2 video-encode arguments (legacy compatibility; software encoding).
VIDEO_ENCODE_ARGS = get_video_encode_args(HW_ACCEL_NONE)
# Unified v2 audio-encode arguments.
AUDIO_ENCODE_ARGS = [
    '-c:a', 'aac',    # AAC codec
    '-b:a', '128k',   # 128 kbps bitrate
    '-ar', '48000',   # 48 kHz sample rate
    '-ac', '2',       # stereo
]
# Log level injected into ffmpeg commands via -loglevel (see run_ffmpeg).
FFMPEG_LOGLEVEL = 'error'
def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]:
    """
    Build cross-platform keyword arguments for subprocess calls.

    When packaged on Windows with PyInstaller --noconsole, child processes
    need special handling so that no console window pops up and the stdin
    handle stays valid.

    Args:
        include_stdout: Whether stdout should be captured.

    Returns:
        Keyword-argument dict for subprocess.run.
    """
    kwargs: Dict[str, Any] = {}
    # subprocess.STARTUPINFO only exists on Windows.
    if hasattr(subprocess, 'STARTUPINFO'):
        startup_info = subprocess.STARTUPINFO()
        startup_info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        kwargs['startupinfo'] = startup_info
        kwargs['env'] = os.environ
        # Redirect stdin to avoid "handle is invalid" errors.
        kwargs['stdin'] = subprocess.PIPE
    if include_stdout:
        kwargs['stdout'] = subprocess.PIPE
    return kwargs
def probe_video_info(video_file: str) -> Tuple[int, int, float]:
    """
    Probe video info (width, height, duration).

    Args:
        video_file: Path to the video file.

    Returns:
        (width, height, duration) tuple; (0, 0, 0) on failure.
        If only the duration is unavailable, width/height are still
        returned with duration 0.
    """
    try:
        result = subprocess.run(
            [
                'ffprobe', '-v', 'error',
                '-select_streams', 'v:0',
                '-show_entries', 'stream=width,height:format=duration',
                '-of', 'csv=s=x:p=0',
                video_file
            ],
            capture_output=True,
            timeout=30,
            **subprocess_args(False)
        )
        if result.returncode != 0:
            logger.warning(f"ffprobe failed for {video_file}")
            return 0, 0, 0
        output = result.stdout.decode('utf-8').strip()
        if not output:
            return 0, 0, 0
        lines = output.split('\n')
        if len(lines) >= 2:
            wh = lines[0].strip()
            duration_str = lines[1].strip()
            width, height = wh.split('x')
            # Bug fix: ffprobe can emit "N/A" (or garbage) for streams
            # without a container duration; previously float() raised and
            # the broad except discarded the valid width/height too.
            try:
                duration = float(duration_str)
            except ValueError:
                duration = 0.0
            return int(width), int(height), duration
        return 0, 0, 0
    except Exception as e:
        logger.warning(f"probe_video_info error: {e}")
        return 0, 0, 0
def probe_duration_json(file_path: str) -> Optional[float]:
    """
    Probe media duration using ffprobe's JSON output.

    Args:
        file_path: Path to the media file.

    Returns:
        Duration in seconds, or None on failure.
    """
    cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'json',
        file_path
    ]
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            timeout=30,
            **subprocess_args(False)
        )
        if proc.returncode != 0:
            return None
        payload = json.loads(proc.stdout.decode('utf-8'))
        raw_duration = payload.get('format', {}).get('duration')
        # Missing or empty duration means "unknown".
        if not raw_duration:
            return None
        return float(raw_duration)
    except Exception as e:
        logger.warning(f"probe_duration_json error: {e}")
        return None
class BaseHandler(TaskHandler, ABC):
    """
    Base class for task handlers.

    Provides functionality shared by all handlers, including:
    - temporary directory management
    - file download/upload
    - FFmpeg command execution
    - GPU device management (multi-GPU scheduling)
    - logging
    """
    # Thread-local storage for the current thread's GPU device index
    # (class-level threading.local: shared object, per-thread values).
    _thread_local = threading.local()
    # Defaults and hard upper bound for per-task parallel transfer concurrency.
    DEFAULT_TASK_DOWNLOAD_CONCURRENCY = 4
    DEFAULT_TASK_UPLOAD_CONCURRENCY = 2
    MAX_TASK_TRANSFER_CONCURRENCY = 16
def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
    """
    Initialize the handler.

    Args:
        config: Worker configuration.
        api_client: API client.
    """
    self.config = config
    self.api_client = api_client
    # Material cache configured from the worker settings.
    self.material_cache = MaterialCache(
        cache_dir=config.cache_dir,
        enabled=config.cache_enabled,
        max_size_gb=config.cache_max_size_gb
    )
    # Per-task transfer concurrency, optionally overridden via env vars.
    self.task_download_concurrency = self._resolve_task_transfer_concurrency(
        "TASK_DOWNLOAD_CONCURRENCY", self.DEFAULT_TASK_DOWNLOAD_CONCURRENCY
    )
    self.task_upload_concurrency = self._resolve_task_transfer_concurrency(
        "TASK_UPLOAD_CONCURRENCY", self.DEFAULT_TASK_UPLOAD_CONCURRENCY
    )
def _resolve_task_transfer_concurrency(self, env_name: str, default_value: int) -> int:
    """Read and normalize a per-task transfer concurrency value from the environment."""
    raw_value = os.getenv(env_name)
    stripped = raw_value.strip() if raw_value is not None else ""
    if not stripped:
        # Unset or blank: use the built-in default.
        return default_value
    try:
        parsed_value = int(stripped)
    except ValueError:
        logger.warning(
            f"Invalid {env_name} value '{raw_value}', using default {default_value}"
        )
        return default_value
    limit = self.MAX_TASK_TRANSFER_CONCURRENCY
    if parsed_value < 1:
        logger.warning(f"{env_name} must be >= 1, forcing to 1")
        return 1
    if parsed_value > limit:
        logger.warning(
            f"{env_name}={parsed_value} exceeds limit {limit}, "
            f"using {limit}"
        )
        return limit
    return parsed_value
def download_files_parallel(
    self,
    download_jobs: List[Dict[str, Any]],
    timeout: Optional[int] = None
) -> Dict[str, Dict[str, Any]]:
    """
    Download multiple files in parallel within a single task.

    Args:
        download_jobs: Download job list. Fields per item:
            - key: unique identifier
            - url: source URL
            - dest: destination file path
            - required: whether the file is critical (optional, default True)
            - use_cache: whether to use the cache (optional, default True)
        timeout: Per-file download timeout in seconds.

    Returns:
        Mapping key -> result dict:
            - success: whether the download succeeded
            - url: original URL
            - dest: destination file path
            - required: whether the file is critical

    Raises:
        ValueError: when a job is missing key/url/dest or a key is duplicated.
    """
    if not download_jobs:
        return {}
    normalized_jobs: List[Dict[str, Any]] = []
    seen_keys = set()
    for download_job in download_jobs:
        job_key = str(download_job.get("key", "")).strip()
        job_url = str(download_job.get("url", "")).strip()
        job_dest = str(download_job.get("dest", "")).strip()
        if not job_key or not job_url or not job_dest:
            raise ValueError("Each download job must include non-empty key/url/dest")
        if job_key in seen_keys:
            raise ValueError(f"Duplicate download job key: {job_key}")
        seen_keys.add(job_key)
        normalized_jobs.append({
            "key": job_key,
            "url": job_url,
            "dest": job_dest,
            "required": bool(download_job.get("required", True)),
            "use_cache": bool(download_job.get("use_cache", True)),
        })
    if timeout is None:
        timeout = self.config.download_timeout
    # Capture tracing context so worker threads report under the task span.
    parent_otel_context = capture_otel_context()
    task_context = get_current_task_context()
    task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
    results: Dict[str, Dict[str, Any]] = {}

    def _run_download_job(download_job: Dict[str, Any]) -> bool:
        with bind_trace_context(parent_otel_context, task_context):
            return self.download_file(
                download_job["url"],
                download_job["dest"],
                timeout=timeout,
                use_cache=download_job["use_cache"],
            )

    max_workers = min(self.task_download_concurrency, len(normalized_jobs))
    if max_workers <= 1:
        for download_job in normalized_jobs:
            is_success = False
            try:
                is_success = bool(_run_download_job(download_job))
            except Exception as exc:
                # Bug fix: mirror the parallel path so a single failing job
                # records a failure instead of aborting the whole batch.
                logger.error(
                    f"{task_prefix}Parallel download raised exception for "
                    f"key={download_job['key']}: {exc}"
                )
            results[download_job["key"]] = {
                "success": is_success,
                "url": download_job["url"],
                "dest": download_job["dest"],
                "required": download_job["required"],
            }
    else:
        with ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix="TaskDownload",
        ) as executor:
            future_to_job = {
                executor.submit(_run_download_job, download_job): download_job
                for download_job in normalized_jobs
            }
            for completed_future in as_completed(future_to_job):
                download_job = future_to_job[completed_future]
                is_success = False
                try:
                    is_success = bool(completed_future.result())
                except Exception as exc:
                    logger.error(
                        f"{task_prefix}Parallel download raised exception for "
                        f"key={download_job['key']}: {exc}"
                    )
                results[download_job["key"]] = {
                    "success": is_success,
                    "url": download_job["url"],
                    "dest": download_job["dest"],
                    "required": download_job["required"],
                }
    success_count = sum(1 for item in results.values() if item["success"])
    logger.debug(
        f"{task_prefix}Parallel download completed: {success_count}/{len(normalized_jobs)}"
    )
    return results
def upload_files_parallel(
    self,
    upload_jobs: List[Dict[str, Any]]
) -> Dict[str, Dict[str, Any]]:
    """
    Upload multiple files in parallel within a single task.

    Args:
        upload_jobs: Upload job list. Fields per item:
            - key: unique identifier
            - task_id: task ID
            - file_type: file type (video/audio/ts/mp4)
            - file_path: local file path
            - file_name: file name (optional)
            - required: whether the file is critical (optional, default True)

    Returns:
        Mapping key -> result dict:
            - success: whether the upload succeeded
            - url: access URL after upload (None on failure)
            - file_path: local file path
            - required: whether the file is critical

    Raises:
        ValueError: when a job is missing key/task_id/file_type/file_path
            or a key is duplicated.
    """
    if not upload_jobs:
        return {}
    normalized_jobs: List[Dict[str, Any]] = []
    seen_keys = set()
    for upload_job in upload_jobs:
        job_key = str(upload_job.get("key", "")).strip()
        task_id = str(upload_job.get("task_id", "")).strip()
        file_type = str(upload_job.get("file_type", "")).strip()
        file_path = str(upload_job.get("file_path", "")).strip()
        if not job_key or not task_id or not file_type or not file_path:
            raise ValueError(
                "Each upload job must include non-empty key/task_id/file_type/file_path"
            )
        if job_key in seen_keys:
            raise ValueError(f"Duplicate upload job key: {job_key}")
        seen_keys.add(job_key)
        normalized_jobs.append({
            "key": job_key,
            "task_id": task_id,
            "file_type": file_type,
            "file_path": file_path,
            "file_name": upload_job.get("file_name"),
            "required": bool(upload_job.get("required", True)),
        })
    # Capture tracing context so worker threads report under the task span.
    parent_otel_context = capture_otel_context()
    task_context = get_current_task_context()
    task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
    results: Dict[str, Dict[str, Any]] = {}

    def _run_upload_job(upload_job: Dict[str, Any]) -> Optional[str]:
        with bind_trace_context(parent_otel_context, task_context):
            return self.upload_file(
                upload_job["task_id"],
                upload_job["file_type"],
                upload_job["file_path"],
                upload_job.get("file_name")
            )

    max_workers = min(self.task_upload_concurrency, len(normalized_jobs))
    if max_workers <= 1:
        for upload_job in normalized_jobs:
            result_url = None
            try:
                result_url = _run_upload_job(upload_job)
            except Exception as exc:
                # Bug fix: mirror the parallel path so a single failing job
                # records a failure instead of aborting the whole batch.
                logger.error(
                    f"{task_prefix}Parallel upload raised exception for "
                    f"key={upload_job['key']}: {exc}"
                )
            results[upload_job["key"]] = {
                "success": bool(result_url),
                "url": result_url,
                "file_path": upload_job["file_path"],
                "required": upload_job["required"],
            }
    else:
        with ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix="TaskUpload",
        ) as executor:
            future_to_job = {
                executor.submit(_run_upload_job, upload_job): upload_job
                for upload_job in normalized_jobs
            }
            for completed_future in as_completed(future_to_job):
                upload_job = future_to_job[completed_future]
                result_url = None
                try:
                    result_url = completed_future.result()
                except Exception as exc:
                    logger.error(
                        f"{task_prefix}Parallel upload raised exception for "
                        f"key={upload_job['key']}: {exc}"
                    )
                results[upload_job["key"]] = {
                    "success": bool(result_url),
                    "url": result_url,
                    "file_path": upload_job["file_path"],
                    "required": upload_job["required"],
                }
    success_count = sum(1 for item in results.values() if item["success"])
    logger.debug(
        f"{task_prefix}Parallel upload completed: {success_count}/{len(normalized_jobs)}"
    )
    return results
# ========== GPU 设备管理 ==========
def set_gpu_device(self, device_index: int) -> None:
    """
    Bind a GPU device index to the current thread.

    Called by the TaskExecutor before task execution.

    Args:
        device_index: GPU device index.
    """
    # threading.local keeps the value isolated per worker thread.
    self._thread_local.gpu_device = device_index
def get_gpu_device(self) -> Optional[int]:
    """
    Return the GPU device index bound to the current thread.

    Returns:
        GPU device index, or None when no device was set.
    """
    local_state = self._thread_local
    return getattr(local_state, 'gpu_device', None)
def clear_gpu_device(self) -> None:
    """
    Remove the GPU device index bound to the current thread.

    Called by the TaskExecutor after task execution.
    """
    # EAFP: deleting an unset attribute is simply a no-op.
    try:
        del self._thread_local.gpu_device
    except AttributeError:
        pass
# ========== FFmpeg 参数生成 ==========
def get_video_encode_args(self) -> List[str]:
    """
    Return the video-encode arguments for the configured acceleration mode.

    Returns:
        FFmpeg video-encode argument list.
    """
    # Delegate to the module-level helper with the configured mode.
    return get_video_encode_args(self.config.hw_accel)
def get_hwaccel_decode_args(self) -> List[str]:
    """
    Return hardware decode arguments bound to this thread's GPU (if any).

    Returns:
        FFmpeg hardware decode argument list.
    """
    # Delegate to the module-level helper, passing the per-thread device.
    return get_hwaccel_decode_args(self.config.hw_accel, self.get_gpu_device())
def get_hwaccel_filter_prefix(self) -> str:
    """
    Return the hwdownload filter prefix for the configured acceleration mode.

    Returns:
        Filter string to prepend to the filter chain.
    """
    # Delegate to the module-level helper with the configured mode.
    return get_hwaccel_filter_prefix(self.config.hw_accel)
def before_handle(self, task: Task) -> None:
    """Pre-processing hook: log the task type about to be handled."""
    logger.debug(f"[task:{task.task_id}] Before handle: {task.task_type.value}")
def after_handle(self, task: Task, result: TaskResult) -> None:
    """Post-processing hook: log whether handling succeeded."""
    if result.success:
        status = "success"
    else:
        status = "failed"
    logger.debug(f"[task:{task.task_id}] After handle: {status}")
def create_work_dir(self, task_id: str = None) -> str:
    """
    Create a unique temporary working directory.

    Args:
        task_id: Task ID, embedded into the directory name when given.

    Returns:
        Path of the created working directory.
    """
    # Make sure the temp root exists before creating a subdirectory in it.
    os.makedirs(self.config.temp_dir, exist_ok=True)
    dir_prefix = f"task_{task_id}_" if task_id else "task_"
    work_dir = tempfile.mkdtemp(dir=self.config.temp_dir, prefix=dir_prefix)
    logger.debug(f"Created work directory: {work_dir}")
    return work_dir
def cleanup_work_dir(self, work_dir: str) -> None:
    """
    Remove a temporary working directory (best effort).

    Args:
        work_dir: Path of the working directory.
    """
    # Nothing to do for empty paths or already-removed directories.
    if not work_dir or not os.path.exists(work_dir):
        return
    try:
        shutil.rmtree(work_dir)
    except Exception as e:
        # Best effort: a leftover temp dir is not fatal, just log it.
        logger.warning(f"Failed to cleanup work directory {work_dir}: {e}")
    else:
        logger.debug(f"Cleaned up work directory: {work_dir}")
def download_file(self, url: str, dest: str, timeout: Optional[int] = None, use_cache: bool = True) -> bool:
    """
    Download a file (optionally through the material cache).

    Args:
        url: File URL.
        dest: Destination path.
        timeout: Timeout in seconds (defaults to config.download_timeout).
        use_cache: Whether to use the material cache (default True).

    Returns:
        True when the download succeeded, False otherwise.
    """
    if timeout is None:
        timeout = self.config.download_timeout
    task_context = get_current_task_context()
    task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
    logger.debug(f"{task_prefix}Downloading from: {url} -> {dest}")
    with start_span(
        "render.task.file.download",
        kind=SpanKind.CLIENT,
        attributes={
            "render.file.source_url": url,
            "render.file.destination": dest,
            "render.file.use_cache": use_cache,
        },
    ) as span:
        try:
            lock_wait_ms = 0
            lock_acquired = False
            cache_path_used = "unknown"
            if use_cache:
                # The cache layer reports lock/cache metrics for tracing.
                result, cache_metrics = self.material_cache.get_or_download_with_metrics(
                    url,
                    dest,
                    timeout=timeout
                )
                lock_wait_ms = int(cache_metrics.get("lock_wait_ms", 0))
                lock_acquired = bool(cache_metrics.get("lock_acquired", False))
                cache_path_used = str(cache_metrics.get("cache_path_used", "unknown"))
            else:
                result = storage.download_file(url, dest, timeout=timeout)
                cache_path_used = "direct"
            if span is not None:
                span.set_attribute("render.file.lock_wait_ms", lock_wait_ms)
                span.set_attribute("render.file.lock_acquired", lock_acquired)
                span.set_attribute("render.file.cache_path_used", cache_path_used)
            if result:
                file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
                logger.debug(f"{task_prefix}Downloaded: {url} -> {dest} ({file_size} bytes)")
                if span is not None:
                    span.set_attribute("render.file.size_bytes", file_size)
            # Bug fix: a falsy result previously fell off the end of the
            # with-block and the function returned None despite the bool
            # annotation; always return an explicit bool.
            return bool(result)
        except Exception as e:
            mark_span_error(span, str(e), ErrorCode.E_INPUT_UNAVAILABLE.value)
            logger.error(f"{task_prefix}Download failed: {e}")
            logger.debug(f"{task_prefix}Download source address: {url}")
            return False
def upload_file(
    self,
    task_id: str,
    file_type: str,
    file_path: str,
    file_name: str = None
) -> Optional[str]:
    """
    Upload a file and return its access URL.

    Asks the API for a pre-signed upload URL, uploads the local file to it,
    and on success seeds the material cache with the uploaded content.

    Args:
        task_id: Task ID.
        file_type: File type (video/audio/ts/mp4).
        file_path: Local file path.
        file_name: File name (optional).

    Returns:
        Access URL on success, None on failure.
    """
    with start_span(
        "render.task.file.upload",
        kind=SpanKind.CLIENT,
        attributes={
            "render.file.type": file_type,
            "render.file.path": file_path,
        },
    ) as span:
        # Fetch the pre-signed upload target from the API first.
        # NOTE(review): failures here return None without marking the span
        # as errored (only storage errors below do) — confirm intended.
        upload_info = self.api_client.get_upload_url(task_id, file_type, file_name)
        if not upload_info:
            logger.error(f"[task:{task_id}] Failed to get upload URL")
            return None
        upload_url = upload_info.get('uploadUrl')
        access_url = upload_info.get('accessUrl')
        if not upload_url:
            logger.error(f"[task:{task_id}] Invalid upload URL response")
            return None
        logger.debug(
            f"[task:{task_id}] Upload target address: uploadUrl={upload_url}, accessUrl={access_url}"
        )
        if span is not None:
            span.set_attribute("render.file.upload_url", upload_url)
            if access_url:
                span.set_attribute("render.file.access_url", access_url)
        try:
            result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout)
            if result:
                file_size = os.path.getsize(file_path)
                logger.info(
                    f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)"
                )
                logger.debug(f"[task:{task_id}] Uploaded access address: {access_url or upload_url}")
                if span is not None:
                    span.set_attribute("render.file.size_bytes", file_size)
                if access_url:
                    # Seed the cache so later downloads of this URL hit locally.
                    self.material_cache.add_to_cache(access_url, file_path)
                # NOTE(review): when the response has no accessUrl, this
                # returns None even though the upload succeeded, and callers
                # (upload_files_parallel) treat that as failure — confirm.
                return access_url
            logger.error(f"[task:{task_id}] Upload failed: {file_path}")
            return None
        except Exception as e:
            mark_span_error(span, str(e), ErrorCode.E_UPLOAD_FAILED.value)
            logger.error(f"[task:{task_id}] Upload error: {e}")
            return None
def run_ffmpeg(
    self,
    cmd: List[str],
    task_id: str,
    timeout: int = None
) -> bool:
    """
    Execute an FFmpeg command.

    Args:
        cmd: FFmpeg command argument list.
        task_id: Task ID (for logging).
        timeout: Timeout in seconds (defaults to config.ffmpeg_timeout).

    Returns:
        True when FFmpeg exits with code 0.
    """
    if timeout is None:
        timeout = self.config.ffmpeg_timeout
    cmd_to_run = list(cmd)
    # Inject a default -loglevel right after the program name, unless the
    # caller already supplied one.
    if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run:
        cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL]
    # Log the full command (unabridged).
    cmd_str = ' '.join(cmd_to_run)
    logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")
    with start_span(
        "render.task.ffmpeg.run",
        attributes={
            "render.ffmpeg.timeout_seconds": timeout,
            "render.ffmpeg.command": cmd_str,
        },
    ) as span:
        try:
            # Discard stdout; keep stderr for error reporting.
            run_args = subprocess_args(False)
            run_args['stdout'] = subprocess.DEVNULL
            run_args['stderr'] = subprocess.PIPE
            result = subprocess.run(
                cmd_to_run,
                timeout=timeout,
                **run_args
            )
            if span is not None:
                span.set_attribute("render.ffmpeg.return_code", result.returncode)
            if result.returncode == 0:
                return True
            stderr = (result.stderr or b'').decode('utf-8', errors='replace')[:1000]
            logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
            mark_span_error(span, stderr or "ffmpeg failed", ErrorCode.E_FFMPEG_FAILED.value)
            return False
        except subprocess.TimeoutExpired:
            logger.error(f"[task:{task_id}] FFmpeg timeout after {timeout}s")
            mark_span_error(span, f"timeout after {timeout}s", ErrorCode.E_TIMEOUT.value)
            return False
        except Exception as e:
            logger.error(f"[task:{task_id}] FFmpeg error: {e}")
            mark_span_error(span, str(e), ErrorCode.E_FFMPEG_FAILED.value)
            return False
def probe_duration(self, file_path: str) -> Optional[float]:
    """
    Probe the duration of a media file.

    Args:
        file_path: File path.

    Returns:
        Duration in seconds, or None on failure.
    """
    # Preferred path: ffprobe JSON output.
    json_duration = probe_duration_json(file_path)
    if json_duration is not None:
        return json_duration
    # Fallback: the legacy CSV-based probe.
    try:
        _, _, fallback_duration = probe_video_info(file_path)
        return float(fallback_duration) if fallback_duration else None
    except Exception as e:
        logger.warning(f"Failed to probe duration: {file_path} -> {e}")
        return None
def get_file_size(self, file_path: str) -> int:
    """
    Return the size of a file in bytes.

    Args:
        file_path: File path.

    Returns:
        File size in bytes; 0 when the file cannot be stat'ed.
    """
    try:
        size = os.path.getsize(file_path)
    except Exception:
        return 0
    return size
def ensure_file_exists(self, file_path: str, min_size: int = 0) -> bool:
    """
    Check that a file exists and meets a minimum size requirement.

    Args:
        file_path: File path.
        min_size: Minimum size in bytes.

    Returns:
        True when the file exists and is at least min_size bytes.
    """
    # Short-circuit keeps getsize from running on a missing path.
    return os.path.exists(file_path) and os.path.getsize(file_path) >= min_size