Beim Importieren großer Mengen relationaler Daten in eine Datenbank kann ein Bulk-COPY-Vorgang aufgrund von Schema-Einschränkungsverletzungen vollständig fehlschlagen. Indem Sie zunächst einen Bulk-Import versuchen und bei fehlgeschlagenen Zeilen auf einzelne Einfügungen zurückgreifen, können Sie Geschwindigkeit und Erfolgsrate ausbalancieren.
Code
import fs from 'fs/promises';import { createReadStream } from 'fs';import { createInterface } from 'readline';import path from 'path';import kuzu from 'kuzu';
interface ImportStats { total: number; bulkSuccess: number; individualInserts: number; failed: number;}
/** * Bulk-Import von Beziehungen aus CSV mit Fallback auf einzelne Einfügungen */export async function importRelationshipsWithFallback( conn: kuzu.Connection, csvPath: string, tableName: string): Promise<ImportStats> { const stats: ImportStats = { total: 0, bulkSuccess: 0, individualInserts: 0, failed: 0 };
try { // Zuerst Bulk-COPY versuchen (am schnellsten) const copyQuery = `COPY ${tableName} FROM '${csvPath}' (HEADER=true)`; await conn.query(copyQuery);
// Bei Erfolg Zeilen zählen const countResult = await conn.query(`MATCH ()-[r:${tableName}]->() RETURN count(r) AS cnt`); stats.bulkSuccess = countResult.getNext()?.cnt || 0; stats.total = stats.bulkSuccess;
return stats; } catch (bulkError) { console.warn(`Bulk-COPY fehlgeschlagen, Fallback auf einzelne Einfügungen: ${bulkError}`);
// Bei Bulk-Fehler auf einzelne Einfügungen zurückgreifen return await importIndividually(conn, csvPath, tableName, stats); }}
/** * CSV zeilenweise lesen und einzeln einfügen */async function importIndividually( conn: kuzu.Connection, csvPath: string, tableName: string, stats: ImportStats): Promise<ImportStats> { const fileStream = createReadStream(csvPath); const rl = createInterface({ input: fileStream, crlfDelay: Infinity });
let headers: string[] = []; let isFirstLine = true;
for await (const line of rl) { if (isFirstLine) { // Kopfzeile parsen headers = line.split(',').map(h => h.trim()); isFirstLine = false; continue; }
stats.total++; const values = line.split(',').map(v => v.trim());
try { // Einzelnes INSERT ausführen const insertQuery = buildInsertQuery(tableName, headers, values); await conn.query(insertQuery); stats.individualInserts++; } catch (insertError) { // Zeilen überspringen, die nicht den Schema-Einschränkungen entsprechen console.warn(`Fehler beim Einfügen der Zeile: ${line}`, insertError); stats.failed++; } }
return stats;}
/** * INSERT-Abfrage erstellen */function buildInsertQuery( tableName: string, headers: string[], values: string[]): string { const columns = headers.join(', '); const valuePlaceholders = values.map(v => `'${v.replace(/'/g, "''")}'`).join(', '); return `INSERT INTO ${tableName} (${columns}) VALUES (${valuePlaceholders})`;}Verwendung
// Kuzu DB-Verbindung herstellenconst db = new kuzu.Database('./my-graph.db');const conn = new kuzu.Connection(db);
// Beziehungen importierenconst stats = await importRelationshipsWithFallback( conn, './data/relationships.csv', 'KNOWS');
console.log('Import abgeschlossen:');console.log(` Gesamt: ${stats.total}`);console.log(` Bulk-Erfolg: ${stats.bulkSuccess}`);console.log(` Einzelne Einfügungen: ${stats.individualInserts}`);console.log(` Fehlgeschlagen: ${stats.failed}`);Funktionsweise
- Zunächst Bulk-Import mit
COPY-Befehl versuchen (am schnellsten) - Bei
COPY-Fehler CSV zeilenweise lesen und einzeln einfügen - Fehlgeschlagene einzelne
INSERT-Operationen imfailed-Zähler aufzeichnen und überspringen - Endgültige Statistiken zurückgeben
Durch Standardnutzung des Bulk-Pfads werden die meisten gültigen Daten schnell verarbeitet, und nur problematische Daten fließen zum einzelnen Fallback.
Vorteile
- Balance zwischen Geschwindigkeit und Erfolgsrate: Gültige Daten werden schnell im Bulk verarbeitet, problematische Daten einzeln behandelt
- Teilerfolg: Importieren was möglich ist, anstatt vollständig zu scheitern
- Diagnosebereit: Individuelle Protokollierung für jede fehlgeschlagene Zeile
- Betriebliche Stabilität: System stoppt nicht, wenn Daten Schema-Einschränkungen verletzen
Hinweise
Einzelne INSERTs sind bei großen Datensätzen langsam, daher ist eine Vorvalidierung des CSV zur Entfernung ungültiger Zeilen effizienter, wenn möglich. Die Transaktionsstrategie (einzelne Transaktion vs. einzelne Commits) sollte mit den Datenbankmerkmalen und Anforderungen übereinstimmen.
Anwendungen
- CSV-Aufnahme: ETL-Pipelines für periodische externe Datenimporte
- Graph-DB-Import: Aufbau von Knowledge Graphs mit umfangreichen Beziehungsdaten
- Schema-sensitiver Bulk-Loader: Flexible Datenladen in Datenbanken mit strengen Einschränkungen
- Datenmigration: Fehlertolerante Übertragungen zwischen verschiedenen Datenbanksystemen
hsb.horse