From 4cc763176f66bf43b0bf3a6fc0b049d8f615cfcf Mon Sep 17 00:00:00 2001 From: Virgil Date: Sun, 29 Mar 2026 21:19:37 +0000 Subject: [PATCH] fix(ax): share workspace path helpers across services Co-Authored-By: Virgil --- pkg/agentic/commands.go | 4 +- pkg/agentic/commands_test.go | 80 +++++++++++++++++++++++++++++------- pkg/agentic/handlers.go | 9 ++-- pkg/agentic/prep.go | 22 +++------- pkg/agentic/queue.go | 29 ++++--------- pkg/agentic/status.go | 4 +- pkg/monitor/monitor.go | 5 +-- pkg/runner/queue.go | 13 ++---- pkg/runner/runner.go | 29 +++++-------- pkg/runner/runner_test.go | 23 +++++++++++ 10 files changed, 122 insertions(+), 96 deletions(-) diff --git a/pkg/agentic/commands.go b/pkg/agentic/commands.go index a3fbf8d..683878d 100644 --- a/pkg/agentic/commands.go +++ b/pkg/agentic/commands.go @@ -149,14 +149,14 @@ func (s *PrepSubsystem) cmdStatus(opts core.Options) core.Result { return core.Result{OK: true} } - statusFiles := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) + statusFiles := WorkspaceStatusPaths() if len(statusFiles) == 0 { core.Print(nil, "no workspaces") return core.Result{OK: true} } for _, sf := range statusFiles { - core.Print(nil, " %s", core.PathBase(core.PathDir(sf))) + core.Print(nil, " %s", WorkspaceName(core.PathDir(sf))) } return core.Result{OK: true} } diff --git a/pkg/agentic/commands_test.go b/pkg/agentic/commands_test.go index 5f08374..a6ac4f8 100644 --- a/pkg/agentic/commands_test.go +++ b/pkg/agentic/commands_test.go @@ -4,8 +4,10 @@ package agentic import ( "context" + "io" "net/http" "net/http/httptest" + "os" "testing" "time" @@ -29,13 +31,13 @@ func testPrepWithCore(t *testing.T, srv *httptest.Server) (*PrepSubsystem, *core s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}), - forge: f, - forgeURL: "", - forgeToken: "test-token", - codePath: t.TempDir(), - pokeCh: make(chan struct{}, 1), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + forge: f, + forgeURL: "", + forgeToken: "test-token", + codePath: t.TempDir(), + pokeCh: make(chan struct{}, 1), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } if srv != nil { s.forgeURL = srv.URL @@ -44,6 +46,35 @@ func testPrepWithCore(t *testing.T, srv *httptest.Server) (*PrepSubsystem, *core return s, c } +func captureStdout(t *testing.T, run func()) string { + t.Helper() + + old := os.Stdout + reader, writer, err := os.Pipe() + if err != nil { + t.Fatalf("pipe stdout: %v", err) + } + os.Stdout = writer + defer func() { + os.Stdout = old + }() + + run() + + if err := writer.Close(); err != nil { + t.Fatalf("close writer: %v", err) + } + data, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("read stdout: %v", err) + } + if err := reader.Close(); err != nil { + t.Fatalf("close reader: %v", err) + } + + return string(data) +} + // --- Forge command methods (extracted from closures) --- func TestCommandsforge_CmdIssueGet_Bad_MissingArgs(t *testing.T) { @@ -564,6 +595,25 @@ func TestCommands_CmdStatus_Good_WithWorkspaces(t *testing.T) { assert.True(t, r.OK) } +func TestCommands_CmdStatus_Good_DeepWorkspace(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + + ws := core.JoinPath(WorkspaceRoot(), "core", "go-io", "task-5") + fs.EnsureDir(ws) + fs.Write(core.JoinPath(ws, "status.json"), core.JSONMarshalString(WorkspaceStatus{ + Status: "completed", + Repo: "go-io", + Agent: "codex", + })) + + output := captureStdout(t, func() { + r := s.cmdStatus(core.NewOptions()) + assert.True(t, r.OK) + }) + + assert.Contains(t, output, "core/go-io/task-5") +} + func TestCommands_CmdPrompt_Bad_MissingRepo(t *testing.T) { s, _ := testPrepWithCore(t, nil) r := s.cmdPrompt(core.NewOptions()) @@ -814,8 +864,8 @@ func TestCommands_CmdStatus_Bad_NoWorkspaceDir(t *testing.T) { c := core.New() s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } r := s.cmdStatus(core.NewOptions()) @@ -843,15 +893,15 @@ func TestCommands_CmdStatus_Ugly_NonDirEntries(t *testing.T) { func TestCommands_ParseIntStr_Bad_NegativeAndOverflow(t *testing.T) { // parseIntStr extracts digits only, ignoring minus signs - assert.Equal(t, 5, parseIntStr("-5")) // extracts "5", ignores "-" - assert.Equal(t, 0, parseIntStr("-")) // no digits - assert.Equal(t, 0, parseIntStr("---")) // no digits + assert.Equal(t, 5, parseIntStr("-5")) // extracts "5", ignores "-" + assert.Equal(t, 0, parseIntStr("-")) // no digits + assert.Equal(t, 0, parseIntStr("---")) // no digits } func TestCommands_ParseIntStr_Ugly_UnicodeAndMixed(t *testing.T) { // Unicode digits (e.g. Arabic-Indic) are NOT ASCII 0-9 so ignored assert.Equal(t, 0, parseIntStr("\u0661\u0662\u0663")) // ١٢٣ — not ASCII digits - assert.Equal(t, 42, parseIntStr("abc42xyz")) // mixed chars - assert.Equal(t, 123, parseIntStr("1a2b3c")) // interleaved - assert.Equal(t, 0, parseIntStr(" \t\n")) // whitespace only + assert.Equal(t, 42, parseIntStr("abc42xyz")) // mixed chars + assert.Equal(t, 123, parseIntStr("1a2b3c")) // interleaved + assert.Equal(t, 0, parseIntStr(" \t\n")) // whitespace only } diff --git a/pkg/agentic/handlers.go b/pkg/agentic/handlers.go index e54a319..fb4cbbb 100644 --- a/pkg/agentic/handlers.go +++ b/pkg/agentic/handlers.go @@ -33,12 +33,12 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res // Runner asks agentic to spawn a queued workspace wsDir := resolveWorkspace(ev.Workspace) if wsDir == "" { - break + break } prompt := core.Concat("TASK: ", ev.Task, "\n\nResume from where you left off. Read CODEX.md for conventions. Commit when done.") pid, outputFile, err := s.spawnAgent(ev.Agent, prompt, wsDir) if err != nil { - break + break } // Update status with real PID if st, serr := ReadStatus(wsDir); serr == nil { @@ -78,10 +78,7 @@ func resolveWorkspace(name string) string { // findWorkspaceByPR finds a workspace directory by repo name and branch. // Scans running/completed workspaces for a matching repo+branch combination. func findWorkspaceByPR(repo, branch string) string { - wsRoot := WorkspaceRoot() - old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) - deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) - for _, path := range append(old, deep...) { + for _, path := range WorkspaceStatusPaths() { wsDir := core.PathDir(path) st, err := ReadStatus(wsDir) if err != nil { diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 10d0ba2..8cbb9c9 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -258,23 +258,13 @@ func (s *PrepSubsystem) hydrateWorkspaces() { if s.workspaces == nil { s.workspaces = core.NewRegistry[*WorkspaceStatus]() } - wsRoot := WorkspaceRoot() - // Scan shallow (ws-name/) and deep (org/repo/task/) layouts - for _, pattern := range []string{ - core.JoinPath(wsRoot, "*", "status.json"), - core.JoinPath(wsRoot, "*", "*", "*", "status.json"), - } { - for _, path := range core.PathGlob(pattern) { - wsDir := core.PathDir(path) - st, err := ReadStatus(wsDir) - if err != nil || st == nil { - continue - } - // Key is the relative path from workspace root - name := core.TrimPrefix(wsDir, wsRoot) - name = core.TrimPrefix(name, "/") - s.workspaces.Set(name, st) + for _, path := range WorkspaceStatusPaths() { + wsDir := core.PathDir(path) + st, err := ReadStatus(wsDir) + if err != nil || st == nil { + continue } + s.workspaces.Set(WorkspaceName(wsDir), st) } } diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go index c642b98..5021df0 100644 --- a/pkg/agentic/queue.go +++ b/pkg/agentic/queue.go @@ -71,10 +71,10 @@ func (c *ConcurrencyLimit) UnmarshalYAML(value *yaml.Node) error { // // cfg := agentic.AgentsConfig{Version: 1, Dispatch: agentic.DispatchConfig{DefaultAgent: "claude"}} type AgentsConfig struct { - Version int `yaml:"version"` - Dispatch DispatchConfig `yaml:"dispatch"` - Concurrency map[string]ConcurrencyLimit `yaml:"concurrency"` - Rates map[string]RateConfig `yaml:"rates"` + Version int `yaml:"version"` + Dispatch DispatchConfig `yaml:"dispatch"` + Concurrency map[string]ConcurrencyLimit `yaml:"concurrency"` + Rates map[string]RateConfig `yaml:"rates"` } // loadAgentsConfig reads config/agents.yaml from the code path. @@ -180,12 +180,8 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int { // countRunningByAgentDisk scans workspace status.json files on disk. // Used only as fallback before Registry hydration completes. func (s *PrepSubsystem) countRunningByAgentDisk(agent string) int { - wsRoot := WorkspaceRoot() - old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) - deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) - count := 0 - for _, statusPath := range append(old, deep...) { + for _, statusPath := range WorkspaceStatusPaths() { st, err := ReadStatus(core.PathDir(statusPath)) if err != nil || st.Status != "running" { continue @@ -218,12 +214,8 @@ func (s *PrepSubsystem) countRunningByModel(agent string) int { } // Fallback: scan disk - wsRoot := WorkspaceRoot() - old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) - deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) - count := 0 - for _, statusPath := range append(old, deep...) { + for _, statusPath := range WorkspaceStatusPaths() { st, err := ReadStatus(core.PathDir(statusPath)) if err != nil || st.Status != "running" { continue @@ -320,14 +312,7 @@ func (s *PrepSubsystem) drainQueue() { // drainOne finds the oldest queued workspace and spawns it if a slot is available. // Returns true if a task was spawned, false if nothing to do. func (s *PrepSubsystem) drainOne() bool { - wsRoot := WorkspaceRoot() - - // Scan both old and new workspace layouts - old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) - deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) - statusFiles := append(old, deep...) - - for _, statusPath := range statusFiles { + for _, statusPath := range WorkspaceStatusPaths() { wsDir := core.PathDir(statusPath) st, err := ReadStatus(wsDir) if err != nil || st.Status != "queued" { diff --git a/pkg/agentic/status.go b/pkg/agentic/status.go index bcf2253..c7e20be 100644 --- a/pkg/agentic/status.go +++ b/pkg/agentic/status.go @@ -125,15 +125,13 @@ func (s *PrepSubsystem) registerStatusTool(server *mcp.Server) { } func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, input StatusInput) (*mcp.CallToolResult, StatusOutput, error) { - wsRoot := WorkspaceRoot() - statusFiles := WorkspaceStatusPaths() var out StatusOutput for _, statusPath := range statusFiles { wsDir := core.PathDir(statusPath) - name := wsDir[len(wsRoot)+1:] + name := WorkspaceName(wsDir) st, err := ReadStatus(wsDir) if err != nil { diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 30620ee..e91e716 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -252,10 +252,7 @@ func (m *Subsystem) checkIdleAfterDelay() { // countLiveWorkspaces counts workspaces that are genuinely active. // For "running" status, verifies the PID is still alive. func (m *Subsystem) countLiveWorkspaces() (running, queued int) { - wsRoot := agentic.WorkspaceRoot() - old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) - deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) - for _, path := range append(old, deep...) { + for _, path := range agentic.WorkspaceStatusPaths() { wsDir := core.PathDir(path) st, err := agentic.ReadStatus(wsDir) if err != nil { diff --git a/pkg/runner/queue.go b/pkg/runner/queue.go index 56db845..2e032ad 100644 --- a/pkg/runner/queue.go +++ b/pkg/runner/queue.go @@ -7,6 +7,7 @@ import ( "syscall" "time" + "dappco.re/go/agent/pkg/agentic" core "dappco.re/go/core" "gopkg.in/yaml.v3" ) @@ -192,11 +193,7 @@ func (s *Service) drainQueue() { } func (s *Service) drainOne() bool { - wsRoot := WorkspaceRoot() - old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) - deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) - - for _, statusPath := range append(old, deep...) { + for _, statusPath := range agentic.WorkspaceStatusPaths() { wsDir := core.PathDir(statusPath) st, err := ReadStatus(wsDir) if err != nil || st.Status != "queued" { @@ -224,11 +221,7 @@ func (s *Service) drainOne() bool { // Ask agentic to spawn — runner doesn't own the spawn logic, // just the gate. Send IPC to trigger the actual spawn. // Workspace name is relative path from workspace root (e.g. "core/go-ai/dev") - wsRoot := WorkspaceRoot() - wsName := wsDir - if len(wsDir) > len(wsRoot)+1 { - wsName = wsDir[len(wsRoot)+1:] - } + wsName := agentic.WorkspaceName(wsDir) core.Info("drainOne: found queued workspace", "workspace", wsName, "agent", st.Agent) // Spawn directly — agentic is a Core service, use ServiceFor to get it diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 97ce45f..386dfe9 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -13,6 +13,7 @@ import ( "syscall" "time" + "dappco.re/go/agent/pkg/agentic" "dappco.re/go/agent/pkg/messages" core "dappco.re/go/core" ) @@ -370,25 +371,17 @@ func (s *Service) hydrateWorkspaces() { if s.workspaces == nil { s.workspaces = core.NewRegistry[*WorkspaceStatus]() } - wsRoot := WorkspaceRoot() - for _, pattern := range []string{ - core.JoinPath(wsRoot, "*", "status.json"), - core.JoinPath(wsRoot, "*", "*", "*", "status.json"), - } { - for _, path := range core.PathGlob(pattern) { - wsDir := core.PathDir(path) - st, err := ReadStatus(wsDir) - if err != nil || st == nil { - continue - } - // Re-queue running agents on restart — process is dead, re-dispatch - if st.Status == "running" { - st.Status = "queued" - } - name := core.TrimPrefix(wsDir, wsRoot) - name = core.TrimPrefix(name, "/") - s.workspaces.Set(name, st) + for _, path := range agentic.WorkspaceStatusPaths() { + wsDir := core.PathDir(path) + st, err := ReadStatus(wsDir) + if err != nil || st == nil { + continue } + // Re-queue running agents on restart — process is dead, re-dispatch + if st.Status == "running" { + st.Status = "queued" + } + s.workspaces.Set(agentic.WorkspaceName(wsDir), st) } } diff --git a/pkg/runner/runner_test.go b/pkg/runner/runner_test.go index cd8e748..17c7cf4 100644 --- a/pkg/runner/runner_test.go +++ b/pkg/runner/runner_test.go @@ -329,6 +329,29 @@ func TestRunner_HandleIPCEvents_Good_UpdatesMatchingWorkspaceOnly(t *testing.T) assert.Equal(t, 222, second.PID) } +func TestRunner_HydrateWorkspaces_Good_DeepWorkspaceName(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + + wsDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5") + fs.EnsureDir(wsDir) + WriteStatus(wsDir, &WorkspaceStatus{ + Status: "running", + Agent: "codex", + Repo: "go-io", + PID: 99999999, + }) + + svc := New() + svc.hydrateWorkspaces() + + r := svc.workspaces.Get("core/go-io/task-5") + assert.True(t, r.OK) + st := r.Value.(*WorkspaceStatus) + assert.Equal(t, "queued", st.Status) + assert.Equal(t, "go-io", st.Repo) +} + // --- WriteStatus / ReadStatus --- func TestRunner_WriteReadStatus_Good(t *testing.T) {