From 30db60c77fdf607304bae7212ded75cd0d7a9f1f Mon Sep 17 00:00:00 2001 From: Virgil Date: Mon, 30 Mar 2026 15:48:33 +0000 Subject: [PATCH] refactor(store): tighten AX naming and examples Co-Authored-By: Virgil --- docs/architecture.md | 8 ++++---- docs/history.md | 2 +- store.go | 38 +++++++++++++++++++------------------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/docs/architecture.md b/docs/architecture.md index 1eba116..7855936 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -24,7 +24,7 @@ WAL (Write-Ahead Logging) mode allows concurrent readers to proceed without bloc The `database/sql` package maintains a connection pool by default. SQLite pragmas are per-connection: if the pool hands out a second connection, that connection inherits none of the WAL or busy-timeout settings, causing `SQLITE_BUSY` errors under concurrent load. -go-store calls `db.SetMaxOpenConns(1)` to pin all access to a single connection. Since SQLite serialises writes at the file level regardless, this introduces no additional throughput penalty. It eliminates the BUSY errors by ensuring the pragma settings always apply. +go-store calls `database.SetMaxOpenConns(1)` to pin all access to a single connection. Since SQLite serialises writes at the file level regardless, this introduces no additional throughput penalty. It eliminates the BUSY errors by ensuring the pragma settings always apply. ### Schema @@ -165,8 +165,8 @@ for event := range watcher.Events { This is the designed integration point for consumers such as go-ws: ```go -unregister := storeInstance.OnChange(func(e store.Event) { - hub.SendToChannel("store-events", e) +unregister := storeInstance.OnChange(func(event store.Event) { + hub.SendToChannel("store-events", event) }) defer unregister() ``` @@ -176,7 +176,7 @@ Callbacks may safely register or unregister watchers and callbacks while handlin ### Internal Dispatch -The `notify(e Event)` method first acquires the watcher read-lock, iterates all watchers with non-blocking channel sends, then releases the lock. It then acquires the callback read-lock, snapshots the registered callbacks, releases the lock, and invokes each callback synchronously. This keeps watcher delivery non-blocking while allowing callbacks to manage subscriptions re-entrantly. +The `notify(event Event)` method first acquires the watcher read-lock, iterates all watchers with non-blocking channel sends, then releases the lock. It then acquires the callback read-lock, snapshots the registered callbacks, releases the lock, and invokes each callback synchronously. This keeps watcher delivery non-blocking while allowing callbacks to manage subscriptions re-entrantly. Watcher matching is handled by the `watcherMatches` helper, which checks the group and key filters against the event. Wildcard `"*"` matches any value in its position. diff --git a/docs/history.md b/docs/history.md index e9b28aa..bb118bc 100644 --- a/docs/history.md +++ b/docs/history.md @@ -17,7 +17,7 @@ At extraction the package comprised a single source file and a single test file. **Problem.** The `database/sql` connection pool hands out different physical connections for each `Exec` or `Query` call. SQLite pragmas (`PRAGMA journal_mode=WAL`, `PRAGMA busy_timeout`) are per-connection. Under concurrent write load (10 goroutines, 100 ops each), connections from the pool that had not received the WAL pragma would block and return `SQLITE_BUSY` immediately rather than waiting. -**Fix.** `db.SetMaxOpenConns(1)` serialises all database access through a single connection. Because SQLite is a single-writer database by design (it serialises writes at the file-lock level regardless of pool size), this does not reduce write throughput. It eliminates the BUSY errors by ensuring the pragma settings always apply. +**Fix.** `database.SetMaxOpenConns(1)` serialises all database access through a single connection. Because SQLite is a single-writer database by design (it serialises writes at the file-lock level regardless of pool size), this does not reduce write throughput. It eliminates the BUSY errors by ensuring the pragma settings always apply. **Defence in depth.** `PRAGMA busy_timeout=5000` was added to make the single connection wait up to 5 seconds before reporting a timeout error, providing additional resilience. diff --git a/store.go b/store.go index 93a2045..b30d32d 100644 --- a/store.go +++ b/store.go @@ -47,8 +47,8 @@ type Store struct { // New creates a Store at the given SQLite path. Use ":memory:" for tests. // Usage example: `storeInstance, _ := store.New("/tmp/config.db")` -func New(dbPath string) (*Store, error) { - database, err := sql.Open("sqlite", dbPath) +func New(databasePath string) (*Store, error) { + sqliteDatabase, err := sql.Open("sqlite", databasePath) if err != nil { return nil, core.E("store.New", "open", err) } @@ -56,23 +56,23 @@ func New(dbPath string) (*Store, error) { // one writer at a time; using a pool causes SQLITE_BUSY under contention // because pragmas (journal_mode, busy_timeout) are per-connection and the // pool hands out different connections for each call. - database.SetMaxOpenConns(1) - if _, err := database.Exec("PRAGMA journal_mode=WAL"); err != nil { - database.Close() + sqliteDatabase.SetMaxOpenConns(1) + if _, err := sqliteDatabase.Exec("PRAGMA journal_mode=WAL"); err != nil { + sqliteDatabase.Close() return nil, core.E("store.New", "WAL", err) } - if _, err := database.Exec("PRAGMA busy_timeout=5000"); err != nil { - database.Close() + if _, err := sqliteDatabase.Exec("PRAGMA busy_timeout=5000"); err != nil { + sqliteDatabase.Close() return nil, core.E("store.New", "busy_timeout", err) } - if err := ensureSchema(database); err != nil { - database.Close() + if err := ensureSchema(sqliteDatabase); err != nil { + sqliteDatabase.Close() return nil, err } - ctx, cancel := context.WithCancel(context.Background()) - storeInstance := &Store{database: database, cancelPurge: cancel, purgeInterval: 60 * time.Second} - storeInstance.startPurge(ctx) + purgeContext, cancel := context.WithCancel(context.Background()) + storeInstance := &Store{database: sqliteDatabase, cancelPurge: cancel, purgeInterval: 60 * time.Second} + storeInstance.startPurge(purgeContext) return storeInstance, nil } @@ -330,7 +330,7 @@ func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, erro ) } if err != nil { - yield("", core.E("store.Groups", "query", err)) + yield("", core.E("store.GroupsSeq", "query", err)) return } defer rows.Close() @@ -338,7 +338,7 @@ func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, erro for rows.Next() { var groupName string if err := rows.Scan(&groupName); err != nil { - if !yield("", core.E("store.Groups", "scan", err)) { + if !yield("", core.E("store.GroupsSeq", "scan", err)) { return } continue @@ -348,7 +348,7 @@ func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, erro } } if err := rows.Err(); err != nil { - yield("", core.E("store.Groups", "rows", err)) + yield("", core.E("store.GroupsSeq", "rows", err)) } } } @@ -365,23 +365,23 @@ func escapeLike(text string) string { // of rows removed. // Usage example: `removed, err := storeInstance.PurgeExpired()` func (storeInstance *Store) PurgeExpired() (int64, error) { - result, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ?", + deleteResult, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ?", time.Now().UnixMilli()) if err != nil { return 0, core.E("store.PurgeExpired", "exec", err) } - return result.RowsAffected() + return deleteResult.RowsAffected() } // startPurge launches a background goroutine that purges expired entries at the // store's configured purge interval. It stops when the context is cancelled. -func (storeInstance *Store) startPurge(ctx context.Context) { +func (storeInstance *Store) startPurge(purgeContext context.Context) { storeInstance.purgeWaitGroup.Go(func() { ticker := time.NewTicker(storeInstance.purgeInterval) defer ticker.Stop() for { select { - case <-ctx.Done(): + case <-purgeContext.Done(): return case <-ticker.C: if _, err := storeInstance.PurgeExpired(); err != nil {