Source code for duetector.monitors.sh_monitor

from __future__ import annotations

import subprocess
from collections import namedtuple
from datetime import datetime
from typing import Any, Callable

from duetector.filters.base import Filter
from duetector.injectors.base import Injector
from duetector.managers.injector import InjectorManager

try:
    from functools import cache
except ImportError:
    from functools import lru_cache as cache

from duetector.collectors.base import Collector
from duetector.log import logger
from duetector.managers.collector import CollectorManager
from duetector.managers.filter import FilterManager
from duetector.managers.tracer import TracerManager
from duetector.monitors.base import Monitor
from duetector.tracers.base import ShellTracer


[docs] class ShTracerHost: """ Host for Shell, provide a way to poll shell command. Use ``subprocess.Popen`` to run shell command. """ def __init__(self, backend, timeout=5): self.tracers: dict[ShellTracer, list[str]] = {} self.callbacks: dict[ShellTracer, Callable[[namedtuple], None]] = {} self.timeout = timeout self.backend = backend
[docs] def attach(self, tracer): self.tracers[tracer] = tracer.comm
[docs] def detach(self, tracer): if tracer in self.tracers: self.tracers.pop(tracer)
[docs] @cache def get_poller(self, tracer) -> Callable[[None], None]: """ Provide a callback function for ``Poller``. Use ``subprocess.Popen`` to run shell command, pipe stdout to callback. """ comm = self.tracers[tracer] enable_cache = tracer.enable_cache def _(): p = subprocess.Popen(comm, stdout=subprocess.PIPE, stderr=subprocess.PIPE) p.wait(self.timeout) output = p.stdout.read().decode("utf-8") if enable_cache: if output == tracer.get_cache(): # No change, no need to call callback return tracer.set_cache(output) callback = self.callbacks[tracer] callback(tracer.data_t(output=output, dt=datetime.now())) return _
[docs] def poll(self, tracer): """ Poll a tracer. """ self.get_poller(tracer)()
[docs] def poll_all(self): """ Poll all tracers. """ return self.backend.map(self.poll, self.tracers)
[docs] def set_callback(self, tracer, callback): """ Set callback for tracer. """ self.callbacks[tracer] = callback
[docs] class ShMonitor(Monitor): """ A monitor for shell command. """ config_scope = "monitor.sh" """ Config scope for this monitor. """ default_config = { **Monitor.default_config, "auto_init": True, "timeout": 5, } """ Default config for this monitor. Two sub-configs are available: - auto_init: Auto init tracers when init monitor. - timeout: Timeout for shell command. """ @property def auto_init(self): """ Auto init tracers when init monitor. """ return self.config.auto_init @property def timeout(self): """ Timeout for shell command. """ return int(self.config.timeout) def __init__(self, config: dict[str, Any] | None = None, *args, **kwargs): super().__init__(config=config, *args, **kwargs) if self.disabled: logger.info("ShMonitor disabled") return self.tracers: list[ShellTracer] = TracerManager(config).init(tracer_type=ShellTracer) # type: ignore self.host = ShTracerHost(self._backend, self.timeout) if self.auto_init: self.init()
[docs] def init(self): for tracer in self.tracers: tracer.attach(self.host) self._set_callback(self.host, tracer) logger.info(f"Tracer {tracer.__class__.__name__} attached")
[docs] def poll_all(self): return self.host.poll_all()
[docs] def poll(self, tracer: ShellTracer): # type: ignore return self.host.poll(tracer)
if __name__ == "__main__": m = ShMonitor() print(m) while True: try: m.poll_all() except KeyboardInterrupt: print(m.summary()) exit()