feat(coordination): add agent registry, task router, and dispatcher

Multi-agent coordination layer:
- AgentRegistry interface + MemoryRegistry (heartbeat, reap, discovery)
- TaskRouter interface + DefaultRouter (capability matching, load balancing)
- Dispatcher orchestrates registry + router + allowance for task dispatch

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-02-20 07:16:53 +00:00
parent 3e43233e0e
commit 646cc0261b
7 changed files with 1231 additions and 9 deletions

49
TODO.md
View file

@ -17,17 +17,48 @@
## Phase 3: Multi-Agent Coordination
- [ ] Currently single-agent -- only one Claude subprocess at a time
- [ ] Add agent discovery (register capabilities, heartbeat)
- [ ] Add task routing based on agent capabilities and current load
- [ ] Add load balancing across multiple agent instances
### 3.1 Agent Registry
## Phase 4: CLI Integration
- [x] **Create `registry.go`** — `AgentInfo` struct (ID, Name, Capabilities []string, Status enum, LastHeartbeat, CurrentLoad int, MaxLoad int), `AgentStatus` enum (Available/Busy/Offline)
- [x] **`AgentRegistry` interface** — `Register(AgentInfo) error`, `Deregister(id string) error`, `Get(id string) (AgentInfo, error)`, `List() []AgentInfo`, `Heartbeat(id string) error`, `Reap(ttl time.Duration) []string` (returns IDs of reaped agents)
- [x] **`MemoryRegistry` implementation** — `sync.RWMutex` guarded map, `Reap()` marks agents offline if heartbeat older than TTL
- [x] **Tests** — registration, deregistration, heartbeat updates, reap stale agents, concurrent access
- [ ] `client.go` talks to REST API -- wire into `core agent` CLI command group
- [ ] `core agent status` -- show running agents, task queue depth, allowance remaining
- [ ] `core agent dispatch <task>` -- submit task to queue from CLI
- [ ] `core agent logs <task-id>` -- stream task output via WebSocket
### 3.2 Task Router
- [x] **Create `router.go`** — `TaskRouter` interface with `Route(task *Task, agents []AgentInfo) (string, error)` returning agent ID
- [x] **`DefaultRouter` implementation** — capability matching (task.Labels ⊆ agent.Capabilities), then least-loaded agent (CurrentLoad / MaxLoad ratio), priority weighting (critical tasks skip load balancing)
- [x] **`ErrNoEligibleAgent`** sentinel error when no agents match capabilities or all are at capacity
- [x] **Tests** — capability matching, load distribution, critical priority bypass, no eligible agent error, tie-breaking by agent ID (deterministic)
### 3.3 Dispatcher
- [x] **Create `dispatcher.go`** — `Dispatcher` struct wrapping `AgentRegistry`, `TaskRouter`, `AllowanceService`, and `Client`
- [x] **`Dispatch(ctx, task) (string, error)`** — Route → allowance check → claim via client → record usage. Returns assigned agent ID
- [x] **`DispatchLoop(ctx, interval)`** — polls client.ListTasks(pending) → dispatches each. Respects context cancellation
- [x] **Tests** — full dispatch flow with mock client, allowance rejection path, no-agent-available path, loop cancellation
## Phase 4: CLI Backing Functions
Phase 4 provides the data-fetching and formatting functions that `core agent` CLI commands will call. The CLI commands themselves live in `core/cli`.
### 4.1 Status Summary
- [ ] **Create `status.go`** — `StatusSummary` struct (Agents []AgentInfo, PendingTasks int, InProgressTasks int, AllowanceRemaining map[string]int64)
- [ ] **`GetStatus(ctx, registry, client, allowanceSvc) (*StatusSummary, error)`** — aggregates registry.List(), client.ListTasks counts, allowance remaining per agent
- [ ] **`FormatStatus(summary) string`** — tabular text output for CLI rendering
- [ ] **Tests** — with mock registry + nil client, full summary with mock client
### 4.2 Task Submission
- [ ] **`SubmitTask(ctx, client, title, description, labels, priority) (*Task, error)`** — creates a new task via client (requires new Client.CreateTask method)
- [ ] **Add `Client.CreateTask(ctx, task) (*Task, error)`** — POST /api/tasks
- [ ] **Tests** — creation with all fields, validation (empty title), httptest mock
### 4.3 Log Streaming
- [ ] **Create `logs.go`** — `StreamLogs(ctx, client, taskID, writer) error` — polls task updates and writes progress to io.Writer
- [ ] **Tests** — mock client with progress updates, context cancellation
---

109
dispatcher.go Normal file
View file

@ -0,0 +1,109 @@
package agentic
import (
"context"
"time"
"forge.lthn.ai/core/go/pkg/log"
)
// Dispatcher orchestrates task dispatch by combining the agent registry,
// task router, allowance service, and API client.
type Dispatcher struct {
	// registry supplies the set of known agents considered for routing.
	registry AgentRegistry
	// router picks the best agent for each task.
	router TaskRouter
	// allowance enforces per-agent quotas before a task is handed out.
	allowance *AllowanceService
	// client claims tasks against the remote API. When nil (unit tests),
	// Dispatch skips the claim step and DispatchLoop skips each tick.
	client *Client // can be nil for tests
}
// NewDispatcher wires the registry, router, allowance service, and API
// client into a ready-to-use Dispatcher.
func NewDispatcher(registry AgentRegistry, router TaskRouter, allowance *AllowanceService, client *Client) *Dispatcher {
	d := new(Dispatcher)
	d.registry = registry
	d.router = router
	d.allowance = allowance
	d.client = client
	return d
}
// Dispatch assigns a task to the best available agent. It queries the registry
// for available agents, routes the task, checks the agent's allowance, claims
// the task via the API client (if present), and records usage. Returns the
// assigned agent ID.
//
// NOTE(review): if RecordUsage fails after a successful claim, the task stays
// claimed remotely while no usage is counted — confirm whether a compensating
// release is needed.
func (d *Dispatcher) Dispatch(ctx context.Context, task *Task) (string, error) {
	const op = "Dispatcher.Dispatch"
	// 1. Get available agents from registry.
	agents := d.registry.List()
	// 2. Route task to best agent. Router returns ErrNoEligibleAgent (wrapped
	// by log.E) when nothing matches.
	agentID, err := d.router.Route(task, agents)
	if err != nil {
		return "", log.E(op, "routing failed", err)
	}
	// 3. Check allowance for the selected agent. The empty second argument
	// presumably selects the default quota bucket — TODO confirm against
	// AllowanceService.Check.
	check, err := d.allowance.Check(agentID, "")
	if err != nil {
		return "", log.E(op, "allowance check failed", err)
	}
	if !check.Allowed {
		return "", log.E(op, "agent quota exceeded: "+check.Reason, nil)
	}
	// 4. Claim the task via the API client (if available). ctx is only used
	// here; registry/router/allowance calls are synchronous in-process.
	if d.client != nil {
		if _, err := d.client.ClaimTask(ctx, task.ID); err != nil {
			return "", log.E(op, "failed to claim task", err)
		}
	}
	// 5. Record job start usage so quotas reflect the new active job.
	if err := d.allowance.RecordUsage(UsageReport{
		AgentID:   agentID,
		JobID:     task.ID,
		Event:     QuotaEventJobStarted,
		Timestamp: time.Now().UTC(),
	}); err != nil {
		return "", log.E(op, "failed to record usage", err)
	}
	return agentID, nil
}
// DispatchLoop polls for pending tasks at the given interval and dispatches
// each one. It runs until the context is cancelled and returns ctx.Err().
//
// The first poll happens one full interval after the call (time.Ticker
// semantics). With a nil client the loop only waits for cancellation.
func (d *Dispatcher) DispatchLoop(ctx context.Context, interval time.Duration) error {
	const op = "Dispatcher.DispatchLoop"
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Without a client there is nothing to poll; skip this tick.
			if d.client == nil {
				continue
			}
			tasks, err := d.client.ListTasks(ctx, ListOptions{Status: StatusPending})
			if err != nil {
				// Log but continue — transient API errors should not stop the loop.
				_ = log.E(op, "failed to list pending tasks", err)
				continue
			}
			for i := range tasks {
				// Re-check cancellation between tasks so a long batch does not
				// delay shutdown until the next tick.
				if ctx.Err() != nil {
					return ctx.Err()
				}
				if _, err := d.Dispatch(ctx, &tasks[i]); err != nil {
					// Log dispatch errors but continue with the next task.
					_ = log.E(op, "failed to dispatch task "+tasks[i].ID, err)
				}
			}
		}
	}
}

267
dispatcher_test.go Normal file
View file

@ -0,0 +1,267 @@
package agentic
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// setupDispatcher creates a Dispatcher with a memory registry, default router,
// and memory allowance store, pre-loaded with agents and allowances.
func setupDispatcher(t *testing.T, client *Client) (*Dispatcher, *MemoryRegistry, *MemoryStore) {
	t.Helper()
	reg := NewMemoryRegistry()
	router := NewDefaultRouter()
	store := NewMemoryStore()
	svc := NewAllowanceService(store)
	d := NewDispatcher(reg, router, svc, client)
	return d, reg, store
}

// registerAgent registers an available agent with the given capabilities and
// max load, and grants a generous allowance so dispatch is not quota-bound
// unless a test overrides it.
func registerAgent(t *testing.T, reg *MemoryRegistry, store *MemoryStore, id string, caps []string, maxLoad int) {
	t.Helper()
	// Errors ignored: inputs here are fixed and valid by construction.
	_ = reg.Register(AgentInfo{
		ID:            id,
		Name:          id,
		Capabilities:  caps,
		Status:        AgentAvailable,
		LastHeartbeat: time.Now().UTC(),
		MaxLoad:       maxLoad,
	})
	_ = store.SetAllowance(&AgentAllowance{
		AgentID:         id,
		DailyTokenLimit: 100000,
		DailyJobLimit:   50,
		ConcurrentJobs:  5,
	})
}
// --- Dispatch tests ---

// Happy path without an API client: routed and usage recorded, no claim step.
func TestDispatcher_Dispatch_Good_NilClient(t *testing.T) {
	d, reg, store := setupDispatcher(t, nil)
	registerAgent(t, reg, store, "agent-1", []string{"go"}, 5)
	task := &Task{ID: "task-1", Labels: []string{"go"}, Priority: PriorityMedium}
	agentID, err := d.Dispatch(context.Background(), task)
	require.NoError(t, err)
	assert.Equal(t, "agent-1", agentID)
	// Verify usage was recorded.
	usage, _ := store.GetUsage("agent-1")
	assert.Equal(t, 1, usage.JobsStarted)
	assert.Equal(t, 1, usage.ActiveJobs)
}

// Full path through a real HTTP client against an httptest claim endpoint.
func TestDispatcher_Dispatch_Good_WithHTTPClient(t *testing.T) {
	claimedTask := Task{ID: "task-1", Status: StatusInProgress, ClaimedBy: "agent-1"}
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method == http.MethodPost && r.URL.Path == "/api/tasks/task-1/claim" {
			w.Header().Set("Content-Type", "application/json")
			_ = json.NewEncoder(w).Encode(ClaimResponse{Task: &claimedTask})
			return
		}
		w.WriteHeader(http.StatusNotFound)
	}))
	defer server.Close()
	client := NewClient(server.URL, "test-token")
	d, reg, store := setupDispatcher(t, client)
	registerAgent(t, reg, store, "agent-1", nil, 5)
	task := &Task{ID: "task-1", Priority: PriorityHigh}
	agentID, err := d.Dispatch(context.Background(), task)
	require.NoError(t, err)
	assert.Equal(t, "agent-1", agentID)
	// Verify usage recorded.
	usage, _ := store.GetUsage("agent-1")
	assert.Equal(t, 1, usage.JobsStarted)
}

// The router's load-based scoring should prefer the less loaded agent.
func TestDispatcher_Dispatch_Good_PicksBestAgent(t *testing.T) {
	d, reg, store := setupDispatcher(t, nil)
	registerAgent(t, reg, store, "heavy", []string{"go"}, 5)
	registerAgent(t, reg, store, "light", []string{"go"}, 5)
	// Give "heavy" some load. Register overwrites, so this replaces the entry.
	_ = reg.Register(AgentInfo{
		ID:            "heavy",
		Name:          "heavy",
		Capabilities:  []string{"go"},
		Status:        AgentAvailable,
		LastHeartbeat: time.Now().UTC(),
		CurrentLoad:   4,
		MaxLoad:       5,
	})
	task := &Task{ID: "task-1", Labels: []string{"go"}, Priority: PriorityMedium}
	agentID, err := d.Dispatch(context.Background(), task)
	require.NoError(t, err)
	assert.Equal(t, "light", agentID) // light has score 1.0, heavy has 0.2
}

// With an empty registry, dispatch must fail (routing error).
func TestDispatcher_Dispatch_Bad_NoAgents(t *testing.T) {
	d, _, _ := setupDispatcher(t, nil)
	task := &Task{ID: "task-1", Priority: PriorityMedium}
	_, err := d.Dispatch(context.Background(), task)
	require.Error(t, err)
}

// A routed agent whose quota is exhausted must be rejected at step 3.
func TestDispatcher_Dispatch_Bad_AllowanceExceeded(t *testing.T) {
	d, reg, store := setupDispatcher(t, nil)
	registerAgent(t, reg, store, "agent-1", nil, 5)
	// Exhaust the agent's daily job limit.
	_ = store.SetAllowance(&AgentAllowance{
		AgentID:       "agent-1",
		DailyJobLimit: 1,
	})
	_ = store.IncrementUsage("agent-1", 0, 1)
	task := &Task{ID: "task-1", Priority: PriorityMedium}
	_, err := d.Dispatch(context.Background(), task)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "quota exceeded")
}

// When the remote claim is rejected (409), no usage must be recorded.
func TestDispatcher_Dispatch_Bad_ClaimFails(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusConflict)
		_ = json.NewEncoder(w).Encode(APIError{Code: 409, Message: "already claimed"})
	}))
	defer server.Close()
	client := NewClient(server.URL, "test-token")
	d, reg, store := setupDispatcher(t, client)
	registerAgent(t, reg, store, "agent-1", nil, 5)
	task := &Task{ID: "task-1", Priority: PriorityMedium}
	_, err := d.Dispatch(context.Background(), task)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "claim task")
	// Verify usage was NOT recorded when claim fails.
	usage, _ := store.GetUsage("agent-1")
	assert.Equal(t, 0, usage.JobsStarted)
}
// --- DispatchLoop tests ---

// A pre-cancelled context must return context.Canceled immediately.
func TestDispatcher_DispatchLoop_Good_Cancellation(t *testing.T) {
	d, _, _ := setupDispatcher(t, nil)
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // Cancel immediately.
	err := d.DispatchLoop(ctx, 100*time.Millisecond)
	require.ErrorIs(t, err, context.Canceled)
}

// The loop should poll, dispatch both pending tasks via the claim endpoint,
// and exit with DeadlineExceeded when the test context times out.
func TestDispatcher_DispatchLoop_Good_DispatchesPendingTasks(t *testing.T) {
	pendingTasks := []Task{
		{ID: "task-1", Status: StatusPending, Priority: PriorityMedium},
		{ID: "task-2", Status: StatusPending, Priority: PriorityHigh},
	}
	// mu guards claimedIDs: the handler runs on server goroutines while the
	// test goroutine reads it at the end.
	var mu sync.Mutex
	claimedIDs := make(map[string]bool)
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch {
		case r.Method == http.MethodGet && r.URL.Path == "/api/tasks":
			w.Header().Set("Content-Type", "application/json")
			mu.Lock()
			// Return only tasks not yet claimed.
			var remaining []Task
			for _, t := range pendingTasks {
				if !claimedIDs[t.ID] {
					remaining = append(remaining, t)
				}
			}
			mu.Unlock()
			_ = json.NewEncoder(w).Encode(remaining)
		case r.Method == http.MethodPost:
			// Extract task ID from claim URL.
			w.Header().Set("Content-Type", "application/json")
			// Parse the task ID from the path.
			for _, t := range pendingTasks {
				if r.URL.Path == "/api/tasks/"+t.ID+"/claim" {
					mu.Lock()
					claimedIDs[t.ID] = true
					mu.Unlock()
					claimed := t
					claimed.Status = StatusInProgress
					_ = json.NewEncoder(w).Encode(ClaimResponse{Task: &claimed})
					return
				}
			}
			w.WriteHeader(http.StatusNotFound)
		default:
			w.WriteHeader(http.StatusNotFound)
		}
	}))
	defer server.Close()
	client := NewClient(server.URL, "test-token")
	d, reg, store := setupDispatcher(t, client)
	registerAgent(t, reg, store, "agent-1", nil, 10)
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	err := d.DispatchLoop(ctx, 50*time.Millisecond)
	require.ErrorIs(t, err, context.DeadlineExceeded)
	// Verify tasks were claimed.
	mu.Lock()
	defer mu.Unlock()
	assert.True(t, claimedIDs["task-1"])
	assert.True(t, claimedIDs["task-2"])
}

// A nil client must be tolerated: ticks are skipped until the deadline fires.
func TestDispatcher_DispatchLoop_Good_NilClientSkipsTick(t *testing.T) {
	d, _, _ := setupDispatcher(t, nil)
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	err := d.DispatchLoop(ctx, 50*time.Millisecond)
	require.ErrorIs(t, err, context.DeadlineExceeded)
	// No panics — nil client is handled gracefully.
}
// --- Concurrent dispatch ---

// Ten parallel dispatches against one unlimited agent must all succeed and be
// counted; run under -race to catch unsynchronized state.
func TestDispatcher_Dispatch_Good_Concurrent(t *testing.T) {
	d, reg, store := setupDispatcher(t, nil)
	registerAgent(t, reg, store, "agent-1", nil, 0) // unlimited
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// Distinct single-letter suffixes keep task IDs unique.
			task := &Task{ID: "task-" + string(rune('a'+n)), Priority: PriorityMedium}
			_, _ = d.Dispatch(context.Background(), task)
		}(i)
	}
	wg.Wait()
	// Verify usage was recorded for all dispatches.
	usage, _ := store.GetUsage("agent-1")
	assert.Equal(t, 10, usage.JobsStarted)
}

148
registry.go Normal file
View file

@ -0,0 +1,148 @@
package agentic
import (
"sync"
"time"
)
// AgentStatus represents the availability state of an agent. It is a string
// type, so it serializes as a plain string in JSON (see AgentInfo).
type AgentStatus string

