[agent/codex:gpt-5.4] Read ~/spec/code/core/go/store/RFC.md fully. Find features d... #116
3 changed files with 78 additions and 35 deletions
|
|
@ -21,8 +21,8 @@ SQLite-backed key-value store with TTL, namespace isolation, reactive events, an
|
|||
| `store.go` | Core `Store`: CRUD on `(grp, key)` compound PK, TTL via `expires_at` (Unix ms), background purge (60s), `text/template` rendering, `iter.Seq2` iterators |
|
||||
| `events.go` | `Watch`/`Unwatch` (buffered chan, cap 16, non-blocking sends) + `OnChange` callbacks (synchronous) |
|
||||
| `scope.go` | `ScopedStore` wraps `*Store`, prefixes groups with `namespace:`. Quota enforcement (`MaxKeys`/`MaxGroups`) |
|
||||
| `workspace.go` | `Workspace` buffer: DuckDB-backed mutable accumulation, atomic commit to journal |
|
||||
| `journal.go` | InfluxDB journal: write completed units, query time-series, retention |
|
||||
| `workspace.go` | `Workspace` buffer: SQLite-backed mutable accumulation in `.duckdb` files, atomic commit to journal |
|
||||
| `journal.go` | SQLite journal table: write completed units, query time-series-shaped data, retention |
|
||||
| `compact.go` | Cold archive: compress journal entries to JSONL.gz |
|
||||
| `store_test.go` | Store unit tests |
|
||||
| `workspace_test.go` | Workspace buffer tests |
|
||||
|
|
@ -137,13 +137,13 @@ func (ss *ScopedStore) GetFrom(group, key string) (string, error) { }
|
|||
- `Watch(group string) <-chan Event` — returns buffered channel (cap 16), non-blocking sends drop events
|
||||
- `Unwatch(group string, ch <-chan Event)` — remove a watcher
|
||||
- `OnChange(callback)` — synchronous callback in writer goroutine
|
||||
- **Deadlock warning:** `notify()` holds `s.mu` read-lock — calling Watch/Unwatch/OnChange from inside a callback will deadlock
|
||||
- `notify()` snapshots callbacks after watcher delivery, so callbacks may register or unregister subscriptions re-entrantly without deadlocking
|
||||
|
||||
---
|
||||
|
||||
## 8. Workspace Buffer
|
||||
|
||||
Stateful work accumulation over time. A workspace is a named DuckDB buffer for mutable work-in-progress. When a unit of work completes, the full state commits atomically to a time-series journal (InfluxDB). A summary updates the identity store (the existing SQLite store or an external database).
|
||||
Stateful work accumulation over time. A workspace is a named SQLite buffer for mutable work-in-progress stored in a `.duckdb` file for path compatibility. When a unit of work completes, the full state commits atomically to the journal table. A summary updates the identity store.
|
||||
|
||||
### 8.1 The Problem
|
||||
|
||||
|
|
@ -153,21 +153,21 @@ Writing every micro-event directly to a time-series makes deltas meaningless —
|
|||
|
||||
```
|
||||
Store (SQLite): "this thing exists" — identity, current summary
|
||||
Buffer (DuckDB): "this thing is working" — mutable temp state, atomic
|
||||
Journal (InfluxDB): "this thing completed" — immutable, delta-ready
|
||||
Buffer (SQLite workspace file): "this thing is working" — mutable temp state, atomic
|
||||
Journal (SQLite journal table): "this thing completed" — immutable, delta-ready
|
||||
```
|
||||
|
||||
| Layer | Store | Mutability | Lifetime |
|
||||
|-------|-------|-----------|----------|
|
||||
| Identity | SQLite (go-store) | Mutable | Permanent |
|
||||
| Hot | DuckDB (temp file) | Mutable | Session/cycle |
|
||||
| Journal | InfluxDB | Append-only | Retention policy |
|
||||
| Hot | SQLite `.duckdb` file | Mutable | Session/cycle |
|
||||
| Journal | SQLite journal table | Append-only | Retention policy |
|
||||
| Cold | Compressed JSONL | Immutable | Archive |
|
||||
|
||||
### 8.3 Workspace API
|
||||
|
||||
```go
|
||||
// Workspace is a named DuckDB buffer for mutable work-in-progress.
|
||||
// Workspace is a named SQLite buffer for mutable work-in-progress.
|
||||
// It holds a reference to the parent Store for identity updates and journal writes.
|
||||
//
|
||||
// ws, _ := st.NewWorkspace("scroll-session-2026-03-30")
|
||||
|
|
@ -176,10 +176,10 @@ Journal (InfluxDB): "this thing completed" — immutable, delta-ready
|
|||
type Workspace struct {
|
||||
name string
|
||||
store *Store // parent store for identity updates + journal config
|
||||
db *sql.DB // DuckDB via database/sql driver (temp file, deleted on commit/discard)
|
||||
db *sql.DB // SQLite via database/sql driver (temp file, deleted on commit/discard)
|
||||
}
|
||||
|
||||
// NewWorkspace creates a workspace buffer. The DuckDB file is created at .core/state/{name}.duckdb.
|
||||
// NewWorkspace creates a workspace buffer. The SQLite file is created at .core/state/{name}.duckdb.
|
||||
//
|
||||
// ws, _ := st.NewWorkspace("scroll-session-2026-03-30")
|
||||
func (s *Store) NewWorkspace(name string) (*Workspace, error) { }
|
||||
|
|
@ -219,13 +219,13 @@ func (ws *Workspace) Query(sql string) core.Result { }
|
|||
Commit writes a single journal entry per completed workspace. One entry = one unit of work.
|
||||
|
||||
```go
|
||||
// CommitToJournal writes aggregated state as a single InfluxDB point.
|
||||
// CommitToJournal writes aggregated state as a single journal entry.
|
||||
// Called by Workspace.Commit() internally, but exported for testing.
|
||||
//
|
||||
// s.CommitToJournal("scroll-session", fields, tags)
|
||||
func (s *Store) CommitToJournal(measurement string, fields map[string]any, tags map[string]string) core.Result { }
|
||||
|
||||
// QueryJournal runs a Flux query against the time-series.
|
||||
// QueryJournal runs a Flux-shaped filter or raw SQL query against the journal table.
|
||||
// Returns core.Result where Value is []map[string]any (rows as maps).
|
||||
//
|
||||
// result := s.QueryJournal(`from(bucket: "core") |> range(start: -7d)`)
|
||||
|
|
@ -257,7 +257,7 @@ Output: gzip JSONL files. Each line is a complete unit of work — ready for tra
|
|||
|
||||
### 8.6 File Lifecycle
|
||||
|
||||
DuckDB files are ephemeral:
|
||||
Workspace files are ephemeral:
|
||||
|
||||
```
|
||||
Created: workspace opens → .core/state/{name}.duckdb
|
||||
|
|
@ -271,8 +271,8 @@ Orphan recovery on `New()`:
|
|||
|
||||
```go
|
||||
// New() scans .core/state/ for leftover .duckdb files.
|
||||
// Each orphan is opened, aggregated, and discarded (not committed).
|
||||
// The caller decides whether to commit orphan data via RecoverOrphans().
|
||||
// Each orphan is opened and cached for RecoverOrphans().
|
||||
// The caller decides whether to commit or discard orphan data.
|
||||
//
|
||||
// orphans := st.RecoverOrphans(".core/state/")
|
||||
// for _, ws := range orphans {
|
||||
|
|
|
|||
|
|
@ -177,6 +177,63 @@ func TestEvents_OnChange_Good_GroupFilteredCallback(t *testing.T) {
|
|||
assert.Equal(t, []string{"theme=dark"}, seen)
|
||||
}
|
||||
|
||||
func TestEvents_OnChange_Good_ReentrantSubscriptionChanges(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
var (
|
||||
seen []string
|
||||
seenMutex sync.Mutex
|
||||
nestedEvents <-chan Event
|
||||
nestedActive bool
|
||||
nestedStopped bool
|
||||
unregisterNested = func() {}
|
||||
)
|
||||
|
||||
unregisterPrimary := storeInstance.OnChange(func(event Event) {
|
||||
seenMutex.Lock()
|
||||
seen = append(seen, event.Key)
|
||||
seenMutex.Unlock()
|
||||
|
||||
if !nestedActive {
|
||||
nestedEvents = storeInstance.Watch("config")
|
||||
unregisterNested = storeInstance.OnChange(func(nested Event) {
|
||||
seenMutex.Lock()
|
||||
seen = append(seen, "nested:"+nested.Key)
|
||||
seenMutex.Unlock()
|
||||
})
|
||||
nestedActive = true
|
||||
return
|
||||
}
|
||||
|
||||
if !nestedStopped {
|
||||
storeInstance.Unwatch("config", nestedEvents)
|
||||
unregisterNested()
|
||||
nestedStopped = true
|
||||
}
|
||||
})
|
||||
defer unregisterPrimary()
|
||||
|
||||
require.NoError(t, storeInstance.Set("config", "first", "dark"))
|
||||
require.NoError(t, storeInstance.Set("config", "second", "light"))
|
||||
require.NoError(t, storeInstance.Set("config", "third", "blue"))
|
||||
|
||||
seenMutex.Lock()
|
||||
assert.Equal(t, []string{"first", "second", "nested:second", "third"}, seen)
|
||||
seenMutex.Unlock()
|
||||
|
||||
select {
|
||||
case event, open := <-nestedEvents:
|
||||
require.True(t, open)
|
||||
assert.Equal(t, "second", event.Key)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for nested watcher event")
|
||||
}
|
||||
|
||||
_, open := <-nestedEvents
|
||||
assert.False(t, open, "nested watcher should be closed after callback-driven unwatch")
|
||||
}
|
||||
|
||||
func TestEvents_Notify_Good_PopulatesTimestamp(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
|
|
|||
24
workspace.go
24
workspace.go
|
|
@ -181,6 +181,10 @@ func discoverOrphanWorkspacePaths(stateDirectory string) []string {
|
|||
}
|
||||
|
||||
// discoverOrphanWorkspaces returns the leftover workspace buffers found
// under stateDirectory, rehydrated against backingStore. It delegates
// entirely to loadRecoveredWorkspaces.
// NOTE(review): looks like a compatibility shim kept for existing callers
// after the recovery logic moved into loadRecoveredWorkspaces — confirm
// whether any callers still need this indirection.
func discoverOrphanWorkspaces(stateDirectory string, backingStore *Store) []*Workspace {
	return loadRecoveredWorkspaces(stateDirectory, backingStore)
}
|
||||
|
||||
func loadRecoveredWorkspaces(stateDirectory string, backingStore *Store) []*Workspace {
|
||||
filesystem := (&core.Fs{}).NewUnrestricted()
|
||||
orphanWorkspaces := make([]*Workspace, 0)
|
||||
for _, databasePath := range discoverOrphanWorkspacePaths(stateDirectory) {
|
||||
|
|
@ -234,25 +238,7 @@ func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace {
|
|||
return cachedWorkspaces
|
||||
}
|
||||
}
|
||||
|
||||
filesystem := (&core.Fs{}).NewUnrestricted()
|
||||
var orphanWorkspaces []*Workspace
|
||||
for _, databasePath := range discoverOrphanWorkspacePaths(stateDirectory) {
|
||||
workspaceDatabase, err := openWorkspaceDatabase(databasePath)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
orphanWorkspace := &Workspace{
|
||||
name: workspaceNameFromPath(stateDirectory, databasePath),
|
||||
backingStore: storeInstance,
|
||||
workspaceDatabase: workspaceDatabase,
|
||||
databasePath: databasePath,
|
||||
filesystem: filesystem,
|
||||
}
|
||||
orphanWorkspace.orphanAggregate = orphanWorkspace.captureAggregateSnapshot()
|
||||
orphanWorkspaces = append(orphanWorkspaces, orphanWorkspace)
|
||||
}
|
||||
return orphanWorkspaces
|
||||
return loadRecoveredWorkspaces(stateDirectory, storeInstance)
|
||||
}
|
||||
|
||||
// Usage example: `err := workspace.Put("like", map[string]any{"user": "@alice", "post": "video_123"})`
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue