revert(agentic): remove pipeline chaining from dispatch

MCP SDK doesn't support nested struct slices in schema generation.
Pipeline orchestration will be handled at a higher level.

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-22 07:56:28 +00:00
parent 1d46159340
commit 2992f872f0
2 changed files with 6 additions and 77 deletions

View file

@ -17,14 +17,6 @@ import (
"github.com/modelcontextprotocol/go-sdk/mcp"
)
// PipelineStep defines a follow-up dispatch that runs after the current agent completes.
// The task can contain {{.Findings}} which gets replaced with the previous agent's output.
type PipelineStep struct {
Task string `json:"task"` // Task description ({{.Findings}} = previous output)
Agent string `json:"agent,omitempty"` // Agent type (default: same as parent)
Template string `json:"template,omitempty"` // Template (default: coding)
}
// DispatchInput is the input for agentic_dispatch.
type DispatchInput struct {
Repo string `json:"repo"` // Target repo (e.g. "go-io")
@ -37,7 +29,6 @@ type DispatchInput struct {
Persona string `json:"persona,omitempty"` // Persona: engineering/backend-architect, testing/api-tester, etc.
Issue int `json:"issue,omitempty"` // Forge issue to work from
DryRun bool `json:"dry_run,omitempty"` // Preview without executing
Pipeline []PipelineStep `json:"pipeline,omitempty"` // Follow-up steps: review → fix → verify
}
// DispatchOutput is the output for agentic_dispatch.
@ -236,33 +227,17 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
Runs: 1,
})
// Store pipeline steps in status for the completion handler
if len(input.Pipeline) > 0 {
if st, err := readStatus(wsDir); err == nil {
st.NextSteps = input.Pipeline
writeStatus(wsDir, st)
}
}
// Background goroutine: close file handle when process exits,
// update status, run pipeline next step, then drain queue.
// update status, then drain queue if a slot opened up.
go func() {
cmd.Wait()
outFile.Close()
st, err := readStatus(wsDir)
if err != nil {
s.drainQueue()
return
}
st.Status = "completed"
st.PID = 0
writeStatus(wsDir, st)
// Pipeline: if there are next steps, dispatch the next one
if len(st.NextSteps) > 0 {
s.runNextPipelineStep(ctx, req, wsDir, st)
// Update status to completed
if st, err := readStatus(wsDir); err == nil {
st.Status = "completed"
st.PID = 0
writeStatus(wsDir, st)
}
// Ingest scan findings as issues
@ -282,48 +257,3 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
}, nil
}
// runNextPipelineStep reads the completed agent's output, injects findings
// into the next step's task, and dispatches it. For coding→verify loops,
// it re-dispatches the fix step if verify finds non-exception findings.
func (s *PrepSubsystem) runNextPipelineStep(ctx context.Context, req *mcp.CallToolRequest, wsDir string, st *WorkspaceStatus) {
// Read the agent's output log for findings
logFiles, _ := filepath.Glob(filepath.Join(wsDir, "agent-*.log"))
findings := ""
if len(logFiles) > 0 {
if data, err := coreio.Local.Read(logFiles[len(logFiles)-1]); err == nil {
// Extract last 2000 chars as findings summary
if len(data) > 2000 {
findings = data[len(data)-2000:]
} else {
findings = data
}
}
}
// Pop the next step
step := st.NextSteps[0]
remaining := st.NextSteps[1:]
// Default agent/template from parent
agent := step.Agent
if agent == "" {
agent = st.Agent
}
template := step.Template
if template == "" {
template = "coding"
}
// Replace {{.Findings}} in task with actual findings
task := strings.ReplaceAll(step.Task, "{{.Findings}}", findings)
// Dispatch next step with remaining pipeline
s.dispatch(ctx, req, DispatchInput{
Repo: st.Repo,
Org: st.Org,
Task: task,
Agent: agent,
Template: template,
Pipeline: remaining,
})
}

View file

@ -42,7 +42,6 @@ type WorkspaceStatus struct {
Question string `json:"question,omitempty"` // from BLOCKED.md
Runs int `json:"runs"` // how many times dispatched/resumed
PRURL string `json:"pr_url,omitempty"` // pull request URL (after PR created)
NextSteps []PipelineStep `json:"next_steps,omitempty"` // remaining pipeline steps
}
func writeStatus(wsDir string, status *WorkspaceStatus) error {