refactor(store): apply AX naming cleanup
Rename terse locals and callback internals, and update the user-facing examples to use explicit names. Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
37740a8bd9
commit
2bfb5af5e2
8 changed files with 172 additions and 168 deletions
24
CLAUDE.md
24
CLAUDE.md
|
|
@ -47,28 +47,28 @@ defer st.Close()
|
|||
|
||||
st.Set("group", "key", "value") // no expiry
|
||||
st.SetWithTTL("group", "key", "value", 5*time.Minute) // expires after TTL
|
||||
val, _ := st.Get("group", "key") // lazy-deletes expired
|
||||
value, _ := st.Get("group", "key") // lazy-deletes expired
|
||||
st.Delete("group", "key")
|
||||
st.DeleteGroup("group")
|
||||
all, _ := st.GetAll("group") // excludes expired
|
||||
n, _ := st.Count("group") // excludes expired
|
||||
out, _ := st.Render(tmpl, "group") // excludes expired
|
||||
entries, _ := st.GetAll("group") // excludes expired
|
||||
count, _ := st.Count("group") // excludes expired
|
||||
output, _ := st.Render(tmpl, "group") // excludes expired
|
||||
removed, _ := st.PurgeExpired() // manual purge
|
||||
total, _ := st.CountAll("prefix:") // count keys matching prefix (excludes expired)
|
||||
groups, _ := st.Groups("prefix:") // distinct group names matching prefix
|
||||
groupNames, _ := st.Groups("prefix:") // distinct group names matching prefix
|
||||
|
||||
// Namespace isolation (auto-prefixes groups with "tenant:")
|
||||
sc, _ := store.NewScoped(st, "tenant")
|
||||
sc.Set("config", "key", "val") // stored as "tenant:config" in underlying store
|
||||
scopedStore, _ := store.NewScoped(st, "tenant")
|
||||
scopedStore.Set("config", "key", "value") // stored as "tenant:config" in underlying store
|
||||
|
||||
// With quota enforcement
|
||||
sq, _ := store.NewScopedWithQuota(st, "tenant", store.QuotaConfig{MaxKeys: 100, MaxGroups: 10})
|
||||
sq.Set("g", "k", "v") // returns QuotaExceededError if limits hit
|
||||
quotaScopedStore, _ := store.NewScopedWithQuota(st, "tenant", store.QuotaConfig{MaxKeys: 100, MaxGroups: 10})
|
||||
quotaScopedStore.Set("g", "k", "v") // returns QuotaExceededError if limits hit
|
||||
|
||||
// Event hooks
|
||||
w := st.Watch("group", "*") // wildcard: all keys in group ("*","*" for all)
|
||||
defer st.Unwatch(w)
|
||||
e := <-w.Events // buffered chan, cap 16
|
||||
watcher := st.Watch("group", "*") // wildcard: all keys in group ("*","*" for all)
|
||||
defer st.Unwatch(watcher)
|
||||
event := <-watcher.Events // buffered chan, cap 16
|
||||
|
||||
unreg := st.OnChange(func(e store.Event) { /* synchronous in writer goroutine */ })
|
||||
defer unreg()
|
||||
|
|
|
|||
16
README.md
16
README.md
|
|
@ -31,21 +31,21 @@ func main() {
|
|||
|
||||
st.Set("config", "theme", "dark")
|
||||
st.SetWithTTL("session", "token", "abc123", 24*time.Hour)
|
||||
val, err := st.Get("config", "theme")
|
||||
fmt.Println(val, err)
|
||||
value, err := st.Get("config", "theme")
|
||||
fmt.Println(value, err)
|
||||
|
||||
// Watch for mutations
|
||||
w := st.Watch("config", "*")
|
||||
defer st.Unwatch(w)
|
||||
watcher := st.Watch("config", "*")
|
||||
defer st.Unwatch(watcher)
|
||||
go func() {
|
||||
for e := range w.Events {
|
||||
fmt.Println(e.Type, e.Key)
|
||||
for event := range watcher.Events {
|
||||
fmt.Println(event.Type, event.Key)
|
||||
}
|
||||
}()
|
||||
|
||||
// Scoped store for tenant isolation
|
||||
sc, _ := store.NewScoped(st, "tenant-42")
|
||||
sc.Set("prefs", "locale", "en-GB")
|
||||
scopedStore, _ := store.NewScoped(st, "tenant-42")
|
||||
scopedStore.Set("prefs", "locale", "en-GB")
|
||||
}
|
||||
```
|
||||
|
||||
|
|
|
|||
3
doc.go
3
doc.go
|
|
@ -1,6 +1,9 @@
|
|||
// Package store provides a SQLite-backed key-value store with group namespaces,
|
||||
// TTL expiry, quota-enforced scoped views, and reactive change notifications.
|
||||
//
|
||||
// st, _ := store.New(":memory:")
|
||||
// value, _ := st.Get("config", "theme")
|
||||
//
|
||||
// Use New to open a store, then Set/Get for CRUD operations. Use
|
||||
// NewScoped/NewScopedWithQuota when group names need tenant isolation or
|
||||
// per-namespace quotas.
|
||||
|
|
|
|||
|
|
@ -98,7 +98,7 @@ Both return `NotFoundError` if the key does not exist or has expired.
|
|||
|
||||
## Template Rendering
|
||||
|
||||
`Render(tmplStr, group)` is a convenience method that fetches all non-expired key-value pairs from a group and renders a Go `text/template` against them. The template data is a `map[string]string` keyed by the field name.
|
||||
`Render(templateSource, group)` is a convenience method that fetches all non-expired key-value pairs from a group and renders a Go `text/template` against them. The template data is a `map[string]string` keyed by the field name.
|
||||
|
||||
```go
|
||||
st.Set("miner", "pool", "pool.lthn.io:3333")
|
||||
|
|
@ -145,16 +145,16 @@ Events are emitted synchronously after each successful database write inside the
|
|||
| `"mygroup"` | `"*"` | All mutations within the group, including `DeleteGroup` |
|
||||
| `"*"` | `"*"` | Every mutation across the entire store |
|
||||
|
||||
`Unwatch(w)` removes the watcher from the registry and closes its channel. It is safe to call multiple times; subsequent calls are no-ops.
|
||||
`Unwatch(watcher)` removes the watcher from the registry and closes its channel. It is safe to call multiple times; subsequent calls are no-ops.
|
||||
|
||||
**Backpressure.** Event dispatch to a watcher channel is non-blocking: if the channel buffer is full, the event is dropped silently. This prevents a slow consumer from blocking a writer. Applications that cannot afford dropped events should drain the channel promptly or use `OnChange` callbacks instead.
|
||||
|
||||
```go
|
||||
w := st.Watch("config", "*")
|
||||
defer st.Unwatch(w)
|
||||
watcher := st.Watch("config", "*")
|
||||
defer st.Unwatch(watcher)
|
||||
|
||||
for e := range w.Events {
|
||||
fmt.Println(e.Type, e.Group, e.Key, e.Value)
|
||||
for event := range watcher.Events {
|
||||
fmt.Println(event.Type, event.Group, event.Key, event.Value)
|
||||
}
|
||||
```
|
||||
|
||||
|
|
@ -186,8 +186,8 @@ Watcher matching is handled by the `watcherMatches` helper, which checks the gro
|
|||
`ScopedStore` wraps a `*Store` and automatically prefixes all group names with `namespace + ":"`. This prevents key collisions when multiple tenants share a single underlying database.
|
||||
|
||||
```go
|
||||
sc, _ := store.NewScoped(st, "tenant-42")
|
||||
sc.Set("config", "theme", "dark")
|
||||
scopedStore, _ := store.NewScoped(st, "tenant-42")
|
||||
scopedStore.Set("config", "theme", "dark")
|
||||
// Stored in underlying store as group="tenant-42:config", key="theme"
|
||||
```
|
||||
|
||||
|
|
|
|||
|
|
@ -35,8 +35,8 @@ func main() {
|
|||
|
||||
// Basic CRUD
|
||||
st.Set("config", "theme", "dark")
|
||||
val, _ := st.Get("config", "theme")
|
||||
core.Println(val) // "dark"
|
||||
value, _ := st.Get("config", "theme")
|
||||
core.Println(value) // "dark"
|
||||
|
||||
// TTL expiry -- key disappears after the duration elapses
|
||||
st.SetWithTTL("session", "token", "abc123", 24*time.Hour)
|
||||
|
|
@ -52,21 +52,21 @@ func main() {
|
|||
core.Println(out) // "smtp.example.com:587"
|
||||
|
||||
// Namespace isolation for multi-tenant use
|
||||
sc, _ := store.NewScoped(st, "tenant-42")
|
||||
sc.Set("prefs", "locale", "en-GB")
|
||||
scopedStore, _ := store.NewScoped(st, "tenant-42")
|
||||
scopedStore.Set("prefs", "locale", "en-GB")
|
||||
// Stored internally as group "tenant-42:prefs", key "locale"
|
||||
|
||||
// Quota enforcement
|
||||
quota := store.QuotaConfig{MaxKeys: 100, MaxGroups: 5}
|
||||
sq, _ := store.NewScopedWithQuota(st, "tenant-99", quota)
|
||||
err = sq.Set("g", "k", "v") // returns store.QuotaExceededError if limits are hit
|
||||
quotaScopedStore, _ := store.NewScopedWithQuota(st, "tenant-99", quota)
|
||||
err = quotaScopedStore.Set("g", "k", "v") // returns store.QuotaExceededError if limits are hit
|
||||
|
||||
// Watch for mutations via a buffered channel
|
||||
w := st.Watch("config", "*")
|
||||
defer st.Unwatch(w)
|
||||
watcher := st.Watch("config", "*")
|
||||
defer st.Unwatch(watcher)
|
||||
go func() {
|
||||
for e := range w.Events {
|
||||
core.Println("event", e.Type, e.Group, e.Key)
|
||||
for event := range watcher.Events {
|
||||
core.Println("event", event.Type, event.Group, event.Key)
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
|
|||
70
events.go
70
events.go
|
|
@ -40,7 +40,7 @@ func (t EventType) String() string {
|
|||
|
||||
// Event describes a single store mutation. Key is empty for EventDeleteGroup.
|
||||
// Value is only populated for EventSet.
|
||||
// Usage example: `func handle(e store.Event) { _ = e.Group }`
|
||||
// Usage example: `func handle(event store.Event) { _ = event.Group }`
|
||||
type Event struct {
|
||||
Type EventType
|
||||
Group string
|
||||
|
|
@ -57,8 +57,8 @@ type Watcher struct {
|
|||
// Events is the public read-only channel that consumers select on.
|
||||
Events <-chan Event
|
||||
|
||||
// ch is the internal write channel (same underlying channel as Events).
|
||||
ch chan Event
|
||||
// eventsChannel is the internal write channel (same underlying channel as Events).
|
||||
eventsChannel chan Event
|
||||
|
||||
group string
|
||||
key string
|
||||
|
|
@ -67,8 +67,8 @@ type Watcher struct {
|
|||
|
||||
// callbackEntry pairs a change callback with its unique ID for unregistration.
|
||||
type callbackEntry struct {
|
||||
id uint64
|
||||
fn func(Event)
|
||||
id uint64
|
||||
callback func(Event)
|
||||
}
|
||||
|
||||
// watcherBufferSize is the capacity of each watcher's buffered channel.
|
||||
|
|
@ -80,27 +80,27 @@ const watcherBufferSize = 16
|
|||
// channel (cap 16); events are dropped if the consumer falls behind.
|
||||
// Usage example: `watcher := st.Watch("config", "*")`
|
||||
func (s *Store) Watch(group, key string) *Watcher {
|
||||
ch := make(chan Event, watcherBufferSize)
|
||||
w := &Watcher{
|
||||
Events: ch,
|
||||
ch: ch,
|
||||
group: group,
|
||||
key: key,
|
||||
id: atomic.AddUint64(&s.nextID, 1),
|
||||
eventsChannel := make(chan Event, watcherBufferSize)
|
||||
watcher := &Watcher{
|
||||
Events: eventsChannel,
|
||||
eventsChannel: eventsChannel,
|
||||
group: group,
|
||||
key: key,
|
||||
id: atomic.AddUint64(&s.nextRegistrationID, 1),
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.watchers = append(s.watchers, w)
|
||||
s.watchers = append(s.watchers, watcher)
|
||||
s.mu.Unlock()
|
||||
|
||||
return w
|
||||
return watcher
|
||||
}
|
||||
|
||||
// Unwatch removes a watcher and closes its channel. Safe to call multiple
|
||||
// times; subsequent calls are no-ops.
|
||||
// Usage example: `st.Unwatch(watcher)`
|
||||
func (s *Store) Unwatch(w *Watcher) {
|
||||
if w == nil {
|
||||
func (s *Store) Unwatch(watcher *Watcher) {
|
||||
if watcher == nil {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -108,8 +108,8 @@ func (s *Store) Unwatch(w *Watcher) {
|
|||
defer s.mu.Unlock()
|
||||
|
||||
s.watchers = slices.DeleteFunc(s.watchers, func(existing *Watcher) bool {
|
||||
if existing.id == w.id {
|
||||
close(w.ch)
|
||||
if existing.id == watcher.id {
|
||||
close(watcher.eventsChannel)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
|
@ -120,20 +120,20 @@ func (s *Store) Unwatch(w *Watcher) {
|
|||
// are called synchronously in the goroutine that performed the write, so the
|
||||
// caller controls concurrency. Returns an unregister function; calling it stops
|
||||
// future invocations.
|
||||
// Usage example: `unreg := st.OnChange(func(e store.Event) {})`
|
||||
// Usage example: `unreg := st.OnChange(func(event store.Event) {})`
|
||||
//
|
||||
// This is the integration point for go-ws and similar consumers:
|
||||
//
|
||||
// unreg := store.OnChange(func(e store.Event) {
|
||||
// hub.SendToChannel("store-events", e)
|
||||
// unreg := store.OnChange(func(event store.Event) {
|
||||
// hub.SendToChannel("store-events", event)
|
||||
// })
|
||||
// defer unreg()
|
||||
func (s *Store) OnChange(fn func(Event)) func() {
|
||||
id := atomic.AddUint64(&s.nextID, 1)
|
||||
entry := callbackEntry{id: id, fn: fn}
|
||||
func (s *Store) OnChange(callback func(Event)) func() {
|
||||
registrationID := atomic.AddUint64(&s.nextRegistrationID, 1)
|
||||
registrationRecord := callbackEntry{id: registrationID, callback: callback}
|
||||
|
||||
s.mu.Lock()
|
||||
s.callbacks = append(s.callbacks, entry)
|
||||
s.callbacks = append(s.callbacks, registrationRecord)
|
||||
s.mu.Unlock()
|
||||
|
||||
// Return an idempotent unregister function.
|
||||
|
|
@ -142,8 +142,8 @@ func (s *Store) OnChange(fn func(Event)) func() {
|
|||
once.Do(func() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.callbacks = slices.DeleteFunc(s.callbacks, func(cb callbackEntry) bool {
|
||||
return cb.id == id
|
||||
s.callbacks = slices.DeleteFunc(s.callbacks, func(existing callbackEntry) bool {
|
||||
return existing.id == registrationID
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
@ -157,28 +157,28 @@ func (s *Store) notify(e Event) {
|
|||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
for _, w := range s.watchers {
|
||||
if !watcherMatches(w, e) {
|
||||
for _, watcher := range s.watchers {
|
||||
if !watcherMatches(watcher, e) {
|
||||
continue
|
||||
}
|
||||
// Non-blocking send: drop the event rather than block the writer.
|
||||
select {
|
||||
case w.ch <- e:
|
||||
case watcher.eventsChannel <- e:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
for _, cb := range s.callbacks {
|
||||
cb.fn(e)
|
||||
for _, callback := range s.callbacks {
|
||||
callback.callback(e)
|
||||
}
|
||||
}
|
||||
|
||||
// watcherMatches reports whether a watcher's filter matches the given event.
|
||||
func watcherMatches(w *Watcher, e Event) bool {
|
||||
if w.group != "*" && w.group != e.Group {
|
||||
func watcherMatches(watcher *Watcher, event Event) bool {
|
||||
if watcher.group != "*" && watcher.group != event.Group {
|
||||
return false
|
||||
}
|
||||
if w.key != "*" && w.key != e.Key {
|
||||
if watcher.key != "*" && watcher.key != event.Key {
|
||||
// EventDeleteGroup has an empty Key — only wildcard watchers or
|
||||
// group-level watchers (key="*") should receive it.
|
||||
return false
|
||||
|
|
|
|||
59
scope.go
59
scope.go
|
|
@ -21,7 +21,7 @@ type QuotaConfig struct {
|
|||
|
||||
// ScopedStore wraps a *Store and auto-prefixes all group names with a
|
||||
// namespace to prevent key collisions across tenants.
|
||||
// Usage example: `sc, _ := store.NewScoped(st, "tenant-a")`
|
||||
// Usage example: `scopedStore, _ := store.NewScoped(st, "tenant-a")`
|
||||
type ScopedStore struct {
|
||||
store *Store
|
||||
namespace string
|
||||
|
|
@ -31,25 +31,26 @@ type ScopedStore struct {
|
|||
// NewScoped creates a ScopedStore that prefixes all groups with the given
|
||||
// namespace. The namespace must be non-empty and contain only alphanumeric
|
||||
// characters and hyphens.
|
||||
// Usage example: `sc, _ := store.NewScoped(st, "tenant-a")`
|
||||
// Usage example: `scopedStore, _ := store.NewScoped(st, "tenant-a")`
|
||||
func NewScoped(store *Store, namespace string) (*ScopedStore, error) {
|
||||
if !validNamespace.MatchString(namespace) {
|
||||
return nil, core.E("store.NewScoped", core.Sprintf("namespace %q is invalid (must be non-empty, alphanumeric + hyphens)", namespace), nil)
|
||||
}
|
||||
return &ScopedStore{store: store, namespace: namespace}, nil
|
||||
scopedStore := &ScopedStore{store: store, namespace: namespace}
|
||||
return scopedStore, nil
|
||||
}
|
||||
|
||||
// NewScopedWithQuota creates a ScopedStore with quota enforcement. Quotas are
|
||||
// checked on Set and SetWithTTL before inserting new keys or creating new
|
||||
// groups.
|
||||
// Usage example: `sc, _ := store.NewScopedWithQuota(st, "tenant-a", quota)`
|
||||
// Usage example: `scopedStore, _ := store.NewScopedWithQuota(st, "tenant-a", quota)`
|
||||
func NewScopedWithQuota(store *Store, namespace string, quota QuotaConfig) (*ScopedStore, error) {
|
||||
s, err := NewScoped(store, namespace)
|
||||
scopedStore, err := NewScoped(store, namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.quota = quota
|
||||
return s, nil
|
||||
scopedStore.quota = quota
|
||||
return scopedStore, nil
|
||||
}
|
||||
|
||||
// namespacedGroup returns the group name with the namespace prefix applied.
|
||||
|
|
@ -58,20 +59,20 @@ func (s *ScopedStore) namespacedGroup(group string) string {
|
|||
}
|
||||
|
||||
// Namespace returns the namespace string for this scoped store.
|
||||
// Usage example: `name := sc.Namespace()`
|
||||
// Usage example: `namespace := scopedStore.Namespace()`
|
||||
func (s *ScopedStore) Namespace() string {
|
||||
return s.namespace
|
||||
}
|
||||
|
||||
// Get retrieves a value by group and key within the namespace.
|
||||
// Usage example: `value, err := sc.Get("config", "theme")`
|
||||
// Usage example: `value, err := scopedStore.Get("config", "theme")`
|
||||
func (s *ScopedStore) Get(group, key string) (string, error) {
|
||||
return s.store.Get(s.namespacedGroup(group), key)
|
||||
}
|
||||
|
||||
// Set stores a value by group and key within the namespace. If quotas are
|
||||
// configured, they are checked before inserting new keys or groups.
|
||||
// Usage example: `err := sc.Set("config", "theme", "dark")`
|
||||
// Usage example: `err := scopedStore.Set("config", "theme", "dark")`
|
||||
func (s *ScopedStore) Set(group, key, value string) error {
|
||||
if err := s.checkQuota(group, key); err != nil {
|
||||
return err
|
||||
|
|
@ -81,7 +82,7 @@ func (s *ScopedStore) Set(group, key, value string) error {
|
|||
|
||||
// SetWithTTL stores a value with a time-to-live within the namespace. Quota
|
||||
// checks are applied for new keys and groups.
|
||||
// Usage example: `err := sc.SetWithTTL("sessions", "token", "abc", time.Hour)`
|
||||
// Usage example: `err := scopedStore.SetWithTTL("sessions", "token", "abc", time.Hour)`
|
||||
func (s *ScopedStore) SetWithTTL(group, key, value string, ttl time.Duration) error {
|
||||
if err := s.checkQuota(group, key); err != nil {
|
||||
return err
|
||||
|
|
@ -90,42 +91,42 @@ func (s *ScopedStore) SetWithTTL(group, key, value string, ttl time.Duration) er
|
|||
}
|
||||
|
||||
// Delete removes a single key from a group within the namespace.
|
||||
// Usage example: `err := sc.Delete("config", "theme")`
|
||||
// Usage example: `err := scopedStore.Delete("config", "theme")`
|
||||
func (s *ScopedStore) Delete(group, key string) error {
|
||||
return s.store.Delete(s.namespacedGroup(group), key)
|
||||
}
|
||||
|
||||
// DeleteGroup removes all keys in a group within the namespace.
|
||||
// Usage example: `err := sc.DeleteGroup("cache")`
|
||||
// Usage example: `err := scopedStore.DeleteGroup("cache")`
|
||||
func (s *ScopedStore) DeleteGroup(group string) error {
|
||||
return s.store.DeleteGroup(s.namespacedGroup(group))
|
||||
}
|
||||
|
||||
// GetAll returns all non-expired key-value pairs in a group within the
|
||||
// namespace.
|
||||
// Usage example: `all, err := sc.GetAll("config")`
|
||||
// Usage example: `entries, err := scopedStore.GetAll("config")`
|
||||
func (s *ScopedStore) GetAll(group string) (map[string]string, error) {
|
||||
return s.store.GetAll(s.namespacedGroup(group))
|
||||
}
|
||||
|
||||
// All returns an iterator over all non-expired key-value pairs in a group
|
||||
// within the namespace.
|
||||
// Usage example: `for item, err := range sc.All("config") { _ = item; _ = err }`
|
||||
// Usage example: `for entry, err := range scopedStore.All("config") { _ = entry; _ = err }`
|
||||
func (s *ScopedStore) All(group string) iter.Seq2[KeyValue, error] {
|
||||
return s.store.All(s.namespacedGroup(group))
|
||||
}
|
||||
|
||||
// Count returns the number of non-expired keys in a group within the namespace.
|
||||
// Usage example: `n, err := sc.Count("config")`
|
||||
// Usage example: `count, err := scopedStore.Count("config")`
|
||||
func (s *ScopedStore) Count(group string) (int, error) {
|
||||
return s.store.Count(s.namespacedGroup(group))
|
||||
}
|
||||
|
||||
// Render loads all non-expired key-value pairs from a namespaced group and
|
||||
// renders a Go template.
|
||||
// Usage example: `out, err := sc.Render("Hello {{ .name }}", "user")`
|
||||
func (s *ScopedStore) Render(tmplStr, group string) (string, error) {
|
||||
return s.store.Render(tmplStr, s.namespacedGroup(group))
|
||||
// Usage example: `output, err := scopedStore.Render("Hello {{ .name }}", "user")`
|
||||
func (s *ScopedStore) Render(templateSource, group string) (string, error) {
|
||||
return s.store.Render(templateSource, s.namespacedGroup(group))
|
||||
}
|
||||
|
||||
// checkQuota verifies that inserting key into group would not exceed the
|
||||
|
|
@ -152,31 +153,31 @@ func (s *ScopedStore) checkQuota(group, key string) error {
|
|||
|
||||
// Check MaxKeys quota.
|
||||
if s.quota.MaxKeys > 0 {
|
||||
count, err := s.store.CountAll(namespacePrefix)
|
||||
keyCount, err := s.store.CountAll(namespacePrefix)
|
||||
if err != nil {
|
||||
return core.E("store.ScopedStore", "quota check", err)
|
||||
}
|
||||
if count >= s.quota.MaxKeys {
|
||||
if keyCount >= s.quota.MaxKeys {
|
||||
return core.E("store.ScopedStore", core.Sprintf("key limit (%d)", s.quota.MaxKeys), QuotaExceededError)
|
||||
}
|
||||
}
|
||||
|
||||
// Check MaxGroups quota — only if this would create a new group.
|
||||
if s.quota.MaxGroups > 0 {
|
||||
groupCount, err := s.store.Count(namespacedGroup)
|
||||
existingGroupCount, err := s.store.Count(namespacedGroup)
|
||||
if err != nil {
|
||||
return core.E("store.ScopedStore", "quota check", err)
|
||||
}
|
||||
if groupCount == 0 {
|
||||
if existingGroupCount == 0 {
|
||||
// This group is new — check if adding it would exceed the group limit.
|
||||
count := 0
|
||||
for _, err := range s.store.GroupsSeq(namespacePrefix) {
|
||||
if err != nil {
|
||||
return core.E("store.ScopedStore", "quota check", err)
|
||||
knownGroupCount := 0
|
||||
for _, iterationErr := range s.store.GroupsSeq(namespacePrefix) {
|
||||
if iterationErr != nil {
|
||||
return core.E("store.ScopedStore", "quota check", iterationErr)
|
||||
}
|
||||
count++
|
||||
knownGroupCount++
|
||||
}
|
||||
if count >= s.quota.MaxGroups {
|
||||
if knownGroupCount >= s.quota.MaxGroups {
|
||||
return core.E("store.ScopedStore", core.Sprintf("group limit (%d)", s.quota.MaxGroups), QuotaExceededError)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
132
store.go
132
store.go
|
|
@ -30,10 +30,10 @@ type Store struct {
|
|||
purgeInterval time.Duration // interval between background purge cycles
|
||||
|
||||
// Event dispatch state.
|
||||
watchers []*Watcher
|
||||
callbacks []callbackEntry
|
||||
mu sync.RWMutex // protects watchers and callbacks
|
||||
nextID uint64 // monotonic ID for watchers and callbacks
|
||||
watchers []*Watcher
|
||||
callbacks []callbackEntry
|
||||
mu sync.RWMutex // protects watchers and callbacks
|
||||
nextRegistrationID uint64 // monotonic ID for watchers and callbacks
|
||||
}
|
||||
|
||||
// New creates a Store at the given SQLite path. Use ":memory:" for tests.
|
||||
|
|
@ -89,16 +89,16 @@ func (s *Store) Close() error {
|
|||
return s.db.Close()
|
||||
}
|
||||
|
||||
// Get retrieves a value by group and key. Expired keys are lazily deleted and
|
||||
// treated as not found.
|
||||
// Get returns the live value for a group/key pair. Expired keys are lazily
|
||||
// deleted and treated as not found.
|
||||
// Usage example: `value, err := st.Get("config", "theme")`
|
||||
func (s *Store) Get(group, key string) (string, error) {
|
||||
var val string
|
||||
var value string
|
||||
var expiresAt sql.NullInt64
|
||||
err := s.db.QueryRow(
|
||||
"SELECT value, expires_at FROM kv WHERE grp = ? AND key = ?",
|
||||
group, key,
|
||||
).Scan(&val, &expiresAt)
|
||||
).Scan(&value, &expiresAt)
|
||||
if err == sql.ErrNoRows {
|
||||
return "", core.E("store.Get", core.Concat(group, "/", key), NotFoundError)
|
||||
}
|
||||
|
|
@ -109,11 +109,11 @@ func (s *Store) Get(group, key string) (string, error) {
|
|||
_, _ = s.db.Exec("DELETE FROM kv WHERE grp = ? AND key = ?", group, key)
|
||||
return "", core.E("store.Get", core.Concat(group, "/", key), NotFoundError)
|
||||
}
|
||||
return val, nil
|
||||
return value, nil
|
||||
}
|
||||
|
||||
// Set stores a value by group and key, overwriting if exists. The key has no
|
||||
// expiry (it persists until explicitly deleted).
|
||||
// Set stores a value by group and key, overwriting any existing row and
|
||||
// clearing its expiry.
|
||||
// Usage example: `err := st.Set("config", "theme", "dark")`
|
||||
func (s *Store) Set(group, key, value string) error {
|
||||
_, err := s.db.Exec(
|
||||
|
|
@ -128,9 +128,9 @@ func (s *Store) Set(group, key, value string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetWithTTL stores a value that expires after the given duration. After expiry
|
||||
// the key is lazily removed on the next Get and periodically by a background
|
||||
// purge goroutine.
|
||||
// SetWithTTL stores a value that expires after the given duration. After
|
||||
// expiry, the key is lazily removed on the next Get and periodically by the
|
||||
// background purge goroutine.
|
||||
// Usage example: `err := st.SetWithTTL("session", "token", "abc", time.Hour)`
|
||||
func (s *Store) SetWithTTL(group, key, value string, ttl time.Duration) error {
|
||||
expiresAt := time.Now().Add(ttl).UnixMilli()
|
||||
|
|
@ -157,18 +157,18 @@ func (s *Store) Delete(group, key string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Count returns the number of non-expired keys in a group.
|
||||
// Usage example: `n, err := st.Count("config")`
|
||||
// Count returns the number of live keys in a group.
|
||||
// Usage example: `count, err := st.Count("config")`
|
||||
func (s *Store) Count(group string) (int, error) {
|
||||
var n int
|
||||
var count int
|
||||
err := s.db.QueryRow(
|
||||
"SELECT COUNT(*) FROM kv WHERE grp = ? AND (expires_at IS NULL OR expires_at > ?)",
|
||||
group, time.Now().UnixMilli(),
|
||||
).Scan(&n)
|
||||
).Scan(&count)
|
||||
if err != nil {
|
||||
return 0, core.E("store.Count", "query", err)
|
||||
}
|
||||
return n, nil
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// DeleteGroup removes all keys in a group.
|
||||
|
|
@ -183,26 +183,26 @@ func (s *Store) DeleteGroup(group string) error {
|
|||
}
|
||||
|
||||
// KeyValue represents a key-value pair.
|
||||
// Usage example: `for item, err := range st.All("config") { _ = item }`
|
||||
// Usage example: `for entry, err := range st.All("config") { _ = entry }`
|
||||
type KeyValue struct {
|
||||
Key, Value string
|
||||
}
|
||||
|
||||
// GetAll returns all non-expired key-value pairs in a group.
|
||||
// Usage example: `all, err := st.GetAll("config")`
|
||||
// Usage example: `entries, err := st.GetAll("config")`
|
||||
func (s *Store) GetAll(group string) (map[string]string, error) {
|
||||
result := make(map[string]string)
|
||||
for item, err := range s.All(group) {
|
||||
entriesByKey := make(map[string]string)
|
||||
for entry, err := range s.All(group) {
|
||||
if err != nil {
|
||||
return nil, core.E("store.GetAll", "iterate", err)
|
||||
}
|
||||
result[item.Key] = item.Value
|
||||
entriesByKey[entry.Key] = entry.Value
|
||||
}
|
||||
return result, nil
|
||||
return entriesByKey, nil
|
||||
}
|
||||
|
||||
// All returns an iterator over all non-expired key-value pairs in a group.
|
||||
// Usage example: `for item, err := range st.All("config") { _ = item; _ = err }`
|
||||
// Usage example: `for entry, err := range st.All("config") { _ = entry; _ = err }`
|
||||
func (s *Store) All(group string) iter.Seq2[KeyValue, error] {
|
||||
return func(yield func(KeyValue, error) bool) {
|
||||
rows, err := s.db.Query(
|
||||
|
|
@ -216,14 +216,14 @@ func (s *Store) All(group string) iter.Seq2[KeyValue, error] {
|
|||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var item KeyValue
|
||||
if err := rows.Scan(&item.Key, &item.Value); err != nil {
|
||||
var entry KeyValue
|
||||
if err := rows.Scan(&entry.Key, &entry.Value); err != nil {
|
||||
if !yield(KeyValue{}, core.E("store.All", "scan", err)) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
if !yield(item, nil) {
|
||||
if !yield(entry, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
@ -234,96 +234,96 @@ func (s *Store) All(group string) iter.Seq2[KeyValue, error] {
|
|||
}
|
||||
|
||||
// GetSplit retrieves a value and returns an iterator over its parts, split by
|
||||
// sep.
|
||||
// separator.
|
||||
// Usage example: `parts, _ := st.GetSplit("config", "hosts", ",")`
|
||||
func (s *Store) GetSplit(group, key, sep string) (iter.Seq[string], error) {
|
||||
val, err := s.Get(group, key)
|
||||
func (s *Store) GetSplit(group, key, separator string) (iter.Seq[string], error) {
|
||||
value, err := s.Get(group, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return splitSeq(val, sep), nil
|
||||
return splitSeq(value, separator), nil
|
||||
}
|
||||
|
||||
// GetFields retrieves a value and returns an iterator over its parts, split by
|
||||
// whitespace.
|
||||
// Usage example: `fields, _ := st.GetFields("config", "flags")`
|
||||
func (s *Store) GetFields(group, key string) (iter.Seq[string], error) {
|
||||
val, err := s.Get(group, key)
|
||||
value, err := s.Get(group, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fieldsSeq(val), nil
|
||||
return fieldsSeq(value), nil
|
||||
}
|
||||
|
||||
// Render loads all non-expired key-value pairs from a group and renders a Go
|
||||
// template.
|
||||
// Usage example: `out, err := st.Render("Hello {{ .name }}", "user")`
|
||||
func (s *Store) Render(tmplStr, group string) (string, error) {
|
||||
vars := make(map[string]string)
|
||||
for item, err := range s.All(group) {
|
||||
func (s *Store) Render(templateSource, group string) (string, error) {
|
||||
templateData := make(map[string]string)
|
||||
for entry, err := range s.All(group) {
|
||||
if err != nil {
|
||||
return "", core.E("store.Render", "iterate", err)
|
||||
}
|
||||
vars[item.Key] = item.Value
|
||||
templateData[entry.Key] = entry.Value
|
||||
}
|
||||
|
||||
tmpl, err := template.New("render").Parse(tmplStr)
|
||||
tmpl, err := template.New("render").Parse(templateSource)
|
||||
if err != nil {
|
||||
return "", core.E("store.Render", "parse", err)
|
||||
}
|
||||
b := core.NewBuilder()
|
||||
if err := tmpl.Execute(b, vars); err != nil {
|
||||
builder := core.NewBuilder()
|
||||
if err := tmpl.Execute(builder, templateData); err != nil {
|
||||
return "", core.E("store.Render", "exec", err)
|
||||
}
|
||||
return b.String(), nil
|
||||
return builder.String(), nil
|
||||
}
|
||||
|
||||
// CountAll returns the total number of non-expired keys across all groups whose
|
||||
// name starts with the given prefix. Pass an empty string to count everything.
|
||||
// Usage example: `n, err := st.CountAll("tenant-a:")`
|
||||
func (s *Store) CountAll(prefix string) (int, error) {
|
||||
var n int
|
||||
// Usage example: `count, err := st.CountAll("tenant-a:")`
|
||||
func (s *Store) CountAll(groupPrefix string) (int, error) {
|
||||
var count int
|
||||
var err error
|
||||
if prefix == "" {
|
||||
if groupPrefix == "" {
|
||||
err = s.db.QueryRow(
|
||||
"SELECT COUNT(*) FROM kv WHERE (expires_at IS NULL OR expires_at > ?)",
|
||||
time.Now().UnixMilli(),
|
||||
).Scan(&n)
|
||||
).Scan(&count)
|
||||
} else {
|
||||
err = s.db.QueryRow(
|
||||
"SELECT COUNT(*) FROM kv WHERE grp LIKE ? ESCAPE '^' AND (expires_at IS NULL OR expires_at > ?)",
|
||||
escapeLike(prefix)+"%", time.Now().UnixMilli(),
|
||||
).Scan(&n)
|
||||
escapeLike(groupPrefix)+"%", time.Now().UnixMilli(),
|
||||
).Scan(&count)
|
||||
}
|
||||
if err != nil {
|
||||
return 0, core.E("store.CountAll", "query", err)
|
||||
}
|
||||
return n, nil
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// Groups returns the distinct group names of all non-expired keys. If prefix is
|
||||
// non-empty, only groups starting with that prefix are returned.
|
||||
// Usage example: `groups, err := st.Groups("tenant-a:")`
|
||||
func (s *Store) Groups(prefix string) ([]string, error) {
|
||||
var groups []string
|
||||
for g, err := range s.GroupsSeq(prefix) {
|
||||
// Usage example: `groupNames, err := st.Groups("tenant-a:")`
|
||||
func (s *Store) Groups(groupPrefix string) ([]string, error) {
|
||||
var groupNames []string
|
||||
for groupName, err := range s.GroupsSeq(groupPrefix) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
groups = append(groups, g)
|
||||
groupNames = append(groupNames, groupName)
|
||||
}
|
||||
return groups, nil
|
||||
return groupNames, nil
|
||||
}
|
||||
|
||||
// GroupsSeq returns an iterator over the distinct group names of all
|
||||
// non-expired keys.
|
||||
// Usage example: `for group, err := range st.GroupsSeq("tenant-a:") { _ = group }`
|
||||
func (s *Store) GroupsSeq(prefix string) iter.Seq2[string, error] {
|
||||
// Usage example: `for groupName, err := range st.GroupsSeq("tenant-a:") { _ = groupName }`
|
||||
func (s *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, error] {
|
||||
return func(yield func(string, error) bool) {
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
now := time.Now().UnixMilli()
|
||||
if prefix == "" {
|
||||
if groupPrefix == "" {
|
||||
rows, err = s.db.Query(
|
||||
"SELECT DISTINCT grp FROM kv WHERE (expires_at IS NULL OR expires_at > ?)",
|
||||
now,
|
||||
|
|
@ -331,7 +331,7 @@ func (s *Store) GroupsSeq(prefix string) iter.Seq2[string, error] {
|
|||
} else {
|
||||
rows, err = s.db.Query(
|
||||
"SELECT DISTINCT grp FROM kv WHERE grp LIKE ? ESCAPE '^' AND (expires_at IS NULL OR expires_at > ?)",
|
||||
escapeLike(prefix)+"%", now,
|
||||
escapeLike(groupPrefix)+"%", now,
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
|
|
@ -341,14 +341,14 @@ func (s *Store) GroupsSeq(prefix string) iter.Seq2[string, error] {
|
|||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var g string
|
||||
if err := rows.Scan(&g); err != nil {
|
||||
var groupName string
|
||||
if err := rows.Scan(&groupName); err != nil {
|
||||
if !yield("", core.E("store.Groups", "scan", err)) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
if !yield(g, nil) {
|
||||
if !yield(groupName, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
@ -370,12 +370,12 @@ func escapeLike(s string) string {
|
|||
// of rows removed.
|
||||
// Usage example: `removed, err := st.PurgeExpired()`
|
||||
func (s *Store) PurgeExpired() (int64, error) {
|
||||
res, err := s.db.Exec("DELETE FROM kv WHERE expires_at IS NOT NULL AND expires_at <= ?",
|
||||
result, err := s.db.Exec("DELETE FROM kv WHERE expires_at IS NOT NULL AND expires_at <= ?",
|
||||
time.Now().UnixMilli())
|
||||
if err != nil {
|
||||
return 0, core.E("store.PurgeExpired", "exec", err)
|
||||
}
|
||||
return res.RowsAffected()
|
||||
return result.RowsAffected()
|
||||
}
|
||||
|
||||
// startPurge launches a background goroutine that purges expired entries at the
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue