feat(agentic): enforce queue rate limits

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-01 19:14:54 +00:00
parent e6593913f8
commit c58b9acb27
2 changed files with 161 additions and 2 deletions

View file

@ -134,11 +134,17 @@ func (s *PrepSubsystem) delayForAgent(agent string) time.Duration {
nextReset := resetToday.AddDate(0, 0, 1)
hoursUntilReset := nextReset.Sub(now).Hours()
delay := time.Duration(rate.SustainedDelay) * time.Second
if rate.BurstWindow > 0 && hoursUntilReset <= float64(rate.BurstWindow) {
return time.Duration(rate.BurstDelay) * time.Second
delay = time.Duration(rate.BurstDelay) * time.Second
}
return time.Duration(rate.SustainedDelay) * time.Second
minDelay := time.Duration(rate.MinDelay) * time.Second
if minDelay > delay {
delay = minDelay
}
return delay
}
// n := s.countRunningByAgent("codex")
@ -237,6 +243,14 @@ func (s *PrepSubsystem) canDispatchAgent(agent string) bool {
base := baseAgent(agent)
limit, ok := concurrency[base]
if !ok || limit.Total <= 0 {
if blocked, until := s.dailyRateLimitBackoff(agent); blocked {
if s.backoff == nil {
s.backoff = make(map[string]time.Time)
}
s.backoff[baseAgent(agent)] = until
s.persistRuntimeState()
return false
}
return true
}
@ -255,9 +269,94 @@ func (s *PrepSubsystem) canDispatchAgent(agent string) bool {
}
}
if blocked, until := s.dailyRateLimitBackoff(agent); blocked {
if s.backoff == nil {
s.backoff = make(map[string]time.Time)
}
s.backoff[base] = until
s.persistRuntimeState()
return false
}
return true
}
func (s *PrepSubsystem) dailyRateLimitBackoff(agent string) (bool, time.Time) {
rates := s.loadAgentsConfig().Rates
rate, ok := rates[baseAgent(agent)]
if !ok || rate.DailyLimit <= 0 {
return false, time.Time{}
}
if s.dailyDispatchCount(agent) < rate.DailyLimit {
return false, time.Time{}
}
resetHour, resetMin := 6, 0
parts := core.Split(rate.ResetUTC, ":")
if len(parts) >= 2 {
if hour, err := strconv.Atoi(core.Trim(parts[0])); err == nil {
resetHour = hour
}
if min, err := strconv.Atoi(core.Trim(parts[1])); err == nil {
resetMin = min
}
}
now := time.Now().UTC()
resetToday := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMin, 0, 0, time.UTC)
if now.Before(resetToday) {
resetToday = resetToday.AddDate(0, 0, -1)
}
nextReset := resetToday.AddDate(0, 0, 1)
if nextReset.Before(now) {
nextReset = now
}
return true, nextReset
}
func (s *PrepSubsystem) dailyDispatchCount(agent string) int {
eventsPath := core.JoinPath(WorkspaceRoot(), "events.jsonl")
result := fs.Read(eventsPath)
if !result.OK {
return 0
}
targetDay := time.Now().UTC().Format("2006-01-02")
base := baseAgent(agent)
count := 0
for _, line := range core.Split(result.Value.(string), "\n") {
line = core.Trim(line)
if line == "" {
continue
}
var event CompletionEvent
if parseResult := core.JSONUnmarshalString(line, &event); !parseResult.OK {
continue
}
if event.Type != "agent_started" {
continue
}
if baseAgent(event.Agent) != base {
continue
}
timestamp, err := time.Parse(time.RFC3339, event.Timestamp)
if err != nil {
continue
}
if timestamp.UTC().Format("2006-01-02") != targetDay {
continue
}
count++
}
return count
}
// model := modelVariant("codex:gpt-5.4")
// core.Println(model) // "gpt-5.4"
func modelVariant(agent string) string {

View file

@ -122,6 +122,66 @@ rates:
"expected 120s or 15s, got %v", d)
}
func TestQueue_DelayForAgent_Good_MinDelayFloor(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
cfg := `version: 1
rates:
codex:
reset_utc: "06:00"
min_delay: 90
sustained_delay: 30
burst_window: 0
burst_delay: 0`
require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), cfg).OK)
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
d := s.delayForAgent("codex:gpt-5.4")
assert.Equal(t, 90*time.Second, d)
}
func TestQueue_CanDispatchAgent_Bad_DailyLimitReached(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
require.True(t, fs.EnsureDir(core.JoinPath(root, "workspace")).OK)
cfg := `version: 1
rates:
codex:
reset_utc: "06:00"
daily_limit: 2
sustained_delay: 30`
require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), cfg).OK)
now := time.Now().UTC().Format(time.RFC3339)
eventsPath := core.JoinPath(root, "workspace", "events.jsonl")
require.True(t, fs.Write(eventsPath, core.Concat(
core.JSONMarshalString(CompletionEvent{Type: "agent_started", Agent: "codex:gpt-5.4", Timestamp: now}),
"\n",
core.JSONMarshalString(CompletionEvent{Type: "agent_started", Agent: "codex:gpt-5.4", Timestamp: now}),
"\n",
)).OK)
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
assert.False(t, s.canDispatchAgent("codex:gpt-5.4"))
until, ok := s.backoff["codex"]
require.True(t, ok)
assert.True(t, until.After(time.Now()))
}
// --- countRunningByModel ---
func TestQueue_CountRunningByModel_Good_NoWorkspaces(t *testing.T) {