go-store/events.go

248 lines
6.5 KiB
Go
Raw Permalink Normal View History

package store
import (
"reflect"
"sync"
"sync/atomic"
"time"
)
// Usage example: `if event.Type == store.EventSet { return }`
type EventType int
const (
// Usage example: `if event.Type == store.EventSet { return }`
EventSet EventType = iota
// Usage example: `if event.Type == store.EventDelete { return }`
EventDelete
// Usage example: `if event.Type == store.EventDeleteGroup { return }`
EventDeleteGroup
)
// Usage example: `label := store.EventDeleteGroup.String()`
func (t EventType) String() string {
switch t {
case EventSet:
return "set"
case EventDelete:
return "delete"
case EventDeleteGroup:
return "delete_group"
default:
return "unknown"
}
}
// Usage example: `event := store.Event{Type: store.EventSet, Group: "config", Key: "colour", Value: "blue"}`
// Usage example: `event := store.Event{Type: store.EventDeleteGroup, Group: "config"}`
type Event struct {
// Usage example: `if event.Type == store.EventDeleteGroup { return }`
Type EventType
// Usage example: `if event.Group == "config" { return }`
Group string
// Usage example: `if event.Key == "colour" { return }`
Key string
// Usage example: `if event.Value == "blue" { return }`
Value string
// Usage example: `if event.Timestamp.IsZero() { return }`
Timestamp time.Time
}
// changeCallbackRegistration keeps the registration ID so unregister can remove
// the exact callback later.
type changeCallbackRegistration struct {
registrationID uint64
callback func(Event)
}
func closedEventChannel() chan Event {
eventChannel := make(chan Event)
close(eventChannel)
return eventChannel
}
// Watch("config") can hold 16 pending events before non-blocking sends start
// dropping new ones.
const watcherEventBufferCapacity = 16
// Usage example: `events := storeInstance.Watch("config")`
// Usage example: `events := storeInstance.Watch("*")`
func (storeInstance *Store) Watch(group string) <-chan Event {
if storeInstance == nil {
return closedEventChannel()
}
storeInstance.lifecycleLock.Lock()
closed := storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return closedEventChannel()
}
eventChannel := make(chan Event, watcherEventBufferCapacity)
storeInstance.watcherLock.Lock()
defer storeInstance.watcherLock.Unlock()
storeInstance.lifecycleLock.Lock()
closed = storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return closedEventChannel()
}
if storeInstance.watchers == nil {
storeInstance.watchers = make(map[string][]chan Event)
}
storeInstance.watchers[group] = append(storeInstance.watchers[group], eventChannel)
return eventChannel
}
// Usage example: `storeInstance.Unwatch("config", events)`
func (storeInstance *Store) Unwatch(group string, events <-chan Event) {
if storeInstance == nil || events == nil {
return
}
storeInstance.lifecycleLock.Lock()
closed := storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return
}
storeInstance.watcherLock.Lock()
defer storeInstance.watcherLock.Unlock()
registeredEvents := storeInstance.watchers[group]
if len(registeredEvents) == 0 {
return
}
eventsPointer := channelPointer(events)
nextRegisteredEvents := registeredEvents[:0]
removed := false
for _, registeredChannel := range registeredEvents {
if channelPointer(registeredChannel) == eventsPointer {
if !removed {
close(registeredChannel)
removed = true
}
continue
}
nextRegisteredEvents = append(nextRegisteredEvents, registeredChannel)
}
if !removed {
return
}
if len(nextRegisteredEvents) == 0 {
delete(storeInstance.watchers, group)
return
}
storeInstance.watchers[group] = nextRegisteredEvents
}
// Usage example: `unregister := storeInstance.OnChange(func(event store.Event) { fmt.Println(event.Group, event.Key, event.Value) })`
func (storeInstance *Store) OnChange(callback func(Event)) func() {
if callback == nil {
return func() {}
}
if storeInstance == nil {
return func() {}
}
storeInstance.lifecycleLock.Lock()
closed := storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return func() {}
}
registrationID := atomic.AddUint64(&storeInstance.nextCallbackID, 1)
callbackRegistration := changeCallbackRegistration{registrationID: registrationID, callback: callback}
storeInstance.callbackLock.Lock()
defer storeInstance.callbackLock.Unlock()
storeInstance.lifecycleLock.Lock()
closed = storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return func() {}
}
storeInstance.callbacks = append(storeInstance.callbacks, callbackRegistration)
// Return an idempotent unregister function.
var once sync.Once
return func() {
once.Do(func() {
storeInstance.callbackLock.Lock()
defer storeInstance.callbackLock.Unlock()
for i := range storeInstance.callbacks {
if storeInstance.callbacks[i].registrationID == registrationID {
storeInstance.callbacks = append(storeInstance.callbacks[:i], storeInstance.callbacks[i+1:]...)
return
}
}
})
}
}
// notify(Event{Type: EventSet, Group: "config", Key: "colour", Value: "blue"})
// dispatches matching watchers and callbacks after a successful write. If a
// watcher buffer is full, the event is dropped instead of blocking the writer.
// Callbacks are copied under a separate lock and invoked after the lock is
// released, so they can register or unregister subscriptions without
// deadlocking.
func (storeInstance *Store) notify(event Event) {
if storeInstance == nil {
return
}
if event.Timestamp.IsZero() {
event.Timestamp = time.Now()
}
storeInstance.lifecycleLock.Lock()
closed := storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return
}
storeInstance.watcherLock.RLock()
storeInstance.lifecycleLock.Lock()
closed = storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
storeInstance.watcherLock.RUnlock()
return
}
for _, registeredChannel := range storeInstance.watchers["*"] {
select {
case registeredChannel <- event:
default:
}
}
for _, registeredChannel := range storeInstance.watchers[event.Group] {
select {
case registeredChannel <- event:
default:
}
}
storeInstance.watcherLock.RUnlock()
storeInstance.callbackLock.RLock()
callbacks := append([]changeCallbackRegistration(nil), storeInstance.callbacks...)
storeInstance.callbackLock.RUnlock()
for _, callback := range callbacks {
callback.callback(event)
}
}
func channelPointer(eventChannel <-chan Event) uintptr {
if eventChannel == nil {
return 0
}
return reflect.ValueOf(eventChannel).Pointer()
}