- Store interface with 9 methods covering all persistence operations
- SQLiteStore implements all 9 methods with exact SQL from current handlers
- NewSQLiteStore sets MaxOpenConns(1) and PRAGMA foreign_keys = ON
- UpsertEvent uses ON CONFLICT DO UPDATE with acknowledged_at reset to NULL
- AssignTag uses INSERT OR REPLACE for tag_assignments table
- golang-migrate v4.19.1 dependency added to go.mod
184 lines
5.9 KiB
Go
184 lines
5.9 KiB
Go
package diunwebhook
|
|
|
|
import (
|
|
"database/sql"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// SQLiteStore is the Store implementation that persists its data in a SQLite
// database reached through database/sql.
type SQLiteStore struct {
	// db is the handle every statement and query in this type is issued on.
	db *sql.DB
	// mu is taken by the mutating methods for the duration of their statement.
	mu sync.Mutex
}
|
|
|
|
// NewSQLiteStore creates a new SQLiteStore backed by the given *sql.DB.
|
|
// It sets MaxOpenConns(1) to prevent concurrent write contention and
|
|
// enables foreign key enforcement via PRAGMA foreign_keys = ON.
|
|
func NewSQLiteStore(db *sql.DB) *SQLiteStore {
|
|
db.SetMaxOpenConns(1)
|
|
// PRAGMA foreign_keys must be set per-connection; with MaxOpenConns(1) this covers all queries.
|
|
db.Exec("PRAGMA foreign_keys = ON") //nolint:errcheck
|
|
return &SQLiteStore{db: db}
|
|
}
|
|
|
|
// UpsertEvent inserts or updates a DIUN event in the updates table.
|
|
// On conflict (same image), all fields are updated and acknowledged_at is reset to NULL.
|
|
func (s *SQLiteStore) UpsertEvent(event DiunEvent) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
_, err := s.db.Exec(`
|
|
INSERT INTO updates (
|
|
image, diun_version, hostname, status, provider,
|
|
hub_link, mime_type, digest, created, platform,
|
|
ctn_name, ctn_id, ctn_state, ctn_status,
|
|
received_at, acknowledged_at
|
|
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,NULL)
|
|
ON CONFLICT(image) DO UPDATE SET
|
|
diun_version = excluded.diun_version,
|
|
hostname = excluded.hostname,
|
|
status = excluded.status,
|
|
provider = excluded.provider,
|
|
hub_link = excluded.hub_link,
|
|
mime_type = excluded.mime_type,
|
|
digest = excluded.digest,
|
|
created = excluded.created,
|
|
platform = excluded.platform,
|
|
ctn_name = excluded.ctn_name,
|
|
ctn_id = excluded.ctn_id,
|
|
ctn_state = excluded.ctn_state,
|
|
ctn_status = excluded.ctn_status,
|
|
received_at = excluded.received_at,
|
|
acknowledged_at = NULL`,
|
|
event.Image, event.DiunVersion, event.Hostname, event.Status, event.Provider,
|
|
event.HubLink, event.MimeType, event.Digest,
|
|
event.Created.Format(time.RFC3339), event.Platform,
|
|
event.Metadata.ContainerName, event.Metadata.ContainerID,
|
|
event.Metadata.State, event.Metadata.Status,
|
|
time.Now().Format(time.RFC3339),
|
|
)
|
|
return err
|
|
}
|
|
|
|
// GetUpdates returns all update entries joined with their tag assignments.
|
|
func (s *SQLiteStore) GetUpdates() (map[string]UpdateEntry, error) {
|
|
rows, err := s.db.Query(`SELECT u.image, u.diun_version, u.hostname, u.status, u.provider,
|
|
u.hub_link, u.mime_type, u.digest, u.created, u.platform,
|
|
u.ctn_name, u.ctn_id, u.ctn_state, u.ctn_status, u.received_at, COALESCE(u.acknowledged_at, ''),
|
|
t.id, t.name
|
|
FROM updates u
|
|
LEFT JOIN tag_assignments ta ON u.image = ta.image
|
|
LEFT JOIN tags t ON ta.tag_id = t.id`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
result := make(map[string]UpdateEntry)
|
|
for rows.Next() {
|
|
var e UpdateEntry
|
|
var createdStr, receivedStr, acknowledgedAt string
|
|
var tagID sql.NullInt64
|
|
var tagName sql.NullString
|
|
err := rows.Scan(&e.Event.Image, &e.Event.DiunVersion, &e.Event.Hostname,
|
|
&e.Event.Status, &e.Event.Provider, &e.Event.HubLink, &e.Event.MimeType,
|
|
&e.Event.Digest, &createdStr, &e.Event.Platform,
|
|
&e.Event.Metadata.ContainerName, &e.Event.Metadata.ContainerID,
|
|
&e.Event.Metadata.State, &e.Event.Metadata.Status,
|
|
&receivedStr, &acknowledgedAt, &tagID, &tagName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
e.Event.Created, _ = time.Parse(time.RFC3339, createdStr)
|
|
e.ReceivedAt, _ = time.Parse(time.RFC3339, receivedStr)
|
|
e.Acknowledged = acknowledgedAt != ""
|
|
if tagID.Valid && tagName.Valid {
|
|
e.Tag = &Tag{ID: int(tagID.Int64), Name: tagName.String}
|
|
}
|
|
result[e.Event.Image] = e
|
|
}
|
|
return result, rows.Err()
|
|
}
|
|
|
|
// AcknowledgeUpdate marks the given image as acknowledged.
|
|
// Returns found=false if no row with that image exists.
|
|
func (s *SQLiteStore) AcknowledgeUpdate(image string) (found bool, err error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
res, err := s.db.Exec(`UPDATE updates SET acknowledged_at = datetime('now') WHERE image = ?`, image)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
n, _ := res.RowsAffected()
|
|
return n > 0, nil
|
|
}
|
|
|
|
// ListTags returns all tags ordered by name.
|
|
func (s *SQLiteStore) ListTags() ([]Tag, error) {
|
|
rows, err := s.db.Query(`SELECT id, name FROM tags ORDER BY name`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
tags := []Tag{}
|
|
for rows.Next() {
|
|
var t Tag
|
|
if err := rows.Scan(&t.ID, &t.Name); err != nil {
|
|
return nil, err
|
|
}
|
|
tags = append(tags, t)
|
|
}
|
|
return tags, rows.Err()
|
|
}
|
|
|
|
// CreateTag inserts a new tag with the given name and returns the created tag.
|
|
func (s *SQLiteStore) CreateTag(name string) (Tag, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
res, err := s.db.Exec(`INSERT INTO tags (name) VALUES (?)`, name)
|
|
if err != nil {
|
|
return Tag{}, err
|
|
}
|
|
id, _ := res.LastInsertId()
|
|
return Tag{ID: int(id), Name: name}, nil
|
|
}
|
|
|
|
// DeleteTag deletes the tag with the given id.
|
|
// Returns found=false if no tag with that id exists.
|
|
func (s *SQLiteStore) DeleteTag(id int) (found bool, err error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
res, err := s.db.Exec(`DELETE FROM tags WHERE id = ?`, id)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
n, _ := res.RowsAffected()
|
|
return n > 0, nil
|
|
}
|
|
|
|
// AssignTag assigns the given image to the given tag.
|
|
// Uses INSERT OR REPLACE so re-assigning an image to a different tag replaces the existing assignment.
|
|
func (s *SQLiteStore) AssignTag(image string, tagID int) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
_, err := s.db.Exec(`INSERT OR REPLACE INTO tag_assignments (image, tag_id) VALUES (?, ?)`, image, tagID)
|
|
return err
|
|
}
|
|
|
|
// UnassignTag removes any tag assignment for the given image.
|
|
func (s *SQLiteStore) UnassignTag(image string) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
_, err := s.db.Exec(`DELETE FROM tag_assignments WHERE image = ?`, image)
|
|
return err
|
|
}
|
|
|
|
// TagExists returns true if a tag with the given id exists.
|
|
func (s *SQLiteStore) TagExists(id int) (bool, error) {
|
|
var count int
|
|
err := s.db.QueryRow(`SELECT COUNT(*) FROM tags WHERE id = ?`, id).Scan(&count)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return count > 0, nil
|
|
}
|