docs(store): align RFC-STORE with AX
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
d6cd9fd818
commit
da29c712b4
1 changed files with 137 additions and 174 deletions
|
|
@ -1,216 +1,179 @@
|
|||
# go-store RFC — SQLite Key-Value Store
|
||||
# go-store RFC - AX-Aligned SQLite Store
|
||||
|
||||
> An agent should be able to use this store from this document alone.
|
||||
|
||||
**Module:** `dappco.re/go/store`
|
||||
**Module:** `dappco.re/go/core/store`
|
||||
**Repository:** `core/go-store`
|
||||
**Files:** 8
|
||||
**Package:** `store`
|
||||
|
||||
---
|
||||
|
||||
## 1. Overview
|
||||
|
||||
SQLite-backed key-value store with TTL, namespace isolation, reactive events, and quota enforcement. Pure Go (no CGO). Used by core/ide for memory caching and by agents for workspace state.
|
||||
go-store is a single-package SQLite-backed key-value store with TTL expiry, namespace isolation, quota enforcement, and reactive mutation events.
|
||||
|
||||
The public surface is intentionally small. Names are descriptive, comments show concrete usage, and the implementation keeps a single SQLite connection so pragma settings stay consistent.
|
||||
|
||||
---
|
||||
|
||||
## 2. Architecture
|
||||
## 2. File Layout
|
||||
|
||||
| File | Purpose |
|
||||
|------|---------|
|
||||
| `store.go` | Core `Store`: CRUD on `(grp, key)` compound PK, TTL via `expires_at` (Unix ms), background purge (60s), `text/template` rendering, `iter.Seq2` iterators |
|
||||
| `events.go` | `Watch`/`Unwatch` (buffered chan, cap 16, non-blocking sends) + `OnChange` callbacks (synchronous) |
|
||||
| `scope.go` | `ScopedStore` wraps `*Store`, prefixes groups with `namespace:`. Quota enforcement (`MaxKeys`/`MaxGroups`) |
|
||||
| `workspace.go` | `Workspace` buffer: DuckDB-backed mutable accumulation, atomic commit to journal |
|
||||
| `journal.go` | InfluxDB journal: write completed units, query time-series, retention |
|
||||
| `compact.go` | Cold archive: compress journal entries to JSONL.gz |
|
||||
| `store_test.go` | Store unit tests |
|
||||
| `workspace_test.go` | Workspace buffer tests |
|
||||
| `doc.go` | Package comment with concrete usage examples |
|
||||
| `store.go` | `Store`, CRUD, TTL, background purge, bulk reads, prefix counts, group discovery, string splitting helpers, template rendering |
|
||||
| `events.go` | `EventType`, `Event`, `Watcher`, `Watch`, `Unwatch`, `OnChange`, internal dispatch |
|
||||
| `scope.go` | `ScopedStore`, `QuotaConfig`, namespace validation, quota enforcement |
|
||||
| `*_test.go` | Behavioural tests for CRUD, TTL, events, quotas, and defensive error paths |
|
||||
|
||||
---
|
||||
|
||||
## 3. Key Design Decisions
|
||||
## 3. Core API
|
||||
|
||||
- **Single-connection SQLite.** `MaxOpenConns(1)` because SQLite pragmas (WAL, busy_timeout) are per-connection — a pool would hand out unpragma'd connections causing `SQLITE_BUSY`
|
||||
- **TTL is triple-layered:** lazy delete on `Get`, query-time `WHERE` filtering, background purge goroutine
|
||||
- **LIKE queries use `escapeLike()`** with `^` as escape char to prevent SQL wildcard injection
|
||||
### Store
|
||||
|
||||
- `New(databasePath string) (*Store, error)` opens a SQLite database, applies WAL and busy-timeout pragmas, and pins access to one connection.
|
||||
- `Close() error` stops the background purge goroutine and closes the database.
|
||||
- `Get(group, key string) (string, error)` returns a stored value or `NotFoundError`.
|
||||
- `Set(group, key, value string) error` stores a value and clears any existing TTL.
|
||||
- `SetWithTTL(group, key, value string, ttl time.Duration) error` stores a value that expires after the supplied duration.
|
||||
- `Delete(group, key string) error` removes one key.
|
||||
- `DeleteGroup(group string) error` removes every key in a group.
|
||||
- `Count(group string) (int, error)` counts non-expired keys in one group.
|
||||
- `CountAll(groupPrefix string) (int, error)` counts non-expired keys across groups that match a prefix.
|
||||
- `GetAll(group string) (map[string]string, error)` returns all non-expired keys in one group.
|
||||
- `All(group string) iter.Seq2[KeyValue, error]` streams all non-expired key-value pairs in one group.
|
||||
- `Groups(groupPrefix string) ([]string, error)` returns distinct non-expired group names that match a prefix.
|
||||
- `GroupsSeq(groupPrefix string) iter.Seq2[string, error]` streams matching group names.
|
||||
- `GetSplit(group, key, separator string) (iter.Seq[string], error)` splits a stored value by a custom separator.
|
||||
- `GetFields(group, key string) (iter.Seq[string], error)` splits a stored value on whitespace.
|
||||
- `Render(templateSource, group string) (string, error)` renders a Go `text/template` using group data.
|
||||
- `PurgeExpired() (int64, error)` removes expired rows immediately.
|
||||
|
||||
### ScopedStore
|
||||
|
||||
- `NewScoped(storeInstance *Store, namespace string) (*ScopedStore, error)` validates a namespace and prefixes groups with `namespace + ":"`.
|
||||
- `NewScopedWithQuota(storeInstance *Store, namespace string, quota QuotaConfig) (*ScopedStore, error)` adds per-namespace key and group limits.
|
||||
- `Namespace() string` returns the namespace string.
|
||||
- `ScopedStore` exposes the same read and write methods as `Store`, with group names prefixed automatically.
|
||||
|
||||
### Events
|
||||
|
||||
- `EventType` values: `EventSet`, `EventDelete`, `EventDeleteGroup`.
|
||||
- `EventType.String()` returns `set`, `delete`, `delete_group`, or `unknown`.
|
||||
- `Event` carries `Type`, `Group`, `Key`, `Value`, and `Timestamp`.
|
||||
- `Watcher` exposes `Events <-chan Event`.
|
||||
- `Watch(group, key string) *Watcher` registers a buffered watcher.
|
||||
- `Unwatch(watcher *Watcher)` removes a watcher and closes its channel.
|
||||
- `OnChange(callback func(Event)) func()` registers a synchronous callback and returns an idempotent unregister function.
|
||||
|
||||
### Quotas and Errors
|
||||
|
||||
- `QuotaConfig{MaxKeys, MaxGroups int}` sets per-namespace limits; zero means unlimited.
|
||||
- `NotFoundError` is returned when a key does not exist or has expired.
|
||||
- `QuotaExceededError` is returned when a namespace quota would be exceeded.
|
||||
- `KeyValue` is the item type returned by `All`.
|
||||
|
||||
---
|
||||
|
||||
## 4. API
|
||||
## 4. Behavioural Rules
|
||||
|
||||
- Names use full words where practical: `Store`, `ScopedStore`, `QuotaConfig`, `Watcher`, `Namespace`.
|
||||
- Public comments show concrete usage instead of restating the signature.
|
||||
- Examples use UK English, for example `colour` and `behaviour`.
|
||||
- The store keeps a single SQLite connection open for all operations. SQLite pragmas are per-connection, so pooling would make behaviour unpredictable.
|
||||
- TTL is enforced in three layers: lazy delete on `Get`, query-time filtering on bulk reads, and a background purge goroutine.
|
||||
- Event delivery to watchers is non-blocking. If a watcher channel is full, the event is dropped rather than blocking the writer.
|
||||
- Callbacks registered with `OnChange` are invoked synchronously after the database write. Callbacks can safely register or unregister other subscriptions because the watcher and callback registries use separate locks.
|
||||
- Quota checks happen before writes. Existing keys count as upserts and do not consume quota.
|
||||
- Namespace strings must match `^[a-zA-Z0-9-]+$`.
|
||||
|
||||
---
|
||||
|
||||
## 5. Concrete Usage
|
||||
|
||||
```go
|
||||
st, _ := store.New(":memory:") // or store.New("/path/to/db")
|
||||
defer st.Close()
|
||||
package main
|
||||
|
||||
st.Set("group", "key", "value")
|
||||
st.SetWithTTL("group", "key", "value", 5*time.Minute)
|
||||
val, _ := st.Get("group", "key") // lazy-deletes expired
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
// Iteration
|
||||
for key, val := range st.AllSeq("group") { ... }
|
||||
for group := range st.GroupsSeq() { ... }
|
||||
"dappco.re/go/core"
|
||||
"dappco.re/go/core/store"
|
||||
)
|
||||
|
||||
// Events
|
||||
ch := st.Watch("group")
|
||||
st.OnChange("group", func(key, val string) { ... })
|
||||
```
|
||||
func main() {
|
||||
storeInstance, err := store.New(":memory:")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer storeInstance.Close()
|
||||
|
||||
---
|
||||
if err := storeInstance.Set("config", "colour", "blue"); err != nil {
|
||||
return
|
||||
}
|
||||
if err := storeInstance.SetWithTTL("session", "token", "abc123", 5*time.Minute); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
## 5. ScopedStore
|
||||
colourValue, err := storeInstance.Get("config", "colour")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
fmt.Println(colourValue)
|
||||
|
||||
```go
|
||||
scoped := store.NewScoped(st, "mynamespace")
|
||||
scoped.Set("key", "value") // stored as group "mynamespace:default", key "key"
|
||||
scoped.SetIn("mygroup", "key", "v") // stored as group "mynamespace:mygroup", key "key"
|
||||
```
|
||||
for entry, err := range storeInstance.All("config") {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
core.Println(entry.Key, entry.Value)
|
||||
}
|
||||
|
||||
- Namespace regex: `^[a-zA-Z0-9-]+$`
|
||||
- Default group: when `Set(key, value)` is called without a group, the literal string `"default"` is used as the group name, prefixed with the namespace: `"mynamespace:default"`
|
||||
- `SetIn(group, key, value)` allows explicit group within the namespace
|
||||
- Quota: `MaxKeys`, `MaxGroups` — checked before writes, upserts bypass
|
||||
for groupName, err := range storeInstance.GroupsSeq("tenant-a:") {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
core.Println(groupName)
|
||||
}
|
||||
|
||||
---
|
||||
watcher := storeInstance.Watch("config", "*")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
go func() {
|
||||
for event := range watcher.Events {
|
||||
core.Println(event.Type, event.Group, event.Key, event.Value)
|
||||
}
|
||||
}()
|
||||
|
||||
## 6. Event System
|
||||
unregister := storeInstance.OnChange(func(event store.Event) {
|
||||
core.Println("changed", event.Group, event.Key, event.Value)
|
||||
})
|
||||
defer unregister()
|
||||
|
||||
- `Watch(group)` — returns buffered channel (cap 16), non-blocking sends drop events
|
||||
- `Unwatch(group, ch)` — remove a watcher
|
||||
- `OnChange(group, callback)` — synchronous callback in writer goroutine; callbacks can manage subscriptions re-entrantly
|
||||
|
||||
---
|
||||
|
||||
## 7. Workspace Buffer
|
||||
|
||||
Stateful work accumulation over time. A workspace is a named DuckDB buffer for mutable work-in-progress. When a unit of work completes, the full state commits atomically to a time-series journal (InfluxDB). A summary updates the identity store (the existing SQLite store or an external database).
|
||||
|
||||
### 7.1 The Problem
|
||||
|
||||
Writing every micro-event directly to a time-series makes deltas meaningless — 4000 writes of "+1" produces noise. A mutable buffer accumulates the work, then commits once as a complete unit. The time-series only sees finished work, so deltas between entries represent real change.
|
||||
|
||||
### 7.2 Three Layers
|
||||
|
||||
```
|
||||
Store (SQLite): "this thing exists" — identity, current summary
|
||||
Buffer (DuckDB): "this thing is working" — mutable temp state, atomic
|
||||
Journal (InfluxDB): "this thing completed" — immutable, delta-ready
|
||||
```
|
||||
|
||||
| Layer | Store | Mutability | Lifetime |
|
||||
|-------|-------|-----------|----------|
|
||||
| Identity | SQLite (go-store) | Mutable | Permanent |
|
||||
| Hot | DuckDB (temp file) | Mutable | Session/cycle |
|
||||
| Journal | InfluxDB | Append-only | Retention policy |
|
||||
| Cold | Compressed JSONL | Immutable | Archive |
|
||||
|
||||
### 7.3 Workspace API
|
||||
|
||||
```go
|
||||
// Workspace is a named DuckDB buffer for mutable work-in-progress.
|
||||
// It holds a reference to the parent Store for identity updates and journal writes.
|
||||
//
|
||||
// ws, _ := st.NewWorkspace("scroll-session-2026-03-30")
|
||||
// ws.Put("like", map[string]any{"user": "@handle", "post": "video_123"})
|
||||
// ws.Commit() // atomic → journal + identity summary
|
||||
type Workspace struct {
|
||||
name string
|
||||
store *Store // parent store for identity updates + journal config
|
||||
db *duckdb.DB // mutable buffer (temp file, deleted on commit/discard)
|
||||
scopedStore, err := store.NewScopedWithQuota(
|
||||
storeInstance,
|
||||
"tenant-a",
|
||||
store.QuotaConfig{MaxKeys: 100, MaxGroups: 10},
|
||||
)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err := scopedStore.Set("prefs", "locale", "en-GB"); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// NewWorkspace creates a workspace buffer. The DuckDB file is created at .core/state/{name}.duckdb.
|
||||
//
|
||||
// ws, _ := st.NewWorkspace("scroll-session-2026-03-30")
|
||||
func (s *Store) NewWorkspace(name string) (*Workspace, error) { }
|
||||
```
|
||||
|
||||
```go
|
||||
// Put accumulates an entry in the workspace buffer
|
||||
//
|
||||
// ws.Put("like", map[string]any{"user": "@handle"})
|
||||
func (ws *Workspace) Put(kind string, data map[string]any) { }
|
||||
|
||||
// Aggregate returns a summary of the current workspace state
|
||||
//
|
||||
// summary := ws.Aggregate() // {"like": 4000, "profile_match": 12}
|
||||
func (ws *Workspace) Aggregate() map[string]any { }
|
||||
|
||||
// Commit writes the aggregated state to the journal and updates the identity store
|
||||
//
|
||||
// result := ws.Commit()
|
||||
func (ws *Workspace) Commit() core.Result { }
|
||||
|
||||
// Discard drops the workspace without committing
|
||||
//
|
||||
// ws.Discard()
|
||||
func (ws *Workspace) Discard() { }
|
||||
|
||||
// Query runs SQL against the buffer for ad-hoc analysis.
|
||||
// Returns core.Result where Value is []map[string]any (rows as maps).
|
||||
//
|
||||
// result := ws.Query("SELECT kind, COUNT(*) as n FROM entries GROUP BY kind")
|
||||
// rows := result.Value.([]map[string]any) // [{"kind": "like", "n": 4000}]
|
||||
func (ws *Workspace) Query(sql string) core.Result { }
|
||||
```
|
||||
|
||||
### 7.4 Journal
|
||||
|
||||
Commit writes a single point per completed workspace. One point = one unit of work.
|
||||
|
||||
```go
|
||||
// commitToJournal writes aggregated state as a single InfluxDB point
|
||||
//
|
||||
// s.commitToJournal("scroll-session", fields, tags)
|
||||
func (s *Store) commitToJournal(measurement string, fields map[string]any, tags map[string]string) core.Result { }
|
||||
|
||||
// QueryJournal runs a Flux query against the time-series.
|
||||
// Returns core.Result where Value is []map[string]any (rows as maps).
|
||||
//
|
||||
// result := s.QueryJournal(`from(bucket: "core") |> range(start: -7d)`)
|
||||
// rows := result.Value.([]map[string]any)
|
||||
func (s *Store) QueryJournal(flux string) core.Result { }
|
||||
```
|
||||
|
||||
Because each point is a complete unit, queries naturally produce meaningful results without complex aggregation.
|
||||
|
||||
### 7.5 Cold Archive
|
||||
|
||||
When journal entries age past retention, they compact to cold storage:
|
||||
|
||||
```go
|
||||
// CompactOptions controls cold archive generation.
|
||||
type CompactOptions struct {
|
||||
Before time.Time // archive entries before this time
|
||||
Output string // output directory (default: .core/archive/)
|
||||
Format string // gzip or zstd (default: gzip)
|
||||
}
|
||||
|
||||
// Compact archives journal entries to compressed JSONL
|
||||
//
|
||||
// st.Compact(store.CompactOptions{Before: time.Now().Add(-90*24*time.Hour), Output: "/archive/"})
|
||||
func (s *Store) Compact(opts CompactOptions) core.Result { }
|
||||
```
|
||||
|
||||
Output: gzip JSONL files. Each line is a complete unit of work — ready for training data ingestion, CDN publishing, or long-term analytics.
|
||||
|
||||
### 7.6 File Lifecycle
|
||||
|
||||
DuckDB files are ephemeral:
|
||||
|
||||
```
|
||||
Created: workspace opens → .core/state/{name}.duckdb
|
||||
Active: Put() accumulates entries
|
||||
Committed: Commit() → journal write → identity update → file deleted
|
||||
Discarded: Discard() → file deleted
|
||||
Crashed: Orphaned .duckdb files recovered on next startup
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 8. Reference Material
|
||||
## 6. Reference Paths
|
||||
|
||||
| Resource | Location |
|
||||
|----------|----------|
|
||||
| Core Go RFC | `code/core/go/RFC.md` |
|
||||
| IO RFC | `code/core/go/io/RFC.md` |
|
||||
| Package comment | `doc.go` |
|
||||
| Core store implementation | `store.go` |
|
||||
| Events and callbacks | `events.go` |
|
||||
| Namespace and quota logic | `scope.go` |
|
||||
| Architecture notes | `docs/architecture.md` |
|
||||
| Agent conventions | `CODEX.md` |
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue