|
import argparse |
|
import re |
|
import sys |
|
|
|
LABEL_SET_RE = re.compile(r"^\{([^}]*)\}") |
|
LABEL_RE = re.compile(r'(\w+?)="([^"]*)"') |
|
|
|
|
|
def parse_labels(line): |
|
m = LABEL_SET_RE.match(line) |
|
if not m: |
|
return {} |
|
inside = m.group(1) |
|
return {k: v for k, v in LABEL_RE.findall(inside)} |
|
|
|
|
|
def list_label_values(label_name): |
|
seen = set() |
|
for line in sys.stdin: |
|
labels = parse_labels(line) |
|
if label_name in labels: |
|
value = labels[label_name] |
|
if value not in seen: |
|
seen.add(value) |
|
print(value) |
|
sys.stdout.flush() |
|
|
|
|
|
def list_label_names(): |
|
seen = set() |
|
for line in sys.stdin: |
|
labels = parse_labels(line) |
|
for key in labels.keys(): |
|
if key not in seen: |
|
seen.add(key) |
|
print(key) |
|
sys.stdout.flush() |
|
|
|
|
|
def filter_metrics_by_labels(label_filters): |
|
for line in sys.stdin: |
|
labels = parse_labels(line) |
|
if all(labels.get(k) == v for k, v in label_filters.items()): |
|
sys.stdout.write(line) |
|
|
|
|
|
def extract_value_timestamp(): |
|
for line in sys.stdin: |
|
parts = line.strip().split() |
|
if len(parts) < 2: |
|
continue |
|
value, timestamp = parts[-2], parts[-1] |
|
sys.stdout.write(f"{value},{timestamp}\n") |
|
|
|
|
|
def main(): |
|
parser = argparse.ArgumentParser() |
|
subparsers = parser.add_subparsers(dest="command", required=True) |
|
|
|
filter_parser = subparsers.add_parser("filter") |
|
filter_parser.add_argument("--label", action="append", required=True) |
|
|
|
llv_parser = subparsers.add_parser("list_label_values") |
|
llv_parser.add_argument("label") |
|
|
|
subparsers.add_parser("list_label_names") |
|
subparsers.add_parser("csv") |
|
|
|
args = parser.parse_args() |
|
|
|
if args.command == "filter": |
|
filters = {} |
|
for pair in args.label: |
|
if "=" not in pair: |
|
raise ValueError(f"Invalid label filter: {pair}") |
|
k, v = pair.split("=", 1) |
|
filters[k] = v |
|
filter_metrics_by_labels(filters) |
|
|
|
elif args.command == "list_label_values": |
|
list_label_values(args.label) |
|
|
|
elif args.command == "list_label_names": |
|
list_label_names() |
|
|
|
elif args.command == "csv": |
|
extract_value_timestamp() |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |