feat(agentic): add fleet node id to sync records

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-02 04:38:16 +00:00
parent ec5fdb3c67
commit 8efa9460bd
2 changed files with 42 additions and 21 deletions

View file

@ -10,8 +10,9 @@ import (
)
type SyncPushInput struct {
AgentID string `json:"agent_id,omitempty"`
Dispatches []map[string]any `json:"dispatches,omitempty"`
AgentID string `json:"agent_id,omitempty"`
FleetNodeID int `json:"fleet_node_id,omitempty"`
Dispatches []map[string]any `json:"dispatches,omitempty"`
}
type SyncPushOutput struct {
@ -20,8 +21,9 @@ type SyncPushOutput struct {
}
type SyncPullInput struct {
AgentID string `json:"agent_id,omitempty"`
Since string `json:"since,omitempty"`
AgentID string `json:"agent_id,omitempty"`
FleetNodeID int `json:"fleet_node_id,omitempty"`
Since string `json:"since,omitempty"`
}
type SyncPullOutput struct {
@ -32,7 +34,8 @@ type SyncPullOutput struct {
// record := agentic.SyncRecord{AgentID: "codex", Direction: "push", ItemsCount: 3, PayloadSize: 512, SyncedAt: "2026-03-31T12:00:00Z"}
type SyncRecord struct {
AgentID string `json:"agent_id"`
AgentID string `json:"agent_id,omitempty"`
FleetNodeID int `json:"fleet_node_id,omitempty"`
Direction string `json:"direction"`
PayloadSize int `json:"payload_size"`
ItemsCount int `json:"items_count"`
@ -40,9 +43,10 @@ type SyncRecord struct {
}
type syncQueuedPush struct {
AgentID string `json:"agent_id"`
Dispatches []map[string]any `json:"dispatches"`
QueuedAt time.Time `json:"queued_at"`
AgentID string `json:"agent_id"`
FleetNodeID int `json:"fleet_node_id,omitempty"`
Dispatches []map[string]any `json:"dispatches"`
QueuedAt time.Time `json:"queued_at"`
}
type syncStatusState struct {
@ -53,8 +57,9 @@ type syncStatusState struct {
// result := c.Action("agentic.sync.push").Run(ctx, core.NewOptions())
func (s *PrepSubsystem) handleSyncPush(ctx context.Context, options core.Options) core.Result {
output, err := s.syncPushInput(ctx, SyncPushInput{
AgentID: optionStringValue(options, "agent_id", "agent-id", "_arg"),
Dispatches: optionAnyMapSliceValue(options, "dispatches"),
AgentID: optionStringValue(options, "agent_id", "agent-id", "_arg"),
FleetNodeID: optionIntValue(options, "fleet_node_id", "fleet-node-id"),
Dispatches: optionAnyMapSliceValue(options, "dispatches"),
})
if err != nil {
return core.Result{Value: err, OK: false}
@ -65,8 +70,9 @@ func (s *PrepSubsystem) handleSyncPush(ctx context.Context, options core.Options
// result := c.Action("agentic.sync.pull").Run(ctx, core.NewOptions())
func (s *PrepSubsystem) handleSyncPull(ctx context.Context, options core.Options) core.Result {
output, err := s.syncPullInput(ctx, SyncPullInput{
AgentID: optionStringValue(options, "agent_id", "agent-id", "_arg"),
Since: optionStringValue(options, "since"),
AgentID: optionStringValue(options, "agent_id", "agent-id", "_arg"),
FleetNodeID: optionIntValue(options, "fleet_node_id", "fleet-node-id"),
Since: optionStringValue(options, "since"),
})
if err != nil {
return core.Result{Value: err, OK: false}
@ -91,9 +97,10 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput)
queuedPushes := readSyncQueue()
if len(dispatches) > 0 {
queuedPushes = append(queuedPushes, syncQueuedPush{
AgentID: agentID,
Dispatches: dispatches,
QueuedAt: time.Now(),
AgentID: agentID,
FleetNodeID: input.FleetNodeID,
Dispatches: dispatches,
QueuedAt: time.Now(),
})
}
if token == "" {
@ -117,7 +124,7 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput)
}
synced += len(queued.Dispatches)
recordSyncPush(time.Now())
recordSyncHistory("push", queued.AgentID, len(core.JSONMarshalString(map[string]any{
recordSyncHistory("push", queued.AgentID, queued.FleetNodeID, len(core.JSONMarshalString(map[string]any{
"agent_id": queued.AgentID,
"dispatches": queued.Dispatches,
})), len(queued.Dispatches), time.Now())
@ -161,7 +168,7 @@ func (s *PrepSubsystem) syncPullInput(ctx context.Context, input SyncPullInput)
contextData := syncContextPayload(response)
writeSyncContext(contextData)
recordSyncPull(time.Now())
recordSyncHistory("pull", agentID, len(result.Value.(string)), len(contextData), time.Now())
recordSyncHistory("pull", agentID, input.FleetNodeID, len(result.Value.(string)), len(contextData), time.Now())
return SyncPullOutput{
Success: true,
@ -390,7 +397,7 @@ func writeSyncRecords(records []SyncRecord) {
fs.WriteAtomic(syncRecordsPath(), core.JSONMarshalString(records))
}
func recordSyncHistory(direction, agentID string, payloadSize, itemsCount int, at time.Time) {
func recordSyncHistory(direction, agentID string, fleetNodeID, payloadSize, itemsCount int, at time.Time) {
direction = core.Trim(direction)
if direction == "" {
return
@ -398,6 +405,7 @@ func recordSyncHistory(direction, agentID string, payloadSize, itemsCount int, a
record := SyncRecord{
AgentID: core.Trim(agentID),
FleetNodeID: fleetNodeID,
Direction: direction,
PayloadSize: payloadSize,
ItemsCount: itemsCount,

View file

@ -261,12 +261,13 @@ func TestSync_RecordSyncHistory_Good(t *testing.T) {
t.Setenv("CORE_WORKSPACE", t.TempDir())
now := time.Date(2026, 3, 31, 12, 0, 0, 0, time.UTC)
recordSyncHistory("push", "codex", 256, 3, now)
recordSyncHistory("pull", "codex", 128, 1, now.Add(5*time.Minute))
recordSyncHistory("push", "codex", 7, 256, 3, now)
recordSyncHistory("pull", "codex", 7, 128, 1, now.Add(5*time.Minute))
records := readSyncRecords()
require.Len(t, records, 2)
assert.Equal(t, "codex", records[0].AgentID)
assert.Equal(t, 7, records[0].FleetNodeID)
assert.Equal(t, "push", records[0].Direction)
assert.Equal(t, 256, records[0].PayloadSize)
assert.Equal(t, 3, records[0].ItemsCount)
@ -275,13 +276,25 @@ func TestSync_RecordSyncHistory_Good(t *testing.T) {
assert.Equal(t, 1, records[1].ItemsCount)
}
func TestSync_RecordSyncHistory_Good_FleetNodeID(t *testing.T) {
t.Setenv("CORE_WORKSPACE", t.TempDir())
now := time.Date(2026, 3, 31, 12, 0, 0, 0, time.UTC)
recordSyncHistory("push", "charon", 42, 512, 2, now)
records := readSyncRecords()
require.Len(t, records, 1)
assert.Equal(t, 42, records[0].FleetNodeID)
assert.Equal(t, "charon", records[0].AgentID)
}
func TestSync_RecordSyncHistory_Bad_MissingFile(t *testing.T) {
t.Setenv("CORE_WORKSPACE", t.TempDir())
records := readSyncRecords()
require.Empty(t, records)
recordSyncHistory("", "codex", 64, 1, time.Now())
recordSyncHistory("", "codex", 0, 64, 1, time.Now())
records = readSyncRecords()
require.Empty(t, records)
}