Skip to content

Instantly share code, notes, and snippets.

@gartmeier
Last active December 18, 2025 09:09
Show Gist options
  • Select an option

  • Save gartmeier/6ee0c9694fc77fe58ab6564af75a5c65 to your computer and use it in GitHub Desktop.

Select an option

Save gartmeier/6ee0c9694fc77fe58ab6564af75a5c65 to your computer and use it in GitHub Desktop.
Node.js UDP ingress with batching for high-throughput IoT measurements
// udp-ingress.js
// Node.js UDP ingress with batching for high-throughput IoT measurements
const dgram = require('dgram');
const http = require('http');
const cluster = require('cluster');
const os = require('os');
// Configuration
const CONFIG = {
udpPort: 5000,
batchSize: 100, // Max events per batch
flushIntervalMs: 50, // Max wait time before sending partial batch
djangoHost: 'localhost',
djangoPort: 8000,
djangoPath: '/api/measurements/batch/',
};
// HTTP agent with connection pooling
const httpAgent = new http.Agent({
keepAlive: true,
maxSockets: 50,
maxFreeSockets: 10,
timeout: 60000,
});
// Batch buffer
let batch = [];
let flushTimer = null;
function flushBatch() {
if (batch.length === 0) return;
const payload = JSON.stringify({ measurements: batch });
const batchCount = batch.length;
batch = [];
const req = http.request({
hostname: CONFIG.djangoHost,
port: CONFIG.djangoPort,
path: CONFIG.djangoPath,
method: 'POST',
agent: httpAgent,
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(payload),
},
}, (res) => {
if (res.statusCode !== 200 && res.statusCode !== 201) {
console.error(`Batch failed: ${res.statusCode} (${batchCount} events)`);
}
res.resume(); // Drain response
});
req.on('error', (err) => {
console.error(`Request error: ${err.message} (${batchCount} events lost)`);
// TODO: Add to retry queue or dead letter queue
});
req.write(payload);
req.end();
}
function addToBatch(measurement) {
batch.push(measurement);
// Flush if batch is full
if (batch.length >= CONFIG.batchSize) {
clearTimeout(flushTimer);
flushBatch();
return;
}
// Start flush timer if this is first item in batch
if (batch.length === 1) {
flushTimer = setTimeout(flushBatch, CONFIG.flushIntervalMs);
}
}
function parseMeasurement(msg, rinfo) {
try {
// Adjust parsing to your actual UDP message format
const data = JSON.parse(msg.toString());
return {
vehicle_id: data.vehicle_id,
timestamp: data.timestamp || Date.now(),
d1: data.d1, // Ignition
speed: data.speed, // km/h
received_at: Date.now(),
source_ip: rinfo.address,
};
} catch (err) {
console.error(`Parse error: ${err.message}`);
return null;
}
}
function startWorker() {
const server = dgram.createSocket({
type: 'udp4',
reuseAddr: true,
});
server.on('message', (msg, rinfo) => {
const measurement = parseMeasurement(msg, rinfo);
if (measurement) {
addToBatch(measurement);
}
});
server.on('error', (err) => {
console.error(`UDP error: ${err.message}`);
server.close();
});
server.bind({
port: CONFIG.udpPort,
exclusive: false, // Allow socket sharing between workers
});
console.log(`Worker ${process.pid} listening on UDP port ${CONFIG.udpPort}`);
// Flush remaining batch on shutdown
process.on('SIGTERM', () => {
flushBatch();
server.close();
});
}
// Cluster mode: spawn worker per CPU core
if (cluster.isMaster) {
const numWorkers = os.cpus().length;
console.log(`Master ${process.pid} spawning ${numWorkers} workers`);
for (let i = 0; i < numWorkers; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code) => {
console.log(`Worker ${worker.process.pid} exited (code ${code}), restarting...`);
cluster.fork();
});
} else {
startWorker();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment