feat(messages): define IPC message types for inter-service communication

12 message types covering: agent lifecycle (Started/Completed),
QA+PR pipeline (QAResult/PRCreated/PRMerged/PRNeedsReview),
queue orchestration (QueueDrained/PokeQueue/RateLimitDetected),
monitor events (HarvestComplete/HarvestRejected/InboxMessage).

These replace the CompletionNotifier and ChannelNotifier callback
interfaces with typed broadcast messages via c.ACTION().

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-24 14:24:46 +00:00
parent b5873a8f31
commit 781a5b414e
2 changed files with 181 additions and 0 deletions

120
pkg/messages/messages.go Normal file
View file

@ -0,0 +1,120 @@
// SPDX-License-Identifier: EUPL-1.2
// Package messages defines IPC message types for inter-service communication
// within core-agent. Services emit these via c.ACTION() and handle them via
// c.RegisterAction(). No service imports another — they share only these types.
//
// c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Status: "completed"})
package messages
// --- Agent Lifecycle ---
// AgentStarted is broadcast when a subagent process is spawned.
//
// c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5"})
type AgentStarted struct {
Agent string
Repo string
Workspace string
}
// AgentCompleted is broadcast when a subagent process exits.
//
// c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5", Status: "completed"})
type AgentCompleted struct {
Agent string
Repo string
Workspace string
Status string // completed, failed, blocked
}
// --- QA & PR Pipeline ---
// QAResult is broadcast after QA runs on a completed workspace.
//
// c.ACTION(messages.QAResult{Workspace: "core/go-io/task-5", Repo: "go-io", Passed: true})
type QAResult struct {
Workspace string
Repo string
Passed bool
Output string
}
// PRCreated is broadcast after a PR is auto-created on Forge.
//
// c.ACTION(messages.PRCreated{Repo: "go-io", Branch: "agent/fix-tests", PRURL: "https://...", PRNum: 12})
type PRCreated struct {
Repo string
Branch string
PRURL string
PRNum int
}
// PRMerged is broadcast after a PR is auto-verified and merged.
//
// c.ACTION(messages.PRMerged{Repo: "go-io", PRURL: "https://...", PRNum: 12})
type PRMerged struct {
Repo string
PRURL string
PRNum int
}
// PRNeedsReview is broadcast when auto-merge fails and human attention is needed.
//
// c.ACTION(messages.PRNeedsReview{Repo: "go-io", PRNum: 12, Reason: "merge conflict"})
type PRNeedsReview struct {
Repo string
PRURL string
PRNum int
Reason string
}
// --- Queue ---
// QueueDrained is broadcast when running=0 and queued=0 (genuinely empty).
//
// c.ACTION(messages.QueueDrained{Completed: 3})
type QueueDrained struct {
Completed int
}
// PokeQueue signals the runner to drain the queue immediately.
//
// c.ACTION(messages.PokeQueue{})
type PokeQueue struct{}
// RateLimitDetected is broadcast when fast failures trigger agent pool backoff.
//
// c.ACTION(messages.RateLimitDetected{Pool: "codex", Duration: "30m"})
type RateLimitDetected struct {
Pool string
Duration string
}
// --- Monitor Events ---
// HarvestComplete is broadcast when a workspace branch is ready for review.
//
// c.ACTION(messages.HarvestComplete{Repo: "go-io", Branch: "agent/fix-tests", Files: 5})
type HarvestComplete struct {
Repo string
Branch string
Files int
}
// HarvestRejected is broadcast when a workspace fails safety checks (binaries, size).
//
// c.ACTION(messages.HarvestRejected{Repo: "go-io", Branch: "agent/fix-tests", Reason: "binary detected"})
type HarvestRejected struct {
Repo string
Branch string
Reason string
}
// InboxMessage is broadcast when new inter-agent messages arrive.
//
// c.ACTION(messages.InboxMessage{New: 2, Total: 15})
type InboxMessage struct {
New int
Total int
}

View file

@ -0,0 +1,61 @@
// SPDX-License-Identifier: EUPL-1.2
package messages
import (
"testing"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
)
// TestMessageTypes_Good_AllSatisfyMessage verifies every message type can be
// used as a core.Message (which is `any`). This is a compile-time + runtime check.
func TestMessageTypes_Good_AllSatisfyMessage(t *testing.T) {
msgs := []core.Message{
AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5"},
AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5", Status: "completed"},
QAResult{Workspace: "core/go-io/task-5", Repo: "go-io", Passed: true},
PRCreated{Repo: "go-io", Branch: "agent/fix", PRURL: "https://forge.lthn.ai/core/go-io/pulls/1", PRNum: 1},
PRMerged{Repo: "go-io", PRURL: "https://forge.lthn.ai/core/go-io/pulls/1", PRNum: 1},
PRNeedsReview{Repo: "go-io", PRNum: 1, Reason: "merge conflict"},
QueueDrained{Completed: 3},
PokeQueue{},
RateLimitDetected{Pool: "codex", Duration: "30m"},
HarvestComplete{Repo: "go-io", Branch: "agent/fix", Files: 5},
HarvestRejected{Repo: "go-io", Branch: "agent/fix", Reason: "binary detected"},
InboxMessage{New: 2, Total: 15},
}
assert.Len(t, msgs, 12, "expected 12 message types")
for _, msg := range msgs {
assert.NotNil(t, msg)
}
}
// TestAgentCompleted_Good_TypeSwitch verifies the IPC dispatch pattern works.
func TestAgentCompleted_Good_TypeSwitch(t *testing.T) {
var msg core.Message = AgentCompleted{
Agent: "codex",
Repo: "go-io",
Workspace: "core/go-io/task-5",
Status: "completed",
}
handled := false
switch ev := msg.(type) {
case AgentCompleted:
assert.Equal(t, "codex", ev.Agent)
assert.Equal(t, "go-io", ev.Repo)
assert.Equal(t, "completed", ev.Status)
handled = true
}
assert.True(t, handled)
}
// TestPokeQueue_Good_EmptyMessage verifies zero-field messages work as signals.
func TestPokeQueue_Good_EmptyMessage(t *testing.T) {
var msg core.Message = PokeQueue{}
_, ok := msg.(PokeQueue)
assert.True(t, ok)
}