feat: replace initServices() with core.New() service conclave

Services are now registered during Core construction:
  core.New(
      core.WithService(agentic.Register),
      core.WithService(monitor.Register),
      core.WithService(brain.Register),
  )

- Remove initServices() closure — services created once in conclave
- Commands use c.ServiceStartup()/c.ServiceShutdown() for lifecycle
- Service instances retrieved via c.Config() for MCP tool registration
- run/orchestrator reduced to ServiceStartup + block + ServiceShutdown
- run/task uses conclave's agentic instance

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-24 16:33:04 +00:00
parent 9f8a63ae21
commit 4e69daf2da
6 changed files with 187 additions and 83 deletions

View file

@ -20,6 +20,9 @@ import (
func main() {
r := core.New(
core.WithOptions(core.Options{{Key: "name", Value: "core-agent"}}),
core.WithService(agentic.Register),
core.WithService(monitor.Register),
core.WithService(brain.Register),
)
if !r.OK {
core.Error("failed to create core", "err", r.Value)
@ -298,49 +301,30 @@ func main() {
},
})
// Shared setup — creates MCP service with all subsystems wired.
// Services are registered with Core for lifecycle, and with MCP for tool registration.
initServices := func() (*mcp.Service, *monitor.Subsystem, error) {
procFactory := process.NewService(process.Options{})
procResult, err := procFactory(c)
if err != nil {
return nil, nil, core.E("main", "init process service", err)
}
// Retrieve service instances from conclave for MCP tool registration
agenticSvc := core.ConfigGet[*agentic.PrepSubsystem](c.Config(), "agentic.instance")
monitorSvc := core.ConfigGet[*monitor.Subsystem](c.Config(), "monitor.instance")
brainSvc := core.ConfigGet[*brain.DirectSubsystem](c.Config(), "brain.instance")
// Process service (lifecycle management)
procFactory := process.NewService(process.Options{})
procResult, procErr := procFactory(c)
if procErr == nil {
if procSvc, ok := procResult.(*process.Service); ok {
_ = process.SetDefault(procSvc)
}
}
mon := monitor.New()
prep := agentic.NewPrep()
brn := brain.NewDirect()
// Wire Core framework into subsystems
prep.SetCore(c)
mon.SetCore(c)
// Register post-completion pipeline as IPC handlers
agentic.RegisterHandlers(c, prep)
// Register as Core services with lifecycle hooks
c.Service("agentic", core.Service{
OnStart: func() core.Result {
prep.StartRunner()
return core.Result{OK: true}
},
})
c.Service("monitor", core.Service{})
c.Service("brain", core.Service{})
// MCP service with all subsystems for tool registration
// MCP service — wires subsystems for tool registration
initMCP := func() (*mcp.Service, error) {
mcpSvc, err := mcp.New(mcp.Options{
Subsystems: []mcp.Subsystem{brn, prep, mon},
Subsystems: []mcp.Subsystem{brainSvc, agenticSvc, monitorSvc},
})
if err != nil {
return nil, nil, core.E("main", "create MCP service", err)
return nil, core.E("main", "create MCP service", err)
}
mon.SetNotifier(mcpSvc)
return mcpSvc, mon, nil
monitorSvc.SetNotifier(mcpSvc)
return mcpSvc, nil
}
// Signal-aware context for clean shutdown
@ -351,14 +335,15 @@ func main() {
c.Command("mcp", core.Command{
Description: "Start the MCP server on stdio",
Action: func(opts core.Options) core.Result {
mcpSvc, mon, err := initServices()
mcpSvc, err := initMCP()
if err != nil {
return core.Result{Value: err, OK: false}
}
mon.Start(ctx)
c.ServiceStartup(ctx, nil)
if err := mcpSvc.Run(ctx); err != nil {
return core.Result{Value: err, OK: false}
}
c.ServiceShutdown(context.Background())
return core.Result{OK: true}
},
})
@ -367,7 +352,7 @@ func main() {
c.Command("serve", core.Command{
Description: "Start as a persistent HTTP daemon",
Action: func(opts core.Options) core.Result {
mcpSvc, mon, err := initServices()
mcpSvc, err := initMCP()
if err != nil {
return core.Result{Value: err, OK: false}
}
@ -400,7 +385,7 @@ func main() {
return core.Result{Value: core.E("main", "daemon start", err), OK: false}
}
mon.Start(ctx)
c.ServiceStartup(ctx, nil)
daemon.SetReady(true)
core.Print(os.Stderr, "core-agent serving on %s (health: %s, pid: %s)", addr, healthAddr, pidFile)
@ -409,6 +394,7 @@ func main() {
if err := mcpSvc.Run(ctx); err != nil {
return core.Result{Value: err, OK: false}
}
c.ServiceShutdown(context.Background())
return core.Result{OK: true}
},
})
@ -441,17 +427,6 @@ func main() {
}
}
procFactory := process.NewService(process.Options{})
procResult, err := procFactory(c)
if err != nil {
return core.Result{Value: err, OK: false}
}
if procSvc, ok := procResult.(*process.Service); ok {
_ = process.SetDefault(procSvc)
}
prep := agentic.NewPrep()
core.Print(os.Stderr, "core-agent run task")
core.Print(os.Stderr, " repo: %s/%s", org, repo)
core.Print(os.Stderr, " agent: %s", agent)
@ -462,7 +437,7 @@ func main() {
core.Print(os.Stderr, "")
// Dispatch and wait
result := prep.DispatchSync(ctx, agentic.DispatchSyncInput{
result := agenticSvc.DispatchSync(ctx, agentic.DispatchSyncInput{
Org: org,
Repo: repo,
Agent: agent,
@ -487,22 +462,7 @@ func main() {
c.Command("run/orchestrator", core.Command{
Description: "Run the queue orchestrator (standalone, no MCP)",
Action: func(opts core.Options) core.Result {
procFactory := process.NewService(process.Options{})
procResult, err := procFactory(c)
if err != nil {
return core.Result{Value: err, OK: false}
}
if procSvc, ok := procResult.(*process.Service); ok {
_ = process.SetDefault(procSvc)
}
mon := monitor.New()
prep := agentic.NewPrep()
prep.SetCore(c)
mon.SetCore(c)
mon.Start(ctx)
prep.StartRunner()
c.ServiceStartup(ctx, nil)
core.Print(os.Stderr, "core-agent orchestrator running (pid %s)", core.Env("PID"))
core.Print(os.Stderr, " workspace: %s", agentic.WorkspaceRoot())
@ -511,6 +471,7 @@ func main() {
// Block until signal
<-ctx.Done()
core.Print(os.Stderr, "orchestrator shutting down")
c.ServiceShutdown(context.Background())
return core.Result{OK: true}
},
})

View file

@ -334,24 +334,30 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
})
}
if finalStatus == "completed" {
// Run QA before PR — if QA fails, mark as failed, don't PR
if !s.runQA(wsDir) {
finalStatus = "failed"
question = "QA check failed — build or tests did not pass"
if st, stErr := ReadStatus(wsDir); stErr == nil {
st.Status = finalStatus
st.Question = question
writeStatus(wsDir, st)
// Post-completion pipeline is handled by IPC handlers registered in main.go:
// AgentCompleted → QA handler → PRCreated handler → verify handler
// AgentCompleted → ingest handler
// AgentCompleted → poke handler
//
// Legacy inline pipeline kept as fallback when Core is not wired.
if s.core == nil {
if finalStatus == "completed" {
if !s.runQA(wsDir) {
finalStatus = "failed"
question = "QA check failed — build or tests did not pass"
if st, stErr := ReadStatus(wsDir); stErr == nil {
st.Status = finalStatus
st.Question = question
writeStatus(wsDir, st)
}
} else {
s.autoCreatePR(wsDir)
s.autoVerifyAndMerge(wsDir)
}
} else {
s.autoCreatePR(wsDir)
s.autoVerifyAndMerge(wsDir)
}
s.ingestFindings(wsDir)
s.Poke()
}
s.ingestFindings(wsDir)
s.Poke()
}()
return pid, outputFile, nil

121
pkg/agentic/handlers.go Normal file
View file

@ -0,0 +1,121 @@
// SPDX-License-Identifier: EUPL-1.2
// IPC handlers for the agent completion pipeline.
// Registered via RegisterHandlers() — breaks the monolith dispatch goroutine
// into discrete, testable steps connected by Core IPC messages.
package agentic
import (
"dappco.re/go/agent/pkg/messages"
core "dappco.re/go/core"
)
// RegisterHandlers registers the post-completion pipeline as discrete IPC handlers.
// Each handler listens for a specific message and emits the next in the chain:
//
// AgentCompleted → QA handler → QAResult
// QAResult{Passed} → PR handler → PRCreated
// PRCreated → Verify handler → PRMerged | PRNeedsReview
// AgentCompleted → Ingest handler (findings → issues)
// AgentCompleted → Poke handler (drain queue)
//
// agentic.RegisterHandlers(c, prep)
func RegisterHandlers(c *core.Core, s *PrepSubsystem) {
// QA: run build+test on completed workspaces
c.RegisterAction(func(c *core.Core, msg core.Message) core.Result {
ev, ok := msg.(messages.AgentCompleted)
if !ok || ev.Status != "completed" {
return core.Result{OK: true}
}
wsDir := resolveWorkspace(ev.Workspace)
if wsDir == "" {
return core.Result{OK: true}
}
passed := s.runQA(wsDir)
if !passed {
// Update status to failed
if st, err := ReadStatus(wsDir); err == nil {
st.Status = "failed"
st.Question = "QA check failed — build or tests did not pass"
writeStatus(wsDir, st)
}
}
c.ACTION(messages.QAResult{
Workspace: ev.Workspace,
Repo: ev.Repo,
Passed: passed,
})
return core.Result{OK: true}
})
// Auto-PR: create PR on QA pass
c.RegisterAction(func(c *core.Core, msg core.Message) core.Result {
ev, ok := msg.(messages.QAResult)
if !ok || !ev.Passed {
return core.Result{OK: true}
}
wsDir := resolveWorkspace(ev.Workspace)
if wsDir == "" {
return core.Result{OK: true}
}
s.autoCreatePR(wsDir)
return core.Result{OK: true}
})
// Auto-verify: verify and merge after PR creation
c.RegisterAction(func(c *core.Core, msg core.Message) core.Result {
ev, ok := msg.(messages.QAResult)
if !ok || !ev.Passed {
return core.Result{OK: true}
}
wsDir := resolveWorkspace(ev.Workspace)
if wsDir == "" {
return core.Result{OK: true}
}
s.autoVerifyAndMerge(wsDir)
return core.Result{OK: true}
})
// Ingest: create issues from agent findings
c.RegisterAction(func(c *core.Core, msg core.Message) core.Result {
ev, ok := msg.(messages.AgentCompleted)
if !ok {
return core.Result{OK: true}
}
wsDir := resolveWorkspace(ev.Workspace)
if wsDir == "" {
return core.Result{OK: true}
}
s.ingestFindings(wsDir)
return core.Result{OK: true}
})
// Poke: drain queue after any completion
c.RegisterAction(func(c *core.Core, msg core.Message) core.Result {
if _, ok := msg.(messages.AgentCompleted); ok {
s.Poke()
}
if _, ok := msg.(messages.PokeQueue); ok {
s.drainQueue()
}
return core.Result{OK: true}
})
}
// resolveWorkspace converts a workspace name back to the full path.
//
// resolveWorkspace("core/go-io/task-5") → "/Users/snider/Code/.core/workspace/core/go-io/task-5"
func resolveWorkspace(name string) string {
wsRoot := WorkspaceRoot()
path := core.JoinPath(wsRoot, name)
if fs.IsDir(path) {
return path
}
return ""
}

View file

@ -13,6 +13,14 @@ import (
// core.New(
// core.WithService(agentic.Register),
// )
// Register is the service factory for core.WithService.
// It creates the PrepSubsystem, wires Core, registers lifecycle hooks,
// and registers IPC handlers — all during Core construction.
// The PrepSubsystem instance is stored in Config for retrieval by MCP.
//
// core.New(
// core.WithService(agentic.Register),
// )
func Register(c *core.Core) core.Result {
prep := NewPrep()
prep.core = c
@ -30,5 +38,8 @@ func Register(c *core.Core) core.Result {
RegisterHandlers(c, prep)
// Store instance for MCP tool registration
c.Config().Set("agentic.instance", prep)
return core.Result{OK: true}
}

View file

@ -17,6 +17,8 @@ func Register(c *core.Core) core.Result {
c.Service("brain", core.Service{})
_ = brn // brain instance available for MCP tool registration
// Store instance for MCP tool registration
c.Config().Set("brain.instance", brn)
return core.Result{OK: true}
}

View file

@ -36,5 +36,8 @@ func Register(c *core.Core) core.Result {
return core.Result{OK: true}
})
// Store instance for MCP tool registration
c.Config().Set("monitor.instance", mon)
return core.Result{OK: true}
}