logo hsb.horse
← Retour à l’index des snippets

Snippets

Pool de Workers avec Sous-lots et Timeout par Sous-lot

Un pattern qui divise de grands ensembles de données en petits sous-lots envoyés aux workers, réduisant la pression mémoire tout en définissant des timeouts par lot pour une détection précoce des anomalies.

Publié: Mis à jour:

Lors du traitement de grands ensembles de données dans des workers, l’envoi de toutes les données en une seule fois provoque une pression mémoire due aux coûts de structured clone. Diviser en sous-lots avec des timeouts par lot équilibre l’efficacité mémoire et la détection d’erreurs.

Code

/**
* Configuration du pool de workers
*/
export interface WorkerPoolOptions {
/** URL du script worker */
workerUrl: string | URL
/** Taille du pool (par défaut basée sur les cœurs CPU) */
poolSize?: number
/** Maximum d'éléments par sous-lot (par défaut: 1500) */
subBatchSize?: number
/** Timeout du sous-lot en millisecondes (par défaut: 30000) */
subBatchTimeout?: number
}
/**
* Interface du pool de workers
*/
export interface WorkerPool<TInput, TResult> {
/** Dispatcher les éléments au pool de workers pour traitement */
dispatch(
items: TInput[],
onProgress?: (processed: number) => void
): Promise<TResult[]>
/** Terminer tous les workers */
terminate(): Promise<void>
/** Nombre de workers dans le pool */
readonly size: number
}
/**
* Protocole worker: message de sous-lot
*/
interface SubBatchMessage<T> {
type: 'sub-batch'
items: T[]
}
/**
* Protocole worker: demande de flush
*/
interface FlushMessage {
type: 'flush'
}
/**
* Protocole worker: réponse de sous-lot terminé
*/
interface SubBatchDoneMessage {
type: 'sub-batch-done'
}
/**
* Protocole worker: rapport de progression
*/
interface ProgressMessage {
type: 'progress'
processed: number
}
/**
* Protocole worker: notification d'erreur
*/
interface ErrorMessage {
type: 'error'
error: string
}
/**
* Protocole worker: résultat final
*/
interface ResultMessage<T> {
type: 'result'
data: T
}
type WorkerMessage<T> =
| SubBatchDoneMessage
| ProgressMessage
| ErrorMessage
| ResultMessage<T>
/**
* Créer un pool de workers
*/
export const createWorkerPool = <TInput, TResult>(
options: WorkerPoolOptions
): WorkerPool<TInput, TResult> => {
const {
workerUrl,
poolSize,
subBatchSize = 1500,
subBatchTimeout = 30000,
} = options
// Déterminer la taille du pool (compatible navigateur/Node.js)
const size = poolSize ?? getDefaultPoolSize()
const workers: Worker[] = []
// Créer les workers
for (let i = 0; i < size; i++) {
workers.push(createWorker(workerUrl))
}
const dispatch = (
items: TInput[],
onProgress?: (processed: number) => void
): Promise<TResult[]> => {
if (items.length === 0) return Promise.resolve([])
// Diviser les éléments par nombre de workers
const chunkSize = Math.ceil(items.length / size)
const chunks: TInput[][] = []
for (let i = 0; i < items.length; i += chunkSize) {
chunks.push(items.slice(i, i + chunkSize))
}
// Compteurs de progression par worker
const workerProgress = new Array(chunks.length).fill(0)
// Dispatcher les chunks aux workers
const promises = chunks.map((chunk, workerIndex) => {
const worker = workers[workerIndex]
return processChunk<TInput, TResult>(
worker,
chunk,
workerIndex,
subBatchSize,
subBatchTimeout,
(processed) => {
workerProgress[workerIndex] = processed
if (onProgress) {
const total = workerProgress.reduce((a, b) => a + b, 0)
onProgress(total)
}
}
)
})
return Promise.all(promises)
}
const terminate = async (): Promise<void> => {
await Promise.all(workers.map((w) => w.terminate()))
workers.length = 0
}
return { dispatch, terminate, size }
}
/**
* Traiter un chunk dans un seul worker (division en sous-lots)
*/
const processChunk = <TInput, TResult>(
worker: Worker,
chunk: TInput[],
workerIndex: number,
subBatchSize: number,
subBatchTimeout: number,
onProgress: (processed: number) => void
): Promise<TResult> => {
return new Promise<TResult>((resolve, reject) => {
let settled = false
let subBatchTimer: ReturnType<typeof setTimeout> | null = null
let subBatchIndex = 0
const cleanup = () => {
if (subBatchTimer) clearTimeout(subBatchTimer)
worker.removeEventListener('message', handler)
worker.removeEventListener('error', errorHandler)
}
const resetSubBatchTimer = () => {
if (subBatchTimer) clearTimeout(subBatchTimer)
subBatchTimer = setTimeout(() => {
if (!settled) {
settled = true
cleanup()
reject(
new Error(
`Worker ${workerIndex} sub-batch timed out after ${
subBatchTimeout / 1000
}s (chunk: ${chunk.length} items)`
)
)
}
}, subBatchTimeout)
}
const sendNextSubBatch = () => {
const start = subBatchIndex * subBatchSize
if (start >= chunk.length) {
// Tous les sous-lots envoyés, demander le flush
worker.postMessage({ type: 'flush' } as FlushMessage)
return
}
const subBatch = chunk.slice(start, start + subBatchSize)
subBatchIndex++
resetSubBatchTimer()
worker.postMessage({
type: 'sub-batch',
items: subBatch,
} as SubBatchMessage<TInput>)
}
const handler = (event: MessageEvent<WorkerMessage<TResult>>) => {
if (settled) return
const msg = event.data
if (msg.type === 'progress') {
onProgress(msg.processed)
} else if (msg.type === 'sub-batch-done') {
sendNextSubBatch()
} else if (msg.type === 'error') {
settled = true
cleanup()
reject(new Error(`Worker ${workerIndex} error: ${msg.error}`))
} else if (msg.type === 'result') {
settled = true
cleanup()
resolve(msg.data)
}
}
const errorHandler = (err: ErrorEvent) => {
if (!settled) {
settled = true
cleanup()
reject(err.error || err)
}
}
worker.addEventListener('message', handler)
worker.addEventListener('error', errorHandler)
sendNextSubBatch()
})
}
/**
* Créer un worker (compatible navigateur/Node.js)
*/
const createWorker = (workerUrl: string | URL): Worker => {
if (typeof Worker !== 'undefined') {
// Environnement navigateur
return new Worker(workerUrl, { type: 'module' })
}
// Environnement Node.js
// @ts-expect-error - Node.js Worker est require('node:worker_threads').Worker
const { Worker: NodeWorker } = require('node:worker_threads')
return new NodeWorker(workerUrl)
}
/**
* Obtenir la taille par défaut du pool
*/
const getDefaultPoolSize = (): number => {
if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) {
// Environnement navigateur
return Math.min(8, Math.max(1, navigator.hardwareConcurrency - 1))
}
// Environnement Node.js
try {
// @ts-expect-error - Node.js os module
const os = require('node:os')
return Math.min(8, Math.max(1, os.cpus().length - 1))
} catch {
return 4 // Fallback
}
}

Utilisation

Thread Principal

// Créer le pool de workers
const pool = createWorkerPool<string, ParsedFile>({
workerUrl: new URL('./parser.worker.js', import.meta.url),
poolSize: 4,
subBatchSize: 1000,
subBatchTimeout: 20000,
})
// Traiter une liste massive de fichiers
const files = ['file1.txt', 'file2.txt', /* ... 10000 files */]
const results = await pool.dispatch(files, (processed) => {
console.log(`Processed: ${processed}/${files.length}`)
})
// Terminer
await pool.terminate()

Côté Worker (parser.worker.ts)

// Tampon d'accumulation par sous-lot
let accumulated: ParsedFile[] = []
let totalProcessed = 0
self.addEventListener('message', async (event) => {
const msg = event.data
if (msg.type === 'sub-batch') {
// Traiter le sous-lot
for (const filePath of msg.items) {
const parsed = await parseFile(filePath)
accumulated.push(parsed)
totalProcessed++
}
// Rapporter la progression
self.postMessage({ type: 'progress', processed: totalProcessed })
// Notifier la fin du sous-lot
self.postMessage({ type: 'sub-batch-done' })
} else if (msg.type === 'flush') {
// Retourner le résultat final
self.postMessage({ type: 'result', data: accumulated })
accumulated = []
totalProcessed = 0
}
})
const parseFile = async (path: string): Promise<ParsedFile> => {
// Logique de parsing réelle
return { path, content: '...' }
}

Fonctionnement

  1. Division à 3 niveaux: Le traitement est divisé en tous les éléments → chunks de workers → sous-lots
  2. Envoi de sous-lots: Envoyer seulement subBatchSize éléments à chaque worker à la fois, puis envoyer le suivant après achèvement
  3. Gestion des timeouts: Définir des timers indépendants par sous-lot pour détecter les anomalies comme les fichiers énormes tôt
  4. Agrégation de la progression: Compter les éléments traités par worker et agréger la progression totale en temps réel
  5. Flush des résultats: Après l’envoi de tous les sous-lots, envoyer un message flush pour demander les résultats finaux aux workers

L’envoi en sous-lots maintient le pic mémoire de structured clone à subBatchSize × taille moyenne au lieu de total éléments × taille moyenne.

Avantages

  • Efficacité mémoire: Évite la pression mémoire de structured clone en envoyant les données en sous-lots au lieu de tout en une fois
  • Détection précoce d’erreurs: Les timeouts par sous-lot détectent tôt les fichiers énormes ou les boucles infinies
  • Visualisation de la progression: L’agrégation de la progression par worker améliore l’UX avec des retours en temps réel
  • Agnostique de l’environnement: Fonctionne avec les Web Workers du navigateur et worker_threads de Node.js

Mises en garde

La taille des sous-lots et les valeurs de timeout dépendent de la charge de travail et nécessitent un réglage. Trop petit augmente l’overhead, trop grand réduit l’efficacité mémoire. Directives: 1000-2000 pour les parseurs de fichiers, 100-500 pour la conversion d’images.

Oublier d’implémenter le protocole worker (gestion de sub-batch / flush / result) provoque des deadlocks, donc préparer des templates à l’avance.

Applications

  • Parsing massif de fichiers Node.js (TypeScript, Markdown, JSON, etc.)
  • Conversion d’images par lots dans le navigateur (redimensionnement, conversion de format)
  • Tâches batch gourmandes en CPU (chiffrement, compression, agrégation de données)
  • Pipelines de prétraitement de grands ensembles de données