fix(ax): share workspace path helpers across services
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
65970527e8
commit
4cc763176f
10 changed files with 122 additions and 96 deletions
|
|
@ -149,14 +149,14 @@ func (s *PrepSubsystem) cmdStatus(opts core.Options) core.Result {
|
|||
return core.Result{OK: true}
|
||||
}
|
||||
|
||||
statusFiles := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
|
||||
statusFiles := WorkspaceStatusPaths()
|
||||
if len(statusFiles) == 0 {
|
||||
core.Print(nil, "no workspaces")
|
||||
return core.Result{OK: true}
|
||||
}
|
||||
|
||||
for _, sf := range statusFiles {
|
||||
core.Print(nil, " %s", core.PathBase(core.PathDir(sf)))
|
||||
core.Print(nil, " %s", WorkspaceName(core.PathDir(sf)))
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,8 +4,10 @@ package agentic
|
|||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -29,13 +31,13 @@ func testPrepWithCore(t *testing.T, srv *httptest.Server) (*PrepSubsystem, *core
|
|||
|
||||
s := &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
|
||||
forge: f,
|
||||
forgeURL: "",
|
||||
forgeToken: "test-token",
|
||||
codePath: t.TempDir(),
|
||||
pokeCh: make(chan struct{}, 1),
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
forge: f,
|
||||
forgeURL: "",
|
||||
forgeToken: "test-token",
|
||||
codePath: t.TempDir(),
|
||||
pokeCh: make(chan struct{}, 1),
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
}
|
||||
if srv != nil {
|
||||
s.forgeURL = srv.URL
|
||||
|
|
@ -44,6 +46,35 @@ func testPrepWithCore(t *testing.T, srv *httptest.Server) (*PrepSubsystem, *core
|
|||
return s, c
|
||||
}
|
||||
|
||||
func captureStdout(t *testing.T, run func()) string {
|
||||
t.Helper()
|
||||
|
||||
old := os.Stdout
|
||||
reader, writer, err := os.Pipe()
|
||||
if err != nil {
|
||||
t.Fatalf("pipe stdout: %v", err)
|
||||
}
|
||||
os.Stdout = writer
|
||||
defer func() {
|
||||
os.Stdout = old
|
||||
}()
|
||||
|
||||
run()
|
||||
|
||||
if err := writer.Close(); err != nil {
|
||||
t.Fatalf("close writer: %v", err)
|
||||
}
|
||||
data, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
t.Fatalf("read stdout: %v", err)
|
||||
}
|
||||
if err := reader.Close(); err != nil {
|
||||
t.Fatalf("close reader: %v", err)
|
||||
}
|
||||
|
||||
return string(data)
|
||||
}
|
||||
|
||||
// --- Forge command methods (extracted from closures) ---
|
||||
|
||||
func TestCommandsforge_CmdIssueGet_Bad_MissingArgs(t *testing.T) {
|
||||
|
|
@ -564,6 +595,25 @@ func TestCommands_CmdStatus_Good_WithWorkspaces(t *testing.T) {
|
|||
assert.True(t, r.OK)
|
||||
}
|
||||
|
||||
func TestCommands_CmdStatus_Good_DeepWorkspace(t *testing.T) {
|
||||
s, _ := testPrepWithCore(t, nil)
|
||||
|
||||
ws := core.JoinPath(WorkspaceRoot(), "core", "go-io", "task-5")
|
||||
fs.EnsureDir(ws)
|
||||
fs.Write(core.JoinPath(ws, "status.json"), core.JSONMarshalString(WorkspaceStatus{
|
||||
Status: "completed",
|
||||
Repo: "go-io",
|
||||
Agent: "codex",
|
||||
}))
|
||||
|
||||
output := captureStdout(t, func() {
|
||||
r := s.cmdStatus(core.NewOptions())
|
||||
assert.True(t, r.OK)
|
||||
})
|
||||
|
||||
assert.Contains(t, output, "core/go-io/task-5")
|
||||
}
|
||||
|
||||
func TestCommands_CmdPrompt_Bad_MissingRepo(t *testing.T) {
|
||||
s, _ := testPrepWithCore(t, nil)
|
||||
r := s.cmdPrompt(core.NewOptions())
|
||||
|
|
@ -814,8 +864,8 @@ func TestCommands_CmdStatus_Bad_NoWorkspaceDir(t *testing.T) {
|
|||
c := core.New()
|
||||
s := &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
}
|
||||
|
||||
r := s.cmdStatus(core.NewOptions())
|
||||
|
|
@ -843,15 +893,15 @@ func TestCommands_CmdStatus_Ugly_NonDirEntries(t *testing.T) {
|
|||
|
||||
func TestCommands_ParseIntStr_Bad_NegativeAndOverflow(t *testing.T) {
|
||||
// parseIntStr extracts digits only, ignoring minus signs
|
||||
assert.Equal(t, 5, parseIntStr("-5")) // extracts "5", ignores "-"
|
||||
assert.Equal(t, 0, parseIntStr("-")) // no digits
|
||||
assert.Equal(t, 0, parseIntStr("---")) // no digits
|
||||
assert.Equal(t, 5, parseIntStr("-5")) // extracts "5", ignores "-"
|
||||
assert.Equal(t, 0, parseIntStr("-")) // no digits
|
||||
assert.Equal(t, 0, parseIntStr("---")) // no digits
|
||||
}
|
||||
|
||||
func TestCommands_ParseIntStr_Ugly_UnicodeAndMixed(t *testing.T) {
|
||||
// Unicode digits (e.g. Arabic-Indic) are NOT ASCII 0-9 so ignored
|
||||
assert.Equal(t, 0, parseIntStr("\u0661\u0662\u0663")) // ١٢٣ — not ASCII digits
|
||||
assert.Equal(t, 42, parseIntStr("abc42xyz")) // mixed chars
|
||||
assert.Equal(t, 123, parseIntStr("1a2b3c")) // interleaved
|
||||
assert.Equal(t, 0, parseIntStr(" \t\n")) // whitespace only
|
||||
assert.Equal(t, 42, parseIntStr("abc42xyz")) // mixed chars
|
||||
assert.Equal(t, 123, parseIntStr("1a2b3c")) // interleaved
|
||||
assert.Equal(t, 0, parseIntStr(" \t\n")) // whitespace only
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,12 +33,12 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res
|
|||
// Runner asks agentic to spawn a queued workspace
|
||||
wsDir := resolveWorkspace(ev.Workspace)
|
||||
if wsDir == "" {
|
||||
break
|
||||
break
|
||||
}
|
||||
prompt := core.Concat("TASK: ", ev.Task, "\n\nResume from where you left off. Read CODEX.md for conventions. Commit when done.")
|
||||
pid, outputFile, err := s.spawnAgent(ev.Agent, prompt, wsDir)
|
||||
if err != nil {
|
||||
break
|
||||
break
|
||||
}
|
||||
// Update status with real PID
|
||||
if st, serr := ReadStatus(wsDir); serr == nil {
|
||||
|
|
@ -78,10 +78,7 @@ func resolveWorkspace(name string) string {
|
|||
// findWorkspaceByPR finds a workspace directory by repo name and branch.
|
||||
// Scans running/completed workspaces for a matching repo+branch combination.
|
||||
func findWorkspaceByPR(repo, branch string) string {
|
||||
wsRoot := WorkspaceRoot()
|
||||
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
|
||||
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
|
||||
for _, path := range append(old, deep...) {
|
||||
for _, path := range WorkspaceStatusPaths() {
|
||||
wsDir := core.PathDir(path)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -258,23 +258,13 @@ func (s *PrepSubsystem) hydrateWorkspaces() {
|
|||
if s.workspaces == nil {
|
||||
s.workspaces = core.NewRegistry[*WorkspaceStatus]()
|
||||
}
|
||||
wsRoot := WorkspaceRoot()
|
||||
// Scan shallow (ws-name/) and deep (org/repo/task/) layouts
|
||||
for _, pattern := range []string{
|
||||
core.JoinPath(wsRoot, "*", "status.json"),
|
||||
core.JoinPath(wsRoot, "*", "*", "*", "status.json"),
|
||||
} {
|
||||
for _, path := range core.PathGlob(pattern) {
|
||||
wsDir := core.PathDir(path)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil || st == nil {
|
||||
continue
|
||||
}
|
||||
// Key is the relative path from workspace root
|
||||
name := core.TrimPrefix(wsDir, wsRoot)
|
||||
name = core.TrimPrefix(name, "/")
|
||||
s.workspaces.Set(name, st)
|
||||
for _, path := range WorkspaceStatusPaths() {
|
||||
wsDir := core.PathDir(path)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil || st == nil {
|
||||
continue
|
||||
}
|
||||
s.workspaces.Set(WorkspaceName(wsDir), st)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -71,10 +71,10 @@ func (c *ConcurrencyLimit) UnmarshalYAML(value *yaml.Node) error {
|
|||
//
|
||||
// cfg := agentic.AgentsConfig{Version: 1, Dispatch: agentic.DispatchConfig{DefaultAgent: "claude"}}
|
||||
type AgentsConfig struct {
|
||||
Version int `yaml:"version"`
|
||||
Dispatch DispatchConfig `yaml:"dispatch"`
|
||||
Concurrency map[string]ConcurrencyLimit `yaml:"concurrency"`
|
||||
Rates map[string]RateConfig `yaml:"rates"`
|
||||
Version int `yaml:"version"`
|
||||
Dispatch DispatchConfig `yaml:"dispatch"`
|
||||
Concurrency map[string]ConcurrencyLimit `yaml:"concurrency"`
|
||||
Rates map[string]RateConfig `yaml:"rates"`
|
||||
}
|
||||
|
||||
// loadAgentsConfig reads config/agents.yaml from the code path.
|
||||
|
|
@ -180,12 +180,8 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int {
|
|||
// countRunningByAgentDisk scans workspace status.json files on disk.
|
||||
// Used only as fallback before Registry hydration completes.
|
||||
func (s *PrepSubsystem) countRunningByAgentDisk(agent string) int {
|
||||
wsRoot := WorkspaceRoot()
|
||||
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
|
||||
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
|
||||
|
||||
count := 0
|
||||
for _, statusPath := range append(old, deep...) {
|
||||
for _, statusPath := range WorkspaceStatusPaths() {
|
||||
st, err := ReadStatus(core.PathDir(statusPath))
|
||||
if err != nil || st.Status != "running" {
|
||||
continue
|
||||
|
|
@ -218,12 +214,8 @@ func (s *PrepSubsystem) countRunningByModel(agent string) int {
|
|||
}
|
||||
|
||||
// Fallback: scan disk
|
||||
wsRoot := WorkspaceRoot()
|
||||
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
|
||||
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
|
||||
|
||||
count := 0
|
||||
for _, statusPath := range append(old, deep...) {
|
||||
for _, statusPath := range WorkspaceStatusPaths() {
|
||||
st, err := ReadStatus(core.PathDir(statusPath))
|
||||
if err != nil || st.Status != "running" {
|
||||
continue
|
||||
|
|
@ -320,14 +312,7 @@ func (s *PrepSubsystem) drainQueue() {
|
|||
// drainOne finds the oldest queued workspace and spawns it if a slot is available.
|
||||
// Returns true if a task was spawned, false if nothing to do.
|
||||
func (s *PrepSubsystem) drainOne() bool {
|
||||
wsRoot := WorkspaceRoot()
|
||||
|
||||
// Scan both old and new workspace layouts
|
||||
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
|
||||
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
|
||||
statusFiles := append(old, deep...)
|
||||
|
||||
for _, statusPath := range statusFiles {
|
||||
for _, statusPath := range WorkspaceStatusPaths() {
|
||||
wsDir := core.PathDir(statusPath)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil || st.Status != "queued" {
|
||||
|
|
|
|||
|
|
@ -125,15 +125,13 @@ func (s *PrepSubsystem) registerStatusTool(server *mcp.Server) {
|
|||
}
|
||||
|
||||
func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, input StatusInput) (*mcp.CallToolResult, StatusOutput, error) {
|
||||
wsRoot := WorkspaceRoot()
|
||||
|
||||
statusFiles := WorkspaceStatusPaths()
|
||||
|
||||
var out StatusOutput
|
||||
|
||||
for _, statusPath := range statusFiles {
|
||||
wsDir := core.PathDir(statusPath)
|
||||
name := wsDir[len(wsRoot)+1:]
|
||||
name := WorkspaceName(wsDir)
|
||||
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -252,10 +252,7 @@ func (m *Subsystem) checkIdleAfterDelay() {
|
|||
// countLiveWorkspaces counts workspaces that are genuinely active.
|
||||
// For "running" status, verifies the PID is still alive.
|
||||
func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
|
||||
wsRoot := agentic.WorkspaceRoot()
|
||||
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
|
||||
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
|
||||
for _, path := range append(old, deep...) {
|
||||
for _, path := range agentic.WorkspaceStatusPaths() {
|
||||
wsDir := core.PathDir(path)
|
||||
st, err := agentic.ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"dappco.re/go/agent/pkg/agentic"
|
||||
core "dappco.re/go/core"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
|
@ -192,11 +193,7 @@ func (s *Service) drainQueue() {
|
|||
}
|
||||
|
||||
func (s *Service) drainOne() bool {
|
||||
wsRoot := WorkspaceRoot()
|
||||
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
|
||||
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
|
||||
|
||||
for _, statusPath := range append(old, deep...) {
|
||||
for _, statusPath := range agentic.WorkspaceStatusPaths() {
|
||||
wsDir := core.PathDir(statusPath)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil || st.Status != "queued" {
|
||||
|
|
@ -224,11 +221,7 @@ func (s *Service) drainOne() bool {
|
|||
// Ask agentic to spawn — runner doesn't own the spawn logic,
|
||||
// just the gate. Send IPC to trigger the actual spawn.
|
||||
// Workspace name is relative path from workspace root (e.g. "core/go-ai/dev")
|
||||
wsRoot := WorkspaceRoot()
|
||||
wsName := wsDir
|
||||
if len(wsDir) > len(wsRoot)+1 {
|
||||
wsName = wsDir[len(wsRoot)+1:]
|
||||
}
|
||||
wsName := agentic.WorkspaceName(wsDir)
|
||||
core.Info("drainOne: found queued workspace", "workspace", wsName, "agent", st.Agent)
|
||||
|
||||
// Spawn directly — agentic is a Core service, use ServiceFor to get it
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"dappco.re/go/agent/pkg/agentic"
|
||||
"dappco.re/go/agent/pkg/messages"
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
|
@ -370,25 +371,17 @@ func (s *Service) hydrateWorkspaces() {
|
|||
if s.workspaces == nil {
|
||||
s.workspaces = core.NewRegistry[*WorkspaceStatus]()
|
||||
}
|
||||
wsRoot := WorkspaceRoot()
|
||||
for _, pattern := range []string{
|
||||
core.JoinPath(wsRoot, "*", "status.json"),
|
||||
core.JoinPath(wsRoot, "*", "*", "*", "status.json"),
|
||||
} {
|
||||
for _, path := range core.PathGlob(pattern) {
|
||||
wsDir := core.PathDir(path)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil || st == nil {
|
||||
continue
|
||||
}
|
||||
// Re-queue running agents on restart — process is dead, re-dispatch
|
||||
if st.Status == "running" {
|
||||
st.Status = "queued"
|
||||
}
|
||||
name := core.TrimPrefix(wsDir, wsRoot)
|
||||
name = core.TrimPrefix(name, "/")
|
||||
s.workspaces.Set(name, st)
|
||||
for _, path := range agentic.WorkspaceStatusPaths() {
|
||||
wsDir := core.PathDir(path)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil || st == nil {
|
||||
continue
|
||||
}
|
||||
// Re-queue running agents on restart — process is dead, re-dispatch
|
||||
if st.Status == "running" {
|
||||
st.Status = "queued"
|
||||
}
|
||||
s.workspaces.Set(agentic.WorkspaceName(wsDir), st)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -329,6 +329,29 @@ func TestRunner_HandleIPCEvents_Good_UpdatesMatchingWorkspaceOnly(t *testing.T)
|
|||
assert.Equal(t, 222, second.PID)
|
||||
}
|
||||
|
||||
func TestRunner_HydrateWorkspaces_Good_DeepWorkspaceName(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
|
||||
wsDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5")
|
||||
fs.EnsureDir(wsDir)
|
||||
WriteStatus(wsDir, &WorkspaceStatus{
|
||||
Status: "running",
|
||||
Agent: "codex",
|
||||
Repo: "go-io",
|
||||
PID: 99999999,
|
||||
})
|
||||
|
||||
svc := New()
|
||||
svc.hydrateWorkspaces()
|
||||
|
||||
r := svc.workspaces.Get("core/go-io/task-5")
|
||||
assert.True(t, r.OK)
|
||||
st := r.Value.(*WorkspaceStatus)
|
||||
assert.Equal(t, "queued", st.Status)
|
||||
assert.Equal(t, "go-io", st.Repo)
|
||||
}
|
||||
|
||||
// --- WriteStatus / ReadStatus ---
|
||||
|
||||
func TestRunner_WriteReadStatus_Good(t *testing.T) {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue