# -*- coding: utf-8 -*-
"""
OTel distributed-tracing utilities.

Provides unified tracing initialization, task-context management and span
creation helpers.
"""

import logging
import os
from contextlib import contextmanager, nullcontext
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Any, Dict, Iterator, Mapping, Optional

from opentelemetry import context as otel_context
from opentelemetry import propagate, trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Span, SpanKind, Status, StatusCode

logger = logging.getLogger(__name__)

_DEFAULT_SERVICE_NAME = "RenderWorkerNext"
_DEFAULT_TRACER_NAME = "render.worker"
# NOTE(review): the host is spelled "oltp" (not "otlp"); confirm this is the
# intended hostname rather than a typo.
_OTEL_EXPORTER_OTLP_ENDPOINT = "https://oltp.jerryyan.top/v1/traces"

# Span attribute keys used for task-level metadata.
_TASK_ID_ATTR = "render.task.id"
_TASK_TYPE_ATTR = "render.task.type"
_JOB_ID_ATTR = "render.job.id"
_SEGMENT_ID_ATTR = "render.segment.id"
_ERROR_CODE_ATTR = "render.error.code"
_ERROR_MESSAGE_ATTR = "render.error.message"

# Lower-cased, whitespace-stripped strings that _parse_bool treats as "true".
_TRUE_VALUES = {"1", "true", "yes", "on"}

# Module-level tracing state, written by initialize_tracing / shutdown_tracing.
_TRACING_INITIALIZED = False
_TRACING_ENABLED = False
# Only set when this module's provider actually became the global one.
_TRACER_PROVIDER: Optional[TracerProvider] = None

# Task context bound to the current execution context (thread / task).
_CURRENT_TASK_CONTEXT: ContextVar[Optional["TaskTraceContext"]] = ContextVar(
    "render_worker_task_trace_context",
    default=None,
)


@dataclass(frozen=True)
class TaskTraceContext:
    """Immutable per-task tracing context attached to every task span."""

    task_id: str
    task_type: str
    job_id: str = ""
    segment_id: str = ""

    def to_attributes(self) -> Dict[str, str]:
        """Return span attributes; job/segment ids are omitted when empty."""
        attributes = {
            _TASK_ID_ATTR: self.task_id,
            _TASK_TYPE_ATTR: self.task_type,
        }
        if self.job_id:
            attributes[_JOB_ID_ATTR] = self.job_id
        if self.segment_id:
            attributes[_SEGMENT_ID_ATTR] = self.segment_id
        return attributes


def _parse_bool(value: Optional[str], default: bool) -> bool:
    """
    Parse a boolean flag (typically an environment variable value).

    ``None`` or an empty/blank string yields *default*. An empty environment
    variable is treated the same as an unset one, which matches the OTel
    environment-variable convention. Any other value is true only if it is
    one of ``_TRUE_VALUES`` (case-insensitive).
    """
    if value is None:
        return default
    normalized = value.strip().lower()
    if not normalized:
        return default
    return normalized in _TRUE_VALUES


def is_tracing_enabled() -> bool:
    """Return whether tracing is currently enabled for this process."""
    return _TRACING_ENABLED
def initialize_tracing(worker_id: str, service_version: str) -> bool:
    """
    Initialize OTel tracing for this process.

    Only the first call does any work; later calls return the cached result.
    Tracing is skipped when ``OTEL_ENABLED`` parses as false.

    Args:
        worker_id: Worker identifier, recorded as the ``render.worker.id``
            resource attribute.
        service_version: Value for the ``service.version`` resource attribute.

    Returns:
        True if tracing is enabled after the call, False otherwise.
    """
    global _TRACING_INITIALIZED
    global _TRACING_ENABLED
    global _TRACER_PROVIDER

    if _TRACING_INITIALIZED:
        return _TRACING_ENABLED
    _TRACING_INITIALIZED = True

    if not _parse_bool(os.getenv("OTEL_ENABLED"), default=True):
        logger.info("OTel tracing disabled by OTEL_ENABLED")
        _TRACING_ENABLED = False
        return False

    service_name = _DEFAULT_SERVICE_NAME
    attributes: Dict[str, str] = {
        SERVICE_NAME: service_name,
        SERVICE_VERSION: service_version,
        "render.worker.id": str(worker_id),
    }

    # Tracing is diagnostic only: a failure while building the pipeline
    # (e.g. exporter configuration errors) must not take the worker down.
    try:
        tracer_provider = TracerProvider(resource=Resource.create(attributes))
        tracer_provider.add_span_processor(
            BatchSpanProcessor(
                OTLPSpanExporter(endpoint=_OTEL_EXPORTER_OTLP_ENDPOINT)
            )
        )
    except Exception:
        logger.warning("Failed to initialize OTel tracing; tracing disabled", exc_info=True)
        _TRACING_ENABLED = False
        return False

    # set_tracer_provider is a no-op if a global provider was already set.
    trace.set_tracer_provider(tracer_provider)
    _TRACING_ENABLED = True
    if trace.get_tracer_provider() is tracer_provider:
        # We own the global provider, so shutdown_tracing may flush it.
        _TRACER_PROVIDER = tracer_provider
    else:
        # Another provider is already installed and spans go there. Release
        # the unused provider so its batch processor does not linger.
        logger.warning(
            "A global TracerProvider was already installed; using it instead"
        )
        try:
            tracer_provider.shutdown()
        except Exception as exc:
            logger.warning("Failed to shutdown unused tracing provider: %s", exc)
    logger.info("OTel tracing initialized (service=%s, worker=%s)", service_name, worker_id)
    return True


def shutdown_tracing() -> None:
    """
    Gracefully shut down the tracing provider, flushing any pending spans.

    Only a provider installed by initialize_tracing is shut down. A foreign
    global provider is left untouched, but tracing is still marked disabled.
    """
    global _TRACING_ENABLED
    if not _TRACING_ENABLED:
        return
    provider = _TRACER_PROVIDER
    if provider is not None:
        try:
            provider.shutdown()
        except Exception as exc:
            logger.warning("Failed to shutdown tracing provider: %s", exc)
    _TRACING_ENABLED = False


def _str_or_empty(value: Any) -> str:
    """Stringify *value*, mapping ``None`` to ``""`` instead of ``"None"``."""
    return "" if value is None else str(value)


def build_task_trace_context(task: Any) -> TaskTraceContext:
    """
    Build a TaskTraceContext from a task object via duck typing.

    Reads ``task_id`` and ``task_type``; the latter may be an enum-like
    object, in which case its ``.value`` is used. It also calls
    ``get_job_id()`` / ``get_segment_id()`` when the task provides them.
    Missing or ``None`` values become empty strings.
    """
    task_id = _str_or_empty(getattr(task, "task_id", ""))
    task_type_obj = getattr(task, "task_type", "")
    task_type = _str_or_empty(getattr(task_type_obj, "value", task_type_obj))
    job_id = ""
    if hasattr(task, "get_job_id"):
        job_id = str(task.get_job_id() or "")
    segment_id = ""
    if hasattr(task, "get_segment_id"):
        segment_id = _str_or_empty(task.get_segment_id())
    return TaskTraceContext(
        task_id=task_id,
        task_type=task_type,
        job_id=job_id,
        segment_id=segment_id,
    )


def get_current_task_context() -> Optional[TaskTraceContext]:
    """Return the task context bound to the current context, if any."""
    return _CURRENT_TASK_CONTEXT.get()


def capture_otel_context() -> Any:
    """Capture the current OTel context, e.g. to hand it to another thread."""
    return otel_context.get_current()


@contextmanager
def bind_trace_context(parent_otel_context: Any, task_context: Optional[TaskTraceContext]) -> Iterator[None]:
    """
    Bind a parent OTel context and a task context in the current thread.

    Used to continue a task's trace across threads (e.g. a lease-renewal
    thread). Either argument may be None, in which case it is not bound.
    Both bindings are undone on exit.
    """
    otel_token = None
    task_token = None
    if parent_otel_context is not None:
        otel_token = otel_context.attach(parent_otel_context)
    if task_context is not None:
        task_token = _CURRENT_TASK_CONTEXT.set(task_context)
    try:
        yield
    finally:
        # Undo in reverse order of binding.
        if task_token is not None:
            _CURRENT_TASK_CONTEXT.reset(task_token)
        if otel_token is not None:
            otel_context.detach(otel_token)


@contextmanager
def task_trace_scope(task: Any, span_name: str = "render.task.process") -> Iterator[Optional[Span]]:
    """
    Create the task's root span and bind its task context.

    The task context is bound even when tracing is disabled, so nested
    start_span calls can still see it. Yields the span, or None when
    tracing is disabled.
    """
    task_context = build_task_trace_context(task)
    task_token = _CURRENT_TASK_CONTEXT.set(task_context)
    span_cm = nullcontext(None)
    if _TRACING_ENABLED:
        tracer = trace.get_tracer(_DEFAULT_TRACER_NAME)
        span_cm = tracer.start_as_current_span(span_name, kind=SpanKind.CONSUMER)
    try:
        with span_cm as span:
            if span is not None:
                for key, value in task_context.to_attributes().items():
                    span.set_attribute(key, value)
            yield span
    finally:
        _CURRENT_TASK_CONTEXT.reset(task_token)


@contextmanager
def start_span(
    name: str,
    *,
    attributes: Optional[Mapping[str, Any]] = None,
    kind: SpanKind = SpanKind.INTERNAL,
    task_id: Optional[str] = None,
) -> Iterator[Optional[Span]]:
    """
    Create a child span within a task.

    Yields None (no span) when tracing is disabled, or when there is no
    current task context and no explicit *task_id* was given. Task-context
    attributes are copied onto the span. An explicit *task_id* that differs
    from the context's overrides the task-id attribute. Entries in
    *attributes* whose value is None are skipped.
    """
    task_context = get_current_task_context()
    should_trace = _TRACING_ENABLED and (task_context is not None or bool(task_id))
    if not should_trace:
        yield None
        return

    tracer = trace.get_tracer(_DEFAULT_TRACER_NAME)
    with tracer.start_as_current_span(name, kind=kind) as span:
        if task_context is not None:
            for key, value in task_context.to_attributes().items():
                span.set_attribute(key, value)
        if task_id and (task_context is None or task_context.task_id != task_id):
            span.set_attribute(_TASK_ID_ATTR, task_id)
        if attributes:
            for key, value in attributes.items():
                if value is not None:
                    span.set_attribute(key, value)
        yield span


def mark_span_error(span: Optional[Span], message: Optional[str], error_code: str = "") -> None:
    """
    Mark *span* as failed (no-op when *span* is None).

    Records *error_code* and the message (truncated to 500 chars) as
    attributes when non-empty. It then sets an ERROR status whose
    description is truncated to 200 chars. A None message is treated as
    empty, and non-string messages (e.g. exceptions) are stringified.
    """
    if span is None:
        return
    text = "" if message is None else str(message)
    if error_code:
        span.set_attribute(_ERROR_CODE_ATTR, error_code)
    if text:
        span.set_attribute(_ERROR_MESSAGE_ATTR, text[:500])
    span.set_status(Status(StatusCode.ERROR, text[:200]))


def inject_trace_headers(headers: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """
    Return a copy of *headers* with the current trace context injected.

    The input mapping is never mutated. When tracing is disabled, the copy
    is returned without trace headers.
    """
    carrier = dict(headers) if headers else {}
    if _TRACING_ENABLED:
        propagate.inject(carrier)
    return carrier