from __future__ import annotations
import threading
from typing import Any
from duetector.config import Configuable
from duetector.log import logger
[docs]
class Poller(Configuable):
"""
A wrapper for ``threading.Thread``
Special config:
- interval_ms: Polling interval in milliseconds
"""
config_scope = "poller"
"""
Config scope for this poller.
"""
default_config = {
"interval_ms": 500,
"call_when_shutdown": True,
}
"""
Default config for this poller.
"""
def __init__(
self,
config: dict[str, Any] | None = None,
*args,
**kwargs,
):
super().__init__(config=config, *args, **kwargs)
self._thread: threading.Thread | None = None
self.shutdown_event = threading.Event()
@property
def interval_ms(self):
"""
Polling interval in milliseconds
"""
return self.config.interval_ms
@property
def call_when_shutdown(self):
"""
Whether to call ``func`` when shutdown
"""
return self.config.call_when_shutdown
[docs]
def start(self, func, *args, **kwargs):
"""
Start a poller thread, until ``shutdown`` is called.
Exceptions:
- RuntimeError: If poller thread is already started.
"""
if self._thread:
raise RuntimeError("Poller thread is already started, try shutdown and wait first.")
def _poll():
while not self.shutdown_event.is_set():
try:
func(*args, **kwargs)
except Exception as e:
logger.exception(e)
self.shutdown_event.wait(timeout=self.interval_ms / 1000)
# call func one last time before exit
if self.call_when_shutdown:
try:
func(*args, **kwargs)
except Exception as e:
logger.exception(e)
self._thread = threading.Thread(target=_poll)
self.shutdown_event.clear()
self._thread.start()
[docs]
def shutdown(self):
"""
Shutdown poller thread. It's safe to call this method multiple times.
After shutdown, ``wait`` should be called to wait for the thread to exit.
"""
self.shutdown_event.set()
[docs]
def wait(self, timeout_ms=None):
"""
Wait for poller thread to exit.
Call this method after ``shutdown``.
"""
if not self._thread:
return
timeout = (timeout_ms or self.interval_ms * 3) / 1000
self._thread.join(timeout=timeout)
if self._thread.is_alive():
# FIXME: should we raise an exception here?
logger.warning("Poller thread is still alive after timeout")
self.shutdown()
else:
self._thread = None
self.shutdown_event.clear()