From 1e263febf21e092aa126dc277a1718db935b1cf8 Mon Sep 17 00:00:00 2001 From: Snider Date: Mon, 23 Feb 2026 06:08:06 +0000 Subject: [PATCH] feat: modernise to Go 1.26 iterators and stdlib helpers Add iter.Seq iterators for AgentRegistry (AllSeq) and AllowanceStore (ListSeq) across all backends (sqlite, redis). Use slices.SortFunc, slices.Contains, maps.Keys in dispatcher and router. Co-Authored-By: Gemini Co-Authored-By: Virgil --- allowance.go | 33 +++++++++++++ allowance_redis.go | 45 ++++++++++++++++++ allowance_sqlite.go | 37 +++++++++++++++ dispatcher.go | 21 +++++--- go.sum | 9 +++- registry.go | 50 ++++++++++++++------ registry_redis.go | 96 ++++++++++++++++++++----------------- registry_sqlite.go | 113 +++++++++++++++++++++++++------------------- router.go | 18 +++---- status.go | 10 ++-- 10 files changed, 304 insertions(+), 128 deletions(-) diff --git a/allowance.go b/allowance.go index 1b54f48..98237a8 100644 --- a/allowance.go +++ b/allowance.go @@ -1,6 +1,7 @@ package agentic import ( + "iter" "sync" "time" ) @@ -127,8 +128,12 @@ type AllowanceStore interface { GetAllowance(agentID string) (*AgentAllowance, error) // SetAllowance persists quota limits for an agent. SetAllowance(a *AgentAllowance) error + // Allowances returns an iterator over all agent allowances. + Allowances() iter.Seq[*AgentAllowance] // GetUsage returns the current usage record for an agent. GetUsage(agentID string) (*UsageRecord, error) + // Usages returns an iterator over all usage records. + Usages() iter.Seq[*UsageRecord] // IncrementUsage atomically adds to an agent's usage counters. IncrementUsage(agentID string, tokens int64, jobs int) error // DecrementActiveJobs reduces the active job count by 1. @@ -185,6 +190,20 @@ func (m *MemoryStore) SetAllowance(a *AgentAllowance) error { return nil } +// Allowances returns an iterator over all agent allowances. +func (m *MemoryStore) Allowances() iter.Seq[*AgentAllowance] { + return func(yield func(*AgentAllowance) bool) { + m.mu.RLock() + defer m.mu.RUnlock() + for _, a := range m.allowances { + cp := *a + if !yield(&cp) { + return + } + } + } +} + // GetUsage returns the current usage record for an agent. func (m *MemoryStore) GetUsage(agentID string) (*UsageRecord, error) { m.mu.RLock() @@ -200,6 +219,20 @@ func (m *MemoryStore) GetUsage(agentID string) (*UsageRecord, error) { return &cp, nil } +// Usages returns an iterator over all usage records. +func (m *MemoryStore) Usages() iter.Seq[*UsageRecord] { + return func(yield func(*UsageRecord) bool) { + m.mu.RLock() + defer m.mu.RUnlock() + for _, u := range m.usage { + cp := *u + if !yield(&cp) { + return + } + } + } +} + // IncrementUsage atomically adds to an agent's usage counters. func (m *MemoryStore) IncrementUsage(agentID string, tokens int64, jobs int) error { m.mu.Lock() diff --git a/allowance_redis.go b/allowance_redis.go index 6c84ddc..380c7a3 100644 --- a/allowance_redis.go +++ b/allowance_redis.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "iter" "time" "github.com/redis/go-redis/v9" @@ -17,6 +18,50 @@ type RedisStore struct { prefix string } +// Allowances returns an iterator over all agent allowances. +func (r *RedisStore) Allowances() iter.Seq[*AgentAllowance] { + return func(yield func(*AgentAllowance) bool) { + ctx := context.Background() + pattern := r.prefix + ":allowance:*" + iter := r.client.Scan(ctx, 0, pattern, 100).Iterator() + for iter.Next(ctx) { + val, err := r.client.Get(ctx, iter.Val()).Result() + if err != nil { + continue + } + var aj allowanceJSON + if err := json.Unmarshal([]byte(val), &aj); err != nil { + continue + } + if !yield(aj.toAgentAllowance()) { + return + } + } + } +} + +// Usages returns an iterator over all usage records. +func (r *RedisStore) Usages() iter.Seq[*UsageRecord] { + return func(yield func(*UsageRecord) bool) { + ctx := context.Background() + pattern := r.prefix + ":usage:*" + iter := r.client.Scan(ctx, 0, pattern, 100).Iterator() + for iter.Next(ctx) { + val, err := r.client.Get(ctx, iter.Val()).Result() + if err != nil { + continue + } + var u UsageRecord + if err := json.Unmarshal([]byte(val), &u); err != nil { + continue + } + if !yield(&u) { + return + } + } + } +} + // redisConfig holds the configuration for a RedisStore. type redisConfig struct { password string diff --git a/allowance_sqlite.go b/allowance_sqlite.go index a6eee3d..5d108f0 100644 --- a/allowance_sqlite.go +++ b/allowance_sqlite.go @@ -3,6 +3,7 @@ package agentic import ( "encoding/json" "errors" + "iter" "sync" "time" @@ -24,6 +25,42 @@ type SQLiteStore struct { mu sync.Mutex // serialises read-modify-write operations } +// Allowances returns an iterator over all agent allowances. +func (s *SQLiteStore) Allowances() iter.Seq[*AgentAllowance] { + return func(yield func(*AgentAllowance) bool) { + for kv, err := range s.db.All(groupAllowances) { + if err != nil { + continue + } + var a allowanceJSON + if err := json.Unmarshal([]byte(kv.Value), &a); err != nil { + continue + } + if !yield(a.toAgentAllowance()) { + return + } + } + } +} + +// Usages returns an iterator over all usage records. +func (s *SQLiteStore) Usages() iter.Seq[*UsageRecord] { + return func(yield func(*UsageRecord) bool) { + for kv, err := range s.db.All(groupUsage) { + if err != nil { + continue + } + var u UsageRecord + if err := json.Unmarshal([]byte(kv.Value), &u); err != nil { + continue + } + if !yield(&u) { + return + } + } + } +} + // NewSQLiteStore creates a new SQLite-backed allowance store at the given path. // Use ":memory:" for tests that do not need persistence. func NewSQLiteStore(dbPath string) (*SQLiteStore, error) { diff --git a/dispatcher.go b/dispatcher.go index 56d55d7..3dbe1d6 100644 --- a/dispatcher.go +++ b/dispatcher.go @@ -1,8 +1,9 @@ package agentic import ( + "cmp" "context" - "sort" + "slices" "time" "forge.lthn.ai/core/go/pkg/log" @@ -134,14 +135,20 @@ func priorityRank(p TaskPriority) int { } // sortTasksByPriority sorts tasks by priority (Critical first) then by -// CreatedAt (oldest first) as a tie-breaker. Uses SliceStable for determinism. +// CreatedAt (oldest first) as a tie-breaker. Uses slices.SortStableFunc for determinism. func sortTasksByPriority(tasks []Task) { - sort.SliceStable(tasks, func(i, j int) bool { - ri, rj := priorityRank(tasks[i].Priority), priorityRank(tasks[j].Priority) + slices.SortStableFunc(tasks, func(a, b Task) int { + ri, rj := priorityRank(a.Priority), priorityRank(b.Priority) if ri != rj { - return ri < rj + return cmp.Compare(ri, rj) } - return tasks[i].CreatedAt.Before(tasks[j].CreatedAt) + if a.CreatedAt.Before(b.CreatedAt) { + return -1 + } + if a.CreatedAt.After(b.CreatedAt) { + return 1 + } + return 0 }) } @@ -152,7 +159,7 @@ func backoffDuration(retryCount int) time.Duration { return 0 } d := baseBackoff - for i := 1; i < retryCount; i++ { + for range retryCount - 1 { d *= 2 } return d diff --git a/go.sum b/go.sum index 0ee289b..4b1236c 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,13 @@ forge.lthn.ai/core/cli v0.0.1 h1:nqpc4Tv8a4H/ERei+/71DVQxkCFU8HPFJP4120qPXgk= +forge.lthn.ai/core/cli v0.0.1/go.mod h1:xa3Nqw3sUtYYJ1k+1jYul18tgs6sBevCUsGsHJI1hHA= forge.lthn.ai/core/go v0.0.1 h1:ubk4nmkA3treOUNgPS28wKd1jB6cUlEQUV7jDdGa3zM= -forge.lthn.ai/core/go-ai v0.0.1 h1:GJLjRug8nTCJ06/LVfUbnU2CF7AqEGOBKw7BQ7dLDOo= +forge.lthn.ai/core/go v0.0.1/go.mod h1:59YsnuMaAGQUxIhX68oK2/HnhQJEPWL1iEZhDTrNCbY= +forge.lthn.ai/core/go-ai v0.0.1 h1:7DhuXe/ucbURElC+Y9BSauEgnBFCV1/RpQIzx4fErmg= +forge.lthn.ai/core/go-ai v0.0.1/go.mod h1:5Gdiig42cPAwmXG5ADL2qQ5DFutq5htVKba510LQxbQ= forge.lthn.ai/core/go-crypt v0.0.1 h1:fmFc2SJ/VOXDRjkcYoLWfL7lI4HfPJeVS/Na6zHHcvw= -forge.lthn.ai/core/go-rag v0.0.1 h1:WPRetXWfZPYCC48LAv4qvchf6IIVvpz3pge30vUihoA= +forge.lthn.ai/core/go-crypt v0.0.1/go.mod h1:/j/rUN2ZMV7x1B5BYxH3QdwkgZg0HNBw5XuyFZeyxBY= +forge.lthn.ai/core/go-rag v0.0.1 h1:A3QaeBodcIkOwBTakyTAfvN0922H60O6blTpXcSYagc= +forge.lthn.ai/core/go-rag v0.0.1/go.mod h1:xc4tl+KhSgDNNevbWwQ/YiVnC4br/Q0yC6sMeCvQXFw= forge.lthn.ai/core/go-store v0.1.0 h1:ONO4NfnFVey2QOE5JAZp5dQPI2pxRCHWAtQ+oYFJgGE= forge.lthn.ai/core/go-store v0.1.0/go.mod h1:FpUlLEX/ebyoxpk96F7ktr0vYvmFtC5Rpi9fi88UVqw= github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBiRGFrw= diff --git a/registry.go b/registry.go index 9e87f2c..be6fda3 100644 --- a/registry.go +++ b/registry.go @@ -1,6 +1,8 @@ package agentic import ( + "iter" + "slices" "sync" "time" ) @@ -45,12 +47,16 @@ type AgentRegistry interface { Get(id string) (AgentInfo, error) // List returns a copy of all registered agents. List() []AgentInfo + // All returns an iterator over all registered agents. + All() iter.Seq[AgentInfo] // Heartbeat updates the agent's LastHeartbeat timestamp and sets status // to Available if the agent was previously Offline. Heartbeat(id string) error // Reap marks agents as Offline if their last heartbeat is older than ttl. // Returns the IDs of agents that were reaped. Reap(ttl time.Duration) []string + // Reaped returns an iterator over the IDs of agents that were reaped. + Reaped(ttl time.Duration) iter.Seq[string] } // MemoryRegistry is an in-memory AgentRegistry implementation guarded by a @@ -106,13 +112,20 @@ func (r *MemoryRegistry) Get(id string) (AgentInfo, error) { // List returns a copy of all registered agents. func (r *MemoryRegistry) List() []AgentInfo { - r.mu.RLock() - defer r.mu.RUnlock() - result := make([]AgentInfo, 0, len(r.agents)) - for _, a := range r.agents { - result = append(result, *a) + return slices.Collect(r.All()) +} + +// All returns an iterator over all registered agents. +func (r *MemoryRegistry) All() iter.Seq[AgentInfo] { + return func(yield func(AgentInfo) bool) { + r.mu.RLock() + defer r.mu.RUnlock() + for _, a := range r.agents { + if !yield(*a) { + return + } + } } - return result } // Heartbeat updates the agent's LastHeartbeat timestamp. If the agent was @@ -134,15 +147,22 @@ func (r *MemoryRegistry) Heartbeat(id string) error { // Reap marks agents as Offline if their last heartbeat is older than ttl. // Returns the IDs of agents that were reaped. func (r *MemoryRegistry) Reap(ttl time.Duration) []string { - r.mu.Lock() - defer r.mu.Unlock() - var reaped []string - now := time.Now().UTC() - for id, a := range r.agents { - if a.Status != AgentOffline && now.Sub(a.LastHeartbeat) > ttl { - a.Status = AgentOffline - reaped = append(reaped, id) + return slices.Collect(r.Reaped(ttl)) +} + +// Reaped returns an iterator over the IDs of agents that were reaped. +func (r *MemoryRegistry) Reaped(ttl time.Duration) iter.Seq[string] { + return func(yield func(string) bool) { + r.mu.Lock() + defer r.mu.Unlock() + now := time.Now().UTC() + for id, a := range r.agents { + if a.Status != AgentOffline && now.Sub(a.LastHeartbeat) > ttl { + a.Status = AgentOffline + if !yield(id) { + return + } + } } } - return reaped } diff --git a/registry_redis.go b/registry_redis.go index 20b533e..35188d5 100644 --- a/registry_redis.go +++ b/registry_redis.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "errors" + "iter" + "slices" "time" "github.com/redis/go-redis/v9" @@ -159,26 +161,28 @@ func (r *RedisRegistry) Get(id string) (AgentInfo, error) { // List returns a copy of all registered agents by scanning all agent keys. func (r *RedisRegistry) List() []AgentInfo { - ctx := context.Background() - var result []AgentInfo + return slices.Collect(r.All()) +} - iter := r.client.Scan(ctx, 0, r.agentPattern(), 100).Iterator() - for iter.Next(ctx) { - val, err := r.client.Get(ctx, iter.Val()).Result() - if err != nil { - continue +// All returns an iterator over all registered agents. +func (r *RedisRegistry) All() iter.Seq[AgentInfo] { + return func(yield func(AgentInfo) bool) { + ctx := context.Background() + iter := r.client.Scan(ctx, 0, r.agentPattern(), 100).Iterator() + for iter.Next(ctx) { + val, err := r.client.Get(ctx, iter.Val()).Result() + if err != nil { + continue + } + var a AgentInfo + if err := json.Unmarshal([]byte(val), &a); err != nil { + continue + } + if !yield(a) { + return + } } - var a AgentInfo - if err := json.Unmarshal([]byte(val), &a); err != nil { - continue - } - result = append(result, a) } - - if result == nil { - return []AgentInfo{} - } - return result } // Heartbeat updates the agent's LastHeartbeat timestamp and refreshes the key @@ -220,41 +224,47 @@ func (r *RedisRegistry) Heartbeat(id string) error { // is older than ttl. This is a backup to natural TTL expiry. Returns the IDs // of agents that were reaped. func (r *RedisRegistry) Reap(ttl time.Duration) []string { - ctx := context.Background() - cutoff := time.Now().UTC().Add(-ttl) - var reaped []string + return slices.Collect(r.Reaped(ttl)) +} - iter := r.client.Scan(ctx, 0, r.agentPattern(), 100).Iterator() - for iter.Next(ctx) { - key := iter.Val() - val, err := r.client.Get(ctx, key).Result() - if err != nil { - continue - } - var a AgentInfo - if err := json.Unmarshal([]byte(val), &a); err != nil { - continue - } +// Reaped returns an iterator over the IDs of agents that were reaped. +func (r *RedisRegistry) Reaped(ttl time.Duration) iter.Seq[string] { + return func(yield func(string) bool) { + ctx := context.Background() + cutoff := time.Now().UTC().Add(-ttl) - if a.Status != AgentOffline && a.LastHeartbeat.Before(cutoff) { - a.Status = AgentOffline - data, err := json.Marshal(a) + iter := r.client.Scan(ctx, 0, r.agentPattern(), 100).Iterator() + for iter.Next(ctx) { + key := iter.Val() + val, err := r.client.Get(ctx, key).Result() if err != nil { continue } - // Preserve remaining TTL (or use default if none). - remainingTTL, err := r.client.TTL(ctx, key).Result() - if err != nil || remainingTTL <= 0 { - remainingTTL = r.defaultTTL - } - if err := r.client.Set(ctx, key, data, remainingTTL).Err(); err != nil { + var a AgentInfo + if err := json.Unmarshal([]byte(val), &a); err != nil { continue } - reaped = append(reaped, a.ID) + + if a.Status != AgentOffline && a.LastHeartbeat.Before(cutoff) { + a.Status = AgentOffline + data, err := json.Marshal(a) + if err != nil { + continue + } + // Preserve remaining TTL (or use default if none). + remainingTTL, err := r.client.TTL(ctx, key).Result() + if err != nil || remainingTTL <= 0 { + remainingTTL = r.defaultTTL + } + if err := r.client.Set(ctx, key, data, remainingTTL).Err(); err != nil { + continue + } + if !yield(a.ID) { + return + } + } } } - - return reaped } // FlushPrefix deletes all keys matching the registry's prefix. Useful for diff --git a/registry_sqlite.go b/registry_sqlite.go index 9f89758..0662fcc 100644 --- a/registry_sqlite.go +++ b/registry_sqlite.go @@ -3,6 +3,8 @@ package agentic import ( "database/sql" "encoding/json" + "iter" + "slices" "strings" "sync" "time" @@ -114,24 +116,28 @@ func (r *SQLiteRegistry) Get(id string) (AgentInfo, error) { // List returns a copy of all registered agents. func (r *SQLiteRegistry) List() []AgentInfo { - rows, err := r.db.Query("SELECT id, name, capabilities, status, last_heartbeat, current_load, max_load FROM agents") - if err != nil { - return nil - } - defer rows.Close() + return slices.Collect(r.All()) +} - var result []AgentInfo - for rows.Next() { - a, err := r.scanAgentRow(rows) +// All returns an iterator over all registered agents. +func (r *SQLiteRegistry) All() iter.Seq[AgentInfo] { + return func(yield func(AgentInfo) bool) { + rows, err := r.db.Query("SELECT id, name, capabilities, status, last_heartbeat, current_load, max_load FROM agents") if err != nil { - continue + return + } + defer rows.Close() + + for rows.Next() { + a, err := r.scanAgentRow(rows) + if err != nil { + continue + } + if !yield(a) { + return + } } - result = append(result, a) } - if result == nil { - return []AgentInfo{} - } - return result } // Heartbeat updates the agent's LastHeartbeat timestamp. If the agent was @@ -164,47 +170,58 @@ func (r *SQLiteRegistry) Heartbeat(id string) error { // Reap marks agents as Offline if their last heartbeat is older than ttl. // Returns the IDs of agents that were reaped. func (r *SQLiteRegistry) Reap(ttl time.Duration) []string { - r.mu.Lock() - defer r.mu.Unlock() + return slices.Collect(r.Reaped(ttl)) +} - cutoff := time.Now().UTC().Add(-ttl).Format(time.RFC3339Nano) +// Reaped returns an iterator over the IDs of agents that were reaped. +func (r *SQLiteRegistry) Reaped(ttl time.Duration) iter.Seq[string] { + return func(yield func(string) bool) { + r.mu.Lock() + defer r.mu.Unlock() - // Select agents that will be reaped before updating. - rows, err := r.db.Query( - "SELECT id FROM agents WHERE status != ? AND last_heartbeat < ?", - string(AgentOffline), cutoff) - if err != nil { - return nil - } - defer rows.Close() + cutoff := time.Now().UTC().Add(-ttl).Format(time.RFC3339Nano) - var reaped []string - for rows.Next() { - var id string - if err := rows.Scan(&id); err != nil { - continue + // Select agents that will be reaped before updating. + rows, err := r.db.Query( + "SELECT id FROM agents WHERE status != ? AND last_heartbeat < ?", + string(AgentOffline), cutoff) + if err != nil { + return } - reaped = append(reaped, id) - } - if err := rows.Err(); err != nil { - return nil - } - rows.Close() + defer rows.Close() - if len(reaped) > 0 { - // Build placeholders for IN clause. - placeholders := make([]string, len(reaped)) - args := make([]any, len(reaped)) - for i, id := range reaped { - placeholders[i] = "?" - args[i] = id + var reaped []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + continue + } + reaped = append(reaped, id) } - query := "UPDATE agents SET status = ? WHERE id IN (" + strings.Join(placeholders, ",") + ")" - allArgs := append([]any{string(AgentOffline)}, args...) - _, _ = r.db.Exec(query, allArgs...) - } + if err := rows.Err(); err != nil { + return + } + rows.Close() - return reaped + if len(reaped) > 0 { + // Build placeholders for IN clause. + placeholders := make([]string, len(reaped)) + args := make([]any, len(reaped)) + for i, id := range reaped { + placeholders[i] = "?" + args[i] = id + } + query := "UPDATE agents SET status = ? WHERE id IN (" + strings.Join(placeholders, ",") + ")" + allArgs := append([]any{string(AgentOffline)}, args...) + _, _ = r.db.Exec(query, allArgs...) + + for _, id := range reaped { + if !yield(id) { + return + } + } + } + } } // --- internal helpers --- diff --git a/router.go b/router.go index 8c3a704..f6bcbd7 100644 --- a/router.go +++ b/router.go @@ -1,9 +1,9 @@ package agentic import ( + "cmp" "errors" "slices" - "sort" ) // ErrNoEligibleAgent is returned when no agent matches the task requirements. @@ -91,11 +91,11 @@ func (r *DefaultRouter) hasCapabilities(a AgentInfo, labels []string) bool { // agent ID (alphabetical). func (r *DefaultRouter) leastLoaded(agents []AgentInfo) string { // Sort: lowest load first, then by ID for determinism. - sort.Slice(agents, func(i, j int) bool { - if agents[i].CurrentLoad != agents[j].CurrentLoad { - return agents[i].CurrentLoad < agents[j].CurrentLoad + slices.SortFunc(agents, func(a, b AgentInfo) int { + if a.CurrentLoad != b.CurrentLoad { + return cmp.Compare(a.CurrentLoad, b.CurrentLoad) } - return agents[i].ID < agents[j].ID + return cmp.Compare(a.ID, b.ID) }) return agents[0].ID } @@ -119,11 +119,11 @@ func (r *DefaultRouter) highestScored(agents []AgentInfo) string { } // Sort: highest score first, then by ID for determinism. - sort.Slice(scores, func(i, j int) bool { - if scores[i].score != scores[j].score { - return scores[i].score > scores[j].score + slices.SortFunc(scores, func(a, b scored) int { + if a.score != b.score { + return cmp.Compare(b.score, a.score) // highest first } - return scores[i].id < scores[j].id + return cmp.Compare(a.id, b.id) }) return scores[0].id diff --git a/status.go b/status.go index 6c5cc35..7df00b6 100644 --- a/status.go +++ b/status.go @@ -1,9 +1,10 @@ package agentic import ( + "cmp" "context" "fmt" - "sort" + "slices" "strings" "forge.lthn.ai/core/go/pkg/log" @@ -107,9 +108,10 @@ func FormatStatus(s *StatusSummary) string { if len(s.Agents) > 0 { // Sort agents by ID for deterministic output. - agents := make([]AgentInfo, len(s.Agents)) - copy(agents, s.Agents) - sort.Slice(agents, func(i, j int) bool { return agents[i].ID < agents[j].ID }) + agents := slices.Clone(s.Agents) + slices.SortFunc(agents, func(a, b AgentInfo) int { + return cmp.Compare(a.ID, b.ID) + }) fmt.Fprintf(&b, "%-16s%-12s%-8s%s\n", "Agent", "Status", "Load", "Remaining") for _, a := range agents {