gohorsejobs/backend/internal/database/database.go
root e1c61289af feat(config): Add MCP JSON bootstrap and unify docs
Enable backend fallback to MCP JSON for database connection when DATABASE_URL is absent. Add credentials bootstrap support from JSON for cloud/external services. Consolidate documentation with MCP integration guide and unified status. Update backlog to move video interviews endpoint out of current sprint.

Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-09 20:39:16 +01:00

245 lines
5.8 KiB
Go

package database
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"sort"
"strings"
"time"
_ "github.com/lib/pq"
)
// DB is the package-wide database handle. It is assigned by InitDB and used
// by the migration helpers in this file; it is nil until InitDB has run.
var DB *sql.DB
// InitDB resolves the connection string, opens the "postgres" handle, stores
// it in the package-level DB, applies the pool limits, and pings the server.
//
// Any failure (configuration, open, or ping) is reported through log.Fatalf,
// which terminates the process.
func InitDB() {
	// Pool tuning knobs; adjust to production load and database resources.
	const (
		poolSize    = 25              // cap on open and on idle connections
		recycleTime = 5 * time.Minute // max lifetime and max idle time
	)

	dsn, err := BuildConnectionString()
	if err != nil {
		log.Fatalf("Configuration error: %v", err)
	}

	if DB, err = sql.Open("postgres", dsn); err != nil {
		log.Fatalf("Error opening database: %v", err)
	}

	DB.SetMaxOpenConns(poolSize)       // limit total connections
	DB.SetMaxIdleConns(poolSize)       // keep connections around for reuse
	DB.SetConnMaxLifetime(recycleTime) // recycle periodically to avoid stale connections
	DB.SetConnMaxIdleTime(recycleTime) // close connections that sit idle too long

	// sql.Open alone does not prove the server is reachable, so ping it.
	pingErr := DB.Ping()
	if pingErr != nil {
		log.Fatalf("Error connecting to database: %v", pingErr)
	}
	log.Println("✅ Successfully connected to the database")
}
// BuildConnectionString returns the database connection string to use.
//
// Sources are consulted in this order:
//  1. the DATABASE_URL environment variable;
//  2. the JSON file named by the MCP_JSON_PATH environment variable.
//
// Both environment values are whitespace-trimmed before use, so a blank or
// whitespace-only DATABASE_URL counts as "not set" and does not block the
// MCP JSON fallback.
//
// It returns an error when the MCP JSON file cannot be read or parsed, or when
// no source yields a non-empty URL.
func BuildConnectionString() (string, error) {
	if dbURL := strings.TrimSpace(os.Getenv("DATABASE_URL")); dbURL != "" {
		log.Println("Using DATABASE_URL for connection")
		return dbURL, nil
	}

	if mcpPath := strings.TrimSpace(os.Getenv("MCP_JSON_PATH")); mcpPath != "" {
		dbURL, err := databaseURLFromMCPJSON(mcpPath)
		if err != nil {
			return "", fmt.Errorf("failed to load database url from MCP json (%s): %w", mcpPath, err)
		}
		if dbURL != "" {
			log.Printf("Using DATABASE_URL from MCP_JSON_PATH (%s)", mcpPath)
			return dbURL, nil
		}
		// The file was readable but held no URL under any known key; note it
		// and fall through to the generic "not set" error.
		log.Printf("MCP_JSON_PATH is set but no database URL was found in %s", mcpPath)
	}

	return "", fmt.Errorf("DATABASE_URL environment variable not set")
}
// databaseURLFromMCPJSON reads the JSON document at path and returns the first
// non-empty database URL found under a fixed list of key paths.
//
// It returns ("", nil) when the file parses but none of the key paths holds a
// non-empty string, and ("", err) when the file cannot be read or is not a
// JSON object.
func databaseURLFromMCPJSON(path string) (string, error) {
	contents, readErr := os.ReadFile(path)
	if readErr != nil {
		return "", readErr
	}

	var doc map[string]interface{}
	if decodeErr := json.Unmarshal(contents, &doc); decodeErr != nil {
		return "", decodeErr
	}

	// Key paths are tried top to bottom; the first hit wins.
	lookupOrder := [...][]string{
		{"database_url"},
		{"databaseUrl"},
		{"database", "url"},
		{"infra", "database_url"},
		{"infra", "databaseUrl"},
		{"infra", "database", "url"},
	}
	for i := range lookupOrder {
		found := nestedString(doc, lookupOrder[i]...)
		if found == "" {
			continue
		}
		return found, nil
	}
	return "", nil
}
// nestedString walks input along keys, descending one nested JSON object per
// key, and returns the whitespace-trimmed string found at the end of the path.
//
// It returns "" when a key is missing, when an intermediate value is not an
// object, or when the final value is not a string.
func nestedString(input map[string]interface{}, keys ...string) string {
	var node interface{} = input
	for i := 0; i < len(keys); i++ {
		table, isObject := node.(map[string]interface{})
		if !isObject {
			return ""
		}
		child, present := table[keys[i]]
		if !present {
			return ""
		}
		node = child
	}

	if text, isString := node.(string); isString {
		return strings.TrimSpace(text)
	}
	return ""
}
// RunMigrations applies every not-yet-tracked .sql file from the migrations
// directory, in ascending filename order.
//
// If no migrations directory can be located it logs a warning and returns.
// Every other failure (tracking table, directory listing, file read, or
// migration execution) is reported through log.Fatalf, which terminates the
// process.
func RunMigrations() {
	dir, err := resolveMigrationDir()
	if err != nil {
		log.Printf("⚠️ Warning: Could not list migrations directory: %v", err)
		return
	}

	if err = ensureMigrationsTable(); err != nil {
		log.Fatalf("❌ Error ensuring migrations table: %v", err)
	}

	listing, err := os.ReadDir(dir)
	if err != nil {
		log.Fatalf("❌ Error reading migrations directory: %v", err)
	}

	// Keep only regular entries whose name ends in ".sql".
	var sqlFiles []os.DirEntry
	for i := range listing {
		if item := listing[i]; !item.IsDir() && strings.HasSuffix(item.Name(), ".sql") {
			sqlFiles = append(sqlFiles, item)
		}
	}
	sort.Slice(sqlFiles, func(a, b int) bool {
		return sqlFiles[a].Name() < sqlFiles[b].Name()
	})

	warnDuplicateMigrationPrefixes(sqlFiles)

	for _, item := range sqlFiles {
		migration := item.Name()

		if isMigrationApplied(migration) {
			log.Printf("⏭️ Migration %s skipped (already tracked)", migration)
			continue
		}

		script, readErr := os.ReadFile(filepath.Join(dir, migration))
		if readErr != nil {
			log.Fatalf("❌ Error reading migration file %s: %v", migration, readErr)
		}

		log.Printf("📦 Running migration: %s", migration)
		if runErr := executeMigration(migration, string(script)); runErr != nil {
			log.Fatalf("❌ Error running migration %s: %v", migration, runErr)
		}
		log.Printf("✅ Migration %s executed successfully", migration)
	}

	log.Println("All migrations processed")
}
// resolveMigrationDir returns the first candidate path, relative to the
// current working directory, that can be listed as a directory. Candidates
// are "migrations" and then "../../migrations".
//
// It returns an error when neither candidate is readable.
func resolveMigrationDir() (string, error) {
	for _, candidate := range [...]string{"migrations", "../../migrations"} {
		_, listErr := os.ReadDir(candidate)
		if listErr != nil {
			continue
		}
		return candidate, nil
	}
	return "", fmt.Errorf("migrations directory not found in any known location")
}
// ensureMigrationsTable creates the schema_migrations tracking table (filename
// as primary key plus an applied_at timestamp) if it does not already exist,
// and returns any error from executing that statement on DB.
func ensureMigrationsTable() error {
	const createStmt = `
CREATE TABLE IF NOT EXISTS schema_migrations (
filename TEXT PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`
	if _, execErr := DB.Exec(createStmt); execErr != nil {
		return execErr
	}
	return nil
}
// isMigrationApplied reports whether filename already has a row in
// schema_migrations. A query or scan failure is reported through log.Fatalf,
// which terminates the process.
func isMigrationApplied(filename string) bool {
	const lookup = "SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE filename = $1)"

	applied := false
	row := DB.QueryRow(lookup, filename)
	if scanErr := row.Scan(&applied); scanErr != nil {
		log.Fatalf("❌ Error checking migration %s: %v", filename, scanErr)
	}
	return applied
}
// executeMigration runs sqlContent and records filename in schema_migrations
// within a single transaction.
//
// If the migration fails with an error whose text contains "already exists"
// (case-insensitive), the migration transaction is rolled back and filename is
// recorded in a fresh transaction, so the migration is treated as applied.
// Any other error is returned and nothing is recorded.
func executeMigration(filename, sqlContent string) error {
	const recordStmt = "INSERT INTO schema_migrations (filename) VALUES ($1)"

	migrationTx, err := DB.Begin()
	if err != nil {
		return err
	}
	// Safety net for early returns; its result is ignored.
	defer migrationTx.Rollback()

	_, runErr := migrationTx.Exec(sqlContent)
	if runErr == nil {
		// Happy path: track the file in the same transaction as the migration.
		if _, err := migrationTx.Exec(recordStmt, filename); err != nil {
			return err
		}
		return migrationTx.Commit()
	}

	if !strings.Contains(strings.ToLower(runErr.Error()), "already exists") {
		return runErr
	}

	log.Printf("⏭️ Migration %s skipped due to existing resources", filename)
	// Abandon the failed transaction before recording the file in a new one.
	migrationTx.Rollback()

	trackingTx, err := DB.Begin()
	if err != nil {
		return err
	}
	defer trackingTx.Rollback()

	if _, err := trackingTx.Exec(recordStmt, filename); err != nil {
		return err
	}
	return trackingTx.Commit()
}
// warnDuplicateMigrationPrefixes logs one warning for every filename prefix
// (the text before the first "_") shared by more than one entry in files.
//
// Names without an underscore are ignored. Warnings are emitted in ascending
// prefix order so the log output is the same on every run, rather than
// following Go's randomized map iteration order.
func warnDuplicateMigrationPrefixes(files []os.DirEntry) {
	prefixCount := make(map[string]int)
	for _, file := range files {
		parts := strings.SplitN(file.Name(), "_", 2)
		if len(parts) == 2 {
			prefixCount[parts[0]]++
		}
	}

	// Collect the offending prefixes first so they can be reported in order.
	var duplicates []string
	for prefix, count := range prefixCount {
		if count > 1 {
			duplicates = append(duplicates, prefix)
		}
	}
	sort.Strings(duplicates)

	for _, prefix := range duplicates {
		log.Printf("⚠️ Duplicate migration prefix detected (%s appears %d times)", prefix, prefixCount[prefix])
	}
}