[agent/codex:gpt-5.4] Implement docs/RFC-STORE.md using docs/RFC-CORE-008-AGENT-EX... #26

Merged
Virgil merged 1 commit from agent/implement-docs-rfc-store-md-using-docs-r into dev 2026-03-30 20:47:17 +00:00
13 changed files with 1404 additions and 28 deletions

191
compact.go Normal file
View file

@ -0,0 +1,191 @@
package store
import (
"compress/gzip"
"io"
"time"
core "dappco.re/go/core"
)
// defaultArchiveOutputDirectory is where Compact writes archives when
// CompactOptions.Output is empty.
var defaultArchiveOutputDirectory = ".core/archive"

// CompactOptions controls cold archive generation.
// Usage example: `options := store.CompactOptions{Before: time.Now().Add(-90 * 24 * time.Hour), Output: "/tmp/archive", Format: "gzip"}`
type CompactOptions struct {
	Before time.Time // archive journal rows committed strictly before this instant
	Output string    // target directory; defaults to defaultArchiveOutputDirectory when empty
	Format string    // archive format; only "gzip" is supported ("" defaults to "gzip")
}

// compactArchiveEntry is one journal row staged in memory for archiving.
type compactArchiveEntry struct {
	id          int64  // journal_entries.entry_id
	bucket      string // journal_entries.bucket_name
	measurement string // journal_entries.measurement
	fieldsJSON  string // raw fields_json column, decoded later by compactArchiveLine
	tagsJSON    string // raw tags_json column, decoded later by compactArchiveLine
	committedAt int64  // committed_at in Unix milliseconds
}
// Compact archives old journal entries as newline-delimited JSON.
//
// Unarchived rows committed before options.Before are written to a gzip
// archive file, then marked archived in a single transaction. On success the
// result value is the archive path, or "" when no rows matched (in which
// case no file is created). Output defaults to defaultArchiveOutputDirectory
// and Format defaults to "gzip"; any other format is rejected.
// Usage example: `result := storeInstance.Compact(store.CompactOptions{Before: time.Now().Add(-30 * 24 * time.Hour), Output: "/tmp/archive", Format: "gzip"})`
func (storeInstance *Store) Compact(options CompactOptions) core.Result {
	if err := ensureJournalSchema(storeInstance.database); err != nil {
		return core.Result{Value: core.E("store.Compact", "ensure journal schema", err), OK: false}
	}
	// Apply option defaults before validating.
	outputDirectory := options.Output
	if outputDirectory == "" {
		outputDirectory = defaultArchiveOutputDirectory
	}
	format := options.Format
	if format == "" {
		format = "gzip"
	}
	if format != "gzip" {
		return core.Result{Value: core.E("store.Compact", core.Concat("unsupported archive format: ", format), nil), OK: false}
	}
	filesystem := (&core.Fs{}).NewUnrestricted()
	if result := filesystem.EnsureDir(outputDirectory); !result.OK {
		return core.Result{Value: core.E("store.Compact", "ensure archive directory", result.Value.(error)), OK: false}
	}
	// Select only unarchived rows older than the cutoff, oldest first.
	rows, err := storeInstance.database.Query(
		"SELECT entry_id, bucket_name, measurement, fields_json, tags_json, committed_at FROM "+journalEntriesTableName+" WHERE archived_at IS NULL AND committed_at < ? ORDER BY committed_at",
		options.Before.UnixMilli(),
	)
	if err != nil {
		return core.Result{Value: core.E("store.Compact", "query journal rows", err), OK: false}
	}
	defer rows.Close()
	// Stage all candidate rows in memory before any file is created.
	var archiveEntries []compactArchiveEntry
	for rows.Next() {
		var entry compactArchiveEntry
		if err := rows.Scan(
			&entry.id,
			&entry.bucket,
			&entry.measurement,
			&entry.fieldsJSON,
			&entry.tagsJSON,
			&entry.committedAt,
		); err != nil {
			return core.Result{Value: core.E("store.Compact", "scan journal row", err), OK: false}
		}
		archiveEntries = append(archiveEntries, entry)
	}
	if err := rows.Err(); err != nil {
		return core.Result{Value: core.E("store.Compact", "iterate journal rows", err), OK: false}
	}
	// Nothing to archive: succeed with an empty path and create no file.
	if len(archiveEntries) == 0 {
		return core.Result{Value: "", OK: true}
	}
	outputPath := compactOutputPath(outputDirectory, format)
	createResult := filesystem.Create(outputPath)
	if !createResult.OK {
		return core.Result{Value: core.E("store.Compact", "create archive file", createResult.Value.(error)), OK: false}
	}
	file, ok := createResult.Value.(io.WriteCloser)
	if !ok {
		return core.Result{Value: core.E("store.Compact", "archive file is not writable", nil), OK: false}
	}
	// fileClosed suppresses the deferred close once the explicit Close below
	// has run, so the file is closed exactly once on every path.
	fileClosed := false
	defer func() {
		if !fileClosed {
			_ = file.Close()
		}
	}()
	writer := gzip.NewWriter(file)
	// writeOK plays the same role for the gzip writer: the deferred close
	// only fires when an early return skipped the explicit writer.Close.
	writeOK := false
	defer func() {
		if !writeOK {
			_ = writer.Close()
		}
	}()
	// One JSON object per line (NDJSON) through the gzip stream.
	for _, entry := range archiveEntries {
		lineMap, err := compactArchiveLine(entry)
		if err != nil {
			return core.Result{Value: err, OK: false}
		}
		lineJSON, err := jsonString(lineMap, "store.Compact", "marshal archive line")
		if err != nil {
			return core.Result{Value: err, OK: false}
		}
		if _, err := io.WriteString(writer, lineJSON+"\n"); err != nil {
			return core.Result{Value: core.E("store.Compact", "write archive line", err), OK: false}
		}
	}
	// Close the writer before the file so the gzip footer is flushed.
	if err := writer.Close(); err != nil {
		return core.Result{Value: core.E("store.Compact", "close archive writer", err), OK: false}
	}
	writeOK = true
	if err := file.Close(); err != nil {
		return core.Result{Value: core.E("store.Compact", "close archive file", err), OK: false}
	}
	fileClosed = true
	// Mark rows archived only after the archive is safely on disk. A failed
	// commit rolls back, leaving rows unarchived (the file remains but is
	// simply redundant on the next run).
	transaction, err := storeInstance.database.Begin()
	if err != nil {
		return core.Result{Value: core.E("store.Compact", "begin archive transaction", err), OK: false}
	}
	committed := false
	defer func() {
		if !committed {
			_ = transaction.Rollback()
		}
	}()
	archivedAt := time.Now().UnixMilli()
	for _, entry := range archiveEntries {
		if _, err := transaction.Exec(
			"UPDATE "+journalEntriesTableName+" SET archived_at = ? WHERE entry_id = ?",
			archivedAt,
			entry.id,
		); err != nil {
			return core.Result{Value: core.E("store.Compact", "mark journal row archived", err), OK: false}
		}
	}
	if err := transaction.Commit(); err != nil {
		return core.Result{Value: core.E("store.Compact", "commit archive transaction", err), OK: false}
	}
	committed = true
	return core.Result{Value: outputPath, OK: true}
}
// compactArchiveLine converts one staged journal row into the map that is
// serialised as a single NDJSON archive line. The JSON columns must parse;
// a decode failure aborts the whole compaction.
func compactArchiveLine(entry compactArchiveEntry) (map[string]any, error) {
	fields := map[string]any{}
	if result := core.JSONUnmarshalString(entry.fieldsJSON, &fields); !result.OK {
		return nil, core.E("store.Compact", "unmarshal fields", result.Value.(error))
	}
	tags := map[string]string{}
	if result := core.JSONUnmarshalString(entry.tagsJSON, &tags); !result.OK {
		return nil, core.E("store.Compact", "unmarshal tags", result.Value.(error))
	}
	line := map[string]any{
		"bucket":       entry.bucket,
		"measurement":  entry.measurement,
		"fields":       fields,
		"tags":         tags,
		"committed_at": entry.committedAt,
	}
	return line, nil
}
func compactOutputPath(outputDirectory, format string) string {
extension := ".jsonl"
if format == "gzip" {
extension = ".jsonl.gz"
}
filename := core.Concat("journal-", time.Now().UTC().Format("20060102-150405"), extension)
return joinPath(outputDirectory, filename)
}

81
compact_test.go Normal file
View file

