Last active
November 27, 2024 20:42
-
-
Save ondrejmo/beeb4276b0b72b3ff1b867a2e35529a3 to your computer and use it in GitHub Desktop.
A simple library I used for querying the Prometheus HTTP API and storing the results in Polars DataFrames
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| from datetime import UTC, datetime, timedelta | |
| from requests import get | |
| # https://prometheus.io/docs/prometheus/latest/querying/api/ | |
| class Prometheus: | |
| """ | |
| Class for querying Prometheus | |
| """ | |
| def __init__( | |
| self, | |
| url: str = os.getenv("PROMETHEUS_URL"), | |
| auth: tuple = (os.getenv("PROMETHEUS_USER"), os.getenv("PROMETHEUS_PASS")), | |
| ) -> None: | |
| self.url = url | |
| self.auth = auth | |
| def labels(self, start: datetime | None = None, end: datetime | None = None) -> list: | |
| """ | |
| List all labels | |
| """ | |
| if start is None: | |
| start = datetime.now(tz=UTC) - timedelta(days=1) | |
| if end is None: | |
| end = datetime.now(tz=UTC) | |
| params = ( | |
| {} | |
| if start == "" or end == "" | |
| else { | |
| "start": start.astimezone().isoformat(), | |
| "end": end.astimezone().isoformat(), | |
| } | |
| ) | |
| res = get(url=self.url + "/api/v1/labels", auth=self.auth, params=params, timeout=300) | |
| if res.status_code != 200 or res.json()["status"] != "success": | |
| raise Exception("Well, fuck...") | |
| return res.json()["data"] | |
| def values(self, label: str, start: datetime | None = None, end: datetime | None = None) -> list: | |
| """ | |
| List all values of given label | |
| """ | |
| if start is None: | |
| start = datetime.now(tz=UTC) - timedelta(days=1) | |
| if end is None: | |
| end = datetime.now(tz=UTC) | |
| params = ( | |
| {} | |
| if start == "" or end == "" | |
| else { | |
| "start": start.astimezone().isoformat(), | |
| "end": end.astimezone().isoformat(), | |
| } | |
| ) | |
| res = get(url=self.url + "/api/v1/label/" + label + "/values", auth=self.auth, params=params, timeout=300) | |
| if res.status_code != 200 or res.json()["status"] != "success": | |
| raise Exception("Well, fuck...") | |
| return res.json()["data"] | |
| def query(self, expression: str = '{job=~".+"}', time: datetime | None = None) -> tuple[str, list]: | |
| """ | |
| An instant query expression | |
| """ | |
| if time is None: | |
| time = datetime.now(tz=UTC) | |
| res = get( | |
| url=self.url + "/api/v1/query", | |
| auth=self.auth, | |
| params={ | |
| "query": expression, | |
| "time": time.astimezone().isoformat(), | |
| }, | |
| timeout=300, | |
| ) | |
| if res.status_code != 200 or res.json()["status"] != "success": | |
| raise Exception("Well, fuck...") | |
| return res.json()["data"]["resultType"], res.json()["data"]["result"] | |
| def query_range( | |
| self, expression: str = '{job=~".+"}', start: datetime | None = None, end: datetime | None = None | |
| ) -> tuple[str, list]: | |
| """ | |
| A range query expression | |
| """ | |
| if start is None: | |
| start = datetime.now(tz=UTC) - timedelta(days=1) | |
| if end is None: | |
| end = datetime.now(tz=UTC) | |
| res = get( | |
| url=self.url + "/api/v1/query_range", | |
| auth=self.auth, | |
| params={ | |
| "query": expression, | |
| "start": start.astimezone().isoformat(), | |
| "end": end.astimezone().isoformat(), | |
| "step": int((end - start).total_seconds()) // 10000, # this is hardcoded limit | |
| }, | |
| timeout=300, | |
| ) | |
| if res.status_code != 200 or res.json()["status"] != "success": | |
| raise Exception("Well, fuck...") | |
| return res.json()["data"]["resultType"], res.json()["data"]["result"] | |
| def query_name(self, name: str) -> tuple[str, list]: | |
| """ | |
| Instant, return raw metrics from type "vector", based on provided __name__ label | |
| """ | |
| res = self.query(expression=f'{{__name__="{name}"}}') | |
| if res[0] != "vector": | |
| raise Exception("Well, fuck...") | |
| return res[1] | |
| def query_range_name(self, name: str, start: datetime | None = None, end: datetime | None = None) -> list: | |
| """ | |
| Range, return raw metrics from type "matrix", based on provided __name__ label | |
| """ | |
| if start is None: | |
| start = datetime.now(tz=UTC) - timedelta(days=1) | |
| if end is None: | |
| end = datetime.now(tz=UTC) | |
| res = self.query_range( | |
| expression=f'{{__name__="{name}"}}', | |
| start=start.astimezone().isoformat(), | |
| end=end.astimezone().isoformat(), | |
| ) | |
| if res[0] != "matrix": | |
| raise Exception("Well, fuck...") | |
| return res[1] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment