Source code for duetector.collectors.otel

from __future__ import annotations

from typing import Any

from opentelemetry import trace
from opentelemetry.exporter.jaeger.proto.grpc import (
    JaegerExporter as GRPCJaegerExporter,
)
from opentelemetry.exporter.jaeger.thrift import JaegerExporter as ThriftJaegerExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter as GRPCOTLPSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter as HTTPOTLPSpanExporter,
)
from opentelemetry.exporter.zipkin.json import ZipkinExporter as JSONZipkinExporter
from opentelemetry.exporter.zipkin.proto.http import (
    ZipkinExporter as HTTPZipkinExporter,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

from duetector.collectors.base import Collector
from duetector.collectors.models import Tracking
from duetector.extension.collector import hookimpl
from duetector.log import logger
from duetector.otel import OTelInspector
from duetector.utils import Singleton, get_grpc_cred_from_path


[docs] class OTelInitiator(metaclass=Singleton): """ Host the OpenTelemetry SDK and initialize the provider and exporter. Avaliable exporters: - ``console`` - ``otlp-grpc`` - ``otlp-http`` - ``jaeger-thrift`` - ``jaeger-grpc`` - ``zipkin-http`` - ``zipkin-json`` - ``prometheus`` Example: .. code-block:: python otel = OTelInitiator() trace = otel.initialize( service_name="duetector", exporter="console", ) from opentelemetry import trace tracer = trace.get_tracer(__name__) with tracer.start_as_current_span("test") as span: span.set_attribute("test", "test") otel.shutdown() """ exporter_cls = { "console": ConsoleSpanExporter, "otlp-grpc": GRPCOTLPSpanExporter, "otlp-http": HTTPOTLPSpanExporter, "jaeger-thrift": ThriftJaegerExporter, "jaeger-grpc": GRPCJaegerExporter, "zipkin-http": HTTPZipkinExporter, "zipkin-json": JSONZipkinExporter, # Prometheus only support metrics # "prometheus": "TODO" } def __init__(self): self._initialized = False self.provider = None
[docs] def initialize( self, service_name="unknown-service", resource_kwargs: dict[str, Any] | None = None, provider_kwargs: dict[str, Any] | None = None, exporter="console", exporter_kwargs: dict[str, Any] | None = None, processor_kwargs: dict[str, Any] | None = None, ) -> None: if self._initialized: logger.info("Already initiated. Skip...") return if not resource_kwargs: resource_kwargs = {} resource_kwargs.setdefault(SERVICE_NAME, service_name) resource = Resource(attributes=resource_kwargs) if not provider_kwargs: provider_kwargs = {} provider = TracerProvider(resource=resource, **provider_kwargs) self.provider = provider if not exporter_kwargs: exporter_kwargs = {} if processor_kwargs: processor_kwargs = {} processor = BatchSpanProcessor( self.exporter_cls[exporter](**exporter_kwargs), **processor_kwargs ) provider.add_span_processor(processor) trace.set_tracer_provider(provider) self._initialized = True
[docs] def shutdown(self): if self._initialized and self.provider: self.provider.shutdown() self._initialized = False self.provider = None
[docs] class OTelCollector(Collector, OTelInspector): """ A collector using OpenTelemetry SDK. Config: - ``exporter``: One of ``console``, ``otlp-grpc``, ``otlp-http``, ``jaeger-thrift``, ``jaeger-grpc``, ``zipkin-http``, ``zipkin-json``, see :class:`OTelInitiator` for more details - ``exporter_kwargs``: A dict of kwargs for exporter Note: Since v1.35, the Jaeger supports OTLP natively. Please use the OTLP exporter instead. Support for this exporter will end July 2023. """ service_prefix = "duetector" service_sep = "-" default_config = { **Collector.default_config, "disabled": True, "exporter": "console", "exporter_kwargs": {}, "grpc_exporter_kwargs": { "secure": False, "root_certificates_path": "", "private_key_path": "", "certificate_chain_path": "", }, "processor_kwargs": {}, } @property def exporter(self) -> str: return self.config.exporter @property def endpoint(self) -> str | None: return self.config.endpoint @property def exporter_kwargs(self) -> dict[str, Any]: return self.config.exporter_kwargs._config_dict @property def processor_kwargs(self) -> dict[str, Any]: return self.config.processor_kwargs._config_dict @property def service_name(self) -> str: return self.generate_service_name(self.id) @property def grpc_exporter_kwargs(self) -> dict[str, Any]: kwargs = self.config.grpc_exporter_kwargs._config_dict wrapped_kwargs = {} if kwargs.get("secure"): creds = get_grpc_cred_from_path( root_certificates_path=kwargs.get("root_certificates_path"), private_key_path=kwargs.get("private_key_path"), certificate_chain_path=kwargs.get("certificate_chain_path"), ) wrapped_kwargs = { "insecure": False, "credentials": creds, } return wrapped_kwargs def __init__(self, config: dict[str, Any] | None = None, *args, **kwargs): super().__init__(config, *args, **kwargs) if "grpc" in self.exporter: logger.info("Merge grpc kwargs into exporter_kwargs") self.exporter_kwargs.update(self.grpc_exporter_kwargs) self.otel = OTelInitiator() self.otel.initialize( service_name=self.service_name, exporter=self.exporter, exporter_kwargs=self.exporter_kwargs, processor_kwargs=self.processor_kwargs, )
[docs] def _emit(self, t: Tracking): tracer = trace.get_tracer(self.id) with tracer.start_as_current_span(self.generate_span_name(t)) as span: t.set_span(self, span)
[docs] def summary(self) -> dict: return {}
[docs] def shutdown(self): super().shutdown() self.otel.shutdown()
@hookimpl def init_collector(config): return OTelCollector(config)