From 5c7e243fc079b7de15cb7803a015a2225ed9e4a9 Mon Sep 17 00:00:00 2001 From: Virgil Date: Fri, 3 Apr 2026 04:44:45 +0000 Subject: [PATCH] feat(store): align public API with RFC Co-Authored-By: Virgil --- README.md | 10 +- coverage_test.go | 5 +- doc.go | 6 +- events.go | 123 ++++++------ events_test.go | 473 ++++++++-------------------------------------- scope.go | 23 ++- scope_test.go | 248 +++++++----------------- store.go | 10 +- store_test.go | 8 +- workspace_test.go | 6 +- 10 files changed, 249 insertions(+), 663 deletions(-) diff --git a/README.md b/README.md index 27560eb..9af5c4b 100644 --- a/README.md +++ b/README.md @@ -43,17 +43,17 @@ func main() { fmt.Println(colourValue) // Watch "config" mutations and print each event as it arrives. - watcher := storeInstance.Watch("config", "*") - defer storeInstance.Unwatch(watcher) + events := storeInstance.Watch("config") + defer storeInstance.Unwatch("config", events) go func() { - for event := range watcher.Events { + for event := range events { fmt.Println(event.Type, event.Group, event.Key, event.Value) } }() // Store tenant-42 preferences under the "tenant-42:" prefix. - scopedStore, err := store.NewScoped(storeInstance, "tenant-42") - if err != nil { + scopedStore := store.NewScoped(storeInstance, "tenant-42") + if scopedStore == nil { return } if err := scopedStore.Set("preferences", "locale", "en-GB"); err != nil { diff --git a/coverage_test.go b/coverage_test.go index 0d9d5bf..b43f564 100644 --- a/coverage_test.go +++ b/coverage_test.go @@ -284,9 +284,10 @@ func TestCoverage_ScopedStore_Bad_GroupsClosedStore(t *testing.T) { storeInstance, _ := New(":memory:") require.NoError(t, storeInstance.Close()) - scopedStore, err := NewScoped(storeInstance, "tenant-a") - require.NoError(t, err) + scopedStore := NewScoped(storeInstance, "tenant-a") + require.NotNil(t, scopedStore) + var err error _, err = scopedStore.Groups("") require.Error(t, err) assert.Contains(t, err.Error(), "store.Groups") diff --git a/doc.go b/doc.go index 133d22a..5a5232d 100644 --- a/doc.go +++ b/doc.go @@ -30,10 +30,10 @@ // fmt.Println(entry.Key, entry.Value) // } // -// watcher := storeInstance.Watch("config", "*") -// defer storeInstance.Unwatch(watcher) +// events := storeInstance.Watch("config") +// defer storeInstance.Unwatch("config", events) // go func() { -// for event := range watcher.Events { +// for event := range events { // fmt.Println(event.Type, event.Group, event.Key, event.Value) // } // }() diff --git a/events.go b/events.go index a3ce353..6877f22 100644 --- a/events.go +++ b/events.go @@ -1,7 +1,7 @@ package store import ( - "slices" + "reflect" "sync" "sync/atomic" "time" @@ -50,20 +50,6 @@ type Event struct { Timestamp time.Time } -// Watcher exposes the read-only event stream returned by Watch. -// Usage example: `watcher := storeInstance.Watch("config", "*"); defer storeInstance.Unwatch(watcher); for event := range watcher.Events { if event.Type == EventDeleteGroup { return } }` -type Watcher struct { - // Usage example: `for event := range watcher.Events { if event.Key == "colour" { return } }` - Events <-chan Event - - // eventsChannel is the internal write channel (same underlying channel as Events). - eventsChannel chan Event - - groupPattern string - keyPattern string - registrationID uint64 -} - // changeCallbackRegistration keeps the registration ID so unregister can remove // the exact callback later. type changeCallbackRegistration struct { @@ -71,51 +57,62 @@ type changeCallbackRegistration struct { callback func(Event) } -// Watch("config", "*") can hold 16 pending events before non-blocking sends -// start dropping new ones. +// Watch("config") can hold 16 pending events before non-blocking sends start +// dropping new ones. const watcherEventBufferCapacity = 16 -// Watch registers a buffered subscription for matching mutations. -// Usage example: `watcher := storeInstance.Watch("*", "*")` -// Usage example: `watcher := storeInstance.Watch("config")` -func (storeInstance *Store) Watch(group string, key ...string) *Watcher { - keyPattern := "*" - if len(key) > 0 && key[0] != "" { - keyPattern = key[0] - } +// Watch registers a buffered subscription for one group. +// Usage example: `events := storeInstance.Watch("config")` +// Usage example: `events := storeInstance.Watch("*")` +func (storeInstance *Store) Watch(group string) <-chan Event { eventChannel := make(chan Event, watcherEventBufferCapacity) - watcher := &Watcher{ - Events: eventChannel, - eventsChannel: eventChannel, - groupPattern: group, - keyPattern: keyPattern, - registrationID: atomic.AddUint64(&storeInstance.nextWatcherRegistrationID, 1), - } storeInstance.watchersLock.Lock() - storeInstance.watchers = append(storeInstance.watchers, watcher) + if storeInstance.watchers == nil { + storeInstance.watchers = make(map[string][]chan Event) + } + storeInstance.watchers[group] = append(storeInstance.watchers[group], eventChannel) storeInstance.watchersLock.Unlock() - return watcher + return eventChannel } -// Unwatch removes a watcher and closes its event stream. -// Usage example: `storeInstance.Unwatch(watcher)` -func (storeInstance *Store) Unwatch(watcher *Watcher) { - if watcher == nil { +// Unwatch removes a watcher for one group and closes its event stream. +// Usage example: `storeInstance.Unwatch("config", events)` +func (storeInstance *Store) Unwatch(group string, events <-chan Event) { + if events == nil { return } storeInstance.watchersLock.Lock() defer storeInstance.watchersLock.Unlock() - storeInstance.watchers = slices.DeleteFunc(storeInstance.watchers, func(existing *Watcher) bool { - if existing.registrationID == watcher.registrationID { - close(watcher.eventsChannel) - return true + registeredEvents := storeInstance.watchers[group] + if len(registeredEvents) == 0 { + return + } + + eventsPointer := channelPointer(events) + nextRegisteredEvents := registeredEvents[:0] + removed := false + for _, registeredChannel := range registeredEvents { + if channelPointer(registeredChannel) == eventsPointer { + if !removed { + close(registeredChannel) + removed = true + } + continue } - return false - }) + nextRegisteredEvents = append(nextRegisteredEvents, registeredChannel) + } + if !removed { + return + } + if len(nextRegisteredEvents) == 0 { + delete(storeInstance.watchers, group) + return + } + storeInstance.watchers[group] = nextRegisteredEvents } // OnChange registers a synchronous mutation callback. @@ -140,9 +137,12 @@ func (storeInstance *Store) OnChange(arguments ...any) func() { once.Do(func() { storeInstance.callbacksLock.Lock() defer storeInstance.callbacksLock.Unlock() - storeInstance.callbacks = slices.DeleteFunc(storeInstance.callbacks, func(existing changeCallbackRegistration) bool { - return existing.registrationID == registrationID - }) + for i := range storeInstance.callbacks { + if storeInstance.callbacks[i].registrationID == registrationID { + storeInstance.callbacks = append(storeInstance.callbacks[:i], storeInstance.callbacks[i+1:]...) + return + } + } }) } } @@ -188,13 +188,15 @@ func onChangeCallback(arguments []any) func(Event) { // deadlocking. func (storeInstance *Store) notify(event Event) { storeInstance.watchersLock.RLock() - for _, watcher := range storeInstance.watchers { - if !watcherMatches(watcher, event) { - continue - } - // Non-blocking send: drop the event rather than block the writer. + for _, registeredChannel := range storeInstance.watchers["*"] { select { - case watcher.eventsChannel <- event: + case registeredChannel <- event: + default: + } + } + for _, registeredChannel := range storeInstance.watchers[event.Group] { + select { + case registeredChannel <- event: default: } } @@ -209,16 +211,9 @@ func (storeInstance *Store) notify(event Event) { } } -// watcherMatches reports whether Watch("config", "*") should receive -// Event{Group: "config", Key: "colour"}. -func watcherMatches(watcher *Watcher, event Event) bool { - if watcher.groupPattern != "*" && watcher.groupPattern != event.Group { - return false +func channelPointer(eventChannel <-chan Event) uintptr { + if eventChannel == nil { + return 0 } - if watcher.keyPattern != "*" && watcher.keyPattern != event.Key { - // EventDeleteGroup has an empty Key — only wildcard watchers or - // group-level watchers (key="*") should receive it. - return false - } - return true + return reflect.ValueOf(eventChannel).Pointer() } diff --git a/events_test.go b/events_test.go index 924d7a4..add265e 100644 --- a/events_test.go +++ b/events_test.go @@ -2,7 +2,6 @@ package store import ( "sync" - "sync/atomic" "testing" "time" @@ -11,111 +10,37 @@ import ( "github.com/stretchr/testify/require" ) -// --------------------------------------------------------------------------- -// Watch — specific key -// --------------------------------------------------------------------------- - -func TestEvents_Watch_Good_SpecificKey(t *testing.T) { +func TestEvents_Watch_Good_Group(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - watcher := storeInstance.Watch("config", "theme") - defer storeInstance.Unwatch(watcher) - - require.NoError(t, storeInstance.Set("config", "theme", "dark")) - - select { - case event := <-watcher.Events: - assert.Equal(t, EventSet, event.Type) - assert.Equal(t, "config", event.Group) - assert.Equal(t, "theme", event.Key) - assert.Equal(t, "dark", event.Value) - assert.False(t, event.Timestamp.IsZero()) - case <-time.After(time.Second): - t.Fatal("timed out waiting for event") - } - - // A Set to a different key in the same group should NOT trigger this watcher. - require.NoError(t, storeInstance.Set("config", "colour", "blue")) - - select { - case event := <-watcher.Events: - t.Fatalf("unexpected event for non-matching key: %+v", event) - case <-time.After(50 * time.Millisecond): - // Expected: no event. - } -} - -// --------------------------------------------------------------------------- -// Watch — wildcard key "*" -// --------------------------------------------------------------------------- - -func TestEvents_Watch_Good_WildcardKey(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - watcher := storeInstance.Watch("config", "*") - defer storeInstance.Unwatch(watcher) + events := storeInstance.Watch("config") + defer storeInstance.Unwatch("config", events) require.NoError(t, storeInstance.Set("config", "theme", "dark")) require.NoError(t, storeInstance.Set("config", "colour", "blue")) - received := drainEvents(watcher.Events, 2, time.Second) + received := drainEvents(events, 2, time.Second) require.Len(t, received, 2) assert.Equal(t, "theme", received[0].Key) assert.Equal(t, "colour", received[1].Key) + assert.Equal(t, "config", received[0].Group) + assert.Equal(t, "config", received[1].Group) } -func TestEvents_Watch_Good_DefaultWildcardKey(t *testing.T) { +func TestEvents_Watch_Good_WildcardGroup(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - watcher := storeInstance.Watch("config") - defer storeInstance.Unwatch(watcher) - - require.NoError(t, storeInstance.Set("config", "theme", "dark")) - require.NoError(t, storeInstance.Set("config", "colour", "blue")) - - received := drainEvents(watcher.Events, 2, time.Second) - require.Len(t, received, 2) - assert.Equal(t, "theme", received[0].Key) - assert.Equal(t, "colour", received[1].Key) -} - -func TestEvents_Watch_Good_GroupMismatch(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - watcher := storeInstance.Watch("config", "*") - defer storeInstance.Unwatch(watcher) - - require.NoError(t, storeInstance.Set("other", "theme", "dark")) - - select { - case event := <-watcher.Events: - t.Fatalf("unexpected event for non-matching group: %+v", event) - case <-time.After(50 * time.Millisecond): - // Expected: no event. - } -} - -// --------------------------------------------------------------------------- -// Watch — wildcard ("*", "*") matches everything -// --------------------------------------------------------------------------- - -func TestEvents_Watch_Good_WildcardAll(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - watcher := storeInstance.Watch("*", "*") - defer storeInstance.Unwatch(watcher) + events := storeInstance.Watch("*") + defer storeInstance.Unwatch("*", events) require.NoError(t, storeInstance.Set("g1", "k1", "v1")) require.NoError(t, storeInstance.Set("g2", "k2", "v2")) require.NoError(t, storeInstance.Delete("g1", "k1")) require.NoError(t, storeInstance.DeleteGroup("g2")) - received := drainEvents(watcher.Events, 4, time.Second) + received := drainEvents(events, 4, time.Second) require.Len(t, received, 4) assert.Equal(t, EventSet, received[0].Type) assert.Equal(t, EventSet, received[1].Type) @@ -123,22 +48,16 @@ func TestEvents_Watch_Good_WildcardAll(t *testing.T) { assert.Equal(t, EventDeleteGroup, received[3].Type) } -// --------------------------------------------------------------------------- -// Unwatch — stops delivery, channel closed -// --------------------------------------------------------------------------- - func TestEvents_Unwatch_Good_StopsDelivery(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - watcher := storeInstance.Watch("g", "k") - storeInstance.Unwatch(watcher) + events := storeInstance.Watch("g") + storeInstance.Unwatch("g", events) - // Channel should be closed. - _, open := <-watcher.Events + _, open := <-events assert.False(t, open, "channel should be closed after Unwatch") - // Set after Unwatch should not panic or block. require.NoError(t, storeInstance.Set("g", "k", "v")) } @@ -146,114 +65,65 @@ func TestEvents_Unwatch_Good_Idempotent(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - watcher := storeInstance.Watch("g", "k") - - // Calling Unwatch multiple times should not panic. - storeInstance.Unwatch(watcher) - storeInstance.Unwatch(watcher) // second call is a no-op + events := storeInstance.Watch("g") + storeInstance.Unwatch("g", events) + storeInstance.Unwatch("g", events) } -func TestEvents_Unwatch_Good_NilWatcher(t *testing.T) { +func TestEvents_Unwatch_Good_NilChannel(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - storeInstance.Unwatch(nil) + storeInstance.Unwatch("g", nil) } -// --------------------------------------------------------------------------- -// Delete triggers event -// --------------------------------------------------------------------------- - func TestEvents_Watch_Good_DeleteEvent(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - watcher := storeInstance.Watch("g", "k") - defer storeInstance.Unwatch(watcher) + events := storeInstance.Watch("g") + defer storeInstance.Unwatch("g", events) require.NoError(t, storeInstance.Set("g", "k", "v")) - // Drain the Set event. - <-watcher.Events + <-events require.NoError(t, storeInstance.Delete("g", "k")) select { - case event := <-watcher.Events: + case event := <-events: assert.Equal(t, EventDelete, event.Type) assert.Equal(t, "g", event.Group) assert.Equal(t, "k", event.Key) - assert.Empty(t, event.Value, "Delete events should have empty Value") + assert.Empty(t, event.Value) case <-time.After(time.Second): t.Fatal("timed out waiting for delete event") } } -func TestEvents_Watch_Good_DeleteMissingKeyDoesNotEmitEvent(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - watcher := storeInstance.Watch("*", "*") - defer storeInstance.Unwatch(watcher) - - require.NoError(t, storeInstance.Delete("g", "missing")) - - select { - case event := <-watcher.Events: - t.Fatalf("unexpected event for missing key delete: %+v", event) - default: - } -} - -// --------------------------------------------------------------------------- -// DeleteGroup triggers event -// --------------------------------------------------------------------------- - func TestEvents_Watch_Good_DeleteGroupEvent(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - // A wildcard-key watcher for the group should receive DeleteGroup events. - watcher := storeInstance.Watch("g", "*") - defer storeInstance.Unwatch(watcher) + events := storeInstance.Watch("g") + defer storeInstance.Unwatch("g", events) require.NoError(t, storeInstance.Set("g", "a", "1")) require.NoError(t, storeInstance.Set("g", "b", "2")) - // Drain Set events. - <-watcher.Events - <-watcher.Events + <-events + <-events require.NoError(t, storeInstance.DeleteGroup("g")) select { - case event := <-watcher.Events: + case event := <-events: assert.Equal(t, EventDeleteGroup, event.Type) assert.Equal(t, "g", event.Group) - assert.Empty(t, event.Key, "DeleteGroup events should have empty Key") + assert.Empty(t, event.Key) case <-time.After(time.Second): t.Fatal("timed out waiting for delete_group event") } } -func TestEvents_Watch_Good_DeleteMissingGroupDoesNotEmitEvent(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - watcher := storeInstance.Watch("*", "*") - defer storeInstance.Unwatch(watcher) - - require.NoError(t, storeInstance.DeleteGroup("missing")) - - select { - case event := <-watcher.Events: - t.Fatalf("unexpected event for missing group delete: %+v", event) - default: - } -} - -// --------------------------------------------------------------------------- -// OnChange — callback fires on mutations -// --------------------------------------------------------------------------- - func TestEvents_OnChange_Good_Fires(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() @@ -278,242 +148,96 @@ func TestEvents_OnChange_Good_Fires(t *testing.T) { assert.Equal(t, EventDelete, events[1].Type) } -// --------------------------------------------------------------------------- -// OnChange — unregister stops callback -// --------------------------------------------------------------------------- - -func TestEvents_OnChange_Good_Unregister(t *testing.T) { +func TestEvents_OnChange_Good_GroupFilter(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - var count atomic.Int32 - - unregister := storeInstance.OnChange(func(event Event) { - count.Add(1) - }) - - require.NoError(t, storeInstance.Set("g", "k", "v1")) - assert.Equal(t, int32(1), count.Load()) - - unregister() - - require.NoError(t, storeInstance.Set("g", "k", "v2")) - assert.Equal(t, int32(1), count.Load(), "callback should not fire after unregister") - - // Calling unregister again should not panic. - unregister() -} - -func TestEvents_OnChange_Good_NilCallbackNoOp(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - unregister := storeInstance.OnChange(nil) - require.NotNil(t, unregister) - - unregister() - require.NoError(t, storeInstance.Set("g", "k", "v")) - unregister() -} - -func TestEvents_OnChange_Good_GroupCallback(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - var keys []string + var calls []string unregister := storeInstance.OnChange("config", func(key, value string) { - keys = append(keys, key) + calls = append(calls, key+"="+value) }) defer unregister() require.NoError(t, storeInstance.Set("config", "theme", "dark")) - require.NoError(t, storeInstance.Set("other", "theme", "ignored")) + require.NoError(t, storeInstance.Set("other", "theme", "light")) - assert.Equal(t, []string{"theme"}, keys) + assert.Equal(t, []string{"theme=dark"}, calls) } -// --------------------------------------------------------------------------- -// OnChange — callback can manage subscriptions while handling an event -// --------------------------------------------------------------------------- - -func TestEvents_OnChange_Good_ReentrantSubscriptions(t *testing.T) { +func TestEvents_Watch_Good_BufferDrops(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - var callbackCount atomic.Int32 - var unregister func() - unregister = storeInstance.OnChange(func(event Event) { - callbackCount.Add(1) + events := storeInstance.Watch("g") + defer storeInstance.Unwatch("g", events) - nestedWatcher := storeInstance.Watch("nested", "*") - storeInstance.Unwatch(nestedWatcher) - - if unregister != nil { - unregister() - } - }) - - writeDone := make(chan error, 1) - go func() { - writeDone <- storeInstance.Set("g", "k", "v") - }() - - select { - case err := <-writeDone: - require.NoError(t, err) - case <-time.After(time.Second): - t.Fatal("timed out waiting for Set to complete") + for i := 0; i < watcherEventBufferCapacity+8; i++ { + require.NoError(t, storeInstance.Set("g", core.Sprintf("k-%d", i), "v")) } - assert.Equal(t, int32(1), callbackCount.Load()) - - // The callback unregistered itself, so later writes should not increment it. - require.NoError(t, storeInstance.Set("g", "k", "v2")) - assert.Equal(t, int32(1), callbackCount.Load()) + received := drainEvents(events, watcherEventBufferCapacity, time.Second) + assert.LessOrEqual(t, len(received), watcherEventBufferCapacity) } -// --------------------------------------------------------------------------- -// Buffer-full doesn't block the writer -// --------------------------------------------------------------------------- - -func TestEvents_Watch_Good_BufferFullDoesNotBlock(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - watcher := storeInstance.Watch("g", "*") - defer storeInstance.Unwatch(watcher) - - // Fill the buffer (cap 16) plus extra writes. None should block. - done := make(chan struct{}) - go func() { - defer close(done) - for i := range 32 { - require.NoError(t, storeInstance.Set("g", core.Sprintf("k%d", i), "v")) - } - }() - - select { - case <-done: - // Success: all writes completed without blocking. - case <-time.After(5 * time.Second): - t.Fatal("writes blocked — buffer-full condition caused deadlock") - } - - // Drain what we can — should get exactly watcherEventBufferCapacity events. - var received int - for range watcherEventBufferCapacity { - select { - case <-watcher.Events: - received++ - default: - } - } - assert.Equal(t, watcherEventBufferCapacity, received, "should receive exactly buffer-size events") -} - -// --------------------------------------------------------------------------- -// Multiple watchers on same key -// --------------------------------------------------------------------------- - -func TestEvents_Watch_Good_MultipleWatchersSameKey(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - firstWatcher := storeInstance.Watch("g", "k") - secondWatcher := storeInstance.Watch("g", "k") - defer storeInstance.Unwatch(firstWatcher) - defer storeInstance.Unwatch(secondWatcher) - - require.NoError(t, storeInstance.Set("g", "k", "v")) - - // Both watchers should receive the event independently. - select { - case event := <-firstWatcher.Events: - assert.Equal(t, EventSet, event.Type) - case <-time.After(time.Second): - t.Fatal("firstWatcher timed out") - } - - select { - case event := <-secondWatcher.Events: - assert.Equal(t, EventSet, event.Type) - case <-time.After(time.Second): - t.Fatal("secondWatcher timed out") - } -} - -// --------------------------------------------------------------------------- -// Concurrent Watch/Unwatch during writes (race test) -// --------------------------------------------------------------------------- - func TestEvents_Watch_Good_ConcurrentWatchUnwatch(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - const goroutines = 10 - const ops = 50 + const workers = 10 + var wg sync.WaitGroup + wg.Add(workers) - var waitGroup sync.WaitGroup - - // Writers — continuously mutate the store. - waitGroup.Go(func() { - for i := range goroutines * ops { - _ = storeInstance.Set("g", core.Sprintf("k%d", i), "v") - } - }) - - // Watchers — add and remove watchers concurrently. - for range goroutines { - waitGroup.Go(func() { - for range ops { - watcher := storeInstance.Watch("g", "*") - // Drain a few events to exercise the channel path. - for range 3 { - select { - case <-watcher.Events: - case <-time.After(time.Millisecond): - } - } - storeInstance.Unwatch(watcher) - } - }) + for worker := 0; worker < workers; worker++ { + go func(worker int) { + defer wg.Done() + group := core.Sprintf("g-%d", worker) + events := storeInstance.Watch(group) + _ = storeInstance.Set(group, "k", "v") + storeInstance.Unwatch(group, events) + }(worker) } - waitGroup.Wait() - // If we got here without a data race or panic, the test passes. + wg.Wait() } -// --------------------------------------------------------------------------- -// ScopedStore events — prefixed group name -// --------------------------------------------------------------------------- - -func TestEvents_Watch_Good_ScopedStoreEvents(t *testing.T) { +func TestEvents_Watch_Good_ScopedStoreEventGroup(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, err := NewScoped(storeInstance, "tenant-a") - require.NoError(t, err) + scopedStore := NewScoped(storeInstance, "tenant-a") + require.NotNil(t, scopedStore) - // Watch on the underlying store with the full prefixed group name. - watcher := storeInstance.Watch("tenant-a:config", "theme") - defer storeInstance.Unwatch(watcher) + events := storeInstance.Watch("tenant-a:config") + defer storeInstance.Unwatch("tenant-a:config", events) require.NoError(t, scopedStore.Set("config", "theme", "dark")) select { - case event := <-watcher.Events: - assert.Equal(t, EventSet, event.Type) + case event := <-events: assert.Equal(t, "tenant-a:config", event.Group) assert.Equal(t, "theme", event.Key) - assert.Equal(t, "dark", event.Value) case <-time.After(time.Second): - t.Fatal("timed out waiting for scoped store event") + t.Fatal("timed out waiting for scoped event") } } -// --------------------------------------------------------------------------- -// EventType.String() -// --------------------------------------------------------------------------- +func TestEvents_Watch_Good_SetWithTTL(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + events := storeInstance.Watch("g") + defer storeInstance.Unwatch("g", events) + + require.NoError(t, storeInstance.SetWithTTL("g", "ephemeral", "v", time.Minute)) + + select { + case event := <-events: + assert.Equal(t, EventSet, event.Type) + assert.Equal(t, "ephemeral", event.Key) + case <-time.After(time.Second): + t.Fatal("timed out waiting for TTL event") + } +} func TestEvents_EventType_Good_String(t *testing.T) { assert.Equal(t, "set", EventSet.String()) @@ -522,45 +246,16 @@ func TestEvents_EventType_Good_String(t *testing.T) { assert.Equal(t, "unknown", EventType(99).String()) } -// --------------------------------------------------------------------------- -// SetWithTTL emits events -// --------------------------------------------------------------------------- - -func TestEvents_Watch_Good_SetWithTTLEmitsEvent(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - watcher := storeInstance.Watch("g", "k") - defer storeInstance.Unwatch(watcher) - - require.NoError(t, storeInstance.SetWithTTL("g", "k", "ttl-val", time.Hour)) - - select { - case event := <-watcher.Events: - assert.Equal(t, EventSet, event.Type) - assert.Equal(t, "g", event.Group) - assert.Equal(t, "k", event.Key) - assert.Equal(t, "ttl-val", event.Value) - case <-time.After(time.Second): - t.Fatal("timed out waiting for SetWithTTL event") - } -} - -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - -// drainEvents collects up to n events from ch within the given timeout. -func drainEvents(ch <-chan Event, count int, timeout time.Duration) []Event { - var events []Event +func drainEvents(events <-chan Event, count int, timeout time.Duration) []Event { + received := make([]Event, 0, count) deadline := time.After(timeout) - for range count { + for len(received) < count { select { - case event := <-ch: - events = append(events, event) + case event := <-events: + received = append(received, event) case <-deadline: - return events + return received } } - return events + return received } diff --git a/scope.go b/scope.go index c2a275f..664350f 100644 --- a/scope.go +++ b/scope.go @@ -23,7 +23,7 @@ type QuotaConfig struct { } // ScopedStore prefixes group names with namespace + ":" before delegating to Store. -// Usage example: `scopedStore, err := store.NewScoped(storeInstance, "tenant-a"); if err != nil { return }; if err := scopedStore.Set("config", "colour", "blue"); err != nil { return }` +// Usage example: `scopedStore := store.NewScoped(storeInstance, "tenant-a"); if scopedStore == nil { return }; if err := scopedStore.Set("config", "colour", "blue"); err != nil { return }` type ScopedStore struct { storeInstance *Store namespace string @@ -32,24 +32,27 @@ type ScopedStore struct { } // NewScoped validates a namespace and prefixes groups with namespace + ":". -// Usage example: `scopedStore, err := store.NewScoped(storeInstance, "tenant-a"); if err != nil { return }` -func NewScoped(storeInstance *Store, namespace string) (*ScopedStore, error) { +// Usage example: `scopedStore := store.NewScoped(storeInstance, "tenant-a")` +func NewScoped(storeInstance *Store, namespace string) *ScopedStore { if storeInstance == nil { - return nil, core.E("store.NewScoped", "store instance is nil", nil) + return nil } if !validNamespace.MatchString(namespace) { - return nil, core.E("store.NewScoped", core.Sprintf("namespace %q is invalid; use names like %q or %q", namespace, "tenant-a", "tenant-42"), nil) + return nil } scopedStore := &ScopedStore{storeInstance: storeInstance, namespace: namespace} - return scopedStore, nil + return scopedStore } // NewScopedWithQuota adds per-namespace key and group limits. // Usage example: `scopedStore, err := store.NewScopedWithQuota(storeInstance, "tenant-a", store.QuotaConfig{MaxKeys: 100, MaxGroups: 10}); if err != nil { return }` func NewScopedWithQuota(storeInstance *Store, namespace string, quota QuotaConfig) (*ScopedStore, error) { - scopedStore, err := NewScoped(storeInstance, namespace) - if err != nil { - return nil, err + scopedStore := NewScoped(storeInstance, namespace) + if scopedStore == nil { + if storeInstance == nil { + return nil, core.E("store.NewScopedWithQuota", "store instance is nil", nil) + } + return nil, core.E("store.NewScopedWithQuota", core.Sprintf("namespace %q is invalid; use names like %q or %q", namespace, "tenant-a", "tenant-42"), nil) } if quota.MaxKeys < 0 || quota.MaxGroups < 0 { return nil, core.E( @@ -80,7 +83,7 @@ func (scopedStore *ScopedStore) trimNamespacePrefix(groupName string) string { } // Namespace returns the namespace string. -// Usage example: `scopedStore, err := store.NewScoped(storeInstance, "tenant-a"); if err != nil { return }; namespace := scopedStore.Namespace(); fmt.Println(namespace)` +// Usage example: `scopedStore := store.NewScoped(storeInstance, "tenant-a"); if scopedStore == nil { return }; namespace := scopedStore.Namespace(); fmt.Println(namespace)` func (scopedStore *ScopedStore) Namespace() string { return scopedStore.namespace } diff --git a/scope_test.go b/scope_test.go index e7fda9b..8cbebcc 100644 --- a/scope_test.go +++ b/scope_test.go @@ -9,6 +9,14 @@ import ( "github.com/stretchr/testify/require" ) +func mustScoped(t *testing.T, storeInstance *Store, namespace string) *ScopedStore { + t.Helper() + + scopedStore := NewScoped(storeInstance, namespace) + require.NotNil(t, scopedStore) + return scopedStore +} + // --------------------------------------------------------------------------- // NewScoped — constructor validation // --------------------------------------------------------------------------- @@ -17,9 +25,7 @@ func TestScope_NewScoped_Good(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, err := NewScoped(storeInstance, "tenant-1") - require.NoError(t, err) - require.NotNil(t, scopedStore) + scopedStore := mustScoped(t, storeInstance, "tenant-1") assert.Equal(t, "tenant-1", scopedStore.Namespace()) } @@ -27,11 +33,8 @@ func TestScope_NewScoped_Good_AlphanumericHyphens(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - valid := []string{"abc", "ABC", "123", "a-b-c", "tenant-42", "A1-B2"} - for _, namespace := range valid { - scopedStore, err := NewScoped(storeInstance, namespace) - require.NoError(t, err, "namespace %q should be valid", namespace) - require.NotNil(t, scopedStore) + for _, namespace := range []string{"abc", "ABC", "123", "a-b-c", "tenant-42", "A1-B2"} { + require.NotNil(t, NewScoped(storeInstance, namespace), "namespace %q should be valid", namespace) } } @@ -39,25 +42,19 @@ func TestScope_NewScoped_Bad_Empty(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - _, err := NewScoped(storeInstance, "") - require.Error(t, err) - assert.Contains(t, err.Error(), "invalid") + assert.Nil(t, NewScoped(storeInstance, "")) } func TestScope_NewScoped_Bad_NilStore(t *testing.T) { - _, err := NewScoped(nil, "tenant-a") - require.Error(t, err) - assert.Contains(t, err.Error(), "store instance is nil") + assert.Nil(t, NewScoped(nil, "tenant-a")) } func TestScope_NewScoped_Bad_InvalidChars(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - invalid := []string{"foo.bar", "foo:bar", "foo bar", "foo/bar", "foo_bar", "tenant!", "@ns"} - for _, namespace := range invalid { - _, err := NewScoped(storeInstance, namespace) - require.Error(t, err, "namespace %q should be invalid", namespace) + for _, namespace := range []string{"foo.bar", "foo:bar", "foo bar", "foo/bar", "foo_bar", "tenant!", "@ns"} { + assert.Nil(t, NewScoped(storeInstance, namespace), "namespace %q should be invalid", namespace) } } @@ -67,7 +64,7 @@ func TestScope_NewScopedWithQuota_Bad_InvalidNamespace(t *testing.T) { _, err := NewScopedWithQuota(storeInstance, "tenant_a", QuotaConfig{MaxKeys: 1}) require.Error(t, err) - assert.Contains(t, err.Error(), "store.NewScoped") + assert.Contains(t, err.Error(), "namespace") } func TestScope_NewScopedWithQuota_Bad_NilStore(t *testing.T) { @@ -113,7 +110,7 @@ func TestScope_ScopedStore_Good_SetGet(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.Set("config", "theme", "dark")) value, err := scopedStore.Get("config", "theme") @@ -125,7 +122,7 @@ func TestScope_ScopedStore_Good_DefaultGroupHelpers(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.Set("theme", "dark")) value, err := scopedStore.Get("theme") @@ -141,7 +138,7 @@ func TestScope_ScopedStore_Good_SetInGetFrom(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.SetIn("config", "theme", "dark")) value, err := scopedStore.GetFrom("config", "theme") @@ -153,15 +150,13 @@ func TestScope_ScopedStore_Good_PrefixedInUnderlyingStore(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.Set("config", "key", "val")) - // The underlying store should have the prefixed group name. value, err := storeInstance.Get("tenant-a:config", "key") require.NoError(t, err) assert.Equal(t, "val", value) - // Direct access without prefix should fail. _, err = storeInstance.Get("config", "key") assert.True(t, core.Is(err, NotFoundError)) } @@ -170,8 +165,8 @@ func TestScope_ScopedStore_Good_NamespaceIsolation(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - alphaStore, _ := NewScoped(storeInstance, "tenant-a") - betaStore, _ := NewScoped(storeInstance, "tenant-b") + alphaStore := mustScoped(t, storeInstance, "tenant-a") + betaStore := mustScoped(t, storeInstance, "tenant-b") require.NoError(t, alphaStore.Set("config", "colour", "blue")) require.NoError(t, betaStore.Set("config", "colour", "red")) @@ -189,7 +184,7 @@ func TestScope_ScopedStore_Good_Delete(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.Set("g", "k", "v")) require.NoError(t, scopedStore.Delete("g", "k")) @@ -201,7 +196,7 @@ func TestScope_ScopedStore_Good_DeleteGroup(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.Set("g", "a", "1")) require.NoError(t, scopedStore.Set("g", "b", "2")) require.NoError(t, scopedStore.DeleteGroup("g")) @@ -215,8 +210,8 @@ func TestScope_ScopedStore_Good_GetAll(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - alphaStore, _ := NewScoped(storeInstance, "tenant-a") - betaStore, _ := NewScoped(storeInstance, "tenant-b") + alphaStore := mustScoped(t, storeInstance, "tenant-a") + betaStore := mustScoped(t, storeInstance, "tenant-b") require.NoError(t, alphaStore.Set("items", "x", "1")) require.NoError(t, alphaStore.Set("items", "y", "2")) @@ -235,7 +230,7 @@ func TestScope_ScopedStore_Good_All(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.Set("items", "first", "1")) require.NoError(t, scopedStore.Set("items", "second", "2")) @@ -252,7 +247,7 @@ func TestScope_ScopedStore_Good_All_SortedByKey(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.Set("items", "charlie", "3")) require.NoError(t, scopedStore.Set("items", "alpha", "1")) require.NoError(t, scopedStore.Set("items", "bravo", "2")) @@ -270,7 +265,7 @@ func TestScope_ScopedStore_Good_Count(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.Set("g", "a", "1")) require.NoError(t, scopedStore.Set("g", "b", "2")) @@ -283,7 +278,7 @@ func TestScope_ScopedStore_Good_SetWithTTL(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.SetWithTTL("g", "k", "v", time.Hour)) value, err := scopedStore.Get("g", "k") @@ -295,7 +290,7 @@ func TestScope_ScopedStore_Good_SetWithTTL_Expires(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.SetWithTTL("g", "k", "v", 1*time.Millisecond)) time.Sleep(5 * time.Millisecond) @@ -307,7 +302,7 @@ func TestScope_ScopedStore_Good_Render(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.Set("user", "name", "Alice")) renderedTemplate, err := scopedStore.Render("Hello {{ .name }}", "user") @@ -319,8 +314,8 @@ func TestScope_ScopedStore_Good_BulkHelpers(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - alphaStore, _ := NewScoped(storeInstance, "tenant-a") - betaStore, _ := NewScoped(storeInstance, "tenant-b") + alphaStore := mustScoped(t, storeInstance, "tenant-a") + betaStore := mustScoped(t, storeInstance, "tenant-b") require.NoError(t, alphaStore.Set("config", "colour", "blue")) require.NoError(t, alphaStore.Set("sessions", "token", "abc123")) @@ -361,7 +356,7 @@ func TestScope_ScopedStore_Good_GroupsSeqStopsEarly(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.Set("alpha", "a", "1")) require.NoError(t, scopedStore.Set("beta", "b", "2")) @@ -380,7 +375,7 @@ func TestScope_ScopedStore_Good_GroupsSeqSorted(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.Set("charlie", "c", "3")) require.NoError(t, scopedStore.Set("alpha", "a", "1")) require.NoError(t, scopedStore.Set("bravo", "b", "2")) @@ -398,7 +393,7 @@ func TestScope_ScopedStore_Good_GetSplitAndGetFields(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.Set("config", "hosts", "alpha,beta,gamma")) require.NoError(t, scopedStore.Set("config", "flags", "one two\tthree\n")) @@ -425,7 +420,7 @@ func TestScope_ScopedStore_Good_PurgeExpired(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - scopedStore, _ := NewScoped(storeInstance, "tenant-a") + scopedStore := mustScoped(t, storeInstance, "tenant-a") require.NoError(t, scopedStore.SetWithTTL("session", "token", "abc123", 1*time.Millisecond)) time.Sleep(5 * time.Millisecond) @@ -441,8 +436,8 @@ func TestScope_ScopedStore_Good_PurgeExpired_NamespaceLocal(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - alphaStore, _ := NewScoped(storeInstance, "tenant-a") - betaStore, _ := NewScoped(storeInstance, "tenant-b") + alphaStore := mustScoped(t, storeInstance, "tenant-a") + betaStore := mustScoped(t, storeInstance, "tenant-b") require.NoError(t, alphaStore.SetWithTTL("session", "alpha-token", "alpha", 1*time.Millisecond)) require.NoError(t, betaStore.SetWithTTL("session", "beta-token", "beta", 1*time.Millisecond)) @@ -470,15 +465,13 @@ func TestScope_Quota_Good_MaxKeys(t *testing.T) { scopedStore, err := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: 5}) require.NoError(t, err) - // Insert 5 keys across different groups — should be fine. for i := range 5 { require.NoError(t, scopedStore.Set("g", keyName(i), "v")) } - // 6th key should fail. err = scopedStore.Set("g", "overflow", "v") require.Error(t, err) - assert.True(t, core.Is(err, QuotaExceededError), "expected QuotaExceededError, got: %v", err) + assert.True(t, core.Is(err, QuotaExceededError)) } func TestScope_Quota_Bad_QuotaCheckQueryError(t *testing.T) { @@ -508,7 +501,6 @@ func TestScope_Quota_Good_MaxKeys_AcrossGroups(t *testing.T) { require.NoError(t, scopedStore.Set("g2", "b", "2")) require.NoError(t, scopedStore.Set("g3", "c", "3")) - // Total is now 3 — any new key should fail regardless of group. err := scopedStore.Set("g4", "d", "4") assert.True(t, core.Is(err, QuotaExceededError)) } @@ -522,8 +514,6 @@ func TestScope_Quota_Good_UpsertDoesNotCount(t *testing.T) { require.NoError(t, scopedStore.Set("g", "a", "1")) require.NoError(t, scopedStore.Set("g", "b", "2")) require.NoError(t, scopedStore.Set("g", "c", "3")) - - // Upserting existing key should succeed. require.NoError(t, scopedStore.Set("g", "a", "updated")) value, err := scopedStore.Get("g", "a") @@ -540,8 +530,6 @@ func TestScope_Quota_Good_DeleteAndReInsert(t *testing.T) { require.NoError(t, scopedStore.Set("g", "a", "1")) require.NoError(t, scopedStore.Set("g", "b", "2")) require.NoError(t, scopedStore.Set("g", "c", "3")) - - // Delete one key, then insert a new one — should work. require.NoError(t, scopedStore.Delete("g", "c")) require.NoError(t, scopedStore.Set("g", "d", "4")) } @@ -552,7 +540,6 @@ func TestScope_Quota_Good_ZeroMeansUnlimited(t *testing.T) { scopedStore, _ := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: 0, MaxGroups: 0}) - // Should be able to insert many keys and groups without error. for i := range 100 { require.NoError(t, scopedStore.Set("g", keyName(i), "v")) } @@ -564,18 +551,15 @@ func TestScope_Quota_Good_ExpiredKeysExcluded(t *testing.T) { scopedStore, _ := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: 3}) - // Insert 3 keys, 2 with short TTL. require.NoError(t, scopedStore.SetWithTTL("g", "temp1", "v", 1*time.Millisecond)) require.NoError(t, scopedStore.SetWithTTL("g", "temp2", "v", 1*time.Millisecond)) require.NoError(t, scopedStore.Set("g", "permanent", "v")) time.Sleep(5 * time.Millisecond) - // After expiry, only 1 key counts — should be able to insert 2 more. require.NoError(t, scopedStore.Set("g", "new1", "v")) require.NoError(t, scopedStore.Set("g", "new2", "v")) - // Now at 3 — next should fail. err := scopedStore.Set("g", "new3", "v") assert.True(t, core.Is(err, QuotaExceededError)) } @@ -607,7 +591,6 @@ func TestScope_Quota_Good_MaxGroups(t *testing.T) { require.NoError(t, scopedStore.Set("g2", "k", "v")) require.NoError(t, scopedStore.Set("g3", "k", "v")) - // 4th group should fail. err := scopedStore.Set("g4", "k", "v") require.Error(t, err) assert.True(t, core.Is(err, QuotaExceededError)) @@ -621,8 +604,6 @@ func TestScope_Quota_Good_MaxGroups_ExistingGroupOK(t *testing.T) { require.NoError(t, scopedStore.Set("g1", "a", "1")) require.NoError(t, scopedStore.Set("g2", "b", "2")) - - // Adding more keys to existing groups should be fine. require.NoError(t, scopedStore.Set("g1", "c", "3")) require.NoError(t, scopedStore.Set("g2", "d", "4")) } @@ -635,8 +616,6 @@ func TestScope_Quota_Good_MaxGroups_DeleteAndRecreate(t *testing.T) { require.NoError(t, scopedStore.Set("g1", "k", "v")) require.NoError(t, scopedStore.Set("g2", "k", "v")) - - // Delete a group, then create a new one. require.NoError(t, scopedStore.DeleteGroup("g1")) require.NoError(t, scopedStore.Set("g3", "k", "v")) } @@ -658,13 +637,11 @@ func TestScope_Quota_Good_MaxGroups_ExpiredGroupExcluded(t *testing.T) { scopedStore, _ := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxGroups: 2}) - // Create 2 groups, one with only TTL keys. require.NoError(t, scopedStore.SetWithTTL("g1", "k", "v", 1*time.Millisecond)) require.NoError(t, scopedStore.Set("g2", "k", "v")) time.Sleep(5 * time.Millisecond) - // g1's only key has expired, so group count should be 1 — we can create a new one. require.NoError(t, scopedStore.Set("g3", "k", "v")) } @@ -677,11 +654,9 @@ func TestScope_Quota_Good_BothLimits(t *testing.T) { require.NoError(t, scopedStore.Set("g1", "a", "1")) require.NoError(t, scopedStore.Set("g2", "b", "2")) - // Group limit hit. err := scopedStore.Set("g3", "c", "3") assert.True(t, core.Is(err, QuotaExceededError)) - // But adding to existing groups is fine (within key limit). require.NoError(t, scopedStore.Set("g1", "d", "4")) } @@ -697,11 +672,9 @@ func TestScope_Quota_Good_DoesNotAffectOtherNamespaces(t *testing.T) { require.NoError(t, betaStore.Set("g", "b1", "v")) require.NoError(t, betaStore.Set("g", "b2", "v")) - // alphaStore is at limit — but betaStore's keys don't count against alphaStore. err := alphaStore.Set("g", "a3", "v") assert.True(t, core.Is(err, QuotaExceededError)) - // betaStore is also at limit independently. err = betaStore.Set("g", "b3", "v") assert.True(t, core.Is(err, QuotaExceededError)) } @@ -732,21 +705,15 @@ func TestScope_CountAll_Good_WithPrefix_Wildcards(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - // Add keys in groups that look like wildcards. require.NoError(t, storeInstance.Set("user_1", "k", "v")) require.NoError(t, storeInstance.Set("user_2", "k", "v")) require.NoError(t, storeInstance.Set("user%test", "k", "v")) require.NoError(t, storeInstance.Set("user_test", "k", "v")) - // Prefix "user_" should ONLY match groups starting with "user_". - // Since we escape "_", it matches literal "_". - // Groups: "user_1", "user_2", "user_test" (3 total). - // "user%test" is NOT matched because "_" is literal. count, err := storeInstance.CountAll("user_") require.NoError(t, err) assert.Equal(t, 3, count) - // Prefix "user%" should ONLY match "user%test". count, err = storeInstance.CountAll("user%") require.NoError(t, err) assert.Equal(t, 1, count) @@ -764,36 +731,6 @@ func TestScope_CountAll_Good_EmptyPrefix(t *testing.T) { assert.Equal(t, 2, count) } -func TestScope_CountAll_Good_ExcludesExpired(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - require.NoError(t, storeInstance.Set("ns:g", "permanent", "v")) - require.NoError(t, storeInstance.SetWithTTL("ns:g", "temp", "v", 1*time.Millisecond)) - time.Sleep(5 * time.Millisecond) - - count, err := storeInstance.CountAll("ns:") - require.NoError(t, err) - assert.Equal(t, 1, count, "expired keys should not be counted") -} - -func TestScope_CountAll_Good_Empty(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - count, err := storeInstance.CountAll("nonexistent:") - require.NoError(t, err) - assert.Equal(t, 0, count) -} - -func TestScope_CountAll_Bad_ClosedStore(t *testing.T) { - storeInstance, _ := New(":memory:") - storeInstance.Close() - - _, err := storeInstance.CountAll("") - require.Error(t, err) -} - // --------------------------------------------------------------------------- // Groups // --------------------------------------------------------------------------- @@ -802,106 +739,57 @@ func TestScope_Groups_Good_WithPrefix(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - require.NoError(t, storeInstance.Set("ns-a:g1", "k", "v")) - require.NoError(t, storeInstance.Set("ns-a:g2", "k", "v")) - require.NoError(t, storeInstance.Set("ns-a:g2", "k2", "v")) // duplicate group - require.NoError(t, storeInstance.Set("ns-b:g1", "k", "v")) + require.NoError(t, storeInstance.Set("ns-a:group-1", "k", "v")) + require.NoError(t, storeInstance.Set("ns-a:group-2", "k", "v")) + require.NoError(t, storeInstance.Set("ns-b:group-1", "k", "v")) groups, err := storeInstance.Groups("ns-a:") require.NoError(t, err) - assert.Len(t, groups, 2) - assert.Contains(t, groups, "ns-a:g1") - assert.Contains(t, groups, "ns-a:g2") + assert.Equal(t, []string{"ns-a:group-1", "ns-a:group-2"}, groups) } -func TestScope_Groups_Good_EmptyPrefix(t *testing.T) { +func TestScope_GroupsSeq_Good_EmptyPrefix(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - require.NoError(t, storeInstance.Set("g1", "k", "v")) - require.NoError(t, storeInstance.Set("g2", "k", "v")) - require.NoError(t, storeInstance.Set("g3", "k", "v")) + require.NoError(t, storeInstance.Set("g1", "k1", "v")) + require.NoError(t, storeInstance.Set("g2", "k2", "v")) - groups, err := storeInstance.Groups("") - require.NoError(t, err) - assert.Len(t, groups, 3) + var groups []string + for groupName, err := range storeInstance.GroupsSeq("") { + require.NoError(t, err) + groups = append(groups, groupName) + } + assert.Equal(t, []string{"g1", "g2"}, groups) } -func TestScope_Groups_Good_Distinct(t *testing.T) { +func TestScope_GroupsSeq_Good_StopsEarly(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - // Multiple keys in the same group should produce one entry. - require.NoError(t, storeInstance.Set("g1", "a", "v")) - require.NoError(t, storeInstance.Set("g1", "b", "v")) - require.NoError(t, storeInstance.Set("g1", "c", "v")) + require.NoError(t, storeInstance.Set("g1", "k1", "v")) + require.NoError(t, storeInstance.Set("g2", "k2", "v")) - groups, err := storeInstance.Groups("") - require.NoError(t, err) - assert.Len(t, groups, 1) - assert.Equal(t, "g1", groups[0]) + count := 0 + for range storeInstance.GroupsSeq("") { + count++ + break + } + assert.Equal(t, 1, count) } -func TestScope_Groups_Good_ExcludesExpired(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - require.NoError(t, storeInstance.Set("ns:g1", "permanent", "v")) - require.NoError(t, storeInstance.SetWithTTL("ns:g2", "temp", "v", 1*time.Millisecond)) - time.Sleep(5 * time.Millisecond) - - groups, err := storeInstance.Groups("ns:") - require.NoError(t, err) - assert.Len(t, groups, 1, "group with only expired keys should be excluded") - assert.Equal(t, "ns:g1", groups[0]) +func keyName(index int) string { + return core.Sprintf("key-%02d", index) } -func TestScope_Groups_Good_SortedByGroupName(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - require.NoError(t, storeInstance.Set("charlie", "c", "3")) - require.NoError(t, storeInstance.Set("alpha", "a", "1")) - require.NoError(t, storeInstance.Set("bravo", "b", "2")) - - groups, err := storeInstance.Groups("") - require.NoError(t, err) - assert.Equal(t, []string{"alpha", "bravo", "charlie"}, groups) -} - -func TestScope_Groups_Good_Empty(t *testing.T) { - storeInstance, _ := New(":memory:") - defer storeInstance.Close() - - groups, err := storeInstance.Groups("nonexistent:") - require.NoError(t, err) - assert.Empty(t, groups) -} - -func TestScope_Groups_Bad_ClosedStore(t *testing.T) { - storeInstance, _ := New(":memory:") - storeInstance.Close() - - _, err := storeInstance.Groups("") - require.Error(t, err) -} - -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - -func keyName(i int) string { - return "key-" + string(rune('a'+i%26)) -} - -func rawEntryCount(t *testing.T, storeInstance *Store, group string) int { - t.Helper() +func rawEntryCount(tb testing.TB, storeInstance *Store, group string) int { + tb.Helper() var count int err := storeInstance.database.QueryRow( "SELECT COUNT(*) FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ?", group, ).Scan(&count) - require.NoError(t, err) + require.NoError(tb, err) return count } diff --git a/store.go b/store.go index 3cb9f31..7751912 100644 --- a/store.go +++ b/store.go @@ -50,11 +50,10 @@ type Store struct { journal journalConfig // Event dispatch state. - watchers []*Watcher + watchers map[string][]chan Event callbacks []changeCallbackRegistration watchersLock sync.RWMutex // protects watcher registration and dispatch callbacksLock sync.RWMutex // protects callback registration and dispatch - nextWatcherRegistrationID uint64 // monotonic ID for watcher registrations nextCallbackRegistrationID uint64 // monotonic ID for callback registrations } @@ -93,7 +92,12 @@ func New(databasePath string, options ...StoreOption) (*Store, error) { } purgeContext, cancel := context.WithCancel(context.Background()) - storeInstance := &Store{database: sqliteDatabase, cancelPurge: cancel, purgeInterval: 60 * time.Second} + storeInstance := &Store{ + database: sqliteDatabase, + cancelPurge: cancel, + purgeInterval: 60 * time.Second, + watchers: make(map[string][]chan Event), + } for _, option := range options { if option != nil { option(storeInstance) diff --git a/store_test.go b/store_test.go index 4f462cd..5d2a42b 100644 --- a/store_test.go +++ b/store_test.go @@ -1092,11 +1092,11 @@ func TestStore_SetWithTTL_Good_ExpiresOnGetEmitsDeleteEvent(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() - watcher := storeInstance.Watch("g", "ephemeral") - defer storeInstance.Unwatch(watcher) + events := storeInstance.Watch("g") + defer storeInstance.Unwatch("g", events) require.NoError(t, storeInstance.SetWithTTL("g", "ephemeral", "gone-soon", 1*time.Millisecond)) - <-watcher.Events + <-events time.Sleep(5 * time.Millisecond) @@ -1105,7 +1105,7 @@ func TestStore_SetWithTTL_Good_ExpiresOnGetEmitsDeleteEvent(t *testing.T) { assert.True(t, core.Is(err, NotFoundError), "expired key should be NotFoundError") select { - case event := <-watcher.Events: + case event := <-events: assert.Equal(t, EventDelete, event.Type) assert.Equal(t, "g", event.Group) assert.Equal(t, "ephemeral", event.Key) diff --git a/workspace_test.go b/workspace_test.go index 21ba622..4e95d4c 100644 --- a/workspace_test.go +++ b/workspace_test.go @@ -118,8 +118,8 @@ func TestWorkspace_Commit_Good_EmitsSummaryEvent(t *testing.T) { require.NoError(t, err) defer storeInstance.Close() - watcher := storeInstance.Watch(workspaceSummaryGroup("scroll-session"), "summary") - defer storeInstance.Unwatch(watcher) + events := storeInstance.Watch(workspaceSummaryGroup("scroll-session")) + defer storeInstance.Unwatch(workspaceSummaryGroup("scroll-session"), events) workspace, err := storeInstance.NewWorkspace("scroll-session") require.NoError(t, err) @@ -131,7 +131,7 @@ func TestWorkspace_Commit_Good_EmitsSummaryEvent(t *testing.T) { require.True(t, result.OK, "workspace commit failed: %v", result.Value) select { - case event := <-watcher.Events: + case event := <-events: assert.Equal(t, EventSet, event.Type) assert.Equal(t, workspaceSummaryGroup("scroll-session"), event.Group) assert.Equal(t, "summary", event.Key) -- 2.45.3