[agent/codex:gpt-5.4-mini] Read ~/spec/code/core/go/store/RFC.md fully. Find features d... #39
4 changed files with 11 additions and 50 deletions
|
|
@ -90,7 +90,7 @@ for group := range st.GroupsSeq() { ... }
|
|||
|
||||
// Events
|
||||
ch := st.Watch("group")
|
||||
st.OnChange("group", func(key, val string) { ... })
|
||||
st.OnChange(func(event store.Event) { ... })
|
||||
```
|
||||
|
||||
---
|
||||
|
|
@ -136,7 +136,7 @@ func (ss *ScopedStore) GetFrom(group, key string) (string, error) { }
|
|||
|
||||
- `Watch(group string) <-chan Event` — returns buffered channel (cap 16), non-blocking sends drop events
|
||||
- `Unwatch(group string, ch <-chan Event)` — remove a watcher
|
||||
- `OnChange(group, callback)` — synchronous callback in writer goroutine
|
||||
- `OnChange(callback)` — synchronous callback in writer goroutine
|
||||
- **Deadlock warning:** `notify()` holds `s.mu` read-lock — calling Watch/Unwatch/OnChange from inside a callback will deadlock
|
||||
|
||||
---
|
||||
|
|
|
|||
|
|
@ -113,7 +113,7 @@ The entire package lives in a single Go package (`package store`) with three imp
|
|||
|------|---------|
|
||||
| `doc.go` | Package comment with concrete usage examples |
|
||||
| `store.go` | Core `Store` type, CRUD operations (`Get`, `Set`, `SetWithTTL`, `Delete`, `DeleteGroup`), bulk queries (`GetAll`, `All`, `Count`, `CountAll`, `Groups`, `GroupsSeq`), string splitting helpers (`GetSplit`, `GetFields`), template rendering (`Render`), TTL expiry, background purge goroutine |
|
||||
| `events.go` | `EventType` constants, `Event` struct, `Watcher` type, `Watch`/`Unwatch` subscription management, `OnChange` callback registration, internal `notify` dispatch |
|
||||
| `events.go` | `EventType` constants, `Event` struct, `Watch`/`Unwatch` channel subscriptions, `OnChange` callback registration, internal `notify` dispatch |
|
||||
| `scope.go` | `ScopedStore` wrapper for namespace isolation, `QuotaConfig` struct, `NewScoped`/`NewScopedWithQuota` constructors, namespace-local helper delegation, quota enforcement logic |
|
||||
|
||||
Tests are organised in corresponding files:
|
||||
|
|
@ -148,7 +148,7 @@ There are no other direct dependencies. The package uses the Go standard library
|
|||
- **`ScopedStore`** -- wraps a `*Store` with an auto-prefixed namespace. Provides the same API surface with group names transparently prefixed.
|
||||
- **`QuotaConfig`** -- configures per-namespace limits on total keys and distinct groups.
|
||||
- **`Event`** -- describes a single store mutation (type, group, key, value, timestamp).
|
||||
- **`Watcher`** -- a channel-based subscription to store events, created by `Watch`. `Events` is the read-only channel to select on.
|
||||
- **`Watch`** -- returns a buffered channel subscription to store events. Use `Unwatch(group, events)` to stop delivery and close the channel.
|
||||
- **`KeyValue`** -- a simple key-value pair struct, used by the `All` iterator.
|
||||
|
||||
## Sentinel Errors
|
||||
|
|
|
|||
46
events.go
46
events.go
|
|
@ -54,7 +54,6 @@ type Event struct {
|
|||
// the exact callback later.
|
||||
type changeCallbackRegistration struct {
|
||||
registrationID uint64
|
||||
group string
|
||||
callback func(Event)
|
||||
}
|
||||
|
||||
|
|
@ -118,51 +117,13 @@ func (storeInstance *Store) Unwatch(group string, events <-chan Event) {
|
|||
|
||||
// OnChange registers a synchronous mutation callback.
|
||||
// Usage example: `unregister := storeInstance.OnChange(func(event store.Event) { fmt.Println(event.Group, event.Key, event.Value) })`
|
||||
// Usage example: `unregister := storeInstance.OnChange("config", func(key, value string) { fmt.Println(key, value) })`
|
||||
func (storeInstance *Store) OnChange(arguments ...any) func() {
|
||||
if len(arguments) == 0 {
|
||||
return func() {}
|
||||
}
|
||||
|
||||
var (
|
||||
callbackGroup string
|
||||
callback func(Event)
|
||||
)
|
||||
|
||||
switch len(arguments) {
|
||||
case 1:
|
||||
switch typedCallback := arguments[0].(type) {
|
||||
case func(Event):
|
||||
callback = typedCallback
|
||||
default:
|
||||
return func() {}
|
||||
}
|
||||
case 2:
|
||||
groupName, ok := arguments[0].(string)
|
||||
if !ok {
|
||||
return func() {}
|
||||
}
|
||||
callbackGroup = groupName
|
||||
switch typedCallback := arguments[1].(type) {
|
||||
case func(Event):
|
||||
callback = typedCallback
|
||||
case func(string, string):
|
||||
callback = func(event Event) {
|
||||
typedCallback(event.Key, event.Value)
|
||||
}
|
||||
default:
|
||||
return func() {}
|
||||
}
|
||||
default:
|
||||
return func() {}
|
||||
}
|
||||
|
||||
func (storeInstance *Store) OnChange(callback func(Event)) func() {
|
||||
if callback == nil {
|
||||
return func() {}
|
||||
}
|
||||
|
||||
registrationID := atomic.AddUint64(&storeInstance.nextCallbackRegistrationID, 1)
|
||||
callbackRegistration := changeCallbackRegistration{registrationID: registrationID, group: callbackGroup, callback: callback}
|
||||
callbackRegistration := changeCallbackRegistration{registrationID: registrationID, callback: callback}
|
||||
|
||||
storeInstance.callbacksLock.Lock()
|
||||
storeInstance.callbacks = append(storeInstance.callbacks, callbackRegistration)
|
||||
|
|
@ -211,9 +172,6 @@ func (storeInstance *Store) notify(event Event) {
|
|||
storeInstance.callbacksLock.RUnlock()
|
||||
|
||||
for _, callback := range callbacks {
|
||||
if callback.group != "" && callback.group != "*" && callback.group != event.Group {
|
||||
continue
|
||||
}
|
||||
callback.callback(event)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -153,8 +153,11 @@ func TestEvents_OnChange_Good_GroupFilteredCallback(t *testing.T) {
|
|||
defer storeInstance.Close()
|
||||
|
||||
var seen []string
|
||||
unregister := storeInstance.OnChange("config", func(key, value string) {
|
||||
seen = append(seen, key+"="+value)
|
||||
unregister := storeInstance.OnChange(func(event Event) {
|
||||
if event.Group != "config" {
|
||||
return
|
||||
}
|
||||
seen = append(seen, event.Key+"="+event.Value)
|
||||
})
|
||||
defer unregister()
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue