大量のアイテムをWorkerで並列処理する際、全データを一気に送ると structured clone のメモリコストで圧迫される。サブバッチに分割して段階的に送信し、各サブバッチにタイムアウトを張ることで、メモリ効率とエラー検出を両立する。
コード
/** * Workerプール設定 */export interface WorkerPoolOptions { /** Worker スクリプトのURL */ workerUrl: string | URL /** プールサイズ(省略時はCPUコア数に基づき自動決定) */ poolSize?: number /** サブバッチあたりの最大アイテム数(デフォルト: 1500) */ subBatchSize?: number /** サブバッチのタイムアウト(ミリ秒、デフォルト: 30000) */ subBatchTimeout?: number}
/** * Workerプールインターフェース */export interface WorkerPool<TInput, TResult> { /** アイテムをWorkerプールに配分して処理 */ dispatch( items: TInput[], onProgress?: (processed: number) => void ): Promise<TResult[]> /** 全Workerを終了 */ terminate(): Promise<void> /** プール内のWorker数 */ readonly size: number}
/** * Workerプロトコル: サブバッチ送信 */interface SubBatchMessage<T> { type: 'sub-batch' items: T[]}
/** * Workerプロトコル: フラッシュ要求 */interface FlushMessage { type: 'flush'}
/** * Workerプロトコル: サブバッチ完了応答 */interface SubBatchDoneMessage { type: 'sub-batch-done'}
/** * Workerプロトコル: 進捗報告 */interface ProgressMessage { type: 'progress' processed: number}
/** * Workerプロトコル: エラー通知 */interface ErrorMessage { type: 'error' error: string}
/** * Workerプロトコル: 最終結果 */interface ResultMessage<T> { type: 'result' data: T}
type WorkerMessage<T> = | SubBatchDoneMessage | ProgressMessage | ErrorMessage | ResultMessage<T>
/** * Workerプールを作成 */export const createWorkerPool = <TInput, TResult>( options: WorkerPoolOptions): WorkerPool<TInput, TResult> => { const { workerUrl, poolSize, subBatchSize = 1500, subBatchTimeout = 30000, } = options
// プールサイズを決定(ブラウザ/Node.js両対応) const size = poolSize ?? getDefaultPoolSize() const workers: Worker[] = []
// Workerを生成 for (let i = 0; i < size; i++) { workers.push(createWorker(workerUrl)) }
const dispatch = ( items: TInput[], onProgress?: (processed: number) => void ): Promise<TResult[]> => { if (items.length === 0) return Promise.resolve([])
// アイテムをWorker数で分割 const chunkSize = Math.ceil(items.length / size) const chunks: TInput[][] = [] for (let i = 0; i < items.length; i += chunkSize) { chunks.push(items.slice(i, i + chunkSize)) }
// Worker別進捗カウンター const workerProgress = new Array(chunks.length).fill(0)
// 各Workerにチャンクを配分 const promises = chunks.map((chunk, workerIndex) => { const worker = workers[workerIndex] return processChunk<TInput, TResult>( worker, chunk, workerIndex, subBatchSize, subBatchTimeout, (processed) => { workerProgress[workerIndex] = processed if (onProgress) { const total = workerProgress.reduce((a, b) => a + b, 0) onProgress(total) } } ) })
return Promise.all(promises) }
const terminate = async (): Promise<void> => { await Promise.all(workers.map((w) => w.terminate())) workers.length = 0 }
return { dispatch, terminate, size }}
/** * 単一Workerでチャンクを処理(サブバッチ分割) */const processChunk = <TInput, TResult>( worker: Worker, chunk: TInput[], workerIndex: number, subBatchSize: number, subBatchTimeout: number, onProgress: (processed: number) => void): Promise<TResult> => { return new Promise<TResult>((resolve, reject) => { let settled = false let subBatchTimer: ReturnType<typeof setTimeout> | null = null let subBatchIndex = 0
const cleanup = () => { if (subBatchTimer) clearTimeout(subBatchTimer) worker.removeEventListener('message', handler) worker.removeEventListener('error', errorHandler) }
const resetSubBatchTimer = () => { if (subBatchTimer) clearTimeout(subBatchTimer) subBatchTimer = setTimeout(() => { if (!settled) { settled = true cleanup() reject( new Error( `Worker ${workerIndex} sub-batch timed out after ${ subBatchTimeout / 1000 }s (chunk: ${chunk.length} items)` ) ) } }, subBatchTimeout) }
const sendNextSubBatch = () => { const start = subBatchIndex * subBatchSize if (start >= chunk.length) { // 全サブバッチ送信完了、フラッシュを要求 worker.postMessage({ type: 'flush' } as FlushMessage) return } const subBatch = chunk.slice(start, start + subBatchSize) subBatchIndex++ resetSubBatchTimer() worker.postMessage({ type: 'sub-batch', items: subBatch, } as SubBatchMessage<TInput>) }
const handler = (event: MessageEvent<WorkerMessage<TResult>>) => { if (settled) return const msg = event.data
if (msg.type === 'progress') { onProgress(msg.processed) } else if (msg.type === 'sub-batch-done') { sendNextSubBatch() } else if (msg.type === 'error') { settled = true cleanup() reject(new Error(`Worker ${workerIndex} error: ${msg.error}`)) } else if (msg.type === 'result') { settled = true cleanup() resolve(msg.data) } }
const errorHandler = (err: ErrorEvent) => { if (!settled) { settled = true cleanup() reject(err.error || err) } }
worker.addEventListener('message', handler) worker.addEventListener('error', errorHandler) sendNextSubBatch() })}
/** * Workerを生成(ブラウザ/Node.js両対応) */const createWorker = (workerUrl: string | URL): Worker => { if (typeof Worker !== 'undefined') { // ブラウザ環境 return new Worker(workerUrl, { type: 'module' }) } // Node.js環境 // @ts-expect-error - Node.js Worker は require('node:worker_threads').Worker const { Worker: NodeWorker } = require('node:worker_threads') return new NodeWorker(workerUrl)}
/** * デフォルトプールサイズを取得 */const getDefaultPoolSize = (): number => { if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) { // ブラウザ環境 return Math.min(8, Math.max(1, navigator.hardwareConcurrency - 1)) } // Node.js環境 try { // @ts-expect-error - Node.js os module const os = require('node:os') return Math.min(8, Math.max(1, os.cpus().length - 1)) } catch { return 4 // フォールバック }}使用例
メインスレッド側
// Workerプールを作成const pool = createWorkerPool<string, ParsedFile>({ workerUrl: new URL('./parser.worker.js', import.meta.url), poolSize: 4, subBatchSize: 1000, subBatchTimeout: 20000,})
// 大量ファイルを処理const files = ['file1.txt', 'file2.txt', /* ... 10000 files */]const results = await pool.dispatch(files, (processed) => { console.log(`Processed: ${processed}/${files.length}`)})
// 終了await pool.terminate()Worker側(parser.worker.ts)
// サブバッチごとの蓄積バッファlet accumulated: ParsedFile[] = []let totalProcessed = 0
self.addEventListener('message', async (event) => { const msg = event.data
if (msg.type === 'sub-batch') { // サブバッチを処理 for (const filePath of msg.items) { const parsed = await parseFile(filePath) accumulated.push(parsed) totalProcessed++ }
// 進捗報告 self.postMessage({ type: 'progress', processed: totalProcessed })
// サブバッチ完了を通知 self.postMessage({ type: 'sub-batch-done' }) } else if (msg.type === 'flush') { // 最終結果を返す self.postMessage({ type: 'result', data: accumulated }) accumulated = [] totalProcessed = 0 }})
const parseFile = async (path: string): Promise<ParsedFile> => { // 実際のパース処理 return { path, content: '...' }}仕組み
- 3段階分割:
全アイテム → Workerチャンク → サブバッチの3段階で処理を分割する - サブバッチ送信: 各Workerへ一度に送るのは
subBatchSize個まで、処理完了後に次を送る - タイムアウト管理: サブバッチごとに独立したタイマーを設定し、巨大ファイルなど異常ケースを早期検出する
- 進捗集約: Worker別に処理数をカウントし、全体進捗をリアルタイムで集約する
- 結果フラッシュ: 全サブバッチ送信完了後、
flushメッセージでWorkerに最終結果の返却を要求する
サブバッチ単位で送信することで、structured clone のメモリピークを 全アイテム数 × 平均サイズ ではなく subBatchSize × 平均サイズ に抑えられる。
メリット
- メモリ効率: 大量データを一気に転送せず、サブバッチ単位で送るため structured clone のメモリ圧迫を回避できる
- 早期エラー検出: サブバッチごとにタイムアウトを張るため、巨大ファイルや無限ループを早期に検出できる
- 進捗可視化: Worker別進捗を集約して UI に反映でき、ユーザー体験が向上する
- 環境非依存: ブラウザの Web Worker と Node.js の worker_threads 両方で動作する
注意点
サブバッチサイズとタイムアウト値はワークロード依存で調整が必要。小さすぎるとオーバーヘッドが増え、大きすぎるとメモリ効率が落ちる。ファイルパーサーなら 1000-2000 件、画像変換なら 100-500 件が目安。
Worker側のプロトコル実装(sub-batch / flush / result の処理)を忘れるとデッドロックするため、テンプレートを用意しておくこと。
応用
- Node.js の大量ファイルパース(TypeScript、Markdown、JSON など)
- ブラウザでの画像一括変換(リサイズ、フォーマット変換)
- CPU ヘビーなバッチジョブ(暗号化、圧縮、データ集計)
- 大規模データセットの前処理パイプライン
hsb.horse