- PostgresStore struct with *sql.DB field (no mutex needed for PostgreSQL)
- NewPostgresStore constructor with pool config: MaxOpenConns(25), MaxIdleConns(5), ConnMaxLifetime(5m)
- UpsertEvent with $1..$15 positional params and ON CONFLICT DO UPDATE
- GetUpdates uses the same SQL as SQLiteStore (TEXT timestamps, COALESCE)
- AcknowledgeUpdate uses NOW() instead of datetime('now')
- ListTags identical to SQLiteStore
- CreateTag uses RETURNING id (pgx does not support LastInsertId)
- DeleteTag, UnassignTag, TagExists use $1 positional param
- AssignTag uses ON CONFLICT (image) DO UPDATE SET tag_id = EXCLUDED.tag_id
177 lines
5.8 KiB
Go
177 lines
5.8 KiB
Go
package diunwebhook
|
|
|
|
import (
|
|
"database/sql"
|
|
"time"
|
|
)
|
|
|
|
// PostgresStore is the Store implementation backed by a PostgreSQL database.
// Its only state is the shared database handle.
type PostgresStore struct {
	// db is the database/sql handle that every method of the store queries through.
	db *sql.DB
}
|
|
|
|
// NewPostgresStore creates a new PostgresStore backed by the given *sql.DB.
|
|
// Configures connection pool settings appropriate for PostgreSQL.
|
|
// PostgreSQL handles concurrent writes natively so no mutex is needed.
|
|
func NewPostgresStore(db *sql.DB) *PostgresStore {
|
|
db.SetMaxOpenConns(25)
|
|
db.SetMaxIdleConns(5)
|
|
db.SetConnMaxLifetime(5 * time.Minute)
|
|
return &PostgresStore{db: db}
|
|
}
|
|
|
|
// UpsertEvent inserts or updates a DIUN event in the updates table.
|
|
// On conflict (same image), all fields are updated and acknowledged_at is reset to NULL.
|
|
func (s *PostgresStore) UpsertEvent(event DiunEvent) error {
|
|
_, err := s.db.Exec(`
|
|
INSERT INTO updates (
|
|
image, diun_version, hostname, status, provider,
|
|
hub_link, mime_type, digest, created, platform,
|
|
ctn_name, ctn_id, ctn_state, ctn_status,
|
|
received_at, acknowledged_at
|
|
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,NULL)
|
|
ON CONFLICT(image) DO UPDATE SET
|
|
diun_version = EXCLUDED.diun_version,
|
|
hostname = EXCLUDED.hostname,
|
|
status = EXCLUDED.status,
|
|
provider = EXCLUDED.provider,
|
|
hub_link = EXCLUDED.hub_link,
|
|
mime_type = EXCLUDED.mime_type,
|
|
digest = EXCLUDED.digest,
|
|
created = EXCLUDED.created,
|
|
platform = EXCLUDED.platform,
|
|
ctn_name = EXCLUDED.ctn_name,
|
|
ctn_id = EXCLUDED.ctn_id,
|
|
ctn_state = EXCLUDED.ctn_state,
|
|
ctn_status = EXCLUDED.ctn_status,
|
|
received_at = EXCLUDED.received_at,
|
|
acknowledged_at = NULL`,
|
|
event.Image, event.DiunVersion, event.Hostname, event.Status, event.Provider,
|
|
event.HubLink, event.MimeType, event.Digest,
|
|
event.Created.Format(time.RFC3339), event.Platform,
|
|
event.Metadata.ContainerName, event.Metadata.ContainerID,
|
|
event.Metadata.State, event.Metadata.Status,
|
|
time.Now().Format(time.RFC3339),
|
|
)
|
|
return err
|
|
}
|
|
|
|
// GetUpdates returns all update entries joined with their tag assignments.
|
|
func (s *PostgresStore) GetUpdates() (map[string]UpdateEntry, error) {
|
|
rows, err := s.db.Query(`SELECT u.image, u.diun_version, u.hostname, u.status, u.provider,
|
|
u.hub_link, u.mime_type, u.digest, u.created, u.platform,
|
|
u.ctn_name, u.ctn_id, u.ctn_state, u.ctn_status, u.received_at, COALESCE(u.acknowledged_at, ''),
|
|
t.id, t.name
|
|
FROM updates u
|
|
LEFT JOIN tag_assignments ta ON u.image = ta.image
|
|
LEFT JOIN tags t ON ta.tag_id = t.id`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
result := make(map[string]UpdateEntry)
|
|
for rows.Next() {
|
|
var e UpdateEntry
|
|
var createdStr, receivedStr, acknowledgedAt string
|
|
var tagID sql.NullInt64
|
|
var tagName sql.NullString
|
|
err := rows.Scan(&e.Event.Image, &e.Event.DiunVersion, &e.Event.Hostname,
|
|
&e.Event.Status, &e.Event.Provider, &e.Event.HubLink, &e.Event.MimeType,
|
|
&e.Event.Digest, &createdStr, &e.Event.Platform,
|
|
&e.Event.Metadata.ContainerName, &e.Event.Metadata.ContainerID,
|
|
&e.Event.Metadata.State, &e.Event.Metadata.Status,
|
|
&receivedStr, &acknowledgedAt, &tagID, &tagName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
e.Event.Created, _ = time.Parse(time.RFC3339, createdStr)
|
|
e.ReceivedAt, _ = time.Parse(time.RFC3339, receivedStr)
|
|
e.Acknowledged = acknowledgedAt != ""
|
|
if tagID.Valid && tagName.Valid {
|
|
e.Tag = &Tag{ID: int(tagID.Int64), Name: tagName.String}
|
|
}
|
|
result[e.Event.Image] = e
|
|
}
|
|
return result, rows.Err()
|
|
}
|
|
|
|
// AcknowledgeUpdate marks the given image as acknowledged.
|
|
// Returns found=false if no row with that image exists.
|
|
func (s *PostgresStore) AcknowledgeUpdate(image string) (found bool, err error) {
|
|
res, err := s.db.Exec(`UPDATE updates SET acknowledged_at = NOW() WHERE image = $1`, image)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
n, _ := res.RowsAffected()
|
|
return n > 0, nil
|
|
}
|
|
|
|
// ListTags returns all tags ordered by name.
|
|
func (s *PostgresStore) ListTags() ([]Tag, error) {
|
|
rows, err := s.db.Query(`SELECT id, name FROM tags ORDER BY name`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
tags := []Tag{}
|
|
for rows.Next() {
|
|
var t Tag
|
|
if err := rows.Scan(&t.ID, &t.Name); err != nil {
|
|
return nil, err
|
|
}
|
|
tags = append(tags, t)
|
|
}
|
|
return tags, rows.Err()
|
|
}
|
|
|
|
// CreateTag inserts a new tag with the given name and returns the created tag.
|
|
// Uses RETURNING id since pgx does not support LastInsertId.
|
|
func (s *PostgresStore) CreateTag(name string) (Tag, error) {
|
|
var id int
|
|
err := s.db.QueryRow(
|
|
`INSERT INTO tags (name) VALUES ($1) RETURNING id`, name,
|
|
).Scan(&id)
|
|
if err != nil {
|
|
return Tag{}, err
|
|
}
|
|
return Tag{ID: id, Name: name}, nil
|
|
}
|
|
|
|
// DeleteTag deletes the tag with the given id.
|
|
// Returns found=false if no tag with that id exists.
|
|
func (s *PostgresStore) DeleteTag(id int) (found bool, err error) {
|
|
res, err := s.db.Exec(`DELETE FROM tags WHERE id = $1`, id)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
n, _ := res.RowsAffected()
|
|
return n > 0, nil
|
|
}
|
|
|
|
// AssignTag assigns the given image to the given tag.
|
|
// Uses INSERT ... ON CONFLICT DO UPDATE so re-assigning an image to a different tag replaces the existing assignment.
|
|
func (s *PostgresStore) AssignTag(image string, tagID int) error {
|
|
_, err := s.db.Exec(
|
|
`INSERT INTO tag_assignments (image, tag_id) VALUES ($1, $2)
|
|
ON CONFLICT (image) DO UPDATE SET tag_id = EXCLUDED.tag_id`,
|
|
image, tagID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
// UnassignTag removes any tag assignment for the given image.
|
|
func (s *PostgresStore) UnassignTag(image string) error {
|
|
_, err := s.db.Exec(`DELETE FROM tag_assignments WHERE image = $1`, image)
|
|
return err
|
|
}
|
|
|
|
// TagExists returns true if a tag with the given id exists.
|
|
func (s *PostgresStore) TagExists(id int) (bool, error) {
|
|
var count int
|
|
err := s.db.QueryRow(`SELECT COUNT(*) FROM tags WHERE id = $1`, id).Scan(&count)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return count > 0, nil
|
|
}
|