fix(agentic): preserve fleet SSE event names
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
9a9cf8e40b
commit
1b986f9f2d
2 changed files with 73 additions and 23 deletions
|
|
@ -892,27 +892,48 @@ func (s *PrepSubsystem) eventPayloadValue(body string) map[string]any {
|
|||
return nil
|
||||
}
|
||||
|
||||
if core.HasPrefix(trimmed, "data: ") {
|
||||
trimmed = core.Trim(core.TrimPrefix(trimmed, "data: "))
|
||||
}
|
||||
|
||||
var payload map[string]any
|
||||
if parseResult := core.JSONUnmarshalString(trimmed, &payload); parseResult.OK {
|
||||
if payload != nil {
|
||||
payload["raw"] = trimmed
|
||||
}
|
||||
return payload
|
||||
}
|
||||
|
||||
return map[string]any{
|
||||
payload := map[string]any{
|
||||
"raw": trimmed,
|
||||
}
|
||||
|
||||
lines := core.Split(trimmed, "\n")
|
||||
dataLines := make([]string, 0, len(lines))
|
||||
for _, line := range lines {
|
||||
line = core.Trim(line)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
switch {
|
||||
case core.HasPrefix(line, "event:"):
|
||||
eventName := core.Trim(core.TrimPrefix(line, "event:"))
|
||||
if eventName != "" {
|
||||
payload["event"] = eventName
|
||||
payload["type"] = eventName
|
||||
}
|
||||
case core.HasPrefix(line, "data:"):
|
||||
dataLines = append(dataLines, core.Trim(core.TrimPrefix(line, "data:")))
|
||||
}
|
||||
}
|
||||
|
||||
dataBody := core.Join("\n", dataLines...)
|
||||
if dataBody != "" {
|
||||
var dataPayload map[string]any
|
||||
if parseResult := core.JSONUnmarshalString(dataBody, &dataPayload); parseResult.OK {
|
||||
for key, value := range dataPayload {
|
||||
payload[key] = value
|
||||
}
|
||||
return payload
|
||||
}
|
||||
payload["data"] = dataBody
|
||||
}
|
||||
|
||||
return payload
|
||||
}
|
||||
|
||||
func readFleetEventBody(body interface{ Read([]byte) (int, error) }) (string, error) {
|
||||
reader := bufio.NewReader(body)
|
||||
rawLines := make([]string, 0, 4)
|
||||
dataLines := make([]string, 0, 4)
|
||||
|
||||
for {
|
||||
line, err := reader.ReadString('\n')
|
||||
|
|
@ -920,19 +941,16 @@ func readFleetEventBody(body interface{ Read([]byte) (int, error) }) (string, er
|
|||
trimmed := core.Trim(line)
|
||||
if trimmed != "" {
|
||||
rawLines = append(rawLines, trimmed)
|
||||
if core.HasPrefix(trimmed, "data:") {
|
||||
dataLines = append(dataLines, core.Trim(core.TrimPrefix(trimmed, "data:")))
|
||||
}
|
||||
} else if len(dataLines) > 0 {
|
||||
return core.Join("\n", dataLines...), nil
|
||||
} else if len(rawLines) > 0 {
|
||||
return core.Join("\n", rawLines...), nil
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil && err.Error() == "EOF" {
|
||||
if len(dataLines) > 0 {
|
||||
return core.Join("\n", dataLines...), nil
|
||||
if len(rawLines) > 0 {
|
||||
return core.Join("\n", rawLines...), nil
|
||||
}
|
||||
return core.Join("\n", rawLines...), nil
|
||||
return "", nil
|
||||
}
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
|
|
|||
|
|
@ -233,7 +233,7 @@ func TestPlatform_HandleFleetEvents_Good(t *testing.T) {
|
|||
require.Equal(t, "/v1/fleet/events", r.URL.Path)
|
||||
require.Equal(t, "charon", r.URL.Query().Get("agent_id"))
|
||||
require.Equal(t, "Bearer secret-token", r.Header.Get("Authorization"))
|
||||
_, _ = w.Write([]byte("data: {\"event\":\"task.assigned\",\"type\":\"task.assigned\",\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\",\"branch\":\"dev\",\"status\":\"assigned\"}\n\n"))
|
||||
_, _ = w.Write([]byte("event: task.assigned\ndata: {\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\",\"branch\":\"dev\",\"status\":\"assigned\"}\n\n"))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
|
|
@ -252,6 +252,38 @@ func TestPlatform_HandleFleetEvents_Good(t *testing.T) {
|
|||
assert.Equal(t, "dev", output.Event.Branch)
|
||||
}
|
||||
|
||||
func TestPlatform_EventPayloadValue_Good_EventAndData(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "secret-token")
|
||||
|
||||
payload := subsystem.eventPayloadValue("event: task.assigned\ndata: {\"agent_id\":\"charon\",\"task_id\":9,\"repo\":\"core/go-io\"}")
|
||||
|
||||
require.NotNil(t, payload)
|
||||
assert.Equal(t, "task.assigned", payload["event"])
|
||||
assert.Equal(t, "task.assigned", payload["type"])
|
||||
assert.Equal(t, "charon", payload["agent_id"])
|
||||
assert.Equal(t, 9.0, payload["task_id"])
|
||||
assert.Equal(t, "core/go-io", payload["repo"])
|
||||
}
|
||||
|
||||
func TestPlatform_EventPayloadValue_Bad_Empty(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "secret-token")
|
||||
|
||||
payload := subsystem.eventPayloadValue("")
|
||||
|
||||
assert.Nil(t, payload)
|
||||
}
|
||||
|
||||
func TestPlatform_EventPayloadValue_Ugly_InvalidData(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "secret-token")
|
||||
|
||||
payload := subsystem.eventPayloadValue("event: task.assigned\ndata: not-json")
|
||||
|
||||
require.NotNil(t, payload)
|
||||
assert.Equal(t, "task.assigned", payload["event"])
|
||||
assert.Equal(t, "task.assigned", payload["type"])
|
||||
assert.Equal(t, "not-json", payload["data"])
|
||||
}
|
||||
|
||||
func TestPlatform_HandleFleetEvents_Good_FallbackToTaskNext(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue