[agent/codex:gpt-5.4-mini] Read docs/RFC-STORE.md and docs/specs/core/go/RFC.md fully. ... #179

Merged
Virgil merged 1 commit from agent/read-docs-rfc-store-md-and-docs-specs-co into dev 2026-04-04 21:09:33 +00:00
4 changed files with 95 additions and 95 deletions

View file

@ -72,20 +72,20 @@ func (storeInstance *Store) Watch(group string) <-chan Event {
return closedEventChannel()
}
storeInstance.closeLock.Lock()
closed := storeInstance.closed
storeInstance.closeLock.Unlock()
storeInstance.lifecycleLock.Lock()
closed := storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return closedEventChannel()
}
eventChannel := make(chan Event, watcherEventBufferCapacity)
storeInstance.watchersLock.Lock()
defer storeInstance.watchersLock.Unlock()
storeInstance.closeLock.Lock()
closed = storeInstance.closed
storeInstance.closeLock.Unlock()
storeInstance.watcherLock.Lock()
defer storeInstance.watcherLock.Unlock()
storeInstance.lifecycleLock.Lock()
closed = storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return closedEventChannel()
}
@ -103,15 +103,15 @@ func (storeInstance *Store) Unwatch(group string, events <-chan Event) {
return
}
storeInstance.closeLock.Lock()
closed := storeInstance.closed
storeInstance.closeLock.Unlock()
storeInstance.lifecycleLock.Lock()
closed := storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return
}
storeInstance.watchersLock.Lock()
defer storeInstance.watchersLock.Unlock()
storeInstance.watcherLock.Lock()
defer storeInstance.watcherLock.Unlock()
registeredEvents := storeInstance.watchers[group]
if len(registeredEvents) == 0 {
@ -151,21 +151,21 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() {
return func() {}
}
storeInstance.closeLock.Lock()
closed := storeInstance.closed
storeInstance.closeLock.Unlock()
storeInstance.lifecycleLock.Lock()
closed := storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return func() {}
}
registrationID := atomic.AddUint64(&storeInstance.nextCallbackRegistrationID, 1)
registrationID := atomic.AddUint64(&storeInstance.nextCallbackID, 1)
callbackRegistration := changeCallbackRegistration{registrationID: registrationID, callback: callback}
storeInstance.callbacksLock.Lock()
defer storeInstance.callbacksLock.Unlock()
storeInstance.closeLock.Lock()
closed = storeInstance.closed
storeInstance.closeLock.Unlock()
storeInstance.callbackLock.Lock()
defer storeInstance.callbackLock.Unlock()
storeInstance.lifecycleLock.Lock()
closed = storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return func() {}
}
@ -175,8 +175,8 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() {
var once sync.Once
return func() {
once.Do(func() {
storeInstance.callbacksLock.Lock()
defer storeInstance.callbacksLock.Unlock()
storeInstance.callbackLock.Lock()
defer storeInstance.callbackLock.Unlock()
for i := range storeInstance.callbacks {
if storeInstance.callbacks[i].registrationID == registrationID {
storeInstance.callbacks = append(storeInstance.callbacks[:i], storeInstance.callbacks[i+1:]...)
@ -201,19 +201,19 @@ func (storeInstance *Store) notify(event Event) {
event.Timestamp = time.Now()
}
storeInstance.closeLock.Lock()
closed := storeInstance.closed
storeInstance.closeLock.Unlock()
storeInstance.lifecycleLock.Lock()
closed := storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return
}
storeInstance.watchersLock.RLock()
storeInstance.closeLock.Lock()
closed = storeInstance.closed
storeInstance.closeLock.Unlock()
storeInstance.watcherLock.RLock()
storeInstance.lifecycleLock.Lock()
closed = storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
storeInstance.watchersLock.RUnlock()
storeInstance.watcherLock.RUnlock()
return
}
for _, registeredChannel := range storeInstance.watchers["*"] {
@ -228,11 +228,11 @@ func (storeInstance *Store) notify(event Event) {
default:
}
}
storeInstance.watchersLock.RUnlock()
storeInstance.watcherLock.RUnlock()
storeInstance.callbacksLock.RLock()
storeInstance.callbackLock.RLock()
callbacks := append([]changeCallbackRegistration(nil), storeInstance.callbacks...)
storeInstance.callbacksLock.RUnlock()
storeInstance.callbackLock.RUnlock()
for _, callback := range callbacks {
callback.callback(event)

View file

@ -71,8 +71,8 @@ type ScopedStore struct {
// Usage example: `scopedStore.MaxGroups = 10`
MaxGroups int
watcherLock sync.Mutex
watcherBridges map[uintptr]scopedWatcherBridge
watcherBridgeLock sync.Mutex
watcherBridges map[uintptr]scopedWatcherBridge
}
type scopedWatcherBridge struct {
@ -321,7 +321,7 @@ func (scopedStore *ScopedStore) Watch(group string) <-chan Event {
done := make(chan struct{})
localEventsPointer := channelPointer(localEvents)
scopedStore.watcherLock.Lock()
scopedStore.watcherBridgeLock.Lock()
if scopedStore.watcherBridges == nil {
scopedStore.watcherBridges = make(map[uintptr]scopedWatcherBridge)
}
@ -330,7 +330,7 @@ func (scopedStore *ScopedStore) Watch(group string) <-chan Event {
sourceEvents: sourceEvents,
done: done,
}
scopedStore.watcherLock.Unlock()
scopedStore.watcherBridgeLock.Unlock()
go func() {
defer close(localEvents)
@ -368,12 +368,12 @@ func (scopedStore *ScopedStore) Unwatch(group string, events <-chan Event) {
return
}
scopedStore.watcherLock.Lock()
scopedStore.watcherBridgeLock.Lock()
watcherBridge, ok := scopedStore.watcherBridges[channelPointer(events)]
if ok {
delete(scopedStore.watcherBridges, channelPointer(events))
}
scopedStore.watcherLock.Unlock()
scopedStore.watcherBridgeLock.Unlock()
if !ok {
return
@ -388,9 +388,9 @@ func (scopedStore *ScopedStore) removeWatcherBridge(pointer uintptr) {
return
}
scopedStore.watcherLock.Lock()
scopedStore.watcherBridgeLock.Lock()
delete(scopedStore.watcherBridges, pointer)
scopedStore.watcherLock.Unlock()
scopedStore.watcherBridgeLock.Unlock()
}
func (scopedStore *ScopedStore) localiseWatchedEvent(event Event) (Event, bool) {

View file

@ -141,18 +141,18 @@ type Store struct {
purgeWaitGroup sync.WaitGroup
purgeInterval time.Duration // interval between background purge cycles
journalConfiguration JournalConfiguration
closeLock sync.Mutex
closed bool
lifecycleLock sync.Mutex
isClosed bool
// Event dispatch state.
watchers map[string][]chan Event
callbacks []changeCallbackRegistration
watchersLock sync.RWMutex // protects watcher registration and dispatch
callbacksLock sync.RWMutex // protects callback registration and dispatch
nextCallbackRegistrationID uint64 // monotonic ID for callback registrations
watchers map[string][]chan Event
callbacks []changeCallbackRegistration
watcherLock sync.RWMutex // protects watcher registration and dispatch
callbackLock sync.RWMutex // protects callback registration and dispatch
nextCallbackID uint64 // monotonic ID for callback registrations
orphanWorkspacesLock sync.Mutex
orphanWorkspaces []*Workspace
orphanWorkspaceLock sync.Mutex
cachedOrphanWorkspaces []*Workspace
}
func (storeInstance *Store) ensureReady(operation string) error {
@ -163,9 +163,9 @@ func (storeInstance *Store) ensureReady(operation string) error {
return core.E(operation, "store is not initialised", nil)
}
storeInstance.closeLock.Lock()
closed := storeInstance.closed
storeInstance.closeLock.Unlock()
storeInstance.lifecycleLock.Lock()
closed := storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
if closed {
return core.E(operation, "store is closed", nil)
}
@ -250,9 +250,9 @@ func (storeInstance *Store) IsClosed() bool {
return true
}
storeInstance.closeLock.Lock()
closed := storeInstance.closed
storeInstance.closeLock.Unlock()
storeInstance.lifecycleLock.Lock()
closed := storeInstance.isClosed
storeInstance.lifecycleLock.Unlock()
return closed
}
@ -294,7 +294,7 @@ func openConfiguredStore(operation string, storeConfig StoreConfig) (*Store, err
// New() performs a non-destructive orphan scan so callers can discover
// leftover workspaces via RecoverOrphans().
storeInstance.orphanWorkspaces = discoverOrphanWorkspaces(storeInstance.workspaceStateDirectoryPath(), storeInstance)
storeInstance.cachedOrphanWorkspaces = discoverOrphanWorkspaces(storeInstance.workspaceStateDirectoryPath(), storeInstance)
storeInstance.startBackgroundPurge()
return storeInstance, nil
}
@ -358,41 +358,41 @@ func (storeInstance *Store) Close() error {
return nil
}
storeInstance.closeLock.Lock()
if storeInstance.closed {
storeInstance.closeLock.Unlock()
storeInstance.lifecycleLock.Lock()
if storeInstance.isClosed {
storeInstance.lifecycleLock.Unlock()
return nil
}
storeInstance.closed = true
storeInstance.closeLock.Unlock()
storeInstance.isClosed = true
storeInstance.lifecycleLock.Unlock()
if storeInstance.cancelPurge != nil {
storeInstance.cancelPurge()
}
storeInstance.purgeWaitGroup.Wait()
storeInstance.watchersLock.Lock()
storeInstance.watcherLock.Lock()
for groupName, registeredEvents := range storeInstance.watchers {
for _, registeredEventChannel := range registeredEvents {
close(registeredEventChannel)
}
delete(storeInstance.watchers, groupName)
}
storeInstance.watchersLock.Unlock()
storeInstance.watcherLock.Unlock()
storeInstance.callbacksLock.Lock()
storeInstance.callbackLock.Lock()
storeInstance.callbacks = nil
storeInstance.callbacksLock.Unlock()
storeInstance.callbackLock.Unlock()
storeInstance.orphanWorkspacesLock.Lock()
storeInstance.orphanWorkspaceLock.Lock()
var orphanCleanupErr error
for _, orphanWorkspace := range storeInstance.orphanWorkspaces {
for _, orphanWorkspace := range storeInstance.cachedOrphanWorkspaces {
if err := orphanWorkspace.closeWithoutRemovingFiles(); err != nil && orphanCleanupErr == nil {
orphanCleanupErr = err
}
}
storeInstance.orphanWorkspaces = nil
storeInstance.orphanWorkspacesLock.Unlock()
storeInstance.cachedOrphanWorkspaces = nil
storeInstance.orphanWorkspaceLock.Unlock()
if storeInstance.sqliteDatabase == nil {
return orphanCleanupErr

View file

@ -40,15 +40,15 @@ var defaultWorkspaceStateDirectory = ".core/state/"
//
// Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session-2026-03-30"); if err != nil { return }; defer workspace.Discard(); _ = workspace.Put("like", map[string]any{"user": "@alice"})`
type Workspace struct {
name string
store *Store
sqliteDatabase *sql.DB
databasePath string
filesystem *core.Fs
orphanAggregate map[string]any
name string
store *Store
sqliteDatabase *sql.DB
databasePath string
filesystem *core.Fs
cachedOrphanAggregate map[string]any
closeLock sync.Mutex
closed bool
lifecycleLock sync.Mutex
isClosed bool
}
// Usage example: `workspaceName := workspace.Name(); fmt.Println(workspaceName)`
@ -93,9 +93,9 @@ func (workspace *Workspace) ensureReady(operation string) error {
return err
}
workspace.closeLock.Lock()
closed := workspace.closed
workspace.closeLock.Unlock()
workspace.lifecycleLock.Lock()
closed := workspace.isClosed
workspace.lifecycleLock.Unlock()
if closed {
return core.E(operation, "workspace is closed", nil)
}
@ -203,7 +203,7 @@ func loadRecoveredWorkspaces(stateDirectory string, store *Store) []*Workspace {
databasePath: databasePath,
filesystem: filesystem,
}
orphanWorkspace.orphanAggregate = orphanWorkspace.captureAggregateSnapshot()
orphanWorkspace.cachedOrphanAggregate = orphanWorkspace.captureAggregateSnapshot()
orphanWorkspaces = append(orphanWorkspaces, orphanWorkspace)
}
return orphanWorkspaces
@ -234,10 +234,10 @@ func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace {
stateDirectory = normaliseWorkspaceStateDirectory(stateDirectory)
if stateDirectory == storeInstance.workspaceStateDirectoryPath() {
storeInstance.orphanWorkspacesLock.Lock()
cachedWorkspaces := slices.Clone(storeInstance.orphanWorkspaces)
storeInstance.orphanWorkspaces = nil
storeInstance.orphanWorkspacesLock.Unlock()
storeInstance.orphanWorkspaceLock.Lock()
cachedWorkspaces := slices.Clone(storeInstance.cachedOrphanWorkspaces)
storeInstance.cachedOrphanWorkspaces = nil
storeInstance.orphanWorkspaceLock.Unlock()
if len(cachedWorkspaces) > 0 {
return cachedWorkspaces
}
@ -363,14 +363,14 @@ func (workspace *Workspace) captureAggregateSnapshot() map[string]any {
}
func (workspace *Workspace) aggregateFallback() map[string]any {
if workspace == nil || workspace.orphanAggregate == nil {
if workspace == nil || workspace.cachedOrphanAggregate == nil {
return map[string]any{}
}
return maps.Clone(workspace.orphanAggregate)
return maps.Clone(workspace.cachedOrphanAggregate)
}
func (workspace *Workspace) shouldUseOrphanAggregate() bool {
if workspace == nil || workspace.orphanAggregate == nil {
if workspace == nil || workspace.cachedOrphanAggregate == nil {
return false
}
if workspace.filesystem == nil || workspace.databasePath == "" {
@ -423,12 +423,12 @@ func (workspace *Workspace) closeAndCleanup(removeFiles bool) error {
return nil
}
workspace.closeLock.Lock()
alreadyClosed := workspace.closed
workspace.lifecycleLock.Lock()
alreadyClosed := workspace.isClosed
if !alreadyClosed {
workspace.closed = true
workspace.isClosed = true
}
workspace.closeLock.Unlock()
workspace.lifecycleLock.Unlock()
if !alreadyClosed {
if err := workspace.sqliteDatabase.Close(); err != nil {