From c12aba414513bd3d1dfe6bbd5e2dbf410a5ded28 Mon Sep 17 00:00:00 2001 From: Virgil Date: Fri, 3 Apr 2026 07:00:35 +0000 Subject: [PATCH] feat(scope): support scoped wildcard watchers Co-Authored-By: Virgil --- scope.go | 102 +++++++++++++++++++++++++++++++++++++++++++++++++- scope_test.go | 21 +++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/scope.go b/scope.go index 5ff39e7..6f02298 100644 --- a/scope.go +++ b/scope.go @@ -3,6 +3,7 @@ package store import ( "iter" "regexp" + "sync" "time" core "dappco.re/go/core" @@ -27,6 +28,17 @@ type ScopedStore struct { namespace string MaxKeys int MaxGroups int + + scopedWatchersLock sync.Mutex + scopedWatchers map[uintptr]*scopedWatcherBinding +} + +type scopedWatcherBinding struct { + storeInstance *Store + underlyingEvents <-chan Event + done chan struct{} + stop chan struct{} + stopOnce sync.Once } func (scopedStore *ScopedStore) storeInstance(operation string) (*Store, error) { @@ -285,7 +297,53 @@ func (scopedStore *ScopedStore) Watch(group string) <-chan Event { if err != nil { return closedEventChannel() } - return storeInstance.Watch(scopedStore.namespacedGroup(group)) + if group != "*" { + return storeInstance.Watch(scopedStore.namespacedGroup(group)) + } + + forwardedEvents := make(chan Event, watcherEventBufferCapacity) + binding := &scopedWatcherBinding{ + storeInstance: storeInstance, + underlyingEvents: storeInstance.Watch("*"), + done: make(chan struct{}), + stop: make(chan struct{}), + } + + scopedStore.scopedWatchersLock.Lock() + if scopedStore.scopedWatchers == nil { + scopedStore.scopedWatchers = make(map[uintptr]*scopedWatcherBinding) + } + scopedStore.scopedWatchers[channelPointer(forwardedEvents)] = binding + scopedStore.scopedWatchersLock.Unlock() + + namespacePrefix := scopedStore.namespacePrefix() + go func() { + defer close(forwardedEvents) + defer close(binding.done) + defer scopedStore.forgetScopedWatcher(forwardedEvents) + + for { + select { + case event, ok := <-binding.underlyingEvents: + if !ok { + return + } + if !core.HasPrefix(event.Group, namespacePrefix) { + continue + } + select { + case forwardedEvents <- event: + default: + } + case <-binding.stop: + return + case <-storeInstance.purgeContext.Done(): + return + } + } + }() + + return forwardedEvents } // Usage example: `scopedStore.Unwatch("config", events)` @@ -294,6 +352,10 @@ func (scopedStore *ScopedStore) Unwatch(group string, events <-chan Event) { if err != nil { return } + if group == "*" { + scopedStore.forgetAndStopScopedWatcher(events) + return + } storeInstance.Unwatch(scopedStore.namespacedGroup(group), events) } @@ -329,6 +391,44 @@ func (scopedStore *ScopedStore) PurgeExpired() (int64, error) { return removedRows, nil } +func (scopedStore *ScopedStore) forgetScopedWatcher(events <-chan Event) { + if scopedStore == nil || events == nil { + return + } + + scopedStore.scopedWatchersLock.Lock() + defer scopedStore.scopedWatchersLock.Unlock() + if scopedStore.scopedWatchers == nil { + return + } + delete(scopedStore.scopedWatchers, channelPointer(events)) +} + +func (scopedStore *ScopedStore) forgetAndStopScopedWatcher(events <-chan Event) { + if scopedStore == nil || events == nil { + return + } + + scopedStore.scopedWatchersLock.Lock() + binding := scopedStore.scopedWatchers[channelPointer(events)] + if binding != nil { + delete(scopedStore.scopedWatchers, channelPointer(events)) + } + scopedStore.scopedWatchersLock.Unlock() + + if binding == nil { + return + } + + binding.stopOnce.Do(func() { + close(binding.stop) + }) + if binding.storeInstance != nil { + binding.storeInstance.Unwatch("*", binding.underlyingEvents) + } + <-binding.done +} + // checkQuota("store.ScopedStore.Set", "config", "colour") returns nil when the // namespace still has quota available and QuotaExceededError when a new key or // group would exceed the configured limit. Existing keys are treated as diff --git a/scope_test.go b/scope_test.go index 0c5f3e8..0aa567f 100644 --- a/scope_test.go +++ b/scope_test.go @@ -485,6 +485,27 @@ func TestScope_ScopedStore_Good_WatchAndUnwatch(t *testing.T) { require.NoError(t, scopedStore.SetIn("config", "theme", "dark")) } +func TestScope_ScopedStore_Good_WatchWildcardGroup(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + scopedStore := mustScoped(t, storeInstance, "tenant-a") + events := scopedStore.Watch("*") + + require.NoError(t, scopedStore.SetIn("config", "theme", "dark")) + require.NoError(t, storeInstance.Set("other", "theme", "light")) + + received := drainEvents(events, 1, time.Second) + require.Len(t, received, 1) + assert.Equal(t, "tenant-a:config", received[0].Group) + assert.Equal(t, "theme", received[0].Key) + assert.Equal(t, "dark", received[0].Value) + + scopedStore.Unwatch("*", events) + _, open := <-events + assert.False(t, open, "channel should be closed after wildcard Unwatch") +} + func TestScope_ScopedStore_Good_OnChange(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close()