go-agentic/allowance_sqlite.go

334 lines
9.7 KiB
Go
Raw Permalink Normal View History

package agentic
import (
"encoding/json"
"errors"
"iter"
"sync"
"time"
"forge.lthn.ai/core/go-store"
)
// SQLite group names for namespacing data in the KV store.
const (
groupAllowances = "allowances"
groupUsage = "usage"
groupModelQuota = "model_quotas"
groupModelUsage = "model_usage"
)
// SQLiteStore implements AllowanceStore using go-store (SQLite KV).
// It provides persistent storage that survives process restarts.
type SQLiteStore struct {
db *store.Store
mu sync.Mutex // serialises read-modify-write operations
}
// Allowances returns an iterator over all agent allowances.
func (s *SQLiteStore) Allowances() iter.Seq[*AgentAllowance] {
return func(yield func(*AgentAllowance) bool) {
for kv, err := range s.db.All(groupAllowances) {
if err != nil {
continue
}
var a allowanceJSON
if err := json.Unmarshal([]byte(kv.Value), &a); err != nil {
continue
}
if !yield(a.toAgentAllowance()) {
return
}
}
}
}
// Usages returns an iterator over all usage records.
func (s *SQLiteStore) Usages() iter.Seq[*UsageRecord] {
return func(yield func(*UsageRecord) bool) {
for kv, err := range s.db.All(groupUsage) {
if err != nil {
continue
}
var u UsageRecord
if err := json.Unmarshal([]byte(kv.Value), &u); err != nil {
continue
}
if !yield(&u) {
return
}
}
}
}
// NewSQLiteStore creates a new SQLite-backed allowance store at the given path.
// Use ":memory:" for tests that do not need persistence.
func NewSQLiteStore(dbPath string) (*SQLiteStore, error) {
db, err := store.New(dbPath)
if err != nil {
return nil, &APIError{Code: 500, Message: "failed to open SQLite store: " + err.Error()}
}
return &SQLiteStore{db: db}, nil
}
// Close releases the underlying SQLite database.
func (s *SQLiteStore) Close() error {
return s.db.Close()
}
// GetAllowance returns the quota limits for an agent.
func (s *SQLiteStore) GetAllowance(agentID string) (*AgentAllowance, error) {
val, err := s.db.Get(groupAllowances, agentID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return nil, &APIError{Code: 404, Message: "allowance not found for agent: " + agentID}
}
return nil, &APIError{Code: 500, Message: "failed to get allowance: " + err.Error()}
}
var a allowanceJSON
if err := json.Unmarshal([]byte(val), &a); err != nil {
return nil, &APIError{Code: 500, Message: "failed to unmarshal allowance: " + err.Error()}
}
return a.toAgentAllowance(), nil
}
// SetAllowance persists quota limits for an agent.
func (s *SQLiteStore) SetAllowance(a *AgentAllowance) error {
aj := newAllowanceJSON(a)
data, err := json.Marshal(aj)
if err != nil {
return &APIError{Code: 500, Message: "failed to marshal allowance: " + err.Error()}
}
if err := s.db.Set(groupAllowances, a.AgentID, string(data)); err != nil {
return &APIError{Code: 500, Message: "failed to set allowance: " + err.Error()}
}
return nil
}
// GetUsage returns the current usage record for an agent.
func (s *SQLiteStore) GetUsage(agentID string) (*UsageRecord, error) {
val, err := s.db.Get(groupUsage, agentID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return &UsageRecord{
AgentID: agentID,
PeriodStart: startOfDay(time.Now().UTC()),
}, nil
}
return nil, &APIError{Code: 500, Message: "failed to get usage: " + err.Error()}
}
var u UsageRecord
if err := json.Unmarshal([]byte(val), &u); err != nil {
return nil, &APIError{Code: 500, Message: "failed to unmarshal usage: " + err.Error()}
}
return &u, nil
}
// IncrementUsage atomically adds to an agent's usage counters.
func (s *SQLiteStore) IncrementUsage(agentID string, tokens int64, jobs int) error {
s.mu.Lock()
defer s.mu.Unlock()
u, err := s.getUsageLocked(agentID)
if err != nil {
return err
}
u.TokensUsed += tokens
u.JobsStarted += jobs
if jobs > 0 {
u.ActiveJobs += jobs
}
return s.putUsageLocked(u)
}
// DecrementActiveJobs reduces the active job count by 1.
func (s *SQLiteStore) DecrementActiveJobs(agentID string) error {
s.mu.Lock()
defer s.mu.Unlock()
u, err := s.getUsageLocked(agentID)
if err != nil {
return err
}
if u.ActiveJobs > 0 {
u.ActiveJobs--
}
return s.putUsageLocked(u)
}
// ReturnTokens adds tokens back to the agent's remaining quota.
func (s *SQLiteStore) ReturnTokens(agentID string, tokens int64) error {
s.mu.Lock()
defer s.mu.Unlock()
u, err := s.getUsageLocked(agentID)
if err != nil {
return err
}
u.TokensUsed -= tokens
if u.TokensUsed < 0 {
u.TokensUsed = 0
}
return s.putUsageLocked(u)
}
// ResetUsage clears usage counters for an agent.
func (s *SQLiteStore) ResetUsage(agentID string) error {
s.mu.Lock()
defer s.mu.Unlock()
u := &UsageRecord{
AgentID: agentID,
PeriodStart: startOfDay(time.Now().UTC()),
}
return s.putUsageLocked(u)
}
// GetModelQuota returns global limits for a model.
func (s *SQLiteStore) GetModelQuota(model string) (*ModelQuota, error) {
val, err := s.db.Get(groupModelQuota, model)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return nil, &APIError{Code: 404, Message: "model quota not found: " + model}
}
return nil, &APIError{Code: 500, Message: "failed to get model quota: " + err.Error()}
}
var q ModelQuota
if err := json.Unmarshal([]byte(val), &q); err != nil {
return nil, &APIError{Code: 500, Message: "failed to unmarshal model quota: " + err.Error()}
}
return &q, nil
}
// GetModelUsage returns current token usage for a model.
func (s *SQLiteStore) GetModelUsage(model string) (int64, error) {
val, err := s.db.Get(groupModelUsage, model)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return 0, nil
}
return 0, &APIError{Code: 500, Message: "failed to get model usage: " + err.Error()}
}
var tokens int64
if err := json.Unmarshal([]byte(val), &tokens); err != nil {
return 0, &APIError{Code: 500, Message: "failed to unmarshal model usage: " + err.Error()}
}
return tokens, nil
}
// IncrementModelUsage atomically adds to a model's usage counter.
func (s *SQLiteStore) IncrementModelUsage(model string, tokens int64) error {
s.mu.Lock()
defer s.mu.Unlock()
current, err := s.getModelUsageLocked(model)
if err != nil {
return err
}
current += tokens
data, err := json.Marshal(current)
if err != nil {
return &APIError{Code: 500, Message: "failed to marshal model usage: " + err.Error()}
}
if err := s.db.Set(groupModelUsage, model, string(data)); err != nil {
return &APIError{Code: 500, Message: "failed to set model usage: " + err.Error()}
}
return nil
}
// SetModelQuota persists global limits for a model.
func (s *SQLiteStore) SetModelQuota(q *ModelQuota) error {
data, err := json.Marshal(q)
if err != nil {
return &APIError{Code: 500, Message: "failed to marshal model quota: " + err.Error()}
}
if err := s.db.Set(groupModelQuota, q.Model, string(data)); err != nil {
return &APIError{Code: 500, Message: "failed to set model quota: " + err.Error()}
}
return nil
}
// --- internal helpers (must be called with mu held) ---
// getUsageLocked reads a UsageRecord from the store. Caller must hold s.mu.
func (s *SQLiteStore) getUsageLocked(agentID string) (*UsageRecord, error) {
val, err := s.db.Get(groupUsage, agentID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return &UsageRecord{
AgentID: agentID,
PeriodStart: startOfDay(time.Now().UTC()),
}, nil
}
return nil, &APIError{Code: 500, Message: "failed to get usage: " + err.Error()}
}
var u UsageRecord
if err := json.Unmarshal([]byte(val), &u); err != nil {
return nil, &APIError{Code: 500, Message: "failed to unmarshal usage: " + err.Error()}
}
return &u, nil
}
// putUsageLocked writes a UsageRecord to the store. Caller must hold s.mu.
func (s *SQLiteStore) putUsageLocked(u *UsageRecord) error {
data, err := json.Marshal(u)
if err != nil {
return &APIError{Code: 500, Message: "failed to marshal usage: " + err.Error()}
}
if err := s.db.Set(groupUsage, u.AgentID, string(data)); err != nil {
return &APIError{Code: 500, Message: "failed to set usage: " + err.Error()}
}
return nil
}
// getModelUsageLocked reads model usage from the store. Caller must hold s.mu.
func (s *SQLiteStore) getModelUsageLocked(model string) (int64, error) {
val, err := s.db.Get(groupModelUsage, model)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return 0, nil
}
return 0, &APIError{Code: 500, Message: "failed to get model usage: " + err.Error()}
}
var tokens int64
if err := json.Unmarshal([]byte(val), &tokens); err != nil {
return 0, &APIError{Code: 500, Message: "failed to unmarshal model usage: " + err.Error()}
}
return tokens, nil
}
// --- JSON serialisation helper for AgentAllowance ---
// time.Duration does not have a stable JSON representation. We serialise it
// as an int64 (nanoseconds) to avoid locale-dependent string parsing.
type allowanceJSON struct {
AgentID string `json:"agent_id"`
DailyTokenLimit int64 `json:"daily_token_limit"`
DailyJobLimit int `json:"daily_job_limit"`
ConcurrentJobs int `json:"concurrent_jobs"`
MaxJobDurationNs int64 `json:"max_job_duration_ns"`
ModelAllowlist []string `json:"model_allowlist,omitempty"`
}
func newAllowanceJSON(a *AgentAllowance) *allowanceJSON {
return &allowanceJSON{
AgentID: a.AgentID,
DailyTokenLimit: a.DailyTokenLimit,
DailyJobLimit: a.DailyJobLimit,
ConcurrentJobs: a.ConcurrentJobs,
MaxJobDurationNs: int64(a.MaxJobDuration),
ModelAllowlist: a.ModelAllowlist,
}
}
func (aj *allowanceJSON) toAgentAllowance() *AgentAllowance {
return &AgentAllowance{
AgentID: aj.AgentID,
DailyTokenLimit: aj.DailyTokenLimit,
DailyJobLimit: aj.DailyJobLimit,
ConcurrentJobs: aj.ConcurrentJobs,
MaxJobDuration: time.Duration(aj.MaxJobDurationNs),
ModelAllowlist: aj.ModelAllowlist,
}
}