Skip to content

Instantly share code, notes, and snippets.

@cconstab
Created December 19, 2025 06:49
Show Gist options
  • Select an option

  • Save cconstab/8c24f10af8e19283f4ec67853f84a874 to your computer and use it in GitHub Desktop.

Select an option

Save cconstab/8c24f10af8e19283f4ec67853f84a874 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
at_doctor - atPlatform Environment Diagnostic Tool (Python Version)
A diagnostic tool for checking if your environment is ready to run atPlatform applications.
"""
import argparse
import socket
import ssl
import sys
import time
import platform
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
class TestStatus(Enum):
PASS = ("βœ“", "PASS")
FAIL = ("βœ—", "FAIL")
WARNING = ("⚠", "WARN")
SKIP = ("β—‹", "SKIP")
@property
def icon(self):
return self.value[0]
@property
def display_name(self):
return self.value[1]
@dataclass
class TestResult:
test_name: str
status: TestStatus
message: str
details: Optional[Dict[str, Any]] = None
duration_ms: int = 0
@property
def passed(self):
return self.status == TestStatus.PASS
@property
def failed(self):
return self.status == TestStatus.FAIL
@property
def has_warning(self):
return self.status == TestStatus.WARNING
def __str__(self):
return f"{self.status.icon} [{self.status.display_name}] {self.test_name}: {self.message}"
def to_detailed_string(self):
lines = [str(self)]
if self.details:
lines.append(" Details:")
for key, value in self.details.items():
lines.append(f" {key}: {value}")
lines.append(f" Duration: {self.duration_ms}ms")
return "\n".join(lines)
class DiagnosticTest:
"""Base class for all diagnostic tests"""
@property
def name(self) -> str:
raise NotImplementedError
@property
def description(self) -> str:
raise NotImplementedError
def run(self) -> TestResult:
raise NotImplementedError
class OSInfoTest(DiagnosticTest):
"""Test to identify OS platform and version"""
@property
def name(self) -> str:
return "Operating System"
@property
def description(self) -> str:
return "Identify OS platform and version"
def run(self) -> TestResult:
start = time.time()
try:
system = platform.system().lower()
version = platform.version()
processors = f"{platform.processor()}"
python_version = platform.python_version()
details = {
"Platform": system,
"Version": version,
"Processor": processors,
"Python": python_version,
}
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.PASS,
message=f"{system} - Python {python_version}",
details=details,
duration_ms=duration_ms,
)
except Exception as e:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message=f"Failed to get OS info: {e}",
duration_ms=duration_ms,
)
class DNSLookupTest(DiagnosticTest):
"""Test DNS resolution"""
def __init__(self, hostname: str = "root.atsign.org"):
self.hostname = hostname
@property
def name(self) -> str:
return "DNS Lookup"
@property
def description(self) -> str:
return f"Test DNS resolution for {self.hostname}"
def run(self) -> TestResult:
start = time.time()
try:
addresses = socket.getaddrinfo(self.hostname, None)
ips = list(set([addr[4][0] for addr in addresses]))
primary_type = "IPv6" if ":" in ips[0] else "IPv4"
details = {
"Hostname": self.hostname,
"IP Addresses": ", ".join(ips),
"Address Count": len(ips),
"Primary Type": primary_type,
}
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.PASS,
message=f"Resolved {self.hostname} to {len(ips)} address(es)",
details=details,
duration_ms=duration_ms,
)
except socket.gaierror as e:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message=f"DNS lookup failed for {self.hostname}",
details={"Hostname": self.hostname, "Error": str(e)},
duration_ms=duration_ms,
)
except Exception as e:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message=f"Unexpected error: {e}",
details={"Hostname": self.hostname},
duration_ms=duration_ms,
)
class TLSConnectionTest(DiagnosticTest):
"""Test TLS connection to a server"""
def __init__(self, hostname: str = "root.atsign.org", port: int = 64):
self.hostname = hostname
self.port = port
@property
def name(self) -> str:
return "TLS Connection"
@property
def description(self) -> str:
return f"Test TLS connection to {self.hostname}:{self.port}"
def run(self) -> TestResult:
start = time.time()
sock = None
ssl_sock = None
try:
# Create SSL context
context = ssl.create_default_context()
# Create socket and connect with TLS
sock = socket.create_connection((self.hostname, self.port), timeout=10)
ssl_sock = context.wrap_socket(sock, server_hostname=self.hostname)
# Get certificate info
cert = ssl_sock.getpeercert()
subject = dict(x[0] for x in cert.get("subject", []))
issuer = dict(x[0] for x in cert.get("issuer", []))
details = {
"Host": self.hostname,
"Port": self.port,
"Connected": "Yes",
"TLS Version": ssl_sock.version(),
"Certificate Subject": subject.get("commonName", "N/A"),
"Certificate Issuer": issuer.get("commonName", "N/A"),
}
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.PASS,
message=f"Successfully connected to {self.hostname}:{self.port}",
details=details,
duration_ms=duration_ms,
)
except socket.timeout:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message="Connection timeout",
details={"Host": self.hostname, "Port": self.port},
duration_ms=duration_ms,
)
except Exception as e:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message=f"Connection failed: {e}",
details={"Host": self.hostname, "Port": self.port},
duration_ms=duration_ms,
)
finally:
if ssl_sock:
ssl_sock.close()
elif sock:
sock.close()
class AtSignConnectionTest(DiagnosticTest):
"""Test connection to an atSign's secondary server"""
def __init__(
self,
atsign: str = "@rv_am",
root_server: str = "root.atsign.org",
root_port: int = 64,
):
self.atsign = atsign if atsign.startswith("@") else f"@{atsign}"
self.root_server = root_server
self.root_port = root_port
@property
def name(self) -> str:
return "atSign Connection"
@property
def description(self) -> str:
return f"Test connection to {self.atsign} secondary server"
def run(self) -> TestResult:
start = time.time()
root_sock = None
secondary_sock = None
try:
# Step 1: Connect to root server
context = ssl.create_default_context()
sock = socket.create_connection(
(self.root_server, self.root_port), timeout=10
)
root_sock = context.wrap_socket(sock, server_hostname=self.root_server)
# Step 2: Lookup secondary server
atsign_query = self.atsign[1:] # Remove @ prefix
root_sock.sendall(f"{atsign_query}\n".encode())
response = b""
while b"\n" not in response:
chunk = root_sock.recv(1024)
if not chunk:
break
response += chunk
response_str = response.decode().split("\n")[0].strip()
root_sock.close()
root_sock = None
if "null" in response_str:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message=f"atSign {self.atsign} not found",
details={"atSign": self.atsign, "Response": response_str},
duration_ms=duration_ms,
)
# Parse secondary address
address_part = response_str[1:] if response_str.startswith("@") else response_str
last_colon = address_part.rfind(":")
if last_colon == -1:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message="Invalid secondary address format",
details={"atSign": self.atsign, "Response": response_str},
duration_ms=duration_ms,
)
host = address_part[:last_colon]
try:
port = int(address_part[last_colon + 1:].strip())
except ValueError:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message="Invalid port in secondary address",
details={"atSign": self.atsign, "Response": response_str},
duration_ms=duration_ms,
)
details = {
"atSign": self.atsign,
"Secondary Host": host,
"Secondary Port": port,
}
# Step 3: Connect to secondary server
sock = socket.create_connection((host, port), timeout=10)
secondary_sock = context.wrap_socket(sock, server_hostname=host)
details["Connection"] = "Success"
details["TLS"] = "Established"
# Send info command
secondary_sock.sendall(b"info\n")
info_response = b""
while b"\n" not in info_response:
chunk = secondary_sock.recv(1024)
if not chunk:
break
info_response += chunk
info_str = info_response.decode().split("\n")[0].strip()
if info_str and not info_str.startswith("error"):
details["atServer Response"] = "OK"
details["Info"] = info_str[:50] + "..." if len(info_str) > 50 else info_str
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.PASS,
message=f"Connected to {self.atsign} at {host}:{port}",
details=details,
duration_ms=duration_ms,
)
except Exception as e:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message=f"Connection failed: {e}",
details={"atSign": self.atsign},
duration_ms=duration_ms,
)
finally:
if root_sock:
root_sock.close()
if secondary_sock:
secondary_sock.close()
class ProxyConnectionTest(DiagnosticTest):
"""Test connection via proxy server"""
def __init__(
self,
atsign: str = "@rv_am",
proxy_server: str = "proxy0001.atsign.org",
proxy_port: int = 443,
):
self.atsign = atsign if atsign.startswith("@") else f"@{atsign}"
self.proxy_server = proxy_server
self.proxy_port = proxy_port
@property
def name(self) -> str:
return "Proxy Connection"
@property
def description(self) -> str:
return f"Test connection to {self.atsign} via proxy (port 443)"
def run(self) -> TestResult:
start = time.time()
proxy_sock = None
secondary_sock = None
try:
# Step 1: Connect to proxy and lookup atSign
context = ssl.create_default_context()
sock = socket.create_connection(
(self.proxy_server, self.proxy_port), timeout=10
)
proxy_sock = context.wrap_socket(sock, server_hostname=self.proxy_server)
# Lookup via proxy - send atSign without @
atsign_query = self.atsign[1:] if self.atsign.startswith('@') else self.atsign
proxy_sock.sendall(f"{atsign_query}\n".encode())
# Read lookup response - need to handle @ prompt and actual response
response = b""
while True:
chunk = proxy_sock.recv(1024)
if not chunk:
break
response += chunk
if b"\n" in response:
break
response_str = response.decode().strip()
# Get first real line (skip @ prompt)
for line in response_str.split("\n"):
line = line.strip()
if line and line != "@":
response_str = line
break
# Close first connection
proxy_sock.close()
proxy_sock = None
if "null" in response_str:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message=f"atSign {self.atsign} not found via proxy",
details={"atSign": self.atsign, "Proxy Response": response_str},
duration_ms=duration_ms,
)
# Parse secondary server address
# Format: @uuid.hostname:port or hostname:port
address_part = response_str.lstrip("@")
if ":" not in address_part:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message="Invalid secondary address format from proxy",
details={"atSign": self.atsign, "Proxy Response": response_str},
duration_ms=duration_ms,
)
# Split by last colon to separate host:port
last_colon = address_part.rfind(":")
host = address_part[:last_colon]
port_str = address_part[last_colon + 1:].strip()
try:
port = int(port_str)
except ValueError:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message="Invalid port in secondary address from proxy",
details={
"atSign": self.atsign,
"Proxy Response": response_str,
"Parsed Host": host,
"Parsed Port": port_str,
},
duration_ms=duration_ms,
)
details = {
"atSign": self.atsign,
"Proxy Server": f"{self.proxy_server}:{self.proxy_port}",
"Secondary Host": host,
"Secondary Port": port,
}
# Step 2: Connect again to proxy to establish connection to secondary
sock = socket.create_connection(
(self.proxy_server, self.proxy_port), timeout=10
)
secondary_sock = context.wrap_socket(sock, server_hostname=self.proxy_server)
secondary_sock.settimeout(5.0)
details["Proxy Connection"] = "Success"
details["TLS to Proxy"] = "Established"
# Send from: command
secondary_sock.sendall(f"from:{self.atsign}\n".encode())
# Read from: response
from_response = b""
for _ in range(5): # Try multiple reads
try:
chunk = secondary_sock.recv(1024)
if chunk:
from_response += chunk
if b"data:" in from_response:
break
except socket.timeout:
break
time.sleep(0.1)
from_str = from_response.decode().strip()
# Look for data: in response
has_data = False
for line in from_str.split("\n"):
line = line.strip().lstrip("@")
if line.startswith("data:"):
has_data = True
details["from: Response"] = "OK"
break
if not has_data:
details["from: Response"] = from_str[:100] if from_str else "(empty)"
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message="Failed to establish proxy connection to atSign",
details=details,
duration_ms=duration_ms,
)
# Send info command
secondary_sock.sendall(b"info\n")
# Read info response
info_response = b""
for _ in range(5):
try:
chunk = secondary_sock.recv(1024)
if chunk:
info_response += chunk
if b"data:" in info_response:
break
except socket.timeout:
break
time.sleep(0.1)
info_str = info_response.decode().strip()
# Look for data: in info response
if info_str:
for line in info_str.split("\n"):
line = line.strip().lstrip("@")
if line.startswith("data:"):
details["atServer Response"] = "OK"
details["Info"] = line[:50] + "..." if len(line) > 50 else line
break
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.PASS,
message=f"Connected to {self.atsign} via proxy at {self.proxy_server}:{self.proxy_port}",
details=details,
duration_ms=duration_ms,
)
except socket.timeout:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message="Proxy connection timeout",
details={"atSign": self.atsign, "Proxy": f"{self.proxy_server}:{self.proxy_port}"},
duration_ms=duration_ms,
)
except ssl.SSLError as e:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message=f"SSL/TLS error: {str(e)}",
details={"atSign": self.atsign, "Proxy": f"{self.proxy_server}:{self.proxy_port}"},
duration_ms=duration_ms,
)
except Exception as e:
duration_ms = int((time.time() - start) * 1000)
return TestResult(
test_name=self.name,
status=TestStatus.FAIL,
message=f"Proxy connection failed: {e}",
details={"atSign": self.atsign, "Proxy": f"{self.proxy_server}:{self.proxy_port}"},
duration_ms=duration_ms,
)
finally:
if proxy_sock:
try:
proxy_sock.close()
except:
pass
if secondary_sock:
try:
secondary_sock.close()
except:
pass
class DiagnosticRunner:
"""Manages and runs diagnostic tests"""
def __init__(self, tests: List[DiagnosticTest], verbose: bool = False):
self.tests = tests
self.verbose = verbose
def run_all(self) -> List[TestResult]:
"""Run all tests and return results"""
results = []
for test in self.tests:
if self.verbose:
print(f"\nπŸ” Running: {test.name}")
print(f" {test.description}")
result = test.run()
results.append(result)
if self.verbose:
print(result.to_detailed_string())
else:
print(result)
return results
def generate_summary(self, results: List[TestResult]) -> str:
"""Generate a summary report"""
passed = sum(1 for r in results if r.passed)
failed = sum(1 for r in results if r.failed)
warnings = sum(1 for r in results if r.has_warning)
lines = [
"",
"=" * 60,
"DIAGNOSTIC SUMMARY",
"=" * 60,
f"Total Tests: {len(results)}",
f"βœ“ Passed: {passed}",
f"βœ— Failed: {failed}",
f"⚠ Warnings: {warnings}",
f"β—‹ Skipped: 0",
]
# Check for special proxy-only case
atsign_test = next((r for r in results if r.test_name == "atSign Connection"), None)
proxy_test = next((r for r in results if r.test_name == "Proxy Connection"), None)
has_atsign_failure = atsign_test and atsign_test.failed
has_proxy_success = proxy_test and proxy_test.passed
proxy_only_mode = has_atsign_failure and has_proxy_success
if failed > 0:
other_failures = [
r for r in results
if r.failed and r.test_name not in ["atSign Connection", "Proxy Connection"]
]
if proxy_only_mode and not other_failures:
lines.append("")
lines.append("βœ… Environment is ready for atPlatform applications!")
lines.append(" (via port 443 proxy only - direct connections blocked)")
else:
lines.append("")
lines.append("❌ Environment is NOT ready for atPlatform applications")
lines.append("")
lines.append("Failed tests:")
for result in results:
if result.failed:
lines.append(f" β€’ {result.test_name}: {result.message}")
else:
lines.append("")
lines.append("βœ… Environment is ready for atPlatform applications!")
total_duration = sum(r.duration_ms for r in results)
lines.append(f"\nTotal duration: {total_duration}ms")
lines.append("=" * 60)
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="at_doctor - atPlatform Environment Diagnostic Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Show detailed output for each test",
)
parser.add_argument(
"-a", "--atsign",
default="@rv_am",
help="atSign to test connection with (default: @rv_am)",
)
parser.add_argument(
"--root",
default="root.atsign.org",
help="Root server hostname (default: root.atsign.org)",
)
args = parser.parse_args()
print("=" * 60)
print(" at_doctor - atPlatform Diagnostics")
print("=" * 60)
print()
print("Running environment health checks...\n")
# Create test suite
tests = [
OSInfoTest(),
DNSLookupTest(hostname=args.root),
TLSConnectionTest(hostname=args.root, port=64),
AtSignConnectionTest(atsign=args.atsign),
ProxyConnectionTest(atsign=args.atsign),
]
runner = DiagnosticRunner(tests=tests, verbose=args.verbose)
# Run all tests
results = runner.run_all()
# Check for special case
atsign_test = next((r for r in results if r.test_name == "atSign Connection"), None)
proxy_test = next((r for r in results if r.test_name == "Proxy Connection"), None)
if atsign_test and proxy_test:
has_atsign_failure = atsign_test.failed
has_proxy_success = proxy_test.passed
if has_atsign_failure and has_proxy_success and args.verbose:
print("\n⚠️ NOTE: Direct atSign connection failed, but proxy connection succeeded.")
print(" Your environment can use atPlatform through port 443 (HTTPS) only.\n")
# Print summary
print(runner.generate_summary(results))
# Exit with appropriate code
other_failures = any(
r.failed and r.test_name not in ["atSign Connection", "Proxy Connection"]
for r in results
)
connectivity_ok = not atsign_test.failed if atsign_test else True
if proxy_test and proxy_test.passed:
connectivity_ok = True
if other_failures or not connectivity_ok:
sys.exit(1)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment