Compare commits

...

21 Commits

Author SHA1 Message Date
13a10b9812 修复 2025-06-04 15:38:14 +08:00
3976b72607 优化埋点 2025-05-29 10:05:35 +08:00
04ce423811 优化裁切参数获取,避免同机位多素材出问题 2025-05-29 10:01:21 +08:00
399c3d2dc6 修复裁切 2025-05-27 11:09:10 +08:00
ef3edafcd6 支持rclone多线程上传 2025-05-26 16:17:38 +08:00
6d631d873e onlyif判断 2025-05-05 19:47:54 +08:00
02dd2b72a0 根据模板定义的分辨率进行操作 2025-04-30 18:07:33 +08:00
d8bc3c8595 健康检查时同步信息 2025-04-28 17:59:44 +08:00
5d58198b7e 接口支持查询模板信息,避免使用旧模板 2025-04-28 17:57:34 +08:00
789513a0be 避免勿删模板 2025-04-28 16:32:34 +08:00
b3911839f3 app不使用批量上报 2025-04-28 16:28:27 +08:00
1c0e4ce411 添加dockerfile 2025-04-28 15:45:22 +08:00
1603be9157 尝试传入resolution,不使用scale自适应模板 2025-04-28 15:02:50 +08:00
f139fbccd7 下载模板时trace归组 2025-04-27 14:24:12 +08:00
2fb0f93886 收集ffmpeg异常,流式上传 2025-04-27 13:47:52 +08:00
9537f995a1 支持redirection 2025-04-20 14:32:06 +08:00
ec03f8180e 修改接口 2025-04-20 12:32:33 +08:00
972b6a4e4d 修改接口 2025-04-20 12:09:17 +08:00
3d810e5c5b 添加接口,添加方便测试的方法 2025-04-20 11:50:32 +08:00
a9043361ec 支持通过env获取encoder args 2025-04-20 11:02:23 +08:00
740a3c7a63 完善requirements.txt 2025-04-20 10:54:49 +08:00
13 changed files with 216 additions and 85 deletions

7
.env
View File

@ -1,4 +1,9 @@
TEMPLATE_DIR=template/
API_ENDPOINT=http://127.0.0.1:8030/task/v1
API_ENDPOINT=https://zhentuai.com/task/v1
ACCESS_KEY=TEST_ACCESS_KEY
TEMP_DIR=tmp/
#REDIRECT_TO_URL=https://worker-renderworker-re-kekuflqjxx.cn-shanghai.fcapp.run/
# QSV
ENCODER_ARGS="-c:v h264_qsv -global_quality 28 -look_ahead 1"
# NVENC
#ENCODER_ARGS="-c:v h264_nvenc -cq:v 24 -preset:v p7 -tune:v hq -profile:v high"

21
Dockerfile Normal file
View File

@ -0,0 +1,21 @@
FROM linuxserver/ffmpeg:7.1.1
LABEL authors="Jerry Yan"
RUN sed -i 's@//.*archive.ubuntu.com@//mirrors.ustc.edu.cn@g' /etc/apt/sources.list && \
sed -i 's/security.ubuntu.com/mirrors.ustc.edu.cn/g' /etc/apt/sources.list
RUN apt-get update && \
apt-get install -y --no-install-recommends \
python3-pip \
python3-dev \
python3-setuptools \
python3-wheel \
python3-venv
RUN apt-get clean && \
rm -rf /var/lib/apt/lists/*
COPY . /app/
RUN python3 -m venv /app/venv
RUN /app/venv/bin/python -m pip config set global.index-url https://mirrors.ustc.edu.cn/pypi/simple
RUN /app/venv/bin/python -m pip install -r /app/requirements.txt
WORKDIR /app
ENTRYPOINT ["/app/venv/bin/python", "app.py"]

40
app.py Normal file
View File

@ -0,0 +1,40 @@
import time
import flask
import config
import biz.task
import template
from telemetry import init_opentelemetry
from template import load_local_template
from util import api
load_local_template()
import logging
LOGGER = logging.getLogger(__name__)
init_opentelemetry(batch=False)
app = flask.Flask(__name__)
@app.get('/health/check')
def health_check():
return api.sync_center()
@app.post('/')
def do_nothing():
return "NOOP"
@app.post('/<task_id>')
def do_task(task_id):
task_info = api.get_task_info(task_id)
local_template_info = template.get_template_def(task_info.get("templateId"))
template_info = api.get_template_info(task_info.get("templateId"))
if local_template_info:
if local_template_info.get("updateTime") != template_info.get("updateTime"):
template.download_template(task_info.get("templateId"))
biz.task.start_task(task_info)
return "OK"
if __name__ == '__main__':
app.run(host="0.0.0.0", port=9998)

View File

@ -22,19 +22,21 @@ def parse_ffmpeg_task(task_info, template_info):
task_params_str = task_info.get("taskParams", "{}")
span.set_attribute("task_params", task_params_str)
task_params = json.loads(task_params_str)
task_params_orig = json.loads(task_params_str)
for part in template_info.get("video_parts"):
source = parse_video(part.get('source'), task_params, template_info)
source, ext_data = parse_video(part.get('source'), task_params, template_info)
if not source:
logger.warning("no video found for part: " + str(part))
continue
only_if = part.get('only_if', '')
if only_if:
if not check_placeholder_exist(only_if, task_params):
if not check_placeholder_exist(only_if, task_params_orig):
logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part)
continue
sub_ffmpeg_task = FfmpegTask(source)
sub_ffmpeg_task.resolution = template_info.get("video_size", "")
sub_ffmpeg_task.annexb = True
sub_ffmpeg_task.ext_data = find_placeholder_params(part.get('source'), task_params) or {}
sub_ffmpeg_task.ext_data = ext_data or {}
sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
sub_ffmpeg_task.center_cut = part.get("crop_mode", None)
for effect in part.get('effects', []):
@ -48,12 +50,14 @@ def parse_ffmpeg_task(task_info, template_info):
tasks.append(sub_ffmpeg_task)
output_file = "out_" + str(time.time()) + ".mp4"
task = FfmpegTask(tasks, output_file=output_file)
task.resolution = template_info.get("video_size", "")
overall = template_info.get("overall_template")
task.center_cut = template_info.get("crop_mode", None)
task.frame_rate = template_info.get("frame_rate", 25)
if overall.get('source', ''):
source = parse_video(overall.get('source'), task_params, template_info)
source, ext_data = parse_video(overall.get('source'), task_params, template_info)
task.add_inputs(source)
task.ext_data = ext_data or {}
for effect in overall.get('effects', []):
task.add_effect(effect)
for lut in overall.get('filters', []):
@ -65,37 +69,24 @@ def parse_ffmpeg_task(task_info, template_info):
return task
def find_placeholder_params(source, task_params):
if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
logger.debug("no video found for placeholder: " + placeholder_id)
return {}
else:
return new_sources[0]
return {}
def parse_video(source, task_params, template_info):
print(source)
if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, [])
_pick_source = {}
if type(new_sources) is list:
if len(new_sources) == 0:
logger.debug("no video found for placeholder: " + placeholder_id)
return None
return None, _pick_source
else:
_pick_source = new_sources.pop(0)
new_sources = _pick_source.get("url")
if new_sources.startswith("http"):
_, source_name = os.path.split(new_sources)
oss.download_from_oss(new_sources, source_name)
return source_name
return new_sources
return os.path.join(template_info.get("local_path"), source)
return source_name, _pick_source
return new_sources, _pick_source
return os.path.join(template_info.get("local_path"), source), None
def check_placeholder_exist(placeholder_id, task_params):
@ -118,6 +109,11 @@ def start_ffmpeg_task(ffmpeg_task):
if not result:
return False
ffmpeg_task.correct_task_type()
span.set_attribute("task.type", ffmpeg_task.task_type)
span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut))
span.set_attribute("task.frame_rate", ffmpeg_task.frame_rate)
span.set_attribute("task.resolution", str(ffmpeg_task.resolution))
span.set_attribute("task.ext_data", json.dumps(ffmpeg_task.ext_data))
result = ffmpeg.start_render(ffmpeg_task)
if not result:
span.set_status(Status(StatusCode.ERROR))
@ -130,7 +126,7 @@ def clear_task_tmp_file(ffmpeg_task):
for task in ffmpeg_task.analyze_input_render_tasks():
clear_task_tmp_file(task)
try:
if "template" not in ffmpeg_task.get_output_file():
if os.getenv("TEMPLATE_DIR") not in ffmpeg_task.get_output_file():
os.remove(ffmpeg_task.get_output_file())
logger.info("delete tmp file: " + ffmpeg_task.get_output_file())
else:

View File

@ -23,6 +23,9 @@ def start_task(task_info):
span.set_status(Status(StatusCode.ERROR))
return api.report_task_failed(task_info)
width, height, duration = probe_video_info(ffmpeg_task)
span.set_attribute("probe.width", width)
span.set_attribute("probe.height", height)
span.set_attribute("probe.duration", duration)
# 音频淡出
new_fn = fade_out_audio(ffmpeg_task.get_output_file(), duration)
ffmpeg_task.set_output_file(new_fn)
@ -38,3 +41,4 @@ def start_task(task_info):
"duration": duration
})
span.set_status(Status(StatusCode.OK))
return None

View File

@ -1,10 +1,11 @@
import json
import os
import time
import uuid
from typing import Any
DEFAULT_ARGS = ("-shortest",)
ENCODER_ARGS = ("-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1", )
ENCODER_ARGS = ("-c:v", "h264", ) if not os.getenv("ENCODER_ARGS", False) else os.getenv("ENCODER_ARGS", "").split(" ")
VIDEO_ARGS = ("-profile:v", "high", "-level:v", "4", )
AUDIO_ARGS = ("-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2", )
MUTE_AUDIO_INPUT = ("-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000", )
@ -32,6 +33,7 @@ class FfmpegTask(object):
self.mute = True
self.speed = 1
self.frame_rate = 25
self.resolution = None
self.subtitles = []
self.luts = []
self.audios = []
@ -207,9 +209,9 @@ class FfmpegTask(object):
_mid_out_str = "[eff_m]"
_end_out_str = "[eff_e]"
filter_args.append(f"{video_output_str}split=3{_start_out_str}{_mid_out_str}{_end_out_str}")
filter_args.append(f"{_start_out_str}select=lt(n\,{int(start*self.frame_rate)}){_start_out_str}")
filter_args.append(f"{_end_out_str}select=gt(n\,{int(start*self.frame_rate)}){_end_out_str}")
filter_args.append(f"{_mid_out_str}select=eq(n\,{int(start*self.frame_rate)}){_mid_out_str}")
filter_args.append(f"{_start_out_str}select=lt(n\\,{int(start*self.frame_rate)}){_start_out_str}")
filter_args.append(f"{_end_out_str}select=gt(n\\,{int(start*self.frame_rate)}){_end_out_str}")
filter_args.append(f"{_mid_out_str}select=eq(n\\,{int(start*self.frame_rate)}){_mid_out_str}")
filter_args.append(f"{_mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{_mid_out_str}")
if rotate_deg != 0:
filter_args.append(f"{_mid_out_str}rotate=PI*{rotate_deg}/360{_mid_out_str}")
@ -234,10 +236,16 @@ class FfmpegTask(object):
...
for lut in self.luts:
filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}")
if self.resolution:
filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]")
video_output_str = "[v]"
for overlay in self.overlays:
input_index = input_args.count("-i")
input_args.append("-i")
input_args.append(overlay)
if os.getenv("OLD_FFMPEG"):
filter_args.append(f"{video_output_str}[{input_index}:v]scale2ref=iw:ih[v]")
else:
filter_args.append(f"{video_output_str}[{input_index}:v]scale=rw:rh[v]")
filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
video_output_str = "[v]"
@ -338,6 +346,7 @@ class FfmpegTask(object):
if self.input_file[0] == self.get_output_file():
return []
return args + ["-i", self.input_file[0]] + ["-c", "copy", self.get_output_file()]
return []
def set_output_file(self, file=None):
if file is None:

View File

@ -1,7 +1,7 @@
from time import sleep
import biz.task
import config
import biz.task
from telemetry import init_opentelemetry
from template import load_local_template
from util import api

View File

@ -1,3 +1,7 @@
requests~=2.32.3
psutil~=6.1.0
python-dotenv~=1.0.1
opentelemetry-api~=1.30.0
opentelemetry-sdk~=1.30.0
opentelemetry-exporter-otlp~=1.30.0
flask~=3.1.0

View File

@ -5,14 +5,14 @@ from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanHttpExporter
from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, HOST_NAME, Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
def get_tracer(name):
return trace.get_tracer(name)
# 初始化 OpenTelemetry
def init_opentelemetry():
def init_opentelemetry(batch=True):
# 设置服务名、主机名
resource = Resource(attributes={
SERVICE_NAME: "RENDER_WORKER",
@ -22,9 +22,14 @@ def init_opentelemetry():
})
# 使用HTTP协议上报
if batch:
span_processor = BatchSpanProcessor(OTLPSpanHttpExporter(
endpoint="https://oltp.jerryyan.top/v1/traces",
))
else:
span_processor = SimpleSpanProcessor(OTLPSpanHttpExporter(
endpoint="https://oltp.jerryyan.top/v1/traces",
))
trace_provider = TracerProvider(resource=resource, active_span_processor=span_processor)
trace.set_tracer_provider(trace_provider)

View File

@ -2,6 +2,7 @@ import json
import os
import logging
from telemetry import get_tracer
from util import api, oss
TEMPLATES = {}
@ -75,6 +76,8 @@ def get_template_def(template_id):
return TEMPLATES.get(template_id)
def download_template(template_id):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("download_template"):
template_info = api.get_template_info(template_id)
if not os.path.isdir(template_info['local_path']):
os.makedirs(template_info['local_path'])

View File

@ -1,6 +1,7 @@
import json
import logging
import os
import threading
import requests
from opentelemetry.trace import Status, StatusCode
@ -44,6 +45,13 @@ def sync_center():
tasks = []
templates = []
logger.warning("获取任务失败")
if os.getenv("REDIRECT_TO_URL", False) != False:
for task in tasks:
_sess = requests.Session()
logger.info("重定向任务【%s】至配置的地址:%s", task.get("id"), os.getenv("REDIRECT_TO_URL"))
url = f"{os.getenv('REDIRECT_TO_URL')}{task.get('id')}"
threading.Thread(target=requests.post, args=(url,)).start()
return []
for template in templates:
template_id = template.get('id', '')
if template_id:
@ -231,3 +239,18 @@ def upload_task_file(task_info, ffmpeg_task):
url = data.get('data', "")
logger.info("开始上传文件: %s%s", task_info.get("id"), url)
return oss.upload_to_oss(url, ffmpeg_task.get_output_file())
def get_task_info(id):
try:
response = session.get(os.getenv('API_ENDPOINT') + "/" + id + "/info", params={
'accessKey': os.getenv('ACCESS_KEY'),
}, timeout=10)
response.raise_for_status()
except requests.RequestException as e:
logger.error("请求失败!", e)
return []
data = response.json()
logger.debug("获取任务结果:【%s", data)
if data.get('code', 0) == 200:
return data.get('data', {})

View File

@ -53,7 +53,7 @@ def start_render(ffmpeg_task: FfmpegTask):
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
span.set_status(Status(StatusCode.OK))
return True
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True))
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], stderr=subprocess.PIPE, **subprocess_args(True))
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
logger.info(" ".join(ffmpeg_process.args))
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
@ -62,7 +62,8 @@ def start_render(ffmpeg_task: FfmpegTask):
code = ffmpeg_process.returncode
span.set_attribute("ffmpeg.code", code)
if code != 0:
span.set_status(Status(StatusCode.ERROR))
span.set_attribute("ffmpeg.err", str(ffmpeg_process.stderr))
span.set_status(Status(StatusCode.ERROR, "FFMPEG异常退出"))
logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr)
return False
span.set_attribute("ffmpeg.out_file", ffmpeg_task.output_file)
@ -70,12 +71,13 @@ def start_render(ffmpeg_task: FfmpegTask):
file_size = os.path.getsize(ffmpeg_task.output_file)
span.set_attribute("file.size", file_size)
if file_size < 4096:
span.set_status(Status(StatusCode.ERROR))
span.set_status(Status(StatusCode.ERROR, "输出文件过小"))
logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL")
return False
except OSError:
except OSError as e:
span.set_attribute("file.size", 0)
span.set_status(Status(StatusCode.ERROR))
span.set_attribute("file.error", e.strerror)
span.set_status(Status(StatusCode.ERROR, "输出文件不存在"))
logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND")
return False
span.set_status(Status(StatusCode.OK))

View File

@ -23,6 +23,25 @@ def upload_to_oss(url, file_path):
span.set_attribute("file.size", os.path.getsize(file_path))
max_retries = 5
retries = 0
if os.getenv("UPLOAD_METHOD") == "rclone":
with tracer.start_as_current_span("rclone_to_oss") as r_span:
replace_map = os.getenv("RCLONE_REPLACE_MAP")
r_span.set_attribute("rclone.replace_map", replace_map)
if replace_map != "":
replace_list = [i.split("|", 1) for i in replace_map.split(",")]
new_url = url
for (_src, _dst) in replace_list:
new_url = new_url.replace(_src, _dst)
new_url = new_url.split("?", 1)[0]
r_span.set_attribute("rclone.target_dir", new_url)
if new_url != url:
result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 32M --multi-thread-streams 8 {file_path} {new_url}")
r_span.set_attribute("rclone.result", result)
if result == 0:
span.set_status(Status(StatusCode.OK))
return True
else:
span.set_status(Status(StatusCode.ERROR))
while retries < max_retries:
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
@ -30,7 +49,7 @@ def upload_to_oss(url, file_path):
req_span.set_attribute("http.method", "PUT")
req_span.set_attribute("http.url", url)
with open(file_path, 'rb') as f:
response = requests.put(url, data=f, timeout=60, headers={"Content-Type": "video/mp4"})
response = requests.put(url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()