refactor(mcp): centralize notification channel names

Keep channel emitters, provider metadata, and capability advertising in sync by sharing the same constants across the MCP subsystems.

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-02 11:28:49 +00:00
parent d5a76bf2c7
commit dd33bfb691
13 changed files with 100 additions and 67 deletions

View file

@ -12,6 +12,7 @@ import (
"syscall"
"time"
coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp"
@ -243,7 +244,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
postCtx := context.WithoutCancel(ctx)
status := "completed"
channel := "agent.complete"
channel := coremcp.ChannelAgentComplete
payload := map[string]any{
"workspace": filepath.Base(wsDir),
"repo": input.Repo,
@ -257,7 +258,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
st.PID = 0
if data, err := coreio.Local.Read(filepath.Join(wsDir, "src", "BLOCKED.md")); err == nil {
status = "blocked"
channel = "agent.blocked"
channel = coremcp.ChannelAgentBlocked
st.Status = status
st.Question = strings.TrimSpace(data)
if st.Question != "" {
@ -271,7 +272,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
payload["status"] = status
s.emitChannel(postCtx, channel, payload)
s.emitChannel(postCtx, "agent.status", payload)
s.emitChannel(postCtx, coremcp.ChannelAgentStatus, payload)
// Ingest scan findings as issues
s.ingestFindings(wsDir)

View file

@ -12,6 +12,7 @@ import (
"path/filepath"
"strings"
coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
)
@ -134,7 +135,7 @@ func (s *PrepSubsystem) createIssueViaAPI(repo, title, description, issueType, p
// ctx := context.Background()
// s.emitHarvestComplete(ctx, "go-io-123", "go-io", 4, true)
func (s *PrepSubsystem) emitHarvestComplete(ctx context.Context, workspace, repo string, findings int, issueCreated bool) {
s.emitChannel(ctx, "harvest.complete", map[string]any{
s.emitChannel(ctx, coremcp.ChannelHarvestComplete, map[string]any{
"workspace": workspace,
"repo": repo,
"findings": findings,

View file

@ -6,6 +6,8 @@ import (
"context"
"strings"
"testing"
coremcp "dappco.re/go/mcp/pkg/mcp"
)
type recordingNotifier struct {
@ -110,10 +112,10 @@ func TestSetNotifier_Good_EmitsChannelEvents(t *testing.T) {
notifier := &recordingNotifier{}
s.SetNotifier(notifier)
s.emitChannel(context.Background(), "agent.status", map[string]any{"status": "running"})
s.emitChannel(context.Background(), coremcp.ChannelAgentStatus, map[string]any{"status": "running"})
if notifier.channel != "agent.status" {
t.Fatalf("expected agent.status channel, got %q", notifier.channel)
if notifier.channel != coremcp.ChannelAgentStatus {
t.Fatalf("expected %s channel, got %q", coremcp.ChannelAgentStatus, notifier.channel)
}
if payload, ok := notifier.data.(map[string]any); !ok || payload["status"] != "running" {
t.Fatalf("expected payload to include running status, got %#v", notifier.data)
@ -127,8 +129,8 @@ func TestEmitHarvestComplete_Good_EmitsChannelEvents(t *testing.T) {
s.emitHarvestComplete(context.Background(), "go-io-123", "go-io", 4, true)
if notifier.channel != "harvest.complete" {
t.Fatalf("expected harvest.complete channel, got %q", notifier.channel)
if notifier.channel != coremcp.ChannelHarvestComplete {
t.Fatalf("expected %s channel, got %q", coremcp.ChannelHarvestComplete, notifier.channel)
}
payload, ok := notifier.data.(map[string]any)
if !ok {

View file

@ -11,6 +11,7 @@ import (
"strings"
"syscall"
coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp"
@ -140,7 +141,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
postCtx := context.WithoutCancel(ctx)
status := "completed"
channel := "agent.complete"
channel := coremcp.ChannelAgentComplete
payload := map[string]any{
"workspace": input.Workspace,
"agent": agent,
@ -150,7 +151,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
if data, err := coreio.Local.Read(filepath.Join(srcDir, "BLOCKED.md")); err == nil {
status = "blocked"
channel = "agent.blocked"
channel = coremcp.ChannelAgentBlocked
st.Question = strings.TrimSpace(data)
if st.Question != "" {
payload["question"] = st.Question
@ -163,7 +164,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
payload["status"] = status
s.emitChannel(postCtx, channel, payload)
s.emitChannel(postCtx, "agent.status", payload)
s.emitChannel(postCtx, coremcp.ChannelAgentStatus, payload)
}()
return nil, ResumeOutput{

View file

@ -10,6 +10,7 @@ import (
"strings"
"time"
coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp"
@ -142,7 +143,7 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu
if err != nil || proc.Signal(nil) != nil {
prevStatus := st.Status
status := "completed"
channel := "agent.complete"
channel := coremcp.ChannelAgentComplete
payload := map[string]any{
"workspace": name,
"agent": st.Agent,
@ -158,7 +159,7 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu
st.Status = "blocked"
st.Question = info.Question
status = "blocked"
channel = "agent.blocked"
channel = coremcp.ChannelAgentBlocked
if st.Question != "" {
payload["question"] = st.Question
}
@ -171,7 +172,7 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu
if prevStatus != status {
payload["status"] = status
s.emitChannel(ctx, channel, payload)
s.emitChannel(ctx, "agent.status", payload)
s.emitChannel(ctx, coremcp.ChannelAgentStatus, payload)
}
}
}

View file

@ -14,6 +14,7 @@ import (
"strings"
"time"
coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp"
@ -153,7 +154,7 @@ func (s *DirectSubsystem) remember(ctx context.Context, _ *mcp.CallToolRequest,
id, _ := result["id"].(string)
if s.onChannel != nil {
s.onChannel(ctx, "brain.remember.complete", map[string]any{
s.onChannel(ctx, coremcp.ChannelBrainRememberDone, map[string]any{
"id": id,
"type": input.Type,
"project": input.Project,
@ -213,7 +214,7 @@ func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, in
}
if s.onChannel != nil {
s.onChannel(ctx, "brain.recall.complete", map[string]any{
s.onChannel(ctx, coremcp.ChannelBrainRecallDone, map[string]any{
"query": input.Query,
"count": len(memories),
})
@ -287,7 +288,7 @@ func (s *DirectSubsystem) list(ctx context.Context, _ *mcp.CallToolRequest, inpu
}
if s.onChannel != nil {
s.onChannel(ctx, "brain.list.complete", map[string]any{
s.onChannel(ctx, coremcp.ChannelBrainListDone, map[string]any{
"project": input.Project,
"type": input.Type,
"agent": input.AgentID,

View file

@ -5,10 +5,11 @@ package brain
import (
"net/http"
coremcp "dappco.re/go/mcp/pkg/mcp"
"dappco.re/go/mcp/pkg/mcp/ide"
"forge.lthn.ai/core/api"
"forge.lthn.ai/core/api/pkg/provider"
"forge.lthn.ai/core/go-ws"
"dappco.re/go/mcp/pkg/mcp/ide"
"github.com/gin-gonic/gin"
)
@ -45,10 +46,10 @@ func (p *BrainProvider) BasePath() string { return "/api/brain" }
// Channels implements provider.Streamable.
func (p *BrainProvider) Channels() []string {
return []string{
"brain.remember.complete",
"brain.recall.complete",
"brain.forget.complete",
"brain.list.complete",
coremcp.ChannelBrainRememberDone,
coremcp.ChannelBrainRecallDone,
coremcp.ChannelBrainForgetDone,
coremcp.ChannelBrainListDone,
}
}
@ -212,7 +213,7 @@ func (p *BrainProvider) remember(c *gin.Context) {
return
}
p.emitEvent("brain.remember.complete", map[string]any{
p.emitEvent(coremcp.ChannelBrainRememberDone, map[string]any{
"type": input.Type,
"project": input.Project,
})
@ -245,7 +246,7 @@ func (p *BrainProvider) recall(c *gin.Context) {
return
}
p.emitEvent("brain.recall.complete", map[string]any{
p.emitEvent(coremcp.ChannelBrainRecallDone, map[string]any{
"query": input.Query,
})
@ -279,7 +280,7 @@ func (p *BrainProvider) forget(c *gin.Context) {
return
}
p.emitEvent("brain.forget.complete", map[string]any{
p.emitEvent(coremcp.ChannelBrainForgetDone, map[string]any{
"id": input.ID,
})
@ -314,7 +315,7 @@ func (p *BrainProvider) list(c *gin.Context) {
return
}
p.emitEvent("brain.list.complete", map[string]any{
p.emitEvent(coremcp.ChannelBrainListDone, map[string]any{
"project": project,
"type": typ,
"agent": agentID,

View file

@ -6,6 +6,7 @@ import (
"context"
"time"
coremcp "dappco.re/go/mcp/pkg/mcp"
"dappco.re/go/mcp/pkg/mcp/ide"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp"
@ -150,7 +151,7 @@ func (s *Subsystem) brainRemember(ctx context.Context, _ *mcp.CallToolRequest, i
return nil, RememberOutput{}, coreerr.E("brain.remember", "failed to send brain_remember", err)
}
s.emitChannel(ctx, "brain.remember.complete", map[string]any{
s.emitChannel(ctx, coremcp.ChannelBrainRememberDone, map[string]any{
"type": input.Type,
"project": input.Project,
})
@ -178,7 +179,7 @@ func (s *Subsystem) brainRecall(ctx context.Context, _ *mcp.CallToolRequest, inp
return nil, RecallOutput{}, coreerr.E("brain.recall", "failed to send brain_recall", err)
}
s.emitChannel(ctx, "brain.recall.complete", map[string]any{
s.emitChannel(ctx, coremcp.ChannelBrainRecallDone, map[string]any{
"query": input.Query,
"count": 0,
})
@ -205,7 +206,7 @@ func (s *Subsystem) brainForget(ctx context.Context, _ *mcp.CallToolRequest, inp
return nil, ForgetOutput{}, coreerr.E("brain.forget", "failed to send brain_forget", err)
}
s.emitChannel(ctx, "brain.forget.complete", map[string]any{
s.emitChannel(ctx, coremcp.ChannelBrainForgetDone, map[string]any{
"id": input.ID,
})
@ -238,7 +239,7 @@ func (s *Subsystem) brainList(ctx context.Context, _ *mcp.CallToolRequest, input
return nil, ListOutput{}, coreerr.E("brain.list", "failed to send brain_list", err)
}
s.emitChannel(ctx, "brain.list.complete", map[string]any{
s.emitChannel(ctx, coremcp.ChannelBrainListDone, map[string]any{
"project": input.Project,
"type": input.Type,
"agent": input.AgentID,

View file

@ -336,11 +336,11 @@ func (s *Subsystem) emitBuildLifecycle(build BuildInfo) {
channel := ""
switch build.Status {
case "running", "in_progress", "started":
channel = "build.start"
channel = coremcp.ChannelBuildStart
case "success", "succeeded", "completed", "passed":
channel = "build.complete"
channel = coremcp.ChannelBuildComplete
case "failed", "error":
channel = "build.failed"
channel = coremcp.ChannelBuildFailed
default:
return
}

View file

@ -8,6 +8,7 @@ import (
"testing"
"time"
coremcp "dappco.re/go/mcp/pkg/mcp"
"forge.lthn.ai/core/go-ws"
)
@ -344,8 +345,8 @@ func TestBuildStatus_Good_EmitsLifecycle(t *testing.T) {
},
})
if notifier.channel != "build.complete" {
t.Fatalf("expected build.complete channel, got %q", notifier.channel)
if notifier.channel != coremcp.ChannelBuildComplete {
t.Fatalf("expected %s channel, got %q", coremcp.ChannelBuildComplete, notifier.channel)
}
payload, ok := notifier.data.(map[string]any)
if !ok {
@ -372,8 +373,8 @@ func TestBuildStatus_Good_EmitsStartLifecycle(t *testing.T) {
},
})
if notifier.channel != "build.start" {
t.Fatalf("expected build.start channel, got %q", notifier.channel)
if notifier.channel != coremcp.ChannelBuildStart {
t.Fatalf("expected %s channel, got %q", coremcp.ChannelBuildStart, notifier.channel)
}
payload, ok := notifier.data.(map[string]any)
if !ok {

View file

@ -12,6 +12,7 @@ import (
"iter"
"os"
"reflect"
"slices"
"sync"
"unsafe"
@ -41,6 +42,44 @@ var sharedStdout = &lockedWriter{w: os.Stdout}
const channelNotificationMethod = "notifications/claude/channel"
const loggingNotificationMethod = "notifications/message"
// Shared channel names. Keeping them central avoids drift between emitters
// and the advertised claude/channel capability.
const (
	// Build lifecycle notifications (see emitBuildLifecycle).
	ChannelBuildStart    = "build.start"
	ChannelBuildComplete = "build.complete"
	ChannelBuildFailed   = "build.failed"

	// Agent lifecycle notifications emitted by the prep subsystem
	// (dispatch/resume/status paths).
	ChannelAgentComplete = "agent.complete"
	ChannelAgentBlocked  = "agent.blocked"
	ChannelAgentStatus   = "agent.status"

	// Brain (memory) operation completion notifications, emitted by both
	// the MCP subsystem and the HTTP provider.
	ChannelBrainForgetDone   = "brain.forget.complete"
	ChannelBrainListDone     = "brain.list.complete"
	ChannelBrainRecallDone   = "brain.recall.complete"
	ChannelBrainRememberDone = "brain.remember.complete"

	// Remaining subsystem events.
	ChannelHarvestComplete = "harvest.complete"
	ChannelInboxMessage    = "inbox.message"
	ChannelProcessExit     = "process.exit"
	ChannelProcessStart    = "process.start"
	ChannelTestResult      = "test.result"
)

// channelCapabilityList enumerates every channel advertised through the
// experimental claude/channel capability; channelCapabilityChannels returns
// a clone of it. Keep this list in sync with the const block above — a
// constant missing here is emitted but never advertised.
//
// NOTE(review): the ordering intentionally mirrors the previously
// hard-coded advertisement (build.start first, then agent.*, then the
// remaining build.* entries) and is wire-visible; confirm no client
// depends on it before regrouping.
var channelCapabilityList = []string{
	ChannelBuildStart,
	ChannelAgentComplete,
	ChannelAgentBlocked,
	ChannelAgentStatus,
	ChannelBuildComplete,
	ChannelBuildFailed,
	ChannelBrainForgetDone,
	ChannelBrainListDone,
	ChannelBrainRecallDone,
	ChannelBrainRememberDone,
	ChannelHarvestComplete,
	ChannelInboxMessage,
	ChannelProcessExit,
	ChannelProcessStart,
	ChannelTestResult,
}
// ChannelNotification is the payload sent through the experimental channel
// notification method.
type ChannelNotification struct {
@ -186,21 +225,5 @@ func channelCapability() map[string]any {
// channelCapabilityChannels lists the named channel events advertised by the
// experimental capability.
func channelCapabilityChannels() []string {
return []string{
"build.start",
"agent.complete",
"agent.blocked",
"agent.status",
"build.complete",
"build.failed",
"brain.forget.complete",
"brain.list.complete",
"brain.recall.complete",
"brain.remember.complete",
"harvest.complete",
"inbox.message",
"process.exit",
"process.start",
"test.result",
}
return slices.Clone(channelCapabilityList)
}

View file

@ -18,7 +18,7 @@ func TestSendNotificationToAllClients_Good(t *testing.T) {
ctx := context.Background()
svc.SendNotificationToAllClients(ctx, "info", "test", map[string]any{
"event": "build.complete",
"event": ChannelBuildComplete,
})
}
@ -47,7 +47,7 @@ func TestSendNotificationToAllClients_Good_CustomNotification(t *testing.T) {
sent := make(chan struct{})
go func() {
svc.SendNotificationToAllClients(ctx, "info", "test", map[string]any{
"event": "build.complete",
"event": ChannelBuildComplete,
})
close(sent)
}()
@ -84,8 +84,8 @@ func TestSendNotificationToAllClients_Good_CustomNotification(t *testing.T) {
if !ok {
t.Fatalf("expected data object, got %T", params["data"])
}
if data["event"] != "build.complete" {
t.Fatalf("expected event build.complete, got %v", data["event"])
if data["event"] != ChannelBuildComplete {
t.Fatalf("expected event %s, got %v", ChannelBuildComplete, data["event"])
}
}
@ -96,7 +96,7 @@ func TestChannelSend_Good(t *testing.T) {
}
ctx := context.Background()
svc.ChannelSend(ctx, "build.complete", map[string]any{
svc.ChannelSend(ctx, ChannelBuildComplete, map[string]any{
"repo": "go-io",
})
}
@ -108,7 +108,7 @@ func TestChannelSendToSession_Good_GuardNilSession(t *testing.T) {
}
ctx := context.Background()
svc.ChannelSendToSession(ctx, nil, "agent.status", map[string]any{
svc.ChannelSendToSession(ctx, nil, ChannelAgentStatus, map[string]any{
"ok": true,
})
}
@ -149,7 +149,7 @@ func TestChannelSendToSession_Good_CustomNotification(t *testing.T) {
sent := make(chan struct{})
go func() {
svc.ChannelSendToSession(ctx, session, "build.complete", map[string]any{
svc.ChannelSendToSession(ctx, session, ChannelBuildComplete, map[string]any{
"repo": "go-io",
})
close(sent)
@ -177,8 +177,8 @@ func TestChannelSendToSession_Good_CustomNotification(t *testing.T) {
if !ok {
t.Fatalf("expected params object, got %T", msg["params"])
}
if params["channel"] != "build.complete" {
t.Fatalf("expected channel build.complete, got %v", params["channel"])
if params["channel"] != ChannelBuildComplete {
t.Fatalf("expected channel %s, got %v", ChannelBuildComplete, params["channel"])
}
payload, ok := params["data"].(map[string]any)
if !ok {

View file

@ -201,7 +201,7 @@ func (s *Service) processStart(ctx context.Context, req *mcp.CallToolRequest, in
Args: proc.Args,
StartedAt: proc.StartedAt,
}
s.ChannelSend(ctx, "process.start", map[string]any{
s.ChannelSend(ctx, ChannelProcessStart, map[string]any{
"id": output.ID, "pid": output.PID, "command": output.Command,
})
return nil, output, nil
@ -228,7 +228,7 @@ func (s *Service) processStop(ctx context.Context, req *mcp.CallToolRequest, inp
return nil, ProcessStopOutput{}, log.E("processStop", "failed to stop process", err)
}
s.ChannelSend(ctx, "process.exit", map[string]any{"id": input.ID, "signal": "stop"})
s.ChannelSend(ctx, ChannelProcessExit, map[string]any{"id": input.ID, "signal": "stop"})
return nil, ProcessStopOutput{
ID: input.ID,
Success: true,
@ -249,7 +249,7 @@ func (s *Service) processKill(ctx context.Context, req *mcp.CallToolRequest, inp
return nil, ProcessKillOutput{}, log.E("processKill", "failed to kill process", err)
}
s.ChannelSend(ctx, "process.exit", map[string]any{"id": input.ID, "signal": "kill"})
s.ChannelSend(ctx, ChannelProcessExit, map[string]any{"id": input.ID, "signal": "kill"})
return nil, ProcessKillOutput{
ID: input.ID,
Success: true,