From b8db0d2b959ff47412e21582cabe6e83561b3890 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Sun, 23 Mar 2025 18:36:26 +0800 Subject: [PATCH] =?UTF-8?q?metrics=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- biz/ffmpeg.py | 11 +++++++++-- biz/task.py | 11 ++++++++++- util/api.py | 29 ++++++++++++++++++----------- util/ffmpeg.py | 19 +++++++++++++++++++ util/oss.py | 18 +++++++++++++++--- 5 files changed, 71 insertions(+), 17 deletions(-) diff --git a/biz/ffmpeg.py b/biz/ffmpeg.py index ee7d89f..d7d9e4d 100644 --- a/biz/ffmpeg.py +++ b/biz/ffmpeg.py @@ -2,6 +2,8 @@ import json import os.path import time +from opentelemetry.trace import Status, StatusCode + from entity.ffmpeg import FfmpegTask import logging @@ -108,13 +110,18 @@ def check_placeholder_exist(placeholder_id, task_params): def start_ffmpeg_task(ffmpeg_task): tracer = get_tracer(__name__) - with tracer.start_as_current_span("start_ffmpeg_task"): + with tracer.start_as_current_span("start_ffmpeg_task") as span: for task in ffmpeg_task.analyze_input_render_tasks(): result = start_ffmpeg_task(task) if not result: return False ffmpeg_task.correct_task_type() - return ffmpeg.start_render(ffmpeg_task) + result = ffmpeg.start_render(ffmpeg_task) + if not result: + span.set_status(Status(StatusCode.ERROR)) + return False + span.set_status(Status(StatusCode.OK)) + return True def clear_task_tmp_file(ffmpeg_task): diff --git a/biz/task.py b/biz/task.py index 834197f..f51fb5b 100644 --- a/biz/task.py +++ b/biz/task.py @@ -1,3 +1,7 @@ +import json + +from opentelemetry.trace import Status, StatusCode + from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info from telemetry import get_tracer from template import get_template_def @@ -6,16 +10,20 @@ from util import api def start_task(task_info): tracer = get_tracer(__name__) - with tracer.start_as_current_span("start_task"): + with tracer.start_as_current_span("start_task") as span: + span.set_attribute("task.id", task_info) task_info = api.normalize_task(task_info) + span.set_attribute("task.info", json.dumps(task_info)) template_info = get_template_def(task_info.get("templateId")) api.report_task_start(task_info) ffmpeg_task = parse_ffmpeg_task(task_info, template_info) result = start_ffmpeg_task(ffmpeg_task) if not result: + span.set_status(Status(StatusCode.ERROR)) return api.report_task_failed(task_info) oss_result = api.upload_task_file(task_info, ffmpeg_task) if not oss_result: + span.set_status(Status(StatusCode.ERROR)) return api.report_task_failed(task_info) # 获取视频长度宽度和时长 width, height, duration = probe_video_info(ffmpeg_task) @@ -25,3 +33,4 @@ def start_task(task_info): "height": height, "duration": duration }) + span.set_status(Status(StatusCode.OK)) diff --git a/util/api.py b/util/api.py index f5cfc38..b2d31a4 100644 --- a/util/api.py +++ b/util/api.py @@ -3,6 +3,7 @@ import logging import os import requests +from opentelemetry.trace import Status, StatusCode import util.system from telemetry import get_tracer @@ -25,7 +26,7 @@ def sync_center(): from template import TEMPLATES, download_template tracer = get_tracer(__name__) with tracer.start_as_current_span("sync_center"): - with get_tracer("api").start_as_current_span("sync_center.request") as req_span: + with tracer.start_as_current_span("sync_center.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", os.getenv('API_ENDPOINT') + "/sync") @@ -69,7 +70,7 @@ def get_template_info(template_id): """ tracer = get_tracer(__name__) with tracer.start_as_current_span("get_template_info"): - with get_tracer("api").start_as_current_span("get_template_info.request") as req_span: + with tracer.start_as_current_span("get_template_info.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id)) @@ -146,7 +147,7 @@ def get_template_info(template_id): def report_task_success(task_info, **kwargs): tracer = get_tracer(__name__) with tracer.start_as_current_span("report_task_success"): - with get_tracer("api").start_as_current_span("report_task_success.request") as req_span: + with tracer.start_as_current_span("report_task_success.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", @@ -158,6 +159,7 @@ def report_task_success(task_info, **kwargs): req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() + req_span.set_status(Status(StatusCode.OK)) except requests.RequestException as e: req_span.set_attribute("api.error", str(e)) logger.error("请求失败!", e) @@ -167,7 +169,7 @@ def report_task_success(task_info, **kwargs): def report_task_start(task_info): tracer = get_tracer(__name__) with tracer.start_as_current_span("report_task_start"): - with get_tracer("api").start_as_current_span("report_task_start.request") as req_span: + with tracer.start_as_current_span("report_task_start.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", @@ -178,6 +180,7 @@ def report_task_start(task_info): req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() + req_span.set_status(Status(StatusCode.OK)) except requests.RequestException as e: req_span.set_attribute("api.error", str(e)) logger.error("请求失败!", e) @@ -187,7 +190,7 @@ def report_task_start(task_info): def report_task_failed(task_info, reason=''): tracer = get_tracer(__name__) with tracer.start_as_current_span("report_task_failed"): - with get_tracer("api").start_as_current_span("report_task_failed.request") as req_span: + with tracer.start_as_current_span("report_task_failed.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", @@ -199,8 +202,10 @@ def report_task_failed(task_info, reason=''): req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() + req_span.set_status(Status(StatusCode.OK)) except requests.RequestException as e: req_span.set_attribute("api.error", str(e)) + req_span.set_status(Status(StatusCode.ERROR)) logger.error("请求失败!", e) return None @@ -210,8 +215,8 @@ def upload_task_file(task_info, ffmpeg_task): with get_tracer("api").start_as_current_span("upload_task_file") as span: logger.info("开始上传文件: %s", task_info.get("id")) span.set_attribute("file.id", task_info.get("id")) - try: - with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span: + with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span: + try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", '{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) @@ -222,10 +227,12 @@ def upload_task_file(task_info, ffmpeg_task): req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() - except requests.RequestException as e: - span.set_attribute("api.error", str(e)) - logger.error("请求失败!", e) - return False + req_span.set_status(Status(StatusCode.OK)) + except requests.RequestException as e: + span.set_attribute("api.error", str(e)) + req_span.set_status(Status(StatusCode.ERROR)) + logger.error("请求失败!", e) + return False data = response.json() url = data.get('data', "") logger.info("开始上传文件: %s 至 %s", task_info.get("id"), url) diff --git a/util/ffmpeg.py b/util/ffmpeg.py index dd3179b..4a3fe59 100644 --- a/util/ffmpeg.py +++ b/util/ffmpeg.py @@ -5,6 +5,8 @@ import subprocess from datetime import datetime from typing import Optional, IO +from opentelemetry.trace import Status, StatusCode + from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT from telemetry import get_tracer @@ -14,6 +16,7 @@ def to_annexb(file): with get_tracer("ffmpeg").start_as_current_span("to_annexb") as span: span.set_attribute("file.path", file) if not os.path.exists(file): + span.set_status(Status(StatusCode.ERROR)) return file logger.info("ToAnnexb: %s", file) ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, @@ -26,16 +29,19 @@ def to_annexb(file): logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) span.set_attribute("ffmpeg.code", ffmpeg_process.returncode) if ffmpeg_process.returncode == 0: + span.set_status(Status(StatusCode.OK)) span.set_attribute("file.size", os.path.getsize(file+".ts")) os.remove(file) return file+".ts" else: + span.set_status(Status(StatusCode.ERROR)) return file def re_encode_and_annexb(file): with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span: span.set_attribute("file.path", file) if not os.path.exists(file): + span.set_status(Status(StatusCode.ERROR)) return file logger.info("ReEncodeAndAnnexb: %s", file) ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, @@ -49,10 +55,12 @@ def re_encode_and_annexb(file): logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) span.set_attribute("ffmpeg.code", ffmpeg_process.returncode) if ffmpeg_process.returncode == 0: + span.set_status(Status(StatusCode.OK)) span.set_attribute("file.size", os.path.getsize(file+".ts")) os.remove(file) return file+".ts" else: + span.set_status(Status(StatusCode.ERROR)) return file def start_render(ffmpeg_task: FfmpegTask): @@ -62,10 +70,12 @@ def start_render(ffmpeg_task: FfmpegTask): logger.info(ffmpeg_task) if not ffmpeg_task.need_run(): ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) + span.set_status(Status(StatusCode.OK)) return True ffmpeg_args = ffmpeg_task.get_ffmpeg_args() if len(ffmpeg_args) == 0: ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) + span.set_status(Status(StatusCode.OK)) return True ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True)) span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args)) @@ -76,6 +86,7 @@ def start_render(ffmpeg_task: FfmpegTask): code = ffmpeg_process.returncode span.set_attribute("ffmpeg.code", code) if code != 0: + span.set_status(Status(StatusCode.ERROR)) logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr) return False span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file) @@ -83,12 +94,15 @@ def start_render(ffmpeg_task: FfmpegTask): file_size = os.path.getsize(ffmpeg_task.output_file) span.set_attribute("file.size", file_size) if file_size < 4096: + span.set_status(Status(StatusCode.ERROR)) logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL") return False except OSError: span.set_attribute("file.size", 0) + span.set_status(Status(StatusCode.ERROR)) logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND") return False + span.set_status(Status(StatusCode.OK)) return True def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: @@ -129,9 +143,14 @@ def probe_video_info(video_file): span.set_attribute("ffprobe.args", json.dumps(result.args)) span.set_attribute("ffprobe.code", result.returncode) if result.returncode != 0: + span.set_status(Status(StatusCode.ERROR)) return 0, 0, 0 all_result = result.stdout.decode('utf-8').strip() span.set_attribute("ffprobe.out", all_result) + if all_result == '': + span.set_status(Status(StatusCode.ERROR)) + return 0, 0, 0 + span.set_status(Status(StatusCode.OK)) wh, duration = all_result.split('\n') width, height = wh.strip().split('x') return int(width), int(height), float(duration) diff --git a/util/oss.py b/util/oss.py index aa4019d..faa375a 100644 --- a/util/oss.py +++ b/util/oss.py @@ -2,6 +2,7 @@ import logging import os import requests +from opentelemetry.trace import Status, StatusCode from telemetry import get_tracer @@ -19,6 +20,7 @@ def upload_to_oss(url, file_path): with tracer.start_as_current_span("upload_to_oss") as span: span.set_attribute("file.url", url) span.set_attribute("file.path", file_path) + span.set_attribute("file.size", os.path.getsize(file_path)) max_retries = 5 retries = 0 while retries < max_retries: @@ -32,15 +34,20 @@ def upload_to_oss(url, file_path): req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() + req_span.set_status(Status(StatusCode.OK)) + span.set_status(Status(StatusCode.OK)) return True except requests.exceptions.Timeout: req_span.set_attribute("http.error", "Timeout") + req_span.set_status(Status(StatusCode.ERROR)) retries += 1 logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...") except Exception as e: req_span.set_attribute("http.error", str(e)) + req_span.set_status(Status(StatusCode.ERROR)) retries += 1 logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...") + span.set_status(Status(StatusCode.ERROR)) return False @@ -72,14 +79,19 @@ def download_from_oss(url, file_path): req_span.set_attribute("http.status_code", response.status_code) with open(file_path, 'wb') as f: f.write(response.content) - span.set_attribute("file.size", os.path.getsize(file_path)) + req_span.set_attribute("file.size", os.path.getsize(file_path)) + req_span.set_status(Status(StatusCode.OK)) + span.set_status(Status(StatusCode.OK)) return True except requests.exceptions.Timeout: - span.set_attribute("http.error", "Timeout") + req_span.set_attribute("http.error", "Timeout") + req_span.set_status(Status(StatusCode.ERROR)) retries += 1 logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...") except Exception as e: - span.set_attribute("http.error", str(e)) + req_span.set_attribute("http.error", str(e)) + req_span.set_status(Status(StatusCode.ERROR)) retries += 1 logger.warning(f"Download failed. Retrying {retries}/{max_retries}...") + span.set_status(Status(StatusCode.ERROR)) return False