233 lines
11 KiB
Python
233 lines
11 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
|
|
import requests
|
|
|
|
import util.system
|
|
from telemetry import get_tracer
|
|
from util import oss
|
|
|
|
session = requests.Session()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def normalize_task(task_info):
    """
    Return the task description in normalized form.

    No normalization step is implemented yet: the argument is handed back
    unchanged (the same object, not a copy).

    :param task_info: task description supplied by the caller
    :return: the very same ``task_info`` object
    """
    normalized = task_info
    return normalized
|
|
|
|
|
|
def sync_center():
    """
    Fetch the task list from the API and refresh the templates it reports.

    POSTs the access key, the client status and the ``id``/``updateTime`` of
    every locally known template to ``{API_ENDPOINT}/sync``. For each template
    entry in a successful response (``code == 200``) that carries an id,
    ``download_template`` is called.

    :return: task list from the response; an empty list when the request
        fails, the body is not JSON, or the response code is not 200
    """
    # Imported at call time as in the original code (presumably to avoid a
    # circular import with the template module - confirm before hoisting).
    from template import TEMPLATES, download_template

    # str.format never raises on an unset API_ENDPOINT; the resulting invalid
    # URL is rejected by requests as a RequestException and handled below,
    # matching how the other API helpers in this module build their URLs.
    url = '{0}/sync'.format(os.getenv('API_ENDPOINT'))
    tracer = get_tracer(__name__)
    with tracer.start_as_current_span("sync_center"):
        with get_tracer("api").start_as_current_span("sync_center.request") as req_span:
            try:
                req_span.set_attribute("http.method", "POST")
                req_span.set_attribute("http.url", url)
                response = session.post(url, json={
                    'accessKey': os.getenv('ACCESS_KEY'),
                    'clientStatus': util.system.get_sys_info(),
                    'templateList': [
                        {'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')}
                        for t in TEMPLATES.values()
                    ],
                }, timeout=10)
                req_span.set_attribute("http.status_code", response.status_code)
                req_span.set_attribute("http.response", response.text)
                response.raise_for_status()
            except requests.RequestException as e:
                req_span.set_attribute("http.error", str(e))
                # The message needs a placeholder, otherwise logging fails to
                # format the record and the exception text is lost.
                logger.error("请求失败!%s", e)
                return []

        try:
            data = response.json()
        except ValueError as e:
            # A 2xx response with a non-JSON body must not crash the caller.
            logger.error("响应解析失败!%s", e)
            return []
        logger.debug("获取任务结果:【%s】", data)

        if isinstance(data, dict) and data.get('code', 0) == 200:
            # "or" also covers keys that are present but null.
            payload = data.get('data') or {}
            templates = payload.get('templates') or []
            tasks = payload.get('tasks') or []
        else:
            tasks = []
            templates = []
            logger.warning("获取任务失败")

        for template in templates:
            template_id = template.get('id', '')
            if template_id:
                logger.info("更新模板:【%s】", template_id)
                download_template(template_id)
        return tasks
|
|
|
|
|
|
def get_template_info(template_id):
    """
    Fetch one template definition from the API and convert it to the local layout.

    POSTs the access key to ``{API_ENDPOINT}/template/{template_id}`` and maps
    the ``data`` object of the response (and its ``children``) to a dict with
    the keys ``id``, ``updateTime``, ``scenic_name``, ``name``, ``video_size``,
    ``frame_rate``, ``overall_duration``, ``video_parts``, ``overall_template``
    and ``local_path``.

    :param template_id: template id
    :type template_id: str
    :return: normalized template, or ``None`` when the request fails or the
        response body is not a JSON object
    :rtype: dict | None
    """

    def _template_normalizer(template_info):
        # Map one remote template node (the root or a child) to a local part dict.
        _template = {}
        _placeholder_type = template_info.get('isPlaceholder', -1)
        if _placeholder_type == 0:
            # fixed video
            _template['source'] = template_info.get('sourceUrl', '')
        elif _placeholder_type == 1:
            # placeholder
            _template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '')
            _template['mute'] = template_info.get('mute', True)
            _template['crop_mode'] = template_info.get('cropEnable', None)
        else:
            _template['source'] = None
        # Comma-separated list fields; only non-empty values are copied over.
        for list_key in ('overlays', 'audios', 'luts'):
            raw_value = template_info.get(list_key, '')
            if raw_value:
                _template[list_key] = raw_value.split(",")
        _only_if = template_info.get('onlyIf', '')
        if _only_if:
            _template['only_if'] = _only_if
        # Effects are separated by "|" rather than ",".
        _effects = template_info.get('effects', '')
        if _effects:
            _template['effects'] = _effects.split("|")
        return _template

    url = '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id)
    tracer = get_tracer(__name__)
    with tracer.start_as_current_span("get_template_info"):
        with get_tracer("api").start_as_current_span("get_template_info.request") as req_span:
            try:
                req_span.set_attribute("http.method", "POST")
                req_span.set_attribute("http.url", url)
                response = session.post(url, json={
                    'accessKey': os.getenv('ACCESS_KEY'),
                }, timeout=10)
                req_span.set_attribute("http.status_code", response.status_code)
                req_span.set_attribute("http.response", response.text)
                response.raise_for_status()
            except requests.RequestException as e:
                req_span.set_attribute("api.error", str(e))
                # The message needs a placeholder, otherwise logging fails to
                # format the record and the exception text is lost.
                logger.error("请求失败!%s", e)
                return None

        try:
            data = response.json()
        except ValueError as e:
            # A 2xx response with a non-JSON body must not crash the caller.
            logger.error("响应解析失败!%s", e)
            return None
        logger.debug("获取模板信息结果:【%s】", data)
        if not isinstance(data, dict):
            logger.error("响应格式错误!%s", data)
            return None

        # "or" also covers a "data" key that is present but null.
        remote_template_info = data.get('data') or {}
        template = {
            'id': template_id,
            'updateTime': remote_template_info.get('updateTime', template_id),
            'scenic_name': remote_template_info.get('scenicName', '景区'),
            'name': remote_template_info.get('name', '模版'),
            'video_size': remote_template_info.get('resolution', '1920x1080'),
            'frame_rate': 25,
            'overall_duration': 30,
            'video_parts': [],
        }

        # outer template definition
        template['overall_template'] = _template_normalizer(remote_template_info)

        # inner template definitions (a null "children" is treated as empty)
        for children_template in remote_template_info.get('children') or []:
            template['video_parts'].append(_template_normalizer(children_template))

        template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id))
        with get_tracer("api").start_as_current_span("get_template_info.template") as res_span:
            res_span.set_attribute("normalized.response", json.dumps(template))
        return template
|
|
|
|
|
|
def report_task_success(task_info, **kwargs):
    """
    Report to the API that a task finished successfully.

    POSTs to ``{API_ENDPOINT}/{task id}/success``. Request failures covered by
    ``requests.RequestException`` are recorded on the span and logged, not
    raised.

    :param task_info: task dict; only its ``id`` is read here
    :param kwargs: extra fields merged into the JSON body after ``accessKey``
    :return: always ``None``
    """
    url = '{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))
    tracer = get_tracer(__name__)
    with tracer.start_as_current_span("report_task_success"):
        with get_tracer("api").start_as_current_span("report_task_success.request") as req_span:
            try:
                req_span.set_attribute("http.method", "POST")
                req_span.set_attribute("http.url", url)
                response = session.post(url, json={
                    'accessKey': os.getenv('ACCESS_KEY'),
                    **kwargs
                }, timeout=10)
                req_span.set_attribute("http.status_code", response.status_code)
                req_span.set_attribute("http.response", response.text)
                response.raise_for_status()
            except requests.RequestException as e:
                req_span.set_attribute("api.error", str(e))
                # The message needs a placeholder, otherwise logging fails to
                # format the record and the exception text is lost.
                logger.error("请求失败!%s", e)
                return None
|
|
|
|
|
|
def report_task_start(task_info):
    """
    Report to the API that processing of a task has started.

    POSTs to ``{API_ENDPOINT}/{task id}/start``. Request failures covered by
    ``requests.RequestException`` are recorded on the span and logged, not
    raised.

    :param task_info: task dict; only its ``id`` is read here
    :return: always ``None``
    """
    url = '{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))
    tracer = get_tracer(__name__)
    with tracer.start_as_current_span("report_task_start"):
        with get_tracer("api").start_as_current_span("report_task_start.request") as req_span:
            try:
                req_span.set_attribute("http.method", "POST")
                req_span.set_attribute("http.url", url)
                response = session.post(url, json={
                    'accessKey': os.getenv('ACCESS_KEY'),
                }, timeout=10)
                req_span.set_attribute("http.status_code", response.status_code)
                req_span.set_attribute("http.response", response.text)
                response.raise_for_status()
            except requests.RequestException as e:
                req_span.set_attribute("api.error", str(e))
                # The message needs a placeholder, otherwise logging fails to
                # format the record and the exception text is lost.
                logger.error("请求失败!%s", e)
                return None
|
|
|
|
|
|
def report_task_failed(task_info, reason=''):
    """
    Report to the API that a task failed.

    POSTs to ``{API_ENDPOINT}/{task id}/fail``. Request failures covered by
    ``requests.RequestException`` are recorded on the span and logged, not
    raised.

    :param task_info: task dict; only its ``id`` is read here
    :param reason: failure description sent as ``reason`` in the JSON body
    :return: always ``None``
    """
    url = '{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id"))
    tracer = get_tracer(__name__)
    with tracer.start_as_current_span("report_task_failed"):
        with get_tracer("api").start_as_current_span("report_task_failed.request") as req_span:
            try:
                req_span.set_attribute("http.method", "POST")
                req_span.set_attribute("http.url", url)
                response = session.post(url, json={
                    'accessKey': os.getenv('ACCESS_KEY'),
                    'reason': reason
                }, timeout=10)
                req_span.set_attribute("http.status_code", response.status_code)
                req_span.set_attribute("http.response", response.text)
                response.raise_for_status()
            except requests.RequestException as e:
                req_span.set_attribute("api.error", str(e))
                # The message needs a placeholder, otherwise logging fails to
                # format the record and the exception text is lost.
                logger.error("请求失败!%s", e)
                return None
|
|
|
|
|
|
def upload_task_file(task_info, ffmpeg_task):
    """
    Request an upload URL for the task and upload the rendered output to it.

    POSTs to ``{API_ENDPOINT}/{task id}/uploadUrl``, reads the URL from the
    ``data`` field of the JSON response and passes it, together with
    ``ffmpeg_task.get_output_file()``, to ``oss.upload_to_oss``.

    :param task_info: task dict; only its ``id`` is read here
    :param ffmpeg_task: object providing ``get_output_file()``
    :return: ``False`` when no upload URL could be obtained, otherwise the
        result of ``oss.upload_to_oss``
    """
    task_id = task_info.get("id")
    request_url = '{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_id)
    tracer = get_tracer(__name__)
    with get_tracer("api").start_as_current_span("upload_task_file") as span:
        logger.info("开始上传文件: %s", task_id)
        span.set_attribute("file.id", task_id)
        try:
            with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
                req_span.set_attribute("http.method", "POST")
                req_span.set_attribute("http.url", request_url)
                response = session.post(request_url,
                                        json={
                                            'accessKey': os.getenv('ACCESS_KEY'),
                                        }, timeout=10)
                req_span.set_attribute("http.status_code", response.status_code)
                req_span.set_attribute("http.response", response.text)
                response.raise_for_status()
        except requests.RequestException as e:
            span.set_attribute("api.error", str(e))
            # The message needs a placeholder, otherwise logging fails to
            # format the record and the exception text is lost.
            logger.error("请求失败!%s", e)
            return False

        try:
            data = response.json()
        except ValueError as e:
            # A 2xx response with a non-JSON body must not crash the caller.
            span.set_attribute("api.error", str(e))
            logger.error("响应解析失败!%s", e)
            return False

        url = data.get('data') if isinstance(data, dict) else None
        if not url:
            # Without a target URL there is nothing to upload to; report the
            # failure the same way as a failed request.
            span.set_attribute("api.error", "missing upload url")
            logger.error("获取上传地址失败: %s", task_id)
            return False
        logger.info("开始上传文件: %s 至 %s", task_id, url)
        return oss.upload_to_oss(url, ffmpeg_task.get_output_file())
|