feat(scope): support scoped wildcard watchers
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
303b75444d
commit
c12aba4145
2 changed files with 122 additions and 1 deletions
102
scope.go
102
scope.go
|
|
@ -3,6 +3,7 @@ package store
|
|||
import (
|
||||
"iter"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
|
|
@ -27,6 +28,17 @@ type ScopedStore struct {
|
|||
namespace string
|
||||
MaxKeys int
|
||||
MaxGroups int
|
||||
|
||||
scopedWatchersLock sync.Mutex
|
||||
scopedWatchers map[uintptr]*scopedWatcherBinding
|
||||
}
|
||||
|
||||
type scopedWatcherBinding struct {
|
||||
storeInstance *Store
|
||||
underlyingEvents <-chan Event
|
||||
done chan struct{}
|
||||
stop chan struct{}
|
||||
stopOnce sync.Once
|
||||
}
|
||||
|
||||
func (scopedStore *ScopedStore) storeInstance(operation string) (*Store, error) {
|
||||
|
|
@ -285,7 +297,53 @@ func (scopedStore *ScopedStore) Watch(group string) <-chan Event {
|
|||
if err != nil {
|
||||
return closedEventChannel()
|
||||
}
|
||||
return storeInstance.Watch(scopedStore.namespacedGroup(group))
|
||||
if group != "*" {
|
||||
return storeInstance.Watch(scopedStore.namespacedGroup(group))
|
||||
}
|
||||
|
||||
forwardedEvents := make(chan Event, watcherEventBufferCapacity)
|
||||
binding := &scopedWatcherBinding{
|
||||
storeInstance: storeInstance,
|
||||
underlyingEvents: storeInstance.Watch("*"),
|
||||
done: make(chan struct{}),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
|
||||
scopedStore.scopedWatchersLock.Lock()
|
||||
if scopedStore.scopedWatchers == nil {
|
||||
scopedStore.scopedWatchers = make(map[uintptr]*scopedWatcherBinding)
|
||||
}
|
||||
scopedStore.scopedWatchers[channelPointer(forwardedEvents)] = binding
|
||||
scopedStore.scopedWatchersLock.Unlock()
|
||||
|
||||
namespacePrefix := scopedStore.namespacePrefix()
|
||||
go func() {
|
||||
defer close(forwardedEvents)
|
||||
defer close(binding.done)
|
||||
defer scopedStore.forgetScopedWatcher(forwardedEvents)
|
||||
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-binding.underlyingEvents:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if !core.HasPrefix(event.Group, namespacePrefix) {
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case forwardedEvents <- event:
|
||||
default:
|
||||
}
|
||||
case <-binding.stop:
|
||||
return
|
||||
case <-storeInstance.purgeContext.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return forwardedEvents
|
||||
}
|
||||
|
||||
// Usage example: `scopedStore.Unwatch("config", events)`
|
||||
|
|
@ -294,6 +352,10 @@ func (scopedStore *ScopedStore) Unwatch(group string, events <-chan Event) {
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
if group == "*" {
|
||||
scopedStore.forgetAndStopScopedWatcher(events)
|
||||
return
|
||||
}
|
||||
storeInstance.Unwatch(scopedStore.namespacedGroup(group), events)
|
||||
}
|
||||
|
||||
|
|
@ -329,6 +391,44 @@ func (scopedStore *ScopedStore) PurgeExpired() (int64, error) {
|
|||
return removedRows, nil
|
||||
}
|
||||
|
||||
func (scopedStore *ScopedStore) forgetScopedWatcher(events <-chan Event) {
|
||||
if scopedStore == nil || events == nil {
|
||||
return
|
||||
}
|
||||
|
||||
scopedStore.scopedWatchersLock.Lock()
|
||||
defer scopedStore.scopedWatchersLock.Unlock()
|
||||
if scopedStore.scopedWatchers == nil {
|
||||
return
|
||||
}
|
||||
delete(scopedStore.scopedWatchers, channelPointer(events))
|
||||
}
|
||||
|
||||
func (scopedStore *ScopedStore) forgetAndStopScopedWatcher(events <-chan Event) {
|
||||
if scopedStore == nil || events == nil {
|
||||
return
|
||||
}
|
||||
|
||||
scopedStore.scopedWatchersLock.Lock()
|
||||
binding := scopedStore.scopedWatchers[channelPointer(events)]
|
||||
if binding != nil {
|
||||
delete(scopedStore.scopedWatchers, channelPointer(events))
|
||||
}
|
||||
scopedStore.scopedWatchersLock.Unlock()
|
||||
|
||||
if binding == nil {
|
||||
return
|
||||
}
|
||||
|
||||
binding.stopOnce.Do(func() {
|
||||
close(binding.stop)
|
||||
})
|
||||
if binding.storeInstance != nil {
|
||||
binding.storeInstance.Unwatch("*", binding.underlyingEvents)
|
||||
}
|
||||
<-binding.done
|
||||
}
|
||||
|
||||
// checkQuota("store.ScopedStore.Set", "config", "colour") returns nil when the
|
||||
// namespace still has quota available and QuotaExceededError when a new key or
|
||||
// group would exceed the configured limit. Existing keys are treated as
|
||||
|
|
|
|||
|
|
@ -485,6 +485,27 @@ func TestScope_ScopedStore_Good_WatchAndUnwatch(t *testing.T) {
|
|||
require.NoError(t, scopedStore.SetIn("config", "theme", "dark"))
|
||||
}
|
||||
|
||||
func TestScope_ScopedStore_Good_WatchWildcardGroup(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
events := scopedStore.Watch("*")
|
||||
|
||||
require.NoError(t, scopedStore.SetIn("config", "theme", "dark"))
|
||||
require.NoError(t, storeInstance.Set("other", "theme", "light"))
|
||||
|
||||
received := drainEvents(events, 1, time.Second)
|
||||
require.Len(t, received, 1)
|
||||
assert.Equal(t, "tenant-a:config", received[0].Group)
|
||||
assert.Equal(t, "theme", received[0].Key)
|
||||
assert.Equal(t, "dark", received[0].Value)
|
||||
|
||||
scopedStore.Unwatch("*", events)
|
||||
_, open := <-events
|
||||
assert.False(t, open, "channel should be closed after wildcard Unwatch")
|
||||
}
|
||||
|
||||
func TestScope_ScopedStore_Good_OnChange(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue