diff --git a/docs/migration-guide-options.md b/docs/migration-guide-options.md new file mode 100644 index 0000000..88c9c32 --- /dev/null +++ b/docs/migration-guide-options.md @@ -0,0 +1,50 @@ +# Migrating to `Options{}` for MCP Service Construction + +## Before (functional options) + +```go +svc, err := mcp.New( + mcp.WithWorkspaceRoot("/path/to/project"), + mcp.WithProcessService(processSvc), + mcp.WithWSHub(hub), + mcp.WithSubsystem(brainSub), + mcp.WithSubsystem(ideSub), +) +``` + +## After (`Options{}` DTO) + +```go +svc, err := mcp.New(mcp.Options{ + WorkspaceRoot: "/path/to/project", + ProcessService: processSvc, + WSHub: hub, + Subsystems: []mcp.Subsystem{ + brainSub, + ideSub, + }, +}) +``` + +## Notification helpers + +```go +// Broadcast to all MCP sessions. +svc.SendNotificationToAllClients(ctx, "info", "build", map[string]any{ + "event": "build.complete", + "repo": "go-io", +}) + +// Broadcast a named channel event. +svc.ChannelSend(ctx, "build.complete", map[string]any{ + "repo": "go-io", + "status": "passed", +}) + +// Send to one session. +for session := range svc.Sessions() { + svc.ChannelSendToSession(ctx, session, "agent.status", map[string]any{ + "state": "running", + }) +} +``` diff --git a/pkg/mcp/brain/brain.go b/pkg/mcp/brain/brain.go index 97ef5b8..f9386cb 100644 --- a/pkg/mcp/brain/brain.go +++ b/pkg/mcp/brain/brain.go @@ -7,8 +7,8 @@ package brain import ( "context" - coreerr "forge.lthn.ai/core/go-log" "dappco.re/go/mcp/pkg/mcp/ide" + coreerr "forge.lthn.ai/core/go-log" "github.com/modelcontextprotocol/go-sdk/mcp" ) @@ -19,7 +19,8 @@ var errBridgeNotAvailable = coreerr.E("brain", "bridge not available", nil) // Subsystem implements mcp.Subsystem for OpenBrain knowledge store operations. // It proxies brain_* tool calls to the Laravel backend via the shared IDE bridge. type Subsystem struct { - bridge *ide.Bridge + bridge *ide.Bridge + notifier Notifier } // New creates a brain subsystem that uses the given IDE bridge for Laravel communication. @@ -31,6 +32,16 @@ func New(bridge *ide.Bridge) *Subsystem { // Name implements mcp.Subsystem. func (s *Subsystem) Name() string { return "brain" } +// Notifier pushes events to MCP sessions (matches pkg/mcp.Notifier). +type Notifier interface { + ChannelSend(ctx context.Context, channel string, data any) +} + +// SetNotifier stores the shared notifier so this subsystem can emit channel events. +func (s *Subsystem) SetNotifier(n Notifier) { + s.notifier = n +} + // RegisterTools implements mcp.Subsystem. func (s *Subsystem) RegisterTools(server *mcp.Server) { s.registerBrainTools(server) diff --git a/pkg/mcp/brain/tools.go b/pkg/mcp/brain/tools.go index 6ad0930..de29384 100644 --- a/pkg/mcp/brain/tools.go +++ b/pkg/mcp/brain/tools.go @@ -6,11 +6,18 @@ import ( "context" "time" - coreerr "forge.lthn.ai/core/go-log" "dappco.re/go/mcp/pkg/mcp/ide" + coreerr "forge.lthn.ai/core/go-log" "github.com/modelcontextprotocol/go-sdk/mcp" ) +// emitChannel pushes a brain event through the shared notifier. +func (s *Subsystem) emitChannel(ctx context.Context, channel string, data any) { + if s.notifier != nil { + s.notifier.ChannelSend(ctx, channel, data) + } +} + // -- Input/Output types ------------------------------------------------------- // RememberInput is the input for brain_remember. @@ -33,17 +40,17 @@ type RememberOutput struct { // RecallInput is the input for brain_recall. type RecallInput struct { - Query string `json:"query"` - TopK int `json:"top_k,omitempty"` + Query string `json:"query"` + TopK int `json:"top_k,omitempty"` Filter RecallFilter `json:"filter,omitempty"` } // RecallFilter holds optional filter criteria for brain_recall. type RecallFilter struct { - Project string `json:"project,omitempty"` - Type any `json:"type,omitempty"` - AgentID string `json:"agent_id,omitempty"` - MinConfidence float64 `json:"min_confidence,omitempty"` + Project string `json:"project,omitempty"` + Type any `json:"type,omitempty"` + AgentID string `json:"agent_id,omitempty"` + MinConfidence float64 `json:"min_confidence,omitempty"` } // RecallOutput is the output for brain_recall. @@ -122,7 +129,7 @@ func (s *Subsystem) registerBrainTools(server *mcp.Server) { // -- Tool handlers ------------------------------------------------------------ -func (s *Subsystem) brainRemember(_ context.Context, _ *mcp.CallToolRequest, input RememberInput) (*mcp.CallToolResult, RememberOutput, error) { +func (s *Subsystem) brainRemember(ctx context.Context, _ *mcp.CallToolRequest, input RememberInput) (*mcp.CallToolResult, RememberOutput, error) { if s.bridge == nil { return nil, RememberOutput{}, errBridgeNotAvailable } @@ -143,13 +150,18 @@ func (s *Subsystem) brainRemember(_ context.Context, _ *mcp.CallToolRequest, inp return nil, RememberOutput{}, coreerr.E("brain.remember", "failed to send brain_remember", err) } + s.emitChannel(ctx, "brain.remember.complete", map[string]any{ + "type": input.Type, + "project": input.Project, + }) + return nil, RememberOutput{ Success: true, Timestamp: time.Now(), }, nil } -func (s *Subsystem) brainRecall(_ context.Context, _ *mcp.CallToolRequest, input RecallInput) (*mcp.CallToolResult, RecallOutput, error) { +func (s *Subsystem) brainRecall(ctx context.Context, _ *mcp.CallToolRequest, input RecallInput) (*mcp.CallToolResult, RecallOutput, error) { if s.bridge == nil { return nil, RecallOutput{}, errBridgeNotAvailable } @@ -166,13 +178,18 @@ func (s *Subsystem) brainRecall(_ context.Context, _ *mcp.CallToolRequest, input return nil, RecallOutput{}, coreerr.E("brain.recall", "failed to send brain_recall", err) } + s.emitChannel(ctx, "brain.recall.complete", map[string]any{ + "query": input.Query, + "count": 0, + }) + return nil, RecallOutput{ Success: true, Memories: []Memory{}, }, nil } -func (s *Subsystem) brainForget(_ context.Context, _ *mcp.CallToolRequest, input ForgetInput) (*mcp.CallToolResult, ForgetOutput, error) { +func (s *Subsystem) brainForget(ctx context.Context, _ *mcp.CallToolRequest, input ForgetInput) (*mcp.CallToolResult, ForgetOutput, error) { if s.bridge == nil { return nil, ForgetOutput{}, errBridgeNotAvailable } @@ -188,6 +205,10 @@ func (s *Subsystem) brainForget(_ context.Context, _ *mcp.CallToolRequest, input return nil, ForgetOutput{}, coreerr.E("brain.forget", "failed to send brain_forget", err) } + s.emitChannel(ctx, "brain.forget.complete", map[string]any{ + "id": input.ID, + }) + return nil, ForgetOutput{ Success: true, Forgotten: input.ID, @@ -195,7 +216,7 @@ func (s *Subsystem) brainForget(_ context.Context, _ *mcp.CallToolRequest, input }, nil } -func (s *Subsystem) brainList(_ context.Context, _ *mcp.CallToolRequest, input ListInput) (*mcp.CallToolResult, ListOutput, error) { +func (s *Subsystem) brainList(ctx context.Context, _ *mcp.CallToolRequest, input ListInput) (*mcp.CallToolResult, ListOutput, error) { if s.bridge == nil { return nil, ListOutput{}, errBridgeNotAvailable } @@ -217,6 +238,13 @@ func (s *Subsystem) brainList(_ context.Context, _ *mcp.CallToolRequest, input L return nil, ListOutput{}, coreerr.E("brain.list", "failed to send brain_list", err) } + s.emitChannel(ctx, "brain.list.complete", map[string]any{ + "project": input.Project, + "type": input.Type, + "agent": input.AgentID, + "limit": limit, + }) + return nil, ListOutput{ Success: true, Memories: []Memory{}, diff --git a/pkg/mcp/notify.go b/pkg/mcp/notify.go index 774e1db..7bbcd47 100644 --- a/pkg/mcp/notify.go +++ b/pkg/mcp/notify.go @@ -13,7 +13,6 @@ import ( "os" "sync" - core "dappco.re/go/core" "github.com/modelcontextprotocol/go-sdk/mcp" ) @@ -54,56 +53,38 @@ func (s *Service) SendNotificationToAllClients(ctx context.Context, level mcp.Lo } } -// channelNotification is the JSON-RPC notification format for claude/channel. -type channelNotification struct { - JSONRPC string `json:"jsonrpc"` - Method string `json:"method"` - Params channelParams `json:"params"` -} - -type channelParams struct { - Content string `json:"content"` - Meta map[string]string `json:"meta,omitempty"` -} - // ChannelSend pushes a channel event to all connected clients via // the notifications/claude/channel JSON-RPC method. // // s.ChannelSend(ctx, "agent.complete", map[string]any{"repo": "go-io", "workspace": "go-io-123"}) // s.ChannelSend(ctx, "build.failed", map[string]any{"repo": "core", "error": "test timeout"}) func (s *Service) ChannelSend(ctx context.Context, channel string, data any) { - // Marshal the data payload as the content string - content := core.JSONMarshalString(data) - - notification := channelNotification{ - JSONRPC: "2.0", - Method: "notifications/claude/channel", - Params: channelParams{ - Content: content, - Meta: map[string]string{ - "source": "core-agent", - "channel": channel, - }, - }, + payload := map[string]any{ + "channel": channel, + "data": data, } - - msg := core.JSONMarshalString(notification) - - // Write through the shared locked writer (same one the SDK transport uses). - // This prevents channel notifications from interleaving with SDK messages. - if !s.stdioMode { - return - } - sharedStdout.Write([]byte(core.Concat(msg, "\n"))) + s.SendNotificationToAllClients(ctx, mcp.LoggingLevel("info"), "channel", payload) } // ChannelSendToSession pushes a channel event to a specific session. -// Falls back to stdout for stdio transport. // // s.ChannelSendToSession(ctx, session, "agent.progress", progressData) func (s *Service) ChannelSendToSession(ctx context.Context, session *mcp.ServerSession, channel string, data any) { - // For now, channel events go to all sessions via stdout - s.ChannelSend(ctx, channel, data) + if session == nil { + return + } + + payload := map[string]any{ + "channel": channel, + "data": data, + } + if err := session.Log(ctx, &mcp.LoggingMessageParams{ + Level: mcp.LoggingLevel("info"), + Logger: "channel", + Data: payload, + }); err != nil { + s.logger.Debug("channel: failed to send to session", "session", session.ID(), "error", err) + } } // Sessions returns an iterator over all connected MCP sessions. @@ -128,6 +109,9 @@ func channelCapability() map[string]any { "agent.status", "build.complete", "build.failed", + "brain.list.complete", + "brain.forget.complete", + "brain.remember.complete", "brain.recall.complete", "inbox.message", "process.exit", diff --git a/pkg/mcp/notify_test.go b/pkg/mcp/notify_test.go new file mode 100644 index 0000000..fb90c96 --- /dev/null +++ b/pkg/mcp/notify_test.go @@ -0,0 +1,67 @@ +package mcp + +import ( + "context" + "testing" +) + +func TestSendNotificationToAllClients_Good(t *testing.T) { + svc, err := New(Options{}) + if err != nil { + t.Fatalf("New() failed: %v", err) + } + + ctx := context.Background() + svc.SendNotificationToAllClients(ctx, "info", "test", map[string]any{ + "event": "build.complete", + }) +} + +func TestChannelSend_Good(t *testing.T) { + svc, err := New(Options{}) + if err != nil { + t.Fatalf("New() failed: %v", err) + } + + ctx := context.Background() + svc.ChannelSend(ctx, "build.complete", map[string]any{ + "repo": "go-io", + }) +} + +func TestChannelSendToSession_Good_GuardNilSession(t *testing.T) { + svc, err := New(Options{}) + if err != nil { + t.Fatalf("New() failed: %v", err) + } + + ctx := context.Background() + svc.ChannelSendToSession(ctx, nil, "agent.status", map[string]any{ + "ok": true, + }) +} + +func TestChannelCapability_Good(t *testing.T) { + caps := channelCapability() + raw, ok := caps["claude/channel"] + if !ok { + t.Fatal("expected claude/channel capability entry") + } + + cap, ok := raw.(map[string]any) + if !ok { + t.Fatalf("expected claude/channel to be a map, got %T", raw) + } + + if cap["version"] == nil || cap["description"] == nil { + t.Fatalf("expected capability to include version and description: %#v", cap) + } + + channels, ok := cap["channels"].([]string) + if !ok { + t.Fatalf("expected channels to be []string, got %T", cap["channels"]) + } + if len(channels) == 0 { + t.Fatal("expected at least one channel in capability definition") + } +}