diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index 3321752..b744d3e 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -193,6 +193,147 @@ func containerCommand(agentType, command string, args []string, repoDir, metaDir return "docker", dockerArgs } +// --- spawnAgent: decomposed into testable steps --- + +// agentOutputFile returns the log file path for an agent's output. +func agentOutputFile(wsDir, agent string) string { + agentBase := core.SplitN(agent, ":", 2)[0] + return core.JoinPath(wsDir, ".meta", core.Sprintf("agent-%s.log", agentBase)) +} + +// detectFinalStatus reads workspace state after agent exit to determine outcome. +// Returns (status, question) — "completed", "blocked", or "failed". +func detectFinalStatus(repoDir string, exitCode int, procStatus string) (string, string) { + blockedPath := core.JoinPath(repoDir, "BLOCKED.md") + if r := fs.Read(blockedPath); r.OK && core.Trim(r.Value.(string)) != "" { + return "blocked", core.Trim(r.Value.(string)) + } + if exitCode != 0 || procStatus == "failed" || procStatus == "killed" { + question := "" + if exitCode != 0 { + question = core.Sprintf("Agent exited with code %d", exitCode) + } + return "failed", question + } + return "completed", "" +} + +// trackFailureRate detects fast consecutive failures and applies backoff. +// Returns true if backoff was triggered. +func (s *PrepSubsystem) trackFailureRate(agent, status string, startedAt time.Time) bool { + pool := baseAgent(agent) + if status == "failed" { + elapsed := time.Since(startedAt) + if elapsed < 60*time.Second { + s.failCount[pool]++ + if s.failCount[pool] >= 3 { + s.backoff[pool] = time.Now().Add(30 * time.Minute) + core.Print(nil, "rate-limit detected for %s — pausing pool for 30 minutes", pool) + return true + } + } else { + s.failCount[pool] = 0 // slow failure = real failure, reset count + } + } else { + s.failCount[pool] = 0 // success resets count + } + return false +} + +// startIssueTracking starts a Forge stopwatch on the workspace's issue. +func (s *PrepSubsystem) startIssueTracking(wsDir string) { + if s.forge == nil { + return + } + st, _ := ReadStatus(wsDir) + if st == nil || st.Issue == 0 { + return + } + org := st.Org + if org == "" { + org = "core" + } + s.forge.Issues.StartStopwatch(context.Background(), org, st.Repo, int64(st.Issue)) +} + +// stopIssueTracking stops a Forge stopwatch on the workspace's issue. +func (s *PrepSubsystem) stopIssueTracking(wsDir string) { + if s.forge == nil { + return + } + st, _ := ReadStatus(wsDir) + if st == nil || st.Issue == 0 { + return + } + org := st.Org + if org == "" { + org = "core" + } + s.forge.Issues.StopStopwatch(context.Background(), org, st.Repo, int64(st.Issue)) +} + +// broadcastStart emits IPC + audit events for agent start. +func (s *PrepSubsystem) broadcastStart(agent, wsDir string) { + if s.core != nil { + st, _ := ReadStatus(wsDir) + repo := "" + if st != nil { + repo = st.Repo + } + s.core.ACTION(messages.AgentStarted{ + Agent: agent, Repo: repo, Workspace: core.PathBase(wsDir), + }) + } + emitStartEvent(agent, core.PathBase(wsDir)) +} + +// broadcastComplete emits IPC + audit events for agent completion. +func (s *PrepSubsystem) broadcastComplete(agent, wsDir, finalStatus string) { + emitCompletionEvent(agent, core.PathBase(wsDir), finalStatus) + if s.core != nil { + st, _ := ReadStatus(wsDir) + repo := "" + if st != nil { + repo = st.Repo + } + s.core.ACTION(messages.AgentCompleted{ + Agent: agent, Repo: repo, + Workspace: core.PathBase(wsDir), Status: finalStatus, + }) + } +} + +// onAgentComplete handles all post-completion logic for a spawned agent. +// Called from the monitoring goroutine after the process exits. +func (s *PrepSubsystem) onAgentComplete(agent, wsDir, outputFile string, exitCode int, procStatus, output string) { + // Save output + if output != "" { + fs.Write(outputFile, output) + } + + repoDir := core.JoinPath(wsDir, "repo") + finalStatus, question := detectFinalStatus(repoDir, exitCode, procStatus) + + // Update workspace status + if st, err := ReadStatus(wsDir); err == nil { + st.Status = finalStatus + st.PID = 0 + st.Question = question + writeStatus(wsDir, st) + } + + // Rate-limit tracking + if st, _ := ReadStatus(wsDir); st != nil { + s.trackFailureRate(agent, finalStatus, st.StartedAt) + } + + // Forge time tracking + s.stopIssueTracking(wsDir) + + // Broadcast completion + s.broadcastComplete(agent, wsDir, finalStatus) +} + // spawnAgent launches an agent inside a Docker container. // The repo/ directory is mounted at /workspace, agent runs sandboxed. // Output is captured and written to .meta/agent-{agent}.log on completion. @@ -204,14 +345,13 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er repoDir := core.JoinPath(wsDir, "repo") metaDir := core.JoinPath(wsDir, ".meta") - // Use base agent name for log file — colon in variants breaks paths - agentBase := core.SplitN(agent, ":", 2)[0] - outputFile := core.JoinPath(metaDir, core.Sprintf("agent-%s.log", agentBase)) + outputFile := agentOutputFile(wsDir, agent) // Clean up stale BLOCKED.md from previous runs fs.Delete(core.JoinPath(repoDir, "BLOCKED.md")) // All agents run containerised + agentBase := core.SplitN(agent, ":", 2)[0] command, args = containerCommand(agentBase, command, args, repoDir, metaDir) proc, err := process.StartWithOptions(context.Background(), process.RunOptions{ @@ -227,27 +367,8 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er proc.CloseStdin() pid := proc.Info().PID - // Broadcast agent started via Core IPC - if s.core != nil { - st, _ := ReadStatus(wsDir) - repo := "" - if st != nil { - repo = st.Repo - } - s.core.ACTION(messages.AgentStarted{ - Agent: agent, Repo: repo, Workspace: core.PathBase(wsDir), - }) - } - emitStartEvent(agent, core.PathBase(wsDir)) // audit log - - // Start Forge stopwatch on the issue (time tracking) - if st, _ := ReadStatus(wsDir); st != nil && st.Issue > 0 { - org := st.Org - if org == "" { - org = "core" - } - s.forge.Issues.StartStopwatch(context.Background(), org, st.Repo, int64(st.Issue)) - } + s.broadcastStart(agent, wsDir) + s.startIssueTracking(wsDir) go func() { ticker := time.NewTicker(5 * time.Second) @@ -263,81 +384,8 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er } } done: - - if output := proc.Output(); output != "" { - fs.Write(outputFile, output) - } - - finalStatus := "completed" - exitCode := proc.Info().ExitCode - procStatus := proc.Info().Status - question := "" - - blockedPath := core.JoinPath(repoDir, "BLOCKED.md") - if r := fs.Read(blockedPath); r.OK && core.Trim(r.Value.(string)) != "" { - finalStatus = "blocked" - question = core.Trim(r.Value.(string)) - } else if exitCode != 0 || procStatus == "failed" || procStatus == "killed" { - finalStatus = "failed" - if exitCode != 0 { - question = core.Sprintf("Agent exited with code %d", exitCode) - } - } - - if st, stErr := ReadStatus(wsDir); stErr == nil { - st.Status = finalStatus - st.PID = 0 - st.Question = question - writeStatus(wsDir, st) - } - - emitCompletionEvent(agent, core.PathBase(wsDir), finalStatus) // audit log - - // Rate-limit detection: if agent failed fast (<60s), track consecutive failures - pool := baseAgent(agent) - if finalStatus == "failed" { - if st, _ := ReadStatus(wsDir); st != nil { - elapsed := time.Since(st.StartedAt) - if elapsed < 60*time.Second { - s.failCount[pool]++ - if s.failCount[pool] >= 3 { - s.backoff[pool] = time.Now().Add(30 * time.Minute) - core.Print(nil, "rate-limit detected for %s — pausing pool for 30 minutes", pool) - } - } else { - s.failCount[pool] = 0 // slow failure = real failure, reset count - } - } - } else { - s.failCount[pool] = 0 // success resets count - } - - // Stop Forge stopwatch on the issue (time tracking) - if st, _ := ReadStatus(wsDir); st != nil && st.Issue > 0 { - org := st.Org - if org == "" { - org = "core" - } - s.forge.Issues.StopStopwatch(context.Background(), org, st.Repo, int64(st.Issue)) - } - - // Broadcast agent completed via Core IPC - if s.core != nil { - stNow, _ := ReadStatus(wsDir) - repoName := "" - if stNow != nil { - repoName = stNow.Repo - } - s.core.ACTION(messages.AgentCompleted{ - Agent: agent, Repo: repoName, - Workspace: core.PathBase(wsDir), Status: finalStatus, - }) - } - - // Post-completion pipeline handled by IPC handlers: - // AgentCompleted → QA → PRCreated → Verify → PRMerged|PRNeedsReview - // AgentCompleted → Ingest - // AgentCompleted → Poke + s.onAgentComplete(agent, wsDir, outputFile, + proc.Info().ExitCode, string(proc.Info().Status), proc.Output()) }() return pid, outputFile, nil @@ -367,7 +415,6 @@ func (s *PrepSubsystem) runQA(wsDir string) bool { } if fs.IsFile(core.JoinPath(repoDir, "composer.json")) { - // PHP: composer install + test install := exec.Command("composer", "install", "--no-interaction") install.Dir = repoDir if err := install.Run(); err != nil { @@ -379,7 +426,6 @@ func (s *PrepSubsystem) runQA(wsDir string) bool { } if fs.IsFile(core.JoinPath(repoDir, "package.json")) { - // Node: npm install + test install := exec.Command("npm", "install") install.Dir = repoDir if err := install.Run(); err != nil { diff --git a/pkg/agentic/dispatch_extra_test.go b/pkg/agentic/dispatch_extra_test.go new file mode 100644 index 0000000..cf2b214 --- /dev/null +++ b/pkg/agentic/dispatch_extra_test.go @@ -0,0 +1,313 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// --- agentOutputFile --- + +func TestAgentOutputFile_Good(t *testing.T) { + assert.Contains(t, agentOutputFile("/ws", "codex"), ".meta/agent-codex.log") + assert.Contains(t, agentOutputFile("/ws", "claude:opus"), ".meta/agent-claude.log") + assert.Contains(t, agentOutputFile("/ws", "gemini:flash"), ".meta/agent-gemini.log") +} + +// --- detectFinalStatus --- + +func TestDetectFinalStatus_Good_Completed(t *testing.T) { + dir := t.TempDir() + status, question := detectFinalStatus(dir, 0, "completed") + assert.Equal(t, "completed", status) + assert.Empty(t, question) +} + +func TestDetectFinalStatus_Good_Blocked(t *testing.T) { + dir := t.TempDir() + os.WriteFile(filepath.Join(dir, "BLOCKED.md"), []byte("Need API key for external service"), 0o644) + + status, question := detectFinalStatus(dir, 0, "completed") + assert.Equal(t, "blocked", status) + assert.Equal(t, "Need API key for external service", question) +} + +func TestDetectFinalStatus_Good_BlockedEmpty(t *testing.T) { + dir := t.TempDir() + // BLOCKED.md exists but is empty — should NOT be treated as blocked + os.WriteFile(filepath.Join(dir, "BLOCKED.md"), []byte(" \n "), 0o644) + + status, _ := detectFinalStatus(dir, 0, "completed") + assert.Equal(t, "completed", status) +} + +func TestDetectFinalStatus_Good_FailedExitCode(t *testing.T) { + dir := t.TempDir() + status, question := detectFinalStatus(dir, 1, "completed") + assert.Equal(t, "failed", status) + assert.Contains(t, question, "code 1") +} + +func TestDetectFinalStatus_Good_FailedKilled(t *testing.T) { + dir := t.TempDir() + status, _ := detectFinalStatus(dir, 0, "killed") + assert.Equal(t, "failed", status) +} + +func TestDetectFinalStatus_Good_FailedStatus(t *testing.T) { + dir := t.TempDir() + status, _ := detectFinalStatus(dir, 0, "failed") + assert.Equal(t, "failed", status) +} + +func TestDetectFinalStatus_Good_BlockedTakesPrecedence(t *testing.T) { + dir := t.TempDir() + // Agent wrote BLOCKED.md AND exited non-zero — blocked takes precedence + os.WriteFile(filepath.Join(dir, "BLOCKED.md"), []byte("Need help"), 0o644) + + status, question := detectFinalStatus(dir, 1, "failed") + assert.Equal(t, "blocked", status) + assert.Equal(t, "Need help", question) +} + +// --- trackFailureRate --- + +func TestTrackFailureRate_Good_SuccessResetsCount(t *testing.T) { + s := &PrepSubsystem{ + backoff: make(map[string]time.Time), + failCount: map[string]int{"codex": 2}, + } + triggered := s.trackFailureRate("codex", "completed", time.Now().Add(-10*time.Second)) + assert.False(t, triggered) + assert.Equal(t, 0, s.failCount["codex"]) +} + +func TestTrackFailureRate_Good_SlowFailureResetsCount(t *testing.T) { + s := &PrepSubsystem{ + backoff: make(map[string]time.Time), + failCount: map[string]int{"codex": 2}, + } + // Started 5 minutes ago = slow failure + triggered := s.trackFailureRate("codex", "failed", time.Now().Add(-5*time.Minute)) + assert.False(t, triggered) + assert.Equal(t, 0, s.failCount["codex"]) +} + +func TestTrackFailureRate_Good_FastFailureIncrementsCount(t *testing.T) { + s := &PrepSubsystem{ + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + // Started 10 seconds ago = fast failure + triggered := s.trackFailureRate("codex", "failed", time.Now().Add(-10*time.Second)) + assert.False(t, triggered) + assert.Equal(t, 1, s.failCount["codex"]) +} + +func TestTrackFailureRate_Good_ThreeFailsTriggersBackoff(t *testing.T) { + s := &PrepSubsystem{ + backoff: make(map[string]time.Time), + failCount: map[string]int{"codex": 2}, // already 2 fast failures + } + triggered := s.trackFailureRate("codex", "failed", time.Now().Add(-10*time.Second)) + assert.True(t, triggered) + assert.True(t, time.Now().Before(s.backoff["codex"])) +} + +func TestTrackFailureRate_Good_ModelVariantUsesPool(t *testing.T) { + s := &PrepSubsystem{ + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + s.trackFailureRate("codex:gpt-5.4", "failed", time.Now().Add(-10*time.Second)) + assert.Equal(t, 1, s.failCount["codex"], "should track by base agent pool") +} + +// --- startIssueTracking / stopIssueTracking --- + +func TestStartIssueTracking_Good_NoForge(t *testing.T) { + s := &PrepSubsystem{ + forge: nil, + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + // Should not panic + s.startIssueTracking(t.TempDir()) +} + +func TestStopIssueTracking_Good_NoForge(t *testing.T) { + s := &PrepSubsystem{ + forge: nil, + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + s.stopIssueTracking(t.TempDir()) +} + +func TestStartIssueTracking_Good_NoIssue(t *testing.T) { + dir := t.TempDir() + st := &WorkspaceStatus{Status: "running", Repo: "test"} + data, _ := json.Marshal(st) + os.WriteFile(filepath.Join(dir, "status.json"), data, 0o644) + + s := &PrepSubsystem{ + forge: nil, + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + s.startIssueTracking(dir) // no panic, no issue to track +} + +// --- broadcastStart / broadcastComplete --- + +func TestBroadcastStart_Good_NoCore(t *testing.T) { + dir := t.TempDir() + t.Setenv("CORE_WORKSPACE", dir) + + s := &PrepSubsystem{ + core: nil, + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + // Should not panic even without Core + s.broadcastStart("codex", dir) +} + +func TestBroadcastComplete_Good_NoCore(t *testing.T) { + dir := t.TempDir() + t.Setenv("CORE_WORKSPACE", dir) + + s := &PrepSubsystem{ + core: nil, + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + s.broadcastComplete("codex", dir, "completed") +} + +// --- onAgentComplete --- + +func TestOnAgentComplete_Good_Completed(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + + wsDir := filepath.Join(root, "ws-test") + repoDir := filepath.Join(wsDir, "repo") + metaDir := filepath.Join(wsDir, ".meta") + os.MkdirAll(repoDir, 0o755) + os.MkdirAll(metaDir, 0o755) + + // Write initial status + st := &WorkspaceStatus{Status: "running", Repo: "go-io", Agent: "codex", StartedAt: time.Now()} + data, _ := json.Marshal(st) + os.WriteFile(filepath.Join(wsDir, "status.json"), data, 0o644) + + s := &PrepSubsystem{ + core: nil, + forge: nil, + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + + outputFile := filepath.Join(metaDir, "agent-codex.log") + s.onAgentComplete("codex", wsDir, outputFile, 0, "completed", "test output") + + // Verify status was updated + updated, err := ReadStatus(wsDir) + assert.NoError(t, err) + assert.Equal(t, "completed", updated.Status) + assert.Equal(t, 0, updated.PID) + assert.Empty(t, updated.Question) + + // Verify output was written + content, _ := os.ReadFile(outputFile) + assert.Equal(t, "test output", string(content)) +} + +func TestOnAgentComplete_Good_Failed(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + + wsDir := filepath.Join(root, "ws-fail") + repoDir := filepath.Join(wsDir, "repo") + metaDir := filepath.Join(wsDir, ".meta") + os.MkdirAll(repoDir, 0o755) + os.MkdirAll(metaDir, 0o755) + + st := &WorkspaceStatus{Status: "running", Repo: "go-io", Agent: "codex", StartedAt: time.Now()} + data, _ := json.Marshal(st) + os.WriteFile(filepath.Join(wsDir, "status.json"), data, 0o644) + + s := &PrepSubsystem{ + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + + s.onAgentComplete("codex", wsDir, filepath.Join(metaDir, "agent-codex.log"), 1, "failed", "error output") + + updated, _ := ReadStatus(wsDir) + assert.Equal(t, "failed", updated.Status) + assert.Contains(t, updated.Question, "code 1") +} + +func TestOnAgentComplete_Good_Blocked(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + + wsDir := filepath.Join(root, "ws-blocked") + repoDir := filepath.Join(wsDir, "repo") + metaDir := filepath.Join(wsDir, ".meta") + os.MkdirAll(repoDir, 0o755) + os.MkdirAll(metaDir, 0o755) + + // Create BLOCKED.md + os.WriteFile(filepath.Join(repoDir, "BLOCKED.md"), []byte("Need credentials"), 0o644) + + st := &WorkspaceStatus{Status: "running", Repo: "go-io", Agent: "codex", StartedAt: time.Now()} + data, _ := json.Marshal(st) + os.WriteFile(filepath.Join(wsDir, "status.json"), data, 0o644) + + s := &PrepSubsystem{ + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + + s.onAgentComplete("codex", wsDir, filepath.Join(metaDir, "agent-codex.log"), 0, "completed", "") + + updated, _ := ReadStatus(wsDir) + assert.Equal(t, "blocked", updated.Status) + assert.Equal(t, "Need credentials", updated.Question) +} + +func TestOnAgentComplete_Good_EmptyOutput(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + + wsDir := filepath.Join(root, "ws-empty") + repoDir := filepath.Join(wsDir, "repo") + metaDir := filepath.Join(wsDir, ".meta") + os.MkdirAll(repoDir, 0o755) + os.MkdirAll(metaDir, 0o755) + + st := &WorkspaceStatus{Status: "running", Repo: "test", Agent: "codex", StartedAt: time.Now()} + data, _ := json.Marshal(st) + os.WriteFile(filepath.Join(wsDir, "status.json"), data, 0o644) + + s := &PrepSubsystem{ + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + + outputFile := filepath.Join(metaDir, "agent-codex.log") + s.onAgentComplete("codex", wsDir, outputFile, 0, "completed", "") + + // Output file should NOT be created for empty output + _, err := os.Stat(outputFile) + assert.True(t, os.IsNotExist(err)) +}