logo hsb.horse
← 스니펫 목록으로 돌아가기

Snippets

TypeScript p-limit 구현

Promise 동시성을 제어하는 p-limit의 TypeScript 재구현. 의존성을 추가하지 않고 동시성 제어를 구현합니다.

게시일: 수정일:

p-limit은 이미 충분히 검증됐고 다운로드 수 기준으로도 널리 쓰인다.

프로젝트 의존성을 늘리고 싶지 않았고, 직접 구현도 해 보고 싶어서 TypeScript로 다시 작성했다.

TypeScript Playground에서 동작을 확인할 수 있다.

interface LimitFunction {
<Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
...args: Arguments
): Promise<Result>;
readonly activeCount: number;
readonly pendingCount: number;
clearQueue(): void;
concurrency: number;
map<Input, Result>(
iterable: Iterable<Input>,
fn: (value: Input, index: number) => PromiseLike<Result> | Result
): Promise<Result[]>;
}
interface LimitFunctionOptions {
readonly concurrency: number;
}
function validateConcurrency(
concurrency: number
): asserts concurrency is number {
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
}
function pLimit(initialConcurrency: number): LimitFunction {
validateConcurrency(initialConcurrency);
const queue = createQueue<() => void>();
let activeCount = 0;
let concurrency = initialConcurrency;
const resumeNext = (): void => {
if (activeCount < concurrency && queue.size > 0) {
activeCount++;
const next = queue.dequeue();
if (next !== undefined) {
next();
}
}
};
const next = (): void => {
activeCount--;
resumeNext();
};
const run = async <Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
resolve: (value: Result | PromiseLike<Result>) => void,
args: Arguments
): Promise<void> => {
const result = (async (): Promise<Result> => fn(...args))();
resolve(result as Result | PromiseLike<Result>);
try {
await result;
} catch { }
next();
};
const enqueue = <Arguments extends readonly unknown[], Result>(
fn: (...arguments_: Arguments) => PromiseLike<Result> | Result,
resolve: (value: Result | PromiseLike<Result>) => void,
args: Arguments
): void => {
new Promise<void>((internalResolve): void => {
queue.enqueue(internalResolve);
}).then((): Promise<void> => run(fn, resolve, args));
if (activeCount < concurrency) {
resumeNext();
}
};
const generator = <Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
...args: Arguments
): Promise<Result> => new Promise<Result>((resolve): void => {
enqueue(fn, resolve, args);
});
Object.defineProperties(generator, {
activeCount: {
get(): number {
return activeCount;
},
},
pendingCount: {
get(): number {
return queue.size;
},
},
clearQueue: {
value(): void {
queue.clear();
},
},
concurrency: {
get(): number {
return concurrency;
},
set(newConcurrency: number): void {
validateConcurrency(newConcurrency);
concurrency = newConcurrency;
queueMicrotask((): void => {
while (activeCount < concurrency && queue.size > 0) {
resumeNext();
}
});
},
},
map: {
async value<Input, Result>(
this: LimitFunction,
iterable: Iterable<Input>,
fn: (value: Input, index: number) => PromiseLike<Result> | Result
): Promise<Result[]> {
const promises = Array.from(
iterable,
(value: Input, index: number): Promise<Result> => this(fn, value, index)
);
return Promise.all(promises);
},
},
});
return generator as LimitFunction;
}
function limitFunction<Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
options: LimitFunctionOptions
): (...args: Arguments) => Promise<Result> {
const { concurrency } = options;
const limit = pLimit(concurrency);
return (...args: Arguments): Promise<Result> =>
limit((): PromiseLike<Result> | Result => fn(...args));
}
//-----------------------------------------------------
// yocto-queue
//-----------------------------------------------------
interface QNode<T> {
value: T;
next: QNode<T> | undefined;
}
interface Queue<T> {
enqueue(value: T): void;
dequeue(): T | undefined;
peek(): T | undefined;
clear(): void;
readonly size: number;
[Symbol.iterator](): Generator<T, void, unknown>;
drain(): Generator<T | undefined, void, unknown>;
}
function createNode<T>(value: T): QNode<T> {
return { value, next: undefined };
}
function createQueue<T>(): Queue<T> {
let head: QNode<T> | undefined;
let tail: QNode<T> | undefined;
let size: number;
const clear = (): void => {
head = tail = undefined;
size = 0;
}
clear();
const enqueue = (value: T): void => {
const node = createNode(value);
if (head) {
tail && (tail.next = node);
tail = node;
} else {
head = node;
tail = node;
}
size++;
}
const dequeue = (): T | undefined => {
const current = head;
if (!current) return;
head = head?.next;
size--;
return current.value;
}
const peek = (): T | undefined => {
return head?.value;
}
function* iterator(): Generator<T, void, unknown> {
let current = head;
while (current) {
yield current.value;
current = current.next;
}
}
function* drain(): Generator<T | undefined, void, unknown> {
while (head) {
yield dequeue();
}
}
return {
enqueue,
dequeue,
peek,
clear,
get size() {
return size;
},
[Symbol.iterator]: iterator,
drain
}
}

구현에서 잘 먹히는 장면

이 구현은 Promise를 병렬로 던지고 싶지만 한꺼번에 때리면 부담이 큰 상황에서 잘 맞는다. API 배치, 파일 처리, 스크래핑처럼 속도보다 동시 실행 상한이 더 중요한 작업에 어울린다.

중점적으로 볼 부분은 queue가 비워지는 타이밍과 error 처리다. 하나 실패해도 나머지를 계속할지, 전체를 멈출지에 따라 wrapper 설계가 달라진다.

실무 메모

이 스니펫은 typescript, promise, concurrency 주변에서 같은 조작이나 판정을 매번 다시 쓰고 싶지 않을 때 잘 맞는다. 작은 보조 단위로 빼 두면 호출부에서는 의도만 읽기 쉬워진다.

반대로 분기와 전제조건이 늘어나 책임이 커진다면, 하나의 스니펫에 전부 넣지 않는 편이 낫다. 절차와 helper를 나누거나 역할별로 쪼개 두는 편이 유지보수에 유리하다.

구현 메모

쓸 때는 입력, 출력, 부작용을 먼저 정해 두면 재사용이 쉬워진다. TypeScript p-limit 구현도 typescript, promise, concurrency 문맥에서 무엇을 짧게 처리하려는 스니펫인지 먼저 고정하면 호출부가 읽기 쉬워진다.

Promise 동시성을 제어하는 p-limit의 TypeScript 재구현. 의존성을 추가하지 않고 동시성 제어를 구현합니다. 라는 목적에 대해 전제 확인까지 모두 넣을 필요는 없다. 호출부에 남길 책임과 이 스니펫 안에서 닫을 책임을 나누는 편이 덜 무너진다.