Compare commits

...

26 Commits

Author SHA1 Message Date
6e4dbfd843 固定模板支持音乐 2025-03-28 18:08:19 +08:00
09e0f5f3be concat支持annexb 2025-03-28 18:07:57 +08:00
52c2df8b65 删除无用方法 2025-03-28 17:30:28 +08:00
b25ad20ddd 日志 2025-03-28 17:27:24 +08:00
7c6e4a97b2 片段有音频不可以copy 2025-03-28 17:26:49 +08:00
8f0e69c3de overall_speed片段全局变速 2025-03-24 10:30:36 +08:00
b8db0d2b95 metrics调整 2025-03-23 18:36:26 +08:00
6dc7e86e8e 埋点采集部分接口调整 2025-03-18 13:58:36 +08:00
c62f1ab976 upload返回结果 2025-03-11 16:15:51 +08:00
744fe28421 修复居中切割的问题 2025-03-10 15:16:23 +08:00
cf43e6d549 修复amix降低声音的问题,修复reencode_to_annexb不添加音轨的问题 2025-03-10 10:13:30 +08:00
dcf5f5630d 主动判断是否有音频 2025-03-06 23:02:54 +08:00
56bdad7ad1 音轨叠加 2025-03-06 10:34:28 +08:00
94373cee72 cameraShot特效及旋转 2025-03-05 14:57:02 +08:00
4549b0ab44 分辨率和裁切 2025-03-04 17:43:47 +08:00
9d178a3d34 埋点 2025-03-04 12:36:48 +08:00
1f9632761f effect 2025-03-03 14:27:52 +08:00
fff20610a5 profile level指定及修复 2025-02-27 16:48:57 +08:00
67696739f9 切割模式 2025-02-27 14:02:17 +08:00
2ea248c02e 上传文件也弄个超时 2025-02-16 18:15:35 +08:00
358207efdc 定时清理目录下无用文件 2025-02-16 18:15:25 +08:00
94a5e687df 未生成文件时,上报失败 2025-02-08 15:02:36 +08:00
b7d6797901 忽略无用文件 2025-02-05 10:13:33 +08:00
6d9d373032 only if 逻辑 2025-01-23 14:28:51 +08:00
549ee8320a ffprobe 报错后不采用其内容 2025-01-22 16:04:33 +08:00
29bb80f3b9 渲染后再to annexb,使用新逻辑拼接 2025-01-22 14:31:59 +08:00
10 changed files with 733 additions and 327 deletions

5
.gitignore vendored
View File

@ -6,6 +6,11 @@ __pycache__/
*.so *.so
.Python .Python
build/ build/
dist/
*.mp4
*.ts
rand*.ts
tmp_concat_*.txt
*.egg-info/ *.egg-info/
*.egg *.egg
*.manifest *.manifest

View File

@ -2,15 +2,20 @@ import json
import os.path import os.path
import time import time
from opentelemetry.trace import Status, StatusCode
from entity.ffmpeg import FfmpegTask from entity.ffmpeg import FfmpegTask
import logging import logging
from util import ffmpeg, oss from util import ffmpeg, oss
from telemetry import get_tracer
logger = logging.getLogger('biz/ffmpeg') logger = logging.getLogger('biz/ffmpeg')
def parse_ffmpeg_task(task_info, template_info): def parse_ffmpeg_task(task_info, template_info):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("parse_ffmpeg_task"):
tasks = [] tasks = []
# 中间片段 # 中间片段
task_params_str = task_info.get("taskParams", "{}") task_params_str = task_info.get("taskParams", "{}")
@ -20,9 +25,18 @@ def parse_ffmpeg_task(task_info, template_info):
if not source: if not source:
logger.warning("no video found for part: " + str(part)) logger.warning("no video found for part: " + str(part))
continue continue
only_if = part.get('only_if', '')
if only_if:
if not check_placeholder_exist(only_if, task_params):
logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part)
continue
sub_ffmpeg_task = FfmpegTask(source) sub_ffmpeg_task = FfmpegTask(source)
sub_ffmpeg_task.annexb = True sub_ffmpeg_task.annexb = True
sub_ffmpeg_task.ext_data = find_placeholder_params(part.get('source'), task_params) or {}
sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25) sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
sub_ffmpeg_task.center_cut = part.get("crop_mode", None)
for effect in part.get('effects', []):
sub_ffmpeg_task.add_effect(effect)
for lut in part.get('filters', []): for lut in part.get('filters', []):
sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut)) sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut))
for audio in part.get('audios', []): for audio in part.get('audios', []):
@ -33,10 +47,13 @@ def parse_ffmpeg_task(task_info, template_info):
output_file = "out_" + str(time.time()) + ".mp4" output_file = "out_" + str(time.time()) + ".mp4"
task = FfmpegTask(tasks, output_file=output_file) task = FfmpegTask(tasks, output_file=output_file)
overall = template_info.get("overall_template") overall = template_info.get("overall_template")
task.center_cut = template_info.get("crop_mode", None)
task.frame_rate = template_info.get("frame_rate", 25) task.frame_rate = template_info.get("frame_rate", 25)
if overall.get('source', ''): if overall.get('source', ''):
source = parse_video(overall.get('source'), task_params, template_info) source = parse_video(overall.get('source'), task_params, template_info)
task.add_inputs(source) task.add_inputs(source)
for effect in overall.get('effects', []):
task.add_effect(effect)
for lut in overall.get('filters', []): for lut in overall.get('filters', []):
task.add_lut(os.path.join(template_info.get("local_path"), lut)) task.add_lut(os.path.join(template_info.get("local_path"), lut))
for audio in overall.get('audios', []): for audio in overall.get('audios', []):
@ -46,6 +63,19 @@ def parse_ffmpeg_task(task_info, template_info):
return task return task
def find_placeholder_params(source, task_params):
if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
logger.debug("no video found for placeholder: " + placeholder_id)
return {}
else:
return new_sources[0]
return {}
def parse_video(source, task_params, template_info): def parse_video(source, task_params, template_info):
print(source) print(source)
if source.startswith('PLACEHOLDER_'): if source.startswith('PLACEHOLDER_'):
@ -66,11 +96,32 @@ def parse_video(source, task_params, template_info):
return os.path.join(template_info.get("local_path"), source) return os.path.join(template_info.get("local_path"), source)
def check_placeholder_exist(placeholder_id, task_params):
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
return False
else:
return True
return True
return False
def start_ffmpeg_task(ffmpeg_task): def start_ffmpeg_task(ffmpeg_task):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_ffmpeg_task") as span:
for task in ffmpeg_task.analyze_input_render_tasks(): for task in ffmpeg_task.analyze_input_render_tasks():
start_ffmpeg_task(task) result = start_ffmpeg_task(task)
if not result:
return False
ffmpeg_task.correct_task_type() ffmpeg_task.correct_task_type()
return ffmpeg.start_render(ffmpeg_task) result = ffmpeg.start_render(ffmpeg_task)
if not result:
span.set_status(Status(StatusCode.ERROR))
return False
span.set_status(Status(StatusCode.OK))
return True
def clear_task_tmp_file(ffmpeg_task): def clear_task_tmp_file(ffmpeg_task):

View File

@ -1,18 +1,29 @@
import json
from opentelemetry.trace import Status, StatusCode
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
from telemetry import get_tracer
from template import get_template_def from template import get_template_def
from util import api from util import api
def start_task(task_info): def start_task(task_info):
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_task") as span:
span.set_attribute("task.id", task_info)
task_info = api.normalize_task(task_info) task_info = api.normalize_task(task_info)
span.set_attribute("task.info", json.dumps(task_info))
template_info = get_template_def(task_info.get("templateId")) template_info = get_template_def(task_info.get("templateId"))
api.report_task_start(task_info) api.report_task_start(task_info)
ffmpeg_task = parse_ffmpeg_task(task_info, template_info) ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
result = start_ffmpeg_task(ffmpeg_task) result = start_ffmpeg_task(ffmpeg_task)
if not result: if not result:
span.set_status(Status(StatusCode.ERROR))
return api.report_task_failed(task_info) return api.report_task_failed(task_info)
oss_result = api.upload_task_file(task_info, ffmpeg_task) oss_result = api.upload_task_file(task_info, ffmpeg_task)
if not oss_result: if not oss_result:
span.set_status(Status(StatusCode.ERROR))
return api.report_task_failed(task_info) return api.report_task_failed(task_info)
# 获取视频长度宽度和时长 # 获取视频长度宽度和时长
width, height, duration = probe_video_info(ffmpeg_task) width, height, duration = probe_video_info(ffmpeg_task)
@ -22,3 +33,4 @@ def start_task(task_info):
"height": height, "height": height,
"duration": duration "duration": duration
}) })
span.set_status(Status(StatusCode.OK))

View File

@ -1,9 +1,19 @@
import json
import time import time
import uuid import uuid
from typing import Any
DEFAULT_ARGS = ("-shortest",)
ENCODER_ARGS = ("-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1", )
VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", )
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", )
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", )
class FfmpegTask(object): class FfmpegTask(object):
effects: list[str]
def __init__(self, input_file, task_type='copy', output_file=''): def __init__(self, input_file, task_type='copy', output_file=''):
self.annexb = False self.annexb = False
if type(input_file) is str: if type(input_file) is str:
@ -14,6 +24,9 @@ class FfmpegTask(object):
self.input_file = input_file self.input_file = input_file
else: else:
self.input_file = [] self.input_file = []
self.zoom_cut = None
self.center_cut = None
self.ext_data = {}
self.task_type = task_type self.task_type = task_type
self.output_file = output_file self.output_file = output_file
self.mute = True self.mute = True
@ -23,6 +36,7 @@ class FfmpegTask(object):
self.luts = [] self.luts = []
self.audios = [] self.audios = []
self.overlays = [] self.overlays = []
self.effects = []
def __repr__(self): def __repr__(self):
_str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}' _str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}'
@ -34,8 +48,11 @@ class FfmpegTask(object):
_str += f', overlays={self.overlays}' _str += f', overlays={self.overlays}'
if self.annexb: if self.annexb:
_str += f', annexb={self.annexb}' _str += f', annexb={self.annexb}'
if self.effects:
_str += f', effects={self.effects}'
if self.mute: if self.mute:
_str += f', mute={self.mute}' _str += f', mute={self.mute}'
_str += f', center_cut={self.center_cut}'
return _str + ')' return _str + ')'
def analyze_input_render_tasks(self): def analyze_input_render_tasks(self):
@ -77,6 +94,10 @@ class FfmpegTask(object):
self.luts.extend(luts) self.luts.extend(luts)
self.correct_task_type() self.correct_task_type()
def add_effect(self, *effects):
self.effects.extend(effects)
self.correct_task_type()
def get_output_file(self): def get_output_file(self):
if self.task_type == 'copy': if self.task_type == 'copy':
return self.input_file[0] return self.input_file[0]
@ -99,8 +120,14 @@ class FfmpegTask(object):
return False return False
if len(self.subtitles) > 0: if len(self.subtitles) > 0:
return False return False
if len(self.effects) > 0:
return False
if self.speed != 1: if self.speed != 1:
return False return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True return True
def check_can_copy(self): def check_can_copy(self):
@ -110,40 +137,103 @@ class FfmpegTask(object):
return False return False
if len(self.subtitles) > 0: if len(self.subtitles) > 0:
return False return False
if len(self.effects) > 0:
return False
if self.speed != 1: if self.speed != 1:
return False return False
if len(self.audios) > 1: if len(self.audios) >= 1:
return False return False
if len(self.input_file) > 1: if len(self.input_file) > 1:
return False return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True return True
def check_audio_track(self): def check_audio_track(self):
if len(self.audios) > 0: ...
self.mute = False
def get_ffmpeg_args(self): def get_ffmpeg_args(self):
args = ['-y', '-hide_banner'] args = ['-y', '-hide_banner']
if self.task_type == 'encode': if self.task_type == 'encode':
# args += ('-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv')
input_args = [] input_args = []
filter_args = [] filter_args = []
output_args = ["-shortest", "-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1"] output_args = [*VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, *DEFAULT_ARGS]
if self.annexb: if self.annexb:
output_args.append("-bsf:v") output_args.append("-bsf:v")
output_args.append("h264_mp4toannexb") output_args.append("h264_mp4toannexb")
output_args.append("-reset_timestamps")
output_args.append("1")
video_output_str = "[0:v]" video_output_str = "[0:v]"
audio_output_str = "[0:v]" audio_output_str = ""
video_input_count = 0 audio_track_index = 0
audio_input_count = 0 effect_index = 0
for input_file in self.input_file: for input_file in self.input_file:
input_args.append("-i") input_args.append("-i")
if type(input_file) is str: if type(input_file) is str:
input_args.append(input_file) input_args.append(input_file)
elif isinstance(input_file, FfmpegTask): elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file()) input_args.append(input_file.get_output_file())
if self.center_cut == 1:
pos_json_str = self.ext_data.get('posJson', '{}')
pos_json = json.loads(pos_json_str)
_v_w = pos_json.get('imgWidth', 1)
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_x = f'{float((_f_x2 + _f_x)/(2 * _v_w)) :.4f}*iw-ih*ih/(2*iw)'
filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
video_output_str = f"[v_cut{effect_index}]"
effect_index += 1
for effect in self.effects:
if effect.startswith("cameraShot:"):
param = effect.split(":", 2)[1]
if param == '':
param = "3,1,0"
_split = param.split(",")
start = 3
duration = 1
rotate_deg = 0
if len(_split) >= 3:
if _split[2] == '':
rotate_deg = 0
else:
rotate_deg = int(_split[2])
if len(_split) >= 2:
duration = float(_split[1])
if len(_split) >= 1:
start = float(_split[0])
_start_out_str = "[eff_s]"
_mid_out_str = "[eff_m]"
_end_out_str = "[eff_e]"
filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}")
filter_args.append(f"{_start_out_str}select=lt(n\,{int(start*self.frame_rate)}){_start_out_str}")
filter_args.append(f"{_end_out_str}select=gt(n\,{int(start*self.frame_rate)}){_end_out_str}")
filter_args.append(f"{_mid_out_str}select=eq(n\,{int(start*self.frame_rate)}){_mid_out_str}")
filter_args.append(f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}")
if rotate_deg != 0:
filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/360{_mid_out_str}")
# filter_args.append(f"{video_output_str}trim=start=0:end={start+duration},tpad=stop_mode=clone:stop_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"tpad=start_mode=clone:start_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}")
video_output_str = f"[v_eff{effect_index}]"
# filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
effect_index += 1
elif effect.startswith("ospeed:"):
param = effect.split(":", 2)[1]
if param == '':
param = "1"
if param != "1":
# 视频变速
effect_index += 1
filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("zoom:"):
...
...
for lut in self.luts: for lut in self.luts:
filter_args.append("[0:v]lut3d=file=" + lut + "[0:v]") filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}")
for overlay in self.overlays: for overlay in self.overlays:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
@ -159,91 +249,89 @@ class FfmpegTask(object):
output_args.append("-r") output_args.append("-r")
output_args.append(f"{self.frame_rate}") output_args.append(f"{self.frame_rate}")
if self.mute: if self.mute:
output_args.append("-an") input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
filter_args.append(f"[{input_index}:a]acopy[a]")
audio_track_index += 1
audio_output_str = "[a]"
else: else:
input_index = 0 audio_output_str = "[0:a]"
audio_track_index += 1
for audio in self.audios: for audio in self.audios:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
input_args.append(audio.replace("\\", "/")) input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0: audio_track_index += 1
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]") filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
audio_output_str = "[a]" audio_output_str = "[a]"
else: if audio_output_str:
audio_output_str = f"[{input_index}:a]" output_args.append("-map")
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str) output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()] _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'concat': elif self.task_type == 'concat':
# 无法通过 annexb 合并的 # 无法通过 annexb 合并的
input_args = [] input_args = []
output_args = ["-shortest"] output_args = [*DEFAULT_ARGS]
if self.check_annexb() and len(self.audios) <= 1: filter_args = []
audio_output_str = ""
audio_track_index = 0
# output_args # output_args
if len(self.audios) > 0: if len(self.input_file) == 1:
input_args.append("-an") _file = self.input_file[0]
from util.ffmpeg import probe_video_audio
if type(_file) is str:
input_args += ["-i", _file]
self.mute = not probe_video_audio(_file)
elif isinstance(_file, FfmpegTask):
input_args += ["-i", _file.get_output_file()]
self.mute = not probe_video_audio(_file.get_output_file())
else:
_tmp_file = "tmp_concat_" + str(time.time()) + ".txt" _tmp_file = "tmp_concat_" + str(time.time()) + ".txt"
from util.ffmpeg import probe_video_audio
with open(_tmp_file, "w", encoding="utf-8") as f: with open(_tmp_file, "w", encoding="utf-8") as f:
for input_file in self.input_file: for input_file in self.input_file:
if type(input_file) is str: if type(input_file) is str:
f.write("file '" + input_file + "'\n") f.write("file '" + input_file + "'\n")
elif isinstance(input_file, FfmpegTask): elif isinstance(input_file, FfmpegTask):
f.write("file '" + input_file.get_output_file() + "'\n") f.write("file '" + input_file.get_output_file() + "'\n")
input_args += ("-f", "concat", "-safe", "0", "-i", _tmp_file) input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file]
self.mute = not probe_video_audio(_tmp_file, "concat")
output_args.append("-map")
output_args.append("0:v")
output_args.append("-c:v") output_args.append("-c:v")
output_args.append("copy") output_args.append("copy")
if len(self.audios) > 0:
input_args.append("-i")
input_args.append(self.audios[0])
output_args.append("-c:a")
output_args.append("copy")
output_args.append("-f")
output_args.append("mp4")
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
return args + input_args + output_args + [self.get_output_file()]
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
filter_args = []
video_output_str = "[0:v]"
audio_output_str = "[0:a]"
video_input_count = 0
audio_input_count = 0
for input_file in self.input_file:
input_index = input_args.count("-i")
input_args.append("-i")
if type(input_file) is str:
input_args.append(input_file.replace("\\", "/"))
elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file().replace("\\", "/"))
if video_input_count > 0:
filter_args.append(f"{video_output_str}[{input_index}:v]concat=n=2:v=1:a=0[v]")
video_output_str = "[v]"
else:
video_output_str = f"[{input_index}:v]"
video_input_count += 1
output_args.append("-map")
output_args.append(video_output_str)
if self.mute: if self.mute:
output_args.append("-an") input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
audio_output_str = f"[{input_index}:a]"
audio_track_index += 1
else: else:
input_index = 0 audio_output_str = "[0:a]"
audio_track_index += 1
for audio in self.audios: for audio in self.audios:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
input_args.append(audio.replace("\\", "/")) input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0: audio_track_index += 1
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]") filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
audio_output_str = "[a]" audio_output_str = "[a]"
if audio_output_str:
output_args.append("-map")
if audio_track_index <= 1:
output_args.append(audio_output_str[1:-1])
else: else:
audio_output_str = f"[{input_index}:a]"
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str) output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()] output_args += AUDIO_ARGS
if self.annexb:
output_args.append("-bsf:v")
output_args.append("h264_mp4toannexb")
output_args.append("-bsf:a")
output_args.append("setts=pts=DTS")
output_args.append("-f")
output_args.append("mpegts" if self.annexb else "mp4")
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'copy': elif self.task_type == 'copy':
if len(self.input_file) == 1: if len(self.input_file) == 1:
if type(self.input_file[0]) is str: if type(self.input_file[0]) is str:

View File

@ -2,18 +2,41 @@ from time import sleep
import biz.task import biz.task
import config import config
from telemetry import init_opentelemetry
from template import load_local_template from template import load_local_template
from util import api from util import api
load_local_template() import os
import glob
load_local_template()
import logging
LOGGER = logging.getLogger(__name__)
init_opentelemetry()
while True: while True:
# print(get_sys_info()) # print(get_sys_info())
print("waiting for task...") print("waiting for task...")
try:
task_list = api.sync_center() task_list = api.sync_center()
except Exception as e:
LOGGER.error("sync_center error", exc_info=e)
sleep(5)
continue
if len(task_list) == 0: if len(task_list) == 0:
# 删除当前文件夹下所有以.mp4、.ts结尾的文件
for file_globs in ['*.mp4', '*.ts', 'tmp_concat*.txt']:
for file_path in glob.glob(file_globs):
try:
os.remove(file_path)
print(f"Deleted file: {file_path}")
except Exception as e:
LOGGER.error(f"Error deleting file {file_path}", exc_info=e)
sleep(5) sleep(5)
for task in task_list: for task in task_list:
print("start task:", task) print("start task:", task)
try:
biz.task.start_task(task) biz.task.start_task(task)
except Exception as e:
LOGGER.error("task_start error", exc_info=e)

30
telemetry/__init__.py Normal file
View File

@ -0,0 +1,30 @@
import os
from constant import SOFTWARE_VERSION
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter
from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
def get_tracer(name):
return trace.get_tracer(name)
# 初始化 OpenTelemetry
def init_opentelemetry():
# 设置服务名、主机名
resource = Resource(attributes={
SERVICE_NAME: "RENDER_WORKER",
SERVICE_VERSION: SOFTWARE_VERSION,
DEPLOYMENT_ENVIRONMENT: "Python",
HOST_NAME: os.getenv("ACCESS_KEY"),
})
# 使用HTTP协议上报
span_processor = BatchSpanProcessor(OTLPSpanHttpExporter(
endpoint="http://tracing-analysis-dc-sh.aliyuncs.com/adapt_e7qojqi4e0@aa79b4d367fb6b7_e7qojqi4e0@53df7ad2afe8301/api/otlp/traces",
))
trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor)
trace.set_tracer_provider(trace_provider)

View File

@ -88,8 +88,8 @@ def download_template(template_id):
new_fp = os.path.join(template_info['local_path'], _fn) new_fp = os.path.join(template_info['local_path'], _fn)
oss.download_from_oss(_template['source'], new_fp) oss.download_from_oss(_template['source'], new_fp)
if _fn.endswith(".mp4"): if _fn.endswith(".mp4"):
from util.ffmpeg import to_annexb from util.ffmpeg import re_encode_and_annexb
new_fp = to_annexb(new_fp) new_fp = re_encode_and_annexb(new_fp)
_template['source'] = os.path.relpath(new_fp, template_info['local_path']) _template['source'] = os.path.relpath(new_fp, template_info['local_path'])
if 'overlays' in _template: if 'overlays' in _template:
for i in range(len(_template['overlays'])): for i in range(len(_template['overlays'])):

View File

@ -1,9 +1,13 @@
import json
import logging import logging
import os import os
import requests import requests
from opentelemetry.trace import Status, StatusCode
import util.system import util.system
from telemetry import get_tracer
from util import oss
session = requests.Session() session = requests.Session()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -19,16 +23,24 @@ def sync_center():
通过接口获取任务 通过接口获取任务
:return: 任务列表 :return: 任务列表
""" """
try:
from template import TEMPLATES, download_template from template import TEMPLATES, download_template
tracer = get_tracer(__name__)
with tracer.start_as_current_span("sync_center"):
with tracer.start_as_current_span("sync_center.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", os.getenv('API_ENDPOINT') + "/sync")
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
'clientStatus': util.system.get_sys_info(), 'clientStatus': util.system.get_sys_info(),
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in TEMPLATES.values()] 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in
TEMPLATES.values()]
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("http.error", str(e))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return [] return []
data = response.json() data = response.json()
@ -56,24 +68,32 @@ def get_template_info(template_id):
:type template_id: str :type template_id: str
:return: 模板信息 :return: 模板信息
""" """
tracer = get_tracer(__name__)
with tracer.start_as_current_span("get_template_info"):
with tracer.start_as_current_span("get_template_info.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id))
response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={ response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return None return None
data = response.json() data = response.json()
logger.debug("获取模板信息结果:【%s", data)
remote_template_info = data.get('data', {}) remote_template_info = data.get('data', {})
logger.debug("获取模板信息结果:【%s", remote_template_info)
template = { template = {
'id': template_id, 'id': template_id,
'updateTime': remote_template_info.get('updateTime', template_id), 'updateTime': remote_template_info.get('updateTime', template_id),
'scenic_name': remote_template_info.get('scenicName', '景区'), 'scenic_name': remote_template_info.get('scenicName', '景区'),
'name': remote_template_info.get('name', '模版'), 'name': remote_template_info.get('name', '模版'),
'video_size': '1920x1080', 'video_size': remote_template_info.get('resolution', '1920x1080'),
'frame_rate': 30, 'frame_rate': 25,
'overall_duration': 30, 'overall_duration': 30,
'video_parts': [ 'video_parts': [
@ -90,6 +110,7 @@ def get_template_info(template_id):
# 占位符 # 占位符
_template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '') _template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '')
_template['mute'] = template_info.get('mute', True) _template['mute'] = template_info.get('mute', True)
_template['crop_mode'] = template_info.get('cropEnable', None)
else: else:
_template['source'] = None _template['source'] = None
_overlays = template_info.get('overlays', '') _overlays = template_info.get('overlays', '')
@ -101,6 +122,12 @@ def get_template_info(template_id):
_luts = template_info.get('luts', '') _luts = template_info.get('luts', '')
if _luts: if _luts:
_template['luts'] = _luts.split(",") _template['luts'] = _luts.split(",")
_only_if = template_info.get('onlyIf', '')
if _only_if:
_template['only_if'] = _only_if
_effects = template_info.get('effects', '')
if _effects:
_template['effects'] = _effects.split("|")
return _template return _template
# outer template definition # outer template definition
@ -112,63 +139,101 @@ def get_template_info(template_id):
parts = _template_normalizer(children_template) parts = _template_normalizer(children_template)
template['video_parts'].append(parts) template['video_parts'].append(parts)
template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id)) template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id))
with get_tracer("api").start_as_current_span("get_template_info.template") as res_span:
res_span.set_attribute("normalized.response", json.dumps(template))
return template return template
def report_task_success(task_info, **kwargs): def report_task_success(task_info, **kwargs):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_success"):
with tracer.start_as_current_span("report_task_success.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
**kwargs **kwargs
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return None return None
def report_task_start(task_info): def report_task_start(task_info):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_start"):
with tracer.start_as_current_span("report_task_start.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return None return None
def report_task_failed(task_info, reason=''): def report_task_failed(task_info, reason=''):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_failed"):
with tracer.start_as_current_span("report_task_failed.request") as req_span:
try: try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
'reason': reason 'reason': reason
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e: except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return None return None
def upload_task_file(task_info, ffmpeg_task): def upload_task_file(task_info, ffmpeg_task):
tracer = get_tracer(__name__)
with get_tracer("api").start_as_current_span("upload_task_file") as span:
logger.info("开始上传文件: %s", task_info.get("id")) logger.info("开始上传文件: %s", task_info.get("id"))
span.set_attribute("file.id", task_info.get("id"))
with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
try: try:
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")),
json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10) }, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
except requests.RequestException as e: except requests.RequestException as e:
span.set_attribute("api.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
logger.error("请求失败!", e) logger.error("请求失败!", e)
return False return False
data = response.json() data = response.json()
url = data.get('data', "") url = data.get('data', "")
logger.info("开始上传文件: %s%s", task_info.get("id"), url) logger.info("开始上传文件: %s%s", task_info.get("id"), url)
try: return oss.upload_to_oss(url, ffmpeg_task.get_output_file())
with open(ffmpeg_task.get_output_file(), 'rb') as f:
requests.put(url, data=f)
except requests.RequestException as e:
logger.error("上传失败!", e)
return False
finally:
logger.info("上传文件结束: %s", task_info.get("id"))
return True

View File

@ -1,40 +1,85 @@
import json
import logging import logging
import os import os
import subprocess import subprocess
from datetime import datetime from datetime import datetime
from typing import Optional, IO from typing import Optional, IO
from entity.ffmpeg import FfmpegTask from opentelemetry.trace import Status, StatusCode
from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT
from telemetry import get_tracer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def to_annexb(file): def re_encode_and_annexb(file):
with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span:
span.set_attribute("file.path", file)
if not os.path.exists(file): if not os.path.exists(file):
span.set_status(Status(StatusCode.ERROR))
return file return file
logger.info("ToAnnexb: %s", file) logger.info("ReEncodeAndAnnexb: %s", file)
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb", has_audio = not not probe_video_audio(file)
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-vsync", "cfr", "-i", file,
*(set() if has_audio else MUTE_AUDIO_INPUT),
"-map", "0:v", "-map", "0:a" if has_audio else "1:a",
*VIDEO_ARGS, "-bsf:v", "h264_mp4toannexb",
*AUDIO_ARGS, "-bsf:a", "setts=pts=DTS",
*ENCODER_ARGS, "-shortest", "-fflags", "+genpts",
"-f", "mpegts", file + ".ts"]) "-f", "mpegts", file + ".ts"])
logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) logger.info(" ".join(ffmpeg_process.args))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
if ffmpeg_process.returncode == 0: if ffmpeg_process.returncode == 0:
os.remove(file) span.set_status(Status(StatusCode.OK))
span.set_attribute("file.size", os.path.getsize(file+".ts"))
# os.remove(file)
return file+".ts" return file+".ts"
else: else:
span.set_status(Status(StatusCode.ERROR))
return file return file
def start_render(ffmpeg_task: FfmpegTask): def start_render(ffmpeg_task: FfmpegTask):
logger.info(ffmpeg_task) tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_render") as span:
span.set_attribute("ffmpeg.task", str(ffmpeg_task))
if not ffmpeg_task.need_run(): if not ffmpeg_task.need_run():
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True return True
ffmpeg_args = ffmpeg_task.get_ffmpeg_args() ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
logger.info(ffmpeg_args)
if len(ffmpeg_args) == 0: if len(ffmpeg_args) == 0:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True)) ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
logger.info("FINISH TASK, OUTPUT IS %s", handle_ffmpeg_output(ffmpeg_process.stdout)) span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info(" ".join(ffmpeg_process.args))
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
span.set_attribute("ffmpeg.out", ffmpeg_final_out)
logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
code = ffmpeg_process.returncode code = ffmpeg_process.returncode
return code == 0 span.set_attribute("ffmpeg.code", code)
if code != 0:
span.set_status(Status(StatusCode.ERROR))
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
return False
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
try:
file_size = os.path.getsize(ffmpeg_task.output_file)
span.set_attribute("file.size", file_size)
if file_size < 4096:
span.set_status(Status(StatusCode.ERROR))
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
return False
except OSError:
span.set_attribute("file.size", 0)
span.set_status(Status(StatusCode.ERROR))
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
return False
span.set_status(Status(StatusCode.OK))
return True
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
out_time = "0:0:0.0" out_time = "0:0:0.0"
@ -55,27 +100,60 @@ def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
print("[ ]Speed:", out_time, "@", speed) print("[ ]Speed:", out_time, "@", speed)
return out_time+"@"+speed return out_time+"@"+speed
def duration_str_to_float(duration_str: str) -> float: def duration_str_to_float(duration_str: str) -> float:
_duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1) _duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1)
return _duration.total_seconds() return _duration.total_seconds()
def probe_video_info(video_file): def probe_video_info(video_file):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("probe_video_info") as span:
span.set_attribute("video.file", video_file)
# 获取宽度和高度 # 获取宽度和高度
result = subprocess.run( result = subprocess.run(
["ffprobe.exe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of', ["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of',
'csv=s=x:p=0', video_file], 'csv=s=x:p=0', video_file],
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
**subprocess_args(True) **subprocess_args(True)
) )
span.set_attribute("ffprobe.args", json.dumps(result.args))
span.set_attribute("ffprobe.code", result.returncode)
if result.returncode != 0:
span.set_status(Status(StatusCode.ERROR))
return 0, 0, 0
all_result = result.stdout.decode('utf-8').strip() all_result = result.stdout.decode('utf-8').strip()
span.set_attribute("ffprobe.out", all_result)
if all_result == '':
span.set_status(Status(StatusCode.ERROR))
return 0, 0, 0
span.set_status(Status(StatusCode.OK))
wh, duration = all_result.split('\n') wh, duration = all_result.split('\n')
width, height = wh.strip().split('x') width, height = wh.strip().split('x')
return int(width), int(height), float(duration) return int(width), int(height), float(duration)
def probe_video_audio(video_file, type=None):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("probe_video_audio") as span:
span.set_attribute("video.file", video_file)
args = ["ffprobe", "-hide_banner", "-v", "error", "-select_streams", "a", "-show_entries", "stream=index", "-of", "csv=p=0"]
if type == 'concat':
args.append("-safe")
args.append("0")
args.append("-f")
args.append("concat")
args.append(video_file)
logger.info(" ".join(args))
result = subprocess.run(args, stderr=subprocess.STDOUT, **subprocess_args(True))
span.set_attribute("ffprobe.args", json.dumps(result.args))
span.set_attribute("ffprobe.code", result.returncode)
logger.info("probe_video_audio: %s", result.stdout.decode('utf-8').strip())
if result.returncode != 0:
return False
if result.stdout.decode('utf-8').strip() == '':
return False
return True
# Create a set of arguments which make a ``subprocess.Popen`` (and # Create a set of arguments which make a ``subprocess.Popen`` (and
# variants) call work with or without Pyinstaller, ``--noconsole`` or # variants) call work with or without Pyinstaller, ``--noconsole`` or
# not, on Windows and Linux. Typical use:: # not, on Windows and Linux. Typical use::

View File

@ -2,6 +2,9 @@ import logging
import os import os
import requests import requests
from opentelemetry.trace import Status, StatusCode
from telemetry import get_tracer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -13,14 +16,41 @@ def upload_to_oss(url, file_path):
:param str file_path: 文件路径 :param str file_path: 文件路径
:return bool: 是否成功 :return bool: 是否成功
""" """
with open(file_path, 'rb') as f: tracer = get_tracer(__name__)
with tracer.start_as_current_span("upload_to_oss") as span:
span.set_attribute("file.url", url)
span.set_attribute("file.path", file_path)
span.set_attribute("file.size", os.path.getsize(file_path))
max_retries = 5
retries = 0
while retries < max_retries:
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
try: try:
response = requests.put(url, data=f) req_span.set_attribute("http.method", "PUT")
return response.status_code == 200 req_span.set_attribute("http.url", url)
with open(file_path, 'rb') as f:
response = requests.put(url, data=f, timeout=60, headers={"Content-Type": "video/mp4"})
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
req_span.set_status(Status(StatusCode.OK))
span.set_status(Status(StatusCode.OK))
return True
except requests.exceptions.Timeout:
req_span.set_attribute("http.error", "Timeout")
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
except Exception as e: except Exception as e:
print(e) req_span.set_attribute("http.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
span.set_status(Status(StatusCode.ERROR))
return False return False
def download_from_oss(url, file_path): def download_from_oss(url, file_path):
""" """
使用签名URL下载文件到OSS 使用签名URL下载文件到OSS
@ -28,16 +58,40 @@ def download_from_oss(url, file_path):
:param Union[LiteralString, str, bytes] file_path: 文件路径 :param Union[LiteralString, str, bytes] file_path: 文件路径
:return bool: 是否成功 :return bool: 是否成功
""" """
tracer = get_tracer(__name__)
with tracer.start_as_current_span("download_from_oss") as span:
span.set_attribute("file.url", url)
span.set_attribute("file.path", file_path)
logging.info("download_from_oss: %s", url) logging.info("download_from_oss: %s", url)
file_dir, file_name = os.path.split(file_path) file_dir, file_name = os.path.split(file_path)
if file_dir: if file_dir:
if not os.path.exists(file_dir): if not os.path.exists(file_dir):
os.makedirs(file_dir) os.makedirs(file_dir)
max_retries = 5
retries = 0
while retries < max_retries:
with tracer.start_as_current_span("download_from_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
try: try:
response = requests.get(url) req_span.set_attribute("http.method", "GET")
req_span.set_attribute("http.url", url)
response = requests.get(url, timeout=15) # 设置超时时间
req_span.set_attribute("http.status_code", response.status_code)
with open(file_path, 'wb') as f: with open(file_path, 'wb') as f:
f.write(response.content) f.write(response.content)
req_span.set_attribute("file.size", os.path.getsize(file_path))
req_span.set_status(Status(StatusCode.OK))
span.set_status(Status(StatusCode.OK))
return True return True
except requests.exceptions.Timeout:
req_span.set_attribute("http.error", "Timeout")
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
except Exception as e: except Exception as e:
print(e) req_span.set_attribute("http.error", str(e))
req_span.set_status(Status(StatusCode.ERROR))
retries += 1
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
span.set_status(Status(StatusCode.ERROR))
return False return False