Skip to content

Instantly share code, notes, and snippets.

@vfmatzkin
Created February 24, 2026 16:31
Show Gist options
  • Select an option

  • Save vfmatzkin/36b52485cb4305c07081a1e4d1183b29 to your computer and use it in GitHub Desktop.

Select an option

Save vfmatzkin/36b52485cb4305c07081a1e4d1183b29 to your computer and use it in GitHub Desktop.
Interactive Log Pager (tail + search + scroll)
#!/usr/bin/env python3
"""
Colored log viewer with search and scroll.
Keys:
  /    Search                  n/N  Next/prev match
  e/E  Errors prev/next        w/W  Warnings+ prev/next
  s/S  Line ↑/↓                d/D  Page ↑/↓
  g/G  Top/bottom              p    Pause/unpause
  a    Resume live             c    Clear
  q    Show exit hint (Ctrl+C or Ctrl+D exits)
"""
import sys, re, os, argparse, threading, time, select
from collections import deque
from datetime import datetime
if os.name == "nt":
    # Run an empty shell command before any escape code is written
    # (presumably this switches the Windows console into ANSI mode -- TODO confirm).
    os.system("")

# --- Colors ------------------------------------------------------------------
# SGR escape sequences; every colored string in this file is built from these.
R = "\033[0m"  # reset all attributes
RED = "\033[1;31m"
YEL = "\033[33m"
GRN = "\033[32m"
CYN = "\033[36m"
MAG = "\033[35m"
DIM = "\033[90m"
BOLD = "\033[1m"
INV = "\033[7m"
BG_YEL = "\033[43m"
BG_RED = "\033[41m"
WHITE = "\033[97m"
BLACK = "\033[30m"

# Matches one SGR ("select graphic rendition") escape sequence.
ANSI_RE = re.compile(r"\033\[[0-9;]*m")
def truncate(s, width):
    """Cut *s* down to at most *width* visible characters.

    SGR color sequences (``ESC [ ... m``) occupy no screen columns, so they are
    copied through without being counted.  Sequences sitting directly at the
    cut point are kept too, and a reset (``R``) is always appended so a clipped
    line cannot leak its color into whatever is drawn next.

    Args:
        s: text that may contain SGR escape sequences.
        width: maximum number of visible characters to keep.

    Returns:
        The clipped string followed by ``R``.
    """
    visible = 0
    pos = 0
    end = len(s)
    while pos < end and visible < width:
        sgr = ANSI_RE.match(s, pos) if s[pos] == "\033" else None
        if sgr:
            pos = sgr.end()
            continue
        # Everything else counts as one column.  That includes an ESC which
        # does not start a complete SGR sequence: searching ahead for the next
        # "m" (as a plain find() would) can skip over real text without
        # counting it and let the line overflow the terminal width.
        visible += 1
        pos += 1
    # Keep color codes (typically resets) that follow the last kept character.
    while (sgr := ANSI_RE.match(s, pos)):
        pos = sgr.end()
    return s[:pos] + R
# --- Patterns ----------------------------------------------------------------
# HTTP verbs recognised in a quoted request line.  HEAD and OPTIONS are
# included so that health checks and CORS preflights are treated as HTTP
# traffic instead of falling through to info_re, where the client address
# would be mistaken for a module name.
_HTTP_METHODS = "GET|POST|PUT|DELETE|PATCH|HEAD|OPTIONS"
# Quoted request line plus status: group 1 = method, group 2 = path without
# the query string, group 3 = status code.
_REQUEST_LINE = rf'"({_HTTP_METHODS}) ([^ ?]+)\S* HTTP/\S+" (\d+)'
# Request line anywhere in the text.
http_re = re.compile(_REQUEST_LINE)
# Access-log line of the form `INFO:   <ip>:<port> - "<request line>" <status>`.
access_re = re.compile(r'^INFO:\s+\d+\.\d+\.\d+\.\d+:\d+\s+-\s+' + _REQUEST_LINE)
# `LEVEL:logger.name:message` lines; group 1 = logger name, group 2 = message.
info_re = re.compile(r"^INFO:(.+?):(.+)$")
err_re = re.compile(r"^ERROR:(.+?):(.+)$")
wrn_re = re.compile(r"^WARNING:(.+?):(.+)$")
# Severity ranking used for the --level threshold and the warnings+ filter.
LEVEL_ORDER = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3}
# Leading `YYYY-MM-DDTHH:MM:SS.fffZ ` stamp; group 2 is the HH:MM:SS part.
_docker_ts_re = re.compile(r'^(\d{4}-\d{2}-\d{2}T(\d{2}:\d{2}:\d{2})\.\d+Z)\s+')


def _strip_docker_ts(raw):
    """Split a leading ``docker --timestamps`` stamp off *raw*.

    Returns ``(HH:MM:SS, remainder)`` when the line starts with such a stamp,
    otherwise ``(None, raw)`` with the line untouched.
    """
    if (stamp := _docker_ts_re.match(raw)) is None:
        return None, raw
    clock = stamp.group(2)
    remainder = raw[stamp.end():]
    return clock, remainder
def _http_match(raw):
    """Return the regex match for an HTTP request in *raw*, or None.

    The anchored access-log pattern is tried first; the looser pattern that
    accepts a request line anywhere in the text is the fallback.
    """
    anchored = access_re.match(raw)
    if anchored:
        return anchored
    return http_re.search(raw)
def _is_http(raw):
    """Tell whether *raw* contains an HTTP request line with a status code."""
    return _http_match(raw) is not None
def line_level(raw):
    """Classify *raw* as "ERROR", "WARNING", "DEBUG" or "INFO".

    Keyword hits win over everything else; a line without keywords that
    carries an HTTP status is graded by that status (5xx error, 4xx warning).
    Anything left over is "INFO".
    """
    if any(word in raw for word in ("ERROR", "Traceback", "Exception")):
        return "ERROR"
    for word in ("WARNING", "DEBUG"):
        if word in raw:
            return word
    request = _http_match(raw)
    if request is None:
        return "INFO"
    status = int(request.group(3))
    if status >= 500:
        return "ERROR"
    if status >= 400:
        return "WARNING"
    return "INFO"
def _module_tag(raw):
    """Return ``(tag, module)`` used to group consecutive lines.

    ``tag`` is one of "HTTP", "ERR", "WRN", "INFO", "OTHER".  ``module`` is the
    last dotted component of the logger name when the line has the
    ``LEVEL:logger:message`` shape, otherwise None.
    """
    if _is_http(raw):
        return "HTTP", None
    # (prefixed-line pattern, tag, keywords that give the tag without a module)
    stages = (
        (err_re, "ERR", ("ERROR", "Traceback")),
        (wrn_re, "WRN", ("WARNING",)),
        (info_re, "INFO", ()),
    )
    for pattern, tag, keywords in stages:
        prefixed = pattern.match(raw)
        if prefixed is not None:
            return tag, prefixed.group(1).rpartition(".")[2]
        if any(word in raw for word in keywords):
            return tag, None
    return "OTHER", None
def _apply_highlight(line, pattern):
    """Highlight every match of regex *pattern* in the visible text of *line*.

    *line* already contains SGR color codes.  Only the text between those
    codes is searched, so a pattern such as "m" or "31" cannot match inside an
    escape sequence and corrupt it.  After each highlighted span the colors
    that were active before it are re-applied, so the rest of the line keeps
    its original color.  An invalid pattern leaves the line unchanged.
    """
    try:
        pat = re.compile(pattern, re.IGNORECASE)
    except re.error:
        return line

    active = []  # SGR codes in effect for the text segment being processed

    def mark(found):
        text = found.group(0)
        if not text:
            return ""  # zero-width match: nothing to paint
        return f"{BG_RED}{WHITE}{BOLD}{text}{R}{''.join(active)}"

    pieces = []
    pos = 0
    for esc in ANSI_RE.finditer(line):
        pieces.append(pat.sub(mark, line[pos:esc.start()]))
        code = esc.group(0)
        pieces.append(code)
        if code in (R, "\033[m"):
            active.clear()
        else:
            active.append(code)
        pos = esc.end()
    pieces.append(pat.sub(mark, line[pos:]))
    return "".join(pieces)


def _format_one(raw, t, highlight=None):
    """Format a single raw log line for display.

    Args:
        raw: log text (timestamp prefix already removed).
        t: time string shown dimmed at the start of the line.
        highlight: optional case-insensitive regex; its matches are shown
            white-on-red.  An invalid regex is ignored.

    Returns:
        The colored line as a string (no trailing newline).
    """
    if (m := _http_match(raw)):
        method, path, status = m.group(1), m.group(2), m.group(3)
        code = int(status)
        # Color by status class: 5xx red, 4xx yellow, 3xx magenta, else cyan.
        c = RED if code >= 500 else YEL if code >= 400 else MAG if code >= 300 else CYN
        line = f"{DIM}{t}{R} {c}{BOLD}{method:<5}{R} {c}{status} {path}{R}"
    elif "ERROR" in raw or "Traceback" in raw or "Exception" in raw:
        m2 = err_re.match(raw)
        if m2:
            # Prefixed line: show only the message behind a colored bar.
            line = f"{DIM}{t}{R} {RED}{DIM}\u2502{R} {RED}{m2.group(2)}{R}"
        else:
            line = f"{DIM}{t}{R} {RED}{raw}{R}"
    elif "WARNING" in raw:
        m2 = wrn_re.match(raw)
        if m2:
            line = f"{DIM}{t}{R} {YEL}{DIM}\u2502{R} {YEL}{m2.group(2)}{R}"
        else:
            line = f"{DIM}{t}{R} {YEL}{raw}{R}"
    elif (m := info_re.match(raw)):
        line = f"{DIM}{t}{R} {GRN}{DIM}\u2502{R} {m.group(2)}"
    elif "DEBUG" in raw:
        msg = re.sub(r"^DEBUG:\S*:", "", raw)
        line = f"{DIM}{t} {msg}{R}"
    else:
        line = f"{DIM}{t}{R} {raw}"
    if highlight:
        line = _apply_highlight(line, highlight)
    return line
def _group_hdr(tag, mod):
    """Build the bold header line that introduces a run of lines from *mod*.

    The color follows the tag: red for "ERR", yellow for "WRN", green otherwise.
    """
    color = {"ERR": RED, "WRN": YEL}.get(tag, GRN)
    return f" {color}{BOLD}{mod}{R}"
# --- Terminal ----------------------------------------------------------------
def term_size():
    """Return the terminal size as ``(columns, rows)``.

    Falls back to 80x24 when the size cannot be determined or is reported as
    zero, so callers can safely compute ``cols - 1`` / ``rows - 2`` paddings.
    The result unpacks and indexes like a 2-tuple.
    """
    # Local import keeps this block self-contained; shutil is stdlib.
    import shutil

    return shutil.get_terminal_size(fallback=(80, 24))
def show_status(msg):
    """Draw *msg* as an inverse-video bar on the last terminal row.

    The cursor is left on the status row; use this when the screen is drawn
    with absolute positioning (browse mode, prompts).
    """
    cols, rows = term_size()
    width = max(cols - 1, 0)
    # Clip as well as pad: a message wider than the terminal would wrap on the
    # bottom row and scroll the whole screen by one line.
    sys.stdout.write(f"\033[{rows};0H\033[2K{INV} {msg[:width]:<{width}}{R}")
    sys.stdout.flush()
def clear_status():
    """Move to the last terminal row and erase it."""
    last_row = term_size()[1]
    print(f"\033[{last_row};0H\033[2K", end="", flush=True)
def setup_live_region():
    """Restrict scrolling to all rows but the last and park the cursor there.

    The excluded bottom row is where the persistent status bar is drawn.
    """
    last_live_row = term_size()[1] - 1
    print(f"\033[1;{last_live_row}r\033[{last_live_row};1H", end="", flush=True)
def reset_scroll_region():
    """Reset the terminal scroll region to its default."""
    print("\033[r", end="", flush=True)
def show_live_status(msg):
    """Draw *msg* on the last row without disturbing the output cursor.

    The cursor position is saved before and restored after drawing, so live
    log lines keep being written where they left off.
    """
    cols, rows = term_size()
    width = max(cols - 1, 0)
    # Clip as well as pad: an over-long message would wrap on the bottom row
    # and scroll the screen.
    sys.stdout.write(f"\0337\033[{rows};0H\033[2K{INV} {msg[:width]:<{width}}{R}\0338")
    sys.stdout.flush()
# --- Keyboard ---------------------------------------------------------------
if os.name == "nt":
    # Windows: no interactive keys when stdin is piped (console input unreliable)
    # Use --level and --filter flags instead, or pipe through `more`
    _INTERACTIVE = False

    def key_nb():
        """Non-blocking key read; keys are not supported here, always None."""
        return None

    def key_b():
        """Blocking key read; keys are not supported here, always None."""
        return None
else:
    import atexit
    import codecs
    import termios
    import tty

    _tty_fd = None       # fd of /dev/tty once raw mode is active
    _tty_old = None      # terminal attributes to restore at exit
    _tty_tried = False   # setup is attempted once only
    # Keys are read one byte at a time; the incremental decoder reassembles
    # multi-byte UTF-8 characters instead of dropping each byte.
    _key_decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")

    def _init_tty():
        """Open /dev/tty and switch it to raw mode (first call only).

        stdin carries the log stream, so keys are read from the controlling
        terminal directly.  On any failure keyboard input stays disabled.
        """
        global _tty_fd, _tty_old, _tty_tried
        if _tty_tried:
            return
        _tty_tried = True
        try:
            fd = os.open("/dev/tty", os.O_RDONLY)
        except OSError:
            return
        saved = None
        try:
            saved = termios.tcgetattr(fd)
            tty.setraw(fd)
            # setraw also turns off output post-processing for the terminal,
            # which makes "\n" a bare line feed; the live view prints lines
            # ending in "\n", so switch OPOST back on (index 1 = oflag).
            attrs = termios.tcgetattr(fd)
            attrs[1] |= termios.OPOST
            termios.tcsetattr(fd, termios.TCSANOW, attrs)
        except (OSError, termios.error):
            # Undo whatever was changed and release the descriptor.
            try:
                if saved is not None:
                    termios.tcsetattr(fd, termios.TCSADRAIN, saved)
            except (OSError, termios.error):
                pass
            try:
                os.close(fd)
            except OSError:
                pass
            return
        _tty_fd, _tty_old = fd, saved

    def _cleanup_tty():
        """Restore the saved terminal attributes and close /dev/tty."""
        if _tty_fd is None or _tty_old is None:
            return
        try:
            termios.tcsetattr(_tty_fd, termios.TCSADRAIN, _tty_old)
        except (OSError, termios.error):
            pass  # best effort at interpreter exit
        try:
            os.close(_tty_fd)
        except OSError:
            pass

    atexit.register(_cleanup_tty)

    def _read_key(timeout):
        """Wait up to *timeout* seconds for one byte and return decoded text.

        Returns None when no tty is available, nothing arrived, or the read
        failed; returns "" while a multi-byte character is still incomplete.
        """
        _init_tty()
        if _tty_fd is None:
            return None
        try:
            ready, _, _ = select.select([_tty_fd], [], [], timeout)
            if not ready:
                return None
            return _key_decoder.decode(os.read(_tty_fd, 1))
        except OSError:
            return None

    def key_nb():
        """Return a pending key as a string, or None if there is none."""
        return _read_key(0) or None

    def key_b():
        """Wait up to 5 seconds for a key; None on timeout or without a tty."""
        return _read_key(5)
# --- Pager: render only the visible window from raw lines --------------------
def _write_line(row, text, cols):
    """Place *text* on screen row *row*: clear the row, clip to ``cols - 2``."""
    clipped = truncate(text, cols - 2)
    goto_and_clear = f"\033[{row};1H\033[2K"
    sys.stdout.write("".join((goto_and_clear, clipped, R)))
def draw_window(raw_lines, offset, visible, highlight=None, mark_idx=None):
    """Redraw the screen with a window of *raw_lines* and return its start index.

    Only the lines that end up on screen are formatted.

    Args:
        raw_lines: sequence of ``(time_str, raw_text)`` entries.
        offset: index of the first entry to show; a negative value means
            "show the tail".  The value is clamped so the window never starts
            past ``len(raw_lines) - visible``.
        visible: number of screen rows available for the window.
        highlight: optional search regex passed on to the line formatter.
        mark_idx: index of the entry to flag with a ``>`` marker, if any.

    Returns:
        The clamped offset actually used (0 for an empty buffer).
    """
    cols, _ = term_size()
    sys.stdout.write("\033[?25l\033[r\033[2J\033[H")  # hide cursor, reset scroll region, clear, home
    try:
        total = len(raw_lines)
        if total == 0:
            return 0
        last_start = max(0, total - visible)
        offset = last_start if offset < 0 else min(offset, last_start)
        # Seed the grouping state from the line just above the window so a
        # group that continues from off-screen gets no repeated header.
        prev_grp = _module_tag(raw_lines[offset - 1][1]) if offset > 0 else None
        row = 1
        for i in range(offset, total):
            if row > visible:
                break
            t, raw = raw_lines[i]
            tag, mod = _module_tag(raw)
            grp = (tag, mod)
            if mod and grp != prev_grp:
                # Header rows use up window rows too.
                _write_line(row, _group_hdr(tag, mod), cols)
                row += 1
                if row > visible:
                    break
            prev_grp = grp
            line = _format_one(raw, t, highlight)
            if mark_idx is not None and i == mark_idx:
                _write_line(row, f"{YEL}>{R}{line}", cols)
            else:
                _write_line(row, f" {line}", cols)
            row += 1
        return offset
    finally:
        # Runs on every exit path, including the empty-buffer return, so the
        # cursor is never left hidden.
        sys.stdout.write("\033[?25h")  # show cursor
        sys.stdout.flush()
# --- Main --------------------------------------------------------------------
def main():
    """Run the pager: read log lines from stdin, show them, react to keys.

    A background thread feeds stdin lines into a queue.  After an initial
    catch-up phase (history is collected until input pauses), the main loop
    alternates between handling one key press and printing up to 50 queued
    lines.  In live mode new lines are appended inside a scroll region above
    the status bar; in browse mode ("paused") a snapshot of the buffer is
    shown and new lines are only stored.  The loop ends on Ctrl+C/Ctrl+D or
    when stdin is exhausted and the queue is empty.

    Command-line flags: --filter/-f (regex), --level/-l (minimum level),
    --no-http, --buffer/-b (kept lines), --no-ts.  The filter, level and
    no-http options apply to lines printed in live mode.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--filter", "-f")
    parser.add_argument("--level", "-l")
    parser.add_argument("--no-http", action="store_true")
    parser.add_argument("--buffer", "-b", type=int, default=10000)
    parser.add_argument("--no-ts", action="store_true", help="Disable docker timestamp parsing")
    args = parser.parse_args()
    min_level = LEVEL_ORDER.get(args.level.upper(), 0) if args.level else 0
    try:
        line_filter = re.compile(args.filter, re.IGNORECASE) if args.filter else None
    except re.error as exc:
        # Report a bad pattern as a usage error instead of a traceback.
        parser.error(f"invalid --filter regex: {exc}")
    parse_ts = not args.no_ts
    buf = deque(maxlen=args.buffer)   # (time_str, text) entries kept for browsing
    queue = deque()                   # raw lines handed over by the reader thread
    eof = threading.Event()

    def reader():
        """Copy stdin lines into the queue; signal eof when input ends."""
        try:
            for line in sys.stdin:
                queue.append(line.rstrip())
        except Exception:
            # Best effort: any read failure is treated as end of input.
            pass
        finally:
            eof.set()

    threading.Thread(target=reader, daemon=True).start()

    def ingest(raw):
        """Store one stdin line in buf; return its entry, or None if skipped."""
        if not raw or "core.middleware" in raw:
            return None
        # Start from "no timestamp" so --no-ts never reads an unset or stale
        # value; the wall clock is used whenever the line carries no stamp.
        ts = None
        if parse_ts:
            ts, raw = _strip_docker_ts(raw)
        entry = (ts or datetime.now().strftime("%H:%M:%S"), raw)
        buf.append(entry)
        return entry

    def live_status():
        return f"{len(buf)} lines | /:search e/E:errors w/W:warn p:pause q:quit"

    # Catchup: collect history until the input stays quiet for a moment.
    show_status("Loading history...")
    while not eof.is_set():
        drained = False
        while queue:
            if ingest(queue.popleft()) is not None:
                drained = True
        if not drained:
            time.sleep(0.3)
            if not queue:
                break
        else:
            time.sleep(0.02)

    # Initial display
    _, rows = term_size()
    draw_window(list(buf), -1, rows - 2)
    setup_live_region()
    show_live_status(live_status())

    # State
    paused = False
    view = None          # raw lines for browse mode (None = live)
    view_off = -1
    highlight = None
    matches = []         # indices into view[]
    match_idx = -1
    mark = None
    prev_grp = None      # (tag, module) of the last line printed live
    filter_mode = None   # None, "error", "warn"
    q_msg_until = 0      # time until which a temporary status message stays

    def page_rows():
        return term_size()[1] - 2

    def pct(off, total):
        """Position indicator for the status bar: "[NN%]" or "[END]"."""
        end = min(off + page_rows(), total) if off >= 0 else total
        p = int(end / max(total, 1) * 100)
        return "[END]" if p >= 100 else f"[{p}%]"

    def browse(lines, hl=None, center=None, msg=""):
        """Enter browse mode on *lines*, optionally centered on index *center*."""
        nonlocal paused, view, view_off, highlight, mark
        paused = True
        view = lines
        highlight = hl
        mark = center
        v = page_rows()
        view_off = max(0, center - v // 2) if center is not None else -1
        view_off = draw_window(view, view_off, v, highlight, mark)
        show_status(f"{msg} {pct(view_off, len(view))} s/d:scroll n/N:match g/G:top/end a:live q:quit")

    def redraw(msg=""):
        """Repaint the current browse view and its status bar."""
        nonlocal view_off
        if view is None:
            return
        view_off = draw_window(view, view_off, page_rows(), highlight, mark)
        show_status(f"{msg} {pct(view_off, len(view))} s/d:scroll n/N:match g/G:top/end a:live q:quit")

    def scroll(delta):
        """Move the browse window by *delta* lines (clamped) and repaint."""
        nonlocal view_off, mark
        if view is None:
            return
        mx = max(0, len(view) - page_rows())
        if view_off < 0:
            view_off = mx
        view_off = max(0, min(view_off + delta, mx))
        mark = None
        redraw()

    def read_search():
        """Prompt for a search pattern; None when cancelled or left empty."""
        reset_scroll_region()
        show_status("Search: ")
        chars = []
        while True:
            c = key_b()
            if not c:
                continue  # timeout or an incomplete multi-byte character
            if c in ("\r", "\n"):
                break
            if c == "\x1b":
                return None
            if c in ("\x7f", "\x08"):
                if chars:
                    chars.pop()
                show_status(f"Search: {''.join(chars)}")
                continue
            chars.append(c)
            show_status(f"Search: {''.join(chars)}")
        return "".join(chars) if chars else None

    def resume_live():
        """Leave browse mode and repaint the tail of the buffer."""
        nonlocal paused, view, matches, highlight, mark, prev_grp, filter_mode
        paused = False
        view = None
        matches = []
        highlight = None
        mark = None
        filter_mode = None
        draw_window(list(buf), -1, page_rows())
        setup_live_region()
        show_live_status(live_status())
        prev_grp = None

    def notify(msg):
        """Show a temporary status message for about two seconds.

        In live mode the cursor-preserving variant is used so that the next
        log line is still printed inside the scroll region.
        """
        nonlocal q_msg_until
        q_msg_until = time.time() + 2
        if paused:
            show_status(msg)
        else:
            show_live_status(msg)

    def pause_view():
        """Freeze the display on a snapshot of the buffer."""
        nonlocal filter_mode, matches, highlight
        filter_mode = None
        matches = []
        highlight = None
        browse(list(buf), msg="PAUSED |")

    def goto_match(step, label):
        """Step to a neighbouring match, center it, and repaint."""
        nonlocal match_idx, mark, view_off
        match_idx = max(0, min(match_idx + step, len(matches) - 1))
        mark = matches[match_idx]
        view_off = max(0, mark - page_rows() // 2)
        redraw(f"{label} {match_idx+1}/{len(matches)} |")

    def start_level_filter(mode, label, wanted, none_msg):
        """Collect lines accepted by *wanted* and jump to the newest one."""
        nonlocal filter_mode, matches, match_idx
        snapshot = list(buf)
        hits = [i for i, (_, text) in enumerate(snapshot) if wanted(text)]
        if not hits:
            notify(none_msg)
            return
        filter_mode = mode
        matches = hits
        match_idx = len(hits) - 1
        browse(snapshot, center=hits[-1], msg=f"{label} {len(hits)}/{len(hits)} |")

    # --- Live loop ---
    try:
        while True:
            k = key_nb()
            if k in ("\x03", "\x04"):  # Ctrl+C, Ctrl+D
                break
            elif k == "q":
                notify("Press Ctrl+C twice to exit")
            elif k == "/":
                pat_str = read_search()
                if pat_str:
                    filter_mode = None
                    bl = list(buf)
                    try:
                        pat = re.compile(pat_str, re.IGNORECASE)
                        matches = [i for i, (_, r) in enumerate(bl) if pat.search(r)]
                    except re.error:
                        matches = []
                    if matches:
                        match_idx = len(matches) - 1
                        browse(bl, hl=pat_str, center=matches[match_idx],
                               msg=f"{len(matches)} matches |")
                    else:
                        show_status(f"No matches for '{pat_str}' | a: resume")
                        paused = True
                        view = bl
                elif paused:
                    # Search cancelled while browsing: stay in browse mode.
                    redraw()
                else:
                    setup_live_region()
                    show_live_status(live_status())
            elif k == "n" and matches and view:
                goto_match(1, "Match")
            elif k == "N" and matches and view:
                goto_match(-1, "Match")
            elif k in ("e", "E"):
                if filter_mode == "error" and view and matches:
                    goto_match(-1 if k == "e" else 1, "Error")
                elif k == "e":
                    start_level_filter(
                        "error", "Error",
                        lambda text: line_level(text) == "ERROR",
                        "No errors found")
            elif k in ("w", "W"):
                if filter_mode == "warn" and view and matches:
                    goto_match(-1 if k == "w" else 1, "Warn")
                elif k == "w":
                    start_level_filter(
                        "warn", "Warn",
                        lambda text: LEVEL_ORDER.get(line_level(text), 0) >= 2,
                        "No warnings found")
            elif k in ("s", "S", "d", "D"):
                if not paused:
                    pause_view()
                if k == "s":
                    scroll(-1)
                elif k == "S":
                    scroll(1)
                elif k == "d":
                    scroll(-page_rows())
                elif k == "D":
                    scroll(page_rows())
            elif k == "G" and paused:
                scroll(999999)
            elif k == "g" and paused:
                if view:
                    view_off = 0
                    mark = None
                    redraw()
            elif k == "\x1b":
                k2 = key_nb()
                if k2 == "[":
                    k3 = key_nb()
                    if k3 in ("A", "B", "C", "D", "1"):
                        # Shift+arrows: \x1b[1;2A / \x1b[1;2B
                        shift = False
                        if k3 == "1":
                            key_nb()        # ;
                            key_nb()        # 2
                            k3 = key_nb()   # A or B
                            shift = True
                        if not paused:
                            pause_view()
                        if shift:
                            if k3 == "A":
                                scroll(-page_rows())
                            elif k3 == "B":
                                scroll(page_rows())
                        else:
                            if k3 == "A":
                                scroll(-1)
                            elif k3 == "B":
                                scroll(1)
                            elif k3 in ("C", "D") and matches and view:
                                # right = next (newer), left = prev (older)
                                goto_match(1 if k3 == "C" else -1, "Match")
            elif k == "a":
                resume_live()
            elif k == "p":
                if not paused:
                    pause_view()
                else:
                    resume_live()
            elif k == "c":
                sys.stdout.write("\033[2J")
                _, r = term_size()
                sys.stdout.write(f"\033[{r-1};1H")
                sys.stdout.flush()
                show_live_status(live_status())
                prev_grp = None

            # --- Live lines ---
            processed = 0
            while queue and processed < 50:
                processed += 1
                entry = ingest(queue.popleft())
                # While paused, lines are stored but not printed.
                if entry is None or paused:
                    continue
                t, raw = entry
                if LEVEL_ORDER.get(line_level(raw), 0) < min_level:
                    continue
                if args.no_http and _is_http(raw):
                    continue
                if line_filter and not line_filter.search(raw):
                    continue
                cols = term_size()[0]
                tag, mod = _module_tag(raw)
                grp = (tag, mod)
                if mod and grp != prev_grp:
                    sys.stdout.write(f"\033[2K{truncate(_group_hdr(tag, mod), cols - 2)}{R}\n")
                prev_grp = grp
                line = _format_one(raw, t)
                sys.stdout.write(f"\033[2K{truncate(f' {line}', cols - 2)}{R}\n")
            sys.stdout.flush()

            if q_msg_until and time.time() > q_msg_until:
                # Temporary message expired: restore the regular status.
                q_msg_until = 0
                if paused:
                    redraw()
                else:
                    show_live_status(live_status())
            elif not paused and not q_msg_until:
                show_live_status(live_status())
            if eof.is_set() and not queue:
                break
            time.sleep(0.05)
    except KeyboardInterrupt:
        pass
    finally:
        reset_scroll_region()
        clear_status()
        print(R)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment