Compare commits
5 Commits
6e4dbfd843
...
450240bd5a
Author | SHA1 | Date | |
---|---|---|---|
450240bd5a | |||
6b5975d8b9 | |||
85c2e7459e | |||
364ceb29a1 | |||
ced0c1ad1e |
@ -8,6 +8,7 @@ from entity.ffmpeg import FfmpegTask
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
from util import ffmpeg, oss
|
from util import ffmpeg, oss
|
||||||
|
from util.ffmpeg import fade_out_audio
|
||||||
from telemetry import get_tracer
|
from telemetry import get_tracer
|
||||||
|
|
||||||
logger = logging.getLogger('biz/ffmpeg')
|
logger = logging.getLogger('biz/ffmpeg')
|
||||||
@ -15,10 +16,11 @@ logger = logging.getLogger('biz/ffmpeg')
|
|||||||
|
|
||||||
def parse_ffmpeg_task(task_info, template_info):
|
def parse_ffmpeg_task(task_info, template_info):
|
||||||
tracer = get_tracer(__name__)
|
tracer = get_tracer(__name__)
|
||||||
with tracer.start_as_current_span("parse_ffmpeg_task"):
|
with tracer.start_as_current_span("parse_ffmpeg_task") as span:
|
||||||
tasks = []
|
tasks = []
|
||||||
# 中间片段
|
# 中间片段
|
||||||
task_params_str = task_info.get("taskParams", "{}")
|
task_params_str = task_info.get("taskParams", "{}")
|
||||||
|
span.set_attribute("task_params", task_params_str)
|
||||||
task_params = json.loads(task_params_str)
|
task_params = json.loads(task_params_str)
|
||||||
for part in template_info.get("video_parts"):
|
for part in template_info.get("video_parts"):
|
||||||
source = parse_video(part.get('source'), task_params, template_info)
|
source = parse_video(part.get('source'), task_params, template_info)
|
||||||
@ -86,8 +88,8 @@ def parse_video(source, task_params, template_info):
|
|||||||
logger.debug("no video found for placeholder: " + placeholder_id)
|
logger.debug("no video found for placeholder: " + placeholder_id)
|
||||||
return None
|
return None
|
||||||
else:
|
else:
|
||||||
# TODO: Random Pick / Policy Pick
|
_pick_source = new_sources.pop(0)
|
||||||
new_sources = new_sources[0].get("url")
|
new_sources = _pick_source.get("url")
|
||||||
if new_sources.startswith("http"):
|
if new_sources.startswith("http"):
|
||||||
_, source_name = os.path.split(new_sources)
|
_, source_name = os.path.split(new_sources)
|
||||||
oss.download_from_oss(new_sources, source_name)
|
oss.download_from_oss(new_sources, source_name)
|
||||||
|
12
biz/task.py
12
biz/task.py
@ -2,7 +2,7 @@ import json
|
|||||||
|
|
||||||
from opentelemetry.trace import Status, StatusCode
|
from opentelemetry.trace import Status, StatusCode
|
||||||
|
|
||||||
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
|
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info, fade_out_audio
|
||||||
from telemetry import get_tracer
|
from telemetry import get_tracer
|
||||||
from template import get_template_def
|
from template import get_template_def
|
||||||
from util import api
|
from util import api
|
||||||
@ -11,9 +11,10 @@ from util import api
|
|||||||
def start_task(task_info):
|
def start_task(task_info):
|
||||||
tracer = get_tracer(__name__)
|
tracer = get_tracer(__name__)
|
||||||
with tracer.start_as_current_span("start_task") as span:
|
with tracer.start_as_current_span("start_task") as span:
|
||||||
span.set_attribute("task.id", task_info)
|
|
||||||
task_info = api.normalize_task(task_info)
|
task_info = api.normalize_task(task_info)
|
||||||
span.set_attribute("task.info", json.dumps(task_info))
|
span.set_attribute("task", json.dumps(task_info))
|
||||||
|
span.set_attribute("scenicId", task_info.get("scenicId", "?"))
|
||||||
|
span.set_attribute("templateId", task_info.get("templateId"))
|
||||||
template_info = get_template_def(task_info.get("templateId"))
|
template_info = get_template_def(task_info.get("templateId"))
|
||||||
api.report_task_start(task_info)
|
api.report_task_start(task_info)
|
||||||
ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
|
ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
|
||||||
@ -21,12 +22,15 @@ def start_task(task_info):
|
|||||||
if not result:
|
if not result:
|
||||||
span.set_status(Status(StatusCode.ERROR))
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
return api.report_task_failed(task_info)
|
return api.report_task_failed(task_info)
|
||||||
|
width, height, duration = probe_video_info(ffmpeg_task)
|
||||||
|
# 音频淡出
|
||||||
|
new_fn = fade_out_audio(ffmpeg_task.get_output_file(), duration)
|
||||||
|
ffmpeg_task.set_output_file(new_fn)
|
||||||
oss_result = api.upload_task_file(task_info, ffmpeg_task)
|
oss_result = api.upload_task_file(task_info, ffmpeg_task)
|
||||||
if not oss_result:
|
if not oss_result:
|
||||||
span.set_status(Status(StatusCode.ERROR))
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
return api.report_task_failed(task_info)
|
return api.report_task_failed(task_info)
|
||||||
# 获取视频长度宽度和时长
|
# 获取视频长度宽度和时长
|
||||||
width, height, duration = probe_video_info(ffmpeg_task)
|
|
||||||
clear_task_tmp_file(ffmpeg_task)
|
clear_task_tmp_file(ffmpeg_task)
|
||||||
api.report_task_success(task_info, videoInfo={
|
api.report_task_success(task_info, videoInfo={
|
||||||
"width": width,
|
"width": width,
|
||||||
|
@ -23,7 +23,7 @@ def init_opentelemetry():
|
|||||||
|
|
||||||
# 使用HTTP协议上报
|
# 使用HTTP协议上报
|
||||||
span_processor = BatchSpanProcessor(OTLPSpanHttpExporter(
|
span_processor = BatchSpanProcessor(OTLPSpanHttpExporter(
|
||||||
endpoint="http://tracing-analysis-dc-sh.aliyuncs.com/adapt_e7qojqi4e0@aa79b4d367fb6b7_e7qojqi4e0@53df7ad2afe8301/api/otlp/traces",
|
endpoint="https://oltp.jerryyan.top/v1/traces",
|
||||||
))
|
))
|
||||||
|
|
||||||
trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor)
|
trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor)
|
||||||
|
64
util/api.py
64
util/api.py
@ -24,40 +24,32 @@ def sync_center():
|
|||||||
:return: 任务列表
|
:return: 任务列表
|
||||||
"""
|
"""
|
||||||
from template import TEMPLATES, download_template
|
from template import TEMPLATES, download_template
|
||||||
tracer = get_tracer(__name__)
|
try:
|
||||||
with tracer.start_as_current_span("sync_center"):
|
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
|
||||||
with tracer.start_as_current_span("sync_center.request") as req_span:
|
'accessKey': os.getenv('ACCESS_KEY'),
|
||||||
try:
|
'clientStatus': util.system.get_sys_info(),
|
||||||
req_span.set_attribute("http.method", "POST")
|
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in
|
||||||
req_span.set_attribute("http.url", os.getenv('API_ENDPOINT') + "/sync")
|
TEMPLATES.values()]
|
||||||
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
|
}, timeout=10)
|
||||||
'accessKey': os.getenv('ACCESS_KEY'),
|
response.raise_for_status()
|
||||||
'clientStatus': util.system.get_sys_info(),
|
except requests.RequestException as e:
|
||||||
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in
|
logger.error("请求失败!", e)
|
||||||
TEMPLATES.values()]
|
return []
|
||||||
}, timeout=10)
|
data = response.json()
|
||||||
req_span.set_attribute("http.status_code", response.status_code)
|
logger.debug("获取任务结果:【%s】", data)
|
||||||
req_span.set_attribute("http.response", response.text)
|
if data.get('code', 0) == 200:
|
||||||
response.raise_for_status()
|
templates = data.get('data', {}).get('templates', [])
|
||||||
except requests.RequestException as e:
|
tasks = data.get('data', {}).get('tasks', [])
|
||||||
req_span.set_attribute("http.error", str(e))
|
else:
|
||||||
logger.error("请求失败!", e)
|
tasks = []
|
||||||
return []
|
templates = []
|
||||||
data = response.json()
|
logger.warning("获取任务失败")
|
||||||
logger.debug("获取任务结果:【%s】", data)
|
for template in templates:
|
||||||
if data.get('code', 0) == 200:
|
template_id = template.get('id', '')
|
||||||
templates = data.get('data', {}).get('templates', [])
|
if template_id:
|
||||||
tasks = data.get('data', {}).get('tasks', [])
|
logger.info("更新模板:【%s】", template_id)
|
||||||
else:
|
download_template(template_id)
|
||||||
tasks = []
|
return tasks
|
||||||
templates = []
|
|
||||||
logger.warning("获取任务失败")
|
|
||||||
for template in templates:
|
|
||||||
template_id = template.get('id', '')
|
|
||||||
if template_id:
|
|
||||||
logger.info("更新模板:【%s】", template_id)
|
|
||||||
download_template(template_id)
|
|
||||||
return tasks
|
|
||||||
|
|
||||||
|
|
||||||
def get_template_info(template_id):
|
def get_template_info(template_id):
|
||||||
@ -189,7 +181,9 @@ def report_task_start(task_info):
|
|||||||
|
|
||||||
def report_task_failed(task_info, reason=''):
|
def report_task_failed(task_info, reason=''):
|
||||||
tracer = get_tracer(__name__)
|
tracer = get_tracer(__name__)
|
||||||
with tracer.start_as_current_span("report_task_failed"):
|
with tracer.start_as_current_span("report_task_failed") as span:
|
||||||
|
span.set_attribute("task_id", task_info.get("id"))
|
||||||
|
span.set_attribute("reason", reason)
|
||||||
with tracer.start_as_current_span("report_task_failed.request") as req_span:
|
with tracer.start_as_current_span("report_task_failed.request") as req_span:
|
||||||
try:
|
try:
|
||||||
req_span.set_attribute("http.method", "POST")
|
req_span.set_attribute("http.method", "POST")
|
||||||
|
@ -154,6 +154,43 @@ def probe_video_audio(video_file, type=None):
|
|||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
# 音频淡出2秒
|
||||||
|
def fade_out_audio(file, duration, fade_out_sec = 2):
|
||||||
|
if type(duration) == str:
|
||||||
|
try:
|
||||||
|
duration = float(duration)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("duration is not float: %s", e)
|
||||||
|
return file
|
||||||
|
tracer = get_tracer(__name__)
|
||||||
|
with tracer.start_as_current_span("fade_out_audio") as span:
|
||||||
|
span.set_attribute("audio.file", file)
|
||||||
|
if duration <= fade_out_sec:
|
||||||
|
return file
|
||||||
|
else:
|
||||||
|
new_fn = file + "_.mp4"
|
||||||
|
if os.path.exists(new_fn):
|
||||||
|
os.remove(new_fn)
|
||||||
|
logger.info("delete tmp file: " + new_fn)
|
||||||
|
try:
|
||||||
|
process = subprocess.run(["ffmpeg", "-i", file, "-c:v", "copy", "-c:a", "aac", "-af", "afade=t=out:st=" + str(duration - fade_out_sec) + ":d=" + str(fade_out_sec), "-y", new_fn], **subprocess_args(True))
|
||||||
|
span.set_attribute("ffmpeg.args", json.dumps(process.args))
|
||||||
|
logger.info(" ".join(process.args))
|
||||||
|
if process.returncode != 0:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
logger.error("FFMPEG ERROR: %s", process.stderr)
|
||||||
|
return file
|
||||||
|
else:
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
return new_fn
|
||||||
|
except Exception as e:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
logger.error("FFMPEG ERROR: %s", e)
|
||||||
|
return file
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Create a set of arguments which make a ``subprocess.Popen`` (and
|
# Create a set of arguments which make a ``subprocess.Popen`` (and
|
||||||
# variants) call work with or without Pyinstaller, ``--noconsole`` or
|
# variants) call work with or without Pyinstaller, ``--noconsole`` or
|
||||||
# not, on Windows and Linux. Typical use::
|
# not, on Windows and Linux. Typical use::
|
||||||
|
Loading…
x
Reference in New Issue
Block a user