feat(agentic): add flow preview command
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
3024286e4d
commit
78e663a0e0
3 changed files with 265 additions and 0 deletions
|
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"dappco.re/go/agent/pkg/lib"
|
||||
core "dappco.re/go/core"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// c.Command("run/task", core.Command{Description: "Run a single task end-to-end", Action: s.cmdRunTask})
|
||||
|
|
@ -18,6 +19,7 @@ func (s *PrepSubsystem) registerCommands(ctx context.Context) {
|
|||
s.startupContext = ctx
|
||||
c := s.Core()
|
||||
c.Command("run/task", core.Command{Description: "Run a single task end-to-end", Action: s.cmdRunTask})
|
||||
c.Command("run/flow", core.Command{Description: "Show a flow definition from disk or the embedded library", Action: s.cmdRunFlow})
|
||||
c.Command("dispatch/sync", core.Command{Description: "Dispatch a single task synchronously and block until it completes", Action: s.cmdDispatchSync})
|
||||
c.Command("run/orchestrator", core.Command{Description: "Run the queue orchestrator (standalone, no MCP)", Action: s.cmdOrchestrator})
|
||||
c.Command("dispatch", core.Command{Description: "Dispatch queued agents", Action: s.cmdDispatch})
|
||||
|
|
@ -78,6 +80,62 @@ func (s *PrepSubsystem) cmdRunTask(options core.Options) core.Result {
|
|||
return s.runDispatchSync(s.commandContext(), options, "run task", "agentic.runTask")
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) cmdRunFlow(options core.Options) core.Result {
|
||||
flowPath := optionStringValue(options, "_arg", "path", "slug")
|
||||
if flowPath == "" {
|
||||
core.Print(nil, "usage: core-agent run flow <path-or-slug>")
|
||||
return core.Result{Value: core.E("agentic.cmdRunFlow", "flow path or slug is required", nil), OK: false}
|
||||
}
|
||||
|
||||
flowResult := readFlowDocument(flowPath)
|
||||
if !flowResult.OK {
|
||||
core.Print(nil, "error: %v", flowResult.Value)
|
||||
return core.Result{Value: flowResult.Value, OK: false}
|
||||
}
|
||||
|
||||
document, ok := flowResult.Value.(flowRunDocument)
|
||||
if !ok {
|
||||
err := core.E("agentic.cmdRunFlow", "invalid flow definition", nil)
|
||||
core.Print(nil, "error: %v", err)
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
|
||||
output := FlowRunOutput{
|
||||
Success: true,
|
||||
Source: document.Source,
|
||||
Name: document.Definition.Name,
|
||||
Description: document.Definition.Description,
|
||||
Steps: len(document.Definition.Steps),
|
||||
Parsed: document.Parsed,
|
||||
}
|
||||
|
||||
core.Print(nil, "flow: %s", document.Source)
|
||||
if document.Parsed {
|
||||
if document.Definition.Name != "" {
|
||||
core.Print(nil, "name: %s", document.Definition.Name)
|
||||
}
|
||||
if document.Definition.Description != "" {
|
||||
core.Print(nil, "desc: %s", document.Definition.Description)
|
||||
}
|
||||
if len(document.Definition.Steps) == 0 {
|
||||
core.Print(nil, "steps: 0")
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
core.Print(nil, "steps: %d", len(document.Definition.Steps))
|
||||
for index, step := range document.Definition.Steps {
|
||||
core.Print(nil, " %d. %s", index+1, flowStepSummary(step))
|
||||
}
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
if document.Content != "" {
|
||||
core.Print(nil, "content: %d chars", len(document.Content))
|
||||
}
|
||||
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) cmdDispatchSync(options core.Options) core.Result {
|
||||
return s.runDispatchSync(s.commandContext(), options, "dispatch sync", "agentic.runDispatchSync")
|
||||
}
|
||||
|
|
@ -748,6 +806,141 @@ func parseIntString(s string) int {
|
|||
return n
|
||||
}
|
||||
|
||||
type FlowRunOutput struct {
|
||||
Success bool `json:"success"`
|
||||
Source string `json:"source,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Description string `json:"description,omitempty"`
|
||||
Steps int `json:"steps,omitempty"`
|
||||
Parsed bool `json:"parsed,omitempty"`
|
||||
}
|
||||
|
||||
type flowDefinition struct {
|
||||
Name string `yaml:"name"`
|
||||
Description string `yaml:"description"`
|
||||
Steps []flowDefinitionStep `yaml:"steps"`
|
||||
}
|
||||
|
||||
type flowDefinitionStep struct {
|
||||
Name string `yaml:"name"`
|
||||
Run string `yaml:"run"`
|
||||
Flow string `yaml:"flow"`
|
||||
Agent string `yaml:"agent"`
|
||||
Prompt string `yaml:"prompt"`
|
||||
Template string `yaml:"template"`
|
||||
Timeout string `yaml:"timeout"`
|
||||
When string `yaml:"when"`
|
||||
Output string `yaml:"output"`
|
||||
Gate string `yaml:"gate"`
|
||||
Parallel []flowDefinitionStep `yaml:"parallel"`
|
||||
}
|
||||
|
||||
type flowRunDocument struct {
|
||||
Source string
|
||||
Content string
|
||||
Parsed bool
|
||||
Definition flowDefinition
|
||||
}
|
||||
|
||||
func readFlowDocument(path string) core.Result {
|
||||
if readResult := fs.Read(path); readResult.OK {
|
||||
content := readResult.Value.(string)
|
||||
definition, err := parseFlowDefinition(content)
|
||||
if err != nil {
|
||||
if flowInputLooksYaml(path) {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{Value: flowRunDocument{
|
||||
Source: path,
|
||||
Content: content,
|
||||
Parsed: false,
|
||||
}, OK: true}
|
||||
}
|
||||
return core.Result{Value: flowRunDocument{
|
||||
Source: path,
|
||||
Content: content,
|
||||
Parsed: true,
|
||||
Definition: definition,
|
||||
}, OK: true}
|
||||
}
|
||||
|
||||
flowResult := lib.Flow(flowSlugFromPath(path))
|
||||
if !flowResult.OK {
|
||||
if err, ok := flowResult.Value.(error); ok {
|
||||
return core.Result{Value: core.E("agentic.cmdRunFlow", core.Concat("flow not found: ", path), err), OK: false}
|
||||
}
|
||||
return core.Result{Value: core.E("agentic.cmdRunFlow", core.Concat("flow not found: ", path), nil), OK: false}
|
||||
}
|
||||
|
||||
content := flowResult.Value.(string)
|
||||
definition, err := parseFlowDefinition(content)
|
||||
if err != nil {
|
||||
return core.Result{Value: flowRunDocument{
|
||||
Source: core.Concat("embedded:", flowSlugFromPath(path)),
|
||||
Content: content,
|
||||
Parsed: false,
|
||||
}, OK: true}
|
||||
}
|
||||
return core.Result{Value: flowRunDocument{
|
||||
Source: core.Concat("embedded:", flowSlugFromPath(path)),
|
||||
Content: content,
|
||||
Parsed: true,
|
||||
Definition: definition,
|
||||
}, OK: true}
|
||||
}
|
||||
|
||||
func parseFlowDefinition(content string) (flowDefinition, error) {
|
||||
var definition flowDefinition
|
||||
if err := yaml.Unmarshal([]byte(content), &definition); err != nil {
|
||||
return flowDefinition{}, core.E("agentic.parseFlowDefinition", "invalid flow definition", err)
|
||||
}
|
||||
if definition.Name == "" || len(definition.Steps) == 0 {
|
||||
return flowDefinition{}, core.E("agentic.parseFlowDefinition", "invalid flow definition", nil)
|
||||
}
|
||||
return definition, nil
|
||||
}
|
||||
|
||||
func flowInputLooksYaml(path string) bool {
|
||||
return core.HasSuffix(path, ".yaml") || core.HasSuffix(path, ".yml")
|
||||
}
|
||||
|
||||
func flowSlugFromPath(path string) string {
|
||||
slug := core.PathBase(path)
|
||||
for _, suffix := range []string{".yaml", ".yml", ".md"} {
|
||||
slug = core.TrimSuffix(slug, suffix)
|
||||
}
|
||||
return slug
|
||||
}
|
||||
|
||||
func flowStepSummary(step flowDefinitionStep) string {
|
||||
label := core.Trim(step.Name)
|
||||
if label == "" {
|
||||
label = core.Trim(step.Flow)
|
||||
}
|
||||
if label == "" {
|
||||
label = core.Trim(step.Agent)
|
||||
}
|
||||
if label == "" {
|
||||
label = core.Trim(step.Run)
|
||||
}
|
||||
if label == "" {
|
||||
label = "step"
|
||||
}
|
||||
|
||||
switch {
|
||||
case step.Flow != "":
|
||||
return core.Concat(label, ": flow ", step.Flow)
|
||||
case step.Agent != "":
|
||||
return core.Concat(label, ": agent ", step.Agent)
|
||||
case step.Run != "":
|
||||
return core.Concat(label, ": run ", step.Run)
|
||||
case step.Gate != "":
|
||||
return core.Concat(label, ": gate ", step.Gate)
|
||||
default:
|
||||
return label
|
||||
}
|
||||
}
|
||||
|
||||
type brainListOutput struct {
|
||||
Count int `json:"count"`
|
||||
Memories []brainListOutputEntry `json:"memories"`
|
||||
|
|
|
|||
71
pkg/agentic/commands_flow_test.go
Normal file
71
pkg/agentic/commands_flow_test.go
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package agentic
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCommandsFlow_CmdRunFlow_Good_ReadsYamlFlowFile(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
flowPath := core.JoinPath(dir, "pkg", "lib", "flow", "verify")
|
||||
require.True(t, fs.EnsureDir(flowPath).OK)
|
||||
|
||||
filePath := core.JoinPath(flowPath, "go-qa.yaml")
|
||||
require.True(t, fs.Write(filePath, core.Concat(
|
||||
"name: Go QA\n",
|
||||
"description: Build and test a Go project\n",
|
||||
"steps:\n",
|
||||
" - name: build\n",
|
||||
" run: go build ./...\n",
|
||||
" - name: verify\n",
|
||||
" flow: verify/go-qa.yaml\n",
|
||||
)).OK)
|
||||
|
||||
s := newTestPrep(t)
|
||||
output := captureStdout(t, func() {
|
||||
r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath}))
|
||||
require.True(t, r.OK)
|
||||
|
||||
flowOutput, ok := r.Value.(FlowRunOutput)
|
||||
require.True(t, ok)
|
||||
assert.True(t, flowOutput.Success)
|
||||
assert.Equal(t, filePath, flowOutput.Source)
|
||||
assert.Equal(t, "Go QA", flowOutput.Name)
|
||||
assert.Equal(t, "Build and test a Go project", flowOutput.Description)
|
||||
assert.Equal(t, 2, flowOutput.Steps)
|
||||
})
|
||||
|
||||
assert.Contains(t, output, "steps: 2")
|
||||
assert.Contains(t, output, "build: run go build ./...")
|
||||
assert.Contains(t, output, "verify: flow verify/go-qa.yaml")
|
||||
}
|
||||
|
||||
func TestCommandsFlow_CmdRunFlow_Bad_MissingPath(t *testing.T) {
|
||||
s := newTestPrep(t)
|
||||
|
||||
r := s.cmdRunFlow(core.NewOptions())
|
||||
require.False(t, r.OK)
|
||||
|
||||
err, ok := r.Value.(error)
|
||||
require.True(t, ok)
|
||||
assert.Contains(t, err.Error(), "flow path or slug is required")
|
||||
}
|
||||
|
||||
func TestCommandsFlow_CmdRunFlow_Ugly_InvalidYaml(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
filePath := core.JoinPath(dir, "broken-flow.yaml")
|
||||
require.True(t, fs.Write(filePath, "name: [broken\n").OK)
|
||||
|
||||
s := newTestPrep(t)
|
||||
r := s.cmdRunFlow(core.NewOptions(core.Option{Key: "_arg", Value: filePath}))
|
||||
require.False(t, r.OK)
|
||||
|
||||
err, ok := r.Value.(error)
|
||||
require.True(t, ok)
|
||||
assert.Contains(t, err.Error(), "invalid flow definition")
|
||||
}
|
||||
|
|
@ -1350,6 +1350,7 @@ func TestCommands_RegisterCommands_Good_AllRegistered(t *testing.T) {
|
|||
|
||||
cmds := c.Commands()
|
||||
assert.Contains(t, cmds, "run/task")
|
||||
assert.Contains(t, cmds, "run/flow")
|
||||
assert.Contains(t, cmds, "dispatch/sync")
|
||||
assert.Contains(t, cmds, "run/orchestrator")
|
||||
assert.Contains(t, cmds, "dispatch")
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue