metrics调整
This commit is contained in:
parent
6dc7e86e8e
commit
b8db0d2b95
@ -2,6 +2,8 @@ import json
|
||||
import os.path
|
||||
import time
|
||||
|
||||
from opentelemetry.trace import Status, StatusCode
|
||||
|
||||
from entity.ffmpeg import FfmpegTask
|
||||
import logging
|
||||
|
||||
@ -108,13 +110,18 @@ def check_placeholder_exist(placeholder_id, task_params):
|
||||
|
||||
def start_ffmpeg_task(ffmpeg_task):
|
||||
tracer = get_tracer(__name__)
|
||||
with tracer.start_as_current_span("start_ffmpeg_task"):
|
||||
with tracer.start_as_current_span("start_ffmpeg_task") as span:
|
||||
for task in ffmpeg_task.analyze_input_render_tasks():
|
||||
result = start_ffmpeg_task(task)
|
||||
if not result:
|
||||
return False
|
||||
ffmpeg_task.correct_task_type()
|
||||
return ffmpeg.start_render(ffmpeg_task)
|
||||
result = ffmpeg.start_render(ffmpeg_task)
|
||||
if not result:
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return False
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
return True
|
||||
|
||||
|
||||
def clear_task_tmp_file(ffmpeg_task):
|
||||
|
11
biz/task.py
11
biz/task.py
@ -1,3 +1,7 @@
|
||||
import json
|
||||
|
||||
from opentelemetry.trace import Status, StatusCode
|
||||
|
||||
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
|
||||
from telemetry import get_tracer
|
||||
from template import get_template_def
|
||||
@ -6,16 +10,20 @@ from util import api
|
||||
|
||||
def start_task(task_info):
|
||||
tracer = get_tracer(__name__)
|
||||
with tracer.start_as_current_span("start_task"):
|
||||
with tracer.start_as_current_span("start_task") as span:
|
||||
span.set_attribute("task.id", task_info)
|
||||
task_info = api.normalize_task(task_info)
|
||||
span.set_attribute("task.info", json.dumps(task_info))
|
||||
template_info = get_template_def(task_info.get("templateId"))
|
||||
api.report_task_start(task_info)
|
||||
ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
|
||||
result = start_ffmpeg_task(ffmpeg_task)
|
||||
if not result:
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return api.report_task_failed(task_info)
|
||||
oss_result = api.upload_task_file(task_info, ffmpeg_task)
|
||||
if not oss_result:
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return api.report_task_failed(task_info)
|
||||
# 获取视频长度宽度和时长
|
||||
width, height, duration = probe_video_info(ffmpeg_task)
|
||||
@ -25,3 +33,4 @@ def start_task(task_info):
|
||||
"height": height,
|
||||
"duration": duration
|
||||
})
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
|
19
util/api.py
19
util/api.py
@ -3,6 +3,7 @@ import logging
|
||||
import os
|
||||
|
||||
import requests
|
||||
from opentelemetry.trace import Status, StatusCode
|
||||
|
||||
import util.system
|
||||
from telemetry import get_tracer
|
||||
@ -25,7 +26,7 @@ def sync_center():
|
||||
from template import TEMPLATES, download_template
|
||||
tracer = get_tracer(__name__)
|
||||
with tracer.start_as_current_span("sync_center"):
|
||||
with get_tracer("api").start_as_current_span("sync_center.request") as req_span:
|
||||
with tracer.start_as_current_span("sync_center.request") as req_span:
|
||||
try:
|
||||
req_span.set_attribute("http.method", "POST")
|
||||
req_span.set_attribute("http.url", os.getenv('API_ENDPOINT') + "/sync")
|
||||
@ -69,7 +70,7 @@ def get_template_info(template_id):
|
||||
"""
|
||||
tracer = get_tracer(__name__)
|
||||
with tracer.start_as_current_span("get_template_info"):
|
||||
with get_tracer("api").start_as_current_span("get_template_info.request") as req_span:
|
||||
with tracer.start_as_current_span("get_template_info.request") as req_span:
|
||||
try:
|
||||
req_span.set_attribute("http.method", "POST")
|
||||
req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id))
|
||||
@ -146,7 +147,7 @@ def get_template_info(template_id):
|
||||
def report_task_success(task_info, **kwargs):
|
||||
tracer = get_tracer(__name__)
|
||||
with tracer.start_as_current_span("report_task_success"):
|
||||
with get_tracer("api").start_as_current_span("report_task_success.request") as req_span:
|
||||
with tracer.start_as_current_span("report_task_success.request") as req_span:
|
||||
try:
|
||||
req_span.set_attribute("http.method", "POST")
|
||||
req_span.set_attribute("http.url",
|
||||
@ -158,6 +159,7 @@ def report_task_success(task_info, **kwargs):
|
||||
req_span.set_attribute("http.status_code", response.status_code)
|
||||
req_span.set_attribute("http.response", response.text)
|
||||
response.raise_for_status()
|
||||
req_span.set_status(Status(StatusCode.OK))
|
||||
except requests.RequestException as e:
|
||||
req_span.set_attribute("api.error", str(e))
|
||||
logger.error("请求失败!", e)
|
||||
@ -167,7 +169,7 @@ def report_task_success(task_info, **kwargs):
|
||||
def report_task_start(task_info):
|
||||
tracer = get_tracer(__name__)
|
||||
with tracer.start_as_current_span("report_task_start"):
|
||||
with get_tracer("api").start_as_current_span("report_task_start.request") as req_span:
|
||||
with tracer.start_as_current_span("report_task_start.request") as req_span:
|
||||
try:
|
||||
req_span.set_attribute("http.method", "POST")
|
||||
req_span.set_attribute("http.url",
|
||||
@ -178,6 +180,7 @@ def report_task_start(task_info):
|
||||
req_span.set_attribute("http.status_code", response.status_code)
|
||||
req_span.set_attribute("http.response", response.text)
|
||||
response.raise_for_status()
|
||||
req_span.set_status(Status(StatusCode.OK))
|
||||
except requests.RequestException as e:
|
||||
req_span.set_attribute("api.error", str(e))
|
||||
logger.error("请求失败!", e)
|
||||
@ -187,7 +190,7 @@ def report_task_start(task_info):
|
||||
def report_task_failed(task_info, reason=''):
|
||||
tracer = get_tracer(__name__)
|
||||
with tracer.start_as_current_span("report_task_failed"):
|
||||
with get_tracer("api").start_as_current_span("report_task_failed.request") as req_span:
|
||||
with tracer.start_as_current_span("report_task_failed.request") as req_span:
|
||||
try:
|
||||
req_span.set_attribute("http.method", "POST")
|
||||
req_span.set_attribute("http.url",
|
||||
@ -199,8 +202,10 @@ def report_task_failed(task_info, reason=''):
|
||||
req_span.set_attribute("http.status_code", response.status_code)
|
||||
req_span.set_attribute("http.response", response.text)
|
||||
response.raise_for_status()
|
||||
req_span.set_status(Status(StatusCode.OK))
|
||||
except requests.RequestException as e:
|
||||
req_span.set_attribute("api.error", str(e))
|
||||
req_span.set_status(Status(StatusCode.ERROR))
|
||||
logger.error("请求失败!", e)
|
||||
return None
|
||||
|
||||
@ -210,8 +215,8 @@ def upload_task_file(task_info, ffmpeg_task):
|
||||
with get_tracer("api").start_as_current_span("upload_task_file") as span:
|
||||
logger.info("开始上传文件: %s", task_info.get("id"))
|
||||
span.set_attribute("file.id", task_info.get("id"))
|
||||
try:
|
||||
with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
|
||||
try:
|
||||
req_span.set_attribute("http.method", "POST")
|
||||
req_span.set_attribute("http.url",
|
||||
'{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
|
||||
@ -222,8 +227,10 @@ def upload_task_file(task_info, ffmpeg_task):
|
||||
req_span.set_attribute("http.status_code", response.status_code)
|
||||
req_span.set_attribute("http.response", response.text)
|
||||
response.raise_for_status()
|
||||
req_span.set_status(Status(StatusCode.OK))
|
||||
except requests.RequestException as e:
|
||||
span.set_attribute("api.error", str(e))
|
||||
req_span.set_status(Status(StatusCode.ERROR))
|
||||
logger.error("请求失败!", e)
|
||||
return False
|
||||
data = response.json()
|
||||
|
@ -5,6 +5,8 @@ import subprocess
|
||||
from datetime import datetime
|
||||
from typing import Optional, IO
|
||||
|
||||
from opentelemetry.trace import Status, StatusCode
|
||||
|
||||
from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT
|
||||
from telemetry import get_tracer
|
||||
|
||||
@ -14,6 +16,7 @@ def to_annexb(file):
|
||||
with get_tracer("ffmpeg").start_as_current_span("to_annexb") as span:
|
||||
span.set_attribute("file.path", file)
|
||||
if not os.path.exists(file):
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return file
|
||||
logger.info("ToAnnexb: %s", file)
|
||||
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file,
|
||||
@ -26,16 +29,19 @@ def to_annexb(file):
|
||||
logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
|
||||
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
|
||||
if ffmpeg_process.returncode == 0:
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.set_attribute("file.size", os.path.getsize(file+".ts"))
|
||||
os.remove(file)
|
||||
return file+".ts"
|
||||
else:
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return file
|
||||
|
||||
def re_encode_and_annexb(file):
|
||||
with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span:
|
||||
span.set_attribute("file.path", file)
|
||||
if not os.path.exists(file):
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return file
|
||||
logger.info("ReEncodeAndAnnexb: %s", file)
|
||||
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file,
|
||||
@ -49,10 +55,12 @@ def re_encode_and_annexb(file):
|
||||
logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
|
||||
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
|
||||
if ffmpeg_process.returncode == 0:
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.set_attribute("file.size", os.path.getsize(file+".ts"))
|
||||
os.remove(file)
|
||||
return file+".ts"
|
||||
else:
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return file
|
||||
|
||||
def start_render(ffmpeg_task: FfmpegTask):
|
||||
@ -62,10 +70,12 @@ def start_render(ffmpeg_task: FfmpegTask):
|
||||
logger.info(ffmpeg_task)
|
||||
if not ffmpeg_task.need_run():
|
||||
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
return True
|
||||
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
|
||||
if len(ffmpeg_args) == 0:
|
||||
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
return True
|
||||
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
|
||||
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
|
||||
@ -76,6 +86,7 @@ def start_render(ffmpeg_task: FfmpegTask):
|
||||
code = ffmpeg_process.returncode
|
||||
span.set_attribute("ffmpeg.code", code)
|
||||
if code != 0:
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
|
||||
return False
|
||||
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
|
||||
@ -83,12 +94,15 @@ def start_render(ffmpeg_task: FfmpegTask):
|
||||
file_size = os.path.getsize(ffmpeg_task.output_file)
|
||||
span.set_attribute("file.size", file_size)
|
||||
if file_size < 4096:
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
|
||||
return False
|
||||
except OSError:
|
||||
span.set_attribute("file.size", 0)
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
|
||||
return False
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
return True
|
||||
|
||||
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
|
||||
@ -129,9 +143,14 @@ def probe_video_info(video_file):
|
||||
span.set_attribute("ffprobe.args", json.dumps(result.args))
|
||||
span.set_attribute("ffprobe.code", result.returncode)
|
||||
if result.returncode != 0:
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return 0, 0, 0
|
||||
all_result = result.stdout.decode('utf-8').strip()
|
||||
span.set_attribute("ffprobe.out", all_result)
|
||||
if all_result == '':
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return 0, 0, 0
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
wh, duration = all_result.split('\n')
|
||||
width, height = wh.strip().split('x')
|
||||
return int(width), int(height), float(duration)
|
||||
|
18
util/oss.py
18
util/oss.py
@ -2,6 +2,7 @@ import logging
|
||||
import os
|
||||
|
||||
import requests
|
||||
from opentelemetry.trace import Status, StatusCode
|
||||
|
||||
from telemetry import get_tracer
|
||||
|
||||
@ -19,6 +20,7 @@ def upload_to_oss(url, file_path):
|
||||
with tracer.start_as_current_span("upload_to_oss") as span:
|
||||
span.set_attribute("file.url", url)
|
||||
span.set_attribute("file.path", file_path)
|
||||
span.set_attribute("file.size", os.path.getsize(file_path))
|
||||
max_retries = 5
|
||||
retries = 0
|
||||
while retries < max_retries:
|
||||
@ -32,15 +34,20 @@ def upload_to_oss(url, file_path):
|
||||
req_span.set_attribute("http.status_code", response.status_code)
|
||||
req_span.set_attribute("http.response", response.text)
|
||||
response.raise_for_status()
|
||||
req_span.set_status(Status(StatusCode.OK))
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
return True
|
||||
except requests.exceptions.Timeout:
|
||||
req_span.set_attribute("http.error", "Timeout")
|
||||
req_span.set_status(Status(StatusCode.ERROR))
|
||||
retries += 1
|
||||
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
|
||||
except Exception as e:
|
||||
req_span.set_attribute("http.error", str(e))
|
||||
req_span.set_status(Status(StatusCode.ERROR))
|
||||
retries += 1
|
||||
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return False
|
||||
|
||||
|
||||
@ -72,14 +79,19 @@ def download_from_oss(url, file_path):
|
||||
req_span.set_attribute("http.status_code", response.status_code)
|
||||
with open(file_path, 'wb') as f:
|
||||
f.write(response.content)
|
||||
span.set_attribute("file.size", os.path.getsize(file_path))
|
||||
req_span.set_attribute("file.size", os.path.getsize(file_path))
|
||||
req_span.set_status(Status(StatusCode.OK))
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
return True
|
||||
except requests.exceptions.Timeout:
|
||||
span.set_attribute("http.error", "Timeout")
|
||||
req_span.set_attribute("http.error", "Timeout")
|
||||
req_span.set_status(Status(StatusCode.ERROR))
|
||||
retries += 1
|
||||
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
|
||||
except Exception as e:
|
||||
span.set_attribute("http.error", str(e))
|
||||
req_span.set_attribute("http.error", str(e))
|
||||
req_span.set_status(Status(StatusCode.ERROR))
|
||||
retries += 1
|
||||
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return False
|
||||
|
Loading…
x
Reference in New Issue
Block a user