fix: support scalar Flux journal filters
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
c2ba21342a
commit
d8183f26b6
2 changed files with 123 additions and 13 deletions
88
journal.go
88
journal.go
|
|
@ -34,12 +34,22 @@ var (
|
|||
regexp.MustCompile(`r\.(?:_bucket|bucket|bucket_name)\s*==\s*"([^"]+)"`),
|
||||
regexp.MustCompile(`r\[\s*"(?:_bucket|bucket|bucket_name)"\s*\]\s*==\s*"([^"]+)"`),
|
||||
}
|
||||
journalEqualityPatterns = []*regexp.Regexp{
|
||||
journalStringEqualityPatterns = []*regexp.Regexp{
|
||||
regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*"([^"]+)"`),
|
||||
regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*"([^"]+)"`),
|
||||
}
|
||||
journalScalarEqualityPatterns = []*regexp.Regexp{
|
||||
regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*(true|false|-?[0-9]+(?:\.[0-9]+)?)`),
|
||||
regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*(true|false|-?[0-9]+(?:\.[0-9]+)?)`),
|
||||
}
|
||||
)
|
||||
|
||||
type journalEqualityFilter struct {
|
||||
columnName string
|
||||
filterValue any
|
||||
stringCompare bool
|
||||
}
|
||||
|
||||
type journalExecutor interface {
|
||||
Exec(query string, args ...any) (sql.Result, error)
|
||||
}
|
||||
|
|
@ -191,20 +201,15 @@ func (storeInstance *Store) queryJournalFromFlux(flux string) (string, []any, er
|
|||
}
|
||||
}
|
||||
|
||||
for _, pattern := range journalEqualityPatterns {
|
||||
matches := pattern.FindAllStringSubmatch(flux, -1)
|
||||
for _, match := range matches {
|
||||
if len(match) < 3 {
|
||||
continue
|
||||
}
|
||||
columnName := match[1]
|
||||
filterValue := match[2]
|
||||
if columnName == "_measurement" || columnName == "measurement" || columnName == "_bucket" || columnName == "bucket" || columnName == "bucket_name" {
|
||||
continue
|
||||
}
|
||||
for _, filter := range journalEqualityFilters(flux) {
|
||||
if filter.stringCompare {
|
||||
queryBuilder.WriteString(" AND (CAST(json_extract(tags_json, '$.\"' || ? || '\"') AS TEXT) = ? OR CAST(json_extract(fields_json, '$.\"' || ? || '\"') AS TEXT) = ?)")
|
||||
queryArguments = append(queryArguments, columnName, filterValue, columnName, filterValue)
|
||||
queryArguments = append(queryArguments, filter.columnName, filter.filterValue, filter.columnName, filter.filterValue)
|
||||
continue
|
||||
}
|
||||
|
||||
queryBuilder.WriteString(" AND json_extract(fields_json, '$.\"' || ? || '\"') = ?")
|
||||
queryArguments = append(queryArguments, filter.columnName, filter.filterValue)
|
||||
}
|
||||
|
||||
queryBuilder.WriteString(" ORDER BY committed_at, entry_id")
|
||||
|
|
@ -430,6 +435,63 @@ func normaliseRowValue(value any) any {
|
|||
}
|
||||
}
|
||||
|
||||
func journalEqualityFilters(flux string) []journalEqualityFilter {
|
||||
var filters []journalEqualityFilter
|
||||
appendFilter := func(columnName string, filterValue any, stringCompare bool) {
|
||||
if columnName == "_measurement" || columnName == "measurement" || columnName == "_bucket" || columnName == "bucket" || columnName == "bucket_name" {
|
||||
return
|
||||
}
|
||||
filters = append(filters, journalEqualityFilter{
|
||||
columnName: columnName,
|
||||
filterValue: filterValue,
|
||||
stringCompare: stringCompare,
|
||||
})
|
||||
}
|
||||
|
||||
for _, pattern := range journalStringEqualityPatterns {
|
||||
matches := pattern.FindAllStringSubmatch(flux, -1)
|
||||
for _, match := range matches {
|
||||
if len(match) < 3 {
|
||||
continue
|
||||
}
|
||||
appendFilter(match[1], match[2], true)
|
||||
}
|
||||
}
|
||||
|
||||
for _, pattern := range journalScalarEqualityPatterns {
|
||||
matches := pattern.FindAllStringSubmatch(flux, -1)
|
||||
for _, match := range matches {
|
||||
if len(match) < 3 {
|
||||
continue
|
||||
}
|
||||
filterValue, ok := parseJournalScalarValue(match[2])
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
appendFilter(match[1], filterValue, false)
|
||||
}
|
||||
}
|
||||
|
||||
return filters
|
||||
}
|
||||
|
||||
func parseJournalScalarValue(value string) (any, bool) {
|
||||
switch value {
|
||||
case "true":
|
||||
return true, true
|
||||
case "false":
|
||||
return false, true
|
||||
}
|
||||
|
||||
if integerValue, err := strconv.ParseInt(value, 10, 64); err == nil {
|
||||
return integerValue, true
|
||||
}
|
||||
if floatValue, err := strconv.ParseFloat(value, 64); err == nil {
|
||||
return floatValue, true
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func cloneAnyMap(input map[string]any) map[string]any {
|
||||
if input == nil {
|
||||
return map[string]any{}
|
||||
|
|
|
|||
|
|
@ -153,6 +153,54 @@ func TestJournal_QueryJournal_Good_TagFilter(t *testing.T) {
|
|||
assert.Equal(t, "session-b", tags["workspace"])
|
||||
}
|
||||
|
||||
func TestJournal_QueryJournal_Good_NumericFieldFilter(t *testing.T) {
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.True(t,
|
||||
storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK,
|
||||
)
|
||||
require.True(t,
|
||||
storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK,
|
||||
)
|
||||
|
||||
rows := requireResultRows(
|
||||
t,
|
||||
storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r.like == 2)`),
|
||||
)
|
||||
require.Len(t, rows, 1)
|
||||
assert.Equal(t, "session-b", rows[0]["measurement"])
|
||||
|
||||
fields, ok := rows[0]["fields"].(map[string]any)
|
||||
require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"])
|
||||
assert.Equal(t, float64(2), fields["like"])
|
||||
}
|
||||
|
||||
func TestJournal_QueryJournal_Good_BooleanFieldFilter(t *testing.T) {
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.True(t,
|
||||
storeInstance.CommitToJournal("session-a", map[string]any{"complete": false}, map[string]string{"workspace": "session-a"}).OK,
|
||||
)
|
||||
require.True(t,
|
||||
storeInstance.CommitToJournal("session-b", map[string]any{"complete": true}, map[string]string{"workspace": "session-b"}).OK,
|
||||
)
|
||||
|
||||
rows := requireResultRows(
|
||||
t,
|
||||
storeInstance.QueryJournal(`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r["complete"] == true)`),
|
||||
)
|
||||
require.Len(t, rows, 1)
|
||||
assert.Equal(t, "session-b", rows[0]["measurement"])
|
||||
|
||||
fields, ok := rows[0]["fields"].(map[string]any)
|
||||
require.True(t, ok, "unexpected fields type: %T", rows[0]["fields"])
|
||||
assert.Equal(t, true, fields["complete"])
|
||||
}
|
||||
|
||||
func TestJournal_QueryJournal_Good_BucketFilter(t *testing.T) {
|
||||
storeInstance, err := New(":memory:")
|
||||
require.NoError(t, err)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue