from collections import namedtuple
from typing import Callable
from duetector.extension.tracer import hookimpl
from duetector.tracers.base import BccTracer
[docs]
class OpenTracer(BccTracer):
"""
A tracer for openat2 syscall.
"""
name = "do_sys_openat2"
default_config = {
**BccTracer.default_config,
"attach_event": "do_sys_openat2",
"poll_timeout": 10,
}
attach_type = "kprobe"
@property
def attatch_args(self):
return {"fn_name": "do_trace", "event": self.config.attach_event}
attatch_args = {"fn_name": "trace_entry", "event": "do_sys_openat2"}
poll_fn = "ring_buffer_poll"
@property
def poll_args(self):
return {"timeout": int(self.config.poll_timeout)}
data_t = namedtuple("OpenTracking", ["pid", "uid", "gid", "comm", "fname", "timestamp"])
prog = """
#include <linux/sched.h>
#include <linux/fs_struct.h>
struct data_t {
u32 pid;
u32 uid;
u32 gid;
char comm[TASK_COMM_LEN];
char fname[NAME_MAX];
u64 timestamp;
};
BPF_RINGBUF_OUTPUT(buffer, 1 << 4);
int trace_entry(struct pt_regs *ctx, int dfd, const char __user *filename, struct open_how *how) {
struct data_t data = {};
data.pid = bpf_get_current_pid_tgid();
data.uid = bpf_get_current_uid_gid();
data.gid = bpf_get_current_uid_gid() >> 32;
data.timestamp = bpf_ktime_get_ns();
bpf_get_current_comm(&data.comm, sizeof(data.comm));
bpf_probe_read_user_str(&data.fname, sizeof(data.fname), filename);
buffer.ringbuf_output(&data, sizeof(data), 0);
return 0;
}
"""
[docs]
def set_callback(self, host, callback: Callable[[namedtuple], None]):
def _(ctx, data, size):
event = host["buffer"].event(data)
return callback(self._convert_data(event)) # type: ignore
host["buffer"].open_ring_buffer(_)
@hookimpl
def init_tracer(config):
return OpenTracer(config)
if __name__ == "__main__":
from bcc import BPF
b = BPF(text=OpenTracer.prog)
tracer = OpenTracer()
tracer.attach(b)
def print_callback(data: namedtuple):
print(f"[{data.comm} ({data.pid})] {data.timestamp} OPEN {data.fname}") # type: ignore
tracer.set_callback(b, print_callback)
poller = tracer.get_poller(b)
while True:
try:
poller(**tracer.poll_args)
except KeyboardInterrupt:
exit()