Skip to content

Instantly share code, notes, and snippets.

@erain
Created February 10, 2026 07:35
Show Gist options
  • Select an option

  • Save erain/f7d13d973a29f90a40b16d7228df8ec8 to your computer and use it in GitHub Desktop.

Select an option

Save erain/f7d13d973a29f90a40b16d7228df8ec8 to your computer and use it in GitHub Desktop.
OpenClaw Self-Health Check Script (Sanitized)
#!/usr/bin/env python3
import json
import os
import subprocess
import sys
import time
from datetime import datetime, timezone
from typing import List, Tuple
# OpenClaw Self-Health Checker
#
# FEATURES:
# - Full deep inspection (Cron jobs, Models, Browser, System Resources).
# - Auto-repair capabilities for Cron jobs.
# - Load Awareness: Skips heavy CLI calls if system is already under heavy load.
# - Deadlock Protection: If system remains "busy" for too long (e.g. hung script),
# it force-kills the stuck process and runs the check anyway.
# --- CONFIGURATION ---
# Adjust these paths and values for your environment
OPENCLAW_CMD = ["openclaw"] # Ensure 'openclaw' is in your PATH
# Known-good model alias that cron jobs are repaired to on model/auth errors.
SAFE_MODEL = "flash"
# Preferred update mechanism; legacy self-update cron jobs are disabled in its favor.
SAFE_UPDATE_SCRIPT = os.path.expanduser("~/scripts/openclaw_stable_release_update.sh")
# Payload substrings identifying a legacy self-update cron job.
LEGACY_UPDATE_TOKENS = ("auto-stash", "update.run", "git stash")
# Only cron errors newer than this window (36 h) are reported/repaired.
RECENT_ERROR_WINDOW_MS = 36 * 60 * 60 * 1000
# Env var that opts in ("1") to the expensive active model probe.
ACTIVE_PROBE_ENV = "OPENCLAW_HEALTH_ACTIVE_PROBE"
# Minimum spacing between active probes (6 h), tracked via the stamp file below.
ACTIVE_PROBE_MIN_INTERVAL_MS = 6 * 60 * 60 * 1000
ACTIVE_PROBE_STAMP_FILE = os.path.expanduser("~/.openclaw/cron/.last_model_probe_ms")
# NOTE(review): not referenced anywhere in this file's visible code — confirm before relying on it.
OAUTH_EXPIRY_WARN_MS = 5 * 60 * 1000
# Persisted state for the cooldown route guard and the failure/skip counters.
ROUTE_STATE_FILE = os.path.expanduser("~/.openclaw/cron/model_route_guard.json")
FAILURE_STATE_FILE = os.path.expanduser("~/.openclaw/cron/health_check_failure_state.json")
# Consecutive failing runs required before an alert is actually sent.
FAILURE_THRESHOLD = 3
MAX_CONSECUTIVE_SKIPS = 4 # Max times to skip due to load (4 * 30m = 2 hours)
ADMIN_USER_ID = os.getenv("OPENCLAW_ADMIN_USER_ID", "YOUR_ADMIN_ID_HERE") # Target for alerts
# Extra slack (ms) added after a cooldown expires before restoring the primary.
ROUTE_BUFFER_MS = int(os.getenv("OPENCLAW_ROUTE_BUFFER_MS", "60000") or "60000")
# Desired primary model and the substitutes used while its provider cools down.
ROUTE_TARGET_PRIMARY = os.getenv(
    "OPENCLAW_ROUTE_TARGET_PRIMARY", "google-gemini-cli/gemini-3-pro-preview"
).strip()
ROUTE_FALLBACK_PRIMARY = os.getenv(
    "OPENCLAW_ROUTE_FALLBACK_PRIMARY", "openai-codex/gpt-5.2"
).strip()
ROUTE_FLASH_FALLBACK = os.getenv(
    "OPENCLAW_ROUTE_FLASH_FALLBACK", "google-gemini-cli/gemini-3-flash-preview"
).strip()
ROUTE_GLM_FALLBACK = os.getenv(
    "OPENCLAW_ROUTE_GLM_FALLBACK", "zai/glm-4.7-flash"
).strip()
def parse_model_list_env(env_name: str, default_value: List[str]) -> List[str]:
    """Read a JSON list of model refs from *env_name*.

    The environment value must parse to a JSON array of strings; entries are
    stripped and blanks dropped. Any other shape, a parse error, or an
    unset/empty variable yields a copy of *default_value*.
    """
    raw_value = os.getenv(env_name, "").strip()
    if raw_value:
        try:
            parsed = json.loads(raw_value)
        except Exception:
            return list(default_value)
        if isinstance(parsed, list) and all(isinstance(item, str) for item in parsed):
            return [item.strip() for item in parsed if item.strip()]
    return list(default_value)
# Fallback chains used in normal operation vs. while the target provider is
# cooling down; each may be overridden with a JSON string-list env var.
ROUTE_NORMAL_FALLBACKS = parse_model_list_env(
    "OPENCLAW_ROUTE_NORMAL_FALLBACKS",
    [ROUTE_FALLBACK_PRIMARY, ROUTE_FLASH_FALLBACK, ROUTE_GLM_FALLBACK],
)
ROUTE_COOLDOWN_FALLBACKS = parse_model_list_env(
    "OPENCLAW_ROUTE_COOLDOWN_FALLBACKS",
    [ROUTE_FLASH_FALLBACK, ROUTE_GLM_FALLBACK],
)
def now_ms() -> int:
    """Current wall-clock time as whole milliseconds since the Unix epoch."""
    milliseconds = time.time() * 1000
    return int(milliseconds)
def ms_to_utc(ms: int) -> str:
    """Render an epoch-milliseconds value as 'YYYY-MM-DD HH:MM:SS UTC'.

    Falls back to str(ms) if the value cannot be converted (e.g. out of the
    platform's representable timestamp range).
    """
    try:
        moment = datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
        return moment.strftime("%Y-%m-%d %H:%M:%S UTC")
    except Exception:
        return str(ms)
def load_route_state() -> dict:
    """Load the persisted route-guard state.

    Returns an empty dict when the file is missing, unreadable, malformed,
    or does not contain a JSON object.
    """
    try:
        with open(ROUTE_STATE_FILE, "r", encoding="utf-8") as fh:
            payload = json.load(fh)
    except Exception:
        return {}
    return payload if isinstance(payload, dict) else {}
def save_route_state(state: dict) -> None:
    """Persist route-guard *state* as indented ASCII JSON, creating the parent directory if needed."""
    state_dir = os.path.dirname(ROUTE_STATE_FILE)
    os.makedirs(state_dir, exist_ok=True)
    with open(ROUTE_STATE_FILE, "w", encoding="utf-8") as fh:
        json.dump(state, fh, ensure_ascii=True, indent=2)
def clear_route_state() -> None:
    """Best-effort removal of the route-guard state file; never raises."""
    try:
        os.remove(ROUTE_STATE_FILE)
    except Exception:
        # Missing file or any OS error: nothing to clear.
        pass
def set_primary_model(model_ref: str) -> Tuple[bool, str]:
    """Point agents.defaults.model.primary at *model_ref* via the openclaw CLI.

    Returns (True, "") on success, otherwise (False, error-text).
    """
    _stdout, stderr, exit_code = run_openclaw(
        ["config", "set", "agents.defaults.model.primary", model_ref],
        timeout=180,
    )
    if exit_code != 0:
        return False, stderr
    return True, ""
def set_fallback_models(model_refs: List[str]) -> Tuple[bool, str]:
    """Set agents.defaults.model.fallbacks to *model_refs* (as a JSON array) via the CLI.

    Returns (True, "") on success, otherwise (False, error-text).
    """
    encoded = json.dumps(model_refs)
    _stdout, stderr, exit_code = run_openclaw(
        ["config", "set", "--json", "agents.defaults.model.fallbacks", encoded],
        timeout=180,
    )
    if exit_code != 0:
        return False, stderr
    return True, ""
def extract_provider(model_ref: str) -> str:
    """Return the lower-cased provider prefix of a 'provider/model' ref, or '' when no '/' is present."""
    provider, separator, _rest = model_ref.partition("/")
    if not separator:
        return ""
    return provider.strip().lower()
def reconcile_primary_route_with_cooldown(
    status_payload: dict, report: List[str]
) -> None:
    """Keep primary/fallback model routing in sync with provider cooldowns.

    While ROUTE_TARGET_PRIMARY's provider is reported cooling down, the
    primary is switched to ROUTE_FALLBACK_PRIMARY and the cooldown fallback
    chain is installed; guard state persisted in ROUTE_STATE_FILE records
    when the original routing should be restored (cooldown end plus
    ROUTE_BUFFER_MS). Manual model changes are detected and left alone.
    Human-readable progress/failure lines are appended to *report*.
    """
    auth = status_payload.get("auth", {}) or {}
    unusable = auth.get("unusableProfiles", []) or []
    current_primary = str(
        status_payload.get("resolvedDefault")
        or status_payload.get("defaultModel")
        or ""
    ).strip()
    current_fallbacks = status_payload.get("fallbacks", []) or []
    if not isinstance(current_fallbacks, list):
        current_fallbacks = []
    current_fallbacks = [str(x).strip() for x in current_fallbacks if str(x).strip()]
    target_provider = extract_provider(ROUTE_TARGET_PRIMARY)
    current_ms = now_ms()
    state = load_route_state()
    # Latest cooldown expiry reported for the target provider (0 when none).
    cooldown_until_ms = 0
    for item in unusable:
        provider = str(item.get("provider", "")).strip().lower()
        kind = str(item.get("kind", "")).strip().lower()
        until = int(item.get("until", 0) or 0)
        if provider == target_provider and kind == "cooldown" and until > cooldown_until_ms:
            cooldown_until_ms = until
    if cooldown_until_ms > current_ms:
        # --- Cooldown active: route away from the target provider. ---
        restore_at_ms = cooldown_until_ms + ROUTE_BUFFER_MS
        if current_primary == ROUTE_TARGET_PRIMARY:
            ok, err = set_primary_model(ROUTE_FALLBACK_PRIMARY)
            if ok:
                report.append(
                    "✅ Switched primary model to "
                    f"{ROUTE_FALLBACK_PRIMARY} due to {target_provider} cooldown "
                    f"(restore after {ms_to_utc(restore_at_ms)})."
                )
            else:
                report.append(
                    "❌ Failed to switch primary model to fallback "
                    f"{ROUTE_FALLBACK_PRIMARY}: {err}"
                )
        if current_primary in {ROUTE_TARGET_PRIMARY, ROUTE_FALLBACK_PRIMARY}:
            if current_fallbacks != ROUTE_COOLDOWN_FALLBACKS:
                ok, err = set_fallback_models(ROUTE_COOLDOWN_FALLBACKS)
                if ok:
                    report.append(
                        "✅ Updated fallback models for cooldown mode: "
                        + ", ".join(ROUTE_COOLDOWN_FALLBACKS)
                    )
                else:
                    report.append(
                        "❌ Failed to update fallback models for cooldown mode: "
                        f"{err}"
                    )
        else:
            # Primary is neither the target nor our fallback: a human changed
            # it, so do not override their choice.
            report.append(
                "⚠️ Primary model was manually set to "
                f"{current_primary}; skipping automatic cooldown override."
            )
        # Persist guard state so a later run knows to restore the routing.
        save_route_state(
            {
                "active": True,
                "targetPrimary": ROUTE_TARGET_PRIMARY,
                "fallbackPrimary": ROUTE_FALLBACK_PRIMARY,
                "provider": target_provider,
                "cooldownUntilMs": cooldown_until_ms,
                "restoreAtMs": restore_at_ms,
                "updatedAtMs": current_ms,
            }
        )
        return
    # --- No active cooldown. ---
    # Ensure the normal fallback chain whenever the target primary is in use.
    if current_primary == ROUTE_TARGET_PRIMARY and current_fallbacks != ROUTE_NORMAL_FALLBACKS:
        fb_ok, fb_err = set_fallback_models(ROUTE_NORMAL_FALLBACKS)
        if fb_ok:
            report.append(
                "✅ Updated fallback models for normal mode: "
                + ", ".join(ROUTE_NORMAL_FALLBACKS)
            )
        else:
            report.append(
                "❌ Failed to update fallback models for normal mode: "
                f"{fb_err}"
            )
    active = bool(state.get("active"))
    restore_at_ms = int(state.get("restoreAtMs", 0) or 0)
    if not active:
        # No pending restore recorded; nothing more to do.
        return
    if current_ms < restore_at_ms:
        report.append(
            "ℹ️ Waiting to restore primary model "
            f"{ROUTE_TARGET_PRIMARY} at {ms_to_utc(restore_at_ms)}."
        )
        return
    if current_primary == ROUTE_FALLBACK_PRIMARY:
        # Restore window reached and still on the fallback: switch back.
        ok, err = set_primary_model(ROUTE_TARGET_PRIMARY)
        if ok:
            report.append(
                "✅ Restored primary model to "
                f"{ROUTE_TARGET_PRIMARY} after cooldown window."
            )
            if current_fallbacks != ROUTE_NORMAL_FALLBACKS:
                fb_ok, fb_err = set_fallback_models(ROUTE_NORMAL_FALLBACKS)
                if fb_ok:
                    report.append(
                        "✅ Restored fallback models for normal mode: "
                        + ", ".join(ROUTE_NORMAL_FALLBACKS)
                    )
                else:
                    report.append(
                        "❌ Failed to restore fallback models for normal mode: "
                        f"{fb_err}"
                    )
            clear_route_state()
        else:
            report.append(
                "❌ Failed to restore primary model to "
                f"{ROUTE_TARGET_PRIMARY}: {err}"
            )
            # Keep the guard state so the restore is retried next run.
            state["updatedAtMs"] = current_ms
            save_route_state(state)
        return
    if current_primary == ROUTE_TARGET_PRIMARY:
        # Already back on the target (e.g. restored manually): tidy up.
        if current_fallbacks != ROUTE_NORMAL_FALLBACKS:
            fb_ok, fb_err = set_fallback_models(ROUTE_NORMAL_FALLBACKS)
            if fb_ok:
                report.append(
                    "✅ Ensured fallback models for normal mode: "
                    + ", ".join(ROUTE_NORMAL_FALLBACKS)
                )
            else:
                report.append(
                    "❌ Failed to ensure fallback models for normal mode: "
                    f"{fb_err}"
                )
        clear_route_state()
        return
    # Guard was active but primary is some third model: assume manual change.
    report.append(
        "⚠️ Primary model changed to "
        f"{current_primary}; clearing automatic route guard state."
    )
    clear_route_state()
def run_command(args: List[str], timeout: int = 120) -> Tuple[str, str, int]:
    """Run *args* (an argv list) and return (stdout, stderr, returncode), stripped.

    A timeout yields ("", "timeout after <N>s", 124); any other failure to
    launch yields ("", str(exception), 1). Never raises.
    """
    try:
        completed = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,
        )
    except subprocess.TimeoutExpired:
        return "", f"timeout after {timeout}s", 124
    except Exception as exc:
        return "", str(exc), 1
    return completed.stdout.strip(), completed.stderr.strip(), completed.returncode
def run_openclaw(args: List[str], timeout: int = 120, retries: int = 2) -> Tuple[str, str, int]:
    """Invoke the openclaw CLI with *args*, retrying transient timeouts.

    Retries only when the output mentions a timeout, waiting with exponential
    backoff (2s, 4s, ...) between attempts. Returns the last
    (stdout, stderr, returncode) observed.
    """
    stdout, stderr, code = "", "", 0
    for attempt in range(retries + 1):
        stdout, stderr, code = run_command(OPENCLAW_CMD + args, timeout=timeout)
        if code == 0:
            return stdout, stderr, code
        looks_transient = "timeout" in stderr.lower() or "timeout" in stdout.lower()
        if not looks_transient or attempt == retries:
            break
        time.sleep(2 ** (attempt + 1))
    return stdout, stderr, code
def check_update_script_health() -> List[str]:
    """Placeholder check for the stable update script; currently reports nothing.

    The existence/executable checks are intentionally disabled in this
    sanitized script — re-enable the appends below to make them active.
    """
    issues: List[str] = []
    if not os.path.exists(SAFE_UPDATE_SCRIPT):
        # issues.append(f"❌ Missing stable update script: {SAFE_UPDATE_SCRIPT}")
        pass  # Optional check, deliberately disabled
    # if not os.access(SAFE_UPDATE_SCRIPT, os.X_OK):
    #     issues.append(f"❌ Stable update script is not executable: {SAFE_UPDATE_SCRIPT}")
    return issues
def check_cron_jobs() -> List[str]:
    """Inspect every cron job and auto-repair known failure modes.

    Reports failures from the recent window (RECENT_ERROR_WINDOW_MS),
    switches jobs hitting model/auth errors over to SAFE_MODEL, and disables
    enabled legacy self-update jobs whose payload still contains
    LEGACY_UPDATE_TOKENS. Returns the accumulated anomaly/repair lines.
    """
    print("--- Checking Cron Jobs (Deep) ---")
    stdout, stderr, code = run_openclaw(["cron", "list", "--all", "--json"])
    if code != 0:
        return [f"ERROR: Could not list cron jobs: {stderr or stdout}"]
    try:
        payload = json.loads(stdout)
    except Exception as exc:
        return [f"ERROR: Failed to parse cron list JSON: {exc}"]
    jobs = payload.get("jobs", [])
    anomalies: List[str] = []
    for job in jobs:
        job_id = str(job.get("id", ""))
        job_name = str(job.get("name", "<unnamed>"))
        enabled = bool(job.get("enabled", False))
        state = job.get("state", {}) or {}
        last_status = str(state.get("lastStatus", ""))
        last_error = str(state.get("lastError", ""))
        last_run_at_ms = int(state.get("lastRunAtMs", 0) or 0)
        message = str((job.get("payload") or {}).get("message", ""))
        message_lc = message.lower()
        model_name = str((job.get("payload") or {}).get("model", ""))
        # Only errors from the recent window are actionable; stale failures
        # are ignored.
        is_recent_error = (
            last_status == "error"
            and last_error
            and (int(time.time() * 1000) - last_run_at_ms) <= RECENT_ERROR_WINDOW_MS
        )
        is_model_auth_error = (
            "model not allowed" in last_error
            or 'No API key found for provider "google"' in last_error
        )
        # A model/auth error whose job already runs on SAFE_MODEL is treated
        # as already repaired and is not re-reported.
        resolved_model_auth_error = is_model_auth_error and model_name == SAFE_MODEL
        if is_recent_error:
            if not resolved_model_auth_error:
                anomalies.append(f"❌ Job '{job_name}' failed last run: {last_error}")
        needs_model_repair = is_model_auth_error
        if is_recent_error and needs_model_repair and model_name != SAFE_MODEL:
            print(
                f"🔧 Attempting model repair for '{job_name}' -> '{SAFE_MODEL}'..."
            )
            _, repair_err, repair_code = run_openclaw(
                ["cron", "edit", job_id, "--model", SAFE_MODEL], timeout=180
            )
            if repair_code == 0:
                anomalies.append(f"✅ Repaired model for '{job_name}'")
            else:
                anomalies.append(
                    f"❌ Failed model repair for '{job_name}': {repair_err}"
                )
        # Legacy self-update jobs are disabled in favor of SAFE_UPDATE_SCRIPT.
        if "self-update" in job_name.lower() and enabled:
            if any(token in message_lc for token in LEGACY_UPDATE_TOKENS):
                print(f"🔧 Disabling legacy self-update job '{job_name}'...")
                _, disable_err, disable_code = run_openclaw(
                    ["cron", "disable", job_id], timeout=120
                )
                if disable_code == 0:
                    anomalies.append(
                        f"✅ Disabled legacy self-update job '{job_name}'"
                    )
                else:
                    anomalies.append(
                        f"❌ Failed to disable legacy self-update job '{job_name}': {disable_err}"
                    )
    return anomalies
def should_run_active_probe() -> bool:
    """Decide whether the (expensive) active model probe should run now.

    Requires opt-in via the env var named by ACTIVE_PROBE_ENV being "1", and
    at least ACTIVE_PROBE_MIN_INTERVAL_MS elapsed since the timestamp stored
    in ACTIVE_PROBE_STAMP_FILE. A missing/unreadable stamp counts as "never
    probed".
    """
    if os.getenv(ACTIVE_PROBE_ENV, "0") != "1":
        return False
    try:
        with open(ACTIVE_PROBE_STAMP_FILE, "r", encoding="utf-8") as handle:
            last_ms = int(handle.read().strip() or "0")
    except Exception:
        # Missing, unreadable, or non-numeric stamp -> treat as never probed.
        last_ms = 0
    # Renamed from `now_ms`: the original local shadowed the module-level
    # now_ms() helper.
    current_ms = int(time.time() * 1000)
    return (current_ms - last_ms) >= ACTIVE_PROBE_MIN_INTERVAL_MS
def mark_active_probe_run() -> None:
    """Record the current time (epoch ms) as the last active-probe run."""
    stamp_dir = os.path.dirname(ACTIVE_PROBE_STAMP_FILE)
    os.makedirs(stamp_dir, exist_ok=True)
    timestamp = str(int(time.time() * 1000))
    with open(ACTIVE_PROBE_STAMP_FILE, "w", encoding="utf-8") as handle:
        handle.write(timestamp)
def check_model_health() -> List[str]:
    """Check model routing and provider auth; optionally probe models live.

    Passive phase: reads `models status --json`, reconciles primary/fallback
    routing against provider cooldowns, then reports missing provider auth,
    unusable profiles (cooldowns and others), and unhealthy OAuth profiles.
    Active phase (opt-in and rate-limited via should_run_active_probe):
    re-runs status with --probe and reports per-model probe results.
    Returns the accumulated issue lines.
    """
    print("--- Checking Model & Alias Health (Deep) ---")
    issues: List[str] = []
    stdout, stderr, code = run_openclaw(["models", "status", "--json"])
    if code != 0:
        return [f"ERROR: Could not read model status: {stderr or stdout}"]
    try:
        data = json.loads(stdout)
        # May append routing-change reports to `issues` and mutate CLI config.
        reconcile_primary_route_with_cooldown(data, issues)
        auth = data.get("auth", {}) or {}
        missing = auth.get("missingProvidersInUse", []) or []
        if missing:
            issues.append(
                f"❌ Missing auth for providers in use: {', '.join(str(x) for x in missing)}"
            )
        unusable = auth.get("unusableProfiles", []) or []
        for item in unusable:
            provider = str(item.get("provider", "unknown"))
            kind = str(item.get("kind", "unknown"))
            remaining_ms = int(item.get("remainingMs", 0) or 0)
            if kind == "cooldown":
                issues.append(
                    f"⚠️ Provider '{provider}' is cooling down ({max(remaining_ms // 1000, 0)}s remaining)."
                )
            else:
                issues.append(f"⚠️ Profile unusable: provider={provider}, reason={kind}")
        oauth = ((auth.get("oauth", {}) or {}).get("profiles", [])) or []
        for profile in oauth:
            provider = str(profile.get("provider", "unknown"))
            status = str(profile.get("status", "unknown"))
            if status != "ok":
                issues.append(f"⚠️ OAuth profile unhealthy: {provider} status={status}")
    except Exception as exc:
        return [f"ERROR: Failed to parse model status JSON: {exc}"]
    if not should_run_active_probe():
        return issues
    print("--- Running active model probe ---")
    probe_stdout, probe_stderr, probe_code = run_openclaw(
        ["models", "status", "--probe", "--json"], timeout=180
    )
    # Stamp even on failure so a broken probe cannot run on every cycle.
    mark_active_probe_run()
    if probe_code != 0:
        issues.append(f"ERROR: Active model probe failed: {probe_stderr or probe_stdout}")
        return issues
    try:
        probe = json.loads(probe_stdout)
        results = (((probe.get("auth", {}) or {}).get("probes", {}) or {}).get("results", []))
        for result in results:
            status = str(result.get("status", "unknown"))
            provider = str(result.get("provider", "unknown"))
            model = str(result.get("model", "unknown"))
            if status not in {"ok", "unknown"}:
                issues.append(f"❌ Active probe failed: {provider}/{model} status={status}")
            elif status == "unknown":
                err = str(result.get("error", "unknown"))
                issues.append(f"⚠️ Active probe uncertain: {provider}/{model} ({err})")
    except Exception as exc:
        issues.append(f"ERROR: Failed to parse active probe JSON: {exc}")
    return issues
def check_browser_relay() -> List[str]:
    """Verify the browser relay status endpoint responds with parseable JSON.

    No checks beyond parseability are implemented in this sanitized script:
    both the running and not-running cases report no issues.
    """
    print("--- Checking Browser Relay (Deep) ---")
    stdout, stderr, code = run_openclaw(["browser", "status", "--json"])
    if code != 0:
        return [f"ERROR: Could not check browser status: {stderr or stdout}"]
    try:
        json.loads(stdout)
    except Exception as exc:
        return [f"ERROR: Failed to parse browser status: {exc}"]
    return []
def check_system_resources() -> List[str]:
    """Flag critical root-filesystem disk usage (>90%) or memory usage (>90%)."""
    print("--- Checking System Resources ---")
    issues: List[str] = []
    disk_out, _, disk_code = run_command(["bash", "-lc", "df -h / --output=pcent | tail -1"])
    if disk_code == 0 and disk_out:
        try:
            disk_usage = int(disk_out.replace("%", "").strip())
        except ValueError:
            issues.append(f"⚠️ Could not parse disk usage output: {disk_out}")
        else:
            if disk_usage > 90:
                issues.append(f"🚨 Disk usage critical: {disk_usage}%")
    mem_out, _, mem_code = run_command(
        ["bash", "-lc", "free | awk '/Mem:/ {printf \"%.1f\", ($3/$2)*100}'"]
    )
    if mem_code == 0 and mem_out:
        try:
            mem_usage = float(mem_out)
        except ValueError:
            issues.append(f"⚠️ Could not parse memory usage output: {mem_out}")
        else:
            if mem_usage > 90:
                issues.append(f"🚨 Memory usage high: {mem_usage:.1f}%")
    return issues
def get_system_diagnostics() -> str:
    """Collect gateway service status and recent journal logs for alert context."""
    sections = ["\n--- System Diagnostics ---"]
    status_out, _, status_code = run_command(
        ["systemctl", "--user", "status", "openclaw-gateway", "--no-pager"]
    )
    if status_code == 0:
        sections.append(f"Gateway Status:\n{status_out}")
    else:
        sections.append("Gateway Status: Service not found or error checking status.")
    logs_out, _, logs_code = run_command(
        ["journalctl", "--user", "-u", "openclaw-gateway", "-n", "20", "--no-pager"]
    )
    if logs_code == 0:
        sections.append(f"Recent Logs:\n{logs_out}")
    return "\n".join(sections)
def load_failure_state() -> dict:
    """Load persisted failure/skip counters; zeroed counters on any error."""
    fallback = {"consecutive_failures": 0, "consecutive_skips": 0}
    try:
        with open(FAILURE_STATE_FILE, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except Exception:
        return fallback
    return loaded
def save_failure_state(state: dict) -> None:
    """Persist the failure/skip counters as indented JSON, creating the directory if needed."""
    state_dir = os.path.dirname(FAILURE_STATE_FILE)
    os.makedirs(state_dir, exist_ok=True)
    with open(FAILURE_STATE_FILE, "w", encoding="utf-8") as handle:
        json.dump(state, handle, indent=2)
# Substrings matched against full command lines (via pgrep/pkill -f) to
# detect known heavy background work that should defer the deep checks.
HEAVY_SIGS = [
    "knowledge/sync_script.py",
    "knowledge/auto_sync.sh",
    "apt-get",
    "dpkg"
]
def kill_heavy_processes() -> None:
    """Force-kill (SIGKILL) every process matching a HEAVY_SIGS pattern.

    Best-effort: any failure to invoke pkill is ignored.
    """
    print("🗡️ Force-killing stuck heavy processes...")
    for sig in HEAVY_SIGS:
        try:
            # pkill -f matches the pattern against the full command line.
            subprocess.run(["pkill", "-9", "-f", sig], check=False)
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # propagate instead of being silently swallowed.
            pass
def check_heavy_load(state: dict) -> bool:
    """Return True when deep checks should be skipped due to heavy system load.

    Heavy work is detected with pgrep against HEAVY_SIGS. Consecutive skips
    are counted in *state*; once MAX_CONSECUTIVE_SKIPS is reached the stuck
    processes are force-killed (deadlock protection), an alert is sent, and
    deep checks proceed (returns False). The skip counter is reset whenever
    the system is no longer busy.
    """
    found_heavy = False
    try:
        for sig in HEAVY_SIGS:
            res = subprocess.run(["pgrep", "-f", sig], stdout=subprocess.DEVNULL)
            if res.returncode == 0:
                found_heavy = True
                break
    except Exception:
        # Narrowed from a bare `except:`; pgrep may be unavailable on some hosts.
        pass
    if found_heavy:
        skips = state.get("consecutive_skips", 0) + 1
        state["consecutive_skips"] = skips
        if skips >= MAX_CONSECUTIVE_SKIPS:
            print(f"🚨 Deadlock Detected! System busy for {skips} consecutive checks.")
            kill_heavy_processes()
            state["consecutive_skips"] = 0  # Reset after kill
            save_failure_state(state)
            # Send specific alert for the deadlock (best-effort).
            try:
                # Uses OPENCLAW_CMD for consistency with run_openclaw (was a
                # hard-coded "openclaw" literal).
                subprocess.run(
                    OPENCLAW_CMD + [
                        "message", "send",
                        "--target", ADMIN_USER_ID,
                        "--message", f"🚨 **System Deadlock Resolved**\nProcess stuck for >2h. Force-killed: {HEAVY_SIGS}",
                    ],
                    check=False,
                    timeout=60,
                )
            except Exception:
                pass  # Alerting must never break the health check itself.
            return False  # Continue to deep check since we killed the blockers
        print(f"ℹ️ High load detected (Skip {skips}/{MAX_CONSECUTIVE_SKIPS}). Skipping deep checks.")
        save_failure_state(state)
        return True
    # Not heavy: reset the skip counter if it had accumulated.
    if state.get("consecutive_skips", 0) > 0:
        state["consecutive_skips"] = 0
        save_failure_state(state)
    # Explicit False (the original could fall through returning None, which
    # the caller treated identically in boolean context).
    return False
def main() -> None:
    """Run the health-check pipeline, alert if needed, and exit.

    Exit code 0 when no issues were found, 1 otherwise. Alerts are sent only
    after FAILURE_THRESHOLD consecutive failing runs (to suppress one-off
    flakes); a recovery notice is sent when issues clear after failures.
    """
    report: List[str] = []
    state = load_failure_state()
    print(f"Starting Health Check ({datetime.now()})...")
    # 1. Always run lightweight resource checks.
    report.extend(check_update_script_health())
    report.extend(check_system_resources())
    # 2. Smart throttling with deadlock protection: deep checks only run
    # when the system is not already under heavy load.
    if not check_heavy_load(state):
        report.extend(check_cron_jobs())
        report.extend(check_model_health())
        report.extend(check_browser_relay())
    if not report:
        if state.get("consecutive_failures", 0) > 0:
            print(f"✅ System recovered after {state['consecutive_failures']} failures.")
            state["consecutive_failures"] = 0
            save_failure_state(state)
            try:
                # Uses OPENCLAW_CMD for consistency with run_openclaw (was a
                # hard-coded "openclaw" literal). Narrowed from bare `except:`.
                subprocess.run(
                    OPENCLAW_CMD + [
                        "message", "send",
                        "--target", ADMIN_USER_ID,
                        "--message", "✅ **System Health Recovered**\nAll systems nominal.",
                    ],
                    check=False,
                    timeout=60,
                )
            except Exception:
                pass  # Recovery notice is best-effort.
        print("All systems nominal.")
        sys.exit(0)
    # Increment the consecutive-failure count and persist it.
    state["consecutive_failures"] = state.get("consecutive_failures", 0) + 1
    save_failure_state(state)
    msg = "\n".join(report)
    print(msg)
    if state["consecutive_failures"] >= FAILURE_THRESHOLD:
        print(f"🚨 Failure threshold reached ({state['consecutive_failures']}). Sending alert...")
        try:
            diagnostics = get_system_diagnostics()
            if len(diagnostics) > 3000:
                # Keep the alert message within messenger size limits.
                diagnostics = diagnostics[:3000] + "\n...[truncated]"
            alert_text = (
                f"🚨 **Self-Health Check Alert** ({state['consecutive_failures']} failures)\n\n"
                f"{msg}\n\n"
                f"```\n{diagnostics}\n```"
            )
            channel = os.getenv("OPENCLAW_HEALTH_ALERT_CHANNEL", "telegram").strip() or "telegram"
            target = os.getenv("OPENCLAW_HEALTH_ALERT_TARGET", ADMIN_USER_ID).strip() or ADMIN_USER_ID
            subprocess.run(
                OPENCLAW_CMD + [
                    "message", "send",
                    "--channel", channel,
                    "--target", target,
                    "--message", alert_text,
                ],
                check=False,
                timeout=60,
            )
        except Exception as exc:
            print(f"Failed to send alert message: {exc}")
    else:
        print(f"ℹ️ Issue detected, but suppression active (Failure {state['consecutive_failures']}/{FAILURE_THRESHOLD}).")
    sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment