logo hsb.horse
← スニペット一覧に戻る

Snippets

TypeScriptでp-limit実装

Promise並行実行数を制御するp-limitをTypeScriptで再実装。依存関係を増やさずに並行制御を実現。

公開日: 更新日:

翻訳

p-limitの使い勝手には不満はないし、ダウンロード数などの利用実績からも採用するには不満はない。

プロジェクトの依存関係を増やしたくないケースで採用したかったのと、TypeScriptで実装してみたかったので再実装してみた。

TypeScript Playgroundで動作確認できる。

interface LimitFunction {
<Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
...args: Arguments
): Promise<Result>;
readonly activeCount: number;
readonly pendingCount: number;
clearQueue(): void;
concurrency: number;
map<Input, Result>(
iterable: Iterable<Input>,
fn: (value: Input, index: number) => PromiseLike<Result> | Result
): Promise<Result[]>;
}
interface LimitFunctionOptions {
readonly concurrency: number;
}
function validateConcurrency(
concurrency: number
): asserts concurrency is number {
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
}
function pLimit(initialConcurrency: number): LimitFunction {
validateConcurrency(initialConcurrency);
const queue = createQueue<() => void>();
let activeCount = 0;
let concurrency = initialConcurrency;
const resumeNext = (): void => {
if (activeCount < concurrency && queue.size > 0) {
activeCount++;
const next = queue.dequeue();
if (next !== undefined) {
next();
}
}
};
const next = (): void => {
activeCount--;
resumeNext();
};
const run = async <Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
resolve: (value: Result | PromiseLike<Result>) => void,
args: Arguments
): Promise<void> => {
const result = (async (): Promise<Result> => fn(...args))();
resolve(result as Result | PromiseLike<Result>);
try {
await result;
} catch { }
next();
};
const enqueue = <Arguments extends readonly unknown[], Result>(
fn: (...arguments_: Arguments) => PromiseLike<Result> | Result,
resolve: (value: Result | PromiseLike<Result>) => void,
args: Arguments
): void => {
new Promise<void>((internalResolve): void => {
queue.enqueue(internalResolve);
}).then((): Promise<void> => run(fn, resolve, args));
if (activeCount < concurrency) {
resumeNext();
}
};
const generator = <Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
...args: Arguments
): Promise<Result> => new Promise<Result>((resolve): void => {
enqueue(fn, resolve, args);
});
Object.defineProperties(generator, {
activeCount: {
get(): number {
return activeCount;
},
},
pendingCount: {
get(): number {
return queue.size;
},
},
clearQueue: {
value(): void {
queue.clear();
},
},
concurrency: {
get(): number {
return concurrency;
},
set(newConcurrency: number): void {
validateConcurrency(newConcurrency);
concurrency = newConcurrency;
queueMicrotask((): void => {
while (activeCount < concurrency && queue.size > 0) {
resumeNext();
}
});
},
},
map: {
async value<Input, Result>(
this: LimitFunction,
iterable: Iterable<Input>,
fn: (value: Input, index: number) => PromiseLike<Result> | Result
): Promise<Result[]> {
const promises = Array.from(
iterable,
(value: Input, index: number): Promise<Result> => this(fn, value, index)
);
return Promise.all(promises);
},
},
});
return generator as LimitFunction;
}
function limitFunction<Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
options: LimitFunctionOptions
): (...args: Arguments) => Promise<Result> {
const { concurrency } = options;
const limit = pLimit(concurrency);
return (...args: Arguments): Promise<Result> =>
limit((): PromiseLike<Result> | Result => fn(...args));
}
//-----------------------------------------------------
// yocto-queue
//-----------------------------------------------------
interface QNode<T> {
value: T;
next: QNode<T> | undefined;
}
interface Queue<T> {
enqueue(value: T): void;
dequeue(): T | undefined;
peek(): T | undefined;
clear(): void;
readonly size: number;
[Symbol.iterator](): Generator<T, void, unknown>;
drain(): Generator<T | undefined, void, unknown>;
}
function createNode<T>(value: T): QNode<T> {
return { value, next: undefined };
}
function createQueue<T>(): Queue<T> {
let head: QNode<T> | undefined;
let tail: QNode<T> | undefined;
let size: number;
const clear = (): void => {
head = tail = undefined;
size = 0;
}
clear();
const enqueue = (value: T): void => {
const node = createNode(value);
if (head) {
tail && (tail.next = node);
tail = node;
} else {
head = node;
tail = node;
}
size++;
}
const dequeue = (): T | undefined => {
const current = head;
if (!current) return;
head = head?.next;
size--;
return current.value;
}
const peek = (): T | undefined => {
return head?.value;
}
function* iterator(): Generator<T, void, unknown> {
let current = head;
while (current) {
yield current.value;
current = current.next;
}
}
function* drain(): Generator<T | undefined, void, unknown> {
while (head) {
yield dequeue();
}
}
return {
enqueue,
dequeue,
peek,
clear,
get size() {
return size;
},
[Symbol.iterator]: iterator,
drain
}
}