refactor(ratelimit): replace all fmt.Errorf with coreerr.E from go-log
Replace all 41 remaining fmt.Errorf calls in production code (ratelimit.go and sqlite.go) with coreerr.E() from forge.lthn.ai/core/go-log. Promotes go-log from indirect to direct dependency in go.mod. Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
ee6c5aa69d
commit
4bb1cb96d4
3 changed files with 44 additions and 42 deletions
2
go.mod
2
go.mod
|
|
@ -4,12 +4,12 @@ go 1.26.0
|
|||
|
||||
require (
|
||||
forge.lthn.ai/core/go-io v0.1.2
|
||||
forge.lthn.ai/core/go-log v0.0.2
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
modernc.org/sqlite v1.46.1
|
||||
)
|
||||
|
||||
require (
|
||||
forge.lthn.ai/core/go-log v0.0.2 // indirect
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
|
|
|
|||
23
ratelimit.go
23
ratelimit.go
|
|
@ -15,6 +15,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreio "forge.lthn.ai/core/go-io"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
|
@ -256,10 +257,10 @@ func (rl *RateLimiter) Persist() error {
|
|||
|
||||
if sqlite != nil {
|
||||
if err := sqlite.saveQuotas(quotas); err != nil {
|
||||
return fmt.Errorf("ratelimit.Persist: sqlite quotas: %w", err)
|
||||
return coreerr.E("ratelimit.Persist", "sqlite quotas", err)
|
||||
}
|
||||
if err := sqlite.saveState(state); err != nil {
|
||||
return fmt.Errorf("ratelimit.Persist: sqlite state: %w", err)
|
||||
return coreerr.E("ratelimit.Persist", "sqlite state", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -274,11 +275,11 @@ func (rl *RateLimiter) Persist() error {
|
|||
State: state,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("ratelimit.Persist: marshal: %w", err)
|
||||
return coreerr.E("ratelimit.Persist", "marshal", err)
|
||||
}
|
||||
|
||||
if err := coreio.Local.Write(filePath, string(data)); err != nil {
|
||||
return fmt.Errorf("ratelimit.Persist: write: %w", err)
|
||||
return coreerr.E("ratelimit.Persist", "write", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -617,12 +618,12 @@ func MigrateYAMLToSQLite(yamlPath, sqlitePath string) error {
|
|||
// Load from YAML.
|
||||
content, err := coreio.Local.Read(yamlPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ratelimit.MigrateYAMLToSQLite: read: %w", err)
|
||||
return coreerr.E("ratelimit.MigrateYAMLToSQLite", "read", err)
|
||||
}
|
||||
|
||||
var rl RateLimiter
|
||||
if err := yaml.Unmarshal([]byte(content), &rl); err != nil {
|
||||
return fmt.Errorf("ratelimit.MigrateYAMLToSQLite: unmarshal: %w", err)
|
||||
return coreerr.E("ratelimit.MigrateYAMLToSQLite", "unmarshal", err)
|
||||
}
|
||||
|
||||
// Write to SQLite.
|
||||
|
|
@ -661,32 +662,32 @@ func CountTokens(ctx context.Context, apiKey, model, text string) (int, error) {
|
|||
|
||||
jsonBody, err := json.Marshal(reqBody)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("ratelimit.CountTokens: marshal request: %w", err)
|
||||
return 0, coreerr.E("ratelimit.CountTokens", "marshal request", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(jsonBody))
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("ratelimit.CountTokens: new request: %w", err)
|
||||
return 0, coreerr.E("ratelimit.CountTokens", "new request", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("x-goog-api-key", apiKey)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("ratelimit.CountTokens: do request: %w", err)
|
||||
return 0, coreerr.E("ratelimit.CountTokens", "do request", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return 0, fmt.Errorf("ratelimit.CountTokens: API error (status %d): %s", resp.StatusCode, string(body))
|
||||
return 0, coreerr.E("ratelimit.CountTokens", fmt.Sprintf("API error (status %d): %s", resp.StatusCode, string(body)), nil)
|
||||
}
|
||||
|
||||
var result struct {
|
||||
TotalTokens int `json:"totalTokens"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return 0, fmt.Errorf("ratelimit.CountTokens: decode response: %w", err)
|
||||
return 0, coreerr.E("ratelimit.CountTokens", "decode response", err)
|
||||
}
|
||||
|
||||
return result.TotalTokens, nil
|
||||
|
|
|
|||
61
sqlite.go
61
sqlite.go
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
|
|
@ -19,7 +20,7 @@ type sqliteStore struct {
|
|||
func newSQLiteStore(dbPath string) (*sqliteStore, error) {
|
||||
db, err := sql.Open("sqlite", dbPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.newSQLiteStore: open: %w", err)
|
||||
return nil, coreerr.E("ratelimit.newSQLiteStore", "open", err)
|
||||
}
|
||||
|
||||
// Single connection for PRAGMA consistency.
|
||||
|
|
@ -27,11 +28,11 @@ func newSQLiteStore(dbPath string) (*sqliteStore, error) {
|
|||
|
||||
if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("ratelimit.newSQLiteStore: WAL: %w", err)
|
||||
return nil, coreerr.E("ratelimit.newSQLiteStore", "WAL", err)
|
||||
}
|
||||
if _, err := db.Exec("PRAGMA busy_timeout=5000"); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("ratelimit.newSQLiteStore: busy_timeout: %w", err)
|
||||
return nil, coreerr.E("ratelimit.newSQLiteStore", "busy_timeout", err)
|
||||
}
|
||||
|
||||
if err := createSchema(db); err != nil {
|
||||
|
|
@ -71,7 +72,7 @@ func createSchema(db *sql.DB) error {
|
|||
|
||||
for _, stmt := range stmts {
|
||||
if _, err := db.Exec(stmt); err != nil {
|
||||
return fmt.Errorf("ratelimit.createSchema: %w", err)
|
||||
return coreerr.E("ratelimit.createSchema", "exec", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
@ -81,7 +82,7 @@ func createSchema(db *sql.DB) error {
|
|||
func (s *sqliteStore) saveQuotas(quotas map[string]ModelQuota) error {
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return fmt.Errorf("ratelimit.saveQuotas: begin: %w", err)
|
||||
return coreerr.E("ratelimit.saveQuotas", "begin", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
|
|
@ -92,13 +93,13 @@ func (s *sqliteStore) saveQuotas(quotas map[string]ModelQuota) error {
|
|||
max_tpm = excluded.max_tpm,
|
||||
max_rpd = excluded.max_rpd`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ratelimit.saveQuotas: prepare: %w", err)
|
||||
return coreerr.E("ratelimit.saveQuotas", "prepare", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
for model, q := range quotas {
|
||||
if _, err := stmt.Exec(model, q.MaxRPM, q.MaxTPM, q.MaxRPD); err != nil {
|
||||
return fmt.Errorf("ratelimit.saveQuotas: exec %s: %w", model, err)
|
||||
return coreerr.E("ratelimit.saveQuotas", fmt.Sprintf("exec %s", model), err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -109,7 +110,7 @@ func (s *sqliteStore) saveQuotas(quotas map[string]ModelQuota) error {
|
|||
func (s *sqliteStore) loadQuotas() (map[string]ModelQuota, error) {
|
||||
rows, err := s.db.Query("SELECT model, max_rpm, max_tpm, max_rpd FROM quotas")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.loadQuotas: query: %w", err)
|
||||
return nil, coreerr.E("ratelimit.loadQuotas", "query", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
|
|
@ -118,12 +119,12 @@ func (s *sqliteStore) loadQuotas() (map[string]ModelQuota, error) {
|
|||
var model string
|
||||
var q ModelQuota
|
||||
if err := rows.Scan(&model, &q.MaxRPM, &q.MaxTPM, &q.MaxRPD); err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.loadQuotas: scan: %w", err)
|
||||
return nil, coreerr.E("ratelimit.loadQuotas", "scan", err)
|
||||
}
|
||||
result[model] = q
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.loadQuotas: rows: %w", err)
|
||||
return nil, coreerr.E("ratelimit.loadQuotas", "rows", err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
|
@ -134,57 +135,57 @@ func (s *sqliteStore) loadQuotas() (map[string]ModelQuota, error) {
|
|||
func (s *sqliteStore) saveState(state map[string]*UsageStats) error {
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return fmt.Errorf("ratelimit.saveState: begin: %w", err)
|
||||
return coreerr.E("ratelimit.saveState", "begin", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Clear existing state in the transaction.
|
||||
if _, err := tx.Exec("DELETE FROM requests"); err != nil {
|
||||
return fmt.Errorf("ratelimit.saveState: clear requests: %w", err)
|
||||
return coreerr.E("ratelimit.saveState", "clear requests", err)
|
||||
}
|
||||
if _, err := tx.Exec("DELETE FROM tokens"); err != nil {
|
||||
return fmt.Errorf("ratelimit.saveState: clear tokens: %w", err)
|
||||
return coreerr.E("ratelimit.saveState", "clear tokens", err)
|
||||
}
|
||||
if _, err := tx.Exec("DELETE FROM daily"); err != nil {
|
||||
return fmt.Errorf("ratelimit.saveState: clear daily: %w", err)
|
||||
return coreerr.E("ratelimit.saveState", "clear daily", err)
|
||||
}
|
||||
|
||||
reqStmt, err := tx.Prepare("INSERT INTO requests (model, ts) VALUES (?, ?)")
|
||||
if err != nil {
|
||||
return fmt.Errorf("ratelimit.saveState: prepare requests: %w", err)
|
||||
return coreerr.E("ratelimit.saveState", "prepare requests", err)
|
||||
}
|
||||
defer reqStmt.Close()
|
||||
|
||||
tokStmt, err := tx.Prepare("INSERT INTO tokens (model, ts, count) VALUES (?, ?, ?)")
|
||||
if err != nil {
|
||||
return fmt.Errorf("ratelimit.saveState: prepare tokens: %w", err)
|
||||
return coreerr.E("ratelimit.saveState", "prepare tokens", err)
|
||||
}
|
||||
defer tokStmt.Close()
|
||||
|
||||
dayStmt, err := tx.Prepare("INSERT INTO daily (model, day_start, day_count) VALUES (?, ?, ?)")
|
||||
if err != nil {
|
||||
return fmt.Errorf("ratelimit.saveState: prepare daily: %w", err)
|
||||
return coreerr.E("ratelimit.saveState", "prepare daily", err)
|
||||
}
|
||||
defer dayStmt.Close()
|
||||
|
||||
for model, stats := range state {
|
||||
for _, t := range stats.Requests {
|
||||
if _, err := reqStmt.Exec(model, t.UnixNano()); err != nil {
|
||||
return fmt.Errorf("ratelimit.saveState: insert request %s: %w", model, err)
|
||||
return coreerr.E("ratelimit.saveState", fmt.Sprintf("insert request %s", model), err)
|
||||
}
|
||||
}
|
||||
for _, te := range stats.Tokens {
|
||||
if _, err := tokStmt.Exec(model, te.Time.UnixNano(), te.Count); err != nil {
|
||||
return fmt.Errorf("ratelimit.saveState: insert token %s: %w", model, err)
|
||||
return coreerr.E("ratelimit.saveState", fmt.Sprintf("insert token %s", model), err)
|
||||
}
|
||||
}
|
||||
if _, err := dayStmt.Exec(model, stats.DayStart.UnixNano(), stats.DayCount); err != nil {
|
||||
return fmt.Errorf("ratelimit.saveState: insert daily %s: %w", model, err)
|
||||
return coreerr.E("ratelimit.saveState", fmt.Sprintf("insert daily %s", model), err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return fmt.Errorf("ratelimit.saveState: commit: %w", err)
|
||||
return coreerr.E("ratelimit.saveState", "commit", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -196,7 +197,7 @@ func (s *sqliteStore) loadState() (map[string]*UsageStats, error) {
|
|||
// Load daily counters first (these define which models have state).
|
||||
rows, err := s.db.Query("SELECT model, day_start, day_count FROM daily")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.loadState: query daily: %w", err)
|
||||
return nil, coreerr.E("ratelimit.loadState", "query daily", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
|
|
@ -205,7 +206,7 @@ func (s *sqliteStore) loadState() (map[string]*UsageStats, error) {
|
|||
var dayStartNano int64
|
||||
var dayCount int
|
||||
if err := rows.Scan(&model, &dayStartNano, &dayCount); err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.loadState: scan daily: %w", err)
|
||||
return nil, coreerr.E("ratelimit.loadState", "scan daily", err)
|
||||
}
|
||||
result[model] = &UsageStats{
|
||||
DayStart: time.Unix(0, dayStartNano),
|
||||
|
|
@ -213,13 +214,13 @@ func (s *sqliteStore) loadState() (map[string]*UsageStats, error) {
|
|||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.loadState: daily rows: %w", err)
|
||||
return nil, coreerr.E("ratelimit.loadState", "daily rows", err)
|
||||
}
|
||||
|
||||
// Load requests.
|
||||
reqRows, err := s.db.Query("SELECT model, ts FROM requests ORDER BY ts")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.loadState: query requests: %w", err)
|
||||
return nil, coreerr.E("ratelimit.loadState", "query requests", err)
|
||||
}
|
||||
defer reqRows.Close()
|
||||
|
||||
|
|
@ -227,7 +228,7 @@ func (s *sqliteStore) loadState() (map[string]*UsageStats, error) {
|
|||
var model string
|
||||
var tsNano int64
|
||||
if err := reqRows.Scan(&model, &tsNano); err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.loadState: scan requests: %w", err)
|
||||
return nil, coreerr.E("ratelimit.loadState", "scan requests", err)
|
||||
}
|
||||
if _, ok := result[model]; !ok {
|
||||
result[model] = &UsageStats{}
|
||||
|
|
@ -235,13 +236,13 @@ func (s *sqliteStore) loadState() (map[string]*UsageStats, error) {
|
|||
result[model].Requests = append(result[model].Requests, time.Unix(0, tsNano))
|
||||
}
|
||||
if err := reqRows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.loadState: request rows: %w", err)
|
||||
return nil, coreerr.E("ratelimit.loadState", "request rows", err)
|
||||
}
|
||||
|
||||
// Load tokens.
|
||||
tokRows, err := s.db.Query("SELECT model, ts, count FROM tokens ORDER BY ts")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.loadState: query tokens: %w", err)
|
||||
return nil, coreerr.E("ratelimit.loadState", "query tokens", err)
|
||||
}
|
||||
defer tokRows.Close()
|
||||
|
||||
|
|
@ -250,7 +251,7 @@ func (s *sqliteStore) loadState() (map[string]*UsageStats, error) {
|
|||
var tsNano int64
|
||||
var count int
|
||||
if err := tokRows.Scan(&model, &tsNano, &count); err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.loadState: scan tokens: %w", err)
|
||||
return nil, coreerr.E("ratelimit.loadState", "scan tokens", err)
|
||||
}
|
||||
if _, ok := result[model]; !ok {
|
||||
result[model] = &UsageStats{}
|
||||
|
|
@ -261,7 +262,7 @@ func (s *sqliteStore) loadState() (map[string]*UsageStats, error) {
|
|||
})
|
||||
}
|
||||
if err := tokRows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("ratelimit.loadState: token rows: %w", err)
|
||||
return nil, coreerr.E("ratelimit.loadState", "token rows", err)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue