Skip to content

Instantly share code, notes, and snippets.

@benfgit
Last active October 24, 2025 14:05
Show Gist options
  • Select an option

  • Save benfgit/cf95181dbfb238a73712a0509e85a111 to your computer and use it in GitHub Desktop.

Select an option

Save benfgit/cf95181dbfb238a73712a0509e85a111 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import sys
import re
import termios
import tty
from datetime import datetime
class AuditLogViewer:
def __init__(self, log_path='/var/log/audit/audit.log'):
self.log_path = log_path
self.page_size = 30
self.logs = []
self.load_logs()
def load_logs(self):
try:
with open(self.log_path) as f:
lines = f.readlines()
syscalls = {}
execves = {}
root_shell_pids = set()
root_shell_times = {} # <<< NEW >>> pid -> timestamp of root shell start
self.logs = []
skip_next = False
for line in lines:
if skip_next:
skip_next = False
continue
eid_match = re.search(r'msg=audit\(([^:]+):(\d+)\):', line)
if not eid_match:
continue
ts = float(eid_match.group(1))
eid = eid_match.group(2)
if 'type=SYSCALL' in line and 'syscall=59' in line:
syscalls[eid] = line
pid_m = re.search(r' pid=(\d+)', line)
pid = pid_m.group(1) if pid_m else None
# Detect root shell start
if 'uid=0' in line and ('exe="/usr/bin/zsh"' in line or 'exe="/usr/bin/bash"' in line):
if pid:
root_shell_pids.add(pid)
root_shell_times[pid] = ts # <<< NEW >>>
skip_next = True
elif 'type=EXECVE' in line:
execves[eid] = line
for eid, sc_line in syscalls.items():
ex_line = execves.get(eid)
if not ex_line:
continue
ts_m = re.search(r'msg=audit\((\d+\.\d+):', sc_line)
ts = float(ts_m.group(1)) if ts_m else 0.0 # <<< NEW >>>
args = re.findall(r'a\d+=(?:"([^"]*)"|(\S+))', ex_line)
first_arg = (args[0][0] or args[0][1]) if args else ""
ppid_m = re.search(r'ppid=(\d+)', sc_line)
ppid = ppid_m.group(1) if ppid_m else None
# Root shell link
root_shell_id = ppid if (ppid and ppid in root_shell_pids) else None
# Ignore noisy commands within 1 second of root shell start
if root_shell_id and root_shell_id in root_shell_times:
if ts - root_shell_times[root_shell_id] < 1.0:
continue # skip this noisy entry
if "sudo" in first_arg or root_shell_id:
self.logs.append((sc_line, ex_line, root_shell_id, ts)) # Include timestamp in log tuple
# Sort logs by timestamp (4th element in tuple)
self.logs.sort(key=lambda log: log[3])
except (FileNotFoundError, PermissionError) as e:
print(f"Error: {e}")
sys.exit(1)
def get_char(self):
fd = sys.stdin.fileno()
old = termios.tcgetattr(fd)
try:
tty.setraw(fd)
ch = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old)
return ch
def decode_arg(self, val):
if not val.startswith('/') and all(c in '0123456789abcdefABCDEF' for c in val) and len(val) % 2 == 0:
try:
d = bytes.fromhex(val).decode('utf-8')
if all(c.isprintable() or c.isspace() for c in d):
return d
except:
pass
return val
def extract_username(self, sc_line):
m = re.search(r'UID="([^"]+)"', sc_line)
if m:
return m.group(1)
m2 = re.search(r'uid=(\d+)', sc_line)
return m2.group(1) if m2 else "unknown"
def display_page(self):
print("\033[2J\033[HAudit Log Viewer - Use ↑/↓ to navigate, q to quit\n")
start = max(0, min(self.current_position - self.page_size + 1, max(0, len(self.logs) - self.page_size)))
end = min(start + self.page_size, len(self.logs))
for i in range(start, end):
sc_line, ex_line, root_shell_id, ts = self.logs[i]
time_str = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
user = self.extract_username(sc_line)
args = re.findall(r'a\d+=(?:"([^"]*)"|(\S+))', ex_line)
cmd = ' '.join(self.decode_arg(a[0] or a[1]) for a in args)
# Include root shell id if present
if root_shell_id:
print(f"\033[33m{time_str}\033[0m : \033[32m{user}\033[0m : \033[35m{root_shell_id}\033[0m : {cmd}")
else:
print(f"\033[33m{time_str}\033[0m : \033[32m{user}\033[0m : {cmd}")
print(f"\nEntries {start+1}-{end} of {len(self.logs)}")
def run(self):
if not self.logs:
self.clear_screen()
print("No matching commands found in audit log.")
return
self.current_position = len(self.logs) - 1
self.display_page()
while True:
c = self.get_char()
if c == 'q':
break
if c == '\x1b':
n1, n2 = self.get_char(), self.get_char()
if n1 == '[':
if n2 == 'A':
self.current_position = max(0, self.current_position - 1)
self.display_page()
elif n2 == 'B':
self.current_position = min(len(self.logs) - 1, self.current_position + 1)
self.display_page()
def clear_screen(self):
print("\033[2J\033[H", end='')
if __name__ == '__main__':
AuditLogViewer().run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment