|
import { Worker, parentPort } from 'node:worker_threads'; |
|
|
|
class Transmitter<I, O> { |
|
private listeners: { [pid: string]: (output: O) => void } = {}; |
|
private readonly createUniqueId = createUniqueIdFactory(); |
|
|
|
constructor( |
|
private readonly taskId: string, |
|
private readonly worker: Worker |
|
) { |
|
this.worker = worker; |
|
this.worker.addListener( |
|
'message', |
|
({ data }: { data: { pid: string; taskId: string; output: O } }) => { |
|
if (data.taskId !== this.taskId) { |
|
return; |
|
} |
|
|
|
const listener = this.listeners[data.pid]; |
|
|
|
if (listener) { |
|
listener(data.output); |
|
delete this.listeners[data.pid]; |
|
} else { |
|
throw new Error(`Unknown PID: ${data.pid}`); |
|
} |
|
} |
|
); |
|
} |
|
|
|
async send(input: I) { |
|
const pid = this.createUniqueId(); |
|
this.worker.postMessage({ pid, taskId: this.taskId, input }); |
|
return new Promise<O>((resolve) => { |
|
this.listeners[pid] = resolve; |
|
}); |
|
} |
|
} |
|
|
|
class Receiver<I, O> { |
|
constructor(private readonly taskId: string) {} |
|
|
|
listen(callback: (input: I) => Promise<O | null>) { |
|
parentPort?.addListener( |
|
'message', |
|
async ({ data }: { data: { pid: string; taskId: string; input: I } }) => { |
|
if (data.taskId !== this.taskId) { |
|
return; |
|
} |
|
|
|
const result = await callback(data.input); |
|
|
|
if (result === null) { |
|
return; |
|
} |
|
|
|
parentPort?.postMessage({ pid: data.pid, output: result }); |
|
} |
|
); |
|
} |
|
} |
|
|
|
export class WorkerTask<I, O> { |
|
constructor(private readonly taskId: string) {} |
|
|
|
createTransmitter(worker: Worker) { |
|
return new Transmitter<I, O>(this.taskId, worker); |
|
} |
|
|
|
createReceiver() { |
|
return new Receiver<I, O>(this.taskId); |
|
} |
|
} |
|
|
|
function createUniqueIdFactory() { |
|
let index = 0; |
|
return () => { |
|
index++; |
|
return `${index}`; |
|
}; |
|
} |