Compare commits

...

47 Commits

Author SHA1 Message Date
6d631d873e onlyif判断 2025-05-05 19:47:54 +08:00
02dd2b72a0 根据模板定义的分辨率进行操作 2025-04-30 18:07:33 +08:00
d8bc3c8595 健康检查时同步信息 2025-04-28 17:59:44 +08:00
5d58198b7e 接口支持查询模板信息,避免使用旧模板 2025-04-28 17:57:34 +08:00
789513a0be 避免勿删模板 2025-04-28 16:32:34 +08:00
b3911839f3 app不使用批量上报 2025-04-28 16:28:27 +08:00
1c0e4ce411 添加dockerfile 2025-04-28 15:45:22 +08:00
1603be9157 尝试传入resolution,不使用scale自适应模板 2025-04-28 15:02:50 +08:00
f139fbccd7 下载模板时trace归组 2025-04-27 14:24:12 +08:00
2fb0f93886 收集ffmpeg异常,流式上传 2025-04-27 13:47:52 +08:00
9537f995a1 支持redirection 2025-04-20 14:32:06 +08:00
ec03f8180e 修改接口 2025-04-20 12:32:33 +08:00
972b6a4e4d 修改接口 2025-04-20 12:09:17 +08:00
3d810e5c5b 添加接口,添加方便测试的方法 2025-04-20 11:50:32 +08:00
a9043361ec 支持通过env获取encoder args 2025-04-20 11:02:23 +08:00
740a3c7a63 完善requirements.txt 2025-04-20 10:54:49 +08:00
450240bd5a 支持同机位多视频片段复用 2025-04-14 14:15:16 +08:00
6b5975d8b9 更换oltp服务器 2025-04-04 15:51:10 +08:00
85c2e7459e 删除sync_center埋点 2025-04-03 11:22:20 +08:00
364ceb29a1 音频淡出 2025-04-01 17:17:57 +08:00
ced0c1ad1e 修改 2025-03-30 18:11:03 +08:00
6e4dbfd843 固定模板支持音乐 2025-03-28 18:08:19 +08:00
09e0f5f3be concat支持annexb 2025-03-28 18:07:57 +08:00
52c2df8b65 删除无用方法 2025-03-28 17:30:28 +08:00
b25ad20ddd 日志 2025-03-28 17:27:24 +08:00
7c6e4a97b2 片段有音频不可以copy 2025-03-28 17:26:49 +08:00
8f0e69c3de overall_speed片段全局变速 2025-03-24 10:30:36 +08:00
b8db0d2b95 metrics调整 2025-03-23 18:36:26 +08:00
6dc7e86e8e 埋点采集部分接口调整 2025-03-18 13:58:36 +08:00
c62f1ab976 upload返回结果 2025-03-11 16:15:51 +08:00
744fe28421 修复居中切割的问题 2025-03-10 15:16:23 +08:00
cf43e6d549 修复amix降低声音的问题,修复reencode_to_annexb不添加音轨的问题 2025-03-10 10:13:30 +08:00
dcf5f5630d 主动判断是否有音频 2025-03-06 23:02:54 +08:00
56bdad7ad1 音轨叠加 2025-03-06 10:34:28 +08:00
94373cee72 cameraShot特效及旋转 2025-03-05 14:57:02 +08:00
4549b0ab44 分辨率和裁切 2025-03-04 17:43:47 +08:00
9d178a3d34 埋点 2025-03-04 12:36:48 +08:00
1f9632761f effect 2025-03-03 14:27:52 +08:00
fff20610a5 profile level指定及修复 2025-02-27 16:48:57 +08:00
67696739f9 切割模式 2025-02-27 14:02:17 +08:00
2ea248c02e 上传文件也弄个超时 2025-02-16 18:15:35 +08:00
358207efdc 定时清理目录下无用文件 2025-02-16 18:15:25 +08:00
94a5e687df 未生成文件时,上报失败 2025-02-08 15:02:36 +08:00
b7d6797901 忽略无用文件 2025-02-05 10:13:33 +08:00
6d9d373032 only if 逻辑 2025-01-23 14:28:51 +08:00
549ee8320a ffprobe 报错后不采用其内容 2025-01-22 16:04:33 +08:00
29bb80f3b9 渲染后再to annexb,使用新逻辑拼接 2025-01-22 14:31:59 +08:00
14 changed files with 902 additions and 344 deletions

7
.env
View File

@ -1,4 +1,9 @@
TEMPLATE_DIR=template/ TEMPLATE_DIR=template/
API_ENDPOINT=http://127.0.0.1:8030/task/v1 API_ENDPOINT=https://zhentuai.com/task/v1
ACCESS_KEY=TEST_ACCESS_KEY ACCESS_KEY=TEST_ACCESS_KEY
TEMP_DIR=tmp/ TEMP_DIR=tmp/
#REDIRECT_TO_URL=https://worker-renderworker-re-kekuflqjxx.cn-shanghai.fcapp.run/
# QSV
ENCODER_ARGS="-c:v h264_qsv -global_quality 28 -look_ahead 1"
# NVENC
#ENCODER_ARGS="-c:v h264_nvenc -cq:v 24 -preset:v p7 -tune:v hq -profile:v high"

5
.gitignore vendored
View File

@ -6,6 +6,11 @@ __pycache__/
*.so *.so
.Python .Python
build/ build/
dist/
*.mp4
*.ts
rand*.ts
tmp_concat_*.txt
*.egg-info/ *.egg-info/
*.egg *.egg
*.manifest *.manifest

21
Dockerfile Normal file
View File

@ -0,0 +1,21 @@
FROM linuxserver/ffmpeg:7.1.1
LABEL authors="Jerry Yan"
RUN sed -i 's@//.*archive.ubuntu.com@//mirrors.ustc.edu.cn@g' /etc/apt/sources.list && \
sed -i 's/security.ubuntu.com/mirrors.ustc.edu.cn/g' /etc/apt/sources.list
RUN apt-get update && \
apt-get install -y --no-install-recommends \
python3-pip \
python3-dev \
python3-setuptools \
python3-wheel \
python3-venv
RUN apt-get clean && \
rm -rf /var/lib/apt/lists/*
COPY . /app/
RUN python3 -m venv /app/venv
RUN /app/venv/bin/python -m pip config set global.index-url https://mirrors.ustc.edu.cn/pypi/simple
RUN /app/venv/bin/python -m pip install -r /app/requirements.txt
WORKDIR /app
ENTRYPOINT ["/app/venv/bin/python", "app.py"]

40
app.py Normal file
View File

@ -0,0 +1,40 @@
import time
import flask
import config
import biz.task
import template
from telemetry import init_opentelemetry
from template import load_local_template
from util import api
load_local_template()
import logging
LOGGER = logging.getLogger(__name__)
init_opentelemetry(batch=False)
app = flask.Flask(__name__)
@app.get('/health/check')
def health_check():
return api.sync_center()
@app.post('/')
def do_nothing():
return "NOOP"
@app.post('/<task_id>')
def do_task(task_id):
task_info = api.get_task_info(task_id)
local_template_info = template.get_template_def(task_info.get("templateId"))
template_info = api.get_template_info(task_info.get("templateId"))
if local_template_info:
if local_template_info.get("updateTime") != template_info.get("updateTime"):
template.download_template(task_info.get("templateId"))
biz.task.start_task(task_info)
return "OK"
if __name__ == '__main__':
app.run(host="0.0.0.0", port=9998)

View File

@ -2,48 +2,83 @@ import json
import os.path import os.path
import time import time
from opentelemetry.trace import Status, StatusCode
from entity.ffmpeg import FfmpegTask from entity.ffmpeg import FfmpegTask
import logging import logging
from util import ffmpeg, oss from util import ffmpeg, oss
from util.ffmpeg import fade_out_audio
from telemetry import get_tracer
logger = logging.getLogger('biz/ffmpeg') logger = logging.getLogger('biz/ffmpeg')
def parse_ffmpeg_task(task_info, template_info): def parse_ffmpeg_task(task_info, template_info):
tasks = [] tracer = get_tracer(__name__)
# 中间片段 with tracer.start_as_current_span("parse_ffmpeg_task") as span:
task_params_str = task_info.get("taskParams", "{}") tasks = []
task_params = json.loads(task_params_str) # 中间片段
for part in template_info.get("video_parts"): task_params_str = task_info.get("taskParams", "{}")
source = parse_video(part.get('source'), task_params, template_info) span.set_attribute("task_params", task_params_str)
if not source: task_params = json.loads(task_params_str)
logger.warning("no video found for part: " + str(part)) task_params_orig = json.loads(task_params_str)
continue for part in template_info.get("video_parts"):
sub_ffmpeg_task = FfmpegTask(source) source = parse_video(part.get('source'), task_params, template_info)
sub_ffmpeg_task.annexb = True if not source:
sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25) logger.warning("no video found for part: " + str(part))
for lut in part.get('filters', []): continue
sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut)) only_if = part.get('only_if', '')
for audio in part.get('audios', []): if only_if:
sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio)) if not check_placeholder_exist(only_if, task_params_orig):
for overlay in part.get('overlays', []): logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part)
sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) continue
tasks.append(sub_ffmpeg_task) sub_ffmpeg_task = FfmpegTask(source)
output_file = "out_" + str(time.time()) + ".mp4" sub_ffmpeg_task.resolution = template_info.get("video_size", "")
task = FfmpegTask(tasks, output_file=output_file) sub_ffmpeg_task.annexb = True
overall = template_info.get("overall_template") sub_ffmpeg_task.ext_data = find_placeholder_params(part.get('source'), task_params) or {}
task.frame_rate = template_info.get("frame_rate", 25) sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
if overall.get('source', ''): sub_ffmpeg_task.center_cut = part.get("crop_mode", None)
source = parse_video(overall.get('source'), task_params, template_info) for effect in part.get('effects', []):
task.add_inputs(source) sub_ffmpeg_task.add_effect(effect)
for lut in overall.get('filters', []): for lut in part.get('filters', []):
task.add_lut(os.path.join(template_info.get("local_path"), lut)) sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut))
for audio in overall.get('audios', []): for audio in part.get('audios', []):
task.add_audios(os.path.join(template_info.get("local_path"), audio)) sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio))
for overlay in overall.get('overlays', []): for overlay in part.get('overlays', []):
task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
return task tasks.append(sub_ffmpeg_task)
output_file = "out_" + str(time.time()) + ".mp4"
task = FfmpegTask(tasks, output_file=output_file)
task.resolution = template_info.get("video_size", "")
overall = template_info.get("overall_template")
task.center_cut = template_info.get("crop_mode", None)
task.frame_rate = template_info.get("frame_rate", 25)
if overall.get('source', ''):
source = parse_video(overall.get('source'), task_params, template_info)
task.add_inputs(source)
for effect in overall.get('effects', []):
task.add_effect(effect)
for lut in overall.get('filters', []):
task.add_lut(os.path.join(template_info.get("local_path"), lut))
for audio in overall.get('audios', []):
task.add_audios(os.path.join(template_info.get("local_path"), audio))
for overlay in overall.get('overlays', []):
task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
return task
def find_placeholder_params(source, task_params):
if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
logger.debug("no video found for placeholder: " + placeholder_id)
return {}
else:
return new_sources[0]
return {}
def parse_video(source, task_params, template_info): def parse_video(source, task_params, template_info):
@ -56,8 +91,8 @@ def parse_video(source, task_params, template_info):
logger.debug("no video found for placeholder: " + placeholder_id) logger.debug("no video found for placeholder: " + placeholder_id)
return None return None
else: else:
# TODO: Random Pick / Policy Pick _pick_source = new_sources.pop(0)
new_sources = new_sources[0].get("url") new_sources = _pick_source.get("url")
if new_sources.startswith("http"): if new_sources.startswith("http"):
_, source_name = os.path.split(new_sources) _, source_name = os.path.split(new_sources)
oss.download_from_oss(new_sources, source_name) oss.download_from_oss(new_sources, source_name)
@ -66,18 +101,39 @@ def parse_video(source, task_params, template_info):
return os.path.join(template_info.get("local_path"), source) return os.path.join(template_info.get("local_path"), source)
def check_placeholder_exist(placeholder_id, task_params):
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
return False
else:
return True
return True
return False
def start_ffmpeg_task(ffmpeg_task): def start_ffmpeg_task(ffmpeg_task):
for task in ffmpeg_task.analyze_input_render_tasks(): tracer = get_tracer(__name__)
start_ffmpeg_task(task) with tracer.start_as_current_span("start_ffmpeg_task") as span:
ffmpeg_task.correct_task_type() for task in ffmpeg_task.analyze_input_render_tasks():
return ffmpeg.start_render(ffmpeg_task) result = start_ffmpeg_task(task)
if not result:
return False
ffmpeg_task.correct_task_type()
result = ffmpeg.start_render(ffmpeg_task)
if not result:
span.set_status(Status(StatusCode.ERROR))
return False
span.set_status(Status(StatusCode.OK))
return True
def clear_task_tmp_file(ffmpeg_task): def clear_task_tmp_file(ffmpeg_task):
for task in ffmpeg_task.analyze_input_render_tasks(): for task in ffmpeg_task.analyze_input_render_tasks():
clear_task_tmp_file(task) clear_task_tmp_file(task)
try: try:
if "template" not in ffmpeg_task.get_output_file(): if os.getenv("TEMPLATE_DIR") not in ffmpeg_task.get_output_file():
os.remove(ffmpeg_task.get_output_file()) os.remove(ffmpeg_task.get_output_file())
logger.info("delete tmp file: " + ffmpeg_task.get_output_file()) logger.info("delete tmp file: " + ffmpeg_task.get_output_file())
else: else:

View File

@ -1,24 +1,40 @@
import json
from opentelemetry.trace import Status, StatusCode
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info, fade_out_audio
from telemetry import get_tracer
from template import get_template_def from template import get_template_def
from util import api from util import api
def start_task(task_info): def start_task(task_info):
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info tracer = get_tracer(__name__)
task_info = api.normalize_task(task_info) with tracer.start_as_current_span("start_task") as span:
template_info = get_template_def(task_info.get("templateId")) task_info = api.normalize_task(task_info)
api.report_task_start(task_info) span.set_attribute("task", json.dumps(task_info))
ffmpeg_task = parse_ffmpeg_task(task_info, template_info) span.set_attribute("scenicId", task_info.get("scenicId", "?"))
result = start_ffmpeg_task(ffmpeg_task) span.set_attribute("templateId", task_info.get("templateId"))
if not result: template_info = get_template_def(task_info.get("templateId"))
return api.report_task_failed(task_info) api.report_task_start(task_info)
oss_result = api.upload_task_file(task_info, ffmpeg_task) ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
if not oss_result: result = start_ffmpeg_task(ffmpeg_task)
return api.report_task_failed(task_info) if not result:
# 获取视频长度宽度和时长 span.set_status(Status(StatusCode.ERROR))
width, height, duration = probe_video_info(ffmpeg_task) return api.report_task_failed(task_info)
clear_task_tmp_file(ffmpeg_task) width, height, duration = probe_video_info(ffmpeg_task)
api.report_task_success(task_info, videoInfo={ # 音频淡出
"width": width, new_fn = fade_out_audio(ffmpeg_task.get_output_file(), duration)
"height": height, ffmpeg_task.set_output_file(new_fn)
"duration": duration oss_result = api.upload_task_file(task_info, ffmpeg_task)
}) if not oss_result:
span.set_status(Status(StatusCode.ERROR))
return api.report_task_failed(task_info)
# 获取视频长度宽度和时长
clear_task_tmp_file(ffmpeg_task)
api.report_task_success(task_info, videoInfo={
"width": width,
"height": height,
"duration": duration
})
span.set_status(Status(StatusCode.OK))

View File

@ -1,9 +1,20 @@
import json
import os
import time import time
import uuid import uuid
from typing import Any
DEFAULT_ARGS = ("-shortest",)
ENCODER_ARGS = ("-c:v", "h264", ) if not os.getenv("ENCODER_ARGS", False) else os.getenv("ENCODER_ARGS", "").split(" ")
VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", )
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", )
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", )
class FfmpegTask(object): class FfmpegTask(object):
effects: list[str]
def __init__(self, input_file, task_type='copy', output_file=''): def __init__(self, input_file, task_type='copy', output_file=''):
self.annexb = False self.annexb = False
if type(input_file) is str: if type(input_file) is str:
@ -14,15 +25,20 @@ class FfmpegTask(object):
self.input_file = input_file self.input_file = input_file
else: else:
self.input_file = [] self.input_file = []
self.zoom_cut = None
self.center_cut = None
self.ext_data = {}
self.task_type = task_type self.task_type = task_type
self.output_file = output_file self.output_file = output_file
self.mute = True self.mute = True
self.speed = 1 self.speed = 1
self.frame_rate = 25 self.frame_rate = 25
self.resolution = None
self.subtitles = [] self.subtitles = []
self.luts = [] self.luts = []
self.audios = [] self.audios = []
self.overlays = [] self.overlays = []
self.effects = []
def __repr__(self): def __repr__(self):
_str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}' _str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}'
@ -34,8 +50,11 @@ class FfmpegTask(object):
_str += f', overlays={self.overlays}' _str += f', overlays={self.overlays}'
if self.annexb: if self.annexb:
_str += f', annexb={self.annexb}' _str += f', annexb={self.annexb}'
if self.effects:
_str += f', effects={self.effects}'
if self.mute: if self.mute:
_str += f', mute={self.mute}' _str += f', mute={self.mute}'
_str += f', center_cut={self.center_cut}'
return _str + ')' return _str + ')'
def analyze_input_render_tasks(self): def analyze_input_render_tasks(self):
@ -77,6 +96,10 @@ class FfmpegTask(object):
self.luts.extend(luts) self.luts.extend(luts)
self.correct_task_type() self.correct_task_type()
def add_effect(self, *effects):
self.effects.extend(effects)
self.correct_task_type()
def get_output_file(self): def get_output_file(self):
if self.task_type == 'copy': if self.task_type == 'copy':
return self.input_file[0] return self.input_file[0]
@ -99,8 +122,14 @@ class FfmpegTask(object):
return False return False
if len(self.subtitles) > 0: if len(self.subtitles) > 0:
return False return False
if len(self.effects) > 0:
return False
if self.speed != 1: if self.speed != 1:
return False return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True return True
def check_can_copy(self): def check_can_copy(self):
@ -110,45 +139,114 @@ class FfmpegTask(object):
return False return False
if len(self.subtitles) > 0: if len(self.subtitles) > 0:
return False return False
if len(self.effects) > 0:
return False
if self.speed != 1: if self.speed != 1:
return False return False
if len(self.audios) > 1: if len(self.audios) >= 1:
return False return False
if len(self.input_file) > 1: if len(self.input_file) > 1:
return False return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True return True
def check_audio_track(self): def check_audio_track(self):
if len(self.audios) > 0: ...
self.mute = False
def get_ffmpeg_args(self): def get_ffmpeg_args(self):
args = ['-y', '-hide_banner'] args = ['-y', '-hide_banner']
if self.task_type == 'encode': if self.task_type == 'encode':
# args += ('-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv')
input_args = [] input_args = []
filter_args = [] filter_args = []
output_args = ["-shortest", "-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1"] output_args = [*VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, *DEFAULT_ARGS]
if self.annexb: if self.annexb:
output_args.append("-bsf:v") output_args.append("-bsf:v")
output_args.append("h264_mp4toannexb") output_args.append("h264_mp4toannexb")
output_args.append("-reset_timestamps")
output_args.append("1")
video_output_str = "[0:v]" video_output_str = "[0:v]"
audio_output_str = "[0:v]" audio_output_str = ""
video_input_count = 0 audio_track_index = 0
audio_input_count = 0 effect_index = 0
for input_file in self.input_file: for input_file in self.input_file:
input_args.append("-i") input_args.append("-i")
if type(input_file) is str: if type(input_file) is str:
input_args.append(input_file) input_args.append(input_file)
elif isinstance(input_file, FfmpegTask): elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file()) input_args.append(input_file.get_output_file())
if self.center_cut == 1:
pos_json_str = self.ext_data.get('posJson', '{}')
pos_json = json.loads(pos_json_str)
_v_w = pos_json.get('imgWidth', 1)
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_x = f'{float((_f_x2 + _f_x)/(2 * _v_w)) :.4f}*iw-ih*ih/(2*iw)'
filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
video_output_str = f"[v_cut{effect_index}]"
effect_index += 1
for effect in self.effects:
if effect.startswith("cameraShot:"):
param = effect.split(":", 2)[1]
if param == '':
param = "3,1,0"
_split = param.split(",")
start = 3
duration = 1
rotate_deg = 0
if len(_split) >= 3:
if _split[2] == '':
rotate_deg = 0
else:
rotate_deg = int(_split[2])
if len(_split) >= 2:
duration = float(_split[1])
if len(_split) >= 1:
start = float(_split[0])
_start_out_str = "[eff_s]"
_mid_out_str = "[eff_m]"
_end_out_str = "[eff_e]"
filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}")
filter_args.append(f"{_start_out_str}select=lt(n\\,{int(start*self.frame_rate)}){_start_out_str}")
filter_args.append(f"{_end_out_str}select=gt(n\\,{int(start*self.frame_rate)}){_end_out_str}")
filter_args.append(f"{_mid_out_str}select=eq(n\\,{int(start*self.frame_rate)}){_mid_out_str}")
filter_args.append(f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}")
if rotate_deg != 0:
filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/360{_mid_out_str}")
# filter_args.append(f"{video_output_str}trim=start=0:end={start+duration},tpad=stop_mode=clone:stop_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"tpad=start_mode=clone:start_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}")
video_output_str = f"[v_eff{effect_index}]"
# filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
effect_index += 1
elif effect.startswith("ospeed:"):
param = effect.split(":", 2)[1]
if param == '':
param = "1"
if param != "1":
# 视频变速
effect_index += 1
filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("zoom:"):
...
...
for lut in self.luts: for lut in self.luts:
filter_args.append("[0:v]lut3d=file=" + lut + "[0:v]") filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}")
if self.resolution:
filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]")
video_output_str = "[v]"
for overlay in self.overlays: for overlay in self.overlays:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
input_args.append(overlay) input_args.append(overlay)
filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]") if os.getenv("OLD_FFMPEG"):
filter_args.append(f"{video_output_str}[{input_index}:v]scale2ref=iw:ih[v]")
else:
filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]")
filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]") filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
video_output_str = "[v]" video_output_str = "[v]"
for subtitle in self.subtitles: for subtitle in self.subtitles:
@ -159,97 +257,96 @@ class FfmpegTask(object):
output_args.append("-r") output_args.append("-r")
output_args.append(f"{self.frame_rate}") output_args.append(f"{self.frame_rate}")
if self.mute: if self.mute:
output_args.append("-an") input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
filter_args.append(f"[{input_index}:a]acopy[a]")
audio_track_index += 1
audio_output_str = "[a]"
else: else:
input_index = 0 audio_output_str = "[0:a]"
for audio in self.audios: audio_track_index += 1
input_index = input_args.count("-i") for audio in self.audios:
input_args.append("-i") input_index = input_args.count("-i")
input_args.append(audio.replace("\\", "/")) input_args.append("-i")
if audio_input_count > 0: input_args.append(audio.replace("\\", "/"))
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]") audio_track_index += 1
audio_output_str = "[a]" filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
else: audio_output_str = "[a]"
audio_output_str = f"[{input_index}:a]" if audio_output_str:
audio_input_count += 1 output_args.append("-map")
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str) output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()] _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'concat': elif self.task_type == 'concat':
# 无法通过 annexb 合并的 # 无法通过 annexb 合并的
input_args = [] input_args = []
output_args = ["-shortest"] output_args = [*DEFAULT_ARGS]
if self.check_annexb() and len(self.audios) <= 1: filter_args = []
# output_args audio_output_str = ""
if len(self.audios) > 0: audio_track_index = 0
input_args.append("-an") # output_args
_tmp_file = "tmp_concat_"+str(time.time())+".txt" if len(self.input_file) == 1:
_file = self.input_file[0]
from util.ffmpeg import probe_video_audio
if type(_file) is str:
input_args += ["-i", _file]
self.mute = not probe_video_audio(_file)
elif isinstance(_file, FfmpegTask):
input_args += ["-i", _file.get_output_file()]
self.mute = not probe_video_audio(_file.get_output_file())
else:
_tmp_file = "tmp_concat_" + str(time.time()) + ".txt"
from util.ffmpeg import probe_video_audio
with open(_tmp_file, "w", encoding="utf-8") as f: with open(_tmp_file, "w", encoding="utf-8") as f:
for input_file in self.input_file: for input_file in self.input_file:
if type(input_file) is str: if type(input_file) is str:
f.write("file '"+input_file+"'\n") f.write("file '" + input_file + "'\n")
elif isinstance(input_file, FfmpegTask): elif isinstance(input_file, FfmpegTask):
f.write("file '" + input_file.get_output_file() + "'\n") f.write("file '" + input_file.get_output_file() + "'\n")
input_args += ("-f", "concat", "-safe", "0", "-i", _tmp_file) input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file]
output_args.append("-c:v") self.mute = not probe_video_audio(_tmp_file, "concat")
output_args.append("copy") output_args.append("-map")
if len(self.audios) > 0: output_args.append("0:v")
input_args.append("-i") output_args.append("-c:v")
input_args.append(self.audios[0]) output_args.append("copy")
output_args.append("-c:a") if self.mute:
output_args.append("copy") input_index = input_args.count("-i")
output_args.append("-f") input_args += MUTE_AUDIO_INPUT
output_args.append("mp4") audio_output_str = f"[{input_index}:a]"
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1") audio_track_index += 1
return args + input_args + output_args + [self.get_output_file()] else:
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1") audio_output_str = "[0:a]"
filter_args = [] audio_track_index += 1
video_output_str = "[0:v]" for audio in self.audios:
audio_output_str = "[0:a]"
video_input_count = 0
audio_input_count = 0
for input_file in self.input_file:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
if type(input_file) is str: input_args.append(audio.replace("\\", "/"))
input_args.append(input_file.replace("\\", "/")) audio_track_index += 1
elif isinstance(input_file, FfmpegTask): filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
input_args.append(input_file.get_output_file().replace("\\", "/")) audio_output_str = "[a]"
if video_input_count > 0: if audio_output_str:
filter_args.append(f"{video_output_str}[{input_index}:v]concat=n=2:v=1:a=0[v]") output_args.append("-map")
video_output_str = "[v]" if audio_track_index <= 1:
output_args.append(audio_output_str[1:-1])
else: else:
video_output_str = f"[{input_index}:v]" output_args.append(audio_output_str)
video_input_count += 1 output_args += AUDIO_ARGS
output_args.append("-map") if self.annexb:
output_args.append(video_output_str) output_args.append("-bsf:v")
if self.mute: output_args.append("h264_mp4toannexb")
output_args.append("-an") output_args.append("-bsf:a")
else: output_args.append("setts=pts=DTS")
input_index = 0 output_args.append("-f")
for audio in self.audios: output_args.append("mpegts" if self.annexb else "mp4")
input_index = input_args.count("-i") _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
input_args.append("-i") return args + input_args + _filter_args + output_args + [self.get_output_file()]
input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0:
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]"
else:
audio_output_str = f"[{input_index}:a]"
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()]
elif self.task_type == 'copy': elif self.task_type == 'copy':
if len(self.input_file) == 1: if len(self.input_file) == 1:
if type(self.input_file[0]) is str: if type(self.input_file[0]) is str:
if self.input_file[0] == self.get_output_file(): if self.input_file[0] == self.get_output_file():
return [] return []
return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()] return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()]
return []
def set_output_file(self, file=None): def set_output_file(self, file=None):
if file is None: if file is None:

View File

@ -1,19 +1,42 @@
from time import sleep from time import sleep
import biz.task
import config import config
import biz.task
from telemetry import init_opentelemetry
from template import load_local_template from template import load_local_template
from util import api from util import api
load_local_template() import os
import glob
load_local_template()
import logging
LOGGER = logging.getLogger(__name__)
init_opentelemetry()
while True: while True:
# print(get_sys_info()) # print(get_sys_info())
print("waiting for task...") print("waiting for task...")
task_list = api.sync_center() try:
task_list = api.sync_center()
except Exception as e:
LOGGER.error("sync_center error", exc_info=e)
sleep(5)
continue
if len(task_list) == 0: if len(task_list) == 0:
# 删除当前文件夹下所有以.mp4、.ts结尾的文件
for file_globs in ['*.mp4', '*.ts', 'tmp_concat*.txt']:
for file_path in glob.glob(file_globs):
try:
os.remove(file_path)
print(f"Deleted file: {file_path}")
except Exception as e:
LOGGER.error(f"Error deleting file {file_path}", exc_info=e)
sleep(5) sleep(5)
for task in task_list: for task in task_list:
print("start task:", task) print("start task:", task)
biz.task.start_task(task) try:
biz.task.start_task(task)
except Exception as e:
LOGGER.error("task_start error", exc_info=e)

View File

@ -1,3 +1,7 @@
requests~=2.32.3 requests~=2.32.3
psutil~=6.1.0 psutil~=6.1.0
python-dotenv~=1.0.1 python-dotenv~=1.0.1
opentelemetry-api~=1.30.0
opentelemetry-sdk~=1.30.0
opentelemetry-exporter-otlp~=1.30.0
flask~=3.1.0

35
telemetry/__init__.py Normal file
View File

@ -0,0 +1,35 @@
import os
from constant import SOFTWARE_VERSION
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter
from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
def get_tracer(name):
return trace.get_tracer(name)
# 初始化 OpenTelemetry
def init_opentelemetry(batch=True):
# 设置服务名、主机名
resource = Resource(attributes={
SERVICE_NAME: "RENDER_WORKER",
SERVICE_VERSION: SOFTWARE_VERSION,
DEPLOYMENT_ENVIRONMENT: "Python",
HOST_NAME: os.getenv("ACCESS_KEY"),
})
# 使用HTTP协议上报
if batch:
span_processor = BatchSpanProcessor(OTLPSpanHttpExporter(
endpoint="https://oltp.jerryyan.top/v1/traces",
))
else:
span_processor = SimpleSpanProcessor(OTLPSpanHttpExporter(
endpoint="https://oltp.jerryyan.top/v1/traces",
))
trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor)
trace.set_tracer_provider(trace_provider)

View File

@ -2,6 +2,7 @@ import json
import os import os
import logging import logging
from telemetry import get_tracer
from util import api, oss from util import api, oss
TEMPLATES = {} TEMPLATES = {}
@ -75,48 +76,50 @@ def get_template_def(template_id):
return TEMPLATES.get(template_id) return TEMPLATES.get(template_id)
def download_template(template_id): def download_template(template_id):
template_info = api.get_template_info(template_id) tracer = get_tracer(__name__)
if not os.path.isdir(template_info['local_path']): with tracer.start_as_current_span("download_template"):
os.makedirs(template_info['local_path']) template_info = api.get_template_info(template_id)
# download template assets if not os.path.isdir(template_info['local_path']):
overall_template = template_info['overall_template'] os.makedirs(template_info['local_path'])
video_parts = template_info['video_parts'] # download template assets
def _download_assets(_template): overall_template = template_info['overall_template']
if 'source' in _template: video_parts = template_info['video_parts']
if str(_template['source']).startswith("http"): def _download_assets(_template):
_, _fn = os.path.split(_template['source']) if 'source' in _template:
new_fp = os.path.join(template_info['local_path'], _fn) if str(_template['source']).startswith("http"):
oss.download_from_oss(_template['source'], new_fp) _, _fn = os.path.split(_template['source'])
if _fn.endswith(".mp4"): new_fp = os.path.join(template_info['local_path'], _fn)
from util.ffmpeg import to_annexb oss.download_from_oss(_template['source'], new_fp)
new_fp = to_annexb(new_fp) if _fn.endswith(".mp4"):
_template['source'] = os.path.relpath(new_fp, template_info['local_path']) from util.ffmpeg import re_encode_and_annexb
if 'overlays' in _template: new_fp = re_encode_and_annexb(new_fp)
for i in range(len(_template['overlays'])): _template['source'] = os.path.relpath(new_fp, template_info['local_path'])
overlay = _template['overlays'][i] if 'overlays' in _template:
if str(overlay).startswith("http"): for i in range(len(_template['overlays'])):
_, _fn = os.path.split(overlay) overlay = _template['overlays'][i]
oss.download_from_oss(overlay, os.path.join(template_info['local_path'], _fn)) if str(overlay).startswith("http"):
_template['overlays'][i] = _fn _, _fn = os.path.split(overlay)
if 'luts' in _template: oss.download_from_oss(overlay, os.path.join(template_info['local_path'], _fn))
for i in range(len(_template['luts'])): _template['overlays'][i] = _fn
lut = _template['luts'][i] if 'luts' in _template:
if str(lut).startswith("http"): for i in range(len(_template['luts'])):
_, _fn = os.path.split(lut) lut = _template['luts'][i]
oss.download_from_oss(lut, os.path.join(template_info['local_path'], _fn)) if str(lut).startswith("http"):
_template['luts'][i] = _fn _, _fn = os.path.split(lut)
if 'audios' in _template: oss.download_from_oss(lut, os.path.join(template_info['local_path'], _fn))
for i in range(len(_template['audios'])): _template['luts'][i] = _fn
if str(_template['audios'][i]).startswith("http"): if 'audios' in _template:
_, _fn = os.path.split(_template['audios'][i]) for i in range(len(_template['audios'])):
oss.download_from_oss(_template['audios'][i], os.path.join(template_info['local_path'], _fn)) if str(_template['audios'][i]).startswith("http"):
_template['audios'][i] = _fn _, _fn = os.path.split(_template['audios'][i])
_download_assets(overall_template) oss.download_from_oss(_template['audios'][i], os.path.join(template_info['local_path'], _fn))
for video_part in video_parts: _template['audios'][i] = _fn
_download_assets(video_part) _download_assets(overall_template)
with open(os.path.join(template_info['local_path'], 'template.json'), 'w', encoding='utf-8') as f: for video_part in video_parts:
json.dump(template_info, f) _download_assets(video_part)
load_template(template_id, template_info['local_path']) with open(os.path.join(template_info['local_path'], 'template.json'), 'w', encoding='utf-8') as f:
json.dump(template_info, f)
load_template(template_id, template_info['local_path'])
def analyze_template(template_id): def analyze_template(template_id):

View File

@ -1,9 +1,14 @@
import json
import logging import logging
import os import os
import threading
import requests import requests
from opentelemetry.trace import Status, StatusCode
import util.system import util.system
from telemetry import get_tracer
from util import oss
session = requests.Session() session = requests.Session()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -19,13 +24,13 @@ def sync_center():
通过接口获取任务 通过接口获取任务
:return: 任务列表 :return: 任务列表
""" """
from template import TEMPLATES, download_template
try: try:
from template import TEMPLATES, download_template
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
'clientStatus': util.system.get_sys_info(), 'clientStatus': util.system.get_sys_info(),
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in TEMPLATES.values()] 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in
TEMPLATES.values()]
}, timeout=10) }, timeout=10)
response.raise_for_status() response.raise_for_status()
except requests.RequestException as e: except requests.RequestException as e:
@ -40,6 +45,13 @@ def sync_center():
tasks = [] tasks = []
templates = [] templates = []
logger.warning("获取任务失败") logger.warning("获取任务失败")
if os.getenv("REDIRECT_TO_URL", False) != False:
for task in tasks:
_sess = requests.Session()
logger.info("重定向任务【%s】至配置的地址:%s", task.get("id"), os.getenv("REDIRECT_TO_URL"))
url = f"{os.getenv('REDIRECT_TO_URL')}{task.get('id')}"
threading.Thread(target=requests.post, args=(url,)).start()
return []
for template in templates: for template in templates:
template_id = template.get('id', '') template_id = template.get('id', '')
if template_id: if template_id:
@ -56,119 +68,189 @@ def get_template_info(template_id):
:type template_id: str :type template_id: str
:return: 模板信息 :return: 模板信息
""" """
try: tracer = get_tracer(__name__)
response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={ with tracer.start_as_current_span("get_template_info"):
'accessKey': os.getenv('ACCESS_KEY'), with tracer.start_as_current_span("get_template_info.request") as req_span:
}, timeout=10) try:
response.raise_for_status() req_span.set_attribute("http.method", "POST")
except requests.RequestException as e: req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id))
logger.error("请求失败!", e) response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={
return None 'accessKey': os.getenv('ACCESS_KEY'),
data = response.json() }, timeout=10)
remote_template_info = data.get('data', {}) req_span.set_attribute("http.status_code", response.status_code)
logger.debug("获取模板信息结果:【%s", remote_template_info) req_span.set_attribute("http.response", response.text)
template = { response.raise_for_status()
'id': template_id, except requests.RequestException as e:
'updateTime': remote_template_info.get('updateTime', template_id), req_span.set_attribute("api.error", str(e))
'scenic_name': remote_template_info.get('scenicName', '景区'), logger.error("请求失败!", e)
'name': remote_template_info.get('name', '模版'), return None
'video_size': '1920x1080', data = response.json()
'frame_rate': 30, logger.debug("获取模板信息结果:【%s", data)
'overall_duration': 30, remote_template_info = data.get('data', {})
'video_parts': [ template = {
'id': template_id,
'updateTime': remote_template_info.get('updateTime', template_id),
'scenic_name': remote_template_info.get('scenicName', '景区'),
'name': remote_template_info.get('name', '模版'),
'video_size': remote_template_info.get('resolution', '1920x1080'),
'frame_rate': 25,
'overall_duration': 30,
'video_parts': [
] ]
} }
def _template_normalizer(template_info): def _template_normalizer(template_info):
_template = {} _template = {}
_placeholder_type = template_info.get('isPlaceholder', -1) _placeholder_type = template_info.get('isPlaceholder', -1)
if _placeholder_type == 0: if _placeholder_type == 0:
# 固定视频 # 固定视频
_template['source'] = template_info.get('sourceUrl', '') _template['source'] = template_info.get('sourceUrl', '')
elif _placeholder_type == 1: elif _placeholder_type == 1:
# 占位符 # 占位符
_template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '') _template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '')
_template['mute'] = template_info.get('mute', True) _template['mute'] = template_info.get('mute', True)
else: _template['crop_mode'] = template_info.get('cropEnable', None)
_template['source'] = None else:
_overlays = template_info.get('overlays', '') _template['source'] = None
if _overlays: _overlays = template_info.get('overlays', '')
_template['overlays'] = _overlays.split(",") if _overlays:
_audios = template_info.get('audios', '') _template['overlays'] = _overlays.split(",")
if _audios: _audios = template_info.get('audios', '')
_template['audios'] = _audios.split(",") if _audios:
_luts = template_info.get('luts', '') _template['audios'] = _audios.split(",")
if _luts: _luts = template_info.get('luts', '')
_template['luts'] = _luts.split(",") if _luts:
return _template _template['luts'] = _luts.split(",")
_only_if = template_info.get('onlyIf', '')
if _only_if:
_template['only_if'] = _only_if
_effects = template_info.get('effects', '')
if _effects:
_template['effects'] = _effects.split("|")
return _template
# outer template definition # outer template definition
overall_template = _template_normalizer(remote_template_info) overall_template = _template_normalizer(remote_template_info)
template['overall_template'] = overall_template template['overall_template'] = overall_template
# inter template definition # inter template definition
inter_template_list = remote_template_info.get('children', []) inter_template_list = remote_template_info.get('children', [])
for children_template in inter_template_list: for children_template in inter_template_list:
parts = _template_normalizer(children_template) parts = _template_normalizer(children_template)
template['video_parts'].append(parts) template['video_parts'].append(parts)
template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id)) template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id))
return template with get_tracer("api").start_as_current_span("get_template_info.template") as res_span:
res_span.set_attribute("normalized.response", json.dumps(template))
return template
def report_task_success(task_info, **kwargs): def report_task_success(task_info, **kwargs):
try: tracer = get_tracer(__name__)
response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ with tracer.start_as_current_span("report_task_success"):
'accessKey': os.getenv('ACCESS_KEY'), with tracer.start_as_current_span("report_task_success.request") as req_span:
**kwargs try:
}, timeout=10) req_span.set_attribute("http.method", "POST")
response.raise_for_status() req_span.set_attribute("http.url",
except requests.RequestException as e: '{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
logger.error("请求失败!", e) response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
return None 'accessKey': os.getenv('ACCESS_KEY'),
**kwargs
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def report_task_start(task_info): def report_task_start(task_info):
try: tracer = get_tracer(__name__)
response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ with tracer.start_as_current_span("report_task_start"):
'accessKey': os.getenv('ACCESS_KEY'), with tracer.start_as_current_span("report_task_start.request") as req_span:
}, timeout=10) try:
response.raise_for_status() req_span.set_attribute("http.method", "POST")
except requests.RequestException as e: req_span.set_attribute("http.url",
logger.error("请求失败!", e) '{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
return None response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def report_task_failed(task_info, reason=''): def report_task_failed(task_info, reason=''):
try: tracer = get_tracer(__name__)
response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ with tracer.start_as_current_span("report_task_failed") as span:
'accessKey': os.getenv('ACCESS_KEY'), span.set_attribute("task_id", task_info.get("id"))
'reason': reason span.set_attribute("reason", reason)
}, timeout=10) with tracer.start_as_current_span("report_task_failed.request") as req_span:
response.raise_for_status() try:
except requests.RequestException as e: req_span.set_attribute("http.method", "POST")
logger.error("请求失败!", e) req_span.set_attribute("http.url",
return None '{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
'reason': reason
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
logger.error("请求失败!", e)
return None
def upload_task_file(task_info, ffmpeg_task): def upload_task_file(task_info, ffmpeg_task):
logger.info("开始上传文件: %s", task_info.get("id")) tracer = get_tracer(__name__)
with get_tracer("api").start_as_current_span("upload_task_file") as span:
logger.info("开始上传文件: %s", task_info.get("id"))
span.set_attribute("file.id", task_info.get("id"))
with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")),
json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e:
span.set_attribute("api.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
logger.error("请求失败!", e)
return False
data = response.json()
url = data.get('data', "")
logger.info("开始上传文件: %s%s", task_info.get("id"), url)
return oss.upload_to_oss(url, ffmpeg_task.get_output_file())
def get_task_info(id):
try: try:
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ response = session.get(os.getenv('API_ENDPOINT') + "/" + id + "/info", params={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10) }, timeout=10)
response.raise_for_status() response.raise_for_status()
except requests.RequestException as e: except requests.RequestException as e:
logger.error("请求失败!", e) logger.error("请求失败!", e)
return False return []
data = response.json() data = response.json()
url = data.get('data', "") logger.debug("获取任务结果:【%s", data)
logger.info("开始上传文件: %s%s", task_info.get("id"), url) if data.get('code', 0) == 200:
try: return data.get('data', {})
with open(ffmpeg_task.get_output_file(), 'rb') as f:
requests.put(url, data=f)
except requests.RequestException as e:
logger.error("上传失败!", e)
return False
finally:
logger.info("上传文件结束: %s", task_info.get("id"))
return True

View File

@ -1,40 +1,87 @@
import json
import logging import logging
import os import os
import subprocess import subprocess
from datetime import datetime from datetime import datetime
from typing import Optional, IO from typing import Optional, IO
from entity.ffmpeg import FfmpegTask from opentelemetry.trace import Status, StatusCode
from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT
from telemetry import get_tracer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def to_annexb(file): def re_encode_and_annexb(file):
if not os.path.exists(file): with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span:
return file span.set_attribute("file.path", file)
logger.info("ToAnnexb: %s", file) if not os.path.exists(file):
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb", span.set_status(Status(StatusCode.ERROR))
"-f", "mpegts", file+".ts"]) return file
logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) logger.info("ReEncodeAndAnnexb: %s", file)
if ffmpeg_process.returncode == 0: has_audio = not not probe_video_audio(file)
os.remove(file) ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-vsync", "cfr", "-i", file,
return file+".ts" *(set() if has_audio else MUTE_AUDIO_INPUT),
else: "-map", "0:v", "-map", "0:a" if has_audio else "1:a",
return file *VIDEO_ARGS, "-bsf:v", "h264_mp4toannexb",
*AUDIO_ARGS, "-bsf:a", "setts=pts=DTS",
*ENCODER_ARGS, "-shortest", "-fflags", "+genpts",
"-f", "mpegts", file + ".ts"])
logger.info(" ".join(ffmpeg_process.args))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
if ffmpeg_process.returncode == 0:
span.set_status(Status(StatusCode.OK))
span.set_attribute("file.size", os.path.getsize(file+".ts"))
# os.remove(file)
return file+".ts"
else:
span.set_status(Status(StatusCode.ERROR))
return file
def start_render(ffmpeg_task: FfmpegTask): def start_render(ffmpeg_task: FfmpegTask):
logger.info(ffmpeg_task) tracer = get_tracer(__name__)
if not ffmpeg_task.need_run(): with tracer.start_as_current_span("start_render") as span:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) span.set_attribute("ffmpeg.task", str(ffmpeg_task))
if not ffmpeg_task.need_run():
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
if len(ffmpeg_args) == 0:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], stderr=subprocess.PIPE, **subprocess_args(True))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info(" ".join(ffmpeg_process.args))
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
span.set_attribute("ffmpeg.out", ffmpeg_final_out)
logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
code = ffmpeg_process.returncode
span.set_attribute("ffmpeg.code", code)
if code != 0:
span.set_attribute("ffmpeg.err", str(ffmpeg_process.stderr))
span.set_status(Status(StatusCode.ERROR, "FFMPEG异常退出"))
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
return False
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
try:
file_size = os.path.getsize(ffmpeg_task.output_file)
span.set_attribute("file.size", file_size)
if file_size < 4096:
span.set_status(Status(StatusCode.ERROR, "输出文件过小"))
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
return False
except OSError as e:
span.set_attribute("file.size", 0)
span.set_attribute("file.error", e.strerror)
span.set_status(Status(StatusCode.ERROR, "输出文件不存在"))
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
return False
span.set_status(Status(StatusCode.OK))
return True return True
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
logger.info(ffmpeg_args)
if len(ffmpeg_args) == 0:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
logger.info("FINISH TASK, OUTPUT IS %s", handle_ffmpeg_output(ffmpeg_process.stdout))
code = ffmpeg_process.returncode
return code == 0
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
out_time = "0:0:0.0" out_time = "0:0:0.0"
@ -55,25 +102,95 @@ def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
print("[ ]Speed:", out_time, "@", speed) print("[ ]Speed:", out_time, "@", speed)
return out_time+"@"+speed return out_time+"@"+speed
def duration_str_to_float(duration_str: str) -> float: def duration_str_to_float(duration_str: str) -> float:
_duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1) _duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1)
return _duration.total_seconds() return _duration.total_seconds()
def probe_video_info(video_file): def probe_video_info(video_file):
# 获取宽度和高度 tracer = get_tracer(__name__)
result = subprocess.run( with tracer.start_as_current_span("probe_video_info") as span:
["ffprobe.exe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of', span.set_attribute("video.file", video_file)
'csv=s=x:p=0', video_file], # 获取宽度和高度
stderr=subprocess.STDOUT, result = subprocess.run(
**subprocess_args(True) ["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of',
) 'csv=s=x:p=0', video_file],
all_result = result.stdout.decode('utf-8').strip() stderr=subprocess.STDOUT,
wh, duration = all_result.split('\n') **subprocess_args(True)
width, height = wh.strip().split('x') )
span.set_attribute("ffprobe.args", json.dumps(result.args))
span.set_attribute("ffprobe.code", result.returncode)
if result.returncode != 0:
span.set_status(Status(StatusCode.ERROR))
return 0, 0, 0
all_result = result.stdout.decode('utf-8').strip()
span.set_attribute("ffprobe.out", all_result)
if all_result == '':
span.set_status(Status(StatusCode.ERROR))
return 0, 0, 0
span.set_status(Status(StatusCode.OK))
wh, duration = all_result.split('\n')
width, height = wh.strip().split('x')
return int(width), int(height), float(duration)
def probe_video_audio(video_file, type=None):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("probe_video_audio") as span:
span.set_attribute("video.file", video_file)
args = ["ffprobe", "-hide_banner", "-v", "error", "-select_streams", "a", "-show_entries", "stream=index", "-of", "csv=p=0"]
if type == 'concat':
args.append("-safe")
args.append("0")
args.append("-f")
args.append("concat")
args.append(video_file)
logger.info(" ".join(args))
result = subprocess.run(args, stderr=subprocess.STDOUT, **subprocess_args(True))
span.set_attribute("ffprobe.args", json.dumps(result.args))
span.set_attribute("ffprobe.code", result.returncode)
logger.info("probe_video_audio: %s", result.stdout.decode('utf-8').strip())
if result.returncode != 0:
return False
if result.stdout.decode('utf-8').strip() == '':
return False
return True
# 音频淡出2秒
def fade_out_audio(file, duration, fade_out_sec = 2):
if type(duration) == str:
try:
duration = float(duration)
except Exception as e:
logger.error("duration is not float: %s", e)
return file
tracer = get_tracer(__name__)
with tracer.start_as_current_span("fade_out_audio") as span:
span.set_attribute("audio.file", file)
if duration <= fade_out_sec:
return file
else:
new_fn = file + "_.mp4"
if os.path.exists(new_fn):
os.remove(new_fn)
logger.info("delete tmp file: " + new_fn)
try:
process = subprocess.run(["ffmpeg", "-i", file, "-c:v", "copy", "-c:a", "aac", "-af", "afade=t=out:st=" + str(duration - fade_out_sec) + ":d=" + str(fade_out_sec), "-y", new_fn], **subprocess_args(True))
span.set_attribute("ffmpeg.args", json.dumps(process.args))
logger.info(" ".join(process.args))
if process.returncode != 0:
span.set_status(Status(StatusCode.ERROR))
logger.error("FFMPEG ERROR: %s", process.stderr)
return file
else:
span.set_status(Status(StatusCode.OK))
return new_fn
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error("FFMPEG ERROR: %s", e)
return file
return int(width), int(height), float(duration)
# Create a set of arguments which make a ``subprocess.Popen`` (and # Create a set of arguments which make a ``subprocess.Popen`` (and

View File

@ -2,6 +2,9 @@ import logging
import os import os
import requests import requests
from opentelemetry.trace import Status, StatusCode
from telemetry import get_tracer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -13,13 +16,40 @@ def upload_to_oss(url, file_path):
:param str file_path: 文件路径 :param str file_path: 文件路径
:return bool: 是否成功 :return bool: 是否成功
""" """
with open(file_path, 'rb') as f: tracer = get_tracer(__name__)
try: with tracer.start_as_current_span("upload_to_oss") as span:
response = requests.put(url, data=f) span.set_attribute("file.url", url)
return response.status_code == 200 span.set_attribute("file.path", file_path)
except Exception as e: span.set_attribute("file.size", os.path.getsize(file_path))
print(e) max_retries = 5
return False retries = 0
while retries < max_retries:
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
try:
req_span.set_attribute("http.method", "PUT")
req_span.set_attribute("http.url", url)
with open(file_path, 'rb') as f:
response = requests.put(url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
span.set_status(Status(StatusCode.OK))
return True
except requests.exceptions.Timeout:
req_span.set_attribute("http.error", "Timeout")
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
except Exception as e:
req_span.set_attribute("http.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
span.set_status(Status(StatusCode.ERROR))
return False
def download_from_oss(url, file_path): def download_from_oss(url, file_path):
""" """
@ -28,16 +58,40 @@ def download_from_oss(url, file_path):
:param Union[LiteralString, str, bytes] file_path: 文件路径 :param Union[LiteralString, str, bytes] file_path: 文件路径
:return bool: 是否成功 :return bool: 是否成功
""" """
logging.info("download_from_oss: %s", url) tracer = get_tracer(__name__)
file_dir, file_name = os.path.split(file_path) with tracer.start_as_current_span("download_from_oss") as span:
if file_dir: span.set_attribute("file.url", url)
if not os.path.exists(file_dir): span.set_attribute("file.path", file_path)
os.makedirs(file_dir) logging.info("download_from_oss: %s", url)
try: file_dir, file_name = os.path.split(file_path)
response = requests.get(url) if file_dir:
with open(file_path, 'wb') as f: if not os.path.exists(file_dir):
f.write(response.content) os.makedirs(file_dir)
return True max_retries = 5
except Exception as e: retries = 0
print(e) while retries < max_retries:
return False with tracer.start_as_current_span("download_from_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
try:
req_span.set_attribute("http.method", "GET")
req_span.set_attribute("http.url", url)
response = requests.get(url, timeout=15) # 设置超时时间
req_span.set_attribute("http.status_code", response.status_code)
with open(file_path, 'wb') as f:
f.write(response.content)
req_span.set_attribute("file.size", os.path.getsize(file_path))
req_span.set_status(Status(StatusCode.OK))
span.set_status(Status(StatusCode.OK))
return True
except requests.exceptions.Timeout:
req_span.set_attribute("http.error", "Timeout")
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
except Exception as e:
req_span.set_attribute("http.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
span.set_status(Status(StatusCode.ERROR))
return False