diff --git a/cmd/core-agent/main.go b/cmd/core-agent/main.go index f0d3c24..bfaa6c8 100644 --- a/cmd/core-agent/main.go +++ b/cmd/core-agent/main.go @@ -4,7 +4,6 @@ import ( "context" "os" "os/signal" - "strconv" "syscall" "dappco.re/go/core" @@ -33,20 +32,7 @@ func main() { core.WithService(agentic.Register), core.WithService(monitor.Register), core.WithService(brain.Register), - // MCP — registered last, retrieves other services for tool registration - core.WithName("mcp", func(c *core.Core) core.Result { - agSvc, _ := core.ServiceFor[*agentic.PrepSubsystem](c, "agentic") - monSvc, _ := core.ServiceFor[*monitor.Subsystem](c, "monitor") - brnSvc, _ := core.ServiceFor[*brain.DirectSubsystem](c, "brain") - mcpSvc, err := mcp.New(mcp.Options{ - Subsystems: []mcp.Subsystem{brnSvc, agSvc, monSvc}, - }) - if err != nil { - return core.Result{Value: core.E("main", "create MCP service", err), OK: false} - } - monSvc.SetNotifier(mcpSvc) - return core.Result{Value: mcpSvc, OK: true} - }), + core.WithService(mcp.Register), ) if !r.OK { core.Error("failed to create core", "err", r.Value) @@ -329,12 +315,14 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() + // Start all services — registers commands, starts runners, wires IPC + c.ServiceStartup(ctx, nil) + // mcp — stdio transport (Claude Code integration) c.Command("mcp", core.Command{ Description: "Start the MCP server on stdio", Action: func(opts core.Options) core.Result { - mcpSvc, _ := core.ServiceFor[*mcp.Service](c, "mcp") - c.ServiceStartup(ctx, nil) + mcpSvc := c.Service("mcp").Value.(*mcp.Service) if err := mcpSvc.Run(ctx); err != nil { return core.Result{Value: err, OK: false} } @@ -347,7 +335,7 @@ func main() { c.Command("serve", core.Command{ Description: "Start as a persistent HTTP daemon", Action: func(opts core.Options) core.Result { - mcpSvc, _ := core.ServiceFor[*mcp.Service](c, "mcp") + mcpSvc := c.Service("mcp").Value.(*mcp.Service) addr := core.Env("MCP_HTTP_ADDR") if addr == "" { @@ -391,83 +379,9 @@ func main() { }, }) - // run task — single task e2e (prep → spawn → wait → done) - c.Command("run/task", core.Command{ - Description: "Run a single task end-to-end", - Action: func(opts core.Options) core.Result { - repo := opts.String("repo") - agent := opts.String("agent") - task := opts.String("task") - issueStr := opts.String("issue") - org := opts.String("org") - - if repo == "" || task == "" { - core.Print(nil, "usage: core-agent run task --repo= --task=\"...\" --agent=codex [--issue=N] [--org=core]") - return core.Result{OK: false} - } - if agent == "" { - agent = "codex" - } - if org == "" { - org = "core" - } - - issue := 0 - if issueStr != "" { - if n, err := strconv.Atoi(issueStr); err == nil { - issue = n - } - } - - core.Print(os.Stderr, "core-agent run task") - core.Print(os.Stderr, " repo: %s/%s", org, repo) - core.Print(os.Stderr, " agent: %s", agent) - if issue > 0 { - core.Print(os.Stderr, " issue: #%d", issue) - } - core.Print(os.Stderr, " task: %s", task) - core.Print(os.Stderr, "") - - // Dispatch and wait - agSvc, _ := core.ServiceFor[*agentic.PrepSubsystem](c, "agentic") - result := agSvc.DispatchSync(ctx, agentic.DispatchSyncInput{ - Org: org, - Repo: repo, - Agent: agent, - Task: task, - Issue: issue, - }) - - if !result.OK { - core.Print(os.Stderr, "FAILED: %v", result.Error) - return core.Result{Value: result.Error, OK: false} - } - - core.Print(os.Stderr, "DONE: %s", result.Status) - if result.PRURL != "" { - core.Print(os.Stderr, " PR: %s", result.PRURL) - } - return core.Result{OK: true} - }, - }) - - // run orchestrator — standalone queue runner without MCP stdio - c.Command("run/orchestrator", core.Command{ - Description: "Run the queue orchestrator (standalone, no MCP)", - Action: func(opts core.Options) core.Result { - c.ServiceStartup(ctx, nil) - - core.Print(os.Stderr, "core-agent orchestrator running (pid %s)", core.Env("PID")) - core.Print(os.Stderr, " workspace: %s", agentic.WorkspaceRoot()) - core.Print(os.Stderr, " watching queue, draining on 30s tick + completion poke") - - // Block until signal - <-ctx.Done() - core.Print(os.Stderr, "orchestrator shutting down") - c.ServiceShutdown(context.Background()) - return core.Result{OK: true} - }, - }) + // Commands registered by services during OnStartup: + // - run/task (agentic) + // - run/orchestrator (agentic) // Run CLI — resolves os.Args to command path result := c.Cli().Run() diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 9659db7..e56b14a 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -10,6 +10,7 @@ import ( "encoding/json" goio "io" "net/http" + "os" "os/exec" "sync" "time" @@ -87,12 +88,90 @@ func (s *PrepSubsystem) SetCore(c *core.Core) { s.core = c } -// OnStartup implements core.Startable — starts the queue runner. +// OnStartup implements core.Startable — starts the queue runner and registers commands. func (s *PrepSubsystem) OnStartup(ctx context.Context) error { s.StartRunner() + s.registerCommands(ctx) return nil } +// registerCommands adds agentic CLI commands to Core's command tree. +func (s *PrepSubsystem) registerCommands(ctx context.Context) { + c := s.core + + c.Command("run/task", core.Command{ + Description: "Run a single task end-to-end", + Action: func(opts core.Options) core.Result { + repo := opts.String("repo") + agent := opts.String("agent") + task := opts.String("task") + issueStr := opts.String("issue") + org := opts.String("org") + + if repo == "" || task == "" { + core.Print(nil, "usage: core-agent run task --repo= --task=\"...\" --agent=codex [--issue=N] [--org=core]") + return core.Result{OK: false} + } + if agent == "" { + agent = "codex" + } + if org == "" { + org = "core" + } + + issue := 0 + if issueStr != "" { + for _, ch := range issueStr { + if ch >= '0' && ch <= '9' { + issue = issue*10 + int(ch-'0') + } + } + } + + core.Print(os.Stderr, "core-agent run task") + core.Print(os.Stderr, " repo: %s/%s", org, repo) + core.Print(os.Stderr, " agent: %s", agent) + if issue > 0 { + core.Print(os.Stderr, " issue: #%d", issue) + } + core.Print(os.Stderr, " task: %s", task) + core.Print(os.Stderr, "") + + result := s.DispatchSync(ctx, DispatchSyncInput{ + Org: org, + Repo: repo, + Agent: agent, + Task: task, + Issue: issue, + }) + + if !result.OK { + core.Print(os.Stderr, "FAILED: %v", result.Error) + return core.Result{Value: result.Error, OK: false} + } + + core.Print(os.Stderr, "DONE: %s", result.Status) + if result.PRURL != "" { + core.Print(os.Stderr, " PR: %s", result.PRURL) + } + return core.Result{OK: true} + }, + }) + + c.Command("run/orchestrator", core.Command{ + Description: "Run the queue orchestrator (standalone, no MCP)", + Action: func(opts core.Options) core.Result { + core.Print(os.Stderr, "core-agent orchestrator running (pid %s)", core.Env("PID")) + core.Print(os.Stderr, " workspace: %s", WorkspaceRoot()) + core.Print(os.Stderr, " watching queue, draining on 30s tick + completion poke") + + <-ctx.Done() + core.Print(os.Stderr, "orchestrator shutting down") + return core.Result{OK: true} + }, + }) +} + // OnShutdown implements core.Stoppable — freezes the queue. func (s *PrepSubsystem) OnShutdown(ctx context.Context) error { s.frozen = true