logo hsb.horse
← スニペット一覧に戻る

Snippets

サブバッチ分割とタイムアウト管理を持つWorkerプール

大量データを小さなサブバッチに分けてWorkerへ送信し、メモリ圧迫を抑えつつ各バッチにタイムアウトを設定して異常を早期検出するパターン。

公開日: 更新日:

大量のアイテムをWorkerで並列処理する際、全データを一気に送ると structured clone のメモリコストで圧迫される。サブバッチに分割して段階的に送信し、各サブバッチにタイムアウトを張ることで、メモリ効率とエラー検出を両立する。

コード

/**
* Workerプール設定
*/
export interface WorkerPoolOptions {
/** Worker スクリプトのURL */
workerUrl: string | URL
/** プールサイズ(省略時はCPUコア数に基づき自動決定) */
poolSize?: number
/** サブバッチあたりの最大アイテム数(デフォルト: 1500) */
subBatchSize?: number
/** サブバッチのタイムアウト(ミリ秒、デフォルト: 30000) */
subBatchTimeout?: number
}
/**
* Workerプールインターフェース
*/
export interface WorkerPool<TInput, TResult> {
/** アイテムをWorkerプールに配分して処理 */
dispatch(
items: TInput[],
onProgress?: (processed: number) => void
): Promise<TResult[]>
/** 全Workerを終了 */
terminate(): Promise<void>
/** プール内のWorker数 */
readonly size: number
}
/**
* Workerプロトコル: サブバッチ送信
*/
interface SubBatchMessage<T> {
type: 'sub-batch'
items: T[]
}
/**
* Workerプロトコル: フラッシュ要求
*/
interface FlushMessage {
type: 'flush'
}
/**
* Workerプロトコル: サブバッチ完了応答
*/
interface SubBatchDoneMessage {
type: 'sub-batch-done'
}
/**
* Workerプロトコル: 進捗報告
*/
interface ProgressMessage {
type: 'progress'
processed: number
}
/**
* Workerプロトコル: エラー通知
*/
interface ErrorMessage {
type: 'error'
error: string
}
/**
* Workerプロトコル: 最終結果
*/
interface ResultMessage<T> {
type: 'result'
data: T
}
type WorkerMessage<T> =
| SubBatchDoneMessage
| ProgressMessage
| ErrorMessage
| ResultMessage<T>
/**
* Workerプールを作成
*/
export const createWorkerPool = <TInput, TResult>(
options: WorkerPoolOptions
): WorkerPool<TInput, TResult> => {
const {
workerUrl,
poolSize,
subBatchSize = 1500,
subBatchTimeout = 30000,
} = options
// プールサイズを決定(ブラウザ/Node.js両対応)
const size = poolSize ?? getDefaultPoolSize()
const workers: Worker[] = []
// Workerを生成
for (let i = 0; i < size; i++) {
workers.push(createWorker(workerUrl))
}
const dispatch = (
items: TInput[],
onProgress?: (processed: number) => void
): Promise<TResult[]> => {
if (items.length === 0) return Promise.resolve([])
// アイテムをWorker数で分割
const chunkSize = Math.ceil(items.length / size)
const chunks: TInput[][] = []
for (let i = 0; i < items.length; i += chunkSize) {
chunks.push(items.slice(i, i + chunkSize))
}
// Worker別進捗カウンター
const workerProgress = new Array(chunks.length).fill(0)
// 各Workerにチャンクを配分
const promises = chunks.map((chunk, workerIndex) => {
const worker = workers[workerIndex]
return processChunk<TInput, TResult>(
worker,
chunk,
workerIndex,
subBatchSize,
subBatchTimeout,
(processed) => {
workerProgress[workerIndex] = processed
if (onProgress) {
const total = workerProgress.reduce((a, b) => a + b, 0)
onProgress(total)
}
}
)
})
return Promise.all(promises)
}
const terminate = async (): Promise<void> => {
await Promise.all(workers.map((w) => w.terminate()))
workers.length = 0
}
return { dispatch, terminate, size }
}
/**
* 単一Workerでチャンクを処理(サブバッチ分割)
*/
const processChunk = <TInput, TResult>(
worker: Worker,
chunk: TInput[],
workerIndex: number,
subBatchSize: number,
subBatchTimeout: number,
onProgress: (processed: number) => void
): Promise<TResult> => {
return new Promise<TResult>((resolve, reject) => {
let settled = false
let subBatchTimer: ReturnType<typeof setTimeout> | null = null
let subBatchIndex = 0
const cleanup = () => {
if (subBatchTimer) clearTimeout(subBatchTimer)
worker.removeEventListener('message', handler)
worker.removeEventListener('error', errorHandler)
}
const resetSubBatchTimer = () => {
if (subBatchTimer) clearTimeout(subBatchTimer)
subBatchTimer = setTimeout(() => {
if (!settled) {
settled = true
cleanup()
reject(
new Error(
`Worker ${workerIndex} sub-batch timed out after ${
subBatchTimeout / 1000
}s (chunk: ${chunk.length} items)`
)
)
}
}, subBatchTimeout)
}
const sendNextSubBatch = () => {
const start = subBatchIndex * subBatchSize
if (start >= chunk.length) {
// 全サブバッチ送信完了、フラッシュを要求
worker.postMessage({ type: 'flush' } as FlushMessage)
return
}
const subBatch = chunk.slice(start, start + subBatchSize)
subBatchIndex++
resetSubBatchTimer()
worker.postMessage({
type: 'sub-batch',
items: subBatch,
} as SubBatchMessage<TInput>)
}
const handler = (event: MessageEvent<WorkerMessage<TResult>>) => {
if (settled) return
const msg = event.data
if (msg.type === 'progress') {
onProgress(msg.processed)
} else if (msg.type === 'sub-batch-done') {
sendNextSubBatch()
} else if (msg.type === 'error') {
settled = true
cleanup()
reject(new Error(`Worker ${workerIndex} error: ${msg.error}`))
} else if (msg.type === 'result') {
settled = true
cleanup()
resolve(msg.data)
}
}
const errorHandler = (err: ErrorEvent) => {
if (!settled) {
settled = true
cleanup()
reject(err.error || err)
}
}
worker.addEventListener('message', handler)
worker.addEventListener('error', errorHandler)
sendNextSubBatch()
})
}
/**
* Workerを生成(ブラウザ/Node.js両対応)
*/
const createWorker = (workerUrl: string | URL): Worker => {
if (typeof Worker !== 'undefined') {
// ブラウザ環境
return new Worker(workerUrl, { type: 'module' })
}
// Node.js環境
// @ts-expect-error - Node.js Worker は require('node:worker_threads').Worker
const { Worker: NodeWorker } = require('node:worker_threads')
return new NodeWorker(workerUrl)
}
/**
* デフォルトプールサイズを取得
*/
const getDefaultPoolSize = (): number => {
if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) {
// ブラウザ環境
return Math.min(8, Math.max(1, navigator.hardwareConcurrency - 1))
}
// Node.js環境
try {
// @ts-expect-error - Node.js os module
const os = require('node:os')
return Math.min(8, Math.max(1, os.cpus().length - 1))
} catch {
return 4 // フォールバック
}
}

