Created
February 24, 2026 16:31
-
-
Save vfmatzkin/36b52485cb4305c07081a1e4d1183b29 to your computer and use it in GitHub Desktop.
Interactive Log Pager (tail + search + scroll)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Colored log viewer with search and scroll. | |
| Keys: | |
| / Search n/N Next/prev match | |
| e/E Errors ↑/↓ w/W Warnings+ ↑/↓ | |
| s/S Line ↑/↓ d/D Page ↑/↓ | |
| g/G Top/bottom p Pause/unpause | |
| a Resume live c Clear q Quit | |
| """ | |
| import sys, re, os, argparse, threading, time, select | |
| from collections import deque | |
| from datetime import datetime | |
# On Windows, run an empty shell command once at import time, before any
# escape codes are written.
# NOTE(review): presumably this is the usual trick to make the Windows console
# interpret ANSI escape sequences -- confirm on the targeted Windows versions.
if os.name == "nt":
    os.system("")
# --- Colors ------------------------------------------------------------------
R = "\033[0m"
RED = "\033[1;31m"
YEL = "\033[33m"
GRN = "\033[32m"
CYN = "\033[36m"
MAG = "\033[35m"
DIM = "\033[90m"
BOLD = "\033[1m"
INV = "\033[7m"
BG_YEL = "\033[43m"
BG_RED = "\033[41m"
WHITE = "\033[97m"
BLACK = "\033[30m"

# SGR (colour/attribute) sequences only.
ANSI_RE = re.compile(r"\033\[[0-9;]*m")
# Any complete CSI sequence: ESC [ parameter bytes, intermediate bytes, final byte.
_CSI_RE = re.compile(r"\033\[[0-?]*[ -/]*[@-~]")


def truncate(s, width):
    """Cut `s` to at most `width` visible characters, keeping escape sequences.

    CSI escape sequences (colour codes and friends) are copied through without
    counting towards the width.  Sequences that directly follow the last kept
    character are kept as well, so a closing colour code is not lost.  The
    reset code `R` is always appended.

    An ESC that does not start a complete CSI sequence is counted as one
    ordinary character instead of swallowing text up to the next "m".
    """
    n = len(s)
    i = 0
    shown = 0
    while i < n and shown < width:
        if s[i] == "\033":
            seq = _CSI_RE.match(s, i)
            if seq:
                i = seq.end()
                continue
        shown += 1
        i += 1
    # Keep any escape sequences glued to the end of the visible part.
    while i < n:
        seq = _CSI_RE.match(s, i)
        if seq is None:
            break
        i = seq.end()
    return s[:i] + R
# --- Patterns ----------------------------------------------------------------
# Request methods recognised in access-log lines.  HEAD/OPTIONS/TRACE/CONNECT
# are included so that such requests are not mistaken for "INFO:<module>:" lines.
_HTTP_METHODS = "GET|POST|PUT|DELETE|PATCH|HEAD|OPTIONS|TRACE|CONNECT"

# Quoted request line anywhere in the text: groups are (method, path without
# query string, status code).
http_re = re.compile(rf'"({_HTTP_METHODS}) ([^ ?]+)\S* HTTP/\S+" (\d+)')
# Same groups, anchored to an 'INFO: <ip>:<port> - "..."' access-log prefix.
access_re = re.compile(rf'^INFO:\s+\d+\.\d+\.\d+\.\d+:\d+\s+-\s+"({_HTTP_METHODS}) ([^ ?]+)\S* HTTP/\S+" (\d+)')
# "LEVEL:<logger name>:<message>" lines: groups are (logger name, message).
info_re = re.compile(r"^INFO:(.+?):(.+)$")
err_re = re.compile(r"^ERROR:(.+?):(.+)$")
wrn_re = re.compile(r"^WARNING:(.+?):(.+)$")
# Severity ranking used for --level filtering and warning navigation.
LEVEL_ORDER = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3}
| _docker_ts_re = re.compile(r'^(\d{4}-\d{2}-\d{2}T(\d{2}:\d{2}:\d{2})\.\d+Z)\s+') | |
| def _strip_docker_ts(raw): | |
| """Strip docker --timestamps prefix; return (time_str, clean_line).""" | |
| m = _docker_ts_re.match(raw) | |
| if m: | |
| return m.group(2), raw[m.end():] | |
| return None, raw | |
def _http_match(raw):
    """Return the regex match for an HTTP request in `raw`, or None.

    The anchored access-log pattern is tried first; the looser "quoted request
    line anywhere" pattern is the fallback.  Either way the groups are
    (method, path, status).
    """
    anchored = access_re.match(raw)
    if anchored:
        return anchored
    return http_re.search(raw)
def _is_http(raw):
    """Tell whether `raw` contains a recognisable HTTP request line."""
    return _http_match(raw) is not None
def line_level(raw):
    """Classify a log line as "DEBUG", "INFO", "WARNING" or "ERROR".

    HTTP request lines are judged by status code alone (5xx -> ERROR,
    4xx -> WARNING, anything else -> INFO).  This check comes first, matching
    `_module_tag` and `_format_one`, so a successful request whose path merely
    contains "ERROR" or "WARNING" is not counted as a problem.

    Every other line is classified by keyword: "ERROR", "Traceback" or
    "Exception" -> ERROR, then "WARNING", then "DEBUG"; the default is INFO.
    """
    m = _http_match(raw)
    if m:
        code = int(m.group(3))
        if code >= 500:
            return "ERROR"
        if code >= 400:
            return "WARNING"
        return "INFO"
    if "ERROR" in raw or "Traceback" in raw or "Exception" in raw:
        return "ERROR"
    if "WARNING" in raw:
        return "WARNING"
    if "DEBUG" in raw:
        return "DEBUG"
    return "INFO"
def _module_tag(raw):
    """Return `(tag, module)` used to group consecutive lines.

    `tag` is one of "HTTP", "ERR", "WRN", "INFO" or "OTHER".  `module` is the
    last dotted component of the logger name for "LEVEL:<name>:<msg>" lines and
    None when the line carries no such name.
    """
    if _is_http(raw):
        return "HTTP", None

    hit = err_re.match(raw)
    if hit:
        return "ERR", hit.group(1).rpartition(".")[2]
    if "ERROR" in raw or "Traceback" in raw:
        return "ERR", None

    hit = wrn_re.match(raw)
    if hit:
        return "WRN", hit.group(1).rpartition(".")[2]
    if "WARNING" in raw:
        return "WRN", None

    hit = info_re.match(raw)
    if hit:
        return "INFO", hit.group(1).rpartition(".")[2]
    return "OTHER", None
def _highlight_visible(line, pat):
    """Wrap each non-empty match of `pat` in the visible text of `line`.

    The line is processed piecewise between colour codes, so the pattern can
    never match inside an escape sequence.  After each highlighted match the
    colour codes that were in effect are re-emitted, because the highlight
    ends with a full reset.
    """
    pieces = []
    active = ""  # colour codes in effect at the current position

    def mark(m):
        hit = m.group(0)
        if not hit:  # zero-width match: nothing to show
            return hit
        return f"{BG_RED}{WHITE}{BOLD}{hit}{R}{active}"

    pos = 0
    for esc in ANSI_RE.finditer(line):
        pieces.append(pat.sub(mark, line[pos:esc.start()]))
        code = esc.group(0)
        pieces.append(code)
        active = "" if code == R else active + code
        pos = esc.end()
    pieces.append(pat.sub(mark, line[pos:]))
    return "".join(pieces)


def _format_one(raw, t, highlight=None):
    """Format a single raw line for display and return it as a string.

    Args:
        raw: log line text (without timestamp prefix).
        t: timestamp string shown dimmed in front of the line.
        highlight: optional regex source; matches (case-insensitive) are shown
            on a red background.  An invalid pattern is ignored.

    The layout depends on the line type, checked in this order: HTTP request
    (coloured by status class), error, warning, "INFO:<name>:<msg>", debug,
    anything else.
    """
    if (m := _http_match(raw)):
        method, path, status = m.group(1), m.group(2), m.group(3)
        code = int(status)
        c = RED if code >= 500 else YEL if code >= 400 else MAG if code >= 300 else CYN
        line = f"{DIM}{t}{R} {c}{BOLD}{method:<5}{R} {c}{status} {path}{R}"
    elif "ERROR" in raw or "Traceback" in raw or "Exception" in raw:
        m2 = err_re.match(raw)
        if m2:
            line = f"{DIM}{t}{R} {RED}{DIM}\u2502{R} {RED}{m2.group(2)}{R}"
        else:
            line = f"{DIM}{t}{R} {RED}{raw}{R}"
    elif "WARNING" in raw:
        m2 = wrn_re.match(raw)
        if m2:
            line = f"{DIM}{t}{R} {YEL}{DIM}\u2502{R} {YEL}{m2.group(2)}{R}"
        else:
            line = f"{DIM}{t}{R} {YEL}{raw}{R}"
    elif (m := info_re.match(raw)):
        line = f"{DIM}{t}{R} {GRN}{DIM}\u2502{R} {m.group(2)}"
    elif "DEBUG" in raw:
        msg = re.sub(r"^DEBUG:\S*:", "", raw)
        line = f"{DIM}{t} {msg}{R}"
    else:
        line = f"{DIM}{t}{R} {raw}"

    if highlight:
        try:
            pat = re.compile(highlight, re.IGNORECASE)
        except re.error:
            return line  # best effort: show the line without highlighting
        line = _highlight_visible(line, pat)
    return line
def _group_hdr(tag, mod):
    """Build the bold header line that introduces a group of lines from `mod`.

    The colour follows the tag: red for "ERR", yellow for "WRN", green for
    everything else.
    """
    color = {"ERR": RED, "WRN": YEL}.get(tag, GRN)
    return f" {color}{BOLD}{mod}{R}"
# --- Terminal ----------------------------------------------------------------
def term_size():
    """Return the terminal size as `(columns, rows)`; 80x24 if it is unknown."""
    try:
        size = os.get_terminal_size()
    except OSError:
        return 80, 24
    return size
def show_status(msg):
    """Draw `msg` as an inverse-video status bar on the last terminal row.

    The cursor is left on the status row.  The message is clipped to the
    terminal width so that an over-long status cannot wrap and scroll the
    screen; shorter messages are padded so the bar spans the whole row.
    """
    cols, rows = term_size()
    width = max(cols - 1, 0)  # one column is taken by the leading space
    sys.stdout.write(f"\033[{rows};0H\033[2K{INV} {msg[:width]:<{width}}{R}")
    sys.stdout.flush()
def clear_status():
    """Move to the last terminal row and erase it."""
    last_row = term_size()[1]
    out = sys.stdout
    out.write(f"\033[{last_row};0H\033[2K")
    out.flush()
def setup_live_region():
    """Restrict scrolling to all rows but the last and park the cursor there.

    The last row stays outside the scroll region so the status bar persists
    while live lines scroll above it.  The cursor ends up at the start of the
    bottom row of the region.
    """
    bottom = term_size()[1] - 1
    out = sys.stdout
    out.write(f"\033[1;{bottom}r\033[{bottom};1H")
    out.flush()
def reset_scroll_region():
    """Restore the scroll region to the whole screen."""
    out = sys.stdout
    out.write("\033[r")
    out.flush()
def show_live_status(msg):
    """Draw `msg` on the status row without disturbing the cursor position.

    The cursor is saved before and restored after drawing, so output in the
    scroll region above continues where it left off.  The message is clipped
    to the terminal width so that an over-long status cannot wrap; shorter
    messages are padded so the bar spans the whole row.
    """
    cols, rows = term_size()
    width = max(cols - 1, 0)  # one column is taken by the leading space
    sys.stdout.write(f"\0337\033[{rows};0H\033[2K{INV} {msg[:width]:<{width}}{R}\0338")
    sys.stdout.flush()
# --- Keyboard ---------------------------------------------------------------
if os.name == "nt":
    # Windows: no interactive keys when stdin is piped (console input unreliable)
    # Use --level and --filter flags instead, or pipe through `more`
    _INTERACTIVE = False

    def key_nb():
        """Non-blocking key read; always None on Windows."""
        return None

    def key_b():
        """Blocking key read; always None on Windows."""
        return None
else:
    import atexit
    import termios
    import tty

    _tty_fd = None       # fd of /dev/tty once raw mode is active
    _tty_old = None      # terminal attributes to restore at exit
    _tty_failed = False  # set once setup failed, so it is not retried per poll

    def _init_tty():
        """Open /dev/tty and switch it to raw mode (once).

        Keys are read from the controlling terminal because stdin carries the
        log stream.  On any failure the descriptor is closed again and the
        keyboard stays disabled for the rest of the run.

        NOTE(review): raw mode is applied to the terminal device itself, which
        also disables output post-processing -- confirm that bare "\\n" in the
        live output renders as intended.
        """
        global _tty_fd, _tty_old, _tty_failed
        if _tty_fd is not None or _tty_failed:
            return
        try:
            fd = os.open("/dev/tty", os.O_RDONLY)
        except OSError:
            _tty_failed = True
            return
        try:
            old = termios.tcgetattr(fd)
            tty.setraw(fd)
        except (OSError, termios.error):
            _tty_failed = True
            try:
                os.close(fd)  # do not leak the descriptor
            except OSError:
                pass
            return
        _tty_fd, _tty_old = fd, old

    def _cleanup_tty():
        """Restore the saved terminal attributes and close the tty descriptor."""
        if _tty_fd is None or _tty_old is None:
            return
        try:
            termios.tcsetattr(_tty_fd, termios.TCSADRAIN, _tty_old)
        except (OSError, termios.error):
            pass  # best effort at interpreter exit
        finally:
            try:
                os.close(_tty_fd)
            except OSError:
                pass

    atexit.register(_cleanup_tty)

    def _read_key(timeout):
        """Wait up to `timeout` seconds for one byte; None on timeout/error."""
        _init_tty()
        if _tty_fd is None:
            return None
        try:
            ready, _, _ = select.select([_tty_fd], [], [], timeout)
            if not ready:
                return None
            return os.read(_tty_fd, 1).decode("utf-8", errors="ignore")
        except OSError:
            return None

    def key_nb():
        """Return a pending key as a one-character string, or None if none."""
        return _read_key(0) or None

    def key_b():
        """Wait up to 5 seconds for a key; None if nothing arrived.

        A byte that does not decode on its own yields an empty string.
        """
        return _read_key(5)
# --- Pager: render only the visible window from raw lines --------------------
def _write_line(row, text, cols):
    """Print `text` on screen row `row`.

    The row is erased first and the text is cut to `cols - 2` visible
    characters so it cannot wrap.  Output is not flushed here.
    """
    clipped = truncate(text, cols - 2)
    goto_and_erase = f"\033[{row};1H\033[2K"
    sys.stdout.write(f"{goto_and_erase}{clipped}{R}")
def draw_window(raw_lines, offset, visible, highlight=None, mark_idx=None):
    """Render one screenful of `raw_lines`, formatting only what is shown.

    Args:
        raw_lines: sequence of `(timestamp, text)` pairs.
        offset: index of the first line to show; a negative value means
            "show the end".  It is clamped so the window stays inside the data.
        visible: number of screen rows available.
        highlight: optional regex source passed on to `_format_one`.
        mark_idx: index of a line to flag with a yellow ">" in the margin.

    Returns:
        The clamped offset actually used (0 for an empty buffer).

    A group header row is inserted whenever the (tag, module) of a line with a
    module differs from the previous line's; headers use up screen rows.
    """
    cols, _ = term_size()
    out = sys.stdout
    out.write("\033[?25l\033[r\033[2J\033[H")  # hide cursor, reset scroll region, clear, home
    total = len(raw_lines)
    if total == 0:
        out.write("\033[?25h")  # nothing to draw: do not leave the cursor hidden
        out.flush()
        return 0

    last_start = max(0, total - visible)
    if offset < 0:
        offset = last_start
    offset = max(0, min(offset, last_start))

    # Seed grouping with the line just above the window so a group that
    # started off-screen does not get a second header.
    prev_grp = None
    if offset > 0:
        prev_grp = _module_tag(raw_lines[offset - 1][1])

    row = 1
    i = offset
    while i < total and row <= visible:
        t, raw = raw_lines[i]
        tag, mod = _module_tag(raw)
        grp = (tag, mod)
        if mod and grp != prev_grp:
            _write_line(row, _group_hdr(tag, mod), cols)
            row += 1
            if row > visible:  # the header took the last row
                break
        prev_grp = grp
        line = _format_one(raw, t, highlight)
        margin = f"{YEL}>{R}" if mark_idx is not None and i == mark_idx else " "
        _write_line(row, f"{margin}{line}", cols)
        row += 1
        i += 1

    out.write("\033[?25h")  # show cursor
    out.flush()
    return offset
# --- Main --------------------------------------------------------------------
def main():
    """Run the pager: read log lines from stdin and serve keyboard commands.

    A daemon thread feeds stdin lines into a queue.  The main thread first
    loads the backlog ("catch-up"), then loops: handle at most one key, print
    up to 50 queued lines (unless paused), refresh the status bar, sleep 50 ms.

    Two display modes exist.  In live mode new lines scroll above a persistent
    status row.  In paused/browse mode a snapshot of the buffer is shown via
    `draw_window` and can be scrolled and searched.  The loop ends on Ctrl+C,
    Ctrl+D, or when stdin is exhausted and the queue is empty.

    The --level, --filter and --no-http options affect live output only.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--filter", "-f")
    parser.add_argument("--level", "-l")
    parser.add_argument("--no-http", action="store_true")
    parser.add_argument("--buffer", "-b", type=int, default=10000)
    parser.add_argument("--no-ts", action="store_true", help="Disable docker timestamp parsing")
    args = parser.parse_args()

    # Unknown level names fall back to 0 (show everything).
    min_level = LEVEL_ORDER.get(args.level.upper(), 0) if args.level else 0
    line_filter = None
    if args.filter:
        try:
            line_filter = re.compile(args.filter, re.IGNORECASE)
        except re.error as exc:
            parser.error(f"invalid --filter pattern: {exc}")
    parse_ts = not args.no_ts

    buf = deque(maxlen=args.buffer)  # (timestamp, text) history, oldest dropped first
    queue = deque()                  # raw lines handed over by the reader thread
    eof = threading.Event()

    def reader():
        """Feed stdin lines into the queue; flag end of input when done."""
        try:
            for line in sys.stdin:
                queue.append(line.rstrip())
        except Exception:
            pass  # a read error is treated like end of input
        finally:
            eof.set()

    threading.Thread(target=reader, daemon=True).start()

    def ingest(raw):
        """Timestamp and buffer one input line.

        Returns the stored `(timestamp, text)` pair, or None for lines that
        are dropped (empty lines and "core.middleware" noise).
        """
        if not raw or "core.middleware" in raw:
            return None
        ts = None
        if parse_ts:
            ts, raw = _strip_docker_ts(raw)
        entry = (ts or datetime.now().strftime("%H:%M:%S"), raw)
        buf.append(entry)
        return entry

    # Catchup: keep draining until the input has been quiet for ~0.3 s.
    show_status("Loading history...")
    while not eof.is_set():
        drained = False
        while queue:
            ingest(queue.popleft())
            drained = True
        if drained:
            time.sleep(0.02)
            continue
        time.sleep(0.3)
        if not queue:
            break

    def page():
        """Number of rows available for log lines."""
        return term_size()[1] - 2

    def live_status():
        show_live_status(f"{len(buf)} lines | /:search e/E:errors w/W:warn p:pause q:quit")

    # Initial display
    draw_window(list(buf), -1, page())
    setup_live_region()
    live_status()

    # State
    paused = False
    view = None         # raw lines for browse mode (None = live)
    view_off = -1
    highlight = None
    matches = []        # indices into view[]
    match_idx = -1
    mark = None
    prev_grp = None     # (tag, module) of the last line printed live
    filter_mode = None  # None, "error", "warn"
    q_msg_until = 0     # while non-zero: a temporary status message is showing

    def pct(off, total):
        """Position indicator for the status bar: "[NN%]" or "[END]"."""
        end = min(off + page(), total) if off >= 0 else total
        p = int(end / max(total, 1) * 100)
        return "[END]" if p >= 100 else f"[{p}%]"

    def browse_status(msg):
        show_status(f"{msg} {pct(view_off, len(view))} s/d:scroll n/N:match g/G:top/end a:live q:quit")

    def browse(lines, hl=None, center=None, msg=""):
        """Enter browse mode on `lines`, optionally centred on index `center`."""
        nonlocal paused, view, view_off, highlight, mark
        paused = True
        view = lines
        highlight = hl
        mark = center
        v = page()
        view_off = max(0, center - v // 2) if center is not None else -1
        view_off = draw_window(view, view_off, v, highlight, mark)
        browse_status(msg)

    def redraw(msg=""):
        """Repaint the browse window at the current offset."""
        nonlocal view_off
        if view is None:
            return
        view_off = draw_window(view, view_off, page(), highlight, mark)
        browse_status(msg)

    def scroll(delta):
        """Move the browse window by `delta` lines and drop the match marker."""
        nonlocal view_off, mark
        if view is None:
            return
        mx = max(0, len(view) - page())
        if view_off < 0:
            view_off = mx
        view_off = max(0, min(view_off + delta, mx))
        mark = None
        redraw()

    def jump(step, label):
        """Step to a neighbouring match (clamped) and centre the window on it."""
        nonlocal match_idx, mark, view_off
        match_idx = max(0, min(match_idx + step, len(matches) - 1))
        mark = matches[match_idx]
        view_off = max(0, mark - page() // 2)
        redraw(f"{label} {match_idx+1}/{len(matches)} |")

    def pause_if_live():
        """Switch from live mode to browsing a snapshot of the buffer."""
        nonlocal filter_mode, matches, highlight
        if paused:
            return
        filter_mode = None
        matches = []
        highlight = None
        browse(list(buf), msg="PAUSED |")

    def notify(msg):
        """Show a temporary status message for two seconds.

        In live mode the cursor-preserving variant is used, so the message
        does not pull the cursor onto the status row.
        """
        nonlocal q_msg_until
        q_msg_until = time.time() + 2
        if paused:
            show_status(msg)
        else:
            show_live_status(msg)

    def read_search():
        """Prompt for a pattern on the status row; None if cancelled or empty."""
        reset_scroll_region()
        show_status("Search: ")
        chars = []
        while True:
            c = key_b()
            if c is None:
                continue
            if c in ("\r", "\n"):
                break
            if c == "\x1b":
                return None
            if c in ("\x7f", "\x08"):
                if chars:
                    chars.pop()
            else:
                chars.append(c)
            show_status(f"Search: {''.join(chars)}")
        return "".join(chars) if chars else None

    def resume_live():
        """Leave browse mode and return to the live tail."""
        nonlocal paused, view, matches, highlight, mark, prev_grp, filter_mode
        paused = False
        view = None
        matches = []
        highlight = None
        mark = None
        filter_mode = None
        draw_window(list(buf), -1, page())
        setup_live_region()
        live_status()
        prev_grp = None

    # --- Live loop ---
    try:
        while True:
            k = key_nb()
            if k in ("\x03", "\x04"):  # Ctrl+C, Ctrl+D
                break
            elif k == "q":
                notify("Press Ctrl+C twice to exit")
            elif k == "/":
                filter_mode = None
                pat_str = read_search()
                if pat_str:
                    bl = list(buf)
                    try:
                        pat = re.compile(pat_str, re.IGNORECASE)
                        matches = [i for i, (_, text) in enumerate(bl) if pat.search(text)]
                    except re.error:
                        matches = []
                    if matches:
                        match_idx = len(matches) - 1
                        browse(bl, hl=pat_str, center=matches[match_idx],
                               msg=f"{len(matches)} matches |")
                    else:
                        show_status(f"No matches for '{pat_str}' | a: resume")
                        paused = True
                        view = bl
                elif paused:
                    redraw()  # search cancelled: restore the browse status bar
                else:
                    setup_live_region()
                    live_status()
            elif k == "n" and matches and view:
                jump(1, "Match")
            elif k == "N" and matches and view:
                jump(-1, "Match")
            elif k in ("e", "E", "w", "W"):
                if k in ("e", "E"):
                    mode, label, none_msg = "error", "Error", "No errors found"
                    floor = LEVEL_ORDER["ERROR"]
                else:
                    mode, label, none_msg = "warn", "Warn", "No warnings found"
                    floor = LEVEL_ORDER["WARNING"]
                if filter_mode == mode and view and matches:
                    # Lower case steps to the older hit, upper case to the newer.
                    jump(-1 if k.islower() else 1, label)
                elif k.islower():
                    bl = list(buf)
                    hits = [i for i, (_, text) in enumerate(bl)
                            if LEVEL_ORDER.get(line_level(text), 0) >= floor]
                    if hits:
                        filter_mode = mode
                        matches = hits
                        match_idx = len(hits) - 1
                        browse(bl, center=hits[-1],
                               msg=f"{label} {len(hits)}/{len(hits)} |")
                    else:
                        notify(none_msg)
            elif k in ("s", "S", "d", "D"):
                pause_if_live()
                if k == "s":
                    scroll(-1)
                elif k == "S":
                    scroll(1)
                elif k == "d":
                    scroll(-page())
                else:
                    scroll(page())
            elif k == "G" and paused:
                scroll(999999)
            elif k == "g" and paused:
                if view:
                    view_off = 0
                    mark = None
                    redraw()
            elif k == "\x1b":
                if key_nb() == "[":
                    k3 = key_nb()
                    if k3 in ("A", "B", "C", "D", "1"):
                        # Shift+arrows: \x1b[1;2A / \x1b[1;2B
                        shift = False
                        if k3 == "1":
                            key_nb()       # ;
                            key_nb()       # 2
                            k3 = key_nb()  # A or B
                            shift = True
                        pause_if_live()
                        if shift:
                            if k3 == "A":
                                scroll(-page())
                            elif k3 == "B":
                                scroll(page())
                        elif k3 == "A":
                            scroll(-1)
                        elif k3 == "B":
                            scroll(1)
                        elif k3 in ("C", "D") and matches and view:
                            # right = next (newer), left = prev (older)
                            jump(1 if k3 == "C" else -1, "Match")
            elif k == "a":
                resume_live()
            elif k == "p":
                if paused:
                    resume_live()
                else:
                    pause_if_live()
            elif k == "c":
                sys.stdout.write("\033[2J")
                sys.stdout.write(f"\033[{term_size()[1]-1};1H")
                sys.stdout.flush()
                live_status()
                prev_grp = None

            # --- Live lines ---
            processed = 0
            while queue and processed < 50:
                processed += 1
                entry = ingest(queue.popleft())
                if entry is None or paused:
                    continue  # while paused, lines are buffered but not drawn
                t, raw = entry
                if LEVEL_ORDER.get(line_level(raw), 0) < min_level:
                    continue
                if args.no_http and _is_http(raw):
                    continue
                if line_filter and not line_filter.search(raw):
                    continue
                cols, _ = term_size()
                tag, mod = _module_tag(raw)
                grp = (tag, mod)
                if mod and grp != prev_grp:
                    sys.stdout.write(f"\033[2K{truncate(_group_hdr(tag, mod), cols - 2)}{R}\n")
                prev_grp = grp
                line = _format_one(raw, t)
                sys.stdout.write(f"\033[2K{truncate(f' {line}', cols - 2)}{R}\n")
            sys.stdout.flush()

            if q_msg_until and time.time() > q_msg_until:
                # Temporary message expired: restore the regular status bar.
                q_msg_until = 0
                if paused:
                    redraw()
                else:
                    live_status()
            elif not paused and not q_msg_until:
                live_status()

            if eof.is_set() and not queue:
                break
            time.sleep(0.05)
    except KeyboardInterrupt:
        pass
    finally:
        reset_scroll_region()
        clear_status()
        print(R)
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment