diff --git a/docs/RFC.md b/docs/RFC.md index 7536066..bd0aff9 100644 --- a/docs/RFC.md +++ b/docs/RFC.md @@ -423,6 +423,7 @@ Every exported function MUST have a usage-example comment: ## Changelog +- 2026-03-30: dispatch completion monitoring now uses a named helper instead of an inline Action closure, keeping the spawned-process finaliser AX-native. - 2026-03-30: lib task bundle and recursive embed traversal now use `JoinPath` for filesystem paths, removing the last string-concatenated path joins in `pkg/lib`. - 2026-03-30: runner workspace status projections now use explicit typed copies, and `ReadStatusResult` gained direct AX-7 coverage in both runner and agentic packages. - 2026-03-30: transport helpers preserve request and read causes, brain direct API calls surface upstream bodies, and review queue retry parsing no longer uses `MustCompile`. diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index 75f9397..b27a8b5 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -428,17 +428,51 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, st // Register a one-shot Action that monitors this agent, then run it via PerformAsync. // PerformAsync tracks it in Core's WaitGroup — ServiceShutdown waits for it. monitorAction := core.Concat("agentic.monitor.", core.Replace(WorkspaceName(wsDir), "/", ".")) - s.Core().Action(monitorAction, func(_ context.Context, _ core.Options) core.Result { - <-proc.Done() - s.onAgentComplete(agent, wsDir, outputFile, - proc.Info().ExitCode, string(proc.Info().Status), proc.Output()) - return core.Result{OK: true} - }) + monitor := &agentCompletionMonitor{ + service: s, + agent: agent, + workspaceDir: wsDir, + outputFile: outputFile, + process: proc, + } + s.Core().Action(monitorAction, monitor.run) s.Core().PerformAsync(monitorAction, core.NewOptions()) return pid, processID, outputFile, nil } +type completionProcess interface { + Done() <-chan struct{} + Info() process.Info + Output() string +} + +// agentCompletionMonitor waits for a spawned process to finish, then finalises the workspace. +// +// monitor := &agentCompletionMonitor{service: s, agent: "codex", workspaceDir: wsDir, outputFile: outputFile, process: proc} +// s.Core().Action("agentic.monitor.core.go-io.task-5", monitor.run) +type agentCompletionMonitor struct { + service *PrepSubsystem + agent string + workspaceDir string + outputFile string + process completionProcess +} + +func (m *agentCompletionMonitor) run(_ context.Context, _ core.Options) core.Result { + if m == nil || m.service == nil { + return core.Result{Value: core.E("agentic.monitor", "service is required", nil), OK: false} + } + if m.process == nil { + return core.Result{Value: core.E("agentic.monitor", "process is required", nil), OK: false} + } + + <-m.process.Done() + info := m.process.Info() + m.service.onAgentComplete(m.agent, m.workspaceDir, m.outputFile, info.ExitCode, string(info.Status), m.process.Output()) + return core.Result{OK: true} +} + // runQA runs build + test checks on the repo after agent completion. // Returns true if QA passes, false if build or tests fail. func (s *PrepSubsystem) runQA(wsDir string) bool { diff --git a/pkg/agentic/dispatch_test.go b/pkg/agentic/dispatch_test.go index 74ea13a..e5affd9 100644 --- a/pkg/agentic/dispatch_test.go +++ b/pkg/agentic/dispatch_test.go @@ -11,10 +11,21 @@ import ( core "dappco.re/go/core" "dappco.re/go/core/forge" + "dappco.re/go/core/process" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +type fakeCompletionProcess struct { + done chan struct{} + info process.Info + output string +} + +func (p *fakeCompletionProcess) Done() <-chan struct{} { return p.done } +func (p *fakeCompletionProcess) Info() process.Info { return p.info } +func (p *fakeCompletionProcess) Output() string { return p.output } + // --- agentCommand --- // Good: tested in logic_test.go (TestAgentCommand_Good_*) @@ -243,6 +254,104 @@ func TestDispatch_BroadcastComplete_Ugly(t *testing.T) { s.broadcastComplete("codex", t.TempDir(), "completed") } +// --- agentCompletionMonitor --- + +func TestDispatch_AgentCompletionMonitor_Good(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + + wsDir := core.JoinPath(root, "ws-monitor") + repoDir := core.JoinPath(wsDir, "repo") + metaDir := core.JoinPath(wsDir, ".meta") + fs.EnsureDir(repoDir) + fs.EnsureDir(metaDir) + + st := &WorkspaceStatus{Status: "running", Repo: "go-io", Agent: "codex", StartedAt: time.Now()} + fs.Write(core.JoinPath(wsDir, "status.json"), core.JSONMarshalString(st)) + + proc := &fakeCompletionProcess{ + done: make(chan struct{}), + info: process.Info{ExitCode: 0, Status: process.Status("completed")}, + output: "monitor output", + } + close(proc.done) + + s := newPrepWithProcess() + monitor := &agentCompletionMonitor{ + service: s, + agent: "codex", + workspaceDir: wsDir, + outputFile: core.JoinPath(metaDir, "agent-codex.log"), + process: proc, + } + + r := monitor.run(context.Background(), core.NewOptions()) + assert.True(t, r.OK) + + updated, err := ReadStatus(wsDir) + require.NoError(t, err) + assert.Equal(t, "completed", updated.Status) + assert.Equal(t, 0, updated.PID) + + output := fs.Read(core.JoinPath(metaDir, "agent-codex.log")) + require.True(t, output.OK) + assert.Equal(t, "monitor output", output.Value.(string)) +} + +func TestDispatch_AgentCompletionMonitor_Bad(t *testing.T) { + s := newPrepWithProcess() + monitor := &agentCompletionMonitor{ + service: s, + agent: "codex", + workspaceDir: t.TempDir(), + } + + r := monitor.run(context.Background(), core.NewOptions()) + assert.False(t, r.OK) + require.Error(t, r.Value.(error)) + assert.Contains(t, r.Value.(error).Error(), "process is required") +} + +func TestDispatch_AgentCompletionMonitor_Ugly(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + + wsDir := core.JoinPath(root, "ws-blocked") + repoDir := core.JoinPath(wsDir, "repo") + metaDir := core.JoinPath(wsDir, ".meta") + fs.EnsureDir(repoDir) + fs.EnsureDir(metaDir) + + fs.Write(core.JoinPath(repoDir, "BLOCKED.md"), "Need credentials") + st := &WorkspaceStatus{Status: "running", Repo: "go-io", Agent: "codex", StartedAt: time.Now()} + fs.Write(core.JoinPath(wsDir, "status.json"), core.JSONMarshalString(st)) + + proc := &fakeCompletionProcess{ + done: make(chan struct{}), + info: process.Info{ExitCode: 1, Status: process.Status("failed")}, + output: "", + } + close(proc.done) + + s := newPrepWithProcess() + monitor := &agentCompletionMonitor{ + service: s, + agent: "codex", + workspaceDir: wsDir, + outputFile: core.JoinPath(metaDir, "agent-codex.log"), + process: proc, + } + + r := monitor.run(context.Background(), core.NewOptions()) + assert.True(t, r.OK) + + updated, err := ReadStatus(wsDir) + require.NoError(t, err) + assert.Equal(t, "blocked", updated.Status) + assert.Equal(t, "Need credentials", updated.Question) + assert.False(t, fs.Exists(core.JoinPath(metaDir, "agent-codex.log"))) +} + // --- onAgentComplete --- func TestDispatch_OnAgentComplete_Good(t *testing.T) {