From d8183f26b6ad461ceaba779dbf759356d1af6580 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 17:39:45 +0000 Subject: [PATCH] fix: support scalar Flux journal filters Co-Authored-By: Virgil --- journal.go | 88 +++++++++++++++++++++++++++++++++++++++++-------- journal_test.go | 48 +++++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 13 deletions(-) diff --git a/journal.go b/journal.go index cb90dc7..ab2e91e 100644 --- a/journal.go +++ b/journal.go @@ -34,12 +34,22 @@ var ( regexp.MustCompile(`r\.(?:_bucket|bucket|bucket_name)\s*==\s*"([^"]+)"`), regexp.MustCompile(`r\[\s*"(?:_bucket|bucket|bucket_name)"\s*\]\s*==\s*"([^"]+)"`), } - journalEqualityPatterns = []*regexp.Regexp{ + journalStringEqualityPatterns = []*regexp.Regexp{ regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*"([^"]+)"`), regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*"([^"]+)"`), } + journalScalarEqualityPatterns = []*regexp.Regexp{ + regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*(true|false|-?[0-9]+(?:\.[0-9]+)?)`), + regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*(true|false|-?[0-9]+(?:\.[0-9]+)?)`), + } ) +type journalEqualityFilter struct { + columnName string + filterValue any + stringCompare bool +} + type journalExecutor interface { Exec(query string, args ...any) (sql.Result, error) } @@ -191,20 +201,15 @@ func (storeInstance *Store) queryJournalFromFlux(flux string) (string, []any, er } } - for _, pattern := range journalEqualityPatterns { - matches := pattern.FindAllStringSubmatch(flux, -1) - for _, match := range matches { - if len(match) < 3 { - continue - } - columnName := match[1] - filterValue := match[2] - if columnName == "_measurement" || columnName == "measurement" || columnName == "_bucket" || columnName == "bucket" || columnName == "bucket_name" { - continue - } + for _, filter := range journalEqualityFilters(flux) { + if filter.stringCompare { queryBuilder.WriteString(" AND (CAST(json_extract(tags_json, '$.\"' || ? || '\"') AS TEXT) = ? OR CAST(json_extract(fields_json, '$.\"' || ? || '\"') AS TEXT) = ?)") - queryArguments = append(queryArguments, columnName, filterValue, columnName, filterValue) + queryArguments = append(queryArguments, filter.columnName, filter.filterValue, filter.columnName, filter.filterValue) + continue } + + queryBuilder.WriteString(" AND json_extract(fields_json, '$.\"' || ? || '\"') = ?") + queryArguments = append(queryArguments, filter.columnName, filter.filterValue) } queryBuilder.WriteString(" ORDER BY committed_at, entry_id") @@ -430,6 +435,63 @@ func normaliseRowValue(value any) any { } } +func journalEqualityFilters(flux string) []journalEqualityFilter { + var filters []journalEqualityFilter + appendFilter := func(columnName string, filterValue any, stringCompare bool) { + if columnName == "_measurement" || columnName == "measurement" || columnName == "_bucket" || columnName == "bucket" || columnName == "bucket_name" { + return + } + filters = append(filters, journalEqualityFilter{ + columnName: columnName, + filterValue: filterValue, + stringCompare: stringCompare, + }) + } + + for _, pattern := range journalStringEqualityPatterns { + matches := pattern.FindAllStringSubmatch(flux, -1) + for _, match := range matches { + if len(match) < 3 { + continue + } + appendFilter(match[1], match[2], true) + } + } + + for _, pattern := range journalScalarEqualityPatterns { + matches := pattern.FindAllStringSubmatch(flux, -1) + for _, match := range matches { + if len(match) < 3 { + continue + } + filterValue, ok := parseJournalScalarValue(match[2]) + if !ok { + continue + } + appendFilter(match[1], filterValue, false) + } + } + + return filters +} + +func parseJournalScalarValue(value string) (any, bool) { + switch value { + case "true": + return true, true + case "false": + return false, true + } + + if integerValue, err := strconv.ParseInt(value, 10, 64); err == nil { + return integerValue, true + } + if floatValue, err := strconv.ParseFloat(value, 64); err == nil { + return floatValue, true + } + return nil, false +} + func cloneAnyMap(input map[string]any) map[string]any { if input == nil { return map[string]any{} diff --git a/journal_test.go b/journal_test.go index 1bdcb05..12d44fb 100644 --- a/journal_test.go +++ b/journal_test.go @@ -153,6 +153,54 @@ func TestJournal_QueryJournal_Good_TagFilter(t *testing.T) { assert.Equal(t, "session-b", tags["workspace"]) } +func TestJournal_QueryJournal_Good_NumericFieldFilter(t *testing.T) { + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + require.True(t, + storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK, + ) + require.True(t, + storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK, + ) + + rows := requireResultRows( + t, + storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r.like == 2)`), + ) + require.Len(t, rows, 1) + assert.Equal(t, "session-b", rows[0]["measurement"]) + + fields, ok := rows[0]["fields"].(map[string]any) + require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"]) + assert.Equal(t, float64(2), fields["like"]) +} + +func TestJournal_QueryJournal_Good_BooleanFieldFilter(t *testing.T) { + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + require.True(t, + storeInstance.CommitToJournal("session-a", map[string]any{"complete": false}, map[string]string{"workspace": "session-a"}).OK, + ) + require.True(t, + storeInstance.CommitToJournal("session-b", map[string]any{"complete": true}, map[string]string{"workspace": "session-b"}).OK, + ) + + rows := requireResultRows( + t, + storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r["complete"] == true)`), + ) + require.Len(t, rows, 1) + assert.Equal(t, "session-b", rows[0]["measurement"]) + + fields, ok := rows[0]["fields"].(map[string]any) + require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"]) + assert.Equal(t, true, fields["complete"]) +} + func TestJournal_QueryJournal_Good_BucketFilter(t *testing.T) { storeInstance, err := New(":memory:") require.NoError(t, err)