feat(02-01): add Store interface and SQLiteStore implementation
- Store interface with 9 methods covering all persistence operations - SQLiteStore implements all 9 methods with exact SQL from current handlers - NewSQLiteStore sets MaxOpenConns(1) and PRAGMA foreign_keys = ON - UpsertEvent uses ON CONFLICT DO UPDATE with acknowledged_at reset to NULL - AssignTag uses INSERT OR REPLACE for tag_assignments table - golang-migrate v4.19.1 dependency added to go.mod
This commit is contained in:
183
pkg/diunwebhook/sqlite_store.go
Normal file
183
pkg/diunwebhook/sqlite_store.go
Normal file
@@ -0,0 +1,183 @@
|
||||
package diunwebhook
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// SQLiteStore implements Store using a SQLite database.
|
||||
type SQLiteStore struct {
|
||||
db *sql.DB
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewSQLiteStore creates a new SQLiteStore backed by the given *sql.DB.
|
||||
// It sets MaxOpenConns(1) to prevent concurrent write contention and
|
||||
// enables foreign key enforcement via PRAGMA foreign_keys = ON.
|
||||
func NewSQLiteStore(db *sql.DB) *SQLiteStore {
|
||||
db.SetMaxOpenConns(1)
|
||||
// PRAGMA foreign_keys must be set per-connection; with MaxOpenConns(1) this covers all queries.
|
||||
db.Exec("PRAGMA foreign_keys = ON") //nolint:errcheck
|
||||
return &SQLiteStore{db: db}
|
||||
}
|
||||
|
||||
// UpsertEvent inserts or updates a DIUN event in the updates table.
|
||||
// On conflict (same image), all fields are updated and acknowledged_at is reset to NULL.
|
||||
func (s *SQLiteStore) UpsertEvent(event DiunEvent) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
_, err := s.db.Exec(`
|
||||
INSERT INTO updates (
|
||||
image, diun_version, hostname, status, provider,
|
||||
hub_link, mime_type, digest, created, platform,
|
||||
ctn_name, ctn_id, ctn_state, ctn_status,
|
||||
received_at, acknowledged_at
|
||||
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,NULL)
|
||||
ON CONFLICT(image) DO UPDATE SET
|
||||
diun_version = excluded.diun_version,
|
||||
hostname = excluded.hostname,
|
||||
status = excluded.status,
|
||||
provider = excluded.provider,
|
||||
hub_link = excluded.hub_link,
|
||||
mime_type = excluded.mime_type,
|
||||
digest = excluded.digest,
|
||||
created = excluded.created,
|
||||
platform = excluded.platform,
|
||||
ctn_name = excluded.ctn_name,
|
||||
ctn_id = excluded.ctn_id,
|
||||
ctn_state = excluded.ctn_state,
|
||||
ctn_status = excluded.ctn_status,
|
||||
received_at = excluded.received_at,
|
||||
acknowledged_at = NULL`,
|
||||
event.Image, event.DiunVersion, event.Hostname, event.Status, event.Provider,
|
||||
event.HubLink, event.MimeType, event.Digest,
|
||||
event.Created.Format(time.RFC3339), event.Platform,
|
||||
event.Metadata.ContainerName, event.Metadata.ContainerID,
|
||||
event.Metadata.State, event.Metadata.Status,
|
||||
time.Now().Format(time.RFC3339),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetUpdates returns all update entries joined with their tag assignments.
|
||||
func (s *SQLiteStore) GetUpdates() (map[string]UpdateEntry, error) {
|
||||
rows, err := s.db.Query(`SELECT u.image, u.diun_version, u.hostname, u.status, u.provider,
|
||||
u.hub_link, u.mime_type, u.digest, u.created, u.platform,
|
||||
u.ctn_name, u.ctn_id, u.ctn_state, u.ctn_status, u.received_at, COALESCE(u.acknowledged_at, ''),
|
||||
t.id, t.name
|
||||
FROM updates u
|
||||
LEFT JOIN tag_assignments ta ON u.image = ta.image
|
||||
LEFT JOIN tags t ON ta.tag_id = t.id`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
result := make(map[string]UpdateEntry)
|
||||
for rows.Next() {
|
||||
var e UpdateEntry
|
||||
var createdStr, receivedStr, acknowledgedAt string
|
||||
var tagID sql.NullInt64
|
||||
var tagName sql.NullString
|
||||
err := rows.Scan(&e.Event.Image, &e.Event.DiunVersion, &e.Event.Hostname,
|
||||
&e.Event.Status, &e.Event.Provider, &e.Event.HubLink, &e.Event.MimeType,
|
||||
&e.Event.Digest, &createdStr, &e.Event.Platform,
|
||||
&e.Event.Metadata.ContainerName, &e.Event.Metadata.ContainerID,
|
||||
&e.Event.Metadata.State, &e.Event.Metadata.Status,
|
||||
&receivedStr, &acknowledgedAt, &tagID, &tagName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.Event.Created, _ = time.Parse(time.RFC3339, createdStr)
|
||||
e.ReceivedAt, _ = time.Parse(time.RFC3339, receivedStr)
|
||||
e.Acknowledged = acknowledgedAt != ""
|
||||
if tagID.Valid && tagName.Valid {
|
||||
e.Tag = &Tag{ID: int(tagID.Int64), Name: tagName.String}
|
||||
}
|
||||
result[e.Event.Image] = e
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
// AcknowledgeUpdate marks the given image as acknowledged.
|
||||
// Returns found=false if no row with that image exists.
|
||||
func (s *SQLiteStore) AcknowledgeUpdate(image string) (found bool, err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
res, err := s.db.Exec(`UPDATE updates SET acknowledged_at = datetime('now') WHERE image = ?`, image)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
return n > 0, nil
|
||||
}
|
||||
|
||||
// ListTags returns all tags ordered by name.
|
||||
func (s *SQLiteStore) ListTags() ([]Tag, error) {
|
||||
rows, err := s.db.Query(`SELECT id, name FROM tags ORDER BY name`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
tags := []Tag{}
|
||||
for rows.Next() {
|
||||
var t Tag
|
||||
if err := rows.Scan(&t.ID, &t.Name); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tags = append(tags, t)
|
||||
}
|
||||
return tags, rows.Err()
|
||||
}
|
||||
|
||||
// CreateTag inserts a new tag with the given name and returns the created tag.
|
||||
func (s *SQLiteStore) CreateTag(name string) (Tag, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
res, err := s.db.Exec(`INSERT INTO tags (name) VALUES (?)`, name)
|
||||
if err != nil {
|
||||
return Tag{}, err
|
||||
}
|
||||
id, _ := res.LastInsertId()
|
||||
return Tag{ID: int(id), Name: name}, nil
|
||||
}
|
||||
|
||||
// DeleteTag deletes the tag with the given id.
|
||||
// Returns found=false if no tag with that id exists.
|
||||
func (s *SQLiteStore) DeleteTag(id int) (found bool, err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
res, err := s.db.Exec(`DELETE FROM tags WHERE id = ?`, id)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
return n > 0, nil
|
||||
}
|
||||
|
||||
// AssignTag assigns the given image to the given tag.
|
||||
// Uses INSERT OR REPLACE so re-assigning an image to a different tag replaces the existing assignment.
|
||||
func (s *SQLiteStore) AssignTag(image string, tagID int) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
_, err := s.db.Exec(`INSERT OR REPLACE INTO tag_assignments (image, tag_id) VALUES (?, ?)`, image, tagID)
|
||||
return err
|
||||
}
|
||||
|
||||
// UnassignTag removes any tag assignment for the given image.
|
||||
func (s *SQLiteStore) UnassignTag(image string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
_, err := s.db.Exec(`DELETE FROM tag_assignments WHERE image = ?`, image)
|
||||
return err
|
||||
}
|
||||
|
||||
// TagExists returns true if a tag with the given id exists.
|
||||
func (s *SQLiteStore) TagExists(id int) (bool, error) {
|
||||
var count int
|
||||
err := s.db.QueryRow(`SELECT COUNT(*) FROM tags WHERE id = ?`, id).Scan(&count)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return count > 0, nil
|
||||
}
|
||||
15
pkg/diunwebhook/store.go
Normal file
15
pkg/diunwebhook/store.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package diunwebhook
|
||||
|
||||
// Store defines all persistence operations. Implementations must be safe
|
||||
// for concurrent use from HTTP handlers.
|
||||
type Store interface {
|
||||
UpsertEvent(event DiunEvent) error
|
||||
GetUpdates() (map[string]UpdateEntry, error)
|
||||
AcknowledgeUpdate(image string) (found bool, err error)
|
||||
ListTags() ([]Tag, error)
|
||||
CreateTag(name string) (Tag, error)
|
||||
DeleteTag(id int) (found bool, err error)
|
||||
AssignTag(image string, tagID int) error
|
||||
UnassignTag(image string) error
|
||||
TagExists(id int) (bool, error)
|
||||
}
|
||||
Reference in New Issue
Block a user