p-limit은 이미 충분히 검증됐고 다운로드 수 기준으로도 널리 쓰인다.
프로젝트 의존성을 늘리고 싶지 않았고, 직접 구현도 해 보고 싶어서 TypeScript로 다시 작성했다.
TypeScript Playground에서 동작을 확인할 수 있다.
interface LimitFunction { <Arguments extends readonly unknown[], Result>( fn: (...args: Arguments) => PromiseLike<Result> | Result, ...args: Arguments ): Promise<Result>; readonly activeCount: number; readonly pendingCount: number; clearQueue(): void; concurrency: number; map<Input, Result>( iterable: Iterable<Input>, fn: (value: Input, index: number) => PromiseLike<Result> | Result ): Promise<Result[]>;}
interface LimitFunctionOptions { readonly concurrency: number;}
function validateConcurrency( concurrency: number): asserts concurrency is number { if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) { throw new TypeError('Expected `concurrency` to be a number from 1 and up'); }}
function pLimit(initialConcurrency: number): LimitFunction { validateConcurrency(initialConcurrency);
const queue = createQueue<() => void>(); let activeCount = 0; let concurrency = initialConcurrency;
const resumeNext = (): void => { if (activeCount < concurrency && queue.size > 0) { activeCount++; const next = queue.dequeue(); if (next !== undefined) { next(); } } };
const next = (): void => { activeCount--; resumeNext(); };
const run = async <Arguments extends readonly unknown[], Result>( fn: (...args: Arguments) => PromiseLike<Result> | Result, resolve: (value: Result | PromiseLike<Result>) => void, args: Arguments ): Promise<void> => { const result = (async (): Promise<Result> => fn(...args))();
resolve(result as Result | PromiseLike<Result>);
try { await result; } catch { }
next(); };
const enqueue = <Arguments extends readonly unknown[], Result>( fn: (...arguments_: Arguments) => PromiseLike<Result> | Result, resolve: (value: Result | PromiseLike<Result>) => void, args: Arguments ): void => { new Promise<void>((internalResolve): void => { queue.enqueue(internalResolve); }).then((): Promise<void> => run(fn, resolve, args));
if (activeCount < concurrency) { resumeNext(); } };
const generator = <Arguments extends readonly unknown[], Result>( fn: (...args: Arguments) => PromiseLike<Result> | Result, ...args: Arguments ): Promise<Result> => new Promise<Result>((resolve): void => { enqueue(fn, resolve, args); });
Object.defineProperties(generator, { activeCount: { get(): number { return activeCount; }, }, pendingCount: { get(): number { return queue.size; }, }, clearQueue: { value(): void { queue.clear(); }, }, concurrency: { get(): number { return concurrency; },
set(newConcurrency: number): void { validateConcurrency(newConcurrency); concurrency = newConcurrency;
queueMicrotask((): void => { while (activeCount < concurrency && queue.size > 0) { resumeNext(); } }); }, }, map: { async value<Input, Result>( this: LimitFunction, iterable: Iterable<Input>, fn: (value: Input, index: number) => PromiseLike<Result> | Result ): Promise<Result[]> { const promises = Array.from( iterable, (value: Input, index: number): Promise<Result> => this(fn, value, index) ); return Promise.all(promises); }, }, });
return generator as LimitFunction;}
function limitFunction<Arguments extends readonly unknown[], Result>( fn: (...args: Arguments) => PromiseLike<Result> | Result, options: LimitFunctionOptions): (...args: Arguments) => Promise<Result> { const { concurrency } = options; const limit = pLimit(concurrency);
return (...args: Arguments): Promise<Result> => limit((): PromiseLike<Result> | Result => fn(...args));}//-----------------------------------------------------// yocto-queue//-----------------------------------------------------
interface QNode<T> { value: T; next: QNode<T> | undefined;}
interface Queue<T> { enqueue(value: T): void; dequeue(): T | undefined; peek(): T | undefined; clear(): void; readonly size: number; [Symbol.iterator](): Generator<T, void, unknown>; drain(): Generator<T | undefined, void, unknown>;}
function createNode<T>(value: T): QNode<T> { return { value, next: undefined };}
function createQueue<T>(): Queue<T> { let head: QNode<T> | undefined; let tail: QNode<T> | undefined; let size: number;
const clear = (): void => { head = tail = undefined; size = 0; }
clear();
const enqueue = (value: T): void => { const node = createNode(value); if (head) { tail && (tail.next = node); tail = node; } else { head = node; tail = node; }
size++; }
const dequeue = (): T | undefined => { const current = head; if (!current) return; head = head?.next; size--; return current.value; }
const peek = (): T | undefined => { return head?.value; }
function* iterator(): Generator<T, void, unknown> { let current = head; while (current) { yield current.value; current = current.next; } }
function* drain(): Generator<T | undefined, void, unknown> { while (head) { yield dequeue(); } }
return { enqueue, dequeue, peek, clear, get size() { return size; }, [Symbol.iterator]: iterator, drain }}구현에서 잘 먹히는 장면
이 구현은 Promise를 병렬로 던지고 싶지만 한꺼번에 때리면 부담이 큰 상황에서 잘 맞는다. API 배치, 파일 처리, 스크래핑처럼 속도보다 동시 실행 상한이 더 중요한 작업에 어울린다.
중점적으로 볼 부분은 queue가 비워지는 타이밍과 error 처리다. 하나 실패해도 나머지를 계속할지, 전체를 멈출지에 따라 wrapper 설계가 달라진다.
실무 메모
이 스니펫은 typescript, promise, concurrency 주변에서 같은 조작이나 판정을 매번 다시 쓰고 싶지 않을 때 잘 맞는다. 작은 보조 단위로 빼 두면 호출부에서는 의도만 읽기 쉬워진다.
반대로 분기와 전제조건이 늘어나 책임이 커진다면, 하나의 스니펫에 전부 넣지 않는 편이 낫다. 절차와 helper를 나누거나 역할별로 쪼개 두는 편이 유지보수에 유리하다.
구현 메모
쓸 때는 입력, 출력, 부작용을 먼저 정해 두면 재사용이 쉬워진다. TypeScript p-limit 구현도 typescript, promise, concurrency 문맥에서 무엇을 짧게 처리하려는 스니펫인지 먼저 고정하면 호출부가 읽기 쉬워진다.
Promise 동시성을 제어하는 p-limit의 TypeScript 재구현. 의존성을 추가하지 않고 동시성 제어를 구현합니다. 라는 목적에 대해 전제 확인까지 모두 넣을 필요는 없다. 호출부에 남길 책임과 이 스니펫 안에서 닫을 책임을 나누는 편이 덜 무너진다.
hsb.horse