fix(mcp): use shared locked writer for channel notifications
ChannelSend was writing to os.Stdout directly while the SDK's StdioTransport also writes to os.Stdout — causing interleaved JSON-RPC messages. Now both use a shared lockedWriter via IOTransport. Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
3ece6656d2
commit
c8089fd597
2 changed files with 28 additions and 10 deletions
|
|
@ -8,6 +8,7 @@ import (
|
|||
"context"
|
||||
"iter"
|
||||
"net/http"
|
||||
"os"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
|
|
@ -661,7 +662,10 @@ func (s *Service) Run(ctx context.Context) error {
|
|||
return s.ServeTCP(ctx, addr)
|
||||
}
|
||||
s.stdioMode = true
|
||||
return s.server.Run(ctx, &mcp.StdioTransport{})
|
||||
return s.server.Run(ctx, &mcp.IOTransport{
|
||||
Reader: os.Stdin,
|
||||
Writer: sharedStdout,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ package mcp
|
|||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"iter"
|
||||
"os"
|
||||
"sync"
|
||||
|
|
@ -16,8 +17,25 @@ import (
|
|||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
)
|
||||
|
||||
// stdoutMu protects stdout writes from concurrent goroutines.
|
||||
var stdoutMu sync.Mutex
|
||||
// lockedWriter wraps an io.Writer with a mutex.
|
||||
// Both the SDK's transport and ChannelSend use this writer,
|
||||
// ensuring channel notifications don't interleave with SDK messages.
|
||||
type lockedWriter struct {
|
||||
mu sync.Mutex
|
||||
w io.Writer
|
||||
}
|
||||
|
||||
func (lw *lockedWriter) Write(p []byte) (int, error) {
|
||||
lw.mu.Lock()
|
||||
defer lw.mu.Unlock()
|
||||
return lw.w.Write(p)
|
||||
}
|
||||
|
||||
func (lw *lockedWriter) Close() error { return nil }
|
||||
|
||||
// sharedStdout is the single writer for all stdio output.
|
||||
// Created once when the MCP service enters stdio mode.
|
||||
var sharedStdout = &lockedWriter{w: os.Stdout}
|
||||
|
||||
// SendNotificationToAllClients broadcasts a log-level notification to every
|
||||
// connected MCP session (stdio, HTTP, TCP, and Unix).
|
||||
|
|
@ -71,16 +89,12 @@ func (s *Service) ChannelSend(ctx context.Context, channel string, data any) {
|
|||
|
||||
msg := core.JSONMarshalString(notification)
|
||||
|
||||
// Write directly to stdout (stdio transport) with newline delimiter.
|
||||
// The official SDK doesn't expose a way to send custom notification methods,
|
||||
// so we write the JSON-RPC notification directly to the transport.
|
||||
// Only write when running in stdio mode — HTTP/TCP transports don't use stdout.
|
||||
// Write through the shared locked writer (same one the SDK transport uses).
|
||||
// This prevents channel notifications from interleaving with SDK messages.
|
||||
if !s.stdioMode {
|
||||
return
|
||||
}
|
||||
stdoutMu.Lock()
|
||||
os.Stdout.Write([]byte(core.Concat(msg, "\n")))
|
||||
stdoutMu.Unlock()
|
||||
sharedStdout.Write([]byte(core.Concat(msg, "\n")))
|
||||
}
|
||||
|
||||
// ChannelSendToSession pushes a channel event to a specific session.
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue