From 1f9632761fee0180bfa388855e5a06980bc67bed Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Mon, 3 Mar 2025 14:15:20 +0800 Subject: [PATCH 1/2] effect --- biz/ffmpeg.py | 4 ++++ entity/ffmpeg.py | 39 ++++++++++++++++++++++++++++++++++++++- util/api.py | 3 +++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/biz/ffmpeg.py b/biz/ffmpeg.py index b58c86a..ba2e130 100644 --- a/biz/ffmpeg.py +++ b/biz/ffmpeg.py @@ -28,6 +28,8 @@ def parse_ffmpeg_task(task_info, template_info): sub_ffmpeg_task = FfmpegTask(source) sub_ffmpeg_task.annexb = True sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25) + for effect in part.get('effects', []): + sub_ffmpeg_task.add_effect(effect) for lut in part.get('filters', []): sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut)) for audio in part.get('audios', []): @@ -43,6 +45,8 @@ def parse_ffmpeg_task(task_info, template_info): if overall.get('source', ''): source = parse_video(overall.get('source'), task_params, template_info) task.add_inputs(source) + for effect in overall.get('effects', []): + task.add_effect(effect) for lut in overall.get('filters', []): task.add_lut(os.path.join(template_info.get("local_path"), lut)) for audio in overall.get('audios', []): diff --git a/entity/ffmpeg.py b/entity/ffmpeg.py index 1328348..7bdf91a 100644 --- a/entity/ffmpeg.py +++ b/entity/ffmpeg.py @@ -1,5 +1,6 @@ import time import uuid +from typing import Any ENCODER_ARGS = ("-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1",) PROFILE_LEVEL_ARGS = ("-profile:v", "high", "-level:v", "4") @@ -7,6 +8,8 @@ PROFILE_LEVEL_ARGS = ("-profile:v", "high", "-level:v", "4") class FfmpegTask(object): + effects: list[str] + def __init__(self, input_file, task_type='copy', output_file=''): self.annexb = False if type(input_file) is str: @@ -29,6 +32,7 @@ class FfmpegTask(object): self.luts = [] self.audios = [] self.overlays = [] + self.effects = [] def __repr__(self): _str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}' @@ -40,6 +44,8 @@ class FfmpegTask(object): _str += f', overlays={self.overlays}' if self.annexb: _str += f', annexb={self.annexb}' + if self.effects: + _str += f', effects={self.effects}' if self.mute: _str += f', mute={self.mute}' return _str + ')' @@ -83,6 +89,10 @@ class FfmpegTask(object): self.luts.extend(luts) self.correct_task_type() + def add_effect(self, *effects): + self.effects.extend(effects) + self.correct_task_type() + def get_output_file(self): if self.task_type == 'copy': return self.input_file[0] @@ -105,6 +115,8 @@ class FfmpegTask(object): return False if len(self.subtitles) > 0: return False + if len(self.effects) > 0: + return False if self.speed != 1: return False if self.zoom_cut is not None: @@ -120,6 +132,8 @@ class FfmpegTask(object): return False if len(self.subtitles) > 0: return False + if len(self.effects) > 0: + return False if self.speed != 1: return False if len(self.audios) > 1: @@ -141,7 +155,7 @@ class FfmpegTask(object): if self.task_type == 'encode': input_args = [] filter_args = [] - output_args = ["-profile", "high", "-level", "4","-shortest", *ENCODER_ARGS] + output_args = [*PROFILE_LEVEL_ARGS,"-shortest", *ENCODER_ARGS] if self.annexb: output_args.append("-bsf:v") output_args.append("h264_mp4toannexb") @@ -162,6 +176,29 @@ class FfmpegTask(object): _f_x = pos_json.get('ltX', 0) _x = f'{float(_f_x/_v_w) :.5f}*iw' filter_args.append(f"[{video_output_str}]crop=x={_x}:y=0:w=ih*ih/iw:h=ih[{video_output_str}]") + for effect in self.effects: + if effect.startswith("cameraShot:"): + param = effect.split(":", 2)[1] + if param == '': + param = "3,1" + _split = param.split(",") + start = 3 + duration = 1 + if len(_split) >= 2: + start = int(_split[0]) + duration = int(_split[1]) + elif len(_split) == 1: + start = int(_split[0]) + _start_out_str = "[eff_s]" + _end_out_str = "[eff_e]" + filter_args.append(f"{video_output_str}fps=fps={self.frame_rate},split{_start_out_str}{_end_out_str}") + filter_args.append(f"{_start_out_str}trim=start={0}:end={start+duration},freezeframes=first={start*self.frame_rate}:replace={start*self.frame_rate}{_start_out_str}") + filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}") + video_output_str = "[v_eff]" + filter_args.append(f"{_start_out_str}{_end_out_str}concat=n=2:v=1:a=0{video_output_str}") + elif effect.startswith("zoom:"): + ... + ... for lut in self.luts: filter_args.append(f"[{video_output_str}]lut3d=file={lut}[{video_output_str}]") for overlay in self.overlays: diff --git a/util/api.py b/util/api.py index f264490..8f2550b 100644 --- a/util/api.py +++ b/util/api.py @@ -105,6 +105,9 @@ def get_template_info(template_id): _only_if = template_info.get('onlyIf', '') if _only_if: _template['only_if'] = _only_if + _effects = template_info.get('effects', '') + if _effects: + _template['effects'] = _effects.split("|") return _template # outer template definition From 9d178a3d343b2819d572eda88b66117a9fc68a95 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Tue, 4 Mar 2025 11:42:31 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=9F=8B=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- biz/ffmpeg.py | 102 +++++++------ biz/task.py | 41 +++--- index.py | 2 + telemetry/__init__.py | 30 ++++ util/api.py | 331 +++++++++++++++++++++++++----------------- util/ffmpeg.py | 146 +++++++++++-------- util/oss.py | 86 ++++++----- 7 files changed, 443 insertions(+), 295 deletions(-) create mode 100644 telemetry/__init__.py diff --git a/biz/ffmpeg.py b/biz/ffmpeg.py index ba2e130..615f427 100644 --- a/biz/ffmpeg.py +++ b/biz/ffmpeg.py @@ -6,54 +6,57 @@ from entity.ffmpeg import FfmpegTask import logging from util import ffmpeg, oss +from telemetry import get_tracer logger = logging.getLogger('biz/ffmpeg') def parse_ffmpeg_task(task_info, template_info): - tasks = [] - # 中间片段 - task_params_str = task_info.get("taskParams", "{}") - task_params = json.loads(task_params_str) - for part in template_info.get("video_parts"): - source = parse_video(part.get('source'), task_params, template_info) - if not source: - logger.warning("no video found for part: " + str(part)) - continue - only_if = part.get('only_if', '') - if only_if: - if not check_placeholder_exist(only_if, task_params): - logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part) + tracer = get_tracer(__name__) + with tracer.start_as_current_span("parse_ffmpeg_task"): + tasks = [] + # 中间片段 + task_params_str = task_info.get("taskParams", "{}") + task_params = json.loads(task_params_str) + for part in template_info.get("video_parts"): + source = parse_video(part.get('source'), task_params, template_info) + if not source: + logger.warning("no video found for part: " + str(part)) continue - sub_ffmpeg_task = FfmpegTask(source) - sub_ffmpeg_task.annexb = True - sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25) - for effect in part.get('effects', []): - sub_ffmpeg_task.add_effect(effect) - for lut in part.get('filters', []): - sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut)) - for audio in part.get('audios', []): - sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio)) - for overlay in part.get('overlays', []): - sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) - tasks.append(sub_ffmpeg_task) - output_file = "out_" + str(time.time()) + ".mp4" - task = FfmpegTask(tasks, output_file=output_file) - overall = template_info.get("overall_template") - task.center_cut = template_info.get("crop_mode", None) - task.frame_rate = template_info.get("frame_rate", 25) - if overall.get('source', ''): - source = parse_video(overall.get('source'), task_params, template_info) - task.add_inputs(source) - for effect in overall.get('effects', []): - task.add_effect(effect) - for lut in overall.get('filters', []): - task.add_lut(os.path.join(template_info.get("local_path"), lut)) - for audio in overall.get('audios', []): - task.add_audios(os.path.join(template_info.get("local_path"), audio)) - for overlay in overall.get('overlays', []): - task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) - return task + only_if = part.get('only_if', '') + if only_if: + if not check_placeholder_exist(only_if, task_params): + logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part) + continue + sub_ffmpeg_task = FfmpegTask(source) + sub_ffmpeg_task.annexb = True + sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25) + for effect in part.get('effects', []): + sub_ffmpeg_task.add_effect(effect) + for lut in part.get('filters', []): + sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut)) + for audio in part.get('audios', []): + sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio)) + for overlay in part.get('overlays', []): + sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) + tasks.append(sub_ffmpeg_task) + output_file = "out_" + str(time.time()) + ".mp4" + task = FfmpegTask(tasks, output_file=output_file) + overall = template_info.get("overall_template") + task.center_cut = template_info.get("crop_mode", None) + task.frame_rate = template_info.get("frame_rate", 25) + if overall.get('source', ''): + source = parse_video(overall.get('source'), task_params, template_info) + task.add_inputs(source) + for effect in overall.get('effects', []): + task.add_effect(effect) + for lut in overall.get('filters', []): + task.add_lut(os.path.join(template_info.get("local_path"), lut)) + for audio in overall.get('audios', []): + task.add_audios(os.path.join(template_info.get("local_path"), audio)) + for overlay in overall.get('overlays', []): + task.add_overlay(os.path.join(template_info.get("local_path"), overlay)) + return task def parse_video(source, task_params, template_info): @@ -87,13 +90,16 @@ def check_placeholder_exist(placeholder_id, task_params): return True return False + def start_ffmpeg_task(ffmpeg_task): - for task in ffmpeg_task.analyze_input_render_tasks(): - result = start_ffmpeg_task(task) - if not result: - return False - ffmpeg_task.correct_task_type() - return ffmpeg.start_render(ffmpeg_task) + tracer = get_tracer(__name__) + with tracer.start_as_current_span("start_ffmpeg_task"): + for task in ffmpeg_task.analyze_input_render_tasks(): + result = start_ffmpeg_task(task) + if not result: + return False + ffmpeg_task.correct_task_type() + return ffmpeg.start_render(ffmpeg_task) def clear_task_tmp_file(ffmpeg_task): diff --git a/biz/task.py b/biz/task.py index 2fd879a..834197f 100644 --- a/biz/task.py +++ b/biz/task.py @@ -1,24 +1,27 @@ +from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info +from telemetry import get_tracer from template import get_template_def from util import api def start_task(task_info): - from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info - task_info = api.normalize_task(task_info) - template_info = get_template_def(task_info.get("templateId")) - api.report_task_start(task_info) - ffmpeg_task = parse_ffmpeg_task(task_info, template_info) - result = start_ffmpeg_task(ffmpeg_task) - if not result: - return api.report_task_failed(task_info) - oss_result = api.upload_task_file(task_info, ffmpeg_task) - if not oss_result: - return api.report_task_failed(task_info) - # 获取视频长度宽度和时长 - width, height, duration = probe_video_info(ffmpeg_task) - clear_task_tmp_file(ffmpeg_task) - api.report_task_success(task_info, videoInfo={ - "width": width, - "height": height, - "duration": duration - }) \ No newline at end of file + tracer = get_tracer(__name__) + with tracer.start_as_current_span("start_task"): + task_info = api.normalize_task(task_info) + template_info = get_template_def(task_info.get("templateId")) + api.report_task_start(task_info) + ffmpeg_task = parse_ffmpeg_task(task_info, template_info) + result = start_ffmpeg_task(ffmpeg_task) + if not result: + return api.report_task_failed(task_info) + oss_result = api.upload_task_file(task_info, ffmpeg_task) + if not oss_result: + return api.report_task_failed(task_info) + # 获取视频长度宽度和时长 + width, height, duration = probe_video_info(ffmpeg_task) + clear_task_tmp_file(ffmpeg_task) + api.report_task_success(task_info, videoInfo={ + "width": width, + "height": height, + "duration": duration + }) diff --git a/index.py b/index.py index 84e4958..e7117b3 100644 --- a/index.py +++ b/index.py @@ -2,6 +2,7 @@ from time import sleep import biz.task import config +from telemetry import init_opentelemetry from template import load_local_template from util import api @@ -12,6 +13,7 @@ load_local_template() import logging LOGGER = logging.getLogger(__name__) +init_opentelemetry() while True: # print(get_sys_info()) diff --git a/telemetry/__init__.py b/telemetry/__init__.py new file mode 100644 index 0000000..d8666db --- /dev/null +++ b/telemetry/__init__.py @@ -0,0 +1,30 @@ +import os + +from constant import SOFTWARE_VERSION +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter +from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + + +def get_tracer(name): + return trace.get_tracer(name) + +# 初始化 OpenTelemetry +def init_opentelemetry(): + # 设置服务名、主机名 + resource = Resource(attributes={ + SERVICE_NAME: "RENDER_WORKER", + SERVICE_VERSION: SOFTWARE_VERSION, + DEPLOYMENT_ENVIRONMENT: "Python", + HOST_NAME: os.getenv("ACCESS_KEY"), + }) + + # 使用HTTP协议上报 + span_processor = BatchSpanProcessor(OTLPSpanHttpExporter( + endpoint="http://tracing-analysis-dc-sh.aliyuncs.com/adapt_e7qojqi4e0@aa79b4d367fb6b7_e7qojqi4e0@53df7ad2afe8301/api/otlp/traces", + )) + + trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor) + trace.set_tracer_provider(trace_provider) diff --git a/util/api.py b/util/api.py index 8f2550b..dff5846 100644 --- a/util/api.py +++ b/util/api.py @@ -1,9 +1,11 @@ +import json import logging import os import requests import util.system +from telemetry import get_tracer session = requests.Session() logger = logging.getLogger(__name__) @@ -19,33 +21,41 @@ def sync_center(): 通过接口获取任务 :return: 任务列表 """ - try: - from template import TEMPLATES, download_template - - response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ - 'accessKey': os.getenv('ACCESS_KEY'), - 'clientStatus': util.system.get_sys_info(), - 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in TEMPLATES.values()] - }, timeout=10) - response.raise_for_status() - except requests.RequestException as e: - logger.error("请求失败!", e) - return [] - data = response.json() - logger.debug("获取任务结果:【%s】", data) - if data.get('code', 0) == 200: - templates = data.get('data', {}).get('templates', []) - tasks = data.get('data', {}).get('tasks', []) - else: - tasks = [] - templates = [] - logger.warning("获取任务失败") - for template in templates: - template_id = template.get('id', '') - if template_id: - logger.info("更新模板:【%s】", template_id) - download_template(template_id) - return tasks + from template import TEMPLATES, download_template + tracer = get_tracer(__name__) + with tracer.start_as_current_span("sync_center"): + with get_tracer("api").start_as_current_span("sync_center.request") as req_span: + try: + req_span.set_attribute("http.method", "POST") + req_span.set_attribute("http.url", os.getenv('API_ENDPOINT') + "/sync") + response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ + 'accessKey': os.getenv('ACCESS_KEY'), + 'clientStatus': util.system.get_sys_info(), + 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in + TEMPLATES.values()] + }, timeout=10) + req_span.set_attribute("http.status_code", response.status_code) + response.raise_for_status() + req_span.set_attribute("api.response", response.text) + except requests.RequestException as e: + req_span.set_attribute("api.error", str(e)) + logger.error("请求失败!", e) + return [] + data = response.json() + logger.debug("获取任务结果:【%s】", data) + if data.get('code', 0) == 200: + templates = data.get('data', {}).get('templates', []) + tasks = data.get('data', {}).get('tasks', []) + else: + tasks = [] + templates = [] + logger.warning("获取任务失败") + for template in templates: + template_id = template.get('id', '') + if template_id: + logger.info("更新模板:【%s】", template_id) + download_template(template_id) + return tasks def get_template_info(template_id): @@ -56,126 +66,177 @@ def get_template_info(template_id): :type template_id: str :return: 模板信息 """ - try: - response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - }, timeout=10) - response.raise_for_status() - except requests.RequestException as e: - logger.error("请求失败!", e) - return None - data = response.json() - remote_template_info = data.get('data', {}) - logger.debug("获取模板信息结果:【%s】", remote_template_info) - template = { - 'id': template_id, - 'updateTime': remote_template_info.get('updateTime', template_id), - 'scenic_name': remote_template_info.get('scenicName', '景区'), - 'name': remote_template_info.get('name', '模版'), - 'video_size': '1920x1080', - 'frame_rate': 25, - 'overall_duration': 30, - 'video_parts': [ + tracer = get_tracer(__name__) + with tracer.start_as_current_span("get_template_info"): + with get_tracer("api").start_as_current_span("get_template_info.request") as req_span: + try: + req_span.set_attribute("http.method", "POST") + req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id)) + response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={ + 'accessKey': os.getenv('ACCESS_KEY'), + }, timeout=10) + req_span.set_attribute("http.status_code", response.status_code) + response.raise_for_status() + req_span.set_attribute("api.response", response.text) + except requests.RequestException as e: + req_span.set_attribute("api.error", str(e)) + logger.error("请求失败!", e) + return None + data = response.json() + logger.debug("获取模板信息结果:【%s】", data) + remote_template_info = data.get('data', {}) + template = { + 'id': template_id, + 'updateTime': remote_template_info.get('updateTime', template_id), + 'scenic_name': remote_template_info.get('scenicName', '景区'), + 'name': remote_template_info.get('name', '模版'), + 'video_size': '1920x1080', + 'frame_rate': 25, + 'overall_duration': 30, + 'video_parts': [ - ] - } + ] + } - def _template_normalizer(template_info): - _template = {} - _placeholder_type = template_info.get('isPlaceholder', -1) - if _placeholder_type == 0: - # 固定视频 - _template['source'] = template_info.get('sourceUrl', '') - elif _placeholder_type == 1: - # 占位符 - _template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '') - _template['mute'] = template_info.get('mute', True) - _template['crop_mode'] = template_info.get('cropEnable', None) - else: - _template['source'] = None - _overlays = template_info.get('overlays', '') - if _overlays: - _template['overlays'] = _overlays.split(",") - _audios = template_info.get('audios', '') - if _audios: - _template['audios'] = _audios.split(",") - _luts = template_info.get('luts', '') - if _luts: - _template['luts'] = _luts.split(",") - _only_if = template_info.get('onlyIf', '') - if _only_if: - _template['only_if'] = _only_if - _effects = template_info.get('effects', '') - if _effects: - _template['effects'] = _effects.split("|") - return _template + def _template_normalizer(template_info): + _template = {} + _placeholder_type = template_info.get('isPlaceholder', -1) + if _placeholder_type == 0: + # 固定视频 + _template['source'] = template_info.get('sourceUrl', '') + elif _placeholder_type == 1: + # 占位符 + _template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '') + _template['mute'] = template_info.get('mute', True) + _template['crop_mode'] = template_info.get('cropEnable', None) + else: + _template['source'] = None + _overlays = template_info.get('overlays', '') + if _overlays: + _template['overlays'] = _overlays.split(",") + _audios = template_info.get('audios', '') + if _audios: + _template['audios'] = _audios.split(",") + _luts = template_info.get('luts', '') + if _luts: + _template['luts'] = _luts.split(",") + _only_if = template_info.get('onlyIf', '') + if _only_if: + _template['only_if'] = _only_if + _effects = template_info.get('effects', '') + if _effects: + _template['effects'] = _effects.split("|") + return _template - # outer template definition - overall_template = _template_normalizer(remote_template_info) - template['overall_template'] = overall_template - # inter template definition - inter_template_list = remote_template_info.get('children', []) - for children_template in inter_template_list: - parts = _template_normalizer(children_template) - template['video_parts'].append(parts) - template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id)) - return template + # outer template definition + overall_template = _template_normalizer(remote_template_info) + template['overall_template'] = overall_template + # inter template definition + inter_template_list = remote_template_info.get('children', []) + for children_template in inter_template_list: + parts = _template_normalizer(children_template) + template['video_parts'].append(parts) + template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id)) + with get_tracer("api").start_as_current_span("get_template_info.template") as res_span: + res_span.set_attribute("normalized.response", json.dumps(template)) + return template def report_task_success(task_info, **kwargs): - try: - response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - **kwargs - }, timeout=10) - response.raise_for_status() - except requests.RequestException as e: - logger.error("请求失败!", e) - return None + tracer = get_tracer(__name__) + with tracer.start_as_current_span("report_task_success"): + with get_tracer("api").start_as_current_span("report_task_success.request") as req_span: + try: + req_span.set_attribute("http.method", "POST") + req_span.set_attribute("http.url", + '{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) + response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ + 'accessKey': os.getenv('ACCESS_KEY'), + **kwargs + }, timeout=10) + req_span.set_attribute("http.status_code", response.status_code) + response.raise_for_status() + req_span.set_attribute("api.response", response.text) + except requests.RequestException as e: + req_span.set_attribute("api.error", str(e)) + logger.error("请求失败!", e) + return None def report_task_start(task_info): - try: - response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - }, timeout=10) - response.raise_for_status() - except requests.RequestException as e: - logger.error("请求失败!", e) - return None + tracer = get_tracer(__name__) + with tracer.start_as_current_span("report_task_start"): + with get_tracer("api").start_as_current_span("report_task_start.request") as req_span: + try: + req_span.set_attribute("http.method", "POST") + req_span.set_attribute("http.url", + '{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) + response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ + 'accessKey': os.getenv('ACCESS_KEY'), + }, timeout=10) + req_span.set_attribute("http.status_code", response.status_code) + response.raise_for_status() + req_span.set_attribute("api.response", response.text) + except requests.RequestException as e: + req_span.set_attribute("api.error", str(e)) + logger.error("请求失败!", e) + return None def report_task_failed(task_info, reason=''): - try: - response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - 'reason': reason - }, timeout=10) - response.raise_for_status() - except requests.RequestException as e: - logger.error("请求失败!", e) - return None + tracer = get_tracer(__name__) + with tracer.start_as_current_span("report_task_failed"): + with tracer.start_as_current_span("report_task_failed.request") as req_span: + try: + req_span.set_attribute("http.method", "POST") + req_span.set_attribute("http.url", + '{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) + response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ + 'accessKey': os.getenv('ACCESS_KEY'), + 'reason': reason + }, timeout=10) + req_span.set_attribute("http.status_code", response.status_code) + response.raise_for_status() + req_span.set_attribute("api.response", response.text) + except requests.RequestException as e: + req_span.set_attribute("api.error", str(e)) + logger.error("请求失败!", e) + return None def upload_task_file(task_info, ffmpeg_task): - logger.info("开始上传文件: %s", task_info.get("id")) - try: - response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ - 'accessKey': os.getenv('ACCESS_KEY'), - }, timeout=10) - response.raise_for_status() - except requests.RequestException as e: - logger.error("请求失败!", e) - return False - data = response.json() - url = data.get('data', "") - logger.info("开始上传文件: %s 至 %s", task_info.get("id"), url) - try: - with open(ffmpeg_task.get_output_file(), 'rb') as f: - requests.put(url, data=f, headers={"Content-Type": "video/mp4"}) - except requests.RequestException as e: - logger.error("上传失败!", e) - return False - finally: - logger.info("上传文件结束: %s", task_info.get("id")) - return True + tracer = get_tracer(__name__) + with tracer.start_as_current_span("upload_task_file") as span: + logger.info("开始上传文件: %s", task_info.get("id")) + span.set_attribute("file.id", task_info.get("id")) + try: + with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span: + req_span.set_attribute("http.method", "POST") + req_span.set_attribute("http.url", + '{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) + response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), + json={ + 'accessKey': os.getenv('ACCESS_KEY'), + }, timeout=10) + response.raise_for_status() + req_span.set_attribute("http.status_code", response.status_code) + except requests.RequestException as e: + span.set_attribute("api.error", str(e)) + logger.error("请求失败!", e) + return False + data = response.json() + url = data.get('data', "") + logger.info("开始上传文件: %s 至 %s", task_info.get("id"), url) + try: + with tracer.start_as_current_span("upload_task_file.start_upload_file") as upload_span: + upload_span.set_attribute("http.method", "PUT") + upload_span.set_attribute("http.url", url) + with open(ffmpeg_task.get_output_file(), 'rb') as f: + requests.put(url, data=f, headers={"Content-Type": "video/mp4"}) + except requests.RequestException as e: + span.set_attribute("api.error", str(e)) + logger.error("上传失败!", e) + return False + finally: + logger.info("上传文件结束: %s", task_info.get("id")) + return True diff --git a/util/ffmpeg.py b/util/ffmpeg.py index f572f78..da56e67 100644 --- a/util/ffmpeg.py +++ b/util/ffmpeg.py @@ -1,3 +1,4 @@ +import json import logging import os import subprocess @@ -5,60 +6,81 @@ from datetime import datetime from typing import Optional, IO from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, PROFILE_LEVEL_ARGS +from telemetry import get_tracer logger = logging.getLogger(__name__) def to_annexb(file): - if not os.path.exists(file): - return file - logger.info("ToAnnexb: %s", file) - ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb", - "-f", "mpegts", file+".ts"]) - logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) - if ffmpeg_process.returncode == 0: - os.remove(file) - return file+".ts" - else: - return file + with get_tracer("ffmpeg").start_as_current_span("to_annexb") as span: + span.set_attribute("file.path", file) + if not os.path.exists(file): + return file + logger.info("ToAnnexb: %s", file) + ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb", + "-f", "mpegts", file+".ts"]) + span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args)) + logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) + span.set_attribute("ffmpeg.code", ffmpeg_process.returncode) + if ffmpeg_process.returncode == 0: + span.set_attribute("file.size", os.path.getsize(file+".ts")) + os.remove(file) + return file+".ts" + else: + return file def re_encode_and_annexb(file): - if not os.path.exists(file): - return file - logger.info("ReEncodeAndAnnexb: %s", file) - ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, *PROFILE_LEVEL_ARGS, *ENCODER_ARGS, "-bsf:v", "h264_mp4toannexb", - "-f", "mpegts", file +".ts"]) - logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) - if ffmpeg_process.returncode == 0: - os.remove(file) - return file+".ts" - else: - return file + with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span: + span.set_attribute("file.path", file) + if not os.path.exists(file): + return file + logger.info("ReEncodeAndAnnexb: %s", file) + ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, *PROFILE_LEVEL_ARGS, *ENCODER_ARGS, "-bsf:v", "h264_mp4toannexb", + "-f", "mpegts", file +".ts"]) + span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args)) + logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) + span.set_attribute("ffmpeg.code", ffmpeg_process.returncode) + if ffmpeg_process.returncode == 0: + span.set_attribute("file.size", os.path.getsize(file+".ts")) + os.remove(file) + return file+".ts" + else: + return file def start_render(ffmpeg_task: FfmpegTask): - logger.info(ffmpeg_task) - if not ffmpeg_task.need_run(): - ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) - return True - ffmpeg_args = ffmpeg_task.get_ffmpeg_args() - logger.info(ffmpeg_args) - if len(ffmpeg_args) == 0: - ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) - return True - ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True)) - logger.info("FINISH TASK, OUTPUT IS %s", handle_ffmpeg_output(ffmpeg_process.stdout)) - code = ffmpeg_process.returncode - if code != 0: - logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr) - return False - try: - out_file_stat = os.stat(ffmpeg_task.output_file) - if out_file_stat.st_size < 4096: - logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL") + tracer = get_tracer(__name__) + with tracer.start_as_current_span("start_render") as span: + span.set_attribute("ffmpeg.task", str(ffmpeg_task)) + logger.info(ffmpeg_task) + if not ffmpeg_task.need_run(): + ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) + return True + ffmpeg_args = ffmpeg_task.get_ffmpeg_args() + logger.info(ffmpeg_args) + if len(ffmpeg_args) == 0: + ffmpeg_task.set_output_file(ffmpeg_task.input_file[0]) + return True + ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True)) + span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args)) + ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout) + span.set_attribute("ffmpeg.out", ffmpeg_final_out) + logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out) + code = ffmpeg_process.returncode + span.set_attribute("ffmpeg.code", code) + if code != 0: + logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr) return False - except OSError: - logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND") - return False - return True + span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file) + try: + file_size = os.path.getsize(ffmpeg_task.output_file) + span.set_attribute("file.size", file_size) + if file_size < 4096: + logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL") + return False + except OSError: + span.set_attribute("file.size", 0) + logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND") + return False + return True def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: out_time = "0:0:0.0" @@ -79,27 +101,31 @@ def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: print("[ ]Speed:", out_time, "@", speed) return out_time+"@"+speed - def duration_str_to_float(duration_str: str) -> float: _duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1) return _duration.total_seconds() def probe_video_info(video_file): - # 获取宽度和高度 - result = subprocess.run( - ["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of', - 'csv=s=x:p=0', video_file], - stderr=subprocess.STDOUT, - **subprocess_args(True) - ) - if result.returncode != 0: - return 0, 0, 0 - all_result = result.stdout.decode('utf-8').strip() - wh, duration = all_result.split('\n') - width, height = wh.strip().split('x') - - return int(width), int(height), float(duration) + tracer = get_tracer(__name__) + with tracer.start_as_current_span("probe_video_info") as span: + span.set_attribute("video.file", video_file) + # 获取宽度和高度 + result = subprocess.run( + ["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of', + 'csv=s=x:p=0', video_file], + stderr=subprocess.STDOUT, + **subprocess_args(True) + ) + span.set_attribute("ffprobe.args", json.dumps(result.args)) + span.set_attribute("ffprobe.code", result.returncode) + if result.returncode != 0: + return 0, 0, 0 + all_result = result.stdout.decode('utf-8').strip() + span.set_attribute("ffprobe.out", all_result) + wh, duration = all_result.split('\n') + width, height = wh.strip().split('x') + return int(width), int(height), float(duration) # Create a set of arguments which make a ``subprocess.Popen`` (and diff --git a/util/oss.py b/util/oss.py index e41f6d1..f267489 100644 --- a/util/oss.py +++ b/util/oss.py @@ -3,6 +3,8 @@ import os import requests +from telemetry import get_tracer + logger = logging.getLogger(__name__) @@ -13,20 +15,29 @@ def upload_to_oss(url, file_path): :param str file_path: 文件路径 :return bool: 是否成功 """ - max_retries = 5 - retries = 0 - while retries < max_retries: - try: - with open(file_path, 'rb') as f: - response = requests.put(url, data=f, timeout=60) # 设置超时时间为1分钟 - if response.status_code == 200: - return True - except requests.exceptions.Timeout: - retries += 1 - logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...") - except Exception as e: - logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...") - retries += 1 + tracer = get_tracer(__name__) + with tracer.start_as_current_span("upload_to_oss") as span: + span.set_attribute("file.url", url) + span.set_attribute("file.path", file_path) + max_retries = 5 + retries = 0 + while retries < max_retries: + try: + with tracer.start_as_current_span("upload_to_oss.request") as req_span: + req_span.set_attribute("http.method", "PUT") + req_span.set_attribute("http.url", url) + with open(file_path, 'rb') as f: + response = requests.put(url, data=f, timeout=60) # 设置超时时间为1分钟 + req_span.set_attribute("http.status_code", response.status_code) + if response.status_code == 200: + return True + except requests.exceptions.Timeout: + retries += 1 + logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...") + except Exception as e: + retries += 1 + span.set_attribute("oss.error", str(e)) + logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...") return False @@ -37,23 +48,32 @@ def download_from_oss(url, file_path): :param Union[LiteralString, str, bytes] file_path: 文件路径 :return bool: 是否成功 """ - logging.info("download_from_oss: %s", url) - file_dir, file_name = os.path.split(file_path) - if file_dir: - if not os.path.exists(file_dir): - os.makedirs(file_dir) - max_retries = 5 - retries = 0 - while retries < max_retries: - try: - response = requests.get(url, timeout=15) # 设置超时时间 - with open(file_path, 'wb') as f: - f.write(response.content) - return True - except requests.exceptions.Timeout: - retries += 1 - logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...") - except Exception as e: - logger.warning(f"Download failed. Retrying {retries}/{max_retries}...") - retries += 1 + tracer = get_tracer(__name__) + with tracer.start_as_current_span("download_from_oss") as span: + span.set_attribute("file.url", url) + span.set_attribute("file.path", file_path) + logging.info("download_from_oss: %s", url) + file_dir, file_name = os.path.split(file_path) + if file_dir: + if not os.path.exists(file_dir): + os.makedirs(file_dir) + max_retries = 5 + retries = 0 + while retries < max_retries: + try: + with tracer.start_as_current_span("download_from_oss.request") as req_span: + req_span.set_attribute("http.method", "GET") + req_span.set_attribute("http.url", url) + response = requests.get(url, timeout=15) # 设置超时时间 + req_span.set_attribute("http.status_code", response.status_code) + with open(file_path, 'wb') as f: + f.write(response.content) + return True + except requests.exceptions.Timeout: + retries += 1 + logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...") + except Exception as e: + retries += 1 + span.set_attribute("oss.error", str(e)) + logger.warning(f"Download failed. Retrying {retries}/{max_retries}...") return False