From e55a8a8457bf0d2d7b78ece7ee9f7fee20d5295d Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 08:46:59 +0000 Subject: [PATCH] feat(store): add transaction api Co-Authored-By: Virgil --- docs/architecture.md | 7 ++ docs/history.md | 3 - docs/index.md | 3 +- transaction.go | 247 +++++++++++++++++++++++++++++++++++++++++++ transaction_test.go | 83 +++++++++++++++ 5 files changed, 339 insertions(+), 4 deletions(-) create mode 100644 transaction.go create mode 100644 transaction_test.go diff --git a/docs/architecture.md b/docs/architecture.md index f495f3c..6d67f65 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -239,11 +239,18 @@ All SQLite access is serialised through a single connection (`SetMaxOpenConns(1) All operations are safe to call from multiple goroutines concurrently. The race detector is clean under the project's standard test suite (`go test -race ./...`). +## Transaction API + +`Store.Transaction(func(tx *StoreTx) error)` opens a SQLite transaction and hands a `StoreTx` helper to the callback. The helper exposes transaction-scoped write methods such as `Set`, `SetWithTTL`, `Delete`, `DeleteGroup`, and `DeletePrefix`. If the callback returns an error, the transaction rolls back. If the callback succeeds, the transaction commits and the staged events are published after commit. + +This API is the supported way to perform atomic multi-group operations without exposing raw `Begin`/`Commit` control to callers. + ## File Layout ``` doc.go Package comment with concrete usage examples store.go Core Store type, CRUD, prefix cleanup, TTL, background purge, iterators, rendering +transaction.go Store.Transaction and transaction-scoped mutation helpers events.go EventType, Event, Watch, Unwatch, OnChange, notify scope.go ScopedStore, QuotaConfig, namespace-local helper delegation, quota enforcement journal.go Journal persistence, Flux-like querying, JSON row inflation diff --git a/docs/history.md b/docs/history.md index 52963b3..f35992e 100644 --- a/docs/history.md +++ b/docs/history.md @@ -204,8 +204,6 @@ These tests exercise correct defensive code. They must continue to pass but are **`GetAll` memory usage.** Fetching a group with 10,000 keys allocates approximately 2.3 MB per call. Use `GetPage()` when you need offset/limit pagination over a large group. Applications with very large groups should still prefer smaller groups or selective queries. -**No cross-group transactions.** There is no API for atomic multi-group operations. Each method is individually atomic at the SQLite level, but there is no `Begin`/`Commit` exposed to callers. - **No persistence of watcher registrations.** Watchers and callbacks are in-memory only. They are not persisted across `Close`/`New` cycles. --- @@ -215,4 +213,3 @@ These tests exercise correct defensive code. They must continue to pass but are These are design notes, not committed work: - **Indexed prefix keys.** An additional index on `(group_name, entry_key)` prefix would accelerate prefix scans without a full-table scan. -- **Cross-group atomic operations.** Exposing a `Transaction(func(tx *StoreTx) error)` API would allow callers to compose atomic multi-group operations. diff --git a/docs/index.md b/docs/index.md index 6dbc6b3..94d738c 100644 --- a/docs/index.md +++ b/docs/index.md @@ -117,7 +117,8 @@ The entire package lives in a single Go package (`package store`) with the follo | File | Purpose | |------|---------| | `doc.go` | Package comment with concrete usage examples | -| `store.go` | Core `Store` type, CRUD operations (`Get`, `Set`, `SetWithTTL`, `Delete`, `DeleteGroup`, `DeletePrefix`), bulk queries (`GetAll`, `GetPage`, `All`, `Count`, `CountAll`, `Groups`, `GroupsSeq`), string splitting helpers (`GetSplit`, `GetFields`), template rendering (`Render`), TTL expiry, background purge goroutine | +| `store.go` | Core `Store` type, CRUD operations (`Get`, `Set`, `SetWithTTL`, `Delete`, `DeleteGroup`, `DeletePrefix`), bulk queries (`GetAll`, `GetPage`, `All`, `Count`, `CountAll`, `Groups`, `GroupsSeq`), string splitting helpers (`GetSplit`, `GetFields`), template rendering (`Render`), TTL expiry, background purge goroutine, transaction support | +| `transaction.go` | `Store.Transaction`, transaction-scoped write helpers, staged event dispatch | | `events.go` | `EventType` constants, `Event` struct, `Watch`/`Unwatch` channel subscriptions, `OnChange` callback registration, internal `notify` dispatch | | `scope.go` | `ScopedStore` wrapper for namespace isolation, `QuotaConfig` struct, `NewScoped`/`NewScopedWithQuota` constructors, namespace-local helper delegation, quota enforcement logic | | `journal.go` | Journal persistence, Flux-like querying, JSON row inflation, journal schema helpers | diff --git a/transaction.go b/transaction.go new file mode 100644 index 0000000..fa77d44 --- /dev/null +++ b/transaction.go @@ -0,0 +1,247 @@ +package store + +import ( + "database/sql" + "time" + + core "dappco.re/go/core" +) + +// Usage example: `err := storeInstance.Transaction(func(tx *store.StoreTx) error { return tx.Set("config", "colour", "blue") })` +type StoreTx struct { + store *Store + transaction *sql.Tx + pendingEvents []Event +} + +// Usage example: `err := storeInstance.Transaction(func(tx *store.StoreTx) error { if err := tx.Set("tenant-a:config", "colour", "blue"); err != nil { return err }; return tx.Set("tenant-b:config", "language", "en-GB") })` +func (storeInstance *Store) Transaction(operation func(*StoreTx) error) error { + if err := storeInstance.ensureReady("store.Transaction"); err != nil { + return err + } + if operation == nil { + return core.E("store.Transaction", "operation is nil", nil) + } + + transaction, err := storeInstance.database.Begin() + if err != nil { + return core.E("store.Transaction", "begin transaction", err) + } + + storeTransaction := &StoreTx{ + store: storeInstance, + transaction: transaction, + } + + committed := false + defer func() { + if !committed { + _ = transaction.Rollback() + } + }() + + if err := operation(storeTransaction); err != nil { + return core.E("store.Transaction", "execute transaction", err) + } + if err := transaction.Commit(); err != nil { + return core.E("store.Transaction", "commit transaction", err) + } + committed = true + + for _, event := range storeTransaction.pendingEvents { + storeInstance.notify(event) + } + return nil +} + +func (storeTransaction *StoreTx) ensureReady(operation string) error { + if storeTransaction == nil { + return core.E(operation, "transaction is nil", nil) + } + if storeTransaction.store == nil { + return core.E(operation, "transaction store is nil", nil) + } + if storeTransaction.transaction == nil { + return core.E(operation, "transaction database is nil", nil) + } + if err := storeTransaction.store.ensureReady(operation); err != nil { + return err + } + return nil +} + +func (storeTransaction *StoreTx) recordEvent(event Event) { + if storeTransaction == nil { + return + } + storeTransaction.pendingEvents = append(storeTransaction.pendingEvents, event) +} + +// Usage example: `value, err := tx.Get("config", "colour")` +func (storeTransaction *StoreTx) Get(group, key string) (string, error) { + if err := storeTransaction.ensureReady("store.Transaction.Get"); err != nil { + return "", err + } + + var value string + var expiresAt sql.NullInt64 + err := storeTransaction.transaction.QueryRow( + "SELECT "+entryValueColumn+", expires_at FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND "+entryKeyColumn+" = ?", + group, key, + ).Scan(&value, &expiresAt) + if err == sql.ErrNoRows { + return "", core.E("store.Transaction.Get", core.Concat(group, "/", key), NotFoundError) + } + if err != nil { + return "", core.E("store.Transaction.Get", "query row", err) + } + if expiresAt.Valid && expiresAt.Int64 <= time.Now().UnixMilli() { + if err := storeTransaction.Delete(group, key); err != nil { + return "", core.E("store.Transaction.Get", "delete expired row", err) + } + return "", core.E("store.Transaction.Get", core.Concat(group, "/", key), NotFoundError) + } + return value, nil +} + +// Usage example: `if err := tx.Set("config", "colour", "blue"); err != nil { return err }` +func (storeTransaction *StoreTx) Set(group, key, value string) error { + if err := storeTransaction.ensureReady("store.Transaction.Set"); err != nil { + return err + } + + _, err := storeTransaction.transaction.Exec( + "INSERT INTO "+entriesTableName+" ("+entryGroupColumn+", "+entryKeyColumn+", "+entryValueColumn+", expires_at) VALUES (?, ?, ?, NULL) "+ + "ON CONFLICT("+entryGroupColumn+", "+entryKeyColumn+") DO UPDATE SET "+entryValueColumn+" = excluded."+entryValueColumn+", expires_at = NULL", + group, key, value, + ) + if err != nil { + return core.E("store.Transaction.Set", "execute upsert", err) + } + storeTransaction.recordEvent(Event{Type: EventSet, Group: group, Key: key, Value: value, Timestamp: time.Now()}) + return nil +} + +// Usage example: `if err := tx.SetWithTTL("session", "token", "abc123", time.Minute); err != nil { return err }` +func (storeTransaction *StoreTx) SetWithTTL(group, key, value string, timeToLive time.Duration) error { + if err := storeTransaction.ensureReady("store.Transaction.SetWithTTL"); err != nil { + return err + } + + expiresAt := time.Now().Add(timeToLive).UnixMilli() + _, err := storeTransaction.transaction.Exec( + "INSERT INTO "+entriesTableName+" ("+entryGroupColumn+", "+entryKeyColumn+", "+entryValueColumn+", expires_at) VALUES (?, ?, ?, ?) "+ + "ON CONFLICT("+entryGroupColumn+", "+entryKeyColumn+") DO UPDATE SET "+entryValueColumn+" = excluded."+entryValueColumn+", expires_at = excluded.expires_at", + group, key, value, expiresAt, + ) + if err != nil { + return core.E("store.Transaction.SetWithTTL", "execute upsert with expiry", err) + } + storeTransaction.recordEvent(Event{Type: EventSet, Group: group, Key: key, Value: value, Timestamp: time.Now()}) + return nil +} + +// Usage example: `if err := tx.Delete("config", "colour"); err != nil { return err }` +func (storeTransaction *StoreTx) Delete(group, key string) error { + if err := storeTransaction.ensureReady("store.Transaction.Delete"); err != nil { + return err + } + + deleteResult, err := storeTransaction.transaction.Exec( + "DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND "+entryKeyColumn+" = ?", + group, key, + ) + if err != nil { + return core.E("store.Transaction.Delete", "delete row", err) + } + deletedRows, rowsAffectedError := deleteResult.RowsAffected() + if rowsAffectedError != nil { + return core.E("store.Transaction.Delete", "count deleted rows", rowsAffectedError) + } + if deletedRows > 0 { + storeTransaction.recordEvent(Event{Type: EventDelete, Group: group, Key: key, Timestamp: time.Now()}) + } + return nil +} + +// Usage example: `if err := tx.DeleteGroup("cache"); err != nil { return err }` +func (storeTransaction *StoreTx) DeleteGroup(group string) error { + if err := storeTransaction.ensureReady("store.Transaction.DeleteGroup"); err != nil { + return err + } + + deleteResult, err := storeTransaction.transaction.Exec( + "DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ?", + group, + ) + if err != nil { + return core.E("store.Transaction.DeleteGroup", "delete group", err) + } + deletedRows, rowsAffectedError := deleteResult.RowsAffected() + if rowsAffectedError != nil { + return core.E("store.Transaction.DeleteGroup", "count deleted rows", rowsAffectedError) + } + if deletedRows > 0 { + storeTransaction.recordEvent(Event{Type: EventDeleteGroup, Group: group, Timestamp: time.Now()}) + } + return nil +} + +// Usage example: `if err := tx.DeletePrefix("tenant-a:"); err != nil { return err }` +func (storeTransaction *StoreTx) DeletePrefix(groupPrefix string) error { + if err := storeTransaction.ensureReady("store.Transaction.DeletePrefix"); err != nil { + return err + } + + var rows *sql.Rows + var err error + if groupPrefix == "" { + rows, err = storeTransaction.transaction.Query( + "SELECT DISTINCT " + entryGroupColumn + " FROM " + entriesTableName + " ORDER BY " + entryGroupColumn, + ) + } else { + rows, err = storeTransaction.transaction.Query( + "SELECT DISTINCT "+entryGroupColumn+" FROM "+entriesTableName+" WHERE "+entryGroupColumn+" LIKE ? ESCAPE '^' ORDER BY "+entryGroupColumn, + escapeLike(groupPrefix)+"%", + ) + } + if err != nil { + return core.E("store.Transaction.DeletePrefix", "list groups", err) + } + defer rows.Close() + + var groupNames []string + for rows.Next() { + var groupName string + if err := rows.Scan(&groupName); err != nil { + return core.E("store.Transaction.DeletePrefix", "scan group name", err) + } + groupNames = append(groupNames, groupName) + } + if err := rows.Err(); err != nil { + return core.E("store.Transaction.DeletePrefix", "iterate groups", err) + } + for _, groupName := range groupNames { + if err := storeTransaction.DeleteGroup(groupName); err != nil { + return core.E("store.Transaction.DeletePrefix", "delete group", err) + } + } + return nil +} + +// Usage example: `keyCount, err := tx.Count("config")` +func (storeTransaction *StoreTx) Count(group string) (int, error) { + if err := storeTransaction.ensureReady("store.Transaction.Count"); err != nil { + return 0, err + } + + var count int + err := storeTransaction.transaction.QueryRow( + "SELECT COUNT(*) FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND (expires_at IS NULL OR expires_at > ?)", + group, time.Now().UnixMilli(), + ).Scan(&count) + if err != nil { + return 0, core.E("store.Transaction.Count", "count rows", err) + } + return count, nil +} diff --git a/transaction_test.go b/transaction_test.go new file mode 100644 index 0000000..c374351 --- /dev/null +++ b/transaction_test.go @@ -0,0 +1,83 @@ +package store + +import ( + "testing" + "time" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTransaction_Transaction_Good_CommitsMultipleWrites(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + events := storeInstance.Watch("*") + defer storeInstance.Unwatch("*", events) + + err := storeInstance.Transaction(func(transaction *StoreTx) error { + if err := transaction.Set("alpha", "first", "1"); err != nil { + return err + } + if err := transaction.Set("beta", "second", "2"); err != nil { + return err + } + return nil + }) + require.NoError(t, err) + + firstValue, err := storeInstance.Get("alpha", "first") + require.NoError(t, err) + assert.Equal(t, "1", firstValue) + + secondValue, err := storeInstance.Get("beta", "second") + require.NoError(t, err) + assert.Equal(t, "2", secondValue) + + received := drainEvents(events, 2, time.Second) + require.Len(t, received, 2) + assert.Equal(t, EventSet, received[0].Type) + assert.Equal(t, "alpha", received[0].Group) + assert.Equal(t, "first", received[0].Key) + assert.Equal(t, EventSet, received[1].Type) + assert.Equal(t, "beta", received[1].Group) + assert.Equal(t, "second", received[1].Key) +} + +func TestTransaction_Transaction_Good_RollbackOnError(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + err := storeInstance.Transaction(func(transaction *StoreTx) error { + if err := transaction.Set("alpha", "first", "1"); err != nil { + return err + } + return core.E("test", "force rollback", nil) + }) + require.Error(t, err) + + _, err = storeInstance.Get("alpha", "first") + assert.ErrorIs(t, err, NotFoundError) +} + +func TestTransaction_Transaction_Good_DeletesAtomically(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + require.NoError(t, storeInstance.Set("alpha", "first", "1")) + require.NoError(t, storeInstance.Set("beta", "second", "2")) + + err := storeInstance.Transaction(func(transaction *StoreTx) error { + if err := transaction.DeletePrefix(""); err != nil { + return err + } + return nil + }) + require.NoError(t, err) + + _, err = storeInstance.Get("alpha", "first") + assert.ErrorIs(t, err, NotFoundError) + _, err = storeInstance.Get("beta", "second") + assert.ErrorIs(t, err, NotFoundError) +}