From 07bd25816e811344c7c1b9aa2744e349cc4ce3c9 Mon Sep 17 00:00:00 2001 From: Virgil Date: Fri, 3 Apr 2026 05:25:42 +0000 Subject: [PATCH] refactor(store): simplify OnChange callback API Co-Authored-By: Virgil --- docs/RFC-STORE.md | 4 ++-- docs/index.md | 4 ++-- events.go | 46 ++-------------------------------------------- events_test.go | 7 +++++-- 4 files changed, 11 insertions(+), 50 deletions(-) diff --git a/docs/RFC-STORE.md b/docs/RFC-STORE.md index fb624ec..a9781ac 100644 --- a/docs/RFC-STORE.md +++ b/docs/RFC-STORE.md @@ -90,7 +90,7 @@ for group := range st.GroupsSeq() { ... } // Events ch := st.Watch("group") -st.OnChange("group", func(key, val string) { ... }) +st.OnChange(func(event store.Event) { ... }) ``` --- @@ -136,7 +136,7 @@ func (ss *ScopedStore) GetFrom(group, key string) (string, error) { } - `Watch(group string) <-chan Event` — returns buffered channel (cap 16), non-blocking sends drop events - `Unwatch(group string, ch <-chan Event)` — remove a watcher -- `OnChange(group, callback)` — synchronous callback in writer goroutine +- `OnChange(callback)` — synchronous callback in writer goroutine - **Deadlock warning:** `notify()` holds `s.mu` read-lock — calling Watch/Unwatch/OnChange from inside a callback will deadlock --- diff --git a/docs/index.md b/docs/index.md index 12bd7c3..ceeac83 100644 --- a/docs/index.md +++ b/docs/index.md @@ -113,7 +113,7 @@ The entire package lives in a single Go package (`package store`) with three imp |------|---------| | `doc.go` | Package comment with concrete usage examples | | `store.go` | Core `Store` type, CRUD operations (`Get`, `Set`, `SetWithTTL`, `Delete`, `DeleteGroup`), bulk queries (`GetAll`, `All`, `Count`, `CountAll`, `Groups`, `GroupsSeq`), string splitting helpers (`GetSplit`, `GetFields`), template rendering (`Render`), TTL expiry, background purge goroutine | -| `events.go` | `EventType` constants, `Event` struct, `Watcher` type, `Watch`/`Unwatch` subscription management, `OnChange` callback registration, internal `notify` dispatch | +| `events.go` | `EventType` constants, `Event` struct, `Watch`/`Unwatch` channel subscriptions, `OnChange` callback registration, internal `notify` dispatch | | `scope.go` | `ScopedStore` wrapper for namespace isolation, `QuotaConfig` struct, `NewScoped`/`NewScopedWithQuota` constructors, namespace-local helper delegation, quota enforcement logic | Tests are organised in corresponding files: @@ -148,7 +148,7 @@ There are no other direct dependencies. The package uses the Go standard library - **`ScopedStore`** -- wraps a `*Store` with an auto-prefixed namespace. Provides the same API surface with group names transparently prefixed. - **`QuotaConfig`** -- configures per-namespace limits on total keys and distinct groups. - **`Event`** -- describes a single store mutation (type, group, key, value, timestamp). -- **`Watcher`** -- a channel-based subscription to store events, created by `Watch`. `Events` is the read-only channel to select on. +- **`Watch`** -- returns a buffered channel subscription to store events. Use `Unwatch(group, events)` to stop delivery and close the channel. - **`KeyValue`** -- a simple key-value pair struct, used by the `All` iterator. ## Sentinel Errors diff --git a/events.go b/events.go index 6cdac54..5cf1b20 100644 --- a/events.go +++ b/events.go @@ -54,7 +54,6 @@ type Event struct { // the exact callback later. type changeCallbackRegistration struct { registrationID uint64 - group string callback func(Event) } @@ -118,51 +117,13 @@ func (storeInstance *Store) Unwatch(group string, events <-chan Event) { // OnChange registers a synchronous mutation callback. // Usage example: `unregister := storeInstance.OnChange(func(event store.Event) { fmt.Println(event.Group, event.Key, event.Value) })` -// Usage example: `unregister := storeInstance.OnChange("config", func(key, value string) { fmt.Println(key, value) })` -func (storeInstance *Store) OnChange(arguments ...any) func() { - if len(arguments) == 0 { - return func() {} - } - - var ( - callbackGroup string - callback func(Event) - ) - - switch len(arguments) { - case 1: - switch typedCallback := arguments[0].(type) { - case func(Event): - callback = typedCallback - default: - return func() {} - } - case 2: - groupName, ok := arguments[0].(string) - if !ok { - return func() {} - } - callbackGroup = groupName - switch typedCallback := arguments[1].(type) { - case func(Event): - callback = typedCallback - case func(string, string): - callback = func(event Event) { - typedCallback(event.Key, event.Value) - } - default: - return func() {} - } - default: - return func() {} - } - +func (storeInstance *Store) OnChange(callback func(Event)) func() { if callback == nil { return func() {} } registrationID := atomic.AddUint64(&storeInstance.nextCallbackRegistrationID, 1) - callbackRegistration := changeCallbackRegistration{registrationID: registrationID, group: callbackGroup, callback: callback} + callbackRegistration := changeCallbackRegistration{registrationID: registrationID, callback: callback} storeInstance.callbacksLock.Lock() storeInstance.callbacks = append(storeInstance.callbacks, callbackRegistration) @@ -211,9 +172,6 @@ func (storeInstance *Store) notify(event Event) { storeInstance.callbacksLock.RUnlock() for _, callback := range callbacks { - if callback.group != "" && callback.group != "*" && callback.group != event.Group { - continue - } callback.callback(event) } } diff --git a/events_test.go b/events_test.go index 1767ca7..74b4b0c 100644 --- a/events_test.go +++ b/events_test.go @@ -153,8 +153,11 @@ func TestEvents_OnChange_Good_GroupFilteredCallback(t *testing.T) { defer storeInstance.Close() var seen []string - unregister := storeInstance.OnChange("config", func(key, value string) { - seen = append(seen, key+"="+value) + unregister := storeInstance.OnChange(func(event Event) { + if event.Group != "config" { + return + } + seen = append(seen, event.Key+"="+event.Value) }) defer unregister() -- 2.45.3