Lors du traitement de grands ensembles de données dans des workers, l’envoi de toutes les données en une seule fois provoque une pression mémoire due aux coûts de structured clone. Diviser en sous-lots avec des timeouts par lot équilibre l’efficacité mémoire et la détection d’erreurs.
Code
/** * Configuration du pool de workers */export interface WorkerPoolOptions { /** URL du script worker */ workerUrl: string | URL /** Taille du pool (par défaut basée sur les cœurs CPU) */ poolSize?: number /** Maximum d'éléments par sous-lot (par défaut: 1500) */ subBatchSize?: number /** Timeout du sous-lot en millisecondes (par défaut: 30000) */ subBatchTimeout?: number}
/** * Interface du pool de workers */export interface WorkerPool<TInput, TResult> { /** Dispatcher les éléments au pool de workers pour traitement */ dispatch( items: TInput[], onProgress?: (processed: number) => void ): Promise<TResult[]> /** Terminer tous les workers */ terminate(): Promise<void> /** Nombre de workers dans le pool */ readonly size: number}
/** * Protocole worker: message de sous-lot */interface SubBatchMessage<T> { type: 'sub-batch' items: T[]}
/** * Protocole worker: demande de flush */interface FlushMessage { type: 'flush'}
/** * Protocole worker: réponse de sous-lot terminé */interface SubBatchDoneMessage { type: 'sub-batch-done'}
/** * Protocole worker: rapport de progression */interface ProgressMessage { type: 'progress' processed: number}
/** * Protocole worker: notification d'erreur */interface ErrorMessage { type: 'error' error: string}
/** * Protocole worker: résultat final */interface ResultMessage<T> { type: 'result' data: T}
type WorkerMessage<T> = | SubBatchDoneMessage | ProgressMessage | ErrorMessage | ResultMessage<T>
/** * Créer un pool de workers */export const createWorkerPool = <TInput, TResult>( options: WorkerPoolOptions): WorkerPool<TInput, TResult> => { const { workerUrl, poolSize, subBatchSize = 1500, subBatchTimeout = 30000, } = options
// Déterminer la taille du pool (compatible navigateur/Node.js) const size = poolSize ?? getDefaultPoolSize() const workers: Worker[] = []
// Créer les workers for (let i = 0; i < size; i++) { workers.push(createWorker(workerUrl)) }
const dispatch = ( items: TInput[], onProgress?: (processed: number) => void ): Promise<TResult[]> => { if (items.length === 0) return Promise.resolve([])
// Diviser les éléments par nombre de workers const chunkSize = Math.ceil(items.length / size) const chunks: TInput[][] = [] for (let i = 0; i < items.length; i += chunkSize) { chunks.push(items.slice(i, i + chunkSize)) }
// Compteurs de progression par worker const workerProgress = new Array(chunks.length).fill(0)
// Dispatcher les chunks aux workers const promises = chunks.map((chunk, workerIndex) => { const worker = workers[workerIndex] return processChunk<TInput, TResult>( worker, chunk, workerIndex, subBatchSize, subBatchTimeout, (processed) => { workerProgress[workerIndex] = processed if (onProgress) { const total = workerProgress.reduce((a, b) => a + b, 0) onProgress(total) } } ) })
return Promise.all(promises) }
const terminate = async (): Promise<void> => { await Promise.all(workers.map((w) => w.terminate())) workers.length = 0 }
return { dispatch, terminate, size }}
/** * Traiter un chunk dans un seul worker (division en sous-lots) */const processChunk = <TInput, TResult>( worker: Worker, chunk: TInput[], workerIndex: number, subBatchSize: number, subBatchTimeout: number, onProgress: (processed: number) => void): Promise<TResult> => { return new Promise<TResult>((resolve, reject) => { let settled = false let subBatchTimer: ReturnType<typeof setTimeout> | null = null let subBatchIndex = 0
const cleanup = () => { if (subBatchTimer) clearTimeout(subBatchTimer) worker.removeEventListener('message', handler) worker.removeEventListener('error', errorHandler) }
const resetSubBatchTimer = () => { if (subBatchTimer) clearTimeout(subBatchTimer) subBatchTimer = setTimeout(() => { if (!settled) { settled = true cleanup() reject( new Error( `Worker ${workerIndex} sub-batch timed out after ${ subBatchTimeout / 1000 }s (chunk: ${chunk.length} items)` ) ) } }, subBatchTimeout) }
const sendNextSubBatch = () => { const start = subBatchIndex * subBatchSize if (start >= chunk.length) { // Tous les sous-lots envoyés, demander le flush worker.postMessage({ type: 'flush' } as FlushMessage) return } const subBatch = chunk.slice(start, start + subBatchSize) subBatchIndex++ resetSubBatchTimer() worker.postMessage({ type: 'sub-batch', items: subBatch, } as SubBatchMessage<TInput>) }
const handler = (event: MessageEvent<WorkerMessage<TResult>>) => { if (settled) return const msg = event.data
if (msg.type === 'progress') { onProgress(msg.processed) } else if (msg.type === 'sub-batch-done') { sendNextSubBatch() } else if (msg.type === 'error') { settled = true cleanup() reject(new Error(`Worker ${workerIndex} error: ${msg.error}`)) } else if (msg.type === 'result') { settled = true cleanup() resolve(msg.data) } }
const errorHandler = (err: ErrorEvent) => { if (!settled) { settled = true cleanup() reject(err.error || err) } }
worker.addEventListener('message', handler) worker.addEventListener('error', errorHandler) sendNextSubBatch() })}
/** * Créer un worker (compatible navigateur/Node.js) */const createWorker = (workerUrl: string | URL): Worker => { if (typeof Worker !== 'undefined') { // Environnement navigateur return new Worker(workerUrl, { type: 'module' }) } // Environnement Node.js // @ts-expect-error - Node.js Worker est require('node:worker_threads').Worker const { Worker: NodeWorker } = require('node:worker_threads') return new NodeWorker(workerUrl)}
/** * Obtenir la taille par défaut du pool */const getDefaultPoolSize = (): number => { if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) { // Environnement navigateur return Math.min(8, Math.max(1, navigator.hardwareConcurrency - 1)) } // Environnement Node.js try { // @ts-expect-error - Node.js os module const os = require('node:os') return Math.min(8, Math.max(1, os.cpus().length - 1)) } catch { return 4 // Fallback }}Utilisation
Thread Principal
// Créer le pool de workersconst pool = createWorkerPool<string, ParsedFile>({ workerUrl: new URL('./parser.worker.js', import.meta.url), poolSize: 4, subBatchSize: 1000, subBatchTimeout: 20000,})
// Traiter une liste massive de fichiersconst files = ['file1.txt', 'file2.txt', /* ... 10000 files */]const results = await pool.dispatch(files, (processed) => { console.log(`Processed: ${processed}/${files.length}`)})
// Terminerawait pool.terminate()Côté Worker (parser.worker.ts)
// Tampon d'accumulation par sous-lotlet accumulated: ParsedFile[] = []let totalProcessed = 0
self.addEventListener('message', async (event) => { const msg = event.data
if (msg.type === 'sub-batch') { // Traiter le sous-lot for (const filePath of msg.items) { const parsed = await parseFile(filePath) accumulated.push(parsed) totalProcessed++ }
// Rapporter la progression self.postMessage({ type: 'progress', processed: totalProcessed })
// Notifier la fin du sous-lot self.postMessage({ type: 'sub-batch-done' }) } else if (msg.type === 'flush') { // Retourner le résultat final self.postMessage({ type: 'result', data: accumulated }) accumulated = [] totalProcessed = 0 }})
const parseFile = async (path: string): Promise<ParsedFile> => { // Logique de parsing réelle return { path, content: '...' }}Fonctionnement
- Division à 3 niveaux: Le traitement est divisé en
tous les éléments → chunks de workers → sous-lots - Envoi de sous-lots: Envoyer seulement
subBatchSizeéléments à chaque worker à la fois, puis envoyer le suivant après achèvement - Gestion des timeouts: Définir des timers indépendants par sous-lot pour détecter les anomalies comme les fichiers énormes tôt
- Agrégation de la progression: Compter les éléments traités par worker et agréger la progression totale en temps réel
- Flush des résultats: Après l’envoi de tous les sous-lots, envoyer un message
flushpour demander les résultats finaux aux workers
L’envoi en sous-lots maintient le pic mémoire de structured clone à subBatchSize × taille moyenne au lieu de total éléments × taille moyenne.
Avantages
- Efficacité mémoire: Évite la pression mémoire de structured clone en envoyant les données en sous-lots au lieu de tout en une fois
- Détection précoce d’erreurs: Les timeouts par sous-lot détectent tôt les fichiers énormes ou les boucles infinies
- Visualisation de la progression: L’agrégation de la progression par worker améliore l’UX avec des retours en temps réel
- Agnostique de l’environnement: Fonctionne avec les Web Workers du navigateur et worker_threads de Node.js
Mises en garde
La taille des sous-lots et les valeurs de timeout dépendent de la charge de travail et nécessitent un réglage. Trop petit augmente l’overhead, trop grand réduit l’efficacité mémoire. Directives: 1000-2000 pour les parseurs de fichiers, 100-500 pour la conversion d’images.
Oublier d’implémenter le protocole worker (gestion de sub-batch / flush / result) provoque des deadlocks, donc préparer des templates à l’avance.
Applications
- Parsing massif de fichiers Node.js (TypeScript, Markdown, JSON, etc.)
- Conversion d’images par lots dans le navigateur (redimensionnement, conversion de format)
- Tâches batch gourmandes en CPU (chiffrement, compression, agrégation de données)
- Pipelines de prétraitement de grands ensembles de données
hsb.horse