feat(dispatch): add rate-limit backoff and ticket requeue
When an agent job fails in under 30 seconds (indicating rate limiting or auth rejection), the watcher now:

- Requeues the ticket back to the queue instead of marking it done
- Applies exponential backoff: 2x, 4x, 8x the base poll interval
- Resets backoff to 1x on successful job completion

This prevents burning through the entire queue during rate-limited periods.

Tested on Clotho (AU) with 15m base → 30m/60m/120m backoff.

Co-Authored-By: Charon <charon@lethean.io>
This commit is contained in:
parent 15d9d9483a
commit f078871617

1 changed file with 86 additions and 21 deletions
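The schedule in the message (15m base → 30m/60m/120m, capped at 8x, reset on success) can be sanity-checked with a minimal standalone Go sketch. This is illustrative only, not code from this commit; nextInterval is a made-up helper that mirrors the doubling-with-cap policy:

package main

import (
	"fmt"
	"time"
)

// nextInterval applies the policy from the commit message: each consecutive
// fast failure doubles the multiplier up to a cap of 8x; any success resets
// the multiplier to 1x.
func nextInterval(base time.Duration, multiplier *int, fastFail bool) time.Duration {
	if fastFail {
		if *multiplier < 8 {
			*multiplier *= 2
		}
	} else {
		*multiplier = 1
	}
	return base * time.Duration(*multiplier)
}

func main() {
	base := 15 * time.Minute // the Clotho (AU) base interval
	m := 1
	for i := 0; i < 4; i++ {
		fmt.Println(nextInterval(base, &m, true)) // 30m0s, 1h0m0s, 2h0m0s, 2h0m0s (capped)
	}
	fmt.Println(nextInterval(base, &m, false)) // success → back to 15m0s
}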
@@ -113,13 +113,22 @@ func dispatchRunCmd() *cli.Command {
 				return nil
 			}
 
-			return processTicket(paths, ticketFile)
+			_, err = processTicket(paths, ticketFile)
+			return err
 		},
 	}
 	cmd.Flags().String("work-dir", "", "Working directory (default: ~/ai-work)")
 	return cmd
 }
 
+// fastFailThreshold is how quickly a job must fail to be considered rate-limited.
+// Real work always takes longer than 30 seconds; a 3-second exit means the CLI
+// was rejected before it could start (rate limit, auth error, etc.).
+const fastFailThreshold = 30 * time.Second
+
+// maxBackoffMultiplier caps the exponential backoff at 8x the base interval.
+const maxBackoffMultiplier = 8
+
 func dispatchWatchCmd() *cli.Command {
 	cmd := &cli.Command{
 		Use: "watch",
@@ -152,15 +161,39 @@ func dispatchWatchCmd() *cli.Command {
 				go heartbeatLoop(ctx, registry, agentID, interval/2)
 			}
 
-			ticker := time.NewTicker(interval)
+			// Backoff state: consecutive fast failures increase the poll delay.
+			backoffMultiplier := 1
+			currentInterval := interval
+
+			ticker := time.NewTicker(currentInterval)
 			defer ticker.Stop()
 
-			runCycleWithEvents(paths, registry, events, agentID)
+			adjustTicker := func(fastFail bool) {
+				if fastFail {
+					if backoffMultiplier < maxBackoffMultiplier {
+						backoffMultiplier *= 2
+					}
+					currentInterval = interval * time.Duration(backoffMultiplier)
+					log.Warn("Fast failure detected, backing off",
+						"multiplier", backoffMultiplier, "next_poll", currentInterval)
+				} else {
+					if backoffMultiplier > 1 {
+						log.Info("Job succeeded, resetting backoff")
+					}
+					backoffMultiplier = 1
+					currentInterval = interval
+				}
+				ticker.Reset(currentInterval)
+			}
+
+			fastFail := runCycleWithEvents(paths, registry, events, agentID)
+			adjustTicker(fastFail)
 
 			for {
 				select {
 				case <-ticker.C:
-					runCycleWithEvents(paths, registry, events, agentID)
+					ff := runCycleWithEvents(paths, registry, events, agentID)
+					adjustTicker(ff)
 				case <-sigChan:
 					log.Info("Shutting down watcher...")
 					if registry != nil {
@@ -243,25 +276,25 @@ func heartbeatLoop(ctx context.Context, registry agentic.AgentRegistry, agentID
 }
 
 // runCycleWithEvents wraps runCycle with registry status updates and event emission.
-func runCycleWithEvents(paths runnerPaths, registry agentic.AgentRegistry, events agentic.EventEmitter, agentID string) {
+// Returns true if the cycle resulted in a fast failure (likely rate-limited).
+func runCycleWithEvents(paths runnerPaths, registry agentic.AgentRegistry, events agentic.EventEmitter, agentID string) bool {
 	if registry != nil {
 		// Set busy while processing.
 		if agent, err := registry.Get(agentID); err == nil {
 			agent.Status = agentic.AgentBusy
 			_ = registry.Register(agent)
 		}
 	}
 
-	runCycle(paths)
+	fastFail := runCycle(paths)
 
 	if registry != nil {
 		// Back to available.
 		if agent, err := registry.Get(agentID); err == nil {
 			agent.Status = agentic.AgentAvailable
 			agent.LastHeartbeat = time.Now().UTC()
 			_ = registry.Register(agent)
 		}
 	}
+	return fastFail
 }
 
 func dispatchStatusCmd() *cli.Command {
@@ -308,61 +341,93 @@ func dispatchStatusCmd() *cli.Command {
 	return cmd
 }
 
-func runCycle(paths runnerPaths) {
+// runCycle picks and processes one ticket. Returns true if the job fast-failed
+// (likely rate-limited), signalling the caller to back off.
+func runCycle(paths runnerPaths) bool {
 	if err := acquireLock(paths.lock); err != nil {
 		log.Debug("Runner locked, skipping cycle")
-		return
+		return false
 	}
 	defer releaseLock(paths.lock)
 
 	ticketFile, err := pickOldestTicket(paths.queue)
 	if err != nil {
 		log.Error("Failed to pick ticket", "error", err)
-		return
+		return false
 	}
 	if ticketFile == "" {
-		return
+		return false // empty queue, no backoff needed
 	}
 
-	if err := processTicket(paths, ticketFile); err != nil {
+	start := time.Now()
+	success, err := processTicket(paths, ticketFile)
+	elapsed := time.Since(start)
+
+	if err != nil {
 		log.Error("Failed to process ticket", "file", ticketFile, "error", err)
 	}
+
+	// Detect fast failure: job failed in under 30s → likely rate-limited.
+	if !success && elapsed < fastFailThreshold {
+		log.Warn("Job finished too fast, likely rate-limited",
+			"elapsed", elapsed.Round(time.Second), "file", filepath.Base(ticketFile))
+		return true
+	}
+
+	return false
 }
 
-func processTicket(paths runnerPaths, ticketPath string) error {
+// processTicket processes a single ticket. Returns (success, error).
+// On fast failure the caller is responsible for detecting the timing and backing off.
+// The ticket is moved active→done on completion, or active→queue on fast failure.
+func processTicket(paths runnerPaths, ticketPath string) (bool, error) {
 	fileName := filepath.Base(ticketPath)
 	log.Info("Processing ticket", "file", fileName)
 
 	activePath := filepath.Join(paths.active, fileName)
 	if err := os.Rename(ticketPath, activePath); err != nil {
-		return fmt.Errorf("failed to move ticket to active: %w", err)
+		return false, fmt.Errorf("failed to move ticket to active: %w", err)
	}
 
 	data, err := os.ReadFile(activePath)
 	if err != nil {
-		return fmt.Errorf("failed to read ticket: %w", err)
+		return false, fmt.Errorf("failed to read ticket: %w", err)
 	}
 	var t dispatchTicket
 	if err := json.Unmarshal(data, &t); err != nil {
-		return fmt.Errorf("failed to unmarshal ticket: %w", err)
+		return false, fmt.Errorf("failed to unmarshal ticket: %w", err)
 	}
 
 	jobDir := filepath.Join(paths.jobs, fmt.Sprintf("%s-%s-%d", t.RepoOwner, t.RepoName, t.IssueNumber))
 	repoDir := filepath.Join(jobDir, t.RepoName)
 	if err := os.MkdirAll(jobDir, 0755); err != nil {
-		return err
+		return false, err
 	}
 
 	if err := prepareRepo(t, repoDir); err != nil {
 		reportToForge(t, false, fmt.Sprintf("Git setup failed: %v", err))
 		moveToDone(paths, activePath, fileName)
-		return err
+		return false, err
 	}
 
 	prompt := buildPrompt(t)
 
 	logFile := filepath.Join(paths.logs, fmt.Sprintf("%s-%s-%d.log", t.RepoOwner, t.RepoName, t.IssueNumber))
+	start := time.Now()
 	success, exitCode, runErr := runAgent(t, prompt, repoDir, logFile)
+	elapsed := time.Since(start)
+
+	// Fast failure: agent exited in <30s without success → likely rate-limited.
+	// Requeue the ticket so it's retried after the backoff period.
+	if !success && elapsed < fastFailThreshold {
+		log.Warn("Agent rejected fast, requeuing ticket", "elapsed", elapsed.Round(time.Second), "file", fileName)
+		requeuePath := filepath.Join(paths.queue, fileName)
+		if err := os.Rename(activePath, requeuePath); err != nil {
+			// Fallback: move to done if requeue fails.
+			moveToDone(paths, activePath, fileName)
+		}
+		return false, runErr
+	}
 
 	msg := fmt.Sprintf("Agent completed work on #%d. Exit code: %d.", t.IssueNumber, exitCode)
 	if !success {
@@ -374,8 +439,8 @@ func processTicket(paths runnerPaths, ticketPath string) error {
 	reportToForge(t, success, msg)
 
 	moveToDone(paths, activePath, fileName)
-	log.Info("Ticket complete", "id", t.ID, "success", success)
-	return nil
+	log.Info("Ticket complete", "id", t.ID, "success", success, "elapsed", elapsed.Round(time.Second))
+	return success, nil
 }
 
 func prepareRepo(t dispatchTicket, repoDir string) error {
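The multiplier transitions are also easy to unit-test if the adjustment logic is pulled out of the watcher closure. A sketch under that assumption follows; nextMultiplier is hypothetical and not part of this commit:

package dispatch

import "testing"

// nextMultiplier mirrors the adjustTicker transition: double on fast failure
// (capped at 8), reset to 1 on success.
func nextMultiplier(current int, fastFail bool) int {
	if !fastFail {
		return 1
	}
	if current < 8 {
		return current * 2
	}
	return current
}

func TestBackoffProgression(t *testing.T) {
	cases := []struct {
		current  int
		fastFail bool
		want     int
	}{
		{1, true, 2},  // first fast failure: 2x
		{2, true, 4},  // second: 4x
		{4, true, 8},  // third: 8x
		{8, true, 8},  // capped at 8x
		{8, false, 1}, // success resets to 1x
	}
	for _, c := range cases {
		if got := nextMultiplier(c.current, c.fastFail); got != c.want {
			t.Errorf("nextMultiplier(%d, %v) = %d, want %d", c.current, c.fastFail, got, c.want)
		}
	}
}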