Compare commits

...

19 Commits

Author SHA1 Message Date
f12422b346 Merge branch 'master' into windows_nvidia
# Conflicts:
#	entity/ffmpeg.py
2025-03-06 14:06:38 +08:00
56bdad7ad1 音轨叠加 2025-03-06 10:34:28 +08:00
94373cee72 cameraShot特效及旋转 2025-03-05 14:57:02 +08:00
94b08dfcb5 Merge branch 'master' into windows_nvidia 2025-03-04 17:48:57 +08:00
4549b0ab44 分辨率和裁切 2025-03-04 17:43:47 +08:00
d8ab94fcba ffmpeg 参数 2025-03-04 16:11:56 +08:00
9385945030 Merge branch 'master' into windows_nvidia 2025-03-04 12:40:30 +08:00
9d178a3d34 埋点 2025-03-04 12:36:48 +08:00
1f9632761f effect 2025-03-03 14:27:52 +08:00
9041093324 windows nvidia 2025-02-27 16:49:48 +08:00
fff20610a5 profile level指定及修复 2025-02-27 16:48:57 +08:00
67696739f9 切割模式 2025-02-27 14:02:17 +08:00
2ea248c02e 上传文件也弄个超时 2025-02-16 18:15:35 +08:00
358207efdc 定时清理目录下无用文件 2025-02-16 18:15:25 +08:00
94a5e687df 未生成文件时,上报失败 2025-02-08 15:02:36 +08:00
b7d6797901 忽略无用文件 2025-02-05 10:13:33 +08:00
6d9d373032 only if 逻辑 2025-01-23 14:28:51 +08:00
549ee8320a ffprobe 报错后不采用其内容 2025-01-22 16:04:33 +08:00
29bb80f3b9 渲染后再to annexb,使用新逻辑拼接 2025-01-22 14:31:59 +08:00
10 changed files with 659 additions and 330 deletions

5
.gitignore vendored
View File

@ -6,6 +6,11 @@ __pycache__/
*.so *.so
.Python .Python
build/ build/
dist/
*.mp4
*.ts
rand*.ts
tmp_concat_*.txt
*.egg-info/ *.egg-info/
*.egg *.egg
*.manifest *.manifest

View File

@ -6,44 +6,73 @@ from entity.ffmpeg import FfmpegTask
import logging import logging
from util import ffmpeg, oss from util import ffmpeg, oss
from telemetry import get_tracer
logger = logging.getLogger('biz/ffmpeg') logger = logging.getLogger('biz/ffmpeg')
def parse_ffmpeg_task(task_info, template_info): def parse_ffmpeg_task(task_info, template_info):
tasks = [] tracer = get_tracer(__name__)
# 中间片段 with tracer.start_as_current_span("parse_ffmpeg_task"):
task_params_str = task_info.get("taskParams", "{}") tasks = []
task_params = json.loads(task_params_str) # 中间片段
for part in template_info.get("video_parts"): task_params_str = task_info.get("taskParams", "{}")
source = parse_video(part.get('source'), task_params, template_info) task_params = json.loads(task_params_str)
if not source: for part in template_info.get("video_parts"):
logger.warning("no video found for part: " + str(part)) source = parse_video(part.get('source'), task_params, template_info)
continue if not source:
sub_ffmpeg_task = FfmpegTask(source) logger.warning("no video found for part: " + str(part))
sub_ffmpeg_task.annexb = True continue
sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25) only_if = part.get('only_if', '')
for lut in part.get('filters', []): if only_if:
sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut)) if not check_placeholder_exist(only_if, task_params):
for audio in part.get('audios', []): logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part)
sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio)) continue
for overlay in part.get('overlays', []): sub_ffmpeg_task = FfmpegTask(source)
sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) sub_ffmpeg_task.annexb = True
tasks.append(sub_ffmpeg_task) sub_ffmpeg_task.ext_data = find_placeholder_params(part.get('source'), task_params) or {}
output_file = "out_" + str(time.time()) + ".mp4" sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
task = FfmpegTask(tasks, output_file=output_file) sub_ffmpeg_task.center_cut = part.get("crop_mode", None)
overall = template_info.get("overall_template") for effect in part.get('effects', []):
task.frame_rate = template_info.get("frame_rate", 25) sub_ffmpeg_task.add_effect(effect)
if overall.get('source', ''): for lut in part.get('filters', []):
source = parse_video(overall.get('source'), task_params, template_info) sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut))
task.add_inputs(source) for audio in part.get('audios', []):
for lut in overall.get('filters', []): sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio))
task.add_lut(os.path.join(template_info.get("local_path"), lut)) for overlay in part.get('overlays', []):
for audio in overall.get('audios', []): sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
task.add_audios(os.path.join(template_info.get("local_path"), audio)) tasks.append(sub_ffmpeg_task)
for overlay in overall.get('overlays', []): output_file = "out_" + str(time.time()) + ".mp4"
task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) task = FfmpegTask(tasks, output_file=output_file)
return task overall = template_info.get("overall_template")
task.mute = False
task.center_cut = template_info.get("crop_mode", None)
task.frame_rate = template_info.get("frame_rate", 25)
if overall.get('source', ''):
source = parse_video(overall.get('source'), task_params, template_info)
task.add_inputs(source)
for effect in overall.get('effects', []):
task.add_effect(effect)
for lut in overall.get('filters', []):
task.add_lut(os.path.join(template_info.get("local_path"), lut))
for audio in overall.get('audios', []):
task.add_audios(os.path.join(template_info.get("local_path"), audio))
for overlay in overall.get('overlays', []):
task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
return task
def find_placeholder_params(source, task_params):
if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
logger.debug("no video found for placeholder: " + placeholder_id)
return {}
else:
return new_sources[0]
return {}
def parse_video(source, task_params, template_info): def parse_video(source, task_params, template_info):
@ -66,11 +95,27 @@ def parse_video(source, task_params, template_info):
return os.path.join(template_info.get("local_path"), source) return os.path.join(template_info.get("local_path"), source)
def check_placeholder_exist(placeholder_id, task_params):
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
return False
else:
return True
return True
return False
def start_ffmpeg_task(ffmpeg_task): def start_ffmpeg_task(ffmpeg_task):
for task in ffmpeg_task.analyze_input_render_tasks(): tracer = get_tracer(__name__)
start_ffmpeg_task(task) with tracer.start_as_current_span("start_ffmpeg_task"):
ffmpeg_task.correct_task_type() for task in ffmpeg_task.analyze_input_render_tasks():
return ffmpeg.start_render(ffmpeg_task) result = start_ffmpeg_task(task)
if not result:
return False
ffmpeg_task.correct_task_type()
return ffmpeg.start_render(ffmpeg_task)
def clear_task_tmp_file(ffmpeg_task): def clear_task_tmp_file(ffmpeg_task):

View File

@ -1,24 +1,27 @@
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
from telemetry import get_tracer
from template import get_template_def from template import get_template_def
from util import api from util import api
def start_task(task_info): def start_task(task_info):
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info tracer = get_tracer(__name__)
task_info = api.normalize_task(task_info) with tracer.start_as_current_span("start_task"):
template_info = get_template_def(task_info.get("templateId")) task_info = api.normalize_task(task_info)
api.report_task_start(task_info) template_info = get_template_def(task_info.get("templateId"))
ffmpeg_task = parse_ffmpeg_task(task_info, template_info) api.report_task_start(task_info)
result = start_ffmpeg_task(ffmpeg_task) ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
if not result: result = start_ffmpeg_task(ffmpeg_task)
return api.report_task_failed(task_info) if not result:
oss_result = api.upload_task_file(task_info, ffmpeg_task) return api.report_task_failed(task_info)
if not oss_result: oss_result = api.upload_task_file(task_info, ffmpeg_task)
return api.report_task_failed(task_info) if not oss_result:
# 获取视频长度宽度和时长 return api.report_task_failed(task_info)
width, height, duration = probe_video_info(ffmpeg_task) # 获取视频长度宽度和时长
clear_task_tmp_file(ffmpeg_task) width, height, duration = probe_video_info(ffmpeg_task)
api.report_task_success(task_info, videoInfo={ clear_task_tmp_file(ffmpeg_task)
"width": width, api.report_task_success(task_info, videoInfo={
"height": height, "width": width,
"duration": duration "height": height,
}) "duration": duration
})

View File

@ -1,9 +1,19 @@
import json
import time import time
import uuid import uuid
from typing import Any
DEFAULT_ARGS = ("-shortest",)
ENCODER_ARGS = ("-c:v", "h264_nvenc", "-cq:v", "24", "-preset:v", "p7", "-tune:v", "hq",)
VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", )
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", )
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", )
class FfmpegTask(object): class FfmpegTask(object):
effects: list[str]
def __init__(self, input_file, task_type='copy', output_file=''): def __init__(self, input_file, task_type='copy', output_file=''):
self.annexb = False self.annexb = False
if type(input_file) is str: if type(input_file) is str:
@ -14,6 +24,9 @@ class FfmpegTask(object):
self.input_file = input_file self.input_file = input_file
else: else:
self.input_file = [] self.input_file = []
self.zoom_cut = None
self.center_cut = None
self.ext_data = {}
self.task_type = task_type self.task_type = task_type
self.output_file = output_file self.output_file = output_file
self.mute = True self.mute = True
@ -23,6 +36,7 @@ class FfmpegTask(object):
self.luts = [] self.luts = []
self.audios = [] self.audios = []
self.overlays = [] self.overlays = []
self.effects = []
def __repr__(self): def __repr__(self):
_str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}' _str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}'
@ -34,8 +48,11 @@ class FfmpegTask(object):
_str += f', overlays={self.overlays}' _str += f', overlays={self.overlays}'
if self.annexb: if self.annexb:
_str += f', annexb={self.annexb}' _str += f', annexb={self.annexb}'
if self.effects:
_str += f', effects={self.effects}'
if self.mute: if self.mute:
_str += f', mute={self.mute}' _str += f', mute={self.mute}'
_str += f', center_cut={self.center_cut}'
return _str + ')' return _str + ')'
def analyze_input_render_tasks(self): def analyze_input_render_tasks(self):
@ -77,6 +94,10 @@ class FfmpegTask(object):
self.luts.extend(luts) self.luts.extend(luts)
self.correct_task_type() self.correct_task_type()
def add_effect(self, *effects):
self.effects.extend(effects)
self.correct_task_type()
def get_output_file(self): def get_output_file(self):
if self.task_type == 'copy': if self.task_type == 'copy':
return self.input_file[0] return self.input_file[0]
@ -99,8 +120,14 @@ class FfmpegTask(object):
return False return False
if len(self.subtitles) > 0: if len(self.subtitles) > 0:
return False return False
if len(self.effects) > 0:
return False
if self.speed != 1: if self.speed != 1:
return False return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True return True
def check_can_copy(self): def check_can_copy(self):
@ -110,40 +137,93 @@ class FfmpegTask(object):
return False return False
if len(self.subtitles) > 0: if len(self.subtitles) > 0:
return False return False
if len(self.effects) > 0:
return False
if self.speed != 1: if self.speed != 1:
return False return False
if len(self.audios) > 1: if len(self.audios) > 1:
return False return False
if len(self.input_file) > 1: if len(self.input_file) > 1:
return False return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True return True
def check_audio_track(self): def check_audio_track(self):
if len(self.audios) > 0: ...
self.mute = False
def get_ffmpeg_args(self): def get_ffmpeg_args(self):
args = ['-y', '-hide_banner'] args = ['-y', '-hide_banner']
if self.task_type == 'encode': if self.task_type == 'encode':
# args += ('-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv')
input_args = [] input_args = []
filter_args = [] filter_args = []
output_args = ["-shortest", "-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1"] output_args = [*VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, *DEFAULT_ARGS]
if self.annexb: if self.annexb:
output_args.append("-bsf:v") output_args.append("-bsf:v")
output_args.append("h264_mp4toannexb") output_args.append("h264_mp4toannexb")
output_args.append("-reset_timestamps")
output_args.append("1")
video_output_str = "[0:v]" video_output_str = "[0:v]"
audio_output_str = "[0:v]" audio_output_str = ""
video_input_count = 0 effect_index = 0
audio_input_count = 0
for input_file in self.input_file: for input_file in self.input_file:
input_args.append("-i") input_args.append("-i")
if type(input_file) is str: if type(input_file) is str:
input_args.append(input_file) input_args.append(input_file)
elif isinstance(input_file, FfmpegTask): elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file()) input_args.append(input_file.get_output_file())
if self.center_cut == 1:
pos_json_str = self.ext_data.get('posJson', '{}')
pos_json = json.loads(pos_json_str)
_v_w = pos_json.get('imgWidth', 1)
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_x = f'{float((_f_x2 - _f_x)/(2 * _v_w)) :.4f}*iw'
filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
video_output_str = f"[v_cut{effect_index}]"
effect_index += 1
for effect in self.effects:
if effect.startswith("cameraShot:"):
param = effect.split(":", 2)[1]
if param == '':
param = "3,1,0"
_split = param.split(",")
start = 3
duration = 1
rotate_deg = 0
if len(_split) >= 3:
if _split[2] == '':
rotate_deg = 0
else:
rotate_deg = int(_split[2])
if len(_split) >= 2:
duration = float(_split[1])
if len(_split) >= 1:
start = float(_split[0])
_start_out_str = "[eff_s]"
_mid_out_str = "[eff_m]"
_end_out_str = "[eff_e]"
filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}")
filter_args.append(f"{_start_out_str}select=lt(n\,{int(start*self.frame_rate)}){_start_out_str}")
filter_args.append(f"{_end_out_str}select=gt(n\,{int(start*self.frame_rate)}){_end_out_str}")
filter_args.append(f"{_mid_out_str}select=eq(n\,{int(start*self.frame_rate)}){_mid_out_str}")
filter_args.append(f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}")
if rotate_deg != 0:
filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/360{_mid_out_str}")
# filter_args.append(f"{video_output_str}trim=start=0:end={start+duration},tpad=stop_mode=clone:stop_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"tpad=start_mode=clone:start_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}")
video_output_str = f"[v_eff{effect_index}]"
# filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
effect_index += 1
elif effect.startswith("zoom:"):
...
...
for lut in self.luts: for lut in self.luts:
filter_args.append("[0:v]lut3d=file=" + lut + "[0:v]") filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}")
for overlay in self.overlays: for overlay in self.overlays:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
@ -159,91 +239,73 @@ class FfmpegTask(object):
output_args.append("-r") output_args.append("-r")
output_args.append(f"{self.frame_rate}") output_args.append(f"{self.frame_rate}")
if self.mute: if self.mute:
output_args.append("-an") input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
filter_args.append(f"[{input_index}:a]acopy[a]")
audio_output_str = "[a]"
else: else:
input_index = 0 audio_output_str = "[0:a]"
for audio in self.audios: for audio in self.audios:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
input_args.append(audio.replace("\\", "/")) input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0: if audio_output_str == "":
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]") filter_args.append(f"[{input_index}:a]acopy[a]")
audio_output_str = "[a]" audio_output_str = "[a]"
else: else:
audio_output_str = f"[{input_index}:a]" filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_input_count += 1 audio_output_str = "[a]"
if audio_input_count == 1: if audio_output_str:
audio_output_str = f"{input_index}" output_args.append("-map")
output_args.append(f"-map")
output_args.append(audio_output_str) output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()] _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'concat': elif self.task_type == 'concat':
# 无法通过 annexb 合并的 # 无法通过 annexb 合并的
input_args = [] input_args = []
output_args = ["-shortest"] output_args = [*DEFAULT_ARGS]
if self.check_annexb() and len(self.audios) <= 1:
# output_args
if len(self.audios) > 0:
input_args.append("-an")
_tmp_file = "tmp_concat_"+str(time.time())+".txt"
with open(_tmp_file, "w", encoding="utf-8") as f:
for input_file in self.input_file:
if type(input_file) is str:
f.write("file '"+input_file+"'\n")
elif isinstance(input_file, FfmpegTask):
f.write("file '" + input_file.get_output_file() + "'\n")
input_args += ("-f", "concat", "-safe", "0", "-i", _tmp_file)
output_args.append("-c:v")
output_args.append("copy")
if len(self.audios) > 0:
input_args.append("-i")
input_args.append(self.audios[0])
output_args.append("-c:a")
output_args.append("copy")
output_args.append("-f")
output_args.append("mp4")
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
return args + input_args + output_args + [self.get_output_file()]
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
filter_args = [] filter_args = []
video_output_str = "[0:v]" audio_output_str = ""
audio_output_str = "[0:a]" audio_track_index = 0
video_input_count = 0 # output_args
audio_input_count = 0 _tmp_file = "tmp_concat_"+str(time.time())+".txt"
for input_file in self.input_file: with open(_tmp_file, "w", encoding="utf-8") as f:
for input_file in self.input_file:
if type(input_file) is str:
f.write("file '"+input_file+"'\n")
elif isinstance(input_file, FfmpegTask):
f.write("file '" + input_file.get_output_file() + "'\n")
input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file]
output_args.append("-map")
output_args.append("0:v")
output_args.append("-c:v")
output_args.append("copy")
if self.mute:
input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
audio_output_str = f"[{input_index}:a]"
audio_track_index += 1
else:
audio_output_str = "[0:a]"
audio_track_index += 1
for audio in self.audios:
input_index = input_args.count("-i") input_index = input_args.count("-i")
input_args.append("-i") input_args.append("-i")
if type(input_file) is str: input_args.append(audio.replace("\\", "/"))
input_args.append(input_file.replace("\\", "/")) audio_track_index += 1
elif isinstance(input_file, FfmpegTask): filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
input_args.append(input_file.get_output_file().replace("\\", "/")) audio_output_str = "[a]"
if video_input_count > 0: if audio_output_str:
filter_args.append(f"{video_output_str}[{input_index}:v]concat=n=2:v=1:a=0[v]") output_args.append("-map")
video_output_str = "[v]" if audio_track_index <= 1:
output_args.append(audio_output_str[1:-1])
else: else:
video_output_str = f"[{input_index}:v]" output_args.append(audio_output_str)
video_input_count += 1 output_args += AUDIO_ARGS
output_args.append("-map") output_args.append("-f")
output_args.append(video_output_str) output_args.append("mp4")
if self.mute: _filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
output_args.append("-an") return args + input_args + _filter_args + output_args + [self.get_output_file()]
else:
input_index = 0
for audio in self.audios:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0:
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]"
else:
audio_output_str = f"[{input_index}:a]"
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()]
elif self.task_type == 'copy': elif self.task_type == 'copy':
if len(self.input_file) == 1: if len(self.input_file) == 1:
if type(self.input_file[0]) is str: if type(self.input_file[0]) is str:

