diff --git a/journal.go b/journal.go index 04b255d..3cdea87 100644 --- a/journal.go +++ b/journal.go @@ -199,9 +199,21 @@ func jsonString(value any, operation, message string) (string, error) { } func fluxStartTime(value string) (time.Time, error) { + value = core.Trim(value) if value == "" { return time.Time{}, core.E("store.fluxStartTime", "range value is empty", nil) } + value = firstString(core.Split(value, ",")) + value = core.Trim(value) + if core.HasPrefix(value, "time(v:") && core.HasSuffix(value, ")") { + value = core.Trim(core.TrimSuffix(core.TrimPrefix(value, "time(v:"), ")")) + } + if core.HasPrefix(value, `"`) && core.HasSuffix(value, `"`) { + value = core.TrimSuffix(core.TrimPrefix(value, `"`), `"`) + } + if value == "now()" { + return time.Now(), nil + } if core.HasSuffix(value, "d") { days, err := strconv.Atoi(core.TrimSuffix(value, "d")) if err != nil { @@ -210,10 +222,14 @@ func fluxStartTime(value string) (time.Time, error) { return time.Now().Add(time.Duration(days) * 24 * time.Hour), nil } lookback, err := time.ParseDuration(value) + if err == nil { + return time.Now().Add(lookback), nil + } + parsedTime, err := time.Parse(time.RFC3339Nano, value) if err != nil { return time.Time{}, err } - return time.Now().Add(lookback), nil + return parsedTime, nil } func quotedSubmatch(pattern *regexp.Regexp, value string) string { diff --git a/journal_test.go b/journal_test.go index 9801079..14ff31a 100644 --- a/journal_test.go +++ b/journal_test.go @@ -2,6 +2,7 @@ package store import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -58,6 +59,39 @@ func TestJournal_QueryJournal_Good_FluxFilters(t *testing.T) { assert.Equal(t, float64(2), fields["like"]) } +func TestJournal_QueryJournal_Good_AbsoluteRangeWithStop(t *testing.T) { + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + require.True(t, + storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK, + ) + require.True(t, + storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK, + ) + + _, err = storeInstance.database.Exec( + "UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?", + time.Date(2026, 3, 29, 12, 0, 0, 0, time.UTC).UnixMilli(), + "session-a", + ) + require.NoError(t, err) + _, err = storeInstance.database.Exec( + "UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?", + time.Date(2026, 3, 30, 12, 0, 0, 0, time.UTC).UnixMilli(), + "session-b", + ) + require.NoError(t, err) + + rows := requireResultRows( + t, + storeInstance.QueryJournal(`from(bucket: "events") |> range(start: "2026-03-30T00:00:00Z", stop: now())`), + ) + require.Len(t, rows, 1) + assert.Equal(t, "session-b", rows[0]["measurement"]) +} + func TestJournal_CommitToJournal_Bad_EmptyMeasurement(t *testing.T) { storeInstance, err := New(":memory:") require.NoError(t, err) diff --git a/store.go b/store.go index 7eb3ab5..3cb9f31 100644 --- a/store.go +++ b/store.go @@ -128,7 +128,7 @@ func (storeInstance *Store) Get(group, key string) (string, error) { return "", core.E("store.Get", "query row", err) } if expiresAt.Valid && expiresAt.Int64 <= time.Now().UnixMilli() { - if _, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND "+entryKeyColumn+" = ?", group, key); err != nil { + if err := storeInstance.Delete(group, key); err != nil { return "", core.E("store.Get", "delete expired row", err) } return "", core.E("store.Get", core.Concat(group, "/", key), NotFoundError) diff --git a/store_test.go b/store_test.go index 6dc2e44..4f462cd 100644 --- a/store_test.go +++ b/store_test.go @@ -1088,6 +1088,33 @@ func TestStore_SetWithTTL_Good_ExpiresOnGet(t *testing.T) { assert.True(t, core.Is(err, NotFoundError), "expired key should be NotFoundError") } +func TestStore_SetWithTTL_Good_ExpiresOnGetEmitsDeleteEvent(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + watcher := storeInstance.Watch("g", "ephemeral") + defer storeInstance.Unwatch(watcher) + + require.NoError(t, storeInstance.SetWithTTL("g", "ephemeral", "gone-soon", 1*time.Millisecond)) + <-watcher.Events + + time.Sleep(5 * time.Millisecond) + + _, err := storeInstance.Get("g", "ephemeral") + require.Error(t, err) + assert.True(t, core.Is(err, NotFoundError), "expired key should be NotFoundError") + + select { + case event := <-watcher.Events: + assert.Equal(t, EventDelete, event.Type) + assert.Equal(t, "g", event.Group) + assert.Equal(t, "ephemeral", event.Key) + assert.Empty(t, event.Value) + case <-time.After(time.Second): + t.Fatal("timed out waiting for lazy expiry delete event") + } +} + func TestStore_SetWithTTL_Good_ExcludedFromCount(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() diff --git a/workspace.go b/workspace.go index 222a933..ab88c6a 100644 --- a/workspace.go +++ b/workspace.go @@ -294,6 +294,13 @@ func (storeInstance *Store) commitWorkspaceAggregate(workspaceName string, field return core.E("store.Workspace.Commit", "commit transaction", err) } committed = true + storeInstance.notify(Event{ + Type: EventSet, + Group: workspaceSummaryGroup(workspaceName), + Key: "summary", + Value: fieldsJSON, + Timestamp: time.Now(), + }) return nil } diff --git a/workspace_test.go b/workspace_test.go index 09d259b..21ba622 100644 --- a/workspace_test.go +++ b/workspace_test.go @@ -2,6 +2,7 @@ package store import ( "testing" + "time" core "dappco.re/go/core" "github.com/stretchr/testify/assert" @@ -110,6 +111,42 @@ func TestWorkspace_Commit_Good_JournalAndSummary(t *testing.T) { assert.Equal(t, "scroll-session", tags["workspace"]) } +func TestWorkspace_Commit_Good_EmitsSummaryEvent(t *testing.T) { + useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + watcher := storeInstance.Watch(workspaceSummaryGroup("scroll-session"), "summary") + defer storeInstance.Unwatch(watcher) + + workspace, err := storeInstance.NewWorkspace("scroll-session") + require.NoError(t, err) + + require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"})) + require.NoError(t, workspace.Put("profile_match", map[string]any{"user": "@charlie"})) + + result := workspace.Commit() + require.True(t, result.OK, "workspace commit failed: %v", result.Value) + + select { + case event := <-watcher.Events: + assert.Equal(t, EventSet, event.Type) + assert.Equal(t, workspaceSummaryGroup("scroll-session"), event.Group) + assert.Equal(t, "summary", event.Key) + assert.False(t, event.Timestamp.IsZero()) + + summary := make(map[string]any) + summaryResult := core.JSONUnmarshalString(event.Value, &summary) + require.True(t, summaryResult.OK, "summary event unmarshal failed: %v", summaryResult.Value) + assert.Equal(t, float64(1), summary["like"]) + assert.Equal(t, float64(1), summary["profile_match"]) + case <-time.After(time.Second): + t.Fatal("timed out waiting for workspace summary event") + } +} + func TestWorkspace_Discard_Good_Idempotent(t *testing.T) { useWorkspaceStateDirectory(t)