package diunwebhook import ( "database/sql" "time" ) // PostgresStore implements Store using a PostgreSQL database. type PostgresStore struct { db *sql.DB } // NewPostgresStore creates a new PostgresStore backed by the given *sql.DB. // Configures connection pool settings appropriate for PostgreSQL. // PostgreSQL handles concurrent writes natively so no mutex is needed. func NewPostgresStore(db *sql.DB) *PostgresStore { db.SetMaxOpenConns(25) db.SetMaxIdleConns(5) db.SetConnMaxLifetime(5 * time.Minute) return &PostgresStore{db: db} } // UpsertEvent inserts or updates a DIUN event in the updates table. // On conflict (same image), all fields are updated and acknowledged_at is reset to NULL. func (s *PostgresStore) UpsertEvent(event DiunEvent) error { _, err := s.db.Exec(` INSERT INTO updates ( image, diun_version, hostname, status, provider, hub_link, mime_type, digest, created, platform, ctn_name, ctn_id, ctn_state, ctn_status, received_at, acknowledged_at ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,NULL) ON CONFLICT(image) DO UPDATE SET diun_version = EXCLUDED.diun_version, hostname = EXCLUDED.hostname, status = EXCLUDED.status, provider = EXCLUDED.provider, hub_link = EXCLUDED.hub_link, mime_type = EXCLUDED.mime_type, digest = EXCLUDED.digest, created = EXCLUDED.created, platform = EXCLUDED.platform, ctn_name = EXCLUDED.ctn_name, ctn_id = EXCLUDED.ctn_id, ctn_state = EXCLUDED.ctn_state, ctn_status = EXCLUDED.ctn_status, received_at = EXCLUDED.received_at, acknowledged_at = NULL`, event.Image, event.DiunVersion, event.Hostname, event.Status, event.Provider, event.HubLink, event.MimeType, event.Digest, event.Created.Format(time.RFC3339), event.Platform, event.Metadata.ContainerName, event.Metadata.ContainerID, event.Metadata.State, event.Metadata.Status, time.Now().Format(time.RFC3339), ) return err } // GetUpdates returns all update entries joined with their tag assignments. func (s *PostgresStore) GetUpdates() (map[string]UpdateEntry, error) { rows, err := s.db.Query(`SELECT u.image, u.diun_version, u.hostname, u.status, u.provider, u.hub_link, u.mime_type, u.digest, u.created, u.platform, u.ctn_name, u.ctn_id, u.ctn_state, u.ctn_status, u.received_at, COALESCE(u.acknowledged_at, ''), t.id, t.name FROM updates u LEFT JOIN tag_assignments ta ON u.image = ta.image LEFT JOIN tags t ON ta.tag_id = t.id`) if err != nil { return nil, err } defer rows.Close() result := make(map[string]UpdateEntry) for rows.Next() { var e UpdateEntry var createdStr, receivedStr, acknowledgedAt string var tagID sql.NullInt64 var tagName sql.NullString err := rows.Scan(&e.Event.Image, &e.Event.DiunVersion, &e.Event.Hostname, &e.Event.Status, &e.Event.Provider, &e.Event.HubLink, &e.Event.MimeType, &e.Event.Digest, &createdStr, &e.Event.Platform, &e.Event.Metadata.ContainerName, &e.Event.Metadata.ContainerID, &e.Event.Metadata.State, &e.Event.Metadata.Status, &receivedStr, &acknowledgedAt, &tagID, &tagName) if err != nil { return nil, err } e.Event.Created, _ = time.Parse(time.RFC3339, createdStr) e.ReceivedAt, _ = time.Parse(time.RFC3339, receivedStr) e.Acknowledged = acknowledgedAt != "" if tagID.Valid && tagName.Valid { e.Tag = &Tag{ID: int(tagID.Int64), Name: tagName.String} } result[e.Event.Image] = e } return result, rows.Err() } // AcknowledgeUpdate marks the given image as acknowledged. // Returns found=false if no row with that image exists. func (s *PostgresStore) AcknowledgeUpdate(image string) (found bool, err error) { res, err := s.db.Exec(`UPDATE updates SET acknowledged_at = NOW() WHERE image = $1`, image) if err != nil { return false, err } n, _ := res.RowsAffected() return n > 0, nil } // ListTags returns all tags ordered by name. func (s *PostgresStore) ListTags() ([]Tag, error) { rows, err := s.db.Query(`SELECT id, name FROM tags ORDER BY name`) if err != nil { return nil, err } defer rows.Close() tags := []Tag{} for rows.Next() { var t Tag if err := rows.Scan(&t.ID, &t.Name); err != nil { return nil, err } tags = append(tags, t) } return tags, rows.Err() } // CreateTag inserts a new tag with the given name and returns the created tag. // Uses RETURNING id since pgx does not support LastInsertId. func (s *PostgresStore) CreateTag(name string) (Tag, error) { var id int err := s.db.QueryRow( `INSERT INTO tags (name) VALUES ($1) RETURNING id`, name, ).Scan(&id) if err != nil { return Tag{}, err } return Tag{ID: id, Name: name}, nil } // DeleteTag deletes the tag with the given id. // Returns found=false if no tag with that id exists. func (s *PostgresStore) DeleteTag(id int) (found bool, err error) { res, err := s.db.Exec(`DELETE FROM tags WHERE id = $1`, id) if err != nil { return false, err } n, _ := res.RowsAffected() return n > 0, nil } // AssignTag assigns the given image to the given tag. // Uses INSERT ... ON CONFLICT DO UPDATE so re-assigning an image to a different tag replaces the existing assignment. func (s *PostgresStore) AssignTag(image string, tagID int) error { _, err := s.db.Exec( `INSERT INTO tag_assignments (image, tag_id) VALUES ($1, $2) ON CONFLICT (image) DO UPDATE SET tag_id = EXCLUDED.tag_id`, image, tagID, ) return err } // UnassignTag removes any tag assignment for the given image. func (s *PostgresStore) UnassignTag(image string) error { _, err := s.db.Exec(`DELETE FROM tag_assignments WHERE image = $1`, image) return err } // TagExists returns true if a tag with the given id exists. func (s *PostgresStore) TagExists(id int) (bool, error) { var count int err := s.db.QueryRow(`SELECT COUNT(*) FROM tags WHERE id = $1`, id).Scan(&count) if err != nil { return false, err } return count > 0, nil }