From 41eaa7c96ce3ca55eefa01c2abf352cdafab8ae9 Mon Sep 17 00:00:00 2001 From: Virgil Date: Fri, 3 Apr 2026 08:20:00 +0000 Subject: [PATCH] feat(journal): support Flux equality filters Co-Authored-By: Virgil --- journal.go | 20 ++++++++++++++++++++ journal_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/journal.go b/journal.go index eec9910..72baf3f 100644 --- a/journal.go +++ b/journal.go @@ -30,6 +30,10 @@ var ( regexp.MustCompile(`(?:_measurement|measurement)\s*==\s*"([^"]+)"`), regexp.MustCompile(`\[\s*"(?:_measurement|measurement)"\s*\]\s*==\s*"([^"]+)"`), } + journalEqualityPatterns = []*regexp.Regexp{ + regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*"([^"]+)"`), + regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*"([^"]+)"`), + } ) type journalExecutor interface { @@ -163,6 +167,22 @@ func (storeInstance *Store) queryJournalFlux(flux string) (string, []any, error) arguments = append(arguments, stopTime.UnixMilli()) } + for _, pattern := range journalEqualityPatterns { + matches := pattern.FindAllStringSubmatch(flux, -1) + for _, match := range matches { + if len(match) < 3 { + continue + } + columnName := match[1] + filterValue := match[2] + if columnName == "_measurement" || columnName == "measurement" || columnName == "_bucket" || columnName == "bucket" { + continue + } + builder.WriteString(" AND (CAST(json_extract(tags_json, '$.\"' || ? || '\"') AS TEXT) = ? OR CAST(json_extract(fields_json, '$.\"' || ? || '\"') AS TEXT) = ?)") + arguments = append(arguments, columnName, filterValue, columnName, filterValue) + } + } + builder.WriteString(" ORDER BY committed_at") return builder.String(), arguments, nil } diff --git a/journal_test.go b/journal_test.go index ad26a73..c148ba8 100644 --- a/journal_test.go +++ b/journal_test.go @@ -59,6 +59,30 @@ func TestJournal_QueryJournal_Good_FluxFilters(t *testing.T) { assert.Equal(t, float64(2), fields["like"]) } +func TestJournal_QueryJournal_Good_TagFilter(t *testing.T) { + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + require.True(t, + storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK, + ) + require.True(t, + storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK, + ) + + rows := requireResultRows( + t, + storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r.workspace == "session-b")`), + ) + require.Len(t, rows, 1) + assert.Equal(t, "session-b", rows[0]["measurement"]) + + tags, ok := rows[0]["tags"].(map[string]string) + require.True(t, ok, "unexpected tags type: %T", rows[0]["tags"]) + assert.Equal(t, "session-b", tags["workspace"]) +} + func TestJournal_QueryJournal_Good_AbsoluteRangeWithStop(t *testing.T) { storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) require.NoError(t, err)