p-limit is fully functional and has proven usage based on download numbers.
Reimplemented in TypeScript for cases where I wanted to avoid adding project dependencies and wanted to try implementing it myself.
Can verify functionality in TypeScript Playground.
interface LimitFunction { <Arguments extends readonly unknown[], Result>( fn: (...args: Arguments) => PromiseLike<Result> | Result, ...args: Arguments ): Promise<Result>; readonly activeCount: number; readonly pendingCount: number; clearQueue(): void; concurrency: number; map<Input, Result>( iterable: Iterable<Input>, fn: (value: Input, index: number) => PromiseLike<Result> | Result ): Promise<Result[]>;}
interface LimitFunctionOptions { readonly concurrency: number;}
function validateConcurrency( concurrency: number): asserts concurrency is number { if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) { throw new TypeError('Expected `concurrency` to be a number from 1 and up'); }}
function pLimit(initialConcurrency: number): LimitFunction { validateConcurrency(initialConcurrency);
const queue = createQueue<() => void>(); let activeCount = 0; let concurrency = initialConcurrency;
const resumeNext = (): void => { if (activeCount < concurrency && queue.size > 0) { activeCount++; const next = queue.dequeue(); if (next !== undefined) { next(); } } };
const next = (): void => { activeCount--; resumeNext(); };
const run = async <Arguments extends readonly unknown[], Result>( fn: (...args: Arguments) => PromiseLike<Result> | Result, resolve: (value: Result | PromiseLike<Result>) => void, args: Arguments ): Promise<void> => { const result = (async (): Promise<Result> => fn(...args))();
resolve(result as Result | PromiseLike<Result>);
try { await result; } catch { }
next(); };
const enqueue = <Arguments extends readonly unknown[], Result>( fn: (...arguments_: Arguments) => PromiseLike<Result> | Result, resolve: (value: Result | PromiseLike<Result>) => void, args: Arguments ): void => { new Promise<void>((internalResolve): void => { queue.enqueue(internalResolve); }).then((): Promise<void> => run(fn, resolve, args));
if (activeCount < concurrency) { resumeNext(); } };
const generator = <Arguments extends readonly unknown[], Result>( fn: (...args: Arguments) => PromiseLike<Result> | Result, ...args: Arguments ): Promise<Result> => new Promise<Result>((resolve): void => { enqueue(fn, resolve, args); });
Object.defineProperties(generator, { activeCount: { get(): number { return activeCount; }, }, pendingCount: { get(): number { return queue.size; }, }, clearQueue: { value(): void { queue.clear(); }, }, concurrency: { get(): number { return concurrency; },
set(newConcurrency: number): void { validateConcurrency(newConcurrency); concurrency = newConcurrency;
queueMicrotask((): void => { while (activeCount < concurrency && queue.size > 0) { resumeNext(); } }); }, }, map: { async value<Input, Result>( this: LimitFunction, iterable: Iterable<Input>, fn: (value: Input, index: number) => PromiseLike<Result> | Result ): Promise<Result[]> { const promises = Array.from( iterable, (value: Input, index: number): Promise<Result> => this(fn, value, index) ); return Promise.all(promises); }, }, });
return generator as LimitFunction;}
function limitFunction<Arguments extends readonly unknown[], Result>( fn: (...args: Arguments) => PromiseLike<Result> | Result, options: LimitFunctionOptions): (...args: Arguments) => Promise<Result> { const { concurrency } = options; const limit = pLimit(concurrency);
return (...args: Arguments): Promise<Result> => limit((): PromiseLike<Result> | Result => fn(...args));}//-----------------------------------------------------// yocto-queue//-----------------------------------------------------
interface QNode<T> { value: T; next: QNode<T> | undefined;}
interface Queue<T> { enqueue(value: T): void; dequeue(): T | undefined; peek(): T | undefined; clear(): void; readonly size: number; [Symbol.iterator](): Generator<T, void, unknown>; drain(): Generator<T | undefined, void, unknown>;}
function createNode<T>(value: T): QNode<T> { return { value, next: undefined };}
function createQueue<T>(): Queue<T> { let head: QNode<T> | undefined; let tail: QNode<T> | undefined; let size: number;
const clear = (): void => { head = tail = undefined; size = 0; }
clear();
const enqueue = (value: T): void => { const node = createNode(value); if (head) { tail && (tail.next = node); tail = node; } else { head = node; tail = node; }
size++; }
const dequeue = (): T | undefined => { const current = head; if (!current) return; head = head?.next; size--; return current.value; }
const peek = (): T | undefined => { return head?.value; }
function* iterator(): Generator<T, void, unknown> { let current = head; while (current) { yield current.value; current = current.next; } }
function* drain(): Generator<T | undefined, void, unknown> { while (head) { yield dequeue(); } }
return { enqueue, dequeue, peek, clear, get size() { return size; }, [Symbol.iterator]: iterator, drain }}
hsb.horse