diff --git a/pkg/diunwebhook/postgres_store.go b/pkg/diunwebhook/postgres_store.go new file mode 100644 index 0000000..dc95eb3 --- /dev/null +++ b/pkg/diunwebhook/postgres_store.go @@ -0,0 +1,176 @@ +package diunwebhook + +import ( + "database/sql" + "time" +) + +// PostgresStore implements Store using a PostgreSQL database. +type PostgresStore struct { + db *sql.DB +} + +// NewPostgresStore creates a new PostgresStore backed by the given *sql.DB. +// Configures connection pool settings appropriate for PostgreSQL. +// PostgreSQL handles concurrent writes natively so no mutex is needed. +func NewPostgresStore(db *sql.DB) *PostgresStore { + db.SetMaxOpenConns(25) + db.SetMaxIdleConns(5) + db.SetConnMaxLifetime(5 * time.Minute) + return &PostgresStore{db: db} +} + +// UpsertEvent inserts or updates a DIUN event in the updates table. +// On conflict (same image), all fields are updated and acknowledged_at is reset to NULL. +func (s *PostgresStore) UpsertEvent(event DiunEvent) error { + _, err := s.db.Exec(` + INSERT INTO updates ( + image, diun_version, hostname, status, provider, + hub_link, mime_type, digest, created, platform, + ctn_name, ctn_id, ctn_state, ctn_status, + received_at, acknowledged_at + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,NULL) + ON CONFLICT(image) DO UPDATE SET + diun_version = EXCLUDED.diun_version, + hostname = EXCLUDED.hostname, + status = EXCLUDED.status, + provider = EXCLUDED.provider, + hub_link = EXCLUDED.hub_link, + mime_type = EXCLUDED.mime_type, + digest = EXCLUDED.digest, + created = EXCLUDED.created, + platform = EXCLUDED.platform, + ctn_name = EXCLUDED.ctn_name, + ctn_id = EXCLUDED.ctn_id, + ctn_state = EXCLUDED.ctn_state, + ctn_status = EXCLUDED.ctn_status, + received_at = EXCLUDED.received_at, + acknowledged_at = NULL`, + event.Image, event.DiunVersion, event.Hostname, event.Status, event.Provider, + event.HubLink, event.MimeType, event.Digest, + event.Created.Format(time.RFC3339), event.Platform, + event.Metadata.ContainerName, event.Metadata.ContainerID, + event.Metadata.State, event.Metadata.Status, + time.Now().Format(time.RFC3339), + ) + return err +} + +// GetUpdates returns all update entries joined with their tag assignments. +func (s *PostgresStore) GetUpdates() (map[string]UpdateEntry, error) { + rows, err := s.db.Query(`SELECT u.image, u.diun_version, u.hostname, u.status, u.provider, + u.hub_link, u.mime_type, u.digest, u.created, u.platform, + u.ctn_name, u.ctn_id, u.ctn_state, u.ctn_status, u.received_at, COALESCE(u.acknowledged_at, ''), + t.id, t.name + FROM updates u + LEFT JOIN tag_assignments ta ON u.image = ta.image + LEFT JOIN tags t ON ta.tag_id = t.id`) + if err != nil { + return nil, err + } + defer rows.Close() + result := make(map[string]UpdateEntry) + for rows.Next() { + var e UpdateEntry + var createdStr, receivedStr, acknowledgedAt string + var tagID sql.NullInt64 + var tagName sql.NullString + err := rows.Scan(&e.Event.Image, &e.Event.DiunVersion, &e.Event.Hostname, + &e.Event.Status, &e.Event.Provider, &e.Event.HubLink, &e.Event.MimeType, + &e.Event.Digest, &createdStr, &e.Event.Platform, + &e.Event.Metadata.ContainerName, &e.Event.Metadata.ContainerID, + &e.Event.Metadata.State, &e.Event.Metadata.Status, + &receivedStr, &acknowledgedAt, &tagID, &tagName) + if err != nil { + return nil, err + } + e.Event.Created, _ = time.Parse(time.RFC3339, createdStr) + e.ReceivedAt, _ = time.Parse(time.RFC3339, receivedStr) + e.Acknowledged = acknowledgedAt != "" + if tagID.Valid && tagName.Valid { + e.Tag = &Tag{ID: int(tagID.Int64), Name: tagName.String} + } + result[e.Event.Image] = e + } + return result, rows.Err() +} + +// AcknowledgeUpdate marks the given image as acknowledged. +// Returns found=false if no row with that image exists. +func (s *PostgresStore) AcknowledgeUpdate(image string) (found bool, err error) { + res, err := s.db.Exec(`UPDATE updates SET acknowledged_at = NOW() WHERE image = $1`, image) + if err != nil { + return false, err + } + n, _ := res.RowsAffected() + return n > 0, nil +} + +// ListTags returns all tags ordered by name. +func (s *PostgresStore) ListTags() ([]Tag, error) { + rows, err := s.db.Query(`SELECT id, name FROM tags ORDER BY name`) + if err != nil { + return nil, err + } + defer rows.Close() + tags := []Tag{} + for rows.Next() { + var t Tag + if err := rows.Scan(&t.ID, &t.Name); err != nil { + return nil, err + } + tags = append(tags, t) + } + return tags, rows.Err() +} + +// CreateTag inserts a new tag with the given name and returns the created tag. +// Uses RETURNING id since pgx does not support LastInsertId. +func (s *PostgresStore) CreateTag(name string) (Tag, error) { + var id int + err := s.db.QueryRow( + `INSERT INTO tags (name) VALUES ($1) RETURNING id`, name, + ).Scan(&id) + if err != nil { + return Tag{}, err + } + return Tag{ID: id, Name: name}, nil +} + +// DeleteTag deletes the tag with the given id. +// Returns found=false if no tag with that id exists. +func (s *PostgresStore) DeleteTag(id int) (found bool, err error) { + res, err := s.db.Exec(`DELETE FROM tags WHERE id = $1`, id) + if err != nil { + return false, err + } + n, _ := res.RowsAffected() + return n > 0, nil +} + +// AssignTag assigns the given image to the given tag. +// Uses INSERT ... ON CONFLICT DO UPDATE so re-assigning an image to a different tag replaces the existing assignment. +func (s *PostgresStore) AssignTag(image string, tagID int) error { + _, err := s.db.Exec( + `INSERT INTO tag_assignments (image, tag_id) VALUES ($1, $2) + ON CONFLICT (image) DO UPDATE SET tag_id = EXCLUDED.tag_id`, + image, tagID, + ) + return err +} + +// UnassignTag removes any tag assignment for the given image. +func (s *PostgresStore) UnassignTag(image string) error { + _, err := s.db.Exec(`DELETE FROM tag_assignments WHERE image = $1`, image) + return err +} + +// TagExists returns true if a tag with the given id exists. +func (s *PostgresStore) TagExists(id int) (bool, error) { + var count int + err := s.db.QueryRow(`SELECT COUNT(*) FROM tags WHERE id = $1`, id).Scan(&count) + if err != nil { + return false, err + } + return count > 0, nil +}