[agent/codex:gpt-5.4-mini] Read docs/RFC-STORE.md and docs/specs/core/go/RFC.md fully. ... #179
4 changed files with 95 additions and 95 deletions
70
events.go
70
events.go
|
|
@ -72,20 +72,20 @@ func (storeInstance *Store) Watch(group string) <-chan Event {
|
|||
return closedEventChannel()
|
||||
}
|
||||
|
||||
storeInstance.closeLock.Lock()
|
||||
closed := storeInstance.closed
|
||||
storeInstance.closeLock.Unlock()
|
||||
storeInstance.lifecycleLock.Lock()
|
||||
closed := storeInstance.isClosed
|
||||
storeInstance.lifecycleLock.Unlock()
|
||||
if closed {
|
||||
return closedEventChannel()
|
||||
}
|
||||
|
||||
eventChannel := make(chan Event, watcherEventBufferCapacity)
|
||||
|
||||
storeInstance.watchersLock.Lock()
|
||||
defer storeInstance.watchersLock.Unlock()
|
||||
storeInstance.closeLock.Lock()
|
||||
closed = storeInstance.closed
|
||||
storeInstance.closeLock.Unlock()
|
||||
storeInstance.watcherLock.Lock()
|
||||
defer storeInstance.watcherLock.Unlock()
|
||||
storeInstance.lifecycleLock.Lock()
|
||||
closed = storeInstance.isClosed
|
||||
storeInstance.lifecycleLock.Unlock()
|
||||
if closed {
|
||||
return closedEventChannel()
|
||||
}
|
||||
|
|
@ -103,15 +103,15 @@ func (storeInstance *Store) Unwatch(group string, events <-chan Event) {
|
|||
return
|
||||
}
|
||||
|
||||
storeInstance.closeLock.Lock()
|
||||
closed := storeInstance.closed
|
||||
storeInstance.closeLock.Unlock()
|
||||
storeInstance.lifecycleLock.Lock()
|
||||
closed := storeInstance.isClosed
|
||||
storeInstance.lifecycleLock.Unlock()
|
||||
if closed {
|
||||
return
|
||||
}
|
||||
|
||||
storeInstance.watchersLock.Lock()
|
||||
defer storeInstance.watchersLock.Unlock()
|
||||
storeInstance.watcherLock.Lock()
|
||||
defer storeInstance.watcherLock.Unlock()
|
||||
|
||||
registeredEvents := storeInstance.watchers[group]
|
||||
if len(registeredEvents) == 0 {
|
||||
|
|
@ -151,21 +151,21 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() {
|
|||
return func() {}
|
||||
}
|
||||
|
||||
storeInstance.closeLock.Lock()
|
||||
closed := storeInstance.closed
|
||||
storeInstance.closeLock.Unlock()
|
||||
storeInstance.lifecycleLock.Lock()
|
||||
closed := storeInstance.isClosed
|
||||
storeInstance.lifecycleLock.Unlock()
|
||||
if closed {
|
||||
return func() {}
|
||||
}
|
||||
|
||||
registrationID := atomic.AddUint64(&storeInstance.nextCallbackRegistrationID, 1)
|
||||
registrationID := atomic.AddUint64(&storeInstance.nextCallbackID, 1)
|
||||
callbackRegistration := changeCallbackRegistration{registrationID: registrationID, callback: callback}
|
||||
|
||||
storeInstance.callbacksLock.Lock()
|
||||
defer storeInstance.callbacksLock.Unlock()
|
||||
storeInstance.closeLock.Lock()
|
||||
closed = storeInstance.closed
|
||||
storeInstance.closeLock.Unlock()
|
||||
storeInstance.callbackLock.Lock()
|
||||
defer storeInstance.callbackLock.Unlock()
|
||||
storeInstance.lifecycleLock.Lock()
|
||||
closed = storeInstance.isClosed
|
||||
storeInstance.lifecycleLock.Unlock()
|
||||
if closed {
|
||||
return func() {}
|
||||
}
|
||||
|
|
@ -175,8 +175,8 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() {
|
|||
var once sync.Once
|
||||
return func() {
|
||||
once.Do(func() {
|
||||
storeInstance.callbacksLock.Lock()
|
||||
defer storeInstance.callbacksLock.Unlock()
|
||||
storeInstance.callbackLock.Lock()
|
||||
defer storeInstance.callbackLock.Unlock()
|
||||
for i := range storeInstance.callbacks {
|
||||
if storeInstance.callbacks[i].registrationID == registrationID {
|
||||
storeInstance.callbacks = append(storeInstance.callbacks[:i], storeInstance.callbacks[i+1:]...)
|
||||
|
|
@ -201,19 +201,19 @@ func (storeInstance *Store) notify(event Event) {
|
|||
event.Timestamp = time.Now()
|
||||
}
|
||||
|
||||
storeInstance.closeLock.Lock()
|
||||
closed := storeInstance.closed
|
||||
storeInstance.closeLock.Unlock()
|
||||
storeInstance.lifecycleLock.Lock()
|
||||
closed := storeInstance.isClosed
|
||||
storeInstance.lifecycleLock.Unlock()
|
||||
if closed {
|
||||
return
|
||||
}
|
||||
|
||||
storeInstance.watchersLock.RLock()
|
||||
storeInstance.closeLock.Lock()
|
||||
closed = storeInstance.closed
|
||||
storeInstance.closeLock.Unlock()
|
||||
storeInstance.watcherLock.RLock()
|
||||
storeInstance.lifecycleLock.Lock()
|
||||
closed = storeInstance.isClosed
|
||||
storeInstance.lifecycleLock.Unlock()
|
||||
if closed {
|
||||
storeInstance.watchersLock.RUnlock()
|
||||
storeInstance.watcherLock.RUnlock()
|
||||
return
|
||||
}
|
||||
for _, registeredChannel := range storeInstance.watchers["*"] {
|
||||
|
|
@ -228,11 +228,11 @@ func (storeInstance *Store) notify(event Event) {
|
|||
default:
|
||||
}
|
||||
}
|
||||
storeInstance.watchersLock.RUnlock()
|
||||
storeInstance.watcherLock.RUnlock()
|
||||
|
||||
storeInstance.callbacksLock.RLock()
|
||||
storeInstance.callbackLock.RLock()
|
||||
callbacks := append([]changeCallbackRegistration(nil), storeInstance.callbacks...)
|
||||
storeInstance.callbacksLock.RUnlock()
|
||||
storeInstance.callbackLock.RUnlock()
|
||||
|
||||
for _, callback := range callbacks {
|
||||
callback.callback(event)
|
||||
|
|
|
|||
16
scope.go
16
scope.go
|
|
@ -71,8 +71,8 @@ type ScopedStore struct {
|
|||
// Usage example: `scopedStore.MaxGroups = 10`
|
||||
MaxGroups int
|
||||
|
||||
watcherLock sync.Mutex
|
||||
watcherBridges map[uintptr]scopedWatcherBridge
|
||||
watcherBridgeLock sync.Mutex
|
||||
watcherBridges map[uintptr]scopedWatcherBridge
|
||||
}
|
||||
|
||||
type scopedWatcherBridge struct {
|
||||
|
|
@ -321,7 +321,7 @@ func (scopedStore *ScopedStore) Watch(group string) <-chan Event {
|
|||
done := make(chan struct{})
|
||||
localEventsPointer := channelPointer(localEvents)
|
||||
|
||||
scopedStore.watcherLock.Lock()
|
||||
scopedStore.watcherBridgeLock.Lock()
|
||||
if scopedStore.watcherBridges == nil {
|
||||
scopedStore.watcherBridges = make(map[uintptr]scopedWatcherBridge)
|
||||
}
|
||||
|
|
@ -330,7 +330,7 @@ func (scopedStore *ScopedStore) Watch(group string) <-chan Event {
|
|||
sourceEvents: sourceEvents,
|
||||
done: done,
|
||||
}
|
||||
scopedStore.watcherLock.Unlock()
|
||||
scopedStore.watcherBridgeLock.Unlock()
|
||||
|
||||
go func() {
|
||||
defer close(localEvents)
|
||||
|
|
@ -368,12 +368,12 @@ func (scopedStore *ScopedStore) Unwatch(group string, events <-chan Event) {
|
|||
return
|
||||
}
|
||||
|
||||
scopedStore.watcherLock.Lock()
|
||||
scopedStore.watcherBridgeLock.Lock()
|
||||
watcherBridge, ok := scopedStore.watcherBridges[channelPointer(events)]
|
||||
if ok {
|
||||
delete(scopedStore.watcherBridges, channelPointer(events))
|
||||
}
|
||||
scopedStore.watcherLock.Unlock()
|
||||
scopedStore.watcherBridgeLock.Unlock()
|
||||
|
||||
if !ok {
|
||||
return
|
||||
|
|
@ -388,9 +388,9 @@ func (scopedStore *ScopedStore) removeWatcherBridge(pointer uintptr) {
|
|||
return
|
||||
}
|
||||
|
||||
scopedStore.watcherLock.Lock()
|
||||
scopedStore.watcherBridgeLock.Lock()
|
||||
delete(scopedStore.watcherBridges, pointer)
|
||||
scopedStore.watcherLock.Unlock()
|
||||
scopedStore.watcherBridgeLock.Unlock()
|
||||
}
|
||||
|
||||
func (scopedStore *ScopedStore) localiseWatchedEvent(event Event) (Event, bool) {
|
||||
|
|
|
|||
58
store.go
58
store.go
|
|
@ -141,18 +141,18 @@ type Store struct {
|
|||
purgeWaitGroup sync.WaitGroup
|
||||
purgeInterval time.Duration // interval between background purge cycles
|
||||
journalConfiguration JournalConfiguration
|
||||
closeLock sync.Mutex
|
||||
closed bool
|
||||
lifecycleLock sync.Mutex
|
||||
isClosed bool
|
||||
|
||||
// Event dispatch state.
|
||||
watchers map[string][]chan Event
|
||||
callbacks []changeCallbackRegistration
|
||||
watchersLock sync.RWMutex // protects watcher registration and dispatch
|
||||
callbacksLock sync.RWMutex // protects callback registration and dispatch
|
||||
nextCallbackRegistrationID uint64 // monotonic ID for callback registrations
|
||||
watchers map[string][]chan Event
|
||||
callbacks []changeCallbackRegistration
|
||||
watcherLock sync.RWMutex // protects watcher registration and dispatch
|
||||
callbackLock sync.RWMutex // protects callback registration and dispatch
|
||||
nextCallbackID uint64 // monotonic ID for callback registrations
|
||||
|
||||
orphanWorkspacesLock sync.Mutex
|
||||
orphanWorkspaces []*Workspace
|
||||
orphanWorkspaceLock sync.Mutex
|
||||
cachedOrphanWorkspaces []*Workspace
|
||||
}
|
||||
|
||||
func (storeInstance *Store) ensureReady(operation string) error {
|
||||
|
|
@ -163,9 +163,9 @@ func (storeInstance *Store) ensureReady(operation string) error {
|
|||
return core.E(operation, "store is not initialised", nil)
|
||||
}
|
||||
|
||||
storeInstance.closeLock.Lock()
|
||||
closed := storeInstance.closed
|
||||
storeInstance.closeLock.Unlock()
|
||||
storeInstance.lifecycleLock.Lock()
|
||||
closed := storeInstance.isClosed
|
||||
storeInstance.lifecycleLock.Unlock()
|
||||
if closed {
|
||||
return core.E(operation, "store is closed", nil)
|
||||
}
|
||||
|
|
@ -250,9 +250,9 @@ func (storeInstance *Store) IsClosed() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
storeInstance.closeLock.Lock()
|
||||
closed := storeInstance.closed
|
||||
storeInstance.closeLock.Unlock()
|
||||
storeInstance.lifecycleLock.Lock()
|
||||
closed := storeInstance.isClosed
|
||||
storeInstance.lifecycleLock.Unlock()
|
||||
return closed
|
||||
}
|
||||
|
||||
|
|
@ -294,7 +294,7 @@ func openConfiguredStore(operation string, storeConfig StoreConfig) (*Store, err
|
|||
|
||||
// New() performs a non-destructive orphan scan so callers can discover
|
||||
// leftover workspaces via RecoverOrphans().
|
||||
storeInstance.orphanWorkspaces = discoverOrphanWorkspaces(storeInstance.workspaceStateDirectoryPath(), storeInstance)
|
||||
storeInstance.cachedOrphanWorkspaces = discoverOrphanWorkspaces(storeInstance.workspaceStateDirectoryPath(), storeInstance)
|
||||
storeInstance.startBackgroundPurge()
|
||||
return storeInstance, nil
|
||||
}
|
||||
|
|
@ -358,41 +358,41 @@ func (storeInstance *Store) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
storeInstance.closeLock.Lock()
|
||||
if storeInstance.closed {
|
||||
storeInstance.closeLock.Unlock()
|
||||
storeInstance.lifecycleLock.Lock()
|
||||
if storeInstance.isClosed {
|
||||
storeInstance.lifecycleLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
storeInstance.closed = true
|
||||
storeInstance.closeLock.Unlock()
|
||||
storeInstance.isClosed = true
|
||||
storeInstance.lifecycleLock.Unlock()
|
||||
|
||||
if storeInstance.cancelPurge != nil {
|
||||
storeInstance.cancelPurge()
|
||||
}
|
||||
storeInstance.purgeWaitGroup.Wait()
|
||||
|
||||
storeInstance.watchersLock.Lock()
|
||||
storeInstance.watcherLock.Lock()
|
||||
for groupName, registeredEvents := range storeInstance.watchers {
|
||||
for _, registeredEventChannel := range registeredEvents {
|
||||
close(registeredEventChannel)
|
||||
}
|
||||
delete(storeInstance.watchers, groupName)
|
||||
}
|
||||
storeInstance.watchersLock.Unlock()
|
||||
storeInstance.watcherLock.Unlock()
|
||||
|
||||
storeInstance.callbacksLock.Lock()
|
||||
storeInstance.callbackLock.Lock()
|
||||
storeInstance.callbacks = nil
|
||||
storeInstance.callbacksLock.Unlock()
|
||||
storeInstance.callbackLock.Unlock()
|
||||
|
||||
storeInstance.orphanWorkspacesLock.Lock()
|
||||
storeInstance.orphanWorkspaceLock.Lock()
|
||||
var orphanCleanupErr error
|
||||
for _, orphanWorkspace := range storeInstance.orphanWorkspaces {
|
||||
for _, orphanWorkspace := range storeInstance.cachedOrphanWorkspaces {
|
||||
if err := orphanWorkspace.closeWithoutRemovingFiles(); err != nil && orphanCleanupErr == nil {
|
||||
orphanCleanupErr = err
|
||||
}
|
||||
}
|
||||
storeInstance.orphanWorkspaces = nil
|
||||
storeInstance.orphanWorkspacesLock.Unlock()
|
||||
storeInstance.cachedOrphanWorkspaces = nil
|
||||
storeInstance.orphanWorkspaceLock.Unlock()
|
||||
|
||||
if storeInstance.sqliteDatabase == nil {
|
||||
return orphanCleanupErr
|
||||
|
|
|
|||
46
workspace.go
46
workspace.go
|
|
@ -40,15 +40,15 @@ var defaultWorkspaceStateDirectory = ".core/state/"
|
|||
//
|
||||
// Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session-2026-03-30"); if err != nil { return }; defer workspace.Discard(); _ = workspace.Put("like", map[string]any{"user": "@alice"})`
|
||||
type Workspace struct {
|
||||
name string
|
||||
store *Store
|
||||
sqliteDatabase *sql.DB
|
||||
databasePath string
|
||||
filesystem *core.Fs
|
||||
orphanAggregate map[string]any
|
||||
name string
|
||||
store *Store
|
||||
sqliteDatabase *sql.DB
|
||||
databasePath string
|
||||
filesystem *core.Fs
|
||||
cachedOrphanAggregate map[string]any
|
||||
|
||||
closeLock sync.Mutex
|
||||
closed bool
|
||||
lifecycleLock sync.Mutex
|
||||
isClosed bool
|
||||
}
|
||||
|
||||
// Usage example: `workspaceName := workspace.Name(); fmt.Println(workspaceName)`
|
||||
|
|
@ -93,9 +93,9 @@ func (workspace *Workspace) ensureReady(operation string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
workspace.closeLock.Lock()
|
||||
closed := workspace.closed
|
||||
workspace.closeLock.Unlock()
|
||||
workspace.lifecycleLock.Lock()
|
||||
closed := workspace.isClosed
|
||||
workspace.lifecycleLock.Unlock()
|
||||
if closed {
|
||||
return core.E(operation, "workspace is closed", nil)
|
||||
}
|
||||
|
|
@ -203,7 +203,7 @@ func loadRecoveredWorkspaces(stateDirectory string, store *Store) []*Workspace {
|
|||
databasePath: databasePath,
|
||||
filesystem: filesystem,
|
||||
}
|
||||
orphanWorkspace.orphanAggregate = orphanWorkspace.captureAggregateSnapshot()
|
||||
orphanWorkspace.cachedOrphanAggregate = orphanWorkspace.captureAggregateSnapshot()
|
||||
orphanWorkspaces = append(orphanWorkspaces, orphanWorkspace)
|
||||
}
|
||||
return orphanWorkspaces
|
||||
|
|
@ -234,10 +234,10 @@ func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace {
|
|||
stateDirectory = normaliseWorkspaceStateDirectory(stateDirectory)
|
||||
|
||||
if stateDirectory == storeInstance.workspaceStateDirectoryPath() {
|
||||
storeInstance.orphanWorkspacesLock.Lock()
|
||||
cachedWorkspaces := slices.Clone(storeInstance.orphanWorkspaces)
|
||||
storeInstance.orphanWorkspaces = nil
|
||||
storeInstance.orphanWorkspacesLock.Unlock()
|
||||
storeInstance.orphanWorkspaceLock.Lock()
|
||||
cachedWorkspaces := slices.Clone(storeInstance.cachedOrphanWorkspaces)
|
||||
storeInstance.cachedOrphanWorkspaces = nil
|
||||
storeInstance.orphanWorkspaceLock.Unlock()
|
||||
if len(cachedWorkspaces) > 0 {
|
||||
return cachedWorkspaces
|
||||
}
|
||||
|
|
@ -363,14 +363,14 @@ func (workspace *Workspace) captureAggregateSnapshot() map[string]any {
|
|||
}
|
||||
|
||||
func (workspace *Workspace) aggregateFallback() map[string]any {
|
||||
if workspace == nil || workspace.orphanAggregate == nil {
|
||||
if workspace == nil || workspace.cachedOrphanAggregate == nil {
|
||||
return map[string]any{}
|
||||
}
|
||||
return maps.Clone(workspace.orphanAggregate)
|
||||
return maps.Clone(workspace.cachedOrphanAggregate)
|
||||
}
|
||||
|
||||
func (workspace *Workspace) shouldUseOrphanAggregate() bool {
|
||||
if workspace == nil || workspace.orphanAggregate == nil {
|
||||
if workspace == nil || workspace.cachedOrphanAggregate == nil {
|
||||
return false
|
||||
}
|
||||
if workspace.filesystem == nil || workspace.databasePath == "" {
|
||||
|
|
@ -423,12 +423,12 @@ func (workspace *Workspace) closeAndCleanup(removeFiles bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
workspace.closeLock.Lock()
|
||||
alreadyClosed := workspace.closed
|
||||
workspace.lifecycleLock.Lock()
|
||||
alreadyClosed := workspace.isClosed
|
||||
if !alreadyClosed {
|
||||
workspace.closed = true
|
||||
workspace.isClosed = true
|
||||
}
|
||||
workspace.closeLock.Unlock()
|
||||
workspace.lifecycleLock.Unlock()
|
||||
|
||||
if !alreadyClosed {
|
||||
if err := workspace.sqliteDatabase.Close(); err != nil {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue