From 5b39e13a6e1f98cfb3ce2f8ff26e9c77d6555846 Mon Sep 17 00:00:00 2001 From: Snider Date: Sat, 21 Mar 2026 17:10:43 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20address=20Codex=20round=206=20findings?= =?UTF-8?q?=20=E2=80=94=202=20high,=203=20medium,=201=20low?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit High: workspace names use UnixNano to prevent same-second collisions High: sync only pulls the branch the server reported (was pulling current) Medium: drainQueue serialised via mutex to prevent concurrent over-dispatch Medium: remote_status checks JSON-RPC error field before reporting success Medium: dead agent PIDs without output log marked failed, not completed Low: detectLanguage uses ordered slice instead of map for deterministic results Also: URL-encoded agent names in messaging, monitor inbox, and sync endpoints. Co-Authored-By: Virgil --- pkg/agentic/prep.go | 30 ++++++++++++++++++------------ pkg/agentic/queue.go | 6 +++++- pkg/agentic/remote_status.go | 16 +++++++++++++++- pkg/agentic/status.go | 14 ++++++++++++-- pkg/monitor/sync.go | 19 +++++++++++-------- 5 files changed, 61 insertions(+), 24 deletions(-) diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 320d9fa..bffd698 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -15,6 +15,7 @@ import ( "os/exec" "path/filepath" "strings" + "sync" "time" "dappco.re/go/agent/pkg/lib" @@ -40,6 +41,7 @@ type PrepSubsystem struct { codePath string client *http.Client onComplete CompletionNotifier + drainMu sync.Mutex // protects drainQueue from concurrent execution } // NewPrep creates an agentic subsystem. @@ -154,7 +156,7 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques // Workspace root: .core/workspace/{repo}-{timestamp}/ wsRoot := WorkspaceRoot() - wsName := fmt.Sprintf("%s-%d", input.Repo, time.Now().Unix()) + wsName := fmt.Sprintf("%s-%d", input.Repo, time.Now().UnixNano()) wsDir := filepath.Join(wsRoot, wsName) // Create workspace structure @@ -604,19 +606,23 @@ func (s *PrepSubsystem) generateTodo(ctx context.Context, org, repo string, issu } // detectLanguage guesses the primary language from repo contents. +// Checks in priority order (Go first) to avoid nondeterministic results. func detectLanguage(repoPath string) string { - checks := map[string]string{ - "go.mod": "go", - "composer.json": "php", - "package.json": "ts", - "Cargo.toml": "rust", - "requirements.txt": "py", - "CMakeLists.txt": "cpp", - "Dockerfile": "docker", + checks := []struct { + file string + lang string + }{ + {"go.mod", "go"}, + {"composer.json", "php"}, + {"package.json", "ts"}, + {"Cargo.toml", "rust"}, + {"requirements.txt", "py"}, + {"CMakeLists.txt", "cpp"}, + {"Dockerfile", "docker"}, } - for file, lang := range checks { - if _, err := os.Stat(filepath.Join(repoPath, file)); err == nil { - return lang + for _, c := range checks { + if _, err := os.Stat(filepath.Join(repoPath, c.file)); err == nil { + return c.lang } } return "go" diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go index c990024..d7084fc 100644 --- a/pkg/agentic/queue.go +++ b/pkg/agentic/queue.go @@ -154,8 +154,12 @@ func (s *PrepSubsystem) canDispatchAgent(agent string) bool { } // drainQueue finds the oldest queued workspace and spawns it if a slot is available. -// Applies rate-based delay between spawns. +// Applies rate-based delay between spawns. Serialised via drainMu to prevent +// concurrent drainers from exceeding concurrency limits. func (s *PrepSubsystem) drainQueue() { + s.drainMu.Lock() + defer s.drainMu.Unlock() + wsRoot := WorkspaceRoot() entries, err := os.ReadDir(wsRoot) diff --git a/pkg/agentic/remote_status.go b/pkg/agentic/remote_status.go index 535ed26..18e4b64 100644 --- a/pkg/agentic/remote_status.go +++ b/pkg/agentic/remote_status.go @@ -84,8 +84,22 @@ func (s *PrepSubsystem) statusRemote(ctx context.Context, _ *mcp.CallToolRequest Text string `json:"text"` } `json:"content"` } `json:"result"` + Error *struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error"` } - if json.Unmarshal(result, &rpcResp) == nil && len(rpcResp.Result.Content) > 0 { + if json.Unmarshal(result, &rpcResp) != nil { + output.Success = false + output.Error = "failed to parse response" + return nil, output, nil + } + if rpcResp.Error != nil { + output.Success = false + output.Error = rpcResp.Error.Message + return nil, output, nil + } + if len(rpcResp.Result.Content) > 0 { var statusOut StatusOutput if json.Unmarshal([]byte(rpcResp.Result.Content[0].Text), &statusOut) == nil { output.Workspaces = statusOut.Workspaces diff --git a/pkg/agentic/status.go b/pkg/agentic/status.go index dbe87ce..cdf05b2 100644 --- a/pkg/agentic/status.go +++ b/pkg/agentic/status.go @@ -5,6 +5,7 @@ package agentic import ( "context" "encoding/json" + "fmt" "os" "path/filepath" "strings" @@ -157,8 +158,17 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu st.Status = "blocked" st.Question = info.Question } else { - info.Status = "completed" - st.Status = "completed" + // Dead PID without BLOCKED.md — check exit code from log + // If no evidence of success, mark as failed (not completed) + logFile := filepath.Join(wsDir, fmt.Sprintf("agent-%s.log", st.Agent)) + if _, err := coreio.Local.Read(logFile); err != nil { + info.Status = "failed" + st.Status = "failed" + st.Question = "Agent process died (no output log)" + } else { + info.Status = "completed" + st.Status = "completed" + } } writeStatus(wsDir, st) } diff --git a/pkg/monitor/sync.go b/pkg/monitor/sync.go index da78e58..19d9474 100644 --- a/pkg/monitor/sync.go +++ b/pkg/monitor/sync.go @@ -108,13 +108,16 @@ func (m *Subsystem) syncRepos() string { } current := strings.TrimSpace(string(currentBranch)) - // Accept main or master (or whatever the repo reports) - expectedBranch := repo.Branch - if expectedBranch == "" { - expectedBranch = "main" + // Determine which branch to pull — use server-reported branch, + // fall back to current if server didn't specify + targetBranch := repo.Branch + if targetBranch == "" { + targetBranch = current } - if current != expectedBranch && current != "main" && current != "master" { - continue // Don't pull if on a feature branch + + // Only pull if we're on the target branch (or it's a default branch) + if current != targetBranch { + continue // On a different branch — skip } statusCmd := exec.Command("git", "status", "--porcelain") @@ -124,8 +127,8 @@ func (m *Subsystem) syncRepos() string { continue // Don't pull if dirty } - // Fast-forward pull on whatever branch we're on - pullCmd := exec.Command("git", "pull", "--ff-only", "origin", current) + // Fast-forward pull the target branch + pullCmd := exec.Command("git", "pull", "--ff-only", "origin", targetBranch) pullCmd.Dir = repoDir if pullCmd.Run() == nil { pulled = append(pulled, repo.Repo)