go-agentic/registry_sqlite.go

268 lines
8.1 KiB
Go
Raw Permalink Normal View History

package agentic
import (
"database/sql"
"encoding/json"
"iter"
"slices"
"strings"
"sync"
"time"
_ "modernc.org/sqlite"
)
// SQLiteRegistry implements AgentRegistry using a SQLite database.
// It provides persistent storage that survives process restarts.
type SQLiteRegistry struct {
db *sql.DB
mu sync.Mutex // serialises read-modify-write operations
}
// NewSQLiteRegistry creates a new SQLite-backed agent registry at the given path.
// Use ":memory:" for tests that do not need persistence.
func NewSQLiteRegistry(dbPath string) (*SQLiteRegistry, error) {
db, err := sql.Open("sqlite", dbPath)
if err != nil {
return nil, &APIError{Code: 500, Message: "failed to open SQLite registry: " + err.Error()}
}
db.SetMaxOpenConns(1)
if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
db.Close()
return nil, &APIError{Code: 500, Message: "failed to set WAL mode: " + err.Error()}
}
if _, err := db.Exec("PRAGMA busy_timeout=5000"); err != nil {
db.Close()
return nil, &APIError{Code: 500, Message: "failed to set busy_timeout: " + err.Error()}
}
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY,
name TEXT NOT NULL DEFAULT '',
capabilities TEXT NOT NULL DEFAULT '[]',
status TEXT NOT NULL DEFAULT 'available',
last_heartbeat DATETIME NOT NULL DEFAULT (datetime('now')),
current_load INTEGER NOT NULL DEFAULT 0,
max_load INTEGER NOT NULL DEFAULT 0,
registered_at DATETIME NOT NULL DEFAULT (datetime('now'))
)`); err != nil {
db.Close()
return nil, &APIError{Code: 500, Message: "failed to create agents table: " + err.Error()}
}
return &SQLiteRegistry{db: db}, nil
}
// Close releases the underlying SQLite database.
func (r *SQLiteRegistry) Close() error {
return r.db.Close()
}
// Register adds or updates an agent in the registry. Returns an error if the
// agent ID is empty.
func (r *SQLiteRegistry) Register(agent AgentInfo) error {
if agent.ID == "" {
return &APIError{Code: 400, Message: "agent ID is required"}
}
caps, err := json.Marshal(agent.Capabilities)
if err != nil {
return &APIError{Code: 500, Message: "failed to marshal capabilities: " + err.Error()}
}
hb := agent.LastHeartbeat
if hb.IsZero() {
hb = time.Now().UTC()
}
r.mu.Lock()
defer r.mu.Unlock()
_, err = r.db.Exec(`INSERT INTO agents (id, name, capabilities, status, last_heartbeat, current_load, max_load, registered_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
name = excluded.name,
capabilities = excluded.capabilities,
status = excluded.status,
last_heartbeat = excluded.last_heartbeat,
current_load = excluded.current_load,
max_load = excluded.max_load`,
agent.ID, agent.Name, string(caps), string(agent.Status), hb.Format(time.RFC3339Nano),
agent.CurrentLoad, agent.MaxLoad, hb.Format(time.RFC3339Nano))
if err != nil {
return &APIError{Code: 500, Message: "failed to register agent: " + err.Error()}
}
return nil
}
// Deregister removes an agent from the registry. Returns an error if the agent
// is not found.
func (r *SQLiteRegistry) Deregister(id string) error {
r.mu.Lock()
defer r.mu.Unlock()
res, err := r.db.Exec("DELETE FROM agents WHERE id = ?", id)
if err != nil {
return &APIError{Code: 500, Message: "failed to deregister agent: " + err.Error()}
}
n, err := res.RowsAffected()
if err != nil {
return &APIError{Code: 500, Message: "failed to check delete result: " + err.Error()}
}
if n == 0 {
return &APIError{Code: 404, Message: "agent not found: " + id}
}
return nil
}
// Get returns a copy of the agent info for the given ID. Returns an error if
// the agent is not found.
func (r *SQLiteRegistry) Get(id string) (AgentInfo, error) {
return r.scanAgent("SELECT id, name, capabilities, status, last_heartbeat, current_load, max_load FROM agents WHERE id = ?", id)
}
// List returns a copy of all registered agents.
func (r *SQLiteRegistry) List() []AgentInfo {
return slices.Collect(r.All())
}
// All returns an iterator over all registered agents.
func (r *SQLiteRegistry) All() iter.Seq[AgentInfo] {
return func(yield func(AgentInfo) bool) {
rows, err := r.db.Query("SELECT id, name, capabilities, status, last_heartbeat, current_load, max_load FROM agents")
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
a, err := r.scanAgentRow(rows)
if err != nil {
continue
}
if !yield(a) {
return
}
}
}
}
// Heartbeat updates the agent's LastHeartbeat timestamp. If the agent was
// Offline, it transitions to Available.
func (r *SQLiteRegistry) Heartbeat(id string) error {
r.mu.Lock()
defer r.mu.Unlock()
now := time.Now().UTC().Format(time.RFC3339Nano)
// Update heartbeat for all agents, and transition offline agents to available.
res, err := r.db.Exec(`UPDATE agents SET
last_heartbeat = ?,
status = CASE WHEN status = ? THEN ? ELSE status END
WHERE id = ?`,
now, string(AgentOffline), string(AgentAvailable), id)
if err != nil {
return &APIError{Code: 500, Message: "failed to heartbeat agent: " + err.Error()}
}
n, err := res.RowsAffected()
if err != nil {
return &APIError{Code: 500, Message: "failed to check heartbeat result: " + err.Error()}
}
if n == 0 {
return &APIError{Code: 404, Message: "agent not found: " + id}
}
return nil
}
// Reap marks agents as Offline if their last heartbeat is older than ttl.
// Returns the IDs of agents that were reaped.
func (r *SQLiteRegistry) Reap(ttl time.Duration) []string {
return slices.Collect(r.Reaped(ttl))
}
// Reaped returns an iterator over the IDs of agents that were reaped.
func (r *SQLiteRegistry) Reaped(ttl time.Duration) iter.Seq[string] {
return func(yield func(string) bool) {
r.mu.Lock()
defer r.mu.Unlock()
cutoff := time.Now().UTC().Add(-ttl).Format(time.RFC3339Nano)
// Select agents that will be reaped before updating.
rows, err := r.db.Query(
"SELECT id FROM agents WHERE status != ? AND last_heartbeat < ?",
string(AgentOffline), cutoff)
if err != nil {
return
}
defer rows.Close()
var reaped []string
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
continue
}
reaped = append(reaped, id)
}
if err := rows.Err(); err != nil {
return
}
rows.Close()
if len(reaped) > 0 {
// Build placeholders for IN clause.
placeholders := make([]string, len(reaped))
args := make([]any, len(reaped))
for i, id := range reaped {
placeholders[i] = "?"
args[i] = id
}
query := "UPDATE agents SET status = ? WHERE id IN (" + strings.Join(placeholders, ",") + ")"
allArgs := append([]any{string(AgentOffline)}, args...)
_, _ = r.db.Exec(query, allArgs...)
for _, id := range reaped {
if !yield(id) {
return
}
}
}
}
}
// --- internal helpers ---
// scanAgent executes a query that returns a single agent row.
func (r *SQLiteRegistry) scanAgent(query string, args ...any) (AgentInfo, error) {
row := r.db.QueryRow(query, args...)
var a AgentInfo
var capsJSON string
var statusStr string
var hbStr string
err := row.Scan(&a.ID, &a.Name, &capsJSON, &statusStr, &hbStr, &a.CurrentLoad, &a.MaxLoad)
if err == sql.ErrNoRows {
return AgentInfo{}, &APIError{Code: 404, Message: "agent not found: " + args[0].(string)}
}
if err != nil {
return AgentInfo{}, &APIError{Code: 500, Message: "failed to scan agent: " + err.Error()}
}
if err := json.Unmarshal([]byte(capsJSON), &a.Capabilities); err != nil {
return AgentInfo{}, &APIError{Code: 500, Message: "failed to unmarshal capabilities: " + err.Error()}
}
a.Status = AgentStatus(statusStr)
a.LastHeartbeat, _ = time.Parse(time.RFC3339Nano, hbStr)
return a, nil
}
// scanAgentRow scans a single row from a rows iterator.
func (r *SQLiteRegistry) scanAgentRow(rows *sql.Rows) (AgentInfo, error) {
var a AgentInfo
var capsJSON string
var statusStr string
var hbStr string
err := rows.Scan(&a.ID, &a.Name, &capsJSON, &statusStr, &hbStr, &a.CurrentLoad, &a.MaxLoad)
if err != nil {
return AgentInfo{}, err
}
if err := json.Unmarshal([]byte(capsJSON), &a.Capabilities); err != nil {
return AgentInfo{}, err
}
a.Status = AgentStatus(statusStr)
a.LastHeartbeat, _ = time.Parse(time.RFC3339Nano, hbStr)
return a, nil
}