agent/pkg/runner/runner.go
Snider 0fc6eeb4cc feat(runner): extract dispatch runner into independent Core service
Moves concurrency, queue drain, workspace lifecycle, and frozen state
from agentic/prep into pkg/runner/ — a standalone Core service that
communicates via IPC Actions only.

- runner.Register wires Actions: dispatch, status, start, stop, kill, poke
- runner.HandleIPCEvents catches AgentCompleted → ChannelPush + queue poke
- Agentic dispatch asks runner for permission via c.Action("runner.dispatch")
- Dispatch mutex moved to struct-level sync.Mutex (fixes core.Lock init race)
- Registry-based concurrency counting replaces disk scanning
- TrackWorkspace called on both queued and running status writes
- SpawnQueued message added for runner→agentic spawn requests
- ChannelPush message in core/mcp enables any service to push channel events
- 51 new tests covering runner service, queue, and config parsing

Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-26 11:00:47 +00:00

345 lines
9 KiB
Go

// SPDX-License-Identifier: EUPL-1.2
// Package runner is the agent dispatch service.
// Owns concurrency, queue drain, workspace lifecycle, and frozen state.
// Communicates with other services via Core IPC — Actions, Tasks, and Messages.
//
// core.New(core.WithService(runner.Register))
package runner
import (
"context"
"sync"
"syscall"
"time"
"dappco.re/go/agent/pkg/messages"
core "dappco.re/go/core"
coremcp "dappco.re/go/mcp/pkg/mcp"
)
// Options configures the runner service.
// It declares no fields; Register passes the zero value to core.NewServiceRuntime.
type Options struct{}
// Service is the agent dispatch runner.
// Manages concurrency limits, queue drain, workspace lifecycle, and frozen state.
// All dispatch requests — MCP tool, CLI, or IPC — go through this service.
//
// r := runner.New()
// r.Dispatch(ctx, input) // checks frozen + concurrency, spawns or queues
//
// NOTE(review): frozen is read and written by several methods without holding
// either mutex — confirm those methods can never run on different goroutines,
// otherwise the field needs a lock or an atomic.
type Service struct {
// Embedded service runtime; assigned by Register.
*core.ServiceRuntime[Options]
// Held by actionDispatch around its canDispatchAgent check.
dispatchMu sync.Mutex
// Not referenced in this part of the file; presumably serialises drainQueue — TODO confirm.
drainMu sync.Mutex
// Wake-up signal consumed by runLoop. Created by startRunner with a buffer of one; nil until then.
pokeCh chan struct{}
// When true, actionDispatch refuses requests. Written by startRunner, OnShutdown and the start/stop/kill Actions.
frozen bool
// Allocated by New; its keys and usage are not visible in this part of the file.
backoff map[string]time.Time
// Allocated by New; its keys and usage are not visible in this part of the file.
failCount map[string]int
// In-memory registry of workspace status, keyed by workspace name.
workspaces *core.Registry[*WorkspaceStatus]
}
// New builds a runner Service with empty backoff and failure-count maps and a
// fresh workspace Registry. The embedded ServiceRuntime and the poke channel
// are left unset here.
//
//	svc := runner.New()
func New() *Service {
	svc := new(Service)
	svc.backoff = map[string]time.Time{}
	svc.failCount = map[string]int{}
	svc.workspaces = core.NewRegistry[*WorkspaceStatus]()
	return svc
}
// Register is the service factory handed to core.WithService.
// It creates the Service, attaches a ServiceRuntime bound to c, and copies the
// loaded agents configuration into Core config under "agents.concurrency",
// "agents.rates" and "agents.dispatch". The Service is returned as the
// Result value.
//
//	core.New(core.WithService(runner.Register))
func Register(c *core.Core) core.Result {
	service := New()
	service.ServiceRuntime = core.NewServiceRuntime(c, Options{})

	// Publish the agents config through Core config.
	agents := service.loadAgentsConfig()
	c.Config().Set("agents.concurrency", agents.Concurrency)
	c.Config().Set("agents.rates", agents.Rates)
	c.Config().Set("agents.dispatch", agents.Dispatch)

	return core.Result{OK: true, Value: service}
}
// OnStartup wires the runner into Core and starts the background queue loop.
// In order it: registers the six runner.* Actions, hydrates the workspace
// registry from disk, installs the WorkspaceQuery handler, and calls
// startRunner. The ctx argument is not used.
//
//	c.Perform("runner.dispatch", opts) // dispatch an agent
//	c.Perform("runner.status", opts)   // query workspace status
func (s *Service) OnStartup(ctx context.Context) core.Result {
	c := s.Core()

	// Actions — the runner's capability map.
	c.Action("runner.dispatch", s.actionDispatch).Description = "Dispatch a subagent (checks frozen + concurrency)"
	c.Action("runner.status", s.actionStatus).Description = "Query workspace status"
	c.Action("runner.start", s.actionStart).Description = "Unfreeze dispatch queue"
	c.Action("runner.stop", s.actionStop).Description = "Freeze dispatch queue (graceful)"
	c.Action("runner.kill", s.actionKill).Description = "Kill all running agents (hard stop)"
	c.Action("runner.poke", s.actionPoke).Description = "Drain next queued task"

	// Populate the registry before any query or drain can observe it.
	s.hydrateWorkspaces()

	// QUERY handler. Precedence: Name lookup, then Status filter, then the
	// whole registry. Queries of any other type get an empty Result.
	queryWorkspaces := func(_ *core.Core, q core.Query) core.Result {
		query, isWorkspaceQuery := q.(WorkspaceQuery)
		switch {
		case !isWorkspaceQuery:
			return core.Result{}
		case query.Name != "":
			return s.workspaces.Get(query.Name)
		case query.Status == "":
			return core.Result{Value: s.workspaces, OK: true}
		}

		var matches []string
		s.workspaces.Each(func(name string, ws *WorkspaceStatus) {
			if ws.Status == query.Status {
				matches = append(matches, name)
			}
		})
		return core.Result{Value: matches, OK: true}
	}
	c.RegisterQuery(queryWorkspaces)

	// Start the background queue runner.
	s.startRunner()
	return core.Result{OK: true}
}
// OnShutdown marks the service frozen so that actionDispatch refuses further
// requests. It does nothing else: the goroutine started by startRunner is not
// signalled from here.
func (s *Service) OnShutdown(_ context.Context) core.Result {
	s.frozen = true
	return core.Result{OK: true}
}
// HandleIPCEvents reacts to lifecycle messages broadcast by other services.
//
//	AgentCompleted → push an "agent.status" channel event, then Poke the queue
//	PokeQueue      → drain the queue synchronously
//
// Any other message type is ignored. The Result is always OK.
func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) core.Result {
	switch ev := msg.(type) {
	case messages.AgentCompleted:
		// Forward the completion to the MCP service as a channel event.
		payload := map[string]any{
			"agent":     ev.Agent,
			"repo":      ev.Repo,
			"workspace": ev.Workspace,
			"status":    ev.Status,
		}
		c.ACTION(coremcp.ChannelPush{Channel: "agent.status", Data: payload})

		// A slot was freed — wake the run loop so it can fill it.
		s.Poke()
	case messages.PokeQueue:
		s.drainQueue()
	}
	return core.Result{OK: true}
}
// IsFrozen reports the current value of the frozen flag.
// The read is not synchronised.
//
//	if s.IsFrozen() { return "queue is frozen" }
func (s *Service) IsFrozen() bool {
	return s.frozen
}
// Poke asks the run loop to check the queue now. The send never blocks: if a
// poke is already pending in the channel the new one is dropped, and if the
// channel has not been created yet the call is a no-op.
//
//	s.Poke()
func (s *Service) Poke() {
	if s.pokeCh != nil {
		select {
		case s.pokeCh <- struct{}{}:
			// Signal delivered.
		default:
			// A poke is already pending; nothing more to do.
		}
	}
}
// TrackWorkspace stores or replaces the registry entry for name.
// st may be any value; it is marshalled to JSON and unmarshalled into the
// runner's own WorkspaceStatus, so only fields whose JSON names match are
// carried over. If the unmarshal result is not OK the registry is left
// untouched, and the call is a no-op when the registry is nil.
//
//	s.TrackWorkspace("core/go-io/task-5", st)
func (s *Service) TrackWorkspace(name string, st any) {
	if s.workspaces == nil {
		return
	}

	// JSON round-trip converts the caller's status type into ours.
	var tracked WorkspaceStatus
	encoded := core.JSONMarshalString(st)
	if decoded := core.JSONUnmarshalString(encoded, &tracked); !decoded.OK {
		return
	}
	s.workspaces.Set(name, &tracked)
}
// Workspaces exposes the service's workspace Registry itself, not a copy.
//
//	s.Workspaces().Each(func(name string, st *WorkspaceStatus) { ... })
func (s *Service) Workspaces() *core.Registry[*WorkspaceStatus] {
	return s.workspaces
}
// --- Actions ---
// actionDispatch is the "runner.dispatch" Action: a gate, not a spawner.
// It answers whether the named agent may be dispatched right now.
//
//   - frozen queue        → OK false, Value "queue is frozen"
//   - at concurrency cap  → OK false, Value "queued — at concurrency limit"
//   - otherwise           → OK true
//
// The "agent" option defaults to "codex" when empty. The concurrency check
// runs under dispatchMu; the frozen check happens before the lock is taken.
func (s *Service) actionDispatch(_ context.Context, opts core.Options) core.Result {
	if s.frozen {
		return core.Result{Value: "queue is frozen", OK: false}
	}

	// The caller performs the actual spawn; the runner only grants or
	// denies permission.
	agent := opts.String("agent")
	if agent == "" {
		agent = "codex"
	}

	s.dispatchMu.Lock()
	defer s.dispatchMu.Unlock()

	if s.canDispatchAgent(agent) {
		return core.Result{OK: true}
	}
	return core.Result{Value: "queued — at concurrency limit", OK: false}
}
// actionStatus is the "runner.status" Action. It walks the workspace registry
// and returns a map[string]int with the keys "running", "queued",
// "completed", "failed" and "total".
//
// "merged" and "ready-for-review" count as completed; "blocked" counts as
// failed. Workspaces with any other status are left out of every bucket,
// including "total".
func (s *Service) actionStatus(_ context.Context, _ core.Options) core.Result {
	tally := map[string]int{
		"running":   0,
		"queued":    0,
		"completed": 0,
		"failed":    0,
		"total":     0,
	}

	s.workspaces.Each(func(_ string, ws *WorkspaceStatus) {
		var bucket string
		switch ws.Status {
		case "running", "queued":
			bucket = ws.Status
		case "completed", "merged", "ready-for-review":
			bucket = "completed"
		case "failed", "blocked":
			bucket = "failed"
		default:
			// Unrecognised status: not counted anywhere.
			return
		}
		tally[bucket]++
		tally["total"]++
	})

	return core.Result{Value: tally, OK: true}
}
// actionStart is the "runner.start" Action: it clears the frozen flag and
// pokes the run loop so queued work is looked at without waiting for the
// next tick.
func (s *Service) actionStart(_ context.Context, _ core.Options) core.Result {
	s.frozen = false
	s.Poke()
	return core.Result{Value: "dispatch started", OK: true}
}
// actionStop is the "runner.stop" Action: it sets the frozen flag and leaves
// every workspace untouched.
func (s *Service) actionStop(_ context.Context, _ core.Options) core.Result {
	s.frozen = true
	return core.Result{Value: "queue frozen", OK: true}
}
// actionKill is the "runner.kill" Action. It freezes dispatch, then walks the
// registry:
//
//   - running workspaces with a positive PID are sent SIGTERM and marked
//     "failed" with PID reset to 0, whether or not the signal was delivered;
//   - queued workspaces are marked "failed".
//
// The returned message counts only the signals that succeeded. The status
// changes are applied to the registry entries; this function does not write
// anything to disk itself.
func (s *Service) actionKill(_ context.Context, _ core.Options) core.Result {
	s.frozen = true

	killed := 0
	s.workspaces.Each(func(_ string, ws *WorkspaceStatus) {
		switch {
		case ws.Status == "running" && ws.PID > 0:
			if err := syscall.Kill(ws.PID, syscall.SIGTERM); err == nil {
				killed++
			}
			ws.Status = "failed"
			ws.PID = 0
		case ws.Status == "queued":
			ws.Status = "failed"
		}
	})

	return core.Result{Value: core.Sprintf("killed %d agents", killed), OK: true}
}
// actionPoke is the "runner.poke" Action. Unlike Poke, which only signals the
// run loop, it calls drainQueue directly on the caller's goroutine.
func (s *Service) actionPoke(_ context.Context, _ core.Options) core.Result {
	s.drainQueue()
	return core.Result{OK: true}
}
// --- Queue runner ---
// startRunner creates the poke channel (buffer of one), sets the initial
// frozen state, and launches runLoop on its own goroutine.
// Dispatch starts frozen unless the CORE_AGENT_DISPATCH environment value is
// exactly "1".
func (s *Service) startRunner() {
	s.pokeCh = make(chan struct{}, 1)
	s.frozen = core.Env("CORE_AGENT_DISPATCH") != "1"
	go s.runLoop()
}
// runLoop drains the queue every 30 seconds and whenever a poke arrives.
//
// NOTE(review): the loop has no exit path, so the goroutine runs until the
// process ends and the deferred ticker Stop is never reached.
func (s *Service) runLoop() {
	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()

	for {
		// Block until either trigger fires; both lead to the same drain.
		select {
		case <-tick.C:
		case <-s.pokeCh:
		}
		s.drainQueue()
	}
}
// --- Workspace hydration ---
// hydrateWorkspaces fills the in-memory registry from status.json files found
// under WorkspaceRoot(). Two layouts are globbed:
//
//	<root>/*/status.json
//	<root>/*/*/*/status.json
//
// Each workspace is keyed by its directory path relative to the root, with
// the leading "/" removed. Directories whose status cannot be read, or reads
// as nil, are skipped. The registry is created first if it is nil.
func (s *Service) hydrateWorkspaces() {
	if s.workspaces == nil {
		s.workspaces = core.NewRegistry[*WorkspaceStatus]()
	}

	root := WorkspaceRoot()
	patterns := []string{
		core.JoinPath(root, "*", "status.json"),
		core.JoinPath(root, "*", "*", "*", "status.json"),
	}

	for _, pattern := range patterns {
		for _, statusPath := range core.PathGlob(pattern) {
			dir := core.PathDir(statusPath)
			status, err := ReadStatus(dir)
			if err == nil && status != nil {
				key := core.TrimPrefix(core.TrimPrefix(dir, root), "/")
				s.workspaces.Set(key, status)
			}
		}
	}
}
// --- Types ---
// WorkspaceQuery is the QUERY type for workspace lookups.
// The handler registered in OnStartup checks Name first, then Status; with
// both empty it returns the whole workspace Registry.
//
// r := c.QUERY(runner.WorkspaceQuery{Status: "running"})
type WorkspaceQuery struct {
// Name, when non-empty, looks up a single workspace by its registry key.
Name string
// Status, when non-empty and Name is empty, selects the names of workspaces whose Status is an exact match.
Status string
}
// WorkspaceStatus tracks the state of an agent workspace.
// TrackWorkspace fills it through a JSON round-trip, so the json tags below
// decide which fields are carried over from the caller's status value.
//
// st := &runner.WorkspaceStatus{Status: "running", Agent: "codex", Repo: "go-io", PID: 12345}
type WorkspaceStatus struct {
// Status is the lifecycle state. Values handled in this file: "running", "queued",
// "completed", "merged", "ready-for-review", "failed", "blocked".
Status string `json:"status"`
// Agent, Repo, Org, Task and Branch identify the work; this file only stores them.
Agent string `json:"agent"`
Repo string `json:"repo"`
Org string `json:"org,omitempty"`
Task string `json:"task,omitempty"`
Branch string `json:"branch,omitempty"`
// PID is the process actionKill signals when Status is "running" and PID is positive; actionKill resets it to 0.
PID int `json:"pid,omitempty"`
// Question and PRURL are stored only; nothing in this file reads them.
Question string `json:"question,omitempty"`
PRURL string `json:"pr_url,omitempty"`
// StartedAt and Runs are stored only; presumably start time and attempt count — TODO confirm with the writer.
StartedAt time.Time `json:"started_at"`
Runs int `json:"runs"`
}