From a3f49539f48832f672e78d97414453a921f1e283 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 09:09:12 +0000 Subject: [PATCH] feat(store): add transaction read helpers Co-Authored-By: Virgil --- docs/architecture.md | 2 +- transaction.go | 235 +++++++++++++++++++++++++++++++++++++++++++ transaction_test.go | 55 ++++++++++ 3 files changed, 291 insertions(+), 1 deletion(-) diff --git a/docs/architecture.md b/docs/architecture.md index 9a06d59..51c9d81 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -241,7 +241,7 @@ All operations are safe to call from multiple goroutines concurrently. The race ## Transaction API -`Store.Transaction(func(transaction *StoreTransaction) error)` opens a SQLite transaction and hands a `StoreTransaction` helper to the callback. The helper exposes transaction-scoped write methods such as `Set`, `SetWithTTL`, `Delete`, `DeleteGroup`, and `DeletePrefix`. If the callback returns an error, the transaction rolls back. If the callback succeeds, the transaction commits and the staged events are published after commit. +`Store.Transaction(func(transaction *StoreTransaction) error)` opens a SQLite transaction and hands a `StoreTransaction` helper to the callback. The helper exposes transaction-scoped write methods such as `Set`, `SetWithTTL`, `Delete`, `DeleteGroup`, and `DeletePrefix`, plus read helpers such as `Get`, `GetAll`, `All`, `Count`, `CountAll`, `Groups`, `GroupsSeq`, `Render`, `GetSplit`, and `GetFields` so callers can inspect uncommitted writes before commit. If the callback returns an error, the transaction rolls back. If the callback succeeds, the transaction commits and the staged events are published after commit. This API is the supported way to perform atomic multi-group operations without exposing raw `Begin`/`Commit` control to callers. diff --git a/transaction.go b/transaction.go index a029689..71aff6e 100644 --- a/transaction.go +++ b/transaction.go @@ -2,6 +2,8 @@ package store import ( "database/sql" + "iter" + "text/template" "time" core "dappco.re/go/core" @@ -245,3 +247,236 @@ func (storeTransaction *StoreTransaction) Count(group string) (int, error) { } return count, nil } + +// Usage example: `colourEntries, err := tx.GetAll("config")` +func (storeTransaction *StoreTransaction) GetAll(group string) (map[string]string, error) { + if err := storeTransaction.ensureReady("store.Transaction.GetAll"); err != nil { + return nil, err + } + + entriesByKey := make(map[string]string) + for entry, err := range storeTransaction.All(group) { + if err != nil { + return nil, core.E("store.Transaction.GetAll", "iterate rows", err) + } + entriesByKey[entry.Key] = entry.Value + } + return entriesByKey, nil +} + +// Usage example: `page, err := tx.GetPage("config", 0, 25); if err != nil { return }; for _, entry := range page { fmt.Println(entry.Key, entry.Value) }` +func (storeTransaction *StoreTransaction) GetPage(group string, offset, limit int) ([]KeyValue, error) { + if err := storeTransaction.ensureReady("store.Transaction.GetPage"); err != nil { + return nil, err + } + if offset < 0 { + return nil, core.E("store.Transaction.GetPage", "offset must be zero or positive", nil) + } + if limit < 0 { + return nil, core.E("store.Transaction.GetPage", "limit must be zero or positive", nil) + } + + rows, err := storeTransaction.transaction.Query( + "SELECT "+entryKeyColumn+", "+entryValueColumn+" FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryKeyColumn+" LIMIT ? OFFSET ?", + group, time.Now().UnixMilli(), limit, offset, + ) + if err != nil { + return nil, core.E("store.Transaction.GetPage", "query rows", err) + } + defer rows.Close() + + page := make([]KeyValue, 0, limit) + for rows.Next() { + var entry KeyValue + if err := rows.Scan(&entry.Key, &entry.Value); err != nil { + return nil, core.E("store.Transaction.GetPage", "scan row", err) + } + page = append(page, entry) + } + if err := rows.Err(); err != nil { + return nil, core.E("store.Transaction.GetPage", "rows iteration", err) + } + return page, nil +} + +// Usage example: `for entry, err := range tx.All("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }` +func (storeTransaction *StoreTransaction) All(group string) iter.Seq2[KeyValue, error] { + return storeTransaction.AllSeq(group) +} + +// Usage example: `for entry, err := range tx.AllSeq("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }` +func (storeTransaction *StoreTransaction) AllSeq(group string) iter.Seq2[KeyValue, error] { + return func(yield func(KeyValue, error) bool) { + if err := storeTransaction.ensureReady("store.Transaction.All"); err != nil { + yield(KeyValue{}, err) + return + } + + rows, err := storeTransaction.transaction.Query( + "SELECT "+entryKeyColumn+", "+entryValueColumn+" FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryKeyColumn, + group, time.Now().UnixMilli(), + ) + if err != nil { + yield(KeyValue{}, core.E("store.Transaction.All", "query rows", err)) + return + } + defer rows.Close() + + for rows.Next() { + var entry KeyValue + if err := rows.Scan(&entry.Key, &entry.Value); err != nil { + if !yield(KeyValue{}, core.E("store.Transaction.All", "scan row", err)) { + return + } + continue + } + if !yield(entry, nil) { + return + } + } + if err := rows.Err(); err != nil { + yield(KeyValue{}, core.E("store.Transaction.All", "rows iteration", err)) + } + } +} + +// Usage example: `removedRows, err := tx.CountAll("tenant-a:")` +func (storeTransaction *StoreTransaction) CountAll(groupPrefix string) (int, error) { + if err := storeTransaction.ensureReady("store.Transaction.CountAll"); err != nil { + return 0, err + } + + var count int + var err error + if groupPrefix == "" { + err = storeTransaction.transaction.QueryRow( + "SELECT COUNT(*) FROM "+entriesTableName+" WHERE (expires_at IS NULL OR expires_at > ?)", + time.Now().UnixMilli(), + ).Scan(&count) + } else { + err = storeTransaction.transaction.QueryRow( + "SELECT COUNT(*) FROM "+entriesTableName+" WHERE "+entryGroupColumn+" LIKE ? ESCAPE '^' AND (expires_at IS NULL OR expires_at > ?)", + escapeLike(groupPrefix)+"%", time.Now().UnixMilli(), + ).Scan(&count) + } + if err != nil { + return 0, core.E("store.Transaction.CountAll", "count rows", err) + } + return count, nil +} + +// Usage example: `groupNames, err := tx.Groups("tenant-a:")` +// Usage example: `groupNames, err := tx.Groups()` +func (storeTransaction *StoreTransaction) Groups(groupPrefix ...string) ([]string, error) { + if err := storeTransaction.ensureReady("store.Transaction.Groups"); err != nil { + return nil, err + } + + var groupNames []string + for groupName, err := range storeTransaction.GroupsSeq(groupPrefix...) { + if err != nil { + return nil, err + } + groupNames = append(groupNames, groupName) + } + return groupNames, nil +} + +// Usage example: `for groupName, err := range tx.GroupsSeq("tenant-a:") { if err != nil { break }; fmt.Println(groupName) }` +// Usage example: `for groupName, err := range tx.GroupsSeq() { if err != nil { break }; fmt.Println(groupName) }` +func (storeTransaction *StoreTransaction) GroupsSeq(groupPrefix ...string) iter.Seq2[string, error] { + actualGroupPrefix := firstOrEmptyString(groupPrefix) + return func(yield func(string, error) bool) { + if err := storeTransaction.ensureReady("store.Transaction.GroupsSeq"); err != nil { + yield("", err) + return + } + + var rows *sql.Rows + var err error + now := time.Now().UnixMilli() + if actualGroupPrefix == "" { + rows, err = storeTransaction.transaction.Query( + "SELECT DISTINCT "+entryGroupColumn+" FROM "+entriesTableName+" WHERE (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryGroupColumn, + now, + ) + } else { + rows, err = storeTransaction.transaction.Query( + "SELECT DISTINCT "+entryGroupColumn+" FROM "+entriesTableName+" WHERE "+entryGroupColumn+" LIKE ? ESCAPE '^' AND (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryGroupColumn, + escapeLike(actualGroupPrefix)+"%", now, + ) + } + if err != nil { + yield("", core.E("store.Transaction.GroupsSeq", "query group names", err)) + return + } + defer rows.Close() + + for rows.Next() { + var groupName string + if err := rows.Scan(&groupName); err != nil { + if !yield("", core.E("store.Transaction.GroupsSeq", "scan group name", err)) { + return + } + continue + } + if !yield(groupName, nil) { + return + } + } + if err := rows.Err(); err != nil { + yield("", core.E("store.Transaction.GroupsSeq", "rows iteration", err)) + } + } +} + +// Usage example: `renderedTemplate, err := tx.Render("Hello {{ .name }}", "user")` +func (storeTransaction *StoreTransaction) Render(templateSource, group string) (string, error) { + if err := storeTransaction.ensureReady("store.Transaction.Render"); err != nil { + return "", err + } + + templateData := make(map[string]string) + for entry, err := range storeTransaction.All(group) { + if err != nil { + return "", core.E("store.Transaction.Render", "iterate rows", err) + } + templateData[entry.Key] = entry.Value + } + + renderTemplate, err := template.New("render").Parse(templateSource) + if err != nil { + return "", core.E("store.Transaction.Render", "parse template", err) + } + builder := core.NewBuilder() + if err := renderTemplate.Execute(builder, templateData); err != nil { + return "", core.E("store.Transaction.Render", "execute template", err) + } + return builder.String(), nil +} + +// Usage example: `parts, err := tx.GetSplit("config", "hosts", ","); if err != nil { return }; for part := range parts { fmt.Println(part) }` +func (storeTransaction *StoreTransaction) GetSplit(group, key, separator string) (iter.Seq[string], error) { + if err := storeTransaction.ensureReady("store.Transaction.GetSplit"); err != nil { + return nil, err + } + + value, err := storeTransaction.Get(group, key) + if err != nil { + return nil, err + } + return splitValueSeq(value, separator), nil +} + +// Usage example: `fields, err := tx.GetFields("config", "flags"); if err != nil { return }; for field := range fields { fmt.Println(field) }` +func (storeTransaction *StoreTransaction) GetFields(group, key string) (iter.Seq[string], error) { + if err := storeTransaction.ensureReady("store.Transaction.GetFields"); err != nil { + return nil, err + } + + value, err := storeTransaction.Get(group, key) + if err != nil { + return nil, err + } + return fieldsValueSeq(value), nil +} diff --git a/transaction_test.go b/transaction_test.go index 70f981d..73411d3 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -1,6 +1,7 @@ package store import ( + "iter" "testing" "time" @@ -81,3 +82,57 @@ func TestTransaction_Transaction_Good_DeletesAtomically(t *testing.T) { _, err = storeInstance.Get("beta", "second") assert.ErrorIs(t, err, NotFoundError) } + +func TestTransaction_Transaction_Good_ReadHelpersSeePendingWrites(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + err := storeInstance.Transaction(func(transaction *StoreTransaction) error { + if err := transaction.Set("config", "colour", "blue"); err != nil { + return err + } + if err := transaction.Set("config", "hosts", "alpha beta"); err != nil { + return err + } + if err := transaction.Set("audit", "enabled", "true"); err != nil { + return err + } + + entriesByKey, err := transaction.GetAll("config") + require.NoError(t, err) + assert.Equal(t, map[string]string{"colour": "blue", "hosts": "alpha beta"}, entriesByKey) + + count, err := transaction.CountAll("") + require.NoError(t, err) + assert.Equal(t, 3, count) + + groupNames, err := transaction.Groups() + require.NoError(t, err) + assert.Equal(t, []string{"audit", "config"}, groupNames) + + renderedTemplate, err := transaction.Render("{{ .colour }} / {{ .hosts }}", "config") + require.NoError(t, err) + assert.Equal(t, "blue / alpha beta", renderedTemplate) + + splitParts, err := transaction.GetSplit("config", "hosts", " ") + require.NoError(t, err) + assert.Equal(t, []string{"alpha", "beta"}, collectSeq(t, splitParts)) + + fieldParts, err := transaction.GetFields("config", "hosts") + require.NoError(t, err) + assert.Equal(t, []string{"alpha", "beta"}, collectSeq(t, fieldParts)) + + return nil + }) + require.NoError(t, err) +} + +func collectSeq[T any](t *testing.T, sequence iter.Seq[T]) []T { + t.Helper() + + values := make([]T, 0) + for value := range sequence { + values = append(values, value) + } + return values +} -- 2.45.3