const (
	// AgentAvailable indicates the agent is ready to accept tasks.
	AgentAvailable AgentStatus = "available"
	// AgentBusy indicates the agent is working but may accept more tasks.
	AgentBusy AgentStatus = "busy"
	// AgentOffline indicates the agent has not sent a heartbeat recently.
	AgentOffline AgentStatus = "offline"
)
// AgentInfo describes a registered agent and its current state. It is passed
// and stored by value; registry implementations copy it on read and write.
type AgentInfo struct {
	// ID is the unique identifier for the agent.
	ID string `json:"id"`
	// Name is the human-readable name of the agent.
	Name string `json:"name"`
	// Capabilities lists what the agent can handle (e.g. "go", "testing", "frontend").
	Capabilities []string `json:"capabilities,omitempty"`
	// Status is the current availability state.
	Status AgentStatus `json:"status"`
	// LastHeartbeat is the last time the agent reported in.
	LastHeartbeat time.Time `json:"last_heartbeat"`
	// CurrentLoad is the number of active jobs the agent is running.
	CurrentLoad int `json:"current_load"`
	// MaxLoad is the maximum concurrent jobs the agent supports. 0 means unlimited.
	MaxLoad int `json:"max_load"`
}
// AgentRegistry manages the set of known agents and their health.
// Implementations are expected to be safe for concurrent use (see
// MemoryRegistry, which is mutex-guarded).
type AgentRegistry interface {
	// Register adds or updates an agent in the registry.
	Register(agent AgentInfo) error
	// Deregister removes an agent from the registry.
	Deregister(id string) error
	// Get returns a copy of the agent info for the given ID.
	Get(id string) (AgentInfo, error)
	// List returns a copy of all registered agents.
	List() []AgentInfo
	// Heartbeat updates the agent's LastHeartbeat timestamp and sets status
	// to Available if the agent was previously Offline.
	Heartbeat(id string) error
	// Reap marks agents as Offline if their last heartbeat is older than ttl.
	// Returns the IDs of agents that were reaped.
	Reap(ttl time.Duration) []string
}
// MemoryRegistry is an in-memory AgentRegistry implementation guarded by a
// read-write mutex. It uses copy-on-read semantics consistent with MemoryStore.
// The zero value is not usable; construct with NewMemoryRegistry. Must not be
// copied after first use (contains a sync.RWMutex).
type MemoryRegistry struct {
	mu     sync.RWMutex
	agents map[string]*AgentInfo
}
// NewMemoryRegistry returns an empty in-memory agent registry ready for use.
func NewMemoryRegistry() *MemoryRegistry {
	r := new(MemoryRegistry)
	r.agents = make(map[string]*AgentInfo)
	return r
}
// Register adds or updates an agent in the registry. Returns an error if the
// agent ID is empty.
func (r *MemoryRegistry) Register(agent AgentInfo) error {
	if agent.ID == "" {
		return &APIError{Code: 400, Message: "agent ID is required"}
	}
	// Store a private copy so later mutations of the caller's value cannot
	// leak into the registry.
	stored := agent
	r.mu.Lock()
	defer r.mu.Unlock()
	r.agents[stored.ID] = &stored
	return nil
}
// Deregister removes an agent from the registry. Returns an error if the agent
// is not found.
func (r *MemoryRegistry) Deregister(id string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.agents[id]; exists {
		delete(r.agents, id)
		return nil
	}
	return &APIError{Code: 404, Message: "agent not found: " + id}
}
// Get returns a copy of the agent info for the given ID. Returns an error if
// the agent is not found.
func (r *MemoryRegistry) Get(id string) (AgentInfo, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if info, ok := r.agents[id]; ok {
		// Dereference to hand the caller an independent copy.
		return *info, nil
	}
	return AgentInfo{}, &APIError{Code: 404, Message: "agent not found: " + id}
}
// List returns a copy of all registered agents. Order is unspecified (map
// iteration order).
func (r *MemoryRegistry) List() []AgentInfo {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]AgentInfo, 0, len(r.agents))
	for _, info := range r.agents {
		out = append(out, *info)
	}
	return out
}
// Heartbeat updates the agent's LastHeartbeat timestamp. If the agent was
// Offline, it transitions to Available.
func (r *MemoryRegistry) Heartbeat(id string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	agent, found := r.agents[id]
	if !found {
		return &APIError{Code: 404, Message: "agent not found: " + id}
	}
	agent.LastHeartbeat = time.Now().UTC()
	// A heartbeat revives an offline agent but never demotes Busy.
	if agent.Status == AgentOffline {
		agent.Status = AgentAvailable
	}
	return nil
}
// Reap marks agents as Offline if their last heartbeat is older than ttl.
// Returns the IDs of agents that were reaped.
func (r *MemoryRegistry) Reap(ttl time.Duration) []string {
	cutoff := time.Now().UTC().Add(-ttl)
	r.mu.Lock()
	defer r.mu.Unlock()
	var reaped []string
	for id, agent := range r.agents {
		// Already-offline agents are skipped so they are reported only once.
		if agent.Status == AgentOffline {
			continue
		}
		if agent.LastHeartbeat.Before(cutoff) {
			agent.Status = AgentOffline
			reaped = append(reaped, id)
		}
	}
	return reaped
}

298
registry_test.go Normal file
View file

@ -0,0 +1,298 @@
package agentic
import (
"sort"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// --- Register tests ---

// Registering an agent makes it retrievable with all fields intact.
func TestMemoryRegistry_Register_Good(t *testing.T) {
	reg := NewMemoryRegistry()
	err := reg.Register(AgentInfo{
		ID:           "agent-1",
		Name:         "Test Agent",
		Capabilities: []string{"go", "testing"},
		Status:       AgentAvailable,
		MaxLoad:      5,
	})
	require.NoError(t, err)
	got, err := reg.Get("agent-1")
	require.NoError(t, err)
	assert.Equal(t, "agent-1", got.ID)
	assert.Equal(t, "Test Agent", got.Name)
	assert.Equal(t, []string{"go", "testing"}, got.Capabilities)
	assert.Equal(t, AgentAvailable, got.Status)
	assert.Equal(t, 5, got.MaxLoad)
}

// Re-registering the same ID replaces the previous entry.
func TestMemoryRegistry_Register_Good_Overwrite(t *testing.T) {
	reg := NewMemoryRegistry()
	_ = reg.Register(AgentInfo{ID: "agent-1", Name: "Original", MaxLoad: 3})
	err := reg.Register(AgentInfo{ID: "agent-1", Name: "Updated", MaxLoad: 10})
	require.NoError(t, err)
	got, err := reg.Get("agent-1")
	require.NoError(t, err)
	assert.Equal(t, "Updated", got.Name)
	assert.Equal(t, 10, got.MaxLoad)
}

// An empty ID is rejected with a validation error.
func TestMemoryRegistry_Register_Bad_EmptyID(t *testing.T) {
	reg := NewMemoryRegistry()
	err := reg.Register(AgentInfo{ID: "", Name: "No ID"})
	require.Error(t, err)
	assert.Contains(t, err.Error(), "agent ID is required")
}

// The registry stores a copy, so mutating the caller's struct afterwards does
// not change the stored scalar fields.
func TestMemoryRegistry_Register_Good_CopySemantics(t *testing.T) {
	reg := NewMemoryRegistry()
	agent := AgentInfo{
		ID:           "agent-1",
		Name:         "Copy Test",
		Capabilities: []string{"go"},
		Status:       AgentAvailable,
	}
	_ = reg.Register(agent)
	// Mutate the original — should not affect the stored copy.
	agent.Name = "Mutated"
	agent.Capabilities[0] = "rust"
	got, _ := reg.Get("agent-1")
	assert.Equal(t, "Copy Test", got.Name)
	// Note: slice header is copied, but underlying array is shared.
	// This is consistent with the MemoryStore pattern in allowance.go.
}
// --- Deregister tests ---

// Deregistering removes the agent; subsequent Get fails.
func TestMemoryRegistry_Deregister_Good(t *testing.T) {
	reg := NewMemoryRegistry()
	_ = reg.Register(AgentInfo{ID: "agent-1", Name: "To Remove"})
	err := reg.Deregister("agent-1")
	require.NoError(t, err)
	_, err = reg.Get("agent-1")
	require.Error(t, err)
}

// Deregistering an unknown ID yields a not-found error.
func TestMemoryRegistry_Deregister_Bad_NotFound(t *testing.T) {
	reg := NewMemoryRegistry()
	err := reg.Deregister("nonexistent")
	require.Error(t, err)
	assert.Contains(t, err.Error(), "agent not found")
}
// --- Get tests ---

// Get returns the registered state fields as stored.
func TestMemoryRegistry_Get_Good(t *testing.T) {
	reg := NewMemoryRegistry()
	now := time.Now().UTC()
	_ = reg.Register(AgentInfo{
		ID:            "agent-1",
		Name:          "Getter",
		Status:        AgentBusy,
		CurrentLoad:   2,
		MaxLoad:       5,
		LastHeartbeat: now,
	})
	got, err := reg.Get("agent-1")
	require.NoError(t, err)
	assert.Equal(t, AgentBusy, got.Status)
	assert.Equal(t, 2, got.CurrentLoad)
	assert.Equal(t, now, got.LastHeartbeat)
}

// Get on an unknown ID yields a not-found error.
func TestMemoryRegistry_Get_Bad_NotFound(t *testing.T) {
	reg := NewMemoryRegistry()
	_, err := reg.Get("nonexistent")
	require.Error(t, err)
	assert.Contains(t, err.Error(), "agent not found")
}

// Get hands out a copy: mutating the returned struct does not change the
// registry's stored entry.
func TestMemoryRegistry_Get_Good_ReturnsCopy(t *testing.T) {
	reg := NewMemoryRegistry()
	_ = reg.Register(AgentInfo{ID: "agent-1", Name: "Original", CurrentLoad: 1})
	got, _ := reg.Get("agent-1")
	got.CurrentLoad = 99
	got.Name = "Tampered"
	// Re-read — should be unchanged.
	again, _ := reg.Get("agent-1")
	assert.Equal(t, "Original", again.Name)
	assert.Equal(t, 1, again.CurrentLoad)
}
// --- List tests ---

// An empty registry lists no agents.
func TestMemoryRegistry_List_Good_Empty(t *testing.T) {
	reg := NewMemoryRegistry()
	agents := reg.List()
	assert.Empty(t, agents)
}

// All registered agents are listed; order is map-random, so sort before
// asserting.
func TestMemoryRegistry_List_Good_Multiple(t *testing.T) {
	reg := NewMemoryRegistry()
	_ = reg.Register(AgentInfo{ID: "a", Name: "Alpha"})
	_ = reg.Register(AgentInfo{ID: "b", Name: "Beta"})
	_ = reg.Register(AgentInfo{ID: "c", Name: "Charlie"})
	agents := reg.List()
	assert.Len(t, agents, 3)
	// Sort by ID for deterministic assertion.
	sort.Slice(agents, func(i, j int) bool { return agents[i].ID < agents[j].ID })
	assert.Equal(t, "a", agents[0].ID)
	assert.Equal(t, "b", agents[1].ID)
	assert.Equal(t, "c", agents[2].ID)
}
// --- Heartbeat tests ---

// Heartbeat advances LastHeartbeat and leaves Available status untouched.
func TestMemoryRegistry_Heartbeat_Good(t *testing.T) {
	reg := NewMemoryRegistry()
	past := time.Now().UTC().Add(-5 * time.Minute)
	_ = reg.Register(AgentInfo{
		ID:            "agent-1",
		Status:        AgentAvailable,
		LastHeartbeat: past,
	})
	err := reg.Heartbeat("agent-1")
	require.NoError(t, err)
	got, _ := reg.Get("agent-1")
	assert.True(t, got.LastHeartbeat.After(past))
	assert.Equal(t, AgentAvailable, got.Status)
}

// A heartbeat from an Offline agent transitions it back to Available.
func TestMemoryRegistry_Heartbeat_Good_RecoverFromOffline(t *testing.T) {
	reg := NewMemoryRegistry()
	_ = reg.Register(AgentInfo{
		ID:     "agent-1",
		Status: AgentOffline,
	})
	err := reg.Heartbeat("agent-1")
	require.NoError(t, err)
	got, _ := reg.Get("agent-1")
	assert.Equal(t, AgentAvailable, got.Status)
}

// A heartbeat must not demote a Busy agent to Available.
func TestMemoryRegistry_Heartbeat_Good_BusyStaysBusy(t *testing.T) {
	reg := NewMemoryRegistry()
	_ = reg.Register(AgentInfo{
		ID:     "agent-1",
		Status: AgentBusy,
	})
	err := reg.Heartbeat("agent-1")
	require.NoError(t, err)
	got, _ := reg.Get("agent-1")
	assert.Equal(t, AgentBusy, got.Status)
}

// Heartbeat for an unknown ID yields a not-found error.
func TestMemoryRegistry_Heartbeat_Bad_NotFound(t *testing.T) {
	reg := NewMemoryRegistry()
	err := reg.Heartbeat("nonexistent")
	require.Error(t, err)
	assert.Contains(t, err.Error(), "agent not found")
}
// --- Reap tests ---

// Reap marks only the stale agent offline; the fresh one is untouched.
func TestMemoryRegistry_Reap_Good_StaleAgent(t *testing.T) {
	reg := NewMemoryRegistry()
	stale := time.Now().UTC().Add(-10 * time.Minute)
	fresh := time.Now().UTC()
	_ = reg.Register(AgentInfo{ID: "stale-1", Status: AgentAvailable, LastHeartbeat: stale})
	_ = reg.Register(AgentInfo{ID: "fresh-1", Status: AgentAvailable, LastHeartbeat: fresh})
	reaped := reg.Reap(5 * time.Minute)
	assert.Len(t, reaped, 1)
	assert.Contains(t, reaped, "stale-1")
	got, _ := reg.Get("stale-1")
	assert.Equal(t, AgentOffline, got.Status)
	got, _ = reg.Get("fresh-1")
	assert.Equal(t, AgentAvailable, got.Status)
}

// Agents already Offline are not reported again.
func TestMemoryRegistry_Reap_Good_AlreadyOfflineSkipped(t *testing.T) {
	reg := NewMemoryRegistry()
	stale := time.Now().UTC().Add(-10 * time.Minute)
	_ = reg.Register(AgentInfo{ID: "already-off", Status: AgentOffline, LastHeartbeat: stale})
	reaped := reg.Reap(5 * time.Minute)
	assert.Empty(t, reaped)
}

// Nothing is reaped when all heartbeats are within the TTL.
func TestMemoryRegistry_Reap_Good_NoStaleAgents(t *testing.T) {
	reg := NewMemoryRegistry()
	now := time.Now().UTC()
	_ = reg.Register(AgentInfo{ID: "a", Status: AgentAvailable, LastHeartbeat: now})
	_ = reg.Register(AgentInfo{ID: "b", Status: AgentBusy, LastHeartbeat: now})
	reaped := reg.Reap(5 * time.Minute)
	assert.Empty(t, reaped)
}

// A stale Busy agent is reaped just like a stale Available one.
func TestMemoryRegistry_Reap_Good_BusyAgentReaped(t *testing.T) {
	reg := NewMemoryRegistry()
	stale := time.Now().UTC().Add(-10 * time.Minute)
	_ = reg.Register(AgentInfo{ID: "busy-stale", Status: AgentBusy, LastHeartbeat: stale})
	reaped := reg.Reap(5 * time.Minute)
	assert.Len(t, reaped, 1)
	assert.Contains(t, reaped, "busy-stale")
	got, _ := reg.Get("busy-stale")
	assert.Equal(t, AgentOffline, got.Status)
}
// --- Concurrent access ---

// Hammers every registry operation from 20 goroutines over 5 shared IDs;
// meaningful primarily under `go test -race`.
func TestMemoryRegistry_Concurrent_Good(t *testing.T) {
	reg := NewMemoryRegistry()
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// n%5 deliberately collides goroutines on the same agent IDs.
			id := "agent-" + string(rune('a'+n%5))
			_ = reg.Register(AgentInfo{
				ID:            id,
				Name:          "Concurrent",
				Status:        AgentAvailable,
				LastHeartbeat: time.Now().UTC(),
			})
			_, _ = reg.Get(id)
			_ = reg.Heartbeat(id)
			_ = reg.List()
			_ = reg.Reap(1 * time.Minute)
		}(i)
	}
	wg.Wait()
	// No race conditions — test passes under -race.
	agents := reg.List()
	assert.True(t, len(agents) > 0)
}

130
router.go Normal file
View file

@ -0,0 +1,130 @@
package agentic
import (
"errors"
"slices"
"sort"
)
// ErrNoEligibleAgent is returned when no agent matches the task requirements.
// Compare with errors.Is.
var ErrNoEligibleAgent = errors.New("no eligible agent for task")

// TaskRouter selects an agent for a given task from a list of candidates.
type TaskRouter interface {
	// Route picks the best agent for the task and returns its ID.
	// Returns ErrNoEligibleAgent if no agent qualifies.
	Route(task *Task, agents []AgentInfo) (string, error)
}
// DefaultRouter implements TaskRouter with capability matching and load-based
// scoring. For critical priority tasks it picks the least-loaded agent directly.
// It is stateless, so a single instance is safe to share across goroutines.
type DefaultRouter struct{}
// NewDefaultRouter returns a ready-to-use DefaultRouter.
func NewDefaultRouter() *DefaultRouter {
	return new(DefaultRouter)
}
// Route selects the best agent for the task:
// 1. Filter by availability (Available, or Busy with capacity).
// 2. Filter by capabilities (task.Labels must be a subset of agent.Capabilities).
// 3. For critical tasks, pick the least-loaded agent.
// 4. For other tasks, score by load ratio and pick the highest-scored agent.
// 5. Ties are broken by agent ID (alphabetical) for determinism.
func (r *DefaultRouter) Route(task *Task, agents []AgentInfo) (string, error) {
	candidates := r.filterEligible(task, agents)
	if len(candidates) == 0 {
		return "", ErrNoEligibleAgent
	}
	// Critical work bypasses load-ratio scoring entirely.
	if task.Priority == PriorityCritical {
		return r.leastLoaded(candidates), nil
	}
	return r.highestScored(candidates), nil
}
// filterEligible returns agents that are available (or busy with room) and
// possess all required capabilities.
func (r *DefaultRouter) filterEligible(task *Task, agents []AgentInfo) []AgentInfo {
	var eligible []AgentInfo
	for _, candidate := range agents {
		if r.isAvailable(candidate) && r.hasCapabilities(candidate, task.Labels) {
			eligible = append(eligible, candidate)
		}
	}
	return eligible
}
// isAvailable reports whether the agent can accept work.
func (r *DefaultRouter) isAvailable(a AgentInfo) bool {
	if a.Status == AgentAvailable {
		return true
	}
	if a.Status != AgentBusy {
		// Offline (or any unknown status) never receives work.
		return false
	}
	// Busy agents still qualify while below capacity; MaxLoad 0 = unlimited.
	return a.MaxLoad == 0 || a.CurrentLoad < a.MaxLoad
}
// hasCapabilities checks that the agent has all required labels. If the task
// has no labels, any agent qualifies (the loop simply never rejects).
func (r *DefaultRouter) hasCapabilities(a AgentInfo, labels []string) bool {
	for _, required := range labels {
		if !slices.Contains(a.Capabilities, required) {
			return false
		}
	}
	return true
}
// leastLoaded picks the agent with the lowest CurrentLoad. Ties are broken by
// agent ID (alphabetical) for determinism.
//
// Implemented as a single O(n) pass rather than sort.Slice: the sort was
// O(n log n) and reordered the caller's slice in place — a surprising side
// effect for a read-only selection helper. Callers (Route) guarantee the
// slice is non-empty.
func (r *DefaultRouter) leastLoaded(agents []AgentInfo) string {
	best := agents[0]
	for _, a := range agents[1:] {
		if a.CurrentLoad < best.CurrentLoad ||
			(a.CurrentLoad == best.CurrentLoad && a.ID < best.ID) {
			best = a
		}
	}
	return best.ID
}
// highestScored picks the agent with the highest availability score.
// Score = 1.0 - (CurrentLoad / MaxLoad). If MaxLoad is 0, score is 1.0.
// Ties are broken by agent ID (alphabetical).
func (r *DefaultRouter) highestScored(agents []AgentInfo) string {
	type candidate struct {
		id    string
		score float64
	}
	ranked := make([]candidate, 0, len(agents))
	for _, a := range agents {
		// MaxLoad 0 means unlimited capacity, which scores a full 1.0.
		score := 1.0
		if a.MaxLoad > 0 {
			score = 1.0 - float64(a.CurrentLoad)/float64(a.MaxLoad)
		}
		ranked = append(ranked, candidate{id: a.ID, score: score})
	}
	// Sort: highest score first; equal scores fall back to ID order so the
	// result is deterministic.
	sort.Slice(ranked, func(i, j int) bool {
		if ranked[i].score == ranked[j].score {
			return ranked[i].id < ranked[j].id
		}
		return ranked[i].score > ranked[j].score
	})
	return ranked[0].id
}

239
router_test.go Normal file
View file

@ -0,0 +1,239 @@
package agentic
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// makeAgent builds an AgentInfo test fixture with a fresh heartbeat and the
// given status, capabilities, and load figures. Name mirrors ID.
func makeAgent(id string, status AgentStatus, caps []string, load, maxLoad int) AgentInfo {
	return AgentInfo{
		ID:            id,
		Name:          id,
		Capabilities:  caps,
		Status:        status,
		LastHeartbeat: time.Now().UTC(),
		CurrentLoad:   load,
		MaxLoad:       maxLoad,
	}
}
// --- Capability matching ---

// Only the agent holding every task label qualifies.
func TestDefaultRouter_Route_Good_MatchesCapabilities(t *testing.T) {
	router := NewDefaultRouter()
	task := &Task{ID: "t1", Labels: []string{"go", "testing"}}
	agents := []AgentInfo{
		makeAgent("agent-a", AgentAvailable, []string{"go", "testing", "frontend"}, 0, 5),
		makeAgent("agent-b", AgentAvailable, []string{"python"}, 0, 5),
	}
	id, err := router.Route(task, agents)
	require.NoError(t, err)
	assert.Equal(t, "agent-a", id)
}

// A task with nil labels matches any agent.
func TestDefaultRouter_Route_Good_NoLabelsMatchesAll(t *testing.T) {
	router := NewDefaultRouter()
	task := &Task{ID: "t1", Labels: nil}
	agents := []AgentInfo{
		makeAgent("agent-a", AgentAvailable, []string{"go"}, 0, 5),
	}
	id, err := router.Route(task, agents)
	require.NoError(t, err)
	assert.Equal(t, "agent-a", id)
}

// An empty (non-nil) label slice behaves like nil: everyone matches.
func TestDefaultRouter_Route_Good_EmptyLabelsMatchesAll(t *testing.T) {
	router := NewDefaultRouter()
	task := &Task{ID: "t1", Labels: []string{}}
	agents := []AgentInfo{
		makeAgent("agent-a", AgentAvailable, nil, 0, 5),
	}
	id, err := router.Route(task, agents)
	require.NoError(t, err)
	assert.Equal(t, "agent-a", id)
}
// --- Availability filtering ---
// TestDefaultRouter_Route_Good_SkipsOfflineAgents verifies that offline
// agents are filtered out even when listed first.
func TestDefaultRouter_Route_Good_SkipsOfflineAgents(t *testing.T) {
	r := NewDefaultRouter()
	pool := []AgentInfo{
		makeAgent("offline-1", AgentOffline, nil, 0, 5),
		makeAgent("online-1", AgentAvailable, nil, 0, 5),
	}
	got, err := r.Route(&Task{ID: "t1"}, pool)
	require.NoError(t, err)
	assert.Equal(t, "online-1", got)
}
// TestDefaultRouter_Route_Good_BusyWithCapacity verifies that a busy agent
// remains routable while its CurrentLoad is below MaxLoad.
func TestDefaultRouter_Route_Good_BusyWithCapacity(t *testing.T) {
	r := NewDefaultRouter()
	pool := []AgentInfo{
		makeAgent("busy-1", AgentBusy, nil, 2, 5), // 2 of 5 slots used
	}
	got, err := r.Route(&Task{ID: "t1"}, pool)
	require.NoError(t, err)
	assert.Equal(t, "busy-1", got)
}
// TestDefaultRouter_Route_Good_BusyUnlimited verifies that MaxLoad == 0 is
// treated as unlimited capacity, so a busy agent still accepts work.
func TestDefaultRouter_Route_Good_BusyUnlimited(t *testing.T) {
	r := NewDefaultRouter()
	pool := []AgentInfo{
		makeAgent("busy-unlimited", AgentBusy, nil, 10, 0), // zero MaxLoad: no cap
	}
	got, err := r.Route(&Task{ID: "t1"}, pool)
	require.NoError(t, err)
	assert.Equal(t, "busy-unlimited", got)
}
// TestDefaultRouter_Route_Bad_BusyAtCapacity verifies that an agent whose
// CurrentLoad has reached MaxLoad is ineligible for new tasks.
func TestDefaultRouter_Route_Bad_BusyAtCapacity(t *testing.T) {
	r := NewDefaultRouter()
	pool := []AgentInfo{
		makeAgent("full-1", AgentBusy, nil, 5, 5), // CurrentLoad == MaxLoad
	}
	_, err := r.Route(&Task{ID: "t1"}, pool)
	require.ErrorIs(t, err, ErrNoEligibleAgent)
}
// TestDefaultRouter_Route_Bad_NoAgents verifies that routing against a nil
// agent list yields ErrNoEligibleAgent.
func TestDefaultRouter_Route_Bad_NoAgents(t *testing.T) {
	r := NewDefaultRouter()
	_, err := r.Route(&Task{ID: "t1"}, nil)
	require.ErrorIs(t, err, ErrNoEligibleAgent)
}
// TestDefaultRouter_Route_Bad_NoCapableAgent verifies that routing fails when
// no agent's capabilities cover the task's labels.
func TestDefaultRouter_Route_Bad_NoCapableAgent(t *testing.T) {
	r := NewDefaultRouter()
	pool := []AgentInfo{
		makeAgent("go-agent", AgentAvailable, []string{"go"}, 0, 5),
		makeAgent("py-agent", AgentAvailable, []string{"python"}, 0, 5),
	}
	_, err := r.Route(&Task{ID: "t1", Labels: []string{"rust"}}, pool)
	require.ErrorIs(t, err, ErrNoEligibleAgent)
}
// TestDefaultRouter_Route_Bad_AllOffline verifies that routing fails when
// every agent in the pool is offline.
func TestDefaultRouter_Route_Bad_AllOffline(t *testing.T) {
	r := NewDefaultRouter()
	pool := []AgentInfo{
		makeAgent("off-1", AgentOffline, nil, 0, 5),
		makeAgent("off-2", AgentOffline, nil, 0, 5),
	}
	_, err := r.Route(&Task{ID: "t1"}, pool)
	require.ErrorIs(t, err, ErrNoEligibleAgent)
}
// --- Load balancing ---
// TestDefaultRouter_Route_Good_LeastLoaded verifies that the router prefers
// the agent with the lowest load ratio.
func TestDefaultRouter_Route_Good_LeastLoaded(t *testing.T) {
	r := NewDefaultRouter()
	pool := []AgentInfo{
		makeAgent("agent-a", AgentAvailable, nil, 3, 10),
		makeAgent("agent-b", AgentAvailable, nil, 1, 10),
		makeAgent("agent-c", AgentAvailable, nil, 5, 10),
	}
	got, err := r.Route(&Task{ID: "t1", Priority: PriorityMedium}, pool)
	require.NoError(t, err)
	// Scores: agent-b 0.9 > agent-a 0.7 > agent-c 0.5.
	assert.Equal(t, "agent-b", got)
}
// TestDefaultRouter_Route_Good_UnlimitedGetsMaxScore verifies that an agent
// with no load cap (MaxLoad == 0) scores 1.0 and beats any limited agent.
func TestDefaultRouter_Route_Good_UnlimitedGetsMaxScore(t *testing.T) {
	r := NewDefaultRouter()
	pool := []AgentInfo{
		makeAgent("limited", AgentAvailable, nil, 1, 10), // ratio score 0.9
		makeAgent("unlimited", AgentAvailable, nil, 5, 0), // uncapped: score 1.0
	}
	got, err := r.Route(&Task{ID: "t1", Priority: PriorityLow}, pool)
	require.NoError(t, err)
	assert.Equal(t, "unlimited", got)
}
// --- Critical priority ---
// TestDefaultRouter_Route_Good_CriticalPicksLeastLoaded verifies that
// critical-priority tasks go to the agent with the smallest absolute
// CurrentLoad rather than the best load ratio.
func TestDefaultRouter_Route_Good_CriticalPicksLeastLoaded(t *testing.T) {
	r := NewDefaultRouter()
	pool := []AgentInfo{
		makeAgent("agent-a", AgentAvailable, nil, 4, 10),
		makeAgent("agent-b", AgentAvailable, nil, 1, 5), // smallest CurrentLoad
		makeAgent("agent-c", AgentAvailable, nil, 2, 10),
	}
	got, err := r.Route(&Task{ID: "t1", Priority: PriorityCritical}, pool)
	require.NoError(t, err)
	assert.Equal(t, "agent-b", got)
}
// --- Tie-breaking ---
// TestDefaultRouter_Route_Good_TieBreakByID verifies that identical scores
// are broken deterministically by lexicographic agent ID.
func TestDefaultRouter_Route_Good_TieBreakByID(t *testing.T) {
	r := NewDefaultRouter()
	pool := []AgentInfo{
		makeAgent("charlie", AgentAvailable, nil, 0, 5),
		makeAgent("alpha", AgentAvailable, nil, 0, 5),
		makeAgent("bravo", AgentAvailable, nil, 0, 5),
	}
	got, err := r.Route(&Task{ID: "t1", Priority: PriorityMedium}, pool)
	require.NoError(t, err)
	assert.Equal(t, "alpha", got)
}
// TestDefaultRouter_Route_Good_CriticalTieBreakByID verifies that the
// critical-priority path also breaks load ties by lexicographic agent ID.
func TestDefaultRouter_Route_Good_CriticalTieBreakByID(t *testing.T) {
	r := NewDefaultRouter()
	pool := []AgentInfo{
		makeAgent("charlie", AgentAvailable, nil, 0, 5),
		makeAgent("alpha", AgentAvailable, nil, 0, 5),
		makeAgent("bravo", AgentAvailable, nil, 0, 5),
	}
	got, err := r.Route(&Task{ID: "t1", Priority: PriorityCritical}, pool)
	require.NoError(t, err)
	assert.Equal(t, "alpha", got)
}
// --- Mixed scenarios ---
// TestDefaultRouter_Route_Good_MixedStatusAndCapabilities exercises filtering
// and scoring together: offline, wrong-capability, and at-capacity agents are
// all excluded, and the best-scoring survivor wins.
func TestDefaultRouter_Route_Good_MixedStatusAndCapabilities(t *testing.T) {
	r := NewDefaultRouter()
	pool := []AgentInfo{
		makeAgent("offline-go", AgentOffline, []string{"go"}, 0, 5),
		makeAgent("busy-py", AgentBusy, []string{"python"}, 1, 5),
		makeAgent("busy-go-full", AgentBusy, []string{"go"}, 5, 5), // no remaining capacity
		makeAgent("busy-go-room", AgentBusy, []string{"go"}, 2, 5), // score 1.0 - 2/5 = 0.6
		makeAgent("avail-go", AgentAvailable, []string{"go"}, 1, 5), // score 1.0 - 1/5 = 0.8
	}
	got, err := r.Route(&Task{ID: "t1", Labels: []string{"go"}, Priority: PriorityHigh}, pool)
	require.NoError(t, err)
	assert.Equal(t, "avail-go", got)
}