Last active
October 24, 2025 14:05
-
-
Save benfgit/cf95181dbfb238a73712a0509e85a111 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import sys | |
| import re | |
| import termios | |
| import tty | |
| from datetime import datetime | |
class AuditLogViewer:
    """Interactive pager for privileged commands found in a Linux audit log.

    The log is scanned for ``execve`` events (a ``type=SYSCALL`` record with
    ``syscall=59`` paired with the ``type=EXECVE`` record of the same event).
    An event is kept when its first argument contains ``sudo`` or when its
    parent PID is a shell that was started with a ``uid=0`` field.

    Each kept entry is stored in ``self.logs`` as the tuple
    ``(syscall_line, execve_line, root_shell_pid_or_None, timestamp)``,
    sorted by timestamp.
    """

    # An audit event is identified by "timestamp:serial" inside msg=audit(...).
    _EVENT_RE = re.compile(r'msg=audit\(([^:]+):(\d+)\):')
    # Leading space keeps this from matching the "ppid=" field.
    _PID_RE = re.compile(r' pid=(\d+)')
    _PPID_RE = re.compile(r'ppid=(\d+)')
    # EXECVE arguments: a0="quoted literal" or a0=BAREVALUE (hex-encoded).
    _ARG_RE = re.compile(r'a\d+=(?:"([^"]*)"|(\S+))')
    _UID_NAME_RE = re.compile(r'UID="([^"]+)"')
    _UID_NUM_RE = re.compile(r'uid=(\d+)')
    _ROOT_SHELL_MARKERS = ('exe="/usr/bin/zsh"', 'exe="/usr/bin/bash"')
    _HEX_DIGITS = frozenset('0123456789abcdefABCDEF')
    # Commands spawned this soon (seconds) after a root shell starts are
    # treated as shell start-up noise and hidden.
    _NOISE_WINDOW = 1.0

    def __init__(self, log_path: str = '/var/log/audit/audit.log') -> None:
        """Load and index *log_path*; exits the process if it is unreadable."""
        self.log_path = log_path
        self.page_size = 30
        self.logs = []
        # Index of the last entry shown on the current page. Set here so
        # display_page() is usable even before run() has been called.
        self.current_position = 0
        self.load_logs()

    def load_logs(self) -> None:
        """Parse the audit log and rebuild ``self.logs``.

        Prints an error to stderr and exits with status 1 when the file
        cannot be opened or read.
        """
        syscalls = {}          # (ts_text, serial) -> (timestamp, SYSCALL line)
        execves = {}           # (ts_text, serial) -> EXECVE line
        root_shell_times = {}  # pid -> timestamp at which the root shell started

        try:
            # errors='replace' keeps one stray byte from aborting the scan.
            with open(self.log_path, encoding='utf-8', errors='replace') as f:
                skip_next = False
                for line in f:
                    if skip_next:
                        # Drop the record that directly follows a root-shell
                        # start (normally the shell's own EXECVE record).
                        skip_next = False
                        continue

                    event = self._EVENT_RE.search(line)
                    if not event:
                        continue
                    # Serial numbers restart after a reboot, so the timestamp
                    # must be part of the key to pair records correctly.
                    event_key = event.groups()

                    # syscall=59 is execve on x86_64.
                    if 'type=SYSCALL' in line and 'syscall=59' in line:
                        try:
                            ts = float(event.group(1))
                        except ValueError:
                            continue  # malformed timestamp; ignore the record
                        syscalls[event_key] = (ts, line)

                        # NOTE: plain substring test, so it also matches
                        # auid=0 / euid=0 / suid=0 / fsuid=0.
                        if 'uid=0' in line and any(
                            marker in line
                            for marker in self._ROOT_SHELL_MARKERS
                        ):
                            pid_match = self._PID_RE.search(line)
                            if pid_match:
                                root_shell_times[pid_match.group(1)] = ts
                            skip_next = True
                    elif 'type=EXECVE' in line:
                        execves[event_key] = line
        except OSError as e:
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)

        entries = []
        for event_key, (ts, sc_line) in syscalls.items():
            ex_line = execves.get(event_key)
            if not ex_line:
                continue

            args = self._parse_args(ex_line)
            first_arg = args[0] if args else ""

            ppid_match = self._PPID_RE.search(sc_line)
            ppid = ppid_match.group(1) if ppid_match else None
            root_shell_id = ppid if ppid in root_shell_times else None

            if root_shell_id is not None:
                # Hide the burst of commands a shell runs while starting up.
                if ts - root_shell_times[root_shell_id] < self._NOISE_WINDOW:
                    continue

            if "sudo" in first_arg or root_shell_id:
                entries.append((sc_line, ex_line, root_shell_id, ts))

        entries.sort(key=lambda entry: entry[3])
        self.logs = entries

    def _parse_args(self, ex_line: str) -> list:
        """Return the argv strings of an EXECVE record, decoded for display.

        Quoted values are literal text and are returned untouched; only bare
        (unquoted) values are candidates for hex decoding.
        """
        return [
            self.decode_arg(bare) if bare else quoted
            for quoted, bare in self._ARG_RE.findall(ex_line)
        ]

    def get_char(self) -> str:
        """Read one character from stdin with the terminal in raw mode.

        The previous terminal settings are always restored. Returns ``''``
        when stdin is at end-of-file.
        """
        fd = sys.stdin.fileno()
        old = termios.tcgetattr(fd)
        try:
            tty.setraw(fd)
            ch = sys.stdin.read(1)
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old)
        return ch

    def decode_arg(self, val: str) -> str:
        """Decode a hex-encoded argument, or return *val* unchanged.

        *val* is decoded only when it is an even-length string of hex digits
        whose bytes form UTF-8 text made solely of printable or whitespace
        characters.
        """
        if len(val) % 2 or not all(c in self._HEX_DIGITS for c in val):
            return val
        try:
            decoded = bytes.fromhex(val).decode('utf-8')
        except ValueError:  # UnicodeDecodeError is a ValueError subclass
            return val
        if all(c.isprintable() or c.isspace() for c in decoded):
            return decoded
        return val

    def extract_username(self, sc_line: str) -> str:
        """Return the user shown for a SYSCALL record.

        Prefers the first ``...UID="name"`` field, then the first
        ``...uid=<digits>`` field, else ``"unknown"``. Both patterns are
        unanchored, so the first match may be the AUID/auid field.
        """
        by_name = self._UID_NAME_RE.search(sc_line)
        if by_name:
            return by_name.group(1)
        by_number = self._UID_NUM_RE.search(sc_line)
        return by_number.group(1) if by_number else "unknown"

    def display_page(self) -> None:
        """Clear the screen and print the page ending at ``current_position``."""
        print("\033[2J\033[HAudit Log Viewer - Use ↑/↓ to navigate, q to quit\n")
        total = len(self.logs)
        last_page_start = max(0, total - self.page_size)
        start = max(
            0, min(self.current_position - self.page_size + 1, last_page_start)
        )
        end = min(start + self.page_size, total)

        for sc_line, ex_line, root_shell_id, ts in self.logs[start:end]:
            time_str = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
            user = self.extract_username(sc_line)
            cmd = ' '.join(self._parse_args(ex_line))
            # Include root shell id if present
            if root_shell_id:
                print(f"\033[33m{time_str}\033[0m : \033[32m{user}\033[0m : \033[35m{root_shell_id}\033[0m : {cmd}")
            else:
                print(f"\033[33m{time_str}\033[0m : \033[32m{user}\033[0m : {cmd}")

        first_shown = start + 1 if total else 0
        print(f"\nEntries {first_shown}-{end} of {total}")

    def _scroll(self, delta: int) -> bool:
        """Move ``current_position`` by *delta*; return True if it changed.

        The position is the index of the last visible entry, so it is kept
        between the end of the first page and the end of the log. Without the
        lower bound, key presses near the top would change the position
        without changing what is shown.
        """
        total = len(self.logs)
        lowest = max(0, min(self.page_size, total) - 1)
        highest = max(lowest, total - 1)
        target = max(lowest, min(highest, self.current_position + delta))
        changed = target != self.current_position
        self.current_position = target
        return changed

    def run(self) -> None:
        """Show the newest page and handle keys until the user quits.

        Up/Down arrows scroll by one entry; ``q``, Ctrl-C or end-of-input
        quits.
        """
        if not self.logs:
            self.clear_screen()
            print("No matching commands found in audit log.")
            return

        self.current_position = len(self.logs) - 1
        self.display_page()
        while True:
            key = self.get_char()
            # Raw mode delivers Ctrl-C as '\x03' instead of raising
            # KeyboardInterrupt, and read() returns '' at end-of-file.
            if key in ('q', '\x03', ''):
                break
            if key != '\x1b':
                continue
            # Arrow keys arrive as ESC '[' 'A' (up) / ESC '[' 'B' (down).
            n1, n2 = self.get_char(), self.get_char()
            if n1 != '[':
                continue
            if n2 == 'A':
                moved = self._scroll(-1)
            elif n2 == 'B':
                moved = self._scroll(1)
            else:
                moved = False
            if moved:
                self.display_page()

    def clear_screen(self) -> None:
        """Clear the terminal and move the cursor to the top-left corner."""
        print("\033[2J\033[H", end='')
# Script entry point: build the viewer for the default audit log and start
# the interactive pager.
if __name__ == '__main__':
    viewer = AuditLogViewer()
    viewer.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment