// Package db provides PostgreSQL connection setup and file-based schema
// migrations on top of pgxpool.
package db

import (
	"context"
	"errors"
	"fmt"
	"io/fs"
	"sort"
	"strings"

	"github.com/jackc/pgx/v5/pgxpool"
)

// undefinedTableSQLState is the PostgreSQL SQLSTATE code for "undefined_table".
const undefinedTableSQLState = "42P01"

// migrationFile is a single .sql file discovered in the migrations directory.
type migrationFile struct {
	version int    // numeric prefix parsed from the file name
	name    string // file name relative to the migrations FS root
}

// Connect creates a connection pool for databaseURL and verifies it with a
// ping. If the ping fails, the pool is closed before the error is returned,
// so the caller never receives a pool it has to clean up on failure.
func Connect(ctx context.Context, databaseURL string) (*pgxpool.Pool, error) {
	pool, err := pgxpool.New(ctx, databaseURL)
	if err != nil {
		return nil, fmt.Errorf("connecting to database: %w", err)
	}

	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, fmt.Errorf("pinging database: %w", err)
	}

	return pool, nil
}

// RunMigrations applies every not-yet-applied .sql file found at the root of
// migrationsFS, in ascending order of the integer prefix of the file name
// (e.g. "002_add_users.sql" has version 2).
//
// Each migration runs in its own transaction together with the INSERT that
// records its version in schema_migrations, so a failing migration leaves no
// partial state behind. Directories and files without a ".sql" suffix are
// ignored. A ".sql" file whose name does not start with an integer is
// reported as an error before any migration is applied.
//
// The first error encountered is returned; migrations committed before it
// stay applied.
func RunMigrations(ctx context.Context, pool *pgxpool.Pool, migrationsFS fs.FS) error {
	migrations, err := listMigrations(migrationsFS)
	if err != nil {
		return err
	}

	for _, m := range migrations {
		applied, err := isMigrationApplied(ctx, pool, m.version)
		if err != nil {
			return fmt.Errorf("checking migration %s: %w", m.name, err)
		}
		if applied {
			continue
		}

		content, err := fs.ReadFile(migrationsFS, m.name)
		if err != nil {
			return fmt.Errorf("reading migration %s: %w", m.name, err)
		}

		if err := applyMigration(ctx, pool, m, string(content)); err != nil {
			return err
		}
	}

	return nil
}

// listMigrations returns the .sql files at the root of migrationsFS sorted by
// numeric version, with the file name as a tie-breaker for determinism.
func listMigrations(migrationsFS fs.FS) ([]migrationFile, error) {
	entries, err := fs.ReadDir(migrationsFS, ".")
	if err != nil {
		return nil, fmt.Errorf("reading migrations directory: %w", err)
	}

	migrations := make([]migrationFile, 0, len(entries))
	for _, entry := range entries {
		name := entry.Name()
		if entry.IsDir() || !strings.HasSuffix(name, ".sql") {
			continue
		}

		var version int
		if _, err := fmt.Sscanf(name, "%d", &version); err != nil {
			return nil, fmt.Errorf("parsing version from migration filename %s: %w", name, err)
		}
		migrations = append(migrations, migrationFile{version: version, name: name})
	}

	// Sort on the parsed number rather than the raw name so that unpadded
	// prefixes order correctly (2 before 10).
	sort.Slice(migrations, func(i, j int) bool {
		if migrations[i].version != migrations[j].version {
			return migrations[i].version < migrations[j].version
		}
		return migrations[i].name < migrations[j].name
	})

	return migrations, nil
}

// isMigrationApplied reports whether version is recorded in
// schema_migrations. A missing schema_migrations table is treated as
// "not applied" so that a migration can create the table itself; every other
// query error is returned instead of being mistaken for "not applied".
func isMigrationApplied(ctx context.Context, pool *pgxpool.Pool, version int) (bool, error) {
	var exists bool
	err := pool.QueryRow(ctx,
		"SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE version = $1)",
		version,
	).Scan(&exists)
	if err != nil {
		if isUndefinedTable(err) {
			return false, nil
		}
		return false, err
	}
	return exists, nil
}

// isUndefinedTable reports whether err carries the PostgreSQL SQLSTATE for a
// missing relation. It matches on a SQLState() method instead of a concrete
// driver type so no additional driver package has to be imported.
func isUndefinedTable(err error) bool {
	var stateErr interface{ SQLState() string }
	return errors.As(err, &stateErr) && stateErr.SQLState() == undefinedTableSQLState
}

// applyMigration executes sql and records m.version in schema_migrations
// inside a single transaction.
func applyMigration(ctx context.Context, pool *pgxpool.Pool, m migrationFile, sql string) error {
	tx, err := pool.Begin(ctx)
	if err != nil {
		return fmt.Errorf("beginning transaction for %s: %w", m.name, err)
	}
	// Covers every early return below. The rollback error is deliberately
	// discarded: the statement error is the one worth reporting, and after a
	// successful Commit the transaction is already closed.
	defer func() { _ = tx.Rollback(ctx) }()

	if _, err := tx.Exec(ctx, sql); err != nil {
		return fmt.Errorf("executing migration %s: %w", m.name, err)
	}

	if _, err := tx.Exec(ctx,
		"INSERT INTO schema_migrations (version) VALUES ($1)",
		m.version,
	); err != nil {
		return fmt.Errorf("recording migration %s: %w", m.name, err)
	}

	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("committing migration %s: %w", m.name, err)
	}

	return nil
}