refactor(store): make orphan recovery deterministic

Align the watcher examples with the current API and sort recovered workspaces for predictable output.

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-03 04:56:08 +00:00
parent 2353bdf2f7
commit c2f7fc26ff
3 changed files with 25 additions and 14 deletions

View file

@ -146,23 +146,22 @@ Events are emitted synchronously after each successful database write inside the
### Watch/Unwatch
`Watch(group, key)` creates a `Watcher` with a buffered channel (`Events <-chan Event`, capacity 16).
`Watch(group)` creates a buffered event channel (`<-chan Event`, capacity 16).
| group argument | key argument | Receives |
|---|---|---|
| `"mygroup"` | `"mykey"` | Only mutations to that exact key |
| `"mygroup"` | `"*"` | All mutations within the group, including `DeleteGroup` |
| `"*"` | `"*"` | Every mutation across the entire store |
| group argument | Receives |
|---|---|
| `"mygroup"` | Mutations within that group, including `DeleteGroup` |
| `"*"` | Every mutation across the entire store |
`Unwatch(watcher)` removes the watcher from the registry and closes its channel. It is safe to call multiple times; subsequent calls are no-ops.
`Unwatch(group, events)` removes the watcher from the registry and closes its channel. It is safe to call multiple times; subsequent calls are no-ops.
**Backpressure.** Event dispatch to a watcher channel is non-blocking: if the channel buffer is full, the event is dropped silently. This prevents a slow consumer from blocking a writer. Applications that cannot afford dropped events should drain the channel promptly or use `OnChange` callbacks instead.
```go
watcher := storeInstance.Watch("config", "*")
defer storeInstance.Unwatch(watcher)
events := storeInstance.Watch("config")
defer storeInstance.Unwatch("config", events)
for event := range watcher.Events {
for event := range events {
fmt.Println(event.Type, event.Group, event.Key, event.Value)
}
```
@ -187,7 +186,7 @@ Callbacks may safely register or unregister watchers and callbacks while handlin
The `notify(event Event)` method first acquires the watcher read-lock, iterates all watchers with non-blocking channel sends, then releases the lock. It then acquires the callback read-lock, snapshots the registered callbacks, releases the lock, and invokes each callback synchronously. This keeps watcher delivery non-blocking while allowing callbacks to manage subscriptions re-entrantly.
Watcher matching is handled by the `watcherMatches` helper, which checks the group and key filters against the event. Wildcard `"*"` matches any value in its position.
Watcher delivery is grouped by the registered group name. Wildcard `"*"` matches every mutation across the entire store.
## Namespace Isolation (ScopedStore)

View file

@ -89,10 +89,10 @@ func main() {
}
// Watch "config" changes and print each event as it arrives.
watcher := storeInstance.Watch("config", "*")
defer storeInstance.Unwatch(watcher)
events := storeInstance.Watch("config")
defer storeInstance.Unwatch("config", events)
go func() {
for event := range watcher.Events {
for event := range events {
fmt.Println("event", event.Type, event.Group, event.Key, event.Value)
}
}()

View file

@ -3,6 +3,7 @@ package store
import (
"database/sql"
"io/fs"
"slices"
"sync"
"time"
@ -99,6 +100,17 @@ func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace {
return nil
}
slices.SortFunc(entries, func(left, right fs.DirEntry) int {
switch {
case left.Name() < right.Name():
return -1
case left.Name() > right.Name():
return 1
default:
return 0
}
})
var workspaces []*Workspace
for _, dirEntry := range entries {
if dirEntry.IsDir() || !core.HasSuffix(dirEntry.Name(), ".duckdb") {