Files
DiunDashboard/pkg/diunwebhook/postgres_store.go
Jean-Luc Makiola 8820a9ef9f feat(03-01): add PostgresStore implementing all 9 Store interface methods
- PostgresStore struct with *sql.DB field (no mutex needed for PostgreSQL)
- NewPostgresStore constructor with pool config: MaxOpenConns(25), MaxIdleConns(5), ConnMaxLifetime(5m)
- UpsertEvent with $1..$15 positional params and ON CONFLICT DO UPDATE
- GetUpdates identical SQL to SQLiteStore (TEXT timestamps, COALESCE)
- AcknowledgeUpdate uses NOW() instead of datetime('now')
- ListTags identical to SQLiteStore
- CreateTag uses RETURNING id (pgx does not support LastInsertId)
- DeleteTag, UnassignTag, TagExists use $1 positional param
- AssignTag uses ON CONFLICT (image) DO UPDATE SET tag_id = EXCLUDED.tag_id
2026-03-24 09:09:29 +01:00

177 lines
5.8 KiB
Go

package diunwebhook
import (
"database/sql"
"time"
)
// PostgresStore implements Store using a PostgreSQL database.
type PostgresStore struct {
db *sql.DB
}
// NewPostgresStore creates a new PostgresStore backed by the given *sql.DB.
// Configures connection pool settings appropriate for PostgreSQL.
// PostgreSQL handles concurrent writes natively so no mutex is needed.
func NewPostgresStore(db *sql.DB) *PostgresStore {
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(5 * time.Minute)
return &PostgresStore{db: db}
}
// UpsertEvent inserts or updates a DIUN event in the updates table.
// On conflict (same image), all fields are updated and acknowledged_at is reset to NULL.
func (s *PostgresStore) UpsertEvent(event DiunEvent) error {
_, err := s.db.Exec(`
INSERT INTO updates (
image, diun_version, hostname, status, provider,
hub_link, mime_type, digest, created, platform,
ctn_name, ctn_id, ctn_state, ctn_status,
received_at, acknowledged_at
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,NULL)
ON CONFLICT(image) DO UPDATE SET
diun_version = EXCLUDED.diun_version,
hostname = EXCLUDED.hostname,
status = EXCLUDED.status,
provider = EXCLUDED.provider,
hub_link = EXCLUDED.hub_link,
mime_type = EXCLUDED.mime_type,
digest = EXCLUDED.digest,
created = EXCLUDED.created,
platform = EXCLUDED.platform,
ctn_name = EXCLUDED.ctn_name,
ctn_id = EXCLUDED.ctn_id,
ctn_state = EXCLUDED.ctn_state,
ctn_status = EXCLUDED.ctn_status,
received_at = EXCLUDED.received_at,
acknowledged_at = NULL`,
event.Image, event.DiunVersion, event.Hostname, event.Status, event.Provider,
event.HubLink, event.MimeType, event.Digest,
event.Created.Format(time.RFC3339), event.Platform,
event.Metadata.ContainerName, event.Metadata.ContainerID,
event.Metadata.State, event.Metadata.Status,
time.Now().Format(time.RFC3339),
)
return err
}
// GetUpdates returns all update entries joined with their tag assignments.
func (s *PostgresStore) GetUpdates() (map[string]UpdateEntry, error) {
rows, err := s.db.Query(`SELECT u.image, u.diun_version, u.hostname, u.status, u.provider,
u.hub_link, u.mime_type, u.digest, u.created, u.platform,
u.ctn_name, u.ctn_id, u.ctn_state, u.ctn_status, u.received_at, COALESCE(u.acknowledged_at, ''),
t.id, t.name
FROM updates u
LEFT JOIN tag_assignments ta ON u.image = ta.image
LEFT JOIN tags t ON ta.tag_id = t.id`)
if err != nil {
return nil, err
}
defer rows.Close()
result := make(map[string]UpdateEntry)
for rows.Next() {
var e UpdateEntry
var createdStr, receivedStr, acknowledgedAt string
var tagID sql.NullInt64
var tagName sql.NullString
err := rows.Scan(&e.Event.Image, &e.Event.DiunVersion, &e.Event.Hostname,
&e.Event.Status, &e.Event.Provider, &e.Event.HubLink, &e.Event.MimeType,
&e.Event.Digest, &createdStr, &e.Event.Platform,
&e.Event.Metadata.ContainerName, &e.Event.Metadata.ContainerID,
&e.Event.Metadata.State, &e.Event.Metadata.Status,
&receivedStr, &acknowledgedAt, &tagID, &tagName)
if err != nil {
return nil, err
}
e.Event.Created, _ = time.Parse(time.RFC3339, createdStr)
e.ReceivedAt, _ = time.Parse(time.RFC3339, receivedStr)
e.Acknowledged = acknowledgedAt != ""
if tagID.Valid && tagName.Valid {
e.Tag = &Tag{ID: int(tagID.Int64), Name: tagName.String}
}
result[e.Event.Image] = e
}
return result, rows.Err()
}
// AcknowledgeUpdate marks the given image as acknowledged.
// Returns found=false if no row with that image exists.
func (s *PostgresStore) AcknowledgeUpdate(image string) (found bool, err error) {
res, err := s.db.Exec(`UPDATE updates SET acknowledged_at = NOW() WHERE image = $1`, image)
if err != nil {
return false, err
}
n, _ := res.RowsAffected()
return n > 0, nil
}
// ListTags returns all tags ordered by name.
func (s *PostgresStore) ListTags() ([]Tag, error) {
rows, err := s.db.Query(`SELECT id, name FROM tags ORDER BY name`)
if err != nil {
return nil, err
}
defer rows.Close()
tags := []Tag{}
for rows.Next() {
var t Tag
if err := rows.Scan(&t.ID, &t.Name); err != nil {
return nil, err
}
tags = append(tags, t)
}
return tags, rows.Err()
}
// CreateTag inserts a new tag with the given name and returns the created tag.
// Uses RETURNING id since pgx does not support LastInsertId.
func (s *PostgresStore) CreateTag(name string) (Tag, error) {
var id int
err := s.db.QueryRow(
`INSERT INTO tags (name) VALUES ($1) RETURNING id`, name,
).Scan(&id)
if err != nil {
return Tag{}, err
}
return Tag{ID: id, Name: name}, nil
}
// DeleteTag deletes the tag with the given id.
// Returns found=false if no tag with that id exists.
func (s *PostgresStore) DeleteTag(id int) (found bool, err error) {
res, err := s.db.Exec(`DELETE FROM tags WHERE id = $1`, id)
if err != nil {
return false, err
}
n, _ := res.RowsAffected()
return n > 0, nil
}
// AssignTag assigns the given image to the given tag.
// Uses INSERT ... ON CONFLICT DO UPDATE so re-assigning an image to a different tag replaces the existing assignment.
func (s *PostgresStore) AssignTag(image string, tagID int) error {
_, err := s.db.Exec(
`INSERT INTO tag_assignments (image, tag_id) VALUES ($1, $2)
ON CONFLICT (image) DO UPDATE SET tag_id = EXCLUDED.tag_id`,
image, tagID,
)
return err
}
// UnassignTag removes any tag assignment for the given image.
func (s *PostgresStore) UnassignTag(image string) error {
_, err := s.db.Exec(`DELETE FROM tag_assignments WHERE image = $1`, image)
return err
}
// TagExists returns true if a tag with the given id exists.
func (s *PostgresStore) TagExists(id int) (bool, error) {
var count int
err := s.db.QueryRow(`SELECT COUNT(*) FROM tags WHERE id = $1`, id).Scan(&count)
if err != nil {
return false, err
}
return count > 0, nil
}