Compare commits
11 Commits
0688fc6029
...
5fc948b9c5
Author | SHA1 | Date | |
---|---|---|---|
5fc948b9c5 | |||
6e4dbfd843 | |||
09e0f5f3be | |||
52c2df8b65 | |||
b25ad20ddd | |||
7c6e4a97b2 | |||
8f0e69c3de | |||
0da9fe9f7a | |||
b8db0d2b95 | |||
6dc7e86e8e | |||
c62f1ab976 |
@ -2,6 +2,8 @@ import json
|
|||||||
import os.path
|
import os.path
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
from opentelemetry.trace import Status, StatusCode
|
||||||
|
|
||||||
from entity.ffmpeg import FfmpegTask
|
from entity.ffmpeg import FfmpegTask
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
@ -108,13 +110,18 @@ def check_placeholder_exist(placeholder_id, task_params):
|
|||||||
|
|
||||||
def start_ffmpeg_task(ffmpeg_task):
|
def start_ffmpeg_task(ffmpeg_task):
|
||||||
tracer = get_tracer(__name__)
|
tracer = get_tracer(__name__)
|
||||||
with tracer.start_as_current_span("start_ffmpeg_task"):
|
with tracer.start_as_current_span("start_ffmpeg_task") as span:
|
||||||
for task in ffmpeg_task.analyze_input_render_tasks():
|
for task in ffmpeg_task.analyze_input_render_tasks():
|
||||||
result = start_ffmpeg_task(task)
|
result = start_ffmpeg_task(task)
|
||||||
if not result:
|
if not result:
|
||||||
return False
|
return False
|
||||||
ffmpeg_task.correct_task_type()
|
ffmpeg_task.correct_task_type()
|
||||||
return ffmpeg.start_render(ffmpeg_task)
|
result = ffmpeg.start_render(ffmpeg_task)
|
||||||
|
if not result:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
return False
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def clear_task_tmp_file(ffmpeg_task):
|
def clear_task_tmp_file(ffmpeg_task):
|
||||||
|
11
biz/task.py
11
biz/task.py
@ -1,3 +1,7 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
from opentelemetry.trace import Status, StatusCode
|
||||||
|
|
||||||
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
|
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
|
||||||
from telemetry import get_tracer
|
from telemetry import get_tracer
|
||||||
from template import get_template_def
|
from template import get_template_def
|
||||||
@ -6,16 +10,20 @@ from util import api
|
|||||||
|
|
||||||
def start_task(task_info):
|
def start_task(task_info):
|
||||||
tracer = get_tracer(__name__)
|
tracer = get_tracer(__name__)
|
||||||
with tracer.start_as_current_span("start_task"):
|
with tracer.start_as_current_span("start_task") as span:
|
||||||
|
span.set_attribute("task.id", task_info)
|
||||||
task_info = api.normalize_task(task_info)
|
task_info = api.normalize_task(task_info)
|
||||||
|
span.set_attribute("task.info", json.dumps(task_info))
|
||||||
template_info = get_template_def(task_info.get("templateId"))
|
template_info = get_template_def(task_info.get("templateId"))
|
||||||
api.report_task_start(task_info)
|
api.report_task_start(task_info)
|
||||||
ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
|
ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
|
||||||
result = start_ffmpeg_task(ffmpeg_task)
|
result = start_ffmpeg_task(ffmpeg_task)
|
||||||
if not result:
|
if not result:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
return api.report_task_failed(task_info)
|
return api.report_task_failed(task_info)
|
||||||
oss_result = api.upload_task_file(task_info, ffmpeg_task)
|
oss_result = api.upload_task_file(task_info, ffmpeg_task)
|
||||||
if not oss_result:
|
if not oss_result:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
return api.report_task_failed(task_info)
|
return api.report_task_failed(task_info)
|
||||||
# 获取视频长度宽度和时长
|
# 获取视频长度宽度和时长
|
||||||
width, height, duration = probe_video_info(ffmpeg_task)
|
width, height, duration = probe_video_info(ffmpeg_task)
|
||||||
@ -25,3 +33,4 @@ def start_task(task_info):
|
|||||||
"height": height,
|
"height": height,
|
||||||
"duration": duration
|
"duration": duration
|
||||||
})
|
})
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
@ -141,7 +141,7 @@ class FfmpegTask(object):
|
|||||||
return False
|
return False
|
||||||
if self.speed != 1:
|
if self.speed != 1:
|
||||||
return False
|
return False
|
||||||
if len(self.audios) > 1:
|
if len(self.audios) >= 1:
|
||||||
return False
|
return False
|
||||||
if len(self.input_file) > 1:
|
if len(self.input_file) > 1:
|
||||||
return False
|
return False
|
||||||
@ -220,6 +220,15 @@ class FfmpegTask(object):
|
|||||||
# filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
|
# filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
|
||||||
filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
|
filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
|
||||||
effect_index += 1
|
effect_index += 1
|
||||||
|
elif effect.startswith("ospeed:"):
|
||||||
|
param = effect.split(":", 2)[1]
|
||||||
|
if param == '':
|
||||||
|
param = "1"
|
||||||
|
if param != "1":
|
||||||
|
# 视频变速
|
||||||
|
effect_index += 1
|
||||||
|
filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]")
|
||||||
|
video_output_str = f"[v_eff{effect_index}]"
|
||||||
elif effect.startswith("zoom:"):
|
elif effect.startswith("zoom:"):
|
||||||
...
|
...
|
||||||
...
|
...
|
||||||
@ -268,15 +277,25 @@ class FfmpegTask(object):
|
|||||||
audio_output_str = ""
|
audio_output_str = ""
|
||||||
audio_track_index = 0
|
audio_track_index = 0
|
||||||
# output_args
|
# output_args
|
||||||
_tmp_file = "tmp_concat_"+str(time.time())+".txt"
|
if len(self.input_file) == 1:
|
||||||
|
_file = self.input_file[0]
|
||||||
|
from util.ffmpeg import probe_video_audio
|
||||||
|
if type(_file) is str:
|
||||||
|
input_args += ["-i", _file]
|
||||||
|
self.mute = not probe_video_audio(_file)
|
||||||
|
elif isinstance(_file, FfmpegTask):
|
||||||
|
input_args += ["-i", _file.get_output_file()]
|
||||||
|
self.mute = not probe_video_audio(_file.get_output_file())
|
||||||
|
else:
|
||||||
|
_tmp_file = "tmp_concat_" + str(time.time()) + ".txt"
|
||||||
|
from util.ffmpeg import probe_video_audio
|
||||||
with open(_tmp_file, "w", encoding="utf-8") as f:
|
with open(_tmp_file, "w", encoding="utf-8") as f:
|
||||||
for input_file in self.input_file:
|
for input_file in self.input_file:
|
||||||
if type(input_file) is str:
|
if type(input_file) is str:
|
||||||
f.write("file '"+input_file+"'\n")
|
f.write("file '" + input_file + "'\n")
|
||||||
elif isinstance(input_file, FfmpegTask):
|
elif isinstance(input_file, FfmpegTask):
|
||||||
f.write("file '" + input_file.get_output_file() + "'\n")
|
f.write("file '" + input_file.get_output_file() + "'\n")
|
||||||
input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file]
|
input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file]
|
||||||
from util.ffmpeg import probe_video_audio
|
|
||||||
self.mute = not probe_video_audio(_tmp_file, "concat")
|
self.mute = not probe_video_audio(_tmp_file, "concat")
|
||||||
output_args.append("-map")
|
output_args.append("-map")
|
||||||
output_args.append("0:v")
|
output_args.append("0:v")
|
||||||
@ -304,8 +323,13 @@ class FfmpegTask(object):
|
|||||||
else:
|
else:
|
||||||
output_args.append(audio_output_str)
|
output_args.append(audio_output_str)
|
||||||
output_args += AUDIO_ARGS
|
output_args += AUDIO_ARGS
|
||||||
|
if self.annexb:
|
||||||
|
output_args.append("-bsf:v")
|
||||||
|
output_args.append("h264_mp4toannexb")
|
||||||
|
output_args.append("-bsf:a")
|
||||||
|
output_args.append("setts=pts=DTS")
|
||||||
output_args.append("-f")
|
output_args.append("-f")
|
||||||
output_args.append("mp4")
|
output_args.append("mpegts" if self.annexb else "mp4")
|
||||||
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
|
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
|
||||||
return args + input_args + _filter_args + output_args + [self.get_output_file()]
|
return args + input_args + _filter_args + output_args + [self.get_output_file()]
|
||||||
elif self.task_type == 'copy':
|
elif self.task_type == 'copy':
|
||||||
|
49
util/api.py
49
util/api.py
@ -3,9 +3,11 @@ import logging
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
from opentelemetry.trace import Status, StatusCode
|
||||||
|
|
||||||
import util.system
|
import util.system
|
||||||
from telemetry import get_tracer
|
from telemetry import get_tracer
|
||||||
|
from util import oss
|
||||||
|
|
||||||
session = requests.Session()
|
session = requests.Session()
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -24,7 +26,7 @@ def sync_center():
|
|||||||
from template import TEMPLATES, download_template
|
from template import TEMPLATES, download_template
|
||||||
tracer = get_tracer(__name__)
|
tracer = get_tracer(__name__)
|
||||||
with tracer.start_as_current_span("sync_center"):
|
with tracer.start_as_current_span("sync_center"):
|
||||||
with get_tracer("api").start_as_current_span("sync_center.request") as req_span:
|
with tracer.start_as_current_span("sync_center.request") as req_span:
|
||||||
try:
|
try:
|
||||||
req_span.set_attribute("http.method", "POST")
|
req_span.set_attribute("http.method", "POST")
|
||||||
req_span.set_attribute("http.url", os.getenv('API_ENDPOINT') + "/sync")
|
req_span.set_attribute("http.url", os.getenv('API_ENDPOINT') + "/sync")
|
||||||
@ -35,10 +37,10 @@ def sync_center():
|
|||||||
TEMPLATES.values()]
|
TEMPLATES.values()]
|
||||||
}, timeout=10)
|
}, timeout=10)
|
||||||
req_span.set_attribute("http.status_code", response.status_code)
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
|
req_span.set_attribute("http.response", response.text)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
req_span.set_attribute("api.response", response.text)
|
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
req_span.set_attribute("api.error", str(e))
|
req_span.set_attribute("http.error", str(e))
|
||||||
logger.error("请求失败!", e)
|
logger.error("请求失败!", e)
|
||||||
return []
|
return []
|
||||||
data = response.json()
|
data = response.json()
|
||||||
@ -68,7 +70,7 @@ def get_template_info(template_id):
|
|||||||
"""
|
"""
|
||||||
tracer = get_tracer(__name__)
|
tracer = get_tracer(__name__)
|
||||||
with tracer.start_as_current_span("get_template_info"):
|
with tracer.start_as_current_span("get_template_info"):
|
||||||
with get_tracer("api").start_as_current_span("get_template_info.request") as req_span:
|
with tracer.start_as_current_span("get_template_info.request") as req_span:
|
||||||
try:
|
try:
|
||||||
req_span.set_attribute("http.method", "POST")
|
req_span.set_attribute("http.method", "POST")
|
||||||
req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id))
|
req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id))
|
||||||
@ -76,8 +78,8 @@ def get_template_info(template_id):
|
|||||||
'accessKey': os.getenv('ACCESS_KEY'),
|
'accessKey': os.getenv('ACCESS_KEY'),
|
||||||
}, timeout=10)
|
}, timeout=10)
|
||||||
req_span.set_attribute("http.status_code", response.status_code)
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
|
req_span.set_attribute("http.response", response.text)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
req_span.set_attribute("api.response", response.text)
|
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
req_span.set_attribute("api.error", str(e))
|
req_span.set_attribute("api.error", str(e))
|
||||||
logger.error("请求失败!", e)
|
logger.error("请求失败!", e)
|
||||||
@ -145,7 +147,7 @@ def get_template_info(template_id):
|
|||||||
def report_task_success(task_info, **kwargs):
|
def report_task_success(task_info, **kwargs):
|
||||||
tracer = get_tracer(__name__)
|
tracer = get_tracer(__name__)
|
||||||
with tracer.start_as_current_span("report_task_success"):
|
with tracer.start_as_current_span("report_task_success"):
|
||||||
with get_tracer("api").start_as_current_span("report_task_success.request") as req_span:
|
with tracer.start_as_current_span("report_task_success.request") as req_span:
|
||||||
try:
|
try:
|
||||||
req_span.set_attribute("http.method", "POST")
|
req_span.set_attribute("http.method", "POST")
|
||||||
req_span.set_attribute("http.url",
|
req_span.set_attribute("http.url",
|
||||||
@ -155,8 +157,9 @@ def report_task_success(task_info, **kwargs):
|
|||||||
**kwargs
|
**kwargs
|
||||||
}, timeout=10)
|
}, timeout=10)
|
||||||
req_span.set_attribute("http.status_code", response.status_code)
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
|
req_span.set_attribute("http.response", response.text)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
req_span.set_attribute("api.response", response.text)
|
req_span.set_status(Status(StatusCode.OK))
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
req_span.set_attribute("api.error", str(e))
|
req_span.set_attribute("api.error", str(e))
|
||||||
logger.error("请求失败!", e)
|
logger.error("请求失败!", e)
|
||||||
@ -166,7 +169,7 @@ def report_task_success(task_info, **kwargs):
|
|||||||
def report_task_start(task_info):
|
def report_task_start(task_info):
|
||||||
tracer = get_tracer(__name__)
|
tracer = get_tracer(__name__)
|
||||||
with tracer.start_as_current_span("report_task_start"):
|
with tracer.start_as_current_span("report_task_start"):
|
||||||
with get_tracer("api").start_as_current_span("report_task_start.request") as req_span:
|
with tracer.start_as_current_span("report_task_start.request") as req_span:
|
||||||
try:
|
try:
|
||||||
req_span.set_attribute("http.method", "POST")
|
req_span.set_attribute("http.method", "POST")
|
||||||
req_span.set_attribute("http.url",
|
req_span.set_attribute("http.url",
|
||||||
@ -175,8 +178,9 @@ def report_task_start(task_info):
|
|||||||
'accessKey': os.getenv('ACCESS_KEY'),
|
'accessKey': os.getenv('ACCESS_KEY'),
|
||||||
}, timeout=10)
|
}, timeout=10)
|
||||||
req_span.set_attribute("http.status_code", response.status_code)
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
|
req_span.set_attribute("http.response", response.text)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
req_span.set_attribute("api.response", response.text)
|
req_span.set_status(Status(StatusCode.OK))
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
req_span.set_attribute("api.error", str(e))
|
req_span.set_attribute("api.error", str(e))
|
||||||
logger.error("请求失败!", e)
|
logger.error("请求失败!", e)
|
||||||
@ -196,21 +200,23 @@ def report_task_failed(task_info, reason=''):
|
|||||||
'reason': reason
|
'reason': reason
|
||||||
}, timeout=10)
|
}, timeout=10)
|
||||||
req_span.set_attribute("http.status_code", response.status_code)
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
|
req_span.set_attribute("http.response", response.text)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
req_span.set_attribute("api.response", response.text)
|
req_span.set_status(Status(StatusCode.OK))
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
req_span.set_attribute("api.error", str(e))
|
req_span.set_attribute("api.error", str(e))
|
||||||
|
req_span.set_status(Status(StatusCode.ERROR))
|
||||||
logger.error("请求失败!", e)
|
logger.error("请求失败!", e)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def upload_task_file(task_info, ffmpeg_task):
|
def upload_task_file(task_info, ffmpeg_task):
|
||||||
tracer = get_tracer(__name__)
|
tracer = get_tracer(__name__)
|
||||||
with tracer.start_as_current_span("upload_task_file") as span:
|
with get_tracer("api").start_as_current_span("upload_task_file") as span:
|
||||||
logger.info("开始上传文件: %s", task_info.get("id"))
|
logger.info("开始上传文件: %s", task_info.get("id"))
|
||||||
span.set_attribute("file.id", task_info.get("id"))
|
span.set_attribute("file.id", task_info.get("id"))
|
||||||
try:
|
|
||||||
with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
|
with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
|
||||||
|
try:
|
||||||
req_span.set_attribute("http.method", "POST")
|
req_span.set_attribute("http.method", "POST")
|
||||||
req_span.set_attribute("http.url",
|
req_span.set_attribute("http.url",
|
||||||
'{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
|
'{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
|
||||||
@ -218,25 +224,16 @@ def upload_task_file(task_info, ffmpeg_task):
|
|||||||
json={
|
json={
|
||||||
'accessKey': os.getenv('ACCESS_KEY'),
|
'accessKey': os.getenv('ACCESS_KEY'),
|
||||||
}, timeout=10)
|
}, timeout=10)
|
||||||
response.raise_for_status()
|
|
||||||
req_span.set_attribute("http.status_code", response.status_code)
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
|
req_span.set_attribute("http.response", response.text)
|
||||||
|
response.raise_for_status()
|
||||||
|
req_span.set_status(Status(StatusCode.OK))
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
span.set_attribute("api.error", str(e))
|
span.set_attribute("api.error", str(e))
|
||||||
|
req_span.set_status(Status(StatusCode.ERROR))
|
||||||
logger.error("请求失败!", e)
|
logger.error("请求失败!", e)
|
||||||
return False
|
return False
|
||||||
data = response.json()
|
data = response.json()
|
||||||
url = data.get('data', "")
|
url = data.get('data', "")
|
||||||
logger.info("开始上传文件: %s 至 %s", task_info.get("id"), url)
|
logger.info("开始上传文件: %s 至 %s", task_info.get("id"), url)
|
||||||
try:
|
return oss.upload_to_oss(url, ffmpeg_task.get_output_file())
|
||||||
with tracer.start_as_current_span("upload_task_file.start_upload_file") as upload_span:
|
|
||||||
upload_span.set_attribute("http.method", "PUT")
|
|
||||||
upload_span.set_attribute("http.url", url)
|
|
||||||
with open(ffmpeg_task.get_output_file(), 'rb') as f:
|
|
||||||
requests.put(url, data=f, headers={"Content-Type": "video/mp4"})
|
|
||||||
except requests.RequestException as e:
|
|
||||||
span.set_attribute("api.error", str(e))
|
|
||||||
logger.error("上传失败!", e)
|
|
||||||
return False
|
|
||||||
finally:
|
|
||||||
logger.info("上传文件结束: %s", task_info.get("id"))
|
|
||||||
return True
|
|
||||||
|
@ -5,77 +5,64 @@ import subprocess
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional, IO
|
from typing import Optional, IO
|
||||||
|
|
||||||
|
from opentelemetry.trace import Status, StatusCode
|
||||||
|
|
||||||
from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT
|
from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS, MUTE_AUDIO_INPUT
|
||||||
from telemetry import get_tracer
|
from telemetry import get_tracer
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def to_annexb(file):
|
|
||||||
with get_tracer("ffmpeg").start_as_current_span("to_annexb") as span:
|
|
||||||
span.set_attribute("file.path", file)
|
|
||||||
if not os.path.exists(file):
|
|
||||||
return file
|
|
||||||
logger.info("ToAnnexb: %s", file)
|
|
||||||
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file,
|
|
||||||
*MUTE_AUDIO_INPUT,
|
|
||||||
"-map", "0:v", "-map", "1:a",
|
|
||||||
"-c:v", "copy", "-bsf:v", "h264_mp4toannexb", "-shortest",
|
|
||||||
"-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2",
|
|
||||||
"-f", "mpegts", file+".ts"])
|
|
||||||
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
|
|
||||||
logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
|
|
||||||
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
|
|
||||||
if ffmpeg_process.returncode == 0:
|
|
||||||
span.set_attribute("file.size", os.path.getsize(file+".ts"))
|
|
||||||
os.remove(file)
|
|
||||||
return file+".ts"
|
|
||||||
else:
|
|
||||||
return file
|
|
||||||
|
|
||||||
def re_encode_and_annexb(file):
|
def re_encode_and_annexb(file):
|
||||||
with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span:
|
with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span:
|
||||||
span.set_attribute("file.path", file)
|
span.set_attribute("file.path", file)
|
||||||
if not os.path.exists(file):
|
if not os.path.exists(file):
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
return file
|
return file
|
||||||
logger.info("ReEncodeAndAnnexb: %s", file)
|
logger.info("ReEncodeAndAnnexb: %s", file)
|
||||||
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file,
|
has_audio = not not probe_video_audio(file)
|
||||||
*MUTE_AUDIO_INPUT,
|
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-vsync", "cfr", "-i", file,
|
||||||
"-map", "0:v", "-map", "1:a",
|
*(set() if has_audio else MUTE_AUDIO_INPUT),
|
||||||
|
"-map", "0:v", "-map", "0:a" if has_audio else "1:a",
|
||||||
*VIDEO_ARGS, "-bsf:v", "h264_mp4toannexb",
|
*VIDEO_ARGS, "-bsf:v", "h264_mp4toannexb",
|
||||||
*AUDIO_ARGS, "-bsf:a", "setts=pts=DTS",
|
*AUDIO_ARGS, "-bsf:a", "setts=pts=DTS",
|
||||||
*ENCODER_ARGS, "-shortest",
|
*ENCODER_ARGS, "-shortest", "-fflags", "+genpts",
|
||||||
"-f", "mpegts", file + ".ts"])
|
"-f", "mpegts", file + ".ts"])
|
||||||
|
logger.info(" ".join(ffmpeg_process.args))
|
||||||
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
|
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
|
||||||
logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
|
logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
|
||||||
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
|
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
|
||||||
if ffmpeg_process.returncode == 0:
|
if ffmpeg_process.returncode == 0:
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
span.set_attribute("file.size", os.path.getsize(file+".ts"))
|
span.set_attribute("file.size", os.path.getsize(file+".ts"))
|
||||||
os.remove(file)
|
# os.remove(file)
|
||||||
return file+".ts"
|
return file+".ts"
|
||||||
else:
|
else:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
return file
|
return file
|
||||||
|
|
||||||
def start_render(ffmpeg_task: FfmpegTask):
|
def start_render(ffmpeg_task: FfmpegTask):
|
||||||
tracer = get_tracer(__name__)
|
tracer = get_tracer(__name__)
|
||||||
with tracer.start_as_current_span("start_render") as span:
|
with tracer.start_as_current_span("start_render") as span:
|
||||||
span.set_attribute("ffmpeg.task", str(ffmpeg_task))
|
span.set_attribute("ffmpeg.task", str(ffmpeg_task))
|
||||||
logger.info(ffmpeg_task)
|
|
||||||
if not ffmpeg_task.need_run():
|
if not ffmpeg_task.need_run():
|
||||||
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
|
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
return True
|
return True
|
||||||
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
|
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
|
||||||
if len(ffmpeg_args) == 0:
|
if len(ffmpeg_args) == 0:
|
||||||
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
|
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
return True
|
return True
|
||||||
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
|
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
|
||||||
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
|
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
|
||||||
logger.info(ffmpeg_process.args)
|
logger.info(" ".join(ffmpeg_process.args))
|
||||||
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
|
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
|
||||||
span.set_attribute("ffmpeg.out", ffmpeg_final_out)
|
span.set_attribute("ffmpeg.out", ffmpeg_final_out)
|
||||||
logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
|
logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
|
||||||
code = ffmpeg_process.returncode
|
code = ffmpeg_process.returncode
|
||||||
span.set_attribute("ffmpeg.code", code)
|
span.set_attribute("ffmpeg.code", code)
|
||||||
if code != 0:
|
if code != 0:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
|
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
|
||||||
return False
|
return False
|
||||||
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
|
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
|
||||||
@ -83,12 +70,15 @@ def start_render(ffmpeg_task: FfmpegTask):
|
|||||||
file_size = os.path.getsize(ffmpeg_task.output_file)
|
file_size = os.path.getsize(ffmpeg_task.output_file)
|
||||||
span.set_attribute("file.size", file_size)
|
span.set_attribute("file.size", file_size)
|
||||||
if file_size < 4096:
|
if file_size < 4096:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
|
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
|
||||||
return False
|
return False
|
||||||
except OSError:
|
except OSError:
|
||||||
span.set_attribute("file.size", 0)
|
span.set_attribute("file.size", 0)
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
|
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
|
||||||
return False
|
return False
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
|
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
|
||||||
@ -129,9 +119,14 @@ def probe_video_info(video_file):
|
|||||||
span.set_attribute("ffprobe.args", json.dumps(result.args))
|
span.set_attribute("ffprobe.args", json.dumps(result.args))
|
||||||
span.set_attribute("ffprobe.code", result.returncode)
|
span.set_attribute("ffprobe.code", result.returncode)
|
||||||
if result.returncode != 0:
|
if result.returncode != 0:
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
return 0, 0, 0
|
return 0, 0, 0
|
||||||
all_result = result.stdout.decode('utf-8').strip()
|
all_result = result.stdout.decode('utf-8').strip()
|
||||||
span.set_attribute("ffprobe.out", all_result)
|
span.set_attribute("ffprobe.out", all_result)
|
||||||
|
if all_result == '':
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
return 0, 0, 0
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
wh, duration = all_result.split('\n')
|
wh, duration = all_result.split('\n')
|
||||||
width, height = wh.strip().split('x')
|
width, height = wh.strip().split('x')
|
||||||
return int(width), int(height), float(duration)
|
return int(width), int(height), float(duration)
|
||||||
@ -148,6 +143,7 @@ def probe_video_audio(video_file, type=None):
|
|||||||
args.append("-f")
|
args.append("-f")
|
||||||
args.append("concat")
|
args.append("concat")
|
||||||
args.append(video_file)
|
args.append(video_file)
|
||||||
|
logger.info(" ".join(args))
|
||||||
result = subprocess.run(args, stderr=subprocess.STDOUT, **subprocess_args(True))
|
result = subprocess.run(args, stderr=subprocess.STDOUT, **subprocess_args(True))
|
||||||
span.set_attribute("ffprobe.args", json.dumps(result.args))
|
span.set_attribute("ffprobe.args", json.dumps(result.args))
|
||||||
span.set_attribute("ffprobe.code", result.returncode)
|
span.set_attribute("ffprobe.code", result.returncode)
|
||||||
|
23
util/oss.py
23
util/oss.py
@ -2,6 +2,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
from opentelemetry.trace import Status, StatusCode
|
||||||
|
|
||||||
from telemetry import get_tracer
|
from telemetry import get_tracer
|
||||||
|
|
||||||
@ -19,6 +20,7 @@ def upload_to_oss(url, file_path):
|
|||||||
with tracer.start_as_current_span("upload_to_oss") as span:
|
with tracer.start_as_current_span("upload_to_oss") as span:
|
||||||
span.set_attribute("file.url", url)
|
span.set_attribute("file.url", url)
|
||||||
span.set_attribute("file.path", file_path)
|
span.set_attribute("file.path", file_path)
|
||||||
|
span.set_attribute("file.size", os.path.getsize(file_path))
|
||||||
max_retries = 5
|
max_retries = 5
|
||||||
retries = 0
|
retries = 0
|
||||||
while retries < max_retries:
|
while retries < max_retries:
|
||||||
@ -28,18 +30,24 @@ def upload_to_oss(url, file_path):
|
|||||||
req_span.set_attribute("http.method", "PUT")
|
req_span.set_attribute("http.method", "PUT")
|
||||||
req_span.set_attribute("http.url", url)
|
req_span.set_attribute("http.url", url)
|
||||||
with open(file_path, 'rb') as f:
|
with open(file_path, 'rb') as f:
|
||||||
response = requests.put(url, data=f, timeout=60) # 设置超时时间为1分钟
|
response = requests.put(url, data=f, timeout=60, headers={"Content-Type": "video/mp4"})
|
||||||
req_span.set_attribute("http.status_code", response.status_code)
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
if response.status_code == 200:
|
req_span.set_attribute("http.response", response.text)
|
||||||
|
response.raise_for_status()
|
||||||
|
req_span.set_status(Status(StatusCode.OK))
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
return True
|
return True
|
||||||
except requests.exceptions.Timeout:
|
except requests.exceptions.Timeout:
|
||||||
req_span.set_attribute("http.error", "Timeout")
|
req_span.set_attribute("http.error", "Timeout")
|
||||||
|
req_span.set_status(Status(StatusCode.ERROR))
|
||||||
retries += 1
|
retries += 1
|
||||||
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
|
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
req_span.set_attribute("http.error", str(e))
|
req_span.set_attribute("http.error", str(e))
|
||||||
|
req_span.set_status(Status(StatusCode.ERROR))
|
||||||
retries += 1
|
retries += 1
|
||||||
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
|
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
@ -71,14 +79,19 @@ def download_from_oss(url, file_path):
|
|||||||
req_span.set_attribute("http.status_code", response.status_code)
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
with open(file_path, 'wb') as f:
|
with open(file_path, 'wb') as f:
|
||||||
f.write(response.content)
|
f.write(response.content)
|
||||||
span.set_attribute("file.size", os.path.getsize(file_path))
|
req_span.set_attribute("file.size", os.path.getsize(file_path))
|
||||||
|
req_span.set_status(Status(StatusCode.OK))
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
return True
|
return True
|
||||||
except requests.exceptions.Timeout:
|
except requests.exceptions.Timeout:
|
||||||
span.set_attribute("http.error", "Timeout")
|
req_span.set_attribute("http.error", "Timeout")
|
||||||
|
req_span.set_status(Status(StatusCode.ERROR))
|
||||||
retries += 1
|
retries += 1
|
||||||
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
|
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
span.set_attribute("http.error", str(e))
|
req_span.set_attribute("http.error", str(e))
|
||||||
|
req_span.set_status(Status(StatusCode.ERROR))
|
||||||
retries += 1
|
retries += 1
|
||||||
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
|
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
return False
|
return False
|
||||||
|
Loading…
x
Reference in New Issue
Block a user