From ba8c19d32f6de664d396b535cc19cd7154f2441b Mon Sep 17 00:00:00 2001 From: Snider Date: Fri, 20 Feb 2026 11:47:47 +0000 Subject: [PATCH] =?UTF-8?q?feat(dispatch):=20Phase=207=20=E2=80=94=20prior?= =?UTF-8?q?ity-ordered=20dispatch=20with=20retry=20backoff=20and=20dead-le?= =?UTF-8?q?tter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sort pending tasks by priority (Critical > High > Medium > Low) with oldest-first tie-breaking before dispatch. Add exponential backoff for failed dispatches (5s, 10s, 20s...) and dead-letter tasks that exceed MaxRetries (default 3) by marking them StatusFailed via UpdateTask. New Task fields: MaxRetries, RetryCount, LastAttempt, FailReason. New constant: StatusFailed. Co-Authored-By: Virgil Co-Authored-By: Claude Opus 4.6 --- dispatcher.go | 109 +++++++++++++++- dispatcher_test.go | 305 +++++++++++++++++++++++++++++++++++++++++++++ types.go | 10 ++ 3 files changed, 420 insertions(+), 4 deletions(-) diff --git a/dispatcher.go b/dispatcher.go index 01d0f78..5a1a95b 100644 --- a/dispatcher.go +++ b/dispatcher.go @@ -2,11 +2,19 @@ package agentic import ( "context" + "sort" "time" "forge.lthn.ai/core/go/pkg/log" ) +const ( + // DefaultMaxRetries is the default number of dispatch attempts before dead-lettering. + DefaultMaxRetries = 3 + // baseBackoff is the base duration for exponential backoff between retries. + baseBackoff = 5 * time.Second +) + // Dispatcher orchestrates task dispatch by combining the agent registry, // task router, allowance service, and API client. type Dispatcher struct { @@ -71,8 +79,74 @@ func (d *Dispatcher) Dispatch(ctx context.Context, task *Task) (string, error) { return agentID, nil } +// priorityRank maps a TaskPriority to a numeric rank for sorting. +// Lower values are dispatched first. +func priorityRank(p TaskPriority) int { + switch p { + case PriorityCritical: + return 0 + case PriorityHigh: + return 1 + case PriorityMedium: + return 2 + case PriorityLow: + return 3 + default: + return 4 + } +} + +// sortTasksByPriority sorts tasks by priority (Critical first) then by +// CreatedAt (oldest first) as a tie-breaker. Uses SliceStable for determinism. +func sortTasksByPriority(tasks []Task) { + sort.SliceStable(tasks, func(i, j int) bool { + ri, rj := priorityRank(tasks[i].Priority), priorityRank(tasks[j].Priority) + if ri != rj { + return ri < rj + } + return tasks[i].CreatedAt.Before(tasks[j].CreatedAt) + }) +} + +// backoffDuration returns the exponential backoff duration for the given retry +// count. First retry waits baseBackoff (5s), second waits 10s, third 20s, etc. +func backoffDuration(retryCount int) time.Duration { + if retryCount <= 0 { + return 0 + } + d := baseBackoff + for i := 1; i < retryCount; i++ { + d *= 2 + } + return d +} + +// shouldSkipRetry returns true if a task has been retried and the backoff +// period has not yet elapsed since the last attempt. +func shouldSkipRetry(task *Task, now time.Time) bool { + if task.RetryCount <= 0 { + return false + } + if task.LastAttempt == nil { + return false + } + return task.LastAttempt.Add(backoffDuration(task.RetryCount)).After(now) +} + +// effectiveMaxRetries returns the max retries for a task, using DefaultMaxRetries +// when the task does not specify one. +func effectiveMaxRetries(task *Task) int { + if task.MaxRetries > 0 { + return task.MaxRetries + } + return DefaultMaxRetries +} + // DispatchLoop polls for pending tasks at the given interval and dispatches -// each one. It runs until the context is cancelled and returns ctx.Err(). +// each one. Tasks are sorted by priority (Critical > High > Medium > Low) with +// oldest-first tie-breaking. Failed dispatches are retried with exponential +// backoff. Tasks exceeding their retry limit are dead-lettered with StatusFailed. +// It runs until the context is cancelled and returns ctx.Err(). func (d *Dispatcher) DispatchLoop(ctx context.Context, interval time.Duration) error { const op = "Dispatcher.DispatchLoop" @@ -95,13 +169,40 @@ func (d *Dispatcher) DispatchLoop(ctx context.Context, interval time.Duration) e continue } + // Sort by priority then by creation time. + sortTasksByPriority(tasks) + + now := time.Now().UTC() for i := range tasks { if ctx.Err() != nil { return ctx.Err() } - if _, err := d.Dispatch(ctx, &tasks[i]); err != nil { - // Log dispatch errors but continue with the next task. - _ = log.E(op, "failed to dispatch task "+tasks[i].ID, err) + + task := &tasks[i] + + // Check if backoff period has not elapsed for retried tasks. + if shouldSkipRetry(task, now) { + continue + } + + if _, err := d.Dispatch(ctx, task); err != nil { + // Increment retry count and record the attempt time. + task.RetryCount++ + attemptTime := now + task.LastAttempt = &attemptTime + + maxRetries := effectiveMaxRetries(task) + if task.RetryCount >= maxRetries { + // Dead-letter: mark as failed via the API. + if updateErr := d.client.UpdateTask(ctx, task.ID, TaskUpdate{ + Status: StatusFailed, + Notes: "max retries exceeded", + }); updateErr != nil { + _ = log.E(op, "failed to dead-letter task "+task.ID, updateErr) + } + } else { + _ = log.E(op, "failed to dispatch task "+task.ID, err) + } } } } diff --git a/dispatcher_test.go b/dispatcher_test.go index 8921ae7..cc207ed 100644 --- a/dispatcher_test.go +++ b/dispatcher_test.go @@ -271,3 +271,308 @@ func TestDispatcher_Dispatch_Good_Concurrent(t *testing.T) { usage, _ := store.GetUsage("agent-1") assert.Equal(t, 10, usage.JobsStarted) } + +// --- Phase 7: Priority sorting tests --- + +func TestSortTasksByPriority_Good(t *testing.T) { + base := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + tasks := []Task{ + {ID: "low-old", Priority: PriorityLow, CreatedAt: base}, + {ID: "critical-new", Priority: PriorityCritical, CreatedAt: base.Add(2 * time.Hour)}, + {ID: "medium-old", Priority: PriorityMedium, CreatedAt: base}, + {ID: "high-old", Priority: PriorityHigh, CreatedAt: base}, + {ID: "critical-old", Priority: PriorityCritical, CreatedAt: base}, + } + + sortTasksByPriority(tasks) + + // Critical tasks first, oldest critical before newer critical. + assert.Equal(t, "critical-old", tasks[0].ID) + assert.Equal(t, "critical-new", tasks[1].ID) + // Then high. + assert.Equal(t, "high-old", tasks[2].ID) + // Then medium. + assert.Equal(t, "medium-old", tasks[3].ID) + // Then low. + assert.Equal(t, "low-old", tasks[4].ID) +} + +// --- Phase 7: Backoff duration tests --- + +func TestBackoffDuration_Good(t *testing.T) { + // retryCount=0 → 0 (no backoff). + assert.Equal(t, time.Duration(0), backoffDuration(0)) + // retryCount=1 → 5s (base). + assert.Equal(t, 5*time.Second, backoffDuration(1)) + // retryCount=2 → 10s. + assert.Equal(t, 10*time.Second, backoffDuration(2)) + // retryCount=3 → 20s. + assert.Equal(t, 20*time.Second, backoffDuration(3)) + // retryCount=4 → 40s. + assert.Equal(t, 40*time.Second, backoffDuration(4)) +} + +// --- Phase 7: shouldSkipRetry tests --- + +func TestShouldSkipRetry_Good(t *testing.T) { + now := time.Now().UTC() + recent := now.Add(-2 * time.Second) // 2s ago, backoff for retry 1 is 5s → skip. + + task := &Task{ + ID: "task-1", + RetryCount: 1, + LastAttempt: &recent, + } + assert.True(t, shouldSkipRetry(task, now)) + + // After backoff elapses, should NOT skip. + old := now.Add(-10 * time.Second) // 10s ago, backoff for retry 1 is 5s → ready. + task.LastAttempt = &old + assert.False(t, shouldSkipRetry(task, now)) +} + +func TestShouldSkipRetry_Bad_NoRetry(t *testing.T) { + now := time.Now().UTC() + + // RetryCount=0 → never skip. + task := &Task{ID: "task-1", RetryCount: 0} + assert.False(t, shouldSkipRetry(task, now)) + + // RetryCount=0 even with a LastAttempt set → never skip. + recent := now.Add(-1 * time.Second) + task.LastAttempt = &recent + assert.False(t, shouldSkipRetry(task, now)) + + // RetryCount>0 but nil LastAttempt → never skip. + task2 := &Task{ID: "task-2", RetryCount: 2, LastAttempt: nil} + assert.False(t, shouldSkipRetry(task2, now)) +} + +// --- Phase 7: DispatchLoop priority order test --- + +func TestDispatcher_DispatchLoop_Good_PriorityOrder(t *testing.T) { + base := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + pendingTasks := []Task{ + {ID: "low-1", Status: StatusPending, Priority: PriorityLow, CreatedAt: base}, + {ID: "critical-1", Status: StatusPending, Priority: PriorityCritical, CreatedAt: base}, + {ID: "medium-1", Status: StatusPending, Priority: PriorityMedium, CreatedAt: base}, + {ID: "high-1", Status: StatusPending, Priority: PriorityHigh, CreatedAt: base}, + {ID: "critical-2", Status: StatusPending, Priority: PriorityCritical, CreatedAt: base.Add(time.Second)}, + } + + var mu sync.Mutex + var claimOrder []string + claimedIDs := make(map[string]bool) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/api/tasks": + w.Header().Set("Content-Type", "application/json") + mu.Lock() + var remaining []Task + for _, tk := range pendingTasks { + if !claimedIDs[tk.ID] { + remaining = append(remaining, tk) + } + } + mu.Unlock() + _ = json.NewEncoder(w).Encode(remaining) + + case r.Method == http.MethodPost: + w.Header().Set("Content-Type", "application/json") + for _, tk := range pendingTasks { + if r.URL.Path == "/api/tasks/"+tk.ID+"/claim" { + mu.Lock() + claimedIDs[tk.ID] = true + claimOrder = append(claimOrder, tk.ID) + mu.Unlock() + claimed := tk + claimed.Status = StatusInProgress + _ = json.NewEncoder(w).Encode(ClaimResponse{Task: &claimed}) + return + } + } + w.WriteHeader(http.StatusNotFound) + + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + client := NewClient(server.URL, "test-token") + d, reg, store := setupDispatcher(t, client) + registerAgent(t, reg, store, "agent-1", nil, 10) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + err := d.DispatchLoop(ctx, 50*time.Millisecond) + require.ErrorIs(t, err, context.DeadlineExceeded) + + mu.Lock() + defer mu.Unlock() + + // All 5 tasks should have been claimed. + require.Len(t, claimOrder, 5) + // Critical tasks first (oldest before newest), then high, medium, low. + assert.Equal(t, "critical-1", claimOrder[0]) + assert.Equal(t, "critical-2", claimOrder[1]) + assert.Equal(t, "high-1", claimOrder[2]) + assert.Equal(t, "medium-1", claimOrder[3]) + assert.Equal(t, "low-1", claimOrder[4]) +} + +// --- Phase 7: DispatchLoop retry backoff test --- + +func TestDispatcher_DispatchLoop_Good_RetryBackoff(t *testing.T) { + // A task with a recent LastAttempt and RetryCount=1 should be skipped + // because the backoff period (5s) has not elapsed. + recentAttempt := time.Now().UTC() + pendingTasks := []Task{ + { + ID: "retrying-task", + Status: StatusPending, + Priority: PriorityHigh, + CreatedAt: time.Now().UTC().Add(-time.Hour), + RetryCount: 1, + LastAttempt: &recentAttempt, + }, + { + ID: "fresh-task", + Status: StatusPending, + Priority: PriorityLow, + CreatedAt: time.Now().UTC(), + }, + } + + var mu sync.Mutex + claimedIDs := make(map[string]bool) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/api/tasks": + w.Header().Set("Content-Type", "application/json") + mu.Lock() + var remaining []Task + for _, tk := range pendingTasks { + if !claimedIDs[tk.ID] { + remaining = append(remaining, tk) + } + } + mu.Unlock() + _ = json.NewEncoder(w).Encode(remaining) + + case r.Method == http.MethodPost: + w.Header().Set("Content-Type", "application/json") + for _, tk := range pendingTasks { + if r.URL.Path == "/api/tasks/"+tk.ID+"/claim" { + mu.Lock() + claimedIDs[tk.ID] = true + mu.Unlock() + claimed := tk + claimed.Status = StatusInProgress + _ = json.NewEncoder(w).Encode(ClaimResponse{Task: &claimed}) + return + } + } + w.WriteHeader(http.StatusNotFound) + + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + client := NewClient(server.URL, "test-token") + d, reg, store := setupDispatcher(t, client) + registerAgent(t, reg, store, "agent-1", nil, 10) + + // Run the loop for a short period — not long enough for the 5s backoff to elapse. + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) + defer cancel() + + err := d.DispatchLoop(ctx, 50*time.Millisecond) + require.ErrorIs(t, err, context.DeadlineExceeded) + + mu.Lock() + defer mu.Unlock() + + // The fresh task should have been claimed. + assert.True(t, claimedIDs["fresh-task"]) + // The retrying task should NOT have been claimed because backoff has not elapsed. + assert.False(t, claimedIDs["retrying-task"]) +} + +// --- Phase 7: DispatchLoop dead-letter test --- + +func TestDispatcher_DispatchLoop_Good_DeadLetter(t *testing.T) { + // A task with RetryCount at MaxRetries-1 that fails dispatch should be dead-lettered. + pendingTasks := []Task{ + { + ID: "doomed-task", + Status: StatusPending, + Priority: PriorityHigh, + CreatedAt: time.Now().UTC().Add(-time.Hour), + MaxRetries: 1, // Will fail after 1 attempt. + RetryCount: 0, + }, + } + + var mu sync.Mutex + var deadLettered bool + var deadLetterNotes string + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/api/tasks": + w.Header().Set("Content-Type", "application/json") + mu.Lock() + done := deadLettered + mu.Unlock() + if done { + // Return empty list once dead-lettered. + _ = json.NewEncoder(w).Encode([]Task{}) + } else { + _ = json.NewEncoder(w).Encode(pendingTasks) + } + + case r.Method == http.MethodPost && r.URL.Path == "/api/tasks/doomed-task/claim": + // Claim always fails to trigger retry logic. + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(APIError{Code: 500, Message: "server error"}) + + case r.Method == http.MethodPatch && r.URL.Path == "/api/tasks/doomed-task": + // This is the UpdateTask call for dead-lettering. + var update TaskUpdate + _ = json.NewDecoder(r.Body).Decode(&update) + mu.Lock() + deadLettered = true + deadLetterNotes = update.Notes + mu.Unlock() + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) + + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + client := NewClient(server.URL, "test-token") + d, reg, store := setupDispatcher(t, client) + registerAgent(t, reg, store, "agent-1", nil, 10) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + err := d.DispatchLoop(ctx, 50*time.Millisecond) + require.ErrorIs(t, err, context.DeadlineExceeded) + + mu.Lock() + defer mu.Unlock() + + assert.True(t, deadLettered, "task should have been dead-lettered") + assert.Equal(t, "max retries exceeded", deadLetterNotes) +} diff --git a/types.go b/types.go index 53fc480..1cca663 100644 --- a/types.go +++ b/types.go @@ -19,6 +19,8 @@ const ( StatusCompleted TaskStatus = "completed" // StatusBlocked indicates the task cannot proceed due to dependencies. StatusBlocked TaskStatus = "blocked" + // StatusFailed indicates the task has exceeded its retry limit and been dead-lettered. + StatusFailed TaskStatus = "failed" ) // TaskPriority represents the urgency level of a task. @@ -65,6 +67,14 @@ type Task struct { Dependencies []string `json:"dependencies,omitempty"` // Blockers lists task IDs that this task is blocking. Blockers []string `json:"blockers,omitempty"` + // MaxRetries is the maximum dispatch attempts before dead-lettering. 0 uses DefaultMaxRetries. + MaxRetries int `json:"max_retries,omitempty"` + // RetryCount is the number of failed dispatch attempts so far. + RetryCount int `json:"retry_count,omitempty"` + // LastAttempt is when the last dispatch attempt occurred. + LastAttempt *time.Time `json:"last_attempt,omitempty"` + // FailReason explains why the task was moved to failed status. + FailReason string `json:"fail_reason,omitempty"` } // TaskUpdate contains fields that can be updated on a task.