logo hsb.horse
← Retour à l’index des snippets

Snippets

Bulk Import with Fallback from Pairwise COPY to Individual Inserts

Tentez d'abord un bulk import pour les données relationnelles volumineuses, puis basculez vers des insertions individuelles pour les lignes qui violent les contraintes de schéma. Équilibre vitesse et taux de réussite.

Publié: Mis à jour:

Lors de l’importation de grandes quantités de données relationnelles dans une base de données, une opération COPY en masse peut échouer entièrement en raison de violations de contraintes de schéma. En tentant d’abord un import en masse et en basculant vers des insertions individuelles pour les lignes échouées, vous pouvez équilibrer vitesse et taux de réussite.

Code

import fs from 'fs/promises';
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
import path from 'path';
import kuzu from 'kuzu';
interface ImportStats {
total: number;
bulkSuccess: number;
individualInserts: number;
failed: number;
}
/**
* Import en masse des relations depuis CSV avec repli vers insertions individuelles
*/
export async function importRelationshipsWithFallback(
conn: kuzu.Connection,
csvPath: string,
tableName: string
): Promise<ImportStats> {
const stats: ImportStats = {
total: 0,
bulkSuccess: 0,
individualInserts: 0,
failed: 0
};
try {
// Tentative de COPY en masse d'abord (le plus rapide)
const copyQuery = `COPY ${tableName} FROM '${csvPath}' (HEADER=true)`;
await conn.query(copyQuery);
// Si réussi, compter les lignes
const countResult = await conn.query(`MATCH ()-[r:${tableName}]->() RETURN count(r) AS cnt`);
stats.bulkSuccess = countResult.getNext()?.cnt || 0;
stats.total = stats.bulkSuccess;
return stats;
} catch (bulkError) {
console.warn(`Échec du COPY en masse, repli vers insertions individuelles: ${bulkError}`);
// Si le bulk échoue, basculer vers insertions individuelles
return await importIndividually(conn, csvPath, tableName, stats);
}
}
/**
* Lire le CSV ligne par ligne et insérer individuellement
*/
async function importIndividually(
conn: kuzu.Connection,
csvPath: string,
tableName: string,
stats: ImportStats
): Promise<ImportStats> {
const fileStream = createReadStream(csvPath);
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity
});
let headers: string[] = [];
let isFirstLine = true;
for await (const line of rl) {
if (isFirstLine) {
// Analyser la ligne d'en-tête
headers = line.split(',').map(h => h.trim());
isFirstLine = false;
continue;
}
stats.total++;
const values = line.split(',').map(v => v.trim());
try {
// Exécuter INSERT individuel
const insertQuery = buildInsertQuery(tableName, headers, values);
await conn.query(insertQuery);
stats.individualInserts++;
} catch (insertError) {
// Ignorer les lignes qui ne correspondent pas aux contraintes de schéma
console.warn(`Échec de l'insertion de ligne: ${line}`, insertError);
stats.failed++;
}
}
return stats;
}
/**
* Construire la requête INSERT
*/
function buildInsertQuery(
tableName: string,
headers: string[],
values: string[]
): string {
const columns = headers.join(', ');
const valuePlaceholders = values.map(v => `'${v.replace(/'/g, "''")}'`).join(', ');
return `INSERT INTO ${tableName} (${columns}) VALUES (${valuePlaceholders})`;
}

Utilisation

// Obtenir la connexion Kuzu DB
const db = new kuzu.Database('./my-graph.db');
const conn = new kuzu.Connection(db);
// Importer les relations
const stats = await importRelationshipsWithFallback(
conn,
'./data/relationships.csv',
'KNOWS'
);
console.log('Import terminé:');
console.log(` Total: ${stats.total}`);
console.log(` Succès en masse: ${stats.bulkSuccess}`);
console.log(` Insertions individuelles: ${stats.individualInserts}`);
console.log(` Échecs: ${stats.failed}`);

Fonctionnement

  1. Tentative d’import en masse avec la commande COPY (le plus rapide)
  2. Si COPY échoue, lire le CSV ligne par ligne et insérer individuellement
  3. Enregistrer les opérations INSERT individuelles échouées dans le compteur failed et ignorer
  4. Retourner les statistiques finales

En privilégiant le chemin en masse par défaut, la plupart des données valides sont traitées rapidement, et seules les données problématiques passent au repli individuel.

Avantages

  • Équilibre vitesse et taux de réussite: Données valides traitées rapidement en masse, données problématiques gérées individuellement
  • Succès partiel: Importer ce qui est possible au lieu d’échouer complètement
  • Diagnosticable: Journalisation individuelle pour chaque ligne échouée
  • Stabilité opérationnelle: Le système ne s’arrête pas lorsque des données violent les contraintes de schéma

Précautions

Les INSERT individuels sont lents pour les grands ensembles de données, donc pré-valider le CSV pour supprimer les lignes invalides est plus efficace lorsque possible. La stratégie de transaction (transaction unique vs. commits individuels) doit s’aligner sur les caractéristiques de la base de données et les exigences.

Applications

  • Ingestion CSV: Pipelines ETL pour importations périodiques de données externes
  • Import Graph DB: Construction de Knowledge Graphs avec données relationnelles à grande échelle
  • Chargeur en masse sensible au schéma: Chargement flexible de données dans des bases de données avec contraintes strictes
  • Migration de données: Transferts tolérants aux erreurs entre différents systèmes de bases de données