feat: modernise to Go 1.26 iterators and stdlib helpers
Add iter.Seq iterators for AgentRegistry (AllSeq) and AllowanceStore (ListSeq) across all backends (sqlite, redis). Use slices.SortFunc, slices.Contains, maps.Keys in dispatcher and router. Co-Authored-By: Gemini <noreply@google.com> Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
b2286b46d7
commit
1e263febf2
10 changed files with 304 additions and 128 deletions
33
allowance.go
33
allowance.go
|
|
@ -1,6 +1,7 @@
|
|||
package agentic
|
||||
|
||||
import (
|
||||
"iter"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
|
@ -127,8 +128,12 @@ type AllowanceStore interface {
|
|||
GetAllowance(agentID string) (*AgentAllowance, error)
|
||||
// SetAllowance persists quota limits for an agent.
|
||||
SetAllowance(a *AgentAllowance) error
|
||||
// Allowances returns an iterator over all agent allowances.
|
||||
Allowances() iter.Seq[*AgentAllowance]
|
||||
// GetUsage returns the current usage record for an agent.
|
||||
GetUsage(agentID string) (*UsageRecord, error)
|
||||
// Usages returns an iterator over all usage records.
|
||||
Usages() iter.Seq[*UsageRecord]
|
||||
// IncrementUsage atomically adds to an agent's usage counters.
|
||||
IncrementUsage(agentID string, tokens int64, jobs int) error
|
||||
// DecrementActiveJobs reduces the active job count by 1.
|
||||
|
|
@ -185,6 +190,20 @@ func (m *MemoryStore) SetAllowance(a *AgentAllowance) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Allowances returns an iterator over all agent allowances.
|
||||
func (m *MemoryStore) Allowances() iter.Seq[*AgentAllowance] {
|
||||
return func(yield func(*AgentAllowance) bool) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
for _, a := range m.allowances {
|
||||
cp := *a
|
||||
if !yield(&cp) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetUsage returns the current usage record for an agent.
|
||||
func (m *MemoryStore) GetUsage(agentID string) (*UsageRecord, error) {
|
||||
m.mu.RLock()
|
||||
|
|
@ -200,6 +219,20 @@ func (m *MemoryStore) GetUsage(agentID string) (*UsageRecord, error) {
|
|||
return &cp, nil
|
||||
}
|
||||
|
||||
// Usages returns an iterator over all usage records.
|
||||
func (m *MemoryStore) Usages() iter.Seq[*UsageRecord] {
|
||||
return func(yield func(*UsageRecord) bool) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
for _, u := range m.usage {
|
||||
cp := *u
|
||||
if !yield(&cp) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// IncrementUsage atomically adds to an agent's usage counters.
|
||||
func (m *MemoryStore) IncrementUsage(agentID string, tokens int64, jobs int) error {
|
||||
m.mu.Lock()
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"iter"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
|
@ -17,6 +18,50 @@ type RedisStore struct {
|
|||
prefix string
|
||||
}
|
||||
|
||||
// Allowances returns an iterator over all agent allowances.
|
||||
func (r *RedisStore) Allowances() iter.Seq[*AgentAllowance] {
|
||||
return func(yield func(*AgentAllowance) bool) {
|
||||
ctx := context.Background()
|
||||
pattern := r.prefix + ":allowance:*"
|
||||
iter := r.client.Scan(ctx, 0, pattern, 100).Iterator()
|
||||
for iter.Next(ctx) {
|
||||
val, err := r.client.Get(ctx, iter.Val()).Result()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var aj allowanceJSON
|
||||
if err := json.Unmarshal([]byte(val), &aj); err != nil {
|
||||
continue
|
||||
}
|
||||
if !yield(aj.toAgentAllowance()) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Usages returns an iterator over all usage records.
|
||||
func (r *RedisStore) Usages() iter.Seq[*UsageRecord] {
|
||||
return func(yield func(*UsageRecord) bool) {
|
||||
ctx := context.Background()
|
||||
pattern := r.prefix + ":usage:*"
|
||||
iter := r.client.Scan(ctx, 0, pattern, 100).Iterator()
|
||||
for iter.Next(ctx) {
|
||||
val, err := r.client.Get(ctx, iter.Val()).Result()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var u UsageRecord
|
||||
if err := json.Unmarshal([]byte(val), &u); err != nil {
|
||||
continue
|
||||
}
|
||||
if !yield(&u) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// redisConfig holds the configuration for a RedisStore.
|
||||
type redisConfig struct {
|
||||
password string
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package agentic
|
|||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"iter"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
|
@ -24,6 +25,42 @@ type SQLiteStore struct {
|
|||
mu sync.Mutex // serialises read-modify-write operations
|
||||
}
|
||||
|
||||
// Allowances returns an iterator over all agent allowances.
|
||||
func (s *SQLiteStore) Allowances() iter.Seq[*AgentAllowance] {
|
||||
return func(yield func(*AgentAllowance) bool) {
|
||||
for kv, err := range s.db.All(groupAllowances) {
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var a allowanceJSON
|
||||
if err := json.Unmarshal([]byte(kv.Value), &a); err != nil {
|
||||
continue
|
||||
}
|
||||
if !yield(a.toAgentAllowance()) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Usages returns an iterator over all usage records.
|
||||
func (s *SQLiteStore) Usages() iter.Seq[*UsageRecord] {
|
||||
return func(yield func(*UsageRecord) bool) {
|
||||
for kv, err := range s.db.All(groupUsage) {
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var u UsageRecord
|
||||
if err := json.Unmarshal([]byte(kv.Value), &u); err != nil {
|
||||
continue
|
||||
}
|
||||
if !yield(&u) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewSQLiteStore creates a new SQLite-backed allowance store at the given path.
|
||||
// Use ":memory:" for tests that do not need persistence.
|
||||
func NewSQLiteStore(dbPath string) (*SQLiteStore, error) {
|
||||
|
|
|
|||
|
|
@ -1,8 +1,9 @@
|
|||
package agentic
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"sort"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"forge.lthn.ai/core/go/pkg/log"
|
||||
|
|
@ -134,14 +135,20 @@ func priorityRank(p TaskPriority) int {
|
|||
}
|
||||
|
||||
// sortTasksByPriority sorts tasks by priority (Critical first) then by
|
||||
// CreatedAt (oldest first) as a tie-breaker. Uses SliceStable for determinism.
|
||||
// CreatedAt (oldest first) as a tie-breaker. Uses slices.SortStableFunc for determinism.
|
||||
func sortTasksByPriority(tasks []Task) {
|
||||
sort.SliceStable(tasks, func(i, j int) bool {
|
||||
ri, rj := priorityRank(tasks[i].Priority), priorityRank(tasks[j].Priority)
|
||||
slices.SortStableFunc(tasks, func(a, b Task) int {
|
||||
ri, rj := priorityRank(a.Priority), priorityRank(b.Priority)
|
||||
if ri != rj {
|
||||
return ri < rj
|
||||
return cmp.Compare(ri, rj)
|
||||
}
|
||||
return tasks[i].CreatedAt.Before(tasks[j].CreatedAt)
|
||||
if a.CreatedAt.Before(b.CreatedAt) {
|
||||
return -1
|
||||
}
|
||||
if a.CreatedAt.After(b.CreatedAt) {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -152,7 +159,7 @@ func backoffDuration(retryCount int) time.Duration {
|
|||
return 0
|
||||
}
|
||||
d := baseBackoff
|
||||
for i := 1; i < retryCount; i++ {
|
||||
for range retryCount - 1 {
|
||||
d *= 2
|
||||
}
|
||||
return d
|
||||
|
|
|
|||
9
go.sum
9
go.sum
|
|
@ -1,8 +1,13 @@
|
|||
forge.lthn.ai/core/cli v0.0.1 h1:nqpc4Tv8a4H/ERei+/71DVQxkCFU8HPFJP4120qPXgk=
|
||||
forge.lthn.ai/core/cli v0.0.1/go.mod h1:xa3Nqw3sUtYYJ1k+1jYul18tgs6sBevCUsGsHJI1hHA=
|
||||
forge.lthn.ai/core/go v0.0.1 h1:ubk4nmkA3treOUNgPS28wKd1jB6cUlEQUV7jDdGa3zM=
|
||||
forge.lthn.ai/core/go-ai v0.0.1 h1:GJLjRug8nTCJ06/LVfUbnU2CF7AqEGOBKw7BQ7dLDOo=
|
||||
forge.lthn.ai/core/go v0.0.1/go.mod h1:59YsnuMaAGQUxIhX68oK2/HnhQJEPWL1iEZhDTrNCbY=
|
||||
forge.lthn.ai/core/go-ai v0.0.1 h1:7DhuXe/ucbURElC+Y9BSauEgnBFCV1/RpQIzx4fErmg=
|
||||
forge.lthn.ai/core/go-ai v0.0.1/go.mod h1:5Gdiig42cPAwmXG5ADL2qQ5DFutq5htVKba510LQxbQ=
|
||||
forge.lthn.ai/core/go-crypt v0.0.1 h1:fmFc2SJ/VOXDRjkcYoLWfL7lI4HfPJeVS/Na6zHHcvw=
|
||||
forge.lthn.ai/core/go-rag v0.0.1 h1:WPRetXWfZPYCC48LAv4qvchf6IIVvpz3pge30vUihoA=
|
||||
forge.lthn.ai/core/go-crypt v0.0.1/go.mod h1:/j/rUN2ZMV7x1B5BYxH3QdwkgZg0HNBw5XuyFZeyxBY=
|
||||
forge.lthn.ai/core/go-rag v0.0.1 h1:A3QaeBodcIkOwBTakyTAfvN0922H60O6blTpXcSYagc=
|
||||
forge.lthn.ai/core/go-rag v0.0.1/go.mod h1:xc4tl+KhSgDNNevbWwQ/YiVnC4br/Q0yC6sMeCvQXFw=
|
||||
forge.lthn.ai/core/go-store v0.1.0 h1:ONO4NfnFVey2QOE5JAZp5dQPI2pxRCHWAtQ+oYFJgGE=
|
||||
forge.lthn.ai/core/go-store v0.1.0/go.mod h1:FpUlLEX/ebyoxpk96F7ktr0vYvmFtC5Rpi9fi88UVqw=
|
||||
github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBiRGFrw=
|
||||
|
|
|
|||
50
registry.go
50
registry.go
|
|
@ -1,6 +1,8 @@
|
|||
package agentic
|
||||
|
||||
import (
|
||||
"iter"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
|
@ -45,12 +47,16 @@ type AgentRegistry interface {
|
|||
Get(id string) (AgentInfo, error)
|
||||
// List returns a copy of all registered agents.
|
||||
List() []AgentInfo
|
||||
// All returns an iterator over all registered agents.
|
||||
All() iter.Seq[AgentInfo]
|
||||
// Heartbeat updates the agent's LastHeartbeat timestamp and sets status
|
||||
// to Available if the agent was previously Offline.
|
||||
Heartbeat(id string) error
|
||||
// Reap marks agents as Offline if their last heartbeat is older than ttl.
|
||||
// Returns the IDs of agents that were reaped.
|
||||
Reap(ttl time.Duration) []string
|
||||
// Reaped returns an iterator over the IDs of agents that were reaped.
|
||||
Reaped(ttl time.Duration) iter.Seq[string]
|
||||
}
|
||||
|
||||
// MemoryRegistry is an in-memory AgentRegistry implementation guarded by a
|
||||
|
|
@ -106,13 +112,20 @@ func (r *MemoryRegistry) Get(id string) (AgentInfo, error) {
|
|||
|
||||
// List returns a copy of all registered agents.
|
||||
func (r *MemoryRegistry) List() []AgentInfo {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
result := make([]AgentInfo, 0, len(r.agents))
|
||||
for _, a := range r.agents {
|
||||
result = append(result, *a)
|
||||
return slices.Collect(r.All())
|
||||
}
|
||||
|
||||
// All returns an iterator over all registered agents.
|
||||
func (r *MemoryRegistry) All() iter.Seq[AgentInfo] {
|
||||
return func(yield func(AgentInfo) bool) {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
for _, a := range r.agents {
|
||||
if !yield(*a) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Heartbeat updates the agent's LastHeartbeat timestamp. If the agent was
|
||||
|
|
@ -134,15 +147,22 @@ func (r *MemoryRegistry) Heartbeat(id string) error {
|
|||
// Reap marks agents as Offline if their last heartbeat is older than ttl.
|
||||
// Returns the IDs of agents that were reaped.
|
||||
func (r *MemoryRegistry) Reap(ttl time.Duration) []string {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
var reaped []string
|
||||
now := time.Now().UTC()
|
||||
for id, a := range r.agents {
|
||||
if a.Status != AgentOffline && now.Sub(a.LastHeartbeat) > ttl {
|
||||
a.Status = AgentOffline
|
||||
reaped = append(reaped, id)
|
||||
return slices.Collect(r.Reaped(ttl))
|
||||
}
|
||||
|
||||
// Reaped returns an iterator over the IDs of agents that were reaped.
|
||||
func (r *MemoryRegistry) Reaped(ttl time.Duration) iter.Seq[string] {
|
||||
return func(yield func(string) bool) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
now := time.Now().UTC()
|
||||
for id, a := range r.agents {
|
||||
if a.Status != AgentOffline && now.Sub(a.LastHeartbeat) > ttl {
|
||||
a.Status = AgentOffline
|
||||
if !yield(id) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return reaped
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"iter"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
|
@ -159,26 +161,28 @@ func (r *RedisRegistry) Get(id string) (AgentInfo, error) {
|
|||
|
||||
// List returns a copy of all registered agents by scanning all agent keys.
|
||||
func (r *RedisRegistry) List() []AgentInfo {
|
||||
ctx := context.Background()
|
||||
var result []AgentInfo
|
||||
return slices.Collect(r.All())
|
||||
}
|
||||
|
||||
iter := r.client.Scan(ctx, 0, r.agentPattern(), 100).Iterator()
|
||||
for iter.Next(ctx) {
|
||||
val, err := r.client.Get(ctx, iter.Val()).Result()
|
||||
if err != nil {
|
||||
continue
|
||||
// All returns an iterator over all registered agents.
|
||||
func (r *RedisRegistry) All() iter.Seq[AgentInfo] {
|
||||
return func(yield func(AgentInfo) bool) {
|
||||
ctx := context.Background()
|
||||
iter := r.client.Scan(ctx, 0, r.agentPattern(), 100).Iterator()
|
||||
for iter.Next(ctx) {
|
||||
val, err := r.client.Get(ctx, iter.Val()).Result()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var a AgentInfo
|
||||
if err := json.Unmarshal([]byte(val), &a); err != nil {
|
||||
continue
|
||||
}
|
||||
if !yield(a) {
|
||||
return
|
||||
}
|
||||
}
|
||||
var a AgentInfo
|
||||
if err := json.Unmarshal([]byte(val), &a); err != nil {
|
||||
continue
|
||||
}
|
||||
result = append(result, a)
|
||||
}
|
||||
|
||||
if result == nil {
|
||||
return []AgentInfo{}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Heartbeat updates the agent's LastHeartbeat timestamp and refreshes the key
|
||||
|
|
@ -220,41 +224,47 @@ func (r *RedisRegistry) Heartbeat(id string) error {
|
|||
// is older than ttl. This is a backup to natural TTL expiry. Returns the IDs
|
||||
// of agents that were reaped.
|
||||
func (r *RedisRegistry) Reap(ttl time.Duration) []string {
|
||||
ctx := context.Background()
|
||||
cutoff := time.Now().UTC().Add(-ttl)
|
||||
var reaped []string
|
||||
return slices.Collect(r.Reaped(ttl))
|
||||
}
|
||||
|
||||
iter := r.client.Scan(ctx, 0, r.agentPattern(), 100).Iterator()
|
||||
for iter.Next(ctx) {
|
||||
key := iter.Val()
|
||||
val, err := r.client.Get(ctx, key).Result()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var a AgentInfo
|
||||
if err := json.Unmarshal([]byte(val), &a); err != nil {
|
||||
continue
|
||||
}
|
||||
// Reaped returns an iterator over the IDs of agents that were reaped.
|
||||
func (r *RedisRegistry) Reaped(ttl time.Duration) iter.Seq[string] {
|
||||
return func(yield func(string) bool) {
|
||||
ctx := context.Background()
|
||||
cutoff := time.Now().UTC().Add(-ttl)
|
||||
|
||||
if a.Status != AgentOffline && a.LastHeartbeat.Before(cutoff) {
|
||||
a.Status = AgentOffline
|
||||
data, err := json.Marshal(a)
|
||||
iter := r.client.Scan(ctx, 0, r.agentPattern(), 100).Iterator()
|
||||
for iter.Next(ctx) {
|
||||
key := iter.Val()
|
||||
val, err := r.client.Get(ctx, key).Result()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
// Preserve remaining TTL (or use default if none).
|
||||
remainingTTL, err := r.client.TTL(ctx, key).Result()
|
||||
if err != nil || remainingTTL <= 0 {
|
||||
remainingTTL = r.defaultTTL
|
||||
}
|
||||
if err := r.client.Set(ctx, key, data, remainingTTL).Err(); err != nil {
|
||||
var a AgentInfo
|
||||
if err := json.Unmarshal([]byte(val), &a); err != nil {
|
||||
continue
|
||||
}
|
||||
reaped = append(reaped, a.ID)
|
||||
|
||||
if a.Status != AgentOffline && a.LastHeartbeat.Before(cutoff) {
|
||||
a.Status = AgentOffline
|
||||
data, err := json.Marshal(a)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
// Preserve remaining TTL (or use default if none).
|
||||
remainingTTL, err := r.client.TTL(ctx, key).Result()
|
||||
if err != nil || remainingTTL <= 0 {
|
||||
remainingTTL = r.defaultTTL
|
||||
}
|
||||
if err := r.client.Set(ctx, key, data, remainingTTL).Err(); err != nil {
|
||||
continue
|
||||
}
|
||||
if !yield(a.ID) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return reaped
|
||||
}
|
||||
|
||||
// FlushPrefix deletes all keys matching the registry's prefix. Useful for
|
||||
|
|
|
|||
|
|
@ -3,6 +3,8 @@ package agentic
|
|||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"iter"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -114,24 +116,28 @@ func (r *SQLiteRegistry) Get(id string) (AgentInfo, error) {
|
|||
|
||||
// List returns a copy of all registered agents.
|
||||
func (r *SQLiteRegistry) List() []AgentInfo {
|
||||
rows, err := r.db.Query("SELECT id, name, capabilities, status, last_heartbeat, current_load, max_load FROM agents")
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer rows.Close()
|
||||
return slices.Collect(r.All())
|
||||
}
|
||||
|
||||
var result []AgentInfo
|
||||
for rows.Next() {
|
||||
a, err := r.scanAgentRow(rows)
|
||||
// All returns an iterator over all registered agents.
|
||||
func (r *SQLiteRegistry) All() iter.Seq[AgentInfo] {
|
||||
return func(yield func(AgentInfo) bool) {
|
||||
rows, err := r.db.Query("SELECT id, name, capabilities, status, last_heartbeat, current_load, max_load FROM agents")
|
||||
if err != nil {
|
||||
continue
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
a, err := r.scanAgentRow(rows)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if !yield(a) {
|
||||
return
|
||||
}
|
||||
}
|
||||
result = append(result, a)
|
||||
}
|
||||
if result == nil {
|
||||
return []AgentInfo{}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Heartbeat updates the agent's LastHeartbeat timestamp. If the agent was
|
||||
|
|
@ -164,47 +170,58 @@ func (r *SQLiteRegistry) Heartbeat(id string) error {
|
|||
// Reap marks agents as Offline if their last heartbeat is older than ttl.
|
||||
// Returns the IDs of agents that were reaped.
|
||||
func (r *SQLiteRegistry) Reap(ttl time.Duration) []string {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
return slices.Collect(r.Reaped(ttl))
|
||||
}
|
||||
|
||||
cutoff := time.Now().UTC().Add(-ttl).Format(time.RFC3339Nano)
|
||||
// Reaped returns an iterator over the IDs of agents that were reaped.
|
||||
func (r *SQLiteRegistry) Reaped(ttl time.Duration) iter.Seq[string] {
|
||||
return func(yield func(string) bool) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
// Select agents that will be reaped before updating.
|
||||
rows, err := r.db.Query(
|
||||
"SELECT id FROM agents WHERE status != ? AND last_heartbeat < ?",
|
||||
string(AgentOffline), cutoff)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer rows.Close()
|
||||
cutoff := time.Now().UTC().Add(-ttl).Format(time.RFC3339Nano)
|
||||
|
||||
var reaped []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err := rows.Scan(&id); err != nil {
|
||||
continue
|
||||
// Select agents that will be reaped before updating.
|
||||
rows, err := r.db.Query(
|
||||
"SELECT id FROM agents WHERE status != ? AND last_heartbeat < ?",
|
||||
string(AgentOffline), cutoff)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
reaped = append(reaped, id)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil
|
||||
}
|
||||
rows.Close()
|
||||
defer rows.Close()
|
||||
|
||||
if len(reaped) > 0 {
|
||||
// Build placeholders for IN clause.
|
||||
placeholders := make([]string, len(reaped))
|
||||
args := make([]any, len(reaped))
|
||||
for i, id := range reaped {
|
||||
placeholders[i] = "?"
|
||||
args[i] = id
|
||||
var reaped []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err := rows.Scan(&id); err != nil {
|
||||
continue
|
||||
}
|
||||
reaped = append(reaped, id)
|
||||
}
|
||||
query := "UPDATE agents SET status = ? WHERE id IN (" + strings.Join(placeholders, ",") + ")"
|
||||
allArgs := append([]any{string(AgentOffline)}, args...)
|
||||
_, _ = r.db.Exec(query, allArgs...)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
return reaped
|
||||
if len(reaped) > 0 {
|
||||
// Build placeholders for IN clause.
|
||||
placeholders := make([]string, len(reaped))
|
||||
args := make([]any, len(reaped))
|
||||
for i, id := range reaped {
|
||||
placeholders[i] = "?"
|
||||
args[i] = id
|
||||
}
|
||||
query := "UPDATE agents SET status = ? WHERE id IN (" + strings.Join(placeholders, ",") + ")"
|
||||
allArgs := append([]any{string(AgentOffline)}, args...)
|
||||
_, _ = r.db.Exec(query, allArgs...)
|
||||
|
||||
for _, id := range reaped {
|
||||
if !yield(id) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- internal helpers ---
|
||||
|
|
|
|||
18
router.go
18
router.go
|
|
@ -1,9 +1,9 @@
|
|||
package agentic
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"errors"
|
||||
"slices"
|
||||
"sort"
|
||||
)
|
||||
|
||||
// ErrNoEligibleAgent is returned when no agent matches the task requirements.
|
||||
|
|
@ -91,11 +91,11 @@ func (r *DefaultRouter) hasCapabilities(a AgentInfo, labels []string) bool {
|
|||
// agent ID (alphabetical).
|
||||
func (r *DefaultRouter) leastLoaded(agents []AgentInfo) string {
|
||||
// Sort: lowest load first, then by ID for determinism.
|
||||
sort.Slice(agents, func(i, j int) bool {
|
||||
if agents[i].CurrentLoad != agents[j].CurrentLoad {
|
||||
return agents[i].CurrentLoad < agents[j].CurrentLoad
|
||||
slices.SortFunc(agents, func(a, b AgentInfo) int {
|
||||
if a.CurrentLoad != b.CurrentLoad {
|
||||
return cmp.Compare(a.CurrentLoad, b.CurrentLoad)
|
||||
}
|
||||
return agents[i].ID < agents[j].ID
|
||||
return cmp.Compare(a.ID, b.ID)
|
||||
})
|
||||
return agents[0].ID
|
||||
}
|
||||
|
|
@ -119,11 +119,11 @@ func (r *DefaultRouter) highestScored(agents []AgentInfo) string {
|
|||
}
|
||||
|
||||
// Sort: highest score first, then by ID for determinism.
|
||||
sort.Slice(scores, func(i, j int) bool {
|
||||
if scores[i].score != scores[j].score {
|
||||
return scores[i].score > scores[j].score
|
||||
slices.SortFunc(scores, func(a, b scored) int {
|
||||
if a.score != b.score {
|
||||
return cmp.Compare(b.score, a.score) // highest first
|
||||
}
|
||||
return scores[i].id < scores[j].id
|
||||
return cmp.Compare(a.id, b.id)
|
||||
})
|
||||
|
||||
return scores[0].id
|
||||
|
|
|
|||
10
status.go
10
status.go
|
|
@ -1,9 +1,10 @@
|
|||
package agentic
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"forge.lthn.ai/core/go/pkg/log"
|
||||
|
|
@ -107,9 +108,10 @@ func FormatStatus(s *StatusSummary) string {
|
|||
|
||||
if len(s.Agents) > 0 {
|
||||
// Sort agents by ID for deterministic output.
|
||||
agents := make([]AgentInfo, len(s.Agents))
|
||||
copy(agents, s.Agents)
|
||||
sort.Slice(agents, func(i, j int) bool { return agents[i].ID < agents[j].ID })
|
||||
agents := slices.Clone(s.Agents)
|
||||
slices.SortFunc(agents, func(a, b AgentInfo) int {
|
||||
return cmp.Compare(a.ID, b.ID)
|
||||
})
|
||||
|
||||
fmt.Fprintf(&b, "%-16s%-12s%-8s%s\n", "Agent", "Status", "Load", "Remaining")
|
||||
for _, a := range agents {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue