feat(agentic): add fleet event polling fallback

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-01 17:43:49 +00:00
parent dfaf14f061
commit a472afe4fd
5 changed files with 137 additions and 13 deletions

View file

@ -22,7 +22,7 @@ func (s *PrepSubsystem) registerPlatformCommands() {
c.Command("fleet/task/complete", core.Command{Description: "Complete a fleet task and report findings", Action: s.cmdFleetTaskComplete})
c.Command("fleet/task/next", core.Command{Description: "Ask the platform for the next fleet task", Action: s.cmdFleetTaskNext})
c.Command("fleet/stats", core.Command{Description: "Show fleet activity statistics", Action: s.cmdFleetStats})
c.Command("fleet/events", core.Command{Description: "Read the next fleet event from the platform SSE stream", Action: s.cmdFleetEvents})
c.Command("fleet/events", core.Command{Description: "Read the next fleet event from the platform SSE stream, falling back to polling when needed", Action: s.cmdFleetEvents})
c.Command("credits/award", core.Command{Description: "Award credits to a fleet node", Action: s.cmdCreditsAward})
c.Command("credits/balance", core.Command{Description: "Show credit balance for a fleet node", Action: s.cmdCreditsBalance})

View file

@ -56,7 +56,12 @@ func TestCommandsplatform_CmdFleetNodes_Good(t *testing.T) {
func TestCommandsplatform_CmdFleetEvents_Good(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("data: {\"event\":\"task.assigned\",\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\",\"branch\":\"dev\"}\n\n"))
switch r.URL.Path {
case "/v1/fleet/events":
_, _ = w.Write([]byte("data: {\"event\":\"task.assigned\",\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\",\"branch\":\"dev\"}\n\n"))
default:
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
@ -71,6 +76,32 @@ func TestCommandsplatform_CmdFleetEvents_Good(t *testing.T) {
assert.Contains(t, output, "repo: core/go-io")
}
func TestCommandsplatform_CmdFleetEvents_Good_FallbackToTaskNext(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/v1/fleet/events":
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(`{"error":"event stream unavailable"}`))
case "/v1/fleet/task/next":
_, _ = w.Write([]byte(`{"data":{"id":11,"repo":"core/go-io","branch":"dev","task":"Fix tests","template":"coding","agent_model":"codex","status":"assigned"}}`))
default:
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
output := captureStdout(t, func() {
result := subsystem.cmdFleetEvents(core.NewOptions(core.Option{Key: "_arg", Value: "charon"}))
assert.True(t, result.OK)
})
assert.Contains(t, output, "event: task.assigned")
assert.Contains(t, output, "agent: charon")
assert.Contains(t, output, "task id: 11")
assert.Contains(t, output, "repo: core/go-io")
}
func TestCommandsplatform_CmdSyncStatus_Good(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"data":{"agent_id":"charon","status":"online","last_push_at":"2026-03-31T08:00:00Z"}}`))

View file

@ -383,16 +383,36 @@ func (s *PrepSubsystem) handleFleetEvents(ctx context.Context, options core.Opti
path = appendQueryParam(path, "agent_id", agentID)
result := s.platformEventPayload(ctx, "agentic.fleet.events", path)
if !result.OK {
return result
if result.OK {
output, err := parseFleetEventOutput(result.Value.(map[string]any))
if err == nil {
return core.Result{Value: output, OK: true}
}
}
output, err := parseFleetEventOutput(result.Value.(map[string]any))
if err != nil {
return core.Result{Value: err, OK: false}
fallbackResult := s.handleFleetNextTask(ctx, core.NewOptions(
core.Option{Key: "agent_id", Value: agentID},
core.Option{Key: "capabilities", Value: optionStringSliceValue(options, "capabilities")},
))
if !fallbackResult.OK {
if result.OK {
return result
}
return fallbackResult
}
return core.Result{Value: output, OK: true}
task, ok := fallbackResult.Value.(*FleetTask)
if !ok {
return core.Result{Value: core.E("agentic.fleet.events", "invalid fleet task output", nil), OK: false}
}
if task == nil {
if result.OK {
return core.Result{Value: core.E("agentic.fleet.events", "no fleet event payload returned", nil), OK: false}
}
return core.Result{Value: core.E("agentic.fleet.events", "no fleet task available", nil), OK: false}
}
return core.Result{Value: fleetEventOutputFromTask(agentID, task), OK: true}
}
// result := c.Action("agentic.credits.award").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
@ -858,6 +878,35 @@ func parseFleetEvent(values map[string]any) FleetEvent {
return event
}
func fleetEventOutputFromTask(agentID string, task *FleetTask) FleetEventOutput {
payload := map[string]any{
"task_id": task.ID,
"repo": task.Repo,
"branch": task.Branch,
"task": task.Task,
"template": task.Template,
"agent_model": task.AgentModel,
"status": task.Status,
"source": "polling",
"fleet_node_id": task.FleetNodeID,
}
return FleetEventOutput{
Success: true,
Event: FleetEvent{
Type: "task.assigned",
Event: "task.assigned",
AgentID: agentID,
TaskID: task.ID,
Repo: task.Repo,
Branch: task.Branch,
Status: "assigned",
Payload: payload,
},
Raw: "polling fallback",
}
}
func parseCreditEntry(values map[string]any) CreditEntry {
return CreditEntry{
ID: intValue(values["id"]),

View file

@ -174,10 +174,47 @@ func TestPlatform_HandleFleetEvents_Good(t *testing.T) {
assert.Equal(t, "dev", output.Event.Branch)
}
func TestPlatform_HandleFleetEvents_Bad(t *testing.T) {
func TestPlatform_HandleFleetEvents_Good_FallbackToTaskNext(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(`{"error":"event stream unavailable"}`))
switch r.URL.Path {
case "/v1/fleet/events":
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(`{"error":"event stream unavailable"}`))
case "/v1/fleet/task/next":
require.Equal(t, "charon", r.URL.Query().Get("agent_id"))
_, _ = w.Write([]byte(`{"data":{"id":12,"repo":"core/go-io","branch":"dev","task":"Fix tests","template":"coding","agent_model":"codex","status":"assigned"}}`))
default:
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
result := subsystem.handleFleetEvents(context.Background(), core.NewOptions(
core.Option{Key: "agent_id", Value: "charon"},
))
require.True(t, result.OK)
output, ok := result.Value.(FleetEventOutput)
require.True(t, ok)
assert.Equal(t, "task.assigned", output.Event.Event)
assert.Equal(t, "charon", output.Event.AgentID)
assert.Equal(t, 12, output.Event.TaskID)
assert.Equal(t, "core/go-io", output.Event.Repo)
assert.Equal(t, "dev", output.Event.Branch)
}
func TestPlatform_HandleFleetEvents_Bad_NoTaskAvailable(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/v1/fleet/events":
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(`{"error":"event stream unavailable"}`))
case "/v1/fleet/task/next":
_, _ = w.Write([]byte(`{"data":null}`))
default:
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
@ -190,7 +227,14 @@ func TestPlatform_HandleFleetEvents_Bad(t *testing.T) {
func TestPlatform_HandleFleetEvents_Ugly(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("data: not-json\n\n"))
switch r.URL.Path {
case "/v1/fleet/events":
_, _ = w.Write([]byte("data: not-json\n\n"))
case "/v1/fleet/task/next":
_, _ = w.Write([]byte(`{"data":null}`))
default:
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()

View file

@ -107,7 +107,7 @@ func (s *PrepSubsystem) registerPlatformTools(server *mcp.Server) {
mcp.AddTool(server, &mcp.Tool{
Name: "agentic_fleet_events",
Description: "Read the next fleet event from the platform SSE stream.",
Description: "Read the next fleet event from the platform SSE stream, falling back to polling when needed.",
}, s.fleetEventsTool)
mcp.AddTool(server, &mcp.Tool{