from __future__ import annotations
import os
import re
from ast import literal_eval
from collections import namedtuple
from duetector.extension.filter import hookimpl
from duetector.filters import Filter
[docs]
class PatternFilter(Filter):
    """
    A filter that drops data whose fields match configured exclude rules.

    The following config keys are built in:

    - ``re_exclude_fname``: Regex patterns to filter out by the ``fname`` field
    - ``re_exclude_comm``: Regex patterns to filter out by the ``comm`` field
    - ``exclude_pid``: Values to filter out by the ``pid`` field
    - ``exclude_uid``: Values to filter out by the ``uid`` field
    - ``exclude_gid``: Values to filter out by the ``gid`` field

    Customized excludes are also supported:

    - ``re_exclude_custom``: Regex patterns to filter out by the ``custom`` field
    - ``exclude_custom``: Values to filter out by the ``custom`` field

    You can change ``custom`` to any field you want to filter on.
    Config ``enable_customize_exclude`` enables customized excludes, default is ``True``.

    Patterns are applied with ``re.search``, so they match anywhere in the
    field unless anchored. Use an anchored negative lookahead ``^(?!…)`` to
    express an include pattern:

    - ``re_exclude_custom``: ``["^(?!/proc/)"]`` keeps values starting with
      ``/proc/`` and excludes all others. (Without ``^`` the lookahead succeeds
      at some later position of almost any string, so everything is excluded.)

    Note:
        - Config values are parsed as python literals, so an environment
          variable can carry a list:

          - Recommended: ``{PREFIX...}RE_EXCLUDE_FNAME="['/proc*', '/sys*']"``.

        - A value that is not a valid python literal, e.g.
          ``{PREFIX...}RE_EXCLUDE_FNAME=[/proc*]``, makes the literal parser
          fail and the value is then split by comma instead (empty pieces are
          ignored). So either use a python literal or a comma separated string:

          - Recommended: ``{PREFIX...}RE_EXCLUDE_FNAME="['/proc*', '/sys*']"``
          - It's OK: ``{PREFIX...}RE_EXCLUDE_FNAME="/proc*, /sys*"``
          - Wrong: ``{PREFIX...}RE_EXCLUDE_FNAME=[/proc*, /sys*]``, this will be
            converted to a list of ``"[/proc*"`` and ``"/sys*]"``.
    """

    default_config = {
        **Filter.default_config,
        "ignore_current_pid": True,
        "enable_customize_exclude": True,
        "re_exclude_fname": [
            "/proc",
            "/sys",
            "/lib",
            "/dev",
            "/run",
            "/usr/lib",
            "/etc/ld.so.cache",
        ],
        "re_exclude_comm": [],
        "exclude_pid": [],
        "exclude_uid": [
            0,
        ],
        "exclude_gid": [
            0,
        ],
    }
    """
    Default config for ``PatternFilter``
    """

    _re_cache = {}
    """
    Cache mapping a pattern string to its compiled regex.

    Defined on the class, so it is shared by every instance.
    """

    @property
    def enable_customize_exclude(self) -> bool:
        """
        Whether exclude keys outside ``default_config`` are honoured.
        """
        return bool(self.config.enable_customize_exclude)

    @property
    def ignore_current_pid(self) -> bool:
        """
        Whether data whose ``pid`` equals this process' pid is dropped.
        """
        return bool(self.config.ignore_current_pid)

    @staticmethod
    def _wrap_exclude_list(value: str | list[str]) -> set[str]:
        """
        Normalize an exclude config value to a set of stripped strings.

        - A ``list`` is converted element by element.
        - A ``str`` is first parsed as a python literal; a scalar literal
          (e.g. ``"123"`` or ``"'/proc'"``) yields a single entry, an iterable
          literal yields one entry per element.
        - A ``str`` that is not a valid literal is split by comma; empty
          pieces are skipped.

        Raises:
            TypeError: If ``value`` is neither ``str`` nor ``list``.
        """
        if isinstance(value, list):
            return {str(v).strip() for v in value}
        if not isinstance(value, str):
            raise TypeError(f"Type of {value} should be str or list, got {type(value)}")

        try:
            # Use ast.literal_eval to parse python literal
            parsed = literal_eval(value)
        except (SyntaxError, ValueError, TypeError):
            # Not a valid python literal, fallback to split by comma,
            # e.g. "/proc/a*". Empty pieces ("", "a,,b", trailing comma) are
            # skipped: an empty regex would match every value.
            pieces = (piece.strip() for piece in value.split(","))
            return {piece for piece in pieces if piece}

        if isinstance(parsed, (str, bytes)):
            # A string is iterable, but it is one entry, not a bag of characters.
            return {str(parsed).strip()}
        try:
            return {str(v).strip() for v in parsed}
        except TypeError:
            # Non-iterable scalar such as an int: keep it as a single entry.
            return {str(parsed).strip()}

    def is_exclude(self, data: namedtuple, enable_customize_exclude=False) -> bool:
        """
        Customize exclude function, return ``True`` to drop data, return ``False`` to keep data.

        Every config key starting with ``exclude_`` (exact match) or
        ``re_exclude_`` (regex search) is checked against the field of ``data``
        named by the rest of the key. Fields missing from ``data`` or set to
        ``None`` are skipped.

        Args:
            data: Record to check; fields are read with ``getattr``.
            enable_customize_exclude: If ``False``, only keys present in
                ``default_config`` are considered.
        """
        re_prefix = "re_exclude_"
        plain_prefix = "exclude_"
        config_dict = self.config._config_dict

        for key in config_dict:
            if not enable_customize_exclude and key not in self.default_config:
                # If not enable_customize_exclude, only use default config
                continue

            # Strip only the leading prefix; the field name itself may
            # legitimately contain the prefix text.
            if key.startswith(re_prefix):
                field_name, use_regex = key[len(re_prefix):], True
            elif key.startswith(plain_prefix):
                field_name, use_regex = key[len(plain_prefix):], False
            else:
                continue

            value = getattr(data, field_name, None)
            if value is None:
                continue

            text = str(value).strip()
            rules = self._wrap_exclude_list(config_dict[key])
            if use_regex:
                if self.re_exclude(text, rules):
                    return True
            elif text in rules:
                return True
        return False

    def re_exclude(self, field: str | None, re_list: str | list[str]) -> bool:
        """
        Check if ``field`` matches any pattern in ``re_list``.

        Patterns are compiled once and kept in ``_re_cache``; matching uses
        ``search``, so an unanchored pattern may match anywhere in ``field``.

        Args:
            field: Text to test; an empty or ``None`` field never matches.
            re_list: A single pattern or an iterable of patterns.
        """
        if not field:
            return False
        if isinstance(re_list, str):
            re_list = [re_list]

        cache = self._re_cache
        for pattern in re_list:
            compiled = cache.get(pattern)
            if compiled is None:
                compiled = cache[pattern] = re.compile(pattern)
            if compiled.search(field):
                return True
        return False

    def filter(self, data: namedtuple) -> namedtuple | None:
        """
        Filter data, return ``None`` to drop data, return data to keep data.
        """
        if self.ignore_current_pid and getattr(data, "pid", None) == os.getpid():
            # Drop records produced by this very process.
            return None
        if self.is_exclude(data, enable_customize_exclude=self.enable_customize_exclude):
            return None
        return data
@hookimpl
def init_filter(config=None):
    """
    Hook implementation that builds a ``PatternFilter`` from ``config``.
    """
    pattern_filter = PatternFilter(config=config)
    return pattern_filter