@psenger
Last active December 26, 2025 09:27
[ADE] Server-Sent Events Implementation Strategy #adr #server-side-events #sse #realtime #eda

Architecture Design Registry - Server-Sent Events Implementation Strategy

Project Information

  • Project Description: Collaborative platform that enables organisations to manage projects, contributions, solutions, and voting mechanisms with AI-driven analysis
  • Date Created: September 2025
  • Last Updated: September 2025


1. Introduction

1.1 Purpose

This architecture decision record (ADR) analyses Server-Sent Events (SSE) implementation strategies for delivering project event notifications in a collaborative platform that runs on a distributed NodeJS environment without session affinity.

1.2 Scope

This ADR covers the implementation of Server-Sent Events (SSE) infrastructure for a distributed NodeJS cluster without session affinity. The architecture provides real-time event streaming: external systems publish events to specific channels, and connected users receive notifications. This is purely an SSE implementation; the actual subscription management and alert logic are handled by separate services/projects using proxy patterns around repositories. However, this ADR cannot be defined in a vacuum, so it includes demonstration code and concepts that show how it is intended to be used.

2. Architectural Overview

2.1 High-Level Architecture

This document analyses Server-Sent Events (SSE) implementation strategies for a distributed NodeJS environment without session affinity.

What is session affinity? Session Affinity (also called Sticky Sessions) forces all requests from a client to be routed to the same server. This is typically achieved through load balancer configuration that uses cookies or IP addresses to maintain client-server affinity.

Why Avoid Session Affinity?

  • Reduced scalability: Cannot freely distribute load across servers
  • Complex failover: If a server fails, all its "stuck" clients lose their sessions
  • Inefficient resource usage: Some servers may be overloaded while others are idle
  • Deployment challenges: Rolling updates become complex when clients are pinned to servers
  • Cloud-native incompatibility: Contradicts stateless, horizontally-scalable architecture principles

The architecture must support event-driven workflows in which users receive various types of events (such as solution additions, project changes, messages, and system state changes). Although the system was originally intended for project state changes, users can specify the alerts they want. External workers publish events to a durable message bus (Kafka, Azure Service Bus, or AWS SQS); a notification worker processes these events and delivers targeted notifications via SSE, using Redis for coordination. This matters because external resources (such as workers) may need to signal to the client that a server event has occurred.

Key Finding: Retain the durable message bus for event publishing from external workers, and use Redis solely for SSE coordination (pub/sub and subscription caching) across multiple NodeJS servers without session affinity. A dedicated service such as Mercure could provide better scalability if needed, but this release focuses on Redis and coordination within the NodeJS API servers.


3. Design Details

3.1 The Challenge

Without session affinity, a client's SSE connection goes to a random server, but events might originate from any server in the cluster. We need a way to route messages to the correct server. Additionally, we face security challenges with token lifecycle management.

sequenceDiagram
    participant Client
    participant LoadBalancer
    participant Server1
    participant Server2
    participant Server3
    participant TokenSystem

    Client->>LoadBalancer: SSE Connection Request
    LoadBalancer->>Server1: Randomly Assigns Connection
    Note over Server1: Client connected, but events may originate elsewhere
    Server2->>LoadBalancer: Event Generated
    Server3->>LoadBalancer: Event Generated
    Note over LoadBalancer: Challenge: Route events to correct server
    LoadBalancer-->>Server1: Needs routing solution
    Client->>TokenSystem: Request Token
    TokenSystem-->>Client: Issues Token
    Note over TokenSystem: Security Challenge: Token Lifecycle Management

3.2 The Solution: Cache Service with Pub/Sub Coordination

sequenceDiagram
    participant C1 as Client Browser Tab 1
    participant C2 as Client Browser Tab 2
    participant M as Mobile Device
    participant N as Nginx (Load Balancer)
    participant S1 as API Server 1
    participant S2 as API Server 2
    participant S3 as API Server 3
    participant PS as PubSub Service
    participant CS as Cache Service
    Note over C1: Multiple Connection Support
    C1 ->> N: GET /sse/connect (tab 1)
    N ->> S2: Forward request (random)
    S2 ->> S2: Create connection ID: conn-123
    S2 ->> CS: Store connection metadata
    S2 ->> PS: Subscribe to user:userId (if first)
    S2 -->> C1: event: connected\ndata: {"connectionId": "conn-123"}
    
    C2 ->> N: GET /sse/connect (tab 2)
    N ->> S2: Forward request (may hit same server)
    S2 ->> S2: Create connection ID: conn-456
    S2 ->> CS: Add to existing user connections
    Note over S2: User now has 2 connections on S2
    S2 -->> C2: event: connected\ndata: {"connectionId": "conn-456"}
    
    M ->> N: GET /sse/connect (mobile)
    N ->> S1: Forward request (different server)
    S1 ->> S1: Create connection ID: conn-789
    S1 ->> CS: Store connection metadata
    S1 ->> PS: Subscribe to user:userId
    S1 -->> M: event: connected\ndata: {"connectionId": "conn-789"}
    Note over C1: Event Publishing Flow
    C1 ->> N: POST /api/send-message
    N ->> S3: Forward request (any server)
    S3 ->> S3: Process message/event
    S3 ->> PS: Publish to user:userId channel
    
    Note over PS, S3: Message Distribution to All Connections
    PS -->> S1: Message via subscription
    S1 ->> S1: Find connections for userId
    S1 -->> M: event: message\ndata: {"text": "Hello"}
    
    PS -->> S2: Message via subscription
    S2 ->> S2: Find connections for userId (has 2!)
    S2 -->> C1: event: message\ndata: {"text": "Hello"}
    S2 -->> C2: event: message\ndata: {"text": "Hello"}
    
    PS -->> S3: Message (no connections)
    Note over S3: S3 ignores - no connections
    
    Note over C1: Connection Cleanup
    C2 --x N: Tab closed
    N --x S2: Connection terminated
    S2 ->> S2: Remove conn-456 from Set
    S2 ->> CS: Remove connection metadata
    Note over S2: Still has conn-123, keeps subscription
    
    C1 --x N: Last tab closed
    N --x S2: Connection terminated
    S2 ->> S2: Remove conn-123 from Set
    S2 ->> CS: Remove connection metadata
    S2 ->> PS: Unsubscribe from user:userId
    Note over S2: No more connections for this user

3.3 Detailed Flow Explanation

3.3.1 Connection Phase (Client → Any API Server)

// Client connects to any available server (multiple tabs/devices supported)
const eventSource = new EventSource('/sse/connect');

// Nginx randomly selects Server 2
// Server 2 handles the connection:
// - Validates authentication
// - Generates unique connection ID (includes timestamp, device info)
// - Adds connection to user's connection Set in local memory
// - Registers connection in Cache Service with connection ID
// - Subscribes to user's channel via PubSub Service (if first connection)
// - Keeps HTTP connection open

// Connection tracking structure:
// localConnections = Map<userId, Set<{
//   connectionId: string,
//   response: ServerResponse,
//   deviceInfo: string,
//   connectedAt: Date
// }>>
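The tracking structure described in the comments above can be sketched as a small per-server registry. This is a minimal illustration, assuming hypothetical names (`LocalConnectionRegistry`, `add`, `remove`, `connectionsFor`) that are not part of the codebase. It shows why the first connection for a user on a server triggers a channel subscription and the last disconnect triggers an unsubscribe.

```javascript
// Hypothetical per-server registry mirroring Map<userId, Set<connection>>.
// Only this process sees it; cross-server delivery goes through PubSub.
class LocalConnectionRegistry {
  constructor() {
    this.byUser = new Map(); // userId -> Set<connection>
  }

  /**
   * Adds a connection and reports whether it is the user's first on this server.
   * When this returns true, the caller should subscribe to `user:${userId}`.
   */
  add(userId, connection) {
    let set = this.byUser.get(userId);
    const isFirst = !set;
    if (!set) {
      set = new Set();
      this.byUser.set(userId, set);
    }
    set.add(connection);
    return isFirst;
  }

  /**
   * Removes a connection and reports whether it was the user's last on this server.
   * When this returns true, the caller should unsubscribe from `user:${userId}`.
   */
  remove(userId, connection) {
    const set = this.byUser.get(userId);
    if (!set) return false;
    set.delete(connection);
    if (set.size === 0) {
      this.byUser.delete(userId); // never keep empty sets around
      return true;
    }
    return false;
  }

  connectionsFor(userId) {
    return this.byUser.get(userId) ?? new Set();
  }
}
```

Following the diagram in 3.2, on Server 2 `add` would return true for conn-123 and false for conn-456; closing conn-456 returns false (the subscription is kept) and closing conn-123 returns true (unsubscribe).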

3.3.2 Message Publishing (Any API Server → PubSub)

With this system, internal messages can be coordinated through an API. A useful side effect is that the same API can be used to test critical components.

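// Any API server can publish messages
// Example: Server 1 receives a POST request
app.post('/api/send-message', async (req, res, next) => {
    try {
        const { targetUserId, text } = req.body;
        const messageData = { text, from: req.user.id, sentAt: Date.now() };

        // Save to MongoDB (in production this goes through the repository layer)
        await db.messages.insertOne({ to: targetUserId, ...messageData });

        // Publish via PubSub - Server 1 doesn't need the recipient's connection!
        await pubSubService.publish(`user:${targetUserId}`, JSON.stringify({
            event: 'message',
            data: messageData
        }));

        res.status(202).json({ status: 'published' });
    } catch (err) {
        next(err); // Express 4 does not forward async errors automatically
    }
});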

3.3.3 Message Distribution (PubSub → All Servers)

// Every server that is subscribed to the user's channel receives the PubSub message.
// Servers only hold that subscription while they have at least one connection
// for the user, so each receiving server checks its local connections.

// Server 2 (has one or more connections for this user):
// This is handled internally by the PubSub service subscription
pubSubService.subscribe(`user:${userId}`, (rawMessage) => {
    // userId is already known from the subscription
    // CRITICAL: User may have multiple connections (tabs, devices),
    //           and those connections may be scattered across the cluster
    const userConnections = localConnections.get(userId); // Set<connectionObject>
    if (!userConnections || userConnections.size === 0) {
        return; // No local connections for this user: ignore the message
    }

    // Publishers send JSON.stringify({ event, data }) (see 3.3.2)
    const { event, data } = JSON.parse(rawMessage);
    const frame = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;

    // Send to ALL connections for this user on this server
    for (const connection of userConnections) {
        const { response } = connection;
        if (response.writableEnded || !response.socket || response.socket.destroyed) {
            // Connection is dead: clean it up (deleting during for...of is safe for a Set)
            userConnections.delete(connection);
            continue;
        }
        response.write(frame);
    }

    if (userConnections.size === 0) {
        // No more connections for this user on this server
        localConnections.delete(userId);
        pubSubService.unsubscribe(`user:${userId}`);
    }
});

// Connection Storage Structure:
// localConnections = Map<userId, Set<connectionObject>>
// Each user can have multiple connection objects (different tabs/devices)
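The handler above writes frames in the `text/event-stream` wire format. Under the WHATWG HTML specification, a `data` value that contains newlines must be split across several `data:` lines, and each event ends with a blank line. An optional `id:` line makes the browser send `Last-Event-ID` when it reconnects. The following is a minimal serializer, assuming a hypothetical helper named `formatSSE` that does not exist in the codebase.

```javascript
/**
 * Hypothetical helper: serialise one Server-Sent Event frame.
 * @param {string} event - Event name (emitted as the `event:` field)
 * @param {*} data - Payload; non-strings are JSON-encoded
 * @param {string} [id] - Optional event id, echoed back by the browser as Last-Event-ID
 * @returns {string} A complete frame terminated by a blank line
 */
function formatSSE(event, data, id) {
  const body = typeof data === 'string' ? data : (JSON.stringify(data) ?? '');
  const lines = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  // Field values must not contain raw newlines, so strip them from the event name
  lines.push(`event: ${String(event).replace(/[\r\n]/g, '')}`);
  // Each line of the payload becomes its own `data:` field; the client rejoins them with "\n"
  for (const line of body.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  return lines.join('\n') + '\n\n';
}
```

Calling `formatSSE('message', { text: 'Hello' })` produces the same frame that the diagram in 3.2 shows being delivered to each tab.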

3.3.4 Why This Works Without Session Affinity

  • Connection State: Each server only maintains its own local connections
  • Redis Pub/Sub: Acts as a message broker between all servers
  • Channel Subscription: Servers only subscribe to channels for their connected users
  • Stateless Publishing: Any server can publish without knowing where connections live
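To make the four points above concrete, the in-process simulation below uses a Node.js `EventEmitter` in place of Redis Pub/Sub. It is a sketch under assumptions and not the production wiring: the `Server` class, its methods and the shared `bus` are all hypothetical. A message published from a server that holds no connection still reaches every connection for the user on the other servers, and a server that never subscribed receives nothing.

```javascript
const { EventEmitter } = require('node:events');

// Stand-in for Redis Pub/Sub: one shared bus; channel names are event names
const bus = new EventEmitter();

// Hypothetical API server holding only its own local connections
class Server {
  constructor(name) {
    this.name = name;
    this.connections = new Map(); // userId -> Set<{ id, received: string[] }>
    this.handlers = new Map();    // channel -> listener (at most one per channel)
  }

  connect(userId, id) {
    if (!this.connections.has(userId)) this.connections.set(userId, new Set());
    const conn = { id, received: [] };
    this.connections.get(userId).add(conn);

    const channel = `user:${userId}`;
    if (!this.handlers.has(channel)) {
      // Subscribe only while this server holds a connection for the user
      const handler = (payload) => {
        for (const c of this.connections.get(userId) ?? []) c.received.push(payload);
      };
      this.handlers.set(channel, handler);
      bus.on(channel, handler);
    }
    return conn;
  }

  // Stateless publishing: the caller does not know where connections live
  publish(userId, payload) {
    bus.emit(`user:${userId}`, payload);
  }
}

// Mirrors the sequence diagram in 3.2: two tabs on S2, a mobile device on S1
const s1 = new Server('S1');
const s2 = new Server('S2');
const s3 = new Server('S3');
const tab1 = s2.connect('alice', 'conn-123');
const tab2 = s2.connect('alice', 'conn-456');
const mobile = s1.connect('alice', 'conn-789');
s3.publish('alice', 'Hello'); // S3 holds no connection for alice
```

After the publish, all three connections have received `'Hello'`. Only S1 and S2 registered a listener, so the bus reports two listeners on `user:alice`.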

4. Data Flow

4.1 Requirements

  • Handle 10,000+ concurrent SSE connections
  • No session affinity / sticky sessions (by design choice)
  • Work with existing MongoDB and Message Bus (Kafka/Azure Service Bus)
  • Use abstracted Cache Service (Redis implementation with circuit breaker)
  • NO direct database access - All data operations MUST go through Repository pattern
  • Resilience: System must continue functioning if cache service fails (without real-time features)
  • Circuit Breaker: Automatic fallback when cache service is unavailable
  • Seamless failover if a server dies
  • Prevent message duplication
  • Support both local and production environments
  • External systems can publish events to SSE channels
  • System handles thousands of concurrent SSE connections
  • Worker-based event publishing from message bus
  • Cache-based subscription coordination with graceful degradation
  • Delivers any event types as requested by external systems

4.2 Resilience Strategy

When the cache service (Redis) fails:

  1. Circuit breaker activates - Prevents cascading failures
  2. SSE endpoints return 503 - Service temporarily unavailable
  3. System continues operating - Core functionality remains intact
  4. Real-time features disabled - Clients must poll or wait
  5. Automatic recovery - When cache returns, SSE re-enables
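Steps 1 and 5 describe a circuit breaker around the cache service. The following minimal sketch of the closed/open/half-open state machine assumes a hypothetical `CircuitBreaker` class with a consecutive-failure threshold and a cool-down period. This ADR does not specify the real thresholds or whether the project uses a library for this.

```javascript
/**
 * Hypothetical circuit breaker guarding calls to the cache service.
 * CLOSED: calls pass through; consecutive failures are counted.
 * OPEN: calls fail fast until `resetTimeoutMs` has elapsed.
 * HALF_OPEN: calls are let through as trials; a success closes, a failure re-opens.
 */
class CircuitBreaker {
  constructor({ failureThreshold = 5, resetTimeoutMs = 10000, now = Date.now } = {}) {
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
    this.now = now; // injectable clock, useful in tests
    this.state = 'CLOSED';
    this.failures = 0;
    this.openedAt = 0;
  }

  /** The SSE endpoint can use this to decide whether to answer 503 (step 2). */
  isAvailable() {
    if (this.state === 'OPEN' && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'HALF_OPEN'; // cool-down elapsed: allow trial calls (step 5)
    }
    return this.state !== 'OPEN';
  }

  async call(fn) {
    if (!this.isAvailable()) {
      throw new Error('Circuit open: cache service unavailable');
    }
    try {
      const result = await fn();
      this.state = 'CLOSED';
      this.failures = 0;
      return result;
    } catch (err) {
      this.failures += 1;
      if (this.state === 'HALF_OPEN' || this.failures >= this.failureThreshold) {
        this.state = 'OPEN';
        this.openedAt = this.now();
      }
      throw err;
    }
  }
}
```

Failing fast while the circuit is open (step 1) means that requests do not pile up behind a dead Redis connection. The rest of the API keeps serving requests (step 3).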

4.3 Why No Session Affinity / Sticky Sessions?

  1. Better load distribution - Connections spread evenly
  2. Improved fault tolerance - Client reconnects to any available server
  3. Easier scaling - Add/remove servers without session migration
  4. Simplified deployment - No session affinity configuration

4.4 Critical Architectural Constraint

Our load balancer does not use session affinity by design. This means:

  • Any user's SSE connection can land on any NodeJS server
  • A user might have multiple connections across different servers (multiple tabs)
  • Notifications must reach users regardless of which server hosts their connection
  • Redis Pub/Sub becomes essential for cross-server coordination

5. Deployment Strategy

graph TB
    subgraph "Current Setup"
        LB[Nginx Load Balancer<br/>Without Session Affinity]

        subgraph "API Cluster"
            API1[NodeJS API 1]
            API2[NodeJS API 2]
            API3[NodeJS API 3]
        end

        subgraph "Infrastructure"
            REDIS[(Redis)]
            ASB[Azure Message Bus /<br/>AWS SQS /<br/>Kafka]
        end

        CLIENT[Clients] --> LB
        LB --> API1
        LB --> API2
        LB --> API3
        API1 <--> REDIS
        API2 <--> REDIS
        API3 <--> REDIS
        API1 --> ASB
        API2 --> ASB
        API3 --> ASB
    end

5.1 Current Event Flow

graph LR
    subgraph "Current System"
        PROJECT[Project Service] --> EMITTER[Global Event Emitter]
        EMITTER --> PRODUCER[MessageProducer<br/>message-producer.js]
        PRODUCER --> MB[Azure Message Bus /<br/>AWS SQS /<br/>Kafka]
        MB --> DOWNSTREAM[Downstream Systems]
    end

5.2 Required Enhancement with SSE

graph TB

    subgraph Production Infrastructure
        REDIS((Redis Cache))
        MB[Azure Message Bus /<br/>AWS SQS /<br/>Kafka]
        DB[(MongoDB)]
    end

    subgraph API
        PROJECT[Project Service] --> REPO
        REPO --> DB
        REPO --> EMITTER[Global Event Emitter]
        EMITTER --> PRODUCER[MessageProducer<br/>message-producer.js]
        PRODUCER --> MB
        SSE[SSE Service] --> REDIS
        SSE --> USERS[Connected Users]
    end

    subgraph Worker
        MB --> WORKER[Event Notification Worker]
        WORKER --> REDIS
    end

    subgraph Downstream
        MB --> DOWNSTREAM[Other Systems]
    end
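The Event Notification Worker consumes from the durable message bus and writes to Redis. One reading of that arrow is that the worker fans each bus event out to the `user:${userId}` channels that API servers subscribe to. The sketch below covers only that routing step and rests on several assumptions: a hypothetical bus-event shape (`{ type, userIds, channel, payload }`), a hypothetical `getChannelSubscribers(channel)` lookup (which could wrap `getSetMembers(CacheKeys.CHANNEL_SUBSCRIBERS, channel)`), and a `pubSub.publish(channel, string)` interface like the one in 3.3.2. The vendor-specific consumer for Kafka, Azure Service Bus or SQS is left out.

```javascript
/**
 * Hypothetical routing step of the Event Notification Worker.
 * Resolves recipients, de-duplicates them, and publishes once per user channel.
 * @param {{type: string, userIds?: string[], channel?: string, payload: *}} busEvent
 * @param {{pubSub: {publish: Function}, getChannelSubscribers: Function}} deps
 * @returns {Promise<number>} Number of user channels published to
 */
async function handleBusEvent(busEvent, { pubSub, getChannelSubscribers }) {
  const message = JSON.stringify({ event: busEvent.type, data: busEvent.payload });

  // A Set prevents a user who is both targeted directly and subscribed
  // to the entity channel from receiving the same event twice
  const userIds = new Set(busEvent.userIds ?? []);
  if (busEvent.channel) {
    // Fan out entity/topic channels to their subscribers' user channels
    for (const id of await getChannelSubscribers(busEvent.channel)) userIds.add(id);
  }

  await Promise.all([...userIds].map((id) => pubSub.publish(`user:${id}`, message)));
  // Acknowledge the bus message only after this resolves, so a crash
  // before publishing leads to redelivery rather than a lost notification
  return userIds.size;
}
```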

6. Infrastructure

6.1 Data Access Layer - Repository Pattern

CRITICAL: The SSE Service should not access repositories directly. Database changes that need to trigger SSE events should use a proxy pattern around repositories to call the SSE Service. That pattern belongs to a separate ADR and is included here for demonstration purposes only.

6.1.1 Repository Pattern Implementation with Injection

CRITICAL: This is not part of this ADR; it is included for demonstration purposes only.

// project-repository.js - Module Factory Pattern
module.exports = ({
    User,           // Mongoose model
    Organization,   // Mongoose model  
    Role,          // Mongoose model
    Project,       // Mongoose model
    mongoose,
    mongoSaveOptions,
    cryptology,
    globalEventEmitter,
    calculateLeastPrivilegePermissionsDictionary,
    compactLeastPrivilegePermissionsDictionary,
    SSERepositoryProxyInstance,
}) => {
    // Repository functions for Project operations
    const updateProjectStatus = async (projectId, changes) => { /** ... update the status of a project ... **/ };

    return {
        // Wrapped so that a successful update emits an SSE event for subscribers
        updateProjectStatus: SSERepositoryProxyInstance.proxy(updateProjectStatus, 'key-to-send-to-event-subscribers'),
    };
};

6.1.2 Repository Proxy

Repository changes that need to trigger SSE events should use a proxy pattern. Again, this is here ONLY for demonstration purposes and is NOT part of this ADR.

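// Repository Proxy for SSE Integration
class SSERepositoryProxy {
    constructor(globalEventEmitter) {
        this.globalEventEmitter = globalEventEmitter;
    }

    /**
     * Wraps a repository function so that every successful call emits an 'sse-event'.
     * @param {Function} fn - Repository function to wrap
     * @param {string} eventKey - Key sent to event subscribers
     * @returns {Function} Async function taking the same arguments as fn
     */
    proxy(fn, eventKey) {
        return async (...args) => {
            const result = await fn(...args);
            // Emit only after the write succeeds, so subscribers never see failed changes.
            // The payload shape is illustrative; this ADR leaves it unspecified.
            this.globalEventEmitter.emit('sse-event', { key: eventKey, args, result });
            return result;
        };
    }
}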

6.2 Cache Service Integration - Module Factory Pattern

CRITICAL: The SSE Service follows the Module Factory pattern for dependency injection, consistent with the rest of the codebase.

6.2.1 Cache Service Requirements for SSE

The SSE Service requires extended cache operations beyond basic get/set:

6.2.1.1 Required Cache Operations
  1. Connection Management

    • Store/retrieve active connection information per server
    • Hash operations for server→user→connection mappings
    • Connection metadata storage with TTL
  2. Subscription Caching

    • Cache user's SSE channel subscriptions
    • Set operations for channel→subscribers mappings
    • Set operations for user→channels mappings
    • TTL-based cache invalidation
  3. Pending Notifications

    • Sorted set operations for time-ordered notification queues
    • Automatic expiry for old notifications
    • Batch retrieval and clearing
  4. Fallback Handling

    • Graceful degradation when cache is disabled
    • In-memory fallback for critical operations
    • Repository fallback for data retrieval
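The pending-notification requirements (a time-ordered queue, automatic expiry, and batch retrieval and clearing) can be illustrated in memory. This sketch assumes a hypothetical `PendingQueue` class. In production, the same semantics would map onto Redis sorted sets: `ZADD` with a timestamp score, `ZREMRANGEBYRANK` for the size cap, `ZREMRANGEBYSCORE` for expiry, and `ZRANGE` followed by `DEL` for draining. The default cap of 100 mirrors the `maxPendingNotifications` default used by the SSE service factory.

```javascript
/**
 * Hypothetical in-memory model of a per-user pending-notification queue.
 * Entries are kept ordered by score (timestamp), capped, and expired by age.
 */
class PendingQueue {
  constructor({ maxItems = 100, maxAgeMs = 24 * 60 * 60 * 1000 } = {}) {
    this.maxItems = maxItems;
    this.maxAgeMs = maxAgeMs;
    this.entries = []; // [{ score, member }], ascending by score
  }

  add(score, member) {
    // Insert in score order (like ZADD); equal scores keep insertion order
    let i = this.entries.length;
    while (i > 0 && this.entries[i - 1].score > score) i--;
    this.entries.splice(i, 0, { score, member });
    // Drop the oldest entries beyond the cap (like ZREMRANGEBYRANK key 0 -(max+1))
    if (this.entries.length > this.maxItems) {
      this.entries.splice(0, this.entries.length - this.maxItems);
    }
  }

  expire(now = Date.now()) {
    // Remove entries at or before the cutoff (like ZREMRANGEBYSCORE key -inf cutoff)
    const cutoff = now - this.maxAgeMs;
    this.entries = this.entries.filter((e) => e.score > cutoff);
  }

  drain(now = Date.now()) {
    // Batch retrieval and clearing: return members oldest-first, then empty the queue
    this.expire(now);
    const members = this.entries.map((e) => e.member);
    this.entries = [];
    return members;
  }
}
```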

6.2.2 Extended Cache Service Operations

6.2.2.1 Terminology Clarification
  • Members: Individual items in a set (e.g., user IDs in a project's subscriber list)
  • Identifier: The unique key for the entity (e.g., projectId, userId, 'global')
  • Set: A collection of unique values (e.g., all users subscribed to project-123)
  • Sorted Set: A collection ordered by score (e.g., notifications ordered by timestamp)
6.2.2.2 SSE Channel Types Supported

This architecture supports multiple SSE channel scopes:

  1. User-Specific Channels (user:${userId})

    • Direct messages to a specific user
    • Personal notifications
  2. Entity Channels (entity:${entityType}:${entityId})

    • Users subscribed to specific entities
    • Entity updates, state changes
  3. Global Broadcasts (broadcast:global)

    • System-wide announcements to all connected users
    • Maintenance notices, feature releases
  4. Topic-Based Channels (extensible pattern)

    • topic:${topicName} - Topic subscribers
    • category:${categoryId} - Category followers
    • Any pattern following channelType:identifier
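Because every channel follows the `channelType:identifier` convention, a pair of helpers can build and validate names in one place. This is a minimal sketch with hypothetical function names. The allowed type list is an assumption that should be extended as new channel types are added. Identifiers may themselves contain colons (as in `entity:${entityType}:${entityId}`), so only the first colon separates the type.

```javascript
// Hypothetical helpers for the channelType:identifier naming convention.
// The allowed types are an assumption; extend the set for new channel kinds.
const CHANNEL_TYPES = new Set(['user', 'entity', 'broadcast', 'topic', 'category']);

function buildChannel(type, ...parts) {
  if (!CHANNEL_TYPES.has(type)) throw new Error(`Unknown channel type: ${type}`);
  if (parts.length === 0 || parts.some((p) => p === undefined || p === null || String(p) === '')) {
    throw new Error(`Channel ${type} requires a non-empty identifier`);
  }
  return [type, ...parts.map(String)].join(':');
}

function parseChannel(channel) {
  // Only the first colon separates the type; the rest is the identifier
  const idx = channel.indexOf(':');
  if (idx <= 0 || idx === channel.length - 1) throw new Error(`Malformed channel: ${channel}`);
  const type = channel.slice(0, idx);
  if (!CHANNEL_TYPES.has(type)) throw new Error(`Unknown channel type: ${type}`);
  return { type, identifier: channel.slice(idx + 1) };
}
```

For example, `buildChannel('entity', 'project', 123)` yields `'entity:project:123'`, and `parseChannel` turns it back into `{ type: 'entity', identifier: 'project:123' }`.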
// Additional CacheKeys needed for SSE
const CacheKeys = {
    // ... existing keys ...
    SSE_CONNECTION: Symbol('sseConnection'),
    SSE_SERVER_CONNECTIONS: Symbol('sseServerConnections'),
    
    // SSE subscription keys by type
    USER_SUBSCRIPTIONS: Symbol('userSubscriptions'),      // user -> channels
    CHANNEL_SUBSCRIBERS: Symbol('channelSubscribers'),    // channel -> users
    GLOBAL_SUBSCRIBERS: Symbol('globalSubscribers'),      // all connected users
    TOPIC_SUBSCRIBERS: Symbol('topicSubscribers'),        // topic -> users
    
    PENDING_NOTIFICATIONS: Symbol('pendingNotifications'),
}

/**
 * Extended CacheService methods required for SSE support
 * @class CacheService
 * @extends BaseCacheService
 */
class CacheService {
    /**
     * Add members to a Set (for subscription tracking)
     * @async
     * @param {Symbol} keyType - Cache key type (e.g., CacheKeys.CHANNEL_SUBSCRIBERS)
     * @param {string} identifier - Channel or entity identifier (e.g., 'entity:project:123', 'ai-research', 'global')
     * @param {string|string[]} members - User ID(s) to add to subscription
     * @param {Object} [options] - Options
     * @param {Function} [options.onDisabled] - Callback when cache disabled
     * @returns {Promise<number|null>} Number added or null if disabled
     * 
     * @example
     * // Subscribe user-789 to entity channel
     * await cache.addToSet(CacheKeys.CHANNEL_SUBSCRIBERS, 'entity:project:123', 'user-789')
     * 
     * // Subscribe multiple users to topic
     * await cache.addToSet(CacheKeys.TOPIC_SUBSCRIBERS, 'ai-research', ['user-1', 'user-2'])
     * 
     * // Add user to global broadcast list
     * await cache.addToSet(CacheKeys.GLOBAL_SUBSCRIBERS, 'global', userId)
     */
    async addToSet(keyType, identifier, members, options) { /* adapter-specific */ }
    
    /**
     * Remove members from a Set
     * @async
     * @param {Symbol} keyType - Cache key type symbol
     * @param {string} identifier - Unique identifier
     * @param {string|string[]} members - Member(s) to remove
     * @param {Object} [options] - Options
     * @param {Function} [options.onDisabled] - Callback when cache disabled
     * @returns {Promise<number|null>} Number removed or null if disabled
     */
    async removeFromSet(keyType, identifier, members, options) { /* adapter-specific */ }
    
    /**
     * Get all members of a Set
     * @async
     * @param {Symbol} keyType - Cache key type symbol
     * @param {string} identifier - Unique identifier
     * @param {Object} [options] - Options
     * @param {Function} [options.onDisabled] - Callback when cache disabled
     * @returns {Promise<string[]>} Array of members or empty array
     */
    async getSetMembers(keyType, identifier, options) { /* adapter-specific */ }
    
    /**
     * Add member to Sorted Set with score (for pending notifications)
     * @async
     * @param {Symbol} keyType - Cache key type symbol
     * @param {string} identifier - Unique identifier
     * @param {number} score - Score (typically timestamp)
     * @param {*} member - Member to add (will be JSON stringified)
     * @param {Object} [options] - Options
     * @param {Function} [options.onDisabled] - Callback when cache disabled
     * @returns {Promise<number|null>} 1 if added, 0 if updated, null if disabled
     */
    async addToSortedSet(keyType, identifier, score, member, options) { /* adapter-specific */ }
    
    /**
     * Get range from Sorted Set
     * @async
     * @param {Symbol} keyType - Cache key type symbol
     * @param {string} identifier - Unique identifier
     * @param {number} start - Start index (0-based, can be negative)
     * @param {number} stop - Stop index (-1 for last)
     * @param {Object} [options] - Options
     * @param {Function} [options.onDisabled] - Callback when cache disabled
     * @returns {Promise<Array>} Array of parsed members or empty array
     */
    async getSortedSetRange(keyType, identifier, start, stop, options) { /* adapter-specific */ }
    
    /**
     * Set field in Hash (for connection metadata)
     * @async
     * @param {Symbol} keyType - Cache key type symbol
     * @param {string} identifier - Unique identifier
     * @param {string} field - Field name
     * @param {*} value - Value (will be JSON stringified)
     * @param {Object} [options] - Options
     * @param {Function} [options.onDisabled] - Callback when cache disabled
     * @returns {Promise<number|null>} 1 if new, 0 if updated, null if disabled
     */
    async setHashField(keyType, identifier, field, value, options) { /* adapter-specific */ }
    
    /**
     * Get field from Hash
     * @async
     * @param {Symbol} keyType - Cache key type symbol
     * @param {string} identifier - Unique identifier
     * @param {string} field - Field name
     * @param {Object} [options] - Options
     * @param {Function} [options.onDisabled] - Callback when cache disabled
     * @returns {Promise<*|null>} Parsed value or null
     */
    async getHashField(keyType, identifier, field, options) { /* adapter-specific */ }
    
    /**
     * Remove field from Hash
     * @async
     * @param {Symbol} keyType - Cache key type symbol
     * @param {string} identifier - Unique identifier
     * @param {string} field - Field name
     * @param {Object} [options] - Options
     * @param {Function} [options.onDisabled] - Callback when cache disabled
     * @returns {Promise<number|null>} Number removed or null if disabled
     */
    async removeHashField(keyType, identifier, field, options) { /* adapter-specific */ }
    
    /**
     * Get all fields from Hash
     * @async
     * @param {Symbol} keyType - Cache key type symbol
     * @param {string} identifier - Unique identifier
     * @param {Object} [options] - Options
     * @param {Function} [options.onDisabled] - Callback when cache disabled
     * @returns {Promise<Object>} Object with field-value pairs or empty object
     */
    async getAllHashFields(keyType, identifier, options) { /* adapter-specific */ }
}

// Implementation details:
// - All methods follow existing assert validation patterns
// - All methods support onDisabled callback for graceful degradation
// - Complex values are JSON stringified/parsed automatically
// - Methods return null when cache disabled, appropriate types otherwise
// - Consistent error handling and logging patterns maintained
// - The implementation should be swappable so that other vendor solutions can be injected.
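The implementation notes above cover a swappable vendor, `null` when disabled, `onDisabled` callbacks and automatic JSON handling. An in-memory adapter of the Set and Hash operations can illustrate them, and it could also serve as the in-memory fallback from 6.2.1.1 and as a test double. This is a sketch under assumptions: the `InMemoryCacheService` name, the `enabled` flag and the `${keyType.description}:${identifier}` key format are hypothetical, and the sorted-set methods are omitted for brevity. A Redis adapter would map the same methods onto `SADD`, `SREM`, `SMEMBERS`, `HSET` and `HGET`.

```javascript
/**
 * Hypothetical in-memory adapter for the Set and Hash parts of the extended
 * CacheService interface. Sorted-set methods are omitted for brevity.
 */
class InMemoryCacheService {
  constructor({ enabled = true } = {}) {
    this.enabled = enabled;
    this.sets = new Map();   // key -> Set<string>
    this.hashes = new Map(); // key -> Map<field, JSON string>
  }

  // Symbols cannot serve as storage keys across processes; use their description
  key(keyType, identifier) {
    if (typeof keyType !== 'symbol') throw new TypeError('keyType must be a Symbol');
    return `${keyType.description}:${identifier}`;
  }

  // Returns true (and fires onDisabled) when the cache is switched off
  disabled(options) {
    if (this.enabled) return false;
    options?.onDisabled?.();
    return true;
  }

  async addToSet(keyType, identifier, members, options) {
    if (this.disabled(options)) return null;
    const k = this.key(keyType, identifier);
    const set = this.sets.get(k) ?? new Set();
    this.sets.set(k, set);
    let added = 0;
    for (const m of [].concat(members)) {
      if (!set.has(m)) { set.add(m); added++; }
    }
    return added; // like SADD: count of newly added members
  }

  async removeFromSet(keyType, identifier, members, options) {
    if (this.disabled(options)) return null;
    const set = this.sets.get(this.key(keyType, identifier));
    if (!set) return 0;
    let removed = 0;
    for (const m of [].concat(members)) if (set.delete(m)) removed++;
    return removed;
  }

  async getSetMembers(keyType, identifier, options) {
    if (this.disabled(options)) return [];
    return [...(this.sets.get(this.key(keyType, identifier)) ?? [])];
  }

  async setHashField(keyType, identifier, field, value, options) {
    if (this.disabled(options)) return null;
    const k = this.key(keyType, identifier);
    const hash = this.hashes.get(k) ?? new Map();
    this.hashes.set(k, hash);
    const isNew = !hash.has(field);
    hash.set(field, JSON.stringify(value)); // values stored JSON-stringified
    return isNew ? 1 : 0; // like HSET: 1 if new field, 0 if updated
  }

  async getHashField(keyType, identifier, field, options) {
    if (this.disabled(options)) return null;
    const raw = this.hashes.get(this.key(keyType, identifier))?.get(field);
    return raw === undefined ? null : JSON.parse(raw);
  }
}
```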

6.2.3 SSE Service Module Factory Implementation

/**
 * SSE Service Module Factory
 * Manages Server-Sent Events connections without session affinity
 * @module services/sse-service
 * @param {Object} dependencies - Injected dependencies
 * @param {Object} dependencies.cacheService - Cache service for connection tracking
 * @param {Object} dependencies.pubSubService - PubSub service for cross-server communication
 * @param {Object} dependencies.CacheKeys - Cache key symbols
 * @param {Object} dependencies.logger - Logger instance
 * @param {string} [dependencies.serverId] - Server identifier
 * @param {number} [dependencies.heartbeatInterval] - Heartbeat interval in ms
 * @param {number} [dependencies.connectionTimeout] - Connection timeout in ms
 * @param {number} [dependencies.maxPendingNotifications] - Max pending notifications
 * @returns {ProjectSSEService} SSE service instance
 */
module.exports = ({
    // Required dependencies
    cacheService,       // Cache service instance
    pubSubService,      // PubSub service instance
    CacheKeys,          // Cache key symbols
    
    // Optional dependencies with defaults (for testing overrides)
    serverId = `server-${process.pid}`,
    heartbeatInterval = 30000,
    connectionTimeout = 120000,
    maxPendingNotifications = 100,
    logger = console
}) => {
    
    /**
     * @class ProjectSSEService
     * @description Manages SSE connections and event distribution
     */
    class ProjectSSEService {
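        constructor() {
            this.serverId = serverId;
            this.cache = cacheService;
            this.pubSub = pubSubService;
            this.logger = logger;

            // Tunables (read via this.* by registerConnectionInCache, startHeartbeat, etc.)
            this.heartbeatInterval = heartbeatInterval;
            this.connectionTimeout = connectionTimeout;
            this.maxPendingNotifications = maxPendingNotifications;

            // Local in-memory tracking (per server)
            this.connections = new Map();      // connectionId -> connection
            this.userConnections = new Map();  // userId -> Set<connectionId>
            this.subscriptions = new Map();    // userId -> PubSub subscription

            // Start maintenance tasks
            this.startHeartbeat();
        }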
        
        /**
         * Handles new SSE connection from client
         * @async
         * @param {Object} req - Express request object
         * @param {Object} res - Express response object
         * @returns {Promise<void>}
         * @throws {Error} If cache registration or channel subscription fails (a 503 is returned instead when the cache or PubSub service is unavailable)
         */
        async handleSSEConnection(req, res) {
            // Check if services are available
            if (!this.cache.isAvailable() || !this.pubSub.isAvailable()) {
                res.status(503).json({
                    error: 'Real-time service temporarily unavailable',
                    fallback: 'Please use polling endpoints'
                });
                return;
            }
            
            const userId = req.user.id;
            const connectionId = this.generateConnectionId();
            
            // Set SSE headers
            res.writeHead(200, {
                'Content-Type': 'text/event-stream',
                'Cache-Control': 'no-cache',
                'Connection': 'keep-alive',
                'X-Accel-Buffering': 'no'
            });
            
            // Store connection locally
            this.storeLocalConnection(connectionId, userId, req, res);

            // Register the disconnect handler before any await, so a client that
            // drops during setup is still cleaned up
            req.on('close', () => {
                this.handleDisconnect(connectionId, userId);
            });

            // Register in cache with metadata
            await this.registerConnectionInCache(connectionId, userId, req);

            // Subscribe to user's channel (no-op if this server is already subscribed)
            await this.subscribeToUserChannel(userId);

            // Send initial connection event
            this.sendEvent(res, 'connected', {
                connectionId,
                serverId: this.serverId
            });

            // SSE connection established; channel subscriptions are managed externally
        }
        
        /**
         * Stores connection in local memory (supports multiple connections per user)
         * @private
         * @param {string} connectionId - Connection identifier
         * @param {string} userId - User identifier
         * @param {Object} req - Request object
         * @param {Object} res - Response object
         */
        storeLocalConnection(connectionId, userId, req, res) {
            const connection = {
                id: connectionId,
                userId,
                request: req,
                response: res,
                deviceInfo: req.headers['user-agent'] || 'unknown',
                createdAt: Date.now(),
                lastActivity: Date.now()
            };
            
            this.connections.set(connectionId, connection);
            
            // Support multiple connections per user (tabs, devices)
            if (!this.userConnections.has(userId)) {
                this.userConnections.set(userId, new Set());
            }
            this.userConnections.get(userId).add(connectionId);
            
            this.logger.debug(`Connection ${connectionId} established for user ${userId} (${this.userConnections.get(userId).size} active connections)`);
        }
        
        /**
         * Registers connection in cache service
         * @private
         * @async
         * @param {string} connectionId - Connection identifier
         * @param {string} userId - User identifier
         * @param {Object} req - Request object
         * @returns {Promise<void>}
         */
        async registerConnectionInCache(connectionId, userId, req) {
            // Store connection metadata in cache
            await this.cache.setHashField(
                CacheKeys.SSE_SERVER_CONNECTIONS,
                this.serverId,
                `${userId}:${connectionId}`,
                {
                    connectionId,
                    userId,
                    createdAt: Date.now(),
                    lastActivity: Date.now(),
                    userAgent: req.headers['user-agent'],
                    ip: req.ip
                },
                {
                    onDisabled: () => {
                        this.logger.warn('Cache disabled - SSE will not work across servers');
                    }
                }
            );
            
            // Set TTL on connection
            await this.cache.set(
                CacheKeys.SSE_CONNECTION,
                connectionId,
                {
                    userId,
                    serverId: this.serverId,
                    createdAt: Date.now()
                },
                { ttlSeconds: this.connectionTimeout / 1000 }
            );
        }
        
        /**
         * Subscribes to user's PubSub channel
         * @private
         * @async
         * @param {string} userId - User identifier
         * @returns {Promise<void>}
         */
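        async subscribeToUserChannel(userId) {
            if (this.subscriptions.has(userId)) {
                return; // Already subscribed (or a subscription is in flight)
            }

            // Reserve the slot before awaiting, so concurrent connections for the same
            // user cannot create duplicate subscriptions (and duplicate deliveries)
            const pending = this.pubSub.subscribe(
                `user:${userId}`,
                (message) => {
                    this.handlePubSubMessage(userId, message);
                }
            );
            this.subscriptions.set(userId, pending);

            try {
                this.subscriptions.set(userId, await pending);
                this.logger.debug(`Subscribed to channel user:${userId}`);
            } catch (err) {
                this.subscriptions.delete(userId);
                throw err;
            }
        }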
        
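        /**
         * Handles message from PubSub
         * @private
         * @param {string} userId - User identifier
         * @param {Object|string} message - Payload ({ event, data }) or its JSON string
         */
        handlePubSubMessage(userId, message) {
            const userConns = this.userConnections.get(userId);
            if (!userConns || userConns.size === 0) {
                return; // No local connections for this user
            }

            // Publishers may send JSON.stringify({ event, data }) (see 3.3.2)
            let payload;
            try {
                payload = typeof message === 'string' ? JSON.parse(message) : message;
            } catch (err) {
                this.logger.warn(`Dropping malformed PubSub message for user ${userId}`);
                return;
            }

            // Send to all user's connections on this server
            for (const connId of userConns) {
                const conn = this.connections.get(connId);
                if (conn) {
                    this.sendEvent(conn.response, payload.event, payload.data);
                    conn.lastActivity = Date.now();
                }
            }
        }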
        
        /**
         * Subscribe user to a channel (called externally by subscription management)
         * @async
         * @param {string} userId - User identifier
         * @param {string} channelId - Channel to subscribe to
         * @returns {Promise<void>}
         */
        async subscribeUserToChannel(userId, channelId) {
            // Add user to channel's subscriber set
            await this.cache.addToSet(
                CacheKeys.CHANNEL_SUBSCRIBERS,
                channelId,
                userId
            );
            
            // Track user's subscriptions
            await this.cache.addToSet(
                CacheKeys.USER_SUBSCRIPTIONS,
                userId,
                channelId
            );
        }
        
        /**
         * Unsubscribe user from a channel
         * @async
         * @param {string} userId - User identifier
         * @param {string} channelId - Channel to unsubscribe from
         * @returns {Promise<void>}
         */
        async unsubscribeUserFromChannel(userId, channelId) {
            // Remove user from channel's subscriber set
            await this.cache.removeFromSet(
                CacheKeys.CHANNEL_SUBSCRIBERS,
                channelId,
                userId
            );
            
            // Remove from user's subscriptions
            await this.cache.removeFromSet(
                CacheKeys.USER_SUBSCRIPTIONS,
                userId,
                channelId
            );
        }
        
        /**
         * Sends SSE event to client
         * @private
         * @param {Object} res - Response object
         * @param {string} event - Event type
         * @param {Object} data - Event data
         */
        sendEvent(res, event, data) {
            try {
                if (!res.writableEnded) {
                    res.write(`event: ${event}\n`);
                    res.write(`data: ${JSON.stringify(data)}\n\n`);
                }
            } catch (error) {
                this.logger.error(`Failed to send event: ${error.message}`);
            }
        }
        
        /**
         * Handles connection disconnect
         * @private
         * @param {string} connectionId - Connection identifier
         * @param {string} userId - User identifier
         */
        handleDisconnect(connectionId, userId) {
            // Remove from local maps
            this.connections.delete(connectionId);
            
            const userConns = this.userConnections.get(userId);
            if (userConns) {
                userConns.delete(connectionId);
                if (userConns.size === 0) {
                    this.userConnections.delete(userId);
                    
                    // Unsubscribe from PubSub if no more connections
                    const subscriptionId = this.subscriptions.get(userId);
                    if (subscriptionId) {
                        this.pubSub.unsubscribe(subscriptionId).catch(err => {
                            this.logger.error(`Failed to unsubscribe: ${err.message}`);
                        });
                        this.subscriptions.delete(userId);
                    }
                }
            }
            
            // Remove from cache
            this.cache.removeHashField(
                CacheKeys.SSE_SERVER_CONNECTIONS,
                this.serverId,
                `${userId}:${connectionId}`
            ).catch(err => {
                this.logger.error(`Failed to remove from cache: ${err.message}`);
            });
            
            this.logger.debug(`Connection ${connectionId} disconnected`);
        }
        
        /**
         * Generates unique connection identifier
         * @private
         * @returns {string} Connection ID
         */
        generateConnectionId() {
            return `${this.serverId}_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
        }
        
        /**
         * Starts heartbeat interval
         * @private
         */
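        startHeartbeat() {
            // Keep the handle so the interval can be cleared with clearInterval() on shutdown;
            // unref() stops this timer alone from keeping the process alive
            this.heartbeatTimer = setInterval(() => {
                this.sendHeartbeats();
            }, this.heartbeatInterval);
            this.heartbeatTimer.unref();
        }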
        
        /**
         * Sends heartbeats to all connections
         * @private
         */
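        sendHeartbeats() {
            const now = Date.now();
            
            for (const [connId, conn] of this.connections) {
                // Stale: no event delivered within connectionTimeout. End the response
                // so the client's EventSource reconnects, instead of leaving an open
                // stream that is no longer tracked and will never receive events.
                if (now - conn.lastActivity > this.connectionTimeout) {
                    if (!conn.response.writableEnded) {
                        conn.response.end();
                    }
                    this.handleDisconnect(connId, conn.userId);
                    continue;
                }
                
                // Send heartbeat (an SSE comment line, ignored by EventSource)
                try {
                    conn.response.write(': heartbeat\n\n');
                } catch (error) {
                    // Connection dead
                    this.handleDisconnect(connId, conn.userId);
                }
            }
        }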
    }
    
    return new ProjectSSEService();
}
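The `sendEvent` and `sendHeartbeats` methods above write raw SSE frames. The sketch below is a hypothetical standalone helper (not part of the service) that shows the wire format in one place: optional `id:`, `event:` and `retry:` fields, one `data:` line per payload line, and a blank line terminating each event. Lines starting with `:` are comments, which is why `: heartbeat` keeps the connection alive without firing a client event. Browsers resend the last received `id:` in a `Last-Event-ID` header when reconnecting, which a server could use to replay missed events.

```javascript
/**
 * Hypothetical helper: formats one SSE frame.
 * Every line of the payload needs its own "data:" prefix,
 * and a blank line terminates the event.
 */
function formatSSEFrame({ event, data, id, retry } = {}) {
    let frame = '';
    if (id !== undefined) frame += `id: ${id}\n`;
    if (event) frame += `event: ${event}\n`;
    if (retry !== undefined) frame += `retry: ${retry}\n`; // reconnect delay in ms

    // Objects become single-line JSON; raw strings may contain newlines
    const payload = typeof data === 'string' ? data : JSON.stringify(data ?? null);
    for (const line of payload.split(/\r\n|\r|\n/)) {
        frame += `data: ${line}\n`;
    }
    return frame + '\n';
}

/** Comment frame: ignored by EventSource, useful as a heartbeat */
function formatSSEComment(text) {
    return `: ${text}\n\n`;
}
```

With this helper, `sendEvent` could write a single `formatSSEFrame({ event, data })` string instead of two separate `write()` calls.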

6.2.4 Dependency Injection in Application Startup

/**
 * Application startup with SSE service configuration
 * Demonstrates proper dependency injection following Module Factory pattern
 * @module app/startup
 */

// Import service factories
const createCacheService = require('./src/utils/cache-service');
const createPubSubService = require('./src/services/pubsub-service');
const createSSEService = require('./src/services/sse-service');

// NOTE: `app` (the Express instance), `appLogger` and `cacheImplementation`
// are assumed to be created earlier in startup; they are not shown here.

// CommonJS modules do not support top-level await, so startup runs in an async function
async function startSSE() {
    /**
     * Initialize cache service with circuit breaker
     * Cache implementation is abstracted - could be Redis, MongoDB, etc.
     */
    const { CacheService, CacheKeys } = createCacheService(cacheImplementation);

    /**
     * Initialize PubSub service with configured provider
     * Supports Redis, SQS, Azure Service Bus, Kafka, AMQP
     */
    const pubSubService = createPubSubService({
        provider: process.env.PUBSUB_PROVIDER || 'redis',
        connectionOptions: {
            // Provider-specific options injected here
            host: process.env.CACHE_HOST,
            port: process.env.CACHE_PORT,
            // OR for Azure: connectionString: process.env.AZURE_CONNECTION
            // OR for AWS: region: process.env.AWS_REGION
        },
        logger: appLogger
    });

    /**
     * Create SSE service with dependencies injected
     * NO direct database access - uses proxy pattern for repository integration
     */
    const sseService = createSSEService({
        // Required dependencies - all are facades/abstractions
        cacheService: CacheService,     // Cache operations facade
        pubSubService: pubSubService,   // PubSub operations facade
        CacheKeys: CacheKeys,           // Cache key symbols for type safety

        // Optional configuration overrides for testing
        serverId: process.env.SSE_SERVER_ID || `api-${process.pid}`,
        heartbeatInterval: parseInt(process.env.SSE_HEARTBEAT, 10) || 30000,
        connectionTimeout: parseInt(process.env.SSE_TIMEOUT, 10) || 120000,
        maxPendingNotifications: parseInt(process.env.SSE_MAX_PENDING, 10) || 100,
        logger: appLogger
    });

    // Connect PubSub before accepting SSE connections
    await pubSubService.connect();

    // Inject into Express app for route access
    app.locals.sseService = sseService;
    app.locals.cacheService = CacheService;
    app.locals.pubSubService = pubSubService;

    // Graceful shutdown handling
    process.on('SIGTERM', async () => {
        appLogger.info('Shutting down SSE service...');

        // Close all SSE connections
        await sseService.shutdown();

        // Disconnect from PubSub
        await pubSubService.disconnect();

        // Other cleanup...
    });
}

startSSE().catch((error) => {
    appLogger.error(`SSE startup failed: ${error.message}`);
    process.exit(1);
});

6.2.5 Benefits of Module Factory Pattern for SSE

  1. Testability - Easy to inject mock cache and PubSub services
  2. Consistency - Follows same pattern as rest of codebase
  3. Flexibility - Testers can override any dependency
  4. Graceful Degradation - Cache failures are contained: SSE endpoints return 503 while the rest of the application keeps working (see 6.2.6)
  5. Separation of Concerns - SSE operations separate from business logic
  6. Repository Independence - Uses proxy pattern instead of direct repository injection
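For the testability point above, a hand-rolled in-memory double can stand in for the cache facade. This is a sketch: the method names mirror calls made in this section (`set`, `addToSet`, `removeFromSet`, `getSetMembers`, `isAvailable`), but the factory name `createMockCacheService`, the `available` flag and the `peek` helper are hypothetical, and TTL options are ignored.

```javascript
/**
 * Hypothetical in-memory double for the cache facade (unit tests only).
 * Method names mirror calls used in this section; TTL options are ignored.
 */
function createMockCacheService({ available = true } = {}) {
    const store = new Map(); // "<key>|<identifier>" -> value or Set

    // Cache keys are Symbols; String() converts them safely
    // (a template literal such as `${key}` would throw a TypeError)
    const slot = (key, identifier) => `${String(key)}|${identifier}`;

    return {
        isAvailable: () => available,

        async set(key, identifier, value /* , { ttlSeconds } is ignored */) {
            store.set(slot(key, identifier), value);
        },

        async addToSet(key, identifier, member) {
            const k = slot(key, identifier);
            if (!store.has(k)) store.set(k, new Set());
            store.get(k).add(member);
        },

        async removeFromSet(key, identifier, member) {
            store.get(slot(key, identifier))?.delete(member);
        },

        async getSetMembers(key, identifier) {
            return [...(store.get(slot(key, identifier)) ?? [])];
        },

        // Test-only helper, not part of the real facade: inspect a stored value
        peek(key, identifier) {
            return store.get(slot(key, identifier));
        }
    };
}
```

A test could pass `createMockCacheService()` as `cacheService` to `createSSEService`, and `createMockCacheService({ available: false })` to exercise the 503 path.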

6.2.6 Cache Fallback Strategy

CRITICAL: In a clustered environment with round-robin load balancing, SSE cannot function without the cache service.

When cache service is unavailable:

  1. SSE endpoints return 503 - Service Unavailable
  2. Clients must fall back to polling or wait for service restoration
  3. System continues operating - All other features work normally
  4. No real-time updates - Events are still processed but not delivered in real-time

Why local maps don't work as fallback:

  • Each server has isolated memory
  • Round-robin sends requests to different servers
  • Connection on Server A is invisible to Server B
  • No cross-server coordination possible without cache/pubsub
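The toy model below makes the round-robin problem concrete. Each hypothetical `createServer()` instance keeps its own local map, as the SSE service does, and a Node `EventEmitter` stands in for Redis Pub/Sub as an optional shared bus. Without the bus, an event published on server B never reaches a user whose SSE connection lives on server A.

```javascript
const { EventEmitter } = require('node:events');

/**
 * Toy model, not production code: each "server" keeps a local map of
 * userId -> received messages, like the per-process maps in the SSE service.
 * The optional `bus` stands in for Redis Pub/Sub.
 */
function createServer(bus) {
    const localConnections = new Map(); // userId -> array of delivered messages

    const deliverLocally = (userId, message) => {
        const inbox = localConnections.get(userId);
        if (inbox) inbox.push(message); // only users connected to THIS server
    };

    if (bus) {
        // Every server listens on the shared bus and delivers to its own users
        bus.on('publish', ({ userId, message }) => deliverLocally(userId, message));
    }

    return {
        connect(userId) {
            localConnections.set(userId, []);
            return localConnections.get(userId);
        },
        publish(userId, message) {
            if (bus) {
                bus.emit('publish', { userId, message }); // reaches every server
            } else {
                deliverLocally(userId, message); // local map only
            }
        }
    };
}
```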

6.3 SSE Channel Subscription Examples

6.3.1 Global Broadcast to All Connected Users

/**
 * Send system-wide announcement to all connected users
 * @param {string} message - Announcement message
 * @param {string} severity - 'info', 'warning', 'critical'
 */
async function sendGlobalAnnouncement(message, severity) {
    // Publish to global broadcast channel
    await pubSubService.publish('broadcast:global', {
        event: 'system_announcement',
        data: {
            message,
            severity,
            timestamp: new Date().toISOString()
        }
    });
}

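// In SSE Service - wire up channels when a client connects
async handleSSEConnection(req, res) {
    // ... existing connection setup ...
    
    // Subscribe to user-specific channel (no-op if already subscribed)
    await this.subscribeToUserChannel(userId);
    // ALSO ensure this server listens on the global broadcast channel
    await this.subscribeToGlobalBroadcast();
}

// Once per server, not per connection (otherwise each broadcast is delivered
// once per open connection); caching the promise covers concurrent connects
subscribeToGlobalBroadcast() {
    if (!this.globalBroadcastSubscription) {
        this.globalBroadcastSubscription = this.pubSub.subscribe('broadcast:global', (message) => this.handleGlobalBroadcast(message));
    }
    return this.globalBroadcastSubscription;
}

// Deliver a broadcast to every connection held by this server
handleGlobalBroadcast(message) {
    for (const conn of this.connections.values()) {
        this.sendEvent(conn.response, message.event, message.data);
    }
}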

// Usage: Maintenance notification
await sendGlobalAnnouncement(
    'System maintenance scheduled for 2am-3am EST',
    'warning'
);

6.3.2 Entity-Specific Channels

/**
 * Subscribe user to entity channel
 * @param {string} userId - User ID
 * @param {string} entityType - Entity type (project, document, etc.)
 * @param {string} entityId - Entity ID
 */
async function subscribeToEntity(userId, entityType, entityId) {
    const channelId = `entity:${entityType}:${entityId}`;
    
    // Add user to channel's subscriber set
    await cacheService.addToSet(
        CacheKeys.CHANNEL_SUBSCRIBERS,
        channelId,        // identifier: the channel
        userId            // member: the user subscribing
    );
    
    // Track user's channel subscriptions
    await cacheService.addToSet(
        CacheKeys.USER_SUBSCRIPTIONS,
        userId,           // identifier: the user
        channelId         // member: the channel they're subscribing to
    );
}

/**
 * Send notification to all entity subscribers
 * @param {string} entityType - Entity type
 * @param {string} entityId - Entity ID
 * @param {Object} notification - Notification data
 */
async function notifyEntitySubscribers(entityType, entityId, notification) {
    const channelId = `entity:${entityType}:${entityId}`;
    
    // Get all subscribers for this channel
    const subscribers = await cacheService.getSetMembers(
        CacheKeys.CHANNEL_SUBSCRIBERS,
        channelId
    );
    
    // Send to each subscriber's personal channel
    for (const userId of subscribers) {
        await pubSubService.publish(`user:${userId}`, {
            event: 'entity_update',
            data: {
                entityType,
                entityId,
                ...notification
            }
        });
    }
}

// Usage: Entity state changed
await notifyEntitySubscribers('project', '123', {
    type: 'state_changed',
    newState: 'active',
    changedBy: 'user-789'
});
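The functions above write each subscription twice: once under the channel and once under the user. Here is a minimal sketch of why, using plain `Map`s of `Set`s in place of the cache facade (an assumption for illustration). The channel index drives fan-out. The user index lets a hypothetical `unsubscribeAll()` prune every channel set without scanning all channels, for example when an account is removed.

```javascript
/**
 * Sketch of the two indexes used by the subscription examples:
 *   channel -> users   (fan-out: who receives an event)
 *   user -> channels   (cleanup: which channel sets to prune)
 */
function createSubscriptionIndex() {
    const channelSubscribers = new Map(); // channelId -> Set<userId>
    const userSubscriptions = new Map();  // userId -> Set<channelId>

    const addTo = (map, key, member) => {
        if (!map.has(key)) map.set(key, new Set());
        map.get(key).add(member);
    };

    return {
        subscribe(userId, channelId) {
            addTo(channelSubscribers, channelId, userId);
            addTo(userSubscriptions, userId, channelId);
        },
        subscribersOf(channelId) {
            return [...(channelSubscribers.get(channelId) ?? [])];
        },
        // Without the reverse index this would need a scan of every channel
        unsubscribeAll(userId) {
            for (const channelId of userSubscriptions.get(userId) ?? []) {
                const members = channelSubscribers.get(channelId);
                if (!members) continue;
                members.delete(userId);
                if (members.size === 0) channelSubscribers.delete(channelId);
            }
            userSubscriptions.delete(userId);
        }
    };
}
```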

6.3.3 Organization-Wide Channels

/**
 * Subscribe user to organization channel
 * @param {string} userId - User ID
 * @param {string} orgId - Organization ID
 */
async function subscribeToOrganization(userId, orgId) {
    const channelId = `org:${orgId}`;
    
    // Add user to org channel subscriber set
    await cacheService.addToSet(
        CacheKeys.CHANNEL_SUBSCRIBERS,
        channelId,        // identifier: the organization channel
        userId            // member: the user
    );
    
    // Track user's channel subscriptions
    await cacheService.addToSet(
        CacheKeys.USER_SUBSCRIPTIONS,
        userId,           // identifier: the user
        channelId         // member: the channel
    );
}

/**
 * Send notification to all organization members
 * @param {string} orgId - Organization ID
 * @param {Object} notification - Notification data
 */
async function notifyOrgMembers(orgId, notification) {
    const channelId = `org:${orgId}`;
    const members = await cacheService.getSetMembers(
        CacheKeys.CHANNEL_SUBSCRIBERS,
        channelId
    );
    
    // Batch publish to all members
    const publishPromises = members.map(userId =>
        pubSubService.publish(`user:${userId}`, {
            event: 'org_announcement',
            data: {
                orgId,
                ...notification
            }
        })
    );
    
    await Promise.all(publishPromises);
}

// Usage: Organization-wide announcement
await notifyOrgMembers('org-acme', {
    type: 'announcement',
    title: 'Company Meeting Tomorrow',
    time: '2024-01-15T10:00:00Z'
});
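`Promise.all` over every member works for small organisations. For a large one, though, it starts thousands of publish calls at once, and a single rejection rejects the whole call even when other publishes succeeded. Below is a sketch of a bounded alternative. It assumes a `publish` function with the same shape as `pubSubService.publish`; `publishInBatches` and its `batchSize` default are hypothetical.

```javascript
/**
 * Sketch: publish to many per-user channels in fixed-size batches.
 * Promise.allSettled keeps one failed publish from hiding the others.
 * @returns {Promise<Array<{userId: string, error: Error}>>} failed recipients
 */
async function publishInBatches(userIds, buildMessage, publish, batchSize = 100) {
    const failures = [];
    for (let i = 0; i < userIds.length; i += batchSize) {
        const batch = userIds.slice(i, i + batchSize);
        const results = await Promise.allSettled(
            batch.map((userId) => publish(`user:${userId}`, buildMessage(userId)))
        );
        results.forEach((result, j) => {
            if (result.status === 'rejected') {
                failures.push({ userId: batch[j], error: result.reason });
            }
        });
    }
    return failures; // caller decides whether to retry or just log
}
```

In `notifyOrgMembers`, the `Promise.all` call could become `publishInBatches(members, () => ({ event: 'org_announcement', data: { orgId, ...notification } }), (ch, msg) => pubSubService.publish(ch, msg))`.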

6.3.4 Topic-Based Subscriptions

/**
 * Subscribe user to topic notifications
 * @param {string} userId - User ID
 * @param {string} topic - Topic name (e.g., 'ai-ml', 'security')
 */
async function subscribeToTopic(userId, topic) {
    await cacheService.addToSet(
        CacheKeys.TOPIC_SUBSCRIBERS,
        topic,            // identifier: the topic
        userId            // member: the user
    );
}

/**
 * Notify all subscribers of a topic
 * @param {string} topic - Topic name
 * @param {Object} notification - Notification data
 */
async function notifyTopicSubscribers(topic, notification) {
    const subscribers = await cacheService.getSetMembers(
        CacheKeys.TOPIC_SUBSCRIBERS,
        topic
    );
    
    for (const userId of subscribers) {
        await pubSubService.publish(`user:${userId}`, {
            event: 'topic_update',
            data: {
                topic,
                ...notification
            }
        });
    }
}

// Usage: New AI/ML research paper
await notifyTopicSubscribers('ai-ml', {
    type: 'new_content',
    title: 'Breakthrough in Neural Network Efficiency',
    link: '/papers/nn-efficiency-2024'
});

6.3.5 Direct User Messaging

/**
 * Send direct message to specific user
 * @param {string} fromUserId - Sender user ID
 * @param {string} toUserId - Recipient user ID
 * @param {string} message - Message content
 */
async function sendDirectMessage(fromUserId, toUserId, message) {
    // No subscription needed - direct to user channel
    await pubSubService.publish(`user:${toUserId}`, {
        event: 'direct_message',
        data: {
            from: fromUserId,
            message,
            timestamp: new Date().toISOString()
        }
    });
}

// Usage: Send DM
await sendDirectMessage('user-123', 'user-456', 'Can we discuss the project?');

6.3.6 Dynamic Entity Subscriptions

/**
 * Generic subscription handler for any entity type
 * Named differently from the 6.3.2 helpers, which take (userId, entityType, entityId)
 * and use CacheKeys.CHANNEL_SUBSCRIBERS; these use a per-type Symbol.for() key
 * @param {string} entityType - Type of entity (contribution, vote, etc.)
 * @param {string} entityId - Entity ID
 * @param {string} userId - User ID to subscribe
 */
async function subscribeToDynamicEntity(entityType, entityId, userId) {
    // Symbol.for() returns the same registered symbol for the same string,
    // so subscriber and publisher derive an identical cache key
    const cacheKey = Symbol.for(`${entityType}Subscribers`);
    
    await cacheService.addToSet(
        cacheKey,
        entityId,         // identifier: the entity
        userId            // member: the user
    );
}

/**
 * Notify all entity subscribers
 * @param {string} entityType - Type of entity
 * @param {string} entityId - Entity ID
 * @param {Object} notification - Notification data
 */
async function notifyDynamicEntitySubscribers(entityType, entityId, notification) {
    const cacheKey = Symbol.for(`${entityType}Subscribers`);
    
    const subscribers = await cacheService.getSetMembers(
        cacheKey,
        entityId
    );
    
    for (const userId of subscribers) {
        await pubSubService.publish(`user:${userId}`, {
            event: `${entityType}_update`,
            data: {
                entityType,
                entityId,
                ...notification
            }
        });
    }
}

// Usage: Subscribe to contribution updates
await subscribeToDynamicEntity('contribution', 'contrib-789', 'user-123');

// Notify when contribution gets a comment
await notifyDynamicEntitySubscribers('contribution', 'contrib-789', {
    type: 'new_comment',
    commentId: 'comment-456',
    author: 'user-999'
});

6.3.7 Message Priority and Filtering

/**
 * SSE Service can filter alerts based on user preferences.
 * getUserPreferences, checkRateLimit, isQuietHours and sendToUserConnections
 * are helpers assumed to exist on the service; they are not shown here.
 */
class ProjectSSEService {
    async handlePubSubMessage(userId, message) {
        try {
            // Get user's notification preferences
            const preferences = await this.getUserPreferences(userId);
            
            // Filter based on preferences
            if (this.shouldSendNotification(message, preferences)) {
                // Check alert priority (data may be absent on some events)
                const priority = message.data?.priority || 'normal';
                
                // High priority alerts bypass rate limiting
                if (priority === 'high' || this.checkRateLimit(userId)) {
                    this.sendToUserConnections(userId, message);
                }
            }
        } catch (error) {
            // Log per user so a failed preference lookup does not surface
            // as a rejected promise in the PubSub layer
            this.logger.error(`Failed to deliver message to ${userId}: ${error.message}`);
        }
    }
    
    shouldSendNotification(message, preferences) {
        // Check user's subscription settings
        if (message.event === 'entity_update' && !preferences.entityUpdates) {
            return false;
        }
        
        // Check quiet hours
        if (preferences.quietHours && this.isQuietHours(preferences)) {
            return message.data?.priority === 'high'; // Only high priority
        }
        
        return true;
    }
}
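The filtering example relies on `checkRateLimit` and `isQuietHours`, which are not defined in this document. Here is one possible shape for them, under two stated assumptions: a fixed-window counter per user kept in process memory, and quiet hours stored as `{ start: 'HH:MM', end: 'HH:MM' }` in server local time. In this cluster, an in-memory counter only limits the connections on one server. A global per-user limit would need a shared counter in the cache service.

```javascript
/**
 * Hypothetical fixed-window rate limiter, one window per user.
 * `now` is injectable so the window logic can be tested deterministically.
 */
function createRateLimiter({ limit = 10, windowMs = 60_000, now = Date.now } = {}) {
    const windows = new Map(); // userId -> { windowStart, count }

    return function checkRateLimit(userId) {
        const t = now();
        const w = windows.get(userId);
        if (!w || t - w.windowStart >= windowMs) {
            windows.set(userId, { windowStart: t, count: 1 }); // new window
            return true;
        }
        if (w.count < limit) {
            w.count += 1;
            return true;
        }
        return false; // over the limit for the current window
    };
}

function toMinutes(hhmm) {
    const [h, m] = hhmm.split(':').map(Number);
    return h * 60 + m;
}

/** Hypothetical quiet-hours check; windows such as 22:00-07:00 wrap midnight */
function isQuietHours({ start, end }, date = new Date()) {
    const current = date.getHours() * 60 + date.getMinutes();
    const s = toMinutes(start);
    const e = toMinutes(end);
    return s <= e ? current >= s && current < e : current >= s || current < e;
}
```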

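Correct behaviour when the cache service is unavailable (see 6.2.6 Cache Fallback Strategy):

```javascript
// Guard at the top of the SSE route handler, before any SSE headers are sent
if (!cacheService.isAvailable()) {
    res.status(503).json({
        error: 'Real-time service temporarily unavailable',
        fallback: 'Please use polling endpoints'
    });
    return;
}
```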

6.4 PubSub Service Facade - Multi-Backend Support

CRITICAL: The PubSub Service provides an abstraction layer for publish/subscribe operations, supporting multiple backend implementations.

6.4.1 Architectural Rationale: Why Redis Pub/Sub in Addition to Message Queues?

6.4.1.1 Message Queue vs Pub/Sub Semantics

While Kafka, SQS (usually paired with SNS for fan-out), RabbitMQ/AMQP, and Azure Service Bus can all be used for publish/subscribe patterns, there are important semantic and operational differences:

  1. Real-time vs Durability Trade-offs

    • Redis Pub/Sub: Fire-and-forget, no persistence, sub-millisecond latency
    • Message Queues: Durable, guaranteed delivery, higher latency (10-100ms)
    • SSE Requirement: Real-time notifications need immediate delivery, not durability
  2. Connection State Synchronization

    • Redis Pub/Sub: Ephemeral channels perfect for "server1 has user X connected"
    • Message Queues: Would persist these transient states unnecessarily
    • SSE Requirement: Connection states are temporary and shouldn't be durable
  3. Pattern Matching Subscriptions

    • Redis: Native pattern subscriptions (project:*:update)
    • SQS: No pattern matching, requires multiple queues
    • Kafka: Can auto-create topics with auto.create.topics.enable=true, supports regex subscriptions
    • RabbitMQ/AMQP: Topic exchanges support patterns but with overhead
    • Azure Service Bus: Topic filters require SQL-like filter rules on each subscription
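Redis pattern subscriptions use glob-style matching. The sketch below converts a pattern to a regular expression to show which channels `project:*:update` would match. It handles only `*` and `?` (Redis also supports `[...]` classes and backslash escapes), and it illustrates the matching rule rather than replacing `PSUBSCRIBE`. Unlike an MQTT `+` wildcard, Redis `*` is not limited to a single `:`-separated segment.

```javascript
/**
 * Minimal Redis-style glob matcher (illustration only).
 * `*` matches any run of characters (including ':'), `?` exactly one.
 */
function globToRegExp(pattern) {
    let source = '';
    for (const ch of pattern) {
        if (ch === '*') source += '.*';
        else if (ch === '?') source += '.';
        else source += ch.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); // escape regex syntax
    }
    return new RegExp(`^${source}$`, 'su');
}

function matchesPattern(pattern, channel) {
    return globToRegExp(pattern).test(channel);
}
```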

6.4.1.2 Redis-Equivalent PubSub Services by Cloud Provider

What we need: Fast, ephemeral pub/sub for SSE coordination with pattern matching support.

| Provider | Service | Type | Pattern Support | Latency | Setup Complexity | SSE Suitability |
|---|---|---|---|---|---|---|
| AWS | ElastiCache for Redis | Managed Redis | Yes (PSUBSCRIBE) | <1ms | Low | Excellent |
| AWS | MemoryDB for Redis | Durable Redis | Yes (PSUBSCRIBE) | <2ms | Low | Excellent |
| Azure | Azure Cache for Redis | Managed Redis | Yes (PSUBSCRIBE) | <1ms | Low | Excellent |
| GCP | Memorystore for Redis | Managed Redis | Yes (PSUBSCRIBE) | <1ms | Low | Excellent |
| Self-hosted | Redis | Open source | Yes (PSUBSCRIBE) | <1ms | Medium | Excellent |

Alternative Non-Redis Solutions:

| Provider | Service | Type | Pattern Support | Latency | Setup Complexity | SSE Suitability |
|---|---|---|---|---|---|---|
| AWS | IoT Core | MQTT Broker | Yes (wildcards) | <5ms | Medium | Good |
| Azure | Web PubSub | WebSocket Service | Yes (groups) | <5ms | Low | Very Good |
| Azure | SignalR Service | Real-time Service | Yes (groups/users) | <5ms | Low | Excellent |
| GCP | Firebase Realtime | Real-time Database | Yes (paths) | <10ms | Low | Good |

Why Redis is preferred for SSE:

  1. Sub-millisecond latency - Critical for real-time feel
  2. Pattern subscriptions - PSUBSCRIBE user:* handles dynamic channels
  3. Ephemeral by design - Perfect for connection state coordination
  4. Battle-tested - Mature technology with extensive tooling
  5. Cloud-agnostic - Same Redis code works across all providers

Cloud Provider Redis Services Configuration:

// Examples use the node-redis v4+ API (createClient with `socket` options)
const { createClient } = require('redis');

// AWS ElastiCache for Redis (AUTH tokens require in-transit encryption, hence TLS)
// For cluster-mode-enabled clusters, use createCluster() instead of createClient()
const client = createClient({
    socket: {
        host: 'your-cluster.cache.amazonaws.com',
        port: 6379,
        tls: true
    },
    password: process.env.REDIS_AUTH_TOKEN
});

// Azure Cache for Redis
const azureClient = createClient({
    socket: {
        host: 'your-cache.redis.cache.windows.net',
        port: 6380, // SSL port
        tls: true,
        servername: 'your-cache.redis.cache.windows.net'
    },
    password: process.env.AZURE_REDIS_KEY
});

// GCP Memorystore for Redis
const gcpClient = createClient({
    socket: {
        host: 'your-instance-ip', // Private IP in VPC
        port: 6379
    }
    // Access is controlled mainly by VPC networking; AUTH is optional
});

// All support the same Redis PubSub commands.
// A subscribed connection cannot run other commands, so use a dedicated one.
const subscriber = client.duplicate();
await subscriber.connect();
await subscriber.pSubscribe('user:*', (message, channel) => {
    console.log(`Received on ${channel}: ${message}`);
});

Cost Comparison (Approximate monthly costs for SSE workload):

| Provider | Service | Instance Type | Monthly Cost | Notes |
|---|---|---|---|---|
| AWS | ElastiCache | cache.t3.micro | $15 | Good for development |
| AWS | ElastiCache | cache.r6g.large | $150 | Production ready |
| Azure | Cache for Redis | C1 Standard | $73 | 1GB, good for SSE |
| Azure | Cache for Redis | C3 Standard | $300 | 6GB, high availability |
| GCP | Memorystore | 1GB Standard | $45 | Basic tier |
| GCP | Memorystore | 1GB High Availability | $90 | With replica |

6.4.1.3 Dual-Pattern Architecture Decision

The system uses both patterns for different purposes:

  1. Message Queues (Kafka/SQS/etc.) for:

    • Business events that must be processed
    • Audit trails and event sourcing
    • Workflow orchestration
    • Guaranteed delivery requirements
  2. Redis Pub/Sub for:

    • SSE connection coordination between servers
    • Real-time notification fan-out
    • Ephemeral state synchronisation
    • Pattern-based event routing
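The two halves of the dual pattern meet where a durable business event becomes a real-time notification. A hedged sketch follows. `queueConsumer.onMessage(handler)` and `resolveRecipients` are hypothetical stand-ins (Kafka, SQS and Service Bus clients each have their own consumer APIs), and `pubSub` is any object with a `publish(channel, message)` method like the facade in this document. The fan-out uses `Promise.allSettled`, so a pub/sub failure is logged rather than failing the business event.

```javascript
/**
 * Sketch of the hand-off from the durable queue to ephemeral pub/sub.
 * queueConsumer.onMessage and resolveRecipients are hypothetical interfaces.
 */
function createNotificationBridge({ queueConsumer, pubSub, resolveRecipients, logger = console }) {
    queueConsumer.onMessage(async (businessEvent) => {
        // 1. Durable side: the queue delivered this event; work out who cares
        const recipients = await resolveRecipients(businessEvent);

        // 2. Ephemeral side: best-effort real-time fan-out. A failure here must
        //    not fail the business event; clients can catch up via polling.
        const results = await Promise.allSettled(
            recipients.map((userId) =>
                pubSub.publish(`user:${userId}`, {
                    event: businessEvent.type,
                    data: businessEvent.payload
                })
            )
        );

        const failed = results.filter((r) => r.status === 'rejected').length;
        if (failed > 0) {
            logger.warn(`Real-time fan-out failed for ${failed} of ${recipients.length} users`);
        }
    });
}
```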

6.4.1.4 Azure Service Bus Complexity Explained

Azure Service Bus requires more setup for SSE pattern matching because:

  1. Subscription Creation: Each pattern requires a separate subscription with SQL filters
  2. Filter Rules: Must define SQL-like expressions for each subscription
  3. Management Overhead: Cannot dynamically create subscriptions from application code without elevated permissions

// Azure Service Bus Setup Complexity
const { ServiceBusClient } = require('@azure/service-bus');

const sbClient = new ServiceBusClient(process.env.AZURE_CONNECTION);

// Need to pre-create subscriptions with filters
// This typically requires the Azure Portal, ARM templates, or a
// ServiceBusAdministrationClient running with "Manage" rights:
/*
Topic: "notifications"
  Subscription: "user-123-alerts"
    Filter: "userId = '123'"
  Subscription: "project-456-alerts"
    Filter: "projectId = '456'"
  Subscription: "global-alerts"
    Filter: "type = 'broadcast'"
*/

// Application code becomes complex
function setupUserSubscription(userId) {
    // Cannot create the subscription here without Manage rights,
    // so it must already exist with the right filter
    return sbClient.createReceiver(
        'notifications',
        `user-${userId}-alerts`  // Must exist already!
    );
}

6.4.1.5 Comprehensive Managed Pub/Sub Services Comparison
6.4.1.5.1 AWS Services
  1. AWS IoT Core

    • Pros: MQTT protocol, WebSocket support, device shadows, rules engine
    • Cons: Designed for IoT, connection limits, topic hierarchy restrictions
    • SSE Fit: Good for real-time, supports wildcards
    // Supports pattern subscriptions
    await iotClient.subscribe('user/+/notifications');
  2. Amazon Kinesis Data Streams

    • Pros: Real-time streaming, auto-scaling, multiple consumers
    • Cons: Shard management, ordered by partition key only
    • SSE Fit: Better for analytics than notifications
  3. AWS AppSync

    • Pros: GraphQL subscriptions, real-time updates, offline support
    • Cons: GraphQL-only, resolver complexity
    • SSE Fit: Excellent if using GraphQL
    // GraphQL subscription
    subscription OnNotification($userId: ID!) {
      onNotification(userId: $userId) { message }
    }
  4. Amazon EventBridge

    • Pros: Event routing, content-based filtering, serverless
    • Cons: Targets are AWS services or HTTP endpoints, not browser clients; no WebSocket (the 1-minute minimum applies only to scheduled rules)
    • SSE Fit: Poor for real-time SSE

6.4.1.5.2 Azure Services
  1. Azure SignalR Service

    • Pros: Fully managed SignalR, auto-scaling, WebSocket/SSE/Long Polling
    • Cons: SignalR protocol specific, client library required
    • SSE Fit: Excellent - built for this use case
    // Client negotiates the transport (WebSockets, SSE, or long polling)
    const connection = new signalR.HubConnectionBuilder()
      .withUrl("/hub")
      .build();
  2. Azure Event Grid

    • Pros: Event routing, advanced filtering, WebHook delivery
    • Cons: HTTP push model, not true pub/sub for clients
    • SSE Fit: Needs adapter for SSE
  3. Azure Web PubSub

    • Pros: WebSocket service, group subscriptions, presence
    • Cons: Newer service, WebSocket focused
    • SSE Fit: Very Good - groups map well to channels
    // Groups are joined by exact name (one group per channel)
    await client.joinGroup(`project:${projectId}`);
  4. Azure Notification Hubs

    • Pros: Push notifications, device management
    • Cons: Mobile/desktop push only, not for web SSE
    • SSE Fit: Not applicable for SSE

6.4.1.5.3 Google Cloud Services
  1. Firebase Realtime Database

    • Pros: Real-time sync, offline support, simple API
    • Cons: NoSQL structure, data modeling constraints
    • SSE Fit: Good but requires data structure changes
    // Auto-syncs to all clients
    firebase.database().ref(`users/${userId}/notifications`)
      .on('child_added', (snapshot) => { /* handle */ });
  2. Cloud Pub/Sub

    • Pros: Scalable, reliable, push/pull delivery
    • Cons: Not browser-compatible, needs proxy
    • SSE Fit: Requires SSE adapter service
  3. Firestore

    • Pros: Real-time listeners, offline support, queries
    • Cons: Document database, query limitations
    • SSE Fit: Good with proper data model
    // Real-time query subscriptions
    firestore.collection('notifications')
      .where('userId', '==', userId)
      .onSnapshot((snapshot) => { /* handle */ });
  4. Google Cloud Tasks

    • Pros: Task queuing, scheduling
    • Cons: Not real-time, HTTP push only
    • SSE Fit: Not suitable for SSE

6.4.1.5.4 Best Managed Services for SSE Without Session Affinity

| Service | Provider | Pattern Support | Setup Complexity | Cost Model | Best For |
|---|---|---|---|---|---|
| Azure SignalR | Azure | Groups/Users | Low | Connections/Messages | Best overall for SSE |
| Azure Web PubSub | Azure | Pattern groups | Low | Connections/Messages | Modern WebSocket apps |
| AWS IoT Core | AWS | MQTT wildcards | Medium | Messages | IoT + Web hybrid |
| AWS AppSync | AWS | GraphQL filters | Medium | Queries/Data | GraphQL APIs |
| Firebase Realtime | GCP | Path-based | Low | Storage/Bandwidth | Simple real-time |
| Firestore | GCP | Query-based | Low | Reads/Writes | Document-based apps |
| Pusher Channels | 3rd Party | Channels/Events | Very Low | Connections/Messages | Quick implementation |
| Ably | 3rd Party | Channel patterns | Very Low | Connections/Messages | Global scale |

6.4.1.5.5 Recommendation for Managed Services

For SSE without session affinity / sticky sessions using only managed services:

  1. Azure-centric: Use Azure SignalR Service or Azure Web PubSub
  2. AWS-centric: Use AWS IoT Core (despite the name) or AWS AppSync
  3. GCP-centric: Use Firebase Realtime Database or Firestore
  4. Multi-cloud: Use Pusher, Ably, or PubNub

6.4.1.6 Recommendation

For SSE without session affinity, the dual-pattern approach is optimal:

  1. Primary Choice: Managed Redis Services

    • AWS: ElastiCache for Redis or MemoryDB for Redis
    • Azure: Azure Cache for Redis
    • GCP: Memorystore for Redis
    • Self-hosted: Redis with high availability setup
  2. Architecture Pattern:

    • Existing message queue (Kafka/SQS/Service Bus) for business events
    • Redis Pub/Sub for SSE coordination and real-time delivery
    • Service facades to abstract both implementations
  3. Why This Works:

    • Redis handles ephemeral SSE connection coordination
    • Message queues handle durable business event processing
    • Both can coexist and serve different purposes
    • Same Redis API across all cloud providers
    • Sub-millisecond latency for real-time user experience
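The PubSub facade in 6.4.2 delegates to an adapter. It calls `connect()`, `publish(channel, payload)` with a JSON string, and `subscribe(channel, handler)` with a handler that expects the raw string. Below is a hypothetical in-memory adapter with that shape, intended for unit tests and local development. It only works within one process, so it cannot replace Redis in the cluster. The `unsubscribe` and `disconnect` methods are assumptions about the rest of the adapter contract.

```javascript
const { EventEmitter } = require('node:events');

/**
 * Hypothetical single-process adapter for tests and local development.
 * Shape: connect(), publish(channel, payloadString), subscribe(channel, handler(rawString)).
 */
class InMemoryPubSubAdapter {
    constructor() {
        this.emitter = new EventEmitter();
        this.emitter.setMaxListeners(0); // many subscribers per channel is expected
        this.connected = false;
    }

    async connect() {
        this.connected = true;
    }

    async publish(channel, payload) {
        if (!this.connected) throw new Error('Adapter not connected');
        // Deliver asynchronously, as a networked broker would
        setImmediate(() => this.emitter.emit(channel, payload));
    }

    async subscribe(channel, handler) {
        if (!this.connected) throw new Error('Adapter not connected');
        this.emitter.on(channel, handler);
    }

    async unsubscribe(channel, handler) {
        this.emitter.off(channel, handler);
    }

    async disconnect() {
        this.emitter.removeAllListeners();
        this.connected = false;
    }
}
```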

6.4.2 PubSub Service Interface

/**
 * PubSub Service Module Factory
 * Abstracts pub/sub operations across different message brokers
 * @module services/pubsub-service
 * @param {Object} config - Configuration object
 * @param {string} config.provider - Provider type: 'redis', 'sqs', 'azure', 'kafka', 'amqp'
 * @param {Object} config.connectionOptions - Provider-specific connection options
 * @param {Object} [config.logger] - Optional logger instance
 * @returns {PubSubService} PubSub service instance
 */
module.exports = ({ provider, connectionOptions, logger = console }) => {
    
    /**
     * @class PubSubService
     * @description Unified interface for pub/sub operations across different backends
     */
    class PubSubService {
        constructor() {
            this.provider = provider;
            this.logger = logger;
            this.adapter = this.createAdapter(provider, connectionOptions);
            this.subscriptions = new Map();
            this.isConnected = false;
        }
        
        /**
         * Creates the appropriate adapter based on provider
         * @private
         * @param {string} provider - Provider type
         * @param {Object} options - Connection options
         * @returns {Object} Provider adapter
         */
        createAdapter(provider, options) {
            switch(provider) {
                case 'redis':
                    return new RedisPubSubAdapter(options);
                case 'sqs':
                    return new SQSAdapter(options);
                case 'azure':
                    return new AzureServiceBusAdapter(options);
                case 'kafka':
                    return new KafkaAdapter(options);
                case 'amqp':
                    return new AMQPAdapter(options);
                default:
                    throw new Error(`Unsupported provider: ${provider}`);
            }
        }
        
        /**
         * Connects to the pub/sub backend
         * @async
         * @returns {Promise<void>}
         * @throws {Error} Connection error
         */
        async connect() {
            try {
                await this.adapter.connect();
                this.isConnected = true;
                this.logger.info(`PubSub connected to ${this.provider}`);
            } catch (error) {
                this.logger.error(`PubSub connection failed: ${error.message}`);
                throw error;
            }
        }
        
        /**
         * Publishes a message to a channel
         * @async
         * @param {string} channel - Channel/topic name
         * @param {Object} message - Message payload
         * @returns {Promise<void>}
         * @example
         * await pubSubService.publish('user:123', { event: 'notification', data: {...} })
         */
        async publish(channel, message) {
            if (!this.isConnected) {
                throw new Error('PubSub service not connected');
            }
            
            try {
                const payload = JSON.stringify(message);
                await this.adapter.publish(channel, payload);
                this.logger.debug(`Published to ${channel}`, message);
            } catch (error) {
                this.logger.error(`Publish failed to ${channel}: ${error.message}`);
                throw error;
            }
        }
        
        /**
         * Subscribes to a channel
         * @async
         * @param {string} channel - Channel/topic name
         * @param {Function} handler - Message handler function
         * @returns {Promise<string>} Subscription ID
         * @example
         * const subId = await pubSubService.subscribe('user:123', (message) => {
         *     console.log('Received:', message);
         * });
         */
        async subscribe(channel, handler) {
            if (!this.isConnected) {
                throw new Error('PubSub service not connected');
            }
            
            const subscriptionId = `${channel}_${require('crypto').randomUUID()}`; // unique even for repeat subscribes in the same millisecond
            
            const wrappedHandler = (rawMessage) => {
                try {
                    const message = JSON.parse(rawMessage);
                    handler(message);
                } catch (error) {
                    this.logger.error(`Handler error for ${channel}: ${error.message}`);
                }
            };
            
            await this.adapter.subscribe(channel, wrappedHandler);
            this.subscriptions.set(subscriptionId, { channel, handler: wrappedHandler });
            
            this.logger.debug(`Subscribed to ${channel} with ID ${subscriptionId}`);
            return subscriptionId;
        }
        
        /**
         * Subscribes to a pattern (for providers that support it)
         * @async
         * @param {string} pattern - Pattern to match channels
         * @param {Function} handler - Message handler function
         * @returns {Promise<string>} Subscription ID
         * @example
         * await pubSubService.psubscribe('user:*', (channel, message) => {
         *     console.log(`Received on ${channel}:`, message);
         * });
         */
        async psubscribe(pattern, handler) {
            if (!this.isConnected) {
                throw new Error('PubSub service not connected');
            }
            if (!this.adapter.psubscribe) {
                throw new Error(`Pattern subscription not supported by ${this.provider}`);
            }
            
            const subscriptionId = `pattern_${pattern}_${require('crypto').randomUUID()}`;
            
            const wrappedHandler = (channel, rawMessage) => {
                try {
                    const message = JSON.parse(rawMessage);
                    handler(channel, message);
                } catch (error) {
                    this.logger.error(`Pattern handler error: ${error.message}`);
                }
            };
            
            await this.adapter.psubscribe(pattern, wrappedHandler);
            this.subscriptions.set(subscriptionId, { pattern, handler: wrappedHandler });
            
            return subscriptionId;
        }
        
        /**
         * Unsubscribes from a channel or pattern
         * @async
         * @param {string} subscriptionId - Subscription ID to cancel
         * @returns {Promise<void>}
         */
        async unsubscribe(subscriptionId) {
            const subscription = this.subscriptions.get(subscriptionId);
            if (!subscription) {
                throw new Error(`Subscription ${subscriptionId} not found`);
            }
            
            if (subscription.channel) {
                await this.adapter.unsubscribe(subscription.channel);
            } else if (subscription.pattern) {
                await this.adapter.punsubscribe(subscription.pattern);
            }
            
            this.subscriptions.delete(subscriptionId);
            this.logger.debug(`Unsubscribed ${subscriptionId}`);
        }
        
        /**
         * Checks if service is available
         * @returns {boolean} Service availability
         */
        isAvailable() {
            return this.isConnected && this.adapter.isHealthy();
        }
        
        /**
         * Disconnects from the pub/sub backend
         * @async
         * @returns {Promise<void>}
         */
        async disconnect() {
            for (const [id, _] of this.subscriptions) {
                await this.unsubscribe(id);
            }
            
            await this.adapter.disconnect();
            this.isConnected = false;
            this.logger.info(`PubSub disconnected from ${this.provider}`);
        }
    }
    
    return new PubSubService();
};
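The service depends only on a small adapter contract: `connect`, `publish`, `subscribe`, `psubscribe`, `unsubscribe`, `punsubscribe`, `isHealthy` and `disconnect`. For unit tests and local development, a single-process adapter that honours the same contract can stand in for Redis. The sketch below is a hypothetical `InMemoryPubSubAdapter` built on Node's `EventEmitter`. It is not one of the providers in the `createAdapter` switch, and it only delivers messages within one process. Its `globToRegExp` helper approximates Redis patterns using `*` and `?` only, so bracket classes are matched literally.

```javascript
const { EventEmitter } = require('node:events');

// Converts a Redis-style glob ('user:*', 'job:?') into an anchored RegExp (* and ? only).
function globToRegExp(pattern) {
    const escaped = pattern.replace(/[.+^${}()|[\]\\]/g, '\\$&');
    return new RegExp('^' + escaped.replace(/\*/g, '.*').replace(/\?/g, '.') + '$');
}

// Hypothetical single-process adapter implementing the contract PubSubService calls.
class InMemoryPubSubAdapter {
    constructor() {
        this.bus = new EventEmitter();
        this.bus.setMaxListeners(0);   // many subscriptions share one emitter
        this.listeners = new Map();    // 'message:<channel>' | 'pmessage:<pattern>' -> listener[]
        this.connected = false;
    }

    async connect() { this.connected = true; }

    async publish(channel, message) {
        // Deliver on a later tick, roughly as a network broker would
        setImmediate(() => this.bus.emit('message', channel, message));
    }

    async subscribe(channel, handler) {
        this.track(`message:${channel}`, (ch, msg) => { if (ch === channel) handler(msg); });
    }

    async psubscribe(pattern, handler) {
        const re = globToRegExp(pattern);
        this.track(`pmessage:${pattern}`, (ch, msg) => { if (re.test(ch)) handler(ch, msg); });
    }

    async unsubscribe(channel) { this.release(`message:${channel}`); }

    async punsubscribe(pattern) { this.release(`pmessage:${pattern}`); }

    isHealthy() { return this.connected; }

    async disconnect() {
        this.bus.removeAllListeners();
        this.listeners.clear();
        this.connected = false;
    }

    // Registers a listener and remembers it so it can be removed later
    track(key, listener) {
        if (!this.listeners.has(key)) this.listeners.set(key, []);
        this.listeners.get(key).push(listener);
        this.bus.on('message', listener);
    }

    release(key) {
        for (const listener of this.listeners.get(key) || []) {
            this.bus.removeListener('message', listener);
        }
        this.listeners.delete(key);
    }
}
```

Wiring it in would mean adding a case such as `'memory'` to `createAdapter`. That case name is also an assumption.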

6.4.3 Redis Adapter Implementation (Multi-Cloud)

/**
 * Redis PubSub Adapter - Works with AWS ElastiCache/MemoryDB, Azure Cache for Redis, GCP Memorystore and self-hosted Redis
 * @class RedisPubSubAdapter
 * @private
 */
class RedisPubSubAdapter {
    constructor(options) {
        const Redis = require('ioredis');
        
        // Auto-detect cloud provider configuration
        const redisConfig = this.buildRedisConfig(options);
        
        // Separate clients for pub/sub: a connection in subscriber mode can only issue
        // (un)subscribe-type commands, so PUBLISH needs its own client
        this.pubClient = new Redis(redisConfig);
        this.subClient = new Redis(redisConfig);
        this.healthy = true;
        
        // Health monitoring
        this.setupHealthChecks();
    }
    
    /**
     * Builds Redis configuration for different cloud providers
     * @private
     */
    buildRedisConfig(options) {
        // AWS ElastiCache / MemoryDB
        if (options.aws || options.host?.includes('cache.amazonaws.com')) {
            return {
                host: options.host,
                port: options.port || 6379,
                password: options.password,
                tls: options.tls ? {} : undefined,
                retryDelayOnFailover: 100,
                enableReadyCheck: false,
                maxRetriesPerRequest: 3
            };
        }
        
        // Azure Cache for Redis
        if (options.azure || options.host?.includes('redis.cache.windows.net')) {
            return {
                host: options.host,
                port: options.port || 6380,
                password: options.password,
                tls: {
                    servername: options.host
                },
                family: 4,
                keepAlive: true
            };
        }
        
        // GCP Memorystore
        if (options.gcp || options.region) {
            return {
                host: options.host,
                port: options.port || 6379,
                // Memorystore uses VPC auth, no password needed
                connectTimeout: 10000,
                lazyConnect: true
            };
        }
        
        // Default/Self-hosted Redis
        return {
            host: options.host || 'localhost',
            port: options.port || 6379,
            password: options.password,
            retryDelayOnFailover: 100,
            maxRetriesPerRequest: 3
        };
    }
    
    /**
     * Sets up health monitoring for both clients
     * @private
     */
    setupHealthChecks() {
        this.subClient.on('error', (err) => {
            console.error('Redis subscription client error:', err);
            this.healthy = false;
        });
        
        this.pubClient.on('error', (err) => {
            console.error('Redis publisher client error:', err);
            this.healthy = false;
        });
        
        this.subClient.on('connect', () => {
            console.log('Redis subscription client connected');
            this.healthy = true;
        });
        
        this.pubClient.on('connect', () => {
            console.log('Redis publisher client connected');
        });
    }
    
    async connect() {
        try {
            await Promise.all([
                this.pubClient.ping(),
                this.subClient.ping()
            ]);
            this.healthy = true;
            console.log('Redis PubSub adapter connected successfully');
        } catch (error) {
            this.healthy = false;
            throw new Error(`Redis connection failed: ${error.message}`);
        }
    }
    
    async publish(channel, message) {
        if (!this.healthy) {
            throw new Error('Redis publisher not healthy');
        }
        return this.pubClient.publish(channel, message);
    }
    
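    async subscribe(channel, handler) {
        // ioredis emits 'message' for every subscribed channel, so filter by channel
        const listener = (ch, message) => { if (ch === channel) handler(message); };
        this.trackListener('message', channel, listener);
        await this.subClient.subscribe(channel);
    }
    
    async psubscribe(pattern, handler) {
        // ioredis emits 'pmessage' as (pattern, channel, message); forward (channel, message)
        const listener = (pat, ch, message) => { if (pat === pattern) handler(ch, message); };
        this.trackListener('pmessage', pattern, listener);
        await this.subClient.psubscribe(pattern);
    }
    
    async unsubscribe(channel) {
        this.releaseListeners('message', channel);
        return this.subClient.unsubscribe(channel);
    }
    
    async punsubscribe(pattern) {
        this.releaseListeners('pmessage', pattern);
        return this.subClient.punsubscribe(pattern);
    }
    
    // Keeps listener references so unsubscribing also removes them (avoids a listener leak)
    trackListener(event, key, listener) {
        this.trackedListeners = this.trackedListeners || new Map();
        const id = `${event}:${key}`;
        if (!this.trackedListeners.has(id)) this.trackedListeners.set(id, []);
        this.trackedListeners.get(id).push(listener);
        this.subClient.on(event, listener);
    }
    
    releaseListeners(event, key) {
        const id = `${event}:${key}`;
        for (const listener of this.trackedListeners?.get(id) || []) {
            this.subClient.removeListener(event, listener);
        }
        this.trackedListeners?.delete(id);
    }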
    
    isHealthy() {
        return this.healthy && 
               this.pubClient.status === 'ready' && 
               this.subClient.status === 'ready';
    }
    
    async disconnect() {
        await Promise.all([
            this.pubClient.quit(),
            this.subClient.quit()
        ]);
        console.log('Redis PubSub adapter disconnected');
    }
}

/**
 * Example configurations for different cloud providers:
 */

// AWS ElastiCache for Redis
const awsAdapter = new RedisPubSubAdapter({
    aws: true,
    host: 'your-cluster.abcdef.cache.amazonaws.com',
    port: 6379,
    password: process.env.REDIS_AUTH_TOKEN
});

// Azure Cache for Redis
const azureAdapter = new RedisPubSubAdapter({
    azure: true,
    host: 'your-cache.redis.cache.windows.net',
    port: 6380,
    password: process.env.AZURE_REDIS_KEY
});

// GCP Memorystore for Redis
const gcpAdapter = new RedisPubSubAdapter({
    gcp: true,
    host: '10.1.1.100', // Private VPC IP
    port: 6379
});

// Self-hosted Redis
const selfHostedAdapter = new RedisPubSubAdapter({
    host: 'redis.yourdomain.com',
    port: 6379,
    password: process.env.REDIS_PASSWORD
});

6.4.4 Multi-Cloud Redis Configuration

// Production configuration supporting different cloud Redis services
const pubSubService = createPubSubService({
    provider: 'redis', // Primary choice for SSE
    connectionOptions: {
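        // Shared by every provider branch
        host: process.env.REDIS_HOST,
        port: parseInt(process.env.REDIS_PORT, 10) || 6379, // set REDIS_PORT=6380 for Azure's TLS port
        password: process.env.REDIS_PASSWORD,
        
        // AWS ElastiCache
        aws: process.env.CLOUD_PROVIDER === 'aws',
        
        // Azure Cache for Redis (host/password as above)
        azure: process.env.CLOUD_PROVIDER === 'azure',
        
        // GCP Memorystore
        // NOTE: buildRedisConfig falls into the GCP branch whenever region is set and the
        // AWS/Azure checks do not match, so leave GCP_REGION unset on other providers
        gcp: process.env.CLOUD_PROVIDER === 'gcp',
        region: process.env.GCP_REGION,
        
        // TLS configuration for cloud providers
        tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
        
        // Connection pooling and retry settings
        // NOTE: buildRedisConfig returns its own per-provider values and does not forward
        // these three; they only apply if it is extended to merge caller options
        retryDelayOnFailover: 100,
        maxRetriesPerRequest: 3,
        enableReadyCheck: false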
    },
    logger: winston.createLogger({...})
});
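Each provider flag in the configuration above is derived independently, so it is easy to set more than one flag. GCP detection can also trigger just because `GCP_REGION` is present, since `buildRedisConfig` checks `options.gcp || options.region`. A hypothetical helper such as `connectionOptionsFromEnv` avoids both problems. It sets exactly one provider flag from `CLOUD_PROVIDER`, parses the port with an explicit radix, and falls back to the provider's customary port (6380 for Azure's TLS endpoint, 6379 otherwise). The environment variable names are the ones used above. The function itself is an illustration, not part of the existing code.

```javascript
// Hypothetical helper: builds Redis connectionOptions from an environment object.
function connectionOptionsFromEnv(env = process.env) {
    const provider = (env.CLOUD_PROVIDER || 'self-hosted').toLowerCase();
    if (!['aws', 'azure', 'gcp', 'self-hosted'].includes(provider)) {
        throw new Error(`Unsupported CLOUD_PROVIDER: ${provider}`);
    }

    // Azure Cache for Redis serves TLS on 6380; the others default to 6379
    const defaultPort = provider === 'azure' ? 6380 : 6379;
    const parsedPort = Number.parseInt(env.REDIS_PORT, 10);

    const options = {
        host: env.REDIS_HOST,
        port: Number.isNaN(parsedPort) ? defaultPort : parsedPort,
        password: env.REDIS_PASSWORD,
        tls: env.REDIS_TLS === 'true' ? {} : undefined
    };

    // Set exactly one provider flag so buildRedisConfig picks a single branch
    if (provider === 'aws') options.aws = true;
    if (provider === 'azure') options.azure = true;
    if (provider === 'gcp') {
        options.gcp = true;
        options.region = env.GCP_REGION; // only forwarded when GCP is selected
    }
    return options;
}
```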

6.4.5 Connection Lifecycle Management

CRITICAL: Handling multiple connections per user across the cluster requires proper lifecycle management to prevent memory exhaustion.

6.4.5.1 Recommended Approach: LRU + TTL

The safest strategy combines LRU (Least Recently Used) eviction with TTL (Time To Live) to guarantee memory bounds:

class SSEConnectionManager {
    constructor(options = {}) {
        // Memory safety configuration
        this.maxConnections = options.maxConnections || 5000;  // Hard limit per server
        this.maxConnectionAge = options.maxConnectionAge || 3600000; // 1 hour max age
        this.staleTimeout = options.staleTimeout || 120000; // 2 min inactivity
        this.maxUserConnections = options.maxUserConnections || 5; // Max tabs per user
        
        // Connection tracking
        this.connections = new Map();
        this.userConnections = new Map();
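        this.connectionMetadata = new Map(); // Stores lastActivity, createdAt
        
        // Optional cluster-wide tracking used by closeConnection()
        this.cache = options.cache;        // cache client exposing removeHashField()
        this.serverId = options.serverId;  // key of this server's sse_connections hash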
        
        // Start cleanup intervals
        this.startMaintenanceTasks();
    }
    
    startMaintenanceTasks() {
        // Check every 30 seconds for stale connections
        setInterval(() => this.evictStaleConnections(), 30000);
        
        // Enforce memory limits every 10 seconds (addConnection also evicts before inserting)
        setInterval(() => {
            if (this.connections.size > this.maxConnections) {
                this.evictLRUConnection();
            }
        }, 10000);
        
        // Log metrics every minute
        setInterval(() => this.logMetrics(), 60000);
    }
    
    addConnection(connectionId, userId, request, response) {
        // Enforce per-user limit first
        this.enforceUserLimit(userId);
        
        // Enforce global limit
        if (this.connections.size >= this.maxConnections) {
            this.evictLRUConnection();
        }
        
        // Add the connection with both request and response
        const connection = {
            id: connectionId,
            userId,
            request,   // Need this for removing listeners
            response,  // Need this for sending data and closing
            createdAt: Date.now(),
            lastActivity: Date.now()
        };
        
        this.connections.set(connectionId, connection);
        this.connectionMetadata.set(connectionId, {
            lastActivity: Date.now(),
            createdAt: Date.now()
        });
        
        // Track user connections
        if (!this.userConnections.has(userId)) {
            this.userConnections.set(userId, new Set());
        }
        this.userConnections.get(userId).add(connectionId);
    }
    
    evictStaleConnections() {
        const now = Date.now();
        const connectionsToEvict = [];
        
        for (const [connId, metadata] of this.connectionMetadata) {
            // Check both age and activity
            const age = now - metadata.createdAt;
            const inactivity = now - metadata.lastActivity;
            
            if (age > this.maxConnectionAge || inactivity > this.staleTimeout) {
                connectionsToEvict.push({ connId, age, inactivity });
            }
        }
        
        // Evict stale connections (collected first so the maps are not mutated mid-iteration)
        for (const { connId, age, inactivity } of connectionsToEvict) {
            if (this.connections.has(connId)) {
                console.log(`Evicting stale connection ${connId}: age=${age}ms, inactive=${inactivity}ms`);
                this.closeConnection(connId, 'Connection timeout');
            }
        }
    }
    
    evictLRUConnection() {
        // Find the least recently used connection
        let lruConnId = null;
        let oldestActivity = Infinity; // Date.now() would skip entries touched this millisecond
        
        for (const [connId, metadata] of this.connectionMetadata) {
            if (metadata.lastActivity < oldestActivity) {
                oldestActivity = metadata.lastActivity;
                lruConnId = connId;
            }
        }
        
        if (lruConnId) {
            console.log(`Evicting LRU connection ${lruConnId} to prevent memory exhaustion`);
            this.closeConnection(lruConnId, 'Server memory limit reached');
        }
    }
    
    enforceUserLimit(userId) {
        const userConns = this.userConnections.get(userId);
        
        if (userConns && userConns.size >= this.maxUserConnections) {
            // Find oldest connection for this user
            let oldestConnId = null;
            let oldestTime = Infinity; // Date.now() would skip connections created this millisecond
            
            for (const connId of userConns) {
                const metadata = this.connectionMetadata.get(connId);
                if (metadata && metadata.createdAt < oldestTime) {
                    oldestTime = metadata.createdAt;
                    oldestConnId = connId;
                }
            }
            
            if (oldestConnId) {
                console.log(`User ${userId} exceeded connection limit, evicting oldest`);
                this.closeConnection(oldestConnId, 'Too many connections');
            }
        }
    }
    
    closeConnection(connectionId, reason) {
        const conn = this.connections.get(connectionId);
        if (!conn) return;
        
        // CRITICAL: Properly close the HTTP response stream
        try {
            // Check if response is still writable
            if (conn.response && !conn.response.writableEnded) {
                // Send close event to client with reason
                conn.response.write(`event: close\ndata: ${JSON.stringify({ 
                    reason, 
                    reconnect: true,
                    timestamp: Date.now()
                })}\n\n`);
                
                // End the response stream
                conn.response.end();
            }
        } catch (e) {
            // Connection already dead or closed
            console.debug(`Connection ${connectionId} already closed: ${e.message}`);
        }
        
        // Remove all event listeners to prevent memory leaks
        if (conn.request) {
            conn.request.removeAllListeners('close');
            conn.request.removeAllListeners('error');
        }
        
        // Clean up all maps
        this.connections.delete(connectionId);
        this.connectionMetadata.delete(connectionId);
        
        const userConns = this.userConnections.get(conn.userId);
        if (userConns) {
            userConns.delete(connectionId);
            if (userConns.size === 0) {
                this.userConnections.delete(conn.userId);
            }
        }
        
        // Clean up Redis if cache is available
        if (this.cache) {
            this.cache.removeHashField(
                CacheKeys.SSE_SERVER_CONNECTIONS,
                this.serverId,
                `${conn.userId}:${connectionId}`
            ).catch(err => {
                console.error(`Failed to remove connection from Redis: ${err.message}`);
            });
        }
        
        console.info(`Connection ${connectionId} closed: ${reason}`);
    }
    
    updateActivity(connectionId) {
        const metadata = this.connectionMetadata.get(connectionId);
        if (metadata) {
            metadata.lastActivity = Date.now();
        }
    }
    
    logMetrics() {
        const metrics = {
            totalConnections: this.connections.size,
            uniqueUsers: this.userConnections.size,
            memoryUsage: Math.round(process.memoryUsage().heapUsed / 1024 / 1024) + 'MB',
            limits: {
                maxConnections: this.maxConnections,
                currentUtilization: (this.connections.size / this.maxConnections * 100).toFixed(1) + '%'
            }
        };
        
        console.info('SSE Connection Metrics:', metrics);
        
        // Emit warning if approaching limits
        if (this.connections.size > this.maxConnections * 0.8) {
            console.warn('WARNING: Approaching connection limit!', metrics);
        }
    }
}
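`SSEConnectionManager` assumes that a request handler registers each stream and reacts when the client goes away, but that handler is not shown. A minimal, self-contained sketch of that wiring follows. It uses only Node's built-in `http` module. The `/events` route, the `userId` query parameter and the plain `connections` map are assumptions that stand in for real authentication and for the manager. Cleanup listens on the response's `'close'` event, which fires when the underlying connection ends.

```javascript
const http = require('node:http');
const { randomUUID } = require('node:crypto');

// Hypothetical stand-in for SSEConnectionManager: connectionId -> { userId, response }
const connections = new Map();

function handleSse(req, res) {
    const url = new URL(req.url, 'http://localhost');
    if (url.pathname !== '/events') {
        res.writeHead(404).end();
        return;
    }

    // Assumption: identity arrives as a query parameter; use real authentication in practice
    const userId = url.searchParams.get('userId') || 'anonymous';
    const connectionId = randomUUID();

    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        Connection: 'keep-alive'
    });
    // retry: tells EventSource how long to wait before reconnecting (milliseconds)
    res.write(`retry: 5000\nevent: connected\ndata: ${JSON.stringify({ connectionId })}\n\n`);
    connections.set(connectionId, { userId, response: res });

    // The response's 'close' fires when the client goes away or the socket is destroyed
    res.on('close', () => connections.delete(connectionId));
}

const server = http.createServer(handleSse);
```

With the manager in place, `connections.set(...)` would become `addConnection(connectionId, userId, req, res)`, and the `'close'` handler would remove the entry from the manager's maps.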
6.4.5.2 Why LRU + TTL Prevents Memory Exhaustion
  1. Hard Connection Limit - Never exceed maxConnections per server (e.g., 5000)
  2. Automatic Age-Out - Connections older than maxConnectionAge (1 hour) are evicted
  3. Inactivity Timeout - Connections with no recorded activity for staleTimeout (2 minutes) are evicted; this relies on updateActivity() being called whenever data is written to or received from a connection
  4. Per-User Limits - Prevent a single user from consuming too many connections
  5. Continuous Cleanup - Stale checks run every 30 seconds and limit checks every 10 seconds

This guarantees:

  • Memory per server bounded by maxConnections * avgConnectionSize
  • No zombie connections older than the TTL plus one cleanup interval (up to 30 seconds)
  • Fair resource allocation across users
  • Predictable memory usage for capacity planning
6.4.5.3 Connection Tracking Structure
// Local in-memory tracking (per server)
class ConnectionManager {
    constructor() {
        // Map of connectionId -> connection object
        this.connections = new Map();
        
        // Map of userId -> Set of connectionIds
        this.userConnections = new Map();
        
        // LRU eviction for memory management
        this.maxConnections = 5000; // Same default hard limit as SSEConnectionManager
        this.connectionAccessTime = new Map();
    }
}

// Redis tracking (cluster-wide)
// Hash: sse_connections:server1
// Field: userId:connectionId
// Value: { metadata including lastActivity timestamp }
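Each hash field in the cluster-wide layout encodes two identifiers. Small helpers that build and parse these keys in one place help avoid mismatches, such as reading a connection ID from the stored value when it only lives in the field name. The function names below are hypothetical. The key formats (`sse_connections:<serverId>`, field `<userId>:<connectionId>`, `sse_connection:<connectionId>`) are the ones used in this document. The parser assumes user IDs do not contain `:`.

```javascript
// Hypothetical helpers for the Redis key layout described above.
const serverConnectionsKey = (serverId) => `sse_connections:${serverId}`;
const connectionKey = (connectionId) => `sse_connection:${connectionId}`;
const connectionField = (userId, connectionId) => `${userId}:${connectionId}`;

// Splits on the first ':' only; assumes userId itself contains no ':'
function parseConnectionField(field) {
    const separator = field.indexOf(':');
    if (separator <= 0 || separator === field.length - 1) {
        throw new Error(`Malformed connection field: ${field}`);
    }
    return {
        userId: field.slice(0, separator),
        connectionId: field.slice(separator + 1)
    };
}

// Recovers the server ID from a key such as 'sse_connections:server1'
function serverIdFromKey(key) {
    const prefix = 'sse_connections:';
    if (!key.startsWith(prefix)) throw new Error(`Unexpected key: ${key}`);
    return key.slice(prefix.length);
}
```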
6.4.5.4 Multiple Connections Per User

Users can have multiple SSE connections (tabs, devices):

// User with 3 tabs open
userConnections.get('user123') // Set(['conn1', 'conn2', 'conn3'])

// Each connection tracked separately
connections.get('conn1') // { userId: 'user123', response: res1, ... }
connections.get('conn2') // { userId: 'user123', response: res2, ... }
connections.get('conn3') // { userId: 'user123', response: res3, ... }
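Delivering an event to a user therefore means writing the same SSE frame to every connection in that user's set. The sketch below assumes the `connections` / `userConnections` maps shown above and responses that expose `write()` and `writableEnded`. `formatSseEvent` and `sendToUser` are hypothetical names. In the text/event-stream format, each line of the payload needs its own `data:` prefix, and a blank line terminates the event.

```javascript
// Serialises one event in the text/event-stream format.
function formatSseEvent({ id, event, data }) {
    let frame = '';
    if (id !== undefined) frame += `id: ${id}\n`;
    if (event) frame += `event: ${event}\n`;
    const payload = typeof data === 'string' ? data : JSON.stringify(data ?? null);
    // Multi-line payloads need one 'data:' line per line
    for (const line of payload.split(/\r\n|\r|\n/)) {
        frame += `data: ${line}\n`;
    }
    return frame + '\n'; // blank line dispatches the event
}

// Writes the event to every open connection of a user; returns how many were written.
function sendToUser(userId, evt, connections, userConnections) {
    const frame = formatSseEvent(evt);
    let delivered = 0;
    for (const connectionId of userConnections.get(userId) || []) {
        const conn = connections.get(connectionId);
        if (!conn || conn.response.writableEnded) continue; // already closed
        conn.response.write(frame);
        delivered++;
    }
    return delivered;
}
```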
6.4.5.5 Connection Eviction Strategies

PRIMARY STRATEGY: LRU + TTL - This combination prevents memory exhaustion and is the safest approach.

  1. Client Disconnect (Polite Close)
req.on('close', async () => {
    // Remove from local maps
    this.connections.delete(connectionId);
    
    const userConns = this.userConnections.get(userId);
    if (userConns) {
        userConns.delete(connectionId);
        if (userConns.size === 0) {
            this.userConnections.delete(userId);
        }
    }
    
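    // Remove from Redis; catch errors so a Redis outage cannot surface as an unhandled rejection
    try {
        await this.cache.removeHashField(
            CacheKeys.SSE_SERVER_CONNECTIONS,
            this.serverId,
            `${userId}:${connectionId}`
        );
        await this.cache.delete(CacheKeys.SSE_CONNECTION, connectionId);
    } catch (err) {
        console.error(`Failed to remove connection ${connectionId} from Redis: ${err.message}`);
    }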
});
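The same connection can be torn down from several directions: the client's `'close'`, an `'error'`, or an LRU or TTL eviction. Cleanup should therefore be safe to trigger more than once. One way to guarantee that is a run-once guard around the teardown. `onceAsync` below is a hypothetical helper. It also catches a rejected Redis call so the failure cannot escape the event handler as an unhandled rejection.

```javascript
// Wraps an async cleanup so repeated triggers share a single execution.
function onceAsync(fn, onError = (err) => console.error('Cleanup failed:', err.message)) {
    let pending = null;
    return (...args) => {
        if (!pending) {
            pending = Promise.resolve()
                .then(() => fn(...args))
                .catch(onError); // errors are reported once, never rethrown
        }
        return pending;
    };
}
```

For example, `const cleanup = onceAsync(() => this.handleDisconnect(connectionId, userId));` can be registered with both `req.on('close', cleanup)` and `req.on('error', cleanup)`.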
  2. TTL-Based Eviction (Stale Connections)
// Heartbeat checks for stale connections
sendHeartbeats() {
    const now = Date.now();
    const staleThreshold = 2 * 60 * 1000; // 2 minutes
    
    for (const [connId, conn] of this.connections) {
        if (now - conn.lastActivity > staleThreshold) {
            // Connection is stale - evict it
            console.log(`Evicting stale connection ${connId}`);
            
            try {
                // Send close event
                this.sendEvent(conn, {
                    type: 'connection_timeout',
                    data: { reason: 'Inactivity timeout' }
                });
                conn.response.end();
            } catch (e) {
                // Connection already dead
            }
            
            // Clean up
            this.handleDisconnect(connId, conn.userId);
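        } else if (conn.response.writableEnded || conn.response.destroyed) {
            // Stream already closed: clean up instead of writing to it
            this.handleDisconnect(connId, conn.userId);
        } else {
            // Send heartbeat (an SSE comment line). write() returns false when the chunk
            // could not be flushed immediately, so only accepted writes count as activity
            // and a connection that stops draining eventually hits the stale check above
            try {
                if (conn.response.write(': heartbeat\n\n')) {
                    conn.lastActivity = now;
                }
            } catch (e) {
                // Connection dead - clean up
                this.handleDisconnect(connId, conn.userId);
            }
        }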
    }
}
  3. LRU Eviction (Memory Pressure)
enforceConnectionLimit() {
    if (this.connections.size > this.maxConnections) {
        // Find LRU connection
        let oldestTime = Infinity; // Date.now() would skip entries touched this millisecond
        let oldestConnId = null;
        
        for (const [connId, conn] of this.connections) {
            if (conn.lastActivity < oldestTime) {
                oldestTime = conn.lastActivity;
                oldestConnId = connId;
            }
        }
        
        if (oldestConnId) {
            const conn = this.connections.get(oldestConnId);
            
            // Notify and close
            this.sendEvent(conn, {
                type: 'connection_limit',
                data: { reason: 'Server connection limit reached' }
            });
            conn.response.end();
            
            // Clean up
            this.handleDisconnect(oldestConnId, conn.userId);
        }
    }
}
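Both `evictLRUConnection` and `enforceConnectionLimit` scan every connection to find the least recently used one, which costs O(n) per eviction. A JavaScript `Map` iterates in insertion order. Deleting and re-inserting a key on each activity therefore keeps the least recently used entry at the front, which makes finding the eviction candidate O(1). `LruConnectionIndex` below is a hypothetical sketch of that technique, not part of the existing code.

```javascript
// Tracks connection IDs in least-recently-used order using Map insertion order.
class LruConnectionIndex {
    constructor() {
        this.order = new Map(); // connectionId -> lastActivity, oldest first
    }

    touch(connectionId, now = Date.now()) {
        // Re-inserting moves the key to the end (most recently used)
        this.order.delete(connectionId);
        this.order.set(connectionId, now);
    }

    remove(connectionId) {
        this.order.delete(connectionId);
    }

    // Returns the least recently used ID without removing it, or null if empty
    peekLeastRecent() {
        const first = this.order.keys().next();
        return first.done ? null : first.value;
    }

    get size() {
        return this.order.size;
    }
}
```

When the limit is exceeded, the caller would close the connection returned by `peekLeastRecent()` and then `remove()` it from the index.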
  4. Redis TTL (Automatic Cleanup)
// Connection metadata expires automatically
await this.cache.set(
    CacheKeys.SSE_CONNECTION,
    connectionId,
    metadata,
    { ttlSeconds: 3600 }  // Auto-expires after 1 hour
);

// Periodic cleanup job removes orphaned entries
async cleanupOrphanedConnections() {
    // NOTE: KEYS blocks Redis while it walks the keyspace; prefer SCAN in production
    const servers = await this.cache.keys('sse_connections:*');
    
    for (const serverKey of servers) {
        const serverId = serverKey.split(':')[1];
        const connections = await this.cache.getAllHashFields(
            CacheKeys.SSE_SERVER_CONNECTIONS,
            serverId
        );
        
        for (const field of Object.keys(connections)) {
            // The field is 'userId:connectionId'; the stored value only holds timestamps
            const connectionId = field.split(':')[1];
            const connExists = await this.cache.get(
                CacheKeys.SSE_CONNECTION,
                connectionId
            );
            
            if (!connExists) {
                // Connection expired - remove from hash
                await this.cache.removeHashField(
                    CacheKeys.SSE_SERVER_CONNECTIONS,
                    serverId,
                    field
                );
            }
        }
    }
}
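Separating the decision from the Redis I/O makes the orphan check easy to test. The sketch below is a hypothetical pure function. It takes the fields of one `sse_connections:<serverId>` hash and the set of connection IDs whose `sse_connection:<connectionId>` key still exists, and returns the fields to delete. It reads the connection ID from the field name, because the hash values shown in 6.5.2 store only timestamps.

```javascript
// Returns hash fields ('userId:connectionId') whose per-connection key has expired.
function findOrphanedFields(hashFields, liveConnectionIds) {
    const orphaned = [];
    for (const field of hashFields) {
        const separator = field.indexOf(':');
        // Assumes userId contains no ':'; malformed fields are treated as orphans
        const connectionId = separator > 0 ? field.slice(separator + 1) : null;
        if (!connectionId || !liveConnectionIds.has(connectionId)) {
            orphaned.push(field);
        }
    }
    return orphaned;
}
```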
  5. Server Shutdown (Graceful Cleanup)
async shutdown() {
    // Notify all connections (ignore ones that are already dead)
    for (const conn of this.connections.values()) {
        try {
            this.sendEvent(conn, {
                type: 'server_shutdown',
                data: { message: 'Server shutting down, please reconnect' }
            });
            conn.response.end();
        } catch (e) {
            // Connection already closed
        }
    }
    
    // Clear Redis entries for this server
    await this.cache.delete(
        CacheKeys.SSE_SERVER_CONNECTIONS,
        this.serverId
    );
    
    // Clear local maps
    this.connections.clear();
    this.userConnections.clear();
}
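Container orchestrators such as Kubernetes send SIGTERM and then kill the process after a grace period, so `shutdown()` should not be allowed to hang on an unreachable Redis. A minimal sketch: race the cleanup against a timer and exit either way. It assumes a hypothetical `withTimeout` helper, an arbitrary 5-second budget, and a `connectionManager` instance that exposes the `shutdown()` method above.

```javascript
// Resolves with the task's result, or rejects if it takes longer than `ms`.
function withTimeout(task, ms) {
    let timer;
    const timeout = new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
    });
    // Always clear the timer so it cannot keep the process alive
    return Promise.race([Promise.resolve(task), timeout]).finally(() => clearTimeout(timer));
}

// Example wiring (connectionManager is a hypothetical instance):
// process.once('SIGTERM', async () => {
//     try {
//         await withTimeout(connectionManager.shutdown(), 5000);
//     } catch (err) {
//         console.error('Graceful shutdown incomplete:', err.message);
//     } finally {
//         process.exit(0);
//     }
// });
```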
6.4.5.6 Connection Limits Per User
// Prevent connection spam from single user
async canUserConnect(userId) {
    const userConns = this.userConnections.get(userId);
    
    if (userConns && userConns.size >= MAX_CONNECTIONS_PER_USER) {
        // Evict oldest connection for this user (skip IDs whose connection is already gone)
        const connections = Array.from(userConns)
            .map(id => ({ id, conn: this.connections.get(id) }))
            .filter(entry => entry.conn)
            .sort((a, b) => a.conn.createdAt - b.conn.createdAt);
        
        const oldest = connections[0];
        if (!oldest) return true;
        
        // Close oldest
        this.sendEvent(oldest.conn, {
            type: 'connection_replaced',
            data: { reason: 'New connection opened' }
        });
        oldest.conn.response.end();
        
        this.handleDisconnect(oldest.id, userId);
    }
    
    return true;
}
6.4.5.7 Monitoring and Metrics
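getConnectionMetrics() {
    const now = Date.now();
    const connectionsByAge = { under1min: 0, under5min: 0, under1hour: 0, over1hour: 0 };
    let oldestConnection = null; // createdAt of the oldest connection, null when there are none
    
    // Single pass: bucket each connection by age and track the oldest one
    for (const conn of this.connections.values()) {
        const age = now - conn.createdAt;
        if (age < 60000) connectionsByAge.under1min++;
        else if (age < 300000) connectionsByAge.under5min++;
        else if (age < 3600000) connectionsByAge.under1hour++;
        else connectionsByAge.over1hour++;
        
        if (oldestConnection === null || conn.createdAt < oldestConnection) {
            oldestConnection = conn.createdAt;
        }
    }
    
    return {
        totalConnections: this.connections.size,
        uniqueUsers: this.userConnections.size,
        avgConnectionsPerUser: this.connections.size / Math.max(1, this.userConnections.size),
        oldestConnection,
        memoryUsage: process.memoryUsage().heapUsed,
        connectionsByAge
    };
}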

6.5 SSE Subscription Management

IMPORTANT: This section shows how SSE subscriptions could be managed, but the actual subscription logic is handled by separate services/ADRs. The SSE service only handles the delivery mechanism.

6.5.1 Example Subscription Storage

// SSE subscriptions could be stored in MongoDB (handled by separate service)
{
  _id: ObjectId("..."),
  userId: ObjectId("507f1f77bcf86cd799439010"),
  sseSubscriptions: [
    {
      channelType: "entity",
      entityType: "project", 
      entityId: "PRJ-001",
      subscriptionDate: ISODate("2024-01-01T00:00:00Z")
    },
    {
      channelType: "topic",
      topicName: "ai-research",
      subscriptionDate: ISODate("2024-01-02T00:00:00Z")
    },
    {
      channelType: "global",
      subscriptionDate: ISODate("2024-01-01T00:00:00Z")
    }
  ]
}

// Example entity that users might subscribe to
{
  _id: "PRJ-001",
  name: "Alpha Project", 
  state: "ACTIVE",
  createdAt: ISODate("2024-01-01T00:00:00Z"),
  updatedAt: ISODate("2025-01-20T15:45:00Z"),
}
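The stored subscription entries map onto the channel identifiers used in the Redis structures of 6.5.2 (`entity:project:PRJ-001`, `topic:ai-research`). A hypothetical `channelIdFor` helper makes that mapping explicit. Treating the global subscription as the literal channel `global` is an assumption, because the document keeps global subscribers in a separate `global_subscribers` set.

```javascript
// Derives the Redis channel ID from one sseSubscriptions entry.
function channelIdFor(subscription) {
    switch (subscription.channelType) {
        case 'entity':
            if (!subscription.entityType || !subscription.entityId) {
                throw new Error('Entity subscription needs entityType and entityId');
            }
            return `entity:${subscription.entityType}:${subscription.entityId}`;
        case 'topic':
            if (!subscription.topicName) throw new Error('Topic subscription needs topicName');
            return `topic:${subscription.topicName}`;
        case 'global':
            return 'global'; // assumption: name of the broadcast channel
        default:
            throw new Error(`Unknown channelType: ${subscription.channelType}`);
    }
}
```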

6.5.2 Redis SSE Cache Structure

// 1. Channel -> Users mapping (who's subscribed to this channel?)
// Key: channel_subscribers:entity:project:PRJ-001
// Value: Set of user IDs subscribed to this channel
SADD channel_subscribers:entity:project:PRJ-001 user123 user456 user789
SADD channel_subscribers:topic:ai-research user123 user999

// 2. User -> Channels mapping (what channels is this user subscribed to?)
// Key: user_subscriptions:user123
// Value: Set of channel IDs
SADD user_subscriptions:user123 entity:project:PRJ-001 topic:ai-research

// 3. Active SSE connections by server
// Key: sse_connections:server1
// Value: Hash of userId:connectionId -> metadata
HSET sse_connections:server1 user123:conn_abc '{"created":1234567890,"lastActivity":1234567890}'
HSET sse_connections:server1 user123:conn_def '{"created":1234567900,"lastActivity":1234567900}'

// 4. Connection details with TTL
// Key: sse_connection:conn_abc
// Value: JSON with connection metadata (expires automatically)
SETEX sse_connection:conn_abc 3600 '{"userId":"user123","serverId":"server1","created":1234567890}'

// 5. Global subscribers (for broadcast messages)
// Key: global_subscribers
// Value: Set of all connected user IDs
SADD global_subscribers user123 user456 user789

// 6. Topic subscribers 
// Key: topic_subscribers:ai-research
// Value: Set of user IDs interested in this topic
SADD topic_subscribers:ai-research user123 user999
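At delivery time, each server must decide which of its locally connected users should receive an event published to a channel. The hypothetical sketch below answers that question against in-memory stand-ins for the Redis sets above (`channel_subscribers:*` and `global_subscribers`). It intersects the subscribers with the users that hold a connection on this server, for example the keys of `userConnections`. In a real deployment the set reads would be `SMEMBERS` calls. Routing the `global` channel ID to `global_subscribers` is an assumption.

```javascript
// redisSets stands in for Redis: key -> Set of user IDs.
function resolveLocalRecipients(channelId, redisSets, localUserIds) {
    // Assumption: the broadcast channel is identified as 'global'
    const key = channelId === 'global' ? 'global_subscribers' : `channel_subscribers:${channelId}`;
    const subscribers = redisSets.get(key) || new Set();
    const recipients = [];
    for (const userId of subscribers) {
        // Only users with an open connection on this server can be reached here
        if (localUserIds.has(userId)) recipients.push(userId);
    }
    return recipients;
}
```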

6.6 Message Bus Facade Pattern (Existing Implementation)

The system already has a message-producer.js that implements the facade pattern for message bus abstraction. This producer listens to ALL events from the Global Event Emitter and forwards them to the configured message bus (Kafka or Azure Service Bus). The facade below illustrates the same pattern with Azure Service Bus, AWS (SNS/SQS), Redis and in-memory adapters:

// message-bus-facade.js
class MessageBusFacade {
    constructor(config = {}) {
        this.provider = config.provider || process.env.MESSAGE_BUS_PROVIDER || 'redis';
        
        switch(this.provider) {
            case 'azure':
                this.implementation = new AzureServiceBusAdapter(config);
                break;
            case 'aws':
                this.implementation = new AWSSQSAdapter(config);
                break;
            case 'redis':
                this.implementation = new RedisAdapter(config);
                break;
            case 'memory':
                this.implementation = new InMemoryAdapter(config);
                break;
            default:
                throw new Error(`Unknown provider: ${this.provider}`);
        }
    }
    
    async publish(topic, message) {
        return this.implementation.publish(topic, message);
    }
    
    async subscribe(topic, handler) {
        return this.implementation.subscribe(topic, handler);
    }
    
    async createSubscription(topic, subscriptionName) {
        return this.implementation.createSubscription(topic, subscriptionName);
    }
}

// Azure Service Bus Adapter
const { randomUUID } = require('crypto');

class AzureServiceBusAdapter {
    constructor(config) {
        const { ServiceBusClient } = require('@azure/service-bus');
        this.client = new ServiceBusClient(config.connectionString);
        // Topic receivers need a subscription name (assumed to be supplied in config)
        this.subscriptionName = config.subscriptionName;
        this.senders = new Map();
        this.receivers = new Map();
    }
    
    async publish(topic, message) {
        if (!this.senders.has(topic)) {
            this.senders.set(topic, this.client.createSender(topic));
        }
        
        const sender = this.senders.get(topic);
        const messageId = message.id || randomUUID();
        await sender.sendMessages({
            body: message,
            contentType: 'application/json',
            messageId
        });
        
        return { success: true, messageId };
    }
    
    async subscribe(topic, handler) {
        const receiver = this.client.createReceiver(topic, this.subscriptionName);
        
        receiver.subscribe({
            processMessage: async (message) => {
                await handler(message.body);
                await receiver.completeMessage(message);
            },
            processError: async (args) => {
                // args is ProcessErrorArgs; the underlying error is args.error
                console.error('Azure Service Bus error:', args.error);
            }
        }, {
            // Messages are settled explicitly above, so disable the SDK's auto-complete
            autoCompleteMessages: false
        });
        
        this.receivers.set(topic, receiver);
        
        return () => receiver.close();
    }
}

// AWS SQS Adapter  
class AWSSQSAdapter {
    constructor(config) {
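        // aws-sdk v2 reached end of support in September 2025; the modular v3 clients
        // (@aws-sdk/client-sqs, @aws-sdk/client-sns) are its successors
        const AWS = require('aws-sdk');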
        this.sqs = new AWS.SQS({ region: config.region });
        this.sns = new AWS.SNS({ region: config.region });
        this.queueUrls = config.queueUrls || {};
        this.topicArns = config.topicArns || {};
    }
    
    async publish(topic, message) {
        const params = {
            TopicArn: this.topicArns[topic],
            Message: JSON.stringify(message),
            MessageAttributes: {
                type: {
                    DataType: 'String',
                    StringValue: message.type || 'notification'
                }
            }
        };
        
        const result = await this.sns.publish(params).promise();
        return { success: true, messageId: result.MessageId };
    }
    
    async subscribe(topic, handler) {
        const queueUrl = this.queueUrls[topic];
        let running = true;
        
        const poll = async () => {
            while (running) {
                try {
                    const params = {
                        QueueUrl: queueUrl,
                        MaxNumberOfMessages: 10,
                        WaitTimeSeconds: 20
                    };
                    
                    const result = await this.sqs.receiveMessage(params).promise();
                    
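                    if (result.Messages) {
                        for (const message of result.Messages) {
                            const parsed = JSON.parse(message.Body);
                            // Unless raw message delivery is enabled on the SNS -> SQS
                            // subscription, SNS wraps the payload in a JSON envelope
                            const body = parsed.Type === 'Notification' && typeof parsed.Message === 'string'
                                ? JSON.parse(parsed.Message)
                                : parsed;
                            await handler(body);
                            
                            // Delete only after successful handling; otherwise the message
                            // becomes visible again after the queue's visibility timeout
                            await this.sqs.deleteMessage({
                                QueueUrl: queueUrl,
                                ReceiptHandle: message.ReceiptHandle
                            }).promise();
                        }
                    }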
                } catch (error) {
                    console.error('SQS polling error:', error);
                    await new Promise(resolve => setTimeout(resolve, 5000));
                }
            }
        };
        
        poll();
        
        return () => { running = false; };
    }
}

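// Redis Adapter (for local development)
// Note: Redis Pub/Sub is at-most-once; messages published while no subscriber is connected are dropped
class RedisAdapter {
    constructor(config) {
        const Redis = require('ioredis');
        this.pubClient = new Redis(config);
        // A connection in subscriber mode cannot run other commands, hence a second client
        this.subClient = new Redis(config);
        this.handlers = new Map();
        
        // Register ONE dispatcher for all topics. Adding a 'message' listener on every
        // subscribe() call would deliver each message once per listener.
        this.subClient.on('message', (channel, message) => {
            const handlers = this.handlers.get(channel);
            if (!handlers) return;
            
            let payload;
            try {
                payload = JSON.parse(message);
            } catch (error) {
                console.error(`Invalid JSON on Redis channel ${channel}:`, error);
                return;
            }
            handlers.forEach(h => h(payload));
        });
    }
    
    async publish(topic, message) {
        await this.pubClient.publish(topic, JSON.stringify(message));
        return { success: true };
    }
    
    async subscribe(topic, handler) {
        if (!this.handlers.has(topic)) {
            this.handlers.set(topic, new Set());
            await this.subClient.subscribe(topic);
        }
        
        this.handlers.get(topic).add(handler);
        
        return async () => {
            const handlers = this.handlers.get(topic);
            if (!handlers) return;
            handlers.delete(handler);
            
            // Drop the Redis subscription once the last local handler is gone
            if (handlers.size === 0) {
                this.handlers.delete(topic);
                await this.subClient.unsubscribe(topic);
            }
        };
    }
}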

6.7 Event Processing Architecture

6.7.1 Deployment Architecture

CRITICAL: The system consists of TWO separate types of processes:

  1. API Servers (Multiple instances)

    • Handle HTTP requests and SSE connections
    • Run the SSE Service for managing client connections
    • Subscribe to Redis Pub/Sub to receive notifications
    • Do NOT process events from the message bus
    • Scale horizontally based on connection load
  2. Notification Worker (Separate process/container)

    • Runs as an independent service/process
    • Consumes events from message bus (Kafka/Azure Service Bus)
    • Determines which users should receive notifications
    • Publishes to Redis Pub/Sub for distribution
    • Does NOT handle HTTP or SSE connections
    • Can be scaled independently from API servers
graph TD
subgraph Production_Deployment[Production Deployment]
direction TB
subgraph API_Servers[ ]
A[API Server Process 1<br>Express App<br>- HTTP Routes<br>- SSE Service<br>- Redis Sub] -->|Subscribes| R
B[API Server Process 2<br>Express App<br>- HTTP Routes<br>- SSE Service<br>- Redis Sub] -->|Subscribes| R
end
R[Redis Pub/Sub]
subgraph Worker[ ]
N[Notification Worker Process<br>Separate Container/Service<br>- Consumes from Message Bus<br>- Processes Events<br>- Publishes to Redis]
end
N -->|Publishes| R
N -->|Consumes| M[Message Bus<br>Kafka/Azure Service Bus]
end
classDef boxStyle fill:#f9f9f9,stroke:#333,stroke-width:2px;
classDef containerStyle fill:none,stroke:#333,stroke-width:2px;
class Production_Deployment,API_Servers,Worker containerStyle;
class A,B,R,N,M boxStyle;

6.7.2 Complete Event Flow Sequence

sequenceDiagram
    participant PS as Project Service
    participant EE as Global Event Emitter
    participant MP as MessageProducer<br/>(message-producer.js)
    participant MB as Message Bus<br/>(Kafka/Azure)
    participant NW as Notification Worker
    participant RC as Redis Cache
    participant RP as Redis Pub/Sub
    participant SSE1 as SSE Server 1
    participant SSE2 as SSE Server 2
    participant U1 as User 1<br/>(on Server 1)
    participant U2 as User 2<br/>(on Server 2)
    
    Note over PS,EE: Project Event Occurs
    PS->>EE: emit('project.solution_added', data)
    EE->>MP: Event captured (automatic listener)
    MP->>MB: sendMessage('project.solution_added', data)
    
    Note over MB,NW: Worker Processing
    MB->>NW: Poll/receive message (topic: project.solution_added)
    NW->>RC: SMEMBERS project_alerts:507f1f77bcf86cd799439011
    RC-->>NW: [user123, user456]
    
    Note over NW,RC: Check Active Connections
    NW->>RC: SCAN 0 MATCH sse_connections:*
    RC-->>NW: [server1, server2]
    
    Note over NW,RP: Publish to User Channels
    NW->>RP: PUBLISH sse:user:user123 {notification}
    NW->>RP: PUBLISH sse:user:user456 {notification}
    
    Note over RP,SSE2: Redis Pub/Sub Distribution
    RP->>SSE1: Message for user123
    RP->>SSE2: Message for user456
    
    Note over SSE1,U2: SSE Delivery
    SSE1->>SSE1: Check local connections
    SSE1->>U1: Write SSE event
    SSE2->>SSE2: Check local connections  
    SSE2->>U2: Write SSE event
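On each API server, the last two steps of the sequence ("Check local connections" and "Write SSE event") come down to a lookup in an in-process map. The sketch below assumes the `sse:user:<userId>` channel naming from the diagram and one Node/Express `res` object per open SSE connection. `LocalConnectionRegistry` is a hypothetical name. It is not the SSE service from section 6.2.3.

```javascript
// Hypothetical per-server registry: userId -> Set of open SSE responses.
// Each API server only knows about the connections it holds itself.
class LocalConnectionRegistry {
    constructor() {
        this.byUser = new Map();
    }

    add(userId, res) {
        if (!this.byUser.has(userId)) this.byUser.set(userId, new Set());
        this.byUser.get(userId).add(res);
    }

    remove(userId, res) {
        const set = this.byUser.get(userId);
        if (!set) return;
        set.delete(res);
        if (set.size === 0) this.byUser.delete(userId);
    }

    // Called for every Redis Pub/Sub message; channel is assumed to be "sse:user:<userId>".
    // Returns how many local connections received the event (0 = user is not on this server).
    dispatch(channel, rawMessage) {
        const prefix = 'sse:user:';
        if (!channel.startsWith(prefix)) return 0;

        const connections = this.byUser.get(channel.slice(prefix.length));
        if (!connections) return 0;

        const { event, data } = JSON.parse(rawMessage);
        const frame = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
        connections.forEach(res => res.write(frame));
        return connections.size;
    }
}
```

If a server SUBSCRIBEs to the channel of each locally connected user, this can be wired as `subClient.on('message', (channel, message) => registry.dispatch(channel, message))`. With a pattern subscription (`psubscribe('sse:user:*')`), ioredis emits `pmessage` events with `(pattern, channel, message)` instead. In that case a return value of 0 simply means the user is connected to another server, which is why no session affinity is needed.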

6.7.3 Practical Example: Project Status Update

Consider an external worker that updates a project's status (e.g., from 'active' to 'completed'). This worker publishes the event to the durable message bus (e.g., Azure Service Bus, Kafka or SQS) for reliable delivery.

  • Event Publishing: The external worker sends a 'project.state_changed' event to the bus with details such as projectId, oldState and newState.
  • Notification Worker Processing: The Notification Worker consumes the event and queries Redis for its subscribers (users who have the project in their alerts).
  • SSE Delivery: For each subscriber, publish to their user-specific Redis pub/sub channel. SSE servers subscribed to these channels deliver the event to connected clients.
  • Redis Role: Used only for fast subscriber lookup and pub/sub coordination—not for durable storage. If Redis is down, SSE falls back to polling, but the bus ensures the event isn't lost.

This ensures reliable notification even if SSE servers are temporarily unavailable, while Redis handles efficient real-time routing.
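The flow above can be sketched as a pure function that turns one status change into per-user Pub/Sub messages. The sketch makes three assumptions. First, the change arrives in the generic `entity.state_changed` shape that the Notification Worker below consumes, with `entityType: 'project'`. Second, the subscriber IDs have already been read from Redis. Third, user channels follow the `sse:user:<userId>` naming from the sequence diagram. `buildStateChangeFanout` is a hypothetical helper.

```javascript
// Hypothetical helper: maps one state-change event to one Pub/Sub message per subscriber.
// Redis is only used for routing here; the durable copy of the event stays on the bus.
function buildStateChangeFanout(event, subscriberIds) {
    const { id, entityType, entityId, oldState, newState, timestamp, metadata } = event;

    const notification = {
        event: 'entity_state_changed',
        data: {
            entityType,
            entityId,
            entityName: metadata?.name || entityId,
            stateTransition: { from: oldState, to: newState },
            timestamp,
            eventId: id
        }
    };

    return subscriberIds.map(userId => ({
        channel: `sse:user:${userId}`,
        message: JSON.stringify(notification)
    }));
}

// Example: a project moving from 'active' to 'completed' (illustrative data)
const fanout = buildStateChangeFanout(
    {
        id: 'evt-1',
        entityType: 'project',
        entityId: '507f1f77bcf86cd799439011',
        oldState: 'active',
        newState: 'completed',
        timestamp: '2025-09-01T10:00:00Z',
        metadata: { name: 'Website redesign' }
    },
    ['user123', 'user456']
);
// fanout[0].channel === 'sse:user:user123'
```

Keeping this step free of I/O makes it easy to unit test. The worker then only has to PUBLISH each `{ channel, message }` pair.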

6.7.4 Notification Worker Implementation

IMPORTANT: The NotificationWorker runs as a separate process independent from the API servers. It:

  • Consumes events from the message bus (Kafka/Azure Service Bus)
  • Processes events to determine recipients (via external service calls, not direct repository access)
  • Publishes notifications to Redis Pub/Sub channels
  • Does NOT run inside the API process
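// notification-worker.js - Runs as separate process
// CacheKeys (the shared cache-key constants used below) is not defined in this
// file; it is assumed to be imported from the cache service module.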
const createNotificationWorker = ({ cacheService, pubSubService, messageBusConsumer, logger = console }) => {

class NotificationWorker {
    constructor() {
        this.serverId = `worker-${process.pid}`;
        this.cache = cacheService;
        this.pubSub = pubSubService;
        this.consumer = messageBusConsumer;
        this.logger = logger;
        
        // Metrics
        this.metrics = {
            eventsProcessed: 0,
            notificationsSent: 0,
            errors: 0,
            lastProcessedAt: null
        };
    }
    
    async start() {
        // Subscribe to entity events from message bus
        // These are published by message-producer.js when events occur
        await this.consumer.subscribe({ 
            topics: [
                'entity.state_changed',
                'entity.updated', 
                'system.broadcast'
            ],
            fromBeginning: false 
        });
        
        await this.consumer.run({
            eachMessage: async ({ topic, message }) => {
                let event;
                try {
                    // Kafka delivers message.value as a Buffer
                    event = JSON.parse(message.value.toString());
                } catch (error) {
                    this.logger.error(`Malformed event payload: ${error.message}`, { topic });
                    this.metrics.errors++;
                    return;
                }
                
                // handleEvent catches its own errors and routes them to handleFailedEvent,
                // so one bad event does not stop the consumer
                await this.handleEvent(topic, event);
            }
        });
        
        // Log metrics periodically
        this.metricsInterval = setInterval(
            () => this.logMetrics(),
            30000 // Every 30 seconds
        );
        
        this.logger.info(`Notification worker ${this.serverId} started`);
    }
    
    async handleEvent(topic, event) {
        try {
            this.metrics.lastProcessedAt = new Date();
            
            this.logger.debug(`Processing event: ${topic}`, { eventId: event.id, entityType: event.entityType });
            
            switch (topic) {
                case 'entity.state_changed':
                    await this.handleEntityStateChanged(event);
                    break;
                    
                case 'entity.updated':
                    await this.handleEntityUpdated(event);
                    break;
                    
                case 'system.broadcast':
                    await this.handleSystemBroadcast(event);
                    break;
                    
                default:
                    this.logger.warn(`Unknown event topic: ${topic}`);
                    return;
            }
            
            // Count only events that were handled successfully
            this.metrics.eventsProcessed++;
        } catch (error) {
            this.logger.error('Error processing event:', error);
            this.metrics.errors++;
            
            // Could implement retry logic or dead letter queue here
            await this.handleFailedEvent(event, error);
        }
    }
    
    async handleEntityStateChanged(event) {
        const { entityType, entityId, oldState, newState, timestamp, metadata } = event;
        
        const channelId = `entity:${entityType}:${entityId}`;
        
        // Get all users subscribed to this entity channel
        const subscribedUserIds = await this.cache.getSetMembers(
            CacheKeys.CHANNEL_SUBSCRIBERS,
            channelId
        );
        
        if (!subscribedUserIds || subscribedUserIds.length === 0) {
            this.logger.debug(`No subscribers for entity state change: ${channelId}`);
            return;
        }
        
        this.logger.info(`Found ${subscribedUserIds.length} subscribers for ${channelId}`);
        
        // Prepare notification
        const notification = {
            event: 'entity_state_changed',
            data: {
                id: `notif_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
                entityType,
                entityId,
                entityName: metadata?.name || entityId,
                stateTransition: {
                    from: oldState,
                    to: newState
                },
                timestamp,
                eventId: event.id
            }
        };
        
        // Send to each subscribed user
        await this.broadcastToUsers(subscribedUserIds, notification);
    }
    
    async handleEntityUpdated(event) {
        const { entityType, entityId, changes, timestamp, metadata } = event;
        
        const channelId = `entity:${entityType}:${entityId}`;
        
        // Get all users subscribed to this entity channel
        const subscribedUserIds = await this.cache.getSetMembers(
            CacheKeys.CHANNEL_SUBSCRIBERS,
            channelId
        );
        
        if (!subscribedUserIds || subscribedUserIds.length === 0) {
            this.logger.debug(`No subscribers for entity update: ${channelId}`);
            return;
        }
        
        // Prepare notification
        const notification = {
            event: 'entity_updated',
            data: {
                id: `notif_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
                entityType,
                entityId,
                entityName: metadata?.name || entityId,
                changes,
                timestamp,
                eventId: event.id
            }
        };
        
        // Send to each subscribed user
        await this.broadcastToUsers(subscribedUserIds, notification);
    }
    
    async handleSystemBroadcast(event) {
        const { message, severity, timestamp } = event;
        
        // Get all connected users for broadcast
        const connectedUsers = await this.cache.getSetMembers(
            CacheKeys.GLOBAL_SUBSCRIBERS,
            'global'
        );
        
        if (!connectedUsers || connectedUsers.length === 0) {
            this.logger.debug('No connected users for system broadcast');
            return;
        }
        
        // Prepare broadcast notification
        const notification = {
            event: 'system_broadcast',
            data: {
                id: `broadcast_${Date.now()}`,
                message,
                severity,
                timestamp,
                eventId: event.id
            }
        };
        
        await this.broadcastToUsers(connectedUsers, notification);
    }
    
    async broadcastToUsers(userIds, notification) {
        // Send notifications via pub/sub to user channels
        const publishPromises = userIds.map(userId => 
            this.pubSub.publish(`user:${userId}`, notification)
        );
        
        try {
            await Promise.all(publishPromises);
            this.metrics.notificationsSent += userIds.length;
            this.logger.info(`Sent ${notification.event} notification to ${userIds.length} users`);
        } catch (error) {
            this.logger.error('Error broadcasting to users:', error);
            throw error;
        }
    }
    
    async handleFailedEvent(event, error) {
        // Log the failure
        this.logger.error('Failed to process event', { 
            eventId: event.id, 
            eventType: event.type, 
            error: error.message 
        });
        
        // Could implement dead letter queue here
        // await this.sendToDeadLetterQueue(event, error);
    }
    
    logMetrics() {
        this.logger.info('Notification Worker Metrics', this.metrics);
    }
    
    async shutdown() {
        this.logger.info('Shutting down notification worker...');
        
        // Clear intervals
        if (this.metricsInterval) {
            clearInterval(this.metricsInterval);
        }
        
        // Close consumer
        if (this.consumer) {
            await this.consumer.disconnect();
        }
        
        // Close pub/sub
        if (this.pubSub) {
            await this.pubSub.disconnect();
        }
        
        this.logger.info('Notification worker shutdown complete');
    }
}

return new NotificationWorker();
};

// Start the worker as a SEPARATE PROCESS
// This would typically be in a separate deployment/container
// Example: node notification-worker.js
// Or in Docker: CMD ["node", "notification-worker.js"]
// Or in PM2: pm2 start notification-worker.js --name "notification-worker"
if (require.main === module) {
    // The three dependencies come from the cache, pub/sub and message-bus modules;
    // createCacheService, createPubSubService and createMessageBusConsumer are
    // placeholder names for those factories.
    const redisConfig = {
        host: process.env.REDIS_HOST || 'localhost',
        port: Number(process.env.REDIS_PORT) || 6379
    };
    
    const worker = createNotificationWorker({
        cacheService: createCacheService(redisConfig),
        pubSubService: createPubSubService(redisConfig),
        messageBusConsumer: createMessageBusConsumer({
            provider: process.env.MESSAGE_BUS_PROVIDER || 'azure',
            connectionString: process.env.AZURE_SERVICE_BUS_CONNECTION_STRING
            // or for AWS: region, queueUrls, topicArns
        }),
        logger: console
    });
    
    worker.start().catch((error) => {
        console.error('Notification worker failed to start:', error);
        process.exit(1);
    });
    
    // Graceful shutdown: SIGINT for Ctrl+C, SIGTERM for docker stop / Kubernetes
    const shutdown = async (signal) => {
        console.log(`Received ${signal}, shutting down gracefully...`);
        await worker.shutdown();
        process.exit(0);
    };
    process.on('SIGINT', () => shutdown('SIGINT'));
    process.on('SIGTERM', () => shutdown('SIGTERM'));
}

module.exports = createNotificationWorker;

7. Technologies Used

The SSE implementation uses core technologies focused on real-time event streaming infrastructure:

7.1 SSE Service Implementation (Without Session Affinity)

The SSE service is implemented using the Module Factory pattern as detailed in section 6.2.3. Key technologies:

  • Node.js - Runtime environment
  • Redis - PubSub coordination and caching (ElastiCache, Azure Cache, Memorystore)
  • Express.js - HTTP server framework
  • ioredis - Redis client library
  • Module Factory Pattern - Dependency injection

7.2 Express Routes Implementation

// routes/sse.js
const express = require('express');
// authenticateMiddleware is assumed to be provided by the application's auth layer
const router = express.Router();

// SSE endpoint - uses injected SSE service
router.get('/connect', authenticateMiddleware, async (req, res) => {
    const sseService = req.app.locals.sseService;
    await sseService.handleSSEConnection(req, res);
});

// Health check for SSE service
router.get('/health', (req, res) => {
    const sseService = req.app.locals.sseService;
    res.json({
        available: sseService.isAvailable(),
        connections: sseService.getConnectionCount(),
        serverId: sseService.serverId
    });
});

module.exports = router;
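`handleSSEConnection` belongs to the SSE service (section 6.2.3). Each event it writes must follow the `text/event-stream` framing defined in the HTML Living Standard. That framing has optional `id:`, `event:` and `retry:` fields, one `data:` line per line of payload, and a blank line that terminates the event. A minimal formatter, using the hypothetical name `formatSSE`:

```javascript
// Hypothetical helper that serialises one event into text/event-stream framing.
// Multi-line payloads must be split: each line needs its own "data:" prefix.
function formatSSE({ id, event, data, retry }) {
    const lines = [];
    if (id !== undefined) lines.push(`id: ${id}`);
    if (event) lines.push(`event: ${event}`);
    if (retry !== undefined) lines.push(`retry: ${retry}`); // client reconnection delay in ms

    const payload = typeof data === 'string' ? data : JSON.stringify(data);
    for (const line of payload.split(/\r\n|\r|\n/)) {
        lines.push(`data: ${line}`);
    }
    return lines.join('\n') + '\n\n'; // the blank line dispatches the event
}
```

The `event:` name must match what the client registers with `addEventListener` or `sseClient.on()`. Frames without an `event:` field arrive at `onmessage`.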

7.3 Nginx Configuration

# nginx.conf - No session affinity required!
upstream nodejs_cluster {
    least_conn;  # or omit for nginx's default round-robin; ip_hash NOT needed
    server node1:3000 max_fails=3 fail_timeout=30s;
    server node2:3000 max_fails=3 fail_timeout=30s;
    server node3:3000 max_fails=3 fail_timeout=30s;
    keepalive 64;
}

server {
    listen 80;
    
    # Regular API endpoints
    location /api {
        proxy_pass http://nodejs_cluster;
        proxy_http_version 1.1;
        proxy_set_header Connection '';   # required for the upstream keepalive pool
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_buffering on;
        
        # Normal timeouts for regular APIs
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;
    }
    
    # SSE endpoint - no session affinity needed!
    location /sse {
        proxy_pass http://nodejs_cluster;
        
        # CRITICAL SSE Settings
        proxy_buffering off;          # Don't buffer SSE streams
        proxy_cache off;              # Don't cache SSE
        proxy_http_version 1.1;
        proxy_set_header Connection '';
        
        # Long timeout for persistent connections
        proxy_read_timeout 3600s;     # 1 hour
        
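        # X-Accel-Buffering is read by nginx from the upstream RESPONSE
        # (the Node app can send "X-Accel-Buffering: no"); as a request
        # header it has no effect, and proxy_buffering off already applies.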
        
        # Headers
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $host;
        
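        # CORS if needed: browsers reject '*' together with credentials,
        # so the allowed origin must be explicit (placeholder value below)
        add_header 'Access-Control-Allow-Origin' 'https://app.example.com' always;
        add_header 'Access-Control-Allow-Credentials' 'true' always;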
        
        # No need for ip_hash or session affinity!
        # Redis Pub/Sub handles the distribution
    }
}
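The nginx settings above only cover the proxy side. Below is a sketch of the matching response setup in Node, assuming a plain Node or Express `res` object. It sends the SSE headers, including `X-Accel-Buffering: no`, which nginx honours as a response header from the upstream. It also writes an SSE comment line periodically so that an idle stream is not closed by `proxy_read_timeout`. The name `openSSEStream` and the 25-second interval are illustrative choices, not values from this design.

```javascript
// Hypothetical helper: prepares an HTTP response for SSE and keeps it alive.
// `res` is a Node http.ServerResponse (Express responses are compatible).
function openSSEStream(res, { heartbeatMs = 25000 } = {}) {
    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no' // tells nginx not to buffer this response
    });
    res.write(': connected\n\n'); // comment line: flushes headers, ignored by EventSource

    // Lines starting with ":" are SSE comments; they keep proxies from timing out
    const timer = setInterval(() => res.write(': heartbeat\n\n'), heartbeatMs);
    timer.unref?.(); // don't keep the process alive just for heartbeats

    const stop = () => clearInterval(timer);
    res.on('close', stop); // client went away
    return stop;
}
```

Pick a heartbeat interval well below `proxy_read_timeout` and any load balancer idle timeout.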

7.4 Client Implementation

// sse-client.js - Generic SSE client
class SSEClient {
    constructor(config = {}) {
        this.config = {
            url: '/sse/connect',
            reconnectDelay: 1000,
            maxReconnectDelay: 30000,
            maxReconnectAttempts: null,
            onConnect: () => {},
            onMessage: () => {},
            onError: () => {},
            onDisconnect: () => {},
            ...config
        };
        
        this.eventSource = null;
        this.connected = false;
        this.reconnectAttempts = 0;
        this.reconnectTimer = null;
        this.eventHandlers = new Map();
    }
    
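    async connect() {
        try {
            console.log('Connecting to SSE...');
            
            // Close any previous stream so a reconnect never leaves two open
            if (this.eventSource) {
                this.eventSource.close();
            }
            
            const url = new URL(this.config.url, window.location.origin);
            this.eventSource = new EventSource(url, {
                withCredentials: true
            });
            
            this.setupEventHandlers();
            
        } catch (error) {
            console.error('Failed to connect:', error);
            this.scheduleReconnect();
        }
    }
    
    setupEventHandlers() {
        this.eventSource.onopen = () => {
            console.log('SSE Connected');
            this.connected = true;
            this.reconnectAttempts = 0;
            this.config.onConnect();
        };
        
        this.eventSource.onerror = (error) => {
            console.error('SSE Error:', error);
            this.connected = false;
            this.config.onError(error);
            
            // EventSource error events carry no HTTP status, so a 401/403 cannot be
            // detected here. On network failures the browser reconnects by itself
            // (readyState stays CONNECTING). On fatal responses (non-200 status or a
            // wrong Content-Type) it gives up with readyState CLOSED, and only then
            // is our own reconnect scheduled. Authentication failures also land here,
            // so set maxReconnectAttempts if retrying indefinitely is undesirable.
            if (this.eventSource.readyState === EventSource.CLOSED) {
                this.scheduleReconnect();
            }
        };
        
        this.eventSource.onmessage = (event) => {
            try {
                const data = JSON.parse(event.data);
                console.log('SSE message received:', data);
                this.config.onMessage(data);
            } catch (error) {
                console.error('Error parsing SSE message:', error);
            }
        };
        
        this.eventSource.addEventListener('connected', (event) => {
            const data = JSON.parse(event.data);
            console.log('Connected to SSE server:', data.serverId);
        });
        
        // Attach handlers registered with on(): covers handlers added before
        // connect() and re-attaches them to the new EventSource after a reconnect
        for (const [eventType, listener] of this.eventHandlers) {
            this.eventSource.addEventListener(eventType, listener);
        }
    }
    
    on(eventType, handler) {
        // One handler per event type: replace any previous one
        this.off(eventType);
        
        const listener = (event) => {
            try {
                const data = JSON.parse(event.data);
                handler(data, event);
            } catch (error) {
                console.error(`Error handling ${eventType} event:`, error);
            }
        };
        
        this.eventHandlers.set(eventType, listener);
        
        if (this.eventSource) {
            this.eventSource.addEventListener(eventType, listener);
        }
    }
    
    off(eventType) {
        const listener = this.eventHandlers.get(eventType);
        if (listener && this.eventSource) {
            this.eventSource.removeEventListener(eventType, listener);
        }
        this.eventHandlers.delete(eventType);
    }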
    
    scheduleReconnect() {
        if (this.reconnectTimer) {
            clearTimeout(this.reconnectTimer);
        }
        
        if (this.config.maxReconnectAttempts && 
            this.reconnectAttempts >= this.config.maxReconnectAttempts) {
            console.log('Max reconnection attempts reached');
            this.config.onError(new Error('Max reconnection attempts reached'));
            return;
        }
        
        const delay = Math.min(
            this.config.reconnectDelay * Math.pow(2, this.reconnectAttempts),
            this.config.maxReconnectDelay
        );
        
        console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts + 1})`);
        
        this.reconnectTimer = setTimeout(() => {
            this.reconnectAttempts++;
            this.connect();
        }, delay);
    }
    
    disconnect() {
        if (this.reconnectTimer) {
            clearTimeout(this.reconnectTimer);
            this.reconnectTimer = null;
        }
        
        if (this.eventSource) {
            this.eventSource.close();
            this.eventSource = null;
        }
        
        this.connected = false;
        this.config.onDisconnect();
    }
    
    getState() {
        return {
            connected: this.connected,
            readyState: this.eventSource?.readyState,
            reconnectAttempts: this.reconnectAttempts
        };
    }
}

// Usage example:
const sseClient = new SSEClient({
    url: '/sse/connect',
    onConnect: () => console.log('SSE connected'),
    onError: (error) => console.error('SSE error:', error),
    onMessage: (data) => console.log('SSE message:', data)
});

// Register custom event handlers
sseClient.on('entity_updated', (data) => {
    console.log('Entity updated:', data);
    // Handle entity update in UI
});

sseClient.on('system_broadcast', (data) => {
    console.log('System broadcast:', data);
    // Show system announcement
});

// Connect to SSE
sseClient.connect();
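With the defaults above (`reconnectDelay: 1000`, `maxReconnectDelay: 30000`), `scheduleReconnect` doubles the wait after every failed attempt until it reaches the cap. The snippet below reproduces that formula to show the resulting schedule. `backoffSchedule` is for illustration only. Every client computes the same delays, so a server restart can make many clients reconnect in lockstep. The `withFullJitter` variant is not part of the client above; it shows one common way to spread those retries out.

```javascript
// Same formula as SSEClient.scheduleReconnect():
// delay = min(reconnectDelay * 2^attempt, maxReconnectDelay)
function backoffSchedule(attempts, reconnectDelay = 1000, maxReconnectDelay = 30000) {
    const delays = [];
    for (let attempt = 0; attempt < attempts; attempt++) {
        delays.push(Math.min(reconnectDelay * Math.pow(2, attempt), maxReconnectDelay));
    }
    return delays;
}

// Optional variant (NOT in the original client): "full jitter" picks a random
// delay in [0, computed delay) so clients don't all retry at the same instant.
function withFullJitter(delay, random = Math.random) {
    return Math.floor(random() * delay);
}

const schedule = backoffSchedule(7); // [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```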

8. Non-Functional Requirements

8.1 Subscription Management at Scale

SSE infrastructure must handle:

  1. Redis Memory Usage:

    • Per channel: ~50 bytes + (user_id_size × subscriber_count)
    • 10,000 channels × 100 avg subscribers = ~20-50MB
    • Connection metadata: ~5KB per connection × 10,000 = ~50MB
  2. Event Processing:

    • Subscriber lookup: O(n) SMEMBERS read of the channel's subscriber set (a single SISMEMBER membership check is O(1))
    • Notification fanout: O(n) where n = subscribers
    • Target: <100ms processing per event
  3. SSE Connection Overhead:

    • Per connection: ~5-20KB memory
    • 10,000 concurrent users = ~50-200MB RAM per server
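The figures above are simple multiplications. The sketch below writes that arithmetic down so the estimates can be re-run with other inputs. It assumes a 24-byte user ID (a MongoDB ObjectId as a hex string) and decimal megabytes. It ignores Redis's own per-entry overhead, which is presumably why the documented 20-50MB range extends above the raw result.

```javascript
// Back-of-envelope sizing using the per-item figures from section 8.1.
// All inputs are assumptions; Redis per-key/per-member overhead is not modelled.
function estimateFootprint({
    channels,
    avgSubscribers,
    userIdBytes = 24,                     // assumed: ObjectId as 24-char hex string
    channelOverheadBytes = 50,            // "~50 bytes" per channel
    connections,
    metadataBytesPerConnection = 5000,    // "~5KB" connection metadata in Redis
    sseBytesPerConnection = [5000, 20000] // "~5-20KB" per open SSE connection
}) {
    const MB = 1000 * 1000; // decimal megabytes, matching the rounded figures above
    const subscriptionBytes = channels * (channelOverheadBytes + userIdBytes * avgSubscribers);
    return {
        redisSubscriptionsMB: subscriptionBytes / MB,
        redisConnectionMetadataMB: (connections * metadataBytesPerConnection) / MB,
        sseServerMB: sseBytesPerConnection.map(b => (connections * b) / MB)
    };
}

const estimate = estimateFootprint({ channels: 10000, avgSubscribers: 100, connections: 10000 });
// estimate -> { redisSubscriptionsMB: 24.5, redisConnectionMetadataMB: 50, sseServerMB: [50, 200] }
```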

8.2 Optimization Strategies

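// Batch notification processing for efficiency
class BatchNotificationProcessor {
    constructor(pubSubService, { batchSize = 100, batchInterval = 100 } = {}) {
        this.pubSub = pubSubService;
        this.queue = [];
        this.batchSize = batchSize;
        this.batchInterval = batchInterval; // ms
        
        // Keep the handle so stop() can clear the timer
        this.timer = setInterval(() => this.flush(), this.batchInterval);
    }
    
    addNotification(userId, notification) {
        this.queue.push({ userId, notification });
        
        if (this.queue.length >= this.batchSize) {
            this.flush();
        }
    }
    
    // processBatch() is async and is called without await from the timer and from
    // addNotification(), so its errors are caught here to avoid unhandled rejections
    flush() {
        this.processBatch().catch(error => console.error('Batch publish failed:', error));
    }
    
    async processBatch() {
        if (this.queue.length === 0) return;
        
        const batch = this.queue.splice(0, this.batchSize);
        
        // Group by user for efficient publishing
        const byUser = batch.reduce((acc, item) => {
            if (!acc[item.userId]) acc[item.userId] = [];
            acc[item.userId].push(item.notification);
            return acc;
        }, {});
        
        // One publish per user channel instead of one per notification
        const publishPromises = Object.entries(byUser).map(([userId, notifications]) =>
            this.pubSub.publish(`user:${userId}`, {
                event: 'batch_notifications',
                data: { notifications }
            })
        );
        
        await Promise.all(publishPromises);
    }
    
    // Stop the timer and drain anything still queued (e.g. on shutdown)
    async stop() {
        clearInterval(this.timer);
        while (this.queue.length > 0) {
            await this.processBatch();
        }
    }
}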

9. Known Issues

9.1 Alternative Solutions

9.1.1 Option 2: Mercure Hub (Recommended for Production) - Not implemented

Why Consider Mercure:

  • Purpose-built for SSE
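  • Handles connection fan-out, authorization and client reconnection (the protocol supports resuming missed updates via Last-Event-ID)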
  • 100K+ connections per instance
  • JWT-based authorization

Architecture with Mercure:

graph LR
    subgraph "Mercure Architecture"
        C[Clients] --> M[Mercure Hub]
        API1[NodeJS] --> P[Publish]
        API2[NodeJS] --> P
        API3[NodeJS] --> P
        P --> M
    end

Implementation:

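// Publishing from NodeJS (uses the global fetch available in Node 18+).
// generateJWT is assumed to sign a publisher JWT with the hub's secret.
const publishToMercure = async (userId, data) => {
    const jwt = generateJWT({mercure: {publish: ['*']}});

    const response = await fetch('http://mercure/.well-known/mercure', {
        method: 'POST',
        headers: {
            'Authorization': `Bearer ${jwt}`,
            'Content-Type': 'application/x-www-form-urlencoded',
        },
        body: new URLSearchParams({
            'topic': `/users/${userId}`,
            'data': JSON.stringify(data),
            'private': 'on'   // only subscribers authorised for this topic receive it
        })
    });

    // On success the hub responds 200 with the ID of the published update
    if (!response.ok) {
        throw new Error(`Mercure publish failed with HTTP ${response.status}`);
    }
    return response.text();
};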
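`generateJWT` is not defined in the snippet above. A Mercure hub verifies publisher JWTs against a configured key, using HS256 unless configured otherwise. A dependency-free sketch using Node's `crypto` module could therefore look like this. The `MERCURE_PUBLISHER_JWT_KEY` variable name and the 60-second lifetime are placeholders. In production, a maintained library such as `jsonwebtoken` is the safer choice.

```javascript
const crypto = require('node:crypto');

// base64url without padding, as required by JWS (RFC 7515)
const base64url = (input) => Buffer.from(input).toString('base64url');

// Minimal HS256 JWT signer (hypothetical helper for the Mercure example).
function generateJWT(claims, secret = process.env.MERCURE_PUBLISHER_JWT_KEY, ttlSeconds = 60) {
    if (!secret) throw new Error('Missing Mercure publisher JWT key');
    const header = { alg: 'HS256', typ: 'JWT' };
    const now = Math.floor(Date.now() / 1000);
    const payload = { ...claims, iat: now, exp: now + ttlSeconds }; // short-lived token

    const signingInput = `${base64url(JSON.stringify(header))}.${base64url(JSON.stringify(payload))}`;
    const signature = crypto.createHmac('sha256', secret).update(signingInput).digest('base64url');
    return `${signingInput}.${signature}`;
}
```

A short expiry limits the damage if a token leaks. The `publish: ['*']` claim in the example grants publishing to every topic and could be narrowed to `/users/{id}` style topics.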

9.1.2 Option 3: Centrifugo - Not implemented

Features:

  • WebSocket + SSE support
  • Built-in presence
  • Message history
  • Channel-based messaging

9.1.3 Option 4: Custom Go Service - Not implemented

For extreme scale (500K+ connections):

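import (
    "fmt"
    "net/http"
)

// Goroutines start with a ~2KB stack, so one per connection stays cheap.
// subscribe gives each connection its own channel (a channel shared by all
// connections would deliver each message to only one of them).
func handleSSE(subscribe func() (<-chan string, func())) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        flusher, ok := w.(http.Flusher)
        if !ok {
            http.Error(w, "streaming unsupported", http.StatusInternalServerError)
            return
        }
        w.Header().Set("Content-Type", "text/event-stream")
        w.Header().Set("Cache-Control", "no-cache")
        flusher.Flush()
        messages, unsubscribe := subscribe()
        defer unsubscribe()
        for {
            select {
            case msg, open := <-messages:
                if !open {
                    return
                }
                fmt.Fprintf(w, "data: %s\n\n", msg)
                flusher.Flush()
            case <-r.Context().Done():
                return // client disconnected
            }
        }
    }
}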

9.2 Cost and Performance Analysis - Not implemented

9.2.1 Performance Metrics by Solution

| Solution | Max Connections | Memory/Connection | CPU Usage | Latency |
|---|---|---|---|---|
| NodeJS + Redis | 10-50K | 50KB | Medium | 2-5ms |
| Mercure | 100K+ | 5KB | Low | 2-3ms |
| Centrifugo | 100K+ | 10KB | Low | 2-3ms |
| Custom Go | 500K+ | 2KB | Very Low | 1-2ms |
| Nginx nchan | 1M+ | 1KB | Minimal | <1ms |

9.2.2 Total Cost of Ownership (3 Years)

| Solution | Development | Infrastructure | Maintenance | Total TCO |
|---|---|---|---|---|
| NodeJS + Redis | $12,000 | $0 | $7,200 | $19,200 |
| Mercure | $4,000 | $3,600 | $1,800 | $9,400 |
| Centrifugo | $5,000 | $7,200 | $2,880 | $15,080 |
| Custom Go | $20,000 | $10,800 | $5,400 | $36,200 |

10. Version History

10.1 Monitoring and Alerting

10.1.1 Key Metrics

// Prometheus metrics for monitoring
const promClient = require('prom-client');

const metrics = {
    // Worker metrics
    eventsProcessed: new promClient.Counter({
        name: 'notification_events_processed_total',
        help: 'Total events processed',
        labelNames: ['event_type', 'worker_id']
    }),
    
    notificationsSent: new promClient.Counter({
        name: 'notifications_sent_total',
        help: 'Total notifications sent',
        labelNames: ['notification_type', 'delivery_method']
    }),
    
    notificationLatency: new promClient.Histogram({
        name: 'notification_latency_seconds',
        help: 'Time from event to notification delivery',
        buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5]
    }),
    
    subscriptionCount: new promClient.Gauge({
        name: 'active_subscriptions_total',
        help: 'Total active project subscriptions',
        labelNames: ['event_type']
    }),
    
    // SSE metrics
    activeConnections: new promClient.Gauge({
        name: 'sse_active_connections',
        help: 'Current SSE connections',
        labelNames: ['server_id']
    }),
    
    sseMessagesSent: new promClient.Counter({
        name: 'sse_messages_sent_total',
        help: 'Total SSE messages sent',
        labelNames: ['server_id', 'message_type']
    }),
    
    // Redis Pub/Sub Metrics
    redisPubSubMessages: new promClient.Counter({
        name: 'redis_pubsub_messages_total',
        help: 'Total Redis pub/sub messages',
        labelNames: ['direction', 'channel_pattern']
    })
};
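`notificationLatency` is meant to measure the time from event to delivery. That only holds if the value is computed from the event's own `timestamp`, not from the moment the worker or API server picked the message up. Below is a small hypothetical helper for this. It assumes events carry an ISO-8601 `timestamp`, as the worker code passes through. It accepts any object with an `observe(value)` method, such as the prom-client `Histogram` above.

```javascript
// Hypothetical helper: records end-to-end latency in seconds (the histogram's unit).
// Call it right after the SSE frame has been written to the client.
function recordDeliveryLatency(histogram, event, now = Date.now()) {
    const createdAt = Date.parse(event.timestamp);
    if (Number.isNaN(createdAt)) return null; // no usable timestamp: skip rather than record garbage

    // Clock skew between hosts can make this negative; clamp to zero
    const seconds = Math.max(0, (now - createdAt) / 1000);
    histogram.observe(seconds);
    return seconds;
}
```

With prom-client this becomes `recordDeliveryLatency(metrics.notificationLatency, event)`. Hosts should be NTP-synchronised, otherwise skew dominates the small buckets.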

10.1.2 Alert Rules

groups:
  - name: notification_system
    rules:
      - alert: HighNotificationLatency
        expr: histogram_quantile(0.95, notification_delivery_latency_ms) > 1000
        for: 5m
        annotations:
          summary: "High notification latency detected"
          
      - alert: WorkerBacklog
        expr: azure_servicebus_queue_length{queue="project-events"} > 1000
        for: 10m
        annotations:
          summary: "Event processing backlog growing"
          
      - alert: LowSSEConnections
        expr: sum(sse_active_connections) < 10
        for: 5m
        annotations:
          summary: "Abnormally low SSE connections"

10.2 Recommendations

10.2.1 For Your Use Case (No Session Affinity, 10K+ Connections)

10.2.1.1 Primary Recommendation: Mercure Hub

Pros:

  • Designed for SSE
  • No session affinity needed
  • Minimal code changes
  • Battle-tested at scale
  • JWT integration

Cons:

  • Additional service to deploy
  • Slight latency increase (1-2ms)
10.2.1.2 Alternative: NodeJS + Redis (If constraints exist)

When to Choose:

  • Cannot deploy additional services
  • Team prefers JavaScript-only
  • Under 50K concurrent connections

Limitations:

  • Higher memory usage
  • Complex reconnection logic
  • More code to maintain

10.2.2 Implementation Phases

Phase 1: Proof of Concept (Week 1-2)

  • Implement NodeJS + Redis locally
  • Test with 100 connections
  • Deploy notification worker
  • Setup Redis subscription cache
  • Implement basic SSE service

Phase 2: Scale Testing (Week 3)

  • Load test with 10K connections
  • Monitor memory and CPU
  • Identify bottlenecks
  • Handle solution_added events
  • Handle state_changed events
  • Implement notification queuing

Phase 3: Production Decision (Week 4)

  • If NodeJS handles load → Deploy
  • If not → Switch to Mercure
  • Build notification client library
  • Add subscription management UI
  • Implement notification display

Phase 4: Production Deployment (Week 5)

  • Deploy with monitoring
  • Implement gradual rollout
  • Set up alerts
  • Add batching for high-volume events
  • Implement subscription sync optimisation
  • Add comprehensive monitoring

10.3 Decision Matrix

10.3.1 Quick Decision Guide

| Factor | Your Requirement | NodeJS + Redis | Mercure | Centrifugo | Go | nchan |
|---|---|---|---|---|---|---|
| No Session Affinity | ✅ Required | ✅ Works | ✅ Works | ✅ Works | ✅ Works | ✅ Works |
| 10K+ Connections | ✅ Required | ⚠️ Possible | ✅ Easy | ✅ Easy | ✅ Easy | ✅ Easy |
| Existing Stack | NodeJS/Redis | ✅ Native | ➕ Add Service | ➕ Add Service | ❌ New Lang | ➕ Nginx Module |
| Complexity | Low preferred | ⚠️ Medium | ✅ Low | ✅ Low | ❌ High | ⚠️ Medium |
| Cost | Minimize | ✅ Lowest | ✅ Low | ⚠️ Medium | ❌ High | ✅ Low |
| Time to Market | Fast | ⚠️ 2-3 weeks | ✅ 1 week | ✅ 1 week | ❌ 4-6 weeks | ⚠️ 2 weeks |

10.3.2 Final Recommendation

Start with NodeJS + Redis for immediate implementation, but plan for Mercure if you expect growth beyond 50K concurrent connections.

The NodeJS + Redis solution works without session affinity by using the Redis Pub/Sub pattern shown in the swim lane diagram. Each server maintains only its local connections, and Redis Pub/Sub delivers every message to all servers subscribed to the target channel, so whichever server holds the user's connection receives it.
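To make the routing concrete, here is a minimal in-process sketch of the pattern. Node's `EventEmitter` stands in for Redis Pub/Sub, and the per-user channel name `user:<id>:notifications` is a hypothetical naming scheme; a real deployment would SUBSCRIBE and PUBLISH through a Redis client. Each server subscribes only to the channels of users connected to it, so the publisher never needs to know where a user is connected.

```javascript
const { EventEmitter } = require('events');

// Stand-in for Redis Pub/Sub (illustration only): event name = channel name
const broker = new EventEmitter();

// Hypothetical per-user channel naming scheme
const userChannel = (userId) => `user:${userId}:notifications`;

class SSEServer {
    constructor(serverId) {
        this.serverId = serverId;
        // userId -> { listener, delivered }; `delivered` stands in for the SSE response stream
        this.local = new Map();
    }

    // A client opened its SSE connection on this server
    connect(userId) {
        const delivered = [];
        const listener = (message) => delivered.push(message); // real code: res.write(...)
        broker.on(userChannel(userId), listener);               // real code: SUBSCRIBE
        this.local.set(userId, { listener, delivered });
    }

    disconnect(userId) {
        const entry = this.local.get(userId);
        if (!entry) return;
        broker.off(userChannel(userId), entry.listener);        // real code: UNSUBSCRIBE
        this.local.delete(userId);
    }

    received(userId) {
        return this.local.get(userId)?.delivered ?? [];
    }
}

// Any process (e.g. the notification worker) can publish without knowing
// which server holds the connection; returns false if nobody is subscribed
function publishToUser(userId, message) {
    return broker.emit(userChannel(userId), message);           // real code: PUBLISH
}

const a = new SSEServer('sse-a');
const b = new SSEServer('sse-b');
a.connect('alice');
b.connect('bob');

publishToUser('bob', { type: 'solution_added', projectId: 'p1' });
console.log(a.received('bob').length, b.received('bob').length); // 0 1
```

Redis `PUBLISH` likewise returns the number of clients that received the message, so a result of 0 could be used to divert the notification to the pending queue for offline users.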


10.4 Decision

Based on the architectural requirements:

10.4.1 Recommended Implementation

Architecture: Worker-based processing with Redis Pub/Sub for non-sticky session SSE delivery

Components:

  1. Message Bus Facade - Abstraction over Azure Service Bus/AWS SQS/Redis
  2. Notification Worker - Processes events and determines recipients
  3. Redis Pub/Sub - Distributes notifications across SSE servers
  4. NodeJS SSE Services - Multiple instances without session affinity
  5. MongoDB - Source of truth for subscriptions

10.4.2 Why This Architecture Works

  1. No Session Affinity Required: Redis Pub/Sub ensures notifications reach users regardless of which server they're connected to
  2. Scalable: Can add more SSE servers or workers independently
  3. Flexible: Message bus facade allows switching between Azure/AWS/Redis
  4. Reliable: Worker pattern with message acknowledgment ensures delivery
  5. Maintainable: Clear separation of concerns

10.5 Consequences

10.5.1 Positive

  • Users receive real-time updates for projects they care about
  • Scalable to thousands of projects with selective subscriptions
  • Scales horizontally without sticky session complexity
  • Message bus abstraction provides vendor flexibility
  • Worker pattern ensures reliable event processing
  • Redis Pub/Sub provides efficient cross-server coordination
  • Decoupled from main application flow via worker pattern
  • Reliable delivery with queueing for offline users

10.5.2 Negative

  • Additional infrastructure complexity (workers, Redis cache)
  • Additional Redis dependency for pub/sub
  • Subscription management overhead
  • Need to maintain subscription cache consistency
  • Network overhead of Redis pub/sub
  • Need to monitor multiple components
  • Potential notification storms during mass events

10.5.3 Mitigations

  • Redis Sentinel/Cluster for high availability
  • Periodic subscription sync from MongoDB
  • Connection pooling and batching for efficiency
  • Comprehensive monitoring and alerting
  • Rate limiting per user/project
  • Notification batching for rapid events
  • Circuit breakers for downstream failures
  • Graceful degradation to email/digest notifications

11. References

11.1 Security Considerations

11.1.1 Authentication & Authorization Architecture

11.1.1.1 JWT Token Strategy

The system uses a dual-token authentication strategy with secure HTTP-only cookies:

  1. Access Token

    • Short-lived JWT (expires quickly - minutes)
    • HTTP-only secure cookie (not accessible via JavaScript)
    • Used for all API requests including SSE connection establishment
    • Contains user claims and permissions
    • Must be refreshed frequently using refresh token
  2. Refresh Token

    • Long-lived JWT (extended expiry)
    • HTTP-only secure cookie
    • JTI (JWT ID) links to Redis-cached device fingerprint
    • Used only for obtaining new access tokens
    • Validated against device fingerprint for security
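The refresh-token check can be sketched as follows. This is an assumption-laden illustration, not the platform's implementation: a `Map` stands in for the Redis cache, and `fingerprint` is whatever device signal the application derives (not specified in this ADR). Only a SHA-256 hash of the fingerprint is cached, keyed by the token's JTI, and the comparison uses `crypto.timingSafeEqual`.

```javascript
const crypto = require('crypto');

// Stand-in for the Redis cache (illustration only): jti -> { hash, expiresAt }
const refreshStore = new Map();

const sha256 = (value) => crypto.createHash('sha256').update(value).digest();

// On issuing a refresh token: cache a hash of the device fingerprint under the JTI
function registerRefreshToken(jti, fingerprint, ttlMs, now = Date.now()) {
    refreshStore.set(jti, { hash: sha256(fingerprint), expiresAt: now + ttlMs });
}

// On refresh, after the JWT signature has been verified: the JTI must be known,
// unexpired, and bound to the same device fingerprint
function validateRefreshToken(jti, fingerprint, now = Date.now()) {
    const entry = refreshStore.get(jti);
    if (!entry || entry.expiresAt <= now) return false; // unknown, revoked or expired
    // Both digests are 32 bytes; timingSafeEqual requires equal lengths
    return crypto.timingSafeEqual(entry.hash, sha256(fingerprint));
}

// On logout or suspected theft: drop the JTI so the token can no longer be used
function revokeRefreshToken(jti) {
    refreshStore.delete(jti);
}
```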
11.1.1.2 Client Implementation (React + Axios + TanStack Query)
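import axios from 'axios';

// refreshAccessToken() is application code (not shown): it calls the refresh
// endpoint, which validates the refresh-token cookie and sets a new access-token
// cookie. It should use a separate axios instance so its own request never
// passes through this interceptor.
let refreshInFlight = null; // shared so concurrent 403s trigger a single refresh

// axios interceptor for token refresh
axios.interceptors.response.use(
    response => response,
    async error => {
        const original = error.config;
        // 403 = access token expired (see authenticateMiddleware); retry each request only once
        if (error.response?.status === 403 && original && !original._retry) {
            original._retry = true;
            refreshInFlight = refreshInFlight ||
                refreshAccessToken().finally(() => { refreshInFlight = null; });
            await refreshInFlight;
            // Retry original request with the new cookie
            return axios(original);
        }
        return Promise.reject(error);
    }
);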

11.1.2 SSE-Specific Security Implementation

11.1.2.1 SSE Connection Authentication Flow
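// Client-side SSE connection with cookie-based auth
// (assumes axios is imported and the 403 refresh interceptor is registered)
class ProjectNotificationClient {
    connect() {
        // Cookies (access token) are sent automatically with the request
        this.eventSource = new EventSource('/sse/connect', {
            withCredentials: true  // needed only if the SSE endpoint is cross-origin
        });
        
        this.eventSource.onerror = async () => {
            // The error Event carries no HTTP status. A non-200 response (such as
            // the 403 for an expired token) leaves the source CLOSED and the browser
            // does not retry; network drops keep it CONNECTING and retry automatically.
            if (this.eventSource.readyState !== EventSource.CLOSED) return;
            
            try {
                // EventSource bypasses Axios, so make an Axios call that lets the
                // interceptor refresh the access-token cookie, then reconnect
                await axios.get('/api/auth/ping');
                setTimeout(() => this.connect(), 1000);
            } catch (error) {
                console.error('Token refresh failed; re-authentication required', error);
            }
        };
    }
}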
11.1.2.2 Server-Side SSE Authentication
const express = require('express');
const cookieParser = require('cookie-parser');
const jwt = require('jsonwebtoken');

const app = express();
app.use(cookieParser()); // populates req.cookies

const ACCESS_TOKEN_SECRET = process.env.ACCESS_TOKEN_SECRET;

// Authentication middleware (declared before the route: const is not hoisted)
const authenticateMiddleware = (req, res, next) => {
    const accessToken = req.cookies.accessToken;
    
    if (!accessToken) {
        return res.status(401).json({ error: 'No access token' });
    }
    
    try {
        req.user = jwt.verify(accessToken, ACCESS_TOKEN_SECRET);
        next();
    } catch (error) {
        if (error.name === 'TokenExpiredError') {
            // 403 tells the client "refresh and retry"
            return res.status(403).json({ error: 'Access token expired' });
        }
        return res.status(401).json({ error: 'Invalid token' });
    }
};

// SSE endpoint with JWT validation
app.get('/sse/connect', authenticateMiddleware, (req, res) => {
    // req.user was populated by authenticateMiddleware
    sseService.handleSSEConnection(req, res);
});

11.1.3 Token Expiry Handling for SSE

CRITICAL ISSUE: SSE connections can live for hours, but access tokens expire in minutes. This creates a security challenge.

11.1.3.1 The Token Lifecycle Problem
Timeline:
0 min:   User connects to SSE (access token valid)
5 min:   Access token expires in cookie
10 min:  SSE connection still open (its token expired at 5 min)
15 min:  Security vulnerability - connection outlives authorization
11.1.3.2 Solution Approaches
11.1.3.2.1 Approach 1: Connection Lifetime = Token Lifetime (MOST SECURE)
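class SSEConnectionManager {
    constructor(options = {}) {
        this.connections = new Map();
        // Upper bound on connection age; defaults to the access token TTL
        this.maxConnectionAge = options.maxConnectionAge || options.accessTokenTTL || 300000; // 5 minutes
        this.gracePeriod = options.gracePeriod || 30000; // 30 second grace period for refresh
    }
    
    addConnection(connectionId, userId, req, res) {
        const now = Date.now();
        const tokenExpiresAt = req.user.exp * 1000; // JWT exp is in seconds
        // Close no later than the expiry of the token actually presented,
        // which may have been issued well before this connection opened
        const closeAt = Math.min(tokenExpiresAt, now + this.maxConnectionAge);
        
        const connection = {
            id: connectionId,
            userId,
            request: req,
            response: res,
            createdAt: now,
            lastActivity: now,
            tokenExpiry: closeAt,
            // Store token metadata (not the token itself)
            tokenMetadata: {
                issuedAt: req.user.iat * 1000,
                expiresAt: tokenExpiresAt,
                sessionId: req.user.sessionId
            },
            // Warn the client one grace period before the forced close
            expiryTimer: setTimeout(() => {
                this.closeConnectionForTokenExpiry(connectionId);
            }, Math.max(0, closeAt - now - this.gracePeriod))
        };
        
        this.connections.set(connectionId, connection);
    }
    
    closeConnectionForTokenExpiry(connectionId) {
        const conn = this.connections.get(connectionId);
        if (!conn) return;
        const remaining = Math.max(0, conn.tokenExpiry - Date.now());
        
        // Send token expiry event to trigger client reconnect
        try {
            if (conn.response && !conn.response.writableEnded) {
                conn.response.write(`event: token_expiring\ndata: ${JSON.stringify({ 
                    reason: 'Access token expiring',
                    reconnectIn: remaining,
                    timestamp: Date.now()
                })}\n\n`);
            }
        } catch (e) {
            this.closeConnection(connectionId, 'Token expired - connection dead');
            return;
        }
        
        // Give the client time to reconnect, then force-close at token expiry
        conn.expiryTimer = setTimeout(() => {
            this.closeConnection(connectionId, 'Token expired');
        }, remaining);
    }
    
    // Minimal version for this excerpt
    closeConnection(connectionId, reason) {
        const conn = this.connections.get(connectionId);
        if (!conn) return;
        // Cancel any pending timer so it cannot fire for a closed connection
        clearTimeout(conn.expiryTimer);
        this.connections.delete(connectionId);
        if (conn.response && !conn.response.writableEnded) {
            conn.response.end();
        }
        console.log(`SSE connection ${connectionId} closed: ${reason}`);
    }
}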
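The `token_expiring` write above assembles the `text/event-stream` wire format by hand. A small helper keeps that format consistent across event types; `formatSSE` is a hypothetical name, and the sketch follows the field rules of the HTML Server-Sent Events specification: optional `id`, `event` and `retry` fields, one `data:` line per line of payload (the browser rejoins them with newlines), and a blank line to end the message.

```javascript
// Hypothetical helper (not part of any library): serialise one SSE message
function formatSSE({ event, data, id, retry } = {}) {
    let out = '';
    if (id !== undefined) out += `id: ${id}\n`;
    if (event) out += `event: ${event}\n`;
    if (retry !== undefined) out += `retry: ${retry}\n`;
    // Objects are sent as JSON; strings are sent as-is
    const payload = data === undefined ? ''
        : typeof data === 'string' ? data
        : JSON.stringify(data);
    // A payload containing line breaks must be split across several data: lines
    for (const line of payload.split(/\r\n|\r|\n/)) {
        out += `data: ${line}\n`;
    }
    return out + '\n'; // blank line terminates the message
}
```

With it, the warning becomes `conn.response.write(formatSSE({ event: 'token_expiring', data: { reason, reconnectIn, timestamp } }))`.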
11.1.3.2.2 Approach 2: Session Validation (HYBRID)
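class SSEConnectionManager {
    constructor(options = {}) {
        this.connections = new Map();
        this.sessionCheckInterval = 60000; // Check every minute
        this.cacheService = options.cacheService;
        this.startSessionValidation();
    }
    
    startSessionValidation() {
        this.sessionTimer = setInterval(async () => {
            try {
                await this.validateAllSessions();
            } catch (err) {
                // Keep a cache outage from becoming an unhandled promise rejection
                console.error('Session validation failed:', err);
            }
        }, this.sessionCheckInterval);
    }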
    
    async validateAllSessions() {
        for (const [connId, conn] of this.connections) {
            // Check if user session is still valid in Redis
            const sessionValid = await this.cacheService.get(
                CacheKeys.USER_SESSION,
                conn.tokenMetadata.sessionId
            );
            
            if (!sessionValid) {
                // Session invalidated (logout, token refresh failed, etc.)
                this.closeConnection(connId, 'Session invalidated');
                continue;
            }
            
            // Check if refresh token is still valid
            const refreshTokenValid = await this.cacheService.get(
                CacheKeys.REFRESH_TOKEN,
                conn.userId
            );
            
            if (!refreshTokenValid) {
                // User's refresh token expired or revoked
                this.closeConnection(connId, 'Authentication expired');
            }
        }
    }
}
11.1.3.2.3 Approach 3: Token Refresh Notification (CLIENT-DRIVEN)
// Client-side handling
class ProjectNotificationClient {
    constructor(options = {}) {
        this.url = options.url || '/sse/connect';
        this.accessTokenTTL = options.accessTokenTTL || 5 * 60 * 1000; // 5 minutes
        // Refresh hook; defaults to the refreshAccessToken() helper used by the Axios interceptor
        this.refreshTokens = options.onTokenExpiring || refreshAccessToken;
        this.tokenRefreshTimer = null;
        this.reconnecting = false; // guards against the timer and the server event racing
    }
    
    connect() {
        // Clear any existing timer
        clearTimeout(this.tokenRefreshTimer);
        
        // Fallback: reconnect 30 seconds before the assumed expiry. The HTTP-only
        // cookie's real expiry is not visible to JavaScript, so the server's
        // token_expiring event remains the authoritative signal.
        this.tokenRefreshTimer = setTimeout(() => {
            this.handleTokenExpiry();
        }, this.accessTokenTTL - 30000);
        
        // Create SSE connection
        this.eventSource = new EventSource(this.url, {
            withCredentials: true
        });
        
        this.setupEventHandlers();
    }
    
    async handleTokenExpiry() {
        if (this.reconnecting) return;
        this.reconnecting = true;
        console.log('Access token expiring, refreshing and reconnecting...');
        
        // Close current connection
        clearTimeout(this.tokenRefreshTimer);
        if (this.eventSource) {
            this.eventSource.close();
        }
        
        try {
            // Refresh explicitly: the access token has not expired yet, so an
            // ordinary API call would succeed without triggering the 403 interceptor
            await this.refreshTokens();
            
            // Reconnect with new token
            this.connect();
        } catch (error) {
            console.error('Token refresh failed:', error);
            // Let user know they need to re-authenticate
        } finally {
            this.reconnecting = false;
        }
    }
    
    setupEventHandlers() {
        // Handle server-initiated token expiry
        this.eventSource.addEventListener('token_expiring', (event) => {
            const data = JSON.parse(event.data);
            console.log('Server: token expiring', data);
            
            // Reconnect flow
            this.handleTokenExpiry();
        });
        
        // Handle forced disconnection
        this.eventSource.addEventListener('auth_expired', () => {
            console.log('Server: authentication expired');
            clearTimeout(this.tokenRefreshTimer);
            this.eventSource.close();
            
            // Trigger re-authentication flow
            window.location.href = '/login';
        });
    }
}
11.1.3.3 Recommended Implementation Strategy

Use Approach 1 (Connection Lifetime = Token Lifetime) with Client Reconnect:

  1. Server Side:

    • Force connection close at token expiry time
    • Send warning event 30 seconds before closing
    • Clean disconnect with clear reason
  2. Client Side:

    • Listen for token_expiring event
    • Proactively reconnect before forced disconnect
    • Handle reconnection with exponential backoff
  3. Security Benefits:

    • No connections outlive their authorization
    • Clean audit trail of connection lifecycle
    • Prevents zombie authenticated connections
  4. User Experience:

    • Seamless reconnection (user doesn't notice)
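    • No lost events, provided notifications published during the short reconnect gap are replayed (for example from the pending-notification queue)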
    • Clear error messages if auth fails
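The exponential backoff mentioned above could look like this minimal sketch using "full jitter" (a random delay between zero and the exponential cap). The `baseDelay` and `maxAttempts` defaults mirror the `reconnectDelay` and `maxReconnectAttempts` client options in the configuration example in 11.1.3.5; `maxDelay` and both function names are assumptions.

```javascript
// Delay before reconnect attempt `attempt` (0-based): uniform in [0, cap),
// where cap = min(maxDelay, baseDelay * 2^attempt)
function reconnectDelay(attempt, { baseDelay = 1000, maxDelay = 30000, random = Math.random } = {}) {
    const cap = Math.min(maxDelay, baseDelay * 2 ** attempt);
    return Math.floor(random() * cap);
}

// Returns null once the attempt budget is spent, so the caller can stop
// retrying and surface an authentication error instead
function nextReconnect(attempt, { maxAttempts = 5, ...opts } = {}) {
    return attempt >= maxAttempts ? null : reconnectDelay(attempt, opts);
}
```

Jitter matters here because a server restart or a shared token TTL can make many clients reconnect at the same moment; randomising the delay spreads that load.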
11.1.3.4 Complete Token/SSE Lifecycle Flow
1. Initial Connection (T+0):
   - Client connects with valid access token (5 min TTL)
   - Server validates token, establishes SSE
   - Server schedules the token_expiring warning for T+4:30 and a forced close for T+5:00

2. Normal Operations (T+0 to T+4:30):
   - Events flow normally
   - The access token in the cookie remains valid until T+5:00
   - So the SSE connection is still within its authorization window

3. Pre-Expiry Warning (T+4:30):
   - Server sends "token_expiring" event
   - Client receives warning, starts reconnect

4. Client Reconnect (T+4:35):
   - Client closes old SSE connection
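   - Client calls the refresh endpoint directly (the access token is still valid until T+5:00, so an ordinary API call would not return 403 and trigger the Axios interceptor)
   - A new access-token cookie is issued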
   - Client establishes new SSE connection

5. Server Cleanup (T+5:00):
   - If client didn't reconnect, server force-closes
   - Connection removed from all tracking
   - Clean security boundary maintained

6. Continuous Cycle:
   - Process repeats every 5 minutes
   - User never notices disconnection
   - Security always maintained
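The timings in this flow can be computed per connection. In this minimal sketch (`connectionSchedule` is a hypothetical helper), the close time is the earlier of the token's `exp` claim and the connection-age limit, so a client that connects with a token issued minutes earlier is warned sooner instead of outliving its token.

```javascript
// Times in ms; tokenExp is the JWT `exp` claim in seconds
function connectionSchedule({ connectedAt, tokenExp, maxConnectionAge, gracePeriod }) {
    // Never outlive the token actually presented
    const closeAt = Math.min(tokenExp * 1000, connectedAt + maxConnectionAge);
    // Warn one grace period earlier, but not before the connection exists
    const warnAt = Math.max(connectedAt, closeAt - gracePeriod);
    return { warnAt, closeAt };
}
```

For the flow above (connect at T+0 with a fresh 5-minute token, 30-second grace) this gives a warning at 270000 ms (T+4:30) and a close at 300000 ms (T+5:00).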


11.1.3.5 Configuration Example
// Server configuration
const sseManager = new SSEConnectionManager({
    maxConnections: 5000,
    maxConnectionAge: 5 * 60 * 1000,      // Match access token TTL
    gracePeriod: 30 * 1000,                // 30 second warning
    staleTimeout: 2 * 60 * 1000,           // 2 min inactivity
    maxUserConnections: 5,
    accessTokenTTL: Number(process.env.ACCESS_TOKEN_TTL) || 300000, // env values are strings
    cacheService: cacheService
});

// Client configuration
const notificationClient = new ProjectNotificationClient({
    url: '/sse/connect',
    accessTokenTTL: 5 * 60 * 1000,         // Must match server
    reconnectDelay: 1000,
    maxReconnectAttempts: 5,
    onTokenExpiring: async () => {
        // Custom handling if needed
        await refreshMyTokens();
    }
});
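Because environment variables are always strings, an expression such as `process.env.ACCESS_TOKEN_TTL || 300000` yields a string whenever the variable is set, and `Date.now() + '300000'` concatenates instead of adding. A minimal loader that parses and validates the durations might look like this; `SSE_GRACE_PERIOD` is a hypothetical variable name.

```javascript
// Parse a positive millisecond duration from an env-like object
function readDurationMs(env, name, fallback) {
    const raw = env[name];
    if (raw === undefined || raw === '') return fallback;
    const value = Number(raw);
    if (!Number.isFinite(value) || value <= 0) {
        throw new Error(`${name} must be a positive number of milliseconds, got "${raw}"`);
    }
    return value;
}

function loadSSEConfig(env = process.env) {
    const accessTokenTTL = readDurationMs(env, 'ACCESS_TOKEN_TTL', 5 * 60 * 1000);
    const gracePeriod = readDurationMs(env, 'SSE_GRACE_PERIOD', 30 * 1000);
    // The warning has to fire after the connection opens, so the grace
    // period must be shorter than the token lifetime
    if (gracePeriod >= accessTokenTTL) {
        throw new Error('SSE_GRACE_PERIOD must be shorter than ACCESS_TOKEN_TTL');
    }
    return { accessTokenTTL, maxConnectionAge: accessTokenTTL, gracePeriod };
}
```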

This approach ensures:

  • Security: No connection outlives its authorization
  • Reliability: Planned reconnects, not unexpected drops
  • Performance: One long-lived connection per token lifetime instead of repeated short requests
  • User Experience: Transparent to end user

The same session check can also run per connection inside the SSE service:

// SSE Service with periodic auth validation
class ProjectSSEService {
    async validateConnection(connection) {
        // Optionally validate connection is still authorized
        // Could check Redis for user session validity
        const sessionValid = await this.cache.get(
            CacheKeys.USER_SESSION,
            connection.userId
        );
        
        if (!sessionValid) {
            // Send disconnect event
            this.sendEvent(connection, {
                type: 'auth_expired',
                data: { message: 'Authentication expired, please reconnect' }
            });
            
            // Close connection
            connection.response.end();
            this.handleDisconnect(connection.id, connection.userId);
        }
    }
}

11.1.4 Security Best Practices

  1. No tokens in URLs - Tokens only in secure HTTP-only cookies
  2. Automatic token refresh - Axios interceptor handles 403 responses
  3. Device fingerprinting - Refresh tokens validated against device
  4. Redis session cache - Quick validation without database hits
  5. Channel isolation - User-specific Redis pub/sub channels
  6. Rate limiting - Applied to SSE endpoints like all APIs
  7. HTTPS only - All connections over TLS
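  8. CORS configuration - Cross-origin SSE endpoints allow an explicit origin list with Access-Control-Allow-Credentials: true (credentialed requests cannot use a wildcard origin)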
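Rate limiting for the SSE endpoint can be illustrated with a per-user token bucket. This is an in-process sketch under assumed limits: the capacity and refill rate are placeholders, and with several SSE instances and no session affinity the bucket state would need to live in shared storage such as Redis.

```javascript
class TokenBucketLimiter {
    constructor({ capacity = 5, refillPerSec = 0.1, now = () => Date.now() } = {}) {
        this.capacity = capacity;
        this.refillPerMs = refillPerSec / 1000;
        this.now = now;
        this.buckets = new Map(); // key -> { tokens, updatedAt }
    }

    // Returns true if the request is allowed and consumes one token
    tryConsume(key) {
        const t = this.now();
        const b = this.buckets.get(key) ?? { tokens: this.capacity, updatedAt: t };
        // Refill in proportion to elapsed time, never above capacity
        b.tokens = Math.min(this.capacity, b.tokens + (t - b.updatedAt) * this.refillPerMs);
        b.updatedAt = t;
        this.buckets.set(key, b);
        if (b.tokens < 1) return false;
        b.tokens -= 1;
        return true;
    }
}
```

In an Express route it would sit after `authenticateMiddleware`, responding with `429 Too Many Requests` when `limiter.tryConsume(req.user.id)` returns false.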

11.2 Redis Operations and Abstraction

11.2.1 Redis Commands Used in This Architecture

The implementation uses various Redis operations beyond simple get/set:

11.2.1.1 Storage Operations
  • Basic Operations: SET, GET, DEL, EXPIRE - Connection details and temporary data
  • Hash Operations: HSET, HGET, HDEL, HKEYS, HLEN - Server-user connection mappings
  • Set Operations: SADD, SMEMBERS, SREM - Project subscriber lists
  • Sorted Set Operations: ZADD, ZRANGE, ZREMRANGEBYRANK - Pending notification queues with timestamps
11.2.1.2 Communication Operations
  • Pub/Sub: PUBLISH, SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE - Cross-server message distribution
  • Pipeline: Batch operations for efficient bulk updates during sync
11.2.1.3 Purpose of Each Operation Group
  1. Connection Registry (Hashes) - Track which users are connected to which servers
  2. Subscription Management (Sets & Hashes) - Map projects to users and vice versa
  3. Notification Queue (Sorted Sets) - Store time-ordered pending notifications
  4. Cross-Server Communication (Pub/Sub) - Enable SSE without session affinity
  5. Performance Optimization (Pipeline) - Reduce network round trips for bulk operations
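The notification-queue use of sorted sets can be modelled in memory to show the semantics: the score is the timestamp, `ZRANGE` returns entries oldest first, and `ZREMRANGEBYRANK` caps the queue length. The key name `pending:<userId>` and the 100-entry cap are assumptions; the comments name the Redis command each step corresponds to.

```javascript
class PendingQueue {
    constructor(maxPerUser = 100) {
        this.maxPerUser = maxPerUser;
        this.queues = new Map(); // userId -> [{ score, member }] sorted by score
    }

    add(userId, notification, timestamp) {
        const q = this.queues.get(userId) ?? [];
        // ZADD pending:<userId> <timestamp> <json>
        // (unlike a real sorted set, identical members are not de-duplicated here)
        q.push({ score: timestamp, member: JSON.stringify(notification) });
        q.sort((x, y) => x.score - y.score);
        // ZREMRANGEBYRANK pending:<userId> 0 -(maxPerUser + 1): drop the oldest overflow
        if (q.length > this.maxPerUser) q.splice(0, q.length - this.maxPerUser);
        this.queues.set(userId, q);
    }

    // ZRANGE pending:<userId> 0 -1: oldest first, ready to replay on reconnect
    pending(userId) {
        return (this.queues.get(userId) ?? []).map((e) => JSON.parse(e.member));
    }

    // DEL pending:<userId>
    clear(userId) {
        this.queues.delete(userId);
    }
}
```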

11.2.2 Redis Facade Pattern

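A Redis facade abstraction is recommended for this architecture. Its interface could look like this:

// Interface sketch: each backend (Redis, a test double, ...) overrides these methods
class CacheFacade {
  // Connection Management
  async setConnection(serverId, userId, connectionId) { throw new Error('Not implemented'); }
  async removeConnection(serverId, userId) { throw new Error('Not implemented'); }
  async getServerConnections(serverId) { throw new Error('Not implemented'); }
  
  // Subscription Management
  async addSubscription(projectId, eventType, userId) { throw new Error('Not implemented'); }
  async removeSubscription(projectId, eventType, userId) { throw new Error('Not implemented'); }
  async getSubscribers(projectId, eventType) { throw new Error('Not implemented'); }
  async getUserSubscriptions(userId) { throw new Error('Not implemented'); }
  
  // Notification Queue
  async queueNotification(userId, notification) { throw new Error('Not implemented'); }
  async getPendingNotifications(userId) { throw new Error('Not implemented'); }
  async clearPendingNotifications(userId) { throw new Error('Not implemented'); }
  
  // Batching
  async batchOperations(operations) { throw new Error('Not implemented'); }
}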

Benefits of Facade:

  • Swap Redis for alternatives (Memcached, Hazelcast, DynamoDB)
  • Integrated circuit breaker and failover logic
  • Simplified testing with mock implementations
  • Centralized monitoring and metrics
  • Abstracted connection pooling

Note: Pub/Sub operations should remain separate as they're handled by the existing MessageBusFacade.
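For the "mock implementations" benefit, a test double can implement the subscription methods of the facade with plain `Set`s. In this minimal sketch the method names follow the facade interface, the `project_subs:<projectId>:<eventType>` key mirrors the one used by the cache-miss fallback in 11.2.3.3, and the `user_subs:<userId>` member format is an assumption.

```javascript
class InMemorySubscriptionCache {
    constructor() {
        this.sets = new Map(); // key -> Set (models Redis sets)
    }

    #set(key) {
        if (!this.sets.has(key)) this.sets.set(key, new Set());
        return this.sets.get(key);
    }

    async addSubscription(projectId, eventType, userId) {
        this.#set(`project_subs:${projectId}:${eventType}`).add(userId);   // SADD
        this.#set(`user_subs:${userId}`).add(`${projectId}:${eventType}`); // SADD
    }

    async removeSubscription(projectId, eventType, userId) {
        this.sets.get(`project_subs:${projectId}:${eventType}`)?.delete(userId);   // SREM
        this.sets.get(`user_subs:${userId}`)?.delete(`${projectId}:${eventType}`); // SREM
    }

    async getSubscribers(projectId, eventType) {
        return [...(this.sets.get(`project_subs:${projectId}:${eventType}`) ?? [])]; // SMEMBERS
    }

    async getUserSubscriptions(userId) {
        return [...(this.sets.get(`user_subs:${userId}`) ?? [])]; // SMEMBERS
    }
}
```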

11.2.3 Redis Durability and Failure Recovery

Important: Redis is used as a cache, not a source of truth. MongoDB stores all persistent data.

11.2.3.1 What Happens on Redis Restart

When Redis restarts, all in-memory data is lost:

  1. Active Connections (sse_connections:*) - Automatically recreated when clients reconnect
  2. Subscription Cache (project_subs:*, user_subs:*) - Rebuilt from MongoDB within 60 seconds
  3. Pending Notifications - Lost (acceptable for non-critical notifications)
  4. Pub/Sub Subscriptions - Re-established automatically by SSE services
11.2.3.2 Recovery Mechanism

The Notification Worker already handles Redis recovery through periodic sync:

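// In the worker constructor: sync subscriptions from MongoDB every 60 seconds.
// The .catch() keeps a failed sync (e.g. Redis unavailable) from becoming an unhandled rejection
this.syncInterval = setInterval(() => {
    this.syncSubscriptions().catch(err => console.error('Subscription sync failed:', err));
}, 60000);

// Worker method
async syncSubscriptions() {
    // Clear old cache. KEYS is O(N) and blocks Redis while it runs; on large
    // keyspaces an incremental SCAN is the safer choice. Deleting before the
    // rebuild also leaves a short window in which lookups find no subscribers.
    const projectSubKeys = await this.redis.keys('project_subs:*');
    if (projectSubKeys.length > 0) {
        await this.redis.del(...projectSubKeys);
    }
    
    // Rebuild from MongoDB (source of truth)
    const users = await this.db.collection('users')
        .find({ 'projectSubscriptions.0': { $exists: true } })
        .toArray();
    
    // Populate Redis cache
    // ... (batch operations to rebuild subscription mappings)
}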
11.2.3.3 Enhanced Recovery Strategy

To minimise the recovery window:

  1. Immediate Sync on Redis Connection
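this.redis.on('ready', () => {
    // Resync as soon as Redis (re)connects; log failures instead of leaving a rejected promise
    this.syncSubscriptions().catch(err => console.error('Resync on reconnect failed:', err));
});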
  2. Fallback to MongoDB During Cache Miss
async getSubscribers(projectId, eventType) {
    let subscribers = await this.redis.smembers(`project_subs:${projectId}:${eventType}`);
    
    if (subscribers.length === 0 && !await this.redis.exists('cache_sync_timestamp')) {
        // Cache invalid, query MongoDB directly
        subscribers = await this.queryMongoDBForSubscribers(projectId, eventType);
        setImmediate(() => this.syncSubscriptions().catch(err => console.error('Resync failed:', err)));
    }
    
    return subscribers;
}
  3. Circuit Breaker with Local Fallback
  • When Redis is unavailable, fall back to local in-memory map per API instance
  • Automatically reconnect and resync when Redis returns

Key Point: The system remains operational during Redis failures, with temporary degradation in notification routing that self-heals within 60 seconds.
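The circuit breaker with local fallback can be sketched as a small state machine. The thresholds are assumptions, and `primary` and `fallback` would be the Redis call and the local in-memory map lookup respectively: after repeated failures the breaker stops calling Redis, and after a cool-down it lets a call through to test whether Redis has recovered.

```javascript
class CircuitBreaker {
    constructor({ failureThreshold = 3, resetTimeoutMs = 10000, now = () => Date.now() } = {}) {
        this.failureThreshold = failureThreshold;
        this.resetTimeoutMs = resetTimeoutMs;
        this.now = now;
        this.state = 'CLOSED'; // CLOSED -> OPEN -> HALF_OPEN -> CLOSED
        this.failures = 0;     // consecutive failures
        this.openedAt = 0;
    }

    async call(primary, fallback) {
        if (this.state === 'OPEN') {
            // Cool-down not over: skip Redis entirely
            if (this.now() - this.openedAt < this.resetTimeoutMs) return fallback();
            // Cool-down over: treat this call as a trial (concurrent calls are not serialised here)
            this.state = 'HALF_OPEN';
        }
        try {
            const result = await primary();
            this.state = 'CLOSED';
            this.failures = 0;
            return result;
        } catch {
            this.failures += 1;
            // A failed trial, or too many consecutive failures, (re)opens the circuit
            if (this.state === 'HALF_OPEN' || this.failures >= this.failureThreshold) {
                this.state = 'OPEN';
                this.openedAt = this.now();
            }
            return fallback();
        }
    }
}
```

A successful trial closes the circuit, which is the point at which the `ready` handler's resync brings the Redis cache back in line with MongoDB.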

11.3 Additional Considerations

11.3.1 Error Handling & Recovery

While not exhaustively covered in this document, the implementation includes:

  • Redis circuit breaker - Automatically fails over to local in-memory map per API instance when Redis is unavailable, then flips back to Redis once connectivity is restored
  • MongoDB fallback - Query source of truth directly when cache is invalid
  • Automatic cache rebuild - Subscriptions resynced from MongoDB every 60 seconds
  • Redis connection retry logic with exponential backoff
  • Worker failure recovery through message bus acknowledgment patterns
  • Client-side automatic reconnection with backoff
  • Graceful degradation to queued notifications for offline users

11.3.2 Testing & Monitoring

  • Load testing should target the specific connection limits mentioned (10K+ concurrent)
  • Prometheus metrics provided enable comprehensive monitoring
  • Health check endpoints allow for orchestrator integration
  • Detailed testing strategy is out of scope for this ADR

11.3.3 Deployment & Operations

  • Deployment guides, CI/CD pipelines, and operational runbooks will be documented separately
  • Container orchestration, scaling policies, and infrastructure as code are not yet documented
  • These aspects will be addressed in dedicated deployment and operations documentation

11.3.4 Data Consistency

  • Periodic sync between MongoDB and Redis subscription cache (every 60 seconds)
  • Event-driven cache updates for real-time consistency
  • Message bus ensures at-least-once delivery semantics
  • MongoDB replica set with automatic failover (infrastructure level, out of scope for this document)
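Because the message bus guarantees at-least-once rather than exactly-once delivery, the worker can see the same event more than once. A minimal deduplication sketch, assuming each event carries a unique `eventId` (a hypothetical field), remembers processed IDs for a bounded window.

```javascript
class DedupFilter {
    constructor({ ttlMs = 10 * 60 * 1000, now = () => Date.now() } = {}) {
        this.ttlMs = ttlMs;
        this.now = now;
        this.seen = new Map(); // eventId -> expiresAt, in insertion order
    }

    isDuplicate(eventId) {
        this.evictExpired(this.now());
        return this.seen.has(eventId);
    }

    // Record only after the event was handled, so a crash mid-processing
    // still lets the redelivered copy through
    markProcessed(eventId) {
        this.seen.delete(eventId); // re-insert at the end to keep expiry order
        this.seen.set(eventId, this.now() + this.ttlMs);
    }

    evictExpired(t) {
        // Every entry has the same TTL, so insertion order is expiry order
        // and expired entries are always at the front
        for (const [id, expiresAt] of this.seen) {
            if (expiresAt > t) break;
            this.seen.delete(id);
        }
    }
}
```

The worker would call `isDuplicate` before processing and `markProcessed` after acknowledging. Across multiple workers the seen set would have to be shared, for example with Redis `SET <key> 1 NX PX <ttl>`.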

11.4 Conclusion

SSE without session affinity is completely achievable using Redis Pub/Sub for coordination. The key insight is that the server handling the connection doesn't need to be the server processing the business logic - they communicate through Redis.

For production at scale, consider Mercure or Centrifugo as they handle all the complexity for you, but the NodeJS + Redis solution is production-ready for moderate scales and provides a solid foundation for project-based event notifications.
