diff --git a/pkg/mcp/agentic/queue.go b/pkg/mcp/agentic/queue.go index 22aef9f..e538db0 100644 --- a/pkg/mcp/agentic/queue.go +++ b/pkg/mcp/agentic/queue.go @@ -8,6 +8,7 @@ import ( "os/exec" "path/filepath" "syscall" + "time" "gopkg.in/yaml.v3" ) @@ -19,11 +20,22 @@ type DispatchConfig struct { WorkspaceRoot string `yaml:"workspace_root"` } +// RateConfig controls pacing between task dispatches. +type RateConfig struct { + ResetUTC string `yaml:"reset_utc"` // Daily quota reset time (UTC), e.g. "06:00" + DailyLimit int `yaml:"daily_limit"` // Max requests per day (0 = unknown) + MinDelay int `yaml:"min_delay"` // Minimum seconds between task starts + SustainedDelay int `yaml:"sustained_delay"` // Delay when pacing for full-day use + BurstWindow int `yaml:"burst_window"` // Hours before reset where burst kicks in + BurstDelay int `yaml:"burst_delay"` // Delay during burst window +} + // AgentsConfig is the root of config/agents.yaml. type AgentsConfig struct { - Version int `yaml:"version"` - Dispatch DispatchConfig `yaml:"dispatch"` - Concurrency map[string]int `yaml:"concurrency"` // per-agent type limits + Version int `yaml:"version"` + Dispatch DispatchConfig `yaml:"dispatch"` + Concurrency map[string]int `yaml:"concurrency"` + Rates map[string]RateConfig `yaml:"rates"` } // loadAgentsConfig reads config/agents.yaml from the code path. @@ -46,7 +58,6 @@ func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig { return &cfg } - // Defaults: 1 claude, 3 gemini return &AgentsConfig{ Dispatch: DispatchConfig{ DefaultAgent: "claude", @@ -59,6 +70,37 @@ func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig { } } +// delayForAgent calculates how long to wait before spawning the next task +// for a given agent type, based on rate config and time of day. +func (s *PrepSubsystem) delayForAgent(agent string) time.Duration { + cfg := s.loadAgentsConfig() + rate, ok := cfg.Rates[agent] + if !ok || rate.SustainedDelay == 0 { + return 0 + } + + // Parse reset time + resetHour, resetMin := 6, 0 + fmt.Sscanf(rate.ResetUTC, "%d:%d", &resetHour, &resetMin) + + now := time.Now().UTC() + resetToday := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMin, 0, 0, time.UTC) + if now.Before(resetToday) { + // Reset hasn't happened yet today — reset was yesterday + resetToday = resetToday.AddDate(0, 0, -1) + } + nextReset := resetToday.AddDate(0, 0, 1) + hoursUntilReset := nextReset.Sub(now).Hours() + + // Burst mode: if within burst window of reset, use burst delay + if rate.BurstWindow > 0 && hoursUntilReset <= float64(rate.BurstWindow) { + return time.Duration(rate.BurstDelay) * time.Second + } + + // Sustained mode + return time.Duration(rate.SustainedDelay) * time.Second +} + // countRunningByAgent counts running workspaces for a specific agent type. func (s *PrepSubsystem) countRunningByAgent(agent string) int { home, _ := os.UserHomeDir() @@ -80,7 +122,6 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int { continue } - // Verify PID is actually alive if st.PID > 0 { proc, err := os.FindProcess(st.PID) if err == nil && proc.Signal(syscall.Signal(0)) == nil { @@ -97,22 +138,18 @@ func (s *PrepSubsystem) canDispatchAgent(agent string) bool { cfg := s.loadAgentsConfig() limit, ok := cfg.Concurrency[agent] if !ok || limit <= 0 { - return true // no limit set or unlimited + return true } return s.countRunningByAgent(agent) < limit } -// canDispatch checks the legacy global limit (backwards compat). +// canDispatch is kept for backwards compat. func (s *PrepSubsystem) canDispatch() bool { - return true // per-agent limits handle this now -} - -// canDispatchFor checks per-agent concurrency. -func (s *PrepSubsystem) canDispatchFor(agent string) bool { - return s.canDispatchAgent(agent) + return true } // drainQueue finds the oldest queued workspace and spawns it if a slot is available. +// Applies rate-based delay between spawns. func (s *PrepSubsystem) drainQueue() { home, _ := os.UserHomeDir() wsRoot := filepath.Join(home, "Code", "host-uk", "core", ".core", "workspace") @@ -122,7 +159,6 @@ func (s *PrepSubsystem) drainQueue() { return } - // Find oldest queued workspace that has a free slot for its agent type for _, entry := range entries { if !entry.IsDir() { continue @@ -134,12 +170,21 @@ func (s *PrepSubsystem) drainQueue() { continue } - // Check per-agent limit if !s.canDispatchAgent(st.Agent) { continue } - // Found a queued workspace with a free slot — spawn it + // Apply rate delay before spawning + delay := s.delayForAgent(st.Agent) + if delay > 0 { + time.Sleep(delay) + } + + // Re-check concurrency after delay (another task may have started) + if !s.canDispatchAgent(st.Agent) { + continue + } + srcDir := filepath.Join(wsDir, "src") prompt := "Read PROMPT.md for instructions. All context files (CLAUDE.md, TODO.md, CONTEXT.md, CONSUMERS.md, RECENT.md) are in the parent directory. Work in this directory." @@ -167,13 +212,11 @@ func (s *PrepSubsystem) drainQueue() { continue } - // Update status to running st.Status = "running" st.PID = cmd.Process.Pid st.Runs++ writeStatus(wsDir, st) - // Monitor this one too go func() { cmd.Wait() outFile.Close() @@ -184,10 +227,9 @@ func (s *PrepSubsystem) drainQueue() { writeStatus(wsDir, st2) } - // Recursively drain — pick up next queued item s.drainQueue() }() - return // Only spawn one at a time per drain call + return } }