대량의 관계 데이터를 데이터베이스로 가져올 때, 전체 데이터를 일괄 COPY로 처리하면 스키마 제약 조건 위반으로 전체가 실패할 수 있습니다. 먼저 bulk import를 시도하고, 실패한 행만 개별 insert로 폴백하면 속도와 성공률을 모두 달성할 수 있습니다.
코드
import fs from 'fs/promises';import { createReadStream } from 'fs';import { createInterface } from 'readline';import path from 'path';import kuzu from 'kuzu';
interface ImportStats { total: number; bulkSuccess: number; individualInserts: number; failed: number;}
/** * CSV에서 관계 데이터를 Bulk Import하고 실패한 행은 개별 Insert로 폴백 */export async function importRelationshipsWithFallback( conn: kuzu.Connection, csvPath: string, tableName: string): Promise<ImportStats> { const stats: ImportStats = { total: 0, bulkSuccess: 0, individualInserts: 0, failed: 0 };
try { // 먼저 Bulk COPY 시도 (가장 빠름) const copyQuery = `COPY ${tableName} FROM '${csvPath}' (HEADER=true)`; await conn.query(copyQuery);
// 성공하면 행 수 계산 const countResult = await conn.query(`MATCH ()-[r:${tableName}]->() RETURN count(r) AS cnt`); stats.bulkSuccess = countResult.getNext()?.cnt || 0; stats.total = stats.bulkSuccess;
return stats; } catch (bulkError) { console.warn(`Bulk COPY 실패, 개별 insert로 폴백: ${bulkError}`);
// Bulk가 실패하면 개별 Insert로 폴백 return await importIndividually(conn, csvPath, tableName, stats); }}
/** * CSV를 한 줄씩 읽어 개별적으로 Insert */async function importIndividually( conn: kuzu.Connection, csvPath: string, tableName: string, stats: ImportStats): Promise<ImportStats> { const fileStream = createReadStream(csvPath); const rl = createInterface({ input: fileStream, crlfDelay: Infinity });
let headers: string[] = []; let isFirstLine = true;
for await (const line of rl) { if (isFirstLine) { // 헤더 행 파싱 headers = line.split(',').map(h => h.trim()); isFirstLine = false; continue; }
stats.total++; const values = line.split(',').map(v => v.trim());
try { // 개별 INSERT 실행 const insertQuery = buildInsertQuery(tableName, headers, values); await conn.query(insertQuery); stats.individualInserts++; } catch (insertError) { // 스키마 제약 조건에 맞지 않는 행은 건너뜀 console.warn(`행 삽입 실패: ${line}`, insertError); stats.failed++; } }
return stats;}
/** * INSERT 쿼리 생성 */function buildInsertQuery( tableName: string, headers: string[], values: string[]): string { const columns = headers.join(', '); const valuePlaceholders = values.map(v => `'${v.replace(/'/g, "''")}'`).join(', '); return `INSERT INTO ${tableName} (${columns}) VALUES (${valuePlaceholders})`;}사용 예시
// Kuzu DB Connection 가져오기const db = new kuzu.Database('./my-graph.db');const conn = new kuzu.Connection(db);
// 관계 데이터 가져오기const stats = await importRelationshipsWithFallback( conn, './data/relationships.csv', 'KNOWS');
console.log('가져오기 완료:');console.log(` 전체: ${stats.total}`);console.log(` Bulk 성공: ${stats.bulkSuccess}`);console.log(` 개별 insert: ${stats.individualInserts}`);console.log(` 실패: ${stats.failed}`);작동 원리
- 먼저
COPY명령으로 모든 데이터를 일괄 가져오기 (가장 빠름) COPY가 실패하면 CSV를 한 줄씩 읽어 개별적으로INSERT- 개별
INSERT에서 실패한 행은failed카운트에 기록하고 건너뛰기 - 최종 통계 정보 반환
Bulk path를 기본으로 사용하면 대부분의 정상 데이터는 빠르게 처리되고, 문제가 있는 데이터만 개별 폴백으로 처리됩니다.
장점
- 속도와 성공률의 균형: 정상 데이터는 bulk로 빠르게 처리, 문제 데이터는 개별 처리
- 부분 성공: 전체가 실패하는 대신 가능한 부분만 가져오기
- 진단 가능: 실패한 각 행에 대한 개별 로깅
- 운영 안정성: 스키마 제약 조건을 위반하는 데이터가 있어도 전체 시스템이 중단되지 않음
주의사항
개별 INSERT는 행 수가 많으면 느리므로, 가능하면 CSV를 사전 검증하여 잘못된 행을 제거하는 것이 더 효율적입니다. 또한 트랜잭션 전략(전체를 하나로 할지, 개별 커밋할지)은 데이터베이스 특성과 요구사항에 맞춰야 합니다.
응용 분야
- CSV 수집: 정기적으로 외부 데이터를 가져오는 ETL 파이프라인
- Graph DB Import: 대규모 관계 데이터로 Knowledge Graph 구축
- Schema-sensitive Bulk Loader: 제약 조건이 엄격한 DB에 유연하게 데이터 로드
- 데이터 마이그레이션: 서로 다른 데이터베이스 간 데이터 전송 시 오류 방지
hsb.horse