diff --git a/pkg/agentic/auto_pr.go b/pkg/agentic/auto_pr.go index 992687b..93c9024 100644 --- a/pkg/agentic/auto_pr.go +++ b/pkg/agentic/auto_pr.go @@ -21,7 +21,6 @@ func (s *PrepSubsystem) autoCreatePR(workspaceDir string) { repoDir := WorkspaceRepoDir(workspaceDir) process := s.Core().Process() - // PRs target dev — agents never merge directly to main defaultBranch := "dev" processResult := process.RunIn(ctx, repoDir, "git", "log", "--oneline", core.Concat("origin/", defaultBranch, "..HEAD")) @@ -40,7 +39,6 @@ func (s *PrepSubsystem) autoCreatePR(workspaceDir string) { org = "core" } - // Push the branch to forge forgeRemote := core.Sprintf("ssh://git@forge.lthn.ai:2223/%s/%s.git", org, workspaceStatus.Repo) if !process.RunIn(ctx, repoDir, "git", "push", forgeRemote, workspaceStatus.Branch).OK { if result := ReadStatusResult(workspaceDir); result.OK { @@ -54,7 +52,6 @@ func (s *PrepSubsystem) autoCreatePR(workspaceDir string) { return } - // Create PR via Forge API title := core.Sprintf("[agent/%s] %s", workspaceStatus.Agent, truncate(workspaceStatus.Task, 60)) body := s.buildAutoPRBody(workspaceStatus, commitCount) @@ -74,7 +71,6 @@ func (s *PrepSubsystem) autoCreatePR(workspaceDir string) { return } - // Update status with PR URL if result := ReadStatusResult(workspaceDir); result.OK { workspaceStatusUpdate, ok := workspaceStatusValue(result) if !ok { diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index 5d673f4..6469e2c 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -18,19 +18,19 @@ type workspaceTracker interface { // input := agentic.DispatchInput{Repo: "go-io", Task: "Fix the failing tests", Agent: "codex", Issue: 15} type DispatchInput struct { - Repo string `json:"repo"` // Target repo (e.g. "go-io") - Org string `json:"org,omitempty"` // Forge org (default "core") - Task string `json:"task"` // What the agent should do - Agent string `json:"agent,omitempty"` // "codex" (default), "claude", "gemini" - Template string `json:"template,omitempty"` // "conventions", "security", "coding" (default) - PlanTemplate string `json:"plan_template,omitempty"` // Plan template slug - Variables map[string]string `json:"variables,omitempty"` // Template variable substitution - Persona string `json:"persona,omitempty"` // Persona slug - Issue int `json:"issue,omitempty"` // Forge issue number → workspace: task-{num}/ - PR int `json:"pr,omitempty"` // PR number → workspace: pr-{num}/ - Branch string `json:"branch,omitempty"` // Branch → workspace: {branch}/ - Tag string `json:"tag,omitempty"` // Tag → workspace: {tag}/ (immutable) - DryRun bool `json:"dry_run,omitempty"` // Preview without executing + Repo string `json:"repo"` + Org string `json:"org,omitempty"` + Task string `json:"task"` + Agent string `json:"agent,omitempty"` + Template string `json:"template,omitempty"` + PlanTemplate string `json:"plan_template,omitempty"` + Variables map[string]string `json:"variables,omitempty"` + Persona string `json:"persona,omitempty"` + Issue int `json:"issue,omitempty"` + PR int `json:"pr,omitempty"` + Branch string `json:"branch,omitempty"` + Tag string `json:"tag,omitempty"` + DryRun bool `json:"dry_run,omitempty"` } // out := agentic.DispatchOutput{Success: true, Agent: "codex", Repo: "go-io", WorkspaceDir: ".core/workspace/core/go-io/task-15"} @@ -92,15 +92,12 @@ func agentCommandResult(agent, prompt string) core.Result { return core.Result{Value: agentCommandResultValue{command: "gemini", args: args}, OK: true} case "codex": if model == "review" { - // Use exec with bypass — codex review subcommand has its own sandbox that blocks shell - // No -o flag — stdout captured by process output, ../.meta path unreliable in sandbox return core.Result{Value: agentCommandResultValue{command: "codex", args: []string{ "exec", "--dangerously-bypass-approvals-and-sandbox", "Review the last 2 commits via git diff HEAD~2. Check for bugs, security issues, missing tests, naming issues. Report pass/fail with specifics. Do NOT make changes.", }}, OK: true} } - // Container IS the sandbox — let codex run unrestricted inside it args := []string{ "exec", "--dangerously-bypass-approvals-and-sandbox", @@ -133,9 +130,6 @@ func agentCommandResult(agent, prompt string) core.Result { } return core.Result{Value: agentCommandResultValue{command: "coderabbit", args: args}, OK: true} case "local": - // Local model via codex --oss → Ollama. Default model: devstral-24b - // socat proxies localhost:11434 → host.docker.internal:11434 - // because codex hardcodes localhost check for Ollama. localModel := model if localModel == "" { localModel = "devstral-24b" @@ -163,46 +157,36 @@ func containerCommand(command string, args []string, repoDir, metaDir string) (s dockerArgs := []string{ "run", "--rm", - // Host access for Ollama (local models) "--add-host=host.docker.internal:host-gateway", - // Workspace: repo + meta "-v", core.Concat(repoDir, ":/workspace"), "-v", core.Concat(metaDir, ":/workspace/.meta"), "-w", "/workspace", - // Auth: agent configs only — NO SSH keys, git push runs on host "-v", core.Concat(core.JoinPath(home, ".codex"), ":/home/dev/.codex:ro"), - // API keys — passed by name, Docker resolves from host env "-e", "OPENAI_API_KEY", "-e", "ANTHROPIC_API_KEY", "-e", "GEMINI_API_KEY", "-e", "GOOGLE_API_KEY", - // Agent environment "-e", "TERM=dumb", "-e", "NO_COLOR=1", "-e", "CI=true", "-e", "GIT_USER_NAME=Virgil", "-e", "GIT_USER_EMAIL=virgil@lethean.io", - // Go workspace — local modules bypass checksum verification "-e", "GONOSUMCHECK=dappco.re/*,forge.lthn.ai/*", "-e", "GOFLAGS=-mod=mod", } - // Mount Claude config if dispatching claude agent if command == "claude" { dockerArgs = append(dockerArgs, "-v", core.Concat(core.JoinPath(home, ".claude"), ":/home/dev/.claude:ro"), ) } - // Mount Gemini config if dispatching gemini agent if command == "gemini" { dockerArgs = append(dockerArgs, "-v", core.Concat(core.JoinPath(home, ".gemini"), ":/home/dev/.gemini:ro"), ) } - // Wrap agent command in sh -c to chmod workspace after exit. - // Docker runs as a different user — without this, host can't delete workspace files. quoted := core.NewBuilder() quoted.WriteString(command) for _, a := range args { @@ -252,15 +236,14 @@ func (s *PrepSubsystem) trackFailureRate(agent, status string, startedAt time.Ti return true } } else { - s.failCount[pool] = 0 // slow failure = real failure, reset count + s.failCount[pool] = 0 } } else { - s.failCount[pool] = 0 // success resets count + s.failCount[pool] = 0 } return false } -// s.startIssueTracking("/srv/.core/workspace/core/go-io/task-5") func (s *PrepSubsystem) startIssueTracking(workspaceDir string) { if s.forge == nil { return @@ -277,7 +260,6 @@ func (s *PrepSubsystem) startIssueTracking(workspaceDir string) { s.forge.Issues.StartStopwatch(context.Background(), org, workspaceStatus.Repo, int64(workspaceStatus.Issue)) } -// s.stopIssueTracking("/srv/.core/workspace/core/go-io/task-5") func (s *PrepSubsystem) stopIssueTracking(workspaceDir string) { if s.forge == nil { return @@ -351,7 +333,6 @@ func (s *PrepSubsystem) onAgentComplete(agent, workspaceDir, outputFile string, s.broadcastComplete(agent, workspaceDir, finalStatus) - // c.PerformAsync("agentic.complete", core.NewOptions(core.Option{Key: "workspace", Value: workspaceDir})) if finalStatus == "completed" && s.ServiceRuntime != nil { s.Core().PerformAsync("agentic.complete", core.NewOptions( core.Option{Key: "workspace", Value: workspaceDir}, @@ -370,10 +351,8 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, workspaceDir string) (int, str metaDir := WorkspaceMetaDir(workspaceDir) outputFile := agentOutputFile(workspaceDir, agent) - // Clean up stale BLOCKED.md from previous runs fs.Delete(WorkspaceBlockedPath(workspaceDir)) - // All agents run containerised command, args = containerCommand(command, args, repoDir, metaDir) procSvc, ok := core.ServiceFor[*process.Service](s.Core(), "process") @@ -397,8 +376,6 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, workspaceDir string) (int, str s.broadcastStart(agent, workspaceDir) s.startIssueTracking(workspaceDir) - // Register a one-shot Action that monitors this agent, then run it via PerformAsync. - // PerformAsync tracks it in Core's WaitGroup — ServiceShutdown waits for it. monitorAction := core.Concat("agentic.monitor.", core.Replace(WorkspaceName(workspaceDir), "/", ".")) monitor := &agentCompletionMonitor{ service: s, @@ -419,8 +396,6 @@ type completionProcess interface { Output() string } -// monitor := &agentCompletionMonitor{service: s, agent: "codex", workspaceDir: workspaceDir, outputFile: outputFile, process: proc} -// s.Core().Action("agentic.monitor.core.go-io.task-5", monitor.run) type agentCompletionMonitor struct { service *PrepSubsystem agent string @@ -443,7 +418,6 @@ func (m *agentCompletionMonitor) run(_ context.Context, _ core.Options) core.Res return core.Result{OK: true} } -// passed := s.runQA(workspaceDir) func (s *PrepSubsystem) runQA(workspaceDir string) bool { ctx := context.Background() repoDir := WorkspaceRepoDir(workspaceDir) diff --git a/pkg/agentic/dispatch_sync.go b/pkg/agentic/dispatch_sync.go index a8fb15c..bade12e 100644 --- a/pkg/agentic/dispatch_sync.go +++ b/pkg/agentic/dispatch_sync.go @@ -29,7 +29,6 @@ type DispatchSyncResult struct { // result := prep.DispatchSync(ctx, input) func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInput) DispatchSyncResult { - // Prep workspace prepInput := PrepInput{ Org: input.Org, Repo: input.Repo, @@ -55,7 +54,6 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu core.Print(nil, " workspace: %s", workspaceDir) core.Print(nil, " branch: %s", prepOut.Branch) - // Spawn agent directly — no queue, no concurrency check pid, processID, _, err := s.spawnAgent(input.Agent, prompt, workspaceDir) if err != nil { return DispatchSyncResult{Error: core.E("agentic.DispatchSync", "spawn agent failed", err)} @@ -69,7 +67,6 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu runtime = s.Core() } - // Poll for process exit ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() @@ -79,7 +76,6 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu return DispatchSyncResult{Error: core.E("agentic.DispatchSync", "cancelled", ctx.Err())} case <-ticker.C: if pid > 0 && !ProcessAlive(runtime, processID, pid) { - // Process exited — read final status result := ReadStatusResult(workspaceDir) st, ok := workspaceStatusValue(result) if !ok { diff --git a/pkg/agentic/mirror.go b/pkg/agentic/mirror.go index 9ca5d34..50c7155 100644 --- a/pkg/agentic/mirror.go +++ b/pkg/agentic/mirror.go @@ -10,9 +10,9 @@ import ( ) type MirrorInput struct { - Repo string `json:"repo,omitempty"` // Specific repo, or empty for all - DryRun bool `json:"dry_run,omitempty"` // Preview without pushing - MaxFiles int `json:"max_files,omitempty"` // Max files per PR (default 50, CodeRabbit limit) + Repo string `json:"repo,omitempty"` + DryRun bool `json:"dry_run,omitempty"` + MaxFiles int `json:"max_files,omitempty"` } type MirrorOutput struct { @@ -52,7 +52,6 @@ func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, inpu basePath = core.JoinPath(basePath, "core") } - // Build list of repos to sync var repos []string if input.Repo != "" { repos = []string{input.Repo} @@ -66,23 +65,19 @@ func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, inpu for _, repo := range repos { repoDir := core.JoinPath(basePath, repo) - // Check if github remote exists if !s.hasRemote(repoDir, "github") { skipped = append(skipped, core.Concat(repo, ": no github remote")) continue } - // Fetch github to get current state process.RunIn(ctx, repoDir, "git", "fetch", "github") - // Check how far ahead local default branch is vs github localBase := s.DefaultBranch(repoDir) ahead := s.commitsAhead(repoDir, "github/main", localBase) if ahead == 0 { - continue // Already in sync + continue } - // Count files changed files := s.filesChanged(repoDir, "github/main", localBase) sync := MirrorSync{ @@ -91,7 +86,6 @@ func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, inpu FilesChanged: files, } - // Skip if too many files for one PR if files > maxFiles { sync.Skipped = core.Sprintf("%d files exceeds limit of %d", files, maxFiles) synced = append(synced, sync) @@ -104,10 +98,8 @@ func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, inpu continue } - // Ensure dev branch exists on GitHub s.ensureDevBranch(repoDir) - // Push local main to github dev (explicit main, not HEAD) base := s.DefaultBranch(repoDir) if r := process.RunIn(ctx, repoDir, "git", "push", "github", core.Concat(base, ":refs/heads/dev"), "--force"); !r.OK { sync.Skipped = core.Sprintf("push failed: %s", r.Value) @@ -116,7 +108,6 @@ func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, inpu } sync.Pushed = true - // Create PR: dev → main on GitHub pullRequestURL, err := s.createGitHubPR(ctx, repoDir, repo, ahead, files) if err != nil { sync.Skipped = core.Sprintf("PR creation failed: %v", err) @@ -140,7 +131,6 @@ func (s *PrepSubsystem) createGitHubPR(ctx context.Context, repoDir, repo string ghRepo := core.Sprintf("%s/%s", GitHubOrg(), repo) process := s.Core().Process() - // Check if there's already an open PR from dev r := process.RunIn(ctx, repoDir, "gh", "pr", "list", "--repo", ghRepo, "--head", "dev", "--state", "open", "--json", "url", "--limit", "1") if r.OK { out := r.Value.(string) @@ -209,7 +199,6 @@ func (s *PrepSubsystem) listLocalRepos(basePath string) []string { if !fs.IsDir(p) { continue } - // Must have a .git directory if fs.IsDir(core.JoinPath(basePath, name, ".git")) { repos = append(repos, name) } diff --git a/pkg/agentic/plan.go b/pkg/agentic/plan.go index 9970272..1293e7f 100644 --- a/pkg/agentic/plan.go +++ b/pkg/agentic/plan.go @@ -15,7 +15,7 @@ import ( type Plan struct { ID string `json:"id"` Title string `json:"title"` - Status string `json:"status"` // draft, ready, in_progress, needs_verification, verified, approved + Status string `json:"status"` Repo string `json:"repo,omitempty"` Org string `json:"org,omitempty"` Objective string `json:"objective"` @@ -30,7 +30,7 @@ type Plan struct { type Phase struct { Number int `json:"number"` Name string `json:"name"` - Status string `json:"status"` // pending, in_progress, done + Status string `json:"status"` Criteria []string `json:"criteria,omitempty"` Tests int `json:"tests,omitempty"` Notes string `json:"notes,omitempty"` @@ -144,7 +144,6 @@ func (s *PrepSubsystem) planCreate(_ context.Context, _ *mcp.CallToolRequest, in UpdatedAt: time.Now(), } - // Default phase status to pending for i := range plan.Phases { if plan.Phases[i].Status == "" { plan.Phases[i].Status = "pending" @@ -216,7 +215,6 @@ func (s *PrepSubsystem) planUpdate(_ context.Context, _ *mcp.CallToolRequest, in return nil, PlanUpdateOutput{}, core.E("planUpdate", "invalid plan payload", nil) } - // Apply partial updates if input.Status != "" { if !validPlanStatus(input.Status) { return nil, PlanUpdateOutput{}, core.E("planUpdate", core.Concat("invalid status: ", input.Status, " (valid: draft, ready, in_progress, needs_verification, verified, approved)"), nil) diff --git a/pkg/agentic/pr.go b/pkg/agentic/pr.go index 7a1bc90..10dab3a 100644 --- a/pkg/agentic/pr.go +++ b/pkg/agentic/pr.go @@ -12,11 +12,11 @@ import ( // input := agentic.CreatePRInput{Workspace: "core/go-io/task-42", Title: "Fix watcher panic"} type CreatePRInput struct { - Workspace string `json:"workspace"` // workspace name (e.g. "core/go-io/task-42") - Title string `json:"title,omitempty"` // PR title (default: task description) - Body string `json:"body,omitempty"` // PR body (default: auto-generated) - Base string `json:"base,omitempty"` // base branch (default: "main") - DryRun bool `json:"dry_run,omitempty"` // preview without creating + Workspace string `json:"workspace"` + Title string `json:"title,omitempty"` + Body string `json:"body,omitempty"` + Base string `json:"base,omitempty"` + DryRun bool `json:"dry_run,omitempty"` } // out := agentic.CreatePROutput{Success: true, PRURL: "https://forge.example/core/go-io/pulls/12", PRNum: 12} @@ -52,7 +52,6 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in return nil, CreatePROutput{}, core.E("createPR", core.Concat("workspace not found: ", input.Workspace), nil) } - // Read workspace status for repo, branch, issue context result := ReadStatusResult(workspaceDir) workspaceStatus, ok := workspaceStatusValue(result) if !ok { @@ -81,7 +80,6 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in base = "dev" } - // Build PR title title := input.Title if title == "" { title = workspaceStatus.Task @@ -90,7 +88,6 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in title = core.Sprintf("Agent work on %s", workspaceStatus.Branch) } - // Build PR body body := input.Body if body == "" { body = s.buildPRBody(workspaceStatus) @@ -105,24 +102,20 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in }, nil } - // Push branch to Forge (origin is the local clone, not Forge) forgeRemote := core.Sprintf("ssh://git@forge.lthn.ai:2223/%s/%s.git", org, workspaceStatus.Repo) pushResult := s.Core().Process().RunIn(ctx, repoDir, "git", "push", forgeRemote, workspaceStatus.Branch) if !pushResult.OK { return nil, CreatePROutput{}, core.E("createPR", core.Concat("git push failed: ", pushResult.Value.(string)), nil) } - // Create PR via Forge API pullRequestURL, pullRequestNumber, err := s.forgeCreatePR(ctx, org, workspaceStatus.Repo, workspaceStatus.Branch, base, title, body) if err != nil { return nil, CreatePROutput{}, core.E("createPR", "failed to create PR", err) } - // Update status with PR URL workspaceStatus.PRURL = pullRequestURL writeStatusResult(workspaceDir, workspaceStatus) - // Comment on issue if tracked if workspaceStatus.Issue > 0 { comment := core.Sprintf("Pull request created: %s", pullRequestURL) s.commentOnIssue(ctx, org, workspaceStatus.Repo, workspaceStatus.Issue, comment) diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 2ee75b3..dc31a0a 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -30,13 +30,13 @@ type PrepSubsystem struct { brainKey string codePath string startupContext context.Context - dispatchMu sync.Mutex // serialises concurrency check + spawn + dispatchMu sync.Mutex drainMu sync.Mutex pokeCh chan struct{} frozen bool - backoff map[string]time.Time // pool → paused until - failCount map[string]int // pool → consecutive fast failures - workspaces *core.Registry[*WorkspaceStatus] // in-memory workspace state + backoff map[string]time.Time + failCount map[string]int + workspaces *core.Registry[*WorkspaceStatus] } var _ coremcp.Subsystem = (*PrepSubsystem)(nil) @@ -83,19 +83,10 @@ func (s *PrepSubsystem) SetCore(c *core.Core) { func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { c := s.Core() - // Entitlement — gates agentic Actions when queue is frozen. - // Per-agent concurrency is checked inside handlers (needs Options for agent name). - // Entitlement gates the global capability: "can this Core dispatch at all?" - // - // e := c.Entitled("agentic.dispatch") - // e.Allowed // false when frozen - // e.Reason // "agent queue is frozen" c.SetEntitlementChecker(func(action string, qty int, _ context.Context) core.Entitlement { - // Only gate agentic.* actions if !core.HasPrefix(action, "agentic.") { return core.Entitlement{Allowed: true, Unlimited: true} } - // Read-only + internal actions always allowed if core.HasPrefix(action, "agentic.monitor.") || core.HasPrefix(action, "agentic.complete") { return core.Entitlement{Allowed: true, Unlimited: true} } @@ -105,20 +96,14 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { "agentic.prompt", "agentic.task", "agentic.flow", "agentic.persona": return core.Entitlement{Allowed: true, Unlimited: true} } - // Write actions gated by frozen state if s.frozen { return core.Entitlement{Allowed: false, Reason: "agent queue is frozen — shutting down"} } return core.Entitlement{Allowed: true} }) - // Data — mount embedded content so other services can access it via c.Data() - // - // c.Data().ReadString("prompts/coding.md") - // c.Data().ListNames("flows") lib.MountData(c) - // Transport — register HTTP protocol + Drive endpoints RegisterHTTPTransport(c) c.Drive().New(core.NewOptions( core.Option{Key: "name", Value: "forge"}, @@ -131,7 +116,6 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { core.Option{Key: "token", Value: s.brainKey}, )) - // Dispatch & workspace c.Action("agentic.dispatch", s.handleDispatch).Description = "Prep workspace and spawn a subagent" c.Action("agentic.prep", s.handlePrep).Description = "Clone repo and build agent prompt" c.Action("agentic.status", s.handleStatus).Description = "List workspace states (running/completed/blocked)" @@ -139,7 +123,6 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { c.Action("agentic.scan", s.handleScan).Description = "Scan Forge repos for actionable issues" c.Action("agentic.watch", s.handleWatch).Description = "Watch workspace for changes and report" - // Pipeline c.Action("agentic.qa", s.handleQA).Description = "Run build + test QA checks on workspace" c.Action("agentic.auto-pr", s.handleAutoPR).Description = "Create PR from completed workspace" c.Action("agentic.verify", s.handleVerify).Description = "Verify PR and auto-merge if clean" @@ -147,7 +130,6 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { c.Action("agentic.poke", s.handlePoke).Description = "Drain next queued task from the queue" c.Action("agentic.mirror", s.handleMirror).Description = "Mirror agent branches to GitHub" - // Forge c.Action("agentic.issue.get", s.handleIssueGet).Description = "Get a Forge issue by number" c.Action("agentic.issue.list", s.handleIssueList).Description = "List Forge issues for a repo" c.Action("agentic.issue.create", s.handleIssueCreate).Description = "Create a Forge issue" @@ -155,19 +137,15 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { c.Action("agentic.pr.list", s.handlePRList).Description = "List Forge PRs for a repo" c.Action("agentic.pr.merge", s.handlePRMerge).Description = "Merge a Forge PR" - // Review c.Action("agentic.review-queue", s.handleReviewQueue).Description = "Run CodeRabbit review on completed workspaces" - // Epic c.Action("agentic.epic", s.handleEpic).Description = "Create sub-issues from an epic plan" - // Content — accessible via IPC, no lib import needed c.Action("agentic.prompt", s.handlePrompt).Description = "Read a system prompt by slug" c.Action("agentic.task", s.handleTask).Description = "Read a task plan by slug" c.Action("agentic.flow", s.handleFlow).Description = "Read a build/release flow by slug" c.Action("agentic.persona", s.handlePersona).Description = "Read a persona by path" - // Completion pipeline — Task composition c.Task("agent.completion", core.Task{ Description: "QA → PR → Verify → Merge", Steps: []core.Step{ @@ -179,17 +157,10 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { }, }) - // PerformAsync wrapper — runs the completion Task in background with progress tracking. - // c.PerformAsync("agentic.complete", options) broadcasts ActionTaskStarted/Completed. c.Action("agentic.complete", s.handleComplete).Description = "Run completion pipeline (QA → PR → Verify) in background" - // Hydrate workspace registry from disk s.hydrateWorkspaces() - // QUERY handler — "what workspaces exist?" - // - // r := c.QUERY(agentic.WorkspaceQuery{}) - // if r.OK { workspaces := r.Value.(*core.Registry[*WorkspaceStatus]) } c.RegisterQuery(s.handleWorkspaceQuery) s.StartRunner() diff --git a/pkg/agentic/remote.go b/pkg/agentic/remote.go index af25ad9..a080a9c 100644 --- a/pkg/agentic/remote.go +++ b/pkg/agentic/remote.go @@ -10,14 +10,14 @@ import ( // input := agentic.RemoteDispatchInput{Host: "charon", Repo: "go-io", Task: "Run the review queue"} type RemoteDispatchInput struct { - Host string `json:"host"` // Remote agent host (e.g. "charon", "10.69.69.165:9101") - Repo string `json:"repo"` // Target repo - Task string `json:"task"` // What the agent should do - Agent string `json:"agent,omitempty"` // Agent type (default: claude:opus) - Template string `json:"template,omitempty"` // Prompt template - Persona string `json:"persona,omitempty"` // Persona slug - Org string `json:"org,omitempty"` // Forge org (default: core) - Variables map[string]string `json:"variables,omitempty"` // Template variables + Host string `json:"host"` + Repo string `json:"repo"` + Task string `json:"task"` + Agent string `json:"agent,omitempty"` + Template string `json:"template,omitempty"` + Persona string `json:"persona,omitempty"` + Org string `json:"org,omitempty"` + Variables map[string]string `json:"variables,omitempty"` } // out := agentic.RemoteDispatchOutput{Success: true, Host: "charon", Repo: "go-io", Agent: "claude:opus"} diff --git a/pkg/agentic/remote_status.go b/pkg/agentic/remote_status.go index 8ac5ed4..94f1b81 100644 --- a/pkg/agentic/remote_status.go +++ b/pkg/agentic/remote_status.go @@ -9,7 +9,7 @@ import ( ) type RemoteStatusInput struct { - Host string `json:"host"` // Remote agent host (e.g. "charon") + Host string `json:"host"` } type RemoteStatusOutput struct { diff --git a/pkg/agentic/resume.go b/pkg/agentic/resume.go index c4769dd..5c96b03 100644 --- a/pkg/agentic/resume.go +++ b/pkg/agentic/resume.go @@ -11,10 +11,10 @@ import ( // input := agentic.ResumeInput{Workspace: "core/go-scm/task-42", Answer: "Use the existing queue config"} type ResumeInput struct { - Workspace string `json:"workspace"` // workspace name (e.g. "core/go-scm/task-42") - Answer string `json:"answer,omitempty"` // answer to the blocked question (written to ANSWER.md) - Agent string `json:"agent,omitempty"` // override agent type (default: same as original) - DryRun bool `json:"dry_run,omitempty"` // preview without executing + Workspace string `json:"workspace"` + Answer string `json:"answer,omitempty"` + Agent string `json:"agent,omitempty"` + DryRun bool `json:"dry_run,omitempty"` } // out := agentic.ResumeOutput{Success: true, Workspace: "core/go-scm/task-42", Agent: "codex"} @@ -42,12 +42,10 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu workspaceDir := core.JoinPath(WorkspaceRoot(), input.Workspace) repoDir := WorkspaceRepoDir(workspaceDir) - // Verify workspace exists if !fs.IsDir(core.JoinPath(repoDir, ".git")) { return nil, ResumeOutput{}, core.E("resume", core.Concat("workspace not found: ", input.Workspace), nil) } - // Read current status result := ReadStatusResult(workspaceDir) workspaceStatus, ok := workspaceStatusValue(result) if !ok { @@ -59,13 +57,11 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu return nil, ResumeOutput{}, core.E("resume", core.Concat("workspace is ", workspaceStatus.Status, ", not resumable (must be blocked, failed, or completed)"), nil) } - // Determine agent agent := workspaceStatus.Agent if input.Agent != "" { agent = input.Agent } - // Write ANSWER.md if answer provided if input.Answer != "" { answerPath := workspaceAnswerPath(workspaceDir) content := core.Sprintf("# Answer\n\n%s\n", input.Answer) @@ -75,7 +71,6 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu } } - // Build resume prompt — inline the task and answer, no file references prompt := core.Concat("You are resuming previous work.\n\nORIGINAL TASK:\n", workspaceStatus.Task) if input.Answer != "" { prompt = core.Concat(prompt, "\n\nANSWER TO YOUR QUESTION:\n", input.Answer) @@ -91,13 +86,11 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu }, nil } - // Spawn agent via go-process pid, processID, _, err := s.spawnAgent(agent, prompt, workspaceDir) if err != nil { return nil, ResumeOutput{}, err } - // Update status workspaceStatus.Status = "running" workspaceStatus.PID = pid workspaceStatus.ProcessID = processID diff --git a/pkg/agentic/review_queue.go b/pkg/agentic/review_queue.go index 64d8d58..4b821c2 100644 --- a/pkg/agentic/review_queue.go +++ b/pkg/agentic/review_queue.go @@ -13,10 +13,10 @@ import ( // input := agentic.ReviewQueueInput{Reviewer: "coderabbit", Limit: 4, DryRun: true} type ReviewQueueInput struct { - Limit int `json:"limit,omitempty"` // Max PRs to process this run (default: 4) - Reviewer string `json:"reviewer,omitempty"` // "coderabbit" (default), "codex", or "both" - DryRun bool `json:"dry_run,omitempty"` // Preview without acting - LocalOnly bool `json:"local_only,omitempty"` // Run review locally, don't touch GitHub + Limit int `json:"limit,omitempty"` + Reviewer string `json:"reviewer,omitempty"` + DryRun bool `json:"dry_run,omitempty"` + LocalOnly bool `json:"local_only,omitempty"` } // out := agentic.ReviewQueueOutput{Success: true, Processed: []agentic.ReviewResult{{Repo: "go-io", Verdict: "clean"}}} @@ -30,9 +30,9 @@ type ReviewQueueOutput struct { // result := agentic.ReviewResult{Repo: "go-io", Verdict: "findings", Findings: 3, Action: "fix_dispatched"} type ReviewResult struct { Repo string `json:"repo"` - Verdict string `json:"verdict"` // clean, findings, rate_limited, error - Findings int `json:"findings"` // Number of findings (0 = clean) - Action string `json:"action"` // merged, fix_dispatched, skipped, waiting + Verdict string `json:"verdict"` + Findings int `json:"findings"` + Action string `json:"action"` Detail string `json:"detail,omitempty"` } @@ -72,7 +72,6 @@ func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest, } basePath = core.JoinPath(basePath, "core") - // Find repos with draft PRs (ahead of GitHub) candidates := s.findReviewCandidates(basePath) if len(candidates) == 0 { return nil, ReviewQueueOutput{ @@ -91,7 +90,6 @@ func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest, continue } - // Check rate limit from previous run if rateInfo != nil && rateInfo.Limited && time.Now().Before(rateInfo.RetryAt) { skipped = append(skipped, core.Concat(repo, " (rate limited)")) continue @@ -104,7 +102,6 @@ func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest, } result := s.reviewRepo(ctx, repoDir, repo, reviewer, input.DryRun, input.LocalOnly) - // Parse rate limit from result if result.Verdict == "rate_limited" { retryAfter := parseRetryAfter(result.Detail) rateInfo = &RateLimitInfo{ @@ -112,7 +109,6 @@ func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest, RetryAt: time.Now().Add(retryAfter), Message: result.Detail, } - // Don't count rate-limited as processed — save the slot skipped = append(skipped, core.Concat(repo, " (rate limited: ", retryAfter.String(), ")")) continue } @@ -120,7 +116,6 @@ func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest, processed = append(processed, result) } - // Save rate limit state for next run if rateInfo != nil { s.saveRateLimitState(rateInfo) } @@ -159,14 +154,12 @@ func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo, reviewer result := ReviewResult{Repo: repo} process := s.Core().Process() - // Check saved rate limit if rl := s.loadRateLimitState(); rl != nil && rl.Limited && time.Now().Before(rl.RetryAt) { result.Verdict = "rate_limited" result.Detail = core.Sprintf("retry after %s", rl.RetryAt.Format(time.RFC3339)) return result } - // Run reviewer CLI locally — use the reviewer passed from reviewQueue if reviewer == "" { reviewer = "coderabbit" } @@ -174,24 +167,20 @@ func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo, reviewer r := process.RunIn(ctx, repoDir, command, args...) output, _ := r.Value.(string) - // Parse rate limit (both reviewers use similar patterns) if core.Contains(output, "Rate limit exceeded") || core.Contains(output, "rate limit") { result.Verdict = "rate_limited" result.Detail = output return result } - // Parse error if !r.OK && !core.Contains(output, "No findings") && !core.Contains(output, "no issues") { result.Verdict = "error" result.Detail = output return result } - // Store raw output for training data s.storeReviewOutput(repoDir, repo, reviewer, output) - // Parse verdict if core.Contains(output, "No findings") || core.Contains(output, "no issues") || core.Contains(output, "LGTM") { result.Verdict = "clean" result.Findings = 0 @@ -206,14 +195,12 @@ func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo, reviewer return result } - // Push to GitHub and mark PR ready / merge if err := s.pushAndMerge(ctx, repoDir, repo); err != nil { result.Action = core.Concat("push failed: ", err.Error()) } else { result.Action = "merged" } } else { - // Has findings — count them and dispatch fix agent result.Verdict = "findings" result.Findings = countFindings(output) result.Detail = truncate(output, 500) @@ -250,7 +237,6 @@ func (s *PrepSubsystem) pushAndMerge(ctx context.Context, repoDir, repo string) return core.E("pushAndMerge", core.Concat("push failed: ", r.Value.(string)), nil) } - // Mark PR ready if draft process.RunIn(ctx, repoDir, "gh", "pr", "ready", "--repo", core.Concat(GitHubOrg(), "/", repo)) if r := process.RunIn(ctx, repoDir, "gh", "pr", "merge", "--merge", "--delete-branch"); !r.OK { @@ -262,7 +248,6 @@ func (s *PrepSubsystem) pushAndMerge(ctx context.Context, repoDir, repo string) // _ = s.dispatchFixFromQueue(ctx, "go-io", task) func (s *PrepSubsystem) dispatchFixFromQueue(ctx context.Context, repo, task string) error { - // Use the dispatch system — creates workspace, spawns agent input := DispatchInput{ Repo: repo, Task: task, @@ -280,7 +265,6 @@ func (s *PrepSubsystem) dispatchFixFromQueue(ctx context.Context, repo, task str // findings := countFindings(output) func countFindings(output string) int { - // Count lines that look like findings count := 0 for _, line := range core.Split(output, "\n") { trimmed := core.Trim(line) @@ -291,7 +275,7 @@ func countFindings(output string) int { } } if count == 0 && !core.Contains(output, "No findings") { - count = 1 // At least one finding if not clean + count = 1 } return count } @@ -310,7 +294,6 @@ func parseRetryAfter(message string) time.Duration { } return time.Duration(mins)*time.Minute + time.Duration(secs)*time.Second } - // Default: 5 minutes return 5 * time.Minute } diff --git a/pkg/agentic/status.go b/pkg/agentic/status.go index 5c5e3bb..c8c4b36 100644 --- a/pkg/agentic/status.go +++ b/pkg/agentic/status.go @@ -13,28 +13,28 @@ import ( // result := ReadStatusResult(workspaceDir) // if result.OK && result.Value.(*WorkspaceStatus).Status == "completed" { autoCreatePR(workspaceDir) } type WorkspaceStatus struct { - Status string `json:"status"` // running, completed, blocked, failed - Agent string `json:"agent"` // gemini, claude, codex - Repo string `json:"repo"` // target repo - Org string `json:"org,omitempty"` // forge org (e.g. "core") - Task string `json:"task"` // task description - Branch string `json:"branch,omitempty"` // git branch name - Issue int `json:"issue,omitempty"` // forge issue number - PID int `json:"pid,omitempty"` // OS process ID (if running) - ProcessID string `json:"process_id,omitempty"` // go-process ID for managed lookup - StartedAt time.Time `json:"started_at"` // when dispatch started - UpdatedAt time.Time `json:"updated_at"` // last status change - Question string `json:"question,omitempty"` // from BLOCKED.md - Runs int `json:"runs"` // how many times dispatched/resumed - PRURL string `json:"pr_url,omitempty"` // pull request URL (after PR created) + Status string `json:"status"` + Agent string `json:"agent"` + Repo string `json:"repo"` + Org string `json:"org,omitempty"` + Task string `json:"task"` + Branch string `json:"branch,omitempty"` + Issue int `json:"issue,omitempty"` + PID int `json:"pid,omitempty"` + ProcessID string `json:"process_id,omitempty"` + StartedAt time.Time `json:"started_at"` + UpdatedAt time.Time `json:"updated_at"` + Question string `json:"question,omitempty"` + Runs int `json:"runs"` + PRURL string `json:"pr_url,omitempty"` } // r := c.QUERY(agentic.WorkspaceQuery{}) // if r.OK { reg := r.Value.(*core.Registry[*WorkspaceStatus]) } // r := c.QUERY(agentic.WorkspaceQuery{Name: "core/go-io/task-5"}) type WorkspaceQuery struct { - Name string // specific workspace (empty = all) - Status string // filter by status (empty = all) + Name string + Status string } func writeStatus(workspaceDir string, status *WorkspaceStatus) error { @@ -103,9 +103,9 @@ func workspaceStatusValue(result core.Result) (*WorkspaceStatus, bool) { // input := agentic.StatusInput{Workspace: "core/go-io/task-42", Limit: 50} type StatusInput struct { - Workspace string `json:"workspace,omitempty"` // specific workspace name, or empty for all - Limit int `json:"limit,omitempty"` // max results (default 100) - Status string `json:"status,omitempty"` // filter: running, completed, failed, blocked + Workspace string `json:"workspace,omitempty"` + Limit int `json:"limit,omitempty"` + Status string `json:"status,omitempty"` } // out := agentic.StatusOutput{Total: 42, Running: 3, Queued: 10, Completed: 25} @@ -154,7 +154,6 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu continue } - // If status is "running", check whether the managed process is still alive. if workspaceStatus.Status == "running" && (workspaceStatus.ProcessID != "" || workspaceStatus.PID > 0) { if !ProcessAlive(runtime, workspaceStatus.ProcessID, workspaceStatus.PID) { blockedPath := workspaceBlockedPath(workspaceDir) diff --git a/pkg/monitor/harvest.go b/pkg/monitor/harvest.go index 18917d0..498c8ed 100644 --- a/pkg/monitor/harvest.go +++ b/pkg/monitor/harvest.go @@ -18,7 +18,7 @@ type harvestResult struct { repo string branch string files int - rejected string // non-empty if rejected (binary, too large, etc.) + rejected string } // summary := m.harvestCompleted() @@ -151,7 +151,6 @@ func (m *Subsystem) countUnpushed(repoDir, branch string) int { base := m.defaultBranch(repoDir) out := m.gitOutput(repoDir, "rev-list", "--count", core.Concat("origin/", base, "..", branch)) if out == "" { - // Fallback out2 := m.gitOutput(repoDir, "log", "--oneline", core.Concat(base, "..", branch)) if out2 == "" { return 0 @@ -251,7 +250,7 @@ func updateStatus(workspaceDir, status, question string) { if question != "" { workspaceStatus["question"] = question } else { - delete(workspaceStatus, "question") // clear stale question from previous state + delete(workspaceStatus, "question") } statusPath := agentic.WorkspaceStatusPath(workspaceDir) if writeResult := fs.WriteAtomic(statusPath, core.JSONMarshalString(workspaceStatus)); !writeResult.OK {