fix(ax): align orchestration comments with usage examples

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-30 22:04:36 +00:00
parent 9997e7aecc
commit ce7c81a15b
4 changed files with 19 additions and 58 deletions

View file

@ -43,18 +43,14 @@ type ConcurrencyLimit struct {
Models map[string]int
}
// UnmarshalYAML handles both int and map forms for concurrency limits.
//
// var limit ConcurrencyLimit
// _ = yaml.Unmarshal([]byte("total: 2\ngpt-5.4: 1\n"), &limit)
// var limit ConcurrencyLimit
// _ = yaml.Unmarshal([]byte("total: 2\ngpt-5.4: 1\n"), &limit)
func (c *ConcurrencyLimit) UnmarshalYAML(value *yaml.Node) error {
// Try int first
var n int
if err := value.Decode(&n); err == nil {
c.Total = n
return nil
}
// Try map
var m map[string]int
if err := value.Decode(&m); err != nil {
return err
@ -79,7 +75,7 @@ type AgentsConfig struct {
Rates map[string]RateConfig `yaml:"rates"`
}
// loadAgentsConfig reads config/agents.yaml from the code path.
// config := s.loadAgentsConfig()
func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
paths := []string{
core.JoinPath(CoreRoot(), "agents.yaml"),
@ -110,10 +106,8 @@ func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
}
}
// delayForAgent calculates how long to wait before spawning the next task
// for a given agent type, based on rate config and time of day.
// delay := s.delayForAgent("codex:gpt-5.4")
func (s *PrepSubsystem) delayForAgent(agent string) time.Duration {
// Read from Core Config (loaded once at registration)
var rates map[string]RateConfig
if s.ServiceRuntime != nil {
rates, _ = s.Core().Config().Get("agents.rates").Value.(map[string]RateConfig)
@ -128,7 +122,6 @@ func (s *PrepSubsystem) delayForAgent(agent string) time.Duration {
return 0
}
// Parse reset time
resetHour, resetMin := 6, 0
parts := core.Split(rate.ResetUTC, ":")
if len(parts) >= 2 {
@ -143,18 +136,15 @@ func (s *PrepSubsystem) delayForAgent(agent string) time.Duration {
now := time.Now().UTC()
resetToday := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMin, 0, 0, time.UTC)
if now.Before(resetToday) {
// Reset hasn't happened yet today — reset was yesterday
resetToday = resetToday.AddDate(0, 0, -1)
}
nextReset := resetToday.AddDate(0, 0, 1)
hoursUntilReset := nextReset.Sub(now).Hours()
// Burst mode: if within burst window of reset, use burst delay
if rate.BurstWindow > 0 && hoursUntilReset <= float64(rate.BurstWindow) {
return time.Duration(rate.BurstDelay) * time.Second
}
// Sustained mode
return time.Duration(rate.SustainedDelay) * time.Second
}
@ -181,8 +171,6 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int {
return s.countRunningByAgentDisk(runtime, agent)
}
// countRunningByAgentDisk scans workspace status.json files on disk.
// Used only as fallback before Registry hydration completes.
func (s *PrepSubsystem) countRunningByAgentDisk(runtime *core.Core, agent string) int {
count := 0
for _, statusPath := range WorkspaceStatusPaths() {
@ -224,8 +212,6 @@ func (s *PrepSubsystem) countRunningByModel(agent string) int {
return s.countRunningByModelDisk(runtime, agent)
}
// countRunningByModelDisk scans workspace status.json files on disk.
// Used only as fallback before Registry hydration completes.
func (s *PrepSubsystem) countRunningByModelDisk(runtime *core.Core, agent string) int {
count := 0
for _, statusPath := range WorkspaceStatusPaths() {
@ -244,7 +230,7 @@ func (s *PrepSubsystem) countRunningByModelDisk(runtime *core.Core, agent string
return count
}
// baseAgent returns the agent name with any model variant removed,
// e.g. baseAgent("gemini:flash") == "gemini". An agent string without
// a ":" separator is returned unchanged.
func baseAgent(agent string) string {
	parts := core.SplitN(agent, ":", 2)
	return parts[0]
}
@ -304,8 +290,7 @@ func modelVariant(agent string) string {
return parts[1]
}
// drainQueue fills all available concurrency slots from queued workspaces.
// Serialised via c.Lock("drain") when Core is available, falls back to local mutex.
// s.drainQueue()
func (s *PrepSubsystem) drainQueue() {
if s.frozen {
return
@ -323,8 +308,7 @@ func (s *PrepSubsystem) drainQueue() {
}
}
// drainOne finds the oldest queued workspace and spawns it if a slot is available.
// Returns true if a task was spawned, false if nothing to do.
// spawned := s.drainOne()
func (s *PrepSubsystem) drainOne() bool {
for _, statusPath := range WorkspaceStatusPaths() {
workspaceDir := core.PathDir(statusPath)

View file

@ -9,13 +9,7 @@ import (
core "dappco.re/go/core"
)
// autoVerifyAndMerge runs inline tests (fast gate) and merges if they pass.
// If tests fail or merge fails due to conflict, attempts one rebase+retry.
// If the retry also fails, labels the PR "needs-review" for human attention.
//
// For deeper review (security, conventions), dispatch a separate task:
//
// agentic_dispatch repo=go-crypt template=verify persona=engineering/engineering-security-engineer
// s.autoVerifyAndMerge("/srv/core/workspace/core/go-io/task-5")
func (s *PrepSubsystem) autoVerifyAndMerge(workspaceDir string) {
result := ReadStatusResult(workspaceDir)
workspaceStatus, ok := workspaceStatusValue(result)
@ -34,7 +28,6 @@ func (s *PrepSubsystem) autoVerifyAndMerge(workspaceDir string) {
return
}
// markMerged is a helper to avoid repeating the status update.
markMerged := func() {
if result := ReadStatusResult(workspaceDir); result.OK {
st2, ok := workspaceStatusValue(result)
@ -46,14 +39,12 @@ func (s *PrepSubsystem) autoVerifyAndMerge(workspaceDir string) {
}
}
// Attempt 1: run tests and try to merge
mergeOutcome := s.attemptVerifyAndMerge(repoDir, org, workspaceStatus.Repo, workspaceStatus.Branch, pullRequestNumber)
if mergeOutcome == mergeSuccess {
markMerged()
return
}
// Attempt 2: rebase onto main and retry
if mergeOutcome == mergeConflict || mergeOutcome == testFailed {
if s.rebaseBranch(repoDir, workspaceStatus.Branch) {
if s.attemptVerifyAndMerge(repoDir, org, workspaceStatus.Repo, workspaceStatus.Branch, pullRequestNumber) == mergeSuccess {
@ -63,7 +54,6 @@ func (s *PrepSubsystem) autoVerifyAndMerge(workspaceDir string) {
}
}
// Both attempts failed — flag for human review
s.flagForReview(org, workspaceStatus.Repo, pullRequestNumber, mergeOutcome)
if result := ReadStatusResult(workspaceDir); result.OK {
@ -84,7 +74,7 @@ const (
mergeConflict // tests passed but merge failed (conflict)
)
// attemptVerifyAndMerge runs tests and tries to merge. Returns the outcome.
// s.attemptVerifyAndMerge("/srv/core/workspace/core/go-io/task-5/repo", "core", "go-io", "feature/ax-cleanup", 42)
func (s *PrepSubsystem) attemptVerifyAndMerge(repoDir, org, repo, branch string, pullRequestNumber int) mergeResult {
testResult := s.runVerification(repoDir)
@ -95,7 +85,6 @@ func (s *PrepSubsystem) attemptVerifyAndMerge(repoDir, org, repo, branch string,
return testFailed
}
// Tests passed — try merge
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
@ -110,7 +99,7 @@ func (s *PrepSubsystem) attemptVerifyAndMerge(repoDir, org, repo, branch string,
return mergeSuccess
}
// rebaseBranch rebases the current branch onto the default branch and force-pushes.
// s.rebaseBranch("/srv/core/workspace/core/go-io/task-5/repo", "feature/ax-cleanup")
func (s *PrepSubsystem) rebaseBranch(repoDir, branch string) bool {
ctx := context.Background()
process := s.Core().Process()
@ -139,22 +128,19 @@ func (s *PrepSubsystem) rebaseBranch(repoDir, branch string) bool {
return process.RunIn(ctx, repoDir, "git", "push", "--force-with-lease", forgeRemote, branch).OK
}
// flagForReview adds the "needs-review" label to the PR via Forge API.
// s.flagForReview("core", "go-io", 42, mergeConflict)
func (s *PrepSubsystem) flagForReview(org, repo string, pullRequestNumber int, mergeOutcome mergeResult) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
// Ensure the label exists
s.ensureLabel(ctx, org, repo, "needs-review", "e11d48")
// Add label to PR
payload := core.JSONMarshalString(map[string]any{
"labels": []int{s.getLabelID(ctx, org, repo, "needs-review")},
})
url := core.Sprintf("%s/api/v1/repos/%s/%s/issues/%d/labels", s.forgeURL, org, repo, pullRequestNumber)
HTTPPost(ctx, url, payload, s.forgeToken, "token")
// Comment explaining the situation
reason := "Tests failed after rebase"
if mergeOutcome == mergeConflict {
reason = "Merge conflict persists after rebase"
@ -163,7 +149,7 @@ func (s *PrepSubsystem) flagForReview(org, repo string, pullRequestNumber int, m
s.commentOnIssue(ctx, org, repo, pullRequestNumber, comment)
}
// ensureLabel creates a label if it doesn't exist.
// s.ensureLabel(context.Background(), "core", "go-io", "needs-review", "e11d48")
func (s *PrepSubsystem) ensureLabel(ctx context.Context, org, repo, name, colour string) {
payload := core.JSONMarshalString(map[string]string{
"name": name,
@ -173,7 +159,7 @@ func (s *PrepSubsystem) ensureLabel(ctx context.Context, org, repo, name, colour
HTTPPost(ctx, url, payload, s.forgeToken, "token")
}
// getLabelID fetches the ID of a label by name.
// s.getLabelID(context.Background(), "core", "go-io", "needs-review")
func (s *PrepSubsystem) getLabelID(ctx context.Context, org, repo, name string) int {
url := core.Sprintf("%s/api/v1/repos/%s/%s/labels", s.forgeURL, org, repo)
getResult := HTTPGet(ctx, url, s.forgeToken, "token")
@ -194,7 +180,6 @@ func (s *PrepSubsystem) getLabelID(ctx context.Context, org, repo, name string)
return 0
}
// verifyResult holds the outcome of running tests.
type verifyResult struct {
passed bool
output string
@ -212,7 +197,7 @@ func resultText(result core.Result) string {
return ""
}
// runVerification detects the project type and runs the appropriate test suite.
// s.runVerification("/srv/core/workspace/core/go-io/task-5/repo")
func (s *PrepSubsystem) runVerification(repoDir string) verifyResult {
if fileExists(core.JoinPath(repoDir, "go.mod")) {
return s.runGoTests(repoDir)
@ -277,7 +262,7 @@ func (s *PrepSubsystem) runNodeTests(repoDir string) verifyResult {
return verifyResult{passed: testResult.OK, output: out, exitCode: exitCode, testCmd: "npm test"}
}
// forgeMergePR merges a PR via the Forge API.
// s.forgeMergePR(context.Background(), "core", "go-io", 42)
func (s *PrepSubsystem) forgeMergePR(ctx context.Context, org, repo string, pullRequestNumber int) core.Result {
payload := core.JSONMarshalString(map[string]any{
"Do": "merge",
@ -289,7 +274,7 @@ func (s *PrepSubsystem) forgeMergePR(ctx context.Context, org, repo string, pull
return HTTPPost(ctx, url, payload, s.forgeToken, "token")
}
// extractPullRequestNumber gets the PR number from a Forge PR URL.
// extractPullRequestNumber("https://forge.lthn.ai/core/go-io/pulls/42")
func extractPullRequestNumber(pullRequestURL string) int {
parts := core.Split(pullRequestURL, "/")
if len(parts) == 0 {
@ -298,7 +283,6 @@ func extractPullRequestNumber(pullRequestURL string) int {
return parseInt(parts[len(parts)-1])
}
// fileExists reports whether path exists as a file, as determined by
// fs.IsFile.
func fileExists(name string) bool {
	return fs.IsFile(name)
}

View file

@ -19,7 +19,6 @@ type BrainProvider struct {
hub *ws.Hub
}
// compile-time interface checks
var (
_ provider.Provider = (*BrainProvider)(nil)
_ provider.Streamable = (*BrainProvider)(nil)

View file

@ -1,10 +1,9 @@
// SPDX-License-Identifier: EUPL-1.2
// Package runner is the agent dispatch service.
// Owns concurrency, queue drain, workspace lifecycle, and frozen state.
// Communicates with other services via Core IPC — Actions, Tasks, and Messages.
// Package runner owns agent dispatch and workspace lifecycle.
//
// core.New(core.WithService(runner.Register))
// service := runner.New()
// service.TrackWorkspace("core/go-io/task-5", &runner.WorkspaceStatus{Status: "running", Agent: "codex"})
package runner
import (
@ -84,7 +83,6 @@ func Register(coreApp *core.Core) core.Result {
func (s *Service) OnStartup(ctx context.Context) core.Result {
coreApp := s.Core()
// Actions — the runner's capability map
coreApp.Action("runner.dispatch", s.actionDispatch).Description = "Dispatch a subagent (checks frozen + concurrency)"
coreApp.Action("runner.status", s.actionStatus).Description = "Query workspace status"
coreApp.Action("runner.start", s.actionStart).Description = "Unfreeze dispatch queue"
@ -92,13 +90,10 @@ func (s *Service) OnStartup(ctx context.Context) core.Result {
coreApp.Action("runner.kill", s.actionKill).Description = "Kill all running agents (hard stop)"
coreApp.Action("runner.poke", s.actionPoke).Description = "Drain next queued task"
// Hydrate workspace registry from disk
s.hydrateWorkspaces()
// QUERY handler — workspace state queries
coreApp.RegisterQuery(s.handleWorkspaceQuery)
// Start the background queue runner
s.startRunner()
return core.Result{OK: true}
@ -411,7 +406,6 @@ func (s *Service) hydrateWorkspaces() {
if !ok || workspaceStatus == nil {
continue
}
// Re-queue running agents on restart — process is dead, re-dispatch
if workspaceStatus.Status == "running" {
workspaceStatus.Status = "queued"
}