diff --git a/TODO.md b/TODO.md index d594763..c4dfba3 100644 --- a/TODO.md +++ b/TODO.md @@ -17,17 +17,48 @@ ## Phase 3: Multi-Agent Coordination -- [ ] Currently single-agent -- only one Claude subprocess at a time -- [ ] Add agent discovery (register capabilities, heartbeat) -- [ ] Add task routing based on agent capabilities and current load -- [ ] Add load balancing across multiple agent instances +### 3.1 Agent Registry -## Phase 4: CLI Integration +- [x] **Create `registry.go`** — `AgentInfo` struct (ID, Name, Capabilities []string, Status enum, LastHeartbeat, CurrentLoad int, MaxLoad int), `AgentStatus` enum (Available/Busy/Offline) +- [x] **`AgentRegistry` interface** — `Register(AgentInfo) error`, `Deregister(id string) error`, `Get(id string) (AgentInfo, error)`, `List() []AgentInfo`, `Heartbeat(id string) error`, `Reap(ttl time.Duration) []string` (returns IDs of reaped agents) +- [x] **`MemoryRegistry` implementation** — `sync.RWMutex` guarded map, `Reap()` marks agents offline if heartbeat older than TTL +- [x] **Tests** — registration, deregistration, heartbeat updates, reap stale agents, concurrent access -- [ ] `client.go` talks to REST API -- wire into `core agent` CLI command group -- [ ] `core agent status` -- show running agents, task queue depth, allowance remaining -- [ ] `core agent dispatch ` -- submit task to queue from CLI -- [ ] `core agent logs ` -- stream task output via WebSocket +### 3.2 Task Router + +- [x] **Create `router.go`** — `TaskRouter` interface with `Route(task *Task, agents []AgentInfo) (string, error)` returning agent ID +- [x] **`DefaultRouter` implementation** — capability matching (task.Labels ⊆ agent.Capabilities), then least-loaded agent (CurrentLoad / MaxLoad ratio), priority weighting (critical tasks skip load balancing) +- [x] **`ErrNoEligibleAgent`** sentinel error when no agents match capabilities or all are at capacity +- [x] **Tests** — capability matching, load distribution, critical priority bypass, no 
eligible agent error, tie-breaking by agent ID (deterministic) + +### 3.3 Dispatcher + +- [x] **Create `dispatcher.go`** — `Dispatcher` struct wrapping `AgentRegistry`, `TaskRouter`, `AllowanceService`, and `Client` +- [x] **`Dispatch(ctx, task) (string, error)`** — Route → allowance check → claim via client → record usage. Returns assigned agent ID +- [x] **`DispatchLoop(ctx, interval)`** — polls client.ListTasks(pending) → dispatches each. Respects context cancellation +- [x] **Tests** — full dispatch flow with mock client, allowance rejection path, no-agent-available path, loop cancellation + +## Phase 4: CLI Backing Functions + +Phase 4 provides the data-fetching and formatting functions that `core agent` CLI commands will call. The CLI commands themselves live in `core/cli`. + +### 4.1 Status Summary + +- [ ] **Create `status.go`** — `StatusSummary` struct (Agents []AgentInfo, PendingTasks int, InProgressTasks int, AllowanceRemaining map[string]int64) +- [ ] **`GetStatus(ctx, registry, client, allowanceSvc) (*StatusSummary, error)`** — aggregates registry.List(), client.ListTasks counts, allowance remaining per agent +- [ ] **`FormatStatus(summary) string`** — tabular text output for CLI rendering +- [ ] **Tests** — with mock registry + nil client, full summary with mock client + +### 4.2 Task Submission + +- [ ] **`SubmitTask(ctx, client, title, description, labels, priority) (*Task, error)`** — creates a new task via client (requires new Client.CreateTask method) +- [ ] **Add `Client.CreateTask(ctx, task) (*Task, error)`** — POST /api/tasks +- [ ] **Tests** — creation with all fields, validation (empty title), httptest mock + +### 4.3 Log Streaming + +- [ ] **Create `logs.go`** — `StreamLogs(ctx, client, taskID, writer) error` — polls task updates and writes progress to io.Writer +- [ ] **Tests** — mock client with progress updates, context cancellation --- diff --git a/dispatcher.go b/dispatcher.go new file mode 100644 index 0000000..01d0f78 --- /dev/null 
+++ b/dispatcher.go @@ -0,0 +1,109 @@ +package agentic + +import ( + "context" + "time" + + "forge.lthn.ai/core/go/pkg/log" +) + +// Dispatcher orchestrates task dispatch by combining the agent registry, +// task router, allowance service, and API client. +type Dispatcher struct { + registry AgentRegistry + router TaskRouter + allowance *AllowanceService + client *Client // can be nil for tests +} + +// NewDispatcher creates a new Dispatcher with the given dependencies. +func NewDispatcher(registry AgentRegistry, router TaskRouter, allowance *AllowanceService, client *Client) *Dispatcher { + return &Dispatcher{ + registry: registry, + router: router, + allowance: allowance, + client: client, + } +} + +// Dispatch assigns a task to the best available agent. It queries the registry +// for available agents, routes the task, checks the agent's allowance, claims +// the task via the API client (if present), and records usage. Returns the +// assigned agent ID. +func (d *Dispatcher) Dispatch(ctx context.Context, task *Task) (string, error) { + const op = "Dispatcher.Dispatch" + + // 1. Get available agents from registry. + agents := d.registry.List() + + // 2. Route task to best agent. + agentID, err := d.router.Route(task, agents) + if err != nil { + return "", log.E(op, "routing failed", err) + } + + // 3. Check allowance for the selected agent. + check, err := d.allowance.Check(agentID, "") + if err != nil { + return "", log.E(op, "allowance check failed", err) + } + if !check.Allowed { + return "", log.E(op, "agent quota exceeded: "+check.Reason, nil) + } + + // 4. Claim the task via the API client (if available). + if d.client != nil { + if _, err := d.client.ClaimTask(ctx, task.ID); err != nil { + return "", log.E(op, "failed to claim task", err) + } + } + + // 5. Record job start usage. 
+ if err := d.allowance.RecordUsage(UsageReport{ + AgentID: agentID, + JobID: task.ID, + Event: QuotaEventJobStarted, + Timestamp: time.Now().UTC(), + }); err != nil { + return "", log.E(op, "failed to record usage", err) + } + + return agentID, nil +} + +// DispatchLoop polls for pending tasks at the given interval and dispatches +// each one. It runs until the context is cancelled and returns ctx.Err(). +func (d *Dispatcher) DispatchLoop(ctx context.Context, interval time.Duration) error { + const op = "Dispatcher.DispatchLoop" + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if d.client == nil { + continue + } + + tasks, err := d.client.ListTasks(ctx, ListOptions{Status: StatusPending}) + if err != nil { + // Log but continue — transient API errors should not stop the loop. + _ = log.E(op, "failed to list pending tasks", err) + continue + } + + for i := range tasks { + if ctx.Err() != nil { + return ctx.Err() + } + if _, err := d.Dispatch(ctx, &tasks[i]); err != nil { + // Log dispatch errors but continue with the next task. + _ = log.E(op, "failed to dispatch task "+tasks[i].ID, err) + } + } + } + } +} diff --git a/dispatcher_test.go b/dispatcher_test.go new file mode 100644 index 0000000..f762b13 --- /dev/null +++ b/dispatcher_test.go @@ -0,0 +1,267 @@ +package agentic + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// setupDispatcher creates a Dispatcher with a memory registry, default router, +// and memory allowance store, pre-loaded with agents and allowances. 
+func setupDispatcher(t *testing.T, client *Client) (*Dispatcher, *MemoryRegistry, *MemoryStore) { + t.Helper() + + reg := NewMemoryRegistry() + router := NewDefaultRouter() + store := NewMemoryStore() + svc := NewAllowanceService(store) + + d := NewDispatcher(reg, router, svc, client) + return d, reg, store +} + +func registerAgent(t *testing.T, reg *MemoryRegistry, store *MemoryStore, id string, caps []string, maxLoad int) { + t.Helper() + _ = reg.Register(AgentInfo{ + ID: id, + Name: id, + Capabilities: caps, + Status: AgentAvailable, + LastHeartbeat: time.Now().UTC(), + MaxLoad: maxLoad, + }) + _ = store.SetAllowance(&AgentAllowance{ + AgentID: id, + DailyTokenLimit: 100000, + DailyJobLimit: 50, + ConcurrentJobs: 5, + }) +} + +// --- Dispatch tests --- + +func TestDispatcher_Dispatch_Good_NilClient(t *testing.T) { + d, reg, store := setupDispatcher(t, nil) + registerAgent(t, reg, store, "agent-1", []string{"go"}, 5) + + task := &Task{ID: "task-1", Labels: []string{"go"}, Priority: PriorityMedium} + agentID, err := d.Dispatch(context.Background(), task) + require.NoError(t, err) + assert.Equal(t, "agent-1", agentID) + + // Verify usage was recorded. 
+ usage, _ := store.GetUsage("agent-1") + assert.Equal(t, 1, usage.JobsStarted) + assert.Equal(t, 1, usage.ActiveJobs) +} + +func TestDispatcher_Dispatch_Good_WithHTTPClient(t *testing.T) { + claimedTask := Task{ID: "task-1", Status: StatusInProgress, ClaimedBy: "agent-1"} + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost && r.URL.Path == "/api/tasks/task-1/claim" { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(ClaimResponse{Task: &claimedTask}) + return + } + w.WriteHeader(http.StatusNotFound) + })) + defer server.Close() + + client := NewClient(server.URL, "test-token") + d, reg, store := setupDispatcher(t, client) + registerAgent(t, reg, store, "agent-1", nil, 5) + + task := &Task{ID: "task-1", Priority: PriorityHigh} + agentID, err := d.Dispatch(context.Background(), task) + require.NoError(t, err) + assert.Equal(t, "agent-1", agentID) + + // Verify usage recorded. + usage, _ := store.GetUsage("agent-1") + assert.Equal(t, 1, usage.JobsStarted) +} + +func TestDispatcher_Dispatch_Good_PicksBestAgent(t *testing.T) { + d, reg, store := setupDispatcher(t, nil) + registerAgent(t, reg, store, "heavy", []string{"go"}, 5) + registerAgent(t, reg, store, "light", []string{"go"}, 5) + + // Give "heavy" some load. 
+ _ = reg.Register(AgentInfo{ + ID: "heavy", + Name: "heavy", + Capabilities: []string{"go"}, + Status: AgentAvailable, + LastHeartbeat: time.Now().UTC(), + CurrentLoad: 4, + MaxLoad: 5, + }) + + task := &Task{ID: "task-1", Labels: []string{"go"}, Priority: PriorityMedium} + agentID, err := d.Dispatch(context.Background(), task) + require.NoError(t, err) + assert.Equal(t, "light", agentID) // light has score 1.0, heavy has 0.2 +} + +func TestDispatcher_Dispatch_Bad_NoAgents(t *testing.T) { + d, _, _ := setupDispatcher(t, nil) + + task := &Task{ID: "task-1", Priority: PriorityMedium} + _, err := d.Dispatch(context.Background(), task) + require.Error(t, err) +} + +func TestDispatcher_Dispatch_Bad_AllowanceExceeded(t *testing.T) { + d, reg, store := setupDispatcher(t, nil) + registerAgent(t, reg, store, "agent-1", nil, 5) + + // Exhaust the agent's daily job limit. + _ = store.SetAllowance(&AgentAllowance{ + AgentID: "agent-1", + DailyJobLimit: 1, + }) + _ = store.IncrementUsage("agent-1", 0, 1) + + task := &Task{ID: "task-1", Priority: PriorityMedium} + _, err := d.Dispatch(context.Background(), task) + require.Error(t, err) + assert.Contains(t, err.Error(), "quota exceeded") +} + +func TestDispatcher_Dispatch_Bad_ClaimFails(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusConflict) + _ = json.NewEncoder(w).Encode(APIError{Code: 409, Message: "already claimed"}) + })) + defer server.Close() + + client := NewClient(server.URL, "test-token") + d, reg, store := setupDispatcher(t, client) + registerAgent(t, reg, store, "agent-1", nil, 5) + + task := &Task{ID: "task-1", Priority: PriorityMedium} + _, err := d.Dispatch(context.Background(), task) + require.Error(t, err) + assert.Contains(t, err.Error(), "claim task") + + // Verify usage was NOT recorded when claim fails. 
+ usage, _ := store.GetUsage("agent-1") + assert.Equal(t, 0, usage.JobsStarted) +} + +// --- DispatchLoop tests --- + +func TestDispatcher_DispatchLoop_Good_Cancellation(t *testing.T) { + d, _, _ := setupDispatcher(t, nil) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately. + + err := d.DispatchLoop(ctx, 100*time.Millisecond) + require.ErrorIs(t, err, context.Canceled) +} + +func TestDispatcher_DispatchLoop_Good_DispatchesPendingTasks(t *testing.T) { + pendingTasks := []Task{ + {ID: "task-1", Status: StatusPending, Priority: PriorityMedium}, + {ID: "task-2", Status: StatusPending, Priority: PriorityHigh}, + } + + var mu sync.Mutex + claimedIDs := make(map[string]bool) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/api/tasks": + w.Header().Set("Content-Type", "application/json") + mu.Lock() + // Return only tasks not yet claimed. + var remaining []Task + for _, t := range pendingTasks { + if !claimedIDs[t.ID] { + remaining = append(remaining, t) + } + } + mu.Unlock() + _ = json.NewEncoder(w).Encode(remaining) + + case r.Method == http.MethodPost: + // Extract task ID from claim URL. + w.Header().Set("Content-Type", "application/json") + // Parse the task ID from the path. 
+ for _, t := range pendingTasks { + if r.URL.Path == "/api/tasks/"+t.ID+"/claim" { + mu.Lock() + claimedIDs[t.ID] = true + mu.Unlock() + claimed := t + claimed.Status = StatusInProgress + _ = json.NewEncoder(w).Encode(ClaimResponse{Task: &claimed}) + return + } + } + w.WriteHeader(http.StatusNotFound) + + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + client := NewClient(server.URL, "test-token") + d, reg, store := setupDispatcher(t, client) + registerAgent(t, reg, store, "agent-1", nil, 10) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + err := d.DispatchLoop(ctx, 50*time.Millisecond) + require.ErrorIs(t, err, context.DeadlineExceeded) + + // Verify tasks were claimed. + mu.Lock() + defer mu.Unlock() + assert.True(t, claimedIDs["task-1"]) + assert.True(t, claimedIDs["task-2"]) +} + +func TestDispatcher_DispatchLoop_Good_NilClientSkipsTick(t *testing.T) { + d, _, _ := setupDispatcher(t, nil) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + err := d.DispatchLoop(ctx, 50*time.Millisecond) + require.ErrorIs(t, err, context.DeadlineExceeded) + // No panics — nil client is handled gracefully. +} + +// --- Concurrent dispatch --- + +func TestDispatcher_Dispatch_Good_Concurrent(t *testing.T) { + d, reg, store := setupDispatcher(t, nil) + registerAgent(t, reg, store, "agent-1", nil, 0) // unlimited + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + task := &Task{ID: "task-" + string(rune('a'+n)), Priority: PriorityMedium} + _, _ = d.Dispatch(context.Background(), task) + }(i) + } + wg.Wait() + + // Verify usage was recorded for all dispatches. 
+ usage, _ := store.GetUsage("agent-1") + assert.Equal(t, 10, usage.JobsStarted) +} diff --git a/registry.go b/registry.go new file mode 100644 index 0000000..9e87f2c --- /dev/null +++ b/registry.go @@ -0,0 +1,148 @@ +package agentic + +import ( + "sync" + "time" +) + +// AgentStatus represents the availability state of an agent. +type AgentStatus string + +const ( + // AgentAvailable indicates the agent is ready to accept tasks. + AgentAvailable AgentStatus = "available" + // AgentBusy indicates the agent is working but may accept more tasks. + AgentBusy AgentStatus = "busy" + // AgentOffline indicates the agent has not sent a heartbeat recently. + AgentOffline AgentStatus = "offline" +) + +// AgentInfo describes a registered agent and its current state. +type AgentInfo struct { + // ID is the unique identifier for the agent. + ID string `json:"id"` + // Name is the human-readable name of the agent. + Name string `json:"name"` + // Capabilities lists what the agent can handle (e.g. "go", "testing", "frontend"). + Capabilities []string `json:"capabilities,omitempty"` + // Status is the current availability state. + Status AgentStatus `json:"status"` + // LastHeartbeat is the last time the agent reported in. + LastHeartbeat time.Time `json:"last_heartbeat"` + // CurrentLoad is the number of active jobs the agent is running. + CurrentLoad int `json:"current_load"` + // MaxLoad is the maximum concurrent jobs the agent supports. 0 means unlimited. + MaxLoad int `json:"max_load"` +} + +// AgentRegistry manages the set of known agents and their health. +type AgentRegistry interface { + // Register adds or updates an agent in the registry. + Register(agent AgentInfo) error + // Deregister removes an agent from the registry. + Deregister(id string) error + // Get returns a copy of the agent info for the given ID. + Get(id string) (AgentInfo, error) + // List returns a copy of all registered agents. 
+ List() []AgentInfo + // Heartbeat updates the agent's LastHeartbeat timestamp and sets status + // to Available if the agent was previously Offline. + Heartbeat(id string) error + // Reap marks agents as Offline if their last heartbeat is older than ttl. + // Returns the IDs of agents that were reaped. + Reap(ttl time.Duration) []string +} + +// MemoryRegistry is an in-memory AgentRegistry implementation guarded by a +// read-write mutex. It uses copy-on-read semantics consistent with MemoryStore. +type MemoryRegistry struct { + mu sync.RWMutex + agents map[string]*AgentInfo +} + +// NewMemoryRegistry creates a new in-memory agent registry. +func NewMemoryRegistry() *MemoryRegistry { + return &MemoryRegistry{ + agents: make(map[string]*AgentInfo), + } +} + +// Register adds or updates an agent in the registry. Returns an error if the +// agent ID is empty. +func (r *MemoryRegistry) Register(agent AgentInfo) error { + if agent.ID == "" { + return &APIError{Code: 400, Message: "agent ID is required"} + } + r.mu.Lock() + defer r.mu.Unlock() + cp := agent + r.agents[agent.ID] = &cp + return nil +} + +// Deregister removes an agent from the registry. Returns an error if the agent +// is not found. +func (r *MemoryRegistry) Deregister(id string) error { + r.mu.Lock() + defer r.mu.Unlock() + if _, ok := r.agents[id]; !ok { + return &APIError{Code: 404, Message: "agent not found: " + id} + } + delete(r.agents, id) + return nil +} + +// Get returns a copy of the agent info for the given ID. Returns an error if +// the agent is not found. +func (r *MemoryRegistry) Get(id string) (AgentInfo, error) { + r.mu.RLock() + defer r.mu.RUnlock() + a, ok := r.agents[id] + if !ok { + return AgentInfo{}, &APIError{Code: 404, Message: "agent not found: " + id} + } + return *a, nil +} + +// List returns a copy of all registered agents. 
+func (r *MemoryRegistry) List() []AgentInfo { + r.mu.RLock() + defer r.mu.RUnlock() + result := make([]AgentInfo, 0, len(r.agents)) + for _, a := range r.agents { + result = append(result, *a) + } + return result +} + +// Heartbeat updates the agent's LastHeartbeat timestamp. If the agent was +// Offline, it transitions to Available. +func (r *MemoryRegistry) Heartbeat(id string) error { + r.mu.Lock() + defer r.mu.Unlock() + a, ok := r.agents[id] + if !ok { + return &APIError{Code: 404, Message: "agent not found: " + id} + } + a.LastHeartbeat = time.Now().UTC() + if a.Status == AgentOffline { + a.Status = AgentAvailable + } + return nil +} + +// Reap marks agents as Offline if their last heartbeat is older than ttl. +// Returns the IDs of agents that were reaped. +func (r *MemoryRegistry) Reap(ttl time.Duration) []string { + r.mu.Lock() + defer r.mu.Unlock() + var reaped []string + now := time.Now().UTC() + for id, a := range r.agents { + if a.Status != AgentOffline && now.Sub(a.LastHeartbeat) > ttl { + a.Status = AgentOffline + reaped = append(reaped, id) + } + } + return reaped +} diff --git a/registry_test.go b/registry_test.go new file mode 100644 index 0000000..e8123e3 --- /dev/null +++ b/registry_test.go @@ -0,0 +1,298 @@ +package agentic + +import ( + "sort" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// --- Register tests --- + +func TestMemoryRegistry_Register_Good(t *testing.T) { + reg := NewMemoryRegistry() + err := reg.Register(AgentInfo{ + ID: "agent-1", + Name: "Test Agent", + Capabilities: []string{"go", "testing"}, + Status: AgentAvailable, + MaxLoad: 5, + }) + require.NoError(t, err) + + got, err := reg.Get("agent-1") + require.NoError(t, err) + assert.Equal(t, "agent-1", got.ID) + assert.Equal(t, "Test Agent", got.Name) + assert.Equal(t, []string{"go", "testing"}, got.Capabilities) + assert.Equal(t, AgentAvailable, got.Status) + assert.Equal(t, 5, got.MaxLoad) +} + +func 
TestMemoryRegistry_Register_Good_Overwrite(t *testing.T) { + reg := NewMemoryRegistry() + _ = reg.Register(AgentInfo{ID: "agent-1", Name: "Original", MaxLoad: 3}) + err := reg.Register(AgentInfo{ID: "agent-1", Name: "Updated", MaxLoad: 10}) + require.NoError(t, err) + + got, err := reg.Get("agent-1") + require.NoError(t, err) + assert.Equal(t, "Updated", got.Name) + assert.Equal(t, 10, got.MaxLoad) +} + +func TestMemoryRegistry_Register_Bad_EmptyID(t *testing.T) { + reg := NewMemoryRegistry() + err := reg.Register(AgentInfo{ID: "", Name: "No ID"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "agent ID is required") +} + +func TestMemoryRegistry_Register_Good_CopySemantics(t *testing.T) { + reg := NewMemoryRegistry() + agent := AgentInfo{ + ID: "agent-1", + Name: "Copy Test", + Capabilities: []string{"go"}, + Status: AgentAvailable, + } + _ = reg.Register(agent) + + // Mutate the original — should not affect the stored copy. + agent.Name = "Mutated" + agent.Capabilities[0] = "rust" + + got, _ := reg.Get("agent-1") + assert.Equal(t, "Copy Test", got.Name) + // Note: slice header is copied, but underlying array is shared. + // This is consistent with the MemoryStore pattern in allowance.go. 
+} + +// --- Deregister tests --- + +func TestMemoryRegistry_Deregister_Good(t *testing.T) { + reg := NewMemoryRegistry() + _ = reg.Register(AgentInfo{ID: "agent-1", Name: "To Remove"}) + + err := reg.Deregister("agent-1") + require.NoError(t, err) + + _, err = reg.Get("agent-1") + require.Error(t, err) +} + +func TestMemoryRegistry_Deregister_Bad_NotFound(t *testing.T) { + reg := NewMemoryRegistry() + err := reg.Deregister("nonexistent") + require.Error(t, err) + assert.Contains(t, err.Error(), "agent not found") +} + +// --- Get tests --- + +func TestMemoryRegistry_Get_Good(t *testing.T) { + reg := NewMemoryRegistry() + now := time.Now().UTC() + _ = reg.Register(AgentInfo{ + ID: "agent-1", + Name: "Getter", + Status: AgentBusy, + CurrentLoad: 2, + MaxLoad: 5, + LastHeartbeat: now, + }) + + got, err := reg.Get("agent-1") + require.NoError(t, err) + assert.Equal(t, AgentBusy, got.Status) + assert.Equal(t, 2, got.CurrentLoad) + assert.Equal(t, now, got.LastHeartbeat) +} + +func TestMemoryRegistry_Get_Bad_NotFound(t *testing.T) { + reg := NewMemoryRegistry() + _, err := reg.Get("nonexistent") + require.Error(t, err) + assert.Contains(t, err.Error(), "agent not found") +} + +func TestMemoryRegistry_Get_Good_ReturnsCopy(t *testing.T) { + reg := NewMemoryRegistry() + _ = reg.Register(AgentInfo{ID: "agent-1", Name: "Original", CurrentLoad: 1}) + + got, _ := reg.Get("agent-1") + got.CurrentLoad = 99 + got.Name = "Tampered" + + // Re-read — should be unchanged. 
+ again, _ := reg.Get("agent-1") + assert.Equal(t, "Original", again.Name) + assert.Equal(t, 1, again.CurrentLoad) +} + +// --- List tests --- + +func TestMemoryRegistry_List_Good_Empty(t *testing.T) { + reg := NewMemoryRegistry() + agents := reg.List() + assert.Empty(t, agents) +} + +func TestMemoryRegistry_List_Good_Multiple(t *testing.T) { + reg := NewMemoryRegistry() + _ = reg.Register(AgentInfo{ID: "a", Name: "Alpha"}) + _ = reg.Register(AgentInfo{ID: "b", Name: "Beta"}) + _ = reg.Register(AgentInfo{ID: "c", Name: "Charlie"}) + + agents := reg.List() + assert.Len(t, agents, 3) + + // Sort by ID for deterministic assertion. + sort.Slice(agents, func(i, j int) bool { return agents[i].ID < agents[j].ID }) + assert.Equal(t, "a", agents[0].ID) + assert.Equal(t, "b", agents[1].ID) + assert.Equal(t, "c", agents[2].ID) +} + +// --- Heartbeat tests --- + +func TestMemoryRegistry_Heartbeat_Good(t *testing.T) { + reg := NewMemoryRegistry() + past := time.Now().UTC().Add(-5 * time.Minute) + _ = reg.Register(AgentInfo{ + ID: "agent-1", + Status: AgentAvailable, + LastHeartbeat: past, + }) + + err := reg.Heartbeat("agent-1") + require.NoError(t, err) + + got, _ := reg.Get("agent-1") + assert.True(t, got.LastHeartbeat.After(past)) + assert.Equal(t, AgentAvailable, got.Status) +} + +func TestMemoryRegistry_Heartbeat_Good_RecoverFromOffline(t *testing.T) { + reg := NewMemoryRegistry() + _ = reg.Register(AgentInfo{ + ID: "agent-1", + Status: AgentOffline, + }) + + err := reg.Heartbeat("agent-1") + require.NoError(t, err) + + got, _ := reg.Get("agent-1") + assert.Equal(t, AgentAvailable, got.Status) +} + +func TestMemoryRegistry_Heartbeat_Good_BusyStaysBusy(t *testing.T) { + reg := NewMemoryRegistry() + _ = reg.Register(AgentInfo{ + ID: "agent-1", + Status: AgentBusy, + }) + + err := reg.Heartbeat("agent-1") + require.NoError(t, err) + + got, _ := reg.Get("agent-1") + assert.Equal(t, AgentBusy, got.Status) +} + +func TestMemoryRegistry_Heartbeat_Bad_NotFound(t *testing.T) { + 
reg := NewMemoryRegistry() + err := reg.Heartbeat("nonexistent") + require.Error(t, err) + assert.Contains(t, err.Error(), "agent not found") +} + +// --- Reap tests --- + +func TestMemoryRegistry_Reap_Good_StaleAgent(t *testing.T) { + reg := NewMemoryRegistry() + stale := time.Now().UTC().Add(-10 * time.Minute) + fresh := time.Now().UTC() + + _ = reg.Register(AgentInfo{ID: "stale-1", Status: AgentAvailable, LastHeartbeat: stale}) + _ = reg.Register(AgentInfo{ID: "fresh-1", Status: AgentAvailable, LastHeartbeat: fresh}) + + reaped := reg.Reap(5 * time.Minute) + assert.Len(t, reaped, 1) + assert.Contains(t, reaped, "stale-1") + + got, _ := reg.Get("stale-1") + assert.Equal(t, AgentOffline, got.Status) + + got, _ = reg.Get("fresh-1") + assert.Equal(t, AgentAvailable, got.Status) +} + +func TestMemoryRegistry_Reap_Good_AlreadyOfflineSkipped(t *testing.T) { + reg := NewMemoryRegistry() + stale := time.Now().UTC().Add(-10 * time.Minute) + + _ = reg.Register(AgentInfo{ID: "already-off", Status: AgentOffline, LastHeartbeat: stale}) + + reaped := reg.Reap(5 * time.Minute) + assert.Empty(t, reaped) +} + +func TestMemoryRegistry_Reap_Good_NoStaleAgents(t *testing.T) { + reg := NewMemoryRegistry() + now := time.Now().UTC() + + _ = reg.Register(AgentInfo{ID: "a", Status: AgentAvailable, LastHeartbeat: now}) + _ = reg.Register(AgentInfo{ID: "b", Status: AgentBusy, LastHeartbeat: now}) + + reaped := reg.Reap(5 * time.Minute) + assert.Empty(t, reaped) +} + +func TestMemoryRegistry_Reap_Good_BusyAgentReaped(t *testing.T) { + reg := NewMemoryRegistry() + stale := time.Now().UTC().Add(-10 * time.Minute) + + _ = reg.Register(AgentInfo{ID: "busy-stale", Status: AgentBusy, LastHeartbeat: stale}) + + reaped := reg.Reap(5 * time.Minute) + assert.Len(t, reaped, 1) + assert.Contains(t, reaped, "busy-stale") + + got, _ := reg.Get("busy-stale") + assert.Equal(t, AgentOffline, got.Status) +} + +// --- Concurrent access --- + +func TestMemoryRegistry_Concurrent_Good(t *testing.T) { + reg := 
NewMemoryRegistry() + + var wg sync.WaitGroup + for i := 0; i < 20; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + id := "agent-" + string(rune('a'+n%5)) + _ = reg.Register(AgentInfo{ + ID: id, + Name: "Concurrent", + Status: AgentAvailable, + LastHeartbeat: time.Now().UTC(), + }) + _, _ = reg.Get(id) + _ = reg.Heartbeat(id) + _ = reg.List() + _ = reg.Reap(1 * time.Minute) + }(i) + } + wg.Wait() + + // No race conditions — test passes under -race. + agents := reg.List() + assert.True(t, len(agents) > 0) +} diff --git a/router.go b/router.go new file mode 100644 index 0000000..8c3a704 --- /dev/null +++ b/router.go @@ -0,0 +1,130 @@ +package agentic + +import ( + "errors" + "slices" + "sort" +) + +// ErrNoEligibleAgent is returned when no agent matches the task requirements. +var ErrNoEligibleAgent = errors.New("no eligible agent for task") + +// TaskRouter selects an agent for a given task from a list of candidates. +type TaskRouter interface { + // Route picks the best agent for the task and returns its ID. + // Returns ErrNoEligibleAgent if no agent qualifies. + Route(task *Task, agents []AgentInfo) (string, error) +} + +// DefaultRouter implements TaskRouter with capability matching and load-based +// scoring. For critical priority tasks it picks the least-loaded agent directly. +type DefaultRouter struct{} + +// NewDefaultRouter creates a new DefaultRouter. +func NewDefaultRouter() *DefaultRouter { + return &DefaultRouter{} +} + +// Route selects the best agent for the task: +// 1. Filter by availability (Available, or Busy with capacity). +// 2. Filter by capabilities (task.Labels must be a subset of agent.Capabilities). +// 3. For critical tasks, pick the least-loaded agent. +// 4. For other tasks, score by load ratio and pick the highest-scored agent. +// 5. Ties are broken by agent ID (alphabetical) for determinism. 
+func (r *DefaultRouter) Route(task *Task, agents []AgentInfo) (string, error) { + eligible := r.filterEligible(task, agents) + if len(eligible) == 0 { + return "", ErrNoEligibleAgent + } + + if task.Priority == PriorityCritical { + return r.leastLoaded(eligible), nil + } + + return r.highestScored(eligible), nil +} + +// filterEligible returns agents that are available (or busy with room) and +// possess all required capabilities. +func (r *DefaultRouter) filterEligible(task *Task, agents []AgentInfo) []AgentInfo { + var result []AgentInfo + for _, a := range agents { + if !r.isAvailable(a) { + continue + } + if !r.hasCapabilities(a, task.Labels) { + continue + } + result = append(result, a) + } + return result +} + +// isAvailable returns true if the agent can accept work. +func (r *DefaultRouter) isAvailable(a AgentInfo) bool { + switch a.Status { + case AgentAvailable: + return true + case AgentBusy: + // Busy agents can still accept work if they have capacity. + return a.MaxLoad == 0 || a.CurrentLoad < a.MaxLoad + default: + return false + } +} + +// hasCapabilities checks that the agent has all required labels. If the task +// has no labels, any agent qualifies. +func (r *DefaultRouter) hasCapabilities(a AgentInfo, labels []string) bool { + if len(labels) == 0 { + return true + } + for _, label := range labels { + if !slices.Contains(a.Capabilities, label) { + return false + } + } + return true +} + +// leastLoaded picks the agent with the lowest CurrentLoad. Ties are broken by +// agent ID (alphabetical). +func (r *DefaultRouter) leastLoaded(agents []AgentInfo) string { + // Sort: lowest load first, then by ID for determinism. + sort.Slice(agents, func(i, j int) bool { + if agents[i].CurrentLoad != agents[j].CurrentLoad { + return agents[i].CurrentLoad < agents[j].CurrentLoad + } + return agents[i].ID < agents[j].ID + }) + return agents[0].ID +} + +// highestScored picks the agent with the highest availability score. 
+// Score = 1.0 - (CurrentLoad / MaxLoad). If MaxLoad is 0, score is 1.0.
+// Ties are broken by agent ID (alphabetical).
+func (r *DefaultRouter) highestScored(agents []AgentInfo) string {
+	type scored struct {
+		id    string
+		score float64
+	}
+
+	scores := make([]scored, len(agents))
+	for i, a := range agents {
+		s := 1.0
+		if a.MaxLoad > 0 {
+			s = 1.0 - float64(a.CurrentLoad)/float64(a.MaxLoad)
+		}
+		scores[i] = scored{id: a.ID, score: s}
+	}
+
+	// Sort: highest score first, then by ID for determinism.
+	sort.Slice(scores, func(i, j int) bool {
+		if scores[i].score != scores[j].score {
+			return scores[i].score > scores[j].score
+		}
+		return scores[i].id < scores[j].id
+	})
+
+	return scores[0].id
+}
diff --git a/router_test.go b/router_test.go
new file mode 100644
index 0000000..74d9823
--- /dev/null
+++ b/router_test.go
@@ -0,0 +1,239 @@
+package agentic
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func makeAgent(id string, status AgentStatus, caps []string, load, maxLoad int) AgentInfo {
+	return AgentInfo{
+		ID:            id,
+		Name:          id,
+		Capabilities:  caps,
+		Status:        status,
+		LastHeartbeat: time.Now().UTC(),
+		CurrentLoad:   load,
+		MaxLoad:       maxLoad,
+	}
+}
+
+// --- Capability matching ---
+
+func TestDefaultRouter_Route_Good_MatchesCapabilities(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1", Labels: []string{"go", "testing"}}
+	agents := []AgentInfo{
+		makeAgent("agent-a", AgentAvailable, []string{"go", "testing", "frontend"}, 0, 5),
+		makeAgent("agent-b", AgentAvailable, []string{"python"}, 0, 5),
+	}
+
+	id, err := router.Route(task, agents)
+	require.NoError(t, err)
+	assert.Equal(t, "agent-a", id)
+}
+
+func TestDefaultRouter_Route_Good_NoLabelsMatchesAll(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1", Labels: nil}
+	agents := []AgentInfo{
+		makeAgent("agent-a", AgentAvailable, []string{"go"}, 0, 5),
+	}
+
+	id, err := router.Route(task, agents)
+	require.NoError(t, err)
+	assert.Equal(t, "agent-a", id)
+}
+
+func TestDefaultRouter_Route_Good_EmptyLabelsMatchesAll(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1", Labels: []string{}}
+	agents := []AgentInfo{
+		makeAgent("agent-a", AgentAvailable, nil, 0, 5),
+	}
+
+	id, err := router.Route(task, agents)
+	require.NoError(t, err)
+	assert.Equal(t, "agent-a", id)
+}
+
+// --- Availability filtering ---
+
+func TestDefaultRouter_Route_Good_SkipsOfflineAgents(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1"}
+	agents := []AgentInfo{
+		makeAgent("offline-1", AgentOffline, nil, 0, 5),
+		makeAgent("online-1", AgentAvailable, nil, 0, 5),
+	}
+
+	id, err := router.Route(task, agents)
+	require.NoError(t, err)
+	assert.Equal(t, "online-1", id)
+}
+
+func TestDefaultRouter_Route_Good_BusyWithCapacity(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1"}
+	agents := []AgentInfo{
+		makeAgent("busy-1", AgentBusy, nil, 2, 5), // has capacity
+	}
+
+	id, err := router.Route(task, agents)
+	require.NoError(t, err)
+	assert.Equal(t, "busy-1", id)
+}
+
+func TestDefaultRouter_Route_Good_BusyUnlimited(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1"}
+	agents := []AgentInfo{
+		makeAgent("busy-unlimited", AgentBusy, nil, 10, 0), // MaxLoad 0 = unlimited
+	}
+
+	id, err := router.Route(task, agents)
+	require.NoError(t, err)
+	assert.Equal(t, "busy-unlimited", id)
+}
+
+func TestDefaultRouter_Route_Bad_BusyAtCapacity(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1"}
+	agents := []AgentInfo{
+		makeAgent("full-1", AgentBusy, nil, 5, 5), // at capacity
+	}
+
+	_, err := router.Route(task, agents)
+	require.ErrorIs(t, err, ErrNoEligibleAgent)
+}
+
+func TestDefaultRouter_Route_Bad_NoAgents(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1"}
+
+	_, err := router.Route(task, nil)
+	require.ErrorIs(t, err, ErrNoEligibleAgent)
+}
+
+func TestDefaultRouter_Route_Bad_NoCapableAgent(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1", Labels: []string{"rust"}}
+	agents := []AgentInfo{
+		makeAgent("go-agent", AgentAvailable, []string{"go"}, 0, 5),
+		makeAgent("py-agent", AgentAvailable, []string{"python"}, 0, 5),
+	}
+
+	_, err := router.Route(task, agents)
+	require.ErrorIs(t, err, ErrNoEligibleAgent)
+}
+
+func TestDefaultRouter_Route_Bad_AllOffline(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1"}
+	agents := []AgentInfo{
+		makeAgent("off-1", AgentOffline, nil, 0, 5),
+		makeAgent("off-2", AgentOffline, nil, 0, 5),
+	}
+
+	_, err := router.Route(task, agents)
+	require.ErrorIs(t, err, ErrNoEligibleAgent)
+}
+
+// --- Load balancing ---
+
+func TestDefaultRouter_Route_Good_LeastLoaded(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1", Priority: PriorityMedium}
+	agents := []AgentInfo{
+		makeAgent("agent-a", AgentAvailable, nil, 3, 10),
+		makeAgent("agent-b", AgentAvailable, nil, 1, 10),
+		makeAgent("agent-c", AgentAvailable, nil, 5, 10),
+	}
+
+	id, err := router.Route(task, agents)
+	require.NoError(t, err)
+	// agent-b has score 0.9, agent-a has 0.7, agent-c has 0.5
+	assert.Equal(t, "agent-b", id)
+}
+
+func TestDefaultRouter_Route_Good_UnlimitedGetsMaxScore(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1", Priority: PriorityLow}
+	agents := []AgentInfo{
+		makeAgent("limited", AgentAvailable, nil, 1, 10),  // score 0.9
+		makeAgent("unlimited", AgentAvailable, nil, 5, 0), // score 1.0
+	}
+
+	id, err := router.Route(task, agents)
+	require.NoError(t, err)
+	assert.Equal(t, "unlimited", id)
+}
+
+// --- Critical priority ---
+
+func TestDefaultRouter_Route_Good_CriticalPicksLeastLoaded(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1", Priority: PriorityCritical}
+	agents := []AgentInfo{
+		makeAgent("agent-a", AgentAvailable, nil, 4, 10),
+		makeAgent("agent-b", AgentAvailable, nil, 1, 5), // lowest absolute load
+		makeAgent("agent-c", AgentAvailable, nil, 2, 10),
+	}
+
+	id, err := router.Route(task, agents)
+	require.NoError(t, err)
+	// Critical: picks least loaded by CurrentLoad, not by ratio.
+	assert.Equal(t, "agent-b", id)
+}
+
+// --- Tie-breaking ---
+
+func TestDefaultRouter_Route_Good_TieBreakByID(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1", Priority: PriorityMedium}
+	agents := []AgentInfo{
+		makeAgent("charlie", AgentAvailable, nil, 0, 5),
+		makeAgent("alpha", AgentAvailable, nil, 0, 5),
+		makeAgent("bravo", AgentAvailable, nil, 0, 5),
+	}
+
+	id, err := router.Route(task, agents)
+	require.NoError(t, err)
+	assert.Equal(t, "alpha", id)
+}
+
+func TestDefaultRouter_Route_Good_CriticalTieBreakByID(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1", Priority: PriorityCritical}
+	agents := []AgentInfo{
+		makeAgent("charlie", AgentAvailable, nil, 0, 5),
+		makeAgent("alpha", AgentAvailable, nil, 0, 5),
+		makeAgent("bravo", AgentAvailable, nil, 0, 5),
+	}
+
+	id, err := router.Route(task, agents)
+	require.NoError(t, err)
+	assert.Equal(t, "alpha", id)
+}
+
+// --- Mixed scenarios ---
+
+func TestDefaultRouter_Route_Good_MixedStatusAndCapabilities(t *testing.T) {
+	router := NewDefaultRouter()
+	task := &Task{ID: "t1", Labels: []string{"go"}, Priority: PriorityHigh}
+	agents := []AgentInfo{
+		makeAgent("offline-go", AgentOffline, []string{"go"}, 0, 5),
+		makeAgent("busy-py", AgentBusy, []string{"python"}, 1, 5),
+		makeAgent("busy-go-full", AgentBusy, []string{"go"}, 5, 5),  // at capacity
+		makeAgent("busy-go-room", AgentBusy, []string{"go"}, 2, 5),  // has room
+		makeAgent("avail-go", AgentAvailable, []string{"go"}, 1, 5), // available
+	}
+
+	id, err := router.Route(task, agents)
+	require.NoError(t, err)
+	// avail-go: score = 1.0 - 1/5 = 0.8
+	// busy-go-room: score = 1.0 - 2/5 = 0.6
+	assert.Equal(t, "avail-go", id)
+}