Last active
November 4, 2025 18:29
-
-
Save jasonnerothin/150829c21315d888360324c2d3b79928 to your computer and use it in GitHub Desktop.
Redis sidecar - message counter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Redis Channel Monitor - Monitors message throughput on Redis channels | |
| and reports metrics to OpenTelemetry Collector. | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import logging | |
| import redis | |
| from opentelemetry import metrics | |
| from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter | |
| from opentelemetry.sdk.metrics import MeterProvider | |
| from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader | |
| from opentelemetry.sdk.resources import Resource | |
# Configure logging. LOG_LEVEL is a level name such as DEBUG, INFO or WARNING
# (case-insensitive); anything that is not a numeric logging level falls back
# to INFO.
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
_numeric_log_level = getattr(logging, log_level, None)
if not isinstance(_numeric_log_level, int):
    # getattr() alone would also return non-level attributes of the logging
    # module (e.g. LOG_LEVEL=BASIC_FORMAT yields a str), which basicConfig
    # rejects with ValueError.
    _numeric_log_level = logging.INFO
logging.basicConfig(
    level=_numeric_log_level,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
    ],
)
# No flush override is needed: StreamHandler.emit() flushes its stream after
# every record, so log lines reach stdout immediately. Patching
# root.handlers[0] would also be unsafe, because basicConfig() is a no-op when
# the root logger already has handlers, and handlers[0] would then be one that
# this module did not create.
logger = logging.getLogger(__name__)
def main():
    """Monitor a Redis pub/sub channel pattern and export throughput metrics.

    Configuration is read from environment variables:

    * ``REDIS_HOST`` / ``REDIS_PORT`` -- Redis server (default localhost:6379).
    * ``OTEL_COLLECTOR_BASE_URL`` -- OTLP/HTTP collector base URL; metrics are
      sent to ``<base>/v1/metrics`` every 10 seconds.
    * ``MONITOR_CHANNEL`` -- pattern passed to PSUBSCRIBE
      (default ``drone_command``).
    * ``HEARTBEAT_INTERVAL_SECS`` -- minimum seconds between "still monitoring"
      log lines (default 60).
    * ``USER_ID`` -- recorded on the OTel resource as ``user.id``.

    Instruments:

    * ``redis.channel.messages.total`` -- counter incremented once per received
      ``pmessage``, labelled with the concrete channel name.
    * ``redis.channel.messages.rate`` -- observable gauge reporting messages per
      second since the previous collection, labelled with the pattern.

    Runs until SIGINT or SIGTERM. Calls ``sys.exit(1)`` if Redis is unreachable
    after 5 connection attempts, or if an unexpected error occurs while
    monitoring.
    """
    import signal  # local import: the module-level import block is left as-is

    # Configuration from environment variables
    redis_host = os.getenv('REDIS_HOST', 'localhost')
    redis_port = int(os.getenv('REDIS_PORT', '6379'))
    otel_endpoint = os.getenv('OTEL_COLLECTOR_BASE_URL', 'http://localhost:4318')
    channel_pattern = os.getenv('MONITOR_CHANNEL', 'drone_command')
    heartbeat_interval = int(os.getenv('HEARTBEAT_INTERVAL_SECS', '60'))
    user_id = os.getenv('USER_ID', 'default-user-id')

    logger.info("=" * 60)
    logger.info("Starting Redis Channel Monitor")
    logger.info("=" * 60)
    logger.info(f" Redis: {redis_host}:{redis_port}")
    logger.info(f" OTEL Endpoint: {otel_endpoint}")
    logger.info(f" Monitoring channel: {channel_pattern}")
    logger.info(f" User ID: {user_id}")
    logger.info(f" Log level: {log_level}")
    logger.info(f" Heartbeat interval: {heartbeat_interval}s")
    logger.info("=" * 60)

    # Setup OpenTelemetry
    resource = Resource(attributes={
        "service.name": "redis-channel-monitor",
        "service.version": "1.0.0",
        "user.id": user_id,
    })
    reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=f"{otel_endpoint}/v1/metrics"),
        export_interval_millis=10000,  # Export every 10 seconds
    )
    provider = MeterProvider(resource=resource, metric_readers=[reader])
    metrics.set_meter_provider(provider)

    # Create metrics
    meter = metrics.get_meter("redis.monitor", version="1.0.0")
    message_counter = meter.create_counter(
        name="redis.channel.messages.total",
        description="Total number of messages published to Redis channel",
        unit="messages",
    )

    # Message tracking for rate calculation. 'value' is incremented by the
    # pub/sub loop below; 'last_count'/'last_time' are owned by the gauge
    # callback, which the SDK may invoke from the periodic reader's background
    # thread.
    message_count = {'value': 0, 'last_count': 0, 'last_time': time.time()}

    def get_message_rate(options):
        """Yield the messages/second observed since the previous collection."""
        current_time = time.time()
        time_delta = current_time - message_count['last_time']
        if time_delta > 0:
            # Read the shared counter once so a message counted between two
            # reads is not dropped from the rate window.
            current_value = message_count['value']
            count_delta = current_value - message_count['last_count']
            message_count['last_count'] = current_value
            message_count['last_time'] = current_time
            yield metrics.Observation(count_delta / time_delta, {"channel": channel_pattern})

    # Create observable gauge with callback - compatible with OTEL 1.17.0
    meter.create_observable_gauge(
        name="redis.channel.messages.rate",
        description="Message rate per second",
        unit="messages/s",
        callbacks=[get_message_rate],
    )

    # Connect to Redis with retry logic
    max_retries = 5
    retry_delay = 5
    for attempt in range(1, max_retries + 1):
        try:
            r = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
            r.ping()
            break
        except (redis.ConnectionError, redis.TimeoutError) as e:
            if attempt == max_retries:
                logger.error(f"Could not connect to Redis after {max_retries} attempts: {e}")
                sys.exit(1)
            logger.warning(f"Failed to connect to Redis (attempt {attempt}/{max_retries}): {e}")
            logger.info(f"Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
    logger.info(f"Connected to Redis at {redis_host}:{redis_port}")

    # Container runtimes stop processes with SIGTERM. Without a handler, Python
    # terminates immediately, skipping the finally block below and any atexit
    # hooks (such as a final metrics export). Setting a flag lets the poll loop
    # exit normally instead.
    stop = {'requested': False}

    def request_stop(signum, frame):
        stop['requested'] = True

    signal.signal(signal.SIGTERM, request_stop)

    pubsub = r.pubsub()
    message_types_seen = set()
    last_heartbeat = time.time()
    try:
        # Subscribe to channel
        pubsub.psubscribe(channel_pattern)
        logger.info(f"Subscribed to channel pattern: {channel_pattern}")
        logger.info("Monitoring messages... (waiting for data)")

        while not stop['requested']:
            # Poll with a timeout rather than blocking in listen(). This way the
            # heartbeat and the stop flag are checked even while the channel is
            # idle.
            message = pubsub.get_message(timeout=1.0)

            # Periodic heartbeat
            current_time = time.time()
            if current_time - last_heartbeat > heartbeat_interval:
                logger.info(f"Heartbeat: Still monitoring. Total messages: {message_count['value']}")
                last_heartbeat = current_time

            if message is None:
                continue

            msg_type = message['type']
            # Track what message types we see for debugging
            if msg_type not in message_types_seen:
                message_types_seen.add(msg_type)
                logger.info(f"Received message type: {msg_type}")
            logger.debug("Raw message: %s", message)

            if msg_type == 'pmessage':
                channel = message['channel']
                message_count['value'] += 1
                total = message_count['value']
                message_counter.add(1, {"channel": channel})
                # Log first message immediately
                if total == 1:
                    logger.info(f"First message received on {channel}!")
                # Log every 10 messages at info level
                if total % 10 == 0:
                    logger.info(f"Processed {total} messages on {channel}")
                # Also log every message at debug level
                logger.debug("Message #%d on %s: %s", total, channel, message.get('data', '')[:100])
            elif msg_type == 'psubscribe':
                # For subscription confirmations redis-py puts the subscribed
                # pattern under 'channel'; 'pattern' is None.
                logger.info(f"✓ Successfully subscribed to pattern: {message['channel']}")

        logger.info("Received SIGTERM, shutting down...")
    except KeyboardInterrupt:
        logger.info("Shutting down...")
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        sys.exit(1)
    finally:
        pubsub.close()
        logger.info(f"Monitor stopped. Total messages processed: {message_count['value']}")
if __name__ == "__main__":
    # Start the monitor only when executed as a script; importing this module
    # (e.g. from tests) does not start it.
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment