diff --git a/seeder-api/src/seeders/location-loader.js b/seeder-api/src/seeders/location-loader.js
index 790b0e9..fc781a2 100644
--- a/seeder-api/src/seeders/location-loader.js
+++ b/seeder-api/src/seeders/location-loader.js
@@ -3,6 +3,7 @@ import { readFileSync, createReadStream } from 'fs';
 import { createGunzip } from 'zlib';
 import { pipeline } from 'stream/promises';
 import { Writable } from 'stream';
+import { createInterface } from 'readline';
 import { dirname, join } from 'path';
 import { fileURLToPath } from 'url';
@@ -465,6 +466,123 @@ async function executeGzippedSqlFile(filename, tableName) {
   }
 }
 
+/**
+ * Stream a gzipped SQL dump from SQL_DIR and bulk-insert its rows in batches.
+ *
+ * The file is read line-by-line through a gunzip -> readline pipeline, lines
+ * are stitched back into complete statements, and the VALUES tuples of INSERT
+ * statements are flushed to the database in multi-row INSERTs of up to
+ * BATCH_SIZE rows (ON CONFLICT DO NOTHING).
+ *
+ * @param {string} filename - Dump file name inside SQL_DIR (e.g. 'cities.sql.gz').
+ * @param {string} tableName - Target table; 'cities' additionally gets an
+ *   explicit column list and transformCitiesInsert() rewriting.
+ * @returns {Promise<number>} Count of rows queued for insertion.
+ * @throws Re-throws any stream or query error after logging it.
+ */
+async function executeGzippedSqlFileStreamed(filename, tableName) {
+  const filePath = join(SQL_DIR, filename);
+  console.log(` Loading ${filename} (gzipped, streamed)...`);
+
+  try {
+    // Pull the raw tuple text out of "INSERT ... VALUES (...);".
+    const extractValues = (stmt) => {
+      const match = stmt.match(/VALUES\s*\((.+)\);?$/is);
+      if (!match) return null;
+      return match[1];
+    };
+
+    // Skip comments, psql directives, and session/ownership noise from pg_dump.
+    const shouldIgnoreLine = (line) => {
+      const trimmed = line.trim();
+      if (!trimmed || trimmed.startsWith('--')) return true;
+      if (trimmed.startsWith('\\restrict') || trimmed.startsWith('\\unrestrict')) return true;
+      if (/^SET\s+[a-z_]+/i.test(trimmed)) return true;
+      if (/^ALTER TABLE.*OWNER TO/i.test(trimmed)) return true;
+      if (/^COMMENT ON/i.test(trimmed)) return true;
+      if (/^SELECT pg_catalog\.setval/i.test(trimmed)) return true;
+      if (/^SELECT .*set_config/i.test(trimmed)) return true;
+      return false;
+    };
+
+    let columns = '';
+    let transformFunc = null;
+    if (tableName === 'cities') {
+      columns = '(id, name, state_id, state_code, country_id, country_code, latitude, longitude, population, timezone, translations, created_at, updated_at, flag, wiki_data_id)';
+      transformFunc = (stmt) => {
+        const fullStmt = transformCitiesInsert(stmt);
+        return extractValues(fullStmt);
+      };
+    } else {
+      transformFunc = (stmt) => extractValues(stmt);
+    }
+
+    const BATCH_SIZE = 2000;
+    let processedCount = 0;
+    let batchCount = 0;
+    let currentBatch = [];
+
+    // Issue one multi-row INSERT for the accumulated batch, then reset it.
+    const flushBatch = async () => {
+      if (currentBatch.length === 0) return;
+      const query = `INSERT INTO ${tableName} ${columns} VALUES ${currentBatch.join(', ')} ON CONFLICT DO NOTHING`;
+      await pool.query(query);
+      batchCount++;
+      currentBatch = [];
+
+      if (batchCount % 10 === 0) {
+        process.stdout.write(`\r ... ${processedCount} rows processed (${batchCount} batches)`);
+      }
+    };
+
+    const rl = createInterface({
+      input: createReadStream(filePath).pipe(createGunzip()),
+      crlfDelay: Infinity,
+    });
+
+    // Accumulate lines until one ends with ';' (NOTE(review): a ';' ending a
+    // line inside a quoted value would split a statement early — acceptable
+    // for pg_dump output, which keeps each INSERT on one line).
+    let statement = '';
+    for await (const line of rl) {
+      if (shouldIgnoreLine(line)) {
+        continue;
+      }
+
+      statement += `${line}\n`;
+      if (!line.trim().endsWith(';')) {
+        continue;
+      }
+
+      const trimmedStatement = statement.trim();
+      statement = '';
+
+      if (!/^INSERT INTO/i.test(trimmedStatement)) {
+        continue;
+      }
+
+      const values = transformFunc(trimmedStatement);
+      if (!values) {
+        continue;
+      }
+
+      currentBatch.push(`(${values})`);
+      processedCount++;
+
+      if (currentBatch.length >= BATCH_SIZE) {
+        await flushBatch();
+      }
+    }
+
+    // Flush the final partial batch.
+    await flushBatch();
+    console.log('');
+    console.log(` inserted ${processedCount} records into ${tableName}`);
+    return processedCount;
+  } catch (error) {
+    console.error(` Error loading ${filename}:`, error.message);
+    throw error;
+  }
+}
+
 /**
  * Seed all location data from SQL dumps
  */
@@ -517,7 +635,7 @@ export async function seedDetailedLocations() {
     // 5. Cities (~160k records) - This is the big one
     console.log('5️⃣ Seeding Cities (this may take a while)...');
     console.time(' ⏱️ Cities (Bulk Insert)');
-    await executeGzippedSqlFile('cities.sql.gz', 'cities');
+    await executeGzippedSqlFileStreamed('cities.sql.gz', 'cities');
     console.timeEnd(' ⏱️ Cities (Bulk Insert)');
 
   } catch (error) {