# -*- coding: utf-8 -*- """ 租约续期服务 后台线程定期为正在执行的任务续期租约。 """ import logging import threading import time from typing import TYPE_CHECKING, Any, Optional if TYPE_CHECKING: from services.api_client import APIClientV2 from util.tracing import TaskTraceContext from util.tracing import bind_trace_context, start_span logger = logging.getLogger(__name__) class LeaseService: """ 租约续期服务 在后台线程中定期调用 API 延长任务租约, 防止长时间任务因租约过期被回收。 """ def __init__( self, api_client: 'APIClientV2', task_id: str, interval: int = 60, extension: int = 300, parent_otel_context: Any = None, task_trace_context: Optional['TaskTraceContext'] = None, ): """ 初始化租约服务 Args: api_client: API 客户端 task_id: 任务 ID interval: 续期间隔(秒),默认 60 秒 extension: 每次续期时长(秒),默认 300 秒 """ self.api_client = api_client self.task_id = task_id self.interval = interval self.extension = extension self.parent_otel_context = parent_otel_context self.task_trace_context = task_trace_context self.running = False self.thread: threading.Thread = None self._stop_event = threading.Event() def start(self): """启动租约续期线程""" if self.running: logger.warning(f"[task:{self.task_id}] Lease service already running") return self.running = True self._stop_event.clear() self.thread = threading.Thread( target=self._run, name=f"LeaseService-{self.task_id}", daemon=True ) self.thread.start() logger.debug(f"[task:{self.task_id}] Lease service started (interval={self.interval}s)") def stop(self): """停止租约续期线程""" if not self.running: return self.running = False self._stop_event.set() if self.thread and self.thread.is_alive(): self.thread.join(timeout=5) logger.debug(f"[task:{self.task_id}] Lease service stopped") def _run(self): """续期线程主循环""" with bind_trace_context(self.parent_otel_context, self.task_trace_context): while self.running: if self._stop_event.wait(timeout=self.interval): break if self.running: self._extend_lease() def _extend_lease(self): """执行租约续期""" with start_span( "render.task.lease.extend", task_id=self.task_id, attributes={"render.lease.extension_seconds": self.extension}, ): try: success = self.api_client.extend_lease(self.task_id, self.extension) if success: logger.debug(f"[task:{self.task_id}] Lease extended by {self.extension}s") else: logger.warning(f"[task:{self.task_id}] Failed to extend lease") except Exception as e: logger.warning(f"[task:{self.task_id}] Lease extension error: {e}") def __enter__(self): """上下文管理器入口""" self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): """上下文管理器出口""" self.stop() return False