From 2f9ffd5324d46e60cb730f5cd0835c328d30379c Mon Sep 17 00:00:00 2001 From: Snider Date: Sat, 25 Apr 2026 23:57:25 +0100 Subject: [PATCH] feat(agent/pipeline): implement pipeline/budget + pipeline/training (#536) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the #535 stubs with the full implementation per RFC.pipeline.md. Lands: * pkg/agentic/pipeline_budget.go (extend) — budget/plan reads pool/rate config, counts logged dispatches from .core/db.duckdb (JSONL fallback), prints per-pool remaining budget. budget/log appends to .core/journal/dispatch.jsonl + mirrors to state store. * pkg/agentic/pipeline_training.go (extend) — training/capture pulls PR meta via MetaReader, captures PR diff via Forge PR-diff endpoint with `git show` fallback, records structural CodeRabbit-equivalent finding counts from review-thread totals, appends to .core/training/journal.jsonl. training/stats aggregates totals + zero-finding counts by repo. training/export filters to zero-finding entries → .core/training/export.jsonl (clean LEM training data). * pkg/agentic/training_journal.go (NEW) — shared journal helpers * AX-10 tests replace stubs (pipeline_budget_test.go + pipeline_training_test.go) * tests/cli/pipeline/Taskfile.yaml — end-to-end smoke covers all 5 subcommands against isolated temp workspace + local Forge stub LEM training data pipeline now feedable: merged PRs → training/capture → journal.jsonl → training/export (zero-finding filter) → ready for next LEK iteration. Sandbox blocked from go test by go.work + private-dep resolution; gofmt clean. Forge PR diff endpoint shape verified against Gitea API docs (1.19). 
Co-authored-by: Codex Closes tasks.lthn.sh/view.php?id=536 --- go.sum | 4 + pkg/agentic/pipeline_budget.go | 239 +++++++++++++++++++++-- pkg/agentic/pipeline_budget_test.go | 98 +++++++++- pkg/agentic/pipeline_training.go | 204 ++++++++++++++++++-- pkg/agentic/pipeline_training_test.go | 261 +++++++++++++++++++++++++- pkg/agentic/training_journal.go | 188 +++++++++++++++++++ tests/cli/pipeline/Taskfile.yaml | 119 +++++++++++- 7 files changed, 1075 insertions(+), 38 deletions(-) create mode 100644 pkg/agentic/training_journal.go diff --git a/go.sum b/go.sum index 3ec4d14..3c9d75f 100644 --- a/go.sum +++ b/go.sum @@ -298,6 +298,7 @@ golang.org/x/arch v0.25.0/go.mod h1:0X+GdSIP+kL5wPmpK7sdkEVTt2XoYP0cSjQSbZBwOi8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= +golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI= golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY= @@ -307,6 +308,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs= golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q= golang.org/x/sync 
v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -321,6 +323,7 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -330,6 +333,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= diff --git a/pkg/agentic/pipeline_budget.go b/pkg/agentic/pipeline_budget.go index 59bf568..f575e2f 100644 --- a/pkg/agentic/pipeline_budget.go +++ b/pkg/agentic/pipeline_budget.go @@ -2,22 +2,237 @@ package agentic -import core "dappco.re/go/core" +import ( + "sort" + "time" + + core "dappco.re/go/core" +) + +const pipelineBudgetStoreGroup = "pipeline_budget_dispatch" + +// entry := pipelineBudgetEntry{Repo: "go-io", Agent: "codex", Model: "gpt-5.4", 
Status: "started"} +type pipelineBudgetEntry struct { + Timestamp string `json:"timestamp"` + Repo string `json:"repo"` + Agent string `json:"agent"` + Model string `json:"model,omitempty"` + Ticket string `json:"ticket,omitempty"` + Status string `json:"status,omitempty"` +} + +// row := pipelineBudgetPlanRow{Pool: "codex", UsedToday: 1, Remaining: "2"} +type pipelineBudgetPlanRow struct { + Pool string + UsedToday int + DailyLimit string + Remaining string + Running int + Concurrency string + ResetUTC string +} + +// path := pipelineBudgetJournalPath() +func pipelineBudgetJournalPath() string { + return core.JoinPath(CoreRoot(), "journal", "dispatch.jsonl") +} func (s *PrepSubsystem) cmdPipelineBudgetPlan(_ core.Options) core.Result { - core.Print(nil, "status: not yet implemented") - core.Print(nil, "reason: blocked-on-sibling") - return core.Result{ - Value: core.E("agentic.cmdPipelineBudgetPlan", "not yet implemented - blocked-on-sibling", nil), - OK: false, + rows := s.pipelineBudgetPlanRows(time.Now().UTC()) + if len(rows) == 0 { + core.Print(nil, "no configured dispatch pools") + return core.Result{OK: true} } + + core.Print(nil, " %-12s %-6s %-12s %-12s %-8s %-11s %s", "POOL", "USED", "DAILY_LIMIT", "REMAINING", "RUNNING", "CONCURRENCY", "RESET") + for _, row := range rows { + core.Print(nil, " %-12s %-6d %-12s %-12s %-8d %-11s %s", + row.Pool, + row.UsedToday, + row.DailyLimit, + row.Remaining, + row.Running, + row.Concurrency, + row.ResetUTC, + ) + } + if s.stateStoreInstance() == nil && s.stateStoreErr() != nil { + core.Print(nil, " note: using %s fallback because .core/db.duckdb is unavailable", pipelineBudgetJournalPath()) + } + return core.Result{Value: rows, OK: true} } -func (s *PrepSubsystem) cmdPipelineBudgetLog(_ core.Options) core.Result { - core.Print(nil, "status: not yet implemented") - core.Print(nil, "reason: blocked-on-sibling") - return core.Result{ - Value: core.E("agentic.cmdPipelineBudgetLog", "not yet implemented - 
blocked-on-sibling", nil), - OK: false, +func (s *PrepSubsystem) cmdPipelineBudgetLog(options core.Options) core.Result { + repo := pipelineRepoValue(options) + agent := optionStringValue(options, "agent") + if repo == "" || agent == "" { + core.Print(nil, "usage: core-agent pipeline/budget/log --repo= --agent= [--model=] [--ticket=] [--status=started]") + return core.Result{Value: core.E("agentic.cmdPipelineBudgetLog", "repo and agent are required", nil), OK: false} } + + model := optionStringValue(options, "model") + if model == "" { + model = modelVariant(agent) + } + entry := pipelineBudgetEntry{ + Timestamp: time.Now().UTC().Format(time.RFC3339Nano), + Repo: repo, + Agent: baseAgent(agent), + Model: model, + Ticket: optionStringValue(options, "ticket"), + Status: optionStringValue(options, "status"), + } + if entry.Status == "" { + entry.Status = "started" + } + + if err := appendJSONLRecord(pipelineBudgetJournalPath(), entry); err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + s.pipelineBudgetMirrorToStore(entry) + + core.Print(nil, "timestamp: %s", entry.Timestamp) + core.Print(nil, "repo: %s", entry.Repo) + core.Print(nil, "agent: %s", entry.Agent) + core.Print(nil, "model: %s", firstNonEmpty(entry.Model, "-")) + core.Print(nil, "ticket: %s", firstNonEmpty(entry.Ticket, "-")) + core.Print(nil, "status: %s", entry.Status) + core.Print(nil, "journal: %s", pipelineBudgetJournalPath()) + return core.Result{Value: entry, OK: true} +} + +// rows := s.pipelineBudgetPlanRows(time.Now().UTC()) +func (s *PrepSubsystem) pipelineBudgetPlanRows(now time.Time) []pipelineBudgetPlanRow { + config := s.loadAgentsConfig() + pools := map[string]bool{} + for pool := range config.Concurrency { + pools[pool] = true + } + for pool := range config.Rates { + pools[pool] = true + } + if len(pools) == 0 { + return nil + } + + names := make([]string, 0, len(pools)) + for pool := range pools { + names = append(names, pool) + } + 
sort.Strings(names) + + entries := s.pipelineBudgetEntries() + rows := make([]pipelineBudgetPlanRow, 0, len(names)) + for _, pool := range names { + rate := config.Rates[pool] + windowStart, _ := pipelineBudgetWindow(rate.ResetUTC, now) + used := 0 + for _, entry := range entries { + if baseAgent(entry.Agent) != pool { + continue + } + timestamp, err := time.Parse(time.RFC3339Nano, entry.Timestamp) + if err != nil { + continue + } + if timestamp.Before(windowStart) { + continue + } + used++ + } + + row := pipelineBudgetPlanRow{ + Pool: pool, + UsedToday: used, + DailyLimit: "unlimited", + Remaining: "unlimited", + Running: s.countRunningByAgent(pool), + Concurrency: pipelineBudgetConcurrencyString(config.Concurrency[pool]), + ResetUTC: firstNonEmpty(rate.ResetUTC, "00:00"), + } + if rate.DailyLimit > 0 { + row.DailyLimit = core.Sprint(rate.DailyLimit) + remaining := rate.DailyLimit - used + if remaining < 0 { + remaining = 0 + } + row.Remaining = core.Sprint(remaining) + } + rows = append(rows, row) + } + return rows +} + +// entries := s.pipelineBudgetEntries() +func (s *PrepSubsystem) pipelineBudgetEntries() []pipelineBudgetEntry { + entries := []pipelineBudgetEntry{} + seen := map[string]bool{} + + s.stateStoreRestore(pipelineBudgetStoreGroup, func(_ string, value string) bool { + var entry pipelineBudgetEntry + if parseResult := core.JSONUnmarshalString(value, &entry); !parseResult.OK { + return true + } + if entry.Repo == "" || entry.Agent == "" || entry.Timestamp == "" { + return true + } + key := pipelineBudgetEntryKey(entry) + if !seen[key] { + seen[key] = true + entries = append(entries, entry) + } + return true + }) + + for _, line := range readJSONLLines(pipelineBudgetJournalPath()) { + var entry pipelineBudgetEntry + if parseResult := core.JSONUnmarshalString(line, &entry); !parseResult.OK { + continue + } + if entry.Repo == "" || entry.Agent == "" || entry.Timestamp == "" { + continue + } + key := pipelineBudgetEntryKey(entry) + if !seen[key] { + 
seen[key] = true + entries = append(entries, entry) + } + } + + return entries +} + +// _ = s.pipelineBudgetMirrorToStore(entry) +func (s *PrepSubsystem) pipelineBudgetMirrorToStore(entry pipelineBudgetEntry) { + key := pipelineBudgetEntryKey(entry) + s.stateStoreSet(pipelineBudgetStoreGroup, key, entry) +} + +// key := pipelineBudgetEntryKey(entry) +func pipelineBudgetEntryKey(entry pipelineBudgetEntry) string { + return core.Sprintf("%s|%s|%s|%s|%s|%s", entry.Timestamp, entry.Repo, entry.Agent, entry.Model, entry.Ticket, entry.Status) +} + +// start, end := pipelineBudgetWindow("06:00", time.Now().UTC()) +func pipelineBudgetWindow(resetUTC string, now time.Time) (time.Time, time.Time) { + resetHour, resetMinute := 0, 0 + parts := core.Split(resetUTC, ":") + if len(parts) >= 2 { + resetHour = parseIntString(parts[0]) + resetMinute = parseIntString(parts[1]) + } + windowStart := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMinute, 0, 0, time.UTC) + if now.Before(windowStart) { + windowStart = windowStart.AddDate(0, 0, -1) + } + return windowStart, windowStart.AddDate(0, 0, 1) +} + +// text := pipelineBudgetConcurrencyString(ConcurrencyLimit{Total: 2}) +func pipelineBudgetConcurrencyString(limit ConcurrencyLimit) string { + if limit.Total <= 0 { + return "unlimited" + } + return core.Sprint(limit.Total) } diff --git a/pkg/agentic/pipeline_budget_test.go b/pkg/agentic/pipeline_budget_test.go index 36b33c9..2585e16 100644 --- a/pkg/agentic/pipeline_budget_test.go +++ b/pkg/agentic/pipeline_budget_test.go @@ -4,6 +4,7 @@ package agentic import ( "testing" + "time" core "dappco.re/go/core" "github.com/stretchr/testify/assert" @@ -22,13 +23,104 @@ func TestPipelineBudget_Good_RootHelp(t *testing.T) { assert.Contains(t, output, "core-agent pipeline/budget/log") } -func TestPipelineBudget_Bad_PlanBlockedOnSibling(t *testing.T) { +func TestPipelineBudget_Good_PlanShowsRemainingBudgetByPool(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + require.True(t, 
fs.Write(core.JoinPath(CoreRoot(), "agents.yaml"), core.Concat( + "version: 1\n", + "concurrency:\n", + " codex: 1\n", + "rates:\n", + " codex:\n", + " reset_utc: \"00:00\"\n", + " daily_limit: 3\n", + " sustained_delay: 30\n", + )).OK) + + result := s.cmdPipelineBudgetLog(core.NewOptions( + core.Option{Key: "repo", Value: "go-io"}, + core.Option{Key: "agent", Value: "codex"}, + core.Option{Key: "model", Value: "gpt-5.4"}, + )) + require.True(t, result.OK) + + var rows []pipelineBudgetPlanRow + output := captureStdout(t, func() { + plan := s.cmdPipelineBudgetPlan(core.NewOptions()) + require.True(t, plan.OK) + var ok bool + rows, ok = plan.Value.([]pipelineBudgetPlanRow) + require.True(t, ok) + }) + + require.Len(t, rows, 1) + assert.Contains(t, output, "codex") + assert.Equal(t, 1, rows[0].UsedToday) + assert.Equal(t, "3", rows[0].DailyLimit) + assert.Equal(t, "2", rows[0].Remaining) +} + +func TestPipelineBudget_Good_LogAppendsJournal(t *testing.T) { s, _ := testPrepWithCore(t, nil) - result := s.cmdPipelineBudgetPlan(core.NewOptions()) + first := s.cmdPipelineBudgetLog(core.NewOptions( + core.Option{Key: "repo", Value: "go-io"}, + core.Option{Key: "agent", Value: "codex"}, + core.Option{Key: "model", Value: "gpt-5.4"}, + )) + second := s.cmdPipelineBudgetLog(core.NewOptions( + core.Option{Key: "repo", Value: "go-log"}, + core.Option{Key: "agent", Value: "claude"}, + core.Option{Key: "status", Value: "queued"}, + )) + + require.True(t, first.OK) + require.True(t, second.OK) + + lines := readJSONLLines(pipelineBudgetJournalPath()) + require.Len(t, lines, 2) + assert.Contains(t, lines[0], `"repo":"go-io"`) + assert.Contains(t, lines[1], `"repo":"go-log"`) +} + +func TestPipelineBudget_Bad_LogRequiresRepoAndAgent(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + + result := s.cmdPipelineBudgetLog(core.NewOptions( + core.Option{Key: "repo", Value: "go-io"}, + )) require.False(t, result.OK) err, ok := result.Value.(error) require.True(t, ok) - assert.Contains(t, 
err.Error(), "blocked-on-sibling") + assert.Contains(t, err.Error(), "repo and agent are required") +} + +func TestPipelineBudget_Ugly_PlanSkipsCorruptJournalRows(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + require.True(t, fs.Write(core.JoinPath(CoreRoot(), "agents.yaml"), core.Concat( + "version: 1\n", + "rates:\n", + " codex:\n", + " reset_utc: \"00:00\"\n", + " daily_limit: 2\n", + " sustained_delay: 30\n", + )).OK) + + valid := pipelineBudgetEntry{ + Timestamp: time.Now().UTC().Format(time.RFC3339), + Repo: "go-io", + Agent: "codex", + Model: "gpt-5.4", + Status: "started", + } + require.NoError(t, ensureParentDir(pipelineBudgetJournalPath())) + require.True(t, fs.WriteAtomic(pipelineBudgetJournalPath(), core.Concat( + "{not-json}\n", + core.JSONMarshalString(valid), + "\n", + )).OK) + + rows := s.pipelineBudgetPlanRows(time.Now().UTC()) + require.Len(t, rows, 1) + assert.Equal(t, 1, rows[0].UsedToday) } diff --git a/pkg/agentic/pipeline_training.go b/pkg/agentic/pipeline_training.go index a93ad2f..151af41 100644 --- a/pkg/agentic/pipeline_training.go +++ b/pkg/agentic/pipeline_training.go @@ -2,31 +2,201 @@ package agentic -import core "dappco.re/go/core" +import ( + "context" + "time" -func (s *PrepSubsystem) cmdPipelineTrainingCapture(_ core.Options) core.Result { - core.Print(nil, "status: not yet implemented") - core.Print(nil, "reason: blocked-on-sibling") - return core.Result{ - Value: core.E("agentic.cmdPipelineTrainingCapture", "not yet implemented - blocked-on-sibling", nil), - OK: false, + core "dappco.re/go/core" +) + +// input := PipelineTrainingCaptureInput{Org: "core", Repo: "go-io", Number: 42} +type PipelineTrainingCaptureInput struct { + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + Number int `json:"number"` +} + +// out := PipelineTrainingCaptureOutput{Success: true, Repo: "go-io", PRNumber: 42} +type PipelineTrainingCaptureOutput struct { + Success bool `json:"success"` + Org string `json:"org,omitempty"` + Repo 
string `json:"repo"` + PRNumber int `json:"pr_number"` + CodeRabbitFindings int `json:"coderabbit_findings"` + DiffSource string `json:"diff_source,omitempty"` + JournalPath string `json:"journal_path,omitempty"` +} + +// out := PipelineTrainingExportOutput{Success: true, Exported: 3, Path: "/tmp/.core/training/export.jsonl"} +type PipelineTrainingExportOutput struct { + Success bool `json:"success"` + Exported int `json:"exported"` + Path string `json:"path"` +} + +// checks, passing, failing := pipelineTrainingCheckCounts(meta.Checks) +func pipelineTrainingCheckCounts(checks []PipelineCheckMeta) (int, int, int) { + total := len(checks) + passing := 0 + failing := 0 + for _, check := range checks { + switch core.Lower(firstNonEmpty(check.Conclusion, check.Status)) { + case "success", "completed": + passing++ + case "failure", "error", "cancelled", "timed_out": + failing++ + } } + return total, passing, failing +} + +func (s *PrepSubsystem) cmdPipelineTrainingCapture(options core.Options) core.Result { + ctx := s.commandContext() + org, repo, number := pipelineRepoAndNumberValue(options) + if repo == "" || number <= 0 { + core.Print(nil, "usage: core-agent pipeline/training/capture --repo= [--org=core]") + return core.Result{Value: core.E("agentic.cmdPipelineTrainingCapture", "repo and pull request number are required", nil), OK: false} + } + + output, err := s.pipelineTrainingCapture(ctx, PipelineTrainingCaptureInput{Org: org, Repo: repo, Number: number}) + if err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + core.Print(nil, "pr: %s/%s#%d", output.Org, output.Repo, output.PRNumber) + core.Print(nil, "findings: %d", output.CodeRabbitFindings) + core.Print(nil, "diff: %s", output.DiffSource) + core.Print(nil, "journal: %s", output.JournalPath) + return core.Result{Value: output, OK: true} } func (s *PrepSubsystem) cmdPipelineTrainingStats(_ core.Options) core.Result { - core.Print(nil, "status: not yet implemented") 
- core.Print(nil, "reason: blocked-on-sibling") - return core.Result{ - Value: core.E("agentic.cmdPipelineTrainingStats", "not yet implemented - blocked-on-sibling", nil), - OK: false, + entries := readPipelineTrainingJournal(pipelineTrainingJournalPath()) + stats := summarisePipelineTraining(entries) + + core.Print(nil, "journal: %s", pipelineTrainingJournalPath()) + core.Print(nil, "total_prs: %d", stats.TotalPRs) + core.Print(nil, "zero_finding_prs: %d", stats.ZeroFindingPRs) + for _, repo := range sortedTrainingRepos(stats.ByRepo) { + core.Print(nil, " %-16s total=%d zero=%d", repo, stats.ByRepo[repo], stats.ByRepoZeroClean[repo]) } + + return core.Result{Value: stats, OK: true} } func (s *PrepSubsystem) cmdPipelineTrainingExport(_ core.Options) core.Result { - core.Print(nil, "status: not yet implemented") - core.Print(nil, "reason: blocked-on-sibling") - return core.Result{ - Value: core.E("agentic.cmdPipelineTrainingExport", "not yet implemented - blocked-on-sibling", nil), - OK: false, + entries := filterZeroFindingTrainingEntries(readPipelineTrainingJournal(pipelineTrainingJournalPath())) + path := pipelineTrainingExportPath() + if err := writePipelineTrainingExport(path, entries); err != nil { + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} } + output := PipelineTrainingExportOutput{ + Success: true, + Exported: len(entries), + Path: path, + } + core.Print(nil, "exported: %d", output.Exported) + core.Print(nil, "path: %s", output.Path) + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) pipelineTrainingCapture(ctx context.Context, input PipelineTrainingCaptureInput) (PipelineTrainingCaptureOutput, error) { + if input.Repo == "" || input.Number <= 0 { + return PipelineTrainingCaptureOutput{}, core.E("pipelineTrainingCapture", "repo and pull request number are required", nil) + } + if input.Org == "" { + input.Org = "core" + } + if s.forgeToken == "" || s.forgeURL == "" { + return 
PipelineTrainingCaptureOutput{}, core.E("pipelineTrainingCapture", "Forge access is required to capture PR training data", nil) + } + + reader := &pipelineForgeMetaReader{subsystem: s, org: input.Org} + meta, err := reader.GetPRMeta(ctx, input.Repo, input.Number) + if err != nil { + return PipelineTrainingCaptureOutput{}, err + } + if meta.State != "merged" { + return PipelineTrainingCaptureOutput{}, core.E("pipelineTrainingCapture", core.Concat("pull request is not merged: ", core.Sprint(input.Number)), nil) + } + + diff, diffSource, err := s.pipelineTrainingReadDiff(ctx, input.Org, input.Repo, input.Number, meta) + if err != nil { + return PipelineTrainingCaptureOutput{}, err + } + + checksTotal, checksPassing, checksFailing := pipelineTrainingCheckCounts(meta.Checks) + entry := PipelineTrainingEntry{ + CapturedAt: time.Now().UTC().Format(time.RFC3339), + Org: input.Org, + Repo: input.Repo, + PRNumber: meta.Number, + PRURL: meta.URL, + State: meta.State, + Mergeable: meta.Mergeable, + BaseBranch: meta.BaseBranch, + HeadBranch: meta.HeadBranch, + HeadSHA: meta.HeadSHA, + ChecksTotal: checksTotal, + ChecksPassing: checksPassing, + ChecksFailing: checksFailing, + ReviewThreadsTotal: meta.ThreadsTotal, + ReviewThreadsResolved: meta.ThreadsResolved, + CodeRabbitFindings: meta.ThreadsTotal, + FindingSource: "review_threads_total", + Diff: diff, + DiffSource: diffSource, + } + if err := appendJSONLRecord(pipelineTrainingJournalPath(), entry); err != nil { + return PipelineTrainingCaptureOutput{}, err + } + + return PipelineTrainingCaptureOutput{ + Success: true, + Org: input.Org, + Repo: input.Repo, + PRNumber: meta.Number, + CodeRabbitFindings: entry.CodeRabbitFindings, + DiffSource: diffSource, + JournalPath: pipelineTrainingJournalPath(), + }, nil +} + +// diff, source, err := s.pipelineTrainingReadDiff(ctx, "core", "go-io", 42, meta) +func (s *PrepSubsystem) pipelineTrainingReadDiff(ctx context.Context, org, repo string, number int, meta PipelinePRMeta) (string, 
string, error) { + url := core.Sprintf("%s/api/v1/repos/%s/%s/pulls/%d.diff", s.forgeURL, org, repo, number) + result := HTTPGet(ctx, url, s.forgeToken, "token") + if result.OK && core.Trim(resultText(result)) != "" { + return resultText(result), "forge.pull.diff", nil + } + return s.pipelineTrainingReadGitDiff(ctx, org, repo, meta) +} + +// diff, source, err := s.pipelineTrainingReadGitDiff(ctx, "core", "go-io", meta) +func (s *PrepSubsystem) pipelineTrainingReadGitDiff(ctx context.Context, org, repo string, meta PipelinePRMeta) (string, string, error) { + repoDir := s.localRepoDir(org, repo) + if repoDir == "" || !fs.Exists(repoDir) || fs.IsFile(repoDir) { + return "", "", core.E("pipelineTrainingReadGitDiff", "no local repo checkout and Forge diff endpoint was unavailable", nil) + } + + process := s.Core().Process() + if meta.HeadSHA != "" { + showResult := process.RunIn(ctx, repoDir, "git", "show", "--format=", meta.HeadSHA) + if showResult.OK && core.Trim(resultText(showResult)) != "" { + return resultText(showResult), "git.show", nil + } + } + + if meta.BaseBranch != "" && meta.HeadBranch != "" { + _ = process.RunIn(ctx, repoDir, "git", "fetch", "origin", meta.BaseBranch) + _ = process.RunIn(ctx, repoDir, "git", "fetch", "origin", meta.HeadBranch) + diffResult := process.RunIn(ctx, repoDir, "git", "diff", core.Concat("origin/", meta.BaseBranch), core.Concat("origin/", meta.HeadBranch)) + if diffResult.OK && core.Trim(resultText(diffResult)) != "" { + return resultText(diffResult), "git.diff", nil + } + } + + return "", "", core.E("pipelineTrainingReadGitDiff", "unable to resolve pull request diff", nil) } diff --git a/pkg/agentic/pipeline_training_test.go b/pkg/agentic/pipeline_training_test.go index 8c4c068..e05d5ab 100644 --- a/pkg/agentic/pipeline_training_test.go +++ b/pkg/agentic/pipeline_training_test.go @@ -3,7 +3,11 @@ package agentic import ( + "context" + "net/http" + "net/http/httptest" "testing" + "time" core "dappco.re/go/core" 
"github.com/stretchr/testify/assert" @@ -23,7 +27,97 @@ func TestPipelineTraining_Good_RootHelp(t *testing.T) { assert.Contains(t, output, "core-agent pipeline/training/export") } -func TestPipelineTraining_Bad_CaptureBlockedOnSibling(t *testing.T) { +func TestPipelineTraining_Good_CaptureWritesJournal(t *testing.T) { + srv := newTrainingTestServer(t, trainingTestServerConfig{ + Repo: "go-io", + Number: 7, + Merged: true, + State: "closed", + HeadSHA: "deadbeef", + HeadRef: "agent/fix-tests", + BaseRef: "dev", + ReviewThreadsTotal: 2, + ReviewThreadsResolved: 2, + Diff: "diff --git a/main.go b/main.go\n+package main\n", + }) + t.Cleanup(srv.Close) + + s, _ := testPrepWithCore(t, srv) + output := captureStdout(t, func() { + result := s.cmdPipelineTrainingCapture(core.NewOptions( + core.Option{Key: "_arg", Value: "7"}, + core.Option{Key: "repo", Value: "go-io"}, + )) + require.True(t, result.OK) + }) + + entries := readPipelineTrainingJournal(pipelineTrainingJournalPath()) + require.Len(t, entries, 1) + assert.Contains(t, output, "go-io#7") + assert.Equal(t, 2, entries[0].CodeRabbitFindings) + assert.Equal(t, "forge.pull.diff", entries[0].DiffSource) + assert.Contains(t, entries[0].Diff, "diff --git") +} + +func TestPipelineTraining_Good_StatsAggregatesJournal(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + require.NoError(t, appendJSONLRecord(pipelineTrainingJournalPath(), PipelineTrainingEntry{ + CapturedAt: "2026-04-25T12:00:00Z", + Repo: "go-io", + PRNumber: 7, + CodeRabbitFindings: 0, + })) + require.NoError(t, appendJSONLRecord(pipelineTrainingJournalPath(), PipelineTrainingEntry{ + CapturedAt: "2026-04-25T12:10:00Z", + Repo: "go-log", + PRNumber: 8, + CodeRabbitFindings: 3, + })) + + var stats PipelineTrainingStats + output := captureStdout(t, func() { + result := s.cmdPipelineTrainingStats(core.NewOptions()) + require.True(t, result.OK) + var ok bool + stats, ok = result.Value.(PipelineTrainingStats) + require.True(t, ok) + }) + + assert.Equal(t, 2, 
stats.TotalPRs) + assert.Equal(t, 1, stats.ZeroFindingPRs) + assert.Equal(t, 1, stats.ByRepo["go-io"]) + assert.Equal(t, 1, stats.ByRepo["go-log"]) + assert.Contains(t, output, "go-io") + assert.Contains(t, output, "go-log") +} + +func TestPipelineTraining_Good_ExportWritesZeroFindingOnly(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + require.NoError(t, appendJSONLRecord(pipelineTrainingJournalPath(), PipelineTrainingEntry{ + CapturedAt: "2026-04-25T12:00:00Z", + Repo: "go-io", + PRNumber: 7, + CodeRabbitFindings: 0, + Diff: "clean diff", + })) + require.NoError(t, appendJSONLRecord(pipelineTrainingJournalPath(), PipelineTrainingEntry{ + CapturedAt: "2026-04-25T12:10:00Z", + Repo: "go-log", + PRNumber: 8, + CodeRabbitFindings: 2, + Diff: "noisy diff", + })) + + result := s.cmdPipelineTrainingExport(core.NewOptions()) + require.True(t, result.OK) + + exported := readPipelineTrainingJournal(pipelineTrainingExportPath()) + require.Len(t, exported, 1) + assert.Equal(t, "go-io", exported[0].Repo) + assert.Equal(t, 0, exported[0].CodeRabbitFindings) +} + +func TestPipelineTraining_Bad_CaptureRequiresRepoAndNumber(t *testing.T) { s, _ := testPrepWithCore(t, nil) result := s.cmdPipelineTrainingCapture(core.NewOptions()) @@ -31,5 +125,168 @@ func TestPipelineTraining_Bad_CaptureBlockedOnSibling(t *testing.T) { require.False(t, result.OK) err, ok := result.Value.(error) require.True(t, ok) - assert.Contains(t, err.Error(), "blocked-on-sibling") + assert.Contains(t, err.Error(), "repo and pull request number are required") +} + +func TestPipelineTraining_Bad_CaptureRejectsOpenPullRequest(t *testing.T) { + srv := newTrainingTestServer(t, trainingTestServerConfig{ + Repo: "go-io", + Number: 7, + Merged: false, + State: "open", + HeadSHA: "deadbeef", + HeadRef: "agent/fix-tests", + BaseRef: "dev", + Diff: "diff --git a/main.go b/main.go\n", + }) + t.Cleanup(srv.Close) + + s, _ := testPrepWithCore(t, srv) + result := s.cmdPipelineTrainingCapture(core.NewOptions( + 
core.Option{Key: "_arg", Value: "7"}, + core.Option{Key: "repo", Value: "go-io"}, + )) + + require.False(t, result.OK) + err, ok := result.Value.(error) + require.True(t, ok) + assert.Contains(t, err.Error(), "pull request is not merged") +} + +func TestPipelineTraining_Ugly_CaptureFallsBackToGitShow(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + + c := core.New() + s := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}), + codePath: t.TempDir(), + forgeToken: "test-token", + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + + repoDir, headSHA := createTrainingGitRepo(t, c, s.codePath) + assert.NotEmpty(t, repoDir) + + srv := newTrainingTestServer(t, trainingTestServerConfig{ + Repo: "go-io", + Number: 9, + Merged: true, + State: "closed", + HeadSHA: headSHA, + HeadRef: "agent/feature", + BaseRef: "dev", + ReviewThreadsTotal: 0, + ReviewThreadsResolved: 0, + DiffStatus: http.StatusNotFound, + }) + t.Cleanup(srv.Close) + s.forgeURL = srv.URL + + output, err := s.pipelineTrainingCapture(context.Background(), PipelineTrainingCaptureInput{ + Org: "core", + Repo: "go-io", + Number: 9, + }) + require.NoError(t, err) + assert.Equal(t, "git.show", output.DiffSource) +} + +func TestPipelineTraining_Ugly_StatsSkipsCorruptJournalRows(t *testing.T) { + s, _ := testPrepWithCore(t, nil) + require.NoError(t, ensureParentDir(pipelineTrainingJournalPath())) + require.True(t, fs.WriteAtomic(pipelineTrainingJournalPath(), core.Concat( + "{not-json}\n", + core.JSONMarshalString(PipelineTrainingEntry{CapturedAt: "2026-04-25T12:00:00Z", Repo: "go-io", PRNumber: 7, CodeRabbitFindings: 0}), + "\n", + )).OK) + + result := s.cmdPipelineTrainingStats(core.NewOptions()) + require.True(t, result.OK) + stats, ok := result.Value.(PipelineTrainingStats) + require.True(t, ok) + assert.Equal(t, 1, stats.TotalPRs) +} + +type trainingTestServerConfig struct { + Repo string + Number int + Merged bool + State string + HeadSHA 
string + HeadRef string + BaseRef string + ReviewThreadsTotal int + ReviewThreadsResolved int + Diff string + DiffStatus int +} + +func newTrainingTestServer(t *testing.T, config trainingTestServerConfig) *httptest.Server { + t.Helper() + if config.DiffStatus == 0 { + config.DiffStatus = http.StatusOK + } + if config.State == "" { + config.State = "closed" + } + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case core.Sprintf("/api/v1/repos/core/%s/pulls/%d", config.Repo, config.Number): + payload := map[string]any{ + "number": config.Number, + "title": "Test PR", + "state": config.State, + "html_url": core.Sprintf("https://forge.test/core/%s/pulls/%d", config.Repo, config.Number), + "merged": config.Merged, + "mergeable": true, + "mergeable_state": "clean", + "review_threads_total": config.ReviewThreadsTotal, + "review_threads_resolved": config.ReviewThreadsResolved, + "review_comments": config.ReviewThreadsTotal, + "head": map[string]any{ + "ref": config.HeadRef, + "sha": config.HeadSHA, + "repo": map[string]any{ + "updated_at": "2026-04-25T12:00:00Z", + "pushed_at": "2026-04-25T12:00:00Z", + }, + }, + "base": map[string]any{ + "ref": config.BaseRef, + }, + } + _, _ = w.Write([]byte(core.JSONMarshalString(payload))) + case core.Sprintf("/api/v1/repos/core/%s/commits/%s/status", config.Repo, config.HeadSHA): + _, _ = w.Write([]byte(core.JSONMarshalString(map[string]any{ + "statuses": []map[string]any{ + {"context": "qa", "state": "success"}, + }, + }))) + case core.Sprintf("/api/v1/repos/core/%s/pulls/%d.diff", config.Repo, config.Number): + w.WriteHeader(config.DiffStatus) + if config.DiffStatus == http.StatusOK { + _, _ = w.Write([]byte(config.Diff)) + } + default: + w.WriteHeader(http.StatusNotFound) + } + })) +} + +func createTrainingGitRepo(t *testing.T, c *core.Core, codePath string) (string, string) { + t.Helper() + repoDir := core.JoinPath(codePath, "core", "go-io") + require.True(t, 
fs.EnsureDir(repoDir).OK) + require.True(t, c.Process().Run(context.Background(), "git", "init", "-b", "dev", repoDir).OK) + require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "config", "user.name", "Test").OK) + require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "config", "user.email", "test@example.com").OK) + require.True(t, fs.Write(core.JoinPath(repoDir, "main.go"), "package main\n\nfunc main() {}\n").OK) + require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "add", ".").OK) + require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "commit", "-m", "init").OK) + + head := c.Process().RunIn(context.Background(), repoDir, "git", "rev-parse", "HEAD") + require.True(t, head.OK) + return repoDir, core.Trim(resultText(head)) } diff --git a/pkg/agentic/training_journal.go b/pkg/agentic/training_journal.go new file mode 100644 index 0000000..43c8356 --- /dev/null +++ b/pkg/agentic/training_journal.go @@ -0,0 +1,188 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "sort" + + core "dappco.re/go/core" +) + +// entry := PipelineTrainingEntry{Repo: "go-io", PRNumber: 42, CodeRabbitFindings: 0} +type PipelineTrainingEntry struct { + CapturedAt string `json:"captured_at"` + Org string `json:"org,omitempty"` + Repo string `json:"repo"` + PRNumber int `json:"pr_number"` + PRURL string `json:"pr_url,omitempty"` + State string `json:"state,omitempty"` + Mergeable string `json:"mergeable,omitempty"` + BaseBranch string `json:"base_branch,omitempty"` + HeadBranch string `json:"head_branch,omitempty"` + HeadSHA string `json:"head_sha,omitempty"` + ChecksTotal int `json:"checks_total,omitempty"` + ChecksPassing int `json:"checks_passing,omitempty"` + ChecksFailing int `json:"checks_failing,omitempty"` + ReviewThreadsTotal int `json:"review_threads_total,omitempty"` + ReviewThreadsResolved int `json:"review_threads_resolved,omitempty"` + CodeRabbitFindings int 
`json:"coderabbit_findings"` + FindingSource string `json:"finding_source,omitempty"` + Diff string `json:"diff,omitempty"` + DiffSource string `json:"diff_source,omitempty"` +} + +// stats := PipelineTrainingStats{TotalPRs: 5, ZeroFindingPRs: 2, ByRepo: map[string]int{"go-io": 3}} +type PipelineTrainingStats struct { + TotalPRs int `json:"total_prs"` + ZeroFindingPRs int `json:"zero_finding_prs"` + ByRepo map[string]int `json:"by_repo,omitempty"` + ByRepoZeroClean map[string]int `json:"by_repo_zero_finding,omitempty"` +} + +// path := pipelineTrainingJournalPath() +func pipelineTrainingJournalPath() string { + return core.JoinPath(CoreRoot(), "training", "journal.jsonl") +} + +// path := pipelineTrainingExportPath() +func pipelineTrainingExportPath() string { + return core.JoinPath(CoreRoot(), "training", "export.jsonl") +} + +// _ = ensureParentDir("/tmp/.core/training/journal.jsonl") +func ensureParentDir(path string) error { + if ensureResult := fs.EnsureDir(core.PathDir(path)); !ensureResult.OK { + if err, ok := ensureResult.Value.(error); ok { + return core.E("agentic.ensureParentDir", "prepare journal directory", err) + } + return core.E("agentic.ensureParentDir", "prepare journal directory", nil) + } + return nil +} + +// _ = appendJSONLRecord("/tmp/test.jsonl", map[string]any{"repo": "go-io"}) +func appendJSONLRecord(path string, value any) error { + if err := ensureParentDir(path); err != nil { + return err + } + handle := fs.Append(path) + if !handle.OK { + if err, ok := handle.Value.(error); ok { + return core.E("agentic.appendJSONLRecord", "open journal", err) + } + return core.E("agentic.appendJSONLRecord", "open journal", nil) + } + if writeResult := core.WriteAll(handle.Value, core.Concat(core.JSONMarshalString(value), "\n")); !writeResult.OK { + if err, ok := writeResult.Value.(error); ok { + return core.E("agentic.appendJSONLRecord", "append journal entry", err) + } + return core.E("agentic.appendJSONLRecord", "append journal entry", nil) + } + 
return nil +} + +// lines := readJSONLLines("/tmp/test.jsonl") +func readJSONLLines(path string) []string { + readResult := fs.Read(path) + if !readResult.OK { + return nil + } + lines := []string{} + for _, line := range core.Split(readResult.Value.(string), "\n") { + trimmed := core.Trim(line) + if trimmed != "" { + lines = append(lines, trimmed) + } + } + return lines +} + +// entries := readPipelineTrainingJournal("/tmp/.core/training/journal.jsonl") +func readPipelineTrainingJournal(path string) []PipelineTrainingEntry { + lines := readJSONLLines(path) + if len(lines) == 0 { + return nil + } + + entries := make([]PipelineTrainingEntry, 0, len(lines)) + for _, line := range lines { + var entry PipelineTrainingEntry + if parseResult := core.JSONUnmarshalString(line, &entry); !parseResult.OK { + continue + } + if entry.Repo == "" || entry.PRNumber <= 0 { + continue + } + entries = append(entries, entry) + } + return entries +} + +// stats := summarisePipelineTraining(entries) +func summarisePipelineTraining(entries []PipelineTrainingEntry) PipelineTrainingStats { + stats := PipelineTrainingStats{ + ByRepo: map[string]int{}, + ByRepoZeroClean: map[string]int{}, + } + for _, entry := range entries { + stats.TotalPRs++ + stats.ByRepo[entry.Repo]++ + if entry.CodeRabbitFindings == 0 { + stats.ZeroFindingPRs++ + stats.ByRepoZeroClean[entry.Repo]++ + } + } + if len(stats.ByRepo) == 0 { + stats.ByRepo = nil + } + if len(stats.ByRepoZeroClean) == 0 { + stats.ByRepoZeroClean = nil + } + return stats +} + +// clean := filterZeroFindingTrainingEntries(entries) +func filterZeroFindingTrainingEntries(entries []PipelineTrainingEntry) []PipelineTrainingEntry { + if len(entries) == 0 { + return nil + } + clean := []PipelineTrainingEntry{} + for _, entry := range entries { + if entry.CodeRabbitFindings == 0 { + clean = append(clean, entry) + } + } + return clean +} + +// _ = writePipelineTrainingExport("/tmp/.core/training/export.jsonl", entries) +func 
writePipelineTrainingExport(path string, entries []PipelineTrainingEntry) error { + if err := ensureParentDir(path); err != nil { + return err + } + builder := core.NewBuilder() + for _, entry := range entries { + builder.WriteString(core.JSONMarshalString(entry)) + builder.WriteString("\n") + } + if writeResult := fs.WriteAtomic(path, builder.String()); !writeResult.OK { + if err, ok := writeResult.Value.(error); ok { + return core.E("agentic.writePipelineTrainingExport", "write export", err) + } + return core.E("agentic.writePipelineTrainingExport", "write export", nil) + } + return nil +} + +// names := sortedTrainingRepos(map[string]int{"go-log": 1, "go-io": 2}) +func sortedTrainingRepos(values map[string]int) []string { + if len(values) == 0 { + return nil + } + names := make([]string, 0, len(values)) + for name := range values { + names = append(names, name) + } + sort.Strings(names) + return names +} diff --git a/tests/cli/pipeline/Taskfile.yaml b/tests/cli/pipeline/Taskfile.yaml index 44a5ed4..43f06bd 100644 --- a/tests/cli/pipeline/Taskfile.yaml +++ b/tests/cli/pipeline/Taskfile.yaml @@ -10,6 +10,98 @@ tasks: go build -trimpath -ldflags="-s -w" -o bin/core-agent ../../../cmd/core-agent + workspace="$(mktemp -d)" + port_file="$(mktemp)" + trap '[[ -n "${server_pid:-}" ]] && kill "${server_pid}" 2>/dev/null || true' EXIT + + export CORE_WORKSPACE="$workspace" + export CORE_HOME="$workspace" + export DIR_HOME="$workspace" + export CODE_PATH="$workspace/code" + + mkdir -p "$CODE_PATH/core" + cat >"$workspace/agents.yaml" <<'YAML' + version: 1 + concurrency: + codex: 1 + rates: + codex: + reset_utc: "00:00" + daily_limit: 2 + sustained_delay: 30 + YAML + + python3 - "$port_file" <<'PY' & + import json + import sys + from http.server import BaseHTTPRequestHandler, HTTPServer + + port_file = sys.argv[1] + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path == "/api/v1/repos/core/go-io/pulls/7": + payload = { + "number": 7, + "title": "Test 
PR", + "state": "closed", + "html_url": "https://forge.test/core/go-io/pulls/7", + "merged": True, + "mergeable": True, + "mergeable_state": "clean", + "review_threads_total": 0, + "review_threads_resolved": 0, + "review_comments": 0, + "comments": 0, + "head": { + "ref": "agent/fix-tests", + "sha": "deadbeef", + "repo": { + "updated_at": "2026-04-25T12:00:00Z", + "pushed_at": "2026-04-25T12:00:00Z", + }, + }, + "base": {"ref": "dev"}, + } + self.send_response(200) + self.end_headers() + self.wfile.write(json.dumps(payload).encode()) + return + + if self.path == "/api/v1/repos/core/go-io/commits/deadbeef/status": + payload = {"statuses": [{"context": "qa", "state": "success"}]} + self.send_response(200) + self.end_headers() + self.wfile.write(json.dumps(payload).encode()) + return + + if self.path == "/api/v1/repos/core/go-io/pulls/7.diff": + self.send_response(200) + self.end_headers() + self.wfile.write(b"diff --git a/main.go b/main.go\n+package main\n") + return + + self.send_response(404) + self.end_headers() + + def log_message(self, *_args): + return + + server = HTTPServer(("127.0.0.1", 0), Handler) + with open(port_file, "w", encoding="utf-8") as handle: + handle.write(str(server.server_address[1])) + server.serve_forever() + PY + server_pid=$! 
+ + for _ in $(seq 1 50); do + [[ -s "$port_file" ]] && break + sleep 0.1 + done + + export FORGE_TOKEN="test-token" + export FORGE_URL="http://127.0.0.1:$(cat "$port_file")" + output="$(mktemp)" run_capture_all 0 "$output" ./bin/core-agent pipeline @@ -18,9 +110,28 @@ tasks: run_capture_all 1 "$output" ./bin/core-agent pipeline/audit assert_contains "pipeline/audit " "$output" - run_capture_all 1 "$output" ./bin/core-agent pipeline/budget/plan - assert_contains "blocked-on-sibling" "$output" + run_capture_all 0 "$output" ./bin/core-agent pipeline/budget/plan + assert_contains "codex" "$output" + grep -Eq 'codex[[:space:]]+0[[:space:]]+2[[:space:]]+2' "$output" - run_capture_all 1 "$output" ./bin/core-agent pipeline/training/capture - assert_contains "blocked-on-sibling" "$output" + run_capture_all 0 "$output" ./bin/core-agent pipeline/budget/log --repo=go-io --agent=codex --model=gpt-5.4 + assert_contains "journal:" "$output" + + run_capture_all 0 "$output" ./bin/core-agent pipeline/budget/plan + assert_contains "codex" "$output" + grep -Eq 'codex[[:space:]]+1[[:space:]]+2[[:space:]]+1' "$output" + + run_capture_all 0 "$output" ./bin/core-agent pipeline/training/capture 7 --repo=go-io + assert_contains "go-io#7" "$output" + + run_capture_all 0 "$output" ./bin/core-agent pipeline/training/stats + assert_contains "total_prs: 1" "$output" + assert_contains "zero_finding_prs: 1" "$output" + + run_capture_all 0 "$output" ./bin/core-agent pipeline/training/export + assert_contains "exported: 1" "$output" + + assert_contains '"repo":"go-io"' "$workspace/journal/dispatch.jsonl" + assert_contains '"pr_number":7' "$workspace/training/journal.jsonl" + assert_jq '.repo == "go-io" and .pr_number == 7 and .coderabbit_findings == 0' "$workspace/training/export.jsonl" EOF