@ -0,0 +1,81 @@
package store
import (
"bytes"
"compress/gzip"
"io"
"testing"
"time"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// End-to-end Compact happy path: only the entry backdated past the cutoff is
// archived to a gzip NDJSON file; the recent entry stays queryable.
func TestCompact_Compact_Good_GzipArchive(t *testing.T) {
	outputDirectory := useArchiveOutputDirectory(t)
	storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer storeInstance.Close()
	require.True(t,
		storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK,
	)
	require.True(t,
		storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK,
	)
	// Backdate session-a beyond the 24h cutoff so only it gets archived.
	_, err = storeInstance.database.Exec(
		"UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?",
		time.Now().Add(-48*time.Hour).UnixMilli(),
		"session-a",
	)
	require.NoError(t, err)
	result := storeInstance.Compact(CompactOptions{
		Before: time.Now().Add(-24 * time.Hour),
		Output: outputDirectory,
		Format: "gzip",
	})
	require.True(t, result.OK, "compact failed: %v", result.Value)
	archivePath, ok := result.Value.(string)
	require.True(t, ok, "unexpected archive path type: %T", result.Value)
	assert.True(t, testFilesystem().Exists(archivePath))
	// Decompress and verify the archive holds exactly the backdated row.
	archiveData := requireCoreReadBytes(t, archivePath)
	reader, err := gzip.NewReader(bytes.NewReader(archiveData))
	require.NoError(t, err)
	defer reader.Close()
	decompressedData, err := io.ReadAll(reader)
	require.NoError(t, err)
	lines := core.Split(core.Trim(string(decompressedData)), "\n")
	require.Len(t, lines, 1)
	archivedRow := make(map[string]any)
	unmarshalResult := core.JSONUnmarshalString(lines[0], &archivedRow)
	require.True(t, unmarshalResult.OK, "archive line unmarshal failed: %v", unmarshalResult.Value)
	assert.Equal(t, "session-a", archivedRow["measurement"])
	// The unarchived row must still be visible through QueryJournal.
	remainingRows := requireResultRows(t, storeInstance.QueryJournal(""))
	require.Len(t, remainingRows, 1)
	assert.Equal(t, "session-b", remainingRows[0]["measurement"])
}
// Compact with no matching rows must succeed and report an empty path,
// signalling that no archive file was produced.
func TestCompact_Compact_Good_NoRows(t *testing.T) {
	outputDirectory := useArchiveOutputDirectory(t)
	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()
	options := CompactOptions{
		Before: time.Now(),
		Output: outputDirectory,
		Format: "gzip",
	}
	result := storeInstance.Compact(options)
	require.True(t, result.OK, "compact failed: %v", result.Value)
	assert.Equal(t, "", result.Value)
}

View file

@ -77,13 +77,18 @@ const watcherEventBufferCapacity = 16
// Watch registers a buffered subscription for matching mutations.
// Usage example: `watcher := storeInstance.Watch("*", "*")`
func (storeInstance *Store) Watch(group, key string) *Watcher {
// Usage example: `watcher := storeInstance.Watch("config")`
func (storeInstance *Store) Watch(group string, key ...string) *Watcher {
keyPattern := "*"
if len(key) > 0 && key[0] != "" {
keyPattern = key[0]
}
eventChannel := make(chan Event, watcherEventBufferCapacity)
watcher := &Watcher{
Events: eventChannel,
eventsChannel: eventChannel,
groupPattern: group,
keyPattern: key,
keyPattern: keyPattern,
registrationID: atomic.AddUint64(&storeInstance.nextWatcherRegistrationID, 1),
}
@ -115,7 +120,9 @@ func (storeInstance *Store) Unwatch(watcher *Watcher) {
// OnChange registers a synchronous mutation callback.
// Usage example: `events := make(chan store.Event, 1); unregister := storeInstance.OnChange(func(event store.Event) { events <- event }); defer unregister()`
func (storeInstance *Store) OnChange(callback func(Event)) func() {
// Usage example: `unregister := storeInstance.OnChange("config", func(key, value string) { fmt.Println(key, value) })`
func (storeInstance *Store) OnChange(arguments ...any) func() {
callback := onChangeCallback(arguments)
if callback == nil {
return func() {}
}
@ -140,6 +147,39 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() {
}
}
// onChangeCallback normalises OnChange's variadic arguments into a single
// func(Event), or nil when the arguments are unusable. Accepted shapes:
//   - (func(Event))                            raw event callback
//   - (group string, func(key, value string))  group-filtered key/value callback
func onChangeCallback(arguments []any) func(Event) {
	if len(arguments) == 1 {
		if arguments[0] == nil {
			return nil
		}
		if callback, ok := arguments[0].(func(Event)); ok {
			return callback
		}
		return nil
	}
	if len(arguments) == 2 {
		group, groupOK := arguments[0].(string)
		callback, callbackOK := arguments[1].(func(string, string))
		if !groupOK || !callbackOK || callback == nil {
			return nil
		}
		// Wrap the key/value callback so it only fires for the named group.
		return func(event Event) {
			if event.Group == group {
				callback(event.Key, event.Value)
			}
		}
	}
	return nil
}
// notify(Event{Type: EventSet, Group: "config", Key: "colour", Value: "blue"})
// dispatches matching watchers and callbacks after a successful write. If a
// watcher buffer is full, the event is dropped instead of blocking the writer.

View file

@ -66,6 +66,22 @@ func TestEvents_Watch_Good_WildcardKey(t *testing.T) {
assert.Equal(t, "colour", received[1].Key)
}
// Watch called without a key argument should default to the "*" key pattern
// and receive every mutation in the group.
func TestEvents_Watch_Good_DefaultWildcardKey(t *testing.T) {
	storeInstance, _ := New(":memory:")
	defer storeInstance.Close()
	watcher := storeInstance.Watch("config")
	defer storeInstance.Unwatch(watcher)
	require.NoError(t, storeInstance.Set("config", "theme", "dark"))
	require.NoError(t, storeInstance.Set("config", "colour", "blue"))
	received := drainEvents(watcher.Events, 2, time.Second)
	require.Len(t, received, 2)
	assert.Equal(t, "theme", received[0].Key)
	assert.Equal(t, "colour", received[1].Key)
}
func TestEvents_Watch_Good_GroupMismatch(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
@ -300,6 +316,22 @@ func TestEvents_OnChange_Good_NilCallbackNoOp(t *testing.T) {
unregister()
}
// OnChange with a (group, func(key, value)) pair must invoke the callback
// only for mutations in the named group; other groups are ignored.
func TestEvents_OnChange_Good_GroupCallback(t *testing.T) {
	storeInstance, _ := New(":memory:")
	defer storeInstance.Close()
	var keys []string
	unregister := storeInstance.OnChange("config", func(key, value string) {
		keys = append(keys, key)
	})
	defer unregister()
	require.NoError(t, storeInstance.Set("config", "theme", "dark"))
	require.NoError(t, storeInstance.Set("other", "theme", "ignored"))
	assert.Equal(t, []string{"theme"}, keys)
}
// ---------------------------------------------------------------------------
// OnChange — callback can manage subscriptions while handling an event
// ---------------------------------------------------------------------------

291
journal.go Normal file
View file

@ -0,0 +1,291 @@
package store
import (
"database/sql"
"regexp"
"strconv"
"time"
core "dappco.re/go/core"
)
const (
	// journalEntriesTableName is the SQLite table backing the journal.
	journalEntriesTableName = "journal_entries"
	// defaultJournalBucket is used when no bucket was configured via WithJournal.
	defaultJournalBucket = "store"
)

// createJournalEntriesTableSQL defines the journal schema. archived_at stays
// NULL until Compact archives the row.
const createJournalEntriesTableSQL = `CREATE TABLE IF NOT EXISTS journal_entries (
entry_id INTEGER PRIMARY KEY AUTOINCREMENT,
bucket_name TEXT NOT NULL,
measurement TEXT NOT NULL,
fields_json TEXT NOT NULL,
tags_json TEXT NOT NULL,
committed_at INTEGER NOT NULL,
archived_at INTEGER
)`

// Patterns that extract filters from the Flux-like query surface accepted by
// QueryJournal; each captures the filter value in group 1.
var (
	journalBucketPattern      = regexp.MustCompile(`bucket:\s*"([^"]+)"`)
	journalRangePattern       = regexp.MustCompile(`range\(\s*start:\s*([^)]+)\)`)
	journalMeasurementPattern = regexp.MustCompile(`(?:_measurement|measurement)\s*==\s*"([^"]+)"`)
)

// journalExecutor is the minimal statement-execution surface needed to write
// journal rows; satisfied by both *sql.DB and *sql.Tx.
type journalExecutor interface {
	Exec(query string, args ...any) (sql.Result, error)
}
// CommitToJournal records one completed unit of work in the store journal.
//
// An empty measurement is rejected. On success the result value echoes the
// stored row: bucket, measurement, fields, tags, and committed_at in Unix
// milliseconds.
// Usage example: `result := storeInstance.CommitToJournal("scroll-session", map[string]any{"like": 4}, map[string]string{"workspace": "scroll-session"})`
func (storeInstance *Store) CommitToJournal(measurement string, fields map[string]any, tags map[string]string) core.Result {
	if measurement == "" {
		return core.Result{Value: core.E("store.CommitToJournal", "measurement is empty", nil), OK: false}
	}
	// Treat nil maps as empty so callers may pass nil for either argument.
	if fields == nil {
		fields = map[string]any{}
	}
	if tags == nil {
		tags = map[string]string{}
	}
	if err := ensureJournalSchema(storeInstance.database); err != nil {
		return core.Result{Value: core.E("store.CommitToJournal", "ensure journal schema", err), OK: false}
	}
	fieldsJSON, err := jsonString(fields, "store.CommitToJournal", "marshal fields")
	if err != nil {
		return core.Result{Value: err, OK: false}
	}
	tagsJSON, err := jsonString(tags, "store.CommitToJournal", "marshal tags")
	if err != nil {
		return core.Result{Value: err, OK: false}
	}
	committedAt := time.Now().UnixMilli()
	if err := insertJournalEntry(
		storeInstance.database,
		storeInstance.journalBucket(),
		measurement,
		fieldsJSON,
		tagsJSON,
		committedAt,
	); err != nil {
		return core.Result{Value: core.E("store.CommitToJournal", "insert journal entry", err), OK: false}
	}
	return core.Result{
		Value: map[string]any{
			"bucket":       storeInstance.journalBucket(),
			"measurement":  measurement,
			"fields":       fields,
			"tags":         tags,
			"committed_at": committedAt,
		},
		OK: true,
	}
}
// QueryJournal reads journal rows either through a small Flux-like filter
// surface or a direct SQL SELECT against the internal journal table.
//
// An empty query returns all unarchived rows ordered by commit time.
// NOTE(review): only all-upper "SELECT" and all-lower "select" prefixes are
// recognised as SQL; mixed case (e.g. "Select") falls through to the Flux
// parser — confirm whether that is intended.
// Usage example: `result := storeInstance.QueryJournal(\`from(bucket: "store") |> range(start: -24h)\`)`
func (storeInstance *Store) QueryJournal(flux string) core.Result {
	if err := ensureJournalSchema(storeInstance.database); err != nil {
		return core.Result{Value: core.E("store.QueryJournal", "ensure journal schema", err), OK: false}
	}
	trimmedQuery := core.Trim(flux)
	if trimmedQuery == "" {
		return storeInstance.queryJournalRows(
			"SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM " + journalEntriesTableName + " WHERE archived_at IS NULL ORDER BY committed_at",
		)
	}
	if core.HasPrefix(trimmedQuery, "SELECT") || core.HasPrefix(trimmedQuery, "select") {
		return storeInstance.queryJournalRows(trimmedQuery)
	}
	selectSQL, arguments, err := storeInstance.queryJournalFlux(trimmedQuery)
	if err != nil {
		return core.Result{Value: err, OK: false}
	}
	return storeInstance.queryJournalRows(selectSQL, arguments...)
}
// queryJournalRows executes query, scans every row into a column map, and
// inflates the JSON columns into structured "fields"/"tags" entries.
func (storeInstance *Store) queryJournalRows(query string, arguments ...any) core.Result {
	rows, err := storeInstance.database.Query(query, arguments...)
	if err != nil {
		return core.Result{Value: core.E("store.QueryJournal", "query rows", err), OK: false}
	}
	defer rows.Close()
	rowMaps, scanErr := queryRowsAsMaps(rows)
	if scanErr != nil {
		return core.Result{Value: core.E("store.QueryJournal", "scan rows", scanErr), OK: false}
	}
	return core.Result{Value: inflateJournalRows(rowMaps), OK: true}
}
// queryJournalFlux translates a minimal Flux-like filter expression into a
// SQL SELECT over the journal table plus bind arguments. Recognised clauses:
// `bucket: "..."`, `measurement == "..."` (or `_measurement`), and
// `range(start: <duration>)`. Unrecognised clauses are ignored; only a
// malformed range value yields an error.
func (storeInstance *Store) queryJournalFlux(flux string) (string, []any, error) {
	builder := core.NewBuilder()
	builder.WriteString("SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM ")
	builder.WriteString(journalEntriesTableName)
	builder.WriteString(" WHERE archived_at IS NULL")
	var arguments []any
	if bucket := quotedSubmatch(journalBucketPattern, flux); bucket != "" {
		builder.WriteString(" AND bucket_name = ?")
		arguments = append(arguments, bucket)
	}
	if measurement := quotedSubmatch(journalMeasurementPattern, flux); measurement != "" {
		builder.WriteString(" AND measurement = ?")
		arguments = append(arguments, measurement)
	}
	// quotedSubmatch already returns capture group 1, so the previous
	// fallback through regexpSubmatch(journalRangePattern, flux, 1) was
	// dead code and has been removed.
	if rangeMatch := quotedSubmatch(journalRangePattern, flux); rangeMatch != "" {
		startTime, err := fluxStartTime(core.Trim(rangeMatch))
		if err != nil {
			return "", nil, core.E("store.QueryJournal", "parse range", err)
		}
		builder.WriteString(" AND committed_at >= ?")
		arguments = append(arguments, startTime.UnixMilli())
	}
	builder.WriteString(" ORDER BY committed_at")
	return builder.String(), arguments, nil
}
// journalBucket returns the configured journal bucket, falling back to
// defaultJournalBucket when none was set via WithJournal.
func (storeInstance *Store) journalBucket() string {
	if bucket := storeInstance.journal.bucket; bucket != "" {
		return bucket
	}
	return defaultJournalBucket
}
// ensureJournalSchema creates the journal table and its (bucket_name,
// committed_at) index if missing; idempotent and safe to call repeatedly.
func ensureJournalSchema(database schemaDatabase) error {
	statements := []string{
		createJournalEntriesTableSQL,
		"CREATE INDEX IF NOT EXISTS journal_entries_bucket_committed_at_idx ON " + journalEntriesTableName + " (bucket_name, committed_at)",
	}
	for _, statement := range statements {
		if _, err := database.Exec(statement); err != nil {
			return err
		}
	}
	return nil
}
// insertJournalEntry appends one unarchived journal row via the given
// executor (a *sql.DB or *sql.Tx).
func insertJournalEntry(
	executor journalExecutor,
	bucket, measurement, fieldsJSON, tagsJSON string,
	committedAt int64,
) error {
	insertSQL := "INSERT INTO " + journalEntriesTableName +
		" (bucket_name, measurement, fields_json, tags_json, committed_at, archived_at) VALUES (?, ?, ?, ?, ?, NULL)"
	_, err := executor.Exec(insertSQL, bucket, measurement, fieldsJSON, tagsJSON, committedAt)
	return err
}
// jsonString marshals value to a JSON string, wrapping any failure with the
// caller-supplied operation and message context.
func jsonString(value any, operation, message string) (string, error) {
	marshalled := core.JSONMarshal(value)
	if marshalled.OK {
		return string(marshalled.Value.([]byte)), nil
	}
	return "", core.E(operation, message, marshalled.Value.(error))
}
// fluxStartTime resolves a Flux range start expression (e.g. "-24h", "-7d")
// to an absolute instant relative to now. Day counts carry their own sign;
// anything else must be a valid time.ParseDuration string.
func fluxStartTime(value string) (time.Time, error) {
	switch {
	case value == "":
		return time.Time{}, core.E("store.fluxStartTime", "range value is empty", nil)
	case core.HasSuffix(value, "d"):
		days, err := strconv.Atoi(core.TrimSuffix(value, "d"))
		if err != nil {
			return time.Time{}, err
		}
		return time.Now().Add(time.Duration(days) * 24 * time.Hour), nil
	default:
		lookback, err := time.ParseDuration(value)
		if err != nil {
			return time.Time{}, err
		}
		return time.Now().Add(lookback), nil
	}
}
func quotedSubmatch(pattern *regexp.Regexp, value string) string {
match := pattern.FindStringSubmatch(value)
if len(match) < 2 {
return ""
}
return match[1]
}
func regexpSubmatch(pattern *regexp.Regexp, value string, index int) string {
match := pattern.FindStringSubmatch(value)
if len(match) <= index {
return ""
}
return match[index]
}
// queryRowsAsMaps drains rows into one column-name -> value map per row,
// normalising []byte driver values to string via normaliseRowValue.
func queryRowsAsMaps(rows *sql.Rows) ([]map[string]any, error) {
	columns, err := rows.Columns()
	if err != nil {
		return nil, err
	}
	var mapped []map[string]any
	for rows.Next() {
		// Scan into pointers so the driver can fill values of any type.
		values := make([]any, len(columns))
		pointers := make([]any, len(columns))
		for index := range values {
			pointers[index] = &values[index]
		}
		if err := rows.Scan(pointers...); err != nil {
			return nil, err
		}
		row := make(map[string]any, len(columns))
		for index, column := range columns {
			row[column] = normaliseRowValue(values[index])
		}
		mapped = append(mapped, row)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return mapped, nil
}
// inflateJournalRows decodes the fields_json/tags_json columns in place,
// adding structured "fields" and "tags" entries when the JSON parses
// cleanly; undecodable JSON is left alone rather than failing the query.
func inflateJournalRows(rows []map[string]any) []map[string]any {
	for _, row := range rows {
		if raw, ok := row["fields_json"].(string); ok {
			decoded := make(map[string]any)
			if core.JSONUnmarshalString(raw, &decoded).OK {
				row["fields"] = decoded
			}
		}
		if raw, ok := row["tags_json"].(string); ok {
			decoded := make(map[string]string)
			if core.JSONUnmarshalString(raw, &decoded).OK {
				row["tags"] = decoded
			}
		}
	}
	return rows
}
// normaliseRowValue converts []byte column values to string; every other
// driver value passes through unchanged.
func normaliseRowValue(value any) any {
	if raw, ok := value.([]byte); ok {
		return string(raw)
	}
	return value
}

69
journal_test.go Normal file
View file

@ -0,0 +1,69 @@
package store
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Committed journal rows must be readable back through QueryJournal's raw
// SQL path, with fields/tags inflated into typed maps.
func TestJournal_CommitToJournal_Good_WithQueryJournalSQL(t *testing.T) {
	storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer storeInstance.Close()
	first := storeInstance.CommitToJournal("session-a", map[string]any{"like": 4}, map[string]string{"workspace": "session-a"})
	second := storeInstance.CommitToJournal("session-b", map[string]any{"profile_match": 2}, map[string]string{"workspace": "session-b"})
	require.True(t, first.OK, "first journal commit failed: %v", first.Value)
	require.True(t, second.OK, "second journal commit failed: %v", second.Value)
	rows := requireResultRows(
		t,
		storeInstance.QueryJournal("SELECT bucket_name, measurement, fields_json, tags_json FROM journal_entries ORDER BY entry_id"),
	)
	require.Len(t, rows, 2)
	assert.Equal(t, "events", rows[0]["bucket_name"])
	assert.Equal(t, "session-a", rows[0]["measurement"])
	// JSON numbers decode as float64, hence the float comparison.
	fields, ok := rows[0]["fields"].(map[string]any)
	require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"])
	assert.Equal(t, float64(4), fields["like"])
	tags, ok := rows[1]["tags"].(map[string]string)
	require.True(t, ok, "unexpected tags type: %T", rows[1]["tags"])
	assert.Equal(t, "session-b", tags["workspace"])
}
// The Flux-like query surface must honour bucket, range, and measurement
// filters together, returning only the matching row.
func TestJournal_QueryJournal_Good_FluxFilters(t *testing.T) {
	storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer storeInstance.Close()
	require.True(t,
		storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK,
	)
	require.True(t,
		storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK,
	)
	rows := requireResultRows(
		t,
		storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r._measurement == "session-b")`),
	)
	require.Len(t, rows, 1)
	assert.Equal(t, "session-b", rows[0]["measurement"])
	fields, ok := rows[0]["fields"].(map[string]any)
	require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"])
	assert.Equal(t, float64(2), fields["like"])
}
// CommitToJournal must reject an empty measurement with a descriptive error
// instead of inserting a row.
func TestJournal_CommitToJournal_Bad_EmptyMeasurement(t *testing.T) {
	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()
	fields := map[string]any{"like": 1}
	tags := map[string]string{"workspace": "missing"}
	result := storeInstance.CommitToJournal("", fields, tags)
	require.False(t, result.OK)
	assert.Contains(t, result.Value.(error).Error(), "measurement is empty")
}

102
scope.go
View file

@ -11,6 +11,8 @@ import (
// validNamespace.MatchString("tenant-a") is true; validNamespace.MatchString("tenant_a") is false.
var validNamespace = regexp.MustCompile(`^[a-zA-Z0-9-]+$`)
const defaultScopedGroupName = "default"
// QuotaConfig sets per-namespace key and group limits.
// Usage example: `quota := store.QuotaConfig{MaxKeys: 100, MaxGroups: 10}`
type QuotaConfig struct {
@ -25,7 +27,8 @@ type QuotaConfig struct {
type ScopedStore struct {
storeInstance *Store
namespace string
quota QuotaConfig
MaxKeys int
MaxGroups int
}
// NewScoped validates a namespace and prefixes groups with namespace + ":".
@ -55,7 +58,8 @@ func NewScopedWithQuota(storeInstance *Store, namespace string, quota QuotaConfi
nil,
)
}
scopedStore.quota = quota
scopedStore.MaxKeys = quota.MaxKeys
scopedStore.MaxGroups = quota.MaxGroups
return scopedStore, nil
}
@ -67,6 +71,10 @@ func (scopedStore *ScopedStore) namespacePrefix() string {
return scopedStore.namespace + ":"
}
func (scopedStore *ScopedStore) defaultGroup() string {
return defaultScopedGroupName
}
func (scopedStore *ScopedStore) trimNamespacePrefix(groupName string) string {
return core.TrimPrefix(groupName, scopedStore.namespacePrefix())
}
@ -77,19 +85,41 @@ func (scopedStore *ScopedStore) Namespace() string {
return scopedStore.namespace
}
// Usage example: `colourValue, err := scopedStore.Get("colour")`
// Usage example: `colourValue, err := scopedStore.Get("config", "colour")`
func (scopedStore *ScopedStore) Get(group, key string) (string, error) {
func (scopedStore *ScopedStore) Get(arguments ...string) (string, error) {
group, key, err := scopedStore.getArguments(arguments)
if err != nil {
return "", err
}
return scopedStore.storeInstance.Get(scopedStore.namespacedGroup(group), key)
}
// GetFrom reads a key from an explicit namespaced group.
// Usage example: `colourValue, err := scopedStore.GetFrom("config", "colour")`
func (scopedStore *ScopedStore) GetFrom(group, key string) (string, error) {
return scopedStore.Get(group, key)
}
// Usage example: `if err := scopedStore.Set("colour", "blue"); err != nil { return }`
// Usage example: `if err := scopedStore.Set("config", "colour", "blue"); err != nil { return }`
func (scopedStore *ScopedStore) Set(group, key, value string) error {
func (scopedStore *ScopedStore) Set(arguments ...string) error {
group, key, value, err := scopedStore.setArguments(arguments)
if err != nil {
return err
}
if err := scopedStore.checkQuota("store.ScopedStore.Set", group, key); err != nil {
return err
}
return scopedStore.storeInstance.Set(scopedStore.namespacedGroup(group), key, value)
}
// SetIn writes a key to an explicit namespaced group.
// Usage example: `if err := scopedStore.SetIn("config", "colour", "blue"); err != nil { return }`
func (scopedStore *ScopedStore) SetIn(group, key, value string) error {
return scopedStore.Set(group, key, value)
}
// Usage example: `if err := scopedStore.SetWithTTL("sessions", "token", "abc123", time.Hour); err != nil { return }`
func (scopedStore *ScopedStore) SetWithTTL(group, key, value string, timeToLive time.Duration) error {
if err := scopedStore.checkQuota("store.ScopedStore.SetWithTTL", group, key); err != nil {
@ -118,19 +148,26 @@ func (scopedStore *ScopedStore) All(group string) iter.Seq2[KeyValue, error] {
return scopedStore.storeInstance.All(scopedStore.namespacedGroup(group))
}
// Usage example: `for entry, err := range scopedStore.AllSeq("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }`
func (scopedStore *ScopedStore) AllSeq(group string) iter.Seq2[KeyValue, error] {
return scopedStore.All(group)
}
// Usage example: `keyCount, err := scopedStore.Count("config")`
func (scopedStore *ScopedStore) Count(group string) (int, error) {
return scopedStore.storeInstance.Count(scopedStore.namespacedGroup(group))
}
// Usage example: `keyCount, err := scopedStore.CountAll("config")`
func (scopedStore *ScopedStore) CountAll(groupPrefix string) (int, error) {
return scopedStore.storeInstance.CountAll(scopedStore.namespacedGroup(groupPrefix))
// Usage example: `keyCount, err := scopedStore.CountAll()`
func (scopedStore *ScopedStore) CountAll(groupPrefix ...string) (int, error) {
return scopedStore.storeInstance.CountAll(scopedStore.namespacedGroup(firstString(groupPrefix)))
}
// Usage example: `groupNames, err := scopedStore.Groups("config")`
func (scopedStore *ScopedStore) Groups(groupPrefix string) ([]string, error) {
groupNames, err := scopedStore.storeInstance.Groups(scopedStore.namespacedGroup(groupPrefix))
// Usage example: `groupNames, err := scopedStore.Groups()`
func (scopedStore *ScopedStore) Groups(groupPrefix ...string) ([]string, error) {
groupNames, err := scopedStore.storeInstance.Groups(scopedStore.namespacedGroup(firstString(groupPrefix)))
if err != nil {
return nil, err
}
@ -141,10 +178,11 @@ func (scopedStore *ScopedStore) Groups(groupPrefix string) ([]string, error) {
}
// Usage example: `for groupName, err := range scopedStore.GroupsSeq("config") { if err != nil { break }; fmt.Println(groupName) }`
func (scopedStore *ScopedStore) GroupsSeq(groupPrefix string) iter.Seq2[string, error] {
// Usage example: `for groupName, err := range scopedStore.GroupsSeq() { if err != nil { break }; fmt.Println(groupName) }`
func (scopedStore *ScopedStore) GroupsSeq(groupPrefix ...string) iter.Seq2[string, error] {
return func(yield func(string, error) bool) {
namespacePrefix := scopedStore.namespacePrefix()
for groupName, err := range scopedStore.storeInstance.GroupsSeq(scopedStore.namespacedGroup(groupPrefix)) {
for groupName, err := range scopedStore.storeInstance.GroupsSeq(scopedStore.namespacedGroup(firstString(groupPrefix))) {
if err != nil {
if !yield("", err) {
return
@ -187,7 +225,7 @@ func (scopedStore *ScopedStore) PurgeExpired() (int64, error) {
// group would exceed the configured limit. Existing keys are treated as
// upserts and do not consume quota.
func (scopedStore *ScopedStore) checkQuota(operation, group, key string) error {
if scopedStore.quota.MaxKeys == 0 && scopedStore.quota.MaxGroups == 0 {
if scopedStore.MaxKeys == 0 && scopedStore.MaxGroups == 0 {
return nil
}
@ -206,18 +244,18 @@ func (scopedStore *ScopedStore) checkQuota(operation, group, key string) error {
}
// Check MaxKeys quota.
if scopedStore.quota.MaxKeys > 0 {
if scopedStore.MaxKeys > 0 {
keyCount, err := scopedStore.storeInstance.CountAll(namespacePrefix)
if err != nil {
return core.E(operation, "quota check", err)
}
if keyCount >= scopedStore.quota.MaxKeys {
return core.E(operation, core.Sprintf("key limit (%d)", scopedStore.quota.MaxKeys), QuotaExceededError)
if keyCount >= scopedStore.MaxKeys {
return core.E(operation, core.Sprintf("key limit (%d)", scopedStore.MaxKeys), QuotaExceededError)
}
}
// Check MaxGroups quota — only if this would create a new group.
if scopedStore.quota.MaxGroups > 0 {
if scopedStore.MaxGroups > 0 {
existingGroupCount, err := scopedStore.storeInstance.Count(namespacedGroup)
if err != nil {
return core.E(operation, "quota check", err)
@ -231,11 +269,41 @@ func (scopedStore *ScopedStore) checkQuota(operation, group, key string) error {
}
knownGroupCount++
}
if knownGroupCount >= scopedStore.quota.MaxGroups {
return core.E(operation, core.Sprintf("group limit (%d)", scopedStore.quota.MaxGroups), QuotaExceededError)
if knownGroupCount >= scopedStore.MaxGroups {
return core.E(operation, core.Sprintf("group limit (%d)", scopedStore.MaxGroups), QuotaExceededError)
}
}
}
return nil
}
// getArguments resolves Get's variadic form: (key) addresses the default
// group, (group, key) addresses an explicit group; anything else errors.
func (scopedStore *ScopedStore) getArguments(arguments []string) (string, string, error) {
	if len(arguments) == 1 {
		return scopedStore.defaultGroup(), arguments[0], nil
	}
	if len(arguments) == 2 {
		return arguments[0], arguments[1], nil
	}
	return "", "", core.E(
		"store.ScopedStore.Get",
		core.Sprintf("expected 1 or 2 arguments; got %d", len(arguments)),
		nil,
	)
}
// setArguments resolves Set's variadic form: (key, value) writes to the
// default group, (group, key, value) to an explicit group; anything else
// errors.
func (scopedStore *ScopedStore) setArguments(arguments []string) (string, string, string, error) {
	if len(arguments) == 2 {
		return scopedStore.defaultGroup(), arguments[0], arguments[1], nil
	}
	if len(arguments) == 3 {
		return arguments[0], arguments[1], arguments[2], nil
	}
	return "", "", "", core.E(
		"store.ScopedStore.Set",
		core.Sprintf("expected 2 or 3 arguments; got %d", len(arguments)),
		nil,
	)
}

View file

@ -94,6 +94,17 @@ func TestScope_NewScopedWithQuota_Bad_NegativeMaxGroups(t *testing.T) {
assert.Contains(t, err.Error(), "zero or positive")
}
// NewScopedWithQuota must copy the QuotaConfig limits onto the ScopedStore's
// exported MaxKeys/MaxGroups fields.
func TestScope_NewScopedWithQuota_Good_InlineQuotaFields(t *testing.T) {
	storeInstance, _ := New(":memory:")
	defer storeInstance.Close()
	scopedStore, err := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: 4, MaxGroups: 2})
	require.NoError(t, err)
	assert.Equal(t, 4, scopedStore.MaxKeys)
	assert.Equal(t, 2, scopedStore.MaxGroups)
}
// ---------------------------------------------------------------------------
// ScopedStore — basic CRUD
// ---------------------------------------------------------------------------
@ -110,6 +121,34 @@ func TestScope_ScopedStore_Good_SetGet(t *testing.T) {
assert.Equal(t, "dark", value)
}
// Verifies that the argument-less Set/Get helpers write through to the
// underlying store under the "<scope>:default" group.
func TestScope_ScopedStore_Good_DefaultGroupHelpers(t *testing.T) {
	storeInstance, _ := New(":memory:")
	defer storeInstance.Close()
	scopedStore, _ := NewScoped(storeInstance, "tenant-a")
	require.NoError(t, scopedStore.Set("theme", "dark"))
	value, err := scopedStore.Get("theme")
	require.NoError(t, err)
	assert.Equal(t, "dark", value)
	// The same entry must be visible in the raw store under the prefixed group.
	rawValue, err := storeInstance.Get("tenant-a:default", "theme")
	require.NoError(t, err)
	assert.Equal(t, "dark", rawValue)
}
// Verifies that SetIn/GetFrom round-trip a value through an explicit group.
func TestScope_ScopedStore_Good_SetInGetFrom(t *testing.T) {
	storeInstance, _ := New(":memory:")
	defer storeInstance.Close()
	scopedStore, _ := NewScoped(storeInstance, "tenant-a")
	require.NoError(t, scopedStore.SetIn("config", "theme", "dark"))
	value, err := scopedStore.GetFrom("config", "theme")
	require.NoError(t, err)
	assert.Equal(t, "dark", value)
}
func TestScope_ScopedStore_Good_PrefixedInUnderlyingStore(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()

View file

@ -29,14 +29,25 @@ const (
entryValueColumn = "entry_value"
)
// StoreOption customises Store construction.
// Usage example: `storeInstance, err := store.New("/tmp/go-store.db", store.WithJournal("http://127.0.0.1:8086", "core", "events"))`
type StoreOption func(*Store)
// journalConfig holds the journal connection metadata recorded by WithJournal.
type journalConfig struct {
	url    string // journal endpoint URL, e.g. "http://127.0.0.1:8086"
	org    string // organisation identifier passed to WithJournal
	bucket string // bucket name used for journal writes and queries
}
// Store provides SQLite-backed grouped entries with TTL expiry, namespace
// isolation, and reactive change notifications.
// isolation, reactive change notifications, and optional journal support.
// Usage example: `storeInstance, err := store.New(":memory:"); if err != nil { return }; if err := storeInstance.Set("config", "colour", "blue"); err != nil { return }`
type Store struct {
database *sql.DB
cancelPurge context.CancelFunc
purgeWaitGroup sync.WaitGroup
purgeInterval time.Duration // interval between background purge cycles
journal journalConfig
// Event dispatch state.
watchers []*Watcher
@ -47,8 +58,18 @@ type Store struct {
nextCallbackRegistrationID uint64 // monotonic ID for callback registrations
}
// Usage example: `storeInstance, err := store.New(":memory:"); if err != nil { return }`
func New(databasePath string) (*Store, error) {
// WithJournal records journal connection metadata for workspace commits,
// journal queries, and archive generation.
// Usage example: `storeInstance, err := store.New("/tmp/go-store.db", store.WithJournal("http://127.0.0.1:8086", "core", "events"))`
func WithJournal(url, org, bucket string) StoreOption {
	// Build the config once; the returned option simply assigns it.
	configured := journalConfig{url: url, org: org, bucket: bucket}
	return func(storeInstance *Store) {
		storeInstance.journal = configured
	}
}
// Usage example: `storeInstance, err := store.New(":memory:")`
// Usage example: `storeInstance, err := store.New("/tmp/go-store.db", store.WithJournal("http://127.0.0.1:8086", "core", "events"))`
func New(databasePath string, options ...StoreOption) (*Store, error) {
sqliteDatabase, err := sql.Open("sqlite", databasePath)
if err != nil {
return nil, core.E("store.New", "open database", err)
@ -73,6 +94,11 @@ func New(databasePath string) (*Store, error) {
purgeContext, cancel := context.WithCancel(context.Background())
storeInstance := &Store{database: sqliteDatabase, cancelPurge: cancel, purgeInterval: 60 * time.Second}
for _, option := range options {
if option != nil {
option(storeInstance)
}
}
storeInstance.startBackgroundPurge(purgeContext)
return storeInstance, nil
}
@ -236,6 +262,11 @@ func (storeInstance *Store) All(group string) iter.Seq2[KeyValue, error] {
}
}
// AllSeq is an alias for All, provided so iteration call sites can use the
// Seq-suffixed naming convention consistently with GroupsSeq.
// Usage example: `for entry, err := range storeInstance.AllSeq("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }`
func (storeInstance *Store) AllSeq(group string) iter.Seq2[KeyValue, error] {
	return storeInstance.All(group)
}
// Usage example: `parts, err := storeInstance.GetSplit("config", "hosts", ","); if err != nil { return }; for part := range parts { fmt.Println(part) }`
func (storeInstance *Store) GetSplit(group, key, separator string) (iter.Seq[string], error) {
value, err := storeInstance.Get(group, key)
@ -297,9 +328,10 @@ func (storeInstance *Store) CountAll(groupPrefix string) (int, error) {
}
// Usage example: `tenantGroupNames, err := storeInstance.Groups("tenant-a:")`
func (storeInstance *Store) Groups(groupPrefix string) ([]string, error) {
// Usage example: `allGroupNames, err := storeInstance.Groups()`
func (storeInstance *Store) Groups(groupPrefix ...string) ([]string, error) {
var groupNames []string
for groupName, err := range storeInstance.GroupsSeq(groupPrefix) {
for groupName, err := range storeInstance.GroupsSeq(groupPrefix...) {
if err != nil {
return nil, err
}
@ -309,12 +341,14 @@ func (storeInstance *Store) Groups(groupPrefix string) ([]string, error) {
}
// Usage example: `for tenantGroupName, err := range storeInstance.GroupsSeq("tenant-a:") { if err != nil { break }; fmt.Println(tenantGroupName) }`
func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, error] {
// Usage example: `for groupName, err := range storeInstance.GroupsSeq() { if err != nil { break }; fmt.Println(groupName) }`
func (storeInstance *Store) GroupsSeq(groupPrefix ...string) iter.Seq2[string, error] {
actualGroupPrefix := firstString(groupPrefix)
return func(yield func(string, error) bool) {
var rows *sql.Rows
var err error
now := time.Now().UnixMilli()
if groupPrefix == "" {
if actualGroupPrefix == "" {
rows, err = storeInstance.database.Query(
"SELECT DISTINCT "+entryGroupColumn+" FROM "+entriesTableName+" WHERE (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryGroupColumn,
now,
@ -322,7 +356,7 @@ func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, erro
} else {
rows, err = storeInstance.database.Query(
"SELECT DISTINCT "+entryGroupColumn+" FROM "+entriesTableName+" WHERE "+entryGroupColumn+" LIKE ? ESCAPE '^' AND (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryGroupColumn,
escapeLike(groupPrefix)+"%", now,
escapeLike(actualGroupPrefix)+"%", now,
)
}
if err != nil {
@ -349,6 +383,13 @@ func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, erro
}
}
// firstString returns the first element of values, or "" when values is
// empty. It backs variadic optional-prefix parameters such as GroupsSeq's.
func firstString(values []string) string {
	for _, value := range values {
		return value
	}
	return ""
}
// escapeLike("tenant_%") returns "tenant^_^%" so LIKE queries treat wildcards
// literally.
func escapeLike(text string) string {

View file

@ -98,6 +98,16 @@ func TestStore_New_Good_WALMode(t *testing.T) {
assert.Equal(t, "wal", mode, "journal_mode should be WAL")
}
// Verifies that WithJournal stores its url/org/bucket arguments on the
// Store's journal config.
func TestStore_New_Good_WithJournalOption(t *testing.T) {
	storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer storeInstance.Close()
	assert.Equal(t, "events", storeInstance.journal.bucket)
	assert.Equal(t, "core", storeInstance.journal.org)
	assert.Equal(t, "http://127.0.0.1:8086", storeInstance.journal.url)
}
// ---------------------------------------------------------------------------
// Set / Get — core CRUD
// ---------------------------------------------------------------------------
@ -372,6 +382,22 @@ func TestStore_All_Good_SortedByKey(t *testing.T) {
assert.Equal(t, []string{"alpha", "bravo", "charlie"}, keys)
}
// Verifies that AllSeq yields the group's entries (key-sorted, matching All).
func TestStore_AllSeq_Good_Alias(t *testing.T) {
	storeInstance, _ := New(":memory:")
	defer storeInstance.Close()
	require.NoError(t, storeInstance.Set("g", "alpha", "1"))
	require.NoError(t, storeInstance.Set("g", "bravo", "2"))
	var keys []string
	for entry, err := range storeInstance.AllSeq("g") {
		require.NoError(t, err)
		keys = append(keys, entry.Key)
	}
	assert.Equal(t, []string{"alpha", "bravo"}, keys)
}
func TestStore_All_Bad_ClosedStore(t *testing.T) {
storeInstance, _ := New(":memory:")
storeInstance.Close()
@ -434,6 +460,22 @@ func TestStore_GroupsSeq_Good_SortedByGroupName(t *testing.T) {
assert.Equal(t, []string{"alpha", "bravo", "charlie"}, groups)
}
// Verifies that GroupsSeq called with no prefix argument yields every group.
func TestStore_GroupsSeq_Good_DefaultArgument(t *testing.T) {
	storeInstance, _ := New(":memory:")
	defer storeInstance.Close()
	require.NoError(t, storeInstance.Set("alpha", "a", "1"))
	require.NoError(t, storeInstance.Set("beta", "b", "2"))
	var groups []string
	for group, err := range storeInstance.GroupsSeq() {
		require.NoError(t, err)
		groups = append(groups, group)
	}
	assert.Equal(t, []string{"alpha", "beta"}, groups)
}
func TestStore_GroupsSeq_Bad_ClosedStore(t *testing.T) {
storeInstance, _ := New(":memory:")
storeInstance.Close()

View file

@ -43,3 +43,38 @@ func repeatString(value string, count int) string {
}
return builder.String()
}
// useWorkspaceStateDirectory points defaultWorkspaceStateDirectory at a
// per-test path, restoring the previous value and removing the directory
// when the test finishes. Returns the temporary state directory.
func useWorkspaceStateDirectory(tb testing.TB) string {
	tb.Helper()
	original := defaultWorkspaceStateDirectory
	stateDirectory := testPath(tb, "state")
	defaultWorkspaceStateDirectory = stateDirectory
	tb.Cleanup(func() {
		defaultWorkspaceStateDirectory = original
		// Best-effort cleanup; a failed delete must not fail the test.
		_ = testFilesystem().DeleteAll(stateDirectory)
	})
	return stateDirectory
}
// useArchiveOutputDirectory points defaultArchiveOutputDirectory at a
// per-test path, restoring the previous value and removing the directory
// when the test finishes. Returns the temporary archive directory.
func useArchiveOutputDirectory(tb testing.TB) string {
	tb.Helper()
	original := defaultArchiveOutputDirectory
	archiveDirectory := testPath(tb, "archive")
	defaultArchiveOutputDirectory = archiveDirectory
	tb.Cleanup(func() {
		defaultArchiveOutputDirectory = original
		// Best-effort cleanup; a failed delete must not fail the test.
		_ = testFilesystem().DeleteAll(archiveDirectory)
	})
	return archiveDirectory
}
// requireResultRows asserts that result succeeded and carries row maps,
// returning the []map[string]any payload for further assertions.
func requireResultRows(tb testing.TB, result core.Result) []map[string]any {
	tb.Helper()
	require.True(tb, result.OK, "core result failed: %v", result.Value)
	mappedRows, isRowSlice := result.Value.([]map[string]any)
	require.True(tb, isRowSlice, "unexpected row type: %T", result.Value)
	return mappedRows
}

326
workspace.go Normal file
View file

@ -0,0 +1,326 @@
package store
import (
"database/sql"
"io/fs"
"sync"
"time"
core "dappco.re/go/core"
)
const (
	// workspaceEntriesTableName is the entry buffer table inside each
	// workspace state file.
	workspaceEntriesTableName = "workspace_entries"
	// workspaceIdentityGroupName prefixes the store group that holds each
	// workspace's committed summary (see workspaceSummaryGroup).
	workspaceIdentityGroupName = "workspace"
)
// createWorkspaceEntriesTableSQL creates the append-only buffer table on
// workspace open; timestamps are stored as integer milliseconds.
const createWorkspaceEntriesTableSQL = `CREATE TABLE IF NOT EXISTS workspace_entries (
entry_id INTEGER PRIMARY KEY AUTOINCREMENT,
entry_kind TEXT NOT NULL,
entry_data TEXT NOT NULL,
created_at INTEGER NOT NULL
)`
// defaultWorkspaceStateDirectory is where workspace state files are created;
// tests override it via useWorkspaceStateDirectory.
var defaultWorkspaceStateDirectory = ".core/state"
// Workspace accumulates mutable work-in-progress entries before they are
// committed to the durable store journal.
// Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session-2026-03-30"); if err != nil { return }; defer workspace.Discard()`
type Workspace struct {
	name          string     // validated workspace name; used as the journal measurement on Commit
	storeInstance *Store     // parent store receiving the committed aggregate
	database      *sql.DB    // handle over the workspace state file
	databasePath  string     // backing file path under the state directory
	filesystem    *core.Fs   // unrestricted filesystem used for file cleanup
	closeLock     sync.Mutex // serialises closeAndDelete between Commit and Discard
	closed        bool       // set once closeAndDelete has run (makes it idempotent)
}
// NewWorkspace creates a workspace state file under `.core/state/`.
// The name must pass core.ValidateName; an error is returned when a workspace
// file with the same name already exists.
// Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session-2026-03-30")`
func (storeInstance *Store) NewWorkspace(name string) (*Workspace, error) {
	validation := core.ValidateName(name)
	if !validation.OK {
		return nil, core.E("store.NewWorkspace", "validate workspace name", validation.Value.(error))
	}
	filesystem := (&core.Fs{}).NewUnrestricted()
	databasePath := workspaceFilePath(defaultWorkspaceStateDirectory, name)
	// Refuse to clobber an existing file — it may be an orphan from a prior
	// run that was never committed or discarded (see RecoverOrphans).
	if filesystem.Exists(databasePath) {
		return nil, core.E("store.NewWorkspace", core.Concat("workspace already exists: ", name), nil)
	}
	if result := filesystem.EnsureDir(defaultWorkspaceStateDirectory); !result.OK {
		return nil, core.E("store.NewWorkspace", "ensure state directory", result.Value.(error))
	}
	workspaceDatabase, err := openWorkspaceDatabase(databasePath)
	if err != nil {
		return nil, core.E("store.NewWorkspace", "open workspace database", err)
	}
	return &Workspace{
		name:          name,
		storeInstance: storeInstance,
		database:      workspaceDatabase,
		databasePath:  databasePath,
		filesystem:    filesystem,
	}, nil
}
// RecoverOrphans opens any leftover workspace files so callers can inspect and
// decide whether to commit or discard them. An empty stateDirectory falls back
// to defaultWorkspaceStateDirectory. Recovery is best-effort: a missing or
// unlistable directory returns nil, and unopenable files are skipped silently.
// Usage example: `orphans := storeInstance.RecoverOrphans(".core/state")`
func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace {
	if stateDirectory == "" {
		stateDirectory = defaultWorkspaceStateDirectory
	}
	filesystem := (&core.Fs{}).NewUnrestricted()
	if !filesystem.Exists(stateDirectory) {
		return nil
	}
	listResult := filesystem.List(stateDirectory)
	if !listResult.OK {
		return nil
	}
	entries, ok := listResult.Value.([]fs.DirEntry)
	if !ok {
		return nil
	}
	var workspaces []*Workspace
	for _, dirEntry := range entries {
		// Only regular "<name>.duckdb" files are workspace state files.
		// NOTE(review): the ".duckdb" suffix is scanned here but the files are
		// opened with the sqlite driver — confirm the extension is intentional.
		if dirEntry.IsDir() || !core.HasSuffix(dirEntry.Name(), ".duckdb") {
			continue
		}
		name := core.TrimSuffix(dirEntry.Name(), ".duckdb")
		databasePath := workspaceFilePath(stateDirectory, name)
		workspaceDatabase, err := openWorkspaceDatabase(databasePath)
		if err != nil {
			// Skip files that cannot be opened rather than failing the pass.
			continue
		}
		workspaces = append(workspaces, &Workspace{
			name:          name,
			storeInstance: storeInstance,
			database:      workspaceDatabase,
			databasePath:  databasePath,
			filesystem:    filesystem,
		})
	}
	return workspaces
}
// Put appends one entry to the workspace buffer. The kind must be non-empty;
// a nil data map is stored as an empty JSON object.
// Usage example: `err := workspace.Put("like", map[string]any{"user": "@alice", "post": "video_123"})`
func (workspace *Workspace) Put(kind string, data map[string]any) error {
	if kind == "" {
		return core.E("store.Workspace.Put", "kind is empty", nil)
	}
	entryData := data
	if entryData == nil {
		entryData = map[string]any{}
	}
	encoded, encodeErr := jsonString(entryData, "store.Workspace.Put", "marshal entry data")
	if encodeErr != nil {
		return encodeErr
	}
	if _, insertErr := workspace.database.Exec(
		"INSERT INTO "+workspaceEntriesTableName+" (entry_kind, entry_data, created_at) VALUES (?, ?, ?)",
		kind,
		encoded,
		time.Now().UnixMilli(),
	); insertErr != nil {
		return core.E("store.Workspace.Put", "insert entry", insertErr)
	}
	return nil
}
// Aggregate returns the current per-kind entry counts in the workspace.
// Aggregation errors are swallowed: on failure an empty (non-nil) map is
// returned instead of an error; Commit surfaces the same failure explicitly.
// Usage example: `summary := workspace.Aggregate()`
func (workspace *Workspace) Aggregate() map[string]any {
	fields, err := workspace.aggregateFields()
	if err != nil {
		// Deliberate best-effort behaviour for a read-only summary call.
		return map[string]any{}
	}
	return fields
}
// Commit writes the aggregated workspace state to the journal and updates the
// store summary entry for the workspace, then deletes the workspace file.
// On success the Result value is the aggregated per-kind count map.
// Usage example: `result := workspace.Commit()`
func (workspace *Workspace) Commit() core.Result {
	fields, err := workspace.aggregateFields()
	if err != nil {
		return core.Result{Value: core.E("store.Workspace.Commit", "aggregate workspace", err), OK: false}
	}
	// Journal append and summary upsert happen transactionally in the store.
	if err := workspace.storeInstance.commitWorkspaceAggregate(workspace.name, fields); err != nil {
		return core.Result{Value: err, OK: false}
	}
	// The buffer file is removed only after the aggregate is durably stored.
	if err := workspace.closeAndDelete(); err != nil {
		return core.Result{Value: err, OK: false}
	}
	return core.Result{Value: fields, OK: true}
}
// Discard closes the workspace and removes its backing file without
// committing anything. Safe to call repeatedly (closeAndDelete is idempotent)
// and safe after Commit.
// Usage example: `workspace.Discard()`
func (workspace *Workspace) Discard() {
	// Best-effort: cleanup failures are intentionally ignored here.
	_ = workspace.closeAndDelete()
}
// Query runs ad-hoc SQL against the workspace buffer and returns the rows as
// a []map[string]any Result payload. The SQL is caller-supplied by design.
// Usage example: `result := workspace.Query("SELECT entry_kind, COUNT(*) AS count FROM workspace_entries GROUP BY entry_kind")`
func (workspace *Workspace) Query(sqlQuery string) core.Result {
	rows, queryErr := workspace.database.Query(sqlQuery)
	if queryErr != nil {
		return core.Result{Value: core.E("store.Workspace.Query", "query workspace", queryErr), OK: false}
	}
	defer rows.Close()
	mappedRows, scanErr := queryRowsAsMaps(rows)
	if scanErr != nil {
		return core.Result{Value: core.E("store.Workspace.Query", "scan rows", scanErr), OK: false}
	}
	return core.Result{Value: mappedRows, OK: true}
}
// aggregateFields counts buffer entries per kind and returns the counts as a
// map keyed by entry kind (values are ints), sorted query order by kind.
func (workspace *Workspace) aggregateFields() (map[string]any, error) {
	rows, err := workspace.database.Query(
		"SELECT entry_kind, COUNT(*) FROM " + workspaceEntriesTableName + " GROUP BY entry_kind ORDER BY entry_kind",
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	counts := map[string]any{}
	for rows.Next() {
		var entryKind string
		var entryCount int
		if scanErr := rows.Scan(&entryKind, &entryCount); scanErr != nil {
			return nil, scanErr
		}
		counts[entryKind] = entryCount
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return counts, nil
}
// closeAndDelete closes the workspace database and removes the backing file
// plus its WAL sidecar files. Idempotent: the closeLock/closed pair makes
// subsequent calls return nil without re-running the cleanup.
func (workspace *Workspace) closeAndDelete() error {
	workspace.closeLock.Lock()
	defer workspace.closeLock.Unlock()
	if workspace.closed {
		return nil
	}
	workspace.closed = true
	if err := workspace.database.Close(); err != nil {
		return core.E("store.Workspace.closeAndDelete", "close workspace database", err)
	}
	// Remove the main file and the -wal/-shm sidecars created by WAL mode.
	// A failed delete counts as an error only if the file still exists.
	for _, path := range []string{workspace.databasePath, workspace.databasePath + "-wal", workspace.databasePath + "-shm"} {
		if result := workspace.filesystem.Delete(path); !result.OK && workspace.filesystem.Exists(path) {
			return core.E("store.Workspace.closeAndDelete", "delete workspace file", result.Value.(error))
		}
	}
	return nil
}
// commitWorkspaceAggregate durably records a workspace's aggregated counts:
// it appends a journal entry (measurement = workspace name, tagged with the
// workspace) and upserts the "summary" key of the workspace's store group,
// both within a single transaction.
func (storeInstance *Store) commitWorkspaceAggregate(workspaceName string, fields map[string]any) error {
	if err := ensureJournalSchema(storeInstance.database); err != nil {
		return core.E("store.Workspace.Commit", "ensure journal schema", err)
	}
	transaction, err := storeInstance.database.Begin()
	if err != nil {
		return core.E("store.Workspace.Commit", "begin transaction", err)
	}
	// Roll back on any early return; flipped only after Commit succeeds.
	committed := false
	defer func() {
		if !committed {
			_ = transaction.Rollback()
		}
	}()
	fieldsJSON, err := jsonString(fields, "store.Workspace.Commit", "marshal summary")
	if err != nil {
		return err
	}
	tagsJSON, err := jsonString(map[string]string{"workspace": workspaceName}, "store.Workspace.Commit", "marshal tags")
	if err != nil {
		return err
	}
	if err := insertJournalEntry(
		transaction,
		storeInstance.journalBucket(),
		workspaceName,
		fieldsJSON,
		tagsJSON,
		time.Now().UnixMilli(),
	); err != nil {
		return core.E("store.Workspace.Commit", "insert journal entry", err)
	}
	// The upsert keeps exactly one non-expiring summary row per workspace.
	if _, err := transaction.Exec(
		"INSERT INTO "+entriesTableName+" ("+entryGroupColumn+", "+entryKeyColumn+", "+entryValueColumn+", expires_at) VALUES (?, ?, ?, NULL) "+
			"ON CONFLICT("+entryGroupColumn+", "+entryKeyColumn+") DO UPDATE SET "+entryValueColumn+" = excluded."+entryValueColumn+", expires_at = NULL",
		workspaceSummaryGroup(workspaceName),
		"summary",
		fieldsJSON,
	); err != nil {
		return core.E("store.Workspace.Commit", "upsert workspace summary", err)
	}
	if err := transaction.Commit(); err != nil {
		return core.E("store.Workspace.Commit", "commit transaction", err)
	}
	committed = true
	return nil
}
// openWorkspaceDatabase opens (or creates) a workspace state file, enables
// WAL journaling and a busy timeout, and ensures the entries table exists.
// The handle is closed before returning any setup error.
func openWorkspaceDatabase(databasePath string) (*sql.DB, error) {
	workspaceDatabase, err := sql.Open("sqlite", databasePath)
	if err != nil {
		return nil, err
	}
	// A single connection avoids writer contention on the buffer file.
	workspaceDatabase.SetMaxOpenConns(1)
	setupStatements := []string{
		"PRAGMA journal_mode=WAL",
		"PRAGMA busy_timeout=5000",
		createWorkspaceEntriesTableSQL,
	}
	for _, statement := range setupStatements {
		if _, execErr := workspaceDatabase.Exec(statement); execErr != nil {
			workspaceDatabase.Close()
			return nil, execErr
		}
	}
	return workspaceDatabase, nil
}
// workspaceSummaryGroup returns the store group ("workspace:<name>") that
// holds a workspace's committed summary entry.
func workspaceSummaryGroup(workspaceName string) string {
	return core.Concat(workspaceIdentityGroupName, ":", workspaceName)
}
// workspaceFilePath builds "<stateDirectory>/<name>.duckdb".
// NOTE(review): the ".duckdb" suffix is what RecoverOrphans scans for, yet
// the file is opened with the sqlite driver — confirm the extension choice.
func workspaceFilePath(stateDirectory, name string) string {
	return joinPath(stateDirectory, core.Concat(name, ".duckdb"))
}
// joinPath concatenates base and name with a single "/" separator; an empty
// base returns name unchanged. Paths in this package are slash-separated.
func joinPath(base, name string) string {
	if base == "" {
		return name
	}
	trimmedBase := core.TrimSuffix(base, "/")
	return core.Concat(trimmedBase, "/", name)
}

121
workspace_test.go Normal file
View file

@ -0,0 +1,121 @@
package store
import (
"testing"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// End-to-end workspace check: creation places the state file in the test
// state directory, Put buffers entries, Aggregate counts them per kind, and
// Query exposes the raw buffer via ad-hoc SQL.
func TestWorkspace_NewWorkspace_Good_CreatePutAggregateQuery(t *testing.T) {
	stateDirectory := useWorkspaceStateDirectory(t)
	storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer storeInstance.Close()
	workspace, err := storeInstance.NewWorkspace("scroll-session")
	require.NoError(t, err)
	defer workspace.Discard()
	assert.Equal(t, workspaceFilePath(stateDirectory, "scroll-session"), workspace.databasePath)
	assert.True(t, testFilesystem().Exists(workspace.databasePath))
	require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"}))
	require.NoError(t, workspace.Put("like", map[string]any{"user": "@bob"}))
	require.NoError(t, workspace.Put("profile_match", map[string]any{"user": "@charlie"}))
	assert.Equal(t, map[string]any{"like": 2, "profile_match": 1}, workspace.Aggregate())
	rows := requireResultRows(
		t,
		workspace.Query("SELECT entry_kind, COUNT(*) AS entry_count FROM workspace_entries GROUP BY entry_kind ORDER BY entry_kind"),
	)
	require.Len(t, rows, 2)
	assert.Equal(t, "like", rows[0]["entry_kind"])
	assert.Equal(t, int64(2), rows[0]["entry_count"])
	assert.Equal(t, "profile_match", rows[1]["entry_kind"])
	assert.Equal(t, int64(1), rows[1]["entry_count"])
}
// Verifies that Commit returns the aggregate, deletes the workspace file,
// upserts the JSON summary under the workspace's store group, and appends a
// journal row (measurement = workspace name, tagged with the workspace).
func TestWorkspace_Commit_Good_JournalAndSummary(t *testing.T) {
	useWorkspaceStateDirectory(t)
	storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer storeInstance.Close()
	workspace, err := storeInstance.NewWorkspace("scroll-session")
	require.NoError(t, err)
	require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"}))
	require.NoError(t, workspace.Put("like", map[string]any{"user": "@bob"}))
	require.NoError(t, workspace.Put("profile_match", map[string]any{"user": "@charlie"}))
	result := workspace.Commit()
	require.True(t, result.OK, "workspace commit failed: %v", result.Value)
	assert.Equal(t, map[string]any{"like": 2, "profile_match": 1}, result.Value)
	// The state file must be gone once the commit is durable.
	assert.False(t, testFilesystem().Exists(workspace.databasePath))
	summaryJSON, err := storeInstance.Get(workspaceSummaryGroup("scroll-session"), "summary")
	require.NoError(t, err)
	summary := make(map[string]any)
	summaryResult := core.JSONUnmarshalString(summaryJSON, &summary)
	require.True(t, summaryResult.OK, "summary unmarshal failed: %v", summaryResult.Value)
	// JSON numbers decode as float64.
	assert.Equal(t, float64(2), summary["like"])
	assert.Equal(t, float64(1), summary["profile_match"])
	rows := requireResultRows(
		t,
		storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r._measurement == "scroll-session")`),
	)
	require.Len(t, rows, 1)
	assert.Equal(t, "scroll-session", rows[0]["measurement"])
	fields, ok := rows[0]["fields"].(map[string]any)
	require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"])
	assert.Equal(t, float64(2), fields["like"])
	assert.Equal(t, float64(1), fields["profile_match"])
	tags, ok := rows[0]["tags"].(map[string]string)
	require.True(t, ok, "unexpected tags type: %T", rows[0]["tags"])
	assert.Equal(t, "scroll-session", tags["workspace"])
}
// Verifies that Discard removes the state file and that a second Discard is
// a harmless no-op.
func TestWorkspace_Discard_Good_Idempotent(t *testing.T) {
	useWorkspaceStateDirectory(t)
	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()
	workspace, err := storeInstance.NewWorkspace("discard-session")
	require.NoError(t, err)
	workspace.Discard()
	workspace.Discard()
	assert.False(t, testFilesystem().Exists(workspace.databasePath))
}
// Verifies that a workspace whose handle was closed without Commit/Discard is
// found by RecoverOrphans, still aggregates its buffered entries, and can be
// discarded cleanly.
func TestWorkspace_RecoverOrphans_Good(t *testing.T) {
	stateDirectory := useWorkspaceStateDirectory(t)
	storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer storeInstance.Close()
	workspace, err := storeInstance.NewWorkspace("orphan-session")
	require.NoError(t, err)
	require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"}))
	// Simulate a crash: close the handle without committing or discarding.
	require.NoError(t, workspace.database.Close())
	orphans := storeInstance.RecoverOrphans(stateDirectory)
	require.Len(t, orphans, 1)
	assert.Equal(t, map[string]any{"like": 1}, orphans[0].Aggregate())
	orphans[0].Discard()
	assert.False(t, testFilesystem().Exists(workspaceFilePath(stateDirectory, "orphan-session")))
}