diff --git a/claude/core/.claude-plugin/plugin.json b/claude/core/.claude-plugin/plugin.json
index 0516bc1..4c6015b 100644
--- a/claude/core/.claude-plugin/plugin.json
+++ b/claude/core/.claude-plugin/plugin.json
@@ -1,7 +1,7 @@
 {
   "name": "core",
   "description": "Core agent platform — dispatch (local + remote), verify+merge, CodeRabbit/Codex review queue, GitHub mirror, cross-agent messaging, OpenBrain integration, inbox notifications",
-  "version": "0.14.0",
+  "version": "0.15.0",
   "author": {
     "name": "Lethean Community",
     "email": "hello@lethean.io"
diff --git a/claude/devops/.claude-plugin/plugin.json b/claude/devops/.claude-plugin/plugin.json
index 91a09f8..226c00b 100644
--- a/claude/devops/.claude-plugin/plugin.json
+++ b/claude/devops/.claude-plugin/plugin.json
@@ -1,6 +1,6 @@
 {
   "name": "devops",
-  "version": "0.1.0",
+  "version": "0.2.0",
   "description": "DevOps utilities for the Core ecosystem — build, install, deploy.",
   "author": {
     "name": "Lethean",
diff --git a/cmd/core-agent/main.go b/cmd/core-agent/main.go
index 8f819ee..3d40025 100644
--- a/cmd/core-agent/main.go
+++ b/cmd/core-agent/main.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"os"
 	"os/signal"
+	"strconv"
 	"syscall"
 
 	"dappco.re/go/core"
@@ -20,7 +21,7 @@ func main() {
 	c := core.New(core.Options{
 		{Key: "name", Value: "core-agent"},
 	})
-	c.App().Version = "0.2.0"
+	c.App().Version = "0.3.0"
 
 	// version — print version and build info
 	c.Command("version", core.Command{
@@ -381,6 +382,76 @@ func main() {
 		},
 	})
 
+	// run task — single task e2e (prep → spawn → wait → done)
+	c.Command("run/task", core.Command{
+		Description: "Run a single task end-to-end",
+		Action: func(opts core.Options) core.Result {
+			repo := opts.String("repo")
+			agent := opts.String("agent")
+			task := opts.String("task")
+			issueStr := opts.String("issue")
+			org := opts.String("org")
+
+			if repo == "" || task == "" {
+				core.Print(nil, "usage: core-agent run task --repo= --task=\"...\" --agent=codex [--issue=N] [--org=core]")
+				return core.Result{OK: false}
+			}
+			if agent == "" {
+				agent = "codex"
+			}
+			if org == "" {
+				org = "core"
+			}
+
+			issue := 0
+			if issueStr != "" {
+				if n, err := strconv.Atoi(issueStr); err == nil {
+					issue = n
+				}
+			}
+
+			procFactory := process.NewService(process.Options{})
+			procResult, err := procFactory(c)
+			if err != nil {
+				return core.Result{Value: err, OK: false}
+			}
+			if procSvc, ok := procResult.(*process.Service); ok {
+				_ = process.SetDefault(procSvc)
+			}
+
+			prep := agentic.NewPrep()
+
+			core.Print(os.Stderr, "core-agent run task")
+			core.Print(os.Stderr, "  repo:  %s/%s", org, repo)
+			core.Print(os.Stderr, "  agent: %s", agent)
+			if issue > 0 {
+				core.Print(os.Stderr, "  issue: #%d", issue)
+			}
+			core.Print(os.Stderr, "  task:  %s", task)
+			core.Print(os.Stderr, "")
+
+			// Dispatch and wait
+			result := prep.DispatchSync(ctx, agentic.DispatchSyncInput{
+				Org:   org,
+				Repo:  repo,
+				Agent: agent,
+				Task:  task,
+				Issue: issue,
+			})
+
+			if !result.OK {
+				core.Print(os.Stderr, "FAILED: %v", result.Error)
+				return core.Result{Value: result.Error, OK: false}
+			}
+
+			core.Print(os.Stderr, "DONE: %s", result.Status)
+			if result.PRURL != "" {
+				core.Print(os.Stderr, "  PR: %s", result.PRURL)
+			}
+			return core.Result{OK: true}
+		},
+	})
+
 	// run orchestrator — standalone queue runner without MCP stdio
 	c.Command("run/orchestrator", core.Command{
 		Description: "Run the queue orchestrator (standalone, no MCP)",
diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go
index 8df3a32..aff3aa0 100644
--- a/pkg/agentic/dispatch.go
+++ b/pkg/agentic/dispatch.go
@@ -290,6 +290,25 @@
 
 	emitCompletionEvent(agent, core.PathBase(wsDir), finalStatus) // audit log
 
+	// Rate-limit detection: if agent failed fast (<60s), track consecutive failures
+	pool := baseAgent(agent)
+	if finalStatus == "failed" {
+		if st, _ := readStatus(wsDir); st != nil {
+			elapsed := time.Since(st.StartedAt)
+			if elapsed < 60*time.Second {
+				s.failCount[pool]++
+				if s.failCount[pool] >= 3 {
+					s.backoff[pool] = time.Now().Add(30 * time.Minute)
+					core.Print(nil, "rate-limit detected for %s — pausing pool for 30 minutes", pool)
+				}
+			} else {
+				s.failCount[pool] = 0 // slow failure = real failure, reset count
+			}
+		}
+	} else {
+		s.failCount[pool] = 0 // success resets count
+	}
+
 	// Stop Forge stopwatch on the issue (time tracking)
 	if st, _ := readStatus(wsDir); st != nil && st.Issue > 0 {
 		org := st.Org
diff --git a/pkg/agentic/dispatch_sync.go b/pkg/agentic/dispatch_sync.go
new file mode 100644
index 0000000..df0ceba
--- /dev/null
+++ b/pkg/agentic/dispatch_sync.go
@@ -0,0 +1,103 @@
+// SPDX-License-Identifier: EUPL-1.2
+
+package agentic
+
+import (
+	"context"
+	"syscall"
+	"time"
+
+	core "dappco.re/go/core"
+)
+
+// DispatchSyncInput is the input for a synchronous (blocking) task run.
+//
+//	input := agentic.DispatchSyncInput{Repo: "go-crypt", Agent: "codex:gpt-5.3-codex-spark", Task: "fix it", Issue: 7}
+type DispatchSyncInput struct {
+	Org   string
+	Repo  string
+	Agent string
+	Task  string
+	Issue int
+}
+
+// DispatchSyncResult is the output of a synchronous task run.
+//
+//	if result.OK { fmt.Println("done:", result.Status) }
+type DispatchSyncResult struct {
+	OK     bool
+	Status string
+	Error  string
+	PRURL  string
+}
+
+// DispatchSync preps a workspace, spawns the agent directly (no queue, no concurrency check),
+// and blocks until the agent completes.
+//
+//	result := prep.DispatchSync(ctx, input)
+func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInput) DispatchSyncResult {
+	// Prep workspace
+	prepInput := PrepInput{
+		Org:   input.Org,
+		Repo:  input.Repo,
+		Task:  input.Task,
+		Agent: input.Agent,
+		Issue: input.Issue,
+	}
+
+	prepCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+	defer cancel()
+
+	_, prepOut, err := s.prepWorkspace(prepCtx, nil, prepInput)
+	if err != nil {
+		return DispatchSyncResult{Error: err.Error()}
+	}
+	if !prepOut.Success {
+		return DispatchSyncResult{Error: "prep failed"}
+	}
+
+	wsDir := prepOut.WorkspaceDir
+	prompt := prepOut.Prompt
+
+	core.Print(nil, "  workspace: %s", wsDir)
+	core.Print(nil, "  branch:    %s", prepOut.Branch)
+
+	// Spawn agent directly — no queue, no concurrency check
+	pid, _, err := s.spawnAgent(input.Agent, prompt, wsDir)
+	if err != nil {
+		return DispatchSyncResult{Error: err.Error()}
+	}
+	// A non-positive pid can never be observed exiting — bail out instead of polling forever.
+	if pid <= 0 {
+		return DispatchSyncResult{Error: "spawn returned no pid"}
+	}
+
+	core.Print(nil, "  pid: %d", pid)
+	core.Print(nil, "  waiting for completion...")
+
+	// Poll for process exit
+	ticker := time.NewTicker(3 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return DispatchSyncResult{Error: "cancelled"}
+		case <-ticker.C:
+			// Signal 0 probes liveness: ESRCH means the process is gone.
+			// Any other outcome (nil, EPERM) means it still exists — keep waiting.
+			if err := syscall.Kill(pid, 0); err == syscall.ESRCH {
+				// Process exited — read final status
+				st, err := readStatus(wsDir)
+				if err != nil {
+					return DispatchSyncResult{Error: "can't read final status"}
+				}
+				return DispatchSyncResult{
+					OK:     st.Status == "completed",
+					Status: st.Status,
+					PRURL:  st.PRURL,
+				}
+			}
+		}
+	}
+}
diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go
index 6989626..da5675c 100644
--- a/pkg/agentic/prep.go
+++ b/pkg/agentic/prep.go
@@ -46,6 +46,9 @@ type PrepSubsystem struct {
 	onComplete CompletionNotifier
 	drainMu    sync.Mutex
 	pokeCh     chan struct{}
+	frozen     bool                 // NOTE(review): toggled by MCP handlers, read by runner loop — unsynchronized; consider atomic.Bool
+	backoff    map[string]time.Time // pool → paused until (NOTE(review): written on agent exit, read by drain loop — guard with drainMu)
+	failCount  map[string]int       // pool → consecutive fast failures (NOTE(review): same race as backoff)
 }
 
 var _ coremcp.Subsystem = (*PrepSubsystem)(nil)
@@ -79,6 +82,8 @@ func NewPrep() *PrepSubsystem {
 		brainKey:  brainKey,
 		codePath:  envOr("CODE_PATH", core.JoinPath(home, "Code")),
 		client:    &http.Client{Timeout: 30 * time.Second},
+		backoff:   make(map[string]time.Time),
+		failCount: make(map[string]int),
 	}
 }
 
@@ -116,6 +121,7 @@ func (s *PrepSubsystem) RegisterTools(server *mcp.Server) {
 	s.registerRemoteDispatchTool(server)
 	s.registerRemoteStatusTool(server)
 	s.registerReviewQueueTool(server)
+	s.registerShutdownTools(server)
 
 	mcp.AddTool(server, &mcp.Tool{
 		Name: "agentic_scan",
diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go
index 521c1cc..9e2003c 100644
--- a/pkg/agentic/queue.go
+++ b/pkg/agentic/queue.go
@@ -144,6 +144,10 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int {
 }
 
 // baseAgent strips the model variant (gemini:flash → gemini).
 func baseAgent(agent string) string {
+	// codex:gpt-5.3-codex-spark → codex-spark (separate pool)
+	if core.Contains(agent, "codex-spark") {
+		return "codex-spark"
+	}
 	return core.SplitN(agent, ":", 2)[0]
 }
 
@@ -161,6 +165,9 @@
 // drainQueue fills all available concurrency slots from queued workspaces.
 // Loops until no slots remain or no queued tasks match. Serialised via drainMu.
 func (s *PrepSubsystem) drainQueue() {
+	if s.frozen { // NOTE(review): unsynchronized read — move inside drainMu or use atomic.Bool
+		return
+	}
 	s.drainMu.Lock()
 	defer s.drainMu.Unlock()
 
@@ -190,6 +197,12 @@ func (s *PrepSubsystem) drainOne() bool {
 			continue
 		}
 
+		// Skip if agent pool is in rate-limit backoff
+		pool := baseAgent(st.Agent)
+		if until, ok := s.backoff[pool]; ok && time.Now().Before(until) {
+			continue
+		}
+
 		// Apply rate delay before spawning
 		delay := s.delayForAgent(st.Agent)
 		if delay > 0 {
diff --git a/pkg/agentic/runner.go b/pkg/agentic/runner.go
index 8348f25..eb943d9 100644
--- a/pkg/agentic/runner.go
+++ b/pkg/agentic/runner.go
@@ -2,15 +2,28 @@
 
 package agentic
 
-import "time"
+import (
+	"time"
+
+	core "dappco.re/go/core"
+)
 
 // StartRunner begins the background queue runner.
-// Ticks every 30s to drain queued tasks into available slots.
-// Also responds to Poke() for immediate drain on completion events.
+// Queue is frozen by default — use agentic_dispatch_start to unfreeze,
+// or set CORE_AGENT_DISPATCH=1 to auto-start.
 //
 //	prep.StartRunner()
 func (s *PrepSubsystem) StartRunner() {
 	s.pokeCh = make(chan struct{}, 1)
+
+	// Frozen by default — explicit start required
+	if core.Env("CORE_AGENT_DISPATCH") == "1" {
+		s.frozen = false
+		core.Print(nil, "dispatch: auto-start enabled (CORE_AGENT_DISPATCH=1)")
+	} else {
+		s.frozen = true
+	}
+
 	go s.runLoop()
 }
diff --git a/pkg/agentic/shutdown.go b/pkg/agentic/shutdown.go
new file mode 100644
index 0000000..220e351
--- /dev/null
+++ b/pkg/agentic/shutdown.go
@@ -0,0 +1,115 @@
+// SPDX-License-Identifier: EUPL-1.2
+
+package agentic
+
+import (
+	"context"
+	"syscall"
+
+	core "dappco.re/go/core"
+	"github.com/modelcontextprotocol/go-sdk/mcp"
+)
+
+// ShutdownInput is the input for agentic_dispatch_shutdown.
+//
+//	input := agentic.ShutdownInput{}
+type ShutdownInput struct{}
+
+// ShutdownOutput is the output for agentic_dispatch_shutdown.
+//
+//	out := agentic.ShutdownOutput{Success: true, Running: 3, Message: "draining"}
+type ShutdownOutput struct {
+	Success bool   `json:"success"`
+	Running int    `json:"running"`
+	Queued  int    `json:"queued"`
+	Message string `json:"message"`
+}
+
+func (s *PrepSubsystem) registerShutdownTools(server *mcp.Server) {
+	mcp.AddTool(server, &mcp.Tool{
+		Name:        "agentic_dispatch_start",
+		Description: "Start the dispatch queue runner. Unfreezes the queue and begins draining.",
+	}, s.dispatchStart)
+
+	mcp.AddTool(server, &mcp.Tool{
+		Name:        "agentic_dispatch_shutdown",
+		Description: "Graceful shutdown: stop accepting new jobs, let running agents finish. Queue is frozen.",
+	}, s.shutdownGraceful)
+
+	mcp.AddTool(server, &mcp.Tool{
+		Name:        "agentic_dispatch_shutdown_now",
+		Description: "Hard shutdown: kill all running agents immediately. Queue is cleared.",
+	}, s.shutdownNow)
+}
+
+// dispatchStart unfreezes the queue and starts draining.
+func (s *PrepSubsystem) dispatchStart(ctx context.Context, _ *mcp.CallToolRequest, input ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error) {
+	s.frozen = false // NOTE(review): racy write from MCP goroutine — see PrepSubsystem.frozen
+	s.Poke()         // trigger immediate drain
+
+	return nil, ShutdownOutput{
+		Success: true,
+		Message: "dispatch started — queue unfrozen, draining",
+	}, nil
+}
+
+// shutdownGraceful freezes the queue — running agents finish, no new dispatches.
+func (s *PrepSubsystem) shutdownGraceful(ctx context.Context, _ *mcp.CallToolRequest, input ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error) {
+	s.frozen = true
+
+	running := s.countRunningByAgent("codex") + s.countRunningByAgent("claude") +
+		s.countRunningByAgent("gemini") + s.countRunningByAgent("codex-spark")
+
+	return nil, ShutdownOutput{
+		Success: true,
+		Running: running,
+		Message: "queue frozen — running agents will finish, no new dispatches",
+	}, nil
+}
+
+// shutdownNow kills all running agents and clears the queue.
+func (s *PrepSubsystem) shutdownNow(ctx context.Context, _ *mcp.CallToolRequest, input ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error) {
+	s.frozen = true
+
+	wsRoot := WorkspaceRoot()
+	old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
+	deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
+	statusFiles := append(old, deep...)
+
+	killed := 0
+	cleared := 0
+
+	for _, statusPath := range statusFiles {
+		wsDir := core.PathDir(statusPath)
+		st, err := readStatus(wsDir)
+		if err != nil {
+			continue
+		}
+
+		// Kill running agents
+		if st.Status == "running" && st.PID > 0 {
+			if syscall.Kill(st.PID, syscall.SIGTERM) == nil {
+				killed++
+			}
+			st.Status = "failed"
+			st.Question = "killed by shutdown_now"
+			st.PID = 0
+			writeStatus(wsDir, st)
+		}
+
+		// Clear queued tasks
+		if st.Status == "queued" {
+			st.Status = "failed"
+			st.Question = "cleared by shutdown_now"
+			writeStatus(wsDir, st)
+			cleared++
+		}
+	}
+
+	return nil, ShutdownOutput{
+		Success: true,
+		Running: 0,
+		Queued:  0,
+		Message: core.Sprintf("killed %d agents, cleared %d queued tasks", killed, cleared),
+	}, nil
+}
diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go
index e4188b9..6562eef 100644
--- a/pkg/monitor/monitor.go
+++ b/pkg/monitor/monitor.go
@@ -222,41 +222,71 @@ func (m *Subsystem) Poke() {
 	}
 }
 
-// AgentStarted pushes an immediate notification when an agent spawns.
-// Called directly by dispatch — no filesystem polling.
+// AgentStarted is called when an agent spawns.
+// No individual notification — fleet status is checked on completion.
 //
 //	mon.AgentStarted("codex:gpt-5.3-codex-spark", "go-io", "core/go-io/task-5")
 func (m *Subsystem) AgentStarted(agent, repo, workspace string) {
-	if m.notifier != nil {
-		m.notifier.ChannelSend(context.Background(), "agent.started", map[string]any{
-			"agent": agent,
-			"repo":  repo,
-		})
-	}
+	// No-op — we only notify on failures and queue drain
 }
 
-// AgentCompleted pushes an immediate notification when an agent finishes.
-// Called directly by dispatch — no filesystem polling needed.
+// AgentCompleted is called when an agent finishes.
+// Only sends notifications for failures. Sends "queue.drained" when all work is done.
 //
 //	mon.AgentCompleted("codex", "go-io", "core/go-io/task-5", "completed")
 func (m *Subsystem) AgentCompleted(agent, repo, workspace, status string) {
-	if m.notifier != nil {
-		// Count current running/queued from status for context
-		running := 0
-		queued := 0
-		m.mu.Lock()
-		m.seenCompleted[workspace] = true
-		m.mu.Unlock()
+	m.mu.Lock()
+	m.seenCompleted[workspace] = true
+	m.mu.Unlock()
 
-		m.notifier.ChannelSend(context.Background(), "agent.complete", map[string]any{
-			"completed": []string{core.Sprintf("%s (%s) [%s]", repo, agent, status)},
-			"count":     1,
-			"running":   running,
-			"queued":    queued,
+	if m.notifier != nil {
+		// Only notify on failures — those need attention
+		if status == "failed" || status == "blocked" {
+			m.notifier.ChannelSend(context.Background(), "agent.failed", map[string]any{
+				"repo":   repo,
+				"agent":  agent,
+				"status": status,
+			})
+		}
+	}
+
+	// Check if queue is drained (0 running + 0 queued)
+	m.Poke()
+	go m.checkIdleAfterDelay() // NOTE(review): may fire duplicate queue.drained when completions race
+}
+
+// checkIdleAfterDelay waits briefly then checks if the fleet is idle.
+// Sends a single "queue.drained" notification when all work stops.
+func (m *Subsystem) checkIdleAfterDelay() {
+	time.Sleep(5 * time.Second) // wait for runner to fill slots
+	if m.notifier == nil {
+		return
+	}
+
+	// Quick count — scan for running/queued
+	running := 0
+	queued := 0
+	wsRoot := agentic.WorkspaceRoot() // assumes monitor.go already imports time/agentic/fs — verify, not shown in this patch
+	old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
+	deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
+	for _, path := range append(old, deep...) {
+		r := fs.Read(path)
+		if !r.OK {
+			continue
+		}
+		s := r.Value.(string)
+		if core.Contains(s, `"status":"running"`) {
+			running++
+		} else if core.Contains(s, `"status":"queued"`) {
+			queued++
+		}
+	}
+
+	if running == 0 && queued == 0 {
+		m.notifier.ChannelSend(context.Background(), "queue.drained", map[string]any{
+			"message": "all work complete",
 		})
 	}
-	// Also poke to update counts for any other monitors
-	m.Poke()
 }
 
 func (m *Subsystem) loop(ctx context.Context) {
@@ -379,12 +409,7 @@ func (m *Subsystem) checkCompletions() string {
 			running++
 			if !m.seenRunning[wsName] && seeded {
 				m.seenRunning[wsName] = true
-				if m.notifier != nil {
-					m.notifier.ChannelSend(context.Background(), "agent.started", map[string]any{
-						"repo":  st.Repo,
-						"agent": st.Agent,
-					})
-				}
+				// No individual start notification — too noisy
 			}
 		case "queued":
 			queued++
@@ -405,13 +430,11 @@ func (m *Subsystem) checkCompletions() string {
 		return ""
 	}
 
-	// Push channel events
-	if m.notifier != nil {
-		m.notifier.ChannelSend(context.Background(), "agent.complete", map[string]any{
-			"count":     len(newlyCompleted),
-			"completed": newlyCompleted,
-			"running":   running,
-			"queued":    queued,
+	// Only notify on queue drain (0 running + 0 queued) — individual completions are noise
+	if m.notifier != nil && running == 0 && queued == 0 {
+		m.notifier.ChannelSend(context.Background(), "queue.drained", map[string]any{
+			"completed": len(newlyCompleted),
+			"message":   "all work complete",
 		})
 	}