[agent/codex:gpt-5.4] Read docs/RFC-STORE.md fully. Find features described in the... #153
4 changed files with 94 additions and 24 deletions
53
store.go
53
store.go
|
|
@ -42,6 +42,8 @@ type StoreConfig struct {
|
|||
Journal JournalConfiguration
|
||||
// Usage example: `config := store.StoreConfig{PurgeInterval: 30 * time.Second}`
|
||||
PurgeInterval time.Duration
|
||||
// Usage example: `config := store.StoreConfig{WorkspaceStateDirectory: "/tmp/core-state"}`
|
||||
WorkspaceStateDirectory string
|
||||
}
|
||||
|
||||
// Usage example: `if err := (store.StoreConfig{DatabasePath: ":memory:", PurgeInterval: 30 * time.Second}).Validate(); err != nil { return }`
|
||||
|
|
@ -89,15 +91,16 @@ func (journalConfig JournalConfiguration) isConfigured() bool {
|
|||
// Usage example: `storeInstance, err := store.NewConfigured(store.StoreConfig{DatabasePath: ":memory:", Journal: store.JournalConfiguration{EndpointURL: "http://127.0.0.1:8086", Organisation: "core", BucketName: "events"}, PurgeInterval: 30 * time.Second})`
|
||||
// Usage example: `value, err := storeInstance.Get("config", "colour")`
|
||||
type Store struct {
|
||||
sqliteDatabase *sql.DB
|
||||
databasePath string
|
||||
purgeContext context.Context
|
||||
cancelPurge context.CancelFunc
|
||||
purgeWaitGroup sync.WaitGroup
|
||||
purgeInterval time.Duration // interval between background purge cycles
|
||||
journalConfiguration JournalConfiguration
|
||||
closeLock sync.Mutex
|
||||
closed bool
|
||||
sqliteDatabase *sql.DB
|
||||
databasePath string
|
||||
workspaceStateDirectory string
|
||||
purgeContext context.Context
|
||||
cancelPurge context.CancelFunc
|
||||
purgeWaitGroup sync.WaitGroup
|
||||
purgeInterval time.Duration // interval between background purge cycles
|
||||
journalConfiguration JournalConfiguration
|
||||
closeLock sync.Mutex
|
||||
closed bool
|
||||
|
||||
// Event dispatch state.
|
||||
watchers map[string][]chan Event
|
||||
|
|
@ -164,9 +167,10 @@ func (storeInstance *Store) Config() StoreConfig {
|
|||
return StoreConfig{}
|
||||
}
|
||||
return StoreConfig{
|
||||
DatabasePath: storeInstance.databasePath,
|
||||
Journal: storeInstance.JournalConfiguration(),
|
||||
PurgeInterval: storeInstance.purgeInterval,
|
||||
DatabasePath: storeInstance.databasePath,
|
||||
Journal: storeInstance.JournalConfiguration(),
|
||||
PurgeInterval: storeInstance.purgeInterval,
|
||||
WorkspaceStateDirectory: storeInstance.workspaceStateDirectoryPath(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -223,10 +227,13 @@ func openConfiguredStore(operation string, storeConfig StoreConfig) (*Store, err
|
|||
if storeConfig.PurgeInterval > 0 {
|
||||
storeInstance.purgeInterval = storeConfig.PurgeInterval
|
||||
}
|
||||
if storeConfig.WorkspaceStateDirectory != "" {
|
||||
storeInstance.workspaceStateDirectory = normaliseWorkspaceStateDirectory(storeConfig.WorkspaceStateDirectory)
|
||||
}
|
||||
|
||||
// New() performs a non-destructive orphan scan so callers can discover
|
||||
// leftover workspaces via RecoverOrphans().
|
||||
storeInstance.orphanWorkspaces = discoverOrphanWorkspaces(defaultWorkspaceStateDirectory, storeInstance)
|
||||
storeInstance.orphanWorkspaces = discoverOrphanWorkspaces(storeInstance.workspaceStateDirectoryPath(), storeInstance)
|
||||
storeInstance.startBackgroundPurge()
|
||||
return storeInstance, nil
|
||||
}
|
||||
|
|
@ -267,15 +274,23 @@ func openSQLiteStore(operation, databasePath string) (*Store, error) {
|
|||
|
||||
purgeContext, cancel := context.WithCancel(context.Background())
|
||||
return &Store{
|
||||
sqliteDatabase: sqliteDatabase,
|
||||
databasePath: databasePath,
|
||||
purgeContext: purgeContext,
|
||||
cancelPurge: cancel,
|
||||
purgeInterval: 60 * time.Second,
|
||||
watchers: make(map[string][]chan Event),
|
||||
sqliteDatabase: sqliteDatabase,
|
||||
databasePath: databasePath,
|
||||
workspaceStateDirectory: normaliseWorkspaceStateDirectory(defaultWorkspaceStateDirectory),
|
||||
purgeContext: purgeContext,
|
||||
cancelPurge: cancel,
|
||||
purgeInterval: 60 * time.Second,
|
||||
watchers: make(map[string][]chan Event),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (storeInstance *Store) workspaceStateDirectoryPath() string {
|
||||
if storeInstance == nil || storeInstance.workspaceStateDirectory == "" {
|
||||
return normaliseWorkspaceStateDirectory(defaultWorkspaceStateDirectory)
|
||||
}
|
||||
return normaliseWorkspaceStateDirectory(storeInstance.workspaceStateDirectory)
|
||||
}
|
||||
|
||||
// Usage example: `storeInstance, err := store.New(":memory:"); if err != nil { return }; defer storeInstance.Close()`
|
||||
func (storeInstance *Store) Close() error {
|
||||
if storeInstance == nil {
|
||||
|
|
|
|||
|
|
@ -108,6 +108,26 @@ func TestStore_New_Good_WithJournalOption(t *testing.T) {
|
|||
assert.Equal(t, "http://127.0.0.1:8086", storeInstance.journalConfiguration.EndpointURL)
|
||||
}
|
||||
|
||||
func TestStore_NewConfigured_Good_WorkspaceStateDirectory(t *testing.T) {
|
||||
workspaceStateDirectory := testPath(t, "workspace-state")
|
||||
|
||||
storeInstance, err := NewConfigured(StoreConfig{
|
||||
DatabasePath: ":memory:",
|
||||
WorkspaceStateDirectory: workspaceStateDirectory,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
assert.Equal(t, workspaceStateDirectory, storeInstance.Config().WorkspaceStateDirectory)
|
||||
|
||||
workspace, err := storeInstance.NewWorkspace("scroll-session")
|
||||
require.NoError(t, err)
|
||||
defer workspace.Discard()
|
||||
|
||||
assert.Equal(t, workspaceFilePath(workspaceStateDirectory, "scroll-session"), workspace.DatabasePath())
|
||||
assert.True(t, testFilesystem().Exists(workspace.DatabasePath()))
|
||||
}
|
||||
|
||||
func TestStore_JournalConfiguration_Good(t *testing.T) {
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
|
|
@ -212,7 +232,8 @@ func TestStore_Config_Good(t *testing.T) {
|
|||
Organisation: "core",
|
||||
BucketName: "events",
|
||||
},
|
||||
PurgeInterval: 20 * time.Millisecond,
|
||||
PurgeInterval: 20 * time.Millisecond,
|
||||
WorkspaceStateDirectory: normaliseWorkspaceStateDirectory(defaultWorkspaceStateDirectory),
|
||||
}, storeInstance.Config())
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -116,11 +116,12 @@ func (storeInstance *Store) NewWorkspace(name string) (*Workspace, error) {
|
|||
}
|
||||
|
||||
filesystem := (&core.Fs{}).NewUnrestricted()
|
||||
databasePath := workspaceFilePath(defaultWorkspaceStateDirectory, name)
|
||||
stateDirectory := storeInstance.workspaceStateDirectoryPath()
|
||||
databasePath := workspaceFilePath(stateDirectory, name)
|
||||
if filesystem.Exists(databasePath) {
|
||||
return nil, core.E("store.NewWorkspace", core.Concat("workspace already exists: ", name), nil)
|
||||
}
|
||||
if result := filesystem.EnsureDir(defaultWorkspaceStateDirectory); !result.OK {
|
||||
if result := filesystem.EnsureDir(stateDirectory); !result.OK {
|
||||
return nil, core.E("store.NewWorkspace", "ensure state directory", result.Value.(error))
|
||||
}
|
||||
|
||||
|
|
@ -225,11 +226,11 @@ func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace {
|
|||
}
|
||||
|
||||
if stateDirectory == "" {
|
||||
stateDirectory = defaultWorkspaceStateDirectory
|
||||
stateDirectory = storeInstance.workspaceStateDirectoryPath()
|
||||
}
|
||||
stateDirectory = normaliseWorkspaceStateDirectory(stateDirectory)
|
||||
|
||||
if stateDirectory == normaliseWorkspaceStateDirectory(defaultWorkspaceStateDirectory) {
|
||||
if stateDirectory == storeInstance.workspaceStateDirectoryPath() {
|
||||
storeInstance.orphanWorkspacesLock.Lock()
|
||||
cachedWorkspaces := slices.Clone(storeInstance.orphanWorkspaces)
|
||||
storeInstance.orphanWorkspaces = nil
|
||||
|
|
|
|||
|
|
@ -310,6 +310,39 @@ func TestWorkspace_New_Good_CachesOrphansDuringConstruction(t *testing.T) {
|
|||
orphans[0].Discard()
|
||||
}
|
||||
|
||||
func TestWorkspace_NewConfigured_Good_CachesOrphansFromConfiguredStateDirectory(t *testing.T) {
|
||||
stateDirectory := testPath(t, "configured-state")
|
||||
requireCoreOK(t, testFilesystem().EnsureDir(stateDirectory))
|
||||
|
||||
orphanDatabasePath := workspaceFilePath(stateDirectory, "orphan-session")
|
||||
orphanDatabase, err := openWorkspaceDatabase(orphanDatabasePath)
|
||||
require.NoError(t, err)
|
||||
_, err = orphanDatabase.Exec(
|
||||
"INSERT INTO "+workspaceEntriesTableName+" (entry_kind, entry_data, created_at) VALUES (?, ?, ?)",
|
||||
"like",
|
||||
`{"user":"@alice"}`,
|
||||
time.Now().UnixMilli(),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, orphanDatabase.Close())
|
||||
|
||||
storeInstance, err := NewConfigured(StoreConfig{
|
||||
DatabasePath: ":memory:",
|
||||
WorkspaceStateDirectory: stateDirectory,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
requireCoreOK(t, testFilesystem().DeleteAll(stateDirectory))
|
||||
assert.False(t, testFilesystem().Exists(orphanDatabasePath))
|
||||
|
||||
orphans := storeInstance.RecoverOrphans("")
|
||||
require.Len(t, orphans, 1)
|
||||
assert.Equal(t, "orphan-session", orphans[0].Name())
|
||||
assert.Equal(t, map[string]any{"like": 1}, orphans[0].Aggregate())
|
||||
orphans[0].Discard()
|
||||
}
|
||||
|
||||
func TestWorkspace_RecoverOrphans_Good_TrailingSlashUsesCache(t *testing.T) {
|
||||
stateDirectory := useWorkspaceStateDirectory(t)
|
||||
requireCoreOK(t, testFilesystem().EnsureDir(stateDirectory))
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue