from __future__ import annotations
import copy
from typing import Any
from sqlalchemy import func, select # type: ignore
from duetector.collectors.base import Collector
from duetector.collectors.models import Tracking
from duetector.db import SessionManager
from duetector.extension.collector import hookimpl
# NOTE: stray "[docs]" link text from the rendered documentation page was removed here.
class DBCollector(Collector):
    """
    A collector using database, sqlite by default.

    Every tracker will create a table in database, see
    ``SessionManager.get_tracking_model``.

    Config:
        - ``db``: A ``SessionManager`` config
    """

    # Start from the base collector defaults; the ``db`` section starts from
    # the ``SessionManager`` defaults and replaces its ``engine`` entry with a
    # sqlite URL.
    default_config = {
        **Collector.default_config,
        "db": {
            **SessionManager.default_config,
            "engine": {
                "url": "sqlite:///duetector-dbcollector.sqlite3",
            },
        },
    }

    def __init__(self, config: dict[str, Any] | None = None, *args, **kwargs):
        """
        Initialise the collector and the ``SessionManager`` it stores into.

        Args:
            config: Collector configuration, forwarded to ``Collector.__init__``.
            *args: Extra positional arguments forwarded to ``Collector.__init__``.
            **kwargs: Extra keyword arguments forwarded to ``Collector.__init__``.
        """
        super().__init__(config, *args, **kwargs)
        # Init as a submodel: the session manager receives this collector's
        # whole config dict (presumably it reads the ``db`` section itself --
        # confirm against ``SessionManager``).
        self.sm = SessionManager(self.config._config_dict)

    def __repr__(self):
        """Return a description showing the session manager and the non-``db`` config."""
        # Work on a copy so that dropping "db" never touches the live config.
        config_without_db = copy.deepcopy(self.config._config_dict)
        config_without_db.pop("db", None)
        return f"<[DBCollector {self.sm}] {config_without_db}>"

    def _emit(self, t: Tracking):
        """
        Store one tracking record in the table that belongs to its tracer.

        Args:
            t: The tracking record to store. ``t.tracer`` selects the model;
                every other dumped field becomes a constructor argument of
                the stored row.
        """
        m = self.sm.get_tracking_model(t.tracer, self.id)
        with self.sm.begin() as session:
            # "tracer" is only used to pick the model above, so it is left
            # out of the row's constructor arguments.
            tracking = m(**t.model_dump(exclude=["tracer"]))
            session.add(tracking)
            session.commit()

    def summary(self) -> dict:
        """
        Summarise what has been stored for every known tracer.

        Returns:
            A dict keyed by tracer. Each value is a dict with:

            - ``"count"``: number of rows stored for the tracer.
            - ``"first at"``: ``dt`` of the row with the lowest ``id``, or
              ``None`` when the table has no rows.
            - ``"last"``: the row with the highest ``id`` converted with
              ``to_collector_tracking()``, or ``None`` when the table has
              no rows.
        """
        result: dict[str, dict[str, Any]] = {}
        with self.sm.begin() as session:
            for tracer, m in self.sm.get_all_models().items():
                count = session.execute(select(func.count()).select_from(m)).scalar()
                # Order explicitly: without ORDER BY the database is free to
                # return any row as the "first" one.
                first = (
                    session.execute(select(m).order_by(m.id.asc()).limit(1))  # type: ignore
                    .scalars()
                    .first()
                )
                last = (
                    session.execute(select(m).order_by(m.id.desc()).limit(1))  # type: ignore
                    .scalars()
                    .first()
                )
                # An empty table yields no row at all; report ``None`` instead
                # of failing while subscripting a missing result.
                result[tracer] = {
                    "count": count,
                    "first at": first.dt if first is not None else None,
                    "last": last.to_collector_tracking() if last is not None else None,
                }
        return result
@hookimpl
def init_collector(config):
    """Hook implementation: build a ``DBCollector`` from ``config`` and return it."""
    collector = DBCollector(config)
    return collector
if __name__ == "__main__":
    # Manual check: build a collector without an explicit config and print its repr.
    collector = DBCollector()
    print(collector)