logo hsb.horse
← 스니펫 목록으로 돌아가기

Snippets

Bulk Import with Fallback from Pairwise COPY to Individual Inserts

대량 관계 데이터를 먼저 bulk import하고, 스키마 제약 조건에 맞지 않는 행만 개별 insert로 처리하여 속도와 성공률을 모두 달성하는 패턴입니다.

게시일: 수정일:

대량의 관계 데이터를 데이터베이스로 가져올 때, 전체 데이터를 일괄 COPY로 처리하면 스키마 제약 조건 위반으로 전체가 실패할 수 있습니다. 먼저 bulk import를 시도하고, 실패한 행만 개별 insert로 폴백하면 속도와 성공률을 모두 달성할 수 있습니다.

코드

import fs from 'fs/promises';
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
import path from 'path';
import kuzu from 'kuzu';
interface ImportStats {
total: number;
bulkSuccess: number;
individualInserts: number;
failed: number;
}
/**
* CSV에서 관계 데이터를 Bulk Import하고 실패한 행은 개별 Insert로 폴백
*/
export async function importRelationshipsWithFallback(
conn: kuzu.Connection,
csvPath: string,
tableName: string
): Promise<ImportStats> {
const stats: ImportStats = {
total: 0,
bulkSuccess: 0,
individualInserts: 0,
failed: 0
};
try {
// 먼저 Bulk COPY 시도 (가장 빠름)
const copyQuery = `COPY ${tableName} FROM '${csvPath}' (HEADER=true)`;
await conn.query(copyQuery);
// 성공하면 행 수 계산
const countResult = await conn.query(`MATCH ()-[r:${tableName}]->() RETURN count(r) AS cnt`);
stats.bulkSuccess = countResult.getNext()?.cnt || 0;
stats.total = stats.bulkSuccess;
return stats;
} catch (bulkError) {
console.warn(`Bulk COPY 실패, 개별 insert로 폴백: ${bulkError}`);
// Bulk가 실패하면 개별 Insert로 폴백
return await importIndividually(conn, csvPath, tableName, stats);
}
}
/**
* CSV를 한 줄씩 읽어 개별적으로 Insert
*/
async function importIndividually(
conn: kuzu.Connection,
csvPath: string,
tableName: string,
stats: ImportStats
): Promise<ImportStats> {
const fileStream = createReadStream(csvPath);
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity
});
let headers: string[] = [];
let isFirstLine = true;
for await (const line of rl) {
if (isFirstLine) {
// 헤더 행 파싱
headers = line.split(',').map(h => h.trim());
isFirstLine = false;
continue;
}
stats.total++;
const values = line.split(',').map(v => v.trim());
try {
// 개별 INSERT 실행
const insertQuery = buildInsertQuery(tableName, headers, values);
await conn.query(insertQuery);
stats.individualInserts++;
} catch (insertError) {
// 스키마 제약 조건에 맞지 않는 행은 건너뜀
console.warn(`행 삽입 실패: ${line}`, insertError);
stats.failed++;
}
}
return stats;
}
/**
* INSERT 쿼리 생성
*/
function buildInsertQuery(
tableName: string,
headers: string[],
values: string[]
): string {
const columns = headers.join(', ');
const valuePlaceholders = values.map(v => `'${v.replace(/'/g, "''")}'`).join(', ');
return `INSERT INTO ${tableName} (${columns}) VALUES (${valuePlaceholders})`;
}

사용 예시

// Kuzu DB Connection 가져오기
const db = new kuzu.Database('./my-graph.db');
const conn = new kuzu.Connection(db);
// 관계 데이터 가져오기
const stats = await importRelationshipsWithFallback(
conn,
'./data/relationships.csv',
'KNOWS'
);
console.log('가져오기 완료:');
console.log(` 전체: ${stats.total}`);
console.log(` Bulk 성공: ${stats.bulkSuccess}`);
console.log(` 개별 insert: ${stats.individualInserts}`);
console.log(` 실패: ${stats.failed}`);

작동 원리

  1. 먼저 COPY 명령으로 모든 데이터를 일괄 가져오기 (가장 빠름)
  2. COPY가 실패하면 CSV를 한 줄씩 읽어 개별적으로 INSERT
  3. 개별 INSERT에서 실패한 행은 failed 카운트에 기록하고 건너뛰기
  4. 최종 통계 정보 반환

Bulk path를 기본으로 사용하면 대부분의 정상 데이터는 빠르게 처리되고, 문제가 있는 데이터만 개별 폴백으로 처리됩니다.

장점

  • 속도와 성공률의 균형: 정상 데이터는 bulk로 빠르게 처리, 문제 데이터는 개별 처리
  • 부분 성공: 전체가 실패하는 대신 가능한 부분만 가져오기
  • 진단 가능: 실패한 각 행에 대한 개별 로깅
  • 운영 안정성: 스키마 제약 조건을 위반하는 데이터가 있어도 전체 시스템이 중단되지 않음

주의사항

개별 INSERT는 행 수가 많으면 느리므로, 가능하면 CSV를 사전 검증하여 잘못된 행을 제거하는 것이 더 효율적입니다. 또한 트랜잭션 전략(전체를 하나로 할지, 개별 커밋할지)은 데이터베이스 특성과 요구사항에 맞춰야 합니다.

응용 분야

  • CSV 수집: 정기적으로 외부 데이터를 가져오는 ETL 파이프라인
  • Graph DB Import: 대규모 관계 데이터로 Knowledge Graph 구축
  • Schema-sensitive Bulk Loader: 제약 조건이 엄격한 DB에 유연하게 데이터 로드
  • 데이터 마이그레이션: 서로 다른 데이터베이스 간 데이터 전송 시 오류 방지