diff --git a/pkg/agentic/sync.go b/pkg/agentic/sync.go index 165e9fd..310119c 100644 --- a/pkg/agentic/sync.go +++ b/pkg/agentic/sync.go @@ -10,8 +10,9 @@ import ( ) type SyncPushInput struct { - AgentID string `json:"agent_id,omitempty"` - Dispatches []map[string]any `json:"dispatches,omitempty"` + AgentID string `json:"agent_id,omitempty"` + FleetNodeID int `json:"fleet_node_id,omitempty"` + Dispatches []map[string]any `json:"dispatches,omitempty"` } type SyncPushOutput struct { @@ -20,8 +21,9 @@ type SyncPushOutput struct { } type SyncPullInput struct { - AgentID string `json:"agent_id,omitempty"` - Since string `json:"since,omitempty"` + AgentID string `json:"agent_id,omitempty"` + FleetNodeID int `json:"fleet_node_id,omitempty"` + Since string `json:"since,omitempty"` } type SyncPullOutput struct { @@ -32,7 +34,8 @@ type SyncPullOutput struct { // record := agentic.SyncRecord{AgentID: "codex", Direction: "push", ItemsCount: 3, PayloadSize: 512, SyncedAt: "2026-03-31T12:00:00Z"} type SyncRecord struct { - AgentID string `json:"agent_id"` + AgentID string `json:"agent_id,omitempty"` + FleetNodeID int `json:"fleet_node_id,omitempty"` Direction string `json:"direction"` PayloadSize int `json:"payload_size"` ItemsCount int `json:"items_count"` @@ -40,9 +43,10 @@ type SyncRecord struct { } type syncQueuedPush struct { - AgentID string `json:"agent_id"` - Dispatches []map[string]any `json:"dispatches"` - QueuedAt time.Time `json:"queued_at"` + AgentID string `json:"agent_id"` + FleetNodeID int `json:"fleet_node_id,omitempty"` + Dispatches []map[string]any `json:"dispatches"` + QueuedAt time.Time `json:"queued_at"` } type syncStatusState struct { @@ -53,8 +57,9 @@ type syncStatusState struct { // result := c.Action("agentic.sync.push").Run(ctx, core.NewOptions()) func (s *PrepSubsystem) handleSyncPush(ctx context.Context, options core.Options) core.Result { output, err := s.syncPushInput(ctx, SyncPushInput{ - AgentID: optionStringValue(options, "agent_id", "agent-id", "_arg"), - Dispatches: optionAnyMapSliceValue(options, "dispatches"), + AgentID: optionStringValue(options, "agent_id", "agent-id", "_arg"), + FleetNodeID: optionIntValue(options, "fleet_node_id", "fleet-node-id"), + Dispatches: optionAnyMapSliceValue(options, "dispatches"), }) if err != nil { return core.Result{Value: err, OK: false} @@ -65,8 +70,9 @@ func (s *PrepSubsystem) handleSyncPush(ctx context.Context, options core.Options // result := c.Action("agentic.sync.pull").Run(ctx, core.NewOptions()) func (s *PrepSubsystem) handleSyncPull(ctx context.Context, options core.Options) core.Result { output, err := s.syncPullInput(ctx, SyncPullInput{ - AgentID: optionStringValue(options, "agent_id", "agent-id", "_arg"), - Since: optionStringValue(options, "since"), + AgentID: optionStringValue(options, "agent_id", "agent-id", "_arg"), + FleetNodeID: optionIntValue(options, "fleet_node_id", "fleet-node-id"), + Since: optionStringValue(options, "since"), }) if err != nil { return core.Result{Value: err, OK: false} @@ -91,9 +97,10 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput) queuedPushes := readSyncQueue() if len(dispatches) > 0 { queuedPushes = append(queuedPushes, syncQueuedPush{ - AgentID: agentID, - Dispatches: dispatches, - QueuedAt: time.Now(), + AgentID: agentID, + FleetNodeID: input.FleetNodeID, + Dispatches: dispatches, + QueuedAt: time.Now(), }) } if token == "" { @@ -117,7 +124,7 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput) } synced += len(queued.Dispatches) recordSyncPush(time.Now()) - recordSyncHistory("push", queued.AgentID, len(core.JSONMarshalString(map[string]any{ + recordSyncHistory("push", queued.AgentID, queued.FleetNodeID, len(core.JSONMarshalString(map[string]any{ "agent_id": queued.AgentID, "dispatches": queued.Dispatches, })), len(queued.Dispatches), time.Now()) @@ -161,7 +168,7 @@ func (s *PrepSubsystem) syncPullInput(ctx context.Context, input SyncPullInput) contextData := syncContextPayload(response) writeSyncContext(contextData) recordSyncPull(time.Now()) - recordSyncHistory("pull", agentID, len(result.Value.(string)), len(contextData), time.Now()) + recordSyncHistory("pull", agentID, input.FleetNodeID, len(result.Value.(string)), len(contextData), time.Now()) return SyncPullOutput{ Success: true, @@ -390,7 +397,7 @@ func writeSyncRecords(records []SyncRecord) { fs.WriteAtomic(syncRecordsPath(), core.JSONMarshalString(records)) } -func recordSyncHistory(direction, agentID string, payloadSize, itemsCount int, at time.Time) { +func recordSyncHistory(direction, agentID string, fleetNodeID, payloadSize, itemsCount int, at time.Time) { direction = core.Trim(direction) if direction == "" { return @@ -398,6 +405,7 @@ func recordSyncHistory(direction, agentID string, payloadSize, itemsCount int, a record := SyncRecord{ AgentID: core.Trim(agentID), + FleetNodeID: fleetNodeID, Direction: direction, PayloadSize: payloadSize, ItemsCount: itemsCount, diff --git a/pkg/agentic/sync_test.go b/pkg/agentic/sync_test.go index d2c31c3..76351aa 100644 --- a/pkg/agentic/sync_test.go +++ b/pkg/agentic/sync_test.go @@ -261,12 +261,13 @@ func TestSync_RecordSyncHistory_Good(t *testing.T) { t.Setenv("CORE_WORKSPACE", t.TempDir()) now := time.Date(2026, 3, 31, 12, 0, 0, 0, time.UTC) - recordSyncHistory("push", "codex", 256, 3, now) - recordSyncHistory("pull", "codex", 128, 1, now.Add(5*time.Minute)) + recordSyncHistory("push", "codex", 7, 256, 3, now) + recordSyncHistory("pull", "codex", 7, 128, 1, now.Add(5*time.Minute)) records := readSyncRecords() require.Len(t, records, 2) assert.Equal(t, "codex", records[0].AgentID) + assert.Equal(t, 7, records[0].FleetNodeID) assert.Equal(t, "push", records[0].Direction) assert.Equal(t, 256, records[0].PayloadSize) assert.Equal(t, 3, records[0].ItemsCount) @@ -275,13 +276,25 @@ func TestSync_RecordSyncHistory_Good(t *testing.T) { assert.Equal(t, 1, records[1].ItemsCount) } +func TestSync_RecordSyncHistory_Good_FleetNodeID(t *testing.T) { + t.Setenv("CORE_WORKSPACE", t.TempDir()) + + now := time.Date(2026, 3, 31, 12, 0, 0, 0, time.UTC) + recordSyncHistory("push", "charon", 42, 512, 2, now) + + records := readSyncRecords() + require.Len(t, records, 1) + assert.Equal(t, 42, records[0].FleetNodeID) + assert.Equal(t, "charon", records[0].AgentID) +} + func TestSync_RecordSyncHistory_Bad_MissingFile(t *testing.T) { t.Setenv("CORE_WORKSPACE", t.TempDir()) records := readSyncRecords() require.Empty(t, records) - recordSyncHistory("", "codex", 64, 1, time.Now()) + recordSyncHistory("", "codex", 0, 64, 1, time.Now()) records = readSyncRecords() require.Empty(t, records) }