logo hsb.horse
← Zur Snippets-Übersicht

Snippets

Worker-Pool mit Sub-Batches und Per-Sub-Batch-Timeout

Ein Muster, das große Datensätze in kleine Sub-Batches aufteilt, die an Worker gesendet werden, um Speicherdruck zu reduzieren und gleichzeitig Timeouts pro Batch für frühe Anomalieerkennung festzulegen.

Veröffentlicht: Aktualisiert:

Bei der Verarbeitung massiver Datensätze in Workern verursacht das Senden aller Daten auf einmal Speicherdruck durch Structured-Clone-Kosten. Die Aufteilung in Sub-Batches mit Timeouts pro Batch gleicht Speichereffizienz und Fehlererkennung aus.

Code

/**
* Worker-Pool-Konfiguration
*/
export interface WorkerPoolOptions {
/** Worker-Script-URL */
workerUrl: string | URL
/** Pool-Größe (standardmäßig basierend auf CPU-Kernen) */
poolSize?: number
/** Maximale Items pro Sub-Batch (Standard: 1500) */
subBatchSize?: number
/** Sub-Batch-Timeout in Millisekunden (Standard: 30000) */
subBatchTimeout?: number
}
/**
* Worker-Pool-Interface
*/
export interface WorkerPool<TInput, TResult> {
/** Items an Worker-Pool zur Verarbeitung verteilen */
dispatch(
items: TInput[],
onProgress?: (processed: number) => void
): Promise<TResult[]>
/** Alle Worker beenden */
terminate(): Promise<void>
/** Anzahl der Worker im Pool */
readonly size: number
}
/**
* Worker-Protokoll: Sub-Batch-Nachricht
*/
interface SubBatchMessage<T> {
type: 'sub-batch'
items: T[]
}
/**
* Worker-Protokoll: Flush-Anfrage
*/
interface FlushMessage {
type: 'flush'
}
/**
* Worker-Protokoll: Sub-Batch-Fertig-Antwort
*/
interface SubBatchDoneMessage {
type: 'sub-batch-done'
}
/**
* Worker-Protokoll: Fortschrittsbericht
*/
interface ProgressMessage {
type: 'progress'
processed: number
}
/**
* Worker-Protokoll: Fehlerbenachrichtigung
*/
interface ErrorMessage {
type: 'error'
error: string
}
/**
* Worker-Protokoll: Endergebnis
*/
interface ResultMessage<T> {
type: 'result'
data: T
}
type WorkerMessage<T> =
| SubBatchDoneMessage
| ProgressMessage
| ErrorMessage
| ResultMessage<T>
/**
* Worker-Pool erstellen
*/
export const createWorkerPool = <TInput, TResult>(
options: WorkerPoolOptions
): WorkerPool<TInput, TResult> => {
const {
workerUrl,
poolSize,
subBatchSize = 1500,
subBatchTimeout = 30000,
} = options
// Pool-Größe bestimmen (Browser/Node.js-kompatibel)
const size = poolSize ?? getDefaultPoolSize()
const workers: Worker[] = []
// Worker erstellen
for (let i = 0; i < size; i++) {
workers.push(createWorker(workerUrl))
}
const dispatch = (
items: TInput[],
onProgress?: (processed: number) => void
): Promise<TResult[]> => {
if (items.length === 0) return Promise.resolve([])
// Items nach Worker-Anzahl aufteilen
const chunkSize = Math.ceil(items.length / size)
const chunks: TInput[][] = []
for (let i = 0; i < items.length; i += chunkSize) {
chunks.push(items.slice(i, i + chunkSize))
}
// Fortschrittszähler pro Worker
const workerProgress = new Array(chunks.length).fill(0)
// Chunks an Worker verteilen
const promises = chunks.map((chunk, workerIndex) => {
const worker = workers[workerIndex]
return processChunk<TInput, TResult>(
worker,
chunk,
workerIndex,
subBatchSize,
subBatchTimeout,
(processed) => {
workerProgress[workerIndex] = processed
if (onProgress) {
const total = workerProgress.reduce((a, b) => a + b, 0)
onProgress(total)
}
}
)
})
return Promise.all(promises)
}
const terminate = async (): Promise<void> => {
await Promise.all(workers.map((w) => w.terminate()))
workers.length = 0
}
return { dispatch, terminate, size }
}
/**
* Chunk in einem einzelnen Worker verarbeiten (Sub-Batch-Aufteilung)
*/
const processChunk = <TInput, TResult>(
worker: Worker,
chunk: TInput[],
workerIndex: number,
subBatchSize: number,
subBatchTimeout: number,
onProgress: (processed: number) => void
): Promise<TResult> => {
return new Promise<TResult>((resolve, reject) => {
let settled = false
let subBatchTimer: ReturnType<typeof setTimeout> | null = null
let subBatchIndex = 0
const cleanup = () => {
if (subBatchTimer) clearTimeout(subBatchTimer)
worker.removeEventListener('message', handler)
worker.removeEventListener('error', errorHandler)
}
const resetSubBatchTimer = () => {
if (subBatchTimer) clearTimeout(subBatchTimer)
subBatchTimer = setTimeout(() => {
if (!settled) {
settled = true
cleanup()
reject(
new Error(
`Worker ${workerIndex} sub-batch timed out after ${
subBatchTimeout / 1000
}s (chunk: ${chunk.length} items)`
)
)
}
}, subBatchTimeout)
}
const sendNextSubBatch = () => {
const start = subBatchIndex * subBatchSize
if (start >= chunk.length) {
// Alle Sub-Batches gesendet, Flush anfordern
worker.postMessage({ type: 'flush' } as FlushMessage)
return
}
const subBatch = chunk.slice(start, start + subBatchSize)
subBatchIndex++
resetSubBatchTimer()
worker.postMessage({
type: 'sub-batch',
items: subBatch,
} as SubBatchMessage<TInput>)
}
const handler = (event: MessageEvent<WorkerMessage<TResult>>) => {
if (settled) return
const msg = event.data
if (msg.type === 'progress') {
onProgress(msg.processed)
} else if (msg.type === 'sub-batch-done') {
sendNextSubBatch()
} else if (msg.type === 'error') {
settled = true
cleanup()
reject(new Error(`Worker ${workerIndex} error: ${msg.error}`))
} else if (msg.type === 'result') {
settled = true
cleanup()
resolve(msg.data)
}
}
const errorHandler = (err: ErrorEvent) => {
if (!settled) {
settled = true
cleanup()
reject(err.error || err)
}
}
worker.addEventListener('message', handler)
worker.addEventListener('error', errorHandler)
sendNextSubBatch()
})
}
/**
* Worker erstellen (Browser/Node.js-kompatibel)
*/
const createWorker = (workerUrl: string | URL): Worker => {
if (typeof Worker !== 'undefined') {
// Browser-Umgebung
return new Worker(workerUrl, { type: 'module' })
}
// Node.js-Umgebung
// @ts-expect-error - Node.js Worker ist require('node:worker_threads').Worker
const { Worker: NodeWorker } = require('node:worker_threads')
return new NodeWorker(workerUrl)
}
/**
* Standard-Pool-Größe abrufen
*/
const getDefaultPoolSize = (): number => {
if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) {
// Browser-Umgebung
return Math.min(8, Math.max(1, navigator.hardwareConcurrency - 1))
}
// Node.js-Umgebung
try {
// @ts-expect-error - Node.js os module
const os = require('node:os')
return Math.min(8, Math.max(1, os.cpus().length - 1))
} catch {
return 4 // Fallback
}
}

Verwendung

Haupt-Thread

// Worker-Pool erstellen
const pool = createWorkerPool<string, ParsedFile>({
workerUrl: new URL('./parser.worker.js', import.meta.url),
poolSize: 4,
subBatchSize: 1000,
subBatchTimeout: 20000,
})
// Massive Dateiliste verarbeiten
const files = ['file1.txt', 'file2.txt', /* ... 10000 files */]
const results = await pool.dispatch(files, (processed) => {
console.log(`Processed: ${processed}/${files.length}`)
})
// Beenden
await pool.terminate()

Worker-Seite (parser.worker.ts)

// Akkumulationspuffer pro Sub-Batch
let accumulated: ParsedFile[] = []
let totalProcessed = 0
self.addEventListener('message', async (event) => {
const msg = event.data
if (msg.type === 'sub-batch') {
// Sub-Batch verarbeiten
for (const filePath of msg.items) {
const parsed = await parseFile(filePath)
accumulated.push(parsed)
totalProcessed++
}
// Fortschritt melden
self.postMessage({ type: 'progress', processed: totalProcessed })
// Sub-Batch-Abschluss benachrichtigen
self.postMessage({ type: 'sub-batch-done' })
} else if (msg.type === 'flush') {
// Endergebnis zurückgeben
self.postMessage({ type: 'result', data: accumulated })
accumulated = []
totalProcessed = 0
}
})
const parseFile = async (path: string): Promise<ParsedFile> => {
// Tatsächliche Parsing-Logik
return { path, content: '...' }
}

Funktionsweise

  1. 3-Ebenen-Aufteilung: Die Verarbeitung ist in alle Items → Worker-Chunks → Sub-Batches aufgeteilt
  2. Sub-Batch-Versand: Senden Sie nur subBatchSize Items gleichzeitig an jeden Worker, dann das nächste nach Abschluss
  3. Timeout-Verwaltung: Unabhängige Timer pro Sub-Batch setzen, um Anomalien wie riesige Dateien früh zu erkennen
  4. Fortschrittsaggregation: Verarbeitete Items pro Worker zählen und Gesamtfortschritt in Echtzeit aggregieren
  5. Ergebnis-Flush: Nach dem Senden aller Sub-Batches eine flush-Nachricht senden, um Endergebnisse von Workern anzufordern

Das Senden in Sub-Batches hält den Structured-Clone-Speicherpeak bei subBatchSize × durchschnittliche Größe statt Gesamt-Items × durchschnittliche Größe.

Vorteile

  • Speichereffizienz: Vermeidet Structured-Clone-Speicherdruck durch Senden von Daten in Sub-Batches statt alles auf einmal
  • Frühe Fehlererkennung: Timeouts pro Sub-Batch erkennen riesige Dateien oder Endlosschleifen früh
  • Fortschrittsvisualisierung: Aggregation des Fortschritts pro Worker verbessert UX mit Echtzeit-Feedback
  • Umgebungsunabhängig: Funktioniert mit Browser-Web-Workern und Node.js-worker_threads

Einschränkungen

Sub-Batch-Größe und Timeout-Werte sind workload-abhängig und benötigen Feinabstimmung. Zu klein erhöht Overhead, zu groß reduziert Speichereffizienz. Richtlinien: 1000-2000 für Datei-Parser, 100-500 für Bildkonvertierung.

Das Vergessen der Worker-Protokoll-Implementierung (sub-batch / flush / result-Behandlung) verursacht Deadlocks, daher Vorlagen im Voraus vorbereiten.

Anwendungen

  • Node.js-Massen-Datei-Parsing (TypeScript, Markdown, JSON usw.)
  • Browser-Stapel-Bildkonvertierung (Größenänderung, Formatkonvertierung)
  • CPU-intensive Batch-Jobs (Verschlüsselung, Kompression, Datenaggregation)
  • Vorverarbeitungs-Pipelines für große Datensätze