Compare commits

..

5 Commits

Author SHA1 Message Date
f12422b346 Merge branch 'master' into windows_nvidia
# Conflicts:
#	entity/ffmpeg.py
2025-03-06 14:06:38 +08:00
94b08dfcb5 Merge branch 'master' into windows_nvidia 2025-03-04 17:48:57 +08:00
d8ab94fcba ffmpeg 参数 2025-03-04 16:11:56 +08:00
9385945030 Merge branch 'master' into windows_nvidia 2025-03-04 12:40:30 +08:00
9041093324 windows nvidia 2025-02-27 16:49:48 +08:00
13 changed files with 202 additions and 447 deletions

7
.env
View File

@ -1,9 +1,4 @@
TEMPLATE_DIR=template/ TEMPLATE_DIR=template/
API_ENDPOINT=https://zhentuai.com/task/v1 API_ENDPOINT=http://127.0.0.1:8030/task/v1
ACCESS_KEY=TEST_ACCESS_KEY ACCESS_KEY=TEST_ACCESS_KEY
TEMP_DIR=tmp/ TEMP_DIR=tmp/
#REDIRECT_TO_URL=https://worker-renderworker-re-kekuflqjxx.cn-shanghai.fcapp.run/
# QSV
ENCODER_ARGS="-c:v h264_qsv -global_quality 28 -look_ahead 1"
# NVENC
#ENCODER_ARGS="-c:v h264_nvenc -cq:v 24 -preset:v p7 -tune:v hq -profile:v high"

View File

@ -1,21 +0,0 @@
FROM linuxserver/ffmpeg:7.1.1
LABEL authors="Jerry Yan"
RUN sed -i 's@//.*archive.ubuntu.com@//mirrors.ustc.edu.cn@g' /etc/apt/sources.list && \
sed -i 's/security.ubuntu.com/mirrors.ustc.edu.cn/g' /etc/apt/sources.list
RUN apt-get update && \
apt-get install -y --no-install-recommends \
python3-pip \
python3-dev \
python3-setuptools \
python3-wheel \
python3-venv
RUN apt-get clean && \
rm -rf /var/lib/apt/lists/*
COPY . /app/
RUN python3 -m venv /app/venv
RUN /app/venv/bin/python -m pip config set global.index-url https://mirrors.ustc.edu.cn/pypi/simple
RUN /app/venv/bin/python -m pip install -r /app/requirements.txt
WORKDIR /app
ENTRYPOINT ["/app/venv/bin/python", "app.py"]

40
app.py
View File

@ -1,40 +0,0 @@
import time
import flask
import config
import biz.task
import template
from telemetry import init_opentelemetry
from template import load_local_template
from util import api
load_local_template()
import logging
LOGGER = logging.getLogger(__name__)
init_opentelemetry(batch=False)
app = flask.Flask(__name__)
@app.get('/health/check')
def health_check():
return api.sync_center()
@app.post('/')
def do_nothing():
return "NOOP"
@app.post('/<task_id>')
def do_task(task_id):
task_info = api.get_task_info(task_id)
local_template_info = template.get_template_def(task_info.get("templateId"))
template_info = api.get_template_info(task_info.get("templateId"))
if local_template_info:
if local_template_info.get("updateTime") != template_info.get("updateTime"):
template.download_template(task_info.get("templateId"))
biz.task.start_task(task_info)
return "OK"
if __name__ == '__main__':
app.run(host="0.0.0.0", port=9998)

View File

@ -2,13 +2,10 @@ import json
import os.path import os.path
import time import time
from opentelemetry.trace import Status, StatusCode
from entity.ffmpeg import FfmpegTask from entity.ffmpeg import FfmpegTask
import logging import logging
from util import ffmpeg, oss from util import ffmpeg, oss
from util.ffmpeg import fade_out_audio
from telemetry import get_tracer from telemetry import get_tracer
logger = logging.getLogger('biz/ffmpeg') logger = logging.getLogger('biz/ffmpeg')
@ -16,27 +13,24 @@ logger = logging.getLogger('biz/ffmpeg')
def parse_ffmpeg_task(task_info, template_info): def parse_ffmpeg_task(task_info, template_info):
tracer = get_tracer(__name__) tracer = get_tracer(__name__)
with tracer.start_as_current_span("parse_ffmpeg_task") as span: with tracer.start_as_current_span("parse_ffmpeg_task"):
tasks = [] tasks = []
# 中间片段 # 中间片段
task_params_str = task_info.get("taskParams", "{}") task_params_str = task_info.get("taskParams", "{}")
span.set_attribute("task_params", task_params_str)
task_params = json.loads(task_params_str) task_params = json.loads(task_params_str)
task_params_orig = json.loads(task_params_str)
for part in template_info.get("video_parts"): for part in template_info.get("video_parts"):
source, ext_data = parse_video(part.get('source'), task_params, template_info) source = parse_video(part.get('source'), task_params, template_info)
if not source: if not source:
logger.warning("no video found for part: " + str(part)) logger.warning("no video found for part: " + str(part))
continue continue
only_if = part.get('only_if', '') only_if = part.get('only_if', '')
if only_if: if only_if:
if not check_placeholder_exist(only_if, task_params_orig): if not check_placeholder_exist(only_if, task_params):
logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part) logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part)
continue continue
sub_ffmpeg_task = FfmpegTask(source) sub_ffmpeg_task = FfmpegTask(source)
sub_ffmpeg_task.resolution = template_info.get("video_size", "")
sub_ffmpeg_task.annexb = True sub_ffmpeg_task.annexb = True
sub_ffmpeg_task.ext_data = ext_data or {} sub_ffmpeg_task.ext_data = find_placeholder_params(part.get('source'), task_params) or {}
sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25) sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
sub_ffmpeg_task.center_cut = part.get("crop_mode", None) sub_ffmpeg_task.center_cut = part.get("crop_mode", None)
for effect in part.get('effects', []): for effect in part.get('effects', []):
@ -50,14 +44,13 @@ def parse_ffmpeg_task(task_info, template_info):
tasks.append(sub_ffmpeg_task) tasks.append(sub_ffmpeg_task)
output_file = "out_" + str(time.time()) + ".mp4" output_file = "out_" + str(time.time()) + ".mp4"
task = FfmpegTask(tasks, output_file=output_file) task = FfmpegTask(tasks, output_file=output_file)
task.resolution = template_info.get("video_size", "")
overall = template_info.get("overall_template") overall = template_info.get("overall_template")
task.mute = False
task.center_cut = template_info.get("crop_mode", None) task.center_cut = template_info.get("crop_mode", None)
task.frame_rate = template_info.get("frame_rate", 25) task.frame_rate = template_info.get("frame_rate", 25)
if overall.get('source', ''): if overall.get('source', ''):
source, ext_data = parse_video(overall.get('source'), task_params, template_info) source = parse_video(overall.get('source'), task_params, template_info)
task.add_inputs(source) task.add_inputs(source)
task.ext_data = ext_data or {}
for effect in overall.get('effects', []): for effect in overall.get('effects', []):
task.add_effect(effect) task.add_effect(effect)
for lut in overall.get('filters', []): for lut in overall.get('filters', []):
@ -69,24 +62,37 @@ def parse_ffmpeg_task(task_info, template_info):
return task return task
def parse_video(source, task_params, template_info): def find_placeholder_params(source, task_params):
if source.startswith('PLACEHOLDER_'): if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '') placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, []) new_sources = task_params.get(placeholder_id, [])
_pick_source = {}
if type(new_sources) is list: if type(new_sources) is list:
if len(new_sources) == 0: if len(new_sources) == 0:
logger.debug("no video found for placeholder: " + placeholder_id) logger.debug("no video found for placeholder: " + placeholder_id)
return None, _pick_source return {}
else: else:
_pick_source = new_sources.pop(0) return new_sources[0]
new_sources = _pick_source.get("url") return {}
def parse_video(source, task_params, template_info):
print(source)
if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
logger.debug("no video found for placeholder: " + placeholder_id)
return None
else:
# TODO: Random Pick / Policy Pick
new_sources = new_sources[0].get("url")
if new_sources.startswith("http"): if new_sources.startswith("http"):
_, source_name = os.path.split(new_sources) _, source_name = os.path.split(new_sources)
oss.download_from_oss(new_sources, source_name) oss.download_from_oss(new_sources, source_name)
return source_name, _pick_source return source_name
return new_sources, _pick_source return new_sources
return os.path.join(template_info.get("local_path"), source), None return os.path.join(template_info.get("local_path"), source)
def check_placeholder_exist(placeholder_id, task_params): def check_placeholder_exist(placeholder_id, task_params):
@ -103,30 +109,20 @@ def check_placeholder_exist(placeholder_id, task_params):
def start_ffmpeg_task(ffmpeg_task): def start_ffmpeg_task(ffmpeg_task):
tracer = get_tracer(__name__) tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_ffmpeg_task") as span: with tracer.start_as_current_span("start_ffmpeg_task"):
for task in ffmpeg_task.analyze_input_render_tasks(): for task in ffmpeg_task.analyze_input_render_tasks():
result = start_ffmpeg_task(task) result = start_ffmpeg_task(task)
if not result: if not result:
return False return False
ffmpeg_task.correct_task_type() ffmpeg_task.correct_task_type()
span.set_attribute("task.type", ffmpeg_task.task_type) return ffmpeg.start_render(ffmpeg_task)
span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut))
span.set_attribute("task.frame_rate", ffmpeg_task.frame_rate)
span.set_attribute("task.resolution", str(ffmpeg_task.resolution))
span.set_attribute("task.ext_data", json.dumps(ffmpeg_task.ext_data))
result = ffmpeg.start_render(ffmpeg_task)
if not result:
span.set_status(Status(StatusCode.ERROR))
return False
span.set_status(Status(StatusCode.OK))
return True
def clear_task_tmp_file(ffmpeg_task): def clear_task_tmp_file(ffmpeg_task):
for task in ffmpeg_task.analyze_input_render_tasks(): for task in ffmpeg_task.analyze_input_render_tasks():
clear_task_tmp_file(task) clear_task_tmp_file(task)
try: try:
if os.getenv("TEMPLATE_DIR") not in ffmpeg_task.get_output_file(): if "template" not in ffmpeg_task.get_output_file():
os.remove(ffmpeg_task.get_output_file()) os.remove(ffmpeg_task.get_output_file())
logger.info("delete tmp file: " + ffmpeg_task.get_output_file()) logger.info("delete tmp file: " + ffmpeg_task.get_output_file())
else: else:

View File

@ -1,8 +1,4 @@
import json from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
from opentelemetry.trace import Status, StatusCode
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info, fade_out_audio
from telemetry import get_tracer from telemetry import get_tracer
from template import get_template_def from template import get_template_def
from util import api from util import api
@ -10,35 +6,22 @@ from util import api
def start_task(task_info): def start_task(task_info):
tracer = get_tracer(__name__) tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_task") as span: with tracer.start_as_current_span("start_task"):
task_info = api.normalize_task(task_info) task_info = api.normalize_task(task_info)
span.set_attribute("task", json.dumps(task_info))
span.set_attribute("scenicId", task_info.get("scenicId", "?"))
span.set_attribute("templateId", task_info.get("templateId"))
template_info = get_template_def(task_info.get("templateId")) template_info = get_template_def(task_info.get("templateId"))
api.report_task_start(task_info) api.report_task_start(task_info)
ffmpeg_task = parse_ffmpeg_task(task_info, template_info) ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
result = start_ffmpeg_task(ffmpeg_task) result = start_ffmpeg_task(ffmpeg_task)
if not result: if not result:
span.set_status(Status(StatusCode.ERROR))
return api.report_task_failed(task_info) return api.report_task_failed(task_info)
width, height, duration = probe_video_info(ffmpeg_task)
span.set_attribute("probe.width", width)
span.set_attribute("probe.height", height)
span.set_attribute("probe.duration", duration)
# 音频淡出
new_fn = fade_out_audio(ffmpeg_task.get_output_file(), duration)
ffmpeg_task.set_output_file(new_fn)
oss_result = api.upload_task_file(task_info, ffmpeg_task) oss_result = api.upload_task_file(task_info, ffmpeg_task)
if not oss_result: if not oss_result:
span.set_status(Status(StatusCode.ERROR))
return api.report_task_failed(task_info) return api.report_task_failed(task_info)
# 获取视频长度宽度和时长 # 获取视频长度宽度和时长
width, height, duration = probe_video_info(ffmpeg_task)
clear_task_tmp_file(ffmpeg_task) clear_task_tmp_file(ffmpeg_task)
api.report_task_success(task_info, videoInfo={ api.report_task_success(task_info, videoInfo={
"width": width, "width": width,
"height": height, "height": height,
"duration": duration "duration": duration
}) })
span.set_status(Status(StatusCode.OK))
return None

View File

@ -1,11 +1,10 @@
import json import json
import os
import time import time
import uuid import uuid
from typing import Any from typing import Any
DEFAULT_ARGS = ("-shortest",) DEFAULT_ARGS = ("-shortest",)
ENCODER_ARGS = ("-c:v", "h264", ) if not os.getenv("ENCODER_ARGS", False) else os.getenv("ENCODER_ARGS", "").split(" ") ENCODER_ARGS = ("-c:v", "h264_nvenc", "-cq:v", "24", "-preset:v", "p7", "-tune:v", "hq",)
VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", ) VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", )
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", ) AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", )
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", ) MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", )
@ -33,7 +32,6 @@ class FfmpegTask(object):
self.mute = True self.mute = True
self.speed = 1 self.speed = 1
self.frame_rate = 25 self.frame_rate = 25
self.resolution = None
self.subtitles = [] self.subtitles = []
self.luts = [] self.luts = []
self.audios = [] self.audios = []
@ -143,7 +141,7 @@ class FfmpegTask(object):
return False return False
if self.speed != 1: if self.speed != 1:
return False return False
if len(self.audios) >= 1: if len(self.audios) > 1:
return False return False
if len(self.input_file) > 1: if len(self.input_file) > 1:
return False return False
@ -169,7 +167,6 @@ class FfmpegTask(object):
output_args.append("1") output_args.append("1")
video_output_str = "[0:v]" video_output_str = "[0:v]"
audio_output_str = "" audio_output_str = ""
audio_track_index = 0
effect_index = 0 effect_index = 0
for input_file in self.input_file: for input_file in self.input_file:
input_args.append("-i") input_args.append("-i")
@ -183,7 +180,7 @@ class FfmpegTask(object):
_v_w = pos_json.get('imgWidth', 1) _v_w = pos_json.get('imgWidth', 1)
_f_x = pos_json.get('ltX', 0) _f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0) _f_x2 = pos_json.get('rbX', 0)
_x = f'{float((_f_x2 + _f_x)/(2 * _v_w)) :.4f}*iw-ih*ih/(2*iw)' _x = f'{float((_f_x2 - _f_x)/(2 * _v_w)) :.4f}*iw'
filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]") filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
video_output_str = f"[v_cut{effect_index}]" video_output_str = f"[v_cut{effect_index}]"
effect_index += 1 effect_index += 1
@ -209,9 +206,9 @@ class FfmpegTask(object):
_mid_out_str = "[eff_m]" _mid_out_str = "[eff_m]"
_end_out_str = "[eff_e]" _end_out_str = "[eff_e]"
filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}") filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}")
filter_args.append(f"{_start_out_str}select=lt(n\\,{int(start*self.frame_rate)}){_start_out_str}") filter_args.append(f"{_start_out_str}select=lt(n\,{int(start*self.frame_rate)}){_start_out_str}")
filter_args.append(f"{_end_out_str}select=gt(n\\,{int(start*self.frame_rate)}){_end_out_str}") filter_args.append(f"{_end_out_str}select=gt(n\,{int(start*self.frame_rate)}){_end_out_str}")
filter_args.append(f"{_mid_out_str}select=eq(n\\,{int(start*self.frame_rate)}){_mid_out_str}") filter_args.append(f"{_mid_out_str}select=eq(n\,{int(start*self.frame_rate)}){_mid_out_str}")
filter_args.append(f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}") filter_args.append(f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}")
if rotate_deg != 0: if rotate_deg != 0:
filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/360{_mid_out_str}") filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/360{_mid_out_str}")
@ -222,30 +219,15 @@ class FfmpegTask(object):
# filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}") # filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}") filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
effect_index += 1 effect_index += 1
elif effect.startswith("ospeed:"):
param = effect.split(":", 2)[1]
if param == '':
param = "1"
if param != "1":
# 视频变速
effect_index += 1
filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("zoom:"): elif effect.startswith("zoom:"):
... ...
... ...
for lut in self.luts: for lut in self.luts:
filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}") filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}")
if self.resolution:
filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]")
video_output_str = "[v]"
for overlay in self.overlays: for overlay in self.overlays:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
input_args.append(overlay) input_args.append(overlay)
if os.getenv("OLD_FFMPEG"):
filter_args.append(f"{video_output_str}[{input_index}:v]scale2ref=iw:ih[v]")
else:
filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]") filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]")
filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]") filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
video_output_str = "[v]" video_output_str = "[v]"
@ -260,17 +242,18 @@ class FfmpegTask(object):
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT input_args += MUTE_AUDIO_INPUT
filter_args.append(f"[{input_index}:a]acopy[a]") filter_args.append(f"[{input_index}:a]acopy[a]")
audio_track_index += 1
audio_output_str = "[a]" audio_output_str = "[a]"
else: else:
audio_output_str = "[0:a]" audio_output_str = "[0:a]"
audio_track_index += 1
for audio in self.audios: for audio in self.audios:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
input_args.append(audio.replace("\\", "/")) input_args.append(audio.replace("\\", "/"))
audio_track_index += 1 if audio_output_str == "":
filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]") filter_args.append(f"[{input_index}:a]acopy[a]")
audio_output_str = "[a]"
else:
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]" audio_output_str = "[a]"
if audio_output_str: if audio_output_str:
output_args.append("-map") output_args.append("-map")
@ -285,18 +268,7 @@ class FfmpegTask(object):
audio_output_str = "" audio_output_str = ""
audio_track_index = 0 audio_track_index = 0
# output_args # output_args
if len(self.input_file) == 1:
_file = self.input_file[0]
from util.ffmpeg import probe_video_audio
if type(_file) is str:
input_args += ["-i", _file]
self.mute = not probe_video_audio(_file)
elif isinstance(_file, FfmpegTask):
input_args += ["-i", _file.get_output_file()]
self.mute = not probe_video_audio(_file.get_output_file())
else:
_tmp_file = "tmp_concat_"+str(time.time())+".txt" _tmp_file = "tmp_concat_"+str(time.time())+".txt"
from util.ffmpeg import probe_video_audio
with open(_tmp_file, "w", encoding="utf-8") as f: with open(_tmp_file, "w", encoding="utf-8") as f:
for input_file in self.input_file: for input_file in self.input_file:
if type(input_file) is str: if type(input_file) is str:
@ -304,7 +276,6 @@ class FfmpegTask(object):
elif isinstance(input_file, FfmpegTask): elif isinstance(input_file, FfmpegTask):
f.write("file '" + input_file.get_output_file() + "'\n") f.write("file '" + input_file.get_output_file() + "'\n")
input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file] input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file]
self.mute = not probe_video_audio(_tmp_file, "concat")
output_args.append("-map") output_args.append("-map")
output_args.append("0:v") output_args.append("0:v")
output_args.append("-c:v") output_args.append("-c:v")
@ -322,7 +293,7 @@ class FfmpegTask(object):
input_args.append("-i") input_args.append("-i")
input_args.append(audio.replace("\\", "/")) input_args.append(audio.replace("\\", "/"))
audio_track_index += 1 audio_track_index += 1
filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]") filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]" audio_output_str = "[a]"
if audio_output_str: if audio_output_str:
output_args.append("-map") output_args.append("-map")
@ -331,13 +302,8 @@ class FfmpegTask(object):
else: else:
output_args.append(audio_output_str) output_args.append(audio_output_str)
output_args += AUDIO_ARGS output_args += AUDIO_ARGS
if self.annexb:
output_args.append("-bsf:v")
output_args.append("h264_mp4toannexb")
output_args.append("-bsf:a")
output_args.append("setts=pts=DTS")
output_args.append("-f") output_args.append("-f")
output_args.append("mpegts" if self.annexb else "mp4") output_args.append("mp4")
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)] _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()] return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'copy': elif self.task_type == 'copy':
@ -346,7 +312,6 @@ class FfmpegTask(object):
if self.input_file[0] == self.get_output_file(): if self.input_file[0] == self.get_output_file():
return [] return []
return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()] return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()]
return []
def set_output_file(self, file=None): def set_output_file(self, file=None):
if file is None: if file is None:

View File

@ -1,7 +1,7 @@
from time import sleep from time import sleep
import config
import biz.task import biz.task
import config
from telemetry import init_opentelemetry from telemetry import init_opentelemetry
from template import load_local_template from template import load_local_template
from util import api from util import api

View File

@ -1,7 +1,3 @@
requests~=2.32.3 requests~=2.32.3
psutil~=6.1.0 psutil~=6.1.0
python-dotenv~=1.0.1 python-dotenv~=1.0.1
opentelemetry-api~=1.30.0
opentelemetry-sdk~=1.30.0
opentelemetry-exporter-otlp~=1.30.0
flask~=3.1.0

View File

@ -5,14 +5,14 @@ from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter
from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor from opentelemetry.sdk.trace.export import BatchSpanProcessor
def get_tracer(name): def get_tracer(name):
return trace.get_tracer(name) return trace.get_tracer(name)
# 初始化 OpenTelemetry # 初始化 OpenTelemetry
def init_opentelemetry(batch=True): def init_opentelemetry():
# 设置服务名、主机名 # 设置服务名、主机名
resource = Resource(attributes={ resource = Resource(attributes={
SERVICE_NAME: "RENDER_WORKER", SERVICE_NAME: "RENDER_WORKER",
@ -22,13 +22,8 @@ def init_opentelemetry(batch=True):
}) })
# 使用HTTP协议上报 # 使用HTTP协议上报
if batch:
span_processor = BatchSpanProcessor(OTLPSpanHttpExporter( span_processor = BatchSpanProcessor(OTLPSpanHttpExporter(
endpoint="https://oltp.jerryyan.top/v1/traces", endpoint="http://tracing-analysis-dc-sh.aliyuncs.com/adapt_e7qojqi4e0@aa79b4d367fb6b7_e7qojqi4e0@53df7ad2afe8301/api/otlp/traces",
))
else:
span_processor = SimpleSpanProcessor(OTLPSpanHttpExporter(
endpoint="https://oltp.jerryyan.top/v1/traces",
)) ))
trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor) trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor)

View File

@ -2,7 +2,6 @@ import json
import os import os
import logging import logging
from telemetry import get_tracer
from util import api, oss from util import api, oss
TEMPLATES = {} TEMPLATES = {}
@ -76,8 +75,6 @@ def get_template_def(template_id):
return TEMPLATES.get(template_id) return TEMPLATES.get(template_id)
def download_template(template_id): def download_template(template_id):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("download_template"):
template_info = api.get_template_info(template_id) template_info = api.get_template_info(template_id)
if not os.path.isdir(template_info['local_path']): if not os.path.isdir(template_info['local_path']):
os.makedirs(template_info['local_path']) os.makedirs(template_info['local_path'])

View File

@ -1,14 +1,11 @@
import json import json
import logging import logging
import os import os
import threading
import requests import requests
from opentelemetry.trace import Status, StatusCode
import util.system import util.system
from telemetry import get_tracer from telemetry import get_tracer
from util import oss
session = requests.Session() session = requests.Session()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -25,15 +22,23 @@ def sync_center():
:return: 任务列表 :return: 任务列表
""" """
from template import TEMPLATES, download_template from template import TEMPLATES, download_template
tracer = get_tracer(__name__)
with tracer.start_as_current_span("sync_center"):
with get_tracer("api").start_as_current_span("sync_center.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", os.getenv('API_ENDPOINT') + "/sync")
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
'clientStatus': util.system.get_sys_info(), 'clientStatus': util.system.get_sys_info(),
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in
TEMPLATES.values()] TEMPLATES.values()]
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status() response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return [] return []
data = response.json() data = response.json()
@ -45,13 +50,6 @@ def sync_center():
tasks = [] tasks = []
templates = [] templates = []
logger.warning("获取任务失败") logger.warning("获取任务失败")
if os.getenv("REDIRECT_TO_URL", False) != False:
for task in tasks:
_sess = requests.Session()
logger.info("重定向任务【%s】至配置的地址:%s", task.get("id"), os.getenv("REDIRECT_TO_URL"))
url = f"{os.getenv('REDIRECT_TO_URL')}{task.get('id')}"
threading.Thread(target=requests.post, args=(url,)).start()
return []
for template in templates: for template in templates:
template_id = template.get('id', '') template_id = template.get('id', '')
if template_id: if template_id:
@ -70,7 +68,7 @@ def get_template_info(template_id):
""" """
tracer = get_tracer(__name__) tracer = get_tracer(__name__)
with tracer.start_as_current_span("get_template_info"): with tracer.start_as_current_span("get_template_info"):
with tracer.start_as_current_span("get_template_info.request") as req_span: with get_tracer("api").start_as_current_span("get_template_info.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id)) req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id))
@ -78,8 +76,8 @@ def get_template_info(template_id):
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e)) req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e) logger.error("请求失败!", e)
@ -147,7 +145,7 @@ def get_template_info(template_id):
def report_task_success(task_info, **kwargs): def report_task_success(task_info, **kwargs):
tracer = get_tracer(__name__) tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_success"): with tracer.start_as_current_span("report_task_success"):
with tracer.start_as_current_span("report_task_success.request") as req_span: with get_tracer("api").start_as_current_span("report_task_success.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", req_span.set_attribute("http.url",
@ -157,9 +155,8 @@ def report_task_success(task_info, **kwargs):
**kwargs **kwargs
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_status(Status(StatusCode.OK)) req_span.set_attribute("api.response", response.text)
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e)) req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e) logger.error("请求失败!", e)
@ -169,7 +166,7 @@ def report_task_success(task_info, **kwargs):
def report_task_start(task_info): def report_task_start(task_info):
tracer = get_tracer(__name__) tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_start"): with tracer.start_as_current_span("report_task_start"):
with tracer.start_as_current_span("report_task_start.request") as req_span: with get_tracer("api").start_as_current_span("report_task_start.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", req_span.set_attribute("http.url",
@ -178,9 +175,8 @@ def report_task_start(task_info):
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_status(Status(StatusCode.OK)) req_span.set_attribute("api.response", response.text)
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e)) req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e) logger.error("请求失败!", e)
@ -189,9 +185,7 @@ def report_task_start(task_info):
def report_task_failed(task_info, reason=''): def report_task_failed(task_info, reason=''):
tracer = get_tracer(__name__) tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_failed") as span: with tracer.start_as_current_span("report_task_failed"):
span.set_attribute("task_id", task_info.get("id"))
span.set_attribute("reason", reason)
with tracer.start_as_current_span("report_task_failed.request") as req_span: with tracer.start_as_current_span("report_task_failed.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.method", "POST")
@ -202,23 +196,21 @@ def report_task_failed(task_info, reason=''):
'reason': reason 'reason': reason
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_status(Status(StatusCode.OK)) req_span.set_attribute("api.response", response.text)
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e)) req_span.set_attribute("api.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return None return None
def upload_task_file(task_info, ffmpeg_task): def upload_task_file(task_info, ffmpeg_task):
tracer = get_tracer(__name__) tracer = get_tracer(__name__)
with get_tracer("api").start_as_current_span("upload_task_file") as span: with tracer.start_as_current_span("upload_task_file") as span:
logger.info("开始上传文件: %s", task_info.get("id")) logger.info("开始上传文件: %s", task_info.get("id"))
span.set_attribute("file.id", task_info.get("id")) span.set_attribute("file.id", task_info.get("id"))
with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
try: try:
with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", req_span.set_attribute("http.url",
'{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) '{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
@ -226,31 +218,25 @@ def upload_task_file(task_info, ffmpeg_task):
json={ json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_status(Status(StatusCode.OK)) req_span.set_attribute("http.status_code", response.status_code)
except requests.RequestException as e: except requests.RequestException as e:
span.set_attribute("api.error", str(e)) span.set_attribute("api.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return False return False
data = response.json() data = response.json()
url = data.get('data', "") url = data.get('data', "")
logger.info("开始上传文件: %s%s", task_info.get("id"), url) logger.info("开始上传文件: %s%s", task_info.get("id"), url)
return oss.upload_to_oss(url, ffmpeg_task.get_output_file())
def get_task_info(id):
try: try:
response = session.get(os.getenv('API_ENDPOINT') + "/" + id + "/info", params={ with tracer.start_as_current_span("upload_task_file.start_upload_file") as upload_span:
'accessKey': os.getenv('ACCESS_KEY'), upload_span.set_attribute("http.method", "PUT")
}, timeout=10) upload_span.set_attribute("http.url", url)
response.raise_for_status() with open(ffmpeg_task.get_output_file(), 'rb') as f:
requests.put(url, data=f, headers={"Content-Type": "video/mp4"})
except requests.RequestException as e: except requests.RequestException as e:
logger.error("请求失败!", e) span.set_attribute("api.error", str(e))
return [] logger.error("上传失败!", e)
data = response.json() return False
logger.debug("获取任务结果:【%s", data) finally:
if data.get('code', 0) == 200: logger.info("上传文件结束: %s", task_info.get("id"))
return data.get('data', {}) return True

View File

@ -5,65 +5,68 @@ import subprocess
from datetime import datetime from datetime import datetime
from typing import Optional, IO from typing import Optional, IO
from opentelemetry.trace import Status, StatusCode from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS
from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT
from telemetry import get_tracer from telemetry import get_tracer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def to_annexb(file):
with get_tracer("ffmpeg").start_as_current_span("to_annexb") as span:
span.set_attribute("file.path", file)
if not os.path.exists(file):
return file
logger.info("ToAnnexb: %s", file)
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb",
"-f", "mpegts", file+".ts"])
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
if ffmpeg_process.returncode == 0:
span.set_attribute("file.size", os.path.getsize(file+".ts"))
os.remove(file)
return file+".ts"
else:
return file
def re_encode_and_annexb(file): def re_encode_and_annexb(file):
with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span: with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span:
span.set_attribute("file.path", file) span.set_attribute("file.path", file)
if not os.path.exists(file): if not os.path.exists(file):
span.set_status(Status(StatusCode.ERROR))
return file return file
logger.info("ReEncodeAndAnnexb: %s", file) logger.info("ReEncodeAndAnnexb: %s", file)
has_audio = not not probe_video_audio(file) ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, *VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, "-bsf:v", "h264_mp4toannexb",
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-vsync", "cfr", "-i", file,
*(set() if has_audio else MUTE_AUDIO_INPUT),
"-map", "0:v", "-map", "0:a" if has_audio else "1:a",
*VIDEO_ARGS, "-bsf:v", "h264_mp4toannexb",
*AUDIO_ARGS, "-bsf:a", "setts=pts=DTS",
*ENCODER_ARGS, "-shortest", "-fflags", "+genpts",
"-f", "mpegts", file +".ts"]) "-f", "mpegts", file +".ts"])
logger.info(" ".join(ffmpeg_process.args))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args)) span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode) span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
if ffmpeg_process.returncode == 0: if ffmpeg_process.returncode == 0:
span.set_status(Status(StatusCode.OK))
span.set_attribute("file.size", os.path.getsize(file+".ts")) span.set_attribute("file.size", os.path.getsize(file+".ts"))
# os.remove(file) os.remove(file)
return file+".ts" return file+".ts"
else: else:
span.set_status(Status(StatusCode.ERROR))
return file return file
def start_render(ffmpeg_task: FfmpegTask): def start_render(ffmpeg_task: FfmpegTask):
tracer = get_tracer(__name__) tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_render") as span: with tracer.start_as_current_span("start_render") as span:
span.set_attribute("ffmpeg.task", str(ffmpeg_task)) span.set_attribute("ffmpeg.task", str(ffmpeg_task))
logger.info(ffmpeg_task)
if not ffmpeg_task.need_run(): if not ffmpeg_task.need_run():
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True return True
ffmpeg_args = ffmpeg_task.get_ffmpeg_args() ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
if len(ffmpeg_args) == 0: if len(ffmpeg_args) == 0:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], stderr=subprocess.PIPE, **subprocess_args(True)) ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args)) span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info(" ".join(ffmpeg_process.args)) logger.info(ffmpeg_process.args)
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout) ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
span.set_attribute("ffmpeg.out", ffmpeg_final_out) span.set_attribute("ffmpeg.out", ffmpeg_final_out)
logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out) logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
code = ffmpeg_process.returncode code = ffmpeg_process.returncode
span.set_attribute("ffmpeg.code", code) span.set_attribute("ffmpeg.code", code)
if code != 0: if code != 0:
span.set_attribute("ffmpeg.err", str(ffmpeg_process.stderr))
span.set_status(Status(StatusCode.ERROR, "FFMPEG异常退出"))
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr) logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
return False return False
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file) span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
@ -71,16 +74,12 @@ def start_render(ffmpeg_task: FfmpegTask):
file_size = os.path.getsize(ffmpeg_task.output_file) file_size = os.path.getsize(ffmpeg_task.output_file)
span.set_attribute("file.size", file_size) span.set_attribute("file.size", file_size)
if file_size < 4096: if file_size < 4096:
span.set_status(Status(StatusCode.ERROR, "输出文件过小"))
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL") logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
return False return False
except OSError as e: except OSError:
span.set_attribute("file.size", 0) span.set_attribute("file.size", 0)
span.set_attribute("file.error", e.strerror)
span.set_status(Status(StatusCode.ERROR, "输出文件不存在"))
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND") logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
return False return False
span.set_status(Status(StatusCode.OK))
return True return True
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
@ -121,78 +120,14 @@ def probe_video_info(video_file):
span.set_attribute("ffprobe.args", json.dumps(result.args)) span.set_attribute("ffprobe.args", json.dumps(result.args))
span.set_attribute("ffprobe.code", result.returncode) span.set_attribute("ffprobe.code", result.returncode)
if result.returncode != 0: if result.returncode != 0:
span.set_status(Status(StatusCode.ERROR))
return 0, 0, 0 return 0, 0, 0
all_result = result.stdout.decode('utf-8').strip() all_result = result.stdout.decode('utf-8').strip()
span.set_attribute("ffprobe.out", all_result) span.set_attribute("ffprobe.out", all_result)
if all_result == '':
span.set_status(Status(StatusCode.ERROR))
return 0, 0, 0
span.set_status(Status(StatusCode.OK))
wh, duration = all_result.split('\n') wh, duration = all_result.split('\n')
width, height = wh.strip().split('x') width, height = wh.strip().split('x')
return int(width), int(height), float(duration) return int(width), int(height), float(duration)
def probe_video_audio(video_file, type=None):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("probe_video_audio") as span:
span.set_attribute("video.file", video_file)
args = ["ffprobe", "-hide_banner", "-v", "error", "-select_streams", "a", "-show_entries", "stream=index", "-of", "csv=p=0"]
if type == 'concat':
args.append("-safe")
args.append("0")
args.append("-f")
args.append("concat")
args.append(video_file)
logger.info(" ".join(args))
result = subprocess.run(args, stderr=subprocess.STDOUT, **subprocess_args(True))
span.set_attribute("ffprobe.args", json.dumps(result.args))
span.set_attribute("ffprobe.code", result.returncode)
logger.info("probe_video_audio: %s", result.stdout.decode('utf-8').strip())
if result.returncode != 0:
return False
if result.stdout.decode('utf-8').strip() == '':
return False
return True
# 音频淡出2秒
def fade_out_audio(file, duration, fade_out_sec = 2):
if type(duration) == str:
try:
duration = float(duration)
except Exception as e:
logger.error("duration is not float: %s", e)
return file
tracer = get_tracer(__name__)
with tracer.start_as_current_span("fade_out_audio") as span:
span.set_attribute("audio.file", file)
if duration <= fade_out_sec:
return file
else:
new_fn = file + "_.mp4"
if os.path.exists(new_fn):
os.remove(new_fn)
logger.info("delete tmp file: " + new_fn)
try:
process = subprocess.run(["ffmpeg", "-i", file, "-c:v", "copy", "-c:a", "aac", "-af", "afade=t=out:st=" + str(duration - fade_out_sec) + ":d=" + str(fade_out_sec), "-y", new_fn], **subprocess_args(True))
span.set_attribute("ffmpeg.args", json.dumps(process.args))
logger.info(" ".join(process.args))
if process.returncode != 0:
span.set_status(Status(StatusCode.ERROR))
logger.error("FFMPEG ERROR: %s", process.stderr)
return file
else:
span.set_status(Status(StatusCode.OK))
return new_fn
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error("FFMPEG ERROR: %s", e)
return file
# Create a set of arguments which make a ``subprocess.Popen`` (and # Create a set of arguments which make a ``subprocess.Popen`` (and
# variants) call work with or without Pyinstaller, ``--noconsole`` or # variants) call work with or without Pyinstaller, ``--noconsole`` or
# not, on Windows and Linux. Typical use:: # not, on Windows and Linux. Typical use::

View File

@ -2,7 +2,6 @@ import logging
import os import os
import requests import requests
from opentelemetry.trace import Status, StatusCode
from telemetry import get_tracer from telemetry import get_tracer
@ -20,28 +19,8 @@ def upload_to_oss(url, file_path):
with tracer.start_as_current_span("upload_to_oss") as span: with tracer.start_as_current_span("upload_to_oss") as span:
span.set_attribute("file.url", url) span.set_attribute("file.url", url)
span.set_attribute("file.path", file_path) span.set_attribute("file.path", file_path)
span.set_attribute("file.size", os.path.getsize(file_path))
max_retries = 5 max_retries = 5
retries = 0 retries = 0
if os.getenv("UPLOAD_METHOD") == "rclone":
with tracer.start_as_current_span("rclone_to_oss") as r_span:
replace_map = os.getenv("RCLONE_REPLACE_MAP")
r_span.set_attribute("rclone.replace_map", replace_map)
if replace_map != "":
replace_list = [i.split("|", 1) for i in replace_map.split(",")]
new_url = url
for (_src, _dst) in replace_list:
new_url = new_url.replace(_src, _dst)
new_url = new_url.split("?", 1)[0]
r_span.set_attribute("rclone.target_dir", new_url)
if new_url != url:
result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 32M --multi-thread-streams 8 {file_path} {new_url}")
r_span.set_attribute("rclone.result", result)
if result == 0:
span.set_status(Status(StatusCode.OK))
return True
else:
span.set_status(Status(StatusCode.ERROR))
while retries < max_retries: while retries < max_retries:
with tracer.start_as_current_span("upload_to_oss.request") as req_span: with tracer.start_as_current_span("upload_to_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries) req_span.set_attribute("http.retry_count", retries)
@ -49,24 +28,18 @@ def upload_to_oss(url, file_path):
req_span.set_attribute("http.method", "PUT") req_span.set_attribute("http.method", "PUT")
req_span.set_attribute("http.url", url) req_span.set_attribute("http.url", url)
with open(file_path, 'rb') as f: with open(file_path, 'rb') as f:
response = requests.put(url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"}) response = requests.put(url, data=f, timeout=60) # 设置超时时间为1分钟
req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text) if response.status_code == 200:
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
span.set_status(Status(StatusCode.OK))
return True return True
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
req_span.set_attribute("http.error", "Timeout") req_span.set_attribute("http.error", "Timeout")
req_span.set_status(Status(StatusCode.ERROR))
retries += 1 retries += 1
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...") logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
except Exception as e: except Exception as e:
req_span.set_attribute("http.error", str(e)) req_span.set_attribute("http.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
retries += 1 retries += 1
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...") logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
span.set_status(Status(StatusCode.ERROR))
return False return False
@ -98,19 +71,14 @@ def download_from_oss(url, file_path):
req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.status_code", response.status_code)
with open(file_path, 'wb') as f: with open(file_path, 'wb') as f:
f.write(response.content) f.write(response.content)
req_span.set_attribute("file.size", os.path.getsize(file_path)) span.set_attribute("file.size", os.path.getsize(file_path))
req_span.set_status(Status(StatusCode.OK))
span.set_status(Status(StatusCode.OK))
return True return True
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
req_span.set_attribute("http.error", "Timeout") span.set_attribute("http.error", "Timeout")
req_span.set_status(Status(StatusCode.ERROR))
retries += 1 retries += 1
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...") logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
except Exception as e: except Exception as e:
req_span.set_attribute("http.error", str(e)) span.set_attribute("http.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
retries += 1 retries += 1
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...") logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
span.set_status(Status(StatusCode.ERROR))
return False return False