From 7c1aae040244a41d4312be7ec37e3a052ee4311d Mon Sep 17 00:00:00 2001 From: Virgil Date: Sun, 29 Mar 2026 23:30:08 +0000 Subject: [PATCH] fix(ax): auto-wire monitor IPC hooks Co-Authored-By: Virgil --- pkg/brain/brain.go | 16 +++++++----- pkg/brain/direct.go | 10 +++---- pkg/brain/provider.go | 23 +++++++--------- pkg/brain/register.go | 8 +++--- pkg/monitor/monitor.go | 58 +++++++++++++++++++++++++---------------- pkg/monitor/register.go | 21 +++------------ 6 files changed, 66 insertions(+), 70 deletions(-) diff --git a/pkg/brain/brain.go b/pkg/brain/brain.go index 5e04998..12fe113 100644 --- a/pkg/brain/brain.go +++ b/pkg/brain/brain.go @@ -1,7 +1,9 @@ // SPDX-License-Identifier: EUPL-1.2 -// Package brain provides an MCP subsystem that proxies OpenBrain knowledge -// store operations to the Laravel php-agentic backend via the IDE bridge. +// Package brain gives MCP and HTTP services a shared OpenBrain surface. +// +// sub := brain.New(bridge) +// sub.RegisterTools(server) package brain import ( @@ -29,7 +31,7 @@ func fieldString(values map[string]any, key string) string { // but it has not been initialised (headless mode). var errBridgeNotAvailable = core.E("brain", "bridge not available", nil) -// Subsystem proxies brain_* MCP tools through the shared IDE bridge. +// Subsystem routes brain_* MCP tools through the shared IDE bridge. // // sub := brain.New(bridge) // sub.RegisterTools(server) @@ -37,7 +39,7 @@ type Subsystem struct { bridge *ide.Bridge } -// New creates a bridge-backed brain subsystem. +// New builds the bridge-backed OpenBrain subsystem. // // sub := brain.New(bridge) // _ = sub.Shutdown(context.Background()) @@ -45,12 +47,12 @@ func New(bridge *ide.Bridge) *Subsystem { return &Subsystem{bridge: bridge} } -// Name returns the MCP subsystem name. +// Name keeps the subsystem address stable for core.WithService and MCP. // // name := sub.Name() // "brain" func (s *Subsystem) Name() string { return "brain" } -// RegisterTools adds the bridge-backed brain tools to an MCP server. +// RegisterTools publishes the bridge-backed brain tools on an MCP server. // // sub := brain.New(bridge) // sub.RegisterTools(server) @@ -58,7 +60,7 @@ func (s *Subsystem) RegisterTools(server *mcp.Server) { s.registerBrainTools(server) } -// Shutdown closes the subsystem without additional cleanup. +// Shutdown satisfies the MCP subsystem lifecycle without extra cleanup. // // _ = sub.Shutdown(context.Background()) func (s *Subsystem) Shutdown(_ context.Context) error { diff --git a/pkg/brain/direct.go b/pkg/brain/direct.go index c7475c4..fcc25fd 100644 --- a/pkg/brain/direct.go +++ b/pkg/brain/direct.go @@ -12,7 +12,7 @@ import ( "github.com/modelcontextprotocol/go-sdk/mcp" ) -// DirectSubsystem calls the OpenBrain HTTP API without the IDE bridge. +// DirectSubsystem talks to OpenBrain over HTTP without the IDE bridge. // // sub := brain.NewDirect() // sub.RegisterTools(server) @@ -23,7 +23,7 @@ type DirectSubsystem struct { var _ coremcp.Subsystem = (*DirectSubsystem)(nil) -// NewDirect creates a direct HTTP brain subsystem. +// NewDirect builds the HTTP-backed OpenBrain subsystem. // // sub := brain.NewDirect() // sub.RegisterTools(server) @@ -56,12 +56,12 @@ func NewDirect() *DirectSubsystem { } } -// Name returns the MCP subsystem name. +// Name keeps the direct subsystem address stable for core.WithService and MCP. // // name := sub.Name() // "brain" func (s *DirectSubsystem) Name() string { return "brain" } -// RegisterTools adds the direct OpenBrain tools to an MCP server. +// RegisterTools publishes the direct brain_* and agent_* tools on an MCP server. // // sub := brain.NewDirect() // sub.RegisterTools(server) @@ -85,7 +85,7 @@ func (s *DirectSubsystem) RegisterTools(server *mcp.Server) { s.RegisterMessagingTools(server) } -// Shutdown closes the direct subsystem without additional cleanup. +// Shutdown satisfies the MCP subsystem lifecycle without extra cleanup. // // _ = sub.Shutdown(context.Background()) func (s *DirectSubsystem) Shutdown(_ context.Context) error { return nil } diff --git a/pkg/brain/provider.go b/pkg/brain/provider.go index 08400b9..8b10e2b 100644 --- a/pkg/brain/provider.go +++ b/pkg/brain/provider.go @@ -12,10 +12,7 @@ import ( "github.com/gin-gonic/gin" ) -// BrainProvider wraps the brain Subsystem as a service provider with REST -// endpoints. It delegates to the same IDE bridge that the MCP tools use. -// -// Usage example: +// BrainProvider exposes the same OpenBrain bridge over HTTP routes and WS events. // // provider := brain.NewProvider(bridge, hub) // provider.RegisterRoutes(router.Group("/api/brain")) @@ -39,7 +36,7 @@ const ( statusServiceUnavailable = 503 ) -// NewProvider creates a provider backed by the IDE bridge. +// NewProvider builds the HTTP provider around the IDE bridge and WS hub. // // p := brain.NewProvider(bridge, hub) // p.RegisterRoutes(router.Group("/api/brain")) @@ -50,17 +47,17 @@ func NewProvider(bridge *ide.Bridge, hub *ws.Hub) *BrainProvider { } } -// Name returns the provider name used during API registration. +// Name keeps the provider address stable during API registration. // -// name := p.Name() +// name := p.Name() // "brain" func (p *BrainProvider) Name() string { return "brain" } -// BasePath returns the HTTP base path for the provider routes. +// BasePath shows where the provider mounts its routes. // -// base := p.BasePath() +// base := p.BasePath() // "/api/brain" func (p *BrainProvider) BasePath() string { return "/api/brain" } -// Channels returns the WS channels emitted by brain actions. +// Channels lists the WS events emitted after brain actions complete. // // channels := p.Channels() func (p *BrainProvider) Channels() []string { @@ -71,7 +68,7 @@ func (p *BrainProvider) Channels() []string { } } -// Element returns the UI element metadata for the provider panel. +// Element describes the browser component that renders the brain panel. // // spec := p.Element() func (p *BrainProvider) Element() provider.ElementSpec { @@ -81,7 +78,7 @@ func (p *BrainProvider) Element() provider.ElementSpec { } } -// RegisterRoutes binds the provider handlers onto a router group. +// RegisterRoutes mounts the provider handlers onto a router group. // // p.RegisterRoutes(router.Group("/api/brain")) func (p *BrainProvider) RegisterRoutes(rg *gin.RouterGroup) { @@ -92,7 +89,7 @@ func (p *BrainProvider) RegisterRoutes(rg *gin.RouterGroup) { rg.GET("/status", p.status) } -// Describe returns the OpenAPI route descriptions for the provider. +// Describe returns the route contract used by API discovery and docs. // // routes := p.Describe() func (p *BrainProvider) Describe() []api.RouteDescription { diff --git a/pkg/brain/register.go b/pkg/brain/register.go index 951d353..91b064e 100644 --- a/pkg/brain/register.go +++ b/pkg/brain/register.go @@ -6,12 +6,10 @@ import ( core "dappco.re/go/core" ) -// Register is the service factory for core.WithService. -// Returns the DirectSubsystem — WithService auto-registers it. +// Register exposes the direct OpenBrain subsystem through core.WithService. // -// core.New( -// core.WithService(brain.Register), -// ) +// c := core.New(core.WithService(brain.Register)) +// sub, _ := core.ServiceFor[*brain.DirectSubsystem](c, "brain") func Register(c *core.Core) core.Result { brn := NewDirect() return core.Result{Value: brn, OK: true} diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index e00e9a8..4af86f1 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -1,12 +1,10 @@ // SPDX-License-Identifier: EUPL-1.2 -// Package monitor provides a background subsystem that watches the ecosystem -// and pushes notifications to connected MCP clients. +// Package monitor polls workspace state, repo drift, and agent inboxes, then +// pushes the current view to connected MCP clients. // -// Checks performed on each tick: -// - Agent completions: scans workspace for newly completed agents -// - Repo drift: checks forge for repos with unpushed/unpulled changes -// - Inbox: checks for unread agent messages +// mon := monitor.New(monitor.Options{Interval: 30 * time.Second}) +// mon.RegisterTools(server) package monitor import ( @@ -78,14 +76,16 @@ func resultString(r core.Result) (string, bool) { return value, true } -// MonitorOptions configures the monitor service. +// MonitorOptions is the service-runtime payload used by core.WithService. // -// opts := monitor.MonitorOptions{} +// c := core.New(core.WithService(monitor.Register)) +// _, _ = core.ServiceFor[*monitor.Subsystem](c, "monitor") type MonitorOptions struct{} -// Subsystem implements mcp.Subsystem for background monitoring. +// Subsystem owns the long-running monitor loop and MCP resource surface. // -// core.New(core.WithService(monitor.Register)) +// mon := monitor.New() +// mon.Start(context.Background()) type Subsystem struct { *core.ServiceRuntime[MonitorOptions] server *mcp.Server @@ -109,8 +109,8 @@ type Subsystem struct { var _ coremcp.Subsystem = (*Subsystem)(nil) -// SetCore wires the Core framework instance via ServiceRuntime. -// Deprecated: Use Register with core.WithService(monitor.Register) instead. +// SetCore preserves direct test setup without going through core.WithService. +// Deprecated: prefer Register with core.WithService(monitor.Register). // // mon.SetCore(c) func (m *Subsystem) SetCore(c *core.Core) { @@ -134,7 +134,21 @@ func (m *Subsystem) handleAgentCompleted(ev messages.AgentCompleted) { go m.checkIdleAfterDelay() } -// Options configures the monitor interval. +// HandleIPCEvents lets Core auto-wire monitor side-effects for IPC messages. +// +// c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5"}) +// c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5", Status: "completed"}) +func (m *Subsystem) HandleIPCEvents(_ *core.Core, msg core.Message) core.Result { + switch ev := msg.(type) { + case messages.AgentCompleted: + m.handleAgentCompleted(ev) + case messages.AgentStarted: + m.handleAgentStarted(ev) + } + return core.Result{OK: true} +} + +// Options configures the monitor polling interval. // // opts := monitor.Options{Interval: 30 * time.Second} // mon := monitor.New(opts) @@ -143,7 +157,7 @@ type Options struct { Interval time.Duration } -// New creates a monitor subsystem. +// New builds the monitor with a polling interval and poke channel. // // mon := monitor.New(monitor.Options{Interval: 30 * time.Second}) func New(opts ...Options) *Subsystem { @@ -170,12 +184,12 @@ func (m *Subsystem) debugChannel(msg string) { core.Debug(msg) } -// Name returns the subsystem identifier used by MCP registration. +// Name keeps the monitor address stable for MCP and core.WithService lookups. // -// mon.Name() // "monitor" +// name := mon.Name() // "monitor" func (m *Subsystem) Name() string { return "monitor" } -// RegisterTools binds the monitor resource to an MCP server. +// RegisterTools publishes the monitor status resource on an MCP server. // // mon.RegisterTools(server) func (m *Subsystem) RegisterTools(server *mcp.Server) { @@ -190,7 +204,7 @@ func (m *Subsystem) RegisterTools(server *mcp.Server) { }, m.agentStatusResource) } -// Start begins the background monitoring loop after MCP startup. +// Start launches the background polling loop. // // mon.Start(ctx) func (m *Subsystem) Start(ctx context.Context) { @@ -206,7 +220,7 @@ func (m *Subsystem) Start(ctx context.Context) { }() } -// OnStartup starts the monitoring loop for a registered service. +// OnStartup starts the monitor when Core starts the service lifecycle. // // r := mon.OnStartup(context.Background()) // core.Println(r.OK) @@ -215,7 +229,7 @@ func (m *Subsystem) OnStartup(ctx context.Context) core.Result { return core.Result{OK: true} } -// OnShutdown stops the monitoring loop through the Core lifecycle hook. +// OnShutdown stops the monitor through the Core lifecycle hook. // // r := mon.OnShutdown(context.Background()) // core.Println(r.OK) @@ -224,7 +238,7 @@ func (m *Subsystem) OnShutdown(ctx context.Context) core.Result { return core.Result{OK: true} } -// Shutdown stops the monitoring loop and waits for it to exit. +// Shutdown cancels the monitor loop and waits for the goroutine to exit. // // _ = mon.Shutdown(ctx) func (m *Subsystem) Shutdown(_ context.Context) error { @@ -235,7 +249,7 @@ func (m *Subsystem) Shutdown(_ context.Context) error { return nil } -// Poke triggers an immediate check cycle. +// Poke asks the loop to run a check immediately instead of waiting for the ticker. // // mon.Poke() func (m *Subsystem) Poke() { diff --git a/pkg/monitor/register.go b/pkg/monitor/register.go index 49b4e78..f72d9c8 100644 --- a/pkg/monitor/register.go +++ b/pkg/monitor/register.go @@ -3,30 +3,15 @@ package monitor import ( - "dappco.re/go/agent/pkg/messages" core "dappco.re/go/core" ) -// Register is the service factory for core.WithService. -// Returns the monitor Subsystem — WithService auto-registers it. +// Register wires the monitor service into Core and lets HandleIPCEvents auto-register. // -// core.New( -// core.WithService(monitor.Register), -// ) +// c := core.New(core.WithService(monitor.Register)) +// mon, _ := core.ServiceFor[*monitor.Subsystem](c, "monitor") func Register(c *core.Core) core.Result { mon := New() mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{}) - - // Register IPC handler for agent lifecycle events - c.RegisterAction(func(c *core.Core, msg core.Message) core.Result { - switch ev := msg.(type) { - case messages.AgentCompleted: - mon.handleAgentCompleted(ev) - case messages.AgentStarted: - mon.handleAgentStarted(ev) - } - return core.Result{OK: true} - }) - return core.Result{Value: mon, OK: true} }