- Project Description: Collaborative platform that enables organisations to manage projects, contributions, solutions, and voting mechanisms with AI-driven analysis
- Date Created: September 2025
- Last Updated: September 2025
- Project Information
- Table of Contents
- 1. Introduction
- 2. Architectural Overview
- 3. Design Details
- 4. Data Flow
- 5. Deployment Strategy
- 6. Infrastructure
- 6.1 Data Access Layer - Repository Pattern
- 6.2 Cache Service Integration - Module Factory Pattern
- 6.3 SSE Channel Subscription Examples
- 6.4 PubSub Service Facade - Multi-Backend Support
- 6.4.1 Architectural Rationale: Why Redis Pub/Sub in Addition to Message Queues?
- 6.4.2 PubSub Service Interface
- 6.4.3 Redis Adapter Implementation (Multi-Cloud)
- 6.4.4 Multi-Cloud Redis Configuration
- 6.4.5 Connection Lifecycle Management
- 6.5 SSE Subscription Management
- 6.6 Message Bus Facade Pattern (Existing Implementation)
- 6.7 Event Processing Architecture
- 7. Technologies Used
- 8. Non-Functional Requirements
- 9. Known Issues
- 10. Version History
- 11. References
- 11.1 Security Considerations
- 11.1.1 Authentication & Authorization Architecture
- 11.1.2 SSE-Specific Security Implementation
- 11.1.3 Token Expiry Handling for SSE
- 11.1.4 Security Best Practices
- 11.2 Redis Operations and Abstraction
- 11.3 Additional Considerations
- 11.4 Conclusion
This architecture decision record (ADR) analyses Server-Sent Events (SSE) implementation strategies for project event notifications in a collaborative platform running on a distributed NodeJS cluster without session affinity.
The ADR covers the SSE infrastructure itself: the architecture provides real-time event streaming in which external systems publish events to specific channels and connected users receive notifications. This is purely an SSE implementation - the actual subscription management and alert logic are handled by separate services/projects using proxy patterns around repositories. However, the ADR cannot be defined in a vacuum, so it includes demo code and concepts that show how the infrastructure is intended to be used.
What is session affinity? Session Affinity (also called Sticky Sessions) forces all requests from a client to be routed to the same server. This is typically achieved through load balancer configuration that uses cookies or IP addresses to maintain client-server affinity.
Why Avoid Session Affinity?
- Reduced scalability: Cannot freely distribute load across servers
- Complex failover: If a server fails, all its "stuck" clients lose their sessions
- Inefficient resource usage: Some servers may be overloaded while others are idle
- Deployment challenges: Rolling updates become complex when clients are pinned to servers
- Cloud-native incompatibility: Contradicts stateless, horizontally-scalable architecture principles
The architecture must support event-driven workflows in which users receive various types of events (such as solution additions, project changes, messages, and system state changes). Although originally intended for project state changes, the mechanism lets users specify the alerts they want. External workers publish events to a durable message bus (Kafka, Azure Service Bus, or AWS SQS); a notification worker processes these events and delivers targeted notifications via SSE, using Redis for coordination. This is important because external resources (such as workers) may need to signal to the client that a server-side event has occurred.
Key finding: Retain the durable message bus for event publishing from external workers, and use Redis solely for SSE coordination (pub/sub and subscription caching) across multiple NodeJS servers without session affinity. Dedicated services such as Mercure could provide better scalability if needed, but for this release we will focus on Redis and coordination within the NodeJS API servers.
Without session affinity, a client's SSE connection goes to a random server, but events might originate from any server in the cluster. We need a way to route messages to the correct server. Additionally, we face security challenges with token lifecycle management.
sequenceDiagram
participant Client
participant LoadBalancer
participant Server1
participant Server2
participant Server3
participant TokenSystem
Client->>LoadBalancer: SSE Connection Request
LoadBalancer->>Server1: Randomly Assigns Connection
Note over Server1: Client connected, but events may originate elsewhere
Server2->>LoadBalancer: Event Generated
Server3->>LoadBalancer: Event Generated
Note over LoadBalancer: Challenge: Route events to correct server
LoadBalancer-->>Server1: Needs routing solution
Client->>TokenSystem: Request Token
TokenSystem-->>Client: Issues Token
Note over TokenSystem: Security Challenge: Token Lifecycle Management
sequenceDiagram
participant C1 as Client Browser Tab 1
participant C2 as Client Browser Tab 2
participant M as Mobile Device
participant N as Nginx (Load Balancer)
participant S1 as API Server 1
participant S2 as API Server 2
participant S3 as API Server 3
participant PS as PubSub Service
participant CS as Cache Service
Note over C1: Multiple Connection Support
C1 ->> N: GET /sse/connect (tab 1)
N ->> S2: Forward request (random)
S2 ->> S2: Create connection ID: conn-123
S2 ->> CS: Store connection metadata
S2 ->> PS: Subscribe to user:userId (if first)
S2 -->> C1: event: connected\ndata: {"connectionId": "conn-123"}
C2 ->> N: GET /sse/connect (tab 2)
N ->> S2: Forward request (may hit same server)
S2 ->> S2: Create connection ID: conn-456
S2 ->> CS: Add to existing user connections
Note over S2: User now has 2 connections on S2
S2 -->> C2: event: connected\ndata: {"connectionId": "conn-456"}
M ->> N: GET /sse/connect (mobile)
N ->> S1: Forward request (different server)
S1 ->> S1: Create connection ID: conn-789
S1 ->> CS: Store connection metadata
S1 ->> PS: Subscribe to user:userId
S1 -->> M: event: connected\ndata: {"connectionId": "conn-789"}
Note over C1: Event Publishing Flow
C1 ->> N: POST /api/send-message
N ->> S3: Forward request (any server)
S3 ->> S3: Process message/event
S3 ->> PS: Publish to user:userId channel
Note over PS, S3: Message Distribution to All Connections
PS -->> S1: Message via subscription
S1 ->> S1: Find connections for userId
S1 -->> M: event: message\ndata: {"text": "Hello"}
PS -->> S2: Message via subscription
S2 ->> S2: Find connections for userId (has 2!)
S2 -->> C1: event: message\ndata: {"text": "Hello"}
S2 -->> C2: event: message\ndata: {"text": "Hello"}
PS -->> S3: Message (no connections)
Note over S3: S3 ignores - no connections
Note over C1: Connection Cleanup
C2 --x N: Tab closed
N --x S2: Connection terminated
S2 ->> S2: Remove conn-456 from Set
S2 ->> CS: Remove connection metadata
Note over S2: Still has conn-123, keeps subscription
C1 --x N: Last tab closed
N --x S2: Connection terminated
S2 ->> S2: Remove conn-123 from Set
S2 ->> CS: Remove connection metadata
S2 ->> PS: Unsubscribe from user:userId
Note over S2: No more connections for this user
// Client connects to any available server (multiple tabs/devices supported)
const eventSource = new EventSource('/sse/connect');
// Nginx randomly selects Server 2
// Server 2 handles the connection:
// - Validates authentication
// - Generates unique connection ID (includes timestamp, device info)
// - Adds connection to user's connection Set in local memory
// - Registers connection in Cache Service with connection ID
// - Subscribes to user's channel via PubSub Service (if first connection)
// - Keeps HTTP connection open
// Connection tracking structure:
// localConnections = Map<userId, Set<{
// connectionId: string,
// response: ServerResponse,
// deviceInfo: string,
// connectedAt: Date
// }>>

With this system we can coordinate internal messages via an API; a nice side effect is that we can also use it to test critical components.
// Any API server can publish messages
// Example: Server 1 receives a POST request
app.post('/api/send-message', async (req, res) => {
// Save to MongoDB
await db.messages.insert({...});
// Publish via PubSub - Server 1 doesn't need the connection!
await pubSubService.publish(`user:${targetUserId}`, JSON.stringify({
event: 'message',
data: messageData
}));
});

// ALL servers receive the PubSub message
// Each server checks if it has ANY connections for this user
// Server 2 (has one or more connections for this user):
// This is handled internally by the PubSub service subscription
pubSubService.subscribe(`user:${userId}`, (message) => {
// userId is already known from the subscription
// CRITICAL: User may have multiple connections (tabs, devices)
// and the connections could be scattered across the cluster
const userConnections = localConnections.get(userId); // Returns a Set of connection objects for this user
if (userConnections && userConnections.size > 0) {
// Send to ALL connections for this user on this server
userConnections.forEach(connection => {
try {
connection.write(`event: message\ndata: ${message}\n\n`);
} catch (err) {
// Connection might be dead, clean it up
userConnections.delete(connection);
if (userConnections.size === 0) {
// No more connections for this user on this server
localConnections.delete(userId);
pubSubService.unsubscribe(`user:${userId}`);
}
}
});
}
// Servers 1 & 3: No local connections for this user, ignore message
});
// Connection Storage Structure:
// localConnections = Map<userId, Set<connectionObject>>
// Each user can have multiple connection objects (different tabs/devices)

- Connection State: Each server only maintains its own local connections
- Redis Pub/Sub: Acts as a message broker between all servers
- Channel Subscription: Servers only subscribe to channels for their connected users
- Stateless Publishing: Any server can publish without knowing where connections live
- Handle 10,000+ concurrent SSE connections
- No session affinity / sticky sessions (by design choice)
- Work with existing MongoDB and Message Bus (Kafka/Azure Service Bus)
- Use abstracted Cache Service (Redis implementation with circuit breaker)
- NO direct database access - All data operations MUST go through Repository pattern
- Resilience: System must continue functioning if cache service fails (without real-time features)
- Circuit Breaker: Automatic fallback when cache service is unavailable
- Seamless failover if a server dies
- Prevent message duplication
- Support both local and production environments
- External systems can publish events to SSE channels
- System handles thousands of concurrent SSE connections
- Worker-based event publishing from message bus
- Cache-based subscription coordination with graceful degradation
- Delivers any event types as requested by external systems
When the cache service (Redis) fails:
- Circuit breaker activates - Prevents cascading failures
- SSE endpoints return 503 - Service temporarily unavailable
- System continues operating - Core functionality remains intact
- Real-time features disabled - Clients must poll or wait
- Automatic recovery - When cache returns, SSE re-enables
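The circuit breaker itself is not specified elsewhere in this document; the following is a minimal sketch of the intended behaviour. The class name, thresholds, and the `exec` wrapper are illustrative assumptions, not the existing cache service implementation.

```javascript
// Minimal circuit-breaker sketch - names and thresholds are assumptions, not the real cache service.
class CacheCircuitBreaker {
  constructor({ failureThreshold = 5, resetTimeoutMs = 30000 } = {}) {
    this.failureThreshold = failureThreshold; // consecutive failures before the circuit opens
    this.resetTimeoutMs = resetTimeoutMs;     // how long to stay open before allowing a retry
    this.failures = 0;
    this.openedAt = null;
  }
  isAvailable() {
    if (this.openedAt === null) return true;
    // Half-open: allow a retry once the reset timeout has elapsed
    return Date.now() - this.openedAt >= this.resetTimeoutMs;
  }
  async exec(operation) {
    if (!this.isAvailable()) throw new Error('Cache circuit open');
    try {
      const result = await operation();
      this.failures = 0;
      this.openedAt = null; // success closes the circuit
      return result;
    } catch (error) {
      this.failures += 1;
      if (this.failures >= this.failureThreshold) this.openedAt = Date.now();
      throw error;
    }
  }
}
```

The SSE endpoint then only needs to consult `isAvailable()` before accepting a connection, which is exactly the 503 guard shown later in this document.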
- Better load distribution - Connections spread evenly
- Improved fault tolerance - Client reconnects to any available server
- Easier scaling - Add/remove servers without session migration
- Simplified deployment - No session affinity configuration
Our load balancer does not use session affinity by design. This means:
- Any user's SSE connection can land on any NodeJS server
- A user might have multiple connections across different servers (multiple tabs)
- Notifications must reach users regardless of which server hosts their connection
- Redis Pub/Sub becomes essential for cross-server coordination
graph TB
subgraph "Current Setup"
LB[Nginx Load Balancer<br/>Without Session Affinity]
subgraph "API Cluster"
API1[NodeJS API 1]
API2[NodeJS API 2]
API3[NodeJS API 3]
end
subgraph "Infrastructure"
REDIS[(Redis)]
ASB[Azure Message Bus /<br/>AWS SQS /<br/>Kafka]
end
CLIENT[Clients] --> LB
LB --> API1
LB --> API2
LB --> API3
API1 <--> REDIS
API2 <--> REDIS
API3 <--> REDIS
API1 --> ASB
API2 --> ASB
API3 --> ASB
end
graph LR
subgraph "Current System"
PROJECT[Project Service] --> EMITTER[Global Event Emitter]
EMITTER --> PRODUCER[MessageProducer<br/>message-producer.js]
PRODUCER --> MB[Azure Message Bus /<br/>AWS SQS /<br/>Kafka]
MB --> DOWNSTREAM[Downstream Systems]
end
graph TB
subgraph Production Infra Structure
REDIS((Redis Cache))
MB[Azure Message Bus /<br/>AWS SQS /<br/>Kafka]
DB[(Mongo DataBase)]
end
subgraph API
PROJECT[Project Service] --> REPO
REPO --> DB
REPO --> EMITTER[Global Event Emitter]
EMITTER --> PRODUCER[MessageProducer<br/>message-producer.js]
PRODUCER --> MB
SSE[SSE Service] --> REDIS
SSE --> USERS[Connected Users]
end
subgraph Worker
MB --> WORKER[Event Notification Worker]
WORKER --> REDIS
end
subgraph Down Stream
MB --> DOWNSTREAM[Other Systems]
end
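The Event Notification Worker shown in the diagram above is not specified in detail in this ADR; the sketch below only illustrates the intended bridge from the durable message bus to Redis pub/sub. The `messageBusConsumer` API, event shape, and channel naming are assumptions for illustration.

```javascript
// Illustrative worker sketch - messageBusConsumer, event shape, and channel naming are assumptions.
module.exports = ({ messageBusConsumer, pubSubService, logger = console }) => {
  // Consume durable business events from Kafka / Azure Service Bus / AWS SQS
  messageBusConsumer.on('message', async (event) => {
    try {
      // Fan out to the SSE coordination layer via Redis pub/sub
      await pubSubService.publish(`user:${event.targetUserId}`, {
        event: event.type, // e.g. 'project_state_changed'
        data: event.payload
      });
    } catch (error) {
      logger.error(`Failed to forward event ${event.type}: ${error.message}`);
      // Retry / dead-letter semantics belong to the durable bus, not the SSE layer
    }
  });
};
```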
CRITICAL: The SSE Service should not directly access repositories. Database changes that need to trigger SSE events should use a proxy pattern around repositories to call the SSE Service. This pattern belongs to another ADR and appears here for demonstration purposes only.
// project-repository.js - Module Factory Pattern
module.exports = ({
User, // Mongoose model
Organization, // Mongoose model
Role, // Mongoose model
Project, // Mongoose model
mongoose,
mongoSaveOptions,
cryptology,
globalEventEmitter,
calculateLeastPrivilegePermissionsDictionary,
compactLeastPrivilegePermissionsDictionary,
SSERepositoryProxyInstance,
}) => {
// Repository functions for User operations
const updateProjectStatus = async (projectId, changes) => { /** ... update the status of a project ... **/ }
return {
updateProjectStatus: SSERepositoryProxyInstance.proxy( updateProjectStatus, 'key-to-send-to-event-subscribers' ),
}
}

Repository changes that need to trigger SSE events should use a proxy pattern. Again, this is here ONLY for demonstration purposes and is NOT part of this ADR.
// Repository Proxy for SSE Integration
class SSERepositoryProxy {
  constructor(globalEventEmitter) {
    this.globalEventEmitter = globalEventEmitter;
  }
  proxy(fn, eventKey) {
    // Wrap the repository function so each call also emits the event key to SSE subscribers
    return async (...args) => {
      this.globalEventEmitter.emit('sse-event', eventKey);
      return fn(...args);
    };
  }
}

CRITICAL: The SSE Service follows the Module Factory pattern for dependency injection, consistent with the rest of the codebase.
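For completeness, the 'sse-event' emitted by the proxy above would need to be forwarded to the pub/sub layer somewhere. The wiring below is a hypothetical sketch only (it assumes `globalEventEmitter`, `pubSubService`, and `logger` are in scope and uses an assumed channel naming scheme); the real wiring belongs to the separate subscription-management ADR.

```javascript
// Hypothetical wiring sketch only - not part of this ADR.
// Forwards repository-level 'sse-event' emissions to the pub/sub layer.
globalEventEmitter.on('sse-event', async (eventKey) => {
  try {
    // Channel naming here is an assumption; real routing is defined elsewhere.
    await pubSubService.publish(`broadcast:${eventKey}`, {
      event: eventKey,
      data: { occurredAt: new Date().toISOString() }
    });
  } catch (error) {
    logger.error(`Failed to forward sse-event '${eventKey}': ${error.message}`);
  }
});
```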
The SSE Service requires extended cache operations beyond basic get/set:
- Connection Management
  - Store/retrieve active connection information per server
  - Hash operations for server→user→connection mappings
  - Connection metadata storage with TTL
- Subscription Caching
  - Cache user's SSE channel subscriptions
  - Set operations for channel→subscribers mappings
  - Set operations for user→channels mappings
  - TTL-based cache invalidation
- Pending Notifications
  - Sorted set operations for time-ordered notification queues
  - Automatic expiry for old notifications
  - Batch retrieval and clearing
- Fallback Handling
  - Graceful degradation when cache is disabled
  - In-memory fallback for critical operations
  - Repository fallback for data retrieval
- Members: Individual items in a set (e.g., user IDs in a project's subscriber list)
- Identifier: The unique key for the entity (e.g., projectId, userId, 'global')
- Set: A collection of unique values (e.g., all users subscribed to project-123)
- Sorted Set: A collection ordered by score (e.g., notifications ordered by timestamp)
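To make this terminology concrete, the sketch below shows how these structures typically map to Redis commands via ioredis; the key names are examples only, not the cache service's actual key scheme.

```javascript
// Example mapping only - key names are illustrative, not the real cache key scheme.
const Redis = require('ioredis');

async function demoCacheStructures() {
  const redis = new Redis();

  // Set: members (user IDs) subscribed to the identifier 'entity:project:123'
  await redis.sadd('channelSubscribers:entity:project:123', 'user-789');
  const members = await redis.smembers('channelSubscribers:entity:project:123');

  // Sorted Set: pending notifications for a user, ordered by timestamp (the score)
  await redis.zadd('pendingNotifications:user-789', Date.now(), JSON.stringify({ event: 'message' }));
  const pending = await redis.zrange('pendingNotifications:user-789', 0, -1);

  await redis.quit();
  return { members, pending };
}
```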
This architecture supports multiple SSE channel scopes:
- User-Specific Channels (`user:${userId}`)
  - Direct messages to a specific user
  - Personal notifications
- Entity Channels (`entity:${entityType}:${entityId}`)
  - Users subscribed to specific entities
  - Entity updates, state changes
- Global Broadcasts (`broadcast:global`)
  - System-wide announcements to all connected users
  - Maintenance notices, feature releases
- Topic-Based Channels (extensible pattern)
  - `topic:${topicName}` - Topic subscribers
  - `category:${categoryId}` - Category followers
  - Any pattern following `channelType:identifier`
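A small helper that follows the `channelType:identifier` convention above might look like this; the helper name and validation rules are assumptions.

```javascript
// Illustrative helper - name and validation rules are assumptions.
function buildChannelId(channelType, ...identifierParts) {
  if (!channelType || identifierParts.length === 0 || identifierParts.some(part => !part)) {
    throw new Error('channelType and at least one identifier part are required');
  }
  // e.g. buildChannelId('entity', 'project', '123') -> 'entity:project:123'
  return [channelType, ...identifierParts].join(':');
}
```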
// Additional CacheKeys needed for SSE
const CacheKeys = {
// ... existing keys ...
SSE_CONNECTION: Symbol('sseConnection'),
SSE_SERVER_CONNECTIONS: Symbol('sseServerConnections'),
// SSE subscription keys by type
USER_SUBSCRIPTIONS: Symbol('userSubscriptions'), // user -> channels
CHANNEL_SUBSCRIBERS: Symbol('channelSubscribers'), // channel -> users
GLOBAL_SUBSCRIBERS: Symbol('globalSubscribers'), // all connected users
TOPIC_SUBSCRIBERS: Symbol('topicSubscribers'), // topic -> users
PENDING_NOTIFICATIONS: Symbol('pendingNotifications'),
}
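As a rough sketch of how one of the extended methods specified below might compose a cache key from a `CacheKeys` symbol and an identifier (the real implementation follows the existing CacheService validation, logging, and circuit-breaker patterns, which are omitted here):

```javascript
// Illustrative sketch only - key composition and error handling are assumptions.
async function addToSet(redisClient, keyType, identifier, members, options = {}) {
  // keyType is a Symbol such as CacheKeys.CHANNEL_SUBSCRIBERS
  const key = `${keyType.description}:${identifier}`; // e.g. 'channelSubscribers:entity:project:123'
  const values = Array.isArray(members) ? members : [members];
  try {
    return await redisClient.sadd(key, ...values);
  } catch (error) {
    if (options.onDisabled) options.onDisabled(); // graceful degradation path
    return null;
  }
}
```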
/**
* Extended CacheService methods required for SSE support
* @class CacheService
* @extends BaseCacheService
*/
class CacheService {
/**
* Add members to a Set (for subscription tracking)
* @async
* @param {Symbol} keyType - Cache key type (e.g., PROJECT_SUBSCRIBERS)
* @param {string} identifier - Entity ID (e.g., 'project-123', 'org-456', 'global')
* @param {string|string[]} members - User ID(s) to add to subscription
* @param {Object} [options] - Options
* @param {Function} [options.onDisabled] - Callback when cache disabled
* @returns {Promise<number|null>} Number added or null if disabled
*
* @example
* // Subscribe user-789 to entity channel
* await cache.addToSet(CacheKeys.CHANNEL_SUBSCRIBERS, 'entity:project:123', 'user-789')
*
* // Subscribe multiple users to topic
* await cache.addToSet(CacheKeys.TOPIC_SUBSCRIBERS, 'ai-research', ['user-1', 'user-2'])
*
* // Add user to global broadcast list
* await cache.addToSet(CacheKeys.GLOBAL_SUBSCRIBERS, 'global', userId)
*/
async addToSet(keyType, identifier, members, options)
/**
* Remove members from a Set
* @async
* @param {Symbol} keyType - Cache key type symbol
* @param {string} identifier - Unique identifier
* @param {string|string[]} members - Member(s) to remove
* @param {Object} [options] - Options
* @param {Function} [options.onDisabled] - Callback when cache disabled
* @returns {Promise<number|null>} Number removed or null if disabled
*/
async removeFromSet(keyType, identifier, members, options)
/**
* Get all members of a Set
* @async
* @param {Symbol} keyType - Cache key type symbol
* @param {string} identifier - Unique identifier
* @param {Object} [options] - Options
* @param {Function} [options.onDisabled] - Callback when cache disabled
* @returns {Promise<string[]>} Array of members or empty array
*/
async getSetMembers(keyType, identifier, options)
/**
* Add member to Sorted Set with score (for pending notifications)
* @async
* @param {Symbol} keyType - Cache key type symbol
* @param {string} identifier - Unique identifier
* @param {number} score - Score (typically timestamp)
* @param {*} member - Member to add (will be JSON stringified)
* @param {Object} [options] - Options
* @param {Function} [options.onDisabled] - Callback when cache disabled
* @returns {Promise<number|null>} 1 if added, 0 if updated, null if disabled
*/
async addToSortedSet(keyType, identifier, score, member, options)
/**
* Get range from Sorted Set
* @async
* @param {Symbol} keyType - Cache key type symbol
* @param {string} identifier - Unique identifier
* @param {number} start - Start index (0-based, can be negative)
* @param {number} stop - Stop index (-1 for last)
* @param {Object} [options] - Options
* @param {Function} [options.onDisabled] - Callback when cache disabled
* @returns {Promise<Array>} Array of parsed members or empty array
*/
async getSortedSetRange(keyType, identifier, start, stop, options)
/**
* Set field in Hash (for connection metadata)
* @async
* @param {Symbol} keyType - Cache key type symbol
* @param {string} identifier - Unique identifier
* @param {string} field - Field name
* @param {*} value - Value (will be JSON stringified)
* @param {Object} [options] - Options
* @param {Function} [options.onDisabled] - Callback when cache disabled
* @returns {Promise<number|null>} 1 if new, 0 if updated, null if disabled
*/
async setHashField(keyType, identifier, field, value, options)
/**
* Get field from Hash
* @async
* @param {Symbol} keyType - Cache key type symbol
* @param {string} identifier - Unique identifier
* @param {string} field - Field name
* @param {Object} [options] - Options
* @param {Function} [options.onDisabled] - Callback when cache disabled
* @returns {Promise<*|null>} Parsed value or null
*/
async getHashField(keyType, identifier, field, options)
/**
* Remove field from Hash
* @async
* @param {Symbol} keyType - Cache key type symbol
* @param {string} identifier - Unique identifier
* @param {string} field - Field name
* @param {Object} [options] - Options
* @param {Function} [options.onDisabled] - Callback when cache disabled
* @returns {Promise<number|null>} Number removed or null if disabled
*/
async removeHashField(keyType, identifier, field, options)
/**
* Get all fields from Hash
* @async
* @param {Symbol} keyType - Cache key type symbol
* @param {string} identifier - Unique identifier
* @param {Object} [options] - Options
* @param {Function} [options.onDisabled] - Callback when cache disabled
* @returns {Promise<Object>} Object with field-value pairs or empty object
*/
async getAllHashFields(keyType, identifier, options)
}
// Implementation details:
// - All methods follow existing assert validation patterns
// - All methods support onDisabled callback for graceful degradation
// - Complex values are JSON stringified/parsed automatically
// - Methods return null when cache disabled, appropriate types otherwise
// - Consistent error handling and logging patterns maintained
// - The implementation should be swappable so that other vendor solutions can be injected.

/**
* SSE Service Module Factory
* Manages Server-Sent Events connections without session affinity
* @module services/sse-service
* @param {Object} dependencies - Injected dependencies
* @param {Object} dependencies.cacheService - Cache service for connection tracking
* @param {Object} dependencies.pubSubService - PubSub service for cross-server communication
* @param {Object} dependencies.CacheKeys - Cache key symbols
* @param {Object} dependencies.logger - Logger instance
* @param {string} [dependencies.serverId] - Server identifier
* @param {number} [dependencies.heartbeatInterval] - Heartbeat interval in ms
* @param {number} [dependencies.connectionTimeout] - Connection timeout in ms
* @param {number} [dependencies.maxPendingNotifications] - Max pending notifications
* @returns {ProjectSSEService} SSE service instance
*/
module.exports = ({
// Required dependencies
cacheService, // Cache service instance
pubSubService, // PubSub service instance
CacheKeys, // Cache key symbols
// Optional dependencies with defaults (for testing overrides)
serverId = `server-${process.pid}`,
heartbeatInterval = 30000,
connectionTimeout = 120000,
maxPendingNotifications = 100,
logger = console
}) => {
/**
* @class ProjectSSEService
* @description Manages SSE connections and event distribution
*/
class ProjectSSEService {
constructor() {
this.serverId = serverId;
this.cache = cacheService;
this.pubSub = pubSubService;
this.logger = logger;
// Configuration injected via the module factory (used by heartbeat and cache registration)
this.heartbeatInterval = heartbeatInterval;
this.connectionTimeout = connectionTimeout;
this.maxPendingNotifications = maxPendingNotifications;
// Local in-memory tracking (per server)
this.connections = new Map();
this.userConnections = new Map();
this.subscriptions = new Map();
// Start maintenance tasks
this.startHeartbeat();
}
/**
* Handles new SSE connection from client
* @async
* @param {Object} req - Express request object
* @param {Object} res - Express response object
* @returns {Promise<void>}
* @throws {Error} If cache service is unavailable
*/
async handleSSEConnection(req, res) {
// Check if services are available
if (!this.cache.isAvailable() || !this.pubSub.isAvailable()) {
res.status(503).json({
error: 'Real-time service temporarily unavailable',
fallback: 'Please use polling endpoints'
});
return;
}
const userId = req.user.id;
const connectionId = this.generateConnectionId();
// Set SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no'
});
// Store connection locally
this.storeLocalConnection(connectionId, userId, req, res);
// Register in cache with metadata
await this.registerConnectionInCache(connectionId, userId, req);
// Subscribe to user's channel
await this.subscribeToUserChannel(userId);
// Send initial connection event
this.sendEvent(res, 'connected', {
connectionId,
serverId: this.serverId
});
// SSE connection established - subscriptions are managed externally
// Handle disconnect
req.on('close', () => {
this.handleDisconnect(connectionId, userId);
});
}
/**
* Stores connection in local memory (supports multiple connections per user)
* @private
* @param {string} connectionId - Connection identifier
* @param {string} userId - User identifier
* @param {Object} req - Request object
* @param {Object} res - Response object
*/
storeLocalConnection(connectionId, userId, req, res) {
const connection = {
id: connectionId,
userId,
request: req,
response: res,
deviceInfo: req.headers['user-agent'] || 'unknown',
createdAt: Date.now(),
lastActivity: Date.now()
};
this.connections.set(connectionId, connection);
// Support multiple connections per user (tabs, devices)
if (!this.userConnections.has(userId)) {
this.userConnections.set(userId, new Set());
}
this.userConnections.get(userId).add(connectionId);
this.logger.debug(`Connection ${connectionId} established for user ${userId} (${this.userConnections.get(userId).size} active connections)`);
}
/**
* Registers connection in cache service
* @private
* @async
* @param {string} connectionId - Connection identifier
* @param {string} userId - User identifier
* @param {Object} req - Request object
* @returns {Promise<void>}
*/
async registerConnectionInCache(connectionId, userId, req) {
// Store connection metadata in cache
await this.cache.setHashField(
CacheKeys.SSE_SERVER_CONNECTIONS,
this.serverId,
`${userId}:${connectionId}`,
{
connectionId,
userId,
createdAt: Date.now(),
lastActivity: Date.now(),
userAgent: req.headers['user-agent'],
ip: req.ip
},
{
onDisabled: () => {
this.logger.warn('Cache disabled - SSE will not work across servers');
}
}
);
// Set TTL on connection
await this.cache.set(
CacheKeys.SSE_CONNECTION,
connectionId,
{
userId,
serverId: this.serverId,
createdAt: Date.now()
},
{ ttlSeconds: this.connectionTimeout / 1000 }
);
}
/**
* Subscribes to user's PubSub channel
* @private
* @async
* @param {string} userId - User identifier
* @returns {Promise<void>}
*/
async subscribeToUserChannel(userId) {
if (this.subscriptions.has(userId)) {
return; // Already subscribed
}
const subscriptionId = await this.pubSub.subscribe(
`user:${userId}`,
(message) => {
this.handlePubSubMessage(userId, message);
}
);
this.subscriptions.set(userId, subscriptionId);
this.logger.debug(`Subscribed to channel user:${userId}`);
}
/**
* Handles message from PubSub
* @private
* @param {string} userId - User identifier
* @param {Object} message - Message payload
*/
handlePubSubMessage(userId, message) {
const userConns = this.userConnections.get(userId);
if (!userConns || userConns.size === 0) {
return; // No local connections for this user
}
// Send to all user's connections on this server
for (const connId of userConns) {
const conn = this.connections.get(connId);
if (conn) {
this.sendEvent(conn.response, message.event, message.data);
conn.lastActivity = Date.now();
}
}
}
/**
* Subscribe user to a channel (called externally by subscription management)
* @async
* @param {string} userId - User identifier
* @param {string} channelId - Channel to subscribe to
* @returns {Promise<void>}
*/
async subscribeUserToChannel(userId, channelId) {
// Add user to channel's subscriber set
await this.cache.addToSet(
CacheKeys.CHANNEL_SUBSCRIBERS,
channelId,
userId
);
// Track user's subscriptions
await this.cache.addToSet(
CacheKeys.USER_SUBSCRIPTIONS,
userId,
channelId
);
}
/**
* Unsubscribe user from a channel
* @async
* @param {string} userId - User identifier
* @param {string} channelId - Channel to unsubscribe from
* @returns {Promise<void>}
*/
async unsubscribeUserFromChannel(userId, channelId) {
// Remove user from channel's subscriber set
await this.cache.removeFromSet(
CacheKeys.CHANNEL_SUBSCRIBERS,
channelId,
userId
);
// Remove from user's subscriptions
await this.cache.removeFromSet(
CacheKeys.USER_SUBSCRIPTIONS,
userId,
channelId
);
}
/**
* Sends SSE event to client
* @private
* @param {Object} res - Response object
* @param {string} event - Event type
* @param {Object} data - Event data
*/
sendEvent(res, event, data) {
try {
if (!res.writableEnded) {
res.write(`event: ${event}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
}
} catch (error) {
this.logger.error(`Failed to send event: ${error.message}`);
}
}
/**
* Handles connection disconnect
* @private
* @param {string} connectionId - Connection identifier
* @param {string} userId - User identifier
*/
handleDisconnect(connectionId, userId) {
// Remove from local maps
this.connections.delete(connectionId);
const userConns = this.userConnections.get(userId);
if (userConns) {
userConns.delete(connectionId);
if (userConns.size === 0) {
this.userConnections.delete(userId);
// Unsubscribe from PubSub if no more connections
const subscriptionId = this.subscriptions.get(userId);
if (subscriptionId) {
this.pubSub.unsubscribe(subscriptionId).catch(err => {
this.logger.error(`Failed to unsubscribe: ${err.message}`);
});
this.subscriptions.delete(userId);
}
}
}
// Remove from cache
this.cache.removeHashField(
CacheKeys.SSE_SERVER_CONNECTIONS,
this.serverId,
`${userId}:${connectionId}`
).catch(err => {
this.logger.error(`Failed to remove from cache: ${err.message}`);
});
this.logger.debug(`Connection ${connectionId} disconnected`);
}
/**
* Generates unique connection identifier
* @private
* @returns {string} Connection ID
*/
generateConnectionId() {
return `${this.serverId}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
/**
* Starts heartbeat interval
* @private
*/
startHeartbeat() {
setInterval(() => {
this.sendHeartbeats();
}, this.heartbeatInterval);
}
/**
* Sends heartbeats to all connections
* @private
*/
sendHeartbeats() {
const now = Date.now();
for (const [connId, conn] of this.connections) {
// Check for stale connections
if (now - conn.lastActivity > this.connectionTimeout) {
this.handleDisconnect(connId, conn.userId);
continue;
}
// Send heartbeat
try {
conn.response.write(': heartbeat\n\n');
} catch (error) {
// Connection dead
this.handleDisconnect(connId, conn.userId);
}
}
}
}
return new ProjectSSEService();
}

/**
* Application startup with SSE service configuration
* Demonstrates proper dependency injection following Module Factory pattern
* @module app/startup
*/
// Import service factories
const createCacheService = require('./src/utils/cache-service');
const createPubSubService = require('./src/services/pubsub-service');
const createSSEService = require('./src/services/sse-service');
const createUserRepository = require('./src/repositories/user-repository');
/**
* Initialize cache service with circuit breaker
* Cache implementation is abstracted - could be Redis, MongoDB, etc.
*/
const { CacheService, CacheKeys } = createCacheService(cacheImplementation);
/**
* Initialize PubSub service with configured provider
* Supports Redis, SQS, Azure Service Bus, Kafka, AMQP
*/
const pubSubService = createPubSubService({
provider: process.env.PUBSUB_PROVIDER || 'redis',
connectionOptions: {
// Provider-specific options injected here
host: process.env.CACHE_HOST,
port: process.env.CACHE_PORT,
// OR for Azure: connectionString: process.env.AZURE_CONNECTION
// OR for AWS: region: process.env.AWS_REGION
},
logger: appLogger
});
/**
* Create SSE service with dependencies injected
* NO direct database access - uses proxy pattern for repository integration
*/
const sseService = createSSEService({
// Required dependencies - all are facades/abstractions
cacheService: CacheService, // Cache operations facade
pubSubService: pubSubService, // PubSub operations facade
CacheKeys: CacheKeys, // Cache key symbols for type safety
// Optional configuration overrides for testing
serverId: process.env.SSE_SERVER_ID || `api-${process.pid}`,
heartbeatInterval: parseInt(process.env.SSE_HEARTBEAT) || 30000,
connectionTimeout: parseInt(process.env.SSE_TIMEOUT) || 120000,
maxPendingNotifications: parseInt(process.env.SSE_MAX_PENDING) || 100,
logger: appLogger
});
// Initialize services
await pubSubService.connect();
// Inject into Express app for route access
app.locals.sseService = sseService;
app.locals.cacheService = CacheService;
app.locals.pubSubService = pubSubService;
// Graceful shutdown handling
process.on('SIGTERM', async () => {
console.log('Shutting down SSE service...');
// Close all SSE connections
await sseService.shutdown();
// Disconnect from PubSub
await pubSubService.disconnect();
// Other cleanup...
});

- Testability - Easy to inject mock cache and PubSub services
- Consistency - Follows same pattern as rest of codebase
- Flexibility - Testers can override any dependency
- Graceful Degradation - Cache failures don't break SSE functionality
- Separation of Concerns - SSE operations separate from business logic
- Repository Independence - Uses proxy pattern instead of direct repository injection
CRITICAL: In a clustered environment with round-robin load balancing, SSE cannot function without the cache service.
When cache service is unavailable:
- SSE endpoints return 503 - Service Unavailable
- Clients must fall back to polling or wait for service restoration
- System continues operating - All other features work normally
- No real-time updates - Events are still processed but not delivered in real-time
Why local maps don't work as fallback:
- Each server has isolated memory
- Round-robin sends requests to different servers
- Connection on Server A is invisible to Server B
- No cross-server coordination possible without cache/pubsub
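On the client side, the fallback behaviour might be sketched as follows; the endpoint paths, probe request, and polling interval are assumptions for illustration.

```javascript
// Browser-side sketch - endpoint paths, probe request, and intervals are assumptions.
const renderNotifications = (items) => console.log('notifications', items);

async function connectWithFallback() {
  // Probe the SSE endpoint; a 503 means the cache/pubsub layer is unavailable
  const probe = await fetch('/sse/connect', { method: 'HEAD' });
  if (probe.status === 503) {
    // Real-time features disabled: fall back to polling
    return setInterval(async () => {
      const res = await fetch('/api/notifications/pending');
      if (res.ok) renderNotifications(await res.json());
    }, 15000);
  }
  const source = new EventSource('/sse/connect');
  source.addEventListener('message', (e) => renderNotifications([JSON.parse(e.data)]));
  // EventSource reconnects automatically; persistent errors can trigger a switch to polling
  source.onerror = () => { /* track failures, eventually close and poll */ };
  return source;
}
```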
/**
* Send system-wide announcement to all connected users
* @param {string} message - Announcement message
* @param {string} severity - 'info', 'warning', 'critical'
*/
async function sendGlobalAnnouncement(message, severity) {
// Publish to global broadcast channel
await pubSubService.publish('broadcast:global', {
event: 'system_announcement',
data: {
message,
severity,
timestamp: new Date().toISOString()
}
});
}
// In SSE Service - Subscribe all connections to global channel
async handleSSEConnection(req, res) {
// ... existing connection setup ...
// Subscribe to user-specific channel
await this.pubSub.subscribe(`user:${userId}`, handler);
// ALSO subscribe to global broadcast channel
await this.pubSub.subscribe('broadcast:global', (message) => {
this.handleGlobalBroadcast(message);
});
}
// Usage: Maintenance notification
await sendGlobalAnnouncement(
'System maintenance scheduled for 2am-3am EST',
'warning'
);

/**
* Subscribe user to entity channel
* @param {string} userId - User ID
* @param {string} entityType - Entity type (project, document, etc.)
* @param {string} entityId - Entity ID
*/
async function subscribeToEntity(userId, entityType, entityId) {
const channelId = `entity:${entityType}:${entityId}`;
// Add user to channel's subscriber set
await cacheService.addToSet(
CacheKeys.CHANNEL_SUBSCRIBERS,
channelId, // identifier: the channel
userId // member: the user subscribing
);
// Track user's channel subscriptions
await cacheService.addToSet(
CacheKeys.USER_SUBSCRIPTIONS,
userId, // identifier: the user
channelId // member: the channel they're subscribing to
);
}
/**
* Send notification to all entity subscribers
* @param {string} entityType - Entity type
* @param {string} entityId - Entity ID
* @param {Object} notification - Notification data
*/
async function notifyEntitySubscribers(entityType, entityId, notification) {
const channelId = `entity:${entityType}:${entityId}`;
// Get all subscribers for this channel
const subscribers = await cacheService.getSetMembers(
CacheKeys.CHANNEL_SUBSCRIBERS,
channelId
);
// Send to each subscriber's personal channel
for (const userId of subscribers) {
await pubSubService.publish(`user:${userId}`, {
event: 'entity_update',
data: {
entityType,
entityId,
...notification
}
});
}
}
// Usage: Entity state changed
await notifyEntitySubscribers('project', '123', {
type: 'state_changed',
newState: 'active',
changedBy: 'user-789'
});

/**
* Subscribe user to organization channel
* @param {string} userId - User ID
* @param {string} orgId - Organization ID
*/
async function subscribeToOrganization(userId, orgId) {
const channelId = `org:${orgId}`;
// Add user to org channel subscriber set
await cacheService.addToSet(
CacheKeys.CHANNEL_SUBSCRIBERS,
channelId, // identifier: the organization channel
userId // member: the user
);
// Track user's channel subscriptions
await cacheService.addToSet(
CacheKeys.USER_SUBSCRIPTIONS,
userId, // identifier: the user
channelId // member: the channel
);
}
/**
* Send notification to all organization members
* @param {string} orgId - Organization ID
* @param {Object} notification - Notification data
*/
async function notifyOrgMembers(orgId, notification) {
const channelId = `org:${orgId}`;
const members = await cacheService.getSetMembers(
CacheKeys.CHANNEL_SUBSCRIBERS,
channelId
);
// Batch publish to all members
const publishPromises = members.map(userId =>
pubSubService.publish(`user:${userId}`, {
event: 'org_announcement',
data: {
orgId,
...notification
}
})
);
await Promise.all(publishPromises);
}
// Usage: Organization-wide announcement
await notifyOrgMembers('org-acme', {
type: 'announcement',
title: 'Company Meeting Tomorrow',
time: '2024-01-15T10:00:00Z'
});

/**
* Subscribe user to topic notifications
* @param {string} userId - User ID
* @param {string} topic - Topic name (e.g., 'ai-ml', 'security')
*/
async function subscribeToTopic(userId, topic) {
await cacheService.addToSet(
CacheKeys.TOPIC_SUBSCRIBERS,
topic, // identifier: the topic
userId // member: the user
);
}
/**
* Notify all subscribers of a topic
* @param {string} topic - Topic name
* @param {Object} notification - Notification data
*/
async function notifyTopicSubscribers(topic, notification) {
const subscribers = await cacheService.getSetMembers(
CacheKeys.TOPIC_SUBSCRIBERS,
topic
);
for (const userId of subscribers) {
await pubSubService.publish(`user:${userId}`, {
event: 'topic_update',
data: {
topic,
...notification
}
});
}
}
// Usage: New AI/ML research paper
await notifyTopicSubscribers('ai-ml', {
type: 'new_content',
title: 'Breakthrough in Neural Network Efficiency',
link: '/papers/nn-efficiency-2024'
});

/**
* Send direct message to specific user
* @param {string} fromUserId - Sender user ID
* @param {string} toUserId - Recipient user ID
* @param {string} message - Message content
*/
async function sendDirectMessage(fromUserId, toUserId, message) {
// No subscription needed - direct to user channel
await pubSubService.publish(`user:${toUserId}`, {
event: 'direct_message',
data: {
from: fromUserId,
message,
timestamp: new Date().toISOString()
}
});
}
// Usage: Send DM
await sendDirectMessage('user-123', 'user-456', 'Can we discuss the project?');

/**
* Generic subscription handler for any entity type
* @param {string} entityType - Type of entity (contribution, vote, etc.)
* @param {string} entityId - Entity ID
* @param {string} userId - User ID to subscribe
*/
async function subscribeToEntity(entityType, entityId, userId) {
const cacheKey = Symbol.for(`${entityType}Subscribers`);
await cacheService.addToSet(
cacheKey,
entityId, // identifier: the entity
userId // member: the user
);
}
/**
* Notify all entity subscribers
* @param {string} entityType - Type of entity
* @param {string} entityId - Entity ID
* @param {Object} notification - Notification data
*/
async function notifyEntitySubscribers(entityType, entityId, notification) {
const cacheKey = Symbol.for(`${entityType}Subscribers`);
const subscribers = await cacheService.getSetMembers(
cacheKey,
entityId
);
for (const userId of subscribers) {
await pubSubService.publish(`user:${userId}`, {
event: `${entityType}_update`,
data: {
entityType,
entityId,
...notification
}
});
}
}
// Usage: Subscribe to contribution updates
await subscribeToEntity('contribution', 'contrib-789', 'user-123');
// Notify when contribution gets a comment
await notifyEntitySubscribers('contribution', 'contrib-789', {
type: 'new_comment',
commentId: 'comment-456',
author: 'user-999'
});

/**
* SSE Service can filter alerts based on user preferences
*/
class ProjectSSEService {
async handlePubSubMessage(userId, message) {
// Get user's notification preferences
const preferences = await this.getUserPreferences(userId);
// Filter based on preferences
if (this.shouldSendNotification(message, preferences)) {
// Check alert priority
const priority = message.data.priority || 'normal';
// High priority alerts bypass rate limiting
if (priority === 'high' || this.checkRateLimit(userId)) {
this.sendToUserConnections(userId, message);
}
}
}
shouldSendNotification(message, preferences) {
// Check user's subscription settings
if (message.event === 'entity_update' && !preferences.entityUpdates) {
return false;
}
// Check quiet hours
if (preferences.quietHours && this.isQuietHours(preferences)) {
return message.data.priority === 'high'; // Only high priority
}
return true;
}
}
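The filtering logic above assumes helpers such as `getUserPreferences` and `isQuietHours`; a possible preference shape and quiet-hours check, purely as an illustrative assumption:

```javascript
// Illustrative assumptions only - the real preference schema is owned by another service.
const examplePreferences = {
  entityUpdates: true,
  directMessages: true,
  quietHours: { start: 22, end: 7 } // local hours during which only high-priority alerts are sent
};

function isQuietHours(preferences, now = new Date()) {
  if (!preferences.quietHours) return false;
  const { start, end } = preferences.quietHours;
  const hour = now.getHours();
  // Handles windows that wrap past midnight (e.g. 22:00 -> 07:00)
  return start <= end ? (hour >= start && hour < end) : (hour >= start || hour < end);
}
```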
**Correct behaviour:**
```javascript
if (!cacheService.isAvailable()) {
res.status(503).json({
error: 'Real-time service temporarily unavailable',
fallback: 'Please use polling endpoints'
});
return;
}
```

CRITICAL: The PubSub Service provides an abstraction layer for publish/subscribe operations, supporting multiple backend implementations.
While Kafka, SQS, RabbitMQ, AMQP, and Azure Service Bus all support publish/subscribe patterns, there are important semantic and operational differences:
- Real-time vs Durability Trade-offs
  - Redis Pub/Sub: Fire-and-forget, no persistence, sub-millisecond latency
  - Message Queues: Durable, guaranteed delivery, higher latency (10-100ms)
  - SSE Requirement: Real-time notifications need immediate delivery, not durability
- Connection State Synchronization
  - Redis Pub/Sub: Ephemeral channels perfect for "server1 has user X connected"
  - Message Queues: Would persist these transient states unnecessarily
  - SSE Requirement: Connection states are temporary and shouldn't be durable
- Pattern Matching Subscriptions
  - Redis: Native pattern subscriptions (`project:*:update`)
  - SQS: No pattern matching, requires multiple queues
  - Kafka: Can auto-create topics with `auto.create.topics.enable=true`, supports regex subscriptions
  - RabbitMQ/AMQP: Topic exchanges support patterns but with overhead
  - Azure Service Bus: Topic filters require SQL-like filter rules on each subscription
What we need: Fast, ephemeral pub/sub for SSE coordination with pattern matching support.
| Provider | Service | Type | Pattern Support | Latency | Setup Complexity | SSE Suitability |
|---|---|---|---|---|---|---|
| AWS | ElastiCache for Redis | Managed Redis | Yes (PSUBSCRIBE) | <1ms | Low | Excellent |
| AWS | MemoryDB for Redis | Durable Redis | Yes (PSUBSCRIBE) | <2ms | Low | Excellent |
| Azure | Azure Cache for Redis | Managed Redis | Yes (PSUBSCRIBE) | <1ms | Low | Excellent |
| GCP | Memorystore for Redis | Managed Redis | Yes (PSUBSCRIBE) | <1ms | Low | Excellent |
| Self-hosted | Redis | Open source | Yes (PSUBSCRIBE) | <1ms | Medium | Excellent |
Alternative Non-Redis Solutions:
| Provider | Service | Type | Pattern Support | Latency | Setup Complexity | SSE Suitability |
|---|---|---|---|---|---|---|
| AWS | IoT Core | MQTT Broker | Yes (wildcards) | <5ms | Medium | Good |
| Azure | Web PubSub | WebSocket Service | Yes (groups) | <5ms | Low | Very Good |
| Azure | SignalR Service | Real-time Service | Yes (groups/users) | <5ms | Low | Excellent |
| GCP | Firebase Realtime | Real-time Database | Yes (paths) | <10ms | Low | Good |
Why Redis is preferred for SSE:
- Sub-millisecond latency - Critical for real-time feel
- Pattern subscriptions - `PSUBSCRIBE user:*` handles dynamic channels
- Ephemeral by design - Perfect for connection state coordination
- Battle-tested - Mature technology with extensive tooling
- Cloud-agnostic - Same Redis code works across all providers
Cloud Provider Redis Services Configuration:
// AWS ElastiCache for Redis
const redis = require('redis');
const client = redis.createClient({
host: 'your-cluster.cache.amazonaws.com',
port: 6379,
// Cluster mode supported for high availability
cluster: {
enableReadyCheck: false,
redisOptions: {
password: process.env.REDIS_AUTH_TOKEN
}
}
});
// Azure Cache for Redis
const azureClient = redis.createClient({
host: 'your-cache.redis.cache.windows.net',
port: 6380, // SSL port
password: process.env.AZURE_REDIS_KEY,
tls: {
servername: 'your-cache.redis.cache.windows.net'
}
});
// GCP Memorystore for Redis
const gcpClient = redis.createClient({
host: 'your-instance-ip', // Private IP in VPC
port: 6379,
// Auth is via VPC networking, not password
});
// All support the same Redis PubSub commands:
await client.psubscribe('user:*', (channel, message) => {
console.log(`Received on ${channel}: ${message}`);
});

Cost Comparison (Approximate monthly costs for SSE workload):
| Provider | Service | Instance Type | Monthly Cost | Notes |
|---|---|---|---|---|
| AWS | ElastiCache | cache.t3.micro | $15 | Good for development |
| AWS | ElastiCache | cache.r6g.large | $150 | Production ready |
| Azure | Cache for Redis | C1 Standard | $73 | 1GB, good for SSE |
| Azure | Cache for Redis | C3 Standard | $300 | 6GB, high availability |
| GCP | Memorystore | 1GB Standard | $45 | Basic tier |
| GCP | Memorystore | 1GB High Availability | $90 | With replica |
The system uses both patterns for different purposes (a condensed sketch follows this list):
- Message Queues (Kafka/SQS/etc.) for:
  - Business events that must be processed
  - Audit trails and event sourcing
  - Workflow orchestration
  - Guaranteed delivery requirements
- Redis Pub/Sub for:
  - SSE connection coordination between servers
  - Real-time notification fan-out
  - Ephemeral state synchronisation
  - Pattern-based event routing
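A condensed sketch of the dual-pattern flow follows; the `messageProducer.send` call and the event names are assumptions, standing in for the MessageProducer described earlier.

```javascript
// Condensed dual-pattern sketch - messageProducer.send and event names are illustrative assumptions.
async function onProjectStateChanged(project, changedBy) {
  // 1. Durable business event: must be processed, audited, and replayable
  await messageProducer.send('project-events', {
    type: 'PROJECT_STATE_CHANGED',
    projectId: project.id,
    newState: project.state,
    changedBy
  });

  // 2. Ephemeral real-time fan-out: best-effort notification to connected users
  await pubSubService.publish(`entity:project:${project.id}`, {
    event: 'entity_update',
    data: { entityType: 'project', entityId: project.id, newState: project.state }
  });
}
```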
Azure Service Bus requires more setup for SSE pattern matching because:
- Subscription Creation: Each pattern requires a separate subscription with SQL filters
- Filter Rules: Must define SQL-like expressions for each subscription
- Management Overhead: Cannot dynamically create subscriptions from application code without elevated permissions
// Azure Service Bus Setup Complexity
const { ServiceBusClient } = require('@azure/service-bus');
// Need to pre-create subscriptions with filters
// This typically requires Azure Portal or ARM templates:
/*
Topic: "notifications"
Subscription: "user-123-alerts"
Filter: "userId = '123'"
Subscription: "project-456-alerts"
Filter: "projectId = '456'"
Subscription: "global-alerts"
Filter: "type = 'broadcast'"
*/
// Application code becomes complex
async function setupUserSubscription(userId) {
// Cannot create subscription dynamically without admin rights
// Must use pre-created subscription with filter
const receiver = sbClient.createReceiver(
'notifications',
`user-${userId}-alerts` // Must exist already!
);
}

- AWS IoT Core
  - Pros: MQTT protocol, WebSocket support, device shadows, rules engine
  - Cons: Designed for IoT, connection limits, topic hierarchy restrictions
  - SSE Fit: Good for real-time, supports wildcards
  - Example (pattern subscription): await iotClient.subscribe('user/+/notifications');
- Amazon Kinesis Data Streams
  - Pros: Real-time streaming, auto-scaling, multiple consumers
  - Cons: Shard management, ordered by partition key only
  - SSE Fit: Better for analytics than notifications
- AWS AppSync
  - Pros: GraphQL subscriptions, real-time updates, offline support
  - Cons: GraphQL-only, resolver complexity
  - SSE Fit: Excellent if using GraphQL
  - Example (GraphQL subscription): subscription OnNotification($userId: ID!) { onNotification(userId: $userId) { message } }
- Amazon EventBridge
  - Pros: Event routing, content-based filtering, serverless
  - Cons: Not real-time (minimum 1 minute), no WebSocket
  - SSE Fit: Poor for real-time SSE
- Azure SignalR Service
  - Pros: Fully managed SignalR, auto-scaling, WebSocket/SSE/Long Polling
  - Cons: SignalR protocol specific, client library required
  - SSE Fit: Excellent - built for this use case
  - Example (direct SSE support): const connection = new signalR.HubConnectionBuilder().withUrl("/hub").build();
- Azure Event Grid
  - Pros: Event routing, advanced filtering, WebHook delivery
  - Cons: HTTP push model, not true pub/sub for clients
  - SSE Fit: Needs adapter for SSE
- Azure Web PubSub
  - Pros: WebSocket service, pattern subscriptions, presence
  - Cons: Newer service, WebSocket focused
  - SSE Fit: Very Good - supports patterns
  - Example (pattern-based groups): await client.joinGroup(`project:${projectId}:*`);
- Azure Notification Hubs
  - Pros: Push notifications, device management
  - Cons: Mobile/desktop push only, not for web SSE
  - SSE Fit: Not applicable for SSE
- Firebase Realtime Database
  - Pros: Real-time sync, offline support, simple API
  - Cons: NoSQL structure, data modeling constraints
  - SSE Fit: Good but requires data structure changes
  - Example (auto-syncs to all clients): firebase.database().ref(`users/${userId}/notifications`).on('child_added', (snapshot) => { /* handle */ });
- Cloud Pub/Sub
  - Pros: Scalable, reliable, push/pull delivery
  - Cons: Not browser-compatible, needs proxy
  - SSE Fit: Requires SSE adapter service
- Firestore
  - Pros: Real-time listeners, offline support, queries
  - Cons: Document database, query limitations
  - SSE Fit: Good with proper data model
  - Example (real-time query subscriptions): firestore.collection('notifications').where('userId', '==', userId).onSnapshot((snapshot) => { /* handle */ });
- Google Cloud Tasks
  - Pros: Task queuing, scheduling
  - Cons: Not real-time, HTTP push only
  - SSE Fit: Not suitable for SSE
| Service | Provider | Pattern Support | Setup Complexity | Cost Model | Best For |
|---|---|---|---|---|---|
| Azure SignalR | Azure | Groups/Users | Low | Connections/Messages | Best overall for SSE |
| Azure Web PubSub | Azure | Pattern groups | Low | Connections/Messages | Modern WebSocket apps |
| AWS IoT Core | AWS | MQTT wildcards | Medium | Messages | IoT + Web hybrid |
| AWS AppSync | AWS | GraphQL filters | Medium | Queries/Data | GraphQL APIs |
| Firebase Realtime | GCP | Path-based | Low | Storage/Bandwidth | Simple real-time |
| Firestore | GCP | Query-based | Low | Reads/Writes | Document-based apps |
| Pusher Channels | 3rd Party | Channels/Events | Very Low | Connections/Messages | Quick implementation |
| Ably | 3rd Party | Channel patterns | Very Low | Connections/Messages | Global scale |
For SSE without session affinity / sticky sessions using only managed services:
- Azure-centric: Use Azure SignalR Service or Azure Web PubSub
- AWS-centric: Use AWS IoT Core (despite the name) or AWS AppSync
- GCP-centric: Use Firebase Realtime Database or Firestore
- Multi-cloud: Use Pusher, Ably, or PubNub
For SSE without session affinity, the dual-pattern approach is optimal:
- Primary Choice: Managed Redis Services
  - AWS: ElastiCache for Redis or MemoryDB for Redis
  - Azure: Azure Cache for Redis
  - GCP: Memorystore for Redis
  - Self-hosted: Redis with high availability setup
- Architecture Pattern:
  - Existing message queue (Kafka/SQS/Service Bus) for business events
  - Redis Pub/Sub for SSE coordination and real-time delivery
  - Service facades to abstract both implementations
- Why This Works:
  - Redis handles ephemeral SSE connection coordination
  - Message queues handle durable business event processing
  - Both can coexist and serve different purposes
  - Same Redis API across all cloud providers
  - Sub-millisecond latency for real-time user experience
/**
* PubSub Service Module Factory
* Abstracts pub/sub operations across different message brokers
* @module services/pubsub-service
* @param {Object} config - Configuration object
* @param {string} config.provider - Provider type: 'redis', 'sqs', 'azure', 'kafka', 'amqp'
* @param {Object} config.connectionOptions - Provider-specific connection options
* @param {Object} [config.logger] - Optional logger instance
* @returns {PubSubService} PubSub service instance
*/
module.exports = ({ provider, connectionOptions, logger = console }) => {
/**
* @class PubSubService
* @description Unified interface for pub/sub operations across different backends
*/
class PubSubService {
constructor() {
this.provider = provider;
this.logger = logger;
this.adapter = this.createAdapter(provider, connectionOptions);
this.subscriptions = new Map();
this.isConnected = false;
}
/**
* Creates the appropriate adapter based on provider
* @private
* @param {string} provider - Provider type
* @param {Object} options - Connection options
* @returns {Object} Provider adapter
*/
createAdapter(provider, options) {
switch(provider) {
case 'redis':
return new RedisPubSubAdapter(options);
case 'sqs':
return new SQSAdapter(options);
case 'azure':
return new AzureServiceBusAdapter(options);
case 'kafka':
return new KafkaAdapter(options);
case 'amqp':
return new AMQPAdapter(options);
default:
throw new Error(`Unsupported provider: ${provider}`);
}
}
/**
* Connects to the pub/sub backend
* @async
* @returns {Promise<void>}
* @throws {Error} Connection error
*/
async connect() {
try {
await this.adapter.connect();
this.isConnected = true;
this.logger.info(`PubSub connected to ${this.provider}`);
} catch (error) {
this.logger.error(`PubSub connection failed: ${error.message}`);
throw error;
}
}
/**
* Publishes a message to a channel
* @async
* @param {string} channel - Channel/topic name
* @param {Object} message - Message payload
* @returns {Promise<void>}
* @example
* await pubSubService.publish('user:123', { event: 'notification', data: {...} })
*/
async publish(channel, message) {
if (!this.isConnected) {
throw new Error('PubSub service not connected');
}
try {
const payload = JSON.stringify(message);
await this.adapter.publish(channel, payload);
this.logger.debug(`Published to ${channel}`, message);
} catch (error) {
this.logger.error(`Publish failed to ${channel}: ${error.message}`);
throw error;
}
}
/**
* Subscribes to a channel
* @async
* @param {string} channel - Channel/topic name
* @param {Function} handler - Message handler function
* @returns {Promise<string>} Subscription ID
* @example
* const subId = await pubSubService.subscribe('user:123', (message) => {
* console.log('Received:', message);
* });
*/
async subscribe(channel, handler) {
if (!this.isConnected) {
throw new Error('PubSub service not connected');
}
const subscriptionId = `${channel}_${Date.now()}`;
const wrappedHandler = (rawMessage) => {
try {
const message = JSON.parse(rawMessage);
handler(message);
} catch (error) {
this.logger.error(`Handler error for ${channel}: ${error.message}`);
}
};
await this.adapter.subscribe(channel, wrappedHandler);
this.subscriptions.set(subscriptionId, { channel, handler: wrappedHandler });
this.logger.debug(`Subscribed to ${channel} with ID ${subscriptionId}`);
return subscriptionId;
}
/**
* Subscribes to a pattern (for providers that support it)
* @async
* @param {string} pattern - Pattern to match channels
* @param {Function} handler - Message handler function
* @returns {Promise<string>} Subscription ID
* @example
* await pubSubService.psubscribe('user:*', (channel, message) => {
* console.log(`Received on ${channel}:`, message);
* });
*/
async psubscribe(pattern, handler) {
if (!this.adapter.psubscribe) {
throw new Error(`Pattern subscription not supported by ${this.provider}`);
}
const subscriptionId = `pattern_${pattern}_${Date.now()}`;
const wrappedHandler = (channel, rawMessage) => {
try {
const message = JSON.parse(rawMessage);
handler(channel, message);
} catch (error) {
this.logger.error(`Pattern handler error: ${error.message}`);
}
};
await this.adapter.psubscribe(pattern, wrappedHandler);
this.subscriptions.set(subscriptionId, { pattern, handler: wrappedHandler });
return subscriptionId;
}
/**
* Unsubscribes from a channel or pattern
* @async
* @param {string} subscriptionId - Subscription ID to cancel
* @returns {Promise<void>}
*/
async unsubscribe(subscriptionId) {
const subscription = this.subscriptions.get(subscriptionId);
if (!subscription) {
throw new Error(`Subscription ${subscriptionId} not found`);
}
if (subscription.channel) {
await this.adapter.unsubscribe(subscription.channel);
} else if (subscription.pattern) {
await this.adapter.punsubscribe(subscription.pattern);
}
this.subscriptions.delete(subscriptionId);
this.logger.debug(`Unsubscribed ${subscriptionId}`);
}
/**
* Checks if service is available
* @returns {boolean} Service availability
*/
isAvailable() {
return this.isConnected && this.adapter.isHealthy();
}
/**
* Disconnects from the pub/sub backend
* @async
* @returns {Promise<void>}
*/
async disconnect() {
for (const [id, _] of this.subscriptions) {
await this.unsubscribe(id);
}
await this.adapter.disconnect();
this.isConnected = false;
this.logger.info(`PubSub disconnected from ${this.provider}`);
}
}
return new PubSubService();
};
/**
* Redis PubSub Adapter - Works with all cloud provider Redis services
* @class RedisPubSubAdapter
* @private
*/
class RedisPubSubAdapter {
constructor(options) {
const Redis = require('ioredis');
// Auto-detect cloud provider configuration
const redisConfig = this.buildRedisConfig(options);
// Separate clients for pub/sub (Redis best practice)
this.pubClient = new Redis(redisConfig);
this.subClient = new Redis(redisConfig);
this.healthy = true;
// Health monitoring
this.setupHealthChecks();
}
/**
* Builds Redis configuration for different cloud providers
* @private
*/
buildRedisConfig(options) {
// AWS ElastiCache / MemoryDB
if (options.aws || options.host?.includes('cache.amazonaws.com')) {
return {
host: options.host,
port: options.port || 6379,
password: options.password,
tls: options.tls ? {} : undefined,
retryDelayOnFailover: 100,
enableReadyCheck: false,
maxRetriesPerRequest: 3
};
}
// Azure Cache for Redis
if (options.azure || options.host?.includes('redis.cache.windows.net')) {
return {
host: options.host,
port: options.port || 6380,
password: options.password,
tls: {
servername: options.host
},
family: 4,
keepAlive: true
};
}
// GCP Memorystore
if (options.gcp || options.region) {
return {
host: options.host,
port: options.port || 6379,
// Memorystore uses VPC auth, no password needed
connectTimeout: 10000,
lazyConnect: true
};
}
// Default/Self-hosted Redis
return {
host: options.host || 'localhost',
port: options.port || 6379,
password: options.password,
retryDelayOnFailover: 100,
maxRetriesPerRequest: 3
};
}
/**
* Sets up health monitoring for both clients
* @private
*/
setupHealthChecks() {
this.subClient.on('error', (err) => {
console.error('Redis subscription client error:', err);
this.healthy = false;
});
this.pubClient.on('error', (err) => {
console.error('Redis publisher client error:', err);
this.healthy = false;
});
this.subClient.on('connect', () => {
console.log('Redis subscription client connected');
this.healthy = true;
});
this.pubClient.on('connect', () => {
console.log('Redis publisher client connected');
});
}
async connect() {
try {
await Promise.all([
this.pubClient.ping(),
this.subClient.ping()
]);
this.healthy = true;
console.log('Redis PubSub adapter connected successfully');
} catch (error) {
this.healthy = false;
throw new Error(`Redis connection failed: ${error.message}`);
}
}
async publish(channel, message) {
if (!this.healthy) {
throw new Error('Redis publisher not healthy');
}
return this.pubClient.publish(channel, message);
}
async subscribe(channel, handler) {
// Register a single shared 'message' listener and dispatch by channel,
// otherwise each subscribe call would stack an extra listener
if (!this.channelHandlers) {
this.channelHandlers = new Map();
this.subClient.on('message', (ch, msg) => this.channelHandlers.get(ch)?.(msg));
}
this.channelHandlers.set(channel, handler);
await this.subClient.subscribe(channel);
}
async psubscribe(pattern, handler) {
// ioredis emits 'pmessage' with (pattern, channel, message); dispatch by pattern
if (!this.patternHandlers) {
this.patternHandlers = new Map();
this.subClient.on('pmessage', (pat, ch, msg) => this.patternHandlers.get(pat)?.(ch, msg));
}
this.patternHandlers.set(pattern, handler);
await this.subClient.psubscribe(pattern);
}
async unsubscribe(channel) {
this.channelHandlers?.delete(channel);
return this.subClient.unsubscribe(channel);
}
async punsubscribe(pattern) {
this.patternHandlers?.delete(pattern);
return this.subClient.punsubscribe(pattern);
}
isHealthy() {
return this.healthy &&
this.pubClient.status === 'ready' &&
this.subClient.status === 'ready';
}
async disconnect() {
await Promise.all([
this.pubClient.quit(),
this.subClient.quit()
]);
console.log('Redis PubSub adapter disconnected');
}
}
/**
* Example configurations for different cloud providers:
*/
// AWS ElastiCache for Redis
const awsAdapter = new RedisPubSubAdapter({
aws: true,
host: 'your-cluster.abcdef.cache.amazonaws.com',
port: 6379,
password: process.env.REDIS_AUTH_TOKEN
});
// Azure Cache for Redis
const azureAdapter = new RedisPubSubAdapter({
azure: true,
host: 'your-cache.redis.cache.windows.net',
port: 6380,
password: process.env.AZURE_REDIS_KEY
});
// GCP Memorystore for Redis
const gcpAdapter = new RedisPubSubAdapter({
gcp: true,
host: '10.1.1.100', // Private VPC IP
port: 6379
});
// Self-hosted Redis
const selfHostedAdapter = new RedisPubSubAdapter({
host: 'redis.yourdomain.com',
port: 6379,
password: process.env.REDIS_PASSWORD
});
// Production configuration supporting different cloud Redis services
const pubSubService = createPubSubService({
provider: 'redis', // Primary choice for SSE
connectionOptions: {
// AWS ElastiCache
aws: process.env.CLOUD_PROVIDER === 'aws',
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT) || 6379,
password: process.env.REDIS_PASSWORD,
// Azure Cache for Redis
azure: process.env.CLOUD_PROVIDER === 'azure',
// host: process.env.REDIS_HOST, // same as above
// port: 6380 for Azure SSL
// GCP Memorystore
gcp: process.env.CLOUD_PROVIDER === 'gcp',
region: process.env.GCP_REGION,
// TLS configuration for cloud providers
tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
// Connection pooling and retry settings
retryDelayOnFailover: 100,
maxRetriesPerRequest: 3,
enableReadyCheck: false
},
logger: winston.createLogger({...})
});
CRITICAL: Handling multiple connections per user across the cluster requires proper lifecycle management to prevent memory exhaustion.
The safest strategy combines LRU (Least Recently Used) eviction with TTL (Time To Live) to guarantee memory bounds:
class SSEConnectionManager {
constructor(options = {}) {
// Memory safety configuration
this.maxConnections = options.maxConnections || 5000; // Hard limit per server
this.maxConnectionAge = options.maxConnectionAge || 3600000; // 1 hour max age
this.staleTimeout = options.staleTimeout || 120000; // 2 min inactivity
this.maxUserConnections = options.maxUserConnections || 5; // Max tabs per user
// Connection tracking
this.connections = new Map();
this.userConnections = new Map();
this.connectionMetadata = new Map(); // Stores lastActivity, createdAt
// Start cleanup intervals
this.startMaintenanceTasks();
}
startMaintenanceTasks() {
// Check every 30 seconds for stale connections
setInterval(() => this.evictStaleConnections(), 30000);
// Enforce memory limits every 10 seconds
setInterval(() => this.enforceLRU(), 10000);
// Log metrics every minute
setInterval(() => this.logMetrics(), 60000);
}
addConnection(connectionId, userId, request, response) {
// Enforce per-user limit first
this.enforceUserLimit(userId);
// Enforce global limit
if (this.connections.size >= this.maxConnections) {
this.evictLRUConnection();
}
// Add the connection with both request and response
const connection = {
id: connectionId,
userId,
request, // Need this for removing listeners
response, // Need this for sending data and closing
createdAt: Date.now(),
lastActivity: Date.now()
};
this.connections.set(connectionId, connection);
this.connectionMetadata.set(connectionId, {
lastActivity: Date.now(),
createdAt: Date.now()
});
// Track user connections
if (!this.userConnections.has(userId)) {
this.userConnections.set(userId, new Set());
}
this.userConnections.get(userId).add(connectionId);
}
evictStaleConnections() {
const now = Date.now();
const connectionsToEvict = [];
for (const [connId, metadata] of this.connectionMetadata) {
// Check both age and activity
const age = now - metadata.createdAt;
const inactivity = now - metadata.lastActivity;
if (age > this.maxConnectionAge || inactivity > this.staleTimeout) {
connectionsToEvict.push(connId);
}
}
// Evict stale connections
for (const connId of connectionsToEvict) {
const conn = this.connections.get(connId);
const metadata = this.connectionMetadata.get(connId);
if (conn && metadata) {
console.log(`Evicting stale connection ${connId}: age=${now - metadata.createdAt}ms, inactive=${now - metadata.lastActivity}ms`);
this.closeConnection(connId, 'Connection timeout');
}
}
}
evictLRUConnection() {
// Find the least recently used connection
let lruConnId = null;
let oldestActivity = Date.now();
for (const [connId, metadata] of this.connectionMetadata) {
if (metadata.lastActivity < oldestActivity) {
oldestActivity = metadata.lastActivity;
lruConnId = connId;
}
}
if (lruConnId) {
console.log(`Evicting LRU connection ${lruConnId} to prevent memory exhaustion`);
this.closeConnection(lruConnId, 'Server memory limit reached');
}
}
enforceUserLimit(userId) {
const userConns = this.userConnections.get(userId);
if (userConns && userConns.size >= this.maxUserConnections) {
// Find oldest connection for this user
let oldestConnId = null;
let oldestTime = Date.now();
for (const connId of userConns) {
const metadata = this.connectionMetadata.get(connId);
if (metadata && metadata.createdAt < oldestTime) {
oldestTime = metadata.createdAt;
oldestConnId = connId;
}
}
if (oldestConnId) {
console.log(`User ${userId} exceeded connection limit, evicting oldest`);
this.closeConnection(oldestConnId, 'Too many connections');
}
}
}
closeConnection(connectionId, reason) {
const conn = this.connections.get(connectionId);
if (!conn) return;
// CRITICAL: Properly close the HTTP response stream
try {
// Check if response is still writable
if (conn.response && !conn.response.writableEnded) {
// Send close event to client with reason
conn.response.write(`event: close\ndata: ${JSON.stringify({
reason,
reconnect: true,
timestamp: Date.now()
})}\n\n`);
// End the response stream
conn.response.end();
}
} catch (e) {
// Connection already dead or closed
console.debug(`Connection ${connectionId} already closed: ${e.message}`);
}
// Remove all event listeners to prevent memory leaks
if (conn.request) {
conn.request.removeAllListeners('close');
conn.request.removeAllListeners('error');
}
// Clean up all maps
this.connections.delete(connectionId);
this.connectionMetadata.delete(connectionId);
const userConns = this.userConnections.get(conn.userId);
if (userConns) {
userConns.delete(connectionId);
if (userConns.size === 0) {
this.userConnections.delete(conn.userId);
}
}
// Clean up Redis if cache is available
if (this.cache) {
this.cache.removeHashField(
CacheKeys.SSE_SERVER_CONNECTIONS,
this.serverId,
`${conn.userId}:${connectionId}`
).catch(err => {
console.error(`Failed to remove connection from Redis: ${err.message}`);
});
}
console.info(`Connection ${connectionId} closed: ${reason}`);
}
updateActivity(connectionId) {
const metadata = this.connectionMetadata.get(connectionId);
if (metadata) {
metadata.lastActivity = Date.now();
}
}
logMetrics() {
const metrics = {
totalConnections: this.connections.size,
uniqueUsers: this.userConnections.size,
memoryUsage: Math.round(process.memoryUsage().heapUsed / 1024 / 1024) + 'MB',
limits: {
maxConnections: this.maxConnections,
currentUtilization: (this.connections.size / this.maxConnections * 100).toFixed(1) + '%'
}
};
console.info('SSE Connection Metrics:', metrics);
// Emit warning if approaching limits
if (this.connections.size > this.maxConnections * 0.8) {
console.warn('WARNING: Approaching connection limit!', metrics);
}
}
}
- Hard Connection Limit - Never exceed maxConnections (e.g., 5000)
- Automatic Age-Out - Connections older than 1 hour are evicted
- Inactivity Timeout - Stale connections evicted after 2 minutes
- Per-User Limits - Prevent a single user from consuming too many connections
- Continuous Cleanup - Regular intervals ensure timely eviction
This guarantees:
- Memory bounded by maxConnections * avgConnectionSize
- No zombie connections older than TTL
- Fair resource allocation across users
- Predictable memory usage for capacity planning
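As a rough, assumed illustration of that bound: with the defaults above (maxConnections = 5000) and the ~20KB of per-connection state estimated later in this document, SSE connection state stays in the order of 5,000 × 20KB ≈ 100MB per server.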
// Local in-memory tracking (per server)
class ConnectionManager {
constructor() {
// Map of connectionId -> connection object
this.connections = new Map();
// Map of userId -> Set of connectionIds
this.userConnections = new Map();
// LRU eviction for memory management
this.maxConnections = 10000;
this.connectionAccessTime = new Map();
}
}
// Redis tracking (cluster-wide)
// Hash: sse_connections:server1
// Field: userId:connectionId
// Value: { metadata including lastActivity timestamp }
Users can have multiple SSE connections (tabs, devices):
// User with 3 tabs open
userConnections.get('user123') // Set(['conn1', 'conn2', 'conn3'])
// Each connection tracked separately
connections.get('conn1') // { userId: 'user123', response: res1, ... }
connections.get('conn2') // { userId: 'user123', response: res2, ... }
connections.get('conn3') // { userId: 'user123', response: res3, ... }
PRIMARY STRATEGY: LRU + TTL - This combination prevents memory exhaustion and is the safest approach.
- Client Disconnect (Polite Close)
req.on('close', async () => {
// Remove from local maps
this.connections.delete(connectionId);
const userConns = this.userConnections.get(userId);
if (userConns) {
userConns.delete(connectionId);
if (userConns.size === 0) {
this.userConnections.delete(userId);
}
}
// Remove from Redis
await this.cache.removeHashField(
CacheKeys.SSE_SERVER_CONNECTIONS,
this.serverId,
`${userId}:${connectionId}`
);
await this.cache.delete(CacheKeys.SSE_CONNECTION, connectionId);
});
- TTL-Based Eviction (Stale Connections)
// Heartbeat checks for stale connections
sendHeartbeats() {
const now = Date.now();
const staleThreshold = 2 * 60 * 1000; // 2 minutes
for (const [connId, conn] of this.connections) {
if (now - conn.lastActivity > staleThreshold) {
// Connection is stale - evict it
console.log(`Evicting stale connection ${connId}`);
try {
// Send close event
this.sendEvent(conn, {
type: 'connection_timeout',
data: { reason: 'Inactivity timeout' }
});
conn.response.end();
} catch (e) {
// Connection already dead
}
// Clean up
this.handleDisconnect(connId, conn.userId);
} else {
// Send heartbeat
try {
conn.response.write(': heartbeat\n\n');
conn.lastActivity = now;
} catch (e) {
// Connection dead - clean up
this.handleDisconnect(connId, conn.userId);
}
}
}
}
- LRU Eviction (Memory Pressure)
enforceConnectionLimit() {
if (this.connections.size > this.maxConnections) {
// Find LRU connection
let oldestTime = Date.now();
let oldestConnId = null;
for (const [connId, conn] of this.connections) {
if (conn.lastActivity < oldestTime) {
oldestTime = conn.lastActivity;
oldestConnId = connId;
}
}
if (oldestConnId) {
const conn = this.connections.get(oldestConnId);
// Notify and close
this.sendEvent(conn, {
type: 'connection_limit',
data: { reason: 'Server connection limit reached' }
});
conn.response.end();
// Clean up
this.handleDisconnect(oldestConnId, conn.userId);
}
}
}
- Redis TTL (Automatic Cleanup)
// Connection metadata expires automatically
await this.cache.set(
CacheKeys.SSE_CONNECTION,
connectionId,
metadata,
{ ttlSeconds: 3600 } // Auto-expires after 1 hour
);
// Periodic cleanup job removes orphaned entries
async cleanupOrphanedConnections() {
const servers = await this.cache.keys('sse_connections:*');
for (const serverKey of servers) {
const connections = await this.cache.getAllHashFields(
CacheKeys.SSE_SERVER_CONNECTIONS,
serverKey.split(':')[1]
);
for (const [field, metadata] of Object.entries(connections)) {
// Check if connection metadata still exists
const connExists = await this.cache.get(
CacheKeys.SSE_CONNECTION,
metadata.connectionId
);
if (!connExists) {
// Connection expired - remove from hash
await this.cache.removeHashField(
CacheKeys.SSE_SERVER_CONNECTIONS,
metadata.serverId,
field
);
}
}
}
}
- Server Shutdown (Graceful Cleanup)
async shutdown() {
// Notify all connections
for (const [connId, conn] of this.connections) {
this.sendEvent(conn, {
type: 'server_shutdown',
data: { message: 'Server shutting down, please reconnect' }
});
conn.response.end();
}
// Clear Redis entries for this server
await this.cache.delete(
CacheKeys.SSE_SERVER_CONNECTIONS,
this.serverId
);
// Clear local maps
this.connections.clear();
this.userConnections.clear();
}
// Prevent connection spam from single user
async canUserConnect(userId) {
const userConns = this.userConnections.get(userId);
if (userConns && userConns.size >= MAX_CONNECTIONS_PER_USER) {
// Evict oldest connection for this user
const connections = Array.from(userConns)
.map(id => ({ id, conn: this.connections.get(id) }))
.sort((a, b) => a.conn.createdAt - b.conn.createdAt);
const oldest = connections[0];
// Close oldest
this.sendEvent(oldest.conn, {
type: 'connection_replaced',
data: { reason: 'New connection opened' }
});
oldest.conn.response.end();
this.handleDisconnect(oldest.id, userId);
}
return true;
}
getConnectionMetrics() {
return {
totalConnections: this.connections.size,
uniqueUsers: this.userConnections.size,
avgConnectionsPerUser: this.connections.size / Math.max(1, this.userConnections.size),
oldestConnection: Math.min(...Array.from(this.connections.values()).map(c => c.createdAt)),
memoryUsage: process.memoryUsage().heapUsed,
connectionsByAge: {
under1min: 0,
under5min: 0,
under1hour: 0,
over1hour: 0
}
};
}
IMPORTANT: This section shows how SSE subscriptions could be managed, but the actual subscription logic is handled by separate services/ADRs. The SSE service only handles the delivery mechanism.
// SSE subscriptions could be stored in MongoDB (handled by separate service)
{
_id: ObjectId("..."),
userId: ObjectId("507f1f77bcf86cd799439010"),
sseSubscriptions: [
{
channelType: "entity",
entityType: "project",
entityId: "PRJ-001",
subscriptionDate: ISODate("2024-01-01T00:00:00Z")
},
{
channelType: "topic",
topicName: "ai-research",
subscriptionDate: ISODate("2024-01-02T00:00:00Z")
},
{
channelType: "global",
subscriptionDate: ISODate("2024-01-01T00:00:00Z")
}
]
}
// Example entity that users might subscribe to
{
_id: "PRJ-001",
name: "Alpha Project",
state: "ACTIVE",
createdAt: ISODate("2024-01-01T00:00:00Z"),
updatedAt: ISODate("2025-01-20T15:45:00Z"),
}
// 1. Channel -> Users mapping (who's subscribed to this channel?)
// Key: channel_subscribers:entity:project:PRJ-001
// Value: Set of user IDs subscribed to this channel
SADD channel_subscribers:entity:project:PRJ-001 user123 user456 user789
SADD channel_subscribers:topic:ai-research user123 user999
// 2. User -> Channels mapping (what channels is this user subscribed to?)
// Key: user_subscriptions:user123
// Value: Set of channel IDs
SADD user_subscriptions:user123 entity:project:PRJ-001 topic:ai-research
// 3. Active SSE connections by server
// Key: sse_connections:server1
// Value: Hash of userId:connectionId -> metadata
HSET sse_connections:server1 user123:conn_abc '{"created":1234567890,"lastActivity":1234567890}'
HSET sse_connections:server1 user123:conn_def '{"created":1234567900,"lastActivity":1234567900}'
// 4. Connection details with TTL
// Key: sse_connection:conn_abc
// Value: JSON with connection metadata (expires automatically)
SETEX sse_connection:conn_abc 3600 '{"userId":"user123","serverId":"server1","created":1234567890}'
// 5. Global subscribers (for broadcast messages)
// Key: global_subscribers
// Value: Set of all connected user IDs
SADD global_subscribers user123 user456 user789
// 6. Topic subscribers
// Key: topic_subscribers:ai-research
// Value: Set of user IDs interested in this topic
SADD topic_subscribers:ai-research user123 user999
The system already has a message-producer.js that implements the facade pattern for message bus abstraction. This producer listens to ALL events from the Global Event Emitter and forwards them to the configured message bus (Kafka or Azure Service Bus).
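The producer file itself is not reproduced in this ADR; the sketch below is an assumption-laden illustration of that wiring (the global-event-emitter module path, the listed topic names, and the forward helper are all assumed, and the real producer forwards every emitted event). The existing message-bus-facade.js follows after it.
// message-producer.js (illustrative sketch only; the real file already exists in the codebase)
const globalEventEmitter = require('./global-event-emitter'); // assumed shared emitter module
const MessageBusFacade = require('./message-bus-facade');

const messageBus = new MessageBusFacade({ provider: process.env.MESSAGE_BUS_PROVIDER });

// Forward domain events from the in-process emitter to the durable message bus
const forward = (topic) => (payload) => {
  messageBus.publish(topic, { ...payload, topic, emittedAt: Date.now() })
    .catch(err => console.error(`Failed to forward ${topic}: ${err.message}`));
};

['project.solution_added', 'project.state_changed', 'entity.state_changed', 'entity.updated']
  .forEach(topic => globalEventEmitter.on(topic, forward(topic)));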
// message-bus-facade.js
class MessageBusFacade {
constructor(config = {}) {
this.provider = config.provider || process.env.MESSAGE_BUS_PROVIDER || 'redis';
switch(this.provider) {
case 'azure':
this.implementation = new AzureServiceBusAdapter(config);
break;
case 'aws':
this.implementation = new AWSSQSAdapter(config);
break;
case 'redis':
this.implementation = new RedisAdapter(config);
break;
case 'memory':
this.implementation = new InMemoryAdapter(config);
break;
default:
throw new Error(`Unknown provider: ${this.provider}`);
}
}
async publish(topic, message) {
return this.implementation.publish(topic, message);
}
async subscribe(topic, handler) {
return this.implementation.subscribe(topic, handler);
}
async createSubscription(topic, subscriptionName) {
return this.implementation.createSubscription(topic, subscriptionName);
}
}
// Azure Service Bus Adapter
class AzureServiceBusAdapter {
constructor(config) {
const { ServiceBusClient } = require('@azure/service-bus');
this.client = new ServiceBusClient(config.connectionString);
this.subscriptionName = config.subscriptionName; // used by createReceiver() below
this.senders = new Map();
this.receivers = new Map();
}
async publish(topic, message) {
if (!this.senders.has(topic)) {
this.senders.set(topic, this.client.createSender(topic));
}
const sender = this.senders.get(topic);
await sender.sendMessages({
body: message,
contentType: 'application/json',
messageId: message.id || crypto.randomUUID()
});
return { success: true, messageId: message.id };
}
async subscribe(topic, handler) {
const receiver = this.client.createReceiver(topic, this.subscriptionName);
receiver.subscribe({
processMessage: async (message) => {
await handler(message.body);
await receiver.completeMessage(message);
},
processError: async (error) => {
console.error('Azure Service Bus error:', error);
}
});
this.receivers.set(topic, receiver);
return () => receiver.close();
}
}
// AWS SQS Adapter
class AWSSQSAdapter {
constructor(config) {
const AWS = require('aws-sdk');
this.sqs = new AWS.SQS({ region: config.region });
this.sns = new AWS.SNS({ region: config.region });
this.queueUrls = config.queueUrls || {};
this.topicArns = config.topicArns || {};
}
async publish(topic, message) {
const params = {
TopicArn: this.topicArns[topic],
Message: JSON.stringify(message),
MessageAttributes: {
type: {
DataType: 'String',
StringValue: message.type || 'notification'
}
}
};
const result = await this.sns.publish(params).promise();
return { success: true, messageId: result.MessageId };
}
async subscribe(topic, handler) {
const queueUrl = this.queueUrls[topic];
let running = true;
const poll = async () => {
while (running) {
try {
const params = {
QueueUrl: queueUrl,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 20
};
const result = await this.sqs.receiveMessage(params).promise();
if (result.Messages) {
for (const message of result.Messages) {
const body = JSON.parse(message.Body);
await handler(body);
await this.sqs.deleteMessage({
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle
}).promise();
}
}
} catch (error) {
console.error('SQS polling error:', error);
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
};
poll();
return () => { running = false; };
}
}
// Redis Adapter (for local development)
class RedisAdapter {
constructor(config) {
const Redis = require('ioredis');
this.pubClient = new Redis(config);
this.subClient = new Redis(config);
this.handlers = new Map();
}
async publish(topic, message) {
await this.pubClient.publish(topic, JSON.stringify(message));
return { success: true };
}
async subscribe(topic, handler) {
// Attach the shared 'message' listener once; attaching it per subscribe call
// would invoke every handler multiple times for the same message
if (!this.listenerAttached) {
this.subClient.on('message', (channel, message) => {
const handlers = this.handlers.get(channel);
if (handlers) handlers.forEach(h => h(JSON.parse(message)));
});
this.listenerAttached = true;
}
if (!this.handlers.has(topic)) {
this.handlers.set(topic, new Set());
await this.subClient.subscribe(topic);
}
this.handlers.get(topic).add(handler);
return () => {
const handlers = this.handlers.get(topic);
handlers.delete(handler);
};
}
}
CRITICAL: The system consists of TWO separate types of processes (a minimal process-manager sketch follows this list):
-
API Servers (Multiple instances)
- Handle HTTP requests and SSE connections
- Run the SSE Service for managing client connections
- Subscribe to Redis Pub/Sub to receive notifications
- Do NOT process events from the message bus
- Scale horizontally based on connection load
-
Notification Worker (Separate process/container)
- Runs as an independent service/process
- Consumes events from message bus (Kafka/Azure Service Bus)
- Determines which users should receive notifications
- Publishes to Redis Pub/Sub for distribution
- Does NOT handle HTTP or SSE connections
- Can be scaled independently from API servers
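For illustration only, this split into two process types could be expressed in a PM2 ecosystem file; the script paths and instance counts below are assumptions, not the project's actual deployment settings:
// ecosystem.config.js (illustrative; script names and counts are assumed)
module.exports = {
  apps: [
    {
      name: 'api-server',
      script: './server.js',               // Express app: HTTP routes + SSE service + Redis sub
      instances: 4,                        // scaled on connection load
      exec_mode: 'cluster'
    },
    {
      name: 'notification-worker',
      script: './notification-worker.js',  // consumes the message bus, publishes to Redis Pub/Sub
      instances: 1,                        // scaled independently of the API servers
      exec_mode: 'fork'
    }
  ]
};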
graph TD
subgraph Production_Deployment[Production Deployment]
direction TB
subgraph API_Servers[ ]
A[API Server Process 1<br>Express App<br>- HTTP Routes<br>- SSE Service<br>- Redis Sub] -->|Subscribes| R
B[API Server Process 2<br>Express App<br>- HTTP Routes<br>- SSE Service<br>- Redis Sub] -->|Subscribes| R
end
R[Redis Pub/Sub] -->|Publishes| N
subgraph Worker[ ]
N[Notification Worker Process<br>Separate Container/Service<br>- Consumes from Message Bus<br>- Processes Events<br>- Publishes to Redis]
end
N -->|Consumes| M[Message Bus<br>Kafka/Azure Service Bus]
end
classDef boxStyle fill:#f9f9f9,stroke:#333,stroke-width:2px;
classDef containerStyle fill:none,stroke:#333,stroke-width:2px;
class Production_Deployment,API_Servers,Worker containerStyle;
class A,B,R,N,M boxStyle;
sequenceDiagram
participant PS as Project Service
participant EE as Global Event Emitter
participant MP as MessageProducer<br/>(message-producer.js)
participant MB as Message Bus<br/>(Kafka/Azure)
participant NW as Notification Worker
participant RC as Redis Cache
participant RP as Redis Pub/Sub
participant SSE1 as SSE Server 1
participant SSE2 as SSE Server 2
participant U1 as User 1<br/>(on Server 1)
participant U2 as User 2<br/>(on Server 2)
Note over PS,EE: Project Event Occurs
PS->>EE: emit('project.solution_added', data)
EE->>MP: Event captured (automatic listener)
MP->>MB: sendMessage('project.solution_added', data)
Note over MB,NW: Worker Processing
MB->>NW: Poll/receive message (topic: project.solution_added)
NW->>RC: SMEMBERS project_alerts:507f1f77bcf86cd799439011
RC-->>NW: [user123, user456]
Note over NW,RC: Check Active Connections
NW->>RC: HKEYS sse_connections:*
RC-->>NW: [server1, server2]
Note over NW,RP: Publish to User Channels
NW->>RP: PUBLISH sse:user:user123 {notification}
NW->>RP: PUBLISH sse:user:user456 {notification}
Note over RP,SSE2: Redis Pub/Sub Distribution
RP->>SSE1: Message for user123
RP->>SSE2: Message for user456
Note over SSE1,U2: SSE Delivery
SSE1->>SSE1: Check local connections
SSE1->>U1: Write SSE event
SSE2->>SSE2: Check local connections
SSE2->>U2: Write SSE event
Consider an external worker that updates a project's status (e.g., from 'active' to 'completed'). This worker publishes the event to the durable message bus (e.g., Azure Service Bus, Kafka, or SQS) for reliable delivery.
- Event Publishing: The worker sends a 'project.state_changed' event to the bus, including details like projectId, oldState, newState.
- Notification Worker Processing: The worker consumes the event, queries Redis for subscribers (users with the project in alerts).
- SSE Delivery: For each subscriber, publish to their user-specific Redis pub/sub channel. SSE servers subscribed to these channels deliver the event to connected clients.
- Redis Role: Used only for fast subscriber lookup and pub/sub coordination—not for durable storage. If Redis is down, SSE falls back to polling, but the bus ensures the event isn't lost.
This ensures reliable notification even if SSE servers are temporarily unavailable, while Redis handles efficient real-time routing.
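For illustration, the event placed on the bus might carry a payload along these lines; the projectId, oldState, and newState fields come from the description above, while the remaining field names are assumptions aligned with the worker code later in this section:
// Illustrative 'project.state_changed' payload published by the external worker
const stateChangedEvent = {
  id: 'evt_8f3c2a',                     // assumed event identifier
  type: 'project.state_changed',
  entityType: 'project',
  entityId: 'PRJ-001',
  projectId: 'PRJ-001',
  oldState: 'active',
  newState: 'completed',
  timestamp: new Date().toISOString(),
  metadata: { name: 'Alpha Project' }
};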
IMPORTANT: The NotificationWorker runs as a separate process independent from the API servers. It:
- Consumes events from the message bus (Kafka/Azure Service Bus)
- Processes events to determine recipients (via external service calls, not direct repository access)
- Publishes notifications to Redis Pub/Sub channels
- Does NOT run inside the API process
// notification-worker.js - Runs as separate process
const createNotificationWorker = ({ cacheService, pubSubService, messageBusConsumer, logger = console }) => {
class NotificationWorker {
constructor() {
this.serverId = `worker-${process.pid}`;
this.cache = cacheService;
this.pubSub = pubSubService;
this.consumer = messageBusConsumer;
this.logger = logger;
// Metrics
this.metrics = {
eventsProcessed: 0,
notificationsSent: 0,
errors: 0,
lastProcessedAt: null
};
}
async start() {
// Subscribe to entity events from message bus
// These are published by message-producer.js when events occur
await this.consumer.subscribe({
topics: [
'entity.state_changed',
'entity.updated',
'system.broadcast'
],
fromBeginning: false
});
await this.consumer.run({
eachMessage: async ({ topic, message }) => {
try {
const event = JSON.parse(message.value);
await this.handleEvent(topic, event);
this.metrics.eventsProcessed++;
} catch (error) {
this.logger.error(`Error processing event: ${error.message}`, { topic, error });
this.metrics.errors++;
}
}
});
// Log metrics periodically
this.metricsInterval = setInterval(
() => this.logMetrics(),
30000 // Every 30 seconds
);
this.logger.info(`Notification worker ${this.serverId} started`);
}
async handleEvent(topic, event) {
try {
this.metrics.lastProcessedAt = new Date();
this.logger.debug(`Processing event: ${topic}`, { eventId: event.id, entityType: event.entityType });
switch (topic) {
case 'entity.state_changed':
await this.handleEntityStateChanged(event);
break;
case 'entity.updated':
await this.handleEntityUpdated(event);
break;
case 'system.broadcast':
await this.handleSystemBroadcast(event);
break;
default:
this.logger.warn(`Unknown event topic: ${topic}`);
}
} catch (error) {
this.logger.error('Error processing event:', error);
this.metrics.errors++;
// Could implement retry logic or dead letter queue here
await this.handleFailedEvent(event, error);
}
}
async handleEntityStateChanged(event) {
const { entityType, entityId, oldState, newState, timestamp, metadata } = event;
const channelId = `entity:${entityType}:${entityId}`;
// Get all users subscribed to this entity channel
const subscribedUserIds = await this.cache.getSetMembers(
CacheKeys.CHANNEL_SUBSCRIBERS,
channelId
);
if (!subscribedUserIds || subscribedUserIds.length === 0) {
this.logger.debug(`No subscribers for entity state change: ${channelId}`);
return;
}
this.logger.info(`Found ${subscribedUserIds.length} subscribers for ${channelId}`);
// Prepare notification
const notification = {
event: 'entity_state_changed',
data: {
id: `notif_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
entityType,
entityId,
entityName: metadata?.name || entityId,
stateTransition: {
from: oldState,
to: newState
},
timestamp,
eventId: event.id
}
};
// Send to each subscribed user
await this.broadcastToUsers(subscribedUserIds, notification);
}
async handleEntityUpdated(event) {
const { entityType, entityId, changes, timestamp, metadata } = event;
const channelId = `entity:${entityType}:${entityId}`;
// Get all users subscribed to this entity channel
const subscribedUserIds = await this.cache.getSetMembers(
CacheKeys.CHANNEL_SUBSCRIBERS,
channelId
);
if (!subscribedUserIds || subscribedUserIds.length === 0) {
this.logger.debug(`No subscribers for entity update: ${channelId}`);
return;
}
// Prepare notification
const notification = {
event: 'entity_updated',
data: {
id: `notif_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
entityType,
entityId,
entityName: metadata?.name || entityId,
changes,
timestamp,
eventId: event.id
}
};
// Send to each subscribed user
await this.broadcastToUsers(subscribedUserIds, notification);
}
async handleSystemBroadcast(event) {
const { message, severity, timestamp } = event;
// Get all connected users for broadcast
const connectedUsers = await this.cache.getSetMembers(
CacheKeys.GLOBAL_SUBSCRIBERS,
'global'
);
if (!connectedUsers || connectedUsers.length === 0) {
this.logger.debug('No connected users for system broadcast');
return;
}
// Prepare broadcast notification
const notification = {
event: 'system_broadcast',
data: {
id: `broadcast_${Date.now()}`,
message,
severity,
timestamp,
eventId: event.id
}
};
await this.broadcastToUsers(connectedUsers, notification);
}
async broadcastToUsers(userIds, notification) {
// Send notifications via pub/sub to user channels
const publishPromises = userIds.map(userId =>
this.pubSub.publish(`user:${userId}`, notification)
);
try {
await Promise.all(publishPromises);
this.metrics.notificationsSent += userIds.length;
this.logger.info(`Sent ${notification.event} notification to ${userIds.length} users`);
} catch (error) {
this.logger.error('Error broadcasting to users:', error);
throw error;
}
}
async handleFailedEvent(event, error) {
// Log the failure
this.logger.error('Failed to process event', {
eventId: event.id,
eventType: event.type,
error: error.message
});
// Could implement dead letter queue here
// await this.sendToDeadLetterQueue(event, error);
}
logMetrics() {
this.logger.info('Notification Worker Metrics', this.metrics);
}
async shutdown() {
this.logger.info('Shutting down notification worker...');
// Clear intervals
if (this.metricsInterval) {
clearInterval(this.metricsInterval);
}
// Close consumer
if (this.consumer) {
await this.consumer.disconnect();
}
// Close pub/sub
if (this.pubSub) {
await this.pubSub.disconnect();
}
this.logger.info('Notification worker shutdown complete');
}
}
return new NotificationWorker();
};
// Start the worker as a SEPARATE PROCESS
// This would typically be in a separate deployment/container
// Example: node notification-worker.js
// Or in Docker: CMD ["node", "notification-worker.js"]
// Or in PM2: pm2 start notification-worker.js --name "notification-worker"
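// Note: with the factory version above, the cacheService, pubSubService and
// messageBusConsumer dependencies would be constructed from the configuration
// below and injected via createNotificationWorker(); the direct constructor
// call that follows is an abbreviated, illustrative form.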
if (require.main === module) {
const worker = new NotificationWorker({
messageBusProvider: process.env.MESSAGE_BUS_PROVIDER || 'azure',
messageBusConfig: {
connectionString: process.env.AZURE_SERVICE_BUS_CONNECTION_STRING,
// or for AWS:
// region: process.env.AWS_REGION,
// queueUrls: { ... },
// topicArns: { ... }
},
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379
},
mongoUri: process.env.MONGODB_URI,
dbName: process.env.DB_NAME || 'myapp'
});
worker.start().catch(console.error);
// Graceful shutdown
process.on('SIGINT', async () => {
console.log('Received SIGINT, shutting down gracefully...');
await worker.stop();
process.exit(0);
});
}
module.exports = NotificationWorker;
The SSE implementation uses core technologies focused on real-time event streaming infrastructure:
The SSE service is implemented using the Module Factory pattern as detailed in section 6.2.3. Key technologies:
- Node.js - Runtime environment
- Redis - PubSub coordination and caching (ElastiCache, Azure Cache, Memorystore)
- Express.js - HTTP server framework
- ioredis - Redis client library
- Module Factory Pattern - Dependency injection
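As a hedged illustration of the dependency-injection wiring the route module below relies on, an API server could build the services via their factories at startup and expose the SSE service on app.locals; the createSSEService name and the module paths are assumptions consistent with the factory pattern referenced above:
// app.js (illustrative wiring; createSSEService and the module paths are assumed)
const express = require('express');
const winston = require('winston');
const createPubSubService = require('./services/pubsub-service'); // factory from section 6.4
const createSSEService = require('./services/sse-service');       // assumed SSE service factory

const app = express();
const logger = winston.createLogger({ transports: [new winston.transports.Console()] });

async function bootstrap() {
  const pubSubService = createPubSubService({
    provider: 'redis',
    connectionOptions: { host: process.env.REDIS_HOST, port: 6379 },
    logger
  });
  await pubSubService.connect();

  // Expose the factory-created SSE service to the routes via app.locals
  app.locals.sseService = createSSEService({ pubSubService, logger });

  app.use('/sse', require('./routes/sse'));
  app.listen(process.env.PORT || 3000, () => logger.info('API server started'));
}

bootstrap().catch((err) => {
  logger.error(err);
  process.exit(1);
});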
// routes/sse.js
const express = require('express');
const router = express.Router();
// SSE endpoint - uses injected SSE service
router.get('/connect', authenticateMiddleware, async (req, res) => {
const sseService = req.app.locals.sseService;
await sseService.handleSSEConnection(req, res);
});
// Health check for SSE service
router.get('/health', (req, res) => {
const sseService = req.app.locals.sseService;
res.json({
available: sseService.isAvailable(),
connections: sseService.getConnectionCount(),
serverId: sseService.serverId
});
});
module.exports = router;
# nginx.conf - No session affinity required!
upstream nodejs_cluster {
least_conn; # or round_robin, ip_hash NOT needed
server node1:3000 max_fails=3 fail_timeout=30s;
server node2:3000 max_fails=3 fail_timeout=30s;
server node3:3000 max_fails=3 fail_timeout=30s;
keepalive 64;
}
server {
listen 80;
# Regular API endpoints
location /api {
proxy_pass http://nodejs_cluster;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_buffering on;
# Normal timeouts for regular APIs
proxy_connect_timeout 5s;
proxy_send_timeout 10s;
proxy_read_timeout 10s;
}
# SSE endpoint - no session affinity needed!
location /sse {
proxy_pass http://nodejs_cluster;
# CRITICAL SSE Settings
proxy_buffering off; # Don't buffer SSE streams
proxy_cache off; # Don't cache SSE
proxy_http_version 1.1;
proxy_set_header Connection '';
# Long timeout for persistent connections
proxy_read_timeout 3600s; # 1 hour
# X-Accel-Buffering is a response header, so it is set by the Node app on the
# SSE response ('X-Accel-Buffering: no'); proxy_buffering off above already
# disables buffering at this layer
# Headers
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $host;
# CORS if needed (browsers reject '*' when credentials are sent, so echo the request origin)
add_header 'Access-Control-Allow-Origin' '$http_origin' always;
add_header 'Access-Control-Allow-Credentials' 'true' always;
# No need for ip_hash or session affinity!
# Redis Pub/Sub handles the distribution
}
}
// sse-client.js - Generic SSE client
class SSEClient {
constructor(config = {}) {
this.config = {
url: '/sse/connect',
reconnectDelay: 1000,
maxReconnectDelay: 30000,
maxReconnectAttempts: null,
onConnect: () => {},
onMessage: () => {},
onError: () => {},
onDisconnect: () => {},
...config
};
this.eventSource = null;
this.connected = false;
this.reconnectAttempts = 0;
this.reconnectTimer = null;
this.eventHandlers = new Map();
}
async connect() {
try {
console.log('Connecting to SSE...');
const url = new URL(this.config.url, window.location.origin);
this.eventSource = new EventSource(url, {
withCredentials: true
});
this.setupEventHandlers();
} catch (error) {
console.error('Failed to connect:', error);
this.scheduleReconnect();
}
}
setupEventHandlers() {
this.eventSource.onopen = () => {
console.log('SSE Connected');
this.connected = true;
this.reconnectAttempts = 0;
this.config.onConnect();
};
this.eventSource.onerror = (error) => {
console.error('SSE Error:', error);
this.connected = false;
this.config.onError(error);
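// Note: the browser EventSource API does not expose HTTP status codes on error
// events, so the status check below is illustrative; in practice auth failures
// surface as a closed connection and are confirmed via a separate API call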
if (error.status === 403 || error.status === 401) {
console.log('Authentication error, closing connection');
this.eventSource.close();
return;
}
if (this.eventSource.readyState === EventSource.CLOSED) {
this.scheduleReconnect();
}
};
this.eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
console.log('SSE message received:', data);
this.config.onMessage(data);
} catch (error) {
console.error('Error parsing SSE message:', error);
}
};
this.eventSource.addEventListener('connected', (event) => {
const data = JSON.parse(event.data);
console.log('Connected to SSE server:', data.serverId);
});
}
on(eventType, handler) {
this.eventHandlers.set(eventType, handler);
if (this.eventSource) {
this.eventSource.addEventListener(eventType, (event) => {
try {
const data = JSON.parse(event.data);
handler(data, event);
} catch (error) {
console.error(`Error handling ${eventType} event:`, error);
}
});
}
}
off(eventType) {
this.eventHandlers.delete(eventType);
}
scheduleReconnect() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
if (this.config.maxReconnectAttempts &&
this.reconnectAttempts >= this.config.maxReconnectAttempts) {
console.log('Max reconnection attempts reached');
this.config.onError(new Error('Max reconnection attempts reached'));
return;
}
const delay = Math.min(
this.config.reconnectDelay * Math.pow(2, this.reconnectAttempts),
this.config.maxReconnectDelay
);
console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts + 1})`);
this.reconnectTimer = setTimeout(() => {
this.reconnectAttempts++;
this.connect();
}, delay);
}
disconnect() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
this.connected = false;
this.config.onDisconnect();
}
getState() {
return {
connected: this.connected,
readyState: this.eventSource?.readyState,
reconnectAttempts: this.reconnectAttempts
};
}
}
// Usage example:
const sseClient = new SSEClient({
url: '/sse/connect',
onConnect: () => console.log('SSE connected'),
onError: (error) => console.error('SSE error:', error),
onMessage: (data) => console.log('SSE message:', data)
});
// Register custom event handlers
sseClient.on('entity_updated', (data) => {
console.log('Entity updated:', data);
// Handle entity update in UI
});
sseClient.on('system_broadcast', (data) => {
console.log('System broadcast:', data);
// Show system announcement
});
// Connect to SSE
sseClient.connect();
SSE infrastructure must handle:
-
Redis Memory Usage:
- Per channel: ~50 bytes + (user_id_size × subscriber_count)
- 10,000 channels × 100 avg subscribers = ~20-50MB
- Connection metadata: ~5KB per connection × 10,000 = ~50MB
-
Event Processing:
- Channel lookup: O(1) Redis set membership check
- Notification fanout: O(n) where n = subscribers
- Target: <100ms processing per event
-
SSE Connection Overhead:
- Per connection: ~5-20KB memory
- 10,000 concurrent users = ~50-200MB RAM per server
// Batch notification processing for efficiency
class BatchNotificationProcessor {
constructor(pubSubService) {
this.pubSub = pubSubService;
this.queue = [];
this.batchSize = 100;
this.batchInterval = 100; // ms
setInterval(() => this.processBatch(), this.batchInterval);
}
addNotification(userId, notification) {
this.queue.push({ userId, notification });
if (this.queue.length >= this.batchSize) {
this.processBatch();
}
}
async processBatch() {
if (this.queue.length === 0) return;
const batch = this.queue.splice(0, this.batchSize);
// Group by user for efficient publishing
const byUser = batch.reduce((acc, item) => {
if (!acc[item.userId]) acc[item.userId] = [];
acc[item.userId].push(item.notification);
return acc;
}, {});
// Batch publish to user channels
const publishPromises = Object.entries(byUser).map(([userId, notifications]) =>
this.pubSub.publish(`user:${userId}`, {
event: 'batch_notifications',
data: { notifications }
})
);
await Promise.all(publishPromises);
}
}
Why Consider Mercure:
- Purpose-built for SSE
- Handles all complexity
- 100K+ connections per instance
- JWT-based authorization
Architecture with Mercure:
graph LR
subgraph "Mercure Architecture"
C[Clients] --> M[Mercure Hub]
API1[NodeJS] --> P[Publish]
API2[NodeJS] --> P
API3[NodeJS] --> P
P --> M
end
Implementation:
// Publishing from NodeJS
const publishToMercure = async (userId, data) => {
const jwt = generateJWT({mercure: {publish: ['*']}});
await fetch('http://mercure/.well-known/mercure', {
method: 'POST',
headers: {
'Authorization': `Bearer ${jwt}`,
'Content-Type': 'application/x-www-form-urlencoded',
},
body: new URLSearchParams({
'topic': `/users/${userId}`,
'data': JSON.stringify(data),
'private': 'on'
})
});
};
Centrifugo Features:
- WebSocket + SSE support
- Built-in presence
- Message history
- Channel-based messaging
For extreme scale (500K+ connections):
// Goroutines use only ~2KB each
func handleSSE(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
flusher := w.(http.Flusher)
for {
select {
case msg := <-messageChan:
fmt.Fprintf(w, "data: %s\n\n", msg)
flusher.Flush()
case <-r.Context().Done():
return
}
}
}
| Solution | Max Connections | Memory/Connection | CPU Usage | Latency |
|---|---|---|---|---|
| NodeJS + Redis | 10-50K | 50KB | Medium | 2-5ms |
| Mercure | 100K+ | 5KB | Low | 2-3ms |
| Centrifugo | 100K+ | 10KB | Low | 2-3ms |
| Custom Go | 500K+ | 2KB | Very Low | 1-2ms |
| Nginx nchan | 1M+ | 1KB | Minimal | <1ms |
| Solution | Development | Infrastructure | Maintenance | Total TCO |
|---|---|---|---|---|
| NodeJS + Redis | $12,000 | $0 | $7,200 | $19,200 |
| Mercure | $4,000 | $3,600 | $1,800 | $9,400 |
| Centrifugo | $5,000 | $7,200 | $2,880 | $15,080 |
| Custom Go | $20,000 | $10,800 | $5,400 | $36,200 |
// Prometheus metrics for monitoring
const promClient = require('prom-client');
const metrics = {
// Worker metrics
eventsProcessed: new promClient.Counter({
name: 'notification_events_processed_total',
help: 'Total events processed',
labelNames: ['event_type', 'worker_id']
}),
notificationsSent: new promClient.Counter({
name: 'notifications_sent_total',
help: 'Total notifications sent',
labelNames: ['notification_type', 'delivery_method']
}),
notificationLatency: new promClient.Histogram({
name: 'notification_latency_seconds',
help: 'Time from event to notification delivery',
buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5]
}),
subscriptionCount: new promClient.Gauge({
name: 'active_subscriptions_total',
help: 'Total active project subscriptions',
labelNames: ['event_type']
}),
// SSE metrics
activeConnections: new promClient.Gauge({
name: 'sse_active_connections',
help: 'Current SSE connections',
labelNames: ['server_id']
}),
sseMessagesSent: new promClient.Counter({
name: 'sse_messages_sent_total',
help: 'Total SSE messages sent',
labelNames: ['server_id', 'message_type']
}),
// Redis Pub/Sub Metrics
redisPubSubMessages: new promClient.Counter({
name: 'redis_pubsub_messages_total',
help: 'Total Redis pub/sub messages',
labelNames: ['direction', 'channel_pattern']
})
};
groups:
- name: notification_system
rules:
- alert: HighNotificationLatency
expr: histogram_quantile(0.95, rate(notification_latency_seconds_bucket[5m])) > 1
for: 5m
annotations:
summary: "High notification latency detected"
- alert: WorkerBacklog
expr: azure_servicebus_queue_length{queue="project-events"} > 1000
for: 10m
annotations:
summary: "Event processing backlog growing"
- alert: LowSSEConnections
expr: sum(sse_active_connections) < 10
for: 5m
annotations:
summary: "Abnormally low SSE connections"✅ Pros:
- Designed for SSE
- No session affinity needed
- Minimal code changes
- Battle-tested at scale
- JWT integration
❌ Cons:
- Additional service to deploy
- Slight latency increase (1-2ms)
NodeJS + Redis:
✅ When to Choose:
- Cannot deploy additional services
- Team prefers JavaScript-only
- Under 50K concurrent connections
❌ Limitations:
- Higher memory usage
- Complex reconnection logic
- More code to maintain
Phase 1: Proof of Concept (Week 1-2)
- Implement NodeJS + Redis locally
- Test with 100 connections
- Deploy notification worker
- Setup Redis subscription cache
- Implement basic SSE service
Phase 2: Scale Testing (Week 3)
- Load test with 10K connections
- Monitor memory and CPU
- Identify bottlenecks
- Handle solution_added events
- Handle state_changed events
- Implement notification queuing
Phase 3: Production Decision (Week 4)
- If NodeJS handles load → Deploy
- If not → Switch to Mercure
- Build notification client library
- Add subscription management UI
- Implement notification display
Phase 4: Production Deployment (Week 5)
- Deploy with monitoring
- Implement gradual rollout
- Set up alerts
- Add batching for high-volume events
- Implement subscription sync optimisation
- Add comprehensive monitoring
| Factor | Your Requirement | NodeJS + Redis | Mercure | Centrifugo | Go | nchan |
|---|---|---|---|---|---|---|
| No Session Affinity | ✅ Required | ✅ Works | ✅ Works | ✅ Works | ✅ Works | ✅ Works |
| 10K+ Connections | ✅ Required | ✅ Easy | ✅ Easy | ✅ Easy | ✅ Easy | |
| Existing Stack | NodeJS/Redis | ✅ Native | ➕ Add Service | ➕ Add Service | ❌ New Lang | ➕ Nginx Module |
| Complexity | Low preferred | ✅ Low | ✅ Low | ❌ High | | |
| Cost | Minimize | ✅ Lowest | ✅ Low | ❌ High | ✅ Low | |
| Time to Market | Fast | ✅ 1 week | ✅ 1 week | ❌ 4-6 weeks | | |
Start with NodeJS + Redis for immediate implementation, but plan for Mercure if you expect growth beyond 50K concurrent connections.
The NodeJS + Redis solution works perfectly without session affinity using the Redis Pub/Sub pattern shown in the swim lane diagram. Each server maintains only its local connections, and Redis ensures messages reach the right server.
Based on the architectural requirements:
Architecture: Worker-based processing with Redis Pub/Sub for non-sticky session SSE delivery
Components:
- Message Bus Facade - Abstraction over Azure Service Bus/AWS SQS/Redis
- Notification Worker - Processes events and determines recipients
- Redis Pub/Sub - Distributes notifications across SSE servers
- NodeJS SSE Services - Multiple instances without session affinity
- MongoDB - Source of truth for subscriptions
- No Session Affinity Required: Redis Pub/Sub ensures notifications reach users regardless of which server they're connected to
- Scalable: Can add more SSE servers or workers independently
- Flexible: Message bus facade allows switching between Azure/AWS/Redis
- Reliable: Worker pattern with message acknowledgment ensures delivery
- Maintainable: Clear separation of concerns
- Users receive real-time updates for projects they care about
- Scalable to thousands of projects with selective subscriptions
- Scales horizontally without sticky session complexity
- Message bus abstraction provides vendor flexibility
- Worker pattern ensures reliable event processing
- Redis Pub/Sub provides efficient cross-server coordination
- Decoupled from main application flow via worker pattern
- Reliable delivery with queueing for offline users
- Additional infrastructure complexity (workers, Redis cache)
- Additional Redis dependency for pub/sub
- Subscription management overhead
- Need to maintain subscription cache consistency
- Network overhead of Redis pub/sub
- Need to monitor multiple components
- Potential notification storms during mass events
- Redis Sentinel/Cluster for high availability
- Periodic subscription sync from MongoDB
- Connection pooling and batching for efficiency
- Comprehensive monitoring and alerting
- Rate limiting per user/project
- Notification batching for rapid events
- Circuit breakers for downstream failures
- Graceful degradation to email/digest notifications
The system uses a dual-token authentication strategy with secure HTTP-only cookies:
-
Access Token
- Short-lived JWT (expires quickly - minutes)
- HTTP-only secure cookie (not accessible via JavaScript)
- Used for all API requests including SSE connection establishment
- Contains user claims and permissions
- Must be refreshed frequently using refresh token
-
Refresh Token
- Long-lived JWT (extended expiry)
- HTTP-only secure cookie
- JTI (JWT ID) links to Redis-cached device fingerprint
- Used only for obtaining new access tokens
- Validated against device fingerprint for security
// axios interceptor for token refresh
axios.interceptors.response.use(
response => response,
async error => {
if (error.response?.status === 403) {
// Access token expired - refresh it
await refreshAccessToken();
// Retry original request
return axios(error.config);
}
return Promise.reject(error);
}
);
// Client-side SSE connection with cookie-based auth
class ProjectNotificationClient {
async connect() {
// Cookies (access token) automatically sent with request
const eventSource = new EventSource('/sse/connect', {
withCredentials: true // Include cookies
});
eventSource.onerror = (error) => {
// If the access token has expired the connection fails; note EventSource does
// not expose the HTTP status directly, so this check is illustrative
if (error.status === 403) {
// Close current connection
eventSource.close();
// Wait for Axios interceptor to refresh token
setTimeout(() => {
this.connect(); // Reconnect with new token
}, 1000);
}
};
}
}
// SSE endpoint with JWT validation
app.get('/sse/connect',
authenticateMiddleware, // Validates access token from cookie
async (req, res) => {
// Access token already validated by middleware
const userId = req.user.id;
// Establish SSE connection
sseService.handleSSEConnection(req, res);
}
);
// Authentication middleware
const authenticateMiddleware = (req, res, next) => {
const accessToken = req.cookies.accessToken;
if (!accessToken) {
return res.status(401).json({ error: 'No access token' });
}
try {
const decoded = jwt.verify(accessToken, ACCESS_TOKEN_SECRET);
req.user = decoded;
next();
} catch (error) {
if (error.name === 'TokenExpiredError') {
return res.status(403).json({ error: 'Access token expired' });
}
return res.status(401).json({ error: 'Invalid token' });
}
};
CRITICAL ISSUE: SSE connections can live for hours, but access tokens expire in minutes. This creates a security challenge.
Timeline:
0 min: User connects to SSE (access token valid)
5 min: Access token expires in cookie
10 min: SSE connection still open (using expired token!)
15 min: Security vulnerability - connection outlives authorization
class SSEConnectionManager {
constructor(options = {}) {
// Connection TTL matches access token expiry
this.maxConnectionAge = options.accessTokenTTL || 300000; // 5 minutes
this.gracePeriod = 30000; // 30 second grace period for refresh
}
addConnection(connectionId, userId, req, res) {
const connection = {
id: connectionId,
userId,
request: req,
response: res,
createdAt: Date.now(),
lastActivity: Date.now(),
tokenExpiry: Date.now() + this.maxConnectionAge,
// Store token metadata (not the token itself)
tokenMetadata: {
issuedAt: req.user.iat * 1000,
expiresAt: req.user.exp * 1000,
sessionId: req.user.sessionId
}
};
// Schedule connection close before token expires
setTimeout(() => {
this.closeConnectionForTokenExpiry(connectionId);
}, this.maxConnectionAge - this.gracePeriod);
this.connections.set(connectionId, connection);
}
closeConnectionForTokenExpiry(connectionId) {
const conn = this.connections.get(connectionId);
if (!conn) return;
// Send token expiry event to trigger client reconnect
try {
if (conn.response && !conn.response.writableEnded) {
conn.response.write(`event: token_expiring\ndata: ${JSON.stringify({
reason: 'Access token expiring',
reconnectIn: this.gracePeriod,
timestamp: Date.now()
})}\n\n`);
// Give client time to reconnect
setTimeout(() => {
this.closeConnection(connectionId, 'Token expired');
}, this.gracePeriod);
}
} catch (e) {
this.closeConnection(connectionId, 'Token expired - connection dead');
}
}
}
class SSEConnectionManager {
constructor(options = {}) {
this.sessionCheckInterval = 60000; // Check every minute
this.cacheService = options.cacheService;
this.startSessionValidation();
}
startSessionValidation() {
setInterval(async () => {
await this.validateAllSessions();
}, this.sessionCheckInterval);
}
async validateAllSessions() {
for (const [connId, conn] of this.connections) {
// Check if user session is still valid in Redis
const sessionValid = await this.cacheService.get(
CacheKeys.USER_SESSION,
conn.tokenMetadata.sessionId
);
if (!sessionValid) {
// Session invalidated (logout, token refresh failed, etc.)
this.closeConnection(connId, 'Session invalidated');
continue;
}
// Check if refresh token is still valid
const refreshTokenValid = await this.cacheService.get(
CacheKeys.REFRESH_TOKEN,
conn.userId
);
if (!refreshTokenValid) {
// User's refresh token expired or revoked
this.closeConnection(connId, 'Authentication expired');
}
}
}
}
// Client-side handling
class ProjectNotificationClient {
constructor() {
this.tokenRefreshTimer = null;
this.accessTokenTTL = 5 * 60 * 1000; // 5 minutes
}
async connect() {
// Clear any existing timer
if (this.tokenRefreshTimer) {
clearTimeout(this.tokenRefreshTimer);
}
// Schedule reconnect before token expires
this.tokenRefreshTimer = setTimeout(() => {
this.handleTokenExpiry();
}, this.accessTokenTTL - 30000); // 30 seconds before expiry
// Create SSE connection
this.eventSource = new EventSource('/sse/connect', {
withCredentials: true
});
this.setupEventHandlers();
}
async handleTokenExpiry() {
console.log('Access token expiring, refreshing and reconnecting...');
// Close current connection
if (this.eventSource) {
this.eventSource.close();
}
// Trigger token refresh via any API call
try {
await axios.get('/api/auth/ping'); // This triggers refresh
// Reconnect with new token
this.connect();
} catch (error) {
console.error('Token refresh failed:', error);
// Let user know they need to re-authenticate
}
}
setupEventHandlers() {
// Handle server-initiated token expiry
this.eventSource.addEventListener('token_expiring', (event) => {
const data = JSON.parse(event.data);
console.log('Server: token expiring', data);
// Reconnect flow
this.handleTokenExpiry();
});
// Handle forced disconnection
this.eventSource.addEventListener('auth_expired', (event) => {
console.log('Server: authentication expired');
this.eventSource.close();
// Trigger re-authentication flow
window.location.href = '/login';
});
}
}
Use Approach 1 (Connection Lifetime = Token Lifetime) with Client Reconnect:
-
Server Side:
- Force connection close at token expiry time
- Send warning event 30 seconds before closing
- Clean disconnect with clear reason
-
Client Side:
- Listen for token_expiring event
- Proactively reconnect before forced disconnect
- Handle reconnection with exponential backoff
-
Security Benefits:
- No connections outlive their authorization
- Clean audit trail of connection lifecycle
- Prevents zombie authenticated connections
-
User Experience:
- Seamless reconnection (user doesn't notice)
- No lost events (reconnect is planned)
- Clear error messages if auth fails
1. Initial Connection (T+0):
- Client connects with valid access token (5 min TTL)
- Server validates token, establishes SSE
- Server schedules disconnect at T+4:30
2. Normal Operations (T+0 to T+4:30):
- Events flow normally
- Access token in cookie expires at T+5:00
- But SSE connection still authenticated
3. Pre-Expiry Warning (T+4:30):
- Server sends "token_expiring" event
- Client receives warning, starts reconnect
4. Client Reconnect (T+4:35):
- Client closes old SSE connection
- Client makes API call (triggers token refresh)
- Axios interceptor refreshes access token
- Client establishes new SSE connection
5. Server Cleanup (T+5:00):
- If client didn't reconnect, server force-closes
- Connection removed from all tracking
- Clean security boundary maintained
6. Continuous Cycle:
- Process repeats every 5 minutes
- User never notices disconnection
- Security always maintained
// Server configuration
const sseManager = new SSEConnectionManager({
maxConnections: 5000,
maxConnectionAge: 5 * 60 * 1000, // Match access token TTL
gracePeriod: 30 * 1000, // 30 second warning
staleTimeout: 2 * 60 * 1000, // 2 min inactivity
maxUserConnections: 5,
accessTokenTTL: process.env.ACCESS_TOKEN_TTL || 300000,
cacheService: cacheService
});
// Client configuration
const notificationClient = new ProjectNotificationClient({
url: '/sse/connect',
accessTokenTTL: 5 * 60 * 1000, // Must match server
reconnectDelay: 1000,
maxReconnectAttempts: 5,
onTokenExpiring: async () => {
// Custom handling if needed
await refreshMyTokens();
}
});

This approach ensures:
- Security: No connection outlives its authorization
- Reliability: Planned reconnects, not unexpected drops
- Performance: Connection pooling within token lifetime
- User Experience: Transparent to end user
// SSE Service with periodic auth validation
class ProjectSSEService {
async validateConnection(connection) {
// Optionally validate connection is still authorized
// Could check Redis for user session validity
const sessionValid = await this.cache.get(
CacheKeys.USER_SESSION,
connection.userId
);
if (!sessionValid) {
// Send disconnect event
this.sendEvent(connection, {
type: 'auth_expired',
data: { message: 'Authentication expired, please reconnect' }
});
// Close connection
connection.response.end();
this.handleDisconnect(connection.id, connection.userId);
}
}
}

- No tokens in URLs - Tokens only in secure HTTP-only cookies
- Automatic token refresh - Axios interceptor handles 403 responses
- Device fingerprinting - Refresh tokens validated against device
- Redis session cache - Quick validation without database hits
- Channel isolation - User-specific Redis pub/sub channels
- Rate limiting - Applied to SSE endpoints like all APIs
- HTTPS only - All connections over TLS
- CORS configuration - Properly configured for SSE endpoints
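The snippet below is an illustrative Express sketch of how the CORS, rate-limiting, and cookie-based authentication practices above could be applied to the SSE endpoint; the cors and express-rate-limit packages, the authenticateCookie middleware, and the sseManager.register hook are assumptions rather than parts of this ADR:

// Sketch: wiring the security practices onto the SSE route (assumed packages and middleware)
const express = require('express');
const cors = require('cors');
const rateLimit = require('express-rate-limit');

const app = express();

// CORS scoped to the SSE routes; credentials are required for HTTP-only cookies
app.use('/sse', cors({
  origin: process.env.ALLOWED_ORIGIN,
  credentials: true
}));

// Apply the same rate limiting policy as other API routes
app.use('/sse', rateLimit({ windowMs: 60 * 1000, max: 30 }));

// authenticateCookie is a hypothetical middleware that validates the HTTP-only cookie
app.get('/sse/connect', authenticateCookie, (req, res) => {
  res.set({
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    Connection: 'keep-alive'
  });
  res.flushHeaders();
  sseManager.register(req.user.id, res); // hypothetical registration into the connection manager
});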
The implementation uses various Redis operations beyond simple get/set:
- Basic Operations: SET, GET, DEL, EXPIRE - Connection details and temporary data
- Hash Operations: HSET, HGET, HDEL, HKEYS, HLEN - Server-user connection mappings
- Set Operations: SADD, SMEMBERS, SREM - Project subscriber lists
- Sorted Set Operations: ZADD, ZRANGE, ZREMRANGEBYRANK - Pending notification queues with timestamps
- Pub/Sub: PUBLISH, SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE - Cross-server message distribution
- Pipeline: Batch operations for efficient bulk updates during sync
- Connection Registry (Hashes) - Track which users are connected to which servers
- Subscription Management (Sets & Hashes) - Map projects to users and vice versa
- Notification Queue (Sorted Sets) - Store time-ordered pending notifications
- Cross-Server Communication (Pub/Sub) - Enable SSE without session affinity
- Performance Optimization (Pipeline) - Reduce network round trips for bulk operations
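A short sketch of the corresponding ioredis calls for these structures; the project_subs:* and sse_connections:* prefixes follow the conventions used elsewhere in this document, while pending_notifications:* is an assumed name:

// Illustrative Redis data-structure usage (fragment, assumes an ioredis client)
const Redis = require('ioredis');
const redis = new Redis(process.env.REDIS_URL);

// Connection Registry (Hash): which users are connected to this server
await redis.hset(`sse_connections:${serverId}`, userId, connectionId);

// Subscription Management (Set): users subscribed to a project/event type
await redis.sadd(`project_subs:${projectId}:${eventType}`, userId);

// Notification Queue (Sorted Set): time-ordered pending notifications per user
await redis.zadd(`pending_notifications:${userId}`, Date.now(), JSON.stringify(notification));

// Pipeline: batch the bulk updates performed during the periodic sync
const pipeline = redis.pipeline();
for (const sub of subscriptions) {
  pipeline.sadd(`project_subs:${sub.projectId}:${sub.eventType}`, sub.userId);
}
await pipeline.exec();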
A Redis facade abstraction is recommended for this architecture to provide:
class CacheFacade {
// Connection Management
async setConnection(serverId, userId, connectionId)
async removeConnection(serverId, userId)
async getServerConnections(serverId)
// Subscription Management
async addSubscription(projectId, eventType, userId)
async removeSubscription(projectId, eventType, userId)
async getSubscribers(projectId, eventType)
async getUserSubscriptions(userId)
// Notification Queue
async queueNotification(userId, notification)
async getPendingNotifications(userId)
async clearPendingNotifications(userId)
// Batching
async batchOperations(operations)
}

Benefits of Facade:
- Swap Redis for alternatives (Memcached, Hazelcast, DynamoDB)
- Integrated circuit breaker and failover logic
- Simplified testing with mock implementations
- Centralized monitoring and metrics
- Abstracted connection pooling
Note: Pub/Sub operations should remain separate as they're handled by the existing MessageBusFacade.
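One benefit listed above is simplified testing; a minimal in-memory implementation of the same facade interface (hypothetical, for unit tests or local development) illustrates how the backend can be swapped without touching callers:

// Hypothetical in-memory CacheFacade implementation for tests
class InMemoryCacheFacade {
  constructor() {
    this.connections = new Map();   // serverId -> Map(userId -> connectionId)
    this.subscriptions = new Map(); // `${projectId}:${eventType}` -> Set(userId)
  }

  async setConnection(serverId, userId, connectionId) {
    if (!this.connections.has(serverId)) this.connections.set(serverId, new Map());
    this.connections.get(serverId).set(userId, connectionId);
  }

  async getServerConnections(serverId) {
    return Object.fromEntries(this.connections.get(serverId) || []);
  }

  async addSubscription(projectId, eventType, userId) {
    const key = `${projectId}:${eventType}`;
    if (!this.subscriptions.has(key)) this.subscriptions.set(key, new Set());
    this.subscriptions.get(key).add(userId);
  }

  async getSubscribers(projectId, eventType) {
    return [...(this.subscriptions.get(`${projectId}:${eventType}`) || [])];
  }
}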
Important: Redis is used as a cache, not a source of truth. MongoDB stores all persistent data.
When Redis restarts, all in-memory data is lost:
- Active Connections (sse_connections:*) - Automatically recreated when clients reconnect
- Subscription Cache (project_subs:*, user_subs:*) - Rebuilt from MongoDB within 60 seconds
- Pending Notifications - Lost (acceptable for non-critical notifications)
- Pub/Sub Subscriptions - Re-established automatically by SSE services
The Notification Worker already handles Redis recovery through periodic sync:
// Worker syncs subscriptions from MongoDB every 60 seconds
this.syncInterval = setInterval(() => this.syncSubscriptions(), 60000);
async syncSubscriptions() {
// Clear old cache
const projectSubKeys = await this.redis.keys('project_subs:*');
if (projectSubKeys.length > 0) {
await this.redis.del(...projectSubKeys);
}
// Rebuild from MongoDB (source of truth)
const users = await this.db.collection('users')
.find({ 'projectSubscriptions.0': { $exists: true } })
.toArray();
// Populate Redis cache
// ... (batch operations to rebuild subscription mappings)
}

To minimise the recovery window:
- Immediate Sync on Redis Connection
this.redis.on('ready', async () => {
await this.syncSubscriptions();
});

- Fallback to MongoDB During Cache Miss
async getSubscribers(projectId, eventType) {
let subscribers = await this.redis.smembers(`project_subs:${projectId}:${eventType}`);
if (subscribers.length === 0 && !await this.redis.exists('cache_sync_timestamp')) {
// Cache invalid, query MongoDB directly
subscribers = await this.queryMongoDBForSubscribers(projectId, eventType);
setImmediate(() => this.syncSubscriptions());
}
return subscribers;
}

- Circuit Breaker with Local Fallback
- When Redis is unavailable, fall back to local in-memory map per API instance
- Automatically reconnect and resync when Redis returns
Key Point: The system remains operational during Redis failures, with temporary degradation in notification routing that self-heals within 60 seconds.
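A hedged sketch of that fallback behaviour, assuming an ioredis client and a resync callback such as the syncSubscriptions() routine shown earlier (class and parameter names here are illustrative):

// Sketch: serve subscriber lookups from a local in-memory copy while Redis is down
class SubscriptionCacheWithFallback {
  constructor(redis, resyncFromMongo) {
    this.redis = redis;
    this.redisHealthy = true;
    this.localSubscribers = new Map(); // `${projectId}:${eventType}` -> Set(userId)

    this.redis.on('error', () => { this.redisHealthy = false; }); // open the circuit
    this.redis.on('ready', async () => {
      this.redisHealthy = true;      // close the circuit
      await resyncFromMongo();       // e.g. the syncSubscriptions() routine shown earlier
    });
  }

  async getSubscribers(projectId, eventType) {
    const key = `${projectId}:${eventType}`;
    if (this.redisHealthy) {
      try {
        const members = await this.redis.smembers(`project_subs:${key}`);
        this.localSubscribers.set(key, new Set(members)); // keep the local copy warm
        return members;
      } catch (err) {
        this.redisHealthy = false; // fall through to degraded mode
      }
    }
    // Degraded mode: last known subscribers for this API instance
    return [...(this.localSubscribers.get(key) || [])];
  }
}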
While not exhaustively covered in this document, the implementation includes:
- Redis circuit breaker - Automatically fails over to local in-memory map per API instance when Redis is unavailable, then flips back to Redis once connectivity is restored
- MongoDB fallback - Query source of truth directly when cache is invalid
- Automatic cache rebuild - Subscriptions resynced from MongoDB every 60 seconds
- Redis connection retry logic with exponential backoff
- Worker failure recovery through message bus acknowledgment patterns
- Client-side automatic reconnection with backoff (a sketch follows at the end of this section)
- Graceful degradation to queued notifications for offline users
- Load testing should target the specific connection limits mentioned (10K+ concurrent)
- Prometheus metrics provided enable comprehensive monitoring
- Health check endpoints allow for orchestrator integration
- Detailed testing strategy is out of scope for this ADR
- Deployment guides, CI/CD pipelines, and operational runbooks will be documented separately
- Container orchestration, scaling policies, and infrastructure as code are not yet documented
- These aspects will be addressed in dedicated deployment and operations documentation
- Periodic sync between MongoDB and Redis subscription cache (every 60 seconds)
- Event-driven cache updates for real-time consistency
- Message bus ensures at-least-once delivery semantics
- MongoDB replica set with automatic failover (infrastructure level, out of scope for this document)
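As referenced above, a minimal sketch of client-side reconnection with exponential backoff; the reconnectDelay and maxReconnectAttempts parameters mirror the client configuration shown earlier, and the class name is illustrative:

// Sketch: exponential backoff reconnect for the SSE client, capped at maxReconnectAttempts
class ReconnectingSSEClient {
  constructor({ url, reconnectDelay = 1000, maxReconnectAttempts = 5 } = {}) {
    this.url = url;
    this.reconnectDelay = reconnectDelay;
    this.maxReconnectAttempts = maxReconnectAttempts;
    this.attempts = 0;
  }

  connect() {
    this.eventSource = new EventSource(this.url, { withCredentials: true });

    this.eventSource.onopen = () => {
      this.attempts = 0; // reset backoff after a successful connection
    };

    this.eventSource.onerror = () => {
      this.eventSource.close();
      if (this.attempts >= this.maxReconnectAttempts) {
        console.error('SSE reconnect attempts exhausted, re-authentication may be required');
        return;
      }
      const delay = this.reconnectDelay * 2 ** this.attempts; // 1s, 2s, 4s, 8s, 16s
      this.attempts += 1;
      setTimeout(() => this.connect(), delay);
    };
  }
}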
SSE without session affinity is completely achievable using Redis Pub/Sub for coordination. The key insight is that the server handling the connection doesn't need to be the server processing the business logic - they communicate through Redis.
For production at scale, consider Mercure or Centrifugo as they handle all the complexity for you, but the NodeJS + Redis solution is production-ready for moderate scales and provides a solid foundation for project-based event notifications.