logo hsb.horse
← Voltar para o índice de snippets

Snippets

Pool de Workers com Sub-lotes e Timeout por Sub-lote

Um padrão que divide grandes conjuntos de dados em pequenos sub-lotes enviados aos workers, reduzindo pressão de memória enquanto define timeouts por lote para detecção precoce de anomalias.

Publicado: Atualizado:

Ao processar grandes conjuntos de dados em workers, enviar todos os dados de uma vez causa pressão de memória pelos custos de structured clone. Dividir em sub-lotes com timeouts por lote equilibra eficiência de memória e detecção de erros.

Código

/**
* Configuração do pool de workers
*/
export interface WorkerPoolOptions {
/** URL do script worker */
workerUrl: string | URL
/** Tamanho do pool (padrão baseado em núcleos de CPU) */
poolSize?: number
/** Máximo de itens por sub-lote (padrão: 1500) */
subBatchSize?: number
/** Timeout do sub-lote em milissegundos (padrão: 30000) */
subBatchTimeout?: number
}
/**
* Interface do pool de workers
*/
export interface WorkerPool<TInput, TResult> {
/** Despachar itens ao pool de workers para processamento */
dispatch(
items: TInput[],
onProgress?: (processed: number) => void
): Promise<TResult[]>
/** Terminar todos os workers */
terminate(): Promise<void>
/** Número de workers no pool */
readonly size: number
}
/**
* Protocolo worker: mensagem de sub-lote
*/
interface SubBatchMessage<T> {
type: 'sub-batch'
items: T[]
}
/**
* Protocolo worker: solicitação de flush
*/
interface FlushMessage {
type: 'flush'
}
/**
* Protocolo worker: resposta de sub-lote concluído
*/
interface SubBatchDoneMessage {
type: 'sub-batch-done'
}
/**
* Protocolo worker: relatório de progresso
*/
interface ProgressMessage {
type: 'progress'
processed: number
}
/**
* Protocolo worker: notificação de erro
*/
interface ErrorMessage {
type: 'error'
error: string
}
/**
* Protocolo worker: resultado final
*/
interface ResultMessage<T> {
type: 'result'
data: T
}
type WorkerMessage<T> =
| SubBatchDoneMessage
| ProgressMessage
| ErrorMessage
| ResultMessage<T>
/**
* Criar um pool de workers
*/
export const createWorkerPool = <TInput, TResult>(
options: WorkerPoolOptions
): WorkerPool<TInput, TResult> => {
const {
workerUrl,
poolSize,
subBatchSize = 1500,
subBatchTimeout = 30000,
} = options
// Determinar tamanho do pool (compatível browser/Node.js)
const size = poolSize ?? getDefaultPoolSize()
const workers: Worker[] = []
// Criar workers
for (let i = 0; i < size; i++) {
workers.push(createWorker(workerUrl))
}
const dispatch = (
items: TInput[],
onProgress?: (processed: number) => void
): Promise<TResult[]> => {
if (items.length === 0) return Promise.resolve([])
// Dividir itens pelo número de workers
const chunkSize = Math.ceil(items.length / size)
const chunks: TInput[][] = []
for (let i = 0; i < items.length; i += chunkSize) {
chunks.push(items.slice(i, i + chunkSize))
}
// Contadores de progresso por worker
const workerProgress = new Array(chunks.length).fill(0)
// Despachar chunks aos workers
const promises = chunks.map((chunk, workerIndex) => {
const worker = workers[workerIndex]
return processChunk<TInput, TResult>(
worker,
chunk,
workerIndex,
subBatchSize,
subBatchTimeout,
(processed) => {
workerProgress[workerIndex] = processed
if (onProgress) {
const total = workerProgress.reduce((a, b) => a + b, 0)
onProgress(total)
}
}
)
})
return Promise.all(promises)
}
const terminate = async (): Promise<void> => {
await Promise.all(workers.map((w) => w.terminate()))
workers.length = 0
}
return { dispatch, terminate, size }
}
/**
* Processar chunk em um único worker (divisão de sub-lote)
*/
const processChunk = <TInput, TResult>(
worker: Worker,
chunk: TInput[],
workerIndex: number,
subBatchSize: number,
subBatchTimeout: number,
onProgress: (processed: number) => void
): Promise<TResult> => {
return new Promise<TResult>((resolve, reject) => {
let settled = false
let subBatchTimer: ReturnType<typeof setTimeout> | null = null
let subBatchIndex = 0
const cleanup = () => {
if (subBatchTimer) clearTimeout(subBatchTimer)
worker.removeEventListener('message', handler)
worker.removeEventListener('error', errorHandler)
}
const resetSubBatchTimer = () => {
if (subBatchTimer) clearTimeout(subBatchTimer)
subBatchTimer = setTimeout(() => {
if (!settled) {
settled = true
cleanup()
reject(
new Error(
`Worker ${workerIndex} sub-batch timed out after ${
subBatchTimeout / 1000
}s (chunk: ${chunk.length} items)`
)
)
}
}, subBatchTimeout)
}
const sendNextSubBatch = () => {
const start = subBatchIndex * subBatchSize
if (start >= chunk.length) {
// Todos os sub-lotes enviados, solicitar flush
worker.postMessage({ type: 'flush' } as FlushMessage)
return
}
const subBatch = chunk.slice(start, start + subBatchSize)
subBatchIndex++
resetSubBatchTimer()
worker.postMessage({
type: 'sub-batch',
items: subBatch,
} as SubBatchMessage<TInput>)
}
const handler = (event: MessageEvent<WorkerMessage<TResult>>) => {
if (settled) return
const msg = event.data
if (msg.type === 'progress') {
onProgress(msg.processed)
} else if (msg.type === 'sub-batch-done') {
sendNextSubBatch()
} else if (msg.type === 'error') {
settled = true
cleanup()
reject(new Error(`Worker ${workerIndex} error: ${msg.error}`))
} else if (msg.type === 'result') {
settled = true
cleanup()
resolve(msg.data)
}
}
const errorHandler = (err: ErrorEvent) => {
if (!settled) {
settled = true
cleanup()
reject(err.error || err)
}
}
worker.addEventListener('message', handler)
worker.addEventListener('error', errorHandler)
sendNextSubBatch()
})
}
/**
* Criar um worker (compatível browser/Node.js)
*/
const createWorker = (workerUrl: string | URL): Worker => {
if (typeof Worker !== 'undefined') {
// Ambiente de navegador
return new Worker(workerUrl, { type: 'module' })
}
// Ambiente Node.js
// @ts-expect-error - Node.js Worker é require('node:worker_threads').Worker
const { Worker: NodeWorker } = require('node:worker_threads')
return new NodeWorker(workerUrl)
}
/**
* Obter tamanho padrão do pool
*/
const getDefaultPoolSize = (): number => {
if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) {
// Ambiente de navegador
return Math.min(8, Math.max(1, navigator.hardwareConcurrency - 1))
}
// Ambiente Node.js
try {
// @ts-expect-error - Node.js os module
const os = require('node:os')
return Math.min(8, Math.max(1, os.cpus().length - 1))
} catch {
return 4 // Fallback
}
}

Uso

Thread Principal

// Criar pool de workers
const pool = createWorkerPool<string, ParsedFile>({
workerUrl: new URL('./parser.worker.js', import.meta.url),
poolSize: 4,
subBatchSize: 1000,
subBatchTimeout: 20000,
})
// Processar lista massiva de arquivos
const files = ['file1.txt', 'file2.txt', /* ... 10000 files */]
const results = await pool.dispatch(files, (processed) => {
console.log(`Processed: ${processed}/${files.length}`)
})
// Terminar
await pool.terminate()

Lado do Worker (parser.worker.ts)

// Buffer de acumulação por sub-lote
let accumulated: ParsedFile[] = []
let totalProcessed = 0
self.addEventListener('message', async (event) => {
const msg = event.data
if (msg.type === 'sub-batch') {
// Processar sub-lote
for (const filePath of msg.items) {
const parsed = await parseFile(filePath)
accumulated.push(parsed)
totalProcessed++
}
// Relatar progresso
self.postMessage({ type: 'progress', processed: totalProcessed })
// Notificar conclusão do sub-lote
self.postMessage({ type: 'sub-batch-done' })
} else if (msg.type === 'flush') {
// Retornar resultado final
self.postMessage({ type: 'result', data: accumulated })
accumulated = []
totalProcessed = 0
}
})
const parseFile = async (path: string): Promise<ParsedFile> => {
// Lógica de parsing real
return { path, content: '...' }
}

Como Funciona

  1. Divisão de 3 níveis: O processamento é dividido em todos os itens → chunks de workers → sub-lotes
  2. Envio de sub-lotes: Enviar apenas subBatchSize itens a cada worker por vez, depois enviar o próximo após conclusão
  3. Gerenciamento de timeout: Definir timers independentes por sub-lote para detectar anomalias como arquivos enormes cedo
  4. Agregação de progresso: Contar itens processados por worker e agregar progresso total em tempo real
  5. Flush de resultados: Após enviar todos os sub-lotes, enviar mensagem flush para solicitar resultados finais dos workers

Enviar em sub-lotes mantém o pico de memória de structured clone em subBatchSize × tamanho médio em vez de total de itens × tamanho médio.

Benefícios

  • Eficiência de memória: Evita pressão de memória de structured clone enviando dados em sub-lotes em vez de tudo de uma vez
  • Detecção precoce de erros: Timeouts por sub-lote detectam arquivos enormes ou loops infinitos cedo
  • Visualização de progresso: Agregação de progresso por worker melhora UX com feedback em tempo real
  • Agnóstico de ambiente: Funciona com Web Workers de navegador e worker_threads de Node.js

Ressalvas

Tamanho de sub-lote e valores de timeout dependem da carga de trabalho e precisam de ajuste. Muito pequeno aumenta overhead, muito grande reduz eficiência de memória. Diretrizes: 1000-2000 para parsers de arquivo, 100-500 para conversão de imagem.

Esquecer de implementar o protocolo worker (manipulação de sub-batch / flush / result) causa deadlocks, então prepare templates com antecedência.

Aplicações

  • Parsing massivo de arquivos Node.js (TypeScript, Markdown, JSON, etc.)
  • Conversão em lote de imagens no navegador (redimensionamento, conversão de formato)
  • Jobs em lote intensivos em CPU (criptografia, compressão, agregação de dados)
  • Pipelines de pré-processamento de grandes conjuntos de dados