[agent/codex:gpt-5.4-mini] Read ~/spec/code/core/go/store/RFC.md fully. Find ONE featur... #108
3 changed files with 291 additions and 1 deletions
|
|
@ -241,7 +241,7 @@ All operations are safe to call from multiple goroutines concurrently. The race
|
|||
|
||||
## Transaction API
|
||||
|
||||
`Store.Transaction(func(transaction *StoreTransaction) error)` opens a SQLite transaction and hands a `StoreTransaction` helper to the callback. The helper exposes transaction-scoped write methods such as `Set`, `SetWithTTL`, `Delete`, `DeleteGroup`, and `DeletePrefix`. If the callback returns an error, the transaction rolls back. If the callback succeeds, the transaction commits and the staged events are published after commit.
|
||||
`Store.Transaction(func(transaction *StoreTransaction) error)` opens a SQLite transaction and hands a `StoreTransaction` helper to the callback. The helper exposes transaction-scoped write methods such as `Set`, `SetWithTTL`, `Delete`, `DeleteGroup`, and `DeletePrefix`, plus read helpers such as `Get`, `GetAll`, `All`, `Count`, `CountAll`, `Groups`, `GroupsSeq`, `Render`, `GetSplit`, and `GetFields` so callers can inspect uncommitted writes before commit. If the callback returns an error, the transaction rolls back. If the callback succeeds, the transaction commits and the staged events are published after commit.
|
||||
|
||||
This API is the supported way to perform atomic multi-group operations without exposing raw `Begin`/`Commit` control to callers.
|
||||
|
||||
|
|
|
|||
235
transaction.go
235
transaction.go
|
|
@ -2,6 +2,8 @@ package store
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"iter"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
|
|
@ -245,3 +247,236 @@ func (storeTransaction *StoreTransaction) Count(group string) (int, error) {
|
|||
}
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// Usage example: `colourEntries, err := tx.GetAll("config")`
|
||||
func (storeTransaction *StoreTransaction) GetAll(group string) (map[string]string, error) {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.GetAll"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
entriesByKey := make(map[string]string)
|
||||
for entry, err := range storeTransaction.All(group) {
|
||||
if err != nil {
|
||||
return nil, core.E("store.Transaction.GetAll", "iterate rows", err)
|
||||
}
|
||||
entriesByKey[entry.Key] = entry.Value
|
||||
}
|
||||
return entriesByKey, nil
|
||||
}
|
||||
|
||||
// Usage example: `page, err := tx.GetPage("config", 0, 25); if err != nil { return }; for _, entry := range page { fmt.Println(entry.Key, entry.Value) }`
|
||||
func (storeTransaction *StoreTransaction) GetPage(group string, offset, limit int) ([]KeyValue, error) {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.GetPage"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if offset < 0 {
|
||||
return nil, core.E("store.Transaction.GetPage", "offset must be zero or positive", nil)
|
||||
}
|
||||
if limit < 0 {
|
||||
return nil, core.E("store.Transaction.GetPage", "limit must be zero or positive", nil)
|
||||
}
|
||||
|
||||
rows, err := storeTransaction.transaction.Query(
|
||||
"SELECT "+entryKeyColumn+", "+entryValueColumn+" FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryKeyColumn+" LIMIT ? OFFSET ?",
|
||||
group, time.Now().UnixMilli(), limit, offset,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, core.E("store.Transaction.GetPage", "query rows", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
page := make([]KeyValue, 0, limit)
|
||||
for rows.Next() {
|
||||
var entry KeyValue
|
||||
if err := rows.Scan(&entry.Key, &entry.Value); err != nil {
|
||||
return nil, core.E("store.Transaction.GetPage", "scan row", err)
|
||||
}
|
||||
page = append(page, entry)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, core.E("store.Transaction.GetPage", "rows iteration", err)
|
||||
}
|
||||
return page, nil
|
||||
}
|
||||
|
||||
// Usage example: `for entry, err := range tx.All("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }`
|
||||
func (storeTransaction *StoreTransaction) All(group string) iter.Seq2[KeyValue, error] {
|
||||
return storeTransaction.AllSeq(group)
|
||||
}
|
||||
|
||||
// Usage example: `for entry, err := range tx.AllSeq("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }`
|
||||
func (storeTransaction *StoreTransaction) AllSeq(group string) iter.Seq2[KeyValue, error] {
|
||||
return func(yield func(KeyValue, error) bool) {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.All"); err != nil {
|
||||
yield(KeyValue{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
rows, err := storeTransaction.transaction.Query(
|
||||
"SELECT "+entryKeyColumn+", "+entryValueColumn+" FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryKeyColumn,
|
||||
group, time.Now().UnixMilli(),
|
||||
)
|
||||
if err != nil {
|
||||
yield(KeyValue{}, core.E("store.Transaction.All", "query rows", err))
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var entry KeyValue
|
||||
if err := rows.Scan(&entry.Key, &entry.Value); err != nil {
|
||||
if !yield(KeyValue{}, core.E("store.Transaction.All", "scan row", err)) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
if !yield(entry, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
yield(KeyValue{}, core.E("store.Transaction.All", "rows iteration", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Usage example: `removedRows, err := tx.CountAll("tenant-a:")`
|
||||
func (storeTransaction *StoreTransaction) CountAll(groupPrefix string) (int, error) {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.CountAll"); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var count int
|
||||
var err error
|
||||
if groupPrefix == "" {
|
||||
err = storeTransaction.transaction.QueryRow(
|
||||
"SELECT COUNT(*) FROM "+entriesTableName+" WHERE (expires_at IS NULL OR expires_at > ?)",
|
||||
time.Now().UnixMilli(),
|
||||
).Scan(&count)
|
||||
} else {
|
||||
err = storeTransaction.transaction.QueryRow(
|
||||
"SELECT COUNT(*) FROM "+entriesTableName+" WHERE "+entryGroupColumn+" LIKE ? ESCAPE '^' AND (expires_at IS NULL OR expires_at > ?)",
|
||||
escapeLike(groupPrefix)+"%", time.Now().UnixMilli(),
|
||||
).Scan(&count)
|
||||
}
|
||||
if err != nil {
|
||||
return 0, core.E("store.Transaction.CountAll", "count rows", err)
|
||||
}
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// Usage example: `groupNames, err := tx.Groups("tenant-a:")`
|
||||
// Usage example: `groupNames, err := tx.Groups()`
|
||||
func (storeTransaction *StoreTransaction) Groups(groupPrefix ...string) ([]string, error) {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.Groups"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var groupNames []string
|
||||
for groupName, err := range storeTransaction.GroupsSeq(groupPrefix...) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
groupNames = append(groupNames, groupName)
|
||||
}
|
||||
return groupNames, nil
|
||||
}
|
||||
|
||||
// Usage example: `for groupName, err := range tx.GroupsSeq("tenant-a:") { if err != nil { break }; fmt.Println(groupName) }`
|
||||
// Usage example: `for groupName, err := range tx.GroupsSeq() { if err != nil { break }; fmt.Println(groupName) }`
|
||||
func (storeTransaction *StoreTransaction) GroupsSeq(groupPrefix ...string) iter.Seq2[string, error] {
|
||||
actualGroupPrefix := firstOrEmptyString(groupPrefix)
|
||||
return func(yield func(string, error) bool) {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.GroupsSeq"); err != nil {
|
||||
yield("", err)
|
||||
return
|
||||
}
|
||||
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
now := time.Now().UnixMilli()
|
||||
if actualGroupPrefix == "" {
|
||||
rows, err = storeTransaction.transaction.Query(
|
||||
"SELECT DISTINCT "+entryGroupColumn+" FROM "+entriesTableName+" WHERE (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryGroupColumn,
|
||||
now,
|
||||
)
|
||||
} else {
|
||||
rows, err = storeTransaction.transaction.Query(
|
||||
"SELECT DISTINCT "+entryGroupColumn+" FROM "+entriesTableName+" WHERE "+entryGroupColumn+" LIKE ? ESCAPE '^' AND (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryGroupColumn,
|
||||
escapeLike(actualGroupPrefix)+"%", now,
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
yield("", core.E("store.Transaction.GroupsSeq", "query group names", err))
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var groupName string
|
||||
if err := rows.Scan(&groupName); err != nil {
|
||||
if !yield("", core.E("store.Transaction.GroupsSeq", "scan group name", err)) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
if !yield(groupName, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
yield("", core.E("store.Transaction.GroupsSeq", "rows iteration", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Usage example: `renderedTemplate, err := tx.Render("Hello {{ .name }}", "user")`
|
||||
func (storeTransaction *StoreTransaction) Render(templateSource, group string) (string, error) {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.Render"); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
templateData := make(map[string]string)
|
||||
for entry, err := range storeTransaction.All(group) {
|
||||
if err != nil {
|
||||
return "", core.E("store.Transaction.Render", "iterate rows", err)
|
||||
}
|
||||
templateData[entry.Key] = entry.Value
|
||||
}
|
||||
|
||||
renderTemplate, err := template.New("render").Parse(templateSource)
|
||||
if err != nil {
|
||||
return "", core.E("store.Transaction.Render", "parse template", err)
|
||||
}
|
||||
builder := core.NewBuilder()
|
||||
if err := renderTemplate.Execute(builder, templateData); err != nil {
|
||||
return "", core.E("store.Transaction.Render", "execute template", err)
|
||||
}
|
||||
return builder.String(), nil
|
||||
}
|
||||
|
||||
// Usage example: `parts, err := tx.GetSplit("config", "hosts", ","); if err != nil { return }; for part := range parts { fmt.Println(part) }`
|
||||
func (storeTransaction *StoreTransaction) GetSplit(group, key, separator string) (iter.Seq[string], error) {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.GetSplit"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
value, err := storeTransaction.Get(group, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return splitValueSeq(value, separator), nil
|
||||
}
|
||||
|
||||
// Usage example: `fields, err := tx.GetFields("config", "flags"); if err != nil { return }; for field := range fields { fmt.Println(field) }`
|
||||
func (storeTransaction *StoreTransaction) GetFields(group, key string) (iter.Seq[string], error) {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.GetFields"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
value, err := storeTransaction.Get(group, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fieldsValueSeq(value), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"iter"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -81,3 +82,57 @@ func TestTransaction_Transaction_Good_DeletesAtomically(t *testing.T) {
|
|||
_, err = storeInstance.Get("beta", "second")
|
||||
assert.ErrorIs(t, err, NotFoundError)
|
||||
}
|
||||
|
||||
func TestTransaction_Transaction_Good_ReadHelpersSeePendingWrites(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
err := storeInstance.Transaction(func(transaction *StoreTransaction) error {
|
||||
if err := transaction.Set("config", "colour", "blue"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := transaction.Set("config", "hosts", "alpha beta"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := transaction.Set("audit", "enabled", "true"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
entriesByKey, err := transaction.GetAll("config")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"colour": "blue", "hosts": "alpha beta"}, entriesByKey)
|
||||
|
||||
count, err := transaction.CountAll("")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, count)
|
||||
|
||||
groupNames, err := transaction.Groups()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"audit", "config"}, groupNames)
|
||||
|
||||
renderedTemplate, err := transaction.Render("{{ .colour }} / {{ .hosts }}", "config")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "blue / alpha beta", renderedTemplate)
|
||||
|
||||
splitParts, err := transaction.GetSplit("config", "hosts", " ")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"alpha", "beta"}, collectSeq(t, splitParts))
|
||||
|
||||
fieldParts, err := transaction.GetFields("config", "hosts")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"alpha", "beta"}, collectSeq(t, fieldParts))
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func collectSeq[T any](t *testing.T, sequence iter.Seq[T]) []T {
|
||||
t.Helper()
|
||||
|
||||
values := make([]T, 0)
|
||||
for value := range sequence {
|
||||
values = append(values, value)
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue