diff --git a/pkg/agentic/commands_platform.go b/pkg/agentic/commands_platform.go index bd106a4..f725916 100644 --- a/pkg/agentic/commands_platform.go +++ b/pkg/agentic/commands_platform.go @@ -22,7 +22,7 @@ func (s *PrepSubsystem) registerPlatformCommands() { c.Command("fleet/task/complete", core.Command{Description: "Complete a fleet task and report findings", Action: s.cmdFleetTaskComplete}) c.Command("fleet/task/next", core.Command{Description: "Ask the platform for the next fleet task", Action: s.cmdFleetTaskNext}) c.Command("fleet/stats", core.Command{Description: "Show fleet activity statistics", Action: s.cmdFleetStats}) - c.Command("fleet/events", core.Command{Description: "Read the next fleet event from the platform SSE stream", Action: s.cmdFleetEvents}) + c.Command("fleet/events", core.Command{Description: "Read the next fleet event from the platform SSE stream, falling back to polling when needed", Action: s.cmdFleetEvents}) c.Command("credits/award", core.Command{Description: "Award credits to a fleet node", Action: s.cmdCreditsAward}) c.Command("credits/balance", core.Command{Description: "Show credit balance for a fleet node", Action: s.cmdCreditsBalance}) diff --git a/pkg/agentic/commands_platform_test.go b/pkg/agentic/commands_platform_test.go index 5fc9450..517c08e 100644 --- a/pkg/agentic/commands_platform_test.go +++ b/pkg/agentic/commands_platform_test.go @@ -56,7 +56,12 @@ func TestCommandsplatform_CmdFleetNodes_Good(t *testing.T) { func TestCommandsplatform_CmdFleetEvents_Good(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - _, _ = w.Write([]byte("data: {\"event\":\"task.assigned\",\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\",\"branch\":\"dev\"}\n\n")) + switch r.URL.Path { + case "/v1/fleet/events": + _, _ = w.Write([]byte("data: {\"event\":\"task.assigned\",\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\",\"branch\":\"dev\"}\n\n")) + default: + w.WriteHeader(http.StatusNotFound) + } })) defer server.Close() @@ -71,6 +76,32 @@ func TestCommandsplatform_CmdFleetEvents_Good(t *testing.T) { assert.Contains(t, output, "repo: core/go-io") } +func TestCommandsplatform_CmdFleetEvents_Good_FallbackToTaskNext(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/v1/fleet/events": + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(`{"error":"event stream unavailable"}`)) + case "/v1/fleet/task/next": + _, _ = w.Write([]byte(`{"data":{"id":11,"repo":"core/go-io","branch":"dev","task":"Fix tests","template":"coding","agent_model":"codex","status":"assigned"}}`)) + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + output := captureStdout(t, func() { + result := subsystem.cmdFleetEvents(core.NewOptions(core.Option{Key: "_arg", Value: "charon"})) + assert.True(t, result.OK) + }) + + assert.Contains(t, output, "event: task.assigned") + assert.Contains(t, output, "agent: charon") + assert.Contains(t, output, "task id: 11") + assert.Contains(t, output, "repo: core/go-io") +} + func TestCommandsplatform_CmdSyncStatus_Good(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(`{"data":{"agent_id":"charon","status":"online","last_push_at":"2026-03-31T08:00:00Z"}}`)) diff --git a/pkg/agentic/platform.go b/pkg/agentic/platform.go index b304890..2674163 100644 --- a/pkg/agentic/platform.go +++ b/pkg/agentic/platform.go @@ -383,16 +383,36 @@ func (s *PrepSubsystem) handleFleetEvents(ctx context.Context, options core.Opti path = appendQueryParam(path, "agent_id", agentID) result := s.platformEventPayload(ctx, "agentic.fleet.events", path) - if !result.OK { - return result + if result.OK { + output, err := parseFleetEventOutput(result.Value.(map[string]any)) + if err == nil { + return core.Result{Value: output, OK: true} + } } - output, err := parseFleetEventOutput(result.Value.(map[string]any)) - if err != nil { - return core.Result{Value: err, OK: false} + fallbackResult := s.handleFleetNextTask(ctx, core.NewOptions( + core.Option{Key: "agent_id", Value: agentID}, + core.Option{Key: "capabilities", Value: optionStringSliceValue(options, "capabilities")}, + )) + if !fallbackResult.OK { + if result.OK { + return result + } + return fallbackResult } - return core.Result{Value: output, OK: true} + task, ok := fallbackResult.Value.(*FleetTask) + if !ok { + return core.Result{Value: core.E("agentic.fleet.events", "invalid fleet task output", nil), OK: false} + } + if task == nil { + if result.OK { + return core.Result{Value: core.E("agentic.fleet.events", "no fleet event payload returned", nil), OK: false} + } + return core.Result{Value: core.E("agentic.fleet.events", "no fleet task available", nil), OK: false} + } + + return core.Result{Value: fleetEventOutputFromTask(agentID, task), OK: true} } // result := c.Action("agentic.credits.award").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"})) @@ -858,6 +878,35 @@ func parseFleetEvent(values map[string]any) FleetEvent { return event } +func fleetEventOutputFromTask(agentID string, task *FleetTask) FleetEventOutput { + payload := map[string]any{ + "task_id": task.ID, + "repo": task.Repo, + "branch": task.Branch, + "task": task.Task, + "template": task.Template, + "agent_model": task.AgentModel, + "status": task.Status, + "source": "polling", + "fleet_node_id": task.FleetNodeID, + } + + return FleetEventOutput{ + Success: true, + Event: FleetEvent{ + Type: "task.assigned", + Event: "task.assigned", + AgentID: agentID, + TaskID: task.ID, + Repo: task.Repo, + Branch: task.Branch, + Status: "assigned", + Payload: payload, + }, + Raw: "polling fallback", + } +} + func parseCreditEntry(values map[string]any) CreditEntry { return CreditEntry{ ID: intValue(values["id"]), diff --git a/pkg/agentic/platform_test.go b/pkg/agentic/platform_test.go index c1ce9da..4dc89d0 100644 --- a/pkg/agentic/platform_test.go +++ b/pkg/agentic/platform_test.go @@ -174,10 +174,47 @@ func TestPlatform_HandleFleetEvents_Good(t *testing.T) { assert.Equal(t, "dev", output.Event.Branch) } -func TestPlatform_HandleFleetEvents_Bad(t *testing.T) { +func TestPlatform_HandleFleetEvents_Good_FallbackToTaskNext(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusServiceUnavailable) - _, _ = w.Write([]byte(`{"error":"event stream unavailable"}`)) + switch r.URL.Path { + case "/v1/fleet/events": + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(`{"error":"event stream unavailable"}`)) + case "/v1/fleet/task/next": + require.Equal(t, "charon", r.URL.Query().Get("agent_id")) + _, _ = w.Write([]byte(`{"data":{"id":12,"repo":"core/go-io","branch":"dev","task":"Fix tests","template":"coding","agent_model":"codex","status":"assigned"}}`)) + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleFleetEvents(context.Background(), core.NewOptions( + core.Option{Key: "agent_id", Value: "charon"}, + )) + require.True(t, result.OK) + + output, ok := result.Value.(FleetEventOutput) + require.True(t, ok) + assert.Equal(t, "task.assigned", output.Event.Event) + assert.Equal(t, "charon", output.Event.AgentID) + assert.Equal(t, 12, output.Event.TaskID) + assert.Equal(t, "core/go-io", output.Event.Repo) + assert.Equal(t, "dev", output.Event.Branch) +} + +func TestPlatform_HandleFleetEvents_Bad_NoTaskAvailable(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/v1/fleet/events": + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(`{"error":"event stream unavailable"}`)) + case "/v1/fleet/task/next": + _, _ = w.Write([]byte(`{"data":null}`)) + default: + w.WriteHeader(http.StatusNotFound) + } })) defer server.Close() @@ -190,7 +227,14 @@ func TestPlatform_HandleFleetEvents_Bad(t *testing.T) { func TestPlatform_HandleFleetEvents_Ugly(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - _, _ = w.Write([]byte("data: not-json\n\n")) + switch r.URL.Path { + case "/v1/fleet/events": + _, _ = w.Write([]byte("data: not-json\n\n")) + case "/v1/fleet/task/next": + _, _ = w.Write([]byte(`{"data":null}`)) + default: + w.WriteHeader(http.StatusNotFound) + } })) defer server.Close() diff --git a/pkg/agentic/platform_tools.go b/pkg/agentic/platform_tools.go index 3b3c10c..6b34c7c 100644 --- a/pkg/agentic/platform_tools.go +++ b/pkg/agentic/platform_tools.go @@ -107,7 +107,7 @@ func (s *PrepSubsystem) registerPlatformTools(server *mcp.Server) { mcp.AddTool(server, &mcp.Tool{ Name: "agentic_fleet_events", - Description: "Read the next fleet event from the platform SSE stream.", + Description: "Read the next fleet event from the platform SSE stream, falling back to polling when needed.", }, s.fleetEventsTool) mcp.AddTool(server, &mcp.Tool{