Merge pull request '[agent/codex:gpt-5.4-mini] Read ~/spec/code/core/go/store/RFC.md fully. Find features d...' (#114) from agent/read---spec-code-core-go-store-rfc-md-fu into dev
This commit is contained in:
commit
e9527e4b76
4 changed files with 93 additions and 3 deletions
|
|
@ -57,7 +57,7 @@ func (storeInstance *Store) Compact(options CompactOptions) core.Result {
|
|||
}
|
||||
|
||||
rows, err := storeInstance.sqliteDatabase.Query(
|
||||
"SELECT entry_id, bucket_name, measurement, fields_json, tags_json, committed_at FROM "+journalEntriesTableName+" WHERE archived_at IS NULL AND committed_at < ? ORDER BY committed_at",
|
||||
"SELECT entry_id, bucket_name, measurement, fields_json, tags_json, committed_at FROM "+journalEntriesTableName+" WHERE archived_at IS NULL AND committed_at < ? ORDER BY committed_at, entry_id",
|
||||
options.Before.UnixMilli(),
|
||||
)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -126,3 +126,60 @@ func TestCompact_Compact_Good_NoRows(t *testing.T) {
|
|||
require.True(t, result.OK, "compact failed: %v", result.Value)
|
||||
assert.Equal(t, "", result.Value)
|
||||
}
|
||||
|
||||
func TestCompact_Compact_Good_DeterministicOrderingForSameTimestamp(t *testing.T) {
|
||||
outputDirectory := useArchiveOutputDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
require.NoError(t, ensureJournalSchema(storeInstance.sqliteDatabase))
|
||||
|
||||
committedAt := time.Now().Add(-48 * time.Hour).UnixMilli()
|
||||
require.NoError(t, insertJournalEntry(
|
||||
storeInstance.sqliteDatabase,
|
||||
"events",
|
||||
"session-b",
|
||||
`{"like":2}`,
|
||||
`{"workspace":"session-b"}`,
|
||||
committedAt,
|
||||
))
|
||||
require.NoError(t, insertJournalEntry(
|
||||
storeInstance.sqliteDatabase,
|
||||
"events",
|
||||
"session-a",
|
||||
`{"like":1}`,
|
||||
`{"workspace":"session-a"}`,
|
||||
committedAt,
|
||||
))
|
||||
|
||||
result := storeInstance.Compact(CompactOptions{
|
||||
Before: time.Now().Add(-24 * time.Hour),
|
||||
Output: outputDirectory,
|
||||
Format: "gzip",
|
||||
})
|
||||
require.True(t, result.OK, "compact failed: %v", result.Value)
|
||||
|
||||
archivePath, ok := result.Value.(string)
|
||||
require.True(t, ok, "unexpected archive path type: %T", result.Value)
|
||||
|
||||
archiveData := requireCoreReadBytes(t, archivePath)
|
||||
reader, err := gzip.NewReader(bytes.NewReader(archiveData))
|
||||
require.NoError(t, err)
|
||||
defer reader.Close()
|
||||
|
||||
decompressedData, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
lines := core.Split(core.Trim(string(decompressedData)), "\n")
|
||||
require.Len(t, lines, 2)
|
||||
|
||||
firstArchivedRow := make(map[string]any)
|
||||
unmarshalResult := core.JSONUnmarshalString(lines[0], &firstArchivedRow)
|
||||
require.True(t, unmarshalResult.OK, "archive line unmarshal failed: %v", unmarshalResult.Value)
|
||||
assert.Equal(t, "session-b", firstArchivedRow["measurement"])
|
||||
|
||||
secondArchivedRow := make(map[string]any)
|
||||
unmarshalResult = core.JSONUnmarshalString(lines[1], &secondArchivedRow)
|
||||
require.True(t, unmarshalResult.OK, "archive line unmarshal failed: %v", unmarshalResult.Value)
|
||||
assert.Equal(t, "session-a", secondArchivedRow["measurement"])
|
||||
}
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ func (storeInstance *Store) QueryJournal(flux string) core.Result {
|
|||
trimmedQuery := core.Trim(flux)
|
||||
if trimmedQuery == "" {
|
||||
return storeInstance.queryJournalRows(
|
||||
"SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM " + journalEntriesTableName + " WHERE archived_at IS NULL ORDER BY committed_at",
|
||||
"SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM " + journalEntriesTableName + " WHERE archived_at IS NULL ORDER BY committed_at, entry_id",
|
||||
)
|
||||
}
|
||||
if isRawSQLJournalQuery(trimmedQuery) {
|
||||
|
|
@ -206,7 +206,7 @@ func (storeInstance *Store) queryJournalFlux(flux string) (string, []any, error)
|
|||
}
|
||||
}
|
||||
|
||||
queryBuilder.WriteString(" ORDER BY committed_at")
|
||||
queryBuilder.WriteString(" ORDER BY committed_at, entry_id")
|
||||
return queryBuilder.String(), queryArguments, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -153,6 +153,39 @@ func TestJournal_QueryJournal_Good_BucketFilter(t *testing.T) {
|
|||
assert.Equal(t, "events", rows[0]["bucket_name"])
|
||||
}
|
||||
|
||||
func TestJournal_QueryJournal_Good_DeterministicOrderingForSameTimestamp(t *testing.T) {
|
||||
storeInstance, err := New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
require.NoError(t, ensureJournalSchema(storeInstance.sqliteDatabase))
|
||||
|
||||
committedAt := time.Date(2026, 3, 30, 12, 0, 0, 0, time.UTC).UnixMilli()
|
||||
require.NoError(t, insertJournalEntry(
|
||||
storeInstance.sqliteDatabase,
|
||||
"events",
|
||||
"session-b",
|
||||
`{"like":2}`,
|
||||
`{"workspace":"session-b"}`,
|
||||
committedAt,
|
||||
))
|
||||
require.NoError(t, insertJournalEntry(
|
||||
storeInstance.sqliteDatabase,
|
||||
"events",
|
||||
"session-a",
|
||||
`{"like":1}`,
|
||||
`{"workspace":"session-a"}`,
|
||||
committedAt,
|
||||
))
|
||||
|
||||
rows := requireResultRows(
|
||||
t,
|
||||
storeInstance.QueryJournal(""),
|
||||
)
|
||||
require.Len(t, rows, 2)
|
||||
assert.Equal(t, "session-b", rows[0]["measurement"])
|
||||
assert.Equal(t, "session-a", rows[1]["measurement"])
|
||||
}
|
||||
|
||||
func TestJournal_QueryJournal_Good_AbsoluteRangeWithStop(t *testing.T) {
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue