feat(agent): background runner, slim status, Docker dispatch, stopwatch, CLI fixes

- Add background queue runner (runner.go) — 30s tick + poke on completion
- drainQueue now loops to fill all slots per tick
- Add run orchestrator command — standalone queue runner without MCP
- Slim agentic_status — stats only, blocked workspaces listed
- Docker containerised dispatch — all agents run in core-dev container
- Forge stopwatch start/stop on issue when agent starts/completes
- issue create supports --milestone, --assignee, --ref
- Auto-PR targets dev branch (not main)
- PR body includes Closes #N for issue-linked work
- CLI usage strings use spaces not slashes
- Review agent uses exec with sandbox bypass (not codex review subcommand)
- Local model support via codex --oss with socat Ollama proxy

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-23 12:53:33 +00:00
parent 9b225af2f9
commit 6d4b92737e
10 changed files with 330 additions and 91 deletions

View file

@ -8,6 +8,7 @@ import (
"dappco.re/go/core"
"dappco.re/go/core/forge"
forge_types "dappco.re/go/core/forge/types"
)
// newForgeClient creates a Forge client from env config.
@ -48,7 +49,7 @@ func registerForgeCommands(c *core.Core) {
Action: func(opts core.Options) core.Result {
org, repo, num := parseArgs(opts)
if repo == "" || num == 0 {
core.Print(nil, "usage: core-agent issue/get <repo> --number=N [--org=core]")
core.Print(nil, "usage: core-agent issue get <repo> --number=N [--org=core]")
return core.Result{OK: false}
}
@ -75,7 +76,7 @@ func registerForgeCommands(c *core.Core) {
Action: func(opts core.Options) core.Result {
org, repo, _ := parseArgs(opts)
if repo == "" {
core.Print(nil, "usage: core-agent issue/list <repo> [--org=core]")
core.Print(nil, "usage: core-agent issue list <repo> [--org=core]")
return core.Result{OK: false}
}
@ -102,7 +103,7 @@ func registerForgeCommands(c *core.Core) {
org, repo, num := parseArgs(opts)
body := opts.String("body")
if repo == "" || num == 0 || body == "" {
core.Print(nil, "usage: core-agent issue/comment <repo> --number=N --body=\"text\" [--org=core]")
core.Print(nil, "usage: core-agent issue comment <repo> --number=N --body=\"text\" [--org=core]")
return core.Result{OK: false}
}
@ -118,6 +119,77 @@ func registerForgeCommands(c *core.Core) {
},
})
c.Command("issue/create", core.Command{
	Description: "Create a Forge issue",
	// Creates an issue with optional labels, milestone, assignee and base ref.
	// Milestone and label names are resolved to IDs via the Forge API;
	// unknown names are silently skipped (best-effort, matching the
	// best-effort style of the other forge commands).
	Action: func(opts core.Options) core.Result {
		org, repo, _ := parseArgs(opts)
		title := opts.String("title")
		body := opts.String("body")
		labels := opts.String("labels")
		milestone := opts.String("milestone")
		assignee := opts.String("assignee")
		ref := opts.String("ref")
		if repo == "" || title == "" {
			core.Print(nil, "usage: core-agent issue create <repo> --title=\"...\" [--body=\"...\"] [--labels=\"agentic,bug\"] [--milestone=\"v0.2.0\"] [--assignee=virgil] [--ref=dev] [--org=core]")
			return core.Result{OK: false}
		}
		// One client for every Forge call below — the original constructed a
		// fresh client for the milestone lookup, the label lookup and the
		// create call.
		f := newForgeClient()
		createOpts := &forge_types.CreateIssueOption{
			Title: title,
			Body:  body,
			Ref:   ref,
		}
		// Resolve milestone name to ID — best-effort: lookup errors and
		// unknown names leave the issue without a milestone.
		if milestone != "" {
			milestones, err := f.Milestones.ListAll(ctx, forge.Params{"owner": org, "repo": repo})
			if err == nil {
				for _, m := range milestones {
					if m.Title == milestone {
						createOpts.Milestone = m.ID
						break
					}
				}
			}
		}
		// Set assignee (single user).
		if assignee != "" {
			createOpts.Assignees = []string{assignee}
		}
		// Resolve comma-separated label names to IDs — same best-effort
		// policy as milestones.
		if labels != "" {
			labelNames := core.Split(labels, ",")
			allLabels, err := f.Labels.ListRepoLabels(ctx, org, repo)
			if err == nil {
				for _, name := range labelNames {
					name = core.Trim(name)
					for _, l := range allLabels {
						if l.Name == name {
							createOpts.Labels = append(createOpts.Labels, l.ID)
							break
						}
					}
				}
			}
		}
		issue, err := f.Issues.Create(ctx, forge.Params{"owner": org, "repo": repo}, createOpts)
		if err != nil {
			core.Print(nil, "error: %v", err)
			return core.Result{Value: err, OK: false}
		}
		core.Print(nil, "#%d %s", issue.Index, issue.Title)
		core.Print(nil, " url: %s", issue.HTMLURL)
		return core.Result{Value: issue.Index, OK: true}
	},
})
// --- Pull Requests ---
c.Command("pr/get", core.Command{
@ -125,7 +197,7 @@ func registerForgeCommands(c *core.Core) {
Action: func(opts core.Options) core.Result {
org, repo, num := parseArgs(opts)
if repo == "" || num == 0 {
core.Print(nil, "usage: core-agent pr/get <repo> --number=N [--org=core]")
core.Print(nil, "usage: core-agent pr get <repo> --number=N [--org=core]")
return core.Result{OK: false}
}
@ -155,7 +227,7 @@ func registerForgeCommands(c *core.Core) {
Action: func(opts core.Options) core.Result {
org, repo, _ := parseArgs(opts)
if repo == "" {
core.Print(nil, "usage: core-agent pr/list <repo> [--org=core]")
core.Print(nil, "usage: core-agent pr list <repo> [--org=core]")
return core.Result{OK: false}
}
@ -185,7 +257,7 @@ func registerForgeCommands(c *core.Core) {
method = "merge"
}
if repo == "" || num == 0 {
core.Print(nil, "usage: core-agent pr/merge <repo> --number=N [--method=merge|rebase|squash] [--org=core]")
core.Print(nil, "usage: core-agent pr merge <repo> --number=N [--method=merge|rebase|squash] [--org=core]")
return core.Result{OK: false}
}
@ -207,7 +279,7 @@ func registerForgeCommands(c *core.Core) {
Action: func(opts core.Options) core.Result {
org, repo, _ := parseArgs(opts)
if repo == "" {
core.Print(nil, "usage: core-agent repo/get <repo> [--org=core]")
core.Print(nil, "usage: core-agent repo get <repo> [--org=core]")
return core.Result{OK: false}
}

View file

@ -307,6 +307,7 @@ func main() {
}
mon.SetNotifier(mcpSvc)
prep.StartRunner()
return mcpSvc, mon, nil
}
@ -380,6 +381,37 @@ func main() {
},
})
// run orchestrator — standalone queue runner without MCP stdio
c.Command("run/orchestrator", core.Command{
Description: "Run the queue orchestrator (standalone, no MCP)",
Action: func(opts core.Options) core.Result {
procFactory := process.NewService(process.Options{})
procResult, err := procFactory(c)
if err != nil {
return core.Result{Value: err, OK: false}
}
if procSvc, ok := procResult.(*process.Service); ok {
_ = process.SetDefault(procSvc)
}
mon := monitor.New()
prep := agentic.NewPrep()
prep.SetCompletionNotifier(mon)
mon.Start(ctx)
prep.StartRunner()
core.Print(os.Stderr, "core-agent orchestrator running (pid %s)", core.Env("PID"))
core.Print(os.Stderr, " workspace: %s", agentic.WorkspaceRoot())
core.Print(os.Stderr, " watching queue, draining on 30s tick + completion poke")
// Block until signal
<-ctx.Done()
core.Print(os.Stderr, "orchestrator shutting down")
return core.Result{OK: true}
},
})
// Run CLI — resolves os.Args to command path
r := c.Cli().Run()
if !r.OK {

View file

@ -20,15 +20,13 @@ func (s *PrepSubsystem) autoCreatePR(wsDir string) {
repoDir := core.JoinPath(wsDir, "repo")
// Detect default branch for this repo
base := DefaultBranch(repoDir)
// PRs target dev — agents never merge directly to main
base := "dev"
// Check if there are commits on the branch beyond the default branch
diffCmd := exec.Command("git", "log", "--oneline", "origin/"+base+"..HEAD")
diffCmd.Dir = repoDir
out, err := diffCmd.Output()
if err != nil || len(core.Trim(string(out))) == 0 {
// No commits — nothing to PR
return
}
@ -81,6 +79,9 @@ func (s *PrepSubsystem) buildAutoPRBody(st *WorkspaceStatus, commits int) string
b.WriteString("## Task\n\n")
b.WriteString(st.Task)
b.WriteString("\n\n")
if st.Issue > 0 {
b.WriteString(core.Sprintf("Closes #%d\n\n", st.Issue))
}
b.WriteString(core.Sprintf("**Agent:** %s\n", st.Agent))
b.WriteString(core.Sprintf("**Commits:** %d\n", commits))
b.WriteString(core.Sprintf("**Branch:** `%s`\n", st.Branch))

View file

@ -71,12 +71,18 @@ func agentCommand(agent, prompt string) (string, []string, error) {
return "gemini", args, nil
case "codex":
if model == "review" {
return "codex", []string{"review", "--base", "HEAD~1"}, nil
// Use exec with bypass — codex review subcommand has its own sandbox that blocks shell
// No -o flag — stdout captured by process output, ../.meta path unreliable in sandbox
return "codex", []string{
"exec",
"--dangerously-bypass-approvals-and-sandbox",
"Review the last 2 commits via git diff HEAD~2. Check for bugs, security issues, missing tests, naming issues. Report pass/fail with specifics. Do NOT make changes.",
}, nil
}
// Codex runs from repo/ which IS a git repo — no --skip-git-repo-check
// Container IS the sandbox — let codex run unrestricted inside it
args := []string{
"exec",
"--full-auto",
"--dangerously-bypass-approvals-and-sandbox",
"-o", "../.meta/agent-codex.log",
}
if model != "" {
@ -107,14 +113,87 @@ func agentCommand(agent, prompt string) (string, []string, error) {
}
return "coderabbit", args, nil
case "local":
script := core.JoinPath(core.Env("DIR_HOME"), "Code", "core", "agent", "scripts", "local-agent.sh")
return "bash", []string{script, prompt}, nil
// Local model via codex --oss → Ollama. Default model: devstral-24b
// socat proxies localhost:11434 → host.docker.internal:11434
// because codex hardcodes localhost check for Ollama.
localModel := model
if localModel == "" {
localModel = "devstral-24b"
}
script := core.Sprintf(
`socat TCP-LISTEN:11434,fork,reuseaddr TCP:host.docker.internal:11434 & sleep 0.5 && codex exec --dangerously-bypass-approvals-and-sandbox --oss --local-provider ollama -m %s -o ../.meta/agent-codex.log %q`,
localModel, prompt,
)
return "sh", []string{"-c", script}, nil
default:
return "", nil, core.E("agentCommand", "unknown agent: "+agent, nil)
}
}
// spawnAgent launches an agent process in the repo/ directory.
// defaultDockerImage is the container image used for agent dispatch when
// AGENT_DOCKER_IMAGE is not set in the environment.
const defaultDockerImage = "core-dev"

// containerCommand wraps an agent invocation so it runs inside a Docker
// container — no bare-metal execution. The repo is mounted at /workspace
// and the workspace metadata directory at /workspace/.meta.
//
// agentType is the base agent name (e.g. "local", "codex", "claude").
// NOTE(review): agentType is currently unused — per-agent config mounts key
// off command instead; confirm whether they should key off agentType.
//
//	cmd, args := containerCommand("local", "codex", []string{"exec", "..."}, repoDir, metaDir)
func containerCommand(agentType, command string, args []string, repoDir, metaDir string) (string, []string) {
	image := core.Env("AGENT_DOCKER_IMAGE")
	if image == "" {
		image = defaultDockerImage
	}
	home := core.Env("DIR_HOME")
	run := []string{
		"run", "--rm",
		// Reach Ollama on the host (local models) from inside the container.
		"--add-host=host.docker.internal:host-gateway",
		// Workspace mounts: repo + meta.
		"-v", repoDir + ":/workspace",
		"-v", metaDir + ":/workspace/.meta",
		"-w", "/workspace",
		// Agent config only — no SSH keys; git push runs on the host.
		"-v", core.JoinPath(home, ".codex") + ":/root/.codex:ro",
		// API keys passed by name — Docker resolves values from the host env.
		"-e", "OPENAI_API_KEY",
		"-e", "ANTHROPIC_API_KEY",
		"-e", "GEMINI_API_KEY",
		"-e", "GOOGLE_API_KEY",
		// Non-interactive agent environment.
		"-e", "TERM=dumb",
		"-e", "NO_COLOR=1",
		"-e", "CI=true",
		"-e", "GIT_USER_NAME=Virgil",
		"-e", "GIT_USER_EMAIL=virgil@lethean.io",
		// Local model access — Ollama on the host.
		"-e", "OLLAMA_HOST=http://host.docker.internal:11434",
	}
	// Per-agent config mounts, read-only.
	switch command {
	case "claude":
		run = append(run, "-v", core.JoinPath(home, ".claude")+":/root/.claude:ro")
	case "gemini":
		run = append(run, "-v", core.JoinPath(home, ".gemini")+":/root/.gemini:ro")
	}
	run = append(run, image, command)
	run = append(run, args...)
	return "docker", run
}
// spawnAgent launches an agent inside a Docker container.
// The repo/ directory is mounted at /workspace, agent runs sandboxed.
// Output is captured and written to .meta/agent-{agent}.log on completion.
func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, error) {
command, args, err := agentCommand(agent, prompt)
@ -131,11 +210,13 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
// Clean up stale BLOCKED.md from previous runs
fs.Delete(core.JoinPath(repoDir, "BLOCKED.md"))
// All agents run containerised
command, args = containerCommand(agentBase, command, args, repoDir, metaDir)
proc, err := process.StartWithOptions(context.Background(), process.RunOptions{
Command: command,
Args: args,
Dir: repoDir,
Env: []string{"TERM=dumb", "NO_COLOR=1", "CI=true"},
Detach: true,
})
if err != nil {
@ -156,6 +237,15 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
}
emitStartEvent(agent, core.PathBase(wsDir)) // audit log
// Start Forge stopwatch on the issue (time tracking)
if st, _ := readStatus(wsDir); st != nil && st.Issue > 0 {
org := st.Org
if org == "" {
org = "core"
}
s.forge.Issues.StartStopwatch(context.Background(), org, st.Repo, int64(st.Issue))
}
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
@ -200,6 +290,15 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
emitCompletionEvent(agent, core.PathBase(wsDir), finalStatus) // audit log
// Stop Forge stopwatch on the issue (time tracking)
if st, _ := readStatus(wsDir); st != nil && st.Issue > 0 {
org := st.Org
if org == "" {
org = "core"
}
s.forge.Issues.StopStopwatch(context.Background(), org, st.Repo, int64(st.Issue))
}
// Push notification directly — no filesystem polling
if s.onComplete != nil {
stNow, _ := readStatus(wsDir)
@ -227,7 +326,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
}
s.ingestFindings(wsDir)
s.drainQueue()
s.Poke()
}()
return pid, outputFile, nil

View file

@ -83,7 +83,7 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in
}
base := input.Base
if base == "" {
base = "main"
base = "dev"
}
// Build PR title

View file

@ -45,6 +45,7 @@ type PrepSubsystem struct {
client *http.Client
onComplete CompletionNotifier
drainMu sync.Mutex
pokeCh chan struct{}
}
var _ coremcp.Subsystem = (*PrepSubsystem)(nil)

View file

@ -3,7 +3,6 @@
package agentic
import (
"os"
"strconv"
"syscall"
"time"
@ -159,27 +158,29 @@ func (s *PrepSubsystem) canDispatchAgent(agent string) bool {
return s.countRunningByAgent(base) < limit
}
// drainQueue fills every available concurrency slot from queued workspaces.
// It keeps spawning until no slot remains or no queued task matches.
// Serialised via drainMu so concurrent drainers cannot exceed limits.
func (s *PrepSubsystem) drainQueue() {
	s.drainMu.Lock()
	defer s.drainMu.Unlock()
	for {
		if !s.drainOne() {
			// No slot free, or nothing queued — done for this pass.
			return
		}
	}
}
// drainOne finds the oldest queued workspace and spawns it if a slot is available.
// Returns true if a task was spawned, false if nothing to do.
func (s *PrepSubsystem) drainOne() bool {
wsRoot := WorkspaceRoot()
r := fs.List(wsRoot)
if !r.OK {
return
}
entries := r.Value.([]os.DirEntry)
// Scan both old and new workspace layouts
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
statusFiles := append(old, deep...)
for _, entry := range entries {
if !entry.IsDir() {
continue
}
wsDir := core.JoinPath(wsRoot, entry.Name())
for _, statusPath := range statusFiles {
wsDir := core.PathDir(statusPath)
st, err := readStatus(wsDir)
if err != nil || st.Status != "queued" {
continue
@ -212,6 +213,8 @@ func (s *PrepSubsystem) drainQueue() {
st.Runs++
writeStatus(wsDir, st)
return
return true
}
return false
}

View file

@ -23,13 +23,12 @@ type RemoteStatusInput struct {
// RemoteStatusOutput is the response from a remote status check.
//
// out := agentic.RemoteStatusOutput{Success: true, Host: "charon", Count: 2}
// out := agentic.RemoteStatusOutput{Success: true, Host: "charon"}
type RemoteStatusOutput struct {
Success bool `json:"success"`
Host string `json:"host"`
Workspaces []WorkspaceInfo `json:"workspaces"`
Count int `json:"count"`
Error string `json:"error,omitempty"`
Success bool `json:"success"`
Host string `json:"host"`
Stats StatusOutput `json:"stats"`
Error string `json:"error,omitempty"`
}
func (s *PrepSubsystem) registerRemoteStatusTool(server *mcp.Server) {
@ -106,8 +105,7 @@ func (s *PrepSubsystem) statusRemote(ctx context.Context, _ *mcp.CallToolRequest
if len(rpcResp.Result.Content) > 0 {
var statusOut StatusOutput
if json.Unmarshal([]byte(rpcResp.Result.Content[0].Text), &statusOut) == nil {
output.Workspaces = statusOut.Workspaces
output.Count = statusOut.Count
output.Stats = statusOut
}
}

42
pkg/agentic/runner.go Normal file
View file

@ -0,0 +1,42 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import "time"
// StartRunner begins the background queue runner.
// Ticks every 30s to drain queued tasks into available slots, and responds
// to Poke() for an immediate drain on completion events.
// Idempotent: a second call is a no-op, so the runner goroutine and poke
// channel are never replaced while in use.
//
//	prep.StartRunner()
func (s *PrepSubsystem) StartRunner() {
	// Guard against double-start: replacing pokeCh would orphan the first
	// runLoop goroutine (leak) and race with concurrent Poke() callers.
	if s.pokeCh != nil {
		return
	}
	s.pokeCh = make(chan struct{}, 1)
	go s.runLoop()
}
// runLoop is the background runner goroutine: it drains the queue on a
// fixed 30s tick and on every Poke() signal.
// NOTE(review): there is no shutdown path — this goroutine runs for the
// process lifetime. Confirm that is intended, or thread a ctx/done channel
// through so the runner can be stopped and the ticker released.
func (s *PrepSubsystem) runLoop() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Periodic drain — picks up tasks queued while slots were full.
			s.drainQueue()
		case <-s.pokeCh:
			// Immediate drain — an agent completed and likely freed a slot.
			s.drainQueue()
		}
	}
}
// Poke asks the runner to re-check the queue immediately instead of waiting
// for the next 30s tick. Safe to call before StartRunner (no-op); if a poke
// is already pending, this one is coalesced with it.
//
//	s.Poke() // after agent completion
func (s *PrepSubsystem) Poke() {
	ch := s.pokeCh
	if ch == nil {
		// Runner not started — nothing to signal.
		return
	}
	// Non-blocking send: the 1-slot buffer coalesces bursts of pokes.
	select {
	case ch <- struct{}{}:
	default:
	}
}

View file

@ -74,31 +74,34 @@ func readStatus(wsDir string) (*WorkspaceStatus, error) {
// StatusInput is the input for agentic_status.
//
// input := agentic.StatusInput{Workspace: "go-io-123"}
// input := agentic.StatusInput{Workspace: "go-io-123", Limit: 50}
type StatusInput struct {
Workspace string `json:"workspace,omitempty"` // specific workspace name, or empty for all
Limit int `json:"limit,omitempty"` // max results (default 100)
Status string `json:"status,omitempty"` // filter: running, completed, failed, blocked
}
// StatusOutput is the output for agentic_status.
// Returns stats by default. Only blocked workspaces are listed (they need attention).
//
// out := agentic.StatusOutput{Count: 1, Workspaces: []agentic.WorkspaceInfo{{Name: "go-io-123"}}}
// out := agentic.StatusOutput{Total: 42, Running: 3, Queued: 10, Completed: 25}
type StatusOutput struct {
Workspaces []WorkspaceInfo `json:"workspaces"`
Count int `json:"count"`
Total int `json:"total"`
Running int `json:"running"`
Queued int `json:"queued"`
Completed int `json:"completed"`
Failed int `json:"failed"`
Blocked []BlockedInfo `json:"blocked,omitempty"`
}
// WorkspaceInfo summarises one workspace returned by agentic_status.
// BlockedInfo shows a workspace that needs human input.
//
// info := agentic.WorkspaceInfo{Name: "go-io-123", Status: "running", Agent: "codex", Repo: "go-io"}
type WorkspaceInfo struct {
// info := agentic.BlockedInfo{Name: "go-io/task-4", Repo: "go-io", Question: "Which API version?"}
type BlockedInfo struct {
Name string `json:"name"`
Status string `json:"status"`
Agent string `json:"agent"`
Repo string `json:"repo"`
Task string `json:"task"`
Age string `json:"age"`
Question string `json:"question,omitempty"`
Runs int `json:"runs"`
Agent string `json:"agent"`
Question string `json:"question"`
}
func (s *PrepSubsystem) registerStatusTool(server *mcp.Server) {
@ -116,54 +119,32 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
statusFiles := append(old, deep...)
var workspaces []WorkspaceInfo
var out StatusOutput
for _, statusPath := range statusFiles {
wsDir := core.PathDir(statusPath)
// Name: for old layout use dir name, for new use relative path from wsRoot
name := wsDir[len(wsRoot)+1:]
// Filter by specific workspace if requested
if input.Workspace != "" && name != input.Workspace {
continue
}
info := WorkspaceInfo{Name: name}
st, err := readStatus(wsDir)
if err != nil {
info.Status = "unknown"
workspaces = append(workspaces, info)
out.Total++
out.Failed++
continue
}
info.Status = st.Status
info.Agent = st.Agent
info.Repo = st.Repo
info.Task = st.Task
info.Runs = st.Runs
info.Age = time.Since(st.StartedAt).Truncate(time.Minute).String()
// If status is "running", check if PID is still alive
if st.Status == "running" && st.PID > 0 {
if err := syscall.Kill(st.PID, 0); err != nil {
// Process died — check for BLOCKED.md
blockedPath := core.JoinPath(wsDir, "repo", "BLOCKED.md")
if r := fs.Read(blockedPath); r.OK {
info.Status = "blocked"
info.Question = core.Trim(r.Value.(string))
st.Status = "blocked"
st.Question = info.Question
st.Question = core.Trim(r.Value.(string))
} else {
// Dead PID without BLOCKED.md — check exit code from log
// If no evidence of success, mark as failed (not completed)
logFile := core.JoinPath(wsDir, core.Sprintf("agent-%s.log", st.Agent))
if r := fs.Read(logFile); !r.OK {
info.Status = "failed"
st.Status = "failed"
st.Question = "Agent process died (no output log)"
} else {
info.Status = "completed"
st.Status = "completed"
}
}
@ -171,15 +152,25 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu
}
}
if st.Status == "blocked" {
info.Question = st.Question
out.Total++
switch st.Status {
case "running":
out.Running++
case "queued":
out.Queued++
case "completed":
out.Completed++
case "failed":
out.Failed++
case "blocked":
out.Blocked = append(out.Blocked, BlockedInfo{
Name: name,
Repo: st.Repo,
Agent: st.Agent,
Question: st.Question,
})
}
workspaces = append(workspaces, info)
}
return nil, StatusOutput{
Workspaces: workspaces,
Count: len(workspaces),
}, nil
return nil, out, nil
}