feat(agentic): auto-drain queue when agent completes
When an agent process exits, the completion goroutine updates status to completed then calls drainQueue(). drainQueue finds the oldest queued workspace, spawns it if under concurrency limit, and monitors it recursively. Self-sustaining work pipeline. Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
fb16e18919
commit
58c6f3687c
2 changed files with 91 additions and 3 deletions
|
|
@ -169,12 +169,21 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
|
|||
Runs: 1,
|
||||
})
|
||||
|
||||
// Background goroutine to close file handle when process exits.
|
||||
// This goroutine is fire-and-forget — if the MCP server dies,
|
||||
// the OS reclaims the fd anyway. The agent process survives.
|
||||
// Background goroutine: close file handle when process exits,
|
||||
// update status, then drain queue if a slot opened up.
|
||||
go func() {
|
||||
cmd.Wait()
|
||||
outFile.Close()
|
||||
|
||||
// Update status to completed
|
||||
if st, err := readStatus(wsDir); err == nil {
|
||||
st.Status = "completed"
|
||||
st.PID = 0
|
||||
writeStatus(wsDir, st)
|
||||
}
|
||||
|
||||
// Drain queue: pop next queued workspace and spawn it
|
||||
s.drainQueue()
|
||||
}()
|
||||
|
||||
return nil, DispatchOutput{
|
||||
|
|
|
|||
|
|
@ -3,7 +3,9 @@
|
|||
package agentic
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
|
||||
|
|
@ -97,3 +99,80 @@ func (s *PrepSubsystem) canDispatch() bool {
|
|||
}
|
||||
return s.countRunning() < cfg.Dispatch.MaxConcurrent
|
||||
}
|
||||
|
||||
// drainQueue finds the oldest queued workspace and spawns it if a slot is available.
|
||||
func (s *PrepSubsystem) drainQueue() {
|
||||
if !s.canDispatch() {
|
||||
return
|
||||
}
|
||||
|
||||
home, _ := os.UserHomeDir()
|
||||
wsRoot := filepath.Join(home, "Code", "host-uk", "core", ".core", "workspace")
|
||||
|
||||
entries, err := os.ReadDir(wsRoot)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Find oldest queued workspace (entries are sorted by name which includes timestamp)
|
||||
for _, entry := range entries {
|
||||
if !entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
wsDir := filepath.Join(wsRoot, entry.Name())
|
||||
st, err := readStatus(wsDir)
|
||||
if err != nil || st.Status != "queued" {
|
||||
continue
|
||||
}
|
||||
|
||||
// Found a queued workspace — spawn it
|
||||
srcDir := filepath.Join(wsDir, "src")
|
||||
prompt := "Read PROMPT.md for instructions. All context files (CLAUDE.md, TODO.md, CONTEXT.md, CONSUMERS.md, RECENT.md) are in the parent directory. Work in this directory."
|
||||
|
||||
command, args, err := agentCommand(st.Agent, prompt)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
outputFile := filepath.Join(wsDir, fmt.Sprintf("agent-%s.log", st.Agent))
|
||||
outFile, err := os.Create(outputFile)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
cmd := exec.Command(command, args...)
|
||||
cmd.Dir = srcDir
|
||||
cmd.Stdout = outFile
|
||||
cmd.Stderr = outFile
|
||||
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
outFile.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
// Update status to running
|
||||
st.Status = "running"
|
||||
st.PID = cmd.Process.Pid
|
||||
st.Runs++
|
||||
writeStatus(wsDir, st)
|
||||
|
||||
// Monitor this one too
|
||||
go func() {
|
||||
cmd.Wait()
|
||||
outFile.Close()
|
||||
|
||||
if st2, err := readStatus(wsDir); err == nil {
|
||||
st2.Status = "completed"
|
||||
st2.PID = 0
|
||||
writeStatus(wsDir, st2)
|
||||
}
|
||||
|
||||
// Recursively drain — pick up next queued item
|
||||
s.drainQueue()
|
||||
}()
|
||||
|
||||
return // Only spawn one at a time
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue