[agent/codex:gpt-5.4] Implement docs/RFC-STORE.md using docs/RFC-CORE-008-AGENT-EX... #29

Merged
Virgil merged 1 commit from agent/implement-docs-rfc-store-md-using-docs-r into dev 2026-03-30 21:08:06 +00:00
6 changed files with 123 additions and 2 deletions

View file

@ -199,9 +199,21 @@ func jsonString(value any, operation, message string) (string, error) {
}
func fluxStartTime(value string) (time.Time, error) {
value = core.Trim(value)
if value == "" {
return time.Time{}, core.E("store.fluxStartTime", "range value is empty", nil)
}
value = firstString(core.Split(value, ","))
value = core.Trim(value)
if core.HasPrefix(value, "time(v:") && core.HasSuffix(value, ")") {
value = core.Trim(core.TrimSuffix(core.TrimPrefix(value, "time(v:"), ")"))
}
if core.HasPrefix(value, `"`) && core.HasSuffix(value, `"`) {
value = core.TrimSuffix(core.TrimPrefix(value, `"`), `"`)
}
if value == "now()" {
return time.Now(), nil
}
if core.HasSuffix(value, "d") {
days, err := strconv.Atoi(core.TrimSuffix(value, "d"))
if err != nil {
@ -210,10 +222,14 @@ func fluxStartTime(value string) (time.Time, error) {
return time.Now().Add(time.Duration(days) * 24 * time.Hour), nil
}
lookback, err := time.ParseDuration(value)
if err == nil {
return time.Now().Add(lookback), nil
}
parsedTime, err := time.Parse(time.RFC3339Nano, value)
if err != nil {
return time.Time{}, err
}
return time.Now().Add(lookback), nil
return parsedTime, nil
}
func quotedSubmatch(pattern *regexp.Regexp, value string) string {

View file

@ -2,6 +2,7 @@ package store
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -58,6 +59,39 @@ func TestJournal_QueryJournal_Good_FluxFilters(t *testing.T) {
assert.Equal(t, float64(2), fields["like"])
}
func TestJournal_QueryJournal_Good_AbsoluteRangeWithStop(t *testing.T) {
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
require.NoError(t, err)
defer storeInstance.Close()
require.True(t,
storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK,
)
require.True(t,
storeInstance.CommitToJournal("session-b", map[string]any{"like": 2}, map[string]string{"workspace": "session-b"}).OK,
)
_, err = storeInstance.database.Exec(
"UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?",
time.Date(2026, 3, 29, 12, 0, 0, 0, time.UTC).UnixMilli(),
"session-a",
)
require.NoError(t, err)
_, err = storeInstance.database.Exec(
"UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?",
time.Date(2026, 3, 30, 12, 0, 0, 0, time.UTC).UnixMilli(),
"session-b",
)
require.NoError(t, err)
rows := requireResultRows(
t,
storeInstance.QueryJournal(`from(bucket: "events") |> range(start: "2026-03-30T00:00:00Z", stop: now())`),
)
require.Len(t, rows, 1)
assert.Equal(t, "session-b", rows[0]["measurement"])
}
func TestJournal_CommitToJournal_Bad_EmptyMeasurement(t *testing.T) {
storeInstance, err := New(":memory:")
require.NoError(t, err)

View file

@ -128,7 +128,7 @@ func (storeInstance *Store) Get(group, key string) (string, error) {
return "", core.E("store.Get", "query row", err)
}
if expiresAt.Valid && expiresAt.Int64 <= time.Now().UnixMilli() {
if _, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND "+entryKeyColumn+" = ?", group, key); err != nil {
if err := storeInstance.Delete(group, key); err != nil {
return "", core.E("store.Get", "delete expired row", err)
}
return "", core.E("store.Get", core.Concat(group, "/", key), NotFoundError)

View file

@ -1088,6 +1088,33 @@ func TestStore_SetWithTTL_Good_ExpiresOnGet(t *testing.T) {
assert.True(t, core.Is(err, NotFoundError), "expired key should be NotFoundError")
}
func TestStore_SetWithTTL_Good_ExpiresOnGetEmitsDeleteEvent(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()
watcher := storeInstance.Watch("g", "ephemeral")
defer storeInstance.Unwatch(watcher)
require.NoError(t, storeInstance.SetWithTTL("g", "ephemeral", "gone-soon", 1*time.Millisecond))
<-watcher.Events
time.Sleep(5 * time.Millisecond)
_, err := storeInstance.Get("g", "ephemeral")
require.Error(t, err)
assert.True(t, core.Is(err, NotFoundError), "expired key should be NotFoundError")
select {
case event := <-watcher.Events:
assert.Equal(t, EventDelete, event.Type)
assert.Equal(t, "g", event.Group)
assert.Equal(t, "ephemeral", event.Key)
assert.Empty(t, event.Value)
case <-time.After(time.Second):
t.Fatal("timed out waiting for lazy expiry delete event")
}
}
func TestStore_SetWithTTL_Good_ExcludedFromCount(t *testing.T) {
storeInstance, _ := New(":memory:")
defer storeInstance.Close()

View file

@ -294,6 +294,13 @@ func (storeInstance *Store) commitWorkspaceAggregate(workspaceName string, field
return core.E("store.Workspace.Commit", "commit transaction", err)
}
committed = true
storeInstance.notify(Event{
Type: EventSet,
Group: workspaceSummaryGroup(workspaceName),
Key: "summary",
Value: fieldsJSON,
Timestamp: time.Now(),
})
return nil
}

View file

@ -2,6 +2,7 @@ package store
import (
"testing"
"time"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
@ -110,6 +111,42 @@ func TestWorkspace_Commit_Good_JournalAndSummary(t *testing.T) {
assert.Equal(t, "scroll-session", tags["workspace"])
}
func TestWorkspace_Commit_Good_EmitsSummaryEvent(t *testing.T) {
useWorkspaceStateDirectory(t)
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
require.NoError(t, err)
defer storeInstance.Close()
watcher := storeInstance.Watch(workspaceSummaryGroup("scroll-session"), "summary")
defer storeInstance.Unwatch(watcher)
workspace, err := storeInstance.NewWorkspace("scroll-session")
require.NoError(t, err)
require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"}))
require.NoError(t, workspace.Put("profile_match", map[string]any{"user": "@charlie"}))
result := workspace.Commit()
require.True(t, result.OK, "workspace commit failed: %v", result.Value)
select {
case event := <-watcher.Events:
assert.Equal(t, EventSet, event.Type)
assert.Equal(t, workspaceSummaryGroup("scroll-session"), event.Group)
assert.Equal(t, "summary", event.Key)
assert.False(t, event.Timestamp.IsZero())
summary := make(map[string]any)
summaryResult := core.JSONUnmarshalString(event.Value, &summary)
require.True(t, summaryResult.OK, "summary event unmarshal failed: %v", summaryResult.Value)
assert.Equal(t, float64(1), summary["like"])
assert.Equal(t, float64(1), summary["profile_match"])
case <-time.After(time.Second):
t.Fatal("timed out waiting for workspace summary event")
}
}
func TestWorkspace_Discard_Good_Idempotent(t *testing.T) {
useWorkspaceStateDirectory(t)