#!/usr/bin/env python3
"""
Orb MQTT Bridge

Polls the Orb Local API and publishes its metrics to an MQTT broker.
"""
|
|
|
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone

import paho.mqtt.client as mqtt
import requests
|
|
|
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger('orb-mqtt-bridge')


def env_int(name, default, minimum=None, maximum=None):
    """Read an integer setting from an environment variable.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset (string or int).
        minimum: Smallest accepted value, or ``None`` for no lower bound.
        maximum: Largest accepted value, or ``None`` for no upper bound.

    Returns:
        The parsed integer.

    Raises:
        ValueError: If the value is not an integer or falls outside the
            accepted range. The message names the offending variable.
    """
    raw = os.getenv(name, default)
    try:
        value = int(raw)
    except (TypeError, ValueError):
        # Re-raise with the variable name so a bad setting is easy to find.
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None
    if minimum is not None and value < minimum:
        raise ValueError(f"{name} must be >= {minimum}, got {value}")
    if maximum is not None and value > maximum:
        raise ValueError(f"{name} must be <= {maximum}, got {value}")
    return value


# Configuration from environment variables
# A trailing slash is dropped so request URLs never contain "//api/...".
ORB_API_URL = os.getenv('ORB_API_URL', 'http://localhost:8000').rstrip('/')
ORB_ID = os.getenv('ORB_ID', 'y82dez394jdmjq5jd7mcj4td6kxd')  # Orb sensor ID
MQTT_BROKER = os.getenv('MQTT_BROKER', 'localhost')
MQTT_PORT = env_int('MQTT_PORT', '1883', minimum=1, maximum=65535)
MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX', 'orb')
# seconds; must be positive because it is passed straight to time.sleep()
POLL_INTERVAL = env_int('POLL_INTERVAL', '60', minimum=1)
MQTT_USERNAME = os.getenv('MQTT_USERNAME', '')
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD', '')

# Datasets to fetch from Orb API
DATASETS = [
    'scores_1m',
    'responsiveness_1s',
    'web_responsiveness_results',
    'speed_results',
    'reliability_1s',
]
|
|
|
class OrbMQTTBridge:
    """Poll the Orb Local API and republish its datasets to an MQTT broker.

    Settings come from the module-level configuration constants
    (``ORB_API_URL``, ``MQTT_BROKER``, ``POLL_INTERVAL`` and so on).
    Creating an instance immediately starts connecting to the broker;
    :meth:`run` then blocks in the polling loop until interrupted.
    """

    # Score field in a ``scores_1m`` entry -> suffix of the topic it is
    # published under (``<prefix>/score/<suffix>``).
    SCORE_TOPICS = {
        'orb_score': 'orb',
        'responsiveness_score': 'responsiveness',
        'reliability_score': 'reliability',
        'speed_score': 'speed',
    }

    # Seconds to wait for the initial broker connection before giving up.
    CONNECT_TIMEOUT = 30

    # Seconds to pause after an unexpected error before polling again.
    ERROR_BACKOFF = 10

    def __init__(self):
        """Initialise state and start connecting to the MQTT broker."""
        self.mqtt_client = None
        # Flipped by the on_connect / on_disconnect callbacks.
        self.connected = False
        self.setup_mqtt()

    def setup_mqtt(self):
        """Create the MQTT client, register callbacks and start connecting.

        Exits the process with status 1 if the connect call itself fails.
        """
        client_id = 'orb-mqtt-bridge'
        # paho-mqtt 2.x expects the callback API version as the first
        # constructor argument; the callbacks in this class use the version 1
        # signatures. paho-mqtt 1.x has no such enum and takes no such argument.
        api_versions = getattr(mqtt, 'CallbackAPIVersion', None)
        if api_versions is None:
            self.mqtt_client = mqtt.Client(client_id=client_id)
        else:
            self.mqtt_client = mqtt.Client(api_versions.VERSION1, client_id=client_id)

        # A username on its own is valid, so the password is optional.
        if MQTT_USERNAME:
            self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD or None)

        self.mqtt_client.on_connect = self.on_connect
        self.mqtt_client.on_disconnect = self.on_disconnect

        try:
            logger.info("Connecting to MQTT broker at %s:%s", MQTT_BROKER, MQTT_PORT)
            self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
            self.mqtt_client.loop_start()
        except Exception as e:
            # Startup boundary: nothing useful can happen without a broker.
            logger.error("Failed to connect to MQTT broker: %s", e)
            sys.exit(1)

    def on_connect(self, client, userdata, flags, rc):
        """Callback for when the client connects to the broker.

        Marks the bridge as connected only when ``rc`` is 0.
        """
        if rc == 0:
            self.connected = True
            logger.info("Connected to MQTT broker successfully")
        else:
            logger.error("Failed to connect to MQTT broker with code %s", rc)

    def on_disconnect(self, client, userdata, rc):
        """Callback for when the client disconnects from the broker.

        A non-zero ``rc`` is logged as an unexpected disconnect.
        """
        self.connected = False
        if rc != 0:
            logger.warning("Unexpected disconnect from MQTT broker (code %s)", rc)

    def fetch_orb_data(self, dataset):
        """Fetch one dataset from the Orb Local API.

        Args:
            dataset: Dataset name, e.g. ``'scores_1m'``.

        Returns:
            The decoded JSON body, or ``None`` if the request failed, the
            server answered with an error status, or the body was not
            valid JSON.
        """
        url = f"{ORB_API_URL}/api/v2/datasets/{dataset}.json"
        try:
            response = requests.get(url, params={'id': ORB_ID}, timeout=10)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decode errors, which older versions of
            # requests raise without wrapping them in RequestException.
            logger.warning("Failed to fetch %s: %s", dataset, e)
            return None

    def publish_to_mqtt(self, topic, payload, retain=False):
        """Publish a JSON-encoded payload to the MQTT broker.

        Args:
            topic: Full topic to publish to.
            payload: JSON-serialisable object.
            retain: Whether the broker should retain the message.

        Returns:
            ``True`` if the client accepted the message, ``False`` if the
            bridge is disconnected or the publish failed.
        """
        if not self.connected:
            logger.warning("Not connected to MQTT broker, skipping publish")
            return False

        try:
            result = self.mqtt_client.publish(topic, json.dumps(payload), qos=0, retain=retain)
        except Exception as e:
            # Also covers payloads that json.dumps cannot serialise.
            logger.error("Error publishing to MQTT: %s", e)
            return False

        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            logger.debug("Published to %s", topic)
            return True
        logger.error("Failed to publish to %s: %s", topic, result.rc)
        return False

    def process_scores(self, data):
        """Publish each score found in a ``scores_1m`` response.

        Entries are read from ``data['data']`` when ``data`` is a dict; a
        top-level list is treated as the list of entries itself. Anything
        else, and any entry that is not a dict, is skipped rather than
        raising, so one malformed record cannot abort the poll cycle.

        Args:
            data: Decoded JSON response for the ``scores_1m`` dataset.
        """
        entries = data.get('data') if isinstance(data, dict) else data
        if not isinstance(entries, list):
            return

        for entry in entries:
            if not isinstance(entry, dict):
                continue
            timestamp = entry.get('time', '')
            for field, suffix in self.SCORE_TOPICS.items():
                if field in entry:
                    self.publish_to_mqtt(
                        f"{MQTT_TOPIC_PREFIX}/score/{suffix}",
                        {'value': entry[field], 'timestamp': timestamp},
                    )

    def process_dataset(self, dataset, data):
        """Publish a full dataset (retained), plus per-score topics for scores.

        Args:
            dataset: Dataset name; ``'scores_1m'`` gets extra score topics.
            data: Decoded JSON response for that dataset.

        Returns:
            ``True`` if the full dataset message was accepted for publishing,
            ``False`` if ``data`` was empty or the publish failed.
        """
        if not data:
            return False

        published = self.publish_to_mqtt(
            f"{MQTT_TOPIC_PREFIX}/dataset/{dataset}",
            data,
            retain=True,
        )

        # Special handling for scores
        if dataset == 'scores_1m':
            self.process_scores(data)
        return published

    @staticmethod
    def _utc_timestamp():
        """Return the current UTC time as a naive ISO 8601 string."""
        # Same text as the deprecated datetime.utcnow().isoformat(), so the
        # published payload format is unchanged.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _publish_status(self, status):
        """Publish the retained bridge status message ('online'/'offline')."""
        return self.publish_to_mqtt(
            f"{MQTT_TOPIC_PREFIX}/bridge/status",
            {'status': status, 'timestamp': self._utc_timestamp()},
            retain=True,
        )

    def _poll_once(self):
        """Fetch and publish every dataset once, then record the update time."""
        logger.info("Polling Orb API...")

        for dataset in DATASETS:
            data = self.fetch_orb_data(dataset)
            # Only report success when the dataset really was published.
            if data and self.process_dataset(dataset, data):
                logger.info("Successfully published %s", dataset)

        # Publish last update timestamp
        self.publish_to_mqtt(
            f"{MQTT_TOPIC_PREFIX}/bridge/last_update",
            {'timestamp': self._utc_timestamp()},
            retain=True,
        )

    def run(self):
        """Run the polling loop until interrupted.

        Waits up to ``CONNECT_TIMEOUT`` seconds for the broker connection
        (exiting with status 1 if it never comes up), publishes an 'online'
        status, then polls every ``POLL_INTERVAL`` seconds. On the way out it
        publishes an 'offline' status and shuts the MQTT client down.
        """
        logger.info("Starting Orb MQTT Bridge")
        logger.info("Orb API URL: %s", ORB_API_URL)
        logger.info("MQTT Broker: %s:%s", MQTT_BROKER, MQTT_PORT)
        logger.info("Poll Interval: %s seconds", POLL_INTERVAL)

        # Wait for MQTT connection
        waited = 0
        while not self.connected and waited < self.CONNECT_TIMEOUT:
            logger.info("Waiting for MQTT connection...")
            time.sleep(1)
            waited += 1

        if not self.connected:
            logger.error(
                "Failed to connect to MQTT broker after %s seconds",
                self.CONNECT_TIMEOUT,
            )
            sys.exit(1)

        self._publish_status('online')

        # The KeyboardInterrupt handler wraps the whole loop so that Ctrl-C
        # during the error back-off sleep still shuts down cleanly.
        try:
            while True:
                try:
                    self._poll_once()
                    logger.info("Sleeping for %s seconds...", POLL_INTERVAL)
                    time.sleep(POLL_INTERVAL)
                except Exception as e:
                    logger.error("Error in main loop: %s", e, exc_info=True)
                    time.sleep(self.ERROR_BACKOFF)
        except KeyboardInterrupt:
            logger.info("Shutting down...")
        finally:
            self._publish_status('offline')
            self.mqtt_client.loop_stop()
            self.mqtt_client.disconnect()
|
|
|
if __name__ == '__main__':
    # Script entry point: build the bridge (which starts connecting to the
    # broker) and hand control to its polling loop.
    OrbMQTTBridge().run()