diff --git a/journal.go b/journal.go index 81cd85d..9063b59 100644 --- a/journal.go +++ b/journal.go @@ -30,6 +30,10 @@ var ( regexp.MustCompile(`(?:_measurement|measurement)\s*==\s*"([^"]+)"`), regexp.MustCompile(`\[\s*"(?:_measurement|measurement)"\s*\]\s*==\s*"([^"]+)"`), } + journalBucketEqualityPatterns = []*regexp.Regexp{ + regexp.MustCompile(`r\.(?:_bucket|bucket|bucket_name)\s*==\s*"([^"]+)"`), + regexp.MustCompile(`r\[\s*"(?:_bucket|bucket|bucket_name)"\s*\]\s*==\s*"([^"]+)"`), + } journalEqualityPatterns = []*regexp.Regexp{ regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*"([^"]+)"`), regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*"([^"]+)"`), @@ -175,6 +179,17 @@ func (storeInstance *Store) queryJournalFlux(flux string) (string, []any, error) queryArguments = append(queryArguments, stopTime.UnixMilli()) } + for _, pattern := range journalBucketEqualityPatterns { + bucketMatches := pattern.FindAllStringSubmatch(flux, -1) + for _, match := range bucketMatches { + if len(match) < 2 { + continue + } + queryBuilder.WriteString(" AND bucket_name = ?") + queryArguments = append(queryArguments, match[1]) + } + } + for _, pattern := range journalEqualityPatterns { matches := pattern.FindAllStringSubmatch(flux, -1) for _, match := range matches { @@ -183,7 +198,7 @@ func (storeInstance *Store) queryJournalFlux(flux string) (string, []any, error) } columnName := match[1] filterValue := match[2] - if columnName == "_measurement" || columnName == "measurement" || columnName == "_bucket" || columnName == "bucket" { + if columnName == "_measurement" || columnName == "measurement" || columnName == "_bucket" || columnName == "bucket" || columnName == "bucket_name" { continue } queryBuilder.WriteString(" AND (CAST(json_extract(tags_json, '$.\"' || ? || '\"') AS TEXT) = ? OR CAST(json_extract(fields_json, '$.\"' || ? || '\"') AS TEXT) = ?)") diff --git a/journal_test.go b/journal_test.go index a8aa10e..ad31482 100644 --- a/journal_test.go +++ b/journal_test.go @@ -127,6 +127,32 @@ func TestJournal_QueryJournal_Good_TagFilter(t *testing.T) { assert.Equal(t, "session-b", tags["workspace"]) } +func TestJournal_QueryJournal_Good_BucketFilter(t *testing.T) { + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + require.True(t, + storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK, + ) + require.NoError(t, insertJournalEntry( + storeInstance.database, + "events", + "session-b", + `{"like":2}`, + `{"workspace":"session-b"}`, + time.Now().UnixMilli(), + )) + + rows := requireResultRows( + t, + storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r._bucket == "events")`), + ) + require.Len(t, rows, 1) + assert.Equal(t, "session-b", rows[0]["measurement"]) + assert.Equal(t, "events", rows[0]["bucket_name"]) +} + func TestJournal_QueryJournal_Good_AbsoluteRangeWithStop(t *testing.T) { storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) require.NoError(t, err)