logo hsb.horse
← Back to snippets index

Snippets

TypeScript p-limit Implementation

TypeScript reimplementation of p-limit for controlling Promise concurrency. Achieve concurrency control without adding dependencies.

Published: Updated:

p-limit is fully functional and has proven usage based on download numbers.

Reimplemented in TypeScript for cases where I wanted to avoid adding project dependencies and wanted to try implementing it myself.

Can verify functionality in TypeScript Playground.

interface LimitFunction {
<Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
...args: Arguments
): Promise<Result>;
readonly activeCount: number;
readonly pendingCount: number;
clearQueue(): void;
concurrency: number;
map<Input, Result>(
iterable: Iterable<Input>,
fn: (value: Input, index: number) => PromiseLike<Result> | Result
): Promise<Result[]>;
}
interface LimitFunctionOptions {
readonly concurrency: number;
}
function validateConcurrency(
concurrency: number
): asserts concurrency is number {
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
}
function pLimit(initialConcurrency: number): LimitFunction {
validateConcurrency(initialConcurrency);
const queue = createQueue<() => void>();
let activeCount = 0;
let concurrency = initialConcurrency;
const resumeNext = (): void => {
if (activeCount < concurrency && queue.size > 0) {
activeCount++;
const next = queue.dequeue();
if (next !== undefined) {
next();
}
}
};
const next = (): void => {
activeCount--;
resumeNext();
};
const run = async <Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
resolve: (value: Result | PromiseLike<Result>) => void,
args: Arguments
): Promise<void> => {
const result = (async (): Promise<Result> => fn(...args))();
resolve(result as Result | PromiseLike<Result>);
try {
await result;
} catch { }
next();
};
const enqueue = <Arguments extends readonly unknown[], Result>(
fn: (...arguments_: Arguments) => PromiseLike<Result> | Result,
resolve: (value: Result | PromiseLike<Result>) => void,
args: Arguments
): void => {
new Promise<void>((internalResolve): void => {
queue.enqueue(internalResolve);
}).then((): Promise<void> => run(fn, resolve, args));
if (activeCount < concurrency) {
resumeNext();
}
};
const generator = <Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
...args: Arguments
): Promise<Result> => new Promise<Result>((resolve): void => {
enqueue(fn, resolve, args);
});
Object.defineProperties(generator, {
activeCount: {
get(): number {
return activeCount;
},
},
pendingCount: {
get(): number {
return queue.size;
},
},
clearQueue: {
value(): void {
queue.clear();
},
},
concurrency: {
get(): number {
return concurrency;
},
set(newConcurrency: number): void {
validateConcurrency(newConcurrency);
concurrency = newConcurrency;
queueMicrotask((): void => {
while (activeCount < concurrency && queue.size > 0) {
resumeNext();
}
});
},
},
map: {
async value<Input, Result>(
this: LimitFunction,
iterable: Iterable<Input>,
fn: (value: Input, index: number) => PromiseLike<Result> | Result
): Promise<Result[]> {
const promises = Array.from(
iterable,
(value: Input, index: number): Promise<Result> => this(fn, value, index)
);
return Promise.all(promises);
},
},
});
return generator as LimitFunction;
}
function limitFunction<Arguments extends readonly unknown[], Result>(
fn: (...args: Arguments) => PromiseLike<Result> | Result,
options: LimitFunctionOptions
): (...args: Arguments) => Promise<Result> {
const { concurrency } = options;
const limit = pLimit(concurrency);
return (...args: Arguments): Promise<Result> =>
limit((): PromiseLike<Result> | Result => fn(...args));
}
//-----------------------------------------------------
// yocto-queue
//-----------------------------------------------------
interface QNode<T> {
value: T;
next: QNode<T> | undefined;
}
interface Queue<T> {
enqueue(value: T): void;
dequeue(): T | undefined;
peek(): T | undefined;
clear(): void;
readonly size: number;
[Symbol.iterator](): Generator<T, void, unknown>;
drain(): Generator<T | undefined, void, unknown>;
}
function createNode<T>(value: T): QNode<T> {
return { value, next: undefined };
}
function createQueue<T>(): Queue<T> {
let head: QNode<T> | undefined;
let tail: QNode<T> | undefined;
let size: number;
const clear = (): void => {
head = tail = undefined;
size = 0;
}
clear();
const enqueue = (value: T): void => {
const node = createNode(value);
if (head) {
tail && (tail.next = node);
tail = node;
} else {
head = node;
tail = node;
}
size++;
}
const dequeue = (): T | undefined => {
const current = head;
if (!current) return;
head = head?.next;
size--;
return current.value;
}
const peek = (): T | undefined => {
return head?.value;
}
function* iterator(): Generator<T, void, unknown> {
let current = head;
while (current) {
yield current.value;
current = current.next;
}
}
function* drain(): Generator<T | undefined, void, unknown> {
while (head) {
yield dequeue();
}
}
return {
enqueue,
dequeue,
peek,
clear,
get size() {
return size;
},
[Symbol.iterator]: iterator,
drain
}
}

Where It Pays Off in Practice

This implementation is useful when I want parallel Promise work, but firing everything at once would be too expensive. It fits API batches, file processing, and scraping jobs where the concurrency ceiling matters more than raw speed.

The main places to read carefully are queue draining and error handling. The wrapper changes depending on whether one failure should stop the whole run or let the remaining tasks continue.

Practical Note

This snippet fits well when I do not want to rewrite the same operation or check around typescript, promise, concurrency over and over. Keeping it as a small helper makes the caller easier to read because the intent stays in the foreground.

If the branches and preconditions start growing, it is usually better not to force everything into one snippet. Splitting the steps and helper responsibilities is easier to maintain.