2026-02-20 08:25:03 +00:00
|
|
|
package store
|
|
|
|
|
|
|
|
|
|
import (
|
2026-04-03 04:44:45 +00:00
|
|
|
"reflect"
|
2026-02-20 08:25:03 +00:00
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
2026-03-26 19:17:11 +00:00
|
|
|
// EventType tags an Event with the kind of change it reports.
//
// Usage example: `if event.Type == store.EventSet { return }`
type EventType int

// Event kinds, numbered from zero in declaration order.
const (
	// EventSet is the zero value of EventType.
	//
	// Usage example: `if event.Type == store.EventSet { return }`
	EventSet = EventType(iota)

	// EventDelete follows EventSet.
	//
	// Usage example: `if event.Type == store.EventDelete { return }`
	EventDelete

	// EventDeleteGroup follows EventDelete.
	//
	// Usage example: `if event.Type == store.EventDeleteGroup { return }`
	EventDeleteGroup
)
|
|
|
|
|
|
2026-03-30 15:55:04 +00:00
|
|
|
// Usage example: `label := store.EventDeleteGroup.String()`
|
2026-02-20 08:25:03 +00:00
|
|
|
func (t EventType) String() string {
|
|
|
|
|
switch t {
|
|
|
|
|
case EventSet:
|
|
|
|
|
return "set"
|
|
|
|
|
case EventDelete:
|
|
|
|
|
return "delete"
|
|
|
|
|
case EventDeleteGroup:
|
|
|
|
|
return "delete_group"
|
|
|
|
|
default:
|
|
|
|
|
return "unknown"
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 18:49:17 +00:00
|
|
|
// Usage example: `event := store.Event{Type: store.EventSet, Group: "config", Key: "colour", Value: "blue"}`
|
2026-03-30 15:55:04 +00:00
|
|
|
// Usage example: `event := store.Event{Type: store.EventDeleteGroup, Group: "config"}`
|
2026-02-20 08:25:03 +00:00
|
|
|
type Event struct {
|
2026-03-30 16:50:04 +00:00
|
|
|
// Usage example: `if event.Type == store.EventDeleteGroup { return }`
|
|
|
|
|
Type EventType
|
|
|
|
|
// Usage example: `if event.Group == "config" { return }`
|
|
|
|
|
Group string
|
2026-03-30 18:49:17 +00:00
|
|
|
// Usage example: `if event.Key == "colour" { return }`
|
2026-03-30 16:50:04 +00:00
|
|
|
Key string
|
2026-03-30 18:49:17 +00:00
|
|
|
// Usage example: `if event.Value == "blue" { return }`
|
2026-03-30 16:50:04 +00:00
|
|
|
Value string
|
|
|
|
|
// Usage example: `if event.Timestamp.IsZero() { return }`
|
2026-02-20 08:25:03 +00:00
|
|
|
Timestamp time.Time
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 19:18:20 +00:00
|
|
|
// changeCallbackRegistration keeps the registration ID so unregister can remove
|
|
|
|
|
// the exact callback later.
|
2026-03-30 16:07:53 +00:00
|
|
|
type changeCallbackRegistration struct {
|
2026-03-30 19:00:51 +00:00
|
|
|
registrationID uint64
|
|
|
|
|
callback func(Event)
|
2026-02-20 08:25:03 +00:00
|
|
|
}
|
|
|
|
|
|
2026-04-03 06:31:35 +00:00
|
|
|
func closedEventChannel() chan Event {
|
|
|
|
|
eventChannel := make(chan Event)
|
|
|
|
|
close(eventChannel)
|
|
|
|
|
return eventChannel
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-03 04:44:45 +00:00
|
|
|
// watcherEventBufferCapacity is the buffer size of every channel created by
// Watch. A watcher can therefore have 16 undelivered events queued; once the
// buffer is full, the non-blocking sends in notify drop further events for
// that watcher.
const watcherEventBufferCapacity = 16
|
2026-02-20 08:25:03 +00:00
|
|
|
|
2026-04-03 04:44:45 +00:00
|
|
|
// Usage example: `events := storeInstance.Watch("config")`
|
|
|
|
|
// Usage example: `events := storeInstance.Watch("*")`
|
|
|
|
|
func (storeInstance *Store) Watch(group string) <-chan Event {
|
2026-04-03 06:31:35 +00:00
|
|
|
if storeInstance == nil {
|
|
|
|
|
return closedEventChannel()
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.lifecycleLock.Lock()
|
|
|
|
|
closed := storeInstance.isClosed
|
|
|
|
|
storeInstance.lifecycleLock.Unlock()
|
2026-04-03 06:31:35 +00:00
|
|
|
if closed {
|
|
|
|
|
return closedEventChannel()
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 16:07:53 +00:00
|
|
|
eventChannel := make(chan Event, watcherEventBufferCapacity)
|
2026-02-20 08:25:03 +00:00
|
|
|
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.watcherLock.Lock()
|
|
|
|
|
defer storeInstance.watcherLock.Unlock()
|
|
|
|
|
storeInstance.lifecycleLock.Lock()
|
|
|
|
|
closed = storeInstance.isClosed
|
|
|
|
|
storeInstance.lifecycleLock.Unlock()
|
2026-04-03 08:11:24 +00:00
|
|
|
if closed {
|
|
|
|
|
return closedEventChannel()
|
|
|
|
|
}
|
2026-04-03 04:44:45 +00:00
|
|
|
if storeInstance.watchers == nil {
|
|
|
|
|
storeInstance.watchers = make(map[string][]chan Event)
|
|
|
|
|
}
|
|
|
|
|
storeInstance.watchers[group] = append(storeInstance.watchers[group], eventChannel)
|
2026-02-20 08:25:03 +00:00
|
|
|
|
2026-04-03 04:44:45 +00:00
|
|
|
return eventChannel
|
2026-02-20 08:25:03 +00:00
|
|
|
}
|
|
|
|
|
|
2026-04-03 04:44:45 +00:00
|
|
|
// Usage example: `storeInstance.Unwatch("config", events)`
|
|
|
|
|
func (storeInstance *Store) Unwatch(group string, events <-chan Event) {
|
2026-04-03 06:31:35 +00:00
|
|
|
if storeInstance == nil || events == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.lifecycleLock.Lock()
|
|
|
|
|
closed := storeInstance.isClosed
|
|
|
|
|
storeInstance.lifecycleLock.Unlock()
|
2026-04-03 06:31:35 +00:00
|
|
|
if closed {
|
2026-03-30 14:22:49 +00:00
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.watcherLock.Lock()
|
|
|
|
|
defer storeInstance.watcherLock.Unlock()
|
2026-02-20 08:25:03 +00:00
|
|
|
|
2026-04-03 04:44:45 +00:00
|
|
|
registeredEvents := storeInstance.watchers[group]
|
|
|
|
|
if len(registeredEvents) == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
eventsPointer := channelPointer(events)
|
|
|
|
|
nextRegisteredEvents := registeredEvents[:0]
|
|
|
|
|
removed := false
|
|
|
|
|
for _, registeredChannel := range registeredEvents {
|
|
|
|
|
if channelPointer(registeredChannel) == eventsPointer {
|
|
|
|
|
if !removed {
|
|
|
|
|
close(registeredChannel)
|
|
|
|
|
removed = true
|
|
|
|
|
}
|
|
|
|
|
continue
|
2026-02-20 08:25:03 +00:00
|
|
|
}
|
2026-04-03 04:44:45 +00:00
|
|
|
nextRegisteredEvents = append(nextRegisteredEvents, registeredChannel)
|
|
|
|
|
}
|
|
|
|
|
if !removed {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if len(nextRegisteredEvents) == 0 {
|
|
|
|
|
delete(storeInstance.watchers, group)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
storeInstance.watchers[group] = nextRegisteredEvents
|
2026-02-20 08:25:03 +00:00
|
|
|
}
|
|
|
|
|
|
2026-04-03 05:13:46 +00:00
|
|
|
// Usage example: `unregister := storeInstance.OnChange(func(event store.Event) { fmt.Println(event.Group, event.Key, event.Value) })`
|
2026-04-03 05:25:42 +00:00
|
|
|
func (storeInstance *Store) OnChange(callback func(Event)) func() {
|
2026-03-30 19:39:54 +00:00
|
|
|
if callback == nil {
|
|
|
|
|
return func() {}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-03 06:31:35 +00:00
|
|
|
if storeInstance == nil {
|
|
|
|
|
return func() {}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.lifecycleLock.Lock()
|
|
|
|
|
closed := storeInstance.isClosed
|
|
|
|
|
storeInstance.lifecycleLock.Unlock()
|
2026-04-03 06:31:35 +00:00
|
|
|
if closed {
|
|
|
|
|
return func() {}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 21:09:20 +00:00
|
|
|
registrationID := atomic.AddUint64(&storeInstance.nextCallbackID, 1)
|
2026-04-03 05:25:42 +00:00
|
|
|
callbackRegistration := changeCallbackRegistration{registrationID: registrationID, callback: callback}
|
2026-02-20 08:25:03 +00:00
|
|
|
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.callbackLock.Lock()
|
|
|
|
|
defer storeInstance.callbackLock.Unlock()
|
|
|
|
|
storeInstance.lifecycleLock.Lock()
|
|
|
|
|
closed = storeInstance.isClosed
|
|
|
|
|
storeInstance.lifecycleLock.Unlock()
|
2026-04-03 08:11:24 +00:00
|
|
|
if closed {
|
|
|
|
|
return func() {}
|
|
|
|
|
}
|
2026-03-30 16:07:53 +00:00
|
|
|
storeInstance.callbacks = append(storeInstance.callbacks, callbackRegistration)
|
2026-02-20 08:25:03 +00:00
|
|
|
|
|
|
|
|
// Return an idempotent unregister function.
|
|
|
|
|
var once sync.Once
|
|
|
|
|
return func() {
|
|
|
|
|
once.Do(func() {
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.callbackLock.Lock()
|
|
|
|
|
defer storeInstance.callbackLock.Unlock()
|
2026-04-03 04:44:45 +00:00
|
|
|
for i := range storeInstance.callbacks {
|
|
|
|
|
if storeInstance.callbacks[i].registrationID == registrationID {
|
|
|
|
|
storeInstance.callbacks = append(storeInstance.callbacks[:i], storeInstance.callbacks[i+1:]...)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-02-20 08:25:03 +00:00
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 18:49:17 +00:00
|
|
|
// notify(Event{Type: EventSet, Group: "config", Key: "colour", Value: "blue"})
|
2026-03-30 17:37:50 +00:00
|
|
|
// dispatches matching watchers and callbacks after a successful write. If a
|
|
|
|
|
// watcher buffer is full, the event is dropped instead of blocking the writer.
|
|
|
|
|
// Callbacks are copied under a separate lock and invoked after the lock is
|
|
|
|
|
// released, so they can register or unregister subscriptions without
|
|
|
|
|
// deadlocking.
|
2026-03-30 15:02:28 +00:00
|
|
|
func (storeInstance *Store) notify(event Event) {
|
2026-04-03 06:31:35 +00:00
|
|
|
if storeInstance == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-04 09:16:52 +00:00
|
|
|
if event.Timestamp.IsZero() {
|
|
|
|
|
event.Timestamp = time.Now()
|
|
|
|
|
}
|
2026-04-03 06:31:35 +00:00
|
|
|
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.lifecycleLock.Lock()
|
|
|
|
|
closed := storeInstance.isClosed
|
|
|
|
|
storeInstance.lifecycleLock.Unlock()
|
2026-04-03 06:31:35 +00:00
|
|
|
if closed {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.watcherLock.RLock()
|
|
|
|
|
storeInstance.lifecycleLock.Lock()
|
|
|
|
|
closed = storeInstance.isClosed
|
|
|
|
|
storeInstance.lifecycleLock.Unlock()
|
2026-04-03 08:11:24 +00:00
|
|
|
if closed {
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.watcherLock.RUnlock()
|
2026-04-03 08:11:24 +00:00
|
|
|
return
|
|
|
|
|
}
|
2026-04-03 04:44:45 +00:00
|
|
|
for _, registeredChannel := range storeInstance.watchers["*"] {
|
|
|
|
|
select {
|
|
|
|
|
case registeredChannel <- event:
|
|
|
|
|
default:
|
2026-02-20 08:25:03 +00:00
|
|
|
}
|
2026-04-03 04:44:45 +00:00
|
|
|
}
|
|
|
|
|
for _, registeredChannel := range storeInstance.watchers[event.Group] {
|
2026-02-20 08:25:03 +00:00
|
|
|
select {
|
2026-04-03 04:44:45 +00:00
|
|
|
case registeredChannel <- event:
|
2026-02-20 08:25:03 +00:00
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.watcherLock.RUnlock()
|
2026-03-30 15:22:33 +00:00
|
|
|
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.callbackLock.RLock()
|
2026-03-30 16:07:53 +00:00
|
|
|
callbacks := append([]changeCallbackRegistration(nil), storeInstance.callbacks...)
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.callbackLock.RUnlock()
|
2026-02-20 08:25:03 +00:00
|
|
|
|
2026-03-30 15:22:33 +00:00
|
|
|
for _, callback := range callbacks {
|
2026-03-30 15:02:28 +00:00
|
|
|
callback.callback(event)
|
2026-02-20 08:25:03 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-03 04:44:45 +00:00
|
|
|
func channelPointer(eventChannel <-chan Event) uintptr {
|
|
|
|
|
if eventChannel == nil {
|
|
|
|
|
return 0
|
2026-02-20 08:25:03 +00:00
|
|
|
}
|
2026-04-03 04:44:45 +00:00
|
|
|
return reflect.ValueOf(eventChannel).Pointer()
|
2026-02-20 08:25:03 +00:00
|
|
|
}
|