You've already forked FrameTour-RenderWorker
- 在 base.py 中添加文件下载、上传和 FFmpeg 执行的链路追踪 - 在 api_client.py 中实现 API 请求的链路追踪和错误标记 - 在 lease_service.py 中添加租约续期的链路追踪支持 - 在 task_executor.py 中集成任务执行的完整链路追踪 - 新增 util/tracing.py 工具模块提供统一的追踪上下文管理 - 在 .env.example 中添加 OTEL 配置选项 - 在 index.py 中初始化和关闭链路追踪功能
261 lines
7.8 KiB
Python
261 lines
7.8 KiB
Python
# -*- coding: utf-8 -*-
"""
OTel tracing utilities.

Provides unified tracing initialization, task context management, and span creation.
"""
|
|
|
|
import logging
|
|
import os
|
|
from contextlib import contextmanager, nullcontext
|
|
from contextvars import ContextVar
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, Iterator, Mapping, Optional
|
|
|
|
from opentelemetry import context as otel_context
|
|
from opentelemetry import propagate, trace
|
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
|
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
|
|
from opentelemetry.sdk.trace import TracerProvider
|
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
|
from opentelemetry.trace import Span, SpanKind, Status, StatusCode
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Service name placed in the OTel resource by initialize_tracing().
_DEFAULT_SERVICE_NAME = "RenderWorkerNext"
# Instrumentation name passed to trace.get_tracer() when spans are created.
_DEFAULT_TRACER_NAME = "render.worker"
# Endpoint handed to OTLPSpanExporter; it is fixed here, not read from the
# environment.
# NOTE(review): the host is spelled "oltp" rather than "otlp" - confirm this
# is the intended hostname.
_OTEL_EXPORTER_OTLP_ENDPOINT = "https://oltp.jerryyan.top/v1/traces"

# Span attribute keys used by this module.
_TASK_ID_ATTR = "render.task.id"
_TASK_TYPE_ATTR = "render.task.type"
_JOB_ID_ATTR = "render.job.id"
_SEGMENT_ID_ATTR = "render.segment.id"
_ERROR_CODE_ATTR = "render.error.code"
_ERROR_MESSAGE_ATTR = "render.error.message"

# Lower-cased strings that _parse_bool() treats as true.
_TRUE_VALUES = {"1", "true", "yes", "on"}

# Set by initialize_tracing() on its first call so later calls are no-ops.
_TRACING_INITIALIZED = False
# Whether spans should currently be produced; cleared by shutdown_tracing().
_TRACING_ENABLED = False
# Provider created by initialize_tracing(); shutdown_tracing() shuts it down.
_TRACER_PROVIDER: Optional[TracerProvider] = None
# Task context for the current execution context; None when no task is bound.
_CURRENT_TASK_CONTEXT: ContextVar[Optional["TaskTraceContext"]] = ContextVar(
    "render_worker_task_trace_context",
    default=None,
)
|
|
|
|
|
|
@dataclass(frozen=True)
class TaskTraceContext:
    """Immutable, task-scoped identifiers that get stamped onto spans.

    ``task_id`` and ``task_type`` are always exported as attributes;
    ``job_id`` and ``segment_id`` are exported only when non-empty.
    """

    task_id: str
    task_type: str
    job_id: str = ""
    segment_id: str = ""

    def to_attributes(self) -> Dict[str, str]:
        """Return the span attributes that describe this task."""
        # Optional identifiers, listed in the order they should appear.
        optional_fields = (
            (_JOB_ID_ATTR, self.job_id),
            (_SEGMENT_ID_ATTR, self.segment_id),
        )
        result = {_TASK_ID_ATTR: self.task_id, _TASK_TYPE_ATTR: self.task_type}
        result.update({name: text for name, text in optional_fields if text})
        return result
|
|
|
|
|
|
def _parse_bool(value: Optional[str], default: bool) -> bool:
    """Interpret an environment-style string as a boolean.

    Args:
        value: Raw text (for example the result of ``os.getenv``), or
            ``None`` when the setting is absent.
        default: Result to use when ``value`` is ``None``.

    Returns:
        ``default`` for ``None``; otherwise ``True`` only when the stripped,
        lower-cased text is a member of ``_TRUE_VALUES``. Any other text,
        including the empty string, yields ``False``.
    """
    if value is None:
        return default
    return value.strip().lower() in _TRUE_VALUES
|
|
|
|
|
|
def is_tracing_enabled() -> bool:
    """Report the current value of the module's tracing-enabled flag."""
    enabled = _TRACING_ENABLED
    return enabled
|
|
|
|
|
|
def initialize_tracing(worker_id: str, service_version: str) -> bool:
    """Initialize OTel tracing for this process.

    Only the first call does any work; later calls return the current value
    of the enabled flag. Tracing is skipped when the ``OTEL_ENABLED``
    environment variable is set to a value that is not recognised as true
    (an unset variable means enabled).

    Args:
        worker_id: Identifier recorded as the ``render.worker.id`` resource
            attribute (converted with ``str``).
        service_version: Value recorded as the service version resource
            attribute.

    Returns:
        ``True`` when tracing is enabled after this call, ``False`` otherwise.
    """
    global _TRACING_INITIALIZED
    global _TRACING_ENABLED
    global _TRACER_PROVIDER

    if _TRACING_INITIALIZED:
        return _TRACING_ENABLED

    # Mark as initialized up front so a failure below is not retried.
    _TRACING_INITIALIZED = True
    if not _parse_bool(os.getenv("OTEL_ENABLED"), default=True):
        logger.info("OTel tracing disabled by OTEL_ENABLED")
        _TRACING_ENABLED = False
        return False

    service_name = _DEFAULT_SERVICE_NAME
    attributes: Dict[str, str] = {
        SERVICE_NAME: service_name,
        SERVICE_VERSION: service_version,
        "render.worker.id": str(worker_id),
    }

    resource = Resource.create(attributes)
    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(
        BatchSpanProcessor(
            OTLPSpanExporter(endpoint=_OTEL_EXPORTER_OTLP_ENDPOINT)
        )
    )
    trace.set_tracer_provider(tracer_provider)

    _TRACING_ENABLED = True
    if trace.get_tracer_provider() is tracer_provider:
        # Remember the provider so shutdown_tracing() can shut it down.
        _TRACER_PROVIDER = tracer_provider
    else:
        # Another provider is already registered globally, so ours was not
        # installed and will never receive spans. Shut it down now instead of
        # leaving its batch processor running for the rest of the process.
        logger.warning("Global tracer provider already set; discarding unused provider")
        try:
            tracer_provider.shutdown()
        except Exception as exc:
            logger.warning("Failed to shutdown unused tracing provider: %s", exc)

    logger.info("OTel tracing initialized (service=%s, worker=%s)", service_name, worker_id)
    return True
|
|
|
|
|
|
def shutdown_tracing() -> None:
    """Gracefully shut down the tracing provider, flushing remaining spans.

    Does nothing when tracing is not enabled. A failure while shutting the
    provider down is logged as a warning and not re-raised. Tracing is marked
    as disabled afterwards.
    """
    global _TRACING_ENABLED

    if _TRACING_ENABLED:
        active_provider = _TRACER_PROVIDER
        if active_provider is not None:
            try:
                active_provider.shutdown()
            except Exception as exc:
                logger.warning("Failed to shutdown tracing provider: %s", exc)
        _TRACING_ENABLED = False
|
|
|
|
|
|
def build_task_trace_context(task: Any) -> TaskTraceContext:
    """Build a ``TaskTraceContext`` from a task-like object.

    Reads ``task.task_id`` and ``task.task_type`` (using the latter's
    ``value`` attribute when it has one), and calls ``task.get_job_id()`` /
    ``task.get_segment_id()`` when those methods exist. Every field is
    converted to a string; missing or ``None`` values become ``""`` so that
    the literal text ``"None"`` never ends up in a span attribute.

    Args:
        task: Object exposing some or all of the attributes named above.

    Returns:
        The populated, immutable task trace context.
    """

    def _text(raw: Any) -> str:
        # None means "no value"; everything else is stringified as-is.
        return "" if raw is None else str(raw)

    task_id = _text(getattr(task, "task_id", ""))
    task_type_obj = getattr(task, "task_type", "")
    # Enum-like task types expose the wire value through ``.value``.
    task_type = _text(getattr(task_type_obj, "value", task_type_obj))

    job_id = ""
    if hasattr(task, "get_job_id"):
        # Any falsy job id is treated as absent.
        job_id = str(task.get_job_id() or "")

    segment_id = ""
    if hasattr(task, "get_segment_id"):
        # Only None is treated as absent, so a segment id of 0 is kept.
        segment_id = _text(task.get_segment_id())

    return TaskTraceContext(
        task_id=task_id,
        task_type=task_type,
        job_id=job_id,
        segment_id=segment_id,
    )
|
|
|
|
|
|
def get_current_task_context() -> Optional[TaskTraceContext]:
    """Return the currently bound task trace context, or ``None`` if unset."""
    current = _CURRENT_TASK_CONTEXT.get()
    return current
|
|
|
|
|
|
def capture_otel_context() -> Any:
    """Return the OTel context that is current for the caller."""
    snapshot = otel_context.get_current()
    return snapshot
|
|
|
|
|
|
@contextmanager
def bind_trace_context(parent_otel_context: Any, task_context: Optional[TaskTraceContext]) -> Iterator[None]:
    """
    Bind a parent OTel context and a task context in the current thread.

    Used to continue a task's trace across threads (for example the lease
    renewal thread). Either argument may be ``None``, in which case that
    part is left untouched. Whatever was bound is restored on exit.
    """
    otel_token = (
        otel_context.attach(parent_otel_context)
        if parent_otel_context is not None
        else None
    )
    task_token = (
        _CURRENT_TASK_CONTEXT.set(task_context)
        if task_context is not None
        else None
    )

    try:
        yield
    finally:
        # Undo the bindings in the reverse order they were made.
        if task_token is not None:
            _CURRENT_TASK_CONTEXT.reset(task_token)
        if otel_token is not None:
            otel_context.detach(otel_token)
|
|
|
|
|
|
@contextmanager
def task_trace_scope(task: Any, span_name: str = "render.task.process") -> Iterator[Optional[Span]]:
    """Create the root span for a task and bind the task context.

    Yields the new span, or ``None`` when tracing is disabled. The task
    context is bound in both cases and reset when the scope exits.
    """
    task_context = build_task_trace_context(task)
    task_token = _CURRENT_TASK_CONTEXT.set(task_context)

    if _TRACING_ENABLED:
        span_cm = trace.get_tracer(_DEFAULT_TRACER_NAME).start_as_current_span(
            span_name, kind=SpanKind.CONSUMER
        )
    else:
        span_cm = nullcontext(None)

    try:
        with span_cm as root_span:
            if root_span is not None:
                # Tag the root span with the task identifiers.
                for attr_name, attr_value in task_context.to_attributes().items():
                    root_span.set_attribute(attr_name, attr_value)
            yield root_span
    finally:
        _CURRENT_TASK_CONTEXT.reset(task_token)
|
|
|
|
|
|
@contextmanager
def start_span(
    name: str,
    *,
    attributes: Optional[Mapping[str, Any]] = None,
    kind: SpanKind = SpanKind.INTERNAL,
    task_id: Optional[str] = None,
) -> Iterator[Optional[Span]]:
    """
    Create a child span inside a task.

    Yields ``None`` without creating a span when tracing is disabled, or
    when there is no current task context and no explicit ``task_id``.
    Otherwise the span carries the task context attributes, the explicit
    ``task_id`` when it differs from the context's, and every entry of
    ``attributes`` whose value is not ``None``.
    """
    task_context = get_current_task_context()
    if not _TRACING_ENABLED or (task_context is None and not task_id):
        yield None
        return

    tracer = trace.get_tracer(_DEFAULT_TRACER_NAME)
    with tracer.start_as_current_span(name, kind=kind) as child_span:
        # Collect attributes first; later entries override earlier ones.
        pending = []
        if task_context is not None:
            pending.extend(task_context.to_attributes().items())
        if task_id and (task_context is None or task_context.task_id != task_id):
            pending.append((_TASK_ID_ATTR, task_id))
        if attributes:
            pending.extend(
                (attr_name, attr_value)
                for attr_name, attr_value in attributes.items()
                if attr_value is not None
            )
        for attr_name, attr_value in pending:
            child_span.set_attribute(attr_name, attr_value)
        yield child_span
|
|
|
|
|
|
def mark_span_error(span: Optional[Span], message: str, error_code: str = "") -> None:
    """Mark a span as failed.

    Args:
        span: Span to update; ``None`` makes this a no-op.
        message: Error description. Stored as an attribute (first 500
            characters) when non-empty, and used as the status description
            (first 200 characters). ``None`` is treated as an empty message
            and non-string values are converted with ``str``, so reporting
            an error never raises on its own.
        error_code: Optional error code stored as an attribute when
            non-empty.
    """
    if span is None:
        return

    if error_code:
        span.set_attribute(_ERROR_CODE_ATTR, error_code)

    # Normalise before slicing: slicing None or a non-string would raise.
    text = "" if message is None else str(message)
    if text:
        span.set_attribute(_ERROR_MESSAGE_ATTR, text[:500])
    span.set_status(Status(StatusCode.ERROR, text[:200]))
|
|
|
|
|
|
def inject_trace_headers(headers: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return a copy of ``headers`` with the current trace context injected.

    The input mapping is never modified. When tracing is disabled the copy
    is returned unchanged.
    """
    carrier: Dict[str, str] = {}
    if headers:
        carrier.update(headers)
    if _TRACING_ENABLED:
        propagate.inject(carrier)
    return carrier
|