fix(store): support Flux bucket filters in QueryJournal
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
2a28b5a71b
commit
3742da144e
2 changed files with 42 additions and 1 deletions
17
journal.go
17
journal.go
|
|
@ -30,6 +30,10 @@ var (
|
|||
regexp.MustCompile(`(?:_measurement|measurement)\s*==\s*"([^"]+)"`),
|
||||
regexp.MustCompile(`\[\s*"(?:_measurement|measurement)"\s*\]\s*==\s*"([^"]+)"`),
|
||||
}
|
||||
journalBucketEqualityPatterns = []*regexp.Regexp{
|
||||
regexp.MustCompile(`r\.(?:_bucket|bucket|bucket_name)\s*==\s*"([^"]+)"`),
|
||||
regexp.MustCompile(`r\[\s*"(?:_bucket|bucket|bucket_name)"\s*\]\s*==\s*"([^"]+)"`),
|
||||
}
|
||||
journalEqualityPatterns = []*regexp.Regexp{
|
||||
regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*"([^"]+)"`),
|
||||
regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*"([^"]+)"`),
|
||||
|
|
@ -175,6 +179,17 @@ func (storeInstance *Store) queryJournalFlux(flux string) (string, []any, error)
|
|||
queryArguments = append(queryArguments, stopTime.UnixMilli())
|
||||
}
|
||||
|
||||
for _, pattern := range journalBucketEqualityPatterns {
|
||||
bucketMatches := pattern.FindAllStringSubmatch(flux, -1)
|
||||
for _, match := range bucketMatches {
|
||||
if len(match) < 2 {
|
||||
continue
|
||||
}
|
||||
queryBuilder.WriteString(" AND bucket_name = ?")
|
||||
queryArguments = append(queryArguments, match[1])
|
||||
}
|
||||
}
|
||||
|
||||
for _, pattern := range journalEqualityPatterns {
|
||||
matches := pattern.FindAllStringSubmatch(flux, -1)
|
||||
for _, match := range matches {
|
||||
|
|
@ -183,7 +198,7 @@ func (storeInstance *Store) queryJournalFlux(flux string) (string, []any, error)
|
|||
}
|
||||
columnName := match[1]
|
||||
filterValue := match[2]
|
||||
if columnName == "_measurement" || columnName == "measurement" || columnName == "_bucket" || columnName == "bucket" {
|
||||
if columnName == "_measurement" || columnName == "measurement" || columnName == "_bucket" || columnName == "bucket" || columnName == "bucket_name" {
|
||||
continue
|
||||
}
|
||||
queryBuilder.WriteString(" AND (CAST(json_extract(tags_json, '$.\"' || ? || '\"') AS TEXT) = ? OR CAST(json_extract(fields_json, '$.\"' || ? || '\"') AS TEXT) = ?)")
|
||||
|
|
|
|||
|
|
@ -127,6 +127,32 @@ func TestJournal_QueryJournal_Good_TagFilter(t *testing.T) {
|
|||
assert.Equal(t, "session-b", tags["workspace"])
|
||||
}
|
||||
|
||||
func TestJournal_QueryJournal_Good_BucketFilter(t *testing.T) {
|
||||
storeInstance, err := New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.True(t,
|
||||
storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK,
|
||||
)
|
||||
require.NoError(t, insertJournalEntry(
|
||||
storeInstance.database,
|
||||
"events",
|
||||
"session-b",
|
||||
`{"like":2}`,
|
||||
`{"workspace":"session-b"}`,
|
||||
time.Now().UnixMilli(),
|
||||
))
|
||||
|
||||
rows := requireResultRows(
|
||||
t,
|
||||
storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r._bucket == "events")`),
|
||||
)
|
||||
require.Len(t, rows, 1)
|
||||
assert.Equal(t, "session-b", rows[0]["measurement"])
|
||||
assert.Equal(t, "events", rows[0]["bucket_name"])
|
||||
}
|
||||
|
||||
func TestJournal_QueryJournal_Good_AbsoluteRangeWithStop(t *testing.T) {
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue