diff --git a/biz/ffmpeg.py b/biz/ffmpeg.py index 220d598..13ad6e3 100644 --- a/biz/ffmpeg.py +++ b/biz/ffmpeg.py @@ -1,7 +1,7 @@ import json import os.path import time -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed from opentelemetry.trace import Status, StatusCode @@ -129,13 +129,31 @@ def check_placeholder_exist_with_count(placeholder_id, task_params, required_cou return False -def start_ffmpeg_task(ffmpeg_task): +def start_ffmpeg_task(ffmpeg_task, max_workers=4): tracer = get_tracer(__name__) with tracer.start_as_current_span("start_ffmpeg_task") as span: - for task in ffmpeg_task.analyze_input_render_tasks(): - result = start_ffmpeg_task(task) - if not result: - return False + sub_tasks = list(ffmpeg_task.analyze_input_render_tasks()) + + if sub_tasks: + span.set_attribute("sub_tasks.count", len(sub_tasks)) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = {executor.submit(start_ffmpeg_task, task, max_workers): task + for task in sub_tasks} + for future in as_completed(futures): + try: + if not future.result(): + # 快速失败:取消剩余任务 + for f in futures: + f.cancel() + span.set_status(Status(StatusCode.ERROR)) + return False + except Exception as e: + logger.error("子任务执行失败: %s", e) + for f in futures: + f.cancel() + span.set_status(Status(StatusCode.ERROR)) + return False + ffmpeg_task.correct_task_type() span.set_attribute("task.type", ffmpeg_task.task_type) span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut))