diff --git a/compact.go b/compact.go new file mode 100644 index 0000000..9af6b8c --- /dev/null +++ b/compact.go @@ -0,0 +1,191 @@ +package store + +import ( + "compress/gzip" + "io" + "time" + + core "dappco.re/go/core" +) + +var defaultArchiveOutputDirectory = ".core/archive" + +// CompactOptions controls cold archive generation. +// Usage example: `options := store.CompactOptions{Before: time.Now().Add(-90 * 24 * time.Hour), Output: "/tmp/archive", Format: "gzip"}` +type CompactOptions struct { + Before time.Time + Output string + Format string +} + +type compactArchiveEntry struct { + id int64 + bucket string + measurement string + fieldsJSON string + tagsJSON string + committedAt int64 +} + +// Compact archives old journal entries as newline-delimited JSON. +// Usage example: `result := storeInstance.Compact(store.CompactOptions{Before: time.Now().Add(-30 * 24 * time.Hour), Output: "/tmp/archive", Format: "gzip"})` +func (storeInstance *Store) Compact(options CompactOptions) core.Result { + if err := ensureJournalSchema(storeInstance.database); err != nil { + return core.Result{Value: core.E("store.Compact", "ensure journal schema", err), OK: false} + } + + outputDirectory := options.Output + if outputDirectory == "" { + outputDirectory = defaultArchiveOutputDirectory + } + format := options.Format + if format == "" { + format = "gzip" + } + if format != "gzip" { + return core.Result{Value: core.E("store.Compact", core.Concat("unsupported archive format: ", format), nil), OK: false} + } + + filesystem := (&core.Fs{}).NewUnrestricted() + if result := filesystem.EnsureDir(outputDirectory); !result.OK { + return core.Result{Value: core.E("store.Compact", "ensure archive directory", result.Value.(error)), OK: false} + } + + rows, err := storeInstance.database.Query( + "SELECT entry_id, bucket_name, measurement, fields_json, tags_json, committed_at FROM "+journalEntriesTableName+" WHERE archived_at IS NULL AND committed_at < ? ORDER BY committed_at", + options.Before.UnixMilli(), + ) + if err != nil { + return core.Result{Value: core.E("store.Compact", "query journal rows", err), OK: false} + } + defer rows.Close() + + var archiveEntries []compactArchiveEntry + for rows.Next() { + var entry compactArchiveEntry + if err := rows.Scan( + &entry.id, + &entry.bucket, + &entry.measurement, + &entry.fieldsJSON, + &entry.tagsJSON, + &entry.committedAt, + ); err != nil { + return core.Result{Value: core.E("store.Compact", "scan journal row", err), OK: false} + } + archiveEntries = append(archiveEntries, entry) + } + if err := rows.Err(); err != nil { + return core.Result{Value: core.E("store.Compact", "iterate journal rows", err), OK: false} + } + if len(archiveEntries) == 0 { + return core.Result{Value: "", OK: true} + } + + outputPath := compactOutputPath(outputDirectory, format) + createResult := filesystem.Create(outputPath) + if !createResult.OK { + return core.Result{Value: core.E("store.Compact", "create archive file", createResult.Value.(error)), OK: false} + } + + file, ok := createResult.Value.(io.WriteCloser) + if !ok { + return core.Result{Value: core.E("store.Compact", "archive file is not writable", nil), OK: false} + } + fileClosed := false + defer func() { + if !fileClosed { + _ = file.Close() + } + }() + + writer := gzip.NewWriter(file) + writeOK := false + defer func() { + if !writeOK { + _ = writer.Close() + } + }() + + for _, entry := range archiveEntries { + lineMap, err := compactArchiveLine(entry) + if err != nil { + return core.Result{Value: err, OK: false} + } + lineJSON, err := jsonString(lineMap, "store.Compact", "marshal archive line") + if err != nil { + return core.Result{Value: err, OK: false} + } + if _, err := io.WriteString(writer, lineJSON+"\n"); err != nil { + return core.Result{Value: core.E("store.Compact", "write archive line", err), OK: false} + } + } + if err := writer.Close(); err != nil { + return core.Result{Value: core.E("store.Compact", "close archive writer", err), OK: false} + } + writeOK = true + if err := file.Close(); err != nil { + return core.Result{Value: core.E("store.Compact", "close archive file", err), OK: false} + } + fileClosed = true + + transaction, err := storeInstance.database.Begin() + if err != nil { + return core.Result{Value: core.E("store.Compact", "begin archive transaction", err), OK: false} + } + + committed := false + defer func() { + if !committed { + _ = transaction.Rollback() + } + }() + + archivedAt := time.Now().UnixMilli() + for _, entry := range archiveEntries { + if _, err := transaction.Exec( + "UPDATE "+journalEntriesTableName+" SET archived_at = ? WHERE entry_id = ?", + archivedAt, + entry.id, + ); err != nil { + return core.Result{Value: core.E("store.Compact", "mark journal row archived", err), OK: false} + } + } + if err := transaction.Commit(); err != nil { + return core.Result{Value: core.E("store.Compact", "commit archive transaction", err), OK: false} + } + committed = true + + return core.Result{Value: outputPath, OK: true} +} + +func compactArchiveLine(entry compactArchiveEntry) (map[string]any, error) { + fields := make(map[string]any) + fieldsResult := core.JSONUnmarshalString(entry.fieldsJSON, &fields) + if !fieldsResult.OK { + return nil, core.E("store.Compact", "unmarshal fields", fieldsResult.Value.(error)) + } + + tags := make(map[string]string) + tagsResult := core.JSONUnmarshalString(entry.tagsJSON, &tags) + if !tagsResult.OK { + return nil, core.E("store.Compact", "unmarshal tags", tagsResult.Value.(error)) + } + + return map[string]any{ + "bucket": entry.bucket, + "measurement": entry.measurement, + "fields": fields, + "tags": tags, + "committed_at": entry.committedAt, + }, nil +} + +func compactOutputPath(outputDirectory, format string) string { + extension := ".jsonl" + if format == "gzip" { + extension = ".jsonl.gz" + } + filename := core.Concat("journal-", time.Now().UTC().Format("20060102-150405"), extension) + return joinPath(outputDirectory, filename) +} diff --git a/compact_test.go b/compact_test.go new file mode 100644 index 0000000..0cb1889 --- /dev/null +++ b/compact_test.go @@ -0,0 +1,81 @@ +package store + +import ( + "bytes" + "compress/gzip" + "io" + "testing" + "time" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCompact_Compact_Good_GzipArchive(t *testing.T) { + outputDirectory := useArchiveOutputDirectory(t) + + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + require.True(t, + storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK, + ) + require.True(t, + storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK, + ) + + _, err = storeInstance.database.Exec( + "UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?", + time.Now().Add(-48*time.Hour).UnixMilli(), + "session-a", + ) + require.NoError(t, err) + + result := storeInstance.Compact(CompactOptions{ + Before: time.Now().Add(-24 * time.Hour), + Output: outputDirectory, + Format: "gzip", + }) + require.True(t, result.OK, "compact failed: %v", result.Value) + + archivePath, ok := result.Value.(string) + require.True(t, ok, "unexpected archive path type: %T", result.Value) + assert.True(t, testFilesystem().Exists(archivePath)) + + archiveData := requireCoreReadBytes(t, archivePath) + reader, err := gzip.NewReader(bytes.NewReader(archiveData)) + require.NoError(t, err) + defer reader.Close() + + decompressedData, err := io.ReadAll(reader) + require.NoError(t, err) + lines := core.Split(core.Trim(string(decompressedData)), "\n") + require.Len(t, lines, 1) + + archivedRow := make(map[string]any) + unmarshalResult := core.JSONUnmarshalString(lines[0], &archivedRow) + require.True(t, unmarshalResult.OK, "archive line unmarshal failed: %v", unmarshalResult.Value) + assert.Equal(t, "session-a", archivedRow["measurement"]) + + remainingRows := requireResultRows(t, storeInstance.QueryJournal("")) + require.Len(t, remainingRows, 1) + assert.Equal(t, "session-b", remainingRows[0]["measurement"]) +} + +func TestCompact_Compact_Good_NoRows(t *testing.T) { + outputDirectory := useArchiveOutputDirectory(t) + + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + result := storeInstance.Compact(CompactOptions{ + Before: time.Now(), + Output: outputDirectory, + Format: "gzip", + }) + require.True(t, result.OK, "compact failed: %v", result.Value) + assert.Equal(t, "", result.Value) +} diff --git a/events.go b/events.go index bbe89ab..a3ce353 100644 --- a/events.go +++ b/events.go @@ -77,13 +77,18 @@ const watcherEventBufferCapacity = 16 // Watch registers a buffered subscription for matching mutations. // Usage example: `watcher := storeInstance.Watch("*", "*")` -func (storeInstance *Store) Watch(group, key string) *Watcher { +// Usage example: `watcher := storeInstance.Watch("config")` +func (storeInstance *Store) Watch(group string, key ...string) *Watcher { + keyPattern := "*" + if len(key) > 0 && key[0] != "" { + keyPattern = key[0] + } eventChannel := make(chan Event, watcherEventBufferCapacity) watcher := &Watcher{ Events: eventChannel, eventsChannel: eventChannel, groupPattern: group, - keyPattern: key, + keyPattern: keyPattern, registrationID: atomic.AddUint64(&storeInstance.nextWatcherRegistrationID, 1), } @@ -115,7 +120,9 @@ func (storeInstance *Store) Unwatch(watcher *Watcher) { // OnChange registers a synchronous mutation callback. // Usage example: `events := make(chan store.Event, 1); unregister := storeInstance.OnChange(func(event store.Event) { events <- event }); defer unregister()` -func (storeInstance *Store) OnChange(callback func(Event)) func() { +// Usage example: `unregister := storeInstance.OnChange("config", func(key, value string) { fmt.Println(key, value) })` +func (storeInstance *Store) OnChange(arguments ...any) func() { + callback := onChangeCallback(arguments) if callback == nil { return func() {} } @@ -140,6 +147,39 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() { } } +func onChangeCallback(arguments []any) func(Event) { + switch len(arguments) { + case 0: + return nil + case 1: + if arguments[0] == nil { + return nil + } + callback, ok := arguments[0].(func(Event)) + if !ok { + return nil + } + return callback + case 2: + group, ok := arguments[0].(string) + if !ok { + return nil + } + callback, ok := arguments[1].(func(string, string)) + if !ok || callback == nil { + return nil + } + return func(event Event) { + if event.Group != group { + return + } + callback(event.Key, event.Value) + } + default: + return nil + } +} + // notify(Event{Type: EventSet, Group: "config", Key: "colour", Value: "blue"}) // dispatches matching watchers and callbacks after a successful write. If a // watcher buffer is full, the event is dropped instead of blocking the writer. diff --git a/events_test.go b/events_test.go index 2e04080..924d7a4 100644 --- a/events_test.go +++ b/events_test.go @@ -66,6 +66,22 @@ func TestEvents_Watch_Good_WildcardKey(t *testing.T) { assert.Equal(t, "colour", received[1].Key) } +func TestEvents_Watch_Good_DefaultWildcardKey(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + watcher := storeInstance.Watch("config") + defer storeInstance.Unwatch(watcher) + + require.NoError(t, storeInstance.Set("config", "theme", "dark")) + require.NoError(t, storeInstance.Set("config", "colour", "blue")) + + received := drainEvents(watcher.Events, 2, time.Second) + require.Len(t, received, 2) + assert.Equal(t, "theme", received[0].Key) + assert.Equal(t, "colour", received[1].Key) +} + func TestEvents_Watch_Good_GroupMismatch(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() @@ -300,6 +316,22 @@ func TestEvents_OnChange_Good_NilCallbackNoOp(t *testing.T) { unregister() } +func TestEvents_OnChange_Good_GroupCallback(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + var keys []string + unregister := storeInstance.OnChange("config", func(key, value string) { + keys = append(keys, key) + }) + defer unregister() + + require.NoError(t, storeInstance.Set("config", "theme", "dark")) + require.NoError(t, storeInstance.Set("other", "theme", "ignored")) + + assert.Equal(t, []string{"theme"}, keys) +} + // --------------------------------------------------------------------------- // OnChange — callback can manage subscriptions while handling an event // --------------------------------------------------------------------------- diff --git a/journal.go b/journal.go new file mode 100644 index 0000000..04b255d --- /dev/null +++ b/journal.go @@ -0,0 +1,291 @@ +package store + +import ( + "database/sql" + "regexp" + "strconv" + "time" + + core "dappco.re/go/core" +) + +const ( + journalEntriesTableName = "journal_entries" + defaultJournalBucket = "store" +) + +const createJournalEntriesTableSQL = `CREATE TABLE IF NOT EXISTS journal_entries ( + entry_id INTEGER PRIMARY KEY AUTOINCREMENT, + bucket_name TEXT NOT NULL, + measurement TEXT NOT NULL, + fields_json TEXT NOT NULL, + tags_json TEXT NOT NULL, + committed_at INTEGER NOT NULL, + archived_at INTEGER +)` + +var ( + journalBucketPattern = regexp.MustCompile(`bucket:\s*"([^"]+)"`) + journalRangePattern = regexp.MustCompile(`range\(\s*start:\s*([^)]+)\)`) + journalMeasurementPattern = regexp.MustCompile(`(?:_measurement|measurement)\s*==\s*"([^"]+)"`) +) + +type journalExecutor interface { + Exec(query string, args ...any) (sql.Result, error) +} + +// CommitToJournal records one completed unit of work in the store journal. +// Usage example: `result := storeInstance.CommitToJournal("scroll-session", map[string]any{"like": 4}, map[string]string{"workspace": "scroll-session"})` +func (storeInstance *Store) CommitToJournal(measurement string, fields map[string]any, tags map[string]string) core.Result { + if measurement == "" { + return core.Result{Value: core.E("store.CommitToJournal", "measurement is empty", nil), OK: false} + } + if fields == nil { + fields = map[string]any{} + } + if tags == nil { + tags = map[string]string{} + } + if err := ensureJournalSchema(storeInstance.database); err != nil { + return core.Result{Value: core.E("store.CommitToJournal", "ensure journal schema", err), OK: false} + } + + fieldsJSON, err := jsonString(fields, "store.CommitToJournal", "marshal fields") + if err != nil { + return core.Result{Value: err, OK: false} + } + tagsJSON, err := jsonString(tags, "store.CommitToJournal", "marshal tags") + if err != nil { + return core.Result{Value: err, OK: false} + } + + committedAt := time.Now().UnixMilli() + if err := insertJournalEntry( + storeInstance.database, + storeInstance.journalBucket(), + measurement, + fieldsJSON, + tagsJSON, + committedAt, + ); err != nil { + return core.Result{Value: core.E("store.CommitToJournal", "insert journal entry", err), OK: false} + } + + return core.Result{ + Value: map[string]any{ + "bucket": storeInstance.journalBucket(), + "measurement": measurement, + "fields": fields, + "tags": tags, + "committed_at": committedAt, + }, + OK: true, + } +} + +// QueryJournal reads journal rows either through a small Flux-like filter +// surface or a direct SQL SELECT against the internal journal table. +// Usage example: `result := storeInstance.QueryJournal(\`from(bucket: "store") |> range(start: -24h)\`)` +func (storeInstance *Store) QueryJournal(flux string) core.Result { + if err := ensureJournalSchema(storeInstance.database); err != nil { + return core.Result{Value: core.E("store.QueryJournal", "ensure journal schema", err), OK: false} + } + + trimmedQuery := core.Trim(flux) + if trimmedQuery == "" { + return storeInstance.queryJournalRows( + "SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM " + journalEntriesTableName + " WHERE archived_at IS NULL ORDER BY committed_at", + ) + } + if core.HasPrefix(trimmedQuery, "SELECT") || core.HasPrefix(trimmedQuery, "select") { + return storeInstance.queryJournalRows(trimmedQuery) + } + + selectSQL, arguments, err := storeInstance.queryJournalFlux(trimmedQuery) + if err != nil { + return core.Result{Value: err, OK: false} + } + return storeInstance.queryJournalRows(selectSQL, arguments...) +} + +func (storeInstance *Store) queryJournalRows(query string, arguments ...any) core.Result { + rows, err := storeInstance.database.Query(query, arguments...) + if err != nil { + return core.Result{Value: core.E("store.QueryJournal", "query rows", err), OK: false} + } + defer rows.Close() + + rowMaps, err := queryRowsAsMaps(rows) + if err != nil { + return core.Result{Value: core.E("store.QueryJournal", "scan rows", err), OK: false} + } + return core.Result{Value: inflateJournalRows(rowMaps), OK: true} +} + +func (storeInstance *Store) queryJournalFlux(flux string) (string, []any, error) { + builder := core.NewBuilder() + builder.WriteString("SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM ") + builder.WriteString(journalEntriesTableName) + builder.WriteString(" WHERE archived_at IS NULL") + + var arguments []any + if bucket := quotedSubmatch(journalBucketPattern, flux); bucket != "" { + builder.WriteString(" AND bucket_name = ?") + arguments = append(arguments, bucket) + } + if measurement := quotedSubmatch(journalMeasurementPattern, flux); measurement != "" { + builder.WriteString(" AND measurement = ?") + arguments = append(arguments, measurement) + } + + rangeMatch := quotedSubmatch(journalRangePattern, flux) + if rangeMatch == "" { + rangeMatch = regexpSubmatch(journalRangePattern, flux, 1) + } + if rangeMatch != "" { + startTime, err := fluxStartTime(core.Trim(rangeMatch)) + if err != nil { + return "", nil, core.E("store.QueryJournal", "parse range", err) + } + builder.WriteString(" AND committed_at >= ?") + arguments = append(arguments, startTime.UnixMilli()) + } + + builder.WriteString(" ORDER BY committed_at") + return builder.String(), arguments, nil +} + +func (storeInstance *Store) journalBucket() string { + if storeInstance.journal.bucket == "" { + return defaultJournalBucket + } + return storeInstance.journal.bucket +} + +func ensureJournalSchema(database schemaDatabase) error { + if _, err := database.Exec(createJournalEntriesTableSQL); err != nil { + return err + } + if _, err := database.Exec( + "CREATE INDEX IF NOT EXISTS journal_entries_bucket_committed_at_idx ON " + journalEntriesTableName + " (bucket_name, committed_at)", + ); err != nil { + return err + } + return nil +} + +func insertJournalEntry( + executor journalExecutor, + bucket, measurement, fieldsJSON, tagsJSON string, + committedAt int64, +) error { + _, err := executor.Exec( + "INSERT INTO "+journalEntriesTableName+" (bucket_name, measurement, fields_json, tags_json, committed_at, archived_at) VALUES (?, ?, ?, ?, ?, NULL)", + bucket, + measurement, + fieldsJSON, + tagsJSON, + committedAt, + ) + return err +} + +func jsonString(value any, operation, message string) (string, error) { + result := core.JSONMarshal(value) + if !result.OK { + return "", core.E(operation, message, result.Value.(error)) + } + return string(result.Value.([]byte)), nil +} + +func fluxStartTime(value string) (time.Time, error) { + if value == "" { + return time.Time{}, core.E("store.fluxStartTime", "range value is empty", nil) + } + if core.HasSuffix(value, "d") { + days, err := strconv.Atoi(core.TrimSuffix(value, "d")) + if err != nil { + return time.Time{}, err + } + return time.Now().Add(time.Duration(days) * 24 * time.Hour), nil + } + lookback, err := time.ParseDuration(value) + if err != nil { + return time.Time{}, err + } + return time.Now().Add(lookback), nil +} + +func quotedSubmatch(pattern *regexp.Regexp, value string) string { + match := pattern.FindStringSubmatch(value) + if len(match) < 2 { + return "" + } + return match[1] +} + +func regexpSubmatch(pattern *regexp.Regexp, value string, index int) string { + match := pattern.FindStringSubmatch(value) + if len(match) <= index { + return "" + } + return match[index] +} + +func queryRowsAsMaps(rows *sql.Rows) ([]map[string]any, error) { + columnNames, err := rows.Columns() + if err != nil { + return nil, err + } + + var result []map[string]any + for rows.Next() { + rawValues := make([]any, len(columnNames)) + scanTargets := make([]any, len(columnNames)) + for i := range rawValues { + scanTargets[i] = &rawValues[i] + } + if err := rows.Scan(scanTargets...); err != nil { + return nil, err + } + + row := make(map[string]any, len(columnNames)) + for i, columnName := range columnNames { + row[columnName] = normaliseRowValue(rawValues[i]) + } + result = append(result, row) + } + if err := rows.Err(); err != nil { + return nil, err + } + return result, nil +} + +func inflateJournalRows(rows []map[string]any) []map[string]any { + for _, row := range rows { + if fieldsJSON, ok := row["fields_json"].(string); ok { + fields := make(map[string]any) + result := core.JSONUnmarshalString(fieldsJSON, &fields) + if result.OK { + row["fields"] = fields + } + } + if tagsJSON, ok := row["tags_json"].(string); ok { + tags := make(map[string]string) + result := core.JSONUnmarshalString(tagsJSON, &tags) + if result.OK { + row["tags"] = tags + } + } + } + return rows +} + +func normaliseRowValue(value any) any { + switch typedValue := value.(type) { + case []byte: + return string(typedValue) + default: + return typedValue + } +} diff --git a/journal_test.go b/journal_test.go new file mode 100644 index 0000000..9801079 --- /dev/null +++ b/journal_test.go @@ -0,0 +1,69 @@ +package store + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestJournal_CommitToJournal_Good_WithQueryJournalSQL(t *testing.T) { + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + first := storeInstance.CommitToJournal("session-a", map[string]any{"like": 4}, map[string]string{"workspace": "session-a"}) + second := storeInstance.CommitToJournal("session-b", map[string]any{"profile_match": 2}, map[string]string{"workspace": "session-b"}) + require.True(t, first.OK, "first journal commit failed: %v", first.Value) + require.True(t, second.OK, "second journal commit failed: %v", second.Value) + + rows := requireResultRows( + t, + storeInstance.QueryJournal("SELECT bucket_name, measurement, fields_json, tags_json FROM journal_entries ORDER BY entry_id"), + ) + require.Len(t, rows, 2) + assert.Equal(t, "events", rows[0]["bucket_name"]) + assert.Equal(t, "session-a", rows[0]["measurement"]) + + fields, ok := rows[0]["fields"].(map[string]any) + require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"]) + assert.Equal(t, float64(4), fields["like"]) + + tags, ok := rows[1]["tags"].(map[string]string) + require.True(t, ok, "unexpected tags type: %T", rows[1]["tags"]) + assert.Equal(t, "session-b", tags["workspace"]) +} + +func TestJournal_QueryJournal_Good_FluxFilters(t *testing.T) { + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + require.True(t, + storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK, + ) + require.True(t, + storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK, + ) + + rows := requireResultRows( + t, + storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r._measurement == "session-b")`), + ) + require.Len(t, rows, 1) + assert.Equal(t, "session-b", rows[0]["measurement"]) + + fields, ok := rows[0]["fields"].(map[string]any) + require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"]) + assert.Equal(t, float64(2), fields["like"]) +} + +func TestJournal_CommitToJournal_Bad_EmptyMeasurement(t *testing.T) { + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + result := storeInstance.CommitToJournal("", map[string]any{"like": 1}, map[string]string{"workspace": "missing"}) + require.False(t, result.OK) + assert.Contains(t, result.Value.(error).Error(), "measurement is empty") +} diff --git a/scope.go b/scope.go index ef8c05f..c2a275f 100644 --- a/scope.go +++ b/scope.go @@ -11,6 +11,8 @@ import ( // validNamespace.MatchString("tenant-a") is true; validNamespace.MatchString("tenant_a") is false. var validNamespace = regexp.MustCompile(`^[a-zA-Z0-9-]+$`) +const defaultScopedGroupName = "default" + // QuotaConfig sets per-namespace key and group limits. // Usage example: `quota := store.QuotaConfig{MaxKeys: 100, MaxGroups: 10}` type QuotaConfig struct { @@ -25,7 +27,8 @@ type QuotaConfig struct { type ScopedStore struct { storeInstance *Store namespace string - quota QuotaConfig + MaxKeys int + MaxGroups int } // NewScoped validates a namespace and prefixes groups with namespace + ":". @@ -55,7 +58,8 @@ func NewScopedWithQuota(storeInstance *Store, namespace string, quota QuotaConfi nil, ) } - scopedStore.quota = quota + scopedStore.MaxKeys = quota.MaxKeys + scopedStore.MaxGroups = quota.MaxGroups return scopedStore, nil } @@ -67,6 +71,10 @@ func (scopedStore *ScopedStore) namespacePrefix() string { return scopedStore.namespace + ":" } +func (scopedStore *ScopedStore) defaultGroup() string { + return defaultScopedGroupName +} + func (scopedStore *ScopedStore) trimNamespacePrefix(groupName string) string { return core.TrimPrefix(groupName, scopedStore.namespacePrefix()) } @@ -77,19 +85,41 @@ func (scopedStore *ScopedStore) Namespace() string { return scopedStore.namespace } +// Usage example: `colourValue, err := scopedStore.Get("colour")` // Usage example: `colourValue, err := scopedStore.Get("config", "colour")` -func (scopedStore *ScopedStore) Get(group, key string) (string, error) { +func (scopedStore *ScopedStore) Get(arguments ...string) (string, error) { + group, key, err := scopedStore.getArguments(arguments) + if err != nil { + return "", err + } return scopedStore.storeInstance.Get(scopedStore.namespacedGroup(group), key) } +// GetFrom reads a key from an explicit namespaced group. +// Usage example: `colourValue, err := scopedStore.GetFrom("config", "colour")` +func (scopedStore *ScopedStore) GetFrom(group, key string) (string, error) { + return scopedStore.Get(group, key) +} + +// Usage example: `if err := scopedStore.Set("colour", "blue"); err != nil { return }` // Usage example: `if err := scopedStore.Set("config", "colour", "blue"); err != nil { return }` -func (scopedStore *ScopedStore) Set(group, key, value string) error { +func (scopedStore *ScopedStore) Set(arguments ...string) error { + group, key, value, err := scopedStore.setArguments(arguments) + if err != nil { + return err + } if err := scopedStore.checkQuota("store.ScopedStore.Set", group, key); err != nil { return err } return scopedStore.storeInstance.Set(scopedStore.namespacedGroup(group), key, value) } +// SetIn writes a key to an explicit namespaced group. +// Usage example: `if err := scopedStore.SetIn("config", "colour", "blue"); err != nil { return }` +func (scopedStore *ScopedStore) SetIn(group, key, value string) error { + return scopedStore.Set(group, key, value) +} + // Usage example: `if err := scopedStore.SetWithTTL("sessions", "token", "abc123", time.Hour); err != nil { return }` func (scopedStore *ScopedStore) SetWithTTL(group, key, value string, timeToLive time.Duration) error { if err := scopedStore.checkQuota("store.ScopedStore.SetWithTTL", group, key); err != nil { @@ -118,19 +148,26 @@ func (scopedStore *ScopedStore) All(group string) iter.Seq2[KeyValue, error] { return scopedStore.storeInstance.All(scopedStore.namespacedGroup(group)) } +// Usage example: `for entry, err := range scopedStore.AllSeq("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }` +func (scopedStore *ScopedStore) AllSeq(group string) iter.Seq2[KeyValue, error] { + return scopedStore.All(group) +} + // Usage example: `keyCount, err := scopedStore.Count("config")` func (scopedStore *ScopedStore) Count(group string) (int, error) { return scopedStore.storeInstance.Count(scopedStore.namespacedGroup(group)) } // Usage example: `keyCount, err := scopedStore.CountAll("config")` -func (scopedStore *ScopedStore) CountAll(groupPrefix string) (int, error) { - return scopedStore.storeInstance.CountAll(scopedStore.namespacedGroup(groupPrefix)) +// Usage example: `keyCount, err := scopedStore.CountAll()` +func (scopedStore *ScopedStore) CountAll(groupPrefix ...string) (int, error) { + return scopedStore.storeInstance.CountAll(scopedStore.namespacedGroup(firstString(groupPrefix))) } // Usage example: `groupNames, err := scopedStore.Groups("config")` -func (scopedStore *ScopedStore) Groups(groupPrefix string) ([]string, error) { - groupNames, err := scopedStore.storeInstance.Groups(scopedStore.namespacedGroup(groupPrefix)) +// Usage example: `groupNames, err := scopedStore.Groups()` +func (scopedStore *ScopedStore) Groups(groupPrefix ...string) ([]string, error) { + groupNames, err := scopedStore.storeInstance.Groups(scopedStore.namespacedGroup(firstString(groupPrefix))) if err != nil { return nil, err } @@ -141,10 +178,11 @@ func (scopedStore *ScopedStore) Groups(groupPrefix string) ([]string, error) { } // Usage example: `for groupName, err := range scopedStore.GroupsSeq("config") { if err != nil { break }; fmt.Println(groupName) }` -func (scopedStore *ScopedStore) GroupsSeq(groupPrefix string) iter.Seq2[string, error] { +// Usage example: `for groupName, err := range scopedStore.GroupsSeq() { if err != nil { break }; fmt.Println(groupName) }` +func (scopedStore *ScopedStore) GroupsSeq(groupPrefix ...string) iter.Seq2[string, error] { return func(yield func(string, error) bool) { namespacePrefix := scopedStore.namespacePrefix() - for groupName, err := range scopedStore.storeInstance.GroupsSeq(scopedStore.namespacedGroup(groupPrefix)) { + for groupName, err := range scopedStore.storeInstance.GroupsSeq(scopedStore.namespacedGroup(firstString(groupPrefix))) { if err != nil { if !yield("", err) { return @@ -187,7 +225,7 @@ func (scopedStore *ScopedStore) PurgeExpired() (int64, error) { // group would exceed the configured limit. Existing keys are treated as // upserts and do not consume quota. func (scopedStore *ScopedStore) checkQuota(operation, group, key string) error { - if scopedStore.quota.MaxKeys == 0 && scopedStore.quota.MaxGroups == 0 { + if scopedStore.MaxKeys == 0 && scopedStore.MaxGroups == 0 { return nil } @@ -206,18 +244,18 @@ func (scopedStore *ScopedStore) checkQuota(operation, group, key string) error { } // Check MaxKeys quota. - if scopedStore.quota.MaxKeys > 0 { + if scopedStore.MaxKeys > 0 { keyCount, err := scopedStore.storeInstance.CountAll(namespacePrefix) if err != nil { return core.E(operation, "quota check", err) } - if keyCount >= scopedStore.quota.MaxKeys { - return core.E(operation, core.Sprintf("key limit (%d)", scopedStore.quota.MaxKeys), QuotaExceededError) + if keyCount >= scopedStore.MaxKeys { + return core.E(operation, core.Sprintf("key limit (%d)", scopedStore.MaxKeys), QuotaExceededError) } } // Check MaxGroups quota — only if this would create a new group. - if scopedStore.quota.MaxGroups > 0 { + if scopedStore.MaxGroups > 0 { existingGroupCount, err := scopedStore.storeInstance.Count(namespacedGroup) if err != nil { return core.E(operation, "quota check", err) @@ -231,11 +269,41 @@ func (scopedStore *ScopedStore) checkQuota(operation, group, key string) error { } knownGroupCount++ } - if knownGroupCount >= scopedStore.quota.MaxGroups { - return core.E(operation, core.Sprintf("group limit (%d)", scopedStore.quota.MaxGroups), QuotaExceededError) + if knownGroupCount >= scopedStore.MaxGroups { + return core.E(operation, core.Sprintf("group limit (%d)", scopedStore.MaxGroups), QuotaExceededError) } } } return nil } + +func (scopedStore *ScopedStore) getArguments(arguments []string) (string, string, error) { + switch len(arguments) { + case 1: + return scopedStore.defaultGroup(), arguments[0], nil + case 2: + return arguments[0], arguments[1], nil + default: + return "", "", core.E( + "store.ScopedStore.Get", + core.Sprintf("expected 1 or 2 arguments; got %d", len(arguments)), + nil, + ) + } +} + +func (scopedStore *ScopedStore) setArguments(arguments []string) (string, string, string, error) { + switch len(arguments) { + case 2: + return scopedStore.defaultGroup(), arguments[0], arguments[1], nil + case 3: + return arguments[0], arguments[1], arguments[2], nil + default: + return "", "", "", core.E( + "store.ScopedStore.Set", + core.Sprintf("expected 2 or 3 arguments; got %d", len(arguments)), + nil, + ) + } +} diff --git a/scope_test.go b/scope_test.go index ee76260..e7fda9b 100644 --- a/scope_test.go +++ b/scope_test.go @@ -94,6 +94,17 @@ func TestScope_NewScopedWithQuota_Bad_NegativeMaxGroups(t *testing.T) { assert.Contains(t, err.Error(), "zero or positive") } +func TestScope_NewScopedWithQuota_Good_InlineQuotaFields(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + scopedStore, err := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: 4, MaxGroups: 2}) + require.NoError(t, err) + + assert.Equal(t, 4, scopedStore.MaxKeys) + assert.Equal(t, 2, scopedStore.MaxGroups) +} + // --------------------------------------------------------------------------- // ScopedStore — basic CRUD // --------------------------------------------------------------------------- @@ -110,6 +121,34 @@ func TestScope_ScopedStore_Good_SetGet(t *testing.T) { assert.Equal(t, "dark", value) } +func TestScope_ScopedStore_Good_DefaultGroupHelpers(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + scopedStore, _ := NewScoped(storeInstance, "tenant-a") + require.NoError(t, scopedStore.Set("theme", "dark")) + + value, err := scopedStore.Get("theme") + require.NoError(t, err) + assert.Equal(t, "dark", value) + + rawValue, err := storeInstance.Get("tenant-a:default", "theme") + require.NoError(t, err) + assert.Equal(t, "dark", rawValue) +} + +func TestScope_ScopedStore_Good_SetInGetFrom(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + scopedStore, _ := NewScoped(storeInstance, "tenant-a") + require.NoError(t, scopedStore.SetIn("config", "theme", "dark")) + + value, err := scopedStore.GetFrom("config", "theme") + require.NoError(t, err) + assert.Equal(t, "dark", value) +} + func TestScope_ScopedStore_Good_PrefixedInUnderlyingStore(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() diff --git a/store.go b/store.go index 49db3c4..7eb3ab5 100644 --- a/store.go +++ b/store.go @@ -29,14 +29,25 @@ const ( entryValueColumn = "entry_value" ) +// StoreOption customises Store construction. +// Usage example: `storeInstance, err := store.New("/tmp/go-store.db", store.WithJournal("http://127.0.0.1:8086", "core", "events"))` +type StoreOption func(*Store) + +type journalConfig struct { + url string + org string + bucket string +} + // Store provides SQLite-backed grouped entries with TTL expiry, namespace -// isolation, and reactive change notifications. +// isolation, reactive change notifications, and optional journal support. // Usage example: `storeInstance, err := store.New(":memory:"); if err != nil { return }; if err := storeInstance.Set("config", "colour", "blue"); err != nil { return }` type Store struct { database *sql.DB cancelPurge context.CancelFunc purgeWaitGroup sync.WaitGroup purgeInterval time.Duration // interval between background purge cycles + journal journalConfig // Event dispatch state. watchers []*Watcher @@ -47,8 +58,18 @@ type Store struct { nextCallbackRegistrationID uint64 // monotonic ID for callback registrations } -// Usage example: `storeInstance, err := store.New(":memory:"); if err != nil { return }` -func New(databasePath string) (*Store, error) { +// WithJournal records journal connection metadata for workspace commits, +// journal queries, and archive generation. +// Usage example: `storeInstance, err := store.New("/tmp/go-store.db", store.WithJournal("http://127.0.0.1:8086", "core", "events"))` +func WithJournal(url, org, bucket string) StoreOption { + return func(storeInstance *Store) { + storeInstance.journal = journalConfig{url: url, org: org, bucket: bucket} + } +} + +// Usage example: `storeInstance, err := store.New(":memory:")` +// Usage example: `storeInstance, err := store.New("/tmp/go-store.db", store.WithJournal("http://127.0.0.1:8086", "core", "events"))` +func New(databasePath string, options ...StoreOption) (*Store, error) { sqliteDatabase, err := sql.Open("sqlite", databasePath) if err != nil { return nil, core.E("store.New", "open database", err) @@ -73,6 +94,11 @@ func New(databasePath string) (*Store, error) { purgeContext, cancel := context.WithCancel(context.Background()) storeInstance := &Store{database: sqliteDatabase, cancelPurge: cancel, purgeInterval: 60 * time.Second} + for _, option := range options { + if option != nil { + option(storeInstance) + } + } storeInstance.startBackgroundPurge(purgeContext) return storeInstance, nil } @@ -236,6 +262,11 @@ func (storeInstance *Store) All(group string) iter.Seq2[KeyValue, error] { } } +// Usage example: `for entry, err := range storeInstance.AllSeq("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }` +func (storeInstance *Store) AllSeq(group string) iter.Seq2[KeyValue, error] { + return storeInstance.All(group) +} + // Usage example: `parts, err := storeInstance.GetSplit("config", "hosts", ","); if err != nil { return }; for part := range parts { fmt.Println(part) }` func (storeInstance *Store) GetSplit(group, key, separator string) (iter.Seq[string], error) { value, err := storeInstance.Get(group, key) @@ -297,9 +328,10 @@ func (storeInstance *Store) CountAll(groupPrefix string) (int, error) { } // Usage example: `tenantGroupNames, err := storeInstance.Groups("tenant-a:")` -func (storeInstance *Store) Groups(groupPrefix string) ([]string, error) { +// Usage example: `allGroupNames, err := storeInstance.Groups()` +func (storeInstance *Store) Groups(groupPrefix ...string) ([]string, error) { var groupNames []string - for groupName, err := range storeInstance.GroupsSeq(groupPrefix) { + for groupName, err := range storeInstance.GroupsSeq(groupPrefix...) { if err != nil { return nil, err } @@ -309,12 +341,14 @@ func (storeInstance *Store) Groups(groupPrefix string) ([]string, error) { } // Usage example: `for tenantGroupName, err := range storeInstance.GroupsSeq("tenant-a:") { if err != nil { break }; fmt.Println(tenantGroupName) }` -func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, error] { +// Usage example: `for groupName, err := range storeInstance.GroupsSeq() { if err != nil { break }; fmt.Println(groupName) }` +func (storeInstance *Store) GroupsSeq(groupPrefix ...string) iter.Seq2[string, error] { + actualGroupPrefix := firstString(groupPrefix) return func(yield func(string, error) bool) { var rows *sql.Rows var err error now := time.Now().UnixMilli() - if groupPrefix == "" { + if actualGroupPrefix == "" { rows, err = storeInstance.database.Query( "SELECT DISTINCT "+entryGroupColumn+" FROM "+entriesTableName+" WHERE (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryGroupColumn, now, @@ -322,7 +356,7 @@ func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, erro } else { rows, err = storeInstance.database.Query( "SELECT DISTINCT "+entryGroupColumn+" FROM "+entriesTableName+" WHERE "+entryGroupColumn+" LIKE ? ESCAPE '^' AND (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryGroupColumn, - escapeLike(groupPrefix)+"%", now, + escapeLike(actualGroupPrefix)+"%", now, ) } if err != nil { @@ -349,6 +383,13 @@ func (storeInstance *Store) GroupsSeq(groupPrefix string) iter.Seq2[string, erro } } +func firstString(values []string) string { + if len(values) == 0 { + return "" + } + return values[0] +} + // escapeLike("tenant_%") returns "tenant^_^%" so LIKE queries treat wildcards // literally. func escapeLike(text string) string { diff --git a/store_test.go b/store_test.go index 0089aac..6dc2e44 100644 --- a/store_test.go +++ b/store_test.go @@ -98,6 +98,16 @@ func TestStore_New_Good_WALMode(t *testing.T) { assert.Equal(t, "wal", mode, "journal_mode should be WAL") } +func TestStore_New_Good_WithJournalOption(t *testing.T) { + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + assert.Equal(t, "events", storeInstance.journal.bucket) + assert.Equal(t, "core", storeInstance.journal.org) + assert.Equal(t, "http://127.0.0.1:8086", storeInstance.journal.url) +} + // --------------------------------------------------------------------------- // Set / Get — core CRUD // --------------------------------------------------------------------------- @@ -372,6 +382,22 @@ func TestStore_All_Good_SortedByKey(t *testing.T) { assert.Equal(t, []string{"alpha", "bravo", "charlie"}, keys) } +func TestStore_AllSeq_Good_Alias(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + require.NoError(t, storeInstance.Set("g", "alpha", "1")) + require.NoError(t, storeInstance.Set("g", "bravo", "2")) + + var keys []string + for entry, err := range storeInstance.AllSeq("g") { + require.NoError(t, err) + keys = append(keys, entry.Key) + } + + assert.Equal(t, []string{"alpha", "bravo"}, keys) +} + func TestStore_All_Bad_ClosedStore(t *testing.T) { storeInstance, _ := New(":memory:") storeInstance.Close() @@ -434,6 +460,22 @@ func TestStore_GroupsSeq_Good_SortedByGroupName(t *testing.T) { assert.Equal(t, []string{"alpha", "bravo", "charlie"}, groups) } +func TestStore_GroupsSeq_Good_DefaultArgument(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + require.NoError(t, storeInstance.Set("alpha", "a", "1")) + require.NoError(t, storeInstance.Set("beta", "b", "2")) + + var groups []string + for group, err := range storeInstance.GroupsSeq() { + require.NoError(t, err) + groups = append(groups, group) + } + + assert.Equal(t, []string{"alpha", "beta"}, groups) +} + func TestStore_GroupsSeq_Bad_ClosedStore(t *testing.T) { storeInstance, _ := New(":memory:") storeInstance.Close() diff --git a/test_helpers_test.go b/test_helpers_test.go index a7a5c12..8d4a052 100644 --- a/test_helpers_test.go +++ b/test_helpers_test.go @@ -43,3 +43,38 @@ func repeatString(value string, count int) string { } return builder.String() } + +func useWorkspaceStateDirectory(tb testing.TB) string { + tb.Helper() + + previous := defaultWorkspaceStateDirectory + stateDirectory := testPath(tb, "state") + defaultWorkspaceStateDirectory = stateDirectory + tb.Cleanup(func() { + defaultWorkspaceStateDirectory = previous + _ = testFilesystem().DeleteAll(stateDirectory) + }) + return stateDirectory +} + +func useArchiveOutputDirectory(tb testing.TB) string { + tb.Helper() + + previous := defaultArchiveOutputDirectory + outputDirectory := testPath(tb, "archive") + defaultArchiveOutputDirectory = outputDirectory + tb.Cleanup(func() { + defaultArchiveOutputDirectory = previous + _ = testFilesystem().DeleteAll(outputDirectory) + }) + return outputDirectory +} + +func requireResultRows(tb testing.TB, result core.Result) []map[string]any { + tb.Helper() + + require.True(tb, result.OK, "core result failed: %v", result.Value) + rows, ok := result.Value.([]map[string]any) + require.True(tb, ok, "unexpected row type: %T", result.Value) + return rows +} diff --git a/workspace.go b/workspace.go new file mode 100644 index 0000000..2f8753d --- /dev/null +++ b/workspace.go @@ -0,0 +1,326 @@ +package store + +import ( + "database/sql" + "io/fs" + "sync" + "time" + + core "dappco.re/go/core" +) + +const ( + workspaceEntriesTableName = "workspace_entries" + workspaceIdentityGroupName = "workspace" +) + +const createWorkspaceEntriesTableSQL = `CREATE TABLE IF NOT EXISTS workspace_entries ( + entry_id INTEGER PRIMARY KEY AUTOINCREMENT, + entry_kind TEXT NOT NULL, + entry_data TEXT NOT NULL, + created_at INTEGER NOT NULL +)` + +var defaultWorkspaceStateDirectory = ".core/state" + +// Workspace accumulates mutable work-in-progress entries before they are +// committed to the durable store journal. +// Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session-2026-03-30"); if err != nil { return }; defer workspace.Discard()` +type Workspace struct { + name string + storeInstance *Store + database *sql.DB + databasePath string + filesystem *core.Fs + + closeLock sync.Mutex + closed bool +} + +// NewWorkspace creates a workspace state file under `.core/state/`. +// Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session-2026-03-30")` +func (storeInstance *Store) NewWorkspace(name string) (*Workspace, error) { + validation := core.ValidateName(name) + if !validation.OK { + return nil, core.E("store.NewWorkspace", "validate workspace name", validation.Value.(error)) + } + + filesystem := (&core.Fs{}).NewUnrestricted() + databasePath := workspaceFilePath(defaultWorkspaceStateDirectory, name) + if filesystem.Exists(databasePath) { + return nil, core.E("store.NewWorkspace", core.Concat("workspace already exists: ", name), nil) + } + if result := filesystem.EnsureDir(defaultWorkspaceStateDirectory); !result.OK { + return nil, core.E("store.NewWorkspace", "ensure state directory", result.Value.(error)) + } + + workspaceDatabase, err := openWorkspaceDatabase(databasePath) + if err != nil { + return nil, core.E("store.NewWorkspace", "open workspace database", err) + } + + return &Workspace{ + name: name, + storeInstance: storeInstance, + database: workspaceDatabase, + databasePath: databasePath, + filesystem: filesystem, + }, nil +} + +// RecoverOrphans opens any leftover workspace files so callers can inspect and +// decide whether to commit or discard them. +// Usage example: `orphans := storeInstance.RecoverOrphans(".core/state")` +func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace { + if stateDirectory == "" { + stateDirectory = defaultWorkspaceStateDirectory + } + + filesystem := (&core.Fs{}).NewUnrestricted() + if !filesystem.Exists(stateDirectory) { + return nil + } + + listResult := filesystem.List(stateDirectory) + if !listResult.OK { + return nil + } + + entries, ok := listResult.Value.([]fs.DirEntry) + if !ok { + return nil + } + + var workspaces []*Workspace + for _, dirEntry := range entries { + if dirEntry.IsDir() || !core.HasSuffix(dirEntry.Name(), ".duckdb") { + continue + } + name := core.TrimSuffix(dirEntry.Name(), ".duckdb") + databasePath := workspaceFilePath(stateDirectory, name) + workspaceDatabase, err := openWorkspaceDatabase(databasePath) + if err != nil { + continue + } + workspaces = append(workspaces, &Workspace{ + name: name, + storeInstance: storeInstance, + database: workspaceDatabase, + databasePath: databasePath, + filesystem: filesystem, + }) + } + return workspaces +} + +// Put appends one entry to the workspace buffer. +// Usage example: `err := workspace.Put("like", map[string]any{"user": "@alice", "post": "video_123"})` +func (workspace *Workspace) Put(kind string, data map[string]any) error { + if kind == "" { + return core.E("store.Workspace.Put", "kind is empty", nil) + } + if data == nil { + data = map[string]any{} + } + + dataJSON, err := jsonString(data, "store.Workspace.Put", "marshal entry data") + if err != nil { + return err + } + + _, err = workspace.database.Exec( + "INSERT INTO "+workspaceEntriesTableName+" (entry_kind, entry_data, created_at) VALUES (?, ?, ?)", + kind, + dataJSON, + time.Now().UnixMilli(), + ) + if err != nil { + return core.E("store.Workspace.Put", "insert entry", err) + } + return nil +} + +// Aggregate returns the current per-kind entry counts in the workspace. +// Usage example: `summary := workspace.Aggregate()` +func (workspace *Workspace) Aggregate() map[string]any { + fields, err := workspace.aggregateFields() + if err != nil { + return map[string]any{} + } + return fields +} + +// Commit writes the aggregated workspace state to the journal and updates the +// store summary entry for the workspace. +// Usage example: `result := workspace.Commit()` +func (workspace *Workspace) Commit() core.Result { + fields, err := workspace.aggregateFields() + if err != nil { + return core.Result{Value: core.E("store.Workspace.Commit", "aggregate workspace", err), OK: false} + } + if err := workspace.storeInstance.commitWorkspaceAggregate(workspace.name, fields); err != nil { + return core.Result{Value: err, OK: false} + } + if err := workspace.closeAndDelete(); err != nil { + return core.Result{Value: err, OK: false} + } + return core.Result{Value: fields, OK: true} +} + +// Discard closes the workspace and removes its backing file. +// Usage example: `workspace.Discard()` +func (workspace *Workspace) Discard() { + _ = workspace.closeAndDelete() +} + +// Query runs ad-hoc SQL against the workspace buffer. +// Usage example: `result := workspace.Query("SELECT entry_kind, COUNT(*) AS count FROM workspace_entries GROUP BY entry_kind")` +func (workspace *Workspace) Query(sqlQuery string) core.Result { + rows, err := workspace.database.Query(sqlQuery) + if err != nil { + return core.Result{Value: core.E("store.Workspace.Query", "query workspace", err), OK: false} + } + defer rows.Close() + + rowMaps, err := queryRowsAsMaps(rows) + if err != nil { + return core.Result{Value: core.E("store.Workspace.Query", "scan rows", err), OK: false} + } + return core.Result{Value: rowMaps, OK: true} +} + +func (workspace *Workspace) aggregateFields() (map[string]any, error) { + rows, err := workspace.database.Query( + "SELECT entry_kind, COUNT(*) FROM " + workspaceEntriesTableName + " GROUP BY entry_kind ORDER BY entry_kind", + ) + if err != nil { + return nil, err + } + defer rows.Close() + + fields := make(map[string]any) + for rows.Next() { + var ( + kind string + count int + ) + if err := rows.Scan(&kind, &count); err != nil { + return nil, err + } + fields[kind] = count + } + if err := rows.Err(); err != nil { + return nil, err + } + return fields, nil +} + +func (workspace *Workspace) closeAndDelete() error { + workspace.closeLock.Lock() + defer workspace.closeLock.Unlock() + + if workspace.closed { + return nil + } + workspace.closed = true + + if err := workspace.database.Close(); err != nil { + return core.E("store.Workspace.closeAndDelete", "close workspace database", err) + } + for _, path := range []string{workspace.databasePath, workspace.databasePath + "-wal", workspace.databasePath + "-shm"} { + if result := workspace.filesystem.Delete(path); !result.OK && workspace.filesystem.Exists(path) { + return core.E("store.Workspace.closeAndDelete", "delete workspace file", result.Value.(error)) + } + } + return nil +} + +func (storeInstance *Store) commitWorkspaceAggregate(workspaceName string, fields map[string]any) error { + if err := ensureJournalSchema(storeInstance.database); err != nil { + return core.E("store.Workspace.Commit", "ensure journal schema", err) + } + + transaction, err := storeInstance.database.Begin() + if err != nil { + return core.E("store.Workspace.Commit", "begin transaction", err) + } + + committed := false + defer func() { + if !committed { + _ = transaction.Rollback() + } + }() + + fieldsJSON, err := jsonString(fields, "store.Workspace.Commit", "marshal summary") + if err != nil { + return err + } + tagsJSON, err := jsonString(map[string]string{"workspace": workspaceName}, "store.Workspace.Commit", "marshal tags") + if err != nil { + return err + } + + if err := insertJournalEntry( + transaction, + storeInstance.journalBucket(), + workspaceName, + fieldsJSON, + tagsJSON, + time.Now().UnixMilli(), + ); err != nil { + return core.E("store.Workspace.Commit", "insert journal entry", err) + } + + if _, err := transaction.Exec( + "INSERT INTO "+entriesTableName+" ("+entryGroupColumn+", "+entryKeyColumn+", "+entryValueColumn+", expires_at) VALUES (?, ?, ?, NULL) "+ + "ON CONFLICT("+entryGroupColumn+", "+entryKeyColumn+") DO UPDATE SET "+entryValueColumn+" = excluded."+entryValueColumn+", expires_at = NULL", + workspaceSummaryGroup(workspaceName), + "summary", + fieldsJSON, + ); err != nil { + return core.E("store.Workspace.Commit", "upsert workspace summary", err) + } + + if err := transaction.Commit(); err != nil { + return core.E("store.Workspace.Commit", "commit transaction", err) + } + committed = true + return nil +} + +func openWorkspaceDatabase(databasePath string) (*sql.DB, error) { + workspaceDatabase, err := sql.Open("sqlite", databasePath) + if err != nil { + return nil, err + } + workspaceDatabase.SetMaxOpenConns(1) + if _, err := workspaceDatabase.Exec("PRAGMA journal_mode=WAL"); err != nil { + workspaceDatabase.Close() + return nil, err + } + if _, err := workspaceDatabase.Exec("PRAGMA busy_timeout=5000"); err != nil { + workspaceDatabase.Close() + return nil, err + } + if _, err := workspaceDatabase.Exec(createWorkspaceEntriesTableSQL); err != nil { + workspaceDatabase.Close() + return nil, err + } + return workspaceDatabase, nil +} + +func workspaceSummaryGroup(workspaceName string) string { + return core.Concat(workspaceIdentityGroupName, ":", workspaceName) +} + +func workspaceFilePath(stateDirectory, name string) string { + return joinPath(stateDirectory, core.Concat(name, ".duckdb")) +} + +func joinPath(base, name string) string { + if base == "" { + return name + } + return core.Concat(core.TrimSuffix(base, "/"), "/", name) +} diff --git a/workspace_test.go b/workspace_test.go new file mode 100644 index 0000000..214afe8 --- /dev/null +++ b/workspace_test.go @@ -0,0 +1,121 @@ +package store + +import ( + "testing" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWorkspace_NewWorkspace_Good_CreatePutAggregateQuery(t *testing.T) { + stateDirectory := useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + workspace, err := storeInstance.NewWorkspace("scroll-session") + require.NoError(t, err) + defer workspace.Discard() + + assert.Equal(t, workspaceFilePath(stateDirectory, "scroll-session"), workspace.databasePath) + assert.True(t, testFilesystem().Exists(workspace.databasePath)) + + require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"})) + require.NoError(t, workspace.Put("like", map[string]any{"user": "@bob"})) + require.NoError(t, workspace.Put("profile_match", map[string]any{"user": "@charlie"})) + + assert.Equal(t, map[string]any{"like": 2, "profile_match": 1}, workspace.Aggregate()) + + rows := requireResultRows( + t, + workspace.Query("SELECT entry_kind, COUNT(*) AS entry_count FROM workspace_entries GROUP BY entry_kind ORDER BY entry_kind"), + ) + require.Len(t, rows, 2) + assert.Equal(t, "like", rows[0]["entry_kind"]) + assert.Equal(t, int64(2), rows[0]["entry_count"]) + assert.Equal(t, "profile_match", rows[1]["entry_kind"]) + assert.Equal(t, int64(1), rows[1]["entry_count"]) +} + +func TestWorkspace_Commit_Good_JournalAndSummary(t *testing.T) { + useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + workspace, err := storeInstance.NewWorkspace("scroll-session") + require.NoError(t, err) + + require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"})) + require.NoError(t, workspace.Put("like", map[string]any{"user": "@bob"})) + require.NoError(t, workspace.Put("profile_match", map[string]any{"user": "@charlie"})) + + result := workspace.Commit() + require.True(t, result.OK, "workspace commit failed: %v", result.Value) + assert.Equal(t, map[string]any{"like": 2, "profile_match": 1}, result.Value) + assert.False(t, testFilesystem().Exists(workspace.databasePath)) + + summaryJSON, err := storeInstance.Get(workspaceSummaryGroup("scroll-session"), "summary") + require.NoError(t, err) + + summary := make(map[string]any) + summaryResult := core.JSONUnmarshalString(summaryJSON, &summary) + require.True(t, summaryResult.OK, "summary unmarshal failed: %v", summaryResult.Value) + assert.Equal(t, float64(2), summary["like"]) + assert.Equal(t, float64(1), summary["profile_match"]) + + rows := requireResultRows( + t, + storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r._measurement == "scroll-session")`), + ) + require.Len(t, rows, 1) + assert.Equal(t, "scroll-session", rows[0]["measurement"]) + + fields, ok := rows[0]["fields"].(map[string]any) + require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"]) + assert.Equal(t, float64(2), fields["like"]) + assert.Equal(t, float64(1), fields["profile_match"]) + + tags, ok := rows[0]["tags"].(map[string]string) + require.True(t, ok, "unexpected tags type: %T", rows[0]["tags"]) + assert.Equal(t, "scroll-session", tags["workspace"]) +} + +func TestWorkspace_Discard_Good_Idempotent(t *testing.T) { + useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + workspace, err := storeInstance.NewWorkspace("discard-session") + require.NoError(t, err) + + workspace.Discard() + workspace.Discard() + + assert.False(t, testFilesystem().Exists(workspace.databasePath)) +} + +func TestWorkspace_RecoverOrphans_Good(t *testing.T) { + stateDirectory := useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + workspace, err := storeInstance.NewWorkspace("orphan-session") + require.NoError(t, err) + require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"})) + require.NoError(t, workspace.database.Close()) + + orphans := storeInstance.RecoverOrphans(stateDirectory) + require.Len(t, orphans, 1) + assert.Equal(t, map[string]any{"like": 1}, orphans[0].Aggregate()) + + orphans[0].Discard() + assert.False(t, testFilesystem().Exists(workspaceFilePath(stateDirectory, "orphan-session"))) +}