From 1ac4ff731d6daeea072a4d13a6c159fe72236ba0 Mon Sep 17 00:00:00 2001 From: Snider Date: Sat, 21 Mar 2026 15:16:34 +0000 Subject: [PATCH] feat(mcp): emit channel events for brain recall/remember DirectSubsystem pushes brain.recall.complete and brain.remember.complete events via OnChannel callback. Avoids circular import by using func-based wiring instead of interface-based SubsystemWithNotifier. New() auto-detects subsystems with OnChannel() and wires them to ChannelSend via closure. Co-Authored-By: Virgil --- pkg/mcp/brain/direct.go | 33 ++++++++++++++++++++++++++++++--- pkg/mcp/mcp.go | 13 +++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/pkg/mcp/brain/direct.go b/pkg/mcp/brain/direct.go index de4cb2d..b1a20ec 100644 --- a/pkg/mcp/brain/direct.go +++ b/pkg/mcp/brain/direct.go @@ -18,13 +18,27 @@ import ( "github.com/modelcontextprotocol/go-sdk/mcp" ) +// channelSender is the callback for pushing channel events. +type channelSender func(ctx context.Context, channel string, data any) + // DirectSubsystem implements mcp.Subsystem for OpenBrain via direct HTTP calls. // Unlike Subsystem (which uses the IDE WebSocket bridge), this calls the // Laravel API directly — suitable for standalone core-mcp usage. type DirectSubsystem struct { - apiURL string - apiKey string - client *http.Client + apiURL string + apiKey string + client *http.Client + onChannel channelSender +} + +// OnChannel sets a callback for channel event broadcasting. +// Called by the MCP service after creation to wire up notifications. +// +// brain.OnChannel(func(ctx context.Context, ch string, data any) { +// mcpService.ChannelSend(ctx, ch, data) +// }) +func (s *DirectSubsystem) OnChannel(fn func(ctx context.Context, channel string, data any)) { + s.onChannel = fn } // NewDirect creates a brain subsystem that calls the OpenBrain API directly. @@ -132,6 +146,13 @@ func (s *DirectSubsystem) remember(ctx context.Context, _ *mcp.CallToolRequest, } id, _ := result["id"].(string) + if s.onChannel != nil { + s.onChannel(ctx, "brain.remember.complete", map[string]any{ + "id": id, + "type": input.Type, + "project": input.Project, + }) + } return nil, RememberOutput{ Success: true, MemoryID: id, @@ -185,6 +206,12 @@ func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, in } } + if s.onChannel != nil { + s.onChannel(ctx, "brain.recall.complete", map[string]any{ + "query": input.Query, + "count": len(memories), + }) + } return nil, RecallOutput{ Success: true, Count: len(memories), diff --git a/pkg/mcp/mcp.go b/pkg/mcp/mcp.go index 5230237..10b0496 100644 --- a/pkg/mcp/mcp.go +++ b/pkg/mcp/mcp.go @@ -107,6 +107,19 @@ func New(opts Options) (*Service, error) { for _, sub := range s.subsystems { sub.RegisterTools(s.server) + if sn, ok := sub.(SubsystemWithNotifier); ok { + sn.SetNotifier(s) + } + // Wire channel callback for subsystems that use func-based notification + type channelWirer interface { + OnChannel(func(ctx context.Context, channel string, data any)) + } + if cw, ok := sub.(channelWirer); ok { + svc := s // capture for closure + cw.OnChannel(func(ctx context.Context, channel string, data any) { + svc.ChannelSend(ctx, channel, data) + }) + } } return s, nil