This commit is contained in:
Jerry Yan 2025-03-04 11:42:31 +08:00
parent 1f9632761f
commit 9d178a3d34
7 changed files with 443 additions and 295 deletions

View File

@ -6,11 +6,14 @@ from entity.ffmpeg import FfmpegTask
import logging
from util import ffmpeg, oss
from telemetry import get_tracer
logger = logging.getLogger('biz/ffmpeg')
def parse_ffmpeg_task(task_info, template_info):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("parse_ffmpeg_task"):
tasks = []
# 中间片段
task_params_str = task_info.get("taskParams", "{}")
@ -87,7 +90,10 @@ def check_placeholder_exist(placeholder_id, task_params):
return True
return False
def start_ffmpeg_task(ffmpeg_task):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_ffmpeg_task"):
for task in ffmpeg_task.analyze_input_render_tasks():
result = start_ffmpeg_task(task)
if not result:

View File

@ -1,9 +1,12 @@
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
from telemetry import get_tracer
from template import get_template_def
from util import api
def start_task(task_info):
from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info
tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_task"):
task_info = api.normalize_task(task_info)
template_info = get_template_def(task_info.get("templateId"))
api.report_task_start(task_info)

View File

@ -2,6 +2,7 @@ from time import sleep
import biz.task
import config
from telemetry import init_opentelemetry
from template import load_local_template
from util import api
@ -12,6 +13,7 @@ load_local_template()
import logging
LOGGER = logging.getLogger(__name__)
init_opentelemetry()
while True:
# print(get_sys_info())

30
telemetry/__init__.py Normal file
View File

@ -0,0 +1,30 @@
import os
from constant import SOFTWARE_VERSION
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter
from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
def get_tracer(name):
return trace.get_tracer(name)
# 初始化 OpenTelemetry
def init_opentelemetry():
# 设置服务名、主机名
resource = Resource(attributes={
SERVICE_NAME: "RENDER_WORKER",
SERVICE_VERSION: SOFTWARE_VERSION,
DEPLOYMENT_ENVIRONMENT: "Python",
HOST_NAME: os.getenv("ACCESS_KEY"),
})
# 使用HTTP协议上报
span_processor = BatchSpanProcessor(OTLPSpanHttpExporter(
endpoint="http://tracing-analysis-dc-sh.aliyuncs.com/adapt_e7qojqi4e0@aa79b4d367fb6b7_e7qojqi4e0@53df7ad2afe8301/api/otlp/traces",
))
trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor)
trace.set_tracer_provider(trace_provider)

View File

@ -1,9 +1,11 @@
import json
import logging
import os
import requests
import util.system
from telemetry import get_tracer
session = requests.Session()
logger = logging.getLogger(__name__)
@ -19,16 +21,24 @@ def sync_center():
通过接口获取任务
:return: 任务列表
"""
try:
from template import TEMPLATES, download_template
tracer = get_tracer(__name__)
with tracer.start_as_current_span("sync_center"):
with get_tracer("api").start_as_current_span("sync_center.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", os.getenv('API_ENDPOINT') + "/sync")
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
'accessKey': os.getenv('ACCESS_KEY'),
'clientStatus': util.system.get_sys_info(),
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in TEMPLATES.values()]
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in
TEMPLATES.values()]
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return []
data = response.json()
@ -56,17 +66,25 @@ def get_template_info(template_id):
:type template_id: str
:return: 模板信息
"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("get_template_info"):
with get_tracer("api").start_as_current_span("get_template_info.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url", '{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id))
response = session.post('{0}/template/{1}'.format(os.getenv('API_ENDPOINT'), template_id), json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
data = response.json()
logger.debug("获取模板信息结果:【%s", data)
remote_template_info = data.get('data', {})
logger.debug("获取模板信息结果:【%s", remote_template_info)
template = {
'id': template_id,
'updateTime': remote_template_info.get('updateTime', template_id),
@ -119,61 +137,104 @@ def get_template_info(template_id):
parts = _template_normalizer(children_template)
template['video_parts'].append(parts)
template['local_path'] = os.path.join(os.getenv('TEMPLATE_DIR'), str(template_id))
with get_tracer("api").start_as_current_span("get_template_info.template") as res_span:
res_span.set_attribute("normalized.response", json.dumps(template))
return template
def report_task_success(task_info, **kwargs):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_success"):
with get_tracer("api").start_as_current_span("report_task_success.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/success'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
**kwargs
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def report_task_start(task_info):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_start"):
with get_tracer("api").start_as_current_span("report_task_start.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/start'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def report_task_failed(task_info, reason=''):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("report_task_failed"):
with tracer.start_as_current_span("report_task_failed.request") as req_span:
try:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/fail'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
'accessKey': os.getenv('ACCESS_KEY'),
'reason': reason
}, timeout=10)
req_span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
req_span.set_attribute("api.response", response.text)
except requests.RequestException as e:
req_span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return None
def upload_task_file(task_info, ffmpeg_task):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("upload_task_file") as span:
logger.info("开始上传文件: %s", task_info.get("id"))
span.set_attribute("file.id", task_info.get("id"))
try:
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")), json={
with tracer.start_as_current_span("upload_task_file.request_upload_url") as req_span:
req_span.set_attribute("http.method", "POST")
req_span.set_attribute("http.url",
'{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")))
response = session.post('{0}/{1}/uploadUrl'.format(os.getenv('API_ENDPOINT'), task_info.get("id")),
json={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
response.raise_for_status()
req_span.set_attribute("http.status_code", response.status_code)
except requests.RequestException as e:
span.set_attribute("api.error", str(e))
logger.error("请求失败!", e)
return False
data = response.json()
url = data.get('data', "")
logger.info("开始上传文件: %s%s", task_info.get("id"), url)
try:
with tracer.start_as_current_span("upload_task_file.start_upload_file") as upload_span:
upload_span.set_attribute("http.method", "PUT")
upload_span.set_attribute("http.url", url)
with open(ffmpeg_task.get_output_file(), 'rb') as f:
requests.put(url, data=f, headers={"Content-Type": "video/mp4"})
except requests.RequestException as e:
span.set_attribute("api.error", str(e))
logger.error("上传失败!", e)
return False
finally:

View File

@ -1,3 +1,4 @@
import json
import logging
import os
import subprocess
@ -5,36 +6,50 @@ from datetime import datetime
from typing import Optional, IO
from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, PROFILE_LEVEL_ARGS
from telemetry import get_tracer
logger = logging.getLogger(__name__)
def to_annexb(file):
with get_tracer("ffmpeg").start_as_current_span("to_annexb") as span:
span.set_attribute("file.path", file)
if not os.path.exists(file):
return file
logger.info("ToAnnexb: %s", file)
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, "-c", "copy", "-bsf:v", "h264_mp4toannexb",
"-f", "mpegts", file+".ts"])
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info("ToAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
if ffmpeg_process.returncode == 0:
span.set_attribute("file.size", os.path.getsize(file+".ts"))
os.remove(file)
return file+".ts"
else:
return file
def re_encode_and_annexb(file):
with get_tracer("ffmpeg").start_as_current_span("re_encode_and_annexb") as span:
span.set_attribute("file.path", file)
if not os.path.exists(file):
return file
logger.info("ReEncodeAndAnnexb: %s", file)
ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, *PROFILE_LEVEL_ARGS, *ENCODER_ARGS, "-bsf:v", "h264_mp4toannexb",
"-f", "mpegts", file +".ts"])
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode)
span.set_attribute("ffmpeg.code", ffmpeg_process.returncode)
if ffmpeg_process.returncode == 0:
span.set_attribute("file.size", os.path.getsize(file+".ts"))
os.remove(file)
return file+".ts"
else:
return file
def start_render(ffmpeg_task: FfmpegTask):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_render") as span:
span.set_attribute("ffmpeg.task", str(ffmpeg_task))
logger.info(ffmpeg_task)
if not ffmpeg_task.need_run():
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
@ -45,17 +60,24 @@ def start_render(ffmpeg_task: FfmpegTask):
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
logger.info("FINISH TASK, OUTPUT IS %s", handle_ffmpeg_output(ffmpeg_process.stdout))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
span.set_attribute("ffmpeg.out", ffmpeg_final_out)
logger.info("FINISH TASK, OUTPUT IS %s", ffmpeg_final_out)
code = ffmpeg_process.returncode
span.set_attribute("ffmpeg.code", code)
if code != 0:
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
return False
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
try:
out_file_stat = os.stat(ffmpeg_task.output_file)
if out_file_stat.st_size < 4096:
file_size = os.path.getsize(ffmpeg_task.output_file)
span.set_attribute("file.size", file_size)
if file_size < 4096:
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
return False
except OSError:
span.set_attribute("file.size", 0)
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
return False
return True
@ -79,13 +101,15 @@ def handle_ffmpeg_output(stdout: Optional[bytes]) -> str:
print("[ ]Speed:", out_time, "@", speed)
return out_time+"@"+speed
def duration_str_to_float(duration_str: str) -> float:
_duration = datetime.strptime(duration_str, "%H:%M:%S.%f") - datetime(1900, 1, 1)
return _duration.total_seconds()
def probe_video_info(video_file):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("probe_video_info") as span:
span.set_attribute("video.file", video_file)
# 获取宽度和高度
result = subprocess.run(
["ffprobe", '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height:format=duration', '-of',
@ -93,12 +117,14 @@ def probe_video_info(video_file):
stderr=subprocess.STDOUT,
**subprocess_args(True)
)
span.set_attribute("ffprobe.args", json.dumps(result.args))
span.set_attribute("ffprobe.code", result.returncode)
if result.returncode != 0:
return 0, 0, 0
all_result = result.stdout.decode('utf-8').strip()
span.set_attribute("ffprobe.out", all_result)
wh, duration = all_result.split('\n')
width, height = wh.strip().split('x')
return int(width), int(height), float(duration)

View File

@ -3,6 +3,8 @@ import os
import requests
from telemetry import get_tracer
logger = logging.getLogger(__name__)
@ -13,20 +15,29 @@ def upload_to_oss(url, file_path):
:param str file_path: 文件路径
:return bool: 是否成功
"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("upload_to_oss") as span:
span.set_attribute("file.url", url)
span.set_attribute("file.path", file_path)
max_retries = 5
retries = 0
while retries < max_retries:
try:
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
req_span.set_attribute("http.method", "PUT")
req_span.set_attribute("http.url", url)
with open(file_path, 'rb') as f:
response = requests.put(url, data=f, timeout=60) # 设置超时时间为1分钟
req_span.set_attribute("http.status_code", response.status_code)
if response.status_code == 200:
return True
except requests.exceptions.Timeout:
retries += 1
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
except Exception as e:
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
retries += 1
span.set_attribute("oss.error", str(e))
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
return False
@ -37,6 +48,10 @@ def download_from_oss(url, file_path):
:param Union[LiteralString, str, bytes] file_path: 文件路径
:return bool: 是否成功
"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("download_from_oss") as span:
span.set_attribute("file.url", url)
span.set_attribute("file.path", file_path)
logging.info("download_from_oss: %s", url)
file_dir, file_name = os.path.split(file_path)
if file_dir:
@ -46,7 +61,11 @@ def download_from_oss(url, file_path):
retries = 0
while retries < max_retries:
try:
with tracer.start_as_current_span("download_from_oss.request") as req_span:
req_span.set_attribute("http.method", "GET")
req_span.set_attribute("http.url", url)
response = requests.get(url, timeout=15) # 设置超时时间
req_span.set_attribute("http.status_code", response.status_code)
with open(file_path, 'wb') as f:
f.write(response.content)
return True
@ -54,6 +73,7 @@ def download_from_oss(url, file_path):
retries += 1
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
except Exception as e:
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
retries += 1
span.set_attribute("oss.error", str(e))
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
return False