From aae444ac3b8176563de4f4487ba392ed54505ed1 Mon Sep 17 00:00:00 2001 From: Virgil Date: Fri, 3 Apr 2026 08:11:24 +0000 Subject: [PATCH] fix(store): harden event shutdown Co-Authored-By: Virgil --- events.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/events.go b/events.go index 7e1438a..80aca8a 100644 --- a/events.go +++ b/events.go @@ -82,11 +82,17 @@ func (storeInstance *Store) Watch(group string) <-chan Event { eventChannel := make(chan Event, watcherEventBufferCapacity) storeInstance.watchersLock.Lock() + defer storeInstance.watchersLock.Unlock() + storeInstance.closeLock.Lock() + closed = storeInstance.closed + storeInstance.closeLock.Unlock() + if closed { + return closedEventChannel() + } if storeInstance.watchers == nil { storeInstance.watchers = make(map[string][]chan Event) } storeInstance.watchers[group] = append(storeInstance.watchers[group], eventChannel) - storeInstance.watchersLock.Unlock() return eventChannel } @@ -156,8 +162,14 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() { callbackRegistration := changeCallbackRegistration{registrationID: registrationID, callback: callback} storeInstance.callbacksLock.Lock() + defer storeInstance.callbacksLock.Unlock() + storeInstance.closeLock.Lock() + closed = storeInstance.closed + storeInstance.closeLock.Unlock() + if closed { + return func() {} + } storeInstance.callbacks = append(storeInstance.callbacks, callbackRegistration) - storeInstance.callbacksLock.Unlock() // Return an idempotent unregister function. var once sync.Once @@ -194,6 +206,13 @@ func (storeInstance *Store) notify(event Event) { } storeInstance.watchersLock.RLock() + storeInstance.closeLock.Lock() + closed = storeInstance.closed + storeInstance.closeLock.Unlock() + if closed { + storeInstance.watchersLock.RUnlock() + return + } for _, registeredChannel := range storeInstance.watchers["*"] { select { case registeredChannel <- event: