From 53b46c33daf19ed605ceff2a6cc5e36d2c6fade5 Mon Sep 17 00:00:00 2001 From: Snider Date: Sat, 25 Apr 2026 23:25:35 +0100 Subject: [PATCH] feat(agent/pipeline): add core pipeline command tree (#535) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per RFC.pipeline.md "core pipeline Command Tree": full audit/epic/monitor/fix/onboard/budget/training subcommand tree wired into core-agent. Lands across 18 files: * pkg/agentic/pipeline_commands.go — registry: audit, epic/create|run|status|sync, monitor, fix/reviews|conflicts|format|threads, onboard, budget/*, training/* * pkg/agentic/commands.go — pipeline registration wired in * pkg/agentic/pipeline_audit.go — audit issue expansion + bug fix: audit-created implementation issues no longer carry the 'audit' label, so epic + onboard see them as implementation candidates * pkg/agentic/pipeline_epic.go — epic group/run/sync * pkg/agentic/pipeline_monitor.go — open-PR watcher * pkg/agentic/pipeline_fix.go — reviews/conflicts/format/threads helpers * pkg/agentic/pipeline_onboard.go — chained audit → epic → dispatch * pkg/agentic/pipeline_budget.go + pipeline_training.go — stubs returning blocked-on-sibling pointer (sibling tickets own the deep implementation) * pkg/agentic/pipeline_*_test.go — AX-10 per handler * tests/cli/pipeline/Taskfile.yaml — CLI smoke coverage `go test` could not complete in this sandbox because of wider workspace go.sum/private-module issues outside this ticket's scope; the supervisor will re-run the tests in a clean workspace. 
Co-authored-by: Codex Closes tasks.lthn.sh/view.php?id=535 --- pkg/agentic/commands.go | 1 + pkg/agentic/pipeline_audit.go | 452 +++++++++++++++++++++ pkg/agentic/pipeline_audit_test.go | 74 ++++ pkg/agentic/pipeline_budget.go | 23 ++ pkg/agentic/pipeline_budget_test.go | 34 ++ pkg/agentic/pipeline_commands.go | 283 +++++++++++++ pkg/agentic/pipeline_commands_test.go | 452 +++++++++++++++++++++ pkg/agentic/pipeline_epic.go | 546 ++++++++++++++++++++++++++ pkg/agentic/pipeline_epic_test.go | 92 +++++ pkg/agentic/pipeline_fix.go | 286 ++++++++++++++ pkg/agentic/pipeline_fix_test.go | 85 ++++ pkg/agentic/pipeline_monitor.go | 501 +++++++++++++++++++++++ pkg/agentic/pipeline_monitor_test.go | 104 +++++ pkg/agentic/pipeline_onboard.go | 162 ++++++++ pkg/agentic/pipeline_onboard_test.go | 72 ++++ pkg/agentic/pipeline_training.go | 32 ++ pkg/agentic/pipeline_training_test.go | 35 ++ tests/cli/pipeline/Taskfile.yaml | 26 ++ 18 files changed, 3260 insertions(+) create mode 100644 pkg/agentic/pipeline_audit.go create mode 100644 pkg/agentic/pipeline_audit_test.go create mode 100644 pkg/agentic/pipeline_budget.go create mode 100644 pkg/agentic/pipeline_budget_test.go create mode 100644 pkg/agentic/pipeline_commands.go create mode 100644 pkg/agentic/pipeline_commands_test.go create mode 100644 pkg/agentic/pipeline_epic.go create mode 100644 pkg/agentic/pipeline_epic_test.go create mode 100644 pkg/agentic/pipeline_fix.go create mode 100644 pkg/agentic/pipeline_fix_test.go create mode 100644 pkg/agentic/pipeline_monitor.go create mode 100644 pkg/agentic/pipeline_monitor_test.go create mode 100644 pkg/agentic/pipeline_onboard.go create mode 100644 pkg/agentic/pipeline_onboard_test.go create mode 100644 pkg/agentic/pipeline_training.go create mode 100644 pkg/agentic/pipeline_training_test.go create mode 100644 tests/cli/pipeline/Taskfile.yaml diff --git a/pkg/agentic/commands.go b/pkg/agentic/commands.go index 5c2ec19..c6ea7e4 100644 --- a/pkg/agentic/commands.go +++ 
b/pkg/agentic/commands.go @@ -95,6 +95,7 @@ func (s *PrepSubsystem) registerCommands(ctx context.Context) { s.registerSprintCommands() s.registerStateCommands() s.registerCoreCommands() + s.registerPipelineCommands() s.registerLanguageCommands() s.registerSetupCommands() } diff --git a/pkg/agentic/pipeline_audit.go b/pkg/agentic/pipeline_audit.go new file mode 100644 index 0000000..b5f4778 --- /dev/null +++ b/pkg/agentic/pipeline_audit.go @@ -0,0 +1,452 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "regexp" + + core "dappco.re/go/core" +) + +var pipelineBulletPattern = regexp.MustCompile(`^\s*(?:[-*]|\d+\.)\s+(.*)$`) +var pipelineWhitespacePattern = regexp.MustCompile(`\s+`) + +type PipelineIssueRef struct { + Number int `json:"number"` + Title string `json:"title"` + State string `json:"state,omitempty"` + URL string `json:"url,omitempty"` + Labels []string `json:"labels,omitempty"` +} + +type PipelineAuditInput struct { + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + DryRun bool `json:"dry_run,omitempty"` +} + +type PipelineAuditOutput struct { + Success bool `json:"success"` + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + Audits []PipelineIssueRef `json:"audits"` + Created []PipelineIssueRef `json:"created"` + Existing []PipelineIssueRef `json:"existing"` + Closed []int `json:"closed,omitempty"` +} + +type pipelineRepoRecord struct { + Name string `json:"name"` + DefaultBranch string `json:"default_branch"` + HTMLURL string `json:"html_url"` +} + +type pipelineLabelRecord struct { + ID int64 `json:"id"` + Name string `json:"name"` +} + +type pipelineIssueRecord struct { + Number int `json:"number"` + Title string `json:"title"` + Body string `json:"body"` + State string `json:"state"` + HTMLURL string `json:"html_url"` + Labels []pipelineLabelRecord `json:"labels"` + PullRequest map[string]any `json:"pull_request"` +} + +func (s *PrepSubsystem) cmdPipelineAudit(options core.Options) 
core.Result { + ctx := s.commandContext() + org := pipelineOrgValue(options) + repo := pipelineRepoValue(options) + if repo == "" { + core.Print(nil, "usage: core-agent pipeline/audit [--org=core] [--dry-run]") + return core.Result{Value: core.E("agentic.cmdPipelineAudit", "repo is required", nil), OK: false} + } + + output, err := s.pipelineAudit(ctx, PipelineAuditInput{ + Org: org, + Repo: repo, + DryRun: optionBoolValue(options, "dry_run", "dry-run"), + }) + if err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + core.Print(nil, "repo: %s/%s", output.Org, output.Repo) + core.Print(nil, "audits: %d", len(output.Audits)) + core.Print(nil, "created: %d", len(output.Created)) + core.Print(nil, "existing: %d", len(output.Existing)) + if len(output.Closed) > 0 { + core.Print(nil, "closed: %d", len(output.Closed)) + } + for _, issue := range output.Created { + core.Print(nil, " created: #%d %s", issue.Number, issue.Title) + } + for _, issue := range output.Existing { + core.Print(nil, " existing: #%d %s", issue.Number, issue.Title) + } + if len(output.Audits) == 0 { + core.Print(nil, "no audit issues") + } + + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) pipelineAudit(ctx context.Context, input PipelineAuditInput) (PipelineAuditOutput, error) { + if input.Repo == "" { + return PipelineAuditOutput{}, core.E("pipelineAudit", "repo is required", nil) + } + if s.forgeToken == "" { + return PipelineAuditOutput{}, core.E("pipelineAudit", "no Forge token configured", nil) + } + if input.Org == "" { + input.Org = "core" + } + + issues, err := s.pipelineListIssues(ctx, input.Org, input.Repo, "open") + if err != nil { + return PipelineAuditOutput{}, err + } + + output := PipelineAuditOutput{ + Success: true, + Org: input.Org, + Repo: input.Repo, + } + + existingByTitle := make(map[string]PipelineIssueRef) + for _, issue := range issues { + if pipelineIssueState(issue) != "open" || 
pipelineIssueIsAudit(issue) || pipelineIssueIsEpic(issue) { + continue + } + key := pipelineAuditExistingKey(issue) + if key == "" { + continue + } + existingByTitle[key] = pipelineIssueRefFromRecord(issue) + } + + for _, issue := range issues { + if !pipelineIssueIsAudit(issue) { + continue + } + output.Audits = append(output.Audits, pipelineIssueRefFromRecord(issue)) + + findings := pipelineAuditFindings(issue) + if len(findings) == 0 { + findings = []string{issue.Title} + } + + linked := make([]PipelineIssueRef, 0, len(findings)) + for _, finding := range findings { + title := pipelineAuditImplementationTitle(input.Repo, issue, finding) + key := core.Concat(core.Sprint(issue.Number), ":", title) + if existing, ok := existingByTitle[key]; ok { + output.Existing = append(output.Existing, existing) + linked = append(linked, existing) + continue + } + + labels := pipelineAuditLabels(issue) + body := pipelineAuditImplementationBody(issue, finding) + if input.DryRun { + ref := PipelineIssueRef{Title: title, State: "planned", Labels: labels} + output.Created = append(output.Created, ref) + linked = append(linked, ref) + continue + } + + labelIDs := s.resolveLabelIDs(ctx, input.Org, input.Repo, labels) + created, createErr := s.createIssue(ctx, input.Org, input.Repo, title, body, labelIDs) + if createErr != nil { + return PipelineAuditOutput{}, core.E("pipelineAudit", core.Concat("failed to create implementation issue for audit #", core.Sprint(issue.Number)), createErr) + } + + ref := PipelineIssueRef{ + Number: created.Number, + Title: created.Title, + URL: created.URL, + State: "open", + Labels: labels, + } + existingByTitle[key] = ref + output.Created = append(output.Created, ref) + linked = append(linked, ref) + } + + if input.DryRun || len(linked) == 0 { + continue + } + + s.commentOnIssue(ctx, input.Org, input.Repo, issue.Number, pipelineAuditLinkedComment(linked)) + if closeErr := s.pipelinePatchIssue(ctx, input.Org, input.Repo, issue.Number, 
map[string]any{"state": "closed"}); closeErr != nil { + return PipelineAuditOutput{}, closeErr + } + output.Closed = append(output.Closed, issue.Number) + } + + return output, nil +} + +func (s *PrepSubsystem) pipelineListOrgRepos(ctx context.Context, org string) ([]pipelineRepoRecord, error) { + url := core.Sprintf("%s/api/v1/orgs/%s/repos?limit=100&page=1", s.forgeURL, org) + result := HTTPGet(ctx, url, s.forgeToken, "token") + if !result.OK { + return nil, core.E("pipelineListOrgRepos", core.Concat("failed to list repos for ", org), nil) + } + + var repos []pipelineRepoRecord + if err := pipelineDecodeJSON(result.Value.(string), &repos, "pipelineListOrgRepos", "failed to decode repo list"); err != nil { + return nil, err + } + return repos, nil +} + +func (s *PrepSubsystem) pipelineGetRepo(ctx context.Context, org, repo string) (pipelineRepoRecord, error) { + url := core.Sprintf("%s/api/v1/repos/%s/%s", s.forgeURL, org, repo) + result := HTTPGet(ctx, url, s.forgeToken, "token") + if !result.OK { + return pipelineRepoRecord{}, core.E("pipelineGetRepo", core.Concat("failed to read repo ", repo), nil) + } + + var record pipelineRepoRecord + if err := pipelineDecodeJSON(result.Value.(string), &record, "pipelineGetRepo", "failed to decode repo"); err != nil { + return pipelineRepoRecord{}, err + } + return record, nil +} + +func (s *PrepSubsystem) pipelineListIssues(ctx context.Context, org, repo, state string) ([]pipelineIssueRecord, error) { + if state == "" { + state = "open" + } + + url := core.Sprintf("%s/api/v1/repos/%s/%s/issues?state=%s&limit=100&type=issues", s.forgeURL, org, repo, state) + result := HTTPGet(ctx, url, s.forgeToken, "token") + if !result.OK { + return nil, core.E("pipelineListIssues", core.Concat("failed to list issues for ", repo), nil) + } + + var issues []pipelineIssueRecord + if err := pipelineDecodeJSON(result.Value.(string), &issues, "pipelineListIssues", "failed to decode issue list"); err != nil { + return nil, err + } + return 
issues, nil +} + +func (s *PrepSubsystem) pipelineGetIssue(ctx context.Context, org, repo string, number int) (pipelineIssueRecord, error) { + url := core.Sprintf("%s/api/v1/repos/%s/%s/issues/%d", s.forgeURL, org, repo, number) + result := HTTPGet(ctx, url, s.forgeToken, "token") + if !result.OK { + return pipelineIssueRecord{}, core.E("pipelineGetIssue", core.Concat("failed to read issue #", core.Sprint(number)), nil) + } + + var issue pipelineIssueRecord + if err := pipelineDecodeJSON(result.Value.(string), &issue, "pipelineGetIssue", "failed to decode issue"); err != nil { + return pipelineIssueRecord{}, err + } + return issue, nil +} + +func (s *PrepSubsystem) pipelinePatchIssue(ctx context.Context, org, repo string, number int, payload map[string]any) error { + url := core.Sprintf("%s/api/v1/repos/%s/%s/issues/%d", s.forgeURL, org, repo, number) + result := HTTPPatch(ctx, url, core.JSONMarshalString(payload), s.forgeToken, "token") + if !result.OK { + return core.E("pipelinePatchIssue", core.Concat("failed to update issue #", core.Sprint(number)), nil) + } + return nil +} + +func pipelineDecodeJSON(data string, target any, errorName, message string) error { + parseResult := core.JSONUnmarshalString(data, target) + if !parseResult.OK { + err, _ := parseResult.Value.(error) + return core.E(errorName, message, err) + } + return nil +} + +func pipelineIssueRefFromRecord(issue pipelineIssueRecord) PipelineIssueRef { + return PipelineIssueRef{ + Number: issue.Number, + Title: issue.Title, + State: pipelineIssueState(issue), + URL: issue.HTMLURL, + Labels: pipelineIssueLabelNames(issue), + } +} + +func pipelineIssueState(issue pipelineIssueRecord) string { + state := core.Lower(core.Trim(issue.State)) + if state == "" { + return "open" + } + return state +} + +func pipelineIssueLabelNames(issue pipelineIssueRecord) []string { + names := make([]string, 0, len(issue.Labels)) + for _, label := range issue.Labels { + if label.Name != "" { + names = append(names, 
label.Name) + } + } + return names +} + +func pipelineIssueHasLabel(issue pipelineIssueRecord, want string) bool { + for _, name := range pipelineIssueLabelNames(issue) { + if core.Lower(name) == core.Lower(want) { + return true + } + } + return false +} + +func pipelineIssueIsAudit(issue pipelineIssueRecord) bool { + title := core.Lower(issue.Title) + return pipelineIssueHasLabel(issue, "audit") || core.Contains(title, "[audit]") || core.HasPrefix(title, "audit:") +} + +func pipelineIssueIsEpic(issue pipelineIssueRecord) bool { + return pipelineIssueHasLabel(issue, "epic") || regexp.MustCompile(`(?m)^\s*-\s*\[[ xX]\]\s*#\d+`).MatchString(issue.Body) +} + +func pipelineIssueIsImplementationCandidate(issue pipelineIssueRecord) bool { + if pipelineIssueState(issue) != "open" || pipelineIssueIsAudit(issue) || pipelineIssueIsEpic(issue) { + return false + } + if len(issue.PullRequest) > 0 { + return false + } + return !core.Contains(issue.Body, "Parent: #") +} + +func pipelineAuditFindings(issue pipelineIssueRecord) []string { + lines := core.Split(issue.Body, "\n") + findings := []string{} + for _, line := range lines { + trimmed := core.Trim(line) + if trimmed == "" || core.HasPrefix(trimmed, "#") { + continue + } + + match := pipelineBulletPattern.FindStringSubmatch(trimmed) + if len(match) == 2 { + findings = append(findings, pipelineFindingSummary(match[1])) + } + } + + if len(findings) > 0 { + return findings + } + + paragraphs := core.Split(issue.Body, "\n\n") + for _, paragraph := range paragraphs { + summary := pipelineFindingSummary(paragraph) + if summary == "" || summary == issue.Title { + continue + } + findings = append(findings, summary) + break + } + return findings +} + +func pipelineFindingSummary(value string) string { + summary := pipelineWhitespacePattern.ReplaceAllString(core.Trim(value), " ") + summary = regexp.MustCompile("`").ReplaceAllString(summary, "") + if summary == "" { + return "" + } + if len(summary) > 96 { + summary = 
core.Concat(summary[:93], "...") + } + return summary +} + +func pipelineAuditIssueType(issue pipelineIssueRecord) string { + haystack := core.Lower(core.Concat(issue.Title, " ", issue.Body, " ", core.JSONMarshalString(pipelineIssueLabelNames(issue)))) + switch { + case core.Contains(haystack, "security"), core.Contains(haystack, "owasp"), core.Contains(haystack, "auth"), core.Contains(haystack, "sanitize"), core.Contains(haystack, "validation"): + return "security" + case core.Contains(haystack, "test"), core.Contains(haystack, "coverage"): + return "testing" + case core.Contains(haystack, "perf"), core.Contains(haystack, "performance"): + return "performance" + case core.Contains(haystack, "doc"): + return "docs" + default: + return "quality" + } +} + +func pipelineAuditSeverity(issue pipelineIssueRecord) string { + haystack := core.Lower(core.Concat(issue.Title, " ", issue.Body)) + switch { + case core.Contains(haystack, "critical"): + return "critical" + case core.Contains(haystack, "high"): + return "high" + case core.Contains(haystack, "medium"): + return "medium" + case core.Contains(haystack, "low"): + return "low" + default: + return "" + } +} + +func pipelineAuditLabels(issue pipelineIssueRecord) []string { + labels := []string{"agentic", pipelineAuditIssueType(issue)} + if severity := pipelineAuditSeverity(issue); severity != "" { + labels = append(labels, severity) + } + return labels +} + +func pipelineAuditImplementationTitle(repo string, issue pipelineIssueRecord, finding string) string { + issueType := pipelineAuditIssueType(issue) + summary := pipelineFindingSummary(finding) + if summary == "" { + summary = issue.Title + } + return core.Sprintf("%s(%s): %s", issueType, repo, summary) +} + +func pipelineAuditImplementationBody(issue pipelineIssueRecord, finding string) string { + builder := core.NewBuilder() + builder.WriteString(core.Sprintf("Parent audit: #%d\n\n", issue.Number)) + builder.WriteString("## Finding\n\n") + 
builder.WriteString(pipelineFindingSummary(finding)) + builder.WriteString("\n\n## Acceptance Criteria\n\n") + builder.WriteString("- [ ] Implement the audit finding\n") + builder.WriteString("- [ ] Update tests or validation where needed\n") + builder.WriteString("- [ ] Link the resulting PR back to this issue\n") + return builder.String() +} + +func pipelineAuditExistingKey(issue pipelineIssueRecord) string { + match := regexp.MustCompile(`Parent audit:\s*#(\d+)`).FindStringSubmatch(issue.Body) + if len(match) != 2 { + return "" + } + return core.Concat(match[1], ":", issue.Title) +} + +func pipelineAuditLinkedComment(linked []PipelineIssueRef) string { + builder := core.NewBuilder() + builder.WriteString("Implementation issues created:\n") + for _, issue := range linked { + if issue.Number > 0 { + builder.WriteString(core.Sprintf("- #%d %s\n", issue.Number, issue.Title)) + continue + } + builder.WriteString(core.Sprintf("- %s\n", issue.Title)) + } + return builder.String() +} diff --git a/pkg/agentic/pipeline_audit_test.go b/pkg/agentic/pipeline_audit_test.go new file mode 100644 index 0000000..12adf68 --- /dev/null +++ b/pkg/agentic/pipeline_audit_test.go @@ -0,0 +1,74 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "testing" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPipelineAudit_Good_CreatesImplementationIssuesAndClosesAudit(t *testing.T) { + repo := newPipelineTestRepo() + repo.Issues[1] = &pipelineTestIssue{ + Number: 1, + Title: "[Audit] Security", + Body: "- Validate tokens\n- Sanitize input\n- Add rate limiting", + State: "open", + Labels: []string{"audit", "security"}, + } + srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo}) + + s, _ := testPrepWithCore(t, srv) + output, err := s.pipelineAudit(context.Background(), PipelineAuditInput{Org: "core", Repo: "go-io"}) + + require.NoError(t, err) + assert.True(t, 
output.Success) + assert.Len(t, output.Audits, 1) + assert.Len(t, output.Created, 3) + assert.Equal(t, []int{1}, output.Closed) + assert.Equal(t, "closed", repo.Issues[1].State) + assert.Len(t, repo.Comments[1], 1) + assert.Contains(t, repo.Comments[1][0], "Implementation issues created") +} + +func TestPipelineAudit_Bad_MissingRepo(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + + result := s.cmdPipelineAudit(core.NewOptions()) + + require.False(t, result.OK) + err, ok := result.Value.(error) + require.True(t, ok) + assert.Contains(t, err.Error(), "repo is required") +} + +func TestPipelineAudit_Ugly_DeduplicatesExistingImplementationIssue(t *testing.T) { + repo := newPipelineTestRepo() + repo.Issues[1] = &pipelineTestIssue{ + Number: 1, + Title: "[Audit] Security", + Body: "- Validate tokens\n- Sanitize input", + State: "open", + Labels: []string{"audit", "security"}, + } + repo.Issues[2] = &pipelineTestIssue{ + Number: 2, + Title: "security(go-io): Validate tokens", + Body: "Parent audit: #1", + State: "open", + Labels: []string{"agentic", "security"}, + } + srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo}) + + s, _ := testPrepWithCore(t, srv) + output, err := s.pipelineAudit(context.Background(), PipelineAuditInput{Org: "core", Repo: "go-io"}) + + require.NoError(t, err) + assert.Len(t, output.Existing, 1) + assert.Len(t, output.Created, 1) + assert.Equal(t, 2, output.Existing[0].Number) +} diff --git a/pkg/agentic/pipeline_budget.go b/pkg/agentic/pipeline_budget.go new file mode 100644 index 0000000..59bf568 --- /dev/null +++ b/pkg/agentic/pipeline_budget.go @@ -0,0 +1,23 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import core "dappco.re/go/core" + +func (s *PrepSubsystem) cmdPipelineBudgetPlan(_ core.Options) core.Result { + core.Print(nil, "status: not yet implemented") + core.Print(nil, "reason: blocked-on-sibling") + return core.Result{ + Value: core.E("agentic.cmdPipelineBudgetPlan", "not yet implemented 
- blocked-on-sibling", nil), + OK: false, + } +} + +func (s *PrepSubsystem) cmdPipelineBudgetLog(_ core.Options) core.Result { + core.Print(nil, "status: not yet implemented") + core.Print(nil, "reason: blocked-on-sibling") + return core.Result{ + Value: core.E("agentic.cmdPipelineBudgetLog", "not yet implemented - blocked-on-sibling", nil), + OK: false, + } +} diff --git a/pkg/agentic/pipeline_budget_test.go b/pkg/agentic/pipeline_budget_test.go new file mode 100644 index 0000000..36b33c9 --- /dev/null +++ b/pkg/agentic/pipeline_budget_test.go @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "testing" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPipelineBudget_Good_RootHelp(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + + output := captureStdout(t, func() { + result := s.cmdPipelineBudget(core.NewOptions()) + require.True(t, result.OK) + }) + + assert.Contains(t, output, "core-agent pipeline/budget/plan") + assert.Contains(t, output, "core-agent pipeline/budget/log") +} + +func TestPipelineBudget_Bad_PlanBlockedOnSibling(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + + result := s.cmdPipelineBudgetPlan(core.NewOptions()) + + require.False(t, result.OK) + err, ok := result.Value.(error) + require.True(t, ok) + assert.Contains(t, err.Error(), "blocked-on-sibling") +} diff --git a/pkg/agentic/pipeline_commands.go b/pkg/agentic/pipeline_commands.go new file mode 100644 index 0000000..10829f8 --- /dev/null +++ b/pkg/agentic/pipeline_commands.go @@ -0,0 +1,283 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "regexp" + "unicode" + + core "dappco.re/go/core" +) + +var pipelineNumberPattern = regexp.MustCompile(`^[0-9]+$`) + +func (s *PrepSubsystem) registerPipelineCommands() { + c := s.Core() + + c.Command("pipeline", core.Command{Description: "Run the agent pipeline command tree", Action: s.cmdPipeline}) + 
c.Command("agentic:pipeline", core.Command{Description: "Run the agent pipeline command tree", Action: s.cmdPipeline}) + c.Command("pipeline/audit", core.Command{Description: "Stage 1: audit issues into implementation work", Action: s.cmdPipelineAudit}) + c.Command("agentic:pipeline/audit", core.Command{Description: "Stage 1: audit issues into implementation work", Action: s.cmdPipelineAudit}) + c.Command("pipeline/epic", core.Command{Description: "Stage 2 and 3 epic orchestration commands", Action: s.cmdPipelineEpic}) + c.Command("agentic:pipeline/epic", core.Command{Description: "Stage 2 and 3 epic orchestration commands", Action: s.cmdPipelineEpic}) + c.Command("pipeline/epic/create", core.Command{Description: "Group implementation issues into epics", Action: s.cmdPipelineEpicCreate}) + c.Command("agentic:pipeline/epic/create", core.Command{Description: "Group implementation issues into epics", Action: s.cmdPipelineEpicCreate}) + c.Command("pipeline/epic/run", core.Command{Description: "Dispatch and monitor an epic", Action: s.cmdPipelineEpicRun}) + c.Command("agentic:pipeline/epic/run", core.Command{Description: "Dispatch and monitor an epic", Action: s.cmdPipelineEpicRun}) + c.Command("pipeline/epic/status", core.Command{Description: "Show epic progress", Action: s.cmdPipelineEpicStatus}) + c.Command("agentic:pipeline/epic/status", core.Command{Description: "Show epic progress", Action: s.cmdPipelineEpicStatus}) + c.Command("pipeline/epic/sync", core.Command{Description: "Sync epic checklist state from child issues", Action: s.cmdPipelineEpicSync}) + c.Command("agentic:pipeline/epic/sync", core.Command{Description: "Sync epic checklist state from child issues", Action: s.cmdPipelineEpicSync}) + c.Command("pipeline/monitor", core.Command{Description: "Watch open PRs and auto-intervene", Action: s.cmdPipelineMonitor}) + c.Command("agentic:pipeline/monitor", core.Command{Description: "Watch open PRs and auto-intervene", Action: s.cmdPipelineMonitor}) + 
c.Command("pipeline/fix", core.Command{Description: "Pipeline fix-up commands", Action: s.cmdPipelineFix}) + c.Command("agentic:pipeline/fix", core.Command{Description: "Pipeline fix-up commands", Action: s.cmdPipelineFix}) + c.Command("pipeline/fix/reviews", core.Command{Description: "Ask the agent to fix code reviews on a pull request", Action: s.cmdPipelineFixReviews}) + c.Command("agentic:pipeline/fix/reviews", core.Command{Description: "Ask the agent to fix code reviews on a pull request", Action: s.cmdPipelineFixReviews}) + c.Command("pipeline/fix/conflicts", core.Command{Description: "Ask the agent to fix a merge conflict on a pull request", Action: s.cmdPipelineFixConflicts}) + c.Command("agentic:pipeline/fix/conflicts", core.Command{Description: "Ask the agent to fix a merge conflict on a pull request", Action: s.cmdPipelineFixConflicts}) + c.Command("pipeline/fix/format", core.Command{Description: "Apply formatting-only fixes in a workspace or repo checkout", Action: s.cmdPipelineFixFormat}) + c.Command("agentic:pipeline/fix/format", core.Command{Description: "Apply formatting-only fixes in a workspace or repo checkout", Action: s.cmdPipelineFixFormat}) + c.Command("pipeline/fix/threads", core.Command{Description: "Handle review-thread follow-up for a pull request", Action: s.cmdPipelineFixThreads}) + c.Command("agentic:pipeline/fix/threads", core.Command{Description: "Handle review-thread follow-up for a pull request", Action: s.cmdPipelineFixThreads}) + c.Command("pipeline/onboard", core.Command{Description: "Run audit, epic creation, and dispatch onboarding for a repo", Action: s.cmdPipelineOnboard}) + c.Command("agentic:pipeline/onboard", core.Command{Description: "Run audit, epic creation, and dispatch onboarding for a repo", Action: s.cmdPipelineOnboard}) + c.Command("pipeline/budget", core.Command{Description: "Budget planning commands", Action: s.cmdPipelineBudget}) + c.Command("agentic:pipeline/budget", core.Command{Description: "Budget planning 
commands", Action: s.cmdPipelineBudget}) + c.Command("pipeline/budget/plan", core.Command{Description: "Show daily dispatch budget planning", Action: s.cmdPipelineBudgetPlan}) + c.Command("agentic:pipeline/budget/plan", core.Command{Description: "Show daily dispatch budget planning", Action: s.cmdPipelineBudgetPlan}) + c.Command("pipeline/budget/log", core.Command{Description: "Append a dispatch event to the budget journal", Action: s.cmdPipelineBudgetLog}) + c.Command("agentic:pipeline/budget/log", core.Command{Description: "Append a dispatch event to the budget journal", Action: s.cmdPipelineBudgetLog}) + c.Command("pipeline/training", core.Command{Description: "Training journal commands", Action: s.cmdPipelineTraining}) + c.Command("agentic:pipeline/training", core.Command{Description: "Training journal commands", Action: s.cmdPipelineTraining}) + c.Command("pipeline/training/capture", core.Command{Description: "Capture a merged pull request for training", Action: s.cmdPipelineTrainingCapture}) + c.Command("agentic:pipeline/training/capture", core.Command{Description: "Capture a merged pull request for training", Action: s.cmdPipelineTrainingCapture}) + c.Command("pipeline/training/stats", core.Command{Description: "Summarise training journal data", Action: s.cmdPipelineTrainingStats}) + c.Command("agentic:pipeline/training/stats", core.Command{Description: "Summarise training journal data", Action: s.cmdPipelineTrainingStats}) + c.Command("pipeline/training/export", core.Command{Description: "Export training journal data", Action: s.cmdPipelineTrainingExport}) + c.Command("agentic:pipeline/training/export", core.Command{Description: "Export training journal data", Action: s.cmdPipelineTrainingExport}) +} + +func (s *PrepSubsystem) cmdPipeline(options core.Options) core.Result { + switch action := optionStringValue(options, "action", "_arg"); action { + case "", "help": + printPipelineUsage() + return core.Result{OK: true} + case "audit": + return 
s.cmdPipelineAudit(options) + case "epic": + return s.cmdPipelineEpic(options) + case "monitor": + return s.cmdPipelineMonitor(options) + case "fix": + return s.cmdPipelineFix(options) + case "onboard": + return s.cmdPipelineOnboard(options) + case "budget": + return s.cmdPipelineBudget(options) + case "training": + return s.cmdPipelineTraining(options) + default: + printPipelineUsage() + return core.Result{Value: core.E("agentic.cmdPipeline", core.Concat("unknown pipeline command: ", action), nil), OK: false} + } +} + +func (s *PrepSubsystem) cmdPipelineEpic(options core.Options) core.Result { + switch action := optionStringValue(options, "action", "_arg"); action { + case "", "help": + printPipelineEpicUsage() + return core.Result{OK: true} + case "create": + return s.cmdPipelineEpicCreate(options) + case "run": + return s.cmdPipelineEpicRun(options) + case "status": + return s.cmdPipelineEpicStatus(options) + case "sync": + return s.cmdPipelineEpicSync(options) + default: + printPipelineEpicUsage() + return core.Result{Value: core.E("agentic.cmdPipelineEpic", core.Concat("unknown pipeline epic command: ", action), nil), OK: false} + } +} + +func (s *PrepSubsystem) cmdPipelineFix(options core.Options) core.Result { + switch action := optionStringValue(options, "action", "_arg"); action { + case "", "help": + printPipelineFixUsage() + return core.Result{OK: true} + case "reviews": + return s.cmdPipelineFixReviews(options) + case "conflicts": + return s.cmdPipelineFixConflicts(options) + case "format": + return s.cmdPipelineFixFormat(options) + case "threads": + return s.cmdPipelineFixThreads(options) + default: + printPipelineFixUsage() + return core.Result{Value: core.E("agentic.cmdPipelineFix", core.Concat("unknown pipeline fix command: ", action), nil), OK: false} + } +} + +func (s *PrepSubsystem) cmdPipelineBudget(options core.Options) core.Result { + switch action := optionStringValue(options, "action", "_arg"); action { + case "", "help": + 
printPipelineBudgetUsage() + return core.Result{OK: true} + case "plan": + return s.cmdPipelineBudgetPlan(options) + case "log": + return s.cmdPipelineBudgetLog(options) + default: + printPipelineBudgetUsage() + return core.Result{Value: core.E("agentic.cmdPipelineBudget", core.Concat("unknown pipeline budget command: ", action), nil), OK: false} + } +} + +func (s *PrepSubsystem) cmdPipelineTraining(options core.Options) core.Result { + switch action := optionStringValue(options, "action", "_arg"); action { + case "", "help": + printPipelineTrainingUsage() + return core.Result{OK: true} + case "capture": + return s.cmdPipelineTrainingCapture(options) + case "stats": + return s.cmdPipelineTrainingStats(options) + case "export": + return s.cmdPipelineTrainingExport(options) + default: + printPipelineTrainingUsage() + return core.Result{Value: core.E("agentic.cmdPipelineTraining", core.Concat("unknown pipeline training command: ", action), nil), OK: false} + } +} + +func pipelineOrgValue(options core.Options) string { + org := optionStringValue(options, "org") + if org == "" { + org = "core" + } + return org +} + +func pipelineRepoValue(options core.Options) string { + repo := optionStringValue(options, "repo") + if repo != "" { + return repo + } + + arg := optionStringValue(options, "_arg") + if arg == "" || pipelineNumberPattern.MatchString(core.Trim(arg)) { + return "" + } + + return arg +} + +func pipelineRepoAndNumberValue(options core.Options) (string, string, int) { + org := pipelineOrgValue(options) + repo := pipelineRepoValue(options) + number := optionIntValue(options, "number") + if number == 0 { + arg := optionStringValue(options, "_arg") + if pipelineNumberPattern.MatchString(core.Trim(arg)) { + number = parseIntString(arg) + } + } + return org, repo, number +} + +func pipelineWorkspaceDir(options core.Options) string { + workspace := optionStringValue(options, "workspace") + if workspace != "" { + return core.JoinPath(WorkspaceRoot(), workspace, "repo") 
+ } + return optionStringValue(options, "repo_dir", "repo-dir", "path") +} + +func pipelineSlug(value string) string { + normalised := core.Lower(core.Trim(value)) + if normalised == "" { + return "pipeline" + } + + lastDash := false + out := make([]rune, 0, len(normalised)) + for _, r := range normalised { + if unicode.IsLetter(r) || unicode.IsDigit(r) { + out = append(out, r) + lastDash = false + continue + } + if lastDash { + continue + } + out = append(out, '-') + lastDash = true + } + + slug := string(out) + for len(slug) > 0 && slug[0] == '-' { + slug = slug[1:] + } + for len(slug) > 0 && slug[len(slug)-1] == '-' { + slug = slug[:len(slug)-1] + } + if slug == "" { + return "pipeline" + } + return slug +} + +func pipelineDisplayTheme(theme string) string { + switch theme { + case "security": + return "Security" + case "testing": + return "Testing" + case "docs": + return "Docs" + case "performance": + return "Performance" + case "features": + return "Features" + default: + return "Quality" + } +} + +func printPipelineUsage() { + core.Print(nil, "usage: core-agent pipeline [audit|epic|monitor|fix|onboard|budget|training]") + core.Print(nil, " core-agent pipeline/audit [--org=core] [--dry-run]") + core.Print(nil, " core-agent pipeline/epic/create [--org=core] [--dry-run]") + core.Print(nil, " core-agent pipeline/epic/run --repo= [--agent=codex] [--dry-run]") + core.Print(nil, " core-agent pipeline/monitor [] [--org=core] [--dry-run]") + core.Print(nil, " core-agent pipeline/fix/reviews --repo= [--org=core] [--dry-run]") + core.Print(nil, " core-agent pipeline/onboard [--org=core] [--agent=codex] [--dry-run]") +} + +func printPipelineEpicUsage() { + core.Print(nil, "usage: core-agent pipeline/epic/create [--org=core] [--theme=security] [--dry-run]") + core.Print(nil, " core-agent pipeline/epic/run --repo= [--org=core] [--agent=codex] [--dry-run]") + core.Print(nil, " core-agent pipeline/epic/status --repo= [--org=core]") + core.Print(nil, " core-agent 
pipeline/epic/sync --repo= [--org=core] [--dry-run]") +} + +func printPipelineFixUsage() { + core.Print(nil, "usage: core-agent pipeline/fix/reviews --repo= [--org=core] [--dry-run]") + core.Print(nil, " core-agent pipeline/fix/conflicts --repo= [--org=core] [--dry-run]") + core.Print(nil, " core-agent pipeline/fix/format --repo= [--org=core] [--workspace=|--repo-dir=] [--commit] [--push] [--dry-run]") + core.Print(nil, " core-agent pipeline/fix/threads --repo= [--org=core] [--dry-run]") +} + +func printPipelineBudgetUsage() { + core.Print(nil, "usage: core-agent pipeline/budget/plan") + core.Print(nil, " core-agent pipeline/budget/log") +} + +func printPipelineTrainingUsage() { + core.Print(nil, "usage: core-agent pipeline/training/capture --repo= [--org=core]") + core.Print(nil, " core-agent pipeline/training/stats") + core.Print(nil, " core-agent pipeline/training/export") +} diff --git a/pkg/agentic/pipeline_commands_test.go b/pkg/agentic/pipeline_commands_test.go new file mode 100644 index 0000000..b4d2062 --- /dev/null +++ b/pkg/agentic/pipeline_commands_test.go @@ -0,0 +1,452 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "net/http" + "net/http/httptest" + "sort" + "testing" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPipelineCommands_RegisterPipelineCommands_Good(t *testing.T) { + s, c := testPrepWithCore(t, nil) + + s.registerPipelineCommands() + + commands := c.Commands() + assert.Contains(t, commands, "pipeline") + assert.Contains(t, commands, "pipeline/audit") + assert.Contains(t, commands, "pipeline/epic") + assert.Contains(t, commands, "pipeline/epic/create") + assert.Contains(t, commands, "pipeline/epic/run") + assert.Contains(t, commands, "pipeline/epic/status") + assert.Contains(t, commands, "pipeline/epic/sync") + assert.Contains(t, commands, "pipeline/monitor") + assert.Contains(t, commands, "pipeline/fix") + assert.Contains(t, commands, 
"pipeline/fix/reviews") + assert.Contains(t, commands, "pipeline/fix/conflicts") + assert.Contains(t, commands, "pipeline/fix/format") + assert.Contains(t, commands, "pipeline/fix/threads") + assert.Contains(t, commands, "pipeline/onboard") + assert.Contains(t, commands, "pipeline/budget") + assert.Contains(t, commands, "pipeline/budget/plan") + assert.Contains(t, commands, "pipeline/budget/log") + assert.Contains(t, commands, "pipeline/training") + assert.Contains(t, commands, "pipeline/training/capture") + assert.Contains(t, commands, "pipeline/training/stats") + assert.Contains(t, commands, "pipeline/training/export") +} + +func TestPipelineCommands_CmdPipeline_Good_Help(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + + output := captureStdout(t, func() { + result := s.cmdPipeline(core.NewOptions()) + require.True(t, result.OK) + }) + + assert.Contains(t, output, "core-agent pipeline/audit ") + assert.Contains(t, output, "core-agent pipeline/epic/create ") + assert.Contains(t, output, "core-agent pipeline/onboard ") +} + +func TestPipelineCommands_CmdPipeline_Bad_UnknownCommand(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + + result := s.cmdPipeline(core.NewOptions(core.Option{Key: "_arg", Value: "unknown"})) + + require.False(t, result.OK) + err, ok := result.Value.(error) + require.True(t, ok) + assert.Contains(t, err.Error(), "unknown pipeline command") +} + +type pipelineTestIssue struct { + Number int + Title string + Body string + State string + Labels []string +} + +type pipelineTestPR struct { + Number int + Title string + State string + Merged bool + Mergeable *bool + MergeableState string + HeadRef string + HeadSHA string + BaseRef string + ReviewThreadsTotal int + ReviewThreadsResolved int + ReviewThreadsUnresolved int + ReviewComments int + Comments int + Reactions map[string]int + Statuses []map[string]any +} + +type pipelineTestRepo struct { + DefaultBranch string + RefSHA string + Labels map[string]int64 + Issues 
map[int]*pipelineTestIssue + Pulls map[int]*pipelineTestPR + Comments map[int][]string + Branches map[string]string + Merged []int +} + +func newPipelineTestRepo() *pipelineTestRepo { + return &pipelineTestRepo{ + DefaultBranch: "dev", + RefSHA: "deadbeef", + Labels: map[string]int64{ + "agentic": 1, + "audit": 2, + "security": 3, + "quality": 4, + "testing": 5, + "performance": 6, + "docs": 7, + "epic": 8, + "critical": 9, + "high": 10, + "medium": 11, + "low": 12, + }, + Issues: map[int]*pipelineTestIssue{}, + Pulls: map[int]*pipelineTestPR{}, + Comments: map[int][]string{}, + Branches: map[string]string{"dev": "deadbeef"}, + Merged: []int{}, + } +} + +func newPipelineTestServer(t *testing.T, repos map[string]*pipelineTestRepo) *httptest.Server { + t.Helper() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + path := r.URL.Path + if len(path) > 0 && path[0] == '/' { + path = path[1:] + } + parts := core.Split(path, "/") + + if len(parts) == 5 && parts[0] == "api" && parts[1] == "v1" && parts[2] == "orgs" && parts[4] == "repos" && r.Method == http.MethodGet { + org := parts[3] + _ = org + names := make([]string, 0, len(repos)) + for name := range repos { + names = append(names, name) + } + sort.Strings(names) + out := []map[string]any{} + for _, name := range names { + repo := repos[name] + out = append(out, map[string]any{ + "name": name, + "default_branch": repo.DefaultBranch, + "html_url": "https://forge.test/core/" + name, + }) + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(core.JSONMarshalString(out))) + return + } + + if len(parts) < 5 || parts[0] != "api" || parts[1] != "v1" || parts[2] != "repos" { + w.WriteHeader(http.StatusNotFound) + return + } + + repoName := parts[4] + repo, ok := repos[repoName] + if !ok { + w.WriteHeader(http.StatusNotFound) + return + } + + if len(parts) == 5 && r.Method == http.MethodGet { + w.WriteHeader(http.StatusOK) + _, _ = 
w.Write([]byte(core.JSONMarshalString(map[string]any{ + "name": repoName, + "default_branch": repo.DefaultBranch, + "html_url": "https://forge.test/core/" + repoName, + }))) + return + } + + if len(parts) >= 6 && parts[5] == "labels" { + switch r.Method { + case http.MethodGet: + names := make([]string, 0, len(repo.Labels)) + for name := range repo.Labels { + names = append(names, name) + } + sort.Strings(names) + out := []map[string]any{} + for _, name := range names { + out = append(out, map[string]any{"id": repo.Labels[name], "name": name}) + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(core.JSONMarshalString(out))) + return + case http.MethodPost: + bodyResult := core.ReadAll(r.Body) + require.True(t, bodyResult.OK) + var payload map[string]any + require.True(t, core.JSONUnmarshalString(bodyResult.Value.(string), &payload).OK) + name := payload["name"].(string) + repo.Labels[name] = int64(len(repo.Labels) + 1) + w.WriteHeader(http.StatusCreated) + _, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{"id": repo.Labels[name]}))) + return + } + } + + if len(parts) >= 6 && parts[5] == "issues" { + switch { + case len(parts) == 6 && r.Method == http.MethodGet: + numbers := make([]int, 0, len(repo.Issues)) + for number := range repo.Issues { + numbers = append(numbers, number) + } + sort.Ints(numbers) + out := []map[string]any{} + for _, number := range numbers { + issue := repo.Issues[number] + if issue == nil { + continue + } + if state := r.URL.Query().Get("state"); state == "open" && issue.State == "closed" { + continue + } + out = append(out, pipelineIssuePayload(repoName, issue)) + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(core.JSONMarshalString(out))) + return + case len(parts) == 6 && r.Method == http.MethodPost: + bodyResult := core.ReadAll(r.Body) + require.True(t, bodyResult.OK) + var payload map[string]any + require.True(t, core.JSONUnmarshalString(bodyResult.Value.(string), &payload).OK) + next := 1 + for number := range 
repo.Issues { + if number >= next { + next = number + 1 + } + } + title := payload["title"].(string) + body, _ := payload["body"].(string) + labels := []string{} + if raw, ok := payload["labels"].([]any); ok { + for _, labelID := range raw { + for name, id := range repo.Labels { + if int64(labelID.(float64)) == id { + labels = append(labels, name) + } + } + } + } + repo.Issues[next] = &pipelineTestIssue{Number: next, Title: title, Body: body, State: "open", Labels: labels} + w.WriteHeader(http.StatusCreated) + _, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{ + "number": next, + "html_url": "https://forge.test/core/" + repoName + "/issues/" + core.Sprint(next), + }))) + return + case len(parts) == 7 && r.Method == http.MethodGet: + number := parseIntString(parts[6]) + issue := repo.Issues[number] + if issue == nil { + w.WriteHeader(http.StatusNotFound) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(core.JSONMarshalString(pipelineIssuePayload(repoName, issue)))) + return + case len(parts) == 7 && r.Method == http.MethodPatch: + number := parseIntString(parts[6]) + issue := repo.Issues[number] + if issue == nil { + w.WriteHeader(http.StatusNotFound) + return + } + bodyResult := core.ReadAll(r.Body) + require.True(t, bodyResult.OK) + var payload map[string]any + require.True(t, core.JSONUnmarshalString(bodyResult.Value.(string), &payload).OK) + if state, ok := payload["state"].(string); ok { + issue.State = state + } + if body, ok := payload["body"].(string); ok { + issue.Body = body + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(core.JSONMarshalString(pipelineIssuePayload(repoName, issue)))) + return + case len(parts) == 8 && parts[7] == "comments" && r.Method == http.MethodPost: + number := parseIntString(parts[6]) + bodyResult := core.ReadAll(r.Body) + require.True(t, bodyResult.OK) + var payload map[string]any + require.True(t, core.JSONUnmarshalString(bodyResult.Value.(string), &payload).OK) + repo.Comments[number] = 
append(repo.Comments[number], payload["body"].(string)) + w.WriteHeader(http.StatusCreated) + _, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{"id": len(repo.Comments[number])}))) + return + } + } + + if len(parts) >= 6 && parts[5] == "pulls" { + switch { + case len(parts) == 6 && r.Method == http.MethodGet: + numbers := make([]int, 0, len(repo.Pulls)) + for number := range repo.Pulls { + numbers = append(numbers, number) + } + sort.Ints(numbers) + out := []map[string]any{} + for _, number := range numbers { + pullRequest := repo.Pulls[number] + if pullRequest == nil { + continue + } + if state := r.URL.Query().Get("state"); state == "open" && pullRequest.State != "open" { + continue + } + out = append(out, pipelinePRPayload(repoName, pullRequest)) + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(core.JSONMarshalString(out))) + return + case len(parts) == 7 && r.Method == http.MethodGet: + number := parseIntString(parts[6]) + pullRequest := repo.Pulls[number] + if pullRequest == nil { + w.WriteHeader(http.StatusNotFound) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(core.JSONMarshalString(pipelinePRPayload(repoName, pullRequest)))) + return + case len(parts) == 8 && parts[7] == "merge" && r.Method == http.MethodPost: + number := parseIntString(parts[6]) + repo.Merged = append(repo.Merged, number) + if pullRequest := repo.Pulls[number]; pullRequest != nil { + pullRequest.State = "merged" + pullRequest.Merged = true + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("{}")) + return + } + } + + if len(parts) == 8 && parts[5] == "commits" && parts[7] == "status" && r.Method == http.MethodGet { + sha := parts[6] + for _, pullRequest := range repo.Pulls { + if pullRequest != nil && pullRequest.HeadSHA == sha { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{"statuses": pullRequest.Statuses}))) + return + } + } + w.WriteHeader(http.StatusOK) + _, _ = 
w.Write([]byte(core.JSONMarshalString(map[string]any{"statuses": []map[string]any{}}))) + return + } + + if len(parts) >= 8 && parts[5] == "git" && parts[6] == "refs" { + switch { + case len(parts) >= 9 && parts[7] == "heads" && r.Method == http.MethodGet: + branch := core.Join("/", parts[8:]...) + sha := repo.Branches[branch] + if sha == "" { + sha = repo.RefSHA + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{"object": map[string]any{"sha": sha}}))) + return + case len(parts) == 7 && r.Method == http.MethodPost: + bodyResult := core.ReadAll(r.Body) + require.True(t, bodyResult.OK) + var payload map[string]any + require.True(t, core.JSONUnmarshalString(bodyResult.Value.(string), &payload).OK) + ref := payload["ref"].(string) + sha := payload["sha"].(string) + branch := ref[len("refs/heads/"):] + repo.Branches[branch] = sha + w.WriteHeader(http.StatusCreated) + _, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{"ref": ref}))) + return + } + } + + w.WriteHeader(http.StatusNotFound) + })) + + t.Cleanup(server.Close) + return server +} + +func pipelineIssuePayload(repoName string, issue *pipelineTestIssue) map[string]any { + labels := []map[string]any{} + for _, label := range issue.Labels { + labels = append(labels, map[string]any{"name": label}) + } + return map[string]any{ + "number": issue.Number, + "title": issue.Title, + "body": issue.Body, + "state": issue.State, + "labels": labels, + "html_url": "https://forge.test/core/" + repoName + "/issues/" + core.Sprint(issue.Number), + } +} + +func pipelinePRPayload(repoName string, pullRequest *pipelineTestPR) map[string]any { + payload := map[string]any{ + "number": pullRequest.Number, + "title": pullRequest.Title, + "state": pullRequest.State, + "html_url": "https://forge.test/core/" + repoName + "/pulls/" + core.Sprint(pullRequest.Number), + "merged": pullRequest.Merged, + "mergeable": pullRequest.Mergeable, + "mergeable_state": pullRequest.MergeableState, + 
"review_threads_total": pullRequest.ReviewThreadsTotal, + "review_threads_resolved": pullRequest.ReviewThreadsResolved, + "review_threads_unresolved": pullRequest.ReviewThreadsUnresolved, + "review_comments": pullRequest.ReviewComments, + "comments": pullRequest.Comments, + "reactions": pullRequest.Reactions, + "head": map[string]any{ + "ref": pullRequest.HeadRef, + "sha": pullRequest.HeadSHA, + "repo": map[string]any{ + "updated_at": "2026-04-25T12:00:00Z", + "pushed_at": "2026-04-25T12:00:00Z", + }, + }, + "base": map[string]any{ + "ref": pullRequest.BaseRef, + }, + } + if pullRequest.Mergeable == nil { + payload["mergeable"] = nil + } + return payload +} + +func boolPtr(value bool) *bool { + return &value +} diff --git a/pkg/agentic/pipeline_epic.go b/pkg/agentic/pipeline_epic.go new file mode 100644 index 0000000..4e236c9 --- /dev/null +++ b/pkg/agentic/pipeline_epic.go @@ -0,0 +1,546 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "sort" + + core "dappco.re/go/core" +) + +type PipelineEpicCreateInput struct { + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + Theme string `json:"theme,omitempty"` + DryRun bool `json:"dry_run,omitempty"` + Candidates []PipelineIssueRef `json:"candidates,omitempty"` +} + +type PipelineEpicCreateOutput struct { + Success bool `json:"success"` + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + Candidates []PipelineIssueRef `json:"candidates,omitempty"` + Epics []PipelineEpicMeta `json:"epics,omitempty"` + Existing []PipelineEpicMeta `json:"existing,omitempty"` +} + +type PipelineEpicRunInput struct { + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + EpicNumber int `json:"epic_number"` + Agent string `json:"agent,omitempty"` + Template string `json:"template,omitempty"` + DryRun bool `json:"dry_run,omitempty"` + Limit int `json:"limit,omitempty"` + Epic *PipelineEpicMeta `json:"epic,omitempty"` +} + +type PipelineEpicRunOutput struct { + Success 
bool `json:"success"` + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + EpicNumber int `json:"epic_number"` + Branch string `json:"branch,omitempty"` + Dispatched []PipelineIssueRef `json:"dispatched,omitempty"` + Skipped []PipelineIssueRef `json:"skipped,omitempty"` +} + +type PipelineEpicStatusOutput struct { + Success bool `json:"success"` + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + Epic PipelineEpicMeta `json:"epic"` +} + +type PipelineEpicSyncOutput struct { + Success bool `json:"success"` + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + EpicNumber int `json:"epic_number"` + Checked int `json:"checked"` + Total int `json:"total"` + Updated bool `json:"updated"` +} + +func (s *PrepSubsystem) cmdPipelineEpicCreate(options core.Options) core.Result { + ctx := s.commandContext() + repo := pipelineRepoValue(options) + if repo == "" { + core.Print(nil, "usage: core-agent pipeline/epic/create [--org=core] [--theme=security] [--dry-run]") + return core.Result{Value: core.E("agentic.cmdPipelineEpicCreate", "repo is required", nil), OK: false} + } + + output, err := s.pipelineEpicCreate(ctx, PipelineEpicCreateInput{ + Org: pipelineOrgValue(options), + Repo: repo, + Theme: optionStringValue(options, "theme"), + DryRun: optionBoolValue(options, "dry_run", "dry-run"), + }) + if err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + core.Print(nil, "repo: %s/%s", output.Org, output.Repo) + core.Print(nil, "candidates: %d", len(output.Candidates)) + core.Print(nil, "epics: %d", len(output.Epics)) + for _, epic := range output.Epics { + core.Print(nil, " epic: #%d %s", epic.Number, epic.Title) + if epic.Branch != "" { + core.Print(nil, " branch: %s", epic.Branch) + } + core.Print(nil, " child: %d", len(epic.Children)) + } + if len(output.Existing) > 0 { + core.Print(nil, "existing: %d", len(output.Existing)) + } + + return core.Result{Value: output, OK: true} +} + +func (s 
*PrepSubsystem) cmdPipelineEpicRun(options core.Options) core.Result { + ctx := s.commandContext() + org, repo, number := pipelineRepoAndNumberValue(options) + if repo == "" || number <= 0 { + core.Print(nil, "usage: core-agent pipeline/epic/run --repo= [--org=core] [--agent=codex] [--dry-run]") + return core.Result{Value: core.E("agentic.cmdPipelineEpicRun", "repo and epic number are required", nil), OK: false} + } + + output, err := s.pipelineEpicRun(ctx, PipelineEpicRunInput{ + Org: org, + Repo: repo, + EpicNumber: number, + Agent: optionStringValue(options, "agent"), + Template: optionStringValue(options, "template"), + DryRun: optionBoolValue(options, "dry_run", "dry-run"), + Limit: optionIntValue(options, "limit"), + }) + if err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + core.Print(nil, "epic: #%d", output.EpicNumber) + core.Print(nil, "repo: %s/%s", output.Org, output.Repo) + core.Print(nil, "dispatched: %d", len(output.Dispatched)) + core.Print(nil, "skipped: %d", len(output.Skipped)) + if output.Branch != "" { + core.Print(nil, "branch: %s", output.Branch) + } + for _, issue := range output.Dispatched { + core.Print(nil, " issue: #%d %s", issue.Number, issue.Title) + } + + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) cmdPipelineEpicStatus(options core.Options) core.Result { + ctx := s.commandContext() + org, repo, number := pipelineRepoAndNumberValue(options) + if repo == "" || number <= 0 { + core.Print(nil, "usage: core-agent pipeline/epic/status --repo= [--org=core]") + return core.Result{Value: core.E("agentic.cmdPipelineEpicStatus", "repo and epic number are required", nil), OK: false} + } + + reader := &pipelineForgeMetaReader{subsystem: s, org: org} + epic, err := reader.GetEpicMeta(ctx, repo, number) + if err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + output := PipelineEpicStatusOutput{ + Success: true, + Org: 
org, + Repo: repo, + Epic: epic, + } + + core.Print(nil, "epic: #%d %s", epic.Number, epic.Title) + core.Print(nil, "state: %s", epic.State) + core.Print(nil, "branch: %s", epic.Branch) + core.Print(nil, "child: %d", len(epic.Children)) + for _, child := range epic.Children { + box := "[ ]" + if child.Checked { + box = "[x]" + } + core.Print(nil, " %s #%d %-8s %s", box, child.Number, child.State, child.Title) + } + + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) cmdPipelineEpicSync(options core.Options) core.Result { + ctx := s.commandContext() + org, repo, number := pipelineRepoAndNumberValue(options) + if repo == "" || number <= 0 { + core.Print(nil, "usage: core-agent pipeline/epic/sync --repo= [--org=core] [--dry-run]") + return core.Result{Value: core.E("agentic.cmdPipelineEpicSync", "repo and epic number are required", nil), OK: false} + } + + output, err := s.pipelineEpicSync(ctx, org, repo, number, optionBoolValue(options, "dry_run", "dry-run")) + if err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + core.Print(nil, "epic: #%d", output.EpicNumber) + core.Print(nil, "checked: %d/%d", output.Checked, output.Total) + core.Print(nil, "updated: %v", output.Updated) + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) pipelineEpicCreate(ctx context.Context, input PipelineEpicCreateInput) (PipelineEpicCreateOutput, error) { + if input.Repo == "" { + return PipelineEpicCreateOutput{}, core.E("pipelineEpicCreate", "repo is required", nil) + } + if s.forgeToken == "" { + return PipelineEpicCreateOutput{}, core.E("pipelineEpicCreate", "no Forge token configured", nil) + } + if input.Org == "" { + input.Org = "core" + } + + candidates := input.Candidates + if len(candidates) == 0 { + issues, err := s.pipelineListIssues(ctx, input.Org, input.Repo, "open") + if err != nil { + return PipelineEpicCreateOutput{}, err + } + for _, issue := range issues { + if 
!pipelineIssueIsImplementationCandidate(issue) { + continue + } + ref := pipelineIssueRefFromRecord(issue) + if input.Theme != "" && pipelineEpicTheme(ref) != input.Theme { + continue + } + candidates = append(candidates, ref) + } + } + + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].Number < candidates[j].Number + }) + + output := PipelineEpicCreateOutput{ + Success: true, + Org: input.Org, + Repo: input.Repo, + Candidates: candidates, + } + + if len(candidates) < 3 { + return output, nil + } + + groups := map[string][]PipelineIssueRef{} + for _, candidate := range candidates { + theme := pipelineEpicTheme(candidate) + if input.Theme != "" { + theme = input.Theme + } + groups[theme] = append(groups[theme], candidate) + } + + existingIssues, err := s.pipelineListIssues(ctx, input.Org, input.Repo, "open") + if err != nil { + return PipelineEpicCreateOutput{}, err + } + existingByTitle := map[string]pipelineIssueRecord{} + for _, issue := range existingIssues { + if pipelineIssueIsEpic(issue) { + existingByTitle[issue.Title] = issue + } + } + + groupNames := make([]string, 0, len(groups)) + for theme, group := range groups { + if len(group) >= 3 { + groupNames = append(groupNames, theme) + } + } + sort.Strings(groupNames) + + reader := &pipelineForgeMetaReader{subsystem: s, org: input.Org} + for _, theme := range groupNames { + group := groups[theme] + title := pipelineEpicTitle(input.Repo, theme) + if existing, ok := existingByTitle[title]; ok { + meta, metaErr := reader.GetEpicMeta(ctx, input.Repo, existing.Number) + if metaErr != nil { + return PipelineEpicCreateOutput{}, metaErr + } + output.Existing = append(output.Existing, meta) + continue + } + + labels := []string{"agentic", "epic", theme} + body := pipelineEpicBody(title, "", theme, group) + + meta := PipelineEpicMeta{ + Title: title, + State: "open", + Branch: "", + } + for _, child := range group { + meta.Children = append(meta.Children, PipelineEpicChildMeta{ + Number: child.Number, 
+ Title: child.Title, + Checked: false, + State: "open", + URL: child.URL, + }) + } + + if input.DryRun { + meta.Branch = core.Sprintf("epic/dry-run-%s", pipelineSlug(theme)) + output.Epics = append(output.Epics, meta) + continue + } + + labelIDs := s.resolveLabelIDs(ctx, input.Org, input.Repo, labels) + created, createErr := s.createIssue(ctx, input.Org, input.Repo, title, body, labelIDs) + if createErr != nil { + return PipelineEpicCreateOutput{}, core.E("pipelineEpicCreate", "failed to create epic issue", createErr) + } + + branch, branchErr := s.pipelineCreateEpicBranch(ctx, input.Org, input.Repo, created.Number, theme) + if branchErr != nil { + return PipelineEpicCreateOutput{}, branchErr + } + + patchedBody := pipelineEpicBody(title, branch, theme, group) + if patchErr := s.pipelinePatchIssue(ctx, input.Org, input.Repo, created.Number, map[string]any{"body": patchedBody}); patchErr != nil { + return PipelineEpicCreateOutput{}, patchErr + } + + for _, child := range group { + comment := core.Sprintf("Parent: #%d\nEpic branch: `%s`", created.Number, branch) + s.commentOnIssue(ctx, input.Org, input.Repo, child.Number, comment) + } + + meta.Number = created.Number + meta.URL = created.URL + meta.Branch = branch + output.Epics = append(output.Epics, meta) + } + + return output, nil +} + +func (s *PrepSubsystem) pipelineEpicRun(ctx context.Context, input PipelineEpicRunInput) (PipelineEpicRunOutput, error) { + if input.Repo == "" || input.EpicNumber <= 0 { + return PipelineEpicRunOutput{}, core.E("pipelineEpicRun", "repo and epic number are required", nil) + } + if input.Org == "" { + input.Org = "core" + } + if input.Agent == "" { + input.Agent = "codex" + } + if input.Template == "" { + input.Template = "coding" + } + + meta := input.Epic + if meta == nil { + reader := &pipelineForgeMetaReader{subsystem: s, org: input.Org} + readMeta, err := reader.GetEpicMeta(ctx, input.Repo, input.EpicNumber) + if err != nil { + return PipelineEpicRunOutput{}, err + } + meta = 
&readMeta + } + + output := PipelineEpicRunOutput{ + Success: true, + Org: input.Org, + Repo: input.Repo, + EpicNumber: meta.Number, + Branch: meta.Branch, + } + + for _, child := range meta.Children { + if child.Checked || child.State != "open" { + output.Skipped = append(output.Skipped, PipelineIssueRef{Number: child.Number, Title: child.Title, State: child.State, URL: child.URL}) + continue + } + if !pipelineIssueAutoDispatchAllowed(child.Title) { + output.Skipped = append(output.Skipped, PipelineIssueRef{Number: child.Number, Title: child.Title, State: "manual-review", URL: child.URL}) + continue + } + if input.Limit > 0 && len(output.Dispatched) >= input.Limit { + break + } + + ref := PipelineIssueRef{Number: child.Number, Title: child.Title, State: child.State, URL: child.URL} + if input.DryRun { + output.Dispatched = append(output.Dispatched, ref) + continue + } + + if meta.Branch != "" { + s.commentOnIssue(ctx, input.Org, input.Repo, child.Number, core.Sprintf("Target branch: `%s` (epic #%d)", meta.Branch, meta.Number)) + } + + _, _, err := s.dispatch(ctx, nil, DispatchInput{ + Org: input.Org, + Repo: input.Repo, + Task: child.Title, + Issue: child.Number, + Branch: meta.Branch, + Agent: input.Agent, + Template: input.Template, + DryRun: false, + }) + if err != nil { + return PipelineEpicRunOutput{}, core.E("pipelineEpicRun", core.Concat("failed to dispatch child issue #", core.Sprint(child.Number)), err) + } + + output.Dispatched = append(output.Dispatched, ref) + } + + return output, nil +} + +func (s *PrepSubsystem) pipelineEpicSync(ctx context.Context, org, repo string, number int, dryRun bool) (PipelineEpicSyncOutput, error) { + reader := &pipelineForgeMetaReader{subsystem: s, org: org} + meta, err := reader.GetEpicMeta(ctx, repo, number) + if err != nil { + return PipelineEpicSyncOutput{}, err + } + + checkedByNumber := map[int]bool{} + checkedCount := 0 + for _, child := range meta.Children { + checked := core.Lower(child.State) == "closed" + if 
checked { + checkedCount++ + } + checkedByNumber[child.Number] = checked + } + + lines := core.Split(meta.Body, "\n") + updated := false + for i, line := range lines { + match := pipelineEpicChildPattern.FindStringSubmatch(line) + if len(match) != 6 { + continue + } + childNumber := parseIntString(match[4]) + wantChecked := checkedByNumber[childNumber] + currentChecked := core.Lower(match[2]) == "x" + if wantChecked == currentChecked { + continue + } + mark := " " + if wantChecked { + mark = "x" + } + lines[i] = core.Concat(match[1], mark, match[3], match[4], match[5]) + updated = true + } + + if updated && !dryRun { + if err := s.pipelinePatchIssue(ctx, org, repo, number, map[string]any{"body": core.Join("\n", lines...)}); err != nil { + return PipelineEpicSyncOutput{}, err + } + } + + return PipelineEpicSyncOutput{ + Success: true, + Org: org, + Repo: repo, + EpicNumber: number, + Checked: checkedCount, + Total: len(meta.Children), + Updated: updated, + }, nil +} + +func (s *PrepSubsystem) pipelineCreateEpicBranch(ctx context.Context, org, repo string, epicNumber int, theme string) (string, error) { + repository, err := s.pipelineGetRepo(ctx, org, repo) + if err != nil { + return "", err + } + + defaultBranch := repository.DefaultBranch + if defaultBranch == "" { + defaultBranch = "dev" + } + + shaURL := core.Sprintf("%s/api/v1/repos/%s/%s/git/refs/heads/%s", s.forgeURL, org, repo, defaultBranch) + shaResult := HTTPGet(ctx, shaURL, s.forgeToken, "token") + if !shaResult.OK { + return "", core.E("pipelineCreateEpicBranch", core.Concat("failed to read ref for ", defaultBranch), nil) + } + + var ref struct { + Object struct { + SHA string `json:"sha"` + } `json:"object"` + } + if err := pipelineDecodeJSON(shaResult.Value.(string), &ref, "pipelineCreateEpicBranch", "failed to decode git ref"); err != nil { + return "", err + } + + branch := core.Sprintf("epic/%d-%s", epicNumber, pipelineSlug(theme)) + createURL := core.Sprintf("%s/api/v1/repos/%s/%s/git/refs", 
s.forgeURL, org, repo) + createResult := HTTPPost(ctx, createURL, core.JSONMarshalString(map[string]any{ + "ref": core.Concat("refs/heads/", branch), + "sha": ref.Object.SHA, + }), s.forgeToken, "token") + if !createResult.OK { + return "", core.E("pipelineCreateEpicBranch", core.Concat("failed to create branch ", branch), nil) + } + + return branch, nil +} + +func pipelineEpicTheme(issue PipelineIssueRef) string { + haystack := core.Lower(core.Concat(issue.Title, " ", core.JSONMarshalString(issue.Labels))) + switch { + case core.Contains(haystack, "security"): + return "security" + case core.Contains(haystack, "testing"), core.Contains(haystack, "test"): + return "testing" + case core.Contains(haystack, "docs"), core.Contains(haystack, "doc"): + return "docs" + case core.Contains(haystack, "performance"), core.Contains(haystack, "perf"): + return "performance" + case core.HasPrefix(core.Lower(issue.Title), "feat("): + return "features" + default: + return "quality" + } +} + +func pipelineEpicTitle(repo, theme string) string { + return core.Sprintf("epic(%s): %s pipeline", repo, core.Lower(pipelineDisplayTheme(theme))) +} + +func pipelineEpicBody(title, branch, theme string, children []PipelineIssueRef) string { + builder := core.NewBuilder() + builder.WriteString("## Overview\n\n") + builder.WriteString(core.Sprintf("%s groups related implementation issues into a tracked epic.\n\n", title)) + if branch != "" { + builder.WriteString(core.Sprintf("Epic branch: `%s`\n\n", branch)) + } + builder.WriteString("## Child Issues\n\n") + builder.WriteString(core.Sprintf("### Phase 1: %s\n", pipelineDisplayTheme(theme))) + for _, child := range children { + builder.WriteString(core.Sprintf("- [ ] #%d %s\n", child.Number, child.Title)) + } + builder.WriteString("\n## Acceptance Criteria\n\n") + builder.WriteString("- [ ] All child issues are closed\n") + builder.WriteString("- [ ] Epic checklist is fully synced\n") + return builder.String() +} + +func 
pipelineIssueAutoDispatchAllowed(title string) bool { + return !core.HasPrefix(core.Lower(core.Trim(title)), "feat(") +} diff --git a/pkg/agentic/pipeline_epic_test.go b/pkg/agentic/pipeline_epic_test.go new file mode 100644 index 0000000..d265465 --- /dev/null +++ b/pkg/agentic/pipeline_epic_test.go @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "testing" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPipelineEpic_Good_CreateGroupsIssuesIntoEpic(t *testing.T) { + repo := newPipelineTestRepo() + repo.Issues[10] = &pipelineTestIssue{Number: 10, Title: "security(go-io): Validate tokens", State: "open", Labels: []string{"agentic", "security"}} + repo.Issues[11] = &pipelineTestIssue{Number: 11, Title: "security(go-io): Sanitize input", State: "open", Labels: []string{"agentic", "security"}} + repo.Issues[12] = &pipelineTestIssue{Number: 12, Title: "security(go-io): Add rate limiting", State: "open", Labels: []string{"agentic", "security"}} + srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo}) + + s, _ := testPrepWithCore(t, srv) + output, err := s.pipelineEpicCreate(context.Background(), PipelineEpicCreateInput{Org: "core", Repo: "go-io"}) + + require.NoError(t, err) + require.Len(t, output.Epics, 1) + assert.Equal(t, 13, output.Epics[0].Number) + assert.Equal(t, "epic/13-security", output.Epics[0].Branch) + assert.Contains(t, repo.Issues[13].Body, "- [ ] #10 security(go-io): Validate tokens") + assert.Contains(t, repo.Comments[10][0], "Parent: #13") + assert.Equal(t, "deadbeef", repo.Branches["epic/13-security"]) +} + +func TestPipelineEpic_Bad_RunRequiresRepoAndNumber(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + + result := s.cmdPipelineEpicRun(core.NewOptions()) + + require.False(t, result.OK) + err, ok := result.Value.(error) + require.True(t, ok) + assert.Contains(t, err.Error(), "repo and epic number are 
required") +} + +func TestPipelineEpic_Ugly_SyncMarksClosedChildrenChecked(t *testing.T) { + repo := newPipelineTestRepo() + repo.Issues[10] = &pipelineTestIssue{Number: 10, Title: "security(go-io): Validate tokens", State: "closed", Labels: []string{"agentic", "security"}} + repo.Issues[11] = &pipelineTestIssue{Number: 11, Title: "security(go-io): Sanitize input", State: "open", Labels: []string{"agentic", "security"}} + repo.Issues[20] = &pipelineTestIssue{ + Number: 20, + Title: "epic(go-io): security pipeline", + State: "open", + Labels: []string{"agentic", "epic", "security"}, + Body: "## Overview\n\nEpic branch: `epic/20-security`\n\n## Child Issues\n\n- [ ] #10 security(go-io): Validate tokens\n- [ ] #11 security(go-io): Sanitize input\n", + } + srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo}) + + s, _ := testPrepWithCore(t, srv) + output, err := s.pipelineEpicSync(context.Background(), "core", "go-io", 20, false) + + require.NoError(t, err) + assert.True(t, output.Updated) + assert.Equal(t, 1, output.Checked) + assert.Contains(t, repo.Issues[20].Body, "- [x] #10 security(go-io): Validate tokens") + assert.Contains(t, repo.Issues[20].Body, "- [ ] #11 security(go-io): Sanitize input") +} + +func TestPipelineEpic_Run_Good_DryRunDispatchesUncheckedChildren(t *testing.T) { + repo := newPipelineTestRepo() + repo.Issues[10] = &pipelineTestIssue{Number: 10, Title: "security(go-io): Validate tokens", State: "open", Labels: []string{"agentic", "security"}} + repo.Issues[11] = &pipelineTestIssue{Number: 11, Title: "security(go-io): Sanitize input", State: "open", Labels: []string{"agentic", "security"}} + repo.Issues[12] = &pipelineTestIssue{Number: 12, Title: "security(go-io): Add rate limiting", State: "open", Labels: []string{"agentic", "security"}} + repo.Issues[20] = &pipelineTestIssue{ + Number: 20, + Title: "epic(go-io): security pipeline", + State: "open", + Labels: []string{"agentic", "epic", "security"}, + Body: "## 
Overview\n\nEpic branch: `epic/20-security`\n\n## Child Issues\n\n- [ ] #10 security(go-io): Validate tokens\n- [ ] #11 security(go-io): Sanitize input\n- [ ] #12 security(go-io): Add rate limiting\n", + } + srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo}) + + s, _ := testPrepWithCore(t, srv) + output, err := s.pipelineEpicRun(context.Background(), PipelineEpicRunInput{ + Org: "core", + Repo: "go-io", + EpicNumber: 20, + DryRun: true, + }) + + require.NoError(t, err) + assert.Len(t, output.Dispatched, 3) + assert.Equal(t, "epic/20-security", output.Branch) +} diff --git a/pkg/agentic/pipeline_fix.go b/pkg/agentic/pipeline_fix.go new file mode 100644 index 0000000..6a840db --- /dev/null +++ b/pkg/agentic/pipeline_fix.go @@ -0,0 +1,286 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + + core "dappco.re/go/core" +) + +type PipelineFixInput struct { + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + Number int `json:"number"` + WorkspaceDir string `json:"workspace_dir,omitempty"` + Commit bool `json:"commit,omitempty"` + Push bool `json:"push,omitempty"` + DryRun bool `json:"dry_run,omitempty"` +} + +type PipelineFixOutput struct { + Success bool `json:"success"` + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + Number int `json:"number"` + Action string `json:"action"` + Message string `json:"message,omitempty"` + Files int `json:"files,omitempty"` + Committed bool `json:"committed,omitempty"` + Pushed bool `json:"pushed,omitempty"` +} + +func (s *PrepSubsystem) cmdPipelineFixReviews(options core.Options) core.Result { + ctx := s.commandContext() + org, repo, number := pipelineRepoAndNumberValue(options) + if repo == "" || number <= 0 { + core.Print(nil, "usage: core-agent pipeline/fix/reviews --repo= [--org=core] [--dry-run]") + return core.Result{Value: core.E("agentic.cmdPipelineFixReviews", "repo and pull request number are required", nil), OK: false} + } + + output, err 
:= s.pipelineFixReviews(ctx, PipelineFixInput{Org: org, Repo: repo, Number: number, DryRun: optionBoolValue(options, "dry_run", "dry-run")}) + if err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + core.Print(nil, "pr: %s/%s#%d", output.Org, output.Repo, output.Number) + core.Print(nil, "action: %s", output.Action) + core.Print(nil, "message: %s", output.Message) + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) cmdPipelineFixConflicts(options core.Options) core.Result { + ctx := s.commandContext() + org, repo, number := pipelineRepoAndNumberValue(options) + if repo == "" || number <= 0 { + core.Print(nil, "usage: core-agent pipeline/fix/conflicts --repo= [--org=core] [--dry-run]") + return core.Result{Value: core.E("agentic.cmdPipelineFixConflicts", "repo and pull request number are required", nil), OK: false} + } + + output, err := s.pipelineFixConflicts(ctx, PipelineFixInput{Org: org, Repo: repo, Number: number, DryRun: optionBoolValue(options, "dry_run", "dry-run")}) + if err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + core.Print(nil, "pr: %s/%s#%d", output.Org, output.Repo, output.Number) + core.Print(nil, "action: %s", output.Action) + core.Print(nil, "message: %s", output.Message) + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) cmdPipelineFixFormat(options core.Options) core.Result { + ctx := s.commandContext() + org, repo, number := pipelineRepoAndNumberValue(options) + if repo == "" || number <= 0 { + core.Print(nil, "usage: core-agent pipeline/fix/format --repo= [--org=core] [--workspace=|--repo-dir=] [--commit] [--push] [--dry-run]") + return core.Result{Value: core.E("agentic.cmdPipelineFixFormat", "repo and pull request number are required", nil), OK: false} + } + + output, err := s.pipelineFixFormat(ctx, PipelineFixInput{ + Org: org, + Repo: repo, + Number: number, + WorkspaceDir: 
pipelineWorkspaceDir(options), + Commit: optionBoolValue(options, "commit"), + Push: optionBoolValue(options, "push"), + DryRun: optionBoolValue(options, "dry_run", "dry-run"), + }) + if err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + core.Print(nil, "pr: %s/%s#%d", output.Org, output.Repo, output.Number) + core.Print(nil, "action: %s", output.Action) + core.Print(nil, "files: %d", output.Files) + core.Print(nil, "committed: %v", output.Committed) + core.Print(nil, "pushed: %v", output.Pushed) + if output.Message != "" { + core.Print(nil, "message: %s", output.Message) + } + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) cmdPipelineFixThreads(options core.Options) core.Result { + ctx := s.commandContext() + org, repo, number := pipelineRepoAndNumberValue(options) + if repo == "" || number <= 0 { + core.Print(nil, "usage: core-agent pipeline/fix/threads --repo= [--org=core] [--dry-run]") + return core.Result{Value: core.E("agentic.cmdPipelineFixThreads", "repo and pull request number are required", nil), OK: false} + } + + output, err := s.pipelineFixThreads(ctx, PipelineFixInput{Org: org, Repo: repo, Number: number, DryRun: optionBoolValue(options, "dry_run", "dry-run")}) + if err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + core.Print(nil, "pr: %s/%s#%d", output.Org, output.Repo, output.Number) + core.Print(nil, "action: %s", output.Action) + core.Print(nil, "message: %s", output.Message) + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) pipelineFixReviews(ctx context.Context, input PipelineFixInput) (PipelineFixOutput, error) { + if input.Repo == "" || input.Number <= 0 { + return PipelineFixOutput{}, core.E("pipelineFixReviews", "repo and pull request number are required", nil) + } + if input.Org == "" { + input.Org = "core" + } + if !input.DryRun { + s.commentOnIssue(ctx, input.Org, input.Repo, 
input.Number, "Can you fix the code reviews?") + } + return PipelineFixOutput{ + Success: true, + Org: input.Org, + Repo: input.Repo, + Number: input.Number, + Action: "comment", + Message: "Can you fix the code reviews?", + }, nil +} + +func (s *PrepSubsystem) pipelineFixConflicts(ctx context.Context, input PipelineFixInput) (PipelineFixOutput, error) { + if input.Repo == "" || input.Number <= 0 { + return PipelineFixOutput{}, core.E("pipelineFixConflicts", "repo and pull request number are required", nil) + } + if input.Org == "" { + input.Org = "core" + } + if !input.DryRun { + s.commentOnIssue(ctx, input.Org, input.Repo, input.Number, "Can you fix the merge conflict?") + } + return PipelineFixOutput{ + Success: true, + Org: input.Org, + Repo: input.Repo, + Number: input.Number, + Action: "comment", + Message: "Can you fix the merge conflict?", + }, nil +} + +func (s *PrepSubsystem) pipelineFixFormat(ctx context.Context, input PipelineFixInput) (PipelineFixOutput, error) { + if input.Repo == "" || input.Number <= 0 { + return PipelineFixOutput{}, core.E("pipelineFixFormat", "repo and pull request number are required", nil) + } + if input.WorkspaceDir == "" { + return PipelineFixOutput{}, core.E("pipelineFixFormat", "workspace or repo_dir is required for formatting", nil) + } + if input.Org == "" { + input.Org = "core" + } + + process := s.Core().Process() + fileList := process.RunIn(ctx, input.WorkspaceDir, "rg", "--files", "-g", "*.go") + if !fileList.OK { + fileList = process.RunIn(ctx, input.WorkspaceDir, "find", ".", "-name", "*.go", "-type", "f") + } + + goFiles := []string{} + if fileList.OK { + for _, file := range core.Split(core.Trim(resultText(fileList)), "\n") { + trimmed := core.Trim(file) + if trimmed != "" { + goFiles = append(goFiles, trimmed) + } + } + } + + output := PipelineFixOutput{ + Success: true, + Org: input.Org, + Repo: input.Repo, + Number: input.Number, + Action: "format", + Files: len(goFiles), + Message: "no Go files matched", + } + 
if len(goFiles) == 0 { + return output, nil + } + + output.Message = "formatted Go files" + if input.DryRun { + return output, nil + } + + formatResult := process.RunIn(ctx, input.WorkspaceDir, "sh", "-lc", "files=$(rg --files -g '*.go' 2>/dev/null || find . -name '*.go' -type f); if [ -n \"$files\" ]; then gofmt -w $files; fi") + if !formatResult.OK { + return PipelineFixOutput{}, core.E("pipelineFixFormat", "gofmt failed", nil) + } + + statusResult := process.RunIn(ctx, input.WorkspaceDir, "git", "status", "--porcelain") + if !statusResult.OK { + return output, nil + } + if core.Trim(resultText(statusResult)) == "" { + output.Message = "repo already formatted" + return output, nil + } + + if input.Commit || input.Push { + addResult := process.RunIn(ctx, input.WorkspaceDir, "git", "add", "-A") + if !addResult.OK { + return PipelineFixOutput{}, core.E("pipelineFixFormat", "git add failed", nil) + } + commitResult := process.RunIn(ctx, input.WorkspaceDir, "git", "commit", "-m", "chore: apply formatting") + if !commitResult.OK { + return PipelineFixOutput{}, core.E("pipelineFixFormat", "git commit failed", nil) + } + output.Committed = true + } + + if input.Push { + pushResult := process.RunIn(ctx, input.WorkspaceDir, "git", "push") + if !pushResult.OK { + return PipelineFixOutput{}, core.E("pipelineFixFormat", "git push failed", nil) + } + output.Pushed = true + } + + return output, nil +} + +func (s *PrepSubsystem) pipelineFixThreads(ctx context.Context, input PipelineFixInput) (PipelineFixOutput, error) { + if input.Repo == "" || input.Number <= 0 { + return PipelineFixOutput{}, core.E("pipelineFixThreads", "repo and pull request number are required", nil) + } + if input.Org == "" { + input.Org = "core" + } + + reader := &pipelineForgeMetaReader{subsystem: s, org: input.Org} + meta, err := reader.GetPRMeta(ctx, input.Repo, input.Number) + if err != nil { + return PipelineFixOutput{}, err + } + unresolved := meta.ThreadsTotal - meta.ThreadsResolved + if unresolved 
<= 0 { + return PipelineFixOutput{ + Success: true, + Org: input.Org, + Repo: input.Repo, + Number: input.Number, + Action: "noop", + Message: "no unresolved review threads remain", + }, nil + } + + message := core.Sprintf("Please resolve the %d remaining review thread(s) after the latest fix.", unresolved) + if !input.DryRun { + s.commentOnIssue(ctx, input.Org, input.Repo, input.Number, message) + } + + return PipelineFixOutput{ + Success: true, + Org: input.Org, + Repo: input.Repo, + Number: input.Number, + Action: "comment", + Message: message, + }, nil +} diff --git a/pkg/agentic/pipeline_fix_test.go b/pkg/agentic/pipeline_fix_test.go new file mode 100644 index 0000000..ce1d4c4 --- /dev/null +++ b/pkg/agentic/pipeline_fix_test.go @@ -0,0 +1,85 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "testing" + "time" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPipelineFix_Good_ReviewsPostsComment(t *testing.T) { + repo := newPipelineTestRepo() + srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo}) + + s, _ := testPrepWithCore(t, srv) + output, err := s.pipelineFixReviews(context.Background(), PipelineFixInput{Org: "core", Repo: "go-io", Number: 7}) + + require.NoError(t, err) + assert.True(t, output.Success) + assert.Contains(t, repo.Comments[7][0], "Can you fix the code reviews?") +} + +func TestPipelineFix_Bad_FormatRequiresWorkspace(t *testing.T) { + s := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + + _, err := s.pipelineFixFormat(context.Background(), PipelineFixInput{Org: "core", Repo: "go-io", Number: 9}) + + require.Error(t, err) + assert.Contains(t, err.Error(), "workspace or repo_dir is required") +} + +func TestPipelineFix_Ugly_ThreadsNoopWhenAlreadyResolved(t *testing.T) { + repo := 
newPipelineTestRepo() + repo.Pulls[5] = &pipelineTestPR{ + Number: 5, + Title: "Already clean", + State: "open", + Mergeable: boolPtr(true), + HeadRef: "agent/clean", + HeadSHA: "sha-clean", + BaseRef: "dev", + ReviewThreadsTotal: 2, + ReviewThreadsResolved: 2, + } + srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo}) + + s, _ := testPrepWithCore(t, srv) + output, err := s.pipelineFixThreads(context.Background(), PipelineFixInput{Org: "core", Repo: "go-io", Number: 5}) + + require.NoError(t, err) + assert.Equal(t, "noop", output.Action) + assert.Equal(t, "no unresolved review threads remain", output.Message) +} + +func TestPipelineFix_Format_Good_DryRunCountsFiles(t *testing.T) { + dir := t.TempDir() + require.True(t, fs.Write(core.JoinPath(dir, "one.go"), "package main\nfunc main( ){}\n").OK) + require.True(t, fs.Write(core.JoinPath(dir, "two.go"), "package main\nfunc helper( ){}\n").OK) + + s := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + + output, err := s.pipelineFixFormat(context.Background(), PipelineFixInput{ + Org: "core", + Repo: "go-io", + Number: 12, + WorkspaceDir: dir, + DryRun: true, + }) + + require.NoError(t, err) + assert.Equal(t, "format", output.Action) + assert.Equal(t, 2, output.Files) +} diff --git a/pkg/agentic/pipeline_monitor.go b/pkg/agentic/pipeline_monitor.go new file mode 100644 index 0000000..92c365d --- /dev/null +++ b/pkg/agentic/pipeline_monitor.go @@ -0,0 +1,501 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "regexp" + + core "dappco.re/go/core" +) + +var pipelineEpicBranchPattern = regexp.MustCompile("Epic branch:\\s*`([^`]+)`") +var pipelineEpicChildPattern = regexp.MustCompile(`(?m)^(\s*-\s*\[)([ xX])(\]\s*#)(\d+)(.*)$`) + +type MetaReader interface { + GetPRMeta(ctx context.Context, repo string, prNumber int) (PipelinePRMeta, error) + 
GetEpicMeta(ctx context.Context, repo string, issueNumber int) (PipelineEpicMeta, error) + GetIssueState(ctx context.Context, repo string, issueNumber int) (PipelineIssueState, error) + GetCommentReactions(ctx context.Context, repo string, commentID int64) ([]PipelineReactionMeta, error) +} + +type PipelineCheckMeta struct { + Name string `json:"name"` + Conclusion string `json:"conclusion,omitempty"` + Status string `json:"status,omitempty"` +} + +type PipelinePRMeta struct { + Number int `json:"number"` + State string `json:"state"` + Mergeable string `json:"mergeable"` + HeadSHA string `json:"head_sha,omitempty"` + HeadDate string `json:"head_date,omitempty"` + BaseBranch string `json:"base_branch,omitempty"` + HeadBranch string `json:"head_branch,omitempty"` + Checks []PipelineCheckMeta `json:"checks,omitempty"` + ThreadsTotal int `json:"threads_total,omitempty"` + ThreadsResolved int `json:"threads_resolved,omitempty"` + HasEyesReaction bool `json:"has_eyes_reaction,omitempty"` + URL string `json:"url,omitempty"` +} + +type PipelineIssueState struct { + Number int `json:"number"` + State string `json:"state"` + Title string `json:"title"` + URL string `json:"url,omitempty"` + Labels []string `json:"labels,omitempty"` +} + +type PipelineEpicChildMeta struct { + Number int `json:"number"` + Title string `json:"title"` + Checked bool `json:"checked"` + State string `json:"state"` + URL string `json:"url,omitempty"` + PRs []int `json:"prs,omitempty"` +} + +type PipelineEpicMeta struct { + Number int `json:"number"` + Title string `json:"title"` + State string `json:"state"` + URL string `json:"url,omitempty"` + Body string `json:"body,omitempty"` + Branch string `json:"branch,omitempty"` + Children []PipelineEpicChildMeta `json:"children"` +} + +type PipelineReactionMeta struct { + Content string `json:"content"` + Count int `json:"count"` +} + +type PipelineMonitorInput struct { + Org string `json:"org,omitempty"` + Repo string `json:"repo,omitempty"` + DryRun 
bool `json:"dry_run,omitempty"` +} + +type PipelineMonitorAction struct { + Repo string `json:"repo"` + Number int `json:"number"` + Action string `json:"action"` + Reason string `json:"reason"` + URL string `json:"url,omitempty"` +} + +type PipelineMonitorOutput struct { + Success bool `json:"success"` + Org string `json:"org,omitempty"` + Repo string `json:"repo,omitempty"` + Actions []PipelineMonitorAction `json:"actions,omitempty"` +} + +type pipelinePullRequestRecord struct { + Number int `json:"number"` + Title string `json:"title"` + State string `json:"state"` + HTMLURL string `json:"html_url"` + Merged bool `json:"merged"` + Mergeable *bool `json:"mergeable"` + MergeableState string `json:"mergeable_state"` + UpdatedAt string `json:"updated_at"` + ReviewThreadsTotal int `json:"review_threads_total"` + ReviewThreadsResolved int `json:"review_threads_resolved"` + ReviewThreadsUnresolved int `json:"review_threads_unresolved"` + ReviewComments int `json:"review_comments"` + Comments int `json:"comments"` + Reactions map[string]int `json:"reactions"` + Head struct { + Ref string `json:"ref"` + SHA string `json:"sha"` + Date string `json:"date"` + UpdatedAt string `json:"updated_at"` + Repo struct { + UpdatedAt string `json:"updated_at"` + PushedAt string `json:"pushed_at"` + } `json:"repo"` + } `json:"head"` + Base struct { + Ref string `json:"ref"` + } `json:"base"` +} + +type pipelineStatusRecord struct { + Statuses []struct { + Context string `json:"context"` + Name string `json:"name"` + Status string `json:"status"` + State string `json:"state"` + Conclusion string `json:"conclusion"` + UpdatedAt string `json:"updated_at"` + CreatedAt string `json:"created_at"` + } `json:"statuses"` +} + +type pipelineForgeMetaReader struct { + subsystem *PrepSubsystem + org string +} + +func (s *PrepSubsystem) cmdPipelineMonitor(options core.Options) core.Result { + ctx := s.commandContext() + output, err := s.pipelineMonitorWithReader(ctx, PipelineMonitorInput{ + Org: 
// pipelineMonitorWithReader walks every open PR in the target repo (or all
// repos of the org when input.Repo is empty) and applies at most one
// intervention per PR, in priority order:
//  1. merge conflict        -> comment asking for a conflict fix
//  2. unresolved threads    -> comment asking for review fixes
//  3. all checks green      -> merge the PR
// PRs matching none of these (e.g. checks still pending) are left alone.
// DryRun records the would-be actions without commenting or merging.
func (s *PrepSubsystem) pipelineMonitorWithReader(ctx context.Context, input PipelineMonitorInput, reader MetaReader) (PipelineMonitorOutput, error) {
	if s.forgeToken == "" {
		return PipelineMonitorOutput{}, core.E("pipelineMonitor", "no Forge token configured", nil)
	}
	if input.Org == "" {
		input.Org = "core"
	}

	repos := []string{}
	if input.Repo != "" {
		repos = append(repos, input.Repo)
	} else {
		records, err := s.pipelineListOrgRepos(ctx, input.Org)
		if err != nil {
			return PipelineMonitorOutput{}, err
		}
		for _, record := range records {
			repos = append(repos, record.Name)
		}
	}

	output := PipelineMonitorOutput{
		Success: true,
		Org:     input.Org,
		Repo:    input.Repo,
	}

	for _, repo := range repos {
		pullRequests, err := s.pipelineListPullRequests(ctx, input.Org, repo, "open")
		if err != nil {
			return PipelineMonitorOutput{}, err
		}

		for _, pullRequest := range pullRequests {
			// Re-read per-PR meta through the reader so tests can stub it.
			meta, err := reader.GetPRMeta(ctx, repo, pullRequest.Number)
			if err != nil {
				return PipelineMonitorOutput{}, err
			}
			if meta.State != "open" {
				continue
			}

			action := PipelineMonitorAction{
				Repo:   repo,
				Number: meta.Number,
				URL:    meta.URL,
			}

			switch {
			case meta.Mergeable == "conflicting":
				action.Action = "fix/conflicts"
				action.Reason = "merge conflict detected"
				if !input.DryRun {
					// Best-effort comment; result intentionally unchecked.
					s.commentOnIssue(ctx, input.Org, repo, meta.Number, "Can you fix the merge conflict?")
				}
			case meta.ThreadsTotal > meta.ThreadsResolved:
				action.Action = "fix/reviews"
				action.Reason = "unresolved review threads detected"
				if !input.DryRun {
					s.commentOnIssue(ctx, input.Org, repo, meta.Number, "Can you fix the code reviews?")
				}
			case pipelineChecksSuccessful(meta.Checks):
				action.Action = "merge"
				action.Reason = "all checks passed and no review blockers remain"
				if !input.DryRun {
					// NOTE(review): a single failed merge aborts the whole
					// monitor pass — confirm that is the intended policy.
					mergeResult := s.forgeMergePR(ctx, input.Org, repo, meta.Number)
					if !mergeResult.OK {
						return PipelineMonitorOutput{}, commandResultError("pipelineMonitor.merge", mergeResult)
					}
				}
			default:
				continue
			}

			output.Actions = append(output.Actions, action)
		}
	}

	return output, nil
}

// pipelineListPullRequests lists pull requests for a repo via the forge API.
// State defaults to "open".
// NOTE(review): only the first page (limit=100) is fetched — repos with more
// open PRs than that will be partially monitored; confirm acceptable.
func (s *PrepSubsystem) pipelineListPullRequests(ctx context.Context, org, repo, state string) ([]pipelinePullRequestRecord, error) {
	if state == "" {
		state = "open"
	}

	url := core.Sprintf("%s/api/v1/repos/%s/%s/pulls?state=%s&limit=100&page=1", s.forgeURL, org, repo, state)
	result := HTTPGet(ctx, url, s.forgeToken, "token")
	if !result.OK {
		return nil, core.E("pipelineListPullRequests", core.Concat("failed to list pull requests for ", repo), nil)
	}

	var pullRequests []pipelinePullRequestRecord
	if err := pipelineDecodeJSON(result.Value.(string), &pullRequests, "pipelineListPullRequests", "failed to decode pull request list"); err != nil {
		return nil, err
	}
	return pullRequests, nil
}
// GetPRMeta fetches one pull request from the forge and normalizes it into
// PipelinePRMeta: commit statuses for the head SHA, a three-valued mergeable
// flag, review-thread counts, and the freshest available head timestamp.
func (r *pipelineForgeMetaReader) GetPRMeta(ctx context.Context, repo string, prNumber int) (PipelinePRMeta, error) {
	url := core.Sprintf("%s/api/v1/repos/%s/%s/pulls/%d", r.subsystem.forgeURL, r.org, repo, prNumber)
	result := HTTPGet(ctx, url, r.subsystem.forgeToken, "token")
	if !result.OK {
		return PipelinePRMeta{}, core.E("MetaReader.GetPRMeta", core.Concat("failed to read PR #", core.Sprint(prNumber)), nil)
	}

	var pullRequest pipelinePullRequestRecord
	if err := pipelineDecodeJSON(result.Value.(string), &pullRequest, "MetaReader.GetPRMeta", "failed to decode pull request"); err != nil {
		return PipelinePRMeta{}, err
	}

	// Commit statuses: best-effort — a failed fetch or decode just leaves
	// Checks empty rather than failing the whole read.
	statuses := []PipelineCheckMeta{}
	if pullRequest.Head.SHA != "" {
		statusURL := core.Sprintf("%s/api/v1/repos/%s/%s/commits/%s/status", r.subsystem.forgeURL, r.org, repo, pullRequest.Head.SHA)
		statusResult := HTTPGet(ctx, statusURL, r.subsystem.forgeToken, "token")
		if statusResult.OK {
			var status pipelineStatusRecord
			if err := pipelineDecodeJSON(statusResult.Value.(string), &status, "MetaReader.GetPRMeta", "failed to decode commit status"); err == nil {
				for _, entry := range status.Statuses {
					// Different forge versions populate different fields;
					// take the first non-empty of status/state/conclusion.
					rawState := entry.Status
					if rawState == "" {
						rawState = entry.State
					}
					if rawState == "" {
						rawState = entry.Conclusion
					}

					name := entry.Context
					if name == "" {
						name = entry.Name
					}
					statuses = append(statuses, PipelineCheckMeta{
						Name:       name,
						Conclusion: pipelineCheckConclusion(rawState),
						Status:     pipelineCheckStatus(rawState),
					})
				}
			}
		}
	}

	// Normalize state: the Merged boolean overrides the raw state string.
	state := core.Lower(pullRequest.State)
	if pullRequest.Merged {
		state = "merged"
	}
	if state == "" {
		state = "open"
	}

	// Mergeable: prefer the explicit boolean, fall back to mergeable_state
	// strings, default "unknown".
	mergeable := "unknown"
	switch {
	case pullRequest.Mergeable != nil && *pullRequest.Mergeable:
		mergeable = "mergeable"
	case pullRequest.Mergeable != nil && !*pullRequest.Mergeable:
		mergeable = "conflicting"
	case core.Lower(pullRequest.MergeableState) == "clean", core.Lower(pullRequest.MergeableState) == "mergeable":
		mergeable = "mergeable"
	case core.Lower(pullRequest.MergeableState) == "dirty", core.Lower(pullRequest.MergeableState) == "conflicting":
		mergeable = "conflicting"
	}

	// NOTE(review): when the forge reports no thread counts, this falls back
	// to review_comments and then plain issue comments as the "total" — a PR
	// with ordinary discussion comments would then look like it has
	// unresolved threads. Confirm this is the intended heuristic.
	threadsTotal := pullRequest.ReviewThreadsTotal
	if threadsTotal == 0 {
		if pullRequest.ReviewComments > 0 {
			threadsTotal = pullRequest.ReviewComments
		} else {
			threadsTotal = pullRequest.Comments
		}
	}

	threadsResolved := pullRequest.ReviewThreadsResolved
	if threadsResolved == 0 && pullRequest.ReviewThreadsUnresolved > 0 && threadsTotal >= pullRequest.ReviewThreadsUnresolved {
		threadsResolved = threadsTotal - pullRequest.ReviewThreadsUnresolved
	}

	// Head timestamp: first non-empty of head date, head updated, head repo
	// updated/pushed, then the PR's own updated_at.
	headDate := pullRequest.Head.Date
	if headDate == "" {
		headDate = pullRequest.Head.UpdatedAt
	}
	if headDate == "" {
		headDate = pullRequest.Head.Repo.UpdatedAt
	}
	if headDate == "" {
		headDate = pullRequest.Head.Repo.PushedAt
	}
	if headDate == "" {
		headDate = pullRequest.UpdatedAt
	}

	return PipelinePRMeta{
		Number:          pullRequest.Number,
		State:           state,
		Mergeable:       mergeable,
		HeadSHA:         pullRequest.Head.SHA,
		HeadDate:        headDate,
		BaseBranch:      pullRequest.Base.Ref,
		HeadBranch:      pullRequest.Head.Ref,
		Checks:          statuses,
		ThreadsTotal:    threadsTotal,
		ThreadsResolved: threadsResolved,
		HasEyesReaction: pullRequest.Reactions["eyes"] > 0,
		URL:             pullRequest.HTMLURL,
	}, nil
}
core.Lower(match[2]) == "x", + State: state.State, + URL: state.URL, + }) + } + + return meta, nil +} + +func (r *pipelineForgeMetaReader) GetIssueState(ctx context.Context, repo string, issueNumber int) (PipelineIssueState, error) { + issue, err := r.subsystem.pipelineGetIssue(ctx, r.org, repo, issueNumber) + if err != nil { + return PipelineIssueState{}, err + } + + state := issue.State + if state == "" { + state = "open" + } + + return PipelineIssueState{ + Number: issue.Number, + State: state, + Title: issue.Title, + URL: issue.HTMLURL, + Labels: pipelineIssueLabelNames(issue), + }, nil +} + +func (r *pipelineForgeMetaReader) GetCommentReactions(ctx context.Context, repo string, commentID int64) ([]PipelineReactionMeta, error) { + url := core.Sprintf("%s/api/v1/repos/%s/%s/issues/comments/%d/reactions", r.subsystem.forgeURL, r.org, repo, commentID) + result := HTTPGet(ctx, url, r.subsystem.forgeToken, "token") + if !result.OK { + return nil, core.E("MetaReader.GetCommentReactions", core.Concat("failed to read comment reactions for ", core.Sprint(commentID)), nil) + } + + var reactions []struct { + Content string `json:"content"` + } + if err := pipelineDecodeJSON(result.Value.(string), &reactions, "MetaReader.GetCommentReactions", "failed to decode reactions"); err != nil { + return nil, err + } + + counts := map[string]int{} + for _, reaction := range reactions { + counts[reaction.Content]++ + } + + aggregated := []PipelineReactionMeta{} + for content, count := range counts { + aggregated = append(aggregated, PipelineReactionMeta{Content: content, Count: count}) + } + return aggregated, nil +} + +func pipelineCheckConclusion(rawState string) string { + switch core.Lower(rawState) { + case "success": + return "success" + case "failure", "error": + return "failure" + default: + return "" + } +} + +func pipelineCheckStatus(rawState string) string { + switch core.Lower(rawState) { + case "success", "failure", "error": + return "completed" + case "pending", 
"queued": + return "queued" + case "running", "in_progress": + return "in_progress" + default: + return "" + } +} + +func pipelineChecksSuccessful(checks []PipelineCheckMeta) bool { + if len(checks) == 0 { + return false + } + for _, check := range checks { + if check.Status != "completed" || check.Conclusion != "success" { + return false + } + } + return true +} diff --git a/pkg/agentic/pipeline_monitor_test.go b/pkg/agentic/pipeline_monitor_test.go new file mode 100644 index 0000000..a8e855b --- /dev/null +++ b/pkg/agentic/pipeline_monitor_test.go @@ -0,0 +1,104 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPipelineMonitor_Good_AutoIntervenesAndMerges(t *testing.T) { + repo := newPipelineTestRepo() + repo.Pulls[1] = &pipelineTestPR{ + Number: 1, + Title: "Fix conflicts", + State: "open", + Mergeable: boolPtr(false), + HeadRef: "agent/conflicts", + HeadSHA: "sha-conflicts", + BaseRef: "dev", + MergeableState: "dirty", + } + repo.Pulls[2] = &pipelineTestPR{ + Number: 2, + Title: "Fix reviews", + State: "open", + Mergeable: boolPtr(true), + HeadRef: "agent/reviews", + HeadSHA: "sha-reviews", + BaseRef: "dev", + ReviewThreadsTotal: 3, + ReviewThreadsResolved: 1, + ReviewThreadsUnresolved: 2, + Statuses: []map[string]any{ + {"context": "qa", "status": "success"}, + }, + } + repo.Pulls[3] = &pipelineTestPR{ + Number: 3, + Title: "Ready to merge", + State: "open", + Mergeable: boolPtr(true), + HeadRef: "agent/merge", + HeadSHA: "sha-merge", + BaseRef: "dev", + ReviewThreadsTotal: 1, + ReviewThreadsResolved: 1, + Statuses: []map[string]any{ + {"context": "qa", "status": "success"}, + {"context": "build", "status": "success"}, + }, + } + srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo}) + + s, _ := testPrepWithCore(t, srv) + output, err := s.pipelineMonitorWithReader(context.Background(), 
PipelineMonitorInput{ + Org: "core", + Repo: "go-io", + }, &pipelineForgeMetaReader{subsystem: s, org: "core"}) + + require.NoError(t, err) + assert.Len(t, output.Actions, 3) + assert.Contains(t, repo.Comments[1][0], "Can you fix the merge conflict?") + assert.Contains(t, repo.Comments[2][0], "Can you fix the code reviews?") + assert.Contains(t, repo.Merged, 3) +} + +func TestPipelineMonitor_Bad_NoToken(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + s.forgeToken = "" + + _, err := s.pipelineMonitorWithReader(context.Background(), PipelineMonitorInput{Org: "core", Repo: "go-io"}, &pipelineForgeMetaReader{subsystem: s, org: "core"}) + + require.Error(t, err) + assert.Contains(t, err.Error(), "no Forge token configured") +} + +func TestPipelineMonitor_Ugly_NoActionWhenChecksPending(t *testing.T) { + repo := newPipelineTestRepo() + repo.Pulls[4] = &pipelineTestPR{ + Number: 4, + Title: "Waiting for CI", + State: "open", + Mergeable: boolPtr(true), + HeadRef: "agent/pending", + HeadSHA: "sha-pending", + BaseRef: "dev", + Statuses: []map[string]any{{"context": "qa", "status": "pending"}}, + Reactions: map[string]int{}, + Comments: 0, + } + srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo}) + + s, _ := testPrepWithCore(t, srv) + output, err := s.pipelineMonitorWithReader(context.Background(), PipelineMonitorInput{ + Org: "core", + Repo: "go-io", + }, &pipelineForgeMetaReader{subsystem: s, org: "core"}) + + require.NoError(t, err) + assert.Empty(t, output.Actions) +} diff --git a/pkg/agentic/pipeline_onboard.go b/pkg/agentic/pipeline_onboard.go new file mode 100644 index 0000000..f758799 --- /dev/null +++ b/pkg/agentic/pipeline_onboard.go @@ -0,0 +1,162 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + + core "dappco.re/go/core" +) + +type PipelineOnboardInput struct { + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + Agent string `json:"agent,omitempty"` + Template string 
`json:"template,omitempty"` + DispatchDryRun bool `json:"dispatch_dry_run,omitempty"` + Limit int `json:"limit,omitempty"` +} + +type PipelineOnboardOutput struct { + Success bool `json:"success"` + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + Audit PipelineAuditOutput `json:"audit"` + Epic PipelineEpicCreateOutput `json:"epic"` + Runs []PipelineEpicRunOutput `json:"runs,omitempty"` + Direct []PipelineIssueRef `json:"direct,omitempty"` +} + +func (s *PrepSubsystem) cmdPipelineOnboard(options core.Options) core.Result { + ctx := s.commandContext() + repo := pipelineRepoValue(options) + if repo == "" { + core.Print(nil, "usage: core-agent pipeline/onboard [--org=core] [--agent=codex] [--template=coding] [--dry-run]") + return core.Result{Value: core.E("agentic.cmdPipelineOnboard", "repo is required", nil), OK: false} + } + + output, err := s.pipelineOnboard(ctx, PipelineOnboardInput{ + Org: pipelineOrgValue(options), + Repo: repo, + Agent: optionStringValue(options, "agent"), + Template: optionStringValue(options, "template"), + DispatchDryRun: optionBoolValue(options, "dry_run", "dry-run"), + Limit: optionIntValue(options, "limit"), + }) + if err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + core.Print(nil, "repo: %s/%s", output.Org, output.Repo) + core.Print(nil, "audit created: %d", len(output.Audit.Created)) + core.Print(nil, "epics: %d", len(output.Epic.Epics)) + core.Print(nil, "epic runs: %d", len(output.Runs)) + core.Print(nil, "direct: %d", len(output.Direct)) + for _, run := range output.Runs { + core.Print(nil, " epic #%d dispatched %d issue(s)", run.EpicNumber, len(run.Dispatched)) + } + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) pipelineOnboard(ctx context.Context, input PipelineOnboardInput) (PipelineOnboardOutput, error) { + if input.Repo == "" { + return PipelineOnboardOutput{}, core.E("pipelineOnboard", "repo is required", nil) + } + if input.Org == 
"" { + input.Org = "core" + } + if input.Agent == "" { + input.Agent = "codex" + } + if input.Template == "" { + input.Template = "coding" + } + + auditOutput, err := s.pipelineAudit(ctx, PipelineAuditInput{ + Org: input.Org, + Repo: input.Repo, + }) + if err != nil { + return PipelineOnboardOutput{}, err + } + + epicOutput, err := s.pipelineEpicCreate(ctx, PipelineEpicCreateInput{ + Org: input.Org, + Repo: input.Repo, + }) + if err != nil { + return PipelineOnboardOutput{}, err + } + + output := PipelineOnboardOutput{ + Success: true, + Org: input.Org, + Repo: input.Repo, + Audit: auditOutput, + Epic: epicOutput, + } + + epicsToRun := make([]PipelineEpicMeta, 0, len(epicOutput.Epics)+len(epicOutput.Existing)) + epicsToRun = append(epicsToRun, epicOutput.Epics...) + epicsToRun = append(epicsToRun, epicOutput.Existing...) + + if len(epicsToRun) == 0 { + direct, directErr := s.pipelineOnboardDispatchDirect(ctx, input, epicOutput.Candidates) + if directErr != nil { + return PipelineOnboardOutput{}, directErr + } + output.Direct = direct + return output, nil + } + + for _, epic := range epicsToRun { + runOutput, runErr := s.pipelineEpicRun(ctx, PipelineEpicRunInput{ + Org: input.Org, + Repo: input.Repo, + EpicNumber: epic.Number, + Agent: input.Agent, + Template: input.Template, + DryRun: input.DispatchDryRun, + Limit: input.Limit, + Epic: &epic, + }) + if runErr != nil { + return PipelineOnboardOutput{}, runErr + } + output.Runs = append(output.Runs, runOutput) + } + + return output, nil +} + +func (s *PrepSubsystem) pipelineOnboardDispatchDirect(ctx context.Context, input PipelineOnboardInput, issues []PipelineIssueRef) ([]PipelineIssueRef, error) { + dispatched := []PipelineIssueRef{} + for _, issue := range issues { + if issue.Number <= 0 || !pipelineIssueAutoDispatchAllowed(issue.Title) { + continue + } + if input.Limit > 0 && len(dispatched) >= input.Limit { + break + } + + if input.DispatchDryRun { + dispatched = append(dispatched, issue) + continue + } + + _, 
_, err := s.dispatch(ctx, nil, DispatchInput{ + Org: input.Org, + Repo: input.Repo, + Task: issue.Title, + Issue: issue.Number, + Agent: input.Agent, + Template: input.Template, + }) + if err != nil { + return nil, core.E("pipelineOnboard", core.Concat("failed to dispatch issue #", core.Sprint(issue.Number)), err) + } + dispatched = append(dispatched, issue) + } + return dispatched, nil +} diff --git a/pkg/agentic/pipeline_onboard_test.go b/pkg/agentic/pipeline_onboard_test.go new file mode 100644 index 0000000..2b96017 --- /dev/null +++ b/pkg/agentic/pipeline_onboard_test.go @@ -0,0 +1,72 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "testing" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPipelineOnboard_Good_ChainsAuditEpicAndDispatch(t *testing.T) { + repo := newPipelineTestRepo() + repo.Issues[1] = &pipelineTestIssue{ + Number: 1, + Title: "[Audit] Security", + Body: "- Validate tokens\n- Sanitize input\n- Add rate limiting", + State: "open", + Labels: []string{"audit", "security"}, + } + srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo}) + + s, _ := testPrepWithCore(t, srv) + output, err := s.pipelineOnboard(context.Background(), PipelineOnboardInput{ + Org: "core", + Repo: "go-io", + DispatchDryRun: true, + }) + + require.NoError(t, err) + assert.True(t, output.Success) + assert.Len(t, output.Audit.Created, 3) + require.Len(t, output.Runs, 1) + assert.Len(t, output.Runs[0].Dispatched, 3) + assert.Empty(t, output.Direct) +} + +func TestPipelineOnboard_Bad_MissingRepo(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + + result := s.cmdPipelineOnboard(core.NewOptions()) + + require.False(t, result.OK) + err, ok := result.Value.(error) + require.True(t, ok) + assert.Contains(t, err.Error(), "repo is required") +} + +func TestPipelineOnboard_Ugly_DirectDispatchWhenEpicNotCreated(t *testing.T) { + repo := 
newPipelineTestRepo() + repo.Issues[1] = &pipelineTestIssue{ + Number: 1, + Title: "[Audit] Security", + Body: "- Validate tokens\n- Sanitize input", + State: "open", + Labels: []string{"audit", "security"}, + } + srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo}) + + s, _ := testPrepWithCore(t, srv) + output, err := s.pipelineOnboard(context.Background(), PipelineOnboardInput{ + Org: "core", + Repo: "go-io", + DispatchDryRun: true, + }) + + require.NoError(t, err) + assert.Empty(t, output.Runs) + assert.Len(t, output.Direct, 2) +} diff --git a/pkg/agentic/pipeline_training.go b/pkg/agentic/pipeline_training.go new file mode 100644 index 0000000..a93ad2f --- /dev/null +++ b/pkg/agentic/pipeline_training.go @@ -0,0 +1,32 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import core "dappco.re/go/core" + +func (s *PrepSubsystem) cmdPipelineTrainingCapture(_ core.Options) core.Result { + core.Print(nil, "status: not yet implemented") + core.Print(nil, "reason: blocked-on-sibling") + return core.Result{ + Value: core.E("agentic.cmdPipelineTrainingCapture", "not yet implemented - blocked-on-sibling", nil), + OK: false, + } +} + +func (s *PrepSubsystem) cmdPipelineTrainingStats(_ core.Options) core.Result { + core.Print(nil, "status: not yet implemented") + core.Print(nil, "reason: blocked-on-sibling") + return core.Result{ + Value: core.E("agentic.cmdPipelineTrainingStats", "not yet implemented - blocked-on-sibling", nil), + OK: false, + } +} + +func (s *PrepSubsystem) cmdPipelineTrainingExport(_ core.Options) core.Result { + core.Print(nil, "status: not yet implemented") + core.Print(nil, "reason: blocked-on-sibling") + return core.Result{ + Value: core.E("agentic.cmdPipelineTrainingExport", "not yet implemented - blocked-on-sibling", nil), + OK: false, + } +} diff --git a/pkg/agentic/pipeline_training_test.go b/pkg/agentic/pipeline_training_test.go new file mode 100644 index 0000000..8c4c068 --- /dev/null +++ 
b/pkg/agentic/pipeline_training_test.go @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "testing" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPipelineTraining_Good_RootHelp(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + + output := captureStdout(t, func() { + result := s.cmdPipelineTraining(core.NewOptions()) + require.True(t, result.OK) + }) + + assert.Contains(t, output, "core-agent pipeline/training/capture") + assert.Contains(t, output, "core-agent pipeline/training/stats") + assert.Contains(t, output, "core-agent pipeline/training/export") +} + +func TestPipelineTraining_Bad_CaptureBlockedOnSibling(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + + result := s.cmdPipelineTrainingCapture(core.NewOptions()) + + require.False(t, result.OK) + err, ok := result.Value.(error) + require.True(t, ok) + assert.Contains(t, err.Error(), "blocked-on-sibling") +} diff --git a/tests/cli/pipeline/Taskfile.yaml b/tests/cli/pipeline/Taskfile.yaml new file mode 100644 index 0000000..44a5ed4 --- /dev/null +++ b/tests/cli/pipeline/Taskfile.yaml @@ -0,0 +1,26 @@ +version: "3" + +tasks: + test: + cmds: + - | + bash <<'EOF' + set -euo pipefail + source ../_lib/run.sh + + go build -trimpath -ldflags="-s -w" -o bin/core-agent ../../../cmd/core-agent + + output="$(mktemp)" + + run_capture_all 0 "$output" ./bin/core-agent pipeline + assert_contains "pipeline/audit " "$output" + + run_capture_all 1 "$output" ./bin/core-agent pipeline/audit + assert_contains "pipeline/audit " "$output" + + run_capture_all 1 "$output" ./bin/core-agent pipeline/budget/plan + assert_contains "blocked-on-sibling" "$output" + + run_capture_all 1 "$output" ./bin/core-agent pipeline/training/capture + assert_contains "blocked-on-sibling" "$output" + EOF