fix(seeder): stream city import batches
This commit is contained in:
parent
9616e3db68
commit
0d29976502
1 changed file with 100 additions and 1 deletion
|
|
@ -3,6 +3,7 @@ import { readFileSync, createReadStream } from 'fs';
|
||||||
import { createGunzip } from 'zlib';
|
import { createGunzip } from 'zlib';
|
||||||
import { pipeline } from 'stream/promises';
|
import { pipeline } from 'stream/promises';
|
||||||
import { Writable } from 'stream';
|
import { Writable } from 'stream';
|
||||||
|
import { createInterface } from 'readline';
|
||||||
import { dirname, join } from 'path';
|
import { dirname, join } from 'path';
|
||||||
import { fileURLToPath } from 'url';
|
import { fileURLToPath } from 'url';
|
||||||
|
|
||||||
|
|
@ -465,6 +466,104 @@ async function executeGzippedSqlFile(filename, tableName) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Stream a gzipped pg_dump SQL file and bulk-insert its INSERT statements.
 *
 * Unlike the buffered loader, this never holds the whole dump in memory:
 * lines are read through gunzip -> readline, assembled into statements,
 * and flushed to Postgres in multi-row batches.
 *
 * @param {string} filename - File name inside SQL_DIR (e.g. 'cities.sql.gz').
 * @param {string} tableName - Target table; 'cities' gets an extra transform.
 * @returns {Promise<number>} Number of rows inserted (attempted; ON CONFLICT
 *   DO NOTHING may skip duplicates server-side).
 * @throws Re-throws any stream or query error after logging it.
 */
async function executeGzippedSqlFileStreamed(filename, tableName) {
  const filePath = join(SQL_DIR, filename);
  // NOTE: was `$(unknown)` — `$()` is not template-literal syntax, so the
  // message printed literally. `${filename}` is the intended interpolation.
  console.log(`  Loading ${filename} (gzipped, streamed)...`);

  try {
    // Pull the value list out of a full `INSERT ... VALUES (...);` statement.
    const extractValues = (stmt) => {
      const match = stmt.match(/VALUES\s*\((.+)\);?$/is);
      if (!match) return null;
      return match[1];
    };

    // Skip dump boilerplate: comments, psql meta-commands, SET/ALTER/COMMENT
    // noise, and sequence/setval bookkeeping we don't want replayed.
    const shouldIgnoreLine = (line) => {
      const trimmed = line.trim();
      if (!trimmed || trimmed.startsWith('--')) return true;
      if (trimmed.startsWith('\\restrict') || trimmed.startsWith('\\unrestrict')) return true;
      if (/^SET\s+[a-z_]+/i.test(trimmed)) return true;
      if (/^ALTER TABLE.*OWNER TO/i.test(trimmed)) return true;
      if (/^COMMENT ON/i.test(trimmed)) return true;
      if (/^SELECT pg_catalog\.setval/i.test(trimmed)) return true;
      if (/^SELECT .*set_config/i.test(trimmed)) return true;
      return false;
    };

    let columns = '';
    let transformFunc = null;
    if (tableName === 'cities') {
      // Explicit column list because the cities dump's column order is
      // normalized by transformCitiesInsert before values are extracted.
      columns = '(id, name, state_id, state_code, country_id, country_code, latitude, longitude, population, timezone, translations, created_at, updated_at, flag, wiki_data_id)';
      transformFunc = (stmt) => {
        const fullStmt = transformCitiesInsert(stmt);
        return extractValues(fullStmt);
      };
    } else {
      transformFunc = (stmt) => extractValues(stmt);
    }

    const BATCH_SIZE = 2000;
    let processedCount = 0;
    let batchCount = 0;
    let currentBatch = [];

    // Send the accumulated rows as one multi-row INSERT; duplicates are
    // silently skipped via ON CONFLICT DO NOTHING.
    const flushBatch = async () => {
      if (currentBatch.length === 0) return;
      const query = `INSERT INTO ${tableName} ${columns} VALUES ${currentBatch.join(', ')} ON CONFLICT DO NOTHING`;
      await pool.query(query);
      batchCount++;
      currentBatch = [];

      // Throttled progress indicator (every 10 batches = 20k rows).
      if (batchCount % 10 === 0) {
        process.stdout.write(`\r    ... ${processedCount} rows processed (${batchCount} batches)`);
      }
    };

    const rl = createInterface({
      input: createReadStream(filePath).pipe(createGunzip()),
      crlfDelay: Infinity,
    });

    // Statements may span multiple lines; accumulate until a line ends in ';'.
    let statement = '';
    for await (const line of rl) {
      if (shouldIgnoreLine(line)) {
        continue;
      }

      statement += `${line}\n`;
      if (!line.trim().endsWith(';')) {
        continue;
      }

      const trimmedStatement = statement.trim();
      statement = '';

      if (!/^INSERT INTO/i.test(trimmedStatement)) {
        continue;
      }

      const values = transformFunc(trimmedStatement);
      if (!values) {
        continue;
      }

      currentBatch.push(`(${values})`);
      processedCount++;

      if (currentBatch.length >= BATCH_SIZE) {
        await flushBatch();
      }
    }

    // Flush the final partial batch.
    await flushBatch();
    console.log('');
    console.log(`  inserted ${processedCount} records into ${tableName}`);
    return processedCount;
  } catch (error) {
    // Same `$(unknown)` garbling fixed here as well.
    console.error(`  Error loading ${filename}:`, error.message);
    throw error;
  }
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Seed all location data from SQL dumps
|
* Seed all location data from SQL dumps
|
||||||
*/
|
*/
|
||||||
|
|
@ -517,7 +616,7 @@ export async function seedDetailedLocations() {
|
||||||
// 5. Cities (~160k records) - This is the big one
|
// 5. Cities (~160k records) - This is the big one
|
||||||
console.log('5️⃣ Seeding Cities (this may take a while)...');
|
console.log('5️⃣ Seeding Cities (this may take a while)...');
|
||||||
console.time(' ⏱️ Cities (Bulk Insert)');
|
console.time(' ⏱️ Cities (Bulk Insert)');
|
||||||
await executeGzippedSqlFile('cities.sql.gz', 'cities');
|
await executeGzippedSqlFileStreamed('cities.sql.gz', 'cities');
|
||||||
console.timeEnd(' ⏱️ Cities (Bulk Insert)');
|
console.timeEnd(' ⏱️ Cities (Bulk Insert)');
|
||||||
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue