Source code for duetector.tools.poller

from __future__ import annotations

import threading
from typing import Any

from duetector.config import Configuable
from duetector.log import logger


[docs] class Poller(Configuable): """ A wrapper for ``threading.Thread`` Special config: - interval_ms: Polling interval in milliseconds """ config_scope = "poller" """ Config scope for this poller. """ default_config = { "interval_ms": 500, "call_when_shutdown": True, } """ Default config for this poller. """ def __init__( self, config: dict[str, Any] | None = None, *args, **kwargs, ): super().__init__(config=config, *args, **kwargs) self._thread: threading.Thread | None = None self.shutdown_event = threading.Event() @property def interval_ms(self): """ Polling interval in milliseconds """ return self.config.interval_ms @property def call_when_shutdown(self): """ Whether to call ``func`` when shutdown """ return self.config.call_when_shutdown
[docs] def start(self, func, *args, **kwargs): """ Start a poller thread, until ``shutdown`` is called. Exceptions: - RuntimeError: If poller thread is already started. """ if self._thread: raise RuntimeError("Poller thread is already started, try shutdown and wait first.") def _poll(): while not self.shutdown_event.is_set(): try: func(*args, **kwargs) except Exception as e: logger.exception(e) self.shutdown_event.wait(timeout=self.interval_ms / 1000) # call func one last time before exit if self.call_when_shutdown: try: func(*args, **kwargs) except Exception as e: logger.exception(e) self._thread = threading.Thread(target=_poll) self.shutdown_event.clear() self._thread.start()
[docs] def shutdown(self): """ Shutdown poller thread. It's safe to call this method multiple times. After shutdown, ``wait`` should be called to wait for the thread to exit. """ self.shutdown_event.set()
[docs] def wait(self, timeout_ms=None): """ Wait for poller thread to exit. Call this method after ``shutdown``. """ if not self._thread: return timeout = (timeout_ms or self.interval_ms * 3) / 1000 self._thread.join(timeout=timeout) if self._thread.is_alive(): # FIXME: should we raise an exception here? logger.warning("Poller thread is still alive after timeout") self.shutdown() else: self._thread = None self.shutdown_event.clear()