#!/usr/bin/env python3
import sys, math, argparse, pathlib, textwrap
from dataclasses import dataclass
from typing import List, Dict, Any, Tuple

import yaml

# ---------------- Units ----------------
def parse_cpu(s: str) -> int:
    s = str(s).strip()
    if s.endswith('m'): return int(float(s[:-1]))
    return int(float(s) * 1000)

def parse_mem(s: str) -> int:
    s = str(s).strip().lower()
    units = [('ti', 1024*1024), ('gi', 1024), ('mi', 1), ('ki', 1/1024)]
    for suffix, mult in units:
        if s.endswith(suffix):
            return int(round(float(s[:-len(suffix)]) * mult))
    return int(float(s))  # plain numbers are interpreted as Mi

def parse_gpu(v) -> int:
    """GPUs are whole-count devices (e.g., 0, 1, 2...)."""
    if v is None: return 0
    s = str(v).strip().lower()
    # accept plain ints or '0', '1', etc.
    return int(float(s))

def fmt_cpu(m: int) -> str: return f"{m/1000:.3f} vCPU"

def fmt_mem(mi: int) -> str: return f"{mi/1024:.2f} Gi" if mi >= 1024 else f"{mi} Mi"
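# Sanity check of the unit helpers; the expected values follow directly from
# the definitions above (illustrative, not executed):
#   parse_cpu("500m")  == 500    # millicores pass through
#   parse_cpu("2")     == 2000   # whole cores -> millicores
#   parse_mem("2Gi")   == 2048   # Gi -> Mi
#   parse_mem("512Mi") == 512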
# ---------------- Data models ----------------
@dataclass
class NodeType:
    name: str
    cpu_m: int
    mem_mi: int
    max_pods: int
    gpus: int = 0
    kube_reserved_cpu_m: int = 0
    kube_reserved_mem_mi: int = 0
    system_reserved_cpu_m: int = 0
    system_reserved_mem_mi: int = 0
    gpu_reserved: int = 0  # rare, but supported

    def base_allocatable(self) -> Tuple[int, int, int, int]:
        cpu = self.cpu_m - self.kube_reserved_cpu_m - self.system_reserved_cpu_m
        mem = self.mem_mi - self.kube_reserved_mem_mi - self.system_reserved_mem_mi
        pods = self.max_pods
        gpus = max(self.gpus - self.gpu_reserved, 0)
        return max(cpu, 0), max(mem, 0), max(pods, 0), gpus
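# Note: base_allocatable() mirrors the kubelet's node allocatable formula
# (allocatable = capacity - kube-reserved - system-reserved), except that hard
# eviction thresholds are not modeled here; if you want that margin too, fold
# it into --headroom.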
@dataclass
class DaemonSet:
    name: str
    cpu_m: int
    mem_mi: int
    gpus: int = 0
    pods: int = 1

@dataclass
class Workload:
    name: str
    replicas: int
    cpu_m: int
    mem_mi: int
    gpus: int = 0
# ---------------- Packing ----------------
class Bin:
    def __init__(self, cpu_m: int, mem_mi: int, pods_cap: int, gpus: int, node_name: str):
        self.cpu_m = cpu_m
        self.mem_mi = mem_mi
        self.pods_left = pods_cap
        self.gpus = gpus
        self.node_name = node_name
        self.contents: List[Tuple[str, int, int, int]] = []  # (name, cpu, mem, gpu)

    def can_fit(self, cpu_m: int, mem_mi: int, gpus: int) -> bool:
        return (self.cpu_m >= cpu_m and self.mem_mi >= mem_mi and
                self.pods_left >= 1 and self.gpus >= gpus)

    def place(self, wl_name: str, cpu_m: int, mem_mi: int, gpus: int):
        self.cpu_m -= cpu_m
        self.mem_mi -= mem_mi
        self.pods_left -= 1
        self.gpus -= gpus
        self.contents.append((wl_name, cpu_m, mem_mi, gpus))

def first_fit_decreasing(items: List[Tuple[str, int, int, int]],
                         node_cpu: int, node_mem: int, node_pods: int, node_gpus: int) -> List[Bin]:
    # Sort by the max pressure across all dimensions (including GPUs)
    def key(it):
        _, c, m, g = it
        pressures = [
            c / max(node_cpu, 1),
            m / max(node_mem, 1),
            (1 if node_pods == 0 else 0),  # pod slots are discrete; packing sorts by CPU/Mem/GPU
            (g / max(node_gpus, 1)) if node_gpus > 0 else 0
        ]
        return max(pressures)
    items = sorted(items, key=key, reverse=True)
    bins: List[Bin] = []
    node_idx = 0
    for name, c, m, g in items:
        placed = False
        for b in bins:
            if b.can_fit(c, m, g):
                b.place(name, c, m, g)
                placed = True
                break
        if not placed:
            node_idx += 1
            b = Bin(node_cpu, node_mem, node_pods, node_gpus, f"node-{node_idx}")
            if not b.can_fit(c, m, g):
                raise ValueError(
                    f"Pod '{name}' (CPU {fmt_cpu(c)}, Mem {fmt_mem(m)}, GPUs {g}) "
                    f"exceeds per-node allocatable (CPU {fmt_cpu(node_cpu)}, Mem {fmt_mem(node_mem)}, "
                    f"Pods {node_pods}, GPUs {node_gpus})."
                )
            b.place(name, c, m, g)
            bins.append(b)
    return bins
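# Note: first-fit-decreasing is a heuristic, not an exact solver. For 1-D bin
# packing it is known to use at most roughly 11/9 of the optimal number of
# bins; no comparable guarantee holds for the multi-dimensional (vector)
# packing done here, so treat the resulting node count as an estimate rather
# than a provable minimum.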
# ---------------- Planner ----------------
def plan_for_node_type(node: NodeType, ds_list: List[DaemonSet], workloads: List[Workload],
                       mode: str, headroom: float) -> Dict[str, Any]:
    # (mode is already resolved in load_input(); it is accepted here only for symmetry)
    cpu_alloc, mem_alloc, pods_cap, gpu_alloc = node.base_allocatable()
    # per-node DaemonSet overhead
    ds_cpu = sum(d.cpu_m * d.pods for d in ds_list)
    ds_mem = sum(d.mem_mi * d.pods for d in ds_list)
    ds_pods = sum(d.pods for d in ds_list)
    ds_gpus = sum(d.gpus * d.pods for d in ds_list)
    node_cpu_after = max(cpu_alloc - ds_cpu, 0)
    node_mem_after = max(mem_alloc - ds_mem, 0)
    node_pods_after = max(pods_cap - ds_pods, 0)
    node_gpus_after = max(gpu_alloc - ds_gpus, 0)
    # headroom (fractional) on CPU/Mem/GPU and pod slots
    node_cpu_after = int(node_cpu_after * (1.0 - headroom))
    node_mem_after = int(node_mem_after * (1.0 - headroom))
    node_gpus_after = int(math.floor(node_gpus_after * (1.0 - headroom)))
    if node_pods_after > 0:
        node_pods_after = max(int(math.floor(node_pods_after * (1.0 - headroom))), 1)
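    # Worked example: with --headroom 0.10, a node left with 4000m CPU and
    # 15360Mi (15 Gi) after DaemonSets packs against 3600m / 13824Mi (13.5 Gi).
    # Caveat: because the GPU count is floored, a single-GPU node drops to 0
    # schedulable GPUs at any nonzero headroom; use --headroom 0 for
    # GPU-dominated node types if that is not intended.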
    # expand workloads into items
    items: List[Tuple[str, int, int, int]] = []
    for w in workloads:
        for i in range(w.replicas):
            items.append((f"{w.name}#{i+1}", w.cpu_m, w.mem_mi, w.gpus))
    bins = first_fit_decreasing(items, node_cpu_after, node_mem_after, node_pods_after, node_gpus_after)
    total_nodes = len(bins)
    used_cpu = sum((node_cpu_after - b.cpu_m) for b in bins)
    used_mem = sum((node_mem_after - b.mem_mi) for b in bins)
    used_pods = sum((node_pods_after - b.pods_left) for b in bins)
    used_gpu = sum((node_gpus_after - b.gpus) for b in bins)
    return {
        "node_type": node.name,
        "nodes_required": total_nodes,
        "per_node_effective_capacity": {
            "cpu": node_cpu_after, "mem": node_mem_after, "pods": node_pods_after, "gpus": node_gpus_after
        },
        "daemonsets": {
            "cpu_per_node": ds_cpu, "mem_per_node": ds_mem, "pods_per_node": ds_pods, "gpus_per_node": ds_gpus
        },
        "cluster_utilization": {
            "cpu_used": used_cpu, "cpu_total": total_nodes * node_cpu_after,
            "mem_used": used_mem, "mem_total": total_nodes * node_mem_after,
            "pods_used": used_pods, "pods_total": total_nodes * node_pods_after,
            "gpus_used": used_gpu, "gpus_total": total_nodes * node_gpus_after
        },
        "placement": [
            {
                "node": b.node_name,
                "workloads": [{"name": n, "cpu": c, "mem": m, "gpus": g} for (n, c, m, g) in b.contents],
                "remaining": {"cpu": b.cpu_m, "mem": b.mem_mi, "pods": b.pods_left, "gpus": b.gpus}
            } for b in bins
        ]
    }
def load_input(path: pathlib.Path, mode: str) -> Tuple[List[NodeType], List[DaemonSet], List[Workload]]:
    data = yaml.safe_load(path.read_text())

    def choose(res):
        if isinstance(res, dict):
            key = "requests" if mode == "requests" else "limits"
            if key not in res: key = "limits" if mode == "requests" else "requests"
            return res.get(key, {})
        return res or {}

    node_types: List[NodeType] = []
    for n in data.get("node_types", []):
        node_types.append(NodeType(
            name=n["name"],
            cpu_m=parse_cpu(n["capacity"]["cpu"]),
            mem_mi=parse_mem(n["capacity"]["memory"]),
            max_pods=int(n["maxPods"]),
            gpus=parse_gpu(n.get("capacity", {}).get("gpus", n.get("gpus", 0))),
            kube_reserved_cpu_m=parse_cpu(n.get("kubeReserved", {}).get("cpu", "0")),
            kube_reserved_mem_mi=parse_mem(n.get("kubeReserved", {}).get("memory", "0")),
            system_reserved_cpu_m=parse_cpu(n.get("systemReserved", {}).get("cpu", "0")),
            system_reserved_mem_mi=parse_mem(n.get("systemReserved", {}).get("memory", "0")),
            gpu_reserved=parse_gpu(n.get("gpuReserved", 0)),
        ))
    ds_list: List[DaemonSet] = []
    for d in data.get("daemonsets", []):
        res = choose(d.get("resources", {}))
        ds_list.append(DaemonSet(
            name=d["name"],
            cpu_m=parse_cpu(res.get("cpu", "0")),
            mem_mi=parse_mem(res.get("memory", "0")),
            gpus=parse_gpu(res.get("gpu", 0)),
            pods=int(d.get("podsPerNode", 1))
        ))
    workloads: List[Workload] = []
    for w in data.get("workloads", []):
        res = choose(w.get("resources", {}))
        workloads.append(Workload(
            name=w["name"],
            replicas=int(w["replicas"]),
            cpu_m=parse_cpu(res.get("cpu", "0")),
            mem_mi=parse_mem(res.get("memory", "0")),
            gpus=parse_gpu(res.get("gpu", 0)),
        ))
    return node_types, ds_list, workloads
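# An illustrative input file matching the schema load_input() expects. All
# names and values below are invented for the example; only the keys are
# taken from the parser above:
#
#   node_types:
#     - name: m5-2xlarge          # hypothetical node type
#       capacity: {cpu: "8", memory: "32Gi", gpus: 0}
#       maxPods: 58
#       kubeReserved: {cpu: "100m", memory: "1Gi"}
#       systemReserved: {cpu: "100m", memory: "500Mi"}
#   daemonsets:
#     - name: node-exporter       # hypothetical DaemonSet
#       podsPerNode: 1
#       resources:
#         requests: {cpu: "50m", memory: "64Mi"}
#   workloads:
#     - name: api                 # hypothetical Deployment
#       replicas: 10
#       resources:
#         requests: {cpu: "500m", memory: "1Gi"}
#         limits: {cpu: "1", memory: "2Gi"}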
def human_summary(result: Dict[str, Any]) -> str:
    cap = result["per_node_effective_capacity"]
    clu = result["cluster_utilization"]
    ds = result["daemonsets"]
    lines = []
    lines.append(f"Node type: {result['node_type']}")
    lines.append(f"Nodes required: {result['nodes_required']}")
    lines.append("Per-node effective capacity (after reserves + DS + headroom):")
    lines.append(f"  CPU: {fmt_cpu(cap['cpu'])} | Mem: {fmt_mem(cap['mem'])} | Pods: {cap['pods']} | GPUs: {cap['gpus']}")
    lines.append("DaemonSets (per node):")
    lines.append(f"  CPU: {fmt_cpu(ds['cpu_per_node'])} | Mem: {fmt_mem(ds['mem_per_node'])} | Pods: {ds['pods_per_node']} | GPUs: {ds['gpus_per_node']}")
    lines.append("Cluster utilization (packed):")
    lines.append(f"  CPU:  {fmt_cpu(clu['cpu_used'])} / {fmt_cpu(clu['cpu_total'])}")
    lines.append(f"  Mem:  {fmt_mem(clu['mem_used'])} / {fmt_mem(clu['mem_total'])}")
    lines.append(f"  Pods: {clu['pods_used']} / {clu['pods_total']}")
    lines.append(f"  GPUs: {clu['gpus_used']} / {clu['gpus_total']}")
    return "\n".join(lines)
def main():
    parser = argparse.ArgumentParser(
        description="Kubernetes capacity planning (CPU/Mem/Pods/GPUs) by requests/limits with headroom.",
        formatter_class=argparse.RawTextHelpFormatter,
        epilog=textwrap.dedent("""
            Tips:
              • Use --mode requests for conservative planning, --mode limits for worst-case.
              • Include 'gpu' in resources for GPU workloads (whole numbers).
              • Use --headroom 0.15 to leave 15% extra space for spikes/fragmentation.
        """)
    )
    parser.add_argument("input", type=pathlib.Path, help="YAML file")
    parser.add_argument("--mode", choices=["requests", "limits"], default="requests")
    parser.add_argument("--headroom", type=float, default=0.10)
    parser.add_argument("--detail", action="store_true")
    args = parser.parse_args()
    node_types, ds_list, workloads = load_input(args.input, args.mode)
    if not node_types:
        print("No node_types provided.", file=sys.stderr)
        sys.exit(2)
    if not workloads:
        print("No workloads provided.", file=sys.stderr)
        sys.exit(2)
    for nt in node_types:
        try:
            res = plan_for_node_type(nt, ds_list, workloads, args.mode, args.headroom)
        except ValueError as e:
            print(f"\n[{nt.name}] ERROR: {e}\n")
            continue
        print("\n" + "="*72)
        print(human_summary(res))
        if args.detail:
            print("\nPlacement:")
            for node in res["placement"]:
                used_cpu = sum(w["cpu"] for w in node["workloads"])
                used_mem = sum(w["mem"] for w in node["workloads"])
                used_gpu = sum(w["gpus"] for w in node["workloads"])
                print(f"  {node['node']}: used CPU {fmt_cpu(used_cpu)}, Mem {fmt_mem(used_mem)}, "
                      f"GPUs {used_gpu}, remaining CPU {fmt_cpu(node['remaining']['cpu'])}, "
                      f"Mem {fmt_mem(node['remaining']['mem'])}, pods left {node['remaining']['pods']}, "
                      f"GPUs left {node['remaining']['gpus']}")
                for w in node["workloads"]:
                    print(f"    - {w['name']} ({fmt_cpu(w['cpu'])}, {fmt_mem(w['mem'])}, GPUs {w['gpus']})")
    return 0

if __name__ == "__main__":
    sys.exit(main())
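# Example invocation (file names are placeholders):
#   python3 plan.py cluster.yaml --mode requests --headroom 0.15 --detail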