83 Commits

Author SHA1 Message Date
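9c6186ecd3 feat(video): add video transition support
- Add the COMPOSE_TRANSITION type to TASK_TYPES
- Define the TRANSITION_TYPES constant to support multiple transition effects
- Add COMPOSE_TRANSITION to the TaskType enum
- Create the TransitionConfig dataclass to handle transition configuration
- Add transition_in and transition_out attributes to RenderSpec
- Add transition-related methods to the Task class
- Add ComposeTransitionHandler to handle transition composition tasks
- Update PackageSegmentTsHandler to support packaging transition segments
- Update RenderSegmentVideoHandler to support generating overlap regions
- Register the transition handler in TaskExecutor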
2026-01-12 22:41:22 +08:00
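2911a4eff8 refactor(core): remove legacy FFmpeg business logic and restructure constants
- Delete the legacy business modules biz/ffmpeg.py and biz/task.py
- Delete the FFmpeg task entity class in entity/ffmpeg.py
- Delete the legacy configuration initialization in config/__init__.py
- Update the constant definitions in constant/__init__.py, moving from separate v1/v2 versions to a unified version
- Update the base handler in handlers/base.py, replacing OSS-related imports with the storage service
- Add the subprocess_args utility function for cross-platform process argument configuration
- Add the probe_video_info function for probing video information
- Add the probe_duration_json function for probing media duration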
2026-01-12 17:01:18 +08:00
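24de32e6bb feat(render): implement the core architecture of render system v2
- Add constant definitions for the task types supported by v2
- Bump the software version to 0.0.9
- Define unified v2 audio and video encoding parameters
- Implement the get_sys_info_v2 method in the system information utility
- Add get_capabilities and _get_gpu_info
- Create the core module and the TaskHandler abstract base class
- Add render system design documents, including the cluster architecture, v2 PRD, and Worker PRD
- Implement the task handler abstract base class and interface specification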
2026-01-12 17:01:18 +08:00
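357c0afb3b feat(util): add support for common FFmpeg arguments via an environment variable
- Pass common FFmpeg arguments through the FFMPEG_COMMON_ARGS environment variable
- Merge the common arguments from the environment variable when running FFmpeg commands
- Keep the existing FFmpeg argument passing mechanism unchanged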
2026-01-10 22:51:48 +08:00
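8de0564fef feat(biz): update FFmpeg task startup to support configuring the maximum number of worker threads via an environment variable
- Change the default value of the max_workers parameter of start_ffmpeg_task to None
- Add logic to read the FFMPEG_MAX_WORKERS environment variable
- When max_workers is None, take the default from the environment variable; otherwise use the passed value
- Keep the existing tracer and task analysis behavior unchanged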
2026-01-10 18:28:00 +08:00
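c61f6d7521 refactor(ffmpeg): optimize FFmpeg task processing logic
- Import as_completed to support concurrent task execution
- Process subtasks concurrently with a thread pool to improve throughput
- Add a task count monitoring metric
- Implement fail-fast behavior that promptly cancels the remaining tasks
- Improve exception handling and error logging
- Add a parameter for the maximum number of worker threads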
2026-01-01 00:09:31 +08:00
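The concurrency pattern this commit describes (thread pool, as_completed, fail-fast cancellation), together with the FFMPEG_MAX_WORKERS fallback from commit 8de0564fef, might look roughly like the sketch below. It is an illustration only, not the repository's code: `run_subtasks`, `resolve_max_workers`, the zero-argument-callable subtask shape, and the fallback of 4 workers are all assumptions.

```python
import os
from concurrent.futures import ThreadPoolExecutor, as_completed


def resolve_max_workers(max_workers=None, default=4):
    """Use the explicit value if given, else FFMPEG_MAX_WORKERS, else a fallback.

    The fallback of 4 is an assumption made for this sketch.
    """
    if max_workers is not None:
        return max_workers
    return int(os.getenv("FFMPEG_MAX_WORKERS", str(default)))


def run_subtasks(subtasks, max_workers=None):
    """Run callables concurrently and return False as soon as one fails.

    Each subtask is a zero-argument callable returning True on success
    (a hypothetical shape chosen for illustration).
    """
    workers = resolve_max_workers(max_workers)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(task) for task in subtasks]
        for future in as_completed(futures):
            try:
                ok = future.result()
            except Exception:
                ok = False
            if not ok:
                # Fail fast: cancel every subtask that has not started yet.
                for pending in futures:
                    pending.cancel()
                return False
    return True
```

Subtasks that are already running cannot be cancelled this way; the executor still waits for them when the `with` block exits.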
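4ef57a208e feat(oss): add support for the HTTP_REPLACE_MAP environment variable
- Implement the _apply_http_replace_map function for URL replacement
- Apply the HTTP_REPLACE_MAP URL replacement when uploading files
- Add the http_url attribute to the trace span
- Support configuring URL replacement rules through an environment variable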
2025-12-31 17:28:38 +08:00
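a415d8571d chore(constant): bump the software version to 0.0.8
- Update SOFTWARE_VERSION from 0.0.6 to 0.0.8

feat(util/oss): support a custom rclone configuration file path

- Read the RCLONE_CONFIG_FILE environment variable to specify the configuration file
- Default to rclone.conf when RCLONE_CONFIG_FILE is empty
- Pass the --config argument when invoking rclone so that the specified configuration file is used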
2025-12-12 16:00:34 +08:00
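4af52d5a54 fix(ffmpeg): fix timestamp issues when trimming video
- Add a setpts filter after the trim filter to reset timestamps
- Fix timestamp calculation for the skip, tail, and show effects
- Ensure that timestamps of trimmed video segments start at zero
- Avoid playback problems caused by discontinuous timestamps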
2025-12-09 18:07:48 +08:00
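As a rough illustration of the fix described above, a trim filter can be chained with `setpts=PTS-STARTPTS` so that the trimmed segment's timestamps restart at zero. The helper name `trim_filter` and its signature are hypothetical; only the FFmpeg filter names and the `start`/`end` options are standard.

```python
def trim_filter(start_s: float, end_s: float) -> str:
    """Build a trim filter followed by setpts so timestamps restart at zero."""
    if end_s <= start_s:
        raise ValueError("end_s must be greater than start_s")
    # trim keeps the original PTS values; setpts rebases them to start at 0.
    return f"trim=start={start_s}:end={end_s},setpts=PTS-STARTPTS"
```

The resulting string is meant to be used as part of a `-vf` or `-filter_complex` argument.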
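d7704005b6 feat(entity/ffmpeg.py): add grid4 effect support
Adds support for the grid4 effect in ffmpeg.py. The feature lets users create a four-cell grid video layout by specifying parameters; each cell shows a different video segment, and a delay can be set for richer visual effects. Specific changes:
- Parse the parameter of the `grid4` effect, defaulting to 1 when it is not provided.
- Split the video stream into four parts based on the provided or default resolution.
- Apply scaling and time-delay processing to each split stream.
- Create a black background and use the overlay filter to place the processed streams in the correct positions, forming the final four-cell layout.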
2025-09-18 17:01:03 +08:00
f85ccea933 feat(constant): bump the software version to 0.0.6
- Change SOFTWARE_VERSION from '0.0.5' to '0.0.6' in constant/__init__.py
- Add new video effect handling logic in entity/ffmpeg.py that supports showing a video segment of a specified duration
2025-09-18 09:42:57 +08:00
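0c7181911e refactor(entity): optimize video speed change and zoom effect handling
- Switch the video speed change implementation from minterpolate to setpts to avoid PTS conflicts
- Simplify the zoom effect handling logic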
2025-09-12 18:01:41 +08:00
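For reference, a setpts-based speed change usually divides the presentation timestamps by the speed factor. The sketch below assumes that form; `speed_filter` is a hypothetical helper, and the exact expression used in entity/ffmpeg.py is not shown on this page.

```python
def speed_filter(speed: float) -> str:
    """Build a setpts expression: speed > 1 plays faster, speed < 1 plays slower."""
    if speed <= 0:
        raise ValueError("speed must be positive")
    # Dividing PTS by the speed factor compresses or stretches the timeline.
    return f"setpts=PTS/{speed}"
```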
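cf43f6379e feat(ffmpeg): use minterpolate instead of fps to adjust video speed
- Switch the video speed change from directly adjusting the frame rate to the minterpolate filter
- Achieve smooth slow motion by setting the fps and mi_mode parameters
- Resolve the PTS conflicts that directly adjusting the frame rate could cause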
2025-09-12 16:50:53 +08:00
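ce8854404b fix(entity): fix PTS conflicts during video slow motion
- Change the video speed feature to achieve slow motion by changing the frame rate
- Avoid the PTS conflicts caused by using the setpts filter
- Improve the code structure for better readability and maintainability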
2025-09-12 14:54:01 +08:00
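c36e838d4f fix(entity): fix the zoom effect
fix(entity): remove the setpts directive from the ffmpeg zoom and crop filters

Removed the setpts=PTS-STARTPTS directive from the ffmpeg zoom, crop, and tail filters. The directive can cause video processing problems in some cases, for example when the zoompan filter is used. This change aims to improve the stability and correctness of video processing.

fix(entity): fix the center point calculation in the zoom effect

- Correct the center point formulas for static zoom and dynamic zoom separately
- Ensure the image center stays correct under different zoom factors

fix(entity): fix the ffmpeg zoompan filter parameters

- Change the zoompan filter parameter from 'z=' to 'z=' to unify the parameter format
- This change resolves parameter parsing problems that ffmpeg could hit when processing some videos

feat(zoom): implement a custom center point for the video zoom effect

- Add code that parses the posJson data and computes and sets the zoom center point
- Replace the previous scale and crop filters with the zoompan filter to support dynamic zoom
- Improve the static zoom implementation so that it applies for the full video duration

fix(entity): fix the FFmpeg command for the video zoom effect

- Add escape characters to zoom_expr to resolve FFmpeg parsing problems
- Adjust the scale and crop filter parameters to improve processing accuracy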
2025-09-12 13:20:10 +08:00
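1571934943 fix(entity): fix the center crop calculation and improve JSON parsing
- Add exception handling when parsing posJson so that invalid JSON does not crash the program
- Fix the rounding issue in the center crop calculation so that the crop position is accurate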
2025-09-07 01:45:56 +08:00
35693ac83c build(constant): bump the software version
- Change SOFTWARE_VERSION from '0.0.4' to '0.0.5'
2025-09-06 15:44:24 +08:00
d154f2c74d feat(api): add the zoom_cut template attribute
Added the zoom_cut attribute to the template information; it is used to get the template's zoom-crop information.
2025-09-06 15:43:55 +08:00
bd0c44b17f tail effect 2025-08-12 14:22:26 +08:00
432472fd19 Logic issue 2025-08-09 10:57:45 +08:00
8f0250df43 Pass the default value of skip_if_exist via argv 2025-08-08 13:58:01 +08:00
0209c5de3f Render templates separately 2025-08-08 13:58:01 +08:00
51e7d21f84 Frame skipping, zoom 2025-08-08 13:58:01 +08:00
0770cb361d vsync 2025-08-05 17:43:01 +08:00
2f694da5fd hevc + re-download templates 2025-08-05 12:43:27 +08:00
bf912037d1 lut 2025-08-01 17:24:14 +08:00
1119a7b030 Improve the onlyIf check 2025-08-01 17:24:14 +08:00
5282e58a10 Support zoom_cut 2025-07-21 10:58:07 +08:00
f7141e5d4e Thread-span support 2025-07-19 14:07:39 +08:00
f23bcfdd25 Reduce chunk-size 2025-07-18 13:54:38 +08:00
4b080771f6 Allow skipping downloads, concurrent downloads, env and version updates 2025-07-18 13:48:46 +08:00
13a10b9812 Fix 2025-06-04 15:38:14 +08:00
3976b72607 Improve instrumentation 2025-05-29 10:05:35 +08:00
04ce423811 Improve crop parameter retrieval to avoid problems with multiple materials from the same camera position 2025-05-29 10:01:21 +08:00
399c3d2dc6 Fix cropping 2025-05-27 11:09:10 +08:00
ef3edafcd6 Support multi-threaded rclone uploads 2025-05-26 16:17:38 +08:00
6d631d873e onlyif check 2025-05-05 19:47:54 +08:00
02dd2b72a0 Operate according to the resolution defined by the template 2025-04-30 18:07:33 +08:00
d8bc3c8595 Sync information during health checks 2025-04-28 17:59:44 +08:00
5d58198b7e Support querying template information through the API to avoid using stale templates 2025-04-28 17:57:34 +08:00
789513a0be Avoid accidentally deleting templates 2025-04-28 16:32:34 +08:00
b3911839f3 Do not use batch reporting in app 2025-04-28 16:28:27 +08:00
1c0e4ce411 Add dockerfile 2025-04-28 15:45:22 +08:00
1603be9157 Try passing resolution instead of using scale to adapt to the template 2025-04-28 15:02:50 +08:00
f139fbccd7 Group traces when downloading templates 2025-04-27 14:24:12 +08:00
2fb0f93886 Collect ffmpeg errors, streaming upload 2025-04-27 13:47:52 +08:00
9537f995a1 Support redirection 2025-04-20 14:32:06 +08:00
ec03f8180e Modify the API 2025-04-20 12:32:33 +08:00
972b6a4e4d Modify the API 2025-04-20 12:09:17 +08:00
3d810e5c5b Add an API, add methods that make testing easier 2025-04-20 11:50:32 +08:00
a9043361ec Support getting encoder args from env 2025-04-20 11:02:23 +08:00
740a3c7a63 Complete requirements.txt 2025-04-20 10:54:49 +08:00
450240bd5a Support reusing multiple video segments from the same camera position 2025-04-14 14:15:16 +08:00
6b5975d8b9 Switch the oltp server 2025-04-04 15:51:10 +08:00
85c2e7459e Remove sync_center instrumentation 2025-04-03 11:22:20 +08:00
364ceb29a1 Audio fade-out 2025-04-01 17:17:57 +08:00
ced0c1ad1e Changes 2025-03-30 18:11:03 +08:00
6e4dbfd843 Fixed templates support music 2025-03-28 18:08:19 +08:00
09e0f5f3be concat supports annexb 2025-03-28 18:07:57 +08:00
52c2df8b65 Remove unused methods 2025-03-28 17:30:28 +08:00
b25ad20ddd Logging 2025-03-28 17:27:24 +08:00
7c6e4a97b2 Segments with audio cannot be copied 2025-03-28 17:26:49 +08:00
8f0e69c3de overall_speed: global speed change for segments 2025-03-24 10:30:36 +08:00
b8db0d2b95 metrics adjustments 2025-03-23 18:36:26 +08:00
6dc7e86e8e Adjust some instrumentation collection APIs 2025-03-18 13:58:36 +08:00
c62f1ab976 upload returns the result 2025-03-11 16:15:51 +08:00
744fe28421 Fix the center crop issue 2025-03-10 15:16:23 +08:00
cf43e6d549 Fix amix lowering the volume; fix reencode_to_annexb not adding an audio track 2025-03-10 10:13:30 +08:00
dcf5f5630d Actively check whether audio exists 2025-03-06 23:02:54 +08:00
56bdad7ad1 Audio track overlay 2025-03-06 10:34:28 +08:00
94373cee72 cameraShot effect and rotation 2025-03-05 14:57:02 +08:00
4549b0ab44 Resolution and cropping 2025-03-04 17:43:47 +08:00
9d178a3d34 Instrumentation 2025-03-04 12:36:48 +08:00
1f9632761f effect 2025-03-03 14:27:52 +08:00
fff20610a5 Specify profile level, plus fixes 2025-02-27 16:48:57 +08:00
67696739f9 Cut mode 2025-02-27 14:02:17 +08:00
2ea248c02e Add a timeout for file uploads too 2025-02-16 18:15:35 +08:00
358207efdc Periodically clean up unused files in the directory 2025-02-16 18:15:25 +08:00
94a5e687df Report failure when no file is generated 2025-02-08 15:02:36 +08:00
b7d6797901 Ignore unused files 2025-02-05 10:13:33 +08:00
6d9d373032 only if logic 2025-01-23 14:28:51 +08:00
549ee8320a Do not use ffprobe output after it reports an error 2025-01-22 16:04:33 +08:00
29bb80f3b9 Convert to annexb after rendering; concatenate with the new logic 2025-01-22 14:31:59 +08:00
37 changed files with 3776 additions and 922 deletions

4
.env

@@ -1,4 +0,0 @@
TEMPLATE_DIR=template/
API_ENDPOINT=http://127.0.0.1:8030/task/v1
ACCESS_KEY=TEST_ACCESS_KEY
TEMP_DIR=tmp/

13
.env.example Normal file

@@ -0,0 +1,13 @@
TEMPLATE_DIR=template/
API_ENDPOINT=https://zhentuai.com/task/v1
ACCESS_KEY=TEST_ACCESS_KEY
TEMP_DIR=tmp/
#REDIRECT_TO_URL=https://renderworker-deuvulkhes.cn-shanghai.fcapp.run/
# QSV
ENCODER_ARGS="-c:v h264_qsv -global_quality 28 -look_ahead 1"
# NVENC
#ENCODER_ARGS="-c:v h264_nvenc -cq:v 24 -preset:v p7 -tune:v hq -profile:v high"
# HEVC
#VIDEO_ARGS="-profile:v main
UPLOAD_METHOD="rclone"
RCLONE_REPLACE_MAP="https://oss.zhentuai.com|alioss://frametour-assets,https://frametour-assets.oss-cn-shanghai.aliyuncs.com|alioss://frametour-assets"
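The RCLONE_REPLACE_MAP value above appears to be a comma-separated list of `source|target` prefix pairs. A minimal sketch of how such a value could be parsed and applied to a URL follows; the function names and the prefix-matching behavior are assumptions, since the consuming code is not part of this diff.

```python
from typing import List, Tuple


def parse_replace_map(raw: str) -> List[Tuple[str, str]]:
    """Split 'src|dst,src|dst' into (src, dst) pairs, skipping malformed items."""
    pairs = []
    for item in raw.split(","):
        item = item.strip()
        if not item or "|" not in item:
            continue
        src, dst = item.split("|", 1)
        pairs.append((src, dst))
    return pairs


def apply_replace_map(url: str, pairs: List[Tuple[str, str]]) -> str:
    """Rewrite the first matching prefix; return the URL unchanged otherwise."""
    for src, dst in pairs:
        if url.startswith(src):
            return dst + url[len(src):]
    return url
```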

8
.gitignore vendored

@@ -6,6 +6,11 @@ __pycache__/
*.so
.Python
build/
dist/
*.mp4
*.ts
rand*.ts
tmp_concat_*.txt
*.egg-info/
*.egg
*.manifest
@@ -26,3 +31,6 @@ target/
.venv
venv/
cython_debug/
.env
.serena
.claude

21
Dockerfile Normal file

@@ -0,0 +1,21 @@
FROM linuxserver/ffmpeg:7.1.1
LABEL authors="Jerry Yan"
RUN sed -i 's@//.*archive.ubuntu.com@//mirrors.ustc.edu.cn@g' /etc/apt/sources.list && \
    sed -i 's/security.ubuntu.com/mirrors.ustc.edu.cn/g' /etc/apt/sources.list
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
    python3-pip \
    python3-dev \
    python3-setuptools \
    python3-wheel \
    python3-venv
RUN apt-get clean && \
    rm -rf /var/lib/apt/lists/*
COPY . /app/
RUN python3 -m venv /app/venv
RUN /app/venv/bin/python -m pip config set global.index-url https://mirrors.ustc.edu.cn/pypi/simple
RUN /app/venv/bin/python -m pip install -r /app/requirements.txt
WORKDIR /app
ENTRYPOINT ["/app/venv/bin/python", "app.py"]


@@ -1,95 +0,0 @@
import json
import os.path
import time
from entity.ffmpeg import FfmpegTask
import logging
from util import ffmpeg, oss
logger = logging.getLogger('biz/ffmpeg')
def parse_ffmpeg_task(task_info, template_info):
    tasks = []
    # Middle segments
    task_params_str = task_info.get("taskParams", "{}")
    task_params = json.loads(task_params_str)
    for part in template_info.get("video_parts"):
        source = parse_video(part.get('source'), task_params, template_info)
        if not source:
            logger.warning("no video found for part: " + str(part))
            continue
        sub_ffmpeg_task = FfmpegTask(source)
        sub_ffmpeg_task.annexb = True
        sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
        for lut in part.get('filters', []):
            sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut))
        for audio in part.get('audios', []):
            sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio))
        for overlay in part.get('overlays', []):
            sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
        tasks.append(sub_ffmpeg_task)
    output_file = "out_" + str(time.time()) + ".mp4"
    task = FfmpegTask(tasks, output_file=output_file)
    overall = template_info.get("overall_template")
    task.frame_rate = template_info.get("frame_rate", 25)
    if overall.get('source', ''):
        source = parse_video(overall.get('source'), task_params, template_info)
        task.add_inputs(source)
    for lut in overall.get('filters', []):
        task.add_lut(os.path.join(template_info.get("local_path"), lut))
    for audio in overall.get('audios', []):
        task.add_audios(os.path.join(template_info.get("local_path"), audio))
    for overlay in overall.get('overlays', []):
        task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
    return task
def parse_video(source, task_params, template_info):
    print(source)
    if source.startswith('PLACEHOLDER_'):
        placeholder_id = source.replace('PLACEHOLDER_', '')
        new_sources = task_params.get(placeholder_id, [])
        if type(new_sources) is list:
            if len(new_sources) == 0:
                logger.debug("no video found for placeholder: " + placeholder_id)
                return None
            else:
                # TODO: Random Pick / Policy Pick
                new_sources = new_sources[0].get("url")
        if new_sources.startswith("http"):
            _, source_name = os.path.split(new_sources)
            oss.download_from_oss(new_sources, source_name)
            return source_name
        return new_sources
    return os.path.join(template_info.get("local_path"), source)
def start_ffmpeg_task(ffmpeg_task):
    for task in ffmpeg_task.analyze_input_render_tasks():
        start_ffmpeg_task(task)
    ffmpeg_task.correct_task_type()
    return ffmpeg.start_render(ffmpeg_task)
def clear_task_tmp_file(ffmpeg_task):
    for task in ffmpeg_task.analyze_input_render_tasks():
        clear_task_tmp_file(task)
    try:
        if "template" not in ffmpeg_task.get_output_file():
            os.remove(ffmpeg_task.get_output_file())
            logger.info("delete tmp file: " + ffmpeg_task.get_output_file())
        else:
            logger.info("skip delete template file: " + ffmpeg_task.get_output_file())
    except OSError:
        logger.warning("delete tmp file failed: " + ffmpeg_task.get_output_file())
        return False
    return True
def probe_video_info(ffmpeg_task):
    # Get the video width, height, and duration
    return ffmpeg.probe_video_info(ffmpeg_task.get_output_file())


@@ -1,24 +0,0 @@
from template import get_template_def
from util import api
def start_task(task_info):
    from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
    task_info = api.normalize_task(task_info)
    template_info = get_template_def(task_info.get("templateId"))
    api.report_task_start(task_info)
    ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
    result = start_ffmpeg_task(ffmpeg_task)
    if not result:
        return api.report_task_failed(task_info)
    oss_result = api.upload_task_file(task_info, ffmpeg_task)
    if not oss_result:
        return api.report_task_failed(task_info)
    # Get the video width, height, and duration
    width, height, duration = probe_video_info(ffmpeg_task)
    clear_task_tmp_file(ffmpeg_task)
    api.report_task_success(task_info, videoInfo={
        "width": width,
        "height": height,
        "duration": duration
    })


@@ -1,16 +0,0 @@
import datetime
import logging
from logging.handlers import TimedRotatingFileHandler
from dotenv import load_dotenv
load_dotenv()
logging.basicConfig(level=logging.INFO)
root_logger = logging.getLogger()
rf_handler = TimedRotatingFileHandler('all_log.log', when='midnight')
rf_handler.setFormatter(logging.Formatter("[%(asctime)s][%(name)s]%(levelname)s - %(message)s"))
rf_handler.setLevel(logging.DEBUG)
f_handler = TimedRotatingFileHandler('error.log', when='midnight')
f_handler.setLevel(logging.ERROR)
f_handler.setFormatter(logging.Formatter("[%(asctime)s][%(name)s][:%(lineno)d]%(levelname)s - - %(message)s"))
root_logger.addHandler(rf_handler)
root_logger.addHandler(f_handler)


@@ -1,6 +1,63 @@
SUPPORT_FEATURE = (
    'simple_render_algo',
    'gpu_accelerate',
    'intel_gpu_accelerate',
# -*- coding: utf-8 -*-
"""
Constant definitions
v2 constants, used by the Render Worker v2 API.
"""
# Software version
SOFTWARE_VERSION = '2.0.0'
# Supported task types
TASK_TYPES = (
    'RENDER_SEGMENT_VIDEO',
    'COMPOSE_TRANSITION',
    'PREPARE_JOB_AUDIO',
    'PACKAGE_SEGMENT_TS',
    'FINALIZE_MP4',
)
SOFTWARE_VERSION = '0.0.1'
# Default capabilities
DEFAULT_CAPABILITIES = list(TASK_TYPES)
# Supported transition types (correspond to FFmpeg xfade parameters)
TRANSITION_TYPES = (
    'fade',        # Fade in/out (default)
    'dissolve',    # Dissolve
    'wipeleft',    # Wipe left
    'wiperight',   # Wipe right
    'wipeup',      # Wipe up
    'wipedown',    # Wipe down
    'slideleft',   # Slide left
    'slideright',  # Slide right
    'slideup',     # Slide up
    'slidedown',   # Slide down
)
# Unified video encoding parameters (from the integration document)
VIDEO_ENCODE_PARAMS = {
    'codec': 'libx264',
    'preset': 'medium',
    'profile': 'main',
    'level': '4.0',
    'crf': '23',
    'pix_fmt': 'yuv420p',
}
# Unified audio encoding parameters
AUDIO_ENCODE_PARAMS = {
    'codec': 'aac',
    'bitrate': '128k',
    'sample_rate': '48000',
    'channels': '2',
}
# Error codes
ERROR_CODES = {
    'E_INPUT_UNAVAILABLE': 'Material is not accessible',
    'E_FFMPEG_FAILED': 'FFmpeg execution failed',
    'E_UPLOAD_FAILED': 'Upload failed',
    'E_SPEC_INVALID': 'Invalid render spec',
    'E_TIMEOUT': 'Execution timed out',
    'E_UNKNOWN': 'Unknown error',
}
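One plausible way to turn the unified encoding parameters above into FFmpeg command-line arguments is sketched here. The two dicts are copied from the diff; `build_encode_args` and the key-to-flag mapping are assumptions for illustration, as the handlers that consume these constants are not shown on this page.

```python
VIDEO_ENCODE_PARAMS = {
    'codec': 'libx264', 'preset': 'medium', 'profile': 'main',
    'level': '4.0', 'crf': '23', 'pix_fmt': 'yuv420p',
}
AUDIO_ENCODE_PARAMS = {
    'codec': 'aac', 'bitrate': '128k', 'sample_rate': '48000', 'channels': '2',
}


def build_encode_args(video=VIDEO_ENCODE_PARAMS, audio=AUDIO_ENCODE_PARAMS):
    """Map the parameter dicts to standard FFmpeg output options."""
    return [
        '-c:v', video['codec'],
        '-preset', video['preset'],
        '-profile:v', video['profile'],
        '-level', video['level'],
        '-crf', video['crf'],
        '-pix_fmt', video['pix_fmt'],
        '-c:a', audio['codec'],
        '-b:a', audio['bitrate'],
        '-ar', audio['sample_rate'],
        '-ac', audio['channels'],
    ]
```

Encoding every segment with identical parameters is what makes it possible to concatenate the segments later without re-encoding.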

12
core/__init__.py Normal file

@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
"""
Core abstraction layer
Contains core interface definitions such as the task handler abstract base class.
"""
from core.handler import TaskHandler
__all__ = [
    'TaskHandler',
]

79
core/handler.py Normal file

@@ -0,0 +1,79 @@
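# -*- coding: utf-8 -*-
"""
Task handler abstract base class
Defines the interface specification for task handlers.
"""
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from domain.task import Task, TaskType
    from domain.result import TaskResult
class TaskHandler(ABC):
    """
    Task handler abstract base class
    All task handlers must inherit from this class and implement its abstract methods.
    """
    @abstractmethod
    def handle(self, task: 'Task') -> 'TaskResult':
        """
        Main method for handling a task
        Args:
            task: The task entity
        Returns:
            TaskResult: The task result (success or failure)
        """
        pass
    @abstractmethod
    def get_supported_type(self) -> 'TaskType':
        """
        Return the task type this handler supports
        Returns:
            TaskType: The supported task type enum value
        """
        pass
    def before_handle(self, task: 'Task') -> None:
        """
        Pre-handle hook (optional override)
        Used for preparation before the task runs, such as logging or resource checks.
        Args:
            task: The task entity
        """
        pass
    def after_handle(self, task: 'Task', result: 'TaskResult') -> None:
        """
        Post-handle hook (optional override)
        Used for cleanup after the task runs, such as releasing resources or recording statistics.
        Args:
            task: The task entity
            result: The task result
        """
        pass
    def validate_task(self, task: 'Task') -> bool:
        """
        Validate whether the task is valid (optional override)
        Args:
            task: The task entity
        Returns:
            bool: Whether the task is valid
        """
        return True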
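To show how a concrete handler might plug into this interface, here is a self-contained sketch. Everything in it is a stand-in: the trimmed `Task`, `TaskResult`, and `TaskType` classes, the `FinalizeStubHandler`, and the `dispatch` function are hypothetical, and the call order (validate, before_handle, handle, after_handle) is an assumption, since TaskExecutor itself is not shown in this diff.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional


class TaskType(Enum):
    FINALIZE_MP4 = "FINALIZE_MP4"


@dataclass
class Task:
    task_id: str
    task_type: TaskType
    payload: Dict[str, Any]


@dataclass
class TaskResult:
    success: bool
    data: Optional[Dict[str, Any]] = None
    error_message: Optional[str] = None


class TaskHandler(ABC):
    """Reduced copy of the interface above, so the sketch runs on its own."""

    @abstractmethod
    def handle(self, task: Task) -> TaskResult: ...

    @abstractmethod
    def get_supported_type(self) -> TaskType: ...

    def before_handle(self, task: Task) -> None:
        pass

    def after_handle(self, task: Task, result: TaskResult) -> None:
        pass

    def validate_task(self, task: Task) -> bool:
        return True


class FinalizeStubHandler(TaskHandler):
    """Hypothetical handler that only counts the TS entries in the payload."""

    def get_supported_type(self) -> TaskType:
        return TaskType.FINALIZE_MP4

    def validate_task(self, task: Task) -> bool:
        return bool(task.payload.get("tsList"))

    def handle(self, task: Task) -> TaskResult:
        return TaskResult(True, data={"segments": len(task.payload["tsList"])})


def dispatch(handlers: Dict[TaskType, TaskHandler], task: Task) -> TaskResult:
    """Look up the handler by task type and run its hooks in order."""
    handler = handlers[task.task_type]
    if not handler.validate_task(task):
        return TaskResult(False, error_message="invalid task")
    handler.before_handle(task)
    result = handler.handle(task)
    handler.after_handle(task, result)
    return result
```

A registry can be built with `{h.get_supported_type(): h for h in handlers}`, which keeps the type-to-handler mapping in one place.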

24
domain/__init__.py Normal file

@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
"""
Domain model layer
Contains core data structures such as task entities, results, and configuration.
"""
from domain.task import Task, TaskType, TaskStatus, RenderSpec, OutputSpec, AudioSpec, AudioProfile
from domain.result import TaskResult, ErrorCode, RETRY_CONFIG
from domain.config import WorkerConfig
__all__ = [
    'Task',
    'TaskType',
    'TaskStatus',
    'RenderSpec',
    'OutputSpec',
    'AudioSpec',
    'AudioProfile',
    'TaskResult',
    'ErrorCode',
    'RETRY_CONFIG',
    'WorkerConfig',
]

122
domain/config.py Normal file

@@ -0,0 +1,122 @@
# -*- coding: utf-8 -*-
"""
Worker configuration model
Defines the configuration parameters for the Worker at runtime.
"""
import os
from dataclasses import dataclass, field
from typing import List, Optional
# Task types supported by default
DEFAULT_CAPABILITIES = [
    "RENDER_SEGMENT_VIDEO",
    "PREPARE_JOB_AUDIO",
    "PACKAGE_SEGMENT_TS",
    "FINALIZE_MP4"
]
@dataclass
class WorkerConfig:
    """
    Worker configuration
    Contains all configuration parameters the Worker needs to run.
    """
    # API configuration
    api_endpoint: str
    access_key: str
    worker_id: str
    # Concurrency control
    max_concurrency: int = 4
    # Heartbeat configuration
    heartbeat_interval: int = 5  # seconds
    # Lease configuration
    lease_extension_threshold: int = 60  # seconds, how far ahead of expiry to renew
    lease_extension_duration: int = 300  # seconds, length of each renewal
    # Directory configuration
    temp_dir: str = "/tmp/render_worker"
    # Capability configuration
    capabilities: List[str] = field(default_factory=lambda: DEFAULT_CAPABILITIES.copy())
    # FFmpeg configuration
    ffmpeg_timeout: int = 3600  # seconds, FFmpeg execution timeout
    # Download/upload configuration
    download_timeout: int = 300  # seconds, download timeout
    upload_timeout: int = 600  # seconds, upload timeout
    @classmethod
    def from_env(cls) -> 'WorkerConfig':
        """Create the configuration from environment variables"""
        # API endpoint, preferring the V2 version
        api_endpoint = os.getenv('API_ENDPOINT_V2') or os.getenv('API_ENDPOINT', '')
        if not api_endpoint:
            raise ValueError("API_ENDPOINT_V2 or API_ENDPOINT environment variable is required")
        # Access Key
        access_key = os.getenv('ACCESS_KEY', '')
        if not access_key:
            raise ValueError("ACCESS_KEY environment variable is required")
        # Worker ID
        worker_id = os.getenv('WORKER_ID', '100001')
        # Concurrency
        max_concurrency = int(os.getenv('MAX_CONCURRENCY', '4'))
        # Heartbeat interval
        heartbeat_interval = int(os.getenv('HEARTBEAT_INTERVAL', '5'))
        # Lease configuration
        lease_extension_threshold = int(os.getenv('LEASE_EXTENSION_THRESHOLD', '60'))
        lease_extension_duration = int(os.getenv('LEASE_EXTENSION_DURATION', '300'))
        # Temporary directory
        temp_dir = os.getenv('TEMP_DIR', os.getenv('TEMP', '/tmp/render_worker'))
        # Capability list
        capabilities_str = os.getenv('CAPABILITIES', '')
        if capabilities_str:
            capabilities = [c.strip() for c in capabilities_str.split(',') if c.strip()]
        else:
            capabilities = DEFAULT_CAPABILITIES.copy()
        # FFmpeg timeout
        ffmpeg_timeout = int(os.getenv('FFMPEG_TIMEOUT', '3600'))
        # Download/upload timeouts
        download_timeout = int(os.getenv('DOWNLOAD_TIMEOUT', '300'))
        upload_timeout = int(os.getenv('UPLOAD_TIMEOUT', '600'))
        return cls(
            api_endpoint=api_endpoint,
            access_key=access_key,
            worker_id=worker_id,
            max_concurrency=max_concurrency,
            heartbeat_interval=heartbeat_interval,
            lease_extension_threshold=lease_extension_threshold,
            lease_extension_duration=lease_extension_duration,
            temp_dir=temp_dir,
            capabilities=capabilities,
            ffmpeg_timeout=ffmpeg_timeout,
            download_timeout=download_timeout,
            upload_timeout=upload_timeout
        )
    def get_work_dir_path(self, task_id: str) -> str:
        """Get the working directory path for a task"""
        return os.path.join(self.temp_dir, f"task_{task_id}")
    def ensure_temp_dir(self) -> None:
        """Ensure the temporary directory exists"""
        os.makedirs(self.temp_dir, exist_ok=True)
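The CAPABILITIES handling in `from_env` can be isolated as in the sketch below. Note that the default list in this file does not include COMPOSE_TRANSITION, although TASK_TYPES in constant/__init__.py does, so a worker meant to accept transition tasks would presumably need CAPABILITIES set explicitly. The helper `parse_capabilities` and its `env` parameter are hypothetical and exist only to make the logic testable without touching the real environment.

```python
import os
from typing import List, Mapping

# Copied from domain/config.py in this diff.
DEFAULT_CAPABILITIES = [
    "RENDER_SEGMENT_VIDEO",
    "PREPARE_JOB_AUDIO",
    "PACKAGE_SEGMENT_TS",
    "FINALIZE_MP4",
]


def parse_capabilities(env: Mapping[str, str] = os.environ) -> List[str]:
    """Mirror the from_env logic: comma-separated override, else the defaults."""
    raw = env.get("CAPABILITIES", "")
    if raw:
        # Strip whitespace and drop empty entries such as a trailing comma.
        return [c.strip() for c in raw.split(",") if c.strip()]
    return DEFAULT_CAPABILITIES.copy()
```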

105
domain/result.py Normal file

@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
"""
Task result model
Defines data structures such as error codes, retry configuration, and task results.
"""
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
class ErrorCode(Enum):
    """Error code enum"""
    E_INPUT_UNAVAILABLE = "E_INPUT_UNAVAILABLE"  # Material not accessible / 404
    E_FFMPEG_FAILED = "E_FFMPEG_FAILED"  # FFmpeg execution failed
    E_UPLOAD_FAILED = "E_UPLOAD_FAILED"  # Upload failed
    E_SPEC_INVALID = "E_SPEC_INVALID"  # Invalid renderSpec
    E_TIMEOUT = "E_TIMEOUT"  # Execution timed out
    E_UNKNOWN = "E_UNKNOWN"  # Unknown error
# Retry configuration
RETRY_CONFIG: Dict[ErrorCode, Dict[str, Any]] = {
    ErrorCode.E_INPUT_UNAVAILABLE: {
        'max_retries': 3,
        'backoff': [1, 2, 5]  # Retry intervals (seconds)
    },
    ErrorCode.E_FFMPEG_FAILED: {
        'max_retries': 2,
        'backoff': [1, 3]
    },
    ErrorCode.E_UPLOAD_FAILED: {
        'max_retries': 3,
        'backoff': [1, 2, 5]
    },
    ErrorCode.E_SPEC_INVALID: {
        'max_retries': 0,  # No retry
        'backoff': []
    },
    ErrorCode.E_TIMEOUT: {
        'max_retries': 2,
        'backoff': [5, 10]
    },
    ErrorCode.E_UNKNOWN: {
        'max_retries': 1,
        'backoff': [2]
    },
}
@dataclass
class TaskResult:
    """
    Task result
    Wraps the outcome of a task execution, including success data or failure information.
    """
    success: bool
    data: Optional[Dict[str, Any]] = None
    error_code: Optional[ErrorCode] = None
    error_message: Optional[str] = None
    @classmethod
    def ok(cls, data: Dict[str, Any]) -> 'TaskResult':
        """Create a success result"""
        return cls(success=True, data=data)
    @classmethod
    def fail(cls, error_code: ErrorCode, error_message: str) -> 'TaskResult':
        """Create a failure result"""
        return cls(
            success=False,
            error_code=error_code,
            error_message=error_message
        )
    def to_report_dict(self) -> Dict[str, Any]:
        """
        Convert to the reporting format
        Used to convert the data format when reporting to the API.
        """
        if self.success:
            return {'result': self.data}
        else:
            return {
                'errorCode': self.error_code.value if self.error_code else 'E_UNKNOWN',
                'errorMessage': self.error_message or 'Unknown error'
            }
    def can_retry(self) -> bool:
        """Whether the task can be retried"""
        if self.success:
            return False
        if not self.error_code:
            return True
        config = RETRY_CONFIG.get(self.error_code, {})
        return config.get('max_retries', 0) > 0
    def get_retry_config(self) -> Dict[str, Any]:
        """Get the retry configuration"""
        if not self.error_code:
            return {'max_retries': 1, 'backoff': [2]}
        return RETRY_CONFIG.get(self.error_code, {'max_retries': 1, 'backoff': [2]})
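A retry loop driven by a table like RETRY_CONFIG could be written as in the sketch below. It is an assumption about how the table might be consumed, not the worker's actual retry code, which is not part of this diff. `run_with_retry`, the "return None on success, ErrorCode on failure" convention, and the injectable `sleep` are hypothetical, and only two error codes are reproduced to keep the example short.

```python
import time
from enum import Enum


class ErrorCode(Enum):
    E_UPLOAD_FAILED = "E_UPLOAD_FAILED"
    E_SPEC_INVALID = "E_SPEC_INVALID"


# Subset of the table above.
RETRY_CONFIG = {
    ErrorCode.E_UPLOAD_FAILED: {"max_retries": 3, "backoff": [1, 2, 5]},
    ErrorCode.E_SPEC_INVALID: {"max_retries": 0, "backoff": []},
}


def run_with_retry(attempt, sleep=time.sleep):
    """Call attempt() until it succeeds or the retry budget is used up.

    attempt() returns None on success or an ErrorCode on failure.
    """
    retries = 0
    while True:
        error = attempt()
        if error is None:
            return True
        config = RETRY_CONFIG.get(error, {"max_retries": 1, "backoff": [2]})
        if retries >= config["max_retries"]:
            return False
        backoff = config["backoff"]
        # Reuse the last interval if there are more retries than intervals.
        delay = backoff[min(retries, len(backoff) - 1)] if backoff else 0
        sleep(delay)
        retries += 1
```

Passing `sleep` as a parameter lets tests record the delays instead of actually waiting.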

363
domain/task.py Normal file

@@ -0,0 +1,363 @@
# -*- coding: utf-8 -*-
"""
Task domain model
Defines data structures such as task types, task entities, render specs, and output specs.
"""
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List
from datetime import datetime
class TaskType(Enum):
    """Task type enum"""
    RENDER_SEGMENT_VIDEO = "RENDER_SEGMENT_VIDEO"  # Render a video segment
    COMPOSE_TRANSITION = "COMPOSE_TRANSITION"  # Compose a transition effect
    PREPARE_JOB_AUDIO = "PREPARE_JOB_AUDIO"  # Generate the global audio
    PACKAGE_SEGMENT_TS = "PACKAGE_SEGMENT_TS"  # Package a TS segment
    FINALIZE_MP4 = "FINALIZE_MP4"  # Produce the final MP4
# Supported transition types (correspond to FFmpeg xfade parameters)
TRANSITION_TYPES = {
    'fade': 'fade',              # Fade in/out (default)
    'dissolve': 'dissolve',      # Dissolve
    'wipeleft': 'wipeleft',      # Wipe left
    'wiperight': 'wiperight',    # Wipe right
    'wipeup': 'wipeup',          # Wipe up
    'wipedown': 'wipedown',      # Wipe down
    'slideleft': 'slideleft',    # Slide left
    'slideright': 'slideright',  # Slide right
    'slideup': 'slideup',        # Slide up
    'slidedown': 'slidedown',    # Slide down
}
class TaskStatus(Enum):
    """Task status enum"""
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
@dataclass
class TransitionConfig:
    """
    Transition configuration
    Used for the in/out transition configuration of RENDER_SEGMENT_VIDEO tasks.
    """
    type: str = "fade"  # Transition type
    duration_ms: int = 500  # Transition duration (milliseconds)
    @classmethod
    def from_dict(cls, data: Optional[Dict]) -> Optional['TransitionConfig']:
        """Create a TransitionConfig from a dict"""
        if not data:
            return None
        trans_type = data.get('type', 'fade')
        # Check that the transition type is supported
        if trans_type not in TRANSITION_TYPES:
            trans_type = 'fade'
        return cls(
            type=trans_type,
            duration_ms=int(data.get('durationMs', 500))
        )
    def get_overlap_ms(self) -> int:
        """Get the overlap duration (one side, half of the transition duration)"""
        return self.duration_ms // 2
    def get_ffmpeg_transition(self) -> str:
        """Get the FFmpeg xfade parameter"""
        return TRANSITION_TYPES.get(self.type, 'fade')
@dataclass
class RenderSpec:
    """
    Render spec
    Used by RENDER_SEGMENT_VIDEO tasks to define video rendering parameters.
    """
    crop_enable: bool = False
    crop_size: Optional[str] = None
    speed: str = "1.0"
    lut_url: Optional[str] = None
    overlay_url: Optional[str] = None
    effects: Optional[str] = None
    zoom_cut: bool = False
    video_crop: Optional[str] = None
    face_pos: Optional[str] = None
    transitions: Optional[str] = None
    # Transition configuration (added in PRD v2)
    transition_in: Optional[TransitionConfig] = None  # In transition
    transition_out: Optional[TransitionConfig] = None  # Out transition
    @classmethod
    def from_dict(cls, data: Optional[Dict]) -> 'RenderSpec':
        """Create a RenderSpec from a dict"""
        if not data:
            return cls()
        return cls(
            crop_enable=data.get('cropEnable', False),
            crop_size=data.get('cropSize'),
            speed=str(data.get('speed', '1.0')),
            lut_url=data.get('lutUrl'),
            overlay_url=data.get('overlayUrl'),
            effects=data.get('effects'),
            zoom_cut=data.get('zoomCut', False),
            video_crop=data.get('videoCrop'),
            face_pos=data.get('facePos'),
            transitions=data.get('transitions'),
            transition_in=TransitionConfig.from_dict(data.get('transitionIn')),
            transition_out=TransitionConfig.from_dict(data.get('transitionOut'))
        )
    def has_transition_in(self) -> bool:
        """Whether there is an in transition"""
        return self.transition_in is not None and self.transition_in.duration_ms > 0
    def has_transition_out(self) -> bool:
        """Whether there is an out transition"""
        return self.transition_out is not None and self.transition_out.duration_ms > 0
    def get_overlap_head_ms(self) -> int:
        """Get the head overlap duration (milliseconds)"""
        if self.has_transition_in():
            return self.transition_in.get_overlap_ms()
        return 0
    def get_overlap_tail_ms(self) -> int:
        """Get the tail overlap duration (milliseconds)"""
        if self.has_transition_out():
            return self.transition_out.get_overlap_ms()
        return 0
@dataclass
class OutputSpec:
    """
    Output spec
    Used by RENDER_SEGMENT_VIDEO tasks to define video output parameters.
    """
    width: int = 1080
    height: int = 1920
    fps: int = 30
    bitrate: int = 4000000
    codec: str = "h264"
    @classmethod
    def from_dict(cls, data: Optional[Dict]) -> 'OutputSpec':
        """Create an OutputSpec from a dict"""
        if not data:
            return cls()
        return cls(
            width=data.get('width', 1080),
            height=data.get('height', 1920),
            fps=data.get('fps', 30),
            bitrate=data.get('bitrate', 4000000),
            codec=data.get('codec', 'h264')
        )
@dataclass
class AudioSpec:
    """
    Audio spec
    Used for per-segment overlaid sound effects in PREPARE_JOB_AUDIO tasks.
    """
    audio_url: Optional[str] = None
    volume: float = 1.0
    fade_in_ms: int = 10
    fade_out_ms: int = 10
    start_ms: int = 0
    delay_ms: int = 0
    loop_enable: bool = False
    @classmethod
    def from_dict(cls, data: Optional[Dict]) -> Optional['AudioSpec']:
        """Create an AudioSpec from a dict"""
        if not data:
            return None
        return cls(
            audio_url=data.get('audioUrl'),
            volume=float(data.get('volume', 1.0)),
            fade_in_ms=int(data.get('fadeInMs', 10)),
            fade_out_ms=int(data.get('fadeOutMs', 10)),
            start_ms=int(data.get('startMs', 0)),
            delay_ms=int(data.get('delayMs', 0)),
            loop_enable=data.get('loopEnable', False)
        )
@dataclass
class AudioProfile:
    """
    Audio profile
    Global audio parameters for PREPARE_JOB_AUDIO tasks.
    """
    sample_rate: int = 48000
    channels: int = 2
    codec: str = "aac"
    @classmethod
    def from_dict(cls, data: Optional[Dict]) -> 'AudioProfile':
        """Create an AudioProfile from a dict"""
        if not data:
            return cls()
        return cls(
            sample_rate=data.get('sampleRate', 48000),
            channels=data.get('channels', 2),
            codec=data.get('codec', 'aac')
        )
@dataclass
class Task:
    """
    Task entity
    Represents a render task waiting to be executed.
    """
    task_id: str
    task_type: TaskType
    priority: int
    lease_expire_time: datetime
    payload: Dict[str, Any]
    @classmethod
    def from_dict(cls, data: Dict) -> 'Task':
        """Create a Task from an API response dict"""
        lease_time_str = data.get('leaseExpireTime', '')
        # Parse the ISO 8601 time format
        if lease_time_str:
            if lease_time_str.endswith('Z'):
                lease_time_str = lease_time_str[:-1] + '+00:00'
            try:
                lease_expire_time = datetime.fromisoformat(lease_time_str)
            except ValueError:
                # On parse failure, use the current time + 5 minutes
                lease_expire_time = datetime.now()
        else:
            lease_expire_time = datetime.now()
        return cls(
            task_id=str(data['taskId']),
            task_type=TaskType(data['taskType']),
            priority=data.get('priority', 0),
            lease_expire_time=lease_expire_time,
            payload=data.get('payload', {})
        )
    def get_job_id(self) -> str:
        """Get the job ID"""
        return str(self.payload.get('jobId', ''))
    def get_segment_id(self) -> Optional[str]:
        """Get the segment ID (if any)"""
        segment_id = self.payload.get('segmentId')
        return str(segment_id) if segment_id else None
    def get_plan_segment_index(self) -> int:
        """Get the planned segment index"""
        return int(self.payload.get('planSegmentIndex', 0))
    def get_duration_ms(self) -> int:
        """Get the duration (milliseconds)"""
        return int(self.payload.get('durationMs', 5000))
    def get_material_url(self) -> Optional[str]:
        """Get the material URL"""
        return self.payload.get('boundMaterialUrl') or self.payload.get('sourceRef')
    def get_render_spec(self) -> RenderSpec:
        """Get the render spec"""
        return RenderSpec.from_dict(self.payload.get('renderSpec'))
    def get_output_spec(self) -> OutputSpec:
        """Get the output spec"""
        return OutputSpec.from_dict(self.payload.get('output'))
    def get_bgm_url(self) -> Optional[str]:
        """Get the BGM URL"""
        return self.payload.get('bgmUrl')
    def get_total_duration_ms(self) -> int:
        """Get the total duration (milliseconds)"""
        return int(self.payload.get('totalDurationMs', 0))
    def get_segments(self) -> List[Dict]:
        """Get the segment list"""
        return self.payload.get('segments', [])
    def get_audio_profile(self) -> AudioProfile:
        """Get the audio profile"""
        return AudioProfile.from_dict(self.payload.get('audioProfile'))
    def get_video_url(self) -> Optional[str]:
        """Get the video URL (for PACKAGE_SEGMENT_TS)"""
        return self.payload.get('videoUrl')
    def get_audio_url(self) -> Optional[str]:
        """Get the audio URL (for PACKAGE_SEGMENT_TS)"""
        return self.payload.get('audioUrl')
    def get_start_time_ms(self) -> int:
        """Get the start time (milliseconds)"""
        return int(self.payload.get('startTimeMs', 0))
    def get_m3u8_url(self) -> Optional[str]:
        """Get the m3u8 URL (for FINALIZE_MP4)"""
        return self.payload.get('m3u8Url')
    def get_ts_list(self) -> List[str]:
        """Get the TS list (for FINALIZE_MP4)"""
        return self.payload.get('tsList', [])
    # ========== COMPOSE_TRANSITION methods ==========
    def get_transition_id(self) -> Optional[str]:
        """Get the transition ID (for COMPOSE_TRANSITION)"""
        return self.payload.get('transitionId')
    def get_prev_segment(self) -> Optional[Dict]:
        """Get the previous segment information (for COMPOSE_TRANSITION)"""
        return self.payload.get('prevSegment')
    def get_next_segment(self) -> Optional[Dict]:
        """Get the next segment information (for COMPOSE_TRANSITION)"""
        return self.payload.get('nextSegment')
    def get_transition_config(self) -> Optional[TransitionConfig]:
        """Get the transition configuration (for COMPOSE_TRANSITION)"""
        return TransitionConfig.from_dict(self.payload.get('transition'))
    # ========== PACKAGE_SEGMENT_TS transition methods ==========
    def is_transition_segment(self) -> bool:
        """Whether this is a transition segment (for PACKAGE_SEGMENT_TS)"""
        return self.payload.get('isTransitionSegment', False)
    def should_trim_head(self) -> bool:
        """Whether the head overlap needs to be trimmed (for PACKAGE_SEGMENT_TS)"""
        return self.payload.get('trimHead', False)
    def should_trim_tail(self) -> bool:
        """Whether the tail overlap needs to be trimmed (for PACKAGE_SEGMENT_TS)"""
        return self.payload.get('trimTail', False)
    def get_trim_head_ms(self) -> int:
        """Get the head trim duration (milliseconds)"""
        return int(self.payload.get('trimHeadMs', 0))
    def get_trim_tail_ms(self) -> int:
        """Get the tail trim duration (milliseconds)"""
        return int(self.payload.get('trimTailMs', 0))
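To make the transition arithmetic concrete: each side of a transition contributes half of its duration as overlap, and unsupported transition names fall back to fade. The sketch below also shows how those values could feed an FFmpeg xfade filter string. The helpers `normalize_transition`, `overlap_ms`, and `xfade_filter`, and the `[0:v][1:v]...[v]` labels, are assumptions for illustration; ComposeTransitionHandler itself is not shown on this page.

```python
TRANSITION_TYPES = (
    "fade", "dissolve", "wipeleft", "wiperight", "wipeup",
    "wipedown", "slideleft", "slideright", "slideup", "slidedown",
)


def normalize_transition(name: str) -> str:
    """Fall back to 'fade' for unsupported types, as TransitionConfig does."""
    return name if name in TRANSITION_TYPES else "fade"


def overlap_ms(duration_ms: int) -> int:
    """One-sided overlap: half of the transition duration, rounded down."""
    return duration_ms // 2


def xfade_filter(name: str, duration_ms: int, offset_ms: int = 0) -> str:
    """Build an xfade filter between two video inputs (times in seconds)."""
    duration_s = duration_ms / 1000
    offset_s = offset_ms / 1000
    return (
        f"[0:v][1:v]xfade=transition={normalize_transition(name)}"
        f":duration={duration_s:.3f}:offset={offset_s:.3f}[v]"
    )
```

With the default 500 ms transition, each neighboring segment would render 250 ms of extra overlap footage.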


@@ -1,283 +0,0 @@
import time
import uuid
class FfmpegTask(object):
def __init__(self, input_file, task_type='copy', output_file=''):
self.annexb = False
if type(input_file) is str:
if input_file.endswith(".ts"):
self.annexb = True
self.input_file = [input_file]
elif type(input_file) is list:
self.input_file = input_file
else:
self.input_file = []
self.task_type = task_type
self.output_file = output_file
self.mute = True
self.speed = 1
self.frame_rate = 25
self.subtitles = []
self.luts = []
self.audios = []
self.overlays = []
def __repr__(self):
_str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}'
if len(self.luts) > 0:
_str += f', luts={self.luts}'
if len(self.audios) > 0:
_str += f', audios={self.audios}'
if len(self.overlays) > 0:
_str += f', overlays={self.overlays}'
if self.annexb:
_str += f', annexb={self.annexb}'
if self.mute:
_str += f', mute={self.mute}'
return _str + ')'
def analyze_input_render_tasks(self):
for i in self.input_file:
if type(i) is str:
continue
elif isinstance(i, FfmpegTask):
if i.need_run():
yield i
def need_run(self):
"""
判断是否需要运行
:rtype: bool
:return:
"""
if self.annexb:
return True
# TODO: copy from url
return not self.check_can_copy()
def add_inputs(self, *inputs):
self.input_file.extend(inputs)
def add_overlay(self, *overlays):
for overlay in overlays:
if str(overlay).endswith('.ass'):
self.subtitles.append(overlay)
else:
self.overlays.append(overlay)
self.correct_task_type()
def add_audios(self, *audios):
self.audios.extend(audios)
self.correct_task_type()
self.check_audio_track()
def add_lut(self, *luts):
self.luts.extend(luts)
self.correct_task_type()
def get_output_file(self):
if self.task_type == 'copy':
return self.input_file[0]
if self.output_file == '':
self.set_output_file()
return self.output_file
def correct_task_type(self):
if self.check_can_copy():
self.task_type = 'copy'
elif self.check_can_concat():
self.task_type = 'concat'
else:
self.task_type = 'encode'
def check_can_concat(self):
if len(self.luts) > 0:
return False
if len(self.overlays) > 0:
return False
if len(self.subtitles) > 0:
return False
if self.speed != 1:
return False
return True
def check_can_copy(self):
if len(self.luts) > 0:
return False
if len(self.overlays) > 0:
return False
if len(self.subtitles) > 0:
return False
if self.speed != 1:
return False
if len(self.audios) > 1:
return False
if len(self.input_file) > 1:
return False
return True
def check_audio_track(self):
if len(self.audios) > 0:
self.mute = False
def get_ffmpeg_args(self):
args = ['-y', '-hide_banner']
if self.task_type == 'encode':
# args += ('-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv')
input_args = []
filter_args = []
output_args = ["-shortest", "-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1"]
if self.annexb:
output_args.append("-bsf:v")
output_args.append("h264_mp4toannexb")
video_output_str = "[0:v]"
audio_output_str = "[0:a]"
video_input_count = 0
audio_input_count = 0
for input_file in self.input_file:
input_args.append("-i")
if type(input_file) is str:
input_args.append(input_file)
elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file())
for lut in self.luts:
filter_args.append("[0:v]lut3d=file=" + lut + "[0:v]")
for overlay in self.overlays:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(overlay)
filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]")
filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
video_output_str = "[v]"
for subtitle in self.subtitles:
filter_args.append(f"{video_output_str}ass={subtitle}[v]")
video_output_str = "[v]"
output_args.append("-map")
output_args.append(video_output_str)
output_args.append("-r")
output_args.append(f"{self.frame_rate}")
if self.mute:
output_args.append("-an")
else:
input_index = 0
for audio in self.audios:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0:
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]"
else:
audio_output_str = f"[{input_index}:a]"
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()]
elif self.task_type == 'concat':
# Inputs that cannot be merged via annexb
input_args = []
output_args = ["-shortest"]
if self.check_annexb() and len(self.audios) <= 1:
# output_args
if len(self.audios) > 0:
input_args.append("-an")
_tmp_file = "tmp_concat_"+str(time.time())+".txt"
with open(_tmp_file, "w", encoding="utf-8") as f:
for input_file in self.input_file:
if type(input_file) is str:
f.write("file '"+input_file+"'\n")
elif isinstance(input_file, FfmpegTask):
f.write("file '" + input_file.get_output_file() + "'\n")
input_args += ("-f", "concat", "-safe", "0", "-i", _tmp_file)
output_args.append("-c:v")
output_args.append("copy")
if len(self.audios) > 0:
input_args.append("-i")
input_args.append(self.audios[0])
output_args.append("-c:a")
output_args.append("copy")
output_args.append("-f")
output_args.append("mp4")
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
return args + input_args + output_args + [self.get_output_file()]
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
filter_args = []
video_output_str = "[0:v]"
audio_output_str = "[0:a]"
video_input_count = 0
audio_input_count = 0
for input_file in self.input_file:
input_index = input_args.count("-i")
input_args.append("-i")
if type(input_file) is str:
input_args.append(input_file.replace("\\", "/"))
elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file().replace("\\", "/"))
if video_input_count > 0:
filter_args.append(f"{video_output_str}[{input_index}:v]concat=n=2:v=1:a=0[v]")
video_output_str = "[v]"
else:
video_output_str = f"[{input_index}:v]"
video_input_count += 1
output_args.append("-map")
output_args.append(video_output_str)
if self.mute:
output_args.append("-an")
else:
input_index = 0
for audio in self.audios:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0:
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]"
else:
audio_output_str = f"[{input_index}:a]"
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()]
elif self.task_type == 'copy':
if len(self.input_file) == 1:
if type(self.input_file[0]) is str:
if self.input_file[0] == self.get_output_file():
return []
return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()]
def set_output_file(self, file=None):
if file is None:
if self.output_file == '':
if self.annexb:
self.output_file = "rand_" + str(uuid.uuid4()) + ".ts"
else:
self.output_file = "rand_" + str(uuid.uuid4()) + ".mp4"
else:
if isinstance(file, FfmpegTask):
if file == self:
return
self.output_file = file.get_output_file()
if type(file) is str:
self.output_file = file
def check_annexb(self):
for input_file in self.input_file:
if type(input_file) is str:
if self.task_type == 'encode':
return self.annexb
elif self.task_type == 'concat':
return False
elif self.task_type == 'copy':
return self.annexb
else:
return False
elif isinstance(input_file, FfmpegTask):
if not input_file.check_annexb():
return False
return True

handlers/__init__.py Normal file

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Task handler layer.
Contains the concrete handler implementations for each task type.
"""
from handlers.base import BaseHandler
from handlers.render_video import RenderSegmentVideoHandler
from handlers.compose_transition import ComposeTransitionHandler
from handlers.prepare_audio import PrepareJobAudioHandler
from handlers.package_ts import PackageSegmentTsHandler
from handlers.finalize_mp4 import FinalizeMp4Handler
__all__ = [
'BaseHandler',
'RenderSegmentVideoHandler',
'ComposeTransitionHandler',
'PrepareJobAudioHandler',
'PackageSegmentTsHandler',
'FinalizeMp4Handler',
]

handlers/base.py Normal file

@@ -0,0 +1,396 @@
# -*- coding: utf-8 -*-
"""
Task handler base class.
Provides the basic functionality shared by all handlers.
"""
import os
import json
import logging
import shutil
import tempfile
import subprocess
from abc import ABC
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
from core.handler import TaskHandler
from domain.task import Task
from domain.result import TaskResult, ErrorCode
from domain.config import WorkerConfig
from services import storage
if TYPE_CHECKING:
from services.api_client import APIClientV2
logger = logging.getLogger(__name__)
# v2 unified video encoding parameters (from the integration document)
VIDEO_ENCODE_ARGS = [
'-c:v', 'libx264',
'-preset', 'medium',
'-profile:v', 'main',
'-level', '4.0',
'-crf', '23',
'-pix_fmt', 'yuv420p',
]
# v2 unified audio encoding parameters
AUDIO_ENCODE_ARGS = [
'-c:a', 'aac',
'-b:a', '128k',
'-ar', '48000',
'-ac', '2',
]
def subprocess_args(include_stdout: bool = True) -> Dict[str, Any]:
"""
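Build cross-platform subprocess arguments.
When the app is packaged on Windows with PyInstaller --noconsole, special handling is needed to keep a console window from popping up.
Args:
include_stdout: whether to capture stdout
Returns:
A dict of keyword arguments for subprocess.run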
"""
ret: Dict[str, Any] = {}
# Windows-specific handling
if hasattr(subprocess, 'STARTUPINFO'):
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
ret['startupinfo'] = si
ret['env'] = os.environ
# Redirect stdin to avoid the "handle is invalid" error
ret['stdin'] = subprocess.PIPE
if include_stdout:
ret['stdout'] = subprocess.PIPE
return ret
def probe_video_info(video_file: str) -> Tuple[int, int, float]:
"""
Probe video information (width, height, duration).
Args:
video_file: path to the video file
Returns:
A (width, height, duration) tuple; (0, 0, 0) on failure
"""
try:
result = subprocess.run(
[
'ffprobe', '-v', 'error',
'-select_streams', 'v:0',
'-show_entries', 'stream=width,height:format=duration',
'-of', 'csv=s=x:p=0',
video_file
],
capture_output=True,
timeout=30,
**subprocess_args(False)
)
if result.returncode != 0:
logger.warning(f"ffprobe failed for {video_file}")
return 0, 0, 0
output = result.stdout.decode('utf-8').strip()
if not output:
return 0, 0, 0
lines = output.split('\n')
if len(lines) >= 2:
wh = lines[0].strip()
duration_str = lines[1].strip()
width, height = wh.split('x')
return int(width), int(height), float(duration_str)
return 0, 0, 0
except Exception as e:
logger.warning(f"probe_video_info error: {e}")
return 0, 0, 0
def probe_duration_json(file_path: str) -> Optional[float]:
"""
Probe the media duration using ffprobe's JSON output.
Args:
file_path: path to the media file
Returns:
Duration in seconds; None on failure
"""
try:
result = subprocess.run(
[
'ffprobe', '-v', 'error',
'-show_entries', 'format=duration',
'-of', 'json',
file_path
],
capture_output=True,
timeout=30,
**subprocess_args(False)
)
if result.returncode != 0:
return None
data = json.loads(result.stdout.decode('utf-8'))
duration = data.get('format', {}).get('duration')
return float(duration) if duration else None
except Exception as e:
logger.warning(f"probe_duration_json error: {e}")
return None
class BaseHandler(TaskHandler, ABC):
"""
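Task handler base class.
Provides the basic functionality shared by all handlers, including:
- Temporary directory management
- File download/upload
- FFmpeg command execution
- Logging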
"""
def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
"""
Initialize the handler.
Args:
config: Worker configuration
api_client: API client
"""
self.config = config
self.api_client = api_client
def before_handle(self, task: Task) -> None:
"""Pre-handle hook."""
logger.debug(f"[task:{task.task_id}] Before handle: {task.task_type.value}")
def after_handle(self, task: Task, result: TaskResult) -> None:
"""Post-handle hook."""
status = "success" if result.success else "failed"
logger.debug(f"[task:{task.task_id}] After handle: {status}")
def create_work_dir(self, task_id: str = None) -> str:
"""
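Create a temporary working directory.
Args:
task_id: task ID (used in the directory name)
Returns:
Path to the working directory
"""
# Make sure the temp root directory exists
os.makedirs(self.config.temp_dir, exist_ok=True)
# Create a unique working directory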
prefix = f"task_{task_id}_" if task_id else "task_"
work_dir = tempfile.mkdtemp(dir=self.config.temp_dir, prefix=prefix)
logger.debug(f"Created work directory: {work_dir}")
return work_dir
def cleanup_work_dir(self, work_dir: str) -> None:
"""
Clean up the temporary working directory.
Args:
work_dir: path to the working directory
"""
if not work_dir or not os.path.exists(work_dir):
return
try:
shutil.rmtree(work_dir)
logger.debug(f"Cleaned up work directory: {work_dir}")
except Exception as e:
logger.warning(f"Failed to cleanup work directory {work_dir}: {e}")
def download_file(self, url: str, dest: str, timeout: int = None) -> bool:
"""
Download a file.
Args:
url: file URL
dest: destination path
timeout: timeout in seconds
Returns:
True on success
"""
if timeout is None:
timeout = self.config.download_timeout
try:
result = storage.download_file(url, dest, timeout=timeout)
if result:
file_size = os.path.getsize(dest) if os.path.exists(dest) else 0
logger.debug(f"Downloaded: {url} -> {dest} ({file_size} bytes)")
return result
except Exception as e:
logger.error(f"Download failed: {url} -> {e}")
return False
def upload_file(
self,
task_id: str,
file_type: str,
file_path: str,
file_name: str = None
) -> Optional[str]:
"""
Upload a file and return its access URL.
Args:
task_id: task ID
file_type: file type (video/audio/ts/mp4)
file_path: local file path
file_name: file name (optional)
Returns:
The access URL; None on failure
"""
# Request an upload URL
upload_info = self.api_client.get_upload_url(task_id, file_type, file_name)
if not upload_info:
logger.error(f"[task:{task_id}] Failed to get upload URL")
return None
upload_url = upload_info.get('uploadUrl')
access_url = upload_info.get('accessUrl')
if not upload_url:
logger.error(f"[task:{task_id}] Invalid upload URL response")
return None
# Upload the file
try:
result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout)
if result:
file_size = os.path.getsize(file_path)
logger.info(f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)")
return access_url
else:
logger.error(f"[task:{task_id}] Upload failed: {file_path}")
return None
except Exception as e:
logger.error(f"[task:{task_id}] Upload error: {e}")
return None
def run_ffmpeg(
self,
cmd: List[str],
task_id: str,
timeout: int = None
) -> bool:
"""
Run an FFmpeg command.
Args:
cmd: FFmpeg command argument list
task_id: task ID (used for logging)
timeout: timeout in seconds
Returns:
True on success
"""
if timeout is None:
timeout = self.config.ffmpeg_timeout
# Log the command (truncated to a maximum length)
cmd_str = ' '.join(cmd)
if len(cmd_str) > 500:
cmd_str = cmd_str[:500] + '...'
logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}")
try:
result = subprocess.run(
cmd,
capture_output=True,
timeout=timeout,
**subprocess_args(False)
)
if result.returncode != 0:
stderr = result.stderr.decode('utf-8', errors='replace')[:1000]
logger.error(f"[task:{task_id}] FFmpeg failed (code={result.returncode}): {stderr}")
return False
return True
except subprocess.TimeoutExpired:
logger.error(f"[task:{task_id}] FFmpeg timeout after {timeout}s")
return False
except Exception as e:
logger.error(f"[task:{task_id}] FFmpeg error: {e}")
return False
def probe_duration(self, file_path: str) -> Optional[float]:
"""
Probe the duration of a media file.
Args:
file_path: file path
Returns:
Duration in seconds; None on failure
"""
# Try the JSON output method first
duration = probe_duration_json(file_path)
if duration is not None:
return duration
# Fall back to the legacy method
try:
_, _, duration = probe_video_info(file_path)
return float(duration) if duration else None
except Exception as e:
logger.warning(f"Failed to probe duration: {file_path} -> {e}")
return None
def get_file_size(self, file_path: str) -> int:
"""
Get the file size.
Args:
file_path: file path
Returns:
File size in bytes
"""
try:
return os.path.getsize(file_path)
except Exception:
return 0
def ensure_file_exists(self, file_path: str, min_size: int = 0) -> bool:
"""
Check that the file exists and meets the minimum size.
Args:
file_path: file path
min_size: minimum size in bytes
Returns:
True if the requirements are met
"""
if not os.path.exists(file_path):
return False
return os.path.getsize(file_path) >= min_size
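
The JSON-based duration probe in `probe_duration_json` mixes process execution with parsing. As an illustration only (this is not code from the commit), the parsing step could be isolated into a pure function that can be tested without running ffprobe. The name `parse_ffprobe_duration` is hypothetical, and the sketch assumes the `-show_entries format=duration -of json` output shape requested above.

```python
import json
from typing import Optional


def parse_ffprobe_duration(raw: str) -> Optional[float]:
    """Extract format.duration (in seconds) from ffprobe JSON text.

    Hypothetical helper for illustration; returns None on any parse problem.
    """
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return None
    if not isinstance(data, dict):
        return None
    fmt = data.get("format")
    if not isinstance(fmt, dict):
        return None
    try:
        # ffprobe reports the duration as a string such as "12.500000"
        return float(fmt["duration"])
    except (KeyError, TypeError, ValueError):
        # Missing key, or a non-numeric value such as "N/A"
        return None
```

A caller would pass `result.stdout.decode('utf-8')` to this function, so malformed or empty output yields `None` instead of an exception.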


@@ -0,0 +1,273 @@
# -*- coding: utf-8 -*-
"""
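Transition composition handler.
Handles COMPOSE_TRANSITION tasks by blending the overlap regions of two adjacent segments to produce a transition effect.
Uses the FFmpeg xfade filter to implement multiple transition effects.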
"""
import os
import logging
from typing import List, Optional
from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS
from domain.task import Task, TaskType, TransitionConfig, TRANSITION_TYPES
from domain.result import TaskResult, ErrorCode
logger = logging.getLogger(__name__)
class ComposeTransitionHandler(BaseHandler):
"""
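Transition composition handler.
Responsibilities:
- Download the previous segment's video (including its tail overlap)
- Download the next segment's video (including its head overlap)
- Compose the transition effect with the xfade filter
- Upload the transition video artifact
Key constraints:
- A transition task must wait until RENDER_SEGMENT_VIDEO has finished for both the previous and next segments
- Output encoding parameters must match the segment videos so that later TS packaging stays compatible
- The transition video has no audio track (audio is handled centrally by PREPARE_JOB_AUDIO)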
"""
def get_supported_type(self) -> TaskType:
return TaskType.COMPOSE_TRANSITION
def handle(self, task: Task) -> TaskResult:
"""Handle a transition composition task."""
work_dir = self.create_work_dir(task.task_id)
try:
# Parse parameters
transition_id = task.get_transition_id()
prev_segment = task.get_prev_segment()
next_segment = task.get_next_segment()
transition_config = task.get_transition_config()
output_spec = task.get_output_spec()
# Validate parameters
if not transition_id:
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
"Missing transitionId"
)
if not prev_segment or not prev_segment.get('videoUrl'):
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
"Missing prevSegment.videoUrl"
)
if not next_segment or not next_segment.get('videoUrl'):
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
"Missing nextSegment.videoUrl"
)
if not transition_config:
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
"Missing transition config"
)
# Get the overlap durations
overlap_tail_ms = prev_segment.get('overlapTailMs', 0)
overlap_head_ms = next_segment.get('overlapHeadMs', 0)
transition_duration_ms = transition_config.duration_ms
# Validate the overlap durations
if overlap_tail_ms <= 0 or overlap_head_ms <= 0:
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
f"Invalid overlap duration: tail={overlap_tail_ms}ms, head={overlap_head_ms}ms"
)
logger.info(
f"[task:{task.task_id}] Composing transition: {transition_config.type}, "
f"duration={transition_duration_ms}ms, "
f"overlap_tail={overlap_tail_ms}ms, overlap_head={overlap_head_ms}ms"
)
# 1. Download the previous segment's video
prev_video_file = os.path.join(work_dir, 'prev_segment.mp4')
if not self.download_file(prev_segment['videoUrl'], prev_video_file):
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download prev segment video: {prev_segment['videoUrl']}"
)
# 2. Download the next segment's video
next_video_file = os.path.join(work_dir, 'next_segment.mp4')
if not self.download_file(next_segment['videoUrl'], next_video_file):
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download next segment video: {next_segment['videoUrl']}"
)
# 3. Probe the actual duration of the previous segment
prev_duration = self.probe_duration(prev_video_file)
if not prev_duration:
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Failed to probe prev segment duration"
)
# 4. Build the transition composition command
output_file = os.path.join(work_dir, 'transition.mp4')
cmd = self._build_command(
prev_video_file=prev_video_file,
next_video_file=next_video_file,
output_file=output_file,
prev_duration_sec=prev_duration,
overlap_tail_ms=overlap_tail_ms,
overlap_head_ms=overlap_head_ms,
transition_config=transition_config,
output_spec=output_spec
)
# 5. Run FFmpeg
if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"FFmpeg transition composition failed"
)
# 6. Validate the output file
if not self.ensure_file_exists(output_file, min_size=1024):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Transition output file is missing or too small"
)
# 7. Probe the actual duration
actual_duration = self.probe_duration(output_file)
actual_duration_ms = int(actual_duration * 1000) if actual_duration else transition_duration_ms
# 8. Upload the artifact
transition_video_url = self.upload_file(task.task_id, 'video', output_file)
if not transition_video_url:
return TaskResult.fail(
ErrorCode.E_UPLOAD_FAILED,
"Failed to upload transition video"
)
return TaskResult.ok({
'transitionVideoUrl': transition_video_url,
'actualDurationMs': actual_duration_ms
})
except Exception as e:
logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e))
finally:
self.cleanup_work_dir(work_dir)
def _build_command(
self,
prev_video_file: str,
next_video_file: str,
output_file: str,
prev_duration_sec: float,
overlap_tail_ms: int,
overlap_head_ms: int,
transition_config: TransitionConfig,
output_spec
) -> List[str]:
"""
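Build the transition composition command.
Compose the transition effect with the xfade filter:
1. Cut the tail overlap region from the previous segment
2. Cut the head overlap region from the next segment
3. Blend the two with xfade
Notes:
- The transition video is very short, so the GOP size needs special handling
- Make sure the first frame is a keyframe so that later TS packaging works
Args:
prev_video_file: path to the previous segment's video
next_video_file: path to the next segment's video
output_file: output file path
prev_duration_sec: total duration of the previous segment (seconds)
overlap_tail_ms: tail overlap duration (milliseconds)
overlap_head_ms: head overlap duration (milliseconds)
transition_config: transition configuration
output_spec: output specification
Returns:
FFmpeg command argument list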
"""
# Compute the time parameters
overlap_tail_sec = overlap_tail_ms / 1000.0
overlap_head_sec = overlap_head_ms / 1000.0
# Start position of the tail overlap in the previous segment
tail_start_sec = prev_duration_sec - overlap_tail_sec
# Transition duration: the shorter of the two overlap regions (xfade blends both inputs over this span)
# Note: xfade output duration = overlap_tail + overlap_head - duration
# When overlap_tail = overlap_head = duration, the output duration is approximately duration
transition_duration_sec = min(overlap_tail_sec, overlap_head_sec)
# Get the xfade transition type
xfade_transition = transition_config.get_ffmpeg_transition()
# Build the filter graph
# [0:v] trim cuts the tail overlap from the previous segment
# [1:v] trim cuts the head overlap from the next segment
# xfade blends the two clips
filter_complex = (
f"[0:v]trim=start={tail_start_sec},setpts=PTS-STARTPTS[v0];"
f"[1:v]trim=end={overlap_head_sec},setpts=PTS-STARTPTS[v1];"
f"[v0][v1]xfade=transition={xfade_transition}:duration={transition_duration_sec}:offset=0[outv]"
)
cmd = [
'ffmpeg', '-y', '-hide_banner',
'-i', prev_video_file,
'-i', next_video_file,
'-filter_complex', filter_complex,
'-map', '[outv]',
]
# Encoding parameters (same as the segment videos)
cmd.extend(VIDEO_ENCODE_ARGS)
# Frame rate
fps = output_spec.fps
# Estimate the number of frames in the output video
# xfade output duration ≈ overlap_tail + overlap_head - transition_duration
output_duration_sec = overlap_tail_sec + overlap_head_sec - transition_duration_sec
total_frames = int(output_duration_sec * fps)
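# Adjust the GOP size dynamically: for short videos the GOP must not exceed the total frame count
# Guarantee at least one keyframe (the first frame); minimum GOP = 1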
if total_frames <= 1:
gop_size = 1
elif total_frames < fps:
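# For videos shorter than 1 second, use the full frame count as the GOP (a single keyframe at the start)
gop_size = total_frames
else:
# Normal case: one keyframe per second (denser than the standard 2 seconds, suited to short videos)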
gop_size = fps
cmd.extend(['-r', str(fps)])
cmd.extend(['-g', str(gop_size)])
cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))])
# Force the first frame to be a keyframe
cmd.extend(['-force_key_frames', 'expr:eq(n,0)'])
# No audio
cmd.append('-an')
# Output file
cmd.append(output_file)
return cmd
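
The GOP selection and filter-graph construction in `_build_command` can be read as two small pure functions. The sketch below restates that logic for illustration; `pick_gop_size` and `build_xfade_filter` are hypothetical names that do not appear in the commit, `fps` is assumed to be an integer, and `fade` is used only as an example xfade transition name.

```python
def pick_gop_size(output_duration_sec: float, fps: int) -> int:
    """Choose a GOP size that never exceeds the clip's frame count.

    Mirrors the rule in the handler: 1 for near-empty clips, the whole
    clip for sub-second clips, otherwise one keyframe per second.
    """
    total_frames = int(output_duration_sec * fps)
    if total_frames <= 1:
        return 1
    if total_frames < fps:
        return total_frames
    return fps


def build_xfade_filter(
    tail_start_sec: float,
    overlap_head_sec: float,
    duration_sec: float,
    transition: str = "fade",
) -> str:
    """Build the trim + xfade filter graph string used for the transition."""
    return (
        # Tail overlap of the previous segment, timestamps reset to zero
        f"[0:v]trim=start={tail_start_sec},setpts=PTS-STARTPTS[v0];"
        # Head overlap of the next segment, timestamps reset to zero
        f"[1:v]trim=end={overlap_head_sec},setpts=PTS-STARTPTS[v1];"
        # Blend both clips starting at offset 0
        f"[v0][v1]xfade=transition={transition}:duration={duration_sec}:offset=0[outv]"
    )
```

Keeping these pieces free of I/O makes it possible to check edge cases, such as a 0.5 s overlap at 30 fps, without invoking FFmpeg.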

handlers/finalize_mp4.py Normal file

@@ -0,0 +1,190 @@
# -*- coding: utf-8 -*-
"""
Final MP4 merge handler.
Handles FINALIZE_MP4 tasks by merging all TS segments into the final downloadable MP4 file.
"""
import os
import logging
from typing import List
from handlers.base import BaseHandler
from domain.task import Task, TaskType
from domain.result import TaskResult, ErrorCode
logger = logging.getLogger(__name__)
class FinalizeMp4Handler(BaseHandler):
"""
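Final MP4 merge handler.
Responsibilities:
- Download all TS segments
- Merge them with the concat demuxer
- Produce the final MP4 (remux, no re-encoding)
- Upload the MP4 artifact
Key constraints:
- Prefer remux (stream copy, no re-encoding)
- Use the aac_adtstoasc bitstream filter for audio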
"""
def get_supported_type(self) -> TaskType:
return TaskType.FINALIZE_MP4
def handle(self, task: Task) -> TaskResult:
"""Handle an MP4 merge task."""
work_dir = self.create_work_dir(task.task_id)
try:
# Get the TS list
ts_list = task.get_ts_list()
m3u8_url = task.get_m3u8_url()
if not ts_list and not m3u8_url:
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
"Missing tsList or m3u8Url"
)
output_file = os.path.join(work_dir, 'final.mp4')
if ts_list:
# Option 1: use the TS list
result = self._process_ts_list(task, work_dir, ts_list, output_file)
else:
# Option 2: use the m3u8 URL
result = self._process_m3u8(task, work_dir, m3u8_url, output_file)
if not result.success:
return result
# Validate the output file
if not self.ensure_file_exists(output_file, min_size=4096):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"MP4 output file is missing or too small"
)
# Get the file size
file_size = self.get_file_size(output_file)
# Upload the artifact
mp4_url = self.upload_file(task.task_id, 'mp4', output_file)
if not mp4_url:
return TaskResult.fail(
ErrorCode.E_UPLOAD_FAILED,
"Failed to upload MP4"
)
return TaskResult.ok({
'mp4Url': mp4_url,
'fileSizeBytes': file_size
})
except Exception as e:
logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e))
finally:
self.cleanup_work_dir(work_dir)
def _process_ts_list(
self,
task: Task,
work_dir: str,
ts_list: List[str],
output_file: str
) -> TaskResult:
"""
Process using the TS list.
Args:
task: task entity
work_dir: working directory
ts_list: list of TS URLs
output_file: output file path
Returns:
TaskResult
"""
# 1. Download all TS segments
ts_files = []
for i, ts_url in enumerate(ts_list):
ts_file = os.path.join(work_dir, f'seg_{i}.ts')
if not self.download_file(ts_url, ts_file):
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download TS segment {i}: {ts_url}"
)
ts_files.append(ts_file)
logger.info(f"[task:{task.task_id}] Downloaded {len(ts_files)} TS segments")
# 2. Create the concat file list
concat_file = os.path.join(work_dir, 'concat.txt')
with open(concat_file, 'w', encoding='utf-8') as f:
for ts_file in ts_files:
# Backslashes in the path must be escaped or replaced with forward slashes
ts_path = ts_file.replace('\\', '/')
f.write(f"file '{ts_path}'\n")
# 3. Build the merge command (remux, no re-encoding)
cmd = [
'ffmpeg', '-y', '-hide_banner',
'-f', 'concat',
'-safe', '0',
'-i', concat_file,
'-c', 'copy', # Stream copy, no re-encoding
'-bsf:a', 'aac_adtstoasc', # Audio bitstream filter
output_file
]
# 4. Run FFmpeg
if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"MP4 concatenation failed"
)
return TaskResult.ok({})
def _process_m3u8(
self,
task: Task,
work_dir: str,
m3u8_url: str,
output_file: str
) -> TaskResult:
"""
Process using the m3u8 URL.
Args:
task: task entity
work_dir: working directory
m3u8_url: m3u8 URL
output_file: output file path
Returns:
TaskResult
"""
# Build the command
cmd = [
'ffmpeg', '-y', '-hide_banner',
'-protocol_whitelist', 'file,http,https,tcp,tls',
'-i', m3u8_url,
'-c', 'copy',
'-bsf:a', 'aac_adtstoasc',
output_file
]
# Run FFmpeg
if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"MP4 conversion from m3u8 failed"
)
return TaskResult.ok({})
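
The concat list written in `_process_ts_list` normalizes backslashes but does not escape single quotes inside paths. Because the work directory name includes the task ID, a quote in a path is unlikely here, but a more defensive writer could look like the sketch below. `concat_list_text` is a hypothetical helper, not part of the commit; it relies on FFmpeg's quoting rule that a literal `'` inside a single-quoted string is written as `'\''`.

```python
from typing import Iterable


def concat_list_text(paths: Iterable[str]) -> str:
    """Render the body of an FFmpeg concat demuxer list file.

    Hypothetical helper for illustration only.
    """
    lines = []
    for path in paths:
        # Normalize Windows separators first, as the handler does.
        # (On POSIX a backslash can be a legitimate filename character.)
        normalized = path.replace("\\", "/")
        # Close the quote, emit an escaped quote, reopen the quote
        escaped = normalized.replace("'", "'\\''")
        lines.append(f"file '{escaped}'\n")
    return "".join(lines)
```

The result can be written to `concat.txt` with `encoding='utf-8'` and passed to `ffmpeg -f concat -safe 0 -i concat.txt` exactly as in the handler.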

handlers/package_ts.py Normal file

@@ -0,0 +1,304 @@
# -*- coding: utf-8 -*-
"""
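TS segment packaging handler.
Handles PACKAGE_SEGMENT_TS tasks by packaging a video segment and the audio for the matching time range into a TS segment.
Supports transition-related overlap trimming and packaging of transition segments.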
"""
import os
import logging
from typing import List, Optional
from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS
from domain.task import Task, TaskType
from domain.result import TaskResult, ErrorCode
logger = logging.getLogger(__name__)
class PackageSegmentTsHandler(BaseHandler):
"""
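TS segment packaging handler.
Responsibilities:
- Download the video segment
- Download the global audio
- Cut the audio for the matching time range
- Package everything as a TS segment
- Upload the TS artifact
Key constraints:
- The TS must contain both the audio and video tracks
- Use output_ts_offset to keep timestamps continuous
- Output extinfDurationSec for use in the m3u8
Transition handling:
- Regular segment TS: the overlap regions must be trimmed off (they are already used by the transition segment)
- Transition segment TS: package the transition video artifact directly, without trimming
- Without transitions: follow the original logic, with no trimming
Precise trimming:
- When overlap regions must be trimmed, re-encoding (-vf trim) is required for an exact cut
- With -c copy, cuts can only happen at keyframes, which is imprecise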
"""
def get_supported_type(self) -> TaskType:
return TaskType.PACKAGE_SEGMENT_TS
def handle(self, task: Task) -> TaskResult:
"""Handle a TS packaging task."""
work_dir = self.create_work_dir(task.task_id)
try:
# Parse parameters
video_url = task.get_video_url()
audio_url = task.get_audio_url()
start_time_ms = task.get_start_time_ms()
duration_ms = task.get_duration_ms()
output_spec = task.get_output_spec()
# Transition-related parameters
is_transition_segment = task.is_transition_segment()
trim_head = task.should_trim_head()
trim_tail = task.should_trim_tail()
trim_head_ms = task.get_trim_head_ms()
trim_tail_ms = task.get_trim_tail_ms()
if not video_url:
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
"Missing videoUrl"
)
if not audio_url:
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
"Missing audioUrl"
)
# Compute the time parameters
start_sec = start_time_ms / 1000.0
duration_sec = duration_ms / 1000.0
# 1. Download the video segment
video_file = os.path.join(work_dir, 'video.mp4')
if not self.download_file(video_url, video_file):
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download video: {video_url}"
)
# 2. Download the global audio
audio_file = os.path.join(work_dir, 'audio.aac')
if not self.download_file(audio_url, audio_file):
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download audio: {audio_url}"
)
# 3. Decide whether the video needs precise trimming
needs_video_trim = not is_transition_segment and (
(trim_head and trim_head_ms > 0) or
(trim_tail and trim_tail_ms > 0)
)
# 4. If trimming is needed, re-encode and trim the video first
processed_video_file = video_file
if needs_video_trim:
processed_video_file = os.path.join(work_dir, 'trimmed_video.mp4')
trim_cmd = self._build_trim_command(
video_file=video_file,
output_file=processed_video_file,
trim_head_ms=trim_head_ms if trim_head else 0,
trim_tail_ms=trim_tail_ms if trim_tail else 0,
output_spec=output_spec
)
logger.info(f"[task:{task.task_id}] Trimming video: head={trim_head_ms}ms, tail={trim_tail_ms}ms")
if not self.run_ffmpeg(trim_cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Video trim failed"
)
if not self.ensure_file_exists(processed_video_file, min_size=1024):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Trimmed video file is missing or too small"
)
# 5. Build the TS packaging command
output_file = os.path.join(work_dir, 'segment.ts')
cmd = self._build_package_command(
video_file=processed_video_file,
audio_file=audio_file,
output_file=output_file,
start_sec=start_sec,
duration_sec=duration_sec
)
# 6. Run FFmpeg
if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"TS packaging failed"
)
# 7. Validate the output file
if not self.ensure_file_exists(output_file, min_size=1024):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"TS output file is missing or too small"
)
# 8. Probe the actual duration (used for EXTINF)
actual_duration = self.probe_duration(output_file)
extinf_duration = actual_duration if actual_duration else duration_sec
# 9. Upload the artifact
ts_url = self.upload_file(task.task_id, 'ts', output_file)
if not ts_url:
return TaskResult.fail(
ErrorCode.E_UPLOAD_FAILED,
"Failed to upload TS"
)
return TaskResult.ok({
'tsUrl': ts_url,
'extinfDurationSec': extinf_duration
})
except Exception as e:
logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e))
finally:
self.cleanup_work_dir(work_dir)
def _build_trim_command(
self,
video_file: str,
output_file: str,
trim_head_ms: int,
trim_tail_ms: int,
output_spec
) -> List[str]:
"""
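Build the precise video trim command (re-encoding).
Uses the trim filter for frame-accurate cuts instead of keyframe-based cuts with -ss/-t.
Args:
video_file: input video path
output_file: output video path
trim_head_ms: head trim duration (milliseconds)
trim_tail_ms: tail trim duration (milliseconds)
output_spec: output specification
Returns:
FFmpeg command argument list
"""
# Probe the original video duration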
original_duration = self.probe_duration(video_file)
if not original_duration:
original_duration = 10.0 # Fallback default used when probing fails
trim_head_sec = trim_head_ms / 1000.0
trim_tail_sec = trim_tail_ms / 1000.0
# Compute the start and end times after trimming
start_time = trim_head_sec
end_time = original_duration - trim_tail_sec
# Build the trim filter
vf_filter = f"trim=start={start_time}:end={end_time},setpts=PTS-STARTPTS"
cmd = [
'ffmpeg', '-y', '-hide_banner',
'-i', video_file,
'-vf', vf_filter,
]
# Encoding parameters
cmd.extend(VIDEO_ENCODE_ARGS)
# Frame rate
fps = output_spec.fps
cmd.extend(['-r', str(fps)])
# Compute the output frame count and adjust the GOP dynamically
output_duration_sec = end_time - start_time
total_frames = int(output_duration_sec * fps)
# Dynamic GOP: short videos use a smaller GOP
if total_frames <= 1:
gop_size = 1
elif total_frames < fps:
gop_size = total_frames
else:
gop_size = fps # One keyframe per second
cmd.extend(['-g', str(gop_size)])
cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))])
# Force the first frame to be a keyframe
cmd.extend(['-force_key_frames', 'expr:eq(n,0)'])
# No audio (audio is handled separately)
cmd.append('-an')
cmd.append(output_file)
return cmd
def _build_package_command(
self,
video_file: str,
audio_file: str,
output_file: str,
start_sec: float,
duration_sec: float
) -> List[str]:
"""
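Build the TS packaging command.
Packages the video and the audio for the matching time range into a TS segment.
The video uses copy mode (it has already been trimmed precisely or needs no trimming).
Args:
video_file: video file path (already processed)
audio_file: audio file path
output_file: output file path
start_sec: audio start time (seconds)
duration_sec: audio duration (seconds)
Returns:
FFmpeg command argument list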
"""
cmd = [
'ffmpeg', '-y', '-hide_banner',
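# Video input
'-i', video_file,
# Audio input (take duration_sec starting at start_sec)
'-ss', str(start_sec),
'-t', str(duration_sec),
'-i', audio_file,
# Stream mapping
'-map', '0:v:0', # Video stream from the first input
'-map', '1:a:0', # Audio stream from the second input
# Stream copy (the video is already processed, no re-encoding needed)
'-c:v', 'copy',
'-c:a', 'copy',
# Important: timestamp offset that keeps the overall timeline continuous
'-output_ts_offset', str(start_sec),
# Muxing parameters
'-muxdelay', '0',
'-muxpreload', '0',
# Output format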
'-f', 'mpegts',
output_file
]
return cmd
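
`_build_trim_command` derives the trim window from the probed duration and the head/tail trim values, and falls back to 10 seconds when probing fails. A stricter variant, sketched below under the assumption that failing fast is preferable to trimming against a guessed duration, would validate the window before building the filter. `compute_trim_window` is a hypothetical helper that is not part of the commit.

```python
from typing import Tuple


def compute_trim_window(
    original_duration_sec: float,
    trim_head_ms: int,
    trim_tail_ms: int,
) -> Tuple[float, float]:
    """Return (start, end) in seconds for a trim filter, or raise ValueError.

    Hypothetical helper for illustration only.
    """
    if original_duration_sec <= 0:
        raise ValueError("original duration must be positive")
    if trim_head_ms < 0 or trim_tail_ms < 0:
        raise ValueError("trim values must be non-negative")
    start = trim_head_ms / 1000.0
    end = original_duration_sec - trim_tail_ms / 1000.0
    if end <= start:
        # Trimming would remove the whole clip
        raise ValueError("trim window is empty")
    return start, end
```

The returned pair maps directly onto the filter string `trim=start={start}:end={end},setpts=PTS-STARTPTS` used by the handler.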

handlers/prepare_audio.py Normal file

@@ -0,0 +1,251 @@
# -*- coding: utf-8 -*-
"""
Global audio preparation handler.
Handles PREPARE_JOB_AUDIO tasks by generating one continuous audio track for the whole video.
"""
import os
import logging
from typing import List, Dict, Optional
from handlers.base import BaseHandler, AUDIO_ENCODE_ARGS
from domain.task import Task, TaskType, AudioSpec, AudioProfile
from domain.result import TaskResult, ErrorCode
logger = logging.getLogger(__name__)
class PrepareJobAudioHandler(BaseHandler):
"""
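Global audio preparation handler.
Responsibilities:
- Download the global BGM
- Download each segment's overlay sound effects
- Build the complex mixing command
- Run the mix
- Upload the audio artifact
Key constraints:
- The global BGM is generated once, continuously, across the whole duration
- Using amix normalize=1 is forbidden
- Only overlay tracks get very short fade-in/fade-out (5-20 ms)
- No boundary fades are applied to the BGM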
"""
def get_supported_type(self) -> TaskType:
return TaskType.PREPARE_JOB_AUDIO
def handle(self, task: Task) -> TaskResult:
"""Handle an audio preparation task."""
work_dir = self.create_work_dir(task.task_id)
try:
# Parse parameters
total_duration_ms = task.get_total_duration_ms()
if total_duration_ms <= 0:
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
"Invalid totalDurationMs"
)
total_duration_sec = total_duration_ms / 1000.0
audio_profile = task.get_audio_profile()
bgm_url = task.get_bgm_url()
segments = task.get_segments()
# 1. Download the BGM (if any)
bgm_file = None
if bgm_url:
bgm_file = os.path.join(work_dir, 'bgm.mp3')
if not self.download_file(bgm_url, bgm_file):
logger.warning(f"[task:{task.task_id}] Failed to download BGM")
bgm_file = None
# 2. Download the overlay sound effects
sfx_files = []
for i, seg in enumerate(segments):
audio_spec_data = seg.get('audioSpecJson')
if audio_spec_data:
audio_spec = AudioSpec.from_dict(audio_spec_data)
if audio_spec and audio_spec.audio_url:
sfx_file = os.path.join(work_dir, f'sfx_{i}.mp3')
if self.download_file(audio_spec.audio_url, sfx_file):
sfx_files.append({
'file': sfx_file,
'spec': audio_spec,
'segment': seg
})
else:
logger.warning(f"[task:{task.task_id}] Failed to download SFX {i}")
# 3. Build the audio mixing command
output_file = os.path.join(work_dir, 'audio_full.aac')
cmd = self._build_audio_command(
bgm_file=bgm_file,
sfx_files=sfx_files,
output_file=output_file,
total_duration_sec=total_duration_sec,
audio_profile=audio_profile
)
# 4. Run FFmpeg
if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Audio mixing failed"
)
# 5. Verify the output file
if not self.ensure_file_exists(output_file, min_size=1024):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Audio output file is missing or too small"
)
# 6. Upload the artifact
audio_url = self.upload_file(task.task_id, 'audio', output_file)
if not audio_url:
return TaskResult.fail(
ErrorCode.E_UPLOAD_FAILED,
"Failed to upload audio"
)
return TaskResult.ok({
'audioUrl': audio_url
})
except Exception as e:
logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e))
finally:
self.cleanup_work_dir(work_dir)
def _build_audio_command(
self,
bgm_file: Optional[str],
sfx_files: List[Dict],
output_file: str,
total_duration_sec: float,
audio_profile: AudioProfile
) -> List[str]:
"""
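Build the audio mixing command.
Args:
bgm_file: path to the BGM file (optional)
sfx_files: list of overlay sound effects
output_file: output file path
total_duration_sec: total duration in seconds
audio_profile: audio profile
Returns:
FFmpeg command argument list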
"""
sample_rate = audio_profile.sample_rate
channels = audio_profile.channels
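# Case 1: no BGM and no overlay sound effects -> generate silence
if not bgm_file and not sfx_files:
return [
'ffmpeg', '-y', '-hide_banner',
'-f', 'lavfi',
'-i', f'anullsrc=r={sample_rate}:cl=stereo',
'-t', str(total_duration_sec),
'-c:a', 'aac', '-b:a', '128k',
# Use the same sample rate and channel count as the other cases
'-ar', str(sample_rate), '-ac', str(channels),
output_file
]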
# Case 2: BGM only, no overlay sound effects
if not sfx_files:
return [
'ffmpeg', '-y', '-hide_banner',
'-i', bgm_file,
'-t', str(total_duration_sec),
'-c:a', 'aac', '-b:a', '128k',
'-ar', str(sample_rate), '-ac', str(channels),
output_file
]
# Case 3: overlay sound effects (with or without BGM) -> complex filter graph
inputs = []
if bgm_file:
inputs.extend(['-i', bgm_file])
for sfx in sfx_files:
inputs.extend(['-i', sfx['file']])
filter_parts = []
input_idx = 0
# BGM processing (or generate a silent base track)
if bgm_file:
filter_parts.append(
f"[0:a]atrim=0:{total_duration_sec},asetpts=PTS-STARTPTS,"
f"apad=whole_dur={total_duration_sec}[bgm]"
)
input_idx = 1
else:
filter_parts.append(
f"anullsrc=r={sample_rate}:cl=stereo,"
f"atrim=0:{total_duration_sec}[bgm]"
)
input_idx = 0
# Overlay sound-effect processing
sfx_labels = []
for i, sfx in enumerate(sfx_files):
idx = input_idx + i
spec = sfx['spec']
seg = sfx['segment']
# Compute timing parameters
start_time_ms = seg.get('startTimeMs', 0)
duration_ms = seg.get('durationMs', 5000)
delay_ms = start_time_ms + spec.delay_ms
delay_sec = delay_ms / 1000.0
duration_sec = duration_ms / 1000.0
# Fade-in/fade-out parameters (very short, 5-20 ms)
fade_in_sec = spec.fade_in_ms / 1000.0
fade_out_sec = spec.fade_out_ms / 1000.0
# Volume
volume = spec.volume
label = f"sfx{i}"
sfx_labels.append(f"[{label}]")
# Build the filter chain: delay + fades + volume
# Note: fades are applied to overlay tracks only, never to the BGM
sfx_filter = (
f"[{idx}:a]"
f"adelay={int(delay_ms)}|{int(delay_ms)},"
f"afade=t=in:st={delay_sec}:d={fade_in_sec},"
f"afade=t=out:st={delay_sec + duration_sec - fade_out_sec}:d={fade_out_sec},"
f"volume={volume}"
f"[{label}]"
)
filter_parts.append(sfx_filter)
# Mix (key point: normalize=0 disables normalization)
# dropout_transition=0 means no volume ramp when an input ends
mix_inputs = "[bgm]" + "".join(sfx_labels)
num_inputs = 1 + len(sfx_files)
filter_parts.append(
f"{mix_inputs}amix=inputs={num_inputs}:duration=first:"
f"dropout_transition=0:normalize=0[out]"
)
filter_complex = ';'.join(filter_parts)
cmd = ['ffmpeg', '-y', '-hide_banner'] + inputs + [
'-filter_complex', filter_complex,
'-map', '[out]',
'-c:a', 'aac', '-b:a', '128k',
'-ar', str(sample_rate), '-ac', str(channels),
output_file
]
return cmd
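The filter graph assembled by `_build_audio_command` can be easier to follow in isolation. The following is a simplified sketch, not the handler itself: it assumes input 0 is always the BGM, takes each overlay as a plain `(delay_ms, duration_ms, volume)` tuple, and fixes both fades at 10 ms. The function name `build_mix_filter` and the tuple layout are hypothetical.

```python
from typing import List, Tuple


def build_mix_filter(total_sec: float, sfx: List[Tuple[int, int, float]]) -> str:
    """Build a filter_complex string mixing input 0 (BGM) with overlay inputs 1..N.

    Each sfx entry is (delay_ms, duration_ms, volume); fades are fixed at 10 ms.
    """
    fade_sec = 0.01
    # Trim or pad the BGM so that it spans exactly the full duration
    parts = [
        f"[0:a]atrim=0:{total_sec},asetpts=PTS-STARTPTS,"
        f"apad=whole_dur={total_sec}[bgm]"
    ]
    labels = []
    for i, (delay_ms, duration_ms, volume) in enumerate(sfx):
        start_sec = delay_ms / 1000.0
        end_sec = (delay_ms + duration_ms) / 1000.0
        # Delay both channels, then fade relative to the delayed position
        parts.append(
            f"[{i + 1}:a]adelay={delay_ms}|{delay_ms},"
            f"afade=t=in:st={start_sec}:d={fade_sec},"
            f"afade=t=out:st={end_sec - fade_sec}:d={fade_sec},"
            f"volume={volume}[sfx{i}]"
        )
        labels.append(f"[sfx{i}]")
    # normalize=0 keeps the BGM level constant when overlays start and stop
    parts.append(
        f"[bgm]{''.join(labels)}amix=inputs={1 + len(sfx)}:duration=first:"
        f"dropout_transition=0:normalize=0[out]"
    )
    return ";".join(parts)


if __name__ == "__main__":
    print(build_mix_filter(10.0, [(2000, 1500, 0.8)]))
```

Because `duration=first` ties the output length to the `[bgm]` track, padding that track to the full duration is what guarantees a continuous result even when the BGM file is shorter than the video.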

handlers/render_video.py Normal file

@@ -0,0 +1,312 @@
# -*- coding: utf-8 -*-
"""
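Video segment render handler
Handles RENDER_SEGMENT_VIDEO tasks, rendering source material into video segments that match the output spec.
Supports generating frozen frames for transition overlap regions.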
"""
import os
import logging
from typing import List, Optional, Tuple
from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS
from domain.task import Task, TaskType, RenderSpec, OutputSpec
from domain.result import TaskResult, ErrorCode
logger = logging.getLogger(__name__)
class RenderSegmentVideoHandler(BaseHandler):
"""
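Video segment render handler
Responsibilities:
- Download the source material
- Download the LUT file (if any)
- Download the overlay (if any)
- Build the FFmpeg render command
- Run the render (with frame freezing to produce overlap regions)
- Upload the artifact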
"""
def get_supported_type(self) -> TaskType:
return TaskType.RENDER_SEGMENT_VIDEO
def handle(self, task: Task) -> TaskResult:
"""Handle a video render task."""
work_dir = self.create_work_dir(task.task_id)
try:
# Parse parameters
material_url = task.get_material_url()
if not material_url:
return TaskResult.fail(
ErrorCode.E_SPEC_INVALID,
"Missing material URL (boundMaterialUrl or sourceRef)"
)
render_spec = task.get_render_spec()
output_spec = task.get_output_spec()
duration_ms = task.get_duration_ms()
# 1. Download the source material
input_file = os.path.join(work_dir, 'input.mp4')
if not self.download_file(material_url, input_file):
return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download material: {material_url}"
)
# 2. Download the LUT (if any)
lut_file = None
if render_spec.lut_url:
lut_file = os.path.join(work_dir, 'lut.cube')
if not self.download_file(render_spec.lut_url, lut_file):
logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it")
lut_file = None
# 3. Download the overlay (if any)
overlay_file = None
if render_spec.overlay_url:
# Choose the file extension from the URL suffix
ext = '.png'
if render_spec.overlay_url.lower().endswith('.jpg') or render_spec.overlay_url.lower().endswith('.jpeg'):
ext = '.jpg'
overlay_file = os.path.join(work_dir, f'overlay{ext}')
if not self.download_file(render_spec.overlay_url, overlay_file):
logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
overlay_file = None
# 4. Compute the overlap durations
overlap_head_ms = render_spec.get_overlap_head_ms()
overlap_tail_ms = render_spec.get_overlap_tail_ms()
# 5. Build the FFmpeg command
output_file = os.path.join(work_dir, 'output.mp4')
cmd = self._build_command(
input_file=input_file,
output_file=output_file,
render_spec=render_spec,
output_spec=output_spec,
duration_ms=duration_ms,
lut_file=lut_file,
overlay_file=overlay_file,
overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms
)
# 6. Run FFmpeg
if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"FFmpeg rendering failed"
)
# 7. Verify the output file
if not self.ensure_file_exists(output_file, min_size=4096):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Output file is missing or too small"
)
# 8. Probe the actual duration
actual_duration = self.probe_duration(output_file)
actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms
# 9. Upload the artifact
video_url = self.upload_file(task.task_id, 'video', output_file)
if not video_url:
return TaskResult.fail(
ErrorCode.E_UPLOAD_FAILED,
"Failed to upload video"
)
# 10. Build the result (including overlap info)
result_data = {
'videoUrl': video_url,
'actualDurationMs': actual_duration_ms,
'overlapHeadMs': overlap_head_ms,
'overlapTailMs': overlap_tail_ms
}
return TaskResult.ok(result_data)
except Exception as e:
logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True)
return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e))
finally:
self.cleanup_work_dir(work_dir)
def _build_command(
self,
input_file: str,
output_file: str,
render_spec: RenderSpec,
output_spec: OutputSpec,
duration_ms: int,
lut_file: Optional[str] = None,
overlay_file: Optional[str] = None,
overlap_head_ms: int = 0,
overlap_tail_ms: int = 0
) -> List[str]:
"""
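Build the FFmpeg render command.
Args:
input_file: input file path
output_file: output file path
render_spec: render spec
output_spec: output spec
duration_ms: target duration in milliseconds
lut_file: LUT file path (optional)
overlay_file: overlay file path (optional)
overlap_head_ms: head overlap duration in milliseconds
overlap_tail_ms: tail overlap duration in milliseconds
Returns:
FFmpeg command argument list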
"""
cmd = ['ffmpeg', '-y', '-hide_banner']
# Input file
cmd.extend(['-i', input_file])
# Overlay input
if overlay_file:
cmd.extend(['-i', overlay_file])
# Build the video filter chain
filters = self._build_video_filters(
render_spec=render_spec,
output_spec=output_spec,
lut_file=lut_file,
has_overlay=overlay_file is not None,
overlap_head_ms=overlap_head_ms,
overlap_tail_ms=overlap_tail_ms
)
# Apply filters
if overlay_file:
# Use filter_complex to handle the overlay
cmd.extend(['-filter_complex', filters])
elif filters:
cmd.extend(['-vf', filters])
# Encoding parameters (unified v2 parameters)
cmd.extend(VIDEO_ENCODE_ARGS)
# Frame rate
fps = output_spec.fps
cmd.extend(['-r', str(fps)])
# GOP size (keyframe interval)
gop_size = fps * 2 # one keyframe every 2 seconds
cmd.extend(['-g', str(gop_size)])
cmd.extend(['-keyint_min', str(gop_size)])
# Duration (including overlap regions)
total_duration_ms = duration_ms + overlap_head_ms + overlap_tail_ms
duration_sec = total_duration_ms / 1000.0
cmd.extend(['-t', str(duration_sec)])
# No audio (video segments carry no audio track)
cmd.append('-an')
# Output file
cmd.append(output_file)
return cmd
def _build_video_filters(
self,
render_spec: RenderSpec,
output_spec: OutputSpec,
lut_file: Optional[str] = None,
has_overlay: bool = False,
overlap_head_ms: int = 0,
overlap_tail_ms: int = 0
) -> str:
"""
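Build the video filter chain.
Args:
render_spec: render spec
output_spec: output spec
lut_file: LUT file path
has_overlay: whether an overlay is present
overlap_head_ms: head overlap duration in milliseconds
overlap_tail_ms: tail overlap duration in milliseconds
Returns:
Filter string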
"""
filters = []
width = output_spec.width
height = output_spec.height
# 1. Speed change
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed != 1.0 and speed > 0:
# setpts formula: PTS / speed
pts_factor = 1.0 / speed
filters.append(f"setpts={pts_factor}*PTS")
# 2. LUT color grading
if lut_file:
# Normalize backslashes to forward slashes (a backslash is an escape character in filter strings)
lut_path = lut_file.replace('\\', '/')
filters.append(f"lut3d='{lut_path}'")
# 3. Cropping
if render_spec.crop_enable and render_spec.face_pos:
# Smart crop based on the face position
try:
fx, fy = map(float, render_spec.face_pos.split(','))
# Compute the crop region (keeping the output aspect ratio)
target_ratio = width / height
# Assume the crop targets the output aspect ratio
filters.append(
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})':"
f"'(iw-min(iw,ih*{target_ratio}))*{fx}':"
f"'(ih-min(ih,iw/{target_ratio}))*{fy}'"
)
except (ValueError, ZeroDivisionError):
logger.warning(f"Invalid face position: {render_spec.face_pos}")
elif render_spec.zoom_cut:
# Centered zoom crop
target_ratio = width / height
filters.append(
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})'"
)
# 4. Scale and pad
scale_filter = (
f"scale={width}:{height}:force_original_aspect_ratio=decrease,"
f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black"
)
filters.append(scale_filter)
# 5. Frame freezing (tpad), used for transition overlap regions
# Note: tpad must be applied after scaling
tpad_parts = []
if overlap_head_ms > 0:
# Head freeze: hold the first frame for the given duration
head_duration_sec = overlap_head_ms / 1000.0
tpad_parts.append(f"start_mode=clone:start_duration={head_duration_sec}")
if overlap_tail_ms > 0:
# Tail freeze: hold the last frame for the given duration
tail_duration_sec = overlap_tail_ms / 1000.0
tpad_parts.append(f"stop_mode=clone:stop_duration={tail_duration_sec}")
if tpad_parts:
filters.append(f"tpad={':'.join(tpad_parts)}")
# 6. Assemble the final filter string
if has_overlay:
# Use filter_complex syntax
base_filters = ','.join(filters) if filters else 'copy'
return f"[0:v]{base_filters}[base];[base][1:v]overlay=0:0"
else:
return ','.join(filters) if filters else ''
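The handler above picks the overlay file extension with `str.endswith` on the full URL. With signed URLs that carry a query string, the suffix check would look at the query rather than the path. A small sketch of one way to avoid that is shown below; the helper name `overlay_extension` is an assumption for illustration and does not exist in this commit.

```python
import os
from urllib.parse import urlparse


def overlay_extension(url: str, default: str = ".png") -> str:
    """Pick a local file extension from the URL path, ignoring any query string."""
    path = urlparse(url).path
    ext = os.path.splitext(path)[1].lower()
    # Treat .jpeg and .jpg alike; everything else falls back to the default
    if ext in (".jpg", ".jpeg"):
        return ".jpg"
    return default


if __name__ == "__main__":
    print(overlay_extension("https://cdn.example.com/o/frame.JPG?sig=abc"))
    print(overlay_extension("https://cdn.example.com/o/frame.png?x=.jpg"))
```

Parsing the URL first means that a signature or any other query parameter cannot change which extension is chosen.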

index.py

@@ -1,19 +1,177 @@
from time import sleep
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
RenderWorker v2 entry point
import biz.task
import config
from template import load_local_template
from util import api
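A render worker that speaks the v2 API protocol and handles these task types:
- RENDER_SEGMENT_VIDEO: render a video segment
- PREPARE_JOB_AUDIO: generate the global audio track
- PACKAGE_SEGMENT_TS: package a TS segment
- FINALIZE_MP4: produce the final MP4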
load_local_template()
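Usage:
python index.py
Environment variables:
API_ENDPOINT_V2: v2 API endpoint (or use API_ENDPOINT)
ACCESS_KEY: worker authentication key
WORKER_ID: worker ID (default 100001)
MAX_CONCURRENCY: maximum concurrency (default 4)
HEARTBEAT_INTERVAL: heartbeat interval in seconds (default 5)
TEMP_DIR: temporary file directory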
"""
import sys
import time
import signal
import logging
from domain.config import WorkerConfig
from services.api_client import APIClientV2
from services.task_executor import TaskExecutor
from constant import SOFTWARE_VERSION
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger('worker')
while True:
# print(get_sys_info())
print("waiting for task...")
task_list = api.sync_center()
if len(task_list) == 0:
sleep(5)
for task in task_list:
print("start task:", task)
biz.task.start_task(task)
class WorkerV2:
"""
v2 渲染 Worker 主类
负责:
- 配置加载
- API 客户端初始化
- 任务执行器管理
- 主循环运行
- 优雅退出处理
"""
def __init__(self):
"""Initialize the worker."""
# Load configuration
try:
self.config = WorkerConfig.from_env()
except ValueError as e:
logger.error(f"Configuration error: {e}")
sys.exit(1)
# Initialize the API client
self.api_client = APIClientV2(self.config)
# Initialize the task executor
self.task_executor = TaskExecutor(self.config, self.api_client)
# Running state
self.running = True
# Make sure the temp directory exists
self.config.ensure_temp_dir()
# Register signal handlers
self._setup_signal_handlers()
def _setup_signal_handlers(self):
"""Set up signal handlers."""
# SIGTERM is not fully supported on Windows, so register it only when available
signal.signal(signal.SIGINT, self._signal_handler)
if hasattr(signal, 'SIGTERM'):
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""
Signal handler for graceful shutdown.
Args:
signum: signal number
frame: current stack frame
"""
signal_name = signal.Signals(signum).name
logger.info(f"Received signal {signal_name}, initiating shutdown...")
self.running = False
def run(self):
"""Main loop."""
logger.info("=" * 60)
logger.info("RenderWorker v2 Starting")
logger.info("=" * 60)
logger.info(f"Worker ID: {self.config.worker_id}")
logger.info(f"API Endpoint: {self.config.api_endpoint}")
logger.info(f"Max Concurrency: {self.config.max_concurrency}")
logger.info(f"Heartbeat Interval: {self.config.heartbeat_interval}s")
logger.info(f"Capabilities: {', '.join(self.config.capabilities)}")
logger.info(f"Temp Directory: {self.config.temp_dir}")
logger.info("=" * 60)
consecutive_errors = 0
max_consecutive_errors = 10
while self.running:
try:
# Heartbeat sync and task fetch
current_task_ids = self.task_executor.get_current_task_ids()
tasks = self.api_client.sync(current_task_ids)
# Submit new tasks
for task in tasks:
if self.task_executor.submit_task(task):
logger.info(f"Submitted task: {task.task_id} ({task.task_type.value})")
# Reset the error counter
consecutive_errors = 0
# Wait for the next heartbeat
time.sleep(self.config.heartbeat_interval)
except KeyboardInterrupt:
logger.info("Keyboard interrupt received")
self.running = False
except Exception as e:
consecutive_errors += 1
logger.error(f"Worker loop error ({consecutive_errors}/{max_consecutive_errors}): {e}", exc_info=True)
# Too many consecutive errors: wait longer
if consecutive_errors >= max_consecutive_errors:
logger.error("Too many consecutive errors, waiting 30 seconds...")
time.sleep(30)
consecutive_errors = 0
else:
time.sleep(5)
# Graceful shutdown
self._shutdown()
def _shutdown(self):
"""Shut down gracefully."""
logger.info("Shutting down...")
# Wait for running tasks to finish
current_count = self.task_executor.get_current_task_count()
if current_count > 0:
logger.info(f"Waiting for {current_count} running task(s) to complete...")
# Shut down the executor
self.task_executor.shutdown(wait=True)
# Close the API client
self.api_client.close()
logger.info("Worker stopped")
def main():
"""Main function."""
logger.info(f"RenderWorker v{SOFTWARE_VERSION}")
# Create and run the worker
worker = WorkerV2()
worker.run()
if __name__ == '__main__':
main()
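The main loop above waits between heartbeats with `time.sleep`, so a shutdown request is only noticed once the current sleep ends. One possible variation, sketched here under the assumption that a `threading.Event` replaces the `running` flag, makes the wait interruptible. The class name `HeartbeatLoop` and its `tick` callback are hypothetical and only stand in for the sync-and-submit step.

```python
import threading
from typing import Callable


class HeartbeatLoop:
    """Run `tick` repeatedly, pausing `interval_sec` between calls until stopped."""

    def __init__(self, interval_sec: float, tick: Callable[[], None]):
        self.interval_sec = interval_sec
        self.tick = tick
        self.iterations = 0
        self._stop = threading.Event()

    def stop(self) -> None:
        """Request shutdown; safe to call from a signal handler or another thread."""
        self._stop.set()

    def run(self) -> None:
        while not self._stop.is_set():
            self.tick()
            self.iterations += 1
            # Returns early if stop() is called during the wait
            self._stop.wait(self.interval_sec)


if __name__ == "__main__":
    loop = HeartbeatLoop(0.1, lambda: print("heartbeat"))
    threading.Timer(0.35, loop.stop).start()
    loop.run()
    print(f"stopped after {loop.iterations} iterations")
```

With this shape, a signal handler would call `stop()` and the loop would exit without waiting out the remainder of the heartbeat interval.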


@@ -1,3 +1,8 @@
requests~=2.32.3
psutil~=6.1.0
python-dotenv~=1.0.1
opentelemetry-api~=1.35.0
opentelemetry-sdk~=1.35.0
opentelemetry-exporter-otlp~=1.35.0
opentelemetry-instrumentation-threading~=0.56b0
flask~=3.1.0

services/__init__.py Normal file

@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
"""
Service layer
Contains the API client, task executor, lease service, storage service and related components.
"""
from services.api_client import APIClientV2
from services.lease_service import LeaseService
from services.task_executor import TaskExecutor
from services import storage
__all__ = [
'APIClientV2',
'LeaseService',
'TaskExecutor',
'storage',
]

services/api_client.py Normal file

@@ -0,0 +1,371 @@
# -*- coding: utf-8 -*-
"""
v2 API client
Implements communication with the v2 interface of the render server.
"""
import logging
import subprocess
import requests
from typing import Dict, List, Optional, Any
from domain.task import Task
from domain.config import WorkerConfig
logger = logging.getLogger(__name__)
class APIClientV2:
"""
v2 API client
Handles all HTTP communication with the render server.
"""
def __init__(self, config: WorkerConfig):
"""
Initialize the API client.
Args:
config: worker configuration
"""
self.config = config
self.base_url = config.api_endpoint.rstrip('/')
self.access_key = config.access_key
self.worker_id = config.worker_id
self.session = requests.Session()
# Set default request headers
self.session.headers.update({
'Content-Type': 'application/json',
'Accept': 'application/json'
})
def sync(self, current_task_ids: List[str]) -> List[Task]:
"""
Send a heartbeat sync and fetch tasks.
Args:
current_task_ids: IDs of the tasks currently running
Returns:
List[Task]: newly assigned tasks
"""
url = f"{self.base_url}/render/v2/worker/sync"
# Convert task IDs to integers (the server expects []int64)
task_ids_int = [int(tid) for tid in current_task_ids if tid.isdigit()]
payload = {
'accessKey': self.access_key,
'workerId': self.worker_id,
'capabilities': self.config.capabilities,
'maxConcurrency': self.config.max_concurrency,
'currentTaskCount': len(current_task_ids),
'currentTaskIds': task_ids_int,
'ffmpegVersion': self._get_ffmpeg_version(),
'codecInfo': self._get_codec_info(),
'systemInfo': self._get_system_info()
}
try:
resp = self.session.post(url, json=payload, timeout=10)
resp.raise_for_status()
data = resp.json()
if data.get('code') != 200:
logger.warning(f"Sync failed: {data.get('message')}")
return []
# Parse the task list
tasks = []
for task_data in data.get('data', {}).get('tasks', []):
try:
task = Task.from_dict(task_data)
tasks.append(task)
except Exception as e:
logger.error(f"Failed to parse task: {e}")
if tasks:
logger.info(f"Received {len(tasks)} new tasks")
return tasks
except requests.exceptions.Timeout:
logger.warning("Sync timeout")
return []
except requests.exceptions.RequestException as e:
logger.error(f"Sync request error: {e}")
return []
except Exception as e:
logger.error(f"Sync error: {e}")
return []
def report_start(self, task_id: str) -> bool:
"""
Report that a task has started.
Args:
task_id: task ID
Returns:
bool: whether the report succeeded
"""
url = f"{self.base_url}/render/v2/task/{task_id}/start"
try:
resp = self.session.post(
url,
json={'workerId': self.worker_id},
timeout=10
)
if resp.status_code == 200:
logger.debug(f"[task:{task_id}] Start reported")
return True
else:
logger.warning(f"[task:{task_id}] Report start failed: {resp.status_code}")
return False
except Exception as e:
logger.error(f"[task:{task_id}] Report start error: {e}")
return False
def report_success(self, task_id: str, result: Dict[str, Any]) -> bool:
"""
Report that a task succeeded.
Args:
task_id: task ID
result: task result data
Returns:
bool: whether the report succeeded
"""
url = f"{self.base_url}/render/v2/task/{task_id}/success"
try:
resp = self.session.post(
url,
json={
'workerId': self.worker_id,
'result': result
},
timeout=10
)
if resp.status_code == 200:
logger.debug(f"[task:{task_id}] Success reported")
return True
else:
logger.warning(f"[task:{task_id}] Report success failed: {resp.status_code}")
return False
except Exception as e:
logger.error(f"[task:{task_id}] Report success error: {e}")
return False
def report_fail(self, task_id: str, error_code: str, error_message: str) -> bool:
"""
Report that a task failed.
Args:
task_id: task ID
error_code: error code
error_message: error message
Returns:
bool: whether the report succeeded
"""
url = f"{self.base_url}/render/v2/task/{task_id}/fail"
try:
resp = self.session.post(
url,
json={
'workerId': self.worker_id,
'errorCode': error_code,
'errorMessage': error_message[:1000] # cap the length
},
timeout=10
)
if resp.status_code == 200:
logger.debug(f"[task:{task_id}] Failure reported")
return True
else:
logger.warning(f"[task:{task_id}] Report fail failed: {resp.status_code}")
return False
except Exception as e:
logger.error(f"[task:{task_id}] Report fail error: {e}")
return False
def get_upload_url(self, task_id: str, file_type: str, file_name: str = None) -> Optional[Dict[str, str]]:
"""
Get an upload URL.
Args:
task_id: task ID
file_type: file type (video/audio/ts/mp4)
file_name: file name (optional)
Returns:
Dict containing uploadUrl and accessUrl, or None on failure
"""
url = f"{self.base_url}/render/v2/task/{task_id}/uploadUrl"
payload = {'fileType': file_type}
if file_name:
payload['fileName'] = file_name
try:
resp = self.session.post(url, json=payload, timeout=10)
if resp.status_code == 200:
data = resp.json()
if data.get('code') == 200:
return data.get('data')
logger.warning(f"[task:{task_id}] Get upload URL failed: {resp.status_code}")
return None
except Exception as e:
logger.error(f"[task:{task_id}] Get upload URL error: {e}")
return None
def extend_lease(self, task_id: str, extension: int = None) -> bool:
"""
Extend the lease.
Args:
task_id: task ID
extension: seconds to extend by (defaults to the configured value)
Returns:
bool: whether the extension succeeded
"""
if extension is None:
extension = self.config.lease_extension_duration
url = f"{self.base_url}/render/v2/task/{task_id}/extend-lease"
try:
resp = self.session.post(
url,
params={
'workerId': self.worker_id,
'extension': extension
},
timeout=10
)
if resp.status_code == 200:
logger.debug(f"[task:{task_id}] Lease extended by {extension}s")
return True
else:
logger.warning(f"[task:{task_id}] Extend lease failed: {resp.status_code}")
return False
except Exception as e:
logger.error(f"[task:{task_id}] Extend lease error: {e}")
return False
def get_task_info(self, task_id: str) -> Optional[Dict]:
"""
Get task details.
Args:
task_id: task ID
Returns:
Task detail dict, or None on failure
"""
url = f"{self.base_url}/render/v2/task/{task_id}"
try:
resp = self.session.get(url, timeout=10)
if resp.status_code == 200:
data = resp.json()
if data.get('code') == 200:
return data.get('data')
return None
except Exception as e:
logger.error(f"[task:{task_id}] Get task info error: {e}")
return None
def _get_ffmpeg_version(self) -> str:
"""Get the FFmpeg version."""
try:
result = subprocess.run(
['ffmpeg', '-version'],
capture_output=True,
text=True,
timeout=5
)
first_line = result.stdout.split('\n')[0]
if 'version' in first_line:
parts = first_line.split()
for i, part in enumerate(parts):
if part == 'version' and i + 1 < len(parts):
return parts[i + 1]
return 'unknown'
except Exception:
return 'unknown'
def _get_codec_info(self) -> str:
"""Get information about supported codecs."""
try:
result = subprocess.run(
['ffmpeg', '-codecs'],
capture_output=True,
text=True,
timeout=5
)
# Check for commonly used codecs
codecs = []
output = result.stdout
if 'libx264' in output:
codecs.append('libx264')
if 'libx265' in output or 'hevc' in output:
codecs.append('libx265')
if 'aac' in output:
codecs.append('aac')
if 'libfdk_aac' in output:
codecs.append('libfdk_aac')
return ', '.join(codecs) if codecs else 'unknown'
except Exception:
return 'unknown'
def _get_system_info(self) -> Dict[str, Any]:
"""Get system information."""
try:
import platform
import psutil
info = {
'os': platform.system(),
'cpu': f"{psutil.cpu_count()} cores",
'memory': f"{psutil.virtual_memory().total // (1024**3)}GB",
'cpuUsage': f"{psutil.cpu_percent()}%",
'memoryAvailable': f"{psutil.virtual_memory().available // (1024**3)}GB"
}
# Try to get GPU information
gpu_info = self._get_gpu_info()
if gpu_info:
info['gpu'] = gpu_info
return info
except Exception:
return {}
def _get_gpu_info(self) -> Optional[str]:
"""Get GPU information."""
try:
result = subprocess.run(
['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
gpu_name = result.stdout.strip().split('\n')[0]
return gpu_name
except Exception:
pass
return None
def close(self):
"""Close the session."""
self.session.close()
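In the client above, `sync` calls `_get_ffmpeg_version` and `_get_codec_info` on every heartbeat, which spawns `ffmpeg` subprocesses each time even though the answer rarely changes. A possible refinement, sketched here as an assumption rather than as the project's approach, is to probe once per process and reuse the result. The function names `parse_ffmpeg_version` and `cached_ffmpeg_version` are hypothetical.

```python
import subprocess
from functools import lru_cache


def parse_ffmpeg_version(first_line: str) -> str:
    """Extract the token that follows the word 'version', or 'unknown'."""
    parts = first_line.split()
    for i, part in enumerate(parts[:-1]):
        if part == "version":
            return parts[i + 1]
    return "unknown"


@lru_cache(maxsize=1)
def cached_ffmpeg_version() -> str:
    """Probe ffmpeg once per process; later calls reuse the cached string."""
    try:
        result = subprocess.run(
            ["ffmpeg", "-version"], capture_output=True, text=True, timeout=5
        )
    except (OSError, subprocess.SubprocessError):
        # ffmpeg is missing or did not answer in time
        return "unknown"
    lines = result.stdout.splitlines()
    return parse_ffmpeg_version(lines[0]) if lines else "unknown"


if __name__ == "__main__":
    print(cached_ffmpeg_version())
```

One trade-off worth noting: the cache also retains an `'unknown'` result, so a worker started before FFmpeg is installed would keep reporting `'unknown'` until it is restarted.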

services/lease_service.py Normal file

@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
"""
Lease renewal service
A background thread periodically renews the lease of a running task.
"""
import logging
import threading
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from services.api_client import APIClientV2
logger = logging.getLogger(__name__)
class LeaseService:
"""
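Lease renewal service
Periodically calls the API from a background thread to extend the task lease,
so that long-running tasks are not reclaimed when the lease expires.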
"""
def __init__(
self,
api_client: 'APIClientV2',
task_id: str,
interval: int = 60,
extension: int = 300
):
"""
Initialize the lease service.
Args:
api_client: API client
task_id: task ID
interval: renewal interval in seconds (default 60)
extension: length of each extension in seconds (default 300)
"""
self.api_client = api_client
self.task_id = task_id
self.interval = interval
self.extension = extension
self.running = False
self.thread: threading.Thread = None
self._stop_event = threading.Event()
def start(self):
"""Start the lease renewal thread."""
if self.running:
logger.warning(f"[task:{self.task_id}] Lease service already running")
return
self.running = True
self._stop_event.clear()
self.thread = threading.Thread(
target=self._run,
name=f"LeaseService-{self.task_id}",
daemon=True
)
self.thread.start()
logger.debug(f"[task:{self.task_id}] Lease service started (interval={self.interval}s)")
def stop(self):
"""Stop the lease renewal thread."""
if not self.running:
return
self.running = False
self._stop_event.set()
if self.thread and self.thread.is_alive():
self.thread.join(timeout=5)
logger.debug(f"[task:{self.task_id}] Lease service stopped")
def _run(self):
"""Main loop of the renewal thread."""
while self.running:
# Wait for the interval to elapse or for a stop signal
if self._stop_event.wait(timeout=self.interval):
# Stop signal received
break
if self.running:
self._extend_lease()
def _extend_lease(self):
"""Perform one lease renewal."""
try:
success = self.api_client.extend_lease(self.task_id, self.extension)
if success:
logger.debug(f"[task:{self.task_id}] Lease extended by {self.extension}s")
else:
logger.warning(f"[task:{self.task_id}] Failed to extend lease")
except Exception as e:
logger.warning(f"[task:{self.task_id}] Lease extension error: {e}")
def __enter__(self):
"""Context manager entry."""
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.stop()
return False
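The renewal thread above relies on `threading.Event.wait(timeout=...)`, which returns `True` as soon as the event is set and `False` when the timeout elapses. The stand-alone sketch below isolates that pattern so it can be run without an API client. `PeriodicRenewer` and its `renew` callback are hypothetical stand-ins for `LeaseService` and `extend_lease`.

```python
import threading
from typing import Callable


class PeriodicRenewer:
    """Call `renew()` every `interval_sec` seconds until the context exits."""

    def __init__(self, renew: Callable[[], None], interval_sec: float):
        self._renew = renew
        self._interval_sec = interval_sec
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # wait() returns True as soon as the event is set, False on timeout
        while not self._stop_event.wait(timeout=self._interval_sec):
            self._renew()

    def __enter__(self) -> "PeriodicRenewer":
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self._stop_event.set()
        self._thread.join(timeout=5)
        return False  # never swallow exceptions from the with-body


if __name__ == "__main__":
    import time

    with PeriodicRenewer(lambda: print("lease renewed"), interval_sec=0.1):
        time.sleep(0.35)  # stands in for a long-running render
```

Because the first call happens only after one full interval, a task that finishes faster than the interval never triggers a renewal, which matches the behavior of `_run` above.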

services/storage.py Normal file

@@ -0,0 +1,200 @@
# -*- coding: utf-8 -*-
"""
Storage service
Provides file upload and download, with support for OSS signed URLs and the HTTP_REPLACE_MAP environment variable.
"""
import os
import logging
from typing import Optional
import requests
logger = logging.getLogger(__name__)
def _apply_http_replace_map(url: str) -> str:
"""
Apply the HTTP_REPLACE_MAP environment variable to a URL.
Args:
url: original URL
Returns:
URL after replacement
"""
replace_map = os.getenv("HTTP_REPLACE_MAP", "")
if not replace_map:
return url
new_url = url
replace_list = [i.split("|", 1) for i in replace_map.split(",") if "|" in i]
for src, dst in replace_list:
new_url = new_url.replace(src, dst)
if new_url != url:
logger.debug(f"HTTP_REPLACE_MAP: {url} -> {new_url}")
return new_url
def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 60) -> bool:
"""
Upload a file to OSS using a signed URL.
Args:
url: signed URL
file_path: local file path
max_retries: maximum number of retries
timeout: timeout in seconds
Returns:
Whether the upload succeeded
"""
if not os.path.exists(file_path):
logger.error(f"File not found: {file_path}")
return False
file_size = os.path.getsize(file_path)
logger.info(f"Uploading: {file_path} ({file_size} bytes)")
# Check whether to upload with rclone
if os.getenv("UPLOAD_METHOD") == "rclone":
result = _upload_with_rclone(url, file_path)
if result:
return True
# Fall back to HTTP if rclone fails
# Apply HTTP_REPLACE_MAP to the URL
http_url = _apply_http_replace_map(url)
retries = 0
while retries < max_retries:
try:
with open(file_path, 'rb') as f:
response = requests.put(
http_url,
data=f,
stream=True,
timeout=timeout,
headers={"Content-Type": "application/octet-stream"}
)
response.raise_for_status()
logger.info(f"Upload succeeded: {file_path}")
return True
except requests.exceptions.Timeout:
retries += 1
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
except requests.exceptions.RequestException as e:
retries += 1
logger.warning(f"Upload failed ({e}). Retrying {retries}/{max_retries}...")
logger.error(f"Upload failed after {max_retries} retries: {file_path}")
return False
def _upload_with_rclone(url: str, file_path: str) -> bool:
"""
Upload a file with rclone.
Args:
url: target URL
file_path: local file path
Returns:
Whether the upload succeeded
"""
replace_map = os.getenv("RCLONE_REPLACE_MAP", "")
if not replace_map:
return False
config_file = os.getenv("RCLONE_CONFIG_FILE", "")
# Replace URL prefixes
new_url = url
replace_list = [i.split("|", 1) for i in replace_map.split(",") if "|" in i]
for src, dst in replace_list:
new_url = new_url.replace(src, dst)
new_url = new_url.split("?", 1)[0] # strip query parameters
if new_url == url:
return False
# Pass an argument list and avoid the shell, so that paths containing
# spaces or shell metacharacters reach rclone intact
import subprocess
cmd = [
"rclone", "copyto", "--no-check-dest", "--ignore-existing",
"--multi-thread-chunk-size", "8M", "--multi-thread-streams", "8",
]
cmd += ["--config", config_file] if config_file else []
cmd += [file_path, new_url]
logger.debug(f"rclone command: {cmd}")
try:
result = subprocess.run(cmd).returncode
except OSError as e:
logger.warning(f"rclone could not be started: {e}")
return False
if result == 0:
logger.info(f"rclone upload succeeded: {file_path}")
return True
logger.warning(f"rclone upload failed (code={result}): {file_path}")
return False
def download_file(
url: str,
file_path: str,
max_retries: int = 5,
timeout: int = 30,
skip_if_exist: bool = False
) -> bool:
"""
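Download a file using a signed URL.
Args:
url: signed URL
file_path: local file path
max_retries: maximum number of retries
timeout: timeout in seconds
skip_if_exist: skip the download if the file already exists
Returns:
Whether the download succeeded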
"""
# Skip if the file already exists and skipping is enabled
if skip_if_exist and os.path.exists(file_path):
logger.debug(f"File exists, skipping download: {file_path}")
return True
logger.info(f"Downloading: {url}")
# Make sure the target directory exists
file_dir = os.path.dirname(file_path)
if file_dir:
os.makedirs(file_dir, exist_ok=True)
# Apply HTTP_REPLACE_MAP to the URL
http_url = _apply_http_replace_map(url)
retries = 0
while retries < max_retries:
try:
response = requests.get(http_url, timeout=timeout, stream=True)
response.raise_for_status()
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
file_size = os.path.getsize(file_path)
logger.info(f"Download succeeded: {file_path} ({file_size} bytes)")
return True
except requests.exceptions.Timeout:
retries += 1
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
except requests.exceptions.RequestException as e:
retries += 1
logger.warning(f"Download failed ({e}). Retrying {retries}/{max_retries}...")
logger.error(f"Download failed after {max_retries} retries: {url}")
return False
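Both `HTTP_REPLACE_MAP` and `RCLONE_REPLACE_MAP` are parsed above as comma-separated `src|dst` rules applied in order. The sketch below restates that parsing as a pure function so the expected format is easy to try out. The function name `apply_replace_map` and the example hosts are assumptions for illustration.

```python
def apply_replace_map(url: str, replace_map: str) -> str:
    """Apply 'src|dst' rules separated by commas, in order, to a URL."""
    # Entries without a '|' separator are ignored, as in the storage module
    rules = [rule.split("|", 1) for rule in replace_map.split(",") if "|" in rule]
    for src, dst in rules:
        url = url.replace(src, dst)
    return url


if __name__ == "__main__":
    rewritten = apply_replace_map(
        "https://oss.example.com/bucket/a.mp4",
        "https://oss.example.com|http://10.0.0.2:9000",
    )
    print(rewritten)
```

Since the rules are split on commas and on the first `|`, neither character can appear inside a `src` value; a URL prefix that contains a comma would need a different delimiter.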

services/task_executor.py Normal file

@@ -0,0 +1,236 @@
# -*- coding: utf-8 -*-
"""
Task executor
Manages concurrent task execution and coordinates handlers, the lease service and related components.
"""
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, Future
from typing import Dict, Optional, TYPE_CHECKING
from domain.task import Task, TaskType
from domain.result import TaskResult, ErrorCode
from domain.config import WorkerConfig
from core.handler import TaskHandler
from services.lease_service import LeaseService
if TYPE_CHECKING:
from services.api_client import APIClientV2
logger = logging.getLogger(__name__)
class TaskExecutor:
"""
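Task executor
Handles concurrent scheduling and execution of tasks, including:
- Registering and managing task handlers
- Tracking task execution state
- Coordinating lease renewal
- Reporting execution results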
"""
def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
"""
Initialize the task executor.
Args:
config: worker configuration
api_client: API client
"""
self.config = config
self.api_client = api_client
# Task handler registry
self.handlers: Dict[TaskType, TaskHandler] = {}
# Tracking of current tasks
self.current_tasks: Dict[str, Task] = {}
self.current_futures: Dict[str, Future] = {}
# Thread pool
self.executor = ThreadPoolExecutor(
max_workers=config.max_concurrency,
thread_name_prefix="TaskWorker"
)
# Lock for thread safety
self.lock = threading.Lock()
# Register handlers
self._register_handlers()
def _register_handlers(self):
"""Register all task handlers."""
# Import lazily to avoid circular dependencies
from handlers.render_video import RenderSegmentVideoHandler
from handlers.compose_transition import ComposeTransitionHandler
from handlers.prepare_audio import PrepareJobAudioHandler
from handlers.package_ts import PackageSegmentTsHandler
from handlers.finalize_mp4 import FinalizeMp4Handler
handlers = [
RenderSegmentVideoHandler(self.config, self.api_client),
ComposeTransitionHandler(self.config, self.api_client),
PrepareJobAudioHandler(self.config, self.api_client),
PackageSegmentTsHandler(self.config, self.api_client),
FinalizeMp4Handler(self.config, self.api_client),
]
for handler in handlers:
task_type = handler.get_supported_type()
self.handlers[task_type] = handler
logger.debug(f"Registered handler for {task_type.value}")
def get_current_task_ids(self) -> list:
"""
Get the IDs of the tasks currently running.
Returns:
List of task IDs
"""
with self.lock:
return list(self.current_tasks.keys())
def get_current_task_count(self) -> int:
"""
Get the number of tasks currently running.
Returns:
Number of tasks
"""
with self.lock:
return len(self.current_tasks)
def can_accept_task(self) -> bool:
"""
Check whether a new task can be accepted.
Returns:
Whether a task can be accepted
"""
return self.get_current_task_count() < self.config.max_concurrency
def submit_task(self, task: Task) -> bool:
"""
Submit a task to the thread pool.
Args:
task: task entity
Returns:
Whether the submission succeeded
"""
with self.lock:
# Check whether the task is already running
if task.task_id in self.current_tasks:
logger.warning(f"[task:{task.task_id}] Task already running, skipping")
return False
# Check that a matching handler exists
if task.task_type not in self.handlers:
logger.error(f"[task:{task.task_id}] No handler for type: {task.task_type.value}")
return False
# 记录任务
self.current_tasks[task.task_id] = task
# 提交到线程池
future = self.executor.submit(self._process_task, task)
self.current_futures[task.task_id] = future
logger.info(f"[task:{task.task_id}] Submitted ({task.task_type.value})")
return True

    def _process_task(self, task: Task):
        """
        Process a single task (runs inside the thread pool).

        Args:
            task: Task entity
        """
        task_id = task.task_id
        logger.info(f"[task:{task_id}] Starting {task.task_type.value}")

        # Start the lease renewal service
        lease_service = LeaseService(
            self.api_client,
            task_id,
            interval=self.config.lease_extension_threshold,
            extension=self.config.lease_extension_duration
        )
        lease_service.start()

        try:
            # Report that the task has started
            self.api_client.report_start(task_id)

            # Look up the handler
            handler = self.handlers.get(task.task_type)
            if not handler:
                raise ValueError(f"No handler for task type: {task.task_type}")

            # Pre-execution hook
            handler.before_handle(task)

            # Execute the task
            result = handler.handle(task)

            # Post-execution hook
            handler.after_handle(task, result)

            # Report the result
            if result.success:
                self.api_client.report_success(task_id, result.data)
                logger.info(f"[task:{task_id}] Completed successfully")
            else:
                error_code = result.error_code.value if result.error_code else 'E_UNKNOWN'
                self.api_client.report_fail(task_id, error_code, result.error_message or '')
                logger.error(f"[task:{task_id}] Failed: {result.error_message}")

        except Exception as e:
            logger.error(f"[task:{task_id}] Exception: {e}", exc_info=True)
            self.api_client.report_fail(task_id, 'E_UNKNOWN', str(e))

        finally:
            # Stop lease renewal
            lease_service.stop()

            # Remove the task from the tracking dictionaries
            with self.lock:
                self.current_tasks.pop(task_id, None)
                self.current_futures.pop(task_id, None)

    def shutdown(self, wait: bool = True):
        """
        Shut down the executor.

        Args:
            wait: Whether to wait for all running tasks to finish
        """
        logger.info("Shutting down task executor...")

        # Shut down the thread pool
        self.executor.shutdown(wait=wait)

        # Clear tracking state
        with self.lock:
            self.current_tasks.clear()
            self.current_futures.clear()

        logger.info("Task executor shutdown complete")

    def get_handler(self, task_type: TaskType) -> Optional[TaskHandler]:
        """
        Get the handler for a given task type.

        Args:
            task_type: Task type

        Returns:
            Handler instance, or None if no handler is registered
        """
        return self.handlers.get(task_type)
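
The bookkeeping done by `submit_task` and `_process_task` above can be exercised in isolation. What follows is a minimal sketch, not the project's code: `MiniExecutor` and the `work` callable are hypothetical names, and handler lookup, lease renewal and API reporting are left out. It only shows the pattern of recording a task under a lock, running it in a thread pool, and removing it again in `finally`.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict


class MiniExecutor:
    """Hypothetical, stripped-down stand-in for the executor above."""

    def __init__(self, max_concurrency: int = 2):
        self.max_concurrency = max_concurrency
        self.current_futures: Dict[str, Future] = {}
        self.lock = threading.Lock()
        self.pool = ThreadPoolExecutor(max_workers=max_concurrency)

    def submit(self, task_id: str, work: Callable[[], object]) -> bool:
        with self.lock:
            # Reject duplicates and anything beyond the concurrency limit.
            if task_id in self.current_futures:
                return False
            if len(self.current_futures) >= self.max_concurrency:
                return False
            # Recording the future while holding the lock means the cleanup
            # in _run cannot remove the entry before it has been stored.
            self.current_futures[task_id] = self.pool.submit(self._run, task_id, work)
        return True

    def _run(self, task_id: str, work: Callable[[], object]) -> None:
        try:
            work()
        finally:
            # Always drop the entry, whether the work succeeded or raised.
            with self.lock:
                self.current_futures.pop(task_id, None)
```

Unlike the executor above, this sketch checks the capacity limit inside `submit` itself, so the check and the registration happen under the same lock.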

template/.gitignore vendored

@@ -1 +0,0 @@
**/*


@@ -1,123 +0,0 @@
import json
import os
import logging

from util import api, oss

TEMPLATES = {}
logger = logging.getLogger("template")


def check_local_template(local_name):
    template_def = TEMPLATES[local_name]
    base_dir = template_def.get("local_path")
    for video_part in template_def.get("video_parts", []):
        source_file = video_part.get("source", "")
        if str(source_file).startswith("http"):
            # download file
            ...
        elif str(source_file).startswith("PLACEHOLDER_"):
            continue
        else:
            if not os.path.isabs(source_file):
                source_file = os.path.join(base_dir, source_file)
            if not os.path.exists(source_file):
                logger.error(f"{source_file} not found, please check the template definition")
                raise Exception(f"{source_file} not found, please check the template definition")
        for audio in video_part.get("audios", []):
            if not os.path.isabs(audio):
                audio = os.path.join(base_dir, audio)
            if not os.path.exists(audio):
                logger.error(f"{audio} not found, please check the template definition")
                raise Exception(f"{audio} not found, please check the template definition")
        for lut in video_part.get("luts", []):
            if not os.path.isabs(lut):
                lut = os.path.join(base_dir, lut)
            if not os.path.exists(lut):
                logger.error(f"{lut} not found, please check the template definition")
                raise Exception(f"{lut} not found, please check the template definition")
        for mask in video_part.get("overlays", []):
            if not os.path.isabs(mask):
                mask = os.path.join(base_dir, mask)
            if not os.path.exists(mask):
                logger.error(f"{mask} not found, please check the template definition")
                raise Exception(f"{mask} not found, please check the template definition")


def load_template(template_name, local_path):
    global TEMPLATES
    logger.info(f"Loading video template definition: [{template_name} ({local_path})]")
    template_def_file = os.path.join(local_path, "template.json")
    if os.path.exists(template_def_file):
        # Use a context manager so the file handle is closed promptly
        with open(template_def_file, 'rb') as f:
            TEMPLATES[template_name] = json.load(f)
        TEMPLATES[template_name]["local_path"] = local_path
    try:
        check_local_template(template_name)
        logger.info(f"Finished loading template [{template_name}]")
    except Exception as e:
        logger.error(f"Template definition file [{template_def_file}] is invalid, trying to download the template again", exc_info=e)
        download_template(template_name)


def load_local_template():
    for template_name in os.listdir(os.getenv("TEMPLATE_DIR")):
        if template_name.startswith("_"):
            continue
        if template_name.startswith("."):
            continue
        target_path = os.path.join(os.getenv("TEMPLATE_DIR"), template_name)
        if os.path.isdir(target_path):
            load_template(template_name, target_path)


def get_template_def(template_id):
    if template_id not in TEMPLATES:
        download_template(template_id)
    return TEMPLATES.get(template_id)


def download_template(template_id):
    template_info = api.get_template_info(template_id)
    if not os.path.isdir(template_info['local_path']):
        os.makedirs(template_info['local_path'])
    # download template assets
    overall_template = template_info['overall_template']
    video_parts = template_info['video_parts']

    def _download_assets(_template):
        if 'source' in _template:
            if str(_template['source']).startswith("http"):
                _, _fn = os.path.split(_template['source'])
                new_fp = os.path.join(template_info['local_path'], _fn)
                oss.download_from_oss(_template['source'], new_fp)
                if _fn.endswith(".mp4"):
                    from util.ffmpeg import to_annexb
                    new_fp = to_annexb(new_fp)
                _template['source'] = os.path.relpath(new_fp, template_info['local_path'])
        if 'overlays' in _template:
            for i in range(len(_template['overlays'])):
                overlay = _template['overlays'][i]
                if str(overlay).startswith("http"):
                    _, _fn = os.path.split(overlay)
                    oss.download_from_oss(overlay, os.path.join(template_info['local_path'], _fn))
                    _template['overlays'][i] = _fn
        if 'luts' in _template:
            for i in range(len(_template['luts'])):
                lut = _template['luts'][i]
                if str(lut).startswith("http"):
                    _, _fn = os.path.split(lut)
                    oss.download_from_oss(lut, os.path.join(template_info['local_path'], _fn))
                    _template['luts'][i] = _fn
        if 'audios' in _template:
            for i in range(len(_template['audios'])):
                if str(_template['audios'][i]).startswith("http"):
                    _, _fn = os.path.split(_template['audios'][i])
                    oss.download_from_oss(_template['audios'][i], os.path.join(template_info['local_path'], _fn))
                    _template['audios'][i] = _fn

    _download_assets(overall_template)
    for video_part in video_parts:
        _download_assets(video_part)
    with open(os.path.join(template_info['local_path'], 'template.json'), 'w', encoding='utf-8') as f:
        json.dump(template_info, f)
    load_template(template_id, template_info['local_path'])


def analyze_template(template_id):
    ...
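
The four nearly identical loops in `check_local_template` could be collapsed into a single helper. The following is a minimal sketch under that assumption; `missing_assets` is a hypothetical name that does not exist in the removed module. It returns the missing paths instead of raising, and it skips an empty `source`.

```python
import os
from typing import Dict, List


def missing_assets(video_part: Dict, base_dir: str) -> List[str]:
    """Return the resolved paths of local assets that do not exist on disk."""
    candidates: List[str] = []
    source = str(video_part.get("source") or "")
    # Remote sources and placeholders are not checked on the local disk.
    if source and not source.startswith(("http", "PLACEHOLDER_")):
        candidates.append(source)
    for key in ("audios", "luts", "overlays"):
        candidates.extend(video_part.get(key, []))

    missing: List[str] = []
    for path in candidates:
        # Relative paths are resolved against the template directory.
        if not os.path.isabs(path):
            path = os.path.join(base_dir, path)
        if not os.path.exists(path):
            missing.append(path)
    return missing
```

A caller could log each returned path and raise once at the end, which reports all missing files in one pass rather than stopping at the first.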

util/__init__.py Normal file

@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
"""
Utility package

Provides helper functions such as system information collection.
"""

from util.system import get_sys_info, get_capabilities, get_gpu_info, get_ffmpeg_version

__all__ = [
    'get_sys_info',
    'get_capabilities',
    'get_gpu_info',
    'get_ffmpeg_version',
]


@@ -1,174 +0,0 @@
import logging
import os

import requests

import util.system

session = requests.Session()
logger = logging.getLogger(__name__)


def normalize_task(task_info):
    ...
    return task_info


def sync_center():
    """
    Fetch tasks from the API.
    :return: list of tasks
    """
    try:
        from template import TEMPLATES, download_template
        response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
            'accessKey': os.getenv('ACCESS_KEY'),
            'clientStatus': util.system.get_sys_info(),
            'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in TEMPLATES.values()]
        }, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        # Pass the exception through a %s placeholder; a bare extra argument
        # makes the logging module report a formatting error instead
        logger.error("Request failed: %s", e)
        return []
    data = response.json()
    logger.debug("Task sync result: [%s]", data)
    if data.get('code', 0) == 200:
        templates = data.get('data', {}).get('templates', [])
        tasks = data.get('data', {}).get('tasks', [])
    else:
        tasks = []
        templates = []
        logger.warning("Failed to fetch tasks")
    for template in templates:
        template_id = template.get('id', '')
        if template_id:
            logger.info("Updating template: [%s]", template_id)
            download_template(template_id)
    return tasks


def get_template_info(template_id):
    """
    Fetch template information from the API.
    :rtype: Template
    :param template_id: template ID
    :type template_id: str
    :return: template information
    """
    try:
        response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={
            'accessKey': os.getenv('ACCESS_KEY'),
        }, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error("Request failed: %s", e)
        return None
    data = response.json()
    remote_template_info = data.get('data', {})
    logger.debug("Template info result: [%s]", remote_template_info)
    template = {
        'id': template_id,
        'updateTime': remote_template_info.get('updateTime', template_id),
        # Default display values: '景区' means "scenic area", '模版' means "template"
        'scenic_name': remote_template_info.get('scenicName', '景区'),
        'name': remote_template_info.get('name', '模版'),
        'video_size': '1920x1080',
        'frame_rate': 30,
        'overall_duration': 30,
        'video_parts': [
        ]
    }

    def _template_normalizer(template_info):
        _template = {}
        _placeholder_type = template_info.get('isPlaceholder', -1)
        if _placeholder_type == 0:
            # Fixed video
            _template['source'] = template_info.get('sourceUrl', '')
        elif _placeholder_type == 1:
            # Placeholder
            _template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '')
            _template['mute'] = template_info.get('mute', True)
        else:
            _template['source'] = None
        _overlays = template_info.get('overlays', '')
        if _overlays:
            _template['overlays'] = _overlays.split(",")
        _audios = template_info.get('audios', '')
        if _audios:
            _template['audios'] = _audios.split(",")
        _luts = template_info.get('luts', '')
        if _luts:
            _template['luts'] = _luts.split(",")
        return _template

    # outer template definition
    overall_template = _template_normalizer(remote_template_info)
    template['overall_template'] = overall_template
    # inner template definitions
    inter_template_list = remote_template_info.get('children', [])
    for children_template in inter_template_list:
        parts = _template_normalizer(children_template)
        template['video_parts'].append(parts)
    template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id))
    return template


def report_task_success(task_info, **kwargs):
    try:
        response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
            'accessKey': os.getenv('ACCESS_KEY'),
            **kwargs
        }, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error("Request failed: %s", e)
        return None


def report_task_start(task_info):
    try:
        response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
            'accessKey': os.getenv('ACCESS_KEY'),
        }, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error("Request failed: %s", e)
        return None


def report_task_failed(task_info, reason=''):
    try:
        response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
            'accessKey': os.getenv('ACCESS_KEY'),
            'reason': reason
        }, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error("Request failed: %s", e)
        return None


def upload_task_file(task_info, ffmpeg_task):
    logger.info("Starting file upload: %s", task_info.get("id"))
    try:
        response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
            'accessKey': os.getenv('ACCESS_KEY'),
        }, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error("Request failed: %s", e)
        return False
    data = response.json()
    url = data.get('data', "")
    logger.info("Uploading file: %s to %s", task_info.get("id"), url)
    try:
        with open(ffmpeg_task.get_output_file(), 'rb') as f:
            requests.put(url, data=f)
    except requests.RequestException as e:
        logger.error("Upload failed: %s", e)
        return False
    finally:
        logger.info("File upload finished: %s", task_info.get("id"))
    return True
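
To illustrate what `_template_normalizer` in `get_template_info` does with the comma-separated fields returned by the API, here is a standalone sketch. The function name `normalize_part` and the sample records used with it are made up for illustration; the key names (`isPlaceholder`, `sourceUrl`, `mute`, `overlays`, `audios`, `luts`) are taken from the code above.

```python
from typing import Any, Dict


def normalize_part(remote: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one remote template record into the local part layout."""
    part: Dict[str, Any] = {}
    kind = remote.get("isPlaceholder", -1)
    if kind == 0:
        # Fixed video: the source URL is used as-is.
        part["source"] = remote.get("sourceUrl", "")
    elif kind == 1:
        # Placeholder: marked with a prefix so later stages can substitute it.
        part["source"] = "PLACEHOLDER_" + remote.get("sourceUrl", "")
        part["mute"] = remote.get("mute", True)
    else:
        part["source"] = None
    # Comma-separated strings become lists; empty values are omitted.
    for key in ("overlays", "audios", "luts"):
        raw = remote.get(key, "")
        if raw:
            part[key] = raw.split(",")
    return part
```

With a hypothetical record `{"isPlaceholder": 1, "sourceUrl": "cam1", "luts": "a.cube,b.cube"}`, the result carries a `PLACEHOLDER_cam1` source, `mute` set to `True`, and a two-element `luts` list.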


@@ -1,127 +0,0 @@
import logging
import os
import subprocess
from datetime import datetime
from typing import Optional, IO

from entity.ffmpeg import FfmpegTask

logger = logging.getLogger(__name__)


def to_annexb(file):
    if not os.path.exists(file):
        return file
    logger.info("ToAnnexb: %s", file)
    ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb",
                                     "-f", "mpegts", file+".ts"])
    logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
    if ffmpeg_process.returncode == 0:
        os.remove(file)
        return file+".ts"
    else:
        return file


def start_render(ffmpeg_task: FfmpegTask):
    logger.info(ffmpeg_task)
    if not ffmpeg_task.need_run():
        ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
        return True
    ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
    logger.info(ffmpeg_args)
    if len(ffmpeg_args) == 0:
        ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
        return True
    ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
    logger.info("FINISH TASK, OUTPUT IS %s", handle_ffmpeg_output(ffmpeg_process.stdout))
    code = ffmpeg_process.returncode
    return code == 0


def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
    out_time = "0:0:0.0"
    if stdout is None:
        print("[!]STDOUT is null")
        return out_time
    speed = "0"
    for line in stdout.split(b"\n"):
        if line == b"":
            break
        if line.strip() == b"progress=end":
            # Processing finished
            break
        if line.startswith(b"out_time="):
            out_time = line.replace(b"out_time=", b"").decode().strip()
        if line.startswith(b"speed="):
            speed = line.replace(b"speed=", b"").decode().strip()
            print("[ ]Speed:", out_time, "@", speed)
    return out_time+"@"+speed


def duration_str_to_float(duration_str: str) -> float:
    _duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1)
    return _duration.total_seconds()


def probe_video_info(video_file):
    # Get width, height and duration.
    # Use the bare executable name so the call also works outside Windows.
    result = subprocess.run(
        ["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of',
         'csv=s=x:p=0', video_file],
        stderr=subprocess.STDOUT,
        **subprocess_args(True)
    )
    all_result = result.stdout.decode('utf-8').strip()
    wh, duration = all_result.split('\n')
    width, height = wh.strip().split('x')
    return int(width), int(height), float(duration)


# Create a set of arguments which make a ``subprocess.Popen`` (and
# variants) call work with or without Pyinstaller, ``--noconsole`` or
# not, on Windows and Linux. Typical use::
#
#   subprocess.call(['program_to_run', 'arg_1'], **subprocess_args())
#
# When calling ``check_output``::
#
#   subprocess.check_output(['program_to_run', 'arg_1'],
#                           **subprocess_args(False))
def subprocess_args(include_stdout=True):
    # The following is true only on Windows.
    if hasattr(subprocess, 'STARTUPINFO'):
        # On Windows, subprocess calls will pop up a command window by default
        # when run from Pyinstaller with the ``--noconsole`` option. Avoid this
        # distraction.
        si = subprocess.STARTUPINFO()
        si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        # Windows doesn't search the path by default. Pass it an environment so
        # it will.
        env = os.environ
    else:
        si = None
        env = None

    # ``subprocess.check_output`` doesn't allow specifying ``stdout``::
    #
    #   Traceback (most recent call last):
    #     File "test_subprocess.py", line 58, in <module>
    #       **subprocess_args(stdout=None))
    #     File "C:\Python27\lib\subprocess.py", line 567, in check_output
    #       raise ValueError('stdout argument not allowed, it will be overridden.')
    #   ValueError: stdout argument not allowed, it will be overridden.
    #
    # So, add it only if it's needed.
    if include_stdout:
        ret = {'stdout': subprocess.PIPE}
    else:
        ret = {}

    # On Windows, running this from the binary produced by Pyinstaller
    # with the ``--noconsole`` option requires redirecting everything
    # (stdin, stdout, stderr) to avoid an OSError exception
    # "[Error 6] the handle is invalid."
    ret.update({'stdin': subprocess.PIPE,
                'startupinfo': si,
                'env': env})
    return ret
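
`handle_ffmpeg_output` in this file reads the `key=value` lines that `ffmpeg -progress -` writes to stdout. Below is a minimal sketch of the same idea that keeps the last value seen for every key. `parse_progress` is a hypothetical helper, and the sample bytes are hand-written for illustration rather than captured from a real run.

```python
from typing import Dict


def parse_progress(stdout: bytes) -> Dict[str, str]:
    """Collect the most recent value of each key=value line."""
    latest: Dict[str, str] = {}
    for raw in stdout.splitlines():
        line = raw.decode("utf-8", errors="replace").strip()
        if "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Later progress blocks overwrite earlier ones.
        latest[key] = value.strip()
    return latest


sample = (
    b"out_time=00:00:01.000000\nspeed=1.5x\nprogress=continue\n"
    b"out_time=00:00:02.500000\nspeed=2.0x\nprogress=end\n"
)
info = parse_progress(sample)
```

Returning a dictionary instead of a formatted string leaves the caller free to log `info["out_time"]` and `info["speed"]` or to convert them to numbers.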


@@ -1,43 +0,0 @@
import logging
import os

import requests

logger = logging.getLogger(__name__)


def upload_to_oss(url, file_path):
    """
    Upload a file to OSS using a signed URL.
    :param str url: signed URL
    :param str file_path: file path
    :return bool: whether the upload succeeded
    """
    with open(file_path, 'rb') as f:
        try:
            response = requests.put(url, data=f)
            return response.status_code == 200
        except Exception as e:
            logger.error("Upload to OSS failed: %s", e)
            return False


def download_from_oss(url, file_path):
    """
    Download a file from OSS using a signed URL.
    :param str url: signed URL
    :param Union[LiteralString, str, bytes] file_path: file path
    :return bool: whether the download succeeded
    """
    logger.info("download_from_oss: %s", url)
    file_dir, file_name = os.path.split(file_path)
    if file_dir:
        if not os.path.exists(file_dir):
            os.makedirs(file_dir)
    try:
        response = requests.get(url)
        # Treat HTTP error responses as failures instead of saving the error body
        response.raise_for_status()
        with open(file_path, 'wb') as f:
            f.write(response.content)
        return True
    except Exception as e:
        logger.error("Download from OSS failed: %s", e)
        return False


@@ -1,24 +1,103 @@
+# -*- coding: utf-8 -*-
+"""
+System information utilities
+
+Provides functions for collecting system information.
+"""
 import os
 import platform
-from datetime import datetime
+import subprocess
+from typing import Optional
 import psutil
-from constant import SUPPORT_FEATURE, SOFTWARE_VERSION
+from constant import SOFTWARE_VERSION, DEFAULT_CAPABILITIES
 def get_sys_info():
     """
-    Returns a dictionary with system information.
+    Collect system information.
+
+    Returns:
+        dict: dictionary of system information
     """
+    mem = psutil.virtual_memory()
     info = {
-        'version': SOFTWARE_VERSION,
-        'client_datetime': datetime.now().isoformat(),
+        'os': platform.system(),
+        'cpu': f"{os.cpu_count()} cores",
+        'memory': f"{mem.total // (1024**3)}GB",
+        'cpuUsage': f"{psutil.cpu_percent()}%",
+        'memoryAvailable': f"{mem.available // (1024**3)}GB",
-        'platform': platform.system(),
-        'runtime_version': 'Python ' + platform.python_version(),
-        'cpu_count': os.cpu_count(),
-        'cpu_usage': psutil.cpu_percent(),
-        'memory_total': psutil.virtual_memory().total,
-        'memory_available': psutil.virtual_memory().available,
-        'support_feature': SUPPORT_FEATURE
+        'pythonVersion': platform.python_version(),
+        'version': SOFTWARE_VERSION,
     }
+    # Try to obtain GPU information
+    gpu_info = get_gpu_info()
+    if gpu_info:
+        info['gpu'] = gpu_info
     return info
+
+
+def get_capabilities():
+    """
+    Get the list of capabilities supported by this Worker.
+
+    Returns:
+        list: list of capabilities
+    """
+    return DEFAULT_CAPABILITIES.copy()
+
+
+def get_gpu_info() -> Optional[str]:
+    """
+    Try to obtain GPU information.
+
+    Returns:
+        str: GPU information, or None on failure
+    """
+    try:
+        # Try nvidia-smi
+        result = subprocess.run(
+            ['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'],
+            capture_output=True,
+            text=True,
+            timeout=5
+        )
+        if result.returncode == 0:
+            gpu_name = result.stdout.strip().split('\n')[0]
+            return gpu_name
+    except Exception:
+        pass
+    return None
+
+
+def get_ffmpeg_version() -> str:
+    """
+    Get the FFmpeg version.
+
+    Returns:
+        str: FFmpeg version string
+    """
+    try:
+        result = subprocess.run(
+            ['ffmpeg', '-version'],
+            capture_output=True,
+            text=True,
+            timeout=5
+        )
+        if result.returncode == 0:
+            first_line = result.stdout.split('\n')[0]
+            # Parse the version, e.g. "ffmpeg version 6.0 ..."
+            parts = first_line.split()
+            for i, part in enumerate(parts):
+                if part == 'version' and i + 1 < len(parts):
+                    return parts[i + 1]
+    except Exception:
+        pass
+    return 'unknown'
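
The parsing step inside `get_ffmpeg_version` can be separated from the subprocess call, which makes it checkable on a machine without FFmpeg installed. The following is a sketch under that assumption; `parse_ffmpeg_version` is a hypothetical name that is not part of this commit, and any banner text passed to it in examples is illustrative.

```python
def parse_ffmpeg_version(banner: str) -> str:
    """Extract the token that follows 'version' on the first banner line."""
    lines = banner.splitlines()
    if not lines:
        return "unknown"
    parts = lines[0].split()
    # Stop one token early so parts[i + 1] always exists.
    for i, part in enumerate(parts[:-1]):
        if part == "version":
            return parts[i + 1]
    return "unknown"
```

`get_ffmpeg_version` could then pass `result.stdout` to this helper and keep only the subprocess handling for itself.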