Last active
December 18, 2025 09:09
-
-
Save gartmeier/6ee0c9694fc77fe58ab6564af75a5c65 to your computer and use it in GitHub Desktop.
Node.js UDP ingress with batching for high-throughput IoT measurements
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // udp-ingress.js | |
| // Node.js UDP ingress with batching for high-throughput IoT measurements | |
| const dgram = require('dgram'); | |
| const http = require('http'); | |
| const cluster = require('cluster'); | |
| const os = require('os'); | |
| // Configuration | |
| const CONFIG = { | |
| udpPort: 5000, | |
| batchSize: 100, // Max events per batch | |
| flushIntervalMs: 50, // Max wait time before sending partial batch | |
| djangoHost: 'localhost', | |
| djangoPort: 8000, | |
| djangoPath: '/api/measurements/batch/', | |
| }; | |
| // HTTP agent with connection pooling | |
| const httpAgent = new http.Agent({ | |
| keepAlive: true, | |
| maxSockets: 50, | |
| maxFreeSockets: 10, | |
| timeout: 60000, | |
| }); | |
| // Batch buffer | |
| let batch = []; | |
| let flushTimer = null; | |
| function flushBatch() { | |
| if (batch.length === 0) return; | |
| const payload = JSON.stringify({ measurements: batch }); | |
| const batchCount = batch.length; | |
| batch = []; | |
| const req = http.request({ | |
| hostname: CONFIG.djangoHost, | |
| port: CONFIG.djangoPort, | |
| path: CONFIG.djangoPath, | |
| method: 'POST', | |
| agent: httpAgent, | |
| headers: { | |
| 'Content-Type': 'application/json', | |
| 'Content-Length': Buffer.byteLength(payload), | |
| }, | |
| }, (res) => { | |
| if (res.statusCode !== 200 && res.statusCode !== 201) { | |
| console.error(`Batch failed: ${res.statusCode} (${batchCount} events)`); | |
| } | |
| res.resume(); // Drain response | |
| }); | |
| req.on('error', (err) => { | |
| console.error(`Request error: ${err.message} (${batchCount} events lost)`); | |
| // TODO: Add to retry queue or dead letter queue | |
| }); | |
| req.write(payload); | |
| req.end(); | |
| } | |
| function addToBatch(measurement) { | |
| batch.push(measurement); | |
| // Flush if batch is full | |
| if (batch.length >= CONFIG.batchSize) { | |
| clearTimeout(flushTimer); | |
| flushBatch(); | |
| return; | |
| } | |
| // Start flush timer if this is first item in batch | |
| if (batch.length === 1) { | |
| flushTimer = setTimeout(flushBatch, CONFIG.flushIntervalMs); | |
| } | |
| } | |
| function parseMeasurement(msg, rinfo) { | |
| try { | |
| // Adjust parsing to your actual UDP message format | |
| const data = JSON.parse(msg.toString()); | |
| return { | |
| vehicle_id: data.vehicle_id, | |
| timestamp: data.timestamp || Date.now(), | |
| d1: data.d1, // Ignition | |
| speed: data.speed, // km/h | |
| received_at: Date.now(), | |
| source_ip: rinfo.address, | |
| }; | |
| } catch (err) { | |
| console.error(`Parse error: ${err.message}`); | |
| return null; | |
| } | |
| } | |
| function startWorker() { | |
| const server = dgram.createSocket({ | |
| type: 'udp4', | |
| reuseAddr: true, | |
| }); | |
| server.on('message', (msg, rinfo) => { | |
| const measurement = parseMeasurement(msg, rinfo); | |
| if (measurement) { | |
| addToBatch(measurement); | |
| } | |
| }); | |
| server.on('error', (err) => { | |
| console.error(`UDP error: ${err.message}`); | |
| server.close(); | |
| }); | |
| server.bind({ | |
| port: CONFIG.udpPort, | |
| exclusive: false, // Allow socket sharing between workers | |
| }); | |
| console.log(`Worker ${process.pid} listening on UDP port ${CONFIG.udpPort}`); | |
| // Flush remaining batch on shutdown | |
| process.on('SIGTERM', () => { | |
| flushBatch(); | |
| server.close(); | |
| }); | |
| } | |
| // Cluster mode: spawn worker per CPU core | |
| if (cluster.isMaster) { | |
| const numWorkers = os.cpus().length; | |
| console.log(`Master ${process.pid} spawning ${numWorkers} workers`); | |
| for (let i = 0; i < numWorkers; i++) { | |
| cluster.fork(); | |
| } | |
| cluster.on('exit', (worker, code) => { | |
| console.log(`Worker ${worker.process.pid} exited (code ${code}), restarting...`); | |
| cluster.fork(); | |
| }); | |
| } else { | |
| startWorker(); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment