Compare commits

70 Commits

Author SHA1 Message Date
d770d84927 feat(重构): 实现新的渲染服务架构
- 新增 RenderTask
2025-09-12 14:41:58 +08:00
c36e838d4f fix(entity): 修复缩放效果
fix(entity): 移除 ffmpeg缩放和裁剪滤镜中的 setpts 指令

移除了 ffmpeg 缩放、裁剪和尾部处理滤镜中的 setpts=PTS-STARTPTS指令。这个指令在某些情况下可能导致视频处理出现问题,例如在使用 zoompan 滤镜时。此修改旨在提高视频处理的稳定性和正确性。

fix(entity): 修复缩放效果中中心点计算错误

-针对静态缩放和动态缩放分别修正了中心点计算公式
- 确保在不同缩放因子下,图像中心点位置保持正确

fix(entity): 修复 ffmpeg zoompan 滤镜参数

-将 zoompan 滤镜的参数从 'z=' 改为 'z=',统一参数格式- 此修改解决了 ffmpeg 在处理某些视频时可能遇到的参数解析问题

feat(zoom): 实现视频缩放特效的自定义中心点功能

- 添加代码以解析 posJson 数据,计算并设置缩放中心点
- 使用 zoompan滤镜替代原有的 scale 和 crop滤镜,支持动态缩放
- 优化静态缩放的实现,确保整个视频时长的应用

fix(entity): 修复视频缩放效果的 FFmpeg 命令

- 在 zoom_expr 中添加转义字符,以解决 FFmpeg 解析问题
- 修改缩放和裁剪滤镜的参数,提高视频处理的准确性
2025-09-12 13:20:10 +08:00
1571934943 fix(entity): 修复中心裁剪计算逻辑并优化 JSON 解析
- 在解析 posJson 时添加异常处理,避免无效 JSON 导致程序崩溃
- 修复中心裁剪计算逻辑中的取整问题,确保裁剪位置准确
2025-09-07 01:45:56 +08:00
35693ac83c build(constant): 更新软件版本号
- 将 SOFTWARE_VERSION 从 '0.0.4' 修改为 '0.0.5'
2025-09-06 15:44:24 +08:00
d154f2c74d feat(api): 添加模板属性 zoom_cut
在模板信息中增加了 zoom_cut 属性,用于获取模板的缩放裁剪信息。
2025-09-06 15:43:55 +08:00
bd0c44b17f tail效果 2025-08-12 14:22:26 +08:00
432472fd19 逻辑问题 2025-08-09 10:57:45 +08:00
8f0250df43 通过argv传skip_if_exist默认值 2025-08-08 13:58:01 +08:00
0209c5de3f 单独渲染模板 2025-08-08 13:58:01 +08:00
51e7d21f84 帧跳过、zoom 2025-08-08 13:58:01 +08:00
0770cb361d vsync 2025-08-05 17:43:01 +08:00
2f694da5fd hevc+重下模板 2025-08-05 12:43:27 +08:00
bf912037d1 lut 2025-08-01 17:24:14 +08:00
1119a7b030 onlyIf判断优化 2025-08-01 17:24:14 +08:00
5282e58a10 支持zoom_cut 2025-07-21 10:58:07 +08:00
f7141e5d4e Thread-span支持 2025-07-19 14:07:39 +08:00
f23bcfdd25 调小chunk-size 2025-07-18 13:54:38 +08:00
4b080771f6 允许跳过下载,并发下载,env和版本更新 2025-07-18 13:48:46 +08:00
13a10b9812 修复 2025-06-04 15:38:14 +08:00
3976b72607 优化埋点 2025-05-29 10:05:35 +08:00
04ce423811 优化裁切参数获取,避免同机位多素材出问题 2025-05-29 10:01:21 +08:00
399c3d2dc6 修复裁切 2025-05-27 11:09:10 +08:00
ef3edafcd6 支持rclone多线程上传 2025-05-26 16:17:38 +08:00
6d631d873e onlyif判断 2025-05-05 19:47:54 +08:00
02dd2b72a0 根据模板定义的分辨率进行操作 2025-04-30 18:07:33 +08:00
d8bc3c8595 健康检查时同步信息 2025-04-28 17:59:44 +08:00
5d58198b7e 接口支持查询模板信息,避免使用旧模板 2025-04-28 17:57:34 +08:00
789513a0be 避免勿删模板 2025-04-28 16:32:34 +08:00
b3911839f3 app不使用批量上报 2025-04-28 16:28:27 +08:00
1c0e4ce411 添加dockerfile 2025-04-28 15:45:22 +08:00
1603be9157 尝试传入resolution,不使用scale自适应模板 2025-04-28 15:02:50 +08:00
f139fbccd7 下载模板时trace归组 2025-04-27 14:24:12 +08:00
2fb0f93886 收集ffmpeg异常,流式上传 2025-04-27 13:47:52 +08:00
9537f995a1 支持redirection 2025-04-20 14:32:06 +08:00
ec03f8180e 修改接口 2025-04-20 12:32:33 +08:00
972b6a4e4d 修改接口 2025-04-20 12:09:17 +08:00
3d810e5c5b 添加接口,添加方便测试的方法 2025-04-20 11:50:32 +08:00
a9043361ec 支持通过env获取encoder args 2025-04-20 11:02:23 +08:00
740a3c7a63 完善requirements.txt 2025-04-20 10:54:49 +08:00
450240bd5a 支持同机位多视频片段复用 2025-04-14 14:15:16 +08:00
6b5975d8b9 更换oltp服务器 2025-04-04 15:51:10 +08:00
85c2e7459e 删除sync_center埋点 2025-04-03 11:22:20 +08:00
364ceb29a1 音频淡出 2025-04-01 17:17:57 +08:00
ced0c1ad1e 修改 2025-03-30 18:11:03 +08:00
6e4dbfd843 固定模板支持音乐 2025-03-28 18:08:19 +08:00
09e0f5f3be concat支持annexb 2025-03-28 18:07:57 +08:00
52c2df8b65 删除无用方法 2025-03-28 17:30:28 +08:00
b25ad20ddd 日志 2025-03-28 17:27:24 +08:00
7c6e4a97b2 片段有音频不可以copy 2025-03-28 17:26:49 +08:00
8f0e69c3de overall_speed片段全局变速 2025-03-24 10:30:36 +08:00
b8db0d2b95 metrics调整 2025-03-23 18:36:26 +08:00
6dc7e86e8e 埋点采集部分接口调整 2025-03-18 13:58:36 +08:00
c62f1ab976 upload返回结果 2025-03-11 16:15:51 +08:00
744fe28421 修复居中切割的问题 2025-03-10 15:16:23 +08:00
cf43e6d549 修复amix降低声音的问题,修复reencode_to_annexb不添加音轨的问题 2025-03-10 10:13:30 +08:00
dcf5f5630d 主动判断是否有音频 2025-03-06 23:02:54 +08:00
56bdad7ad1 音轨叠加 2025-03-06 10:34:28 +08:00
94373cee72 cameraShot特效及旋转 2025-03-05 14:57:02 +08:00
4549b0ab44 分辨率和裁切 2025-03-04 17:43:47 +08:00
9d178a3d34 埋点 2025-03-04 12:36:48 +08:00
1f9632761f effect 2025-03-03 14:27:52 +08:00
fff20610a5 profile level指定及修复 2025-02-27 16:48:57 +08:00
67696739f9 切割模式 2025-02-27 14:02:17 +08:00
2ea248c02e 上传文件也弄个超时 2025-02-16 18:15:35 +08:00
358207efdc 定时清理目录下无用文件 2025-02-16 18:15:25 +08:00
94a5e687df 未生成文件时,上报失败 2025-02-08 15:02:36 +08:00
b7d6797901 忽略无用文件 2025-02-05 10:13:33 +08:00
6d9d373032 only if 逻辑 2025-01-23 14:28:51 +08:00
549ee8320a ffprobe 报错后不采用其内容 2025-01-22 16:04:33 +08:00
29bb80f3b9 渲染后再to annexb,使用新逻辑拼接 2025-01-22 14:31:59 +08:00
32 changed files with 3019 additions and 424 deletions

4
.env
View File

@@ -1,4 +0,0 @@
TEMPLATE_DIR=template/
API_ENDPOINT=http://127.0.0.1:8030/task/v1
ACCESS_KEY=TEST_ACCESS_KEY
TEMP_DIR=tmp/

13
.env.example Normal file
View File

@@ -0,0 +1,13 @@
TEMPLATE_DIR=template/
API_ENDPOINT=https://zhentuai.com/task/v1
ACCESS_KEY=TEST_ACCESS_KEY
TEMP_DIR=tmp/
#REDIRECT_TO_URL=https://renderworker-deuvulkhes.cn-shanghai.fcapp.run/
# QSV
ENCODER_ARGS="-c:v h264_qsv -global_quality 28 -look_ahead 1"
# NVENC
#ENCODER_ARGS="-c:v h264_nvenc -cq:v 24 -preset:v p7 -tune:v hq -profile:v high"
# HEVC
#VIDEO_ARGS="-profile:v main
UPLOAD_METHOD="rclone"
RCLONE_REPLACE_MAP="https://oss.zhentuai.com|alioss://frametour-assets,https://frametour-assets.oss-cn-shanghai.aliyuncs.com|alioss://frametour-assets"

6
.gitignore vendored
View File

@@ -6,6 +6,11 @@ __pycache__/
*.so *.so
.Python .Python
build/ build/
dist/
*.mp4
*.ts
rand*.ts
tmp_concat_*.txt
*.egg-info/ *.egg-info/
*.egg *.egg
*.manifest *.manifest
@@ -26,3 +31,4 @@ target/
.venv .venv
venv/ venv/
cython_debug/ cython_debug/
.env

21
Dockerfile Normal file
View File

@@ -0,0 +1,21 @@
FROM linuxserver/ffmpeg:7.1.1
LABEL authors="Jerry Yan"
RUN sed -i 's@//.*archive.ubuntu.com@//mirrors.ustc.edu.cn@g' /etc/apt/sources.list && \
sed -i 's/security.ubuntu.com/mirrors.ustc.edu.cn/g' /etc/apt/sources.list
RUN apt-get update && \
apt-get install -y --no-install-recommends \
python3-pip \
python3-dev \
python3-setuptools \
python3-wheel \
python3-venv
RUN apt-get clean && \
rm -rf /var/lib/apt/lists/*
COPY . /app/
RUN python3 -m venv /app/venv
RUN /app/venv/bin/python -m pip config set global.index-url https://mirrors.ustc.edu.cn/pypi/simple
RUN /app/venv/bin/python -m pip install -r /app/requirements.txt
WORKDIR /app
ENTRYPOINT ["/app/venv/bin/python", "app.py"]

42
app.py Normal file
View File

@@ -0,0 +1,42 @@
import time
import flask
import config
import biz.task
from services import DefaultTemplateService
from telemetry import init_opentelemetry
from util import api
# 使用新的服务架构
template_service = DefaultTemplateService()
template_service.load_local_templates()
import logging
LOGGER = logging.getLogger(__name__)
init_opentelemetry(batch=False)
app = flask.Flask(__name__)
@app.get('/health/check')
def health_check():
return api.sync_center()
@app.post('/')
def do_nothing():
return "NOOP"
@app.post('/<task_id>')
def do_task(task_id):
task_info = api.get_task_info(task_id)
local_template_info = template_service.get_template(task_info.get("templateId"))
template_info = api.get_template_info(task_info.get("templateId"))
if local_template_info:
if local_template_info.get("updateTime") != template_info.get("updateTime"):
template_service.download_template(task_info.get("templateId"))
biz.task.start_task(task_info)
return "OK"
if __name__ == '__main__':
app.run(host="0.0.0.0", port=9998)

View File

@@ -1,30 +1,76 @@
import json import json
import os.path import os.path
import time import time
from concurrent.futures import ThreadPoolExecutor
from opentelemetry.trace import Status, StatusCode
# 使用新架构组件,保持对旧FfmpegTask的兼容
from entity.ffmpeg import FfmpegTask from entity.ffmpeg import FfmpegTask
from entity.render_task import RenderTask, TaskType
from services import DefaultRenderService
import logging import logging
from util import ffmpeg, oss from util import ffmpeg, oss
from util.ffmpeg import fade_out_audio
from telemetry import get_tracer
logger = logging.getLogger('biz/ffmpeg') logger = logging.getLogger('biz/ffmpeg')
_render_service = None
def _get_render_service():
"""获取渲染服务实例"""
global _render_service
if _render_service is None:
_render_service = DefaultRenderService()
return _render_service
def parse_ffmpeg_task(task_info, template_info): def parse_ffmpeg_task(task_info, template_info):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("parse_ffmpeg_task") as span:
tasks = [] tasks = []
# 中间片段 # 中间片段
task_params_str = task_info.get("taskParams", "{}") task_params_str = task_info.get("taskParams", "{}")
task_params = json.loads(task_params_str) span.set_attribute("task_params", task_params_str)
task_params: dict = json.loads(task_params_str)
task_params_orig = json.loads(task_params_str)
# 统计only_if占位符的使用次数
only_if_usage_count = {}
with tracer.start_as_current_span("parse_ffmpeg_task.download_all") as sub_span:
with ThreadPoolExecutor(max_workers=8) as executor:
param_list: list[dict]
for param_list in task_params.values():
for param in param_list:
url = param.get("url")
if url.startswith("http"):
_, fn = os.path.split(url)
executor.submit(oss.download_from_oss, url, fn, True)
executor.shutdown(wait=True)
for part in template_info.get("video_parts"): for part in template_info.get("video_parts"):
source = parse_video(part.get('source'), task_params, template_info) source, ext_data = parse_video(part.get('source'), task_params, template_info)
if not source: if not source:
logger.warning("no video found for part: " + str(part)) logger.warning("no video found for part: " + str(part))
continue continue
only_if = part.get('only_if', '')
if only_if:
only_if_usage_count[only_if] = only_if_usage_count.get(only_if, 0) + 1
required_count = only_if_usage_count.get(only_if)
if not check_placeholder_exist_with_count(only_if, task_params_orig, required_count):
logger.info("because only_if exist, placeholder: %s insufficient (need %d), skip part: %s", only_if, required_count, part)
continue
sub_ffmpeg_task = FfmpegTask(source) sub_ffmpeg_task = FfmpegTask(source)
sub_ffmpeg_task.resolution = template_info.get("video_size", "")
sub_ffmpeg_task.annexb = True sub_ffmpeg_task.annexb = True
sub_ffmpeg_task.ext_data = ext_data or {}
sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25) sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
for lut in part.get('filters', []): sub_ffmpeg_task.center_cut = part.get("crop_mode", None)
sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut)) sub_ffmpeg_task.zoom_cut = part.get("zoom_cut", None)
for effect in part.get('effects', []):
sub_ffmpeg_task.add_effect(effect)
for lut in part.get('luts', []):
sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut).replace("\\", "/"))
for audio in part.get('audios', []): for audio in part.get('audios', []):
sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio)) sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio))
for overlay in part.get('overlays', []): for overlay in part.get('overlays', []):
@@ -32,13 +78,19 @@ def parse_ffmpeg_task(task_info, template_info):
tasks.append(sub_ffmpeg_task) tasks.append(sub_ffmpeg_task)
output_file = "out_" + str(time.time()) + ".mp4" output_file = "out_" + str(time.time()) + ".mp4"
task = FfmpegTask(tasks, output_file=output_file) task = FfmpegTask(tasks, output_file=output_file)
task.resolution = template_info.get("video_size", "")
overall = template_info.get("overall_template") overall = template_info.get("overall_template")
task.center_cut = template_info.get("crop_mode", None)
task.zoom_cut = template_info.get("zoom_cut", None)
task.frame_rate = template_info.get("frame_rate", 25) task.frame_rate = template_info.get("frame_rate", 25)
if overall.get('source', ''): # if overall.get('source', ''):
source = parse_video(overall.get('source'), task_params, template_info) # source, ext_data = parse_video(overall.get('source'), task_params, template_info)
task.add_inputs(source) # task.add_inputs(source)
for lut in overall.get('filters', []): # task.ext_data = ext_data or {}
task.add_lut(os.path.join(template_info.get("local_path"), lut)) for effect in overall.get('effects', []):
task.add_effect(effect)
for lut in overall.get('luts', []):
task.add_lut(os.path.join(template_info.get("local_path"), lut).replace("\\", "/"))
for audio in overall.get('audios', []): for audio in overall.get('audios', []):
task.add_audios(os.path.join(template_info.get("local_path"), audio)) task.add_audios(os.path.join(template_info.get("local_path"), audio))
for overlay in overall.get('overlays', []): for overlay in overall.get('overlays', []):
@@ -47,37 +99,74 @@ def parse_ffmpeg_task(task_info, template_info):
def parse_video(source, task_params, template_info): def parse_video(source, task_params, template_info):
print(source)
if source.startswith('PLACEHOLDER_'): if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '') placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, []) new_sources = task_params.get(placeholder_id, [])
_pick_source = {}
if type(new_sources) is list: if type(new_sources) is list:
if len(new_sources) == 0: if len(new_sources) == 0:
logger.debug("no video found for placeholder: " + placeholder_id) logger.debug("no video found for placeholder: " + placeholder_id)
return None return None, _pick_source
else: else:
# TODO: Random Pick / Policy Pick _pick_source = new_sources.pop(0)
new_sources = new_sources[0].get("url") new_sources = _pick_source.get("url")
if new_sources.startswith("http"): if new_sources.startswith("http"):
_, source_name = os.path.split(new_sources) _, source_name = os.path.split(new_sources)
oss.download_from_oss(new_sources, source_name) oss.download_from_oss(new_sources, source_name, True)
return source_name return source_name, _pick_source
return new_sources return new_sources, _pick_source
return os.path.join(template_info.get("local_path"), source) return os.path.join(template_info.get("local_path"), source), None
def check_placeholder_exist(placeholder_id, task_params):
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
return False
else:
return True
return True
return False
def check_placeholder_exist_with_count(placeholder_id, task_params, required_count=1):
"""检查占位符是否存在足够数量的片段"""
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
return len(new_sources) >= required_count
return required_count <= 1
return False
def start_ffmpeg_task(ffmpeg_task): def start_ffmpeg_task(ffmpeg_task):
for task in ffmpeg_task.analyze_input_render_tasks(): """启动FFmpeg任务 - 使用新的渲染服务"""
start_ffmpeg_task(task) tracer = get_tracer(__name__)
ffmpeg_task.correct_task_type() with tracer.start_as_current_span("start_ffmpeg_task") as span:
return ffmpeg.start_render(ffmpeg_task) try:
# 使用新的渲染服务
render_service = _get_render_service()
result = render_service.render(ffmpeg_task)
if result:
span.set_status(Status(StatusCode.OK))
else:
span.set_status(Status(StatusCode.ERROR))
return result
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error(f"FFmpeg task failed: {e}", exc_info=True)
return False
def clear_task_tmp_file(ffmpeg_task): def clear_task_tmp_file(ffmpeg_task):
for task in ffmpeg_task.analyze_input_render_tasks(): for task in ffmpeg_task.analyze_input_render_tasks():
clear_task_tmp_file(task) clear_task_tmp_file(task)
try: try:
if "template" not in ffmpeg_task.get_output_file(): if os.getenv("TEMPLATE_DIR") not in ffmpeg_task.get_output_file():
os.remove(ffmpeg_task.get_output_file()) os.remove(ffmpeg_task.get_output_file())
logger.info("delete tmp file: " + ffmpeg_task.get_output_file()) logger.info("delete tmp file: " + ffmpeg_task.get_output_file())
else: else:
@@ -89,7 +178,8 @@ def clear_task_tmp_file(ffmpeg_task):
def probe_video_info(ffmpeg_task): def probe_video_info(ffmpeg_task):
# 获取视频长度宽度和时长 """获取视频长度宽度和时长 - 使用新的渲染服务"""
return ffmpeg.probe_video_info(ffmpeg_task.get_output_file()) render_service = _get_render_service()
return render_service.get_video_info(ffmpeg_task.get_output_file())

View File

@@ -1,24 +1,55 @@
from template import get_template_def import json
from util import api import logging
from opentelemetry.trace import Status, StatusCode
# 使用新的服务架构
from services import DefaultTaskService, DefaultRenderService, DefaultTemplateService
from telemetry import get_tracer
logger = logging.getLogger(__name__)
# 创建服务实例(单例模式)
_render_service = None
_template_service = None
_task_service = None
def _get_services():
"""获取服务实例(懒加载)"""
global _render_service, _template_service, _task_service
if _render_service is None:
_render_service = DefaultRenderService()
if _template_service is None:
_template_service = DefaultTemplateService()
_template_service.load_local_templates() # 加载本地模板
if _task_service is None:
_task_service = DefaultTaskService(_render_service, _template_service)
return _task_service, _render_service, _template_service
def start_task(task_info): def start_task(task_info):
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info """启动任务处理(保持向后兼容的接口)"""
task_info = api.normalize_task(task_info) tracer = get_tracer(__name__)
template_info = get_template_def(task_info.get("templateId")) with tracer.start_as_current_span("start_task_legacy") as span:
api.report_task_start(task_info) try:
ffmpeg_task = parse_ffmpeg_task(task_info, template_info) task_service, _, _ = _get_services()
result = start_ffmpeg_task(ffmpeg_task)
if not result: # 使用新的任务服务处理
return api.report_task_failed(task_info) result = task_service.process_task(task_info)
oss_result = api.upload_task_file(task_info, ffmpeg_task)
if not oss_result: if result:
return api.report_task_failed(task_info) span.set_status(Status(StatusCode.OK))
# 获取视频长度宽度和时长 logger.info("Task completed successfully: %s", task_info.get("id"))
width, height, duration = probe_video_info(ffmpeg_task) else:
clear_task_tmp_file(ffmpeg_task) span.set_status(Status(StatusCode.ERROR))
api.report_task_success(task_info, videoInfo={ logger.error("Task failed: %s", task_info.get("id"))
"width": width,
"height": height, return None # 保持原有返回值格式
"duration": duration
}) except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error("Task processing failed: %s", e, exc_info=True)
return None

View File

@@ -3,6 +3,9 @@ import logging
from logging.handlers import TimedRotatingFileHandler from logging.handlers import TimedRotatingFileHandler
from dotenv import load_dotenv from dotenv import load_dotenv
# 导入新的配置系统,保持向后兼容
from .settings import get_config, get_ffmpeg_config, get_api_config, get_storage_config, get_server_config
load_dotenv() load_dotenv()
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
root_logger = logging.getLogger() root_logger = logging.getLogger()

139
config/settings.py Normal file
View File

@@ -0,0 +1,139 @@
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import logging
from dotenv import load_dotenv
load_dotenv()
@dataclass
class FFmpegConfig:
"""FFmpeg相关配置"""
encoder_args: List[str]
video_args: List[str]
audio_args: List[str]
default_args: List[str]
old_ffmpeg: bool = False
re_encode_video_args: Optional[List[str]] = None
re_encode_encoder_args: Optional[List[str]] = None
@classmethod
def from_env(cls) -> 'FFmpegConfig':
encoder_args = os.getenv("ENCODER_ARGS", "-c:v h264").split(" ")
video_args = os.getenv("VIDEO_ARGS", "-profile:v high -level:v 4").split(" ")
audio_args = ["-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2"]
default_args = ["-shortest"]
re_encode_video_args = None
if os.getenv("RE_ENCODE_VIDEO_ARGS"):
re_encode_video_args = os.getenv("RE_ENCODE_VIDEO_ARGS").split(" ")
re_encode_encoder_args = None
if os.getenv("RE_ENCODE_ENCODER_ARGS"):
re_encode_encoder_args = os.getenv("RE_ENCODE_ENCODER_ARGS").split(" ")
return cls(
encoder_args=encoder_args,
video_args=video_args,
audio_args=audio_args,
default_args=default_args,
old_ffmpeg=bool(os.getenv("OLD_FFMPEG", False)),
re_encode_video_args=re_encode_video_args,
re_encode_encoder_args=re_encode_encoder_args
)
@dataclass
class APIConfig:
"""API相关配置"""
endpoint: str
access_key: str
timeout: int = 10
redirect_to_url: Optional[str] = None
@classmethod
def from_env(cls) -> 'APIConfig':
endpoint = os.getenv('API_ENDPOINT', '')
if not endpoint:
raise ValueError("API_ENDPOINT environment variable is required")
access_key = os.getenv('ACCESS_KEY', '')
if not access_key:
raise ValueError("ACCESS_KEY environment variable is required")
return cls(
endpoint=endpoint,
access_key=access_key,
timeout=int(os.getenv('API_TIMEOUT', '10')),
redirect_to_url=os.getenv("REDIRECT_TO_URL") or None
)
@dataclass
class StorageConfig:
"""存储相关配置"""
template_dir: str
@classmethod
def from_env(cls) -> 'StorageConfig':
template_dir = os.getenv('TEMPLATE_DIR', './template')
return cls(template_dir=template_dir)
@dataclass
class ServerConfig:
"""服务器相关配置"""
host: str = "0.0.0.0"
port: int = 9998
debug: bool = False
@classmethod
def from_env(cls) -> 'ServerConfig':
return cls(
host=os.getenv('HOST', '0.0.0.0'),
port=int(os.getenv('PORT', '9998')),
debug=bool(os.getenv('DEBUG', False))
)
@dataclass
class AppConfig:
"""应用总配置"""
ffmpeg: FFmpegConfig
api: APIConfig
storage: StorageConfig
server: ServerConfig
@classmethod
def from_env(cls) -> 'AppConfig':
return cls(
ffmpeg=FFmpegConfig.from_env(),
api=APIConfig.from_env(),
storage=StorageConfig.from_env(),
server=ServerConfig.from_env()
)
# 全局配置实例
_config: Optional[AppConfig] = None
def get_config() -> AppConfig:
"""获取全局配置实例"""
global _config
if _config is None:
_config = AppConfig.from_env()
return _config
def reload_config() -> AppConfig:
"""重新加载配置"""
global _config
_config = AppConfig.from_env()
return _config
# 向后兼容的配置获取函数
def get_ffmpeg_config() -> FFmpegConfig:
return get_config().ffmpeg
def get_api_config() -> APIConfig:
return get_config().api
def get_storage_config() -> StorageConfig:
return get_config().storage
def get_server_config() -> ServerConfig:
return get_config().server

View File

@@ -1,6 +1,9 @@
SUPPORT_FEATURE = ( SUPPORT_FEATURE = (
'simple_render_algo', 'simple_render_algo',
'gpu_accelerate', 'gpu_accelerate',
'intel_gpu_accelerate', 'hevc_encode',
'rapid_download',
'rclone_upload',
'custom_re_encode',
) )
SOFTWARE_VERSION = '0.0.1' SOFTWARE_VERSION = '0.0.5'

View File

@@ -0,0 +1,25 @@
from .base import EffectProcessor, EffectRegistry
from .camera_shot import CameraShotEffect
from .speed import SpeedEffect
from .zoom import ZoomEffect
from .skip import SkipEffect
from .tail import TailEffect
# 注册所有效果处理器
registry = EffectRegistry()
registry.register('cameraShot', CameraShotEffect)
registry.register('ospeed', SpeedEffect)
registry.register('zoom', ZoomEffect)
registry.register('skip', SkipEffect)
registry.register('tail', TailEffect)
__all__ = [
'EffectProcessor',
'EffectRegistry',
'registry',
'CameraShotEffect',
'SpeedEffect',
'ZoomEffect',
'SkipEffect',
'TailEffect'
]

94
entity/effects/base.py Normal file
View File

@@ -0,0 +1,94 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Type, Any, Optional
import json
import logging
logger = logging.getLogger(__name__)
class EffectProcessor(ABC):
"""效果处理器抽象基类"""
def __init__(self, params: str = "", ext_data: Optional[Dict[str, Any]] = None):
self.params = params
self.ext_data = ext_data or {}
self.frame_rate = 25 # 默认帧率
@abstractmethod
def validate_params(self) -> bool:
"""验证参数是否有效"""
pass
@abstractmethod
def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]:
"""
生成FFmpeg滤镜参数
Args:
video_input: 输入视频流标识符 (例如: "[0:v]", "[v_eff1]")
effect_index: 效果索引,用于生成唯一的输出标识符
Returns:
tuple: (filter_args_list, output_stream_identifier)
"""
pass
@abstractmethod
def get_effect_name(self) -> str:
"""获取效果名称"""
pass
def parse_params(self) -> List[str]:
"""解析参数字符串为列表"""
if not self.params:
return []
return self.params.split(',')
def get_pos_json(self) -> Dict[str, Any]:
"""获取位置JSON数据"""
pos_json_str = self.ext_data.get('posJson', '{}')
try:
return json.loads(pos_json_str) if pos_json_str != '{}' else {}
except Exception as e:
logger.warning(f"Failed to parse posJson: {e}")
return {}
class EffectRegistry:
"""效果处理器注册表"""
def __init__(self):
self._processors: Dict[str, Type[EffectProcessor]] = {}
def register(self, name: str, processor_class: Type[EffectProcessor]):
"""注册效果处理器"""
if not issubclass(processor_class, EffectProcessor):
raise ValueError(f"{processor_class} must be a subclass of EffectProcessor")
self._processors[name] = processor_class
logger.debug(f"Registered effect processor: {name}")
def get_processor(self, effect_name: str, params: str = "", ext_data: Optional[Dict[str, Any]] = None) -> Optional[EffectProcessor]:
"""获取效果处理器实例"""
if effect_name not in self._processors:
logger.warning(f"Unknown effect: {effect_name}")
return None
processor_class = self._processors[effect_name]
return processor_class(params, ext_data)
def list_effects(self) -> List[str]:
"""列出所有注册的效果"""
return list(self._processors.keys())
def parse_effect_string(self, effect_string: str) -> tuple[str, str]:
"""
解析效果字符串
Args:
effect_string: 效果字符串,格式为 "effect_name:params"
Returns:
tuple: (effect_name, params)
"""
if ':' in effect_string:
parts = effect_string.split(':', 2)
return parts[0], parts[1] if len(parts) > 1 else ""
return effect_string, ""

View File

@@ -0,0 +1,79 @@
from typing import List, Dict, Any
from .base import EffectProcessor
class CameraShotEffect(EffectProcessor):
"""相机镜头效果处理器"""
def validate_params(self) -> bool:
"""验证参数:start_time,duration,rotate_deg"""
params = self.parse_params()
if not params:
return True # 使用默认参数
# 参数格式: "start_time,duration,rotate_deg"
if len(params) > 3:
return False
try:
for i, param in enumerate(params):
if param == '':
continue
if i == 2: # rotate_deg
int(param)
else: # start_time, duration
float(param)
return True
except ValueError:
return False
def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]:
"""生成相机镜头效果的滤镜参数"""
if not self.validate_params():
return [], video_input
params = self.parse_params()
# 设置默认值
start = 3.0
duration = 1.0
rotate_deg = 0
if len(params) >= 1 and params[0] != '':
start = float(params[0])
if len(params) >= 2 and params[1] != '':
duration = float(params[1])
if len(params) >= 3 and params[2] != '':
rotate_deg = int(params[2])
filter_args = []
# 生成输出流标识符
start_out_str = "[eff_s]"
mid_out_str = "[eff_m]"
end_out_str = "[eff_e]"
final_output = f"[v_eff{effect_index}]"
# 分割视频流为三部分
filter_args.append(f"{video_input}split=3{start_out_str}{mid_out_str}{end_out_str}")
# 选择开始部分帧
filter_args.append(f"{start_out_str}select=lt(n\\,{int(start * self.frame_rate)}){start_out_str}")
# 选择结束部分帧
filter_args.append(f"{end_out_str}select=gt(n\\,{int(start * self.frame_rate)}){end_out_str}")
# 选择中间特定帧并扩展
filter_args.append(f"{mid_out_str}select=eq(n\\,{int(start * self.frame_rate)}){mid_out_str}")
filter_args.append(f"{mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{mid_out_str}")
# 如果需要旋转
if rotate_deg != 0:
filter_args.append(f"{mid_out_str}rotate=PI*{rotate_deg}/180{mid_out_str}")
# 连接三部分
filter_args.append(f"{start_out_str}{mid_out_str}{end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{final_output}")
return filter_args, final_output
def get_effect_name(self) -> str:
return "cameraShot"

38
entity/effects/skip.py Normal file
View File

@@ -0,0 +1,38 @@
from typing import List
from .base import EffectProcessor
class SkipEffect(EffectProcessor):
"""跳过开头效果处理器"""
def validate_params(self) -> bool:
"""验证参数:跳过的秒数"""
if not self.params:
return True # 默认不跳过
try:
skip_seconds = float(self.params)
return skip_seconds >= 0
except ValueError:
return False
def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]:
"""生成跳过开头效果的滤镜参数"""
if not self.validate_params():
return [], video_input
if not self.params:
return [], video_input
skip_seconds = float(self.params)
if skip_seconds <= 0:
return [], video_input
output_stream = f"[v_eff{effect_index}]"
# 使用trim滤镜跳过开头
filter_args = [f"{video_input}trim=start={skip_seconds}{output_stream}"]
return filter_args, output_stream
def get_effect_name(self) -> str:
return "skip"

35
entity/effects/speed.py Normal file
View File

@@ -0,0 +1,35 @@
from typing import List
from .base import EffectProcessor
class SpeedEffect(EffectProcessor):
"""视频变速效果处理器"""
def validate_params(self) -> bool:
"""验证参数:速度倍数"""
if not self.params:
return True # 默认不变速
try:
speed = float(self.params)
return speed > 0
except ValueError:
return False
def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]:
"""生成变速效果的滤镜参数"""
if not self.validate_params():
return [], video_input
if not self.params or self.params == "1":
return [], video_input # 不需要变速
speed = float(self.params)
output_stream = f"[v_eff{effect_index}]"
# 使用setpts进行变速
filter_args = [f"{video_input}setpts={speed}*PTS{output_stream}"]
return filter_args, output_stream
def get_effect_name(self) -> str:
return "ospeed"

42
entity/effects/tail.py Normal file
View File

@@ -0,0 +1,42 @@
from typing import List
from .base import EffectProcessor
class TailEffect(EffectProcessor):
"""保留末尾效果处理器"""
def validate_params(self) -> bool:
"""验证参数:保留的秒数"""
if not self.params:
return True # 默认不截取
try:
tail_seconds = float(self.params)
return tail_seconds >= 0
except ValueError:
return False
def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]:
"""生成保留末尾效果的滤镜参数"""
if not self.validate_params():
return [], video_input
if not self.params:
return [], video_input
tail_seconds = float(self.params)
if tail_seconds <= 0:
return [], video_input
output_stream = f"[v_eff{effect_index}]"
# 使用reverse+trim+reverse的方法来精确获取最后N秒
filter_args = [
f"{video_input}reverse[v_rev{effect_index}]",
f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]",
f"[v_trim{effect_index}]reverse{output_stream}"
]
return filter_args, output_stream
def get_effect_name(self) -> str:
return "tail"

89
entity/effects/zoom.py Normal file
View File

@@ -0,0 +1,89 @@
from typing import List
import json
from .base import EffectProcessor
class ZoomEffect(EffectProcessor):
"""缩放效果处理器"""
def validate_params(self) -> bool:
"""验证参数:start_time,zoom_factor,duration"""
params = self.parse_params()
if len(params) < 3:
return False
try:
start_time = float(params[0])
zoom_factor = float(params[1])
duration = float(params[2])
return (start_time >= 0 and
zoom_factor > 0 and
duration >= 0)
except (ValueError, IndexError):
return False
def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]:
"""生成缩放效果的滤镜参数"""
if not self.validate_params():
return [], video_input
params = self.parse_params()
start_time = float(params[0])
zoom_factor = float(params[1])
duration = float(params[2])
if zoom_factor == 1:
return [], video_input # 不需要缩放
output_stream = f"[v_eff{effect_index}]"
# 获取缩放中心点
center_x, center_y = self._get_zoom_center()
filter_args = []
if duration == 0:
# 静态缩放(整个视频时长)
x_expr = f"({center_x})-(ow*zoom)/2"
y_expr = f"({center_y})-(oh*zoom)/2"
filter_args.append(
f"{video_input}trim=start={start_time},zoompan=z={zoom_factor}:x={x_expr}:y={y_expr}:d=1{output_stream}"
)
else:
# 动态缩放(指定时间段内)
zoom_expr = f"if(between(t\\,{start_time}\\,{start_time + duration})\\,{zoom_factor}\\,1)"
x_expr = f"({center_x})-(ow*zoom)/2"
y_expr = f"({center_y})-(oh*zoom)/2"
filter_args.append(
f"{video_input}zoompan=z={zoom_expr}:x={x_expr}:y={y_expr}:d=1{output_stream}"
)
return filter_args, output_stream
def _get_zoom_center(self) -> tuple[str, str]:
"""获取缩放中心点坐标表达式"""
# 默认中心点
center_x = "iw/2"
center_y = "ih/2"
pos_json = self.get_pos_json()
if pos_json:
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_f_y = pos_json.get('ltY', 0)
_f_y2 = pos_json.get('rbY', 0)
_v_w = pos_json.get('imgWidth', 1)
_v_h = pos_json.get('imgHeight', 1)
if _v_w > 0 and _v_h > 0:
# 计算坐标系统中的中心点
center_x_ratio = (_f_x + _f_x2) / (2 * _v_w)
center_y_ratio = (_f_y + _f_y2) / (2 * _v_h)
# 转换为视频坐标系统
center_x = f"iw*{center_x_ratio:.6f}"
center_y = f"ih*{center_y_ratio:.6f}"
return center_x, center_y
def get_effect_name(self) -> str:
return "zoom"

View File

@@ -1,9 +1,31 @@
import json
import os
import time import time
import uuid import uuid
from typing import Any
DEFAULT_ARGS = ("-shortest",)
ENCODER_ARGS = ("-c:v", "h264", ) if not os.getenv("ENCODER_ARGS", False) else os.getenv("ENCODER_ARGS", "").split(" ")
VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", ) if not os.getenv("VIDEO_ARGS", False) else os.getenv("VIDEO_ARGS", "").split(" ")
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", )
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", )
def get_mp4toannexb_filter():
"""
Determine which mp4toannexb filter to use based on ENCODER_ARGS.
Returns 'hevc_mp4toannexb' if ENCODER_ARGS contains 'hevc', otherwise 'h264_mp4toannexb'.
"""
encoder_args_str = os.getenv("ENCODER_ARGS", "").lower()
if "hevc" in encoder_args_str:
return "hevc_mp4toannexb"
return "h264_mp4toannexb"
class FfmpegTask(object): class FfmpegTask(object):
effects: list[str]
def __init__(self, input_file, task_type='copy', output_file=''): def __init__(self, input_file, task_type='copy', output_file=''):
self.annexb = False self.annexb = False
if type(input_file) is str: if type(input_file) is str:
@@ -14,15 +36,20 @@ class FfmpegTask(object):
self.input_file = input_file self.input_file = input_file
else: else:
self.input_file = [] self.input_file = []
self.zoom_cut = None
self.center_cut = None
self.ext_data = {}
self.task_type = task_type self.task_type = task_type
self.output_file = output_file self.output_file = output_file
self.mute = True self.mute = True
self.speed = 1 self.speed = 1
self.frame_rate = 25 self.frame_rate = 25
self.resolution = None
self.subtitles = [] self.subtitles = []
self.luts = [] self.luts = []
self.audios = [] self.audios = []
self.overlays = [] self.overlays = []
self.effects = []
def __repr__(self): def __repr__(self):
_str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}' _str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}'
@@ -34,8 +61,11 @@ class FfmpegTask(object):
_str += f', overlays={self.overlays}' _str += f', overlays={self.overlays}'
if self.annexb: if self.annexb:
_str += f', annexb={self.annexb}' _str += f', annexb={self.annexb}'
if self.effects:
_str += f', effects={self.effects}'
if self.mute: if self.mute:
_str += f', mute={self.mute}' _str += f', mute={self.mute}'
_str += f', center_cut={self.center_cut}'
return _str + ')' return _str + ')'
def analyze_input_render_tasks(self): def analyze_input_render_tasks(self):
@@ -77,6 +107,10 @@ class FfmpegTask(object):
self.luts.extend(luts) self.luts.extend(luts)
self.correct_task_type() self.correct_task_type()
def add_effect(self, *effects):
self.effects.extend(effects)
self.correct_task_type()
def get_output_file(self): def get_output_file(self):
if self.task_type == 'copy': if self.task_type == 'copy':
return self.input_file[0] return self.input_file[0]
@@ -99,8 +133,14 @@ class FfmpegTask(object):
return False return False
if len(self.subtitles) > 0: if len(self.subtitles) > 0:
return False return False
if len(self.effects) > 0:
return False
if self.speed != 1: if self.speed != 1:
return False return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True return True
def check_can_copy(self): def check_can_copy(self):
@@ -110,44 +150,227 @@ class FfmpegTask(object):
return False return False
if len(self.subtitles) > 0: if len(self.subtitles) > 0:
return False return False
if len(self.effects) > 0:
return False
if self.speed != 1: if self.speed != 1:
return False return False
if len(self.audios) > 1: if len(self.audios) >= 1:
return False return False
if len(self.input_file) > 1: if len(self.input_file) > 1:
return False return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True return True
def check_audio_track(self): def check_audio_track(self):
if len(self.audios) > 0: ...
self.mute = False
def get_ffmpeg_args(self): def get_ffmpeg_args(self):
args = ['-y', '-hide_banner'] args = ['-y', '-hide_banner']
if self.task_type == 'encode': if self.task_type == 'encode':
# args += ('-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv')
input_args = [] input_args = []
filter_args = [] filter_args = []
output_args = ["-shortest", "-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1"] output_args = [*VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, *DEFAULT_ARGS]
if self.annexb: if self.annexb:
output_args.append("-bsf:v") output_args.append("-bsf:v")
output_args.append("h264_mp4toannexb") output_args.append(get_mp4toannexb_filter())
output_args.append("-reset_timestamps")
output_args.append("1")
video_output_str = "[0:v]" video_output_str = "[0:v]"
audio_output_str = "[0:v]" audio_output_str = ""
video_input_count = 0 audio_track_index = 0
audio_input_count = 0 effect_index = 0
for input_file in self.input_file: for input_file in self.input_file:
input_args.append("-i") input_args.append("-i")
if type(input_file) is str: if type(input_file) is str:
input_args.append(input_file) input_args.append(input_file)
elif isinstance(input_file, FfmpegTask): elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file()) input_args.append(input_file.get_output_file())
if self.center_cut == 1:
pos_json_str = self.ext_data.get('posJson', '{}')
try:
pos_json = json.loads(pos_json_str)
except Exception as e:
pos_json = {}
_v_w = pos_json.get('imgWidth', 1)
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_x = f'{float((_f_x2 + _f_x)/(2 * _v_w)) :.4f}*iw-ih*ih/(2*iw)'
filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
video_output_str = f"[v_cut{effect_index}]"
effect_index += 1
if self.zoom_cut == 1 and self.resolution:
_input = None
for input_file in self.input_file:
if type(input_file) is str:
_input = input_file
break
elif isinstance(input_file, FfmpegTask):
_input = input_file.get_output_file()
break
if _input:
from util.ffmpeg import probe_video_info
_iw, _ih, _ = probe_video_info(_input)
_w, _h = self.resolution.split('x', 1)
pos_json_str = self.ext_data.get('posJson', '{}')
try:
pos_json = json.loads(pos_json_str)
except Exception as e:
pos_json = {}
_v_w = pos_json.get('imgWidth', 1)
_v_h = pos_json.get('imgHeight', 1)
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_f_y = pos_json.get('ltY', 0)
_f_y2 = pos_json.get('rbY', 0)
_x = min(max(0, int((_f_x + _f_x2) / 2 - int(_w) / 2)), _iw - int(_w))
_y = min(max(0, int((_f_y + _f_y2) / 2 - int(_h) / 2)), _ih - int(_h))
filter_args.append(f"{video_output_str}crop=x={_x}:y={_y}:w={_w}:h={_h}[vz_cut{effect_index}]")
video_output_str = f"[vz_cut{effect_index}]"
effect_index += 1
for effect in self.effects:
if effect.startswith("cameraShot:"):
param = effect.split(":", 2)[1]
if param == '':
param = "3,1,0"
_split = param.split(",")
start = 3
duration = 1
rotate_deg = 0
if len(_split) >= 3:
if _split[2] == '':
rotate_deg = 0
else:
rotate_deg = int(_split[2])
if len(_split) >= 2:
duration = float(_split[1])
if len(_split) >= 1:
start = float(_split[0])
_start_out_str = "[eff_s]"
_mid_out_str = "[eff_m]"
_end_out_str = "[eff_e]"
filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}")
filter_args.append(f"{_start_out_str}select=lt(n\\,{int(start * self.frame_rate)}){_start_out_str}")
filter_args.append(f"{_end_out_str}select=gt(n\\,{int(start * self.frame_rate)}){_end_out_str}")
filter_args.append(f"{_mid_out_str}select=eq(n\\,{int(start * self.frame_rate)}){_mid_out_str}")
filter_args.append(
f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}")
if rotate_deg != 0:
filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/180{_mid_out_str}")
# filter_args.append(f"{video_output_str}trim=start=0:end={start+duration},tpad=stop_mode=clone:stop_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"tpad=start_mode=clone:start_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}")
video_output_str = f"[v_eff{effect_index}]"
# filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
effect_index += 1
elif effect.startswith("ospeed:"):
param = effect.split(":", 2)[1]
if param == '':
param = "1"
if param != "1":
# 视频变速
effect_index += 1
filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("zoom:"):
param = effect.split(":", 2)[1]
if param == '':
continue
_split = param.split(",")
if len(_split) < 3:
continue
try:
start_time = float(_split[0])
zoom_factor = float(_split[1])
duration = float(_split[2])
if start_time < 0:
start_time = 0
if duration < 0:
duration = 0
if zoom_factor <= 0:
zoom_factor = 1
except (ValueError, IndexError):
start_time = 0
duration = 0
zoom_factor = 1
if zoom_factor == 1:
continue
effect_index += 1
# 获取缩放中心点(从pos_json或使用默认中心)
center_x = "iw/2"
center_y = "ih/2"
pos_json_str = self.ext_data.get('posJson', '{}')
try:
pos_json = json.loads(pos_json_str) if pos_json_str != '{}' else {}
if pos_json:
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_f_y = pos_json.get('ltY', 0)
_f_y2 = pos_json.get('rbY', 0)
_v_w = pos_json.get('imgWidth', 1)
_v_h = pos_json.get('imgHeight', 1)
if _v_w > 0 and _v_h > 0:
# 计算坐标系统中的中心点
center_x_ratio = (_f_x + _f_x2) / (2 * _v_w)
center_y_ratio = (_f_y + _f_y2) / (2 * _v_h)
# 转换为视频坐标系统
center_x = f"iw*{center_x_ratio:.6f}"
center_y = f"ih*{center_y_ratio:.6f}"
except Exception as e:
# 解析失败使用默认中心
pass
if duration == 0:
# 静态缩放(整个视频时长)
x_expr = f"({center_x})-(ow*zoom)/2"
y_expr = f"({center_y})-(oh*zoom)/2"
filter_args.append(f"{video_output_str}trim=start={start_time},zoompan=z={zoom_factor}:x={x_expr}:y={y_expr}:d=1[v_eff{effect_index}]")
else:
# 动态缩放(指定时间段内)
zoom_expr = f"if(between(t\\,{start_time}\\,{start_time + duration})\\,{zoom_factor}\\,1)"
x_expr = f"({center_x})-(ow*zoom)/2"
y_expr = f"({center_y})-(oh*zoom)/2"
filter_args.append(f"{video_output_str}zoompan=z={zoom_expr}:x={x_expr}:y={y_expr}:d=1[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("skip:"):
param = effect.split(":", 2)[1]
if param == '':
param = "0"
skip_seconds = float(param)
if skip_seconds > 0:
effect_index += 1
filter_args.append(f"{video_output_str}trim=start={skip_seconds}[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("tail:"):
param = effect.split(":", 2)[1]
if param == '':
param = "0"
tail_seconds = float(param)
if tail_seconds > 0:
effect_index += 1
# 首先获取视频总时长,然后计算开始时间
# 使用reverse+trim+reverse的方法来精确获取最后N秒
filter_args.append(f"{video_output_str}reverse[v_rev{effect_index}]")
filter_args.append(f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]")
filter_args.append(f"[v_trim{effect_index}]reverse[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
...
if self.resolution:
filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]")
video_output_str = "[v]"
for lut in self.luts: for lut in self.luts:
filter_args.append("[0:v]lut3d=file=" + lut + "[0:v]") filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}")
for overlay in self.overlays: for overlay in self.overlays:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
input_args.append(overlay) input_args.append(overlay)
if os.getenv("OLD_FFMPEG"):
filter_args.append(f"{video_output_str}[{input_index}:v]scale2ref=iw:ih[v]")
else:
filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]") filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]")
filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]") filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
video_output_str = "[v]" video_output_str = "[v]"
@@ -158,98 +381,99 @@ class FfmpegTask(object):
output_args.append(video_output_str) output_args.append(video_output_str)
output_args.append("-r") output_args.append("-r")
output_args.append(f"{self.frame_rate}") output_args.append(f"{self.frame_rate}")
output_args.append("-fps_mode")
output_args.append("cfr")
if self.mute: if self.mute:
output_args.append("-an") input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
filter_args.append(f"[{input_index}:a]acopy[a]")
audio_track_index += 1
audio_output_str = "[a]"
else: else:
input_index = 0 audio_output_str = "[0:a]"
audio_track_index += 1
for audio in self.audios: for audio in self.audios:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
input_args.append(audio.replace("\\", "/")) input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0: audio_track_index += 1
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]") filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
audio_output_str = "[a]" audio_output_str = "[a]"
else: if audio_output_str:
audio_output_str = f"[{input_index}:a]" output_args.append("-map")
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str) output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()] _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'concat': elif self.task_type == 'concat':
# 无法通过 annexb 合并的 # 无法通过 annexb 合并的
input_args = [] input_args = []
output_args = ["-shortest"] output_args = [*DEFAULT_ARGS]
if self.check_annexb() and len(self.audios) <= 1: filter_args = []
audio_output_str = ""
audio_track_index = 0
# output_args # output_args
if len(self.audios) > 0: if len(self.input_file) == 1:
input_args.append("-an") _file = self.input_file[0]
from util.ffmpeg import probe_video_audio
if type(_file) is str:
input_args += ["-i", _file]
self.mute = not probe_video_audio(_file)
elif isinstance(_file, FfmpegTask):
input_args += ["-i", _file.get_output_file()]
self.mute = not probe_video_audio(_file.get_output_file())
else:
_tmp_file = "tmp_concat_" + str(time.time()) + ".txt" _tmp_file = "tmp_concat_" + str(time.time()) + ".txt"
from util.ffmpeg import probe_video_audio
with open(_tmp_file, "w", encoding="utf-8") as f: with open(_tmp_file, "w", encoding="utf-8") as f:
for input_file in self.input_file: for input_file in self.input_file:
if type(input_file) is str: if type(input_file) is str:
f.write("file '" + input_file + "'\n") f.write("file '" + input_file + "'\n")
elif isinstance(input_file, FfmpegTask): elif isinstance(input_file, FfmpegTask):
f.write("file '" + input_file.get_output_file() + "'\n") f.write("file '" + input_file.get_output_file() + "'\n")
input_args += ("-f", "concat", "-safe", "0", "-i", _tmp_file) input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file]
self.mute = not probe_video_audio(_tmp_file, "concat")
output_args.append("-map")
output_args.append("0:v")
output_args.append("-c:v") output_args.append("-c:v")
output_args.append("copy") output_args.append("copy")
if len(self.audios) > 0:
input_args.append("-i")
input_args.append(self.audios[0])
output_args.append("-c:a")
output_args.append("copy")
output_args.append("-f")
output_args.append("mp4")
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
return args + input_args + output_args + [self.get_output_file()]
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
filter_args = []
video_output_str = "[0:v]"
audio_output_str = "[0:a]"
video_input_count = 0
audio_input_count = 0
for input_file in self.input_file:
input_index = input_args.count("-i")
input_args.append("-i")
if type(input_file) is str:
input_args.append(input_file.replace("\\", "/"))
elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file().replace("\\", "/"))
if video_input_count > 0:
filter_args.append(f"{video_output_str}[{input_index}:v]concat=n=2:v=1:a=0[v]")
video_output_str = "[v]"
else:
video_output_str = f"[{input_index}:v]"
video_input_count += 1
output_args.append("-map")
output_args.append(video_output_str)
if self.mute: if self.mute:
output_args.append("-an") input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
audio_output_str = f"[{input_index}:a]"
audio_track_index += 1
else: else:
input_index = 0 audio_output_str = "[0:a]"
audio_track_index += 1
for audio in self.audios: for audio in self.audios:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
input_args.append(audio.replace("\\", "/")) input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0: audio_track_index += 1
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]") filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
audio_output_str = "[a]" audio_output_str = "[a]"
if audio_output_str:
output_args.append("-map")
if audio_track_index <= 1:
output_args.append(audio_output_str[1:-1])
else: else:
audio_output_str = f"[{input_index}:a]"
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str) output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()] output_args += AUDIO_ARGS
if self.annexb:
output_args.append("-bsf:v")
output_args.append(get_mp4toannexb_filter())
output_args.append("-bsf:a")
output_args.append("setts=pts=DTS")
output_args.append("-f")
output_args.append("mpegts" if self.annexb else "mp4")
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'copy': elif self.task_type == 'copy':
if len(self.input_file) == 1: if len(self.input_file) == 1:
if type(self.input_file[0]) is str: if type(self.input_file[0]) is str:
if self.input_file[0] == self.get_output_file(): if self.input_file[0] == self.get_output_file():
return [] return []
return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()] return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()]
return []
def set_output_file(self, file=None): def set_output_file(self, file=None):
if file is None: if file is None:

View File

@@ -0,0 +1,281 @@
import json
import os
import time
from typing import List, Optional
from config.settings import get_ffmpeg_config
from entity.render_task import RenderTask, TaskType
from entity.effects import registry as effect_registry
from util.exceptions import FFmpegError
from util.ffmpeg import probe_video_info, probe_video_audio
import logging
logger = logging.getLogger(__name__)
class FFmpegCommandBuilder:
"""FFmpeg命令构建器"""
def __init__(self, task: RenderTask):
self.task = task
self.config = get_ffmpeg_config()
def build_command(self) -> List[str]:
"""构建FFmpeg命令"""
self.task.update_task_type()
if self.task.task_type == TaskType.COPY:
return self._build_copy_command()
elif self.task.task_type == TaskType.CONCAT:
return self._build_concat_command()
elif self.task.task_type == TaskType.ENCODE:
return self._build_encode_command()
else:
raise FFmpegError(f"Unsupported task type: {self.task.task_type}")
def _build_copy_command(self) -> List[str]:
"""构建复制命令"""
if len(self.task.input_files) == 1:
input_file = self.task.input_files[0]
if input_file == self.task.output_file:
return [] # 不需要处理
return [
"ffmpeg", "-y", "-hide_banner",
"-i", self.task.input_files[0],
"-c", "copy",
self.task.output_file
]
def _build_concat_command(self) -> List[str]:
"""构建拼接命令"""
args = ["ffmpeg", "-y", "-hide_banner"]
input_args = []
output_args = [*self.config.default_args]
filter_args = []
if len(self.task.input_files) == 1:
# 单个文件
file = self.task.input_files[0]
input_args.extend(["-i", file])
self.task.mute = not probe_video_audio(file)
else:
# 多个文件使用concat协议
tmp_file = f"tmp_concat_{time.time()}.txt"
with open(tmp_file, "w", encoding="utf-8") as f:
for input_file in self.task.input_files:
f.write(f"file '{input_file}'\n")
input_args.extend(["-f", "concat", "-safe", "0", "-i", tmp_file])
self.task.mute = not probe_video_audio(tmp_file, "concat")
# 视频流映射
output_args.extend(["-map", "0:v", "-c:v", "copy"])
# 音频处理
audio_output_str = self._handle_audio_concat(input_args, filter_args)
if audio_output_str:
output_args.extend(["-map", audio_output_str])
output_args.extend(self.config.audio_args)
# annexb处理
if self.task.annexb:
output_args.extend(["-bsf:v", self._get_mp4toannexb_filter()])
output_args.extend(["-bsf:a", "setts=pts=DTS"])
output_args.extend(["-f", "mpegts"])
else:
output_args.extend(["-f", "mp4"])
filter_complex = ["-filter_complex", ";".join(filter_args)] if filter_args else []
return args + input_args + filter_complex + output_args + [self.task.output_file]
def _build_encode_command(self) -> List[str]:
"""构建编码命令"""
args = ["ffmpeg", "-y", "-hide_banner"]
input_args = []
filter_args = []
output_args = [
*self.config.video_args,
*self.config.audio_args,
*self.config.encoder_args,
*self.config.default_args
]
# annexb处理
if self.task.annexb:
output_args.extend(["-bsf:v", self._get_mp4toannexb_filter()])
output_args.extend(["-reset_timestamps", "1"])
# 处理输入文件
for input_file in self.task.input_files:
input_args.extend(["-i", input_file])
# 处理视频流
video_output_str = "[0:v]"
effect_index = 0
# 处理中心裁剪
if self.task.center_cut == 1:
video_output_str, effect_index = self._add_center_cut(filter_args, video_output_str, effect_index)
# 处理缩放裁剪
if self.task.zoom_cut == 1 and self.task.resolution:
video_output_str, effect_index = self._add_zoom_cut(filter_args, video_output_str, effect_index)
# 处理效果
video_output_str, effect_index = self._add_effects(filter_args, video_output_str, effect_index)
# 处理分辨率
if self.task.resolution:
filter_args.append(f"{video_output_str}scale={self.task.resolution.replace('x', ':')}[v]")
video_output_str = "[v]"
# 处理LUT
for lut in self.task.luts:
filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}")
# 处理覆盖层
video_output_str = self._add_overlays(input_args, filter_args, video_output_str)
# 处理字幕
for subtitle in self.task.subtitles:
filter_args.append(f"{video_output_str}ass={subtitle}[v]")
video_output_str = "[v]"
# 映射视频流
output_args.extend(["-map", video_output_str])
output_args.extend(["-r", str(self.task.frame_rate)])
output_args.extend(["-fps_mode", "cfr"])
# 处理音频
audio_output_str = self._handle_audio_encode(input_args, filter_args)
if audio_output_str:
output_args.extend(["-map", audio_output_str])
filter_complex = ["-filter_complex", ";".join(filter_args)] if filter_args else []
return args + input_args + filter_complex + output_args + [self.task.output_file]
def _add_center_cut(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]:
"""添加中心裁剪"""
pos_json = self.task.ext_data.get('posJson', '{}')
try:
pos_data = json.loads(pos_json) if pos_json != '{}' else {}
except:
pos_data = {}
_v_w = pos_data.get('imgWidth', 1)
_f_x = pos_data.get('ltX', 0)
_f_x2 = pos_data.get('rbX', 0)
_x = f'{float((_f_x2 + _f_x)/(2 * _v_w)):.4f}*iw-ih*ih/(2*iw)'
filter_args.append(f"{video_input}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
return f"[v_cut{effect_index}]", effect_index + 1
def _add_zoom_cut(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]:
"""添加缩放裁剪"""
# 获取输入视频尺寸
input_file = self.task.input_files[0]
_iw, _ih, _ = probe_video_info(input_file)
_w, _h = self.task.resolution.split('x', 1)
pos_json = self.task.ext_data.get('posJson', '{}')
try:
pos_data = json.loads(pos_json) if pos_json != '{}' else {}
except:
pos_data = {}
_v_w = pos_data.get('imgWidth', 1)
_v_h = pos_data.get('imgHeight', 1)
_f_x = pos_data.get('ltX', 0)
_f_x2 = pos_data.get('rbX', 0)
_f_y = pos_data.get('ltY', 0)
_f_y2 = pos_data.get('rbY', 0)
_x = min(max(0, int((_f_x + _f_x2) / 2 - int(_w) / 2)), _iw - int(_w))
_y = min(max(0, int((_f_y + _f_y2) / 2 - int(_h) / 2)), _ih - int(_h))
filter_args.append(f"{video_input}crop=x={_x}:y={_y}:w={_w}:h={_h}[vz_cut{effect_index}]")
return f"[vz_cut{effect_index}]", effect_index + 1
def _add_effects(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]:
"""添加效果处理"""
current_input = video_input
for effect_str in self.task.effects:
effect_name, params = effect_registry.parse_effect_string(effect_str)
processor = effect_registry.get_processor(effect_name, params, self.task.ext_data)
if processor:
processor.frame_rate = self.task.frame_rate
effect_filters, output_stream = processor.generate_filter_args(current_input, effect_index)
if effect_filters:
filter_args.extend(effect_filters)
current_input = output_stream
effect_index += 1
return current_input, effect_index
def _add_overlays(self, input_args: List[str], filter_args: List[str], video_input: str) -> str:
"""添加覆盖层"""
current_input = video_input
for overlay in self.task.overlays:
input_index = input_args.count("-i") // 2 # 每个输入占两个参数 -i filename
input_args.extend(["-i", overlay])
if self.config.old_ffmpeg:
filter_args.append(f"{current_input}[{input_index}:v]scale2ref=iw:ih[v]")
else:
filter_args.append(f"{current_input}[{input_index}:v]scale=rw:rh[v]")
filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
current_input = "[v]"
return current_input
def _handle_audio_concat(self, input_args: List[str], filter_args: List[str]) -> Optional[str]:
"""处理concat模式的音频"""
audio_output_str = ""
if self.task.mute:
input_index = input_args.count("-i") // 2
input_args.extend(["-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000"])
audio_output_str = f"[{input_index}:a]"
else:
audio_output_str = "[0:a]"
for audio in self.task.audios:
input_index = input_args.count("-i") // 2
input_args.extend(["-i", audio.replace("\\", "/")])
filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
audio_output_str = "[a]"
return audio_output_str.strip("[]") if audio_output_str else None
def _handle_audio_encode(self, input_args: List[str], filter_args: List[str]) -> Optional[str]:
"""处理encode模式的音频"""
audio_output_str = ""
if self.task.mute:
input_index = input_args.count("-i") // 2
input_args.extend(["-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000"])
filter_args.append(f"[{input_index}:a]acopy[a]")
audio_output_str = "[a]"
else:
audio_output_str = "[0:a]"
for audio in self.task.audios:
input_index = input_args.count("-i") // 2
input_args.extend(["-i", audio.replace("\\", "/")])
filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
audio_output_str = "[a]"
return audio_output_str if audio_output_str else None
def _get_mp4toannexb_filter(self) -> str:
"""获取mp4toannexb滤镜"""
encoder_args_str = " ".join(self.config.encoder_args).lower()
if "hevc" in encoder_args_str:
return "hevc_mp4toannexb"
return "h264_mp4toannexb"

146
entity/render_task.py Normal file
View File

@@ -0,0 +1,146 @@
import os
import uuid
from typing import List, Optional, Dict, Any
from dataclasses import dataclass, field
from enum import Enum
from config.settings import get_ffmpeg_config
from util.exceptions import TaskValidationError, EffectError
from entity.effects import registry as effect_registry
class TaskType(Enum):
COPY = "copy"
CONCAT = "concat"
ENCODE = "encode"
@dataclass
class RenderTask:
"""渲染任务数据类,只包含任务数据,不包含处理逻辑"""
input_files: List[str] = field(default_factory=list)
output_file: str = ""
task_type: TaskType = TaskType.COPY
# 视频参数
resolution: Optional[str] = None
frame_rate: int = 25
speed: float = 1.0
mute: bool = True
annexb: bool = False
# 裁剪参数
zoom_cut: Optional[int] = None
center_cut: Optional[int] = None
# 资源列表
subtitles: List[str] = field(default_factory=list)
luts: List[str] = field(default_factory=list)
audios: List[str] = field(default_factory=list)
overlays: List[str] = field(default_factory=list)
effects: List[str] = field(default_factory=list)
# 扩展数据
ext_data: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
"""初始化后处理"""
# 检测annexb格式
for input_file in self.input_files:
if isinstance(input_file, str) and input_file.endswith(".ts"):
self.annexb = True
break
# 自动生成输出文件名
if not self.output_file:
self._generate_output_filename()
def _generate_output_filename(self):
"""生成输出文件名"""
if self.annexb:
self.output_file = f"rand_{uuid.uuid4()}.ts"
else:
self.output_file = f"rand_{uuid.uuid4()}.mp4"
def add_input_file(self, file_path: str):
"""添加输入文件"""
self.input_files.append(file_path)
if file_path.endswith(".ts"):
self.annexb = True
def add_overlay(self, *overlays: str):
"""添加覆盖层"""
for overlay in overlays:
if overlay.endswith('.ass'):
self.subtitles.append(overlay)
else:
self.overlays.append(overlay)
def add_audios(self, *audios: str):
"""添加音频"""
self.audios.extend(audios)
def add_lut(self, *luts: str):
"""添加LUT"""
self.luts.extend(luts)
def add_effect(self, *effects: str):
"""添加效果"""
self.effects.extend(effects)
def validate(self) -> bool:
"""验证任务参数"""
if not self.input_files:
raise TaskValidationError("No input files specified")
# 验证所有效果
for effect_str in self.effects:
effect_name, params = effect_registry.parse_effect_string(effect_str)
processor = effect_registry.get_processor(effect_name, params, self.ext_data)
if processor and not processor.validate_params():
raise EffectError(f"Invalid parameters for effect {effect_name}: {params}", effect_name, params)
return True
def can_copy(self) -> bool:
"""检查是否可以直接复制"""
return (len(self.luts) == 0 and
len(self.overlays) == 0 and
len(self.subtitles) == 0 and
len(self.effects) == 0 and
self.speed == 1 and
len(self.audios) == 0 and
len(self.input_files) == 1 and
self.zoom_cut is None and
self.center_cut is None)
def can_concat(self) -> bool:
"""检查是否可以使用concat模式"""
return (len(self.luts) == 0 and
len(self.overlays) == 0 and
len(self.subtitles) == 0 and
len(self.effects) == 0 and
self.speed == 1 and
self.zoom_cut is None and
self.center_cut is None)
def determine_task_type(self) -> TaskType:
"""自动确定任务类型"""
if self.can_copy():
return TaskType.COPY
elif self.can_concat():
return TaskType.CONCAT
else:
return TaskType.ENCODE
def update_task_type(self):
"""更新任务类型"""
self.task_type = self.determine_task_type()
def need_processing(self) -> bool:
"""检查是否需要处理"""
if self.annexb:
return True
return not self.can_copy()
def get_output_extension(self) -> str:
"""获取输出文件扩展名"""
return ".ts" if self.annexb else ".mp4"

View File

@@ -1,19 +1,54 @@
from time import sleep from time import sleep
import sys
import biz.task
import config import config
from template import load_local_template import biz.task
from telemetry import init_opentelemetry
from services import DefaultTemplateService
from util import api from util import api
load_local_template() import os
import glob
# 使用新的服务架构
template_service = DefaultTemplateService()
template_service.load_local_templates()
# Check for redownload parameter
if 'redownload' in sys.argv:
print("Redownloading all templates...")
for template_name in template_service.templates.keys():
print(f"Redownloading template: {template_name}")
template_service.download_template(template_name)
print("All templates redownloaded successfully!")
sys.exit(0)
import logging
LOGGER = logging.getLogger(__name__)
init_opentelemetry()
while True: while True:
# print(get_sys_info()) # print(get_sys_info())
print("waiting for task...") print("waiting for task...")
try:
task_list = api.sync_center() task_list = api.sync_center()
except Exception as e:
LOGGER.error("sync_center error", exc_info=e)
sleep(5)
continue
if len(task_list) == 0: if len(task_list) == 0:
# 删除当前文件夹下所有以.mp4、.ts结尾的文件
for file_globs in ['*.mp4', '*.ts', 'tmp_concat*.txt']:
for file_path in glob.glob(file_globs):
try:
os.remove(file_path)
print(f"Deleted file: {file_path}")
except Exception as e:
LOGGER.error(f"Error deleting file {file_path}", exc_info=e)
sleep(5) sleep(5)
for task in task_list: for task in task_list:
print("start task:", task) print("start task:", task)
try:
biz.task.start_task(task) biz.task.start_task(task)
except Exception as e:
LOGGER.error("task_start error", exc_info=e)

View File

@@ -1,3 +1,8 @@
requests~=2.32.3 requests~=2.32.3
psutil~=6.1.0 psutil~=6.1.0
python-dotenv~=1.0.1 python-dotenv~=1.0.1
opentelemetry-api~=1.35.0
opentelemetry-sdk~=1.35.0
opentelemetry-exporter-otlp~=1.35.0
opentelemetry-instrumentation-threading~=0.56b0
flask~=3.1.0

12
services/__init__.py Normal file
View File

@@ -0,0 +1,12 @@
from .render_service import RenderService, DefaultRenderService
from .task_service import TaskService, DefaultTaskService
from .template_service import TemplateService, DefaultTemplateService
__all__ = [
'RenderService',
'DefaultRenderService',
'TaskService',
'DefaultTaskService',
'TemplateService',
'DefaultTemplateService'
]

237
services/render_service.py Normal file
View File

@@ -0,0 +1,237 @@
import subprocess
import os
import logging
from abc import ABC, abstractmethod
from typing import Optional, Union
from opentelemetry.trace import Status, StatusCode
from entity.render_task import RenderTask
from entity.ffmpeg_command_builder import FFmpegCommandBuilder
from util.exceptions import RenderError, FFmpegError
from util.ffmpeg import probe_video_info, fade_out_audio, handle_ffmpeg_output, subprocess_args
from telemetry import get_tracer
logger = logging.getLogger(__name__)
def _convert_ffmpeg_task_to_render_task(ffmpeg_task):
"""将旧的FfmpegTask转换为新的RenderTask"""
from entity.render_task import RenderTask, TaskType
# 获取输入文件
input_files = []
for inp in ffmpeg_task.input_file:
if hasattr(inp, 'get_output_file'):
input_files.append(inp.get_output_file())
else:
input_files.append(str(inp))
# 确定任务类型
task_type = TaskType.COPY
if ffmpeg_task.task_type == 'concat':
task_type = TaskType.CONCAT
elif ffmpeg_task.task_type == 'encode':
task_type = TaskType.ENCODE
# 创建新任务
render_task = RenderTask(
input_files=input_files,
output_file=ffmpeg_task.output_file,
task_type=task_type,
resolution=ffmpeg_task.resolution,
frame_rate=ffmpeg_task.frame_rate,
annexb=ffmpeg_task.annexb,
center_cut=ffmpeg_task.center_cut,
zoom_cut=ffmpeg_task.zoom_cut,
ext_data=getattr(ffmpeg_task, 'ext_data', {})
)
# 复制各种资源
render_task.effects = getattr(ffmpeg_task, 'effects', [])
render_task.luts = getattr(ffmpeg_task, 'luts', [])
render_task.audios = getattr(ffmpeg_task, 'audios', [])
render_task.overlays = getattr(ffmpeg_task, 'overlays', [])
render_task.subtitles = getattr(ffmpeg_task, 'subtitles', [])
return render_task
class RenderService(ABC):
"""渲染服务抽象接口"""
@abstractmethod
def render(self, task: Union[RenderTask, 'FfmpegTask']) -> bool:
"""
执行渲染任务
Args:
task: 渲染任务
Returns:
bool: 渲染是否成功
"""
pass
@abstractmethod
def get_video_info(self, file_path: str) -> tuple[int, int, float]:
"""
获取视频信息
Args:
file_path: 视频文件路径
Returns:
tuple: (width, height, duration)
"""
pass
@abstractmethod
def fade_out_audio(self, file_path: str, duration: float, fade_seconds: float = 2.0) -> str:
"""
音频淡出处理
Args:
file_path: 音频文件路径
duration: 音频总时长
fade_seconds: 淡出时长
Returns:
str: 处理后的文件路径
"""
pass
class DefaultRenderService(RenderService):
"""默认渲染服务实现"""
def render(self, task: Union[RenderTask, 'FfmpegTask']) -> bool:
"""执行渲染任务"""
# 兼容旧的FfmpegTask
if hasattr(task, 'get_ffmpeg_args'): # 这是FfmpegTask
# 使用旧的方式执行
return self._render_legacy_ffmpeg_task(task)
tracer = get_tracer(__name__)
with tracer.start_as_current_span("render_task") as span:
try:
# 验证任务
task.validate()
span.set_attribute("task.type", task.task_type.value)
span.set_attribute("task.input_files", len(task.input_files))
span.set_attribute("task.output_file", task.output_file)
# 检查是否需要处理
if not task.need_processing():
if len(task.input_files) == 1:
task.output_file = task.input_files[0]
span.set_status(Status(StatusCode.OK))
return True
# 构建FFmpeg命令
builder = FFmpegCommandBuilder(task)
ffmpeg_args = builder.build_command()
if not ffmpeg_args:
# 不需要处理,直接返回
if len(task.input_files) == 1:
task.output_file = task.input_files[0]
span.set_status(Status(StatusCode.OK))
return True
# 执行FFmpeg命令
return self._execute_ffmpeg(ffmpeg_args, span)
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error(f"Render failed: {e}", exc_info=True)
raise RenderError(f"Render failed: {e}") from e
def _execute_ffmpeg(self, args: list[str], span) -> bool:
"""执行FFmpeg命令"""
span.set_attribute("ffmpeg.args", " ".join(args))
logger.info("Executing FFmpeg: %s", " ".join(args))
try:
# 执行FFmpeg进程
process = subprocess.run(
["ffmpeg", "-progress", "-", "-loglevel", "error"] + args[1:],
stderr=subprocess.PIPE,
**subprocess_args(True)
)
span.set_attribute("ffmpeg.return_code", process.returncode)
# 处理输出
if process.stdout:
output = handle_ffmpeg_output(process.stdout)
span.set_attribute("ffmpeg.output", output)
logger.info("FFmpeg output: %s", output)
# 检查返回码
if process.returncode != 0:
error_msg = process.stderr.decode() if process.stderr else "Unknown error"
span.set_attribute("ffmpeg.error", error_msg)
span.set_status(Status(StatusCode.ERROR))
logger.error("FFmpeg failed with return code %d: %s", process.returncode, error_msg)
raise FFmpegError(
f"FFmpeg execution failed",
command=args,
return_code=process.returncode,
stderr=error_msg
)
# 检查输出文件
output_file = args[-1] # 输出文件总是最后一个参数
if not os.path.exists(output_file):
span.set_status(Status(StatusCode.ERROR))
raise RenderError(f"Output file not created: {output_file}")
# 检查文件大小
file_size = os.path.getsize(output_file)
span.set_attribute("output.file_size", file_size)
if file_size < 4096: # 文件过小
span.set_status(Status(StatusCode.ERROR))
raise RenderError(f"Output file too small: {file_size} bytes")
span.set_status(Status(StatusCode.OK))
logger.info("FFmpeg execution completed successfully")
return True
except subprocess.SubprocessError as e:
span.set_status(Status(StatusCode.ERROR))
logger.error("Subprocess error: %s", e)
raise FFmpegError(f"Subprocess error: {e}") from e
def get_video_info(self, file_path: str) -> tuple[int, int, float]:
"""获取视频信息"""
return probe_video_info(file_path)
def fade_out_audio(self, file_path: str, duration: float, fade_seconds: float = 2.0) -> str:
"""音频淡出处理"""
return fade_out_audio(file_path, duration, fade_seconds)
def _render_legacy_ffmpeg_task(self, ffmpeg_task) -> bool:
"""兼容处理旧的FfmpegTask"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("render_legacy_ffmpeg_task") as span:
try:
# 处理依赖任务
for sub_task in ffmpeg_task.analyze_input_render_tasks():
if not self.render(sub_task):
span.set_status(Status(StatusCode.ERROR))
return False
# 获取FFmpeg参数
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
if not ffmpeg_args:
# 不需要处理,直接返回
span.set_status(Status(StatusCode.OK))
return True
# 执行FFmpeg命令
return self._execute_ffmpeg(ffmpeg_args, span)
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error(f"Legacy FFmpeg task render failed: {e}", exc_info=True)
raise RenderError(f"Legacy render failed: {e}") from e

289
services/task_service.py Normal file
View File

@@ -0,0 +1,289 @@
import json
import logging
import os
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Any, List, Optional
from opentelemetry.trace import Status, StatusCode
from entity.render_task import RenderTask
from services.render_service import RenderService
from services.template_service import TemplateService
from util.exceptions import TaskError, TaskValidationError
from util import api, oss
from telemetry import get_tracer
logger = logging.getLogger(__name__)
class TaskService(ABC):
"""任务服务抽象接口"""
@abstractmethod
def process_task(self, task_info: Dict[str, Any]) -> bool:
"""
处理任务
Args:
task_info: 任务信息
Returns:
bool: 处理是否成功
"""
pass
@abstractmethod
def create_render_task(self, task_info: Dict[str, Any], template_info: Dict[str, Any]) -> RenderTask:
"""
创建渲染任务
Args:
task_info: 任务信息
template_info: 模板信息
Returns:
RenderTask: 渲染任务对象
"""
pass
class DefaultTaskService(TaskService):
"""默认任务服务实现"""
def __init__(self, render_service: RenderService, template_service: TemplateService):
self.render_service = render_service
self.template_service = template_service
def process_task(self, task_info: Dict[str, Any]) -> bool:
"""处理任务"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("process_task") as span:
try:
# 标准化任务信息
task_info = api.normalize_task(task_info)
span.set_attribute("task.id", task_info.get("id", "unknown"))
span.set_attribute("task.template_id", task_info.get("templateId", "unknown"))
# 获取模板信息
template_id = task_info.get("templateId")
template_info = self.template_service.get_template(template_id)
if not template_info:
raise TaskError(f"Template not found: {template_id}")
# 报告任务开始
api.report_task_start(task_info)
# 创建渲染任务
render_task = self.create_render_task(task_info, template_info)
# 执行渲染
success = self.render_service.render(render_task)
if not success:
span.set_status(Status(StatusCode.ERROR))
api.report_task_failed(task_info, "Render failed")
return False
# 获取视频信息
width, height, duration = self.render_service.get_video_info(render_task.output_file)
span.set_attribute("video.width", width)
span.set_attribute("video.height", height)
span.set_attribute("video.duration", duration)
# 音频淡出
new_file = self.render_service.fade_out_audio(render_task.output_file, duration)
render_task.output_file = new_file
# 上传文件 - 创建一个兼容对象
class TaskCompat:
def __init__(self, output_file):
self.output_file = output_file
def get_output_file(self):
return self.output_file
task_compat = TaskCompat(render_task.output_file)
upload_success = api.upload_task_file(task_info, task_compat)
if not upload_success:
span.set_status(Status(StatusCode.ERROR))
api.report_task_failed(task_info, "Upload failed")
return False
# 清理临时文件
self._cleanup_temp_files(render_task)
# 报告任务成功
api.report_task_success(task_info, videoInfo={
"width": width,
"height": height,
"duration": duration
})
span.set_status(Status(StatusCode.OK))
return True
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error(f"Task processing failed: {e}", exc_info=True)
api.report_task_failed(task_info, str(e))
return False
def create_render_task(self, task_info: Dict[str, Any], template_info: Dict[str, Any]) -> RenderTask:
"""创建渲染任务"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("create_render_task") as span:
# 解析任务参数
task_params_str = task_info.get("taskParams", "{}")
span.set_attribute("task_params", task_params_str)
try:
task_params = json.loads(task_params_str)
task_params_orig = json.loads(task_params_str)
except json.JSONDecodeError as e:
raise TaskValidationError(f"Invalid task params JSON: {e}")
# 并行下载资源
self._download_resources(task_params)
# 创建子任务列表
sub_tasks = []
only_if_usage_count = {}
for part in template_info.get("video_parts", []):
source, ext_data = self._parse_video_source(
part.get('source'), task_params, template_info
)
if not source:
logger.warning("No video found for part: %s", part)
continue
# 检查only_if条件
only_if = part.get('only_if', '')
if only_if:
only_if_usage_count[only_if] = only_if_usage_count.get(only_if, 0) + 1
required_count = only_if_usage_count[only_if]
if not self._check_placeholder_exist_with_count(only_if, task_params_orig, required_count):
logger.info("Skipping part due to only_if condition: %s (need %d)", only_if, required_count)
continue
# 创建子任务
sub_task = self._create_sub_task(part, source, ext_data, template_info)
sub_tasks.append(sub_task)
# 创建主任务
output_file = f"out_{task_info.get('id', 'unknown')}.mp4"
main_task = RenderTask(
input_files=[task.output_file for task in sub_tasks],
output_file=output_file,
resolution=template_info.get("video_size", ""),
frame_rate=template_info.get("frame_rate", 25),
center_cut=template_info.get("crop_mode"),
zoom_cut=template_info.get("zoom_cut")
)
# 应用整体模板设置
overall_template = template_info.get("overall_template", {})
self._apply_template_settings(main_task, overall_template, template_info)
# 设置扩展数据
main_task.ext_data = task_info
span.set_attribute("render_task.sub_tasks", len(sub_tasks))
span.set_attribute("render_task.effects", len(main_task.effects))
return main_task
def _download_resources(self, task_params: Dict[str, Any]):
"""并行下载资源"""
with ThreadPoolExecutor(max_workers=8) as executor:
for param_list in task_params.values():
if isinstance(param_list, list):
for param in param_list:
url = param.get("url", "")
if url.startswith("http"):
_, filename = os.path.split(url)
executor.submit(oss.download_from_oss, url, filename, True)
def _parse_video_source(self, source: str, task_params: Dict[str, Any],
template_info: Dict[str, Any]) -> tuple[Optional[str], Dict[str, Any]]:
"""解析视频源"""
if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, [])
pick_source = {}
if isinstance(new_sources, list):
if len(new_sources) == 0:
logger.debug("No video found for placeholder: %s", placeholder_id)
return None, pick_source
else:
pick_source = new_sources.pop(0)
new_sources = pick_source.get("url", "")
if new_sources.startswith("http"):
_, source_name = os.path.split(new_sources)
oss.download_from_oss(new_sources, source_name, True)
return source_name, pick_source
return new_sources, pick_source
return os.path.join(template_info.get("local_path", ""), source), {}
def _check_placeholder_exist_with_count(self, placeholder_id: str, task_params: Dict[str, Any],
required_count: int = 1) -> bool:
"""检查占位符是否存在足够数量的片段"""
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if isinstance(new_sources, list):
return len(new_sources) >= required_count
return required_count <= 1
return False
def _create_sub_task(self, part: Dict[str, Any], source: str, ext_data: Dict[str, Any],
template_info: Dict[str, Any]) -> RenderTask:
"""创建子任务"""
sub_task = RenderTask(
input_files=[source],
resolution=template_info.get("video_size", ""),
frame_rate=template_info.get("frame_rate", 25),
annexb=True,
center_cut=part.get("crop_mode"),
zoom_cut=part.get("zoom_cut"),
ext_data=ext_data
)
# 应用部分模板设置
self._apply_template_settings(sub_task, part, template_info)
return sub_task
def _apply_template_settings(self, task: RenderTask, template_part: Dict[str, Any],
template_info: Dict[str, Any]):
"""应用模板设置到任务"""
# 添加效果
for effect in template_part.get('effects', []):
task.add_effect(effect)
# 添加LUT
for lut in template_part.get('luts', []):
full_path = os.path.join(template_info.get("local_path", ""), lut)
task.add_lut(full_path.replace("\\", "/"))
# 添加音频
for audio in template_part.get('audios', []):
full_path = os.path.join(template_info.get("local_path", ""), audio)
task.add_audios(full_path)
# 添加覆盖层
for overlay in template_part.get('overlays', []):
full_path = os.path.join(template_info.get("local_path", ""), overlay)
task.add_overlay(full_path)
def _cleanup_temp_files(self, task: RenderTask):
"""清理临时文件"""
try:
template_dir = os.getenv("TEMPLATE_DIR", "")
if template_dir and template_dir not in task.output_file:
if os.path.exists(task.output_file):
os.remove(task.output_file)
logger.info("Cleaned up temp file: %s", task.output_file)
else:
logger.info("Skipped cleanup of template file: %s", task.output_file)
except OSError as e:
logger.warning("Failed to cleanup temp file %s: %s", task.output_file, e)

View File

@@ -0,0 +1,266 @@
import json
import os
import logging
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from opentelemetry.trace import Status, StatusCode
from util.exceptions import TemplateError, TemplateNotFoundError, TemplateValidationError
from util import api, oss
from config.settings import get_storage_config
from telemetry import get_tracer
logger = logging.getLogger(__name__)
class TemplateService(ABC):
"""模板服务抽象接口"""
@abstractmethod
def get_template(self, template_id: str) -> Optional[Dict[str, Any]]:
"""
获取模板信息
Args:
template_id: 模板ID
Returns:
Dict[str, Any]: 模板信息,如果不存在则返回None
"""
pass
@abstractmethod
def load_local_templates(self):
"""加载本地模板"""
pass
@abstractmethod
def download_template(self, template_id: str) -> bool:
"""
下载模板
Args:
template_id: 模板ID
Returns:
bool: 下载是否成功
"""
pass
@abstractmethod
def validate_template(self, template_info: Dict[str, Any]) -> bool:
"""
验证模板
Args:
template_info: 模板信息
Returns:
bool: 验证是否通过
"""
pass
class DefaultTemplateService(TemplateService):
"""默认模板服务实现"""
def __init__(self):
self.templates: Dict[str, Dict[str, Any]] = {}
self.storage_config = get_storage_config()
def get_template(self, template_id: str) -> Optional[Dict[str, Any]]:
"""获取模板信息"""
if template_id not in self.templates:
# 尝试下载模板
if not self.download_template(template_id):
return None
return self.templates.get(template_id)
def load_local_templates(self):
"""加载本地模板"""
template_dir = self.storage_config.template_dir
if not os.path.exists(template_dir):
logger.warning("Template directory does not exist: %s", template_dir)
return
for template_name in os.listdir(template_dir):
if template_name.startswith("_") or template_name.startswith("."):
continue
target_path = os.path.join(template_dir, template_name)
if os.path.isdir(target_path):
try:
self._load_template(template_name, target_path)
except Exception as e:
logger.error("Failed to load template %s: %s", template_name, e)
def download_template(self, template_id: str) -> bool:
"""下载模板"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("download_template") as span:
try:
span.set_attribute("template.id", template_id)
# 获取远程模板信息
template_info = api.get_template_info(template_id)
if template_info is None:
logger.warning("Failed to get template info: %s", template_id)
return False
local_path = template_info.get('local_path')
if not local_path:
local_path = os.path.join(self.storage_config.template_dir, str(template_id))
template_info['local_path'] = local_path
# 创建本地目录
if not os.path.isdir(local_path):
os.makedirs(local_path)
# 下载模板资源
overall_template = template_info.get('overall_template', {})
video_parts = template_info.get('video_parts', [])
self._download_template_assets(overall_template, template_info)
for video_part in video_parts:
self._download_template_assets(video_part, template_info)
# 保存模板定义文件
template_file = os.path.join(local_path, 'template.json')
with open(template_file, 'w', encoding='utf-8') as f:
json.dump(template_info, f, ensure_ascii=False, indent=2)
# 加载到内存
self._load_template(template_id, local_path)
span.set_status(Status(StatusCode.OK))
logger.info("Template downloaded successfully: %s", template_id)
return True
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error("Failed to download template %s: %s", template_id, e)
return False
def validate_template(self, template_info: Dict[str, Any]) -> bool:
"""验证模板"""
try:
local_path = template_info.get("local_path")
if not local_path:
raise TemplateValidationError("Template missing local_path")
# 验证视频部分
for video_part in template_info.get("video_parts", []):
self._validate_template_part(video_part, local_path)
# 验证整体模板
overall_template = template_info.get("overall_template", {})
if overall_template:
self._validate_template_part(overall_template, local_path)
return True
except TemplateValidationError:
raise
except Exception as e:
raise TemplateValidationError(f"Template validation failed: {e}")
def _load_template(self, template_name: str, local_path: str):
"""加载单个模板"""
logger.info("Loading template: %s (%s)", template_name, local_path)
template_def_file = os.path.join(local_path, "template.json")
if not os.path.exists(template_def_file):
raise TemplateNotFoundError(f"Template definition file not found: {template_def_file}")
try:
with open(template_def_file, 'r', encoding='utf-8') as f:
template_info = json.load(f)
except json.JSONDecodeError as e:
raise TemplateError(f"Invalid template JSON: {e}")
template_info["local_path"] = local_path
try:
self.validate_template(template_info)
self.templates[template_name] = template_info
logger.info("Template loaded successfully: %s", template_name)
except TemplateValidationError as e:
logger.error("Template validation failed for %s: %s. Attempting to re-download.", template_name, e)
# 模板验证失败,尝试重新下载
if self.download_template(template_name):
logger.info("Template re-downloaded successfully: %s", template_name)
else:
logger.error("Failed to re-download template: %s", template_name)
raise
def _download_template_assets(self, template_part: Dict[str, Any], template_info: Dict[str, Any]):
"""下载模板资源"""
local_path = template_info['local_path']
# 下载源文件
if 'source' in template_part:
source = template_part['source']
if isinstance(source, str) and source.startswith("http"):
_, filename = os.path.split(source)
new_file_path = os.path.join(local_path, filename)
oss.download_from_oss(source, new_file_path)
if filename.endswith(".mp4"):
from util.ffmpeg import re_encode_and_annexb
new_file_path = re_encode_and_annexb(new_file_path)
template_part['source'] = os.path.relpath(new_file_path, local_path)
# 下载覆盖层
if 'overlays' in template_part:
for i, overlay in enumerate(template_part['overlays']):
if isinstance(overlay, str) and overlay.startswith("http"):
_, filename = os.path.split(overlay)
oss.download_from_oss(overlay, os.path.join(local_path, filename))
template_part['overlays'][i] = filename
# 下载LUT
if 'luts' in template_part:
for i, lut in enumerate(template_part['luts']):
if isinstance(lut, str) and lut.startswith("http"):
_, filename = os.path.split(lut)
oss.download_from_oss(lut, os.path.join(local_path, filename))
template_part['luts'][i] = filename
# 下载音频
if 'audios' in template_part:
for i, audio in enumerate(template_part['audios']):
if isinstance(audio, str) and audio.startswith("http"):
_, filename = os.path.split(audio)
oss.download_from_oss(audio, os.path.join(local_path, filename))
template_part['audios'][i] = filename
def _validate_template_part(self, template_part: Dict[str, Any], base_dir: str):
"""验证模板部分"""
# 验证源文件
source_file = template_part.get("source", "")
if source_file and not source_file.startswith("http") and not source_file.startswith("PLACEHOLDER_"):
if not os.path.isabs(source_file):
source_file = os.path.join(base_dir, source_file)
if not os.path.exists(source_file):
raise TemplateValidationError(f"Source file not found: {source_file}")
# 验证音频文件
for audio in template_part.get("audios", []):
if not os.path.isabs(audio):
audio = os.path.join(base_dir, audio)
if not os.path.exists(audio):
raise TemplateValidationError(f"Audio file not found: {audio}")
# 验证LUT文件
for lut in template_part.get("luts", []):
if not os.path.isabs(lut):
lut = os.path.join(base_dir, lut)
if not os.path.exists(lut):
raise TemplateValidationError(f"LUT file not found: {lut}")
# 验证覆盖层文件
for overlay in template_part.get("overlays", []):
if not os.path.isabs(overlay):
overlay = os.path.join(base_dir, overlay)
if not os.path.exists(overlay):
raise TemplateValidationError(f"Overlay file not found: {overlay}")

37
telemetry/__init__.py Normal file
View File

@@ -0,0 +1,37 @@
import os
from constant import SOFTWARE_VERSION
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter
from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
from opentelemetry.instrumentation.threading import ThreadingInstrumentor
ThreadingInstrumentor().instrument()
def get_tracer(name):
return trace.get_tracer(name)
# 初始化 OpenTelemetry
def init_opentelemetry(batch=True):
# 设置服务名、主机名
resource = Resource(attributes={
SERVICE_NAME: "RENDER_WORKER",
SERVICE_VERSION: SOFTWARE_VERSION,
DEPLOYMENT_ENVIRONMENT: "Python",
HOST_NAME: os.getenv("ACCESS_KEY"),
})
# 使用HTTP协议上报
if batch:
span_processor = BatchSpanProcessor(OTLPSpanHttpExporter(
endpoint="https://oltp.jerryyan.top/v1/traces",
))
else:
span_processor = SimpleSpanProcessor(OTLPSpanHttpExporter(
endpoint="https://oltp.jerryyan.top/v1/traces",
))
trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor)
trace.set_tracer_provider(trace_provider)

View File

@@ -2,122 +2,68 @@ import json
import os import os
import logging import logging
from telemetry import get_tracer
from util import api, oss from util import api, oss
from services.template_service import DefaultTemplateService
TEMPLATES = {}
logger = logging.getLogger("template") logger = logging.getLogger("template")
def check_local_template(local_name): # 全局模板服务实例
template_def = TEMPLATES[local_name] _template_service = None
base_dir = template_def.get("local_path")
for video_part in template_def.get("video_parts", []):
source_file = video_part.get("source", "")
if str(source_file).startswith("http"):
# download file
...
elif str(source_file).startswith("PLACEHOLDER_"):
continue
else:
if not os.path.isabs(source_file):
source_file = os.path.join(base_dir, source_file)
if not os.path.exists(source_file):
logger.error(f"{source_file} not found, please check the template definition")
raise Exception(f"{source_file} not found, please check the template definition")
for audio in video_part.get("audios", []):
if not os.path.isabs(audio):
audio = os.path.join(base_dir, audio)
if not os.path.exists(audio):
logger.error(f"{audio} not found, please check the template definition")
raise Exception(f"{audio} not found, please check the template definition")
for lut in video_part.get("luts", []):
if not os.path.isabs(lut):
lut = os.path.join(base_dir, lut)
if not os.path.exists(lut):
logger.error(f"{lut} not found, please check the template definition")
raise Exception(f"{lut} not found, please check the template definition")
for mask in video_part.get("overlays", []):
if not os.path.isabs(mask):
mask = os.path.join(base_dir, mask)
if not os.path.exists(mask):
logger.error(f"{mask} not found, please check the template definition")
raise Exception(f"{mask} not found, please check the template definition")
def _get_template_service():
"""获取模板服务实例"""
global _template_service
if _template_service is None:
_template_service = DefaultTemplateService()
return _template_service
# 向后兼容的全局变量和函数
TEMPLATES = {}
def _update_templates_dict():
"""更新全局TEMPLATES字典以保持向后兼容"""
service = _get_template_service()
TEMPLATES.clear()
TEMPLATES.update(service.templates)
def check_local_template(local_name):
"""向后兼容函数"""
service = _get_template_service()
template_def = service.templates.get(local_name)
if template_def:
try:
service.validate_template(template_def)
except Exception as e:
logger.error(f"Template validation failed: {e}")
raise
def load_template(template_name, local_path): def load_template(template_name, local_path):
global TEMPLATES """向后兼容函数"""
logger.info(f"加载视频模板定义:【{template_name}{local_path})】") service = _get_template_service()
template_def_file = os.path.join(local_path, "template.json") service._load_template(template_name, local_path)
if os.path.exists(template_def_file): _update_templates_dict()
TEMPLATES[template_name] = json.load(open(template_def_file, 'rb'))
TEMPLATES[template_name]["local_path"] = local_path
try:
check_local_template(template_name)
logger.info(f"完成加载【{template_name}】模板")
except Exception as e:
logger.error(f"模板定义文件【{template_def_file}】有误,正在尝试重新下载模板", exc_info=e)
download_template(template_name)
def load_local_template(): def load_local_template():
for template_name in os.listdir(os.getenv("TEMPLATE_DIR")): """加载本地模板(向后兼容函数)"""
if template_name.startswith("_"): service = _get_template_service()
continue service.load_local_templates()
if template_name.startswith("."): _update_templates_dict()
continue
target_path = os.path.join(os.getenv("TEMPLATE_DIR"), template_name)
if os.path.isdir(target_path):
load_template(template_name, target_path)
def get_template_def(template_id): def get_template_def(template_id):
if template_id not in TEMPLATES: """获取模板定义(向后兼容函数)"""
download_template(template_id) service = _get_template_service()
return TEMPLATES.get(template_id) template = service.get_template(template_id)
_update_templates_dict()
return template
def download_template(template_id): def download_template(template_id):
template_info = api.get_template_info(template_id) """下载模板(向后兼容函数)"""
if not os.path.isdir(template_info['local_path']): service = _get_template_service()
os.makedirs(template_info['local_path']) success = service.download_template(template_id)
# download template assets _update_templates_dict()
overall_template = template_info['overall_template'] return success
video_parts = template_info['video_parts']
def _download_assets(_template):
if 'source' in _template:
if str(_template['source']).startswith("http"):
_, _fn = os.path.split(_template['source'])
new_fp = os.path.join(template_info['local_path'], _fn)
oss.download_from_oss(_template['source'], new_fp)
if _fn.endswith(".mp4"):
from util.ffmpeg import to_annexb
new_fp = to_annexb(new_fp)
_template['source'] = os.path.relpath(new_fp, template_info['local_path'])
if 'overlays' in _template:
for i in range(len(_template['overlays'])):
overlay = _template['overlays'][i]
if str(overlay).startswith("http"):
_, _fn = os.path.split(overlay)
oss.download_from_oss(overlay, os.path.join(template_info['local_path'], _fn))
_template['overlays'][i] = _fn
if 'luts' in _template:
for i in range(len(_template['luts'])):
lut = _template['luts'][i]
if str(lut).startswith("http"):
_, _fn = os.path.split(lut)
oss.download_from_oss(lut, os.path.join(template_info['local_path'], _fn))
_template['luts'][i] = _fn
if 'audios' in _template:
for i in range(len(_template['audios'])):
if str(_template['audios'][i]).startswith("http"):
_, _fn = os.path.split(_template['audios'][i])
oss.download_from_oss(_template['audios'][i], os.path.join(template_info['local_path'], _fn))
_template['audios'][i] = _fn
_download_assets(overall_template)
for video_part in video_parts:
_download_assets(video_part)
with open(os.path.join(template_info['local_path'], 'template.json'), 'w', encoding='utf-8') as f:
json.dump(template_info, f)
load_template(template_id, template_info['local_path'])
def analyze_template(template_id): def analyze_template(template_id):
... """分析模板(占位符函数)"""
pass

View File

@@ -1,9 +1,14 @@
import json
import logging import logging
import os import os
import threading
import requests import requests
from opentelemetry.trace import Status, StatusCode
import util.system import util.system
from telemetry import get_tracer
from util import oss
session = requests.Session() session = requests.Session()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -19,13 +24,14 @@ def sync_center():
通过接口获取任务 通过接口获取任务
:return: 任务列表 :return: 任务列表
""" """
from services import DefaultTemplateService
template_service = DefaultTemplateService()
try: try:
from template import TEMPLATES, download_template
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
'clientStatus': util.system.get_sys_info(), 'clientStatus': util.system.get_sys_info(),
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in TEMPLATES.values()] 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in
template_service.templates.values()]
}, timeout=10) }, timeout=10)
response.raise_for_status() response.raise_for_status()
except requests.RequestException as e: except requests.RequestException as e:
@@ -40,11 +46,18 @@ def sync_center():
tasks = [] tasks = []
templates = [] templates = []
logger.warning("获取任务失败") logger.warning("获取任务失败")
if os.getenv("REDIRECT_TO_URL", False) != False:
for task in tasks:
_sess = requests.Session()
logger.info("重定向任务【%s】至配置的地址:%s", task.get("id"), os.getenv("REDIRECT_TO_URL"))
url = f"{os.getenv('REDIRECT_TO_URL')}{task.get('id')}"
threading.Thread(target=requests.post, args=(url,)).start()
return []
for template in templates: for template in templates:
template_id = template.get('id', '') template_id = template.get('id', '')
if template_id: if template_id:
logger.info("更新模板:【%s", template_id) logger.info("更新模板:【%s", template_id)
download_template(template_id) template_service.download_template(template_id)
return tasks return tasks
@@ -56,24 +69,35 @@ def get_template_info(template_id):
:type template_id: str :type template_id: str
:return: 模板信息 :return: 模板信息
""" """
tracer = get_tracer(__name__)
with tracer.start_as_current_span("get_template_info"):
with tracer.start_as_current_span("get_template_info.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id))
response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={ response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return None return None
data = response.json() data = response.json()
logger.debug("获取模板信息结果:【%s", data)
remote_template_info = data.get('data', {}) remote_template_info = data.get('data', {})
logger.debug("获取模板信息结果:【%s", remote_template_info) if not remote_template_info:
logger.warning("获取模板信息结果为空", data)
return None
template = { template = {
'id': template_id, 'id': template_id,
'updateTime': remote_template_info.get('updateTime', template_id), 'updateTime': remote_template_info.get('updateTime', template_id),
'scenic_name': remote_template_info.get('scenicName', '景区'), 'scenic_name': remote_template_info.get('scenicName', '景区'),
'name': remote_template_info.get('name', '模版'), 'name': remote_template_info.get('name', '模版'),
'video_size': '1920x1080', 'video_size': remote_template_info.get('resolution', '1920x1080'),
'frame_rate': 30, 'frame_rate': 25,
'overall_duration': 30, 'overall_duration': 30,
'video_parts': [ 'video_parts': [
@@ -90,6 +114,8 @@ def get_template_info(template_id):
# 占位符 # 占位符
_template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '') _template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '')
_template['mute'] = template_info.get('mute', True) _template['mute'] = template_info.get('mute', True)
_template['crop_mode'] = template_info.get('cropEnable', None)
_template['zoom_cut'] = template_info.get('zoomCut', None)
else: else:
_template['source'] = None _template['source'] = None
_overlays = template_info.get('overlays', '') _overlays = template_info.get('overlays', '')
@@ -101,6 +127,12 @@ def get_template_info(template_id):
_luts = template_info.get('luts', '') _luts = template_info.get('luts', '')
if _luts: if _luts:
_template['luts'] = _luts.split(",") _template['luts'] = _luts.split(",")
_only_if = template_info.get('onlyIf', '')
if _only_if:
_template['only_if'] = _only_if
_effects = template_info.get('effects', '')
if _effects:
_template['effects'] = _effects.split("|")
return _template return _template
# outer template definition # outer template definition
@@ -112,63 +144,118 @@ def get_template_info(template_id):
parts = _template_normalizer(children_template) parts = _template_normalizer(children_template)
template['video_parts'].append(parts) template['video_parts'].append(parts)
template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id)) template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id))
with get_tracer("api").start_as_current_span("get_template_info.template") as res_span:
res_span.set_attribute("normalized.response", json.dumps(template))
return template return template
def report_task_success(task_info, **kwargs): def report_task_success(task_info, **kwargs):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_success"):
with tracer.start_as_current_span("report_task_success.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
**kwargs **kwargs
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return None return None
def report_task_start(task_info): def report_task_start(task_info):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_start"):
with tracer.start_as_current_span("report_task_start.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return None return None
def report_task_failed(task_info, reason=''): def report_task_failed(task_info, reason=''):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_failed") as span:
span.set_attribute("task_id", task_info.get("id"))
span.set_attribute("reason", reason)
with tracer.start_as_current_span("report_task_failed.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
'reason': reason 'reason': reason
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return None return None
def upload_task_file(task_info, ffmpeg_task): def upload_task_file(task_info, ffmpeg_task):
tracer = get_tracer(__name__)
with get_tracer("api").start_as_current_span("upload_task_file") as span:
logger.info("开始上传文件: %s", task_info.get("id")) logger.info("开始上传文件: %s", task_info.get("id"))
span.set_attribute("file.id", task_info.get("id"))
with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
try: try:
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")),
json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e: except requests.RequestException as e:
span.set_attribute("api.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return False return False
data = response.json() data = response.json()
url = data.get('data', "") url = data.get('data', "")
logger.info("开始上传文件: %s%s", task_info.get("id"), url) logger.info("开始上传文件: %s%s", task_info.get("id"), url)
return oss.upload_to_oss(url, ffmpeg_task.get_output_file())
def get_task_info(id):
try: try:
with open(ffmpeg_task.get_output_file(), 'rb') as f: response = session.get(os.getenv('API_ENDPOINT') + "/" + id + "/info", params={
requests.put(url, data=f) 'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
response.raise_for_status()
except requests.RequestException as e: except requests.RequestException as e:
logger.error("上传失败!", e) logger.error("请求失败!", e)
return False return []
finally: data = response.json()
logger.info("上传文件结束: %s", task_info.get("id")) logger.debug("获取任务结果:【%s", data)
return True if data.get('code', 0) == 200:
return data.get('data', {})

72
util/exceptions.py Normal file
View File

@@ -0,0 +1,72 @@
class RenderWorkerError(Exception):
"""RenderWorker基础异常类"""
def __init__(self, message: str, error_code: str = None):
super().__init__(message)
self.message = message
self.error_code = error_code or self.__class__.__name__
class ConfigurationError(RenderWorkerError):
"""配置错误"""
pass
class TemplateError(RenderWorkerError):
"""模板相关错误"""
pass
class TemplateNotFoundError(TemplateError):
"""模板未找到错误"""
pass
class TemplateValidationError(TemplateError):
"""模板验证错误"""
pass
class TaskError(RenderWorkerError):
"""任务处理错误"""
pass
class TaskValidationError(TaskError):
"""任务参数验证错误"""
pass
class RenderError(RenderWorkerError):
"""渲染处理错误"""
pass
class FFmpegError(RenderError):
"""FFmpeg执行错误"""
def __init__(self, message: str, command: list = None, return_code: int = None, stderr: str = None):
super().__init__(message)
self.command = command
self.return_code = return_code
self.stderr = stderr
class EffectError(RenderError):
"""效果处理错误"""
def __init__(self, message: str, effect_name: str = None, effect_params: str = None):
super().__init__(message)
self.effect_name = effect_name
self.effect_params = effect_params
class StorageError(RenderWorkerError):
"""存储相关错误"""
pass
class APIError(RenderWorkerError):
"""API调用错误"""
def __init__(self, message: str, status_code: int = None, response_body: str = None):
super().__init__(message)
self.status_code = status_code
self.response_body = response_body
class ResourceError(RenderWorkerError):
"""资源相关错误"""
pass
class ResourceNotFoundError(ResourceError):
"""资源未找到错误"""
pass
class DownloadError(ResourceError):
"""下载错误"""
pass

View File

@@ -1,40 +1,98 @@
import json
import logging import logging
import os import os
import subprocess import subprocess
from datetime import datetime from datetime import datetime
from typing import Optional, IO from typing import Optional, IO
from entity.ffmpeg import FfmpegTask from opentelemetry.trace import Status, StatusCode
from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT, get_mp4toannexb_filter
from telemetry import get_tracer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def to_annexb(file): def re_encode_and_annexb(file):
with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span:
span.set_attribute("file.path", file)
if not os.path.exists(file): if not os.path.exists(file):
span.set_status(Status(StatusCode.ERROR))
return file return file
logger.info("ToAnnexb: %s", file) logger.info("ReEncodeAndAnnexb: %s", file)
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb", has_audio = not not probe_video_audio(file)
# 优先使用RE_ENCODE_VIDEO_ARGS环境变量,其次使用默认的VIDEO_ARGS
if os.getenv("RE_ENCODE_VIDEO_ARGS", False):
_video_args = tuple(os.getenv("RE_ENCODE_VIDEO_ARGS", "").split(" "))
else:
_video_args = VIDEO_ARGS
# 优先使用RE_ENCODE_ENCODER_ARGS环境变量,其次使用默认的ENCODER_ARGS
if os.getenv("RE_ENCODE_ENCODER_ARGS", False):
_encoder_args = tuple(os.getenv("RE_ENCODE_ENCODER_ARGS", "").split(" "))
else:
_encoder_args = ENCODER_ARGS
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file,
*(set() if has_audio else MUTE_AUDIO_INPUT),
"-fps_mode", "cfr",
"-map", "0:v", "-map", "0:a" if has_audio else "1:a",
*_video_args, "-bsf:v", get_mp4toannexb_filter(),
*AUDIO_ARGS, "-bsf:a", "setts=pts=DTS",
*_encoder_args, "-shortest", "-fflags", "+genpts",
"-f", "mpegts", file + ".ts"]) "-f", "mpegts", file + ".ts"])
logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) logger.info(" ".join(ffmpeg_process.args))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
if ffmpeg_process.returncode == 0: if ffmpeg_process.returncode == 0:
os.remove(file) span.set_status(Status(StatusCode.OK))
span.set_attribute("file.size", os.path.getsize(file+".ts"))
# os.remove(file)
return file+".ts" return file+".ts"
else: else:
span.set_status(Status(StatusCode.ERROR))
return file return file
def start_render(ffmpeg_task: FfmpegTask): def start_render(ffmpeg_task: FfmpegTask):
logger.info(ffmpeg_task) tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_render") as span:
span.set_attribute("ffmpeg.task", str(ffmpeg_task))
if not ffmpeg_task.need_run(): if not ffmpeg_task.need_run():
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True return True
ffmpeg_args = ffmpeg_task.get_ffmpeg_args() ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
logger.info(ffmpeg_args)
if len(ffmpeg_args) == 0: if len(ffmpeg_args) == 0:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True)) ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], stderr=subprocess.PIPE, **subprocess_args(True))
logger.info("FINISH TASK, OUTPUT IS %s", handle_ffmpeg_output(ffmpeg_process.stdout)) span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info(" ".join(ffmpeg_process.args))
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
span.set_attribute("ffmpeg.out", ffmpeg_final_out)
logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
code = ffmpeg_process.returncode code = ffmpeg_process.returncode
return code == 0 span.set_attribute("ffmpeg.code", code)
if code != 0:
span.set_attribute("ffmpeg.err", str(ffmpeg_process.stderr))
span.set_status(Status(StatusCode.ERROR, "FFMPEG异常退出"))
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
return False
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
try:
file_size = os.path.getsize(ffmpeg_task.output_file)
span.set_attribute("file.size", file_size)
if file_size < 4096:
span.set_status(Status(StatusCode.ERROR, "输出文件过小"))
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
return False
except OSError as e:
span.set_attribute("file.size", 0)
span.set_attribute("file.error", e.strerror)
span.set_status(Status(StatusCode.ERROR, "输出文件不存在"))
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
return False
span.set_status(Status(StatusCode.OK))
return True
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
out_time = "0:0:0.0" out_time = "0:0:0.0"
@@ -55,27 +113,97 @@ def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
print("[ ]Speed:", out_time, "@", speed) print("[ ]Speed:", out_time, "@", speed)
return out_time+"@"+speed return out_time+"@"+speed
def duration_str_to_float(duration_str: str) -> float: def duration_str_to_float(duration_str: str) -> float:
_duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1) _duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1)
return _duration.total_seconds() return _duration.total_seconds()
def probe_video_info(video_file): def probe_video_info(video_file):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("probe_video_info") as span:
span.set_attribute("video.file", video_file)
# 获取宽度和高度 # 获取宽度和高度
result = subprocess.run( result = subprocess.run(
["ffprobe.exe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of', ["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of',
'csv=s=x:p=0', video_file], 'csv=s=x:p=0', video_file],
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
**subprocess_args(True) **subprocess_args(True)
) )
span.set_attribute("ffprobe.args", json.dumps(result.args))
span.set_attribute("ffprobe.code", result.returncode)
if result.returncode != 0:
span.set_status(Status(StatusCode.ERROR))
return 0, 0, 0
all_result = result.stdout.decode('utf-8').strip() all_result = result.stdout.decode('utf-8').strip()
span.set_attribute("ffprobe.out", all_result)
if all_result == '':
span.set_status(Status(StatusCode.ERROR))
return 0, 0, 0
span.set_status(Status(StatusCode.OK))
wh, duration = all_result.split('\n') wh, duration = all_result.split('\n')
width, height = wh.strip().split('x') width, height = wh.strip().split('x')
return int(width), int(height), float(duration) return int(width), int(height), float(duration)
def probe_video_audio(video_file, type=None):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("probe_video_audio") as span:
span.set_attribute("video.file", video_file)
args = ["ffprobe", "-hide_banner", "-v", "error", "-select_streams", "a", "-show_entries", "stream=index", "-of", "csv=p=0"]
if type == 'concat':
args.append("-safe")
args.append("0")
args.append("-f")
args.append("concat")
args.append(video_file)
logger.info(" ".join(args))
result = subprocess.run(args, stderr=subprocess.STDOUT, **subprocess_args(True))
span.set_attribute("ffprobe.args", json.dumps(result.args))
span.set_attribute("ffprobe.code", result.returncode)
logger.info("probe_video_audio: %s", result.stdout.decode('utf-8').strip())
if result.returncode != 0:
return False
if result.stdout.decode('utf-8').strip() == '':
return False
return True
# 音频淡出2秒
def fade_out_audio(file, duration, fade_out_sec = 2):
if type(duration) == str:
try:
duration = float(duration)
except Exception as e:
logger.error("duration is not float: %s", e)
return file
tracer = get_tracer(__name__)
with tracer.start_as_current_span("fade_out_audio") as span:
span.set_attribute("audio.file", file)
if duration <= fade_out_sec:
return file
else:
new_fn = file + "_.mp4"
if os.path.exists(new_fn):
os.remove(new_fn)
logger.info("delete tmp file: " + new_fn)
try:
process = subprocess.run(["ffmpeg", "-i", file, "-c:v", "copy", "-c:a", "aac", "-af", "afade=t=out:st=" + str(duration - fade_out_sec) + ":d=" + str(fade_out_sec), "-y", new_fn], **subprocess_args(True))
span.set_attribute("ffmpeg.args", json.dumps(process.args))
logger.info(" ".join(process.args))
if process.returncode != 0:
span.set_status(Status(StatusCode.ERROR))
logger.error("FFMPEG ERROR: %s", process.stderr)
return file
else:
span.set_status(Status(StatusCode.OK))
return new_fn
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error("FFMPEG ERROR: %s", e)
return file
# Create a set of arguments which make a ``subprocess.Popen`` (and # Create a set of arguments which make a ``subprocess.Popen`` (and
# variants) call work with or without Pyinstaller, ``--noconsole`` or # variants) call work with or without Pyinstaller, ``--noconsole`` or
# not, on Windows and Linux. Typical use:: # not, on Windows and Linux. Typical use::

View File

@@ -1,7 +1,11 @@
import logging import logging
import os import os
import sys
import requests import requests
from opentelemetry.trace import Status, StatusCode
from telemetry import get_tracer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -13,31 +17,111 @@ def upload_to_oss(url, file_path):
:param str file_path: 文件路径 :param str file_path: 文件路径
:return bool: 是否成功 :return bool: 是否成功
""" """
with open(file_path, 'rb') as f: tracer = get_tracer(__name__)
with tracer.start_as_current_span("upload_to_oss") as span:
span.set_attribute("file.url", url)
span.set_attribute("file.path", file_path)
span.set_attribute("file.size", os.path.getsize(file_path))
max_retries = 5
retries = 0
if os.getenv("UPLOAD_METHOD") == "rclone":
with tracer.start_as_current_span("rclone_to_oss") as r_span:
replace_map = os.getenv("RCLONE_REPLACE_MAP")
r_span.set_attribute("rclone.replace_map", replace_map)
if replace_map != "":
replace_list = [i.split("|", 1) for i in replace_map.split(",")]
new_url = url
for (_src, _dst) in replace_list:
new_url = new_url.replace(_src, _dst)
new_url = new_url.split("?", 1)[0]
r_span.set_attribute("rclone.target_dir", new_url)
if new_url != url:
result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 8M --multi-thread-streams 8 {file_path} {new_url}")
r_span.set_attribute("rclone.result", result)
if result == 0:
span.set_status(Status(StatusCode.OK))
return True
else:
span.set_status(Status(StatusCode.ERROR))
while retries < max_retries:
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
try: try:
response = requests.put(url, data=f) req_span.set_attribute("http.method", "PUT")
return response.status_code == 200 req_span.set_attribute("http.url", url)
with open(file_path, 'rb') as f:
response = requests.put(url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
span.set_status(Status(StatusCode.OK))
return True
except requests.exceptions.Timeout:
req_span.set_attribute("http.error", "Timeout")
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
except Exception as e: except Exception as e:
print(e) req_span.set_attribute("http.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
span.set_status(Status(StatusCode.ERROR))
return False return False
def download_from_oss(url, file_path):
def download_from_oss(url, file_path, skip_if_exist=None):
""" """
使用签名URL下载文件到OSS 使用签名URL下载文件到OSS
:param skip_if_exist: 如果存在就不下载了
:param str url: 签名URL :param str url: 签名URL
:param Union[LiteralString, str, bytes] file_path: 文件路径 :param Union[LiteralString, str, bytes] file_path: 文件路径
:return bool: 是否成功 :return bool: 是否成功
""" """
tracer = get_tracer(__name__)
with tracer.start_as_current_span("download_from_oss") as span:
span.set_attribute("file.url", url)
span.set_attribute("file.path", file_path)
# 如果skip_if_exist为None,则从启动参数中读取
if skip_if_exist is None:
skip_if_exist = 'skip_if_exist' in sys.argv
if skip_if_exist and os.path.exists(file_path):
span.set_attribute("file.exist", True)
span.set_attribute("file.size", os.path.getsize(file_path))
return True
logging.info("download_from_oss: %s", url) logging.info("download_from_oss: %s", url)
file_dir, file_name = os.path.split(file_path) file_dir, file_name = os.path.split(file_path)
if file_dir: if file_dir:
if not os.path.exists(file_dir): if not os.path.exists(file_dir):
os.makedirs(file_dir) os.makedirs(file_dir)
max_retries = 5
retries = 0
while retries < max_retries:
with tracer.start_as_current_span("download_from_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
try: try:
response = requests.get(url) req_span.set_attribute("http.method", "GET")
req_span.set_attribute("http.url", url)
response = requests.get(url, timeout=15) # 设置超时时间
req_span.set_attribute("http.status_code", response.status_code)
with open(file_path, 'wb') as f: with open(file_path, 'wb') as f:
f.write(response.content) f.write(response.content)
req_span.set_attribute("file.size", os.path.getsize(file_path))
req_span.set_status(Status(StatusCode.OK))
span.set_status(Status(StatusCode.OK))
return True return True
except requests.exceptions.Timeout:
req_span.set_attribute("http.error", "Timeout")
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
except Exception as e: except Exception as e:
print(e) req_span.set_attribute("http.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
span.set_status(Status(StatusCode.ERROR))
return False return False