diff --git a/pkg/agentic/commands_platform.go b/pkg/agentic/commands_platform.go index 7f0b061..bd106a4 100644 --- a/pkg/agentic/commands_platform.go +++ b/pkg/agentic/commands_platform.go @@ -22,6 +22,7 @@ func (s *PrepSubsystem) registerPlatformCommands() { c.Command("fleet/task/complete", core.Command{Description: "Complete a fleet task and report findings", Action: s.cmdFleetTaskComplete}) c.Command("fleet/task/next", core.Command{Description: "Ask the platform for the next fleet task", Action: s.cmdFleetTaskNext}) c.Command("fleet/stats", core.Command{Description: "Show fleet activity statistics", Action: s.cmdFleetStats}) + c.Command("fleet/events", core.Command{Description: "Read the next fleet event from the platform SSE stream", Action: s.cmdFleetEvents}) c.Command("credits/award", core.Command{Description: "Award credits to a fleet node", Action: s.cmdCreditsAward}) c.Command("credits/balance", core.Command{Description: "Show credit balance for a fleet node", Action: s.cmdCreditsBalance}) @@ -369,6 +370,46 @@ func (s *PrepSubsystem) cmdFleetStats(options core.Options) core.Result { return core.Result{OK: true} } +func (s *PrepSubsystem) cmdFleetEvents(options core.Options) core.Result { + result := s.handleFleetEvents(s.commandContext(), normalisePlatformCommandOptions(options)) + if !result.OK { + err := commandResultError("agentic.cmdFleetEvents", result) + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + output, ok := result.Value.(FleetEventOutput) + if !ok { + err := core.E("agentic.cmdFleetEvents", "invalid fleet event output", nil) + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + core.Print(nil, "event: %s", output.Event.Event) + if output.Event.Type != "" && output.Event.Type != output.Event.Event { + core.Print(nil, "type: %s", output.Event.Type) + } + if output.Event.AgentID != "" { + core.Print(nil, "agent: %s", output.Event.AgentID) + } + if output.Event.Repo != "" { + core.Print(nil, "repo: %s", output.Event.Repo) + } + if output.Event.Branch != "" { + core.Print(nil, "branch: %s", output.Event.Branch) + } + if output.Event.TaskID > 0 { + core.Print(nil, "task id: %d", output.Event.TaskID) + } + if output.Event.Status != "" { + core.Print(nil, "status: %s", output.Event.Status) + } + if len(output.Event.Payload) > 0 { + core.Print(nil, "payload: %s", core.JSONMarshalString(output.Event.Payload)) + } + return core.Result{OK: true} +} + func (s *PrepSubsystem) cmdCreditsAward(options core.Options) core.Result { agentID := optionStringValue(options, "agent_id", "agent-id", "_arg") taskType := optionStringValue(options, "task_type", "task-type") diff --git a/pkg/agentic/commands_platform_test.go b/pkg/agentic/commands_platform_test.go index f072c52..5fc9450 100644 --- a/pkg/agentic/commands_platform_test.go +++ b/pkg/agentic/commands_platform_test.go @@ -54,6 +54,23 @@ func TestCommandsplatform_CmdFleetNodes_Good(t *testing.T) { assert.Contains(t, output, "total: 1") } +func TestCommandsplatform_CmdFleetEvents_Good(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte("data: {\"event\":\"task.assigned\",\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\",\"branch\":\"dev\"}\n\n")) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + output := captureStdout(t, func() { + result := subsystem.cmdFleetEvents(core.NewOptions(core.Option{Key: "_arg", Value: "charon"})) + assert.True(t, result.OK) + }) + + assert.Contains(t, output, "event: task.assigned") + assert.Contains(t, output, "agent: charon") + assert.Contains(t, output, "repo: core/go-io") +} + func TestCommandsplatform_CmdSyncStatus_Good(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(`{"data":{"agent_id":"charon","status":"online","last_push_at":"2026-03-31T08:00:00Z"}}`)) diff --git a/pkg/agentic/platform.go b/pkg/agentic/platform.go index 0326742..b304890 100644 --- a/pkg/agentic/platform.go +++ b/pkg/agentic/platform.go @@ -3,7 +3,10 @@ package agentic import ( + "bufio" "context" + "io" + "net/http" "time" core "dappco.re/go/core" @@ -59,6 +62,26 @@ type FleetStats struct { ComputeHours int `json:"compute_hours"` } +// event := agentic.FleetEvent{Type: "task.assigned", AgentID: "charon", Repo: "core/go-io"} +type FleetEvent struct { + Type string `json:"type,omitempty"` + Event string `json:"event,omitempty"` + AgentID string `json:"agent_id,omitempty"` + TaskID int `json:"task_id,omitempty"` + Repo string `json:"repo,omitempty"` + Branch string `json:"branch,omitempty"` + Status string `json:"status,omitempty"` + ReceivedAt string `json:"received_at,omitempty"` + Payload map[string]any `json:"payload,omitempty"` +} + +// out := agentic.FleetEventOutput{Success: true, Event: agentic.FleetEvent{Type: "task.assigned"}} +type FleetEventOutput struct { + Success bool `json:"success"` + Event FleetEvent `json:"event"` + Raw string `json:"raw,omitempty"` +} + // status := agentic.SyncStatusOutput{AgentID: "charon", Status: "online"} type SyncStatusOutput struct { AgentID string `json:"agent_id"` @@ -353,6 +376,25 @@ func (s *PrepSubsystem) handleFleetStats(ctx context.Context, options core.Optio return core.Result{Value: parseFleetStats(payloadResourceMap(result.Value.(map[string]any), "stats")), OK: true} } +// result := c.Action("agentic.fleet.events").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"})) +func (s *PrepSubsystem) handleFleetEvents(ctx context.Context, options core.Options) core.Result { + agentID := optionStringValue(options, "agent_id", "agent-id", "_arg") + path := "/v1/fleet/events" + path = appendQueryParam(path, "agent_id", agentID) + + result := s.platformEventPayload(ctx, "agentic.fleet.events", path) + if !result.OK { + return result + } + + output, err := parseFleetEventOutput(result.Value.(map[string]any)) + if err != nil { + return core.Result{Value: err, OK: false} + } + + return core.Result{Value: output, OK: true} +} + // result := c.Action("agentic.credits.award").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"})) func (s *PrepSubsystem) handleCreditsAward(ctx context.Context, options core.Options) core.Result { agentID := optionStringValue(options, "agent_id", "agent-id", "_arg") @@ -494,6 +536,50 @@ func (s *PrepSubsystem) platformPayload(ctx context.Context, action, method, pat return core.Result{Value: payload, OK: true} } +func (s *PrepSubsystem) platformEventPayload(ctx context.Context, action, path string) core.Result { + token := s.syncToken() + if token == "" { + return core.Result{Value: core.E(action, "no platform API key configured", nil), OK: false} + } + + request, err := http.NewRequestWithContext(ctx, "GET", core.Concat(s.syncAPIURL(), path), nil) + if err != nil { + return core.Result{Value: core.E(action, "create request", err), OK: false} + } + request.Header.Set("Accept", "text/event-stream, application/json") + request.Header.Set("Authorization", core.Concat("Bearer ", token)) + + response, err := defaultClient.Do(request) + if err != nil { + return core.Result{Value: core.E(action, "request failed", err), OK: false} + } + defer response.Body.Close() + + if response.StatusCode >= 400 { + readResult := core.ReadAll(response.Body) + if !readResult.OK { + return core.Result{Value: core.E(action, core.Sprintf("HTTP %d", response.StatusCode), nil), OK: false} + } + body := core.Trim(readResult.Value.(string)) + if body == "" { + return core.Result{Value: core.E(action, core.Sprintf("HTTP %d", response.StatusCode), nil), OK: false} + } + return core.Result{Value: platformResultError(action, core.Result{Value: body, OK: false}), OK: false} + } + + eventBody, readErr := readFleetEventBody(response.Body) + if readErr != nil { + return core.Result{Value: core.E(action, "failed to read event stream", readErr), OK: false} + } + + payload := s.eventPayloadValue(eventBody) + if len(payload) == 0 { + return core.Result{Value: core.E(action, "no fleet event payload returned", nil), OK: false} + } + + return core.Result{Value: payload, OK: true} +} + func platformResultError(action string, result core.Result) error { if err, ok := result.Value.(error); ok && err != nil { return core.E(action, "platform request failed", err) @@ -664,6 +750,114 @@ func parseFleetStats(values map[string]any) FleetStats { } } +func parseFleetEventOutput(values map[string]any) (FleetEventOutput, error) { + eventValues := payloadResourceMap(values, "event") + if len(eventValues) == 0 { + eventValues = values + } + + event := parseFleetEvent(eventValues) + if event.Event == "" && event.Type == "" && event.AgentID == "" && event.Repo == "" { + return FleetEventOutput{}, core.E("parseFleetEventOutput", "fleet event payload is empty", nil) + } + + return FleetEventOutput{ + Success: true, + Event: event, + Raw: core.Trim(stringValue(values["raw"])), + }, nil +} + +func (s *PrepSubsystem) eventPayloadValue(body string) map[string]any { + trimmed := core.Trim(body) + if trimmed == "" { + return nil + } + + if core.HasPrefix(trimmed, "data: ") { + trimmed = core.Trim(core.TrimPrefix(trimmed, "data: ")) + } + + var payload map[string]any + if parseResult := core.JSONUnmarshalString(trimmed, &payload); parseResult.OK { + if payload != nil { + payload["raw"] = trimmed + } + return payload + } + + return map[string]any{ + "raw": trimmed, + } +} + +func readFleetEventBody(body io.ReadCloser) (string, error) { + reader := bufio.NewReader(body) + rawLines := make([]string, 0, 4) + dataLines := make([]string, 0, 4) + + for { + line, err := reader.ReadString('\n') + if line != "" { + trimmed := core.Trim(line) + if trimmed != "" { + rawLines = append(rawLines, trimmed) + if core.HasPrefix(trimmed, "data:") { + dataLines = append(dataLines, core.Trim(core.TrimPrefix(trimmed, "data:"))) + } + } else if len(dataLines) > 0 { + return core.Join("\n", dataLines...), nil + } + } + + if err == io.EOF { + if len(dataLines) > 0 { + return core.Join("\n", dataLines...), nil + } + return core.Join("\n", rawLines...), nil + } + if err != nil { + return "", err + } + } +} + +func parseFleetEvent(values map[string]any) FleetEvent { + payload := map[string]any{} + for key, value := range values { + switch key { + case "type", "event", "agent_id", "task_id", "repo", "branch", "status", "received_at": + continue + default: + payload[key] = value + } + } + if len(payload) == 0 { + payload = nil + } + + event := FleetEvent{ + Type: stringValue(values["type"]), + Event: stringValue(values["event"]), + AgentID: stringValue(values["agent_id"]), + TaskID: intValue(values["task_id"]), + Repo: stringValue(values["repo"]), + Branch: stringValue(values["branch"]), + Status: stringValue(values["status"]), + ReceivedAt: stringValue(values["received_at"]), + Payload: payload, + } + + if event.Event == "" { + event.Event = event.Type + } + if event.Type == "" { + event.Type = event.Event + } + + return event +} + func parseCreditEntry(values map[string]any) CreditEntry { return CreditEntry{ ID: intValue(values["id"]), diff --git a/pkg/agentic/platform_test.go b/pkg/agentic/platform_test.go index badcbf1..c1ce9da 100644 --- a/pkg/agentic/platform_test.go +++ b/pkg/agentic/platform_test.go @@ -150,6 +150,57 @@ func TestPlatform_HandleFleetNextTask_Ugly(t *testing.T) { assert.Nil(t, task) } +func TestPlatform_HandleFleetEvents_Good(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/v1/fleet/events", r.URL.Path) + require.Equal(t, "charon", r.URL.Query().Get("agent_id")) + require.Equal(t, "Bearer secret-token", r.Header.Get("Authorization")) + _, _ = w.Write([]byte("data: {\"event\":\"task.assigned\",\"type\":\"task.assigned\",\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\",\"branch\":\"dev\",\"status\":\"assigned\"}\n\n")) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleFleetEvents(context.Background(), core.NewOptions( + core.Option{Key: "agent_id", Value: "charon"}, + )) + require.True(t, result.OK) + + output, ok := result.Value.(FleetEventOutput) + require.True(t, ok) + assert.Equal(t, "task.assigned", output.Event.Event) + assert.Equal(t, "charon", output.Event.AgentID) + assert.Equal(t, 9, output.Event.TaskID) + assert.Equal(t, "core/go-io", output.Event.Repo) + assert.Equal(t, "dev", output.Event.Branch) +} + +func TestPlatform_HandleFleetEvents_Bad(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(`{"error":"event stream unavailable"}`)) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleFleetEvents(context.Background(), core.NewOptions( + core.Option{Key: "agent_id", Value: "charon"}, + )) + assert.False(t, result.OK) +} + +func TestPlatform_HandleFleetEvents_Ugly(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte("data: not-json\n\n")) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleFleetEvents(context.Background(), core.NewOptions( + core.Option{Key: "agent_id", Value: "charon"}, + )) + assert.False(t, result.OK) +} + func TestPlatform_HandleSyncStatus_Good(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.Equal(t, "/v1/agent/status", r.URL.Path) diff --git a/pkg/agentic/platform_tools.go b/pkg/agentic/platform_tools.go index 5d2a1c6..3b3c10c 100644 --- a/pkg/agentic/platform_tools.go +++ b/pkg/agentic/platform_tools.go @@ -105,6 +105,11 @@ func (s *PrepSubsystem) registerPlatformTools(server *mcp.Server) { Description: "Read aggregate fleet activity statistics.", }, s.fleetStatsTool) + mcp.AddTool(server, &mcp.Tool{ + Name: "agentic_fleet_events", + Description: "Read the next fleet event from the platform SSE stream.", + }, s.fleetEventsTool) + mcp.AddTool(server, &mcp.Tool{ Name: "agentic_credits_award", Description: "Award credits to a fleet node for completed work.", @@ -329,6 +334,20 @@ func (s *PrepSubsystem) fleetStatsTool(ctx context.Context, _ *mcp.CallToolReque return nil, output, nil } +func (s *PrepSubsystem) fleetEventsTool(ctx context.Context, _ *mcp.CallToolRequest, input struct { + AgentID string `json:"agent_id,omitempty"` +}) (*mcp.CallToolResult, FleetEventOutput, error) { + result := s.handleFleetEvents(ctx, platformOptions(core.Option{Key: "agent_id", Value: input.AgentID})) + if !result.OK { + return nil, FleetEventOutput{}, resultErrorValue("agentic.fleet.events", result) + } + output, ok := result.Value.(FleetEventOutput) + if !ok { + return nil, FleetEventOutput{}, core.E("agentic.fleet.events", "invalid fleet event output", nil) + } + return nil, output, nil +} + func (s *PrepSubsystem) creditsAwardTool(ctx context.Context, _ *mcp.CallToolRequest, input struct { AgentID string `json:"agent_id"` TaskType string `json:"task_type"` diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 2d3262c..824ec5b 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -88,7 +88,7 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { case "agentic.status", "agentic.scan", "agentic.watch", "agentic.issue.get", "agentic.issue.list", "agentic.pr.get", "agentic.pr.list", "agentic.prompt", "agentic.task", "agentic.flow", "agentic.persona", - "agentic.sync.status", "agentic.fleet.nodes", "agentic.fleet.stats", + "agentic.sync.status", "agentic.fleet.nodes", "agentic.fleet.stats", "agentic.fleet.events", "agentic.credits.balance", "agentic.credits.history", "agentic.subscription.detect", "agentic.subscription.budget": return core.Entitlement{Allowed: true, Unlimited: true} @@ -139,6 +139,8 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { c.Action("agent.fleet.task.next", s.handleFleetNextTask).Description = "Ask the platform for the next fleet task" c.Action("agentic.fleet.stats", s.handleFleetStats).Description = "Get fleet activity statistics" c.Action("agent.fleet.stats", s.handleFleetStats).Description = "Get fleet activity statistics" + c.Action("agentic.fleet.events", s.handleFleetEvents).Description = "Read fleet task assignment events from the platform API" + c.Action("agent.fleet.events", s.handleFleetEvents).Description = "Read fleet task assignment events from the platform API" c.Action("agentic.credits.award", s.handleCreditsAward).Description = "Award credits to a fleet node" c.Action("agent.credits.award", s.handleCreditsAward).Description = "Award credits to a fleet node" c.Action("agentic.credits.balance", s.handleCreditsBalance).Description = "Get credit balance for a fleet node" diff --git a/pkg/agentic/prep_test.go b/pkg/agentic/prep_test.go index 44899e4..5cd187c 100644 --- a/pkg/agentic/prep_test.go +++ b/pkg/agentic/prep_test.go @@ -533,6 +533,8 @@ func TestPrep_OnStartup_Good_RegistersPlatformActionAliases(t *testing.T) { assert.True(t, c.Action("agent.fleet.register").Exists()) assert.True(t, c.Action("agentic.credits.balance").Exists()) assert.True(t, c.Action("agent.credits.balance").Exists()) + assert.True(t, c.Action("agentic.fleet.events").Exists()) + assert.True(t, c.Action("agent.fleet.events").Exists()) assert.True(t, c.Action("agentic.subscription.budget.update").Exists()) assert.True(t, c.Action("agent.subscription.budget.update").Exists()) } @@ -550,6 +552,7 @@ func TestPrep_OnStartup_Good_RegistersPlatformCommandAlias(t *testing.T) { assert.Contains(t, c.Commands(), "auth/revoke") assert.Contains(t, c.Commands(), "subscription/budget/update") assert.Contains(t, c.Commands(), "subscription/update-budget") + assert.Contains(t, c.Commands(), "fleet/events") } func TestPrep_OnStartup_Bad(t *testing.T) {