使用例

メインスレッド側

// Workerプールを作成
const pool = createWorkerPool<string, ParsedFile>({
workerUrl: new URL('./parser.worker.js', import.meta.url),
poolSize: 4,
subBatchSize: 1000,
subBatchTimeout: 20000,
})
// 大量ファイルを処理
const files = ['file1.txt', 'file2.txt', /* ... 10000 files */]
const results = await pool.dispatch(files, (processed) => {
console.log(`Processed: ${processed}/${files.length}`)
})
// 終了
await pool.terminate()

Worker側(parser.worker.ts)

// サブバッチごとの蓄積バッファ
let accumulated: ParsedFile[] = []
let totalProcessed = 0
self.addEventListener('message', async (event) => {
const msg = event.data
if (msg.type === 'sub-batch') {
// サブバッチを処理
for (const filePath of msg.items) {
const parsed = await parseFile(filePath)
accumulated.push(parsed)
totalProcessed++
}
// 進捗報告
self.postMessage({ type: 'progress', processed: totalProcessed })
// サブバッチ完了を通知
self.postMessage({ type: 'sub-batch-done' })
} else if (msg.type === 'flush') {
// 最終結果を返す
self.postMessage({ type: 'result', data: accumulated })
accumulated = []
totalProcessed = 0
}
})
const parseFile = async (path: string): Promise<ParsedFile> => {
// 実際のパース処理
return { path, content: '...' }
}

仕組み

  1. 3段階分割: 全アイテム → Workerチャンク → サブバッチ の3段階で処理を分割する
  2. サブバッチ送信: 各Workerへ一度に送るのは subBatchSize 個まで、処理完了後に次を送る
  3. タイムアウト管理: サブバッチごとに独立したタイマーを設定し、巨大ファイルなど異常ケースを早期検出する
  4. 進捗集約: Worker別に処理数をカウントし、全体進捗をリアルタイムで集約する
  5. 結果フラッシュ: 全サブバッチ送信完了後、flush メッセージでWorkerに最終結果の返却を要求する

サブバッチ単位で送信することで、structured clone のメモリピークを 全アイテム数 × 平均サイズ ではなく subBatchSize × 平均サイズ に抑えられる。

メリット

  • メモリ効率: 大量データを一気に転送せず、サブバッチ単位で送るため structured clone のメモリ圧迫を回避できる
  • 早期エラー検出: サブバッチごとにタイムアウトを張るため、巨大ファイルや無限ループを早期に検出できる
  • 進捗可視化: Worker別進捗を集約して UI に反映でき、ユーザー体験が向上する
  • 環境非依存: ブラウザの Web Worker と Node.js の worker_threads 両方で動作する

注意点

サブバッチサイズとタイムアウト値はワークロード依存で調整が必要。小さすぎるとオーバーヘッドが増え、大きすぎるとメモリ効率が落ちる。ファイルパーサーなら 1000-2000 件、画像変換なら 100-500 件が目安。

Worker側のプロトコル実装(sub-batch / flush / result の処理)を忘れるとデッドロックするため、テンプレートを用意しておくこと。

応用

  • Node.js の大量ファイルパース(TypeScript、Markdown、JSON など)
  • ブラウザでの画像一括変換(リサイズ、フォーマット変換)
  • CPU ヘビーなバッチジョブ(暗号化、圧縮、データ集計)
  • 大規模データセットの前処理パイプライン