refactor(store): tighten AX naming and examples
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
d54609b974
commit
30db60c77f
3 changed files with 24 additions and 24 deletions
|
|
@ -24,7 +24,7 @@ WAL (Write-Ahead Logging) mode allows concurrent readers to proceed without bloc
|
|||
|
||||
The `database/sql` package maintains a connection pool by default. SQLite pragmas are per-connection: if the pool hands out a second connection, that connection inherits none of the WAL or busy-timeout settings, causing `SQLITE_BUSY` errors under concurrent load.
|
||||
|
||||
go-store calls `db.SetMaxOpenConns(1)` to pin all access to a single connection. Since SQLite serialises writes at the file level regardless, this introduces no additional throughput penalty. It eliminates the BUSY errors by ensuring the pragma settings always apply.
|
||||
go-store calls `database.SetMaxOpenConns(1)` to pin all access to a single connection. Since SQLite serialises writes at the file level regardless, this introduces no additional throughput penalty. It eliminates the BUSY errors by ensuring the pragma settings always apply.
|
||||
|
||||
### Schema
|
||||
|
||||
|
|
@ -165,8 +165,8 @@ for event := range watcher.Events {
|
|||
This is the designed integration point for consumers such as go-ws:
|
||||
|
||||
```go
|
||||
unregister := storeInstance.OnChange(func(e store.Event) {
|
||||
hub.SendToChannel("store-events", e)
|
||||
unregister := storeInstance.OnChange(func(event store.Event) {
|
||||
hub.SendToChannel("store-events", event)
|
||||
})
|
||||
defer unregister()
|
||||
```
|
||||
|
|
@ -176,7 +176,7 @@ Callbacks may safely register or unregister watchers and callbacks while handlin
|
|||
|
||||
### Internal Dispatch
|
||||
|
||||
The `notify(e Event)` method first acquires the watcher read-lock, iterates all watchers with non-blocking channel sends, then releases the lock. It then acquires the callback read-lock, snapshots the registered callbacks, releases the lock, and invokes each callback synchronously. This keeps watcher delivery non-blocking while allowing callbacks to manage subscriptions re-entrantly.
|
||||
The `notify(event Event)` method first acquires the watcher read-lock, iterates all watchers with non-blocking channel sends, then releases the lock. It then acquires the callback read-lock, snapshots the registered callbacks, releases the lock, and invokes each callback synchronously. This keeps watcher delivery non-blocking while allowing callbacks to manage subscriptions re-entrantly.
|
||||
|
||||
Watcher matching is handled by the `watcherMatches` helper, which checks the group and key filters against the event. Wildcard `"*"` matches any value in its position.
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ At extraction the package comprised a single source file and a single test file.
|
|||
|
||||
**Problem.** The `database/sql` connection pool hands out different physical connections for each `Exec` or `Query` call. SQLite pragmas (`PRAGMA journal_mode=WAL`, `PRAGMA busy_timeout`) are per-connection. Under concurrent write load (10 goroutines, 100 ops each), connections from the pool that had not received the WAL pragma would block and return `SQLITE_BUSY` immediately rather than waiting.
|
||||
|
||||
**Fix.** `db.SetMaxOpenConns(1)` serialises all database access through a single connection. Because SQLite is a single-writer database by design (it serialises writes at the file-lock level regardless of pool size), this does not reduce write throughput. It eliminates the BUSY errors by ensuring the pragma settings always apply.
|
||||
**Fix.** `database.SetMaxOpenConns(1)` serialises all database access through a single connection. Because SQLite is a single-writer database by design (it serialises writes at the file-lock level regardless of pool size), this does not reduce write throughput. It eliminates the BUSY errors by ensuring the pragma settings always apply.
|
||||
|
||||
**Defence in depth.** `PRAGMA busy_timeout=5000` was added to make the single connection wait up to 5 seconds before reporting a timeout error, providing additional resilience.
|
||||
|
||||
|
|
|
|||
38
store.go
38
store.go
|
|
@ -47,8 +47,8 @@ type Store struct {
|
|||
|
||||
// New creates a Store at the given SQLite path. Use ":memory:" for tests.
|
||||
// Usage example: `storeInstance, _ := store.New("/tmp/config.db")`
|
||||
func New(dbPath string) (*Store, error) {
|
||||
database, err := sql.Open("sqlite", dbPath)
|
||||
func New(databasePath string) (*Store, error) {
|
||||
sqliteDatabase, err := sql.Open("sqlite", databasePath)
|
||||
if err != nil {
|
||||
return nil, core.E("store.New", "open", err)
|
||||
}
|
||||
|
|
@ -56,23 +56,23 @@ func New(dbPath string) (*Store, error) {
|
|||
// one writer at a time; using a pool causes SQLITE_BUSY under contention
|
||||
// because pragmas (journal_mode, busy_timeout) are per-connection and the
|
||||
// pool hands out different connections for each call.
|
||||
database.SetMaxOpenConns(1)
|
||||
if _, err := database.Exec("PRAGMA journal_mode=WAL"); err != nil {
|
||||
database.Close()
|
||||
sqliteDatabase.SetMaxOpenConns(1)
|
||||
if _, err := sqliteDatabase.Exec("PRAGMA journal_mode=WAL"); err != nil {
|
||||
sqliteDatabase.Close()
|
||||
return nil, core.E("store.New", "WAL", err)
|
||||
}
|
||||
if _, err := database.Exec("PRAGMA busy_timeout=5000"); err != nil {
|
||||
database.Close()
|
||||
if _, err := sqliteDatabase.Exec("PRAGMA busy_timeout=5000"); err != nil {
|
||||
sqliteDatabase.Close()
|
||||
return nil, core.E("store.New", "busy_timeout", err)
|
||||
}
|
||||
if err := ensureSchema(database); err != nil {
|
||||
database.Close()
|
||||
if err := ensureSchema(sqliteDatabase); err != nil {
|
||||
sqliteDatabase.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
storeInstance := &Store{database: database, cancelPurge: cancel, purgeInterval: 60 * time.Second}
|
||||
storeInstance.startPurge(ctx)
|
||||
purgeContext, cancel := context.WithCancel(context.Background())
|
||||
storeInstance := &Store{database: sqliteDatabase, cancelPurge: cancel, purgeInterval: 60 * time.Second}
|
||||
storeInstance.startPurge(purgeContext)
|
||||
return storeInstance, nil
|
||||
}
|
||||
|
||||
|
|
@ -330,7 +330,7 @@ func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, erro
|
|||
)
|
||||
}
|
||||
if err != nil {
|
||||
yield("", core.E("store.Groups", "query", err))
|
||||
yield("", core.E("store.GroupsSeq", "query", err))
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
|
@ -338,7 +338,7 @@ func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, erro
|
|||
for rows.Next() {
|
||||
var groupName string
|
||||
if err := rows.Scan(&groupName); err != nil {
|
||||
if !yield("", core.E("store.Groups", "scan", err)) {
|
||||
if !yield("", core.E("store.GroupsSeq", "scan", err)) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
|
|
@ -348,7 +348,7 @@ func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, erro
|
|||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
yield("", core.E("store.Groups", "rows", err))
|
||||
yield("", core.E("store.GroupsSeq", "rows", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -365,23 +365,23 @@ func escapeLike(text string) string {
|
|||
// of rows removed.
|
||||
// Usage example: `removed, err := storeInstance.PurgeExpired()`
|
||||
func (storeInstance *Store) PurgeExpired() (int64, error) {
|
||||
result, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ?",
|
||||
deleteResult, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ?",
|
||||
time.Now().UnixMilli())
|
||||
if err != nil {
|
||||
return 0, core.E("store.PurgeExpired", "exec", err)
|
||||
}
|
||||
return result.RowsAffected()
|
||||
return deleteResult.RowsAffected()
|
||||
}
|
||||
|
||||
// startPurge launches a background goroutine that purges expired entries at the
|
||||
// store's configured purge interval. It stops when the context is cancelled.
|
||||
func (storeInstance *Store) startPurge(ctx context.Context) {
|
||||
func (storeInstance *Store) startPurge(purgeContext context.Context) {
|
||||
storeInstance.purgeWaitGroup.Go(func() {
|
||||
ticker := time.NewTicker(storeInstance.purgeInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-purgeContext.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if _, err := storeInstance.PurgeExpired(); err != nil {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue