[agent/codex:gpt-5.4] Implement docs/RFC-STORE.md using docs/RFC-CORE-008-AGENT-EX... #26
13 changed files with 1404 additions and 28 deletions
191
compact.go
Normal file
191
compact.go
Normal file
|
|
@ -0,0 +1,191 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
var defaultArchiveOutputDirectory = ".core/archive"
|
||||
|
||||
// CompactOptions controls cold archive generation.
|
||||
// Usage example: `options := store.CompactOptions{Before: time.Now().Add(-90 * 24 * time.Hour), Output: "/tmp/archive", Format: "gzip"}`
|
||||
type CompactOptions struct {
|
||||
Before time.Time
|
||||
Output string
|
||||
Format string
|
||||
}
|
||||
|
||||
type compactArchiveEntry struct {
|
||||
id int64
|
||||
bucket string
|
||||
measurement string
|
||||
fieldsJSON string
|
||||
tagsJSON string
|
||||
committedAt int64
|
||||
}
|
||||
|
||||
// Compact archives old journal entries as newline-delimited JSON.
|
||||
// Usage example: `result := storeInstance.Compact(store.CompactOptions{Before: time.Now().Add(-30 * 24 * time.Hour), Output: "/tmp/archive", Format: "gzip"})`
|
||||
func (storeInstance *Store) Compact(options CompactOptions) core.Result {
|
||||
if err := ensureJournalSchema(storeInstance.database); err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "ensure journal schema", err), OK: false}
|
||||
}
|
||||
|
||||
outputDirectory := options.Output
|
||||
if outputDirectory == "" {
|
||||
outputDirectory = defaultArchiveOutputDirectory
|
||||
}
|
||||
format := options.Format
|
||||
if format == "" {
|
||||
format = "gzip"
|
||||
}
|
||||
if format != "gzip" {
|
||||
return core.Result{Value: core.E("store.Compact", core.Concat("unsupported archive format: ", format), nil), OK: false}
|
||||
}
|
||||
|
||||
filesystem := (&core.Fs{}).NewUnrestricted()
|
||||
if result := filesystem.EnsureDir(outputDirectory); !result.OK {
|
||||
return core.Result{Value: core.E("store.Compact", "ensure archive directory", result.Value.(error)), OK: false}
|
||||
}
|
||||
|
||||
rows, err := storeInstance.database.Query(
|
||||
"SELECT entry_id, bucket_name, measurement, fields_json, tags_json, committed_at FROM "+journalEntriesTableName+" WHERE archived_at IS NULL AND committed_at < ? ORDER BY committed_at",
|
||||
options.Before.UnixMilli(),
|
||||
)
|
||||
if err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "query journal rows", err), OK: false}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var archiveEntries []compactArchiveEntry
|
||||
for rows.Next() {
|
||||
var entry compactArchiveEntry
|
||||
if err := rows.Scan(
|
||||
&entry.id,
|
||||
&entry.bucket,
|
||||
&entry.measurement,
|
||||
&entry.fieldsJSON,
|
||||
&entry.tagsJSON,
|
||||
&entry.committedAt,
|
||||
); err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "scan journal row", err), OK: false}
|
||||
}
|
||||
archiveEntries = append(archiveEntries, entry)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "iterate journal rows", err), OK: false}
|
||||
}
|
||||
if len(archiveEntries) == 0 {
|
||||
return core.Result{Value: "", OK: true}
|
||||
}
|
||||
|
||||
outputPath := compactOutputPath(outputDirectory, format)
|
||||
createResult := filesystem.Create(outputPath)
|
||||
if !createResult.OK {
|
||||
return core.Result{Value: core.E("store.Compact", "create archive file", createResult.Value.(error)), OK: false}
|
||||
}
|
||||
|
||||
file, ok := createResult.Value.(io.WriteCloser)
|
||||
if !ok {
|
||||
return core.Result{Value: core.E("store.Compact", "archive file is not writable", nil), OK: false}
|
||||
}
|
||||
fileClosed := false
|
||||
defer func() {
|
||||
if !fileClosed {
|
||||
_ = file.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
writer := gzip.NewWriter(file)
|
||||
writeOK := false
|
||||
defer func() {
|
||||
if !writeOK {
|
||||
_ = writer.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
for _, entry := range archiveEntries {
|
||||
lineMap, err := compactArchiveLine(entry)
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
lineJSON, err := jsonString(lineMap, "store.Compact", "marshal archive line")
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
if _, err := io.WriteString(writer, lineJSON+"\n"); err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "write archive line", err), OK: false}
|
||||
}
|
||||
}
|
||||
if err := writer.Close(); err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "close archive writer", err), OK: false}
|
||||
}
|
||||
writeOK = true
|
||||
if err := file.Close(); err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "close archive file", err), OK: false}
|
||||
}
|
||||
fileClosed = true
|
||||
|
||||
transaction, err := storeInstance.database.Begin()
|
||||
if err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "begin archive transaction", err), OK: false}
|
||||
}
|
||||
|
||||
committed := false
|
||||
defer func() {
|
||||
if !committed {
|
||||
_ = transaction.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
archivedAt := time.Now().UnixMilli()
|
||||
for _, entry := range archiveEntries {
|
||||
if _, err := transaction.Exec(
|
||||
"UPDATE "+journalEntriesTableName+" SET archived_at = ? WHERE entry_id = ?",
|
||||
archivedAt,
|
||||
entry.id,
|
||||
); err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "mark journal row archived", err), OK: false}
|
||||
}
|
||||
}
|
||||
if err := transaction.Commit(); err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "commit archive transaction", err), OK: false}
|
||||
}
|
||||
committed = true
|
||||
|
||||
return core.Result{Value: outputPath, OK: true}
|
||||
}
|
||||
|
||||
func compactArchiveLine(entry compactArchiveEntry) (map[string]any, error) {
|
||||
fields := make(map[string]any)
|
||||
fieldsResult := core.JSONUnmarshalString(entry.fieldsJSON, &fields)
|
||||
if !fieldsResult.OK {
|
||||
return nil, core.E("store.Compact", "unmarshal fields", fieldsResult.Value.(error))
|
||||
}
|
||||
|
||||
tags := make(map[string]string)
|
||||
tagsResult := core.JSONUnmarshalString(entry.tagsJSON, &tags)
|
||||
if !tagsResult.OK {
|
||||
return nil, core.E("store.Compact", "unmarshal tags", tagsResult.Value.(error))
|
||||
}
|
||||
|
||||
return map[string]any{
|
||||
"bucket": entry.bucket,
|
||||
"measurement": entry.measurement,
|
||||
"fields": fields,
|
||||
"tags": tags,
|
||||
"committed_at": entry.committedAt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func compactOutputPath(outputDirectory, format string) string {
|
||||
extension := ".jsonl"
|
||||
if format == "gzip" {
|
||||
extension = ".jsonl.gz"
|
||||
}
|
||||
filename := core.Concat("journal-", time.Now().UTC().Format("20060102-150405"), extension)
|
||||
return joinPath(outputDirectory, filename)
|
||||
}
|
||||
81
compact_test.go
Normal file
81
compact_test.go
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCompact_Compact_Good_GzipArchive(t *testing.T) {
|
||||
outputDirectory := useArchiveOutputDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.True(t,
|
||||
storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK,
|
||||
)
|
||||
require.True(t,
|
||||
storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK,
|
||||
)
|
||||
|
||||
_, err = storeInstance.database.Exec(
|
||||
"UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?",
|
||||
time.Now().Add(-48*time.Hour).UnixMilli(),
|
||||
"session-a",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
result := storeInstance.Compact(CompactOptions{
|
||||
Before: time.Now().Add(-24 * time.Hour),
|
||||
Output: outputDirectory,
|
||||
Format: "gzip",
|
||||
})
|
||||
require.True(t, result.OK, "compact failed: %v", result.Value)
|
||||
|
||||
archivePath, ok := result.Value.(string)
|
||||
require.True(t, ok, "unexpected archive path type: %T", result.Value)
|
||||
assert.True(t, testFilesystem().Exists(archivePath))
|
||||
|
||||
archiveData := requireCoreReadBytes(t, archivePath)
|
||||
reader, err := gzip.NewReader(bytes.NewReader(archiveData))
|
||||
require.NoError(t, err)
|
||||
defer reader.Close()
|
||||
|
||||
decompressedData, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
lines := core.Split(core.Trim(string(decompressedData)), "\n")
|
||||
require.Len(t, lines, 1)
|
||||
|
||||
archivedRow := make(map[string]any)
|
||||
unmarshalResult := core.JSONUnmarshalString(lines[0], &archivedRow)
|
||||
require.True(t, unmarshalResult.OK, "archive line unmarshal failed: %v", unmarshalResult.Value)
|
||||
assert.Equal(t, "session-a", archivedRow["measurement"])
|
||||
|
||||
remainingRows := requireResultRows(t, storeInstance.QueryJournal(""))
|
||||
require.Len(t, remainingRows, 1)
|
||||
assert.Equal(t, "session-b", remainingRows[0]["measurement"])
|
||||
}
|
||||
|
||||
func TestCompact_Compact_Good_NoRows(t *testing.T) {
|
||||
outputDirectory := useArchiveOutputDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
result := storeInstance.Compact(CompactOptions{
|
||||
Before: time.Now(),
|
||||
Output: outputDirectory,
|
||||
Format: "gzip",
|
||||
})
|
||||
require.True(t, result.OK, "compact failed: %v", result.Value)
|
||||
assert.Equal(t, "", result.Value)
|
||||
}
|
||||
46
events.go
46
events.go
|
|
@ -77,13 +77,18 @@ const watcherEventBufferCapacity = 16
|
|||
|
||||
// Watch registers a buffered subscription for matching mutations.
|
||||
// Usage example: `watcher := storeInstance.Watch("*", "*")`
|
||||
func (storeInstance *Store) Watch(group, key string) *Watcher {
|
||||
// Usage example: `watcher := storeInstance.Watch("config")`
|
||||
func (storeInstance *Store) Watch(group string, key ...string) *Watcher {
|
||||
keyPattern := "*"
|
||||
if len(key) > 0 && key[0] != "" {
|
||||
keyPattern = key[0]
|
||||
}
|
||||
eventChannel := make(chan Event, watcherEventBufferCapacity)
|
||||
watcher := &Watcher{
|
||||
Events: eventChannel,
|
||||
eventsChannel: eventChannel,
|
||||
groupPattern: group,
|
||||
keyPattern: key,
|
||||
keyPattern: keyPattern,
|
||||
registrationID: atomic.AddUint64(&storeInstance.nextWatcherRegistrationID, 1),
|
||||
}
|
||||
|
||||
|
|
@ -115,7 +120,9 @@ func (storeInstance *Store) Unwatch(watcher *Watcher) {
|
|||
|
||||
// OnChange registers a synchronous mutation callback.
|
||||
// Usage example: `events := make(chan store.Event, 1); unregister := storeInstance.OnChange(func(event store.Event) { events <- event }); defer unregister()`
|
||||
func (storeInstance *Store) OnChange(callback func(Event)) func() {
|
||||
// Usage example: `unregister := storeInstance.OnChange("config", func(key, value string) { fmt.Println(key, value) })`
|
||||
func (storeInstance *Store) OnChange(arguments ...any) func() {
|
||||
callback := onChangeCallback(arguments)
|
||||
if callback == nil {
|
||||
return func() {}
|
||||
}
|
||||
|
|
@ -140,6 +147,39 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() {
|
|||
}
|
||||
}
|
||||
|
||||
func onChangeCallback(arguments []any) func(Event) {
|
||||
switch len(arguments) {
|
||||
case 0:
|
||||
return nil
|
||||
case 1:
|
||||
if arguments[0] == nil {
|
||||
return nil
|
||||
}
|
||||
callback, ok := arguments[0].(func(Event))
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return callback
|
||||
case 2:
|
||||
group, ok := arguments[0].(string)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
callback, ok := arguments[1].(func(string, string))
|
||||
if !ok || callback == nil {
|
||||
return nil
|
||||
}
|
||||
return func(event Event) {
|
||||
if event.Group != group {
|
||||
return
|
||||
}
|
||||
callback(event.Key, event.Value)
|
||||
}
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// notify(Event{Type: EventSet, Group: "config", Key: "colour", Value: "blue"})
|
||||
// dispatches matching watchers and callbacks after a successful write. If a
|
||||
// watcher buffer is full, the event is dropped instead of blocking the writer.
|
||||
|
|
|
|||
|
|
@ -66,6 +66,22 @@ func TestEvents_Watch_Good_WildcardKey(t *testing.T) {
|
|||
assert.Equal(t, "colour", received[1].Key)
|
||||
}
|
||||
|
||||
func TestEvents_Watch_Good_DefaultWildcardKey(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("config")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
|
||||
require.NoError(t, storeInstance.Set("config", "theme", "dark"))
|
||||
require.NoError(t, storeInstance.Set("config", "colour", "blue"))
|
||||
|
||||
received := drainEvents(watcher.Events, 2, time.Second)
|
||||
require.Len(t, received, 2)
|
||||
assert.Equal(t, "theme", received[0].Key)
|
||||
assert.Equal(t, "colour", received[1].Key)
|
||||
}
|
||||
|
||||
func TestEvents_Watch_Good_GroupMismatch(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
|
@ -300,6 +316,22 @@ func TestEvents_OnChange_Good_NilCallbackNoOp(t *testing.T) {
|
|||
unregister()
|
||||
}
|
||||
|
||||
func TestEvents_OnChange_Good_GroupCallback(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
var keys []string
|
||||
unregister := storeInstance.OnChange("config", func(key, value string) {
|
||||
keys = append(keys, key)
|
||||
})
|
||||
defer unregister()
|
||||
|
||||
require.NoError(t, storeInstance.Set("config", "theme", "dark"))
|
||||
require.NoError(t, storeInstance.Set("other", "theme", "ignored"))
|
||||
|
||||
assert.Equal(t, []string{"theme"}, keys)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// OnChange — callback can manage subscriptions while handling an event
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
|
|||
291
journal.go
Normal file
291
journal.go
Normal file
|
|
@ -0,0 +1,291 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
const (
|
||||
journalEntriesTableName = "journal_entries"
|
||||
defaultJournalBucket = "store"
|
||||
)
|
||||
|
||||
const createJournalEntriesTableSQL = `CREATE TABLE IF NOT EXISTS journal_entries (
|
||||
entry_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
bucket_name TEXT NOT NULL,
|
||||
measurement TEXT NOT NULL,
|
||||
fields_json TEXT NOT NULL,
|
||||
tags_json TEXT NOT NULL,
|
||||
committed_at INTEGER NOT NULL,
|
||||
archived_at INTEGER
|
||||
)`
|
||||
|
||||
var (
|
||||
journalBucketPattern = regexp.MustCompile(`bucket:\s*"([^"]+)"`)
|
||||
journalRangePattern = regexp.MustCompile(`range\(\s*start:\s*([^)]+)\)`)
|
||||
journalMeasurementPattern = regexp.MustCompile(`(?:_measurement|measurement)\s*==\s*"([^"]+)"`)
|
||||
)
|
||||
|
||||
type journalExecutor interface {
|
||||
Exec(query string, args ...any) (sql.Result, error)
|
||||
}
|
||||
|
||||
// CommitToJournal records one completed unit of work in the store journal.
|
||||
// Usage example: `result := storeInstance.CommitToJournal("scroll-session", map[string]any{"like": 4}, map[string]string{"workspace": "scroll-session"})`
|
||||
func (storeInstance *Store) CommitToJournal(measurement string, fields map[string]any, tags map[string]string) core.Result {
|
||||
if measurement == "" {
|
||||
return core.Result{Value: core.E("store.CommitToJournal", "measurement is empty", nil), OK: false}
|
||||
}
|
||||
if fields == nil {
|
||||
fields = map[string]any{}
|
||||
}
|
||||
if tags == nil {
|
||||
tags = map[string]string{}
|
||||
}
|
||||
if err := ensureJournalSchema(storeInstance.database); err != nil {
|
||||
return core.Result{Value: core.E("store.CommitToJournal", "ensure journal schema", err), OK: false}
|
||||
}
|
||||
|
||||
fieldsJSON, err := jsonString(fields, "store.CommitToJournal", "marshal fields")
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
tagsJSON, err := jsonString(tags, "store.CommitToJournal", "marshal tags")
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
|
||||
committedAt := time.Now().UnixMilli()
|
||||
if err := insertJournalEntry(
|
||||
storeInstance.database,
|
||||
storeInstance.journalBucket(),
|
||||
measurement,
|
||||
fieldsJSON,
|
||||
tagsJSON,
|
||||
committedAt,
|
||||
); err != nil {
|
||||
return core.Result{Value: core.E("store.CommitToJournal", "insert journal entry", err), OK: false}
|
||||
}
|
||||
|
||||
return core.Result{
|
||||
Value: map[string]any{
|
||||
"bucket": storeInstance.journalBucket(),
|
||||
"measurement": measurement,
|
||||
"fields": fields,
|
||||
"tags": tags,
|
||||
"committed_at": committedAt,
|
||||
},
|
||||
OK: true,
|
||||
}
|
||||
}
|
||||
|
||||
// QueryJournal reads journal rows either through a small Flux-like filter
|
||||
// surface or a direct SQL SELECT against the internal journal table.
|
||||
// Usage example: `result := storeInstance.QueryJournal(\`from(bucket: "store") |> range(start: -24h)\`)`
|
||||
func (storeInstance *Store) QueryJournal(flux string) core.Result {
|
||||
if err := ensureJournalSchema(storeInstance.database); err != nil {
|
||||
return core.Result{Value: core.E("store.QueryJournal", "ensure journal schema", err), OK: false}
|
||||
}
|
||||
|
||||
trimmedQuery := core.Trim(flux)
|
||||
if trimmedQuery == "" {
|
||||
return storeInstance.queryJournalRows(
|
||||
"SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM " + journalEntriesTableName + " WHERE archived_at IS NULL ORDER BY committed_at",
|
||||
)
|
||||
}
|
||||
if core.HasPrefix(trimmedQuery, "SELECT") || core.HasPrefix(trimmedQuery, "select") {
|
||||
return storeInstance.queryJournalRows(trimmedQuery)
|
||||
}
|
||||
|
||||
selectSQL, arguments, err := storeInstance.queryJournalFlux(trimmedQuery)
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return storeInstance.queryJournalRows(selectSQL, arguments...)
|
||||
}
|
||||
|
||||
func (storeInstance *Store) queryJournalRows(query string, arguments ...any) core.Result {
|
||||
rows, err := storeInstance.database.Query(query, arguments...)
|
||||
if err != nil {
|
||||
return core.Result{Value: core.E("store.QueryJournal", "query rows", err), OK: false}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
rowMaps, err := queryRowsAsMaps(rows)
|
||||
if err != nil {
|
||||
return core.Result{Value: core.E("store.QueryJournal", "scan rows", err), OK: false}
|
||||
}
|
||||
return core.Result{Value: inflateJournalRows(rowMaps), OK: true}
|
||||
}
|
||||
|
||||
func (storeInstance *Store) queryJournalFlux(flux string) (string, []any, error) {
|
||||
builder := core.NewBuilder()
|
||||
builder.WriteString("SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM ")
|
||||
builder.WriteString(journalEntriesTableName)
|
||||
builder.WriteString(" WHERE archived_at IS NULL")
|
||||
|
||||
var arguments []any
|
||||
if bucket := quotedSubmatch(journalBucketPattern, flux); bucket != "" {
|
||||
builder.WriteString(" AND bucket_name = ?")
|
||||
arguments = append(arguments, bucket)
|
||||
}
|
||||
if measurement := quotedSubmatch(journalMeasurementPattern, flux); measurement != "" {
|
||||
builder.WriteString(" AND measurement = ?")
|
||||
arguments = append(arguments, measurement)
|
||||
}
|
||||
|
||||
rangeMatch := quotedSubmatch(journalRangePattern, flux)
|
||||
if rangeMatch == "" {
|
||||
rangeMatch = regexpSubmatch(journalRangePattern, flux, 1)
|
||||
}
|
||||
if rangeMatch != "" {
|
||||
startTime, err := fluxStartTime(core.Trim(rangeMatch))
|
||||
if err != nil {
|
||||
return "", nil, core.E("store.QueryJournal", "parse range", err)
|
||||
}
|
||||
builder.WriteString(" AND committed_at >= ?")
|
||||
arguments = append(arguments, startTime.UnixMilli())
|
||||
}
|
||||
|
||||
builder.WriteString(" ORDER BY committed_at")
|
||||
return builder.String(), arguments, nil
|
||||
}
|
||||
|
||||
func (storeInstance *Store) journalBucket() string {
|
||||
if storeInstance.journal.bucket == "" {
|
||||
return defaultJournalBucket
|
||||
}
|
||||
return storeInstance.journal.bucket
|
||||
}
|
||||
|
||||
func ensureJournalSchema(database schemaDatabase) error {
|
||||
if _, err := database.Exec(createJournalEntriesTableSQL); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := database.Exec(
|
||||
"CREATE INDEX IF NOT EXISTS journal_entries_bucket_committed_at_idx ON " + journalEntriesTableName + " (bucket_name, committed_at)",
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func insertJournalEntry(
|
||||
executor journalExecutor,
|
||||
bucket, measurement, fieldsJSON, tagsJSON string,
|
||||
committedAt int64,
|
||||
) error {
|
||||
_, err := executor.Exec(
|
||||
"INSERT INTO "+journalEntriesTableName+" (bucket_name, measurement, fields_json, tags_json, committed_at, archived_at) VALUES (?, ?, ?, ?, ?, NULL)",
|
||||
bucket,
|
||||
measurement,
|
||||
fieldsJSON,
|
||||
tagsJSON,
|
||||
committedAt,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func jsonString(value any, operation, message string) (string, error) {
|
||||
result := core.JSONMarshal(value)
|
||||
if !result.OK {
|
||||
return "", core.E(operation, message, result.Value.(error))
|
||||
}
|
||||
return string(result.Value.([]byte)), nil
|
||||
}
|
||||
|
||||
func fluxStartTime(value string) (time.Time, error) {
|
||||
if value == "" {
|
||||
return time.Time{}, core.E("store.fluxStartTime", "range value is empty", nil)
|
||||
}
|
||||
if core.HasSuffix(value, "d") {
|
||||
days, err := strconv.Atoi(core.TrimSuffix(value, "d"))
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
return time.Now().Add(time.Duration(days) * 24 * time.Hour), nil
|
||||
}
|
||||
lookback, err := time.ParseDuration(value)
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
return time.Now().Add(lookback), nil
|
||||
}
|
||||
|
||||
func quotedSubmatch(pattern *regexp.Regexp, value string) string {
|
||||
match := pattern.FindStringSubmatch(value)
|
||||
if len(match) < 2 {
|
||||
return ""
|
||||
}
|
||||
return match[1]
|
||||
}
|
||||
|
||||
func regexpSubmatch(pattern *regexp.Regexp, value string, index int) string {
|
||||
match := pattern.FindStringSubmatch(value)
|
||||
if len(match) <= index {
|
||||
return ""
|
||||
}
|
||||
return match[index]
|
||||
}
|
||||
|
||||
func queryRowsAsMaps(rows *sql.Rows) ([]map[string]any, error) {
|
||||
columnNames, err := rows.Columns()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []map[string]any
|
||||
for rows.Next() {
|
||||
rawValues := make([]any, len(columnNames))
|
||||
scanTargets := make([]any, len(columnNames))
|
||||
for i := range rawValues {
|
||||
scanTargets[i] = &rawValues[i]
|
||||
}
|
||||
if err := rows.Scan(scanTargets...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
row := make(map[string]any, len(columnNames))
|
||||
for i, columnName := range columnNames {
|
||||
row[columnName] = normaliseRowValue(rawValues[i])
|
||||
}
|
||||
result = append(result, row)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func inflateJournalRows(rows []map[string]any) []map[string]any {
|
||||
for _, row := range rows {
|
||||
if fieldsJSON, ok := row["fields_json"].(string); ok {
|
||||
fields := make(map[string]any)
|
||||
result := core.JSONUnmarshalString(fieldsJSON, &fields)
|
||||
if result.OK {
|
||||
row["fields"] = fields
|
||||
}
|
||||
}
|
||||
if tagsJSON, ok := row["tags_json"].(string); ok {
|
||||
tags := make(map[string]string)
|
||||
result := core.JSONUnmarshalString(tagsJSON, &tags)
|
||||
if result.OK {
|
||||
row["tags"] = tags
|
||||
}
|
||||
}
|
||||
}
|
||||
return rows
|
||||
}
|
||||
|
||||
func normaliseRowValue(value any) any {
|
||||
switch typedValue := value.(type) {
|
||||
case []byte:
|
||||
return string(typedValue)
|
||||
default:
|
||||
return typedValue
|
||||
}
|
||||
}
|
||||
69
journal_test.go
Normal file
69
journal_test.go
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestJournal_CommitToJournal_Good_WithQueryJournalSQL(t *testing.T) {
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
first := storeInstance.CommitToJournal("session-a", map[string]any{"like": 4}, map[string]string{"workspace": "session-a"})
|
||||
second := storeInstance.CommitToJournal("session-b", map[string]any{"profile_match": 2}, map[string]string{"workspace": "session-b"})
|
||||
require.True(t, first.OK, "first journal commit failed: %v", first.Value)
|
||||
require.True(t, second.OK, "second journal commit failed: %v", second.Value)
|
||||
|
||||
rows := requireResultRows(
|
||||
t,
|
||||
storeInstance.QueryJournal("SELECT bucket_name, measurement, fields_json, tags_json FROM journal_entries ORDER BY entry_id"),
|
||||
)
|
||||
require.Len(t, rows, 2)
|
||||
assert.Equal(t, "events", rows[0]["bucket_name"])
|
||||
assert.Equal(t, "session-a", rows[0]["measurement"])
|
||||
|
||||
fields, ok := rows[0]["fields"].(map[string]any)
|
||||
require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"])
|
||||
assert.Equal(t, float64(4), fields["like"])
|
||||
|
||||
tags, ok := rows[1]["tags"].(map[string]string)
|
||||
require.True(t, ok, "unexpected tags type: %T", rows[1]["tags"])
|
||||
assert.Equal(t, "session-b", tags["workspace"])
|
||||
}
|
||||
|
||||
func TestJournal_QueryJournal_Good_FluxFilters(t *testing.T) {
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.True(t,
|
||||
storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK,
|
||||
)
|
||||
require.True(t,
|
||||
storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK,
|
||||
)
|
||||
|
||||
rows := requireResultRows(
|
||||
t,
|
||||
storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r._measurement == "session-b")`),
|
||||
)
|
||||
require.Len(t, rows, 1)
|
||||
assert.Equal(t, "session-b", rows[0]["measurement"])
|
||||
|
||||
fields, ok := rows[0]["fields"].(map[string]any)
|
||||
require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"])
|
||||
assert.Equal(t, float64(2), fields["like"])
|
||||
}
|
||||
|
||||
func TestJournal_CommitToJournal_Bad_EmptyMeasurement(t *testing.T) {
|
||||
storeInstance, err := New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
result := storeInstance.CommitToJournal("", map[string]any{"like": 1}, map[string]string{"workspace": "missing"})
|
||||
require.False(t, result.OK)
|
||||
assert.Contains(t, result.Value.(error).Error(), "measurement is empty")
|
||||
}
|
||||
102
scope.go
102
scope.go
|
|
@ -11,6 +11,8 @@ import (
|
|||
// validNamespace.MatchString("tenant-a") is true; validNamespace.MatchString("tenant_a") is false.
|
||||
var validNamespace = regexp.MustCompile(`^[a-zA-Z0-9-]+$`)
|
||||
|
||||
const defaultScopedGroupName = "default"
|
||||
|
||||
// QuotaConfig sets per-namespace key and group limits.
|
||||
// Usage example: `quota := store.QuotaConfig{MaxKeys: 100, MaxGroups: 10}`
|
||||
type QuotaConfig struct {
|
||||
|
|
@ -25,7 +27,8 @@ type QuotaConfig struct {
|
|||
type ScopedStore struct {
|
||||
storeInstance *Store
|
||||
namespace string
|
||||
quota QuotaConfig
|
||||
MaxKeys int
|
||||
MaxGroups int
|
||||
}
|
||||
|
||||
// NewScoped validates a namespace and prefixes groups with namespace + ":".
|
||||
|
|
@ -55,7 +58,8 @@ func NewScopedWithQuota(storeInstance *Store, namespace string, quota QuotaConfi
|
|||
nil,
|
||||
)
|
||||
}
|
||||
scopedStore.quota = quota
|
||||
scopedStore.MaxKeys = quota.MaxKeys
|
||||
scopedStore.MaxGroups = quota.MaxGroups
|
||||
return scopedStore, nil
|
||||
}
|
||||
|
||||
|
|
@ -67,6 +71,10 @@ func (scopedStore *ScopedStore) namespacePrefix() string {
|
|||
return scopedStore.namespace + ":"
|
||||
}
|
||||
|
||||
func (scopedStore *ScopedStore) defaultGroup() string {
|
||||
return defaultScopedGroupName
|
||||
}
|
||||
|
||||
func (scopedStore *ScopedStore) trimNamespacePrefix(groupName string) string {
|
||||
return core.TrimPrefix(groupName, scopedStore.namespacePrefix())
|
||||
}
|
||||
|
|
@ -77,19 +85,41 @@ func (scopedStore *ScopedStore) Namespace() string {
|
|||
return scopedStore.namespace
|
||||
}
|
||||
|
||||
// Usage example: `colourValue, err := scopedStore.Get("colour")`
|
||||
// Usage example: `colourValue, err := scopedStore.Get("config", "colour")`
|
||||
func (scopedStore *ScopedStore) Get(group, key string) (string, error) {
|
||||
func (scopedStore *ScopedStore) Get(arguments ...string) (string, error) {
|
||||
group, key, err := scopedStore.getArguments(arguments)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return scopedStore.storeInstance.Get(scopedStore.namespacedGroup(group), key)
|
||||
}
|
||||
|
||||
// GetFrom reads a key from an explicit namespaced group.
|
||||
// Usage example: `colourValue, err := scopedStore.GetFrom("config", "colour")`
|
||||
func (scopedStore *ScopedStore) GetFrom(group, key string) (string, error) {
|
||||
return scopedStore.Get(group, key)
|
||||
}
|
||||
|
||||
// Usage example: `if err := scopedStore.Set("colour", "blue"); err != nil { return }`
|
||||
// Usage example: `if err := scopedStore.Set("config", "colour", "blue"); err != nil { return }`
|
||||
func (scopedStore *ScopedStore) Set(group, key, value string) error {
|
||||
func (scopedStore *ScopedStore) Set(arguments ...string) error {
|
||||
group, key, value, err := scopedStore.setArguments(arguments)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := scopedStore.checkQuota("store.ScopedStore.Set", group, key); err != nil {
|
||||
return err
|
||||
}
|
||||
return scopedStore.storeInstance.Set(scopedStore.namespacedGroup(group), key, value)
|
||||
}
|
||||
|
||||
// SetIn writes a key to an explicit namespaced group.
|
||||
// Usage example: `if err := scopedStore.SetIn("config", "colour", "blue"); err != nil { return }`
|
||||
func (scopedStore *ScopedStore) SetIn(group, key, value string) error {
|
||||
return scopedStore.Set(group, key, value)
|
||||
}
|
||||
|
||||
// Usage example: `if err := scopedStore.SetWithTTL("sessions", "token", "abc123", time.Hour); err != nil { return }`
|
||||
func (scopedStore *ScopedStore) SetWithTTL(group, key, value string, timeToLive time.Duration) error {
|
||||
if err := scopedStore.checkQuota("store.ScopedStore.SetWithTTL", group, key); err != nil {
|
||||
|
|
@ -118,19 +148,26 @@ func (scopedStore *ScopedStore) All(group string) iter.Seq2[KeyValue, error] {
|
|||
return scopedStore.storeInstance.All(scopedStore.namespacedGroup(group))
|
||||
}
|
||||
|
||||
// Usage example: `for entry, err := range scopedStore.AllSeq("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }`
|
||||
func (scopedStore *ScopedStore) AllSeq(group string) iter.Seq2[KeyValue, error] {
|
||||
return scopedStore.All(group)
|
||||
}
|
||||
|
||||
// Usage example: `keyCount, err := scopedStore.Count("config")`
|
||||
func (scopedStore *ScopedStore) Count(group string) (int, error) {
|
||||
return scopedStore.storeInstance.Count(scopedStore.namespacedGroup(group))
|
||||
}
|
||||
|
||||
// Usage example: `keyCount, err := scopedStore.CountAll("config")`
|
||||
func (scopedStore *ScopedStore) CountAll(groupPrefix string) (int, error) {
|
||||
return scopedStore.storeInstance.CountAll(scopedStore.namespacedGroup(groupPrefix))
|
||||
// Usage example: `keyCount, err := scopedStore.CountAll()`
|
||||
func (scopedStore *ScopedStore) CountAll(groupPrefix ...string) (int, error) {
|
||||
return scopedStore.storeInstance.CountAll(scopedStore.namespacedGroup(firstString(groupPrefix)))
|
||||
}
|
||||
|
||||
// Usage example: `groupNames, err := scopedStore.Groups("config")`
|
||||
func (scopedStore *ScopedStore) Groups(groupPrefix string) ([]string, error) {
|
||||
groupNames, err := scopedStore.storeInstance.Groups(scopedStore.namespacedGroup(groupPrefix))
|
||||
// Usage example: `groupNames, err := scopedStore.Groups()`
|
||||
func (scopedStore *ScopedStore) Groups(groupPrefix ...string) ([]string, error) {
|
||||
groupNames, err := scopedStore.storeInstance.Groups(scopedStore.namespacedGroup(firstString(groupPrefix)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -141,10 +178,11 @@ func (scopedStore *ScopedStore) Groups(groupPrefix string) ([]string, error) {
|
|||
}
|
||||
|
||||
// Usage example: `for groupName, err := range scopedStore.GroupsSeq("config") { if err != nil { break }; fmt.Println(groupName) }`
|
||||
func (scopedStore *ScopedStore) GroupsSeq(groupPrefix string) iter.Seq2[string, error] {
|
||||
// Usage example: `for groupName, err := range scopedStore.GroupsSeq() { if err != nil { break }; fmt.Println(groupName) }`
|
||||
func (scopedStore *ScopedStore) GroupsSeq(groupPrefix ...string) iter.Seq2[string, error] {
|
||||
return func(yield func(string, error) bool) {
|
||||
namespacePrefix := scopedStore.namespacePrefix()
|
||||
for groupName, err := range scopedStore.storeInstance.GroupsSeq(scopedStore.namespacedGroup(groupPrefix)) {
|
||||
for groupName, err := range scopedStore.storeInstance.GroupsSeq(scopedStore.namespacedGroup(firstString(groupPrefix))) {
|
||||
if err != nil {
|
||||
if !yield("", err) {
|
||||
return
|
||||
|
|
@ -187,7 +225,7 @@ func (scopedStore *ScopedStore) PurgeExpired() (int64, error) {
|
|||
// group would exceed the configured limit. Existing keys are treated as
|
||||
// upserts and do not consume quota.
|
||||
func (scopedStore *ScopedStore) checkQuota(operation, group, key string) error {
|
||||
if scopedStore.quota.MaxKeys == 0 && scopedStore.quota.MaxGroups == 0 {
|
||||
if scopedStore.MaxKeys == 0 && scopedStore.MaxGroups == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -206,18 +244,18 @@ func (scopedStore *ScopedStore) checkQuota(operation, group, key string) error {
|
|||
}
|
||||
|
||||
// Check MaxKeys quota.
|
||||
if scopedStore.quota.MaxKeys > 0 {
|
||||
if scopedStore.MaxKeys > 0 {
|
||||
keyCount, err := scopedStore.storeInstance.CountAll(namespacePrefix)
|
||||
if err != nil {
|
||||
return core.E(operation, "quota check", err)
|
||||
}
|
||||
if keyCount >= scopedStore.quota.MaxKeys {
|
||||
return core.E(operation, core.Sprintf("key limit (%d)", scopedStore.quota.MaxKeys), QuotaExceededError)
|
||||
if keyCount >= scopedStore.MaxKeys {
|
||||
return core.E(operation, core.Sprintf("key limit (%d)", scopedStore.MaxKeys), QuotaExceededError)
|
||||
}
|
||||
}
|
||||
|
||||
// Check MaxGroups quota — only if this would create a new group.
|
||||
if scopedStore.quota.MaxGroups > 0 {
|
||||
if scopedStore.MaxGroups > 0 {
|
||||
existingGroupCount, err := scopedStore.storeInstance.Count(namespacedGroup)
|
||||
if err != nil {
|
||||
return core.E(operation, "quota check", err)
|
||||
|
|
@ -231,11 +269,41 @@ func (scopedStore *ScopedStore) checkQuota(operation, group, key string) error {
|
|||
}
|
||||
knownGroupCount++
|
||||
}
|
||||
if knownGroupCount >= scopedStore.quota.MaxGroups {
|
||||
return core.E(operation, core.Sprintf("group limit (%d)", scopedStore.quota.MaxGroups), QuotaExceededError)
|
||||
if knownGroupCount >= scopedStore.MaxGroups {
|
||||
return core.E(operation, core.Sprintf("group limit (%d)", scopedStore.MaxGroups), QuotaExceededError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scopedStore *ScopedStore) getArguments(arguments []string) (string, string, error) {
|
||||
switch len(arguments) {
|
||||
case 1:
|
||||
return scopedStore.defaultGroup(), arguments[0], nil
|
||||
case 2:
|
||||
return arguments[0], arguments[1], nil
|
||||
default:
|
||||
return "", "", core.E(
|
||||
"store.ScopedStore.Get",
|
||||
core.Sprintf("expected 1 or 2 arguments; got %d", len(arguments)),
|
||||
nil,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (scopedStore *ScopedStore) setArguments(arguments []string) (string, string, string, error) {
|
||||
switch len(arguments) {
|
||||
case 2:
|
||||
return scopedStore.defaultGroup(), arguments[0], arguments[1], nil
|
||||
case 3:
|
||||
return arguments[0], arguments[1], arguments[2], nil
|
||||
default:
|
||||
return "", "", "", core.E(
|
||||
"store.ScopedStore.Set",
|
||||
core.Sprintf("expected 2 or 3 arguments; got %d", len(arguments)),
|
||||
nil,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -94,6 +94,17 @@ func TestScope_NewScopedWithQuota_Bad_NegativeMaxGroups(t *testing.T) {
|
|||
assert.Contains(t, err.Error(), "zero or positive")
|
||||
}
|
||||
|
||||
func TestScope_NewScopedWithQuota_Good_InlineQuotaFields(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, err := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: 4, MaxGroups: 2})
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 4, scopedStore.MaxKeys)
|
||||
assert.Equal(t, 2, scopedStore.MaxGroups)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// ScopedStore — basic CRUD
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -110,6 +121,34 @@ func TestScope_ScopedStore_Good_SetGet(t *testing.T) {
|
|||
assert.Equal(t, "dark", value)
|
||||
}
|
||||
|
||||
func TestScope_ScopedStore_Good_DefaultGroupHelpers(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.Set("theme", "dark"))
|
||||
|
||||
value, err := scopedStore.Get("theme")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "dark", value)
|
||||
|
||||
rawValue, err := storeInstance.Get("tenant-a:default", "theme")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "dark", rawValue)
|
||||
}
|
||||
|
||||
func TestScope_ScopedStore_Good_SetInGetFrom(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
require.NoError(t, scopedStore.SetIn("config", "theme", "dark"))
|
||||
|
||||
value, err := scopedStore.GetFrom("config", "theme")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "dark", value)
|
||||
}
|
||||
|
||||
func TestScope_ScopedStore_Good_PrefixedInUnderlyingStore(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
|
|
|||
57
store.go
57
store.go
|
|
@ -29,14 +29,25 @@ const (
|
|||
entryValueColumn = "entry_value"
|
||||
)
|
||||
|
||||
// StoreOption customises Store construction.
|
||||
// Usage example: `storeInstance, err := store.New("/tmp/go-store.db", store.WithJournal("http://127.0.0.1:8086", "core", "events"))`
|
||||
type StoreOption func(*Store)
|
||||
|
||||
type journalConfig struct {
|
||||
url string
|
||||
org string
|
||||
bucket string
|
||||
}
|
||||
|
||||
// Store provides SQLite-backed grouped entries with TTL expiry, namespace
|
||||
// isolation, and reactive change notifications.
|
||||
// isolation, reactive change notifications, and optional journal support.
|
||||
// Usage example: `storeInstance, err := store.New(":memory:"); if err != nil { return }; if err := storeInstance.Set("config", "colour", "blue"); err != nil { return }`
|
||||
type Store struct {
|
||||
database *sql.DB
|
||||
cancelPurge context.CancelFunc
|
||||
purgeWaitGroup sync.WaitGroup
|
||||
purgeInterval time.Duration // interval between background purge cycles
|
||||
journal journalConfig
|
||||
|
||||
// Event dispatch state.
|
||||
watchers []*Watcher
|
||||
|
|
@ -47,8 +58,18 @@ type Store struct {
|
|||
nextCallbackRegistrationID uint64 // monotonic ID for callback registrations
|
||||
}
|
||||
|
||||
// Usage example: `storeInstance, err := store.New(":memory:"); if err != nil { return }`
|
||||
func New(databasePath string) (*Store, error) {
|
||||
// WithJournal records journal connection metadata for workspace commits,
|
||||
// journal queries, and archive generation.
|
||||
// Usage example: `storeInstance, err := store.New("/tmp/go-store.db", store.WithJournal("http://127.0.0.1:8086", "core", "events"))`
|
||||
func WithJournal(url, org, bucket string) StoreOption {
|
||||
return func(storeInstance *Store) {
|
||||
storeInstance.journal = journalConfig{url: url, org: org, bucket: bucket}
|
||||
}
|
||||
}
|
||||
|
||||
// Usage example: `storeInstance, err := store.New(":memory:")`
|
||||
// Usage example: `storeInstance, err := store.New("/tmp/go-store.db", store.WithJournal("http://127.0.0.1:8086", "core", "events"))`
|
||||
func New(databasePath string, options ...StoreOption) (*Store, error) {
|
||||
sqliteDatabase, err := sql.Open("sqlite", databasePath)
|
||||
if err != nil {
|
||||
return nil, core.E("store.New", "open database", err)
|
||||
|
|
@ -73,6 +94,11 @@ func New(databasePath string) (*Store, error) {
|
|||
|
||||
purgeContext, cancel := context.WithCancel(context.Background())
|
||||
storeInstance := &Store{database: sqliteDatabase, cancelPurge: cancel, purgeInterval: 60 * time.Second}
|
||||
for _, option := range options {
|
||||
if option != nil {
|
||||
option(storeInstance)
|
||||
}
|
||||
}
|
||||
storeInstance.startBackgroundPurge(purgeContext)
|
||||
return storeInstance, nil
|
||||
}
|
||||
|
|
@ -236,6 +262,11 @@ func (storeInstance *Store) All(group string) iter.Seq2[KeyValue, error] {
|
|||
}
|
||||
}
|
||||
|
||||
// Usage example: `for entry, err := range storeInstance.AllSeq("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }`
|
||||
func (storeInstance *Store) AllSeq(group string) iter.Seq2[KeyValue, error] {
|
||||
return storeInstance.All(group)
|
||||
}
|
||||
|
||||
// Usage example: `parts, err := storeInstance.GetSplit("config", "hosts", ","); if err != nil { return }; for part := range parts { fmt.Println(part) }`
|
||||
func (storeInstance *Store) GetSplit(group, key, separator string) (iter.Seq[string], error) {
|
||||
value, err := storeInstance.Get(group, key)
|
||||
|
|
@ -297,9 +328,10 @@ func (storeInstance *Store) CountAll(groupPrefix string) (int, error) {
|
|||
}
|
||||
|
||||
// Usage example: `tenantGroupNames, err := storeInstance.Groups("tenant-a:")`
|
||||
func (storeInstance *Store) Groups(groupPrefix string) ([]string, error) {
|
||||
// Usage example: `allGroupNames, err := storeInstance.Groups()`
|
||||
func (storeInstance *Store) Groups(groupPrefix ...string) ([]string, error) {
|
||||
var groupNames []string
|
||||
for groupName, err := range storeInstance.GroupsSeq(groupPrefix) {
|
||||
for groupName, err := range storeInstance.GroupsSeq(groupPrefix...) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -309,12 +341,14 @@ func (storeInstance *Store) Groups(groupPrefix string) ([]string, error) {
|
|||
}
|
||||
|
||||
// Usage example: `for tenantGroupName, err := range storeInstance.GroupsSeq("tenant-a:") { if err != nil { break }; fmt.Println(tenantGroupName) }`
|
||||
func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, error] {
|
||||
// Usage example: `for groupName, err := range storeInstance.GroupsSeq() { if err != nil { break }; fmt.Println(groupName) }`
|
||||
func (storeInstance *Store) GroupsSeq(groupPrefix ...string) iter.Seq2[string, error] {
|
||||
actualGroupPrefix := firstString(groupPrefix)
|
||||
return func(yield func(string, error) bool) {
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
now := time.Now().UnixMilli()
|
||||
if groupPrefix == "" {
|
||||
if actualGroupPrefix == "" {
|
||||
rows, err = storeInstance.database.Query(
|
||||
"SELECT DISTINCT "+entryGroupColumn+" FROM "+entriesTableName+" WHERE (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryGroupColumn,
|
||||
now,
|
||||
|
|
@ -322,7 +356,7 @@ func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, erro
|
|||
} else {
|
||||
rows, err = storeInstance.database.Query(
|
||||
"SELECT DISTINCT "+entryGroupColumn+" FROM "+entriesTableName+" WHERE "+entryGroupColumn+" LIKE ? ESCAPE '^' AND (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryGroupColumn,
|
||||
escapeLike(groupPrefix)+"%", now,
|
||||
escapeLike(actualGroupPrefix)+"%", now,
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
|
|
@ -349,6 +383,13 @@ func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, erro
|
|||
}
|
||||
}
|
||||
|
||||
func firstString(values []string) string {
|
||||
if len(values) == 0 {
|
||||
return ""
|
||||
}
|
||||
return values[0]
|
||||
}
|
||||
|
||||
// escapeLike("tenant_%") returns "tenant^_^%" so LIKE queries treat wildcards
|
||||
// literally.
|
||||
func escapeLike(text string) string {
|
||||
|
|
|
|||
|
|
@ -98,6 +98,16 @@ func TestStore_New_Good_WALMode(t *testing.T) {
|
|||
assert.Equal(t, "wal", mode, "journal_mode should be WAL")
|
||||
}
|
||||
|
||||
func TestStore_New_Good_WithJournalOption(t *testing.T) {
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
assert.Equal(t, "events", storeInstance.journal.bucket)
|
||||
assert.Equal(t, "core", storeInstance.journal.org)
|
||||
assert.Equal(t, "http://127.0.0.1:8086", storeInstance.journal.url)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Set / Get — core CRUD
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -372,6 +382,22 @@ func TestStore_All_Good_SortedByKey(t *testing.T) {
|
|||
assert.Equal(t, []string{"alpha", "bravo", "charlie"}, keys)
|
||||
}
|
||||
|
||||
func TestStore_AllSeq_Good_Alias(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.NoError(t, storeInstance.Set("g", "alpha", "1"))
|
||||
require.NoError(t, storeInstance.Set("g", "bravo", "2"))
|
||||
|
||||
var keys []string
|
||||
for entry, err := range storeInstance.AllSeq("g") {
|
||||
require.NoError(t, err)
|
||||
keys = append(keys, entry.Key)
|
||||
}
|
||||
|
||||
assert.Equal(t, []string{"alpha", "bravo"}, keys)
|
||||
}
|
||||
|
||||
func TestStore_All_Bad_ClosedStore(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
storeInstance.Close()
|
||||
|
|
@ -434,6 +460,22 @@ func TestStore_GroupsSeq_Good_SortedByGroupName(t *testing.T) {
|
|||
assert.Equal(t, []string{"alpha", "bravo", "charlie"}, groups)
|
||||
}
|
||||
|
||||
func TestStore_GroupsSeq_Good_DefaultArgument(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.NoError(t, storeInstance.Set("alpha", "a", "1"))
|
||||
require.NoError(t, storeInstance.Set("beta", "b", "2"))
|
||||
|
||||
var groups []string
|
||||
for group, err := range storeInstance.GroupsSeq() {
|
||||
require.NoError(t, err)
|
||||
groups = append(groups, group)
|
||||
}
|
||||
|
||||
assert.Equal(t, []string{"alpha", "beta"}, groups)
|
||||
}
|
||||
|
||||
func TestStore_GroupsSeq_Bad_ClosedStore(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
storeInstance.Close()
|
||||
|
|
|
|||
|
|
@ -43,3 +43,38 @@ func repeatString(value string, count int) string {
|
|||
}
|
||||
return builder.String()
|
||||
}
|
||||
|
||||
func useWorkspaceStateDirectory(tb testing.TB) string {
|
||||
tb.Helper()
|
||||
|
||||
previous := defaultWorkspaceStateDirectory
|
||||
stateDirectory := testPath(tb, "state")
|
||||
defaultWorkspaceStateDirectory = stateDirectory
|
||||
tb.Cleanup(func() {
|
||||
defaultWorkspaceStateDirectory = previous
|
||||
_ = testFilesystem().DeleteAll(stateDirectory)
|
||||
})
|
||||
return stateDirectory
|
||||
}
|
||||
|
||||
func useArchiveOutputDirectory(tb testing.TB) string {
|
||||
tb.Helper()
|
||||
|
||||
previous := defaultArchiveOutputDirectory
|
||||
outputDirectory := testPath(tb, "archive")
|
||||
defaultArchiveOutputDirectory = outputDirectory
|
||||
tb.Cleanup(func() {
|
||||
defaultArchiveOutputDirectory = previous
|
||||
_ = testFilesystem().DeleteAll(outputDirectory)
|
||||
})
|
||||
return outputDirectory
|
||||
}
|
||||
|
||||
func requireResultRows(tb testing.TB, result core.Result) []map[string]any {
|
||||
tb.Helper()
|
||||
|
||||
require.True(tb, result.OK, "core result failed: %v", result.Value)
|
||||
rows, ok := result.Value.([]map[string]any)
|
||||
require.True(tb, ok, "unexpected row type: %T", result.Value)
|
||||
return rows
|
||||
}
|
||||
|
|
|
|||
326
workspace.go
Normal file
326
workspace.go
Normal file
|
|
@ -0,0 +1,326 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"io/fs"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
const (
|
||||
workspaceEntriesTableName = "workspace_entries"
|
||||
workspaceIdentityGroupName = "workspace"
|
||||
)
|
||||
|
||||
const createWorkspaceEntriesTableSQL = `CREATE TABLE IF NOT EXISTS workspace_entries (
|
||||
entry_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
entry_kind TEXT NOT NULL,
|
||||
entry_data TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL
|
||||
)`
|
||||
|
||||
var defaultWorkspaceStateDirectory = ".core/state"
|
||||
|
||||
// Workspace accumulates mutable work-in-progress entries before they are
|
||||
// committed to the durable store journal.
|
||||
// Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session-2026-03-30"); if err != nil { return }; defer workspace.Discard()`
|
||||
type Workspace struct {
|
||||
name string
|
||||
storeInstance *Store
|
||||
database *sql.DB
|
||||
databasePath string
|
||||
filesystem *core.Fs
|
||||
|
||||
closeLock sync.Mutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewWorkspace creates a workspace state file under `.core/state/`.
|
||||
// Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session-2026-03-30")`
|
||||
func (storeInstance *Store) NewWorkspace(name string) (*Workspace, error) {
|
||||
validation := core.ValidateName(name)
|
||||
if !validation.OK {
|
||||
return nil, core.E("store.NewWorkspace", "validate workspace name", validation.Value.(error))
|
||||
}
|
||||
|
||||
filesystem := (&core.Fs{}).NewUnrestricted()
|
||||
databasePath := workspaceFilePath(defaultWorkspaceStateDirectory, name)
|
||||
if filesystem.Exists(databasePath) {
|
||||
return nil, core.E("store.NewWorkspace", core.Concat("workspace already exists: ", name), nil)
|
||||
}
|
||||
if result := filesystem.EnsureDir(defaultWorkspaceStateDirectory); !result.OK {
|
||||
return nil, core.E("store.NewWorkspace", "ensure state directory", result.Value.(error))
|
||||
}
|
||||
|
||||
workspaceDatabase, err := openWorkspaceDatabase(databasePath)
|
||||
if err != nil {
|
||||
return nil, core.E("store.NewWorkspace", "open workspace database", err)
|
||||
}
|
||||
|
||||
return &Workspace{
|
||||
name: name,
|
||||
storeInstance: storeInstance,
|
||||
database: workspaceDatabase,
|
||||
databasePath: databasePath,
|
||||
filesystem: filesystem,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// RecoverOrphans opens any leftover workspace files so callers can inspect and
|
||||
// decide whether to commit or discard them.
|
||||
// Usage example: `orphans := storeInstance.RecoverOrphans(".core/state")`
|
||||
func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace {
|
||||
if stateDirectory == "" {
|
||||
stateDirectory = defaultWorkspaceStateDirectory
|
||||
}
|
||||
|
||||
filesystem := (&core.Fs{}).NewUnrestricted()
|
||||
if !filesystem.Exists(stateDirectory) {
|
||||
return nil
|
||||
}
|
||||
|
||||
listResult := filesystem.List(stateDirectory)
|
||||
if !listResult.OK {
|
||||
return nil
|
||||
}
|
||||
|
||||
entries, ok := listResult.Value.([]fs.DirEntry)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
var workspaces []*Workspace
|
||||
for _, dirEntry := range entries {
|
||||
if dirEntry.IsDir() || !core.HasSuffix(dirEntry.Name(), ".duckdb") {
|
||||
continue
|
||||
}
|
||||
name := core.TrimSuffix(dirEntry.Name(), ".duckdb")
|
||||
databasePath := workspaceFilePath(stateDirectory, name)
|
||||
workspaceDatabase, err := openWorkspaceDatabase(databasePath)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
workspaces = append(workspaces, &Workspace{
|
||||
name: name,
|
||||
storeInstance: storeInstance,
|
||||
database: workspaceDatabase,
|
||||
databasePath: databasePath,
|
||||
filesystem: filesystem,
|
||||
})
|
||||
}
|
||||
return workspaces
|
||||
}
|
||||
|
||||
// Put appends one entry to the workspace buffer.
|
||||
// Usage example: `err := workspace.Put("like", map[string]any{"user": "@alice", "post": "video_123"})`
|
||||
func (workspace *Workspace) Put(kind string, data map[string]any) error {
|
||||
if kind == "" {
|
||||
return core.E("store.Workspace.Put", "kind is empty", nil)
|
||||
}
|
||||
if data == nil {
|
||||
data = map[string]any{}
|
||||
}
|
||||
|
||||
dataJSON, err := jsonString(data, "store.Workspace.Put", "marshal entry data")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = workspace.database.Exec(
|
||||
"INSERT INTO "+workspaceEntriesTableName+" (entry_kind, entry_data, created_at) VALUES (?, ?, ?)",
|
||||
kind,
|
||||
dataJSON,
|
||||
time.Now().UnixMilli(),
|
||||
)
|
||||
if err != nil {
|
||||
return core.E("store.Workspace.Put", "insert entry", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Aggregate returns the current per-kind entry counts in the workspace.
|
||||
// Usage example: `summary := workspace.Aggregate()`
|
||||
func (workspace *Workspace) Aggregate() map[string]any {
|
||||
fields, err := workspace.aggregateFields()
|
||||
if err != nil {
|
||||
return map[string]any{}
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
||||
// Commit writes the aggregated workspace state to the journal and updates the
|
||||
// store summary entry for the workspace.
|
||||
// Usage example: `result := workspace.Commit()`
|
||||
func (workspace *Workspace) Commit() core.Result {
|
||||
fields, err := workspace.aggregateFields()
|
||||
if err != nil {
|
||||
return core.Result{Value: core.E("store.Workspace.Commit", "aggregate workspace", err), OK: false}
|
||||
}
|
||||
if err := workspace.storeInstance.commitWorkspaceAggregate(workspace.name, fields); err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
if err := workspace.closeAndDelete(); err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{Value: fields, OK: true}
|
||||
}
|
||||
|
||||
// Discard closes the workspace and removes its backing file.
|
||||
// Usage example: `workspace.Discard()`
|
||||
func (workspace *Workspace) Discard() {
|
||||
_ = workspace.closeAndDelete()
|
||||
}
|
||||
|
||||
// Query runs ad-hoc SQL against the workspace buffer.
|
||||
// Usage example: `result := workspace.Query("SELECT entry_kind, COUNT(*) AS count FROM workspace_entries GROUP BY entry_kind")`
|
||||
func (workspace *Workspace) Query(sqlQuery string) core.Result {
|
||||
rows, err := workspace.database.Query(sqlQuery)
|
||||
if err != nil {
|
||||
return core.Result{Value: core.E("store.Workspace.Query", "query workspace", err), OK: false}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
rowMaps, err := queryRowsAsMaps(rows)
|
||||
if err != nil {
|
||||
return core.Result{Value: core.E("store.Workspace.Query", "scan rows", err), OK: false}
|
||||
}
|
||||
return core.Result{Value: rowMaps, OK: true}
|
||||
}
|
||||
|
||||
func (workspace *Workspace) aggregateFields() (map[string]any, error) {
|
||||
rows, err := workspace.database.Query(
|
||||
"SELECT entry_kind, COUNT(*) FROM " + workspaceEntriesTableName + " GROUP BY entry_kind ORDER BY entry_kind",
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
fields := make(map[string]any)
|
||||
for rows.Next() {
|
||||
var (
|
||||
kind string
|
||||
count int
|
||||
)
|
||||
if err := rows.Scan(&kind, &count); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fields[kind] = count
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fields, nil
|
||||
}
|
||||
|
||||
func (workspace *Workspace) closeAndDelete() error {
|
||||
workspace.closeLock.Lock()
|
||||
defer workspace.closeLock.Unlock()
|
||||
|
||||
if workspace.closed {
|
||||
return nil
|
||||
}
|
||||
workspace.closed = true
|
||||
|
||||
if err := workspace.database.Close(); err != nil {
|
||||
return core.E("store.Workspace.closeAndDelete", "close workspace database", err)
|
||||
}
|
||||
for _, path := range []string{workspace.databasePath, workspace.databasePath + "-wal", workspace.databasePath + "-shm"} {
|
||||
if result := workspace.filesystem.Delete(path); !result.OK && workspace.filesystem.Exists(path) {
|
||||
return core.E("store.Workspace.closeAndDelete", "delete workspace file", result.Value.(error))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (storeInstance *Store) commitWorkspaceAggregate(workspaceName string, fields map[string]any) error {
|
||||
if err := ensureJournalSchema(storeInstance.database); err != nil {
|
||||
return core.E("store.Workspace.Commit", "ensure journal schema", err)
|
||||
}
|
||||
|
||||
transaction, err := storeInstance.database.Begin()
|
||||
if err != nil {
|
||||
return core.E("store.Workspace.Commit", "begin transaction", err)
|
||||
}
|
||||
|
||||
committed := false
|
||||
defer func() {
|
||||
if !committed {
|
||||
_ = transaction.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
fieldsJSON, err := jsonString(fields, "store.Workspace.Commit", "marshal summary")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tagsJSON, err := jsonString(map[string]string{"workspace": workspaceName}, "store.Workspace.Commit", "marshal tags")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := insertJournalEntry(
|
||||
transaction,
|
||||
storeInstance.journalBucket(),
|
||||
workspaceName,
|
||||
fieldsJSON,
|
||||
tagsJSON,
|
||||
time.Now().UnixMilli(),
|
||||
); err != nil {
|
||||
return core.E("store.Workspace.Commit", "insert journal entry", err)
|
||||
}
|
||||
|
||||
if _, err := transaction.Exec(
|
||||
"INSERT INTO "+entriesTableName+" ("+entryGroupColumn+", "+entryKeyColumn+", "+entryValueColumn+", expires_at) VALUES (?, ?, ?, NULL) "+
|
||||
"ON CONFLICT("+entryGroupColumn+", "+entryKeyColumn+") DO UPDATE SET "+entryValueColumn+" = excluded."+entryValueColumn+", expires_at = NULL",
|
||||
workspaceSummaryGroup(workspaceName),
|
||||
"summary",
|
||||
fieldsJSON,
|
||||
); err != nil {
|
||||
return core.E("store.Workspace.Commit", "upsert workspace summary", err)
|
||||
}
|
||||
|
||||
if err := transaction.Commit(); err != nil {
|
||||
return core.E("store.Workspace.Commit", "commit transaction", err)
|
||||
}
|
||||
committed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func openWorkspaceDatabase(databasePath string) (*sql.DB, error) {
|
||||
workspaceDatabase, err := sql.Open("sqlite", databasePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
workspaceDatabase.SetMaxOpenConns(1)
|
||||
if _, err := workspaceDatabase.Exec("PRAGMA journal_mode=WAL"); err != nil {
|
||||
workspaceDatabase.Close()
|
||||
return nil, err
|
||||
}
|
||||
if _, err := workspaceDatabase.Exec("PRAGMA busy_timeout=5000"); err != nil {
|
||||
workspaceDatabase.Close()
|
||||
return nil, err
|
||||
}
|
||||
if _, err := workspaceDatabase.Exec(createWorkspaceEntriesTableSQL); err != nil {
|
||||
workspaceDatabase.Close()
|
||||
return nil, err
|
||||
}
|
||||
return workspaceDatabase, nil
|
||||
}
|
||||
|
||||
func workspaceSummaryGroup(workspaceName string) string {
|
||||
return core.Concat(workspaceIdentityGroupName, ":", workspaceName)
|
||||
}
|
||||
|
||||
func workspaceFilePath(stateDirectory, name string) string {
|
||||
return joinPath(stateDirectory, core.Concat(name, ".duckdb"))
|
||||
}
|
||||
|
||||
func joinPath(base, name string) string {
|
||||
if base == "" {
|
||||
return name
|
||||
}
|
||||
return core.Concat(core.TrimSuffix(base, "/"), "/", name)
|
||||
}
|
||||
121
workspace_test.go
Normal file
121
workspace_test.go
Normal file
|
|
@ -0,0 +1,121 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestWorkspace_NewWorkspace_Good_CreatePutAggregateQuery(t *testing.T) {
|
||||
stateDirectory := useWorkspaceStateDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
workspace, err := storeInstance.NewWorkspace("scroll-session")
|
||||
require.NoError(t, err)
|
||||
defer workspace.Discard()
|
||||
|
||||
assert.Equal(t, workspaceFilePath(stateDirectory, "scroll-session"), workspace.databasePath)
|
||||
assert.True(t, testFilesystem().Exists(workspace.databasePath))
|
||||
|
||||
require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"}))
|
||||
require.NoError(t, workspace.Put("like", map[string]any{"user": "@bob"}))
|
||||
require.NoError(t, workspace.Put("profile_match", map[string]any{"user": "@charlie"}))
|
||||
|
||||
assert.Equal(t, map[string]any{"like": 2, "profile_match": 1}, workspace.Aggregate())
|
||||
|
||||
rows := requireResultRows(
|
||||
t,
|
||||
workspace.Query("SELECT entry_kind, COUNT(*) AS entry_count FROM workspace_entries GROUP BY entry_kind ORDER BY entry_kind"),
|
||||
)
|
||||
require.Len(t, rows, 2)
|
||||
assert.Equal(t, "like", rows[0]["entry_kind"])
|
||||
assert.Equal(t, int64(2), rows[0]["entry_count"])
|
||||
assert.Equal(t, "profile_match", rows[1]["entry_kind"])
|
||||
assert.Equal(t, int64(1), rows[1]["entry_count"])
|
||||
}
|
||||
|
||||
func TestWorkspace_Commit_Good_JournalAndSummary(t *testing.T) {
|
||||
useWorkspaceStateDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
workspace, err := storeInstance.NewWorkspace("scroll-session")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"}))
|
||||
require.NoError(t, workspace.Put("like", map[string]any{"user": "@bob"}))
|
||||
require.NoError(t, workspace.Put("profile_match", map[string]any{"user": "@charlie"}))
|
||||
|
||||
result := workspace.Commit()
|
||||
require.True(t, result.OK, "workspace commit failed: %v", result.Value)
|
||||
assert.Equal(t, map[string]any{"like": 2, "profile_match": 1}, result.Value)
|
||||
assert.False(t, testFilesystem().Exists(workspace.databasePath))
|
||||
|
||||
summaryJSON, err := storeInstance.Get(workspaceSummaryGroup("scroll-session"), "summary")
|
||||
require.NoError(t, err)
|
||||
|
||||
summary := make(map[string]any)
|
||||
summaryResult := core.JSONUnmarshalString(summaryJSON, &summary)
|
||||
require.True(t, summaryResult.OK, "summary unmarshal failed: %v", summaryResult.Value)
|
||||
assert.Equal(t, float64(2), summary["like"])
|
||||
assert.Equal(t, float64(1), summary["profile_match"])
|
||||
|
||||
rows := requireResultRows(
|
||||
t,
|
||||
storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r._measurement == "scroll-session")`),
|
||||
)
|
||||
require.Len(t, rows, 1)
|
||||
assert.Equal(t, "scroll-session", rows[0]["measurement"])
|
||||
|
||||
fields, ok := rows[0]["fields"].(map[string]any)
|
||||
require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"])
|
||||
assert.Equal(t, float64(2), fields["like"])
|
||||
assert.Equal(t, float64(1), fields["profile_match"])
|
||||
|
||||
tags, ok := rows[0]["tags"].(map[string]string)
|
||||
require.True(t, ok, "unexpected tags type: %T", rows[0]["tags"])
|
||||
assert.Equal(t, "scroll-session", tags["workspace"])
|
||||
}
|
||||
|
||||
func TestWorkspace_Discard_Good_Idempotent(t *testing.T) {
|
||||
useWorkspaceStateDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
workspace, err := storeInstance.NewWorkspace("discard-session")
|
||||
require.NoError(t, err)
|
||||
|
||||
workspace.Discard()
|
||||
workspace.Discard()
|
||||
|
||||
assert.False(t, testFilesystem().Exists(workspace.databasePath))
|
||||
}
|
||||
|
||||
func TestWorkspace_RecoverOrphans_Good(t *testing.T) {
|
||||
stateDirectory := useWorkspaceStateDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
workspace, err := storeInstance.NewWorkspace("orphan-session")
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"}))
|
||||
require.NoError(t, workspace.database.Close())
|
||||
|
||||
orphans := storeInstance.RecoverOrphans(stateDirectory)
|
||||
require.Len(t, orphans, 1)
|
||||
assert.Equal(t, map[string]any{"like": 1}, orphans[0].Aggregate())
|
||||
|
||||
orphans[0].Discard()
|
||||
assert.False(t, testFilesystem().Exists(workspaceFilePath(stateDirectory, "orphan-session")))
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue