feat(dispatch): Phase 7 — priority-ordered dispatch with retry backoff and dead-letter
Sort pending tasks by priority (Critical > High > Medium > Low) with oldest-first tie-breaking before dispatch. Add exponential backoff for failed dispatches (5s, 10s, 20s, ...) and dead-letter tasks that exceed MaxRetries (default 3) by marking them StatusFailed via UpdateTask. New Task fields: MaxRetries, RetryCount, LastAttempt, FailReason. New constant: StatusFailed.

Co-Authored-By: Virgil <virgil@lethean.io>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
parent ce502c0f16
commit ba8c19d32f
3 changed files with 420 additions and 4 deletions
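For context, the retry delay doubles from a 5s base on each failed attempt. The following standalone sketch mirrors the schedule of the backoffDuration helper added below; the backoff function and main wrapper here are illustrative reconstructions, not part of the commit:

package main

import (
	"fmt"
	"time"
)

// backoff mirrors the doubling schedule introduced in this commit:
// retry 1 waits 5s, retry 2 waits 10s, retry 3 waits 20s, and so on.
func backoff(retryCount int) time.Duration {
	if retryCount <= 0 {
		return 0
	}
	d := 5 * time.Second // baseBackoff in dispatcher.go
	for i := 1; i < retryCount; i++ {
		d *= 2
	}
	return d
}

func main() {
	for r := 0; r <= 4; r++ {
		fmt.Printf("retry %d waits %s\n", r, backoff(r)) // 0s, 5s, 10s, 20s, 40s
	}
}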
dispatcher.go (109 changed lines)
@@ -2,11 +2,19 @@ package agentic

import (
	"context"
	"sort"
	"time"

	"forge.lthn.ai/core/go/pkg/log"
)

const (
	// DefaultMaxRetries is the default number of dispatch attempts before dead-lettering.
	DefaultMaxRetries = 3
	// baseBackoff is the base duration for exponential backoff between retries.
	baseBackoff = 5 * time.Second
)

// Dispatcher orchestrates task dispatch by combining the agent registry,
// task router, allowance service, and API client.
type Dispatcher struct {
@@ -71,8 +79,74 @@ func (d *Dispatcher) Dispatch(ctx context.Context, task *Task) (string, error) {
	return agentID, nil
}

// priorityRank maps a TaskPriority to a numeric rank for sorting.
// Lower values are dispatched first.
func priorityRank(p TaskPriority) int {
	switch p {
	case PriorityCritical:
		return 0
	case PriorityHigh:
		return 1
	case PriorityMedium:
		return 2
	case PriorityLow:
		return 3
	default:
		return 4
	}
}

// sortTasksByPriority sorts tasks by priority (Critical first) then by
// CreatedAt (oldest first) as a tie-breaker. Uses SliceStable for determinism.
func sortTasksByPriority(tasks []Task) {
	sort.SliceStable(tasks, func(i, j int) bool {
		ri, rj := priorityRank(tasks[i].Priority), priorityRank(tasks[j].Priority)
		if ri != rj {
			return ri < rj
		}
		return tasks[i].CreatedAt.Before(tasks[j].CreatedAt)
	})
}

// backoffDuration returns the exponential backoff duration for the given retry
// count. First retry waits baseBackoff (5s), second waits 10s, third 20s, etc.
func backoffDuration(retryCount int) time.Duration {
	if retryCount <= 0 {
		return 0
	}
	d := baseBackoff
	for i := 1; i < retryCount; i++ {
		d *= 2
	}
	return d
}

// shouldSkipRetry returns true if a task has been retried and the backoff
// period has not yet elapsed since the last attempt.
func shouldSkipRetry(task *Task, now time.Time) bool {
	if task.RetryCount <= 0 {
		return false
	}
	if task.LastAttempt == nil {
		return false
	}
	return task.LastAttempt.Add(backoffDuration(task.RetryCount)).After(now)
}

// effectiveMaxRetries returns the max retries for a task, using DefaultMaxRetries
// when the task does not specify one.
func effectiveMaxRetries(task *Task) int {
	if task.MaxRetries > 0 {
		return task.MaxRetries
	}
	return DefaultMaxRetries
}

// DispatchLoop polls for pending tasks at the given interval and dispatches
// each one. Tasks are sorted by priority (Critical > High > Medium > Low) with
// oldest-first tie-breaking. Failed dispatches are retried with exponential
// backoff. Tasks exceeding their retry limit are dead-lettered with StatusFailed.
// It runs until the context is cancelled and returns ctx.Err().
func (d *Dispatcher) DispatchLoop(ctx context.Context, interval time.Duration) error {
	const op = "Dispatcher.DispatchLoop"
@@ -95,13 +169,40 @@ func (d *Dispatcher) DispatchLoop(ctx context.Context, interval time.Duration) error {
			continue
		}

		// Sort by priority then by creation time.
		sortTasksByPriority(tasks)

		now := time.Now().UTC()
		for i := range tasks {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			task := &tasks[i]

			// Skip retried tasks whose backoff period has not yet elapsed.
			if shouldSkipRetry(task, now) {
				continue
			}

			if _, err := d.Dispatch(ctx, task); err != nil {
				// Increment retry count and record the attempt time.
				task.RetryCount++
				attemptTime := now
				task.LastAttempt = &attemptTime

				maxRetries := effectiveMaxRetries(task)
				if task.RetryCount >= maxRetries {
					// Dead-letter: mark as failed via the API.
					if updateErr := d.client.UpdateTask(ctx, task.ID, TaskUpdate{
						Status: StatusFailed,
						Notes:  "max retries exceeded",
					}); updateErr != nil {
						_ = log.E(op, "failed to dead-letter task "+task.ID, updateErr)
					}
				} else {
					// Log dispatch errors but continue with the next task.
					_ = log.E(op, "failed to dispatch task "+task.ID, err)
				}
			}
		}
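The loop only exits via context cancellation, so callers treat ctx.Err() as the normal shutdown path. A minimal caller sketch; runDispatcher is a hypothetical wrapper (not part of this commit) and assumes the surrounding package imports "context", "errors", and "time":

// runDispatcher shows the intended shutdown contract: DispatchLoop blocks
// until ctx is cancelled and then returns ctx.Err(), so context.Canceled
// is the normal exit rather than a failure.
func runDispatcher(ctx context.Context, d *Dispatcher) error {
	err := d.DispatchLoop(ctx, 5*time.Second)
	if errors.Is(err, context.Canceled) {
		return nil // clean shutdown
	}
	return err // deadline exceeded or other context error
}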
dispatcher_test.go
@@ -271,3 +271,308 @@ func TestDispatcher_Dispatch_Good_Concurrent(t *testing.T) {
	usage, _ := store.GetUsage("agent-1")
	assert.Equal(t, 10, usage.JobsStarted)
}

// --- Phase 7: Priority sorting tests ---

func TestSortTasksByPriority_Good(t *testing.T) {
	base := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	tasks := []Task{
		{ID: "low-old", Priority: PriorityLow, CreatedAt: base},
		{ID: "critical-new", Priority: PriorityCritical, CreatedAt: base.Add(2 * time.Hour)},
		{ID: "medium-old", Priority: PriorityMedium, CreatedAt: base},
		{ID: "high-old", Priority: PriorityHigh, CreatedAt: base},
		{ID: "critical-old", Priority: PriorityCritical, CreatedAt: base},
	}

	sortTasksByPriority(tasks)

	// Critical tasks first, oldest critical before newer critical.
	assert.Equal(t, "critical-old", tasks[0].ID)
	assert.Equal(t, "critical-new", tasks[1].ID)
	// Then high.
	assert.Equal(t, "high-old", tasks[2].ID)
	// Then medium.
	assert.Equal(t, "medium-old", tasks[3].ID)
	// Then low.
	assert.Equal(t, "low-old", tasks[4].ID)
}

// --- Phase 7: Backoff duration tests ---

func TestBackoffDuration_Good(t *testing.T) {
	// retryCount=0 → 0 (no backoff).
	assert.Equal(t, time.Duration(0), backoffDuration(0))
	// retryCount=1 → 5s (base).
	assert.Equal(t, 5*time.Second, backoffDuration(1))
	// retryCount=2 → 10s.
	assert.Equal(t, 10*time.Second, backoffDuration(2))
	// retryCount=3 → 20s.
	assert.Equal(t, 20*time.Second, backoffDuration(3))
	// retryCount=4 → 40s.
	assert.Equal(t, 40*time.Second, backoffDuration(4))
}

// --- Phase 7: shouldSkipRetry tests ---

func TestShouldSkipRetry_Good(t *testing.T) {
	now := time.Now().UTC()
	recent := now.Add(-2 * time.Second) // 2s ago, backoff for retry 1 is 5s → skip.

	task := &Task{
		ID:          "task-1",
		RetryCount:  1,
		LastAttempt: &recent,
	}
	assert.True(t, shouldSkipRetry(task, now))

	// After backoff elapses, should NOT skip.
	old := now.Add(-10 * time.Second) // 10s ago, backoff for retry 1 is 5s → ready.
	task.LastAttempt = &old
	assert.False(t, shouldSkipRetry(task, now))
}

func TestShouldSkipRetry_Bad_NoRetry(t *testing.T) {
	now := time.Now().UTC()

	// RetryCount=0 → never skip.
	task := &Task{ID: "task-1", RetryCount: 0}
	assert.False(t, shouldSkipRetry(task, now))

	// RetryCount=0 even with a LastAttempt set → never skip.
	recent := now.Add(-1 * time.Second)
	task.LastAttempt = &recent
	assert.False(t, shouldSkipRetry(task, now))

	// RetryCount>0 but nil LastAttempt → never skip.
	task2 := &Task{ID: "task-2", RetryCount: 2, LastAttempt: nil}
	assert.False(t, shouldSkipRetry(task2, now))
}

// --- Phase 7: DispatchLoop priority order test ---

func TestDispatcher_DispatchLoop_Good_PriorityOrder(t *testing.T) {
	base := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	pendingTasks := []Task{
		{ID: "low-1", Status: StatusPending, Priority: PriorityLow, CreatedAt: base},
		{ID: "critical-1", Status: StatusPending, Priority: PriorityCritical, CreatedAt: base},
		{ID: "medium-1", Status: StatusPending, Priority: PriorityMedium, CreatedAt: base},
		{ID: "high-1", Status: StatusPending, Priority: PriorityHigh, CreatedAt: base},
		{ID: "critical-2", Status: StatusPending, Priority: PriorityCritical, CreatedAt: base.Add(time.Second)},
	}

	var mu sync.Mutex
	var claimOrder []string
	claimedIDs := make(map[string]bool)

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch {
		case r.Method == http.MethodGet && r.URL.Path == "/api/tasks":
			w.Header().Set("Content-Type", "application/json")
			mu.Lock()
			var remaining []Task
			for _, tk := range pendingTasks {
				if !claimedIDs[tk.ID] {
					remaining = append(remaining, tk)
				}
			}
			mu.Unlock()
			_ = json.NewEncoder(w).Encode(remaining)

		case r.Method == http.MethodPost:
			w.Header().Set("Content-Type", "application/json")
			for _, tk := range pendingTasks {
				if r.URL.Path == "/api/tasks/"+tk.ID+"/claim" {
					mu.Lock()
					claimedIDs[tk.ID] = true
					claimOrder = append(claimOrder, tk.ID)
					mu.Unlock()
					claimed := tk
					claimed.Status = StatusInProgress
					_ = json.NewEncoder(w).Encode(ClaimResponse{Task: &claimed})
					return
				}
			}
			w.WriteHeader(http.StatusNotFound)

		default:
			w.WriteHeader(http.StatusNotFound)
		}
	}))
	defer server.Close()

	client := NewClient(server.URL, "test-token")
	d, reg, store := setupDispatcher(t, client)
	registerAgent(t, reg, store, "agent-1", nil, 10)

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	err := d.DispatchLoop(ctx, 50*time.Millisecond)
	require.ErrorIs(t, err, context.DeadlineExceeded)

	mu.Lock()
	defer mu.Unlock()

	// All 5 tasks should have been claimed.
	require.Len(t, claimOrder, 5)
	// Critical tasks first (oldest before newest), then high, medium, low.
	assert.Equal(t, "critical-1", claimOrder[0])
	assert.Equal(t, "critical-2", claimOrder[1])
	assert.Equal(t, "high-1", claimOrder[2])
	assert.Equal(t, "medium-1", claimOrder[3])
	assert.Equal(t, "low-1", claimOrder[4])
}

// --- Phase 7: DispatchLoop retry backoff test ---

func TestDispatcher_DispatchLoop_Good_RetryBackoff(t *testing.T) {
	// A task with a recent LastAttempt and RetryCount=1 should be skipped
	// because the backoff period (5s) has not elapsed.
	recentAttempt := time.Now().UTC()
	pendingTasks := []Task{
		{
			ID:          "retrying-task",
			Status:      StatusPending,
			Priority:    PriorityHigh,
			CreatedAt:   time.Now().UTC().Add(-time.Hour),
			RetryCount:  1,
			LastAttempt: &recentAttempt,
		},
		{
			ID:        "fresh-task",
			Status:    StatusPending,
			Priority:  PriorityLow,
			CreatedAt: time.Now().UTC(),
		},
	}

	var mu sync.Mutex
	claimedIDs := make(map[string]bool)

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch {
		case r.Method == http.MethodGet && r.URL.Path == "/api/tasks":
			w.Header().Set("Content-Type", "application/json")
			mu.Lock()
			var remaining []Task
			for _, tk := range pendingTasks {
				if !claimedIDs[tk.ID] {
					remaining = append(remaining, tk)
				}
			}
			mu.Unlock()
			_ = json.NewEncoder(w).Encode(remaining)

		case r.Method == http.MethodPost:
			w.Header().Set("Content-Type", "application/json")
			for _, tk := range pendingTasks {
				if r.URL.Path == "/api/tasks/"+tk.ID+"/claim" {
					mu.Lock()
					claimedIDs[tk.ID] = true
					mu.Unlock()
					claimed := tk
					claimed.Status = StatusInProgress
					_ = json.NewEncoder(w).Encode(ClaimResponse{Task: &claimed})
					return
				}
			}
			w.WriteHeader(http.StatusNotFound)

		default:
			w.WriteHeader(http.StatusNotFound)
		}
	}))
	defer server.Close()

	client := NewClient(server.URL, "test-token")
	d, reg, store := setupDispatcher(t, client)
	registerAgent(t, reg, store, "agent-1", nil, 10)

	// Run the loop for a short period — not long enough for the 5s backoff to elapse.
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	err := d.DispatchLoop(ctx, 50*time.Millisecond)
	require.ErrorIs(t, err, context.DeadlineExceeded)

	mu.Lock()
	defer mu.Unlock()

	// The fresh task should have been claimed.
	assert.True(t, claimedIDs["fresh-task"])
	// The retrying task should NOT have been claimed because backoff has not elapsed.
	assert.False(t, claimedIDs["retrying-task"])
}

// --- Phase 7: DispatchLoop dead-letter test ---

func TestDispatcher_DispatchLoop_Good_DeadLetter(t *testing.T) {
	// A task with RetryCount at MaxRetries-1 that fails dispatch should be dead-lettered.
	pendingTasks := []Task{
		{
			ID:         "doomed-task",
			Status:     StatusPending,
			Priority:   PriorityHigh,
			CreatedAt:  time.Now().UTC().Add(-time.Hour),
			MaxRetries: 1, // Will fail after 1 attempt.
			RetryCount: 0,
		},
	}

	var mu sync.Mutex
	var deadLettered bool
	var deadLetterNotes string

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch {
		case r.Method == http.MethodGet && r.URL.Path == "/api/tasks":
			w.Header().Set("Content-Type", "application/json")
			mu.Lock()
			done := deadLettered
			mu.Unlock()
			if done {
				// Return empty list once dead-lettered.
				_ = json.NewEncoder(w).Encode([]Task{})
			} else {
				_ = json.NewEncoder(w).Encode(pendingTasks)
			}

		case r.Method == http.MethodPost && r.URL.Path == "/api/tasks/doomed-task/claim":
			// Claim always fails to trigger retry logic.
			w.WriteHeader(http.StatusInternalServerError)
			_ = json.NewEncoder(w).Encode(APIError{Code: 500, Message: "server error"})

		case r.Method == http.MethodPatch && r.URL.Path == "/api/tasks/doomed-task":
			// This is the UpdateTask call for dead-lettering.
			var update TaskUpdate
			_ = json.NewDecoder(r.Body).Decode(&update)
			mu.Lock()
			deadLettered = true
			deadLetterNotes = update.Notes
			mu.Unlock()
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusOK)
			_ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"})

		default:
			w.WriteHeader(http.StatusNotFound)
		}
	}))
	defer server.Close()

	client := NewClient(server.URL, "test-token")
	d, reg, store := setupDispatcher(t, client)
	registerAgent(t, reg, store, "agent-1", nil, 10)

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	err := d.DispatchLoop(ctx, 50*time.Millisecond)
	require.ErrorIs(t, err, context.DeadlineExceeded)

	mu.Lock()
	defer mu.Unlock()

	assert.True(t, deadLettered, "task should have been dead-lettered")
	assert.Equal(t, "max retries exceeded", deadLetterNotes)
}
types.go (10 changed lines)
@@ -19,6 +19,8 @@ const (
	StatusCompleted TaskStatus = "completed"
	// StatusBlocked indicates the task cannot proceed due to dependencies.
	StatusBlocked TaskStatus = "blocked"
	// StatusFailed indicates the task has exceeded its retry limit and been dead-lettered.
	StatusFailed TaskStatus = "failed"
)

// TaskPriority represents the urgency level of a task.

@@ -65,6 +67,14 @@ type Task struct {
	Dependencies []string `json:"dependencies,omitempty"`
	// Blockers lists task IDs that this task is blocking.
	Blockers []string `json:"blockers,omitempty"`
	// MaxRetries is the maximum dispatch attempts before dead-lettering. 0 uses DefaultMaxRetries.
	MaxRetries int `json:"max_retries,omitempty"`
	// RetryCount is the number of failed dispatch attempts so far.
	RetryCount int `json:"retry_count,omitempty"`
	// LastAttempt is when the last dispatch attempt occurred.
	LastAttempt *time.Time `json:"last_attempt,omitempty"`
	// FailReason explains why the task was moved to failed status.
	FailReason string `json:"fail_reason,omitempty"`
}

// TaskUpdate contains fields that can be updated on a task.
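Because every new field carries omitempty, tasks that have never been retried serialize exactly as before. A standalone sketch of the wire effect; wireTask is a trimmed, hypothetical stand-in reproducing only the tags added in this commit, not the real Task type:

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// wireTask reproduces only the retry-related JSON tags from Task.
type wireTask struct {
	ID          string     `json:"id"`
	MaxRetries  int        `json:"max_retries,omitempty"`
	RetryCount  int        `json:"retry_count,omitempty"`
	LastAttempt *time.Time `json:"last_attempt,omitempty"`
	FailReason  string     `json:"fail_reason,omitempty"`
}

func main() {
	// Fresh task: all retry fields are zero values, so omitempty drops them.
	fresh, _ := json.Marshal(wireTask{ID: "t-1"})
	fmt.Println(string(fresh)) // {"id":"t-1"}

	// Dead-lettered task: the retry history appears on the wire.
	at := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	dead, _ := json.Marshal(wireTask{
		ID: "t-2", MaxRetries: 3, RetryCount: 3,
		LastAttempt: &at, FailReason: "max retries exceeded",
	})
	fmt.Println(string(dead))
}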