@yindia · Created September 16, 2025 10:36
#!/usr/bin/env python3
import sys, math, argparse, pathlib, textwrap
from dataclasses import dataclass
from typing import List, Dict, Any, Tuple
import yaml
# ---------------- Units ----------------
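# Conventions (per the parsers below): CPU normalizes to millicores (m), memory to mebibytes (Mi),
# GPUs to whole devices.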
def parse_cpu(s: str) -> int:
    s = str(s).strip()
    if s.endswith('m'): return int(float(s[:-1]))
    return int(float(s) * 1000)
def parse_mem(s: str) -> int:
    s = str(s).strip().lower()
    units = [('ti', 1024*1024), ('gi', 1024), ('mi', 1), ('ki', 1/1024)]
    for suffix, mult in units:
        if s.endswith(suffix):
            return int(round(float(s[:-len(suffix)]) * mult))
    return int(float(s))
def parse_gpu(v) -> int:
    """GPUs are whole-count devices (e.g., 0, 1, 2...)."""
    if v is None: return 0
    s = str(v).strip().lower()
    # accept plain ints or '0', '1', etc.
    return int(float(s))
def fmt_cpu(m: int) -> str: return f"{m/1000:.3f} vCPU"
def fmt_mem(mi: int) -> str: return f"{mi/1024:.2f} Gi" if mi >= 1024 else f"{mi} Mi"
# ---------------- Data models ----------------
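# These dataclasses mirror the YAML input schema; all resource fields are stored in the
# normalized units defined above.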
@dataclass
class NodeType:
    name: str
    cpu_m: int
    mem_mi: int
    max_pods: int
    gpus: int = 0
    kube_reserved_cpu_m: int = 0
    kube_reserved_mem_mi: int = 0
    system_reserved_cpu_m: int = 0
    system_reserved_mem_mi: int = 0
    gpu_reserved: int = 0  # rare, but supported

    def base_allocatable(self) -> Tuple[int, int, int, int]:
        cpu = self.cpu_m - self.kube_reserved_cpu_m - self.system_reserved_cpu_m
        mem = self.mem_mi - self.kube_reserved_mem_mi - self.system_reserved_mem_mi
        pods = self.max_pods
        gpus = max(self.gpus - self.gpu_reserved, 0)
        return max(cpu, 0), max(mem, 0), max(pods, 0), gpus
@dataclass
class DaemonSet:
    name: str
    cpu_m: int
    mem_mi: int
    gpus: int = 0
    pods: int = 1
@dataclass
class Workload:
    name: str
    replicas: int
    cpu_m: int
    mem_mi: int
    gpus: int = 0
# ---------------- Packing ----------------
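# First-fit decreasing: sort pod replicas by their dominant resource pressure, place each into
# the first open node (bin) with room, and open a new node when none fits.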
class Bin:
    def __init__(self, cpu_m: int, mem_mi: int, pods_cap: int, gpus: int, node_name: str):
        self.cpu_m = cpu_m
        self.mem_mi = mem_mi
        self.pods_left = pods_cap
        self.gpus = gpus
        self.node_name = node_name
        self.contents: List[Tuple[str, int, int, int]] = []  # (name, cpu, mem, gpu)

    def can_fit(self, cpu_m: int, mem_mi: int, gpus: int) -> bool:
        return (self.cpu_m >= cpu_m and self.mem_mi >= mem_mi and
                self.pods_left >= 1 and self.gpus >= gpus)

    def place(self, wl_name: str, cpu_m: int, mem_mi: int, gpus: int):
        self.cpu_m -= cpu_m
        self.mem_mi -= mem_mi
        self.pods_left -= 1
        self.gpus -= gpus
        self.contents.append((wl_name, cpu_m, mem_mi, gpus))
def first_fit_decreasing(items: List[Tuple[str, int, int, int]],
                         node_cpu: int, node_mem: int, node_pods: int, node_gpus: int) -> List[Bin]:
    # Sort by the max pressure across all dimensions (including GPUs)
    def key(it):
        _, c, m, g = it
        pressures = [
            c / max(node_cpu, 1),
            m / max(node_mem, 1),
            # every pod consumes exactly one slot, so slot pressure only matters when there are none
            (1 if node_pods == 0 else 0),
            (g / max(node_gpus, 1)) if node_gpus > 0 else 0
        ]
        return max(pressures)
    items = sorted(items, key=key, reverse=True)
    bins: List[Bin] = []
    node_idx = 0
    for name, c, m, g in items:
        placed = False
        for b in bins:
            if b.can_fit(c, m, g):
                b.place(name, c, m, g)
                placed = True
                break
        if not placed:
            node_idx += 1
            b = Bin(node_cpu, node_mem, node_pods, node_gpus, f"node-{node_idx}")
            if not b.can_fit(c, m, g):
                raise ValueError(
                    f"Pod '{name}' (CPU {fmt_cpu(c)}, Mem {fmt_mem(m)}, GPUs {g}) "
                    f"exceeds per-node allocatable (CPU {fmt_cpu(node_cpu)}, Mem {fmt_mem(node_mem)}, "
                    f"Pods {node_pods}, GPUs {node_gpus})."
                )
            b.place(name, c, m, g)
            bins.append(b)
    return bins
# ---------------- Planner ----------------
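# Per node type: start from allocatable (capacity minus reserves), subtract per-node DaemonSet
# overhead, shave off the headroom fraction, then bin-pack all workload replicas.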
def plan_for_node_type(node: NodeType, ds_list: List[DaemonSet], workloads: List[Workload],
                       mode: str, headroom: float) -> Dict[str, Any]:
    cpu_alloc, mem_alloc, pods_cap, gpu_alloc = node.base_allocatable()
    # per-node DaemonSet overhead
    ds_cpu = sum(d.cpu_m * d.pods for d in ds_list)
    ds_mem = sum(d.mem_mi * d.pods for d in ds_list)
    ds_pods = sum(d.pods for d in ds_list)
    ds_gpus = sum(d.gpus * d.pods for d in ds_list)
    node_cpu_after = max(cpu_alloc - ds_cpu, 0)
    node_mem_after = max(mem_alloc - ds_mem, 0)
    node_pods_after = max(pods_cap - ds_pods, 0)
    node_gpus_after = max(gpu_alloc - ds_gpus, 0)
    # headroom (fractional) on CPU/Mem/GPU and pod slots
    node_cpu_after = int(node_cpu_after * (1.0 - headroom))
    node_mem_after = int(node_mem_after * (1.0 - headroom))
    node_gpus_after = int(math.floor(node_gpus_after * (1.0 - headroom)))
    if node_pods_after > 0:
        node_pods_after = max(int(math.floor(node_pods_after * (1.0 - headroom))), 1)
    # expand workloads into items
    items: List[Tuple[str, int, int, int]] = []
    for w in workloads:
        for i in range(w.replicas):
            items.append((f"{w.name}#{i+1}", w.cpu_m, w.mem_mi, w.gpus))
    bins = first_fit_decreasing(items, node_cpu_after, node_mem_after, node_pods_after, node_gpus_after)
    total_nodes = len(bins)
    used_cpu = sum((node_cpu_after - b.cpu_m) for b in bins)
    used_mem = sum((node_mem_after - b.mem_mi) for b in bins)
    used_pods = sum((node_pods_after - b.pods_left) for b in bins)
    used_gpu = sum((node_gpus_after - b.gpus) for b in bins)
    return {
        "node_type": node.name,
        "nodes_required": total_nodes,
        "per_node_effective_capacity": {
            "cpu": node_cpu_after, "mem": node_mem_after, "pods": node_pods_after, "gpus": node_gpus_after
        },
        "daemonsets": {
            "cpu_per_node": ds_cpu, "mem_per_node": ds_mem, "pods_per_node": ds_pods, "gpus_per_node": ds_gpus
        },
        "cluster_utilization": {
            "cpu_used": used_cpu, "cpu_total": total_nodes * node_cpu_after,
            "mem_used": used_mem, "mem_total": total_nodes * node_mem_after,
            "pods_used": used_pods, "pods_total": total_nodes * node_pods_after,
            "gpus_used": used_gpu, "gpus_total": total_nodes * node_gpus_after
        },
        "placement": [
            {
                "node": b.node_name,
                "workloads": [{"name": n, "cpu": c, "mem": m, "gpus": g} for (n, c, m, g) in b.contents],
                "remaining": {"cpu": b.cpu_m, "mem": b.mem_mi, "pods": b.pods_left, "gpus": b.gpus}
            } for b in bins
        ]
    }
def load_input(path: pathlib.Path, mode: str) -> Tuple[List[NodeType], List[DaemonSet], List[Workload]]:
    data = yaml.safe_load(path.read_text())
    def choose(res):
        if isinstance(res, dict):
            key = "requests" if mode == "requests" else "limits"
            # fall back to the other key when the preferred one is absent
            if key not in res: key = "limits" if mode == "requests" else "requests"
            return res.get(key, {})
        return res or {}
    node_types: List[NodeType] = []
    for n in data.get("node_types", []):
        node_types.append(NodeType(
            name=n["name"],
            cpu_m=parse_cpu(n["capacity"]["cpu"]),
            mem_mi=parse_mem(n["capacity"]["memory"]),
            max_pods=int(n["maxPods"]),
            gpus=parse_gpu(n.get("capacity", {}).get("gpus", n.get("gpus", 0))),
            kube_reserved_cpu_m=parse_cpu(n.get("kubeReserved", {}).get("cpu", "0")),
            kube_reserved_mem_mi=parse_mem(n.get("kubeReserved", {}).get("memory", "0")),
            system_reserved_cpu_m=parse_cpu(n.get("systemReserved", {}).get("cpu", "0")),
            system_reserved_mem_mi=parse_mem(n.get("systemReserved", {}).get("memory", "0")),
            gpu_reserved=parse_gpu(n.get("gpuReserved", 0)),
        ))
    ds_list: List[DaemonSet] = []
    for d in data.get("daemonsets", []):
        res = choose(d.get("resources", {}))
        ds_list.append(DaemonSet(
            name=d["name"],
            cpu_m=parse_cpu(res.get("cpu", "0")),
            mem_mi=parse_mem(res.get("memory", "0")),
            gpus=parse_gpu(res.get("gpu", 0)),
            pods=int(d.get("podsPerNode", 1))
        ))
    workloads: List[Workload] = []
    for w in data.get("workloads", []):
        res = choose(w.get("resources", {}))
        workloads.append(Workload(
            name=w["name"],
            replicas=int(w["replicas"]),
            cpu_m=parse_cpu(res.get("cpu", "0")),
            mem_mi=parse_mem(res.get("memory", "0")),
            gpus=parse_gpu(res.get("gpu", 0)),
        ))
    return node_types, ds_list, workloads
def human_summary(result: Dict[str, Any]) -> str:
    cap = result["per_node_effective_capacity"]
    clu = result["cluster_utilization"]
    ds = result["daemonsets"]
    lines = []
    lines.append(f"Node type: {result['node_type']}")
    lines.append(f"Nodes required: {result['nodes_required']}")
    lines.append("Per-node effective capacity (after reserves + DS + headroom):")
    lines.append(f"  CPU: {fmt_cpu(cap['cpu'])} | Mem: {fmt_mem(cap['mem'])} | Pods: {cap['pods']} | GPUs: {cap['gpus']}")
    lines.append("DaemonSets (per node):")
    lines.append(f"  CPU: {fmt_cpu(ds['cpu_per_node'])} | Mem: {fmt_mem(ds['mem_per_node'])} | Pods: {ds['pods_per_node']} | GPUs: {ds['gpus_per_node']}")
    lines.append("Cluster utilization (packed):")
    lines.append(f"  CPU:  {fmt_cpu(clu['cpu_used'])} / {fmt_cpu(clu['cpu_total'])}")
    lines.append(f"  Mem:  {fmt_mem(clu['mem_used'])} / {fmt_mem(clu['mem_total'])}")
    lines.append(f"  Pods: {clu['pods_used']} / {clu['pods_total']}")
    lines.append(f"  GPUs: {clu['gpus_used']} / {clu['gpus_total']}")
    return "\n".join(lines)
def main():
    parser = argparse.ArgumentParser(
        description="Kubernetes capacity planning (CPU/Mem/Pods/GPUs) by requests/limits with headroom.",
        formatter_class=argparse.RawTextHelpFormatter,
        epilog=textwrap.dedent("""
            Tips:
              • Use --mode requests for conservative planning, --mode limits for worst-case.
              • Include 'gpu' in resources for GPU workloads (whole numbers).
              • Use --headroom 0.15 to leave 15% extra space for spikes/fragmentation.
        """)
    )
    parser.add_argument("input", type=pathlib.Path, help="YAML file")
    parser.add_argument("--mode", choices=["requests", "limits"], default="requests")
    parser.add_argument("--headroom", type=float, default=0.10)
    parser.add_argument("--detail", action="store_true")
    args = parser.parse_args()
    node_types, ds_list, workloads = load_input(args.input, args.mode)
    if not node_types:
        print("No node_types provided.", file=sys.stderr)
        sys.exit(2)
    if not workloads:
        print("No workloads provided.", file=sys.stderr)
        sys.exit(2)
    for nt in node_types:
        try:
            res = plan_for_node_type(nt, ds_list, workloads, args.mode, args.headroom)
        except ValueError as e:
            print(f"\n[{nt.name}] ERROR: {e}\n")
            continue
        print("\n" + "=" * 72)
        print(human_summary(res))
        if args.detail:
            print("\nPlacement:")
            for node in res["placement"]:
                used_cpu = sum(w["cpu"] for w in node["workloads"])
                used_mem = sum(w["mem"] for w in node["workloads"])
                used_gpu = sum(w["gpus"] for w in node["workloads"])
                print(f"  {node['node']}: used CPU {fmt_cpu(used_cpu)}, Mem {fmt_mem(used_mem)}, "
                      f"GPUs {used_gpu}, remaining CPU {fmt_cpu(node['remaining']['cpu'])}, "
                      f"Mem {fmt_mem(node['remaining']['mem'])}, pods left {node['remaining']['pods']}, "
                      f"GPUs left {node['remaining']['gpus']}")
                for w in node["workloads"]:
                    print(f"    - {w['name']} ({fmt_cpu(w['cpu'])}, {fmt_mem(w['mem'])}, GPUs {w['gpus']})")
    return 0
if __name__ == "__main__":
    sys.exit(main())
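
Example usage (editor's sketch): the script expects a YAML file whose schema is implied by load_input above. The snippet below builds such a file and runs the planner programmatically. The module name k8s_capacity, the node shape, and all numbers are illustrative assumptions, not part of the original gist.

#!/usr/bin/env python3
# Hypothetical smoke test for the planner above (assumes the gist is saved as k8s_capacity.py).
import pathlib, tempfile
from k8s_capacity import load_input, plan_for_node_type, human_summary

EXAMPLE_YAML = """\
node_types:
  - name: m5.2xlarge            # illustrative node shape
    capacity: {cpu: "8", memory: "32Gi"}
    maxPods: 58
    kubeReserved: {cpu: "100m", memory: "1Gi"}
    systemReserved: {cpu: "100m", memory: "512Mi"}
daemonsets:
  - name: kube-proxy
    resources: {requests: {cpu: "100m", memory: "128Mi"}}
workloads:
  - name: api
    replicas: 12
    resources: {requests: {cpu: "500m", memory: "1Gi"}}
"""

# Write the YAML to a temp file, load it in requests mode, and plan against the first node type
# with the default 10% headroom.
with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as f:
    f.write(EXAMPLE_YAML)
nodes, ds, wl = load_input(pathlib.Path(f.name), mode="requests")
print(human_summary(plan_for_node_type(nodes[0], ds, wl, "requests", 0.10)))

Equivalently, from the command line: python k8s_capacity.py input.yaml --mode requests --headroom 0.10 --detail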