From 4e69daf2da0e77deb26e1866d77ddd6ad981fb81 Mon Sep 17 00:00:00 2001 From: Snider Date: Tue, 24 Mar 2026 16:33:04 +0000 Subject: [PATCH] feat: replace initServices() with core.New() service conclave MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Services are now registered during Core construction: core.New( core.WithService(agentic.Register), core.WithService(monitor.Register), core.WithService(brain.Register), ) - Remove initServices() closure — services created once in conclave - Commands use c.ServiceStartup()/c.ServiceShutdown() for lifecycle - Service instances retrieved via c.Config() for MCP tool registration - run/orchestrator reduced to ServiceStartup + block + ServiceShutdown - run/task uses conclave's agentic instance Co-Authored-By: Virgil --- cmd/core-agent/main.go | 95 ++++++++++--------------------- pkg/agentic/dispatch.go | 36 +++++++----- pkg/agentic/handlers.go | 121 ++++++++++++++++++++++++++++++++++++++++ pkg/agentic/register.go | 11 ++++ pkg/brain/register.go | 4 +- pkg/monitor/register.go | 3 + 6 files changed, 187 insertions(+), 83 deletions(-) create mode 100644 pkg/agentic/handlers.go diff --git a/cmd/core-agent/main.go b/cmd/core-agent/main.go index 9ba1e55..25a6eca 100644 --- a/cmd/core-agent/main.go +++ b/cmd/core-agent/main.go @@ -20,6 +20,9 @@ import ( func main() { r := core.New( core.WithOptions(core.Options{{Key: "name", Value: "core-agent"}}), + core.WithService(agentic.Register), + core.WithService(monitor.Register), + core.WithService(brain.Register), ) if !r.OK { core.Error("failed to create core", "err", r.Value) @@ -298,49 +301,30 @@ func main() { }, }) - // Shared setup — creates MCP service with all subsystems wired. - // Services are registered with Core for lifecycle, and with MCP for tool registration. - initServices := func() (*mcp.Service, *monitor.Subsystem, error) { - procFactory := process.NewService(process.Options{}) - procResult, err := procFactory(c) - if err != nil { - return nil, nil, core.E("main", "init process service", err) - } + // Retrieve service instances from conclave for MCP tool registration + agenticSvc := core.ConfigGet[*agentic.PrepSubsystem](c.Config(), "agentic.instance") + monitorSvc := core.ConfigGet[*monitor.Subsystem](c.Config(), "monitor.instance") + brainSvc := core.ConfigGet[*brain.DirectSubsystem](c.Config(), "brain.instance") + + // Process service (lifecycle management) + procFactory := process.NewService(process.Options{}) + procResult, procErr := procFactory(c) + if procErr == nil { if procSvc, ok := procResult.(*process.Service); ok { _ = process.SetDefault(procSvc) } + } - mon := monitor.New() - prep := agentic.NewPrep() - brn := brain.NewDirect() - - // Wire Core framework into subsystems - prep.SetCore(c) - mon.SetCore(c) - - // Register post-completion pipeline as IPC handlers - agentic.RegisterHandlers(c, prep) - - // Register as Core services with lifecycle hooks - c.Service("agentic", core.Service{ - OnStart: func() core.Result { - prep.StartRunner() - return core.Result{OK: true} - }, - }) - c.Service("monitor", core.Service{}) - c.Service("brain", core.Service{}) - - // MCP service with all subsystems for tool registration + // MCP service — wires subsystems for tool registration + initMCP := func() (*mcp.Service, error) { mcpSvc, err := mcp.New(mcp.Options{ - Subsystems: []mcp.Subsystem{brn, prep, mon}, + Subsystems: []mcp.Subsystem{brainSvc, agenticSvc, monitorSvc}, }) if err != nil { - return nil, nil, core.E("main", "create MCP service", err) + return nil, core.E("main", "create MCP service", err) } - - mon.SetNotifier(mcpSvc) - return mcpSvc, mon, nil + monitorSvc.SetNotifier(mcpSvc) + return mcpSvc, nil } // Signal-aware context for clean shutdown @@ -351,14 +335,15 @@ func main() { c.Command("mcp", core.Command{ Description: "Start the MCP server on stdio", Action: func(opts core.Options) core.Result { - mcpSvc, mon, err := initServices() + mcpSvc, err := initMCP() if err != nil { return core.Result{Value: err, OK: false} } - mon.Start(ctx) + c.ServiceStartup(ctx, nil) if err := mcpSvc.Run(ctx); err != nil { return core.Result{Value: err, OK: false} } + c.ServiceShutdown(context.Background()) return core.Result{OK: true} }, }) @@ -367,7 +352,7 @@ func main() { c.Command("serve", core.Command{ Description: "Start as a persistent HTTP daemon", Action: func(opts core.Options) core.Result { - mcpSvc, mon, err := initServices() + mcpSvc, err := initMCP() if err != nil { return core.Result{Value: err, OK: false} } @@ -400,7 +385,7 @@ func main() { return core.Result{Value: core.E("main", "daemon start", err), OK: false} } - mon.Start(ctx) + c.ServiceStartup(ctx, nil) daemon.SetReady(true) core.Print(os.Stderr, "core-agent serving on %s (health: %s, pid: %s)", addr, healthAddr, pidFile) @@ -409,6 +394,7 @@ func main() { if err := mcpSvc.Run(ctx); err != nil { return core.Result{Value: err, OK: false} } + c.ServiceShutdown(context.Background()) return core.Result{OK: true} }, }) @@ -441,17 +427,6 @@ func main() { } } - procFactory := process.NewService(process.Options{}) - procResult, err := procFactory(c) - if err != nil { - return core.Result{Value: err, OK: false} - } - if procSvc, ok := procResult.(*process.Service); ok { - _ = process.SetDefault(procSvc) - } - - prep := agentic.NewPrep() - core.Print(os.Stderr, "core-agent run task") core.Print(os.Stderr, " repo: %s/%s", org, repo) core.Print(os.Stderr, " agent: %s", agent) @@ -462,7 +437,7 @@ func main() { core.Print(os.Stderr, "") // Dispatch and wait - result := prep.DispatchSync(ctx, agentic.DispatchSyncInput{ + result := agenticSvc.DispatchSync(ctx, agentic.DispatchSyncInput{ Org: org, Repo: repo, Agent: agent, @@ -487,22 +462,7 @@ func main() { c.Command("run/orchestrator", core.Command{ Description: "Run the queue orchestrator (standalone, no MCP)", Action: func(opts core.Options) core.Result { - procFactory := process.NewService(process.Options{}) - procResult, err := procFactory(c) - if err != nil { - return core.Result{Value: err, OK: false} - } - if procSvc, ok := procResult.(*process.Service); ok { - _ = process.SetDefault(procSvc) - } - - mon := monitor.New() - prep := agentic.NewPrep() - prep.SetCore(c) - mon.SetCore(c) - - mon.Start(ctx) - prep.StartRunner() + c.ServiceStartup(ctx, nil) core.Print(os.Stderr, "core-agent orchestrator running (pid %s)", core.Env("PID")) core.Print(os.Stderr, " workspace: %s", agentic.WorkspaceRoot()) @@ -511,6 +471,7 @@ func main() { // Block until signal <-ctx.Done() core.Print(os.Stderr, "orchestrator shutting down") + c.ServiceShutdown(context.Background()) return core.Result{OK: true} }, }) diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index 196441b..79c0ceb 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -334,24 +334,30 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er }) } - if finalStatus == "completed" { - // Run QA before PR — if QA fails, mark as failed, don't PR - if !s.runQA(wsDir) { - finalStatus = "failed" - question = "QA check failed — build or tests did not pass" - if st, stErr := ReadStatus(wsDir); stErr == nil { - st.Status = finalStatus - st.Question = question - writeStatus(wsDir, st) + // Post-completion pipeline is handled by IPC handlers registered in main.go: + // AgentCompleted → QA handler → PRCreated handler → verify handler + // AgentCompleted → ingest handler + // AgentCompleted → poke handler + // + // Legacy inline pipeline kept as fallback when Core is not wired. + if s.core == nil { + if finalStatus == "completed" { + if !s.runQA(wsDir) { + finalStatus = "failed" + question = "QA check failed — build or tests did not pass" + if st, stErr := ReadStatus(wsDir); stErr == nil { + st.Status = finalStatus + st.Question = question + writeStatus(wsDir, st) + } + } else { + s.autoCreatePR(wsDir) + s.autoVerifyAndMerge(wsDir) } - } else { - s.autoCreatePR(wsDir) - s.autoVerifyAndMerge(wsDir) } + s.ingestFindings(wsDir) + s.Poke() } - - s.ingestFindings(wsDir) - s.Poke() }() return pid, outputFile, nil diff --git a/pkg/agentic/handlers.go b/pkg/agentic/handlers.go new file mode 100644 index 0000000..961392e --- /dev/null +++ b/pkg/agentic/handlers.go @@ -0,0 +1,121 @@ +// SPDX-License-Identifier: EUPL-1.2 + +// IPC handlers for the agent completion pipeline. +// Registered via RegisterHandlers() — breaks the monolith dispatch goroutine +// into discrete, testable steps connected by Core IPC messages. + +package agentic + +import ( + "dappco.re/go/agent/pkg/messages" + core "dappco.re/go/core" +) + +// RegisterHandlers registers the post-completion pipeline as discrete IPC handlers. +// Each handler listens for a specific message and emits the next in the chain: +// +// AgentCompleted → QA handler → QAResult +// QAResult{Passed} → PR handler → PRCreated +// PRCreated → Verify handler → PRMerged | PRNeedsReview +// AgentCompleted → Ingest handler (findings → issues) +// AgentCompleted → Poke handler (drain queue) +// +// agentic.RegisterHandlers(c, prep) +func RegisterHandlers(c *core.Core, s *PrepSubsystem) { + // QA: run build+test on completed workspaces + c.RegisterAction(func(c *core.Core, msg core.Message) core.Result { + ev, ok := msg.(messages.AgentCompleted) + if !ok || ev.Status != "completed" { + return core.Result{OK: true} + } + wsDir := resolveWorkspace(ev.Workspace) + if wsDir == "" { + return core.Result{OK: true} + } + + passed := s.runQA(wsDir) + if !passed { + // Update status to failed + if st, err := ReadStatus(wsDir); err == nil { + st.Status = "failed" + st.Question = "QA check failed — build or tests did not pass" + writeStatus(wsDir, st) + } + } + + c.ACTION(messages.QAResult{ + Workspace: ev.Workspace, + Repo: ev.Repo, + Passed: passed, + }) + return core.Result{OK: true} + }) + + // Auto-PR: create PR on QA pass + c.RegisterAction(func(c *core.Core, msg core.Message) core.Result { + ev, ok := msg.(messages.QAResult) + if !ok || !ev.Passed { + return core.Result{OK: true} + } + wsDir := resolveWorkspace(ev.Workspace) + if wsDir == "" { + return core.Result{OK: true} + } + + s.autoCreatePR(wsDir) + return core.Result{OK: true} + }) + + // Auto-verify: verify and merge after PR creation + c.RegisterAction(func(c *core.Core, msg core.Message) core.Result { + ev, ok := msg.(messages.QAResult) + if !ok || !ev.Passed { + return core.Result{OK: true} + } + wsDir := resolveWorkspace(ev.Workspace) + if wsDir == "" { + return core.Result{OK: true} + } + + s.autoVerifyAndMerge(wsDir) + return core.Result{OK: true} + }) + + // Ingest: create issues from agent findings + c.RegisterAction(func(c *core.Core, msg core.Message) core.Result { + ev, ok := msg.(messages.AgentCompleted) + if !ok { + return core.Result{OK: true} + } + wsDir := resolveWorkspace(ev.Workspace) + if wsDir == "" { + return core.Result{OK: true} + } + + s.ingestFindings(wsDir) + return core.Result{OK: true} + }) + + // Poke: drain queue after any completion + c.RegisterAction(func(c *core.Core, msg core.Message) core.Result { + if _, ok := msg.(messages.AgentCompleted); ok { + s.Poke() + } + if _, ok := msg.(messages.PokeQueue); ok { + s.drainQueue() + } + return core.Result{OK: true} + }) +} + +// resolveWorkspace converts a workspace name back to the full path. +// +// resolveWorkspace("core/go-io/task-5") → "/Users/snider/Code/.core/workspace/core/go-io/task-5" +func resolveWorkspace(name string) string { + wsRoot := WorkspaceRoot() + path := core.JoinPath(wsRoot, name) + if fs.IsDir(path) { + return path + } + return "" +} diff --git a/pkg/agentic/register.go b/pkg/agentic/register.go index 6b36649..884e491 100644 --- a/pkg/agentic/register.go +++ b/pkg/agentic/register.go @@ -13,6 +13,14 @@ import ( // core.New( // core.WithService(agentic.Register), // ) +// Register is the service factory for core.WithService. +// It creates the PrepSubsystem, wires Core, registers lifecycle hooks, +// and registers IPC handlers — all during Core construction. +// The PrepSubsystem instance is stored in Config for retrieval by MCP. +// +// core.New( +// core.WithService(agentic.Register), +// ) func Register(c *core.Core) core.Result { prep := NewPrep() prep.core = c @@ -30,5 +38,8 @@ func Register(c *core.Core) core.Result { RegisterHandlers(c, prep) + // Store instance for MCP tool registration + c.Config().Set("agentic.instance", prep) + return core.Result{OK: true} } diff --git a/pkg/brain/register.go b/pkg/brain/register.go index 34bfb5a..c4f84b2 100644 --- a/pkg/brain/register.go +++ b/pkg/brain/register.go @@ -17,6 +17,8 @@ func Register(c *core.Core) core.Result { c.Service("brain", core.Service{}) - _ = brn // brain instance available for MCP tool registration + // Store instance for MCP tool registration + c.Config().Set("brain.instance", brn) + return core.Result{OK: true} } diff --git a/pkg/monitor/register.go b/pkg/monitor/register.go index 027e075..53cc309 100644 --- a/pkg/monitor/register.go +++ b/pkg/monitor/register.go @@ -36,5 +36,8 @@ func Register(c *core.Core) core.Result { return core.Result{OK: true} }) + // Store instance for MCP tool registration + c.Config().Set("monitor.instance", mon) + return core.Result{OK: true} }