From dd33bfb691db696aaf8012fbec35989ce09a4179 Mon Sep 17 00:00:00 2001 From: Virgil Date: Thu, 2 Apr 2026 11:28:49 +0000 Subject: [PATCH] refactor(mcp): centralize notification channel names Keep channel emitters, provider metadata, and capability advertising in sync by sharing the same constants across the MCP subsystems. Co-Authored-By: Virgil --- pkg/mcp/agentic/dispatch.go | 7 +++-- pkg/mcp/agentic/ingest.go | 3 +- pkg/mcp/agentic/prep_test.go | 12 ++++---- pkg/mcp/agentic/resume.go | 7 +++-- pkg/mcp/agentic/status.go | 7 +++-- pkg/mcp/brain/direct.go | 7 +++-- pkg/mcp/brain/provider.go | 19 ++++++------ pkg/mcp/brain/tools.go | 9 +++--- pkg/mcp/ide/ide.go | 6 ++-- pkg/mcp/ide/tools_test.go | 9 +++--- pkg/mcp/notify.go | 57 +++++++++++++++++++++++++----------- pkg/mcp/notify_test.go | 18 ++++++------ pkg/mcp/tools_process.go | 6 ++-- 13 files changed, 100 insertions(+), 67 deletions(-) diff --git a/pkg/mcp/agentic/dispatch.go b/pkg/mcp/agentic/dispatch.go index 25c4cff..bbe277b 100644 --- a/pkg/mcp/agentic/dispatch.go +++ b/pkg/mcp/agentic/dispatch.go @@ -12,6 +12,7 @@ import ( "syscall" "time" + coremcp "dappco.re/go/mcp/pkg/mcp" coreio "forge.lthn.ai/core/go-io" coreerr "forge.lthn.ai/core/go-log" "github.com/modelcontextprotocol/go-sdk/mcp" @@ -243,7 +244,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, postCtx := context.WithoutCancel(ctx) status := "completed" - channel := "agent.complete" + channel := coremcp.ChannelAgentComplete payload := map[string]any{ "workspace": filepath.Base(wsDir), "repo": input.Repo, @@ -257,7 +258,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, st.PID = 0 if data, err := coreio.Local.Read(filepath.Join(wsDir, "src", "BLOCKED.md")); err == nil { status = "blocked" - channel = "agent.blocked" + channel = coremcp.ChannelAgentBlocked st.Status = status st.Question = strings.TrimSpace(data) if st.Question != "" { @@ -271,7 +272,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, payload["status"] = status s.emitChannel(postCtx, channel, payload) - s.emitChannel(postCtx, "agent.status", payload) + s.emitChannel(postCtx, coremcp.ChannelAgentStatus, payload) // Ingest scan findings as issues s.ingestFindings(wsDir) diff --git a/pkg/mcp/agentic/ingest.go b/pkg/mcp/agentic/ingest.go index f34aaef..a1d3f2a 100644 --- a/pkg/mcp/agentic/ingest.go +++ b/pkg/mcp/agentic/ingest.go @@ -12,6 +12,7 @@ import ( "path/filepath" "strings" + coremcp "dappco.re/go/mcp/pkg/mcp" coreio "forge.lthn.ai/core/go-io" ) @@ -134,7 +135,7 @@ func (s *PrepSubsystem) createIssueViaAPI(repo, title, description, issueType, p // ctx := context.Background() // s.emitHarvestComplete(ctx, "go-io-123", "go-io", 4, true) func (s *PrepSubsystem) emitHarvestComplete(ctx context.Context, workspace, repo string, findings int, issueCreated bool) { - s.emitChannel(ctx, "harvest.complete", map[string]any{ + s.emitChannel(ctx, coremcp.ChannelHarvestComplete, map[string]any{ "workspace": workspace, "repo": repo, "findings": findings, diff --git a/pkg/mcp/agentic/prep_test.go b/pkg/mcp/agentic/prep_test.go index fce4a19..64b9f8d 100644 --- a/pkg/mcp/agentic/prep_test.go +++ b/pkg/mcp/agentic/prep_test.go @@ -6,6 +6,8 @@ import ( "context" "strings" "testing" + + coremcp "dappco.re/go/mcp/pkg/mcp" ) type recordingNotifier struct { @@ -110,10 +112,10 @@ func TestSetNotifier_Good_EmitsChannelEvents(t *testing.T) { notifier := &recordingNotifier{} s.SetNotifier(notifier) - s.emitChannel(context.Background(), "agent.status", map[string]any{"status": "running"}) + s.emitChannel(context.Background(), coremcp.ChannelAgentStatus, map[string]any{"status": "running"}) - if notifier.channel != "agent.status" { - t.Fatalf("expected agent.status channel, got %q", notifier.channel) + if notifier.channel != coremcp.ChannelAgentStatus { + t.Fatalf("expected %s channel, got %q", coremcp.ChannelAgentStatus, notifier.channel) } if payload, ok := notifier.data.(map[string]any); !ok || payload["status"] != "running" { t.Fatalf("expected payload to include running status, got %#v", notifier.data) @@ -127,8 +129,8 @@ func TestEmitHarvestComplete_Good_EmitsChannelEvents(t *testing.T) { s.emitHarvestComplete(context.Background(), "go-io-123", "go-io", 4, true) - if notifier.channel != "harvest.complete" { - t.Fatalf("expected harvest.complete channel, got %q", notifier.channel) + if notifier.channel != coremcp.ChannelHarvestComplete { + t.Fatalf("expected %s channel, got %q", coremcp.ChannelHarvestComplete, notifier.channel) } payload, ok := notifier.data.(map[string]any) if !ok { diff --git a/pkg/mcp/agentic/resume.go b/pkg/mcp/agentic/resume.go index 9bd1ba5..c14e7bb 100644 --- a/pkg/mcp/agentic/resume.go +++ b/pkg/mcp/agentic/resume.go @@ -11,6 +11,7 @@ import ( "strings" "syscall" + coremcp "dappco.re/go/mcp/pkg/mcp" coreio "forge.lthn.ai/core/go-io" coreerr "forge.lthn.ai/core/go-log" "github.com/modelcontextprotocol/go-sdk/mcp" @@ -140,7 +141,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu postCtx := context.WithoutCancel(ctx) status := "completed" - channel := "agent.complete" + channel := coremcp.ChannelAgentComplete payload := map[string]any{ "workspace": input.Workspace, "agent": agent, @@ -150,7 +151,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu if data, err := coreio.Local.Read(filepath.Join(srcDir, "BLOCKED.md")); err == nil { status = "blocked" - channel = "agent.blocked" + channel = coremcp.ChannelAgentBlocked st.Question = strings.TrimSpace(data) if st.Question != "" { payload["question"] = st.Question @@ -163,7 +164,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu payload["status"] = status s.emitChannel(postCtx, channel, payload) - s.emitChannel(postCtx, "agent.status", payload) + s.emitChannel(postCtx, coremcp.ChannelAgentStatus, payload) }() return nil, ResumeOutput{ diff --git a/pkg/mcp/agentic/status.go b/pkg/mcp/agentic/status.go index ef11184..a9b0302 100644 --- a/pkg/mcp/agentic/status.go +++ b/pkg/mcp/agentic/status.go @@ -10,6 +10,7 @@ import ( "strings" "time" + coremcp "dappco.re/go/mcp/pkg/mcp" coreio "forge.lthn.ai/core/go-io" coreerr "forge.lthn.ai/core/go-log" "github.com/modelcontextprotocol/go-sdk/mcp" @@ -142,7 +143,7 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu if err != nil || proc.Signal(nil) != nil { prevStatus := st.Status status := "completed" - channel := "agent.complete" + channel := coremcp.ChannelAgentComplete payload := map[string]any{ "workspace": name, "agent": st.Agent, @@ -158,7 +159,7 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu st.Status = "blocked" st.Question = info.Question status = "blocked" - channel = "agent.blocked" + channel = coremcp.ChannelAgentBlocked if st.Question != "" { payload["question"] = st.Question } @@ -171,7 +172,7 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu if prevStatus != status { payload["status"] = status s.emitChannel(ctx, channel, payload) - s.emitChannel(ctx, "agent.status", payload) + s.emitChannel(ctx, coremcp.ChannelAgentStatus, payload) } } } diff --git a/pkg/mcp/brain/direct.go b/pkg/mcp/brain/direct.go index b568f0e..37f787f 100644 --- a/pkg/mcp/brain/direct.go +++ b/pkg/mcp/brain/direct.go @@ -14,6 +14,7 @@ import ( "strings" "time" + coremcp "dappco.re/go/mcp/pkg/mcp" coreio "forge.lthn.ai/core/go-io" coreerr "forge.lthn.ai/core/go-log" "github.com/modelcontextprotocol/go-sdk/mcp" @@ -153,7 +154,7 @@ func (s *DirectSubsystem) remember(ctx context.Context, _ *mcp.CallToolRequest, id, _ := result["id"].(string) if s.onChannel != nil { - s.onChannel(ctx, "brain.remember.complete", map[string]any{ + s.onChannel(ctx, coremcp.ChannelBrainRememberDone, map[string]any{ "id": id, "type": input.Type, "project": input.Project, @@ -213,7 +214,7 @@ func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, in } if s.onChannel != nil { - s.onChannel(ctx, "brain.recall.complete", map[string]any{ + s.onChannel(ctx, coremcp.ChannelBrainRecallDone, map[string]any{ "query": input.Query, "count": len(memories), }) @@ -287,7 +288,7 @@ func (s *DirectSubsystem) list(ctx context.Context, _ *mcp.CallToolRequest, inpu } if s.onChannel != nil { - s.onChannel(ctx, "brain.list.complete", map[string]any{ + s.onChannel(ctx, coremcp.ChannelBrainListDone, map[string]any{ "project": input.Project, "type": input.Type, "agent": input.AgentID, diff --git a/pkg/mcp/brain/provider.go b/pkg/mcp/brain/provider.go index 1b12568..c279860 100644 --- a/pkg/mcp/brain/provider.go +++ b/pkg/mcp/brain/provider.go @@ -5,10 +5,11 @@ package brain import ( "net/http" + coremcp "dappco.re/go/mcp/pkg/mcp" + "dappco.re/go/mcp/pkg/mcp/ide" "forge.lthn.ai/core/api" "forge.lthn.ai/core/api/pkg/provider" "forge.lthn.ai/core/go-ws" - "dappco.re/go/mcp/pkg/mcp/ide" "github.com/gin-gonic/gin" ) @@ -45,10 +46,10 @@ func (p *BrainProvider) BasePath() string { return "/api/brain" } // Channels implements provider.Streamable. func (p *BrainProvider) Channels() []string { return []string{ - "brain.remember.complete", - "brain.recall.complete", - "brain.forget.complete", - "brain.list.complete", + coremcp.ChannelBrainRememberDone, + coremcp.ChannelBrainRecallDone, + coremcp.ChannelBrainForgetDone, + coremcp.ChannelBrainListDone, } } @@ -212,7 +213,7 @@ func (p *BrainProvider) remember(c *gin.Context) { return } - p.emitEvent("brain.remember.complete", map[string]any{ + p.emitEvent(coremcp.ChannelBrainRememberDone, map[string]any{ "type": input.Type, "project": input.Project, }) @@ -245,7 +246,7 @@ func (p *BrainProvider) recall(c *gin.Context) { return } - p.emitEvent("brain.recall.complete", map[string]any{ + p.emitEvent(coremcp.ChannelBrainRecallDone, map[string]any{ "query": input.Query, }) @@ -279,7 +280,7 @@ func (p *BrainProvider) forget(c *gin.Context) { return } - p.emitEvent("brain.forget.complete", map[string]any{ + p.emitEvent(coremcp.ChannelBrainForgetDone, map[string]any{ "id": input.ID, }) @@ -314,7 +315,7 @@ func (p *BrainProvider) list(c *gin.Context) { return } - p.emitEvent("brain.list.complete", map[string]any{ + p.emitEvent(coremcp.ChannelBrainListDone, map[string]any{ "project": project, "type": typ, "agent": agentID, diff --git a/pkg/mcp/brain/tools.go b/pkg/mcp/brain/tools.go index de29384..c31cea0 100644 --- a/pkg/mcp/brain/tools.go +++ b/pkg/mcp/brain/tools.go @@ -6,6 +6,7 @@ import ( "context" "time" + coremcp "dappco.re/go/mcp/pkg/mcp" "dappco.re/go/mcp/pkg/mcp/ide" coreerr "forge.lthn.ai/core/go-log" "github.com/modelcontextprotocol/go-sdk/mcp" @@ -150,7 +151,7 @@ func (s *Subsystem) brainRemember(ctx context.Context, _ *mcp.CallToolRequest, i return nil, RememberOutput{}, coreerr.E("brain.remember", "failed to send brain_remember", err) } - s.emitChannel(ctx, "brain.remember.complete", map[string]any{ + s.emitChannel(ctx, coremcp.ChannelBrainRememberDone, map[string]any{ "type": input.Type, "project": input.Project, }) @@ -178,7 +179,7 @@ func (s *Subsystem) brainRecall(ctx context.Context, _ *mcp.CallToolRequest, inp return nil, RecallOutput{}, coreerr.E("brain.recall", "failed to send brain_recall", err) } - s.emitChannel(ctx, "brain.recall.complete", map[string]any{ + s.emitChannel(ctx, coremcp.ChannelBrainRecallDone, map[string]any{ "query": input.Query, "count": 0, }) @@ -205,7 +206,7 @@ func (s *Subsystem) brainForget(ctx context.Context, _ *mcp.CallToolRequest, inp return nil, ForgetOutput{}, coreerr.E("brain.forget", "failed to send brain_forget", err) } - s.emitChannel(ctx, "brain.forget.complete", map[string]any{ + s.emitChannel(ctx, coremcp.ChannelBrainForgetDone, map[string]any{ "id": input.ID, }) @@ -238,7 +239,7 @@ func (s *Subsystem) brainList(ctx context.Context, _ *mcp.CallToolRequest, input return nil, ListOutput{}, coreerr.E("brain.list", "failed to send brain_list", err) } - s.emitChannel(ctx, "brain.list.complete", map[string]any{ + s.emitChannel(ctx, coremcp.ChannelBrainListDone, map[string]any{ "project": input.Project, "type": input.Type, "agent": input.AgentID, diff --git a/pkg/mcp/ide/ide.go b/pkg/mcp/ide/ide.go index 1124279..376f8cb 100644 --- a/pkg/mcp/ide/ide.go +++ b/pkg/mcp/ide/ide.go @@ -336,11 +336,11 @@ func (s *Subsystem) emitBuildLifecycle(build BuildInfo) { channel := "" switch build.Status { case "running", "in_progress", "started": - channel = "build.start" + channel = coremcp.ChannelBuildStart case "success", "succeeded", "completed", "passed": - channel = "build.complete" + channel = coremcp.ChannelBuildComplete case "failed", "error": - channel = "build.failed" + channel = coremcp.ChannelBuildFailed default: return } diff --git a/pkg/mcp/ide/tools_test.go b/pkg/mcp/ide/tools_test.go index 7a961b2..34c96b2 100644 --- a/pkg/mcp/ide/tools_test.go +++ b/pkg/mcp/ide/tools_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + coremcp "dappco.re/go/mcp/pkg/mcp" "forge.lthn.ai/core/go-ws" ) @@ -344,8 +345,8 @@ func TestBuildStatus_Good_EmitsLifecycle(t *testing.T) { }, }) - if notifier.channel != "build.complete" { - t.Fatalf("expected build.complete channel, got %q", notifier.channel) + if notifier.channel != coremcp.ChannelBuildComplete { + t.Fatalf("expected %s channel, got %q", coremcp.ChannelBuildComplete, notifier.channel) } payload, ok := notifier.data.(map[string]any) if !ok { @@ -372,8 +373,8 @@ func TestBuildStatus_Good_EmitsStartLifecycle(t *testing.T) { }, }) - if notifier.channel != "build.start" { - t.Fatalf("expected build.start channel, got %q", notifier.channel) + if notifier.channel != coremcp.ChannelBuildStart { + t.Fatalf("expected %s channel, got %q", coremcp.ChannelBuildStart, notifier.channel) } payload, ok := notifier.data.(map[string]any) if !ok { diff --git a/pkg/mcp/notify.go b/pkg/mcp/notify.go index 4e10079..f3b8eb3 100644 --- a/pkg/mcp/notify.go +++ b/pkg/mcp/notify.go @@ -12,6 +12,7 @@ import ( "iter" "os" "reflect" + "slices" "sync" "unsafe" @@ -41,6 +42,44 @@ var sharedStdout = &lockedWriter{w: os.Stdout} const channelNotificationMethod = "notifications/claude/channel" const loggingNotificationMethod = "notifications/message" +// Shared channel names. Keeping them central avoids drift between emitters +// and the advertised claude/channel capability. +const ( + ChannelBuildStart = "build.start" + ChannelBuildComplete = "build.complete" + ChannelBuildFailed = "build.failed" + ChannelAgentComplete = "agent.complete" + ChannelAgentBlocked = "agent.blocked" + ChannelAgentStatus = "agent.status" + ChannelBrainForgetDone = "brain.forget.complete" + ChannelBrainListDone = "brain.list.complete" + ChannelBrainRecallDone = "brain.recall.complete" + ChannelBrainRememberDone = "brain.remember.complete" + ChannelHarvestComplete = "harvest.complete" + ChannelInboxMessage = "inbox.message" + ChannelProcessExit = "process.exit" + ChannelProcessStart = "process.start" + ChannelTestResult = "test.result" +) + +var channelCapabilityList = []string{ + ChannelBuildStart, + ChannelAgentComplete, + ChannelAgentBlocked, + ChannelAgentStatus, + ChannelBuildComplete, + ChannelBuildFailed, + ChannelBrainForgetDone, + ChannelBrainListDone, + ChannelBrainRecallDone, + ChannelBrainRememberDone, + ChannelHarvestComplete, + ChannelInboxMessage, + ChannelProcessExit, + ChannelProcessStart, + ChannelTestResult, +} + // ChannelNotification is the payload sent through the experimental channel // notification method. type ChannelNotification struct { @@ -186,21 +225,5 @@ func channelCapability() map[string]any { // channelCapabilityChannels lists the named channel events advertised by the // experimental capability. func channelCapabilityChannels() []string { - return []string{ - "build.start", - "agent.complete", - "agent.blocked", - "agent.status", - "build.complete", - "build.failed", - "brain.forget.complete", - "brain.list.complete", - "brain.recall.complete", - "brain.remember.complete", - "harvest.complete", - "inbox.message", - "process.exit", - "process.start", - "test.result", - } + return slices.Clone(channelCapabilityList) } diff --git a/pkg/mcp/notify_test.go b/pkg/mcp/notify_test.go index eafdc73..8fcd345 100644 --- a/pkg/mcp/notify_test.go +++ b/pkg/mcp/notify_test.go @@ -18,7 +18,7 @@ func TestSendNotificationToAllClients_Good(t *testing.T) { ctx := context.Background() svc.SendNotificationToAllClients(ctx, "info", "test", map[string]any{ - "event": "build.complete", + "event": ChannelBuildComplete, }) } @@ -47,7 +47,7 @@ func TestSendNotificationToAllClients_Good_CustomNotification(t *testing.T) { sent := make(chan struct{}) go func() { svc.SendNotificationToAllClients(ctx, "info", "test", map[string]any{ - "event": "build.complete", + "event": ChannelBuildComplete, }) close(sent) }() @@ -84,8 +84,8 @@ func TestSendNotificationToAllClients_Good_CustomNotification(t *testing.T) { if !ok { t.Fatalf("expected data object, got %T", params["data"]) } - if data["event"] != "build.complete" { - t.Fatalf("expected event build.complete, got %v", data["event"]) + if data["event"] != ChannelBuildComplete { + t.Fatalf("expected event %s, got %v", ChannelBuildComplete, data["event"]) } } @@ -96,7 +96,7 @@ func TestChannelSend_Good(t *testing.T) { } ctx := context.Background() - svc.ChannelSend(ctx, "build.complete", map[string]any{ + svc.ChannelSend(ctx, ChannelBuildComplete, map[string]any{ "repo": "go-io", }) } @@ -108,7 +108,7 @@ func TestChannelSendToSession_Good_GuardNilSession(t *testing.T) { } ctx := context.Background() - svc.ChannelSendToSession(ctx, nil, "agent.status", map[string]any{ + svc.ChannelSendToSession(ctx, nil, ChannelAgentStatus, map[string]any{ "ok": true, }) } @@ -149,7 +149,7 @@ func TestChannelSendToSession_Good_CustomNotification(t *testing.T) { sent := make(chan struct{}) go func() { - svc.ChannelSendToSession(ctx, session, "build.complete", map[string]any{ + svc.ChannelSendToSession(ctx, session, ChannelBuildComplete, map[string]any{ "repo": "go-io", }) close(sent) @@ -177,8 +177,8 @@ func TestChannelSendToSession_Good_CustomNotification(t *testing.T) { if !ok { t.Fatalf("expected params object, got %T", msg["params"]) } - if params["channel"] != "build.complete" { - t.Fatalf("expected channel build.complete, got %v", params["channel"]) + if params["channel"] != ChannelBuildComplete { + t.Fatalf("expected channel %s, got %v", ChannelBuildComplete, params["channel"]) } payload, ok := params["data"].(map[string]any) if !ok { diff --git a/pkg/mcp/tools_process.go b/pkg/mcp/tools_process.go index 4f74ba2..ad3b130 100644 --- a/pkg/mcp/tools_process.go +++ b/pkg/mcp/tools_process.go @@ -201,7 +201,7 @@ func (s *Service) processStart(ctx context.Context, req *mcp.CallToolRequest, in Args: proc.Args, StartedAt: proc.StartedAt, } - s.ChannelSend(ctx, "process.start", map[string]any{ + s.ChannelSend(ctx, ChannelProcessStart, map[string]any{ "id": output.ID, "pid": output.PID, "command": output.Command, }) return nil, output, nil @@ -228,7 +228,7 @@ func (s *Service) processStop(ctx context.Context, req *mcp.CallToolRequest, inp return nil, ProcessStopOutput{}, log.E("processStop", "failed to stop process", err) } - s.ChannelSend(ctx, "process.exit", map[string]any{"id": input.ID, "signal": "stop"}) + s.ChannelSend(ctx, ChannelProcessExit, map[string]any{"id": input.ID, "signal": "stop"}) return nil, ProcessStopOutput{ ID: input.ID, Success: true, @@ -249,7 +249,7 @@ func (s *Service) processKill(ctx context.Context, req *mcp.CallToolRequest, inp return nil, ProcessKillOutput{}, log.E("processKill", "failed to kill process", err) } - s.ChannelSend(ctx, "process.exit", map[string]any{"id": input.ID, "signal": "kill"}) + s.ChannelSend(ctx, ChannelProcessExit, map[string]any{"id": input.ID, "signal": "kill"}) return nil, ProcessKillOutput{ ID: input.ID, Success: true,