feat: complete DI migration — IPC pipeline + Config + Locks

Phase 4 complete:
- Auto-PR handler emits PRCreated message
- Verify handler listens for PRCreated, emits PRMerged/PRNeedsReview
- findWorkspaceByPR() for workspace lookup from PR events
- Remove legacy inline fallback from dispatch goroutine

Phase 5 complete:
- agents.yaml loaded once at startup into c.Config()
- canDispatchAgent reads from c.Config() (no re-parsing)
- drainQueue uses c.Lock("drain") when Core available

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-24 16:44:19 +00:00
parent 4e69daf2da
commit d9e7fa092b
4 changed files with 82 additions and 33 deletions

View file

@ -334,30 +334,10 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
})
}
// Post-completion pipeline is handled by IPC handlers registered in main.go:
// AgentCompleted → QA handler → PRCreated handler → verify handler
// AgentCompleted → ingest handler
// AgentCompleted → poke handler
//
// Legacy inline pipeline kept as fallback when Core is not wired.
if s.core == nil {
if finalStatus == "completed" {
if !s.runQA(wsDir) {
finalStatus = "failed"
question = "QA check failed — build or tests did not pass"
if st, stErr := ReadStatus(wsDir); stErr == nil {
st.Status = finalStatus
st.Question = question
writeStatus(wsDir, st)
}
} else {
s.autoCreatePR(wsDir)
s.autoVerifyAndMerge(wsDir)
}
}
s.ingestFindings(wsDir)
s.Poke()
}
// Post-completion pipeline handled by IPC handlers:
// AgentCompleted → QA → PRCreated → Verify → PRMerged|PRNeedsReview
// AgentCompleted → Ingest
// AgentCompleted → Poke
}()
return pid, outputFile, nil

View file

@ -51,7 +51,7 @@ func RegisterHandlers(c *core.Core, s *PrepSubsystem) {
return core.Result{OK: true}
})
// Auto-PR: create PR on QA pass
// Auto-PR: create PR on QA pass, emit PRCreated
c.RegisterAction(func(c *core.Core, msg core.Message) core.Result {
ev, ok := msg.(messages.QAResult)
if !ok || !ev.Passed {
@ -63,21 +63,51 @@ func RegisterHandlers(c *core.Core, s *PrepSubsystem) {
}
s.autoCreatePR(wsDir)
// Check if PR was created (stored in status by autoCreatePR)
if st, err := ReadStatus(wsDir); err == nil && st.PRURL != "" {
c.ACTION(messages.PRCreated{
Repo: st.Repo,
Branch: st.Branch,
PRURL: st.PRURL,
PRNum: extractPRNumber(st.PRURL),
})
}
return core.Result{OK: true}
})
// Auto-verify: verify and merge after PR creation
c.RegisterAction(func(c *core.Core, msg core.Message) core.Result {
ev, ok := msg.(messages.QAResult)
if !ok || !ev.Passed {
ev, ok := msg.(messages.PRCreated)
if !ok {
return core.Result{OK: true}
}
wsDir := resolveWorkspace(ev.Workspace)
// Find workspace for this repo+branch
wsDir := findWorkspaceByPR(ev.Repo, ev.Branch)
if wsDir == "" {
return core.Result{OK: true}
}
s.autoVerifyAndMerge(wsDir)
// Check final status
if st, err := ReadStatus(wsDir); err == nil {
if st.Status == "merged" {
c.ACTION(messages.PRMerged{
Repo: ev.Repo,
PRURL: ev.PRURL,
PRNum: ev.PRNum,
})
} else if st.Question != "" {
c.ACTION(messages.PRNeedsReview{
Repo: ev.Repo,
PRURL: ev.PRURL,
PRNum: ev.PRNum,
Reason: st.Question,
})
}
}
return core.Result{OK: true}
})
@ -119,3 +149,22 @@ func resolveWorkspace(name string) string {
}
return ""
}
// findWorkspaceByPR finds a workspace directory by repo name and branch.
// Scans running/completed workspaces for a matching repo+branch combination.
func findWorkspaceByPR(repo, branch string) string {
wsRoot := WorkspaceRoot()
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
for _, path := range append(old, deep...) {
wsDir := core.PathDir(path)
st, err := ReadStatus(wsDir)
if err != nil {
continue
}
if st.Repo == repo && st.Branch == branch {
return wsDir
}
}
return ""
}

View file

@ -212,9 +212,18 @@ func baseAgent(agent string) string {
//
// codex: {total: 2, models: {gpt-5.4: 1}} → max 2 codex total, max 1 gpt-5.4
func (s *PrepSubsystem) canDispatchAgent(agent string) bool {
cfg := s.loadAgentsConfig()
// Read concurrency from shared config (loaded once at startup)
var concurrency map[string]ConcurrencyLimit
if s.core != nil {
concurrency = core.ConfigGet[map[string]ConcurrencyLimit](s.core.Config(), "agents.concurrency")
}
if concurrency == nil {
cfg := s.loadAgentsConfig()
concurrency = cfg.Concurrency
}
base := baseAgent(agent)
limit, ok := cfg.Concurrency[base]
limit, ok := concurrency[base]
if !ok || limit.Total <= 0 {
return true
}
@ -253,13 +262,18 @@ func modelVariant(agent string) string {
}
// drainQueue fills all available concurrency slots from queued workspaces.
// Loops until no slots remain or no queued tasks match. Serialised via drainMu.
// Serialised via c.Lock("drain") when Core is available, falls back to local mutex.
func (s *PrepSubsystem) drainQueue() {
if s.frozen {
return
}
s.drainMu.Lock()
defer s.drainMu.Unlock()
if s.core != nil {
s.core.Lock("drain").Mutex.Lock()
defer s.core.Lock("drain").Mutex.Unlock()
} else {
s.drainMu.Lock()
defer s.drainMu.Unlock()
}
for s.drainOne() {
// keep filling slots

View file

@ -25,6 +25,12 @@ func Register(c *core.Core) core.Result {
prep := NewPrep()
prep.core = c
// Load agents config once into Core shared config
cfg := prep.loadAgentsConfig()
c.Config().Set("agents.concurrency", cfg.Concurrency)
c.Config().Set("agents.rates", cfg.Rates)
c.Config().Set("agents.dispatch", cfg.Dispatch)
c.Service("agentic", core.Service{
OnStart: func() core.Result {
prep.StartRunner()