p-limit já é bem validado e os números de download também mostram uso amplo.
Reimplementei em TypeScript para evitar adicionar dependências ao projeto e porque também queria testar a implementação por conta própria.
Dá para verificar o funcionamento no TypeScript Playground.
/**
 * Callable limiter returned by `pLimit`.
 *
 * - Call signature: schedules `fn(...args)` and returns a promise that settles
 *   with the outcome of `fn` once it has been run.
 * - `activeCount`: number of scheduled functions currently counted as running.
 * - `pendingCount`: number of scheduled functions still waiting in the queue.
 * - `clearQueue()`: drops every waiting (not yet started) function from the
 *   queue; the promises returned for those calls are never settled afterwards.
 * - `concurrency`: current limit; assigning a new value validates it and then
 *   starts waiting functions in a microtask while slots are free.
 * - `map(iterable, fn)`: schedules `fn(value, index)` through the limiter for
 *   every item of `iterable` and combines the results with `Promise.all`.
 */
interface LimitFunction { <Arguments extends readonly unknown[], Result>( fn: (...args: Arguments) => PromiseLike<Result> | Result, ...args: Arguments ): Promise<Result>; readonly activeCount: number; readonly pendingCount: number; clearQueue(): void; concurrency: number; map<Input, Result>( iterable: Iterable<Input>, fn: (value: Input, index: number) => PromiseLike<Result> | Result ): Promise<Result[]>;}
/**
 * Options accepted by `limitFunction`.
 *
 * `concurrency` is forwarded to `pLimit`, which validates it.
 */
interface LimitFunctionOptions { readonly concurrency: number;}
/**
 * Ensures that a concurrency limit is usable.
 *
 * A value is accepted when it is greater than zero and is either an integer
 * or positive infinity.
 *
 * @param concurrency - Candidate concurrency limit.
 * @throws TypeError when the value is not accepted.
 */
function validateConcurrency(concurrency: number): asserts concurrency is number {
  const isWholeOrUnbounded =
    Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY;

  if (isWholeOrUnbounded && concurrency > 0) {
    return;
  }

  throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
/**
 * Creates a limiter that runs at most `initialConcurrency` scheduled functions
 * at the same time and queues the rest in FIFO order.
 *
 * @param initialConcurrency - Initial limit; checked by `validateConcurrency`.
 * @returns A callable limiter that also exposes `activeCount`, `pendingCount`,
 *          `clearQueue`, `concurrency` and `map`.
 * @throws TypeError when `initialConcurrency` fails validation.
 */
function pLimit(initialConcurrency: number): LimitFunction {
  validateConcurrency(initialConcurrency);

  // Each queued entry is the resolver of a "gate" promise; calling it lets the
  // corresponding task start.
  const waiting = createQueue<() => void>();
  let running = 0;
  let limit = initialConcurrency;

  // Starts one waiting task if a slot is free. The slot is claimed before the
  // gate is opened so that synchronous callers already see the updated count.
  const startNextIfPossible = (): void => {
    if (!(running < limit && waiting.size > 0)) {
      return;
    }

    running++;
    const openGate = waiting.dequeue();
    if (openGate !== undefined) {
      openGate();
    }
  };

  // Releases the slot held by a finished task and tries to start another one.
  const releaseSlot = (): void => {
    running--;
    startNextIfPossible();
  };

  // Runs one task, hands its outcome to the caller's promise, and releases the
  // slot once the task has settled (successfully or not).
  const execute = async <Arguments extends readonly unknown[], Result>(
    fn: (...args: Arguments) => PromiseLike<Result> | Result,
    resolve: (value: Result | PromiseLike<Result>) => void,
    args: Arguments
  ): Promise<void> => {
    // The async wrapper turns a synchronous throw from `fn` into a rejection.
    const outcome = (async (): Promise<Result> => fn(...args))();

    // The caller's promise adopts `outcome`, including a rejection.
    resolve(outcome);

    try {
      await outcome;
    } catch {
      // Ignored on purpose: the rejection is already delivered through `resolve`;
      // here we only need to know that the task has finished.
    }

    releaseSlot();
  };

  // Puts a task behind a gate promise in the queue and, if a slot is free,
  // immediately tries to start the next waiting task.
  const schedule = <Arguments extends readonly unknown[], Result>(
    fn: (...arguments_: Arguments) => PromiseLike<Result> | Result,
    resolve: (value: Result | PromiseLike<Result>) => void,
    args: Arguments
  ): void => {
    new Promise<void>((openGate): void => {
      waiting.enqueue(openGate);
    }).then((): Promise<void> => execute(fn, resolve, args));

    if (running < limit) {
      startNextIfPossible();
    }
  };

  const generator = <Arguments extends readonly unknown[], Result>(
    fn: (...args: Arguments) => PromiseLike<Result> | Result,
    ...args: Arguments
  ): Promise<Result> =>
    new Promise<Result>((resolve): void => {
      schedule(fn, resolve, args);
    });

  Object.defineProperties(generator, {
    activeCount: {
      get(): number {
        return running;
      },
    },
    pendingCount: {
      get(): number {
        return waiting.size;
      },
    },
    clearQueue: {
      value(): void {
        waiting.clear();
      },
    },
    concurrency: {
      get(): number {
        return limit;
      },
      set(newConcurrency: number): void {
        validateConcurrency(newConcurrency);
        limit = newConcurrency;

        // In a microtask, start waiting tasks until the new limit is reached
        // or nothing is left in the queue.
        queueMicrotask((): void => {
          while (running < limit && waiting.size > 0) {
            startNextIfPossible();
          }
        });
      },
    },
    map: {
      async value<Input, Result>(
        this: LimitFunction,
        iterable: Iterable<Input>,
        fn: (value: Input, index: number) => PromiseLike<Result> | Result
      ): Promise<Result[]> {
        // `this` is the limiter itself, so every item goes through the queue.
        const scheduleItem = (value: Input, index: number): Promise<Result> =>
          this(fn, value, index);

        return Promise.all(Array.from(iterable, scheduleItem));
      },
    },
  });

  return generator as LimitFunction;
}
/**
 * Wraps `fn` so that every call of the returned function goes through a
 * dedicated limiter created with `options.concurrency`.
 *
 * @param fn - Function to wrap.
 * @param options - Holds the concurrency limit passed to `pLimit`.
 * @returns A function with the same parameters as `fn` that returns a promise
 *          for its result.
 */
function limitFunction<Arguments extends readonly unknown[], Result>(
  fn: (...args: Arguments) => PromiseLike<Result> | Result,
  options: LimitFunctionOptions
): (...args: Arguments) => Promise<Result> {
  const limiter = pLimit(options.concurrency);

  return (...args: Arguments): Promise<Result> => {
    // The arguments are captured in a thunk so the limiter decides when `fn` runs.
    return limiter((): PromiseLike<Result> | Result => fn(...args));
  };
}

//-----------------------------------------------------
// yocto-queue
//-----------------------------------------------------
/**
 * Node of the singly linked list used by the queue: holds one `value` and a
 * link to the `next` node (`undefined` when there is none).
 */
interface QNode<T> { value: T; next: QNode<T> | undefined;}
/**
 * FIFO queue as produced by `createQueue`.
 *
 * - `enqueue(value)`: appends a value at the end.
 * - `dequeue()`: removes and returns the first value, or `undefined` when empty.
 * - `peek()`: returns the first value without removing it, or `undefined`.
 * - `clear()`: removes all values.
 * - `size`: number of values currently stored.
 * - `[Symbol.iterator]()`: yields the stored values in order without removing them.
 * - `drain()`: yields values by dequeuing them until the queue is empty.
 */
interface Queue<T> { enqueue(value: T): void; dequeue(): T | undefined; peek(): T | undefined; clear(): void; readonly size: number; [Symbol.iterator](): Generator<T, void, unknown>; drain(): Generator<T | undefined, void, unknown>;}
/**
 * Builds a detached list node for `value`.
 *
 * @param value - Value to store in the node.
 * @returns A node whose `next` link is `undefined`.
 */
function createNode<T>(value: T): QNode<T> {
  const node: QNode<T> = { value, next: undefined };
  return node;
}
/**
 * Creates an empty FIFO queue backed by a singly linked list.
 *
 * The list head, tail and element count are kept in closure variables, so the
 * returned object is the only way to reach them.
 *
 * @returns A queue exposing `enqueue`, `dequeue`, `peek`, `clear`, `size`,
 *          an iterator and `drain`.
 */
function createQueue<T>(): Queue<T> {
  let head: QNode<T> | undefined;
  let tail: QNode<T> | undefined;
  let size = 0;

  // Drops every node and resets the counter.
  const clear = (): void => {
    head = undefined;
    tail = undefined;
    size = 0;
  };

  // Appends a value at the end of the list.
  const enqueue = (value: T): void => {
    const node = createNode(value);

    if (head === undefined) {
      head = node;
    } else if (tail !== undefined) {
      tail.next = node;
    }

    tail = node;
    size++;
  };

  // Removes and returns the first value, or `undefined` when the queue is empty.
  const dequeue = (): T | undefined => {
    const current = head;
    if (current === undefined) {
      return undefined;
    }

    head = current.next;
    if (head === undefined) {
      // The queue is now empty: release the tail as well, otherwise the removed
      // node (and its value) stays reachable until the next enqueue or clear.
      tail = undefined;
    }

    size--;
    return current.value;
  };

  // Returns the first value without removing it.
  const peek = (): T | undefined => head?.value;

  // Walks the list from head to tail without modifying the queue.
  function* iterator(): Generator<T, void, unknown> {
    let current = head;
    while (current) {
      yield current.value;
      current = current.next;
    }
  }

  // Yields values by dequeuing them until no node is left.
  function* drain(): Generator<T | undefined, void, unknown> {
    while (head) {
      yield dequeue();
    }
  }

  return {
    enqueue,
    dequeue,
    peek,
    clear,
    get size() {
      return size;
    },
    [Symbol.iterator]: iterator,
    drain,
  };
}

Onde isso rende na prática
Esta implementação ajuda quando eu quero trabalho paralelo com Promises, mas disparar tudo de uma vez sairia caro demais. Ela combina com lotes de API, processamento de arquivos e scraping, em que o teto de concorrência importa mais do que a velocidade bruta.
Os pontos principais para ler com atenção são o escoamento da fila e o tratamento de erros. O wrapper muda conforme a necessidade: um único erro deve interromper tudo, ou o restante deve continuar mesmo assim.
Nota prática
Este snippet funciona bem quando você não quer reescrever a mesma operação ou a mesma verificação em rotinas ligadas a TypeScript, Promises e controle de concorrência. Mantido como um apoio pequeno, ele deixa a intenção mais clara para quem chama.
Se as ramificações e as pré-condições começarem a crescer, é melhor não concentrar tudo em um único snippet. Separar o passo a passo, o helper e as responsabilidades costuma ser mais fácil de manter.
hsb.horse