# Source: GitHub gist egorsmkv/b13f3c779324ce8c93e653b6c7aa54c2
# Created: December 26, 2025 23:36
# (Gist page note: "Save egorsmkv/b13f3c779324ce8c93e653b6c7aa54c2 to your
# computer and use it in GitHub Desktop.")
# NOTE: GitHub flagged this file as containing hidden or bidirectional Unicode
# text that may be interpreted or compiled differently than what appears below.
# To review, open the file in an editor that reveals hidden Unicode characters.
import math
from collections import defaultdict
from typing import Dict, List, Optional, Set, Tuple
# Type aliases for code clarity and PEP 484 compliance.
PointID = int  # identifier a caller supplies for each point
Coordinates = Tuple[float, ...]  # one float per dimension
class Point:
    """
    Lightweight point representation.

    ``__slots__`` removes the per-instance ``__dict__``, which matters when
    many points are held at once.

    Attributes:
        id: Identifier supplied by the caller.
        coords: Coordinate sequence of the point (one entry per dimension).
    """

    __slots__ = ('id', 'coords')

    def __init__(self, pid: "PointID", coords: "Coordinates"):
        self.id = pid
        self.coords = coords

    def dist_sq(self, other: 'Point') -> float:
        """
        Return the squared Euclidean distance to ``other``.

        Preferred for comparisons because it avoids the sqrt() call.

        Raises:
            ValueError: If the two points have a different number of
                coordinates. Without this check ``zip`` would silently
                ignore the extra coordinates and return a wrong distance.
        """
        if len(self.coords) != len(other.coords):
            raise ValueError(
                f"Dimension mismatch between {self!r} ({len(self.coords)}) "
                f"and {other!r} ({len(other.coords)})"
            )
        return sum((a - b) ** 2 for a, b in zip(self.coords, other.coords))

    def dist(self, other: 'Point') -> float:
        """Return the Euclidean distance to ``other``."""
        return math.sqrt(self.dist_sq(other))

    def __repr__(self) -> str:
        return f"P({self.id})"
class DynamicKCenter:
    """
    Fully Dynamic k-Center Clustering with Ghost Center Recourse.

    Implements the 'Ghost' heuristic: a deleted center persists as a 'ghost'
    until its cluster is empty, so its members are not reassigned at the
    moment the center is deleted.

    Bookkeeping invariants maintained by the methods of this class:
      * ``points`` and ``ghosts`` never contain the same ID.
      * every ID in ``centers`` has coordinates in ``points`` or ``ghosts``.
      * every ghost has at least one remaining cluster member.
      * when ``insert`` returns normally, ``len(centers) <= k``.
    """

    def __init__(self, k: int, initial_radius: float = 1.0):
        """
        Args:
            k: Maximum number of centers (active + ghost) to maintain.
            initial_radius: Starting radius; a point joins a center when it
                lies within ``2 * radius`` of it.

        Raises:
            ValueError: If ``k < 1`` or ``initial_radius`` is not a positive
                finite number. Both are required for the radius-doubling
                loop to terminate.
        """
        if k < 1:
            raise ValueError(f"k must be at least 1, got {k!r}")
        if not (initial_radius > 0 and math.isfinite(initial_radius)):
            raise ValueError(
                f"initial_radius must be positive and finite, got {initial_radius!r}"
            )
        self.k = k
        self.radius = initial_radius

        # Data stores
        self.points: Dict[PointID, Point] = {}  # Active points
        self.ghosts: Dict[PointID, Point] = {}  # Deleted points still acting as centers

        # Clustering structure
        self.centers: Set[PointID] = set()  # IDs of ALL centers (active + ghost)
        self.assignments: Dict[PointID, PointID] = {}  # PointID -> CenterID
        self.clusters: Dict[PointID, Set[PointID]] = defaultdict(set)  # CenterID -> {PointIDs}

    def _get_center_point(self, cid: "PointID") -> "Point":
        """
        Return the Point object of center ``cid`` (active or ghost).

        Raises:
            RuntimeError: If ``cid`` has no coordinates in either store.
        """
        if cid in self.points:
            return self.points[cid]
        if cid in self.ghosts:
            return self.ghosts[cid]
        raise RuntimeError(f"Center {cid} exists in ID set but data is missing.")

    def insert(self, pid: "PointID", coords: "Coordinates") -> None:
        """
        Insert a point.

        The point is assigned to the nearest existing center (active or
        ghost) within ``2 * radius``; otherwise it becomes a new center. If
        that pushes the number of centers above ``k``, the radius is doubled
        and centers are merged until the bound holds again.

        Inserting an ID that is already active is a no-op. Inserting an ID
        that is currently a ghost center first dissolves that ghost (its
        members are reassigned), because one ID cannot name both the old
        ghost location and the new point.

        Raises:
            ValueError: If any coordinate is NaN or infinite. Such a point
                can never be merged, so radius doubling would not terminate.
        """
        if pid in self.points:
            return  # Handle duplicate insertions gracefully

        if not all(math.isfinite(c) for c in coords):
            raise ValueError(f"Point {pid} has non-finite coordinates: {coords!r}")

        new_pt = Point(pid, coords)

        # ID reuse: retire the ghost so the new point does not shadow the
        # ghost's coordinates in _get_center_point.
        if pid in self.ghosts:
            self._dissolve_ghost(pid)

        self.points[pid] = new_pt
        try:
            self._assign(new_pt)
        except Exception:
            # Distance evaluation failed; _assign mutates nothing before its
            # scan completes, so un-registering the point restores the state.
            del self.points[pid]
            raise

        # Check invariant: |S| <= k
        if len(self.centers) > self.k:
            self._double_radius_and_recluster()

    def _assign(self, pt: "Point") -> None:
        """
        Attach an active point to its nearest center within ``2 * radius``,
        or promote it to a center when no center is that close.

        ``pt`` must already be stored in ``self.points``.
        """
        threshold_sq = (2 * self.radius) ** 2
        best_center: Optional[PointID] = None
        min_dist_sq = float('inf')

        # Scan all current centers (including ghosts) before mutating state.
        for cid in self.centers:
            d_sq = pt.dist_sq(self._get_center_point(cid))
            if d_sq <= threshold_sq and d_sq < min_dist_sq:
                min_dist_sq = d_sq
                best_center = cid

        if best_center is None:
            # Not covered; the point becomes its own center.
            best_center = pt.id
            self.centers.add(best_center)

        self.assignments[pt.id] = best_center
        self.clusters[best_center].add(pt.id)

    def _dissolve_ghost(self, gid: "PointID") -> None:
        """Retire ghost ``gid`` and reassign each of its members via ``_assign``."""
        orphans = self.clusters.pop(gid, set())
        self.centers.discard(gid)
        self.ghosts.pop(gid, None)
        # Sorted for a deterministic reassignment order.
        for child_id in sorted(orphans):
            self._assign(self.points[child_id])

    def delete(self, pid: "PointID") -> None:
        """
        Delete an active point; unknown IDs are ignored.

        If ``pid`` is a center that still has other members, it becomes a
        ghost and stays in ``self.centers`` until its cluster is empty. A
        center without other members is removed immediately. Deleting the
        last member of a ghost's cluster removes that ghost.
        """
        if pid not in self.points:
            return  # Idempotent handling

        # 1. Remove from active data store
        point_obj = self.points.pop(pid)

        # 2. Logic branch: is it a center?
        if pid in self.centers:
            # The point no longer exists, so drop its own assignment too
            # (otherwise a stale pid -> pid entry would linger forever).
            self.assignments.pop(pid, None)
            members = self.clusters[pid]
            members.discard(pid)
            if members:
                # Transition to ghost: keep the coordinates so the center
                # can keep covering its remaining members.
                self.ghosts[pid] = point_obj
            else:
                # No other members: the center can die immediately.
                self._remove_ghost(pid)
        else:
            # Standard point deletion
            center_id = self.assignments.pop(pid)
            members = self.clusters[center_id]
            members.discard(pid)
            # A ghost center dies only when its LAST member is removed.
            if center_id in self.ghosts and not members:
                self._remove_ghost(center_id)

    def _remove_ghost(self, cid: "PointID") -> None:
        """Drop every trace of center ``cid`` from the clustering structure."""
        self.centers.discard(cid)
        self.ghosts.pop(cid, None)
        self.clusters.pop(cid, None)

    def _double_radius_and_recluster(self) -> None:
        """
        Restore ``len(centers) <= k``.

        Repeats until the bound holds:
          1. Double the radius.
          2. Greedily select centers that are pairwise farther apart than
             ``2 * radius`` (processed in sorted ID order).
          3. Merge every non-selected center into the selected center that
             absorbed it.

        A single doubling is not always enough (far-apart centers may need
        several), hence the loop.
        """
        while len(self.centers) > self.k:
            self.radius *= 2
            self._merge_close_centers()

    def _merge_close_centers(self) -> None:
        """Run one greedy merge pass over the centers at the current radius."""
        ordered = sorted(self.centers)  # Deterministic order
        threshold_sq = (2 * self.radius) ** 2
        keep_set: Set[PointID] = set()
        absorbed: Set[PointID] = set()

        for i, u_id in enumerate(ordered):
            if u_id in absorbed:
                continue
            # 'u' is selected to remain a center
            keep_set.add(u_id)
            u_pt = self._get_center_point(u_id)
            # Every center before 'u' is already kept or absorbed, so only
            # later centers can still be merged into 'u'.
            for v_id in ordered[i + 1:]:
                if v_id in absorbed:
                    continue
                if u_pt.dist_sq(self._get_center_point(v_id)) <= threshold_sq:
                    absorbed.add(v_id)
                    self._merge_clusters(source=v_id, target=u_id)

        self.centers = keep_set

        # Garbage collection: absorbed centers have no members left, so drop
        # their ghost data and any leftover cluster entry.
        for cid in absorbed:
            self.ghosts.pop(cid, None)
            self.clusters.pop(cid, None)

    def _merge_clusters(self, source: "PointID", target: "PointID") -> None:
        """Move all members of the ``source`` cluster into the ``target`` cluster."""
        if source == target:
            return
        # Pop instead of clear so no empty set is left behind for 'source'.
        moved = self.clusters.pop(source, None)
        if not moved:
            return
        for child_id in moved:
            self.assignments[child_id] = target
        self.clusters[target].update(moved)
# --- Sanity Check / Simulation ---
def run():
    """
    Exercise DynamicKCenter end to end and assert the ghost invariants.

    Steps: build two tight clusters, insert a far outlier to force a radius
    doubling, delete a center to turn it into a ghost, then delete the
    ghost's members and check that the ghost disappears.
    """
    sim = DynamicKCenter(k=2, initial_radius=1.0)

    # 1. Cluster A: Near (0,0)
    sim.insert(1, (0, 0))
    sim.insert(2, (0.1, 0.1))

    # 2. Cluster B: Near (10,10)
    sim.insert(3, (10, 10))
    sim.insert(4, (10.1, 10.1))

    # Check Stability
    print(f"Centers after init: {sim.centers}")
    assert len(sim.centers) <= 2

    # 3. Insert Outlier -> Triggers Doubling
    sim.insert(5, (100, 100))
    print(f"Centers after outlier: {sim.centers} (Radius: {sim.radius})")

    # 4. Ghosting Test
    # Pick a center that has members besides itself: a singleton center is
    # removed outright on deletion and would never become a ghost. min()
    # makes the choice independent of set iteration order.
    victim = min(cid for cid in sim.centers if len(sim.clusters[cid]) > 1)
    print(f"Deleting center {victim}...")
    sim.delete(victim)

    # Victim should be in ghosts, not in points
    assert victim in sim.ghosts
    assert victim not in sim.points
    assert victim in sim.centers  # Still acts as center
    print("Ghost invariant holds.")

    # 5. Cleanup Test
    # Delete the points still assigned to the ghost (snapshot first, because
    # delete() mutates the cluster set).
    children = list(sim.clusters[victim])
    print(f"Deleting children of ghost {victim}: {children}")
    for child in children:
        sim.delete(child)

    # Now ghost should be gone
    assert victim not in sim.ghosts
    assert victim not in sim.centers
    print("Ghost cleanup invariant holds.")
# Run the simulation only when executed as a script, not on import.
if __name__ == "__main__":
    run()
# (GitHub page footer: "Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment")