fix(ax): extract dispatch completion monitor
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
95c7df04da
commit
d4c82ccd56
3 changed files with 150 additions and 6 deletions
|
|
@ -423,6 +423,7 @@ Every exported function MUST have a usage-example comment:
|
|||
|
||||
## Changelog
|
||||
|
||||
- 2026-03-30: dispatch completion monitoring now uses a named helper instead of an inline Action closure, keeping the spawned-process finaliser AX-native.
|
||||
- 2026-03-30: lib task bundle and recursive embed traversal now use `JoinPath` for filesystem paths, removing the last string-concatenated path joins in `pkg/lib`.
|
||||
- 2026-03-30: runner workspace status projections now use explicit typed copies, and `ReadStatusResult` gained direct AX-7 coverage in both runner and agentic packages.
|
||||
- 2026-03-30: transport helpers preserve request and read causes, brain direct API calls surface upstream bodies, and review queue retry parsing no longer uses `MustCompile`.
|
||||
|
|
|
|||
|
|
@ -428,17 +428,51 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, st
|
|||
// Register a one-shot Action that monitors this agent, then run it via PerformAsync.
|
||||
// PerformAsync tracks it in Core's WaitGroup — ServiceShutdown waits for it.
|
||||
monitorAction := core.Concat("agentic.monitor.", core.Replace(WorkspaceName(wsDir), "/", "."))
|
||||
s.Core().Action(monitorAction, func(_ context.Context, _ core.Options) core.Result {
|
||||
<-proc.Done()
|
||||
s.onAgentComplete(agent, wsDir, outputFile,
|
||||
proc.Info().ExitCode, string(proc.Info().Status), proc.Output())
|
||||
return core.Result{OK: true}
|
||||
})
|
||||
monitor := &agentCompletionMonitor{
|
||||
service: s,
|
||||
agent: agent,
|
||||
workspaceDir: wsDir,
|
||||
outputFile: outputFile,
|
||||
process: proc,
|
||||
}
|
||||
s.Core().Action(monitorAction, monitor.run)
|
||||
s.Core().PerformAsync(monitorAction, core.NewOptions())
|
||||
|
||||
return pid, processID, outputFile, nil
|
||||
}
|
||||
|
||||
type completionProcess interface {
|
||||
Done() <-chan struct{}
|
||||
Info() process.Info
|
||||
Output() string
|
||||
}
|
||||
|
||||
// agentCompletionMonitor waits for a spawned process to finish, then finalises the workspace.
|
||||
//
|
||||
// monitor := &agentCompletionMonitor{service: s, agent: "codex", workspaceDir: wsDir, outputFile: outputFile, process: proc}
|
||||
// s.Core().Action("agentic.monitor.core.go-io.task-5", monitor.run)
|
||||
type agentCompletionMonitor struct {
|
||||
service *PrepSubsystem
|
||||
agent string
|
||||
workspaceDir string
|
||||
outputFile string
|
||||
process completionProcess
|
||||
}
|
||||
|
||||
func (m *agentCompletionMonitor) run(_ context.Context, _ core.Options) core.Result {
|
||||
if m == nil || m.service == nil {
|
||||
return core.Result{Value: core.E("agentic.monitor", "service is required", nil), OK: false}
|
||||
}
|
||||
if m.process == nil {
|
||||
return core.Result{Value: core.E("agentic.monitor", "process is required", nil), OK: false}
|
||||
}
|
||||
|
||||
<-m.process.Done()
|
||||
info := m.process.Info()
|
||||
m.service.onAgentComplete(m.agent, m.workspaceDir, m.outputFile, info.ExitCode, string(info.Status), m.process.Output())
|
||||
return core.Result{OK: true}
|
||||
}
|
||||
|
||||
// runQA runs build + test checks on the repo after agent completion.
|
||||
// Returns true if QA passes, false if build or tests fail.
|
||||
func (s *PrepSubsystem) runQA(wsDir string) bool {
|
||||
|
|
|
|||
|
|
@ -11,10 +11,21 @@ import (
|
|||
|
||||
core "dappco.re/go/core"
|
||||
"dappco.re/go/core/forge"
|
||||
"dappco.re/go/core/process"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type fakeCompletionProcess struct {
|
||||
done chan struct{}
|
||||
info process.Info
|
||||
output string
|
||||
}
|
||||
|
||||
func (p *fakeCompletionProcess) Done() <-chan struct{} { return p.done }
|
||||
func (p *fakeCompletionProcess) Info() process.Info { return p.info }
|
||||
func (p *fakeCompletionProcess) Output() string { return p.output }
|
||||
|
||||
// --- agentCommand ---
|
||||
|
||||
// Good: tested in logic_test.go (TestAgentCommand_Good_*)
|
||||
|
|
@ -243,6 +254,104 @@ func TestDispatch_BroadcastComplete_Ugly(t *testing.T) {
|
|||
s.broadcastComplete("codex", t.TempDir(), "completed")
|
||||
}
|
||||
|
||||
// --- agentCompletionMonitor ---
|
||||
|
||||
func TestDispatch_AgentCompletionMonitor_Good(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
|
||||
wsDir := core.JoinPath(root, "ws-monitor")
|
||||
repoDir := core.JoinPath(wsDir, "repo")
|
||||
metaDir := core.JoinPath(wsDir, ".meta")
|
||||
fs.EnsureDir(repoDir)
|
||||
fs.EnsureDir(metaDir)
|
||||
|
||||
st := &WorkspaceStatus{Status: "running", Repo: "go-io", Agent: "codex", StartedAt: time.Now()}
|
||||
fs.Write(core.JoinPath(wsDir, "status.json"), core.JSONMarshalString(st))
|
||||
|
||||
proc := &fakeCompletionProcess{
|
||||
done: make(chan struct{}),
|
||||
info: process.Info{ExitCode: 0, Status: process.Status("completed")},
|
||||
output: "monitor output",
|
||||
}
|
||||
close(proc.done)
|
||||
|
||||
s := newPrepWithProcess()
|
||||
monitor := &agentCompletionMonitor{
|
||||
service: s,
|
||||
agent: "codex",
|
||||
workspaceDir: wsDir,
|
||||
outputFile: core.JoinPath(metaDir, "agent-codex.log"),
|
||||
process: proc,
|
||||
}
|
||||
|
||||
r := monitor.run(context.Background(), core.NewOptions())
|
||||
assert.True(t, r.OK)
|
||||
|
||||
updated, err := ReadStatus(wsDir)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "completed", updated.Status)
|
||||
assert.Equal(t, 0, updated.PID)
|
||||
|
||||
output := fs.Read(core.JoinPath(metaDir, "agent-codex.log"))
|
||||
require.True(t, output.OK)
|
||||
assert.Equal(t, "monitor output", output.Value.(string))
|
||||
}
|
||||
|
||||
func TestDispatch_AgentCompletionMonitor_Bad(t *testing.T) {
|
||||
s := newPrepWithProcess()
|
||||
monitor := &agentCompletionMonitor{
|
||||
service: s,
|
||||
agent: "codex",
|
||||
workspaceDir: t.TempDir(),
|
||||
}
|
||||
|
||||
r := monitor.run(context.Background(), core.NewOptions())
|
||||
assert.False(t, r.OK)
|
||||
require.Error(t, r.Value.(error))
|
||||
assert.Contains(t, r.Value.(error).Error(), "process is required")
|
||||
}
|
||||
|
||||
func TestDispatch_AgentCompletionMonitor_Ugly(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
|
||||
wsDir := core.JoinPath(root, "ws-blocked")
|
||||
repoDir := core.JoinPath(wsDir, "repo")
|
||||
metaDir := core.JoinPath(wsDir, ".meta")
|
||||
fs.EnsureDir(repoDir)
|
||||
fs.EnsureDir(metaDir)
|
||||
|
||||
fs.Write(core.JoinPath(repoDir, "BLOCKED.md"), "Need credentials")
|
||||
st := &WorkspaceStatus{Status: "running", Repo: "go-io", Agent: "codex", StartedAt: time.Now()}
|
||||
fs.Write(core.JoinPath(wsDir, "status.json"), core.JSONMarshalString(st))
|
||||
|
||||
proc := &fakeCompletionProcess{
|
||||
done: make(chan struct{}),
|
||||
info: process.Info{ExitCode: 1, Status: process.Status("failed")},
|
||||
output: "",
|
||||
}
|
||||
close(proc.done)
|
||||
|
||||
s := newPrepWithProcess()
|
||||
monitor := &agentCompletionMonitor{
|
||||
service: s,
|
||||
agent: "codex",
|
||||
workspaceDir: wsDir,
|
||||
outputFile: core.JoinPath(metaDir, "agent-codex.log"),
|
||||
process: proc,
|
||||
}
|
||||
|
||||
r := monitor.run(context.Background(), core.NewOptions())
|
||||
assert.True(t, r.OK)
|
||||
|
||||
updated, err := ReadStatus(wsDir)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "blocked", updated.Status)
|
||||
assert.Equal(t, "Need credentials", updated.Question)
|
||||
assert.False(t, fs.Exists(core.JoinPath(metaDir, "agent-codex.log")))
|
||||
}
|
||||
|
||||
// --- onAgentComplete ---
|
||||
|
||||
func TestDispatch_OnAgentComplete_Good(t *testing.T) {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue