From ce7c81a15bb7ab26f8671778b5c22fa930b7aa03 Mon Sep 17 00:00:00 2001 From: Virgil Date: Mon, 30 Mar 2026 22:04:36 +0000 Subject: [PATCH] fix(ax): align orchestration comments with usage examples Co-Authored-By: Virgil --- pkg/agentic/queue.go | 30 +++++++----------------------- pkg/agentic/verify.go | 34 +++++++++------------------------- pkg/brain/provider.go | 1 - pkg/runner/runner.go | 12 +++--------- 4 files changed, 19 insertions(+), 58 deletions(-) diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go index 36fc74e..e44bd20 100644 --- a/pkg/agentic/queue.go +++ b/pkg/agentic/queue.go @@ -43,18 +43,14 @@ type ConcurrencyLimit struct { Models map[string]int } -// UnmarshalYAML handles both int and map forms for concurrency limits. -// -// var limit ConcurrencyLimit -// _ = yaml.Unmarshal([]byte("total: 2\ngpt-5.4: 1\n"), &limit) +// var limit ConcurrencyLimit +// _ = yaml.Unmarshal([]byte("total: 2\ngpt-5.4: 1\n"), &limit) func (c *ConcurrencyLimit) UnmarshalYAML(value *yaml.Node) error { - // Try int first var n int if err := value.Decode(&n); err == nil { c.Total = n return nil } - // Try map var m map[string]int if err := value.Decode(&m); err != nil { return err @@ -79,7 +75,7 @@ type AgentsConfig struct { Rates map[string]RateConfig `yaml:"rates"` } -// loadAgentsConfig reads config/agents.yaml from the code path. +// config := s.loadAgentsConfig() func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig { paths := []string{ core.JoinPath(CoreRoot(), "agents.yaml"), @@ -110,10 +106,8 @@ func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig { } } -// delayForAgent calculates how long to wait before spawning the next task -// for a given agent type, based on rate config and time of day. +// delay := s.delayForAgent("codex:gpt-5.4") func (s *PrepSubsystem) delayForAgent(agent string) time.Duration { - // Read from Core Config (loaded once at registration) var rates map[string]RateConfig if s.ServiceRuntime != nil { rates, _ = s.Core().Config().Get("agents.rates").Value.(map[string]RateConfig) @@ -128,7 +122,6 @@ func (s *PrepSubsystem) delayForAgent(agent string) time.Duration { return 0 } - // Parse reset time resetHour, resetMin := 6, 0 parts := core.Split(rate.ResetUTC, ":") if len(parts) >= 2 { @@ -143,18 +136,15 @@ func (s *PrepSubsystem) delayForAgent(agent string) time.Duration { now := time.Now().UTC() resetToday := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMin, 0, 0, time.UTC) if now.Before(resetToday) { - // Reset hasn't happened yet today — reset was yesterday resetToday = resetToday.AddDate(0, 0, -1) } nextReset := resetToday.AddDate(0, 0, 1) hoursUntilReset := nextReset.Sub(now).Hours() - // Burst mode: if within burst window of reset, use burst delay if rate.BurstWindow > 0 && hoursUntilReset <= float64(rate.BurstWindow) { return time.Duration(rate.BurstDelay) * time.Second } - // Sustained mode return time.Duration(rate.SustainedDelay) * time.Second } @@ -181,8 +171,6 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int { return s.countRunningByAgentDisk(runtime, agent) } -// countRunningByAgentDisk scans workspace status.json files on disk. -// Used only as fallback before Registry hydration completes. func (s *PrepSubsystem) countRunningByAgentDisk(runtime *core.Core, agent string) int { count := 0 for _, statusPath := range WorkspaceStatusPaths() { @@ -224,8 +212,6 @@ func (s *PrepSubsystem) countRunningByModel(agent string) int { return s.countRunningByModelDisk(runtime, agent) } -// countRunningByModelDisk scans workspace status.json files on disk. -// Used only as fallback before Registry hydration completes. func (s *PrepSubsystem) countRunningByModelDisk(runtime *core.Core, agent string) int { count := 0 for _, statusPath := range WorkspaceStatusPaths() { @@ -244,7 +230,7 @@ func (s *PrepSubsystem) countRunningByModelDisk(runtime *core.Core, agent string return count } -// baseAgent strips the model variant (gemini:flash → gemini). +// base := baseAgent("gemini:flash") // "gemini" func baseAgent(agent string) string { return core.SplitN(agent, ":", 2)[0] } @@ -304,8 +290,7 @@ func modelVariant(agent string) string { return parts[1] } -// drainQueue fills all available concurrency slots from queued workspaces. -// Serialised via c.Lock("drain") when Core is available, falls back to local mutex. +// s.drainQueue() func (s *PrepSubsystem) drainQueue() { if s.frozen { return @@ -323,8 +308,7 @@ func (s *PrepSubsystem) drainQueue() { } } -// drainOne finds the oldest queued workspace and spawns it if a slot is available. -// Returns true if a task was spawned, false if nothing to do. +// spawned := s.drainOne() func (s *PrepSubsystem) drainOne() bool { for _, statusPath := range WorkspaceStatusPaths() { workspaceDir := core.PathDir(statusPath) diff --git a/pkg/agentic/verify.go b/pkg/agentic/verify.go index dfe3bc5..c40312d 100644 --- a/pkg/agentic/verify.go +++ b/pkg/agentic/verify.go @@ -9,13 +9,7 @@ import ( core "dappco.re/go/core" ) -// autoVerifyAndMerge runs inline tests (fast gate) and merges if they pass. -// If tests fail or merge fails due to conflict, attempts one rebase+retry. -// If the retry also fails, labels the PR "needs-review" for human attention. -// -// For deeper review (security, conventions), dispatch a separate task: -// -// agentic_dispatch repo=go-crypt template=verify persona=engineering/engineering-security-engineer +// s.autoVerifyAndMerge("/srv/core/workspace/core/go-io/task-5") func (s *PrepSubsystem) autoVerifyAndMerge(workspaceDir string) { result := ReadStatusResult(workspaceDir) workspaceStatus, ok := workspaceStatusValue(result) @@ -34,7 +28,6 @@ func (s *PrepSubsystem) autoVerifyAndMerge(workspaceDir string) { return } - // markMerged is a helper to avoid repeating the status update. markMerged := func() { if result := ReadStatusResult(workspaceDir); result.OK { st2, ok := workspaceStatusValue(result) @@ -46,14 +39,12 @@ func (s *PrepSubsystem) autoVerifyAndMerge(workspaceDir string) { } } - // Attempt 1: run tests and try to merge mergeOutcome := s.attemptVerifyAndMerge(repoDir, org, workspaceStatus.Repo, workspaceStatus.Branch, pullRequestNumber) if mergeOutcome == mergeSuccess { markMerged() return } - // Attempt 2: rebase onto main and retry if mergeOutcome == mergeConflict || mergeOutcome == testFailed { if s.rebaseBranch(repoDir, workspaceStatus.Branch) { if s.attemptVerifyAndMerge(repoDir, org, workspaceStatus.Repo, workspaceStatus.Branch, pullRequestNumber) == mergeSuccess { @@ -63,7 +54,6 @@ func (s *PrepSubsystem) autoVerifyAndMerge(workspaceDir string) { } } - // Both attempts failed — flag for human review s.flagForReview(org, workspaceStatus.Repo, pullRequestNumber, mergeOutcome) if result := ReadStatusResult(workspaceDir); result.OK { @@ -84,7 +74,7 @@ const ( mergeConflict // tests passed but merge failed (conflict) ) -// attemptVerifyAndMerge runs tests and tries to merge. Returns the outcome. +// s.attemptVerifyAndMerge("/srv/core/workspace/core/go-io/task-5/repo", "core", "go-io", "feature/ax-cleanup", 42) func (s *PrepSubsystem) attemptVerifyAndMerge(repoDir, org, repo, branch string, pullRequestNumber int) mergeResult { testResult := s.runVerification(repoDir) @@ -95,7 +85,6 @@ func (s *PrepSubsystem) attemptVerifyAndMerge(repoDir, org, repo, branch string, return testFailed } - // Tests passed — try merge ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -110,7 +99,7 @@ func (s *PrepSubsystem) attemptVerifyAndMerge(repoDir, org, repo, branch string, return mergeSuccess } -// rebaseBranch rebases the current branch onto the default branch and force-pushes. +// s.rebaseBranch("/srv/core/workspace/core/go-io/task-5/repo", "feature/ax-cleanup") func (s *PrepSubsystem) rebaseBranch(repoDir, branch string) bool { ctx := context.Background() process := s.Core().Process() @@ -139,22 +128,19 @@ func (s *PrepSubsystem) rebaseBranch(repoDir, branch string) bool { return process.RunIn(ctx, repoDir, "git", "push", "--force-with-lease", forgeRemote, branch).OK } -// flagForReview adds the "needs-review" label to the PR via Forge API. +// s.flagForReview("core", "go-io", 42, mergeConflict) func (s *PrepSubsystem) flagForReview(org, repo string, pullRequestNumber int, mergeOutcome mergeResult) { ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - // Ensure the label exists s.ensureLabel(ctx, org, repo, "needs-review", "e11d48") - // Add label to PR payload := core.JSONMarshalString(map[string]any{ "labels": []int{s.getLabelID(ctx, org, repo, "needs-review")}, }) url := core.Sprintf("%s/api/v1/repos/%s/%s/issues/%d/labels", s.forgeURL, org, repo, pullRequestNumber) HTTPPost(ctx, url, payload, s.forgeToken, "token") - // Comment explaining the situation reason := "Tests failed after rebase" if mergeOutcome == mergeConflict { reason = "Merge conflict persists after rebase" @@ -163,7 +149,7 @@ func (s *PrepSubsystem) flagForReview(org, repo string, pullRequestNumber int, m s.commentOnIssue(ctx, org, repo, pullRequestNumber, comment) } -// ensureLabel creates a label if it doesn't exist. +// s.ensureLabel(context.Background(), "core", "go-io", "needs-review", "e11d48") func (s *PrepSubsystem) ensureLabel(ctx context.Context, org, repo, name, colour string) { payload := core.JSONMarshalString(map[string]string{ "name": name, @@ -173,7 +159,7 @@ func (s *PrepSubsystem) ensureLabel(ctx context.Context, org, repo, name, colour HTTPPost(ctx, url, payload, s.forgeToken, "token") } -// getLabelID fetches the ID of a label by name. +// s.getLabelID(context.Background(), "core", "go-io", "needs-review") func (s *PrepSubsystem) getLabelID(ctx context.Context, org, repo, name string) int { url := core.Sprintf("%s/api/v1/repos/%s/%s/labels", s.forgeURL, org, repo) getResult := HTTPGet(ctx, url, s.forgeToken, "token") @@ -194,7 +180,6 @@ func (s *PrepSubsystem) getLabelID(ctx context.Context, org, repo, name string) return 0 } -// verifyResult holds the outcome of running tests. type verifyResult struct { passed bool output string @@ -212,7 +197,7 @@ func resultText(result core.Result) string { return "" } -// runVerification detects the project type and runs the appropriate test suite. +// s.runVerification("/srv/core/workspace/core/go-io/task-5/repo") func (s *PrepSubsystem) runVerification(repoDir string) verifyResult { if fileExists(core.JoinPath(repoDir, "go.mod")) { return s.runGoTests(repoDir) @@ -277,7 +262,7 @@ func (s *PrepSubsystem) runNodeTests(repoDir string) verifyResult { return verifyResult{passed: testResult.OK, output: out, exitCode: exitCode, testCmd: "npm test"} } -// forgeMergePR merges a PR via the Forge API. +// s.forgeMergePR(context.Background(), "core", "go-io", 42) func (s *PrepSubsystem) forgeMergePR(ctx context.Context, org, repo string, pullRequestNumber int) core.Result { payload := core.JSONMarshalString(map[string]any{ "Do": "merge", @@ -289,7 +274,7 @@ func (s *PrepSubsystem) forgeMergePR(ctx context.Context, org, repo string, pull return HTTPPost(ctx, url, payload, s.forgeToken, "token") } -// extractPullRequestNumber gets the PR number from a Forge PR URL. +// extractPullRequestNumber("https://forge.lthn.ai/core/go-io/pulls/42") func extractPullRequestNumber(pullRequestURL string) int { parts := core.Split(pullRequestURL, "/") if len(parts) == 0 { @@ -298,7 +283,6 @@ func extractPullRequestNumber(pullRequestURL string) int { return parseInt(parts[len(parts)-1]) } -// fileExists checks if a file exists. func fileExists(path string) bool { return fs.IsFile(path) } diff --git a/pkg/brain/provider.go b/pkg/brain/provider.go index 40a9252..b5de871 100644 --- a/pkg/brain/provider.go +++ b/pkg/brain/provider.go @@ -19,7 +19,6 @@ type BrainProvider struct { hub *ws.Hub } -// compile-time interface checks var ( _ provider.Provider = (*BrainProvider)(nil) _ provider.Streamable = (*BrainProvider)(nil) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index d254405..4608f58 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -1,10 +1,9 @@ // SPDX-License-Identifier: EUPL-1.2 -// Package runner is the agent dispatch service. -// Owns concurrency, queue drain, workspace lifecycle, and frozen state. -// Communicates with other services via Core IPC — Actions, Tasks, and Messages. +// Package runner owns agent dispatch and workspace lifecycle. // -// core.New(core.WithService(runner.Register)) +// service := runner.New() +// service.TrackWorkspace("core/go-io/task-5", &runner.WorkspaceStatus{Status: "running", Agent: "codex"}) package runner import ( @@ -84,7 +83,6 @@ func Register(coreApp *core.Core) core.Result { func (s *Service) OnStartup(ctx context.Context) core.Result { coreApp := s.Core() - // Actions — the runner's capability map coreApp.Action("runner.dispatch", s.actionDispatch).Description = "Dispatch a subagent (checks frozen + concurrency)" coreApp.Action("runner.status", s.actionStatus).Description = "Query workspace status" coreApp.Action("runner.start", s.actionStart).Description = "Unfreeze dispatch queue" @@ -92,13 +90,10 @@ func (s *Service) OnStartup(ctx context.Context) core.Result { coreApp.Action("runner.kill", s.actionKill).Description = "Kill all running agents (hard stop)" coreApp.Action("runner.poke", s.actionPoke).Description = "Drain next queued task" - // Hydrate workspace registry from disk s.hydrateWorkspaces() - // QUERY handler — workspace state queries coreApp.RegisterQuery(s.handleWorkspaceQuery) - // Start the background queue runner s.startRunner() return core.Result{OK: true} @@ -411,7 +406,6 @@ func (s *Service) hydrateWorkspaces() { if !ok || workspaceStatus == nil { continue } - // Re-queue running agents on restart — process is dead, re-dispatch if workspaceStatus.Status == "running" { workspaceStatus.Status = "queued" }