go-store/journal_test.go
Virgil d8183f26b6
Some checks are pending
Security Scan / security (push) Waiting to run
Test / test (push) Waiting to run
fix: support scalar Flux journal filters
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-04 17:39:45 +00:00

337 lines
12 KiB
Go

package store
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestJournal_CommitToJournal_Good_WithQueryJournalSQL commits two journal
// entries and reads them back with a raw SQL SELECT. It checks the bucket and
// measurement of the first row and the "fields"/"tags" values exposed on the
// returned rows.
func TestJournal_CommitToJournal_Good_WithQueryJournalSQL(t *testing.T) {
	journalStore, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer journalStore.Close()

	// Both commits are issued before either result is asserted on.
	commitA := journalStore.CommitToJournal("session-a", map[string]any{"like": 4}, map[string]string{"workspace": "session-a"})
	commitB := journalStore.CommitToJournal("session-b", map[string]any{"profile_match": 2}, map[string]string{"workspace": "session-b"})
	require.True(t, commitA.OK, "first journal commit failed: %v", commitA.Value)
	require.True(t, commitB.OK, "second journal commit failed: %v", commitB.Value)

	const selectEntries = "SELECT bucket_name, measurement, fields_json, tags_json FROM journal_entries ORDER BY entry_id"
	entries := requireResultRows(t, journalStore.QueryJournal(selectEntries))
	require.Len(t, entries, 2)

	firstEntry, secondEntry := entries[0], entries[1]
	assert.Equal(t, "events", firstEntry["bucket_name"])
	assert.Equal(t, "session-a", firstEntry["measurement"])

	// The numeric field is expected back as float64 rather than the int that
	// was committed.
	decodedFields, isFieldMap := firstEntry["fields"].(map[string]any)
	require.True(t, isFieldMap, "unexpected fields type: %T", firstEntry["fields"])
	assert.Equal(t, float64(4), decodedFields["like"])

	decodedTags, isTagMap := secondEntry["tags"].(map[string]string)
	require.True(t, isTagMap, "unexpected tags type: %T", secondEntry["tags"])
	assert.Equal(t, "session-b", decodedTags["workspace"])
}
// TestJournal_CommitToJournal_Good_ResultCopiesInputMaps verifies that the
// value returned by CommitToJournal is isolated from the caller's maps:
// mutating the input fields and tags after the commit must not change the
// "fields" and "tags" carried by the result.
func TestJournal_CommitToJournal_Good_ResultCopiesInputMaps(t *testing.T) {
	journalStore, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer journalStore.Close()

	inputFields := map[string]any{"like": 4}
	inputTags := map[string]string{"workspace": "session-a"}

	commit := journalStore.CommitToJournal("session-a", inputFields, inputTags)
	require.True(t, commit.OK, "journal commit failed: %v", commit.Value)

	// Mutate the caller-owned maps only after the commit has returned.
	inputFields["like"] = 99
	inputTags["workspace"] = "session-b"

	payload, isPayloadMap := commit.Value.(map[string]any)
	require.True(t, isPayloadMap, "unexpected result type: %T", commit.Value)

	committedFields, isFieldMap := payload["fields"].(map[string]any)
	require.True(t, isFieldMap, "unexpected fields type: %T", payload["fields"])
	assert.Equal(t, 4, committedFields["like"])

	committedTags, isTagMap := payload["tags"].(map[string]string)
	require.True(t, isTagMap, "unexpected tags type: %T", payload["tags"])
	assert.Equal(t, "session-a", committedTags["workspace"])
}
// TestJournal_QueryJournal_Good_RawSQLWithCTE checks that QueryJournal accepts
// a raw SQL statement that starts with a WITH clause (common table expression)
// and returns the single committed entry.
func TestJournal_QueryJournal_Good_RawSQLWithCTE(t *testing.T) {
	journalStore, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer journalStore.Close()

	commit := journalStore.CommitToJournal("session-a", map[string]any{"like": 4}, map[string]string{"workspace": "session-a"})
	require.True(t, commit.OK)

	// The statement text is passed through unchanged, including the leading
	// and trailing newlines of the raw string.
	const cteQuery = `
WITH journal_rows AS (
SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at
FROM journal_entries
)
SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at
FROM journal_rows
ORDER BY committed_at
`
	entries := requireResultRows(t, journalStore.QueryJournal(cteQuery))
	require.Len(t, entries, 1)
	assert.Equal(t, "session-a", entries[0]["measurement"])
}
// TestJournal_QueryJournal_Good_PragmaSQL checks that QueryJournal can run a
// PRAGMA table_info statement against journal_entries and that the reported
// column names include "bucket_name".
func TestJournal_QueryJournal_Good_PragmaSQL(t *testing.T) {
	journalStore, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer journalStore.Close()

	columns := requireResultRows(t, journalStore.QueryJournal("PRAGMA table_info(journal_entries)"))
	require.NotEmpty(t, columns)

	// Collect the "name" value of every returned row; each must be a string.
	names := make([]string, 0, len(columns))
	for index := range columns {
		columnName, isString := columns[index]["name"].(string)
		require.True(t, isString, "unexpected column name type: %T", columns[index]["name"])
		names = append(names, columnName)
	}

	assert.Contains(t, names, "bucket_name")
}
// TestJournal_QueryJournal_Good_FluxFilters commits two measurements and runs a
// Flux-style query filtering on r._measurement, expecting only the
// "session-b" entry with its "like" field intact.
func TestJournal_QueryJournal_Good_FluxFilters(t *testing.T) {
	journalStore, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer journalStore.Close()

	commitA := journalStore.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"})
	require.True(t, commitA.OK)
	commitB := journalStore.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"})
	require.True(t, commitB.OK)

	const measurementQuery = `from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r._measurement == "session-b")`
	matches := requireResultRows(t, journalStore.QueryJournal(measurementQuery))
	require.Len(t, matches, 1)

	match := matches[0]
	assert.Equal(t, "session-b", match["measurement"])

	decodedFields, isFieldMap := match["fields"].(map[string]any)
	require.True(t, isFieldMap, "unexpected fields type: %T", match["fields"])
	assert.Equal(t, float64(2), decodedFields["like"])
}
// TestJournal_QueryJournal_Good_TagFilter commits two entries tagged with
// different workspaces and runs a Flux-style query filtering on r.workspace,
// expecting only the entry tagged "session-b".
func TestJournal_QueryJournal_Good_TagFilter(t *testing.T) {
	journalStore, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer journalStore.Close()

	// Each session uses its own name as both measurement and workspace tag.
	seedEntries := []struct {
		session string
		like    int
	}{
		{session: "session-a", like: 1},
		{session: "session-b", like: 2},
	}
	for _, seed := range seedEntries {
		commit := journalStore.CommitToJournal(seed.session, map[string]any{"like": seed.like}, map[string]string{"workspace": seed.session})
		require.True(t, commit.OK)
	}

	const workspaceQuery = `from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r.workspace == "session-b")`
	matches := requireResultRows(t, journalStore.QueryJournal(workspaceQuery))
	require.Len(t, matches, 1)

	match := matches[0]
	assert.Equal(t, "session-b", match["measurement"])

	decodedTags, isTagMap := match["tags"].(map[string]string)
	require.True(t, isTagMap, "unexpected tags type: %T", match["tags"])
	assert.Equal(t, "session-b", decodedTags["workspace"])
}
// TestJournal_QueryJournal_Good_NumericFieldFilter commits two entries with
// different "like" values and runs a Flux-style query comparing r.like to the
// bare number 2, expecting only the "session-b" entry.
func TestJournal_QueryJournal_Good_NumericFieldFilter(t *testing.T) {
	journalStore, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer journalStore.Close()

	// mustCommit records one entry and stops the test if the commit is not OK.
	mustCommit := func(session string, like int) {
		commit := journalStore.CommitToJournal(session, map[string]any{"like": like}, map[string]string{"workspace": session})
		require.True(t, commit.OK)
	}
	mustCommit("session-a", 1)
	mustCommit("session-b", 2)

	const numericQuery = `from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r.like == 2)`
	matches := requireResultRows(t, journalStore.QueryJournal(numericQuery))
	require.Len(t, matches, 1)

	match := matches[0]
	assert.Equal(t, "session-b", match["measurement"])

	decodedFields, isFieldMap := match["fields"].(map[string]any)
	require.True(t, isFieldMap, "unexpected fields type: %T", match["fields"])
	assert.Equal(t, float64(2), decodedFields["like"])
}
// TestJournal_QueryJournal_Good_BooleanFieldFilter commits one entry with
// "complete" false and one with "complete" true, then runs a Flux-style query
// using bracket access (r["complete"] == true) and expects only the true entry.
func TestJournal_QueryJournal_Good_BooleanFieldFilter(t *testing.T) {
	journalStore, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer journalStore.Close()

	incomplete := journalStore.CommitToJournal("session-a", map[string]any{"complete": false}, map[string]string{"workspace": "session-a"})
	require.True(t, incomplete.OK)
	complete := journalStore.CommitToJournal("session-b", map[string]any{"complete": true}, map[string]string{"workspace": "session-b"})
	require.True(t, complete.OK)

	const booleanQuery = `from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r["complete"] == true)`
	queryResult := journalStore.QueryJournal(booleanQuery)
	matches := requireResultRows(t, queryResult)
	require.Len(t, matches, 1)

	match := matches[0]
	assert.Equal(t, "session-b", match["measurement"])

	decodedFields, isFieldMap := match["fields"].(map[string]any)
	require.True(t, isFieldMap, "unexpected fields type: %T", match["fields"])
	assert.Equal(t, true, decodedFields["complete"])
}
// TestJournal_QueryJournal_Good_BucketFilter records one entry through
// CommitToJournal on a store created without WithJournal and inserts a second
// entry directly under the "events" bucket. A Flux-style query filtering on
// r._bucket == "events" must return only the directly inserted entry.
func TestJournal_QueryJournal_Good_BucketFilter(t *testing.T) {
	plainStore, err := New(":memory:")
	require.NoError(t, err)
	defer plainStore.Close()

	// NOTE(review): presumably this commit lands in a bucket other than
	// "events" because no journal option was supplied — confirm against New.
	storeCommit := plainStore.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"})
	require.True(t, storeCommit.OK)

	insertErr := commitJournalEntry(
		plainStore.sqliteDatabase,
		"events",
		"session-b",
		`{"like":2}`,
		`{"workspace":"session-b"}`,
		time.Now().UnixMilli(),
	)
	require.NoError(t, insertErr)

	const bucketQuery = `from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r._bucket == "events")`
	matches := requireResultRows(t, plainStore.QueryJournal(bucketQuery))
	require.Len(t, matches, 1)

	match := matches[0]
	assert.Equal(t, "session-b", match["measurement"])
	assert.Equal(t, "events", match["bucket_name"])
}
// TestJournal_QueryJournal_Good_DeterministicOrderingForSameTimestamp inserts
// two entries that share the same committed_at value and checks that an empty
// query returns them in insertion order ("session-b" first, then "session-a").
func TestJournal_QueryJournal_Good_DeterministicOrderingForSameTimestamp(t *testing.T) {
	plainStore, err := New(":memory:")
	require.NoError(t, err)
	defer plainStore.Close()

	// The schema is created explicitly because entries are inserted directly
	// rather than through CommitToJournal.
	database := plainStore.sqliteDatabase
	require.NoError(t, ensureJournalSchema(database))

	sharedInstant := time.Date(2026, 3, 30, 12, 0, 0, 0, time.UTC)
	committedAt := sharedInstant.UnixMilli()

	firstInsertErr := commitJournalEntry(database, "events", "session-b", `{"like":2}`, `{"workspace":"session-b"}`, committedAt)
	require.NoError(t, firstInsertErr)
	secondInsertErr := commitJournalEntry(database, "events", "session-a", `{"like":1}`, `{"workspace":"session-a"}`, committedAt)
	require.NoError(t, secondInsertErr)

	entries := requireResultRows(t, plainStore.QueryJournal(""))
	require.Len(t, entries, 2)
	assert.Equal(t, "session-b", entries[0]["measurement"])
	assert.Equal(t, "session-a", entries[1]["measurement"])
}
// TestJournal_QueryJournal_Good_AbsoluteRangeWithStop rewrites committed_at on
// two entries to fixed instants and runs a Flux-style range with an absolute
// start and stop: now(). Only the entry at or after the start is expected.
func TestJournal_QueryJournal_Good_AbsoluteRangeWithStop(t *testing.T) {
	journalStore, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer journalStore.Close()

	commitA := journalStore.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"})
	require.True(t, commitA.OK)
	commitB := journalStore.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"})
	require.True(t, commitB.OK)

	// Pin each entry to a known instant so the absolute range is meaningful.
	pinnedInstants := []struct {
		measurement string
		instant     time.Time
	}{
		{measurement: "session-a", instant: time.Date(2026, 3, 29, 12, 0, 0, 0, time.UTC)},
		{measurement: "session-b", instant: time.Date(2026, 3, 30, 12, 0, 0, 0, time.UTC)},
	}
	for _, pin := range pinnedInstants {
		_, err = journalStore.sqliteDatabase.Exec(
			"UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?",
			pin.instant.UnixMilli(),
			pin.measurement,
		)
		require.NoError(t, err)
	}

	const rangeQuery = `from(bucket: "events") |> range(start: "2026-03-30T00:00:00Z", stop: now())`
	matches := requireResultRows(t, journalStore.QueryJournal(rangeQuery))
	require.Len(t, matches, 1)
	assert.Equal(t, "session-b", matches[0]["measurement"])
}
// TestJournal_QueryJournal_Good_AbsoluteRangeHonoursStop rewrites committed_at
// on two entries to fixed instants and runs a Flux-style range with absolute
// start and stop timestamps. The entry committed after the stop must be
// excluded, leaving only "session-a".
func TestJournal_QueryJournal_Good_AbsoluteRangeHonoursStop(t *testing.T) {
	journalStore, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
	require.NoError(t, err)
	defer journalStore.Close()

	commitA := journalStore.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"})
	require.True(t, commitA.OK)
	commitB := journalStore.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"})
	require.True(t, commitB.OK)

	// setCommittedAt overwrites committed_at for every row of a measurement.
	setCommittedAt := func(measurement string, instant time.Time) {
		_, err = journalStore.sqliteDatabase.Exec(
			"UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?",
			instant.UnixMilli(),
			measurement,
		)
		require.NoError(t, err)
	}
	setCommittedAt("session-a", time.Date(2026, 3, 29, 12, 0, 0, 0, time.UTC))
	setCommittedAt("session-b", time.Date(2026, 3, 30, 12, 0, 0, 0, time.UTC))

	const boundedQuery = `from(bucket: "events") |> range(start: "2026-03-29T00:00:00Z", stop: "2026-03-30T00:00:00Z")`
	matches := requireResultRows(t, journalStore.QueryJournal(boundedQuery))
	require.Len(t, matches, 1)
	assert.Equal(t, "session-a", matches[0]["measurement"])
}
// TestJournal_CommitToJournal_Bad_EmptyMeasurement checks that committing with
// an empty measurement name is rejected: the result must not be OK and its
// Value must be an error whose message mentions "measurement is empty".
func TestJournal_CommitToJournal_Bad_EmptyMeasurement(t *testing.T) {
	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()

	result := storeInstance.CommitToJournal("", map[string]any{"like": 1}, map[string]string{"workspace": "missing"})
	require.False(t, result.OK)

	// Use a checked type assertion, as the other tests in this file do, so a
	// non-error Value fails the test with a readable message instead of
	// panicking.
	commitErr, ok := result.Value.(error)
	require.True(t, ok, "unexpected result type: %T", result.Value)
	assert.Contains(t, commitErr.Error(), "measurement is empty")
}