fix(ax): trim remaining implementation comments

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-31 05:28:26 +00:00
parent b0c0a7af0b
commit e8cb1f2d39
13 changed files with 72 additions and 181 deletions

View file

@ -21,7 +21,6 @@ func (s *PrepSubsystem) autoCreatePR(workspaceDir string) {
repoDir := WorkspaceRepoDir(workspaceDir)
process := s.Core().Process()
// PRs target dev — agents never merge directly to main
defaultBranch := "dev"
processResult := process.RunIn(ctx, repoDir, "git", "log", "--oneline", core.Concat("origin/", defaultBranch, "..HEAD"))
@ -40,7 +39,6 @@ func (s *PrepSubsystem) autoCreatePR(workspaceDir string) {
org = "core"
}
// Push the branch to forge
forgeRemote := core.Sprintf("ssh://git@forge.lthn.ai:2223/%s/%s.git", org, workspaceStatus.Repo)
if !process.RunIn(ctx, repoDir, "git", "push", forgeRemote, workspaceStatus.Branch).OK {
if result := ReadStatusResult(workspaceDir); result.OK {
@ -54,7 +52,6 @@ func (s *PrepSubsystem) autoCreatePR(workspaceDir string) {
return
}
// Create PR via Forge API
title := core.Sprintf("[agent/%s] %s", workspaceStatus.Agent, truncate(workspaceStatus.Task, 60))
body := s.buildAutoPRBody(workspaceStatus, commitCount)
@ -74,7 +71,6 @@ func (s *PrepSubsystem) autoCreatePR(workspaceDir string) {
return
}
// Update status with PR URL
if result := ReadStatusResult(workspaceDir); result.OK {
workspaceStatusUpdate, ok := workspaceStatusValue(result)
if !ok {

View file

@ -18,19 +18,19 @@ type workspaceTracker interface {
// input := agentic.DispatchInput{Repo: "go-io", Task: "Fix the failing tests", Agent: "codex", Issue: 15}
type DispatchInput struct {
Repo string `json:"repo"` // Target repo (e.g. "go-io")
Org string `json:"org,omitempty"` // Forge org (default "core")
Task string `json:"task"` // What the agent should do
Agent string `json:"agent,omitempty"` // "codex" (default), "claude", "gemini"
Template string `json:"template,omitempty"` // "conventions", "security", "coding" (default)
PlanTemplate string `json:"plan_template,omitempty"` // Plan template slug
Variables map[string]string `json:"variables,omitempty"` // Template variable substitution
Persona string `json:"persona,omitempty"` // Persona slug
Issue int `json:"issue,omitempty"` // Forge issue number → workspace: task-{num}/
PR int `json:"pr,omitempty"` // PR number → workspace: pr-{num}/
Branch string `json:"branch,omitempty"` // Branch → workspace: {branch}/
Tag string `json:"tag,omitempty"` // Tag → workspace: {tag}/ (immutable)
DryRun bool `json:"dry_run,omitempty"` // Preview without executing
Repo string `json:"repo"`
Org string `json:"org,omitempty"`
Task string `json:"task"`
Agent string `json:"agent,omitempty"`
Template string `json:"template,omitempty"`
PlanTemplate string `json:"plan_template,omitempty"`
Variables map[string]string `json:"variables,omitempty"`
Persona string `json:"persona,omitempty"`
Issue int `json:"issue,omitempty"`
PR int `json:"pr,omitempty"`
Branch string `json:"branch,omitempty"`
Tag string `json:"tag,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
}
// out := agentic.DispatchOutput{Success: true, Agent: "codex", Repo: "go-io", WorkspaceDir: ".core/workspace/core/go-io/task-15"}
@ -92,15 +92,12 @@ func agentCommandResult(agent, prompt string) core.Result {
return core.Result{Value: agentCommandResultValue{command: "gemini", args: args}, OK: true}
case "codex":
if model == "review" {
// Use exec with bypass — codex review subcommand has its own sandbox that blocks shell
// No -o flag — stdout captured by process output, ../.meta path unreliable in sandbox
return core.Result{Value: agentCommandResultValue{command: "codex", args: []string{
"exec",
"--dangerously-bypass-approvals-and-sandbox",
"Review the last 2 commits via git diff HEAD~2. Check for bugs, security issues, missing tests, naming issues. Report pass/fail with specifics. Do NOT make changes.",
}}, OK: true}
}
// Container IS the sandbox — let codex run unrestricted inside it
args := []string{
"exec",
"--dangerously-bypass-approvals-and-sandbox",
@ -133,9 +130,6 @@ func agentCommandResult(agent, prompt string) core.Result {
}
return core.Result{Value: agentCommandResultValue{command: "coderabbit", args: args}, OK: true}
case "local":
// Local model via codex --oss → Ollama. Default model: devstral-24b
// socat proxies localhost:11434 → host.docker.internal:11434
// because codex hardcodes localhost check for Ollama.
localModel := model
if localModel == "" {
localModel = "devstral-24b"
@ -163,46 +157,36 @@ func containerCommand(command string, args []string, repoDir, metaDir string) (s
dockerArgs := []string{
"run", "--rm",
// Host access for Ollama (local models)
"--add-host=host.docker.internal:host-gateway",
// Workspace: repo + meta
"-v", core.Concat(repoDir, ":/workspace"),
"-v", core.Concat(metaDir, ":/workspace/.meta"),
"-w", "/workspace",
// Auth: agent configs only — NO SSH keys, git push runs on host
"-v", core.Concat(core.JoinPath(home, ".codex"), ":/home/dev/.codex:ro"),
// API keys — passed by name, Docker resolves from host env
"-e", "OPENAI_API_KEY",
"-e", "ANTHROPIC_API_KEY",
"-e", "GEMINI_API_KEY",
"-e", "GOOGLE_API_KEY",
// Agent environment
"-e", "TERM=dumb",
"-e", "NO_COLOR=1",
"-e", "CI=true",
"-e", "GIT_USER_NAME=Virgil",
"-e", "GIT_USER_EMAIL=virgil@lethean.io",
// Go workspace — local modules bypass checksum verification
"-e", "GONOSUMCHECK=dappco.re/*,forge.lthn.ai/*",
"-e", "GOFLAGS=-mod=mod",
}
// Mount Claude config if dispatching claude agent
if command == "claude" {
dockerArgs = append(dockerArgs,
"-v", core.Concat(core.JoinPath(home, ".claude"), ":/home/dev/.claude:ro"),
)
}
// Mount Gemini config if dispatching gemini agent
if command == "gemini" {
dockerArgs = append(dockerArgs,
"-v", core.Concat(core.JoinPath(home, ".gemini"), ":/home/dev/.gemini:ro"),
)
}
// Wrap agent command in sh -c to chmod workspace after exit.
// Docker runs as a different user — without this, host can't delete workspace files.
quoted := core.NewBuilder()
quoted.WriteString(command)
for _, a := range args {
@ -252,15 +236,14 @@ func (s *PrepSubsystem) trackFailureRate(agent, status string, startedAt time.Ti
return true
}
} else {
s.failCount[pool] = 0 // slow failure = real failure, reset count
s.failCount[pool] = 0
}
} else {
s.failCount[pool] = 0 // success resets count
s.failCount[pool] = 0
}
return false
}
// s.startIssueTracking("/srv/.core/workspace/core/go-io/task-5")
func (s *PrepSubsystem) startIssueTracking(workspaceDir string) {
if s.forge == nil {
return
@ -277,7 +260,6 @@ func (s *PrepSubsystem) startIssueTracking(workspaceDir string) {
s.forge.Issues.StartStopwatch(context.Background(), org, workspaceStatus.Repo, int64(workspaceStatus.Issue))
}
// s.stopIssueTracking("/srv/.core/workspace/core/go-io/task-5")
func (s *PrepSubsystem) stopIssueTracking(workspaceDir string) {
if s.forge == nil {
return
@ -351,7 +333,6 @@ func (s *PrepSubsystem) onAgentComplete(agent, workspaceDir, outputFile string,
s.broadcastComplete(agent, workspaceDir, finalStatus)
// c.PerformAsync("agentic.complete", core.NewOptions(core.Option{Key: "workspace", Value: workspaceDir}))
if finalStatus == "completed" && s.ServiceRuntime != nil {
s.Core().PerformAsync("agentic.complete", core.NewOptions(
core.Option{Key: "workspace", Value: workspaceDir},
@ -370,10 +351,8 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, workspaceDir string) (int, str
metaDir := WorkspaceMetaDir(workspaceDir)
outputFile := agentOutputFile(workspaceDir, agent)
// Clean up stale BLOCKED.md from previous runs
fs.Delete(WorkspaceBlockedPath(workspaceDir))
// All agents run containerised
command, args = containerCommand(command, args, repoDir, metaDir)
procSvc, ok := core.ServiceFor[*process.Service](s.Core(), "process")
@ -397,8 +376,6 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, workspaceDir string) (int, str
s.broadcastStart(agent, workspaceDir)
s.startIssueTracking(workspaceDir)
// Register a one-shot Action that monitors this agent, then run it via PerformAsync.
// PerformAsync tracks it in Core's WaitGroup — ServiceShutdown waits for it.
monitorAction := core.Concat("agentic.monitor.", core.Replace(WorkspaceName(workspaceDir), "/", "."))
monitor := &agentCompletionMonitor{
service: s,
@ -419,8 +396,6 @@ type completionProcess interface {
Output() string
}
// monitor := &agentCompletionMonitor{service: s, agent: "codex", workspaceDir: workspaceDir, outputFile: outputFile, process: proc}
// s.Core().Action("agentic.monitor.core.go-io.task-5", monitor.run)
type agentCompletionMonitor struct {
service *PrepSubsystem
agent string
@ -443,7 +418,6 @@ func (m *agentCompletionMonitor) run(_ context.Context, _ core.Options) core.Res
return core.Result{OK: true}
}
// passed := s.runQA(workspaceDir)
func (s *PrepSubsystem) runQA(workspaceDir string) bool {
ctx := context.Background()
repoDir := WorkspaceRepoDir(workspaceDir)

View file

@ -29,7 +29,6 @@ type DispatchSyncResult struct {
// result := prep.DispatchSync(ctx, input)
func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInput) DispatchSyncResult {
// Prep workspace
prepInput := PrepInput{
Org: input.Org,
Repo: input.Repo,
@ -55,7 +54,6 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu
core.Print(nil, " workspace: %s", workspaceDir)
core.Print(nil, " branch: %s", prepOut.Branch)
// Spawn agent directly — no queue, no concurrency check
pid, processID, _, err := s.spawnAgent(input.Agent, prompt, workspaceDir)
if err != nil {
return DispatchSyncResult{Error: core.E("agentic.DispatchSync", "spawn agent failed", err)}
@ -69,7 +67,6 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu
runtime = s.Core()
}
// Poll for process exit
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
@ -79,7 +76,6 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu
return DispatchSyncResult{Error: core.E("agentic.DispatchSync", "cancelled", ctx.Err())}
case <-ticker.C:
if pid > 0 && !ProcessAlive(runtime, processID, pid) {
// Process exited — read final status
result := ReadStatusResult(workspaceDir)
st, ok := workspaceStatusValue(result)
if !ok {

View file

@ -10,9 +10,9 @@ import (
)
type MirrorInput struct {
Repo string `json:"repo,omitempty"` // Specific repo, or empty for all
DryRun bool `json:"dry_run,omitempty"` // Preview without pushing
MaxFiles int `json:"max_files,omitempty"` // Max files per PR (default 50, CodeRabbit limit)
Repo string `json:"repo,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
MaxFiles int `json:"max_files,omitempty"`
}
type MirrorOutput struct {
@ -52,7 +52,6 @@ func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, inpu
basePath = core.JoinPath(basePath, "core")
}
// Build list of repos to sync
var repos []string
if input.Repo != "" {
repos = []string{input.Repo}
@ -66,23 +65,19 @@ func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, inpu
for _, repo := range repos {
repoDir := core.JoinPath(basePath, repo)
// Check if github remote exists
if !s.hasRemote(repoDir, "github") {
skipped = append(skipped, core.Concat(repo, ": no github remote"))
continue
}
// Fetch github to get current state
process.RunIn(ctx, repoDir, "git", "fetch", "github")
// Check how far ahead local default branch is vs github
localBase := s.DefaultBranch(repoDir)
ahead := s.commitsAhead(repoDir, "github/main", localBase)
if ahead == 0 {
continue // Already in sync
continue
}
// Count files changed
files := s.filesChanged(repoDir, "github/main", localBase)
sync := MirrorSync{
@ -91,7 +86,6 @@ func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, inpu
FilesChanged: files,
}
// Skip if too many files for one PR
if files > maxFiles {
sync.Skipped = core.Sprintf("%d files exceeds limit of %d", files, maxFiles)
synced = append(synced, sync)
@ -104,10 +98,8 @@ func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, inpu
continue
}
// Ensure dev branch exists on GitHub
s.ensureDevBranch(repoDir)
// Push local main to github dev (explicit main, not HEAD)
base := s.DefaultBranch(repoDir)
if r := process.RunIn(ctx, repoDir, "git", "push", "github", core.Concat(base, ":refs/heads/dev"), "--force"); !r.OK {
sync.Skipped = core.Sprintf("push failed: %s", r.Value)
@ -116,7 +108,6 @@ func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, inpu
}
sync.Pushed = true
// Create PR: dev → main on GitHub
pullRequestURL, err := s.createGitHubPR(ctx, repoDir, repo, ahead, files)
if err != nil {
sync.Skipped = core.Sprintf("PR creation failed: %v", err)
@ -140,7 +131,6 @@ func (s *PrepSubsystem) createGitHubPR(ctx context.Context, repoDir, repo string
ghRepo := core.Sprintf("%s/%s", GitHubOrg(), repo)
process := s.Core().Process()
// Check if there's already an open PR from dev
r := process.RunIn(ctx, repoDir, "gh", "pr", "list", "--repo", ghRepo, "--head", "dev", "--state", "open", "--json", "url", "--limit", "1")
if r.OK {
out := r.Value.(string)
@ -209,7 +199,6 @@ func (s *PrepSubsystem) listLocalRepos(basePath string) []string {
if !fs.IsDir(p) {
continue
}
// Must have a .git directory
if fs.IsDir(core.JoinPath(basePath, name, ".git")) {
repos = append(repos, name)
}

View file

@ -15,7 +15,7 @@ import (
type Plan struct {
ID string `json:"id"`
Title string `json:"title"`
Status string `json:"status"` // draft, ready, in_progress, needs_verification, verified, approved
Status string `json:"status"`
Repo string `json:"repo,omitempty"`
Org string `json:"org,omitempty"`
Objective string `json:"objective"`
@ -30,7 +30,7 @@ type Plan struct {
type Phase struct {
Number int `json:"number"`
Name string `json:"name"`
Status string `json:"status"` // pending, in_progress, done
Status string `json:"status"`
Criteria []string `json:"criteria,omitempty"`
Tests int `json:"tests,omitempty"`
Notes string `json:"notes,omitempty"`
@ -144,7 +144,6 @@ func (s *PrepSubsystem) planCreate(_ context.Context, _ *mcp.CallToolRequest, in
UpdatedAt: time.Now(),
}
// Default phase status to pending
for i := range plan.Phases {
if plan.Phases[i].Status == "" {
plan.Phases[i].Status = "pending"
@ -216,7 +215,6 @@ func (s *PrepSubsystem) planUpdate(_ context.Context, _ *mcp.CallToolRequest, in
return nil, PlanUpdateOutput{}, core.E("planUpdate", "invalid plan payload", nil)
}
// Apply partial updates
if input.Status != "" {
if !validPlanStatus(input.Status) {
return nil, PlanUpdateOutput{}, core.E("planUpdate", core.Concat("invalid status: ", input.Status, " (valid: draft, ready, in_progress, needs_verification, verified, approved)"), nil)

View file

@ -12,11 +12,11 @@ import (
// input := agentic.CreatePRInput{Workspace: "core/go-io/task-42", Title: "Fix watcher panic"}
type CreatePRInput struct {
Workspace string `json:"workspace"` // workspace name (e.g. "core/go-io/task-42")
Title string `json:"title,omitempty"` // PR title (default: task description)
Body string `json:"body,omitempty"` // PR body (default: auto-generated)
Base string `json:"base,omitempty"` // base branch (default: "main")
DryRun bool `json:"dry_run,omitempty"` // preview without creating
Workspace string `json:"workspace"`
Title string `json:"title,omitempty"`
Body string `json:"body,omitempty"`
Base string `json:"base,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
}
// out := agentic.CreatePROutput{Success: true, PRURL: "https://forge.example/core/go-io/pulls/12", PRNum: 12}
@ -52,7 +52,6 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in
return nil, CreatePROutput{}, core.E("createPR", core.Concat("workspace not found: ", input.Workspace), nil)
}
// Read workspace status for repo, branch, issue context
result := ReadStatusResult(workspaceDir)
workspaceStatus, ok := workspaceStatusValue(result)
if !ok {
@ -81,7 +80,6 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in
base = "dev"
}
// Build PR title
title := input.Title
if title == "" {
title = workspaceStatus.Task
@ -90,7 +88,6 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in
title = core.Sprintf("Agent work on %s", workspaceStatus.Branch)
}
// Build PR body
body := input.Body
if body == "" {
body = s.buildPRBody(workspaceStatus)
@ -105,24 +102,20 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in
}, nil
}
// Push branch to Forge (origin is the local clone, not Forge)
forgeRemote := core.Sprintf("ssh://git@forge.lthn.ai:2223/%s/%s.git", org, workspaceStatus.Repo)
pushResult := s.Core().Process().RunIn(ctx, repoDir, "git", "push", forgeRemote, workspaceStatus.Branch)
if !pushResult.OK {
return nil, CreatePROutput{}, core.E("createPR", core.Concat("git push failed: ", pushResult.Value.(string)), nil)
}
// Create PR via Forge API
pullRequestURL, pullRequestNumber, err := s.forgeCreatePR(ctx, org, workspaceStatus.Repo, workspaceStatus.Branch, base, title, body)
if err != nil {
return nil, CreatePROutput{}, core.E("createPR", "failed to create PR", err)
}
// Update status with PR URL
workspaceStatus.PRURL = pullRequestURL
writeStatusResult(workspaceDir, workspaceStatus)
// Comment on issue if tracked
if workspaceStatus.Issue > 0 {
comment := core.Sprintf("Pull request created: %s", pullRequestURL)
s.commentOnIssue(ctx, org, workspaceStatus.Repo, workspaceStatus.Issue, comment)

View file

@ -30,13 +30,13 @@ type PrepSubsystem struct {
brainKey string
codePath string
startupContext context.Context
dispatchMu sync.Mutex // serialises concurrency check + spawn
dispatchMu sync.Mutex
drainMu sync.Mutex
pokeCh chan struct{}
frozen bool
backoff map[string]time.Time // pool → paused until
failCount map[string]int // pool → consecutive fast failures
workspaces *core.Registry[*WorkspaceStatus] // in-memory workspace state
backoff map[string]time.Time
failCount map[string]int
workspaces *core.Registry[*WorkspaceStatus]
}
var _ coremcp.Subsystem = (*PrepSubsystem)(nil)
@ -83,19 +83,10 @@ func (s *PrepSubsystem) SetCore(c *core.Core) {
func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
c := s.Core()
// Entitlement — gates agentic Actions when queue is frozen.
// Per-agent concurrency is checked inside handlers (needs Options for agent name).
// Entitlement gates the global capability: "can this Core dispatch at all?"
//
// e := c.Entitled("agentic.dispatch")
// e.Allowed // false when frozen
// e.Reason // "agent queue is frozen"
c.SetEntitlementChecker(func(action string, qty int, _ context.Context) core.Entitlement {
// Only gate agentic.* actions
if !core.HasPrefix(action, "agentic.") {
return core.Entitlement{Allowed: true, Unlimited: true}
}
// Read-only + internal actions always allowed
if core.HasPrefix(action, "agentic.monitor.") || core.HasPrefix(action, "agentic.complete") {
return core.Entitlement{Allowed: true, Unlimited: true}
}
@ -105,20 +96,14 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
"agentic.prompt", "agentic.task", "agentic.flow", "agentic.persona":
return core.Entitlement{Allowed: true, Unlimited: true}
}
// Write actions gated by frozen state
if s.frozen {
return core.Entitlement{Allowed: false, Reason: "agent queue is frozen — shutting down"}
}
return core.Entitlement{Allowed: true}
})
// Data — mount embedded content so other services can access it via c.Data()
//
// c.Data().ReadString("prompts/coding.md")
// c.Data().ListNames("flows")
lib.MountData(c)
// Transport — register HTTP protocol + Drive endpoints
RegisterHTTPTransport(c)
c.Drive().New(core.NewOptions(
core.Option{Key: "name", Value: "forge"},
@ -131,7 +116,6 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
core.Option{Key: "token", Value: s.brainKey},
))
// Dispatch & workspace
c.Action("agentic.dispatch", s.handleDispatch).Description = "Prep workspace and spawn a subagent"
c.Action("agentic.prep", s.handlePrep).Description = "Clone repo and build agent prompt"
c.Action("agentic.status", s.handleStatus).Description = "List workspace states (running/completed/blocked)"
@ -139,7 +123,6 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
c.Action("agentic.scan", s.handleScan).Description = "Scan Forge repos for actionable issues"
c.Action("agentic.watch", s.handleWatch).Description = "Watch workspace for changes and report"
// Pipeline
c.Action("agentic.qa", s.handleQA).Description = "Run build + test QA checks on workspace"
c.Action("agentic.auto-pr", s.handleAutoPR).Description = "Create PR from completed workspace"
c.Action("agentic.verify", s.handleVerify).Description = "Verify PR and auto-merge if clean"
@ -147,7 +130,6 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
c.Action("agentic.poke", s.handlePoke).Description = "Drain next queued task from the queue"
c.Action("agentic.mirror", s.handleMirror).Description = "Mirror agent branches to GitHub"
// Forge
c.Action("agentic.issue.get", s.handleIssueGet).Description = "Get a Forge issue by number"
c.Action("agentic.issue.list", s.handleIssueList).Description = "List Forge issues for a repo"
c.Action("agentic.issue.create", s.handleIssueCreate).Description = "Create a Forge issue"
@ -155,19 +137,15 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
c.Action("agentic.pr.list", s.handlePRList).Description = "List Forge PRs for a repo"
c.Action("agentic.pr.merge", s.handlePRMerge).Description = "Merge a Forge PR"
// Review
c.Action("agentic.review-queue", s.handleReviewQueue).Description = "Run CodeRabbit review on completed workspaces"
// Epic
c.Action("agentic.epic", s.handleEpic).Description = "Create sub-issues from an epic plan"
// Content — accessible via IPC, no lib import needed
c.Action("agentic.prompt", s.handlePrompt).Description = "Read a system prompt by slug"
c.Action("agentic.task", s.handleTask).Description = "Read a task plan by slug"
c.Action("agentic.flow", s.handleFlow).Description = "Read a build/release flow by slug"
c.Action("agentic.persona", s.handlePersona).Description = "Read a persona by path"
// Completion pipeline — Task composition
c.Task("agent.completion", core.Task{
Description: "QA → PR → Verify → Merge",
Steps: []core.Step{
@ -179,17 +157,10 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
},
})
// PerformAsync wrapper — runs the completion Task in background with progress tracking.
// c.PerformAsync("agentic.complete", options) broadcasts ActionTaskStarted/Completed.
c.Action("agentic.complete", s.handleComplete).Description = "Run completion pipeline (QA → PR → Verify) in background"
// Hydrate workspace registry from disk
s.hydrateWorkspaces()
// QUERY handler — "what workspaces exist?"
//
// r := c.QUERY(agentic.WorkspaceQuery{})
// if r.OK { workspaces := r.Value.(*core.Registry[*WorkspaceStatus]) }
c.RegisterQuery(s.handleWorkspaceQuery)
s.StartRunner()

View file

@ -10,14 +10,14 @@ import (
// input := agentic.RemoteDispatchInput{Host: "charon", Repo: "go-io", Task: "Run the review queue"}
type RemoteDispatchInput struct {
Host string `json:"host"` // Remote agent host (e.g. "charon", "10.69.69.165:9101")
Repo string `json:"repo"` // Target repo
Task string `json:"task"` // What the agent should do
Agent string `json:"agent,omitempty"` // Agent type (default: claude:opus)
Template string `json:"template,omitempty"` // Prompt template
Persona string `json:"persona,omitempty"` // Persona slug
Org string `json:"org,omitempty"` // Forge org (default: core)
Variables map[string]string `json:"variables,omitempty"` // Template variables
Host string `json:"host"`
Repo string `json:"repo"`
Task string `json:"task"`
Agent string `json:"agent,omitempty"`
Template string `json:"template,omitempty"`
Persona string `json:"persona,omitempty"`
Org string `json:"org,omitempty"`
Variables map[string]string `json:"variables,omitempty"`
}
// out := agentic.RemoteDispatchOutput{Success: true, Host: "charon", Repo: "go-io", Agent: "claude:opus"}

View file

@ -9,7 +9,7 @@ import (
)
type RemoteStatusInput struct {
Host string `json:"host"` // Remote agent host (e.g. "charon")
Host string `json:"host"`
}
type RemoteStatusOutput struct {

View file

@ -11,10 +11,10 @@ import (
// input := agentic.ResumeInput{Workspace: "core/go-scm/task-42", Answer: "Use the existing queue config"}
type ResumeInput struct {
Workspace string `json:"workspace"` // workspace name (e.g. "core/go-scm/task-42")
Answer string `json:"answer,omitempty"` // answer to the blocked question (written to ANSWER.md)
Agent string `json:"agent,omitempty"` // override agent type (default: same as original)
DryRun bool `json:"dry_run,omitempty"` // preview without executing
Workspace string `json:"workspace"`
Answer string `json:"answer,omitempty"`
Agent string `json:"agent,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
}
// out := agentic.ResumeOutput{Success: true, Workspace: "core/go-scm/task-42", Agent: "codex"}
@ -42,12 +42,10 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
workspaceDir := core.JoinPath(WorkspaceRoot(), input.Workspace)
repoDir := WorkspaceRepoDir(workspaceDir)
// Verify workspace exists
if !fs.IsDir(core.JoinPath(repoDir, ".git")) {
return nil, ResumeOutput{}, core.E("resume", core.Concat("workspace not found: ", input.Workspace), nil)
}
// Read current status
result := ReadStatusResult(workspaceDir)
workspaceStatus, ok := workspaceStatusValue(result)
if !ok {
@ -59,13 +57,11 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
return nil, ResumeOutput{}, core.E("resume", core.Concat("workspace is ", workspaceStatus.Status, ", not resumable (must be blocked, failed, or completed)"), nil)
}
// Determine agent
agent := workspaceStatus.Agent
if input.Agent != "" {
agent = input.Agent
}
// Write ANSWER.md if answer provided
if input.Answer != "" {
answerPath := workspaceAnswerPath(workspaceDir)
content := core.Sprintf("# Answer\n\n%s\n", input.Answer)
@ -75,7 +71,6 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
}
}
// Build resume prompt — inline the task and answer, no file references
prompt := core.Concat("You are resuming previous work.\n\nORIGINAL TASK:\n", workspaceStatus.Task)
if input.Answer != "" {
prompt = core.Concat(prompt, "\n\nANSWER TO YOUR QUESTION:\n", input.Answer)
@ -91,13 +86,11 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
}, nil
}
// Spawn agent via go-process
pid, processID, _, err := s.spawnAgent(agent, prompt, workspaceDir)
if err != nil {
return nil, ResumeOutput{}, err
}
// Update status
workspaceStatus.Status = "running"
workspaceStatus.PID = pid
workspaceStatus.ProcessID = processID

View file

@ -13,10 +13,10 @@ import (
// input := agentic.ReviewQueueInput{Reviewer: "coderabbit", Limit: 4, DryRun: true}
type ReviewQueueInput struct {
Limit int `json:"limit,omitempty"` // Max PRs to process this run (default: 4)
Reviewer string `json:"reviewer,omitempty"` // "coderabbit" (default), "codex", or "both"
DryRun bool `json:"dry_run,omitempty"` // Preview without acting
LocalOnly bool `json:"local_only,omitempty"` // Run review locally, don't touch GitHub
Limit int `json:"limit,omitempty"`
Reviewer string `json:"reviewer,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
LocalOnly bool `json:"local_only,omitempty"`
}
// out := agentic.ReviewQueueOutput{Success: true, Processed: []agentic.ReviewResult{{Repo: "go-io", Verdict: "clean"}}}
@ -30,9 +30,9 @@ type ReviewQueueOutput struct {
// result := agentic.ReviewResult{Repo: "go-io", Verdict: "findings", Findings: 3, Action: "fix_dispatched"}
type ReviewResult struct {
Repo string `json:"repo"`
Verdict string `json:"verdict"` // clean, findings, rate_limited, error
Findings int `json:"findings"` // Number of findings (0 = clean)
Action string `json:"action"` // merged, fix_dispatched, skipped, waiting
Verdict string `json:"verdict"`
Findings int `json:"findings"`
Action string `json:"action"`
Detail string `json:"detail,omitempty"`
}
@ -72,7 +72,6 @@ func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest,
}
basePath = core.JoinPath(basePath, "core")
// Find repos with draft PRs (ahead of GitHub)
candidates := s.findReviewCandidates(basePath)
if len(candidates) == 0 {
return nil, ReviewQueueOutput{
@ -91,7 +90,6 @@ func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest,
continue
}
// Check rate limit from previous run
if rateInfo != nil && rateInfo.Limited && time.Now().Before(rateInfo.RetryAt) {
skipped = append(skipped, core.Concat(repo, " (rate limited)"))
continue
@ -104,7 +102,6 @@ func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest,
}
result := s.reviewRepo(ctx, repoDir, repo, reviewer, input.DryRun, input.LocalOnly)
// Parse rate limit from result
if result.Verdict == "rate_limited" {
retryAfter := parseRetryAfter(result.Detail)
rateInfo = &RateLimitInfo{
@ -112,7 +109,6 @@ func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest,
RetryAt: time.Now().Add(retryAfter),
Message: result.Detail,
}
// Don't count rate-limited as processed — save the slot
skipped = append(skipped, core.Concat(repo, " (rate limited: ", retryAfter.String(), ")"))
continue
}
@ -120,7 +116,6 @@ func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest,
processed = append(processed, result)
}
// Save rate limit state for next run
if rateInfo != nil {
s.saveRateLimitState(rateInfo)
}
@ -159,14 +154,12 @@ func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo, reviewer
result := ReviewResult{Repo: repo}
process := s.Core().Process()
// Check saved rate limit
if rl := s.loadRateLimitState(); rl != nil && rl.Limited && time.Now().Before(rl.RetryAt) {
result.Verdict = "rate_limited"
result.Detail = core.Sprintf("retry after %s", rl.RetryAt.Format(time.RFC3339))
return result
}
// Run reviewer CLI locally — use the reviewer passed from reviewQueue
if reviewer == "" {
reviewer = "coderabbit"
}
@ -174,24 +167,20 @@ func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo, reviewer
r := process.RunIn(ctx, repoDir, command, args...)
output, _ := r.Value.(string)
// Parse rate limit (both reviewers use similar patterns)
if core.Contains(output, "Rate limit exceeded") || core.Contains(output, "rate limit") {
result.Verdict = "rate_limited"
result.Detail = output
return result
}
// Parse error
if !r.OK && !core.Contains(output, "No findings") && !core.Contains(output, "no issues") {
result.Verdict = "error"
result.Detail = output
return result
}
// Store raw output for training data
s.storeReviewOutput(repoDir, repo, reviewer, output)
// Parse verdict
if core.Contains(output, "No findings") || core.Contains(output, "no issues") || core.Contains(output, "LGTM") {
result.Verdict = "clean"
result.Findings = 0
@ -206,14 +195,12 @@ func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo, reviewer
return result
}
// Push to GitHub and mark PR ready / merge
if err := s.pushAndMerge(ctx, repoDir, repo); err != nil {
result.Action = core.Concat("push failed: ", err.Error())
} else {
result.Action = "merged"
}
} else {
// Has findings — count them and dispatch fix agent
result.Verdict = "findings"
result.Findings = countFindings(output)
result.Detail = truncate(output, 500)
@ -250,7 +237,6 @@ func (s *PrepSubsystem) pushAndMerge(ctx context.Context, repoDir, repo string)
return core.E("pushAndMerge", core.Concat("push failed: ", r.Value.(string)), nil)
}
// Mark PR ready if draft
process.RunIn(ctx, repoDir, "gh", "pr", "ready", "--repo", core.Concat(GitHubOrg(), "/", repo))
if r := process.RunIn(ctx, repoDir, "gh", "pr", "merge", "--merge", "--delete-branch"); !r.OK {
@ -262,7 +248,6 @@ func (s *PrepSubsystem) pushAndMerge(ctx context.Context, repoDir, repo string)
// _ = s.dispatchFixFromQueue(ctx, "go-io", task)
func (s *PrepSubsystem) dispatchFixFromQueue(ctx context.Context, repo, task string) error {
// Use the dispatch system — creates workspace, spawns agent
input := DispatchInput{
Repo: repo,
Task: task,
@ -280,7 +265,6 @@ func (s *PrepSubsystem) dispatchFixFromQueue(ctx context.Context, repo, task str
// findings := countFindings(output)
func countFindings(output string) int {
// Count lines that look like findings
count := 0
for _, line := range core.Split(output, "\n") {
trimmed := core.Trim(line)
@ -291,7 +275,7 @@ func countFindings(output string) int {
}
}
if count == 0 && !core.Contains(output, "No findings") {
count = 1 // At least one finding if not clean
count = 1
}
return count
}
@ -310,7 +294,6 @@ func parseRetryAfter(message string) time.Duration {
}
return time.Duration(mins)*time.Minute + time.Duration(secs)*time.Second
}
// Default: 5 minutes
return 5 * time.Minute
}

View file

@ -13,28 +13,28 @@ import (
// result := ReadStatusResult(workspaceDir)
// if result.OK && result.Value.(*WorkspaceStatus).Status == "completed" { autoCreatePR(workspaceDir) }
type WorkspaceStatus struct {
Status string `json:"status"` // running, completed, blocked, failed
Agent string `json:"agent"` // gemini, claude, codex
Repo string `json:"repo"` // target repo
Org string `json:"org,omitempty"` // forge org (e.g. "core")
Task string `json:"task"` // task description
Branch string `json:"branch,omitempty"` // git branch name
Issue int `json:"issue,omitempty"` // forge issue number
PID int `json:"pid,omitempty"` // OS process ID (if running)
ProcessID string `json:"process_id,omitempty"` // go-process ID for managed lookup
StartedAt time.Time `json:"started_at"` // when dispatch started
UpdatedAt time.Time `json:"updated_at"` // last status change
Question string `json:"question,omitempty"` // from BLOCKED.md
Runs int `json:"runs"` // how many times dispatched/resumed
PRURL string `json:"pr_url,omitempty"` // pull request URL (after PR created)
Status string `json:"status"`
Agent string `json:"agent"`
Repo string `json:"repo"`
Org string `json:"org,omitempty"`
Task string `json:"task"`
Branch string `json:"branch,omitempty"`
Issue int `json:"issue,omitempty"`
PID int `json:"pid,omitempty"`
ProcessID string `json:"process_id,omitempty"`
StartedAt time.Time `json:"started_at"`
UpdatedAt time.Time `json:"updated_at"`
Question string `json:"question,omitempty"`
Runs int `json:"runs"`
PRURL string `json:"pr_url,omitempty"`
}
// r := c.QUERY(agentic.WorkspaceQuery{})
// if r.OK { reg := r.Value.(*core.Registry[*WorkspaceStatus]) }
// r := c.QUERY(agentic.WorkspaceQuery{Name: "core/go-io/task-5"})
type WorkspaceQuery struct {
Name string // specific workspace (empty = all)
Status string // filter by status (empty = all)
Name string
Status string
}
func writeStatus(workspaceDir string, status *WorkspaceStatus) error {
@ -103,9 +103,9 @@ func workspaceStatusValue(result core.Result) (*WorkspaceStatus, bool) {
// input := agentic.StatusInput{Workspace: "core/go-io/task-42", Limit: 50}
type StatusInput struct {
Workspace string `json:"workspace,omitempty"` // specific workspace name, or empty for all
Limit int `json:"limit,omitempty"` // max results (default 100)
Status string `json:"status,omitempty"` // filter: running, completed, failed, blocked
Workspace string `json:"workspace,omitempty"`
Limit int `json:"limit,omitempty"`
Status string `json:"status,omitempty"`
}
// out := agentic.StatusOutput{Total: 42, Running: 3, Queued: 10, Completed: 25}
@ -154,7 +154,6 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu
continue
}
// If status is "running", check whether the managed process is still alive.
if workspaceStatus.Status == "running" && (workspaceStatus.ProcessID != "" || workspaceStatus.PID > 0) {
if !ProcessAlive(runtime, workspaceStatus.ProcessID, workspaceStatus.PID) {
blockedPath := workspaceBlockedPath(workspaceDir)

View file

@ -18,7 +18,7 @@ type harvestResult struct {
repo string
branch string
files int
rejected string // non-empty if rejected (binary, too large, etc.)
rejected string
}
// summary := m.harvestCompleted()
@ -151,7 +151,6 @@ func (m *Subsystem) countUnpushed(repoDir, branch string) int {
base := m.defaultBranch(repoDir)
out := m.gitOutput(repoDir, "rev-list", "--count", core.Concat("origin/", base, "..", branch))
if out == "" {
// Fallback
out2 := m.gitOutput(repoDir, "log", "--oneline", core.Concat(base, "..", branch))
if out2 == "" {
return 0
@ -251,7 +250,7 @@ func updateStatus(workspaceDir, status, question string) {
if question != "" {
workspaceStatus["question"] = question
} else {
delete(workspaceStatus, "question") // clear stale question from previous state
delete(workspaceStatus, "question")
}
statusPath := agentic.WorkspaceStatusPath(workspaceDir)
if writeResult := fs.WriteAtomic(statusPath, core.JSONMarshalString(workspaceStatus)); !writeResult.OK {