From 90bb784e7947797f13e8cbccd45d61d21ee3067e Mon Sep 17 00:00:00 2001 From: Snider Date: Sun, 22 Mar 2026 01:57:12 +0000 Subject: [PATCH 1/9] fix(agentic): workspace root is ~/Code/.core/ not ~/Code/host-uk/core/.core/ The hardcoded host-uk/core path doesn't exist on the homelab, causing countRunningByAgent to always return 0 (no concurrency limiting) and agentic_status to miss workspaces. Co-Authored-By: Virgil --- pkg/mcp/agentic/plan.go | 2 +- pkg/mcp/agentic/prep.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/mcp/agentic/plan.go b/pkg/mcp/agentic/plan.go index 37a611e..db2cf8d 100644 --- a/pkg/mcp/agentic/plan.go +++ b/pkg/mcp/agentic/plan.go @@ -312,7 +312,7 @@ func (s *PrepSubsystem) planList(_ context.Context, _ *mcp.CallToolRequest, inpu // --- Helpers --- func (s *PrepSubsystem) plansDir() string { - return filepath.Join(s.codePath, "host-uk", "core", ".core", "plans") + return filepath.Join(s.codePath, ".core", "plans") } func planPath(dir, id string) string { diff --git a/pkg/mcp/agentic/prep.go b/pkg/mcp/agentic/prep.go index d48aeeb..d10bdc1 100644 --- a/pkg/mcp/agentic/prep.go +++ b/pkg/mcp/agentic/prep.go @@ -98,7 +98,7 @@ func (s *PrepSubsystem) Shutdown(_ context.Context) error { return nil } // workspaceRoot returns the base directory for agent workspaces. func (s *PrepSubsystem) workspaceRoot() string { - return filepath.Join(s.codePath, "host-uk", "core", ".core", "workspace") + return filepath.Join(s.codePath, ".core", "workspace") } // --- Input/Output types --- From 94cc1b9ed508474231994d62722e2904081f854d Mon Sep 17 00:00:00 2001 From: Snider Date: Sun, 22 Mar 2026 02:11:59 +0000 Subject: [PATCH 2/9] fix(agentic): scan nested workspace dirs for concurrency + status listWorkspaceDirs() now recurses one level into subdirectories (e.g. workspace/core/go-io-123/) so countRunningByAgent and agentic_status find workspaces regardless of directory structure. Co-Authored-By: Virgil --- pkg/mcp/agentic/queue.go | 55 +++++++++++++++++++++++---------------- pkg/mcp/agentic/status.go | 20 +++++--------- 2 files changed, 39 insertions(+), 36 deletions(-) diff --git a/pkg/mcp/agentic/queue.go b/pkg/mcp/agentic/queue.go index e7d1a5b..ef6239c 100644 --- a/pkg/mcp/agentic/queue.go +++ b/pkg/mcp/agentic/queue.go @@ -103,31 +103,55 @@ func (s *PrepSubsystem) delayForAgent(agent string) time.Duration { return time.Duration(rate.SustainedDelay) * time.Second } -// countRunningByAgent counts running workspaces for a specific agent type. -func (s *PrepSubsystem) countRunningByAgent(agent string) int { +// listWorkspaceDirs returns all workspace directories, including those +// nested one level deep (e.g. workspace/core/go-io-123/). +func (s *PrepSubsystem) listWorkspaceDirs() []string { wsRoot := s.workspaceRoot() - entries, err := coreio.Local.List(wsRoot) if err != nil { - return 0 + return nil } - count := 0 + var dirs []string for _, entry := range entries { if !entry.IsDir() { continue } + path := filepath.Join(wsRoot, entry.Name()) + // Check if this dir has a status.json (it's a workspace) + if coreio.Local.IsFile(filepath.Join(path, "status.json")) { + dirs = append(dirs, path) + continue + } + // Otherwise check one level deeper (org subdirectory) + subEntries, err := coreio.Local.List(path) + if err != nil { + continue + } + for _, sub := range subEntries { + if sub.IsDir() { + subPath := filepath.Join(path, sub.Name()) + if coreio.Local.IsFile(filepath.Join(subPath, "status.json")) { + dirs = append(dirs, subPath) + } + } + } + } + return dirs +} - st, err := readStatus(filepath.Join(wsRoot, entry.Name())) +// countRunningByAgent counts running workspaces for a specific agent type. +func (s *PrepSubsystem) countRunningByAgent(agent string) int { + count := 0 + for _, wsDir := range s.listWorkspaceDirs() { + st, err := readStatus(wsDir) if err != nil || st.Status != "running" { continue } - // Match on base agent type (gemini:flash matches gemini) stBase := strings.SplitN(st.Agent, ":", 2)[0] if stBase != agent { continue } - if st.PID > 0 { proc, err := os.FindProcess(st.PID) if err == nil && proc.Signal(syscall.Signal(0)) == nil { @@ -135,7 +159,6 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int { } } } - return count } @@ -163,19 +186,7 @@ func (s *PrepSubsystem) canDispatch() bool { // drainQueue finds the oldest queued workspace and spawns it if a slot is available. // Applies rate-based delay between spawns. func (s *PrepSubsystem) drainQueue() { - wsRoot := s.workspaceRoot() - - entries, err := coreio.Local.List(wsRoot) - if err != nil { - return - } - - for _, entry := range entries { - if !entry.IsDir() { - continue - } - - wsDir := filepath.Join(wsRoot, entry.Name()) + for _, wsDir := range s.listWorkspaceDirs() { st, err := readStatus(wsDir) if err != nil || st.Status != "queued" { continue diff --git a/pkg/mcp/agentic/status.go b/pkg/mcp/agentic/status.go index 1b3f01e..2aa4fae 100644 --- a/pkg/mcp/agentic/status.go +++ b/pkg/mcp/agentic/status.go @@ -95,28 +95,21 @@ func (s *PrepSubsystem) registerStatusTool(server *mcp.Server) { } func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, input StatusInput) (*mcp.CallToolResult, StatusOutput, error) { - wsRoot := s.workspaceRoot() - - entries, err := coreio.Local.List(wsRoot) - if err != nil { - return nil, StatusOutput{}, coreerr.E("status", "no workspaces found", err) + wsDirs := s.listWorkspaceDirs() + if len(wsDirs) == 0 { + return nil, StatusOutput{}, coreerr.E("status", "no workspaces found", nil) } var workspaces []WorkspaceInfo - for _, entry := range entries { - if !entry.IsDir() { - continue - } - - name := entry.Name() + for _, wsDir := range wsDirs { + name := filepath.Base(wsDir) // Filter by specific workspace if requested if input.Workspace != "" && name != input.Workspace { continue } - wsDir := filepath.Join(wsRoot, name) info := WorkspaceInfo{Name: name} // Try reading status.json @@ -129,8 +122,7 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu } else { info.Status = "unknown" } - fi, _ := entry.Info() - if fi != nil { + if fi, err := os.Stat(wsDir); err == nil { info.Age = time.Since(fi.ModTime()).Truncate(time.Minute).String() } workspaces = append(workspaces, info) From 72ba11b4819f562b4ee2a8a33e3c9aa6367ae438 Mon Sep 17 00:00:00 2001 From: Snider Date: Sun, 22 Mar 2026 02:28:05 +0000 Subject: [PATCH 3/9] fix(agentic): config path is ~/Code/.core/agents.yaml only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All .core/ config lives at codePath/.core/ — not in individual repos. Co-Authored-By: Virgil --- pkg/mcp/agentic/queue.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/mcp/agentic/queue.go b/pkg/mcp/agentic/queue.go index ef6239c..b8a8972 100644 --- a/pkg/mcp/agentic/queue.go +++ b/pkg/mcp/agentic/queue.go @@ -43,9 +43,7 @@ type AgentsConfig struct { // loadAgentsConfig reads config/agents.yaml from the code path. func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig { paths := []string{ - filepath.Join(s.codePath, "core", "agent", "config", "agents.yaml"), - filepath.Join(s.codePath, "core", "agent", ".core", "agents.yaml"), - filepath.Join(s.codePath, "host-uk", "core", ".core", "agents.yaml"), + filepath.Join(s.codePath, ".core", "agents.yaml"), } for _, path := range paths { From 4329bd7f27f7c75cc91e2ad165345fd88c7a5697 Mon Sep 17 00:00:00 2001 From: Snider Date: Sun, 22 Mar 2026 02:32:17 +0000 Subject: [PATCH 4/9] fix(agentic): write status before spawn to prevent concurrency race writeStatus("running") was after cmd.Start(), so rapid sequential dispatches all saw 0 running. Now writes status immediately after the concurrency check passes, before spawning. Updates with PID after start. Reverts to "failed" if spawn fails. Co-Authored-By: Virgil --- pkg/mcp/agentic/dispatch.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/pkg/mcp/agentic/dispatch.go b/pkg/mcp/agentic/dispatch.go index c487ec0..6a96c8a 100644 --- a/pkg/mcp/agentic/dispatch.go +++ b/pkg/mcp/agentic/dispatch.go @@ -155,7 +155,19 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, }, nil } - // Step 3: Spawn agent as a detached process + // Step 3: Write status BEFORE spawning so concurrent dispatches + // see this workspace as "running" during the concurrency check. + writeStatus(wsDir, &WorkspaceStatus{ + Status: "running", + Agent: input.Agent, + Repo: input.Repo, + Org: input.Org, + Task: input.Task, + StartedAt: time.Now(), + Runs: 1, + }) + + // Step 4: Spawn agent as a detached process // Uses Setpgid so the agent survives parent (MCP server) death. // Output goes directly to log file (not buffered in memory). command, args, err := agentCommand(input.Agent, prompt) @@ -191,12 +203,19 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, if err := cmd.Start(); err != nil { outFile.Close() + // Revert status so the slot is freed + writeStatus(wsDir, &WorkspaceStatus{ + Status: "failed", + Agent: input.Agent, + Repo: input.Repo, + Task: input.Task, + }) return nil, DispatchOutput{}, coreerr.E("dispatch", "failed to spawn "+input.Agent, err) } pid := cmd.Process.Pid - // Write initial status + // Update status with PID now that agent is running writeStatus(wsDir, &WorkspaceStatus{ Status: "running", Agent: input.Agent, From 2bbc8063cf3ecba9014f3dcfbaf2a2b76139ae57 Mon Sep 17 00:00:00 2001 From: Snider Date: Sun, 22 Mar 2026 06:47:04 +0000 Subject: [PATCH 5/9] feat(agentic): auto-copy AX spec + Core source into agent workspaces prepWorkspace now copies RFC-025-AGENT-EXPERIENCE.md and all Core .go files into .core/reference/ in every dispatched workspace. Agents can read the AX conventions and Core API without network access. Co-Authored-By: Virgil --- pkg/mcp/agentic/prep.go | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/pkg/mcp/agentic/prep.go b/pkg/mcp/agentic/prep.go index d10bdc1..8ea3590 100644 --- a/pkg/mcp/agentic/prep.go +++ b/pkg/mcp/agentic/prep.go @@ -227,12 +227,15 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques // 8. Copy spec files into specs/ out.SpecFiles = s.copySpecs(wsDir) - // 9. Write PLAN.md from template (if specified) + // 9. Copy AX reference files into .core/reference/ + s.copyReference(wsDir) + + // 11. Write PLAN.md from template (if specified) if input.PlanTemplate != "" { s.writePlanFromTemplate(input.PlanTemplate, input.Variables, input.Task, wsDir) } - // 10. Write prompt template + // 11. Write prompt template s.writePromptTemplate(input.Template, wsDir) out.Success = true @@ -473,6 +476,34 @@ func (s *PrepSubsystem) copySpecs(wsDir string) int { return count } +// copyReference copies the AX spec and Core source files into .core/reference/ +// so dispatched agents can read the conventions and API without network access. +func (s *PrepSubsystem) copyReference(wsDir string) { + refDir := filepath.Join(wsDir, "src", ".core", "reference") + coreio.Local.EnsureDir(refDir) + + // Copy AX spec from docs repo + axSpec := filepath.Join(s.codePath, "core", "docs", "docs", "specs", "RFC-025-AGENT-EXPERIENCE.md") + if data, err := coreio.Local.Read(axSpec); err == nil { + coreio.Local.Write(filepath.Join(refDir, "RFC-025-AGENT-EXPERIENCE.md"), data) + } + + // Copy Core Go source files for API reference + coreGoDir := filepath.Join(s.codePath, "core", "go") + entries, err := coreio.Local.List(coreGoDir) + if err != nil { + return + } + for _, entry := range entries { + if entry.IsDir() || filepath.Ext(entry.Name()) != ".go" { + continue + } + if data, err := coreio.Local.Read(filepath.Join(coreGoDir, entry.Name())); err == nil { + coreio.Local.Write(filepath.Join(refDir, entry.Name()), data) + } + } +} + func (s *PrepSubsystem) generateContext(ctx context.Context, repo, wsDir string) int { if s.brainKey == "" { return 0 From 517afe627fa57ae1344631c800dfd4bd399544af Mon Sep 17 00:00:00 2001 From: Snider Date: Sun, 22 Mar 2026 07:00:03 +0000 Subject: [PATCH 6/9] =?UTF-8?q?revert(agentic):=20remove=20hardcoded=20cop?= =?UTF-8?q?yReference=20=E2=80=94=20use=20embedded=20templates?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reference files are now embedded in core-agent's workspace template (pkg/lib/workspace/default/.core/reference/). No hardcoded paths needed. Co-Authored-By: Virgil --- pkg/mcp/agentic/prep.go | 35 ++--------------------------------- 1 file changed, 2 insertions(+), 33 deletions(-) diff --git a/pkg/mcp/agentic/prep.go b/pkg/mcp/agentic/prep.go index 8ea3590..d10bdc1 100644 --- a/pkg/mcp/agentic/prep.go +++ b/pkg/mcp/agentic/prep.go @@ -227,15 +227,12 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques // 8. Copy spec files into specs/ out.SpecFiles = s.copySpecs(wsDir) - // 9. Copy AX reference files into .core/reference/ - s.copyReference(wsDir) - - // 11. Write PLAN.md from template (if specified) + // 9. Write PLAN.md from template (if specified) if input.PlanTemplate != "" { s.writePlanFromTemplate(input.PlanTemplate, input.Variables, input.Task, wsDir) } - // 11. Write prompt template + // 10. Write prompt template s.writePromptTemplate(input.Template, wsDir) out.Success = true @@ -476,34 +473,6 @@ func (s *PrepSubsystem) copySpecs(wsDir string) int { return count } -// copyReference copies the AX spec and Core source files into .core/reference/ -// so dispatched agents can read the conventions and API without network access. -func (s *PrepSubsystem) copyReference(wsDir string) { - refDir := filepath.Join(wsDir, "src", ".core", "reference") - coreio.Local.EnsureDir(refDir) - - // Copy AX spec from docs repo - axSpec := filepath.Join(s.codePath, "core", "docs", "docs", "specs", "RFC-025-AGENT-EXPERIENCE.md") - if data, err := coreio.Local.Read(axSpec); err == nil { - coreio.Local.Write(filepath.Join(refDir, "RFC-025-AGENT-EXPERIENCE.md"), data) - } - - // Copy Core Go source files for API reference - coreGoDir := filepath.Join(s.codePath, "core", "go") - entries, err := coreio.Local.List(coreGoDir) - if err != nil { - return - } - for _, entry := range entries { - if entry.IsDir() || filepath.Ext(entry.Name()) != ".go" { - continue - } - if data, err := coreio.Local.Read(filepath.Join(coreGoDir, entry.Name())); err == nil { - coreio.Local.Write(filepath.Join(refDir, entry.Name()), data) - } - } -} - func (s *PrepSubsystem) generateContext(ctx context.Context, repo, wsDir string) int { if s.brainKey == "" { return 0 From 1d461593401291b3902da15269ee4f0b886c4d3e Mon Sep 17 00:00:00 2001 From: Snider Date: Sun, 22 Mar 2026 07:46:44 +0000 Subject: [PATCH 7/9] =?UTF-8?q?feat(agentic):=20pipeline=20chaining=20?= =?UTF-8?q?=E2=80=94=20review=E2=86=92fix=E2=86=92verify=20in=20one=20disp?= =?UTF-8?q?atch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DispatchInput now accepts Pipeline []PipelineStep for follow-up steps. On agent completion, the next step auto-dispatches with {{.Findings}} replaced by the previous agent's output. Enables: dispatch(review) → auto(fix with findings) → auto(verify) WorkspaceStatus stores NextSteps for the completion handler. Co-Authored-By: Virgil --- pkg/mcp/agentic/dispatch.go | 83 ++++++++++++++++++++++++++++++++++--- pkg/mcp/agentic/status.go | 27 ++++++------ 2 files changed, 91 insertions(+), 19 deletions(-) diff --git a/pkg/mcp/agentic/dispatch.go b/pkg/mcp/agentic/dispatch.go index 6a96c8a..52e1837 100644 --- a/pkg/mcp/agentic/dispatch.go +++ b/pkg/mcp/agentic/dispatch.go @@ -17,6 +17,14 @@ import ( "github.com/modelcontextprotocol/go-sdk/mcp" ) +// PipelineStep defines a follow-up dispatch that runs after the current agent completes. +// The task can contain {{.Findings}} which gets replaced with the previous agent's output. +type PipelineStep struct { + Task string `json:"task"` // Task description ({{.Findings}} = previous output) + Agent string `json:"agent,omitempty"` // Agent type (default: same as parent) + Template string `json:"template,omitempty"` // Template (default: coding) +} + // DispatchInput is the input for agentic_dispatch. type DispatchInput struct { Repo string `json:"repo"` // Target repo (e.g. "go-io") @@ -29,6 +37,7 @@ type DispatchInput struct { Persona string `json:"persona,omitempty"` // Persona: engineering/backend-architect, testing/api-tester, etc. Issue int `json:"issue,omitempty"` // Forge issue to work from DryRun bool `json:"dry_run,omitempty"` // Preview without executing + Pipeline []PipelineStep `json:"pipeline,omitempty"` // Follow-up steps: review → fix → verify } // DispatchOutput is the output for agentic_dispatch. @@ -227,17 +236,33 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, Runs: 1, }) + // Store pipeline steps in status for the completion handler + if len(input.Pipeline) > 0 { + if st, err := readStatus(wsDir); err == nil { + st.NextSteps = input.Pipeline + writeStatus(wsDir, st) + } + } + // Background goroutine: close file handle when process exits, - // update status, then drain queue if a slot opened up. + // update status, run pipeline next step, then drain queue. go func() { cmd.Wait() outFile.Close() - // Update status to completed - if st, err := readStatus(wsDir); err == nil { - st.Status = "completed" - st.PID = 0 - writeStatus(wsDir, st) + st, err := readStatus(wsDir) + if err != nil { + s.drainQueue() + return + } + + st.Status = "completed" + st.PID = 0 + writeStatus(wsDir, st) + + // Pipeline: if there are next steps, dispatch the next one + if len(st.NextSteps) > 0 { + s.runNextPipelineStep(ctx, req, wsDir, st) } // Ingest scan findings as issues @@ -256,3 +281,49 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, OutputFile: outputFile, }, nil } + +// runNextPipelineStep reads the completed agent's output, injects findings +// into the next step's task, and dispatches it. For coding→verify loops, +// it re-dispatches the fix step if verify finds non-exception findings. +func (s *PrepSubsystem) runNextPipelineStep(ctx context.Context, req *mcp.CallToolRequest, wsDir string, st *WorkspaceStatus) { + // Read the agent's output log for findings + logFiles, _ := filepath.Glob(filepath.Join(wsDir, "agent-*.log")) + findings := "" + if len(logFiles) > 0 { + if data, err := coreio.Local.Read(logFiles[len(logFiles)-1]); err == nil { + // Extract last 2000 chars as findings summary + if len(data) > 2000 { + findings = data[len(data)-2000:] + } else { + findings = data + } + } + } + + // Pop the next step + step := st.NextSteps[0] + remaining := st.NextSteps[1:] + + // Default agent/template from parent + agent := step.Agent + if agent == "" { + agent = st.Agent + } + template := step.Template + if template == "" { + template = "coding" + } + + // Replace {{.Findings}} in task with actual findings + task := strings.ReplaceAll(step.Task, "{{.Findings}}", findings) + + // Dispatch next step with remaining pipeline + s.dispatch(ctx, req, DispatchInput{ + Repo: st.Repo, + Org: st.Org, + Task: task, + Agent: agent, + Template: template, + Pipeline: remaining, + }) +} diff --git a/pkg/mcp/agentic/status.go b/pkg/mcp/agentic/status.go index 2aa4fae..028f04b 100644 --- a/pkg/mcp/agentic/status.go +++ b/pkg/mcp/agentic/status.go @@ -29,19 +29,20 @@ import ( // WorkspaceStatus represents the current state of an agent workspace. type WorkspaceStatus struct { - Status string `json:"status"` // running, completed, blocked, failed - Agent string `json:"agent"` // gemini, claude, codex - Repo string `json:"repo"` // target repo - Org string `json:"org,omitempty"` // forge org (e.g. "core") - Task string `json:"task"` // task description - Branch string `json:"branch,omitempty"` // git branch name - Issue int `json:"issue,omitempty"` // forge issue number - PID int `json:"pid,omitempty"` // process ID (if running) - StartedAt time.Time `json:"started_at"` // when dispatch started - UpdatedAt time.Time `json:"updated_at"` // last status change - Question string `json:"question,omitempty"` // from BLOCKED.md - Runs int `json:"runs"` // how many times dispatched/resumed - PRURL string `json:"pr_url,omitempty"` // pull request URL (after PR created) + Status string `json:"status"` // running, completed, blocked, failed + Agent string `json:"agent"` // gemini, claude, codex + Repo string `json:"repo"` // target repo + Org string `json:"org,omitempty"` // forge org (e.g. "core") + Task string `json:"task"` // task description + Branch string `json:"branch,omitempty"` // git branch name + Issue int `json:"issue,omitempty"` // forge issue number + PID int `json:"pid,omitempty"` // process ID (if running) + StartedAt time.Time `json:"started_at"` // when dispatch started + UpdatedAt time.Time `json:"updated_at"` // last status change + Question string `json:"question,omitempty"` // from BLOCKED.md + Runs int `json:"runs"` // how many times dispatched/resumed + PRURL string `json:"pr_url,omitempty"` // pull request URL (after PR created) + NextSteps []PipelineStep `json:"next_steps,omitempty"` // remaining pipeline steps } func writeStatus(wsDir string, status *WorkspaceStatus) error { From 2992f872f0cf8f46683c13ec367e02207c8e84d5 Mon Sep 17 00:00:00 2001 From: Snider Date: Sun, 22 Mar 2026 07:56:28 +0000 Subject: [PATCH 8/9] revert(agentic): remove pipeline chaining from dispatch MCP SDK doesn't support nested struct slices in schema generation. Pipeline orchestration will be handled at a higher level. Co-Authored-By: Virgil --- pkg/mcp/agentic/dispatch.go | 82 +++---------------------------------- pkg/mcp/agentic/status.go | 1 - 2 files changed, 6 insertions(+), 77 deletions(-) diff --git a/pkg/mcp/agentic/dispatch.go b/pkg/mcp/agentic/dispatch.go index 52e1837..ce01daf 100644 --- a/pkg/mcp/agentic/dispatch.go +++ b/pkg/mcp/agentic/dispatch.go @@ -17,14 +17,6 @@ import ( "github.com/modelcontextprotocol/go-sdk/mcp" ) -// PipelineStep defines a follow-up dispatch that runs after the current agent completes. -// The task can contain {{.Findings}} which gets replaced with the previous agent's output. -type PipelineStep struct { - Task string `json:"task"` // Task description ({{.Findings}} = previous output) - Agent string `json:"agent,omitempty"` // Agent type (default: same as parent) - Template string `json:"template,omitempty"` // Template (default: coding) -} - // DispatchInput is the input for agentic_dispatch. type DispatchInput struct { Repo string `json:"repo"` // Target repo (e.g. "go-io") @@ -37,7 +29,6 @@ type DispatchInput struct { Persona string `json:"persona,omitempty"` // Persona: engineering/backend-architect, testing/api-tester, etc. Issue int `json:"issue,omitempty"` // Forge issue to work from DryRun bool `json:"dry_run,omitempty"` // Preview without executing - Pipeline []PipelineStep `json:"pipeline,omitempty"` // Follow-up steps: review → fix → verify } // DispatchOutput is the output for agentic_dispatch. @@ -236,33 +227,17 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, Runs: 1, }) - // Store pipeline steps in status for the completion handler - if len(input.Pipeline) > 0 { - if st, err := readStatus(wsDir); err == nil { - st.NextSteps = input.Pipeline - writeStatus(wsDir, st) - } - } - // Background goroutine: close file handle when process exits, - // update status, run pipeline next step, then drain queue. + // update status, then drain queue if a slot opened up. go func() { cmd.Wait() outFile.Close() - st, err := readStatus(wsDir) - if err != nil { - s.drainQueue() - return - } - - st.Status = "completed" - st.PID = 0 - writeStatus(wsDir, st) - - // Pipeline: if there are next steps, dispatch the next one - if len(st.NextSteps) > 0 { - s.runNextPipelineStep(ctx, req, wsDir, st) + // Update status to completed + if st, err := readStatus(wsDir); err == nil { + st.Status = "completed" + st.PID = 0 + writeStatus(wsDir, st) } // Ingest scan findings as issues @@ -282,48 +257,3 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, }, nil } -// runNextPipelineStep reads the completed agent's output, injects findings -// into the next step's task, and dispatches it. For coding→verify loops, -// it re-dispatches the fix step if verify finds non-exception findings. -func (s *PrepSubsystem) runNextPipelineStep(ctx context.Context, req *mcp.CallToolRequest, wsDir string, st *WorkspaceStatus) { - // Read the agent's output log for findings - logFiles, _ := filepath.Glob(filepath.Join(wsDir, "agent-*.log")) - findings := "" - if len(logFiles) > 0 { - if data, err := coreio.Local.Read(logFiles[len(logFiles)-1]); err == nil { - // Extract last 2000 chars as findings summary - if len(data) > 2000 { - findings = data[len(data)-2000:] - } else { - findings = data - } - } - } - - // Pop the next step - step := st.NextSteps[0] - remaining := st.NextSteps[1:] - - // Default agent/template from parent - agent := step.Agent - if agent == "" { - agent = st.Agent - } - template := step.Template - if template == "" { - template = "coding" - } - - // Replace {{.Findings}} in task with actual findings - task := strings.ReplaceAll(step.Task, "{{.Findings}}", findings) - - // Dispatch next step with remaining pipeline - s.dispatch(ctx, req, DispatchInput{ - Repo: st.Repo, - Org: st.Org, - Task: task, - Agent: agent, - Template: template, - Pipeline: remaining, - }) -} diff --git a/pkg/mcp/agentic/status.go b/pkg/mcp/agentic/status.go index 028f04b..980c2f0 100644 --- a/pkg/mcp/agentic/status.go +++ b/pkg/mcp/agentic/status.go @@ -42,7 +42,6 @@ type WorkspaceStatus struct { Question string `json:"question,omitempty"` // from BLOCKED.md Runs int `json:"runs"` // how many times dispatched/resumed PRURL string `json:"pr_url,omitempty"` // pull request URL (after PR created) - NextSteps []PipelineStep `json:"next_steps,omitempty"` // remaining pipeline steps } func writeStatus(wsDir string, status *WorkspaceStatus) error { From d3c721043318f90e65b9ae525b3bc53608b1c397 Mon Sep 17 00:00:00 2001 From: Virgil Date: Mon, 23 Mar 2026 14:33:35 +0000 Subject: [PATCH 9/9] fix(mcp): harden transport auth and workspace prep path validation Co-Authored-By: Virgil --- cmd/mcpcmd/cmd_mcp.go | 19 +++-- go.mod | 3 +- go.sum | 2 - pkg/mcp/agentic/prep.go | 128 +++++++++++++++++++++++-------- pkg/mcp/agentic/prep_test.go | 96 +++++++++++++++++++++++ pkg/mcp/tools_process_ci_test.go | 30 ++++---- pkg/mcp/transport_http.go | 22 ++++-- pkg/mcp/transport_http_test.go | 26 +++++-- 8 files changed, 252 insertions(+), 74 deletions(-) create mode 100644 pkg/mcp/agentic/prep_test.go diff --git a/cmd/mcpcmd/cmd_mcp.go b/cmd/mcpcmd/cmd_mcp.go index 3c2c1c0..3524dd9 100644 --- a/cmd/mcpcmd/cmd_mcp.go +++ b/cmd/mcpcmd/cmd_mcp.go @@ -61,24 +61,23 @@ func AddMCPCommands(root *cli.Command) { } func runServe() error { - // Build MCP service options - var opts []mcp.Option + opts := mcp.Options{} if workspaceFlag != "" { - opts = append(opts, mcp.WithWorkspaceRoot(workspaceFlag)) + opts.WorkspaceRoot = workspaceFlag } else { // Explicitly unrestricted when no workspace specified - opts = append(opts, mcp.WithWorkspaceRoot("")) + opts.Unrestricted = true } - // Register OpenBrain subsystem (direct HTTP to api.lthn.sh) - opts = append(opts, mcp.WithSubsystem(brain.NewDirect())) - - // Register agentic subsystem (workspace prep, agent orchestration) - opts = append(opts, mcp.WithSubsystem(agentic.NewPrep())) + // Register OpenBrain and agentic subsystems + opts.Subsystems = []mcp.Subsystem{ + brain.NewDirect(), + agentic.NewPrep(), + } // Create the MCP service - svc, err := mcp.New(opts...) + svc, err := mcp.New(opts) if err != nil { return cli.Wrap(err, "create MCP service") } diff --git a/go.mod b/go.mod index 8960f2b..e231f69 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,9 @@ module forge.lthn.ai/core/mcp go 1.26.0 require ( - dappco.re/go/core v0.4.7 forge.lthn.ai/core/api v0.1.5 forge.lthn.ai/core/cli v0.3.7 + forge.lthn.ai/core/go v0.3.3 forge.lthn.ai/core/go-ai v0.1.12 forge.lthn.ai/core/go-io v0.1.7 forge.lthn.ai/core/go-log v0.0.4 @@ -21,7 +21,6 @@ require ( ) require ( - forge.lthn.ai/core/go v0.3.3 // indirect forge.lthn.ai/core/go-i18n v0.1.7 // indirect forge.lthn.ai/core/go-inference v0.1.6 // indirect github.com/99designs/gqlgen v0.17.88 // indirect diff --git a/go.sum b/go.sum index 77b90fe..cba2eb1 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -dappco.re/go/core v0.4.7 h1:KmIA/2lo6rl1NMtLrKqCWfMlUqpDZYH3q0/d10dTtGA= -dappco.re/go/core v0.4.7/go.mod h1:f2/tBZ3+3IqDrg2F5F598llv0nmb/4gJVCFzM5geE4A= forge.lthn.ai/core/api v0.1.5 h1:NwZrcOyBjaiz5/cn0n0tnlMUodi8Or6FHMx59C7Kv2o= forge.lthn.ai/core/api v0.1.5/go.mod h1:PBnaWyOVXSOGy+0x2XAPUFMYJxQ2CNhppia/D06ZPII= forge.lthn.ai/core/cli v0.3.7 h1:1GrbaGg0wDGHr6+klSbbGyN/9sSbHvFbdySJznymhwg= diff --git a/pkg/mcp/agentic/prep.go b/pkg/mcp/agentic/prep.go index d10bdc1..d241e35 100644 --- a/pkg/mcp/agentic/prep.go +++ b/pkg/mcp/agentic/prep.go @@ -25,13 +25,13 @@ import ( // PrepSubsystem provides agentic MCP tools. type PrepSubsystem struct { - forgeURL string - forgeToken string - brainURL string - brainKey string - specsPath string - codePath string - client *http.Client + forgeURL string + forgeToken string + brainURL string + brainKey string + specsPath string + codePath string + client *http.Client } // NewPrep creates an agentic subsystem. @@ -51,13 +51,13 @@ func NewPrep() *PrepSubsystem { } return &PrepSubsystem{ - forgeURL: envOr("FORGE_URL", "https://forge.lthn.ai"), - forgeToken: forgeToken, - brainURL: envOr("CORE_BRAIN_URL", "https://api.lthn.sh"), - brainKey: brainKey, - specsPath: envOr("SPECS_PATH", filepath.Join(home, "Code", "host-uk", "specs")), - codePath: envOr("CODE_PATH", filepath.Join(home, "Code")), - client: &http.Client{Timeout: 30 * time.Second}, + forgeURL: envOr("FORGE_URL", "https://forge.lthn.ai"), + forgeToken: forgeToken, + brainURL: envOr("CORE_BRAIN_URL", "https://api.lthn.sh"), + brainKey: brainKey, + specsPath: envOr("SPECS_PATH", filepath.Join(home, "Code", "host-uk", "specs")), + codePath: envOr("CODE_PATH", filepath.Join(home, "Code")), + client: &http.Client{Timeout: 30 * time.Second}, } } @@ -68,6 +68,42 @@ func envOr(key, fallback string) string { return fallback } +func sanitizeRepoPathSegment(value, field string, allowSubdirs bool) (string, error) { + if strings.TrimSpace(value) != value { + return "", coreerr.E("prepWorkspace", field+" contains whitespace", nil) + } + if value == "" { + return "", nil + } + if strings.Contains(value, "\\") { + return "", coreerr.E("prepWorkspace", field+" contains invalid path separator", nil) + } + + parts := strings.Split(value, "/") + if !allowSubdirs && len(parts) != 1 { + return "", coreerr.E("prepWorkspace", field+" may not contain subdirectories", nil) + } + + for _, part := range parts { + if part == "" || part == "." || part == ".." { + return "", coreerr.E("prepWorkspace", field+" contains invalid path segment", nil) + } + for _, r := range part { + switch { + case r >= 'a' && r <= 'z', + r >= 'A' && r <= 'Z', + r >= '0' && r <= '9', + r == '-' || r == '_' || r == '.': + continue + default: + return "", coreerr.E("prepWorkspace", field+" contains invalid characters", nil) + } + } + } + + return value, nil +} + // Name implements mcp.Subsystem. func (s *PrepSubsystem) Name() string { return "agentic" } @@ -117,20 +153,41 @@ type PrepInput struct { // PrepOutput is the output for agentic_prep_workspace. type PrepOutput struct { - Success bool `json:"success"` - WorkspaceDir string `json:"workspace_dir"` - WikiPages int `json:"wiki_pages"` - SpecFiles int `json:"spec_files"` - Memories int `json:"memories"` - Consumers int `json:"consumers"` - ClaudeMd bool `json:"claude_md"` - GitLog int `json:"git_log_entries"` + Success bool `json:"success"` + WorkspaceDir string `json:"workspace_dir"` + WikiPages int `json:"wiki_pages"` + SpecFiles int `json:"spec_files"` + Memories int `json:"memories"` + Consumers int `json:"consumers"` + ClaudeMd bool `json:"claude_md"` + GitLog int `json:"git_log_entries"` } func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolRequest, input PrepInput) (*mcp.CallToolResult, PrepOutput, error) { if input.Repo == "" { return nil, PrepOutput{}, coreerr.E("prepWorkspace", "repo is required", nil) } + + repo, err := sanitizeRepoPathSegment(input.Repo, "repo", false) + if err != nil { + return nil, PrepOutput{}, err + } + input.Repo = repo + + planTemplate, err := sanitizeRepoPathSegment(input.PlanTemplate, "plan_template", false) + if err != nil { + return nil, PrepOutput{}, err + } + input.PlanTemplate = planTemplate + + persona := input.Persona + if persona != "" { + persona, err = sanitizeRepoPathSegment(persona, "persona", true) + if err != nil { + return nil, PrepOutput{}, err + } + } + if input.Org == "" { input.Org = "core" } @@ -154,7 +211,9 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques // 1. Clone repo into src/ and create feature branch srcDir := filepath.Join(wsDir, "src") cloneCmd := exec.CommandContext(ctx, "git", "clone", repoPath, srcDir) - cloneCmd.Run() + if err := cloneCmd.Run(); err != nil { + return nil, PrepOutput{}, coreerr.E("prepWorkspace", "failed to clone repository", err) + } // Create feature branch taskSlug := strings.Map(func(r rune) rune { @@ -170,11 +229,14 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques taskSlug = taskSlug[:40] } taskSlug = strings.Trim(taskSlug, "-") - branchName := fmt.Sprintf("agent/%s", taskSlug) - - branchCmd := exec.CommandContext(ctx, "git", "checkout", "-b", branchName) - branchCmd.Dir = srcDir - branchCmd.Run() + if taskSlug != "" { + branchName := fmt.Sprintf("agent/%s", taskSlug) + branchCmd := exec.CommandContext(ctx, "git", "checkout", "-b", branchName) + branchCmd.Dir = srcDir + if err := branchCmd.Run(); err != nil { + return nil, PrepOutput{}, coreerr.E("prepWorkspace", "failed to create branch", err) + } + } // Create context dirs inside src/ coreio.Local.EnsureDir(filepath.Join(srcDir, "kb")) @@ -196,8 +258,8 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques } // Copy persona if specified - if input.Persona != "" { - personaPath := filepath.Join(s.codePath, "core", "agent", "prompts", "personas", input.Persona+".md") + if persona != "" { + personaPath := filepath.Join(s.codePath, "core", "agent", "prompts", "personas", persona+".md") if data, err := coreio.Local.Read(personaPath); err == nil { coreio.Local.Write(filepath.Join(wsDir, "src", "PERSONA.md"), data) } @@ -338,9 +400,9 @@ func (s *PrepSubsystem) writePlanFromTemplate(templateSlug string, variables map Description string `yaml:"description"` Guidelines []string `yaml:"guidelines"` Phases []struct { - Name string `yaml:"name"` - Description string `yaml:"description"` - Tasks []any `yaml:"tasks"` + Name string `yaml:"name"` + Description string `yaml:"description"` + Tasks []any `yaml:"tasks"` } `yaml:"phases"` } diff --git a/pkg/mcp/agentic/prep_test.go b/pkg/mcp/agentic/prep_test.go new file mode 100644 index 0000000..51f3a60 --- /dev/null +++ b/pkg/mcp/agentic/prep_test.go @@ -0,0 +1,96 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "strings" + "testing" +) + +func TestSanitizeRepoPathSegment_Good(t *testing.T) { + t.Run("repo", func(t *testing.T) { + value, err := sanitizeRepoPathSegment("go-io", "repo", false) + if err != nil { + t.Fatalf("expected valid repo name, got error: %v", err) + } + if value != "go-io" { + t.Fatalf("expected normalized value, got: %q", value) + } + }) + + t.Run("persona", func(t *testing.T) { + value, err := sanitizeRepoPathSegment("engineering/backend-architect", "persona", true) + if err != nil { + t.Fatalf("expected valid persona path, got error: %v", err) + } + if value != "engineering/backend-architect" { + t.Fatalf("expected persona path, got: %q", value) + } + }) +} + +func TestSanitizeRepoPathSegment_Bad(t *testing.T) { + cases := []struct { + name string + value string + allowPath bool + }{ + {"repo segment traversal", "../repo", false}, + {"repo nested path", "team/repo", false}, + {"plan template traversal", "../secret", false}, + {"persona traversal", "engineering/../../admin", true}, + {"backslash", "org\\repo", false}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + _, err := sanitizeRepoPathSegment(tc.value, tc.name, tc.allowPath) + if err == nil { + t.Fatal("expected error") + } + }) + } +} + +func TestPrepWorkspace_Bad_BadRepoTraversal(t *testing.T) { + s := &PrepSubsystem{codePath: t.TempDir()} + + _, _, err := s.prepWorkspace(context.Background(), nil, PrepInput{Repo: "../repo"}) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(strings.ToLower(err.Error()), "repo") { + t.Fatalf("expected repo error, got %q", err) + } +} + +func TestPrepWorkspace_Bad_BadPersonaTraversal(t *testing.T) { + s := &PrepSubsystem{codePath: t.TempDir()} + + _, _, err := s.prepWorkspace(context.Background(), nil, PrepInput{ + Repo: "repo", + Persona: "engineering/../../admin", + }) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(strings.ToLower(err.Error()), "persona") { + t.Fatalf("expected persona error, got %q", err) + } +} + +func TestPrepWorkspace_Bad_BadPlanTemplateTraversal(t *testing.T) { + s := &PrepSubsystem{codePath: t.TempDir()} + + _, _, err := s.prepWorkspace(context.Background(), nil, PrepInput{ + Repo: "repo", + PlanTemplate: "../secret", + }) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(strings.ToLower(err.Error()), "plan_template") { + t.Fatalf("expected plan template error, got %q", err) + } +} diff --git a/pkg/mcp/tools_process_ci_test.go b/pkg/mcp/tools_process_ci_test.go index 79ebe69..4ab8dbe 100644 --- a/pkg/mcp/tools_process_ci_test.go +++ b/pkg/mcp/tools_process_ci_test.go @@ -6,36 +6,34 @@ import ( "testing" "time" - "dappco.re/go/core" "forge.lthn.ai/core/go-process" + core "forge.lthn.ai/core/go/pkg/core" ) // newTestProcessService creates a real process.Service backed by a core.Core for CI tests. func newTestProcessService(t *testing.T) *process.Service { t.Helper() - c := core.New() - raw, err := process.NewService(process.Options{})(c) + c, err := core.New( + core.WithName("process", process.NewService(process.Options{})), + ) if err != nil { t.Fatalf("Failed to create process service: %v", err) } - svc := raw.(*process.Service) - resultFrom := func(err error) core.Result { - if err != nil { - return core.Result{Value: err} - } - return core.Result{OK: true} + svc, err := core.ServiceFor[*process.Service](c, "process") + if err != nil { + t.Fatalf("Failed to get process service: %v", err) } - c.Service("process", core.Service{ - OnStart: func() core.Result { return resultFrom(svc.OnStartup(context.Background())) }, - OnStop: func() core.Result { return resultFrom(svc.OnShutdown(context.Background())) }, + + if err := svc.OnStartup(context.Background()); err != nil { + t.Fatalf("Failed to start process service: %v", err) + } + t.Cleanup(func() { + _ = svc.OnShutdown(context.Background()) + core.ClearInstance() }) - if r := c.ServiceStartup(context.Background(), nil); !r.OK { - t.Fatalf("Failed to start core: %v", r.Value) - } - t.Cleanup(func() { c.ServiceShutdown(context.Background()) }) return svc } diff --git a/pkg/mcp/transport_http.go b/pkg/mcp/transport_http.go index 85840ad..cd25417 100644 --- a/pkg/mcp/transport_http.go +++ b/pkg/mcp/transport_http.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "os" + "strings" "time" coreerr "forge.lthn.ai/core/go-log" @@ -81,18 +82,27 @@ func (s *Service) ServeHTTP(ctx context.Context, addr string) error { } // withAuth wraps an http.Handler with Bearer token authentication. -// If token is empty, authentication is disabled (passthrough). +// If token is empty, requests are rejected. func withAuth(token string, next http.Handler) http.Handler { - if token == "" { - return next - } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.TrimSpace(token) == "" { + w.Header().Set("WWW-Authenticate", `Bearer`) + http.Error(w, `{"error":"authentication not configured"}`, http.StatusUnauthorized) + return + } + auth := r.Header.Get("Authorization") - if len(auth) < 7 || auth[:7] != "Bearer " { + if !strings.HasPrefix(auth, "Bearer ") { http.Error(w, `{"error":"missing Bearer token"}`, http.StatusUnauthorized) return } - provided := auth[7:] + + provided := strings.TrimSpace(strings.TrimPrefix(auth, "Bearer ")) + if len(provided) == 0 { + http.Error(w, `{"error":"missing Bearer token"}`, http.StatusUnauthorized) + return + } + if subtle.ConstantTimeCompare([]byte(provided), []byte(token)) != 1 { http.Error(w, `{"error":"invalid token"}`, http.StatusUnauthorized) return diff --git a/pkg/mcp/transport_http_test.go b/pkg/mcp/transport_http_test.go index 1172d82..ec8dc10 100644 --- a/pkg/mcp/transport_http_test.go +++ b/pkg/mcp/transport_http_test.go @@ -157,19 +157,35 @@ func TestWithAuth_Bad_MissingToken(t *testing.T) { } } -func TestWithAuth_Good_EmptyTokenPassthrough(t *testing.T) { +func TestWithAuth_Bad_EmptyConfiguredToken(t *testing.T) { handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }) - // Empty token disables auth + // Empty token now requires explicit configuration wrapped := withAuth("", handler) req, _ := http.NewRequest("GET", "/", nil) rr := &fakeResponseWriter{code: 200} wrapped.ServeHTTP(rr, req) - if rr.code != 200 { - t.Errorf("expected 200 with auth disabled, got %d", rr.code) + if rr.code != 401 { + t.Errorf("expected 401 with empty configured token, got %d", rr.code) + } +} + +func TestWithAuth_Bad_NonBearerToken(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + }) + + wrapped := withAuth("my-token", handler) + + req, _ := http.NewRequest("GET", "/", nil) + req.Header.Set("Authorization", "Token my-token") + rr := &fakeResponseWriter{code: 200} + wrapped.ServeHTTP(rr, req) + if rr.code != 401 { + t.Errorf("expected 401 with non-Bearer auth, got %d", rr.code) } } @@ -231,4 +247,4 @@ func (f *fakeResponseWriter) Header() http.Header { } func (f *fakeResponseWriter) Write(b []byte) (int, error) { return len(b), nil } -func (f *fakeResponseWriter) WriteHeader(code int) { f.code = code } +func (f *fakeResponseWriter) WriteHeader(code int) { f.code = code }