Compare commits

..

4 Commits

Author SHA1 Message Date
c61f6d7521 refactor(ffmpeg): 优化FFmpeg任务处理逻辑
- 添加as_completed导入以支持并发任务执行
- 实现线程池并发处理子任务,提高执行效率
- 添加任务数量监控指标
- 实现快速失败机制,及时取消剩余任务
- 增强异常处理和错误日志记录
- 添加最大工作线程数参数配置
2026-01-01 00:09:31 +08:00
4ef57a208e feat(oss): 添加 HTTP_REPLACE_MAP 环境变量支持
- 实现 _apply_http_replace_map 函数用于 URL 替换
- 在上传文件时应用 HTTP_REPLACE_MAP 环境变量替换 URL
- 添加 http_url 属性到 trace span 中
- 支持通过环境变量配置 URL 替换规则
2025-12-31 17:28:38 +08:00
a415d8571d chore(constant): 更新软件版本号至0.0.8
- 将 SOFTWARE_VERSION 从 0.0.6 更新到 0.0.8

feat(util/oss): 支持自定义rclone配置文件路径

- 新增读取环境变量 RCLONE_CONFIG_FILE 来指定配置文件
- 当 RCLONE_CONFIG_FILE 为空时默认使用 rclone.conf
- 在调用 rclone 命令时加入 --config 参数以应用指定配置文件
2025-12-12 16:00:34 +08:00
4af52d5a54 fix(ffmpeg): 修复视频裁剪时间戳问题
- 在 trim 过滤器后添加 setpts 过滤器以重置时间戳
- 修复 skip、tail 和 show 效果的时间戳计算问题
- 确保裁剪后的视频片段时间戳从零开始
- 避免因时间戳不连续导致的播放问题
2025-12-09 18:07:48 +08:00
4 changed files with 61 additions and 15 deletions

View File

@@ -1,7 +1,7 @@
import json import json
import os.path import os.path
import time import time
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor, as_completed
from opentelemetry.trace import Status, StatusCode from opentelemetry.trace import Status, StatusCode
@@ -129,13 +129,31 @@ def check_placeholder_exist_with_count(placeholder_id, task_params, required_cou
return False return False
def start_ffmpeg_task(ffmpeg_task): def start_ffmpeg_task(ffmpeg_task, max_workers=4):
tracer = get_tracer(__name__) tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_ffmpeg_task") as span: with tracer.start_as_current_span("start_ffmpeg_task") as span:
for task in ffmpeg_task.analyze_input_render_tasks(): sub_tasks = list(ffmpeg_task.analyze_input_render_tasks())
result = start_ffmpeg_task(task)
if not result: if sub_tasks:
return False span.set_attribute("sub_tasks.count", len(sub_tasks))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(start_ffmpeg_task, task, max_workers): task
for task in sub_tasks}
for future in as_completed(futures):
try:
if not future.result():
# 快速失败:取消剩余任务
for f in futures:
f.cancel()
span.set_status(Status(StatusCode.ERROR))
return False
except Exception as e:
logger.error("子任务执行失败: %s", e)
for f in futures:
f.cancel()
span.set_status(Status(StatusCode.ERROR))
return False
ffmpeg_task.correct_task_type() ffmpeg_task.correct_task_type()
span.set_attribute("task.type", ffmpeg_task.task_type) span.set_attribute("task.type", ffmpeg_task.task_type)
span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut)) span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut))

View File

@@ -6,4 +6,4 @@ SUPPORT_FEATURE = (
'rclone_upload', 'rclone_upload',
'custom_re_encode', 'custom_re_encode',
) )
SOFTWARE_VERSION = '0.0.6' SOFTWARE_VERSION = '0.0.8'

View File

@@ -329,7 +329,7 @@ class FfmpegTask(object):
skip_seconds = float(param) skip_seconds = float(param)
if skip_seconds > 0: if skip_seconds > 0:
effect_index += 1 effect_index += 1
filter_args.append(f"{video_output_str}trim=start={skip_seconds}[v_eff{effect_index}]") filter_args.append(f"{video_output_str}trim=start={skip_seconds},setpts=PTS-STARTPTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]" video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("tail:"): elif effect.startswith("tail:"):
param = effect.split(":", 2)[1] param = effect.split(":", 2)[1]
@@ -342,7 +342,7 @@ class FfmpegTask(object):
# 使用reverse+trim+reverse的方法来精确获取最后N秒 # 使用reverse+trim+reverse的方法来精确获取最后N秒
filter_args.append(f"{video_output_str}reverse[v_rev{effect_index}]") filter_args.append(f"{video_output_str}reverse[v_rev{effect_index}]")
filter_args.append(f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]") filter_args.append(f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]")
filter_args.append(f"[v_trim{effect_index}]reverse[v_eff{effect_index}]") filter_args.append(f"[v_trim{effect_index}]reverse,setpts=PTS-STARTPTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]" video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("show:"): elif effect.startswith("show:"):
param = effect.split(":", 2)[1] param = effect.split(":", 2)[1]
@@ -351,7 +351,7 @@ class FfmpegTask(object):
show_seconds = float(param) show_seconds = float(param)
if show_seconds > 0: if show_seconds > 0:
effect_index += 1 effect_index += 1
filter_args.append(f"{video_output_str}trim=end={show_seconds}[v_eff{effect_index}]") filter_args.append(f"{video_output_str}trim=end={show_seconds},setpts=PTS-STARTPTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]" video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("grid4:"): elif effect.startswith("grid4:"):
param = effect.split(":", 2)[1] param = effect.split(":", 2)[1]

View File

@@ -10,6 +10,24 @@ from telemetry import get_tracer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _apply_http_replace_map(url):
"""
应用 HTTP_REPLACE_MAP 环境变量替换 URL
:param str url: 原始 URL
:return str: 替换后的 URL
"""
replace_map = os.getenv("HTTP_REPLACE_MAP", "")
if not replace_map:
return url
replace_list = [i.split("|", 1) for i in replace_map.split(",")]
new_url = url
for (_src, _dst) in replace_list:
new_url = new_url.replace(_src, _dst)
if new_url != url:
logger.debug(f"HTTP_REPLACE_MAP: {url} -> {new_url}")
return new_url
def upload_to_oss(url, file_path): def upload_to_oss(url, file_path):
""" """
使用签名URL上传文件到OSS 使用签名URL上传文件到OSS
@@ -27,6 +45,10 @@ def upload_to_oss(url, file_path):
if os.getenv("UPLOAD_METHOD") == "rclone": if os.getenv("UPLOAD_METHOD") == "rclone":
with tracer.start_as_current_span("rclone_to_oss") as r_span: with tracer.start_as_current_span("rclone_to_oss") as r_span:
replace_map = os.getenv("RCLONE_REPLACE_MAP") replace_map = os.getenv("RCLONE_REPLACE_MAP")
config_file = os.getenv("RCLONE_CONFIG_FILE")
rclone_config = ""
if config_file != "":
rclone_config = f"--config {config_file}"
r_span.set_attribute("rclone.replace_map", replace_map) r_span.set_attribute("rclone.replace_map", replace_map)
if replace_map != "": if replace_map != "":
replace_list = [i.split("|", 1) for i in replace_map.split(",")] replace_list = [i.split("|", 1) for i in replace_map.split(",")]
@@ -36,21 +58,24 @@ def upload_to_oss(url, file_path):
new_url = new_url.split("?", 1)[0] new_url = new_url.split("?", 1)[0]
r_span.set_attribute("rclone.target_dir", new_url) r_span.set_attribute("rclone.target_dir", new_url)
if new_url != url: if new_url != url:
result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 8M --multi-thread-streams 8 {file_path} {new_url}") result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 8M --multi-thread-streams 8 {rclone_config} {file_path} {new_url}")
r_span.set_attribute("rclone.result", result) r_span.set_attribute("rclone.result", result)
if result == 0: if result == 0:
span.set_status(Status(StatusCode.OK)) span.set_status(Status(StatusCode.OK))
return True return True
else: else:
span.set_status(Status(StatusCode.ERROR)) span.set_status(Status(StatusCode.ERROR))
# 应用 HTTP_REPLACE_MAP 替换 URL
http_url = _apply_http_replace_map(url)
span.set_attribute("file.http_url", http_url)
while retries < max_retries: while retries < max_retries:
with tracer.start_as_current_span("upload_to_oss.request") as req_span: with tracer.start_as_current_span("upload_to_oss.request") as req_span:
req_span.set_attribute("http.retry_count", retries) req_span.set_attribute("http.retry_count", retries)
try: try:
req_span.set_attribute("http.method", "PUT") req_span.set_attribute("http.method", "PUT")
req_span.set_attribute("http.url", url) req_span.set_attribute("http.url", http_url)
with open(file_path, 'rb') as f: with open(file_path, 'rb') as f:
response = requests.put(url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"}) response = requests.put(http_url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.status_code", response.status_code)
req_span.set_attribute("http.response", response.text) req_span.set_attribute("http.response", response.text)
response.raise_for_status() response.raise_for_status()
@@ -97,6 +122,9 @@ def download_from_oss(url, file_path, skip_if_exist=None):
if file_dir: if file_dir:
if not os.path.exists(file_dir): if not os.path.exists(file_dir):
os.makedirs(file_dir) os.makedirs(file_dir)
# 应用 HTTP_REPLACE_MAP 替换 URL
http_url = _apply_http_replace_map(url)
span.set_attribute("file.http_url", http_url)
max_retries = 5 max_retries = 5
retries = 0 retries = 0
while retries < max_retries: while retries < max_retries:
@@ -104,8 +132,8 @@ def download_from_oss(url, file_path, skip_if_exist=None):
req_span.set_attribute("http.retry_count", retries) req_span.set_attribute("http.retry_count", retries)
try: try:
req_span.set_attribute("http.method", "GET") req_span.set_attribute("http.method", "GET")
req_span.set_attribute("http.url", url) req_span.set_attribute("http.url", http_url)
response = requests.get(url, timeout=15) # 设置超时时间 response = requests.get(http_url, timeout=15) # 设置超时时间
req_span.set_attribute("http.status_code", response.status_code) req_span.set_attribute("http.status_code", response.status_code)
with open(file_path, 'wb') as f: with open(file_path, 'wb') as f:
f.write(response.content) f.write(response.content)