Source code for duetector.monitors.subprocess_monitor

from __future__ import annotations

import subprocess
import threading
from collections import Counter, namedtuple
from io import DEFAULT_BUFFER_SIZE
from select import select
from typing import IO, Any, Callable

import psutil

from duetector.collectors.base import Collector
from duetector.injectors.base import Injector
from duetector.log import logger
from duetector.managers.collector import CollectorManager
from duetector.managers.filter import FilterManager
from duetector.managers.injector import InjectorManager
from duetector.managers.tracer import TracerManager
from duetector.monitors.base import Monitor
from duetector.proto.subprocess import (
    EventMessage,
    InitMessage,
    StopMessage,
    StoppedMessage,
    dispatch_message,
)
from duetector.tracers.base import SubprocessTracer


[docs] class SubprocessHost: def __init__( self, timeout, backend, poll_szie=1024, bufsize=DEFAULT_BUFFER_SIZE * 4, kill_timeout=5, restart_times=0, ) -> None: self.tracers: dict[SubprocessTracer, subprocess.Popen] = {} self.callbacks: dict[SubprocessTracer, Callable[[namedtuple], None]] = {} self.timeout = timeout self.backend = backend self.bufsize = bufsize self.poll_szie = poll_szie self.kill_timeout = kill_timeout self.restart_times = restart_times self.restart_counter: Counter = Counter() self.shutdown_event = threading.Event() self.shutdown_event.clear()
[docs] def notify_init(self, tracer: SubprocessTracer): logger.debug(f"Notify init for tracer {tracer.__class__.__name__}") self._writeline( InitMessage.from_host(self, tracer).model_dump_json(), self.tracers[tracer].stdin, ) self._poll(tracer, self.tracers[tracer].stdout.readline())
[docs] def notify_stop(self, tracer: SubprocessTracer): logger.debug(f"Notify stop for tracer {tracer.__class__.__name__}") self._writeline(StopMessage.from_host(self).model_dump_json(), self.tracers[tracer].stdin)
[docs] def notify_poll(self, tracer: SubprocessTracer): logger.debug(f"Notify poll for tracer {tracer.__class__.__name__}") self._writeline(EventMessage.from_host(self).model_dump_json(), self.tracers[tracer].stdin)
[docs] def _writeline(self, json_str: str, io: IO): if not json_str.endswith("\n"): json_str += "\n" io.write(json_str) io.flush()
[docs] def attach(self, tracer: SubprocessTracer): if self.shutdown_event.is_set(): raise RuntimeError("Host already shutdown") p = subprocess.Popen( tracer.comm, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, # FIXME: We currently using a text proto, may change to binary proto in the future bufsize=self.bufsize, ) self.tracers[tracer] = p self.notify_init(tracer)
[docs] def detach(self, tracer: SubprocessTracer): if tracer in self.tracers: p = self.tracers.get(tracer) try: p = psutil.Process(p.pid) except psutil.NoSuchProcess: # Already stopped logger.warning("Detaching a stopped tracer") self.poll(tracer) self.tracers.pop(tracer) return try: self.notify_stop(tracer) logger.info(f"Detaching {tracer}") p.terminate() logger.info("Wating for subprocess to stop") p.wait(self.kill_timeout) except psutil.TimeoutExpired: logger.warning("Timeout for terminate subprocess, kill it.") p.kill() self.poll(tracer) self.tracers.pop(tracer) else: logger.warning("Tracer not attached, ignore")
[docs] def shutdown(self): logger.info("Shutting down host") self.shutdown_event.set() for tracer in list(self.tracers): self.detach(tracer)
[docs] def is_alive(self, tracer: SubprocessTracer): p = self.tracers[tracer] try: psutil_p = psutil.Process(p.pid) except psutil.NoSuchProcess: return False return psutil_p.is_running()
[docs] def poll(self, tracer: SubprocessTracer): """ Poll a tracer. """ p = self.tracers[tracer] if not self.is_alive(tracer): # Restart tracer if self.restart_times == 0: return if ( self.restart_counter[tracer] >= self.restart_times and not self.shutdown_event.is_set() ): logger.warning( f"Tracer {tracer.__class__.__name__} restart times exceed limit, stop it." ) self.detach(tracer) return logger.warning(f"Tracer {tracer.__class__.__name__} stopped, restart it.") self.restart_counter[tracer] += 1 self.attach(tracer) p = self.tracers[tracer] # Poll next time else: logger.debug(f"Polling tracer {tracer.__class__.__name__}") self.notify_poll(tracer) ready = select([p.stdout.fileno()], [], [], self.timeout)[0] poll_count = 0 while ready and poll_count < self.poll_szie and not self.shutdown_event.is_set(): poll_count += 1 output = p.stdout.readline() if not output: break self._poll(tracer, output) ready = select([p.stdout.fileno()], [], [], self.timeout)[0] logger.debug(f"Total poll count: {poll_count}")
[docs] def _poll(self, tracer: SubprocessTracer, output): if not output: # Empty output return msg = dispatch_message(output) if not msg: return if isinstance(msg, EventMessage): self.callbacks[tracer](msg.serialize_namedtuple()) if isinstance(msg, StoppedMessage): self.detach(tracer) if isinstance(msg, InitMessage): logger.info(f"Tracer {tracer.__class__.__name__} initialized")
[docs] def poll_all(self): """ Poll all tracers. """ return self.backend.map(self.poll, self.tracers)
[docs] def set_callback(self, tracer, callback): """ Set callback for tracer. """ self.callbacks[tracer] = callback
[docs] class SubprocessMonitor(Monitor): config_scope = "monitor.subprocess" default_config = { **Monitor.default_config, "auto_init": True, "timeout": 0.01, "kill_timeout": 5, "pool_size": 1024, "bufsize": DEFAULT_BUFFER_SIZE * 4, "restart_times": 0, } @property def auto_init(self): """ Auto init tracers when init monitor. """ return self.config.auto_init @property def timeout(self): """ Timeout for poll. """ return float(self.config.timeout) @property def kill_timeout(self): """ Timeout for kill subprocess. """ return int(self.config.kill_timeout) @property def bufsize(self): """ Buffer size for subprocess. """ return int(self.config.bufsize) @property def poll_szie(self): """ Poll size for subprocess. """ return float(self.config.pool_size) def __init__(self, config: dict[str, Any] | None = None, *args, **kwargs): super().__init__(config=config, *args, **kwargs) if self.disabled: logger.info("SubprocessMonitor disabled") return self.tracers: list[SubprocessTracer] = TracerManager(config).init(tracer_type=SubprocessTracer) # type: ignore self.host = SubprocessHost( timeout=self.timeout, backend=self._backend, bufsize=self.bufsize, poll_szie=self.poll_szie, kill_timeout=self.kill_timeout, ) if self.auto_init: self.init()
[docs] def init(self): for tracer in self.tracers: tracer.attach(self.host) self._set_callback(self.host, tracer) logger.info(f"Tracer {tracer.__class__.__name__} attached")
[docs] def shutdown(self): self.host.shutdown() super().shutdown()
[docs] def poll_all(self): return self.host.poll_all()
[docs] def poll(self, tracer: SubprocessTracer): # type: ignore return self.host.poll(tracer)