@jeromelefeuvre
Last active February 10, 2026 01:07
orb-mqtt-bridge

Orb MQTT Bridge

This bridge polls the Orb Local API and publishes network metrics to an MQTT broker.

Prerequisites

  1. Enable Orb Local API: You need to configure the Local API through the Orb Cloud interface:

    • Go to your Orb Cloud dashboard
    • Click the ellipsis ("...") next to your linked Orb
    • Click "Edit"
    • Add the following configuration:
    {
      "datasets.api": [
        "identifiable=true",
        "port=8000",
        "scores_1m",
        "responsiveness_1s",
        "web_responsiveness_results",
        "speed_results",
        "reliability_1s"
      ]
    }
  2. MQTT Broker: Ensure Mosquitto (or another MQTT broker) is running

Configuration

Environment variables:

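  • ORB_API_URL - URL of Orb Local API (default: http://localhost:8000)
  • ORB_ID - ID of the Orb sensor, sent as the id query parameter on every API request (default: the value hardcoded in orb_mqtt_bridge.py)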
  • MQTT_BROKER - MQTT broker address (default: localhost)
  • MQTT_PORT - MQTT broker port (default: 1883)
  • MQTT_TOPIC_PREFIX - MQTT topic prefix (default: orb)
  • POLL_INTERVAL - How often to poll Orb API in seconds (default: 60)
  • MQTT_USERNAME - MQTT username (optional)
  • MQTT_PASSWORD - MQTT password (optional)
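
As a rough illustration of how these variables and defaults fit together, here is a minimal sketch that collects them into one object. The BridgeConfig class and load_config function are hypothetical names used only for this example; the bridge script itself reads each variable with os.getenv at module level.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class BridgeConfig:
    """Hypothetical container for the settings listed above."""
    orb_api_url: str
    mqtt_broker: str
    mqtt_port: int
    mqtt_topic_prefix: str
    poll_interval: int
    mqtt_username: str
    mqtt_password: str


def load_config(env=None):
    """Build a BridgeConfig from a mapping, falling back to the documented defaults."""
    env = os.environ if env is None else env
    return BridgeConfig(
        orb_api_url=env.get("ORB_API_URL", "http://localhost:8000"),
        mqtt_broker=env.get("MQTT_BROKER", "localhost"),
        mqtt_port=int(env.get("MQTT_PORT", "1883")),
        mqtt_topic_prefix=env.get("MQTT_TOPIC_PREFIX", "orb"),
        poll_interval=int(env.get("POLL_INTERVAL", "60")),
        mqtt_username=env.get("MQTT_USERNAME", ""),
        mqtt_password=env.get("MQTT_PASSWORD", ""),
    )


# Passing a plain dict instead of os.environ makes the defaults easy to inspect.
defaults = load_config({})
custom = load_config({"MQTT_PORT": "8883", "POLL_INTERVAL": "30"})
```

Numeric values arrive as strings in the environment, so the port and poll interval are converted with int(), the same way the bridge script does.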

MQTT Topics

The bridge publishes to the following topics:

Scores (published on each poll, not retained)

  • orb/score/orb - Overall Orb Score
  • orb/score/responsiveness - Responsiveness Score
  • orb/score/reliability - Reliability Score
  • orb/score/speed - Speed Score

Datasets (Full data, retained)

  • orb/dataset/scores_1m - 1-minute score averages
  • orb/dataset/responsiveness_1s - Responsiveness measurements
  • orb/dataset/web_responsiveness_results - Web responsiveness details
  • orb/dataset/speed_results - Speed test results
  • orb/dataset/reliability_1s - Reliability measurements

Bridge Status

  • orb/bridge/status - Bridge online/offline status
  • orb/bridge/last_update - Timestamp of last successful poll
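
A consumer of these topics needs to split the topic into its parts and decode the JSON payload. The sketch below shows one way to do that with the standard library only. The function name parse_orb_message is hypothetical, and the sample value and timestamp are made up for illustration; the value/timestamp payload shape matches what the bridge script publishes on the score topics.

```python
import json


def parse_orb_message(topic, payload, prefix="orb"):
    """Return (category, name, data) for a topic such as orb/score/speed."""
    head = prefix + "/"
    if not topic.startswith(head):
        raise ValueError(f"topic {topic!r} does not start with {head!r}")
    parts = topic[len(head):].split("/")
    if len(parts) != 2:
        raise ValueError(f"expected <prefix>/<category>/<name>, got {topic!r}")
    category, name = parts
    # MQTT clients usually deliver payloads as bytes.
    if isinstance(payload, bytes):
        payload = payload.decode("utf-8")
    return category, name, json.loads(payload)


# Illustrative message; the numbers are not real measurements.
category, name, data = parse_orb_message(
    "orb/score/speed",
    b'{"value": 87.5, "timestamp": "2026-01-01T00:00:00"}',
)
```

Matching on the prefix rather than on a fixed number of segments keeps the function usable when MQTT_TOPIC_PREFIX itself contains a slash, for example home/orb.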

Usage

Docker Compose (Recommended)

The bridge is already configured in your docker-compose.yml. Just start it:

docker compose up -d orb-mqtt-bridge

Standalone Docker

cd orb-mqtt-bridge
docker build -t orb-mqtt-bridge .
docker run -d \
  --name orb-mqtt-bridge \
  --network host \
  -e ORB_API_URL=http://localhost:8000 \
  -e MQTT_BROKER=localhost \
  -e MQTT_PORT=1883 \
  orb-mqtt-bridge

Local Python

cd orb-mqtt-bridge
pip install -r requirements.txt
python orb_mqtt_bridge.py

Monitoring

View logs:

docker logs -f orb-mqtt-bridge

Subscribe to all Orb topics:

docker exec mosquitto mosquitto_sub -h localhost -t "orb/#" -v

Subscribe to just scores:

docker exec mosquitto mosquitto_sub -h localhost -t "orb/score/#" -v

Troubleshooting

Bridge shows "Failed to fetch" errors

  • Ensure Orb Local API is enabled in Orb Cloud
  • Check that Orb is accessible at the configured URL
  • Verify the Orb container is running with network_mode: host

No data in MQTT

  • Check bridge logs: docker logs orb-mqtt-bridge
  • Verify MQTT broker is running: docker ps | grep mosquitto
  • Test MQTT connection manually

API returns 404

  • The Local API needs to be enabled through Orb Cloud configuration
  • Wait a few minutes after configuration for it to take effect
  • Verify the port (default 8000) is not blocked
FROM python:3.11-slim
WORKDIR /app
# Copy requirements first for better caching
COPY requirements.txt .
# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy the bridge script
COPY orb_mqtt_bridge.py .
# Make the script executable
RUN chmod +x orb_mqtt_bridge.py
# Run the bridge
CMD ["python", "-u", "orb_mqtt_bridge.py"]

How to Enable Orb Local API

The MQTT bridge is running but waiting for the Orb Local API to be enabled. Follow these steps:

Step 1: Access Orb Cloud Dashboard

  1. Go to https://orb.net
  2. Sign in with your Orb account
  3. Navigate to your dashboard where your Orb sensor is listed

Step 2: Configure Local API

  1. Find your Orb sensor in the list (should show as "salontv2")
  2. Click the ellipsis (...) or three-dot menu next to your Orb
  3. Click "Edit" or "Configure"
  4. Look for a configuration or JSON editor section

Step 3: Add Local API Configuration

Paste the following JSON configuration:

{
  "datasets.api": [
    "identifiable=true",
    "port=8000",
    "scores_1m",
    "responsiveness_1s",
    "web_responsiveness_results",
    "speed_results",
    "reliability_1s"
  ]
}
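
If you enable a different set of datasets or a different port, it can help to generate this JSON rather than edit it by hand. The following is a small sketch that assumes the format is exactly what is shown above: a list starting with identifiable=true and port=<n>, followed by dataset names. The helper name build_local_api_config is hypothetical.

```python
import json

# The same datasets the bridge script polls.
DATASETS = [
    "scores_1m",
    "responsiveness_1s",
    "web_responsiveness_results",
    "speed_results",
    "reliability_1s",
]


def build_local_api_config(datasets, port=8000):
    """Return the datasets.api configuration as a dict ready for json.dumps."""
    return {"datasets.api": ["identifiable=true", f"port={port}", *datasets]}


config = build_local_api_config(DATASETS)
print(json.dumps(config, indent=2))
```

If you change the port here, set ORB_API_URL to match so the bridge polls the right address.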

Step 4: Save and Wait

  1. Save the configuration
  2. Wait 2-5 minutes for the Orb sensor to pick up the new configuration
  3. The Orb sensor will automatically start serving the Local API on port 8000

Step 5: Verify the API is Working

After a few minutes, check if the API is responding:

# Test from the host
curl http://localhost:8000/api/v2/datasets/scores_1m.json

# Or check the bridge logs
docker logs orb-mqtt-bridge --tail 20

You should see JSON data instead of connection errors.
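
The same check can be scripted for every dataset. This is a minimal sketch using only the Python standard library; the function names dataset_url and check_dataset are hypothetical. It builds the same /api/v2/datasets/<name>.json URL as the curl command above and does not send the id query parameter that the bridge script adds.

```python
import json
import urllib.request


def dataset_url(base_url, dataset):
    """Build the Local API URL for one dataset."""
    return f"{base_url.rstrip('/')}/api/v2/datasets/{dataset}.json"


def check_dataset(base_url, dataset, timeout=5):
    """Return (ok, detail): ok is True when the endpoint answers with valid JSON."""
    url = dataset_url(base_url, dataset)
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            json.load(resp)
        return True, url
    except (OSError, ValueError) as exc:
        # OSError covers connection, HTTP and timeout errors; ValueError covers bad JSON.
        return False, f"{url}: {exc}"


if __name__ == "__main__":
    for name in ("scores_1m", "speed_results"):
        ok, detail = check_dataset("http://localhost:8000", name)
        print("OK  " if ok else "FAIL", detail)
```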

Step 6: Monitor MQTT Data

Once the API is enabled, subscribe to all Orb MQTT topics to see the data:

# Subscribe to all Orb topics
docker exec mosquitto mosquitto_sub -h localhost -t "orb/#" -v

# Or just the scores
docker exec mosquitto mosquitto_sub -h localhost -t "orb/score/#" -v

Troubleshooting

Configuration not applying

  • Make sure you're editing the correct Orb sensor
  • Try restarting the Orb container: docker compose restart orb
  • Check Orb logs: docker logs orb --tail 50

API still not responding after 10 minutes

  • Verify the Orb sensor is online in the Orb Cloud dashboard
  • Check if port 8000 is available: netstat -tlnp | grep 8000
  • Restart both containers:
    docker compose restart orb orb-mqtt-bridge

Still having issues?

What Happens Next

Once the Orb Local API is enabled:

  1. The bridge will start fetching data every POLL_INTERVAL seconds (60 by default)
  2. Network metrics will be published to MQTT topics under orb/
  3. You can integrate this data with:
    • Home Assistant
    • Node-RED
    • Grafana
    • Any MQTT-compatible system

Current Bridge Status

Check the current status anytime:

# View bridge logs
docker logs -f orb-mqtt-bridge

# Check MQTT status
docker exec mosquitto mosquitto_sub -h localhost -t "orb/bridge/#" -v
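
Because orb/bridge/status is only set to offline on a clean shutdown, the last_update timestamp is the more reliable health signal. The sketch below is one possible staleness check, assuming the timestamp is the naive UTC ISO string the bridge publishes. The function name is_bridge_stale and the grace factor of 2 are assumptions for this example.

```python
from datetime import datetime, timedelta


def is_bridge_stale(last_update_iso, now, poll_interval=60, grace=2.0):
    """Return True when the last successful poll is older than grace * poll_interval seconds."""
    last_update = datetime.fromisoformat(last_update_iso)
    return now - last_update > timedelta(seconds=poll_interval * grace)


# "now" is passed in explicitly and must be naive UTC, like the published timestamp.
now = datetime(2026, 1, 1, 0, 5, 0)
stale = is_bridge_stale("2026-01-01T00:00:00", now)
fresh = is_bridge_stale("2026-01-01T00:04:30", now)
```

With the defaults, a bridge whose last update is more than 120 seconds old is reported as stale.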
#!/usr/bin/env python3
"""
Orb MQTT Bridge
Polls Orb Local API and publishes metrics to MQTT broker
"""
import os
import sys
import json
import time
import logging
import requests
from datetime import datetime
import paho.mqtt.client as mqtt

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('orb-mqtt-bridge')

# Configuration from environment variables
ORB_API_URL = os.getenv('ORB_API_URL', 'http://localhost:8000')
ORB_ID = os.getenv('ORB_ID', 'y82dez394jdmjq5jd7mcj4td6kxd')  # Orb sensor ID
MQTT_BROKER = os.getenv('MQTT_BROKER', 'localhost')
MQTT_PORT = int(os.getenv('MQTT_PORT', '1883'))
MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX', 'orb')
POLL_INTERVAL = int(os.getenv('POLL_INTERVAL', '60'))  # seconds
MQTT_USERNAME = os.getenv('MQTT_USERNAME', '')
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD', '')

# Datasets to fetch from Orb API
DATASETS = [
    'scores_1m',
    'responsiveness_1s',
    'web_responsiveness_results',
    'speed_results',
    'reliability_1s'
]


class OrbMQTTBridge:
    def __init__(self):
        self.mqtt_client = None
        self.connected = False
        self.setup_mqtt()

    def setup_mqtt(self):
        """Initialize MQTT client"""
        self.mqtt_client = mqtt.Client(client_id='orb-mqtt-bridge')
        if MQTT_USERNAME and MQTT_PASSWORD:
            self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
        self.mqtt_client.on_connect = self.on_connect
        self.mqtt_client.on_disconnect = self.on_disconnect
        try:
            logger.info(f"Connecting to MQTT broker at {MQTT_BROKER}:{MQTT_PORT}")
            self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
            self.mqtt_client.loop_start()
        except Exception as e:
            logger.error(f"Failed to connect to MQTT broker: {e}")
            sys.exit(1)

    def on_connect(self, client, userdata, flags, rc):
        """Callback for when the client connects to the broker"""
        if rc == 0:
            self.connected = True
            logger.info("Connected to MQTT broker successfully")
        else:
            logger.error(f"Failed to connect to MQTT broker with code {rc}")

    def on_disconnect(self, client, userdata, rc):
        """Callback for when the client disconnects from the broker"""
        self.connected = False
        if rc != 0:
            logger.warning(f"Unexpected disconnect from MQTT broker (code {rc})")

    def fetch_orb_data(self, dataset):
        """Fetch data from Orb Local API"""
        url = f"{ORB_API_URL}/api/v2/datasets/{dataset}.json"
        params = {'id': ORB_ID}
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to fetch {dataset}: {e}")
            return None

    def publish_to_mqtt(self, topic, payload, retain=False):
        """Publish data to MQTT broker"""
        if not self.connected:
            logger.warning("Not connected to MQTT broker, skipping publish")
            return False
        try:
            result = self.mqtt_client.publish(topic, json.dumps(payload), qos=0, retain=retain)
            if result.rc == mqtt.MQTT_ERR_SUCCESS:
                logger.debug(f"Published to {topic}")
                return True
            else:
                logger.error(f"Failed to publish to {topic}: {result.rc}")
                return False
        except Exception as e:
            logger.error(f"Error publishing to MQTT: {e}")
            return False

    def process_scores(self, data):
        """Process and publish Orb scores"""
        if not data or 'data' not in data:
            return
        # Publish overall scores
        for entry in data['data']:
            timestamp = entry.get('time', '')
            # Overall Orb Score
            if 'orb_score' in entry:
                self.publish_to_mqtt(
                    f"{MQTT_TOPIC_PREFIX}/score/orb",
                    {
                        'value': entry['orb_score'],
                        'timestamp': timestamp
                    }
                )
            # Responsiveness Score
            if 'responsiveness_score' in entry:
                self.publish_to_mqtt(
                    f"{MQTT_TOPIC_PREFIX}/score/responsiveness",
                    {
                        'value': entry['responsiveness_score'],
                        'timestamp': timestamp
                    }
                )
            # Reliability Score
            if 'reliability_score' in entry:
                self.publish_to_mqtt(
                    f"{MQTT_TOPIC_PREFIX}/score/reliability",
                    {
                        'value': entry['reliability_score'],
                        'timestamp': timestamp
                    }
                )
            # Speed Score
            if 'speed_score' in entry:
                self.publish_to_mqtt(
                    f"{MQTT_TOPIC_PREFIX}/score/speed",
                    {
                        'value': entry['speed_score'],
                        'timestamp': timestamp
                    }
                )

    def process_dataset(self, dataset, data):
        """Process and publish dataset"""
        if not data:
            return
        # Publish full dataset
        self.publish_to_mqtt(
            f"{MQTT_TOPIC_PREFIX}/dataset/{dataset}",
            data,
            retain=True
        )
        # Special handling for scores
        if dataset == 'scores_1m':
            self.process_scores(data)

    def run(self):
        """Main loop"""
        logger.info("Starting Orb MQTT Bridge")
        logger.info(f"Orb API URL: {ORB_API_URL}")
        logger.info(f"MQTT Broker: {MQTT_BROKER}:{MQTT_PORT}")
        logger.info(f"Poll Interval: {POLL_INTERVAL} seconds")
        # Wait for MQTT connection
        wait_time = 0
        while not self.connected and wait_time < 30:
            logger.info("Waiting for MQTT connection...")
            time.sleep(1)
            wait_time += 1
        if not self.connected:
            logger.error("Failed to connect to MQTT broker after 30 seconds")
            sys.exit(1)
        # Publish status
        self.publish_to_mqtt(
            f"{MQTT_TOPIC_PREFIX}/bridge/status",
            {
                'status': 'online',
                'timestamp': datetime.utcnow().isoformat()
            },
            retain=True
        )
        while True:
            try:
                logger.info("Polling Orb API...")
                for dataset in DATASETS:
                    data = self.fetch_orb_data(dataset)
                    if data:
                        self.process_dataset(dataset, data)
                        logger.info(f"Successfully published {dataset}")
                # Publish last update timestamp
                self.publish_to_mqtt(
                    f"{MQTT_TOPIC_PREFIX}/bridge/last_update",
                    {
                        'timestamp': datetime.utcnow().isoformat()
                    },
                    retain=True
                )
                logger.info(f"Sleeping for {POLL_INTERVAL} seconds...")
                time.sleep(POLL_INTERVAL)
            except KeyboardInterrupt:
                logger.info("Shutting down...")
                self.publish_to_mqtt(
                    f"{MQTT_TOPIC_PREFIX}/bridge/status",
                    {
                        'status': 'offline',
                        'timestamp': datetime.utcnow().isoformat()
                    },
                    retain=True
                )
                self.mqtt_client.loop_stop()
                self.mqtt_client.disconnect()
                break
            except Exception as e:
                logger.error(f"Error in main loop: {e}", exc_info=True)
                time.sleep(10)


if __name__ == '__main__':
    bridge = OrbMQTTBridge()
    bridge.run()
paho-mqtt==1.6.1
requests==2.31.0