From 1b986f9f2df6c01122127612f15205ce2667d95f Mon Sep 17 00:00:00 2001 From: Virgil Date: Thu, 2 Apr 2026 05:33:16 +0000 Subject: [PATCH] fix(agentic): preserve fleet SSE event names Co-Authored-By: Virgil --- pkg/agentic/platform.go | 62 +++++++++++++++++++++++------------- pkg/agentic/platform_test.go | 34 +++++++++++++++++++- 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/pkg/agentic/platform.go b/pkg/agentic/platform.go index 2985514..e826510 100644 --- a/pkg/agentic/platform.go +++ b/pkg/agentic/platform.go @@ -892,27 +892,48 @@ func (s *PrepSubsystem) eventPayloadValue(body string) map[string]any { return nil } - if core.HasPrefix(trimmed, "data: ") { - trimmed = core.Trim(core.TrimPrefix(trimmed, "data: ")) - } - - var payload map[string]any - if parseResult := core.JSONUnmarshalString(trimmed, &payload); parseResult.OK { - if payload != nil { - payload["raw"] = trimmed - } - return payload - } - - return map[string]any{ + payload := map[string]any{ "raw": trimmed, } + + lines := core.Split(trimmed, "\n") + dataLines := make([]string, 0, len(lines)) + for _, line := range lines { + line = core.Trim(line) + if line == "" { + continue + } + + switch { + case core.HasPrefix(line, "event:"): + eventName := core.Trim(core.TrimPrefix(line, "event:")) + if eventName != "" { + payload["event"] = eventName + payload["type"] = eventName + } + case core.HasPrefix(line, "data:"): + dataLines = append(dataLines, core.Trim(core.TrimPrefix(line, "data:"))) + } + } + + dataBody := core.Join("\n", dataLines...) + if dataBody != "" { + var dataPayload map[string]any + if parseResult := core.JSONUnmarshalString(dataBody, &dataPayload); parseResult.OK { + for key, value := range dataPayload { + payload[key] = value + } + return payload + } + payload["data"] = dataBody + } + + return payload } func readFleetEventBody(body interface{ Read([]byte) (int, error) }) (string, error) { reader := bufio.NewReader(body) rawLines := make([]string, 0, 4) - dataLines := make([]string, 0, 4) for { line, err := reader.ReadString('\n') @@ -920,19 +941,16 @@ func readFleetEventBody(body interface{ Read([]byte) (int, error) }) (string, er trimmed := core.Trim(line) if trimmed != "" { rawLines = append(rawLines, trimmed) - if core.HasPrefix(trimmed, "data:") { - dataLines = append(dataLines, core.Trim(core.TrimPrefix(trimmed, "data:"))) - } - } else if len(dataLines) > 0 { - return core.Join("\n", dataLines...), nil + } else if len(rawLines) > 0 { + return core.Join("\n", rawLines...), nil } } if err != nil && err.Error() == "EOF" { - if len(dataLines) > 0 { - return core.Join("\n", dataLines...), nil + if len(rawLines) > 0 { + return core.Join("\n", rawLines...), nil } - return core.Join("\n", rawLines...), nil + return "", nil } if err != nil { return "", err diff --git a/pkg/agentic/platform_test.go b/pkg/agentic/platform_test.go index 37997a9..33285e7 100644 --- a/pkg/agentic/platform_test.go +++ b/pkg/agentic/platform_test.go @@ -233,7 +233,7 @@ func TestPlatform_HandleFleetEvents_Good(t *testing.T) { require.Equal(t, "/v1/fleet/events", r.URL.Path) require.Equal(t, "charon", r.URL.Query().Get("agent_id")) require.Equal(t, "Bearer secret-token", r.Header.Get("Authorization")) - _, _ = w.Write([]byte("data: {\"event\":\"task.assigned\",\"type\":\"task.assigned\",\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\",\"branch\":\"dev\",\"status\":\"assigned\"}\n\n")) + _, _ = w.Write([]byte("event: task.assigned\ndata: {\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\",\"branch\":\"dev\",\"status\":\"assigned\"}\n\n")) })) defer server.Close() @@ -252,6 +252,38 @@ func TestPlatform_HandleFleetEvents_Good(t *testing.T) { assert.Equal(t, "dev", output.Event.Branch) } +func TestPlatform_EventPayloadValue_Good_EventAndData(t *testing.T) { + subsystem := testPrepWithPlatformServer(t, nil, "secret-token") + + payload := subsystem.eventPayloadValue("event: task.assigned\ndata: {\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\"}") + + require.NotNil(t, payload) + assert.Equal(t, "task.assigned", payload["event"]) + assert.Equal(t, "task.assigned", payload["type"]) + assert.Equal(t, "charon", payload["agent_id"]) + assert.Equal(t, 9.0, payload["task_id"]) + assert.Equal(t, "core/go-io", payload["repo"]) +} + +func TestPlatform_EventPayloadValue_Bad_Empty(t *testing.T) { + subsystem := testPrepWithPlatformServer(t, nil, "secret-token") + + payload := subsystem.eventPayloadValue("") + + assert.Nil(t, payload) +} + +func TestPlatform_EventPayloadValue_Ugly_InvalidData(t *testing.T) { + subsystem := testPrepWithPlatformServer(t, nil, "secret-token") + + payload := subsystem.eventPayloadValue("event: task.assigned\ndata: not-json") + + require.NotNil(t, payload) + assert.Equal(t, "task.assigned", payload["event"]) + assert.Equal(t, "task.assigned", payload["type"]) + assert.Equal(t, "not-json", payload["data"]) +} + func TestPlatform_HandleFleetEvents_Good_FallbackToTaskNext(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path {