package store import ( "reflect" "sync" "sync/atomic" "time" ) // Usage example: `if event.Type == store.EventSet { return }` type EventType int const ( // Usage example: `if event.Type == store.EventSet { return }` EventSet EventType = iota // Usage example: `if event.Type == store.EventDelete { return }` EventDelete // Usage example: `if event.Type == store.EventDeleteGroup { return }` EventDeleteGroup ) // Usage example: `label := store.EventDeleteGroup.String()` func (t EventType) String() string { switch t { case EventSet: return "set" case EventDelete: return "delete" case EventDeleteGroup: return "delete_group" default: return "unknown" } } // Usage example: `event := store.Event{Type: store.EventSet, Group: "config", Key: "colour", Value: "blue"}` // Usage example: `event := store.Event{Type: store.EventDeleteGroup, Group: "config"}` type Event struct { // Usage example: `if event.Type == store.EventDeleteGroup { return }` Type EventType // Usage example: `if event.Group == "config" { return }` Group string // Usage example: `if event.Key == "colour" { return }` Key string // Usage example: `if event.Value == "blue" { return }` Value string // Usage example: `if event.Timestamp.IsZero() { return }` Timestamp time.Time } // changeCallbackRegistration keeps the registration ID so unregister can remove // the exact callback later. type changeCallbackRegistration struct { registrationID uint64 callback func(Event) } func closedEventChannel() chan Event { eventChannel := make(chan Event) close(eventChannel) return eventChannel } // Watch("config") can hold 16 pending events before non-blocking sends start // dropping new ones. const watcherEventBufferCapacity = 16 // Usage example: `events := storeInstance.Watch("config")` // Usage example: `events := storeInstance.Watch("*")` func (storeInstance *Store) Watch(group string) <-chan Event { if storeInstance == nil { return closedEventChannel() } storeInstance.lifecycleLock.Lock() closed := storeInstance.isClosed storeInstance.lifecycleLock.Unlock() if closed { return closedEventChannel() } eventChannel := make(chan Event, watcherEventBufferCapacity) storeInstance.watcherLock.Lock() defer storeInstance.watcherLock.Unlock() storeInstance.lifecycleLock.Lock() closed = storeInstance.isClosed storeInstance.lifecycleLock.Unlock() if closed { return closedEventChannel() } if storeInstance.watchers == nil { storeInstance.watchers = make(map[string][]chan Event) } storeInstance.watchers[group] = append(storeInstance.watchers[group], eventChannel) return eventChannel } // Usage example: `storeInstance.Unwatch("config", events)` func (storeInstance *Store) Unwatch(group string, events <-chan Event) { if storeInstance == nil || events == nil { return } storeInstance.lifecycleLock.Lock() closed := storeInstance.isClosed storeInstance.lifecycleLock.Unlock() if closed { return } storeInstance.watcherLock.Lock() defer storeInstance.watcherLock.Unlock() registeredEvents := storeInstance.watchers[group] if len(registeredEvents) == 0 { return } eventsPointer := channelPointer(events) nextRegisteredEvents := registeredEvents[:0] removed := false for _, registeredChannel := range registeredEvents { if channelPointer(registeredChannel) == eventsPointer { if !removed { close(registeredChannel) removed = true } continue } nextRegisteredEvents = append(nextRegisteredEvents, registeredChannel) } if !removed { return } if len(nextRegisteredEvents) == 0 { delete(storeInstance.watchers, group) return } storeInstance.watchers[group] = nextRegisteredEvents } // Usage example: `unregister := storeInstance.OnChange(func(event store.Event) { fmt.Println(event.Group, event.Key, event.Value) })` func (storeInstance *Store) OnChange(callback func(Event)) func() { if callback == nil { return func() {} } if storeInstance == nil { return func() {} } storeInstance.lifecycleLock.Lock() closed := storeInstance.isClosed storeInstance.lifecycleLock.Unlock() if closed { return func() {} } registrationID := atomic.AddUint64(&storeInstance.nextCallbackID, 1) callbackRegistration := changeCallbackRegistration{registrationID: registrationID, callback: callback} storeInstance.callbackLock.Lock() defer storeInstance.callbackLock.Unlock() storeInstance.lifecycleLock.Lock() closed = storeInstance.isClosed storeInstance.lifecycleLock.Unlock() if closed { return func() {} } storeInstance.callbacks = append(storeInstance.callbacks, callbackRegistration) // Return an idempotent unregister function. var once sync.Once return func() { once.Do(func() { storeInstance.callbackLock.Lock() defer storeInstance.callbackLock.Unlock() for i := range storeInstance.callbacks { if storeInstance.callbacks[i].registrationID == registrationID { storeInstance.callbacks = append(storeInstance.callbacks[:i], storeInstance.callbacks[i+1:]...) return } } }) } } // notify(Event{Type: EventSet, Group: "config", Key: "colour", Value: "blue"}) // dispatches matching watchers and callbacks after a successful write. If a // watcher buffer is full, the event is dropped instead of blocking the writer. // Callbacks are copied under a separate lock and invoked after the lock is // released, so they can register or unregister subscriptions without // deadlocking. func (storeInstance *Store) notify(event Event) { if storeInstance == nil { return } if event.Timestamp.IsZero() { event.Timestamp = time.Now() } storeInstance.lifecycleLock.Lock() closed := storeInstance.isClosed storeInstance.lifecycleLock.Unlock() if closed { return } storeInstance.watcherLock.RLock() storeInstance.lifecycleLock.Lock() closed = storeInstance.isClosed storeInstance.lifecycleLock.Unlock() if closed { storeInstance.watcherLock.RUnlock() return } for _, registeredChannel := range storeInstance.watchers["*"] { select { case registeredChannel <- event: default: } } for _, registeredChannel := range storeInstance.watchers[event.Group] { select { case registeredChannel <- event: default: } } storeInstance.watcherLock.RUnlock() storeInstance.callbackLock.RLock() callbacks := append([]changeCallbackRegistration(nil), storeInstance.callbacks...) storeInstance.callbackLock.RUnlock() for _, callback := range callbacks { callback.callback(event) } } func channelPointer(eventChannel <-chan Event) uintptr { if eventChannel == nil { return 0 } return reflect.ValueOf(eventChannel).Pointer() }