feat(agentic): persist sync history records
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
3d8a65c3d4
commit
d61c74d7e0
2 changed files with 98 additions and 0 deletions
|
|
@ -30,6 +30,15 @@ type SyncPullOutput struct {
|
|||
Context []map[string]any `json:"context"`
|
||||
}
|
||||
|
||||
// SyncRecord captures one completed sync operation in the persisted history.
//
// Example:
//	record := agentic.SyncRecord{AgentID: "codex", Direction: "push", ItemsCount: 3, PayloadSize: 512, SyncedAt: "2026-03-31T12:00:00Z"}
type SyncRecord struct {
	// AgentID identifies the agent involved in the sync; stored trimmed and
	// may be empty.
	AgentID string `json:"agent_id"`
	// Direction is the transfer direction; this file records "push" and
	// "pull". NOTE(review): assumed closed set — confirm no other callers
	// pass different values.
	Direction string `json:"direction"`
	// PayloadSize is the length in bytes of the JSON payload that was synced.
	PayloadSize int `json:"payload_size"`
	// ItemsCount is the number of items transferred (dispatches on push,
	// context entries on pull).
	ItemsCount int `json:"items_count"`
	// SyncedAt is the completion time formatted as RFC 3339 in UTC.
	SyncedAt string `json:"synced_at"`
}
|
||||
|
||||
type syncQueuedPush struct {
|
||||
AgentID string `json:"agent_id"`
|
||||
Dispatches []map[string]any `json:"dispatches"`
|
||||
|
|
@ -108,6 +117,10 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput)
|
|||
}
|
||||
synced += len(queued.Dispatches)
|
||||
recordSyncPush(time.Now())
|
||||
recordSyncHistory("push", queued.AgentID, len(core.JSONMarshalString(map[string]any{
|
||||
"agent_id": queued.AgentID,
|
||||
"dispatches": queued.Dispatches,
|
||||
})), len(queued.Dispatches), time.Now())
|
||||
}
|
||||
|
||||
writeSyncQueue(nil)
|
||||
|
|
@ -148,6 +161,7 @@ func (s *PrepSubsystem) syncPullInput(ctx context.Context, input SyncPullInput)
|
|||
contextData := syncContextPayload(response)
|
||||
writeSyncContext(contextData)
|
||||
recordSyncPull(time.Now())
|
||||
recordSyncHistory("pull", agentID, len(result.Value.(string)), len(contextData), time.Now())
|
||||
|
||||
return SyncPullOutput{
|
||||
Success: true,
|
||||
|
|
@ -352,6 +366,52 @@ func writeSyncStatusState(state syncStatusState) {
|
|||
fs.WriteAtomic(syncStatusPath(), core.JSONMarshalString(state))
|
||||
}
|
||||
|
||||
func syncRecordsPath() string {
|
||||
return core.JoinPath(syncStateDir(), "records.json")
|
||||
}
|
||||
|
||||
func readSyncRecords() []SyncRecord {
|
||||
var records []SyncRecord
|
||||
result := fs.Read(syncRecordsPath())
|
||||
if !result.OK {
|
||||
return records
|
||||
}
|
||||
|
||||
parseResult := core.JSONUnmarshalString(result.Value.(string), &records)
|
||||
if !parseResult.OK {
|
||||
return []SyncRecord{}
|
||||
}
|
||||
|
||||
return records
|
||||
}
|
||||
|
||||
func writeSyncRecords(records []SyncRecord) {
|
||||
fs.EnsureDir(syncStateDir())
|
||||
fs.WriteAtomic(syncRecordsPath(), core.JSONMarshalString(records))
|
||||
}
|
||||
|
||||
func recordSyncHistory(direction, agentID string, payloadSize, itemsCount int, at time.Time) {
|
||||
direction = core.Trim(direction)
|
||||
if direction == "" {
|
||||
return
|
||||
}
|
||||
|
||||
record := SyncRecord{
|
||||
AgentID: core.Trim(agentID),
|
||||
Direction: direction,
|
||||
PayloadSize: payloadSize,
|
||||
ItemsCount: itemsCount,
|
||||
SyncedAt: at.UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
records := readSyncRecords()
|
||||
records = append(records, record)
|
||||
if len(records) > 100 {
|
||||
records = records[len(records)-100:]
|
||||
}
|
||||
writeSyncRecords(records)
|
||||
}
|
||||
|
||||
func recordSyncPush(at time.Time) {
|
||||
state := readSyncStatusState()
|
||||
state.LastPushAt = at
|
||||
|
|
|
|||
|
|
@ -257,6 +257,44 @@ func TestSync_HandleSyncPull_Good_SinceQuery(t *testing.T) {
|
|||
assert.Equal(t, "mem-2", output.Context[0]["id"])
|
||||
}
|
||||
|
||||
func TestSync_RecordSyncHistory_Good(t *testing.T) {
|
||||
t.Setenv("CORE_WORKSPACE", t.TempDir())
|
||||
|
||||
now := time.Date(2026, 3, 31, 12, 0, 0, 0, time.UTC)
|
||||
recordSyncHistory("push", "codex", 256, 3, now)
|
||||
recordSyncHistory("pull", "codex", 128, 1, now.Add(5*time.Minute))
|
||||
|
||||
records := readSyncRecords()
|
||||
require.Len(t, records, 2)
|
||||
assert.Equal(t, "codex", records[0].AgentID)
|
||||
assert.Equal(t, "push", records[0].Direction)
|
||||
assert.Equal(t, 256, records[0].PayloadSize)
|
||||
assert.Equal(t, 3, records[0].ItemsCount)
|
||||
assert.Equal(t, "2026-03-31T12:00:00Z", records[0].SyncedAt)
|
||||
assert.Equal(t, "pull", records[1].Direction)
|
||||
assert.Equal(t, 1, records[1].ItemsCount)
|
||||
}
|
||||
|
||||
func TestSync_RecordSyncHistory_Bad_MissingFile(t *testing.T) {
|
||||
t.Setenv("CORE_WORKSPACE", t.TempDir())
|
||||
|
||||
records := readSyncRecords()
|
||||
require.Empty(t, records)
|
||||
|
||||
recordSyncHistory("", "codex", 64, 1, time.Now())
|
||||
records = readSyncRecords()
|
||||
require.Empty(t, records)
|
||||
}
|
||||
|
||||
func TestSync_RecordSyncHistory_Ugly_CorruptFile(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
require.True(t, fs.WriteAtomic(syncRecordsPath(), "{not-json").OK)
|
||||
|
||||
records := readSyncRecords()
|
||||
require.Empty(t, records)
|
||||
}
|
||||
|
||||
func TestSync_HandleSyncPush_Good_ReportMetadata(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue