大量の関係データを DB へインポートする際、全データを一括 COPY で流すと schema 制約違反で全体が失敗することがある。まず bulk import を試み、エラーが出た行だけ個別 insert へフォールバックすることで、成功率と速度を両立できる。
コード
import fs from 'fs/promises';import { createReadStream } from 'fs';import { createInterface } from 'readline';import path from 'path';import kuzu from 'kuzu';
interface ImportStats { total: number; bulkSuccess: number; individualInserts: number; failed: number;}
/** * CSV から関係データを Bulk Import し、失敗した行だけ個別 Insert へフォールバック */export async function importRelationshipsWithFallback( conn: kuzu.Connection, csvPath: string, tableName: string): Promise<ImportStats> { const stats: ImportStats = { total: 0, bulkSuccess: 0, individualInserts: 0, failed: 0 };
try { // まず Bulk COPY を試みる(最速) const copyQuery = `COPY ${tableName} FROM '${csvPath}' (HEADER=true)`; await conn.query(copyQuery);
// 成功した場合、行数をカウント const countResult = await conn.query(`MATCH ()-[r:${tableName}]->() RETURN count(r) AS cnt`); stats.bulkSuccess = countResult.getNext()?.cnt || 0; stats.total = stats.bulkSuccess;
return stats; } catch (bulkError) { console.warn(`Bulk COPY failed, falling back to individual inserts: ${bulkError}`);
// Bulk が失敗したら、個別 Insert へフォールバック return await importIndividually(conn, csvPath, tableName, stats); }}
/** * CSV を1行ずつ読み込み、個別に Insert する */async function importIndividually( conn: kuzu.Connection, csvPath: string, tableName: string, stats: ImportStats): Promise<ImportStats> { const fileStream = createReadStream(csvPath); const rl = createInterface({ input: fileStream, crlfDelay: Infinity });
let headers: string[] = []; let isFirstLine = true;
for await (const line of rl) { if (isFirstLine) { // ヘッダー行をパース headers = line.split(',').map(h => h.trim()); isFirstLine = false; continue; }
stats.total++; const values = line.split(',').map(v => v.trim());
try { // 個別に INSERT を実行 const insertQuery = buildInsertQuery(tableName, headers, values); await conn.query(insertQuery); stats.individualInserts++; } catch (insertError) { // schema 制約に合わない行はスキップ console.warn(`Failed to insert row: ${line}`, insertError); stats.failed++; } }
return stats;}
/** * INSERT クエリを構築する */function buildInsertQuery( tableName: string, headers: string[], values: string[]): string { const columns = headers.join(', '); const valuePlaceholders = values.map(v => `'${v.replace(/'/g, "''")}'`).join(', '); return `INSERT INTO ${tableName} (${columns}) VALUES (${valuePlaceholders})`;}使用例
// Kuzu DB の Connection を取得const db = new kuzu.Database('./my-graph.db');const conn = new kuzu.Connection(db);
// 関係データをインポートconst stats = await importRelationshipsWithFallback( conn, './data/relationships.csv', 'KNOWS');
console.log('Import complete:');console.log(` Total: ${stats.total}`);console.log(` Bulk success: ${stats.bulkSuccess}`);console.log(` Individual inserts: ${stats.individualInserts}`);console.log(` Failed: ${stats.failed}`);仕組み
- まず
COPYコマンドで全データを一括インポートする(最速) COPYが失敗したら、CSV を1行ずつ読み込んで個別にINSERTする- 個別
INSERTで失敗した行はfailedカウントへ記録し、スキップする - 最終的な統計情報を返す
Bulk path をデフォルトにすることで、正常データの大半は高速に処理され、異常データだけ個別 fallback へ流れる。
メリット
- 速度と成功率の両立:正常データは bulk で高速処理、異常データは個別でスキップ
- 部分的な成功:全体が失敗するのではなく、インポート可能な部分だけ取り込める
- 診断可能:どの行が失敗したか個別にログ出力できる
- 運用安定性:schema 制約に合わないデータが混入しても全体が止まらない
注意点
個別 INSERT は行数が多いと遅いため、可能であれば事前に CSV をバリデーションして異常行を除外する方が効率的。また、トランザクション戦略(全体を1つにするか、個別に commit するか)は DB の特性と要件に合わせる必要がある。
応用
- CSV インジェスト:外部データを定期的に取り込む ETL パイプライン
- Graph DB インポート:関係データを大量にロードする Knowledge Graph 構築
- Schema-sensitive bulk loader:制約が厳しい DB へ柔軟にデータを投入する
- データマイグレーション:異なる DB 間でデータを移行する際のエラー回避
hsb.horse