diff --git a/biz/ffmpeg.py b/biz/ffmpeg.py index ee7d89f..d7d9e4d 100644 --- a/biz/ffmpeg.py +++ b/biz/ffmpeg.py @@ -2,6 +2,8 @@ import json import os.path import time +from opentelemetry.trace import Status, StatusCode + from entity.ffmpeg import FfmpegTask import logging @@ -108,13 +110,18 @@ def check_placeholder_exist(placeholder_id, task_params): def start_ffmpeg_task(ffmpeg_task): tracer = get_tracer(__name__) - with tracer.start_as_current_span("start_ffmpeg_task"): + with tracer.start_as_current_span("start_ffmpeg_task") as span: for task in ffmpeg_task.analyze_input_render_tasks(): result = start_ffmpeg_task(task) if not result: return False ffmpeg_task.correct_task_type() - return ffmpeg.start_render(ffmpeg_task) + result = ffmpeg.start_render(ffmpeg_task) + if not result: + span.set_status(Status(StatusCode.ERROR)) + return False + span.set_status(Status(StatusCode.OK)) + return True def clear_task_tmp_file(ffmpeg_task): diff --git a/biz/task.py b/biz/task.py index 834197f..f51fb5b 100644 --- a/biz/task.py +++ b/biz/task.py @@ -1,3 +1,7 @@ +import json + +from opentelemetry.trace import Status, StatusCode + from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info from telemetry import get_tracer from template import get_template_def @@ -6,16 +10,20 @@ from util import api def start_task(task_info): tracer = get_tracer(__name__) - with tracer.start_as_current_span("start_task"): + with tracer.start_as_current_span("start_task") as span: + span.set_attribute("task.id", task_info) task_info = api.normalize_task(task_info) + span.set_attribute("task.info", json.dumps(task_info)) template_info = get_template_def(task_info.get("templateId")) api.report_task_start(task_info) ffmpeg_task = parse_ffmpeg_task(task_info, template_info) result = start_ffmpeg_task(ffmpeg_task) if not result: + span.set_status(Status(StatusCode.ERROR)) return api.report_task_failed(task_info) oss_result = api.upload_task_file(task_info, ffmpeg_task) if not oss_result: + span.set_status(Status(StatusCode.ERROR)) return api.report_task_failed(task_info) # 获取视频长度宽度和时长 width, height, duration = probe_video_info(ffmpeg_task) @@ -25,3 +33,4 @@ def start_task(task_info): "height": height, "duration": duration }) + span.set_status(Status(StatusCode.OK)) diff --git a/util/api.py b/util/api.py index 073d524..b2d31a4 100644 --- a/util/api.py +++ b/util/api.py @@ -3,9 +3,11 @@ import logging import os import requests +from opentelemetry.trace import Status, StatusCode import util.system from telemetry import get_tracer +from util import oss session = requests.Session() logger = logging.getLogger(__name__) @@ -24,7 +26,7 @@ def sync_center(): from template import TEMPLATES, download_template tracer = get_tracer(__name__) with tracer.start_as_current_span("sync_center"): - with get_tracer("api").start_as_current_span("sync_center.request") as req_span: + with tracer.start_as_current_span("sync_center.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", os.getenv('API_ENDPOINT') + "/sync") @@ -35,10 +37,10 @@ def sync_center(): TEMPLATES.values()] }, timeout=10) req_span.set_attribute("http.status_code", response.status_code) + req_span.set_attribute("http.response", response.text) response.raise_for_status() - req_span.set_attribute("api.response", response.text) except requests.RequestException as e: - req_span.set_attribute("api.error", str(e)) + req_span.set_attribute("http.error", str(e)) logger.error("请求失败!", e) return [] data = response.json() @@ -68,7 +70,7 @@ def get_template_info(template_id): """ tracer = get_tracer(__name__) with tracer.start_as_current_span("get_template_info"): - with get_tracer("api").start_as_current_span("get_template_info.request") as req_span: + with tracer.start_as_current_span("get_template_info.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id)) @@ -76,8 +78,8 @@ def get_template_info(template_id): 'accessKey': os.getenv('ACCESS_KEY'), }, timeout=10) req_span.set_attribute("http.status_code", response.status_code) + req_span.set_attribute("http.response", response.text) response.raise_for_status() - req_span.set_attribute("api.response", response.text) except requests.RequestException as e: req_span.set_attribute("api.error", str(e)) logger.error("请求失败!", e) @@ -145,7 +147,7 @@ def get_template_info(template_id): def report_task_success(task_info, **kwargs): tracer = get_tracer(__name__) with tracer.start_as_current_span("report_task_success"): - with get_tracer("api").start_as_current_span("report_task_success.request") as req_span: + with tracer.start_as_current_span("report_task_success.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", @@ -155,8 +157,9 @@ def report_task_success(task_info, **kwargs): **kwargs }, timeout=10) req_span.set_attribute("http.status_code", response.status_code) + req_span.set_attribute("http.response", response.text) response.raise_for_status() - req_span.set_attribute("api.response", response.text) + req_span.set_status(Status(StatusCode.OK)) except requests.RequestException as e: req_span.set_attribute("api.error", str(e)) logger.error("请求失败!", e) @@ -166,7 +169,7 @@ def report_task_success(task_info, **kwargs): def report_task_start(task_info): tracer = get_tracer(__name__) with tracer.start_as_current_span("report_task_start"): - with get_tracer("api").start_as_current_span("report_task_start.request") as req_span: + with tracer.start_as_current_span("report_task_start.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", @@ -175,8 +178,9 @@ def report_task_start(task_info): 'accessKey': os.getenv('ACCESS_KEY'), }, timeout=10) req_span.set_attribute("http.status_code", response.status_code) + req_span.set_attribute("http.response", response.text) response.raise_for_status() - req_span.set_attribute("api.response", response.text) + req_span.set_status(Status(StatusCode.OK)) except requests.RequestException as e: req_span.set_attribute("api.error", str(e)) logger.error("请求失败!", e) @@ -196,21 +200,23 @@ def report_task_failed(task_info, reason=''): 'reason': reason }, timeout=10) req_span.set_attribute("http.status_code", response.status_code) + req_span.set_attribute("http.response", response.text) response.raise_for_status() - req_span.set_attribute("api.response", response.text) + req_span.set_status(Status(StatusCode.OK)) except requests.RequestException as e: req_span.set_attribute("api.error", str(e)) + req_span.set_status(Status(StatusCode.ERROR)) logger.error("请求失败!", e) return None def upload_task_file(task_info, ffmpeg_task): tracer = get_tracer(__name__) - with tracer.start_as_current_span("upload_task_file") as span: + with get_tracer("api").start_as_current_span("upload_task_file") as span: logger.info("开始上传文件: %s", task_info.get("id")) span.set_attribute("file.id", task_info.get("id")) - try: - with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span: + with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span: + try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", '{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) @@ -218,25 +224,16 @@ def upload_task_file(task_info, ffmpeg_task): json={ 'accessKey': os.getenv('ACCESS_KEY'), }, timeout=10) - response.raise_for_status() req_span.set_attribute("http.status_code", response.status_code) - except requests.RequestException as e: - span.set_attribute("api.error", str(e)) - logger.error("请求失败!", e) - return False + req_span.set_attribute("http.response", response.text) + response.raise_for_status() + req_span.set_status(Status(StatusCode.OK)) + except requests.RequestException as e: + span.set_attribute("api.error", str(e)) + req_span.set_status(Status(StatusCode.ERROR)) + logger.error("请求失败!", e) + return False data = response.json() url = data.get('data', "") logger.info("开始上传文件: %s 至 %s", task_info.get("id"), url) - try: - with tracer.start_as_current_span("upload_task_file.start_upload_file") as upload_span: - upload_span.set_attribute("http.method", "PUT") - upload_span.set_attribute("http.url", url) - with open(ffmpeg_task.get_output_file(), 'rb') as f: - requests.put(url, data=f, headers={"Content-Type": "video/mp4"}) - except requests.RequestException as e: - span.set_attribute("api.error", str(e)) - logger.error("上传失败!", e) - return False - finally: - logger.info("上传文件结束: %s", task_info.get("id")) - return True + return oss.upload_to_oss(url, ffmpeg_task.get_output_file()) diff --git a/util/ffmpeg.py b/util/ffmpeg.py index dd3179b..4a3fe59 100644 --- a/util/ffmpeg.py +++ b/util/ffmpeg.py @@ -5,6 +5,8 @@ import subprocess from datetime import datetime from typing import Optional, IO +from opentelemetry.trace import Status, StatusCode + from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT from telemetry import get_tracer @@ -14,6 +16,7 @@ def to_annexb(file): with get_tracer("ffmpeg").start_as_current_span("to_annexb") as span: span.set_attribute("file.path", file) if not os.path.exists(file): + span.set_status(Status(StatusCode.ERROR)) return file logger.info("ToAnnexb: %s", file) ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, @@ -26,16 +29,19 @@ def to_annexb(file): logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) span.set_attribute("ffmpeg.code", ffmpeg_process.returncode) if ffmpeg_process.returncode == 0: + span.set_status(Status(StatusCode.OK)) span.set_attribute("file.size", os.path.getsize(file+".ts")) os.remove(file) return file+".ts" else: + span.set_status(Status(StatusCode.ERROR)) return file def re_encode_and_annexb(file): with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span: span.set_attribute("file.path", file) if not os.path.exists(file): + span.set_status(Status(StatusCode.ERROR)) return file logger.info("ReEncodeAndAnnexb: %s", file) ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, @@ -49,10 +55,12 @@ def re_encode_and_annexb(file): logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) span.set_attribute("ffmpeg.code", ffmpeg_process.returncode) if ffmpeg_process.returncode == 0: + span.set_status(Status(StatusCode.OK)) span.set_attribute("file.size", os.path.getsize(file+".ts")) os.remove(file) return file+".ts" else: + span.set_status(Status(StatusCode.ERROR)) return file def start_render(ffmpeg_task: FfmpegTask): @@ -62,10 +70,12 @@ def start_render(ffmpeg_task: FfmpegTask): logger.info(ffmpeg_task) if not ffmpeg_task.need_run(): ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) + span.set_status(Status(StatusCode.OK)) return True ffmpeg_args = ffmpeg_task.get_ffmpeg_args() if len(ffmpeg_args) == 0: ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) + span.set_status(Status(StatusCode.OK)) return True ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True)) span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args)) @@ -76,6 +86,7 @@ def start_render(ffmpeg_task: FfmpegTask): code = ffmpeg_process.returncode span.set_attribute("ffmpeg.code", code) if code != 0: + span.set_status(Status(StatusCode.ERROR)) logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr) return False span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file) @@ -83,12 +94,15 @@ def start_render(ffmpeg_task: FfmpegTask): file_size = os.path.getsize(ffmpeg_task.output_file) span.set_attribute("file.size", file_size) if file_size < 4096: + span.set_status(Status(StatusCode.ERROR)) logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL") return False except OSError: span.set_attribute("file.size", 0) + span.set_status(Status(StatusCode.ERROR)) logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND") return False + span.set_status(Status(StatusCode.OK)) return True def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: @@ -129,9 +143,14 @@ def probe_video_info(video_file): span.set_attribute("ffprobe.args", json.dumps(result.args)) span.set_attribute("ffprobe.code", result.returncode) if result.returncode != 0: + span.set_status(Status(StatusCode.ERROR)) return 0, 0, 0 all_result = result.stdout.decode('utf-8').strip() span.set_attribute("ffprobe.out", all_result) + if all_result == '': + span.set_status(Status(StatusCode.ERROR)) + return 0, 0, 0 + span.set_status(Status(StatusCode.OK)) wh, duration = all_result.split('\n') width, height = wh.strip().split('x') return int(width), int(height), float(duration) diff --git a/util/oss.py b/util/oss.py index 5a987b1..faa375a 100644 --- a/util/oss.py +++ b/util/oss.py @@ -2,6 +2,7 @@ import logging import os import requests +from opentelemetry.trace import Status, StatusCode from telemetry import get_tracer @@ -19,6 +20,7 @@ def upload_to_oss(url, file_path): with tracer.start_as_current_span("upload_to_oss") as span: span.set_attribute("file.url", url) span.set_attribute("file.path", file_path) + span.set_attribute("file.size", os.path.getsize(file_path)) max_retries = 5 retries = 0 while retries < max_retries: @@ -28,18 +30,24 @@ def upload_to_oss(url, file_path): req_span.set_attribute("http.method", "PUT") req_span.set_attribute("http.url", url) with open(file_path, 'rb') as f: - response = requests.put(url, data=f, timeout=60) # 设置超时时间为1分钟 + response = requests.put(url, data=f, timeout=60, headers={"Content-Type": "video/mp4"}) req_span.set_attribute("http.status_code", response.status_code) - if response.status_code == 200: - return True + req_span.set_attribute("http.response", response.text) + response.raise_for_status() + req_span.set_status(Status(StatusCode.OK)) + span.set_status(Status(StatusCode.OK)) + return True except requests.exceptions.Timeout: req_span.set_attribute("http.error", "Timeout") + req_span.set_status(Status(StatusCode.ERROR)) retries += 1 logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...") except Exception as e: req_span.set_attribute("http.error", str(e)) + req_span.set_status(Status(StatusCode.ERROR)) retries += 1 logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...") + span.set_status(Status(StatusCode.ERROR)) return False @@ -71,14 +79,19 @@ def download_from_oss(url, file_path): req_span.set_attribute("http.status_code", response.status_code) with open(file_path, 'wb') as f: f.write(response.content) - span.set_attribute("file.size", os.path.getsize(file_path)) + req_span.set_attribute("file.size", os.path.getsize(file_path)) + req_span.set_status(Status(StatusCode.OK)) + span.set_status(Status(StatusCode.OK)) return True except requests.exceptions.Timeout: - span.set_attribute("http.error", "Timeout") + req_span.set_attribute("http.error", "Timeout") + req_span.set_status(Status(StatusCode.ERROR)) retries += 1 logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...") except Exception as e: - span.set_attribute("http.error", str(e)) + req_span.set_attribute("http.error", str(e)) + req_span.set_status(Status(StatusCode.ERROR)) retries += 1 logger.warning(f"Download failed. Retrying {retries}/{max_retries}...") + span.set_status(Status(StatusCode.ERROR)) return False