Compare commits

..

No commits in common. "master" and "windows" have entirely different histories.

14 changed files with 358 additions and 916 deletions

7
.env
View File

@ -1,9 +1,4 @@
TEMPLATE_DIR=template/ TEMPLATE_DIR=template/
API_ENDPOINT=https://zhentuai.com/task/v1 API_ENDPOINT=http://127.0.0.1:8030/task/v1
ACCESS_KEY=TEST_ACCESS_KEY ACCESS_KEY=TEST_ACCESS_KEY
TEMP_DIR=tmp/ TEMP_DIR=tmp/
#REDIRECT_TO_URL=https://worker-renderworker-re-kekuflqjxx.cn-shanghai.fcapp.run/
# QSV
ENCODER_ARGS="-c:v h264_qsv -global_quality 28 -look_ahead 1"
# NVENC
#ENCODER_ARGS="-c:v h264_nvenc -cq:v 24 -preset:v p7 -tune:v hq -profile:v high"

5
.gitignore vendored
View File

@ -6,11 +6,6 @@ __pycache__/
*.so *.so
.Python .Python
build/ build/
dist/
*.mp4
*.ts
rand*.ts
tmp_concat_*.txt
*.egg-info/ *.egg-info/
*.egg *.egg
*.manifest *.manifest

View File

@ -1,21 +0,0 @@
FROM linuxserver/ffmpeg:7.1.1
LABEL authors="Jerry Yan"
RUN sed -i 's@//.*archive.ubuntu.com@//mirrors.ustc.edu.cn@g' /etc/apt/sources.list && \
sed -i 's/security.ubuntu.com/mirrors.ustc.edu.cn/g' /etc/apt/sources.list
RUN apt-get update && \
apt-get install -y --no-install-recommends \
python3-pip \
python3-dev \
python3-setuptools \
python3-wheel \
python3-venv
RUN apt-get clean && \
rm -rf /var/lib/apt/lists/*
COPY . /app/
RUN python3 -m venv /app/venv
RUN /app/venv/bin/python -m pip config set global.index-url https://mirrors.ustc.edu.cn/pypi/simple
RUN /app/venv/bin/python -m pip install -r /app/requirements.txt
WORKDIR /app
ENTRYPOINT ["/app/venv/bin/python", "app.py"]

40
app.py
View File

@ -1,40 +0,0 @@
import time
import flask
import config
import biz.task
import template
from telemetry import init_opentelemetry
from template import load_local_template
from util import api
load_local_template()
import logging
LOGGER = logging.getLogger(__name__)
init_opentelemetry(batch=False)
app = flask.Flask(__name__)
@app.get('/health/check')
def health_check():
return api.sync_center()
@app.post('/')
def do_nothing():
return "NOOP"
@app.post('/<task_id>')
def do_task(task_id):
task_info = api.get_task_info(task_id)
local_template_info = template.get_template_def(task_info.get("templateId"))
template_info = api.get_template_info(task_info.get("templateId"))
if local_template_info:
if local_template_info.get("updateTime") != template_info.get("updateTime"):
template.download_template(task_info.get("templateId"))
biz.task.start_task(task_info)
return "OK"
if __name__ == '__main__':
app.run(host="0.0.0.0", port=9998)

View File

@ -2,83 +2,48 @@ import json
import os.path import os.path
import time import time
from opentelemetry.trace import Status, StatusCode
from entity.ffmpeg import FfmpegTask from entity.ffmpeg import FfmpegTask
import logging import logging
from util import ffmpeg, oss from util import ffmpeg, oss
from util.ffmpeg import fade_out_audio
from telemetry import get_tracer
logger = logging.getLogger('biz/ffmpeg') logger = logging.getLogger('biz/ffmpeg')
def parse_ffmpeg_task(task_info, template_info): def parse_ffmpeg_task(task_info, template_info):
tracer = get_tracer(__name__) tasks = []
with tracer.start_as_current_span("parse_ffmpeg_task") as span: # 中间片段
tasks = [] task_params_str = task_info.get("taskParams", "{}")
# 中间片段 task_params = json.loads(task_params_str)
task_params_str = task_info.get("taskParams", "{}") for part in template_info.get("video_parts"):
span.set_attribute("task_params", task_params_str) source = parse_video(part.get('source'), task_params, template_info)
task_params = json.loads(task_params_str) if not source:
task_params_orig = json.loads(task_params_str) logger.warning("no video found for part: " + str(part))
for part in template_info.get("video_parts"): continue
source = parse_video(part.get('source'), task_params, template_info) sub_ffmpeg_task = FfmpegTask(source)
if not source: sub_ffmpeg_task.annexb = True
logger.warning("no video found for part: " + str(part)) sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
continue for lut in part.get('filters', []):
only_if = part.get('only_if', '') sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut))
if only_if: for audio in part.get('audios', []):
if not check_placeholder_exist(only_if, task_params_orig): sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio))
logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part) for overlay in part.get('overlays', []):
continue sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
sub_ffmpeg_task = FfmpegTask(source) tasks.append(sub_ffmpeg_task)
sub_ffmpeg_task.resolution = template_info.get("video_size", "") output_file = "out_" + str(time.time()) + ".mp4"
sub_ffmpeg_task.annexb = True task = FfmpegTask(tasks, output_file=output_file)
sub_ffmpeg_task.ext_data = find_placeholder_params(part.get('source'), task_params) or {} overall = template_info.get("overall_template")
sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25) task.frame_rate = template_info.get("frame_rate", 25)
sub_ffmpeg_task.center_cut = part.get("crop_mode", None) if overall.get('source', ''):
for effect in part.get('effects', []): source = parse_video(overall.get('source'), task_params, template_info)
sub_ffmpeg_task.add_effect(effect) task.add_inputs(source)
for lut in part.get('filters', []): for lut in overall.get('filters', []):
sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut)) task.add_lut(os.path.join(template_info.get("local_path"), lut))
for audio in part.get('audios', []): for audio in overall.get('audios', []):
sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio)) task.add_audios(os.path.join(template_info.get("local_path"), audio))
for overlay in part.get('overlays', []): for overlay in overall.get('overlays', []):
sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
tasks.append(sub_ffmpeg_task) return task
output_file = "out_" + str(time.time()) + ".mp4"
task = FfmpegTask(tasks, output_file=output_file)
task.resolution = template_info.get("video_size", "")
overall = template_info.get("overall_template")
task.center_cut = template_info.get("crop_mode", None)
task.frame_rate = template_info.get("frame_rate", 25)
if overall.get('source', ''):
source = parse_video(overall.get('source'), task_params, template_info)
task.add_inputs(source)
for effect in overall.get('effects', []):
task.add_effect(effect)
for lut in overall.get('filters', []):
task.add_lut(os.path.join(template_info.get("local_path"), lut))
for audio in overall.get('audios', []):
task.add_audios(os.path.join(template_info.get("local_path"), audio))
for overlay in overall.get('overlays', []):
task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
return task
def find_placeholder_params(source, task_params):
if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
logger.debug("no video found for placeholder: " + placeholder_id)
return {}
else:
return new_sources[0]
return {}
def parse_video(source, task_params, template_info): def parse_video(source, task_params, template_info):
@ -91,8 +56,8 @@ def parse_video(source, task_params, template_info):
logger.debug("no video found for placeholder: " + placeholder_id) logger.debug("no video found for placeholder: " + placeholder_id)
return None return None
else: else:
_pick_source = new_sources.pop(0) # TODO: Random Pick / Policy Pick
new_sources = _pick_source.get("url") new_sources = new_sources[0].get("url")
if new_sources.startswith("http"): if new_sources.startswith("http"):
_, source_name = os.path.split(new_sources) _, source_name = os.path.split(new_sources)
oss.download_from_oss(new_sources, source_name) oss.download_from_oss(new_sources, source_name)
@ -101,39 +66,18 @@ def parse_video(source, task_params, template_info):
return os.path.join(template_info.get("local_path"), source) return os.path.join(template_info.get("local_path"), source)
def check_placeholder_exist(placeholder_id, task_params):
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
return False
else:
return True
return True
return False
def start_ffmpeg_task(ffmpeg_task): def start_ffmpeg_task(ffmpeg_task):
tracer = get_tracer(__name__) for task in ffmpeg_task.analyze_input_render_tasks():
with tracer.start_as_current_span("start_ffmpeg_task") as span: start_ffmpeg_task(task)
for task in ffmpeg_task.analyze_input_render_tasks(): ffmpeg_task.correct_task_type()
result = start_ffmpeg_task(task) return ffmpeg.start_render(ffmpeg_task)
if not result:
return False
ffmpeg_task.correct_task_type()
result = ffmpeg.start_render(ffmpeg_task)
if not result:
span.set_status(Status(StatusCode.ERROR))
return False
span.set_status(Status(StatusCode.OK))
return True
def clear_task_tmp_file(ffmpeg_task): def clear_task_tmp_file(ffmpeg_task):
for task in ffmpeg_task.analyze_input_render_tasks(): for task in ffmpeg_task.analyze_input_render_tasks():
clear_task_tmp_file(task) clear_task_tmp_file(task)
try: try:
if os.getenv("TEMPLATE_DIR") not in ffmpeg_task.get_output_file(): if "template" not in ffmpeg_task.get_output_file():
os.remove(ffmpeg_task.get_output_file()) os.remove(ffmpeg_task.get_output_file())
logger.info("delete tmp file: " + ffmpeg_task.get_output_file()) logger.info("delete tmp file: " + ffmpeg_task.get_output_file())
else: else:

View File

@ -1,40 +1,24 @@
import json
from opentelemetry.trace import Status, StatusCode
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info, fade_out_audio
from telemetry import get_tracer
from template import get_template_def from template import get_template_def
from util import api from util import api
def start_task(task_info): def start_task(task_info):
tracer = get_tracer(__name__) from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
with tracer.start_as_current_span("start_task") as span: task_info = api.normalize_task(task_info)
task_info = api.normalize_task(task_info) template_info = get_template_def(task_info.get("templateId"))
span.set_attribute("task", json.dumps(task_info)) api.report_task_start(task_info)
span.set_attribute("scenicId", task_info.get("scenicId", "?")) ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
span.set_attribute("templateId", task_info.get("templateId")) result = start_ffmpeg_task(ffmpeg_task)
template_info = get_template_def(task_info.get("templateId")) if not result:
api.report_task_start(task_info) return api.report_task_failed(task_info)
ffmpeg_task = parse_ffmpeg_task(task_info, template_info) oss_result = api.upload_task_file(task_info, ffmpeg_task)
result = start_ffmpeg_task(ffmpeg_task) if not oss_result:
if not result: return api.report_task_failed(task_info)
span.set_status(Status(StatusCode.ERROR)) # 获取视频长度宽度和时长
return api.report_task_failed(task_info) width, height, duration = probe_video_info(ffmpeg_task)
width, height, duration = probe_video_info(ffmpeg_task) clear_task_tmp_file(ffmpeg_task)
# 音频淡出 api.report_task_success(task_info, videoInfo={
new_fn = fade_out_audio(ffmpeg_task.get_output_file(), duration) "width": width,
ffmpeg_task.set_output_file(new_fn) "height": height,
oss_result = api.upload_task_file(task_info, ffmpeg_task) "duration": duration
if not oss_result: })
span.set_status(Status(StatusCode.ERROR))
return api.report_task_failed(task_info)
# 获取视频长度宽度和时长
clear_task_tmp_file(ffmpeg_task)
api.report_task_success(task_info, videoInfo={
"width": width,
"height": height,
"duration": duration
})
span.set_status(Status(StatusCode.OK))

View File

@ -1,20 +1,9 @@
import json
import os
import time import time
import uuid import uuid
from typing import Any
DEFAULT_ARGS = ("-shortest",)
ENCODER_ARGS = ("-c:v", "h264", ) if not os.getenv("ENCODER_ARGS", False) else os.getenv("ENCODER_ARGS", "").split(" ")
VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", )
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", )
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", )
class FfmpegTask(object): class FfmpegTask(object):
effects: list[str]
def __init__(self, input_file, task_type='copy', output_file=''): def __init__(self, input_file, task_type='copy', output_file=''):
self.annexb = False self.annexb = False
if type(input_file) is str: if type(input_file) is str:
@ -25,20 +14,15 @@ class FfmpegTask(object):
self.input_file = input_file self.input_file = input_file
else: else:
self.input_file = [] self.input_file = []
self.zoom_cut = None
self.center_cut = None
self.ext_data = {}
self.task_type = task_type self.task_type = task_type
self.output_file = output_file self.output_file = output_file
self.mute = True self.mute = True
self.speed = 1 self.speed = 1
self.frame_rate = 25 self.frame_rate = 25
self.resolution = None
self.subtitles = [] self.subtitles = []
self.luts = [] self.luts = []
self.audios = [] self.audios = []
self.overlays = [] self.overlays = []
self.effects = []
def __repr__(self): def __repr__(self):
_str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}' _str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}'
@ -50,11 +34,8 @@ class FfmpegTask(object):
_str += f', overlays={self.overlays}' _str += f', overlays={self.overlays}'
if self.annexb: if self.annexb:
_str += f', annexb={self.annexb}' _str += f', annexb={self.annexb}'
if self.effects:
_str += f', effects={self.effects}'
if self.mute: if self.mute:
_str += f', mute={self.mute}' _str += f', mute={self.mute}'
_str += f', center_cut={self.center_cut}'
return _str + ')' return _str + ')'
def analyze_input_render_tasks(self): def analyze_input_render_tasks(self):
@ -96,10 +77,6 @@ class FfmpegTask(object):
self.luts.extend(luts) self.luts.extend(luts)
self.correct_task_type() self.correct_task_type()
def add_effect(self, *effects):
self.effects.extend(effects)
self.correct_task_type()
def get_output_file(self): def get_output_file(self):
if self.task_type == 'copy': if self.task_type == 'copy':
return self.input_file[0] return self.input_file[0]
@ -122,14 +99,8 @@ class FfmpegTask(object):
return False return False
if len(self.subtitles) > 0: if len(self.subtitles) > 0:
return False return False
if len(self.effects) > 0:
return False
if self.speed != 1: if self.speed != 1:
return False return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True return True
def check_can_copy(self): def check_can_copy(self):
@ -139,114 +110,45 @@ class FfmpegTask(object):
return False return False
if len(self.subtitles) > 0: if len(self.subtitles) > 0:
return False return False
if len(self.effects) > 0:
return False
if self.speed != 1: if self.speed != 1:
return False return False
if len(self.audios) >= 1: if len(self.audios) > 1:
return False return False
if len(self.input_file) > 1: if len(self.input_file) > 1:
return False return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True return True
def check_audio_track(self): def check_audio_track(self):
... if len(self.audios) > 0:
self.mute = False
def get_ffmpeg_args(self): def get_ffmpeg_args(self):
args = ['-y', '-hide_banner'] args = ['-y', '-hide_banner']
if self.task_type == 'encode': if self.task_type == 'encode':
# args += ('-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv')
input_args = [] input_args = []
filter_args = [] filter_args = []
output_args = [*VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, *DEFAULT_ARGS] output_args = ["-shortest", "-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1"]
if self.annexb: if self.annexb:
output_args.append("-bsf:v") output_args.append("-bsf:v")
output_args.append("h264_mp4toannexb") output_args.append("h264_mp4toannexb")
output_args.append("-reset_timestamps")
output_args.append("1")
video_output_str = "[0:v]" video_output_str = "[0:v]"
audio_output_str = "" audio_output_str = "[0:v]"
audio_track_index = 0 video_input_count = 0
effect_index = 0 audio_input_count = 0
for input_file in self.input_file: for input_file in self.input_file:
input_args.append("-i") input_args.append("-i")
if type(input_file) is str: if type(input_file) is str:
input_args.append(input_file) input_args.append(input_file)
elif isinstance(input_file, FfmpegTask): elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file()) input_args.append(input_file.get_output_file())
if self.center_cut == 1:
pos_json_str = self.ext_data.get('posJson', '{}')
pos_json = json.loads(pos_json_str)
_v_w = pos_json.get('imgWidth', 1)
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_x = f'{float((_f_x2 + _f_x)/(2 * _v_w)) :.4f}*iw-ih*ih/(2*iw)'
filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
video_output_str = f"[v_cut{effect_index}]"
effect_index += 1
for effect in self.effects:
if effect.startswith("cameraShot:"):
param = effect.split(":", 2)[1]
if param == '':
param = "3,1,0"
_split = param.split(",")
start = 3
duration = 1
rotate_deg = 0
if len(_split) >= 3:
if _split[2] == '':
rotate_deg = 0
else:
rotate_deg = int(_split[2])
if len(_split) >= 2:
duration = float(_split[1])
if len(_split) >= 1:
start = float(_split[0])
_start_out_str = "[eff_s]"
_mid_out_str = "[eff_m]"
_end_out_str = "[eff_e]"
filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}")
filter_args.append(f"{_start_out_str}select=lt(n\\,{int(start*self.frame_rate)}){_start_out_str}")
filter_args.append(f"{_end_out_str}select=gt(n\\,{int(start*self.frame_rate)}){_end_out_str}")
filter_args.append(f"{_mid_out_str}select=eq(n\\,{int(start*self.frame_rate)}){_mid_out_str}")
filter_args.append(f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}")
if rotate_deg != 0:
filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/360{_mid_out_str}")
# filter_args.append(f"{video_output_str}trim=start=0:end={start+duration},tpad=stop_mode=clone:stop_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"tpad=start_mode=clone:start_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}")
video_output_str = f"[v_eff{effect_index}]"
# filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
effect_index += 1
elif effect.startswith("ospeed:"):
param = effect.split(":", 2)[1]
if param == '':
param = "1"
if param != "1":
# 视频变速
effect_index += 1
filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("zoom:"):
...
...
for lut in self.luts: for lut in self.luts:
filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}") filter_args.append("[0:v]lut3d=file=" + lut + "[0:v]")
if self.resolution:
filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]")
video_output_str = "[v]"
for overlay in self.overlays: for overlay in self.overlays:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
input_args.append(overlay) input_args.append(overlay)
if os.getenv("OLD_FFMPEG"): filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]")
filter_args.append(f"{video_output_str}[{input_index}:v]scale2ref=iw:ih[v]")
else:
filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]")
filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]") filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
video_output_str = "[v]" video_output_str = "[v]"
for subtitle in self.subtitles: for subtitle in self.subtitles:
@ -257,96 +159,97 @@ class FfmpegTask(object):
output_args.append("-r") output_args.append("-r")
output_args.append(f"{self.frame_rate}") output_args.append(f"{self.frame_rate}")
if self.mute: if self.mute:
input_index = input_args.count("-i") output_args.append("-an")
input_args += MUTE_AUDIO_INPUT
filter_args.append(f"[{input_index}:a]acopy[a]")
audio_track_index += 1
audio_output_str = "[a]"
else: else:
audio_output_str = "[0:a]" input_index = 0
audio_track_index += 1 for audio in self.audios:
for audio in self.audios: input_index = input_args.count("-i")
input_index = input_args.count("-i") input_args.append("-i")
input_args.append("-i") input_args.append(audio.replace("\\", "/"))
input_args.append(audio.replace("\\", "/")) if audio_input_count > 0:
audio_track_index += 1 filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]") audio_output_str = "[a]"
audio_output_str = "[a]" else:
if audio_output_str: audio_output_str = f"[{input_index}:a]"
output_args.append("-map") audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str) output_args.append(audio_output_str)
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)] return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'concat': elif self.task_type == 'concat':
# 无法通过 annexb 合并的 # 无法通过 annexb 合并的
input_args = [] input_args = []
output_args = [*DEFAULT_ARGS] output_args = ["-shortest"]
filter_args = [] if self.check_annexb() and len(self.audios) <= 1:
audio_output_str = "" # output_args
audio_track_index = 0 if len(self.audios) > 0:
# output_args input_args.append("-an")
if len(self.input_file) == 1: _tmp_file = "tmp_concat_"+str(time.time())+".txt"
_file = self.input_file[0]
from util.ffmpeg import probe_video_audio
if type(_file) is str:
input_args += ["-i", _file]
self.mute = not probe_video_audio(_file)
elif isinstance(_file, FfmpegTask):
input_args += ["-i", _file.get_output_file()]
self.mute = not probe_video_audio(_file.get_output_file())
else:
_tmp_file = "tmp_concat_" + str(time.time()) + ".txt"
from util.ffmpeg import probe_video_audio
with open(_tmp_file, "w", encoding="utf-8") as f: with open(_tmp_file, "w", encoding="utf-8") as f:
for input_file in self.input_file: for input_file in self.input_file:
if type(input_file) is str: if type(input_file) is str:
f.write("file '" + input_file + "'\n") f.write("file '"+input_file+"'\n")
elif isinstance(input_file, FfmpegTask): elif isinstance(input_file, FfmpegTask):
f.write("file '" + input_file.get_output_file() + "'\n") f.write("file '" + input_file.get_output_file() + "'\n")
input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file] input_args += ("-f", "concat", "-safe", "0", "-i", _tmp_file)
self.mute = not probe_video_audio(_tmp_file, "concat") output_args.append("-c:v")
output_args.append("-map") output_args.append("copy")
output_args.append("0:v") if len(self.audios) > 0:
output_args.append("-c:v") input_args.append("-i")
output_args.append("copy") input_args.append(self.audios[0])
if self.mute: output_args.append("-c:a")
input_index = input_args.count("-i") output_args.append("copy")
input_args += MUTE_AUDIO_INPUT output_args.append("-f")
audio_output_str = f"[{input_index}:a]" output_args.append("mp4")
audio_track_index += 1 output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
else: return args + input_args + output_args + [self.get_output_file()]
audio_output_str = "[0:a]" output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
audio_track_index += 1 filter_args = []
for audio in self.audios: video_output_str = "[0:v]"
audio_output_str = "[0:a]"
video_input_count = 0
audio_input_count = 0
for input_file in self.input_file:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
input_args.append(audio.replace("\\", "/")) if type(input_file) is str:
audio_track_index += 1 input_args.append(input_file.replace("\\", "/"))
filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]") elif isinstance(input_file, FfmpegTask):
audio_output_str = "[a]" input_args.append(input_file.get_output_file().replace("\\", "/"))
if audio_output_str: if video_input_count > 0:
output_args.append("-map") filter_args.append(f"{video_output_str}[{input_index}:v]concat=n=2:v=1:a=0[v]")
if audio_track_index <= 1: video_output_str = "[v]"
output_args.append(audio_output_str[1:-1])
else: else:
output_args.append(audio_output_str) video_output_str = f"[{input_index}:v]"
output_args += AUDIO_ARGS video_input_count += 1
if self.annexb: output_args.append("-map")
output_args.append("-bsf:v") output_args.append(video_output_str)
output_args.append("h264_mp4toannexb") if self.mute:
output_args.append("-bsf:a") output_args.append("-an")
output_args.append("setts=pts=DTS") else:
output_args.append("-f") input_index = 0
output_args.append("mpegts" if self.annexb else "mp4") for audio in self.audios:
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)] input_index = input_args.count("-i")
return args + input_args + _filter_args + output_args + [self.get_output_file()] input_args.append("-i")
input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0:
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]"
else:
audio_output_str = f"[{input_index}:a]"
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()]
elif self.task_type == 'copy': elif self.task_type == 'copy':
if len(self.input_file) == 1: if len(self.input_file) == 1:
if type(self.input_file[0]) is str: if type(self.input_file[0]) is str:
if self.input_file[0] == self.get_output_file(): if self.input_file[0] == self.get_output_file():
return [] return []
return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()] return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()]
return []
def set_output_file(self, file=None): def set_output_file(self, file=None):
if file is None: if file is None:

View File

@ -1,42 +1,19 @@
from time import sleep from time import sleep
import config
import biz.task import biz.task
from telemetry import init_opentelemetry import config
from template import load_local_template from template import load_local_template
from util import api from util import api
import os
import glob
load_local_template() load_local_template()
import logging
LOGGER = logging.getLogger(__name__)
init_opentelemetry()
while True: while True:
# print(get_sys_info()) # print(get_sys_info())
print("waiting for task...") print("waiting for task...")
try: task_list = api.sync_center()
task_list = api.sync_center()
except Exception as e:
LOGGER.error("sync_center error", exc_info=e)
sleep(5)
continue
if len(task_list) == 0: if len(task_list) == 0:
# 删除当前文件夹下所有以.mp4、.ts结尾的文件
for file_globs in ['*.mp4', '*.ts', 'tmp_concat*.txt']:
for file_path in glob.glob(file_globs):
try:
os.remove(file_path)
print(f"Deleted file: {file_path}")
except Exception as e:
LOGGER.error(f"Error deleting file {file_path}", exc_info=e)
sleep(5) sleep(5)
for task in task_list: for task in task_list:
print("start task:", task) print("start task:", task)
try: biz.task.start_task(task)
biz.task.start_task(task)
except Exception as e:
LOGGER.error("task_start error", exc_info=e)

View File

@ -1,7 +1,3 @@
requests~=2.32.3 requests~=2.32.3
psutil~=6.1.0 psutil~=6.1.0
python-dotenv~=1.0.1 python-dotenv~=1.0.1
opentelemetry-api~=1.30.0
opentelemetry-sdk~=1.30.0
opentelemetry-exporter-otlp~=1.30.0
flask~=3.1.0

View File

@ -1,35 +0,0 @@
import os
from constant import SOFTWARE_VERSION
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter
from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
def get_tracer(name):
return trace.get_tracer(name)
# 初始化 OpenTelemetry
def init_opentelemetry(batch=True):
# 设置服务名、主机名
resource = Resource(attributes={
SERVICE_NAME: "RENDER_WORKER",
SERVICE_VERSION: SOFTWARE_VERSION,
DEPLOYMENT_ENVIRONMENT: "Python",
HOST_NAME: os.getenv("ACCESS_KEY"),
})
# 使用HTTP协议上报
if batch:
span_processor = BatchSpanProcessor(OTLPSpanHttpExporter(
endpoint="https://oltp.jerryyan.top/v1/traces",
))
else:
span_processor = SimpleSpanProcessor(OTLPSpanHttpExporter(
endpoint="https://oltp.jerryyan.top/v1/traces",
))
trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor)
trace.set_tracer_provider(trace_provider)

View File

@ -2,7 +2,6 @@ import json
import os import os
import logging import logging
from telemetry import get_tracer
from util import api, oss from util import api, oss
TEMPLATES = {} TEMPLATES = {}
@ -76,50 +75,48 @@ def get_template_def(template_id):
return TEMPLATES.get(template_id) return TEMPLATES.get(template_id)
def download_template(template_id): def download_template(template_id):
tracer = get_tracer(__name__) template_info = api.get_template_info(template_id)
with tracer.start_as_current_span("download_template"): if not os.path.isdir(template_info['local_path']):
template_info = api.get_template_info(template_id) os.makedirs(template_info['local_path'])
if not os.path.isdir(template_info['local_path']): # download template assets
os.makedirs(template_info['local_path']) overall_template = template_info['overall_template']
# download template assets video_parts = template_info['video_parts']
overall_template = template_info['overall_template'] def _download_assets(_template):
video_parts = template_info['video_parts'] if 'source' in _template:
def _download_assets(_template): if str(_template['source']).startswith("http"):
if 'source' in _template: _, _fn = os.path.split(_template['source'])
if str(_template['source']).startswith("http"): new_fp = os.path.join(template_info['local_path'], _fn)
_, _fn = os.path.split(_template['source']) oss.download_from_oss(_template['source'], new_fp)
new_fp = os.path.join(template_info['local_path'], _fn) if _fn.endswith(".mp4"):
oss.download_from_oss(_template['source'], new_fp) from util.ffmpeg import to_annexb
if _fn.endswith(".mp4"): new_fp = to_annexb(new_fp)
from util.ffmpeg import re_encode_and_annexb _template['source'] = os.path.relpath(new_fp, template_info['local_path'])
new_fp = re_encode_and_annexb(new_fp) if 'overlays' in _template:
_template['source'] = os.path.relpath(new_fp, template_info['local_path']) for i in range(len(_template['overlays'])):
if 'overlays' in _template: overlay = _template['overlays'][i]
for i in range(len(_template['overlays'])): if str(overlay).startswith("http"):
overlay = _template['overlays'][i] _, _fn = os.path.split(overlay)
if str(overlay).startswith("http"): oss.download_from_oss(overlay, os.path.join(template_info['local_path'], _fn))
_, _fn = os.path.split(overlay) _template['overlays'][i] = _fn
oss.download_from_oss(overlay, os.path.join(template_info['local_path'], _fn)) if 'luts' in _template:
_template['overlays'][i] = _fn for i in range(len(_template['luts'])):
if 'luts' in _template: lut = _template['luts'][i]
for i in range(len(_template['luts'])): if str(lut).startswith("http"):
lut = _template['luts'][i] _, _fn = os.path.split(lut)
if str(lut).startswith("http"): oss.download_from_oss(lut, os.path.join(template_info['local_path'], _fn))
_, _fn = os.path.split(lut) _template['luts'][i] = _fn
oss.download_from_oss(lut, os.path.join(template_info['local_path'], _fn)) if 'audios' in _template:
_template['luts'][i] = _fn for i in range(len(_template['audios'])):
if 'audios' in _template: if str(_template['audios'][i]).startswith("http"):
for i in range(len(_template['audios'])): _, _fn = os.path.split(_template['audios'][i])
if str(_template['audios'][i]).startswith("http"): oss.download_from_oss(_template['audios'][i], os.path.join(template_info['local_path'], _fn))
_, _fn = os.path.split(_template['audios'][i]) _template['audios'][i] = _fn
oss.download_from_oss(_template['audios'][i], os.path.join(template_info['local_path'], _fn)) _download_assets(overall_template)
_template['audios'][i] = _fn for video_part in video_parts:
_download_assets(overall_template) _download_assets(video_part)
for video_part in video_parts: with open(os.path.join(template_info['local_path'], 'template.json'), 'w', encoding='utf-8') as f:
_download_assets(video_part) json.dump(template_info, f)
with open(os.path.join(template_info['local_path'], 'template.json'), 'w', encoding='utf-8') as f: load_template(template_id, template_info['local_path'])
json.dump(template_info, f)
load_template(template_id, template_info['local_path'])
def analyze_template(template_id): def analyze_template(template_id):

View File

@ -1,14 +1,9 @@
import json
import logging import logging
import os import os
import threading
import requests import requests
from opentelemetry.trace import Status, StatusCode
import util.system import util.system
from telemetry import get_tracer
from util import oss
session = requests.Session() session = requests.Session()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -24,13 +19,13 @@ def sync_center():
通过接口获取任务 通过接口获取任务
:return: 任务列表 :return: 任务列表
""" """
from template import TEMPLATES, download_template
try: try:
from template import TEMPLATES, download_template
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
'clientStatus': util.system.get_sys_info(), 'clientStatus': util.system.get_sys_info(),
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in TEMPLATES.values()]
TEMPLATES.values()]
}, timeout=10) }, timeout=10)
response.raise_for_status() response.raise_for_status()
except requests.RequestException as e: except requests.RequestException as e:
@ -45,13 +40,6 @@ def sync_center():
tasks = [] tasks = []
templates = [] templates = []
logger.warning("获取任务失败") logger.warning("获取任务失败")
if os.getenv("REDIRECT_TO_URL", False) != False:
for task in tasks:
_sess = requests.Session()
logger.info("重定向任务【%s】至配置的地址:%s", task.get("id"), os.getenv("REDIRECT_TO_URL"))
url = f"{os.getenv('REDIRECT_TO_URL')}{task.get('id')}"
threading.Thread(target=requests.post, args=(url,)).start()
return []
for template in templates: for template in templates:
template_id = template.get('id', '') template_id = template.get('id', '')
if template_id: if template_id:
@ -68,189 +56,119 @@ def get_template_info(template_id):
:type template_id: str :type template_id: str
:return: 模板信息 :return: 模板信息
""" """
tracer = get_tracer(__name__)
with tracer.start_as_current_span("get_template_info"):
with tracer.start_as_current_span("get_template_info.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id))
response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
data = response.json()
logger.debug("获取模板信息结果:【%s", data)
remote_template_info = data.get('data', {})
template = {
'id': template_id,
'updateTime': remote_template_info.get('updateTime', template_id),
'scenic_name': remote_template_info.get('scenicName', '景区'),
'name': remote_template_info.get('name', '模版'),
'video_size': remote_template_info.get('resolution', '1920x1080'),
'frame_rate': 25,
'overall_duration': 30,
'video_parts': [
]
}
def _template_normalizer(template_info):
_template = {}
_placeholder_type = template_info.get('isPlaceholder', -1)
if _placeholder_type == 0:
# 固定视频
_template['source'] = template_info.get('sourceUrl', '')
elif _placeholder_type == 1:
# 占位符
_template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '')
_template['mute'] = template_info.get('mute', True)
_template['crop_mode'] = template_info.get('cropEnable', None)
else:
_template['source'] = None
_overlays = template_info.get('overlays', '')
if _overlays:
_template['overlays'] = _overlays.split(",")
_audios = template_info.get('audios', '')
if _audios:
_template['audios'] = _audios.split(",")
_luts = template_info.get('luts', '')
if _luts:
_template['luts'] = _luts.split(",")
_only_if = template_info.get('onlyIf', '')
if _only_if:
_template['only_if'] = _only_if
_effects = template_info.get('effects', '')
if _effects:
_template['effects'] = _effects.split("|")
return _template
# outer template definition
overall_template = _template_normalizer(remote_template_info)
template['overall_template'] = overall_template
# inter template definition
inter_template_list = remote_template_info.get('children', [])
for children_template in inter_template_list:
parts = _template_normalizer(children_template)
template['video_parts'].append(parts)
template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id))
with get_tracer("api").start_as_current_span("get_template_info.template") as res_span:
res_span.set_attribute("normalized.response", json.dumps(template))
return template
def report_task_success(task_info, **kwargs):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_success"):
with tracer.start_as_current_span("report_task_success.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
**kwargs
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def report_task_start(task_info):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_start"):
with tracer.start_as_current_span("report_task_start.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def report_task_failed(task_info, reason=''):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_failed") as span:
span.set_attribute("task_id", task_info.get("id"))
span.set_attribute("reason", reason)
with tracer.start_as_current_span("report_task_failed.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
'reason': reason
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
logger.error("请求失败!", e)
return None
def upload_task_file(task_info, ffmpeg_task):
tracer = get_tracer(__name__)
with get_tracer("api").start_as_current_span("upload_task_file") as span:
logger.info("开始上传文件: %s", task_info.get("id"))
span.set_attribute("file.id", task_info.get("id"))
with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")),
json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e:
span.set_attribute("api.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
logger.error("请求失败!", e)
return False
data = response.json()
url = data.get('data', "")
logger.info("开始上传文件: %s%s", task_info.get("id"), url)
return oss.upload_to_oss(url, ffmpeg_task.get_output_file())
def get_task_info(id):
try: try:
response = session.get(os.getenv('API_ENDPOINT') + "/" + id + "/info", params={ response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10) }, timeout=10)
response.raise_for_status() response.raise_for_status()
except requests.RequestException as e: except requests.RequestException as e:
logger.error("请求失败!", e) logger.error("请求失败!", e)
return [] return None
data = response.json() data = response.json()
logger.debug("获取任务结果:【%s", data) remote_template_info = data.get('data', {})
if data.get('code', 0) == 200: logger.debug("获取模板信息结果:【%s", remote_template_info)
return data.get('data', {}) template = {
'id': template_id,
'updateTime': remote_template_info.get('updateTime', template_id),
'scenic_name': remote_template_info.get('scenicName', '景区'),
'name': remote_template_info.get('name', '模版'),
'video_size': '1920x1080',
'frame_rate': 30,
'overall_duration': 30,
'video_parts': [
]
}
def _template_normalizer(template_info):
_template = {}
_placeholder_type = template_info.get('isPlaceholder', -1)
if _placeholder_type == 0:
# 固定视频
_template['source'] = template_info.get('sourceUrl', '')
elif _placeholder_type == 1:
# 占位符
_template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '')
_template['mute'] = template_info.get('mute', True)
else:
_template['source'] = None
_overlays = template_info.get('overlays', '')
if _overlays:
_template['overlays'] = _overlays.split(",")
_audios = template_info.get('audios', '')
if _audios:
_template['audios'] = _audios.split(",")
_luts = template_info.get('luts', '')
if _luts:
_template['luts'] = _luts.split(",")
return _template
# outer template definition
overall_template = _template_normalizer(remote_template_info)
template['overall_template'] = overall_template
# inter template definition
inter_template_list = remote_template_info.get('children', [])
for children_template in inter_template_list:
parts = _template_normalizer(children_template)
template['video_parts'].append(parts)
template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id))
return template
def report_task_success(task_info, **kwargs):
try:
response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
**kwargs
}, timeout=10)
response.raise_for_status()
except requests.RequestException as e:
logger.error("请求失败!", e)
return None
def report_task_start(task_info):
try:
response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
response.raise_for_status()
except requests.RequestException as e:
logger.error("请求失败!", e)
return None
def report_task_failed(task_info, reason=''):
try:
response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
'reason': reason
}, timeout=10)
response.raise_for_status()
except requests.RequestException as e:
logger.error("请求失败!", e)
return None
def upload_task_file(task_info, ffmpeg_task):
logger.info("开始上传文件: %s", task_info.get("id"))
try:
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
response.raise_for_status()
except requests.RequestException as e:
logger.error("请求失败!", e)
return False
data = response.json()
url = data.get('data', "")
logger.info("开始上传文件: %s%s", task_info.get("id"), url)
try:
with open(ffmpeg_task.get_output_file(), 'rb') as f:
requests.put(url, data=f)
except requests.RequestException as e:
logger.error("上传失败!", e)
return False
finally:
logger.info("上传文件结束: %s", task_info.get("id"))
return True

View File

@ -1,87 +1,40 @@
import json
import logging import logging
import os import os
import subprocess import subprocess
from datetime import datetime from datetime import datetime
from typing import Optional, IO from typing import Optional, IO
from opentelemetry.trace import Status, StatusCode from entity.ffmpeg import FfmpegTask
from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT
from telemetry import get_tracer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def re_encode_and_annexb(file): def to_annexb(file):
with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span: if not os.path.exists(file):
span.set_attribute("file.path", file) return file
if not os.path.exists(file): logger.info("ToAnnexb: %s", file)
span.set_status(Status(StatusCode.ERROR)) ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb",
return file "-f", "mpegts", file+".ts"])
logger.info("ReEncodeAndAnnexb: %s", file) logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
has_audio = not not probe_video_audio(file) if ffmpeg_process.returncode == 0:
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-vsync", "cfr", "-i", file, os.remove(file)
*(set() if has_audio else MUTE_AUDIO_INPUT), return file+".ts"
"-map", "0:v", "-map", "0:a" if has_audio else "1:a", else:
*VIDEO_ARGS, "-bsf:v", "h264_mp4toannexb", return file
*AUDIO_ARGS, "-bsf:a", "setts=pts=DTS",
*ENCODER_ARGS, "-shortest", "-fflags", "+genpts",
"-f", "mpegts", file + ".ts"])
logger.info(" ".join(ffmpeg_process.args))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
if ffmpeg_process.returncode == 0:
span.set_status(Status(StatusCode.OK))
span.set_attribute("file.size", os.path.getsize(file+".ts"))
# os.remove(file)
return file+".ts"
else:
span.set_status(Status(StatusCode.ERROR))
return file
def start_render(ffmpeg_task: FfmpegTask): def start_render(ffmpeg_task: FfmpegTask):
tracer = get_tracer(__name__) logger.info(ffmpeg_task)
with tracer.start_as_current_span("start_render") as span: if not ffmpeg_task.need_run():
span.set_attribute("ffmpeg.task", str(ffmpeg_task)) ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
if not ffmpeg_task.need_run():
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
if len(ffmpeg_args) == 0:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], stderr=subprocess.PIPE, **subprocess_args(True))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info(" ".join(ffmpeg_process.args))
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
span.set_attribute("ffmpeg.out", ffmpeg_final_out)
logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
code = ffmpeg_process.returncode
span.set_attribute("ffmpeg.code", code)
if code != 0:
span.set_attribute("ffmpeg.err", str(ffmpeg_process.stderr))
span.set_status(Status(StatusCode.ERROR, "FFMPEG异常退出"))
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
return False
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
try:
file_size = os.path.getsize(ffmpeg_task.output_file)
span.set_attribute("file.size", file_size)
if file_size < 4096:
span.set_status(Status(StatusCode.ERROR, "输出文件过小"))
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
return False
except OSError as e:
span.set_attribute("file.size", 0)
span.set_attribute("file.error", e.strerror)
span.set_status(Status(StatusCode.ERROR, "输出文件不存在"))
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
return False
span.set_status(Status(StatusCode.OK))
return True return True
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
logger.info(ffmpeg_args)
if len(ffmpeg_args) == 0:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
logger.info("FINISH TASK, OUTPUT IS %s", handle_ffmpeg_output(ffmpeg_process.stdout))
code = ffmpeg_process.returncode
return code == 0
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
out_time = "0:0:0.0" out_time = "0:0:0.0"
@ -102,95 +55,25 @@ def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
print("[ ]Speed:", out_time, "@", speed) print("[ ]Speed:", out_time, "@", speed)
return out_time+"@"+speed return out_time+"@"+speed
def duration_str_to_float(duration_str: str) -> float: def duration_str_to_float(duration_str: str) -> float:
_duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1) _duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1)
return _duration.total_seconds() return _duration.total_seconds()
def probe_video_info(video_file): def probe_video_info(video_file):
tracer = get_tracer(__name__) # 获取宽度和高度
with tracer.start_as_current_span("probe_video_info") as span: result = subprocess.run(
span.set_attribute("video.file", video_file) ["ffprobe.exe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of',
# 获取宽度和高度 'csv=s=x:p=0', video_file],
result = subprocess.run( stderr=subprocess.STDOUT,
["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of', **subprocess_args(True)
'csv=s=x:p=0', video_file], )
stderr=subprocess.STDOUT, all_result = result.stdout.decode('utf-8').strip()
**subprocess_args(True) wh, duration = all_result.split('\n')
) width, height = wh.strip().split('x')
span.set_attribute("ffprobe.args", json.dumps(result.args))
span.set_attribute("ffprobe.code", result.returncode)
if result.returncode != 0:
span.set_status(Status(StatusCode.ERROR))
return 0, 0, 0
all_result = result.stdout.decode('utf-8').strip()
span.set_attribute("ffprobe.out", all_result)
if all_result == '':
span.set_status(Status(StatusCode.ERROR))
return 0, 0, 0
span.set_status(Status(StatusCode.OK))
wh, duration = all_result.split('\n')
width, height = wh.strip().split('x')
return int(width), int(height), float(duration)
def probe_video_audio(video_file, type=None):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("probe_video_audio") as span:
span.set_attribute("video.file", video_file)
args = ["ffprobe", "-hide_banner", "-v", "error", "-select_streams", "a", "-show_entries", "stream=index", "-of", "csv=p=0"]
if type == 'concat':
args.append("-safe")
args.append("0")
args.append("-f")
args.append("concat")
args.append(video_file)
logger.info(" ".join(args))
result = subprocess.run(args, stderr=subprocess.STDOUT, **subprocess_args(True))
span.set_attribute("ffprobe.args", json.dumps(result.args))
span.set_attribute("ffprobe.code", result.returncode)
logger.info("probe_video_audio: %s", result.stdout.decode('utf-8').strip())
if result.returncode != 0:
return False
if result.stdout.decode('utf-8').strip() == '':
return False
return True
# 音频淡出2秒
def fade_out_audio(file, duration, fade_out_sec = 2):
if type(duration) == str:
try:
duration = float(duration)
except Exception as e:
logger.error("duration is not float: %s", e)
return file
tracer = get_tracer(__name__)
with tracer.start_as_current_span("fade_out_audio") as span:
span.set_attribute("audio.file", file)
if duration <= fade_out_sec:
return file
else:
new_fn = file + "_.mp4"
if os.path.exists(new_fn):
os.remove(new_fn)
logger.info("delete tmp file: " + new_fn)
try:
process = subprocess.run(["ffmpeg", "-i", file, "-c:v", "copy", "-c:a", "aac", "-af", "afade=t=out:st=" + str(duration - fade_out_sec) + ":d=" + str(fade_out_sec), "-y", new_fn], **subprocess_args(True))
span.set_attribute("ffmpeg.args", json.dumps(process.args))
logger.info(" ".join(process.args))
if process.returncode != 0:
span.set_status(Status(StatusCode.ERROR))
logger.error("FFMPEG ERROR: %s", process.stderr)
return file
else:
span.set_status(Status(StatusCode.OK))
return new_fn
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error("FFMPEG ERROR: %s", e)
return file
return int(width), int(height), float(duration)
# Create a set of arguments which make a ``subprocess.Popen`` (and # Create a set of arguments which make a ``subprocess.Popen`` (and

View File

@ -2,9 +2,6 @@ import logging
import os import os
import requests import requests
from opentelemetry.trace import Status, StatusCode
from telemetry import get_tracer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -16,40 +13,13 @@ def upload_to_oss(url, file_path):
:param str file_path: 文件路径 :param str file_path: 文件路径
:return bool: 是否成功 :return bool: 是否成功
""" """
tracer = get_tracer(__name__) with open(file_path, 'rb') as f:
with tracer.start_as_current_span("upload_to_oss") as span: try:
span.set_attribute("file.url", url) response = requests.put(url, data=f)
span.set_attribute("file.path", file_path) return response.status_code == 200
span.set_attribute("file.size", os.path.getsize(file_path)) except Exception as e:
max_retries = 5 print(e)
retries = 0 return False
while retries < max_retries:
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
try:
req_span.set_attribute("http.method", "PUT")
req_span.set_attribute("http.url", url)
with open(file_path, 'rb') as f:
response = requests.put(url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
span.set_status(Status(StatusCode.OK))
return True
except requests.exceptions.Timeout:
req_span.set_attribute("http.error", "Timeout")
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
except Exception as e:
req_span.set_attribute("http.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
span.set_status(Status(StatusCode.ERROR))
return False
def download_from_oss(url, file_path): def download_from_oss(url, file_path):
""" """
@ -58,40 +28,16 @@ def download_from_oss(url, file_path):
:param Union[LiteralString, str, bytes] file_path: 文件路径 :param Union[LiteralString, str, bytes] file_path: 文件路径
:return bool: 是否成功 :return bool: 是否成功
""" """
tracer = get_tracer(__name__) logging.info("download_from_oss: %s", url)
with tracer.start_as_current_span("download_from_oss") as span: file_dir, file_name = os.path.split(file_path)
span.set_attribute("file.url", url) if file_dir:
span.set_attribute("file.path", file_path) if not os.path.exists(file_dir):
logging.info("download_from_oss: %s", url) os.makedirs(file_dir)
file_dir, file_name = os.path.split(file_path) try:
if file_dir: response = requests.get(url)
if not os.path.exists(file_dir): with open(file_path, 'wb') as f:
os.makedirs(file_dir) f.write(response.content)
max_retries = 5 return True
retries = 0 except Exception as e:
while retries < max_retries: print(e)
with tracer.start_as_current_span("download_from_oss.request") as req_span: return False
req_span.set_attribute("http.retry_count", retries)
try:
req_span.set_attribute("http.method", "GET")
req_span.set_attribute("http.url", url)
response = requests.get(url, timeout=15) # 设置超时时间
req_span.set_attribute("http.status_code", response.status_code)
with open(file_path, 'wb') as f:
f.write(response.content)
req_span.set_attribute("file.size", os.path.getsize(file_path))
req_span.set_status(Status(StatusCode.OK))
span.set_status(Status(StatusCode.OK))
return True
except requests.exceptions.Timeout:
req_span.set_attribute("http.error", "Timeout")
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
except Exception as e:
req_span.set_attribute("http.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
span.set_status(Status(StatusCode.ERROR))
return False