go-agentic/registry_sqlite.go
Snider 1e263febf2
Some checks failed
Security Scan / security (push) Successful in 7s
Test / test (push) Failing after 39s
feat: modernise to Go 1.26 iterators and stdlib helpers
Add iter.Seq iterators for AgentRegistry (AllSeq) and AllowanceStore
(ListSeq) across all backends (sqlite, redis). Use slices.SortFunc,
slices.Contains, maps.Keys in dispatcher and router.

Co-Authored-By: Gemini <noreply@google.com>
Co-Authored-By: Virgil <virgil@lethean.io>
2026-02-23 06:08:13 +00:00

267 lines
8.1 KiB
Go

package agentic
import (
"database/sql"
"encoding/json"
"iter"
"slices"
"strings"
"sync"
"time"
_ "modernc.org/sqlite"
)
// SQLiteRegistry implements AgentRegistry using a SQLite database.
// It provides persistent storage that survives process restarts.
type SQLiteRegistry struct {
db *sql.DB
mu sync.Mutex // serialises read-modify-write operations
}
// NewSQLiteRegistry creates a new SQLite-backed agent registry at the given path.
// Use ":memory:" for tests that do not need persistence.
func NewSQLiteRegistry(dbPath string) (*SQLiteRegistry, error) {
db, err := sql.Open("sqlite", dbPath)
if err != nil {
return nil, &APIError{Code: 500, Message: "failed to open SQLite registry: " + err.Error()}
}
db.SetMaxOpenConns(1)
if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
db.Close()
return nil, &APIError{Code: 500, Message: "failed to set WAL mode: " + err.Error()}
}
if _, err := db.Exec("PRAGMA busy_timeout=5000"); err != nil {
db.Close()
return nil, &APIError{Code: 500, Message: "failed to set busy_timeout: " + err.Error()}
}
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY,
name TEXT NOT NULL DEFAULT '',
capabilities TEXT NOT NULL DEFAULT '[]',
status TEXT NOT NULL DEFAULT 'available',
last_heartbeat DATETIME NOT NULL DEFAULT (datetime('now')),
current_load INTEGER NOT NULL DEFAULT 0,
max_load INTEGER NOT NULL DEFAULT 0,
registered_at DATETIME NOT NULL DEFAULT (datetime('now'))
)`); err != nil {
db.Close()
return nil, &APIError{Code: 500, Message: "failed to create agents table: " + err.Error()}
}
return &SQLiteRegistry{db: db}, nil
}
// Close releases the underlying SQLite database.
func (r *SQLiteRegistry) Close() error {
return r.db.Close()
}
// Register adds or updates an agent in the registry. Returns an error if the
// agent ID is empty.
func (r *SQLiteRegistry) Register(agent AgentInfo) error {
if agent.ID == "" {
return &APIError{Code: 400, Message: "agent ID is required"}
}
caps, err := json.Marshal(agent.Capabilities)
if err != nil {
return &APIError{Code: 500, Message: "failed to marshal capabilities: " + err.Error()}
}
hb := agent.LastHeartbeat
if hb.IsZero() {
hb = time.Now().UTC()
}
r.mu.Lock()
defer r.mu.Unlock()
_, err = r.db.Exec(`INSERT INTO agents (id, name, capabilities, status, last_heartbeat, current_load, max_load, registered_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
name = excluded.name,
capabilities = excluded.capabilities,
status = excluded.status,
last_heartbeat = excluded.last_heartbeat,
current_load = excluded.current_load,
max_load = excluded.max_load`,
agent.ID, agent.Name, string(caps), string(agent.Status), hb.Format(time.RFC3339Nano),
agent.CurrentLoad, agent.MaxLoad, hb.Format(time.RFC3339Nano))
if err != nil {
return &APIError{Code: 500, Message: "failed to register agent: " + err.Error()}
}
return nil
}
// Deregister removes an agent from the registry. Returns an error if the agent
// is not found.
func (r *SQLiteRegistry) Deregister(id string) error {
r.mu.Lock()
defer r.mu.Unlock()
res, err := r.db.Exec("DELETE FROM agents WHERE id = ?", id)
if err != nil {
return &APIError{Code: 500, Message: "failed to deregister agent: " + err.Error()}
}
n, err := res.RowsAffected()
if err != nil {
return &APIError{Code: 500, Message: "failed to check delete result: " + err.Error()}
}
if n == 0 {
return &APIError{Code: 404, Message: "agent not found: " + id}
}
return nil
}
// Get returns a copy of the agent info for the given ID. Returns an error if
// the agent is not found.
func (r *SQLiteRegistry) Get(id string) (AgentInfo, error) {
return r.scanAgent("SELECT id, name, capabilities, status, last_heartbeat, current_load, max_load FROM agents WHERE id = ?", id)
}
// List returns a copy of all registered agents.
func (r *SQLiteRegistry) List() []AgentInfo {
return slices.Collect(r.All())
}
// All returns an iterator over all registered agents.
func (r *SQLiteRegistry) All() iter.Seq[AgentInfo] {
return func(yield func(AgentInfo) bool) {
rows, err := r.db.Query("SELECT id, name, capabilities, status, last_heartbeat, current_load, max_load FROM agents")
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
a, err := r.scanAgentRow(rows)
if err != nil {
continue
}
if !yield(a) {
return
}
}
}
}
// Heartbeat updates the agent's LastHeartbeat timestamp. If the agent was
// Offline, it transitions to Available.
func (r *SQLiteRegistry) Heartbeat(id string) error {
r.mu.Lock()
defer r.mu.Unlock()
now := time.Now().UTC().Format(time.RFC3339Nano)
// Update heartbeat for all agents, and transition offline agents to available.
res, err := r.db.Exec(`UPDATE agents SET
last_heartbeat = ?,
status = CASE WHEN status = ? THEN ? ELSE status END
WHERE id = ?`,
now, string(AgentOffline), string(AgentAvailable), id)
if err != nil {
return &APIError{Code: 500, Message: "failed to heartbeat agent: " + err.Error()}
}
n, err := res.RowsAffected()
if err != nil {
return &APIError{Code: 500, Message: "failed to check heartbeat result: " + err.Error()}
}
if n == 0 {
return &APIError{Code: 404, Message: "agent not found: " + id}
}
return nil
}
// Reap marks agents as Offline if their last heartbeat is older than ttl.
// Returns the IDs of agents that were reaped.
func (r *SQLiteRegistry) Reap(ttl time.Duration) []string {
return slices.Collect(r.Reaped(ttl))
}
// Reaped returns an iterator over the IDs of agents that were reaped.
func (r *SQLiteRegistry) Reaped(ttl time.Duration) iter.Seq[string] {
return func(yield func(string) bool) {
r.mu.Lock()
defer r.mu.Unlock()
cutoff := time.Now().UTC().Add(-ttl).Format(time.RFC3339Nano)
// Select agents that will be reaped before updating.
rows, err := r.db.Query(
"SELECT id FROM agents WHERE status != ? AND last_heartbeat < ?",
string(AgentOffline), cutoff)
if err != nil {
return
}
defer rows.Close()
var reaped []string
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
continue
}
reaped = append(reaped, id)
}
if err := rows.Err(); err != nil {
return
}
rows.Close()
if len(reaped) > 0 {
// Build placeholders for IN clause.
placeholders := make([]string, len(reaped))
args := make([]any, len(reaped))
for i, id := range reaped {
placeholders[i] = "?"
args[i] = id
}
query := "UPDATE agents SET status = ? WHERE id IN (" + strings.Join(placeholders, ",") + ")"
allArgs := append([]any{string(AgentOffline)}, args...)
_, _ = r.db.Exec(query, allArgs...)
for _, id := range reaped {
if !yield(id) {
return
}
}
}
}
}
// --- internal helpers ---
// scanAgent executes a query that returns a single agent row.
func (r *SQLiteRegistry) scanAgent(query string, args ...any) (AgentInfo, error) {
row := r.db.QueryRow(query, args...)
var a AgentInfo
var capsJSON string
var statusStr string
var hbStr string
err := row.Scan(&a.ID, &a.Name, &capsJSON, &statusStr, &hbStr, &a.CurrentLoad, &a.MaxLoad)
if err == sql.ErrNoRows {
return AgentInfo{}, &APIError{Code: 404, Message: "agent not found: " + args[0].(string)}
}
if err != nil {
return AgentInfo{}, &APIError{Code: 500, Message: "failed to scan agent: " + err.Error()}
}
if err := json.Unmarshal([]byte(capsJSON), &a.Capabilities); err != nil {
return AgentInfo{}, &APIError{Code: 500, Message: "failed to unmarshal capabilities: " + err.Error()}
}
a.Status = AgentStatus(statusStr)
a.LastHeartbeat, _ = time.Parse(time.RFC3339Nano, hbStr)
return a, nil
}
// scanAgentRow scans a single row from a rows iterator.
func (r *SQLiteRegistry) scanAgentRow(rows *sql.Rows) (AgentInfo, error) {
var a AgentInfo
var capsJSON string
var statusStr string
var hbStr string
err := rows.Scan(&a.ID, &a.Name, &capsJSON, &statusStr, &hbStr, &a.CurrentLoad, &a.MaxLoad)
if err != nil {
return AgentInfo{}, err
}
if err := json.Unmarshal([]byte(capsJSON), &a.Capabilities); err != nil {
return AgentInfo{}, err
}
a.Status = AgentStatus(statusStr)
a.LastHeartbeat, _ = time.Parse(time.RFC3339Nano, hbStr)
return a, nil
}