2026-03-30 20:46:43 +00:00
|
|
|
package store
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"database/sql"
|
|
|
|
|
"io/fs"
|
2026-04-04 09:23:05 +00:00
|
|
|
"maps"
|
2026-04-03 04:56:08 +00:00
|
|
|
"slices"
|
2026-03-30 20:46:43 +00:00
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
core "dappco.re/go/core"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
2026-04-04 12:13:08 +00:00
|
|
|
workspaceEntriesTableName = "workspace_entries"
|
|
|
|
|
workspaceSummaryGroupPrefix = "workspace"
|
2026-03-30 20:46:43 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const createWorkspaceEntriesTableSQL = `CREATE TABLE IF NOT EXISTS workspace_entries (
|
|
|
|
|
entry_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
|
|
|
entry_kind TEXT NOT NULL,
|
|
|
|
|
entry_data TEXT NOT NULL,
|
|
|
|
|
created_at INTEGER NOT NULL
|
|
|
|
|
)`
|
|
|
|
|
|
2026-03-30 20:59:09 +00:00
|
|
|
const createWorkspaceEntriesViewSQL = `CREATE VIEW IF NOT EXISTS entries AS
|
|
|
|
|
SELECT
|
|
|
|
|
entry_id AS id,
|
|
|
|
|
entry_kind AS kind,
|
|
|
|
|
entry_data AS data,
|
|
|
|
|
created_at
|
|
|
|
|
FROM workspace_entries`
|
|
|
|
|
|
2026-04-04 09:01:23 +00:00
|
|
|
var defaultWorkspaceStateDirectory = ".core/state/"
|
2026-03-30 20:46:43 +00:00
|
|
|
|
2026-04-04 17:43:55 +00:00
|
|
|
// Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session"); if err != nil { return }; defer workspace.Discard()`
|
2026-04-04 08:57:06 +00:00
|
|
|
// Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session-2026-03-30"); if err != nil { return }; defer workspace.Discard(); _ = workspace.Put("like", map[string]any{"user": "@alice"})`
|
2026-04-04 21:29:27 +00:00
|
|
|
// Each workspace keeps mutable work-in-progress in a SQLite file such as
|
|
|
|
|
// `.core/state/scroll-session.duckdb` until `Commit()` or `Discard()` removes
|
|
|
|
|
// it.
|
2026-03-30 20:46:43 +00:00
|
|
|
type Workspace struct {
|
2026-04-04 21:09:20 +00:00
|
|
|
name string
|
|
|
|
|
store *Store
|
|
|
|
|
sqliteDatabase *sql.DB
|
|
|
|
|
databasePath string
|
|
|
|
|
filesystem *core.Fs
|
|
|
|
|
cachedOrphanAggregate map[string]any
|
|
|
|
|
|
|
|
|
|
lifecycleLock sync.Mutex
|
|
|
|
|
isClosed bool
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
|
|
|
|
|
2026-04-03 08:07:32 +00:00
|
|
|
// Usage example: `workspaceName := workspace.Name(); fmt.Println(workspaceName)`
|
|
|
|
|
func (workspace *Workspace) Name() string {
|
|
|
|
|
if workspace == nil {
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
return workspace.name
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-03 08:49:45 +00:00
|
|
|
// Usage example: `workspacePath := workspace.DatabasePath(); fmt.Println(workspacePath)`
|
|
|
|
|
func (workspace *Workspace) DatabasePath() string {
|
|
|
|
|
if workspace == nil {
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
return workspace.databasePath
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 17:05:51 +00:00
|
|
|
// Usage example: `if err := workspace.Close(); err != nil { return }`
|
2026-04-04 08:57:06 +00:00
|
|
|
// Usage example: `if err := workspace.Close(); err != nil { return }; orphans := storeInstance.RecoverOrphans(".core/state"); _ = orphans`
|
2026-04-04 21:29:27 +00:00
|
|
|
// `Close()` keeps the `.duckdb` file on disk so `RecoverOrphans(".core/state")`
|
|
|
|
|
// can reopen it after a crash or interrupted agent run.
|
2026-04-04 08:17:20 +00:00
|
|
|
func (workspace *Workspace) Close() error {
|
|
|
|
|
return workspace.closeWithoutRemovingFiles()
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-03 06:31:35 +00:00
|
|
|
func (workspace *Workspace) ensureReady(operation string) error {
|
|
|
|
|
if workspace == nil {
|
|
|
|
|
return core.E(operation, "workspace is nil", nil)
|
|
|
|
|
}
|
2026-04-04 20:11:54 +00:00
|
|
|
if workspace.store == nil {
|
2026-04-03 06:31:35 +00:00
|
|
|
return core.E(operation, "workspace store is nil", nil)
|
|
|
|
|
}
|
2026-04-04 11:43:54 +00:00
|
|
|
if workspace.sqliteDatabase == nil {
|
2026-04-03 06:31:35 +00:00
|
|
|
return core.E(operation, "workspace database is nil", nil)
|
|
|
|
|
}
|
|
|
|
|
if workspace.filesystem == nil {
|
|
|
|
|
return core.E(operation, "workspace filesystem is nil", nil)
|
|
|
|
|
}
|
2026-04-04 20:11:54 +00:00
|
|
|
if err := workspace.store.ensureReady(operation); err != nil {
|
2026-04-03 06:31:35 +00:00
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 21:09:20 +00:00
|
|
|
workspace.lifecycleLock.Lock()
|
|
|
|
|
closed := workspace.isClosed
|
|
|
|
|
workspace.lifecycleLock.Unlock()
|
2026-04-03 06:31:35 +00:00
|
|
|
if closed {
|
|
|
|
|
return core.E(operation, "workspace is closed", nil)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 08:57:06 +00:00
|
|
|
// Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session-2026-03-30"); if err != nil { return }; defer workspace.Discard()`
|
2026-04-04 21:29:27 +00:00
|
|
|
// This creates `.core/state/scroll-session-2026-03-30.duckdb` by default and
|
|
|
|
|
// removes it when the workspace is committed or discarded.
|
2026-03-30 20:46:43 +00:00
|
|
|
func (storeInstance *Store) NewWorkspace(name string) (*Workspace, error) {
|
2026-04-03 06:31:35 +00:00
|
|
|
if err := storeInstance.ensureReady("store.NewWorkspace"); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 18:52:33 +00:00
|
|
|
workspaceNameValidation := core.ValidateName(name)
|
|
|
|
|
if !workspaceNameValidation.OK {
|
|
|
|
|
return nil, core.E("store.NewWorkspace", "validate workspace name", workspaceNameValidation.Value.(error))
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
filesystem := (&core.Fs{}).NewUnrestricted()
|
2026-04-04 14:43:42 +00:00
|
|
|
stateDirectory := storeInstance.workspaceStateDirectoryPath()
|
|
|
|
|
databasePath := workspaceFilePath(stateDirectory, name)
|
2026-03-30 20:46:43 +00:00
|
|
|
if filesystem.Exists(databasePath) {
|
|
|
|
|
return nil, core.E("store.NewWorkspace", core.Concat("workspace already exists: ", name), nil)
|
|
|
|
|
}
|
2026-04-04 14:43:42 +00:00
|
|
|
if result := filesystem.EnsureDir(stateDirectory); !result.OK {
|
2026-03-30 20:46:43 +00:00
|
|
|
return nil, core.E("store.NewWorkspace", "ensure state directory", result.Value.(error))
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 11:43:54 +00:00
|
|
|
sqliteDatabase, err := openWorkspaceDatabase(databasePath)
|
2026-03-30 20:46:43 +00:00
|
|
|
if err != nil {
|
|
|
|
|
return nil, core.E("store.NewWorkspace", "open workspace database", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &Workspace{
|
2026-04-04 11:43:54 +00:00
|
|
|
name: name,
|
2026-04-04 20:11:54 +00:00
|
|
|
store: storeInstance,
|
2026-04-04 11:43:54 +00:00
|
|
|
sqliteDatabase: sqliteDatabase,
|
|
|
|
|
databasePath: databasePath,
|
|
|
|
|
filesystem: filesystem,
|
2026-03-30 20:46:43 +00:00
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 09:16:52 +00:00
|
|
|
// discoverOrphanWorkspacePaths(".core/state") returns leftover SQLite workspace
|
|
|
|
|
// files such as `scroll-session.duckdb` without opening them.
|
2026-04-03 07:22:12 +00:00
|
|
|
func discoverOrphanWorkspacePaths(stateDirectory string) []string {
|
|
|
|
|
filesystem := (&core.Fs{}).NewUnrestricted()
|
2026-03-30 20:46:43 +00:00
|
|
|
if stateDirectory == "" {
|
|
|
|
|
stateDirectory = defaultWorkspaceStateDirectory
|
|
|
|
|
}
|
|
|
|
|
if !filesystem.Exists(stateDirectory) {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
listResult := filesystem.List(stateDirectory)
|
|
|
|
|
if !listResult.OK {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-03 06:51:49 +00:00
|
|
|
directoryEntries, ok := listResult.Value.([]fs.DirEntry)
|
2026-03-30 20:46:43 +00:00
|
|
|
if !ok {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-03 06:51:49 +00:00
|
|
|
slices.SortFunc(directoryEntries, func(left, right fs.DirEntry) int {
|
2026-04-03 04:56:08 +00:00
|
|
|
switch {
|
|
|
|
|
case left.Name() < right.Name():
|
|
|
|
|
return -1
|
|
|
|
|
case left.Name() > right.Name():
|
|
|
|
|
return 1
|
|
|
|
|
default:
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
2026-04-03 07:22:12 +00:00
|
|
|
orphanPaths := make([]string, 0, len(directoryEntries))
|
2026-04-03 06:51:49 +00:00
|
|
|
for _, dirEntry := range directoryEntries {
|
2026-03-30 20:46:43 +00:00
|
|
|
if dirEntry.IsDir() || !core.HasSuffix(dirEntry.Name(), ".duckdb") {
|
|
|
|
|
continue
|
|
|
|
|
}
|
2026-04-03 07:22:12 +00:00
|
|
|
orphanPaths = append(orphanPaths, workspaceFilePath(stateDirectory, core.TrimSuffix(dirEntry.Name(), ".duckdb")))
|
|
|
|
|
}
|
|
|
|
|
return orphanPaths
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 20:11:54 +00:00
|
|
|
func discoverOrphanWorkspaces(stateDirectory string, store *Store) []*Workspace {
|
|
|
|
|
return loadRecoveredWorkspaces(stateDirectory, store)
|
2026-04-04 10:09:25 +00:00
|
|
|
}
|
|
|
|
|
|
2026-04-04 20:11:54 +00:00
|
|
|
func loadRecoveredWorkspaces(stateDirectory string, store *Store) []*Workspace {
|
2026-04-04 07:57:24 +00:00
|
|
|
filesystem := (&core.Fs{}).NewUnrestricted()
|
|
|
|
|
orphanWorkspaces := make([]*Workspace, 0)
|
|
|
|
|
for _, databasePath := range discoverOrphanWorkspacePaths(stateDirectory) {
|
2026-04-04 11:43:54 +00:00
|
|
|
sqliteDatabase, err := openWorkspaceDatabase(databasePath)
|
2026-04-04 07:57:24 +00:00
|
|
|
if err != nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
2026-04-04 09:23:05 +00:00
|
|
|
orphanWorkspace := &Workspace{
|
2026-04-04 11:43:54 +00:00
|
|
|
name: workspaceNameFromPath(stateDirectory, databasePath),
|
2026-04-04 20:11:54 +00:00
|
|
|
store: store,
|
2026-04-04 11:43:54 +00:00
|
|
|
sqliteDatabase: sqliteDatabase,
|
|
|
|
|
databasePath: databasePath,
|
|
|
|
|
filesystem: filesystem,
|
2026-04-04 09:23:05 +00:00
|
|
|
}
|
2026-04-04 21:09:20 +00:00
|
|
|
orphanWorkspace.cachedOrphanAggregate = orphanWorkspace.captureAggregateSnapshot()
|
2026-04-04 09:23:05 +00:00
|
|
|
orphanWorkspaces = append(orphanWorkspaces, orphanWorkspace)
|
2026-04-04 07:57:24 +00:00
|
|
|
}
|
|
|
|
|
return orphanWorkspaces
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 08:28:07 +00:00
|
|
|
func normaliseWorkspaceStateDirectory(stateDirectory string) string {
|
2026-04-04 09:01:23 +00:00
|
|
|
return normaliseDirectoryPath(stateDirectory)
|
2026-04-04 08:28:07 +00:00
|
|
|
}
|
|
|
|
|
|
2026-04-03 07:22:12 +00:00
|
|
|
func workspaceNameFromPath(stateDirectory, databasePath string) string {
|
|
|
|
|
relativePath := core.TrimPrefix(databasePath, joinPath(stateDirectory, ""))
|
|
|
|
|
return core.TrimSuffix(relativePath, ".duckdb")
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 08:57:06 +00:00
|
|
|
// Usage example: `orphans := storeInstance.RecoverOrphans(".core/state"); for _, orphanWorkspace := range orphans { fmt.Println(orphanWorkspace.Name(), orphanWorkspace.Aggregate()) }`
|
2026-04-04 21:29:27 +00:00
|
|
|
// This reopens leftover `.duckdb` files such as `scroll-session-2026-03-30`
|
|
|
|
|
// so callers can inspect `Aggregate()` and choose `Commit()` or `Discard()`.
|
2026-04-03 07:22:12 +00:00
|
|
|
func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace {
|
|
|
|
|
if storeInstance == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if stateDirectory == "" {
|
2026-04-04 14:43:42 +00:00
|
|
|
stateDirectory = storeInstance.workspaceStateDirectoryPath()
|
2026-04-03 07:22:12 +00:00
|
|
|
}
|
2026-04-04 08:28:07 +00:00
|
|
|
stateDirectory = normaliseWorkspaceStateDirectory(stateDirectory)
|
2026-04-03 07:22:12 +00:00
|
|
|
|
2026-04-04 14:43:42 +00:00
|
|
|
if stateDirectory == storeInstance.workspaceStateDirectoryPath() {
|
2026-04-04 21:09:20 +00:00
|
|
|
storeInstance.orphanWorkspaceLock.Lock()
|
|
|
|
|
cachedWorkspaces := slices.Clone(storeInstance.cachedOrphanWorkspaces)
|
|
|
|
|
storeInstance.cachedOrphanWorkspaces = nil
|
|
|
|
|
storeInstance.orphanWorkspaceLock.Unlock()
|
2026-04-04 07:57:24 +00:00
|
|
|
if len(cachedWorkspaces) > 0 {
|
|
|
|
|
return cachedWorkspaces
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-04-04 10:09:25 +00:00
|
|
|
return loadRecoveredWorkspaces(stateDirectory, storeInstance)
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Usage example: `err := workspace.Put("like", map[string]any{"user": "@alice", "post": "video_123"})`
|
|
|
|
|
func (workspace *Workspace) Put(kind string, data map[string]any) error {
|
2026-04-03 06:31:35 +00:00
|
|
|
if err := workspace.ensureReady("store.Workspace.Put"); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 20:46:43 +00:00
|
|
|
if kind == "" {
|
|
|
|
|
return core.E("store.Workspace.Put", "kind is empty", nil)
|
|
|
|
|
}
|
|
|
|
|
if data == nil {
|
|
|
|
|
data = map[string]any{}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 11:05:48 +00:00
|
|
|
dataJSON, err := marshalJSONText(data, "store.Workspace.Put", "marshal entry data")
|
2026-03-30 20:46:43 +00:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 11:43:54 +00:00
|
|
|
_, err = workspace.sqliteDatabase.Exec(
|
2026-03-30 20:46:43 +00:00
|
|
|
"INSERT INTO "+workspaceEntriesTableName+" (entry_kind, entry_data, created_at) VALUES (?, ?, ?)",
|
|
|
|
|
kind,
|
|
|
|
|
dataJSON,
|
|
|
|
|
time.Now().UnixMilli(),
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return core.E("store.Workspace.Put", "insert entry", err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-05 08:58:26 +01:00
|
|
|
// Usage example: `entryCount, err := workspace.Count(); if err != nil { return }; fmt.Println(entryCount)`
|
|
|
|
|
func (workspace *Workspace) Count() (int, error) {
|
|
|
|
|
if err := workspace.ensureReady("store.Workspace.Count"); err != nil {
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var count int
|
|
|
|
|
err := workspace.sqliteDatabase.QueryRow(
|
|
|
|
|
"SELECT COUNT(*) FROM " + workspaceEntriesTableName,
|
|
|
|
|
).Scan(&count)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, core.E("store.Workspace.Count", "count entries", err)
|
|
|
|
|
}
|
|
|
|
|
return count, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 11:43:54 +00:00
|
|
|
// Usage example: `summary := workspace.Aggregate(); fmt.Println(summary["like"])`
|
2026-03-30 20:46:43 +00:00
|
|
|
func (workspace *Workspace) Aggregate() map[string]any {
|
2026-04-04 09:23:05 +00:00
|
|
|
if workspace.shouldUseOrphanAggregate() {
|
|
|
|
|
return workspace.aggregateFallback()
|
|
|
|
|
}
|
2026-04-03 06:31:35 +00:00
|
|
|
if err := workspace.ensureReady("store.Workspace.Aggregate"); err != nil {
|
2026-04-04 09:23:05 +00:00
|
|
|
return workspace.aggregateFallback()
|
2026-04-03 06:31:35 +00:00
|
|
|
}
|
|
|
|
|
|
2026-03-30 20:46:43 +00:00
|
|
|
fields, err := workspace.aggregateFields()
|
|
|
|
|
if err != nil {
|
2026-04-04 09:23:05 +00:00
|
|
|
return workspace.aggregateFallback()
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
|
|
|
|
return fields
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 08:57:06 +00:00
|
|
|
// Usage example: `result := workspace.Commit(); if !result.OK { return }; fmt.Println(result.Value)`
|
2026-04-04 21:29:27 +00:00
|
|
|
// `Commit()` writes one completed workspace row to the journal, upserts the
|
|
|
|
|
// `workspace:NAME/summary` entry, and removes the workspace file.
|
2026-03-30 20:46:43 +00:00
|
|
|
func (workspace *Workspace) Commit() core.Result {
|
2026-04-03 06:31:35 +00:00
|
|
|
if err := workspace.ensureReady("store.Workspace.Commit"); err != nil {
|
|
|
|
|
return core.Result{Value: err, OK: false}
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 20:46:43 +00:00
|
|
|
fields, err := workspace.aggregateFields()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return core.Result{Value: core.E("store.Workspace.Commit", "aggregate workspace", err), OK: false}
|
|
|
|
|
}
|
2026-04-04 20:11:54 +00:00
|
|
|
if err := workspace.store.commitWorkspaceAggregate(workspace.name, fields); err != nil {
|
2026-03-30 20:46:43 +00:00
|
|
|
return core.Result{Value: err, OK: false}
|
|
|
|
|
}
|
2026-04-03 06:51:49 +00:00
|
|
|
if err := workspace.closeAndRemoveFiles(); err != nil {
|
2026-03-30 20:46:43 +00:00
|
|
|
return core.Result{Value: err, OK: false}
|
|
|
|
|
}
|
2026-04-04 14:15:53 +00:00
|
|
|
return core.Result{Value: cloneAnyMap(fields), OK: true}
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Usage example: `workspace.Discard()`
|
|
|
|
|
func (workspace *Workspace) Discard() {
|
2026-04-03 06:31:35 +00:00
|
|
|
if workspace == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-03 06:51:49 +00:00
|
|
|
_ = workspace.closeAndRemoveFiles()
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Usage example: `result := workspace.Query("SELECT entry_kind, COUNT(*) AS count FROM workspace_entries GROUP BY entry_kind")`
|
2026-04-04 21:29:27 +00:00
|
|
|
// `result.Value` contains `[]map[string]any`, which lets an agent inspect the
|
|
|
|
|
// current buffer state without defining extra result types.
|
2026-04-04 10:33:25 +00:00
|
|
|
func (workspace *Workspace) Query(query string) core.Result {
|
2026-04-03 06:31:35 +00:00
|
|
|
if err := workspace.ensureReady("store.Workspace.Query"); err != nil {
|
|
|
|
|
return core.Result{Value: err, OK: false}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 11:43:54 +00:00
|
|
|
rows, err := workspace.sqliteDatabase.Query(query)
|
2026-03-30 20:46:43 +00:00
|
|
|
if err != nil {
|
|
|
|
|
return core.Result{Value: core.E("store.Workspace.Query", "query workspace", err), OK: false}
|
|
|
|
|
}
|
|
|
|
|
defer rows.Close()
|
|
|
|
|
|
|
|
|
|
rowMaps, err := queryRowsAsMaps(rows)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return core.Result{Value: core.E("store.Workspace.Query", "scan rows", err), OK: false}
|
|
|
|
|
}
|
|
|
|
|
return core.Result{Value: rowMaps, OK: true}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (workspace *Workspace) aggregateFields() (map[string]any, error) {
|
2026-04-03 06:31:35 +00:00
|
|
|
if err := workspace.ensureReady("store.Workspace.aggregateFields"); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2026-04-04 09:23:05 +00:00
|
|
|
return workspace.aggregateFieldsWithoutReadiness()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (workspace *Workspace) captureAggregateSnapshot() map[string]any {
|
2026-04-04 11:43:54 +00:00
|
|
|
if workspace == nil || workspace.sqliteDatabase == nil {
|
2026-04-04 09:23:05 +00:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fields, err := workspace.aggregateFieldsWithoutReadiness()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return fields
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (workspace *Workspace) aggregateFallback() map[string]any {
|
2026-04-04 21:09:20 +00:00
|
|
|
if workspace == nil || workspace.cachedOrphanAggregate == nil {
|
2026-04-04 09:23:05 +00:00
|
|
|
return map[string]any{}
|
|
|
|
|
}
|
2026-04-04 21:09:20 +00:00
|
|
|
return maps.Clone(workspace.cachedOrphanAggregate)
|
2026-04-04 09:23:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (workspace *Workspace) shouldUseOrphanAggregate() bool {
|
2026-04-04 21:09:20 +00:00
|
|
|
if workspace == nil || workspace.cachedOrphanAggregate == nil {
|
2026-04-04 09:23:05 +00:00
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
if workspace.filesystem == nil || workspace.databasePath == "" {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
return !workspace.filesystem.Exists(workspace.databasePath)
|
|
|
|
|
}
|
2026-04-03 06:31:35 +00:00
|
|
|
|
2026-04-04 09:23:05 +00:00
|
|
|
func (workspace *Workspace) aggregateFieldsWithoutReadiness() (map[string]any, error) {
|
2026-04-04 11:43:54 +00:00
|
|
|
rows, err := workspace.sqliteDatabase.Query(
|
2026-03-30 20:46:43 +00:00
|
|
|
"SELECT entry_kind, COUNT(*) FROM " + workspaceEntriesTableName + " GROUP BY entry_kind ORDER BY entry_kind",
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
defer rows.Close()
|
|
|
|
|
|
|
|
|
|
fields := make(map[string]any)
|
|
|
|
|
for rows.Next() {
|
|
|
|
|
var (
|
|
|
|
|
kind string
|
|
|
|
|
count int
|
|
|
|
|
)
|
|
|
|
|
if err := rows.Scan(&kind, &count); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
fields[kind] = count
|
|
|
|
|
}
|
|
|
|
|
if err := rows.Err(); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return fields, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-03 06:51:49 +00:00
|
|
|
func (workspace *Workspace) closeAndRemoveFiles() error {
|
2026-04-04 08:12:29 +00:00
|
|
|
return workspace.closeAndCleanup(true)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// closeWithoutRemovingFiles closes the database handle but leaves the orphan
|
|
|
|
|
// file on disk so a later store instance can recover it.
|
|
|
|
|
func (workspace *Workspace) closeWithoutRemovingFiles() error {
|
|
|
|
|
return workspace.closeAndCleanup(false)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (workspace *Workspace) closeAndCleanup(removeFiles bool) error {
|
2026-04-03 06:31:35 +00:00
|
|
|
if workspace == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
2026-04-04 19:02:35 +00:00
|
|
|
if workspace.sqliteDatabase == nil {
|
2026-04-03 06:31:35 +00:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 21:09:20 +00:00
|
|
|
workspace.lifecycleLock.Lock()
|
|
|
|
|
alreadyClosed := workspace.isClosed
|
2026-04-04 10:13:27 +00:00
|
|
|
if !alreadyClosed {
|
2026-04-04 21:09:20 +00:00
|
|
|
workspace.isClosed = true
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
2026-04-04 21:09:20 +00:00
|
|
|
workspace.lifecycleLock.Unlock()
|
2026-03-30 20:46:43 +00:00
|
|
|
|
2026-04-04 10:13:27 +00:00
|
|
|
if !alreadyClosed {
|
2026-04-04 11:43:54 +00:00
|
|
|
if err := workspace.sqliteDatabase.Close(); err != nil {
|
2026-04-04 10:23:00 +00:00
|
|
|
return core.E("store.Workspace.closeAndCleanup", "close workspace database", err)
|
2026-04-04 10:13:27 +00:00
|
|
|
}
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
2026-04-04 19:02:35 +00:00
|
|
|
if !removeFiles || workspace.filesystem == nil {
|
2026-04-04 08:12:29 +00:00
|
|
|
return nil
|
|
|
|
|
}
|
2026-03-30 20:46:43 +00:00
|
|
|
for _, path := range []string{workspace.databasePath, workspace.databasePath + "-wal", workspace.databasePath + "-shm"} {
|
|
|
|
|
if result := workspace.filesystem.Delete(path); !result.OK && workspace.filesystem.Exists(path) {
|
2026-04-04 10:23:00 +00:00
|
|
|
return core.E("store.Workspace.closeAndCleanup", "delete workspace file", result.Value.(error))
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (storeInstance *Store) commitWorkspaceAggregate(workspaceName string, fields map[string]any) error {
|
2026-04-03 06:31:35 +00:00
|
|
|
if err := storeInstance.ensureReady("store.Workspace.Commit"); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-04-04 09:04:56 +00:00
|
|
|
if err := ensureJournalSchema(storeInstance.sqliteDatabase); err != nil {
|
2026-03-30 20:46:43 +00:00
|
|
|
return core.E("store.Workspace.Commit", "ensure journal schema", err)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 09:04:56 +00:00
|
|
|
transaction, err := storeInstance.sqliteDatabase.Begin()
|
2026-03-30 20:46:43 +00:00
|
|
|
if err != nil {
|
|
|
|
|
return core.E("store.Workspace.Commit", "begin transaction", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
committed := false
|
|
|
|
|
defer func() {
|
|
|
|
|
if !committed {
|
|
|
|
|
_ = transaction.Rollback()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
2026-04-04 11:05:48 +00:00
|
|
|
fieldsJSON, err := marshalJSONText(fields, "store.Workspace.Commit", "marshal summary")
|
2026-03-30 20:46:43 +00:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-04-04 11:05:48 +00:00
|
|
|
tagsJSON, err := marshalJSONText(map[string]string{"workspace": workspaceName}, "store.Workspace.Commit", "marshal tags")
|
2026-03-30 20:46:43 +00:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-04 11:54:03 +00:00
|
|
|
if err := commitJournalEntry(
|
2026-03-30 20:46:43 +00:00
|
|
|
transaction,
|
|
|
|
|
storeInstance.journalBucket(),
|
|
|
|
|
workspaceName,
|
|
|
|
|
fieldsJSON,
|
|
|
|
|
tagsJSON,
|
|
|
|
|
time.Now().UnixMilli(),
|
|
|
|
|
); err != nil {
|
|
|
|
|
return core.E("store.Workspace.Commit", "insert journal entry", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if _, err := transaction.Exec(
|
|
|
|
|
"INSERT INTO "+entriesTableName+" ("+entryGroupColumn+", "+entryKeyColumn+", "+entryValueColumn+", expires_at) VALUES (?, ?, ?, NULL) "+
|
|
|
|
|
"ON CONFLICT("+entryGroupColumn+", "+entryKeyColumn+") DO UPDATE SET "+entryValueColumn+" = excluded."+entryValueColumn+", expires_at = NULL",
|
|
|
|
|
workspaceSummaryGroup(workspaceName),
|
|
|
|
|
"summary",
|
|
|
|
|
fieldsJSON,
|
|
|
|
|
); err != nil {
|
|
|
|
|
return core.E("store.Workspace.Commit", "upsert workspace summary", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := transaction.Commit(); err != nil {
|
|
|
|
|
return core.E("store.Workspace.Commit", "commit transaction", err)
|
|
|
|
|
}
|
|
|
|
|
committed = true
|
2026-03-30 21:07:30 +00:00
|
|
|
storeInstance.notify(Event{
|
|
|
|
|
Type: EventSet,
|
|
|
|
|
Group: workspaceSummaryGroup(workspaceName),
|
|
|
|
|
Key: "summary",
|
|
|
|
|
Value: fieldsJSON,
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
})
|
2026-03-30 20:46:43 +00:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func openWorkspaceDatabase(databasePath string) (*sql.DB, error) {
|
2026-04-04 11:43:54 +00:00
|
|
|
sqliteDatabase, err := sql.Open("sqlite", databasePath)
|
2026-03-30 20:46:43 +00:00
|
|
|
if err != nil {
|
2026-04-04 08:41:28 +00:00
|
|
|
return nil, core.E("store.openWorkspaceDatabase", "open workspace database", err)
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
2026-04-04 11:43:54 +00:00
|
|
|
sqliteDatabase.SetMaxOpenConns(1)
|
|
|
|
|
if _, err := sqliteDatabase.Exec("PRAGMA journal_mode=WAL"); err != nil {
|
|
|
|
|
sqliteDatabase.Close()
|
2026-04-04 08:41:28 +00:00
|
|
|
return nil, core.E("store.openWorkspaceDatabase", "set WAL journal mode", err)
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
2026-04-04 11:43:54 +00:00
|
|
|
if _, err := sqliteDatabase.Exec("PRAGMA busy_timeout=5000"); err != nil {
|
|
|
|
|
sqliteDatabase.Close()
|
2026-04-04 08:41:28 +00:00
|
|
|
return nil, core.E("store.openWorkspaceDatabase", "set busy timeout", err)
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
2026-04-04 11:43:54 +00:00
|
|
|
if _, err := sqliteDatabase.Exec(createWorkspaceEntriesTableSQL); err != nil {
|
|
|
|
|
sqliteDatabase.Close()
|
2026-04-04 08:41:28 +00:00
|
|
|
return nil, core.E("store.openWorkspaceDatabase", "create workspace entries table", err)
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
2026-04-04 11:43:54 +00:00
|
|
|
if _, err := sqliteDatabase.Exec(createWorkspaceEntriesViewSQL); err != nil {
|
|
|
|
|
sqliteDatabase.Close()
|
2026-04-04 08:41:28 +00:00
|
|
|
return nil, core.E("store.openWorkspaceDatabase", "create workspace entries view", err)
|
2026-03-30 20:59:09 +00:00
|
|
|
}
|
2026-04-04 11:43:54 +00:00
|
|
|
return sqliteDatabase, nil
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func workspaceSummaryGroup(workspaceName string) string {
|
2026-04-04 12:13:08 +00:00
|
|
|
return core.Concat(workspaceSummaryGroupPrefix, ":", workspaceName)
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func workspaceFilePath(stateDirectory, name string) string {
|
|
|
|
|
return joinPath(stateDirectory, core.Concat(name, ".duckdb"))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func joinPath(base, name string) string {
|
|
|
|
|
if base == "" {
|
|
|
|
|
return name
|
|
|
|
|
}
|
2026-04-04 09:01:23 +00:00
|
|
|
return core.Concat(normaliseDirectoryPath(base), "/", name)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func normaliseDirectoryPath(directory string) string {
|
|
|
|
|
for directory != "" && core.HasSuffix(directory, "/") {
|
|
|
|
|
directory = core.TrimSuffix(directory, "/")
|
|
|
|
|
}
|
|
|
|
|
return directory
|
2026-03-30 20:46:43 +00:00
|
|
|
}
|