When processing massive datasets in workers, sending all data at once causes memory pressure from structured clone costs. Splitting into sub-batches with per-batch timeouts balances memory efficiency and error detection.
Code
/** * Worker pool configuration */export interface WorkerPoolOptions { /** Worker script URL */ workerUrl: string | URL /** Pool size (defaults to CPU cores if omitted) */ poolSize?: number /** Max items per sub-batch (default: 1500) */ subBatchSize?: number /** Sub-batch timeout in milliseconds (default: 30000) */ subBatchTimeout?: number}
/** * Worker pool interface */export interface WorkerPool<TInput, TResult> { /** Dispatch items to worker pool for processing */ dispatch( items: TInput[], onProgress?: (processed: number) => void ): Promise<TResult[]> /** Terminate all workers */ terminate(): Promise<void> /** Number of workers in pool */ readonly size: number}
/** * Worker protocol: sub-batch message */interface SubBatchMessage<T> { type: 'sub-batch' items: T[]}
/** * Worker protocol: flush request */interface FlushMessage { type: 'flush'}
/** * Worker protocol: sub-batch done response */interface SubBatchDoneMessage { type: 'sub-batch-done'}
/** * Worker protocol: progress report */interface ProgressMessage { type: 'progress' processed: number}
/** * Worker protocol: error notification */interface ErrorMessage { type: 'error' error: string}
/** * Worker protocol: final result */interface ResultMessage<T> { type: 'result' data: T}
type WorkerMessage<T> = | SubBatchDoneMessage | ProgressMessage | ErrorMessage | ResultMessage<T>
/** * Create a worker pool */export const createWorkerPool = <TInput, TResult>( options: WorkerPoolOptions): WorkerPool<TInput, TResult> => { const { workerUrl, poolSize, subBatchSize = 1500, subBatchTimeout = 30000, } = options
// Determine pool size (browser/Node.js compatible) const size = poolSize ?? getDefaultPoolSize() const workers: Worker[] = []
// Create workers for (let i = 0; i < size; i++) { workers.push(createWorker(workerUrl)) }
const dispatch = ( items: TInput[], onProgress?: (processed: number) => void ): Promise<TResult[]> => { if (items.length === 0) return Promise.resolve([])
// Split items by worker count const chunkSize = Math.ceil(items.length / size) const chunks: TInput[][] = [] for (let i = 0; i < items.length; i += chunkSize) { chunks.push(items.slice(i, i + chunkSize)) }
// Per-worker progress counters const workerProgress = new Array(chunks.length).fill(0)
// Dispatch chunks to workers const promises = chunks.map((chunk, workerIndex) => { const worker = workers[workerIndex] return processChunk<TInput, TResult>( worker, chunk, workerIndex, subBatchSize, subBatchTimeout, (processed) => { workerProgress[workerIndex] = processed if (onProgress) { const total = workerProgress.reduce((a, b) => a + b, 0) onProgress(total) } } ) })
return Promise.all(promises) }
const terminate = async (): Promise<void> => { await Promise.all(workers.map((w) => w.terminate())) workers.length = 0 }
return { dispatch, terminate, size }}
/** * Process chunk in a single worker (sub-batch splitting) */const processChunk = <TInput, TResult>( worker: Worker, chunk: TInput[], workerIndex: number, subBatchSize: number, subBatchTimeout: number, onProgress: (processed: number) => void): Promise<TResult> => { return new Promise<TResult>((resolve, reject) => { let settled = false let subBatchTimer: ReturnType<typeof setTimeout> | null = null let subBatchIndex = 0
const cleanup = () => { if (subBatchTimer) clearTimeout(subBatchTimer) worker.removeEventListener('message', handler) worker.removeEventListener('error', errorHandler) }
const resetSubBatchTimer = () => { if (subBatchTimer) clearTimeout(subBatchTimer) subBatchTimer = setTimeout(() => { if (!settled) { settled = true cleanup() reject( new Error( `Worker ${workerIndex} sub-batch timed out after ${ subBatchTimeout / 1000 }s (chunk: ${chunk.length} items)` ) ) } }, subBatchTimeout) }
const sendNextSubBatch = () => { const start = subBatchIndex * subBatchSize if (start >= chunk.length) { // All sub-batches sent, request flush worker.postMessage({ type: 'flush' } as FlushMessage) return } const subBatch = chunk.slice(start, start + subBatchSize) subBatchIndex++ resetSubBatchTimer() worker.postMessage({ type: 'sub-batch', items: subBatch, } as SubBatchMessage<TInput>) }
const handler = (event: MessageEvent<WorkerMessage<TResult>>) => { if (settled) return const msg = event.data
if (msg.type === 'progress') { onProgress(msg.processed) } else if (msg.type === 'sub-batch-done') { sendNextSubBatch() } else if (msg.type === 'error') { settled = true cleanup() reject(new Error(`Worker ${workerIndex} error: ${msg.error}`)) } else if (msg.type === 'result') { settled = true cleanup() resolve(msg.data) } }
const errorHandler = (err: ErrorEvent) => { if (!settled) { settled = true cleanup() reject(err.error || err) } }
worker.addEventListener('message', handler) worker.addEventListener('error', errorHandler) sendNextSubBatch() })}
/** * Create a worker (browser/Node.js compatible) */const createWorker = (workerUrl: string | URL): Worker => { if (typeof Worker !== 'undefined') { // Browser environment return new Worker(workerUrl, { type: 'module' }) } // Node.js environment // @ts-expect-error - Node.js Worker is require('node:worker_threads').Worker const { Worker: NodeWorker } = require('node:worker_threads') return new NodeWorker(workerUrl)}
/** * Get default pool size */const getDefaultPoolSize = (): number => { if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) { // Browser environment return Math.min(8, Math.max(1, navigator.hardwareConcurrency - 1)) } // Node.js environment try { // @ts-expect-error - Node.js os module const os = require('node:os') return Math.min(8, Math.max(1, os.cpus().length - 1)) } catch { return 4 // Fallback }}Usage
Main Thread
// Create worker poolconst pool = createWorkerPool<string, ParsedFile>({ workerUrl: new URL('./parser.worker.js', import.meta.url), poolSize: 4, subBatchSize: 1000, subBatchTimeout: 20000,})
// Process massive file listconst files = ['file1.txt', 'file2.txt', /* ... 10000 files */]const results = await pool.dispatch(files, (processed) => { console.log(`Processed: ${processed}/${files.length}`)})
// Terminateawait pool.terminate()Worker Side (parser.worker.ts)
// Per-sub-batch accumulation bufferlet accumulated: ParsedFile[] = []let totalProcessed = 0
self.addEventListener('message', async (event) => { const msg = event.data
if (msg.type === 'sub-batch') { // Process sub-batch for (const filePath of msg.items) { const parsed = await parseFile(filePath) accumulated.push(parsed) totalProcessed++ }
// Report progress self.postMessage({ type: 'progress', processed: totalProcessed })
// Notify sub-batch completion self.postMessage({ type: 'sub-batch-done' }) } else if (msg.type === 'flush') { // Return final result self.postMessage({ type: 'result', data: accumulated }) accumulated = [] totalProcessed = 0 }})
const parseFile = async (path: string): Promise<ParsedFile> => { // Actual parsing logic return { path, content: '...' }}How It Works
- 3-level splitting: the work is divided into all items → worker chunks → sub-batches
- Sub-batch sending: only `subBatchSize` items are sent to a worker at once; the next sub-batch is sent only after the previous one completes
- Timeout management: an independent timer per sub-batch detects anomalies (such as a huge file stalling a worker) early
- Progress aggregation: processed items are counted per worker and aggregated into total progress in real time
- Result flushing: after all sub-batches are sent, a `flush` message requests the final results from each worker
Sending in sub-batches keeps structured clone memory peak at subBatchSize × avg size instead of total items × avg size.
Benefits
- Memory Efficiency: Avoids structured clone memory pressure by sending data in sub-batches instead of all at once
- Early Error Detection: Per-sub-batch timeouts catch huge files or infinite loops early
- Progress Visualization: Aggregating per-worker progress improves UX with real-time feedback
- Environment Agnostic: Works with both browser Web Workers and Node.js worker_threads
Caveats
Sub-batch size and timeout values are workload-dependent and need tuning. Too small increases overhead, too large reduces memory efficiency. Guidelines: 1000-2000 for file parsers, 100-500 for image conversion.
Forgetting to implement the worker protocol (sub-batch / flush / result handling) causes deadlocks, so prepare templates in advance.
Applications
- Node.js mass file parsing (TypeScript, Markdown, JSON, etc.)
- Browser batch image conversion (resize, format conversion)
- CPU-heavy batch jobs (encryption, compression, data aggregation)
- Large dataset preprocessing pipelines
hsb.horse