from __future__ import annotations
from collections import namedtuple
from threading import Lock
from typing import Any, Callable
from duetector.config import Config, Configuable
from duetector.exceptions import TracerError, TreacerDisabledError
[docs]
class Tracer(Configuable):
"""
A base class for all tracers.
As a reverse dependency for host,
subclass should implement ``attach``, ``detach``, ``get_poller`` and ``set_callback``.
This allow tracer to decide how to attach to host, how to detach from host.
``data_t`` is a NamedTuple, which is used to convert raw data to a ``NamedTuple``.
Default scope for config is ``Tracer.__class__.__name__``.
"""
name: str | None
"""
Name for this tracer. Will be used for collecting data.
"""
data_t: namedtuple
"""
Data type for this tracer.
"""
default_config = {
"disabled": False,
}
"""
Default config for this tracer.
"""
@property
def config_scope(self):
"""
Config scope for this tracer.
"""
return self.__class__.__name__
@property
def disabled(self):
"""
If this tracer is disabled.
"""
return self.config.disabled
[docs]
def attach(self, host):
"""
Attach this tracer to host.
"""
raise NotImplementedError("attach not implemented")
[docs]
def detach(self, host):
"""
Detach this tracer from host.
"""
raise NotImplementedError("detach not implemented")
[docs]
def get_poller(self, host) -> Callable:
"""
Get a poller function from host.
"""
raise NotImplementedError("get_poller not implemented")
[docs]
def set_callback(self, host, callback: Callable[[namedtuple], None]):
"""
Set a callback function to host.
"""
raise NotImplementedError("set_callback not implemented")
[docs]
class BccTracer(Tracer):
"""
A Tracer use ``bcc.BPF`` as a host
For simple tracers, you can just set ``attach_type``, ``attatch_args`` to attatch to ``bcc.BPF``.
Equal to `bcc.BPF(prog).attatch_{attatch_type}(**attatch_args)`
For those tracers need to attatch multiple times, set ``many_attatchs`` to attatch multiple times.
``set_callback`` should attatch ``callback`` to ``bpf``, translate raw data to ``data_t`` then call the ``callback``
FIXME:
- Maybe it's hard for using? Maybe we should use a more simple way to implement this?
"""
default_config = {
**Tracer.default_config,
}
"""
Default config for this tracer.
"""
attach_type: str | None = None
"""
Attatch type for ``bcc.BPF``, called as ``BPF.attatch_{attach_type}``,
"""
attatch_args: dict[str, str] = {}
"""
Args for attatch function.
"""
many_attatchs: list[tuple[str, dict[str, str]]] = []
"""
List of attatch function name and args.
``attatch_type``, ``attatch_args`` will merge to this list.
"""
poll_fn: str
"""
Poll function name in ``bcc.BPF``
"""
poll_args: dict[str, str] = {}
"""
Args for poll function.
Remenber to set ``timeout`` for poll function in ``poll_args`` if needed,
"""
prog: str
"""
bpf program
"""
def _convert_data(self, data) -> namedtuple:
"""
Convert raw data to ``data_t``.
"""
args = {}
for k in self.data_t._fields: # type: ignore
v = getattr(data, k)
if isinstance(v, bytes):
try:
v = v.decode("utf-8")
except UnicodeDecodeError:
pass
args[k] = v
return self.data_t(**args) # type: ignore
def _attatch(self, host, attatch_type, attatch_args):
"""
Wrapper for ``bcc.BPF.attatch_{attatch_type}``.
"""
if not attatch_type:
return
attatcher = getattr(host, f"attach_{attatch_type}")
# Prevent AttributeError
attatch_args = attatch_args or {}
return attatcher(**attatch_args)
[docs]
def attach(self, host):
"""
Attatch to host.
Merge ``attatch_type``, ``attatch_args`` to ``many_attatchs`` then attatch.
"""
if self.disabled:
raise TreacerDisabledError("Tracer is disabled")
attatch_list = [*self.many_attatchs, (self.attach_type, self.attatch_args)]
for attatch_type, attatch_args in attatch_list:
self._attatch(host, attatch_type, attatch_args)
def _detach(self, host, attatch_type, attatch_args):
"""
Wrapper for ``bcc.BPF.detach_{attatch_type}``
"""
if not attatch_type:
return
attatcher = getattr(host, f"detach_{attatch_type}")
# Prevent AttributeError
attatch_args = attatch_args or {}
return attatcher(**attatch_args)
[docs]
def detach(self, host):
"""
Detach from host.
Merge ``attatch_type``, ``attatch_args`` to ``many_attatchs`` then detach.
FIXME:
- Maybe we should specify ``detach*`` for detaching?
"""
if self.disabled:
raise TreacerDisabledError("Tracer is disabled")
attatch_list = [*self.many_attatchs, (self.attach_type, self.attatch_args)]
for attatch_type, attatch_args in attatch_list:
self._detach(host, attatch_type, attatch_args)
[docs]
def get_poller(self, host) -> Callable:
"""
Get poller function from host.
"""
if self.disabled:
raise TreacerDisabledError("Tracer is disabled")
if not self.poll_fn:
# Not support poll, prevent AttributeError, fake one
def _(*args, **kwargs):
pass
return _
poller = getattr(host, self.poll_fn)
if not poller:
raise TracerError(f"{self.poll_fn} function not found in BPF")
return poller
[docs]
def set_callback(self, host, callback: Callable[[namedtuple], None]):
"""
Set callback function to host.
Should implemented by subclass.
"""
raise NotImplementedError("set_callback not implemented")
[docs]
class ShellTracer(Tracer):
"""
A tracer use ``ShTracerHost`` as host.
More detail on :doc:`ShellMonitor and ShTracerHost </monitors/sh>`.
Output of shell command will be converted to ``data_t`` and cached by default.
Attributes:
comm (List[str]): shell command
data_t (NamedTuple): data type for this tracer
Special config:
- enable_cache: If enable cache for this tracer.
Cache means the same output will not be converted and emited again.
"""
comm: list[str]
"""
shell command
"""
data_t: namedtuple("ShellOutput", ["output", "dt"])
"""
data type for this tracer
"""
_cache: Any = None
"""
cache for this tracer
"""
default_config = {**Tracer.default_config, "enable_cache": True}
"""
Default config for this tracer.
"""
def __init__(self, config: Config | dict[str, Any] | None = None, *args, **kwargs):
super().__init__(config, *args, **kwargs)
self.mutex = Lock()
@property
def enable_cache(self):
"""
If enable cache for this tracer.
"""
return self.config.enable_cache
[docs]
def set_cache(self, cache):
"""
Set cache for this tracer.
"""
with self.mutex:
self._cache = cache
[docs]
def get_cache(self):
"""
Get cache for this tracer.
"""
return self._cache
[docs]
def attach(self, host):
"""
Attach to host.
"""
host.attach(self)
[docs]
def detach(self, host):
"""
Detach from host.
"""
host.detach(self)
[docs]
def get_poller(self, host) -> Callable:
"""
Get poller function from host.
"""
return host.get_poller(self)
[docs]
def set_callback(self, host, callback: Callable[[namedtuple], None]):
"""
Set callback function to host.
"""
host.set_callback(self, callback)
[docs]
class SubprocessTracer(Tracer):
default_config = {
**Tracer.default_config,
}
"""
Default config for this tracer.
"""
comm: list[str]
"""
shell command
"""
preserve_env: bool = False
"""
If preserve env for this command
"""
def __init__(self, config: Config | dict[str, Any] | None = None, *args, **kwargs):
super().__init__(config, *args, **kwargs)
[docs]
def attach(self, host):
"""
Attach to host.
"""
host.attach(self)
[docs]
def detach(self, host):
"""
Detach from host.
"""
host.detach(self)
[docs]
def get_poller(self, host) -> Callable:
"""
Get poller function from host.
"""
return host.get_poller(self)
[docs]
def set_callback(self, host, callback: Callable[[namedtuple], None]):
"""
Set callback function to host.
"""
host.set_callback(self, callback)