fix(store): honour Flux range stop bound

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-03 04:50:26 +00:00
parent 5c7e243fc0
commit 2353bdf2f7
2 changed files with 105 additions and 9 deletions

View file

@ -26,7 +26,6 @@ const createJournalEntriesTableSQL = `CREATE TABLE IF NOT EXISTS journal_entries
var (
journalBucketPattern = regexp.MustCompile(`bucket:\s*"([^"]+)"`)
journalRangePattern = regexp.MustCompile(`range\(\s*start:\s*([^)]+)\)`)
journalMeasurementPattern = regexp.MustCompile(`(?:_measurement|measurement)\s*==\s*"([^"]+)"`)
)
@ -138,18 +137,23 @@ func (storeInstance *Store) queryJournalFlux(flux string) (string, []any, error)
arguments = append(arguments, measurement)
}
rangeMatch := quotedSubmatch(journalRangePattern, flux)
if rangeMatch == "" {
rangeMatch = regexpSubmatch(journalRangePattern, flux, 1)
}
if rangeMatch != "" {
startTime, err := fluxStartTime(core.Trim(rangeMatch))
startRange, stopRange := journalRangeBounds(flux)
if startRange != "" {
startTime, err := fluxTime(core.Trim(startRange))
if err != nil {
return "", nil, core.E("store.QueryJournal", "parse range", err)
}
builder.WriteString(" AND committed_at >= ?")
arguments = append(arguments, startTime.UnixMilli())
}
if stopRange != "" {
stopTime, err := fluxTime(core.Trim(stopRange))
if err != nil {
return "", nil, core.E("store.QueryJournal", "parse range", err)
}
builder.WriteString(" AND committed_at < ?")
arguments = append(arguments, stopTime.UnixMilli())
}
builder.WriteString(" ORDER BY committed_at")
return builder.String(), arguments, nil
@ -198,10 +202,69 @@ func jsonString(value any, operation, message string) (string, error) {
return string(result.Value.([]byte)), nil
}
func fluxStartTime(value string) (time.Time, error) {
func journalRangeBounds(flux string) (string, string) {
rangeIndex := indexOf(flux, "range(")
if rangeIndex < 0 {
return "", ""
}
contentStart := rangeIndex + len("range(")
depth := 1
contentEnd := -1
scanRange:
for i := contentStart; i < len(flux); i++ {
switch flux[i] {
case '(':
depth++
case ')':
depth--
if depth == 0 {
contentEnd = i
break scanRange
}
}
}
if contentEnd < 0 || contentEnd <= contentStart {
return "", ""
}
content := flux[contentStart:contentEnd]
startPrefix := "start:"
startIndex := indexOf(content, startPrefix)
if startIndex < 0 {
return "", ""
}
startIndex += len(startPrefix)
start := core.Trim(content[startIndex:])
stop := ""
if stopIndex := indexOf(content, ", stop:"); stopIndex >= 0 {
start = core.Trim(content[startIndex:stopIndex])
stop = core.Trim(content[stopIndex+len(", stop:"):])
} else if stopIndex := indexOf(content, ",stop:"); stopIndex >= 0 {
start = core.Trim(content[startIndex:stopIndex])
stop = core.Trim(content[stopIndex+len(",stop:"):])
}
return start, stop
}
func indexOf(text, substring string) int {
if substring == "" {
return 0
}
if len(substring) > len(text) {
return -1
}
for i := 0; i <= len(text)-len(substring); i++ {
if text[i:i+len(substring)] == substring {
return i
}
}
return -1
}
func fluxTime(value string) (time.Time, error) {
value = core.Trim(value)
if value == "" {
return time.Time{}, core.E("store.fluxStartTime", "range value is empty", nil)
return time.Time{}, core.E("store.fluxTime", "range value is empty", nil)
}
value = firstString(core.Split(value, ","))
value = core.Trim(value)

View file

@ -92,6 +92,39 @@ func TestJournal_QueryJournal_Good_AbsoluteRangeWithStop(t *testing.T) {
assert.Equal(t, "session-b", rows[0]["measurement"])
}
func TestJournal_QueryJournal_Good_AbsoluteRangeHonoursStop(t *testing.T) {
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
require.NoError(t, err)
defer storeInstance.Close()
require.True(t,
storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK,
)
require.True(t,
storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK,
)
_, err = storeInstance.database.Exec(
"UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?",
time.Date(2026, 3, 29, 12, 0, 0, 0, time.UTC).UnixMilli(),
"session-a",
)
require.NoError(t, err)
_, err = storeInstance.database.Exec(
"UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?",
time.Date(2026, 3, 30, 12, 0, 0, 0, time.UTC).UnixMilli(),
"session-b",
)
require.NoError(t, err)
rows := requireResultRows(
t,
storeInstance.QueryJournal(`from(bucket: "events") |> range(start: "2026-03-29T00:00:00Z", stop: "2026-03-30T00:00:00Z")`),
)
require.Len(t, rows, 1)
assert.Equal(t, "session-a", rows[0]["measurement"])
}
func TestJournal_CommitToJournal_Bad_EmptyMeasurement(t *testing.T) {
storeInstance, err := New(":memory:")
require.NoError(t, err)