From 9d6420d37f8562701f76758381b5ff0c696fa7b4 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 10:09:25 +0000 Subject: [PATCH] test(events): cover re-entrant callback subscriptions Co-Authored-By: Virgil --- docs/RFC-STORE.md | 32 +++++++++++++------------- events_test.go | 57 +++++++++++++++++++++++++++++++++++++++++++++++ workspace.go | 24 +++++--------------- 3 files changed, 78 insertions(+), 35 deletions(-) diff --git a/docs/RFC-STORE.md b/docs/RFC-STORE.md index a9781ac..25299da 100644 --- a/docs/RFC-STORE.md +++ b/docs/RFC-STORE.md @@ -21,8 +21,8 @@ SQLite-backed key-value store with TTL, namespace isolation, reactive events, an | `store.go` | Core `Store`: CRUD on `(grp, key)` compound PK, TTL via `expires_at` (Unix ms), background purge (60s), `text/template` rendering, `iter.Seq2` iterators | | `events.go` | `Watch`/`Unwatch` (buffered chan, cap 16, non-blocking sends) + `OnChange` callbacks (synchronous) | | `scope.go` | `ScopedStore` wraps `*Store`, prefixes groups with `namespace:`. Quota enforcement (`MaxKeys`/`MaxGroups`) | -| `workspace.go` | `Workspace` buffer: DuckDB-backed mutable accumulation, atomic commit to journal | -| `journal.go` | InfluxDB journal: write completed units, query time-series, retention | +| `workspace.go` | `Workspace` buffer: SQLite-backed mutable accumulation in `.duckdb` files, atomic commit to journal | +| `journal.go` | SQLite journal table: write completed units, query time-series-shaped data, retention | | `compact.go` | Cold archive: compress journal entries to JSONL.gz | | `store_test.go` | Store unit tests | | `workspace_test.go` | Workspace buffer tests | @@ -137,13 +137,13 @@ func (ss *ScopedStore) GetFrom(group, key string) (string, error) { } - `Watch(group string) <-chan Event` — returns buffered channel (cap 16), non-blocking sends drop events - `Unwatch(group string, ch <-chan Event)` — remove a watcher - `OnChange(callback)` — synchronous callback in writer goroutine -- **Deadlock warning:** `notify()` holds `s.mu` read-lock — calling Watch/Unwatch/OnChange from inside a callback will deadlock +- `notify()` snapshots callbacks after watcher delivery, so callbacks may register or unregister subscriptions re-entrantly without deadlocking --- ## 8. Workspace Buffer -Stateful work accumulation over time. A workspace is a named DuckDB buffer for mutable work-in-progress. When a unit of work completes, the full state commits atomically to a time-series journal (InfluxDB). A summary updates the identity store (the existing SQLite store or an external database). +Stateful work accumulation over time. A workspace is a named SQLite buffer for mutable work-in-progress stored in a `.duckdb` file for path compatibility. When a unit of work completes, the full state commits atomically to the journal table. A summary updates the identity store. ### 7.1 The Problem @@ -153,21 +153,21 @@ Writing every micro-event directly to a time-series makes deltas meaningless — ``` Store (SQLite): "this thing exists" — identity, current summary -Buffer (DuckDB): "this thing is working" — mutable temp state, atomic -Journal (InfluxDB): "this thing completed" — immutable, delta-ready +Buffer (SQLite workspace file): "this thing is working" — mutable temp state, atomic +Journal (SQLite journal table): "this thing completed" — immutable, delta-ready ``` | Layer | Store | Mutability | Lifetime | |-------|-------|-----------|----------| | Identity | SQLite (go-store) | Mutable | Permanent | -| Hot | DuckDB (temp file) | Mutable | Session/cycle | -| Journal | InfluxDB | Append-only | Retention policy | +| Hot | SQLite `.duckdb` file | Mutable | Session/cycle | +| Journal | SQLite journal table | Append-only | Retention policy | | Cold | Compressed JSONL | Immutable | Archive | ### 7.3 Workspace API ```go -// Workspace is a named DuckDB buffer for mutable work-in-progress. +// Workspace is a named SQLite buffer for mutable work-in-progress. // It holds a reference to the parent Store for identity updates and journal writes. // // ws, _ := st.NewWorkspace("scroll-session-2026-03-30") @@ -176,10 +176,10 @@ Journal (InfluxDB): "this thing completed" — immutable, delta-ready type Workspace struct { name string store *Store // parent store for identity updates + journal config - db *sql.DB // DuckDB via database/sql driver (temp file, deleted on commit/discard) + db *sql.DB // SQLite via database/sql driver (temp file, deleted on commit/discard) } -// NewWorkspace creates a workspace buffer. The DuckDB file is created at .core/state/{name}.duckdb. +// NewWorkspace creates a workspace buffer. The SQLite file is created at .core/state/{name}.duckdb. // // ws, _ := st.NewWorkspace("scroll-session-2026-03-30") func (s *Store) NewWorkspace(name string) (*Workspace, error) { } @@ -219,13 +219,13 @@ func (ws *Workspace) Query(sql string) core.Result { } Commit writes a single point per completed workspace. One point = one unit of work. ```go -// CommitToJournal writes aggregated state as a single InfluxDB point. +// CommitToJournal writes aggregated state as a single journal entry. // Called by Workspace.Commit() internally, but exported for testing. // // s.CommitToJournal("scroll-session", fields, tags) func (s *Store) CommitToJournal(measurement string, fields map[string]any, tags map[string]string) core.Result { } -// QueryJournal runs a Flux query against the time-series. +// QueryJournal runs a Flux-shaped filter or raw SQL query against the journal table. // Returns core.Result where Value is []map[string]any (rows as maps). // // result := s.QueryJournal(`from(bucket: "core") |> range(start: -7d)`) @@ -257,7 +257,7 @@ Output: gzip JSONL files. Each line is a complete unit of work — ready for tra ### 7.6 File Lifecycle -DuckDB files are ephemeral: +Workspace files are ephemeral: ``` Created: workspace opens → .core/state/{name}.duckdb @@ -271,8 +271,8 @@ Orphan recovery on `New()`: ```go // New() scans .core/state/ for leftover .duckdb files. -// Each orphan is opened, aggregated, and discarded (not committed). -// The caller decides whether to commit orphan data via RecoverOrphans(). +// Each orphan is opened and cached for RecoverOrphans(). +// The caller decides whether to commit or discard orphan data. // // orphans := st.RecoverOrphans(".core/state/") // for _, ws := range orphans { diff --git a/events_test.go b/events_test.go index 2dbf6ea..82305d2 100644 --- a/events_test.go +++ b/events_test.go @@ -177,6 +177,63 @@ func TestEvents_OnChange_Good_GroupFilteredCallback(t *testing.T) { assert.Equal(t, []string{"theme=dark"}, seen) } +func TestEvents_OnChange_Good_ReentrantSubscriptionChanges(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + var ( + seen []string + seenMutex sync.Mutex + nestedEvents <-chan Event + nestedActive bool + nestedStopped bool + unregisterNested = func() {} + ) + + unregisterPrimary := storeInstance.OnChange(func(event Event) { + seenMutex.Lock() + seen = append(seen, event.Key) + seenMutex.Unlock() + + if !nestedActive { + nestedEvents = storeInstance.Watch("config") + unregisterNested = storeInstance.OnChange(func(nested Event) { + seenMutex.Lock() + seen = append(seen, "nested:"+nested.Key) + seenMutex.Unlock() + }) + nestedActive = true + return + } + + if !nestedStopped { + storeInstance.Unwatch("config", nestedEvents) + unregisterNested() + nestedStopped = true + } + }) + defer unregisterPrimary() + + require.NoError(t, storeInstance.Set("config", "first", "dark")) + require.NoError(t, storeInstance.Set("config", "second", "light")) + require.NoError(t, storeInstance.Set("config", "third", "blue")) + + seenMutex.Lock() + assert.Equal(t, []string{"first", "second", "nested:second", "third"}, seen) + seenMutex.Unlock() + + select { + case event, open := <-nestedEvents: + require.True(t, open) + assert.Equal(t, "second", event.Key) + case <-time.After(time.Second): + t.Fatal("timed out waiting for nested watcher event") + } + + _, open := <-nestedEvents + assert.False(t, open, "nested watcher should be closed after callback-driven unwatch") +} + func TestEvents_Notify_Good_PopulatesTimestamp(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() diff --git a/workspace.go b/workspace.go index de971ca..b1e3418 100644 --- a/workspace.go +++ b/workspace.go @@ -181,6 +181,10 @@ func discoverOrphanWorkspacePaths(stateDirectory string) []string { } func discoverOrphanWorkspaces(stateDirectory string, backingStore *Store) []*Workspace { + return loadRecoveredWorkspaces(stateDirectory, backingStore) +} + +func loadRecoveredWorkspaces(stateDirectory string, backingStore *Store) []*Workspace { filesystem := (&core.Fs{}).NewUnrestricted() orphanWorkspaces := make([]*Workspace, 0) for _, databasePath := range discoverOrphanWorkspacePaths(stateDirectory) { @@ -234,25 +238,7 @@ func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace { return cachedWorkspaces } } - - filesystem := (&core.Fs{}).NewUnrestricted() - var orphanWorkspaces []*Workspace - for _, databasePath := range discoverOrphanWorkspacePaths(stateDirectory) { - workspaceDatabase, err := openWorkspaceDatabase(databasePath) - if err != nil { - continue - } - orphanWorkspace := &Workspace{ - name: workspaceNameFromPath(stateDirectory, databasePath), - backingStore: storeInstance, - workspaceDatabase: workspaceDatabase, - databasePath: databasePath, - filesystem: filesystem, - } - orphanWorkspace.orphanAggregate = orphanWorkspace.captureAggregateSnapshot() - orphanWorkspaces = append(orphanWorkspaces, orphanWorkspace) - } - return orphanWorkspaces + return loadRecoveredWorkspaces(stateDirectory, storeInstance) } // Usage example: `err := workspace.Put("like", map[string]any{"user": "@alice", "post": "video_123"})` -- 2.45.3