diff --git a/events.go b/events.go index 7e1438a..80aca8a 100644 --- a/events.go +++ b/events.go @@ -82,11 +82,17 @@ func (storeInstance *Store) Watch(group string) <-chan Event { eventChannel := make(chan Event, watcherEventBufferCapacity) storeInstance.watchersLock.Lock() + defer storeInstance.watchersLock.Unlock() + storeInstance.closeLock.Lock() + closed = storeInstance.closed + storeInstance.closeLock.Unlock() + if closed { + return closedEventChannel() + } if storeInstance.watchers == nil { storeInstance.watchers = make(map[string][]chan Event) } storeInstance.watchers[group] = append(storeInstance.watchers[group], eventChannel) - storeInstance.watchersLock.Unlock() return eventChannel } @@ -156,8 +162,14 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() { callbackRegistration := changeCallbackRegistration{registrationID: registrationID, callback: callback} storeInstance.callbacksLock.Lock() + defer storeInstance.callbacksLock.Unlock() + storeInstance.closeLock.Lock() + closed = storeInstance.closed + storeInstance.closeLock.Unlock() + if closed { + return func() {} + } storeInstance.callbacks = append(storeInstance.callbacks, callbackRegistration) - storeInstance.callbacksLock.Unlock() // Return an idempotent unregister function. var once sync.Once @@ -194,6 +206,13 @@ func (storeInstance *Store) notify(event Event) { } storeInstance.watchersLock.RLock() + storeInstance.closeLock.Lock() + closed = storeInstance.closed + storeInstance.closeLock.Unlock() + if closed { + storeInstance.watchersLock.RUnlock() + return + } for _, registeredChannel := range storeInstance.watchers["*"] { select { case registeredChannel <- event: