[agent/codex:gpt-5.4-mini] Read ~/spec/code/core/go/store/RFC.md fully. Find features d... #31

Merged
Virgil merged 1 commit from agent/read---spec-code-core-go-store-rfc-md-fu into dev 2026-04-03 04:45:00 +00:00
10 changed files with 249 additions and 663 deletions

View file

@ -43,17 +43,17 @@ func main() {
fmt.Println(colourValue)
// Watch "config" mutations and print each event as it arrives.
watcher := storeInstance.Watch("config", "*")
defer storeInstance.Unwatch(watcher)
events := storeInstance.Watch("config")
defer storeInstance.Unwatch("config", events)
go func() {
for event := range watcher.Events {
for event := range events {
fmt.Println(event.Type, event.Group, event.Key, event.Value)
}
}()
// Store tenant-42 preferences under the "tenant-42:" prefix.
scopedStore, err := store.NewScoped(storeInstance, "tenant-42")
if err != nil {
scopedStore := store.NewScoped(storeInstance, "tenant-42")
if scopedStore == nil {
return
}
if err := scopedStore.Set("preferences", "locale", "en-GB"); err != nil {

View file

@ -284,9 +284,10 @@ func TestCoverage_ScopedStore_Bad_GroupsClosedStore(t *testing.T) {
storeInstance, _ := New(":memory:")
require.NoError(t, storeInstance.Close())
scopedStore, err := NewScoped(storeInstance, "tenant-a")
require.NoError(t, err)
scopedStore := NewScoped(storeInstance, "tenant-a")
require.NotNil(t, scopedStore)
var err error
_, err = scopedStore.Groups("")
require.Error(t, err)
assert.Contains(t, err.Error(), "store.Groups")

6
doc.go
View file

@ -30,10 +30,10 @@
// fmt.Println(entry.Key, entry.Value)
// }
//
// watcher := storeInstance.Watch("config", "*")
// defer storeInstance.Unwatch(watcher)
// events := storeInstance.Watch("config")
// defer storeInstance.Unwatch("config", events)
// go func() {
// for event := range watcher.Events {
// for event := range events {
// fmt.Println(event.Type, event.Group, event.Key, event.Value)
// }
// }()

123
events.go
View file

@ -1,7 +1,7 @@
package store
import (
"slices"
"reflect"
"sync"
"sync/atomic"
"time"
@ -50,20 +50,6 @@ type Event struct {
Timestamp time.Time
}
// Watcher exposes the read-only event stream returned by Watch.
// Usage example: `watcher := storeInstance.Watch("config", "*"); defer storeInstance.Unwatch(watcher); for event := range watcher.Events { if event.Type == EventDeleteGroup { return } }`
type Watcher struct {
// Usage example: `for event := range watcher.Events { if event.Key == "colour" { return } }`
Events <-chan Event
// eventsChannel is the internal write channel (same underlying channel as Events).
eventsChannel chan Event
groupPattern string
keyPattern string
registrationID uint64
}
// changeCallbackRegistration keeps the registration ID so unregister can remove
// the exact callback later.
type changeCallbackRegistration struct {
@ -71,51 +57,62 @@ type changeCallbackRegistration struct {
callback func(Event)
}
// Watch("config", "*") can hold 16 pending events before non-blocking sends
// start dropping new ones.
// Watch("config") can hold 16 pending events before non-blocking sends start
// dropping new ones.
const watcherEventBufferCapacity = 16
// Watch registers a buffered subscription for matching mutations.
// Usage example: `watcher := storeInstance.Watch("*", "*")`
// Usage example: `watcher := storeInstance.Watch("config")`
func (storeInstance *Store) Watch(group string, key ...string) *Watcher {
keyPattern := "*"
if len(key) > 0 && key[0] != "" {
keyPattern = key[0]
}
// Watch registers a buffered subscription for one group.
// Usage example: `events := storeInstance.Watch("config")`
// Usage example: `events := storeInstance.Watch("*")`
func (storeInstance *Store) Watch(group string) <-chan Event {
eventChannel := make(chan Event, watcherEventBufferCapacity)
watcher := &Watcher{
Events: eventChannel,
eventsChannel: eventChannel,
groupPattern: group,
keyPattern: keyPattern,
registrationID: atomic.AddUint64(&storeInstance.nextWatcherRegistrationID, 1),
}
storeInstance.watchersLock.Lock()
storeInstance.watchers = append(storeInstance.watchers, watcher)
if storeInstance.watchers == nil {
storeInstance.watchers = make(map[string][]chan Event)
}
storeInstance.watchers[group] = append(storeInstance.watchers[group], eventChannel)
storeInstance.watchersLock.Unlock()
return watcher
return eventChannel
}
// Unwatch removes a watcher and closes its event stream.
// Usage example: `storeInstance.Unwatch(watcher)`
func (storeInstance *Store) Unwatch(watcher *Watcher) {
if watcher == nil {
// Unwatch removes a watcher for one group and closes its event stream.
// Usage example: `storeInstance.Unwatch("config", events)`
func (storeInstance *Store) Unwatch(group string, events <-chan Event) {
if events == nil {
return
}
storeInstance.watchersLock.Lock()
defer storeInstance.watchersLock.Unlock()
storeInstance.watchers = slices.DeleteFunc(storeInstance.watchers, func(existing *Watcher) bool {
if existing.registrationID == watcher.registrationID {
close(watcher.eventsChannel)
return true
registeredEvents := storeInstance.watchers[group]
if len(registeredEvents) == 0 {
return
}
eventsPointer := channelPointer(events)
nextRegisteredEvents := registeredEvents[:0]
removed := false
for _, registeredChannel := range registeredEvents {
if channelPointer(registeredChannel) == eventsPointer {
if !removed {
close(registeredChannel)
removed = true
}
continue
}
return false
})
nextRegisteredEvents = append(nextRegisteredEvents, registeredChannel)
}
if !removed {
return
}
if len(nextRegisteredEvents) == 0 {
delete(storeInstance.watchers, group)
return
}
storeInstance.watchers[group] = nextRegisteredEvents
}
// OnChange registers a synchronous mutation callback.
@ -140,9 +137,12 @@ func (storeInstance *Store) OnChange(arguments ...any) func() {
once.Do(func() {
storeInstance.callbacksLock.Lock()
defer storeInstance.callbacksLock.Unlock()
storeInstance.callbacks = slices.DeleteFunc(storeInstance.callbacks, func(existing changeCallbackRegistration) bool {
return existing.registrationID == registrationID
})
for i := range storeInstance.callbacks {
if storeInstance.callbacks[i].registrationID == registrationID {
storeInstance.callbacks = append(storeInstance.callbacks[:i], storeInstance.callbacks[i+1:]...)
return
}
}
})
}
}
@ -188,13 +188,15 @@ func onChangeCallback(arguments []any) func(Event) {
// deadlocking.
func (storeInstance *Store) notify(event Event) {
storeInstance.watchersLock.RLock()
for _, watcher := range storeInstance.watchers {
if !watcherMatches(watcher, event) {
continue
}
// Non-blocking send: drop the event rather than block the writer.
for _, registeredChannel := range storeInstance.watchers["*"] {
select {
case watcher.eventsChannel <- event:
case registeredChannel <- event:
default:
}
}
for _, registeredChannel := range storeInstance.watchers[event.Group] {
select {
case registeredChannel <- event:
default:
}
}
@ -209,16 +211,9 @@ func (storeInstance *Store) notify(event Event) {
}
}
// watcherMatches reports whether Watch("config", "*") should receive
// Event{Group: "config", Key: "colour"}.
func watcherMatches(watcher *Watcher, event Event) bool {
if watcher.groupPattern != "*" && watcher.groupPattern != event.Group {
return false
func channelPointer(eventChannel <-chan Event) uintptr {
if eventChannel == nil {
return 0
}
if watcher.keyPattern != "*" && watcher.keyPattern != event.Key {
// EventDeleteGroup has an empty Key — only wildcard watchers or
// group-level watchers (key="*") should receive it.
return false
}
return true
return reflect.ValueOf(eventChannel).Pointer()
}

View file

@ -2,7 +2,6 @@ package store
import (
"sync"
"sync/atomic"
"testing"
"time"
@ -11,111 +10,37 @@ import (
"github.com/stretchr/testify/require"
)
// ---------------------------------------------------------------------------
// Watch — specific key
// ---------------------------------------------------------------------------
func TestEvents_Watch_Good_SpecificKey(t *testing.T) {
func TestEvents_Watch_Good_Group(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("config", "theme")
defer storeInstance.Unwatch(watcher)
require.NoError(t, storeInstance.Set("config", "theme", "dark"))
select {
case event := <-watcher.Events:
assert.Equal(t, EventSet, event.Type)
assert.Equal(t, "config", event.Group)
assert.Equal(t, "theme", event.Key)
assert.Equal(t, "dark", event.Value)
assert.False(t, event.Timestamp.IsZero())
case <-time.After(time.Second):
t.Fatal("timed out waiting for event")
}
// A Set to a different key in the same group should NOT trigger this watcher.
require.NoError(t, storeInstance.Set("config", "colour", "blue"))
select {
case event := <-watcher.Events:
t.Fatalf("unexpected event for non-matching key: %+v", event)
case <-time.After(50 * time.Millisecond):
// Expected: no event.
}
}
// ---------------------------------------------------------------------------
// Watch — wildcard key "*"
// ---------------------------------------------------------------------------
func TestEvents_Watch_Good_WildcardKey(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("config", "*")
defer storeInstance.Unwatch(watcher)
events := storeInstance.Watch("config")
defer storeInstance.Unwatch("config", events)
require.NoError(t, storeInstance.Set("config", "theme", "dark"))
require.NoError(t, storeInstance.Set("config", "colour", "blue"))
received := drainEvents(watcher.Events, 2, time.Second)
received := drainEvents(events, 2, time.Second)
require.Len(t, received, 2)
assert.Equal(t, "theme", received[0].Key)
assert.Equal(t, "colour", received[1].Key)
assert.Equal(t, "config", received[0].Group)
assert.Equal(t, "config", received[1].Group)
}
func TestEvents_Watch_Good_DefaultWildcardKey(t *testing.T) {
func TestEvents_Watch_Good_WildcardGroup(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("config")
defer storeInstance.Unwatch(watcher)
require.NoError(t, storeInstance.Set("config", "theme", "dark"))
require.NoError(t, storeInstance.Set("config", "colour", "blue"))
received := drainEvents(watcher.Events, 2, time.Second)
require.Len(t, received, 2)
assert.Equal(t, "theme", received[0].Key)
assert.Equal(t, "colour", received[1].Key)
}
func TestEvents_Watch_Good_GroupMismatch(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("config", "*")
defer storeInstance.Unwatch(watcher)
require.NoError(t, storeInstance.Set("other", "theme", "dark"))
select {
case event := <-watcher.Events:
t.Fatalf("unexpected event for non-matching group: %+v", event)
case <-time.After(50 * time.Millisecond):
// Expected: no event.
}
}
// ---------------------------------------------------------------------------
// Watch — wildcard ("*", "*") matches everything
// ---------------------------------------------------------------------------
func TestEvents_Watch_Good_WildcardAll(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("*", "*")
defer storeInstance.Unwatch(watcher)
events := storeInstance.Watch("*")
defer storeInstance.Unwatch("*", events)
require.NoError(t, storeInstance.Set("g1", "k1", "v1"))
require.NoError(t, storeInstance.Set("g2", "k2", "v2"))
require.NoError(t, storeInstance.Delete("g1", "k1"))
require.NoError(t, storeInstance.DeleteGroup("g2"))
received := drainEvents(watcher.Events, 4, time.Second)
received := drainEvents(events, 4, time.Second)
require.Len(t, received, 4)
assert.Equal(t, EventSet, received[0].Type)
assert.Equal(t, EventSet, received[1].Type)
@ -123,22 +48,16 @@ func TestEvents_Watch_Good_WildcardAll(t *testing.T) {
assert.Equal(t, EventDeleteGroup, received[3].Type)
}
// ---------------------------------------------------------------------------
// Unwatch — stops delivery, channel closed
// ---------------------------------------------------------------------------
func TestEvents_Unwatch_Good_StopsDelivery(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("g", "k")
storeInstance.Unwatch(watcher)
events := storeInstance.Watch("g")
storeInstance.Unwatch("g", events)
// Channel should be closed.
_, open := <-watcher.Events
_, open := <-events
assert.False(t, open, "channel should be closed after Unwatch")
// Set after Unwatch should not panic or block.
require.NoError(t, storeInstance.Set("g", "k", "v"))
}
@ -146,114 +65,65 @@ func TestEvents_Unwatch_Good_Idempotent(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("g", "k")
// Calling Unwatch multiple times should not panic.
storeInstance.Unwatch(watcher)
storeInstance.Unwatch(watcher) // second call is a no-op
events := storeInstance.Watch("g")
storeInstance.Unwatch("g", events)
storeInstance.Unwatch("g", events)
}
func TestEvents_Unwatch_Good_NilWatcher(t *testing.T) {
func TestEvents_Unwatch_Good_NilChannel(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
storeInstance.Unwatch(nil)
storeInstance.Unwatch("g", nil)
}
// ---------------------------------------------------------------------------
// Delete triggers event
// ---------------------------------------------------------------------------
func TestEvents_Watch_Good_DeleteEvent(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("g", "k")
defer storeInstance.Unwatch(watcher)
events := storeInstance.Watch("g")
defer storeInstance.Unwatch("g", events)
require.NoError(t, storeInstance.Set("g", "k", "v"))
// Drain the Set event.
<-watcher.Events
<-events
require.NoError(t, storeInstance.Delete("g", "k"))
select {
case event := <-watcher.Events:
case event := <-events:
assert.Equal(t, EventDelete, event.Type)
assert.Equal(t, "g", event.Group)
assert.Equal(t, "k", event.Key)
assert.Empty(t, event.Value, "Delete events should have empty Value")
assert.Empty(t, event.Value)
case <-time.After(time.Second):
t.Fatal("timed out waiting for delete event")
}
}
func TestEvents_Watch_Good_DeleteMissingKeyDoesNotEmitEvent(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("*", "*")
defer storeInstance.Unwatch(watcher)
require.NoError(t, storeInstance.Delete("g", "missing"))
select {
case event := <-watcher.Events:
t.Fatalf("unexpected event for missing key delete: %+v", event)
default:
}
}
// ---------------------------------------------------------------------------
// DeleteGroup triggers event
// ---------------------------------------------------------------------------
func TestEvents_Watch_Good_DeleteGroupEvent(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
// A wildcard-key watcher for the group should receive DeleteGroup events.
watcher := storeInstance.Watch("g", "*")
defer storeInstance.Unwatch(watcher)
events := storeInstance.Watch("g")
defer storeInstance.Unwatch("g", events)
require.NoError(t, storeInstance.Set("g", "a", "1"))
require.NoError(t, storeInstance.Set("g", "b", "2"))
// Drain Set events.
<-watcher.Events
<-watcher.Events
<-events
<-events
require.NoError(t, storeInstance.DeleteGroup("g"))
select {
case event := <-watcher.Events:
case event := <-events:
assert.Equal(t, EventDeleteGroup, event.Type)
assert.Equal(t, "g", event.Group)
assert.Empty(t, event.Key, "DeleteGroup events should have empty Key")
assert.Empty(t, event.Key)
case <-time.After(time.Second):
t.Fatal("timed out waiting for delete_group event")
}
}
func TestEvents_Watch_Good_DeleteMissingGroupDoesNotEmitEvent(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("*", "*")
defer storeInstance.Unwatch(watcher)
require.NoError(t, storeInstance.DeleteGroup("missing"))
select {
case event := <-watcher.Events:
t.Fatalf("unexpected event for missing group delete: %+v", event)
default:
}
}
// ---------------------------------------------------------------------------
// OnChange — callback fires on mutations
// ---------------------------------------------------------------------------
func TestEvents_OnChange_Good_Fires(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
@ -278,242 +148,96 @@ func TestEvents_OnChange_Good_Fires(t *testing.T) {
assert.Equal(t, EventDelete, events[1].Type)
}
// ---------------------------------------------------------------------------
// OnChange — unregister stops callback
// ---------------------------------------------------------------------------
func TestEvents_OnChange_Good_Unregister(t *testing.T) {
func TestEvents_OnChange_Good_GroupFilter(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
var count atomic.Int32
unregister := storeInstance.OnChange(func(event Event) {
count.Add(1)
})
require.NoError(t, storeInstance.Set("g", "k", "v1"))
assert.Equal(t, int32(1), count.Load())
unregister()
require.NoError(t, storeInstance.Set("g", "k", "v2"))
assert.Equal(t, int32(1), count.Load(), "callback should not fire after unregister")
// Calling unregister again should not panic.
unregister()
}
func TestEvents_OnChange_Good_NilCallbackNoOp(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
unregister := storeInstance.OnChange(nil)
require.NotNil(t, unregister)
unregister()
require.NoError(t, storeInstance.Set("g", "k", "v"))
unregister()
}
func TestEvents_OnChange_Good_GroupCallback(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
var keys []string
var calls []string
unregister := storeInstance.OnChange("config", func(key, value string) {
keys = append(keys, key)
calls = append(calls, key+"="+value)
})
defer unregister()
require.NoError(t, storeInstance.Set("config", "theme", "dark"))
require.NoError(t, storeInstance.Set("other", "theme", "ignored"))
require.NoError(t, storeInstance.Set("other", "theme", "light"))
assert.Equal(t, []string{"theme"}, keys)
assert.Equal(t, []string{"theme=dark"}, calls)
}
// ---------------------------------------------------------------------------
// OnChange — callback can manage subscriptions while handling an event
// ---------------------------------------------------------------------------
func TestEvents_OnChange_Good_ReentrantSubscriptions(t *testing.T) {
func TestEvents_Watch_Good_BufferDrops(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
var callbackCount atomic.Int32
var unregister func()
unregister = storeInstance.OnChange(func(event Event) {
callbackCount.Add(1)
events := storeInstance.Watch("g")
defer storeInstance.Unwatch("g", events)
nestedWatcher := storeInstance.Watch("nested", "*")
storeInstance.Unwatch(nestedWatcher)
if unregister != nil {
unregister()
}
})
writeDone := make(chan error, 1)
go func() {
writeDone <- storeInstance.Set("g", "k", "v")
}()
select {
case err := <-writeDone:
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("timed out waiting for Set to complete")
for i := 0; i < watcherEventBufferCapacity+8; i++ {
require.NoError(t, storeInstance.Set("g", core.Sprintf("k-%d", i), "v"))
}
assert.Equal(t, int32(1), callbackCount.Load())
// The callback unregistered itself, so later writes should not increment it.
require.NoError(t, storeInstance.Set("g", "k", "v2"))
assert.Equal(t, int32(1), callbackCount.Load())
received := drainEvents(events, watcherEventBufferCapacity, time.Second)
assert.LessOrEqual(t, len(received), watcherEventBufferCapacity)
}
// ---------------------------------------------------------------------------
// Buffer-full doesn't block the writer
// ---------------------------------------------------------------------------
func TestEvents_Watch_Good_BufferFullDoesNotBlock(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("g", "*")
defer storeInstance.Unwatch(watcher)
// Fill the buffer (cap 16) plus extra writes. None should block.
done := make(chan struct{})
go func() {
defer close(done)
for i := range 32 {
require.NoError(t, storeInstance.Set("g", core.Sprintf("k%d", i), "v"))
}
}()
select {
case <-done:
// Success: all writes completed without blocking.
case <-time.After(5 * time.Second):
t.Fatal("writes blocked — buffer-full condition caused deadlock")
}
// Drain what we can — should get exactly watcherEventBufferCapacity events.
var received int
for range watcherEventBufferCapacity {
select {
case <-watcher.Events:
received++
default:
}
}
assert.Equal(t, watcherEventBufferCapacity, received, "should receive exactly buffer-size events")
}
// ---------------------------------------------------------------------------
// Multiple watchers on same key
// ---------------------------------------------------------------------------
func TestEvents_Watch_Good_MultipleWatchersSameKey(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
firstWatcher := storeInstance.Watch("g", "k")
secondWatcher := storeInstance.Watch("g", "k")
defer storeInstance.Unwatch(firstWatcher)
defer storeInstance.Unwatch(secondWatcher)
require.NoError(t, storeInstance.Set("g", "k", "v"))
// Both watchers should receive the event independently.
select {
case event := <-firstWatcher.Events:
assert.Equal(t, EventSet, event.Type)
case <-time.After(time.Second):
t.Fatal("firstWatcher timed out")
}
select {
case event := <-secondWatcher.Events:
assert.Equal(t, EventSet, event.Type)
case <-time.After(time.Second):
t.Fatal("secondWatcher timed out")
}
}
// ---------------------------------------------------------------------------
// Concurrent Watch/Unwatch during writes (race test)
// ---------------------------------------------------------------------------
func TestEvents_Watch_Good_ConcurrentWatchUnwatch(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
const goroutines = 10
const ops = 50
const workers = 10
var wg sync.WaitGroup
wg.Add(workers)
var waitGroup sync.WaitGroup
// Writers — continuously mutate the store.
waitGroup.Go(func() {
for i := range goroutines * ops {
_ = storeInstance.Set("g", core.Sprintf("k%d", i), "v")
}
})
// Watchers — add and remove watchers concurrently.
for range goroutines {
waitGroup.Go(func() {
for range ops {
watcher := storeInstance.Watch("g", "*")
// Drain a few events to exercise the channel path.
for range 3 {
select {
case <-watcher.Events:
case <-time.After(time.Millisecond):
}
}
storeInstance.Unwatch(watcher)
}
})
for worker := 0; worker < workers; worker++ {
go func(worker int) {
defer wg.Done()
group := core.Sprintf("g-%d", worker)
events := storeInstance.Watch(group)
_ = storeInstance.Set(group, "k", "v")
storeInstance.Unwatch(group, events)
}(worker)
}
waitGroup.Wait()
// If we got here without a data race or panic, the test passes.
wg.Wait()
}
// ---------------------------------------------------------------------------
// ScopedStore events — prefixed group name
// ---------------------------------------------------------------------------
func TestEvents_Watch_Good_ScopedStoreEvents(t *testing.T) {
func TestEvents_Watch_Good_ScopedStoreEventGroup(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, err := NewScoped(storeInstance, "tenant-a")
require.NoError(t, err)
scopedStore := NewScoped(storeInstance, "tenant-a")
require.NotNil(t, scopedStore)
// Watch on the underlying store with the full prefixed group name.
watcher := storeInstance.Watch("tenant-a:config", "theme")
defer storeInstance.Unwatch(watcher)
events := storeInstance.Watch("tenant-a:config")
defer storeInstance.Unwatch("tenant-a:config", events)
require.NoError(t, scopedStore.Set("config", "theme", "dark"))
select {
case event := <-watcher.Events:
assert.Equal(t, EventSet, event.Type)
case event := <-events:
assert.Equal(t, "tenant-a:config", event.Group)
assert.Equal(t, "theme", event.Key)
assert.Equal(t, "dark", event.Value)
case <-time.After(time.Second):
t.Fatal("timed out waiting for scoped store event")
t.Fatal("timed out waiting for scoped event")
}
}
// ---------------------------------------------------------------------------
// EventType.String()
// ---------------------------------------------------------------------------
func TestEvents_Watch_Good_SetWithTTL(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
events := storeInstance.Watch("g")
defer storeInstance.Unwatch("g", events)
require.NoError(t, storeInstance.SetWithTTL("g", "ephemeral", "v", time.Minute))
select {
case event := <-events:
assert.Equal(t, EventSet, event.Type)
assert.Equal(t, "ephemeral", event.Key)
case <-time.After(time.Second):
t.Fatal("timed out waiting for TTL event")
}
}
func TestEvents_EventType_Good_String(t *testing.T) {
assert.Equal(t, "set", EventSet.String())
@ -522,45 +246,16 @@ func TestEvents_EventType_Good_String(t *testing.T) {
assert.Equal(t, "unknown", EventType(99).String())
}
// ---------------------------------------------------------------------------
// SetWithTTL emits events
// ---------------------------------------------------------------------------
func TestEvents_Watch_Good_SetWithTTLEmitsEvent(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("g", "k")
defer storeInstance.Unwatch(watcher)
require.NoError(t, storeInstance.SetWithTTL("g", "k", "ttl-val", time.Hour))
select {
case event := <-watcher.Events:
assert.Equal(t, EventSet, event.Type)
assert.Equal(t, "g", event.Group)
assert.Equal(t, "k", event.Key)
assert.Equal(t, "ttl-val", event.Value)
case <-time.After(time.Second):
t.Fatal("timed out waiting for SetWithTTL event")
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
// drainEvents collects up to n events from ch within the given timeout.
func drainEvents(ch <-chan Event, count int, timeout time.Duration) []Event {
var events []Event
func drainEvents(events <-chan Event, count int, timeout time.Duration) []Event {
received := make([]Event, 0, count)
deadline := time.After(timeout)
for range count {
for len(received) < count {
select {
case event := <-ch:
events = append(events, event)
case event := <-events:
received = append(received, event)
case <-deadline:
return events
return received
}
}
return events
return received
}

View file

@ -23,7 +23,7 @@ type QuotaConfig struct {
}
// ScopedStore prefixes group names with namespace + ":" before delegating to Store.
// Usage example: `scopedStore, err := store.NewScoped(storeInstance, "tenant-a"); if err != nil { return }; if err := scopedStore.Set("config", "colour", "blue"); err != nil { return }`
// Usage example: `scopedStore := store.NewScoped(storeInstance, "tenant-a"); if scopedStore == nil { return }; if err := scopedStore.Set("config", "colour", "blue"); err != nil { return }`
type ScopedStore struct {
storeInstance *Store
namespace string
@ -32,24 +32,27 @@ type ScopedStore struct {
}
// NewScoped validates a namespace and prefixes groups with namespace + ":".
// Usage example: `scopedStore, err := store.NewScoped(storeInstance, "tenant-a"); if err != nil { return }`
func NewScoped(storeInstance *Store, namespace string) (*ScopedStore, error) {
// Usage example: `scopedStore := store.NewScoped(storeInstance, "tenant-a")`
func NewScoped(storeInstance *Store, namespace string) *ScopedStore {
if storeInstance == nil {
return nil, core.E("store.NewScoped", "store instance is nil", nil)
return nil
}
if !validNamespace.MatchString(namespace) {
return nil, core.E("store.NewScoped", core.Sprintf("namespace %q is invalid; use names like %q or %q", namespace, "tenant-a", "tenant-42"), nil)
return nil
}
scopedStore := &ScopedStore{storeInstance: storeInstance, namespace: namespace}
return scopedStore, nil
return scopedStore
}
// NewScopedWithQuota adds per-namespace key and group limits.
// Usage example: `scopedStore, err := store.NewScopedWithQuota(storeInstance, "tenant-a", store.QuotaConfig{MaxKeys: 100, MaxGroups: 10}); if err != nil { return }`
func NewScopedWithQuota(storeInstance *Store, namespace string, quota QuotaConfig) (*ScopedStore, error) {
scopedStore, err := NewScoped(storeInstance, namespace)
if err != nil {
return nil, err
scopedStore := NewScoped(storeInstance, namespace)
if scopedStore == nil {
if storeInstance == nil {
return nil, core.E("store.NewScopedWithQuota", "store instance is nil", nil)
}
return nil, core.E("store.NewScopedWithQuota", core.Sprintf("namespace %q is invalid; use names like %q or %q", namespace, "tenant-a", "tenant-42"), nil)
}
if quota.MaxKeys < 0 || quota.MaxGroups < 0 {
return nil, core.E(
@ -80,7 +83,7 @@ func (scopedStore *ScopedStore) trimNamespacePrefix(groupName string) string {
}
// Namespace returns the namespace string.
// Usage example: `scopedStore, err := store.NewScoped(storeInstance, "tenant-a"); if err != nil { return }; namespace := scopedStore.Namespace(); fmt.Println(namespace)`
// Usage example: `scopedStore := store.NewScoped(storeInstance, "tenant-a"); if scopedStore == nil { return }; namespace := scopedStore.Namespace(); fmt.Println(namespace)`
func (scopedStore *ScopedStore) Namespace() string {
return scopedStore.namespace
}

View file

@ -9,6 +9,14 @@ import (
"github.com/stretchr/testify/require"
)
func mustScoped(t *testing.T, storeInstance *Store, namespace string) *ScopedStore {
t.Helper()
scopedStore := NewScoped(storeInstance, namespace)
require.NotNil(t, scopedStore)
return scopedStore
}
// ---------------------------------------------------------------------------
// NewScoped — constructor validation
// ---------------------------------------------------------------------------
@ -17,9 +25,7 @@ func TestScope_NewScoped_Good(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, err := NewScoped(storeInstance, "tenant-1")
require.NoError(t, err)
require.NotNil(t, scopedStore)
scopedStore := mustScoped(t, storeInstance, "tenant-1")
assert.Equal(t, "tenant-1", scopedStore.Namespace())
}
@ -27,11 +33,8 @@ func TestScope_NewScoped_Good_AlphanumericHyphens(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
valid := []string{"abc", "ABC", "123", "a-b-c", "tenant-42", "A1-B2"}
for _, namespace := range valid {
scopedStore, err := NewScoped(storeInstance, namespace)
require.NoError(t, err, "namespace %q should be valid", namespace)
require.NotNil(t, scopedStore)
for _, namespace := range []string{"abc", "ABC", "123", "a-b-c", "tenant-42", "A1-B2"} {
require.NotNil(t, NewScoped(storeInstance, namespace), "namespace %q should be valid", namespace)
}
}
@ -39,25 +42,19 @@ func TestScope_NewScoped_Bad_Empty(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
_, err := NewScoped(storeInstance, "")
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid")
assert.Nil(t, NewScoped(storeInstance, ""))
}
func TestScope_NewScoped_Bad_NilStore(t *testing.T) {
_, err := NewScoped(nil, "tenant-a")
require.Error(t, err)
assert.Contains(t, err.Error(), "store instance is nil")
assert.Nil(t, NewScoped(nil, "tenant-a"))
}
func TestScope_NewScoped_Bad_InvalidChars(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
invalid := []string{"foo.bar", "foo:bar", "foo bar", "foo/bar", "foo_bar", "tenant!", "@ns"}
for _, namespace := range invalid {
_, err := NewScoped(storeInstance, namespace)
require.Error(t, err, "namespace %q should be invalid", namespace)
for _, namespace := range []string{"foo.bar", "foo:bar", "foo bar", "foo/bar", "foo_bar", "tenant!", "@ns"} {
assert.Nil(t, NewScoped(storeInstance, namespace), "namespace %q should be invalid", namespace)
}
}
@ -67,7 +64,7 @@ func TestScope_NewScopedWithQuota_Bad_InvalidNamespace(t *testing.T) {
_, err := NewScopedWithQuota(storeInstance, "tenant_a", QuotaConfig{MaxKeys: 1})
require.Error(t, err)
assert.Contains(t, err.Error(), "store.NewScoped")
assert.Contains(t, err.Error(), "namespace")
}
func TestScope_NewScopedWithQuota_Bad_NilStore(t *testing.T) {
@ -113,7 +110,7 @@ func TestScope_ScopedStore_Good_SetGet(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.Set("config", "theme", "dark"))
value, err := scopedStore.Get("config", "theme")
@ -125,7 +122,7 @@ func TestScope_ScopedStore_Good_DefaultGroupHelpers(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.Set("theme", "dark"))
value, err := scopedStore.Get("theme")
@ -141,7 +138,7 @@ func TestScope_ScopedStore_Good_SetInGetFrom(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.SetIn("config", "theme", "dark"))
value, err := scopedStore.GetFrom("config", "theme")
@ -153,15 +150,13 @@ func TestScope_ScopedStore_Good_PrefixedInUnderlyingStore(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.Set("config", "key", "val"))
// The underlying store should have the prefixed group name.
value, err := storeInstance.Get("tenant-a:config", "key")
require.NoError(t, err)
assert.Equal(t, "val", value)
// Direct access without prefix should fail.
_, err = storeInstance.Get("config", "key")
assert.True(t, core.Is(err, NotFoundError))
}
@ -170,8 +165,8 @@ func TestScope_ScopedStore_Good_NamespaceIsolation(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
alphaStore, _ := NewScoped(storeInstance, "tenant-a")
betaStore, _ := NewScoped(storeInstance, "tenant-b")
alphaStore := mustScoped(t, storeInstance, "tenant-a")
betaStore := mustScoped(t, storeInstance, "tenant-b")
require.NoError(t, alphaStore.Set("config", "colour", "blue"))
require.NoError(t, betaStore.Set("config", "colour", "red"))
@ -189,7 +184,7 @@ func TestScope_ScopedStore_Good_Delete(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.Set("g", "k", "v"))
require.NoError(t, scopedStore.Delete("g", "k"))
@ -201,7 +196,7 @@ func TestScope_ScopedStore_Good_DeleteGroup(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.Set("g", "a", "1"))
require.NoError(t, scopedStore.Set("g", "b", "2"))
require.NoError(t, scopedStore.DeleteGroup("g"))
@ -215,8 +210,8 @@ func TestScope_ScopedStore_Good_GetAll(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
alphaStore, _ := NewScoped(storeInstance, "tenant-a")
betaStore, _ := NewScoped(storeInstance, "tenant-b")
alphaStore := mustScoped(t, storeInstance, "tenant-a")
betaStore := mustScoped(t, storeInstance, "tenant-b")
require.NoError(t, alphaStore.Set("items", "x", "1"))
require.NoError(t, alphaStore.Set("items", "y", "2"))
@ -235,7 +230,7 @@ func TestScope_ScopedStore_Good_All(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.Set("items", "first", "1"))
require.NoError(t, scopedStore.Set("items", "second", "2"))
@ -252,7 +247,7 @@ func TestScope_ScopedStore_Good_All_SortedByKey(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.Set("items", "charlie", "3"))
require.NoError(t, scopedStore.Set("items", "alpha", "1"))
require.NoError(t, scopedStore.Set("items", "bravo", "2"))
@ -270,7 +265,7 @@ func TestScope_ScopedStore_Good_Count(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.Set("g", "a", "1"))
require.NoError(t, scopedStore.Set("g", "b", "2"))
@ -283,7 +278,7 @@ func TestScope_ScopedStore_Good_SetWithTTL(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.SetWithTTL("g", "k", "v", time.Hour))
value, err := scopedStore.Get("g", "k")
@ -295,7 +290,7 @@ func TestScope_ScopedStore_Good_SetWithTTL_Expires(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.SetWithTTL("g", "k", "v", 1*time.Millisecond))
time.Sleep(5 * time.Millisecond)
@ -307,7 +302,7 @@ func TestScope_ScopedStore_Good_Render(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.Set("user", "name", "Alice"))
renderedTemplate, err := scopedStore.Render("Hello {{ .name }}", "user")
@ -319,8 +314,8 @@ func TestScope_ScopedStore_Good_BulkHelpers(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
alphaStore, _ := NewScoped(storeInstance, "tenant-a")
betaStore, _ := NewScoped(storeInstance, "tenant-b")
alphaStore := mustScoped(t, storeInstance, "tenant-a")
betaStore := mustScoped(t, storeInstance, "tenant-b")
require.NoError(t, alphaStore.Set("config", "colour", "blue"))
require.NoError(t, alphaStore.Set("sessions", "token", "abc123"))
@ -361,7 +356,7 @@ func TestScope_ScopedStore_Good_GroupsSeqStopsEarly(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.Set("alpha", "a", "1"))
require.NoError(t, scopedStore.Set("beta", "b", "2"))
@ -380,7 +375,7 @@ func TestScope_ScopedStore_Good_GroupsSeqSorted(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.Set("charlie", "c", "3"))
require.NoError(t, scopedStore.Set("alpha", "a", "1"))
require.NoError(t, scopedStore.Set("bravo", "b", "2"))
@ -398,7 +393,7 @@ func TestScope_ScopedStore_Good_GetSplitAndGetFields(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.Set("config", "hosts", "alpha,beta,gamma"))
require.NoError(t, scopedStore.Set("config", "flags", "one two\tthree\n"))
@ -425,7 +420,7 @@ func TestScope_ScopedStore_Good_PurgeExpired(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
scopedStore := mustScoped(t, storeInstance, "tenant-a")
require.NoError(t, scopedStore.SetWithTTL("session", "token", "abc123", 1*time.Millisecond))
time.Sleep(5 * time.Millisecond)
@ -441,8 +436,8 @@ func TestScope_ScopedStore_Good_PurgeExpired_NamespaceLocal(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
alphaStore, _ := NewScoped(storeInstance, "tenant-a")
betaStore, _ := NewScoped(storeInstance, "tenant-b")
alphaStore := mustScoped(t, storeInstance, "tenant-a")
betaStore := mustScoped(t, storeInstance, "tenant-b")
require.NoError(t, alphaStore.SetWithTTL("session", "alpha-token", "alpha", 1*time.Millisecond))
require.NoError(t, betaStore.SetWithTTL("session", "beta-token", "beta", 1*time.Millisecond))
@ -470,15 +465,13 @@ func TestScope_Quota_Good_MaxKeys(t *testing.T) {
scopedStore, err := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: 5})
require.NoError(t, err)
// Insert 5 keys across different groups — should be fine.
for i := range 5 {
require.NoError(t, scopedStore.Set("g", keyName(i), "v"))
}
// 6th key should fail.
err = scopedStore.Set("g", "overflow", "v")
require.Error(t, err)
assert.True(t, core.Is(err, QuotaExceededError), "expected QuotaExceededError, got: %v", err)
assert.True(t, core.Is(err, QuotaExceededError))
}
func TestScope_Quota_Bad_QuotaCheckQueryError(t *testing.T) {
@ -508,7 +501,6 @@ func TestScope_Quota_Good_MaxKeys_AcrossGroups(t *testing.T) {
require.NoError(t, scopedStore.Set("g2", "b", "2"))
require.NoError(t, scopedStore.Set("g3", "c", "3"))
// Total is now 3 — any new key should fail regardless of group.
err := scopedStore.Set("g4", "d", "4")
assert.True(t, core.Is(err, QuotaExceededError))
}
@ -522,8 +514,6 @@ func TestScope_Quota_Good_UpsertDoesNotCount(t *testing.T) {
require.NoError(t, scopedStore.Set("g", "a", "1"))
require.NoError(t, scopedStore.Set("g", "b", "2"))
require.NoError(t, scopedStore.Set("g", "c", "3"))
// Upserting existing key should succeed.
require.NoError(t, scopedStore.Set("g", "a", "updated"))
value, err := scopedStore.Get("g", "a")
@ -540,8 +530,6 @@ func TestScope_Quota_Good_DeleteAndReInsert(t *testing.T) {
require.NoError(t, scopedStore.Set("g", "a", "1"))
require.NoError(t, scopedStore.Set("g", "b", "2"))
require.NoError(t, scopedStore.Set("g", "c", "3"))
// Delete one key, then insert a new one — should work.
require.NoError(t, scopedStore.Delete("g", "c"))
require.NoError(t, scopedStore.Set("g", "d", "4"))
}
@ -552,7 +540,6 @@ func TestScope_Quota_Good_ZeroMeansUnlimited(t *testing.T) {
scopedStore, _ := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: 0, MaxGroups: 0})
// Should be able to insert many keys and groups without error.
for i := range 100 {
require.NoError(t, scopedStore.Set("g", keyName(i), "v"))
}
@ -564,18 +551,15 @@ func TestScope_Quota_Good_ExpiredKeysExcluded(t *testing.T) {
scopedStore, _ := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: 3})
// Insert 3 keys, 2 with short TTL.
require.NoError(t, scopedStore.SetWithTTL("g", "temp1", "v", 1*time.Millisecond))
require.NoError(t, scopedStore.SetWithTTL("g", "temp2", "v", 1*time.Millisecond))
require.NoError(t, scopedStore.Set("g", "permanent", "v"))
time.Sleep(5 * time.Millisecond)
// After expiry, only 1 key counts — should be able to insert 2 more.
require.NoError(t, scopedStore.Set("g", "new1", "v"))
require.NoError(t, scopedStore.Set("g", "new2", "v"))
// Now at 3 — next should fail.
err := scopedStore.Set("g", "new3", "v")
assert.True(t, core.Is(err, QuotaExceededError))
}
@ -607,7 +591,6 @@ func TestScope_Quota_Good_MaxGroups(t *testing.T) {
require.NoError(t, scopedStore.Set("g2", "k", "v"))
require.NoError(t, scopedStore.Set("g3", "k", "v"))
// 4th group should fail.
err := scopedStore.Set("g4", "k", "v")
require.Error(t, err)
assert.True(t, core.Is(err, QuotaExceededError))
@ -621,8 +604,6 @@ func TestScope_Quota_Good_MaxGroups_ExistingGroupOK(t *testing.T) {
require.NoError(t, scopedStore.Set("g1", "a", "1"))
require.NoError(t, scopedStore.Set("g2", "b", "2"))
// Adding more keys to existing groups should be fine.
require.NoError(t, scopedStore.Set("g1", "c", "3"))
require.NoError(t, scopedStore.Set("g2", "d", "4"))
}
@ -635,8 +616,6 @@ func TestScope_Quota_Good_MaxGroups_DeleteAndRecreate(t *testing.T) {
require.NoError(t, scopedStore.Set("g1", "k", "v"))
require.NoError(t, scopedStore.Set("g2", "k", "v"))
// Delete a group, then create a new one.
require.NoError(t, scopedStore.DeleteGroup("g1"))
require.NoError(t, scopedStore.Set("g3", "k", "v"))
}
@ -658,13 +637,11 @@ func TestScope_Quota_Good_MaxGroups_ExpiredGroupExcluded(t *testing.T) {
scopedStore, _ := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxGroups: 2})
// Create 2 groups, one with only TTL keys.
require.NoError(t, scopedStore.SetWithTTL("g1", "k", "v", 1*time.Millisecond))
require.NoError(t, scopedStore.Set("g2", "k", "v"))
time.Sleep(5 * time.Millisecond)
// g1's only key has expired, so group count should be 1 — we can create a new one.
require.NoError(t, scopedStore.Set("g3", "k", "v"))
}
@ -677,11 +654,9 @@ func TestScope_Quota_Good_BothLimits(t *testing.T) {
require.NoError(t, scopedStore.Set("g1", "a", "1"))
require.NoError(t, scopedStore.Set("g2", "b", "2"))
// Group limit hit.
err := scopedStore.Set("g3", "c", "3")
assert.True(t, core.Is(err, QuotaExceededError))
// But adding to existing groups is fine (within key limit).
require.NoError(t, scopedStore.Set("g1", "d", "4"))
}
@ -697,11 +672,9 @@ func TestScope_Quota_Good_DoesNotAffectOtherNamespaces(t *testing.T) {
require.NoError(t, betaStore.Set("g", "b1", "v"))
require.NoError(t, betaStore.Set("g", "b2", "v"))
// alphaStore is at limit — but betaStore's keys don't count against alphaStore.
err := alphaStore.Set("g", "a3", "v")
assert.True(t, core.Is(err, QuotaExceededError))
// betaStore is also at limit independently.
err = betaStore.Set("g", "b3", "v")
assert.True(t, core.Is(err, QuotaExceededError))
}
@ -732,21 +705,15 @@ func TestScope_CountAll_Good_WithPrefix_Wildcards(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
// Add keys in groups that look like wildcards.
require.NoError(t, storeInstance.Set("user_1", "k", "v"))
require.NoError(t, storeInstance.Set("user_2", "k", "v"))
require.NoError(t, storeInstance.Set("user%test", "k", "v"))
require.NoError(t, storeInstance.Set("user_test", "k", "v"))
// Prefix "user_" should ONLY match groups starting with "user_".
// Since we escape "_", it matches literal "_".
// Groups: "user_1", "user_2", "user_test" (3 total).
// "user%test" is NOT matched because "_" is literal.
count, err := storeInstance.CountAll("user_")
require.NoError(t, err)
assert.Equal(t, 3, count)
// Prefix "user%" should ONLY match "user%test".
count, err = storeInstance.CountAll("user%")
require.NoError(t, err)
assert.Equal(t, 1, count)
@ -764,36 +731,6 @@ func TestScope_CountAll_Good_EmptyPrefix(t *testing.T) {
assert.Equal(t, 2, count)
}
func TestScope_CountAll_Good_ExcludesExpired(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
require.NoError(t, storeInstance.Set("ns:g", "permanent", "v"))
require.NoError(t, storeInstance.SetWithTTL("ns:g", "temp", "v", 1*time.Millisecond))
time.Sleep(5 * time.Millisecond)
count, err := storeInstance.CountAll("ns:")
require.NoError(t, err)
assert.Equal(t, 1, count, "expired keys should not be counted")
}
func TestScope_CountAll_Good_Empty(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
count, err := storeInstance.CountAll("nonexistent:")
require.NoError(t, err)
assert.Equal(t, 0, count)
}
func TestScope_CountAll_Bad_ClosedStore(t *testing.T) {
storeInstance, _ := New(":memory:")
storeInstance.Close()
_, err := storeInstance.CountAll("")
require.Error(t, err)
}
// ---------------------------------------------------------------------------
// Groups
// ---------------------------------------------------------------------------
@ -802,106 +739,57 @@ func TestScope_Groups_Good_WithPrefix(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
require.NoError(t, storeInstance.Set("ns-a:g1", "k", "v"))
require.NoError(t, storeInstance.Set("ns-a:g2", "k", "v"))
require.NoError(t, storeInstance.Set("ns-a:g2", "k2", "v")) // duplicate group
require.NoError(t, storeInstance.Set("ns-b:g1", "k", "v"))
require.NoError(t, storeInstance.Set("ns-a:group-1", "k", "v"))
require.NoError(t, storeInstance.Set("ns-a:group-2", "k", "v"))
require.NoError(t, storeInstance.Set("ns-b:group-1", "k", "v"))
groups, err := storeInstance.Groups("ns-a:")
require.NoError(t, err)
assert.Len(t, groups, 2)
assert.Contains(t, groups, "ns-a:g1")
assert.Contains(t, groups, "ns-a:g2")
assert.Equal(t, []string{"ns-a:group-1", "ns-a:group-2"}, groups)
}
func TestScope_Groups_Good_EmptyPrefix(t *testing.T) {
func TestScope_GroupsSeq_Good_EmptyPrefix(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
require.NoError(t, storeInstance.Set("g1", "k", "v"))
require.NoError(t, storeInstance.Set("g2", "k", "v"))
require.NoError(t, storeInstance.Set("g3", "k", "v"))
require.NoError(t, storeInstance.Set("g1", "k1", "v"))
require.NoError(t, storeInstance.Set("g2", "k2", "v"))
groups, err := storeInstance.Groups("")
require.NoError(t, err)
assert.Len(t, groups, 3)
var groups []string
for groupName, err := range storeInstance.GroupsSeq("") {
require.NoError(t, err)
groups = append(groups, groupName)
}
assert.Equal(t, []string{"g1", "g2"}, groups)
}
func TestScope_Groups_Good_Distinct(t *testing.T) {
func TestScope_GroupsSeq_Good_StopsEarly(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
// Multiple keys in the same group should produce one entry.
require.NoError(t, storeInstance.Set("g1", "a", "v"))
require.NoError(t, storeInstance.Set("g1", "b", "v"))
require.NoError(t, storeInstance.Set("g1", "c", "v"))
require.NoError(t, storeInstance.Set("g1", "k1", "v"))
require.NoError(t, storeInstance.Set("g2", "k2", "v"))
groups, err := storeInstance.Groups("")
require.NoError(t, err)
assert.Len(t, groups, 1)
assert.Equal(t, "g1", groups[0])
count := 0
for range storeInstance.GroupsSeq("") {
count++
break
}
assert.Equal(t, 1, count)
}
func TestScope_Groups_Good_ExcludesExpired(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
require.NoError(t, storeInstance.Set("ns:g1", "permanent", "v"))
require.NoError(t, storeInstance.SetWithTTL("ns:g2", "temp", "v", 1*time.Millisecond))
time.Sleep(5 * time.Millisecond)
groups, err := storeInstance.Groups("ns:")
require.NoError(t, err)
assert.Len(t, groups, 1, "group with only expired keys should be excluded")
assert.Equal(t, "ns:g1", groups[0])
func keyName(index int) string {
return core.Sprintf("key-%02d", index)
}
func TestScope_Groups_Good_SortedByGroupName(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
require.NoError(t, storeInstance.Set("charlie", "c", "3"))
require.NoError(t, storeInstance.Set("alpha", "a", "1"))
require.NoError(t, storeInstance.Set("bravo", "b", "2"))
groups, err := storeInstance.Groups("")
require.NoError(t, err)
assert.Equal(t, []string{"alpha", "bravo", "charlie"}, groups)
}
func TestScope_Groups_Good_Empty(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
groups, err := storeInstance.Groups("nonexistent:")
require.NoError(t, err)
assert.Empty(t, groups)
}
func TestScope_Groups_Bad_ClosedStore(t *testing.T) {
storeInstance, _ := New(":memory:")
storeInstance.Close()
_, err := storeInstance.Groups("")
require.Error(t, err)
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
func keyName(i int) string {
return "key-" + string(rune('a'+i%26))
}
func rawEntryCount(t *testing.T, storeInstance *Store, group string) int {
t.Helper()
func rawEntryCount(tb testing.TB, storeInstance *Store, group string) int {
tb.Helper()
var count int
err := storeInstance.database.QueryRow(
"SELECT COUNT(*) FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ?",
group,
).Scan(&count)
require.NoError(t, err)
require.NoError(tb, err)
return count
}

View file

@ -50,11 +50,10 @@ type Store struct {
journal journalConfig
// Event dispatch state.
watchers []*Watcher
watchers map[string][]chan Event
callbacks []changeCallbackRegistration
watchersLock sync.RWMutex // protects watcher registration and dispatch
callbacksLock sync.RWMutex // protects callback registration and dispatch
nextWatcherRegistrationID uint64 // monotonic ID for watcher registrations
nextCallbackRegistrationID uint64 // monotonic ID for callback registrations
}
@ -93,7 +92,12 @@ func New(databasePath string, options ...StoreOption) (*Store, error) {
}
purgeContext, cancel := context.WithCancel(context.Background())
storeInstance := &Store{database: sqliteDatabase, cancelPurge: cancel, purgeInterval: 60 * time.Second}
storeInstance := &Store{
database: sqliteDatabase,
cancelPurge: cancel,
purgeInterval: 60 * time.Second,
watchers: make(map[string][]chan Event),
}
for _, option := range options {
if option != nil {
option(storeInstance)

View file

@ -1092,11 +1092,11 @@ func TestStore_SetWithTTL_Good_ExpiresOnGetEmitsDeleteEvent(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("g", "ephemeral")
defer storeInstance.Unwatch(watcher)
events := storeInstance.Watch("g")
defer storeInstance.Unwatch("g", events)
require.NoError(t, storeInstance.SetWithTTL("g", "ephemeral", "gone-soon", 1*time.Millisecond))
<-watcher.Events
<-events
time.Sleep(5 * time.Millisecond)
@ -1105,7 +1105,7 @@ func TestStore_SetWithTTL_Good_ExpiresOnGetEmitsDeleteEvent(t *testing.T) {
assert.True(t, core.Is(err, NotFoundError), "expired key should be NotFoundError")
select {
case event := <-watcher.Events:
case event := <-events:
assert.Equal(t, EventDelete, event.Type)
assert.Equal(t, "g", event.Group)
assert.Equal(t, "ephemeral", event.Key)

View file

@ -118,8 +118,8 @@ func TestWorkspace_Commit_Good_EmitsSummaryEvent(t *testing.T) {
require.NoError(t, err)
defer storeInstance.Close()
watcher := storeInstance.Watch(workspaceSummaryGroup("scroll-session"), "summary")
defer storeInstance.Unwatch(watcher)
events := storeInstance.Watch(workspaceSummaryGroup("scroll-session"))
defer storeInstance.Unwatch(workspaceSummaryGroup("scroll-session"), events)
workspace, err := storeInstance.NewWorkspace("scroll-session")
require.NoError(t, err)
@ -131,7 +131,7 @@ func TestWorkspace_Commit_Good_EmitsSummaryEvent(t *testing.T) {
require.True(t, result.OK, "workspace commit failed: %v", result.Value)
select {
case event := <-watcher.Events:
case event := <-events:
assert.Equal(t, EventSet, event.Type)
assert.Equal(t, workspaceSummaryGroup("scroll-session"), event.Group)
assert.Equal(t, "summary", event.Key)