Compare commits

...

19 Commits

Author SHA1 Message Date
f12422b346 Merge branch 'master' into windows_nvidia
# Conflicts:
#	entity/ffmpeg.py
2025-03-06 14:06:38 +08:00
56bdad7ad1 音轨叠加 2025-03-06 10:34:28 +08:00
94373cee72 cameraShot特效及旋转 2025-03-05 14:57:02 +08:00
94b08dfcb5 Merge branch 'master' into windows_nvidia 2025-03-04 17:48:57 +08:00
4549b0ab44 分辨率和裁切 2025-03-04 17:43:47 +08:00
d8ab94fcba ffmpeg 参数 2025-03-04 16:11:56 +08:00
9385945030 Merge branch 'master' into windows_nvidia 2025-03-04 12:40:30 +08:00
9d178a3d34 埋点 2025-03-04 12:36:48 +08:00
1f9632761f effect 2025-03-03 14:27:52 +08:00
9041093324 windows nvidia 2025-02-27 16:49:48 +08:00
fff20610a5 profile level指定及修复 2025-02-27 16:48:57 +08:00
67696739f9 切割模式 2025-02-27 14:02:17 +08:00
2ea248c02e 上传文件也弄个超时 2025-02-16 18:15:35 +08:00
358207efdc 定时清理目录下无用文件 2025-02-16 18:15:25 +08:00
94a5e687df 未生成文件时,上报失败 2025-02-08 15:02:36 +08:00
b7d6797901 忽略无用文件 2025-02-05 10:13:33 +08:00
6d9d373032 only if 逻辑 2025-01-23 14:28:51 +08:00
549ee8320a ffprobe 报错后不采用其内容 2025-01-22 16:04:33 +08:00
29bb80f3b9 渲染后再to annexb,使用新逻辑拼接 2025-01-22 14:31:59 +08:00
10 changed files with 659 additions and 330 deletions

5
.gitignore vendored
View File

@ -6,6 +6,11 @@ __pycache__/
*.so
.Python
build/
dist/
*.mp4
*.ts
rand*.ts
tmp_concat_*.txt
*.egg-info/
*.egg
*.manifest

View File

@ -6,11 +6,14 @@ from entity.ffmpeg import FfmpegTask
import logging
from util import ffmpeg, oss
from telemetry import get_tracer
logger = logging.getLogger('biz/ffmpeg')
def parse_ffmpeg_task(task_info, template_info):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("parse_ffmpeg_task"):
tasks = []
# 中间片段
task_params_str = task_info.get("taskParams", "{}")
@ -20,9 +23,18 @@ def parse_ffmpeg_task(task_info, template_info):
if not source:
logger.warning("no video found for part: " + str(part))
continue
only_if = part.get('only_if', '')
if only_if:
if not check_placeholder_exist(only_if, task_params):
logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part)
continue
sub_ffmpeg_task = FfmpegTask(source)
sub_ffmpeg_task.annexb = True
sub_ffmpeg_task.ext_data = find_placeholder_params(part.get('source'), task_params) or {}
sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
sub_ffmpeg_task.center_cut = part.get("crop_mode", None)
for effect in part.get('effects', []):
sub_ffmpeg_task.add_effect(effect)
for lut in part.get('filters', []):
sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut))
for audio in part.get('audios', []):
@ -33,10 +45,14 @@ def parse_ffmpeg_task(task_info, template_info):
output_file = "out_" + str(time.time()) + ".mp4"
task = FfmpegTask(tasks, output_file=output_file)
overall = template_info.get("overall_template")
task.mute = False
task.center_cut = template_info.get("crop_mode", None)
task.frame_rate = template_info.get("frame_rate", 25)
if overall.get('source', ''):
source = parse_video(overall.get('source'), task_params, template_info)
task.add_inputs(source)
for effect in overall.get('effects', []):
task.add_effect(effect)
for lut in overall.get('filters', []):
task.add_lut(os.path.join(template_info.get("local_path"), lut))
for audio in overall.get('audios', []):
@ -46,6 +62,19 @@ def parse_ffmpeg_task(task_info, template_info):
return task
def find_placeholder_params(source, task_params):
if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
logger.debug("no video found for placeholder: " + placeholder_id)
return {}
else:
return new_sources[0]
return {}
def parse_video(source, task_params, template_info):
print(source)
if source.startswith('PLACEHOLDER_'):
@ -66,9 +95,25 @@ def parse_video(source, task_params, template_info):
return os.path.join(template_info.get("local_path"), source)
def check_placeholder_exist(placeholder_id, task_params):
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
return False
else:
return True
return True
return False
def start_ffmpeg_task(ffmpeg_task):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_ffmpeg_task"):
for task in ffmpeg_task.analyze_input_render_tasks():
start_ffmpeg_task(task)
result = start_ffmpeg_task(task)
if not result:
return False
ffmpeg_task.correct_task_type()
return ffmpeg.start_render(ffmpeg_task)

View File

@ -1,9 +1,12 @@
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
from telemetry import get_tracer
from template import get_template_def
from util import api
def start_task(task_info):
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_task"):
task_info = api.normalize_task(task_info)
template_info = get_template_def(task_info.get("templateId"))
api.report_task_start(task_info)

View File

@ -1,9 +1,19 @@
import json
import time
import uuid
from typing import Any
DEFAULT_ARGS = ("-shortest",)
ENCODER_ARGS = ("-c:v", "h264_nvenc", "-cq:v", "24", "-preset:v", "p7", "-tune:v", "hq",)
VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", )
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", )
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", )
class FfmpegTask(object):
effects: list[str]
def __init__(self, input_file, task_type='copy', output_file=''):
self.annexb = False
if type(input_file) is str:
@ -14,6 +24,9 @@ class FfmpegTask(object):
self.input_file = input_file
else:
self.input_file = []
self.zoom_cut = None
self.center_cut = None
self.ext_data = {}
self.task_type = task_type
self.output_file = output_file
self.mute = True
@ -23,6 +36,7 @@ class FfmpegTask(object):
self.luts = []
self.audios = []
self.overlays = []
self.effects = []
def __repr__(self):
_str = f'FfmpegTask(input_file={self.input_file}, task_type={self.task_type}'
@ -34,8 +48,11 @@ class FfmpegTask(object):
_str += f', overlays={self.overlays}'
if self.annexb:
_str += f', annexb={self.annexb}'
if self.effects:
_str += f', effects={self.effects}'
if self.mute:
_str += f', mute={self.mute}'
_str += f', center_cut={self.center_cut}'
return _str + ')'
def analyze_input_render_tasks(self):
@ -77,6 +94,10 @@ class FfmpegTask(object):
self.luts.extend(luts)
self.correct_task_type()
def add_effect(self, *effects):
self.effects.extend(effects)
self.correct_task_type()
def get_output_file(self):
if self.task_type == 'copy':
return self.input_file[0]
@ -99,8 +120,14 @@ class FfmpegTask(object):
return False
if len(self.subtitles) > 0:
return False
if len(self.effects) > 0:
return False
if self.speed != 1:
return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True
def check_can_copy(self):
@ -110,40 +137,93 @@ class FfmpegTask(object):
return False
if len(self.subtitles) > 0:
return False
if len(self.effects) > 0:
return False
if self.speed != 1:
return False
if len(self.audios) > 1:
return False
if len(self.input_file) > 1:
return False
if self.zoom_cut is not None:
return False
if self.center_cut is not None:
return False
return True
def check_audio_track(self):
if len(self.audios) > 0:
self.mute = False
...
def get_ffmpeg_args(self):
args = ['-y', '-hide_banner']
if self.task_type == 'encode':
# args += ('-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv')
input_args = []
filter_args = []
output_args = ["-shortest", "-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1"]
output_args = [*VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, *DEFAULT_ARGS]
if self.annexb:
output_args.append("-bsf:v")
output_args.append("h264_mp4toannexb")
output_args.append("-reset_timestamps")
output_args.append("1")
video_output_str = "[0:v]"
audio_output_str = "[0:v]"
video_input_count = 0
audio_input_count = 0
audio_output_str = ""
effect_index = 0
for input_file in self.input_file:
input_args.append("-i")
if type(input_file) is str:
input_args.append(input_file)
elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file())
if self.center_cut == 1:
pos_json_str = self.ext_data.get('posJson', '{}')
pos_json = json.loads(pos_json_str)
_v_w = pos_json.get('imgWidth', 1)
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_x = f'{float((_f_x2 - _f_x)/(2 * _v_w)) :.4f}*iw'
filter_args.append(f"{video_output_str}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
video_output_str = f"[v_cut{effect_index}]"
effect_index += 1
for effect in self.effects:
if effect.startswith("cameraShot:"):
param = effect.split(":", 2)[1]
if param == '':
param = "3,1,0"
_split = param.split(",")
start = 3
duration = 1
rotate_deg = 0
if len(_split) >= 3:
if _split[2] == '':
rotate_deg = 0
else:
rotate_deg = int(_split[2])
if len(_split) >= 2:
duration = float(_split[1])
if len(_split) >= 1:
start = float(_split[0])
_start_out_str = "[eff_s]"
_mid_out_str = "[eff_m]"
_end_out_str = "[eff_e]"
filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}")
filter_args.append(f"{_start_out_str}select=lt(n\,{int(start*self.frame_rate)}){_start_out_str}")
filter_args.append(f"{_end_out_str}select=gt(n\,{int(start*self.frame_rate)}){_end_out_str}")
filter_args.append(f"{_mid_out_str}select=eq(n\,{int(start*self.frame_rate)}){_mid_out_str}")
filter_args.append(f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}")
if rotate_deg != 0:
filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/360{_mid_out_str}")
# filter_args.append(f"{video_output_str}trim=start=0:end={start+duration},tpad=stop_mode=clone:stop_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"tpad=start_mode=clone:start_duration={duration},setpts=PTS-STARTPTS{_start_out_str}")
# filter_args.append(f"{_end_out_str}trim=start={start}{_end_out_str}")
video_output_str = f"[v_eff{effect_index}]"
# filter_args.append(f"{_end_out_str}{_start_out_str}overlay=eof_action=pass{video_output_str}")
filter_args.append(f"{_start_out_str}{_mid_out_str}{_end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{video_output_str}")
effect_index += 1
elif effect.startswith("zoom:"):
...
...
for lut in self.luts:
filter_args.append("[0:v]lut3d=file=" + lut + "[0:v]")
filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}")
for overlay in self.overlays:
input_index = input_args.count("-i")
input_args.append("-i")
@ -159,32 +239,35 @@ class FfmpegTask(object):
output_args.append("-r")
output_args.append(f"{self.frame_rate}")
if self.mute:
output_args.append("-an")
input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
filter_args.append(f"[{input_index}:a]acopy[a]")
audio_output_str = "[a]"
else:
input_index = 0
audio_output_str = "[0:a]"
for audio in self.audios:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0:
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
if audio_output_str == "":
filter_args.append(f"[{input_index}:a]acopy[a]")
audio_output_str = "[a]"
else:
audio_output_str = f"[{input_index}:a]"
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]"
if audio_output_str:
output_args.append("-map")
output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()]
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'concat':
# 无法通过 annexb 合并的
input_args = []
output_args = ["-shortest"]
if self.check_annexb() and len(self.audios) <= 1:
output_args = [*DEFAULT_ARGS]
filter_args = []
audio_output_str = ""
audio_track_index = 0
# output_args
if len(self.audios) > 0:
input_args.append("-an")
_tmp_file = "tmp_concat_"+str(time.time())+".txt"
with open(_tmp_file, "w", encoding="utf-8") as f:
for input_file in self.input_file:
@ -192,58 +275,37 @@ class FfmpegTask(object):
f.write("file '"+input_file+"'\n")
elif isinstance(input_file, FfmpegTask):
f.write("file '" + input_file.get_output_file() + "'\n")
input_args += ("-f", "concat", "-safe", "0", "-i", _tmp_file)
input_args += ["-f", "concat", "-safe", "0", "-i", _tmp_file]
output_args.append("-map")
output_args.append("0:v")
output_args.append("-c:v")
output_args.append("copy")
if len(self.audios) > 0:
input_args.append("-i")
input_args.append(self.audios[0])
output_args.append("-c:a")
output_args.append("copy")
output_args.append("-f")
output_args.append("mp4")
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
return args + input_args + output_args + [self.get_output_file()]
output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1")
filter_args = []
video_output_str = "[0:v]"
audio_output_str = "[0:a]"
video_input_count = 0
audio_input_count = 0
for input_file in self.input_file:
input_index = input_args.count("-i")
input_args.append("-i")
if type(input_file) is str:
input_args.append(input_file.replace("\\", "/"))
elif isinstance(input_file, FfmpegTask):
input_args.append(input_file.get_output_file().replace("\\", "/"))
if video_input_count > 0:
filter_args.append(f"{video_output_str}[{input_index}:v]concat=n=2:v=1:a=0[v]")
video_output_str = "[v]"
else:
video_output_str = f"[{input_index}:v]"
video_input_count += 1
output_args.append("-map")
output_args.append(video_output_str)
if self.mute:
output_args.append("-an")
input_index = input_args.count("-i")
input_args += MUTE_AUDIO_INPUT
audio_output_str = f"[{input_index}:a]"
audio_track_index += 1
else:
input_index = 0
audio_output_str = "[0:a]"
audio_track_index += 1
for audio in self.audios:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(audio.replace("\\", "/"))
if audio_input_count > 0:
audio_track_index += 1
filter_args.append(f"{audio_output_str}[{input_index}:a]amix[a]")
audio_output_str = "[a]"
if audio_output_str:
output_args.append("-map")
if audio_track_index <= 1:
output_args.append(audio_output_str[1:-1])
else:
audio_output_str = f"[{input_index}:a]"
audio_input_count += 1
if audio_input_count == 1:
audio_output_str = f"{input_index}"
output_args.append(f"-map")
output_args.append(audio_output_str)
return args + input_args + ["-filter_complex", ";".join(filter_args)] + output_args + [self.get_output_file()]
output_args += AUDIO_ARGS
output_args.append("-f")
output_args.append("mp4")
_filter_args = [] if len(filter_args) == 0 else ["-filter_complex", ";".join(filter_args)]
return args + input_args + _filter_args + output_args + [self.get_output_file()]
elif self.task_type == 'copy':
if len(self.input_file) == 1:
if type(self.input_file[0]) is str:

View File

@ -2,18 +2,41 @@ from time import sleep
import biz.task
import config
from telemetry import init_opentelemetry
from template import load_local_template
from util import api
load_local_template()
import os
import glob
load_local_template()
import logging
LOGGER = logging.getLogger(__name__)
init_opentelemetry()
while True:
# print(get_sys_info())
print("waiting for task...")
try:
task_list = api.sync_center()
except Exception as e:
LOGGER.error("sync_center error", exc_info=e)
sleep(5)
continue
if len(task_list) == 0:
# 删除当前文件夹下所有以.mp4、.ts结尾的文件
for file_globs in ['*.mp4', '*.ts', 'tmp_concat*.txt']:
for file_path in glob.glob(file_globs):
try:
os.remove(file_path)
print(f"Deleted file: {file_path}")
except Exception as e:
LOGGER.error(f"Error deleting file {file_path}", exc_info=e)
sleep(5)
for task in task_list:
print("start task:", task)
try:
biz.task.start_task(task)
except Exception as e:
LOGGER.error("task_start error", exc_info=e)

30
telemetry/__init__.py Normal file
View File

@ -0,0 +1,30 @@
import os
from constant import SOFTWARE_VERSION
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter
from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
def get_tracer(name):
return trace.get_tracer(name)
# 初始化 OpenTelemetry
def init_opentelemetry():
# 设置服务名、主机名
resource = Resource(attributes={
SERVICE_NAME: "RENDER_WORKER",
SERVICE_VERSION: SOFTWARE_VERSION,
DEPLOYMENT_ENVIRONMENT: "Python",
HOST_NAME: os.getenv("ACCESS_KEY"),
})
# 使用HTTP协议上报
span_processor = BatchSpanProcessor(OTLPSpanHttpExporter(
endpoint="http://tracing-analysis-dc-sh.aliyuncs.com/adapt_e7qojqi4e0@aa79b4d367fb6b7_e7qojqi4e0@53df7ad2afe8301/api/otlp/traces",
))
trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor)
trace.set_tracer_provider(trace_provider)

