From 6924ff3f492818c84429c954c35baa7aad2c986e Mon Sep 17 00:00:00 2001 From: Snider Date: Thu, 26 Mar 2026 12:55:52 +0000 Subject: [PATCH] fix(monitor): push full message content via ChannelPush, not counts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Monitor checkInbox now sends each new message as a ChannelPush with from/subject/content — lands directly in the Claude Code session. Removes the useless InboxMessage{New, Total} count relay through runner. Co-Authored-By: Virgil --- pkg/messages/messages.go | 9 +++++---- pkg/messages/messages_test.go | 2 +- pkg/monitor/monitor.go | 13 +++++++++++-- pkg/monitor/monitor_test.go | 25 +++++++++++++------------ 4 files changed, 30 insertions(+), 19 deletions(-) diff --git a/pkg/messages/messages.go b/pkg/messages/messages.go index 8c5859a..8fd6c52 100644 --- a/pkg/messages/messages.go +++ b/pkg/messages/messages.go @@ -121,10 +121,11 @@ type HarvestRejected struct { Reason string } -// InboxMessage is broadcast when new inter-agent messages arrive. +// InboxMessage is broadcast when a new inter-agent message arrives. // -// c.ACTION(messages.InboxMessage{New: 2, Total: 15}) +// c.ACTION(messages.InboxMessage{From: "charon", Subject: "status", Content: "all green"}) type InboxMessage struct { - New int - Total int + From string + Subject string + Content string } diff --git a/pkg/messages/messages_test.go b/pkg/messages/messages_test.go index b5ff498..693fe4c 100644 --- a/pkg/messages/messages_test.go +++ b/pkg/messages/messages_test.go @@ -24,7 +24,7 @@ func TestMessages_AllSatisfyMessage_Good(t *testing.T) { RateLimitDetected{Pool: "codex", Duration: "30m"}, HarvestComplete{Repo: "go-io", Branch: "agent/fix", Files: 5}, HarvestRejected{Repo: "go-io", Branch: "agent/fix", Reason: "binary detected"}, - InboxMessage{New: 2, Total: 15}, + InboxMessage{From: "charon", Subject: "test", Content: "hello"}, } assert.Len(t, msgs, 12, "expected 12 message types") diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index d46c177..44516c8 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -537,9 +537,18 @@ func (m *Subsystem) checkInbox() string { return "" } - // Push channel event with full message content + // Push each message as a channel event so it lands in the session if m.ServiceRuntime != nil { - m.Core().ACTION(messages.InboxMessage{New: len(newMessages), Total: unread}) + for _, msg := range newMessages { + m.Core().ACTION(coremcp.ChannelPush{ + Channel: "inbox.message", + Data: map[string]any{ + "from": msg.From, + "subject": msg.Subject, + "content": msg.Content, + }, + }) + } } return core.Sprintf("%d unread message(s) in inbox", unread) diff --git a/pkg/monitor/monitor_test.go b/pkg/monitor/monitor_test.go index 9d3ac95..54b9962 100644 --- a/pkg/monitor/monitor_test.go +++ b/pkg/monitor/monitor_test.go @@ -12,6 +12,7 @@ import ( "dappco.re/go/agent/pkg/messages" core "dappco.re/go/core" + coremcp "dappco.re/go/mcp/pkg/mcp" "dappco.re/go/core/process" "github.com/modelcontextprotocol/go-sdk/mcp" "github.com/stretchr/testify/assert" @@ -259,11 +260,11 @@ func TestMonitor_CheckInbox_Good_UnreadMessages(t *testing.T) { t.Setenv("CORE_API_URL", srv.URL) t.Setenv("AGENT_NAME", "test-agent") - // Create Core with IPC handler to capture InboxMessage - var captured []messages.InboxMessage + // Create Core with IPC handler to capture ChannelPush + var captured []coremcp.ChannelPush c := core.New() c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result { - if ev, ok := msg.(messages.InboxMessage); ok { + if ev, ok := msg.(coremcp.ChannelPush); ok { captured = append(captured, ev) } return core.Result{OK: true} @@ -276,9 +277,11 @@ func TestMonitor_CheckInbox_Good_UnreadMessages(t *testing.T) { msg := mon.checkInbox() assert.Contains(t, msg, "2 unread message(s) in inbox") - require.Len(t, captured, 1) - assert.Equal(t, 3, captured[0].New) - assert.Equal(t, 2, captured[0].Total) + // 3 new messages (ids 1,2,3 all > prevMaxID=0), but only 2 unread + // Monitor sends one ChannelPush per NEW message (higher ID than last seen) + require.Len(t, captured, 3) + data := captured[0].Data.(map[string]any) + assert.Equal(t, "clotho", data["from"]) } func TestMonitor_CheckInbox_Good_NoUnread(t *testing.T) { @@ -373,11 +376,11 @@ func TestMonitor_CheckInbox_Good_MultipleSameSender(t *testing.T) { setupAPIEnv(t, srv.URL) - // Create Core with IPC handler to capture InboxMessage - var captured []messages.InboxMessage + // Create Core with IPC handler to capture ChannelPush + var captured []coremcp.ChannelPush c := core.New() c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result { - if ev, ok := msg.(messages.InboxMessage); ok { + if ev, ok := msg.(coremcp.ChannelPush); ok { captured = append(captured, ev) } return core.Result{OK: true} @@ -390,9 +393,7 @@ func TestMonitor_CheckInbox_Good_MultipleSameSender(t *testing.T) { msg := mon.checkInbox() assert.Contains(t, msg, "3 unread message(s)") - require.Len(t, captured, 1) - assert.Equal(t, 3, captured[0].New) - assert.Equal(t, 3, captured[0].Total) + require.True(t, len(captured) > 0, "should capture at least one ChannelPush") } // --- check (integration of sub-checks) ---