from __future__ import annotations
import platform
from collections import deque, namedtuple
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from duetector.config import Config, Configuable
from duetector.extension.collector import hookimpl
from duetector.log import logger
from .models import Tracking
class Collector(Configuable):
    """
    Base class for all collectors; each instance owns a ``ThreadPoolExecutor`` used for asynchronous emit.

    By default, the config scope of ``Collector`` is ``collector.{class_name}``.

    Implementations should override the ``_emit`` and ``summary`` methods; see ``DequeCollector`` as an example.
    """

    default_config = {
        "disabled": False,
        "statis_id": "",
        "backend_args": {
            "max_workers": 10,
        },
    }
    """
    Default config for ``Collector``
    """

    _backend_imp = ThreadPoolExecutor
    """
    Default backend implementation for ``Collector``
    """

    def __init__(self, config: dict[str, Any] | None = None, *args, **kwargs):
        super().__init__(config, *args, **kwargs)
        # The executor is built from the ``backend_args`` sub-config, so e.g.
        # ``max_workers`` can be tuned per collector.
        self._backend = self._backend_imp(**self.backend_args._config_dict)

    @property
    def config_scope(self):
        """
        Config scope for current collector (the class name).
        """
        return self.__class__.__name__

    @property
    def disabled(self):
        """
        Whether current collector is disabled.
        """
        return self.config.disabled

    @property
    def id(self) -> str:
        """
        ID for current collector, used to identify current collector in database.

        Falls back to the hostname (``platform.node()``) when ``statis_id`` is not set.
        """
        return self.config.statis_id or platform.node()

    @property
    def backend_args(self):
        """
        Arguments for backend ``self._backend_imp``.
        """
        return self.config.backend_args

    def emit(self, tracer, data: namedtuple):
        """
        Wrapper for ``self._emit``: convert ``data`` to a ``Tracking`` and submit it to the backend executor.

        :param tracer: Identifier of the tracer that produced ``data``
            (presumably the tracer name — TODO confirm against callers).
        :param data: Namedtuple record produced by the tracer.
        :return: Whatever ``self._backend.submit`` returns (a ``Future`` with the default
            ``ThreadPoolExecutor`` backend), or ``None`` when the collector is disabled
            or ``tracer`` is empty.
        """
        if self.disabled:
            return None
        if not tracer:
            # The log message promises to skip; previously execution fell through
            # and submitted a tracking with an empty tracer anyway.
            logger.warning("Empty tracer, skip emit")
            return None
        return self._backend.submit(self._emit, Tracking.from_namedtuple(tracer, data))

    def _emit(self, t: Tracking):
        """
        Emit a tracking to collector; must be implemented by subclasses.

        Runs on a backend worker thread, not on the caller of ``emit``.
        """
        raise NotImplementedError

    def summary(self) -> dict:
        """
        Get summary of current collector; must be implemented by subclasses.
        """
        raise NotImplementedError

    def shutdown(self):
        """
        Shutdown backend executor.
        """
        self._backend.shutdown()
class DequeCollector(Collector):
    """
    A simple in-memory collector keeping one bounded deque of trackings per tracer; disabled by default.

    Config:
        - ``maxlen``: Max length of each per-tracer deque; once full, the oldest tracking is discarded.
    """

    default_config = {
        **Collector.default_config,
        "disabled": True,
        "maxlen": 1024,
    }
    """
    Default config for ``DequeCollector``
    """

    @property
    def maxlen(self):
        """
        Max length of each per-tracer deque.
        """
        return self.config.maxlen

    def __init__(self, config: dict[str, Any] | None = None, *args, **kwargs):
        super().__init__(config, *args, **kwargs)
        # tracer -> deque of its most recent trackings (bounded by ``maxlen``)
        self._trackings: dict[str, deque[Tracking]] = {}

    def _emit(self, t: Tracking):
        """
        Append ``t`` to the deque of its tracer, creating that deque on first use.
        """
        trackings = self._trackings.get(t.tracer)
        if trackings is None:
            # Only build a deque when one is missing; append to the object that
            # setdefault returns so a concurrent worker's deque is never lost.
            trackings = self._trackings.setdefault(t.tracer, deque(maxlen=self.maxlen))
        trackings.append(t)

    def summary(self) -> dict:
        """
        Summarize retained trackings per tracer.

        :return: Mapping of tracer to a dict with ``count`` (number of retained trackings),
            ``first`` / ``last`` (``dt`` of the oldest / newest retained tracking) and
            ``most_recent`` (``model_dump()`` of the newest tracking). Tracers with no
            retained trackings are omitted.
        """
        result = {}
        # _emit runs on backend worker threads; snapshot the mapping so a tracer added
        # concurrently cannot raise "dictionary changed size during iteration".
        for tracer, trackings in list(self._trackings.items()):
            # Copy the deque so count/first/last all come from one consistent view.
            snapshot = list(trackings)
            if not snapshot:
                # Empty when maxlen is 0, or when a worker created the deque but has
                # not appended yet; indexing would raise IndexError.
                continue
            oldest, newest = snapshot[0], snapshot[-1]
            result[tracer] = {
                "count": len(snapshot),
                "first": oldest.dt,
                "last": newest.dt,
                "most_recent": newest.model_dump(),
            }
        return result
@hookimpl
def init_collector(config):
    """
    Plugin hook implementation that provides this module's collector.

    :param config: Configuration handed straight to ``DequeCollector``.
    :return: A freshly constructed ``DequeCollector``.
    """
    collector = DequeCollector(config)
    return collector