feat(agentic): add fleet event SSE surface
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
800007907a
commit
57ee930717
7 changed files with 328 additions and 1 deletions
|
|
@ -22,6 +22,7 @@ func (s *PrepSubsystem) registerPlatformCommands() {
|
|||
c.Command("fleet/task/complete", core.Command{Description: "Complete a fleet task and report findings", Action: s.cmdFleetTaskComplete})
|
||||
c.Command("fleet/task/next", core.Command{Description: "Ask the platform for the next fleet task", Action: s.cmdFleetTaskNext})
|
||||
c.Command("fleet/stats", core.Command{Description: "Show fleet activity statistics", Action: s.cmdFleetStats})
|
||||
c.Command("fleet/events", core.Command{Description: "Read the next fleet event from the platform SSE stream", Action: s.cmdFleetEvents})
|
||||
|
||||
c.Command("credits/award", core.Command{Description: "Award credits to a fleet node", Action: s.cmdCreditsAward})
|
||||
c.Command("credits/balance", core.Command{Description: "Show credit balance for a fleet node", Action: s.cmdCreditsBalance})
|
||||
|
|
@ -369,6 +370,46 @@ func (s *PrepSubsystem) cmdFleetStats(options core.Options) core.Result {
|
|||
return core.Result{OK: true}
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) cmdFleetEvents(options core.Options) core.Result {
|
||||
result := s.handleFleetEvents(s.commandContext(), normalisePlatformCommandOptions(options))
|
||||
if !result.OK {
|
||||
err := commandResultError("agentic.cmdFleetEvents", result)
|
||||
core.Print(nil, "error: %v", err)
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
|
||||
output, ok := result.Value.(FleetEventOutput)
|
||||
if !ok {
|
||||
err := core.E("agentic.cmdFleetEvents", "invalid fleet event output", nil)
|
||||
core.Print(nil, "error: %v", err)
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
|
||||
core.Print(nil, "event: %s", output.Event.Event)
|
||||
if output.Event.Type != "" && output.Event.Type != output.Event.Event {
|
||||
core.Print(nil, "type: %s", output.Event.Type)
|
||||
}
|
||||
if output.Event.AgentID != "" {
|
||||
core.Print(nil, "agent: %s", output.Event.AgentID)
|
||||
}
|
||||
if output.Event.Repo != "" {
|
||||
core.Print(nil, "repo: %s", output.Event.Repo)
|
||||
}
|
||||
if output.Event.Branch != "" {
|
||||
core.Print(nil, "branch: %s", output.Event.Branch)
|
||||
}
|
||||
if output.Event.TaskID > 0 {
|
||||
core.Print(nil, "task id: %d", output.Event.TaskID)
|
||||
}
|
||||
if output.Event.Status != "" {
|
||||
core.Print(nil, "status: %s", output.Event.Status)
|
||||
}
|
||||
if len(output.Event.Payload) > 0 {
|
||||
core.Print(nil, "payload: %s", core.JSONMarshalString(output.Event.Payload))
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) cmdCreditsAward(options core.Options) core.Result {
|
||||
agentID := optionStringValue(options, "agent_id", "agent-id", "_arg")
|
||||
taskType := optionStringValue(options, "task_type", "task-type")
|
||||
|
|
|
|||
|
|
@ -54,6 +54,23 @@ func TestCommandsplatform_CmdFleetNodes_Good(t *testing.T) {
|
|||
assert.Contains(t, output, "total: 1")
|
||||
}
|
||||
|
||||
func TestCommandsplatform_CmdFleetEvents_Good(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte("data: {\"event\":\"task.assigned\",\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\",\"branch\":\"dev\"}\n\n"))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
|
||||
output := captureStdout(t, func() {
|
||||
result := subsystem.cmdFleetEvents(core.NewOptions(core.Option{Key: "_arg", Value: "charon"}))
|
||||
assert.True(t, result.OK)
|
||||
})
|
||||
|
||||
assert.Contains(t, output, "event: task.assigned")
|
||||
assert.Contains(t, output, "agent: charon")
|
||||
assert.Contains(t, output, "repo: core/go-io")
|
||||
}
|
||||
|
||||
func TestCommandsplatform_CmdSyncStatus_Good(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(`{"data":{"agent_id":"charon","status":"online","last_push_at":"2026-03-31T08:00:00Z"}}`))
|
||||
|
|
|
|||
|
|
@ -3,7 +3,10 @@
|
|||
package agentic
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
|
|
@ -59,6 +62,26 @@ type FleetStats struct {
|
|||
ComputeHours int `json:"compute_hours"`
|
||||
}
|
||||
|
||||
// event := agentic.FleetEvent{Type: "task.assigned", AgentID: "charon", Repo: "core/go-io"}
|
||||
type FleetEvent struct {
|
||||
Type string `json:"type,omitempty"`
|
||||
Event string `json:"event,omitempty"`
|
||||
AgentID string `json:"agent_id,omitempty"`
|
||||
TaskID int `json:"task_id,omitempty"`
|
||||
Repo string `json:"repo,omitempty"`
|
||||
Branch string `json:"branch,omitempty"`
|
||||
Status string `json:"status,omitempty"`
|
||||
ReceivedAt string `json:"received_at,omitempty"`
|
||||
Payload map[string]any `json:"payload,omitempty"`
|
||||
}
|
||||
|
||||
// out := agentic.FleetEventOutput{Success: true, Event: agentic.FleetEvent{Type: "task.assigned"}}
|
||||
type FleetEventOutput struct {
|
||||
Success bool `json:"success"`
|
||||
Event FleetEvent `json:"event"`
|
||||
Raw string `json:"raw,omitempty"`
|
||||
}
|
||||
|
||||
// status := agentic.SyncStatusOutput{AgentID: "charon", Status: "online"}
|
||||
type SyncStatusOutput struct {
|
||||
AgentID string `json:"agent_id"`
|
||||
|
|
@ -353,6 +376,25 @@ func (s *PrepSubsystem) handleFleetStats(ctx context.Context, options core.Optio
|
|||
return core.Result{Value: parseFleetStats(payloadResourceMap(result.Value.(map[string]any), "stats")), OK: true}
|
||||
}
|
||||
|
||||
// result := c.Action("agentic.fleet.events").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
|
||||
func (s *PrepSubsystem) handleFleetEvents(ctx context.Context, options core.Options) core.Result {
|
||||
agentID := optionStringValue(options, "agent_id", "agent-id", "_arg")
|
||||
path := "/v1/fleet/events"
|
||||
path = appendQueryParam(path, "agent_id", agentID)
|
||||
|
||||
result := s.platformEventPayload(ctx, "agentic.fleet.events", path)
|
||||
if !result.OK {
|
||||
return result
|
||||
}
|
||||
|
||||
output, err := parseFleetEventOutput(result.Value.(map[string]any))
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
// result := c.Action("agentic.credits.award").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
|
||||
func (s *PrepSubsystem) handleCreditsAward(ctx context.Context, options core.Options) core.Result {
|
||||
agentID := optionStringValue(options, "agent_id", "agent-id", "_arg")
|
||||
|
|
@ -494,6 +536,50 @@ func (s *PrepSubsystem) platformPayload(ctx context.Context, action, method, pat
|
|||
return core.Result{Value: payload, OK: true}
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) platformEventPayload(ctx context.Context, action, path string) core.Result {
|
||||
token := s.syncToken()
|
||||
if token == "" {
|
||||
return core.Result{Value: core.E(action, "no platform API key configured", nil), OK: false}
|
||||
}
|
||||
|
||||
request, err := http.NewRequestWithContext(ctx, "GET", core.Concat(s.syncAPIURL(), path), nil)
|
||||
if err != nil {
|
||||
return core.Result{Value: core.E(action, "create request", err), OK: false}
|
||||
}
|
||||
request.Header.Set("Accept", "text/event-stream, application/json")
|
||||
request.Header.Set("Authorization", core.Concat("Bearer ", token))
|
||||
|
||||
response, err := defaultClient.Do(request)
|
||||
if err != nil {
|
||||
return core.Result{Value: core.E(action, "request failed", err), OK: false}
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
if response.StatusCode >= 400 {
|
||||
readResult := core.ReadAll(response.Body)
|
||||
if !readResult.OK {
|
||||
return core.Result{Value: core.E(action, core.Sprintf("HTTP %d", response.StatusCode), nil), OK: false}
|
||||
}
|
||||
body := core.Trim(readResult.Value.(string))
|
||||
if body == "" {
|
||||
return core.Result{Value: core.E(action, core.Sprintf("HTTP %d", response.StatusCode), nil), OK: false}
|
||||
}
|
||||
return core.Result{Value: platformResultError(action, core.Result{Value: body, OK: false}), OK: false}
|
||||
}
|
||||
|
||||
eventBody, readErr := readFleetEventBody(response.Body)
|
||||
if readErr != nil {
|
||||
return core.Result{Value: core.E(action, "failed to read event stream", readErr), OK: false}
|
||||
}
|
||||
|
||||
payload := s.eventPayloadValue(eventBody)
|
||||
if len(payload) == 0 {
|
||||
return core.Result{Value: core.E(action, "no fleet event payload returned", nil), OK: false}
|
||||
}
|
||||
|
||||
return core.Result{Value: payload, OK: true}
|
||||
}
|
||||
|
||||
func platformResultError(action string, result core.Result) error {
|
||||
if err, ok := result.Value.(error); ok && err != nil {
|
||||
return core.E(action, "platform request failed", err)
|
||||
|
|
@ -664,6 +750,114 @@ func parseFleetStats(values map[string]any) FleetStats {
|
|||
}
|
||||
}
|
||||
|
||||
func parseFleetEventOutput(values map[string]any) (FleetEventOutput, error) {
|
||||
eventValues := payloadResourceMap(values, "event")
|
||||
if len(eventValues) == 0 {
|
||||
eventValues = values
|
||||
}
|
||||
|
||||
event := parseFleetEvent(eventValues)
|
||||
if event.Event == "" && event.Type == "" && event.AgentID == "" && event.Repo == "" {
|
||||
return FleetEventOutput{}, core.E("parseFleetEventOutput", "fleet event payload is empty", nil)
|
||||
}
|
||||
|
||||
return FleetEventOutput{
|
||||
Success: true,
|
||||
Event: event,
|
||||
Raw: core.Trim(stringValue(values["raw"])),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) eventPayloadValue(body string) map[string]any {
|
||||
trimmed := core.Trim(body)
|
||||
if trimmed == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if core.HasPrefix(trimmed, "data: ") {
|
||||
trimmed = core.Trim(core.TrimPrefix(trimmed, "data: "))
|
||||
}
|
||||
|
||||
var payload map[string]any
|
||||
if parseResult := core.JSONUnmarshalString(trimmed, &payload); parseResult.OK {
|
||||
if payload != nil {
|
||||
payload["raw"] = trimmed
|
||||
}
|
||||
return payload
|
||||
}
|
||||
|
||||
return map[string]any{
|
||||
"raw": trimmed,
|
||||
}
|
||||
}
|
||||
|
||||
func readFleetEventBody(body io.ReadCloser) (string, error) {
|
||||
reader := bufio.NewReader(body)
|
||||
rawLines := make([]string, 0, 4)
|
||||
dataLines := make([]string, 0, 4)
|
||||
|
||||
for {
|
||||
line, err := reader.ReadString('\n')
|
||||
if line != "" {
|
||||
trimmed := core.Trim(line)
|
||||
if trimmed != "" {
|
||||
rawLines = append(rawLines, trimmed)
|
||||
if core.HasPrefix(trimmed, "data:") {
|
||||
dataLines = append(dataLines, core.Trim(core.TrimPrefix(trimmed, "data:")))
|
||||
}
|
||||
} else if len(dataLines) > 0 {
|
||||
return core.Join("\n", dataLines...), nil
|
||||
}
|
||||
}
|
||||
|
||||
if err == io.EOF {
|
||||
if len(dataLines) > 0 {
|
||||
return core.Join("\n", dataLines...), nil
|
||||
}
|
||||
return core.Join("\n", rawLines...), nil
|
||||
}
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func parseFleetEvent(values map[string]any) FleetEvent {
|
||||
payload := map[string]any{}
|
||||
for key, value := range values {
|
||||
switch key {
|
||||
case "type", "event", "agent_id", "task_id", "repo", "branch", "status", "received_at":
|
||||
continue
|
||||
default:
|
||||
payload[key] = value
|
||||
}
|
||||
}
|
||||
if len(payload) == 0 {
|
||||
payload = nil
|
||||
}
|
||||
|
||||
event := FleetEvent{
|
||||
Type: stringValue(values["type"]),
|
||||
Event: stringValue(values["event"]),
|
||||
AgentID: stringValue(values["agent_id"]),
|
||||
TaskID: intValue(values["task_id"]),
|
||||
Repo: stringValue(values["repo"]),
|
||||
Branch: stringValue(values["branch"]),
|
||||
Status: stringValue(values["status"]),
|
||||
ReceivedAt: stringValue(values["received_at"]),
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
if event.Event == "" {
|
||||
event.Event = event.Type
|
||||
}
|
||||
if event.Type == "" {
|
||||
event.Type = event.Event
|
||||
}
|
||||
|
||||
return event
|
||||
}
|
||||
|
||||
func parseCreditEntry(values map[string]any) CreditEntry {
|
||||
return CreditEntry{
|
||||
ID: intValue(values["id"]),
|
||||
|
|
|
|||
|
|
@ -150,6 +150,57 @@ func TestPlatform_HandleFleetNextTask_Ugly(t *testing.T) {
|
|||
assert.Nil(t, task)
|
||||
}
|
||||
|
||||
func TestPlatform_HandleFleetEvents_Good(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
require.Equal(t, "/v1/fleet/events", r.URL.Path)
|
||||
require.Equal(t, "charon", r.URL.Query().Get("agent_id"))
|
||||
require.Equal(t, "Bearer secret-token", r.Header.Get("Authorization"))
|
||||
_, _ = w.Write([]byte("data: {\"event\":\"task.assigned\",\"type\":\"task.assigned\",\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\",\"branch\":\"dev\",\"status\":\"assigned\"}\n\n"))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
|
||||
result := subsystem.handleFleetEvents(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "agent_id", Value: "charon"},
|
||||
))
|
||||
require.True(t, result.OK)
|
||||
|
||||
output, ok := result.Value.(FleetEventOutput)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, "task.assigned", output.Event.Event)
|
||||
assert.Equal(t, "charon", output.Event.AgentID)
|
||||
assert.Equal(t, 9, output.Event.TaskID)
|
||||
assert.Equal(t, "core/go-io", output.Event.Repo)
|
||||
assert.Equal(t, "dev", output.Event.Branch)
|
||||
}
|
||||
|
||||
func TestPlatform_HandleFleetEvents_Bad(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
_, _ = w.Write([]byte(`{"error":"event stream unavailable"}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
|
||||
result := subsystem.handleFleetEvents(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "agent_id", Value: "charon"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestPlatform_HandleFleetEvents_Ugly(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte("data: not-json\n\n"))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
|
||||
result := subsystem.handleFleetEvents(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "agent_id", Value: "charon"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestPlatform_HandleSyncStatus_Good(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
require.Equal(t, "/v1/agent/status", r.URL.Path)
|
||||
|
|
|
|||
|
|
@ -105,6 +105,11 @@ func (s *PrepSubsystem) registerPlatformTools(server *mcp.Server) {
|
|||
Description: "Read aggregate fleet activity statistics.",
|
||||
}, s.fleetStatsTool)
|
||||
|
||||
mcp.AddTool(server, &mcp.Tool{
|
||||
Name: "agentic_fleet_events",
|
||||
Description: "Read the next fleet event from the platform SSE stream.",
|
||||
}, s.fleetEventsTool)
|
||||
|
||||
mcp.AddTool(server, &mcp.Tool{
|
||||
Name: "agentic_credits_award",
|
||||
Description: "Award credits to a fleet node for completed work.",
|
||||
|
|
@ -329,6 +334,20 @@ func (s *PrepSubsystem) fleetStatsTool(ctx context.Context, _ *mcp.CallToolReque
|
|||
return nil, output, nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) fleetEventsTool(ctx context.Context, _ *mcp.CallToolRequest, input struct {
|
||||
AgentID string `json:"agent_id,omitempty"`
|
||||
}) (*mcp.CallToolResult, FleetEventOutput, error) {
|
||||
result := s.handleFleetEvents(ctx, platformOptions(core.Option{Key: "agent_id", Value: input.AgentID}))
|
||||
if !result.OK {
|
||||
return nil, FleetEventOutput{}, resultErrorValue("agentic.fleet.events", result)
|
||||
}
|
||||
output, ok := result.Value.(FleetEventOutput)
|
||||
if !ok {
|
||||
return nil, FleetEventOutput{}, core.E("agentic.fleet.events", "invalid fleet event output", nil)
|
||||
}
|
||||
return nil, output, nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) creditsAwardTool(ctx context.Context, _ *mcp.CallToolRequest, input struct {
|
||||
AgentID string `json:"agent_id"`
|
||||
TaskType string `json:"task_type"`
|
||||
|
|
|
|||
|
|
@ -88,7 +88,7 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
|
|||
case "agentic.status", "agentic.scan", "agentic.watch",
|
||||
"agentic.issue.get", "agentic.issue.list", "agentic.pr.get", "agentic.pr.list",
|
||||
"agentic.prompt", "agentic.task", "agentic.flow", "agentic.persona",
|
||||
"agentic.sync.status", "agentic.fleet.nodes", "agentic.fleet.stats",
|
||||
"agentic.sync.status", "agentic.fleet.nodes", "agentic.fleet.stats", "agentic.fleet.events",
|
||||
"agentic.credits.balance", "agentic.credits.history",
|
||||
"agentic.subscription.detect", "agentic.subscription.budget":
|
||||
return core.Entitlement{Allowed: true, Unlimited: true}
|
||||
|
|
@ -139,6 +139,8 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
|
|||
c.Action("agent.fleet.task.next", s.handleFleetNextTask).Description = "Ask the platform for the next fleet task"
|
||||
c.Action("agentic.fleet.stats", s.handleFleetStats).Description = "Get fleet activity statistics"
|
||||
c.Action("agent.fleet.stats", s.handleFleetStats).Description = "Get fleet activity statistics"
|
||||
c.Action("agentic.fleet.events", s.handleFleetEvents).Description = "Read fleet task assignment events from the platform API"
|
||||
c.Action("agent.fleet.events", s.handleFleetEvents).Description = "Read fleet task assignment events from the platform API"
|
||||
c.Action("agentic.credits.award", s.handleCreditsAward).Description = "Award credits to a fleet node"
|
||||
c.Action("agent.credits.award", s.handleCreditsAward).Description = "Award credits to a fleet node"
|
||||
c.Action("agentic.credits.balance", s.handleCreditsBalance).Description = "Get credit balance for a fleet node"
|
||||
|
|
|
|||
|
|
@ -533,6 +533,8 @@ func TestPrep_OnStartup_Good_RegistersPlatformActionAliases(t *testing.T) {
|
|||
assert.True(t, c.Action("agent.fleet.register").Exists())
|
||||
assert.True(t, c.Action("agentic.credits.balance").Exists())
|
||||
assert.True(t, c.Action("agent.credits.balance").Exists())
|
||||
assert.True(t, c.Action("agentic.fleet.events").Exists())
|
||||
assert.True(t, c.Action("agent.fleet.events").Exists())
|
||||
assert.True(t, c.Action("agentic.subscription.budget.update").Exists())
|
||||
assert.True(t, c.Action("agent.subscription.budget.update").Exists())
|
||||
}
|
||||
|
|
@ -550,6 +552,7 @@ func TestPrep_OnStartup_Good_RegistersPlatformCommandAlias(t *testing.T) {
|
|||
assert.Contains(t, c.Commands(), "auth/revoke")
|
||||
assert.Contains(t, c.Commands(), "subscription/budget/update")
|
||||
assert.Contains(t, c.Commands(), "subscription/update-budget")
|
||||
assert.Contains(t, c.Commands(), "fleet/events")
|
||||
}
|
||||
|
||||
func TestPrep_OnStartup_Bad(t *testing.T) {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue