Last active
October 3, 2025 03:48
-
-
Save orrisroot/fc0541c8969e70b661295bb495609bf1 to your computer and use it in GitHub Desktop.
A concurrency-controlled task queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A concurrency-controlled task queue.
 *
 * It uses `eventemitter3` to signal when all queued work has finished.
 *
 * Usage:
 *   const queue = new TaskQueue(10);
 *   queue.enqueue(() => doSomething());
 *   await queue.waitAll(); // resolves when nothing is left
 */
| import EventEmitter from 'eventemitter3'; | |
/**
 * Schedule `fn` to run asynchronously, after the current call stack unwinds.
 *
 * Uses the first scheduler the host environment provides, in this order:
 * `setImmediate`, then `queueMicrotask`, then `setTimeout(fn, 0)`.
 * Whatever handle the underlying scheduler returns is discarded, so this
 * function always returns `undefined`, matching its `void` signature.
 *
 * @param fn callback to invoke later
 */
function nextTick(fn: () => void): void {
  // Node
  if (typeof setImmediate === 'function') {
    // Do not `return setImmediate(fn)`: that would leak the Immediate handle
    // out of a function declared to return void.
    setImmediate(fn);
    return;
  }
  // Modern browsers - microtask queue
  if (typeof queueMicrotask === 'function') {
    queueMicrotask(fn);
    return;
  }
  // Fallback for older environments
  setTimeout(fn, 0);
}
/** A unit of work: a factory that starts the job when invoked and returns its Promise. */
type TaskFn = () => Promise<void>;

/**
 * Runs queued tasks in FIFO order with a cap on how many are in flight at once.
 *
 * Task failures (rejections or synchronous throws) are logged with
 * `console.error` and do not stop the queue. `waitAll()` resolves once no
 * task is running and nothing is pending.
 */
export class TaskQueue {
  /** maximum number of tasks that may run concurrently */
  private readonly concurrency: number;

  /** how many tasks are currently running */
  private activeCount = 0;

  /** FIFO list of pending task factories */
  private readonly queue: TaskFn[] = [];

  /** event emitter used to broadcast the 'idle' (fully drained) event */
  private readonly ev = new EventEmitter();

  /**
   * @param concurrency maximum number of tasks in flight; must be greater than 0
   * @throws RangeError when `concurrency` is 0, negative or NaN. Such a queue
   *         could never start a task, so `waitAll()` would hang forever.
   */
  constructor(concurrency: number) {
    // Written as a negated comparison so that NaN is rejected as well.
    if (!(concurrency > 0)) {
      throw new RangeError(
        `TaskQueue concurrency must be a positive number, got ${concurrency}`,
      );
    }
    this.concurrency = concurrency;
  }

  /**
   * Add a task to the queue. It starts immediately if a slot is free,
   * otherwise it waits for a running task to finish.
   *
   * @param task factory that returns a Promise when invoked
   */
  enqueue(task: TaskFn): void {
    this.queue.push(task);
    this.runNext();
  }

  /** start as many pending tasks as the concurrency limit allows */
  private runNext(): void {
    while (this.activeCount < this.concurrency) {
      const task = this.queue.shift();
      if (!task) return;
      this.start(task);
    }
  }

  /** run one task and release its slot when it settles, whatever the outcome */
  private start(task: TaskFn): void {
    this.activeCount++;

    // A factory that throws synchronously is turned into a rejection so it
    // goes through the same logging and cleanup path instead of leaking a slot.
    let running: Promise<void>;
    try {
      running = Promise.resolve(task());
    } catch (err: unknown) {
      running = Promise.reject(err);
    }

    void running
      .catch((err: unknown) => console.error('Task error:', err))
      .finally(() => {
        this.activeCount--;
        if (this.queue.length > 0) {
          // Defer so the next task starts on a fresh tick of the event loop.
          nextTick(() => this.runNext());
        } else if (this.activeCount === 0) {
          // Only signal idle when nothing is running AND nothing is pending;
          // an empty pending list alone is not enough.
          this.ev.emit('idle');
        }
      });
  }

  /**
   * Returns a Promise that resolves when both:
   * - there are no active tasks, and
   * - the pending queue is empty.
   *
   * Resolves immediately if the queue is already drained.
   */
  async waitAll(): Promise<void> {
    // Quick path: nothing to wait for
    if (this.activeCount === 0 && this.queue.length === 0) return;

    await new Promise<void>(resolve => {
      this.ev.once('idle', () => resolve());
    });
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment