You've already forked FrameTour-RenderWorker
Compare commits
13 Commits
d955def63c
...
next
| Author | SHA1 | Date | |
|---|---|---|---|
| affe933fec | |||
| 379e0bf999 | |||
| d54e6e948f | |||
| ca90336905 | |||
| 34e7d84d52 | |||
| 9dd5b6237d | |||
| c2ece02ecf | |||
| 952b8f5c01 | |||
| 3cb2f8d02a | |||
| ef4cf549c4 | |||
| 16ea45ad1c | |||
| ad4a9cc869 | |||
| 88aa3adca1 |
@@ -30,6 +30,8 @@ TEMP_DIR=tmp/
|
||||
#FFMPEG_TIMEOUT=3600 # FFmpeg 执行超时(秒)
|
||||
#DOWNLOAD_TIMEOUT=300 # 下载超时(秒)
|
||||
#UPLOAD_TIMEOUT=600 # 上传超时(秒)
|
||||
#TASK_DOWNLOAD_CONCURRENCY=4 # 单任务内并行下载数(1-16)
|
||||
#TASK_UPLOAD_CONCURRENCY=2 # 单任务内并行上传数(1-16)
|
||||
|
||||
# ===================
|
||||
# 硬件加速与多显卡
|
||||
|
||||
@@ -10,10 +10,9 @@ SOFTWARE_VERSION = '2.0.0'
|
||||
|
||||
# 支持的任务类型
|
||||
TASK_TYPES = (
|
||||
'RENDER_SEGMENT_VIDEO',
|
||||
'RENDER_SEGMENT_TS',
|
||||
'COMPOSE_TRANSITION',
|
||||
'PREPARE_JOB_AUDIO',
|
||||
'PACKAGE_SEGMENT_TS',
|
||||
'FINALIZE_MP4',
|
||||
)
|
||||
|
||||
@@ -39,6 +38,7 @@ EFFECT_TYPES = (
|
||||
'cameraShot', # 相机定格效果:在指定时间点冻结画面
|
||||
'zoom', # 缩放效果(预留)
|
||||
'blur', # 模糊效果(预留)
|
||||
'ospeed', # 原始变速效果(兼容旧模板)
|
||||
)
|
||||
|
||||
# 硬件加速类型
|
||||
|
||||
@@ -17,9 +17,8 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
# 默认支持的任务类型
|
||||
DEFAULT_CAPABILITIES = [
|
||||
"RENDER_SEGMENT_VIDEO",
|
||||
"RENDER_SEGMENT_TS",
|
||||
"PREPARE_JOB_AUDIO",
|
||||
"PACKAGE_SEGMENT_TS",
|
||||
"FINALIZE_MP4"
|
||||
]
|
||||
|
||||
|
||||
100
domain/task.py
100
domain/task.py
@@ -5,12 +5,13 @@
|
||||
定义任务类型、任务实体、渲染规格、输出规格等数据结构。
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, Any, Optional, List
|
||||
from datetime import datetime
|
||||
from urllib.parse import urlparse, unquote
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from math import isfinite
|
||||
from typing import Dict, Any, Optional, List
|
||||
from urllib.parse import urlparse, unquote
|
||||
|
||||
|
||||
# 支持的图片扩展名
|
||||
@@ -19,12 +20,15 @@ IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.webp', '.bmp', '.gif'}
|
||||
|
||||
class TaskType(Enum):
|
||||
"""任务类型枚举"""
|
||||
RENDER_SEGMENT_VIDEO = "RENDER_SEGMENT_VIDEO" # 渲染视频片段
|
||||
RENDER_SEGMENT_TS = "RENDER_SEGMENT_TS" # 渲染+封装 TS(合并原 RENDER_SEGMENT_VIDEO + PACKAGE_SEGMENT_TS)
|
||||
COMPOSE_TRANSITION = "COMPOSE_TRANSITION" # 合成转场效果
|
||||
PREPARE_JOB_AUDIO = "PREPARE_JOB_AUDIO" # 生成全局音频
|
||||
PACKAGE_SEGMENT_TS = "PACKAGE_SEGMENT_TS" # 封装 TS 分片
|
||||
FINALIZE_MP4 = "FINALIZE_MP4" # 产出最终 MP4
|
||||
|
||||
# Deprecated: 历史任务类型,保留枚举值供兼容
|
||||
RENDER_SEGMENT_VIDEO = "RENDER_SEGMENT_VIDEO"
|
||||
PACKAGE_SEGMENT_TS = "PACKAGE_SEGMENT_TS"
|
||||
|
||||
|
||||
# 支持的转场类型(对应 FFmpeg xfade 参数)
|
||||
TRANSITION_TYPES = {
|
||||
@@ -45,6 +49,7 @@ EFFECT_TYPES = {
|
||||
'cameraShot', # 相机定格效果
|
||||
'zoom', # 缩放效果(预留)
|
||||
'blur', # 模糊效果(预留)
|
||||
'ospeed', # 原始变速效果(兼容旧模板)
|
||||
}
|
||||
|
||||
|
||||
@@ -95,7 +100,9 @@ class Effect:
|
||||
特效配置
|
||||
|
||||
格式:type:params
|
||||
例如:cameraShot:3,1 表示在第3秒定格1秒
|
||||
例如:
|
||||
- cameraShot:3,1 表示在第3秒定格1秒
|
||||
- zoom:1.5,1.2,2 表示从第1.5秒开始放大 1.2 倍并持续 2 秒
|
||||
"""
|
||||
effect_type: str # 效果类型
|
||||
params: str = "" # 参数字符串
|
||||
@@ -122,7 +129,7 @@ class Effect:
|
||||
解析效果字符串
|
||||
|
||||
格式:effect1|effect2|effect3
|
||||
例如:cameraShot:3,1|blur:5
|
||||
例如:cameraShot:3,1|zoom:1.5,1.2,2
|
||||
"""
|
||||
if not effects_str:
|
||||
return []
|
||||
@@ -152,6 +159,54 @@ class Effect:
|
||||
except ValueError:
|
||||
return (3, 1)
|
||||
|
||||
def get_zoom_params(self) -> tuple:
|
||||
"""
|
||||
获取 zoom 效果参数
|
||||
|
||||
Returns:
|
||||
(start_sec, scale_factor, duration_sec): 起始时间、放大倍数、持续时长(秒)
|
||||
"""
|
||||
if self.effect_type != 'zoom':
|
||||
return (0.0, 1.2, 1.0)
|
||||
|
||||
default_start_sec = 0.0
|
||||
default_scale_factor = 1.2
|
||||
default_duration_sec = 1.0
|
||||
|
||||
if not self.params:
|
||||
return (default_start_sec, default_scale_factor, default_duration_sec)
|
||||
|
||||
parts = [part.strip() for part in self.params.split(',')]
|
||||
try:
|
||||
start_sec = float(parts[0]) if len(parts) >= 1 and parts[0] else default_start_sec
|
||||
scale_factor = float(parts[1]) if len(parts) >= 2 and parts[1] else default_scale_factor
|
||||
duration_sec = float(parts[2]) if len(parts) >= 3 and parts[2] else default_duration_sec
|
||||
except ValueError:
|
||||
return (default_start_sec, default_scale_factor, default_duration_sec)
|
||||
|
||||
if not isfinite(start_sec) or start_sec < 0:
|
||||
start_sec = default_start_sec
|
||||
if not isfinite(scale_factor) or scale_factor <= 1.0:
|
||||
scale_factor = default_scale_factor
|
||||
if not isfinite(duration_sec) or duration_sec <= 0:
|
||||
duration_sec = default_duration_sec
|
||||
|
||||
return (start_sec, scale_factor, duration_sec)
|
||||
|
||||
def get_ospeed_params(self) -> float:
|
||||
"""获取 ospeed 效果参数,返回 PTS 乘数(>0),无效时返回 1.0"""
|
||||
if self.effect_type != 'ospeed':
|
||||
return 1.0
|
||||
if not self.params:
|
||||
return 1.0
|
||||
try:
|
||||
factor = float(self.params.strip())
|
||||
except ValueError:
|
||||
return 1.0
|
||||
if not isfinite(factor) or factor <= 0:
|
||||
return 1.0
|
||||
return factor
|
||||
|
||||
|
||||
@dataclass
|
||||
class RenderSpec:
|
||||
@@ -161,14 +216,13 @@ class RenderSpec:
|
||||
用于 RENDER_SEGMENT_VIDEO 任务,定义视频渲染参数。
|
||||
"""
|
||||
crop_enable: bool = False
|
||||
crop_size: Optional[str] = None
|
||||
crop_scale: float = 1.0
|
||||
speed: str = "1.0"
|
||||
lut_url: Optional[str] = None
|
||||
overlay_url: Optional[str] = None
|
||||
effects: Optional[str] = None
|
||||
zoom_cut: bool = False
|
||||
video_crop: Optional[str] = None
|
||||
face_pos: Optional[str] = None
|
||||
crop_pos: Optional[str] = None
|
||||
transitions: Optional[str] = None
|
||||
# 转场配置(PRD v2 新增)
|
||||
transition_in: Optional[TransitionConfig] = None # 入场转场
|
||||
@@ -179,16 +233,24 @@ class RenderSpec:
|
||||
"""从字典创建 RenderSpec"""
|
||||
if not data:
|
||||
return cls()
|
||||
|
||||
# 安全解析 cropScale:接受浮点数或字符串浮点数,非法值回退到 1.0
|
||||
try:
|
||||
crop_scale = float(data.get('cropScale', 1.0))
|
||||
if crop_scale <= 0 or not isfinite(crop_scale):
|
||||
crop_scale = 1.0
|
||||
except (ValueError, TypeError):
|
||||
crop_scale = 1.0
|
||||
|
||||
return cls(
|
||||
crop_enable=data.get('cropEnable', False),
|
||||
crop_size=data.get('cropSize'),
|
||||
crop_scale=crop_scale,
|
||||
speed=str(data.get('speed', '1.0')),
|
||||
lut_url=data.get('lutUrl'),
|
||||
overlay_url=data.get('overlayUrl'),
|
||||
effects=data.get('effects'),
|
||||
zoom_cut=data.get('zoomCut', False),
|
||||
video_crop=data.get('videoCrop'),
|
||||
face_pos=data.get('facePos'),
|
||||
crop_pos=data.get('cropPos'),
|
||||
transitions=data.get('transitions'),
|
||||
transition_in=TransitionConfig.from_dict(data.get('transitionIn')),
|
||||
transition_out=TransitionConfig.from_dict(data.get('transitionOut'))
|
||||
@@ -489,6 +551,14 @@ class Task:
|
||||
"""获取片段列表"""
|
||||
return self.payload.get('segments', [])
|
||||
|
||||
def get_global_audio_fade_in_ms(self) -> int:
|
||||
"""获取全局音频淡入时长(毫秒),0 表示不淡入"""
|
||||
return int(self.payload.get('globalAudioFadeInMs', 0))
|
||||
|
||||
def get_global_audio_fade_out_ms(self) -> int:
|
||||
"""获取全局音频淡出时长(毫秒),0 表示不淡出"""
|
||||
return int(self.payload.get('globalAudioFadeOutMs', 0))
|
||||
|
||||
def get_audio_profile(self) -> AudioProfile:
|
||||
"""获取音频配置"""
|
||||
return AudioProfile.from_dict(self.payload.get('audioProfile'))
|
||||
|
||||
@@ -6,17 +6,15 @@
|
||||
"""
|
||||
|
||||
from handlers.base import BaseHandler
|
||||
from handlers.render_video import RenderSegmentVideoHandler
|
||||
from handlers.render_video import RenderSegmentTsHandler
|
||||
from handlers.compose_transition import ComposeTransitionHandler
|
||||
from handlers.prepare_audio import PrepareJobAudioHandler
|
||||
from handlers.package_ts import PackageSegmentTsHandler
|
||||
from handlers.finalize_mp4 import FinalizeMp4Handler
|
||||
|
||||
__all__ = [
|
||||
'BaseHandler',
|
||||
'RenderSegmentVideoHandler',
|
||||
'RenderSegmentTsHandler',
|
||||
'ComposeTransitionHandler',
|
||||
'PrepareJobAudioHandler',
|
||||
'PackageSegmentTsHandler',
|
||||
'FinalizeMp4Handler',
|
||||
]
|
||||
|
||||
378
handlers/base.py
378
handlers/base.py
@@ -12,6 +12,7 @@ import shutil
|
||||
import tempfile
|
||||
import subprocess
|
||||
import threading
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from abc import ABC
|
||||
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
|
||||
|
||||
@@ -23,7 +24,13 @@ from domain.result import TaskResult, ErrorCode
|
||||
from domain.config import WorkerConfig
|
||||
from services import storage
|
||||
from services.cache import MaterialCache
|
||||
from util.tracing import get_current_task_context, mark_span_error, start_span
|
||||
from util.tracing import (
|
||||
bind_trace_context,
|
||||
capture_otel_context,
|
||||
get_current_task_context,
|
||||
mark_span_error,
|
||||
start_span,
|
||||
)
|
||||
from constant import (
|
||||
HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA,
|
||||
VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA
|
||||
@@ -35,49 +42,71 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE) -> List[str]:
|
||||
def get_video_encode_args(hw_accel: str = HW_ACCEL_NONE, maxrate: Optional[int] = None) -> List[str]:
|
||||
"""
|
||||
根据硬件加速配置获取视频编码参数
|
||||
|
||||
Args:
|
||||
hw_accel: 硬件加速类型 (none, qsv, cuda)
|
||||
maxrate: 最大码率(bps),用于限制 CRF/CQ 模式的峰值码率。
|
||||
例如 4000000 表示 4Mbps。
|
||||
bufsize 设为 1x maxrate(1 秒窗口),适合短视频(<10s)的严格码率控制。
|
||||
|
||||
Returns:
|
||||
FFmpeg 视频编码参数列表
|
||||
"""
|
||||
has_maxrate = maxrate is not None and maxrate > 0
|
||||
|
||||
if hw_accel == HW_ACCEL_QSV:
|
||||
params = VIDEO_ENCODE_PARAMS_QSV
|
||||
return [
|
||||
args = [
|
||||
'-c:v', params['codec'],
|
||||
'-preset', params['preset'],
|
||||
'-profile:v', params['profile'],
|
||||
'-level', params['level'],
|
||||
'-global_quality', params['global_quality'],
|
||||
'-look_ahead', params['look_ahead'],
|
||||
# 禁用 B 帧,避免独立 TS 分片在 HLS 边界出现 PTS/DTS 回退
|
||||
'-bf', '0',
|
||||
]
|
||||
elif hw_accel == HW_ACCEL_CUDA:
|
||||
params = VIDEO_ENCODE_PARAMS_CUDA
|
||||
return [
|
||||
args = [
|
||||
'-c:v', params['codec'],
|
||||
'-preset', params['preset'],
|
||||
'-profile:v', params['profile'],
|
||||
'-level', params['level'],
|
||||
'-rc', params['rc'],
|
||||
'-cq', params['cq'],
|
||||
'-b:v', '0', # 配合 vbr 模式使用 cq
|
||||
# 有 maxrate 时设置 -b:v 为 maxrate,让 NVENC VBV 模型真正生效;
|
||||
# 无 maxrate 时保持 -b:v 0(纯 CQ 质量模式)
|
||||
'-b:v', f'{maxrate // 1000}k' if has_maxrate else '0',
|
||||
# 禁用 B 帧,避免独立 TS 分片在 HLS 边界出现 PTS/DTS 回退
|
||||
'-bf', '0',
|
||||
]
|
||||
else:
|
||||
# 软件编码(默认)
|
||||
params = VIDEO_ENCODE_PARAMS
|
||||
return [
|
||||
args = [
|
||||
'-c:v', params['codec'],
|
||||
'-preset', params['preset'],
|
||||
'-profile:v', params['profile'],
|
||||
'-level', params['level'],
|
||||
'-crf', params['crf'],
|
||||
'-pix_fmt', params['pix_fmt'],
|
||||
# 禁用 B 帧,避免独立 TS 分片在 HLS 边界出现 PTS/DTS 回退
|
||||
'-bf', '0',
|
||||
]
|
||||
|
||||
# CRF/CQ + maxrate 上限:保留质量控制的同时限制峰值码率
|
||||
# bufsize = 1x maxrate(1 秒窗口),对短视频 VBV 约束更紧,避免缓冲区过大导致码率失控
|
||||
if has_maxrate:
|
||||
maxrate_k = f'{maxrate // 1000}k'
|
||||
bufsize_k = maxrate_k # 1x maxrate,短视频下收敛更快
|
||||
args.extend(['-maxrate', maxrate_k, '-bufsize', bufsize_k])
|
||||
|
||||
return args
|
||||
|
||||
|
||||
def get_hwaccel_decode_args(hw_accel: str = HW_ACCEL_NONE, device_index: Optional[int] = None) -> List[str]:
|
||||
"""
|
||||
@@ -274,6 +303,9 @@ class BaseHandler(TaskHandler, ABC):
|
||||
|
||||
# 线程本地存储:用于存储当前线程的 GPU 设备索引
|
||||
_thread_local = threading.local()
|
||||
DEFAULT_TASK_DOWNLOAD_CONCURRENCY = 4
|
||||
DEFAULT_TASK_UPLOAD_CONCURRENCY = 2
|
||||
MAX_TASK_TRANSFER_CONCURRENCY = 16
|
||||
|
||||
def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
|
||||
"""
|
||||
@@ -290,6 +322,251 @@ class BaseHandler(TaskHandler, ABC):
|
||||
enabled=config.cache_enabled,
|
||||
max_size_gb=config.cache_max_size_gb
|
||||
)
|
||||
self.task_download_concurrency = self._resolve_task_transfer_concurrency(
|
||||
"TASK_DOWNLOAD_CONCURRENCY",
|
||||
self.DEFAULT_TASK_DOWNLOAD_CONCURRENCY
|
||||
)
|
||||
self.task_upload_concurrency = self._resolve_task_transfer_concurrency(
|
||||
"TASK_UPLOAD_CONCURRENCY",
|
||||
self.DEFAULT_TASK_UPLOAD_CONCURRENCY
|
||||
)
|
||||
|
||||
def _resolve_task_transfer_concurrency(self, env_name: str, default_value: int) -> int:
|
||||
"""读取并规范化任务内传输并发数配置。"""
|
||||
raw_value = os.getenv(env_name)
|
||||
if raw_value is None or not raw_value.strip():
|
||||
return default_value
|
||||
|
||||
try:
|
||||
parsed_value = int(raw_value.strip())
|
||||
except ValueError:
|
||||
logger.warning(
|
||||
f"Invalid {env_name} value '{raw_value}', using default {default_value}"
|
||||
)
|
||||
return default_value
|
||||
|
||||
if parsed_value < 1:
|
||||
logger.warning(f"{env_name} must be >= 1, forcing to 1")
|
||||
return 1
|
||||
|
||||
if parsed_value > self.MAX_TASK_TRANSFER_CONCURRENCY:
|
||||
logger.warning(
|
||||
f"{env_name}={parsed_value} exceeds limit {self.MAX_TASK_TRANSFER_CONCURRENCY}, "
|
||||
f"using {self.MAX_TASK_TRANSFER_CONCURRENCY}"
|
||||
)
|
||||
return self.MAX_TASK_TRANSFER_CONCURRENCY
|
||||
|
||||
return parsed_value
|
||||
|
||||
def download_files_parallel(
|
||||
self,
|
||||
download_jobs: List[Dict[str, Any]],
|
||||
timeout: Optional[int] = None
|
||||
) -> Dict[str, Dict[str, Any]]:
|
||||
"""
|
||||
单任务内并行下载多个文件。
|
||||
|
||||
Args:
|
||||
download_jobs: 下载任务列表。每项字段:
|
||||
- key: 唯一标识
|
||||
- url: 下载地址
|
||||
- dest: 目标文件路径
|
||||
- required: 是否关键文件(可选,默认 True)
|
||||
- use_cache: 是否使用缓存(可选,默认 True)
|
||||
timeout: 单文件下载超时(秒)
|
||||
|
||||
Returns:
|
||||
key -> 结果字典:
|
||||
- success: 是否成功
|
||||
- url: 原始 URL
|
||||
- dest: 目标文件路径
|
||||
- required: 是否关键文件
|
||||
"""
|
||||
if not download_jobs:
|
||||
return {}
|
||||
|
||||
normalized_jobs: List[Dict[str, Any]] = []
|
||||
seen_keys = set()
|
||||
for download_job in download_jobs:
|
||||
job_key = str(download_job.get("key", "")).strip()
|
||||
job_url = str(download_job.get("url", "")).strip()
|
||||
job_dest = str(download_job.get("dest", "")).strip()
|
||||
if not job_key or not job_url or not job_dest:
|
||||
raise ValueError("Each download job must include non-empty key/url/dest")
|
||||
if job_key in seen_keys:
|
||||
raise ValueError(f"Duplicate download job key: {job_key}")
|
||||
seen_keys.add(job_key)
|
||||
normalized_jobs.append({
|
||||
"key": job_key,
|
||||
"url": job_url,
|
||||
"dest": job_dest,
|
||||
"required": bool(download_job.get("required", True)),
|
||||
"use_cache": bool(download_job.get("use_cache", True)),
|
||||
})
|
||||
|
||||
if timeout is None:
|
||||
timeout = self.config.download_timeout
|
||||
|
||||
parent_otel_context = capture_otel_context()
|
||||
task_context = get_current_task_context()
|
||||
task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
|
||||
results: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
def _run_download_job(download_job: Dict[str, Any]) -> bool:
|
||||
with bind_trace_context(parent_otel_context, task_context):
|
||||
return self.download_file(
|
||||
download_job["url"],
|
||||
download_job["dest"],
|
||||
timeout=timeout,
|
||||
use_cache=download_job["use_cache"],
|
||||
)
|
||||
|
||||
max_workers = min(self.task_download_concurrency, len(normalized_jobs))
|
||||
if max_workers <= 1:
|
||||
for download_job in normalized_jobs:
|
||||
is_success = _run_download_job(download_job)
|
||||
results[download_job["key"]] = {
|
||||
"success": is_success,
|
||||
"url": download_job["url"],
|
||||
"dest": download_job["dest"],
|
||||
"required": download_job["required"],
|
||||
}
|
||||
else:
|
||||
with ThreadPoolExecutor(
|
||||
max_workers=max_workers,
|
||||
thread_name_prefix="TaskDownload",
|
||||
) as executor:
|
||||
future_to_job = {
|
||||
executor.submit(_run_download_job, download_job): download_job
|
||||
for download_job in normalized_jobs
|
||||
}
|
||||
for completed_future in as_completed(future_to_job):
|
||||
download_job = future_to_job[completed_future]
|
||||
is_success = False
|
||||
try:
|
||||
is_success = bool(completed_future.result())
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
f"{task_prefix}Parallel download raised exception for "
|
||||
f"key={download_job['key']}: {exc}"
|
||||
)
|
||||
results[download_job["key"]] = {
|
||||
"success": is_success,
|
||||
"url": download_job["url"],
|
||||
"dest": download_job["dest"],
|
||||
"required": download_job["required"],
|
||||
}
|
||||
|
||||
success_count = sum(1 for item in results.values() if item["success"])
|
||||
logger.debug(
|
||||
f"{task_prefix}Parallel download completed: {success_count}/{len(normalized_jobs)}"
|
||||
)
|
||||
return results
|
||||
|
||||
def upload_files_parallel(
|
||||
self,
|
||||
upload_jobs: List[Dict[str, Any]]
|
||||
) -> Dict[str, Dict[str, Any]]:
|
||||
"""
|
||||
单任务内并行上传多个文件。
|
||||
|
||||
Args:
|
||||
upload_jobs: 上传任务列表。每项字段:
|
||||
- key: 唯一标识
|
||||
- task_id: 任务 ID
|
||||
- file_type: 文件类型(video/audio/ts/mp4)
|
||||
- file_path: 本地文件路径
|
||||
- file_name: 文件名(可选)
|
||||
- required: 是否关键文件(可选,默认 True)
|
||||
|
||||
Returns:
|
||||
key -> 结果字典:
|
||||
- success: 是否成功
|
||||
- url: 上传后的访问 URL(失败为 None)
|
||||
- file_path: 本地文件路径
|
||||
- required: 是否关键文件
|
||||
"""
|
||||
if not upload_jobs:
|
||||
return {}
|
||||
|
||||
normalized_jobs: List[Dict[str, Any]] = []
|
||||
seen_keys = set()
|
||||
for upload_job in upload_jobs:
|
||||
job_key = str(upload_job.get("key", "")).strip()
|
||||
task_id = str(upload_job.get("task_id", "")).strip()
|
||||
file_type = str(upload_job.get("file_type", "")).strip()
|
||||
file_path = str(upload_job.get("file_path", "")).strip()
|
||||
if not job_key or not task_id or not file_type or not file_path:
|
||||
raise ValueError(
|
||||
"Each upload job must include non-empty key/task_id/file_type/file_path"
|
||||
)
|
||||
if job_key in seen_keys:
|
||||
raise ValueError(f"Duplicate upload job key: {job_key}")
|
||||
seen_keys.add(job_key)
|
||||
normalized_jobs.append({
|
||||
"key": job_key,
|
||||
"task_id": task_id,
|
||||
"file_type": file_type,
|
||||
"file_path": file_path,
|
||||
"file_name": upload_job.get("file_name"),
|
||||
"required": bool(upload_job.get("required", True)),
|
||||
})
|
||||
|
||||
parent_otel_context = capture_otel_context()
|
||||
task_context = get_current_task_context()
|
||||
task_prefix = f"[task:{task_context.task_id}] " if task_context else ""
|
||||
results: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
def _run_upload_job(upload_job: Dict[str, Any]) -> Optional[str]:
|
||||
with bind_trace_context(parent_otel_context, task_context):
|
||||
return self.upload_file(
|
||||
upload_job["task_id"],
|
||||
upload_job["file_type"],
|
||||
upload_job["file_path"],
|
||||
upload_job.get("file_name")
|
||||
)
|
||||
|
||||
max_workers = min(self.task_upload_concurrency, len(normalized_jobs))
|
||||
if max_workers <= 1:
|
||||
for upload_job in normalized_jobs:
|
||||
result_url = _run_upload_job(upload_job)
|
||||
results[upload_job["key"]] = {
|
||||
"success": bool(result_url),
|
||||
"url": result_url,
|
||||
"file_path": upload_job["file_path"],
|
||||
"required": upload_job["required"],
|
||||
}
|
||||
else:
|
||||
with ThreadPoolExecutor(
|
||||
max_workers=max_workers,
|
||||
thread_name_prefix="TaskUpload",
|
||||
) as executor:
|
||||
future_to_job = {
|
||||
executor.submit(_run_upload_job, upload_job): upload_job
|
||||
for upload_job in normalized_jobs
|
||||
}
|
||||
for completed_future in as_completed(future_to_job):
|
||||
upload_job = future_to_job[completed_future]
|
||||
result_url = None
|
||||
try:
|
||||
result_url = completed_future.result()
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
f"{task_prefix}Parallel upload raised exception for "
|
||||
f"key={upload_job['key']}: {exc}"
|
||||
)
|
||||
results[upload_job["key"]] = {
|
||||
"success": bool(result_url),
|
||||
"url": result_url,
|
||||
"file_path": upload_job["file_path"],
|
||||
"required": upload_job["required"],
|
||||
}
|
||||
|
||||
success_count = sum(1 for item in results.values() if item["success"])
|
||||
logger.debug(
|
||||
f"{task_prefix}Parallel upload completed: {success_count}/{len(normalized_jobs)}"
|
||||
)
|
||||
return results
|
||||
|
||||
# ========== GPU 设备管理 ==========
|
||||
|
||||
@@ -324,14 +601,17 @@ class BaseHandler(TaskHandler, ABC):
|
||||
|
||||
# ========== FFmpeg 参数生成 ==========
|
||||
|
||||
def get_video_encode_args(self) -> List[str]:
|
||||
def get_video_encode_args(self, maxrate: Optional[int] = None) -> List[str]:
|
||||
"""
|
||||
获取当前配置的视频编码参数
|
||||
|
||||
Args:
|
||||
maxrate: 最大码率(bps),用于限制峰值码率
|
||||
|
||||
Returns:
|
||||
FFmpeg 视频编码参数列表
|
||||
"""
|
||||
return get_video_encode_args(self.config.hw_accel)
|
||||
return get_video_encode_args(self.config.hw_accel, maxrate=maxrate)
|
||||
|
||||
def get_hwaccel_decode_args(self) -> List[str]:
|
||||
"""
|
||||
@@ -427,10 +707,26 @@ class BaseHandler(TaskHandler, ABC):
|
||||
},
|
||||
) as span:
|
||||
try:
|
||||
lock_wait_ms = 0
|
||||
lock_acquired = False
|
||||
cache_path_used = "unknown"
|
||||
if use_cache:
|
||||
result = self.material_cache.get_or_download(url, dest, timeout=timeout)
|
||||
result, cache_metrics = self.material_cache.get_or_download_with_metrics(
|
||||
url,
|
||||
dest,
|
||||
timeout=timeout
|
||||
)
|
||||
lock_wait_ms = int(cache_metrics.get("lock_wait_ms", 0))
|
||||
lock_acquired = bool(cache_metrics.get("lock_acquired", False))
|
||||
cache_path_used = str(cache_metrics.get("cache_path_used", "unknown"))
|
||||
else:
|
||||
result = storage.download_file(url, dest, timeout=timeout)
|
||||
cache_path_used = "direct"
|
||||
|
||||
if span is not None:
|
||||
span.set_attribute("render.file.lock_wait_ms", lock_wait_ms)
|
||||
span.set_attribute("render.file.lock_acquired", lock_acquired)
|
||||
span.set_attribute("render.file.cache_path_used", cache_path_used)
|
||||
|
||||
if result:
|
||||
file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
|
||||
@@ -463,16 +759,22 @@ class BaseHandler(TaskHandler, ABC):
|
||||
Returns:
|
||||
访问 URL,失败返回 None
|
||||
"""
|
||||
local_file_exists = os.path.exists(file_path)
|
||||
local_file_size = os.path.getsize(file_path) if local_file_exists else 0
|
||||
with start_span(
|
||||
"render.task.file.upload",
|
||||
kind=SpanKind.CLIENT,
|
||||
attributes={
|
||||
"render.file.type": file_type,
|
||||
"render.file.path": file_path,
|
||||
"render.file.timeout_seconds": self.config.upload_timeout,
|
||||
"render.file.local_exists": local_file_exists,
|
||||
"render.file.local_size_bytes": local_file_size,
|
||||
},
|
||||
) as span:
|
||||
upload_info = self.api_client.get_upload_url(task_id, file_type, file_name)
|
||||
if not upload_info:
|
||||
mark_span_error(span, "get upload url failed", ErrorCode.E_UPLOAD_FAILED.value)
|
||||
logger.error(f"[task:{task_id}] Failed to get upload URL")
|
||||
return None
|
||||
|
||||
@@ -480,6 +782,7 @@ class BaseHandler(TaskHandler, ABC):
|
||||
access_url = upload_info.get('accessUrl')
|
||||
|
||||
if not upload_url:
|
||||
mark_span_error(span, "invalid upload url response", ErrorCode.E_UPLOAD_FAILED.value)
|
||||
logger.error(f"[task:{task_id}] Invalid upload URL response")
|
||||
return None
|
||||
|
||||
@@ -492,9 +795,40 @@ class BaseHandler(TaskHandler, ABC):
|
||||
span.set_attribute("render.file.access_url", access_url)
|
||||
|
||||
try:
|
||||
result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout)
|
||||
result, upload_metrics = storage.upload_file_with_metrics(
|
||||
upload_url,
|
||||
file_path,
|
||||
timeout=self.config.upload_timeout,
|
||||
)
|
||||
upload_method = str(upload_metrics.get("upload_method", "unknown"))
|
||||
http_attempts = int(upload_metrics.get("http_attempts", 0))
|
||||
http_retry_count = int(upload_metrics.get("http_retry_count", 0))
|
||||
http_status_code = int(upload_metrics.get("http_status_code", 0))
|
||||
http_replace_applied = bool(upload_metrics.get("http_replace_applied", False))
|
||||
content_type = str(upload_metrics.get("content_type", ""))
|
||||
error_type = str(upload_metrics.get("error_type", ""))
|
||||
rclone_attempted = bool(upload_metrics.get("rclone_attempted", False))
|
||||
rclone_succeeded = bool(upload_metrics.get("rclone_succeeded", False))
|
||||
rclone_fallback_http = bool(upload_metrics.get("rclone_fallback_http", False))
|
||||
|
||||
if span is not None:
|
||||
span.set_attribute("render.file.upload_success", bool(result))
|
||||
span.set_attribute("render.file.upload_method", upload_method)
|
||||
span.set_attribute("render.file.http_attempts", http_attempts)
|
||||
span.set_attribute("render.file.http_retry_count", http_retry_count)
|
||||
span.set_attribute("render.file.http_replace_applied", http_replace_applied)
|
||||
span.set_attribute("render.file.rclone_attempted", rclone_attempted)
|
||||
span.set_attribute("render.file.rclone_succeeded", rclone_succeeded)
|
||||
span.set_attribute("render.file.rclone_fallback_http", rclone_fallback_http)
|
||||
if content_type:
|
||||
span.set_attribute("render.file.content_type", content_type)
|
||||
if http_status_code > 0:
|
||||
span.set_attribute("render.file.http_status_code", http_status_code)
|
||||
if error_type:
|
||||
span.set_attribute("render.file.error_type", error_type)
|
||||
|
||||
if result:
|
||||
file_size = os.path.getsize(file_path)
|
||||
file_size = local_file_size if local_file_size > 0 else os.path.getsize(file_path)
|
||||
logger.info(
|
||||
f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)"
|
||||
)
|
||||
@@ -502,12 +836,26 @@ class BaseHandler(TaskHandler, ABC):
|
||||
if span is not None:
|
||||
span.set_attribute("render.file.size_bytes", file_size)
|
||||
|
||||
cache_write_back = "skipped"
|
||||
if access_url:
|
||||
self.material_cache.add_to_cache(access_url, file_path)
|
||||
cache_added = self.material_cache.add_to_cache(access_url, file_path)
|
||||
cache_write_back = "success" if cache_added else "failed"
|
||||
if not cache_added:
|
||||
logger.warning(f"[task:{task_id}] Upload cache write back failed: {file_path}")
|
||||
if span is not None:
|
||||
span.set_attribute("render.file.cache_write_back", cache_write_back)
|
||||
|
||||
return access_url
|
||||
|
||||
logger.error(f"[task:{task_id}] Upload failed: {file_path}")
|
||||
mark_span_error(
|
||||
span,
|
||||
f"upload failed(method={upload_method}, status={http_status_code}, retries={http_retry_count}, error={error_type})",
|
||||
ErrorCode.E_UPLOAD_FAILED.value
|
||||
)
|
||||
logger.error(
|
||||
f"[task:{task_id}] Upload failed: {file_path}, method={upload_method}, "
|
||||
f"http_status={http_status_code}, retries={http_retry_count}, error_type={error_type}"
|
||||
)
|
||||
return None
|
||||
except Exception as e:
|
||||
mark_span_error(span, str(e), ErrorCode.E_UPLOAD_FAILED.value)
|
||||
@@ -538,10 +886,8 @@ class BaseHandler(TaskHandler, ABC):
|
||||
if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run:
|
||||
cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL]
|
||||
|
||||
# 日志记录命令(限制长度)
|
||||
# 日志记录命令(不限制长度)
|
||||
cmd_str = ' '.join(cmd_to_run)
|
||||
if len(cmd_str) > 500:
|
||||
cmd_str = cmd_str[:500] + '...'
|
||||
logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")
|
||||
|
||||
with start_span(
|
||||
|
||||
@@ -91,23 +91,37 @@ class ComposeTransitionHandler(BaseHandler):
|
||||
f"overlap_tail={overlap_tail_ms}ms, overlap_head={overlap_head_ms}ms"
|
||||
)
|
||||
|
||||
# 1. 下载前一个片段视频
|
||||
# 1. 并行下载前后片段视频
|
||||
prev_video_file = os.path.join(work_dir, 'prev_segment.mp4')
|
||||
if not self.download_file(prev_segment['videoUrl'], prev_video_file):
|
||||
next_video_file = os.path.join(work_dir, 'next_segment.mp4')
|
||||
download_results = self.download_files_parallel([
|
||||
{
|
||||
'key': 'prev_video',
|
||||
'url': prev_segment['videoUrl'],
|
||||
'dest': prev_video_file,
|
||||
'required': True
|
||||
},
|
||||
{
|
||||
'key': 'next_video',
|
||||
'url': next_segment['videoUrl'],
|
||||
'dest': next_video_file,
|
||||
'required': True
|
||||
}
|
||||
])
|
||||
prev_result = download_results.get('prev_video')
|
||||
if not prev_result or not prev_result['success']:
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_INPUT_UNAVAILABLE,
|
||||
f"Failed to download prev segment video: {prev_segment['videoUrl']}"
|
||||
)
|
||||
|
||||
# 2. 下载后一个片段视频
|
||||
next_video_file = os.path.join(work_dir, 'next_segment.mp4')
|
||||
if not self.download_file(next_segment['videoUrl'], next_video_file):
|
||||
next_result = download_results.get('next_video')
|
||||
if not next_result or not next_result['success']:
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_INPUT_UNAVAILABLE,
|
||||
f"Failed to download next segment video: {next_segment['videoUrl']}"
|
||||
)
|
||||
|
||||
# 3. 获取前一个片段的实际时长
|
||||
# 2. 获取前一个片段的实际时长
|
||||
prev_duration = self.probe_duration(prev_video_file)
|
||||
if not prev_duration:
|
||||
return TaskResult.fail(
|
||||
@@ -115,7 +129,7 @@ class ComposeTransitionHandler(BaseHandler):
|
||||
"Failed to probe prev segment duration"
|
||||
)
|
||||
|
||||
# 4. 构建转场合成命令
|
||||
# 3. 构建转场合成命令
|
||||
output_file = os.path.join(work_dir, 'transition.mp4')
|
||||
cmd = self._build_command(
|
||||
prev_video_file=prev_video_file,
|
||||
@@ -128,25 +142,25 @@ class ComposeTransitionHandler(BaseHandler):
|
||||
output_spec=output_spec
|
||||
)
|
||||
|
||||
# 5. 执行 FFmpeg
|
||||
# 4. 执行 FFmpeg
|
||||
if not self.run_ffmpeg(cmd, task.task_id):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"FFmpeg transition composition failed"
|
||||
)
|
||||
|
||||
# 6. 验证输出文件
|
||||
# 5. 验证输出文件
|
||||
if not self.ensure_file_exists(output_file, min_size=1024):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"Transition output file is missing or too small"
|
||||
)
|
||||
|
||||
# 7. 获取实际时长
|
||||
# 6. 获取实际时长
|
||||
actual_duration = self.probe_duration(output_file)
|
||||
actual_duration_ms = int(actual_duration * 1000) if actual_duration else transition_duration_ms
|
||||
|
||||
# 8. 上传产物
|
||||
# 7. 上传产物
|
||||
transition_video_url = self.upload_file(task.task_id, 'video', output_file)
|
||||
if not transition_video_url:
|
||||
return TaskResult.fail(
|
||||
@@ -236,7 +250,7 @@ class ComposeTransitionHandler(BaseHandler):
|
||||
]
|
||||
|
||||
# 编码参数(根据硬件加速配置动态获取)
|
||||
cmd.extend(self.get_video_encode_args())
|
||||
cmd.extend(self.get_video_encode_args(maxrate=output_spec.bitrate))
|
||||
|
||||
# 帧率
|
||||
fps = output_spec.fps
|
||||
|
||||
@@ -110,16 +110,26 @@ class FinalizeMp4Handler(BaseHandler):
|
||||
Returns:
|
||||
TaskResult
|
||||
"""
|
||||
# 1. 下载所有 TS 分片
|
||||
# 1. 并行下载所有 TS 分片
|
||||
download_jobs = []
|
||||
for i, ts_url in enumerate(ts_list):
|
||||
download_jobs.append({
|
||||
'key': str(i),
|
||||
'url': ts_url,
|
||||
'dest': os.path.join(work_dir, f'seg_{i}.ts'),
|
||||
'required': True
|
||||
})
|
||||
download_results = self.download_files_parallel(download_jobs)
|
||||
|
||||
ts_files = []
|
||||
for i, ts_url in enumerate(ts_list):
|
||||
ts_file = os.path.join(work_dir, f'seg_{i}.ts')
|
||||
if not self.download_file(ts_url, ts_file):
|
||||
result = download_results.get(str(i))
|
||||
if not result or not result['success']:
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_INPUT_UNAVAILABLE,
|
||||
f"Failed to download TS segment {i}: {ts_url}"
|
||||
)
|
||||
ts_files.append(ts_file)
|
||||
ts_files.append(result['dest'])
|
||||
|
||||
logger.info(f"[task:{task.task_id}] Downloaded {len(ts_files)} TS segments")
|
||||
|
||||
|
||||
@@ -1,304 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
TS 分片封装处理器
|
||||
|
||||
处理 PACKAGE_SEGMENT_TS 任务,将视频片段和对应时间区间的音频封装为 TS 分片。
|
||||
支持转场相关的 overlap 裁剪和转场分片封装。
|
||||
"""
|
||||
|
||||
import os
|
||||
import logging
|
||||
from typing import List, Optional
|
||||
|
||||
from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS
|
||||
from domain.task import Task, TaskType
|
||||
from domain.result import TaskResult, ErrorCode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PackageSegmentTsHandler(BaseHandler):
|
||||
"""
|
||||
TS 分片封装处理器
|
||||
|
||||
职责:
|
||||
- 下载视频片段
|
||||
- 下载全局音频
|
||||
- 截取对应时间区间的音频
|
||||
- 封装为 TS 分片
|
||||
- 上传 TS 产物
|
||||
|
||||
关键约束:
|
||||
- TS 必须包含音视频同轨
|
||||
- 使用 output_ts_offset 保证时间戳连续
|
||||
- 输出 extinfDurationSec 供 m3u8 使用
|
||||
|
||||
转场相关:
|
||||
- 普通片段 TS:需要裁剪掉 overlap 区域(已被转场分片使用)
|
||||
- 转场分片 TS:直接封装转场视频产物,无需裁剪
|
||||
- 无转场时:走原有逻辑,不做裁剪
|
||||
|
||||
精确裁剪:
|
||||
- 当需要裁剪 overlap 区域时,必须使用重编码方式(-vf trim)才能精确切割
|
||||
- 使用 -c copy 只能从关键帧切割,会导致不精确
|
||||
"""
|
||||
|
||||
def get_supported_type(self) -> TaskType:
|
||||
return TaskType.PACKAGE_SEGMENT_TS
|
||||
|
||||
def handle(self, task: Task) -> TaskResult:
|
||||
"""处理 TS 封装任务"""
|
||||
work_dir = self.create_work_dir(task.task_id)
|
||||
|
||||
try:
|
||||
# 解析参数
|
||||
video_url = task.get_video_url()
|
||||
audio_url = task.get_audio_url()
|
||||
start_time_ms = task.get_start_time_ms()
|
||||
duration_ms = task.get_duration_ms()
|
||||
output_spec = task.get_output_spec()
|
||||
|
||||
# 转场相关参数
|
||||
is_transition_segment = task.is_transition_segment()
|
||||
trim_head = task.should_trim_head()
|
||||
trim_tail = task.should_trim_tail()
|
||||
trim_head_ms = task.get_trim_head_ms()
|
||||
trim_tail_ms = task.get_trim_tail_ms()
|
||||
|
||||
if not video_url:
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_SPEC_INVALID,
|
||||
"Missing videoUrl"
|
||||
)
|
||||
|
||||
if not audio_url:
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_SPEC_INVALID,
|
||||
"Missing audioUrl"
|
||||
)
|
||||
|
||||
# 计算时间参数
|
||||
start_sec = start_time_ms / 1000.0
|
||||
duration_sec = duration_ms / 1000.0
|
||||
|
||||
# 1. 下载视频片段
|
||||
video_file = os.path.join(work_dir, 'video.mp4')
|
||||
if not self.download_file(video_url, video_file):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_INPUT_UNAVAILABLE,
|
||||
f"Failed to download video: {video_url}"
|
||||
)
|
||||
|
||||
# 2. 下载全局音频
|
||||
audio_file = os.path.join(work_dir, 'audio.aac')
|
||||
if not self.download_file(audio_url, audio_file):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_INPUT_UNAVAILABLE,
|
||||
f"Failed to download audio: {audio_url}"
|
||||
)
|
||||
|
||||
# 3. 判断是否需要精确裁剪视频
|
||||
needs_video_trim = not is_transition_segment and (
|
||||
(trim_head and trim_head_ms > 0) or
|
||||
(trim_tail and trim_tail_ms > 0)
|
||||
)
|
||||
|
||||
# 4. 如果需要裁剪,先重编码裁剪视频
|
||||
processed_video_file = video_file
|
||||
if needs_video_trim:
|
||||
processed_video_file = os.path.join(work_dir, 'trimmed_video.mp4')
|
||||
trim_cmd = self._build_trim_command(
|
||||
video_file=video_file,
|
||||
output_file=processed_video_file,
|
||||
trim_head_ms=trim_head_ms if trim_head else 0,
|
||||
trim_tail_ms=trim_tail_ms if trim_tail else 0,
|
||||
output_spec=output_spec
|
||||
)
|
||||
|
||||
logger.info(f"[task:{task.task_id}] Trimming video: head={trim_head_ms}ms, tail={trim_tail_ms}ms")
|
||||
|
||||
if not self.run_ffmpeg(trim_cmd, task.task_id):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"Video trim failed"
|
||||
)
|
||||
|
||||
if not self.ensure_file_exists(processed_video_file, min_size=1024):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"Trimmed video file is missing or too small"
|
||||
)
|
||||
|
||||
# 5. 构建 TS 封装命令
|
||||
output_file = os.path.join(work_dir, 'segment.ts')
|
||||
cmd = self._build_package_command(
|
||||
video_file=processed_video_file,
|
||||
audio_file=audio_file,
|
||||
output_file=output_file,
|
||||
start_sec=start_sec,
|
||||
duration_sec=duration_sec
|
||||
)
|
||||
|
||||
# 6. 执行 FFmpeg
|
||||
if not self.run_ffmpeg(cmd, task.task_id):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"TS packaging failed"
|
||||
)
|
||||
|
||||
# 7. 验证输出文件
|
||||
if not self.ensure_file_exists(output_file, min_size=1024):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"TS output file is missing or too small"
|
||||
)
|
||||
|
||||
# 8. 获取实际时长(用于 EXTINF)
|
||||
actual_duration = self.probe_duration(output_file)
|
||||
extinf_duration = actual_duration if actual_duration else duration_sec
|
||||
|
||||
# 9. 上传产物
|
||||
ts_url = self.upload_file(task.task_id, 'ts', output_file)
|
||||
if not ts_url:
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_UPLOAD_FAILED,
|
||||
"Failed to upload TS"
|
||||
)
|
||||
|
||||
return TaskResult.ok({
|
||||
'tsUrl': ts_url,
|
||||
'extinfDurationSec': extinf_duration
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
|
||||
return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e))
|
||||
|
||||
finally:
|
||||
self.cleanup_work_dir(work_dir)
|
||||
|
||||
def _build_trim_command(
|
||||
self,
|
||||
video_file: str,
|
||||
output_file: str,
|
||||
trim_head_ms: int,
|
||||
trim_tail_ms: int,
|
||||
output_spec
|
||||
) -> List[str]:
|
||||
"""
|
||||
构建视频精确裁剪命令(重编码方式)
|
||||
|
||||
使用 trim 滤镜进行精确帧级裁剪,而非 -ss/-t 参数的关键帧裁剪。
|
||||
|
||||
Args:
|
||||
video_file: 输入视频路径
|
||||
output_file: 输出视频路径
|
||||
trim_head_ms: 头部裁剪时长(毫秒)
|
||||
trim_tail_ms: 尾部裁剪时长(毫秒)
|
||||
output_spec: 输出规格
|
||||
|
||||
Returns:
|
||||
FFmpeg 命令参数列表
|
||||
"""
|
||||
# 获取原视频时长
|
||||
original_duration = self.probe_duration(video_file)
|
||||
if not original_duration:
|
||||
original_duration = 10.0 # 默认值,避免除零
|
||||
|
||||
trim_head_sec = trim_head_ms / 1000.0
|
||||
trim_tail_sec = trim_tail_ms / 1000.0
|
||||
|
||||
# 计算裁剪后的起止时间
|
||||
start_time = trim_head_sec
|
||||
end_time = original_duration - trim_tail_sec
|
||||
|
||||
# 构建 trim 滤镜
|
||||
vf_filter = f"trim=start={start_time}:end={end_time},setpts=PTS-STARTPTS"
|
||||
|
||||
cmd = [
|
||||
'ffmpeg', '-y', '-hide_banner',
|
||||
'-i', video_file,
|
||||
'-vf', vf_filter,
|
||||
]
|
||||
|
||||
# 编码参数
|
||||
cmd.extend(VIDEO_ENCODE_ARGS)
|
||||
|
||||
# 帧率
|
||||
fps = output_spec.fps
|
||||
cmd.extend(['-r', str(fps)])
|
||||
|
||||
# 计算输出视频帧数,动态调整 GOP
|
||||
output_duration_sec = end_time - start_time
|
||||
total_frames = int(output_duration_sec * fps)
|
||||
|
||||
# 动态 GOP:短视频使用较小的 GOP
|
||||
if total_frames <= 1:
|
||||
gop_size = 1
|
||||
elif total_frames < fps:
|
||||
gop_size = total_frames
|
||||
else:
|
||||
gop_size = fps # 每秒一个关键帧
|
||||
|
||||
cmd.extend(['-g', str(gop_size)])
|
||||
cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))])
|
||||
|
||||
# 强制第一帧为关键帧
|
||||
cmd.extend(['-force_key_frames', 'expr:eq(n,0)'])
|
||||
|
||||
# 无音频(音频单独处理)
|
||||
cmd.append('-an')
|
||||
|
||||
cmd.append(output_file)
|
||||
|
||||
return cmd
|
||||
|
||||
def _build_package_command(
|
||||
self,
|
||||
video_file: str,
|
||||
audio_file: str,
|
||||
output_file: str,
|
||||
start_sec: float,
|
||||
duration_sec: float
|
||||
) -> List[str]:
|
||||
"""
|
||||
构建 TS 封装命令
|
||||
|
||||
将视频和对应时间区间的音频封装为 TS 分片。
|
||||
视频使用 copy 模式(已经过精确裁剪或无需裁剪)。
|
||||
|
||||
Args:
|
||||
video_file: 视频文件路径(已处理)
|
||||
audio_file: 音频文件路径
|
||||
output_file: 输出文件路径
|
||||
start_sec: 音频开始时间(秒)
|
||||
duration_sec: 音频时长(秒)
|
||||
|
||||
Returns:
|
||||
FFmpeg 命令参数列表
|
||||
"""
|
||||
cmd = [
|
||||
'ffmpeg', '-y', '-hide_banner',
|
||||
# 视频输入
|
||||
'-i', video_file,
|
||||
# 音频输入(从 start_sec 开始截取 duration_sec)
|
||||
'-ss', str(start_sec),
|
||||
'-t', str(duration_sec),
|
||||
'-i', audio_file,
|
||||
# 映射流
|
||||
'-map', '0:v:0', # 使用第一个输入的视频流
|
||||
'-map', '1:a:0', # 使用第二个输入的音频流
|
||||
# 复制编码(视频已处理,无需重编码)
|
||||
'-c:v', 'copy',
|
||||
'-c:a', 'copy',
|
||||
# 关键:时间戳偏移,保证整体连续
|
||||
'-output_ts_offset', str(start_sec),
|
||||
# 复用参数
|
||||
'-muxdelay', '0',
|
||||
'-muxpreload', '0',
|
||||
# 输出格式
|
||||
'-f', 'mpegts',
|
||||
output_file
|
||||
]
|
||||
|
||||
return cmd
|
||||
@@ -55,56 +55,88 @@ class PrepareJobAudioHandler(BaseHandler):
|
||||
bgm_url = task.get_bgm_url()
|
||||
segments = task.get_segments()
|
||||
|
||||
# 1. 下载 BGM(如有)
|
||||
bgm_file = None
|
||||
# 1. 并行下载 BGM 与叠加音效
|
||||
bgm_file = os.path.join(work_dir, 'bgm.mp3') if bgm_url else None
|
||||
download_jobs = []
|
||||
if bgm_url and bgm_file:
|
||||
download_jobs.append({
|
||||
'key': 'bgm',
|
||||
'url': bgm_url,
|
||||
'dest': bgm_file,
|
||||
'required': False
|
||||
})
|
||||
|
||||
sfx_download_candidates = []
|
||||
for i, seg in enumerate(segments):
|
||||
audio_spec_data = seg.get('audioSpecJson')
|
||||
if not audio_spec_data:
|
||||
continue
|
||||
audio_spec = AudioSpec.from_dict(audio_spec_data)
|
||||
if not audio_spec or not audio_spec.audio_url:
|
||||
continue
|
||||
sfx_file = os.path.join(work_dir, f'sfx_{i}.mp3')
|
||||
job_key = f'sfx_{i}'
|
||||
sfx_download_candidates.append({
|
||||
'key': job_key,
|
||||
'file': sfx_file,
|
||||
'spec': audio_spec,
|
||||
'segment': seg
|
||||
})
|
||||
download_jobs.append({
|
||||
'key': job_key,
|
||||
'url': audio_spec.audio_url,
|
||||
'dest': sfx_file,
|
||||
'required': False
|
||||
})
|
||||
|
||||
download_results = self.download_files_parallel(download_jobs)
|
||||
if bgm_url:
|
||||
bgm_file = os.path.join(work_dir, 'bgm.mp3')
|
||||
if not self.download_file(bgm_url, bgm_file):
|
||||
bgm_result = download_results.get('bgm')
|
||||
if not bgm_result or not bgm_result['success']:
|
||||
logger.warning(f"[task:{task.task_id}] Failed to download BGM")
|
||||
bgm_file = None
|
||||
|
||||
# 2. 下载叠加音效
|
||||
sfx_files = []
|
||||
for i, seg in enumerate(segments):
|
||||
audio_spec_data = seg.get('audioSpecJson')
|
||||
if audio_spec_data:
|
||||
audio_spec = AudioSpec.from_dict(audio_spec_data)
|
||||
if audio_spec and audio_spec.audio_url:
|
||||
sfx_file = os.path.join(work_dir, f'sfx_{i}.mp3')
|
||||
if self.download_file(audio_spec.audio_url, sfx_file):
|
||||
sfx_files.append({
|
||||
'file': sfx_file,
|
||||
'spec': audio_spec,
|
||||
'segment': seg
|
||||
})
|
||||
else:
|
||||
logger.warning(f"[task:{task.task_id}] Failed to download SFX {i}")
|
||||
for sfx_candidate in sfx_download_candidates:
|
||||
sfx_result = download_results.get(sfx_candidate['key'])
|
||||
if sfx_result and sfx_result['success']:
|
||||
sfx_files.append({
|
||||
'file': sfx_candidate['file'],
|
||||
'spec': sfx_candidate['spec'],
|
||||
'segment': sfx_candidate['segment']
|
||||
})
|
||||
else:
|
||||
logger.warning(f"[task:{task.task_id}] Failed to download SFX {sfx_candidate['key']}")
|
||||
|
||||
# 3. 构建音频混音命令
|
||||
# 2. 构建音频混音命令
|
||||
output_file = os.path.join(work_dir, 'audio_full.aac')
|
||||
global_fade_in_ms = task.get_global_audio_fade_in_ms()
|
||||
global_fade_out_ms = task.get_global_audio_fade_out_ms()
|
||||
cmd = self._build_audio_command(
|
||||
bgm_file=bgm_file,
|
||||
sfx_files=sfx_files,
|
||||
output_file=output_file,
|
||||
total_duration_sec=total_duration_sec,
|
||||
audio_profile=audio_profile
|
||||
audio_profile=audio_profile,
|
||||
global_fade_in_ms=global_fade_in_ms,
|
||||
global_fade_out_ms=global_fade_out_ms
|
||||
)
|
||||
|
||||
# 4. 执行 FFmpeg
|
||||
# 3. 执行 FFmpeg
|
||||
if not self.run_ffmpeg(cmd, task.task_id):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"Audio mixing failed"
|
||||
)
|
||||
|
||||
# 5. 验证输出文件
|
||||
# 4. 验证输出文件
|
||||
if not self.ensure_file_exists(output_file, min_size=1024):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"Audio output file is missing or too small"
|
||||
)
|
||||
|
||||
# 6. 上传产物
|
||||
# 5. 上传产物
|
||||
audio_url = self.upload_file(task.task_id, 'audio', output_file)
|
||||
if not audio_url:
|
||||
return TaskResult.fail(
|
||||
@@ -129,7 +161,9 @@ class PrepareJobAudioHandler(BaseHandler):
|
||||
sfx_files: List[Dict],
|
||||
output_file: str,
|
||||
total_duration_sec: float,
|
||||
audio_profile: AudioProfile
|
||||
audio_profile: AudioProfile,
|
||||
global_fade_in_ms: int = 0,
|
||||
global_fade_out_ms: int = 0
|
||||
) -> List[str]:
|
||||
"""
|
||||
构建音频混音命令
|
||||
@@ -140,6 +174,8 @@ class PrepareJobAudioHandler(BaseHandler):
|
||||
output_file: 输出文件路径
|
||||
total_duration_sec: 总时长(秒)
|
||||
audio_profile: 音频配置
|
||||
global_fade_in_ms: 全局音频淡入时长(毫秒),0 不应用
|
||||
global_fade_out_ms: 全局音频淡出时长(毫秒),0 不应用
|
||||
|
||||
Returns:
|
||||
FFmpeg 命令参数列表
|
||||
@@ -147,8 +183,23 @@ class PrepareJobAudioHandler(BaseHandler):
|
||||
sample_rate = audio_profile.sample_rate
|
||||
channels = audio_profile.channels
|
||||
|
||||
# 构建全局 afade 滤镜(作用于最终混合音频,在 amix 之后)
|
||||
global_fade_filters = self._build_global_fade_filters(
|
||||
total_duration_sec, global_fade_in_ms, global_fade_out_ms
|
||||
)
|
||||
|
||||
# 情况1:无 BGM 也无叠加音效 -> 生成静音
|
||||
if not bgm_file and not sfx_files:
|
||||
if global_fade_filters:
|
||||
return [
|
||||
'ffmpeg', '-y', '-hide_banner',
|
||||
'-f', 'lavfi',
|
||||
'-i', f'anullsrc=r={sample_rate}:cl=stereo',
|
||||
'-t', str(total_duration_sec),
|
||||
'-af', ','.join(global_fade_filters),
|
||||
'-c:a', 'aac', '-b:a', '128k',
|
||||
output_file
|
||||
]
|
||||
return [
|
||||
'ffmpeg', '-y', '-hide_banner',
|
||||
'-f', 'lavfi',
|
||||
@@ -160,10 +211,14 @@ class PrepareJobAudioHandler(BaseHandler):
|
||||
|
||||
# 情况2:仅 BGM,无叠加音效
|
||||
if not sfx_files:
|
||||
af_arg = []
|
||||
if global_fade_filters:
|
||||
af_arg = ['-af', ','.join(global_fade_filters)]
|
||||
return [
|
||||
'ffmpeg', '-y', '-hide_banner',
|
||||
'-i', bgm_file,
|
||||
'-t', str(total_duration_sec),
|
||||
*af_arg,
|
||||
'-c:a', 'aac', '-b:a', '128k',
|
||||
'-ar', str(sample_rate), '-ac', str(channels),
|
||||
output_file
|
||||
@@ -233,10 +288,21 @@ class PrepareJobAudioHandler(BaseHandler):
|
||||
# dropout_transition=0 表示输入结束时不做渐变
|
||||
mix_inputs = "[bgm]" + "".join(sfx_labels)
|
||||
num_inputs = 1 + len(sfx_files)
|
||||
filter_parts.append(
|
||||
f"{mix_inputs}amix=inputs={num_inputs}:duration=first:"
|
||||
f"dropout_transition=0:normalize=0[out]"
|
||||
)
|
||||
|
||||
# 如果有全局 fade,amix 输出到中间标签后再追加 afade;否则直接输出到 [out]
|
||||
if global_fade_filters:
|
||||
filter_parts.append(
|
||||
f"{mix_inputs}amix=inputs={num_inputs}:duration=first:"
|
||||
f"dropout_transition=0:normalize=0[mixed]"
|
||||
)
|
||||
filter_parts.append(
|
||||
f"[mixed]{','.join(global_fade_filters)}[out]"
|
||||
)
|
||||
else:
|
||||
filter_parts.append(
|
||||
f"{mix_inputs}amix=inputs={num_inputs}:duration=first:"
|
||||
f"dropout_transition=0:normalize=0[out]"
|
||||
)
|
||||
|
||||
filter_complex = ';'.join(filter_parts)
|
||||
|
||||
@@ -249,3 +315,33 @@ class PrepareJobAudioHandler(BaseHandler):
|
||||
]
|
||||
|
||||
return cmd
|
||||
|
||||
@staticmethod
|
||||
def _build_global_fade_filters(
|
||||
total_duration_sec: float,
|
||||
global_fade_in_ms: int,
|
||||
global_fade_out_ms: int
|
||||
) -> List[str]:
|
||||
"""
|
||||
构建全局音频淡入/淡出滤镜列表
|
||||
|
||||
在 amix 混音输出之后追加,作用于最终混合音频。
|
||||
与片段级 audioSpecJson 中的 fadeInMs/fadeOutMs(仅作用于单个叠加音效)独立。
|
||||
|
||||
Args:
|
||||
total_duration_sec: 总时长(秒)
|
||||
global_fade_in_ms: 全局淡入时长(毫秒),0 不应用
|
||||
global_fade_out_ms: 全局淡出时长(毫秒),0 不应用
|
||||
|
||||
Returns:
|
||||
afade 滤镜字符串列表,可能为空
|
||||
"""
|
||||
filters = []
|
||||
if global_fade_in_ms > 0:
|
||||
fade_in_sec = global_fade_in_ms / 1000.0
|
||||
filters.append(f"afade=t=in:st=0:d={fade_in_sec}")
|
||||
if global_fade_out_ms > 0:
|
||||
fade_out_sec = global_fade_out_ms / 1000.0
|
||||
fade_out_start = total_duration_sec - fade_out_sec
|
||||
filters.append(f"afade=t=out:st={fade_out_start}:d={fade_out_sec}")
|
||||
return filters
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
视频片段渲染处理器
|
||||
渲染+TS封装处理器
|
||||
|
||||
处理 RENDER_SEGMENT_VIDEO 任务,将原素材渲染为符合输出规格的视频片段。
|
||||
支持转场 overlap 区域的帧冻结生成。
|
||||
处理 RENDER_SEGMENT_TS 任务,将原素材渲染为视频并封装为 TS 分片。
|
||||
支持转场 overlap 区域的帧冻结生成和精确裁剪。
|
||||
"""
|
||||
|
||||
import os
|
||||
@@ -26,21 +26,24 @@ def _get_extension_from_url(url: str) -> str:
|
||||
return ext.lower() if ext else ''
|
||||
|
||||
|
||||
class RenderSegmentVideoHandler(BaseHandler):
|
||||
class RenderSegmentTsHandler(BaseHandler):
|
||||
"""
|
||||
视频片段渲染处理器
|
||||
渲染+TS封装处理器
|
||||
|
||||
职责:
|
||||
- 下载素材文件
|
||||
- 下载 LUT 文件(如有)
|
||||
- 下载叠加层(如有)
|
||||
- 下载音频(如有)
|
||||
- 构建 FFmpeg 渲染命令
|
||||
- 执行渲染(支持帧冻结生成 overlap 区域)
|
||||
- 裁剪 overlap 区域(如需要)
|
||||
- 封装为 TS 分片
|
||||
- 上传产物
|
||||
"""
|
||||
|
||||
def get_supported_type(self) -> TaskType:
|
||||
return TaskType.RENDER_SEGMENT_VIDEO
|
||||
return TaskType.RENDER_SEGMENT_TS
|
||||
|
||||
def handle(self, task: Task) -> TaskResult:
|
||||
"""处理视频渲染任务"""
|
||||
@@ -85,13 +88,82 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
else:
|
||||
input_file = os.path.join(work_dir, 'input.mp4')
|
||||
|
||||
# 2. 下载素材
|
||||
if not self.download_file(material_url, input_file):
|
||||
# 2. 构建并行下载任务(主素材 + 可选 LUT + 可选叠加层 + 可选音频)
|
||||
audio_url = task.get_audio_url()
|
||||
audio_file = None
|
||||
|
||||
lut_file = os.path.join(work_dir, 'lut.cube') if render_spec.lut_url else None
|
||||
overlay_file = None
|
||||
if render_spec.overlay_url:
|
||||
# 根据 URL 后缀确定文件扩展名
|
||||
overlay_url_lower = render_spec.overlay_url.lower()
|
||||
if overlay_url_lower.endswith('.jpg') or overlay_url_lower.endswith('.jpeg'):
|
||||
overlay_ext = '.jpg'
|
||||
elif overlay_url_lower.endswith('.mov'):
|
||||
overlay_ext = '.mov'
|
||||
else:
|
||||
overlay_ext = '.png'
|
||||
overlay_file = os.path.join(work_dir, f'overlay{overlay_ext}')
|
||||
|
||||
download_jobs = [
|
||||
{
|
||||
'key': 'material',
|
||||
'url': material_url,
|
||||
'dest': input_file,
|
||||
'required': True
|
||||
}
|
||||
]
|
||||
if render_spec.lut_url and lut_file:
|
||||
download_jobs.append({
|
||||
'key': 'lut',
|
||||
'url': render_spec.lut_url,
|
||||
'dest': lut_file,
|
||||
'required': False
|
||||
})
|
||||
if render_spec.overlay_url and overlay_file:
|
||||
download_jobs.append({
|
||||
'key': 'overlay',
|
||||
'url': render_spec.overlay_url,
|
||||
'dest': overlay_file,
|
||||
'required': False
|
||||
})
|
||||
if audio_url:
|
||||
audio_file = os.path.join(work_dir, 'audio.aac')
|
||||
download_jobs.append({
|
||||
'key': 'audio',
|
||||
'url': audio_url,
|
||||
'dest': audio_file,
|
||||
'required': True
|
||||
})
|
||||
download_results = self.download_files_parallel(download_jobs)
|
||||
|
||||
material_result = download_results.get('material')
|
||||
if not material_result or not material_result['success']:
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_INPUT_UNAVAILABLE,
|
||||
f"Failed to download material: {material_url}"
|
||||
)
|
||||
|
||||
if render_spec.lut_url:
|
||||
lut_result = download_results.get('lut')
|
||||
if not lut_result or not lut_result['success']:
|
||||
logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it")
|
||||
lut_file = None
|
||||
|
||||
if render_spec.overlay_url:
|
||||
overlay_result = download_results.get('overlay')
|
||||
if not overlay_result or not overlay_result['success']:
|
||||
logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
|
||||
overlay_file = None
|
||||
|
||||
if audio_url:
|
||||
audio_dl = download_results.get('audio')
|
||||
if not audio_dl or not audio_dl['success']:
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_INPUT_UNAVAILABLE,
|
||||
f"Failed to download audio: {audio_url}"
|
||||
)
|
||||
|
||||
# 3. 图片素材转换为视频
|
||||
if is_image:
|
||||
video_input_file = os.path.join(work_dir, 'input_video.mp4')
|
||||
@@ -111,31 +183,7 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
input_file = video_input_file
|
||||
logger.info(f"[task:{task.task_id}] Image converted to video successfully")
|
||||
|
||||
# 4. 下载 LUT(如有)
|
||||
lut_file = None
|
||||
if render_spec.lut_url:
|
||||
lut_file = os.path.join(work_dir, 'lut.cube')
|
||||
if not self.download_file(render_spec.lut_url, lut_file):
|
||||
logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it")
|
||||
lut_file = None
|
||||
|
||||
# 5. 下载叠加层(如有)
|
||||
overlay_file = None
|
||||
if render_spec.overlay_url:
|
||||
# 根据 URL 后缀确定文件扩展名
|
||||
url_lower = render_spec.overlay_url.lower()
|
||||
if url_lower.endswith('.jpg') or url_lower.endswith('.jpeg'):
|
||||
ext = '.jpg'
|
||||
elif url_lower.endswith('.mov'):
|
||||
ext = '.mov'
|
||||
else:
|
||||
ext = '.png' # 默认
|
||||
overlay_file = os.path.join(work_dir, f'overlay{ext}')
|
||||
if not self.download_file(render_spec.overlay_url, overlay_file):
|
||||
logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
|
||||
overlay_file = None
|
||||
|
||||
# 6. 探测源视频时长(仅对视频素材)
|
||||
# 4. 探测源视频时长(仅对视频素材)
|
||||
# 用于检测时长不足并通过冻结最后一帧补足
|
||||
source_duration_sec = None
|
||||
if not is_image:
|
||||
@@ -158,13 +206,13 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
f"will freeze last frame for {shortage_sec:.2f}s"
|
||||
)
|
||||
|
||||
# 7. 计算 overlap 时长(用于转场帧冻结)
|
||||
# 5. 计算 overlap 时长(用于转场帧冻结)
|
||||
# 头部 overlap: 来自前一片段的出场转场
|
||||
overlap_head_ms = task.get_overlap_head_ms()
|
||||
# 尾部 overlap: 当前片段的出场转场
|
||||
overlap_tail_ms = task.get_overlap_tail_ms_v2()
|
||||
|
||||
# 8. 构建 FFmpeg 命令
|
||||
# 6. 构建 FFmpeg 命令
|
||||
output_file = os.path.join(work_dir, 'output.mp4')
|
||||
cmd = self._build_command(
|
||||
input_file=input_file,
|
||||
@@ -179,41 +227,96 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
source_duration_sec=source_duration_sec
|
||||
)
|
||||
|
||||
# 9. 执行 FFmpeg
|
||||
# 7. 执行 FFmpeg
|
||||
if not self.run_ffmpeg(cmd, task.task_id):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"FFmpeg rendering failed"
|
||||
)
|
||||
|
||||
# 10. 验证输出文件
|
||||
# 8. 验证输出文件
|
||||
if not self.ensure_file_exists(output_file, min_size=4096):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"Output file is missing or too small"
|
||||
)
|
||||
|
||||
# 11. 获取实际时长
|
||||
actual_duration = self.probe_duration(output_file)
|
||||
actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms
|
||||
# 9. Overlap 裁剪(仅非转场分片、且有需要裁剪的 overlap 时)
|
||||
is_transition_seg = task.is_transition_segment()
|
||||
trim_head = task.should_trim_head()
|
||||
trim_tail = task.should_trim_tail()
|
||||
trim_head_ms = task.get_trim_head_ms()
|
||||
trim_tail_ms = task.get_trim_tail_ms()
|
||||
needs_video_trim = not is_transition_seg and (
|
||||
(trim_head and trim_head_ms > 0) or
|
||||
(trim_tail and trim_tail_ms > 0)
|
||||
)
|
||||
|
||||
# 12. 上传产物
|
||||
video_url = self.upload_file(task.task_id, 'video', output_file)
|
||||
if not video_url:
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_UPLOAD_FAILED,
|
||||
"Failed to upload video"
|
||||
processed_video = output_file
|
||||
if needs_video_trim:
|
||||
processed_video = os.path.join(work_dir, 'trimmed_video.mp4')
|
||||
trim_cmd = self._build_trim_command(
|
||||
video_file=output_file,
|
||||
output_file=processed_video,
|
||||
trim_head_ms=trim_head_ms if trim_head else 0,
|
||||
trim_tail_ms=trim_tail_ms if trim_tail else 0,
|
||||
output_spec=output_spec
|
||||
)
|
||||
|
||||
# 13. 构建结果(包含 overlap 信息)
|
||||
result_data = {
|
||||
'videoUrl': video_url,
|
||||
'actualDurationMs': actual_duration_ms,
|
||||
'overlapHeadMs': overlap_head_ms,
|
||||
'overlapTailMs': overlap_tail_ms
|
||||
}
|
||||
logger.info(f"[task:{task.task_id}] Trimming video: head={trim_head_ms}ms, tail={trim_tail_ms}ms")
|
||||
|
||||
return TaskResult.ok(result_data)
|
||||
if not self.run_ffmpeg(trim_cmd, task.task_id):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"Video trim failed"
|
||||
)
|
||||
|
||||
if not self.ensure_file_exists(processed_video, min_size=1024):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"Trimmed video file is missing or too small"
|
||||
)
|
||||
|
||||
# 10. 封装 TS
|
||||
start_time_ms = task.get_start_time_ms()
|
||||
start_sec = start_time_ms / 1000.0
|
||||
duration_sec = duration_ms / 1000.0
|
||||
ts_output = os.path.join(work_dir, 'segment.ts')
|
||||
ts_cmd = self._build_ts_package_command(
|
||||
video_file=processed_video,
|
||||
audio_file=audio_file,
|
||||
output_file=ts_output,
|
||||
start_sec=start_sec,
|
||||
duration_sec=duration_sec
|
||||
)
|
||||
|
||||
if not self.run_ffmpeg(ts_cmd, task.task_id):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"TS packaging failed"
|
||||
)
|
||||
|
||||
if not self.ensure_file_exists(ts_output, min_size=1024):
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_FFMPEG_FAILED,
|
||||
"TS output file is missing or too small"
|
||||
)
|
||||
|
||||
# 11. 获取 EXTINF 时长 + 上传 TS
|
||||
actual_duration = self.probe_duration(ts_output)
|
||||
extinf_duration = actual_duration if actual_duration else duration_sec
|
||||
|
||||
ts_url = self.upload_file(task.task_id, 'ts', ts_output)
|
||||
if not ts_url:
|
||||
return TaskResult.fail(
|
||||
ErrorCode.E_UPLOAD_FAILED,
|
||||
"Failed to upload TS"
|
||||
)
|
||||
|
||||
return TaskResult.ok({
|
||||
'tsUrl': ts_url,
|
||||
'extinfDurationSec': extinf_duration
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
|
||||
@@ -222,6 +325,43 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
finally:
|
||||
self.cleanup_work_dir(work_dir)
|
||||
|
||||
@staticmethod
|
||||
def _build_crop_filter(
|
||||
render_spec: 'RenderSpec',
|
||||
width: int,
|
||||
height: int,
|
||||
task_id: str = ''
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
构建裁切滤镜
|
||||
|
||||
crop_enable 时:以目标比例为基准,按 crop_scale 倍率裁切,crop_pos 控制位置(默认居中)。
|
||||
|
||||
Returns:
|
||||
crop 滤镜字符串,无需裁切时返回 None
|
||||
"""
|
||||
if render_spec.crop_enable:
|
||||
scale = render_spec.crop_scale
|
||||
target_ratio = width / height
|
||||
|
||||
# 解析裁切位置,默认居中
|
||||
fx, fy = 0.5, 0.5
|
||||
if render_spec.crop_pos:
|
||||
try:
|
||||
fx, fy = map(float, render_spec.crop_pos.split(','))
|
||||
except ValueError:
|
||||
logger.warning(f"[task:{task_id}] Invalid crop position: {render_spec.crop_pos}, using center")
|
||||
fx, fy = 0.5, 0.5
|
||||
|
||||
# 基准:源中最大的目标比例矩形,再除以倍率
|
||||
return (
|
||||
f"crop='min(iw,ih*{target_ratio})/{scale}':'min(ih,iw/{target_ratio})/{scale}':"
|
||||
f"'(iw-min(iw,ih*{target_ratio})/{scale})*{fx}':"
|
||||
f"'(ih-min(ih,iw/{target_ratio})/{scale})*{fy}'"
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
def _convert_image_to_video(
|
||||
self,
|
||||
image_file: str,
|
||||
@@ -270,23 +410,10 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
# 构建滤镜:缩放填充到目标尺寸
|
||||
filters = []
|
||||
|
||||
# 裁切处理(与视频相同逻辑)
|
||||
if render_spec.crop_enable and render_spec.face_pos:
|
||||
try:
|
||||
fx, fy = map(float, render_spec.face_pos.split(','))
|
||||
target_ratio = width / height
|
||||
filters.append(
|
||||
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})':"
|
||||
f"'(iw-min(iw,ih*{target_ratio}))*{fx}':"
|
||||
f"'(ih-min(ih,iw/{target_ratio}))*{fy}'"
|
||||
)
|
||||
except (ValueError, ZeroDivisionError):
|
||||
logger.warning(f"[task:{task_id}] Invalid face position: {render_spec.face_pos}")
|
||||
elif render_spec.zoom_cut:
|
||||
target_ratio = width / height
|
||||
filters.append(
|
||||
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})'"
|
||||
)
|
||||
# 裁切处理
|
||||
crop_filter = self._build_crop_filter(render_spec, width, height, task_id)
|
||||
if crop_filter:
|
||||
filters.append(crop_filter)
|
||||
|
||||
# 缩放填充
|
||||
filters.append(
|
||||
@@ -324,6 +451,116 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
logger.info(f"[task:{task_id}] Converting image to video: {actual_duration_sec:.2f}s at {fps}fps")
|
||||
return self.run_ffmpeg(cmd, task_id)
|
||||
|
||||
def _build_trim_command(
|
||||
self,
|
||||
video_file: str,
|
||||
output_file: str,
|
||||
trim_head_ms: int,
|
||||
trim_tail_ms: int,
|
||||
output_spec
|
||||
) -> List[str]:
|
||||
"""
|
||||
构建视频精确裁剪命令(重编码方式)
|
||||
|
||||
使用 trim 滤镜进行精确帧级裁剪,而非 -ss/-t 参数的关键帧裁剪。
|
||||
|
||||
Args:
|
||||
video_file: 输入视频路径
|
||||
output_file: 输出视频路径
|
||||
trim_head_ms: 头部裁剪时长(毫秒)
|
||||
trim_tail_ms: 尾部裁剪时长(毫秒)
|
||||
output_spec: 输出规格
|
||||
|
||||
Returns:
|
||||
FFmpeg 命令参数列表
|
||||
"""
|
||||
original_duration = self.probe_duration(video_file)
|
||||
if not original_duration:
|
||||
original_duration = 10.0
|
||||
|
||||
trim_head_sec = trim_head_ms / 1000.0
|
||||
trim_tail_sec = trim_tail_ms / 1000.0
|
||||
|
||||
start_time = trim_head_sec
|
||||
end_time = original_duration - trim_tail_sec
|
||||
|
||||
vf_filter = f"trim=start={start_time}:end={end_time},setpts=PTS-STARTPTS"
|
||||
|
||||
cmd = [
|
||||
'ffmpeg', '-y', '-hide_banner',
|
||||
'-i', video_file,
|
||||
'-vf', vf_filter,
|
||||
]
|
||||
|
||||
cmd.extend(self.get_video_encode_args(maxrate=output_spec.bitrate))
|
||||
|
||||
fps = output_spec.fps
|
||||
cmd.extend(['-r', str(fps)])
|
||||
|
||||
output_duration_sec = end_time - start_time
|
||||
total_frames = int(output_duration_sec * fps)
|
||||
|
||||
if total_frames <= 1:
|
||||
gop_size = 1
|
||||
elif total_frames < fps:
|
||||
gop_size = total_frames
|
||||
else:
|
||||
gop_size = fps
|
||||
|
||||
cmd.extend(['-g', str(gop_size)])
|
||||
cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))])
|
||||
cmd.extend(['-force_key_frames', 'expr:eq(n,0)'])
|
||||
cmd.append('-an')
|
||||
cmd.append(output_file)
|
||||
|
||||
return cmd
|
||||
|
||||
def _build_ts_package_command(
|
||||
self,
|
||||
video_file: str,
|
||||
audio_file: Optional[str],
|
||||
output_file: str,
|
||||
start_sec: float,
|
||||
duration_sec: float
|
||||
) -> List[str]:
|
||||
"""
|
||||
构建 TS 封装命令
|
||||
|
||||
将视频和对应时间区间的音频封装为 TS 分片。
|
||||
视频使用 copy 模式(已经过渲染/裁剪)。
|
||||
支持无音频模式(video-only TS)。
|
||||
|
||||
Args:
|
||||
video_file: 视频文件路径(已处理)
|
||||
audio_file: 音频文件路径(可选,None 时生成 video-only TS)
|
||||
output_file: 输出文件路径
|
||||
start_sec: 音频开始时间(秒)
|
||||
duration_sec: 音频时长(秒)
|
||||
|
||||
Returns:
|
||||
FFmpeg 命令参数列表
|
||||
"""
|
||||
cmd = [
|
||||
'ffmpeg', '-y', '-hide_banner',
|
||||
'-i', video_file,
|
||||
]
|
||||
|
||||
if audio_file:
|
||||
cmd.extend(['-ss', str(start_sec), '-t', str(duration_sec), '-i', audio_file])
|
||||
cmd.extend(['-map', '0:v:0', '-map', '1:a:0', '-c:v', 'copy', '-c:a', 'copy'])
|
||||
else:
|
||||
cmd.extend(['-c:v', 'copy'])
|
||||
|
||||
cmd.extend([
|
||||
'-output_ts_offset', str(start_sec),
|
||||
'-muxdelay', '0',
|
||||
'-muxpreload', '0',
|
||||
'-f', 'mpegts',
|
||||
output_file
|
||||
])
|
||||
|
||||
return cmd
|
||||
|
||||
def _build_command(
|
||||
self,
|
||||
input_file: str,
|
||||
@@ -391,7 +628,7 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
cmd.extend(['-vf', filters])
|
||||
|
||||
# 编码参数(根据硬件加速配置动态获取)
|
||||
cmd.extend(self.get_video_encode_args())
|
||||
cmd.extend(self.get_video_encode_args(maxrate=output_spec.bitrate))
|
||||
|
||||
# 帧率
|
||||
fps = output_spec.fps
|
||||
@@ -465,7 +702,10 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
|
||||
# 解析 effects
|
||||
effects = render_spec.get_effects()
|
||||
has_camera_shot = any(e.effect_type == 'cameraShot' for e in effects)
|
||||
has_complex_effect = any(
|
||||
effect.effect_type in {'cameraShot', 'zoom'}
|
||||
for effect in effects
|
||||
)
|
||||
|
||||
# 硬件加速时需要先 hwdownload(将 GPU 表面下载到系统内存)
|
||||
hwaccel_prefix = self.get_hwaccel_filter_prefix()
|
||||
@@ -473,12 +713,23 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
# 去掉末尾的逗号,作为第一个滤镜
|
||||
filters.append(hwaccel_prefix.rstrip(','))
|
||||
|
||||
# 1. 变速处理
|
||||
# 1. 变速处理(合并 RenderSpec.speed 与 ospeed 效果)
|
||||
speed = float(render_spec.speed) if render_spec.speed else 1.0
|
||||
if speed != 1.0 and speed > 0:
|
||||
# setpts 公式:PTS / speed
|
||||
pts_factor = 1.0 / speed
|
||||
filters.append(f"setpts={pts_factor}*PTS")
|
||||
if speed <= 0:
|
||||
speed = 1.0
|
||||
|
||||
ospeed_factor = 1.0
|
||||
for effect in effects:
|
||||
if effect.effect_type == 'ospeed':
|
||||
ospeed_factor = effect.get_ospeed_params()
|
||||
break
|
||||
|
||||
combined_pts_factor = (1.0 / speed) * ospeed_factor
|
||||
# 统一归零视频起始时间戳,避免源素材非 0 起始 PTS 造成封装后首帧冻结
|
||||
if combined_pts_factor != 1.0:
|
||||
filters.append(f"setpts={combined_pts_factor}*(PTS-STARTPTS)")
|
||||
else:
|
||||
filters.append("setpts=PTS-STARTPTS")
|
||||
|
||||
# 2. LUT 调色
|
||||
if lut_file:
|
||||
@@ -487,26 +738,9 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
filters.append(f"lut3d='{lut_path}'")
|
||||
|
||||
# 3. 裁切处理
|
||||
if render_spec.crop_enable and render_spec.face_pos:
|
||||
# 根据人脸位置进行智能裁切
|
||||
try:
|
||||
fx, fy = map(float, render_spec.face_pos.split(','))
|
||||
# 计算裁切区域(保持输出比例)
|
||||
target_ratio = width / height
|
||||
# 假设裁切到目标比例
|
||||
filters.append(
|
||||
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})':"
|
||||
f"'(iw-min(iw,ih*{target_ratio}))*{fx}':"
|
||||
f"'(ih-min(ih,iw/{target_ratio}))*{fy}'"
|
||||
)
|
||||
except (ValueError, ZeroDivisionError):
|
||||
logger.warning(f"Invalid face position: {render_spec.face_pos}")
|
||||
elif render_spec.zoom_cut:
|
||||
# 中心缩放裁切
|
||||
target_ratio = width / height
|
||||
filters.append(
|
||||
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})'"
|
||||
)
|
||||
crop_filter = self._build_crop_filter(render_spec, width, height)
|
||||
if crop_filter:
|
||||
filters.append(crop_filter)
|
||||
|
||||
# 4. 缩放和填充
|
||||
scale_filter = (
|
||||
@@ -515,9 +749,8 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
)
|
||||
filters.append(scale_filter)
|
||||
|
||||
# 5. 特效处理(cameraShot 需要特殊处理)
|
||||
if has_camera_shot:
|
||||
# cameraShot 需要使用 filter_complex 格式
|
||||
# 5. 特效处理(cameraShot / zoom 需要使用 filter_complex)
|
||||
if has_complex_effect:
|
||||
return self._build_filter_complex_with_effects(
|
||||
base_filters=filters,
|
||||
effects=effects,
|
||||
@@ -541,15 +774,13 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
# 计算是否需要额外的尾部冻结(源视频时长不足)
|
||||
extra_tail_freeze_sec = 0.0
|
||||
if source_duration_sec is not None:
|
||||
speed = float(render_spec.speed) if render_spec.speed else 1.0
|
||||
if speed > 0:
|
||||
# 计算变速后的有效时长
|
||||
effective_duration_sec = source_duration_sec / speed
|
||||
required_duration_sec = duration_ms / 1000.0
|
||||
# 使用已计算的 combined_pts_factor
|
||||
effective_duration_sec = source_duration_sec * combined_pts_factor
|
||||
required_duration_sec = duration_ms / 1000.0
|
||||
|
||||
# 如果源视频时长不足,需要冻结最后一帧来补足
|
||||
if effective_duration_sec < required_duration_sec:
|
||||
extra_tail_freeze_sec = required_duration_sec - effective_duration_sec
|
||||
# 如果源视频时长不足,需要冻结最后一帧来补足
|
||||
if effective_duration_sec < required_duration_sec:
|
||||
extra_tail_freeze_sec = required_duration_sec - effective_duration_sec
|
||||
|
||||
if overlap_head_ms > 0:
|
||||
# 头部冻结:将第一帧冻结指定时长
|
||||
@@ -598,7 +829,7 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
"""
|
||||
构建包含特效的 filter_complex 滤镜图
|
||||
|
||||
cameraShot 效果需要使用 split/freezeframes/concat 滤镜组合。
|
||||
cameraShot / zoom 效果都在此处统一处理并按 effects 顺序叠加。
|
||||
|
||||
Args:
|
||||
base_filters: 基础滤镜列表
|
||||
@@ -669,6 +900,31 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
f"{frozen_out}{rest_out}concat=n=2:v=1:a=0{effect_output}"
|
||||
)
|
||||
|
||||
current_output = effect_output
|
||||
effect_idx += 1
|
||||
elif effect.effect_type == 'zoom':
|
||||
start_sec, scale_factor, duration_sec = effect.get_zoom_params()
|
||||
if start_sec < 0 or scale_factor <= 1.0 or duration_sec <= 0:
|
||||
continue
|
||||
|
||||
zoom_end_sec = start_sec + duration_sec
|
||||
base_out = f'[eff{effect_idx}_base]'
|
||||
zoom_source_out = f'[eff{effect_idx}_zoom_src]'
|
||||
zoom_scaled_out = f'[eff{effect_idx}_zoom_scaled]'
|
||||
effect_output = f'[v_eff{effect_idx}]'
|
||||
zoom_enable = f"'between(t,{start_sec},{zoom_end_sec})'"
|
||||
|
||||
filter_parts.append(
|
||||
f"{current_output}split=2{base_out}{zoom_source_out}"
|
||||
)
|
||||
filter_parts.append(
|
||||
f"{zoom_source_out}scale=iw*{scale_factor}:ih*{scale_factor},"
|
||||
f"crop={width}:{height}:(in_w-{width})/2:(in_h-{height})/2{zoom_scaled_out}"
|
||||
)
|
||||
filter_parts.append(
|
||||
f"{base_out}{zoom_scaled_out}overlay=0:0:enable={zoom_enable}{effect_output}"
|
||||
)
|
||||
|
||||
current_output = effect_output
|
||||
effect_idx += 1
|
||||
|
||||
@@ -679,14 +935,20 @@ class RenderSegmentVideoHandler(BaseHandler):
|
||||
extra_tail_freeze_sec = 0.0
|
||||
if source_duration_sec is not None and render_spec is not None and duration_ms > 0:
|
||||
speed = float(render_spec.speed) if render_spec.speed else 1.0
|
||||
if speed > 0:
|
||||
# 计算变速后的有效时长
|
||||
effective_duration_sec = source_duration_sec / speed
|
||||
required_duration_sec = duration_ms / 1000.0
|
||||
if speed <= 0:
|
||||
speed = 1.0
|
||||
ospeed_factor = 1.0
|
||||
for effect in effects:
|
||||
if effect.effect_type == 'ospeed':
|
||||
ospeed_factor = effect.get_ospeed_params()
|
||||
break
|
||||
combined_pts_factor = (1.0 / speed) * ospeed_factor
|
||||
effective_duration_sec = source_duration_sec * combined_pts_factor
|
||||
required_duration_sec = duration_ms / 1000.0
|
||||
|
||||
# 如果源视频时长不足,需要冻结最后一帧来补足
|
||||
if effective_duration_sec < required_duration_sec:
|
||||
extra_tail_freeze_sec = required_duration_sec - effective_duration_sec
|
||||
# 如果源视频时长不足,需要冻结最后一帧来补足
|
||||
if effective_duration_sec < required_duration_sec:
|
||||
extra_tail_freeze_sec = required_duration_sec - effective_duration_sec
|
||||
|
||||
if overlap_head_ms > 0:
|
||||
head_duration_sec = overlap_head_ms / 1000.0
|
||||
|
||||
3
index.py
3
index.py
@@ -4,9 +4,8 @@
|
||||
RenderWorker v2 入口
|
||||
|
||||
支持 v2 API 协议的渲染 Worker,处理以下任务类型:
|
||||
- RENDER_SEGMENT_VIDEO: 渲染视频片段
|
||||
- RENDER_SEGMENT_TS: 渲染视频片段并封装为 TS
|
||||
- PREPARE_JOB_AUDIO: 生成全局音频
|
||||
- PACKAGE_SEGMENT_TS: 封装 TS 分片
|
||||
- FINALIZE_MP4: 产出最终 MP4
|
||||
|
||||
使用方法:
|
||||
|
||||
@@ -12,7 +12,7 @@ import logging
|
||||
import shutil
|
||||
import time
|
||||
import uuid
|
||||
from typing import Optional, Tuple
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
from urllib.parse import urlparse, unquote
|
||||
|
||||
import psutil
|
||||
@@ -66,6 +66,7 @@ class MaterialCache:
|
||||
LOCK_TIMEOUT_SEC = 30.0
|
||||
LOCK_POLL_INTERVAL_SEC = 0.1
|
||||
LOCK_STALE_SECONDS = 24 * 60 * 60
|
||||
DOWNLOAD_LOCK_TIMEOUT_SEC = 5.0
|
||||
|
||||
def __init__(self, cache_dir: str, enabled: bool = True, max_size_gb: float = 0):
|
||||
"""
|
||||
@@ -194,13 +195,14 @@ class MaterialCache:
|
||||
logger.warning(f"Cache lock remove error: {e}")
|
||||
return False
|
||||
|
||||
def _acquire_lock(self, cache_key: str) -> Optional[str]:
|
||||
def _acquire_lock(self, cache_key: str, timeout_sec: Optional[float] = None) -> Optional[str]:
|
||||
"""获取缓存锁(跨进程安全)"""
|
||||
if not self.enabled:
|
||||
return None
|
||||
|
||||
wait_timeout_sec = self.LOCK_TIMEOUT_SEC if timeout_sec is None else max(float(timeout_sec), 0.0)
|
||||
lock_path = self._get_lock_path(cache_key)
|
||||
deadline = time.monotonic() + self.LOCK_TIMEOUT_SEC
|
||||
deadline = time.monotonic() + wait_timeout_sec
|
||||
|
||||
while True:
|
||||
try:
|
||||
@@ -214,13 +216,24 @@ class MaterialCache:
|
||||
if removed:
|
||||
continue
|
||||
if time.monotonic() >= deadline:
|
||||
logger.warning(f"Cache lock timeout: {lock_path}")
|
||||
logger.warning(f"Cache lock timeout ({wait_timeout_sec:.1f}s): {lock_path}")
|
||||
return None
|
||||
time.sleep(self.LOCK_POLL_INTERVAL_SEC)
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache lock error: {e}")
|
||||
return None
|
||||
|
||||
def _acquire_lock_with_wait(
|
||||
self,
|
||||
cache_key: str,
|
||||
timeout_sec: Optional[float] = None
|
||||
) -> Tuple[Optional[str], int]:
|
||||
"""获取缓存锁并返回等待时长(毫秒)"""
|
||||
start_time = time.monotonic()
|
||||
lock_path = self._acquire_lock(cache_key, timeout_sec=timeout_sec)
|
||||
lock_wait_ms = max(int((time.monotonic() - start_time) * 1000), 0)
|
||||
return lock_path, lock_wait_ms
|
||||
|
||||
def _release_lock(self, lock_path: Optional[str]) -> None:
|
||||
"""释放缓存锁"""
|
||||
if not lock_path:
|
||||
@@ -244,6 +257,27 @@ class MaterialCache:
|
||||
exists = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
|
||||
return exists, cache_path
|
||||
|
||||
def _is_cache_file_ready(self, cache_path: str) -> bool:
|
||||
"""缓存文件是否已就绪(存在且大小大于 0)"""
|
||||
try:
|
||||
return os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _copy_cache_to_dest(self, cache_path: str, dest: str) -> Tuple[bool, int]:
|
||||
"""将缓存文件复制到目标路径并返回结果与文件大小"""
|
||||
try:
|
||||
shutil.copy2(cache_path, dest)
|
||||
try:
|
||||
os.utime(cache_path, None)
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to update cache access time: {e}")
|
||||
file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
|
||||
return True, file_size
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to copy from cache: {e}")
|
||||
return False, 0
|
||||
|
||||
def get_or_download(
|
||||
self,
|
||||
url: str,
|
||||
@@ -251,8 +285,24 @@ class MaterialCache:
|
||||
timeout: int = 300,
|
||||
max_retries: int = 5
|
||||
) -> bool:
|
||||
"""兼容旧接口:返回下载是否成功。"""
|
||||
result, _ = self.get_or_download_with_metrics(
|
||||
url=url,
|
||||
dest=dest,
|
||||
timeout=timeout,
|
||||
max_retries=max_retries,
|
||||
)
|
||||
return result
|
||||
|
||||
def get_or_download_with_metrics(
|
||||
self,
|
||||
url: str,
|
||||
dest: str,
|
||||
timeout: int = 300,
|
||||
max_retries: int = 5
|
||||
) -> Tuple[bool, Dict[str, Any]]:
|
||||
"""
|
||||
从缓存获取素材,若未缓存则下载并缓存
|
||||
从缓存获取素材,若未缓存则下载并缓存,并返回关键指标。
|
||||
|
||||
Args:
|
||||
url: 素材 URL
|
||||
@@ -261,8 +311,14 @@ class MaterialCache:
|
||||
max_retries: 最大重试次数
|
||||
|
||||
Returns:
|
||||
是否成功
|
||||
(是否成功, 指标字典)
|
||||
"""
|
||||
metrics: Dict[str, Any] = {
|
||||
"lock_wait_ms": 0,
|
||||
"lock_acquired": False,
|
||||
"cache_path_used": "unknown",
|
||||
}
|
||||
|
||||
# 确保目标目录存在
|
||||
dest_dir = os.path.dirname(dest)
|
||||
if dest_dir:
|
||||
@@ -270,34 +326,49 @@ class MaterialCache:
|
||||
|
||||
# 缓存未启用时直接下载
|
||||
if not self.enabled:
|
||||
return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
|
||||
result = storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
|
||||
metrics["cache_path_used"] = "direct"
|
||||
return result, metrics
|
||||
|
||||
cache_key = _extract_cache_key(url)
|
||||
lock_path = self._acquire_lock(cache_key)
|
||||
cache_path = self.get_cache_path(url)
|
||||
|
||||
def _try_serve_from_cache(log_prefix: str, delete_on_failure: bool = False) -> bool:
|
||||
if not self._is_cache_file_ready(cache_path):
|
||||
return False
|
||||
copied, file_size = self._copy_cache_to_dest(cache_path, dest)
|
||||
if copied:
|
||||
metrics["cache_path_used"] = "cache"
|
||||
logger.info(f"{log_prefix}: {url[:80]}... -> {dest} ({file_size} bytes)")
|
||||
return True
|
||||
if delete_on_failure:
|
||||
try:
|
||||
os.remove(cache_path)
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
|
||||
if _try_serve_from_cache("Cache hit"):
|
||||
return True, metrics
|
||||
|
||||
lock_path, lock_wait_ms = self._acquire_lock_with_wait(
|
||||
cache_key,
|
||||
timeout_sec=self.DOWNLOAD_LOCK_TIMEOUT_SEC,
|
||||
)
|
||||
metrics["lock_wait_ms"] = lock_wait_ms
|
||||
if not lock_path:
|
||||
if _try_serve_from_cache("Cache hit after lock timeout"):
|
||||
return True, metrics
|
||||
logger.warning(f"Cache lock unavailable, downloading without cache: {url[:80]}...")
|
||||
return storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
|
||||
result = storage.download_file(url, dest, max_retries=max_retries, timeout=timeout)
|
||||
metrics["cache_path_used"] = "direct"
|
||||
return result, metrics
|
||||
|
||||
metrics["lock_acquired"] = True
|
||||
|
||||
try:
|
||||
cache_path = self.get_cache_path(url)
|
||||
cached = os.path.exists(cache_path) and os.path.getsize(cache_path) > 0
|
||||
|
||||
if cached:
|
||||
# 命中缓存,复制到目标路径
|
||||
try:
|
||||
shutil.copy2(cache_path, dest)
|
||||
# 更新访问时间(用于 LRU 清理)
|
||||
os.utime(cache_path, None)
|
||||
file_size = os.path.getsize(dest)
|
||||
logger.info(f"Cache hit: {url[:80]}... -> {dest} ({file_size} bytes)")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to copy from cache: {e}, will re-download")
|
||||
# 缓存复制失败,删除可能损坏的缓存文件
|
||||
try:
|
||||
os.remove(cache_path)
|
||||
except Exception:
|
||||
pass
|
||||
if _try_serve_from_cache("Cache hit", delete_on_failure=True):
|
||||
return True, metrics
|
||||
|
||||
# 未命中缓存,下载到缓存目录
|
||||
logger.debug(f"Cache miss: {url[:80]}...")
|
||||
@@ -312,26 +383,25 @@ class MaterialCache:
|
||||
# 下载失败,清理临时文件
|
||||
if os.path.exists(temp_cache_path):
|
||||
os.remove(temp_cache_path)
|
||||
return False
|
||||
return False, metrics
|
||||
|
||||
if not os.path.exists(temp_cache_path) or os.path.getsize(temp_cache_path) <= 0:
|
||||
if os.path.exists(temp_cache_path):
|
||||
os.remove(temp_cache_path)
|
||||
return False
|
||||
return False, metrics
|
||||
|
||||
# 下载成功,原子替换缓存文件
|
||||
os.replace(temp_cache_path, cache_path)
|
||||
|
||||
# 复制到目标路径
|
||||
shutil.copy2(cache_path, dest)
|
||||
file_size = os.path.getsize(dest)
|
||||
logger.info(f"Downloaded and cached: {url[:80]}... ({file_size} bytes)")
|
||||
if not _try_serve_from_cache("Downloaded and cached", delete_on_failure=False):
|
||||
return False, metrics
|
||||
|
||||
# 检查是否需要清理缓存
|
||||
if self.max_size_bytes > 0:
|
||||
self._cleanup_if_needed()
|
||||
|
||||
return True
|
||||
return True, metrics
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Cache download error: {e}")
|
||||
@@ -341,7 +411,7 @@ class MaterialCache:
|
||||
os.remove(temp_cache_path)
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
return False, metrics
|
||||
finally:
|
||||
self._release_lock(lock_path)
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
import os
|
||||
import logging
|
||||
import subprocess
|
||||
from typing import Optional
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
from urllib.parse import unquote
|
||||
|
||||
import requests
|
||||
@@ -65,6 +65,22 @@ def _apply_http_replace_map(url: str) -> str:
|
||||
|
||||
|
||||
def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 60) -> bool:
|
||||
"""兼容旧接口:仅返回上传是否成功。"""
|
||||
result, _ = upload_file_with_metrics(
|
||||
url=url,
|
||||
file_path=file_path,
|
||||
max_retries=max_retries,
|
||||
timeout=timeout,
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def upload_file_with_metrics(
|
||||
url: str,
|
||||
file_path: str,
|
||||
max_retries: int = 5,
|
||||
timeout: int = 60
|
||||
) -> Tuple[bool, Dict[str, Any]]:
|
||||
"""
|
||||
使用签名 URL 上传文件到 OSS
|
||||
|
||||
@@ -75,30 +91,54 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
|
||||
timeout: 超时时间(秒)
|
||||
|
||||
Returns:
|
||||
是否成功
|
||||
(是否成功, 上传指标)
|
||||
"""
|
||||
metrics: Dict[str, Any] = {
|
||||
"upload_method": "none",
|
||||
"file_size_bytes": 0,
|
||||
"content_type": "",
|
||||
"http_attempts": 0,
|
||||
"http_retry_count": 0,
|
||||
"http_status_code": 0,
|
||||
"http_replace_applied": False,
|
||||
"rclone_attempted": False,
|
||||
"rclone_succeeded": False,
|
||||
"rclone_fallback_http": False,
|
||||
"error_type": "",
|
||||
}
|
||||
|
||||
if not os.path.exists(file_path):
|
||||
logger.error(f"File not found: {file_path}")
|
||||
return False
|
||||
metrics["error_type"] = "file_not_found"
|
||||
return False, metrics
|
||||
|
||||
file_size = os.path.getsize(file_path)
|
||||
metrics["file_size_bytes"] = file_size
|
||||
logger.info(f"Uploading: {file_path} ({file_size} bytes)")
|
||||
|
||||
# 检查是否使用 rclone 上传
|
||||
if os.getenv("UPLOAD_METHOD") == "rclone":
|
||||
metrics["rclone_attempted"] = True
|
||||
logger.debug(f"Uploading to: {url}")
|
||||
result = _upload_with_rclone(url, file_path)
|
||||
metrics["rclone_succeeded"] = result
|
||||
if result:
|
||||
return True
|
||||
metrics["upload_method"] = "rclone"
|
||||
return True, metrics
|
||||
# rclone 失败时回退到 HTTP
|
||||
metrics["rclone_fallback_http"] = True
|
||||
|
||||
# 应用 HTTP_REPLACE_MAP 替换 URL
|
||||
http_url = _apply_http_replace_map(url)
|
||||
metrics["http_replace_applied"] = http_url != url
|
||||
content_type = _get_content_type(file_path)
|
||||
metrics["content_type"] = content_type
|
||||
metrics["upload_method"] = "rclone_fallback_http" if metrics["rclone_fallback_http"] else "http"
|
||||
logger.debug(f"Uploading to: {http_url} (Content-Type: {content_type})")
|
||||
|
||||
retries = 0
|
||||
while retries < max_retries:
|
||||
metrics["http_attempts"] = retries + 1
|
||||
try:
|
||||
with open(file_path, 'rb') as f:
|
||||
with requests.put(
|
||||
@@ -108,19 +148,30 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6
|
||||
timeout=timeout,
|
||||
headers={"Content-Type": content_type}
|
||||
) as response:
|
||||
status_code = int(getattr(response, 'status_code', 0) or 0)
|
||||
metrics["http_status_code"] = status_code
|
||||
response.raise_for_status()
|
||||
logger.info(f"Upload succeeded: {file_path}")
|
||||
return True
|
||||
metrics["error_type"] = ""
|
||||
return True, metrics
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
retries += 1
|
||||
metrics["http_retry_count"] = retries
|
||||
metrics["error_type"] = "timeout"
|
||||
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
|
||||
except requests.exceptions.RequestException as e:
|
||||
retries += 1
|
||||
metrics["http_retry_count"] = retries
|
||||
metrics["error_type"] = "request_exception"
|
||||
response_obj = getattr(e, 'response', None)
|
||||
status_code = getattr(response_obj, 'status_code', 0) if response_obj is not None else 0
|
||||
if isinstance(status_code, int) and status_code > 0:
|
||||
metrics["http_status_code"] = status_code
|
||||
logger.warning(f"Upload failed ({e}). Retrying {retries}/{max_retries}...")
|
||||
|
||||
logger.error(f"Upload failed after {max_retries} retries: {file_path}")
|
||||
return False
|
||||
return False, metrics
|
||||
|
||||
|
||||
def _upload_with_rclone(url: str, file_path: str) -> bool:
|
||||
|
||||
@@ -14,7 +14,7 @@ from domain.task import Task, TaskType
|
||||
|
||||
# 需要 GPU 加速的任务类型
|
||||
GPU_REQUIRED_TASK_TYPES = {
|
||||
TaskType.RENDER_SEGMENT_VIDEO,
|
||||
TaskType.RENDER_SEGMENT_TS,
|
||||
TaskType.COMPOSE_TRANSITION,
|
||||
}
|
||||
from domain.config import WorkerConfig
|
||||
@@ -85,17 +85,15 @@ class TaskExecutor:
|
||||
def _register_handlers(self):
|
||||
"""注册所有任务处理器"""
|
||||
# 延迟导入以避免循环依赖
|
||||
from handlers.render_video import RenderSegmentVideoHandler
|
||||
from handlers.render_video import RenderSegmentTsHandler
|
||||
from handlers.compose_transition import ComposeTransitionHandler
|
||||
from handlers.prepare_audio import PrepareJobAudioHandler
|
||||
from handlers.package_ts import PackageSegmentTsHandler
|
||||
from handlers.finalize_mp4 import FinalizeMp4Handler
|
||||
|
||||
handlers = [
|
||||
RenderSegmentVideoHandler(self.config, self.api_client),
|
||||
RenderSegmentTsHandler(self.config, self.api_client),
|
||||
ComposeTransitionHandler(self.config, self.api_client),
|
||||
PrepareJobAudioHandler(self.config, self.api_client),
|
||||
PackageSegmentTsHandler(self.config, self.api_client),
|
||||
FinalizeMp4Handler(self.config, self.api_client),
|
||||
]
|
||||
|
||||
|
||||
238
tests/unit/test_base_handler_parallel_transfer.py
Normal file
238
tests/unit/test_base_handler_parallel_transfer.py
Normal file
@@ -0,0 +1,238 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
from domain.config import WorkerConfig
|
||||
from domain.result import TaskResult
|
||||
from domain.task import TaskType
|
||||
from handlers.base import BaseHandler
|
||||
|
||||
|
||||
class _DummyApiClient:
|
||||
pass
|
||||
|
||||
|
||||
class _DummyHandler(BaseHandler):
|
||||
def handle(self, task):
|
||||
return TaskResult.ok({})
|
||||
|
||||
def get_supported_type(self):
|
||||
return TaskType.RENDER_SEGMENT_TS
|
||||
|
||||
|
||||
def _create_handler(tmp_path):
|
||||
config = WorkerConfig(
|
||||
api_endpoint='http://127.0.0.1:18084/api',
|
||||
access_key='TEST_ACCESS_KEY',
|
||||
worker_id='test-worker',
|
||||
temp_dir=str(tmp_path),
|
||||
cache_enabled=False,
|
||||
cache_dir=str(tmp_path / 'cache')
|
||||
)
|
||||
return _DummyHandler(config, _DummyApiClient())
|
||||
|
||||
|
||||
def test_download_files_parallel_collects_success_and_failure(tmp_path, monkeypatch):
|
||||
handler = _create_handler(tmp_path)
|
||||
handler.task_download_concurrency = 3
|
||||
captured_calls = []
|
||||
|
||||
def _fake_download(url, dest, timeout=None, use_cache=True):
|
||||
captured_calls.append((url, dest, timeout, use_cache))
|
||||
os.makedirs(os.path.dirname(dest), exist_ok=True)
|
||||
with open(dest, 'wb') as file_obj:
|
||||
file_obj.write(b'1')
|
||||
return not url.endswith('/fail')
|
||||
|
||||
monkeypatch.setattr(handler, 'download_file', _fake_download)
|
||||
results = handler.download_files_parallel(
|
||||
[
|
||||
{'key': 'first', 'url': 'https://example.com/first', 'dest': str(tmp_path / 'first.bin')},
|
||||
{'key': 'second', 'url': 'https://example.com/fail', 'dest': str(tmp_path / 'second.bin')},
|
||||
{'key': 'third', 'url': 'https://example.com/third', 'dest': str(tmp_path / 'third.bin'), 'use_cache': False},
|
||||
],
|
||||
timeout=15,
|
||||
)
|
||||
|
||||
assert len(captured_calls) == 3
|
||||
assert results['first']['success'] is True
|
||||
assert results['second']['success'] is False
|
||||
assert results['third']['success'] is True
|
||||
assert any(call_item[3] is False for call_item in captured_calls)
|
||||
|
||||
|
||||
def test_download_files_parallel_rejects_duplicate_key(tmp_path):
|
||||
handler = _create_handler(tmp_path)
|
||||
with pytest.raises(ValueError, match='Duplicate download job key'):
|
||||
handler.download_files_parallel(
|
||||
[
|
||||
{'key': 'dup', 'url': 'https://example.com/1', 'dest': str(tmp_path / '1.bin')},
|
||||
{'key': 'dup', 'url': 'https://example.com/2', 'dest': str(tmp_path / '2.bin')},
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_upload_files_parallel_collects_urls(tmp_path, monkeypatch):
|
||||
handler = _create_handler(tmp_path)
|
||||
handler.task_upload_concurrency = 2
|
||||
|
||||
def _fake_upload(task_id, file_type, file_path, file_name=None):
|
||||
if file_type == 'video':
|
||||
return f'https://cdn.example.com/{task_id}/{file_name or "video.mp4"}'
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(handler, 'upload_file', _fake_upload)
|
||||
results = handler.upload_files_parallel(
|
||||
[
|
||||
{
|
||||
'key': 'video_output',
|
||||
'task_id': 'task-1',
|
||||
'file_type': 'video',
|
||||
'file_path': str(tmp_path / 'video.mp4'),
|
||||
'file_name': 'output.mp4',
|
||||
},
|
||||
{
|
||||
'key': 'audio_output',
|
||||
'task_id': 'task-1',
|
||||
'file_type': 'audio',
|
||||
'file_path': str(tmp_path / 'audio.aac'),
|
||||
},
|
||||
]
|
||||
)
|
||||
|
||||
assert results['video_output']['success'] is True
|
||||
assert results['video_output']['url'] == 'https://cdn.example.com/task-1/output.mp4'
|
||||
assert results['audio_output']['success'] is False
|
||||
assert results['audio_output']['url'] is None
|
||||
|
||||
|
||||
def test_download_file_sets_lock_wait_ms_span_attribute(tmp_path, monkeypatch):
|
||||
handler = _create_handler(tmp_path)
|
||||
destination = tmp_path / "download.bin"
|
||||
|
||||
class _FakeSpan:
|
||||
def __init__(self):
|
||||
self.attributes = {}
|
||||
|
||||
def set_attribute(self, key, value):
|
||||
self.attributes[key] = value
|
||||
|
||||
fake_span = _FakeSpan()
|
||||
|
||||
@contextmanager
|
||||
def _fake_start_span(name, kind=None, attributes=None):
|
||||
if attributes:
|
||||
fake_span.attributes.update(attributes)
|
||||
yield fake_span
|
||||
|
||||
def _fake_get_or_download_with_metrics(url, dest, timeout=300, max_retries=5):
|
||||
os.makedirs(os.path.dirname(dest), exist_ok=True)
|
||||
with open(dest, 'wb') as file_obj:
|
||||
file_obj.write(b'abc')
|
||||
return True, {"lock_wait_ms": 1234, "lock_acquired": True, "cache_path_used": "cache"}
|
||||
|
||||
monkeypatch.setattr("handlers.base.start_span", _fake_start_span)
|
||||
monkeypatch.setattr(
|
||||
handler.material_cache,
|
||||
"get_or_download_with_metrics",
|
||||
_fake_get_or_download_with_metrics
|
||||
)
|
||||
|
||||
assert handler.download_file("https://example.com/file.bin", str(destination), timeout=1, use_cache=True)
|
||||
assert fake_span.attributes["render.file.lock_wait_ms"] == 1234
|
||||
assert fake_span.attributes["render.file.lock_acquired"] is True
|
||||
assert fake_span.attributes["render.file.cache_path_used"] == "cache"
|
||||
|
||||
|
||||
def test_download_file_without_cache_sets_lock_wait_ms_zero(tmp_path, monkeypatch):
|
||||
handler = _create_handler(tmp_path)
|
||||
destination = tmp_path / "download-no-cache.bin"
|
||||
|
||||
class _FakeSpan:
|
||||
def __init__(self):
|
||||
self.attributes = {}
|
||||
|
||||
def set_attribute(self, key, value):
|
||||
self.attributes[key] = value
|
||||
|
||||
fake_span = _FakeSpan()
|
||||
|
||||
@contextmanager
|
||||
def _fake_start_span(name, kind=None, attributes=None):
|
||||
if attributes:
|
||||
fake_span.attributes.update(attributes)
|
||||
yield fake_span
|
||||
|
||||
def _fake_storage_download(url, dest, timeout=30):
|
||||
os.makedirs(os.path.dirname(dest), exist_ok=True)
|
||||
with open(dest, 'wb') as file_obj:
|
||||
file_obj.write(b'def')
|
||||
return True
|
||||
|
||||
monkeypatch.setattr("handlers.base.start_span", _fake_start_span)
|
||||
monkeypatch.setattr("handlers.base.storage.download_file", _fake_storage_download)
|
||||
|
||||
assert handler.download_file("https://example.com/file.bin", str(destination), timeout=1, use_cache=False)
|
||||
assert fake_span.attributes["render.file.lock_wait_ms"] == 0
|
||||
assert fake_span.attributes["render.file.lock_acquired"] is False
|
||||
assert fake_span.attributes["render.file.cache_path_used"] == "direct"
|
||||
|
||||
|
||||
def test_upload_file_sets_detailed_span_attributes(tmp_path, monkeypatch):
|
||||
handler = _create_handler(tmp_path)
|
||||
source_path = tmp_path / "upload.mp4"
|
||||
source_path.write_bytes(b"abc123")
|
||||
fake_span_attributes = {}
|
||||
|
||||
class _FakeSpan:
|
||||
def set_attribute(self, key, value):
|
||||
fake_span_attributes[key] = value
|
||||
|
||||
@contextmanager
|
||||
def _fake_start_span(name, kind=None, attributes=None):
|
||||
if attributes:
|
||||
fake_span_attributes.update(attributes)
|
||||
yield _FakeSpan()
|
||||
|
||||
handler.api_client = SimpleNamespace(
|
||||
get_upload_url=lambda *args, **kwargs: {
|
||||
"uploadUrl": "https://example.com/upload",
|
||||
"accessUrl": "https://cdn.example.com/output.mp4",
|
||||
}
|
||||
)
|
||||
|
||||
monkeypatch.setattr("handlers.base.start_span", _fake_start_span)
|
||||
monkeypatch.setattr(
|
||||
"handlers.base.storage.upload_file_with_metrics",
|
||||
lambda *args, **kwargs: (
|
||||
True,
|
||||
{
|
||||
"upload_method": "http",
|
||||
"http_attempts": 2,
|
||||
"http_retry_count": 1,
|
||||
"http_status_code": 200,
|
||||
"http_replace_applied": True,
|
||||
"content_type": "video/mp4",
|
||||
"error_type": "",
|
||||
"rclone_attempted": False,
|
||||
"rclone_succeeded": False,
|
||||
"rclone_fallback_http": False,
|
||||
},
|
||||
)
|
||||
)
|
||||
monkeypatch.setattr(handler.material_cache, "add_to_cache", lambda *args, **kwargs: True)
|
||||
|
||||
access_url = handler.upload_file("task-1", "video", str(source_path), "output.mp4")
|
||||
assert access_url == "https://cdn.example.com/output.mp4"
|
||||
assert fake_span_attributes["render.file.upload_success"] is True
|
||||
assert fake_span_attributes["render.file.upload_method"] == "http"
|
||||
assert fake_span_attributes["render.file.http_attempts"] == 2
|
||||
assert fake_span_attributes["render.file.http_retry_count"] == 1
|
||||
assert fake_span_attributes["render.file.http_status_code"] == 200
|
||||
assert fake_span_attributes["render.file.http_replace_applied"] is True
|
||||
assert fake_span_attributes["render.file.content_type"] == "video/mp4"
|
||||
assert fake_span_attributes["render.file.cache_write_back"] == "success"
|
||||
@@ -13,3 +13,89 @@ def test_cache_lock_acquire_release(tmp_path):
|
||||
assert os.path.exists(lock_path)
|
||||
cache._release_lock(lock_path)
|
||||
assert not os.path.exists(lock_path)
|
||||
|
||||
|
||||
def test_get_or_download_cache_hit_does_not_wait_lock(tmp_path, monkeypatch):
|
||||
cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
|
||||
url = "https://example.com/path/video.mp4?token=abc"
|
||||
cache_path = cache.get_cache_path(url)
|
||||
with open(cache_path, 'wb') as file_obj:
|
||||
file_obj.write(b'cached-data')
|
||||
destination = tmp_path / "result.bin"
|
||||
|
||||
def _unexpected_acquire(*args, **kwargs):
|
||||
raise AssertionError("cache hit path should not acquire lock")
|
||||
|
||||
monkeypatch.setattr(cache, "_acquire_lock", _unexpected_acquire)
|
||||
|
||||
assert cache.get_or_download(url, str(destination), timeout=1) is True
|
||||
assert destination.read_bytes() == b'cached-data'
|
||||
|
||||
|
||||
def test_get_or_download_lock_timeout_can_still_use_ready_cache(tmp_path, monkeypatch):
|
||||
cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
|
||||
url = "https://example.com/path/audio.aac?token=abc"
|
||||
cache_path = cache.get_cache_path(url)
|
||||
with open(cache_path, 'wb') as file_obj:
|
||||
file_obj.write(b'audio-cache')
|
||||
destination = tmp_path / "audio.aac"
|
||||
download_called = {"value": False}
|
||||
|
||||
monkeypatch.setattr(cache, "_acquire_lock", lambda *args, **kwargs: None)
|
||||
|
||||
def _fake_download(*args, **kwargs):
|
||||
download_called["value"] = True
|
||||
return False
|
||||
|
||||
monkeypatch.setattr("services.cache.storage.download_file", _fake_download)
|
||||
|
||||
assert cache.get_or_download(url, str(destination), timeout=1) is True
|
||||
assert destination.read_bytes() == b'audio-cache'
|
||||
assert download_called["value"] is False
|
||||
|
||||
|
||||
def test_get_or_download_uses_short_lock_timeout(tmp_path, monkeypatch):
|
||||
cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
|
||||
url = "https://example.com/path/segment.ts?token=abc"
|
||||
destination = tmp_path / "segment.ts"
|
||||
captured = {"timeout_sec": None}
|
||||
|
||||
def _fake_acquire(cache_key, timeout_sec=None):
|
||||
captured["timeout_sec"] = timeout_sec
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(cache, "_acquire_lock", _fake_acquire)
|
||||
monkeypatch.setattr("services.cache.storage.download_file", lambda *args, **kwargs: True)
|
||||
|
||||
assert cache.get_or_download(url, str(destination), timeout=1) is True
|
||||
assert captured["timeout_sec"] == cache.DOWNLOAD_LOCK_TIMEOUT_SEC
|
||||
|
||||
|
||||
def test_get_or_download_with_metrics_cache_hit_wait_zero(tmp_path):
    """A cache hit reports zero lock wait, no lock taken, and the 'cache' path."""
    cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
    url = "https://example.com/path/hit.mp4?token=abc"

    # Seed the cache so the lookup is a guaranteed hit.
    with open(cache.get_cache_path(url), 'wb') as fh:
        fh.write(b'hit-data')

    destination = tmp_path / "hit.mp4"
    success, metrics = cache.get_or_download_with_metrics(url, str(destination), timeout=1)

    assert success is True
    assert metrics["lock_wait_ms"] == 0
    assert metrics["lock_acquired"] is False
    assert metrics["cache_path_used"] == "cache"
|
||||
|
||||
|
||||
def test_get_or_download_with_metrics_reports_lock_wait_ms(tmp_path, monkeypatch):
    """The measured lock wait (ms) must be surfaced verbatim in the metrics dict."""
    cache = MaterialCache(cache_dir=str(tmp_path), enabled=True, max_size_gb=0)
    url = "https://example.com/path/miss.mp4?token=abc"
    destination = tmp_path / "miss.mp4"

    # Lock never acquired but 4321 ms were spent waiting; the download succeeds.
    monkeypatch.setattr(cache, "_acquire_lock_with_wait", lambda *a, **kw: (None, 4321))
    monkeypatch.setattr("services.cache.storage.download_file", lambda *a, **kw: True)

    success, metrics = cache.get_or_download_with_metrics(url, str(destination), timeout=1)

    assert success is True
    assert metrics["lock_wait_ms"] == 4321
    assert metrics["lock_acquired"] is False
    assert metrics["cache_path_used"] == "direct"
|
||||
|
||||
235
tests/unit/test_render_video_effects.py
Normal file
235
tests/unit/test_render_video_effects.py
Normal file
@@ -0,0 +1,235 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import pytest
|
||||
|
||||
from domain.config import WorkerConfig
|
||||
from domain.task import Effect, OutputSpec, RenderSpec
|
||||
from handlers.render_video import RenderSegmentTsHandler
|
||||
|
||||
|
||||
class _DummyApiClient:
|
||||
pass
|
||||
|
||||
|
||||
def _create_handler(tmp_path):
    """Build a RenderSegmentTsHandler backed by a throwaway temp dir with caching off."""
    worker_config = WorkerConfig(
        api_endpoint='http://127.0.0.1:18084/api',
        access_key='TEST_ACCESS_KEY',
        worker_id='test-worker',
        temp_dir=str(tmp_path),
        cache_enabled=False,
        cache_dir=str(tmp_path / 'cache'),
    )
    return RenderSegmentTsHandler(worker_config, _DummyApiClient())
|
||||
|
||||
|
||||
def test_get_zoom_params_with_valid_values():
    """'zoom:start,scale,duration' parses into three floats."""
    effect = Effect.from_string('zoom:1.5,1.35,2')
    assert effect is not None
    # pytest.approx compares element-wise over the returned tuple.
    assert effect.get_zoom_params() == pytest.approx((1.5, 1.35, 2.0))
|
||||
|
||||
|
||||
@pytest.mark.parametrize('effect_str', [
    'zoom:-1,0.9,-2',
    'zoom:nan,inf,0',
    'zoom:bad,value,data',
    'zoom:,,',
])
def test_get_zoom_params_invalid_values_fallback_to_default(effect_str):
    """Unparseable, non-finite or out-of-range zoom params fall back to (0.0, 1.2, 1.0)."""
    effect = Effect.from_string(effect_str)
    assert effect is not None
    assert effect.get_zoom_params() == (0.0, 1.2, 1.0)
|
||||
|
||||
|
||||
def test_build_command_with_zoom_uses_filter_complex(tmp_path):
    """A zoom effect forces -filter_complex instead of the simple -vf chain."""
    handler = _create_handler(tmp_path)

    command = handler._build_command(
        input_file='input.mp4',
        output_file='output.mp4',
        render_spec=RenderSpec(effects='zoom:1.5,1.4,2'),
        output_spec=OutputSpec(width=1080, height=1920, fps=30),
        duration_ms=6000,
    )

    assert '-filter_complex' in command
    assert '-vf' not in command
|
||||
|
||||
|
||||
def test_build_video_filters_zoom_and_camera_shot_stack_in_order(tmp_path):
    """The cameraShot concat must appear before the zoom overlay in the filter graph."""
    handler = _create_handler(tmp_path)

    filters = handler._build_video_filters(
        render_spec=RenderSpec(effects='cameraShot:3,1|zoom:1,1.2,2'),
        output_spec=OutputSpec(width=1080, height=1920, fps=30),
        duration_ms=8000,
        source_duration_sec=10.0,
    )

    camera_marker = 'concat=n=2:v=1:a=0'
    zoom_marker = "overlay=0:0:enable='between(t,1.0,3.0)'"
    assert camera_marker in filters
    assert zoom_marker in filters
    # cameraShot is applied first, so its marker must precede the zoom overlay.
    assert filters.index(camera_marker) < filters.index(zoom_marker)
|
||||
|
||||
|
||||
# ---------- ospeed tests ----------
|
||||
|
||||
def test_get_ospeed_params_with_valid_values():
    """'ospeed:<factor>' parses positive factors verbatim (>1 and <1 alike)."""
    for raw, expected in (('ospeed:2', 2.0), ('ospeed:0.5', 0.5)):
        effect = Effect.from_string(raw)
        assert effect is not None
        assert effect.get_ospeed_params() == pytest.approx(expected)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('effect_str', [
    'ospeed:0',
    'ospeed:-1',
    'ospeed:nan',
    'ospeed:inf',
    'ospeed:abc',
    'ospeed:',
])
def test_get_ospeed_params_invalid_values_fallback(effect_str):
    """Non-positive, non-finite or unparseable ospeed factors fall back to 1.0."""
    effect = Effect.from_string(effect_str)
    assert effect is not None
    assert effect.get_ospeed_params() == 1.0
|
||||
|
||||
|
||||
def test_get_ospeed_params_no_params():
    """A bare 'ospeed' with no argument defaults to a factor of 1.0."""
    effect = Effect.from_string('ospeed')
    assert effect is not None
    assert effect.get_ospeed_params() == 1.0
|
||||
|
||||
|
||||
def test_ospeed_does_not_trigger_filter_complex(tmp_path):
    """ospeed alone stays on the simple -vf path and emits a setpts filter."""
    handler = _create_handler(tmp_path)

    command = handler._build_command(
        input_file='input.mp4',
        output_file='output.mp4',
        render_spec=RenderSpec(effects='ospeed:2'),
        output_spec=OutputSpec(width=1080, height=1920, fps=30),
        duration_ms=6000,
    )

    assert '-vf' in command
    assert '-filter_complex' not in command
    # The speed change is realised via setpts inside the -vf chain.
    vf_value = command[command.index('-vf') + 1]
    assert 'setpts=2.0*(PTS-STARTPTS)' in vf_value
|
||||
|
||||
|
||||
def test_default_speed_always_normalizes_pts(tmp_path):
    """With no speed effects at all, PTS is still normalised to start at zero."""
    handler = _create_handler(tmp_path)

    filters = handler._build_video_filters(
        render_spec=RenderSpec(),
        output_spec=OutputSpec(width=1080, height=1920, fps=30),
        duration_ms=6000,
        source_duration_sec=10.0,
    )

    assert 'setpts=PTS-STARTPTS' in filters
|
||||
|
||||
|
||||
def test_ospeed_combined_with_speed(tmp_path):
    """speed and ospeed multiply: speed=2 gives 1/2=0.5, times ospeed=3 gives 1.5."""
    handler = _create_handler(tmp_path)

    filters = handler._build_video_filters(
        render_spec=RenderSpec(speed='2', effects='ospeed:3'),
        output_spec=OutputSpec(width=1080, height=1920, fps=30),
        duration_ms=6000,
        source_duration_sec=10.0,
    )

    assert 'setpts=1.5*(PTS-STARTPTS)' in filters
|
||||
|
||||
|
||||
def test_ospeed_with_complex_effects(tmp_path):
    """ospeed composes with zoom: setpts stays in the base chain of the complex graph."""
    handler = _create_handler(tmp_path)

    filters = handler._build_video_filters(
        render_spec=RenderSpec(effects='ospeed:2|zoom:1,1.2,2'),
        output_spec=OutputSpec(width=1080, height=1920, fps=30),
        duration_ms=8000,
        source_duration_sec=10.0,
    )

    # zoom switches the graph to filter_complex (the [v_base] label appears).
    assert '[v_base]' in filters
    # The ospeed setpts lives in the base chain.
    assert 'setpts=2.0*(PTS-STARTPTS)' in filters
    # The zoom overlay itself is still emitted as usual.
    assert "overlay=0:0:enable='between(t,1.0,3.0)'" in filters
|
||||
|
||||
|
||||
def test_only_first_ospeed_is_used(tmp_path):
    """When several ospeed effects are supplied, only the first one takes effect."""
    handler = _create_handler(tmp_path)

    filters = handler._build_video_filters(
        render_spec=RenderSpec(effects='ospeed:2|ospeed:5'),
        output_spec=OutputSpec(width=1080, height=1920, fps=30),
        duration_ms=6000,
        source_duration_sec=10.0,
    )

    assert 'setpts=2.0*(PTS-STARTPTS)' in filters
    assert 'setpts=5.0*(PTS-STARTPTS)' not in filters
|
||||
|
||||
|
||||
def test_ospeed_affects_tpad_calculation(tmp_path):
    """ospeed stretches the effective source length, changing whether tpad is needed."""
    handler = _create_handler(tmp_path)
    output_spec = OutputSpec(width=1080, height=1920, fps=30)

    # ospeed:2 turns a 10s source into 20s of effective footage, so a 6s
    # target needs no padding at all.
    filters_with_ospeed = handler._build_video_filters(
        render_spec=RenderSpec(effects='ospeed:2'),
        output_spec=output_spec,
        duration_ms=6000,
        source_duration_sec=10.0,
    )
    assert 'tpad' not in filters_with_ospeed

    # Control case: without ospeed, a 15s target from a 10s source requires
    # 5 seconds of tpad.
    filters_plain = handler._build_video_filters(
        render_spec=RenderSpec(),
        output_spec=output_spec,
        duration_ms=15000,
        source_duration_sec=10.0,
    )
    assert 'tpad' in filters_plain
    assert 'stop_duration=5.0' in filters_plain
|
||||
81
tests/unit/test_storage_upload_metrics.py
Normal file
81
tests/unit/test_storage_upload_metrics.py
Normal file
@@ -0,0 +1,81 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import requests
|
||||
|
||||
from services import storage
|
||||
|
||||
|
||||
class _FakeResponse:
|
||||
def __init__(self, status_code=200):
|
||||
self.status_code = status_code
|
||||
|
||||
def raise_for_status(self):
|
||||
return None
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
return False
|
||||
|
||||
|
||||
def test_upload_file_with_metrics_file_not_found(tmp_path):
    """Uploading a nonexistent file fails fast with a file_not_found metric."""
    success, metrics = storage.upload_file_with_metrics(
        "https://example.com/upload",
        str(tmp_path / "missing.mp4"),
        max_retries=1,
        timeout=1,
    )

    assert success is False
    assert metrics["error_type"] == "file_not_found"
    assert metrics["upload_method"] == "none"
|
||||
|
||||
|
||||
def test_upload_file_with_metrics_http_success(tmp_path, monkeypatch):
    """A first-try HTTP PUT success yields clean attempt/retry/status metrics."""
    source_path = tmp_path / "video.mp4"
    source_path.write_bytes(b"content")

    monkeypatch.setattr("services.storage.requests.put", lambda *a, **kw: _FakeResponse(200))

    success, metrics = storage.upload_file_with_metrics(
        "https://example.com/upload",
        str(source_path),
        max_retries=3,
        timeout=1,
    )

    assert success is True
    assert metrics["upload_method"] == "http"
    assert metrics["http_attempts"] == 1
    assert metrics["http_retry_count"] == 0
    assert metrics["http_status_code"] == 200
    # Content type is derived from the .mp4 extension.
    assert metrics["content_type"] == "video/mp4"
|
||||
|
||||
|
||||
def test_upload_file_with_metrics_retry_then_success(tmp_path, monkeypatch):
    """A timeout on the first PUT is retried; metrics count exactly one retry."""
    source_path = tmp_path / "audio.aac"
    source_path.write_bytes(b"audio-bytes")
    attempts = {"count": 0}

    def _timeout_once(*a, **kw):
        # First call simulates a network timeout; subsequent calls succeed.
        attempts["count"] += 1
        if attempts["count"] == 1:
            raise requests.exceptions.Timeout()
        return _FakeResponse(200)

    monkeypatch.setattr("services.storage.requests.put", _timeout_once)

    success, metrics = storage.upload_file_with_metrics(
        "https://example.com/upload",
        str(source_path),
        max_retries=3,
        timeout=1,
    )

    assert success is True
    assert metrics["http_attempts"] == 2
    assert metrics["http_retry_count"] == 1
    assert metrics["http_status_code"] == 200
    assert metrics["error_type"] == ""
|
||||
51
tests/unit/test_tracing.py
Normal file
51
tests/unit/test_tracing.py
Normal file
@@ -0,0 +1,51 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import importlib
|
||||
from types import SimpleNamespace
|
||||
|
||||
import util.tracing as tracing_module
|
||||
|
||||
|
||||
def _create_task_stub():
|
||||
task_type = SimpleNamespace(value="RENDER_SEGMENT_TS")
|
||||
return SimpleNamespace(
|
||||
task_id="task-1001",
|
||||
task_type=task_type,
|
||||
get_job_id=lambda: "job-2002",
|
||||
get_segment_id=lambda: "seg-3003",
|
||||
)
|
||||
|
||||
|
||||
def test_task_trace_scope_sets_and_resets_context(monkeypatch):
    """With OTEL disabled, task_trace_scope still binds and then clears the task context."""
    monkeypatch.setenv("OTEL_ENABLED", "false")
    # Reload so the module re-reads the OTEL_ENABLED environment flag.
    tracing = importlib.reload(tracing_module)

    # Tracing is off: initialisation reports False and no context exists yet.
    assert tracing.initialize_tracing("worker-1", "2.0.0") is False
    assert tracing.get_current_task_context() is None

    with tracing.task_trace_scope(_create_task_stub()) as span:
        # No real span is created when tracing is disabled.
        assert span is None
        context = tracing.get_current_task_context()
        assert context is not None
        assert context.task_id == "task-1001"
        assert context.task_type == "RENDER_SEGMENT_TS"
        assert context.job_id == "job-2002"
        assert context.segment_id == "seg-3003"

        with tracing.start_span("render.task.sample.step") as child_span:
            assert child_span is None

    # Leaving the scope clears the bound context.
    assert tracing.get_current_task_context() is None
|
||||
|
||||
|
||||
def test_bind_trace_context_restores_previous_context(monkeypatch):
    """bind_trace_context installs a context and restores the prior one on exit."""
    monkeypatch.setenv("OTEL_ENABLED", "false")
    tracing = importlib.reload(tracing_module)
    tracing.initialize_tracing("worker-1", "2.0.0")

    context = tracing.TaskTraceContext(task_id="task-1", task_type="FINALIZE_MP4")

    assert tracing.get_current_task_context() is None
    with tracing.bind_trace_context(None, context):
        assert tracing.get_current_task_context() == context
    # The previous (empty) context is restored after the with-block.
    assert tracing.get_current_task_context() is None
|
||||
25
tests/unit/test_video_encode_args.py
Normal file
25
tests/unit/test_video_encode_args.py
Normal file
@@ -0,0 +1,25 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from constant import HW_ACCEL_CUDA, HW_ACCEL_NONE, HW_ACCEL_QSV
|
||||
from handlers.base import get_video_encode_args
|
||||
|
||||
|
||||
def _assert_bframe_disabled(args):
|
||||
assert '-bf' in args
|
||||
bf_index = args.index('-bf')
|
||||
assert args[bf_index + 1] == '0'
|
||||
|
||||
|
||||
def test_get_video_encode_args_disable_b_frames_for_software():
    """Software (CPU) encoding must disable B-frames (-bf 0)."""
    _assert_bframe_disabled(get_video_encode_args(HW_ACCEL_NONE))
|
||||
|
||||
|
||||
def test_get_video_encode_args_disable_b_frames_for_qsv():
    """Intel QSV encoding must disable B-frames (-bf 0)."""
    _assert_bframe_disabled(get_video_encode_args(HW_ACCEL_QSV))
|
||||
|
||||
|
||||
def test_get_video_encode_args_disable_b_frames_for_cuda():
    """NVIDIA CUDA encoding must disable B-frames (-bf 0)."""
    _assert_bframe_disabled(get_video_encode_args(HW_ACCEL_CUDA))
|
||||
Reference in New Issue
Block a user