From 4415c358462c245fe468173e104be3dede6c6892 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 09:54:03 +0000 Subject: [PATCH] fix(journal): stabilise journal ordering Order journal queries and archive compaction by committed_at, entry_id so rows with identical timestamps are returned predictably. Co-Authored-By: Virgil --- compact.go | 2 +- compact_test.go | 57 +++++++++++++++++++++++++++++++++++++++++++++++++ journal.go | 4 ++-- journal_test.go | 33 ++++++++++++++++++++++++++++ 4 files changed, 93 insertions(+), 3 deletions(-) diff --git a/compact.go b/compact.go index 23b198f..0eb4b08 100644 --- a/compact.go +++ b/compact.go @@ -57,7 +57,7 @@ func (storeInstance *Store) Compact(options CompactOptions) core.Result { } rows, err := storeInstance.sqliteDatabase.Query( - "SELECT entry_id, bucket_name, measurement, fields_json, tags_json, committed_at FROM "+journalEntriesTableName+" WHERE archived_at IS NULL AND committed_at < ? ORDER BY committed_at", + "SELECT entry_id, bucket_name, measurement, fields_json, tags_json, committed_at FROM "+journalEntriesTableName+" WHERE archived_at IS NULL AND committed_at < ? ORDER BY committed_at, entry_id", options.Before.UnixMilli(), ) if err != nil { diff --git a/compact_test.go b/compact_test.go index ff86130..c4a71d9 100644 --- a/compact_test.go +++ b/compact_test.go @@ -126,3 +126,60 @@ func TestCompact_Compact_Good_NoRows(t *testing.T) { require.True(t, result.OK, "compact failed: %v", result.Value) assert.Equal(t, "", result.Value) } + +func TestCompact_Compact_Good_DeterministicOrderingForSameTimestamp(t *testing.T) { + outputDirectory := useArchiveOutputDirectory(t) + + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + require.NoError(t, ensureJournalSchema(storeInstance.sqliteDatabase)) + + committedAt := time.Now().Add(-48 * time.Hour).UnixMilli() + require.NoError(t, insertJournalEntry( + storeInstance.sqliteDatabase, + "events", + "session-b", + `{"like":2}`, + `{"workspace":"session-b"}`, + committedAt, + )) + require.NoError(t, insertJournalEntry( + storeInstance.sqliteDatabase, + "events", + "session-a", + `{"like":1}`, + `{"workspace":"session-a"}`, + committedAt, + )) + + result := storeInstance.Compact(CompactOptions{ + Before: time.Now().Add(-24 * time.Hour), + Output: outputDirectory, + Format: "gzip", + }) + require.True(t, result.OK, "compact failed: %v", result.Value) + + archivePath, ok := result.Value.(string) + require.True(t, ok, "unexpected archive path type: %T", result.Value) + + archiveData := requireCoreReadBytes(t, archivePath) + reader, err := gzip.NewReader(bytes.NewReader(archiveData)) + require.NoError(t, err) + defer reader.Close() + + decompressedData, err := io.ReadAll(reader) + require.NoError(t, err) + lines := core.Split(core.Trim(string(decompressedData)), "\n") + require.Len(t, lines, 2) + + firstArchivedRow := make(map[string]any) + unmarshalResult := core.JSONUnmarshalString(lines[0], &firstArchivedRow) + require.True(t, unmarshalResult.OK, "archive line unmarshal failed: %v", unmarshalResult.Value) + assert.Equal(t, "session-b", firstArchivedRow["measurement"]) + + secondArchivedRow := make(map[string]any) + unmarshalResult = core.JSONUnmarshalString(lines[1], &secondArchivedRow) + require.True(t, unmarshalResult.OK, "archive line unmarshal failed: %v", unmarshalResult.Value) + assert.Equal(t, "session-a", secondArchivedRow["measurement"]) +} diff --git a/journal.go b/journal.go index 8f22e66..a770c9e 100644 --- a/journal.go +++ b/journal.go @@ -109,7 +109,7 @@ func (storeInstance *Store) QueryJournal(flux string) core.Result { trimmedQuery := core.Trim(flux) if trimmedQuery == "" { return storeInstance.queryJournalRows( - "SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM " + journalEntriesTableName + " WHERE archived_at IS NULL ORDER BY committed_at", + "SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM " + journalEntriesTableName + " WHERE archived_at IS NULL ORDER BY committed_at, entry_id", ) } if isRawSQLJournalQuery(trimmedQuery) { @@ -206,7 +206,7 @@ func (storeInstance *Store) queryJournalFlux(flux string) (string, []any, error) } } - queryBuilder.WriteString(" ORDER BY committed_at") + queryBuilder.WriteString(" ORDER BY committed_at, entry_id") return queryBuilder.String(), queryArguments, nil } diff --git a/journal_test.go b/journal_test.go index 7ffc08f..2a2d3dc 100644 --- a/journal_test.go +++ b/journal_test.go @@ -153,6 +153,39 @@ func TestJournal_QueryJournal_Good_BucketFilter(t *testing.T) { assert.Equal(t, "events", rows[0]["bucket_name"]) } +func TestJournal_QueryJournal_Good_DeterministicOrderingForSameTimestamp(t *testing.T) { + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + require.NoError(t, ensureJournalSchema(storeInstance.sqliteDatabase)) + + committedAt := time.Date(2026, 3, 30, 12, 0, 0, 0, time.UTC).UnixMilli() + require.NoError(t, insertJournalEntry( + storeInstance.sqliteDatabase, + "events", + "session-b", + `{"like":2}`, + `{"workspace":"session-b"}`, + committedAt, + )) + require.NoError(t, insertJournalEntry( + storeInstance.sqliteDatabase, + "events", + "session-a", + `{"like":1}`, + `{"workspace":"session-a"}`, + committedAt, + )) + + rows := requireResultRows( + t, + storeInstance.QueryJournal(""), + ) + require.Len(t, rows, 2) + assert.Equal(t, "session-b", rows[0]["measurement"]) + assert.Equal(t, "session-a", rows[1]["measurement"]) +} + func TestJournal_QueryJournal_Good_AbsoluteRangeWithStop(t *testing.T) { storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) require.NoError(t, err) -- 2.45.3