feat(agent/agentic): run/flow now executes sequential YAML steps
run/flow command now runs flow steps via the existing command tree: - Each step's cmd is dispatched through s.Command(...) - stdout/stderr captured per step - Stops on first untolerated failure - continueOnError: true allows step to fail without aborting flow - Parse-time validation rejects unknown/non-executable commands BEFORE any step runs flow/preview keeps the old inspection-only behaviour. Empty flows succeed as no-ops. Returns FlowRunOutput with summary: OK boolean, Executed/Passed/ Failed counts, per-step results. Pest-equivalent Go tests cover: 3-step happy path, non-existent cmd parse-time error, mid-flow failure (with + without continueOnError), empty flow. Co-authored-by: Codex <noreply@openai.com> Closes tasks.lthn.sh/view.php?id=160
This commit is contained in:
parent
55bc34c885
commit
eed51d72b8
3 changed files with 607 additions and 26 deletions
|
|
@ -111,7 +111,7 @@ func (s *PrepSubsystem) cmdRunTask(options core.Options) core.Result {
|
|||
}
|
||||
|
||||
func (s *PrepSubsystem) cmdRunFlow(options core.Options) core.Result {
|
||||
return s.runFlowCommand(options, "run flow")
|
||||
return s.runFlowExecutionCommand(options, "run flow")
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) cmdFlowPreview(options core.Options) core.Result {
|
||||
|
|
@ -1021,13 +1021,17 @@ func parseIntString(s string) int {
|
|||
}
|
||||
|
||||
type FlowRunOutput struct {
|
||||
Success bool `json:"success"`
|
||||
Source string `json:"source,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Description string `json:"description,omitempty"`
|
||||
Steps int `json:"steps,omitempty"`
|
||||
ResolvedSteps int `json:"resolved_steps,omitempty"`
|
||||
Parsed bool `json:"parsed,omitempty"`
|
||||
Success bool `json:"success"`
|
||||
Source string `json:"source,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Description string `json:"description,omitempty"`
|
||||
Steps int `json:"steps,omitempty"`
|
||||
ResolvedSteps int `json:"resolved_steps,omitempty"`
|
||||
Parsed bool `json:"parsed,omitempty"`
|
||||
Executed int `json:"executed,omitempty"`
|
||||
Passed int `json:"passed,omitempty"`
|
||||
Failed int `json:"failed,omitempty"`
|
||||
StepResults []FlowRunStepOutput `json:"step_results,omitempty"`
|
||||
}
|
||||
|
||||
type flowDefinition struct {
|
||||
|
|
@ -1037,17 +1041,20 @@ type flowDefinition struct {
|
|||
}
|
||||
|
||||
type flowDefinitionStep struct {
|
||||
Name string `yaml:"name"`
|
||||
Run string `yaml:"run"`
|
||||
Flow string `yaml:"flow"`
|
||||
Agent string `yaml:"agent"`
|
||||
Prompt string `yaml:"prompt"`
|
||||
Template string `yaml:"template"`
|
||||
Timeout string `yaml:"timeout"`
|
||||
When string `yaml:"when"`
|
||||
Output string `yaml:"output"`
|
||||
Gate string `yaml:"gate"`
|
||||
Parallel []flowDefinitionStep `yaml:"parallel"`
|
||||
Name string `yaml:"name"`
|
||||
Cmd string `yaml:"cmd"`
|
||||
Args []string `yaml:"args"`
|
||||
Run string `yaml:"run"`
|
||||
Flow string `yaml:"flow"`
|
||||
Agent string `yaml:"agent"`
|
||||
Prompt string `yaml:"prompt"`
|
||||
Template string `yaml:"template"`
|
||||
Timeout string `yaml:"timeout"`
|
||||
When string `yaml:"when"`
|
||||
Output string `yaml:"output"`
|
||||
Gate string `yaml:"gate"`
|
||||
ContinueOnError bool `yaml:"continueOnError"`
|
||||
Parallel []flowDefinitionStep `yaml:"parallel"`
|
||||
}
|
||||
|
||||
type flowRunDocument struct {
|
||||
|
|
@ -1109,7 +1116,7 @@ func parseFlowDefinition(content string) (flowDefinition, error) {
|
|||
if err := yaml.Unmarshal([]byte(content), &definition); err != nil {
|
||||
return flowDefinition{}, core.E("agentic.parseFlowDefinition", "invalid flow definition", err)
|
||||
}
|
||||
if definition.Name == "" || len(definition.Steps) == 0 {
|
||||
if definition.Name == "" {
|
||||
return flowDefinition{}, core.E("agentic.parseFlowDefinition", "invalid flow definition", nil)
|
||||
}
|
||||
return definition, nil
|
||||
|
|
@ -1132,6 +1139,9 @@ func flowStepSummary(step flowDefinitionStep) string {
|
|||
if label == "" {
|
||||
label = core.Trim(step.Flow)
|
||||
}
|
||||
if label == "" {
|
||||
label = core.Trim(step.Cmd)
|
||||
}
|
||||
if label == "" {
|
||||
label = core.Trim(step.Agent)
|
||||
}
|
||||
|
|
@ -1145,6 +1155,8 @@ func flowStepSummary(step flowDefinitionStep) string {
|
|||
switch {
|
||||
case step.Flow != "":
|
||||
return core.Concat(label, ": flow ", step.Flow)
|
||||
case step.Cmd != "":
|
||||
return core.Concat(label, ": cmd ", flowStepCommandLine(step))
|
||||
case step.Agent != "":
|
||||
return core.Concat(label, ": agent ", step.Agent)
|
||||
case step.Run != "":
|
||||
|
|
|
|||
|
|
@ -3,15 +3,24 @@
|
|||
package agentic
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCommandsFlow_CmdRunFlow_Good_ReadsYamlFlowFile(t *testing.T) {
|
||||
func newFlowCommandPrep() (*PrepSubsystem, *core.Core) {
|
||||
c := core.New()
|
||||
return &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
}, c
|
||||
}
|
||||
|
||||
func TestCommandsFlow_CmdFlowPreview_Good_ReadsYamlFlowFile(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
flowPath := core.JoinPath(dir, "pkg", "lib", "flow", "verify")
|
||||
require.True(t, fs.EnsureDir(flowPath).OK)
|
||||
|
|
@ -29,7 +38,7 @@ func TestCommandsFlow_CmdRunFlow_Good_ReadsYamlFlowFile(t *testing.T) {
|
|||
|
||||
s := newTestPrep(t)
|
||||
output := captureStdout(t, func() {
|
||||
r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath}))
|
||||
r := s.cmdFlowPreview(core.NewOptions(core.Option{Key: "_arg", Value: filePath}))
|
||||
require.True(t, r.OK)
|
||||
|
||||
flowOutput, ok := r.Value.(FlowRunOutput)
|
||||
|
|
@ -46,6 +55,71 @@ func TestCommandsFlow_CmdRunFlow_Good_ReadsYamlFlowFile(t *testing.T) {
|
|||
assert.Contains(t, output, "verify: flow verify/go-qa.yaml")
|
||||
}
|
||||
|
||||
func TestCommandsFlow_CmdRunFlow_Good_ExecutesSequentialSteps(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
filePath := core.JoinPath(dir, "execute-flow.yaml")
|
||||
require.True(t, fs.Write(filePath, core.Concat(
|
||||
"name: Execute Flow\n",
|
||||
"description: Run registered commands\n",
|
||||
"steps:\n",
|
||||
" - name: first\n",
|
||||
" cmd: flow/test-first\n",
|
||||
" - name: second\n",
|
||||
" cmd: flow/test-second\n",
|
||||
" args:\n",
|
||||
" - --mode=fast\n",
|
||||
" - name: third\n",
|
||||
" cmd: flow/test-third\n",
|
||||
" args:\n",
|
||||
" - payload\n",
|
||||
)).OK)
|
||||
|
||||
s, c := newFlowCommandPrep()
|
||||
invoked := []string{}
|
||||
require.True(t, c.Command("flow/test-first", core.Command{Action: func(_ core.Options) core.Result {
|
||||
invoked = append(invoked, "first")
|
||||
core.Print(nil, "first stdout")
|
||||
return core.Result{OK: true}
|
||||
}}).OK)
|
||||
require.True(t, c.Command("flow/test-second", core.Command{Action: func(options core.Options) core.Result {
|
||||
invoked = append(invoked, "second")
|
||||
assert.Equal(t, "fast", options.String("mode"))
|
||||
_, _ = os.Stderr.WriteString("second stderr\n")
|
||||
return core.Result{OK: true}
|
||||
}}).OK)
|
||||
require.True(t, c.Command("flow/test-third", core.Command{Action: func(options core.Options) core.Result {
|
||||
invoked = append(invoked, "third")
|
||||
assert.Equal(t, "payload", options.String("_arg"))
|
||||
return core.Result{OK: true}
|
||||
}}).OK)
|
||||
|
||||
output := captureStdout(t, func() {
|
||||
r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath}))
|
||||
require.True(t, r.OK)
|
||||
|
||||
flowOutput, ok := r.Value.(FlowRunOutput)
|
||||
require.True(t, ok)
|
||||
assert.True(t, flowOutput.Success)
|
||||
assert.Equal(t, filePath, flowOutput.Source)
|
||||
assert.Equal(t, "Execute Flow", flowOutput.Name)
|
||||
assert.Equal(t, "Run registered commands", flowOutput.Description)
|
||||
assert.Equal(t, 3, flowOutput.Steps)
|
||||
assert.Equal(t, 3, flowOutput.Executed)
|
||||
assert.Equal(t, 3, flowOutput.Passed)
|
||||
assert.Equal(t, 0, flowOutput.Failed)
|
||||
require.Len(t, flowOutput.StepResults, 3)
|
||||
assert.Equal(t, "first", flowOutput.StepResults[0].Name)
|
||||
assert.Equal(t, "flow/test-second", flowOutput.StepResults[1].Command)
|
||||
assert.Equal(t, "second stderr\n", flowOutput.StepResults[1].Stderr)
|
||||
})
|
||||
|
||||
assert.Equal(t, []string{"first", "second", "third"}, invoked)
|
||||
assert.Contains(t, output, "steps: 3")
|
||||
assert.Contains(t, output, "first stdout")
|
||||
assert.Contains(t, output, "second stderr")
|
||||
assert.Contains(t, output, "totals: ran=3 passed=3 failed=0")
|
||||
}
|
||||
|
||||
func TestCommandsFlow_CmdRunFlow_Good_RendersVariablesAndDryRun(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
flowPath := core.JoinPath(dir, "pkg", "lib", "flow", "verify")
|
||||
|
|
@ -57,7 +131,9 @@ func TestCommandsFlow_CmdRunFlow_Good_RendersVariablesAndDryRun(t *testing.T) {
|
|||
"description: Build {{ repo }}\n",
|
||||
"steps:\n",
|
||||
" - name: build\n",
|
||||
" run: go build ./...\n",
|
||||
" cmd: flow/test-build\n",
|
||||
" args:\n",
|
||||
" - --repo={{ repo }}\n",
|
||||
)).OK)
|
||||
|
||||
s := newTestPrep(t)
|
||||
|
|
@ -80,9 +156,10 @@ func TestCommandsFlow_CmdRunFlow_Good_RendersVariablesAndDryRun(t *testing.T) {
|
|||
assert.Contains(t, output, "dry-run: true")
|
||||
assert.Contains(t, output, "vars: 1")
|
||||
assert.Contains(t, output, "desc: Build core/go")
|
||||
assert.Contains(t, output, "build: cmd flow/test-build --repo=core/go")
|
||||
}
|
||||
|
||||
func TestCommandsFlow_CmdRunFlow_Good_ResolvesNestedFlowReferences(t *testing.T) {
|
||||
func TestCommandsFlow_CmdFlowPreview_Good_ResolvesNestedFlowReferences(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
flowRoot := core.JoinPath(dir, "pkg", "lib", "flow")
|
||||
require.True(t, fs.EnsureDir(core.JoinPath(flowRoot, "verify")).OK)
|
||||
|
|
@ -107,7 +184,7 @@ func TestCommandsFlow_CmdRunFlow_Good_ResolvesNestedFlowReferences(t *testing.T)
|
|||
|
||||
s := newTestPrep(t)
|
||||
output := captureStdout(t, func() {
|
||||
r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: rootPath}))
|
||||
r := s.cmdFlowPreview(core.NewOptions(core.Option{Key: "_arg", Value: rootPath}))
|
||||
require.True(t, r.OK)
|
||||
|
||||
flowOutput, ok := r.Value.(FlowRunOutput)
|
||||
|
|
@ -132,6 +209,34 @@ func TestCommandsFlow_CmdRunFlow_Bad_MissingPath(t *testing.T) {
|
|||
assert.Contains(t, err.Error(), "flow path or slug is required")
|
||||
}
|
||||
|
||||
func TestCommandsFlow_CmdRunFlow_Bad_RejectsUnknownCommandAtParseTime(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
filePath := core.JoinPath(dir, "missing-command.yaml")
|
||||
require.True(t, fs.Write(filePath, core.Concat(
|
||||
"name: Missing Command\n",
|
||||
"steps:\n",
|
||||
" - name: first\n",
|
||||
" cmd: flow/known\n",
|
||||
" - name: second\n",
|
||||
" cmd: flow/missing\n",
|
||||
)).OK)
|
||||
|
||||
s, c := newFlowCommandPrep()
|
||||
invoked := []string{}
|
||||
require.True(t, c.Command("flow/known", core.Command{Action: func(_ core.Options) core.Result {
|
||||
invoked = append(invoked, "known")
|
||||
return core.Result{OK: true}
|
||||
}}).OK)
|
||||
|
||||
r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath}))
|
||||
require.False(t, r.OK)
|
||||
|
||||
err, ok := r.Value.(error)
|
||||
require.True(t, ok)
|
||||
assert.Contains(t, err.Error(), "references unknown command: flow/missing")
|
||||
assert.Empty(t, invoked)
|
||||
}
|
||||
|
||||
func TestCommandsFlow_CmdRunFlow_Ugly_InvalidYaml(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
filePath := core.JoinPath(dir, "broken-flow.yaml")
|
||||
|
|
@ -146,6 +251,133 @@ func TestCommandsFlow_CmdRunFlow_Ugly_InvalidYaml(t *testing.T) {
|
|||
assert.Contains(t, err.Error(), "invalid flow definition")
|
||||
}
|
||||
|
||||
func TestCommandsFlow_CmdRunFlow_Ugly_StopsOnFirstFailure(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
filePath := core.JoinPath(dir, "stops-on-failure.yaml")
|
||||
require.True(t, fs.Write(filePath, core.Concat(
|
||||
"name: Stop On Failure\n",
|
||||
"steps:\n",
|
||||
" - name: first\n",
|
||||
" cmd: flow/first\n",
|
||||
" - name: second\n",
|
||||
" cmd: flow/fail\n",
|
||||
" - name: third\n",
|
||||
" cmd: flow/third\n",
|
||||
)).OK)
|
||||
|
||||
s, c := newFlowCommandPrep()
|
||||
invoked := []string{}
|
||||
require.True(t, c.Command("flow/first", core.Command{Action: func(_ core.Options) core.Result {
|
||||
invoked = append(invoked, "first")
|
||||
return core.Result{OK: true}
|
||||
}}).OK)
|
||||
require.True(t, c.Command("flow/fail", core.Command{Action: func(_ core.Options) core.Result {
|
||||
invoked = append(invoked, "second")
|
||||
return core.Result{Value: "boom", OK: false}
|
||||
}}).OK)
|
||||
require.True(t, c.Command("flow/third", core.Command{Action: func(_ core.Options) core.Result {
|
||||
invoked = append(invoked, "third")
|
||||
return core.Result{OK: true}
|
||||
}}).OK)
|
||||
|
||||
output := captureStdout(t, func() {
|
||||
r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath}))
|
||||
require.False(t, r.OK)
|
||||
|
||||
flowOutput, ok := r.Value.(FlowRunOutput)
|
||||
require.True(t, ok)
|
||||
assert.False(t, flowOutput.Success)
|
||||
assert.Equal(t, 2, flowOutput.Executed)
|
||||
assert.Equal(t, 1, flowOutput.Passed)
|
||||
assert.Equal(t, 1, flowOutput.Failed)
|
||||
require.Len(t, flowOutput.StepResults, 2)
|
||||
assert.Equal(t, "boom", flowOutput.StepResults[1].Error)
|
||||
})
|
||||
|
||||
assert.Equal(t, []string{"first", "second"}, invoked)
|
||||
assert.Contains(t, output, "second: failed")
|
||||
assert.Contains(t, output, "totals: ran=2 passed=1 failed=1")
|
||||
assert.NotContains(t, output, "third: passed")
|
||||
}
|
||||
|
||||
func TestCommandsFlow_CmdRunFlow_Ugly_ContinueOnErrorRunsRemainingSteps(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
filePath := core.JoinPath(dir, "continue-on-error.yaml")
|
||||
require.True(t, fs.Write(filePath, core.Concat(
|
||||
"name: Continue On Error\n",
|
||||
"steps:\n",
|
||||
" - name: first\n",
|
||||
" cmd: flow/first\n",
|
||||
" - name: second\n",
|
||||
" cmd: flow/fail\n",
|
||||
" continueOnError: true\n",
|
||||
" - name: third\n",
|
||||
" cmd: flow/third\n",
|
||||
)).OK)
|
||||
|
||||
s, c := newFlowCommandPrep()
|
||||
invoked := []string{}
|
||||
require.True(t, c.Command("flow/first", core.Command{Action: func(_ core.Options) core.Result {
|
||||
invoked = append(invoked, "first")
|
||||
return core.Result{OK: true}
|
||||
}}).OK)
|
||||
require.True(t, c.Command("flow/fail", core.Command{Action: func(_ core.Options) core.Result {
|
||||
invoked = append(invoked, "second")
|
||||
return core.Result{Value: "boom", OK: false}
|
||||
}}).OK)
|
||||
require.True(t, c.Command("flow/third", core.Command{Action: func(_ core.Options) core.Result {
|
||||
invoked = append(invoked, "third")
|
||||
return core.Result{OK: true}
|
||||
}}).OK)
|
||||
|
||||
output := captureStdout(t, func() {
|
||||
r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath}))
|
||||
require.True(t, r.OK)
|
||||
|
||||
flowOutput, ok := r.Value.(FlowRunOutput)
|
||||
require.True(t, ok)
|
||||
assert.True(t, flowOutput.Success)
|
||||
assert.Equal(t, 3, flowOutput.Executed)
|
||||
assert.Equal(t, 2, flowOutput.Passed)
|
||||
assert.Equal(t, 1, flowOutput.Failed)
|
||||
require.Len(t, flowOutput.StepResults, 3)
|
||||
assert.True(t, flowOutput.StepResults[1].ContinueOnError)
|
||||
assert.Equal(t, "boom", flowOutput.StepResults[1].Error)
|
||||
})
|
||||
|
||||
assert.Equal(t, []string{"first", "second", "third"}, invoked)
|
||||
assert.Contains(t, output, "second: failed (continued)")
|
||||
assert.Contains(t, output, "third: passed")
|
||||
assert.Contains(t, output, "totals: ran=3 passed=2 failed=1")
|
||||
}
|
||||
|
||||
func TestCommandsFlow_CmdRunFlow_Good_EmptyFlowIsNoop(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
filePath := core.JoinPath(dir, "empty-flow.yaml")
|
||||
require.True(t, fs.Write(filePath, core.Concat(
|
||||
"name: Empty Flow\n",
|
||||
"steps: []\n",
|
||||
)).OK)
|
||||
|
||||
s, _ := newFlowCommandPrep()
|
||||
output := captureStdout(t, func() {
|
||||
r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath}))
|
||||
require.True(t, r.OK)
|
||||
|
||||
flowOutput, ok := r.Value.(FlowRunOutput)
|
||||
require.True(t, ok)
|
||||
assert.True(t, flowOutput.Success)
|
||||
assert.Equal(t, 0, flowOutput.Steps)
|
||||
assert.Equal(t, 0, flowOutput.Executed)
|
||||
assert.Equal(t, 0, flowOutput.Passed)
|
||||
assert.Equal(t, 0, flowOutput.Failed)
|
||||
assert.Empty(t, flowOutput.StepResults)
|
||||
})
|
||||
|
||||
assert.Contains(t, output, "steps: 0")
|
||||
assert.Contains(t, output, "totals: ran=0 passed=0 failed=0")
|
||||
}
|
||||
|
||||
func TestCommandsFlow_CmdFlowPreview_Good_VariablesAlias(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
flowPath := core.JoinPath(root, "preview.yaml")
|
||||
|
|
|
|||
337
pkg/agentic/flow.go
Normal file
337
pkg/agentic/flow.go
Normal file
|
|
@ -0,0 +1,337 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package agentic
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
type FlowRunStepOutput struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
Command string `json:"command,omitempty"`
|
||||
Args []string `json:"args,omitempty"`
|
||||
Success bool `json:"success"`
|
||||
ContinueOnError bool `json:"continue_on_error,omitempty"`
|
||||
Stdout string `json:"stdout,omitempty"`
|
||||
Stderr string `json:"stderr,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type flowExecutionSummary struct {
|
||||
Success bool
|
||||
Executed int
|
||||
Passed int
|
||||
Failed int
|
||||
StepResults []FlowRunStepOutput
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) runFlowExecutionCommand(options core.Options, commandLabel string) core.Result {
|
||||
if optionBoolValue(options, "dry_run", "dry-run") {
|
||||
return s.runFlowCommand(options, commandLabel)
|
||||
}
|
||||
|
||||
flowPath := optionStringValue(options, "_arg", "path", "slug")
|
||||
if flowPath == "" {
|
||||
core.Print(nil, "usage: core-agent %s <path-or-slug> [--dry-run] [--var=key=value] [--vars='{\"key\":\"value\"}'] [--variables='{\"key\":\"value\"}']", commandLabel)
|
||||
return core.Result{Value: core.E("agentic.cmdRunFlow", "flow path or slug is required", nil), OK: false}
|
||||
}
|
||||
|
||||
variables := optionStringMapValue(options, "var", "vars", "variables")
|
||||
flowResult := readFlowDocument(flowPath, variables)
|
||||
if !flowResult.OK {
|
||||
core.Print(nil, "error: %v", flowResult.Value)
|
||||
return core.Result{Value: flowResult.Value, OK: false}
|
||||
}
|
||||
|
||||
document, ok := flowResult.Value.(flowRunDocument)
|
||||
if !ok || !document.Parsed {
|
||||
err := core.E("agentic.cmdRunFlow", "invalid flow definition", nil)
|
||||
core.Print(nil, "error: %v", err)
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
|
||||
validation := s.validateExecutableFlowDefinition(document)
|
||||
if !validation.OK {
|
||||
err, ok := validation.Value.(error)
|
||||
if !ok {
|
||||
err = core.E("agentic.cmdRunFlow", "invalid flow definition", nil)
|
||||
}
|
||||
core.Print(nil, "error: %v", err)
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
|
||||
output := FlowRunOutput{
|
||||
Success: true,
|
||||
Source: document.Source,
|
||||
Name: document.Definition.Name,
|
||||
Description: document.Definition.Description,
|
||||
Steps: len(document.Definition.Steps),
|
||||
Parsed: document.Parsed,
|
||||
}
|
||||
|
||||
core.Print(nil, "flow: %s", document.Source)
|
||||
if len(variables) > 0 {
|
||||
core.Print(nil, "vars: %d", len(variables))
|
||||
}
|
||||
if output.Name != "" {
|
||||
core.Print(nil, "name: %s", output.Name)
|
||||
}
|
||||
if output.Description != "" {
|
||||
core.Print(nil, "desc: %s", output.Description)
|
||||
}
|
||||
|
||||
if len(document.Definition.Steps) == 0 {
|
||||
core.Print(nil, "steps: 0")
|
||||
printFlowExecutionSummary(output)
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
core.Print(nil, "steps: %d", len(document.Definition.Steps))
|
||||
execution := s.executeFlowDefinition(document)
|
||||
output.Success = execution.Success
|
||||
output.Executed = execution.Executed
|
||||
output.Passed = execution.Passed
|
||||
output.Failed = execution.Failed
|
||||
output.StepResults = execution.StepResults
|
||||
|
||||
printFlowExecutionSummary(output)
|
||||
return core.Result{Value: output, OK: output.Success}
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) validateExecutableFlowDefinition(document flowRunDocument) core.Result {
|
||||
for index, step := range document.Definition.Steps {
|
||||
if err := s.validateExecutableFlowStep(index+1, step); err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) validateExecutableFlowStep(index int, step flowDefinitionStep) error {
|
||||
stepName := flowStepDisplayName(index, step)
|
||||
|
||||
if core.Trim(step.Cmd) == "" {
|
||||
switch {
|
||||
case core.Trim(step.Flow) != "":
|
||||
return core.E("agentic.validateExecutableFlowStep", core.Concat("step \"", stepName, "\" cannot execute nested flow references; use flow/preview or convert to cmd"), nil)
|
||||
case core.Trim(step.Run) != "":
|
||||
return core.E("agentic.validateExecutableFlowStep", core.Concat("step \"", stepName, "\" uses legacy run syntax; use cmd and args"), nil)
|
||||
default:
|
||||
return core.E("agentic.validateExecutableFlowStep", core.Concat("step \"", stepName, "\" must define cmd"), nil)
|
||||
}
|
||||
}
|
||||
|
||||
commandResult := s.Core().Command(step.Cmd)
|
||||
if !commandResult.OK {
|
||||
return core.E("agentic.validateExecutableFlowStep", core.Concat("step \"", stepName, "\" references unknown command: ", step.Cmd), nil)
|
||||
}
|
||||
|
||||
command, ok := commandResult.Value.(*core.Command)
|
||||
if !ok || command == nil || command.Action == nil {
|
||||
return core.E("agentic.validateExecutableFlowStep", core.Concat("step \"", stepName, "\" references a non-executable command: ", step.Cmd), nil)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) executeFlowDefinition(document flowRunDocument) flowExecutionSummary {
|
||||
summary := flowExecutionSummary{Success: true}
|
||||
|
||||
for index, step := range document.Definition.Steps {
|
||||
stepOutput := s.executeFlowStep(index+1, step)
|
||||
summary.Executed++
|
||||
summary.StepResults = append(summary.StepResults, stepOutput)
|
||||
|
||||
if stepOutput.Success {
|
||||
summary.Passed++
|
||||
continue
|
||||
}
|
||||
|
||||
summary.Failed++
|
||||
if stepOutput.ContinueOnError {
|
||||
continue
|
||||
}
|
||||
|
||||
summary.Success = false
|
||||
break
|
||||
}
|
||||
|
||||
return summary
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) executeFlowStep(index int, step flowDefinitionStep) FlowRunStepOutput {
|
||||
stepOutput := FlowRunStepOutput{
|
||||
Name: flowStepDisplayName(index, step),
|
||||
Command: step.Cmd,
|
||||
Args: append([]string(nil), step.Args...),
|
||||
ContinueOnError: step.ContinueOnError,
|
||||
}
|
||||
|
||||
core.Print(nil, "%d. %s", index, flowStepSummary(step))
|
||||
|
||||
command := s.Core().Command(step.Cmd).Value.(*core.Command)
|
||||
result, stdout, stderr, err := captureFlowStepOutput(func() core.Result {
|
||||
return command.Run(flowStepOptions(step.Args))
|
||||
})
|
||||
|
||||
if stdout != "" {
|
||||
stepOutput.Stdout = stdout
|
||||
printFlowStepStream("stdout", stdout)
|
||||
}
|
||||
if stderr != "" {
|
||||
stepOutput.Stderr = stderr
|
||||
printFlowStepStream("stderr", stderr)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
stepOutput.Error = err.Error()
|
||||
core.Print(nil, " status: failed")
|
||||
core.Print(nil, " error: %s", stepOutput.Error)
|
||||
return stepOutput
|
||||
}
|
||||
|
||||
stepOutput.Success = result.OK
|
||||
if result.OK {
|
||||
core.Print(nil, " status: passed")
|
||||
return stepOutput
|
||||
}
|
||||
|
||||
stepOutput.Error = commandResultError("agentic.cmdRunFlow", result).Error()
|
||||
if stepOutput.ContinueOnError {
|
||||
core.Print(nil, " status: failed (continued)")
|
||||
} else {
|
||||
core.Print(nil, " status: failed")
|
||||
}
|
||||
core.Print(nil, " error: %s", stepOutput.Error)
|
||||
return stepOutput
|
||||
}
|
||||
|
||||
func flowStepDisplayName(index int, step flowDefinitionStep) string {
|
||||
if name := core.Trim(step.Name); name != "" {
|
||||
return name
|
||||
}
|
||||
if name := core.Trim(step.Cmd); name != "" {
|
||||
return name
|
||||
}
|
||||
if name := core.Trim(step.Flow); name != "" {
|
||||
return name
|
||||
}
|
||||
if name := core.Trim(step.Run); name != "" {
|
||||
return name
|
||||
}
|
||||
return core.Concat("step-", core.Itoa(index))
|
||||
}
|
||||
|
||||
func flowStepCommandLine(step flowDefinitionStep) string {
|
||||
command := core.Trim(step.Cmd)
|
||||
if len(step.Args) == 0 {
|
||||
return command
|
||||
}
|
||||
return core.Concat(command, " ", core.Join(" ", step.Args...))
|
||||
}
|
||||
|
||||
func flowStepOptions(args []string) core.Options {
|
||||
options := core.NewOptions()
|
||||
for _, arg := range args {
|
||||
key, value, ok := core.ParseFlag(arg)
|
||||
if ok {
|
||||
if core.Contains(arg, "=") {
|
||||
options.Set(key, value)
|
||||
} else {
|
||||
options.Set(key, true)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if !core.IsFlag(arg) {
|
||||
options.Set("_arg", arg)
|
||||
}
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
func captureFlowStepOutput(run func() core.Result) (core.Result, string, string, error) {
|
||||
stdoutReader, stdoutWriter, err := os.Pipe()
|
||||
if err != nil {
|
||||
return core.Result{}, "", "", core.E("agentic.captureFlowStepOutput", "create stdout pipe", err)
|
||||
}
|
||||
stderrReader, stderrWriter, err := os.Pipe()
|
||||
if err != nil {
|
||||
stdoutReader.Close()
|
||||
stdoutWriter.Close()
|
||||
return core.Result{}, "", "", core.E("agentic.captureFlowStepOutput", "create stderr pipe", err)
|
||||
}
|
||||
|
||||
oldStdout := os.Stdout
|
||||
oldStderr := os.Stderr
|
||||
os.Stdout = stdoutWriter
|
||||
os.Stderr = stderrWriter
|
||||
|
||||
result := run()
|
||||
|
||||
os.Stdout = oldStdout
|
||||
os.Stderr = oldStderr
|
||||
|
||||
if err := stdoutWriter.Close(); err != nil {
|
||||
stdoutReader.Close()
|
||||
stderrReader.Close()
|
||||
stderrWriter.Close()
|
||||
return result, "", "", core.E("agentic.captureFlowStepOutput", "close stdout pipe", err)
|
||||
}
|
||||
if err := stderrWriter.Close(); err != nil {
|
||||
stdoutReader.Close()
|
||||
stderrReader.Close()
|
||||
return result, "", "", core.E("agentic.captureFlowStepOutput", "close stderr pipe", err)
|
||||
}
|
||||
|
||||
stdoutData, err := io.ReadAll(stdoutReader)
|
||||
if err != nil {
|
||||
stdoutReader.Close()
|
||||
stderrReader.Close()
|
||||
return result, "", "", core.E("agentic.captureFlowStepOutput", "read stdout pipe", err)
|
||||
}
|
||||
stderrData, err := io.ReadAll(stderrReader)
|
||||
if err != nil {
|
||||
stdoutReader.Close()
|
||||
stderrReader.Close()
|
||||
return result, "", "", core.E("agentic.captureFlowStepOutput", "read stderr pipe", err)
|
||||
}
|
||||
|
||||
stdoutReader.Close()
|
||||
stderrReader.Close()
|
||||
return result, string(stdoutData), string(stderrData), nil
|
||||
}
|
||||
|
||||
func printFlowStepStream(label, stream string) {
|
||||
trimmed := core.TrimSuffix(core.Replace(stream, "\r\n", "\n"), "\n")
|
||||
if trimmed == "" {
|
||||
return
|
||||
}
|
||||
|
||||
core.Print(nil, " %s:", label)
|
||||
for _, line := range core.Split(trimmed, "\n") {
|
||||
core.Print(nil, " %s", line)
|
||||
}
|
||||
}
|
||||
|
||||
func printFlowExecutionSummary(output FlowRunOutput) {
|
||||
core.Print(nil, "")
|
||||
core.Print(nil, "summary:")
|
||||
|
||||
for _, step := range output.StepResults {
|
||||
status := "passed"
|
||||
if !step.Success {
|
||||
if step.ContinueOnError {
|
||||
status = "failed (continued)"
|
||||
} else {
|
||||
status = "failed"
|
||||
}
|
||||
}
|
||||
core.Print(nil, " %s: %s", step.Name, status)
|
||||
}
|
||||
|
||||
core.Print(nil, "totals: ran=%d passed=%d failed=%d", output.Executed, output.Passed, output.Failed)
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue