Ao importar grandes quantidades de dados relacionais para um banco de dados, uma operação COPY em massa pode falhar completamente devido a violações de restrições de esquema. Ao primeiro tentar a importação em massa e recorrer a inserções individuais para linhas com falha, você pode equilibrar velocidade e taxa de sucesso.
Código
import fs from 'fs/promises';import { createReadStream } from 'fs';import { createInterface } from 'readline';import path from 'path';import kuzu from 'kuzu';
interface ImportStats { total: number; bulkSuccess: number; individualInserts: number; failed: number;}
/** * Importação em massa de relacionamentos do CSV com fallback para inserções individuais */export async function importRelationshipsWithFallback( conn: kuzu.Connection, csvPath: string, tableName: string): Promise<ImportStats> { const stats: ImportStats = { total: 0, bulkSuccess: 0, individualInserts: 0, failed: 0 };
try { // Primeiro tentar COPY em massa (mais rápido) const copyQuery = `COPY ${tableName} FROM '${csvPath}' (HEADER=true)`; await conn.query(copyQuery);
// Se bem-sucedido, contar linhas const countResult = await conn.query(`MATCH ()-[r:${tableName}]->() RETURN count(r) AS cnt`); stats.bulkSuccess = countResult.getNext()?.cnt || 0; stats.total = stats.bulkSuccess;
return stats; } catch (bulkError) { console.warn(`COPY em massa falhou, recorrendo a inserções individuais: ${bulkError}`);
// Se bulk falhar, recorrer a inserções individuais return await importIndividually(conn, csvPath, tableName, stats); }}
/** * Ler CSV linha por linha e inserir individualmente */async function importIndividually( conn: kuzu.Connection, csvPath: string, tableName: string, stats: ImportStats): Promise<ImportStats> { const fileStream = createReadStream(csvPath); const rl = createInterface({ input: fileStream, crlfDelay: Infinity });
let headers: string[] = []; let isFirstLine = true;
for await (const line of rl) { if (isFirstLine) { // Analisar linha de cabeçalho headers = line.split(',').map(h => h.trim()); isFirstLine = false; continue; }
stats.total++; const values = line.split(',').map(v => v.trim());
try { // Executar INSERT individual const insertQuery = buildInsertQuery(tableName, headers, values); await conn.query(insertQuery); stats.individualInserts++; } catch (insertError) { // Pular linhas que não correspondem às restrições de esquema console.warn(`Falha ao inserir linha: ${line}`, insertError); stats.failed++; } }
return stats;}
/** * Construir consulta INSERT */function buildInsertQuery( tableName: string, headers: string[], values: string[]): string { const columns = headers.join(', '); const valuePlaceholders = values.map(v => `'${v.replace(/'/g, "''")}'`).join(', '); return `INSERT INTO ${tableName} (${columns}) VALUES (${valuePlaceholders})`;}Uso
// Obter conexão Kuzu DBconst db = new kuzu.Database('./my-graph.db');const conn = new kuzu.Connection(db);
// Importar relacionamentosconst stats = await importRelationshipsWithFallback( conn, './data/relationships.csv', 'KNOWS');
console.log('Importação concluída:');console.log(` Total: ${stats.total}`);console.log(` Sucesso em massa: ${stats.bulkSuccess}`);console.log(` Inserções individuais: ${stats.individualInserts}`);console.log(` Falhas: ${stats.failed}`);Como Funciona
- Primeiro tentar importação em massa com comando
COPY(mais rápido) - Se
COPYfalhar, ler CSV linha por linha e inserir individualmente - Registrar operações
INSERTindividuais com falha no contadorfailede pular - Retornar estatísticas finais
Ao usar o caminho em massa como padrão, a maioria dos dados válidos é processada rapidamente, e apenas dados problemáticos fluem para o fallback individual.
Benefícios
- Equilíbrio entre Velocidade e Taxa de Sucesso: Dados válidos processados rapidamente em massa, dados problemáticos tratados individualmente
- Sucesso Parcial: Importar o que for possível em vez de falhar completamente
- Diagnosticável: Log individual para cada linha com falha
- Estabilidade Operacional: Sistema não para quando dados violam restrições de esquema
Ressalvas
INSERTs individuais são lentos para grandes conjuntos de dados, portanto pré-validar o CSV para remover linhas inválidas é mais eficiente quando possível. A estratégia de transação (transação única vs. commits individuais) deve alinhar-se com as características do banco de dados e requisitos.
Aplicações
- Ingestão CSV: Pipelines ETL para importações periódicas de dados externos
- Importação Graph DB: Construção de Knowledge Graphs com dados relacionais em larga escala
- Loader em Massa Sensível a Esquema: Carregamento flexível de dados em bancos de dados com restrições rígidas
- Migração de Dados: Transferências tolerantes a erros entre diferentes sistemas de banco de dados
hsb.horse