From 58c6f3687c2d4b2f35eb41db95ad597ef6582c43 Mon Sep 17 00:00:00 2001 From: Snider Date: Sun, 15 Mar 2026 16:09:45 +0000 Subject: [PATCH] feat(agentic): auto-drain queue when agent completes When an agent process exits, the completion goroutine updates status to completed then calls drainQueue(). drainQueue finds the oldest queued workspace, spawns it if under concurrency limit, and monitors it recursively. Self-sustaining work pipeline. Co-Authored-By: Virgil --- pkg/mcp/agentic/dispatch.go | 15 +++++-- pkg/mcp/agentic/queue.go | 79 +++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 3 deletions(-) diff --git a/pkg/mcp/agentic/dispatch.go b/pkg/mcp/agentic/dispatch.go index b19a782..03f07bc 100644 --- a/pkg/mcp/agentic/dispatch.go +++ b/pkg/mcp/agentic/dispatch.go @@ -169,12 +169,21 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, Runs: 1, }) - // Background goroutine to close file handle when process exits. - // This goroutine is fire-and-forget — if the MCP server dies, - // the OS reclaims the fd anyway. The agent process survives. + // Background goroutine: close file handle when process exits, + // update status, then drain queue if a slot opened up. go func() { cmd.Wait() outFile.Close() + + // Update status to completed + if st, err := readStatus(wsDir); err == nil { + st.Status = "completed" + st.PID = 0 + writeStatus(wsDir, st) + } + + // Drain queue: pop next queued workspace and spawn it + s.drainQueue() }() return nil, DispatchOutput{ diff --git a/pkg/mcp/agentic/queue.go b/pkg/mcp/agentic/queue.go index 77cf150..702e532 100644 --- a/pkg/mcp/agentic/queue.go +++ b/pkg/mcp/agentic/queue.go @@ -3,7 +3,9 @@ package agentic import ( + "fmt" "os" + "os/exec" "path/filepath" "syscall" @@ -97,3 +99,80 @@ func (s *PrepSubsystem) canDispatch() bool { } return s.countRunning() < cfg.Dispatch.MaxConcurrent } + +// drainQueue finds the oldest queued workspace and spawns it if a slot is available. +func (s *PrepSubsystem) drainQueue() { + if !s.canDispatch() { + return + } + + home, _ := os.UserHomeDir() + wsRoot := filepath.Join(home, "Code", "host-uk", "core", ".core", "workspace") + + entries, err := os.ReadDir(wsRoot) + if err != nil { + return + } + + // Find oldest queued workspace (entries are sorted by name which includes timestamp) + for _, entry := range entries { + if !entry.IsDir() { + continue + } + + wsDir := filepath.Join(wsRoot, entry.Name()) + st, err := readStatus(wsDir) + if err != nil || st.Status != "queued" { + continue + } + + // Found a queued workspace — spawn it + srcDir := filepath.Join(wsDir, "src") + prompt := "Read PROMPT.md for instructions. All context files (CLAUDE.md, TODO.md, CONTEXT.md, CONSUMERS.md, RECENT.md) are in the parent directory. Work in this directory." + + command, args, err := agentCommand(st.Agent, prompt) + if err != nil { + continue + } + + outputFile := filepath.Join(wsDir, fmt.Sprintf("agent-%s.log", st.Agent)) + outFile, err := os.Create(outputFile) + if err != nil { + continue + } + + cmd := exec.Command(command, args...) + cmd.Dir = srcDir + cmd.Stdout = outFile + cmd.Stderr = outFile + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + if err := cmd.Start(); err != nil { + outFile.Close() + continue + } + + // Update status to running + st.Status = "running" + st.PID = cmd.Process.Pid + st.Runs++ + writeStatus(wsDir, st) + + // Monitor this one too + go func() { + cmd.Wait() + outFile.Close() + + if st2, err := readStatus(wsDir); err == nil { + st2.Status = "completed" + st2.PID = 0 + writeStatus(wsDir, st2) + } + + // Recursively drain — pick up next queued item + s.drainQueue() + }() + + return // Only spawn one at a time + } +}