When importing large amounts of relational data into a database, a bulk COPY operation can fail entirely due to schema constraint violations. By first attempting bulk import and falling back to individual inserts for failed rows, you can balance speed and success rate.
Code
import fs from 'fs/promises';import { createReadStream } from 'fs';import { createInterface } from 'readline';import path from 'path';import kuzu from 'kuzu';
/** Aggregate counters describing the outcome of one CSV import run. */
interface ImportStats {
  total: number;             // rows attempted (bulk path: rows copied; fallback: data rows read)
  bulkSuccess: number;       // rows imported via the bulk COPY path
  individualInserts: number; // rows imported one-by-one after bulk COPY failed
  failed: number;            // rows rejected during individual insert
}
/** * Bulk import relationships from CSV with fallback to individual inserts */export async function importRelationshipsWithFallback( conn: kuzu.Connection, csvPath: string, tableName: string): Promise<ImportStats> { const stats: ImportStats = { total: 0, bulkSuccess: 0, individualInserts: 0, failed: 0 };
try { // First attempt bulk COPY (fastest) const copyQuery = `COPY ${tableName} FROM '${csvPath}' (HEADER=true)`; await conn.query(copyQuery);
// If successful, count rows const countResult = await conn.query(`MATCH ()-[r:${tableName}]->() RETURN count(r) AS cnt`); stats.bulkSuccess = countResult.getNext()?.cnt || 0; stats.total = stats.bulkSuccess;
return stats; } catch (bulkError) { console.warn(`Bulk COPY failed, falling back to individual inserts: ${bulkError}`);
// If bulk fails, fall back to individual inserts return await importIndividually(conn, csvPath, tableName, stats); }}
/** * Read CSV line by line and insert individually */async function importIndividually( conn: kuzu.Connection, csvPath: string, tableName: string, stats: ImportStats): Promise<ImportStats> { const fileStream = createReadStream(csvPath); const rl = createInterface({ input: fileStream, crlfDelay: Infinity });
let headers: string[] = []; let isFirstLine = true;
for await (const line of rl) { if (isFirstLine) { // Parse header row headers = line.split(',').map(h => h.trim()); isFirstLine = false; continue; }
stats.total++; const values = line.split(',').map(v => v.trim());
try { // Execute individual INSERT const insertQuery = buildInsertQuery(tableName, headers, values); await conn.query(insertQuery); stats.individualInserts++; } catch (insertError) { // Skip rows that don't match schema constraints console.warn(`Failed to insert row: ${line}`, insertError); stats.failed++; } }
return stats;}
/**
 * Render a single-row INSERT statement for `tableName`.
 *
 * Every value is emitted as a single-quoted string literal with embedded
 * single quotes doubled (no numeric typing is attempted).
 *
 * NOTE(review): this emits SQL-style `INSERT INTO ... VALUES`; Kuzu's query
 * language is Cypher (`CREATE`), so confirm the target engine actually
 * accepts this statement form.
 */
function buildInsertQuery(
  tableName: string,
  headers: string[],
  values: string[]
): string {
  const quote = (raw: string): string => `'${raw.replace(/'/g, "''")}'`;
  const columnList = headers.join(', ');
  const literalList = values.map(quote).join(', ');
  return `INSERT INTO ${tableName} (${columnList}) VALUES (${literalList})`;
}
// Get Kuzu DB connectionconst db = new kuzu.Database('./my-graph.db');const conn = new kuzu.Connection(db);
// Import relationshipsconst stats = await importRelationshipsWithFallback( conn, './data/relationships.csv', 'KNOWS');
console.log('Import complete:');console.log(` Total: ${stats.total}`);console.log(` Bulk success: ${stats.bulkSuccess}`);console.log(` Individual inserts: ${stats.individualInserts}`);console.log(` Failed: ${stats.failed}`);How It Works
How It Works

- First attempt the bulk import with the `COPY` command (fastest).
- If `COPY` fails, read the CSV line by line and insert each row individually.
- Record failed individual `INSERT` operations in the `failed` count and skip those rows.
- Return the final statistics.
By defaulting to the bulk path, most valid data is processed quickly, and only problematic data flows to individual fallback.
Benefits
- Balance Speed and Success Rate: Valid data processed quickly in bulk, problematic data handled individually
- Partial Success: Import what’s possible instead of failing entirely
- Diagnosable: Individual logging for each failed row
- Operational Stability: System doesn’t halt when data violates schema constraints
Caveats
Individual INSERTs are slow for large datasets, so pre-validating the CSV to remove invalid rows is more efficient when possible. Transaction strategy (single transaction vs. individual commits) should align with database characteristics and requirements.
Applications
- CSV Ingestion: ETL pipelines for periodic external data imports
- Graph DB Import: Building Knowledge Graphs with large-scale relationship data
- Schema-sensitive Bulk Loader: Flexible data loading into databases with strict constraints
- Data Migration: Error-tolerant transfers between different database systems
hsb.horse