You've already forked FrameTour-RenderWorker
- 添加 GPU_REQUIRED_TASK_TYPES 集合定义需要 GPU 加速的任务类型 - 修改任务执行逻辑仅对需要 GPU 的任务类型获取 GPU 设备 - 更新 GPU 设备释放逻辑确保仅在实际分配设备时进行释放 - 改进日志记录和资源管理流程
279 lines
8.5 KiB
Python
279 lines
8.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
任务执行器
|
|
|
|
管理任务的并发执行,协调处理器、租约服务等组件。
|
|
"""
|
|
|
|
import logging
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor, Future
|
|
from typing import Dict, Optional, TYPE_CHECKING
|
|
|
|
from domain.task import Task, TaskType
|
|
from domain.result import TaskResult, ErrorCode
|
|
|
|
# Task types that need GPU acceleration. TaskExecutor._process_task only asks
# the GPU scheduler for a device when the task's type is a member of this set.
GPU_REQUIRED_TASK_TYPES = {TaskType.RENDER_SEGMENT_VIDEO, TaskType.COMPOSE_TRANSITION}
|
|
from domain.config import WorkerConfig
|
|
from core.handler import TaskHandler
|
|
from services.lease_service import LeaseService
|
|
from services.gpu_scheduler import GPUScheduler
|
|
|
|
if TYPE_CHECKING:
|
|
from services.api_client import APIClientV2
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TaskExecutor:
    """
    Task executor.

    Schedules and runs tasks concurrently on a thread pool. Responsibilities:
    - registering and looking up task handlers
    - tracking the tasks that are currently executing
    - keeping the lease of each running task renewed
    - reporting execution results through the API client
    """

    def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
        """
        Initialize the task executor.

        Args:
            config: Worker configuration.
            api_client: API client used for reporting; also passed to handlers.
        """
        self.config = config
        self.api_client = api_client

        # Handler registry, keyed by the task type each handler supports
        self.handlers: Dict[TaskType, TaskHandler] = {}

        # In-flight task tracking, keyed by task ID
        self.current_tasks: Dict[str, Task] = {}
        self.current_futures: Dict[str, Future] = {}

        # Thread pool that runs _process_task
        self.executor = ThreadPoolExecutor(
            max_workers=config.max_concurrency,
            thread_name_prefix="TaskWorker"
        )

        # Lock guarding current_tasks / current_futures
        self.lock = threading.Lock()

        # GPU scheduler (if hardware acceleration is enabled)
        self.gpu_scheduler = GPUScheduler(config)

        if self.gpu_scheduler.enabled:
            logger.info(f"GPU scheduler enabled with {self.gpu_scheduler.device_count} device(s)")

        # Register handlers
        self._register_handlers()

    def _register_handlers(self):
        """Instantiate every task handler and index it by its supported task type."""
        # Imported lazily to avoid circular dependencies
        from handlers.render_video import RenderSegmentVideoHandler
        from handlers.compose_transition import ComposeTransitionHandler
        from handlers.prepare_audio import PrepareJobAudioHandler
        from handlers.package_ts import PackageSegmentTsHandler
        from handlers.finalize_mp4 import FinalizeMp4Handler

        handlers = [
            RenderSegmentVideoHandler(self.config, self.api_client),
            ComposeTransitionHandler(self.config, self.api_client),
            PrepareJobAudioHandler(self.config, self.api_client),
            PackageSegmentTsHandler(self.config, self.api_client),
            FinalizeMp4Handler(self.config, self.api_client),
        ]

        for handler in handlers:
            task_type = handler.get_supported_type()
            self.handlers[task_type] = handler
            logger.debug(f"Registered handler for {task_type.value}")

    def get_current_task_ids(self) -> list:
        """
        Return the IDs of the tasks that are currently tracked as executing.

        Returns:
            A new list of task IDs (a snapshot taken under the lock).
        """
        with self.lock:
            return list(self.current_tasks.keys())

    def get_current_task_count(self) -> int:
        """
        Return how many tasks are currently tracked as executing.

        Returns:
            Number of tracked tasks.
        """
        with self.lock:
            return len(self.current_tasks)

    def can_accept_task(self) -> bool:
        """
        Check whether another task can be accepted.

        Returns:
            True while the tracked task count is below config.max_concurrency.
        """
        return self.get_current_task_count() < self.config.max_concurrency

    def submit_task(self, task: Task) -> bool:
        """
        Submit a task to the thread pool.

        Args:
            task: Task entity to execute.

        Returns:
            True if the task was queued; False if it is already running, the
            concurrency limit is reached, or no handler is registered for its type.

        Raises:
            Exception: Whatever the thread pool raises on submit (for example
                RuntimeError once the pool has been shut down). The task is
                removed from the tracking tables before the error propagates.
        """
        with self.lock:
            # Reject tasks that are already executing
            if task.task_id in self.current_tasks:
                logger.warning(f"[task:{task.task_id}] Task already running, skipping")
                return False

            # Enforce the concurrency limit
            if len(self.current_tasks) >= self.config.max_concurrency:
                logger.info(
                    f"[task:{task.task_id}] Max concurrency reached "
                    f"({self.config.max_concurrency}), rejecting task"
                )
                return False

            # Make sure a handler exists for this task type
            if task.task_type not in self.handlers:
                logger.error(f"[task:{task.task_id}] No handler for type: {task.task_type.value}")
                return False

            # Track the task before queuing it. The worker thread needs this
            # lock to untrack the task, so it cannot finish cleanup before the
            # future is recorded below.
            self.current_tasks[task.task_id] = task

            try:
                future = self.executor.submit(self._process_task, task)
            except Exception:
                # Nothing will ever run _process_task for this task, so undo
                # the tracking entry; otherwise it would occupy a concurrency
                # slot forever.
                self.current_tasks.pop(task.task_id, None)
                raise
            self.current_futures[task.task_id] = future

        logger.info(f"[task:{task.task_id}] Submitted ({task.task_type.value})")
        return True

    def _process_task(self, task: Task):
        """
        Process a single task (runs on a thread-pool worker).

        Starts lease renewal, acquires a GPU device when the task type is in
        GPU_REQUIRED_TASK_TYPES and the scheduler is enabled, runs the handler
        hooks, and reports success or failure through the API client. Every
        acquired resource is released in the ``finally`` clause, including when
        set-up itself fails.

        Args:
            task: Task entity to execute.
        """
        task_id = task.task_id
        logger.info(f"[task:{task_id}] Starting {task.task_type.value}")

        # Look the handler up first so the cleanup path can always refer to it
        handler = self.handlers.get(task.task_type)
        lease_service = None
        device_index = None

        try:
            # Start the lease renewal service. This happens inside the try so
            # that a failure here is still reported and cleaned up.
            lease_service = LeaseService(
                self.api_client,
                task_id,
                interval=self.config.lease_extension_threshold,
                extension=self.config.lease_extension_duration
            )
            lease_service.start()

            # Acquire a GPU device (only for task types that need one)
            needs_gpu = task.task_type in GPU_REQUIRED_TASK_TYPES
            if needs_gpu and self.gpu_scheduler.enabled:
                device_index = self.gpu_scheduler.acquire()
                # acquire() may yield None; the task then runs without a device
                if device_index is not None:
                    logger.info(f"[task:{task_id}] Assigned to GPU device {device_index}")

            # Report task start
            self.api_client.report_start(task_id)

            if handler is None:
                raise ValueError(f"No handler for task type: {task.task_type}")

            # Tell the handler which GPU device to use
            # (presumably thread-local storage inside the handler; confirm there)
            if device_index is not None:
                handler.set_gpu_device(device_index)

            # Pre-execution hook, the task itself, post-execution hook
            handler.before_handle(task)
            result = handler.handle(task)
            handler.after_handle(task, result)

            # Report the outcome
            if result.success:
                self.api_client.report_success(task_id, result.data)
                logger.info(f"[task:{task_id}] Completed successfully")
            else:
                error_code = result.error_code.value if result.error_code else 'E_UNKNOWN'
                self.api_client.report_fail(task_id, error_code, result.error_message or '')
                logger.error(f"[task:{task_id}] Failed: {result.error_message}")

        except Exception as e:
            logger.error(f"[task:{task_id}] Exception: {e}", exc_info=True)
            try:
                self.api_client.report_fail(task_id, 'E_UNKNOWN', str(e))
            except Exception:
                # Nobody observes this worker's Future, so log instead of
                # letting the reporting error vanish.
                logger.exception(f"[task:{task_id}] Could not report failure")

        finally:
            self._release_task_resources(task_id, handler, lease_service, device_index)

    def _release_task_resources(self, task_id: str, handler: Optional[TaskHandler],
                                lease_service: Optional[LeaseService], device_index):
        """
        Best-effort cleanup after a task has finished or failed.

        Each step is guarded on its own so that a failure in one step cannot
        prevent the later ones; in particular the task is always removed from
        the tracking tables.

        Args:
            task_id: ID of the task being cleaned up.
            handler: Handler that ran the task, or None if none was registered.
            lease_service: Lease renewal service, or None if it was never created.
            device_index: GPU device returned by the scheduler, or None if no
                device was assigned.
        """
        # Clear the handler's GPU device setting
        if handler is not None:
            try:
                handler.clear_gpu_device()
            except Exception:
                logger.exception(f"[task:{task_id}] Could not clear GPU device setting")

        # Release the GPU device (only when one was actually assigned)
        if device_index is not None:
            try:
                self.gpu_scheduler.release(device_index)
            except Exception:
                logger.exception(f"[task:{task_id}] Could not release GPU device {device_index}")

        # Stop lease renewal
        if lease_service is not None:
            try:
                lease_service.stop()
            except Exception:
                logger.exception(f"[task:{task_id}] Could not stop lease renewal")

        # Remove the task from the tracking tables
        with self.lock:
            self.current_tasks.pop(task_id, None)
            self.current_futures.pop(task_id, None)

    def shutdown(self, wait: bool = True):
        """
        Shut the executor down.

        The tracking tables are cleared unconditionally after the pool is shut
        down, so with ``wait=False`` tasks that are still running are no longer
        reflected in the task count.

        Args:
            wait: Whether to block until all queued and running tasks finish.
        """
        logger.info("Shutting down task executor...")

        # Shut down the thread pool
        self.executor.shutdown(wait=wait)

        # Clear tracking state
        with self.lock:
            self.current_tasks.clear()
            self.current_futures.clear()

        logger.info("Task executor shutdown complete")

    def get_handler(self, task_type: TaskType) -> Optional[TaskHandler]:
        """
        Look up the handler registered for a task type.

        Args:
            task_type: Task type to look up.

        Returns:
            The handler instance, or None if no handler is registered.
        """
        return self.handlers.get(task_type)
|