Skip to content

Instantly share code, notes, and snippets.

@jasonnerothin
Last active November 4, 2025 18:29
Show Gist options
  • Select an option

  • Save jasonnerothin/150829c21315d888360324c2d3b79928 to your computer and use it in GitHub Desktop.

Select an option

Save jasonnerothin/150829c21315d888360324c2d3b79928 to your computer and use it in GitHub Desktop.
Redis side car - message counter
#!/usr/bin/env python3
"""
Redis Channel Monitor - Monitors message throughput on Redis channels
and reports metrics to OpenTelemetry Collector.
"""
import os
import sys
import time
import logging
import redis
from opentelemetry import metrics
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
# Configure logging
def resolve_log_level(name, default=logging.INFO):
    """Translate a level name such as ``"DEBUG"`` into a numeric logging level.

    Args:
        name: Upper-case level name, typically taken from ``LOG_LEVEL``.
        default: Level returned when *name* is not a usable level.

    Returns:
        The integer level from the ``logging`` module, or *default* when
        *name* is unknown or refers to a non-integer attribute (for example
        ``BASIC_FORMAT``), which would otherwise make ``basicConfig`` raise.
    """
    level = getattr(logging, name, default)
    return level if isinstance(level, int) else default


log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(
    level=resolve_log_level(log_level),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    # StreamHandler flushes its stream after every emitted record, so log
    # lines reach stdout immediately without patching the handler's flush().
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)
def _connect_to_redis(host, port, max_retries=5, retry_delay=5):
    """Return a Redis client that has answered PING, retrying on connection errors.

    Args:
        host: Redis host name.
        port: Redis TCP port.
        max_retries: Total number of connection attempts.
        retry_delay: Seconds to sleep between attempts.

    Returns:
        A ``redis.Redis`` client created with ``decode_responses=True``.

    Exits the process with status 1 when every attempt fails.
    """
    for attempt in range(1, max_retries + 1):
        try:
            client = redis.Redis(host=host, port=port, decode_responses=True)
            client.ping()
        except redis.ConnectionError as e:
            if attempt >= max_retries:
                break
            logger.warning(f"Failed to connect to Redis (attempt {attempt}/{max_retries}): {e}")
            logger.info(f"Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            logger.info(f"Connected to Redis at {host}:{port}")
            return client

    logger.error(f"Could not connect to Redis after {max_retries} attempts")
    sys.exit(1)


def main():
    """Count messages on a Redis pub/sub pattern and export metrics via OTLP.

    Configuration is read from environment variables:

    * ``REDIS_HOST`` / ``REDIS_PORT`` - Redis server (default ``localhost:6379``)
    * ``OTEL_COLLECTOR_BASE_URL`` - collector base URL; ``/v1/metrics`` is appended
    * ``MONITOR_CHANNEL`` - pattern passed to ``PSUBSCRIBE``
    * ``HEARTBEAT_INTERVAL_SECS`` - seconds between heartbeat log lines
    * ``USER_ID`` - value of the ``user.id`` resource attribute

    Runs until interrupted with Ctrl-C. Exits with status 1 when Redis cannot
    be reached or when an unexpected error occurs while receiving messages.
    """
    # Configuration from environment variables
    redis_host = os.getenv('REDIS_HOST', 'localhost')
    redis_port = int(os.getenv('REDIS_PORT', '6379'))
    otel_endpoint = os.getenv('OTEL_COLLECTOR_BASE_URL', 'http://localhost:4318')
    channel_pattern = os.getenv('MONITOR_CHANNEL', 'drone_command')
    heartbeat_interval = int(os.getenv('HEARTBEAT_INTERVAL_SECS', '60'))
    user_id = os.getenv('USER_ID', 'default-user-id')

    logger.info("=" * 60)
    logger.info("Starting Redis Channel Monitor")
    logger.info("=" * 60)
    logger.info(f" Redis: {redis_host}:{redis_port}")
    logger.info(f" OTEL Endpoint: {otel_endpoint}")
    logger.info(f" Monitoring channel: {channel_pattern}")
    logger.info(f" User ID: {user_id}")
    logger.info(f" Log level: {log_level}")
    logger.info(f" Heartbeat interval: {heartbeat_interval}s")
    logger.info("=" * 60)

    # Setup OpenTelemetry
    resource = Resource(attributes={
        "service.name": "redis-channel-monitor",
        "service.version": "1.0.0",
        "user.id": user_id,
    })
    reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=f"{otel_endpoint}/v1/metrics"),
        export_interval_millis=10000  # Export every 10 seconds
    )
    provider = MeterProvider(resource=resource, metric_readers=[reader])
    metrics.set_meter_provider(provider)

    # Create metrics
    meter = metrics.get_meter("redis.monitor", version="1.0.0")
    message_counter = meter.create_counter(
        name="redis.channel.messages.total",
        description="Total number of messages published to Redis channel",
        unit="messages"
    )

    # Message tracking for rate calculation. A dict is used so the nested
    # callback below can update the values in place.
    message_count = {'value': 0, 'last_count': 0, 'last_time': time.time()}

    def get_message_rate(options):
        """Yield the messages-per-second rate since the previous invocation."""
        current_time = time.time()
        time_delta = current_time - message_count['last_time']
        if time_delta > 0:
            count_delta = message_count['value'] - message_count['last_count']
            rate = count_delta / time_delta
            # Move the baseline forward so the next call measures a fresh window.
            message_count['last_count'] = message_count['value']
            message_count['last_time'] = current_time
            yield metrics.Observation(rate, {"channel": channel_pattern})

    # Create observable gauge with callback - compatible with OTEL 1.17.0
    message_rate = meter.create_observable_gauge(
        name="redis.channel.messages.rate",
        description="Message rate per second",
        unit="messages/s",
        callbacks=[get_message_rate]
    )

    # Connect to Redis with retry logic
    r = _connect_to_redis(redis_host, redis_port)

    # Subscribe to channel
    pubsub = r.pubsub()
    pubsub.psubscribe(channel_pattern)
    logger.info(f"Subscribed to channel pattern: {channel_pattern}")
    logger.info("Monitoring messages... (waiting for data)")

    # Listen for messages
    message_types_seen = set()
    last_heartbeat = time.time()
    # Poll with a timeout instead of blocking in listen(): a blocking read
    # would only let the heartbeat run when a message arrives, so an idle
    # channel would never produce a heartbeat.
    poll_timeout = 1.0
    try:
        while True:
            message = pubsub.get_message(timeout=poll_timeout)

            # Periodic heartbeat (evaluated on every wake-up, message or not)
            current_time = time.time()
            if current_time - last_heartbeat > heartbeat_interval:
                logger.info(f"Heartbeat: Still monitoring. Total messages: {message_count['value']}")
                last_heartbeat = current_time

            if message is None:
                # Timed out without receiving anything.
                continue

            msg_type = message['type']

            # Track what message types we see for debugging
            if msg_type not in message_types_seen:
                message_types_seen.add(msg_type)
                logger.info(f"Received message type: {msg_type}")
                logger.debug(f"Raw message: {message}")

            if msg_type == 'pmessage':
                channel = message['channel']
                message_count['value'] += 1
                message_counter.add(1, {"channel": channel})

                # Log first message immediately
                if message_count['value'] == 1:
                    logger.info(f"First message received on {channel}!")

                # Log every 10 messages at info level
                if message_count['value'] % 10 == 0:
                    logger.info(f"Processed {message_count['value']} messages on {channel}")

                # Also log every message at debug level
                logger.debug(f"Message #{message_count['value']} on {channel}: {message.get('data', '')[:100]}")
            elif msg_type == 'psubscribe':
                logger.info(f"✓ Successfully subscribed to pattern: {message['pattern']}")
    except KeyboardInterrupt:
        logger.info("Shutting down...")
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        sys.exit(1)
    finally:
        pubsub.close()
        logger.info(f"Monitor stopped. Total messages processed: {message_count['value']}")


# Run the monitor only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment