diff --git a/events.go b/events.go index 845626c..5cf6c37 100644 --- a/events.go +++ b/events.go @@ -72,20 +72,20 @@ func (storeInstance *Store) Watch(group string) <-chan Event { return closedEventChannel() } - storeInstance.closeLock.Lock() - closed := storeInstance.closed - storeInstance.closeLock.Unlock() + storeInstance.lifecycleLock.Lock() + closed := storeInstance.isClosed + storeInstance.lifecycleLock.Unlock() if closed { return closedEventChannel() } eventChannel := make(chan Event, watcherEventBufferCapacity) - storeInstance.watchersLock.Lock() - defer storeInstance.watchersLock.Unlock() - storeInstance.closeLock.Lock() - closed = storeInstance.closed - storeInstance.closeLock.Unlock() + storeInstance.watcherLock.Lock() + defer storeInstance.watcherLock.Unlock() + storeInstance.lifecycleLock.Lock() + closed = storeInstance.isClosed + storeInstance.lifecycleLock.Unlock() if closed { return closedEventChannel() } @@ -103,15 +103,15 @@ func (storeInstance *Store) Unwatch(group string, events <-chan Event) { return } - storeInstance.closeLock.Lock() - closed := storeInstance.closed - storeInstance.closeLock.Unlock() + storeInstance.lifecycleLock.Lock() + closed := storeInstance.isClosed + storeInstance.lifecycleLock.Unlock() if closed { return } - storeInstance.watchersLock.Lock() - defer storeInstance.watchersLock.Unlock() + storeInstance.watcherLock.Lock() + defer storeInstance.watcherLock.Unlock() registeredEvents := storeInstance.watchers[group] if len(registeredEvents) == 0 { @@ -151,21 +151,21 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() { return func() {} } - storeInstance.closeLock.Lock() - closed := storeInstance.closed - storeInstance.closeLock.Unlock() + storeInstance.lifecycleLock.Lock() + closed := storeInstance.isClosed + storeInstance.lifecycleLock.Unlock() if closed { return func() {} } - registrationID := atomic.AddUint64(&storeInstance.nextCallbackRegistrationID, 1) + registrationID := atomic.AddUint64(&storeInstance.nextCallbackID, 1) callbackRegistration := changeCallbackRegistration{registrationID: registrationID, callback: callback} - storeInstance.callbacksLock.Lock() - defer storeInstance.callbacksLock.Unlock() - storeInstance.closeLock.Lock() - closed = storeInstance.closed - storeInstance.closeLock.Unlock() + storeInstance.callbackLock.Lock() + defer storeInstance.callbackLock.Unlock() + storeInstance.lifecycleLock.Lock() + closed = storeInstance.isClosed + storeInstance.lifecycleLock.Unlock() if closed { return func() {} } @@ -175,8 +175,8 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() { var once sync.Once return func() { once.Do(func() { - storeInstance.callbacksLock.Lock() - defer storeInstance.callbacksLock.Unlock() + storeInstance.callbackLock.Lock() + defer storeInstance.callbackLock.Unlock() for i := range storeInstance.callbacks { if storeInstance.callbacks[i].registrationID == registrationID { storeInstance.callbacks = append(storeInstance.callbacks[:i], storeInstance.callbacks[i+1:]...) @@ -201,19 +201,19 @@ func (storeInstance *Store) notify(event Event) { event.Timestamp = time.Now() } - storeInstance.closeLock.Lock() - closed := storeInstance.closed - storeInstance.closeLock.Unlock() + storeInstance.lifecycleLock.Lock() + closed := storeInstance.isClosed + storeInstance.lifecycleLock.Unlock() if closed { return } - storeInstance.watchersLock.RLock() - storeInstance.closeLock.Lock() - closed = storeInstance.closed - storeInstance.closeLock.Unlock() + storeInstance.watcherLock.RLock() + storeInstance.lifecycleLock.Lock() + closed = storeInstance.isClosed + storeInstance.lifecycleLock.Unlock() if closed { - storeInstance.watchersLock.RUnlock() + storeInstance.watcherLock.RUnlock() return } for _, registeredChannel := range storeInstance.watchers["*"] { @@ -228,11 +228,11 @@ func (storeInstance *Store) notify(event Event) { default: } } - storeInstance.watchersLock.RUnlock() + storeInstance.watcherLock.RUnlock() - storeInstance.callbacksLock.RLock() + storeInstance.callbackLock.RLock() callbacks := append([]changeCallbackRegistration(nil), storeInstance.callbacks...) - storeInstance.callbacksLock.RUnlock() + storeInstance.callbackLock.RUnlock() for _, callback := range callbacks { callback.callback(event) diff --git a/scope.go b/scope.go index 9017dee..754ad17 100644 --- a/scope.go +++ b/scope.go @@ -71,8 +71,8 @@ type ScopedStore struct { // Usage example: `scopedStore.MaxGroups = 10` MaxGroups int - watcherLock sync.Mutex - watcherBridges map[uintptr]scopedWatcherBridge + watcherBridgeLock sync.Mutex + watcherBridges map[uintptr]scopedWatcherBridge } type scopedWatcherBridge struct { @@ -321,7 +321,7 @@ func (scopedStore *ScopedStore) Watch(group string) <-chan Event { done := make(chan struct{}) localEventsPointer := channelPointer(localEvents) - scopedStore.watcherLock.Lock() + scopedStore.watcherBridgeLock.Lock() if scopedStore.watcherBridges == nil { scopedStore.watcherBridges = make(map[uintptr]scopedWatcherBridge) } @@ -330,7 +330,7 @@ func (scopedStore *ScopedStore) Watch(group string) <-chan Event { sourceEvents: sourceEvents, done: done, } - scopedStore.watcherLock.Unlock() + scopedStore.watcherBridgeLock.Unlock() go func() { defer close(localEvents) @@ -368,12 +368,12 @@ func (scopedStore *ScopedStore) Unwatch(group string, events <-chan Event) { return } - scopedStore.watcherLock.Lock() + scopedStore.watcherBridgeLock.Lock() watcherBridge, ok := scopedStore.watcherBridges[channelPointer(events)] if ok { delete(scopedStore.watcherBridges, channelPointer(events)) } - scopedStore.watcherLock.Unlock() + scopedStore.watcherBridgeLock.Unlock() if !ok { return @@ -388,9 +388,9 @@ func (scopedStore *ScopedStore) removeWatcherBridge(pointer uintptr) { return } - scopedStore.watcherLock.Lock() + scopedStore.watcherBridgeLock.Lock() delete(scopedStore.watcherBridges, pointer) - scopedStore.watcherLock.Unlock() + scopedStore.watcherBridgeLock.Unlock() } func (scopedStore *ScopedStore) localiseWatchedEvent(event Event) (Event, bool) { diff --git a/store.go b/store.go index da76b82..9f13de3 100644 --- a/store.go +++ b/store.go @@ -141,18 +141,18 @@ type Store struct { purgeWaitGroup sync.WaitGroup purgeInterval time.Duration // interval between background purge cycles journalConfiguration JournalConfiguration - closeLock sync.Mutex - closed bool + lifecycleLock sync.Mutex + isClosed bool // Event dispatch state. - watchers map[string][]chan Event - callbacks []changeCallbackRegistration - watchersLock sync.RWMutex // protects watcher registration and dispatch - callbacksLock sync.RWMutex // protects callback registration and dispatch - nextCallbackRegistrationID uint64 // monotonic ID for callback registrations + watchers map[string][]chan Event + callbacks []changeCallbackRegistration + watcherLock sync.RWMutex // protects watcher registration and dispatch + callbackLock sync.RWMutex // protects callback registration and dispatch + nextCallbackID uint64 // monotonic ID for callback registrations - orphanWorkspacesLock sync.Mutex - orphanWorkspaces []*Workspace + orphanWorkspaceLock sync.Mutex + cachedOrphanWorkspaces []*Workspace } func (storeInstance *Store) ensureReady(operation string) error { @@ -163,9 +163,9 @@ func (storeInstance *Store) ensureReady(operation string) error { return core.E(operation, "store is not initialised", nil) } - storeInstance.closeLock.Lock() - closed := storeInstance.closed - storeInstance.closeLock.Unlock() + storeInstance.lifecycleLock.Lock() + closed := storeInstance.isClosed + storeInstance.lifecycleLock.Unlock() if closed { return core.E(operation, "store is closed", nil) } @@ -250,9 +250,9 @@ func (storeInstance *Store) IsClosed() bool { return true } - storeInstance.closeLock.Lock() - closed := storeInstance.closed - storeInstance.closeLock.Unlock() + storeInstance.lifecycleLock.Lock() + closed := storeInstance.isClosed + storeInstance.lifecycleLock.Unlock() return closed } @@ -294,7 +294,7 @@ func openConfiguredStore(operation string, storeConfig StoreConfig) (*Store, err // New() performs a non-destructive orphan scan so callers can discover // leftover workspaces via RecoverOrphans(). - storeInstance.orphanWorkspaces = discoverOrphanWorkspaces(storeInstance.workspaceStateDirectoryPath(), storeInstance) + storeInstance.cachedOrphanWorkspaces = discoverOrphanWorkspaces(storeInstance.workspaceStateDirectoryPath(), storeInstance) storeInstance.startBackgroundPurge() return storeInstance, nil } @@ -358,41 +358,41 @@ func (storeInstance *Store) Close() error { return nil } - storeInstance.closeLock.Lock() - if storeInstance.closed { - storeInstance.closeLock.Unlock() + storeInstance.lifecycleLock.Lock() + if storeInstance.isClosed { + storeInstance.lifecycleLock.Unlock() return nil } - storeInstance.closed = true - storeInstance.closeLock.Unlock() + storeInstance.isClosed = true + storeInstance.lifecycleLock.Unlock() if storeInstance.cancelPurge != nil { storeInstance.cancelPurge() } storeInstance.purgeWaitGroup.Wait() - storeInstance.watchersLock.Lock() + storeInstance.watcherLock.Lock() for groupName, registeredEvents := range storeInstance.watchers { for _, registeredEventChannel := range registeredEvents { close(registeredEventChannel) } delete(storeInstance.watchers, groupName) } - storeInstance.watchersLock.Unlock() + storeInstance.watcherLock.Unlock() - storeInstance.callbacksLock.Lock() + storeInstance.callbackLock.Lock() storeInstance.callbacks = nil - storeInstance.callbacksLock.Unlock() + storeInstance.callbackLock.Unlock() - storeInstance.orphanWorkspacesLock.Lock() + storeInstance.orphanWorkspaceLock.Lock() var orphanCleanupErr error - for _, orphanWorkspace := range storeInstance.orphanWorkspaces { + for _, orphanWorkspace := range storeInstance.cachedOrphanWorkspaces { if err := orphanWorkspace.closeWithoutRemovingFiles(); err != nil && orphanCleanupErr == nil { orphanCleanupErr = err } } - storeInstance.orphanWorkspaces = nil - storeInstance.orphanWorkspacesLock.Unlock() + storeInstance.cachedOrphanWorkspaces = nil + storeInstance.orphanWorkspaceLock.Unlock() if storeInstance.sqliteDatabase == nil { return orphanCleanupErr diff --git a/workspace.go b/workspace.go index 366350c..3cab19a 100644 --- a/workspace.go +++ b/workspace.go @@ -40,15 +40,15 @@ var defaultWorkspaceStateDirectory = ".core/state/" // // Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session-2026-03-30"); if err != nil { return }; defer workspace.Discard(); _ = workspace.Put("like", map[string]any{"user": "@alice"})` type Workspace struct { - name string - store *Store - sqliteDatabase *sql.DB - databasePath string - filesystem *core.Fs - orphanAggregate map[string]any + name string + store *Store + sqliteDatabase *sql.DB + databasePath string + filesystem *core.Fs + cachedOrphanAggregate map[string]any - closeLock sync.Mutex - closed bool + lifecycleLock sync.Mutex + isClosed bool } // Usage example: `workspaceName := workspace.Name(); fmt.Println(workspaceName)` @@ -93,9 +93,9 @@ func (workspace *Workspace) ensureReady(operation string) error { return err } - workspace.closeLock.Lock() - closed := workspace.closed - workspace.closeLock.Unlock() + workspace.lifecycleLock.Lock() + closed := workspace.isClosed + workspace.lifecycleLock.Unlock() if closed { return core.E(operation, "workspace is closed", nil) } @@ -203,7 +203,7 @@ func loadRecoveredWorkspaces(stateDirectory string, store *Store) []*Workspace { databasePath: databasePath, filesystem: filesystem, } - orphanWorkspace.orphanAggregate = orphanWorkspace.captureAggregateSnapshot() + orphanWorkspace.cachedOrphanAggregate = orphanWorkspace.captureAggregateSnapshot() orphanWorkspaces = append(orphanWorkspaces, orphanWorkspace) } return orphanWorkspaces @@ -234,10 +234,10 @@ func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace { stateDirectory = normaliseWorkspaceStateDirectory(stateDirectory) if stateDirectory == storeInstance.workspaceStateDirectoryPath() { - storeInstance.orphanWorkspacesLock.Lock() - cachedWorkspaces := slices.Clone(storeInstance.orphanWorkspaces) - storeInstance.orphanWorkspaces = nil - storeInstance.orphanWorkspacesLock.Unlock() + storeInstance.orphanWorkspaceLock.Lock() + cachedWorkspaces := slices.Clone(storeInstance.cachedOrphanWorkspaces) + storeInstance.cachedOrphanWorkspaces = nil + storeInstance.orphanWorkspaceLock.Unlock() if len(cachedWorkspaces) > 0 { return cachedWorkspaces } @@ -363,14 +363,14 @@ func (workspace *Workspace) captureAggregateSnapshot() map[string]any { } func (workspace *Workspace) aggregateFallback() map[string]any { - if workspace == nil || workspace.orphanAggregate == nil { + if workspace == nil || workspace.cachedOrphanAggregate == nil { return map[string]any{} } - return maps.Clone(workspace.orphanAggregate) + return maps.Clone(workspace.cachedOrphanAggregate) } func (workspace *Workspace) shouldUseOrphanAggregate() bool { - if workspace == nil || workspace.orphanAggregate == nil { + if workspace == nil || workspace.cachedOrphanAggregate == nil { return false } if workspace.filesystem == nil || workspace.databasePath == "" { @@ -423,12 +423,12 @@ func (workspace *Workspace) closeAndCleanup(removeFiles bool) error { return nil } - workspace.closeLock.Lock() - alreadyClosed := workspace.closed + workspace.lifecycleLock.Lock() + alreadyClosed := workspace.isClosed if !alreadyClosed { - workspace.closed = true + workspace.isClosed = true } - workspace.closeLock.Unlock() + workspace.lifecycleLock.Unlock() if !alreadyClosed { if err := workspace.sqliteDatabase.Close(); err != nil {