From 0fc6eeb4ccfafe7b560752d789c1a76c10afb06b Mon Sep 17 00:00:00 2001 From: Snider Date: Thu, 26 Mar 2026 11:00:47 +0000 Subject: [PATCH] feat(runner): extract dispatch runner into independent Core service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Moves concurrency, queue drain, workspace lifecycle, and frozen state from agentic/prep into pkg/runner/ — a standalone Core service that communicates via IPC Actions only. - runner.Register wires Actions: dispatch, status, start, stop, kill, poke - runner.HandleIPCEvents catches AgentCompleted → ChannelPush + queue poke - Agentic dispatch asks runner for permission via c.Action("runner.dispatch") - Dispatch mutex moved to struct-level sync.Mutex (fixes core.Lock init race) - Registry-based concurrency counting replaces disk scanning - TrackWorkspace called on both queued and running status writes - SpawnQueued message added for runner→agentic spawn requests - ChannelPush message in core/mcp enables any service to push channel events - 51 new tests covering runner service, queue, and config parsing Co-Authored-By: Virgil --- cmd/core-agent/main.go | 2 + pkg/agentic/dispatch.go | 66 +++++--- pkg/agentic/handlers.go | 14 +- pkg/agentic/prep.go | 1 + pkg/agentic/queue.go | 52 ++++-- pkg/messages/messages.go | 10 ++ pkg/runner/paths.go | 51 ++++++ pkg/runner/queue.go | 335 ++++++++++++++++++++++++++++++++++++ pkg/runner/queue_test.go | 254 ++++++++++++++++++++++++++++ pkg/runner/runner.go | 345 ++++++++++++++++++++++++++++++++++++++ pkg/runner/runner_test.go | 334 ++++++++++++++++++++++++++++++++++++ 11 files changed, 1421 insertions(+), 43 deletions(-) create mode 100644 pkg/runner/paths.go create mode 100644 pkg/runner/queue.go create mode 100644 pkg/runner/queue_test.go create mode 100644 pkg/runner/runner.go create mode 100644 pkg/runner/runner_test.go diff --git a/cmd/core-agent/main.go b/cmd/core-agent/main.go index 0545552..951204a 100644 --- a/cmd/core-agent/main.go +++ 
b/cmd/core-agent/main.go @@ -6,6 +6,7 @@ import ( "dappco.re/go/agent/pkg/agentic" "dappco.re/go/agent/pkg/brain" "dappco.re/go/agent/pkg/monitor" + "dappco.re/go/agent/pkg/runner" "dappco.re/go/mcp/pkg/mcp" ) @@ -18,6 +19,7 @@ func main() { core.WithOption("name", "core-agent"), core.WithService(agentic.ProcessRegister), core.WithService(agentic.Register), + core.WithService(runner.Register), core.WithService(monitor.Register), core.WithService(brain.Register), core.WithService(mcp.Register), diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index 3a05d91..d51537c 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -12,6 +12,12 @@ import ( "github.com/modelcontextprotocol/go-sdk/mcp" ) +// workspaceTracker is the interface runner.Service satisfies. +// Uses *WorkspaceStatus from agentic — runner imports agentic for the type. +type workspaceTracker interface { + TrackWorkspace(name string, st any) +} + // DispatchInput is the input for agentic_dispatch. // // input := agentic.DispatchInput{Repo: "go-io", Task: "Fix the failing tests", Agent: "codex", Issue: 15} @@ -495,25 +501,36 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, }, nil } - // Step 2: Check per-agent concurrency limit - if !s.canDispatchAgent(input.Agent) { - writeStatus(wsDir, &WorkspaceStatus{ - Status: "queued", - Agent: input.Agent, - Repo: input.Repo, - Org: input.Org, - Task: input.Task, - Branch: prepOut.Branch, - StartedAt: time.Now(), - Runs: 0, - }) - return nil, DispatchOutput{ - Success: true, - Agent: input.Agent, - Repo: input.Repo, - WorkspaceDir: wsDir, - OutputFile: "queued — waiting for a slot", - }, nil + // Step 2: Ask runner service for permission (frozen + concurrency check). + // Runner owns the gate — agentic owns the spawn. 
+ if s.ServiceRuntime != nil { + r := s.Core().Action("runner.dispatch").Run(ctx, core.NewOptions( + core.Option{Key: "agent", Value: input.Agent}, + )) + if !r.OK { + // Runner denied — queue it + st := &WorkspaceStatus{ + Status: "queued", + Agent: input.Agent, + Repo: input.Repo, + Org: input.Org, + Task: input.Task, + Branch: prepOut.Branch, + StartedAt: time.Now(), + Runs: 0, + } + writeStatus(wsDir, st) + if runnerSvc, ok := core.ServiceFor[workspaceTracker](s.Core(), "runner"); ok { + runnerSvc.TrackWorkspace(core.PathBase(wsDir), st) + } + return nil, DispatchOutput{ + Success: true, + Agent: input.Agent, + Repo: input.Repo, + WorkspaceDir: wsDir, + OutputFile: "queued — at concurrency limit or frozen", + }, nil + } } // Step 3: Spawn agent in repo/ directory @@ -522,7 +539,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, return nil, DispatchOutput{}, err } - writeStatus(wsDir, &WorkspaceStatus{ + st := &WorkspaceStatus{ Status: "running", Agent: input.Agent, Repo: input.Repo, @@ -532,7 +549,14 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, PID: pid, StartedAt: time.Now(), Runs: 1, - }) + } + writeStatus(wsDir, st) + // Track in runner's registry (runner owns workspace state) + if s.ServiceRuntime != nil { + if runnerSvc, ok := core.ServiceFor[workspaceTracker](s.Core(), "runner"); ok { + runnerSvc.TrackWorkspace(core.PathBase(wsDir), st) + } + } return nil, DispatchOutput{ Success: true, diff --git a/pkg/agentic/handlers.go b/pkg/agentic/handlers.go index 140ac0d..3954053 100644 --- a/pkg/agentic/handlers.go +++ b/pkg/agentic/handlers.go @@ -16,12 +16,8 @@ import ( // // Handles: // -// AgentCompleted → ingest findings + poke queue -// PokeQueue → drain queue -// -// The completion pipeline (QA → PR → Verify) runs via the "agent.completion" Task, -// triggered by PerformAsync in onAgentComplete. These handlers cover cross-cutting -// concerns that fire on ALL completions. 
+// AgentCompleted → ingest findings (runner handles channel push + queue poke) +// SpawnQueued → runner asks agentic to spawn a queued workspace func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Result { switch ev := msg.(type) { case messages.AgentCompleted: @@ -31,12 +27,6 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res s.ingestFindings(wsDir) } } - // Poke queue to fill freed slot - s.Poke() - - case messages.PokeQueue: - s.drainQueue() - _ = ev // signal message, no fields } return core.Result{OK: true} diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 858bcc6..ac34086 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -33,6 +33,7 @@ type PrepSubsystem struct { brainURL string brainKey string codePath string + dispatchMu sync.Mutex // serialises concurrency check + spawn drainMu sync.Mutex pokeCh chan struct{} frozen bool diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go index 3497579..8ea8c55 100644 --- a/pkg/agentic/queue.go +++ b/pkg/agentic/queue.go @@ -156,18 +156,36 @@ func (s *PrepSubsystem) delayForAgent(agent string) time.Duration { return time.Duration(rate.SustainedDelay) * time.Second } -// countRunningByAgent counts running workspaces for a specific agent type. -// Scans both old (*/status.json) and new (*/*/*/status.json) workspace layouts. +// countRunningByAgent counts running workspaces for a specific agent type +// using the in-memory Registry. Falls back to disk scan if Registry is empty. 
+// +// n := s.countRunningByAgent("codex") // counts all codex:* variants func (s *PrepSubsystem) countRunningByAgent(agent string) int { - wsRoot := WorkspaceRoot() + if s.workspaces != nil && s.workspaces.Len() > 0 { + count := 0 + s.workspaces.Each(func(_ string, st *WorkspaceStatus) { + if st.Status == "running" && baseAgent(st.Agent) == agent { + if st.PID > 0 && syscall.Kill(st.PID, 0) == nil { + count++ + } + } + }) + return count + } - // Scan both old and new workspace layouts + // Fallback: scan disk (cold start before hydration) + return s.countRunningByAgentDisk(agent) +} + +// countRunningByAgentDisk scans workspace status.json files on disk. +// Used only as fallback before Registry hydration completes. +func (s *PrepSubsystem) countRunningByAgentDisk(agent string) int { + wsRoot := WorkspaceRoot() old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) - new := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) - paths := append(old, new...) + deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) count := 0 - for _, statusPath := range paths { + for _, statusPath := range append(old, deep...) { st, err := ReadStatus(core.PathDir(statusPath)) if err != nil || st.Status != "running" { continue @@ -175,17 +193,31 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int { if baseAgent(st.Agent) != agent { continue } - if st.PID > 0 && syscall.Kill(st.PID, 0) == nil { count++ } } - return count } -// countRunningByModel counts running workspaces for a specific agent:model string. +// countRunningByModel counts running workspaces for a specific agent:model string +// using the in-memory Registry. 
+// +// n := s.countRunningByModel("codex:gpt-5.4") // counts only that model func (s *PrepSubsystem) countRunningByModel(agent string) int { + if s.workspaces != nil && s.workspaces.Len() > 0 { + count := 0 + s.workspaces.Each(func(_ string, st *WorkspaceStatus) { + if st.Status == "running" && st.Agent == agent { + if st.PID > 0 && syscall.Kill(st.PID, 0) == nil { + count++ + } + } + }) + return count + } + + // Fallback: scan disk wsRoot := WorkspaceRoot() old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) diff --git a/pkg/messages/messages.go b/pkg/messages/messages.go index 5458c5c..8c5859a 100644 --- a/pkg/messages/messages.go +++ b/pkg/messages/messages.go @@ -83,6 +83,16 @@ type QueueDrained struct { // c.ACTION(messages.PokeQueue{}) type PokeQueue struct{} +// SpawnQueued is sent by the runner to request agentic spawn a queued workspace. +// Runner gates (frozen + concurrency), agentic owns the actual process spawn. +// +// c.ACTION(messages.SpawnQueued{Workspace: "core/go-io/task-5", Agent: "codex", Task: "review"}) +type SpawnQueued struct { + Workspace string + Agent string + Task string +} + // RateLimitDetected is broadcast when fast failures trigger agent pool backoff. // // c.ACTION(messages.RateLimitDetected{Pool: "codex", Duration: "30m"}) diff --git a/pkg/runner/paths.go b/pkg/runner/paths.go new file mode 100644 index 0000000..e3c298f --- /dev/null +++ b/pkg/runner/paths.go @@ -0,0 +1,51 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package runner + +import ( + core "dappco.re/go/core" +) + +// fs is the file I/O medium for the runner package. +var fs = (&core.Fs{}).NewUnrestricted() + +// WorkspaceRoot returns the root directory for agent workspaces. 
+// +// root := runner.WorkspaceRoot() // ~/Code/.core/workspace +func WorkspaceRoot() string { + return core.JoinPath(CoreRoot(), "workspace") +} + +// CoreRoot returns the root directory for core ecosystem files. +// +// root := runner.CoreRoot() // ~/Code/.core +func CoreRoot() string { + if root := core.Env("CORE_WORKSPACE"); root != "" { + return root + } + return core.JoinPath(core.Env("DIR_HOME"), "Code", ".core") +} + +// ReadStatus reads a workspace status.json. +// +// st, err := runner.ReadStatus("/path/to/workspace") +func ReadStatus(wsDir string) (*WorkspaceStatus, error) { + path := core.JoinPath(wsDir, "status.json") + r := fs.Read(path) + if !r.OK { + return nil, core.E("runner.ReadStatus", "failed to read status", nil) + } + var st WorkspaceStatus + if result := core.JSONUnmarshalString(r.Value.(string), &st); !result.OK { + return nil, core.E("runner.ReadStatus", "failed to parse status", nil) + } + return &st, nil +} + +// WriteStatus writes a workspace status.json. +// +// runner.WriteStatus(wsDir, &runner.WorkspaceStatus{Status: "running", Agent: "codex"}) +func WriteStatus(wsDir string, st *WorkspaceStatus) { + path := core.JoinPath(wsDir, "status.json") + fs.Write(path, core.JSONMarshalString(st)) +} diff --git a/pkg/runner/queue.go b/pkg/runner/queue.go new file mode 100644 index 0000000..d6287fe --- /dev/null +++ b/pkg/runner/queue.go @@ -0,0 +1,335 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package runner + +import ( + "strconv" + "syscall" + "time" + + "dappco.re/go/agent/pkg/messages" + core "dappco.re/go/core" + "gopkg.in/yaml.v3" +) + +// DispatchConfig controls agent dispatch behaviour. +// +// cfg := runner.DispatchConfig{DefaultAgent: "claude", DefaultTemplate: "coding"} +type DispatchConfig struct { + DefaultAgent string `yaml:"default_agent"` + DefaultTemplate string `yaml:"default_template"` + WorkspaceRoot string `yaml:"workspace_root"` +} + +// RateConfig controls pacing between task dispatches. 
+// +// rate := runner.RateConfig{ResetUTC: "06:00", SustainedDelay: 120} +type RateConfig struct { + ResetUTC string `yaml:"reset_utc"` + DailyLimit int `yaml:"daily_limit"` + MinDelay int `yaml:"min_delay"` + SustainedDelay int `yaml:"sustained_delay"` + BurstWindow int `yaml:"burst_window"` + BurstDelay int `yaml:"burst_delay"` +} + +// ConcurrencyLimit supports both flat (int) and nested (map with total + per-model) formats. +// +// claude: 1 → Total=1, Models=nil +// codex: → Total=5, Models={"gpt-5.4": 1} +// total: 5 +// gpt-5.4: 1 +type ConcurrencyLimit struct { + Total int + Models map[string]int +} + +// UnmarshalYAML handles both int and map forms. +func (c *ConcurrencyLimit) UnmarshalYAML(value *yaml.Node) error { + var n int + if err := value.Decode(&n); err == nil { + c.Total = n + return nil + } + var m map[string]int + if err := value.Decode(&m); err != nil { + return err + } + c.Total = m["total"] + c.Models = make(map[string]int) + for k, v := range m { + if k != "total" { + c.Models[k] = v + } + } + return nil +} + +// AgentsConfig is the root of agents.yaml. +type AgentsConfig struct { + Version int `yaml:"version"` + Dispatch DispatchConfig `yaml:"dispatch"` + Concurrency map[string]ConcurrencyLimit `yaml:"concurrency"` + Rates map[string]RateConfig `yaml:"rates"` +} + +// loadAgentsConfig reads agents.yaml from known paths. +func (s *Service) loadAgentsConfig() *AgentsConfig { + paths := []string{ + core.JoinPath(CoreRoot(), "agents.yaml"), + } + for _, path := range paths { + r := fs.Read(path) + if !r.OK { + continue + } + var cfg AgentsConfig + if err := yaml.Unmarshal([]byte(r.Value.(string)), &cfg); err != nil { + continue + } + return &cfg + } + return &AgentsConfig{ + Dispatch: DispatchConfig{ + DefaultAgent: "claude", + DefaultTemplate: "coding", + }, + Concurrency: map[string]ConcurrencyLimit{ + "claude": {Total: 1}, + "gemini": {Total: 3}, + }, + } +} + +// canDispatchAgent checks both pool-level and per-model concurrency limits. 
+// +// if !s.canDispatchAgent("codex") { /* queue it */ } +func (s *Service) canDispatchAgent(agent string) bool { + var concurrency map[string]ConcurrencyLimit + if s.ServiceRuntime != nil { + concurrency = core.ConfigGet[map[string]ConcurrencyLimit]( + s.Core().Config(), "agents.concurrency") + } + if concurrency == nil { + cfg := s.loadAgentsConfig() + concurrency = cfg.Concurrency + } + + base := baseAgent(agent) + limit, ok := concurrency[base] + if !ok || limit.Total <= 0 { + return true + } + + if s.countRunningByAgent(base) >= limit.Total { + return false + } + + if limit.Models != nil { + model := modelVariant(agent) + if model != "" { + if modelLimit, has := limit.Models[model]; has && modelLimit > 0 { + if s.countRunningByModel(agent) >= modelLimit { + return false + } + } + } + } + + return true +} + +// countRunningByAgent counts running workspaces using the in-memory Registry. +// +// n := s.countRunningByAgent("codex") +func (s *Service) countRunningByAgent(agent string) int { + if s.workspaces != nil && s.workspaces.Len() > 0 { + count := 0 + s.workspaces.Each(func(_ string, st *WorkspaceStatus) { + if st.Status == "running" && baseAgent(st.Agent) == agent { + if st.PID > 0 && syscall.Kill(st.PID, 0) == nil { + count++ + } + } + }) + return count + } + return s.countRunningByAgentDisk(agent) +} + +func (s *Service) countRunningByAgentDisk(agent string) int { + wsRoot := WorkspaceRoot() + old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) + deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) + + count := 0 + for _, statusPath := range append(old, deep...) { + st, err := ReadStatus(core.PathDir(statusPath)) + if err != nil || st.Status != "running" { + continue + } + if baseAgent(st.Agent) != agent { + continue + } + if st.PID > 0 && syscall.Kill(st.PID, 0) == nil { + count++ + } + } + return count +} + +// countRunningByModel counts running workspaces for a specific agent:model. 
+func (s *Service) countRunningByModel(agent string) int { + if s.workspaces != nil && s.workspaces.Len() > 0 { + count := 0 + s.workspaces.Each(func(_ string, st *WorkspaceStatus) { + if st.Status == "running" && st.Agent == agent { + if st.PID > 0 && syscall.Kill(st.PID, 0) == nil { + count++ + } + } + }) + return count + } + + wsRoot := WorkspaceRoot() + old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) + deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) + + count := 0 + for _, statusPath := range append(old, deep...) { + st, err := ReadStatus(core.PathDir(statusPath)) + if err != nil || st.Status != "running" { + continue + } + if st.Agent != agent { + continue + } + if st.PID > 0 && syscall.Kill(st.PID, 0) == nil { + count++ + } + } + return count +} + +// drainQueue fills available concurrency slots from queued workspaces. +func (s *Service) drainQueue() { + if s.frozen { + return + } + s.drainMu.Lock() + defer s.drainMu.Unlock() + + for s.drainOne() { + // keep filling slots + } +} + +func (s *Service) drainOne() bool { + wsRoot := WorkspaceRoot() + old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) + deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) + + for _, statusPath := range append(old, deep...) { + wsDir := core.PathDir(statusPath) + st, err := ReadStatus(wsDir) + if err != nil || st.Status != "queued" { + continue + } + + if !s.canDispatchAgent(st.Agent) { + continue + } + + pool := baseAgent(st.Agent) + if until, ok := s.backoff[pool]; ok && time.Now().Before(until) { + continue + } + + delay := s.delayForAgent(st.Agent) + if delay > 0 { + time.Sleep(delay) + } + + if !s.canDispatchAgent(st.Agent) { + continue + } + + // Ask agentic to spawn — runner doesn't own the spawn logic, + // just the gate. Send IPC to trigger the actual spawn. 
+ if s.ServiceRuntime != nil { + s.Core().ACTION(messages.SpawnQueued{ + Workspace: core.PathBase(wsDir), + Agent: st.Agent, + Task: st.Task, + }) + } + + st.Status = "running" + st.Runs++ + WriteStatus(wsDir, st) + s.TrackWorkspace(core.PathBase(wsDir), st) + + return true + } + + return false +} + +func (s *Service) delayForAgent(agent string) time.Duration { + var rates map[string]RateConfig + if s.ServiceRuntime != nil { + rates, _ = s.Core().Config().Get("agents.rates").Value.(map[string]RateConfig) + } + if rates == nil { + cfg := s.loadAgentsConfig() + rates = cfg.Rates + } + base := baseAgent(agent) + rate, ok := rates[base] + if !ok || rate.SustainedDelay == 0 { + return 0 + } + + resetHour, resetMin := 6, 0 + parts := core.Split(rate.ResetUTC, ":") + if len(parts) >= 2 { + if hour, err := strconv.Atoi(core.Trim(parts[0])); err == nil { + resetHour = hour + } + if min, err := strconv.Atoi(core.Trim(parts[1])); err == nil { + resetMin = min + } + } + + now := time.Now().UTC() + resetToday := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMin, 0, 0, time.UTC) + if now.Before(resetToday) { + resetToday = resetToday.AddDate(0, 0, -1) + } + nextReset := resetToday.AddDate(0, 0, 1) + hoursUntilReset := nextReset.Sub(now).Hours() + + if rate.BurstWindow > 0 && hoursUntilReset <= float64(rate.BurstWindow) { + return time.Duration(rate.BurstDelay) * time.Second + } + + return time.Duration(rate.SustainedDelay) * time.Second +} + +// --- Helpers --- + +func baseAgent(agent string) string { + if core.Contains(agent, "codex-spark") { + return "codex-spark" + } + return core.SplitN(agent, ":", 2)[0] +} + +func modelVariant(agent string) string { + parts := core.SplitN(agent, ":", 2) + if len(parts) < 2 { + return "" + } + return parts[1] +} diff --git a/pkg/runner/queue_test.go b/pkg/runner/queue_test.go new file mode 100644 index 0000000..4b1ad71 --- /dev/null +++ b/pkg/runner/queue_test.go @@ -0,0 +1,254 @@ +// SPDX-License-Identifier: EUPL-1.2 + 
+package runner + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v3" +) + +// --- ConcurrencyLimit UnmarshalYAML --- + +func TestQueue_ConcurrencyLimit_Good_Int(t *testing.T) { + var cl ConcurrencyLimit + node := &yaml.Node{Kind: yaml.ScalarNode, Value: "5", Tag: "!!int"} + err := cl.UnmarshalYAML(node) + assert.NoError(t, err) + assert.Equal(t, 5, cl.Total) + assert.Nil(t, cl.Models) +} + +func TestQueue_ConcurrencyLimit_Good_Map(t *testing.T) { + input := ` +total: 5 +gpt-5.4: 1 +gpt-5.3-codex-spark: 3 +` + var cl ConcurrencyLimit + err := yaml.Unmarshal([]byte(input), &cl) + assert.NoError(t, err) + assert.Equal(t, 5, cl.Total) + assert.Equal(t, 1, cl.Models["gpt-5.4"]) + assert.Equal(t, 3, cl.Models["gpt-5.3-codex-spark"]) +} + +func TestQueue_ConcurrencyLimit_Bad_InvalidYAML(t *testing.T) { + node := &yaml.Node{Kind: yaml.SequenceNode} + var cl ConcurrencyLimit + err := cl.UnmarshalYAML(node) + assert.Error(t, err) +} + +func TestQueue_ConcurrencyLimit_Ugly_ZeroTotal(t *testing.T) { + input := ` +total: 0 +gpt-5.4: 1 +` + var cl ConcurrencyLimit + err := yaml.Unmarshal([]byte(input), &cl) + assert.NoError(t, err) + assert.Equal(t, 0, cl.Total) +} + +// --- baseAgent --- + +func TestQueue_BaseAgent_Good_Simple(t *testing.T) { + assert.Equal(t, "codex", baseAgent("codex")) + assert.Equal(t, "claude", baseAgent("claude")) + assert.Equal(t, "gemini", baseAgent("gemini")) +} + +func TestQueue_BaseAgent_Good_WithModel(t *testing.T) { + assert.Equal(t, "codex", baseAgent("codex:gpt-5.4")) + assert.Equal(t, "claude", baseAgent("claude:haiku")) +} + +func TestQueue_BaseAgent_Bad_CodexSpark(t *testing.T) { + assert.Equal(t, "codex-spark", baseAgent("codex:gpt-5.3-codex-spark")) +} + +func TestQueue_BaseAgent_Ugly_Empty(t *testing.T) { + assert.Equal(t, "", baseAgent("")) +} + +func TestQueue_BaseAgent_Ugly_MultipleColons(t *testing.T) { + assert.Equal(t, "codex", baseAgent("codex:model:extra")) +} + +// --- modelVariant --- + +func 
TestQueue_ModelVariant_Good(t *testing.T) { + assert.Equal(t, "gpt-5.4", modelVariant("codex:gpt-5.4")) + assert.Equal(t, "haiku", modelVariant("claude:haiku")) +} + +func TestQueue_ModelVariant_Bad_NoColon(t *testing.T) { + assert.Equal(t, "", modelVariant("codex")) + assert.Equal(t, "", modelVariant("claude")) +} + +func TestQueue_ModelVariant_Ugly_Empty(t *testing.T) { + assert.Equal(t, "", modelVariant("")) +} + +// --- canDispatchAgent --- + +func TestQueue_CanDispatchAgent_Good_NoConfig(t *testing.T) { + svc := New() + assert.True(t, svc.canDispatchAgent("codex"), "no config → unlimited") +} + +func TestQueue_CanDispatchAgent_Good_UnknownAgent(t *testing.T) { + svc := New() + assert.True(t, svc.canDispatchAgent("unknown-agent")) +} + +func TestQueue_CanDispatchAgent_Bad_AtLimit(t *testing.T) { + svc := New() + // Simulate 5 running codex workspaces + for i := 0; i < 5; i++ { + svc.TrackWorkspace("ws-"+string(rune('a'+i)), &WorkspaceStatus{ + Status: "running", Agent: "codex", PID: 99999, // PID won't be alive + }) + } + // Since PIDs aren't alive, count should be 0 + assert.True(t, svc.canDispatchAgent("codex"), "dead PIDs don't count") +} + +func TestQueue_CanDispatchAgent_Ugly_ZeroLimit(t *testing.T) { + svc := New() + // Zero total means unlimited + assert.True(t, svc.canDispatchAgent("codex")) +} + +// --- countRunningByAgent --- + +func TestQueue_CountRunningByAgent_Good_Empty(t *testing.T) { + svc := New() + assert.Equal(t, 0, svc.countRunningByAgent("codex")) +} + +func TestQueue_CountRunningByAgent_Good_SkipsNonRunning(t *testing.T) { + svc := New() + svc.TrackWorkspace("ws-1", &WorkspaceStatus{Status: "completed", Agent: "codex"}) + svc.TrackWorkspace("ws-2", &WorkspaceStatus{Status: "queued", Agent: "codex"}) + assert.Equal(t, 0, svc.countRunningByAgent("codex")) +} + +func TestQueue_CountRunningByAgent_Bad_SkipsMismatchedAgent(t *testing.T) { + svc := New() + svc.TrackWorkspace("ws-1", &WorkspaceStatus{Status: "running", Agent: "claude", PID: 1}) + 
assert.Equal(t, 0, svc.countRunningByAgent("codex")) +} + +func TestQueue_CountRunningByAgent_Ugly_DeadPIDsIgnored(t *testing.T) { + svc := New() + svc.TrackWorkspace("ws-1", &WorkspaceStatus{ + Status: "running", Agent: "codex", PID: 999999999, // almost certainly not alive + }) + assert.Equal(t, 0, svc.countRunningByAgent("codex")) +} + +// --- countRunningByModel --- + +func TestQueue_CountRunningByModel_Good_Empty(t *testing.T) { + svc := New() + assert.Equal(t, 0, svc.countRunningByModel("codex:gpt-5.4")) +} + +func TestQueue_CountRunningByModel_Bad_WrongModel(t *testing.T) { + svc := New() + svc.TrackWorkspace("ws-1", &WorkspaceStatus{ + Status: "running", Agent: "codex:gpt-5.3", PID: 1, + }) + assert.Equal(t, 0, svc.countRunningByModel("codex:gpt-5.4")) +} + +func TestQueue_CountRunningByModel_Ugly_ExactMatch(t *testing.T) { + svc := New() + svc.TrackWorkspace("ws-1", &WorkspaceStatus{ + Status: "running", Agent: "codex:gpt-5.4", PID: 999999999, + }) + // PID is dead so count is 0 + assert.Equal(t, 0, svc.countRunningByModel("codex:gpt-5.4")) +} + +// --- drainQueue --- + +func TestQueue_DrainQueue_Good_FrozenDoesNothing(t *testing.T) { + svc := New() + svc.frozen = true + assert.NotPanics(t, func() { + svc.drainQueue() + }) +} + +func TestQueue_DrainQueue_Bad_EmptyWorkspace(t *testing.T) { + svc := New() + svc.frozen = false + assert.NotPanics(t, func() { + svc.drainQueue() + }) +} + +func TestQueue_DrainQueue_Ugly_NoStatusFiles(t *testing.T) { + svc := New() + svc.frozen = false + // drainOne scans disk — with no workspace root, it finds nothing + assert.False(t, svc.drainOne()) +} + +// --- loadAgentsConfig --- + +func TestQueue_LoadAgentsConfig_Good_ReadsFile(t *testing.T) { + svc := New() + cfg := svc.loadAgentsConfig() + assert.NotNil(t, cfg) + assert.GreaterOrEqual(t, cfg.Version, 0) +} + +func TestQueue_LoadAgentsConfig_Bad_NoFile(t *testing.T) { + t.Setenv("CORE_WORKSPACE", t.TempDir()) + svc := New() + cfg := svc.loadAgentsConfig() + 
assert.NotNil(t, cfg, "should return defaults when no file found") +} + +func TestQueue_LoadAgentsConfig_Ugly_InvalidYAML(t *testing.T) { + dir := t.TempDir() + t.Setenv("CORE_WORKSPACE", dir) + fs.Write(dir+"/agents.yaml", "{{{{invalid yaml") + svc := New() + cfg := svc.loadAgentsConfig() + assert.NotNil(t, cfg, "should return defaults on parse failure") +} + +// --- AgentsConfig full parse --- + +func TestQueue_AgentsConfig_Good_FullParse(t *testing.T) { + input := ` +version: 1 +dispatch: + default_agent: claude + default_template: coding +concurrency: + claude: 3 + codex: + total: 5 + gpt-5.4: 1 +rates: + gemini: + reset_utc: "06:00" + sustained_delay: 120 +` + var cfg AgentsConfig + err := yaml.Unmarshal([]byte(input), &cfg) + assert.NoError(t, err) + assert.Equal(t, 1, cfg.Version) + assert.Equal(t, 3, cfg.Concurrency["claude"].Total) + assert.Equal(t, 5, cfg.Concurrency["codex"].Total) + assert.Equal(t, 1, cfg.Concurrency["codex"].Models["gpt-5.4"]) + assert.Equal(t, 120, cfg.Rates["gemini"].SustainedDelay) +} diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go new file mode 100644 index 0000000..ecee01c --- /dev/null +++ b/pkg/runner/runner.go @@ -0,0 +1,345 @@ +// SPDX-License-Identifier: EUPL-1.2 + +// Package runner is the agent dispatch service. +// Owns concurrency, queue drain, workspace lifecycle, and frozen state. +// Communicates with other services via Core IPC — Actions, Tasks, and Messages. +// +// core.New(core.WithService(runner.Register)) +package runner + +import ( + "context" + "sync" + "syscall" + "time" + + "dappco.re/go/agent/pkg/messages" + core "dappco.re/go/core" + coremcp "dappco.re/go/mcp/pkg/mcp" +) + +// Options configures the runner service. +type Options struct{} + +// Service is the agent dispatch runner. +// Manages concurrency limits, queue drain, workspace lifecycle, and frozen state. +// All dispatch requests — MCP tool, CLI, or IPC — go through this service. 
+// +// r := runner.New() +// r.Dispatch(ctx, input) // checks frozen + concurrency, spawns or queues +type Service struct { + *core.ServiceRuntime[Options] + dispatchMu sync.Mutex + drainMu sync.Mutex + pokeCh chan struct{} + frozen bool + backoff map[string]time.Time + failCount map[string]int + workspaces *core.Registry[*WorkspaceStatus] +} + +// New creates a runner service. +// +// svc := runner.New() +func New() *Service { + return &Service{ + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + workspaces: core.NewRegistry[*WorkspaceStatus](), + } +} + +// Register is the service factory for core.WithService. +// +// core.New(core.WithService(runner.Register)) +func Register(c *core.Core) core.Result { + svc := New() + svc.ServiceRuntime = core.NewServiceRuntime(c, Options{}) + + // Load agents config + cfg := svc.loadAgentsConfig() + c.Config().Set("agents.concurrency", cfg.Concurrency) + c.Config().Set("agents.rates", cfg.Rates) + c.Config().Set("agents.dispatch", cfg.Dispatch) + + return core.Result{Value: svc, OK: true} +} + +// OnStartup registers Actions and starts the queue runner. 
+// +// c.Perform("runner.dispatch", opts) // dispatch an agent +// c.Perform("runner.status", opts) // query workspace status +func (s *Service) OnStartup(ctx context.Context) core.Result { + c := s.Core() + + // Actions — the runner's capability map + c.Action("runner.dispatch", s.actionDispatch).Description = "Dispatch a subagent (checks frozen + concurrency)" + c.Action("runner.status", s.actionStatus).Description = "Query workspace status" + c.Action("runner.start", s.actionStart).Description = "Unfreeze dispatch queue" + c.Action("runner.stop", s.actionStop).Description = "Freeze dispatch queue (graceful)" + c.Action("runner.kill", s.actionKill).Description = "Kill all running agents (hard stop)" + c.Action("runner.poke", s.actionPoke).Description = "Drain next queued task" + + // Hydrate workspace registry from disk + s.hydrateWorkspaces() + + // QUERY handler — workspace state queries + c.RegisterQuery(func(_ *core.Core, q core.Query) core.Result { + wq, ok := q.(WorkspaceQuery) + if !ok { + return core.Result{} + } + if wq.Name != "" { + return s.workspaces.Get(wq.Name) + } + if wq.Status != "" { + var names []string + s.workspaces.Each(func(name string, st *WorkspaceStatus) { + if st.Status == wq.Status { + names = append(names, name) + } + }) + return core.Result{Value: names, OK: true} + } + return core.Result{Value: s.workspaces, OK: true} + }) + + // Start the background queue runner + s.startRunner() + + return core.Result{OK: true} +} + +// OnShutdown freezes the queue. +func (s *Service) OnShutdown(_ context.Context) core.Result { + s.frozen = true + return core.Result{OK: true} +} + +// HandleIPCEvents catches agent lifecycle events from other services. 
+// +// AgentCompleted → push channel notification + poke queue +// PokeQueue → drain queue +func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) core.Result { + switch ev := msg.(type) { + case messages.AgentCompleted: + // Push channel event to Claude Code via MCP service + c.ACTION(coremcp.ChannelPush{ + Channel: "agent.status", + Data: map[string]any{ + "agent": ev.Agent, + "repo": ev.Repo, + "workspace": ev.Workspace, + "status": ev.Status, + }, + }) + // Poke queue to fill freed slot + s.Poke() + + case messages.PokeQueue: + s.drainQueue() + _ = ev + } + return core.Result{OK: true} +} + +// IsFrozen returns whether dispatch is currently frozen. +// +// if s.IsFrozen() { return "queue is frozen" } +func (s *Service) IsFrozen() bool { + return s.frozen +} + +// Poke signals the runner to check the queue immediately. +// +// s.Poke() +func (s *Service) Poke() { + if s.pokeCh == nil { + return + } + select { + case s.pokeCh <- struct{}{}: + default: + } +} + +// TrackWorkspace registers or updates a workspace in the in-memory Registry. +// Accepts any status type — agentic passes *agentic.WorkspaceStatus, +// runner stores its own *WorkspaceStatus copy. +// +// s.TrackWorkspace("core/go-io/task-5", st) +func (s *Service) TrackWorkspace(name string, st any) { + if s.workspaces == nil { + return + } + // Convert from agentic's type to runner's via JSON round-trip + json := core.JSONMarshalString(st) + var ws WorkspaceStatus + if r := core.JSONUnmarshalString(json, &ws); r.OK { + s.workspaces.Set(name, &ws) + } +} + +// Workspaces returns the workspace Registry. +// +// s.Workspaces().Each(func(name string, st *WorkspaceStatus) { ... 
}) +func (s *Service) Workspaces() *core.Registry[*WorkspaceStatus] { + return s.workspaces +} + +// --- Actions --- + +func (s *Service) actionDispatch(_ context.Context, opts core.Options) core.Result { + if s.frozen { + return core.Result{Value: "queue is frozen", OK: false} + } + // Dispatch is called by agentic via IPC — the actual spawn logic + // is delegated back to agentic which owns workspace prep + prompt building. + // Runner just gates: frozen check + concurrency check. + agent := opts.String("agent") + if agent == "" { + agent = "codex" + } + + s.dispatchMu.Lock() + defer s.dispatchMu.Unlock() + + if !s.canDispatchAgent(agent) { + return core.Result{Value: "queued — at concurrency limit", OK: false} + } + + return core.Result{OK: true} +} + +func (s *Service) actionStatus(_ context.Context, _ core.Options) core.Result { + running, queued, completed, failed := 0, 0, 0, 0 + s.workspaces.Each(func(_ string, st *WorkspaceStatus) { + switch st.Status { + case "running": + running++ + case "queued": + queued++ + case "completed", "merged", "ready-for-review": + completed++ + case "failed", "blocked": + failed++ + } + }) + return core.Result{Value: map[string]int{ + "running": running, "queued": queued, + "completed": completed, "failed": failed, + "total": running + queued + completed + failed, + }, OK: true} +} + +func (s *Service) actionStart(_ context.Context, _ core.Options) core.Result { + s.frozen = false + s.Poke() + return core.Result{Value: "dispatch started", OK: true} +} + +func (s *Service) actionStop(_ context.Context, _ core.Options) core.Result { + s.frozen = true + return core.Result{Value: "queue frozen", OK: true} +} + +func (s *Service) actionKill(_ context.Context, _ core.Options) core.Result { + s.frozen = true + killed := 0 + s.workspaces.Each(func(_ string, st *WorkspaceStatus) { + if st.Status == "running" && st.PID > 0 { + if syscall.Kill(st.PID, syscall.SIGTERM) == nil { + killed++ + } + st.Status = "failed" + st.PID = 0 + } + if 
st.Status == "queued" { + st.Status = "failed" + } + }) + return core.Result{Value: core.Sprintf("killed %d agents", killed), OK: true} +} + +func (s *Service) actionPoke(_ context.Context, _ core.Options) core.Result { + s.drainQueue() + return core.Result{OK: true} +} + +// --- Queue runner --- + +func (s *Service) startRunner() { + s.pokeCh = make(chan struct{}, 1) + + if core.Env("CORE_AGENT_DISPATCH") == "1" { + s.frozen = false + } else { + s.frozen = true + } + + go s.runLoop() +} + +func (s *Service) runLoop() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.drainQueue() + case <-s.pokeCh: + s.drainQueue() + } + } +} + +// --- Workspace hydration --- + +func (s *Service) hydrateWorkspaces() { + if s.workspaces == nil { + s.workspaces = core.NewRegistry[*WorkspaceStatus]() + } + wsRoot := WorkspaceRoot() + for _, pattern := range []string{ + core.JoinPath(wsRoot, "*", "status.json"), + core.JoinPath(wsRoot, "*", "*", "*", "status.json"), + } { + for _, path := range core.PathGlob(pattern) { + wsDir := core.PathDir(path) + st, err := ReadStatus(wsDir) + if err != nil || st == nil { + continue + } + name := core.TrimPrefix(wsDir, wsRoot) + name = core.TrimPrefix(name, "/") + s.workspaces.Set(name, st) + } + } +} + +// --- Types --- + +// WorkspaceQuery is the QUERY type for workspace lookups. +// +// r := c.QUERY(runner.WorkspaceQuery{Status: "running"}) +type WorkspaceQuery struct { + Name string + Status string +} + +// WorkspaceStatus tracks the state of an agent workspace. 
//
// st := &runner.WorkspaceStatus{Status: "running", Agent: "codex", Repo: "go-io", PID: 12345}
type WorkspaceStatus struct {
	Status    string    `json:"status"`               // lifecycle: queued/running/completed/merged/ready-for-review/failed/blocked
	Agent     string    `json:"agent"`                // agent binary name, e.g. "codex"
	Repo      string    `json:"repo"`                 // repository the agent works in
	Org       string    `json:"org,omitempty"`        // owning org, if any
	Task      string    `json:"task,omitempty"`       // task identifier, if any
	Branch    string    `json:"branch,omitempty"`     // working branch, if any
	PID       int       `json:"pid,omitempty"`        // OS pid while running; reset to 0 on kill
	Question  string    `json:"question,omitempty"`   // pending question from the agent, if any
	PRURL     string    `json:"pr_url,omitempty"`     // pull request URL once opened
	StartedAt time.Time `json:"started_at"`           // when the agent was started
	Runs      int       `json:"runs"`                 // number of times this workspace has been run
}
diff --git a/pkg/runner/runner_test.go b/pkg/runner/runner_test.go
new file mode 100644
index 0000000..5691cc1
--- /dev/null
+++ b/pkg/runner/runner_test.go
@@ -0,0 +1,334 @@
// SPDX-License-Identifier: EUPL-1.2

package runner

import (
	"context"
	"testing"

	core "dappco.re/go/core"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// --- New ---

func TestRunner_New_Good(t *testing.T) {
	svc := New()
	assert.NotNil(t, svc)
	assert.NotNil(t, svc.workspaces)
	assert.NotNil(t, svc.backoff)
	assert.NotNil(t, svc.failCount)
	assert.False(t, svc.frozen, "New() doesn't freeze — startRunner does based on env")
}

func TestRunner_New_Bad_NoServiceRuntime(t *testing.T) {
	svc := New()
	assert.Nil(t, svc.ServiceRuntime)
	assert.False(t, svc.IsFrozen(), "New() doesn't set frozen — startRunner does")
}

func TestRunner_New_Ugly_MultipleInstances(t *testing.T) {
	a := New()
	b := New()
	assert.NotSame(t, a, b, "each call returns a fresh instance")
	assert.NotSame(t, a.workspaces, b.workspaces)
}

// --- Register ---

func TestRunner_Register_Good(t *testing.T) {
	c := core.New(core.WithOption("name", "test"))
	r := Register(c)
	assert.True(t, r.OK)
	assert.NotNil(t, r.Value)
}

func TestRunner_Register_Bad_NilCore(t *testing.T) {
	assert.Panics(t, func() {
		Register(nil)
	})
}

func TestRunner_Register_Ugly_ConfigLoaded(t *testing.T) {
	c := core.New(core.WithOption("name", "test"))
	Register(c)
	// Config should expose the agents.dispatch key (even if only defaults).
	r := c.Config().Get("agents.dispatch")
	assert.NotNil(t, r)
}

// --- IsFrozen ---

func TestRunner_IsFrozen_Good(t *testing.T) {
	svc := New()
	svc.frozen = true
	assert.True(t, svc.IsFrozen())
}

func TestRunner_IsFrozen_Bad_AfterUnfreeze(t *testing.T) {
	svc := New()
	svc.frozen = false
	assert.False(t, svc.IsFrozen())
}

func TestRunner_IsFrozen_Ugly_ToggleRapidly(t *testing.T) {
	svc := New()
	for i := 0; i < 100; i++ {
		svc.frozen = i%2 == 0
	}
	// Final iteration is i=99: 99%2 == 1, so frozen ends up false.
	assert.False(t, svc.IsFrozen(), "last toggle wins")
}

// --- TrackWorkspace ---

func TestRunner_TrackWorkspace_Good(t *testing.T) {
	svc := New()
	svc.TrackWorkspace("core/go-io/dev", &WorkspaceStatus{
		Status: "running", Agent: "codex", Repo: "go-io", PID: 12345,
	})
	r := svc.workspaces.Get("core/go-io/dev")
	assert.True(t, r.OK)
}

func TestRunner_TrackWorkspace_Bad_NilWorkspaces(t *testing.T) {
	// Zero-value Service has a nil registry — TrackWorkspace must no-op.
	svc := &Service{}
	assert.NotPanics(t, func() {
		svc.TrackWorkspace("test", &WorkspaceStatus{Status: "running"})
	})
}

func TestRunner_TrackWorkspace_Ugly_AnyType(t *testing.T) {
	svc := New()
	// TrackWorkspace accepts any — the JSON round-trip converts a plain
	// map into runner's own *WorkspaceStatus.
	svc.TrackWorkspace("test", map[string]any{
		"status": "running", "agent": "codex", "repo": "go-io",
	})
	r := svc.workspaces.Get("test")
	assert.True(t, r.OK)
	ws := r.Value.(*WorkspaceStatus)
	assert.Equal(t, "running", ws.Status)
	assert.Equal(t, "codex", ws.Agent)
}

// --- Workspaces ---

func TestRunner_Workspaces_Good(t *testing.T) {
	svc := New()
	assert.NotNil(t, svc.Workspaces())
	assert.Equal(t, 0, svc.Workspaces().Len())
}

func TestRunner_Workspaces_Bad_AfterTrack(t *testing.T) {
	svc := New()
	svc.TrackWorkspace("ws-1", &WorkspaceStatus{Status: "running"})
	assert.Equal(t, 1, svc.Workspaces().Len())
}

func TestRunner_Workspaces_Ugly_OverwriteSameName(t *testing.T) {
	svc := New()
	svc.TrackWorkspace("ws-1", &WorkspaceStatus{Status: "running"})
	svc.TrackWorkspace("ws-1", &WorkspaceStatus{Status: "completed"})
	// Same name overwrites in place — registry length stays 1.
	assert.Equal(t, 1, svc.Workspaces().Len())
	r := svc.workspaces.Get("ws-1")
	ws := r.Value.(*WorkspaceStatus)
	assert.Equal(t, "completed", ws.Status)
}

// --- Poke ---

func TestRunner_Poke_Good(t *testing.T) {
	svc := New()
	svc.pokeCh = make(chan struct{}, 1)
	svc.Poke()
	assert.Len(t, svc.pokeCh, 1)
}

func TestRunner_Poke_Bad_NilChannel(t *testing.T) {
	// Before startRunner, pokeCh is nil — Poke must be a safe no-op.
	svc := New()
	assert.NotPanics(t, func() {
		svc.Poke()
	})
}

func TestRunner_Poke_Ugly_DoublePoke(t *testing.T) {
	svc := New()
	svc.pokeCh = make(chan struct{}, 1)
	svc.Poke()
	svc.Poke() // second poke is a no-op (channel full)
	assert.Len(t, svc.pokeCh, 1)
}

// --- Actions ---

func TestRunner_ActionStatus_Good(t *testing.T) {
	svc := New()
	svc.TrackWorkspace("ws-1", &WorkspaceStatus{Status: "running"})
	svc.TrackWorkspace("ws-2", &WorkspaceStatus{Status: "completed"})
	svc.TrackWorkspace("ws-3", &WorkspaceStatus{Status: "queued"})

	r := svc.actionStatus(context.Background(), core.NewOptions())
	assert.True(t, r.OK)
	m := r.Value.(map[string]int)
	assert.Equal(t, 1, m["running"])
	assert.Equal(t, 1, m["completed"])
	assert.Equal(t, 1, m["queued"])
	assert.Equal(t, 3, m["total"])
}

func TestRunner_ActionStatus_Bad_Empty(t *testing.T) {
	svc := New()
	r := svc.actionStatus(context.Background(), core.NewOptions())
	assert.True(t, r.OK)
	m := r.Value.(map[string]int)
	assert.Equal(t, 0, m["total"])
}

func TestRunner_ActionStatus_Ugly_AllStatuses(t *testing.T) {
	svc := New()
	for _, s := range []string{"running", "queued", "completed", "merged", "ready-for-review", "failed", "blocked"} {
		svc.TrackWorkspace("ws-"+s, &WorkspaceStatus{Status: s})
	}
	r := svc.actionStatus(context.Background(), core.NewOptions())
	m := r.Value.(map[string]int)
	assert.Equal(t, 7, m["total"])
	assert.Equal(t, 3, m["completed"]) // completed + merged + ready-for-review
	assert.Equal(t, 2, m["failed"])    // failed + blocked
}

func TestRunner_ActionStart_Good(t *testing.T) {
	svc := New()
	svc.frozen = true
	svc.pokeCh = make(chan struct{}, 1)
	assert.True(t, svc.IsFrozen())
	r := svc.actionStart(context.Background(), core.NewOptions())
	assert.True(t, r.OK)
	assert.False(t, svc.IsFrozen())
}

func TestRunner_ActionStop_Good(t *testing.T) {
	svc := New()
	svc.frozen = false
	r := svc.actionStop(context.Background(), core.NewOptions())
	assert.True(t, r.OK)
	assert.True(t, svc.IsFrozen())
}

func TestRunner_ActionDispatch_Bad_Frozen(t *testing.T) {
	svc := New()
	svc.frozen = true
	r := svc.actionDispatch(context.Background(), core.NewOptions(
		core.Option{Key: "agent", Value: "codex"},
	))
	assert.False(t, r.OK, "should deny when frozen")
}

func TestRunner_ActionDispatch_Good_Unfrozen(t *testing.T) {
	svc := New()
	svc.frozen = false
	r := svc.actionDispatch(context.Background(), core.NewOptions(
		core.Option{Key: "agent", Value: "codex"},
	))
	assert.True(t, r.OK, "should allow when unfrozen and no concurrency limit hit")
}

// --- OnStartup ---

func TestRunner_OnStartup_Good(t *testing.T) {
	c := core.New(core.WithOption("name", "test"))
	svc := New()
	svc.ServiceRuntime = core.NewServiceRuntime(c, Options{})
	r := svc.OnStartup(context.Background())
	assert.True(t, r.OK)

	// Actions should be registered
	assert.NotNil(t, c.Action("runner.dispatch"))
	assert.NotNil(t, c.Action("runner.status"))
	assert.NotNil(t, c.Action("runner.start"))
	assert.NotNil(t, c.Action("runner.stop"))
	assert.NotNil(t, c.Action("runner.kill"))
	assert.NotNil(t, c.Action("runner.poke"))
}

func TestRunner_OnStartup_Bad_NilCore(t *testing.T) {
	svc := New()
	// No ServiceRuntime — OnStartup should panic on s.Core()
	assert.Panics(t, func() {
		svc.OnStartup(context.Background())
	})
}

func TestRunner_OnStartup_Ugly_StartsRunnerLoop(t *testing.T) {
	c := core.New(core.WithOption("name", "test"))
	svc := New()
	svc.ServiceRuntime = core.NewServiceRuntime(c, Options{})
	svc.OnStartup(context.Background())
	assert.NotNil(t, svc.pokeCh, "runner loop should be started")
	assert.True(t, svc.IsFrozen(), "should be frozen without CORE_AGENT_DISPATCH=1")
}

// --- OnShutdown ---

func TestRunner_OnShutdown_Good(t *testing.T) {
	svc := New()
	svc.frozen = false
	r := svc.OnShutdown(context.Background())
	assert.True(t, r.OK)
	assert.True(t, svc.IsFrozen())
}

func TestRunner_OnShutdown_Bad_AlreadyFrozen(t *testing.T) {
	// Freezing an already-frozen queue is idempotent.
	svc := New()
	r := svc.OnShutdown(context.Background())
	assert.True(t, r.OK)
	assert.True(t, svc.IsFrozen())
}

func TestRunner_OnShutdown_Ugly_DoesNotPanic(t *testing.T) {
	svc := New()
	assert.NotPanics(t, func() {
		svc.OnShutdown(context.Background())
	})
}

// --- HandleIPCEvents ---

func TestRunner_HandleIPCEvents_Good_UnknownMessage(t *testing.T) {
	c := core.New(core.WithOption("name", "test"))
	svc := New()
	svc.ServiceRuntime = core.NewServiceRuntime(c, Options{})

	// Unknown message type — should not panic
	r := svc.HandleIPCEvents(c, "unknown")
	assert.True(t, r.OK)
}

// --- WriteStatus / ReadStatus ---

func TestRunner_WriteReadStatus_Good(t *testing.T) {
	dir := t.TempDir()
	st := &WorkspaceStatus{Status: "running", Agent: "codex", Repo: "go-io", PID: 999}
	WriteStatus(dir, st)

	got, err := ReadStatus(dir)
	require.NoError(t, err)
	assert.Equal(t, "running", got.Status)
	assert.Equal(t, "codex", got.Agent)
	assert.Equal(t, 999, got.PID)
}

func TestRunner_ReadStatus_Bad_NoFile(t *testing.T) {
	_, err := ReadStatus(t.TempDir())
	assert.Error(t, err)
}

func TestRunner_WriteReadStatus_Ugly_OverwriteExisting(t *testing.T) {
	dir := t.TempDir()
	WriteStatus(dir, &WorkspaceStatus{Status: "running"})
	WriteStatus(dir, &WorkspaceStatus{Status: "completed"})

	got, err := ReadStatus(dir)
	require.NoError(t, err)
	assert.Equal(t, "completed", got.Status)
}