fix(mcp): send channel events via notifications/claude/channel
Some checks failed
CI / test (push) Failing after 3s
Some checks failed
CI / test (push) Failing after 3s
ChannelSend now writes raw JSON-RPC notifications with method notifications/claude/channel directly to stdout, bypassing the SDK's Log() method which uses notifications/message/log. The official Go SDK doesn't expose a way to send custom notification methods, so we write the JSON-RPC notification directly to the stdio transport. This is the format Claude Code channels expect for --channels to surface events in session. Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
c815b9f1b1
commit
8e84a06b82
1 changed files with 55 additions and 19 deletions
|
|
@ -1,18 +1,24 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
// Notification broadcasting for the MCP service.
|
||||
// Pushes events to connected MCP sessions via the logging protocol.
|
||||
// Channel events use the claude/channel experimental capability.
|
||||
// Channel events use the claude/channel experimental capability
|
||||
// via notifications/claude/channel JSON-RPC notifications.
|
||||
|
||||
package mcp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"iter"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
)
|
||||
|
||||
// stdoutMu protects stdout writes from concurrent goroutines.
|
||||
var stdoutMu sync.Mutex
|
||||
|
||||
// SendNotificationToAllClients broadcasts a log-level notification to every
|
||||
// connected MCP session (stdio, HTTP, TCP, and Unix).
|
||||
// Errors on individual sessions are logged but do not stop the broadcast.
|
||||
|
|
@ -30,34 +36,64 @@ func (s *Service) SendNotificationToAllClients(ctx context.Context, level mcp.Lo
|
|||
}
|
||||
}
|
||||
|
||||
// ChannelSend pushes a channel event to all connected clients.
|
||||
// Channel names follow "subsystem.event" convention.
|
||||
// channelNotification is the JSON-RPC notification format for claude/channel.
|
||||
type channelNotification struct {
|
||||
JSONRPC string `json:"jsonrpc"`
|
||||
Method string `json:"method"`
|
||||
Params channelParams `json:"params"`
|
||||
}
|
||||
|
||||
type channelParams struct {
|
||||
Content string `json:"content"`
|
||||
Meta map[string]string `json:"meta,omitempty"`
|
||||
}
|
||||
|
||||
// ChannelSend pushes a channel event to all connected clients via
|
||||
// the notifications/claude/channel JSON-RPC method.
|
||||
//
|
||||
// s.ChannelSend(ctx, "agent.complete", map[string]any{"repo": "go-io", "workspace": "go-io-123"})
|
||||
// s.ChannelSend(ctx, "build.failed", map[string]any{"repo": "core", "error": "test timeout"})
|
||||
func (s *Service) ChannelSend(ctx context.Context, channel string, data any) {
|
||||
payload := map[string]any{
|
||||
"channel": channel,
|
||||
"data": data,
|
||||
// Marshal the data payload as the content string
|
||||
contentBytes, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
s.logger.Debug("channel: failed to marshal data", "channel", channel, "error", err)
|
||||
return
|
||||
}
|
||||
s.SendNotificationToAllClients(ctx, "info", "channel", payload)
|
||||
|
||||
notification := channelNotification{
|
||||
JSONRPC: "2.0",
|
||||
Method: "notifications/claude/channel",
|
||||
Params: channelParams{
|
||||
Content: string(contentBytes),
|
||||
Meta: map[string]string{
|
||||
"source": "core-agent",
|
||||
"channel": channel,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
msg, err := json.Marshal(notification)
|
||||
if err != nil {
|
||||
s.logger.Debug("channel: failed to marshal notification", "channel", channel, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Write directly to stdout (stdio transport) with newline delimiter.
|
||||
// The official SDK doesn't expose a way to send custom notification methods,
|
||||
// so we write the JSON-RPC notification directly to the transport.
|
||||
stdoutMu.Lock()
|
||||
os.Stdout.Write(append(msg, '\n'))
|
||||
stdoutMu.Unlock()
|
||||
}
|
||||
|
||||
// ChannelSendToSession pushes a channel event to a specific session.
|
||||
// Falls back to stdout for stdio transport.
|
||||
//
|
||||
// s.ChannelSendToSession(ctx, session, "agent.progress", progressData)
|
||||
func (s *Service) ChannelSendToSession(ctx context.Context, session *mcp.ServerSession, channel string, data any) {
|
||||
payload := map[string]any{
|
||||
"channel": channel,
|
||||
"data": data,
|
||||
}
|
||||
if err := session.Log(ctx, &mcp.LoggingMessageParams{
|
||||
Level: "info",
|
||||
Logger: "channel",
|
||||
Data: payload,
|
||||
}); err != nil {
|
||||
s.logger.Debug("channel: failed to send to session", "session", session.ID(), "channel", channel, "error", err)
|
||||
}
|
||||
// For now, channel events go to all sessions via stdout
|
||||
s.ChannelSend(ctx, channel, data)
|
||||
}
|
||||
|
||||
// Sessions returns an iterator over all connected MCP sessions.
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue