Skip to content

Instantly share code, notes, and snippets.

@orrisroot
Last active October 3, 2025 03:48
Show Gist options
  • Select an option

  • Save orrisroot/fc0541c8969e70b661295bb495609bf1 to your computer and use it in GitHub Desktop.

Select an option

Save orrisroot/fc0541c8969e70b661295bb495609bf1 to your computer and use it in GitHub Desktop.
A concurrency‑controlled task queue
/**
* A concurrency‑controlled task queue.
*
* It uses `eventemitter3` for signalling when all queued work has finished.
*
* Usage:
* const queue = new TaskQueue(10);
* queue.enqueue(() => doSomething());
* await queue.waitAll(); // resolves when nothing is left
*/
import EventEmitter from 'eventemitter3';
/**
 * Schedule `fn` to run asynchronously, i.e. never inside the current call.
 *
 * The first available primitive is used, in this order:
 *  1. `setImmediate`   – available in Node
 *  2. `queueMicrotask` – modern browsers (micro-task queue)
 *  3. `setTimeout(fn, 0)` – fallback for older environments
 *
 * These primitives do not share the same timing, so callers should rely only
 * on "`fn` is not invoked synchronously", not on a particular event-loop phase.
 *
 * @param fn callback to invoke later
 */
function nextTick(fn: () => void): void {
  if (typeof setImmediate === 'function') {
    // setImmediate returns a handle; call it as a statement so nothing leaks
    // out of this `void` function.
    setImmediate(fn);
    return;
  }
  if (typeof queueMicrotask === 'function') {
    queueMicrotask(fn);
    return;
  }
  setTimeout(fn, 0);
}
/**
 * A unit of work for {@link TaskQueue}: a factory that is invoked when a
 * concurrency slot is free. Whatever it returns (or resolves to) is discarded;
 * a plain synchronous function is accepted as well.
 */
type TaskFn = () => PromiseLike<unknown> | void;

/**
 * Runs enqueued tasks with at most `concurrency` of them in flight at once.
 *
 * Tasks are started in FIFO order. A task that fails (rejects or throws) is
 * logged with `console.error` and does not stop the queue.
 */
export class TaskQueue {
  /** maximum number of tasks that may run concurrently */
  private readonly concurrency: number;
  /** how many tasks are currently running */
  private activeCount = 0;
  /** FIFO list of pending task factories */
  private readonly queue: TaskFn[] = [];
  /** event emitter used to broadcast the `'idle'` event */
  private readonly ev = new EventEmitter();

  /**
   * @param concurrency maximum number of simultaneously running tasks (>= 1)
   * @throws RangeError if `concurrency` is `NaN` or less than 1 – such a queue
   *         could never start a task and `waitAll()` would never resolve
   */
  constructor(concurrency: number) {
    // Written as a negated `>=` so that NaN is rejected too.
    if (!(concurrency >= 1)) {
      throw new RangeError(
        `TaskQueue concurrency must be a number >= 1, got ${concurrency}`,
      );
    }
    this.concurrency = concurrency;
  }

  /**
   * Add a task to the queue. It is started immediately if a slot is free,
   * otherwise as soon as a running task settles.
   *
   * @param task factory that returns a Promise when invoked
   */
  enqueue(task: TaskFn): void {
    this.queue.push(task);
    this.runNext();
  }

  /** start as many tasks as possible given the concurrency limit */
  private runNext(): void {
    while (this.activeCount < this.concurrency) {
      const task = this.queue.shift();
      if (!task) return;
      this.start(task);
    }
  }

  /** invoke one task, occupying a slot until it settles */
  private start(task: TaskFn): void {
    this.activeCount++;

    let running: Promise<unknown>;
    try {
      running = Promise.resolve(task());
    } catch (err) {
      // A synchronous throw is treated like a rejection so that the slot is
      // released below instead of being leaked.
      running = Promise.reject(err);
    }

    void running
      .catch((err: unknown) => console.error('Task error:', err))
      .finally(() => {
        this.activeCount--;
        if (this.queue.length > 0) {
          nextTick(() => this.runNext());
        } else if (this.activeCount === 0) {
          // Only here is the queue truly drained: nothing pending AND nothing
          // running. Emitting anywhere else would wake waiters too early.
          this.ev.emit('idle');
        }
      });
  }

  /**
   * Returns a Promise that resolves when both:
   * - there are no active tasks, and
   * - the pending queue is empty.
   *
   * Resolves right away if the queue is already in that state.
   */
  async waitAll(): Promise<void> {
    // Quick path: nothing to wait for
    if (this.activeCount === 0 && this.queue.length === 0) return;
    return new Promise<void>(resolve => {
      this.ev.once('idle', () => resolve());
    });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment