diff --git a/pkg/agentic/commands.go b/pkg/agentic/commands.go index a905c74..123bd2c 100644 --- a/pkg/agentic/commands.go +++ b/pkg/agentic/commands.go @@ -111,7 +111,7 @@ func (s *PrepSubsystem) cmdRunTask(options core.Options) core.Result { } func (s *PrepSubsystem) cmdRunFlow(options core.Options) core.Result { - return s.runFlowCommand(options, "run flow") + return s.runFlowExecutionCommand(options, "run flow") } func (s *PrepSubsystem) cmdFlowPreview(options core.Options) core.Result { @@ -1021,13 +1021,17 @@ func parseIntString(s string) int { } type FlowRunOutput struct { - Success bool `json:"success"` - Source string `json:"source,omitempty"` - Name string `json:"name,omitempty"` - Description string `json:"description,omitempty"` - Steps int `json:"steps,omitempty"` - ResolvedSteps int `json:"resolved_steps,omitempty"` - Parsed bool `json:"parsed,omitempty"` + Success bool `json:"success"` + Source string `json:"source,omitempty"` + Name string `json:"name,omitempty"` + Description string `json:"description,omitempty"` + Steps int `json:"steps,omitempty"` + ResolvedSteps int `json:"resolved_steps,omitempty"` + Parsed bool `json:"parsed,omitempty"` + Executed int `json:"executed,omitempty"` + Passed int `json:"passed,omitempty"` + Failed int `json:"failed,omitempty"` + StepResults []FlowRunStepOutput `json:"step_results,omitempty"` } type flowDefinition struct { @@ -1037,17 +1041,20 @@ type flowDefinition struct { } type flowDefinitionStep struct { - Name string `yaml:"name"` - Run string `yaml:"run"` - Flow string `yaml:"flow"` - Agent string `yaml:"agent"` - Prompt string `yaml:"prompt"` - Template string `yaml:"template"` - Timeout string `yaml:"timeout"` - When string `yaml:"when"` - Output string `yaml:"output"` - Gate string `yaml:"gate"` - Parallel []flowDefinitionStep `yaml:"parallel"` + Name string `yaml:"name"` + Cmd string `yaml:"cmd"` + Args []string `yaml:"args"` + Run string `yaml:"run"` + Flow string `yaml:"flow"` + Agent string `yaml:"agent"` + Prompt string `yaml:"prompt"` + Template string `yaml:"template"` + Timeout string `yaml:"timeout"` + When string `yaml:"when"` + Output string `yaml:"output"` + Gate string `yaml:"gate"` + ContinueOnError bool `yaml:"continueOnError"` + Parallel []flowDefinitionStep `yaml:"parallel"` } type flowRunDocument struct { @@ -1109,7 +1116,7 @@ func parseFlowDefinition(content string) (flowDefinition, error) { if err := yaml.Unmarshal([]byte(content), &definition); err != nil { return flowDefinition{}, core.E("agentic.parseFlowDefinition", "invalid flow definition", err) } - if definition.Name == "" || len(definition.Steps) == 0 { + if definition.Name == "" { return flowDefinition{}, core.E("agentic.parseFlowDefinition", "invalid flow definition", nil) } return definition, nil @@ -1132,6 +1139,9 @@ func flowStepSummary(step flowDefinitionStep) string { if label == "" { label = core.Trim(step.Flow) } + if label == "" { + label = core.Trim(step.Cmd) + } if label == "" { label = core.Trim(step.Agent) } @@ -1145,6 +1155,8 @@ func flowStepSummary(step flowDefinitionStep) string { switch { case step.Flow != "": return core.Concat(label, ": flow ", step.Flow) + case step.Cmd != "": + return core.Concat(label, ": cmd ", flowStepCommandLine(step)) case step.Agent != "": return core.Concat(label, ": agent ", step.Agent) case step.Run != "": diff --git a/pkg/agentic/commands_flow_test.go b/pkg/agentic/commands_flow_test.go index 0ad80b1..2741ddf 100644 --- a/pkg/agentic/commands_flow_test.go +++ b/pkg/agentic/commands_flow_test.go @@ -3,15 +3,24 @@ package agentic import ( + "os" "testing" - "time" core "dappco.re/go/core" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestCommandsFlow_CmdRunFlow_Good_ReadsYamlFlowFile(t *testing.T) { +func newFlowCommandPrep() (*PrepSubsystem, *core.Core) { + c := core.New() + return &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + }, c +} + +func TestCommandsFlow_CmdFlowPreview_Good_ReadsYamlFlowFile(t *testing.T) { dir := t.TempDir() flowPath := core.JoinPath(dir, "pkg", "lib", "flow", "verify") require.True(t, fs.EnsureDir(flowPath).OK) @@ -29,7 +38,7 @@ func TestCommandsFlow_CmdRunFlow_Good_ReadsYamlFlowFile(t *testing.T) { s := newTestPrep(t) output := captureStdout(t, func() { - r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath})) + r := s.cmdFlowPreview(core.NewOptions(core.Option{Key: "_arg", Value: filePath})) require.True(t, r.OK) flowOutput, ok := r.Value.(FlowRunOutput) @@ -46,6 +55,71 @@ func TestCommandsFlow_CmdRunFlow_Good_ReadsYamlFlowFile(t *testing.T) { assert.Contains(t, output, "verify: flow verify/go-qa.yaml") } +func TestCommandsFlow_CmdRunFlow_Good_ExecutesSequentialSteps(t *testing.T) { + dir := t.TempDir() + filePath := core.JoinPath(dir, "execute-flow.yaml") + require.True(t, fs.Write(filePath, core.Concat( + "name: Execute Flow\n", + "description: Run registered commands\n", + "steps:\n", + " - name: first\n", + " cmd: flow/test-first\n", + " - name: second\n", + " cmd: flow/test-second\n", + " args:\n", + " - --mode=fast\n", + " - name: third\n", + " cmd: flow/test-third\n", + " args:\n", + " - payload\n", + )).OK) + + s, c := newFlowCommandPrep() + invoked := []string{} + require.True(t, c.Command("flow/test-first", core.Command{Action: func(_ core.Options) core.Result { + invoked = append(invoked, "first") + core.Print(nil, "first stdout") + return core.Result{OK: true} + }}).OK) + require.True(t, c.Command("flow/test-second", core.Command{Action: func(options core.Options) core.Result { + invoked = append(invoked, "second") + assert.Equal(t, "fast", options.String("mode")) + _, _ = os.Stderr.WriteString("second stderr\n") + return core.Result{OK: true} + }}).OK) + require.True(t, c.Command("flow/test-third", core.Command{Action: func(options core.Options) core.Result { + invoked = append(invoked, "third") + assert.Equal(t, "payload", options.String("_arg")) + return core.Result{OK: true} + }}).OK) + + output := captureStdout(t, func() { + r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath})) + require.True(t, r.OK) + + flowOutput, ok := r.Value.(FlowRunOutput) + require.True(t, ok) + assert.True(t, flowOutput.Success) + assert.Equal(t, filePath, flowOutput.Source) + assert.Equal(t, "Execute Flow", flowOutput.Name) + assert.Equal(t, "Run registered commands", flowOutput.Description) + assert.Equal(t, 3, flowOutput.Steps) + assert.Equal(t, 3, flowOutput.Executed) + assert.Equal(t, 3, flowOutput.Passed) + assert.Equal(t, 0, flowOutput.Failed) + require.Len(t, flowOutput.StepResults, 3) + assert.Equal(t, "first", flowOutput.StepResults[0].Name) + assert.Equal(t, "flow/test-second", flowOutput.StepResults[1].Command) + assert.Equal(t, "second stderr\n", flowOutput.StepResults[1].Stderr) + }) + + assert.Equal(t, []string{"first", "second", "third"}, invoked) + assert.Contains(t, output, "steps: 3") + assert.Contains(t, output, "first stdout") + assert.Contains(t, output, "second stderr") + assert.Contains(t, output, "totals: ran=3 passed=3 failed=0") +} + func TestCommandsFlow_CmdRunFlow_Good_RendersVariablesAndDryRun(t *testing.T) { dir := t.TempDir() flowPath := core.JoinPath(dir, "pkg", "lib", "flow", "verify") @@ -57,7 +131,9 @@ func TestCommandsFlow_CmdRunFlow_Good_RendersVariablesAndDryRun(t *testing.T) { "description: Build {{ repo }}\n", "steps:\n", " - name: build\n", - " run: go build ./...\n", + " cmd: flow/test-build\n", + " args:\n", + " - --repo={{ repo }}\n", )).OK) s := newTestPrep(t) @@ -80,9 +156,10 @@ func TestCommandsFlow_CmdRunFlow_Good_RendersVariablesAndDryRun(t *testing.T) { assert.Contains(t, output, "dry-run: true") assert.Contains(t, output, "vars: 1") assert.Contains(t, output, "desc: Build core/go") + assert.Contains(t, output, "build: cmd flow/test-build --repo=core/go") } -func TestCommandsFlow_CmdRunFlow_Good_ResolvesNestedFlowReferences(t *testing.T) { +func TestCommandsFlow_CmdFlowPreview_Good_ResolvesNestedFlowReferences(t *testing.T) { dir := t.TempDir() flowRoot := core.JoinPath(dir, "pkg", "lib", "flow") require.True(t, fs.EnsureDir(core.JoinPath(flowRoot, "verify")).OK) @@ -107,7 +184,7 @@ func TestCommandsFlow_CmdRunFlow_Good_ResolvesNestedFlowReferences(t *testing.T) s := newTestPrep(t) output := captureStdout(t, func() { - r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: rootPath})) + r := s.cmdFlowPreview(core.NewOptions(core.Option{Key: "_arg", Value: rootPath})) require.True(t, r.OK) flowOutput, ok := r.Value.(FlowRunOutput) @@ -132,6 +209,34 @@ func TestCommandsFlow_CmdRunFlow_Bad_MissingPath(t *testing.T) { assert.Contains(t, err.Error(), "flow path or slug is required") } +func TestCommandsFlow_CmdRunFlow_Bad_RejectsUnknownCommandAtParseTime(t *testing.T) { + dir := t.TempDir() + filePath := core.JoinPath(dir, "missing-command.yaml") + require.True(t, fs.Write(filePath, core.Concat( + "name: Missing Command\n", + "steps:\n", + " - name: first\n", + " cmd: flow/known\n", + " - name: second\n", + " cmd: flow/missing\n", + )).OK) + + s, c := newFlowCommandPrep() + invoked := []string{} + require.True(t, c.Command("flow/known", core.Command{Action: func(_ core.Options) core.Result { + invoked = append(invoked, "known") + return core.Result{OK: true} + }}).OK) + + r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath})) + require.False(t, r.OK) + + err, ok := r.Value.(error) + require.True(t, ok) + assert.Contains(t, err.Error(), "references unknown command: flow/missing") + assert.Empty(t, invoked) +} + func TestCommandsFlow_CmdRunFlow_Ugly_InvalidYaml(t *testing.T) { dir := t.TempDir() filePath := core.JoinPath(dir, "broken-flow.yaml") @@ -146,6 +251,133 @@ func TestCommandsFlow_CmdRunFlow_Ugly_InvalidYaml(t *testing.T) { assert.Contains(t, err.Error(), "invalid flow definition") } +func TestCommandsFlow_CmdRunFlow_Ugly_StopsOnFirstFailure(t *testing.T) { + dir := t.TempDir() + filePath := core.JoinPath(dir, "stops-on-failure.yaml") + require.True(t, fs.Write(filePath, core.Concat( + "name: Stop On Failure\n", + "steps:\n", + " - name: first\n", + " cmd: flow/first\n", + " - name: second\n", + " cmd: flow/fail\n", + " - name: third\n", + " cmd: flow/third\n", + )).OK) + + s, c := newFlowCommandPrep() + invoked := []string{} + require.True(t, c.Command("flow/first", core.Command{Action: func(_ core.Options) core.Result { + invoked = append(invoked, "first") + return core.Result{OK: true} + }}).OK) + require.True(t, c.Command("flow/fail", core.Command{Action: func(_ core.Options) core.Result { + invoked = append(invoked, "second") + return core.Result{Value: "boom", OK: false} + }}).OK) + require.True(t, c.Command("flow/third", core.Command{Action: func(_ core.Options) core.Result { + invoked = append(invoked, "third") + return core.Result{OK: true} + }}).OK) + + output := captureStdout(t, func() { + r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath})) + require.False(t, r.OK) + + flowOutput, ok := r.Value.(FlowRunOutput) + require.True(t, ok) + assert.False(t, flowOutput.Success) + assert.Equal(t, 2, flowOutput.Executed) + assert.Equal(t, 1, flowOutput.Passed) + assert.Equal(t, 1, flowOutput.Failed) + require.Len(t, flowOutput.StepResults, 2) + assert.Equal(t, "boom", flowOutput.StepResults[1].Error) + }) + + assert.Equal(t, []string{"first", "second"}, invoked) + assert.Contains(t, output, "second: failed") + assert.Contains(t, output, "totals: ran=2 passed=1 failed=1") + assert.NotContains(t, output, "third: passed") +} + +func TestCommandsFlow_CmdRunFlow_Ugly_ContinueOnErrorRunsRemainingSteps(t *testing.T) { + dir := t.TempDir() + filePath := core.JoinPath(dir, "continue-on-error.yaml") + require.True(t, fs.Write(filePath, core.Concat( + "name: Continue On Error\n", + "steps:\n", + " - name: first\n", + " cmd: flow/first\n", + " - name: second\n", + " cmd: flow/fail\n", + " continueOnError: true\n", + " - name: third\n", + " cmd: flow/third\n", + )).OK) + + s, c := newFlowCommandPrep() + invoked := []string{} + require.True(t, c.Command("flow/first", core.Command{Action: func(_ core.Options) core.Result { + invoked = append(invoked, "first") + return core.Result{OK: true} + }}).OK) + require.True(t, c.Command("flow/fail", core.Command{Action: func(_ core.Options) core.Result { + invoked = append(invoked, "second") + return core.Result{Value: "boom", OK: false} + }}).OK) + require.True(t, c.Command("flow/third", core.Command{Action: func(_ core.Options) core.Result { + invoked = append(invoked, "third") + return core.Result{OK: true} + }}).OK) + + output := captureStdout(t, func() { + r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath})) + require.True(t, r.OK) + + flowOutput, ok := r.Value.(FlowRunOutput) + require.True(t, ok) + assert.True(t, flowOutput.Success) + assert.Equal(t, 3, flowOutput.Executed) + assert.Equal(t, 2, flowOutput.Passed) + assert.Equal(t, 1, flowOutput.Failed) + require.Len(t, flowOutput.StepResults, 3) + assert.True(t, flowOutput.StepResults[1].ContinueOnError) + assert.Equal(t, "boom", flowOutput.StepResults[1].Error) + }) + + assert.Equal(t, []string{"first", "second", "third"}, invoked) + assert.Contains(t, output, "second: failed (continued)") + assert.Contains(t, output, "third: passed") + assert.Contains(t, output, "totals: ran=3 passed=2 failed=1") +} + +func TestCommandsFlow_CmdRunFlow_Good_EmptyFlowIsNoop(t *testing.T) { + dir := t.TempDir() + filePath := core.JoinPath(dir, "empty-flow.yaml") + require.True(t, fs.Write(filePath, core.Concat( + "name: Empty Flow\n", + "steps: []\n", + )).OK) + + s, _ := newFlowCommandPrep() + output := captureStdout(t, func() { + r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath})) + require.True(t, r.OK) + + flowOutput, ok := r.Value.(FlowRunOutput) + require.True(t, ok) + assert.True(t, flowOutput.Success) + assert.Equal(t, 0, flowOutput.Steps) + assert.Equal(t, 0, flowOutput.Executed) + assert.Equal(t, 0, flowOutput.Passed) + assert.Equal(t, 0, flowOutput.Failed) + assert.Empty(t, flowOutput.StepResults) + }) + + assert.Contains(t, output, "steps: 0") + assert.Contains(t, output, "totals: ran=0 passed=0 failed=0") +} + func TestCommandsFlow_CmdFlowPreview_Good_VariablesAlias(t *testing.T) { root := t.TempDir() flowPath := core.JoinPath(root, "preview.yaml") diff --git a/pkg/agentic/flow.go b/pkg/agentic/flow.go new file mode 100644 index 0000000..c479f4c --- /dev/null +++ b/pkg/agentic/flow.go @@ -0,0 +1,337 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "io" + "os" + + core "dappco.re/go/core" +) + +type FlowRunStepOutput struct { + Name string `json:"name,omitempty"` + Command string `json:"command,omitempty"` + Args []string `json:"args,omitempty"` + Success bool `json:"success"` + ContinueOnError bool `json:"continue_on_error,omitempty"` + Stdout string `json:"stdout,omitempty"` + Stderr string `json:"stderr,omitempty"` + Error string `json:"error,omitempty"` +} + +type flowExecutionSummary struct { + Success bool + Executed int + Passed int + Failed int + StepResults []FlowRunStepOutput +} + +func (s *PrepSubsystem) runFlowExecutionCommand(options core.Options, commandLabel string) core.Result { + if optionBoolValue(options, "dry_run", "dry-run") { + return s.runFlowCommand(options, commandLabel) + } + + flowPath := optionStringValue(options, "_arg", "path", "slug") + if flowPath == "" { + core.Print(nil, "usage: core-agent %s [--dry-run] [--var=key=value] [--vars='{\"key\":\"value\"}'] [--variables='{\"key\":\"value\"}']", commandLabel) + return core.Result{Value: core.E("agentic.cmdRunFlow", "flow path or slug is required", nil), OK: false} + } + + variables := optionStringMapValue(options, "var", "vars", "variables") + flowResult := readFlowDocument(flowPath, variables) + if !flowResult.OK { + core.Print(nil, "error: %v", flowResult.Value) + return core.Result{Value: flowResult.Value, OK: false} + } + + document, ok := flowResult.Value.(flowRunDocument) + if !ok || !document.Parsed { + err := core.E("agentic.cmdRunFlow", "invalid flow definition", nil) + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + validation := s.validateExecutableFlowDefinition(document) + if !validation.OK { + err, ok := validation.Value.(error) + if !ok { + err = core.E("agentic.cmdRunFlow", "invalid flow definition", nil) + } + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + output := FlowRunOutput{ + Success: true, + Source: document.Source, + Name: document.Definition.Name, + Description: document.Definition.Description, + Steps: len(document.Definition.Steps), + Parsed: document.Parsed, + } + + core.Print(nil, "flow: %s", document.Source) + if len(variables) > 0 { + core.Print(nil, "vars: %d", len(variables)) + } + if output.Name != "" { + core.Print(nil, "name: %s", output.Name) + } + if output.Description != "" { + core.Print(nil, "desc: %s", output.Description) + } + + if len(document.Definition.Steps) == 0 { + core.Print(nil, "steps: 0") + printFlowExecutionSummary(output) + return core.Result{Value: output, OK: true} + } + + core.Print(nil, "steps: %d", len(document.Definition.Steps)) + execution := s.executeFlowDefinition(document) + output.Success = execution.Success + output.Executed = execution.Executed + output.Passed = execution.Passed + output.Failed = execution.Failed + output.StepResults = execution.StepResults + + printFlowExecutionSummary(output) + return core.Result{Value: output, OK: output.Success} +} + +func (s *PrepSubsystem) validateExecutableFlowDefinition(document flowRunDocument) core.Result { + for index, step := range document.Definition.Steps { + if err := s.validateExecutableFlowStep(index+1, step); err != nil { + return core.Result{Value: err, OK: false} + } + } + return core.Result{OK: true} +} + +func (s *PrepSubsystem) validateExecutableFlowStep(index int, step flowDefinitionStep) error { + stepName := flowStepDisplayName(index, step) + + if core.Trim(step.Cmd) == "" { + switch { + case core.Trim(step.Flow) != "": + return core.E("agentic.validateExecutableFlowStep", core.Concat("step \"", stepName, "\" cannot execute nested flow references; use flow/preview or convert to cmd"), nil) + case core.Trim(step.Run) != "": + return core.E("agentic.validateExecutableFlowStep", core.Concat("step \"", stepName, "\" uses legacy run syntax; use cmd and args"), nil) + default: + return core.E("agentic.validateExecutableFlowStep", core.Concat("step \"", stepName, "\" must define cmd"), nil) + } + } + + commandResult := s.Core().Command(step.Cmd) + if !commandResult.OK { + return core.E("agentic.validateExecutableFlowStep", core.Concat("step \"", stepName, "\" references unknown command: ", step.Cmd), nil) + } + + command, ok := commandResult.Value.(*core.Command) + if !ok || command == nil || command.Action == nil { + return core.E("agentic.validateExecutableFlowStep", core.Concat("step \"", stepName, "\" references a non-executable command: ", step.Cmd), nil) + } + + return nil +} + +func (s *PrepSubsystem) executeFlowDefinition(document flowRunDocument) flowExecutionSummary { + summary := flowExecutionSummary{Success: true} + + for index, step := range document.Definition.Steps { + stepOutput := s.executeFlowStep(index+1, step) + summary.Executed++ + summary.StepResults = append(summary.StepResults, stepOutput) + + if stepOutput.Success { + summary.Passed++ + continue + } + + summary.Failed++ + if stepOutput.ContinueOnError { + continue + } + + summary.Success = false + break + } + + return summary +} + +func (s *PrepSubsystem) executeFlowStep(index int, step flowDefinitionStep) FlowRunStepOutput { + stepOutput := FlowRunStepOutput{ + Name: flowStepDisplayName(index, step), + Command: step.Cmd, + Args: append([]string(nil), step.Args...), + ContinueOnError: step.ContinueOnError, + } + + core.Print(nil, "%d. %s", index, flowStepSummary(step)) + + command := s.Core().Command(step.Cmd).Value.(*core.Command) + result, stdout, stderr, err := captureFlowStepOutput(func() core.Result { + return command.Run(flowStepOptions(step.Args)) + }) + + if stdout != "" { + stepOutput.Stdout = stdout + printFlowStepStream("stdout", stdout) + } + if stderr != "" { + stepOutput.Stderr = stderr + printFlowStepStream("stderr", stderr) + } + + if err != nil { + stepOutput.Error = err.Error() + core.Print(nil, " status: failed") + core.Print(nil, " error: %s", stepOutput.Error) + return stepOutput + } + + stepOutput.Success = result.OK + if result.OK { + core.Print(nil, " status: passed") + return stepOutput + } + + stepOutput.Error = commandResultError("agentic.cmdRunFlow", result).Error() + if stepOutput.ContinueOnError { + core.Print(nil, " status: failed (continued)") + } else { + core.Print(nil, " status: failed") + } + core.Print(nil, " error: %s", stepOutput.Error) + return stepOutput +} + +func flowStepDisplayName(index int, step flowDefinitionStep) string { + if name := core.Trim(step.Name); name != "" { + return name + } + if name := core.Trim(step.Cmd); name != "" { + return name + } + if name := core.Trim(step.Flow); name != "" { + return name + } + if name := core.Trim(step.Run); name != "" { + return name + } + return core.Concat("step-", core.Itoa(index)) +} + +func flowStepCommandLine(step flowDefinitionStep) string { + command := core.Trim(step.Cmd) + if len(step.Args) == 0 { + return command + } + return core.Concat(command, " ", core.Join(" ", step.Args...)) +} + +func flowStepOptions(args []string) core.Options { + options := core.NewOptions() + for _, arg := range args { + key, value, ok := core.ParseFlag(arg) + if ok { + if core.Contains(arg, "=") { + options.Set(key, value) + } else { + options.Set(key, true) + } + continue + } + + if !core.IsFlag(arg) { + options.Set("_arg", arg) + } + } + return options +} + +func captureFlowStepOutput(run func() core.Result) (core.Result, string, string, error) { + stdoutReader, stdoutWriter, err := os.Pipe() + if err != nil { + return core.Result{}, "", "", core.E("agentic.captureFlowStepOutput", "create stdout pipe", err) + } + stderrReader, stderrWriter, err := os.Pipe() + if err != nil { + stdoutReader.Close() + stdoutWriter.Close() + return core.Result{}, "", "", core.E("agentic.captureFlowStepOutput", "create stderr pipe", err) + } + + oldStdout := os.Stdout + oldStderr := os.Stderr + os.Stdout = stdoutWriter + os.Stderr = stderrWriter + + result := run() + + os.Stdout = oldStdout + os.Stderr = oldStderr + + if err := stdoutWriter.Close(); err != nil { + stdoutReader.Close() + stderrReader.Close() + stderrWriter.Close() + return result, "", "", core.E("agentic.captureFlowStepOutput", "close stdout pipe", err) + } + if err := stderrWriter.Close(); err != nil { + stdoutReader.Close() + stderrReader.Close() + return result, "", "", core.E("agentic.captureFlowStepOutput", "close stderr pipe", err) + } + + stdoutData, err := io.ReadAll(stdoutReader) + if err != nil { + stdoutReader.Close() + stderrReader.Close() + return result, "", "", core.E("agentic.captureFlowStepOutput", "read stdout pipe", err) + } + stderrData, err := io.ReadAll(stderrReader) + if err != nil { + stdoutReader.Close() + stderrReader.Close() + return result, "", "", core.E("agentic.captureFlowStepOutput", "read stderr pipe", err) + } + + stdoutReader.Close() + stderrReader.Close() + return result, string(stdoutData), string(stderrData), nil +} + +func printFlowStepStream(label, stream string) { + trimmed := core.TrimSuffix(core.Replace(stream, "\r\n", "\n"), "\n") + if trimmed == "" { + return + } + + core.Print(nil, " %s:", label) + for _, line := range core.Split(trimmed, "\n") { + core.Print(nil, " %s", line) + } +} + +func printFlowExecutionSummary(output FlowRunOutput) { + core.Print(nil, "") + core.Print(nil, "summary:") + + for _, step := range output.StepResults { + status := "passed" + if !step.Success { + if step.ContinueOnError { + status = "failed (continued)" + } else { + status = "failed" + } + } + core.Print(nil, " %s: %s", step.Name, status) + } + + core.Print(nil, "totals: ran=%d passed=%d failed=%d", output.Executed, output.Passed, output.Failed) +}