logo hsb.horse
← 스니펫 목록으로 돌아가기

Snippets

서브 배치와 배치별 타임아웃을 갖춘 워커 풀

대량 데이터를 작은 서브 배치로 나눠 워커에 전송하여 메모리 압박을 줄이고 각 배치에 타임아웃을 설정해 이상을 조기 감지하는 패턴.

게시일: 수정일:

대량의 아이템을 워커로 병렬 처리할 때 모든 데이터를 한 번에 보내면 structured clone의 메모리 비용으로 인해 압박이 발생한다. 서브 배치로 나눠 단계적으로 전송하고 각 서브 배치에 타임아웃을 설정하면 메모리 효율과 오류 감지를 모두 달성할 수 있다.

코드

/**
* 워커 풀 설정
*/
export interface WorkerPoolOptions {
/** 워커 스크립트 URL */
workerUrl: string | URL
/** 풀 크기 (생략 시 CPU 코어 수 기반 자동 결정) */
poolSize?: number
/** 서브 배치당 최대 아이템 수 (기본값: 1500) */
subBatchSize?: number
/** 서브 배치 타임아웃 (밀리초, 기본값: 30000) */
subBatchTimeout?: number
}
/**
* 워커 풀 인터페이스
*/
export interface WorkerPool<TInput, TResult> {
/** 아이템을 워커 풀에 배분하여 처리 */
dispatch(
items: TInput[],
onProgress?: (processed: number) => void
): Promise<TResult[]>
/** 모든 워커 종료 */
terminate(): Promise<void>
/** 풀 내 워커 수 */
readonly size: number
}
/**
* 워커 프로토콜: 서브 배치 전송
*/
interface SubBatchMessage<T> {
type: 'sub-batch'
items: T[]
}
/**
* 워커 프로토콜: 플러시 요청
*/
interface FlushMessage {
type: 'flush'
}
/**
* 워커 프로토콜: 서브 배치 완료 응답
*/
interface SubBatchDoneMessage {
type: 'sub-batch-done'
}
/**
* 워커 프로토콜: 진행 상황 보고
*/
interface ProgressMessage {
type: 'progress'
processed: number
}
/**
* 워커 프로토콜: 오류 알림
*/
interface ErrorMessage {
type: 'error'
error: string
}
/**
* 워커 프로토콜: 최종 결과
*/
interface ResultMessage<T> {
type: 'result'
data: T
}
type WorkerMessage<T> =
| SubBatchDoneMessage
| ProgressMessage
| ErrorMessage
| ResultMessage<T>
/**
* 워커 풀 생성
*/
export const createWorkerPool = <TInput, TResult>(
options: WorkerPoolOptions
): WorkerPool<TInput, TResult> => {
const {
workerUrl,
poolSize,
subBatchSize = 1500,
subBatchTimeout = 30000,
} = options
// 풀 크기 결정 (브라우저/Node.js 호환)
const size = poolSize ?? getDefaultPoolSize()
const workers: Worker[] = []
// 워커 생성
for (let i = 0; i < size; i++) {
workers.push(createWorker(workerUrl))
}
const dispatch = (
items: TInput[],
onProgress?: (processed: number) => void
): Promise<TResult[]> => {
if (items.length === 0) return Promise.resolve([])
// 아이템을 워커 수로 분할
const chunkSize = Math.ceil(items.length / size)
const chunks: TInput[][] = []
for (let i = 0; i < items.length; i += chunkSize) {
chunks.push(items.slice(i, i + chunkSize))
}
// 워커별 진행 상황 카운터
const workerProgress = new Array(chunks.length).fill(0)
// 각 워커에 청크 배분
const promises = chunks.map((chunk, workerIndex) => {
const worker = workers[workerIndex]
return processChunk<TInput, TResult>(
worker,
chunk,
workerIndex,
subBatchSize,
subBatchTimeout,
(processed) => {
workerProgress[workerIndex] = processed
if (onProgress) {
const total = workerProgress.reduce((a, b) => a + b, 0)
onProgress(total)
}
}
)
})
return Promise.all(promises)
}
const terminate = async (): Promise<void> => {
await Promise.all(workers.map((w) => w.terminate()))
workers.length = 0
}
return { dispatch, terminate, size }
}
/**
* 단일 워커에서 청크 처리 (서브 배치 분할)
*/
const processChunk = <TInput, TResult>(
worker: Worker,
chunk: TInput[],
workerIndex: number,
subBatchSize: number,
subBatchTimeout: number,
onProgress: (processed: number) => void
): Promise<TResult> => {
return new Promise<TResult>((resolve, reject) => {
let settled = false
let subBatchTimer: ReturnType<typeof setTimeout> | null = null
let subBatchIndex = 0
const cleanup = () => {
if (subBatchTimer) clearTimeout(subBatchTimer)
worker.removeEventListener('message', handler)
worker.removeEventListener('error', errorHandler)
}
const resetSubBatchTimer = () => {
if (subBatchTimer) clearTimeout(subBatchTimer)
subBatchTimer = setTimeout(() => {
if (!settled) {
settled = true
cleanup()
reject(
new Error(
`Worker ${workerIndex} sub-batch timed out after ${
subBatchTimeout / 1000
}s (chunk: ${chunk.length} items)`
)
)
}
}, subBatchTimeout)
}
const sendNextSubBatch = () => {
const start = subBatchIndex * subBatchSize
if (start >= chunk.length) {
// 모든 서브 배치 전송 완료, 플러시 요청
worker.postMessage({ type: 'flush' } as FlushMessage)
return
}
const subBatch = chunk.slice(start, start + subBatchSize)
subBatchIndex++
resetSubBatchTimer()
worker.postMessage({
type: 'sub-batch',
items: subBatch,
} as SubBatchMessage<TInput>)
}
const handler = (event: MessageEvent<WorkerMessage<TResult>>) => {
if (settled) return
const msg = event.data
if (msg.type === 'progress') {
onProgress(msg.processed)
} else if (msg.type === 'sub-batch-done') {
sendNextSubBatch()
} else if (msg.type === 'error') {
settled = true
cleanup()
reject(new Error(`Worker ${workerIndex} error: ${msg.error}`))
} else if (msg.type === 'result') {
settled = true
cleanup()
resolve(msg.data)
}
}
const errorHandler = (err: ErrorEvent) => {
if (!settled) {
settled = true
cleanup()
reject(err.error || err)
}
}
worker.addEventListener('message', handler)
worker.addEventListener('error', errorHandler)
sendNextSubBatch()
})
}
/**
* 워커 생성 (브라우저/Node.js 호환)
*/
const createWorker = (workerUrl: string | URL): Worker => {
if (typeof Worker !== 'undefined') {
// 브라우저 환경
return new Worker(workerUrl, { type: 'module' })
}
// Node.js 환경
// @ts-expect-error - Node.js Worker는 require('node:worker_threads').Worker
const { Worker: NodeWorker } = require('node:worker_threads')
return new NodeWorker(workerUrl)
}
/**
* 기본 풀 크기 가져오기
*/
const getDefaultPoolSize = (): number => {
if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) {
// 브라우저 환경
return Math.min(8, Math.max(1, navigator.hardwareConcurrency - 1))
}
// Node.js 환경
try {
// @ts-expect-error - Node.js os module
const os = require('node:os')
return Math.min(8, Math.max(1, os.cpus().length - 1))
} catch {
return 4 // 폴백
}
}

사용 예시

메인 스레드

// 워커 풀 생성
const pool = createWorkerPool<string, ParsedFile>({
workerUrl: new URL('./parser.worker.js', import.meta.url),
poolSize: 4,
subBatchSize: 1000,
subBatchTimeout: 20000,
})
// 대량 파일 처리
const files = ['file1.txt', 'file2.txt', /* ... 10000 files */]
const results = await pool.dispatch(files, (processed) => {
console.log(`Processed: ${processed}/${files.length}`)
})
// 종료
await pool.terminate()

워커 측 (parser.worker.ts)

// 서브 배치별 누적 버퍼
let accumulated: ParsedFile[] = []
let totalProcessed = 0
self.addEventListener('message', async (event) => {
const msg = event.data
if (msg.type === 'sub-batch') {
// 서브 배치 처리
for (const filePath of msg.items) {
const parsed = await parseFile(filePath)
accumulated.push(parsed)
totalProcessed++
}
// 진행 상황 보고
self.postMessage({ type: 'progress', processed: totalProcessed })
// 서브 배치 완료 알림
self.postMessage({ type: 'sub-batch-done' })
} else if (msg.type === 'flush') {
// 최종 결과 반환
self.postMessage({ type: 'result', data: accumulated })
accumulated = []
totalProcessed = 0
}
})
const parseFile = async (path: string): Promise<ParsedFile> => {
// 실제 파싱 로직
return { path, content: '...' }
}

동작 원리

  1. 3단계 분할: 전체 아이템 → 워커 청크 → 서브 배치의 3단계로 처리를 분할
  2. 서브 배치 전송: 각 워커에 한 번에 보내는 것은 subBatchSize개까지, 처리 완료 후 다음을 전송
  3. 타임아웃 관리: 서브 배치마다 독립 타이머를 설정하여 거대 파일 등 이상 케이스를 조기 감지
  4. 진행 상황 집계: 워커별 처리 수를 카운트하여 전체 진행 상황을 실시간 집계
  5. 결과 플러시: 모든 서브 배치 전송 완료 후 flush 메시지로 워커에 최종 결과 반환 요청

서브 배치 단위로 전송하면 structured clone의 메모리 피크를 전체 아이템 수 × 평균 크기가 아닌 subBatchSize × 평균 크기로 억제할 수 있다.

장점

  • 메모리 효율성: 대량 데이터를 한 번에 전송하지 않고 서브 배치 단위로 보내 structured clone의 메모리 압박을 회피
  • 조기 오류 감지: 서브 배치마다 타임아웃을 설정하여 거대 파일이나 무한 루프를 조기에 감지
  • 진행 상황 시각화: 워커별 진행 상황을 집계하여 UI에 반영 가능, 사용자 경험 향상
  • 환경 독립성: 브라우저 Web Worker와 Node.js worker_threads 모두에서 작동

주의사항

서브 배치 크기와 타임아웃 값은 워크로드에 따라 조정이 필요하다. 너무 작으면 오버헤드가 증가하고, 너무 크면 메모리 효율이 떨어진다. 파일 파서는 1000-2000개, 이미지 변환은 100-500개가 적정 수준이다.

워커 측 프로토콜 구현(sub-batch / flush / result 처리)을 빠뜨리면 데드락이 발생하므로 템플릿을 미리 준비할 것.

응용

  • Node.js 대량 파일 파싱 (TypeScript, Markdown, JSON 등)
  • 브라우저 일괄 이미지 변환 (리사이즈, 포맷 변환)
  • CPU 집약적 배치 작업 (암호화, 압축, 데이터 집계)
  • 대규모 데이터셋 전처리 파이프라인