View File

@ -88,8 +88,8 @@ def download_template(template_id):
new_fp = os.path.join(template_info['local_path'], _fn)
oss.download_from_oss(_template['source'], new_fp)
if _fn.endswith(".mp4"):
from util.ffmpeg import to_annexb
new_fp = to_annexb(new_fp)
from util.ffmpeg import re_encode_and_annexb
new_fp = re_encode_and_annexb(new_fp)
_template['source'] = os.path.relpath(new_fp, template_info['local_path'])
if 'overlays' in _template:
for i in range(len(_template['overlays'])):

View File

@ -1,9 +1,11 @@
import json
import logging
import os
import requests
import util.system
from telemetry import get_tracer
session = requests.Session()
logger = logging.getLogger(__name__)
@ -19,16 +21,24 @@ def sync_center():
通过接口获取任务
:return: 任务列表
"""
try:
from template import TEMPLATES, download_template
tracer = get_tracer(__name__)
with tracer.start_as_current_span("sync_center"):
with get_tracer("api").start_as_current_span("sync_center.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", os.getenv('API_ENDPOINT') + "/sync")
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
'accessKey': os.getenv('ACCESS_KEY'),
'clientStatus': util.system.get_sys_info(),
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in TEMPLATES.values()]
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in
TEMPLATES.values()]
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return []
data = response.json()
@ -56,24 +66,32 @@ def get_template_info(template_id):
:type template_id: str
:return: 模板信息
"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("get_template_info"):
with get_tracer("api").start_as_current_span("get_template_info.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id))
response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
data = response.json()
logger.debug("获取模板信息结果:【%s", data)
remote_template_info = data.get('data', {})
logger.debug("获取模板信息结果:【%s", remote_template_info)
template = {
'id': template_id,
'updateTime': remote_template_info.get('updateTime', template_id),
'scenic_name': remote_template_info.get('scenicName', '景区'),
'name': remote_template_info.get('name', '模版'),
'video_size': '1920x1080',
'frame_rate': 30,
'video_size': remote_template_info.get('resolution', '1920x1080'),
'frame_rate': 25,
'overall_duration': 30,
'video_parts': [
@ -90,6 +108,7 @@ def get_template_info(template_id):
# 占位符
_template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '')
_template['mute'] = template_info.get('mute', True)
_template['crop_mode'] = template_info.get('cropEnable', None)
else:
_template['source'] = None
_overlays = template_info.get('overlays', '')
@ -101,6 +120,12 @@ def get_template_info(template_id):
_luts = template_info.get('luts', '')
if _luts:
_template['luts'] = _luts.split(",")
_only_if = template_info.get('onlyIf', '')
if _only_if:
_template['only_if'] = _only_if
_effects = template_info.get('effects', '')
if _effects:
_template['effects'] = _effects.split("|")
return _template
# outer template definition
@ -112,61 +137,104 @@ def get_template_info(template_id):
parts = _template_normalizer(children_template)
template['video_parts'].append(parts)
template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id))
with get_tracer("api").start_as_current_span("get_template_info.template") as res_span:
res_span.set_attribute("normalized.response", json.dumps(template))
return template
def report_task_success(task_info, **kwargs):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_success"):
with get_tracer("api").start_as_current_span("report_task_success.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
**kwargs
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def report_task_start(task_info):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_start"):
with get_tracer("api").start_as_current_span("report_task_start.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def report_task_failed(task_info, reason=''):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_failed"):
with tracer.start_as_current_span("report_task_failed.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
'reason': reason
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def upload_task_file(task_info, ffmpeg_task):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("upload_task_file") as span:
logger.info("开始上传文件: %s", task_info.get("id"))
span.set_attribute("file.id", task_info.get("id"))
try:
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")),
json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
response.raise_for_status()
req_span.set_attribute("http.status_code", response.status_code)
except requests.RequestException as e:
span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return False
data = response.json()
url = data.get('data', "")
logger.info("开始上传文件: %s%s", task_info.get("id"), url)
try:
with tracer.start_as_current_span("upload_task_file.start_upload_file") as upload_span:
upload_span.set_attribute("http.method", "PUT")
upload_span.set_attribute("http.url", url)
with open(ffmpeg_task.get_output_file(), 'rb') as f:
requests.put(url, data=f)
requests.put(url, data=f, headers={"Content-Type": "video/mp4"})
except requests.RequestException as e:
span.set_attribute("api.error", str(e))
logger.error("上传失败!", e)
return False
finally:

View File

@ -1,40 +1,86 @@
import json
import logging
import os
import subprocess
from datetime import datetime
from typing import Optional, IO
from entity.ffmpeg import FfmpegTask
from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, VIDEO_ARGS, AUDIO_ARGS
from telemetry import get_tracer
logger = logging.getLogger(__name__)
def to_annexb(file):
with get_tracer("ffmpeg").start_as_current_span("to_annexb") as span:
span.set_attribute("file.path", file)
if not os.path.exists(file):
return file
logger.info("ToAnnexb: %s", file)
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb",
"-f", "mpegts", file+".ts"])
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
if ffmpeg_process.returncode == 0:
span.set_attribute("file.size", os.path.getsize(file+".ts"))
os.remove(file)
return file+".ts"
else:
return file
def re_encode_and_annexb(file):
with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span:
span.set_attribute("file.path", file)
if not os.path.exists(file):
return file
logger.info("ReEncodeAndAnnexb: %s", file)
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, *VIDEO_ARGS, *AUDIO_ARGS, *ENCODER_ARGS, "-bsf:v", "h264_mp4toannexb",
"-f", "mpegts", file +".ts"])
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
if ffmpeg_process.returncode == 0:
span.set_attribute("file.size", os.path.getsize(file+".ts"))
os.remove(file)
return file+".ts"
else:
return file
def start_render(ffmpeg_task: FfmpegTask):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_render") as span:
span.set_attribute("ffmpeg.task", str(ffmpeg_task))
logger.info(ffmpeg_task)
if not ffmpeg_task.need_run():
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
return True
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
logger.info(ffmpeg_args)
if len(ffmpeg_args) == 0:
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
logger.info("FINISH TASK, OUTPUT IS %s", handle_ffmpeg_output(ffmpeg_process.stdout))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info(ffmpeg_process.args)
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
span.set_attribute("ffmpeg.out", ffmpeg_final_out)
logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
code = ffmpeg_process.returncode
return code == 0
span.set_attribute("ffmpeg.code", code)
if code != 0:
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
return False
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
try:
file_size = os.path.getsize(ffmpeg_task.output_file)
span.set_attribute("file.size", file_size)
if file_size < 4096:
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
return False
except OSError:
span.set_attribute("file.size", 0)
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
return False
return True
def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
out_time = "0:0:0.0"
@ -55,24 +101,30 @@ def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
print("[ ]Speed:", out_time, "@", speed)
return out_time+"@"+speed
def duration_str_to_float(duration_str: str) -> float:
_duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1)
return _duration.total_seconds()
def probe_video_info(video_file):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("probe_video_info") as span:
span.set_attribute("video.file", video_file)
# 获取宽度和高度
result = subprocess.run(
["ffprobe.exe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of',
["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of',
'csv=s=x:p=0', video_file],
stderr=subprocess.STDOUT,
**subprocess_args(True)
)
span.set_attribute("ffprobe.args", json.dumps(result.args))
span.set_attribute("ffprobe.code", result.returncode)
if result.returncode != 0:
return 0, 0, 0
all_result = result.stdout.decode('utf-8').strip()
span.set_attribute("ffprobe.out", all_result)
wh, duration = all_result.split('\n')
width, height = wh.strip().split('x')
return int(width), int(height), float(duration)

View File

@ -3,6 +3,8 @@ import os
import requests
from telemetry import get_tracer
logger = logging.getLogger(__name__)
@ -13,14 +15,34 @@ def upload_to_oss(url, file_path):
:param str file_path: 文件路径
:return bool: 是否成功
"""
with open(file_path, 'rb') as f:
tracer = get_tracer(__name__)
with tracer.start_as_current_span("upload_to_oss") as span:
span.set_attribute("file.url", url)
span.set_attribute("file.path", file_path)
max_retries = 5
retries = 0
while retries < max_retries:
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
try:
response = requests.put(url, data=f)
return response.status_code == 200
req_span.set_attribute("http.method", "PUT")
req_span.set_attribute("http.url", url)
with open(file_path, 'rb') as f:
response = requests.put(url, data=f, timeout=60) # 设置超时时间为1分钟
req_span.set_attribute("http.status_code", response.status_code)
if response.status_code == 200:
return True
except requests.exceptions.Timeout:
req_span.set_attribute("http.error", "Timeout")
retries += 1
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
except Exception as e:
print(e)
req_span.set_attribute("http.error", str(e))
retries += 1
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
return False
def download_from_oss(url, file_path):
"""
使用签名URL下载文件到OSS
@ -28,16 +50,35 @@ def download_from_oss(url, file_path):
:param Union[LiteralString, str, bytes] file_path: 文件路径
:return bool: 是否成功
"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("download_from_oss") as span:
span.set_attribute("file.url", url)
span.set_attribute("file.path", file_path)
logging.info("download_from_oss: %s", url)
file_dir, file_name = os.path.split(file_path)
if file_dir:
if not os.path.exists(file_dir):
os.makedirs(file_dir)
max_retries = 5
retries = 0
while retries < max_retries:
with tracer.start_as_current_span("download_from_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
try:
response = requests.get(url)
req_span.set_attribute("http.method", "GET")
req_span.set_attribute("http.url", url)
response = requests.get(url, timeout=15) # 设置超时时间
req_span.set_attribute("http.status_code", response.status_code)
with open(file_path, 'wb') as f:
f.write(response.content)
span.set_attribute("file.size", os.path.getsize(file_path))
return True
except requests.exceptions.Timeout:
span.set_attribute("http.error", "Timeout")
retries += 1
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
except Exception as e:
print(e)
span.set_attribute("http.error", str(e))
retries += 1
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
return False