logo hsb.horse
← Zur Snippets-Übersicht

Snippets

TypeScript-Implementierung von p-limit

TypeScript-Neuimplementierung von p-limit zur Steuerung der Promise-Konkurrenz. Konkurrenzkontrolle ohne zusätzliche Abhängigkeiten.

Veröffentlicht: Aktualisiert:

p-limit ist funktional bereits gut belegt und auch die Downloadzahlen zeigen, dass es breit genutzt wird.

Ich habe es in TypeScript nachgebaut, weil ich keine zusätzliche Abhängigkeit ins Projekt holen wollte und die Implementierung selbst ausprobieren wollte.

Die Funktion lässt sich im TypeScript Playground prüfen.

interface LimitFunction {
<Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
...args: Arguments
): Promise<Result>;
readonly activeCount: number;
readonly pendingCount: number;
clearQueue(): void;
concurrency: number;
map<Input, Result>(
iterable: Iterable<Input>,
fn: (value: Input, index: number) => PromiseLike<Result> | Result
): Promise<Result[]>;
}
interface LimitFunctionOptions {
readonly concurrency: number;
}
function validateConcurrency(
concurrency: number
): asserts concurrency is number {
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
}
function pLimit(initialConcurrency: number): LimitFunction {
validateConcurrency(initialConcurrency);
const queue = createQueue<() => void>();
let activeCount = 0;
let concurrency = initialConcurrency;
const resumeNext = (): void => {
if (activeCount < concurrency && queue.size > 0) {
activeCount++;
const next = queue.dequeue();
if (next !== undefined) {
next();
}
}
};
const next = (): void => {
activeCount--;
resumeNext();
};
const run = async <Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
resolve: (value: Result | PromiseLike<Result>) => void,
args: Arguments
): Promise<void> => {
const result = (async (): Promise<Result> => fn(...args))();
resolve(result as Result | PromiseLike<Result>);
try {
await result;
} catch { }
next();
};
const enqueue = <Arguments extends readonly unknown[], Result>(
fn: (...arguments_: Arguments) => PromiseLike<Result> | Result,
resolve: (value: Result | PromiseLike<Result>) => void,
args: Arguments
): void => {
new Promise<void>((internalResolve): void => {
queue.enqueue(internalResolve);
}).then((): Promise<void> => run(fn, resolve, args));
if (activeCount < concurrency) {
resumeNext();
}
};
const generator = <Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
...args: Arguments
): Promise<Result> => new Promise<Result>((resolve): void => {
enqueue(fn, resolve, args);
});
Object.defineProperties(generator, {
activeCount: {
get(): number {
return activeCount;
},
},
pendingCount: {
get(): number {
return queue.size;
},
},
clearQueue: {
value(): void {
queue.clear();
},
},
concurrency: {
get(): number {
return concurrency;
},
set(newConcurrency: number): void {
validateConcurrency(newConcurrency);
concurrency = newConcurrency;
queueMicrotask((): void => {
while (activeCount < concurrency && queue.size > 0) {
resumeNext();
}
});
},
},
map: {
async value<Input, Result>(
this: LimitFunction,
iterable: Iterable<Input>,
fn: (value: Input, index: number) => PromiseLike<Result> | Result
): Promise<Result[]> {
const promises = Array.from(
iterable,
(value: Input, index: number): Promise<Result> => this(fn, value, index)
);
return Promise.all(promises);
},
},
});
return generator as LimitFunction;
}
function limitFunction<Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
options: LimitFunctionOptions
): (...args: Arguments) => Promise<Result> {
const { concurrency } = options;
const limit = pLimit(concurrency);
return (...args: Arguments): Promise<Result> =>
limit((): PromiseLike<Result> | Result => fn(...args));
}
//-----------------------------------------------------
// yocto-queue
//-----------------------------------------------------
interface QNode<T> {
value: T;
next: QNode<T> | undefined;
}
interface Queue<T> {
enqueue(value: T): void;
dequeue(): T | undefined;
peek(): T | undefined;
clear(): void;
readonly size: number;
[Symbol.iterator](): Generator<T, void, unknown>;
drain(): Generator<T | undefined, void, unknown>;
}
function createNode<T>(value: T): QNode<T> {
return { value, next: undefined };
}
function createQueue<T>(): Queue<T> {
let head: QNode<T> | undefined;
let tail: QNode<T> | undefined;
let size: number;
const clear = (): void => {
head = tail = undefined;
size = 0;
}
clear();
const enqueue = (value: T): void => {
const node = createNode(value);
if (head) {
tail && (tail.next = node);
tail = node;
} else {
head = node;
tail = node;
}
size++;
}
const dequeue = (): T | undefined => {
const current = head;
if (!current) return;
head = head?.next;
size--;
return current.value;
}
const peek = (): T | undefined => {
return head?.value;
}
function* iterator(): Generator<T, void, unknown> {
let current = head;
while (current) {
yield current.value;
current = current.next;
}
}
function* drain(): Generator<T | undefined, void, unknown> {
while (head) {
yield dequeue();
}
}
return {
enqueue,
dequeue,
peek,
clear,
get size() {
return size;
},
[Symbol.iterator]: iterator,
drain
}
}

Wo es in der Praxis wirkt

Diese Implementierung hilft, wenn Promise-Arbeit parallel laufen soll, aber ein gleichzeitiger Vollstart zu teuer wäre. Sie passt zu API-Batches, Dateiverarbeitung und Scraping, bei denen die Obergrenze der Parallelität wichtiger ist als rohe Geschwindigkeit.

Genau hinsehen sollte man bei der Entleerung der Queue und beim Fehlerverhalten. Je nachdem, ob ein einzelner Fehler alles stoppen oder nur den einen Task betreffen soll, sieht der Wrapper anders aus.

Praxishinweis

Dieses Snippet passt gut, wenn dieselbe Operation oder Prüfung im Umfeld von typescript, promise, concurrency nicht immer wieder neu geschrieben werden soll. Als kleine Hilfsfunktion bleibt aufrufender Code leichter lesbar.

Wenn jedoch Verzweigungen und Voraussetzungen zunehmen, sollte nicht alles in ein einziges Snippet gepackt werden. Getrennte Schritte und klar abgegrenzte Helfer bleiben auf Dauer wartbarer.