go-agentic/allowance_sqlite.go
Snider 1e263febf2
Some checks failed
Security Scan / security (push) Successful in 7s
Test / test (push) Failing after 39s
feat: modernise to Go 1.26 iterators and stdlib helpers
Add iter.Seq iterators for AgentRegistry (AllSeq) and AllowanceStore
(ListSeq) across all backends (sqlite, redis). Use slices.SortFunc,
slices.Contains, maps.Keys in dispatcher and router.

Co-Authored-By: Gemini <noreply@google.com>
Co-Authored-By: Virgil <virgil@lethean.io>
2026-02-23 06:08:13 +00:00

333 lines
9.7 KiB
Go

package agentic
import (
"encoding/json"
"errors"
"iter"
"sync"
"time"
"forge.lthn.ai/core/go-store"
)
// SQLite group names for namespacing data in the KV store.
const (
groupAllowances = "allowances"
groupUsage = "usage"
groupModelQuota = "model_quotas"
groupModelUsage = "model_usage"
)
// SQLiteStore implements AllowanceStore using go-store (SQLite KV).
// It provides persistent storage that survives process restarts.
type SQLiteStore struct {
db *store.Store
mu sync.Mutex // serialises read-modify-write operations
}
// Allowances returns an iterator over all agent allowances.
func (s *SQLiteStore) Allowances() iter.Seq[*AgentAllowance] {
return func(yield func(*AgentAllowance) bool) {
for kv, err := range s.db.All(groupAllowances) {
if err != nil {
continue
}
var a allowanceJSON
if err := json.Unmarshal([]byte(kv.Value), &a); err != nil {
continue
}
if !yield(a.toAgentAllowance()) {
return
}
}
}
}
// Usages returns an iterator over all usage records.
func (s *SQLiteStore) Usages() iter.Seq[*UsageRecord] {
return func(yield func(*UsageRecord) bool) {
for kv, err := range s.db.All(groupUsage) {
if err != nil {
continue
}
var u UsageRecord
if err := json.Unmarshal([]byte(kv.Value), &u); err != nil {
continue
}
if !yield(&u) {
return
}
}
}
}
// NewSQLiteStore creates a new SQLite-backed allowance store at the given path.
// Use ":memory:" for tests that do not need persistence.
func NewSQLiteStore(dbPath string) (*SQLiteStore, error) {
db, err := store.New(dbPath)
if err != nil {
return nil, &APIError{Code: 500, Message: "failed to open SQLite store: " + err.Error()}
}
return &SQLiteStore{db: db}, nil
}
// Close releases the underlying SQLite database.
func (s *SQLiteStore) Close() error {
return s.db.Close()
}
// GetAllowance returns the quota limits for an agent.
func (s *SQLiteStore) GetAllowance(agentID string) (*AgentAllowance, error) {
val, err := s.db.Get(groupAllowances, agentID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return nil, &APIError{Code: 404, Message: "allowance not found for agent: " + agentID}
}
return nil, &APIError{Code: 500, Message: "failed to get allowance: " + err.Error()}
}
var a allowanceJSON
if err := json.Unmarshal([]byte(val), &a); err != nil {
return nil, &APIError{Code: 500, Message: "failed to unmarshal allowance: " + err.Error()}
}
return a.toAgentAllowance(), nil
}
// SetAllowance persists quota limits for an agent.
func (s *SQLiteStore) SetAllowance(a *AgentAllowance) error {
aj := newAllowanceJSON(a)
data, err := json.Marshal(aj)
if err != nil {
return &APIError{Code: 500, Message: "failed to marshal allowance: " + err.Error()}
}
if err := s.db.Set(groupAllowances, a.AgentID, string(data)); err != nil {
return &APIError{Code: 500, Message: "failed to set allowance: " + err.Error()}
}
return nil
}
// GetUsage returns the current usage record for an agent.
func (s *SQLiteStore) GetUsage(agentID string) (*UsageRecord, error) {
val, err := s.db.Get(groupUsage, agentID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return &UsageRecord{
AgentID: agentID,
PeriodStart: startOfDay(time.Now().UTC()),
}, nil
}
return nil, &APIError{Code: 500, Message: "failed to get usage: " + err.Error()}
}
var u UsageRecord
if err := json.Unmarshal([]byte(val), &u); err != nil {
return nil, &APIError{Code: 500, Message: "failed to unmarshal usage: " + err.Error()}
}
return &u, nil
}
// IncrementUsage atomically adds to an agent's usage counters.
func (s *SQLiteStore) IncrementUsage(agentID string, tokens int64, jobs int) error {
s.mu.Lock()
defer s.mu.Unlock()
u, err := s.getUsageLocked(agentID)
if err != nil {
return err
}
u.TokensUsed += tokens
u.JobsStarted += jobs
if jobs > 0 {
u.ActiveJobs += jobs
}
return s.putUsageLocked(u)
}
// DecrementActiveJobs reduces the active job count by 1.
func (s *SQLiteStore) DecrementActiveJobs(agentID string) error {
s.mu.Lock()
defer s.mu.Unlock()
u, err := s.getUsageLocked(agentID)
if err != nil {
return err
}
if u.ActiveJobs > 0 {
u.ActiveJobs--
}
return s.putUsageLocked(u)
}
// ReturnTokens adds tokens back to the agent's remaining quota.
func (s *SQLiteStore) ReturnTokens(agentID string, tokens int64) error {
s.mu.Lock()
defer s.mu.Unlock()
u, err := s.getUsageLocked(agentID)
if err != nil {
return err
}
u.TokensUsed -= tokens
if u.TokensUsed < 0 {
u.TokensUsed = 0
}
return s.putUsageLocked(u)
}
// ResetUsage clears usage counters for an agent.
func (s *SQLiteStore) ResetUsage(agentID string) error {
s.mu.Lock()
defer s.mu.Unlock()
u := &UsageRecord{
AgentID: agentID,
PeriodStart: startOfDay(time.Now().UTC()),
}
return s.putUsageLocked(u)
}
// GetModelQuota returns global limits for a model.
func (s *SQLiteStore) GetModelQuota(model string) (*ModelQuota, error) {
val, err := s.db.Get(groupModelQuota, model)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return nil, &APIError{Code: 404, Message: "model quota not found: " + model}
}
return nil, &APIError{Code: 500, Message: "failed to get model quota: " + err.Error()}
}
var q ModelQuota
if err := json.Unmarshal([]byte(val), &q); err != nil {
return nil, &APIError{Code: 500, Message: "failed to unmarshal model quota: " + err.Error()}
}
return &q, nil
}
// GetModelUsage returns current token usage for a model.
func (s *SQLiteStore) GetModelUsage(model string) (int64, error) {
val, err := s.db.Get(groupModelUsage, model)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return 0, nil
}
return 0, &APIError{Code: 500, Message: "failed to get model usage: " + err.Error()}
}
var tokens int64
if err := json.Unmarshal([]byte(val), &tokens); err != nil {
return 0, &APIError{Code: 500, Message: "failed to unmarshal model usage: " + err.Error()}
}
return tokens, nil
}
// IncrementModelUsage atomically adds to a model's usage counter.
func (s *SQLiteStore) IncrementModelUsage(model string, tokens int64) error {
s.mu.Lock()
defer s.mu.Unlock()
current, err := s.getModelUsageLocked(model)
if err != nil {
return err
}
current += tokens
data, err := json.Marshal(current)
if err != nil {
return &APIError{Code: 500, Message: "failed to marshal model usage: " + err.Error()}
}
if err := s.db.Set(groupModelUsage, model, string(data)); err != nil {
return &APIError{Code: 500, Message: "failed to set model usage: " + err.Error()}
}
return nil
}
// SetModelQuota persists global limits for a model.
func (s *SQLiteStore) SetModelQuota(q *ModelQuota) error {
data, err := json.Marshal(q)
if err != nil {
return &APIError{Code: 500, Message: "failed to marshal model quota: " + err.Error()}
}
if err := s.db.Set(groupModelQuota, q.Model, string(data)); err != nil {
return &APIError{Code: 500, Message: "failed to set model quota: " + err.Error()}
}
return nil
}
// --- internal helpers (must be called with mu held) ---
// getUsageLocked reads a UsageRecord from the store. Caller must hold s.mu.
func (s *SQLiteStore) getUsageLocked(agentID string) (*UsageRecord, error) {
val, err := s.db.Get(groupUsage, agentID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return &UsageRecord{
AgentID: agentID,
PeriodStart: startOfDay(time.Now().UTC()),
}, nil
}
return nil, &APIError{Code: 500, Message: "failed to get usage: " + err.Error()}
}
var u UsageRecord
if err := json.Unmarshal([]byte(val), &u); err != nil {
return nil, &APIError{Code: 500, Message: "failed to unmarshal usage: " + err.Error()}
}
return &u, nil
}
// putUsageLocked writes a UsageRecord to the store. Caller must hold s.mu.
func (s *SQLiteStore) putUsageLocked(u *UsageRecord) error {
data, err := json.Marshal(u)
if err != nil {
return &APIError{Code: 500, Message: "failed to marshal usage: " + err.Error()}
}
if err := s.db.Set(groupUsage, u.AgentID, string(data)); err != nil {
return &APIError{Code: 500, Message: "failed to set usage: " + err.Error()}
}
return nil
}
// getModelUsageLocked reads model usage from the store. Caller must hold s.mu.
func (s *SQLiteStore) getModelUsageLocked(model string) (int64, error) {
val, err := s.db.Get(groupModelUsage, model)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return 0, nil
}
return 0, &APIError{Code: 500, Message: "failed to get model usage: " + err.Error()}
}
var tokens int64
if err := json.Unmarshal([]byte(val), &tokens); err != nil {
return 0, &APIError{Code: 500, Message: "failed to unmarshal model usage: " + err.Error()}
}
return tokens, nil
}
// --- JSON serialisation helper for AgentAllowance ---
// time.Duration does not have a stable JSON representation. We serialise it
// as an int64 (nanoseconds) to avoid locale-dependent string parsing.
type allowanceJSON struct {
AgentID string `json:"agent_id"`
DailyTokenLimit int64 `json:"daily_token_limit"`
DailyJobLimit int `json:"daily_job_limit"`
ConcurrentJobs int `json:"concurrent_jobs"`
MaxJobDurationNs int64 `json:"max_job_duration_ns"`
ModelAllowlist []string `json:"model_allowlist,omitempty"`
}
func newAllowanceJSON(a *AgentAllowance) *allowanceJSON {
return &allowanceJSON{
AgentID: a.AgentID,
DailyTokenLimit: a.DailyTokenLimit,
DailyJobLimit: a.DailyJobLimit,
ConcurrentJobs: a.ConcurrentJobs,
MaxJobDurationNs: int64(a.MaxJobDuration),
ModelAllowlist: a.ModelAllowlist,
}
}
func (aj *allowanceJSON) toAgentAllowance() *AgentAllowance {
return &AgentAllowance{
AgentID: aj.AgentID,
DailyTokenLimit: aj.DailyTokenLimit,
DailyJobLimit: aj.DailyJobLimit,
ConcurrentJobs: aj.ConcurrentJobs,
MaxJobDuration: time.Duration(aj.MaxJobDurationNs),
ModelAllowlist: aj.ModelAllowlist,
}
}