Ao processar grandes conjuntos de dados em workers, enviar todos os dados de uma vez causa pressão de memória pelos custos de structured clone. Dividir em sub-lotes com timeouts por lote equilibra eficiência de memória e detecção de erros.
Código
/** * Configuração do pool de workers */export interface WorkerPoolOptions { /** URL do script worker */ workerUrl: string | URL /** Tamanho do pool (padrão baseado em núcleos de CPU) */ poolSize?: number /** Máximo de itens por sub-lote (padrão: 1500) */ subBatchSize?: number /** Timeout do sub-lote em milissegundos (padrão: 30000) */ subBatchTimeout?: number}
/** * Interface do pool de workers */export interface WorkerPool<TInput, TResult> { /** Despachar itens ao pool de workers para processamento */ dispatch( items: TInput[], onProgress?: (processed: number) => void ): Promise<TResult[]> /** Terminar todos os workers */ terminate(): Promise<void> /** Número de workers no pool */ readonly size: number}
/** * Protocolo worker: mensagem de sub-lote */interface SubBatchMessage<T> { type: 'sub-batch' items: T[]}
/** * Protocolo worker: solicitação de flush */interface FlushMessage { type: 'flush'}
/** * Protocolo worker: resposta de sub-lote concluído */interface SubBatchDoneMessage { type: 'sub-batch-done'}
/** * Protocolo worker: relatório de progresso */interface ProgressMessage { type: 'progress' processed: number}
/** * Protocolo worker: notificação de erro */interface ErrorMessage { type: 'error' error: string}
/** * Protocolo worker: resultado final */interface ResultMessage<T> { type: 'result' data: T}
type WorkerMessage<T> = | SubBatchDoneMessage | ProgressMessage | ErrorMessage | ResultMessage<T>
/** * Criar um pool de workers */export const createWorkerPool = <TInput, TResult>( options: WorkerPoolOptions): WorkerPool<TInput, TResult> => { const { workerUrl, poolSize, subBatchSize = 1500, subBatchTimeout = 30000, } = options
// Determinar tamanho do pool (compatível browser/Node.js) const size = poolSize ?? getDefaultPoolSize() const workers: Worker[] = []
// Criar workers for (let i = 0; i < size; i++) { workers.push(createWorker(workerUrl)) }
const dispatch = ( items: TInput[], onProgress?: (processed: number) => void ): Promise<TResult[]> => { if (items.length === 0) return Promise.resolve([])
// Dividir itens pelo número de workers const chunkSize = Math.ceil(items.length / size) const chunks: TInput[][] = [] for (let i = 0; i < items.length; i += chunkSize) { chunks.push(items.slice(i, i + chunkSize)) }
// Contadores de progresso por worker const workerProgress = new Array(chunks.length).fill(0)
// Despachar chunks aos workers const promises = chunks.map((chunk, workerIndex) => { const worker = workers[workerIndex] return processChunk<TInput, TResult>( worker, chunk, workerIndex, subBatchSize, subBatchTimeout, (processed) => { workerProgress[workerIndex] = processed if (onProgress) { const total = workerProgress.reduce((a, b) => a + b, 0) onProgress(total) } } ) })
return Promise.all(promises) }
const terminate = async (): Promise<void> => { await Promise.all(workers.map((w) => w.terminate())) workers.length = 0 }
return { dispatch, terminate, size }}
/** * Processar chunk em um único worker (divisão de sub-lote) */const processChunk = <TInput, TResult>( worker: Worker, chunk: TInput[], workerIndex: number, subBatchSize: number, subBatchTimeout: number, onProgress: (processed: number) => void): Promise<TResult> => { return new Promise<TResult>((resolve, reject) => { let settled = false let subBatchTimer: ReturnType<typeof setTimeout> | null = null let subBatchIndex = 0
const cleanup = () => { if (subBatchTimer) clearTimeout(subBatchTimer) worker.removeEventListener('message', handler) worker.removeEventListener('error', errorHandler) }
const resetSubBatchTimer = () => { if (subBatchTimer) clearTimeout(subBatchTimer) subBatchTimer = setTimeout(() => { if (!settled) { settled = true cleanup() reject( new Error( `Worker ${workerIndex} sub-batch timed out after ${ subBatchTimeout / 1000 }s (chunk: ${chunk.length} items)` ) ) } }, subBatchTimeout) }
const sendNextSubBatch = () => { const start = subBatchIndex * subBatchSize if (start >= chunk.length) { // Todos os sub-lotes enviados, solicitar flush worker.postMessage({ type: 'flush' } as FlushMessage) return } const subBatch = chunk.slice(start, start + subBatchSize) subBatchIndex++ resetSubBatchTimer() worker.postMessage({ type: 'sub-batch', items: subBatch, } as SubBatchMessage<TInput>) }
const handler = (event: MessageEvent<WorkerMessage<TResult>>) => { if (settled) return const msg = event.data
if (msg.type === 'progress') { onProgress(msg.processed) } else if (msg.type === 'sub-batch-done') { sendNextSubBatch() } else if (msg.type === 'error') { settled = true cleanup() reject(new Error(`Worker ${workerIndex} error: ${msg.error}`)) } else if (msg.type === 'result') { settled = true cleanup() resolve(msg.data) } }
const errorHandler = (err: ErrorEvent) => { if (!settled) { settled = true cleanup() reject(err.error || err) } }
worker.addEventListener('message', handler) worker.addEventListener('error', errorHandler) sendNextSubBatch() })}
/** * Criar um worker (compatível browser/Node.js) */const createWorker = (workerUrl: string | URL): Worker => { if (typeof Worker !== 'undefined') { // Ambiente de navegador return new Worker(workerUrl, { type: 'module' }) } // Ambiente Node.js // @ts-expect-error - Node.js Worker é require('node:worker_threads').Worker const { Worker: NodeWorker } = require('node:worker_threads') return new NodeWorker(workerUrl)}
/** * Obter tamanho padrão do pool */const getDefaultPoolSize = (): number => { if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) { // Ambiente de navegador return Math.min(8, Math.max(1, navigator.hardwareConcurrency - 1)) } // Ambiente Node.js try { // @ts-expect-error - Node.js os module const os = require('node:os') return Math.min(8, Math.max(1, os.cpus().length - 1)) } catch { return 4 // Fallback }}Uso
Thread Principal
// Criar pool de workersconst pool = createWorkerPool<string, ParsedFile>({ workerUrl: new URL('./parser.worker.js', import.meta.url), poolSize: 4, subBatchSize: 1000, subBatchTimeout: 20000,})
// Processar lista massiva de arquivosconst files = ['file1.txt', 'file2.txt', /* ... 10000 files */]const results = await pool.dispatch(files, (processed) => { console.log(`Processed: ${processed}/${files.length}`)})
// Terminarawait pool.terminate()Lado do Worker (parser.worker.ts)
// Buffer de acumulação por sub-lotelet accumulated: ParsedFile[] = []let totalProcessed = 0
self.addEventListener('message', async (event) => { const msg = event.data
if (msg.type === 'sub-batch') { // Processar sub-lote for (const filePath of msg.items) { const parsed = await parseFile(filePath) accumulated.push(parsed) totalProcessed++ }
// Relatar progresso self.postMessage({ type: 'progress', processed: totalProcessed })
// Notificar conclusão do sub-lote self.postMessage({ type: 'sub-batch-done' }) } else if (msg.type === 'flush') { // Retornar resultado final self.postMessage({ type: 'result', data: accumulated }) accumulated = [] totalProcessed = 0 }})
const parseFile = async (path: string): Promise<ParsedFile> => { // Lógica de parsing real return { path, content: '...' }}Como Funciona
- Divisão de 3 níveis: O processamento é dividido em
todos os itens → chunks de workers → sub-lotes - Envio de sub-lotes: Enviar apenas
subBatchSizeitens a cada worker por vez, depois enviar o próximo após conclusão - Gerenciamento de timeout: Definir timers independentes por sub-lote para detectar anomalias como arquivos enormes cedo
- Agregação de progresso: Contar itens processados por worker e agregar progresso total em tempo real
- Flush de resultados: Após enviar todos os sub-lotes, enviar mensagem
flushpara solicitar resultados finais dos workers
Enviar em sub-lotes mantém o pico de memória de structured clone em subBatchSize × tamanho médio em vez de total de itens × tamanho médio.
Benefícios
- Eficiência de memória: Evita pressão de memória de structured clone enviando dados em sub-lotes em vez de tudo de uma vez
- Detecção precoce de erros: Timeouts por sub-lote detectam arquivos enormes ou loops infinitos cedo
- Visualização de progresso: Agregação de progresso por worker melhora UX com feedback em tempo real
- Agnóstico de ambiente: Funciona com Web Workers de navegador e worker_threads de Node.js
Ressalvas
Tamanho de sub-lote e valores de timeout dependem da carga de trabalho e precisam de ajuste. Muito pequeno aumenta overhead, muito grande reduz eficiência de memória. Diretrizes: 1000-2000 para parsers de arquivo, 100-500 para conversão de imagem.
Esquecer de implementar o protocolo worker (manipulação de sub-batch / flush / result) causa deadlocks, então prepare templates com antecedência.
Aplicações
- Parsing massivo de arquivos Node.js (TypeScript, Markdown, JSON, etc.)
- Conversão em lote de imagens no navegador (redimensionamento, conversão de formato)
- Jobs em lote intensivos em CPU (criptografia, compressão, agregação de dados)
- Pipelines de pré-processamento de grandes conjuntos de dados
hsb.horse