Lors de l’importation de grandes quantités de données relationnelles dans une base de données, une opération COPY en masse peut échouer entièrement en raison de violations de contraintes de schéma. En tentant d’abord un import en masse et en basculant vers des insertions individuelles pour les lignes échouées, vous pouvez équilibrer vitesse et taux de réussite.
Code
import fs from 'fs/promises';import { createReadStream } from 'fs';import { createInterface } from 'readline';import path from 'path';import kuzu from 'kuzu';
interface ImportStats { total: number; bulkSuccess: number; individualInserts: number; failed: number;}
/** * Import en masse des relations depuis CSV avec repli vers insertions individuelles */export async function importRelationshipsWithFallback( conn: kuzu.Connection, csvPath: string, tableName: string): Promise<ImportStats> { const stats: ImportStats = { total: 0, bulkSuccess: 0, individualInserts: 0, failed: 0 };
try { // Tentative de COPY en masse d'abord (le plus rapide) const copyQuery = `COPY ${tableName} FROM '${csvPath}' (HEADER=true)`; await conn.query(copyQuery);
// Si réussi, compter les lignes const countResult = await conn.query(`MATCH ()-[r:${tableName}]->() RETURN count(r) AS cnt`); stats.bulkSuccess = countResult.getNext()?.cnt || 0; stats.total = stats.bulkSuccess;
return stats; } catch (bulkError) { console.warn(`Échec du COPY en masse, repli vers insertions individuelles: ${bulkError}`);
// Si le bulk échoue, basculer vers insertions individuelles return await importIndividually(conn, csvPath, tableName, stats); }}
/** * Lire le CSV ligne par ligne et insérer individuellement */async function importIndividually( conn: kuzu.Connection, csvPath: string, tableName: string, stats: ImportStats): Promise<ImportStats> { const fileStream = createReadStream(csvPath); const rl = createInterface({ input: fileStream, crlfDelay: Infinity });
let headers: string[] = []; let isFirstLine = true;
for await (const line of rl) { if (isFirstLine) { // Analyser la ligne d'en-tête headers = line.split(',').map(h => h.trim()); isFirstLine = false; continue; }
stats.total++; const values = line.split(',').map(v => v.trim());
try { // Exécuter INSERT individuel const insertQuery = buildInsertQuery(tableName, headers, values); await conn.query(insertQuery); stats.individualInserts++; } catch (insertError) { // Ignorer les lignes qui ne correspondent pas aux contraintes de schéma console.warn(`Échec de l'insertion de ligne: ${line}`, insertError); stats.failed++; } }
return stats;}
/** * Construire la requête INSERT */function buildInsertQuery( tableName: string, headers: string[], values: string[]): string { const columns = headers.join(', '); const valuePlaceholders = values.map(v => `'${v.replace(/'/g, "''")}'`).join(', '); return `INSERT INTO ${tableName} (${columns}) VALUES (${valuePlaceholders})`;}Utilisation
// Obtenir la connexion Kuzu DBconst db = new kuzu.Database('./my-graph.db');const conn = new kuzu.Connection(db);
// Importer les relationsconst stats = await importRelationshipsWithFallback( conn, './data/relationships.csv', 'KNOWS');
console.log('Import terminé:');console.log(` Total: ${stats.total}`);console.log(` Succès en masse: ${stats.bulkSuccess}`);console.log(` Insertions individuelles: ${stats.individualInserts}`);console.log(` Échecs: ${stats.failed}`);Fonctionnement
- Tentative d’import en masse avec la commande
COPY(le plus rapide) - Si
COPYéchoue, lire le CSV ligne par ligne et insérer individuellement - Enregistrer les opérations
INSERTindividuelles échouées dans le compteurfailedet ignorer - Retourner les statistiques finales
En privilégiant le chemin en masse par défaut, la plupart des données valides sont traitées rapidement, et seules les données problématiques passent au repli individuel.
Avantages
- Équilibre vitesse et taux de réussite: Données valides traitées rapidement en masse, données problématiques gérées individuellement
- Succès partiel: Importer ce qui est possible au lieu d’échouer complètement
- Diagnosticable: Journalisation individuelle pour chaque ligne échouée
- Stabilité opérationnelle: Le système ne s’arrête pas lorsque des données violent les contraintes de schéma
Précautions
Les INSERT individuels sont lents pour les grands ensembles de données, donc pré-valider le CSV pour supprimer les lignes invalides est plus efficace lorsque possible. La stratégie de transaction (transaction unique vs. commits individuels) doit s’aligner sur les caractéristiques de la base de données et les exigences.
Applications
- Ingestion CSV: Pipelines ETL pour importations périodiques de données externes
- Import Graph DB: Construction de Knowledge Graphs avec données relationnelles à grande échelle
- Chargeur en masse sensible au schéma: Chargement flexible de données dans des bases de données avec contraintes strictes
- Migration de données: Transferts tolérants aux erreurs entre différents systèmes de bases de données
hsb.horse