Skip to content

Instantly share code, notes, and snippets.

@bodokaiser
Last active December 26, 2025 18:34
Show Gist options
  • Select an option

  • Save bodokaiser/eb704712d1dbf677e6a2a92a9a596fea to your computer and use it in GitHub Desktop.

Select an option

Save bodokaiser/eb704712d1dbf677e6a2a92a9a596fea to your computer and use it in GitHub Desktop.
Helper tool to work with prometheus tsdb dumps

If you ever find yourself having to go through old Prometheus TSDB files, this is a quick guide on how to do so.

  1. Dump the tsdb data to a text file:
promtool tsdb dump <path_to_prometheus_data_dir> > dump.txt
  1. List all metric names in the dump:
cat dump.txt | python prometheus_tdsb_dump.py list_label_values __name__
  1. Filter dump by metric name:
cat dump.txt | python prometheus_tdsb_dump.py filter --label __name__=<metric_name> > dump_<metric_name>.txt
  1. List all label names in the filtered dump:
cat dump_<metric_name>.txt | python prometheus_tdsb_dump.py list_label_names
  1. Filter by label name and value and export as csv:
cat dump_<metric_name>.txt | python prometheus_tdsb_dump.py filter --label <label_name>=<label_value> | python prometheus_tdsb_dump.py csv > dump_<metric_name>_<label_name>_<label_value>.csv
import argparse
import re
import sys
# Leading ``{...}`` label set of a dump line. Quoted values are consumed as
# whole units, so a ``}`` or an escaped quote inside a value cannot terminate
# the set early.
LABEL_SET_RE = re.compile(r'^\{((?:[^"}]|"(?:[^"\\]|\\.)*")*)\}')
# A single ``name="value"`` pair. The value may contain backslash escapes
# (e.g. ``\"``), which are kept verbatim in the captured text.
LABEL_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')


def parse_labels(line):
    """Return the labels at the start of a dump line as a ``{name: value}`` dict.

    The line is expected to begin with a brace-delimited label set such as
    ``{__name__="up", job="node"}``. Values are returned exactly as written
    between the double quotes; backslash escapes are NOT decoded, so a value
    printed as ``"a \\"b\\""`` comes back as ``a \\"b\\"``.

    Args:
        line: One text line of the dump.

    Returns:
        A dict mapping label names to their (still escaped) values, or an
        empty dict when the line does not start with a well-formed label set.
    """
    m = LABEL_SET_RE.match(line)
    if not m:
        return {}
    # findall yields (name, value) tuples in order of appearance; a later
    # duplicate name overrides an earlier one.
    return dict(LABEL_RE.findall(m.group(1)))
def list_label_values(label_name):
    """Print every distinct value of ``label_name`` seen on stdin, once each.

    Lines are read from ``sys.stdin`` and their labels extracted with
    ``parse_labels``. Values are printed in order of first appearance, one
    per line, and stdout is flushed after each so output streams through a
    pipe as soon as a new value is found.

    Args:
        label_name: Name of the label whose values should be listed.
    """
    already_printed = set()
    for record in sys.stdin:
        # Parsed label values are always strings, so None means "absent".
        found = parse_labels(record).get(label_name)
        if found is None or found in already_printed:
            continue
        already_printed.add(found)
        print(found, flush=True)
def list_label_names():
    """Print every distinct label name seen on stdin, once each.

    Lines are read from ``sys.stdin`` and their labels extracted with
    ``parse_labels``. Names are printed in order of first appearance, one
    per line, with stdout flushed after each.
    """
    known = set()
    for record in sys.stdin:
        for name in parse_labels(record):
            if name in known:
                continue
            known.add(name)
            print(name, flush=True)
def filter_metrics_by_labels(label_filters):
    """Copy to stdout the stdin lines whose labels match every filter.

    A line is kept only if, for each ``name: value`` pair in
    ``label_filters``, the line carries that label with exactly that value.
    Kept lines are written unchanged; an empty filter dict keeps every line.

    Args:
        label_filters: Dict mapping label names to the required values.
    """
    for record in sys.stdin:
        found = parse_labels(record)
        for name, expected in label_filters.items():
            if found.get(name) != expected:
                break
        else:
            # No filter rejected the line.
            sys.stdout.write(record)
def extract_value_timestamp():
    """Turn dump lines on stdin into ``value,timestamp`` rows on stdout.

    Each line is split on whitespace and its last two tokens are emitted,
    comma-separated, in the order they appear (second-to-last, then last).
    Lines with fewer than two tokens are skipped. No header row is written.
    """
    for record in sys.stdin:
        tokens = record.split()
        if len(tokens) >= 2:
            sample, stamp = tokens[-2:]
            sys.stdout.write(sample + "," + stamp + "\n")
def main(argv=None):
    """Command-line entry point: parse arguments and run one subcommand.

    Subcommands (all read the dump from stdin and write to stdout):
      * ``filter --label NAME=VALUE [--label ...]`` - keep matching lines.
      * ``list_label_values LABEL`` - list distinct values of one label.
      * ``list_label_names`` - list distinct label names.
      * ``csv`` - emit ``value,timestamp`` rows.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default),
            argparse reads ``sys.argv[1:]``.

    Raises:
        SystemExit: On invalid arguments, including a ``--label`` that is not
            of the form ``NAME=VALUE``; argparse prints a usage message and
            exits with status 2.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="command", required=True)

    filter_parser = subparsers.add_parser("filter")
    filter_parser.add_argument("--label", action="append", required=True)

    llv_parser = subparsers.add_parser("list_label_values")
    llv_parser.add_argument("label")

    subparsers.add_parser("list_label_names")
    subparsers.add_parser("csv")

    args = parser.parse_args(argv)

    if args.command == "filter":
        filters = {}
        for pair in args.label:
            # Split on the first "=" only, so values may themselves contain "=".
            name, sep, value = pair.partition("=")
            if not sep or not name:
                # Report through argparse (usage text + exit status 2) rather
                # than an uncaught traceback.
                parser.error(
                    f"Invalid label filter: {pair} (expected NAME=VALUE)"
                )
            filters[name] = value
        filter_metrics_by_labels(filters)
    elif args.command == "list_label_values":
        list_label_values(args.label)
    elif args.command == "list_label_names":
        list_label_names()
    elif args.command == "csv":
        extract_value_timestamp()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment