Files
FrameTour-RenderWorker/services/task_executor.py
Jerry Yan 9b373dea34 feat(tracing): 集成 OpenTelemetry 链路追踪功能
- 在 base.py 中添加文件下载、上传和 FFmpeg 执行的链路追踪
- 在 api_client.py 中实现 API 请求的链路追踪和错误标记
- 在 lease_service.py 中添加租约续期的链路追踪支持
- 在 task_executor.py 中集成任务执行的完整链路追踪
- 新增 util/tracing.py 工具模块提供统一的追踪上下文管理
- 在 .env.example 中添加 OTEL 配置选项
- 在 index.py 中初始化和关闭链路追踪功能
2026-02-07 00:11:01 +08:00

292 lines
9.5 KiB
Python

# -*- coding: utf-8 -*-
"""
任务执行器
管理任务的并发执行,协调处理器、租约服务等组件。
"""
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, Future
from typing import Dict, Optional, TYPE_CHECKING
from domain.task import Task, TaskType
# 需要 GPU 加速的任务类型
GPU_REQUIRED_TASK_TYPES = {
TaskType.RENDER_SEGMENT_VIDEO,
TaskType.COMPOSE_TRANSITION,
}
from domain.config import WorkerConfig
from core.handler import TaskHandler
from services.lease_service import LeaseService
from services.gpu_scheduler import GPUScheduler
from util.tracing import (
capture_otel_context,
get_current_task_context,
mark_span_error,
start_span,
task_trace_scope,
)
if TYPE_CHECKING:
from services.api_client import APIClientV2
logger = logging.getLogger(__name__)
class TaskExecutor:
"""
任务执行器
负责任务的并发调度和执行,包括:
- 注册和管理任务处理器
- 维护任务执行状态
- 协调租约续期
- 上报执行结果
"""
def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'):
"""
初始化任务执行器
Args:
config: Worker 配置
api_client: API 客户端
"""
self.config = config
self.api_client = api_client
# 任务处理器注册表
self.handlers: Dict[TaskType, TaskHandler] = {}
# 当前任务跟踪
self.current_tasks: Dict[str, Task] = {}
self.current_futures: Dict[str, Future] = {}
# 线程池
self.executor = ThreadPoolExecutor(
max_workers=config.max_concurrency,
thread_name_prefix="TaskWorker"
)
# 线程安全锁
self.lock = threading.Lock()
# GPU 调度器(如果启用硬件加速)
self.gpu_scheduler = GPUScheduler(config)
if self.gpu_scheduler.enabled:
logger.info(f"GPU scheduler enabled with {self.gpu_scheduler.device_count} device(s)")
# 注册处理器
self._register_handlers()
def _register_handlers(self):
"""注册所有任务处理器"""
# 延迟导入以避免循环依赖
from handlers.render_video import RenderSegmentVideoHandler
from handlers.compose_transition import ComposeTransitionHandler
from handlers.prepare_audio import PrepareJobAudioHandler
from handlers.package_ts import PackageSegmentTsHandler
from handlers.finalize_mp4 import FinalizeMp4Handler
handlers = [
RenderSegmentVideoHandler(self.config, self.api_client),
ComposeTransitionHandler(self.config, self.api_client),
PrepareJobAudioHandler(self.config, self.api_client),
PackageSegmentTsHandler(self.config, self.api_client),
FinalizeMp4Handler(self.config, self.api_client),
]
for handler in handlers:
task_type = handler.get_supported_type()
self.handlers[task_type] = handler
logger.debug(f"Registered handler for {task_type.value}")
def get_current_task_ids(self) -> list:
"""
获取当前正在执行的任务 ID 列表
Returns:
任务 ID 列表
"""
with self.lock:
return list(self.current_tasks.keys())
def get_current_task_count(self) -> int:
"""
获取当前正在执行的任务数量
Returns:
任务数量
"""
with self.lock:
return len(self.current_tasks)
def can_accept_task(self) -> bool:
"""
检查是否可以接受新任务
Returns:
是否可以接受
"""
return self.get_current_task_count() < self.config.max_concurrency
def submit_task(self, task: Task) -> bool:
"""
提交任务到线程池
Args:
task: 任务实体
Returns:
是否提交成功
"""
with self.lock:
# 检查任务是否已在执行
if task.task_id in self.current_tasks:
logger.warning(f"[task:{task.task_id}] Task already running, skipping")
return False
# 检查并发上限
if len(self.current_tasks) >= self.config.max_concurrency:
logger.info(
f"[task:{task.task_id}] Max concurrency reached "
f"({self.config.max_concurrency}), rejecting task"
)
return False
# 检查是否有对应的处理器
if task.task_type not in self.handlers:
logger.error(f"[task:{task.task_id}] No handler for type: {task.task_type.value}")
return False
# 记录任务
self.current_tasks[task.task_id] = task
# 提交到线程池
future = self.executor.submit(self._process_task, task)
self.current_futures[task.task_id] = future
logger.info(f"[task:{task.task_id}] Submitted ({task.task_type.value})")
return True
def _process_task(self, task: Task):
"""
处理单个任务(在线程池中执行)
Args:
task: 任务实体
"""
task_id = task.task_id
handler = self.handlers.get(task.task_type)
device_index = None
lease_service = None
with task_trace_scope(task, span_name="render.task.execute") as task_span:
logger.info(f"[task:{task_id}] Starting {task.task_type.value}")
lease_service = LeaseService(
self.api_client,
task_id,
interval=self.config.lease_extension_threshold,
extension=self.config.lease_extension_duration,
parent_otel_context=capture_otel_context(),
task_trace_context=get_current_task_context(),
)
with start_span("render.task.lease.start"):
lease_service.start()
needs_gpu = task.task_type in GPU_REQUIRED_TASK_TYPES
if needs_gpu and self.gpu_scheduler.enabled:
with start_span("render.task.gpu.acquire"):
device_index = self.gpu_scheduler.acquire()
if device_index is not None:
logger.info(f"[task:{task_id}] Assigned to GPU device {device_index}")
try:
with start_span("render.task.report.start"):
self.api_client.report_start(task_id)
if not handler:
raise ValueError(f"No handler for task type: {task.task_type}")
if device_index is not None:
handler.set_gpu_device(device_index)
with start_span("render.task.handler.before"):
handler.before_handle(task)
with start_span("render.task.handler.execute"):
result = handler.handle(task)
with start_span("render.task.handler.after"):
handler.after_handle(task, result)
if result.success:
with start_span("render.task.report.success"):
self.api_client.report_success(task_id, result.data)
if task_span is not None:
task_span.set_attribute("render.task.result", "success")
logger.info(f"[task:{task_id}] Completed successfully")
else:
error_code = result.error_code.value if result.error_code else 'E_UNKNOWN'
with start_span("render.task.report.fail"):
self.api_client.report_fail(task_id, error_code, result.error_message or '')
mark_span_error(task_span, result.error_message or "task failed", error_code)
logger.error(f"[task:{task_id}] Failed: {result.error_message}")
except Exception as e:
mark_span_error(task_span, str(e), "E_UNKNOWN")
logger.error(f"[task:{task_id}] Exception: {e}", exc_info=True)
with start_span("render.task.report.exception"):
self.api_client.report_fail(task_id, 'E_UNKNOWN', str(e))
finally:
if handler:
handler.clear_gpu_device()
if device_index is not None:
with start_span("render.task.gpu.release"):
self.gpu_scheduler.release(device_index)
if lease_service is not None:
with start_span("render.task.lease.stop"):
lease_service.stop()
with self.lock:
self.current_tasks.pop(task_id, None)
self.current_futures.pop(task_id, None)
def shutdown(self, wait: bool = True):
"""
关闭执行器
Args:
wait: 是否等待所有任务完成
"""
logger.info("Shutting down task executor...")
# 关闭线程池
self.executor.shutdown(wait=wait)
# 清理状态
with self.lock:
self.current_tasks.clear()
self.current_futures.clear()
logger.info("Task executor shutdown complete")
def get_handler(self, task_type: TaskType) -> Optional[TaskHandler]:
"""
获取指定类型的处理器
Args:
task_type: 任务类型
Returns:
处理器实例,不存在则返回 None
"""
return self.handlers.get(task_type)