9 Commits

Author SHA1 Message Date
c61f6d7521 refactor(ffmpeg): 优化FFmpeg任务处理逻辑
- 添加as_completed导入以支持并发任务执行
- 实现线程池并发处理子任务,提高执行效率
- 添加任务数量监控指标
- 实现快速失败机制,及时取消剩余任务
- 增强异常处理和错误日志记录
- 添加最大工作线程数参数配置
2026-01-01 00:09:31 +08:00
4ef57a208e feat(oss): 添加 HTTP_REPLACE_MAP 环境变量支持
- 实现 _apply_http_replace_map 函数用于 URL 替换
- 在上传文件时应用 HTTP_REPLACE_MAP 环境变量替换 URL
- 添加 http_url 属性到 trace span 中
- 支持通过环境变量配置 URL 替换规则
2025-12-31 17:28:38 +08:00
a415d8571d chore(constant): 更新软件版本号至0.0.8
- 将 SOFTWARE_VERSION 从 0.0.6 更新到 0.0.8

feat(util/oss): 支持自定义rclone配置文件路径

- 新增读取环境变量 RCLONE_CONFIG_FILE 来指定配置文件
- 当 RCLONE_CONFIG_FILE 为空时默认使用 rclone.conf
- 在调用 rclone 命令时加入 --config 参数以应用指定配置文件
2025-12-12 16:00:34 +08:00
4af52d5a54 fix(ffmpeg): 修复视频裁剪时间戳问题
- 在 trim 过滤器后添加 setpts 过滤器以重置时间戳
- 修复 skip、tail 和 show 效果的时间戳计算问题
- 确保裁剪后的视频片段时间戳从零开始
- 避免因时间戳不连续导致的播放问题
2025-12-09 18:07:48 +08:00
d7704005b6 feat(entity/ffmpeg.py): 添加grid4效果支持在ffmpeg.py中增加了对grid4效果的支持。该功能允许用户通过指定参数来创建一个四宫格视频布局,每个格子显示不同的视频片段,并且可以设置延迟时间以实现更丰富的视觉效果。具体改动包括:
- 解析`grid4`效果的参数,如果未提供则默认为1。
- 根据提供的或默认的分辨率分割视频流为四个部分。
-为每个分割后的视频流应用缩放和时间延迟处理。
- 创建黑色背景并使用overlay滤镜将处理后的视频流放置于正确的位置上,形成最终的四宫格布局。
2025-09-18 17:01:03 +08:00
f85ccea933 feat(constant): 更新软件版本号至 0.0.6- 在 constant/__init__.py 文件中将 SOFTWARE_VERSION 从 '0.0.5' 修改为 '0.0.6'
- 在 entity/ffmpeg.py 文件中添加了新的视频效果处理逻辑,支持显示特定时长的视频片段
2025-09-18 09:42:57 +08:00
0c7181911e refactor(entity): 优化视频变速和缩放效果处理
-将视频变速实现从 minterpolate 改为使用 setpts,避免 PTS 冲突问题
-简化缩放效果处理逻辑
2025-09-12 18:01:41 +08:00
cf43f6379e feat(ffmpeg): 使用 minterpolate 替代 fps 调整视频速度
- 将视频变速功能从直接调整帧率改为使用 minterpolate 滤镜- 通过设置 fps 和 mi_mode 参数实现平滑的视频慢放效果
- 解决了直接调整帧率可能导致的 PTS 冲突问题
2025-09-12 16:50:53 +08:00
ce8854404b fix(entity): 修复视频慢放时 PTS 冲突问题
- 修改视频变速功能,通过改变帧率实现慢放效果
-避免使用 setpts滤镜导致的 PTS 冲突
- 优化代码结构,提高可读性和可维护性
2025-09-12 14:54:01 +08:00
5 changed files with 115 additions and 37 deletions

3
.gitignore vendored
View File

@@ -31,4 +31,5 @@ target/
.venv
venv/
cython_debug/
.env
.env
.serena

View File

@@ -1,7 +1,7 @@
import json
import os.path
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
from opentelemetry.trace import Status, StatusCode
@@ -129,13 +129,31 @@ def check_placeholder_exist_with_count(placeholder_id, task_params, required_cou
return False
def start_ffmpeg_task(ffmpeg_task):
def start_ffmpeg_task(ffmpeg_task, max_workers=4):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_ffmpeg_task") as span:
for task in ffmpeg_task.analyze_input_render_tasks():
result = start_ffmpeg_task(task)
if not result:
return False
sub_tasks = list(ffmpeg_task.analyze_input_render_tasks())
if sub_tasks:
span.set_attribute("sub_tasks.count", len(sub_tasks))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(start_ffmpeg_task, task, max_workers): task
for task in sub_tasks}
for future in as_completed(futures):
try:
if not future.result():
# 快速失败:取消剩余任务
for f in futures:
f.cancel()
span.set_status(Status(StatusCode.ERROR))
return False
except Exception as e:
logger.error("子任务执行失败: %s", e)
for f in futures:
f.cancel()
span.set_status(Status(StatusCode.ERROR))
return False
ffmpeg_task.correct_task_type()
span.set_attribute("task.type", ffmpeg_task.task_type)
span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut))

View File

@@ -6,4 +6,4 @@ SUPPORT_FEATURE = (
'rclone_upload',
'custom_re_encode',
)
SOFTWARE_VERSION = '0.0.5'
SOFTWARE_VERSION = '0.0.8'

View File

@@ -271,7 +271,7 @@ class FfmpegTask(object):
if param == '':
param = "1"
if param != "1":
# 视频变速
# 视频变速:使用fps实现,避免PTS冲突
effect_index += 1
filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
@@ -280,29 +280,24 @@ class FfmpegTask(object):
if param == '':
continue
_split = param.split(",")
if len(_split) < 3:
if len(_split) < 2:
continue
try:
start_time = float(_split[0])
zoom_factor = float(_split[1])
duration = float(_split[2])
if start_time < 0:
start_time = 0
if duration < 0:
duration = 0
if zoom_factor <= 0:
zoom_factor = 1
except (ValueError, IndexError):
start_time = 0
duration = 0
zoom_factor = 1
if zoom_factor == 1:
continue
effect_index += 1
# 获取缩放中心点(从pos_json或使用默认中心)
center_x = "iw/2"
center_y = "ih/2"
left_x = f"iw/(2*{zoom_factor})"
top_y = f"ih/(2*{zoom_factor})"
pos_json_str = self.ext_data.get('posJson', '{}')
try:
pos_json = json.loads(pos_json_str) if pos_json_str != '{}' else {}
@@ -318,23 +313,14 @@ class FfmpegTask(object):
center_x_ratio = (_f_x + _f_x2) / (2 * _v_w)
center_y_ratio = (_f_y + _f_y2) / (2 * _v_h)
# 转换为视频坐标系统
center_x = f"iw*{center_x_ratio:.6f}"
center_y = f"ih*{center_y_ratio:.6f}"
left_x = f"iw*({center_x_ratio:.6f}-1/(2*{zoom_factor}))"
top_y = f"ih*({center_y_ratio:.6f}-1/(2*{zoom_factor}))"
except Exception as e:
# 解析失败使用默认中心
pass
if duration == 0:
# 静态缩放(整个视频时长)
x_expr = f"({center_x})-(ow*zoom)/2"
y_expr = f"({center_y})-(oh*zoom)/2"
filter_args.append(f"{video_output_str}trim=start={start_time},zoompan=z={zoom_factor}:x={x_expr}:y={y_expr}:d=1[v_eff{effect_index}]")
else:
# 动态缩放(指定时间段内)
zoom_expr = f"if(between(t\\,{start_time}\\,{start_time + duration})\\,{zoom_factor}\\,1)"
x_expr = f"({center_x})-(ow*zoom)/2"
y_expr = f"({center_y})-(oh*zoom)/2"
filter_args.append(f"{video_output_str}zoompan=z={zoom_expr}:x={x_expr}:y={y_expr}:d=1[v_eff{effect_index}]")
# 静态缩放(整个视频时长)
filter_args.append(f"{video_output_str}scale={zoom_factor}*iw:{zoom_factor}*ih,crop=iw/{zoom_factor}:ih/{zoom_factor}:{left_x}:{top_y}[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("skip:"):
param = effect.split(":", 2)[1]
@@ -343,7 +329,7 @@ class FfmpegTask(object):
skip_seconds = float(param)
if skip_seconds > 0:
effect_index += 1
filter_args.append(f"{video_output_str}trim=start={skip_seconds}[v_eff{effect_index}]")
filter_args.append(f"{video_output_str}trim=start={skip_seconds},setpts=PTS-STARTPTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("tail:"):
param = effect.split(":", 2)[1]
@@ -356,8 +342,53 @@ class FfmpegTask(object):
# 使用reverse+trim+reverse的方法来精确获取最后N秒
filter_args.append(f"{video_output_str}reverse[v_rev{effect_index}]")
filter_args.append(f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]")
filter_args.append(f"[v_trim{effect_index}]reverse[v_eff{effect_index}]")
filter_args.append(f"[v_trim{effect_index}]reverse,setpts=PTS-STARTPTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("show:"):
param = effect.split(":", 2)[1]
if param == '':
param = "0"
show_seconds = float(param)
if show_seconds > 0:
effect_index += 1
filter_args.append(f"{video_output_str}trim=end={show_seconds},setpts=PTS-STARTPTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("grid4:"):
param = effect.split(":", 2)[1]
if param == '':
param = "1"
delay_seconds = float(param)
effect_index += 1
# 获取分辨率,如果没有设置则使用默认值
if self.resolution:
width, height = self.resolution.split('x')
else:
width, height = "1920", "1080" # 默认分辨率
# 计算四宫格中每个象限的大小
grid_width = int(width) // 2
grid_height = int(height) // 2
# 分割视频流为4份
filter_args.append(f"{video_output_str}split=4[grid_v1_{effect_index}][grid_v2_{effect_index}][grid_v3_{effect_index}][grid_v4_{effect_index}]")
# 创建黑色背景
filter_args.append(f"color=black:size={width}x{height}:duration=1[bg_{effect_index}]")
# 缩放每个视频流到绝对尺寸
filter_args.append(f"[grid_v1_{effect_index}]scale={grid_width}:{grid_height}[v1_scaled_{effect_index}]")
filter_args.append(f"[grid_v2_{effect_index}]scale={grid_width}:{grid_height},tpad=start_duration={delay_seconds}[v2_scaled_{effect_index}]")
filter_args.append(f"[grid_v3_{effect_index}]scale={grid_width}:{grid_height},tpad=start_duration={delay_seconds*2}[v3_scaled_{effect_index}]")
filter_args.append(f"[grid_v4_{effect_index}]scale={grid_width}:{grid_height},tpad=start_duration={delay_seconds*3}[v4_scaled_{effect_index}]")
# 使用overlay将四个视频流叠加到四个位置
filter_args.append(f"[bg_{effect_index}][v1_scaled_{effect_index}]overlay=0:0:shortest=1[grid_step1_{effect_index}]")
filter_args.append(f"[grid_step1_{effect_index}][v2_scaled_{effect_index}]overlay=w/2:0:shortest=1[grid_step2_{effect_index}]")
filter_args.append(f"[grid_step2_{effect_index}][v3_scaled_{effect_index}]overlay=0:h/2:shortest=1[grid_step3_{effect_index}]")
filter_args.append(f"[grid_step3_{effect_index}][v4_scaled_{effect_index}]overlay=w/2:h/2:shortest=1[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
...
if self.resolution:
filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]")

View File

@@ -10,6 +10,24 @@ from telemetry import get_tracer
logger = logging.getLogger(__name__)
def _apply_http_replace_map(url):
"""
应用 HTTP_REPLACE_MAP 环境变量替换 URL
:param str url: 原始 URL
:return str: 替换后的 URL
"""
replace_map = os.getenv("HTTP_REPLACE_MAP", "")
if not replace_map:
return url
replace_list = [i.split("|", 1) for i in replace_map.split(",")]
new_url = url
for (_src, _dst) in replace_list:
new_url = new_url.replace(_src, _dst)
if new_url != url:
logger.debug(f"HTTP_REPLACE_MAP: {url} -> {new_url}")
return new_url
def upload_to_oss(url, file_path):
"""
使用签名URL上传文件到OSS
@@ -27,6 +45,10 @@ def upload_to_oss(url, file_path):
if os.getenv("UPLOAD_METHOD") == "rclone":
with tracer.start_as_current_span("rclone_to_oss") as r_span:
replace_map = os.getenv("RCLONE_REPLACE_MAP")
config_file = os.getenv("RCLONE_CONFIG_FILE")
rclone_config = ""
if config_file != "":
rclone_config = f"--config {config_file}"
r_span.set_attribute("rclone.replace_map", replace_map)
if replace_map != "":
replace_list = [i.split("|", 1) for i in replace_map.split(",")]
@@ -36,21 +58,24 @@ def upload_to_oss(url, file_path):
new_url = new_url.split("?", 1)[0]
r_span.set_attribute("rclone.target_dir", new_url)
if new_url != url:
result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 8M --multi-thread-streams 8 {file_path} {new_url}")
result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 8M --multi-thread-streams 8 {rclone_config} {file_path} {new_url}")
r_span.set_attribute("rclone.result", result)
if result == 0:
span.set_status(Status(StatusCode.OK))
return True
else:
span.set_status(Status(StatusCode.ERROR))
# 应用 HTTP_REPLACE_MAP 替换 URL
http_url = _apply_http_replace_map(url)
span.set_attribute("file.http_url", http_url)
while retries < max_retries:
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries)
try:
req_span.set_attribute("http.method", "PUT")
req_span.set_attribute("http.url", url)
req_span.set_attribute("http.url", http_url)
with open(file_path, 'rb') as f:
response = requests.put(url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
response = requests.put(http_url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text)
response.raise_for_status()
@@ -97,6 +122,9 @@ def download_from_oss(url, file_path, skip_if_exist=None):
if file_dir:
if not os.path.exists(file_dir):
os.makedirs(file_dir)
# 应用 HTTP_REPLACE_MAP 替换 URL
http_url = _apply_http_replace_map(url)
span.set_attribute("file.http_url", http_url)
max_retries = 5
retries = 0
while retries < max_retries:
@@ -104,8 +132,8 @@ def download_from_oss(url, file_path, skip_if_exist=None):
req_span.set_attribute("http.retry_count", retries)
try:
req_span.set_attribute("http.method", "GET")
req_span.set_attribute("http.url", url)
response = requests.get(url, timeout=15) # 设置超时时间
req_span.set_attribute("http.url", http_url)
response = requests.get(http_url, timeout=15) # 设置超时时间
req_span.set_attribute("http.status_code", response.status_code)
with open(file_path, 'wb') as f:
f.write(response.content)