Enable backend fallback to MCP JSON for database connection when DATABASE_URL is absent.
Add credentials bootstrap support from JSON for cloud/external services.

Consolidate documentation with MCP integration guide and unified status.
Update backlog to move video interviews endpoint out of current sprint.

Co-Authored-By: Claude <noreply@anthropic.com>
245 lines
5.8 KiB
Go
package database
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
_ "github.com/lib/pq"
|
|
)
|
|
|
|
var DB *sql.DB
|
|
|
|
func InitDB() {
|
|
var err error
|
|
connStr, err := BuildConnectionString()
|
|
if err != nil {
|
|
log.Fatalf("Configuration error: %v", err)
|
|
}
|
|
|
|
DB, err = sql.Open("postgres", connStr)
|
|
if err != nil {
|
|
log.Fatalf("Error opening database: %v", err)
|
|
}
|
|
|
|
// Connection Pool Settings
|
|
// Adjust these values based on your production load and database resources
|
|
DB.SetMaxOpenConns(25) // Limit total connections
|
|
DB.SetMaxIdleConns(25) // Keep connections open for reuse
|
|
DB.SetConnMaxLifetime(5 * time.Minute) // Recycle connections every 5 min (avoids stale connections dropped by firewalls)
|
|
DB.SetConnMaxIdleTime(5 * time.Minute) // Close idle connections after 5 min
|
|
|
|
if err = DB.Ping(); err != nil {
|
|
log.Fatalf("Error connecting to database: %v", err)
|
|
}
|
|
|
|
log.Println("✅ Successfully connected to the database")
|
|
}
|
|
|
|
func BuildConnectionString() (string, error) {
|
|
if dbURL := os.Getenv("DATABASE_URL"); dbURL != "" {
|
|
log.Println("Using DATABASE_URL for connection")
|
|
return dbURL, nil
|
|
}
|
|
|
|
if mcpPath := strings.TrimSpace(os.Getenv("MCP_JSON_PATH")); mcpPath != "" {
|
|
dbURL, err := databaseURLFromMCPJSON(mcpPath)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to load database url from MCP json (%s): %w", mcpPath, err)
|
|
}
|
|
if dbURL != "" {
|
|
log.Printf("Using DATABASE_URL from MCP_JSON_PATH (%s)", mcpPath)
|
|
return dbURL, nil
|
|
}
|
|
log.Printf("MCP_JSON_PATH is set but no database URL was found in %s", mcpPath)
|
|
}
|
|
|
|
return "", fmt.Errorf("DATABASE_URL environment variable not set")
|
|
}
|
|
|
|
func databaseURLFromMCPJSON(path string) (string, error) {
|
|
raw, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
var payload map[string]interface{}
|
|
if err := json.Unmarshal(raw, &payload); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
candidates := [][]string{
|
|
{"database_url"},
|
|
{"databaseUrl"},
|
|
{"database", "url"},
|
|
{"infra", "database_url"},
|
|
{"infra", "databaseUrl"},
|
|
{"infra", "database", "url"},
|
|
}
|
|
|
|
for _, pathKeys := range candidates {
|
|
if val := nestedString(payload, pathKeys...); val != "" {
|
|
return val, nil
|
|
}
|
|
}
|
|
|
|
return "", nil
|
|
}
|
|
|
|
// nestedString walks input along keys and returns the trimmed string found
// at the end of the path. It returns "" when any key is missing, when a
// non-object is hit before the path is exhausted, or when the leaf value is
// not a string.
func nestedString(input map[string]interface{}, keys ...string) string {
	var node interface{} = input
	for _, key := range keys {
		obj, isMap := node.(map[string]interface{})
		if !isMap {
			return ""
		}
		next, present := obj[key]
		if !present {
			return ""
		}
		node = next
	}

	if s, isString := node.(string); isString {
		return strings.TrimSpace(s)
	}
	return ""
}
|
|
|
|
func RunMigrations() {
|
|
migrationDir, err := resolveMigrationDir()
|
|
if err != nil {
|
|
log.Printf("⚠️ Warning: Could not list migrations directory: %v", err)
|
|
return
|
|
}
|
|
|
|
if err := ensureMigrationsTable(); err != nil {
|
|
log.Fatalf("❌ Error ensuring migrations table: %v", err)
|
|
}
|
|
|
|
entries, err := os.ReadDir(migrationDir)
|
|
if err != nil {
|
|
log.Fatalf("❌ Error reading migrations directory: %v", err)
|
|
}
|
|
|
|
var files []os.DirEntry
|
|
for _, entry := range entries {
|
|
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".sql") {
|
|
continue
|
|
}
|
|
files = append(files, entry)
|
|
}
|
|
|
|
sort.Slice(files, func(i, j int) bool {
|
|
return files[i].Name() < files[j].Name()
|
|
})
|
|
|
|
warnDuplicateMigrationPrefixes(files)
|
|
|
|
for _, file := range files {
|
|
name := file.Name()
|
|
if isMigrationApplied(name) {
|
|
log.Printf("⏭️ Migration %s skipped (already tracked)", name)
|
|
continue
|
|
}
|
|
|
|
path := filepath.Join(migrationDir, name)
|
|
content, err := os.ReadFile(path)
|
|
if err != nil {
|
|
log.Fatalf("❌ Error reading migration file %s: %v", name, err)
|
|
}
|
|
|
|
log.Printf("📦 Running migration: %s", name)
|
|
if err := executeMigration(name, string(content)); err != nil {
|
|
log.Fatalf("❌ Error running migration %s: %v", name, err)
|
|
}
|
|
log.Printf("✅ Migration %s executed successfully", name)
|
|
}
|
|
|
|
log.Println("All migrations processed")
|
|
}
|
|
|
|
// resolveMigrationDir returns the first existing migrations directory,
// checking the path used when running from the repo root and the one used
// from the compiled binary's package directory.
func resolveMigrationDir() (string, error) {
	for _, dir := range []string{"migrations", "../../migrations"} {
		// Stat is cheaper than ReadDir here: we only need to know the
		// directory exists, not enumerate (and sort) its contents.
		if info, err := os.Stat(dir); err == nil && info.IsDir() {
			return dir, nil
		}
	}

	return "", fmt.Errorf("migrations directory not found in any known location")
}
|
|
|
|
func ensureMigrationsTable() error {
|
|
_, err := DB.Exec(`
|
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
|
filename TEXT PRIMARY KEY,
|
|
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
)
|
|
`)
|
|
return err
|
|
}
|
|
|
|
func isMigrationApplied(filename string) bool {
|
|
var exists bool
|
|
err := DB.QueryRow("SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE filename = $1)", filename).Scan(&exists)
|
|
if err != nil {
|
|
log.Fatalf("❌ Error checking migration %s: %v", filename, err)
|
|
}
|
|
return exists
|
|
}
|
|
|
|
func executeMigration(filename, sqlContent string) error {
|
|
tx, err := DB.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
if _, err := tx.Exec(sqlContent); err != nil {
|
|
errStr := strings.ToLower(err.Error())
|
|
if !strings.Contains(errStr, "already exists") {
|
|
return err
|
|
}
|
|
log.Printf("⏭️ Migration %s skipped due to existing resources", filename)
|
|
tx.Rollback()
|
|
tx2, err := DB.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx2.Rollback()
|
|
if _, err := tx2.Exec("INSERT INTO schema_migrations (filename) VALUES ($1)", filename); err != nil {
|
|
return err
|
|
}
|
|
return tx2.Commit()
|
|
}
|
|
|
|
if _, err := tx.Exec("INSERT INTO schema_migrations (filename) VALUES ($1)", filename); err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func warnDuplicateMigrationPrefixes(files []os.DirEntry) {
|
|
prefixCount := make(map[string]int)
|
|
for _, file := range files {
|
|
parts := strings.SplitN(file.Name(), "_", 2)
|
|
if len(parts) == 2 {
|
|
prefixCount[parts[0]]++
|
|
}
|
|
}
|
|
|
|
for prefix, count := range prefixCount {
|
|
if count > 1 {
|
|
log.Printf("⚠️ Duplicate migration prefix detected (%s appears %d times)", prefix, count)
|
|
}
|
|
}
|
|
}
|