agent/pkg/agentic/queue.go
Snider 53acf4000d feat(concurrency): nested per-model limits under agent pools
Concurrency config now supports both flat and nested formats:

  claude: 1                    # flat — 1 total
  codex:                       # nested — 2 total, per-model caps
    total: 2
    gpt-5.4: 1
    gpt-5.3-codex-spark: 1

canDispatchAgent checks pool total first, then per-model limit.
countRunningByModel added for exact agent string matching.
ConcurrencyLimit custom YAML unmarshaler handles both int and map.

Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-24 13:05:41 +00:00

323 lines
8.6 KiB
Go

// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"strconv"
"syscall"
"time"
core "dappco.re/go/core"
"gopkg.in/yaml.v3"
)
// DispatchConfig controls agent dispatch behaviour.
//
// cfg := agentic.DispatchConfig{DefaultAgent: "claude", DefaultTemplate: "coding"}
type DispatchConfig struct {
DefaultAgent string `yaml:"default_agent"`
DefaultTemplate string `yaml:"default_template"`
WorkspaceRoot string `yaml:"workspace_root"`
}
// RateConfig controls pacing between task dispatches.
//
// rate := agentic.RateConfig{ResetUTC: "06:00", SustainedDelay: 120, BurstWindow: 2, BurstDelay: 15}
type RateConfig struct {
ResetUTC string `yaml:"reset_utc"` // Daily quota reset time (UTC), e.g. "06:00"
DailyLimit int `yaml:"daily_limit"` // Max requests per day (0 = unknown)
MinDelay int `yaml:"min_delay"` // Minimum seconds between task starts
SustainedDelay int `yaml:"sustained_delay"` // Delay when pacing for full-day use
BurstWindow int `yaml:"burst_window"` // Hours before reset where burst kicks in
BurstDelay int `yaml:"burst_delay"` // Delay during burst window
}
// ConcurrencyLimit supports both flat (int) and nested (map with total + per-model) formats.
//
// claude: 1 → Total=1, Models=nil
// codex: → Total=2, Models={"gpt-5.4": 1, "gpt-5.3-codex-spark": 1}
// total: 2
// gpt-5.4: 1
// gpt-5.3-codex-spark: 1
type ConcurrencyLimit struct {
Total int
Models map[string]int
}
// UnmarshalYAML handles both int and map forms.
func (c *ConcurrencyLimit) UnmarshalYAML(value *yaml.Node) error {
// Try int first
var n int
if err := value.Decode(&n); err == nil {
c.Total = n
return nil
}
// Try map
var m map[string]int
if err := value.Decode(&m); err != nil {
return err
}
c.Total = m["total"]
c.Models = make(map[string]int)
for k, v := range m {
if k != "total" {
c.Models[k] = v
}
}
return nil
}
// AgentsConfig is the root of config/agents.yaml.
//
// cfg := agentic.AgentsConfig{Version: 1, Dispatch: agentic.DispatchConfig{DefaultAgent: "claude"}}
type AgentsConfig struct {
Version int `yaml:"version"`
Dispatch DispatchConfig `yaml:"dispatch"`
Concurrency map[string]ConcurrencyLimit `yaml:"concurrency"`
Rates map[string]RateConfig `yaml:"rates"`
}
// loadAgentsConfig reads config/agents.yaml from the code path.
func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
paths := []string{
core.JoinPath(CoreRoot(), "agents.yaml"),
core.JoinPath(s.codePath, "core", "agent", "config", "agents.yaml"),
}
for _, path := range paths {
r := fs.Read(path)
if !r.OK {
continue
}
var cfg AgentsConfig
if err := yaml.Unmarshal([]byte(r.Value.(string)), &cfg); err != nil {
continue
}
return &cfg
}
return &AgentsConfig{
Dispatch: DispatchConfig{
DefaultAgent: "claude",
DefaultTemplate: "coding",
},
Concurrency: map[string]ConcurrencyLimit{
"claude": {Total: 1},
"gemini": {Total: 3},
},
}
}
// delayForAgent calculates how long to wait before spawning the next task
// for a given agent type, based on rate config and time of day.
func (s *PrepSubsystem) delayForAgent(agent string) time.Duration {
cfg := s.loadAgentsConfig()
// Strip variant suffix (claude:opus → claude) for config lookup
base := baseAgent(agent)
rate, ok := cfg.Rates[base]
if !ok || rate.SustainedDelay == 0 {
return 0
}
// Parse reset time
resetHour, resetMin := 6, 0
parts := core.Split(rate.ResetUTC, ":")
if len(parts) >= 2 {
if hour, err := strconv.Atoi(core.Trim(parts[0])); err == nil {
resetHour = hour
}
if min, err := strconv.Atoi(core.Trim(parts[1])); err == nil {
resetMin = min
}
}
now := time.Now().UTC()
resetToday := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMin, 0, 0, time.UTC)
if now.Before(resetToday) {
// Reset hasn't happened yet today — reset was yesterday
resetToday = resetToday.AddDate(0, 0, -1)
}
nextReset := resetToday.AddDate(0, 0, 1)
hoursUntilReset := nextReset.Sub(now).Hours()
// Burst mode: if within burst window of reset, use burst delay
if rate.BurstWindow > 0 && hoursUntilReset <= float64(rate.BurstWindow) {
return time.Duration(rate.BurstDelay) * time.Second
}
// Sustained mode
return time.Duration(rate.SustainedDelay) * time.Second
}
// countRunningByAgent counts running workspaces for a specific agent type.
// Scans both old (*/status.json) and new (*/*/*/status.json) workspace layouts.
func (s *PrepSubsystem) countRunningByAgent(agent string) int {
wsRoot := WorkspaceRoot()
// Scan both old and new workspace layouts
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
new := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
paths := append(old, new...)
count := 0
for _, statusPath := range paths {
st, err := ReadStatus(core.PathDir(statusPath))
if err != nil || st.Status != "running" {
continue
}
if baseAgent(st.Agent) != agent {
continue
}
if st.PID > 0 && syscall.Kill(st.PID, 0) == nil {
count++
}
}
return count
}
// countRunningByModel counts running workspaces for a specific agent:model string.
func (s *PrepSubsystem) countRunningByModel(agent string) int {
wsRoot := WorkspaceRoot()
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
count := 0
for _, statusPath := range append(old, deep...) {
st, err := ReadStatus(core.PathDir(statusPath))
if err != nil || st.Status != "running" {
continue
}
if st.Agent != agent {
continue
}
if st.PID > 0 && syscall.Kill(st.PID, 0) == nil {
count++
}
}
return count
}
// baseAgent strips the model variant (gemini:flash → gemini).
func baseAgent(agent string) string {
// codex:gpt-5.3-codex-spark → codex-spark (separate pool)
if core.Contains(agent, "codex-spark") {
return "codex-spark"
}
return core.SplitN(agent, ":", 2)[0]
}
// canDispatchAgent checks both pool-level and per-model concurrency limits.
//
// codex: {total: 2, models: {gpt-5.4: 1}} → max 2 codex total, max 1 gpt-5.4
func (s *PrepSubsystem) canDispatchAgent(agent string) bool {
cfg := s.loadAgentsConfig()
base := baseAgent(agent)
limit, ok := cfg.Concurrency[base]
if !ok || limit.Total <= 0 {
return true
}
// Check pool total
if s.countRunningByAgent(base) >= limit.Total {
return false
}
// Check per-model limit if configured
if limit.Models != nil {
model := modelVariant(agent)
if model != "" {
if modelLimit, has := limit.Models[model]; has && modelLimit > 0 {
if s.countRunningByModel(agent) >= modelLimit {
return false
}
}
}
}
return true
}
// modelVariant extracts the model name from an agent string.
//
// codex:gpt-5.4 → gpt-5.4
// codex:gpt-5.3-codex-spark → gpt-5.3-codex-spark
// claude → ""
func modelVariant(agent string) string {
parts := core.SplitN(agent, ":", 2)
if len(parts) < 2 {
return ""
}
return parts[1]
}
// drainQueue fills all available concurrency slots from queued workspaces.
// Loops until no slots remain or no queued tasks match. Serialised via drainMu.
func (s *PrepSubsystem) drainQueue() {
if s.frozen {
return
}
s.drainMu.Lock()
defer s.drainMu.Unlock()
for s.drainOne() {
// keep filling slots
}
}
// drainOne finds the oldest queued workspace and spawns it if a slot is available.
// Returns true if a task was spawned, false if nothing to do.
func (s *PrepSubsystem) drainOne() bool {
wsRoot := WorkspaceRoot()
// Scan both old and new workspace layouts
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
statusFiles := append(old, deep...)
for _, statusPath := range statusFiles {
wsDir := core.PathDir(statusPath)
st, err := ReadStatus(wsDir)
if err != nil || st.Status != "queued" {
continue
}
if !s.canDispatchAgent(st.Agent) {
continue
}
// Skip if agent pool is in rate-limit backoff
pool := baseAgent(st.Agent)
if until, ok := s.backoff[pool]; ok && time.Now().Before(until) {
continue
}
// Apply rate delay before spawning
delay := s.delayForAgent(st.Agent)
if delay > 0 {
time.Sleep(delay)
}
// Re-check concurrency after delay (another task may have started)
if !s.canDispatchAgent(st.Agent) {
continue
}
prompt := "TASK: " + st.Task + "\n\nResume from where you left off. Read CODEX.md for conventions. Commit when done."
pid, _, err := s.spawnAgent(st.Agent, prompt, wsDir)
if err != nil {
continue
}
st.Status = "running"
st.PID = pid
st.Runs++
writeStatus(wsDir, st)
return true
}
return false
}