feat(agent): v0.3.0 — dispatch control, run task CLI, quiet notifications, spark pool

- Add agentic_dispatch_start / shutdown / shutdown_now MCP tools
- Queue frozen by default, CORE_AGENT_DISPATCH=1 to auto-start
- Add run task CLI command — single task e2e (prep → spawn → wait)
- Add DispatchSync for blocking dispatch without MCP
- Quiet notifications — only agent.failed and queue.drained events
- Remove duplicate notification paths (direct callback + polling loop)
- codex-spark gets separate concurrency pool (baseAgent routing)
- Rate-limit backoff detection (3 fast failures → 30min pause)
- Review agent uses exec with sandbox bypass (not codex review)
- Bump: core-agent 0.3.0, core plugin 0.15.0, devops plugin 0.2.0

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-23 16:08:08 +00:00
parent 6d4b92737e
commit 9bdd47d9d5
10 changed files with 400 additions and 43 deletions

View file

@ -1,7 +1,7 @@
{
"name": "core",
"description": "Core agent platform — dispatch (local + remote), verify+merge, CodeRabbit/Codex review queue, GitHub mirror, cross-agent messaging, OpenBrain integration, inbox notifications",
"version": "0.14.0",
"version": "0.15.0",
"author": {
"name": "Lethean Community",
"email": "hello@lethean.io"

View file

@ -1,6 +1,6 @@
{
"name": "devops",
"version": "0.1.0",
"version": "0.2.0",
"description": "DevOps utilities for the Core ecosystem — build, install, deploy.",
"author": {
"name": "Lethean",

View file

@ -4,6 +4,7 @@ import (
"context"
"os"
"os/signal"
"strconv"
"syscall"
"dappco.re/go/core"
@ -20,7 +21,7 @@ func main() {
c := core.New(core.Options{
{Key: "name", Value: "core-agent"},
})
c.App().Version = "0.2.0"
c.App().Version = "0.3.0"
// version — print version and build info
c.Command("version", core.Command{
@ -381,6 +382,76 @@ func main() {
},
})
// run task — single task e2e (prep → spawn → wait → done)
c.Command("run/task", core.Command{
Description: "Run a single task end-to-end",
Action: func(opts core.Options) core.Result {
// Required flags: --repo and --task; --agent and --org fall back to defaults below.
repo := opts.String("repo")
agent := opts.String("agent")
task := opts.String("task")
issueStr := opts.String("issue")
org := opts.String("org")
if repo == "" || task == "" {
core.Print(nil, "usage: core-agent run task --repo=<repo> --task=\"...\" --agent=codex [--issue=N] [--org=core]")
return core.Result{OK: false}
}
if agent == "" {
agent = "codex"
}
if org == "" {
org = "core"
}
// Optional issue number; a non-numeric --issue value is silently treated as "no issue".
issue := 0
if issueStr != "" {
if n, err := strconv.Atoi(issueStr); err == nil {
issue = n
}
}
// Stand up a process service and install it as the package default —
// presumably the agent-spawn path inside DispatchSync resolves it from
// there; confirm against process.SetDefault's consumers.
procFactory := process.NewService(process.Options{})
procResult, err := procFactory(c)
if err != nil {
return core.Result{Value: err, OK: false}
}
if procSvc, ok := procResult.(*process.Service); ok {
_ = process.SetDefault(procSvc)
}
prep := agentic.NewPrep()
// Progress goes to stderr so stdout stays clean for machine consumers.
core.Print(os.Stderr, "core-agent run task")
core.Print(os.Stderr, " repo: %s/%s", org, repo)
core.Print(os.Stderr, " agent: %s", agent)
if issue > 0 {
core.Print(os.Stderr, " issue: #%d", issue)
}
core.Print(os.Stderr, " task: %s", task)
core.Print(os.Stderr, "")
// Dispatch and wait
// NOTE(review): ctx is not declared anywhere in this Action closure —
// presumably captured from the enclosing main(); confirm it exists and
// is cancelled on SIGINT/SIGTERM so a Ctrl-C aborts the wait loop.
result := prep.DispatchSync(ctx, agentic.DispatchSyncInput{
Org: org,
Repo: repo,
Agent: agent,
Task: task,
Issue: issue,
})
if !result.OK {
core.Print(os.Stderr, "FAILED: %v", result.Error)
return core.Result{Value: result.Error, OK: false}
}
core.Print(os.Stderr, "DONE: %s", result.Status)
if result.PRURL != "" {
core.Print(os.Stderr, " PR: %s", result.PRURL)
}
return core.Result{OK: true}
},
})
// run orchestrator — standalone queue runner without MCP stdio
c.Command("run/orchestrator", core.Command{
Description: "Run the queue orchestrator (standalone, no MCP)",

View file

@ -290,6 +290,25 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
emitCompletionEvent(agent, core.PathBase(wsDir), finalStatus) // audit log
// Rate-limit detection: if agent failed fast (<60s), track consecutive failures
// per pool. Three fast failures in a row ⇒ assume the provider is throttling us
// and pause the whole pool for 30 minutes (drainOne skips pools in backoff).
// NOTE(review): s.failCount and s.backoff are plain maps written here with no
// visible lock — if this completion path runs in a per-agent goroutine, two
// agents finishing together is a data race; confirm synchronization.
pool := baseAgent(agent)
if finalStatus == "failed" {
if st, _ := readStatus(wsDir); st != nil {
// Wall time since the agent started: a fast death usually means an
// API rejection (quota/rate limit) rather than a genuine task failure.
elapsed := time.Since(st.StartedAt)
if elapsed < 60*time.Second {
s.failCount[pool]++
if s.failCount[pool] >= 3 {
s.backoff[pool] = time.Now().Add(30 * time.Minute)
core.Print(nil, "rate-limit detected for %s — pausing pool for 30 minutes", pool)
}
} else {
s.failCount[pool] = 0 // slow failure = real failure, reset count
}
}
} else {
s.failCount[pool] = 0 // success resets count
}
// Stop Forge stopwatch on the issue (time tracking)
if st, _ := readStatus(wsDir); st != nil && st.Issue > 0 {
org := st.Org

View file

@ -0,0 +1,97 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
	"context"
	"errors"
	"syscall"
	"time"

	core "dappco.re/go/core"
)
// DispatchSyncInput is the input for a synchronous (blocking) task run.
//
// input := agentic.DispatchSyncInput{Repo: "go-crypt", Agent: "codex:gpt-5.3-codex-spark", Task: "fix it", Issue: 7}
type DispatchSyncInput struct {
Org string // owner namespace (e.g. "core"); see the run/task CLI default
Repo string // repository name within Org
Agent string // agent spec, optionally with a model variant after ":"
Task string // free-form task description handed to the agent
Issue int // linked issue number; 0 means no issue
}
// DispatchSyncResult is the output of a synchronous task run.
//
// if result.OK { fmt.Println("done:", result.Status) }
type DispatchSyncResult struct {
OK bool // true only when the final workspace status is "completed"
Status string // final status read from the workspace status file
Error string // non-empty when prep, spawn, or the wait failed
PRURL string // pull-request URL copied from the final status, when present
}
// DispatchSync preps a workspace, spawns the agent directly (no queue, no concurrency check),
// and blocks until the agent completes, then reports its final status.
//
//	result := prep.DispatchSync(ctx, input)
//
// The prep phase is bounded by a 5-minute timeout; the wait phase runs until
// the agent process exits or ctx is cancelled.
func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInput) DispatchSyncResult {
	// Prep workspace (clone, branch, prompt assembly).
	prepInput := PrepInput{
		Org:   input.Org,
		Repo:  input.Repo,
		Task:  input.Task,
		Agent: input.Agent,
		Issue: input.Issue,
	}
	prepCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()
	_, prepOut, err := s.prepWorkspace(prepCtx, nil, prepInput)
	if err != nil {
		return DispatchSyncResult{Error: err.Error()}
	}
	if !prepOut.Success {
		return DispatchSyncResult{Error: "prep failed"}
	}
	wsDir := prepOut.WorkspaceDir
	prompt := prepOut.Prompt
	core.Print(nil, " workspace: %s", wsDir)
	core.Print(nil, " branch: %s", prepOut.Branch)
	// Spawn agent directly — no queue, no concurrency check
	pid, _, err := s.spawnAgent(input.Agent, prompt, wsDir)
	if err != nil {
		return DispatchSyncResult{Error: err.Error()}
	}
	core.Print(nil, " pid: %d", pid)
	core.Print(nil, " waiting for completion...")
	// Poll for process exit every 3s. kill(pid, 0) delivers no signal and only
	// probes for existence: ESRCH means the process is gone. Any OTHER error
	// (notably EPERM) means the process still exists but we lack permission to
	// signal it, so we must keep waiting rather than read a status file the
	// agent has not finished writing. The previous `!= nil` check conflated
	// the two cases. (If pid were ever <= 0 this loop only exits via ctx,
	// matching the original behavior.)
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return DispatchSyncResult{Error: "cancelled"}
		case <-ticker.C:
			if pid > 0 && errors.Is(syscall.Kill(pid, 0), syscall.ESRCH) {
				// Process exited — read the final status written by the agent wrapper.
				st, err := readStatus(wsDir)
				if err != nil {
					return DispatchSyncResult{Error: "can't read final status"}
				}
				return DispatchSyncResult{
					OK:     st.Status == "completed",
					Status: st.Status,
					PRURL:  st.PRURL,
				}
			}
		}
	}
}

View file

@ -46,6 +46,9 @@ type PrepSubsystem struct {
onComplete CompletionNotifier
drainMu sync.Mutex
pokeCh chan struct{}
frozen bool
backoff map[string]time.Time // pool → paused until
failCount map[string]int // pool → consecutive fast failures
}
var _ coremcp.Subsystem = (*PrepSubsystem)(nil)
@ -79,6 +82,8 @@ func NewPrep() *PrepSubsystem {
brainKey: brainKey,
codePath: envOr("CODE_PATH", core.JoinPath(home, "Code")),
client: &http.Client{Timeout: 30 * time.Second},
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
}
@ -116,6 +121,7 @@ func (s *PrepSubsystem) RegisterTools(server *mcp.Server) {
s.registerRemoteDispatchTool(server)
s.registerRemoteStatusTool(server)
s.registerReviewQueueTool(server)
s.registerShutdownTools(server)
mcp.AddTool(server, &mcp.Tool{
Name: "agentic_scan",

View file

@ -144,6 +144,10 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int {
// baseAgent maps an agent spec to its concurrency-pool name by dropping the
// model variant (gemini:flash → gemini).
func baseAgent(agent string) string {
	// The spark variant is special-cased into its own pool, wherever it
	// appears in the spec (codex:gpt-5.3-codex-spark → codex-spark).
	if core.Contains(agent, "codex-spark") {
		return "codex-spark"
	}
	parts := core.SplitN(agent, ":", 2)
	return parts[0]
}
@ -161,6 +165,9 @@ func (s *PrepSubsystem) canDispatchAgent(agent string) bool {
// drainQueue fills all available concurrency slots from queued workspaces.
// Loops until no slots remain or no queued tasks match. Serialised via drainMu.
func (s *PrepSubsystem) drainQueue() {
if s.frozen {
return
}
s.drainMu.Lock()
defer s.drainMu.Unlock()
@ -190,6 +197,12 @@ func (s *PrepSubsystem) drainOne() bool {
continue
}
// Skip if agent pool is in rate-limit backoff
pool := baseAgent(st.Agent)
if until, ok := s.backoff[pool]; ok && time.Now().Before(until) {
continue
}
// Apply rate delay before spawning
delay := s.delayForAgent(st.Agent)
if delay > 0 {

View file

@ -2,15 +2,28 @@
package agentic
import "time"
import (
"time"
core "dappco.re/go/core"
)
// StartRunner begins the background queue runner.
// Ticks every 30s to drain queued tasks into available slots.
// Also responds to Poke() for immediate drain on completion events.
// Queue is frozen by default — use agentic_dispatch_start to unfreeze,
// or set CORE_AGENT_DISPATCH=1 to auto-start.
//
// prep.StartRunner()
func (s *PrepSubsystem) StartRunner() {
	s.pokeCh = make(chan struct{}, 1)
	// Frozen by default — an explicit start is required unless the env
	// flag opts in. The flag is set before the loop goroutine launches.
	autoStart := core.Env("CORE_AGENT_DISPATCH") == "1"
	s.frozen = !autoStart
	if autoStart {
		core.Print(nil, "dispatch: auto-start enabled (CORE_AGENT_DISPATCH=1)")
	}
	go s.runLoop()
}

115
pkg/agentic/shutdown.go Normal file
View file

@ -0,0 +1,115 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"syscall"
core "dappco.re/go/core"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
// ShutdownInput is the (empty) input shared by all three dispatch-control tools
// (agentic_dispatch_start / _shutdown / _shutdown_now).
//
// input := agentic.ShutdownInput{}
type ShutdownInput struct{}
// ShutdownOutput is the output shared by the dispatch-control tools.
//
// out := agentic.ShutdownOutput{Success: true, Running: 3, Message: "draining"}
type ShutdownOutput struct {
Success bool `json:"success"` // true when the operation was applied
Running int `json:"running"` // agents still running when the call was made
Queued int `json:"queued"` // tasks still queued when the call was made
Message string `json:"message"` // human-readable summary of what happened
}
// registerShutdownTools wires the three dispatch-control tools onto the MCP server.
func (s *PrepSubsystem) registerShutdownTools(server *mcp.Server) {
	tools := []struct {
		name    string
		desc    string
		handler func(context.Context, *mcp.CallToolRequest, ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error)
	}{
		{"agentic_dispatch_start", "Start the dispatch queue runner. Unfreezes the queue and begins draining.", s.dispatchStart},
		{"agentic_dispatch_shutdown", "Graceful shutdown: stop accepting new jobs, let running agents finish. Queue is frozen.", s.shutdownGraceful},
		{"agentic_dispatch_shutdown_now", "Hard shutdown: kill all running agents immediately. Queue is cleared.", s.shutdownNow},
	}
	for _, t := range tools {
		mcp.AddTool(server, &mcp.Tool{Name: t.name, Description: t.desc}, t.handler)
	}
}
// dispatchStart unfreezes the queue and triggers an immediate drain.
// NOTE(review): s.frozen is a plain bool also read by the runner goroutine —
// confirm it is synchronized (atomic/mutex) at the struct level.
func (s *PrepSubsystem) dispatchStart(ctx context.Context, _ *mcp.CallToolRequest, input ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error) {
	s.frozen = false
	// Wake the runner now rather than waiting for its next 30s tick.
	s.Poke()
	out := ShutdownOutput{
		Success: true,
		Message: "dispatch started — queue unfrozen, draining",
	}
	return nil, out, nil
}
// shutdownGraceful freezes the queue — running agents finish, no new dispatches.
// Reports how many agents were still running across all concurrency pools.
func (s *PrepSubsystem) shutdownGraceful(ctx context.Context, _ *mcp.CallToolRequest, input ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error) {
	s.frozen = true
	running := 0
	for _, pool := range []string{"codex", "claude", "gemini", "codex-spark"} {
		running += s.countRunningByAgent(pool)
	}
	return nil, ShutdownOutput{
		Success: true,
		Running: running,
		Message: "queue frozen — running agents will finish, no new dispatches",
	}, nil
}
// shutdownNow kills all running agents and clears the queue.
// It freezes the queue, SIGTERMs every agent recorded as running in a
// status.json, and marks both running and queued workspaces as failed so the
// runner will not re-dispatch them.
func (s *PrepSubsystem) shutdownNow(ctx context.Context, _ *mcp.CallToolRequest, input ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error) {
	s.frozen = true
	// Scan both workspace layouts: the old flat one and the deep nested one.
	wsRoot := WorkspaceRoot()
	old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
	deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
	statusFiles := append(old, deep...)
	killed, cleared := 0, 0
	for _, statusPath := range statusFiles {
		wsDir := core.PathDir(statusPath)
		st, err := readStatus(wsDir)
		if err != nil {
			continue // unreadable status — nothing we can safely do with it
		}
		switch {
		case st.Status == "running" && st.PID > 0:
			// SIGTERM lets the agent clean up; only successful deliveries count.
			if syscall.Kill(st.PID, syscall.SIGTERM) == nil {
				killed++
			}
			st.Status = "failed"
			st.Question = "killed by shutdown_now"
			st.PID = 0
			writeStatus(wsDir, st)
		case st.Status == "queued":
			st.Status = "failed"
			st.Question = "cleared by shutdown_now"
			writeStatus(wsDir, st)
			cleared++
		}
	}
	return nil, ShutdownOutput{
		Success: true,
		Running: 0,
		Queued:  0,
		Message: core.Sprintf("killed %d agents, cleared %d queued tasks", killed, cleared),
	}, nil
}

View file

@ -222,41 +222,71 @@ func (m *Subsystem) Poke() {
}
}
// AgentStarted pushes an immediate notification when an agent spawns.
// Called directly by dispatch — no filesystem polling.
// AgentStarted is called when an agent spawns.
// No individual notification — fleet status is checked on completion.
//
// mon.AgentStarted("codex:gpt-5.3-codex-spark", "go-io", "core/go-io/task-5")
func (m *Subsystem) AgentStarted(agent, repo, workspace string) {
if m.notifier != nil {
m.notifier.ChannelSend(context.Background(), "agent.started", map[string]any{
"agent": agent,
"repo": repo,
})
}
// No-op — we only notify on failures and queue drain
}
// AgentCompleted pushes an immediate notification when an agent finishes.
// Called directly by dispatch — no filesystem polling needed.
// AgentCompleted is called when an agent finishes.
// Only sends notifications for failures. Sends "queue.drained" when all work is done.
//
// mon.AgentCompleted("codex", "go-io", "core/go-io/task-5", "completed")
func (m *Subsystem) AgentCompleted(agent, repo, workspace, status string) {
if m.notifier != nil {
// Count current running/queued from status for context
running := 0
queued := 0
m.mu.Lock()
m.seenCompleted[workspace] = true
m.mu.Unlock()
m.mu.Lock()
m.seenCompleted[workspace] = true
m.mu.Unlock()
m.notifier.ChannelSend(context.Background(), "agent.complete", map[string]any{
"completed": []string{core.Sprintf("%s (%s) [%s]", repo, agent, status)},
"count": 1,
"running": running,
"queued": queued,
if m.notifier != nil {
// Only notify on failures — those need attention
if status == "failed" || status == "blocked" {
m.notifier.ChannelSend(context.Background(), "agent.failed", map[string]any{
"repo": repo,
"agent": agent,
"status": status,
})
}
}
// Check if queue is drained (0 running + 0 queued)
m.Poke()
go m.checkIdleAfterDelay()
}
// checkIdleAfterDelay waits briefly then checks if the fleet is idle.
// Sends a single "queue.drained" notification when all work stops.
func (m *Subsystem) checkIdleAfterDelay() {
time.Sleep(5 * time.Second) // wait for runner to fill slots
if m.notifier == nil {
return
}
// Quick count — scan for running/queued
// NOTE(review): this matches raw JSON substrings like `"status":"running"`,
// which assumes the status files are written with compact encoding (no space
// after the colon) — confirm against writeStatus's encoder settings.
running := 0
queued := 0
wsRoot := agentic.WorkspaceRoot()
// Both workspace layouts are scanned: flat and deeply nested.
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
for _, path := range append(old, deep...) {
r := fs.Read(path)
if !r.OK {
continue
}
s := r.Value.(string)
if core.Contains(s, `"status":"running"`) {
running++
} else if core.Contains(s, `"status":"queued"`) {
queued++
}
}
if running == 0 && queued == 0 {
// NOTE(review): every completion spawns one of these goroutines, so two
// near-simultaneous completions can both observe an idle fleet and emit
// "queue.drained" twice — confirm duplicates are acceptable downstream.
m.notifier.ChannelSend(context.Background(), "queue.drained", map[string]any{
"message": "all work complete",
})
}
// Also poke to update counts for any other monitors
m.Poke()
}
func (m *Subsystem) loop(ctx context.Context) {
@ -379,12 +409,7 @@ func (m *Subsystem) checkCompletions() string {
running++
if !m.seenRunning[wsName] && seeded {
m.seenRunning[wsName] = true
if m.notifier != nil {
m.notifier.ChannelSend(context.Background(), "agent.started", map[string]any{
"repo": st.Repo,
"agent": st.Agent,
})
}
// No individual start notification — too noisy
}
case "queued":
queued++
@ -405,13 +430,11 @@ func (m *Subsystem) checkCompletions() string {
return ""
}
// Push channel events
if m.notifier != nil {
m.notifier.ChannelSend(context.Background(), "agent.complete", map[string]any{
"count": len(newlyCompleted),
"completed": newlyCompleted,
"running": running,
"queued": queued,
// Only notify on queue drain (0 running + 0 queued) — individual completions are noise
if m.notifier != nil && running == 0 && queued == 0 {
m.notifier.ChannelSend(context.Background(), "queue.drained", map[string]any{
"completed": len(newlyCompleted),
"message": "all work complete",
})
}