
Snippets

Bulk Import with Fallback from COPY to Individual Inserts

Attempt a bulk import of large relational data first, then fall back to individual inserts when rows violate schema constraints. This balances speed and success rate.


When importing large amounts of relational data into a database, a single row that violates a schema constraint can make an entire bulk COPY operation fail. By attempting the bulk import first and falling back to row-by-row inserts only when it fails, you keep the speed of the bulk path for clean files while still salvaging the valid rows from dirty ones.

Code

import { createReadStream } from 'fs';
import { createInterface } from 'readline';
import kuzu from 'kuzu';

interface ImportStats {
  total: number;
  bulkSuccess: number;
  individualInserts: number;
  failed: number;
}

/**
 * Bulk import relationships from CSV with fallback to individual inserts.
 *
 * fromTable/toTable name the node tables at either end of the relationship;
 * the 'Person' defaults are illustrative, adjust them to your schema.
 */
export async function importRelationshipsWithFallback(
  conn: kuzu.Connection,
  csvPath: string,
  tableName: string,
  fromTable = 'Person',
  toTable = 'Person'
): Promise<ImportStats> {
  const stats: ImportStats = {
    total: 0,
    bulkSuccess: 0,
    individualInserts: 0,
    failed: 0
  };
  try {
    // First attempt bulk COPY (fastest)
    await conn.query(`COPY ${tableName} FROM '${csvPath}' (HEADER=true)`);
    // If successful, count the imported relationships
    // (assumes the table was empty before the import)
    const countResult = await conn.query(
      `MATCH ()-[r:${tableName}]->() RETURN count(r) AS cnt`
    );
    const rows = await countResult.getAll();
    stats.bulkSuccess = Number(rows[0]?.cnt ?? 0);
    stats.total = stats.bulkSuccess;
    return stats;
  } catch (bulkError) {
    console.warn(`Bulk COPY failed, falling back to individual inserts: ${bulkError}`);
    // If bulk fails, fall back to individual inserts
    return importIndividually(conn, csvPath, tableName, fromTable, toTable, stats);
  }
}

/**
 * Read the CSV line by line and create each relationship individually.
 *
 * Assumes the layout Kuzu expects for REL-table COPY: the first two columns
 * hold the primary keys of the FROM and TO nodes, and any remaining columns
 * are relationship properties. The naive split(',') does not handle quoted
 * commas; use a real CSV parser for such data.
 */
async function importIndividually(
  conn: kuzu.Connection,
  csvPath: string,
  tableName: string,
  fromTable: string,
  toTable: string,
  stats: ImportStats
): Promise<ImportStats> {
  const rl = createInterface({
    input: createReadStream(csvPath),
    crlfDelay: Infinity
  });
  let headers: string[] = [];
  let isFirstLine = true;
  for await (const line of rl) {
    if (isFirstLine) {
      // Parse header row
      headers = line.split(',').map(h => h.trim());
      isFirstLine = false;
      continue;
    }
    stats.total++;
    const values = line.split(',').map(v => v.trim());
    try {
      // Execute an individual CREATE
      await conn.query(buildCreateQuery(tableName, fromTable, toTable, headers, values));
      stats.individualInserts++;
    } catch (insertError) {
      // Skip rows that don't match schema constraints
      console.warn(`Failed to insert row: ${line}`, insertError);
      stats.failed++;
    }
  }
  return stats;
}

/**
 * Build a Cypher CREATE statement for a single relationship. Kuzu has no
 * SQL-style INSERT: a relationship is created by matching both endpoint
 * nodes first. Assumes both node tables use a primary key property named
 * `id` of type STRING; cast the comparison values for numeric keys.
 */
function buildCreateQuery(
  tableName: string,
  fromTable: string,
  toTable: string,
  headers: string[],
  values: string[]
): string {
  const quote = (v: string) => `'${v.replace(/'/g, "\\'")}'`;
  // Columns beyond the first two become relationship properties
  const props = headers
    .slice(2)
    .map((h, i) => `${h}: ${quote(values[i + 2])}`)
    .join(', ');
  const propClause = props ? ` {${props}}` : '';
  return (
    `MATCH (a:${fromTable}), (b:${toTable}) ` +
    `WHERE a.id = ${quote(values[0])} AND b.id = ${quote(values[1])} ` +
    `CREATE (a)-[:${tableName}${propClause}]->(b)`
  );
}

Usage

// Get a Kuzu DB connection
const db = new kuzu.Database('./my-graph.db');
const conn = new kuzu.Connection(db);

// Import relationships
const stats = await importRelationshipsWithFallback(
  conn,
  './data/relationships.csv',
  'KNOWS'
);

console.log('Import complete:');
console.log(`  Total: ${stats.total}`);
console.log(`  Bulk success: ${stats.bulkSuccess}`);
console.log(`  Individual inserts: ${stats.individualInserts}`);
console.log(`  Failed: ${stats.failed}`);

How It Works

  1. First attempt the bulk import with the COPY command (fastest path)
  2. If COPY fails, read the CSV line by line and insert rows individually
  3. Count rows whose individual insert fails as failed and skip them
  4. Return the final statistics

Because the bulk path is tried first, clean files are processed quickly, and only problematic files fall through to the slower row-by-row path.
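The control flow above can be sketched independently of any particular database. In this minimal illustration, `bulkFn` and `insertRow` are hypothetical callbacks standing in for the COPY command and the per-row insert:

```typescript
interface FallbackStats {
  total: number;
  bulkSuccess: number;
  individualInserts: number;
  failed: number;
}

// Try the fast bulk path first; only when it throws, walk the rows one by one.
async function bulkWithFallback(
  rows: string[],
  bulkFn: (rows: string[]) => Promise<void>,
  insertRow: (row: string) => Promise<void>
): Promise<FallbackStats> {
  const stats: FallbackStats = {
    total: rows.length,
    bulkSuccess: 0,
    individualInserts: 0,
    failed: 0
  };
  try {
    await bulkFn(rows);
    stats.bulkSuccess = rows.length; // everything landed in one shot
    return stats;
  } catch {
    for (const row of rows) {
      try {
        await insertRow(row);
        stats.individualInserts++;
      } catch {
        stats.failed++; // skip rows that violate constraints
      }
    }
    return stats;
  }
}
```

The same skeleton applies to any pairing of a strict all-or-nothing bulk operation with a tolerant per-item operation.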

Benefits

  • Balance Speed and Success Rate: Valid data processed quickly in bulk, problematic data handled individually
  • Partial Success: Import what’s possible instead of failing entirely
  • Diagnosable: Individual logging for each failed row
  • Operational Stability: System doesn’t halt when data violates schema constraints

Caveats

Individual inserts are slow for large datasets, so when possible it is more efficient to pre-validate the CSV and remove invalid rows before the import. The transaction strategy (one large transaction vs. per-row commits) should also match the database's characteristics and your requirements.
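One lightweight form of pre-validation is a purely structural check before COPY is even attempted. The sketch below only enforces column arity and non-empty key cells, which is an assumption about what "invalid" means; real validation would also check types, and the naive split(',') does not handle quoted commas:

```typescript
// Partition CSV lines into structurally valid and invalid rows.
// The header line is kept at the front of the valid set so the
// cleaned output can be written straight back to a CSV file.
function splitValidRows(
  lines: string[]
): { valid: string[]; invalid: string[] } {
  const [header, ...rows] = lines;
  const expected = header.split(',').length;
  const valid: string[] = [header];
  const invalid: string[] = [];
  for (const row of rows) {
    const cells = row.split(',').map(c => c.trim());
    // Keep a row only if it has the right arity and both key cells are non-empty
    if (cells.length === expected && cells[0] !== '' && cells[1] !== '') {
      valid.push(row);
    } else {
      invalid.push(row);
    }
  }
  return { valid, invalid };
}
```

Writing the valid partition to a temporary CSV and running COPY on that file often avoids the fallback path entirely, while the invalid partition can be logged for diagnosis.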

Applications

  • CSV Ingestion: ETL pipelines for periodic external data imports
  • Graph DB Import: Building Knowledge Graphs with large-scale relationship data
  • Schema-sensitive Bulk Loader: Flexible data loading into databases with strict constraints
  • Data Migration: Error-tolerant transfers between different database systems