diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go index dc3db1d..61ae731 100644 --- a/pkg/agentic/queue.go +++ b/pkg/agentic/queue.go @@ -134,11 +134,17 @@ func (s *PrepSubsystem) delayForAgent(agent string) time.Duration { nextReset := resetToday.AddDate(0, 0, 1) hoursUntilReset := nextReset.Sub(now).Hours() + delay := time.Duration(rate.SustainedDelay) * time.Second if rate.BurstWindow > 0 && hoursUntilReset <= float64(rate.BurstWindow) { - return time.Duration(rate.BurstDelay) * time.Second + delay = time.Duration(rate.BurstDelay) * time.Second } - return time.Duration(rate.SustainedDelay) * time.Second + minDelay := time.Duration(rate.MinDelay) * time.Second + if minDelay > delay { + delay = minDelay + } + + return delay } // n := s.countRunningByAgent("codex") @@ -237,6 +243,14 @@ func (s *PrepSubsystem) canDispatchAgent(agent string) bool { base := baseAgent(agent) limit, ok := concurrency[base] if !ok || limit.Total <= 0 { + if blocked, until := s.dailyRateLimitBackoff(agent); blocked { + if s.backoff == nil { + s.backoff = make(map[string]time.Time) + } + s.backoff[baseAgent(agent)] = until + s.persistRuntimeState() + return false + } return true } @@ -255,9 +269,94 @@ func (s *PrepSubsystem) canDispatchAgent(agent string) bool { } } + if blocked, until := s.dailyRateLimitBackoff(agent); blocked { + if s.backoff == nil { + s.backoff = make(map[string]time.Time) + } + s.backoff[base] = until + s.persistRuntimeState() + return false + } + return true } +func (s *PrepSubsystem) dailyRateLimitBackoff(agent string) (bool, time.Time) { + rates := s.loadAgentsConfig().Rates + rate, ok := rates[baseAgent(agent)] + if !ok || rate.DailyLimit <= 0 { + return false, time.Time{} + } + + if s.dailyDispatchCount(agent) < rate.DailyLimit { + return false, time.Time{} + } + + resetHour, resetMin := 6, 0 + parts := core.Split(rate.ResetUTC, ":") + if len(parts) >= 2 { + if hour, err := strconv.Atoi(core.Trim(parts[0])); err == nil { + resetHour = hour + } + if min, err := strconv.Atoi(core.Trim(parts[1])); err == nil { + resetMin = min + } + } + + now := time.Now().UTC() + resetToday := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMin, 0, 0, time.UTC) + if now.Before(resetToday) { + resetToday = resetToday.AddDate(0, 0, -1) + } + nextReset := resetToday.AddDate(0, 0, 1) + if nextReset.Before(now) { + nextReset = now + } + return true, nextReset +} + +func (s *PrepSubsystem) dailyDispatchCount(agent string) int { + eventsPath := core.JoinPath(WorkspaceRoot(), "events.jsonl") + result := fs.Read(eventsPath) + if !result.OK { + return 0 + } + + targetDay := time.Now().UTC().Format("2006-01-02") + base := baseAgent(agent) + count := 0 + + for _, line := range core.Split(result.Value.(string), "\n") { + line = core.Trim(line) + if line == "" { + continue + } + + var event CompletionEvent + if parseResult := core.JSONUnmarshalString(line, &event); !parseResult.OK { + continue + } + if event.Type != "agent_started" { + continue + } + if baseAgent(event.Agent) != base { + continue + } + + timestamp, err := time.Parse(time.RFC3339, event.Timestamp) + if err != nil { + continue + } + if timestamp.UTC().Format("2006-01-02") != targetDay { + continue + } + + count++ + } + + return count +} + // model := modelVariant("codex:gpt-5.4") // core.Println(model) // "gpt-5.4" func modelVariant(agent string) string { diff --git a/pkg/agentic/queue_extra_test.go b/pkg/agentic/queue_extra_test.go index 774a35b..b57044c 100644 --- a/pkg/agentic/queue_extra_test.go +++ b/pkg/agentic/queue_extra_test.go @@ -122,6 +122,66 @@ rates: "expected 120s or 15s, got %v", d) } +func TestQueue_DelayForAgent_Good_MinDelayFloor(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + + cfg := `version: 1 +rates: + codex: + reset_utc: "06:00" + min_delay: 90 + sustained_delay: 30 + burst_window: 0 + burst_delay: 0` + require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), cfg).OK) + + s := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + + d := s.delayForAgent("codex:gpt-5.4") + assert.Equal(t, 90*time.Second, d) +} + +func TestQueue_CanDispatchAgent_Bad_DailyLimitReached(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + require.True(t, fs.EnsureDir(core.JoinPath(root, "workspace")).OK) + + cfg := `version: 1 +rates: + codex: + reset_utc: "06:00" + daily_limit: 2 + sustained_delay: 30` + require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), cfg).OK) + + now := time.Now().UTC().Format(time.RFC3339) + eventsPath := core.JoinPath(root, "workspace", "events.jsonl") + require.True(t, fs.Write(eventsPath, core.Concat( + core.JSONMarshalString(CompletionEvent{Type: "agent_started", Agent: "codex:gpt-5.4", Timestamp: now}), + "\n", + core.JSONMarshalString(CompletionEvent{Type: "agent_started", Agent: "codex:gpt-5.4", Timestamp: now}), + "\n", + )).OK) + + s := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + + assert.False(t, s.canDispatchAgent("codex:gpt-5.4")) + until, ok := s.backoff["codex"] + require.True(t, ok) + assert.True(t, until.After(time.Now())) +} + // --- countRunningByModel --- func TestQueue_CountRunningByModel_Good_NoWorkspaces(t *testing.T) {