logo hsb.horse
← Voltar para o índice de snippets

Snippets

Bulk Import with Fallback from Pairwise COPY to Individual Inserts

Primeiro tente bulk import para grandes volumes de dados relacionais e depois recorra a inserções individuais para linhas que violam restrições de esquema. Equilibra velocidade e taxa de sucesso.

Publicado: Atualizado:

Ao importar grandes quantidades de dados relacionais para um banco de dados, uma operação COPY em massa pode falhar completamente devido a violações de restrições de esquema. Ao primeiro tentar a importação em massa e recorrer a inserções individuais para linhas com falha, você pode equilibrar velocidade e taxa de sucesso.

Código

import fs from 'fs/promises';
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
import path from 'path';
import kuzu from 'kuzu';
interface ImportStats {
total: number;
bulkSuccess: number;
individualInserts: number;
failed: number;
}
/**
* Importação em massa de relacionamentos do CSV com fallback para inserções individuais
*/
export async function importRelationshipsWithFallback(
conn: kuzu.Connection,
csvPath: string,
tableName: string
): Promise<ImportStats> {
const stats: ImportStats = {
total: 0,
bulkSuccess: 0,
individualInserts: 0,
failed: 0
};
try {
// Primeiro tentar COPY em massa (mais rápido)
const copyQuery = `COPY ${tableName} FROM '${csvPath}' (HEADER=true)`;
await conn.query(copyQuery);
// Se bem-sucedido, contar linhas
const countResult = await conn.query(`MATCH ()-[r:${tableName}]->() RETURN count(r) AS cnt`);
stats.bulkSuccess = countResult.getNext()?.cnt || 0;
stats.total = stats.bulkSuccess;
return stats;
} catch (bulkError) {
console.warn(`COPY em massa falhou, recorrendo a inserções individuais: ${bulkError}`);
// Se bulk falhar, recorrer a inserções individuais
return await importIndividually(conn, csvPath, tableName, stats);
}
}
/**
* Ler CSV linha por linha e inserir individualmente
*/
async function importIndividually(
conn: kuzu.Connection,
csvPath: string,
tableName: string,
stats: ImportStats
): Promise<ImportStats> {
const fileStream = createReadStream(csvPath);
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity
});
let headers: string[] = [];
let isFirstLine = true;
for await (const line of rl) {
if (isFirstLine) {
// Analisar linha de cabeçalho
headers = line.split(',').map(h => h.trim());
isFirstLine = false;
continue;
}
stats.total++;
const values = line.split(',').map(v => v.trim());
try {
// Executar INSERT individual
const insertQuery = buildInsertQuery(tableName, headers, values);
await conn.query(insertQuery);
stats.individualInserts++;
} catch (insertError) {
// Pular linhas que não correspondem às restrições de esquema
console.warn(`Falha ao inserir linha: ${line}`, insertError);
stats.failed++;
}
}
return stats;
}
/**
* Construir consulta INSERT
*/
function buildInsertQuery(
tableName: string,
headers: string[],
values: string[]
): string {
const columns = headers.join(', ');
const valuePlaceholders = values.map(v => `'${v.replace(/'/g, "''")}'`).join(', ');
return `INSERT INTO ${tableName} (${columns}) VALUES (${valuePlaceholders})`;
}

Uso

// Obter conexão Kuzu DB
const db = new kuzu.Database('./my-graph.db');
const conn = new kuzu.Connection(db);
// Importar relacionamentos
const stats = await importRelationshipsWithFallback(
conn,
'./data/relationships.csv',
'KNOWS'
);
console.log('Importação concluída:');
console.log(` Total: ${stats.total}`);
console.log(` Sucesso em massa: ${stats.bulkSuccess}`);
console.log(` Inserções individuais: ${stats.individualInserts}`);
console.log(` Falhas: ${stats.failed}`);

Como Funciona

  1. Primeiro tentar importação em massa com comando COPY (mais rápido)
  2. Se COPY falhar, ler CSV linha por linha e inserir individualmente
  3. Registrar operações INSERT individuais com falha no contador failed e pular
  4. Retornar estatísticas finais

Ao usar o caminho em massa como padrão, a maioria dos dados válidos é processada rapidamente, e apenas dados problemáticos fluem para o fallback individual.

Benefícios

  • Equilíbrio entre Velocidade e Taxa de Sucesso: Dados válidos processados rapidamente em massa, dados problemáticos tratados individualmente
  • Sucesso Parcial: Importar o que for possível em vez de falhar completamente
  • Diagnosticável: Log individual para cada linha com falha
  • Estabilidade Operacional: Sistema não para quando dados violam restrições de esquema

Ressalvas

INSERTs individuais são lentos para grandes conjuntos de dados, portanto pré-validar o CSV para remover linhas inválidas é mais eficiente quando possível. A estratégia de transação (transação única vs. commits individuais) deve alinhar-se com as características do banco de dados e requisitos.

Aplicações

  • Ingestão CSV: Pipelines ETL para importações periódicas de dados externos
  • Importação Graph DB: Construção de Knowledge Graphs com dados relacionais em larga escala
  • Loader em Massa Sensível a Esquema: Carregamento flexível de dados em bancos de dados com restrições rígidas
  • Migração de Dados: Transferências tolerantes a erros entre diferentes sistemas de banco de dados