// SPDX-License-Identifier: EUPL-1.2 // Package runner is the agent dispatch service. // Owns concurrency, queue drain, workspace lifecycle, and frozen state. // Communicates with other services via Core IPC — Actions, Tasks, and Messages. // // core.New(core.WithService(runner.Register)) package runner import ( "context" "sync" "syscall" "time" "dappco.re/go/agent/pkg/messages" core "dappco.re/go/core" coremcp "dappco.re/go/mcp/pkg/mcp" ) // Options configures the runner service. type Options struct{} // Service is the agent dispatch runner. // Manages concurrency limits, queue drain, workspace lifecycle, and frozen state. // All dispatch requests — MCP tool, CLI, or IPC — go through this service. // // r := runner.New() // r.Dispatch(ctx, input) // checks frozen + concurrency, spawns or queues type Service struct { *core.ServiceRuntime[Options] dispatchMu sync.Mutex drainMu sync.Mutex pokeCh chan struct{} frozen bool backoff map[string]time.Time failCount map[string]int workspaces *core.Registry[*WorkspaceStatus] } // New creates a runner service. // // svc := runner.New() func New() *Service { return &Service{ backoff: make(map[string]time.Time), failCount: make(map[string]int), workspaces: core.NewRegistry[*WorkspaceStatus](), } } // Register is the service factory for core.WithService. // // core.New(core.WithService(runner.Register)) func Register(c *core.Core) core.Result { svc := New() svc.ServiceRuntime = core.NewServiceRuntime(c, Options{}) // Load agents config cfg := svc.loadAgentsConfig() c.Config().Set("agents.concurrency", cfg.Concurrency) c.Config().Set("agents.rates", cfg.Rates) c.Config().Set("agents.dispatch", cfg.Dispatch) return core.Result{Value: svc, OK: true} } // OnStartup registers Actions and starts the queue runner. // // c.Perform("runner.dispatch", opts) // dispatch an agent // c.Perform("runner.status", opts) // query workspace status func (s *Service) OnStartup(ctx context.Context) core.Result { c := s.Core() // Actions — the runner's capability map c.Action("runner.dispatch", s.actionDispatch).Description = "Dispatch a subagent (checks frozen + concurrency)" c.Action("runner.status", s.actionStatus).Description = "Query workspace status" c.Action("runner.start", s.actionStart).Description = "Unfreeze dispatch queue" c.Action("runner.stop", s.actionStop).Description = "Freeze dispatch queue (graceful)" c.Action("runner.kill", s.actionKill).Description = "Kill all running agents (hard stop)" c.Action("runner.poke", s.actionPoke).Description = "Drain next queued task" // Hydrate workspace registry from disk s.hydrateWorkspaces() // QUERY handler — workspace state queries c.RegisterQuery(func(_ *core.Core, q core.Query) core.Result { wq, ok := q.(WorkspaceQuery) if !ok { return core.Result{} } if wq.Name != "" { return s.workspaces.Get(wq.Name) } if wq.Status != "" { var names []string s.workspaces.Each(func(name string, st *WorkspaceStatus) { if st.Status == wq.Status { names = append(names, name) } }) return core.Result{Value: names, OK: true} } return core.Result{Value: s.workspaces, OK: true} }) // Start the background queue runner s.startRunner() return core.Result{OK: true} } // OnShutdown freezes the queue. func (s *Service) OnShutdown(_ context.Context) core.Result { s.frozen = true return core.Result{OK: true} } // HandleIPCEvents catches agent lifecycle events from other services. // // AgentCompleted → push channel notification + poke queue // PokeQueue → drain queue func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) core.Result { switch ev := msg.(type) { case messages.AgentCompleted: // Push channel event to Claude Code via MCP service c.ACTION(coremcp.ChannelPush{ Channel: "agent.status", Data: map[string]any{ "agent": ev.Agent, "repo": ev.Repo, "workspace": ev.Workspace, "status": ev.Status, }, }) // Poke queue to fill freed slot s.Poke() case messages.PokeQueue: s.drainQueue() _ = ev } return core.Result{OK: true} } // IsFrozen returns whether dispatch is currently frozen. // // if s.IsFrozen() { return "queue is frozen" } func (s *Service) IsFrozen() bool { return s.frozen } // Poke signals the runner to check the queue immediately. // // s.Poke() func (s *Service) Poke() { if s.pokeCh == nil { return } select { case s.pokeCh <- struct{}{}: default: } } // TrackWorkspace registers or updates a workspace in the in-memory Registry. // Accepts any status type — agentic passes *agentic.WorkspaceStatus, // runner stores its own *WorkspaceStatus copy. // // s.TrackWorkspace("core/go-io/task-5", st) func (s *Service) TrackWorkspace(name string, st any) { if s.workspaces == nil { return } // Convert from agentic's type to runner's via JSON round-trip json := core.JSONMarshalString(st) var ws WorkspaceStatus if r := core.JSONUnmarshalString(json, &ws); r.OK { s.workspaces.Set(name, &ws) } } // Workspaces returns the workspace Registry. // // s.Workspaces().Each(func(name string, st *WorkspaceStatus) { ... }) func (s *Service) Workspaces() *core.Registry[*WorkspaceStatus] { return s.workspaces } // --- Actions --- func (s *Service) actionDispatch(_ context.Context, opts core.Options) core.Result { if s.frozen { return core.Result{Value: "queue is frozen", OK: false} } agent := opts.String("agent") if agent == "" { agent = "codex" } repo := opts.String("repo") s.dispatchMu.Lock() defer s.dispatchMu.Unlock() if !s.canDispatchAgent(agent) { return core.Result{Value: "queued — at concurrency limit", OK: false} } // Reserve the slot immediately — before returning to agentic. // Without this, parallel dispatches all see count < limit. name := core.Concat("pending/", repo) s.workspaces.Set(name, &WorkspaceStatus{ Status: "running", Agent: agent, Repo: repo, PID: -1, // placeholder — agentic will update with real PID via TrackWorkspace }) return core.Result{OK: true} } func (s *Service) actionStatus(_ context.Context, _ core.Options) core.Result { running, queued, completed, failed := 0, 0, 0, 0 s.workspaces.Each(func(_ string, st *WorkspaceStatus) { switch st.Status { case "running": running++ case "queued": queued++ case "completed", "merged", "ready-for-review": completed++ case "failed", "blocked": failed++ } }) return core.Result{Value: map[string]int{ "running": running, "queued": queued, "completed": completed, "failed": failed, "total": running + queued + completed + failed, }, OK: true} } func (s *Service) actionStart(_ context.Context, _ core.Options) core.Result { s.frozen = false s.Poke() return core.Result{Value: "dispatch started", OK: true} } func (s *Service) actionStop(_ context.Context, _ core.Options) core.Result { s.frozen = true return core.Result{Value: "queue frozen", OK: true} } func (s *Service) actionKill(_ context.Context, _ core.Options) core.Result { s.frozen = true killed := 0 s.workspaces.Each(func(_ string, st *WorkspaceStatus) { if st.Status == "running" && st.PID > 0 { if syscall.Kill(st.PID, syscall.SIGTERM) == nil { killed++ } st.Status = "failed" st.PID = 0 } if st.Status == "queued" { st.Status = "failed" } }) return core.Result{Value: core.Sprintf("killed %d agents", killed), OK: true} } func (s *Service) actionPoke(_ context.Context, _ core.Options) core.Result { s.drainQueue() return core.Result{OK: true} } // --- Queue runner --- func (s *Service) startRunner() { s.pokeCh = make(chan struct{}, 1) if core.Env("CORE_AGENT_DISPATCH") == "1" { s.frozen = false } else { s.frozen = true } go s.runLoop() } func (s *Service) runLoop() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: s.drainQueue() case <-s.pokeCh: s.drainQueue() } } } // --- Workspace hydration --- func (s *Service) hydrateWorkspaces() { if s.workspaces == nil { s.workspaces = core.NewRegistry[*WorkspaceStatus]() } wsRoot := WorkspaceRoot() for _, pattern := range []string{ core.JoinPath(wsRoot, "*", "status.json"), core.JoinPath(wsRoot, "*", "*", "*", "status.json"), } { for _, path := range core.PathGlob(pattern) { wsDir := core.PathDir(path) st, err := ReadStatus(wsDir) if err != nil || st == nil { continue } name := core.TrimPrefix(wsDir, wsRoot) name = core.TrimPrefix(name, "/") s.workspaces.Set(name, st) } } } // --- Types --- // WorkspaceQuery is the QUERY type for workspace lookups. // // r := c.QUERY(runner.WorkspaceQuery{Status: "running"}) type WorkspaceQuery struct { Name string Status string } // WorkspaceStatus tracks the state of an agent workspace. // // st := &runner.WorkspaceStatus{Status: "running", Agent: "codex", Repo: "go-io", PID: 12345} type WorkspaceStatus struct { Status string `json:"status"` Agent string `json:"agent"` Repo string `json:"repo"` Org string `json:"org,omitempty"` Task string `json:"task,omitempty"` Branch string `json:"branch,omitempty"` PID int `json:"pid,omitempty"` Question string `json:"question,omitempty"` PRURL string `json:"pr_url,omitempty"` StartedAt time.Time `json:"started_at"` Runs int `json:"runs"` }