feat(store): add transaction api
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
cc8bebb8e0
commit
e55a8a8457
5 changed files with 339 additions and 4 deletions
|
|
@ -239,11 +239,18 @@ All SQLite access is serialised through a single connection (`SetMaxOpenConns(1)
|
|||
|
||||
All operations are safe to call from multiple goroutines concurrently. The race detector is clean under the project's standard test suite (`go test -race ./...`).
|
||||
|
||||
## Transaction API
|
||||
|
||||
`Store.Transaction(func(tx *StoreTx) error)` opens a SQLite transaction and hands a `StoreTx` helper to the callback. The helper exposes transaction-scoped write methods such as `Set`, `SetWithTTL`, `Delete`, `DeleteGroup`, and `DeletePrefix`. If the callback returns an error, the transaction rolls back. If the callback succeeds, the transaction commits and the staged events are published after commit.
|
||||
|
||||
This API is the supported way to perform atomic multi-group operations without exposing raw `Begin`/`Commit` control to callers.
|
||||
|
||||
## File Layout
|
||||
|
||||
```
|
||||
doc.go Package comment with concrete usage examples
|
||||
store.go Core Store type, CRUD, prefix cleanup, TTL, background purge, iterators, rendering
|
||||
transaction.go Store.Transaction and transaction-scoped mutation helpers
|
||||
events.go EventType, Event, Watch, Unwatch, OnChange, notify
|
||||
scope.go ScopedStore, QuotaConfig, namespace-local helper delegation, quota enforcement
|
||||
journal.go Journal persistence, Flux-like querying, JSON row inflation
|
||||
|
|
|
|||
|
|
@ -204,8 +204,6 @@ These tests exercise correct defensive code. They must continue to pass but are
|
|||
|
||||
**`GetAll` memory usage.** Fetching a group with 10,000 keys allocates approximately 2.3 MB per call. Use `GetPage()` when you need offset/limit pagination over a large group. Applications with very large groups should still prefer smaller groups or selective queries.
|
||||
|
||||
**No cross-group transactions.** There is no API for atomic multi-group operations. Each method is individually atomic at the SQLite level, but there is no `Begin`/`Commit` exposed to callers.
|
||||
|
||||
**No persistence of watcher registrations.** Watchers and callbacks are in-memory only. They are not persisted across `Close`/`New` cycles.
|
||||
|
||||
---
|
||||
|
|
@ -215,4 +213,3 @@ These tests exercise correct defensive code. They must continue to pass but are
|
|||
These are design notes, not committed work:
|
||||
|
||||
- **Indexed prefix keys.** An additional index on `(group_name, entry_key)` prefix would accelerate prefix scans without a full-table scan.
|
||||
- **Cross-group atomic operations.** Exposing a `Transaction(func(tx *StoreTx) error)` API would allow callers to compose atomic multi-group operations.
|
||||
|
|
|
|||
|
|
@ -117,7 +117,8 @@ The entire package lives in a single Go package (`package store`) with the follo
|
|||
| File | Purpose |
|
||||
|------|---------|
|
||||
| `doc.go` | Package comment with concrete usage examples |
|
||||
| `store.go` | Core `Store` type, CRUD operations (`Get`, `Set`, `SetWithTTL`, `Delete`, `DeleteGroup`, `DeletePrefix`), bulk queries (`GetAll`, `GetPage`, `All`, `Count`, `CountAll`, `Groups`, `GroupsSeq`), string splitting helpers (`GetSplit`, `GetFields`), template rendering (`Render`), TTL expiry, background purge goroutine |
|
||||
| `store.go` | Core `Store` type, CRUD operations (`Get`, `Set`, `SetWithTTL`, `Delete`, `DeleteGroup`, `DeletePrefix`), bulk queries (`GetAll`, `GetPage`, `All`, `Count`, `CountAll`, `Groups`, `GroupsSeq`), string splitting helpers (`GetSplit`, `GetFields`), template rendering (`Render`), TTL expiry, background purge goroutine, transaction support |
|
||||
| `transaction.go` | `Store.Transaction`, transaction-scoped write helpers, staged event dispatch |
|
||||
| `events.go` | `EventType` constants, `Event` struct, `Watch`/`Unwatch` channel subscriptions, `OnChange` callback registration, internal `notify` dispatch |
|
||||
| `scope.go` | `ScopedStore` wrapper for namespace isolation, `QuotaConfig` struct, `NewScoped`/`NewScopedWithQuota` constructors, namespace-local helper delegation, quota enforcement logic |
|
||||
| `journal.go` | Journal persistence, Flux-like querying, JSON row inflation, journal schema helpers |
|
||||
|
|
|
|||
247
transaction.go
Normal file
247
transaction.go
Normal file
|
|
@ -0,0 +1,247 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
// Usage example: `err := storeInstance.Transaction(func(tx *store.StoreTx) error { return tx.Set("config", "colour", "blue") })`
|
||||
type StoreTx struct {
|
||||
store *Store
|
||||
transaction *sql.Tx
|
||||
pendingEvents []Event
|
||||
}
|
||||
|
||||
// Usage example: `err := storeInstance.Transaction(func(tx *store.StoreTx) error { if err := tx.Set("tenant-a:config", "colour", "blue"); err != nil { return err }; return tx.Set("tenant-b:config", "language", "en-GB") })`
|
||||
func (storeInstance *Store) Transaction(operation func(*StoreTx) error) error {
|
||||
if err := storeInstance.ensureReady("store.Transaction"); err != nil {
|
||||
return err
|
||||
}
|
||||
if operation == nil {
|
||||
return core.E("store.Transaction", "operation is nil", nil)
|
||||
}
|
||||
|
||||
transaction, err := storeInstance.database.Begin()
|
||||
if err != nil {
|
||||
return core.E("store.Transaction", "begin transaction", err)
|
||||
}
|
||||
|
||||
storeTransaction := &StoreTx{
|
||||
store: storeInstance,
|
||||
transaction: transaction,
|
||||
}
|
||||
|
||||
committed := false
|
||||
defer func() {
|
||||
if !committed {
|
||||
_ = transaction.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
if err := operation(storeTransaction); err != nil {
|
||||
return core.E("store.Transaction", "execute transaction", err)
|
||||
}
|
||||
if err := transaction.Commit(); err != nil {
|
||||
return core.E("store.Transaction", "commit transaction", err)
|
||||
}
|
||||
committed = true
|
||||
|
||||
for _, event := range storeTransaction.pendingEvents {
|
||||
storeInstance.notify(event)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (storeTransaction *StoreTx) ensureReady(operation string) error {
|
||||
if storeTransaction == nil {
|
||||
return core.E(operation, "transaction is nil", nil)
|
||||
}
|
||||
if storeTransaction.store == nil {
|
||||
return core.E(operation, "transaction store is nil", nil)
|
||||
}
|
||||
if storeTransaction.transaction == nil {
|
||||
return core.E(operation, "transaction database is nil", nil)
|
||||
}
|
||||
if err := storeTransaction.store.ensureReady(operation); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (storeTransaction *StoreTx) recordEvent(event Event) {
|
||||
if storeTransaction == nil {
|
||||
return
|
||||
}
|
||||
storeTransaction.pendingEvents = append(storeTransaction.pendingEvents, event)
|
||||
}
|
||||
|
||||
// Usage example: `value, err := tx.Get("config", "colour")`
|
||||
func (storeTransaction *StoreTx) Get(group, key string) (string, error) {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.Get"); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var value string
|
||||
var expiresAt sql.NullInt64
|
||||
err := storeTransaction.transaction.QueryRow(
|
||||
"SELECT "+entryValueColumn+", expires_at FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND "+entryKeyColumn+" = ?",
|
||||
group, key,
|
||||
).Scan(&value, &expiresAt)
|
||||
if err == sql.ErrNoRows {
|
||||
return "", core.E("store.Transaction.Get", core.Concat(group, "/", key), NotFoundError)
|
||||
}
|
||||
if err != nil {
|
||||
return "", core.E("store.Transaction.Get", "query row", err)
|
||||
}
|
||||
if expiresAt.Valid && expiresAt.Int64 <= time.Now().UnixMilli() {
|
||||
if err := storeTransaction.Delete(group, key); err != nil {
|
||||
return "", core.E("store.Transaction.Get", "delete expired row", err)
|
||||
}
|
||||
return "", core.E("store.Transaction.Get", core.Concat(group, "/", key), NotFoundError)
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
||||
// Usage example: `if err := tx.Set("config", "colour", "blue"); err != nil { return err }`
|
||||
func (storeTransaction *StoreTx) Set(group, key, value string) error {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.Set"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err := storeTransaction.transaction.Exec(
|
||||
"INSERT INTO "+entriesTableName+" ("+entryGroupColumn+", "+entryKeyColumn+", "+entryValueColumn+", expires_at) VALUES (?, ?, ?, NULL) "+
|
||||
"ON CONFLICT("+entryGroupColumn+", "+entryKeyColumn+") DO UPDATE SET "+entryValueColumn+" = excluded."+entryValueColumn+", expires_at = NULL",
|
||||
group, key, value,
|
||||
)
|
||||
if err != nil {
|
||||
return core.E("store.Transaction.Set", "execute upsert", err)
|
||||
}
|
||||
storeTransaction.recordEvent(Event{Type: EventSet, Group: group, Key: key, Value: value, Timestamp: time.Now()})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Usage example: `if err := tx.SetWithTTL("session", "token", "abc123", time.Minute); err != nil { return err }`
|
||||
func (storeTransaction *StoreTx) SetWithTTL(group, key, value string, timeToLive time.Duration) error {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.SetWithTTL"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
expiresAt := time.Now().Add(timeToLive).UnixMilli()
|
||||
_, err := storeTransaction.transaction.Exec(
|
||||
"INSERT INTO "+entriesTableName+" ("+entryGroupColumn+", "+entryKeyColumn+", "+entryValueColumn+", expires_at) VALUES (?, ?, ?, ?) "+
|
||||
"ON CONFLICT("+entryGroupColumn+", "+entryKeyColumn+") DO UPDATE SET "+entryValueColumn+" = excluded."+entryValueColumn+", expires_at = excluded.expires_at",
|
||||
group, key, value, expiresAt,
|
||||
)
|
||||
if err != nil {
|
||||
return core.E("store.Transaction.SetWithTTL", "execute upsert with expiry", err)
|
||||
}
|
||||
storeTransaction.recordEvent(Event{Type: EventSet, Group: group, Key: key, Value: value, Timestamp: time.Now()})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Usage example: `if err := tx.Delete("config", "colour"); err != nil { return err }`
|
||||
func (storeTransaction *StoreTx) Delete(group, key string) error {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.Delete"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
deleteResult, err := storeTransaction.transaction.Exec(
|
||||
"DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND "+entryKeyColumn+" = ?",
|
||||
group, key,
|
||||
)
|
||||
if err != nil {
|
||||
return core.E("store.Transaction.Delete", "delete row", err)
|
||||
}
|
||||
deletedRows, rowsAffectedError := deleteResult.RowsAffected()
|
||||
if rowsAffectedError != nil {
|
||||
return core.E("store.Transaction.Delete", "count deleted rows", rowsAffectedError)
|
||||
}
|
||||
if deletedRows > 0 {
|
||||
storeTransaction.recordEvent(Event{Type: EventDelete, Group: group, Key: key, Timestamp: time.Now()})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Usage example: `if err := tx.DeleteGroup("cache"); err != nil { return err }`
|
||||
func (storeTransaction *StoreTx) DeleteGroup(group string) error {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.DeleteGroup"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
deleteResult, err := storeTransaction.transaction.Exec(
|
||||
"DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ?",
|
||||
group,
|
||||
)
|
||||
if err != nil {
|
||||
return core.E("store.Transaction.DeleteGroup", "delete group", err)
|
||||
}
|
||||
deletedRows, rowsAffectedError := deleteResult.RowsAffected()
|
||||
if rowsAffectedError != nil {
|
||||
return core.E("store.Transaction.DeleteGroup", "count deleted rows", rowsAffectedError)
|
||||
}
|
||||
if deletedRows > 0 {
|
||||
storeTransaction.recordEvent(Event{Type: EventDeleteGroup, Group: group, Timestamp: time.Now()})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Usage example: `if err := tx.DeletePrefix("tenant-a:"); err != nil { return err }`
|
||||
func (storeTransaction *StoreTx) DeletePrefix(groupPrefix string) error {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.DeletePrefix"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
if groupPrefix == "" {
|
||||
rows, err = storeTransaction.transaction.Query(
|
||||
"SELECT DISTINCT " + entryGroupColumn + " FROM " + entriesTableName + " ORDER BY " + entryGroupColumn,
|
||||
)
|
||||
} else {
|
||||
rows, err = storeTransaction.transaction.Query(
|
||||
"SELECT DISTINCT "+entryGroupColumn+" FROM "+entriesTableName+" WHERE "+entryGroupColumn+" LIKE ? ESCAPE '^' ORDER BY "+entryGroupColumn,
|
||||
escapeLike(groupPrefix)+"%",
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
return core.E("store.Transaction.DeletePrefix", "list groups", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var groupNames []string
|
||||
for rows.Next() {
|
||||
var groupName string
|
||||
if err := rows.Scan(&groupName); err != nil {
|
||||
return core.E("store.Transaction.DeletePrefix", "scan group name", err)
|
||||
}
|
||||
groupNames = append(groupNames, groupName)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return core.E("store.Transaction.DeletePrefix", "iterate groups", err)
|
||||
}
|
||||
for _, groupName := range groupNames {
|
||||
if err := storeTransaction.DeleteGroup(groupName); err != nil {
|
||||
return core.E("store.Transaction.DeletePrefix", "delete group", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Usage example: `keyCount, err := tx.Count("config")`
|
||||
func (storeTransaction *StoreTx) Count(group string) (int, error) {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.Count"); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var count int
|
||||
err := storeTransaction.transaction.QueryRow(
|
||||
"SELECT COUNT(*) FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND (expires_at IS NULL OR expires_at > ?)",
|
||||
group, time.Now().UnixMilli(),
|
||||
).Scan(&count)
|
||||
if err != nil {
|
||||
return 0, core.E("store.Transaction.Count", "count rows", err)
|
||||
}
|
||||
return count, nil
|
||||
}
|
||||
83
transaction_test.go
Normal file
83
transaction_test.go
Normal file
|
|
@ -0,0 +1,83 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTransaction_Transaction_Good_CommitsMultipleWrites(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
events := storeInstance.Watch("*")
|
||||
defer storeInstance.Unwatch("*", events)
|
||||
|
||||
err := storeInstance.Transaction(func(transaction *StoreTx) error {
|
||||
if err := transaction.Set("alpha", "first", "1"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := transaction.Set("beta", "second", "2"); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
firstValue, err := storeInstance.Get("alpha", "first")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "1", firstValue)
|
||||
|
||||
secondValue, err := storeInstance.Get("beta", "second")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "2", secondValue)
|
||||
|
||||
received := drainEvents(events, 2, time.Second)
|
||||
require.Len(t, received, 2)
|
||||
assert.Equal(t, EventSet, received[0].Type)
|
||||
assert.Equal(t, "alpha", received[0].Group)
|
||||
assert.Equal(t, "first", received[0].Key)
|
||||
assert.Equal(t, EventSet, received[1].Type)
|
||||
assert.Equal(t, "beta", received[1].Group)
|
||||
assert.Equal(t, "second", received[1].Key)
|
||||
}
|
||||
|
||||
func TestTransaction_Transaction_Good_RollbackOnError(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
err := storeInstance.Transaction(func(transaction *StoreTx) error {
|
||||
if err := transaction.Set("alpha", "first", "1"); err != nil {
|
||||
return err
|
||||
}
|
||||
return core.E("test", "force rollback", nil)
|
||||
})
|
||||
require.Error(t, err)
|
||||
|
||||
_, err = storeInstance.Get("alpha", "first")
|
||||
assert.ErrorIs(t, err, NotFoundError)
|
||||
}
|
||||
|
||||
func TestTransaction_Transaction_Good_DeletesAtomically(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.NoError(t, storeInstance.Set("alpha", "first", "1"))
|
||||
require.NoError(t, storeInstance.Set("beta", "second", "2"))
|
||||
|
||||
err := storeInstance.Transaction(func(transaction *StoreTx) error {
|
||||
if err := transaction.DeletePrefix(""); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = storeInstance.Get("alpha", "first")
|
||||
assert.ErrorIs(t, err, NotFoundError)
|
||||
_, err = storeInstance.Get("beta", "second")
|
||||
assert.ErrorIs(t, err, NotFoundError)
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue