diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go index 52a5669..9aa04d1 100644 --- a/pkg/agentic/queue.go +++ b/pkg/agentic/queue.go @@ -32,14 +32,49 @@ type RateConfig struct { BurstDelay int `yaml:"burst_delay"` // Delay during burst window } +// ConcurrencyLimit supports both flat (int) and nested (map with total + per-model) formats. +// +// claude: 1 → Total=1, Models=nil +// codex: → Total=2, Models={"gpt-5.4": 1, "gpt-5.3-codex-spark": 1} +// total: 2 +// gpt-5.4: 1 +// gpt-5.3-codex-spark: 1 +type ConcurrencyLimit struct { + Total int + Models map[string]int +} + +// UnmarshalYAML handles both int and map forms. +func (c *ConcurrencyLimit) UnmarshalYAML(value *yaml.Node) error { + // Try int first + var n int + if err := value.Decode(&n); err == nil { + c.Total = n + return nil + } + // Try map + var m map[string]int + if err := value.Decode(&m); err != nil { + return err + } + c.Total = m["total"] + c.Models = make(map[string]int) + for k, v := range m { + if k != "total" { + c.Models[k] = v + } + } + return nil +} + // AgentsConfig is the root of config/agents.yaml. // // cfg := agentic.AgentsConfig{Version: 1, Dispatch: agentic.DispatchConfig{DefaultAgent: "claude"}} type AgentsConfig struct { - Version int `yaml:"version"` - Dispatch DispatchConfig `yaml:"dispatch"` - Concurrency map[string]int `yaml:"concurrency"` - Rates map[string]RateConfig `yaml:"rates"` + Version int `yaml:"version"` + Dispatch DispatchConfig `yaml:"dispatch"` + Concurrency map[string]ConcurrencyLimit `yaml:"concurrency"` + Rates map[string]RateConfig `yaml:"rates"` } // loadAgentsConfig reads config/agents.yaml from the code path. @@ -66,9 +101,9 @@ func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig { DefaultAgent: "claude", DefaultTemplate: "coding", }, - Concurrency: map[string]int{ - "claude": 1, - "gemini": 3, + Concurrency: map[string]ConcurrencyLimit{ + "claude": {Total: 1}, + "gemini": {Total: 3}, }, } } @@ -142,6 +177,28 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int { return count } +// countRunningByModel counts running workspaces for a specific agent:model string. +func (s *PrepSubsystem) countRunningByModel(agent string) int { + wsRoot := WorkspaceRoot() + old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")) + deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json")) + + count := 0 + for _, statusPath := range append(old, deep...) { + st, err := ReadStatus(core.PathDir(statusPath)) + if err != nil || st.Status != "running" { + continue + } + if st.Agent != agent { + continue + } + if st.PID > 0 && syscall.Kill(st.PID, 0) == nil { + count++ + } + } + return count +} + // baseAgent strips the model variant (gemini:flash → gemini). func baseAgent(agent string) string { // codex:gpt-5.3-codex-spark → codex-spark (separate pool) @@ -151,15 +208,48 @@ func baseAgent(agent string) string { return core.SplitN(agent, ":", 2)[0] } -// canDispatchAgent checks if we're under the concurrency limit for a specific agent type. +// canDispatchAgent checks both pool-level and per-model concurrency limits. +// +// codex: {total: 2, models: {gpt-5.4: 1}} → max 2 codex total, max 1 gpt-5.4 func (s *PrepSubsystem) canDispatchAgent(agent string) bool { cfg := s.loadAgentsConfig() base := baseAgent(agent) limit, ok := cfg.Concurrency[base] - if !ok || limit <= 0 { + if !ok || limit.Total <= 0 { return true } - return s.countRunningByAgent(base) < limit + + // Check pool total + if s.countRunningByAgent(base) >= limit.Total { + return false + } + + // Check per-model limit if configured + if limit.Models != nil { + model := modelVariant(agent) + if model != "" { + if modelLimit, has := limit.Models[model]; has && modelLimit > 0 { + if s.countRunningByModel(agent) >= modelLimit { + return false + } + } + } + } + + return true +} + +// modelVariant extracts the model name from an agent string. +// +// codex:gpt-5.4 → gpt-5.4 +// codex:gpt-5.3-codex-spark → gpt-5.3-codex-spark +// claude → "" +func modelVariant(agent string) string { + parts := core.SplitN(agent, ":", 2) + if len(parts) < 2 { + return "" + } + return parts[1] } // drainQueue fills all available concurrency slots from queued workspaces. diff --git a/pkg/agentic/queue_test.go b/pkg/agentic/queue_test.go index c3a1645..3f54a37 100644 --- a/pkg/agentic/queue_test.go +++ b/pkg/agentic/queue_test.go @@ -26,8 +26,8 @@ func TestDispatchConfig_Good_Defaults(t *testing.T) { cfg := s.loadAgentsConfig() assert.Equal(t, "claude", cfg.Dispatch.DefaultAgent) assert.Equal(t, "coding", cfg.Dispatch.DefaultTemplate) - assert.Equal(t, 1, cfg.Concurrency["claude"]) - assert.Equal(t, 3, cfg.Concurrency["gemini"]) + assert.Equal(t, 1, cfg.Concurrency["claude"].Total) + assert.Equal(t, 3, cfg.Concurrency["gemini"].Total) } func TestCanDispatchAgent_Good_NoConfig(t *testing.T) {