Compare commits
47 Commits
Author | SHA1 | Date | |
---|---|---|---|
6d631d873e | |||
02dd2b72a0 | |||
d8bc3c8595 | |||
5d58198b7e | |||
789513a0be | |||
b3911839f3 | |||
1c0e4ce411 | |||
1603be9157 | |||
f139fbccd7 | |||
2fb0f93886 | |||
9537f995a1 | |||
ec03f8180e | |||
972b6a4e4d | |||
3d810e5c5b | |||
a9043361ec | |||
740a3c7a63 | |||
450240bd5a | |||
6b5975d8b9 | |||
85c2e7459e | |||
364ceb29a1 | |||
ced0c1ad1e | |||
6e4dbfd843 | |||
09e0f5f3be | |||
52c2df8b65 | |||
b25ad20ddd | |||
7c6e4a97b2 | |||
8f0e69c3de | |||
b8db0d2b95 | |||
6dc7e86e8e | |||
c62f1ab976 | |||
744fe28421 | |||
cf43e6d549 | |||
dcf5f5630d | |||
56bdad7ad1 | |||
94373cee72 | |||
4549b0ab44 | |||
9d178a3d34 | |||
1f9632761f | |||
fff20610a5 | |||
67696739f9 | |||
2ea248c02e | |||
358207efdc | |||
94a5e687df | |||
b7d6797901 | |||
6d9d373032 | |||
549ee8320a | |||
29bb80f3b9 |
7
.env
7
.env
@ -1,4 +1,9 @@
|
|||||||
TEMPLATE_DIR=template/
|
TEMPLATE_DIR=template/
|
||||||
API_ENDPOINT=http://127.0.0.1:8030/task/v1
|
API_ENDPOINT=https://zhentuai.com/task/v1
|
||||||
ACCESS_KEY=TEST_ACCESS_KEY
|
ACCESS_KEY=TEST_ACCESS_KEY
|
||||||
TEMP_DIR=tmp/
|
TEMP_DIR=tmp/
|
||||||
|
#REDIRECT_TO_URL=https://worker-renderworker-re-kekuflqjxx.cn-shanghai.fcapp.run/
|
||||||
|
# QSV
|
||||||
|
ENCODER_ARGS="-c:v h264_qsv -global_quality 28 -look_ahead 1"
|
||||||
|
# NVENC
|
||||||
|
#ENCODER_ARGS="-c:v h264_nvenc -cq:v 24 -preset:v p7 -tune:v hq -profile:v high"
|
5
.gitignore
vendored
5
.gitignore
vendored
@ -6,6 +6,11 @@ __pycache__/
|
|||||||
*.so
|
*.so
|
||||||
.Python
|
.Python
|
||||||
build/
|
build/
|
||||||
|
dist/
|
||||||
|
*.mp4
|
||||||
|
*.ts
|
||||||
|
rand*.ts
|
||||||
|
tmp_concat_*.txt
|
||||||
*.egg-info/
|
*.egg-info/
|
||||||
*.egg
|
*.egg
|
||||||
*.manifest
|
*.manifest
|
||||||
|
21
Dockerfile
Normal file
21
Dockerfile
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
FROM linuxserver/ffmpeg:7.1.1
|
||||||
|
LABEL authors="Jerry Yan"
|
||||||
|
|
||||||
|
RUN sed -i 's@//.*archive.ubuntu.com@//mirrors.ustc.edu.cn@g' /etc/apt/sources.list && \
|
||||||
|
sed -i 's/security.ubuntu.com/mirrors.ustc.edu.cn/g' /etc/apt/sources.list
|
||||||
|
RUN apt-get update && \
|
||||||
|
apt-get install -y --no-install-recommends \
|
||||||
|
python3-pip \
|
||||||
|
python3-dev \
|
||||||
|
python3-setuptools \
|
||||||
|
python3-wheel \
|
||||||
|
python3-venv
|
||||||
|
RUN apt-get clean && \
|
||||||
|
rm -rf /var/lib/apt/lists/*
|
||||||
|
COPY . /app/
|
||||||
|
RUN python3 -m venv /app/venv
|
||||||
|
RUN /app/venv/bin/python -m pip config set global.index-url https://mirrors.ustc.edu.cn/pypi/simple
|
||||||
|
RUN /app/venv/bin/python -m pip install -r /app/requirements.txt
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
ENTRYPOINT ["/app/venv/bin/python", "app.py"]
|
40
app.py
Normal file
40
app.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
import time
|
||||||
|
|
||||||
|
import flask
|
||||||
|
|
||||||
|
import config
|
||||||
|
import biz.task
|
||||||
|
import template
|
||||||
|
from telemetry import init_opentelemetry
|
||||||
|
from template import load_local_template
|
||||||
|
from util import api
|
||||||
|
|
||||||
|
load_local_template()
|
||||||
|
import logging
|
||||||
|
|
||||||
|
LOGGER = logging.getLogger(__name__)
|
||||||
|
init_opentelemetry(batch=False)
|
||||||
|
app = flask.Flask(__name__)
|
||||||
|
|
||||||
|
@app.get('/health/check')
|
||||||
|
def health_check():
|
||||||
|
return api.sync_center()
|
||||||
|
|
||||||
|
@app.post('/')
|
||||||
|
def do_nothing():
|
||||||
|
return "NOOP"
|
||||||
|
|
||||||
|
@app.post('/<task_id>')
|
||||||
|
def do_task(task_id):
|
||||||
|
task_info = api.get_task_info(task_id)
|
||||||
|
local_template_info = template.get_template_def(task_info.get("templateId"))
|
||||||
|
template_info = api.get_template_info(task_info.get("templateId"))
|
||||||
|
if local_template_info:
|
||||||
|
if local_template_info.get("updateTime") != template_info.get("updateTime"):
|
||||||
|
template.download_template(task_info.get("templateId"))
|
||||||
|
biz.task.start_task(task_info)
|
||||||
|
return "OK"
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
app.run(host="0.0.0.0", port=9998)
|
136
biz/ffmpeg.py
136
biz/ffmpeg.py
@ -2,48 +2,83 @@ import json
|
|||||||
import os.path
|
import os.path
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
from opentelemetry.trace import Status, StatusCode
|
||||||
|
|
||||||
from entity.ffmpeg import FfmpegTask
|
from entity.ffmpeg import FfmpegTask
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from util import ffmpeg, oss
|
from util import ffmpeg, oss
|
||||||
|
from util.ffmpeg import fade_out_audio
|
||||||
|
from telemetry import get_tracer
|
||||||
|
|
||||||
logger = logging.getLogger('biz/ffmpeg')
|
logger = logging.getLogger('biz/ffmpeg')
|
||||||
|
|
||||||
|
|
||||||
def parse_ffmpeg_task(task_info, template_info):
|
def parse_ffmpeg_task(task_info, template_info):
|
||||||
tasks = []
|
tracer = get_tracer(__name__)
|
||||||
# 中间片段
|
with tracer.start_as_current_span("parse_ffmpeg_task") as span:
|
||||||
task_params_str = task_info.get("taskParams", "{}")
|
tasks = []
|
||||||
task_params = json.loads(task_params_str)
|
# 中间片段
|
||||||
for part in template_info.get("video_parts"):
|
task_params_str = task_info.get("taskParams", "{}")
|
||||||
source = parse_video(part.get('source'), task_params, template_info)
|
span.set_attribute("task_params", task_params_str)
|
||||||
if not source:
|
task_params = json.loads(task_params_str)
|
||||||
logger.warning("no video found for part: " + str(part))
|
task_params_orig = json.loads(task_params_str)
|
||||||
continue
|
for part in template_info.get("video_parts"):
|
||||||
sub_ffmpeg_task = FfmpegTask(source)
|
source = parse_video(part.get('source'), task_params, template_info)
|
||||||
sub_ffmpeg_task.annexb = True
|
if not source:
|
||||||
sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
|
logger.warning("no video found for part: " + str(part))
|
||||||
for lut in part.get('filters', []):
|
continue
|
||||||
sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut))
|
only_if = part.get('only_if', '')
|
||||||
for audio in part.get('audios', []):
|
if only_if:
|
||||||
sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio))
|
if not check_placeholder_exist(only_if, task_params_orig):
|
||||||
for overlay in part.get('overlays', []):
|
logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part)
|
||||||
sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
|
continue
|
||||||
tasks.append(sub_ffmpeg_task)
|
sub_ffmpeg_task = FfmpegTask(source)
|
||||||
output_file = "out_" + str(time.time()) + ".mp4"
|
sub_ffmpeg_task.resolution = template_info.get("video_size", "")
|
||||||
task = FfmpegTask(tasks, output_file=output_file)
|
sub_ffmpeg_task.annexb = True
|
||||||
overall = template_info.get("overall_template")
|
sub_ffmpeg_task.ext_data = find_placeholder_params(part.get('source'), task_params) or {}
|
||||||
task.frame_rate = template_info.get("frame_rate", 25)
|
sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
|
||||||
if overall.get('source', ''):
|
sub_ffmpeg_task.center_cut = part.get("crop_mode", None)
|
||||||
source = parse_video(overall.get('source'), task_params, template_info)
|
for effect in part.get('effects', []):
|
||||||
task.add_inputs(source)
|
sub_ffmpeg_task.add_effect(effect)
|
||||||
for lut in overall.get('filters', []):
|
for lut in part.get('filters', []):
|
||||||
task.add_lut(os.path.join(template_info.get("local_path"), lut))
|
sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut))
|
||||||
for audio in overall.get('audios', []):
|
for audio in part.get('audios', []):
|
||||||
task.add_audios(os.path.join(template_info.get("local_path"), audio))
|
sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio))
|
||||||
for overlay in overall.get('overlays', []):
|
for overlay in part.get('overlays', []):
|
||||||
task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
|
sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
|
||||||
return task
|
tasks.append(sub_ffmpeg_task)
|
||||||
|
output_file = "out_" + str(time.time()) + ".mp4"
|
||||||
|
task = FfmpegTask(tasks, output_file=output_file)
|
||||||
|
task.resolution = template_info.get("video_size", "")
|
||||||
|
overall = template_info.get("overall_template")
|
||||||
|
task.center_cut = template_info.get("crop_mode", None)
|
||||||
|
task.frame_rate = template_info.get("frame_rate", 25)
|
||||||
|
if overall.get('source', ''):
|
||||||
|
source = parse_video(overall.get('source'), task_params, template_info)
|
||||||
|
task.add_inputs(source)
|
||||||
|
for effect in overall.get('effects', []):
|
||||||
|
task.add_effect(effect)
|
||||||
|
for lut in overall.get('filters', []):
|
||||||
|
task.add_lut(os.path.join(template_info.get("local_path"), lut))
|
||||||
|
for audio in overall.get('audios', []):
|
||||||
|
task.add_audios(os.path.join(template_info.get("local_path"), audio))
|
||||||
|
for overlay in overall.get('overlays', []):
|
||||||
|
task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
|
||||||
|
return task
|
||||||
|
|
||||||
|
|
||||||
|
def find_placeholder_params(source, task_params):
|
||||||
|
if source.startswith('PLACEHOLDER_'):
|
||||||
|
placeholder_id = source.replace('PLACEHOLDER_', '')
|
||||||
|
new_sources = task_params.get(placeholder_id, [])
|
||||||
|
if type(new_sources) is list:
|
||||||
|
if len(new_sources) == 0:
|
||||||
|
logger.debug("no video found for placeholder: " + placeholder_id)
|
||||||
|
return {}
|
||||||
|
else:
|
||||||
|
return new_sources[0]
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
def parse_video(source, task_params, template_info):
|
def parse_video(source, task_params, template_info):
|
||||||
@ -56,8 +91,8 @@ def parse_video(source, task_params, template_info):
|
|||||||
logger.debug("no video found for placeholder: " + placeholder_id)
|
logger.debug("no video found for placeholder: " + placeholder_id)
|
||||||
return None
|
return None
|
||||||
else:
|
else:
|
||||||
# TODO: Random Pick / Policy Pick
|
_pick_source = new_sources.pop(0)
|
||||||
new_sources = new_sources[0].get("url")
|
new_sources = _pick_source.get("url")
|
||||||
if new_sources.startswith("http"):
|
if new_sources.startswith("http"):
|
||||||
_, source_name = os.path.split(new_sources)
|
_, source_name = os.path.split(new_sources)
|
||||||
oss.download_from_oss(new_sources, source_name)
|
oss.download_from_oss(new_sources, source_name)
|
||||||
@ -66,18 +101,39 @@ def parse_video(source, task_params, template_info):
|
|||||||
return os.path.join(template_info.get("local_path"), source)
|
return os.path.join(template_info.get("local_path"), source)
|
||||||
|
|
||||||
|
|
||||||
|
def check_placeholder_exist(placeholder_id, task_params):
|
||||||
|
if placeholder_id in task_params:
|
||||||
|
new_sources = task_params.get(placeholder_id, [])
|
||||||
|
if type(new_sources) is list:
|
||||||
|
if len(new_sources) == 0:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def start_ffmpeg_task(ffmpeg_task):
|
def start_ffmpeg_task(ffmpeg_task):
|
||||||
for task in ffmpeg_task.analyze_input_render_tasks():
|
tracer = get_tracer(__name__)
|
||||||
start_ffmpeg_task(task)
|
with tracer.start_as_current_span("start_ffmpeg_task") as span:
|
||||||
ffmpeg_task.correct_task_type()
|
for task in ffmpeg_task.analyze_input_render_tasks():
|
||||||
return ffmpeg.start_render(ffmpeg_task)
|
result = start_ffmpeg_task(task)
|
||||||
|
if not result:
|
||||||
|
return False
|
||||||
|
ffmpeg_task.correct_task_type()
|
||||||
|
result = ffmpeg.start_render(ffmpeg_task)
|
||||||
|
if not result:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
return False
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def clear_task_tmp_file(ffmpeg_task):
|
def clear_task_tmp_file(ffmpeg_task):
|
||||||
for task in ffmpeg_task.analyze_input_render_tasks():
|
for task in ffmpeg_task.analyze_input_render_tasks():
|
||||||
clear_task_tmp_file(task)
|
clear_task_tmp_file(task)
|
||||||
try:
|
try:
|
||||||
if "template" not in ffmpeg_task.get_output_file():
|
if os.getenv("TEMPLATE_DIR") not in ffmpeg_task.get_output_file():
|
||||||
os.remove(ffmpeg_task.get_output_file())
|
os.remove(ffmpeg_task.get_output_file())
|
||||||
logger.info("delete tmp file: " + ffmpeg_task.get_output_file())
|
logger.info("delete tmp file: " + ffmpeg_task.get_output_file())
|
||||||
else:
|
else:
|
||||||
|
54
biz/task.py
54
biz/task.py
@ -1,24 +1,40 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
from opentelemetry.trace import Status, StatusCode
|
||||||
|
|
||||||
|
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info, fade_out_audio
|
||||||
|
from telemetry import get_tracer
|
||||||
from template import get_template_def
|
from template import get_template_def
|
||||||
from util import api
|
from util import api
|
||||||
|
|
||||||
|
|
||||||
def start_task(task_info):
|
def start_task(task_info):
|
||||||
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
|
tracer = get_tracer(__name__)
|
||||||
task_info = api.normalize_task(task_info)
|
with tracer.start_as_current_span("start_task") as span:
|
||||||
template_info = get_template_def(task_info.get("templateId"))
|
task_info = api.normalize_task(task_info)
|
||||||
api.report_task_start(task_info)
|
span.set_attribute("task", json.dumps(task_info))
|
||||||
ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
|
span.set_attribute("scenicId", task_info.get("scenicId", "?"))
|
||||||
result = start_ffmpeg_task(ffmpeg_task)
|
span.set_attribute("templateId", task_info.get("templateId"))
|
||||||
if not result:
|
template_info = get_template_def(task_info.get("templateId"))
|
||||||
return api.report_task_failed(task_info)
|
api.report_task_start(task_info)
|
||||||
oss_result = api.upload_task_file(task_info, ffmpeg_task)
|
ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
|
||||||
if not oss_result:
|
result = start_ffmpeg_task(ffmpeg_task)
|
||||||
return api.report_task_failed(task_info)
|
if not result:
|
||||||
# 获取视频长度宽度和时长
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
width, height, duration = probe_video_info(ffmpeg_task)
|
return api.report_task_failed(task_info)
|
||||||
clear_task_tmp_file(ffmpeg_task)
|
width, height, duration = probe_video_info(ffmpeg_task)
|
||||||
api.report_task_success(task_info, videoInfo={
|
# 音频淡出
|
||||||
"width": width,
|
new_fn = fade_out_audio(ffmpeg_task.get_output_file(), duration)
|
||||||
"height": height,
|
ffmpeg_task.set_output_file(new_fn)
|
||||||
"duration": duration
|
oss_result = api.upload_task_file(task_info, ffmpeg_task)
|
||||||
})
|
if not oss_result:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
return api.report_task_failed(task_info)
|
||||||
|
# 获取视频长度宽度和时长
|
||||||
|
clear_task_tmp_file(ffmpeg_task)
|
||||||
|
api.report_task_success(task_info, videoInfo={
|
||||||
|
"width": width,
|
||||||
|
"height": height,
|
||||||
|
"duration": duration
|
||||||
|
})
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
261
entity/ffmpeg.py
261
entity/ffmpeg.py
@ -1,9 +1,20 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
DEFAULT_ARGS = ("-shortest",)
|
||||||
|
ENCODER_ARGS = ("-c:v", "h264", ) if not os.getenv("ENCODER_ARGS", False) else os.getenv("ENCODER_ARGS", "").split(" ")
|
||||||
|
VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", )
|
||||||
|
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", )
|
||||||
|
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", )
|
||||||
|
|
||||||
|
|
||||||
class FfmpegTask(object):
|
class FfmpegTask(object):
|
||||||
|
|
||||||
|
effects: list[str]
|
||||||
|
|
||||||
def __init__(self, input_file, task_type='copy', output_file=''):
|
def __init__(self, input_file, task_type='copy', output_file=''):
|
||||||
self.annexb = False
|
self.annexb = False
|
||||||
if type(input_file) is str:
|
if type(input_file) is str:
|
||||||
@ -14,15 +25,20 @@ class FfmpegTask(object):
|
|||||||
self.input_file = input_file
|
self.input_file = input_file
|
||||||
else:
|
else:
|
||||||
self.input_file = []
|
self.input_file = []
|
||||||
|
self.zoom_cut = None
|
||||||
|
self.center_cut = None
|
||||||
|
self.ext_data = {}
|
||||||
self.task_type = task_type
|
self.task_type = task_type
|
||||||
self.output_file = output_file
|
self.output_file = output_file
|
||||||
self.mute = True
|
self.mute = True
|
||||||
self.speed = 1
|
self.speed = 1
|
||||||
self.frame_rate = 25
|
self.frame_rate = 25
|
||||||
|
self.resolution = None
|
||||||
self.subtitles = []
|
self.subtitles = []
|
||||||
self.luts = []
|
self.luts = []
|
||||||
self.audios = []
|
self.audios = []
|
||||||
self.overlays = []
|
self.overlays = []
|
||||||
|
self.effects = []
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
_str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}'
|
_str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}'
|
||||||
@ -34,8 +50,11 @@ class FfmpegTask(object):
|
|||||||
_str += f', overlays={self.overlays}'
|
_str += f', overlays={self.overlays}'
|
||||||
if self.annexb:
|
if self.annexb:
|
||||||
_str += f', annexb={self.annexb}'
|
_str += f', annexb={self.annexb}'
|
||||||
|
if self.effects:
|
||||||
|
_str += f', effects={self.effects}'
|
||||||
if self.mute:
|
if self.mute:
|
||||||
_str += f', mute={self.mute}'
|
_str += f', mute={self.mute}'
|
||||||
|
_str += f', center_cut={self.center_cut}'
|
||||||
return _str + ')'
|
return _str + ')'
|
||||||
|
|
||||||
def analyze_input_render_tasks(self):
|
def analyze_input_render_tasks(self):
|
||||||
@ -77,6 +96,10 @@ class FfmpegTask(object):
|
|||||||
self.luts.extend(luts)
|
self.luts.extend(luts)
|
||||||
self.correct_task_type()
|
self.correct_task_type()
|
||||||
|
|
||||||
|
def add_effect(self, *effects):
|
||||||
|
self.effects.extend(effects)
|
||||||
|
self.correct_task_type()
|
||||||
|
|
||||||
def get_output_file(self):
|
def get_output_file(self):
|
||||||
if self.task_type == 'copy':
|
if self.task_type == 'copy':
|
||||||
return self.input_file[0]
|
return self.input_file[0]
|
||||||
@ -99,8 +122,14 @@ class FfmpegTask(object):
|
|||||||
return False
|
return False
|
||||||
if len(self.subtitles) > 0:
|
if len(self.subtitles) > 0:
|
||||||
return False
|
return False
|
||||||
|
if len(self.effects) > 0:
|
||||||
|
return False
|
||||||
if self.speed != 1:
|
if self.speed != 1:
|
||||||
return False
|
return False
|
||||||
|
if self.zoom_cut is not None:
|
||||||
|
return False
|
||||||
|
if self.center_cut is not None:
|
||||||
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def check_can_copy(self):
|
def check_can_copy(self):
|
||||||
@ -110,45 +139,114 @@ class FfmpegTask(object):
|
|||||||
return False
|
return False
|
||||||
if len(self.subtitles) > 0:
|
if len(self.subtitles) > 0:
|
||||||
return False
|
return False
|
||||||
|
if len(self.effects) > 0:
|
||||||
|
return False
|
||||||
if self.speed != 1:
|
if self.speed != 1:
|
||||||
return False
|
return False
|
||||||
if len(self.audios) > 1:
|
if len(self.audios) >= 1:
|
||||||
return False
|
return False
|
||||||
if len(self.input_file) > 1:
|
if len(self.input_file) > 1:
|
||||||
return False
|
return False
|
||||||
|
if self.zoom_cut is not None:
|
||||||
|
return False
|
||||||
|
if self.center_cut is not None:
|
||||||
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def check_audio_track(self):
|
def check_audio_track(self):
|
||||||
if len(self.audios) > 0:
|
...
|
||||||
self.mute = False
|
|
||||||
|
|
||||||
def get_ffmpeg_args(self):
|
def get_ffmpeg_args(self):
|
||||||
args = ['-y', '-hide_banner']
|
args = ['-y', '-hide_banner']
|
||||||
if self.task_type == 'encode':
|
if self.task_type == 'encode':
|
||||||
# args += ('-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv')
|
|
||||||
input_args = []
|
input_args = []
|
||||||
filter_args = []
|
filter_args = []
|
||||||
output_args = ["-shortest", "-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1"]
|
output_args = [*VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, *DEFAULT_ARGS]
|
||||||
if self.annexb:
|
if self.annexb:
|
||||||
output_args.append("-bsf:v")
|
output_args.append("-bsf:v")
|
||||||
output_args.append("h264_mp4toannexb")
|
output_args.append("h264_mp4toannexb")
|
||||||
|
output_args.append("-reset_timestamps")
|
||||||
|
output_args.append("1")
|
||||||
video_output_str = "[0:v]"
|
video_output_str = "[0:v]"
|
||||||
audio_output_str = "[0:v]"
|
audio_output_str = ""
|
||||||
video_input_count = 0
|
audio_track_index = 0
|
||||||
audio_input_count = 0
|
effect_index = 0
|
||||||
for input_file in self.input_file:
|
for input_file in self.input_file:
|
||||||
input_args.append("-i")
|
input_args.append("-i")
|
||||||
if type(input_file) is str:
|
if type(input_file) is str:
|
||||||
input_args.append(input_file)
|
input_args.append(input_file)
|
||||||
elif isinstance(input_file, FfmpegTask):
|
elif isinstance(input_file, FfmpegTask):
|
||||||
input_args.append(input_file.get_output_file())
|
input_args.append(input_file.get_output_file())
|
||||||
|
if self.center_cut == 1:
|
||||||
|
pos_json_str = self.ext_data.get('posJson', '{}')
|
||||||
|
pos_json = json.loads(pos_json_str)
|
||||||
|
_v_w = pos_json.get('imgWidth', 1)
|
||||||
|
_f_x = pos_json.get('ltX', 0)
|
||||||
|
_f_x2 = pos_json.get('rbX', 0)
|
||||||
|
_x = f'{float((_f_x2 + _f_x)/(2 * _v_w)) :.4f}*iw-ih*ih/(2*iw)'
|
||||||
|
filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
|
||||||
|
video_output_str = f"[v_cut{effect_index}]"
|
||||||
|
effect_index += 1
|
||||||
|
for effect in self.effects:
|
||||||
|
if effect.startswith("cameraShot:"):
|
||||||
|
param = effect.split(":", 2)[1]
|
||||||
|
if param == '':
|
||||||
|
param = "3,1,0"
|
||||||
|
_split = param.split(",")
|
||||||
|
start = 3
|
||||||
|
duration = 1
|
||||||
|
rotate_deg = 0
|
||||||
|
if len(_split) >= 3:
|
||||||
|
if _split[2] == '':
|
||||||
|
rotate_deg = 0
|
||||||
|
else:
|
||||||
|
rotate_deg = int(_split[2])
|
||||||
|
if len(_split) >= 2:
|
||||||
|
duration = float(_split[1])
|
||||||
|
if len(_split) >= 1:
|
||||||
|
start = float(_split[0])
|
||||||
|
_start_out_str = "[eff_s]"
|
||||||
|
_mid_out_str = "[eff_m]"
|
||||||
|
_end_out_str = "[eff_e]"
|
||||||
|
filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}")
|
||||||
|
filter_args.append(f"{_start_out_str}select=lt(n\\,{int(start*self.frame_rate)}){_start_out_str}")
|
||||||
|
filter_args.append(f"{_end_out_str}select=gt(n\\,{int(start*self.frame_rate)}){_end_out_str}")
|
||||||
|
filter_args.append(f"{_mid_out_str}select=eq(n\\,{int(start*self.frame_rate)}){_mid_out_str}")
|
||||||
|
filter_args.append(f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}")
|
||||||
|
if rotate_deg != 0:
|
||||||
|
filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/360{_mid_out_str}")
|
||||||
|
# filter_args.append(f"{video_output_str}trim=start=0:end={start+duration},tpad=stop_mode=clone:stop_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
|
||||||
|
# filter_args.append(f"tpad=start_mode=clone:start_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
|
||||||
|
# filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}")
|
||||||
|
video_output_str = f"[v_eff{effect_index}]"
|
||||||
|
# filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
|
||||||
|
filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
|
||||||
|
effect_index += 1
|
||||||
|
elif effect.startswith("ospeed:"):
|
||||||
|
param = effect.split(":", 2)[1]
|
||||||
|
if param == '':
|
||||||
|
param = "1"
|
||||||
|
if param != "1":
|
||||||
|
# 视频变速
|
||||||
|
effect_index += 1
|
||||||
|
filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]")
|
||||||
|
video_output_str = f"[v_eff{effect_index}]"
|
||||||
|
elif effect.startswith("zoom:"):
|
||||||
|
...
|
||||||
|
...
|
||||||
for lut in self.luts:
|
for lut in self.luts:
|
||||||
filter_args.append("[0:v]lut3d=file=" + lut + "[0:v]")
|
filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}")
|
||||||
|
if self.resolution:
|
||||||
|
filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]")
|
||||||
|
video_output_str = "[v]"
|
||||||
for overlay in self.overlays:
|
for overlay in self.overlays:
|
||||||
input_index = input_args.count("-i")
|
input_index = input_args.count("-i")
|
||||||
input_args.append("-i")
|
input_args.append("-i")
|
||||||
input_args.append(overlay)
|
input_args.append(overlay)
|
||||||
filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]")
|
if os.getenv("OLD_FFMPEG"):
|
||||||
|
filter_args.append(f"{video_output_str}[{input_index}:v]scale2ref=iw:ih[v]")
|
||||||
|
else:
|
||||||
|
filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]")
|
||||||
filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
|
filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
|
||||||
video_output_str = "[v]"
|
video_output_str = "[v]"
|
||||||
for subtitle in self.subtitles:
|
for subtitle in self.subtitles:
|
||||||
@ -159,97 +257,96 @@ class FfmpegTask(object):
|
|||||||
output_args.append("-r")
|
output_args.append("-r")
|
||||||
output_args.append(f"{self.frame_rate}")
|
output_args.append(f"{self.frame_rate}")
|
||||||
if self.mute:
|
if self.mute:
|
||||||
output_args.append("-an")
|
input_index = input_args.count("-i")
|
||||||
|
input_args += MUTE_AUDIO_INPUT
|
||||||
|
filter_args.append(f"[{input_index}:a]acopy[a]")
|
||||||
|
audio_track_index += 1
|
||||||
|
audio_output_str = "[a]"
|
||||||
else:
|
else:
|
||||||
input_index = 0
|
audio_output_str = "[0:a]"
|
||||||
for audio in self.audios:
|
audio_track_index += 1
|
||||||
input_index = input_args.count("-i")
|
for audio in self.audios:
|
||||||
input_args.append("-i")
|
input_index = input_args.count("-i")
|
||||||
input_args.append(audio.replace("\\", "/"))
|
input_args.append("-i")
|
||||||
if audio_input_count > 0:
|
input_args.append(audio.replace("\\", "/"))
|
||||||
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
|
audio_track_index += 1
|
||||||
audio_output_str = "[a]"
|
filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
|
||||||
else:
|
audio_output_str = "[a]"
|
||||||
audio_output_str = f"[{input_index}:a]"
|
if audio_output_str:
|
||||||
audio_input_count += 1
|
output_args.append("-map")
|
||||||
if audio_input_count == 1:
|
|
||||||
audio_output_str = f"{input_index}"
|
|
||||||
output_args.append(f"-map")
|
|
||||||
output_args.append(audio_output_str)
|
output_args.append(audio_output_str)
|
||||||
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()]
|
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
|
||||||
|
return args + input_args + _filter_args + output_args + [self.get_output_file()]
|
||||||
elif self.task_type == 'concat':
|
elif self.task_type == 'concat':
|
||||||
# 无法通过 annexb 合并的
|
# 无法通过 annexb 合并的
|
||||||
input_args = []
|
input_args = []
|
||||||
output_args = ["-shortest"]
|
output_args = [*DEFAULT_ARGS]
|
||||||
if self.check_annexb() and len(self.audios) <= 1:
|
filter_args = []
|
||||||
# output_args
|
audio_output_str = ""
|
||||||
if len(self.audios) > 0:
|
audio_track_index = 0
|
||||||
input_args.append("-an")
|
# output_args
|
||||||
_tmp_file = "tmp_concat_"+str(time.time())+".txt"
|
if len(self.input_file) == 1:
|
||||||
|
_file = self.input_file[0]
|
||||||
|
from util.ffmpeg import probe_video_audio
|
||||||
|
if type(_file) is str:
|
||||||
|
input_args += ["-i", _file]
|
||||||
|
self.mute = not probe_video_audio(_file)
|
||||||
|
elif isinstance(_file, FfmpegTask):
|
||||||
|
input_args += ["-i", _file.get_output_file()]
|
||||||
|
self.mute = not probe_video_audio(_file.get_output_file())
|
||||||
|
else:
|
||||||
|
_tmp_file = "tmp_concat_" + str(time.time()) + ".txt"
|
||||||
|
from util.ffmpeg import probe_video_audio
|
||||||
with open(_tmp_file, "w", encoding="utf-8") as f:
|
with open(_tmp_file, "w", encoding="utf-8") as f:
|
||||||
for input_file in self.input_file:
|
for input_file in self.input_file:
|
||||||
if type(input_file) is str:
|
if type(input_file) is str:
|
||||||
f.write("file '"+input_file+"'\n")
|
f.write("file '" + input_file + "'\n")
|
||||||
elif isinstance(input_file, FfmpegTask):
|
elif isinstance(input_file, FfmpegTask):
|
||||||
f.write("file '" + input_file.get_output_file() + "'\n")
|
f.write("file '" + input_file.get_output_file() + "'\n")
|
||||||
input_args += ("-f", "concat", "-safe", "0", "-i", _tmp_file)
|
input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file]
|
||||||
output_args.append("-c:v")
|
self.mute = not probe_video_audio(_tmp_file, "concat")
|
||||||
output_args.append("copy")
|
output_args.append("-map")
|
||||||
if len(self.audios) > 0:
|
output_args.append("0:v")
|
||||||
input_args.append("-i")
|
output_args.append("-c:v")
|
||||||
input_args.append(self.audios[0])
|
output_args.append("copy")
|
||||||
output_args.append("-c:a")
|
if self.mute:
|
||||||
output_args.append("copy")
|
input_index = input_args.count("-i")
|
||||||
output_args.append("-f")
|
input_args += MUTE_AUDIO_INPUT
|
||||||
output_args.append("mp4")
|
audio_output_str = f"[{input_index}:a]"
|
||||||
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
|
audio_track_index += 1
|
||||||
return args + input_args + output_args + [self.get_output_file()]
|
else:
|
||||||
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
|
audio_output_str = "[0:a]"
|
||||||
filter_args = []
|
audio_track_index += 1
|
||||||
video_output_str = "[0:v]"
|
for audio in self.audios:
|
||||||
audio_output_str = "[0:a]"
|
|
||||||
video_input_count = 0
|
|
||||||
audio_input_count = 0
|
|
||||||
for input_file in self.input_file:
|
|
||||||
input_index = input_args.count("-i")
|
input_index = input_args.count("-i")
|
||||||
input_args.append("-i")
|
input_args.append("-i")
|
||||||
if type(input_file) is str:
|
input_args.append(audio.replace("\\", "/"))
|
||||||
input_args.append(input_file.replace("\\", "/"))
|
audio_track_index += 1
|
||||||
elif isinstance(input_file, FfmpegTask):
|
filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
|
||||||
input_args.append(input_file.get_output_file().replace("\\", "/"))
|
audio_output_str = "[a]"
|
||||||
if video_input_count > 0:
|
if audio_output_str:
|
||||||
filter_args.append(f"{video_output_str}[{input_index}:v]concat=n=2:v=1:a=0[v]")
|
output_args.append("-map")
|
||||||
video_output_str = "[v]"
|
if audio_track_index <= 1:
|
||||||
|
output_args.append(audio_output_str[1:-1])
|
||||||
else:
|
else:
|
||||||
video_output_str = f"[{input_index}:v]"
|
output_args.append(audio_output_str)
|
||||||
video_input_count += 1
|
output_args += AUDIO_ARGS
|
||||||
output_args.append("-map")
|
if self.annexb:
|
||||||
output_args.append(video_output_str)
|
output_args.append("-bsf:v")
|
||||||
if self.mute:
|
output_args.append("h264_mp4toannexb")
|
||||||
output_args.append("-an")
|
output_args.append("-bsf:a")
|
||||||
else:
|
output_args.append("setts=pts=DTS")
|
||||||
input_index = 0
|
output_args.append("-f")
|
||||||
for audio in self.audios:
|
output_args.append("mpegts" if self.annexb else "mp4")
|
||||||
input_index = input_args.count("-i")
|
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
|
||||||
input_args.append("-i")
|
return args + input_args + _filter_args + output_args + [self.get_output_file()]
|
||||||
input_args.append(audio.replace("\\", "/"))
|
|
||||||
if audio_input_count > 0:
|
|
||||||
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
|
|
||||||
audio_output_str = "[a]"
|
|
||||||
else:
|
|
||||||
audio_output_str = f"[{input_index}:a]"
|
|
||||||
audio_input_count += 1
|
|
||||||
if audio_input_count == 1:
|
|
||||||
audio_output_str = f"{input_index}"
|
|
||||||
output_args.append(f"-map")
|
|
||||||
output_args.append(audio_output_str)
|
|
||||||
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()]
|
|
||||||
elif self.task_type == 'copy':
|
elif self.task_type == 'copy':
|
||||||
if len(self.input_file) == 1:
|
if len(self.input_file) == 1:
|
||||||
if type(self.input_file[0]) is str:
|
if type(self.input_file[0]) is str:
|
||||||
if self.input_file[0] == self.get_output_file():
|
if self.input_file[0] == self.get_output_file():
|
||||||
return []
|
return []
|
||||||
return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()]
|
return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()]
|
||||||
|
return []
|
||||||
|
|
||||||
def set_output_file(self, file=None):
|
def set_output_file(self, file=None):
|
||||||
if file is None:
|
if file is None:
|
||||||
|
31
index.py
31
index.py
@ -1,19 +1,42 @@
|
|||||||
from time import sleep
|
from time import sleep
|
||||||
|
|
||||||
import biz.task
|
|
||||||
import config
|
import config
|
||||||
|
import biz.task
|
||||||
|
from telemetry import init_opentelemetry
|
||||||
from template import load_local_template
|
from template import load_local_template
|
||||||
from util import api
|
from util import api
|
||||||
|
|
||||||
load_local_template()
|
import os
|
||||||
|
import glob
|
||||||
|
|
||||||
|
load_local_template()
|
||||||
|
import logging
|
||||||
|
|
||||||
|
LOGGER = logging.getLogger(__name__)
|
||||||
|
init_opentelemetry()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
# print(get_sys_info())
|
# print(get_sys_info())
|
||||||
print("waiting for task...")
|
print("waiting for task...")
|
||||||
task_list = api.sync_center()
|
try:
|
||||||
|
task_list = api.sync_center()
|
||||||
|
except Exception as e:
|
||||||
|
LOGGER.error("sync_center error", exc_info=e)
|
||||||
|
sleep(5)
|
||||||
|
continue
|
||||||
if len(task_list) == 0:
|
if len(task_list) == 0:
|
||||||
|
# 删除当前文件夹下所有以.mp4、.ts结尾的文件
|
||||||
|
for file_globs in ['*.mp4', '*.ts', 'tmp_concat*.txt']:
|
||||||
|
for file_path in glob.glob(file_globs):
|
||||||
|
try:
|
||||||
|
os.remove(file_path)
|
||||||
|
print(f"Deleted file: {file_path}")
|
||||||
|
except Exception as e:
|
||||||
|
LOGGER.error(f"Error deleting file {file_path}", exc_info=e)
|
||||||
sleep(5)
|
sleep(5)
|
||||||
for task in task_list:
|
for task in task_list:
|
||||||
print("start task:", task)
|
print("start task:", task)
|
||||||
biz.task.start_task(task)
|
try:
|
||||||
|
biz.task.start_task(task)
|
||||||
|
except Exception as e:
|
||||||
|
LOGGER.error("task_start error", exc_info=e)
|
||||||
|
@ -1,3 +1,7 @@
|
|||||||
requests~=2.32.3
|
requests~=2.32.3
|
||||||
psutil~=6.1.0
|
psutil~=6.1.0
|
||||||
python-dotenv~=1.0.1
|
python-dotenv~=1.0.1
|
||||||
|
opentelemetry-api~=1.30.0
|
||||||
|
opentelemetry-sdk~=1.30.0
|
||||||
|
opentelemetry-exporter-otlp~=1.30.0
|
||||||
|
flask~=3.1.0
|
35
telemetry/__init__.py
Normal file
35
telemetry/__init__.py
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
from constant import SOFTWARE_VERSION
|
||||||
|
from opentelemetry import trace
|
||||||
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter
|
||||||
|
from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION
|
||||||
|
from opentelemetry.sdk.trace import TracerProvider
|
||||||
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
|
||||||
|
|
||||||
|
|
||||||
|
def get_tracer(name):
|
||||||
|
return trace.get_tracer(name)
|
||||||
|
|
||||||
|
# 初始化 OpenTelemetry
|
||||||
|
def init_opentelemetry(batch=True):
|
||||||
|
# 设置服务名、主机名
|
||||||
|
resource = Resource(attributes={
|
||||||
|
SERVICE_NAME: "RENDER_WORKER",
|
||||||
|
SERVICE_VERSION: SOFTWARE_VERSION,
|
||||||
|
DEPLOYMENT_ENVIRONMENT: "Python",
|
||||||
|
HOST_NAME: os.getenv("ACCESS_KEY"),
|
||||||
|
})
|
||||||
|
|
||||||
|
# 使用HTTP协议上报
|
||||||
|
if batch:
|
||||||
|
span_processor = BatchSpanProcessor(OTLPSpanHttpExporter(
|
||||||
|
endpoint="https://oltp.jerryyan.top/v1/traces",
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
span_processor = SimpleSpanProcessor(OTLPSpanHttpExporter(
|
||||||
|
endpoint="https://oltp.jerryyan.top/v1/traces",
|
||||||
|
))
|
||||||
|
|
||||||
|
trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor)
|
||||||
|
trace.set_tracer_provider(trace_provider)
|
@ -2,6 +2,7 @@ import json
|
|||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from telemetry import get_tracer
|
||||||
from util import api, oss
|
from util import api, oss
|
||||||
|
|
||||||
TEMPLATES = {}
|
TEMPLATES = {}
|
||||||
@ -75,48 +76,50 @@ def get_template_def(template_id):
|
|||||||
return TEMPLATES.get(template_id)
|
return TEMPLATES.get(template_id)
|
||||||
|
|
||||||
def download_template(template_id):
|
def download_template(template_id):
|
||||||
template_info = api.get_template_info(template_id)
|
tracer = get_tracer(__name__)
|
||||||
if not os.path.isdir(template_info['local_path']):
|
with tracer.start_as_current_span("download_template"):
|
||||||
os.makedirs(template_info['local_path'])
|
template_info = api.get_template_info(template_id)
|
||||||
# download template assets
|
if not os.path.isdir(template_info['local_path']):
|
||||||
overall_template = template_info['overall_template']
|
os.makedirs(template_info['local_path'])
|
||||||
video_parts = template_info['video_parts']
|
# download template assets
|
||||||
def _download_assets(_template):
|
overall_template = template_info['overall_template']
|
||||||
if 'source' in _template:
|
video_parts = template_info['video_parts']
|
||||||
if str(_template['source']).startswith("http"):
|
def _download_assets(_template):
|
||||||
_, _fn = os.path.split(_template['source'])
|
if 'source' in _template:
|
||||||
new_fp = os.path.join(template_info['local_path'], _fn)
|
if str(_template['source']).startswith("http"):
|
||||||
oss.download_from_oss(_template['source'], new_fp)
|
_, _fn = os.path.split(_template['source'])
|
||||||
if _fn.endswith(".mp4"):
|
new_fp = os.path.join(template_info['local_path'], _fn)
|
||||||
from util.ffmpeg import to_annexb
|
oss.download_from_oss(_template['source'], new_fp)
|
||||||
new_fp = to_annexb(new_fp)
|
if _fn.endswith(".mp4"):
|
||||||
_template['source'] = os.path.relpath(new_fp, template_info['local_path'])
|
from util.ffmpeg import re_encode_and_annexb
|
||||||
if 'overlays' in _template:
|
new_fp = re_encode_and_annexb(new_fp)
|
||||||
for i in range(len(_template['overlays'])):
|
_template['source'] = os.path.relpath(new_fp, template_info['local_path'])
|
||||||
overlay = _template['overlays'][i]
|
if 'overlays' in _template:
|
||||||
if str(overlay).startswith("http"):
|
for i in range(len(_template['overlays'])):
|
||||||
_, _fn = os.path.split(overlay)
|
overlay = _template['overlays'][i]
|
||||||
oss.download_from_oss(overlay, os.path.join(template_info['local_path'], _fn))
|
if str(overlay).startswith("http"):
|
||||||
_template['overlays'][i] = _fn
|
_, _fn = os.path.split(overlay)
|
||||||
if 'luts' in _template:
|
oss.download_from_oss(overlay, os.path.join(template_info['local_path'], _fn))
|
||||||
for i in range(len(_template['luts'])):
|
_template['overlays'][i] = _fn
|
||||||
lut = _template['luts'][i]
|
if 'luts' in _template:
|
||||||
if str(lut).startswith("http"):
|
for i in range(len(_template['luts'])):
|
||||||
_, _fn = os.path.split(lut)
|
lut = _template['luts'][i]
|
||||||
oss.download_from_oss(lut, os.path.join(template_info['local_path'], _fn))
|
if str(lut).startswith("http"):
|
||||||
_template['luts'][i] = _fn
|
_, _fn = os.path.split(lut)
|
||||||
if 'audios' in _template:
|
oss.download_from_oss(lut, os.path.join(template_info['local_path'], _fn))
|
||||||
for i in range(len(_template['audios'])):
|
_template['luts'][i] = _fn
|
||||||
if str(_template['audios'][i]).startswith("http"):
|
if 'audios' in _template:
|
||||||
_, _fn = os.path.split(_template['audios'][i])
|
for i in range(len(_template['audios'])):
|
||||||
oss.download_from_oss(_template['audios'][i], os.path.join(template_info['local_path'], _fn))
|
if str(_template['audios'][i]).startswith("http"):
|
||||||
_template['audios'][i] = _fn
|
_, _fn = os.path.split(_template['audios'][i])
|
||||||
_download_assets(overall_template)
|
oss.download_from_oss(_template['audios'][i], os.path.join(template_info['local_path'], _fn))
|
||||||
for video_part in video_parts:
|
_template['audios'][i] = _fn
|
||||||
_download_assets(video_part)
|
_download_assets(overall_template)
|
||||||
with open(os.path.join(template_info['local_path'], 'template.json'), 'w', encoding='utf-8') as f:
|
for video_part in video_parts:
|
||||||
json.dump(template_info, f)
|
_download_assets(video_part)
|
||||||
load_template(template_id, template_info['local_path'])
|
with open(os.path.join(template_info['local_path'], 'template.json'), 'w', encoding='utf-8') as f:
|
||||||
|
json.dump(template_info, f)
|
||||||
|
load_template(template_id, template_info['local_path'])
|
||||||
|
|
||||||
|
|
||||||
def analyze_template(template_id):
|
def analyze_template(template_id):
|
||||||
|
276
util/api.py
276
util/api.py
@ -1,9 +1,14 @@
|
|||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import threading
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
from opentelemetry.trace import Status, StatusCode
|
||||||
|
|
||||||
import util.system
|
import util.system
|
||||||
|
from telemetry import get_tracer
|
||||||
|
from util import oss
|
||||||
|
|
||||||
session = requests.Session()
|
session = requests.Session()
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -19,13 +24,13 @@ def sync_center():
|
|||||||
通过接口获取任务
|
通过接口获取任务
|
||||||
:return: 任务列表
|
:return: 任务列表
|
||||||
"""
|
"""
|
||||||
|
from template import TEMPLATES, download_template
|
||||||
try:
|
try:
|
||||||
from template import TEMPLATES, download_template
|
|
||||||
|
|
||||||
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
|
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
|
||||||
'accessKey': os.getenv('ACCESS_KEY'),
|
'accessKey': os.getenv('ACCESS_KEY'),
|
||||||
'clientStatus': util.system.get_sys_info(),
|
'clientStatus': util.system.get_sys_info(),
|
||||||
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in TEMPLATES.values()]
|
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in
|
||||||
|
TEMPLATES.values()]
|
||||||
}, timeout=10)
|
}, timeout=10)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
@ -40,6 +45,13 @@ def sync_center():
|
|||||||
tasks = []
|
tasks = []
|
||||||
templates = []
|
templates = []
|
||||||
logger.warning("获取任务失败")
|
logger.warning("获取任务失败")
|
||||||
|
if os.getenv("REDIRECT_TO_URL", False) != False:
|
||||||
|
for task in tasks:
|
||||||
|
_sess = requests.Session()
|
||||||
|
logger.info("重定向任务【%s】至配置的地址:%s", task.get("id"), os.getenv("REDIRECT_TO_URL"))
|
||||||
|
url = f"{os.getenv('REDIRECT_TO_URL')}{task.get('id')}"
|
||||||
|
threading.Thread(target=requests.post, args=(url,)).start()
|
||||||
|
return []
|
||||||
for template in templates:
|
for template in templates:
|
||||||
template_id = template.get('id', '')
|
template_id = template.get('id', '')
|
||||||
if template_id:
|
if template_id:
|
||||||
@ -56,119 +68,189 @@ def get_template_info(template_id):
|
|||||||
:type template_id: str
|
:type template_id: str
|
||||||
:return: 模板信息
|
:return: 模板信息
|
||||||
"""
|
"""
|
||||||
try:
|
tracer = get_tracer(__name__)
|
||||||
response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={
|
with tracer.start_as_current_span("get_template_info"):
|
||||||
'accessKey': os.getenv('ACCESS_KEY'),
|
with tracer.start_as_current_span("get_template_info.request") as req_span:
|
||||||
}, timeout=10)
|
try:
|
||||||
response.raise_for_status()
|
req_span.set_attribute("http.method", "POST")
|
||||||
except requests.RequestException as e:
|
req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id))
|
||||||
logger.error("请求失败!", e)
|
response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={
|
||||||
return None
|
'accessKey': os.getenv('ACCESS_KEY'),
|
||||||
data = response.json()
|
}, timeout=10)
|
||||||
remote_template_info = data.get('data', {})
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
logger.debug("获取模板信息结果:【%s】", remote_template_info)
|
req_span.set_attribute("http.response", response.text)
|
||||||
template = {
|
response.raise_for_status()
|
||||||
'id': template_id,
|
except requests.RequestException as e:
|
||||||
'updateTime': remote_template_info.get('updateTime', template_id),
|
req_span.set_attribute("api.error", str(e))
|
||||||
'scenic_name': remote_template_info.get('scenicName', '景区'),
|
logger.error("请求失败!", e)
|
||||||
'name': remote_template_info.get('name', '模版'),
|
return None
|
||||||
'video_size': '1920x1080',
|
data = response.json()
|
||||||
'frame_rate': 30,
|
logger.debug("获取模板信息结果:【%s】", data)
|
||||||
'overall_duration': 30,
|
remote_template_info = data.get('data', {})
|
||||||
'video_parts': [
|
template = {
|
||||||
|
'id': template_id,
|
||||||
|
'updateTime': remote_template_info.get('updateTime', template_id),
|
||||||
|
'scenic_name': remote_template_info.get('scenicName', '景区'),
|
||||||
|
'name': remote_template_info.get('name', '模版'),
|
||||||
|
'video_size': remote_template_info.get('resolution', '1920x1080'),
|
||||||
|
'frame_rate': 25,
|
||||||
|
'overall_duration': 30,
|
||||||
|
'video_parts': [
|
||||||
|
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
def _template_normalizer(template_info):
|
def _template_normalizer(template_info):
|
||||||
_template = {}
|
_template = {}
|
||||||
_placeholder_type = template_info.get('isPlaceholder', -1)
|
_placeholder_type = template_info.get('isPlaceholder', -1)
|
||||||
if _placeholder_type == 0:
|
if _placeholder_type == 0:
|
||||||
# 固定视频
|
# 固定视频
|
||||||
_template['source'] = template_info.get('sourceUrl', '')
|
_template['source'] = template_info.get('sourceUrl', '')
|
||||||
elif _placeholder_type == 1:
|
elif _placeholder_type == 1:
|
||||||
# 占位符
|
# 占位符
|
||||||
_template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '')
|
_template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '')
|
||||||
_template['mute'] = template_info.get('mute', True)
|
_template['mute'] = template_info.get('mute', True)
|
||||||
else:
|
_template['crop_mode'] = template_info.get('cropEnable', None)
|
||||||
_template['source'] = None
|
else:
|
||||||
_overlays = template_info.get('overlays', '')
|
_template['source'] = None
|
||||||
if _overlays:
|
_overlays = template_info.get('overlays', '')
|
||||||
_template['overlays'] = _overlays.split(",")
|
if _overlays:
|
||||||
_audios = template_info.get('audios', '')
|
_template['overlays'] = _overlays.split(",")
|
||||||
if _audios:
|
_audios = template_info.get('audios', '')
|
||||||
_template['audios'] = _audios.split(",")
|
if _audios:
|
||||||
_luts = template_info.get('luts', '')
|
_template['audios'] = _audios.split(",")
|
||||||
if _luts:
|
_luts = template_info.get('luts', '')
|
||||||
_template['luts'] = _luts.split(",")
|
if _luts:
|
||||||
return _template
|
_template['luts'] = _luts.split(",")
|
||||||
|
_only_if = template_info.get('onlyIf', '')
|
||||||
|
if _only_if:
|
||||||
|
_template['only_if'] = _only_if
|
||||||
|
_effects = template_info.get('effects', '')
|
||||||
|
if _effects:
|
||||||
|
_template['effects'] = _effects.split("|")
|
||||||
|
return _template
|
||||||
|
|
||||||
# outer template definition
|
# outer template definition
|
||||||
overall_template = _template_normalizer(remote_template_info)
|
overall_template = _template_normalizer(remote_template_info)
|
||||||
template['overall_template'] = overall_template
|
template['overall_template'] = overall_template
|
||||||
# inter template definition
|
# inter template definition
|
||||||
inter_template_list = remote_template_info.get('children', [])
|
inter_template_list = remote_template_info.get('children', [])
|
||||||
for children_template in inter_template_list:
|
for children_template in inter_template_list:
|
||||||
parts = _template_normalizer(children_template)
|
parts = _template_normalizer(children_template)
|
||||||
template['video_parts'].append(parts)
|
template['video_parts'].append(parts)
|
||||||
template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id))
|
template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id))
|
||||||
return template
|
with get_tracer("api").start_as_current_span("get_template_info.template") as res_span:
|
||||||
|
res_span.set_attribute("normalized.response", json.dumps(template))
|
||||||
|
return template
|
||||||
|
|
||||||
|
|
||||||
def report_task_success(task_info, **kwargs):
|
def report_task_success(task_info, **kwargs):
|
||||||
try:
|
tracer = get_tracer(__name__)
|
||||||
response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
|
with tracer.start_as_current_span("report_task_success"):
|
||||||
'accessKey': os.getenv('ACCESS_KEY'),
|
with tracer.start_as_current_span("report_task_success.request") as req_span:
|
||||||
**kwargs
|
try:
|
||||||
}, timeout=10)
|
req_span.set_attribute("http.method", "POST")
|
||||||
response.raise_for_status()
|
req_span.set_attribute("http.url",
|
||||||
except requests.RequestException as e:
|
'{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
|
||||||
logger.error("请求失败!", e)
|
response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
|
||||||
return None
|
'accessKey': os.getenv('ACCESS_KEY'),
|
||||||
|
**kwargs
|
||||||
|
}, timeout=10)
|
||||||
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
|
req_span.set_attribute("http.response", response.text)
|
||||||
|
response.raise_for_status()
|
||||||
|
req_span.set_status(Status(StatusCode.OK))
|
||||||
|
except requests.RequestException as e:
|
||||||
|
req_span.set_attribute("api.error", str(e))
|
||||||
|
logger.error("请求失败!", e)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def report_task_start(task_info):
|
def report_task_start(task_info):
|
||||||
try:
|
tracer = get_tracer(__name__)
|
||||||
response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
|
with tracer.start_as_current_span("report_task_start"):
|
||||||
'accessKey': os.getenv('ACCESS_KEY'),
|
with tracer.start_as_current_span("report_task_start.request") as req_span:
|
||||||
}, timeout=10)
|
try:
|
||||||
response.raise_for_status()
|
req_span.set_attribute("http.method", "POST")
|
||||||
except requests.RequestException as e:
|
req_span.set_attribute("http.url",
|
||||||
logger.error("请求失败!", e)
|
'{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
|
||||||
return None
|
response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
|
||||||
|
'accessKey': os.getenv('ACCESS_KEY'),
|
||||||
|
}, timeout=10)
|
||||||
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
|
req_span.set_attribute("http.response", response.text)
|
||||||
|
response.raise_for_status()
|
||||||
|
req_span.set_status(Status(StatusCode.OK))
|
||||||
|
except requests.RequestException as e:
|
||||||
|
req_span.set_attribute("api.error", str(e))
|
||||||
|
logger.error("请求失败!", e)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def report_task_failed(task_info, reason=''):
|
def report_task_failed(task_info, reason=''):
|
||||||
try:
|
tracer = get_tracer(__name__)
|
||||||
response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
|
with tracer.start_as_current_span("report_task_failed") as span:
|
||||||
'accessKey': os.getenv('ACCESS_KEY'),
|
span.set_attribute("task_id", task_info.get("id"))
|
||||||
'reason': reason
|
span.set_attribute("reason", reason)
|
||||||
}, timeout=10)
|
with tracer.start_as_current_span("report_task_failed.request") as req_span:
|
||||||
response.raise_for_status()
|
try:
|
||||||
except requests.RequestException as e:
|
req_span.set_attribute("http.method", "POST")
|
||||||
logger.error("请求失败!", e)
|
req_span.set_attribute("http.url",
|
||||||
return None
|
'{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
|
||||||
|
response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
|
||||||
|
'accessKey': os.getenv('ACCESS_KEY'),
|
||||||
|
'reason': reason
|
||||||
|
}, timeout=10)
|
||||||
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
|
req_span.set_attribute("http.response", response.text)
|
||||||
|
response.raise_for_status()
|
||||||
|
req_span.set_status(Status(StatusCode.OK))
|
||||||
|
except requests.RequestException as e:
|
||||||
|
req_span.set_attribute("api.error", str(e))
|
||||||
|
req_span.set_status(Status(StatusCode.ERROR))
|
||||||
|
logger.error("请求失败!", e)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def upload_task_file(task_info, ffmpeg_task):
|
def upload_task_file(task_info, ffmpeg_task):
|
||||||
logger.info("开始上传文件: %s", task_info.get("id"))
|
tracer = get_tracer(__name__)
|
||||||
|
with get_tracer("api").start_as_current_span("upload_task_file") as span:
|
||||||
|
logger.info("开始上传文件: %s", task_info.get("id"))
|
||||||
|
span.set_attribute("file.id", task_info.get("id"))
|
||||||
|
with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
|
||||||
|
try:
|
||||||
|
req_span.set_attribute("http.method", "POST")
|
||||||
|
req_span.set_attribute("http.url",
|
||||||
|
'{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
|
||||||
|
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")),
|
||||||
|
json={
|
||||||
|
'accessKey': os.getenv('ACCESS_KEY'),
|
||||||
|
}, timeout=10)
|
||||||
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
|
req_span.set_attribute("http.response", response.text)
|
||||||
|
response.raise_for_status()
|
||||||
|
req_span.set_status(Status(StatusCode.OK))
|
||||||
|
except requests.RequestException as e:
|
||||||
|
span.set_attribute("api.error", str(e))
|
||||||
|
req_span.set_status(Status(StatusCode.ERROR))
|
||||||
|
logger.error("请求失败!", e)
|
||||||
|
return False
|
||||||
|
data = response.json()
|
||||||
|
url = data.get('data', "")
|
||||||
|
logger.info("开始上传文件: %s 至 %s", task_info.get("id"), url)
|
||||||
|
return oss.upload_to_oss(url, ffmpeg_task.get_output_file())
|
||||||
|
|
||||||
|
|
||||||
|
def get_task_info(id):
|
||||||
try:
|
try:
|
||||||
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
|
response = session.get(os.getenv('API_ENDPOINT') + "/" + id + "/info", params={
|
||||||
'accessKey': os.getenv('ACCESS_KEY'),
|
'accessKey': os.getenv('ACCESS_KEY'),
|
||||||
}, timeout=10)
|
}, timeout=10)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
logger.error("请求失败!", e)
|
logger.error("请求失败!", e)
|
||||||
return False
|
return []
|
||||||
data = response.json()
|
data = response.json()
|
||||||
url = data.get('data', "")
|
logger.debug("获取任务结果:【%s】", data)
|
||||||
logger.info("开始上传文件: %s 至 %s", task_info.get("id"), url)
|
if data.get('code', 0) == 200:
|
||||||
try:
|
return data.get('data', {})
|
||||||
with open(ffmpeg_task.get_output_file(), 'rb') as f:
|
|
||||||
requests.put(url, data=f)
|
|
||||||
except requests.RequestException as e:
|
|
||||||
logger.error("上传失败!", e)
|
|
||||||
return False
|
|
||||||
finally:
|
|
||||||
logger.info("上传文件结束: %s", task_info.get("id"))
|
|
||||||
return True
|
|
191
util/ffmpeg.py
191
util/ffmpeg.py
@ -1,40 +1,87 @@
|
|||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional, IO
|
from typing import Optional, IO
|
||||||
|
|
||||||
from entity.ffmpeg import FfmpegTask
|
from opentelemetry.trace import Status, StatusCode
|
||||||
|
|
||||||
|
from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT
|
||||||
|
from telemetry import get_tracer
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def to_annexb(file):
|
def re_encode_and_annexb(file):
|
||||||
if not os.path.exists(file):
|
with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span:
|
||||||
return file
|
span.set_attribute("file.path", file)
|
||||||
logger.info("ToAnnexb: %s", file)
|
if not os.path.exists(file):
|
||||||
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb",
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
"-f", "mpegts", file+".ts"])
|
return file
|
||||||
logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
|
logger.info("ReEncodeAndAnnexb: %s", file)
|
||||||
if ffmpeg_process.returncode == 0:
|
has_audio = not not probe_video_audio(file)
|
||||||
os.remove(file)
|
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-vsync", "cfr", "-i", file,
|
||||||
return file+".ts"
|
*(set() if has_audio else MUTE_AUDIO_INPUT),
|
||||||
else:
|
"-map", "0:v", "-map", "0:a" if has_audio else "1:a",
|
||||||
return file
|
*VIDEO_ARGS, "-bsf:v", "h264_mp4toannexb",
|
||||||
|
*AUDIO_ARGS, "-bsf:a", "setts=pts=DTS",
|
||||||
|
*ENCODER_ARGS, "-shortest", "-fflags", "+genpts",
|
||||||
|
"-f", "mpegts", file + ".ts"])
|
||||||
|
logger.info(" ".join(ffmpeg_process.args))
|
||||||
|
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
|
||||||
|
logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
|
||||||
|
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
|
||||||
|
if ffmpeg_process.returncode == 0:
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
span.set_attribute("file.size", os.path.getsize(file+".ts"))
|
||||||
|
# os.remove(file)
|
||||||
|
return file+".ts"
|
||||||
|
else:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
return file
|
||||||
|
|
||||||
def start_render(ffmpeg_task: FfmpegTask):
|
def start_render(ffmpeg_task: FfmpegTask):
|
||||||
logger.info(ffmpeg_task)
|
tracer = get_tracer(__name__)
|
||||||
if not ffmpeg_task.need_run():
|
with tracer.start_as_current_span("start_render") as span:
|
||||||
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
|
span.set_attribute("ffmpeg.task", str(ffmpeg_task))
|
||||||
|
if not ffmpeg_task.need_run():
|
||||||
|
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
return True
|
||||||
|
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
|
||||||
|
if len(ffmpeg_args) == 0:
|
||||||
|
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
return True
|
||||||
|
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], stderr=subprocess.PIPE, **subprocess_args(True))
|
||||||
|
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
|
||||||
|
logger.info(" ".join(ffmpeg_process.args))
|
||||||
|
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
|
||||||
|
span.set_attribute("ffmpeg.out", ffmpeg_final_out)
|
||||||
|
logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
|
||||||
|
code = ffmpeg_process.returncode
|
||||||
|
span.set_attribute("ffmpeg.code", code)
|
||||||
|
if code != 0:
|
||||||
|
span.set_attribute("ffmpeg.err", str(ffmpeg_process.stderr))
|
||||||
|
span.set_status(Status(StatusCode.ERROR, "FFMPEG异常退出"))
|
||||||
|
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
|
||||||
|
return False
|
||||||
|
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
|
||||||
|
try:
|
||||||
|
file_size = os.path.getsize(ffmpeg_task.output_file)
|
||||||
|
span.set_attribute("file.size", file_size)
|
||||||
|
if file_size < 4096:
|
||||||
|
span.set_status(Status(StatusCode.ERROR, "输出文件过小"))
|
||||||
|
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
|
||||||
|
return False
|
||||||
|
except OSError as e:
|
||||||
|
span.set_attribute("file.size", 0)
|
||||||
|
span.set_attribute("file.error", e.strerror)
|
||||||
|
span.set_status(Status(StatusCode.ERROR, "输出文件不存在"))
|
||||||
|
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
|
||||||
|
return False
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
return True
|
return True
|
||||||
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
|
|
||||||
logger.info(ffmpeg_args)
|
|
||||||
if len(ffmpeg_args) == 0:
|
|
||||||
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
|
|
||||||
return True
|
|
||||||
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
|
|
||||||
logger.info("FINISH TASK, OUTPUT IS %s", handle_ffmpeg_output(ffmpeg_process.stdout))
|
|
||||||
code = ffmpeg_process.returncode
|
|
||||||
return code == 0
|
|
||||||
|
|
||||||
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
|
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
|
||||||
out_time = "0:0:0.0"
|
out_time = "0:0:0.0"
|
||||||
@ -55,25 +102,95 @@ def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
|
|||||||
print("[ ]Speed:", out_time, "@", speed)
|
print("[ ]Speed:", out_time, "@", speed)
|
||||||
return out_time+"@"+speed
|
return out_time+"@"+speed
|
||||||
|
|
||||||
|
|
||||||
def duration_str_to_float(duration_str: str) -> float:
|
def duration_str_to_float(duration_str: str) -> float:
|
||||||
_duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1)
|
_duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1)
|
||||||
return _duration.total_seconds()
|
return _duration.total_seconds()
|
||||||
|
|
||||||
|
|
||||||
def probe_video_info(video_file):
|
def probe_video_info(video_file):
|
||||||
# 获取宽度和高度
|
tracer = get_tracer(__name__)
|
||||||
result = subprocess.run(
|
with tracer.start_as_current_span("probe_video_info") as span:
|
||||||
["ffprobe.exe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of',
|
span.set_attribute("video.file", video_file)
|
||||||
'csv=s=x:p=0', video_file],
|
# 获取宽度和高度
|
||||||
stderr=subprocess.STDOUT,
|
result = subprocess.run(
|
||||||
**subprocess_args(True)
|
["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of',
|
||||||
)
|
'csv=s=x:p=0', video_file],
|
||||||
all_result = result.stdout.decode('utf-8').strip()
|
stderr=subprocess.STDOUT,
|
||||||
wh, duration = all_result.split('\n')
|
**subprocess_args(True)
|
||||||
width, height = wh.strip().split('x')
|
)
|
||||||
|
span.set_attribute("ffprobe.args", json.dumps(result.args))
|
||||||
|
span.set_attribute("ffprobe.code", result.returncode)
|
||||||
|
if result.returncode != 0:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
return 0, 0, 0
|
||||||
|
all_result = result.stdout.decode('utf-8').strip()
|
||||||
|
span.set_attribute("ffprobe.out", all_result)
|
||||||
|
if all_result == '':
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
return 0, 0, 0
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
wh, duration = all_result.split('\n')
|
||||||
|
width, height = wh.strip().split('x')
|
||||||
|
return int(width), int(height), float(duration)
|
||||||
|
|
||||||
|
|
||||||
|
def probe_video_audio(video_file, type=None):
|
||||||
|
tracer = get_tracer(__name__)
|
||||||
|
with tracer.start_as_current_span("probe_video_audio") as span:
|
||||||
|
span.set_attribute("video.file", video_file)
|
||||||
|
args = ["ffprobe", "-hide_banner", "-v", "error", "-select_streams", "a", "-show_entries", "stream=index", "-of", "csv=p=0"]
|
||||||
|
if type == 'concat':
|
||||||
|
args.append("-safe")
|
||||||
|
args.append("0")
|
||||||
|
args.append("-f")
|
||||||
|
args.append("concat")
|
||||||
|
args.append(video_file)
|
||||||
|
logger.info(" ".join(args))
|
||||||
|
result = subprocess.run(args, stderr=subprocess.STDOUT, **subprocess_args(True))
|
||||||
|
span.set_attribute("ffprobe.args", json.dumps(result.args))
|
||||||
|
span.set_attribute("ffprobe.code", result.returncode)
|
||||||
|
logger.info("probe_video_audio: %s", result.stdout.decode('utf-8').strip())
|
||||||
|
if result.returncode != 0:
|
||||||
|
return False
|
||||||
|
if result.stdout.decode('utf-8').strip() == '':
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
# 音频淡出2秒
|
||||||
|
def fade_out_audio(file, duration, fade_out_sec = 2):
|
||||||
|
if type(duration) == str:
|
||||||
|
try:
|
||||||
|
duration = float(duration)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("duration is not float: %s", e)
|
||||||
|
return file
|
||||||
|
tracer = get_tracer(__name__)
|
||||||
|
with tracer.start_as_current_span("fade_out_audio") as span:
|
||||||
|
span.set_attribute("audio.file", file)
|
||||||
|
if duration <= fade_out_sec:
|
||||||
|
return file
|
||||||
|
else:
|
||||||
|
new_fn = file + "_.mp4"
|
||||||
|
if os.path.exists(new_fn):
|
||||||
|
os.remove(new_fn)
|
||||||
|
logger.info("delete tmp file: " + new_fn)
|
||||||
|
try:
|
||||||
|
process = subprocess.run(["ffmpeg", "-i", file, "-c:v", "copy", "-c:a", "aac", "-af", "afade=t=out:st=" + str(duration - fade_out_sec) + ":d=" + str(fade_out_sec), "-y", new_fn], **subprocess_args(True))
|
||||||
|
span.set_attribute("ffmpeg.args", json.dumps(process.args))
|
||||||
|
logger.info(" ".join(process.args))
|
||||||
|
if process.returncode != 0:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
logger.error("FFMPEG ERROR: %s", process.stderr)
|
||||||
|
return file
|
||||||
|
else:
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
return new_fn
|
||||||
|
except Exception as e:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
logger.error("FFMPEG ERROR: %s", e)
|
||||||
|
return file
|
||||||
|
|
||||||
return int(width), int(height), float(duration)
|
|
||||||
|
|
||||||
|
|
||||||
# Create a set of arguments which make a ``subprocess.Popen`` (and
|
# Create a set of arguments which make a ``subprocess.Popen`` (and
|
||||||
|
94
util/oss.py
94
util/oss.py
@ -2,6 +2,9 @@ import logging
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
from opentelemetry.trace import Status, StatusCode
|
||||||
|
|
||||||
|
from telemetry import get_tracer
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -13,13 +16,40 @@ def upload_to_oss(url, file_path):
|
|||||||
:param str file_path: 文件路径
|
:param str file_path: 文件路径
|
||||||
:return bool: 是否成功
|
:return bool: 是否成功
|
||||||
"""
|
"""
|
||||||
with open(file_path, 'rb') as f:
|
tracer = get_tracer(__name__)
|
||||||
try:
|
with tracer.start_as_current_span("upload_to_oss") as span:
|
||||||
response = requests.put(url, data=f)
|
span.set_attribute("file.url", url)
|
||||||
return response.status_code == 200
|
span.set_attribute("file.path", file_path)
|
||||||
except Exception as e:
|
span.set_attribute("file.size", os.path.getsize(file_path))
|
||||||
print(e)
|
max_retries = 5
|
||||||
return False
|
retries = 0
|
||||||
|
while retries < max_retries:
|
||||||
|
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
|
||||||
|
req_span.set_attribute("http.retry_count", retries)
|
||||||
|
try:
|
||||||
|
req_span.set_attribute("http.method", "PUT")
|
||||||
|
req_span.set_attribute("http.url", url)
|
||||||
|
with open(file_path, 'rb') as f:
|
||||||
|
response = requests.put(url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
|
||||||
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
|
req_span.set_attribute("http.response", response.text)
|
||||||
|
response.raise_for_status()
|
||||||
|
req_span.set_status(Status(StatusCode.OK))
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
return True
|
||||||
|
except requests.exceptions.Timeout:
|
||||||
|
req_span.set_attribute("http.error", "Timeout")
|
||||||
|
req_span.set_status(Status(StatusCode.ERROR))
|
||||||
|
retries += 1
|
||||||
|
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
|
||||||
|
except Exception as e:
|
||||||
|
req_span.set_attribute("http.error", str(e))
|
||||||
|
req_span.set_status(Status(StatusCode.ERROR))
|
||||||
|
retries += 1
|
||||||
|
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def download_from_oss(url, file_path):
|
def download_from_oss(url, file_path):
|
||||||
"""
|
"""
|
||||||
@ -28,16 +58,40 @@ def download_from_oss(url, file_path):
|
|||||||
:param Union[LiteralString, str, bytes] file_path: 文件路径
|
:param Union[LiteralString, str, bytes] file_path: 文件路径
|
||||||
:return bool: 是否成功
|
:return bool: 是否成功
|
||||||
"""
|
"""
|
||||||
logging.info("download_from_oss: %s", url)
|
tracer = get_tracer(__name__)
|
||||||
file_dir, file_name = os.path.split(file_path)
|
with tracer.start_as_current_span("download_from_oss") as span:
|
||||||
if file_dir:
|
span.set_attribute("file.url", url)
|
||||||
if not os.path.exists(file_dir):
|
span.set_attribute("file.path", file_path)
|
||||||
os.makedirs(file_dir)
|
logging.info("download_from_oss: %s", url)
|
||||||
try:
|
file_dir, file_name = os.path.split(file_path)
|
||||||
response = requests.get(url)
|
if file_dir:
|
||||||
with open(file_path, 'wb') as f:
|
if not os.path.exists(file_dir):
|
||||||
f.write(response.content)
|
os.makedirs(file_dir)
|
||||||
return True
|
max_retries = 5
|
||||||
except Exception as e:
|
retries = 0
|
||||||
print(e)
|
while retries < max_retries:
|
||||||
return False
|
with tracer.start_as_current_span("download_from_oss.request") as req_span:
|
||||||
|
req_span.set_attribute("http.retry_count", retries)
|
||||||
|
try:
|
||||||
|
req_span.set_attribute("http.method", "GET")
|
||||||
|
req_span.set_attribute("http.url", url)
|
||||||
|
response = requests.get(url, timeout=15) # 设置超时时间
|
||||||
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
|
with open(file_path, 'wb') as f:
|
||||||
|
f.write(response.content)
|
||||||
|
req_span.set_attribute("file.size", os.path.getsize(file_path))
|
||||||
|
req_span.set_status(Status(StatusCode.OK))
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
return True
|
||||||
|
except requests.exceptions.Timeout:
|
||||||
|
req_span.set_attribute("http.error", "Timeout")
|
||||||
|
req_span.set_status(Status(StatusCode.ERROR))
|
||||||
|
retries += 1
|
||||||
|
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
|
||||||
|
except Exception as e:
|
||||||
|
req_span.set_attribute("http.error", str(e))
|
||||||
|
req_span.set_status(Status(StatusCode.ERROR))
|
||||||
|
retries += 1
|
||||||
|
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
return False
|
||||||
|
Loading…
x
Reference in New Issue
Block a user