From 04e3d492e9b5ebb474526d34099e81c402ab9b9d Mon Sep 17 00:00:00 2001 From: Snider Date: Tue, 24 Mar 2026 13:02:41 +0000 Subject: [PATCH] fix(monitor): emit agent.completed per task, verify PIDs for queue.drained MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Export ReadStatus (was readStatus) for cross-package use - AgentCompleted now emits agent.completed with repo/agent/workspace/status for every finished task, not just failures - queue.drained only fires when genuinely empty — verified by checking PIDs are alive via kill(0), not just trusting stale status files - Fix Docker mount paths: /root/ → /home/dev/ for non-root container - Update all callers and tests Co-Authored-By: Virgil --- pkg/agentic/auto_pr.go | 8 ++-- pkg/agentic/dispatch.go | 20 ++++----- pkg/agentic/dispatch_sync.go | 2 +- pkg/agentic/ingest.go | 2 +- pkg/agentic/pr.go | 2 +- pkg/agentic/queue.go | 4 +- pkg/agentic/resume.go | 2 +- pkg/agentic/shutdown.go | 2 +- pkg/agentic/status.go | 11 +++-- pkg/agentic/status_test.go | 14 +++--- pkg/agentic/verify.go | 8 ++-- pkg/agentic/watch.go | 4 +- pkg/monitor/monitor.go | 83 +++++++++++++++++++++++------------- pkg/monitor/monitor_test.go | 10 +++-- 14 files changed, 101 insertions(+), 71 deletions(-) diff --git a/pkg/agentic/auto_pr.go b/pkg/agentic/auto_pr.go index 25ce776..c2471bd 100644 --- a/pkg/agentic/auto_pr.go +++ b/pkg/agentic/auto_pr.go @@ -13,7 +13,7 @@ import ( // autoCreatePR pushes the agent's branch and creates a PR on Forge // if the agent made any commits beyond the initial clone. func (s *PrepSubsystem) autoCreatePR(wsDir string) { - st, err := readStatus(wsDir) + st, err := ReadStatus(wsDir) if err != nil || st.Branch == "" || st.Repo == "" { return } @@ -44,7 +44,7 @@ func (s *PrepSubsystem) autoCreatePR(wsDir string) { pushCmd.Dir = repoDir if pushErr := pushCmd.Run(); pushErr != nil { // Push failed — update status with error but don't block - if st2, err := readStatus(wsDir); err == nil { + if st2, err := ReadStatus(wsDir); err == nil { st2.Question = core.Sprintf("PR push failed: %v", pushErr) writeStatus(wsDir, st2) } @@ -60,7 +60,7 @@ func (s *PrepSubsystem) autoCreatePR(wsDir string) { prURL, _, err := s.forgeCreatePR(ctx, org, st.Repo, st.Branch, base, title, body) if err != nil { - if st2, err := readStatus(wsDir); err == nil { + if st2, err := ReadStatus(wsDir); err == nil { st2.Question = core.Sprintf("PR creation failed: %v", err) writeStatus(wsDir, st2) } @@ -68,7 +68,7 @@ func (s *PrepSubsystem) autoCreatePR(wsDir string) { } // Update status with PR URL - if st2, err := readStatus(wsDir); err == nil { + if st2, err := ReadStatus(wsDir); err == nil { st2.PRURL = prURL writeStatus(wsDir, st2) } diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index aff3aa0..e85eddf 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -156,7 +156,7 @@ func containerCommand(agentType, command string, args []string, repoDir, metaDir "-v", metaDir + ":/workspace/.meta", "-w", "/workspace", // Auth: agent configs only — NO SSH keys, git push runs on host - "-v", core.JoinPath(home, ".codex") + ":/root/.codex:ro", + "-v", core.JoinPath(home, ".codex") + ":/home/dev/.codex:ro", // API keys — passed by name, Docker resolves from host env "-e", "OPENAI_API_KEY", "-e", "ANTHROPIC_API_KEY", @@ -175,14 +175,14 @@ func containerCommand(agentType, command string, args []string, repoDir, metaDir // Mount Claude config if dispatching claude agent if command == "claude" { dockerArgs = append(dockerArgs, - "-v", core.JoinPath(home, ".claude")+":/root/.claude:ro", + "-v", core.JoinPath(home, ".claude")+":/home/dev/.claude:ro", ) } // Mount Gemini config if dispatching gemini agent if command == "gemini" { dockerArgs = append(dockerArgs, - "-v", core.JoinPath(home, ".gemini")+":/root/.gemini:ro", + "-v", core.JoinPath(home, ".gemini")+":/home/dev/.gemini:ro", ) } @@ -228,7 +228,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er // Notify monitor directly — no filesystem polling if s.onComplete != nil { - st, _ := readStatus(wsDir) + st, _ := ReadStatus(wsDir) repo := "" if st != nil { repo = st.Repo @@ -238,7 +238,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er emitStartEvent(agent, core.PathBase(wsDir)) // audit log // Start Forge stopwatch on the issue (time tracking) - if st, _ := readStatus(wsDir); st != nil && st.Issue > 0 { + if st, _ := ReadStatus(wsDir); st != nil && st.Issue > 0 { org := st.Org if org == "" { org = "core" @@ -281,7 +281,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er } } - if st, stErr := readStatus(wsDir); stErr == nil { + if st, stErr := ReadStatus(wsDir); stErr == nil { st.Status = finalStatus st.PID = 0 st.Question = question @@ -293,7 +293,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er // Rate-limit detection: if agent failed fast (<60s), track consecutive failures pool := baseAgent(agent) if finalStatus == "failed" { - if st, _ := readStatus(wsDir); st != nil { + if st, _ := ReadStatus(wsDir); st != nil { elapsed := time.Since(st.StartedAt) if elapsed < 60*time.Second { s.failCount[pool]++ @@ -310,7 +310,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er } // Stop Forge stopwatch on the issue (time tracking) - if st, _ := readStatus(wsDir); st != nil && st.Issue > 0 { + if st, _ := ReadStatus(wsDir); st != nil && st.Issue > 0 { org := st.Org if org == "" { org = "core" @@ -320,7 +320,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er // Push notification directly — no filesystem polling if s.onComplete != nil { - stNow, _ := readStatus(wsDir) + stNow, _ := ReadStatus(wsDir) repoName := "" if stNow != nil { repoName = stNow.Repo @@ -333,7 +333,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er if !s.runQA(wsDir) { finalStatus = "failed" question = "QA check failed — build or tests did not pass" - if st, stErr := readStatus(wsDir); stErr == nil { + if st, stErr := ReadStatus(wsDir); stErr == nil { st.Status = finalStatus st.Question = question writeStatus(wsDir, st) diff --git a/pkg/agentic/dispatch_sync.go b/pkg/agentic/dispatch_sync.go index df0ceba..fb0776d 100644 --- a/pkg/agentic/dispatch_sync.go +++ b/pkg/agentic/dispatch_sync.go @@ -82,7 +82,7 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu case <-ticker.C: if pid > 0 && syscall.Kill(pid, 0) != nil { // Process exited — read final status - st, err := readStatus(wsDir) + st, err := ReadStatus(wsDir) if err != nil { return DispatchSyncResult{Error: "can't read final status"} } diff --git a/pkg/agentic/ingest.go b/pkg/agentic/ingest.go index d033258..ed5edb3 100644 --- a/pkg/agentic/ingest.go +++ b/pkg/agentic/ingest.go @@ -13,7 +13,7 @@ import ( // ingestFindings reads the agent output log and creates issues via the API // for scan/audit results. Only runs for conventions and security templates. func (s *PrepSubsystem) ingestFindings(wsDir string) { - st, err := readStatus(wsDir) + st, err := ReadStatus(wsDir) if err != nil || st.Status != "completed" { return } diff --git a/pkg/agentic/pr.go b/pkg/agentic/pr.go index b564459..2cc298a 100644 --- a/pkg/agentic/pr.go +++ b/pkg/agentic/pr.go @@ -61,7 +61,7 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in } // Read workspace status for repo, branch, issue context - st, err := readStatus(wsDir) + st, err := ReadStatus(wsDir) if err != nil { return nil, CreatePROutput{}, core.E("createPR", "no status.json", err) } diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go index 9e2003c..52a5669 100644 --- a/pkg/agentic/queue.go +++ b/pkg/agentic/queue.go @@ -126,7 +126,7 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int { count := 0 for _, statusPath := range paths { - st, err := readStatus(core.PathDir(statusPath)) + st, err := ReadStatus(core.PathDir(statusPath)) if err != nil || st.Status != "running" { continue } @@ -188,7 +188,7 @@ func (s *PrepSubsystem) drainOne() bool { for _, statusPath := range statusFiles { wsDir := core.PathDir(statusPath) - st, err := readStatus(wsDir) + st, err := ReadStatus(wsDir) if err != nil || st.Status != "queued" { continue } diff --git a/pkg/agentic/resume.go b/pkg/agentic/resume.go index c4b5cef..ac90624 100644 --- a/pkg/agentic/resume.go +++ b/pkg/agentic/resume.go @@ -52,7 +52,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu } // Read current status - st, err := readStatus(wsDir) + st, err := ReadStatus(wsDir) if err != nil { return nil, ResumeOutput{}, core.E("resume", "no status.json in workspace", err) } diff --git a/pkg/agentic/shutdown.go b/pkg/agentic/shutdown.go index 220e351..97d3fdc 100644 --- a/pkg/agentic/shutdown.go +++ b/pkg/agentic/shutdown.go @@ -81,7 +81,7 @@ func (s *PrepSubsystem) shutdownNow(ctx context.Context, _ *mcp.CallToolRequest, for _, statusPath := range statusFiles { wsDir := core.PathDir(statusPath) - st, err := readStatus(wsDir) + st, err := ReadStatus(wsDir) if err != nil { continue } diff --git a/pkg/agentic/status.go b/pkg/agentic/status.go index 0258f61..c2095e2 100644 --- a/pkg/agentic/status.go +++ b/pkg/agentic/status.go @@ -27,7 +27,7 @@ import ( // WorkspaceStatus represents the current state of an agent workspace. // -// st, err := readStatus(wsDir) +// st, err := ReadStatus(wsDir) // if err == nil && st.Status == "completed" { autoCreatePR(wsDir) } type WorkspaceStatus struct { Status string `json:"status"` // running, completed, blocked, failed @@ -58,10 +58,13 @@ func writeStatus(wsDir string, status *WorkspaceStatus) error { return nil } -func readStatus(wsDir string) (*WorkspaceStatus, error) { +// ReadStatus parses the status.json in a workspace directory. +// +// st, err := agentic.ReadStatus("/path/to/workspace") +func ReadStatus(wsDir string) (*WorkspaceStatus, error) { r := fs.Read(core.JoinPath(wsDir, "status.json")) if !r.OK { - return nil, core.E("readStatus", "status not found", nil) + return nil, core.E("ReadStatus", "status not found", nil) } var s WorkspaceStatus if err := json.Unmarshal([]byte(r.Value.(string)), &s); err != nil { @@ -125,7 +128,7 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu wsDir := core.PathDir(statusPath) name := wsDir[len(wsRoot)+1:] - st, err := readStatus(wsDir) + st, err := ReadStatus(wsDir) if err != nil { out.Total++ out.Failed++ diff --git a/pkg/agentic/status_test.go b/pkg/agentic/status_test.go index 9130529..94cff9d 100644 --- a/pkg/agentic/status_test.go +++ b/pkg/agentic/status_test.go @@ -77,7 +77,7 @@ func TestReadStatus_Good(t *testing.T) { require.NoError(t, err) require.True(t, fs.Write(filepath.Join(dir, "status.json"), string(data)).OK) - read, err := readStatus(dir) + read, err := ReadStatus(dir) require.NoError(t, err) assert.Equal(t, "completed", read.Status) @@ -91,7 +91,7 @@ func TestReadStatus_Good(t *testing.T) { func TestReadStatus_Bad_NoFile(t *testing.T) { dir := t.TempDir() - _, err := readStatus(dir) + _, err := ReadStatus(dir) assert.Error(t, err) } @@ -99,7 +99,7 @@ func TestReadStatus_Bad_InvalidJSON(t *testing.T) { dir := t.TempDir() require.True(t, fs.Write(filepath.Join(dir, "status.json"), "not json{").OK) - _, err := readStatus(dir) + _, err := ReadStatus(dir) assert.Error(t, err) } @@ -117,7 +117,7 @@ func TestReadStatus_Good_BlockedWithQuestion(t *testing.T) { require.NoError(t, err) require.True(t, fs.Write(filepath.Join(dir, "status.json"), string(data)).OK) - read, err := readStatus(dir) + read, err := ReadStatus(dir) require.NoError(t, err) assert.Equal(t, "blocked", read.Status) @@ -143,7 +143,7 @@ func TestWriteReadStatus_Good_Roundtrip(t *testing.T) { err := writeStatus(dir, original) require.NoError(t, err) - read, err := readStatus(dir) + read, err := ReadStatus(dir) require.NoError(t, err) assert.Equal(t, original.Status, read.Status) @@ -168,7 +168,7 @@ func TestWriteStatus_Good_OverwriteExisting(t *testing.T) { err = writeStatus(dir, second) require.NoError(t, err) - read, err := readStatus(dir) + read, err := ReadStatus(dir) require.NoError(t, err) assert.Equal(t, "completed", read.Status) } @@ -177,6 +177,6 @@ func TestReadStatus_Ugly_EmptyFile(t *testing.T) { dir := t.TempDir() require.True(t, fs.Write(filepath.Join(dir, "status.json"), "").OK) - _, err := readStatus(dir) + _, err := ReadStatus(dir) assert.Error(t, err) } diff --git a/pkg/agentic/verify.go b/pkg/agentic/verify.go index d98e376..487bee9 100644 --- a/pkg/agentic/verify.go +++ b/pkg/agentic/verify.go @@ -22,7 +22,7 @@ import ( // // agentic_dispatch repo=go-crypt template=verify persona=engineering/engineering-security-engineer func (s *PrepSubsystem) autoVerifyAndMerge(wsDir string) { - st, err := readStatus(wsDir) + st, err := ReadStatus(wsDir) if err != nil || st.PRURL == "" || st.Repo == "" { return } @@ -40,7 +40,7 @@ func (s *PrepSubsystem) autoVerifyAndMerge(wsDir string) { // markMerged is a helper to avoid repeating the status update. markMerged := func() { - if st2, err := readStatus(wsDir); err == nil { + if st2, err := ReadStatus(wsDir); err == nil { st2.Status = "merged" writeStatus(wsDir, st2) } @@ -66,7 +66,7 @@ func (s *PrepSubsystem) autoVerifyAndMerge(wsDir string) { // Both attempts failed — flag for human review s.flagForReview(org, st.Repo, prNum, result) - if st2, err := readStatus(wsDir); err == nil { + if st2, err := ReadStatus(wsDir); err == nil { st2.Question = "Flagged for review — auto-merge failed after retry" writeStatus(wsDir, st2) } @@ -129,7 +129,7 @@ func (s *PrepSubsystem) rebaseBranch(repoDir, branch string) bool { } // Force-push the rebased branch to Forge (origin is local clone) - st, _ := readStatus(core.PathDir(repoDir)) + st, _ := ReadStatus(core.PathDir(repoDir)) org := "core" repo := "" if st != nil { diff --git a/pkg/agentic/watch.go b/pkg/agentic/watch.go index ac10bff..ba82b9a 100644 --- a/pkg/agentic/watch.go +++ b/pkg/agentic/watch.go @@ -109,7 +109,7 @@ func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, inp for ws := range remaining { wsDir := s.resolveWorkspaceDir(ws) - st, err := readStatus(wsDir) + st, err := ReadStatus(wsDir) if err != nil { continue } @@ -196,7 +196,7 @@ func (s *PrepSubsystem) findActiveWorkspaces() []string { var active []string for _, entry := range entries { wsDir := core.PathDir(entry) - st, err := readStatus(wsDir) + st, err := ReadStatus(wsDir) if err != nil { continue } diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 6562eef..8820aeb 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -17,6 +17,7 @@ import ( "os" "path/filepath" "sync" + "syscall" "time" "dappco.re/go/agent/pkg/agentic" @@ -231,7 +232,7 @@ func (m *Subsystem) AgentStarted(agent, repo, workspace string) { } // AgentCompleted is called when an agent finishes. -// Only sends notifications for failures. Sends "queue.drained" when all work is done. +// Emits agent.completed for every finish, then checks if the queue is empty. // // mon.AgentCompleted("codex", "go-io", "core/go-io/task-5", "completed") func (m *Subsystem) AgentCompleted(agent, repo, workspace, status string) { @@ -240,53 +241,68 @@ func (m *Subsystem) AgentCompleted(agent, repo, workspace, status string) { m.mu.Unlock() if m.notifier != nil { - // Only notify on failures — those need attention - if status == "failed" || status == "blocked" { - m.notifier.ChannelSend(context.Background(), "agent.failed", map[string]any{ - "repo": repo, - "agent": agent, - "status": status, - }) - } + m.notifier.ChannelSend(context.Background(), "agent.completed", map[string]any{ + "repo": repo, + "agent": agent, + "workspace": workspace, + "status": status, + }) } - // Check if queue is drained (0 running + 0 queued) m.Poke() go m.checkIdleAfterDelay() } -// checkIdleAfterDelay waits briefly then checks if the fleet is idle. -// Sends a single "queue.drained" notification when all work stops. +// checkIdleAfterDelay waits briefly then checks if the fleet is genuinely idle. +// Only emits queue.drained when there are truly zero running or queued agents, +// verified by checking PIDs are alive, not just trusting status files. func (m *Subsystem) checkIdleAfterDelay() { - time.Sleep(5 * time.Second) // wait for runner to fill slots + time.Sleep(5 * time.Second) // wait for queue drain to fill slots if m.notifier == nil { return } - // Quick count — scan for running/queued - running := 0 - queued := 0 + running, queued := m.countLiveWorkspaces() + if running == 0 && queued == 0 { + m.notifier.ChannelSend(context.Background(), "queue.drained", map[string]any{ + "running": running, + "queued": queued, + }) + } +} + +// countLiveWorkspaces counts workspaces that are genuinely active. +// For "running" status, verifies the PID is still alive. +func (m *Subsystem) countLiveWorkspaces() (running, queued int) { wsRoot := agentic.WorkspaceRoot() old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) for _, path := range append(old, deep...) { - r := fs.Read(path) - if !r.OK { + wsDir := core.PathDir(path) + st, err := agentic.ReadStatus(wsDir) + if err != nil { continue } - s := r.Value.(string) - if core.Contains(s, `"status":"running"`) { - running++ - } else if core.Contains(s, `"status":"queued"`) { + switch st.Status { + case "running": + if st.PID > 0 && pidAlive(st.PID) { + running++ + } + case "queued": queued++ } } + return +} - if running == 0 && queued == 0 { - m.notifier.ChannelSend(context.Background(), "queue.drained", map[string]any{ - "message": "all work complete", - }) +// pidAlive checks whether a process is still running. +func pidAlive(pid int) bool { + proc, err := os.FindProcess(pid) + if err != nil { + return false } + err = proc.Signal(syscall.Signal(0)) + return err == nil } func (m *Subsystem) loop(ctx context.Context) { @@ -430,11 +446,20 @@ func (m *Subsystem) checkCompletions() string { return "" } - // Only notify on queue drain (0 running + 0 queued) — individual completions are noise - if m.notifier != nil && running == 0 && queued == 0 { + // Emit agent.completed for each newly finished task + if m.notifier != nil { + for _, desc := range newlyCompleted { + m.notifier.ChannelSend(context.Background(), "agent.completed", map[string]any{ + "description": desc, + }) + } + } + + // Only emit queue.drained when genuinely empty — verified by live PID check + liveRunning, liveQueued := m.countLiveWorkspaces() + if m.notifier != nil && liveRunning == 0 && liveQueued == 0 { m.notifier.ChannelSend(context.Background(), "queue.drained", map[string]any{ "completed": len(newlyCompleted), - "message": "all work complete", }) } diff --git a/pkg/monitor/monitor_test.go b/pkg/monitor/monitor_test.go index fb41d25..4f48f41 100644 --- a/pkg/monitor/monitor_test.go +++ b/pkg/monitor/monitor_test.go @@ -146,10 +146,12 @@ func TestCheckCompletions_Good_NewCompletions(t *testing.T) { assert.Contains(t, msg, "2 agent(s) completed") events := notifier.Events() - require.Len(t, events, 1) - assert.Equal(t, "agent.complete", events[0].channel) - eventData := events[0].data.(map[string]any) - assert.Equal(t, 2, eventData["count"]) + require.Len(t, events, 3) // 2 agent.completed + 1 queue.drained + assert.Equal(t, "agent.completed", events[0].channel) + assert.Equal(t, "agent.completed", events[1].channel) + assert.Equal(t, "queue.drained", events[2].channel) + drainData := events[2].data.(map[string]any) + assert.Equal(t, 2, drainData["completed"]) } func TestCheckCompletions_Good_MixedStatuses(t *testing.T) {