[agent/codex:gpt-5.4-mini] Read ~/spec/code/core/go/store/RFC.md fully. Find features d... #31
10 changed files with 249 additions and 663 deletions
10
README.md
10
README.md
|
|
@ -43,17 +43,17 @@ func main() {
|
|||
fmt.Println(colourValue)
|
||||
|
||||
// Watch "config" mutations and print each event as it arrives.
|
||||
watcher := storeInstance.Watch("config", "*")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
events := storeInstance.Watch("config")
|
||||
defer storeInstance.Unwatch("config", events)
|
||||
go func() {
|
||||
for event := range watcher.Events {
|
||||
for event := range events {
|
||||
fmt.Println(event.Type, event.Group, event.Key, event.Value)
|
||||
}
|
||||
}()
|
||||
|
||||
// Store tenant-42 preferences under the "tenant-42:" prefix.
|
||||
scopedStore, err := store.NewScoped(storeInstance, "tenant-42")
|
||||
if err != nil {
|
||||
scopedStore := store.NewScoped(storeInstance, "tenant-42")
|
||||
if scopedStore == nil {
|
||||
return
|
||||
}
|
||||
if err := scopedStore.Set("preferences", "locale", "en-GB"); err != nil {
|
||||
|
|
|
|||
|
|
@ -284,9 +284,10 @@ func TestCoverage_ScopedStore_Bad_GroupsClosedStore(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
require.NoError(t, storeInstance.Close())
|
||||
|
||||
scopedStore, err := NewScoped(storeInstance, "tenant-a")
|
||||
require.NoError(t, err)
|
||||
scopedStore := NewScoped(storeInstance, "tenant-a")
|
||||
require.NotNil(t, scopedStore)
|
||||
|
||||
var err error
|
||||
_, err = scopedStore.Groups("")
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "store.Groups")
|
||||
|
|
|
|||
6
doc.go
6
doc.go
|
|
@ -30,10 +30,10 @@
|
|||
// fmt.Println(entry.Key, entry.Value)
|
||||
// }
|
||||
//
|
||||
// watcher := storeInstance.Watch("config", "*")
|
||||
// defer storeInstance.Unwatch(watcher)
|
||||
// events := storeInstance.Watch("config")
|
||||
// defer storeInstance.Unwatch("config", events)
|
||||
// go func() {
|
||||
// for event := range watcher.Events {
|
||||
// for event := range events {
|
||||
// fmt.Println(event.Type, event.Group, event.Key, event.Value)
|
||||
// }
|
||||
// }()
|
||||
|
|
|
|||
123
events.go
123
events.go
|
|
@ -1,7 +1,7 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"reflect"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
|
@ -50,20 +50,6 @@ type Event struct {
|
|||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// Watcher exposes the read-only event stream returned by Watch.
|
||||
// Usage example: `watcher := storeInstance.Watch("config", "*"); defer storeInstance.Unwatch(watcher); for event := range watcher.Events { if event.Type == EventDeleteGroup { return } }`
|
||||
type Watcher struct {
|
||||
// Usage example: `for event := range watcher.Events { if event.Key == "colour" { return } }`
|
||||
Events <-chan Event
|
||||
|
||||
// eventsChannel is the internal write channel (same underlying channel as Events).
|
||||
eventsChannel chan Event
|
||||
|
||||
groupPattern string
|
||||
keyPattern string
|
||||
registrationID uint64
|
||||
}
|
||||
|
||||
// changeCallbackRegistration keeps the registration ID so unregister can remove
|
||||
// the exact callback later.
|
||||
type changeCallbackRegistration struct {
|
||||
|
|
@ -71,51 +57,62 @@ type changeCallbackRegistration struct {
|
|||
callback func(Event)
|
||||
}
|
||||
|
||||
// Watch("config", "*") can hold 16 pending events before non-blocking sends
|
||||
// start dropping new ones.
|
||||
// Watch("config") can hold 16 pending events before non-blocking sends start
|
||||
// dropping new ones.
|
||||
const watcherEventBufferCapacity = 16
|
||||
|
||||
// Watch registers a buffered subscription for matching mutations.
|
||||
// Usage example: `watcher := storeInstance.Watch("*", "*")`
|
||||
// Usage example: `watcher := storeInstance.Watch("config")`
|
||||
func (storeInstance *Store) Watch(group string, key ...string) *Watcher {
|
||||
keyPattern := "*"
|
||||
if len(key) > 0 && key[0] != "" {
|
||||
keyPattern = key[0]
|
||||
}
|
||||
// Watch registers a buffered subscription for one group.
|
||||
// Usage example: `events := storeInstance.Watch("config")`
|
||||
// Usage example: `events := storeInstance.Watch("*")`
|
||||
func (storeInstance *Store) Watch(group string) <-chan Event {
|
||||
eventChannel := make(chan Event, watcherEventBufferCapacity)
|
||||
watcher := &Watcher{
|
||||
Events: eventChannel,
|
||||
eventsChannel: eventChannel,
|
||||
groupPattern: group,
|
||||
keyPattern: keyPattern,
|
||||
registrationID: atomic.AddUint64(&storeInstance.nextWatcherRegistrationID, 1),
|
||||
}
|
||||
|
||||
storeInstance.watchersLock.Lock()
|
||||
storeInstance.watchers = append(storeInstance.watchers, watcher)
|
||||
if storeInstance.watchers == nil {
|
||||
storeInstance.watchers = make(map[string][]chan Event)
|
||||
}
|
||||
storeInstance.watchers[group] = append(storeInstance.watchers[group], eventChannel)
|
||||
storeInstance.watchersLock.Unlock()
|
||||
|
||||
return watcher
|
||||
return eventChannel
|
||||
}
|
||||
|
||||
// Unwatch removes a watcher and closes its event stream.
|
||||
// Usage example: `storeInstance.Unwatch(watcher)`
|
||||
func (storeInstance *Store) Unwatch(watcher *Watcher) {
|
||||
if watcher == nil {
|
||||
// Unwatch removes a watcher for one group and closes its event stream.
|
||||
// Usage example: `storeInstance.Unwatch("config", events)`
|
||||
func (storeInstance *Store) Unwatch(group string, events <-chan Event) {
|
||||
if events == nil {
|
||||
return
|
||||
}
|
||||
|
||||
storeInstance.watchersLock.Lock()
|
||||
defer storeInstance.watchersLock.Unlock()
|
||||
|
||||
storeInstance.watchers = slices.DeleteFunc(storeInstance.watchers, func(existing *Watcher) bool {
|
||||
if existing.registrationID == watcher.registrationID {
|
||||
close(watcher.eventsChannel)
|
||||
return true
|
||||
registeredEvents := storeInstance.watchers[group]
|
||||
if len(registeredEvents) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
eventsPointer := channelPointer(events)
|
||||
nextRegisteredEvents := registeredEvents[:0]
|
||||
removed := false
|
||||
for _, registeredChannel := range registeredEvents {
|
||||
if channelPointer(registeredChannel) == eventsPointer {
|
||||
if !removed {
|
||||
close(registeredChannel)
|
||||
removed = true
|
||||
}
|
||||
continue
|
||||
}
|
||||
return false
|
||||
})
|
||||
nextRegisteredEvents = append(nextRegisteredEvents, registeredChannel)
|
||||
}
|
||||
if !removed {
|
||||
return
|
||||
}
|
||||
if len(nextRegisteredEvents) == 0 {
|
||||
delete(storeInstance.watchers, group)
|
||||
return
|
||||
}
|
||||
storeInstance.watchers[group] = nextRegisteredEvents
|
||||
}
|
||||
|
||||
// OnChange registers a synchronous mutation callback.
|
||||
|
|
@ -140,9 +137,12 @@ func (storeInstance *Store) OnChange(arguments ...any) func() {
|
|||
once.Do(func() {
|
||||
storeInstance.callbacksLock.Lock()
|
||||
defer storeInstance.callbacksLock.Unlock()
|
||||
storeInstance.callbacks = slices.DeleteFunc(storeInstance.callbacks, func(existing changeCallbackRegistration) bool {
|
||||
return existing.registrationID == registrationID
|
||||
})
|
||||
for i := range storeInstance.callbacks {
|
||||
if storeInstance.callbacks[i].registrationID == registrationID {
|
||||
storeInstance.callbacks = append(storeInstance.callbacks[:i], storeInstance.callbacks[i+1:]...)
|
||||
return
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -188,13 +188,15 @@ func onChangeCallback(arguments []any) func(Event) {
|
|||
// deadlocking.
|
||||
func (storeInstance *Store) notify(event Event) {
|
||||
storeInstance.watchersLock.RLock()
|
||||
for _, watcher := range storeInstance.watchers {
|
||||
if !watcherMatches(watcher, event) {
|
||||
continue
|
||||
}
|
||||
// Non-blocking send: drop the event rather than block the writer.
|
||||
for _, registeredChannel := range storeInstance.watchers["*"] {
|
||||
select {
|
||||
case watcher.eventsChannel <- event:
|
||||
case registeredChannel <- event:
|
||||
default:
|
||||
}
|
||||
}
|
||||
for _, registeredChannel := range storeInstance.watchers[event.Group] {
|
||||
select {
|
||||
case registeredChannel <- event:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
@ -209,16 +211,9 @@ func (storeInstance *Store) notify(event Event) {
|
|||
}
|
||||
}
|
||||
|
||||
// watcherMatches reports whether Watch("config", "*") should receive
|
||||
// Event{Group: "config", Key: "colour"}.
|
||||
func watcherMatches(watcher *Watcher, event Event) bool {
|
||||
if watcher.groupPattern != "*" && watcher.groupPattern != event.Group {
|
||||
return false
|
||||
func channelPointer(eventChannel <-chan Event) uintptr {
|
||||
if eventChannel == nil {
|
||||
return 0
|
||||
}
|
||||
if watcher.keyPattern != "*" && watcher.keyPattern != event.Key {
|
||||
// EventDeleteGroup has an empty Key — only wildcard watchers or
|
||||
// group-level watchers (key="*") should receive it.
|
||||
return false
|
||||
}
|
||||
return true
|
||||
return reflect.ValueOf(eventChannel).Pointer()
|
||||
}
|
||||
|
|
|
|||
473
events_test.go
473
events_test.go
|
|
@ -2,7 +2,6 @@ package store
|
|||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -11,111 +10,37 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Watch — specific key
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_Watch_Good_SpecificKey(t *testing.T) {
|
||||
func TestEvents_Watch_Good_Group(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("config", "theme")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
|
||||
require.NoError(t, storeInstance.Set("config", "theme", "dark"))
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
assert.Equal(t, EventSet, event.Type)
|
||||
assert.Equal(t, "config", event.Group)
|
||||
assert.Equal(t, "theme", event.Key)
|
||||
assert.Equal(t, "dark", event.Value)
|
||||
assert.False(t, event.Timestamp.IsZero())
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for event")
|
||||
}
|
||||
|
||||
// A Set to a different key in the same group should NOT trigger this watcher.
|
||||
require.NoError(t, storeInstance.Set("config", "colour", "blue"))
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
t.Fatalf("unexpected event for non-matching key: %+v", event)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected: no event.
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Watch — wildcard key "*"
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_Watch_Good_WildcardKey(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("config", "*")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
events := storeInstance.Watch("config")
|
||||
defer storeInstance.Unwatch("config", events)
|
||||
|
||||
require.NoError(t, storeInstance.Set("config", "theme", "dark"))
|
||||
require.NoError(t, storeInstance.Set("config", "colour", "blue"))
|
||||
|
||||
received := drainEvents(watcher.Events, 2, time.Second)
|
||||
received := drainEvents(events, 2, time.Second)
|
||||
require.Len(t, received, 2)
|
||||
assert.Equal(t, "theme", received[0].Key)
|
||||
assert.Equal(t, "colour", received[1].Key)
|
||||
assert.Equal(t, "config", received[0].Group)
|
||||
assert.Equal(t, "config", received[1].Group)
|
||||
}
|
||||
|
||||
func TestEvents_Watch_Good_DefaultWildcardKey(t *testing.T) {
|
||||
func TestEvents_Watch_Good_WildcardGroup(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("config")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
|
||||
require.NoError(t, storeInstance.Set("config", "theme", "dark"))
|
||||
require.NoError(t, storeInstance.Set("config", "colour", "blue"))
|
||||
|
||||
received := drainEvents(watcher.Events, 2, time.Second)
|
||||
require.Len(t, received, 2)
|
||||
assert.Equal(t, "theme", received[0].Key)
|
||||
assert.Equal(t, "colour", received[1].Key)
|
||||
}
|
||||
|
||||
func TestEvents_Watch_Good_GroupMismatch(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("config", "*")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
|
||||
require.NoError(t, storeInstance.Set("other", "theme", "dark"))
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
t.Fatalf("unexpected event for non-matching group: %+v", event)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected: no event.
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Watch — wildcard ("*", "*") matches everything
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_Watch_Good_WildcardAll(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("*", "*")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
events := storeInstance.Watch("*")
|
||||
defer storeInstance.Unwatch("*", events)
|
||||
|
||||
require.NoError(t, storeInstance.Set("g1", "k1", "v1"))
|
||||
require.NoError(t, storeInstance.Set("g2", "k2", "v2"))
|
||||
require.NoError(t, storeInstance.Delete("g1", "k1"))
|
||||
require.NoError(t, storeInstance.DeleteGroup("g2"))
|
||||
|
||||
received := drainEvents(watcher.Events, 4, time.Second)
|
||||
received := drainEvents(events, 4, time.Second)
|
||||
require.Len(t, received, 4)
|
||||
assert.Equal(t, EventSet, received[0].Type)
|
||||
assert.Equal(t, EventSet, received[1].Type)
|
||||
|
|
@ -123,22 +48,16 @@ func TestEvents_Watch_Good_WildcardAll(t *testing.T) {
|
|||
assert.Equal(t, EventDeleteGroup, received[3].Type)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Unwatch — stops delivery, channel closed
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_Unwatch_Good_StopsDelivery(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("g", "k")
|
||||
storeInstance.Unwatch(watcher)
|
||||
events := storeInstance.Watch("g")
|
||||
storeInstance.Unwatch("g", events)
|
||||
|
||||
// Channel should be closed.
|
||||
_, open := <-watcher.Events
|
||||
_, open := <-events
|
||||
assert.False(t, open, "channel should be closed after Unwatch")
|
||||
|
||||
// Set after Unwatch should not panic or block.
|
||||
require.NoError(t, storeInstance.Set("g", "k", "v"))
|
||||
}
|
||||
|
||||
|
|
@ -146,114 +65,65 @@ func TestEvents_Unwatch_Good_Idempotent(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("g", "k")
|
||||
|
||||
// Calling Unwatch multiple times should not panic.
|
||||
storeInstance.Unwatch(watcher)
|
||||
storeInstance.Unwatch(watcher) // second call is a no-op
|
||||
events := storeInstance.Watch("g")
|
||||
storeInstance.Unwatch("g", events)
|
||||
storeInstance.Unwatch("g", events)
|
||||
}
|
||||
|
||||
func TestEvents_Unwatch_Good_NilWatcher(t *testing.T) {
|
||||
func TestEvents_Unwatch_Good_NilChannel(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
storeInstance.Unwatch(nil)
|
||||
storeInstance.Unwatch("g", nil)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Delete triggers event
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_Watch_Good_DeleteEvent(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("g", "k")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
events := storeInstance.Watch("g")
|
||||
defer storeInstance.Unwatch("g", events)
|
||||
|
||||
require.NoError(t, storeInstance.Set("g", "k", "v"))
|
||||
// Drain the Set event.
|
||||
<-watcher.Events
|
||||
<-events
|
||||
|
||||
require.NoError(t, storeInstance.Delete("g", "k"))
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
case event := <-events:
|
||||
assert.Equal(t, EventDelete, event.Type)
|
||||
assert.Equal(t, "g", event.Group)
|
||||
assert.Equal(t, "k", event.Key)
|
||||
assert.Empty(t, event.Value, "Delete events should have empty Value")
|
||||
assert.Empty(t, event.Value)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for delete event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvents_Watch_Good_DeleteMissingKeyDoesNotEmitEvent(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("*", "*")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
|
||||
require.NoError(t, storeInstance.Delete("g", "missing"))
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
t.Fatalf("unexpected event for missing key delete: %+v", event)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// DeleteGroup triggers event
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_Watch_Good_DeleteGroupEvent(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
// A wildcard-key watcher for the group should receive DeleteGroup events.
|
||||
watcher := storeInstance.Watch("g", "*")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
events := storeInstance.Watch("g")
|
||||
defer storeInstance.Unwatch("g", events)
|
||||
|
||||
require.NoError(t, storeInstance.Set("g", "a", "1"))
|
||||
require.NoError(t, storeInstance.Set("g", "b", "2"))
|
||||
// Drain Set events.
|
||||
<-watcher.Events
|
||||
<-watcher.Events
|
||||
<-events
|
||||
<-events
|
||||
|
||||
require.NoError(t, storeInstance.DeleteGroup("g"))
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
case event := <-events:
|
||||
assert.Equal(t, EventDeleteGroup, event.Type)
|
||||
assert.Equal(t, "g", event.Group)
|
||||
assert.Empty(t, event.Key, "DeleteGroup events should have empty Key")
|
||||
assert.Empty(t, event.Key)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for delete_group event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvents_Watch_Good_DeleteMissingGroupDoesNotEmitEvent(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("*", "*")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
|
||||
require.NoError(t, storeInstance.DeleteGroup("missing"))
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
t.Fatalf("unexpected event for missing group delete: %+v", event)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// OnChange — callback fires on mutations
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_OnChange_Good_Fires(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
|
@ -278,242 +148,96 @@ func TestEvents_OnChange_Good_Fires(t *testing.T) {
|
|||
assert.Equal(t, EventDelete, events[1].Type)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// OnChange — unregister stops callback
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_OnChange_Good_Unregister(t *testing.T) {
|
||||
func TestEvents_OnChange_Good_GroupFilter(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
var count atomic.Int32
|
||||
|
||||
unregister := storeInstance.OnChange(func(event Event) {
|
||||
count.Add(1)
|
||||
})
|
||||
|
||||
require.NoError(t, storeInstance.Set("g", "k", "v1"))
|
||||
assert.Equal(t, int32(1), count.Load())
|
||||
|
||||
unregister()
|
||||
|
||||
require.NoError(t, storeInstance.Set("g", "k", "v2"))
|
||||
assert.Equal(t, int32(1), count.Load(), "callback should not fire after unregister")
|
||||
|
||||
// Calling unregister again should not panic.
|
||||
unregister()
|
||||
}
|
||||
|
||||
func TestEvents_OnChange_Good_NilCallbackNoOp(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
unregister := storeInstance.OnChange(nil)
|
||||
require.NotNil(t, unregister)
|
||||
|
||||
unregister()
|
||||
require.NoError(t, storeInstance.Set("g", "k", "v"))
|
||||
unregister()
|
||||
}
|
||||
|
||||
func TestEvents_OnChange_Good_GroupCallback(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
var keys []string
|
||||
var calls []string
|
||||
unregister := storeInstance.OnChange("config", func(key, value string) {
|
||||
keys = append(keys, key)
|
||||
calls = append(calls, key+"="+value)
|
||||
})
|
||||
defer unregister()
|
||||
|
||||
require.NoError(t, storeInstance.Set("config", "theme", "dark"))
|
||||
require.NoError(t, storeInstance.Set("other", "theme", "ignored"))
|
||||
require.NoError(t, storeInstance.Set("other", "theme", "light"))
|
||||
|
||||
assert.Equal(t, []string{"theme"}, keys)
|
||||
assert.Equal(t, []string{"theme=dark"}, calls)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// OnChange — callback can manage subscriptions while handling an event
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_OnChange_Good_ReentrantSubscriptions(t *testing.T) {
|
||||
func TestEvents_Watch_Good_BufferDrops(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
var callbackCount atomic.Int32
|
||||
var unregister func()
|
||||
unregister = storeInstance.OnChange(func(event Event) {
|
||||
callbackCount.Add(1)
|
||||
events := storeInstance.Watch("g")
|
||||
defer storeInstance.Unwatch("g", events)
|
||||
|
||||
nestedWatcher := storeInstance.Watch("nested", "*")
|
||||
storeInstance.Unwatch(nestedWatcher)
|
||||
|
||||
if unregister != nil {
|
||||
unregister()
|
||||
}
|
||||
})
|
||||
|
||||
writeDone := make(chan error, 1)
|
||||
go func() {
|
||||
writeDone <- storeInstance.Set("g", "k", "v")
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-writeDone:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for Set to complete")
|
||||
for i := 0; i < watcherEventBufferCapacity+8; i++ {
|
||||
require.NoError(t, storeInstance.Set("g", core.Sprintf("k-%d", i), "v"))
|
||||
}
|
||||
|
||||
assert.Equal(t, int32(1), callbackCount.Load())
|
||||
|
||||
// The callback unregistered itself, so later writes should not increment it.
|
||||
require.NoError(t, storeInstance.Set("g", "k", "v2"))
|
||||
assert.Equal(t, int32(1), callbackCount.Load())
|
||||
received := drainEvents(events, watcherEventBufferCapacity, time.Second)
|
||||
assert.LessOrEqual(t, len(received), watcherEventBufferCapacity)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Buffer-full doesn't block the writer
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_Watch_Good_BufferFullDoesNotBlock(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("g", "*")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
|
||||
// Fill the buffer (cap 16) plus extra writes. None should block.
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
for i := range 32 {
|
||||
require.NoError(t, storeInstance.Set("g", core.Sprintf("k%d", i), "v"))
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// Success: all writes completed without blocking.
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("writes blocked — buffer-full condition caused deadlock")
|
||||
}
|
||||
|
||||
// Drain what we can — should get exactly watcherEventBufferCapacity events.
|
||||
var received int
|
||||
for range watcherEventBufferCapacity {
|
||||
select {
|
||||
case <-watcher.Events:
|
||||
received++
|
||||
default:
|
||||
}
|
||||
}
|
||||
assert.Equal(t, watcherEventBufferCapacity, received, "should receive exactly buffer-size events")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Multiple watchers on same key
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_Watch_Good_MultipleWatchersSameKey(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
firstWatcher := storeInstance.Watch("g", "k")
|
||||
secondWatcher := storeInstance.Watch("g", "k")
|
||||
defer storeInstance.Unwatch(firstWatcher)
|
||||
defer storeInstance.Unwatch(secondWatcher)
|
||||
|
||||
require.NoError(t, storeInstance.Set("g", "k", "v"))
|
||||
|
||||
// Both watchers should receive the event independently.
|
||||
select {
|
||||
case event := <-firstWatcher.Events:
|
||||
assert.Equal(t, EventSet, event.Type)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("firstWatcher timed out")
|
||||
}
|
||||
|
||||
select {
|
||||
case event := <-secondWatcher.Events:
|
||||
assert.Equal(t, EventSet, event.Type)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("secondWatcher timed out")
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Concurrent Watch/Unwatch during writes (race test)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_Watch_Good_ConcurrentWatchUnwatch(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
const goroutines = 10
|
||||
const ops = 50
|
||||
const workers = 10
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(workers)
|
||||
|
||||
var waitGroup sync.WaitGroup
|
||||
|
||||
// Writers — continuously mutate the store.
|
||||
waitGroup.Go(func() {
|
||||
for i := range goroutines * ops {
|
||||
_ = storeInstance.Set("g", core.Sprintf("k%d", i), "v")
|
||||
}
|
||||
})
|
||||
|
||||
// Watchers — add and remove watchers concurrently.
|
||||
for range goroutines {
|
||||
waitGroup.Go(func() {
|
||||
for range ops {
|
||||
watcher := storeInstance.Watch("g", "*")
|
||||
// Drain a few events to exercise the channel path.
|
||||
for range 3 {
|
||||
select {
|
||||
case <-watcher.Events:
|
||||
case <-time.After(time.Millisecond):
|
||||
}
|
||||
}
|
||||
storeInstance.Unwatch(watcher)
|
||||
}
|
||||
})
|
||||
for worker := 0; worker < workers; worker++ {
|
||||
go func(worker int) {
|
||||
defer wg.Done()
|
||||
group := core.Sprintf("g-%d", worker)
|
||||
events := storeInstance.Watch(group)
|
||||
_ = storeInstance.Set(group, "k", "v")
|
||||
storeInstance.Unwatch(group, events)
|
||||
}(worker)
|
||||
}
|
||||
|
||||
waitGroup.Wait()
|
||||
// If we got here without a data race or panic, the test passes.
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// ScopedStore events — prefixed group name
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_Watch_Good_ScopedStoreEvents(t *testing.T) {
|
||||
func TestEvents_Watch_Good_ScopedStoreEventGroup(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, err := NewScoped(storeInstance, "tenant-a")
|
||||
require.NoError(t, err)
|
||||
scopedStore := NewScoped(storeInstance, "tenant-a")
|
||||
require.NotNil(t, scopedStore)
|
||||
|
||||
// Watch on the underlying store with the full prefixed group name.
|
||||
watcher := storeInstance.Watch("tenant-a:config", "theme")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
events := storeInstance.Watch("tenant-a:config")
|
||||
defer storeInstance.Unwatch("tenant-a:config", events)
|
||||
|
||||
require.NoError(t, scopedStore.Set("config", "theme", "dark"))
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
assert.Equal(t, EventSet, event.Type)
|
||||
case event := <-events:
|
||||
assert.Equal(t, "tenant-a:config", event.Group)
|
||||
assert.Equal(t, "theme", event.Key)
|
||||
assert.Equal(t, "dark", event.Value)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for scoped store event")
|
||||
t.Fatal("timed out waiting for scoped event")
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// EventType.String()
|
||||
// ---------------------------------------------------------------------------
|
||||
func TestEvents_Watch_Good_SetWithTTL(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
events := storeInstance.Watch("g")
|
||||
defer storeInstance.Unwatch("g", events)
|
||||
|
||||
require.NoError(t, storeInstance.SetWithTTL("g", "ephemeral", "v", time.Minute))
|
||||
|
||||
select {
|
||||
case event := <-events:
|
||||
assert.Equal(t, EventSet, event.Type)
|
||||
assert.Equal(t, "ephemeral", event.Key)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for TTL event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvents_EventType_Good_String(t *testing.T) {
|
||||
assert.Equal(t, "set", EventSet.String())
|
||||
|
|
@ -522,45 +246,16 @@ func TestEvents_EventType_Good_String(t *testing.T) {
|
|||
assert.Equal(t, "unknown", EventType(99).String())
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// SetWithTTL emits events
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestEvents_Watch_Good_SetWithTTLEmitsEvent(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("g", "k")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
|
||||
require.NoError(t, storeInstance.SetWithTTL("g", "k", "ttl-val", time.Hour))
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
assert.Equal(t, EventSet, event.Type)
|
||||
assert.Equal(t, "g", event.Group)
|
||||
assert.Equal(t, "k", event.Key)
|
||||
assert.Equal(t, "ttl-val", event.Value)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for SetWithTTL event")
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// drainEvents collects up to n events from ch within the given timeout.
|
||||
func drainEvents(ch <-chan Event, count int, timeout time.Duration) []Event {
|
||||
var events []Event
|
||||
func drainEvents(events <-chan Event, count int, timeout time.Duration) []Event {
|
||||
received := make([]Event, 0, count)
|
||||
deadline := time.After(timeout)
|
||||
for range count {
|
||||
for len(received) < count {
|
||||
select {
|
||||
case event := <-ch:
|
||||
events = append(events, event)
|
||||
case event := <-events:
|
||||
received = append(received, event)
|
||||
case <-deadline:
|
||||
return events
|
||||
return received
|
||||
}
|
||||
}
|
||||
return events
|
||||
return received
|
||||
}
|
||||
|
|
|
|||
23
scope.go
23
scope.go
|
|
@ -23,7 +23,7 @@ type QuotaConfig struct {
|
|||
}
|
||||
|
||||
// ScopedStore prefixes group names with namespace + ":" before delegating to Store.
|
||||
// Usage example: `scopedStore, err := store.NewScoped(storeInstance, "tenant-a"); if err != nil { return }; if err := scopedStore.Set("config", "colour", "blue"); err != nil { return }`
|
||||
// Usage example: `scopedStore := store.NewScoped(storeInstance, "tenant-a"); if scopedStore == nil { return }; if err := scopedStore.Set("config", "colour", "blue"); err != nil { return }`
|
||||
type ScopedStore struct {
|
||||
storeInstance *Store
|
||||
namespace string
|
||||
|
|
@ -32,24 +32,27 @@ type ScopedStore struct {
|
|||
}
|
||||
|
||||
// NewScoped validates a namespace and prefixes groups with namespace + ":".
|
||||
// Usage example: `scopedStore, err := store.NewScoped(storeInstance, "tenant-a"); if err != nil { return }`
|
||||
func NewScoped(storeInstance *Store, namespace string) (*ScopedStore, error) {
|
||||
// Usage example: `scopedStore := store.NewScoped(storeInstance, "tenant-a")`
|
||||
func NewScoped(storeInstance *Store, namespace string) *ScopedStore {
|
||||
if storeInstance == nil {
|
||||
return nil, core.E("store.NewScoped", "store instance is nil", nil)
|
||||
return nil
|
||||
}
|
||||
if !validNamespace.MatchString(namespace) {
|
||||
return nil, core.E("store.NewScoped", core.Sprintf("namespace %q is invalid; use names like %q or %q", namespace, "tenant-a", "tenant-42"), nil)
|
||||
return nil
|
||||
}
|
||||
scopedStore := &ScopedStore{storeInstance: storeInstance, namespace: namespace}
|
||||
return scopedStore, nil
|
||||
return scopedStore
|
||||
}
|
||||
|
||||
// NewScopedWithQuota adds per-namespace key and group limits.
|
||||
// Usage example: `scopedStore, err := store.NewScopedWithQuota(storeInstance, "tenant-a", store.QuotaConfig{MaxKeys: 100, MaxGroups: 10}); if err != nil { return }`
|
||||
func NewScopedWithQuota(storeInstance *Store, namespace string, quota QuotaConfig) (*ScopedStore, error) {
|
||||
scopedStore, err := NewScoped(storeInstance, namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
scopedStore := NewScoped(storeInstance, namespace)
|
||||
if scopedStore == nil {
|
||||
if storeInstance == nil {
|
||||
return nil, core.E("store.NewScopedWithQuota", "store instance is nil", nil)
|
||||
}
|
||||
return nil, core.E("store.NewScopedWithQuota", core.Sprintf("namespace %q is invalid; use names like %q or %q", namespace, "tenant-a", "tenant-42"), nil)
|
||||
}
|
||||
if quota.MaxKeys < 0 || quota.MaxGroups < 0 {
|
||||
return nil, core.E(
|
||||
|
|
@ -80,7 +83,7 @@ func (scopedStore *ScopedStore) trimNamespacePrefix(groupName string) string {
|
|||
}
|
||||
|
||||
// Namespace returns the namespace string.
|
||||
// Usage example: `scopedStore, err := store.NewScoped(storeInstance, "tenant-a"); if err != nil { return }; namespace := scopedStore.Namespace(); fmt.Println(namespace)`
|
||||
// Usage example: `scopedStore := store.NewScoped(storeInstance, "tenant-a"); if scopedStore == nil { return }; namespace := scopedStore.Namespace(); fmt.Println(namespace)`
|
||||
func (scopedStore *ScopedStore) Namespace() string {
|
||||
return scopedStore.namespace
|
||||
}
|
||||
|
|
|
|||
248
scope_test.go
248
scope_test.go
|
|
@ -9,6 +9,14 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func mustScoped(t *testing.T, storeInstance *Store, namespace string) *ScopedStore {
|
||||
t.Helper()
|
||||
|
||||
scopedStore := NewScoped(storeInstance, namespace)
|
||||
require.NotNil(t, scopedStore)
|
||||
return scopedStore
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// NewScoped — constructor validation
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -17,9 +25,7 @@ func TestScope_NewScoped_Good(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, err := NewScoped(storeInstance, "tenant-1")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, scopedStore)
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-1")
|
||||
assert.Equal(t, "tenant-1", scopedStore.Namespace())
|
||||
}
|
||||
|
||||
|
|
@ -27,11 +33,8 @@ func TestScope_NewScoped_Good_AlphanumericHyphens(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
valid := []string{"abc", "ABC", "123", "a-b-c", "tenant-42", "A1-B2"}
|
||||
for _, namespace := range valid {
|
||||
scopedStore, err := NewScoped(storeInstance, namespace)
|
||||
require.NoError(t, err, "namespace %q should be valid", namespace)
|
||||
require.NotNil(t, scopedStore)
|
||||
for _, namespace := range []string{"abc", "ABC", "123", "a-b-c", "tenant-42", "A1-B2"} {
|
||||
require.NotNil(t, NewScoped(storeInstance, namespace), "namespace %q should be valid", namespace)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -39,25 +42,19 @@ func TestScope_NewScoped_Bad_Empty(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
_, err := NewScoped(storeInstance, "")
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "invalid")
|
||||
assert.Nil(t, NewScoped(storeInstance, ""))
|
||||
}
|
||||
|
||||
func TestScope_NewScoped_Bad_NilStore(t *testing.T) {
|
||||
_, err := NewScoped(nil, "tenant-a")
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "store instance is nil")
|
||||
assert.Nil(t, NewScoped(nil, "tenant-a"))
|
||||
}
|
||||
|
||||
func TestScope_NewScoped_Bad_InvalidChars(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
invalid := []string{"foo.bar", "foo:bar", "foo bar", "foo/bar", "foo_bar", "tenant!", "@ns"}
|
||||
for _, namespace := range invalid {
|
||||
_, err := NewScoped(storeInstance, namespace)
|
||||
require.Error(t, err, "namespace %q should be invalid", namespace)
|
||||
for _, namespace := range []string{"foo.bar", "foo:bar", "foo bar", "foo/bar", "foo_bar", "tenant!", "@ns"} {
|
||||
assert.Nil(t, NewScoped(storeInstance, namespace), "namespace %q should be invalid", namespace)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -67,7 +64,7 @@ func TestScope_NewScopedWithQuota_Bad_InvalidNamespace(t *testing.T) {
|
|||
|
||||
_, err := NewScopedWithQuota(storeInstance, "tenant_a", QuotaConfig{MaxKeys: 1})
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "store.NewScoped")
|
||||
assert.Contains(t, err.Error(), "namespace")
|
||||
}
|
||||
|
||||
func TestScope_NewScopedWithQuota_Bad_NilStore(t *testing.T) {
|
||||
|
|
@ -113,7 +110,7 @@ func TestScope_ScopedStore_Good_SetGet(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("config", "theme", "dark"))
|
||||
|
||||
value, err := scopedStore.Get("config", "theme")
|
||||
|
|
@ -125,7 +122,7 @@ func TestScope_ScopedStore_Good_DefaultGroupHelpers(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("theme", "dark"))
|
||||
|
||||
value, err := scopedStore.Get("theme")
|
||||
|
|
@ -141,7 +138,7 @@ func TestScope_ScopedStore_Good_SetInGetFrom(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.SetIn("config", "theme", "dark"))
|
||||
|
||||
value, err := scopedStore.GetFrom("config", "theme")
|
||||
|
|
@ -153,15 +150,13 @@ func TestScope_ScopedStore_Good_PrefixedInUnderlyingStore(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("config", "key", "val"))
|
||||
|
||||
// The underlying store should have the prefixed group name.
|
||||
value, err := storeInstance.Get("tenant-a:config", "key")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "val", value)
|
||||
|
||||
// Direct access without prefix should fail.
|
||||
_, err = storeInstance.Get("config", "key")
|
||||
assert.True(t, core.Is(err, NotFoundError))
|
||||
}
|
||||
|
|
@ -170,8 +165,8 @@ func TestScope_ScopedStore_Good_NamespaceIsolation(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
alphaStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
betaStore, _ := NewScoped(storeInstance, "tenant-b")
|
||||
alphaStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
betaStore := mustScoped(t, storeInstance, "tenant-b")
|
||||
|
||||
require.NoError(t, alphaStore.Set("config", "colour", "blue"))
|
||||
require.NoError(t, betaStore.Set("config", "colour", "red"))
|
||||
|
|
@ -189,7 +184,7 @@ func TestScope_ScopedStore_Good_Delete(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("g", "k", "v"))
|
||||
require.NoError(t, scopedStore.Delete("g", "k"))
|
||||
|
||||
|
|
@ -201,7 +196,7 @@ func TestScope_ScopedStore_Good_DeleteGroup(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("g", "a", "1"))
|
||||
require.NoError(t, scopedStore.Set("g", "b", "2"))
|
||||
require.NoError(t, scopedStore.DeleteGroup("g"))
|
||||
|
|
@ -215,8 +210,8 @@ func TestScope_ScopedStore_Good_GetAll(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
alphaStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
betaStore, _ := NewScoped(storeInstance, "tenant-b")
|
||||
alphaStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
betaStore := mustScoped(t, storeInstance, "tenant-b")
|
||||
|
||||
require.NoError(t, alphaStore.Set("items", "x", "1"))
|
||||
require.NoError(t, alphaStore.Set("items", "y", "2"))
|
||||
|
|
@ -235,7 +230,7 @@ func TestScope_ScopedStore_Good_All(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("items", "first", "1"))
|
||||
require.NoError(t, scopedStore.Set("items", "second", "2"))
|
||||
|
||||
|
|
@ -252,7 +247,7 @@ func TestScope_ScopedStore_Good_All_SortedByKey(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("items", "charlie", "3"))
|
||||
require.NoError(t, scopedStore.Set("items", "alpha", "1"))
|
||||
require.NoError(t, scopedStore.Set("items", "bravo", "2"))
|
||||
|
|
@ -270,7 +265,7 @@ func TestScope_ScopedStore_Good_Count(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("g", "a", "1"))
|
||||
require.NoError(t, scopedStore.Set("g", "b", "2"))
|
||||
|
||||
|
|
@ -283,7 +278,7 @@ func TestScope_ScopedStore_Good_SetWithTTL(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.SetWithTTL("g", "k", "v", time.Hour))
|
||||
|
||||
value, err := scopedStore.Get("g", "k")
|
||||
|
|
@ -295,7 +290,7 @@ func TestScope_ScopedStore_Good_SetWithTTL_Expires(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.SetWithTTL("g", "k", "v", 1*time.Millisecond))
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
|
|
@ -307,7 +302,7 @@ func TestScope_ScopedStore_Good_Render(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("user", "name", "Alice"))
|
||||
|
||||
renderedTemplate, err := scopedStore.Render("Hello {{ .name }}", "user")
|
||||
|
|
@ -319,8 +314,8 @@ func TestScope_ScopedStore_Good_BulkHelpers(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
alphaStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
betaStore, _ := NewScoped(storeInstance, "tenant-b")
|
||||
alphaStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
betaStore := mustScoped(t, storeInstance, "tenant-b")
|
||||
|
||||
require.NoError(t, alphaStore.Set("config", "colour", "blue"))
|
||||
require.NoError(t, alphaStore.Set("sessions", "token", "abc123"))
|
||||
|
|
@ -361,7 +356,7 @@ func TestScope_ScopedStore_Good_GroupsSeqStopsEarly(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("alpha", "a", "1"))
|
||||
require.NoError(t, scopedStore.Set("beta", "b", "2"))
|
||||
|
||||
|
|
@ -380,7 +375,7 @@ func TestScope_ScopedStore_Good_GroupsSeqSorted(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("charlie", "c", "3"))
|
||||
require.NoError(t, scopedStore.Set("alpha", "a", "1"))
|
||||
require.NoError(t, scopedStore.Set("bravo", "b", "2"))
|
||||
|
|
@ -398,7 +393,7 @@ func TestScope_ScopedStore_Good_GetSplitAndGetFields(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("config", "hosts", "alpha,beta,gamma"))
|
||||
require.NoError(t, scopedStore.Set("config", "flags", "one two\tthree\n"))
|
||||
|
||||
|
|
@ -425,7 +420,7 @@ func TestScope_ScopedStore_Good_PurgeExpired(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
scopedStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.SetWithTTL("session", "token", "abc123", 1*time.Millisecond))
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
|
|
@ -441,8 +436,8 @@ func TestScope_ScopedStore_Good_PurgeExpired_NamespaceLocal(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
alphaStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
betaStore, _ := NewScoped(storeInstance, "tenant-b")
|
||||
alphaStore := mustScoped(t, storeInstance, "tenant-a")
|
||||
betaStore := mustScoped(t, storeInstance, "tenant-b")
|
||||
|
||||
require.NoError(t, alphaStore.SetWithTTL("session", "alpha-token", "alpha", 1*time.Millisecond))
|
||||
require.NoError(t, betaStore.SetWithTTL("session", "beta-token", "beta", 1*time.Millisecond))
|
||||
|
|
@ -470,15 +465,13 @@ func TestScope_Quota_Good_MaxKeys(t *testing.T) {
|
|||
scopedStore, err := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: 5})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Insert 5 keys across different groups — should be fine.
|
||||
for i := range 5 {
|
||||
require.NoError(t, scopedStore.Set("g", keyName(i), "v"))
|
||||
}
|
||||
|
||||
// 6th key should fail.
|
||||
err = scopedStore.Set("g", "overflow", "v")
|
||||
require.Error(t, err)
|
||||
assert.True(t, core.Is(err, QuotaExceededError), "expected QuotaExceededError, got: %v", err)
|
||||
assert.True(t, core.Is(err, QuotaExceededError))
|
||||
}
|
||||
|
||||
func TestScope_Quota_Bad_QuotaCheckQueryError(t *testing.T) {
|
||||
|
|
@ -508,7 +501,6 @@ func TestScope_Quota_Good_MaxKeys_AcrossGroups(t *testing.T) {
|
|||
require.NoError(t, scopedStore.Set("g2", "b", "2"))
|
||||
require.NoError(t, scopedStore.Set("g3", "c", "3"))
|
||||
|
||||
// Total is now 3 — any new key should fail regardless of group.
|
||||
err := scopedStore.Set("g4", "d", "4")
|
||||
assert.True(t, core.Is(err, QuotaExceededError))
|
||||
}
|
||||
|
|
@ -522,8 +514,6 @@ func TestScope_Quota_Good_UpsertDoesNotCount(t *testing.T) {
|
|||
require.NoError(t, scopedStore.Set("g", "a", "1"))
|
||||
require.NoError(t, scopedStore.Set("g", "b", "2"))
|
||||
require.NoError(t, scopedStore.Set("g", "c", "3"))
|
||||
|
||||
// Upserting existing key should succeed.
|
||||
require.NoError(t, scopedStore.Set("g", "a", "updated"))
|
||||
|
||||
value, err := scopedStore.Get("g", "a")
|
||||
|
|
@ -540,8 +530,6 @@ func TestScope_Quota_Good_DeleteAndReInsert(t *testing.T) {
|
|||
require.NoError(t, scopedStore.Set("g", "a", "1"))
|
||||
require.NoError(t, scopedStore.Set("g", "b", "2"))
|
||||
require.NoError(t, scopedStore.Set("g", "c", "3"))
|
||||
|
||||
// Delete one key, then insert a new one — should work.
|
||||
require.NoError(t, scopedStore.Delete("g", "c"))
|
||||
require.NoError(t, scopedStore.Set("g", "d", "4"))
|
||||
}
|
||||
|
|
@ -552,7 +540,6 @@ func TestScope_Quota_Good_ZeroMeansUnlimited(t *testing.T) {
|
|||
|
||||
scopedStore, _ := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: 0, MaxGroups: 0})
|
||||
|
||||
// Should be able to insert many keys and groups without error.
|
||||
for i := range 100 {
|
||||
require.NoError(t, scopedStore.Set("g", keyName(i), "v"))
|
||||
}
|
||||
|
|
@ -564,18 +551,15 @@ func TestScope_Quota_Good_ExpiredKeysExcluded(t *testing.T) {
|
|||
|
||||
scopedStore, _ := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: 3})
|
||||
|
||||
// Insert 3 keys, 2 with short TTL.
|
||||
require.NoError(t, scopedStore.SetWithTTL("g", "temp1", "v", 1*time.Millisecond))
|
||||
require.NoError(t, scopedStore.SetWithTTL("g", "temp2", "v", 1*time.Millisecond))
|
||||
require.NoError(t, scopedStore.Set("g", "permanent", "v"))
|
||||
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
// After expiry, only 1 key counts — should be able to insert 2 more.
|
||||
require.NoError(t, scopedStore.Set("g", "new1", "v"))
|
||||
require.NoError(t, scopedStore.Set("g", "new2", "v"))
|
||||
|
||||
// Now at 3 — next should fail.
|
||||
err := scopedStore.Set("g", "new3", "v")
|
||||
assert.True(t, core.Is(err, QuotaExceededError))
|
||||
}
|
||||
|
|
@ -607,7 +591,6 @@ func TestScope_Quota_Good_MaxGroups(t *testing.T) {
|
|||
require.NoError(t, scopedStore.Set("g2", "k", "v"))
|
||||
require.NoError(t, scopedStore.Set("g3", "k", "v"))
|
||||
|
||||
// 4th group should fail.
|
||||
err := scopedStore.Set("g4", "k", "v")
|
||||
require.Error(t, err)
|
||||
assert.True(t, core.Is(err, QuotaExceededError))
|
||||
|
|
@ -621,8 +604,6 @@ func TestScope_Quota_Good_MaxGroups_ExistingGroupOK(t *testing.T) {
|
|||
|
||||
require.NoError(t, scopedStore.Set("g1", "a", "1"))
|
||||
require.NoError(t, scopedStore.Set("g2", "b", "2"))
|
||||
|
||||
// Adding more keys to existing groups should be fine.
|
||||
require.NoError(t, scopedStore.Set("g1", "c", "3"))
|
||||
require.NoError(t, scopedStore.Set("g2", "d", "4"))
|
||||
}
|
||||
|
|
@ -635,8 +616,6 @@ func TestScope_Quota_Good_MaxGroups_DeleteAndRecreate(t *testing.T) {
|
|||
|
||||
require.NoError(t, scopedStore.Set("g1", "k", "v"))
|
||||
require.NoError(t, scopedStore.Set("g2", "k", "v"))
|
||||
|
||||
// Delete a group, then create a new one.
|
||||
require.NoError(t, scopedStore.DeleteGroup("g1"))
|
||||
require.NoError(t, scopedStore.Set("g3", "k", "v"))
|
||||
}
|
||||
|
|
@ -658,13 +637,11 @@ func TestScope_Quota_Good_MaxGroups_ExpiredGroupExcluded(t *testing.T) {
|
|||
|
||||
scopedStore, _ := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxGroups: 2})
|
||||
|
||||
// Create 2 groups, one with only TTL keys.
|
||||
require.NoError(t, scopedStore.SetWithTTL("g1", "k", "v", 1*time.Millisecond))
|
||||
require.NoError(t, scopedStore.Set("g2", "k", "v"))
|
||||
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
// g1's only key has expired, so group count should be 1 — we can create a new one.
|
||||
require.NoError(t, scopedStore.Set("g3", "k", "v"))
|
||||
}
|
||||
|
||||
|
|
@ -677,11 +654,9 @@ func TestScope_Quota_Good_BothLimits(t *testing.T) {
|
|||
require.NoError(t, scopedStore.Set("g1", "a", "1"))
|
||||
require.NoError(t, scopedStore.Set("g2", "b", "2"))
|
||||
|
||||
// Group limit hit.
|
||||
err := scopedStore.Set("g3", "c", "3")
|
||||
assert.True(t, core.Is(err, QuotaExceededError))
|
||||
|
||||
// But adding to existing groups is fine (within key limit).
|
||||
require.NoError(t, scopedStore.Set("g1", "d", "4"))
|
||||
}
|
||||
|
||||
|
|
@ -697,11 +672,9 @@ func TestScope_Quota_Good_DoesNotAffectOtherNamespaces(t *testing.T) {
|
|||
require.NoError(t, betaStore.Set("g", "b1", "v"))
|
||||
require.NoError(t, betaStore.Set("g", "b2", "v"))
|
||||
|
||||
// alphaStore is at limit — but betaStore's keys don't count against alphaStore.
|
||||
err := alphaStore.Set("g", "a3", "v")
|
||||
assert.True(t, core.Is(err, QuotaExceededError))
|
||||
|
||||
// betaStore is also at limit independently.
|
||||
err = betaStore.Set("g", "b3", "v")
|
||||
assert.True(t, core.Is(err, QuotaExceededError))
|
||||
}
|
||||
|
|
@ -732,21 +705,15 @@ func TestScope_CountAll_Good_WithPrefix_Wildcards(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
// Add keys in groups that look like wildcards.
|
||||
require.NoError(t, storeInstance.Set("user_1", "k", "v"))
|
||||
require.NoError(t, storeInstance.Set("user_2", "k", "v"))
|
||||
require.NoError(t, storeInstance.Set("user%test", "k", "v"))
|
||||
require.NoError(t, storeInstance.Set("user_test", "k", "v"))
|
||||
|
||||
// Prefix "user_" should ONLY match groups starting with "user_".
|
||||
// Since we escape "_", it matches literal "_".
|
||||
// Groups: "user_1", "user_2", "user_test" (3 total).
|
||||
// "user%test" is NOT matched because "_" is literal.
|
||||
count, err := storeInstance.CountAll("user_")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, count)
|
||||
|
||||
// Prefix "user%" should ONLY match "user%test".
|
||||
count, err = storeInstance.CountAll("user%")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, count)
|
||||
|
|
@ -764,36 +731,6 @@ func TestScope_CountAll_Good_EmptyPrefix(t *testing.T) {
|
|||
assert.Equal(t, 2, count)
|
||||
}
|
||||
|
||||
func TestScope_CountAll_Good_ExcludesExpired(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.NoError(t, storeInstance.Set("ns:g", "permanent", "v"))
|
||||
require.NoError(t, storeInstance.SetWithTTL("ns:g", "temp", "v", 1*time.Millisecond))
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
count, err := storeInstance.CountAll("ns:")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, count, "expired keys should not be counted")
|
||||
}
|
||||
|
||||
func TestScope_CountAll_Good_Empty(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
count, err := storeInstance.CountAll("nonexistent:")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0, count)
|
||||
}
|
||||
|
||||
func TestScope_CountAll_Bad_ClosedStore(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
storeInstance.Close()
|
||||
|
||||
_, err := storeInstance.CountAll("")
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Groups
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -802,106 +739,57 @@ func TestScope_Groups_Good_WithPrefix(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.NoError(t, storeInstance.Set("ns-a:g1", "k", "v"))
|
||||
require.NoError(t, storeInstance.Set("ns-a:g2", "k", "v"))
|
||||
require.NoError(t, storeInstance.Set("ns-a:g2", "k2", "v")) // duplicate group
|
||||
require.NoError(t, storeInstance.Set("ns-b:g1", "k", "v"))
|
||||
require.NoError(t, storeInstance.Set("ns-a:group-1", "k", "v"))
|
||||
require.NoError(t, storeInstance.Set("ns-a:group-2", "k", "v"))
|
||||
require.NoError(t, storeInstance.Set("ns-b:group-1", "k", "v"))
|
||||
|
||||
groups, err := storeInstance.Groups("ns-a:")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, groups, 2)
|
||||
assert.Contains(t, groups, "ns-a:g1")
|
||||
assert.Contains(t, groups, "ns-a:g2")
|
||||
assert.Equal(t, []string{"ns-a:group-1", "ns-a:group-2"}, groups)
|
||||
}
|
||||
|
||||
func TestScope_Groups_Good_EmptyPrefix(t *testing.T) {
|
||||
func TestScope_GroupsSeq_Good_EmptyPrefix(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.NoError(t, storeInstance.Set("g1", "k", "v"))
|
||||
require.NoError(t, storeInstance.Set("g2", "k", "v"))
|
||||
require.NoError(t, storeInstance.Set("g3", "k", "v"))
|
||||
require.NoError(t, storeInstance.Set("g1", "k1", "v"))
|
||||
require.NoError(t, storeInstance.Set("g2", "k2", "v"))
|
||||
|
||||
groups, err := storeInstance.Groups("")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, groups, 3)
|
||||
var groups []string
|
||||
for groupName, err := range storeInstance.GroupsSeq("") {
|
||||
require.NoError(t, err)
|
||||
groups = append(groups, groupName)
|
||||
}
|
||||
assert.Equal(t, []string{"g1", "g2"}, groups)
|
||||
}
|
||||
|
||||
func TestScope_Groups_Good_Distinct(t *testing.T) {
|
||||
func TestScope_GroupsSeq_Good_StopsEarly(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
// Multiple keys in the same group should produce one entry.
|
||||
require.NoError(t, storeInstance.Set("g1", "a", "v"))
|
||||
require.NoError(t, storeInstance.Set("g1", "b", "v"))
|
||||
require.NoError(t, storeInstance.Set("g1", "c", "v"))
|
||||
require.NoError(t, storeInstance.Set("g1", "k1", "v"))
|
||||
require.NoError(t, storeInstance.Set("g2", "k2", "v"))
|
||||
|
||||
groups, err := storeInstance.Groups("")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, groups, 1)
|
||||
assert.Equal(t, "g1", groups[0])
|
||||
count := 0
|
||||
for range storeInstance.GroupsSeq("") {
|
||||
count++
|
||||
break
|
||||
}
|
||||
assert.Equal(t, 1, count)
|
||||
}
|
||||
|
||||
func TestScope_Groups_Good_ExcludesExpired(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.NoError(t, storeInstance.Set("ns:g1", "permanent", "v"))
|
||||
require.NoError(t, storeInstance.SetWithTTL("ns:g2", "temp", "v", 1*time.Millisecond))
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
groups, err := storeInstance.Groups("ns:")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, groups, 1, "group with only expired keys should be excluded")
|
||||
assert.Equal(t, "ns:g1", groups[0])
|
||||
func keyName(index int) string {
|
||||
return core.Sprintf("key-%02d", index)
|
||||
}
|
||||
|
||||
func TestScope_Groups_Good_SortedByGroupName(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.NoError(t, storeInstance.Set("charlie", "c", "3"))
|
||||
require.NoError(t, storeInstance.Set("alpha", "a", "1"))
|
||||
require.NoError(t, storeInstance.Set("bravo", "b", "2"))
|
||||
|
||||
groups, err := storeInstance.Groups("")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"alpha", "bravo", "charlie"}, groups)
|
||||
}
|
||||
|
||||
func TestScope_Groups_Good_Empty(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
groups, err := storeInstance.Groups("nonexistent:")
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, groups)
|
||||
}
|
||||
|
||||
func TestScope_Groups_Bad_ClosedStore(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
storeInstance.Close()
|
||||
|
||||
_, err := storeInstance.Groups("")
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func keyName(i int) string {
|
||||
return "key-" + string(rune('a'+i%26))
|
||||
}
|
||||
|
||||
func rawEntryCount(t *testing.T, storeInstance *Store, group string) int {
|
||||
t.Helper()
|
||||
func rawEntryCount(tb testing.TB, storeInstance *Store, group string) int {
|
||||
tb.Helper()
|
||||
|
||||
var count int
|
||||
err := storeInstance.database.QueryRow(
|
||||
"SELECT COUNT(*) FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ?",
|
||||
group,
|
||||
).Scan(&count)
|
||||
require.NoError(t, err)
|
||||
require.NoError(tb, err)
|
||||
return count
|
||||
}
|
||||
|
|
|
|||
10
store.go
10
store.go
|
|
@ -50,11 +50,10 @@ type Store struct {
|
|||
journal journalConfig
|
||||
|
||||
// Event dispatch state.
|
||||
watchers []*Watcher
|
||||
watchers map[string][]chan Event
|
||||
callbacks []changeCallbackRegistration
|
||||
watchersLock sync.RWMutex // protects watcher registration and dispatch
|
||||
callbacksLock sync.RWMutex // protects callback registration and dispatch
|
||||
nextWatcherRegistrationID uint64 // monotonic ID for watcher registrations
|
||||
nextCallbackRegistrationID uint64 // monotonic ID for callback registrations
|
||||
}
|
||||
|
||||
|
|
@ -93,7 +92,12 @@ func New(databasePath string, options ...StoreOption) (*Store, error) {
|
|||
}
|
||||
|
||||
purgeContext, cancel := context.WithCancel(context.Background())
|
||||
storeInstance := &Store{database: sqliteDatabase, cancelPurge: cancel, purgeInterval: 60 * time.Second}
|
||||
storeInstance := &Store{
|
||||
database: sqliteDatabase,
|
||||
cancelPurge: cancel,
|
||||
purgeInterval: 60 * time.Second,
|
||||
watchers: make(map[string][]chan Event),
|
||||
}
|
||||
for _, option := range options {
|
||||
if option != nil {
|
||||
option(storeInstance)
|
||||
|
|
|
|||
|
|
@ -1092,11 +1092,11 @@ func TestStore_SetWithTTL_Good_ExpiresOnGetEmitsDeleteEvent(t *testing.T) {
|
|||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("g", "ephemeral")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
events := storeInstance.Watch("g")
|
||||
defer storeInstance.Unwatch("g", events)
|
||||
|
||||
require.NoError(t, storeInstance.SetWithTTL("g", "ephemeral", "gone-soon", 1*time.Millisecond))
|
||||
<-watcher.Events
|
||||
<-events
|
||||
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
|
|
@ -1105,7 +1105,7 @@ func TestStore_SetWithTTL_Good_ExpiresOnGetEmitsDeleteEvent(t *testing.T) {
|
|||
assert.True(t, core.Is(err, NotFoundError), "expired key should be NotFoundError")
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
case event := <-events:
|
||||
assert.Equal(t, EventDelete, event.Type)
|
||||
assert.Equal(t, "g", event.Group)
|
||||
assert.Equal(t, "ephemeral", event.Key)
|
||||
|
|
|
|||
|
|
@ -118,8 +118,8 @@ func TestWorkspace_Commit_Good_EmitsSummaryEvent(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch(workspaceSummaryGroup("scroll-session"), "summary")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
events := storeInstance.Watch(workspaceSummaryGroup("scroll-session"))
|
||||
defer storeInstance.Unwatch(workspaceSummaryGroup("scroll-session"), events)
|
||||
|
||||
workspace, err := storeInstance.NewWorkspace("scroll-session")
|
||||
require.NoError(t, err)
|
||||
|
|
@ -131,7 +131,7 @@ func TestWorkspace_Commit_Good_EmitsSummaryEvent(t *testing.T) {
|
|||
require.True(t, result.OK, "workspace commit failed: %v", result.Value)
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
case event := <-events:
|
||||
assert.Equal(t, EventSet, event.Type)
|
||||
assert.Equal(t, workspaceSummaryGroup("scroll-session"), event.Group)
|
||||
assert.Equal(t, "summary", event.Key)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue