diff --git a/pkg/mcp/notify.go b/pkg/mcp/notify.go index eddb6f4..164b32f 100644 --- a/pkg/mcp/notify.go +++ b/pkg/mcp/notify.go @@ -1,18 +1,24 @@ // SPDX-License-Identifier: EUPL-1.2 // Notification broadcasting for the MCP service. -// Pushes events to connected MCP sessions via the logging protocol. -// Channel events use the claude/channel experimental capability. +// Channel events use the claude/channel experimental capability +// via notifications/claude/channel JSON-RPC notifications. package mcp import ( "context" + "encoding/json" "iter" + "os" + "sync" "github.com/modelcontextprotocol/go-sdk/mcp" ) +// stdoutMu protects stdout writes from concurrent goroutines. +var stdoutMu sync.Mutex + // SendNotificationToAllClients broadcasts a log-level notification to every // connected MCP session (stdio, HTTP, TCP, and Unix). // Errors on individual sessions are logged but do not stop the broadcast. @@ -30,34 +36,64 @@ func (s *Service) SendNotificationToAllClients(ctx context.Context, level mcp.Lo } } -// ChannelSend pushes a channel event to all connected clients. -// Channel names follow "subsystem.event" convention. +// channelNotification is the JSON-RPC notification format for claude/channel. +type channelNotification struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params channelParams `json:"params"` +} + +type channelParams struct { + Content string `json:"content"` + Meta map[string]string `json:"meta,omitempty"` +} + +// ChannelSend pushes a channel event to all connected clients via +// the notifications/claude/channel JSON-RPC method. // // s.ChannelSend(ctx, "agent.complete", map[string]any{"repo": "go-io", "workspace": "go-io-123"}) // s.ChannelSend(ctx, "build.failed", map[string]any{"repo": "core", "error": "test timeout"}) func (s *Service) ChannelSend(ctx context.Context, channel string, data any) { - payload := map[string]any{ - "channel": channel, - "data": data, + // Marshal the data payload as the content string + contentBytes, err := json.Marshal(data) + if err != nil { + s.logger.Debug("channel: failed to marshal data", "channel", channel, "error", err) + return } - s.SendNotificationToAllClients(ctx, "info", "channel", payload) + + notification := channelNotification{ + JSONRPC: "2.0", + Method: "notifications/claude/channel", + Params: channelParams{ + Content: string(contentBytes), + Meta: map[string]string{ + "source": "core-agent", + "channel": channel, + }, + }, + } + + msg, err := json.Marshal(notification) + if err != nil { + s.logger.Debug("channel: failed to marshal notification", "channel", channel, "error", err) + return + } + + // Write directly to stdout (stdio transport) with newline delimiter. + // The official SDK doesn't expose a way to send custom notification methods, + // so we write the JSON-RPC notification directly to the transport. + stdoutMu.Lock() + os.Stdout.Write(append(msg, '\n')) + stdoutMu.Unlock() } // ChannelSendToSession pushes a channel event to a specific session. +// Falls back to stdout for stdio transport. // // s.ChannelSendToSession(ctx, session, "agent.progress", progressData) func (s *Service) ChannelSendToSession(ctx context.Context, session *mcp.ServerSession, channel string, data any) { - payload := map[string]any{ - "channel": channel, - "data": data, - } - if err := session.Log(ctx, &mcp.LoggingMessageParams{ - Level: "info", - Logger: "channel", - Data: payload, - }); err != nil { - s.logger.Debug("channel: failed to send to session", "session", session.ID(), "channel", channel, "error", err) - } + // For now, channel events go to all sessions via stdout + s.ChannelSend(ctx, channel, data) } // Sessions returns an iterator over all connected MCP sessions.