Init
This commit is contained in:
82
backend/internal/db/db.go
Normal file
82
backend/internal/db/db.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
func Connect(ctx context.Context, databaseURL string) (*pgxpool.Pool, error) {
|
||||
pool, err := pgxpool.New(ctx, databaseURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connecting to database: %w", err)
|
||||
}
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
pool.Close()
|
||||
return nil, fmt.Errorf("pinging database: %w", err)
|
||||
}
|
||||
return pool, nil
|
||||
}
|
||||
|
||||
func RunMigrations(ctx context.Context, pool *pgxpool.Pool, migrationsFS fs.FS) error {
|
||||
entries, err := fs.ReadDir(migrationsFS, ".")
|
||||
if err != nil {
|
||||
return fmt.Errorf("reading migrations directory: %w", err)
|
||||
}
|
||||
|
||||
sort.Slice(entries, func(i, j int) bool {
|
||||
return entries[i].Name() < entries[j].Name()
|
||||
})
|
||||
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".sql") {
|
||||
continue
|
||||
}
|
||||
|
||||
var version int
|
||||
fmt.Sscanf(entry.Name(), "%d", &version)
|
||||
|
||||
var exists bool
|
||||
err := pool.QueryRow(ctx,
|
||||
"SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE version = $1)", version,
|
||||
).Scan(&exists)
|
||||
if err != nil {
|
||||
exists = false
|
||||
}
|
||||
if exists {
|
||||
continue
|
||||
}
|
||||
|
||||
content, err := fs.ReadFile(migrationsFS, entry.Name())
|
||||
if err != nil {
|
||||
return fmt.Errorf("reading migration %s: %w", entry.Name(), err)
|
||||
}
|
||||
|
||||
tx, err := pool.Begin(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("beginning transaction for %s: %w", entry.Name(), err)
|
||||
}
|
||||
|
||||
if _, err := tx.Exec(ctx, string(content)); err != nil {
|
||||
tx.Rollback(ctx)
|
||||
return fmt.Errorf("executing migration %s: %w", entry.Name(), err)
|
||||
}
|
||||
|
||||
if _, err := tx.Exec(ctx,
|
||||
"INSERT INTO schema_migrations (version) VALUES ($1)", version,
|
||||
); err != nil {
|
||||
tx.Rollback(ctx)
|
||||
return fmt.Errorf("recording migration %s: %w", entry.Name(), err)
|
||||
}
|
||||
|
||||
if err := tx.Commit(ctx); err != nil {
|
||||
return fmt.Errorf("committing migration %s: %w", entry.Name(), err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user