From 781a5b414e63c50c71fde70127817079d26f1c01 Mon Sep 17 00:00:00 2001 From: Snider Date: Tue, 24 Mar 2026 14:24:46 +0000 Subject: [PATCH] feat(messages): define IPC message types for inter-service communication 12 message types covering: agent lifecycle (Started/Completed), QA+PR pipeline (QAResult/PRCreated/PRMerged/PRNeedsReview), queue orchestration (QueueDrained/PokeQueue/RateLimitDetected), monitor events (HarvestComplete/HarvestRejected/InboxMessage). These replace the CompletionNotifier and ChannelNotifier callback interfaces with typed broadcast messages via c.ACTION(). Co-Authored-By: Virgil --- pkg/messages/messages.go | 120 ++++++++++++++++++++++++++++++++++ pkg/messages/messages_test.go | 61 +++++++++++++++++ 2 files changed, 181 insertions(+) create mode 100644 pkg/messages/messages.go create mode 100644 pkg/messages/messages_test.go diff --git a/pkg/messages/messages.go b/pkg/messages/messages.go new file mode 100644 index 0000000..5458c5c --- /dev/null +++ b/pkg/messages/messages.go @@ -0,0 +1,120 @@ +// SPDX-License-Identifier: EUPL-1.2 + +// Package messages defines IPC message types for inter-service communication +// within core-agent. Services emit these via c.ACTION() and handle them via +// c.RegisterAction(). No service imports another — they share only these types. +// +// c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Status: "completed"}) +package messages + +// --- Agent Lifecycle --- + +// AgentStarted is broadcast when a subagent process is spawned. +// +// c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5"}) +type AgentStarted struct { + Agent string + Repo string + Workspace string +} + +// AgentCompleted is broadcast when a subagent process exits. +// +// c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5", Status: "completed"}) +type AgentCompleted struct { + Agent string + Repo string + Workspace string + Status string // completed, failed, blocked +} + +// --- QA & PR Pipeline --- + +// QAResult is broadcast after QA runs on a completed workspace. +// +// c.ACTION(messages.QAResult{Workspace: "core/go-io/task-5", Repo: "go-io", Passed: true}) +type QAResult struct { + Workspace string + Repo string + Passed bool + Output string +} + +// PRCreated is broadcast after a PR is auto-created on Forge. +// +// c.ACTION(messages.PRCreated{Repo: "go-io", Branch: "agent/fix-tests", PRURL: "https://...", PRNum: 12}) +type PRCreated struct { + Repo string + Branch string + PRURL string + PRNum int +} + +// PRMerged is broadcast after a PR is auto-verified and merged. +// +// c.ACTION(messages.PRMerged{Repo: "go-io", PRURL: "https://...", PRNum: 12}) +type PRMerged struct { + Repo string + PRURL string + PRNum int +} + +// PRNeedsReview is broadcast when auto-merge fails and human attention is needed. +// +// c.ACTION(messages.PRNeedsReview{Repo: "go-io", PRNum: 12, Reason: "merge conflict"}) +type PRNeedsReview struct { + Repo string + PRURL string + PRNum int + Reason string +} + +// --- Queue --- + +// QueueDrained is broadcast when running=0 and queued=0 (genuinely empty). +// +// c.ACTION(messages.QueueDrained{Completed: 3}) +type QueueDrained struct { + Completed int +} + +// PokeQueue signals the runner to drain the queue immediately. +// +// c.ACTION(messages.PokeQueue{}) +type PokeQueue struct{} + +// RateLimitDetected is broadcast when fast failures trigger agent pool backoff. +// +// c.ACTION(messages.RateLimitDetected{Pool: "codex", Duration: "30m"}) +type RateLimitDetected struct { + Pool string + Duration string +} + +// --- Monitor Events --- + +// HarvestComplete is broadcast when a workspace branch is ready for review. +// +// c.ACTION(messages.HarvestComplete{Repo: "go-io", Branch: "agent/fix-tests", Files: 5}) +type HarvestComplete struct { + Repo string + Branch string + Files int +} + +// HarvestRejected is broadcast when a workspace fails safety checks (binaries, size). +// +// c.ACTION(messages.HarvestRejected{Repo: "go-io", Branch: "agent/fix-tests", Reason: "binary detected"}) +type HarvestRejected struct { + Repo string + Branch string + Reason string +} + +// InboxMessage is broadcast when new inter-agent messages arrive. +// +// c.ACTION(messages.InboxMessage{New: 2, Total: 15}) +type InboxMessage struct { + New int + Total int +} diff --git a/pkg/messages/messages_test.go b/pkg/messages/messages_test.go new file mode 100644 index 0000000..49a3aee --- /dev/null +++ b/pkg/messages/messages_test.go @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package messages + +import ( + "testing" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" +) + +// TestMessageTypes_Good_AllSatisfyMessage verifies every message type can be +// used as a core.Message (which is `any`). This is a compile-time + runtime check. +func TestMessageTypes_Good_AllSatisfyMessage(t *testing.T) { + msgs := []core.Message{ + AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5"}, + AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5", Status: "completed"}, + QAResult{Workspace: "core/go-io/task-5", Repo: "go-io", Passed: true}, + PRCreated{Repo: "go-io", Branch: "agent/fix", PRURL: "https://forge.lthn.ai/core/go-io/pulls/1", PRNum: 1}, + PRMerged{Repo: "go-io", PRURL: "https://forge.lthn.ai/core/go-io/pulls/1", PRNum: 1}, + PRNeedsReview{Repo: "go-io", PRNum: 1, Reason: "merge conflict"}, + QueueDrained{Completed: 3}, + PokeQueue{}, + RateLimitDetected{Pool: "codex", Duration: "30m"}, + HarvestComplete{Repo: "go-io", Branch: "agent/fix", Files: 5}, + HarvestRejected{Repo: "go-io", Branch: "agent/fix", Reason: "binary detected"}, + InboxMessage{New: 2, Total: 15}, + } + + assert.Len(t, msgs, 12, "expected 12 message types") + for _, msg := range msgs { + assert.NotNil(t, msg) + } +} + +// TestAgentCompleted_Good_TypeSwitch verifies the IPC dispatch pattern works. +func TestAgentCompleted_Good_TypeSwitch(t *testing.T) { + var msg core.Message = AgentCompleted{ + Agent: "codex", + Repo: "go-io", + Workspace: "core/go-io/task-5", + Status: "completed", + } + + handled := false + switch ev := msg.(type) { + case AgentCompleted: + assert.Equal(t, "codex", ev.Agent) + assert.Equal(t, "go-io", ev.Repo) + assert.Equal(t, "completed", ev.Status) + handled = true + } + assert.True(t, handled) +} + +// TestPokeQueue_Good_EmptyMessage verifies zero-field messages work as signals. +func TestPokeQueue_Good_EmptyMessage(t *testing.T) { + var msg core.Message = PokeQueue{} + _, ok := msg.(PokeQueue) + assert.True(t, ok) +}