대량의 아이템을 워커로 병렬 처리할 때 모든 데이터를 한 번에 보내면 structured clone의 메모리 비용으로 인해 압박이 발생한다. 서브 배치로 나눠 단계적으로 전송하고 각 서브 배치에 타임아웃을 설정하면 메모리 효율과 오류 감지를 모두 달성할 수 있다.
코드
/** * 워커 풀 설정 */export interface WorkerPoolOptions { /** 워커 스크립트 URL */ workerUrl: string | URL /** 풀 크기 (생략 시 CPU 코어 수 기반 자동 결정) */ poolSize?: number /** 서브 배치당 최대 아이템 수 (기본값: 1500) */ subBatchSize?: number /** 서브 배치 타임아웃 (밀리초, 기본값: 30000) */ subBatchTimeout?: number}
/** * 워커 풀 인터페이스 */export interface WorkerPool<TInput, TResult> { /** 아이템을 워커 풀에 배분하여 처리 */ dispatch( items: TInput[], onProgress?: (processed: number) => void ): Promise<TResult[]> /** 모든 워커 종료 */ terminate(): Promise<void> /** 풀 내 워커 수 */ readonly size: number}
/** * 워커 프로토콜: 서브 배치 전송 */interface SubBatchMessage<T> { type: 'sub-batch' items: T[]}
/** * 워커 프로토콜: 플러시 요청 */interface FlushMessage { type: 'flush'}
/** * 워커 프로토콜: 서브 배치 완료 응답 */interface SubBatchDoneMessage { type: 'sub-batch-done'}
/** * 워커 프로토콜: 진행 상황 보고 */interface ProgressMessage { type: 'progress' processed: number}
/** * 워커 프로토콜: 오류 알림 */interface ErrorMessage { type: 'error' error: string}
/** * 워커 프로토콜: 최종 결과 */interface ResultMessage<T> { type: 'result' data: T}
type WorkerMessage<T> = | SubBatchDoneMessage | ProgressMessage | ErrorMessage | ResultMessage<T>
/** * 워커 풀 생성 */export const createWorkerPool = <TInput, TResult>( options: WorkerPoolOptions): WorkerPool<TInput, TResult> => { const { workerUrl, poolSize, subBatchSize = 1500, subBatchTimeout = 30000, } = options
// 풀 크기 결정 (브라우저/Node.js 호환) const size = poolSize ?? getDefaultPoolSize() const workers: Worker[] = []
// 워커 생성 for (let i = 0; i < size; i++) { workers.push(createWorker(workerUrl)) }
const dispatch = ( items: TInput[], onProgress?: (processed: number) => void ): Promise<TResult[]> => { if (items.length === 0) return Promise.resolve([])
// 아이템을 워커 수로 분할 const chunkSize = Math.ceil(items.length / size) const chunks: TInput[][] = [] for (let i = 0; i < items.length; i += chunkSize) { chunks.push(items.slice(i, i + chunkSize)) }
// 워커별 진행 상황 카운터 const workerProgress = new Array(chunks.length).fill(0)
// 각 워커에 청크 배분 const promises = chunks.map((chunk, workerIndex) => { const worker = workers[workerIndex] return processChunk<TInput, TResult>( worker, chunk, workerIndex, subBatchSize, subBatchTimeout, (processed) => { workerProgress[workerIndex] = processed if (onProgress) { const total = workerProgress.reduce((a, b) => a + b, 0) onProgress(total) } } ) })
return Promise.all(promises) }
const terminate = async (): Promise<void> => { await Promise.all(workers.map((w) => w.terminate())) workers.length = 0 }
return { dispatch, terminate, size }}
/** * 단일 워커에서 청크 처리 (서브 배치 분할) */const processChunk = <TInput, TResult>( worker: Worker, chunk: TInput[], workerIndex: number, subBatchSize: number, subBatchTimeout: number, onProgress: (processed: number) => void): Promise<TResult> => { return new Promise<TResult>((resolve, reject) => { let settled = false let subBatchTimer: ReturnType<typeof setTimeout> | null = null let subBatchIndex = 0
const cleanup = () => { if (subBatchTimer) clearTimeout(subBatchTimer) worker.removeEventListener('message', handler) worker.removeEventListener('error', errorHandler) }
const resetSubBatchTimer = () => { if (subBatchTimer) clearTimeout(subBatchTimer) subBatchTimer = setTimeout(() => { if (!settled) { settled = true cleanup() reject( new Error( `Worker ${workerIndex} sub-batch timed out after ${ subBatchTimeout / 1000 }s (chunk: ${chunk.length} items)` ) ) } }, subBatchTimeout) }
const sendNextSubBatch = () => { const start = subBatchIndex * subBatchSize if (start >= chunk.length) { // 모든 서브 배치 전송 완료, 플러시 요청 worker.postMessage({ type: 'flush' } as FlushMessage) return } const subBatch = chunk.slice(start, start + subBatchSize) subBatchIndex++ resetSubBatchTimer() worker.postMessage({ type: 'sub-batch', items: subBatch, } as SubBatchMessage<TInput>) }
const handler = (event: MessageEvent<WorkerMessage<TResult>>) => { if (settled) return const msg = event.data
if (msg.type === 'progress') { onProgress(msg.processed) } else if (msg.type === 'sub-batch-done') { sendNextSubBatch() } else if (msg.type === 'error') { settled = true cleanup() reject(new Error(`Worker ${workerIndex} error: ${msg.error}`)) } else if (msg.type === 'result') { settled = true cleanup() resolve(msg.data) } }
const errorHandler = (err: ErrorEvent) => { if (!settled) { settled = true cleanup() reject(err.error || err) } }
worker.addEventListener('message', handler) worker.addEventListener('error', errorHandler) sendNextSubBatch() })}
/** * 워커 생성 (브라우저/Node.js 호환) */const createWorker = (workerUrl: string | URL): Worker => { if (typeof Worker !== 'undefined') { // 브라우저 환경 return new Worker(workerUrl, { type: 'module' }) } // Node.js 환경 // @ts-expect-error - Node.js Worker는 require('node:worker_threads').Worker const { Worker: NodeWorker } = require('node:worker_threads') return new NodeWorker(workerUrl)}
/** * 기본 풀 크기 가져오기 */const getDefaultPoolSize = (): number => { if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) { // 브라우저 환경 return Math.min(8, Math.max(1, navigator.hardwareConcurrency - 1)) } // Node.js 환경 try { // @ts-expect-error - Node.js os module const os = require('node:os') return Math.min(8, Math.max(1, os.cpus().length - 1)) } catch { return 4 // 폴백 }}사용 예시
메인 스레드
// 워커 풀 생성const pool = createWorkerPool<string, ParsedFile>({ workerUrl: new URL('./parser.worker.js', import.meta.url), poolSize: 4, subBatchSize: 1000, subBatchTimeout: 20000,})
// 대량 파일 처리const files = ['file1.txt', 'file2.txt', /* ... 10000 files */]const results = await pool.dispatch(files, (processed) => { console.log(`Processed: ${processed}/${files.length}`)})
// 종료await pool.terminate()워커 측 (parser.worker.ts)
// 서브 배치별 누적 버퍼let accumulated: ParsedFile[] = []let totalProcessed = 0
self.addEventListener('message', async (event) => { const msg = event.data
if (msg.type === 'sub-batch') { // 서브 배치 처리 for (const filePath of msg.items) { const parsed = await parseFile(filePath) accumulated.push(parsed) totalProcessed++ }
// 진행 상황 보고 self.postMessage({ type: 'progress', processed: totalProcessed })
// 서브 배치 완료 알림 self.postMessage({ type: 'sub-batch-done' }) } else if (msg.type === 'flush') { // 최종 결과 반환 self.postMessage({ type: 'result', data: accumulated }) accumulated = [] totalProcessed = 0 }})
const parseFile = async (path: string): Promise<ParsedFile> => { // 실제 파싱 로직 return { path, content: '...' }}동작 원리
- 3단계 분할:
전체 아이템 → 워커 청크 → 서브 배치의 3단계로 처리를 분할 - 서브 배치 전송: 각 워커에 한 번에 보내는 것은
subBatchSize개까지, 처리 완료 후 다음을 전송 - 타임아웃 관리: 서브 배치마다 독립 타이머를 설정하여 거대 파일 등 이상 케이스를 조기 감지
- 진행 상황 집계: 워커별 처리 수를 카운트하여 전체 진행 상황을 실시간 집계
- 결과 플러시: 모든 서브 배치 전송 완료 후
flush메시지로 워커에 최종 결과 반환 요청
서브 배치 단위로 전송하면 structured clone의 메모리 피크를 전체 아이템 수 × 평균 크기가 아닌 subBatchSize × 평균 크기로 억제할 수 있다.
장점
- 메모리 효율성: 대량 데이터를 한 번에 전송하지 않고 서브 배치 단위로 보내 structured clone의 메모리 압박을 회피
- 조기 오류 감지: 서브 배치마다 타임아웃을 설정하여 거대 파일이나 무한 루프를 조기에 감지
- 진행 상황 시각화: 워커별 진행 상황을 집계하여 UI에 반영 가능, 사용자 경험 향상
- 환경 독립성: 브라우저 Web Worker와 Node.js worker_threads 모두에서 작동
주의사항
서브 배치 크기와 타임아웃 값은 워크로드에 따라 조정이 필요하다. 너무 작으면 오버헤드가 증가하고, 너무 크면 메모리 효율이 떨어진다. 파일 파서는 1000-2000개, 이미지 변환은 100-500개가 적정 수준이다.
워커 측 프로토콜 구현(sub-batch / flush / result 처리)을 빠뜨리면 데드락이 발생하므로 템플릿을 미리 준비할 것.
응용
- Node.js 대량 파일 파싱 (TypeScript, Markdown, JSON 등)
- 브라우저 일괄 이미지 변환 (리사이즈, 포맷 변환)
- CPU 집약적 배치 작업 (암호화, 압축, 데이터 집계)
- 대규모 데이터셋 전처리 파이프라인
hsb.horse