From da29c712b4a520d973813c81f4aee5e9d031c658 Mon Sep 17 00:00:00 2001 From: Virgil Date: Mon, 30 Mar 2026 16:57:26 +0000 Subject: [PATCH] docs(store): align RFC-STORE with AX Co-Authored-By: Virgil --- docs/RFC-STORE.md | 311 ++++++++++++++++++++-------------------------- 1 file changed, 137 insertions(+), 174 deletions(-) diff --git a/docs/RFC-STORE.md b/docs/RFC-STORE.md index c8479f9..9232482 100644 --- a/docs/RFC-STORE.md +++ b/docs/RFC-STORE.md @@ -1,216 +1,179 @@ -# go-store RFC — SQLite Key-Value Store +# go-store RFC - AX-Aligned SQLite Store > An agent should be able to use this store from this document alone. -**Module:** `dappco.re/go/store` +**Module:** `dappco.re/go/core/store` **Repository:** `core/go-store` -**Files:** 8 +**Package:** `store` --- ## 1. Overview -SQLite-backed key-value store with TTL, namespace isolation, reactive events, and quota enforcement. Pure Go (no CGO). Used by core/ide for memory caching and by agents for workspace state. +go-store is a single-package SQLite-backed key-value store with TTL expiry, namespace isolation, quota enforcement, and reactive mutation events. + +The public surface is intentionally small. Names are descriptive, comments show concrete usage, and the implementation keeps a single SQLite connection so pragma settings stay consistent. --- -## 2. Architecture +## 2. File Layout | File | Purpose | |------|---------| -| `store.go` | Core `Store`: CRUD on `(grp, key)` compound PK, TTL via `expires_at` (Unix ms), background purge (60s), `text/template` rendering, `iter.Seq2` iterators | -| `events.go` | `Watch`/`Unwatch` (buffered chan, cap 16, non-blocking sends) + `OnChange` callbacks (synchronous) | -| `scope.go` | `ScopedStore` wraps `*Store`, prefixes groups with `namespace:`. Quota enforcement (`MaxKeys`/`MaxGroups`) | -| `workspace.go` | `Workspace` buffer: DuckDB-backed mutable accumulation, atomic commit to journal | -| `journal.go` | InfluxDB journal: write completed units, query time-series, retention | -| `compact.go` | Cold archive: compress journal entries to JSONL.gz | -| `store_test.go` | Store unit tests | -| `workspace_test.go` | Workspace buffer tests | +| `doc.go` | Package comment with concrete usage examples | +| `store.go` | `Store`, CRUD, TTL, background purge, bulk reads, prefix counts, group discovery, string splitting helpers, template rendering | +| `events.go` | `EventType`, `Event`, `Watcher`, `Watch`, `Unwatch`, `OnChange`, internal dispatch | +| `scope.go` | `ScopedStore`, `QuotaConfig`, namespace validation, quota enforcement | +| `*_test.go` | Behavioural tests for CRUD, TTL, events, quotas, and defensive error paths | --- -## 3. Key Design Decisions +## 3. Core API -- **Single-connection SQLite.** `MaxOpenConns(1)` because SQLite pragmas (WAL, busy_timeout) are per-connection — a pool would hand out unpragma'd connections causing `SQLITE_BUSY` -- **TTL is triple-layered:** lazy delete on `Get`, query-time `WHERE` filtering, background purge goroutine -- **LIKE queries use `escapeLike()`** with `^` as escape char to prevent SQL wildcard injection +### Store + +- `New(databasePath string) (*Store, error)` opens a SQLite database, applies WAL and busy-timeout pragmas, and pins access to one connection. +- `Close() error` stops the background purge goroutine and closes the database. +- `Get(group, key string) (string, error)` returns a stored value or `NotFoundError`. +- `Set(group, key, value string) error` stores a value and clears any existing TTL. +- `SetWithTTL(group, key, value string, ttl time.Duration) error` stores a value that expires after the supplied duration. +- `Delete(group, key string) error` removes one key. +- `DeleteGroup(group string) error` removes every key in a group. +- `Count(group string) (int, error)` counts non-expired keys in one group. +- `CountAll(groupPrefix string) (int, error)` counts non-expired keys across groups that match a prefix. +- `GetAll(group string) (map[string]string, error)` returns all non-expired keys in one group. +- `All(group string) iter.Seq2[KeyValue, error]` streams all non-expired key-value pairs in one group. +- `Groups(groupPrefix string) ([]string, error)` returns distinct non-expired group names that match a prefix. +- `GroupsSeq(groupPrefix string) iter.Seq2[string, error]` streams matching group names. +- `GetSplit(group, key, separator string) (iter.Seq[string], error)` splits a stored value by a custom separator. +- `GetFields(group, key string) (iter.Seq[string], error)` splits a stored value on whitespace. +- `Render(templateSource, group string) (string, error)` renders a Go `text/template` using group data. +- `PurgeExpired() (int64, error)` removes expired rows immediately. + +### ScopedStore + +- `NewScoped(storeInstance *Store, namespace string) (*ScopedStore, error)` validates a namespace and prefixes groups with `namespace + ":"`. +- `NewScopedWithQuota(storeInstance *Store, namespace string, quota QuotaConfig) (*ScopedStore, error)` adds per-namespace key and group limits. +- `Namespace() string` returns the namespace string. +- `ScopedStore` exposes the same read and write methods as `Store`, with group names prefixed automatically. + +### Events + +- `EventType` values: `EventSet`, `EventDelete`, `EventDeleteGroup`. +- `EventType.String()` returns `set`, `delete`, `delete_group`, or `unknown`. +- `Event` carries `Type`, `Group`, `Key`, `Value`, and `Timestamp`. +- `Watcher` exposes `Events <-chan Event`. +- `Watch(group, key string) *Watcher` registers a buffered watcher. +- `Unwatch(watcher *Watcher)` removes a watcher and closes its channel. +- `OnChange(callback func(Event)) func()` registers a synchronous callback and returns an idempotent unregister function. + +### Quotas and Errors + +- `QuotaConfig{MaxKeys, MaxGroups int}` sets per-namespace limits; zero means unlimited. +- `NotFoundError` is returned when a key does not exist or has expired. +- `QuotaExceededError` is returned when a namespace quota would be exceeded. +- `KeyValue` is the item type returned by `All`. --- -## 4. API +## 4. Behavioural Rules + +- Names use full words where practical: `Store`, `ScopedStore`, `QuotaConfig`, `Watcher`, `Namespace`. +- Public comments show concrete usage instead of restating the signature. +- Examples use UK English, for example `colour` and `behaviour`. +- The store keeps a single SQLite connection open for all operations. SQLite pragmas are per-connection, so pooling would make behaviour unpredictable. +- TTL is enforced in three layers: lazy delete on `Get`, query-time filtering on bulk reads, and a background purge goroutine. +- Event delivery to watchers is non-blocking. If a watcher channel is full, the event is dropped rather than blocking the writer. +- Callbacks registered with `OnChange` are invoked synchronously after the database write. Callbacks can safely register or unregister other subscriptions because the watcher and callback registries use separate locks. +- Quota checks happen before writes. Existing keys count as upserts and do not consume quota. +- Namespace strings must match `^[a-zA-Z0-9-]+$`. + +--- + +## 5. Concrete Usage ```go -st, _ := store.New(":memory:") // or store.New("/path/to/db") -defer st.Close() +package main -st.Set("group", "key", "value") -st.SetWithTTL("group", "key", "value", 5*time.Minute) -val, _ := st.Get("group", "key") // lazy-deletes expired +import ( + "fmt" + "time" -// Iteration -for key, val := range st.AllSeq("group") { ... } -for group := range st.GroupsSeq() { ... } + "dappco.re/go/core" + "dappco.re/go/core/store" +) -// Events -ch := st.Watch("group") -st.OnChange("group", func(key, val string) { ... }) -``` +func main() { + storeInstance, err := store.New(":memory:") + if err != nil { + return + } + defer storeInstance.Close() ---- + if err := storeInstance.Set("config", "colour", "blue"); err != nil { + return + } + if err := storeInstance.SetWithTTL("session", "token", "abc123", 5*time.Minute); err != nil { + return + } -## 5. ScopedStore + colourValue, err := storeInstance.Get("config", "colour") + if err != nil { + return + } + fmt.Println(colourValue) -```go -scoped := store.NewScoped(st, "mynamespace") -scoped.Set("key", "value") // stored as group "mynamespace:default", key "key" -scoped.SetIn("mygroup", "key", "v") // stored as group "mynamespace:mygroup", key "key" -``` + for entry, err := range storeInstance.All("config") { + if err != nil { + return + } + core.Println(entry.Key, entry.Value) + } -- Namespace regex: `^[a-zA-Z0-9-]+$` -- Default group: when `Set(key, value)` is called without a group, the literal string `"default"` is used as the group name, prefixed with the namespace: `"mynamespace:default"` -- `SetIn(group, key, value)` allows explicit group within the namespace -- Quota: `MaxKeys`, `MaxGroups` — checked before writes, upserts bypass + for groupName, err := range storeInstance.GroupsSeq("tenant-a:") { + if err != nil { + return + } + core.Println(groupName) + } ---- + watcher := storeInstance.Watch("config", "*") + defer storeInstance.Unwatch(watcher) + go func() { + for event := range watcher.Events { + core.Println(event.Type, event.Group, event.Key, event.Value) + } + }() -## 6. Event System + unregister := storeInstance.OnChange(func(event store.Event) { + core.Println("changed", event.Group, event.Key, event.Value) + }) + defer unregister() -- `Watch(group)` — returns buffered channel (cap 16), non-blocking sends drop events -- `Unwatch(group, ch)` — remove a watcher -- `OnChange(group, callback)` — synchronous callback in writer goroutine; callbacks can manage subscriptions re-entrantly - ---- - -## 7. Workspace Buffer - -Stateful work accumulation over time. A workspace is a named DuckDB buffer for mutable work-in-progress. When a unit of work completes, the full state commits atomically to a time-series journal (InfluxDB). A summary updates the identity store (the existing SQLite store or an external database). - -### 7.1 The Problem - -Writing every micro-event directly to a time-series makes deltas meaningless — 4000 writes of "+1" produces noise. A mutable buffer accumulates the work, then commits once as a complete unit. The time-series only sees finished work, so deltas between entries represent real change. - -### 7.2 Three Layers - -``` -Store (SQLite): "this thing exists" — identity, current summary -Buffer (DuckDB): "this thing is working" — mutable temp state, atomic -Journal (InfluxDB): "this thing completed" — immutable, delta-ready -``` - -| Layer | Store | Mutability | Lifetime | -|-------|-------|-----------|----------| -| Identity | SQLite (go-store) | Mutable | Permanent | -| Hot | DuckDB (temp file) | Mutable | Session/cycle | -| Journal | InfluxDB | Append-only | Retention policy | -| Cold | Compressed JSONL | Immutable | Archive | - -### 7.3 Workspace API - -```go -// Workspace is a named DuckDB buffer for mutable work-in-progress. -// It holds a reference to the parent Store for identity updates and journal writes. -// -// ws, _ := st.NewWorkspace("scroll-session-2026-03-30") -// ws.Put("like", map[string]any{"user": "@handle", "post": "video_123"}) -// ws.Commit() // atomic → journal + identity summary -type Workspace struct { - name string - store *Store // parent store for identity updates + journal config - db *duckdb.DB // mutable buffer (temp file, deleted on commit/discard) + scopedStore, err := store.NewScopedWithQuota( + storeInstance, + "tenant-a", + store.QuotaConfig{MaxKeys: 100, MaxGroups: 10}, + ) + if err != nil { + return + } + if err := scopedStore.Set("prefs", "locale", "en-GB"); err != nil { + return + } } - -// NewWorkspace creates a workspace buffer. The DuckDB file is created at .core/state/{name}.duckdb. -// -// ws, _ := st.NewWorkspace("scroll-session-2026-03-30") -func (s *Store) NewWorkspace(name string) (*Workspace, error) { } -``` - -```go -// Put accumulates an entry in the workspace buffer -// -// ws.Put("like", map[string]any{"user": "@handle"}) -func (ws *Workspace) Put(kind string, data map[string]any) { } - -// Aggregate returns a summary of the current workspace state -// -// summary := ws.Aggregate() // {"like": 4000, "profile_match": 12} -func (ws *Workspace) Aggregate() map[string]any { } - -// Commit writes the aggregated state to the journal and updates the identity store -// -// result := ws.Commit() -func (ws *Workspace) Commit() core.Result { } - -// Discard drops the workspace without committing -// -// ws.Discard() -func (ws *Workspace) Discard() { } - -// Query runs SQL against the buffer for ad-hoc analysis. -// Returns core.Result where Value is []map[string]any (rows as maps). -// -// result := ws.Query("SELECT kind, COUNT(*) as n FROM entries GROUP BY kind") -// rows := result.Value.([]map[string]any) // [{"kind": "like", "n": 4000}] -func (ws *Workspace) Query(sql string) core.Result { } -``` - -### 7.4 Journal - -Commit writes a single point per completed workspace. One point = one unit of work. - -```go -// commitToJournal writes aggregated state as a single InfluxDB point -// -// s.commitToJournal("scroll-session", fields, tags) -func (s *Store) commitToJournal(measurement string, fields map[string]any, tags map[string]string) core.Result { } - -// QueryJournal runs a Flux query against the time-series. -// Returns core.Result where Value is []map[string]any (rows as maps). -// -// result := s.QueryJournal(`from(bucket: "core") |> range(start: -7d)`) -// rows := result.Value.([]map[string]any) -func (s *Store) QueryJournal(flux string) core.Result { } -``` - -Because each point is a complete unit, queries naturally produce meaningful results without complex aggregation. - -### 7.5 Cold Archive - -When journal entries age past retention, they compact to cold storage: - -```go -// CompactOptions controls cold archive generation. -type CompactOptions struct { - Before time.Time // archive entries before this time - Output string // output directory (default: .core/archive/) - Format string // gzip or zstd (default: gzip) -} - -// Compact archives journal entries to compressed JSONL -// -// st.Compact(store.CompactOptions{Before: time.Now().Add(-90*24*time.Hour), Output: "/archive/"}) -func (s *Store) Compact(opts CompactOptions) core.Result { } -``` - -Output: gzip JSONL files. Each line is a complete unit of work — ready for training data ingestion, CDN publishing, or long-term analytics. - -### 7.6 File Lifecycle - -DuckDB files are ephemeral: - -``` -Created: workspace opens → .core/state/{name}.duckdb -Active: Put() accumulates entries -Committed: Commit() → journal write → identity update → file deleted -Discarded: Discard() → file deleted -Crashed: Orphaned .duckdb files recovered on next startup ``` --- -## 8. Reference Material +## 6. Reference Paths | Resource | Location | |----------|----------| -| Core Go RFC | `code/core/go/RFC.md` | -| IO RFC | `code/core/go/io/RFC.md` | +| Package comment | `doc.go` | +| Core store implementation | `store.go` | +| Events and callbacks | `events.go` | +| Namespace and quota logic | `scope.go` | +| Architecture notes | `docs/architecture.md` | +| Agent conventions | `CODEX.md` | +