logo hsb.horse
← Retour à l’index des snippets

Snippets

Implémentation TypeScript de p-limit

Réimplémentation TypeScript de p-limit pour contrôler la concurrence des Promises. Permet un contrôle de concurrence sans dépendance supplémentaire.

Publié: Mis à jour:

p-limit est déjà bien éprouvé et son nombre de téléchargements montre qu’il est largement utilisé.

Je l’ai réimplémenté en TypeScript pour éviter d’ajouter une dépendance au projet et parce que je voulais aussi essayer de le coder moi-même.

Le fonctionnement peut être vérifié dans TypeScript Playground.

interface LimitFunction {
<Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
...args: Arguments
): Promise<Result>;
readonly activeCount: number;
readonly pendingCount: number;
clearQueue(): void;
concurrency: number;
map<Input, Result>(
iterable: Iterable<Input>,
fn: (value: Input, index: number) => PromiseLike<Result> | Result
): Promise<Result[]>;
}
interface LimitFunctionOptions {
readonly concurrency: number;
}
function validateConcurrency(
concurrency: number
): asserts concurrency is number {
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
}
function pLimit(initialConcurrency: number): LimitFunction {
validateConcurrency(initialConcurrency);
const queue = createQueue<() => void>();
let activeCount = 0;
let concurrency = initialConcurrency;
const resumeNext = (): void => {
if (activeCount < concurrency && queue.size > 0) {
activeCount++;
const next = queue.dequeue();
if (next !== undefined) {
next();
}
}
};
const next = (): void => {
activeCount--;
resumeNext();
};
const run = async <Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
resolve: (value: Result | PromiseLike<Result>) => void,
args: Arguments
): Promise<void> => {
const result = (async (): Promise<Result> => fn(...args))();
resolve(result as Result | PromiseLike<Result>);
try {
await result;
} catch { }
next();
};
const enqueue = <Arguments extends readonly unknown[], Result>(
fn: (...arguments_: Arguments) => PromiseLike<Result> | Result,
resolve: (value: Result | PromiseLike<Result>) => void,
args: Arguments
): void => {
new Promise<void>((internalResolve): void => {
queue.enqueue(internalResolve);
}).then((): Promise<void> => run(fn, resolve, args));
if (activeCount < concurrency) {
resumeNext();
}
};
const generator = <Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
...args: Arguments
): Promise<Result> => new Promise<Result>((resolve): void => {
enqueue(fn, resolve, args);
});
Object.defineProperties(generator, {
activeCount: {
get(): number {
return activeCount;
},
},
pendingCount: {
get(): number {
return queue.size;
},
},
clearQueue: {
value(): void {
queue.clear();
},
},
concurrency: {
get(): number {
return concurrency;
},
set(newConcurrency: number): void {
validateConcurrency(newConcurrency);
concurrency = newConcurrency;
queueMicrotask((): void => {
while (activeCount < concurrency && queue.size > 0) {
resumeNext();
}
});
},
},
map: {
async value<Input, Result>(
this: LimitFunction,
iterable: Iterable<Input>,
fn: (value: Input, index: number) => PromiseLike<Result> | Result
): Promise<Result[]> {
const promises = Array.from(
iterable,
(value: Input, index: number): Promise<Result> => this(fn, value, index)
);
return Promise.all(promises);
},
},
});
return generator as LimitFunction;
}
function limitFunction<Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
options: LimitFunctionOptions
): (...args: Arguments) => Promise<Result> {
const { concurrency } = options;
const limit = pLimit(concurrency);
return (...args: Arguments): Promise<Result> =>
limit((): PromiseLike<Result> | Result => fn(...args));
}
//-----------------------------------------------------
// yocto-queue
//-----------------------------------------------------
interface QNode<T> {
value: T;
next: QNode<T> | undefined;
}
interface Queue<T> {
enqueue(value: T): void;
dequeue(): T | undefined;
peek(): T | undefined;
clear(): void;
readonly size: number;
[Symbol.iterator](): Generator<T, void, unknown>;
drain(): Generator<T | undefined, void, unknown>;
}
function createNode<T>(value: T): QNode<T> {
return { value, next: undefined };
}
function createQueue<T>(): Queue<T> {
let head: QNode<T> | undefined;
let tail: QNode<T> | undefined;
let size: number;
const clear = (): void => {
head = tail = undefined;
size = 0;
}
clear();
const enqueue = (value: T): void => {
const node = createNode(value);
if (head) {
tail && (tail.next = node);
tail = node;
} else {
head = node;
tail = node;
}
size++;
}
const dequeue = (): T | undefined => {
const current = head;
if (!current) return;
head = head?.next;
size--;
return current.value;
}
const peek = (): T | undefined => {
return head?.value;
}
function* iterator(): Generator<T, void, unknown> {
let current = head;
while (current) {
yield current.value;
current = current.next;
}
}
function* drain(): Generator<T | undefined, void, unknown> {
while (head) {
yield dequeue();
}
}
return {
enqueue,
dequeue,
peek,
clear,
get size() {
return size;
},
[Symbol.iterator]: iterator,
drain
}
}

Là où cela paie en pratique

Cette implémentation est utile quand je veux du travail Promise en parallèle, mais qu’un lancement massif coûterait trop cher. Elle convient bien aux batchs d’API, au traitement de fichiers et au scraping, où le plafond de concurrence compte plus que la vitesse brute.

Les deux points à lire avec attention sont la vidange de la file et la gestion des erreurs. Le wrapper change selon qu’un échec doit arrêter tout le lot ou laisser continuer le reste.

Note pratique

Ce snippet convient bien quand on ne veut pas réécrire la même opération ou la même vérification autour de typescript, promise, concurrency. Le garder sous la forme d’un petit utilitaire rend l’intention plus lisible côté appelant.

En revanche, si les branches et les préconditions s’accumulent, mieux vaut ne pas tout tasser dans un seul snippet. Séparer la procédure, le helper et les responsabilités rend l’ensemble plus simple à maintenir.