import json import logging import os import requests from opentelemetry.trace import Status, StatusCode import util.system from telemetry import get_tracer from util import oss session = requests.Session() logger = logging.getLogger(__name__) def normalize_task(task_info): ... return task_info def sync_center(): """ 通过接口获取任务 :return: 任务列表 """ from template import TEMPLATES, download_template try: response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ 'accessKey': os.getenv('ACCESS_KEY'), 'clientStatus': util.system.get_sys_info(), 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in TEMPLATES.values()] }, timeout=10) response.raise_for_status() except requests.RequestException as e: logger.error("请求失败!", e) return [] data = response.json() logger.debug("获取任务结果:【%s】", data) if data.get('code', 0) == 200: templates = data.get('data', {}).get('templates', []) tasks = data.get('data', {}).get('tasks', []) else: tasks = [] templates = [] logger.warning("获取任务失败") for template in templates: template_id = template.get('id', '') if template_id: logger.info("更新模板:【%s】", template_id) download_template(template_id) return tasks def get_template_info(template_id): """ 通过接口获取模板信息 :rtype: Template :param template_id: 模板id :type template_id: str :return: 模板信息 """ tracer = get_tracer(__name__) with tracer.start_as_current_span("get_template_info"): with tracer.start_as_current_span("get_template_info.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id)) response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={ 'accessKey': os.getenv('ACCESS_KEY'), }, timeout=10) req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() except requests.RequestException as e: req_span.set_attribute("api.error", str(e)) logger.error("请求失败!", e) return None data = response.json() logger.debug("获取模板信息结果:【%s】", data) remote_template_info = data.get('data', {}) template = { 'id': template_id, 'updateTime': remote_template_info.get('updateTime', template_id), 'scenic_name': remote_template_info.get('scenicName', '景区'), 'name': remote_template_info.get('name', '模版'), 'video_size': remote_template_info.get('resolution', '1920x1080'), 'frame_rate': 25, 'overall_duration': 30, 'video_parts': [ ] } def _template_normalizer(template_info): _template = {} _placeholder_type = template_info.get('isPlaceholder', -1) if _placeholder_type == 0: # 固定视频 _template['source'] = template_info.get('sourceUrl', '') elif _placeholder_type == 1: # 占位符 _template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '') _template['mute'] = template_info.get('mute', True) _template['crop_mode'] = template_info.get('cropEnable', None) else: _template['source'] = None _overlays = template_info.get('overlays', '') if _overlays: _template['overlays'] = _overlays.split(",") _audios = template_info.get('audios', '') if _audios: _template['audios'] = _audios.split(",") _luts = template_info.get('luts', '') if _luts: _template['luts'] = _luts.split(",") _only_if = template_info.get('onlyIf', '') if _only_if: _template['only_if'] = _only_if _effects = template_info.get('effects', '') if _effects: _template['effects'] = _effects.split("|") return _template # outer template definition overall_template = _template_normalizer(remote_template_info) template['overall_template'] = overall_template # inter template definition inter_template_list = remote_template_info.get('children', []) for children_template in inter_template_list: parts = _template_normalizer(children_template) template['video_parts'].append(parts) template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id)) with get_tracer("api").start_as_current_span("get_template_info.template") as res_span: res_span.set_attribute("normalized.response", json.dumps(template)) return template def report_task_success(task_info, **kwargs): tracer = get_tracer(__name__) with tracer.start_as_current_span("report_task_success"): with tracer.start_as_current_span("report_task_success.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", '{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ 'accessKey': os.getenv('ACCESS_KEY'), **kwargs }, timeout=10) req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() req_span.set_status(Status(StatusCode.OK)) except requests.RequestException as e: req_span.set_attribute("api.error", str(e)) logger.error("请求失败!", e) return None def report_task_start(task_info): tracer = get_tracer(__name__) with tracer.start_as_current_span("report_task_start"): with tracer.start_as_current_span("report_task_start.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", '{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ 'accessKey': os.getenv('ACCESS_KEY'), }, timeout=10) req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() req_span.set_status(Status(StatusCode.OK)) except requests.RequestException as e: req_span.set_attribute("api.error", str(e)) logger.error("请求失败!", e) return None def report_task_failed(task_info, reason=''): tracer = get_tracer(__name__) with tracer.start_as_current_span("report_task_failed") as span: span.set_attribute("task_id", task_info.get("id")) span.set_attribute("reason", reason) with tracer.start_as_current_span("report_task_failed.request") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", '{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ 'accessKey': os.getenv('ACCESS_KEY'), 'reason': reason }, timeout=10) req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() req_span.set_status(Status(StatusCode.OK)) except requests.RequestException as e: req_span.set_attribute("api.error", str(e)) req_span.set_status(Status(StatusCode.ERROR)) logger.error("请求失败!", e) return None def upload_task_file(task_info, ffmpeg_task): tracer = get_tracer(__name__) with get_tracer("api").start_as_current_span("upload_task_file") as span: logger.info("开始上传文件: %s", task_info.get("id")) span.set_attribute("file.id", task_info.get("id")) with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span: try: req_span.set_attribute("http.method", "POST") req_span.set_attribute("http.url", '{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))) response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={ 'accessKey': os.getenv('ACCESS_KEY'), }, timeout=10) req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.response", response.text) response.raise_for_status() req_span.set_status(Status(StatusCode.OK)) except requests.RequestException as e: span.set_attribute("api.error", str(e)) req_span.set_status(Status(StatusCode.ERROR)) logger.error("请求失败!", e) return False data = response.json() url = data.get('data', "") logger.info("开始上传文件: %s 至 %s", task_info.get("id"), url) return oss.upload_to_oss(url, ffmpeg_task.get_output_file()) def get_task_info(id): try: response = session.get(os.getenv('API_ENDPOINT') + "/" + id + "/info", params={ 'accessKey': os.getenv('ACCESS_KEY'), }, timeout=10) response.raise_for_status() except requests.RequestException as e: logger.error("请求失败!", e) return [] data = response.json() logger.debug("获取任务结果:【%s】", data) if data.get('code', 0) == 200: return data.get('data', {})