diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index 79c0ceb..3321752 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -334,30 +334,10 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er }) } - // Post-completion pipeline is handled by IPC handlers registered in main.go: - // AgentCompleted → QA handler → PRCreated handler → verify handler - // AgentCompleted → ingest handler - // AgentCompleted → poke handler - // - // Legacy inline pipeline kept as fallback when Core is not wired. - if s.core == nil { - if finalStatus == "completed" { - if !s.runQA(wsDir) { - finalStatus = "failed" - question = "QA check failed — build or tests did not pass" - if st, stErr := ReadStatus(wsDir); stErr == nil { - st.Status = finalStatus - st.Question = question - writeStatus(wsDir, st) - } - } else { - s.autoCreatePR(wsDir) - s.autoVerifyAndMerge(wsDir) - } - } - s.ingestFindings(wsDir) - s.Poke() - } + // Post-completion pipeline handled by IPC handlers: + // AgentCompleted → QA → PRCreated → Verify → PRMerged|PRNeedsReview + // AgentCompleted → Ingest + // AgentCompleted → Poke }() return pid, outputFile, nil diff --git a/pkg/agentic/handlers.go b/pkg/agentic/handlers.go index 961392e..cc62dc0 100644 --- a/pkg/agentic/handlers.go +++ b/pkg/agentic/handlers.go @@ -51,7 +51,7 @@ func RegisterHandlers(c *core.Core, s *PrepSubsystem) { return core.Result{OK: true} }) - // Auto-PR: create PR on QA pass + // Auto-PR: create PR on QA pass, emit PRCreated c.RegisterAction(func(c *core.Core, msg core.Message) core.Result { ev, ok := msg.(messages.QAResult) if !ok || !ev.Passed { @@ -63,21 +63,51 @@ func RegisterHandlers(c *core.Core, s *PrepSubsystem) { } s.autoCreatePR(wsDir) + + // Check if PR was created (stored in status by autoCreatePR) + if st, err := ReadStatus(wsDir); err == nil && st.PRURL != "" { + c.ACTION(messages.PRCreated{ + Repo: st.Repo, + Branch: st.Branch, + PRURL: st.PRURL, + PRNum: extractPRNumber(st.PRURL), + }) + } return core.Result{OK: true} }) // Auto-verify: verify and merge after PR creation c.RegisterAction(func(c *core.Core, msg core.Message) core.Result { - ev, ok := msg.(messages.QAResult) - if !ok || !ev.Passed { + ev, ok := msg.(messages.PRCreated) + if !ok { return core.Result{OK: true} } - wsDir := resolveWorkspace(ev.Workspace) + + // Find workspace for this repo+branch + wsDir := findWorkspaceByPR(ev.Repo, ev.Branch) if wsDir == "" { return core.Result{OK: true} } s.autoVerifyAndMerge(wsDir) + + // Check final status + if st, err := ReadStatus(wsDir); err == nil { + if st.Status == "merged" { + c.ACTION(messages.PRMerged{ + Repo: ev.Repo, + PRURL: ev.PRURL, + PRNum: ev.PRNum, + }) + } else if st.Question != "" { + c.ACTION(messages.PRNeedsReview{ + Repo: ev.Repo, + PRURL: ev.PRURL, + PRNum: ev.PRNum, + Reason: st.Question, + }) + } + } return core.Result{OK: true} }) @@ -119,3 +149,22 @@ func resolveWorkspace(name string) string { } return "" } + +// findWorkspaceByPR finds a workspace directory by repo name and branch. +// Scans running/completed workspaces for a matching repo+branch combination. +func findWorkspaceByPR(repo, branch string) string { + wsRoot := WorkspaceRoot() + old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) + deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) + for _, path := range append(old, deep...) { + wsDir := core.PathDir(path) + st, err := ReadStatus(wsDir) + if err != nil { + continue + } + if st.Repo == repo && st.Branch == branch { + return wsDir + } + } + return "" +} diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go index 9aa04d1..d361f40 100644 --- a/pkg/agentic/queue.go +++ b/pkg/agentic/queue.go @@ -212,9 +212,18 @@ func baseAgent(agent string) string { // // codex: {total: 2, models: {gpt-5.4: 1}} → max 2 codex total, max 1 gpt-5.4 func (s *PrepSubsystem) canDispatchAgent(agent string) bool { - cfg := s.loadAgentsConfig() + // Read concurrency from shared config (loaded once at startup) + var concurrency map[string]ConcurrencyLimit + if s.core != nil { + concurrency = core.ConfigGet[map[string]ConcurrencyLimit](s.core.Config(), "agents.concurrency") + } + if concurrency == nil { + cfg := s.loadAgentsConfig() + concurrency = cfg.Concurrency + } + base := baseAgent(agent) - limit, ok := cfg.Concurrency[base] + limit, ok := concurrency[base] if !ok || limit.Total <= 0 { return true } @@ -253,13 +262,18 @@ func modelVariant(agent string) string { } // drainQueue fills all available concurrency slots from queued workspaces. -// Loops until no slots remain or no queued tasks match. Serialised via drainMu. +// Serialised via c.Lock("drain") when Core is available, falls back to local mutex. func (s *PrepSubsystem) drainQueue() { if s.frozen { return } - s.drainMu.Lock() - defer s.drainMu.Unlock() + if s.core != nil { + s.core.Lock("drain").Mutex.Lock() + defer s.core.Lock("drain").Mutex.Unlock() + } else { + s.drainMu.Lock() + defer s.drainMu.Unlock() + } for s.drainOne() { // keep filling slots diff --git a/pkg/agentic/register.go b/pkg/agentic/register.go index 884e491..b428e2a 100644 --- a/pkg/agentic/register.go +++ b/pkg/agentic/register.go @@ -25,6 +25,12 @@ func Register(c *core.Core) core.Result { prep := NewPrep() prep.core = c + // Load agents config once into Core shared config + cfg := prep.loadAgentsConfig() + c.Config().Set("agents.concurrency", cfg.Concurrency) + c.Config().Set("agents.rates", cfg.Rates) + c.Config().Set("agents.dispatch", cfg.Dispatch) + c.Service("agentic", core.Service{ OnStart: func() core.Result { prep.StartRunner()