diff --git a/journal.go b/journal.go index 3cdea87..9baa21b 100644 --- a/journal.go +++ b/journal.go @@ -26,7 +26,6 @@ const createJournalEntriesTableSQL = `CREATE TABLE IF NOT EXISTS journal_entries var ( journalBucketPattern = regexp.MustCompile(`bucket:\s*"([^"]+)"`) - journalRangePattern = regexp.MustCompile(`range\(\s*start:\s*([^)]+)\)`) journalMeasurementPattern = regexp.MustCompile(`(?:_measurement|measurement)\s*==\s*"([^"]+)"`) ) @@ -138,18 +137,23 @@ func (storeInstance *Store) queryJournalFlux(flux string) (string, []any, error) arguments = append(arguments, measurement) } - rangeMatch := quotedSubmatch(journalRangePattern, flux) - if rangeMatch == "" { - rangeMatch = regexpSubmatch(journalRangePattern, flux, 1) - } - if rangeMatch != "" { - startTime, err := fluxStartTime(core.Trim(rangeMatch)) + startRange, stopRange := journalRangeBounds(flux) + if startRange != "" { + startTime, err := fluxTime(core.Trim(startRange)) if err != nil { return "", nil, core.E("store.QueryJournal", "parse range", err) } builder.WriteString(" AND committed_at >= ?") arguments = append(arguments, startTime.UnixMilli()) } + if stopRange != "" { + stopTime, err := fluxTime(core.Trim(stopRange)) + if err != nil { + return "", nil, core.E("store.QueryJournal", "parse range", err) + } + builder.WriteString(" AND committed_at < ?") + arguments = append(arguments, stopTime.UnixMilli()) + } builder.WriteString(" ORDER BY committed_at") return builder.String(), arguments, nil @@ -198,10 +202,69 @@ func jsonString(value any, operation, message string) (string, error) { return string(result.Value.([]byte)), nil } -func fluxStartTime(value string) (time.Time, error) { +func journalRangeBounds(flux string) (string, string) { + rangeIndex := indexOf(flux, "range(") + if rangeIndex < 0 { + return "", "" + } + contentStart := rangeIndex + len("range(") + depth := 1 + contentEnd := -1 +scanRange: + for i := contentStart; i < len(flux); i++ { + switch flux[i] { + case '(': + depth++ + case ')': + depth-- + if depth == 0 { + contentEnd = i + break scanRange + } + } + } + if contentEnd < 0 || contentEnd <= contentStart { + return "", "" + } + + content := flux[contentStart:contentEnd] + startPrefix := "start:" + startIndex := indexOf(content, startPrefix) + if startIndex < 0 { + return "", "" + } + startIndex += len(startPrefix) + start := core.Trim(content[startIndex:]) + stop := "" + if stopIndex := indexOf(content, ", stop:"); stopIndex >= 0 { + start = core.Trim(content[startIndex:stopIndex]) + stop = core.Trim(content[stopIndex+len(", stop:"):]) + } else if stopIndex := indexOf(content, ",stop:"); stopIndex >= 0 { + start = core.Trim(content[startIndex:stopIndex]) + stop = core.Trim(content[stopIndex+len(",stop:"):]) + } + return start, stop +} + +func indexOf(text, substring string) int { + if substring == "" { + return 0 + } + if len(substring) > len(text) { + return -1 + } + for i := 0; i <= len(text)-len(substring); i++ { + if text[i:i+len(substring)] == substring { + return i + } + } + return -1 +} + +func fluxTime(value string) (time.Time, error) { value = core.Trim(value) if value == "" { - return time.Time{}, core.E("store.fluxStartTime", "range value is empty", nil) + return time.Time{}, core.E("store.fluxTime", "range value is empty", nil) } value = firstString(core.Split(value, ",")) value = core.Trim(value) diff --git a/journal_test.go b/journal_test.go index 14ff31a..ad26a73 100644 --- a/journal_test.go +++ b/journal_test.go @@ -92,6 +92,39 @@ func TestJournal_QueryJournal_Good_AbsoluteRangeWithStop(t *testing.T) { assert.Equal(t, "session-b", rows[0]["measurement"]) } +func TestJournal_QueryJournal_Good_AbsoluteRangeHonoursStop(t *testing.T) { + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + require.True(t, + storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK, + ) + require.True(t, + storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK, + ) + + _, err = storeInstance.database.Exec( + "UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?", + time.Date(2026, 3, 29, 12, 0, 0, 0, time.UTC).UnixMilli(), + "session-a", + ) + require.NoError(t, err) + _, err = storeInstance.database.Exec( + "UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?", + time.Date(2026, 3, 30, 12, 0, 0, 0, time.UTC).UnixMilli(), + "session-b", + ) + require.NoError(t, err) + + rows := requireResultRows( + t, + storeInstance.QueryJournal(`from(bucket: "events") |> range(start: "2026-03-29T00:00:00Z", stop: "2026-03-30T00:00:00Z")`), + ) + require.Len(t, rows, 1) + assert.Equal(t, "session-a", rows[0]["measurement"]) +} + func TestJournal_CommitToJournal_Bad_EmptyMeasurement(t *testing.T) { storeInstance, err := New(":memory:") require.NoError(t, err)