OpenClaw Self-Health Check Script (Sanitized)
#!/usr/bin/env python3
import json
import os
import subprocess
import sys
import time
from datetime import datetime, timezone
from typing import List, Tuple

# OpenClaw Self-Health Checker
#
# FEATURES:
# - Full deep inspection (cron jobs, models, browser, system resources).
# - Auto-repair capabilities for cron jobs.
# - Load awareness: skips heavy CLI calls if the system is already under heavy load.
# - Deadlock protection: if the system remains "busy" for too long (e.g. a hung
#   script), it force-kills the stuck process and runs the check anyway.

# --- CONFIGURATION ---
# Adjust these paths and values for your environment.
OPENCLAW_CMD = ["openclaw"]  # Ensure 'openclaw' is in your PATH.
SAFE_MODEL = "flash"
SAFE_UPDATE_SCRIPT = os.path.expanduser("~/scripts/openclaw_stable_release_update.sh")
LEGACY_UPDATE_TOKENS = ("auto-stash", "update.run", "git stash")
RECENT_ERROR_WINDOW_MS = 36 * 60 * 60 * 1000
ACTIVE_PROBE_ENV = "OPENCLAW_HEALTH_ACTIVE_PROBE"
ACTIVE_PROBE_MIN_INTERVAL_MS = 6 * 60 * 60 * 1000
ACTIVE_PROBE_STAMP_FILE = os.path.expanduser("~/.openclaw/cron/.last_model_probe_ms")
OAUTH_EXPIRY_WARN_MS = 5 * 60 * 1000
ROUTE_STATE_FILE = os.path.expanduser("~/.openclaw/cron/model_route_guard.json")
FAILURE_STATE_FILE = os.path.expanduser("~/.openclaw/cron/health_check_failure_state.json")
FAILURE_THRESHOLD = 3
MAX_CONSECUTIVE_SKIPS = 4  # Max times to skip due to load (4 * 30m = 2 hours).
ADMIN_USER_ID = os.getenv("OPENCLAW_ADMIN_USER_ID", "YOUR_ADMIN_ID_HERE")  # Target for alerts.
ROUTE_BUFFER_MS = int(os.getenv("OPENCLAW_ROUTE_BUFFER_MS", "60000") or "60000")
ROUTE_TARGET_PRIMARY = os.getenv(
    "OPENCLAW_ROUTE_TARGET_PRIMARY", "google-gemini-cli/gemini-3-pro-preview"
).strip()
ROUTE_FALLBACK_PRIMARY = os.getenv(
    "OPENCLAW_ROUTE_FALLBACK_PRIMARY", "openai-codex/gpt-5.2"
).strip()
ROUTE_FLASH_FALLBACK = os.getenv(
    "OPENCLAW_ROUTE_FLASH_FALLBACK", "google-gemini-cli/gemini-3-flash-preview"
).strip()
ROUTE_GLM_FALLBACK = os.getenv(
    "OPENCLAW_ROUTE_GLM_FALLBACK", "zai/glm-4.7-flash"
).strip()
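
# The fallback lists below can be overridden with JSON-array environment
# variables, parsed by parse_model_list_env(); for example (values shown here
# are just the defaults restated, adjust for your own setup):
#   export OPENCLAW_ROUTE_NORMAL_FALLBACKS='["openai-codex/gpt-5.2", "zai/glm-4.7-flash"]'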
def parse_model_list_env(env_name: str, default_value: List[str]) -> List[str]:
    raw = os.getenv(env_name, "").strip()
    if not raw:
        return list(default_value)
    try:
        data = json.loads(raw)
        if isinstance(data, list) and all(isinstance(x, str) for x in data):
            return [x.strip() for x in data if str(x).strip()]
    except Exception:
        return list(default_value)
    return list(default_value)


ROUTE_NORMAL_FALLBACKS = parse_model_list_env(
    "OPENCLAW_ROUTE_NORMAL_FALLBACKS",
    [ROUTE_FALLBACK_PRIMARY, ROUTE_FLASH_FALLBACK, ROUTE_GLM_FALLBACK],
)
ROUTE_COOLDOWN_FALLBACKS = parse_model_list_env(
    "OPENCLAW_ROUTE_COOLDOWN_FALLBACKS",
    [ROUTE_FLASH_FALLBACK, ROUTE_GLM_FALLBACK],
)


def now_ms() -> int:
    return int(time.time() * 1000)


def ms_to_utc(ms: int) -> str:
    try:
        return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    except Exception:
        return str(ms)


def load_route_state() -> dict:
    try:
        with open(ROUTE_STATE_FILE, "r", encoding="utf-8") as handle:
            data = json.load(handle)
            if isinstance(data, dict):
                return data
    except FileNotFoundError:
        return {}
    except Exception:
        return {}
    return {}
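
# Shape of ROUTE_STATE_FILE, as written by reconcile_primary_route_with_cooldown():
#   {"active": true, "targetPrimary": "...", "fallbackPrimary": "...",
#    "provider": "...", "cooldownUntilMs": 0, "restoreAtMs": 0, "updatedAtMs": 0}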
def save_route_state(state: dict) -> None:
    os.makedirs(os.path.dirname(ROUTE_STATE_FILE), exist_ok=True)
    with open(ROUTE_STATE_FILE, "w", encoding="utf-8") as handle:
        json.dump(state, handle, ensure_ascii=True, indent=2)


def clear_route_state() -> None:
    try:
        os.remove(ROUTE_STATE_FILE)
    except FileNotFoundError:
        return
    except Exception:
        return


def set_primary_model(model_ref: str) -> Tuple[bool, str]:
    _, err, code = run_openclaw(
        ["config", "set", "agents.defaults.model.primary", model_ref],
        timeout=180,
    )
    if code == 0:
        return True, ""
    return False, err


def set_fallback_models(model_refs: List[str]) -> Tuple[bool, str]:
    value = json.dumps(model_refs)
    _, err, code = run_openclaw(
        ["config", "set", "--json", "agents.defaults.model.fallbacks", value],
        timeout=180,
    )
    if code == 0:
        return True, ""
    return False, err


def extract_provider(model_ref: str) -> str:
    if "/" not in model_ref:
        return ""
    return model_ref.split("/", 1)[0].strip().lower()
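
# Example: extract_provider("google-gemini-cli/gemini-3-pro-preview") -> "google-gemini-cli".
#
# reconcile_primary_route_with_cooldown() implements a small route guard:
#   1. If the target provider reports a "cooldown" window, switch the primary
#      model to the fallback, trim the fallback list, and record the guard
#      state on disk.
#   2. Once the cooldown (plus ROUTE_BUFFER_MS) has passed, restore the target
#      primary and the normal fallback list, then clear the state file.
#   3. If the operator has manually set a different primary, leave it alone and
#      drop any stale guard state.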
def reconcile_primary_route_with_cooldown(
    status_payload: dict, report: List[str]
) -> None:
    auth = status_payload.get("auth", {}) or {}
    unusable = auth.get("unusableProfiles", []) or []
    current_primary = str(
        status_payload.get("resolvedDefault")
        or status_payload.get("defaultModel")
        or ""
    ).strip()
    current_fallbacks = status_payload.get("fallbacks", []) or []
    if not isinstance(current_fallbacks, list):
        current_fallbacks = []
    current_fallbacks = [str(x).strip() for x in current_fallbacks if str(x).strip()]
    target_provider = extract_provider(ROUTE_TARGET_PRIMARY)
    current_ms = now_ms()
    state = load_route_state()

    cooldown_until_ms = 0
    for item in unusable:
        provider = str(item.get("provider", "")).strip().lower()
        kind = str(item.get("kind", "")).strip().lower()
        until = int(item.get("until", 0) or 0)
        if provider == target_provider and kind == "cooldown" and until > cooldown_until_ms:
            cooldown_until_ms = until

    if cooldown_until_ms > current_ms:
        restore_at_ms = cooldown_until_ms + ROUTE_BUFFER_MS
        if current_primary == ROUTE_TARGET_PRIMARY:
            ok, err = set_primary_model(ROUTE_FALLBACK_PRIMARY)
            if ok:
                report.append(
                    "✅ Switched primary model to "
                    f"{ROUTE_FALLBACK_PRIMARY} due to {target_provider} cooldown "
                    f"(restore after {ms_to_utc(restore_at_ms)})."
                )
            else:
                report.append(
                    "❌ Failed to switch primary model to fallback "
                    f"{ROUTE_FALLBACK_PRIMARY}: {err}"
                )
        if current_primary in {ROUTE_TARGET_PRIMARY, ROUTE_FALLBACK_PRIMARY}:
            if current_fallbacks != ROUTE_COOLDOWN_FALLBACKS:
                ok, err = set_fallback_models(ROUTE_COOLDOWN_FALLBACKS)
                if ok:
                    report.append(
                        "✅ Updated fallback models for cooldown mode: "
                        + ", ".join(ROUTE_COOLDOWN_FALLBACKS)
                    )
                else:
                    report.append(
                        "❌ Failed to update fallback models for cooldown mode: "
                        f"{err}"
                    )
        else:
            report.append(
                "⚠️ Primary model was manually set to "
                f"{current_primary}; skipping automatic cooldown override."
            )
        save_route_state(
            {
                "active": True,
                "targetPrimary": ROUTE_TARGET_PRIMARY,
                "fallbackPrimary": ROUTE_FALLBACK_PRIMARY,
                "provider": target_provider,
                "cooldownUntilMs": cooldown_until_ms,
                "restoreAtMs": restore_at_ms,
                "updatedAtMs": current_ms,
            }
        )
        return

    if current_primary == ROUTE_TARGET_PRIMARY and current_fallbacks != ROUTE_NORMAL_FALLBACKS:
        fb_ok, fb_err = set_fallback_models(ROUTE_NORMAL_FALLBACKS)
        if fb_ok:
            report.append(
                "✅ Updated fallback models for normal mode: "
                + ", ".join(ROUTE_NORMAL_FALLBACKS)
            )
        else:
            report.append(
                "❌ Failed to update fallback models for normal mode: "
                f"{fb_err}"
            )

    active = bool(state.get("active"))
    restore_at_ms = int(state.get("restoreAtMs", 0) or 0)
    if not active:
        return
    if current_ms < restore_at_ms:
        report.append(
            "ℹ️ Waiting to restore primary model "
            f"{ROUTE_TARGET_PRIMARY} at {ms_to_utc(restore_at_ms)}."
        )
        return

    if current_primary == ROUTE_FALLBACK_PRIMARY:
        ok, err = set_primary_model(ROUTE_TARGET_PRIMARY)
        if ok:
            report.append(
                "✅ Restored primary model to "
                f"{ROUTE_TARGET_PRIMARY} after cooldown window."
            )
            if current_fallbacks != ROUTE_NORMAL_FALLBACKS:
                fb_ok, fb_err = set_fallback_models(ROUTE_NORMAL_FALLBACKS)
                if fb_ok:
                    report.append(
                        "✅ Restored fallback models for normal mode: "
                        + ", ".join(ROUTE_NORMAL_FALLBACKS)
                    )
                else:
                    report.append(
                        "❌ Failed to restore fallback models for normal mode: "
                        f"{fb_err}"
                    )
            clear_route_state()
        else:
            report.append(
                "❌ Failed to restore primary model to "
                f"{ROUTE_TARGET_PRIMARY}: {err}"
            )
            state["updatedAtMs"] = current_ms
            save_route_state(state)
        return

    if current_primary == ROUTE_TARGET_PRIMARY:
        if current_fallbacks != ROUTE_NORMAL_FALLBACKS:
            fb_ok, fb_err = set_fallback_models(ROUTE_NORMAL_FALLBACKS)
            if fb_ok:
                report.append(
                    "✅ Ensured fallback models for normal mode: "
                    + ", ".join(ROUTE_NORMAL_FALLBACKS)
                )
            else:
                report.append(
                    "❌ Failed to ensure fallback models for normal mode: "
                    f"{fb_err}"
                )
        clear_route_state()
        return

    report.append(
        "⚠️ Primary model changed to "
        f"{current_primary}; clearing automatic route guard state."
    )
    clear_route_state()
def run_command(args: List[str], timeout: int = 120) -> Tuple[str, str, int]:
    try:
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,
        )
        return result.stdout.strip(), result.stderr.strip(), result.returncode
    except subprocess.TimeoutExpired:
        return "", f"timeout after {timeout}s", 124
    except Exception as exc:
        return "", str(exc), 1


def run_openclaw(args: List[str], timeout: int = 120, retries: int = 2) -> Tuple[str, str, int]:
    last_stdout, last_stderr, last_code = "", "", 0
    for attempt in range(retries + 1):
        last_stdout, last_stderr, last_code = run_command(OPENCLAW_CMD + args, timeout=timeout)
        if last_code == 0:
            return last_stdout, last_stderr, last_code
        is_transient = "timeout" in last_stderr.lower() or "timeout" in last_stdout.lower()
        if not is_transient or attempt == retries:
            break
        wait_time = 2 ** (attempt + 1)
        time.sleep(wait_time)
    return last_stdout, last_stderr, last_code
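
# Note: run_openclaw() retries only transient timeouts, sleeping 2s after the
# first failed attempt and 4s after the second (exponential backoff); all other
# failures are returned to the caller immediately.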
def check_update_script_health() -> List[str]:
    issues: List[str] = []
    # Optional checks; uncomment to enforce the stable update script's presence.
    # if not os.path.exists(SAFE_UPDATE_SCRIPT):
    #     issues.append(f"❌ Missing stable update script: {SAFE_UPDATE_SCRIPT}")
    # if not os.access(SAFE_UPDATE_SCRIPT, os.X_OK):
    #     issues.append(f"❌ Stable update script is not executable: {SAFE_UPDATE_SCRIPT}")
    return issues
def check_cron_jobs() -> List[str]:
    print("--- Checking Cron Jobs (Deep) ---")
    stdout, stderr, code = run_openclaw(["cron", "list", "--all", "--json"])
    if code != 0:
        return [f"ERROR: Could not list cron jobs: {stderr or stdout}"]
    try:
        payload = json.loads(stdout)
    except Exception as exc:
        return [f"ERROR: Failed to parse cron list JSON: {exc}"]
    jobs = payload.get("jobs", [])
    anomalies: List[str] = []
    for job in jobs:
        job_id = str(job.get("id", ""))
        job_name = str(job.get("name", "<unnamed>"))
        enabled = bool(job.get("enabled", False))
        state = job.get("state", {}) or {}
        last_status = str(state.get("lastStatus", ""))
        last_error = str(state.get("lastError", ""))
        last_run_at_ms = int(state.get("lastRunAtMs", 0) or 0)
        message = str((job.get("payload") or {}).get("message", ""))
        message_lc = message.lower()
        model_name = str((job.get("payload") or {}).get("model", ""))
        is_recent_error = (
            last_status == "error"
            and last_error
            and (now_ms() - last_run_at_ms) <= RECENT_ERROR_WINDOW_MS
        )
        is_model_auth_error = (
            "model not allowed" in last_error
            or 'No API key found for provider "google"' in last_error
        )
        resolved_model_auth_error = is_model_auth_error and model_name == SAFE_MODEL
        if is_recent_error and not resolved_model_auth_error:
            anomalies.append(f"❌ Job '{job_name}' failed last run: {last_error}")
        needs_model_repair = is_model_auth_error
        if is_recent_error and needs_model_repair and model_name != SAFE_MODEL:
            print(f"🔧 Attempting model repair for '{job_name}' -> '{SAFE_MODEL}'...")
            _, repair_err, repair_code = run_openclaw(
                ["cron", "edit", job_id, "--model", SAFE_MODEL], timeout=180
            )
            if repair_code == 0:
                anomalies.append(f"✅ Repaired model for '{job_name}'")
            else:
                anomalies.append(
                    f"❌ Failed model repair for '{job_name}': {repair_err}"
                )
        if "self-update" in job_name.lower() and enabled:
            if any(token in message_lc for token in LEGACY_UPDATE_TOKENS):
                print(f"🔧 Disabling legacy self-update job '{job_name}'...")
                _, disable_err, disable_code = run_openclaw(
                    ["cron", "disable", job_id], timeout=120
                )
                if disable_code == 0:
                    anomalies.append(
                        f"✅ Disabled legacy self-update job '{job_name}'"
                    )
                else:
                    anomalies.append(
                        f"❌ Failed to disable legacy self-update job '{job_name}': {disable_err}"
                    )
    return anomalies
def should_run_active_probe() -> bool:
    if os.getenv(ACTIVE_PROBE_ENV, "0") != "1":
        return False
    try:
        with open(ACTIVE_PROBE_STAMP_FILE, "r", encoding="utf-8") as handle:
            last_ms = int(handle.read().strip() or "0")
    except FileNotFoundError:
        last_ms = 0
    except Exception:
        last_ms = 0
    return (now_ms() - last_ms) >= ACTIVE_PROBE_MIN_INTERVAL_MS
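
# The active probe is opt-in: export OPENCLAW_HEALTH_ACTIVE_PROBE=1 to enable
# it, and even then it runs at most once per ACTIVE_PROBE_MIN_INTERVAL_MS
# (6 hours), tracked via the stamp file below.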
def mark_active_probe_run() -> None:
    os.makedirs(os.path.dirname(ACTIVE_PROBE_STAMP_FILE), exist_ok=True)
    with open(ACTIVE_PROBE_STAMP_FILE, "w", encoding="utf-8") as handle:
        handle.write(str(now_ms()))
def check_model_health() -> List[str]:
    print("--- Checking Model & Alias Health (Deep) ---")
    issues: List[str] = []
    stdout, stderr, code = run_openclaw(["models", "status", "--json"])
    if code != 0:
        return [f"ERROR: Could not read model status: {stderr or stdout}"]
    try:
        data = json.loads(stdout)
        reconcile_primary_route_with_cooldown(data, issues)
        auth = data.get("auth", {}) or {}
        missing = auth.get("missingProvidersInUse", []) or []
        if missing:
            issues.append(
                f"❌ Missing auth for providers in use: {', '.join(str(x) for x in missing)}"
            )
        unusable = auth.get("unusableProfiles", []) or []
        for item in unusable:
            provider = str(item.get("provider", "unknown"))
            kind = str(item.get("kind", "unknown"))
            remaining_ms = int(item.get("remainingMs", 0) or 0)
            if kind == "cooldown":
                issues.append(
                    f"⚠️ Provider '{provider}' is cooling down ({max(remaining_ms // 1000, 0)}s remaining)."
                )
            else:
                issues.append(f"⚠️ Profile unusable: provider={provider}, reason={kind}")
        oauth = ((auth.get("oauth", {}) or {}).get("profiles", [])) or []
        for profile in oauth:
            provider = str(profile.get("provider", "unknown"))
            status = str(profile.get("status", "unknown"))
            if status != "ok":
                issues.append(f"⚠️ OAuth profile unhealthy: {provider} status={status}")
    except Exception as exc:
        return [f"ERROR: Failed to parse model status JSON: {exc}"]

    if not should_run_active_probe():
        return issues

    print("--- Running active model probe ---")
    probe_stdout, probe_stderr, probe_code = run_openclaw(
        ["models", "status", "--probe", "--json"], timeout=180
    )
    mark_active_probe_run()
    if probe_code != 0:
        issues.append(f"ERROR: Active model probe failed: {probe_stderr or probe_stdout}")
        return issues
    try:
        probe = json.loads(probe_stdout)
        results = ((probe.get("auth", {}) or {}).get("probes", {}) or {}).get("results", [])
        for result in results:
            status = str(result.get("status", "unknown"))
            provider = str(result.get("provider", "unknown"))
            model = str(result.get("model", "unknown"))
            if status not in {"ok", "unknown"}:
                issues.append(f"❌ Active probe failed: {provider}/{model} status={status}")
            elif status == "unknown":
                err = str(result.get("error", "unknown"))
                issues.append(f"⚠️ Active probe uncertain: {provider}/{model} ({err})")
    except Exception as exc:
        issues.append(f"ERROR: Failed to parse active probe JSON: {exc}")
    return issues
def check_browser_relay() -> List[str]:
    print("--- Checking Browser Relay (Deep) ---")
    stdout, stderr, code = run_openclaw(["browser", "status", "--json"])
    if code != 0:
        return [f"ERROR: Could not check browser status: {stderr or stdout}"]
    try:
        json.loads(stdout)
    except Exception as exc:
        return [f"ERROR: Failed to parse browser status: {exc}"]
    # Any parseable status is treated as healthy; a relay that is not running
    # is considered intentional rather than an error.
    return []
def check_system_resources() -> List[str]:
    print("--- Checking System Resources ---")
    issues: List[str] = []
    disk_out, _, disk_code = run_command(["bash", "-lc", "df -h / --output=pcent | tail -1"])
    if disk_code == 0 and disk_out:
        try:
            disk_usage = int(disk_out.replace("%", "").strip())
            if disk_usage > 90:
                issues.append(f"🚨 Disk usage critical: {disk_usage}%")
        except ValueError:
            issues.append(f"⚠️ Could not parse disk usage output: {disk_out}")
    mem_out, _, mem_code = run_command(
        ["bash", "-lc", "free | awk '/Mem:/ {printf \"%.1f\", ($3/$2)*100}'"]
    )
    if mem_code == 0 and mem_out:
        try:
            mem_usage = float(mem_out)
            if mem_usage > 90:
                issues.append(f"🚨 Memory usage high: {mem_usage:.1f}%")
        except ValueError:
            issues.append(f"⚠️ Could not parse memory usage output: {mem_out}")
    return issues
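
# Note: the resource checks above assume GNU coreutils df (--output=pcent) and
# procps free, i.e. a Linux host; the shell pipelines would need adjusting for
# macOS/BSD.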
def get_system_diagnostics() -> str:
    diag = ["\n--- System Diagnostics ---"]
    stdout, _, code = run_command(["systemctl", "--user", "status", "openclaw-gateway", "--no-pager"])
    if code == 0:
        diag.append(f"Gateway Status:\n{stdout}")
    else:
        diag.append("Gateway Status: Service not found or error checking status.")
    stdout, _, code = run_command(["journalctl", "--user", "-u", "openclaw-gateway", "-n", "20", "--no-pager"])
    if code == 0:
        diag.append(f"Recent Logs:\n{stdout}")
    return "\n".join(diag)


def load_failure_state() -> dict:
    try:
        with open(FAILURE_STATE_FILE, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except Exception:
        return {"consecutive_failures": 0, "consecutive_skips": 0}


def save_failure_state(state: dict) -> None:
    os.makedirs(os.path.dirname(FAILURE_STATE_FILE), exist_ok=True)
    with open(FAILURE_STATE_FILE, "w", encoding="utf-8") as handle:
        json.dump(state, handle, indent=2)
HEAVY_SIGS = [
    "knowledge/sync_script.py",
    "knowledge/auto_sync.sh",
    "apt-get",
    "dpkg",
]
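
# HEAVY_SIGS are substring patterns matched against full command lines via
# pgrep/pkill -f; tailor this list to whatever long-running jobs exist on
# your own host.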
def kill_heavy_processes() -> None:
    print("🗡️ Force-killing stuck heavy processes...")
    for sig in HEAVY_SIGS:
        try:
            subprocess.run(["pkill", "-9", "-f", sig], check=False)
        except Exception:
            pass


def check_heavy_load(state: dict) -> bool:
    # Check for known heavy processes.
    found_heavy = False
    try:
        for sig in HEAVY_SIGS:
            res = subprocess.run(["pgrep", "-f", sig], stdout=subprocess.DEVNULL)
            if res.returncode == 0:
                found_heavy = True
                break
    except Exception:
        pass
    if found_heavy:
        skips = state.get("consecutive_skips", 0) + 1
        state["consecutive_skips"] = skips
        if skips >= MAX_CONSECUTIVE_SKIPS:
            print(f"🚨 Deadlock Detected! System busy for {skips} consecutive checks.")
            kill_heavy_processes()
            state["consecutive_skips"] = 0  # Reset after kill.
            save_failure_state(state)
            # Send a specific alert for the deadlock.
            try:
                subprocess.run([
                    "openclaw", "message", "send",
                    "--target", ADMIN_USER_ID,
                    "--message", f"🚨 **System Deadlock Resolved**\nProcess stuck for >2h. Force-killed: {HEAVY_SIGS}",
                ], check=False, timeout=60)
            except Exception:
                pass
            return False  # Continue to the deep check since we killed the blockers.
        print(f"ℹ️ High load detected (Skip {skips}/{MAX_CONSECUTIVE_SKIPS}). Skipping deep checks.")
        save_failure_state(state)
        return True
    # Not under heavy load; reset the skip counter if needed.
    if state.get("consecutive_skips", 0) > 0:
        state["consecutive_skips"] = 0
        save_failure_state(state)
    return False
def main() -> None:
    report: List[str] = []
    state = load_failure_state()
    print(f"Starting Health Check ({datetime.now()})...")

    # 1. Always run the lightweight checks.
    report.extend(check_update_script_health())
    report.extend(check_system_resources())

    # 2. Smart throttling with deadlock protection: deep checks are skipped
    #    while the system is under heavy load.
    if not check_heavy_load(state):
        report.extend(check_cron_jobs())
        report.extend(check_model_health())
        report.extend(check_browser_relay())

    if not report:
        if state.get("consecutive_failures", 0) > 0:
            print(f"✅ System recovered after {state['consecutive_failures']} failures.")
            state["consecutive_failures"] = 0
            save_failure_state(state)
            try:
                subprocess.run([
                    "openclaw", "message", "send",
                    "--target", ADMIN_USER_ID,
                    "--message", "✅ **System Health Recovered**\nAll systems nominal.",
                ], check=False, timeout=60)
            except Exception:
                pass
        print("All systems nominal.")
        sys.exit(0)

    # Increment the failure count.
    state["consecutive_failures"] = state.get("consecutive_failures", 0) + 1
    save_failure_state(state)
    msg = "\n".join(report)
    print(msg)

    if state["consecutive_failures"] >= FAILURE_THRESHOLD:
        print(f"🚨 Failure threshold reached ({state['consecutive_failures']}). Sending alert...")
        try:
            diagnostics = get_system_diagnostics()
            if len(diagnostics) > 3000:
                diagnostics = diagnostics[:3000] + "\n...[truncated]"
            alert_text = (
                f"🚨 **Self-Health Check Alert** ({state['consecutive_failures']} failures)\n\n"
                f"{msg}\n\n"
                f"```\n{diagnostics}\n```"
            )
            channel = os.getenv("OPENCLAW_HEALTH_ALERT_CHANNEL", "telegram").strip() or "telegram"
            target = os.getenv("OPENCLAW_HEALTH_ALERT_TARGET", ADMIN_USER_ID).strip() or ADMIN_USER_ID
            subprocess.run(
                [
                    "openclaw",
                    "message",
                    "send",
                    "--channel",
                    channel,
                    "--target",
                    target,
                    "--message",
                    alert_text,
                ],
                check=False,
                timeout=60,
            )
        except Exception as exc:
            print(f"Failed to send alert message: {exc}")
    else:
        print(f"ℹ️ Issue detected, but suppression active (Failure {state['consecutive_failures']}/{FAILURE_THRESHOLD}).")
    sys.exit(1)


if __name__ == "__main__":
    main()
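
The script is designed to run from cron roughly every 30 minutes (the cadence the MAX_CONSECUTIVE_SKIPS comment assumes). A minimal crontab entry, using a hypothetical install path of $HOME/scripts/openclaw_health_check.py and assuming ~/.openclaw/cron already exists for the log file:

*/30 * * * * /usr/bin/python3 $HOME/scripts/openclaw_health_check.py >> $HOME/.openclaw/cron/health_check.log 2>&1

Prefix the command with OPENCLAW_HEALTH_ACTIVE_PROBE=1 to enable the rate-limited active model probe.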