Skip to content

Instantly share code, notes, and snippets.

@egorsmkv
Created December 26, 2025 23:36
Show Gist options
  • Select an option

  • Save egorsmkv/b13f3c779324ce8c93e653b6c7aa54c2 to your computer and use it in GitHub Desktop.

Select an option

Save egorsmkv/b13f3c779324ce8c93e653b6c7aa54c2 to your computer and use it in GitHub Desktop.
import math
from typing import Dict, Set, Tuple, Optional, List
from collections import defaultdict
# Type aliases used in the annotations below (PEP 484 style).
# PointID: integer identifier that callers pass to insert()/delete().
PointID = int
# Coordinates: a tuple of floats of any length, one entry per dimension.
Coordinates = Tuple[float, ...]
class Point:
    """
    Lightweight point: an identifier plus a coordinate tuple.

    Declares __slots__ so instances carry no per-instance __dict__, which
    keeps the per-object footprint small when many points are stored.
    """

    __slots__ = ['id', 'coords']

    def __init__(self, pid: 'PointID', coords: 'Coordinates'):
        """
        Args:
            pid: Identifier of the point.
            coords: Coordinate tuple, one value per dimension.
        """
        self.id = pid
        self.coords = coords

    def dist_sq(self, other: 'Point') -> float:
        """
        Return the squared Euclidean distance to `other`.

        Preferred for comparisons because it avoids the sqrt() call.

        Raises:
            ValueError: If the two points have a different number of
                coordinates. zip() would otherwise silently ignore the
                extra dimensions and return a misleading distance.
        """
        mine, theirs = self.coords, other.coords
        if len(mine) != len(theirs):
            raise ValueError(
                f"Dimension mismatch: {self!r} has {len(mine)} coordinates, "
                f"{other!r} has {len(theirs)}."
            )
        return sum((a - b) ** 2 for a, b in zip(mine, theirs))

    def dist(self, other: 'Point') -> float:
        """
        Return the Euclidean distance to `other`.

        Raises:
            ValueError: If the two points differ in dimensionality.
        """
        return math.sqrt(self.dist_sq(other))

    def __repr__(self) -> str:
        return f"P({self.id})"
class DynamicKCenter:
    """
    Fully Dynamic k-Center Clustering with Ghost Center Recourse.

    Implements the 'Ghost' heuristic: a deleted center persists as a 'ghost'
    until its cluster is empty, so its members are not reassigned on delete.

    Bookkeeping maintained by the public methods:
      * every ID in `centers` is stored in exactly one of `points`
        (active center) or `ghosts` (deleted center);
      * `assignments` maps each active point to the ID of its center;
      * `clusters[c]` holds the active points assigned to center `c`
        (a ghost is not a member of its own cluster).
    """

    def __init__(self, k: int, initial_radius: float = 1.0):
        """
        Args:
            k: Maximum number of centers to maintain; must be >= 1.
            initial_radius: Starting radius; must be > 0 so that doubling
                can actually enlarge it.

        Raises:
            ValueError: If `k` < 1 or `initial_radius` is not positive.
        """
        if k < 1:
            raise ValueError(f"k must be at least 1, got {k!r}")
        if not initial_radius > 0:  # written this way so NaN is rejected too
            raise ValueError(
                f"initial_radius must be positive, got {initial_radius!r}"
            )
        self.k = k
        self.radius = initial_radius
        # Data stores
        self.points: Dict[PointID, Point] = {}  # Active points
        self.ghosts: Dict[PointID, Point] = {}  # Deleted points acting as centers
        # Clustering structure
        self.centers: Set[PointID] = set()  # IDs of ALL centers (active + ghost)
        self.assignments: Dict[PointID, PointID] = {}  # PointID -> CenterID
        self.clusters: Dict[PointID, Set[PointID]] = defaultdict(set)  # CenterID -> {PointIDs}

    def _get_center_point(self, cid: PointID) -> Point:
        """
        Return the Point object of center `cid` (active or ghost).

        Raises:
            RuntimeError: If `cid` is in neither `points` nor `ghosts`.
        """
        if cid in self.points:
            return self.points[cid]
        if cid in self.ghosts:
            return self.ghosts[cid]
        raise RuntimeError(f"Center {cid} exists in ID set but data is missing.")

    def _threshold_sq(self) -> float:
        """Return (2 * radius) squared, the coverage/merge threshold."""
        limit = 2 * self.radius
        # Plain multiplication overflows to inf, whereas `** 2` on a huge
        # float raises OverflowError.
        return limit * limit

    def _assign(self, pt: Point) -> None:
        """
        Attach `pt` to the nearest center within 2*radius, or promote it.

        Scans every current center (ghosts included). If none is within the
        threshold, `pt` becomes a new center of its own cluster.
        """
        threshold_sq = self._threshold_sq()
        best_center: Optional[PointID] = None
        min_dist_sq = float('inf')
        for cid in self.centers:
            d_sq = pt.dist_sq(self._get_center_point(cid))
            if d_sq <= threshold_sq and d_sq < min_dist_sq:
                min_dist_sq = d_sq
                best_center = cid
        if best_center is None:
            # Not covered: promote to center of a fresh cluster.
            best_center = pt.id
            self.centers.add(best_center)
        self.assignments[pt.id] = best_center
        self.clusters[best_center].add(pt.id)

    def _dissolve_ghost(self, gid: PointID) -> List[PointID]:
        """
        Retire ghost `gid` and return its former members, now unassigned.

        Used when the ghost's ID is inserted again: the ID can no longer
        stand for the old location, so the ghost's members must be re-homed.
        """
        orphans = sorted(self.clusters.get(gid, ()))  # deterministic order
        self._remove_ghost(gid)
        for child_id in orphans:
            self.assignments.pop(child_id, None)
        return orphans

    def _enforce_center_limit(self) -> None:
        """Double the radius and recluster until at most k centers remain."""
        while len(self.centers) > self.k:
            previous = self.radius
            self._double_radius_and_recluster()
            if not previous < self.radius:
                # Radius can no longer grow (inf, NaN, or externally set to
                # 0): stop rather than loop forever.
                break

    def insert(self, pid: PointID, coords: Coordinates) -> None:
        """
        Insert point `pid` at `coords`.

        The point is assigned to the nearest existing center (active or
        ghost) within 2*radius; otherwise it opens a new center. If that
        pushes the number of centers above k, the radius is doubled
        (repeatedly if needed) and centers are merged until the limit holds.

        Inserting an ID that is already active is a no-op. Inserting an ID
        that currently names a ghost center retires that ghost first and
        re-homes its members, so the new coordinates never silently replace
        the ghost's location.
        """
        if pid in self.points:
            return  # Handle duplicate insertions gracefully
        orphans: List[PointID] = []
        if pid in self.ghosts:
            orphans = self._dissolve_ghost(pid)
        new_pt = Point(pid, coords)
        self.points[pid] = new_pt
        self._assign(new_pt)
        for child_id in orphans:
            self._assign(self.points[child_id])
        # Restore invariant: |centers| <= k
        self._enforce_center_limit()

    def delete(self, pid: PointID) -> None:
        """
        Delete point `pid`; unknown IDs are ignored.

        If `pid` is a center it becomes a ghost and stays in `centers`
        until its cluster is empty. Otherwise it is removed from its
        cluster, and a ghost center left without members is dropped.
        """
        point_obj = self.points.pop(pid, None)
        if point_obj is None:
            return  # Idempotent handling
        if pid in self.centers:
            # Transition to ghost: keep the coordinates so the center can
            # continue covering its members.
            self.ghosts[pid] = point_obj
            # The point is no longer active, so it must not keep an
            # assignment or be counted as a member of its own cluster.
            self.assignments.pop(pid, None)
            members = self.clusters[pid]
            members.discard(pid)
            if not members:
                # No other members: the ghost can die immediately.
                self._remove_ghost(pid)
            return
        # Standard point deletion
        center_id = self.assignments.pop(pid)
        members = self.clusters[center_id]
        members.discard(pid)
        # A ghost center dies when its last member is removed.
        if center_id in self.ghosts and not members:
            self._remove_ghost(center_id)

    def _remove_ghost(self, cid: PointID) -> None:
        """Final cleanup of a ghost center: drop it from all structures."""
        self.centers.discard(cid)
        self.ghosts.pop(cid, None)
        self.clusters.pop(cid, None)

    def _double_radius_and_recluster(self) -> None:
        """
        Double the radius and merge centers that are now close together.

        1. Doubles `radius`.
        2. Greedily selects, in ascending ID order, a set of centers that
           are pairwise more than 2*radius apart.
        3. Merges every non-selected center into the selected center that
           covered it, then drops the merged centers' bookkeeping.
        """
        self.radius *= 2
        threshold_sq = self._threshold_sq()
        # Snapshot in sorted order: deterministic and safe to iterate.
        ordered = sorted(self.centers)
        keep_set: Set[PointID] = set()
        covered: Set[PointID] = set()
        for idx, u_id in enumerate(ordered):
            if u_id in covered:
                continue
            # 'u' is selected to remain a center.
            keep_set.add(u_id)
            u_pt = self._get_center_point(u_id)
            # Every earlier ID is already kept or covered, so only later
            # IDs can still be merged into 'u'.
            for v_id in ordered[idx + 1:]:
                if v_id in covered:
                    continue
                if u_pt.dist_sq(self._get_center_point(v_id)) <= threshold_sq:
                    covered.add(v_id)
                    self._merge_clusters(source=v_id, target=u_id)
        self.centers = keep_set
        # Garbage collection: merged centers own no cluster any more. A
        # merged ghost has no remaining purpose; a merged active center
        # lives on as an ordinary member of its new cluster.
        for v_id in covered:
            self.ghosts.pop(v_id, None)
            self.clusters.pop(v_id, None)

    def _merge_clusters(self, source: PointID, target: PointID) -> None:
        """Move all points of cluster `source` into cluster `target`."""
        if source == target:
            return
        moved = self.clusters.pop(source, None)
        if not moved:
            return
        for child_id in moved:
            self.assignments[child_id] = target
        self.clusters[target].update(moved)
# --- Sanity Check / Simulation ---
def run():
    """
    Smoke-test the clusterer.

    Builds two tight clusters, inserts a far outlier to force a radius
    doubling, then checks that a deleted center survives as a ghost and is
    cleaned up once its last member is deleted.
    """
    sim = DynamicKCenter(k=2, initial_radius=1.0)
    # 1. Cluster A: Near (0,0)
    sim.insert(1, (0, 0))
    sim.insert(2, (0.1, 0.1))
    # 2. Cluster B: Near (10,10)
    sim.insert(3, (10, 10))
    sim.insert(4, (10.1, 10.1))
    # Check stability
    print(f"Centers after init: {sim.centers}")
    assert len(sim.centers) <= 2
    # 3. Insert outlier -> triggers doubling
    sim.insert(5, (100, 100))
    print(f"Centers after outlier: {sim.centers} (Radius: {sim.radius})")
    # 4. Ghosting test
    # Pick, deterministically, a center that has members besides itself.
    # A singleton center is dropped immediately on delete, so it would
    # never be observable as a ghost; relying on set iteration order to
    # avoid picking one would make this check fragile.
    victim = next(
        cid for cid in sorted(sim.centers) if len(sim.clusters[cid]) > 1
    )
    print(f"Deleting center {victim}...")
    sim.delete(victim)
    # Victim should be in ghosts, not in points
    assert victim in sim.ghosts
    assert victim not in sim.points
    assert victim in sim.centers  # Still acts as center
    print("Ghost invariant holds.")
    # 5. Cleanup test
    # Delete the points still assigned to the ghost.
    children = list(sim.clusters[victim])
    print(f"Deleting children of ghost {victim}: {children}")
    for child in children:
        sim.delete(child)
    # Now ghost should be gone
    assert victim not in sim.ghosts
    assert victim not in sim.centers
    print("Ghost cleanup invariant holds.")


if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment