From f0788716173989c24c784b3783b6697160c90e25 Mon Sep 17 00:00:00 2001
From: Claude
Date: Fri, 20 Feb 2026 13:59:30 +0000
Subject: [PATCH] feat(dispatch): add rate-limit backoff and ticket requeue
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When an agent job fails in under 30 seconds (indicating rate limiting or
auth rejection), the watcher now:

- Requeues the ticket back to the queue instead of marking it done
- Applies exponential backoff: 2x, 4x, 8x the base poll interval
- Resets backoff to 1x on successful job completion

This prevents burning through the entire queue during rate-limited
periods. Tested on Clotho (AU) with 15m base → 30m/60m/120m backoff.

Co-Authored-By: Charon
---
 cmd/ai/cmd_dispatch.go | 107 +++++++++++++++++++++++++++++++++--------
 1 file changed, 86 insertions(+), 21 deletions(-)

diff --git a/cmd/ai/cmd_dispatch.go b/cmd/ai/cmd_dispatch.go
index 45c9db7c..2c817d0d 100644
--- a/cmd/ai/cmd_dispatch.go
+++ b/cmd/ai/cmd_dispatch.go
@@ -113,13 +113,22 @@ func dispatchRunCmd() *cli.Command {
 				return nil
 			}
-			return processTicket(paths, ticketFile)
+			_, err = processTicket(paths, ticketFile)
+			return err
 		},
 	}
 
 	cmd.Flags().String("work-dir", "", "Working directory (default: ~/ai-work)")
 	return cmd
 }
 
+// fastFailThreshold is how quickly a job must fail to be considered rate-limited.
+// Real work always takes longer than 30 seconds; a 3-second exit means the CLI
+// was rejected before it could start (rate limit, auth error, etc.).
+const fastFailThreshold = 30 * time.Second
+
+// maxBackoffMultiplier caps the exponential backoff at 8x the base interval.
+const maxBackoffMultiplier = 8
+
 func dispatchWatchCmd() *cli.Command {
 	cmd := &cli.Command{
 		Use: "watch",
@@ -152,15 +161,39 @@
 				go heartbeatLoop(ctx, registry, agentID, interval/2)
 			}
 
-			ticker := time.NewTicker(interval)
+			// Backoff state: consecutive fast failures increase the poll delay.
+			backoffMultiplier := 1
+			currentInterval := interval
+
+			ticker := time.NewTicker(currentInterval)
 			defer ticker.Stop()
 
-			runCycleWithEvents(paths, registry, events, agentID)
+			adjustTicker := func(fastFail bool) {
+				if fastFail {
+					if backoffMultiplier < maxBackoffMultiplier {
+						backoffMultiplier *= 2
+					}
+					currentInterval = interval * time.Duration(backoffMultiplier)
+					log.Warn("Fast failure detected, backing off",
+						"multiplier", backoffMultiplier, "next_poll", currentInterval)
+				} else {
+					if backoffMultiplier > 1 {
+						log.Info("Job succeeded, resetting backoff")
+					}
+					backoffMultiplier = 1
+					currentInterval = interval
+				}
+				ticker.Reset(currentInterval)
+			}
+
+			fastFail := runCycleWithEvents(paths, registry, events, agentID)
+			adjustTicker(fastFail)
 
 			for {
 				select {
 				case <-ticker.C:
-					runCycleWithEvents(paths, registry, events, agentID)
+					ff := runCycleWithEvents(paths, registry, events, agentID)
+					adjustTicker(ff)
 				case <-sigChan:
 					log.Info("Shutting down watcher...")
 					if registry != nil {
@@ -243,25 +276,25 @@ func heartbeatLoop(ctx context.Context, registry agentic.AgentRegistry, agentID
 }
 
 // runCycleWithEvents wraps runCycle with registry status updates and event emission.
-func runCycleWithEvents(paths runnerPaths, registry agentic.AgentRegistry, events agentic.EventEmitter, agentID string) {
+// Returns true if the cycle resulted in a fast failure (likely rate-limited).
+func runCycleWithEvents(paths runnerPaths, registry agentic.AgentRegistry, events agentic.EventEmitter, agentID string) bool {
 	if registry != nil {
-		// Set busy while processing.
 		if agent, err := registry.Get(agentID); err == nil {
 			agent.Status = agentic.AgentBusy
 			_ = registry.Register(agent)
 		}
 	}
 
-	runCycle(paths)
+	fastFail := runCycle(paths)
 
 	if registry != nil {
-		// Back to available.
 		if agent, err := registry.Get(agentID); err == nil {
 			agent.Status = agentic.AgentAvailable
 			agent.LastHeartbeat = time.Now().UTC()
 			_ = registry.Register(agent)
 		}
 	}
+	return fastFail
 }
 
 func dispatchStatusCmd() *cli.Command {
@@ -308,61 +341,93 @@
 	return cmd
 }
 
-func runCycle(paths runnerPaths) {
+// runCycle picks and processes one ticket. Returns true if the job fast-failed
+// (likely rate-limited), signalling the caller to back off.
+func runCycle(paths runnerPaths) bool {
 	if err := acquireLock(paths.lock); err != nil {
 		log.Debug("Runner locked, skipping cycle")
-		return
+		return false
 	}
 	defer releaseLock(paths.lock)
 
 	ticketFile, err := pickOldestTicket(paths.queue)
 	if err != nil {
 		log.Error("Failed to pick ticket", "error", err)
-		return
+		return false
 	}
 	if ticketFile == "" {
-		return
+		return false // empty queue, no backoff needed
 	}
 
-	if err := processTicket(paths, ticketFile); err != nil {
+	start := time.Now()
+	success, err := processTicket(paths, ticketFile)
+	elapsed := time.Since(start)
+
+	if err != nil {
 		log.Error("Failed to process ticket", "file", ticketFile, "error", err)
 	}
+
+	// Detect fast failure: job failed in under 30s → likely rate-limited.
+	if !success && elapsed < fastFailThreshold {
+		log.Warn("Job finished too fast, likely rate-limited",
+			"elapsed", elapsed.Round(time.Second), "file", filepath.Base(ticketFile))
+		return true
+	}
+
+	return false
 }
 
-func processTicket(paths runnerPaths, ticketPath string) error {
+// processTicket processes a single ticket. Returns (success, error).
+// On fast failure the caller is responsible for detecting the timing and backing off.
+// The ticket is moved active→done on completion, or active→queue on fast failure.
+func processTicket(paths runnerPaths, ticketPath string) (bool, error) {
 	fileName := filepath.Base(ticketPath)
 	log.Info("Processing ticket", "file", fileName)
 
 	activePath := filepath.Join(paths.active, fileName)
 	if err := os.Rename(ticketPath, activePath); err != nil {
-		return fmt.Errorf("failed to move ticket to active: %w", err)
+		return false, fmt.Errorf("failed to move ticket to active: %w", err)
	}
 
 	data, err := os.ReadFile(activePath)
 	if err != nil {
-		return fmt.Errorf("failed to read ticket: %w", err)
+		return false, fmt.Errorf("failed to read ticket: %w", err)
 	}
 
 	var t dispatchTicket
 	if err := json.Unmarshal(data, &t); err != nil {
-		return fmt.Errorf("failed to unmarshal ticket: %w", err)
+		return false, fmt.Errorf("failed to unmarshal ticket: %w", err)
 	}
 
 	jobDir := filepath.Join(paths.jobs, fmt.Sprintf("%s-%s-%d", t.RepoOwner, t.RepoName, t.IssueNumber))
 	repoDir := filepath.Join(jobDir, t.RepoName)
 	if err := os.MkdirAll(jobDir, 0755); err != nil {
-		return err
+		return false, err
 	}
 
 	if err := prepareRepo(t, repoDir); err != nil {
 		reportToForge(t, false, fmt.Sprintf("Git setup failed: %v", err))
 		moveToDone(paths, activePath, fileName)
-		return err
+		return false, err
 	}
 
 	prompt := buildPrompt(t)
 	logFile := filepath.Join(paths.logs, fmt.Sprintf("%s-%s-%d.log", t.RepoOwner, t.RepoName, t.IssueNumber))
+	start := time.Now()
 	success, exitCode, runErr := runAgent(t, prompt, repoDir, logFile)
+	elapsed := time.Since(start)
+
+	// Fast failure: agent exited in <30s without success → likely rate-limited.
+	// Requeue the ticket so it's retried after the backoff period.
+	if !success && elapsed < fastFailThreshold {
+		log.Warn("Agent rejected fast, requeuing ticket", "elapsed", elapsed.Round(time.Second), "file", fileName)
+		requeuePath := filepath.Join(paths.queue, fileName)
+		if err := os.Rename(activePath, requeuePath); err != nil {
+			// Fallback: move to done if requeue fails.
+			moveToDone(paths, activePath, fileName)
+		}
+		return false, runErr
+	}
 
 	msg := fmt.Sprintf("Agent completed work on #%d. Exit code: %d.", t.IssueNumber, exitCode)
 	if !success {
@@ -374,8 +439,8 @@
 	reportToForge(t, success, msg)
 	moveToDone(paths, activePath, fileName)
 
-	log.Info("Ticket complete", "id", t.ID, "success", success)
-	return nil
+	log.Info("Ticket complete", "id", t.ID, "success", success, "elapsed", elapsed.Round(time.Second))
+	return success, nil
 }
 
 func prepareRepo(t dispatchTicket, repoDir string) error {