From d61c74d7e0e3d35df9ec4c6b15617c4b63aab43e Mon Sep 17 00:00:00 2001 From: Virgil Date: Wed, 1 Apr 2026 23:46:08 +0000 Subject: [PATCH] feat(agentic): persist sync history records Co-Authored-By: Virgil --- pkg/agentic/sync.go | 60 ++++++++++++++++++++++++++++++++++++++++ pkg/agentic/sync_test.go | 38 +++++++++++++++++++++++++ 2 files changed, 98 insertions(+) diff --git a/pkg/agentic/sync.go b/pkg/agentic/sync.go index fa15082..165e9fd 100644 --- a/pkg/agentic/sync.go +++ b/pkg/agentic/sync.go @@ -30,6 +30,15 @@ type SyncPullOutput struct { Context []map[string]any `json:"context"` } +// record := agentic.SyncRecord{AgentID: "codex", Direction: "push", ItemsCount: 3, PayloadSize: 512, SyncedAt: "2026-03-31T12:00:00Z"} +type SyncRecord struct { + AgentID string `json:"agent_id"` + Direction string `json:"direction"` + PayloadSize int `json:"payload_size"` + ItemsCount int `json:"items_count"` + SyncedAt string `json:"synced_at"` +} + type syncQueuedPush struct { AgentID string `json:"agent_id"` Dispatches []map[string]any `json:"dispatches"` @@ -108,6 +117,10 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput) } synced += len(queued.Dispatches) recordSyncPush(time.Now()) + recordSyncHistory("push", queued.AgentID, len(core.JSONMarshalString(map[string]any{ + "agent_id": queued.AgentID, + "dispatches": queued.Dispatches, + })), len(queued.Dispatches), time.Now()) } writeSyncQueue(nil) @@ -148,6 +161,7 @@ func (s *PrepSubsystem) syncPullInput(ctx context.Context, input SyncPullInput) contextData := syncContextPayload(response) writeSyncContext(contextData) recordSyncPull(time.Now()) + recordSyncHistory("pull", agentID, len(result.Value.(string)), len(contextData), time.Now()) return SyncPullOutput{ Success: true, @@ -352,6 +366,52 @@ func writeSyncStatusState(state syncStatusState) { fs.WriteAtomic(syncStatusPath(), core.JSONMarshalString(state)) } +func syncRecordsPath() string { + return core.JoinPath(syncStateDir(), "records.json") +} + +func readSyncRecords() []SyncRecord { + var records []SyncRecord + result := fs.Read(syncRecordsPath()) + if !result.OK { + return records + } + + parseResult := core.JSONUnmarshalString(result.Value.(string), &records) + if !parseResult.OK { + return []SyncRecord{} + } + + return records +} + +func writeSyncRecords(records []SyncRecord) { + fs.EnsureDir(syncStateDir()) + fs.WriteAtomic(syncRecordsPath(), core.JSONMarshalString(records)) +} + +func recordSyncHistory(direction, agentID string, payloadSize, itemsCount int, at time.Time) { + direction = core.Trim(direction) + if direction == "" { + return + } + + record := SyncRecord{ + AgentID: core.Trim(agentID), + Direction: direction, + PayloadSize: payloadSize, + ItemsCount: itemsCount, + SyncedAt: at.UTC().Format(time.RFC3339), + } + + records := readSyncRecords() + records = append(records, record) + if len(records) > 100 { + records = records[len(records)-100:] + } + writeSyncRecords(records) +} + func recordSyncPush(at time.Time) { state := readSyncStatusState() state.LastPushAt = at diff --git a/pkg/agentic/sync_test.go b/pkg/agentic/sync_test.go index 87050b3..d2c31c3 100644 --- a/pkg/agentic/sync_test.go +++ b/pkg/agentic/sync_test.go @@ -257,6 +257,44 @@ func TestSync_HandleSyncPull_Good_SinceQuery(t *testing.T) { assert.Equal(t, "mem-2", output.Context[0]["id"]) } +func TestSync_RecordSyncHistory_Good(t *testing.T) { + t.Setenv("CORE_WORKSPACE", t.TempDir()) + + now := time.Date(2026, 3, 31, 12, 0, 0, 0, time.UTC) + recordSyncHistory("push", "codex", 256, 3, now) + recordSyncHistory("pull", "codex", 128, 1, now.Add(5*time.Minute)) + + records := readSyncRecords() + require.Len(t, records, 2) + assert.Equal(t, "codex", records[0].AgentID) + assert.Equal(t, "push", records[0].Direction) + assert.Equal(t, 256, records[0].PayloadSize) + assert.Equal(t, 3, records[0].ItemsCount) + assert.Equal(t, "2026-03-31T12:00:00Z", records[0].SyncedAt) + assert.Equal(t, "pull", records[1].Direction) + assert.Equal(t, 1, records[1].ItemsCount) +} + +func TestSync_RecordSyncHistory_Bad_MissingFile(t *testing.T) { + t.Setenv("CORE_WORKSPACE", t.TempDir()) + + records := readSyncRecords() + require.Empty(t, records) + + recordSyncHistory("", "codex", 64, 1, time.Now()) + records = readSyncRecords() + require.Empty(t, records) +} + +func TestSync_RecordSyncHistory_Ugly_CorruptFile(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + require.True(t, fs.WriteAtomic(syncRecordsPath(), "{not-json").OK) + + records := readSyncRecords() + require.Empty(t, records) +} + func TestSync_HandleSyncPush_Good_ReportMetadata(t *testing.T) { root := t.TempDir() t.Setenv("CORE_WORKSPACE", root)