feat(mcp): align channel notifications with AX notifier flow
This commit is contained in:
parent
014c18e563
commit
ea8478b776
5 changed files with 191 additions and 51 deletions
50
docs/migration-guide-options.md
Normal file
50
docs/migration-guide-options.md
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
# Migrating to `Options{}` for MCP Service Construction
|
||||
|
||||
## Before (functional options)
|
||||
|
||||
```go
|
||||
svc, err := mcp.New(
|
||||
mcp.WithWorkspaceRoot("/path/to/project"),
|
||||
mcp.WithProcessService(processSvc),
|
||||
mcp.WithWSHub(hub),
|
||||
mcp.WithSubsystem(brainSub),
|
||||
mcp.WithSubsystem(ideSub),
|
||||
)
|
||||
```
|
||||
|
||||
## After (`Options{}` DTO)
|
||||
|
||||
```go
|
||||
svc, err := mcp.New(mcp.Options{
|
||||
WorkspaceRoot: "/path/to/project",
|
||||
ProcessService: processSvc,
|
||||
WSHub: hub,
|
||||
Subsystems: []mcp.Subsystem{
|
||||
brainSub,
|
||||
ideSub,
|
||||
},
|
||||
})
|
||||
```
|
||||
|
||||
## Notification helpers
|
||||
|
||||
```go
|
||||
// Broadcast to all MCP sessions.
|
||||
svc.SendNotificationToAllClients(ctx, "info", "build", map[string]any{
|
||||
"event": "build.complete",
|
||||
"repo": "go-io",
|
||||
})
|
||||
|
||||
// Broadcast a named channel event.
|
||||
svc.ChannelSend(ctx, "build.complete", map[string]any{
|
||||
"repo": "go-io",
|
||||
"status": "passed",
|
||||
})
|
||||
|
||||
// Send to one session.
|
||||
for session := range svc.Sessions() {
|
||||
svc.ChannelSendToSession(ctx, session, "agent.status", map[string]any{
|
||||
"state": "running",
|
||||
})
|
||||
}
|
||||
```
|
||||
|
|
@ -7,8 +7,8 @@ package brain
|
|||
import (
|
||||
"context"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
"dappco.re/go/mcp/pkg/mcp/ide"
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
)
|
||||
|
||||
|
|
@ -19,7 +19,8 @@ var errBridgeNotAvailable = coreerr.E("brain", "bridge not available", nil)
|
|||
// Subsystem implements mcp.Subsystem for OpenBrain knowledge store operations.
|
||||
// It proxies brain_* tool calls to the Laravel backend via the shared IDE bridge.
|
||||
type Subsystem struct {
|
||||
bridge *ide.Bridge
|
||||
bridge *ide.Bridge
|
||||
notifier Notifier
|
||||
}
|
||||
|
||||
// New creates a brain subsystem that uses the given IDE bridge for Laravel communication.
|
||||
|
|
@ -31,6 +32,16 @@ func New(bridge *ide.Bridge) *Subsystem {
|
|||
// Name implements mcp.Subsystem.
|
||||
func (s *Subsystem) Name() string { return "brain" }
|
||||
|
||||
// Notifier pushes events to MCP sessions (matches pkg/mcp.Notifier).
|
||||
type Notifier interface {
|
||||
ChannelSend(ctx context.Context, channel string, data any)
|
||||
}
|
||||
|
||||
// SetNotifier stores the shared notifier so this subsystem can emit channel events.
|
||||
func (s *Subsystem) SetNotifier(n Notifier) {
|
||||
s.notifier = n
|
||||
}
|
||||
|
||||
// RegisterTools implements mcp.Subsystem.
|
||||
func (s *Subsystem) RegisterTools(server *mcp.Server) {
|
||||
s.registerBrainTools(server)
|
||||
|
|
|
|||
|
|
@ -6,11 +6,18 @@ import (
|
|||
"context"
|
||||
"time"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
"dappco.re/go/mcp/pkg/mcp/ide"
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
)
|
||||
|
||||
// emitChannel pushes a brain event through the shared notifier.
|
||||
func (s *Subsystem) emitChannel(ctx context.Context, channel string, data any) {
|
||||
if s.notifier != nil {
|
||||
s.notifier.ChannelSend(ctx, channel, data)
|
||||
}
|
||||
}
|
||||
|
||||
// -- Input/Output types -------------------------------------------------------
|
||||
|
||||
// RememberInput is the input for brain_remember.
|
||||
|
|
@ -33,17 +40,17 @@ type RememberOutput struct {
|
|||
|
||||
// RecallInput is the input for brain_recall.
|
||||
type RecallInput struct {
|
||||
Query string `json:"query"`
|
||||
TopK int `json:"top_k,omitempty"`
|
||||
Query string `json:"query"`
|
||||
TopK int `json:"top_k,omitempty"`
|
||||
Filter RecallFilter `json:"filter,omitempty"`
|
||||
}
|
||||
|
||||
// RecallFilter holds optional filter criteria for brain_recall.
|
||||
type RecallFilter struct {
|
||||
Project string `json:"project,omitempty"`
|
||||
Type any `json:"type,omitempty"`
|
||||
AgentID string `json:"agent_id,omitempty"`
|
||||
MinConfidence float64 `json:"min_confidence,omitempty"`
|
||||
Project string `json:"project,omitempty"`
|
||||
Type any `json:"type,omitempty"`
|
||||
AgentID string `json:"agent_id,omitempty"`
|
||||
MinConfidence float64 `json:"min_confidence,omitempty"`
|
||||
}
|
||||
|
||||
// RecallOutput is the output for brain_recall.
|
||||
|
|
@ -122,7 +129,7 @@ func (s *Subsystem) registerBrainTools(server *mcp.Server) {
|
|||
|
||||
// -- Tool handlers ------------------------------------------------------------
|
||||
|
||||
func (s *Subsystem) brainRemember(_ context.Context, _ *mcp.CallToolRequest, input RememberInput) (*mcp.CallToolResult, RememberOutput, error) {
|
||||
func (s *Subsystem) brainRemember(ctx context.Context, _ *mcp.CallToolRequest, input RememberInput) (*mcp.CallToolResult, RememberOutput, error) {
|
||||
if s.bridge == nil {
|
||||
return nil, RememberOutput{}, errBridgeNotAvailable
|
||||
}
|
||||
|
|
@ -143,13 +150,18 @@ func (s *Subsystem) brainRemember(_ context.Context, _ *mcp.CallToolRequest, inp
|
|||
return nil, RememberOutput{}, coreerr.E("brain.remember", "failed to send brain_remember", err)
|
||||
}
|
||||
|
||||
s.emitChannel(ctx, "brain.remember.complete", map[string]any{
|
||||
"type": input.Type,
|
||||
"project": input.Project,
|
||||
})
|
||||
|
||||
return nil, RememberOutput{
|
||||
Success: true,
|
||||
Timestamp: time.Now(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Subsystem) brainRecall(_ context.Context, _ *mcp.CallToolRequest, input RecallInput) (*mcp.CallToolResult, RecallOutput, error) {
|
||||
func (s *Subsystem) brainRecall(ctx context.Context, _ *mcp.CallToolRequest, input RecallInput) (*mcp.CallToolResult, RecallOutput, error) {
|
||||
if s.bridge == nil {
|
||||
return nil, RecallOutput{}, errBridgeNotAvailable
|
||||
}
|
||||
|
|
@ -166,13 +178,18 @@ func (s *Subsystem) brainRecall(_ context.Context, _ *mcp.CallToolRequest, input
|
|||
return nil, RecallOutput{}, coreerr.E("brain.recall", "failed to send brain_recall", err)
|
||||
}
|
||||
|
||||
s.emitChannel(ctx, "brain.recall.complete", map[string]any{
|
||||
"query": input.Query,
|
||||
"count": 0,
|
||||
})
|
||||
|
||||
return nil, RecallOutput{
|
||||
Success: true,
|
||||
Memories: []Memory{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Subsystem) brainForget(_ context.Context, _ *mcp.CallToolRequest, input ForgetInput) (*mcp.CallToolResult, ForgetOutput, error) {
|
||||
func (s *Subsystem) brainForget(ctx context.Context, _ *mcp.CallToolRequest, input ForgetInput) (*mcp.CallToolResult, ForgetOutput, error) {
|
||||
if s.bridge == nil {
|
||||
return nil, ForgetOutput{}, errBridgeNotAvailable
|
||||
}
|
||||
|
|
@ -188,6 +205,10 @@ func (s *Subsystem) brainForget(_ context.Context, _ *mcp.CallToolRequest, input
|
|||
return nil, ForgetOutput{}, coreerr.E("brain.forget", "failed to send brain_forget", err)
|
||||
}
|
||||
|
||||
s.emitChannel(ctx, "brain.forget.complete", map[string]any{
|
||||
"id": input.ID,
|
||||
})
|
||||
|
||||
return nil, ForgetOutput{
|
||||
Success: true,
|
||||
Forgotten: input.ID,
|
||||
|
|
@ -195,7 +216,7 @@ func (s *Subsystem) brainForget(_ context.Context, _ *mcp.CallToolRequest, input
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (s *Subsystem) brainList(_ context.Context, _ *mcp.CallToolRequest, input ListInput) (*mcp.CallToolResult, ListOutput, error) {
|
||||
func (s *Subsystem) brainList(ctx context.Context, _ *mcp.CallToolRequest, input ListInput) (*mcp.CallToolResult, ListOutput, error) {
|
||||
if s.bridge == nil {
|
||||
return nil, ListOutput{}, errBridgeNotAvailable
|
||||
}
|
||||
|
|
@ -217,6 +238,13 @@ func (s *Subsystem) brainList(_ context.Context, _ *mcp.CallToolRequest, input L
|
|||
return nil, ListOutput{}, coreerr.E("brain.list", "failed to send brain_list", err)
|
||||
}
|
||||
|
||||
s.emitChannel(ctx, "brain.list.complete", map[string]any{
|
||||
"project": input.Project,
|
||||
"type": input.Type,
|
||||
"agent": input.AgentID,
|
||||
"limit": limit,
|
||||
})
|
||||
|
||||
return nil, ListOutput{
|
||||
Success: true,
|
||||
Memories: []Memory{},
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ import (
|
|||
"os"
|
||||
"sync"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
)
|
||||
|
||||
|
|
@ -54,56 +53,38 @@ func (s *Service) SendNotificationToAllClients(ctx context.Context, level mcp.Lo
|
|||
}
|
||||
}
|
||||
|
||||
// channelNotification is the JSON-RPC notification format for claude/channel.
|
||||
type channelNotification struct {
|
||||
JSONRPC string `json:"jsonrpc"`
|
||||
Method string `json:"method"`
|
||||
Params channelParams `json:"params"`
|
||||
}
|
||||
|
||||
type channelParams struct {
|
||||
Content string `json:"content"`
|
||||
Meta map[string]string `json:"meta,omitempty"`
|
||||
}
|
||||
|
||||
// ChannelSend pushes a channel event to all connected clients via
|
||||
// the notifications/claude/channel JSON-RPC method.
|
||||
//
|
||||
// s.ChannelSend(ctx, "agent.complete", map[string]any{"repo": "go-io", "workspace": "go-io-123"})
|
||||
// s.ChannelSend(ctx, "build.failed", map[string]any{"repo": "core", "error": "test timeout"})
|
||||
func (s *Service) ChannelSend(ctx context.Context, channel string, data any) {
|
||||
// Marshal the data payload as the content string
|
||||
content := core.JSONMarshalString(data)
|
||||
|
||||
notification := channelNotification{
|
||||
JSONRPC: "2.0",
|
||||
Method: "notifications/claude/channel",
|
||||
Params: channelParams{
|
||||
Content: content,
|
||||
Meta: map[string]string{
|
||||
"source": "core-agent",
|
||||
"channel": channel,
|
||||
},
|
||||
},
|
||||
payload := map[string]any{
|
||||
"channel": channel,
|
||||
"data": data,
|
||||
}
|
||||
|
||||
msg := core.JSONMarshalString(notification)
|
||||
|
||||
// Write through the shared locked writer (same one the SDK transport uses).
|
||||
// This prevents channel notifications from interleaving with SDK messages.
|
||||
if !s.stdioMode {
|
||||
return
|
||||
}
|
||||
sharedStdout.Write([]byte(core.Concat(msg, "\n")))
|
||||
s.SendNotificationToAllClients(ctx, mcp.LoggingLevel("info"), "channel", payload)
|
||||
}
|
||||
|
||||
// ChannelSendToSession pushes a channel event to a specific session.
|
||||
// Falls back to stdout for stdio transport.
|
||||
//
|
||||
// s.ChannelSendToSession(ctx, session, "agent.progress", progressData)
|
||||
func (s *Service) ChannelSendToSession(ctx context.Context, session *mcp.ServerSession, channel string, data any) {
|
||||
// For now, channel events go to all sessions via stdout
|
||||
s.ChannelSend(ctx, channel, data)
|
||||
if session == nil {
|
||||
return
|
||||
}
|
||||
|
||||
payload := map[string]any{
|
||||
"channel": channel,
|
||||
"data": data,
|
||||
}
|
||||
if err := session.Log(ctx, &mcp.LoggingMessageParams{
|
||||
Level: mcp.LoggingLevel("info"),
|
||||
Logger: "channel",
|
||||
Data: payload,
|
||||
}); err != nil {
|
||||
s.logger.Debug("channel: failed to send to session", "session", session.ID(), "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Sessions returns an iterator over all connected MCP sessions.
|
||||
|
|
@ -128,6 +109,9 @@ func channelCapability() map[string]any {
|
|||
"agent.status",
|
||||
"build.complete",
|
||||
"build.failed",
|
||||
"brain.list.complete",
|
||||
"brain.forget.complete",
|
||||
"brain.remember.complete",
|
||||
"brain.recall.complete",
|
||||
"inbox.message",
|
||||
"process.exit",
|
||||
|
|
|
|||
67
pkg/mcp/notify_test.go
Normal file
67
pkg/mcp/notify_test.go
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
package mcp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSendNotificationToAllClients_Good(t *testing.T) {
|
||||
svc, err := New(Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("New() failed: %v", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
svc.SendNotificationToAllClients(ctx, "info", "test", map[string]any{
|
||||
"event": "build.complete",
|
||||
})
|
||||
}
|
||||
|
||||
func TestChannelSend_Good(t *testing.T) {
|
||||
svc, err := New(Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("New() failed: %v", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
svc.ChannelSend(ctx, "build.complete", map[string]any{
|
||||
"repo": "go-io",
|
||||
})
|
||||
}
|
||||
|
||||
func TestChannelSendToSession_Good_GuardNilSession(t *testing.T) {
|
||||
svc, err := New(Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("New() failed: %v", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
svc.ChannelSendToSession(ctx, nil, "agent.status", map[string]any{
|
||||
"ok": true,
|
||||
})
|
||||
}
|
||||
|
||||
func TestChannelCapability_Good(t *testing.T) {
|
||||
caps := channelCapability()
|
||||
raw, ok := caps["claude/channel"]
|
||||
if !ok {
|
||||
t.Fatal("expected claude/channel capability entry")
|
||||
}
|
||||
|
||||
cap, ok := raw.(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("expected claude/channel to be a map, got %T", raw)
|
||||
}
|
||||
|
||||
if cap["version"] == nil || cap["description"] == nil {
|
||||
t.Fatalf("expected capability to include version and description: %#v", cap)
|
||||
}
|
||||
|
||||
channels, ok := cap["channels"].([]string)
|
||||
if !ok {
|
||||
t.Fatalf("expected channels to be []string, got %T", cap["channels"])
|
||||
}
|
||||
if len(channels) == 0 {
|
||||
t.Fatal("expected at least one channel in capability definition")
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue