Skip to content

Instantly share code, notes, and snippets.

@johnou
Created April 15, 2025 10:02
Show Gist options
  • Select an option

  • Save johnou/46946ef4b8e848e2f566b262655d97da to your computer and use it in GitHub Desktop.

Select an option

Save johnou/46946ef4b8e848e2f566b262655d97da to your computer and use it in GitHub Desktop.
Monitors a Java process and prints top CPU threads with cleaned stack traces.
import subprocess
import time
import re
# === CONFIGURATION ===
JSTACK_PATH = "/usr/lib/jvm/java-11-amazon-corretto/bin/jstack"
JAVA_PID = "30381"
INTERVAL = 5 # seconds between samples
TOP_N = 5 # top N threads to show
STACK_LINES = 5 # stack lines for threads 2..N
TOP_STACK_LINES = 10 # stack lines for top 1 thread
EXCLUDE_PATTERNS = [
"software.amazon.jdbc",
"com.mysql"
]
def get_jstack_output(pid):
result = subprocess.run([JSTACK_PATH, pid], capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"jstack failed: {result.stderr}")
return result.stdout
def get_thread_cpu(pid):
cmd = f"ps -eLo pid,tid,pcpu --no-headers | awk '$1 == {pid} {{print $2, $3}}'"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
usage = {}
for line in result.stdout.strip().splitlines():
parts = line.strip().split()
if len(parts) == 2:
tid, cpu = parts
usage[int(tid)] = float(cpu)
return usage
def parse_jstack_threads(jstack_out):
thread_info = {}
current_trace = []
tid = None
thread_name = ""
for line in jstack_out.splitlines():
if line.startswith('"'):
if tid is not None:
thread_info[tid] = {
"name": thread_name,
"trace": "\n".join(current_trace)
}
current_trace = []
m = re.search(r'nid=0x([0-9a-fA-F]+)', line)
if m:
tid = int(m.group(1), 16)
thread_name = line.split('"')[1]
else:
tid = None
elif tid is not None:
current_trace.append(line)
if tid is not None and tid not in thread_info:
thread_info[tid] = {
"name": thread_name,
"trace": "\n".join(current_trace)
}
return thread_info
def filter_trace(trace):
lines = trace.strip().split("\n")
return [line for line in lines if not any(pat in line for pat in EXCLUDE_PATTERNS)]
def monitor_threads():
while True:
try:
jstack_out = get_jstack_output(JAVA_PID)
cpu_usage = get_thread_cpu(JAVA_PID)
thread_info = parse_jstack_threads(jstack_out)
merged = []
for tid, cpu in cpu_usage.items():
if tid in thread_info:
info = thread_info[tid]
merged.append((cpu, tid, info["name"], info["trace"]))
merged.sort(reverse=True)
print(f"\nTop {TOP_N} CPU-consuming threads in PID {JAVA_PID}:\n")
for idx, (cpu, tid, name, trace) in enumerate(merged[:TOP_N]):
print(f"[TID: {tid}] [CPU: {cpu:.2f}%] [Name: {name}]")
filtered = filter_trace(trace)
n_lines = TOP_STACK_LINES if idx == 0 else STACK_LINES
print("\n".join(filtered[:n_lines]))
print("-" * 80)
time.sleep(INTERVAL)
except Exception as e:
print(f"Error: {e}")
time.sleep(INTERVAL)
if __name__ == "__main__":
monitor_threads()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment