logo hsb.horse
← Zur Snippets-Übersicht

Snippets

Bulk Import with Fallback from Pairwise COPY to Individual Inserts

Versuchen Sie zunächst einen Bulk-Import für große relationale Daten und fallen Sie dann auf einzelne Einfügungen für Zeilen zurück, die Schema-Einschränkungen verletzen. Balanciert Geschwindigkeit und Erfolgsrate.

Veröffentlicht: Aktualisiert:

Beim Importieren großer Mengen relationaler Daten in eine Datenbank kann ein Bulk-COPY-Vorgang aufgrund von Schema-Einschränkungsverletzungen vollständig fehlschlagen. Indem Sie zunächst einen Bulk-Import versuchen und bei fehlgeschlagenen Zeilen auf einzelne Einfügungen zurückgreifen, können Sie Geschwindigkeit und Erfolgsrate ausbalancieren.

Code

import fs from 'fs/promises';
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
import path from 'path';
import kuzu from 'kuzu';
interface ImportStats {
total: number;
bulkSuccess: number;
individualInserts: number;
failed: number;
}
/**
* Bulk-Import von Beziehungen aus CSV mit Fallback auf einzelne Einfügungen
*/
export async function importRelationshipsWithFallback(
conn: kuzu.Connection,
csvPath: string,
tableName: string
): Promise<ImportStats> {
const stats: ImportStats = {
total: 0,
bulkSuccess: 0,
individualInserts: 0,
failed: 0
};
try {
// Zuerst Bulk-COPY versuchen (am schnellsten)
const copyQuery = `COPY ${tableName} FROM '${csvPath}' (HEADER=true)`;
await conn.query(copyQuery);
// Bei Erfolg Zeilen zählen
const countResult = await conn.query(`MATCH ()-[r:${tableName}]->() RETURN count(r) AS cnt`);
stats.bulkSuccess = countResult.getNext()?.cnt || 0;
stats.total = stats.bulkSuccess;
return stats;
} catch (bulkError) {
console.warn(`Bulk-COPY fehlgeschlagen, Fallback auf einzelne Einfügungen: ${bulkError}`);
// Bei Bulk-Fehler auf einzelne Einfügungen zurückgreifen
return await importIndividually(conn, csvPath, tableName, stats);
}
}
/**
* CSV zeilenweise lesen und einzeln einfügen
*/
async function importIndividually(
conn: kuzu.Connection,
csvPath: string,
tableName: string,
stats: ImportStats
): Promise<ImportStats> {
const fileStream = createReadStream(csvPath);
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity
});
let headers: string[] = [];
let isFirstLine = true;
for await (const line of rl) {
if (isFirstLine) {
// Kopfzeile parsen
headers = line.split(',').map(h => h.trim());
isFirstLine = false;
continue;
}
stats.total++;
const values = line.split(',').map(v => v.trim());
try {
// Einzelnes INSERT ausführen
const insertQuery = buildInsertQuery(tableName, headers, values);
await conn.query(insertQuery);
stats.individualInserts++;
} catch (insertError) {
// Zeilen überspringen, die nicht den Schema-Einschränkungen entsprechen
console.warn(`Fehler beim Einfügen der Zeile: ${line}`, insertError);
stats.failed++;
}
}
return stats;
}
/**
* INSERT-Abfrage erstellen
*/
function buildInsertQuery(
tableName: string,
headers: string[],
values: string[]
): string {
const columns = headers.join(', ');
const valuePlaceholders = values.map(v => `'${v.replace(/'/g, "''")}'`).join(', ');
return `INSERT INTO ${tableName} (${columns}) VALUES (${valuePlaceholders})`;
}

Verwendung

// Kuzu DB-Verbindung herstellen
const db = new kuzu.Database('./my-graph.db');
const conn = new kuzu.Connection(db);
// Beziehungen importieren
const stats = await importRelationshipsWithFallback(
conn,
'./data/relationships.csv',
'KNOWS'
);
console.log('Import abgeschlossen:');
console.log(` Gesamt: ${stats.total}`);
console.log(` Bulk-Erfolg: ${stats.bulkSuccess}`);
console.log(` Einzelne Einfügungen: ${stats.individualInserts}`);
console.log(` Fehlgeschlagen: ${stats.failed}`);

Funktionsweise

  1. Zunächst Bulk-Import mit COPY-Befehl versuchen (am schnellsten)
  2. Bei COPY-Fehler CSV zeilenweise lesen und einzeln einfügen
  3. Fehlgeschlagene einzelne INSERT-Operationen im failed-Zähler aufzeichnen und überspringen
  4. Endgültige Statistiken zurückgeben

Durch Standardnutzung des Bulk-Pfads werden die meisten gültigen Daten schnell verarbeitet, und nur problematische Daten fließen zum einzelnen Fallback.

Vorteile

  • Balance zwischen Geschwindigkeit und Erfolgsrate: Gültige Daten werden schnell im Bulk verarbeitet, problematische Daten einzeln behandelt
  • Teilerfolg: Importieren was möglich ist, anstatt vollständig zu scheitern
  • Diagnosebereit: Individuelle Protokollierung für jede fehlgeschlagene Zeile
  • Betriebliche Stabilität: System stoppt nicht, wenn Daten Schema-Einschränkungen verletzen

Hinweise

Einzelne INSERTs sind bei großen Datensätzen langsam, daher ist eine Vorvalidierung des CSV zur Entfernung ungültiger Zeilen effizienter, wenn möglich. Die Transaktionsstrategie (einzelne Transaktion vs. einzelne Commits) sollte mit den Datenbankmerkmalen und Anforderungen übereinstimmen.

Anwendungen

  • CSV-Aufnahme: ETL-Pipelines für periodische externe Datenimporte
  • Graph-DB-Import: Aufbau von Knowledge Graphs mit umfangreichen Beziehungsdaten
  • Schema-sensitiver Bulk-Loader: Flexible Datenladen in Datenbanken mit strengen Einschränkungen
  • Datenmigration: Fehlertolerante Übertragungen zwischen verschiedenen Datenbanksystemen