logo hsb.horse
← Back to snippets index

Snippets

Worker Pool with Sub-batches and Per-sub-batch Timeout

A pattern that splits large datasets into small sub-batches before sending them to workers, reducing structured-clone memory pressure while applying a timeout to each sub-batch so anomalies are detected early.

Published: Updated:

When processing massive datasets in workers, sending all data at once causes memory pressure from structured clone costs. Splitting into sub-batches with per-batch timeouts balances memory efficiency and error detection.

Code

/**
 * Configuration for a worker pool.
 */
export interface WorkerPoolOptions {
  /** URL of the worker script (must implement the sub-batch/flush protocol). */
  workerUrl: string | URL
  /** Number of workers; defaults to a CPU-core-derived value when omitted. */
  poolSize?: number
  /** Maximum items sent per sub-batch message (default: 1500). */
  subBatchSize?: number
  /** Deadline per sub-batch in milliseconds (default: 30000). */
  subBatchTimeout?: number
}
/**
 * Handle to a running worker pool.
 */
export interface WorkerPool<TInput, TResult> {
  /**
   * Fan `items` out across the pool in sub-batches.
   * Resolves with one result per worker (the pool awaits all workers).
   * `onProgress` receives the total processed count aggregated across workers.
   */
  dispatch(
    items: TInput[],
    onProgress?: (processed: number) => void
  ): Promise<TResult[]>
  /** Terminate all workers; the pool cannot be reused afterwards. */
  terminate(): Promise<void>
  /** Number of workers in pool */
  readonly size: number
}
/** Main → worker: one sub-batch of items to process. */
interface SubBatchMessage<T> {
  type: 'sub-batch'
  items: T[]
}
/** Main → worker: all sub-batches sent; return the final result. */
interface FlushMessage {
  type: 'flush'
}
/** Worker → main: sub-batch finished; the next one may be sent. */
interface SubBatchDoneMessage {
  type: 'sub-batch-done'
}
/** Worker → main: cumulative count of items processed so far by this worker. */
interface ProgressMessage {
  type: 'progress'
  processed: number
}
/** Worker → main: processing failed; `error` is a human-readable description. */
interface ErrorMessage {
  type: 'error'
  error: string
}
/** Worker → main: final aggregated result for the worker's whole chunk. */
interface ResultMessage<T> {
  type: 'result'
  data: T
}
/** Every message a worker can send, discriminated on `type`. */
type WorkerMessage<T> =
  | SubBatchDoneMessage
  | ProgressMessage
  | ErrorMessage
  | ResultMessage<T>
/**
 * Create a worker pool that streams items to workers in sub-batches.
 *
 * @param options - Pool configuration (worker script URL, pool size, sub-batch tuning).
 * @returns A pool whose `dispatch` splits items into per-worker chunks.
 * @throws RangeError when `poolSize`, `subBatchSize`, or `subBatchTimeout`
 *   is not a positive value (a zero sub-batch size would otherwise make the
 *   sub-batch loop spin without ever making progress).
 */
export const createWorkerPool = <TInput, TResult>(
  options: WorkerPoolOptions
): WorkerPool<TInput, TResult> => {
  const {
    workerUrl,
    poolSize,
    subBatchSize = 1500,
    subBatchTimeout = 30000,
  } = options
  // Fail fast on nonsensical tuning values instead of hanging later.
  if (poolSize !== undefined && (!Number.isInteger(poolSize) || poolSize < 1)) {
    throw new RangeError(`poolSize must be a positive integer, got ${poolSize}`)
  }
  if (!Number.isInteger(subBatchSize) || subBatchSize < 1) {
    throw new RangeError(`subBatchSize must be a positive integer, got ${subBatchSize}`)
  }
  if (!(subBatchTimeout > 0)) {
    throw new RangeError(`subBatchTimeout must be > 0 ms, got ${subBatchTimeout}`)
  }
  // Determine pool size (browser/Node.js compatible)
  const size = poolSize ?? getDefaultPoolSize()
  const workers: Worker[] = []
  // Create workers eagerly; they idle until dispatched to.
  for (let i = 0; i < size; i++) {
    workers.push(createWorker(workerUrl))
  }
  const dispatch = (
    items: TInput[],
    onProgress?: (processed: number) => void
  ): Promise<TResult[]> => {
    if (items.length === 0) return Promise.resolve([])
    // terminate() empties `workers`; fail loudly instead of crashing inside
    // processChunk with an undefined worker.
    if (workers.length === 0) {
      return Promise.reject(new Error('Worker pool has been terminated'))
    }
    // Split items into one contiguous chunk per worker.
    const chunkSize = Math.ceil(items.length / size)
    const chunks: TInput[][] = []
    for (let i = 0; i < items.length; i += chunkSize) {
      chunks.push(items.slice(i, i + chunkSize))
    }
    // Per-worker cumulative counters, summed to produce the total progress.
    const workerProgress = new Array(chunks.length).fill(0)
    const promises = chunks.map((chunk, workerIndex) => {
      const worker = workers[workerIndex]
      return processChunk<TInput, TResult>(
        worker,
        chunk,
        workerIndex,
        subBatchSize,
        subBatchTimeout,
        (processed) => {
          // Workers report cumulative counts, so store-then-sum is correct.
          workerProgress[workerIndex] = processed
          if (onProgress) {
            onProgress(workerProgress.reduce((a, b) => a + b, 0))
          }
        }
      )
    })
    // NOTE(review): overlapping dispatch() calls would share workers and
    // interleave protocol messages — confirm callers dispatch sequentially.
    return Promise.all(promises)
  }
  /** Terminate all workers; subsequent dispatch() calls reject. */
  const terminate = async (): Promise<void> => {
    await Promise.all(workers.map((w) => w.terminate()))
    workers.length = 0 // empties the array so dispatch() can detect termination
  }
  return { dispatch, terminate, size }
}
/**
 * Process one worker's chunk by streaming it in sub-batches.
 *
 * Protocol: post a 'sub-batch', wait for 'sub-batch-done', repeat; once the
 * chunk is exhausted, post 'flush' and resolve on the worker's 'result'.
 *
 * @param worker - Target worker (must implement the sub-batch protocol).
 * @param workerIndex - Used only to label error messages.
 * @param subBatchSize - Max items posted per message.
 * @param subBatchTimeout - Milliseconds allowed per sub-batch and for the final flush.
 * @param onProgress - Receives the worker's cumulative processed count.
 * @returns The worker's aggregated result for the whole chunk.
 */
const processChunk = <TInput, TResult>(
  worker: Worker,
  chunk: TInput[],
  workerIndex: number,
  subBatchSize: number,
  subBatchTimeout: number,
  onProgress: (processed: number) => void
): Promise<TResult> => {
  return new Promise<TResult>((resolve, reject) => {
    let settled = false
    let subBatchTimer: ReturnType<typeof setTimeout> | null = null
    let subBatchIndex = 0
    const cleanup = () => {
      if (subBatchTimer) clearTimeout(subBatchTimer)
      // NOTE(review): Node's worker_threads Worker is an EventEmitter without
      // add/removeEventListener — verify the Node path of createWorker actually
      // satisfies this interface before relying on it.
      worker.removeEventListener('message', handler)
      worker.removeEventListener('error', errorHandler)
    }
    const resetSubBatchTimer = () => {
      if (subBatchTimer) clearTimeout(subBatchTimer)
      subBatchTimer = setTimeout(() => {
        if (!settled) {
          settled = true
          cleanup()
          reject(
            new Error(
              `Worker ${workerIndex} sub-batch timed out after ${
                subBatchTimeout / 1000
              }s (chunk: ${chunk.length} items)`
            )
          )
        }
      }, subBatchTimeout)
    }
    const sendNextSubBatch = () => {
      const start = subBatchIndex * subBatchSize
      if (start >= chunk.length) {
        // All sub-batches sent — request the final result. Restart the timer so
        // the flush/aggregation phase gets a full timeout of its own. (Fix: the
        // original left the last sub-batch's timer running here, so a slow flush
        // could spuriously time out after only the leftover remainder of that
        // deadline.)
        resetSubBatchTimer()
        worker.postMessage({ type: 'flush' } as FlushMessage)
        return
      }
      const subBatch = chunk.slice(start, start + subBatchSize)
      subBatchIndex++
      resetSubBatchTimer() // each sub-batch gets its own independent deadline
      worker.postMessage({
        type: 'sub-batch',
        items: subBatch,
      } as SubBatchMessage<TInput>)
    }
    const handler = (event: MessageEvent<WorkerMessage<TResult>>) => {
      if (settled) return
      const msg = event.data
      if (msg.type === 'progress') {
        onProgress(msg.processed)
      } else if (msg.type === 'sub-batch-done') {
        sendNextSubBatch()
      } else if (msg.type === 'error') {
        settled = true
        cleanup()
        reject(new Error(`Worker ${workerIndex} error: ${msg.error}`))
      } else if (msg.type === 'result') {
        settled = true
        cleanup()
        resolve(msg.data)
      }
    }
    // Hard failures: script load error or uncaught exception in the worker.
    const errorHandler = (err: ErrorEvent) => {
      if (!settled) {
        settled = true
        cleanup()
        reject(err.error || err)
      }
    }
    worker.addEventListener('message', handler)
    worker.addEventListener('error', errorHandler)
    sendNextSubBatch() // kick off the first sub-batch (or flush, if chunk is empty)
  })
}
/**
 * Create a worker (browser/Node.js compatible).
 *
 * NOTE(review): this file uses ES module syntax, where `require` is not
 * defined at runtime — the Node fallback likely throws under ESM; confirm the
 * build emits CommonJS, or switch to `createRequire`/dynamic import.
 * NOTE(review): Node's worker_threads Worker is not API-compatible with the
 * Web Worker this function claims to return (EventEmitter `.on` vs
 * addEventListener, raw data vs MessageEvent) — verify the Node path works
 * with processChunk end to end.
 */
const createWorker = (workerUrl: string | URL): Worker => {
  if (typeof Worker !== 'undefined') {
    // Browser environment: module worker, so the script may use import/export.
    return new Worker(workerUrl, { type: 'module' })
  }
  // Node.js environment
  // @ts-expect-error - Node.js Worker is require('node:worker_threads').Worker
  const { Worker: NodeWorker } = require('node:worker_threads')
  return new NodeWorker(workerUrl)
}
/**
 * Pick a default worker count: one less than the reported CPU core count,
 * clamped to the range [1, 8]. Falls back to 4 when no core count is
 * available.
 */
const getDefaultPoolSize = (): number => {
  const clamp = (cores: number): number => Math.min(8, Math.max(1, cores - 1))
  // Browsers expose the core count on navigator.
  if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) {
    return clamp(navigator.hardwareConcurrency)
  }
  // Node.js: ask the os module for the CPU list.
  try {
    // @ts-expect-error - Node.js os module
    const os = require('node:os')
    return clamp(os.cpus().length)
  } catch {
    // Unknown environment — use a modest fixed pool.
    return 4
  }
}

Usage

Main Thread

// Create the pool. TResult is ParsedFile[] because each worker returns the
// array it accumulated for its whole chunk. (Fix: the original used
// createWorkerPool<string, ParsedFile>, which does not match the worker's
// 'result' payload.)
const pool = createWorkerPool<string, ParsedFile[]>({
  workerUrl: new URL('./parser.worker.js', import.meta.url),
  poolSize: 4,
  subBatchSize: 1000,
  subBatchTimeout: 20000,
})
// Process a massive file list; `processed` is aggregated across all workers.
const files = ['file1.txt', 'file2.txt', /* ... 10000 files */]
const perWorker = await pool.dispatch(files, (processed) => {
  console.log(`Processed: ${processed}/${files.length}`)
})
// dispatch resolves with one array per worker — flatten into a single list.
const results = perWorker.flat()
// Release the workers.
await pool.terminate()

Worker Side (parser.worker.ts)

// Accumulation buffer: results from every sub-batch of the chunk are kept
// worker-side and returned in one message at flush time.
// NOTE(review): this means the final 'result' message structured-clones the
// entire chunk's output at once — confirm that is acceptable for your result
// sizes.
let accumulated: ParsedFile[] = []
let totalProcessed = 0
self.addEventListener('message', async (event) => {
  const msg = event.data
  if (msg.type === 'sub-batch') {
    // Process every item in this sub-batch sequentially.
    for (const filePath of msg.items) {
      const parsed = await parseFile(filePath)
      accumulated.push(parsed)
      totalProcessed++
    }
    // Report this worker's cumulative processed count.
    self.postMessage({ type: 'progress', processed: totalProcessed })
    // Tell the pool it may send the next sub-batch.
    self.postMessage({ type: 'sub-batch-done' })
  } else if (msg.type === 'flush') {
    // Return the accumulated result, then reset state so the worker can be
    // reused for another dispatch.
    self.postMessage({ type: 'result', data: accumulated })
    accumulated = []
    totalProcessed = 0
  }
})
// Placeholder for the real parsing logic.
const parseFile = async (path: string): Promise<ParsedFile> => {
  return { path, content: '...' }
}

How It Works

  1. 3-level splitting: Process is divided into all items → worker chunks → sub-batches
  2. Sub-batch sending: Send only subBatchSize items to each worker at once, then send next after completion
  3. Timeout management: Set independent timers per sub-batch to detect anomalies like huge files early
  4. Progress aggregation: Count processed items per worker and aggregate total progress in real-time
  5. Result flushing: After all sub-batches are sent, send a flush message to request final results from workers

Sending in sub-batches keeps structured clone memory peak at subBatchSize × avg size instead of total items × avg size.

Benefits

  • Memory Efficiency: Avoids structured clone memory pressure by sending data in sub-batches instead of all at once
  • Early Error Detection: Per-sub-batch timeouts catch huge files or infinite loops early
  • Progress Visualization: Aggregating per-worker progress improves UX with real-time feedback
  • Environment Agnostic: Works with both browser Web Workers and Node.js worker_threads

Caveats

Sub-batch size and timeout values are workload-dependent and need tuning. Too small increases overhead, too large reduces memory efficiency. Guidelines: 1000-2000 for file parsers, 100-500 for image conversion.

Forgetting to implement the worker protocol (sub-batch / flush / result handling) causes deadlocks, so prepare templates in advance.

Applications

  • Node.js mass file parsing (TypeScript, Markdown, JSON, etc.)
  • Browser batch image conversion (resize, format conversion)
  • CPU-heavy batch jobs (encryption, compression, data aggregation)
  • Large dataset preprocessing pipelines