logo hsb.horse
← スニペット一覧に戻る

Snippets

Bulk Import with Fallback from Pairwise COPY to Individual Inserts

大量関係データをまず bulk import し、schema 制約に合わない組だけ個別 insert へ退避させる。成功率と速度を両立するパターン。

公開日: 更新日:

大量の関係データを DB へインポートする際、全データを一括 COPY で流すと schema 制約違反で全体が失敗することがある。まず bulk import を試み、エラーが出た行だけ個別 insert へフォールバックすることで、成功率と速度を両立できる。

コード

import fs from 'fs/promises';
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
import path from 'path';
import kuzu from 'kuzu';
interface ImportStats {
total: number;
bulkSuccess: number;
individualInserts: number;
failed: number;
}
/**
* CSV から関係データを Bulk Import し、失敗した行だけ個別 Insert へフォールバック
*/
export async function importRelationshipsWithFallback(
conn: kuzu.Connection,
csvPath: string,
tableName: string
): Promise<ImportStats> {
const stats: ImportStats = {
total: 0,
bulkSuccess: 0,
individualInserts: 0,
failed: 0
};
try {
// まず Bulk COPY を試みる(最速)
const copyQuery = `COPY ${tableName} FROM '${csvPath}' (HEADER=true)`;
await conn.query(copyQuery);
// 成功した場合、行数をカウント
const countResult = await conn.query(`MATCH ()-[r:${tableName}]->() RETURN count(r) AS cnt`);
stats.bulkSuccess = countResult.getNext()?.cnt || 0;
stats.total = stats.bulkSuccess;
return stats;
} catch (bulkError) {
console.warn(`Bulk COPY failed, falling back to individual inserts: ${bulkError}`);
// Bulk が失敗したら、個別 Insert へフォールバック
return await importIndividually(conn, csvPath, tableName, stats);
}
}
/**
* CSV を1行ずつ読み込み、個別に Insert する
*/
async function importIndividually(
conn: kuzu.Connection,
csvPath: string,
tableName: string,
stats: ImportStats
): Promise<ImportStats> {
const fileStream = createReadStream(csvPath);
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity
});
let headers: string[] = [];
let isFirstLine = true;
for await (const line of rl) {
if (isFirstLine) {
// ヘッダー行をパース
headers = line.split(',').map(h => h.trim());
isFirstLine = false;
continue;
}
stats.total++;
const values = line.split(',').map(v => v.trim());
try {
// 個別に INSERT を実行
const insertQuery = buildInsertQuery(tableName, headers, values);
await conn.query(insertQuery);
stats.individualInserts++;
} catch (insertError) {
// schema 制約に合わない行はスキップ
console.warn(`Failed to insert row: ${line}`, insertError);
stats.failed++;
}
}
return stats;
}
/**
* INSERT クエリを構築する
*/
function buildInsertQuery(
tableName: string,
headers: string[],
values: string[]
): string {
const columns = headers.join(', ');
const valuePlaceholders = values.map(v => `'${v.replace(/'/g, "''")}'`).join(', ');
return `INSERT INTO ${tableName} (${columns}) VALUES (${valuePlaceholders})`;
}

使用例

// Kuzu DB の Connection を取得
const db = new kuzu.Database('./my-graph.db');
const conn = new kuzu.Connection(db);
// 関係データをインポート
const stats = await importRelationshipsWithFallback(
conn,
'./data/relationships.csv',
'KNOWS'
);
console.log('Import complete:');
console.log(` Total: ${stats.total}`);
console.log(` Bulk success: ${stats.bulkSuccess}`);
console.log(` Individual inserts: ${stats.individualInserts}`);
console.log(` Failed: ${stats.failed}`);

仕組み

  1. まず COPY コマンドで全データを一括インポートする(最速)
  2. COPY が失敗したら、CSV を1行ずつ読み込んで個別に INSERT する
  3. 個別 INSERT で失敗した行は failed カウントへ記録し、スキップする
  4. 最終的な統計情報を返す

Bulk path をデフォルトにすることで、正常データの大半は高速に処理され、異常データだけ個別 fallback へ流れる。

メリット

  • 速度と成功率の両立:正常データは bulk で高速処理、異常データは個別でスキップ
  • 部分的な成功:全体が失敗するのではなく、インポート可能な部分だけ取り込める
  • 診断可能:どの行が失敗したか個別にログ出力できる
  • 運用安定性:schema 制約に合わないデータが混入しても全体が止まらない

注意点

個別 INSERT は行数が多いと遅いため、可能であれば事前に CSV をバリデーションして異常行を除外する方が効率的。また、トランザクション戦略(全体を1つにするか、個別に commit するか)は DB の特性と要件に合わせる必要がある。

応用

  • CSV インジェスト:外部データを定期的に取り込む ETL パイプライン
  • Graph DB インポート:関係データを大量にロードする Knowledge Graph 構築
  • Schema-sensitive bulk loader:制約が厳しい DB へ柔軟にデータを投入する
  • データマイグレーション:異なる DB 間でデータを移行する際のエラー回避