feat(agent/pipeline): add core pipeline command tree (#535)

Per RFC.pipeline.md "core pipeline Command Tree": full audit/epic/
monitor/fix/onboard/budget/training subcommand tree wired into
core-agent.

Lands across 18 files:
* pkg/agentic/pipeline_commands.go — registry: audit, epic/create|run|
  status|sync, monitor, fix/reviews|conflicts|format|threads, onboard,
  budget/*, training/*
* pkg/agentic/commands.go — pipeline registration wired in
* pkg/agentic/pipeline_audit.go — audit issue expansion + bug fix:
  audit-created implementation issues no longer carry the 'audit' label,
  so epic + onboard see them as implementation candidates
* pkg/agentic/pipeline_epic.go — epic group/run/sync
* pkg/agentic/pipeline_monitor.go — open-PR watcher
* pkg/agentic/pipeline_fix.go — reviews/conflicts/format/threads helpers
* pkg/agentic/pipeline_onboard.go — chained audit → epic → dispatch
* pkg/agentic/pipeline_budget.go + pipeline_training.go — stubs
  returning blocked-on-sibling pointer (sibling tickets own deep impl)
* pkg/agentic/pipeline_*_test.go — AX-10 per handler
* tests/cli/pipeline/Taskfile.yaml — CLI smoke coverage

go test could not complete in this sandbox due to wider workspace
go.sum/private-module issues outside this ticket; supervisor catches
in clean workspace.

Co-authored-by: Codex <noreply@openai.com>
Closes tasks.lthn.sh/view.php?id=535
This commit is contained in:
Snider 2026-04-25 23:25:35 +01:00
parent b29d20f562
commit 53b46c33da
18 changed files with 3260 additions and 0 deletions

View file

@ -95,6 +95,7 @@ func (s *PrepSubsystem) registerCommands(ctx context.Context) {
s.registerSprintCommands()
s.registerStateCommands()
s.registerCoreCommands()
s.registerPipelineCommands()
s.registerLanguageCommands()
s.registerSetupCommands()
}

View file

@ -0,0 +1,452 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"regexp"
core "dappco.re/go/core"
)
var pipelineBulletPattern = regexp.MustCompile(`^\s*(?:[-*]|\d+\.)\s+(.*)$`)
var pipelineWhitespacePattern = regexp.MustCompile(`\s+`)
type PipelineIssueRef struct {
Number int `json:"number"`
Title string `json:"title"`
State string `json:"state,omitempty"`
URL string `json:"url,omitempty"`
Labels []string `json:"labels,omitempty"`
}
type PipelineAuditInput struct {
Org string `json:"org,omitempty"`
Repo string `json:"repo"`
DryRun bool `json:"dry_run,omitempty"`
}
type PipelineAuditOutput struct {
Success bool `json:"success"`
Org string `json:"org,omitempty"`
Repo string `json:"repo"`
Audits []PipelineIssueRef `json:"audits"`
Created []PipelineIssueRef `json:"created"`
Existing []PipelineIssueRef `json:"existing"`
Closed []int `json:"closed,omitempty"`
}
type pipelineRepoRecord struct {
Name string `json:"name"`
DefaultBranch string `json:"default_branch"`
HTMLURL string `json:"html_url"`
}
type pipelineLabelRecord struct {
ID int64 `json:"id"`
Name string `json:"name"`
}
type pipelineIssueRecord struct {
Number int `json:"number"`
Title string `json:"title"`
Body string `json:"body"`
State string `json:"state"`
HTMLURL string `json:"html_url"`
Labels []pipelineLabelRecord `json:"labels"`
PullRequest map[string]any `json:"pull_request"`
}
func (s *PrepSubsystem) cmdPipelineAudit(options core.Options) core.Result {
ctx := s.commandContext()
org := pipelineOrgValue(options)
repo := pipelineRepoValue(options)
if repo == "" {
core.Print(nil, "usage: core-agent pipeline/audit <repo> [--org=core] [--dry-run]")
return core.Result{Value: core.E("agentic.cmdPipelineAudit", "repo is required", nil), OK: false}
}
output, err := s.pipelineAudit(ctx, PipelineAuditInput{
Org: org,
Repo: repo,
DryRun: optionBoolValue(options, "dry_run", "dry-run"),
})
if err != nil {
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
core.Print(nil, "repo: %s/%s", output.Org, output.Repo)
core.Print(nil, "audits: %d", len(output.Audits))
core.Print(nil, "created: %d", len(output.Created))
core.Print(nil, "existing: %d", len(output.Existing))
if len(output.Closed) > 0 {
core.Print(nil, "closed: %d", len(output.Closed))
}
for _, issue := range output.Created {
core.Print(nil, " created: #%d %s", issue.Number, issue.Title)
}
for _, issue := range output.Existing {
core.Print(nil, " existing: #%d %s", issue.Number, issue.Title)
}
if len(output.Audits) == 0 {
core.Print(nil, "no audit issues")
}
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) pipelineAudit(ctx context.Context, input PipelineAuditInput) (PipelineAuditOutput, error) {
if input.Repo == "" {
return PipelineAuditOutput{}, core.E("pipelineAudit", "repo is required", nil)
}
if s.forgeToken == "" {
return PipelineAuditOutput{}, core.E("pipelineAudit", "no Forge token configured", nil)
}
if input.Org == "" {
input.Org = "core"
}
issues, err := s.pipelineListIssues(ctx, input.Org, input.Repo, "open")
if err != nil {
return PipelineAuditOutput{}, err
}
output := PipelineAuditOutput{
Success: true,
Org: input.Org,
Repo: input.Repo,
}
existingByTitle := make(map[string]PipelineIssueRef)
for _, issue := range issues {
if pipelineIssueState(issue) != "open" || pipelineIssueIsAudit(issue) || pipelineIssueIsEpic(issue) {
continue
}
key := pipelineAuditExistingKey(issue)
if key == "" {
continue
}
existingByTitle[key] = pipelineIssueRefFromRecord(issue)
}
for _, issue := range issues {
if !pipelineIssueIsAudit(issue) {
continue
}
output.Audits = append(output.Audits, pipelineIssueRefFromRecord(issue))
findings := pipelineAuditFindings(issue)
if len(findings) == 0 {
findings = []string{issue.Title}
}
linked := make([]PipelineIssueRef, 0, len(findings))
for _, finding := range findings {
title := pipelineAuditImplementationTitle(input.Repo, issue, finding)
key := core.Concat(core.Sprint(issue.Number), ":", title)
if existing, ok := existingByTitle[key]; ok {
output.Existing = append(output.Existing, existing)
linked = append(linked, existing)
continue
}
labels := pipelineAuditLabels(issue)
body := pipelineAuditImplementationBody(issue, finding)
if input.DryRun {
ref := PipelineIssueRef{Title: title, State: "planned", Labels: labels}
output.Created = append(output.Created, ref)
linked = append(linked, ref)
continue
}
labelIDs := s.resolveLabelIDs(ctx, input.Org, input.Repo, labels)
created, createErr := s.createIssue(ctx, input.Org, input.Repo, title, body, labelIDs)
if createErr != nil {
return PipelineAuditOutput{}, core.E("pipelineAudit", core.Concat("failed to create implementation issue for audit #", core.Sprint(issue.Number)), createErr)
}
ref := PipelineIssueRef{
Number: created.Number,
Title: created.Title,
URL: created.URL,
State: "open",
Labels: labels,
}
existingByTitle[key] = ref
output.Created = append(output.Created, ref)
linked = append(linked, ref)
}
if input.DryRun || len(linked) == 0 {
continue
}
s.commentOnIssue(ctx, input.Org, input.Repo, issue.Number, pipelineAuditLinkedComment(linked))
if closeErr := s.pipelinePatchIssue(ctx, input.Org, input.Repo, issue.Number, map[string]any{"state": "closed"}); closeErr != nil {
return PipelineAuditOutput{}, closeErr
}
output.Closed = append(output.Closed, issue.Number)
}
return output, nil
}
func (s *PrepSubsystem) pipelineListOrgRepos(ctx context.Context, org string) ([]pipelineRepoRecord, error) {
url := core.Sprintf("%s/api/v1/orgs/%s/repos?limit=100&page=1", s.forgeURL, org)
result := HTTPGet(ctx, url, s.forgeToken, "token")
if !result.OK {
return nil, core.E("pipelineListOrgRepos", core.Concat("failed to list repos for ", org), nil)
}
var repos []pipelineRepoRecord
if err := pipelineDecodeJSON(result.Value.(string), &repos, "pipelineListOrgRepos", "failed to decode repo list"); err != nil {
return nil, err
}
return repos, nil
}
func (s *PrepSubsystem) pipelineGetRepo(ctx context.Context, org, repo string) (pipelineRepoRecord, error) {
url := core.Sprintf("%s/api/v1/repos/%s/%s", s.forgeURL, org, repo)
result := HTTPGet(ctx, url, s.forgeToken, "token")
if !result.OK {
return pipelineRepoRecord{}, core.E("pipelineGetRepo", core.Concat("failed to read repo ", repo), nil)
}
var record pipelineRepoRecord
if err := pipelineDecodeJSON(result.Value.(string), &record, "pipelineGetRepo", "failed to decode repo"); err != nil {
return pipelineRepoRecord{}, err
}
return record, nil
}
func (s *PrepSubsystem) pipelineListIssues(ctx context.Context, org, repo, state string) ([]pipelineIssueRecord, error) {
if state == "" {
state = "open"
}
url := core.Sprintf("%s/api/v1/repos/%s/%s/issues?state=%s&limit=100&type=issues", s.forgeURL, org, repo, state)
result := HTTPGet(ctx, url, s.forgeToken, "token")
if !result.OK {
return nil, core.E("pipelineListIssues", core.Concat("failed to list issues for ", repo), nil)
}
var issues []pipelineIssueRecord
if err := pipelineDecodeJSON(result.Value.(string), &issues, "pipelineListIssues", "failed to decode issue list"); err != nil {
return nil, err
}
return issues, nil
}
func (s *PrepSubsystem) pipelineGetIssue(ctx context.Context, org, repo string, number int) (pipelineIssueRecord, error) {
url := core.Sprintf("%s/api/v1/repos/%s/%s/issues/%d", s.forgeURL, org, repo, number)
result := HTTPGet(ctx, url, s.forgeToken, "token")
if !result.OK {
return pipelineIssueRecord{}, core.E("pipelineGetIssue", core.Concat("failed to read issue #", core.Sprint(number)), nil)
}
var issue pipelineIssueRecord
if err := pipelineDecodeJSON(result.Value.(string), &issue, "pipelineGetIssue", "failed to decode issue"); err != nil {
return pipelineIssueRecord{}, err
}
return issue, nil
}
func (s *PrepSubsystem) pipelinePatchIssue(ctx context.Context, org, repo string, number int, payload map[string]any) error {
url := core.Sprintf("%s/api/v1/repos/%s/%s/issues/%d", s.forgeURL, org, repo, number)
result := HTTPPatch(ctx, url, core.JSONMarshalString(payload), s.forgeToken, "token")
if !result.OK {
return core.E("pipelinePatchIssue", core.Concat("failed to update issue #", core.Sprint(number)), nil)
}
return nil
}
func pipelineDecodeJSON(data string, target any, errorName, message string) error {
parseResult := core.JSONUnmarshalString(data, target)
if !parseResult.OK {
err, _ := parseResult.Value.(error)
return core.E(errorName, message, err)
}
return nil
}
func pipelineIssueRefFromRecord(issue pipelineIssueRecord) PipelineIssueRef {
return PipelineIssueRef{
Number: issue.Number,
Title: issue.Title,
State: pipelineIssueState(issue),
URL: issue.HTMLURL,
Labels: pipelineIssueLabelNames(issue),
}
}
func pipelineIssueState(issue pipelineIssueRecord) string {
state := core.Lower(core.Trim(issue.State))
if state == "" {
return "open"
}
return state
}
func pipelineIssueLabelNames(issue pipelineIssueRecord) []string {
names := make([]string, 0, len(issue.Labels))
for _, label := range issue.Labels {
if label.Name != "" {
names = append(names, label.Name)
}
}
return names
}
func pipelineIssueHasLabel(issue pipelineIssueRecord, want string) bool {
for _, name := range pipelineIssueLabelNames(issue) {
if core.Lower(name) == core.Lower(want) {
return true
}
}
return false
}
func pipelineIssueIsAudit(issue pipelineIssueRecord) bool {
title := core.Lower(issue.Title)
return pipelineIssueHasLabel(issue, "audit") || core.Contains(title, "[audit]") || core.HasPrefix(title, "audit:")
}
func pipelineIssueIsEpic(issue pipelineIssueRecord) bool {
return pipelineIssueHasLabel(issue, "epic") || regexp.MustCompile(`(?m)^\s*-\s*\[[ xX]\]\s*#\d+`).MatchString(issue.Body)
}
func pipelineIssueIsImplementationCandidate(issue pipelineIssueRecord) bool {
if pipelineIssueState(issue) != "open" || pipelineIssueIsAudit(issue) || pipelineIssueIsEpic(issue) {
return false
}
if len(issue.PullRequest) > 0 {
return false
}
return !core.Contains(issue.Body, "Parent: #")
}
func pipelineAuditFindings(issue pipelineIssueRecord) []string {
lines := core.Split(issue.Body, "\n")
findings := []string{}
for _, line := range lines {
trimmed := core.Trim(line)
if trimmed == "" || core.HasPrefix(trimmed, "#") {
continue
}
match := pipelineBulletPattern.FindStringSubmatch(trimmed)
if len(match) == 2 {
findings = append(findings, pipelineFindingSummary(match[1]))
}
}
if len(findings) > 0 {
return findings
}
paragraphs := core.Split(issue.Body, "\n\n")
for _, paragraph := range paragraphs {
summary := pipelineFindingSummary(paragraph)
if summary == "" || summary == issue.Title {
continue
}
findings = append(findings, summary)
break
}
return findings
}
func pipelineFindingSummary(value string) string {
summary := pipelineWhitespacePattern.ReplaceAllString(core.Trim(value), " ")
summary = regexp.MustCompile("`").ReplaceAllString(summary, "")
if summary == "" {
return ""
}
if len(summary) > 96 {
summary = core.Concat(summary[:93], "...")
}
return summary
}
func pipelineAuditIssueType(issue pipelineIssueRecord) string {
haystack := core.Lower(core.Concat(issue.Title, " ", issue.Body, " ", core.JSONMarshalString(pipelineIssueLabelNames(issue))))
switch {
case core.Contains(haystack, "security"), core.Contains(haystack, "owasp"), core.Contains(haystack, "auth"), core.Contains(haystack, "sanitize"), core.Contains(haystack, "validation"):
return "security"
case core.Contains(haystack, "test"), core.Contains(haystack, "coverage"):
return "testing"
case core.Contains(haystack, "perf"), core.Contains(haystack, "performance"):
return "performance"
case core.Contains(haystack, "doc"):
return "docs"
default:
return "quality"
}
}
func pipelineAuditSeverity(issue pipelineIssueRecord) string {
haystack := core.Lower(core.Concat(issue.Title, " ", issue.Body))
switch {
case core.Contains(haystack, "critical"):
return "critical"
case core.Contains(haystack, "high"):
return "high"
case core.Contains(haystack, "medium"):
return "medium"
case core.Contains(haystack, "low"):
return "low"
default:
return ""
}
}
func pipelineAuditLabels(issue pipelineIssueRecord) []string {
labels := []string{"agentic", pipelineAuditIssueType(issue)}
if severity := pipelineAuditSeverity(issue); severity != "" {
labels = append(labels, severity)
}
return labels
}
func pipelineAuditImplementationTitle(repo string, issue pipelineIssueRecord, finding string) string {
issueType := pipelineAuditIssueType(issue)
summary := pipelineFindingSummary(finding)
if summary == "" {
summary = issue.Title
}
return core.Sprintf("%s(%s): %s", issueType, repo, summary)
}
func pipelineAuditImplementationBody(issue pipelineIssueRecord, finding string) string {
builder := core.NewBuilder()
builder.WriteString(core.Sprintf("Parent audit: #%d\n\n", issue.Number))
builder.WriteString("## Finding\n\n")
builder.WriteString(pipelineFindingSummary(finding))
builder.WriteString("\n\n## Acceptance Criteria\n\n")
builder.WriteString("- [ ] Implement the audit finding\n")
builder.WriteString("- [ ] Update tests or validation where needed\n")
builder.WriteString("- [ ] Link the resulting PR back to this issue\n")
return builder.String()
}
func pipelineAuditExistingKey(issue pipelineIssueRecord) string {
match := regexp.MustCompile(`Parent audit:\s*#(\d+)`).FindStringSubmatch(issue.Body)
if len(match) != 2 {
return ""
}
return core.Concat(match[1], ":", issue.Title)
}
func pipelineAuditLinkedComment(linked []PipelineIssueRef) string {
builder := core.NewBuilder()
builder.WriteString("Implementation issues created:\n")
for _, issue := range linked {
if issue.Number > 0 {
builder.WriteString(core.Sprintf("- #%d %s\n", issue.Number, issue.Title))
continue
}
builder.WriteString(core.Sprintf("- %s\n", issue.Title))
}
return builder.String()
}

View file

@ -0,0 +1,74 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"testing"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPipelineAudit_Good_CreatesImplementationIssuesAndClosesAudit(t *testing.T) {
repo := newPipelineTestRepo()
repo.Issues[1] = &pipelineTestIssue{
Number: 1,
Title: "[Audit] Security",
Body: "- Validate tokens\n- Sanitize input\n- Add rate limiting",
State: "open",
Labels: []string{"audit", "security"},
}
srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo})
s, _ := testPrepWithCore(t, srv)
output, err := s.pipelineAudit(context.Background(), PipelineAuditInput{Org: "core", Repo: "go-io"})
require.NoError(t, err)
assert.True(t, output.Success)
assert.Len(t, output.Audits, 1)
assert.Len(t, output.Created, 3)
assert.Equal(t, []int{1}, output.Closed)
assert.Equal(t, "closed", repo.Issues[1].State)
assert.Len(t, repo.Comments[1], 1)
assert.Contains(t, repo.Comments[1][0], "Implementation issues created")
}
func TestPipelineAudit_Bad_MissingRepo(t *testing.T) {
s, _ := testPrepWithCore(t, nil)
result := s.cmdPipelineAudit(core.NewOptions())
require.False(t, result.OK)
err, ok := result.Value.(error)
require.True(t, ok)
assert.Contains(t, err.Error(), "repo is required")
}
func TestPipelineAudit_Ugly_DeduplicatesExistingImplementationIssue(t *testing.T) {
repo := newPipelineTestRepo()
repo.Issues[1] = &pipelineTestIssue{
Number: 1,
Title: "[Audit] Security",
Body: "- Validate tokens\n- Sanitize input",
State: "open",
Labels: []string{"audit", "security"},
}
repo.Issues[2] = &pipelineTestIssue{
Number: 2,
Title: "security(go-io): Validate tokens",
Body: "Parent audit: #1",
State: "open",
Labels: []string{"agentic", "security"},
}
srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo})
s, _ := testPrepWithCore(t, srv)
output, err := s.pipelineAudit(context.Background(), PipelineAuditInput{Org: "core", Repo: "go-io"})
require.NoError(t, err)
assert.Len(t, output.Existing, 1)
assert.Len(t, output.Created, 1)
assert.Equal(t, 2, output.Existing[0].Number)
}

View file

@ -0,0 +1,23 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import core "dappco.re/go/core"
func (s *PrepSubsystem) cmdPipelineBudgetPlan(_ core.Options) core.Result {
core.Print(nil, "status: not yet implemented")
core.Print(nil, "reason: blocked-on-sibling")
return core.Result{
Value: core.E("agentic.cmdPipelineBudgetPlan", "not yet implemented - blocked-on-sibling", nil),
OK: false,
}
}
func (s *PrepSubsystem) cmdPipelineBudgetLog(_ core.Options) core.Result {
core.Print(nil, "status: not yet implemented")
core.Print(nil, "reason: blocked-on-sibling")
return core.Result{
Value: core.E("agentic.cmdPipelineBudgetLog", "not yet implemented - blocked-on-sibling", nil),
OK: false,
}
}

View file

@ -0,0 +1,34 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"testing"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPipelineBudget_Good_RootHelp(t *testing.T) {
s, _ := testPrepWithCore(t, nil)
output := captureStdout(t, func() {
result := s.cmdPipelineBudget(core.NewOptions())
require.True(t, result.OK)
})
assert.Contains(t, output, "core-agent pipeline/budget/plan")
assert.Contains(t, output, "core-agent pipeline/budget/log")
}
func TestPipelineBudget_Bad_PlanBlockedOnSibling(t *testing.T) {
s, _ := testPrepWithCore(t, nil)
result := s.cmdPipelineBudgetPlan(core.NewOptions())
require.False(t, result.OK)
err, ok := result.Value.(error)
require.True(t, ok)
assert.Contains(t, err.Error(), "blocked-on-sibling")
}

View file

@ -0,0 +1,283 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"regexp"
"unicode"
core "dappco.re/go/core"
)
var pipelineNumberPattern = regexp.MustCompile(`^[0-9]+$`)
func (s *PrepSubsystem) registerPipelineCommands() {
c := s.Core()
c.Command("pipeline", core.Command{Description: "Run the agent pipeline command tree", Action: s.cmdPipeline})
c.Command("agentic:pipeline", core.Command{Description: "Run the agent pipeline command tree", Action: s.cmdPipeline})
c.Command("pipeline/audit", core.Command{Description: "Stage 1: audit issues into implementation work", Action: s.cmdPipelineAudit})
c.Command("agentic:pipeline/audit", core.Command{Description: "Stage 1: audit issues into implementation work", Action: s.cmdPipelineAudit})
c.Command("pipeline/epic", core.Command{Description: "Stage 2 and 3 epic orchestration commands", Action: s.cmdPipelineEpic})
c.Command("agentic:pipeline/epic", core.Command{Description: "Stage 2 and 3 epic orchestration commands", Action: s.cmdPipelineEpic})
c.Command("pipeline/epic/create", core.Command{Description: "Group implementation issues into epics", Action: s.cmdPipelineEpicCreate})
c.Command("agentic:pipeline/epic/create", core.Command{Description: "Group implementation issues into epics", Action: s.cmdPipelineEpicCreate})
c.Command("pipeline/epic/run", core.Command{Description: "Dispatch and monitor an epic", Action: s.cmdPipelineEpicRun})
c.Command("agentic:pipeline/epic/run", core.Command{Description: "Dispatch and monitor an epic", Action: s.cmdPipelineEpicRun})
c.Command("pipeline/epic/status", core.Command{Description: "Show epic progress", Action: s.cmdPipelineEpicStatus})
c.Command("agentic:pipeline/epic/status", core.Command{Description: "Show epic progress", Action: s.cmdPipelineEpicStatus})
c.Command("pipeline/epic/sync", core.Command{Description: "Sync epic checklist state from child issues", Action: s.cmdPipelineEpicSync})
c.Command("agentic:pipeline/epic/sync", core.Command{Description: "Sync epic checklist state from child issues", Action: s.cmdPipelineEpicSync})
c.Command("pipeline/monitor", core.Command{Description: "Watch open PRs and auto-intervene", Action: s.cmdPipelineMonitor})
c.Command("agentic:pipeline/monitor", core.Command{Description: "Watch open PRs and auto-intervene", Action: s.cmdPipelineMonitor})
c.Command("pipeline/fix", core.Command{Description: "Pipeline fix-up commands", Action: s.cmdPipelineFix})
c.Command("agentic:pipeline/fix", core.Command{Description: "Pipeline fix-up commands", Action: s.cmdPipelineFix})
c.Command("pipeline/fix/reviews", core.Command{Description: "Ask the agent to fix code reviews on a pull request", Action: s.cmdPipelineFixReviews})
c.Command("agentic:pipeline/fix/reviews", core.Command{Description: "Ask the agent to fix code reviews on a pull request", Action: s.cmdPipelineFixReviews})
c.Command("pipeline/fix/conflicts", core.Command{Description: "Ask the agent to fix a merge conflict on a pull request", Action: s.cmdPipelineFixConflicts})
c.Command("agentic:pipeline/fix/conflicts", core.Command{Description: "Ask the agent to fix a merge conflict on a pull request", Action: s.cmdPipelineFixConflicts})
c.Command("pipeline/fix/format", core.Command{Description: "Apply formatting-only fixes in a workspace or repo checkout", Action: s.cmdPipelineFixFormat})
c.Command("agentic:pipeline/fix/format", core.Command{Description: "Apply formatting-only fixes in a workspace or repo checkout", Action: s.cmdPipelineFixFormat})
c.Command("pipeline/fix/threads", core.Command{Description: "Handle review-thread follow-up for a pull request", Action: s.cmdPipelineFixThreads})
c.Command("agentic:pipeline/fix/threads", core.Command{Description: "Handle review-thread follow-up for a pull request", Action: s.cmdPipelineFixThreads})
c.Command("pipeline/onboard", core.Command{Description: "Run audit, epic creation, and dispatch onboarding for a repo", Action: s.cmdPipelineOnboard})
c.Command("agentic:pipeline/onboard", core.Command{Description: "Run audit, epic creation, and dispatch onboarding for a repo", Action: s.cmdPipelineOnboard})
c.Command("pipeline/budget", core.Command{Description: "Budget planning commands", Action: s.cmdPipelineBudget})
c.Command("agentic:pipeline/budget", core.Command{Description: "Budget planning commands", Action: s.cmdPipelineBudget})
c.Command("pipeline/budget/plan", core.Command{Description: "Show daily dispatch budget planning", Action: s.cmdPipelineBudgetPlan})
c.Command("agentic:pipeline/budget/plan", core.Command{Description: "Show daily dispatch budget planning", Action: s.cmdPipelineBudgetPlan})
c.Command("pipeline/budget/log", core.Command{Description: "Append a dispatch event to the budget journal", Action: s.cmdPipelineBudgetLog})
c.Command("agentic:pipeline/budget/log", core.Command{Description: "Append a dispatch event to the budget journal", Action: s.cmdPipelineBudgetLog})
c.Command("pipeline/training", core.Command{Description: "Training journal commands", Action: s.cmdPipelineTraining})
c.Command("agentic:pipeline/training", core.Command{Description: "Training journal commands", Action: s.cmdPipelineTraining})
c.Command("pipeline/training/capture", core.Command{Description: "Capture a merged pull request for training", Action: s.cmdPipelineTrainingCapture})
c.Command("agentic:pipeline/training/capture", core.Command{Description: "Capture a merged pull request for training", Action: s.cmdPipelineTrainingCapture})
c.Command("pipeline/training/stats", core.Command{Description: "Summarise training journal data", Action: s.cmdPipelineTrainingStats})
c.Command("agentic:pipeline/training/stats", core.Command{Description: "Summarise training journal data", Action: s.cmdPipelineTrainingStats})
c.Command("pipeline/training/export", core.Command{Description: "Export training journal data", Action: s.cmdPipelineTrainingExport})
c.Command("agentic:pipeline/training/export", core.Command{Description: "Export training journal data", Action: s.cmdPipelineTrainingExport})
}
func (s *PrepSubsystem) cmdPipeline(options core.Options) core.Result {
switch action := optionStringValue(options, "action", "_arg"); action {
case "", "help":
printPipelineUsage()
return core.Result{OK: true}
case "audit":
return s.cmdPipelineAudit(options)
case "epic":
return s.cmdPipelineEpic(options)
case "monitor":
return s.cmdPipelineMonitor(options)
case "fix":
return s.cmdPipelineFix(options)
case "onboard":
return s.cmdPipelineOnboard(options)
case "budget":
return s.cmdPipelineBudget(options)
case "training":
return s.cmdPipelineTraining(options)
default:
printPipelineUsage()
return core.Result{Value: core.E("agentic.cmdPipeline", core.Concat("unknown pipeline command: ", action), nil), OK: false}
}
}
func (s *PrepSubsystem) cmdPipelineEpic(options core.Options) core.Result {
switch action := optionStringValue(options, "action", "_arg"); action {
case "", "help":
printPipelineEpicUsage()
return core.Result{OK: true}
case "create":
return s.cmdPipelineEpicCreate(options)
case "run":
return s.cmdPipelineEpicRun(options)
case "status":
return s.cmdPipelineEpicStatus(options)
case "sync":
return s.cmdPipelineEpicSync(options)
default:
printPipelineEpicUsage()
return core.Result{Value: core.E("agentic.cmdPipelineEpic", core.Concat("unknown pipeline epic command: ", action), nil), OK: false}
}
}
func (s *PrepSubsystem) cmdPipelineFix(options core.Options) core.Result {
switch action := optionStringValue(options, "action", "_arg"); action {
case "", "help":
printPipelineFixUsage()
return core.Result{OK: true}
case "reviews":
return s.cmdPipelineFixReviews(options)
case "conflicts":
return s.cmdPipelineFixConflicts(options)
case "format":
return s.cmdPipelineFixFormat(options)
case "threads":
return s.cmdPipelineFixThreads(options)
default:
printPipelineFixUsage()
return core.Result{Value: core.E("agentic.cmdPipelineFix", core.Concat("unknown pipeline fix command: ", action), nil), OK: false}
}
}
func (s *PrepSubsystem) cmdPipelineBudget(options core.Options) core.Result {
switch action := optionStringValue(options, "action", "_arg"); action {
case "", "help":
printPipelineBudgetUsage()
return core.Result{OK: true}
case "plan":
return s.cmdPipelineBudgetPlan(options)
case "log":
return s.cmdPipelineBudgetLog(options)
default:
printPipelineBudgetUsage()
return core.Result{Value: core.E("agentic.cmdPipelineBudget", core.Concat("unknown pipeline budget command: ", action), nil), OK: false}
}
}
func (s *PrepSubsystem) cmdPipelineTraining(options core.Options) core.Result {
switch action := optionStringValue(options, "action", "_arg"); action {
case "", "help":
printPipelineTrainingUsage()
return core.Result{OK: true}
case "capture":
return s.cmdPipelineTrainingCapture(options)
case "stats":
return s.cmdPipelineTrainingStats(options)
case "export":
return s.cmdPipelineTrainingExport(options)
default:
printPipelineTrainingUsage()
return core.Result{Value: core.E("agentic.cmdPipelineTraining", core.Concat("unknown pipeline training command: ", action), nil), OK: false}
}
}
func pipelineOrgValue(options core.Options) string {
org := optionStringValue(options, "org")
if org == "" {
org = "core"
}
return org
}
func pipelineRepoValue(options core.Options) string {
repo := optionStringValue(options, "repo")
if repo != "" {
return repo
}
arg := optionStringValue(options, "_arg")
if arg == "" || pipelineNumberPattern.MatchString(core.Trim(arg)) {
return ""
}
return arg
}
func pipelineRepoAndNumberValue(options core.Options) (string, string, int) {
org := pipelineOrgValue(options)
repo := pipelineRepoValue(options)
number := optionIntValue(options, "number")
if number == 0 {
arg := optionStringValue(options, "_arg")
if pipelineNumberPattern.MatchString(core.Trim(arg)) {
number = parseIntString(arg)
}
}
return org, repo, number
}
func pipelineWorkspaceDir(options core.Options) string {
workspace := optionStringValue(options, "workspace")
if workspace != "" {
return core.JoinPath(WorkspaceRoot(), workspace, "repo")
}
return optionStringValue(options, "repo_dir", "repo-dir", "path")
}
func pipelineSlug(value string) string {
normalised := core.Lower(core.Trim(value))
if normalised == "" {
return "pipeline"
}
lastDash := false
out := make([]rune, 0, len(normalised))
for _, r := range normalised {
if unicode.IsLetter(r) || unicode.IsDigit(r) {
out = append(out, r)
lastDash = false
continue
}
if lastDash {
continue
}
out = append(out, '-')
lastDash = true
}
slug := string(out)
for len(slug) > 0 && slug[0] == '-' {
slug = slug[1:]
}
for len(slug) > 0 && slug[len(slug)-1] == '-' {
slug = slug[:len(slug)-1]
}
if slug == "" {
return "pipeline"
}
return slug
}
func pipelineDisplayTheme(theme string) string {
switch theme {
case "security":
return "Security"
case "testing":
return "Testing"
case "docs":
return "Docs"
case "performance":
return "Performance"
case "features":
return "Features"
default:
return "Quality"
}
}
func printPipelineUsage() {
core.Print(nil, "usage: core-agent pipeline [audit|epic|monitor|fix|onboard|budget|training]")
core.Print(nil, " core-agent pipeline/audit <repo> [--org=core] [--dry-run]")
core.Print(nil, " core-agent pipeline/epic/create <repo> [--org=core] [--dry-run]")
core.Print(nil, " core-agent pipeline/epic/run <epic-number> --repo=<repo> [--agent=codex] [--dry-run]")
core.Print(nil, " core-agent pipeline/monitor [<repo>] [--org=core] [--dry-run]")
core.Print(nil, " core-agent pipeline/fix/reviews <pr-number> --repo=<repo> [--org=core] [--dry-run]")
core.Print(nil, " core-agent pipeline/onboard <repo> [--org=core] [--agent=codex] [--dry-run]")
}
func printPipelineEpicUsage() {
core.Print(nil, "usage: core-agent pipeline/epic/create <repo> [--org=core] [--theme=security] [--dry-run]")
core.Print(nil, " core-agent pipeline/epic/run <epic-number> --repo=<repo> [--org=core] [--agent=codex] [--dry-run]")
core.Print(nil, " core-agent pipeline/epic/status <epic-number> --repo=<repo> [--org=core]")
core.Print(nil, " core-agent pipeline/epic/sync <epic-number> --repo=<repo> [--org=core] [--dry-run]")
}
func printPipelineFixUsage() {
core.Print(nil, "usage: core-agent pipeline/fix/reviews <pr-number> --repo=<repo> [--org=core] [--dry-run]")
core.Print(nil, " core-agent pipeline/fix/conflicts <pr-number> --repo=<repo> [--org=core] [--dry-run]")
core.Print(nil, " core-agent pipeline/fix/format <pr-number> --repo=<repo> [--org=core] [--workspace=<name>|--repo-dir=<path>] [--commit] [--push] [--dry-run]")
core.Print(nil, " core-agent pipeline/fix/threads <pr-number> --repo=<repo> [--org=core] [--dry-run]")
}
func printPipelineBudgetUsage() {
core.Print(nil, "usage: core-agent pipeline/budget/plan")
core.Print(nil, " core-agent pipeline/budget/log")
}
func printPipelineTrainingUsage() {
core.Print(nil, "usage: core-agent pipeline/training/capture <pr-number> --repo=<repo> [--org=core]")
core.Print(nil, " core-agent pipeline/training/stats")
core.Print(nil, " core-agent pipeline/training/export")
}

View file

@ -0,0 +1,452 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"net/http"
"net/http/httptest"
"sort"
"testing"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPipelineCommands_RegisterPipelineCommands_Good(t *testing.T) {
s, c := testPrepWithCore(t, nil)
s.registerPipelineCommands()
commands := c.Commands()
assert.Contains(t, commands, "pipeline")
assert.Contains(t, commands, "pipeline/audit")
assert.Contains(t, commands, "pipeline/epic")
assert.Contains(t, commands, "pipeline/epic/create")
assert.Contains(t, commands, "pipeline/epic/run")
assert.Contains(t, commands, "pipeline/epic/status")
assert.Contains(t, commands, "pipeline/epic/sync")
assert.Contains(t, commands, "pipeline/monitor")
assert.Contains(t, commands, "pipeline/fix")
assert.Contains(t, commands, "pipeline/fix/reviews")
assert.Contains(t, commands, "pipeline/fix/conflicts")
assert.Contains(t, commands, "pipeline/fix/format")
assert.Contains(t, commands, "pipeline/fix/threads")
assert.Contains(t, commands, "pipeline/onboard")
assert.Contains(t, commands, "pipeline/budget")
assert.Contains(t, commands, "pipeline/budget/plan")
assert.Contains(t, commands, "pipeline/budget/log")
assert.Contains(t, commands, "pipeline/training")
assert.Contains(t, commands, "pipeline/training/capture")
assert.Contains(t, commands, "pipeline/training/stats")
assert.Contains(t, commands, "pipeline/training/export")
}
func TestPipelineCommands_CmdPipeline_Good_Help(t *testing.T) {
s, _ := testPrepWithCore(t, nil)
output := captureStdout(t, func() {
result := s.cmdPipeline(core.NewOptions())
require.True(t, result.OK)
})
assert.Contains(t, output, "core-agent pipeline/audit <repo>")
assert.Contains(t, output, "core-agent pipeline/epic/create <repo>")
assert.Contains(t, output, "core-agent pipeline/onboard <repo>")
}
func TestPipelineCommands_CmdPipeline_Bad_UnknownCommand(t *testing.T) {
s, _ := testPrepWithCore(t, nil)
result := s.cmdPipeline(core.NewOptions(core.Option{Key: "_arg", Value: "unknown"}))
require.False(t, result.OK)
err, ok := result.Value.(error)
require.True(t, ok)
assert.Contains(t, err.Error(), "unknown pipeline command")
}
type pipelineTestIssue struct {
Number int
Title string
Body string
State string
Labels []string
}
type pipelineTestPR struct {
Number int
Title string
State string
Merged bool
Mergeable *bool
MergeableState string
HeadRef string
HeadSHA string
BaseRef string
ReviewThreadsTotal int
ReviewThreadsResolved int
ReviewThreadsUnresolved int
ReviewComments int
Comments int
Reactions map[string]int
Statuses []map[string]any
}
type pipelineTestRepo struct {
DefaultBranch string
RefSHA string
Labels map[string]int64
Issues map[int]*pipelineTestIssue
Pulls map[int]*pipelineTestPR
Comments map[int][]string
Branches map[string]string
Merged []int
}
func newPipelineTestRepo() *pipelineTestRepo {
return &pipelineTestRepo{
DefaultBranch: "dev",
RefSHA: "deadbeef",
Labels: map[string]int64{
"agentic": 1,
"audit": 2,
"security": 3,
"quality": 4,
"testing": 5,
"performance": 6,
"docs": 7,
"epic": 8,
"critical": 9,
"high": 10,
"medium": 11,
"low": 12,
},
Issues: map[int]*pipelineTestIssue{},
Pulls: map[int]*pipelineTestPR{},
Comments: map[int][]string{},
Branches: map[string]string{"dev": "deadbeef"},
Merged: []int{},
}
}
func newPipelineTestServer(t *testing.T, repos map[string]*pipelineTestRepo) *httptest.Server {
t.Helper()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
if len(path) > 0 && path[0] == '/' {
path = path[1:]
}
parts := core.Split(path, "/")
if len(parts) == 5 && parts[0] == "api" && parts[1] == "v1" && parts[2] == "orgs" && parts[4] == "repos" && r.Method == http.MethodGet {
org := parts[3]
_ = org
names := make([]string, 0, len(repos))
for name := range repos {
names = append(names, name)
}
sort.Strings(names)
out := []map[string]any{}
for _, name := range names {
repo := repos[name]
out = append(out, map[string]any{
"name": name,
"default_branch": repo.DefaultBranch,
"html_url": "https://forge.test/core/" + name,
})
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(core.JSONMarshalString(out)))
return
}
if len(parts) < 5 || parts[0] != "api" || parts[1] != "v1" || parts[2] != "repos" {
w.WriteHeader(http.StatusNotFound)
return
}
repoName := parts[4]
repo, ok := repos[repoName]
if !ok {
w.WriteHeader(http.StatusNotFound)
return
}
if len(parts) == 5 && r.Method == http.MethodGet {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{
"name": repoName,
"default_branch": repo.DefaultBranch,
"html_url": "https://forge.test/core/" + repoName,
})))
return
}
if len(parts) >= 6 && parts[5] == "labels" {
switch r.Method {
case http.MethodGet:
names := make([]string, 0, len(repo.Labels))
for name := range repo.Labels {
names = append(names, name)
}
sort.Strings(names)
out := []map[string]any{}
for _, name := range names {
out = append(out, map[string]any{"id": repo.Labels[name], "name": name})
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(core.JSONMarshalString(out)))
return
case http.MethodPost:
bodyResult := core.ReadAll(r.Body)
require.True(t, bodyResult.OK)
var payload map[string]any
require.True(t, core.JSONUnmarshalString(bodyResult.Value.(string), &payload).OK)
name := payload["name"].(string)
repo.Labels[name] = int64(len(repo.Labels) + 1)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{"id": repo.Labels[name]})))
return
}
}
if len(parts) >= 6 && parts[5] == "issues" {
switch {
case len(parts) == 6 && r.Method == http.MethodGet:
numbers := make([]int, 0, len(repo.Issues))
for number := range repo.Issues {
numbers = append(numbers, number)
}
sort.Ints(numbers)
out := []map[string]any{}
for _, number := range numbers {
issue := repo.Issues[number]
if issue == nil {
continue
}
if state := r.URL.Query().Get("state"); state == "open" && issue.State == "closed" {
continue
}
out = append(out, pipelineIssuePayload(repoName, issue))
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(core.JSONMarshalString(out)))
return
case len(parts) == 6 && r.Method == http.MethodPost:
bodyResult := core.ReadAll(r.Body)
require.True(t, bodyResult.OK)
var payload map[string]any
require.True(t, core.JSONUnmarshalString(bodyResult.Value.(string), &payload).OK)
next := 1
for number := range repo.Issues {
if number >= next {
next = number + 1
}
}
title := payload["title"].(string)
body, _ := payload["body"].(string)
labels := []string{}
if raw, ok := payload["labels"].([]any); ok {
for _, labelID := range raw {
for name, id := range repo.Labels {
if int64(labelID.(float64)) == id {
labels = append(labels, name)
}
}
}
}
repo.Issues[next] = &pipelineTestIssue{Number: next, Title: title, Body: body, State: "open", Labels: labels}
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{
"number": next,
"html_url": "https://forge.test/core/" + repoName + "/issues/" + core.Sprint(next),
})))
return
case len(parts) == 7 && r.Method == http.MethodGet:
number := parseIntString(parts[6])
issue := repo.Issues[number]
if issue == nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(core.JSONMarshalString(pipelineIssuePayload(repoName, issue))))
return
case len(parts) == 7 && r.Method == http.MethodPatch:
number := parseIntString(parts[6])
issue := repo.Issues[number]
if issue == nil {
w.WriteHeader(http.StatusNotFound)
return
}
bodyResult := core.ReadAll(r.Body)
require.True(t, bodyResult.OK)
var payload map[string]any
require.True(t, core.JSONUnmarshalString(bodyResult.Value.(string), &payload).OK)
if state, ok := payload["state"].(string); ok {
issue.State = state
}
if body, ok := payload["body"].(string); ok {
issue.Body = body
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(core.JSONMarshalString(pipelineIssuePayload(repoName, issue))))
return
case len(parts) == 8 && parts[7] == "comments" && r.Method == http.MethodPost:
number := parseIntString(parts[6])
bodyResult := core.ReadAll(r.Body)
require.True(t, bodyResult.OK)
var payload map[string]any
require.True(t, core.JSONUnmarshalString(bodyResult.Value.(string), &payload).OK)
repo.Comments[number] = append(repo.Comments[number], payload["body"].(string))
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{"id": len(repo.Comments[number])})))
return
}
}
if len(parts) >= 6 && parts[5] == "pulls" {
switch {
case len(parts) == 6 && r.Method == http.MethodGet:
numbers := make([]int, 0, len(repo.Pulls))
for number := range repo.Pulls {
numbers = append(numbers, number)
}
sort.Ints(numbers)
out := []map[string]any{}
for _, number := range numbers {
pullRequest := repo.Pulls[number]
if pullRequest == nil {
continue
}
if state := r.URL.Query().Get("state"); state == "open" && pullRequest.State != "open" {
continue
}
out = append(out, pipelinePRPayload(repoName, pullRequest))
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(core.JSONMarshalString(out)))
return
case len(parts) == 7 && r.Method == http.MethodGet:
number := parseIntString(parts[6])
pullRequest := repo.Pulls[number]
if pullRequest == nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(core.JSONMarshalString(pipelinePRPayload(repoName, pullRequest))))
return
case len(parts) == 8 && parts[7] == "merge" && r.Method == http.MethodPost:
number := parseIntString(parts[6])
repo.Merged = append(repo.Merged, number)
if pullRequest := repo.Pulls[number]; pullRequest != nil {
pullRequest.State = "merged"
pullRequest.Merged = true
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("{}"))
return
}
}
if len(parts) == 8 && parts[5] == "commits" && parts[7] == "status" && r.Method == http.MethodGet {
sha := parts[6]
for _, pullRequest := range repo.Pulls {
if pullRequest != nil && pullRequest.HeadSHA == sha {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{"statuses": pullRequest.Statuses})))
return
}
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{"statuses": []map[string]any{}})))
return
}
if len(parts) >= 8 && parts[5] == "git" && parts[6] == "refs" {
switch {
case len(parts) >= 9 && parts[7] == "heads" && r.Method == http.MethodGet:
branch := core.Join("/", parts[8:]...)
sha := repo.Branches[branch]
if sha == "" {
sha = repo.RefSHA
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{"object": map[string]any{"sha": sha}})))
return
case len(parts) == 7 && r.Method == http.MethodPost:
bodyResult := core.ReadAll(r.Body)
require.True(t, bodyResult.OK)
var payload map[string]any
require.True(t, core.JSONUnmarshalString(bodyResult.Value.(string), &payload).OK)
ref := payload["ref"].(string)
sha := payload["sha"].(string)
branch := ref[len("refs/heads/"):]
repo.Branches[branch] = sha
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{"ref": ref})))
return
}
}
w.WriteHeader(http.StatusNotFound)
}))
t.Cleanup(server.Close)
return server
}
func pipelineIssuePayload(repoName string, issue *pipelineTestIssue) map[string]any {
labels := []map[string]any{}
for _, label := range issue.Labels {
labels = append(labels, map[string]any{"name": label})
}
return map[string]any{
"number": issue.Number,
"title": issue.Title,
"body": issue.Body,
"state": issue.State,
"labels": labels,
"html_url": "https://forge.test/core/" + repoName + "/issues/" + core.Sprint(issue.Number),
}
}
func pipelinePRPayload(repoName string, pullRequest *pipelineTestPR) map[string]any {
payload := map[string]any{
"number": pullRequest.Number,
"title": pullRequest.Title,
"state": pullRequest.State,
"html_url": "https://forge.test/core/" + repoName + "/pulls/" + core.Sprint(pullRequest.Number),
"merged": pullRequest.Merged,
"mergeable": pullRequest.Mergeable,
"mergeable_state": pullRequest.MergeableState,
"review_threads_total": pullRequest.ReviewThreadsTotal,
"review_threads_resolved": pullRequest.ReviewThreadsResolved,
"review_threads_unresolved": pullRequest.ReviewThreadsUnresolved,
"review_comments": pullRequest.ReviewComments,
"comments": pullRequest.Comments,
"reactions": pullRequest.Reactions,
"head": map[string]any{
"ref": pullRequest.HeadRef,
"sha": pullRequest.HeadSHA,
"repo": map[string]any{
"updated_at": "2026-04-25T12:00:00Z",
"pushed_at": "2026-04-25T12:00:00Z",
},
},
"base": map[string]any{
"ref": pullRequest.BaseRef,
},
}
if pullRequest.Mergeable == nil {
payload["mergeable"] = nil
}
return payload
}
func boolPtr(value bool) *bool {
return &value
}

View file

@ -0,0 +1,546 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"sort"
core "dappco.re/go/core"
)
type PipelineEpicCreateInput struct {
Org string `json:"org,omitempty"`
Repo string `json:"repo"`
Theme string `json:"theme,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
Candidates []PipelineIssueRef `json:"candidates,omitempty"`
}
type PipelineEpicCreateOutput struct {
Success bool `json:"success"`
Org string `json:"org,omitempty"`
Repo string `json:"repo"`
Candidates []PipelineIssueRef `json:"candidates,omitempty"`
Epics []PipelineEpicMeta `json:"epics,omitempty"`
Existing []PipelineEpicMeta `json:"existing,omitempty"`
}
type PipelineEpicRunInput struct {
Org string `json:"org,omitempty"`
Repo string `json:"repo"`
EpicNumber int `json:"epic_number"`
Agent string `json:"agent,omitempty"`
Template string `json:"template,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
Limit int `json:"limit,omitempty"`
Epic *PipelineEpicMeta `json:"epic,omitempty"`
}
type PipelineEpicRunOutput struct {
Success bool `json:"success"`
Org string `json:"org,omitempty"`
Repo string `json:"repo"`
EpicNumber int `json:"epic_number"`
Branch string `json:"branch,omitempty"`
Dispatched []PipelineIssueRef `json:"dispatched,omitempty"`
Skipped []PipelineIssueRef `json:"skipped,omitempty"`
}
type PipelineEpicStatusOutput struct {
Success bool `json:"success"`
Org string `json:"org,omitempty"`
Repo string `json:"repo"`
Epic PipelineEpicMeta `json:"epic"`
}
type PipelineEpicSyncOutput struct {
Success bool `json:"success"`
Org string `json:"org,omitempty"`
Repo string `json:"repo"`
EpicNumber int `json:"epic_number"`
Checked int `json:"checked"`
Total int `json:"total"`
Updated bool `json:"updated"`
}
func (s *PrepSubsystem) cmdPipelineEpicCreate(options core.Options) core.Result {
ctx := s.commandContext()
repo := pipelineRepoValue(options)
if repo == "" {
core.Print(nil, "usage: core-agent pipeline/epic/create <repo> [--org=core] [--theme=security] [--dry-run]")
return core.Result{Value: core.E("agentic.cmdPipelineEpicCreate", "repo is required", nil), OK: false}
}
output, err := s.pipelineEpicCreate(ctx, PipelineEpicCreateInput{
Org: pipelineOrgValue(options),
Repo: repo,
Theme: optionStringValue(options, "theme"),
DryRun: optionBoolValue(options, "dry_run", "dry-run"),
})
if err != nil {
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
core.Print(nil, "repo: %s/%s", output.Org, output.Repo)
core.Print(nil, "candidates: %d", len(output.Candidates))
core.Print(nil, "epics: %d", len(output.Epics))
for _, epic := range output.Epics {
core.Print(nil, " epic: #%d %s", epic.Number, epic.Title)
if epic.Branch != "" {
core.Print(nil, " branch: %s", epic.Branch)
}
core.Print(nil, " child: %d", len(epic.Children))
}
if len(output.Existing) > 0 {
core.Print(nil, "existing: %d", len(output.Existing))
}
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) cmdPipelineEpicRun(options core.Options) core.Result {
ctx := s.commandContext()
org, repo, number := pipelineRepoAndNumberValue(options)
if repo == "" || number <= 0 {
core.Print(nil, "usage: core-agent pipeline/epic/run <epic-number> --repo=<repo> [--org=core] [--agent=codex] [--dry-run]")
return core.Result{Value: core.E("agentic.cmdPipelineEpicRun", "repo and epic number are required", nil), OK: false}
}
output, err := s.pipelineEpicRun(ctx, PipelineEpicRunInput{
Org: org,
Repo: repo,
EpicNumber: number,
Agent: optionStringValue(options, "agent"),
Template: optionStringValue(options, "template"),
DryRun: optionBoolValue(options, "dry_run", "dry-run"),
Limit: optionIntValue(options, "limit"),
})
if err != nil {
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
core.Print(nil, "epic: #%d", output.EpicNumber)
core.Print(nil, "repo: %s/%s", output.Org, output.Repo)
core.Print(nil, "dispatched: %d", len(output.Dispatched))
core.Print(nil, "skipped: %d", len(output.Skipped))
if output.Branch != "" {
core.Print(nil, "branch: %s", output.Branch)
}
for _, issue := range output.Dispatched {
core.Print(nil, " issue: #%d %s", issue.Number, issue.Title)
}
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) cmdPipelineEpicStatus(options core.Options) core.Result {
ctx := s.commandContext()
org, repo, number := pipelineRepoAndNumberValue(options)
if repo == "" || number <= 0 {
core.Print(nil, "usage: core-agent pipeline/epic/status <epic-number> --repo=<repo> [--org=core]")
return core.Result{Value: core.E("agentic.cmdPipelineEpicStatus", "repo and epic number are required", nil), OK: false}
}
reader := &pipelineForgeMetaReader{subsystem: s, org: org}
epic, err := reader.GetEpicMeta(ctx, repo, number)
if err != nil {
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
output := PipelineEpicStatusOutput{
Success: true,
Org: org,
Repo: repo,
Epic: epic,
}
core.Print(nil, "epic: #%d %s", epic.Number, epic.Title)
core.Print(nil, "state: %s", epic.State)
core.Print(nil, "branch: %s", epic.Branch)
core.Print(nil, "child: %d", len(epic.Children))
for _, child := range epic.Children {
box := "[ ]"
if child.Checked {
box = "[x]"
}
core.Print(nil, " %s #%d %-8s %s", box, child.Number, child.State, child.Title)
}
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) cmdPipelineEpicSync(options core.Options) core.Result {
ctx := s.commandContext()
org, repo, number := pipelineRepoAndNumberValue(options)
if repo == "" || number <= 0 {
core.Print(nil, "usage: core-agent pipeline/epic/sync <epic-number> --repo=<repo> [--org=core] [--dry-run]")
return core.Result{Value: core.E("agentic.cmdPipelineEpicSync", "repo and epic number are required", nil), OK: false}
}
output, err := s.pipelineEpicSync(ctx, org, repo, number, optionBoolValue(options, "dry_run", "dry-run"))
if err != nil {
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
core.Print(nil, "epic: #%d", output.EpicNumber)
core.Print(nil, "checked: %d/%d", output.Checked, output.Total)
core.Print(nil, "updated: %v", output.Updated)
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) pipelineEpicCreate(ctx context.Context, input PipelineEpicCreateInput) (PipelineEpicCreateOutput, error) {
if input.Repo == "" {
return PipelineEpicCreateOutput{}, core.E("pipelineEpicCreate", "repo is required", nil)
}
if s.forgeToken == "" {
return PipelineEpicCreateOutput{}, core.E("pipelineEpicCreate", "no Forge token configured", nil)
}
if input.Org == "" {
input.Org = "core"
}
candidates := input.Candidates
if len(candidates) == 0 {
issues, err := s.pipelineListIssues(ctx, input.Org, input.Repo, "open")
if err != nil {
return PipelineEpicCreateOutput{}, err
}
for _, issue := range issues {
if !pipelineIssueIsImplementationCandidate(issue) {
continue
}
ref := pipelineIssueRefFromRecord(issue)
if input.Theme != "" && pipelineEpicTheme(ref) != input.Theme {
continue
}
candidates = append(candidates, ref)
}
}
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].Number < candidates[j].Number
})
output := PipelineEpicCreateOutput{
Success: true,
Org: input.Org,
Repo: input.Repo,
Candidates: candidates,
}
if len(candidates) < 3 {
return output, nil
}
groups := map[string][]PipelineIssueRef{}
for _, candidate := range candidates {
theme := pipelineEpicTheme(candidate)
if input.Theme != "" {
theme = input.Theme
}
groups[theme] = append(groups[theme], candidate)
}
existingIssues, err := s.pipelineListIssues(ctx, input.Org, input.Repo, "open")
if err != nil {
return PipelineEpicCreateOutput{}, err
}
existingByTitle := map[string]pipelineIssueRecord{}
for _, issue := range existingIssues {
if pipelineIssueIsEpic(issue) {
existingByTitle[issue.Title] = issue
}
}
groupNames := make([]string, 0, len(groups))
for theme, group := range groups {
if len(group) >= 3 {
groupNames = append(groupNames, theme)
}
}
sort.Strings(groupNames)
reader := &pipelineForgeMetaReader{subsystem: s, org: input.Org}
for _, theme := range groupNames {
group := groups[theme]
title := pipelineEpicTitle(input.Repo, theme)
if existing, ok := existingByTitle[title]; ok {
meta, metaErr := reader.GetEpicMeta(ctx, input.Repo, existing.Number)
if metaErr != nil {
return PipelineEpicCreateOutput{}, metaErr
}
output.Existing = append(output.Existing, meta)
continue
}
labels := []string{"agentic", "epic", theme}
body := pipelineEpicBody(title, "", theme, group)
meta := PipelineEpicMeta{
Title: title,
State: "open",
Branch: "",
}
for _, child := range group {
meta.Children = append(meta.Children, PipelineEpicChildMeta{
Number: child.Number,
Title: child.Title,
Checked: false,
State: "open",
URL: child.URL,
})
}
if input.DryRun {
meta.Branch = core.Sprintf("epic/dry-run-%s", pipelineSlug(theme))
output.Epics = append(output.Epics, meta)
continue
}
labelIDs := s.resolveLabelIDs(ctx, input.Org, input.Repo, labels)
created, createErr := s.createIssue(ctx, input.Org, input.Repo, title, body, labelIDs)
if createErr != nil {
return PipelineEpicCreateOutput{}, core.E("pipelineEpicCreate", "failed to create epic issue", createErr)
}
branch, branchErr := s.pipelineCreateEpicBranch(ctx, input.Org, input.Repo, created.Number, theme)
if branchErr != nil {
return PipelineEpicCreateOutput{}, branchErr
}
patchedBody := pipelineEpicBody(title, branch, theme, group)
if patchErr := s.pipelinePatchIssue(ctx, input.Org, input.Repo, created.Number, map[string]any{"body": patchedBody}); patchErr != nil {
return PipelineEpicCreateOutput{}, patchErr
}
for _, child := range group {
comment := core.Sprintf("Parent: #%d\nEpic branch: `%s`", created.Number, branch)
s.commentOnIssue(ctx, input.Org, input.Repo, child.Number, comment)
}
meta.Number = created.Number
meta.URL = created.URL
meta.Branch = branch
output.Epics = append(output.Epics, meta)
}
return output, nil
}
func (s *PrepSubsystem) pipelineEpicRun(ctx context.Context, input PipelineEpicRunInput) (PipelineEpicRunOutput, error) {
if input.Repo == "" || input.EpicNumber <= 0 {
return PipelineEpicRunOutput{}, core.E("pipelineEpicRun", "repo and epic number are required", nil)
}
if input.Org == "" {
input.Org = "core"
}
if input.Agent == "" {
input.Agent = "codex"
}
if input.Template == "" {
input.Template = "coding"
}
meta := input.Epic
if meta == nil {
reader := &pipelineForgeMetaReader{subsystem: s, org: input.Org}
readMeta, err := reader.GetEpicMeta(ctx, input.Repo, input.EpicNumber)
if err != nil {
return PipelineEpicRunOutput{}, err
}
meta = &readMeta
}
output := PipelineEpicRunOutput{
Success: true,
Org: input.Org,
Repo: input.Repo,
EpicNumber: meta.Number,
Branch: meta.Branch,
}
for _, child := range meta.Children {
if child.Checked || child.State != "open" {
output.Skipped = append(output.Skipped, PipelineIssueRef{Number: child.Number, Title: child.Title, State: child.State, URL: child.URL})
continue
}
if !pipelineIssueAutoDispatchAllowed(child.Title) {
output.Skipped = append(output.Skipped, PipelineIssueRef{Number: child.Number, Title: child.Title, State: "manual-review", URL: child.URL})
continue
}
if input.Limit > 0 && len(output.Dispatched) >= input.Limit {
break
}
ref := PipelineIssueRef{Number: child.Number, Title: child.Title, State: child.State, URL: child.URL}
if input.DryRun {
output.Dispatched = append(output.Dispatched, ref)
continue
}
if meta.Branch != "" {
s.commentOnIssue(ctx, input.Org, input.Repo, child.Number, core.Sprintf("Target branch: `%s` (epic #%d)", meta.Branch, meta.Number))
}
_, _, err := s.dispatch(ctx, nil, DispatchInput{
Org: input.Org,
Repo: input.Repo,
Task: child.Title,
Issue: child.Number,
Branch: meta.Branch,
Agent: input.Agent,
Template: input.Template,
DryRun: false,
})
if err != nil {
return PipelineEpicRunOutput{}, core.E("pipelineEpicRun", core.Concat("failed to dispatch child issue #", core.Sprint(child.Number)), err)
}
output.Dispatched = append(output.Dispatched, ref)
}
return output, nil
}
func (s *PrepSubsystem) pipelineEpicSync(ctx context.Context, org, repo string, number int, dryRun bool) (PipelineEpicSyncOutput, error) {
reader := &pipelineForgeMetaReader{subsystem: s, org: org}
meta, err := reader.GetEpicMeta(ctx, repo, number)
if err != nil {
return PipelineEpicSyncOutput{}, err
}
checkedByNumber := map[int]bool{}
checkedCount := 0
for _, child := range meta.Children {
checked := core.Lower(child.State) == "closed"
if checked {
checkedCount++
}
checkedByNumber[child.Number] = checked
}
lines := core.Split(meta.Body, "\n")
updated := false
for i, line := range lines {
match := pipelineEpicChildPattern.FindStringSubmatch(line)
if len(match) != 6 {
continue
}
childNumber := parseIntString(match[4])
wantChecked := checkedByNumber[childNumber]
currentChecked := core.Lower(match[2]) == "x"
if wantChecked == currentChecked {
continue
}
mark := " "
if wantChecked {
mark = "x"
}
lines[i] = core.Concat(match[1], mark, match[3], match[4], match[5])
updated = true
}
if updated && !dryRun {
if err := s.pipelinePatchIssue(ctx, org, repo, number, map[string]any{"body": core.Join("\n", lines...)}); err != nil {
return PipelineEpicSyncOutput{}, err
}
}
return PipelineEpicSyncOutput{
Success: true,
Org: org,
Repo: repo,
EpicNumber: number,
Checked: checkedCount,
Total: len(meta.Children),
Updated: updated,
}, nil
}
func (s *PrepSubsystem) pipelineCreateEpicBranch(ctx context.Context, org, repo string, epicNumber int, theme string) (string, error) {
repository, err := s.pipelineGetRepo(ctx, org, repo)
if err != nil {
return "", err
}
defaultBranch := repository.DefaultBranch
if defaultBranch == "" {
defaultBranch = "dev"
}
shaURL := core.Sprintf("%s/api/v1/repos/%s/%s/git/refs/heads/%s", s.forgeURL, org, repo, defaultBranch)
shaResult := HTTPGet(ctx, shaURL, s.forgeToken, "token")
if !shaResult.OK {
return "", core.E("pipelineCreateEpicBranch", core.Concat("failed to read ref for ", defaultBranch), nil)
}
var ref struct {
Object struct {
SHA string `json:"sha"`
} `json:"object"`
}
if err := pipelineDecodeJSON(shaResult.Value.(string), &ref, "pipelineCreateEpicBranch", "failed to decode git ref"); err != nil {
return "", err
}
branch := core.Sprintf("epic/%d-%s", epicNumber, pipelineSlug(theme))
createURL := core.Sprintf("%s/api/v1/repos/%s/%s/git/refs", s.forgeURL, org, repo)
createResult := HTTPPost(ctx, createURL, core.JSONMarshalString(map[string]any{
"ref": core.Concat("refs/heads/", branch),
"sha": ref.Object.SHA,
}), s.forgeToken, "token")
if !createResult.OK {
return "", core.E("pipelineCreateEpicBranch", core.Concat("failed to create branch ", branch), nil)
}
return branch, nil
}
func pipelineEpicTheme(issue PipelineIssueRef) string {
haystack := core.Lower(core.Concat(issue.Title, " ", core.JSONMarshalString(issue.Labels)))
switch {
case core.Contains(haystack, "security"):
return "security"
case core.Contains(haystack, "testing"), core.Contains(haystack, "test"):
return "testing"
case core.Contains(haystack, "docs"), core.Contains(haystack, "doc"):
return "docs"
case core.Contains(haystack, "performance"), core.Contains(haystack, "perf"):
return "performance"
case core.HasPrefix(core.Lower(issue.Title), "feat("):
return "features"
default:
return "quality"
}
}
func pipelineEpicTitle(repo, theme string) string {
return core.Sprintf("epic(%s): %s pipeline", repo, core.Lower(pipelineDisplayTheme(theme)))
}
func pipelineEpicBody(title, branch, theme string, children []PipelineIssueRef) string {
builder := core.NewBuilder()
builder.WriteString("## Overview\n\n")
builder.WriteString(core.Sprintf("%s groups related implementation issues into a tracked epic.\n\n", title))
if branch != "" {
builder.WriteString(core.Sprintf("Epic branch: `%s`\n\n", branch))
}
builder.WriteString("## Child Issues\n\n")
builder.WriteString(core.Sprintf("### Phase 1: %s\n", pipelineDisplayTheme(theme)))
for _, child := range children {
builder.WriteString(core.Sprintf("- [ ] #%d %s\n", child.Number, child.Title))
}
builder.WriteString("\n## Acceptance Criteria\n\n")
builder.WriteString("- [ ] All child issues are closed\n")
builder.WriteString("- [ ] Epic checklist is fully synced\n")
return builder.String()
}
func pipelineIssueAutoDispatchAllowed(title string) bool {
return !core.HasPrefix(core.Lower(core.Trim(title)), "feat(")
}

View file

@ -0,0 +1,92 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"testing"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPipelineEpic_Good_CreateGroupsIssuesIntoEpic(t *testing.T) {
repo := newPipelineTestRepo()
repo.Issues[10] = &pipelineTestIssue{Number: 10, Title: "security(go-io): Validate tokens", State: "open", Labels: []string{"agentic", "security"}}
repo.Issues[11] = &pipelineTestIssue{Number: 11, Title: "security(go-io): Sanitize input", State: "open", Labels: []string{"agentic", "security"}}
repo.Issues[12] = &pipelineTestIssue{Number: 12, Title: "security(go-io): Add rate limiting", State: "open", Labels: []string{"agentic", "security"}}
srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo})
s, _ := testPrepWithCore(t, srv)
output, err := s.pipelineEpicCreate(context.Background(), PipelineEpicCreateInput{Org: "core", Repo: "go-io"})
require.NoError(t, err)
require.Len(t, output.Epics, 1)
assert.Equal(t, 13, output.Epics[0].Number)
assert.Equal(t, "epic/13-security", output.Epics[0].Branch)
assert.Contains(t, repo.Issues[13].Body, "- [ ] #10 security(go-io): Validate tokens")
assert.Contains(t, repo.Comments[10][0], "Parent: #13")
assert.Equal(t, "deadbeef", repo.Branches["epic/13-security"])
}
func TestPipelineEpic_Bad_RunRequiresRepoAndNumber(t *testing.T) {
s, _ := testPrepWithCore(t, nil)
result := s.cmdPipelineEpicRun(core.NewOptions())
require.False(t, result.OK)
err, ok := result.Value.(error)
require.True(t, ok)
assert.Contains(t, err.Error(), "repo and epic number are required")
}
func TestPipelineEpic_Ugly_SyncMarksClosedChildrenChecked(t *testing.T) {
repo := newPipelineTestRepo()
repo.Issues[10] = &pipelineTestIssue{Number: 10, Title: "security(go-io): Validate tokens", State: "closed", Labels: []string{"agentic", "security"}}
repo.Issues[11] = &pipelineTestIssue{Number: 11, Title: "security(go-io): Sanitize input", State: "open", Labels: []string{"agentic", "security"}}
repo.Issues[20] = &pipelineTestIssue{
Number: 20,
Title: "epic(go-io): security pipeline",
State: "open",
Labels: []string{"agentic", "epic", "security"},
Body: "## Overview\n\nEpic branch: `epic/20-security`\n\n## Child Issues\n\n- [ ] #10 security(go-io): Validate tokens\n- [ ] #11 security(go-io): Sanitize input\n",
}
srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo})
s, _ := testPrepWithCore(t, srv)
output, err := s.pipelineEpicSync(context.Background(), "core", "go-io", 20, false)
require.NoError(t, err)
assert.True(t, output.Updated)
assert.Equal(t, 1, output.Checked)
assert.Contains(t, repo.Issues[20].Body, "- [x] #10 security(go-io): Validate tokens")
assert.Contains(t, repo.Issues[20].Body, "- [ ] #11 security(go-io): Sanitize input")
}
func TestPipelineEpic_Run_Good_DryRunDispatchesUncheckedChildren(t *testing.T) {
repo := newPipelineTestRepo()
repo.Issues[10] = &pipelineTestIssue{Number: 10, Title: "security(go-io): Validate tokens", State: "open", Labels: []string{"agentic", "security"}}
repo.Issues[11] = &pipelineTestIssue{Number: 11, Title: "security(go-io): Sanitize input", State: "open", Labels: []string{"agentic", "security"}}
repo.Issues[12] = &pipelineTestIssue{Number: 12, Title: "security(go-io): Add rate limiting", State: "open", Labels: []string{"agentic", "security"}}
repo.Issues[20] = &pipelineTestIssue{
Number: 20,
Title: "epic(go-io): security pipeline",
State: "open",
Labels: []string{"agentic", "epic", "security"},
Body: "## Overview\n\nEpic branch: `epic/20-security`\n\n## Child Issues\n\n- [ ] #10 security(go-io): Validate tokens\n- [ ] #11 security(go-io): Sanitize input\n- [ ] #12 security(go-io): Add rate limiting\n",
}
srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo})
s, _ := testPrepWithCore(t, srv)
output, err := s.pipelineEpicRun(context.Background(), PipelineEpicRunInput{
Org: "core",
Repo: "go-io",
EpicNumber: 20,
DryRun: true,
})
require.NoError(t, err)
assert.Len(t, output.Dispatched, 3)
assert.Equal(t, "epic/20-security", output.Branch)
}

286
pkg/agentic/pipeline_fix.go Normal file
View file

@ -0,0 +1,286 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
core "dappco.re/go/core"
)
type PipelineFixInput struct {
Org string `json:"org,omitempty"`
Repo string `json:"repo"`
Number int `json:"number"`
WorkspaceDir string `json:"workspace_dir,omitempty"`
Commit bool `json:"commit,omitempty"`
Push bool `json:"push,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
}
type PipelineFixOutput struct {
Success bool `json:"success"`
Org string `json:"org,omitempty"`
Repo string `json:"repo"`
Number int `json:"number"`
Action string `json:"action"`
Message string `json:"message,omitempty"`
Files int `json:"files,omitempty"`
Committed bool `json:"committed,omitempty"`
Pushed bool `json:"pushed,omitempty"`
}
func (s *PrepSubsystem) cmdPipelineFixReviews(options core.Options) core.Result {
ctx := s.commandContext()
org, repo, number := pipelineRepoAndNumberValue(options)
if repo == "" || number <= 0 {
core.Print(nil, "usage: core-agent pipeline/fix/reviews <pr-number> --repo=<repo> [--org=core] [--dry-run]")
return core.Result{Value: core.E("agentic.cmdPipelineFixReviews", "repo and pull request number are required", nil), OK: false}
}
output, err := s.pipelineFixReviews(ctx, PipelineFixInput{Org: org, Repo: repo, Number: number, DryRun: optionBoolValue(options, "dry_run", "dry-run")})
if err != nil {
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
core.Print(nil, "pr: %s/%s#%d", output.Org, output.Repo, output.Number)
core.Print(nil, "action: %s", output.Action)
core.Print(nil, "message: %s", output.Message)
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) cmdPipelineFixConflicts(options core.Options) core.Result {
ctx := s.commandContext()
org, repo, number := pipelineRepoAndNumberValue(options)
if repo == "" || number <= 0 {
core.Print(nil, "usage: core-agent pipeline/fix/conflicts <pr-number> --repo=<repo> [--org=core] [--dry-run]")
return core.Result{Value: core.E("agentic.cmdPipelineFixConflicts", "repo and pull request number are required", nil), OK: false}
}
output, err := s.pipelineFixConflicts(ctx, PipelineFixInput{Org: org, Repo: repo, Number: number, DryRun: optionBoolValue(options, "dry_run", "dry-run")})
if err != nil {
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
core.Print(nil, "pr: %s/%s#%d", output.Org, output.Repo, output.Number)
core.Print(nil, "action: %s", output.Action)
core.Print(nil, "message: %s", output.Message)
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) cmdPipelineFixFormat(options core.Options) core.Result {
ctx := s.commandContext()
org, repo, number := pipelineRepoAndNumberValue(options)
if repo == "" || number <= 0 {
core.Print(nil, "usage: core-agent pipeline/fix/format <pr-number> --repo=<repo> [--org=core] [--workspace=<name>|--repo-dir=<path>] [--commit] [--push] [--dry-run]")
return core.Result{Value: core.E("agentic.cmdPipelineFixFormat", "repo and pull request number are required", nil), OK: false}
}
output, err := s.pipelineFixFormat(ctx, PipelineFixInput{
Org: org,
Repo: repo,
Number: number,
WorkspaceDir: pipelineWorkspaceDir(options),
Commit: optionBoolValue(options, "commit"),
Push: optionBoolValue(options, "push"),
DryRun: optionBoolValue(options, "dry_run", "dry-run"),
})
if err != nil {
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
core.Print(nil, "pr: %s/%s#%d", output.Org, output.Repo, output.Number)
core.Print(nil, "action: %s", output.Action)
core.Print(nil, "files: %d", output.Files)
core.Print(nil, "committed: %v", output.Committed)
core.Print(nil, "pushed: %v", output.Pushed)
if output.Message != "" {
core.Print(nil, "message: %s", output.Message)
}
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) cmdPipelineFixThreads(options core.Options) core.Result {
ctx := s.commandContext()
org, repo, number := pipelineRepoAndNumberValue(options)
if repo == "" || number <= 0 {
core.Print(nil, "usage: core-agent pipeline/fix/threads <pr-number> --repo=<repo> [--org=core] [--dry-run]")
return core.Result{Value: core.E("agentic.cmdPipelineFixThreads", "repo and pull request number are required", nil), OK: false}
}
output, err := s.pipelineFixThreads(ctx, PipelineFixInput{Org: org, Repo: repo, Number: number, DryRun: optionBoolValue(options, "dry_run", "dry-run")})
if err != nil {
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
core.Print(nil, "pr: %s/%s#%d", output.Org, output.Repo, output.Number)
core.Print(nil, "action: %s", output.Action)
core.Print(nil, "message: %s", output.Message)
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) pipelineFixReviews(ctx context.Context, input PipelineFixInput) (PipelineFixOutput, error) {
if input.Repo == "" || input.Number <= 0 {
return PipelineFixOutput{}, core.E("pipelineFixReviews", "repo and pull request number are required", nil)
}
if input.Org == "" {
input.Org = "core"
}
if !input.DryRun {
s.commentOnIssue(ctx, input.Org, input.Repo, input.Number, "Can you fix the code reviews?")
}
return PipelineFixOutput{
Success: true,
Org: input.Org,
Repo: input.Repo,
Number: input.Number,
Action: "comment",
Message: "Can you fix the code reviews?",
}, nil
}
func (s *PrepSubsystem) pipelineFixConflicts(ctx context.Context, input PipelineFixInput) (PipelineFixOutput, error) {
if input.Repo == "" || input.Number <= 0 {
return PipelineFixOutput{}, core.E("pipelineFixConflicts", "repo and pull request number are required", nil)
}
if input.Org == "" {
input.Org = "core"
}
if !input.DryRun {
s.commentOnIssue(ctx, input.Org, input.Repo, input.Number, "Can you fix the merge conflict?")
}
return PipelineFixOutput{
Success: true,
Org: input.Org,
Repo: input.Repo,
Number: input.Number,
Action: "comment",
Message: "Can you fix the merge conflict?",
}, nil
}
func (s *PrepSubsystem) pipelineFixFormat(ctx context.Context, input PipelineFixInput) (PipelineFixOutput, error) {
if input.Repo == "" || input.Number <= 0 {
return PipelineFixOutput{}, core.E("pipelineFixFormat", "repo and pull request number are required", nil)
}
if input.WorkspaceDir == "" {
return PipelineFixOutput{}, core.E("pipelineFixFormat", "workspace or repo_dir is required for formatting", nil)
}
if input.Org == "" {
input.Org = "core"
}
process := s.Core().Process()
fileList := process.RunIn(ctx, input.WorkspaceDir, "rg", "--files", "-g", "*.go")
if !fileList.OK {
fileList = process.RunIn(ctx, input.WorkspaceDir, "find", ".", "-name", "*.go", "-type", "f")
}
goFiles := []string{}
if fileList.OK {
for _, file := range core.Split(core.Trim(resultText(fileList)), "\n") {
trimmed := core.Trim(file)
if trimmed != "" {
goFiles = append(goFiles, trimmed)
}
}
}
output := PipelineFixOutput{
Success: true,
Org: input.Org,
Repo: input.Repo,
Number: input.Number,
Action: "format",
Files: len(goFiles),
Message: "no Go files matched",
}
if len(goFiles) == 0 {
return output, nil
}
output.Message = "formatted Go files"
if input.DryRun {
return output, nil
}
formatResult := process.RunIn(ctx, input.WorkspaceDir, "sh", "-lc", "files=$(rg --files -g '*.go' 2>/dev/null || find . -name '*.go' -type f); if [ -n \"$files\" ]; then gofmt -w $files; fi")
if !formatResult.OK {
return PipelineFixOutput{}, core.E("pipelineFixFormat", "gofmt failed", nil)
}
statusResult := process.RunIn(ctx, input.WorkspaceDir, "git", "status", "--porcelain")
if !statusResult.OK {
return output, nil
}
if core.Trim(resultText(statusResult)) == "" {
output.Message = "repo already formatted"
return output, nil
}
if input.Commit || input.Push {
addResult := process.RunIn(ctx, input.WorkspaceDir, "git", "add", "-A")
if !addResult.OK {
return PipelineFixOutput{}, core.E("pipelineFixFormat", "git add failed", nil)
}
commitResult := process.RunIn(ctx, input.WorkspaceDir, "git", "commit", "-m", "chore: apply formatting")
if !commitResult.OK {
return PipelineFixOutput{}, core.E("pipelineFixFormat", "git commit failed", nil)
}
output.Committed = true
}
if input.Push {
pushResult := process.RunIn(ctx, input.WorkspaceDir, "git", "push")
if !pushResult.OK {
return PipelineFixOutput{}, core.E("pipelineFixFormat", "git push failed", nil)
}
output.Pushed = true
}
return output, nil
}
func (s *PrepSubsystem) pipelineFixThreads(ctx context.Context, input PipelineFixInput) (PipelineFixOutput, error) {
if input.Repo == "" || input.Number <= 0 {
return PipelineFixOutput{}, core.E("pipelineFixThreads", "repo and pull request number are required", nil)
}
if input.Org == "" {
input.Org = "core"
}
reader := &pipelineForgeMetaReader{subsystem: s, org: input.Org}
meta, err := reader.GetPRMeta(ctx, input.Repo, input.Number)
if err != nil {
return PipelineFixOutput{}, err
}
unresolved := meta.ThreadsTotal - meta.ThreadsResolved
if unresolved <= 0 {
return PipelineFixOutput{
Success: true,
Org: input.Org,
Repo: input.Repo,
Number: input.Number,
Action: "noop",
Message: "no unresolved review threads remain",
}, nil
}
message := core.Sprintf("Please resolve the %d remaining review thread(s) after the latest fix.", unresolved)
if !input.DryRun {
s.commentOnIssue(ctx, input.Org, input.Repo, input.Number, message)
}
return PipelineFixOutput{
Success: true,
Org: input.Org,
Repo: input.Repo,
Number: input.Number,
Action: "comment",
Message: message,
}, nil
}

View file

@ -0,0 +1,85 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"testing"
"time"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPipelineFix_Good_ReviewsPostsComment(t *testing.T) {
repo := newPipelineTestRepo()
srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo})
s, _ := testPrepWithCore(t, srv)
output, err := s.pipelineFixReviews(context.Background(), PipelineFixInput{Org: "core", Repo: "go-io", Number: 7})
require.NoError(t, err)
assert.True(t, output.Success)
assert.Contains(t, repo.Comments[7][0], "Can you fix the code reviews?")
}
func TestPipelineFix_Bad_FormatRequiresWorkspace(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
_, err := s.pipelineFixFormat(context.Background(), PipelineFixInput{Org: "core", Repo: "go-io", Number: 9})
require.Error(t, err)
assert.Contains(t, err.Error(), "workspace or repo_dir is required")
}
func TestPipelineFix_Ugly_ThreadsNoopWhenAlreadyResolved(t *testing.T) {
repo := newPipelineTestRepo()
repo.Pulls[5] = &pipelineTestPR{
Number: 5,
Title: "Already clean",
State: "open",
Mergeable: boolPtr(true),
HeadRef: "agent/clean",
HeadSHA: "sha-clean",
BaseRef: "dev",
ReviewThreadsTotal: 2,
ReviewThreadsResolved: 2,
}
srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo})
s, _ := testPrepWithCore(t, srv)
output, err := s.pipelineFixThreads(context.Background(), PipelineFixInput{Org: "core", Repo: "go-io", Number: 5})
require.NoError(t, err)
assert.Equal(t, "noop", output.Action)
assert.Equal(t, "no unresolved review threads remain", output.Message)
}
func TestPipelineFix_Format_Good_DryRunCountsFiles(t *testing.T) {
dir := t.TempDir()
require.True(t, fs.Write(core.JoinPath(dir, "one.go"), "package main\nfunc main( ){}\n").OK)
require.True(t, fs.Write(core.JoinPath(dir, "two.go"), "package main\nfunc helper( ){}\n").OK)
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
output, err := s.pipelineFixFormat(context.Background(), PipelineFixInput{
Org: "core",
Repo: "go-io",
Number: 12,
WorkspaceDir: dir,
DryRun: true,
})
require.NoError(t, err)
assert.Equal(t, "format", output.Action)
assert.Equal(t, 2, output.Files)
}

View file

@ -0,0 +1,501 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"regexp"
core "dappco.re/go/core"
)
var pipelineEpicBranchPattern = regexp.MustCompile("Epic branch:\\s*`([^`]+)`")
var pipelineEpicChildPattern = regexp.MustCompile(`(?m)^(\s*-\s*\[)([ xX])(\]\s*#)(\d+)(.*)$`)
type MetaReader interface {
GetPRMeta(ctx context.Context, repo string, prNumber int) (PipelinePRMeta, error)
GetEpicMeta(ctx context.Context, repo string, issueNumber int) (PipelineEpicMeta, error)
GetIssueState(ctx context.Context, repo string, issueNumber int) (PipelineIssueState, error)
GetCommentReactions(ctx context.Context, repo string, commentID int64) ([]PipelineReactionMeta, error)
}
type PipelineCheckMeta struct {
Name string `json:"name"`
Conclusion string `json:"conclusion,omitempty"`
Status string `json:"status,omitempty"`
}
type PipelinePRMeta struct {
Number int `json:"number"`
State string `json:"state"`
Mergeable string `json:"mergeable"`
HeadSHA string `json:"head_sha,omitempty"`
HeadDate string `json:"head_date,omitempty"`
BaseBranch string `json:"base_branch,omitempty"`
HeadBranch string `json:"head_branch,omitempty"`
Checks []PipelineCheckMeta `json:"checks,omitempty"`
ThreadsTotal int `json:"threads_total,omitempty"`
ThreadsResolved int `json:"threads_resolved,omitempty"`
HasEyesReaction bool `json:"has_eyes_reaction,omitempty"`
URL string `json:"url,omitempty"`
}
type PipelineIssueState struct {
Number int `json:"number"`
State string `json:"state"`
Title string `json:"title"`
URL string `json:"url,omitempty"`
Labels []string `json:"labels,omitempty"`
}
type PipelineEpicChildMeta struct {
Number int `json:"number"`
Title string `json:"title"`
Checked bool `json:"checked"`
State string `json:"state"`
URL string `json:"url,omitempty"`
PRs []int `json:"prs,omitempty"`
}
type PipelineEpicMeta struct {
Number int `json:"number"`
Title string `json:"title"`
State string `json:"state"`
URL string `json:"url,omitempty"`
Body string `json:"body,omitempty"`
Branch string `json:"branch,omitempty"`
Children []PipelineEpicChildMeta `json:"children"`
}
type PipelineReactionMeta struct {
Content string `json:"content"`
Count int `json:"count"`
}
type PipelineMonitorInput struct {
Org string `json:"org,omitempty"`
Repo string `json:"repo,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
}
type PipelineMonitorAction struct {
Repo string `json:"repo"`
Number int `json:"number"`
Action string `json:"action"`
Reason string `json:"reason"`
URL string `json:"url,omitempty"`
}
type PipelineMonitorOutput struct {
Success bool `json:"success"`
Org string `json:"org,omitempty"`
Repo string `json:"repo,omitempty"`
Actions []PipelineMonitorAction `json:"actions,omitempty"`
}
type pipelinePullRequestRecord struct {
Number int `json:"number"`
Title string `json:"title"`
State string `json:"state"`
HTMLURL string `json:"html_url"`
Merged bool `json:"merged"`
Mergeable *bool `json:"mergeable"`
MergeableState string `json:"mergeable_state"`
UpdatedAt string `json:"updated_at"`
ReviewThreadsTotal int `json:"review_threads_total"`
ReviewThreadsResolved int `json:"review_threads_resolved"`
ReviewThreadsUnresolved int `json:"review_threads_unresolved"`
ReviewComments int `json:"review_comments"`
Comments int `json:"comments"`
Reactions map[string]int `json:"reactions"`
Head struct {
Ref string `json:"ref"`
SHA string `json:"sha"`
Date string `json:"date"`
UpdatedAt string `json:"updated_at"`
Repo struct {
UpdatedAt string `json:"updated_at"`
PushedAt string `json:"pushed_at"`
} `json:"repo"`
} `json:"head"`
Base struct {
Ref string `json:"ref"`
} `json:"base"`
}
type pipelineStatusRecord struct {
Statuses []struct {
Context string `json:"context"`
Name string `json:"name"`
Status string `json:"status"`
State string `json:"state"`
Conclusion string `json:"conclusion"`
UpdatedAt string `json:"updated_at"`
CreatedAt string `json:"created_at"`
} `json:"statuses"`
}
type pipelineForgeMetaReader struct {
subsystem *PrepSubsystem
org string
}
func (s *PrepSubsystem) cmdPipelineMonitor(options core.Options) core.Result {
ctx := s.commandContext()
output, err := s.pipelineMonitorWithReader(ctx, PipelineMonitorInput{
Org: pipelineOrgValue(options),
Repo: pipelineRepoValue(options),
DryRun: optionBoolValue(options, "dry_run", "dry-run"),
}, &pipelineForgeMetaReader{subsystem: s, org: pipelineOrgValue(options)})
if err != nil {
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
if output.Repo != "" {
core.Print(nil, "repo: %s/%s", output.Org, output.Repo)
} else {
core.Print(nil, "org: %s", output.Org)
}
core.Print(nil, "actions: %d", len(output.Actions))
for _, action := range output.Actions {
core.Print(nil, " %s #%d %s", action.Repo, action.Number, action.Action)
}
if len(output.Actions) == 0 {
core.Print(nil, "no interventions")
}
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) pipelineMonitorWithReader(ctx context.Context, input PipelineMonitorInput, reader MetaReader) (PipelineMonitorOutput, error) {
if s.forgeToken == "" {
return PipelineMonitorOutput{}, core.E("pipelineMonitor", "no Forge token configured", nil)
}
if input.Org == "" {
input.Org = "core"
}
repos := []string{}
if input.Repo != "" {
repos = append(repos, input.Repo)
} else {
records, err := s.pipelineListOrgRepos(ctx, input.Org)
if err != nil {
return PipelineMonitorOutput{}, err
}
for _, record := range records {
repos = append(repos, record.Name)
}
}
output := PipelineMonitorOutput{
Success: true,
Org: input.Org,
Repo: input.Repo,
}
for _, repo := range repos {
pullRequests, err := s.pipelineListPullRequests(ctx, input.Org, repo, "open")
if err != nil {
return PipelineMonitorOutput{}, err
}
for _, pullRequest := range pullRequests {
meta, err := reader.GetPRMeta(ctx, repo, pullRequest.Number)
if err != nil {
return PipelineMonitorOutput{}, err
}
if meta.State != "open" {
continue
}
action := PipelineMonitorAction{
Repo: repo,
Number: meta.Number,
URL: meta.URL,
}
switch {
case meta.Mergeable == "conflicting":
action.Action = "fix/conflicts"
action.Reason = "merge conflict detected"
if !input.DryRun {
s.commentOnIssue(ctx, input.Org, repo, meta.Number, "Can you fix the merge conflict?")
}
case meta.ThreadsTotal > meta.ThreadsResolved:
action.Action = "fix/reviews"
action.Reason = "unresolved review threads detected"
if !input.DryRun {
s.commentOnIssue(ctx, input.Org, repo, meta.Number, "Can you fix the code reviews?")
}
case pipelineChecksSuccessful(meta.Checks):
action.Action = "merge"
action.Reason = "all checks passed and no review blockers remain"
if !input.DryRun {
mergeResult := s.forgeMergePR(ctx, input.Org, repo, meta.Number)
if !mergeResult.OK {
return PipelineMonitorOutput{}, commandResultError("pipelineMonitor.merge", mergeResult)
}
}
default:
continue
}
output.Actions = append(output.Actions, action)
}
}
return output, nil
}
func (s *PrepSubsystem) pipelineListPullRequests(ctx context.Context, org, repo, state string) ([]pipelinePullRequestRecord, error) {
if state == "" {
state = "open"
}
url := core.Sprintf("%s/api/v1/repos/%s/%s/pulls?state=%s&limit=100&page=1", s.forgeURL, org, repo, state)
result := HTTPGet(ctx, url, s.forgeToken, "token")
if !result.OK {
return nil, core.E("pipelineListPullRequests", core.Concat("failed to list pull requests for ", repo), nil)
}
var pullRequests []pipelinePullRequestRecord
if err := pipelineDecodeJSON(result.Value.(string), &pullRequests, "pipelineListPullRequests", "failed to decode pull request list"); err != nil {
return nil, err
}
return pullRequests, nil
}
func (r *pipelineForgeMetaReader) GetPRMeta(ctx context.Context, repo string, prNumber int) (PipelinePRMeta, error) {
url := core.Sprintf("%s/api/v1/repos/%s/%s/pulls/%d", r.subsystem.forgeURL, r.org, repo, prNumber)
result := HTTPGet(ctx, url, r.subsystem.forgeToken, "token")
if !result.OK {
return PipelinePRMeta{}, core.E("MetaReader.GetPRMeta", core.Concat("failed to read PR #", core.Sprint(prNumber)), nil)
}
var pullRequest pipelinePullRequestRecord
if err := pipelineDecodeJSON(result.Value.(string), &pullRequest, "MetaReader.GetPRMeta", "failed to decode pull request"); err != nil {
return PipelinePRMeta{}, err
}
statuses := []PipelineCheckMeta{}
if pullRequest.Head.SHA != "" {
statusURL := core.Sprintf("%s/api/v1/repos/%s/%s/commits/%s/status", r.subsystem.forgeURL, r.org, repo, pullRequest.Head.SHA)
statusResult := HTTPGet(ctx, statusURL, r.subsystem.forgeToken, "token")
if statusResult.OK {
var status pipelineStatusRecord
if err := pipelineDecodeJSON(statusResult.Value.(string), &status, "MetaReader.GetPRMeta", "failed to decode commit status"); err == nil {
for _, entry := range status.Statuses {
rawState := entry.Status
if rawState == "" {
rawState = entry.State
}
if rawState == "" {
rawState = entry.Conclusion
}
name := entry.Context
if name == "" {
name = entry.Name
}
statuses = append(statuses, PipelineCheckMeta{
Name: name,
Conclusion: pipelineCheckConclusion(rawState),
Status: pipelineCheckStatus(rawState),
})
}
}
}
}
state := core.Lower(pullRequest.State)
if pullRequest.Merged {
state = "merged"
}
if state == "" {
state = "open"
}
mergeable := "unknown"
switch {
case pullRequest.Mergeable != nil && *pullRequest.Mergeable:
mergeable = "mergeable"
case pullRequest.Mergeable != nil && !*pullRequest.Mergeable:
mergeable = "conflicting"
case core.Lower(pullRequest.MergeableState) == "clean", core.Lower(pullRequest.MergeableState) == "mergeable":
mergeable = "mergeable"
case core.Lower(pullRequest.MergeableState) == "dirty", core.Lower(pullRequest.MergeableState) == "conflicting":
mergeable = "conflicting"
}
threadsTotal := pullRequest.ReviewThreadsTotal
if threadsTotal == 0 {
if pullRequest.ReviewComments > 0 {
threadsTotal = pullRequest.ReviewComments
} else {
threadsTotal = pullRequest.Comments
}
}
threadsResolved := pullRequest.ReviewThreadsResolved
if threadsResolved == 0 && pullRequest.ReviewThreadsUnresolved > 0 && threadsTotal >= pullRequest.ReviewThreadsUnresolved {
threadsResolved = threadsTotal - pullRequest.ReviewThreadsUnresolved
}
headDate := pullRequest.Head.Date
if headDate == "" {
headDate = pullRequest.Head.UpdatedAt
}
if headDate == "" {
headDate = pullRequest.Head.Repo.UpdatedAt
}
if headDate == "" {
headDate = pullRequest.Head.Repo.PushedAt
}
if headDate == "" {
headDate = pullRequest.UpdatedAt
}
return PipelinePRMeta{
Number: pullRequest.Number,
State: state,
Mergeable: mergeable,
HeadSHA: pullRequest.Head.SHA,
HeadDate: headDate,
BaseBranch: pullRequest.Base.Ref,
HeadBranch: pullRequest.Head.Ref,
Checks: statuses,
ThreadsTotal: threadsTotal,
ThreadsResolved: threadsResolved,
HasEyesReaction: pullRequest.Reactions["eyes"] > 0,
URL: pullRequest.HTMLURL,
}, nil
}
func (r *pipelineForgeMetaReader) GetEpicMeta(ctx context.Context, repo string, issueNumber int) (PipelineEpicMeta, error) {
issue, err := r.subsystem.pipelineGetIssue(ctx, r.org, repo, issueNumber)
if err != nil {
return PipelineEpicMeta{}, err
}
meta := PipelineEpicMeta{
Number: issue.Number,
Title: issue.Title,
State: issue.State,
URL: issue.HTMLURL,
Body: issue.Body,
}
branchMatch := pipelineEpicBranchPattern.FindStringSubmatch(issue.Body)
if len(branchMatch) == 2 {
meta.Branch = branchMatch[1]
}
matches := pipelineEpicChildPattern.FindAllStringSubmatch(issue.Body, -1)
for _, match := range matches {
if len(match) != 6 {
continue
}
number := parseIntString(match[4])
if number <= 0 {
continue
}
state, stateErr := r.GetIssueState(ctx, repo, number)
if stateErr != nil {
return PipelineEpicMeta{}, stateErr
}
meta.Children = append(meta.Children, PipelineEpicChildMeta{
Number: number,
Title: state.Title,
Checked: core.Lower(match[2]) == "x",
State: state.State,
URL: state.URL,
})
}
return meta, nil
}
func (r *pipelineForgeMetaReader) GetIssueState(ctx context.Context, repo string, issueNumber int) (PipelineIssueState, error) {
issue, err := r.subsystem.pipelineGetIssue(ctx, r.org, repo, issueNumber)
if err != nil {
return PipelineIssueState{}, err
}
state := issue.State
if state == "" {
state = "open"
}
return PipelineIssueState{
Number: issue.Number,
State: state,
Title: issue.Title,
URL: issue.HTMLURL,
Labels: pipelineIssueLabelNames(issue),
}, nil
}
func (r *pipelineForgeMetaReader) GetCommentReactions(ctx context.Context, repo string, commentID int64) ([]PipelineReactionMeta, error) {
url := core.Sprintf("%s/api/v1/repos/%s/%s/issues/comments/%d/reactions", r.subsystem.forgeURL, r.org, repo, commentID)
result := HTTPGet(ctx, url, r.subsystem.forgeToken, "token")
if !result.OK {
return nil, core.E("MetaReader.GetCommentReactions", core.Concat("failed to read comment reactions for ", core.Sprint(commentID)), nil)
}
var reactions []struct {
Content string `json:"content"`
}
if err := pipelineDecodeJSON(result.Value.(string), &reactions, "MetaReader.GetCommentReactions", "failed to decode reactions"); err != nil {
return nil, err
}
counts := map[string]int{}
for _, reaction := range reactions {
counts[reaction.Content]++
}
aggregated := []PipelineReactionMeta{}
for content, count := range counts {
aggregated = append(aggregated, PipelineReactionMeta{Content: content, Count: count})
}
return aggregated, nil
}
func pipelineCheckConclusion(rawState string) string {
switch core.Lower(rawState) {
case "success":
return "success"
case "failure", "error":
return "failure"
default:
return ""
}
}
func pipelineCheckStatus(rawState string) string {
switch core.Lower(rawState) {
case "success", "failure", "error":
return "completed"
case "pending", "queued":
return "queued"
case "running", "in_progress":
return "in_progress"
default:
return ""
}
}
func pipelineChecksSuccessful(checks []PipelineCheckMeta) bool {
if len(checks) == 0 {
return false
}
for _, check := range checks {
if check.Status != "completed" || check.Conclusion != "success" {
return false
}
}
return true
}

View file

@ -0,0 +1,104 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPipelineMonitor_Good_AutoIntervenesAndMerges(t *testing.T) {
repo := newPipelineTestRepo()
repo.Pulls[1] = &pipelineTestPR{
Number: 1,
Title: "Fix conflicts",
State: "open",
Mergeable: boolPtr(false),
HeadRef: "agent/conflicts",
HeadSHA: "sha-conflicts",
BaseRef: "dev",
MergeableState: "dirty",
}
repo.Pulls[2] = &pipelineTestPR{
Number: 2,
Title: "Fix reviews",
State: "open",
Mergeable: boolPtr(true),
HeadRef: "agent/reviews",
HeadSHA: "sha-reviews",
BaseRef: "dev",
ReviewThreadsTotal: 3,
ReviewThreadsResolved: 1,
ReviewThreadsUnresolved: 2,
Statuses: []map[string]any{
{"context": "qa", "status": "success"},
},
}
repo.Pulls[3] = &pipelineTestPR{
Number: 3,
Title: "Ready to merge",
State: "open",
Mergeable: boolPtr(true),
HeadRef: "agent/merge",
HeadSHA: "sha-merge",
BaseRef: "dev",
ReviewThreadsTotal: 1,
ReviewThreadsResolved: 1,
Statuses: []map[string]any{
{"context": "qa", "status": "success"},
{"context": "build", "status": "success"},
},
}
srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo})
s, _ := testPrepWithCore(t, srv)
output, err := s.pipelineMonitorWithReader(context.Background(), PipelineMonitorInput{
Org: "core",
Repo: "go-io",
}, &pipelineForgeMetaReader{subsystem: s, org: "core"})
require.NoError(t, err)
assert.Len(t, output.Actions, 3)
assert.Contains(t, repo.Comments[1][0], "Can you fix the merge conflict?")
assert.Contains(t, repo.Comments[2][0], "Can you fix the code reviews?")
assert.Contains(t, repo.Merged, 3)
}
func TestPipelineMonitor_Bad_NoToken(t *testing.T) {
s, _ := testPrepWithCore(t, nil)
s.forgeToken = ""
_, err := s.pipelineMonitorWithReader(context.Background(), PipelineMonitorInput{Org: "core", Repo: "go-io"}, &pipelineForgeMetaReader{subsystem: s, org: "core"})
require.Error(t, err)
assert.Contains(t, err.Error(), "no Forge token configured")
}
func TestPipelineMonitor_Ugly_NoActionWhenChecksPending(t *testing.T) {
repo := newPipelineTestRepo()
repo.Pulls[4] = &pipelineTestPR{
Number: 4,
Title: "Waiting for CI",
State: "open",
Mergeable: boolPtr(true),
HeadRef: "agent/pending",
HeadSHA: "sha-pending",
BaseRef: "dev",
Statuses: []map[string]any{{"context": "qa", "status": "pending"}},
Reactions: map[string]int{},
Comments: 0,
}
srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo})
s, _ := testPrepWithCore(t, srv)
output, err := s.pipelineMonitorWithReader(context.Background(), PipelineMonitorInput{
Org: "core",
Repo: "go-io",
}, &pipelineForgeMetaReader{subsystem: s, org: "core"})
require.NoError(t, err)
assert.Empty(t, output.Actions)
}

View file

@ -0,0 +1,162 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
core "dappco.re/go/core"
)
type PipelineOnboardInput struct {
Org string `json:"org,omitempty"`
Repo string `json:"repo"`
Agent string `json:"agent,omitempty"`
Template string `json:"template,omitempty"`
DispatchDryRun bool `json:"dispatch_dry_run,omitempty"`
Limit int `json:"limit,omitempty"`
}
type PipelineOnboardOutput struct {
Success bool `json:"success"`
Org string `json:"org,omitempty"`
Repo string `json:"repo"`
Audit PipelineAuditOutput `json:"audit"`
Epic PipelineEpicCreateOutput `json:"epic"`
Runs []PipelineEpicRunOutput `json:"runs,omitempty"`
Direct []PipelineIssueRef `json:"direct,omitempty"`
}
func (s *PrepSubsystem) cmdPipelineOnboard(options core.Options) core.Result {
ctx := s.commandContext()
repo := pipelineRepoValue(options)
if repo == "" {
core.Print(nil, "usage: core-agent pipeline/onboard <repo> [--org=core] [--agent=codex] [--template=coding] [--dry-run]")
return core.Result{Value: core.E("agentic.cmdPipelineOnboard", "repo is required", nil), OK: false}
}
output, err := s.pipelineOnboard(ctx, PipelineOnboardInput{
Org: pipelineOrgValue(options),
Repo: repo,
Agent: optionStringValue(options, "agent"),
Template: optionStringValue(options, "template"),
DispatchDryRun: optionBoolValue(options, "dry_run", "dry-run"),
Limit: optionIntValue(options, "limit"),
})
if err != nil {
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
core.Print(nil, "repo: %s/%s", output.Org, output.Repo)
core.Print(nil, "audit created: %d", len(output.Audit.Created))
core.Print(nil, "epics: %d", len(output.Epic.Epics))
core.Print(nil, "epic runs: %d", len(output.Runs))
core.Print(nil, "direct: %d", len(output.Direct))
for _, run := range output.Runs {
core.Print(nil, " epic #%d dispatched %d issue(s)", run.EpicNumber, len(run.Dispatched))
}
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) pipelineOnboard(ctx context.Context, input PipelineOnboardInput) (PipelineOnboardOutput, error) {
if input.Repo == "" {
return PipelineOnboardOutput{}, core.E("pipelineOnboard", "repo is required", nil)
}
if input.Org == "" {
input.Org = "core"
}
if input.Agent == "" {
input.Agent = "codex"
}
if input.Template == "" {
input.Template = "coding"
}
auditOutput, err := s.pipelineAudit(ctx, PipelineAuditInput{
Org: input.Org,
Repo: input.Repo,
})
if err != nil {
return PipelineOnboardOutput{}, err
}
epicOutput, err := s.pipelineEpicCreate(ctx, PipelineEpicCreateInput{
Org: input.Org,
Repo: input.Repo,
})
if err != nil {
return PipelineOnboardOutput{}, err
}
output := PipelineOnboardOutput{
Success: true,
Org: input.Org,
Repo: input.Repo,
Audit: auditOutput,
Epic: epicOutput,
}
epicsToRun := make([]PipelineEpicMeta, 0, len(epicOutput.Epics)+len(epicOutput.Existing))
epicsToRun = append(epicsToRun, epicOutput.Epics...)
epicsToRun = append(epicsToRun, epicOutput.Existing...)
if len(epicsToRun) == 0 {
direct, directErr := s.pipelineOnboardDispatchDirect(ctx, input, epicOutput.Candidates)
if directErr != nil {
return PipelineOnboardOutput{}, directErr
}
output.Direct = direct
return output, nil
}
for _, epic := range epicsToRun {
runOutput, runErr := s.pipelineEpicRun(ctx, PipelineEpicRunInput{
Org: input.Org,
Repo: input.Repo,
EpicNumber: epic.Number,
Agent: input.Agent,
Template: input.Template,
DryRun: input.DispatchDryRun,
Limit: input.Limit,
Epic: &epic,
})
if runErr != nil {
return PipelineOnboardOutput{}, runErr
}
output.Runs = append(output.Runs, runOutput)
}
return output, nil
}
func (s *PrepSubsystem) pipelineOnboardDispatchDirect(ctx context.Context, input PipelineOnboardInput, issues []PipelineIssueRef) ([]PipelineIssueRef, error) {
dispatched := []PipelineIssueRef{}
for _, issue := range issues {
if issue.Number <= 0 || !pipelineIssueAutoDispatchAllowed(issue.Title) {
continue
}
if input.Limit > 0 && len(dispatched) >= input.Limit {
break
}
if input.DispatchDryRun {
dispatched = append(dispatched, issue)
continue
}
_, _, err := s.dispatch(ctx, nil, DispatchInput{
Org: input.Org,
Repo: input.Repo,
Task: issue.Title,
Issue: issue.Number,
Agent: input.Agent,
Template: input.Template,
})
if err != nil {
return nil, core.E("pipelineOnboard", core.Concat("failed to dispatch issue #", core.Sprint(issue.Number)), err)
}
dispatched = append(dispatched, issue)
}
return dispatched, nil
}

View file

@ -0,0 +1,72 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"testing"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPipelineOnboard_Good_ChainsAuditEpicAndDispatch(t *testing.T) {
repo := newPipelineTestRepo()
repo.Issues[1] = &pipelineTestIssue{
Number: 1,
Title: "[Audit] Security",
Body: "- Validate tokens\n- Sanitize input\n- Add rate limiting",
State: "open",
Labels: []string{"audit", "security"},
}
srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo})
s, _ := testPrepWithCore(t, srv)
output, err := s.pipelineOnboard(context.Background(), PipelineOnboardInput{
Org: "core",
Repo: "go-io",
DispatchDryRun: true,
})
require.NoError(t, err)
assert.True(t, output.Success)
assert.Len(t, output.Audit.Created, 3)
require.Len(t, output.Runs, 1)
assert.Len(t, output.Runs[0].Dispatched, 3)
assert.Empty(t, output.Direct)
}
func TestPipelineOnboard_Bad_MissingRepo(t *testing.T) {
s, _ := testPrepWithCore(t, nil)
result := s.cmdPipelineOnboard(core.NewOptions())
require.False(t, result.OK)
err, ok := result.Value.(error)
require.True(t, ok)
assert.Contains(t, err.Error(), "repo is required")
}
func TestPipelineOnboard_Ugly_DirectDispatchWhenEpicNotCreated(t *testing.T) {
repo := newPipelineTestRepo()
repo.Issues[1] = &pipelineTestIssue{
Number: 1,
Title: "[Audit] Security",
Body: "- Validate tokens\n- Sanitize input",
State: "open",
Labels: []string{"audit", "security"},
}
srv := newPipelineTestServer(t, map[string]*pipelineTestRepo{"go-io": repo})
s, _ := testPrepWithCore(t, srv)
output, err := s.pipelineOnboard(context.Background(), PipelineOnboardInput{
Org: "core",
Repo: "go-io",
DispatchDryRun: true,
})
require.NoError(t, err)
assert.Empty(t, output.Runs)
assert.Len(t, output.Direct, 2)
}

View file

@ -0,0 +1,32 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import core "dappco.re/go/core"
func (s *PrepSubsystem) cmdPipelineTrainingCapture(_ core.Options) core.Result {
core.Print(nil, "status: not yet implemented")
core.Print(nil, "reason: blocked-on-sibling")
return core.Result{
Value: core.E("agentic.cmdPipelineTrainingCapture", "not yet implemented - blocked-on-sibling", nil),
OK: false,
}
}
func (s *PrepSubsystem) cmdPipelineTrainingStats(_ core.Options) core.Result {
core.Print(nil, "status: not yet implemented")
core.Print(nil, "reason: blocked-on-sibling")
return core.Result{
Value: core.E("agentic.cmdPipelineTrainingStats", "not yet implemented - blocked-on-sibling", nil),
OK: false,
}
}
func (s *PrepSubsystem) cmdPipelineTrainingExport(_ core.Options) core.Result {
core.Print(nil, "status: not yet implemented")
core.Print(nil, "reason: blocked-on-sibling")
return core.Result{
Value: core.E("agentic.cmdPipelineTrainingExport", "not yet implemented - blocked-on-sibling", nil),
OK: false,
}
}

View file

@ -0,0 +1,35 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"testing"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPipelineTraining_Good_RootHelp(t *testing.T) {
s, _ := testPrepWithCore(t, nil)
output := captureStdout(t, func() {
result := s.cmdPipelineTraining(core.NewOptions())
require.True(t, result.OK)
})
assert.Contains(t, output, "core-agent pipeline/training/capture")
assert.Contains(t, output, "core-agent pipeline/training/stats")
assert.Contains(t, output, "core-agent pipeline/training/export")
}
func TestPipelineTraining_Bad_CaptureBlockedOnSibling(t *testing.T) {
s, _ := testPrepWithCore(t, nil)
result := s.cmdPipelineTrainingCapture(core.NewOptions())
require.False(t, result.OK)
err, ok := result.Value.(error)
require.True(t, ok)
assert.Contains(t, err.Error(), "blocked-on-sibling")
}

View file

@ -0,0 +1,26 @@
version: "3"
tasks:
test:
cmds:
- |
bash <<'EOF'
set -euo pipefail
source ../_lib/run.sh
go build -trimpath -ldflags="-s -w" -o bin/core-agent ../../../cmd/core-agent
output="$(mktemp)"
run_capture_all 0 "$output" ./bin/core-agent pipeline
assert_contains "pipeline/audit <repo>" "$output"
run_capture_all 1 "$output" ./bin/core-agent pipeline/audit
assert_contains "pipeline/audit <repo>" "$output"
run_capture_all 1 "$output" ./bin/core-agent pipeline/budget/plan
assert_contains "blocked-on-sibling" "$output"
run_capture_all 1 "$output" ./bin/core-agent pipeline/training/capture
assert_contains "blocked-on-sibling" "$output"
EOF