[agent/codex:gpt-5.4] Read ~/spec/code/core/go/store/RFC.md fully. Find features d... #116

Merged
Virgil merged 1 commit from agent/read---spec-code-core-go-store-rfc-md-fu into dev 2026-04-04 10:09:50 +00:00
3 changed files with 78 additions and 35 deletions

View file

@ -21,8 +21,8 @@ SQLite-backed key-value store with TTL, namespace isolation, reactive events, an
| `store.go` | Core `Store`: CRUD on `(grp, key)` compound PK, TTL via `expires_at` (Unix ms), background purge (60s), `text/template` rendering, `iter.Seq2` iterators |
| `events.go` | `Watch`/`Unwatch` (buffered chan, cap 16, non-blocking sends) + `OnChange` callbacks (synchronous) |
| `scope.go` | `ScopedStore` wraps `*Store`, prefixes groups with `namespace:`. Quota enforcement (`MaxKeys`/`MaxGroups`) |
| `workspace.go` | `Workspace` buffer: DuckDB-backed mutable accumulation, atomic commit to journal |
| `journal.go` | InfluxDB journal: write completed units, query time-series, retention |
| `workspace.go` | `Workspace` buffer: SQLite-backed mutable accumulation in `.duckdb` files, atomic commit to journal |
| `journal.go` | SQLite journal table: write completed units, query time-series-shaped data, retention |
| `compact.go` | Cold archive: compress journal entries to JSONL.gz |
| `store_test.go` | Store unit tests |
| `workspace_test.go` | Workspace buffer tests |
@ -137,13 +137,13 @@ func (ss *ScopedStore) GetFrom(group, key string) (string, error) { }
- `Watch(group string) <-chan Event` — returns buffered channel (cap 16), non-blocking sends drop events
- `Unwatch(group string, ch <-chan Event)` — remove a watcher
- `OnChange(callback)` — synchronous callback in writer goroutine
- **Deadlock warning:** `notify()` holds `s.mu` read-lock — calling Watch/Unwatch/OnChange from inside a callback will deadlock
- `notify()` snapshots callbacks after watcher delivery, so callbacks may register or unregister subscriptions re-entrantly without deadlocking
---
## 8. Workspace Buffer
Stateful work accumulation over time. A workspace is a named DuckDB buffer for mutable work-in-progress. When a unit of work completes, the full state commits atomically to a time-series journal (InfluxDB). A summary updates the identity store (the existing SQLite store or an external database).
Stateful work accumulation over time. A workspace is a named SQLite buffer for mutable work-in-progress stored in a `.duckdb` file for path compatibility. When a unit of work completes, the full state commits atomically to the journal table. A summary updates the identity store.
### 7.1 The Problem
@ -153,21 +153,21 @@ Writing every micro-event directly to a time-series makes deltas meaningless —
```
Store (SQLite): "this thing exists" — identity, current summary
Buffer (DuckDB): "this thing is working" — mutable temp state, atomic
Journal (InfluxDB): "this thing completed" — immutable, delta-ready
Buffer (SQLite workspace file): "this thing is working" — mutable temp state, atomic
Journal (SQLite journal table): "this thing completed" — immutable, delta-ready
```
| Layer | Store | Mutability | Lifetime |
|-------|-------|-----------|----------|
| Identity | SQLite (go-store) | Mutable | Permanent |
| Hot | DuckDB (temp file) | Mutable | Session/cycle |
| Journal | InfluxDB | Append-only | Retention policy |
| Hot | SQLite `.duckdb` file | Mutable | Session/cycle |
| Journal | SQLite journal table | Append-only | Retention policy |
| Cold | Compressed JSONL | Immutable | Archive |
### 7.3 Workspace API
```go
// Workspace is a named DuckDB buffer for mutable work-in-progress.
// Workspace is a named SQLite buffer for mutable work-in-progress.
// It holds a reference to the parent Store for identity updates and journal writes.
//
// ws, _ := st.NewWorkspace("scroll-session-2026-03-30")
@ -176,10 +176,10 @@ Journal (InfluxDB): "this thing completed" — immutable, delta-ready
type Workspace struct {
name string
store *Store // parent store for identity updates + journal config
db *sql.DB // DuckDB via database/sql driver (temp file, deleted on commit/discard)
db *sql.DB // SQLite via database/sql driver (temp file, deleted on commit/discard)
}
// NewWorkspace creates a workspace buffer. The DuckDB file is created at .core/state/{name}.duckdb.
// NewWorkspace creates a workspace buffer. The SQLite file is created at .core/state/{name}.duckdb.
//
// ws, _ := st.NewWorkspace("scroll-session-2026-03-30")
func (s *Store) NewWorkspace(name string) (*Workspace, error) { }
@ -219,13 +219,13 @@ func (ws *Workspace) Query(sql string) core.Result { }
Commit writes a single point per completed workspace. One point = one unit of work.
```go
// CommitToJournal writes aggregated state as a single InfluxDB point.
// CommitToJournal writes aggregated state as a single journal entry.
// Called by Workspace.Commit() internally, but exported for testing.
//
// s.CommitToJournal("scroll-session", fields, tags)
func (s *Store) CommitToJournal(measurement string, fields map[string]any, tags map[string]string) core.Result { }
// QueryJournal runs a Flux query against the time-series.
// QueryJournal runs a Flux-shaped filter or raw SQL query against the journal table.
// Returns core.Result where Value is []map[string]any (rows as maps).
//
// result := s.QueryJournal(`from(bucket: "core") |> range(start: -7d)`)
@ -257,7 +257,7 @@ Output: gzip JSONL files. Each line is a complete unit of work — ready for tra
### 7.6 File Lifecycle
DuckDB files are ephemeral:
Workspace files are ephemeral:
```
Created: workspace opens → .core/state/{name}.duckdb
@ -271,8 +271,8 @@ Orphan recovery on `New()`:
```go
// New() scans .core/state/ for leftover .duckdb files.
// Each orphan is opened, aggregated, and discarded (not committed).
// The caller decides whether to commit orphan data via RecoverOrphans().
// Each orphan is opened and cached for RecoverOrphans().
// The caller decides whether to commit or discard orphan data.
//
// orphans := st.RecoverOrphans(".core/state/")
// for _, ws := range orphans {

View file

@ -177,6 +177,63 @@ func TestEvents_OnChange_Good_GroupFilteredCallback(t *testing.T) {
assert.Equal(t, []string{"theme=dark"}, seen)
}
func TestEvents_OnChange_Good_ReentrantSubscriptionChanges(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
var (
seen []string
seenMutex sync.Mutex
nestedEvents <-chan Event
nestedActive bool
nestedStopped bool
unregisterNested = func() {}
)
unregisterPrimary := storeInstance.OnChange(func(event Event) {
seenMutex.Lock()
seen = append(seen, event.Key)
seenMutex.Unlock()
if !nestedActive {
nestedEvents = storeInstance.Watch("config")
unregisterNested = storeInstance.OnChange(func(nested Event) {
seenMutex.Lock()
seen = append(seen, "nested:"+nested.Key)
seenMutex.Unlock()
})
nestedActive = true
return
}
if !nestedStopped {
storeInstance.Unwatch("config", nestedEvents)
unregisterNested()
nestedStopped = true
}
})
defer unregisterPrimary()
require.NoError(t, storeInstance.Set("config", "first", "dark"))
require.NoError(t, storeInstance.Set("config", "second", "light"))
require.NoError(t, storeInstance.Set("config", "third", "blue"))
seenMutex.Lock()
assert.Equal(t, []string{"first", "second", "nested:second", "third"}, seen)
seenMutex.Unlock()
select {
case event, open := <-nestedEvents:
require.True(t, open)
assert.Equal(t, "second", event.Key)
case <-time.After(time.Second):
t.Fatal("timed out waiting for nested watcher event")
}
_, open := <-nestedEvents
assert.False(t, open, "nested watcher should be closed after callback-driven unwatch")
}
func TestEvents_Notify_Good_PopulatesTimestamp(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()

View file

@ -181,6 +181,10 @@ func discoverOrphanWorkspacePaths(stateDirectory string) []string {
}
func discoverOrphanWorkspaces(stateDirectory string, backingStore *Store) []*Workspace {
return loadRecoveredWorkspaces(stateDirectory, backingStore)
}
func loadRecoveredWorkspaces(stateDirectory string, backingStore *Store) []*Workspace {
filesystem := (&core.Fs{}).NewUnrestricted()
orphanWorkspaces := make([]*Workspace, 0)
for _, databasePath := range discoverOrphanWorkspacePaths(stateDirectory) {
@ -234,25 +238,7 @@ func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace {
return cachedWorkspaces
}
}
filesystem := (&core.Fs{}).NewUnrestricted()
var orphanWorkspaces []*Workspace
for _, databasePath := range discoverOrphanWorkspacePaths(stateDirectory) {
workspaceDatabase, err := openWorkspaceDatabase(databasePath)
if err != nil {
continue
}
orphanWorkspace := &Workspace{
name: workspaceNameFromPath(stateDirectory, databasePath),
backingStore: storeInstance,
workspaceDatabase: workspaceDatabase,
databasePath: databasePath,
filesystem: filesystem,
}
orphanWorkspace.orphanAggregate = orphanWorkspace.captureAggregateSnapshot()
orphanWorkspaces = append(orphanWorkspaces, orphanWorkspace)
}
return orphanWorkspaces
return loadRecoveredWorkspaces(stateDirectory, storeInstance)
}
// Usage example: `err := workspace.Put("like", map[string]any{"user": "@alice", "post": "video_123"})`