[agent/codex:gpt-5.4] Implement docs/RFC-STORE.md using docs/RFC-CORE-008-AGENT-EX... #29
6 changed files with 123 additions and 2 deletions
18
journal.go
18
journal.go
|
|
@ -199,9 +199,21 @@ func jsonString(value any, operation, message string) (string, error) {
|
|||
}
|
||||
|
||||
func fluxStartTime(value string) (time.Time, error) {
|
||||
value = core.Trim(value)
|
||||
if value == "" {
|
||||
return time.Time{}, core.E("store.fluxStartTime", "range value is empty", nil)
|
||||
}
|
||||
value = firstString(core.Split(value, ","))
|
||||
value = core.Trim(value)
|
||||
if core.HasPrefix(value, "time(v:") && core.HasSuffix(value, ")") {
|
||||
value = core.Trim(core.TrimSuffix(core.TrimPrefix(value, "time(v:"), ")"))
|
||||
}
|
||||
if core.HasPrefix(value, `"`) && core.HasSuffix(value, `"`) {
|
||||
value = core.TrimSuffix(core.TrimPrefix(value, `"`), `"`)
|
||||
}
|
||||
if value == "now()" {
|
||||
return time.Now(), nil
|
||||
}
|
||||
if core.HasSuffix(value, "d") {
|
||||
days, err := strconv.Atoi(core.TrimSuffix(value, "d"))
|
||||
if err != nil {
|
||||
|
|
@ -210,10 +222,14 @@ func fluxStartTime(value string) (time.Time, error) {
|
|||
return time.Now().Add(time.Duration(days) * 24 * time.Hour), nil
|
||||
}
|
||||
lookback, err := time.ParseDuration(value)
|
||||
if err == nil {
|
||||
return time.Now().Add(lookback), nil
|
||||
}
|
||||
parsedTime, err := time.Parse(time.RFC3339Nano, value)
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
return time.Now().Add(lookback), nil
|
||||
return parsedTime, nil
|
||||
}
|
||||
|
||||
func quotedSubmatch(pattern *regexp.Regexp, value string) string {
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package store
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
|
@ -58,6 +59,39 @@ func TestJournal_QueryJournal_Good_FluxFilters(t *testing.T) {
|
|||
assert.Equal(t, float64(2), fields["like"])
|
||||
}
|
||||
|
||||
func TestJournal_QueryJournal_Good_AbsoluteRangeWithStop(t *testing.T) {
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.True(t,
|
||||
storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK,
|
||||
)
|
||||
require.True(t,
|
||||
storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK,
|
||||
)
|
||||
|
||||
_, err = storeInstance.database.Exec(
|
||||
"UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?",
|
||||
time.Date(2026, 3, 29, 12, 0, 0, 0, time.UTC).UnixMilli(),
|
||||
"session-a",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
_, err = storeInstance.database.Exec(
|
||||
"UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?",
|
||||
time.Date(2026, 3, 30, 12, 0, 0, 0, time.UTC).UnixMilli(),
|
||||
"session-b",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
rows := requireResultRows(
|
||||
t,
|
||||
storeInstance.QueryJournal(`from(bucket: "events") |> range(start: "2026-03-30T00:00:00Z", stop: now())`),
|
||||
)
|
||||
require.Len(t, rows, 1)
|
||||
assert.Equal(t, "session-b", rows[0]["measurement"])
|
||||
}
|
||||
|
||||
func TestJournal_CommitToJournal_Bad_EmptyMeasurement(t *testing.T) {
|
||||
storeInstance, err := New(":memory:")
|
||||
require.NoError(t, err)
|
||||
|
|
|
|||
2
store.go
2
store.go
|
|
@ -128,7 +128,7 @@ func (storeInstance *Store) Get(group, key string) (string, error) {
|
|||
return "", core.E("store.Get", "query row", err)
|
||||
}
|
||||
if expiresAt.Valid && expiresAt.Int64 <= time.Now().UnixMilli() {
|
||||
if _, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND "+entryKeyColumn+" = ?", group, key); err != nil {
|
||||
if err := storeInstance.Delete(group, key); err != nil {
|
||||
return "", core.E("store.Get", "delete expired row", err)
|
||||
}
|
||||
return "", core.E("store.Get", core.Concat(group, "/", key), NotFoundError)
|
||||
|
|
|
|||
|
|
@ -1088,6 +1088,33 @@ func TestStore_SetWithTTL_Good_ExpiresOnGet(t *testing.T) {
|
|||
assert.True(t, core.Is(err, NotFoundError), "expired key should be NotFoundError")
|
||||
}
|
||||
|
||||
func TestStore_SetWithTTL_Good_ExpiresOnGetEmitsDeleteEvent(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("g", "ephemeral")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
|
||||
require.NoError(t, storeInstance.SetWithTTL("g", "ephemeral", "gone-soon", 1*time.Millisecond))
|
||||
<-watcher.Events
|
||||
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
_, err := storeInstance.Get("g", "ephemeral")
|
||||
require.Error(t, err)
|
||||
assert.True(t, core.Is(err, NotFoundError), "expired key should be NotFoundError")
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
assert.Equal(t, EventDelete, event.Type)
|
||||
assert.Equal(t, "g", event.Group)
|
||||
assert.Equal(t, "ephemeral", event.Key)
|
||||
assert.Empty(t, event.Value)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for lazy expiry delete event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_SetWithTTL_Good_ExcludedFromCount(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
|
|
|||
|
|
@ -294,6 +294,13 @@ func (storeInstance *Store) commitWorkspaceAggregate(workspaceName string, field
|
|||
return core.E("store.Workspace.Commit", "commit transaction", err)
|
||||
}
|
||||
committed = true
|
||||
storeInstance.notify(Event{
|
||||
Type: EventSet,
|
||||
Group: workspaceSummaryGroup(workspaceName),
|
||||
Key: "summary",
|
||||
Value: fieldsJSON,
|
||||
Timestamp: time.Now(),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package store
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
|
@ -110,6 +111,42 @@ func TestWorkspace_Commit_Good_JournalAndSummary(t *testing.T) {
|
|||
assert.Equal(t, "scroll-session", tags["workspace"])
|
||||
}
|
||||
|
||||
func TestWorkspace_Commit_Good_EmitsSummaryEvent(t *testing.T) {
|
||||
useWorkspaceStateDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch(workspaceSummaryGroup("scroll-session"), "summary")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
|
||||
workspace, err := storeInstance.NewWorkspace("scroll-session")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"}))
|
||||
require.NoError(t, workspace.Put("profile_match", map[string]any{"user": "@charlie"}))
|
||||
|
||||
result := workspace.Commit()
|
||||
require.True(t, result.OK, "workspace commit failed: %v", result.Value)
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
assert.Equal(t, EventSet, event.Type)
|
||||
assert.Equal(t, workspaceSummaryGroup("scroll-session"), event.Group)
|
||||
assert.Equal(t, "summary", event.Key)
|
||||
assert.False(t, event.Timestamp.IsZero())
|
||||
|
||||
summary := make(map[string]any)
|
||||
summaryResult := core.JSONUnmarshalString(event.Value, &summary)
|
||||
require.True(t, summaryResult.OK, "summary event unmarshal failed: %v", summaryResult.Value)
|
||||
assert.Equal(t, float64(1), summary["like"])
|
||||
assert.Equal(t, float64(1), summary["profile_match"])
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for workspace summary event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWorkspace_Discard_Good_Idempotent(t *testing.T) {
|
||||
useWorkspaceStateDirectory(t)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue