feat(agentic): rate-aware task scheduling

Reads rates config from agents.yaml per agent type. Calculates delay
based on time-of-day relative to quota reset window:
- Sustained mode: steady pacing across the full day
- Burst mode: faster pacing when close to reset (quota about to refill)

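Example: Gemini resets at 06:00 UTC. Starting at 09:00 (21 hours before the next reset) paces slowly using sustained_delay.
Starting at 03:00 uses burst mode (burst_delay), since the reset is only 3 hours away (provided burst_window is at least 3).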

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-15 17:55:55 +00:00
parent a5ebea1770
commit 02f0afd1ea

View file

@ -8,6 +8,7 @@ import (
"os/exec"
"path/filepath"
"syscall"
"time"
"gopkg.in/yaml.v3"
)
@ -19,11 +20,22 @@ type DispatchConfig struct {
WorkspaceRoot string `yaml:"workspace_root"`
}
// RateConfig describes how task dispatches are paced for one agent type.
// All delay fields are expressed in seconds; the burst window is in hours.
type RateConfig struct {
	// ResetUTC is the daily quota reset time in UTC, e.g. "06:00".
	ResetUTC string `yaml:"reset_utc"`
	// DailyLimit is the maximum number of requests per day (0 = unknown).
	DailyLimit int `yaml:"daily_limit"`
	// MinDelay is the minimum number of seconds between task starts.
	MinDelay int `yaml:"min_delay"`
	// SustainedDelay is the delay used when pacing for full-day use.
	SustainedDelay int `yaml:"sustained_delay"`
	// BurstWindow is the number of hours before the reset in which burst
	// pacing kicks in.
	BurstWindow int `yaml:"burst_window"`
	// BurstDelay is the delay used while inside the burst window.
	BurstDelay int `yaml:"burst_delay"`
}
// AgentsConfig is the root of config/agents.yaml.
type AgentsConfig struct {
Version int `yaml:"version"`
Dispatch DispatchConfig `yaml:"dispatch"`
Concurrency map[string]int `yaml:"concurrency"` // per-agent type limits
Version int `yaml:"version"`
Dispatch DispatchConfig `yaml:"dispatch"`
Concurrency map[string]int `yaml:"concurrency"`
Rates map[string]RateConfig `yaml:"rates"`
}
// loadAgentsConfig reads config/agents.yaml from the code path.
@ -46,7 +58,6 @@ func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
return &cfg
}
// Defaults: 1 claude, 3 gemini
return &AgentsConfig{
Dispatch: DispatchConfig{
DefaultAgent: "claude",
@ -59,6 +70,37 @@ func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
}
}
// delayForAgent returns how long to wait before spawning the next task for
// the given agent type.
//
// The agent's rate settings are looked up in the freshly loaded agents
// config. No delay (0) is returned when the agent has no rates entry or its
// sustained_delay is 0. Otherwise the result is burst_delay when the next
// daily quota reset is at most burst_window hours away, and sustained_delay
// at any other time. The chosen value is raised to min_delay when it would
// otherwise be smaller. All delays are interpreted as seconds.
func (s *PrepSubsystem) delayForAgent(agent string) time.Duration {
	cfg := s.loadAgentsConfig()
	rate, ok := cfg.Rates[agent]
	if !ok || rate.SustainedDelay == 0 {
		return 0
	}

	now := time.Now().UTC()
	hour, minute := parseResetClock(rate.ResetUTC)
	hoursUntilReset := nextDailyReset(now, hour, minute).Sub(now).Hours()

	// Sustained pacing is the default; switch to burst pacing only when the
	// quota is about to refill.
	seconds := rate.SustainedDelay
	if rate.BurstWindow > 0 && hoursUntilReset <= float64(rate.BurstWindow) {
		seconds = rate.BurstDelay
	}

	// min_delay is a floor: neither mode may dispatch faster than this.
	if seconds < rate.MinDelay {
		seconds = rate.MinDelay
	}
	return time.Duration(seconds) * time.Second
}

// parseResetClock parses an "HH:MM" wall-clock string into hour and minute.
// It falls back to 06:00 when the string is empty, malformed, or outside
// the 00:00-23:59 range, so a bad config value can never shift the reset
// onto a different day.
func parseResetClock(s string) (hour, minute int) {
	const defaultHour, defaultMinute = 6, 0

	// Scan into temporaries so a partial parse cannot leak a half-set value.
	var h, m int
	if _, err := fmt.Sscanf(s, "%d:%d", &h, &m); err != nil {
		return defaultHour, defaultMinute
	}
	if h < 0 || h > 23 || m < 0 || m > 59 {
		return defaultHour, defaultMinute
	}
	return h, m
}

// nextDailyReset returns the first instant strictly after now at which the
// UTC wall clock reads hour:minute.
func nextDailyReset(now time.Time, hour, minute int) time.Time {
	now = now.UTC()
	reset := time.Date(now.Year(), now.Month(), now.Day(), hour, minute, 0, 0, time.UTC)
	if now.Before(reset) {
		return reset
	}
	// Today's reset has already happened (or is happening right now).
	return reset.AddDate(0, 0, 1)
}
// countRunningByAgent counts running workspaces for a specific agent type.
func (s *PrepSubsystem) countRunningByAgent(agent string) int {
home, _ := os.UserHomeDir()
@ -80,7 +122,6 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int {
continue
}
// Verify PID is actually alive
if st.PID > 0 {
proc, err := os.FindProcess(st.PID)
if err == nil && proc.Signal(syscall.Signal(0)) == nil {
@ -97,22 +138,18 @@ func (s *PrepSubsystem) canDispatchAgent(agent string) bool {
cfg := s.loadAgentsConfig()
limit, ok := cfg.Concurrency[agent]
if !ok || limit <= 0 {
return true // no limit set or unlimited
return true
}
return s.countRunningByAgent(agent) < limit
}
// canDispatch checks the legacy global limit (backwards compat).
// canDispatch is kept for backwards compat.
func (s *PrepSubsystem) canDispatch() bool {
return true // per-agent limits handle this now
}
// canDispatchFor checks per-agent concurrency.
func (s *PrepSubsystem) canDispatchFor(agent string) bool {
return s.canDispatchAgent(agent)
return true
}
// drainQueue finds the oldest queued workspace and spawns it if a slot is available.
// Applies rate-based delay between spawns.
func (s *PrepSubsystem) drainQueue() {
home, _ := os.UserHomeDir()
wsRoot := filepath.Join(home, "Code", "host-uk", "core", ".core", "workspace")
@ -122,7 +159,6 @@ func (s *PrepSubsystem) drainQueue() {
return
}
// Find oldest queued workspace that has a free slot for its agent type
for _, entry := range entries {
if !entry.IsDir() {
continue
@ -134,12 +170,21 @@ func (s *PrepSubsystem) drainQueue() {
continue
}
// Check per-agent limit
if !s.canDispatchAgent(st.Agent) {
continue
}
// Found a queued workspace with a free slot — spawn it
// Apply rate delay before spawning
delay := s.delayForAgent(st.Agent)
if delay > 0 {
time.Sleep(delay)
}
// Re-check concurrency after delay (another task may have started)
if !s.canDispatchAgent(st.Agent) {
continue
}
srcDir := filepath.Join(wsDir, "src")
prompt := "Read PROMPT.md for instructions. All context files (CLAUDE.md, TODO.md, CONTEXT.md, CONSUMERS.md, RECENT.md) are in the parent directory. Work in this directory."
@ -167,13 +212,11 @@ func (s *PrepSubsystem) drainQueue() {
continue
}
// Update status to running
st.Status = "running"
st.PID = cmd.Process.Pid
st.Runs++
writeStatus(wsDir, st)
// Monitor this one too
go func() {
cmd.Wait()
outFile.Close()
@ -184,10 +227,9 @@ func (s *PrepSubsystem) drainQueue() {
writeStatus(wsDir, st2)
}
// Recursively drain — pick up next queued item
s.drainQueue()
}()
return // Only spawn one at a time per drain call
return
}
}