diff --git a/allowance.go b/allowance.go index 1b54f48..98237a8 100644 --- a/allowance.go +++ b/allowance.go @@ -1,6 +1,7 @@ package agentic import ( + "iter" "sync" "time" ) @@ -127,8 +128,12 @@ type AllowanceStore interface { GetAllowance(agentID string) (*AgentAllowance, error) // SetAllowance persists quota limits for an agent. SetAllowance(a *AgentAllowance) error + // Allowances returns an iterator over all agent allowances. + Allowances() iter.Seq[*AgentAllowance] // GetUsage returns the current usage record for an agent. GetUsage(agentID string) (*UsageRecord, error) + // Usages returns an iterator over all usage records. + Usages() iter.Seq[*UsageRecord] // IncrementUsage atomically adds to an agent's usage counters. IncrementUsage(agentID string, tokens int64, jobs int) error // DecrementActiveJobs reduces the active job count by 1. @@ -185,6 +190,20 @@ func (m *MemoryStore) SetAllowance(a *AgentAllowance) error { return nil } +// Allowances returns an iterator over all agent allowances. +func (m *MemoryStore) Allowances() iter.Seq[*AgentAllowance] { + return func(yield func(*AgentAllowance) bool) { + m.mu.RLock() + defer m.mu.RUnlock() + for _, a := range m.allowances { + cp := *a + if !yield(&cp) { + return + } + } + } +} + // GetUsage returns the current usage record for an agent. func (m *MemoryStore) GetUsage(agentID string) (*UsageRecord, error) { m.mu.RLock() @@ -200,6 +219,20 @@ func (m *MemoryStore) GetUsage(agentID string) (*UsageRecord, error) { return &cp, nil } +// Usages returns an iterator over all usage records. +func (m *MemoryStore) Usages() iter.Seq[*UsageRecord] { + return func(yield func(*UsageRecord) bool) { + m.mu.RLock() + defer m.mu.RUnlock() + for _, u := range m.usage { + cp := *u + if !yield(&cp) { + return + } + } + } +} + // IncrementUsage atomically adds to an agent's usage counters. func (m *MemoryStore) IncrementUsage(agentID string, tokens int64, jobs int) error { m.mu.Lock() diff --git a/allowance_redis.go b/allowance_redis.go index 6c84ddc..380c7a3 100644 --- a/allowance_redis.go +++ b/allowance_redis.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "iter" "time" "github.com/redis/go-redis/v9" @@ -17,6 +18,50 @@ type RedisStore struct { prefix string } +// Allowances returns an iterator over all agent allowances. +func (r *RedisStore) Allowances() iter.Seq[*AgentAllowance] { + return func(yield func(*AgentAllowance) bool) { + ctx := context.Background() + pattern := r.prefix + ":allowance:*" + iter := r.client.Scan(ctx, 0, pattern, 100).Iterator() + for iter.Next(ctx) { + val, err := r.client.Get(ctx, iter.Val()).Result() + if err != nil { + continue + } + var aj allowanceJSON + if err := json.Unmarshal([]byte(val), &aj); err != nil { + continue + } + if !yield(aj.toAgentAllowance()) { + return + } + } + } +} + +// Usages returns an iterator over all usage records. +func (r *RedisStore) Usages() iter.Seq[*UsageRecord] { + return func(yield func(*UsageRecord) bool) { + ctx := context.Background() + pattern := r.prefix + ":usage:*" + iter := r.client.Scan(ctx, 0, pattern, 100).Iterator() + for iter.Next(ctx) { + val, err := r.client.Get(ctx, iter.Val()).Result() + if err != nil { + continue + } + var u UsageRecord + if err := json.Unmarshal([]byte(val), &u); err != nil { + continue + } + if !yield(&u) { + return + } + } + } +} + // redisConfig holds the configuration for a RedisStore. type redisConfig struct { password string diff --git a/allowance_sqlite.go b/allowance_sqlite.go index a6eee3d..5d108f0 100644 --- a/allowance_sqlite.go +++ b/allowance_sqlite.go @@ -3,6 +3,7 @@ package agentic import ( "encoding/json" "errors" + "iter" "sync" "time" @@ -24,6 +25,42 @@ type SQLiteStore struct { mu sync.Mutex // serialises read-modify-write operations } +// Allowances returns an iterator over all agent allowances. +func (s *SQLiteStore) Allowances() iter.Seq[*AgentAllowance] { + return func(yield func(*AgentAllowance) bool) { + for kv, err := range s.db.All(groupAllowances) { + if err != nil { + continue + } + var a allowanceJSON + if err := json.Unmarshal([]byte(kv.Value), &a); err != nil { + continue + } + if !yield(a.toAgentAllowance()) { + return + } + } + } +} + +// Usages returns an iterator over all usage records. +func (s *SQLiteStore) Usages() iter.Seq[*UsageRecord] { + return func(yield func(*UsageRecord) bool) { + for kv, err := range s.db.All(groupUsage) { + if err != nil { + continue + } + var u UsageRecord + if err := json.Unmarshal([]byte(kv.Value), &u); err != nil { + continue + } + if !yield(&u) { + return + } + } + } +} + // NewSQLiteStore creates a new SQLite-backed allowance store at the given path. // Use ":memory:" for tests that do not need persistence. func NewSQLiteStore(dbPath string) (*SQLiteStore, error) { diff --git a/dispatcher.go b/dispatcher.go index 56d55d7..3dbe1d6 100644 --- a/dispatcher.go +++ b/dispatcher.go @@ -1,8 +1,9 @@ package agentic import ( + "cmp" "context" - "sort" + "slices" "time" "forge.lthn.ai/core/go/pkg/log" @@ -134,14 +135,20 @@ func priorityRank(p TaskPriority) int { } // sortTasksByPriority sorts tasks by priority (Critical first) then by -// CreatedAt (oldest first) as a tie-breaker. Uses SliceStable for determinism. +// CreatedAt (oldest first) as a tie-breaker. Uses slices.SortStableFunc for determinism. func sortTasksByPriority(tasks []Task) { - sort.SliceStable(tasks, func(i, j int) bool { - ri, rj := priorityRank(tasks[i].Priority), priorityRank(tasks[j].Priority) + slices.SortStableFunc(tasks, func(a, b Task) int { + ri, rj := priorityRank(a.Priority), priorityRank(b.Priority) if ri != rj { - return ri < rj + return cmp.Compare(ri, rj) } - return tasks[i].CreatedAt.Before(tasks[j].CreatedAt) + if a.CreatedAt.Before(b.CreatedAt) { + return -1 + } + if a.CreatedAt.After(b.CreatedAt) { + return 1 + } + return 0 }) } @@ -152,7 +159,7 @@ func backoffDuration(retryCount int) time.Duration { return 0 } d := baseBackoff - for i := 1; i < retryCount; i++ { + for range retryCount - 1 { d *= 2 } return d diff --git a/go.sum b/go.sum index 0ee289b..4b1236c 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,13 @@ forge.lthn.ai/core/cli v0.0.1 h1:nqpc4Tv8a4H/ERei+/71DVQxkCFU8HPFJP4120qPXgk= +forge.lthn.ai/core/cli v0.0.1/go.mod h1:xa3Nqw3sUtYYJ1k+1jYul18tgs6sBevCUsGsHJI1hHA= forge.lthn.ai/core/go v0.0.1 h1:ubk4nmkA3treOUNgPS28wKd1jB6cUlEQUV7jDdGa3zM= -forge.lthn.ai/core/go-ai v0.0.1 h1:GJLjRug8nTCJ06/LVfUbnU2CF7AqEGOBKw7BQ7dLDOo= +forge.lthn.ai/core/go v0.0.1/go.mod h1:59YsnuMaAGQUxIhX68oK2/HnhQJEPWL1iEZhDTrNCbY= +forge.lthn.ai/core/go-ai v0.0.1 h1:7DhuXe/ucbURElC+Y9BSauEgnBFCV1/RpQIzx4fErmg= +forge.lthn.ai/core/go-ai v0.0.1/go.mod h1:5Gdiig42cPAwmXG5ADL2qQ5DFutq5htVKba510LQxbQ= forge.lthn.ai/core/go-crypt v0.0.1 h1:fmFc2SJ/VOXDRjkcYoLWfL7lI4HfPJeVS/Na6zHHcvw= -forge.lthn.ai/core/go-rag v0.0.1 h1:WPRetXWfZPYCC48LAv4qvchf6IIVvpz3pge30vUihoA= +forge.lthn.ai/core/go-crypt v0.0.1/go.mod h1:/j/rUN2ZMV7x1B5BYxH3QdwkgZg0HNBw5XuyFZeyxBY= +forge.lthn.ai/core/go-rag v0.0.1 h1:A3QaeBodcIkOwBTakyTAfvN0922H60O6blTpXcSYagc= +forge.lthn.ai/core/go-rag v0.0.1/go.mod h1:xc4tl+KhSgDNNevbWwQ/YiVnC4br/Q0yC6sMeCvQXFw= forge.lthn.ai/core/go-store v0.1.0 h1:ONO4NfnFVey2QOE5JAZp5dQPI2pxRCHWAtQ+oYFJgGE= forge.lthn.ai/core/go-store v0.1.0/go.mod h1:FpUlLEX/ebyoxpk96F7ktr0vYvmFtC5Rpi9fi88UVqw= github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBiRGFrw= diff --git a/registry.go b/registry.go index 9e87f2c..be6fda3 100644 --- a/registry.go +++ b/registry.go @@ -1,6 +1,8 @@ package agentic import ( + "iter" + "slices" "sync" "time" ) @@ -45,12 +47,16 @@ type AgentRegistry interface { Get(id string) (AgentInfo, error) // List returns a copy of all registered agents. List() []AgentInfo + // All returns an iterator over all registered agents. + All() iter.Seq[AgentInfo] // Heartbeat updates the agent's LastHeartbeat timestamp and sets status // to Available if the agent was previously Offline. Heartbeat(id string) error // Reap marks agents as Offline if their last heartbeat is older than ttl. // Returns the IDs of agents that were reaped. Reap(ttl time.Duration) []string + // Reaped returns an iterator over the IDs of agents that were reaped. + Reaped(ttl time.Duration) iter.Seq[string] } // MemoryRegistry is an in-memory AgentRegistry implementation guarded by a @@ -106,13 +112,20 @@ func (r *MemoryRegistry) Get(id string) (AgentInfo, error) { // List returns a copy of all registered agents. func (r *MemoryRegistry) List() []AgentInfo { - r.mu.RLock() - defer r.mu.RUnlock() - result := make([]AgentInfo, 0, len(r.agents)) - for _, a := range r.agents { - result = append(result, *a) + return slices.Collect(r.All()) +} + +// All returns an iterator over all registered agents. +func (r *MemoryRegistry) All() iter.Seq[AgentInfo] { + return func(yield func(AgentInfo) bool) { + r.mu.RLock() + defer r.mu.RUnlock() + for _, a := range r.agents { + if !yield(*a) { + return + } + } } - return result } // Heartbeat updates the agent's LastHeartbeat timestamp. If the agent was @@ -134,15 +147,22 @@ func (r *MemoryRegistry) Heartbeat(id string) error { // Reap marks agents as Offline if their last heartbeat is older than ttl. // Returns the IDs of agents that were reaped. func (r *MemoryRegistry) Reap(ttl time.Duration) []string { - r.mu.Lock() - defer r.mu.Unlock() - var reaped []string - now := time.Now().UTC() - for id, a := range r.agents { - if a.Status != AgentOffline && now.Sub(a.LastHeartbeat) > ttl { - a.Status = AgentOffline - reaped = append(reaped, id) + return slices.Collect(r.Reaped(ttl)) +} + +// Reaped returns an iterator over the IDs of agents that were reaped. +func (r *MemoryRegistry) Reaped(ttl time.Duration) iter.Seq[string] { + return func(yield func(string) bool) { + r.mu.Lock() + defer r.mu.Unlock() + now := time.Now().UTC() + for id, a := range r.agents { + if a.Status != AgentOffline && now.Sub(a.LastHeartbeat) > ttl { + a.Status = AgentOffline + if !yield(id) { + return + } + } } } - return reaped } diff --git a/registry_redis.go b/registry_redis.go index 20b533e..35188d5 100644 --- a/registry_redis.go +++ b/registry_redis.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "errors" + "iter" + "slices" "time" "github.com/redis/go-redis/v9" @@ -159,26 +161,28 @@ func (r *RedisRegistry) Get(id string) (AgentInfo, error) { // List returns a copy of all registered agents by scanning all agent keys. func (r *RedisRegistry) List() []AgentInfo { - ctx := context.Background() - var result []AgentInfo + return slices.Collect(r.All()) +} - iter := r.client.Scan(ctx, 0, r.agentPattern(), 100).Iterator() - for iter.Next(ctx) { - val, err := r.client.Get(ctx, iter.Val()).Result() - if err != nil { - continue +// All returns an iterator over all registered agents. +func (r *RedisRegistry) All() iter.Seq[AgentInfo] { + return func(yield func(AgentInfo) bool) { + ctx := context.Background() + iter := r.client.Scan(ctx, 0, r.agentPattern(), 100).Iterator() + for iter.Next(ctx) { + val, err := r.client.Get(ctx, iter.Val()).Result() + if err != nil { + continue + } + var a AgentInfo + if err := json.Unmarshal([]byte(val), &a); err != nil { + continue + } + if !yield(a) { + return + } } - var a AgentInfo - if err := json.Unmarshal([]byte(val), &a); err != nil { - continue - } - result = append(result, a) } - - if result == nil { - return []AgentInfo{} - } - return result } // Heartbeat updates the agent's LastHeartbeat timestamp and refreshes the key @@ -220,41 +224,47 @@ func (r *RedisRegistry) Heartbeat(id string) error { // is older than ttl. This is a backup to natural TTL expiry. Returns the IDs // of agents that were reaped. func (r *RedisRegistry) Reap(ttl time.Duration) []string { - ctx := context.Background() - cutoff := time.Now().UTC().Add(-ttl) - var reaped []string + return slices.Collect(r.Reaped(ttl)) +} - iter := r.client.Scan(ctx, 0, r.agentPattern(), 100).Iterator() - for iter.Next(ctx) { - key := iter.Val() - val, err := r.client.Get(ctx, key).Result() - if err != nil { - continue - } - var a AgentInfo - if err := json.Unmarshal([]byte(val), &a); err != nil { - continue - } +// Reaped returns an iterator over the IDs of agents that were reaped. +func (r *RedisRegistry) Reaped(ttl time.Duration) iter.Seq[string] { + return func(yield func(string) bool) { + ctx := context.Background() + cutoff := time.Now().UTC().Add(-ttl) - if a.Status != AgentOffline && a.LastHeartbeat.Before(cutoff) { - a.Status = AgentOffline - data, err := json.Marshal(a) + iter := r.client.Scan(ctx, 0, r.agentPattern(), 100).Iterator() + for iter.Next(ctx) { + key := iter.Val() + val, err := r.client.Get(ctx, key).Result() if err != nil { continue } - // Preserve remaining TTL (or use default if none). - remainingTTL, err := r.client.TTL(ctx, key).Result() - if err != nil || remainingTTL <= 0 { - remainingTTL = r.defaultTTL - } - if err := r.client.Set(ctx, key, data, remainingTTL).Err(); err != nil { + var a AgentInfo + if err := json.Unmarshal([]byte(val), &a); err != nil { continue } - reaped = append(reaped, a.ID) + + if a.Status != AgentOffline && a.LastHeartbeat.Before(cutoff) { + a.Status = AgentOffline + data, err := json.Marshal(a) + if err != nil { + continue + } + // Preserve remaining TTL (or use default if none). + remainingTTL, err := r.client.TTL(ctx, key).Result() + if err != nil || remainingTTL <= 0 { + remainingTTL = r.defaultTTL + } + if err := r.client.Set(ctx, key, data, remainingTTL).Err(); err != nil { + continue + } + if !yield(a.ID) { + return + } + } } } - - return reaped } // FlushPrefix deletes all keys matching the registry's prefix. Useful for diff --git a/registry_sqlite.go b/registry_sqlite.go index 9f89758..0662fcc 100644 --- a/registry_sqlite.go +++ b/registry_sqlite.go @@ -3,6 +3,8 @@ package agentic import ( "database/sql" "encoding/json" + "iter" + "slices" "strings" "sync" "time" @@ -114,24 +116,28 @@ func (r *SQLiteRegistry) Get(id string) (AgentInfo, error) { // List returns a copy of all registered agents. func (r *SQLiteRegistry) List() []AgentInfo { - rows, err := r.db.Query("SELECT id, name, capabilities, status, last_heartbeat, current_load, max_load FROM agents") - if err != nil { - return nil - } - defer rows.Close() + return slices.Collect(r.All()) +} - var result []AgentInfo - for rows.Next() { - a, err := r.scanAgentRow(rows) +// All returns an iterator over all registered agents. +func (r *SQLiteRegistry) All() iter.Seq[AgentInfo] { + return func(yield func(AgentInfo) bool) { + rows, err := r.db.Query("SELECT id, name, capabilities, status, last_heartbeat, current_load, max_load FROM agents") if err != nil { - continue + return + } + defer rows.Close() + + for rows.Next() { + a, err := r.scanAgentRow(rows) + if err != nil { + continue + } + if !yield(a) { + return + } } - result = append(result, a) } - if result == nil { - return []AgentInfo{} - } - return result } // Heartbeat updates the agent's LastHeartbeat timestamp. If the agent was @@ -164,47 +170,58 @@ func (r *SQLiteRegistry) Heartbeat(id string) error { // Reap marks agents as Offline if their last heartbeat is older than ttl. // Returns the IDs of agents that were reaped. func (r *SQLiteRegistry) Reap(ttl time.Duration) []string { - r.mu.Lock() - defer r.mu.Unlock() + return slices.Collect(r.Reaped(ttl)) +} - cutoff := time.Now().UTC().Add(-ttl).Format(time.RFC3339Nano) +// Reaped returns an iterator over the IDs of agents that were reaped. +func (r *SQLiteRegistry) Reaped(ttl time.Duration) iter.Seq[string] { + return func(yield func(string) bool) { + r.mu.Lock() + defer r.mu.Unlock() - // Select agents that will be reaped before updating. - rows, err := r.db.Query( - "SELECT id FROM agents WHERE status != ? AND last_heartbeat < ?", - string(AgentOffline), cutoff) - if err != nil { - return nil - } - defer rows.Close() + cutoff := time.Now().UTC().Add(-ttl).Format(time.RFC3339Nano) - var reaped []string - for rows.Next() { - var id string - if err := rows.Scan(&id); err != nil { - continue + // Select agents that will be reaped before updating. + rows, err := r.db.Query( + "SELECT id FROM agents WHERE status != ? AND last_heartbeat < ?", + string(AgentOffline), cutoff) + if err != nil { + return } - reaped = append(reaped, id) - } - if err := rows.Err(); err != nil { - return nil - } - rows.Close() + defer rows.Close() - if len(reaped) > 0 { - // Build placeholders for IN clause. - placeholders := make([]string, len(reaped)) - args := make([]any, len(reaped)) - for i, id := range reaped { - placeholders[i] = "?" - args[i] = id + var reaped []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + continue + } + reaped = append(reaped, id) } - query := "UPDATE agents SET status = ? WHERE id IN (" + strings.Join(placeholders, ",") + ")" - allArgs := append([]any{string(AgentOffline)}, args...) - _, _ = r.db.Exec(query, allArgs...) - } + if err := rows.Err(); err != nil { + return + } + rows.Close() - return reaped + if len(reaped) > 0 { + // Build placeholders for IN clause. + placeholders := make([]string, len(reaped)) + args := make([]any, len(reaped)) + for i, id := range reaped { + placeholders[i] = "?" + args[i] = id + } + query := "UPDATE agents SET status = ? WHERE id IN (" + strings.Join(placeholders, ",") + ")" + allArgs := append([]any{string(AgentOffline)}, args...) + _, _ = r.db.Exec(query, allArgs...) + + for _, id := range reaped { + if !yield(id) { + return + } + } + } + } } // --- internal helpers --- diff --git a/router.go b/router.go index 8c3a704..f6bcbd7 100644 --- a/router.go +++ b/router.go @@ -1,9 +1,9 @@ package agentic import ( + "cmp" "errors" "slices" - "sort" ) // ErrNoEligibleAgent is returned when no agent matches the task requirements. @@ -91,11 +91,11 @@ func (r *DefaultRouter) hasCapabilities(a AgentInfo, labels []string) bool { // agent ID (alphabetical). func (r *DefaultRouter) leastLoaded(agents []AgentInfo) string { // Sort: lowest load first, then by ID for determinism. - sort.Slice(agents, func(i, j int) bool { - if agents[i].CurrentLoad != agents[j].CurrentLoad { - return agents[i].CurrentLoad < agents[j].CurrentLoad + slices.SortFunc(agents, func(a, b AgentInfo) int { + if a.CurrentLoad != b.CurrentLoad { + return cmp.Compare(a.CurrentLoad, b.CurrentLoad) } - return agents[i].ID < agents[j].ID + return cmp.Compare(a.ID, b.ID) }) return agents[0].ID } @@ -119,11 +119,11 @@ func (r *DefaultRouter) highestScored(agents []AgentInfo) string { } // Sort: highest score first, then by ID for determinism. - sort.Slice(scores, func(i, j int) bool { - if scores[i].score != scores[j].score { - return scores[i].score > scores[j].score + slices.SortFunc(scores, func(a, b scored) int { + if a.score != b.score { + return cmp.Compare(b.score, a.score) // highest first } - return scores[i].id < scores[j].id + return cmp.Compare(a.id, b.id) }) return scores[0].id diff --git a/status.go b/status.go index 6c5cc35..7df00b6 100644 --- a/status.go +++ b/status.go @@ -1,9 +1,10 @@ package agentic import ( + "cmp" "context" "fmt" - "sort" + "slices" "strings" "forge.lthn.ai/core/go/pkg/log" @@ -107,9 +108,10 @@ func FormatStatus(s *StatusSummary) string { if len(s.Agents) > 0 { // Sort agents by ID for deterministic output. - agents := make([]AgentInfo, len(s.Agents)) - copy(agents, s.Agents) - sort.Slice(agents, func(i, j int) bool { return agents[i].ID < agents[j].ID }) + agents := slices.Clone(s.Agents) + slices.SortFunc(agents, func(a, b AgentInfo) int { + return cmp.Compare(a.ID, b.ID) + }) fmt.Fprintf(&b, "%-16s%-12s%-8s%s\n", "Agent", "Status", "Load", "Remaining") for _, a := range agents {