You've already forked FrameTour-RenderWorker
refactor(ffmpeg): 优化FFmpeg任务处理逻辑
- 添加as_completed导入以支持并发任务执行 - 实现线程池并发处理子任务,提高执行效率 - 添加任务数量监控指标 - 实现快速失败机制,及时取消剩余任务 - 增强异常处理和错误日志记录 - 添加最大工作线程数参数配置
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import json
|
||||
import os.path
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
from opentelemetry.trace import Status, StatusCode
|
||||
|
||||
@@ -129,13 +129,31 @@ def check_placeholder_exist_with_count(placeholder_id, task_params, required_cou
|
||||
return False
|
||||
|
||||
|
||||
def start_ffmpeg_task(ffmpeg_task):
|
||||
def start_ffmpeg_task(ffmpeg_task, max_workers=4):
|
||||
tracer = get_tracer(__name__)
|
||||
with tracer.start_as_current_span("start_ffmpeg_task") as span:
|
||||
for task in ffmpeg_task.analyze_input_render_tasks():
|
||||
result = start_ffmpeg_task(task)
|
||||
if not result:
|
||||
sub_tasks = list(ffmpeg_task.analyze_input_render_tasks())
|
||||
|
||||
if sub_tasks:
|
||||
span.set_attribute("sub_tasks.count", len(sub_tasks))
|
||||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
futures = {executor.submit(start_ffmpeg_task, task, max_workers): task
|
||||
for task in sub_tasks}
|
||||
for future in as_completed(futures):
|
||||
try:
|
||||
if not future.result():
|
||||
# 快速失败:取消剩余任务
|
||||
for f in futures:
|
||||
f.cancel()
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error("子任务执行失败: %s", e)
|
||||
for f in futures:
|
||||
f.cancel()
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return False
|
||||
|
||||
ffmpeg_task.correct_task_type()
|
||||
span.set_attribute("task.type", ffmpeg_task.task_type)
|
||||
span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut))
|
||||
|
||||
Reference in New Issue
Block a user