Bei der Verarbeitung massiver Datensätze in Workern verursacht das Senden aller Daten auf einmal Speicherdruck durch Structured-Clone-Kosten. Die Aufteilung in Sub-Batches mit Timeouts pro Batch gleicht Speichereffizienz und Fehlererkennung aus.
Code
/** * Worker-Pool-Konfiguration */export interface WorkerPoolOptions { /** Worker-Script-URL */ workerUrl: string | URL /** Pool-Größe (standardmäßig basierend auf CPU-Kernen) */ poolSize?: number /** Maximale Items pro Sub-Batch (Standard: 1500) */ subBatchSize?: number /** Sub-Batch-Timeout in Millisekunden (Standard: 30000) */ subBatchTimeout?: number}
/** * Worker-Pool-Interface */export interface WorkerPool<TInput, TResult> { /** Items an Worker-Pool zur Verarbeitung verteilen */ dispatch( items: TInput[], onProgress?: (processed: number) => void ): Promise<TResult[]> /** Alle Worker beenden */ terminate(): Promise<void> /** Anzahl der Worker im Pool */ readonly size: number}
/** * Worker-Protokoll: Sub-Batch-Nachricht */interface SubBatchMessage<T> { type: 'sub-batch' items: T[]}
/** * Worker-Protokoll: Flush-Anfrage */interface FlushMessage { type: 'flush'}
/** * Worker-Protokoll: Sub-Batch-Fertig-Antwort */interface SubBatchDoneMessage { type: 'sub-batch-done'}
/** * Worker-Protokoll: Fortschrittsbericht */interface ProgressMessage { type: 'progress' processed: number}
/** * Worker-Protokoll: Fehlerbenachrichtigung */interface ErrorMessage { type: 'error' error: string}
/** * Worker-Protokoll: Endergebnis */interface ResultMessage<T> { type: 'result' data: T}
type WorkerMessage<T> = | SubBatchDoneMessage | ProgressMessage | ErrorMessage | ResultMessage<T>
/** * Worker-Pool erstellen */export const createWorkerPool = <TInput, TResult>( options: WorkerPoolOptions): WorkerPool<TInput, TResult> => { const { workerUrl, poolSize, subBatchSize = 1500, subBatchTimeout = 30000, } = options
// Pool-Größe bestimmen (Browser/Node.js-kompatibel) const size = poolSize ?? getDefaultPoolSize() const workers: Worker[] = []
// Worker erstellen for (let i = 0; i < size; i++) { workers.push(createWorker(workerUrl)) }
const dispatch = ( items: TInput[], onProgress?: (processed: number) => void ): Promise<TResult[]> => { if (items.length === 0) return Promise.resolve([])
// Items nach Worker-Anzahl aufteilen const chunkSize = Math.ceil(items.length / size) const chunks: TInput[][] = [] for (let i = 0; i < items.length; i += chunkSize) { chunks.push(items.slice(i, i + chunkSize)) }
// Fortschrittszähler pro Worker const workerProgress = new Array(chunks.length).fill(0)
// Chunks an Worker verteilen const promises = chunks.map((chunk, workerIndex) => { const worker = workers[workerIndex] return processChunk<TInput, TResult>( worker, chunk, workerIndex, subBatchSize, subBatchTimeout, (processed) => { workerProgress[workerIndex] = processed if (onProgress) { const total = workerProgress.reduce((a, b) => a + b, 0) onProgress(total) } } ) })
return Promise.all(promises) }
const terminate = async (): Promise<void> => { await Promise.all(workers.map((w) => w.terminate())) workers.length = 0 }
return { dispatch, terminate, size }}
/** * Chunk in einem einzelnen Worker verarbeiten (Sub-Batch-Aufteilung) */const processChunk = <TInput, TResult>( worker: Worker, chunk: TInput[], workerIndex: number, subBatchSize: number, subBatchTimeout: number, onProgress: (processed: number) => void): Promise<TResult> => { return new Promise<TResult>((resolve, reject) => { let settled = false let subBatchTimer: ReturnType<typeof setTimeout> | null = null let subBatchIndex = 0
const cleanup = () => { if (subBatchTimer) clearTimeout(subBatchTimer) worker.removeEventListener('message', handler) worker.removeEventListener('error', errorHandler) }
const resetSubBatchTimer = () => { if (subBatchTimer) clearTimeout(subBatchTimer) subBatchTimer = setTimeout(() => { if (!settled) { settled = true cleanup() reject( new Error( `Worker ${workerIndex} sub-batch timed out after ${ subBatchTimeout / 1000 }s (chunk: ${chunk.length} items)` ) ) } }, subBatchTimeout) }
const sendNextSubBatch = () => { const start = subBatchIndex * subBatchSize if (start >= chunk.length) { // Alle Sub-Batches gesendet, Flush anfordern worker.postMessage({ type: 'flush' } as FlushMessage) return } const subBatch = chunk.slice(start, start + subBatchSize) subBatchIndex++ resetSubBatchTimer() worker.postMessage({ type: 'sub-batch', items: subBatch, } as SubBatchMessage<TInput>) }
const handler = (event: MessageEvent<WorkerMessage<TResult>>) => { if (settled) return const msg = event.data
if (msg.type === 'progress') { onProgress(msg.processed) } else if (msg.type === 'sub-batch-done') { sendNextSubBatch() } else if (msg.type === 'error') { settled = true cleanup() reject(new Error(`Worker ${workerIndex} error: ${msg.error}`)) } else if (msg.type === 'result') { settled = true cleanup() resolve(msg.data) } }
const errorHandler = (err: ErrorEvent) => { if (!settled) { settled = true cleanup() reject(err.error || err) } }
worker.addEventListener('message', handler) worker.addEventListener('error', errorHandler) sendNextSubBatch() })}
/** * Worker erstellen (Browser/Node.js-kompatibel) */const createWorker = (workerUrl: string | URL): Worker => { if (typeof Worker !== 'undefined') { // Browser-Umgebung return new Worker(workerUrl, { type: 'module' }) } // Node.js-Umgebung // @ts-expect-error - Node.js Worker ist require('node:worker_threads').Worker const { Worker: NodeWorker } = require('node:worker_threads') return new NodeWorker(workerUrl)}
/** * Standard-Pool-Größe abrufen */const getDefaultPoolSize = (): number => { if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) { // Browser-Umgebung return Math.min(8, Math.max(1, navigator.hardwareConcurrency - 1)) } // Node.js-Umgebung try { // @ts-expect-error - Node.js os module const os = require('node:os') return Math.min(8, Math.max(1, os.cpus().length - 1)) } catch { return 4 // Fallback }}Verwendung
Haupt-Thread
// Worker-Pool erstellenconst pool = createWorkerPool<string, ParsedFile>({ workerUrl: new URL('./parser.worker.js', import.meta.url), poolSize: 4, subBatchSize: 1000, subBatchTimeout: 20000,})
// Massive Dateiliste verarbeitenconst files = ['file1.txt', 'file2.txt', /* ... 10000 files */]const results = await pool.dispatch(files, (processed) => { console.log(`Processed: ${processed}/${files.length}`)})
// Beendenawait pool.terminate()Worker-Seite (parser.worker.ts)
// Akkumulationspuffer pro Sub-Batchlet accumulated: ParsedFile[] = []let totalProcessed = 0
self.addEventListener('message', async (event) => { const msg = event.data
if (msg.type === 'sub-batch') { // Sub-Batch verarbeiten for (const filePath of msg.items) { const parsed = await parseFile(filePath) accumulated.push(parsed) totalProcessed++ }
// Fortschritt melden self.postMessage({ type: 'progress', processed: totalProcessed })
// Sub-Batch-Abschluss benachrichtigen self.postMessage({ type: 'sub-batch-done' }) } else if (msg.type === 'flush') { // Endergebnis zurückgeben self.postMessage({ type: 'result', data: accumulated }) accumulated = [] totalProcessed = 0 }})
const parseFile = async (path: string): Promise<ParsedFile> => { // Tatsächliche Parsing-Logik return { path, content: '...' }}Funktionsweise
- 3-Ebenen-Aufteilung: Die Verarbeitung ist in
alle Items → Worker-Chunks → Sub-Batchesaufgeteilt - Sub-Batch-Versand: Senden Sie nur
subBatchSizeItems gleichzeitig an jeden Worker, dann das nächste nach Abschluss - Timeout-Verwaltung: Unabhängige Timer pro Sub-Batch setzen, um Anomalien wie riesige Dateien früh zu erkennen
- Fortschrittsaggregation: Verarbeitete Items pro Worker zählen und Gesamtfortschritt in Echtzeit aggregieren
- Ergebnis-Flush: Nach dem Senden aller Sub-Batches eine
flush-Nachricht senden, um Endergebnisse von Workern anzufordern
Das Senden in Sub-Batches hält den Structured-Clone-Speicherpeak bei subBatchSize × durchschnittliche Größe statt Gesamt-Items × durchschnittliche Größe.
Vorteile
- Speichereffizienz: Vermeidet Structured-Clone-Speicherdruck durch Senden von Daten in Sub-Batches statt alles auf einmal
- Frühe Fehlererkennung: Timeouts pro Sub-Batch erkennen riesige Dateien oder Endlosschleifen früh
- Fortschrittsvisualisierung: Aggregation des Fortschritts pro Worker verbessert UX mit Echtzeit-Feedback
- Umgebungsunabhängig: Funktioniert mit Browser-Web-Workern und Node.js-worker_threads
Einschränkungen
Sub-Batch-Größe und Timeout-Werte sind workload-abhängig und benötigen Feinabstimmung. Zu klein erhöht Overhead, zu groß reduziert Speichereffizienz. Richtlinien: 1000-2000 für Datei-Parser, 100-500 für Bildkonvertierung.
Das Vergessen der Worker-Protokoll-Implementierung (sub-batch / flush / result-Behandlung) verursacht Deadlocks, daher Vorlagen im Voraus vorbereiten.
Anwendungen
- Node.js-Massen-Datei-Parsing (TypeScript, Markdown, JSON usw.)
- Browser-Stapel-Bildkonvertierung (Größenänderung, Formatkonvertierung)
- CPU-intensive Batch-Jobs (Verschlüsselung, Kompression, Datenaggregation)
- Vorverarbeitungs-Pipelines für große Datensätze
hsb.horse