View File

@ -2,18 +2,41 @@ from time import sleep
import biz.task import biz.task
import config import config
from telemetry import init_opentelemetry
from template import load_local_template from template import load_local_template
from util import api from util import api
load_local_template() import os
import glob
load_local_template()
import logging
LOGGER = logging.getLogger(__name__)
init_opentelemetry()
while True: while True:
# print(get_sys_info()) # print(get_sys_info())
print("waiting for task...") print("waiting for task...")
task_list = api.sync_center() try:
task_list = api.sync_center()
except Exception as e:
LOGGER.error("sync_center error", exc_info=e)
sleep(5)
continue
if len(task_list) == 0: if len(task_list) == 0:
# 删除当前文件夹下所有以.mp4、.ts结尾的文件
for file_globs in ['*.mp4', '*.ts', 'tmp_concat*.txt']:
for file_path in glob.glob(file_globs):
try:
os.remove(file_path)
print(f"Deleted file: {file_path}")
except Exception as e:
LOGGER.error(f"Error deleting file {file_path}", exc_info=e)
sleep(5) sleep(5)
for task in task_list: for task in task_list:
print("start task:", task) print("start task:", task)
biz.task.start_task(task) try:
biz.task.start_task(task)
except Exception as e:
LOGGER.error("task_start error", exc_info=e)

30
telemetry/__init__.py Normal file
View File

@ -0,0 +1,30 @@
import os
from constant import SOFTWARE_VERSION
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter
from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
def get_tracer(name):
return trace.get_tracer(name)
# 初始化 OpenTelemetry
def init_opentelemetry():
# 设置服务名、主机名
resource = Resource(attributes={
SERVICE_NAME: "RENDER_WORKER",
SERVICE_VERSION: SOFTWARE_VERSION,
DEPLOYMENT_ENVIRONMENT: "Python",
HOST_NAME: os.getenv("ACCESS_KEY"),
})
# 使用HTTP协议上报
span_processor = BatchSpanProcessor(OTLPSpanHttpExporter(
endpoint="http://tracing-analysis-dc-sh.aliyuncs.com/adapt_e7qojqi4e0@aa79b4d367fb6b7_e7qojqi4e0@53df7ad2afe8301/api/otlp/traces",
))
trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor)
trace.set_tracer_provider(trace_provider)

View File

@ -88,8 +88,8 @@ def download_template(template_id):
new_fp = os.path.join(template_info['local_path'], _fn) new_fp = os.path.join(template_info['local_path'], _fn)
oss.download_from_oss(_template['source'], new_fp) oss.download_from_oss(_template['source'], new_fp)
if _fn.endswith(".mp4"): if _fn.endswith(".mp4"):
from util.ffmpeg import to_annexb from util.ffmpeg import re_encode_and_annexb
new_fp = to_annexb(new_fp) new_fp = re_encode_and_annexb(new_fp)
_template['source'] = os.path.relpath(new_fp, template_info['local_path']) _template['source'] = os.path.relpath(new_fp, template_info['local_path'])
if 'overlays' in _template: if 'overlays' in _template:
for i in range(len(_template['overlays'])): for i in range(len(_template['overlays'])):

View File

@ -1,9 +1,11 @@
import json
import logging import logging
import os import os
import requests import requests
import util.system import util.system
from telemetry import get_tracer
session = requests.Session() session = requests.Session()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -19,33 +21,41 @@ def sync_center():
通过接口获取任务 通过接口获取任务
:return: 任务列表 :return: 任务列表
""" """
try: from template import TEMPLATES, download_template
from template import TEMPLATES, download_template tracer = get_tracer(__name__)
with tracer.start_as_current_span("sync_center"):
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ with get_tracer("api").start_as_current_span("sync_center.request") as req_span:
'accessKey': os.getenv('ACCESS_KEY'), try:
'clientStatus': util.system.get_sys_info(), req_span.set_attribute("http.method", "POST")
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in TEMPLATES.values()] req_span.set_attribute("http.url", os.getenv('API_ENDPOINT') + "/sync")
}, timeout=10) response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
response.raise_for_status() 'accessKey': os.getenv('ACCESS_KEY'),
except requests.RequestException as e: 'clientStatus': util.system.get_sys_info(),
logger.error("请求失败!", e) 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in
return [] TEMPLATES.values()]
data = response.json() }, timeout=10)
logger.debug("获取任务结果:【%s", data) req_span.set_attribute("http.status_code", response.status_code)
if data.get('code', 0) == 200: response.raise_for_status()
templates = data.get('data', {}).get('templates', []) req_span.set_attribute("api.response", response.text)
tasks = data.get('data', {}).get('tasks', []) except requests.RequestException as e:
else: req_span.set_attribute("api.error", str(e))
tasks = [] logger.error("请求失败!", e)
templates = [] return []
logger.warning("获取任务失败") data = response.json()
for template in templates: logger.debug("获取任务结果:【%s", data)
template_id = template.get('id', '') if data.get('code', 0) == 200:
if template_id: templates = data.get('data', {}).get('templates', [])
logger.info("更新模板:【%s", template_id) tasks = data.get('data', {}).get('tasks', [])
download_template(template_id) else:
return tasks tasks = []
templates = []
logger.warning("获取任务失败")
for template in templates:
template_id = template.get('id', '')
if template_id:
logger.info("更新模板:【%s", template_id)
download_template(template_id)
return tasks
def get_template_info(template_id): def get_template_info(template_id):
@ -56,119 +66,177 @@ def get_template_info(template_id):
:type template_id: str :type template_id: str
:return: 模板信息 :return: 模板信息
""" """
try: tracer = get_tracer(__name__)
response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={ with tracer.start_as_current_span("get_template_info"):
'accessKey': os.getenv('ACCESS_KEY'), with get_tracer("api").start_as_current_span("get_template_info.request") as req_span:
}, timeout=10) try:
response.raise_for_status() req_span.set_attribute("http.method", "POST")
except requests.RequestException as e: req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id))
logger.error("请求失败!", e) response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={
return None 'accessKey': os.getenv('ACCESS_KEY'),
data = response.json() }, timeout=10)
remote_template_info = data.get('data', {}) req_span.set_attribute("http.status_code", response.status_code)
logger.debug("获取模板信息结果:【%s", remote_template_info) response.raise_for_status()
template = { req_span.set_attribute("api.response", response.text)
'id': template_id, except requests.RequestException as e:
'updateTime': remote_template_info.get('updateTime', template_id), req_span.set_attribute("api.error", str(e))
'scenic_name': remote_template_info.get('scenicName', '景区'), logger.error("请求失败!", e)
'name': remote_template_info.get('name', '模版'), return None
'video_size': '1920x1080', data = response.json()
'frame_rate': 30, logger.debug("获取模板信息结果:【%s", data)
'overall_duration': 30, remote_template_info = data.get('data', {})
'video_parts': [ template = {
'id': template_id,
'updateTime': remote_template_info.get('updateTime', template_id),
'scenic_name': remote_template_info.get('scenicName', '景区'),
'name': remote_template_info.get('name', '模版'),
'video_size': remote_template_info.get('resolution', '1920x1080'),
'frame_rate': 25,
'overall_duration': 30,
'video_parts': [
] ]
} }
def _template_normalizer(template_info): def _template_normalizer(template_info):
_template = {} _template = {}
_placeholder_type = template_info.get('isPlaceholder', -1) _placeholder_type = template_info.get('isPlaceholder', -1)
if _placeholder_type == 0: if _placeholder_type == 0:
# 固定视频 # 固定视频
_template['source'] = template_info.get('sourceUrl', '') _template['source'] = template_info.get('sourceUrl', '')
elif _placeholder_type == 1: elif _placeholder_type == 1:
# 占位符 # 占位符
_template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '') _template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '')
_template['mute'] = template_info.get('mute', True) _template['mute'] = template_info.get('mute', True)
else: _template['crop_mode'] = template_info.get('cropEnable', None)
_template['source'] = None else:
_overlays = template_info.get('overlays', '') _template['source'] = None
if _overlays: _overlays = template_info.get('overlays', '')
_template['overlays'] = _overlays.split(",") if _overlays:
_audios = template_info.get('audios', '') _template['overlays'] = _overlays.split(",")
if _audios: _audios = template_info.get('audios', '')
_template['audios'] = _audios.split(",") if _audios:
_luts = template_info.get('luts', '') _template['audios'] = _audios.split(",")
if _luts: _luts = template_info.get('luts', '')
_template['luts'] = _luts.split(",") if _luts:
return _template _template['luts'] = _luts.split(",")
_only_if = template_info.get('onlyIf', '')
if _only_if:
_template['only_if'] = _only_if
_effects = template_info.get('effects', '')
if _effects:
_template['effects'] = _effects.split("|")
return _template
# outer template definition # outer template definition
overall_template = _template_normalizer(remote_template_info) overall_template = _template_normalizer(remote_template_info)
template['overall_template'] = overall_template template['overall_template'] = overall_template
# inter template definition # inter template definition
inter_template_list = remote_template_info.get('children', []) inter_template_list = remote_template_info.get('children', [])
for children_template in inter_template_list: for children_template in inter_template_list:
parts = _template_normalizer(children_template) parts = _template_normalizer(children_template)
template['video_parts'].append(parts) template['video_parts'].append(parts)
template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id)) template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id))
return template with get_tracer("api").start_as_current_span("get_template_info.template") as res_span:
res_span.set_attribute("normalized.response", json.dumps(template))
return template
def report_task_success(task_info, **kwargs): def report_task_success(task_info, **kwargs):
try: tracer = get_tracer(__name__)
response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ with tracer.start_as_current_span("report_task_success"):
'accessKey': os.getenv('ACCESS_KEY'), with get_tracer("api").start_as_current_span("report_task_success.request") as req_span:
**kwargs try:
}, timeout=10) req_span.set_attribute("http.method", "POST")
response.raise_for_status() req_span.set_attribute("http.url",
except requests.RequestException as e: '{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
logger.error("请求失败!", e) response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
return None 'accessKey': os.getenv('ACCESS_KEY'),
**kwargs
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def report_task_start(task_info): def report_task_start(task_info):
try: tracer = get_tracer(__name__)
response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ with tracer.start_as_current_span("report_task_start"):
'accessKey': os.getenv('ACCESS_KEY'), with get_tracer("api").start_as_current_span("report_task_start.request") as req_span:
}, timeout=10) try:
response.raise_for_status() req_span.set_attribute("http.method", "POST")
except requests.RequestException as e: req_span.set_attribute("http.url",
logger.error("请求失败!", e) '{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
return None response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def report_task_failed(task_info, reason=''): def report_task_failed(task_info, reason=''):
try: tracer = get_tracer(__name__)
response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ with tracer.start_as_current_span("report_task_failed"):
'accessKey': os.getenv('ACCESS_KEY'), with tracer.start_as_current_span("report_task_failed.request") as req_span:
'reason': reason try:
}, timeout=10) req_span.set_attribute("http.method", "POST")
response.raise_for_status() req_span.set_attribute("http.url",
except requests.RequestException as e: '{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
logger.error("请求失败!", e) response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
return None 'accessKey': os.getenv('ACCESS_KEY'),
'reason': reason
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def upload_task_file(task_info, ffmpeg_task): def upload_task_file(task_info, ffmpeg_task):
logger.info("开始上传文件: %s", task_info.get("id")) tracer = get_tracer(__name__)
try: with tracer.start_as_current_span("upload_task_file") as span:
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ logger.info("开始上传文件: %s", task_info.get("id"))
'accessKey': os.getenv('ACCESS_KEY'), span.set_attribute("file.id", task_info.get("id"))
}, timeout=10) try:
response.raise_for_status() with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
except requests.RequestException as e: req_span.set_attribute("http.method", "POST")
logger.error("请求失败!", e) req_span.set_attribute("http.url",
return False '{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
data = response.json() response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")),
url = data.get('data', "") json={
logger.info("开始上传文件: %s%s", task_info.get("id"), url) 'accessKey': os.getenv('ACCESS_KEY'),
try: }, timeout=10)
with open(ffmpeg_task.get_output_file(), 'rb') as f: response.raise_for_status()
requests.put(url, data=f) req_span.set_attribute("http.status_code", response.status_code)
except requests.RequestException as e: except requests.RequestException as e:
logger.error("上传失败!", e) span.set_attribute("api.error", str(e))
return False logger.error("请求失败!", e)
finally: return False
logger.info("上传文件结束: %s", task_info.get("id")) data = response.json()
return True url = data.get('data', "")
logger.info("开始上传文件: %s%s", task_info.get("id"), url)
try:
with tracer.start_as_current_span("upload_task_file.start_upload_file") as upload_span:
upload_span.set_attribute("http.method", "PUT")
upload_span.set_attribute("http.url", url)
with open(ffmpeg_task.get_output_file(), 'rb') as f:
requests.put(url, data=f, headers={"Content-Type": "video/mp4"})
except requests.RequestException as e:
span.set_attribute("api.error", str(e))
logger.error("上传失败!", e)
return False
finally:
logger.info("上传文件结束: %s", task_info.get("id"))
return True

View File

@ -1,40 +1,86 @@
import json
import logging import logging
import os import os
import subprocess import subprocess
from datetime import datetime from datetime import datetime
from typing import Optional, IO from typing import Optional, IO
from entity.ffmpeg import FfmpegTask from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS
from telemetry import get_tracer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def to_annexb(file): def to_annexb(file):
if not os.path.exists(file): with get_tracer("ffmpeg").start_as_current_span("to_annexb") as span:
return file span.set_attribute("file.path", file)
logger.info("ToAnnexb: %s", file) if not os.path.exists(file):
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb", return file
"-f", "mpegts", file+".ts"]) logger.info("ToAnnexb: %s", file)
logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb",
if ffmpeg_process.returncode == 0: "-f", "mpegts", file+".ts"])
os.remove(file) span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
return file+".ts" logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
else: span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
return file if ffmpeg_process.returncode == 0:
span.set_attribute("file.size", os.path.getsize(file+".ts"))
os.remove(file)
return file+".ts"
else:
return file
def re_encode_and_annexb(file):
with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span:
span.set_attribute("file.path", file)
if not os.path.exists(file):
return file
logger.info("ReEncodeAndAnnexb: %s", file)
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, *VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, "-bsf:v", "h264_mp4toannexb",
"-f", "mpegts", file +".ts"])
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
if ffmpeg_process.returncode == 0:
span.set_attribute("file.size", os.path.getsize(file+".ts"))
os.remove(file)
return file+".ts"
else:
return file
def start_render(ffmpeg_task: FfmpegTask): def start_render(ffmpeg_task: FfmpegTask):
logger.info(ffmpeg_task) tracer = get_tracer(__name__)
if not ffmpeg_task.need_run(): with tracer.start_as_current_span("start_render") as span:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) span.set_attribute("ffmpeg.task", str(ffmpeg_task))
logger.info(ffmpeg_task)
if not ffmpeg_task.need_run():
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
return True
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
if len(ffmpeg_args) == 0:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info(ffmpeg_process.args)
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
span.set_attribute("ffmpeg.out", ffmpeg_final_out)
logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
code = ffmpeg_process.returncode
span.set_attribute("ffmpeg.code", code)
if code != 0:
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
return False
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
try:
file_size = os.path.getsize(ffmpeg_task.output_file)
span.set_attribute("file.size", file_size)
if file_size < 4096:
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
return False
except OSError:
span.set_attribute("file.size", 0)
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
return False
return True return True
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
logger.info(ffmpeg_args)
if len(ffmpeg_args) == 0:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
logger.info("FINISH TASK, OUTPUT IS %s", handle_ffmpeg_output(ffmpeg_process.stdout))
code = ffmpeg_process.returncode
return code == 0
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
out_time = "0:0:0.0" out_time = "0:0:0.0"
@ -55,25 +101,31 @@ def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
print("[ ]Speed:", out_time, "@", speed) print("[ ]Speed:", out_time, "@", speed)
return out_time+"@"+speed return out_time+"@"+speed
def duration_str_to_float(duration_str: str) -> float: def duration_str_to_float(duration_str: str) -> float:
_duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1) _duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1)
return _duration.total_seconds() return _duration.total_seconds()
def probe_video_info(video_file): def probe_video_info(video_file):
# 获取宽度和高度 tracer = get_tracer(__name__)
result = subprocess.run( with tracer.start_as_current_span("probe_video_info") as span:
["ffprobe.exe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of', span.set_attribute("video.file", video_file)
'csv=s=x:p=0', video_file], # 获取宽度和高度
stderr=subprocess.STDOUT, result = subprocess.run(
**subprocess_args(True) ["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of',
) 'csv=s=x:p=0', video_file],
all_result = result.stdout.decode('utf-8').strip() stderr=subprocess.STDOUT,
wh, duration = all_result.split('\n') **subprocess_args(True)
width, height = wh.strip().split('x') )
span.set_attribute("ffprobe.args", json.dumps(result.args))
return int(width), int(height), float(duration) span.set_attribute("ffprobe.code", result.returncode)
if result.returncode != 0:
return 0, 0, 0
all_result = result.stdout.decode('utf-8').strip()
span.set_attribute("ffprobe.out", all_result)
wh, duration = all_result.split('\n')
width, height = wh.strip().split('x')
return int(width), int(height), float(duration)
# Create a set of arguments which make a ``subprocess.Popen`` (and # Create a set of arguments which make a ``subprocess.Popen`` (and

View File

@ -3,6 +3,8 @@ import os
import requests import requests
from telemetry import get_tracer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -13,13 +15,33 @@ def upload_to_oss(url, file_path):
:param str file_path: 文件路径 :param str file_path: 文件路径
:return bool: 是否成功 :return bool: 是否成功
""" """
with open(file_path, 'rb') as f: tracer = get_tracer(__name__)
try: with tracer.start_as_current_span("upload_to_oss") as span:
response = requests.put(url, data=f) span.set_attribute("file.url", url)
return response.status_code == 200 span.set_attribute("file.path", file_path)
except Exception as e: max_retries = 5
print(e) retries = 0
return False while retries < max_retries:
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
try:
req_span.set_attribute("http.method", "PUT")
req_span.set_attribute("http.url", url)
with open(file_path, 'rb') as f:
response = requests.put(url, data=f, timeout=60) # 设置超时时间为1分钟
req_span.set_attribute("http.status_code", response.status_code)
if response.status_code == 200:
return True
except requests.exceptions.Timeout:
req_span.set_attribute("http.error", "Timeout")
retries += 1
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
except Exception as e:
req_span.set_attribute("http.error", str(e))
retries += 1
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
return False
def download_from_oss(url, file_path): def download_from_oss(url, file_path):
""" """
@ -28,16 +50,35 @@ def download_from_oss(url, file_path):
:param Union[LiteralString, str, bytes] file_path: 文件路径 :param Union[LiteralString, str, bytes] file_path: 文件路径
:return bool: 是否成功 :return bool: 是否成功
""" """
logging.info("download_from_oss: %s", url) tracer = get_tracer(__name__)
file_dir, file_name = os.path.split(file_path) with tracer.start_as_current_span("download_from_oss") as span:
if file_dir: span.set_attribute("file.url", url)
if not os.path.exists(file_dir): span.set_attribute("file.path", file_path)
os.makedirs(file_dir) logging.info("download_from_oss: %s", url)
try: file_dir, file_name = os.path.split(file_path)
response = requests.get(url) if file_dir:
with open(file_path, 'wb') as f: if not os.path.exists(file_dir):
f.write(response.content) os.makedirs(file_dir)
return True max_retries = 5
except Exception as e: retries = 0
print(e) while retries < max_retries:
return False with tracer.start_as_current_span("download_from_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
try:
req_span.set_attribute("http.method", "GET")
req_span.set_attribute("http.url", url)
response = requests.get(url, timeout=15) # 设置超时时间
req_span.set_attribute("http.status_code", response.status_code)
with open(file_path, 'wb') as f:
f.write(response.content)
span.set_attribute("file.size", os.path.getsize(file_path))
return True
except requests.exceptions.Timeout:
span.set_attribute("http.error", "Timeout")
retries += 1
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
except Exception as e:
span.set_attribute("http.error", str(e))
retries += 1
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
return False