go-store/events.go
Virgil e1341ff2d5
Some checks are pending
Security Scan / security (push) Waiting to run
Test / test (push) Waiting to run
refactor(store): align internal lifecycle naming with AX
Use more descriptive private lifecycle, watcher, and orphan cache field names so the implementation reads more directly for agent consumers while preserving the exported API and behaviour.

Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-04 21:09:20 +00:00

247 lines
6.5 KiB
Go

package store
import (
"reflect"
"sync"
"sync/atomic"
"time"
)
// EventType identifies which kind of change an Event reports.
//
// Usage example: `if event.Type == store.EventSet { return }`
type EventType int

const (
	// EventSet is rendered as "set" by String.
	//
	// Usage example: `if event.Type == store.EventSet { return }`
	EventSet EventType = iota
	// EventDelete is rendered as "delete" by String.
	//
	// Usage example: `if event.Type == store.EventDelete { return }`
	EventDelete
	// EventDeleteGroup is rendered as "delete_group" by String.
	//
	// Usage example: `if event.Type == store.EventDeleteGroup { return }`
	EventDeleteGroup
)

// String returns the lowercase label of the event type. Values that are not
// one of the declared constants are reported as "unknown".
//
// Usage example: `label := store.EventDeleteGroup.String()`
func (t EventType) String() string {
	// Start from the fallback so only the known constants need a case.
	label := "unknown"
	switch t {
	case EventSet:
		label = "set"
	case EventDelete:
		label = "delete"
	case EventDeleteGroup:
		label = "delete_group"
	}
	return label
}
// Event describes a single change that is handed to watcher channels and
// change callbacks.
//
// Usage example: `event := store.Event{Type: store.EventSet, Group: "config", Key: "colour", Value: "blue"}`
// Usage example: `event := store.Event{Type: store.EventDeleteGroup, Group: "config"}`
type Event struct {
// Type is the kind of change; see the EventType constants.
// Usage example: `if event.Type == store.EventDeleteGroup { return }`
Type EventType
// Group is the group the change belongs to; notify uses it to select the
// watchers registered for that group.
// Usage example: `if event.Group == "config" { return }`
Group string
// Key is the affected key. NOTE(review): presumably empty for group-level
// events, as in the EventDeleteGroup usage example above - confirm.
// Usage example: `if event.Key == "colour" { return }`
Key string
// Value is the value carried by the event. NOTE(review): presumably only
// populated for EventSet - confirm against the writers.
// Usage example: `if event.Value == "blue" { return }`
Value string
// Timestamp is when the event happened; notify fills it with time.Now()
// when it is left zero.
// Usage example: `if event.Timestamp.IsZero() { return }`
Timestamp time.Time
}
// changeCallbackRegistration pairs a change callback with the ID it was
// registered under, so unregister can remove the exact callback later
// (function values cannot be compared, the ID can).
type changeCallbackRegistration struct {
// registrationID is the value OnChange obtained from the store's atomic
// callback counter for this registration.
registrationID uint64
// callback is the function notify invokes with each event.
callback func(Event)
}
// closedEventChannel returns a fresh, unbuffered channel that has already been
// closed, so a receive on it completes immediately with ok == false.
func closedEventChannel() chan Event {
	alreadyClosed := make(chan Event)
	close(alreadyClosed)
	return alreadyClosed
}
// watcherEventBufferCapacity is the buffer size of every channel created by
// Watch: Watch("config") can hold 16 pending events before the non-blocking
// sends in notify start dropping new ones.
const watcherEventBufferCapacity = 16
// Watch registers a new subscription for group and returns its receive-only
// channel, buffered to watcherEventBufferCapacity. Registering under "*" adds
// the channel to the wildcard list that notify delivers every event to.
//
// A nil receiver or a store that is already closed yields a channel that is
// closed from the start, so callers ranging over it terminate immediately.
//
// Usage example: `events := storeInstance.Watch("config")`
// Usage example: `events := storeInstance.Watch("*")`
func (storeInstance *Store) Watch(group string) <-chan Event {
	if storeInstance == nil {
		return closedEventChannel()
	}

	// storeIsClosed takes a snapshot of the closed flag under the lifecycle lock.
	storeIsClosed := func() bool {
		storeInstance.lifecycleLock.Lock()
		defer storeInstance.lifecycleLock.Unlock()
		return storeInstance.isClosed
	}

	// Cheap early exit before allocating a channel or touching the watcher lock.
	if storeIsClosed() {
		return closedEventChannel()
	}
	subscription := make(chan Event, watcherEventBufferCapacity)

	storeInstance.watcherLock.Lock()
	defer storeInstance.watcherLock.Unlock()

	// The store may have been closed while we waited for the watcher lock, so
	// check again before publishing the new channel.
	if storeIsClosed() {
		return closedEventChannel()
	}

	if storeInstance.watchers == nil {
		storeInstance.watchers = map[string][]chan Event{}
	}
	storeInstance.watchers[group] = append(storeInstance.watchers[group], subscription)
	return subscription
}
// Unwatch removes the subscription channel events from group and closes it.
//
// The call is a no-op when the receiver or events is nil, when the store is
// already closed, or when events is not registered under group, so calling it
// twice for the same channel never closes the channel a second time.
//
// Usage example: `storeInstance.Unwatch("config", events)`
func (storeInstance *Store) Unwatch(group string, events <-chan Event) {
	if storeInstance == nil || events == nil {
		return
	}
	storeInstance.lifecycleLock.Lock()
	closed := storeInstance.isClosed
	storeInstance.lifecycleLock.Unlock()
	if closed {
		return
	}

	storeInstance.watcherLock.Lock()
	defer storeInstance.watcherLock.Unlock()

	registeredEvents := storeInstance.watchers[group]
	if len(registeredEvents) == 0 {
		return
	}

	// Filter in place: remaining reuses the backing array of registeredEvents.
	remaining := registeredEvents[:0]
	removed := false
	for _, registeredChannel := range registeredEvents {
		// A bidirectional channel is directly comparable with its receive-only
		// view, so no reflection is needed to test identity.
		if registeredChannel == events {
			if !removed {
				// Closing under the watcher write lock keeps notify, which
				// sends under the read lock, from sending on a closed channel.
				close(registeredChannel)
				removed = true
			}
			continue
		}
		remaining = append(remaining, registeredChannel)
	}
	if !removed {
		return
	}

	// Clear the vacated tail so the backing array does not keep the closed
	// channel reachable after it has been unregistered.
	for i := len(remaining); i < len(registeredEvents); i++ {
		registeredEvents[i] = nil
	}

	if len(remaining) == 0 {
		delete(storeInstance.watchers, group)
		return
	}
	storeInstance.watchers[group] = remaining
}
// OnChange registers callback to be invoked by notify for every event and
// returns a function that unregisters it again. The returned function is
// idempotent and safe to call more than once.
//
// A nil callback, a nil receiver, or a store that is already closed results in
// no registration and a no-op unregister function.
//
// Usage example: `unregister := storeInstance.OnChange(func(event store.Event) { fmt.Println(event.Group, event.Key, event.Value) })`
func (storeInstance *Store) OnChange(callback func(Event)) func() {
	if callback == nil || storeInstance == nil {
		return func() {}
	}
	storeInstance.lifecycleLock.Lock()
	closed := storeInstance.isClosed
	storeInstance.lifecycleLock.Unlock()
	if closed {
		return func() {}
	}

	// Function values are not comparable, so each registration gets a unique
	// ID that unregister can look for later.
	registrationID := atomic.AddUint64(&storeInstance.nextCallbackID, 1)

	storeInstance.callbackLock.Lock()
	defer storeInstance.callbackLock.Unlock()

	// The store may have been closed while we waited for the callback lock.
	storeInstance.lifecycleLock.Lock()
	closed = storeInstance.isClosed
	storeInstance.lifecycleLock.Unlock()
	if closed {
		return func() {}
	}

	storeInstance.callbacks = append(storeInstance.callbacks, changeCallbackRegistration{
		registrationID: registrationID,
		callback:       callback,
	})

	// Return an idempotent unregister function.
	var once sync.Once
	return func() {
		once.Do(func() {
			storeInstance.callbackLock.Lock()
			defer storeInstance.callbackLock.Unlock()

			callbacks := storeInstance.callbacks
			for i := range callbacks {
				if callbacks[i].registrationID != registrationID {
					continue
				}
				last := len(callbacks) - 1
				copy(callbacks[i:], callbacks[i+1:])
				// Zero the vacated slot so the backing array does not keep
				// the removed callback (and whatever it captured) reachable.
				callbacks[last] = changeCallbackRegistration{}
				storeInstance.callbacks = callbacks[:last]
				return
			}
		})
	}
}
// notify(Event{Type: EventSet, Group: "config", Key: "colour", Value: "blue"})
// dispatches the event to matching watchers and to all callbacks. It is meant
// to be called after a successful write.
//
// Behaviour:
//   - A zero event.Timestamp is replaced with time.Now().
//   - Nothing is delivered when the receiver is nil or the store is closed.
//   - Wildcard ("*") watchers receive every event; watchers of event.Group
//     receive it as well. An event whose group is itself "*" is delivered to
//     the wildcard watchers only once.
//   - If a watcher buffer is full, the event is dropped instead of blocking
//     the writer.
//   - Callbacks are copied under a separate lock and invoked after the lock is
//     released, so they can register or unregister subscriptions without
//     deadlocking.
func (storeInstance *Store) notify(event Event) {
	if storeInstance == nil {
		return
	}
	if event.Timestamp.IsZero() {
		event.Timestamp = time.Now()
	}

	// storeIsClosed takes a snapshot of the closed flag under the lifecycle lock.
	storeIsClosed := func() bool {
		storeInstance.lifecycleLock.Lock()
		defer storeInstance.lifecycleLock.Unlock()
		return storeInstance.isClosed
	}
	if storeIsClosed() {
		return
	}

	const wildcardGroup = "*"

	// deliver offers the event to each channel without ever blocking.
	deliver := func(channels []chan Event) {
		for _, registeredChannel := range channels {
			select {
			case registeredChannel <- event:
			default: // buffer full: drop rather than stall the writer
			}
		}
	}

	storeInstance.watcherLock.RLock()
	// Re-check under the watcher lock: the store may have been closed while
	// we were waiting for it.
	if storeIsClosed() {
		storeInstance.watcherLock.RUnlock()
		return
	}
	deliver(storeInstance.watchers[wildcardGroup])
	// When the event's own group is "*", the group list IS the wildcard list;
	// skipping it avoids delivering the same event twice.
	if event.Group != wildcardGroup {
		deliver(storeInstance.watchers[event.Group])
	}
	storeInstance.watcherLock.RUnlock()

	// Snapshot the callbacks so they run without any lock held.
	storeInstance.callbackLock.RLock()
	callbacks := append([]changeCallbackRegistration(nil), storeInstance.callbacks...)
	storeInstance.callbackLock.RUnlock()

	for _, registration := range callbacks {
		registration.callback(event)
	}
}
// channelPointer returns the address of the underlying channel as an integer,
// or 0 for a nil channel. Two channel values that refer to the same channel
// yield the same result, whatever direction their static type has.
func channelPointer(eventChannel <-chan Event) uintptr {
	var address uintptr
	if eventChannel != nil {
		address = reflect.ValueOf(eventChannel).Pointer()
	}
	return address
}