From 78e663a0e039055306272816a79f4d2d511cc83d Mon Sep 17 00:00:00 2001 From: Virgil Date: Wed, 1 Apr 2026 22:30:20 +0000 Subject: [PATCH] feat(agentic): add flow preview command Co-Authored-By: Virgil --- pkg/agentic/commands.go | 193 ++++++++++++++++++++++++++++++ pkg/agentic/commands_flow_test.go | 71 +++++++++++ pkg/agentic/commands_test.go | 1 + 3 files changed, 265 insertions(+) create mode 100644 pkg/agentic/commands_flow_test.go diff --git a/pkg/agentic/commands.go b/pkg/agentic/commands.go index 25ef074..e6559a6 100644 --- a/pkg/agentic/commands.go +++ b/pkg/agentic/commands.go @@ -10,6 +10,7 @@ import ( "dappco.re/go/agent/pkg/lib" core "dappco.re/go/core" + "gopkg.in/yaml.v3" ) // c.Command("run/task", core.Command{Description: "Run a single task end-to-end", Action: s.cmdRunTask}) @@ -18,6 +19,7 @@ func (s *PrepSubsystem) registerCommands(ctx context.Context) { s.startupContext = ctx c := s.Core() c.Command("run/task", core.Command{Description: "Run a single task end-to-end", Action: s.cmdRunTask}) + c.Command("run/flow", core.Command{Description: "Show a flow definition from disk or the embedded library", Action: s.cmdRunFlow}) c.Command("dispatch/sync", core.Command{Description: "Dispatch a single task synchronously and block until it completes", Action: s.cmdDispatchSync}) c.Command("run/orchestrator", core.Command{Description: "Run the queue orchestrator (standalone, no MCP)", Action: s.cmdOrchestrator}) c.Command("dispatch", core.Command{Description: "Dispatch queued agents", Action: s.cmdDispatch}) @@ -78,6 +80,62 @@ func (s *PrepSubsystem) cmdRunTask(options core.Options) core.Result { return s.runDispatchSync(s.commandContext(), options, "run task", "agentic.runTask") } +func (s *PrepSubsystem) cmdRunFlow(options core.Options) core.Result { + flowPath := optionStringValue(options, "_arg", "path", "slug") + if flowPath == "" { + core.Print(nil, "usage: core-agent run flow ") + return core.Result{Value: core.E("agentic.cmdRunFlow", "flow path or slug is required", nil), OK: false} + } + + flowResult := readFlowDocument(flowPath) + if !flowResult.OK { + core.Print(nil, "error: %v", flowResult.Value) + return core.Result{Value: flowResult.Value, OK: false} + } + + document, ok := flowResult.Value.(flowRunDocument) + if !ok { + err := core.E("agentic.cmdRunFlow", "invalid flow definition", nil) + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + output := FlowRunOutput{ + Success: true, + Source: document.Source, + Name: document.Definition.Name, + Description: document.Definition.Description, + Steps: len(document.Definition.Steps), + Parsed: document.Parsed, + } + + core.Print(nil, "flow: %s", document.Source) + if document.Parsed { + if document.Definition.Name != "" { + core.Print(nil, "name: %s", document.Definition.Name) + } + if document.Definition.Description != "" { + core.Print(nil, "desc: %s", document.Definition.Description) + } + if len(document.Definition.Steps) == 0 { + core.Print(nil, "steps: 0") + return core.Result{Value: output, OK: true} + } + + core.Print(nil, "steps: %d", len(document.Definition.Steps)) + for index, step := range document.Definition.Steps { + core.Print(nil, " %d. %s", index+1, flowStepSummary(step)) + } + return core.Result{Value: output, OK: true} + } + + if document.Content != "" { + core.Print(nil, "content: %d chars", len(document.Content)) + } + + return core.Result{Value: output, OK: true} +} + func (s *PrepSubsystem) cmdDispatchSync(options core.Options) core.Result { return s.runDispatchSync(s.commandContext(), options, "dispatch sync", "agentic.runDispatchSync") } @@ -748,6 +806,141 @@ func parseIntString(s string) int { return n } +type FlowRunOutput struct { + Success bool `json:"success"` + Source string `json:"source,omitempty"` + Name string `json:"name,omitempty"` + Description string `json:"description,omitempty"` + Steps int `json:"steps,omitempty"` + Parsed bool `json:"parsed,omitempty"` +} + +type flowDefinition struct { + Name string `yaml:"name"` + Description string `yaml:"description"` + Steps []flowDefinitionStep `yaml:"steps"` +} + +type flowDefinitionStep struct { + Name string `yaml:"name"` + Run string `yaml:"run"` + Flow string `yaml:"flow"` + Agent string `yaml:"agent"` + Prompt string `yaml:"prompt"` + Template string `yaml:"template"` + Timeout string `yaml:"timeout"` + When string `yaml:"when"` + Output string `yaml:"output"` + Gate string `yaml:"gate"` + Parallel []flowDefinitionStep `yaml:"parallel"` +} + +type flowRunDocument struct { + Source string + Content string + Parsed bool + Definition flowDefinition +} + +func readFlowDocument(path string) core.Result { + if readResult := fs.Read(path); readResult.OK { + content := readResult.Value.(string) + definition, err := parseFlowDefinition(content) + if err != nil { + if flowInputLooksYaml(path) { + return core.Result{Value: err, OK: false} + } + return core.Result{Value: flowRunDocument{ + Source: path, + Content: content, + Parsed: false, + }, OK: true} + } + return core.Result{Value: flowRunDocument{ + Source: path, + Content: content, + Parsed: true, + Definition: definition, + }, OK: true} + } + + flowResult := lib.Flow(flowSlugFromPath(path)) + if !flowResult.OK { + if err, ok := flowResult.Value.(error); ok { + return core.Result{Value: core.E("agentic.cmdRunFlow", core.Concat("flow not found: ", path), err), OK: false} + } + return core.Result{Value: core.E("agentic.cmdRunFlow", core.Concat("flow not found: ", path), nil), OK: false} + } + + content := flowResult.Value.(string) + definition, err := parseFlowDefinition(content) + if err != nil { + return core.Result{Value: flowRunDocument{ + Source: core.Concat("embedded:", flowSlugFromPath(path)), + Content: content, + Parsed: false, + }, OK: true} + } + return core.Result{Value: flowRunDocument{ + Source: core.Concat("embedded:", flowSlugFromPath(path)), + Content: content, + Parsed: true, + Definition: definition, + }, OK: true} +} + +func parseFlowDefinition(content string) (flowDefinition, error) { + var definition flowDefinition + if err := yaml.Unmarshal([]byte(content), &definition); err != nil { + return flowDefinition{}, core.E("agentic.parseFlowDefinition", "invalid flow definition", err) + } + if definition.Name == "" || len(definition.Steps) == 0 { + return flowDefinition{}, core.E("agentic.parseFlowDefinition", "invalid flow definition", nil) + } + return definition, nil +} + +func flowInputLooksYaml(path string) bool { + return core.HasSuffix(path, ".yaml") || core.HasSuffix(path, ".yml") +} + +func flowSlugFromPath(path string) string { + slug := core.PathBase(path) + for _, suffix := range []string{".yaml", ".yml", ".md"} { + slug = core.TrimSuffix(slug, suffix) + } + return slug +} + +func flowStepSummary(step flowDefinitionStep) string { + label := core.Trim(step.Name) + if label == "" { + label = core.Trim(step.Flow) + } + if label == "" { + label = core.Trim(step.Agent) + } + if label == "" { + label = core.Trim(step.Run) + } + if label == "" { + label = "step" + } + + switch { + case step.Flow != "": + return core.Concat(label, ": flow ", step.Flow) + case step.Agent != "": + return core.Concat(label, ": agent ", step.Agent) + case step.Run != "": + return core.Concat(label, ": run ", step.Run) + case step.Gate != "": + return core.Concat(label, ": gate ", step.Gate) + default: + return label + } +} + type brainListOutput struct { Count int `json:"count"` Memories []brainListOutputEntry `json:"memories"` diff --git a/pkg/agentic/commands_flow_test.go b/pkg/agentic/commands_flow_test.go new file mode 100644 index 0000000..6848ac4 --- /dev/null +++ b/pkg/agentic/commands_flow_test.go @@ -0,0 +1,71 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "testing" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCommandsFlow_CmdRunFlow_Good_ReadsYamlFlowFile(t *testing.T) { + dir := t.TempDir() + flowPath := core.JoinPath(dir, "pkg", "lib", "flow", "verify") + require.True(t, fs.EnsureDir(flowPath).OK) + + filePath := core.JoinPath(flowPath, "go-qa.yaml") + require.True(t, fs.Write(filePath, core.Concat( + "name: Go QA\n", + "description: Build and test a Go project\n", + "steps:\n", + " - name: build\n", + " run: go build ./...\n", + " - name: verify\n", + " flow: verify/go-qa.yaml\n", + )).OK) + + s := newTestPrep(t) + output := captureStdout(t, func() { + r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath})) + require.True(t, r.OK) + + flowOutput, ok := r.Value.(FlowRunOutput) + require.True(t, ok) + assert.True(t, flowOutput.Success) + assert.Equal(t, filePath, flowOutput.Source) + assert.Equal(t, "Go QA", flowOutput.Name) + assert.Equal(t, "Build and test a Go project", flowOutput.Description) + assert.Equal(t, 2, flowOutput.Steps) + }) + + assert.Contains(t, output, "steps: 2") + assert.Contains(t, output, "build: run go build ./...") + assert.Contains(t, output, "verify: flow verify/go-qa.yaml") +} + +func TestCommandsFlow_CmdRunFlow_Bad_MissingPath(t *testing.T) { + s := newTestPrep(t) + + r := s.cmdRunFlow(core.NewOptions()) + require.False(t, r.OK) + + err, ok := r.Value.(error) + require.True(t, ok) + assert.Contains(t, err.Error(), "flow path or slug is required") +} + +func TestCommandsFlow_CmdRunFlow_Ugly_InvalidYaml(t *testing.T) { + dir := t.TempDir() + filePath := core.JoinPath(dir, "broken-flow.yaml") + require.True(t, fs.Write(filePath, "name: [broken\n").OK) + + s := newTestPrep(t) + r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath})) + require.False(t, r.OK) + + err, ok := r.Value.(error) + require.True(t, ok) + assert.Contains(t, err.Error(), "invalid flow definition") +} diff --git a/pkg/agentic/commands_test.go b/pkg/agentic/commands_test.go index 473533f..734184c 100644 --- a/pkg/agentic/commands_test.go +++ b/pkg/agentic/commands_test.go @@ -1350,6 +1350,7 @@ func TestCommands_RegisterCommands_Good_AllRegistered(t *testing.T) { cmds := c.Commands() assert.Contains(t, cmds, "run/task") + assert.Contains(t, cmds, "run/flow") assert.Contains(t, cmds, "dispatch/sync") assert.Contains(t, cmds, "run/orchestrator") assert.Contains(t, cmds, "dispatch")