cli/pkg/lab/collector/influxdb.go
Claude 50da0adcb7 feat: integrate lab dashboard as core lab serve
Port the standalone lab dashboard (lab.lthn.io) into the core CLI as
pkg/lab/ with collectors, handlers, and HTML templates. The dashboard
monitors machines, Docker containers, Forgejo, HuggingFace models,
training runs, and InfluxDB metrics with SSE live updates.

New command: core lab serve --bind :8080

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 05:53:52 +00:00

package collector

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sort"
	"strings"
	"time"

	"forge.lthn.ai/core/cli/pkg/lab"
)
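
// InfluxDB polls an InfluxDB 3 instance over its SQL HTTP API and publishes
// benchmark, training-run, golden-set, and dataset summaries to the lab store.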
type InfluxDB struct {
	cfg   *lab.Config
	store *lab.Store
}
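
// NewInfluxDB returns a collector that reads connection details from cfg and
// writes results into s.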
func NewInfluxDB(cfg *lab.Config, s *lab.Store) *InfluxDB {
	return &InfluxDB{cfg: cfg, store: s}
}
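
// Name returns the identifier this collector reports under.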
func (i *InfluxDB) Name() string { return "influxdb" }
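
// Collect queries training, benchmark, golden-set, and dataset measurements
// from InfluxDB and refreshes the corresponding store sections. It is a no-op
// when no InfluxDB URL or token is configured.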
func (i *InfluxDB) Collect(ctx context.Context) error {
	if i.cfg.InfluxURL == "" || i.cfg.InfluxToken == "" {
		return nil
	}
	data := lab.BenchmarkData{
		Loss:            make(map[string][]lab.LossPoint),
		Content:         make(map[string][]lab.ContentPoint),
		Capability:      make(map[string][]lab.CapabilityPoint),
		CapabilityJudge: make(map[string][]lab.CapabilityJudgePoint),
		UpdatedAt:       time.Now(),
	}
	// Collect all run identifiers from each measurement.
	runSet := map[string]lab.BenchmarkRun{}
	// Training loss data.
	if rows, err := i.query(ctx, "SELECT run_id, model, iteration, loss, loss_type, learning_rate, iterations_per_sec, tokens_per_sec FROM training_loss ORDER BY run_id, iteration"); err == nil {
		for _, row := range rows {
			rid := jsonStr(row["run_id"])
			mdl := jsonStr(row["model"])
			if rid == "" {
				continue
			}
			runSet[rid] = lab.BenchmarkRun{RunID: rid, Model: mdl, Type: "training"}
			data.Loss[rid] = append(data.Loss[rid], lab.LossPoint{
				Iteration:    jsonInt(row["iteration"]),
				Loss:         jsonFloat(row["loss"]),
				LossType:     jsonStr(row["loss_type"]),
				LearningRate: jsonFloat(row["learning_rate"]),
				TokensPerSec: jsonFloat(row["tokens_per_sec"]),
			})
		}
	}
	// Content scores.
	if rows, err := i.query(ctx, "SELECT run_id, model, label, dimension, score, iteration, has_kernel FROM content_score ORDER BY run_id, iteration, dimension"); err == nil {
		for _, row := range rows {
			rid := jsonStr(row["run_id"])
			mdl := jsonStr(row["model"])
			if rid == "" {
				continue
			}
			if _, ok := runSet[rid]; !ok {
				runSet[rid] = lab.BenchmarkRun{RunID: rid, Model: mdl, Type: "content"}
			}
			hk := jsonStr(row["has_kernel"])
			data.Content[rid] = append(data.Content[rid], lab.ContentPoint{
				Label:     jsonStr(row["label"]),
				Dimension: jsonStr(row["dimension"]),
				Score:     jsonFloat(row["score"]),
				Iteration: jsonInt(row["iteration"]),
				HasKernel: hk == "true" || hk == "True",
			})
		}
	}
	// Capability scores.
	if rows, err := i.query(ctx, "SELECT run_id, model, label, category, accuracy, correct, total, iteration FROM capability_score ORDER BY run_id, iteration, category"); err == nil {
		for _, row := range rows {
			rid := jsonStr(row["run_id"])
			mdl := jsonStr(row["model"])
			if rid == "" {
				continue
			}
			if _, ok := runSet[rid]; !ok {
				runSet[rid] = lab.BenchmarkRun{RunID: rid, Model: mdl, Type: "capability"}
			}
			data.Capability[rid] = append(data.Capability[rid], lab.CapabilityPoint{
				Label:     jsonStr(row["label"]),
				Category:  jsonStr(row["category"]),
				Accuracy:  jsonFloat(row["accuracy"]),
				Correct:   jsonInt(row["correct"]),
				Total:     jsonInt(row["total"]),
				Iteration: jsonInt(row["iteration"]),
			})
		}
	}
	// Capability judge scores (0-10 per probe).
	if rows, err := i.query(ctx, "SELECT run_id, model, label, probe_id, category, reasoning, correctness, clarity, avg, iteration FROM capability_judge ORDER BY run_id, iteration, probe_id"); err == nil {
		for _, row := range rows {
			rid := jsonStr(row["run_id"])
			if rid == "" {
				continue
			}
			data.CapabilityJudge[rid] = append(data.CapabilityJudge[rid], lab.CapabilityJudgePoint{
				Label:       jsonStr(row["label"]),
				ProbeID:     jsonStr(row["probe_id"]),
				Category:    jsonStr(row["category"]),
				Reasoning:   jsonFloat(row["reasoning"]),
				Correctness: jsonFloat(row["correctness"]),
				Clarity:     jsonFloat(row["clarity"]),
				Avg:         jsonFloat(row["avg"]),
				Iteration:   jsonInt(row["iteration"]),
			})
		}
	}
	// Build the runs list, sorted by model then run ID.
	for _, r := range runSet {
		data.Runs = append(data.Runs, r)
	}
	sort.Slice(data.Runs, func(a, b int) bool {
		if data.Runs[a].Model != data.Runs[b].Model {
			return data.Runs[a].Model < data.Runs[b].Model
		}
		return data.Runs[a].RunID < data.Runs[b].RunID
	})
	i.store.SetBenchmarks(data)
	// Live training run statuses.
	var runStatuses []lab.TrainingRunStatus
	if rows, err := i.query(ctx, "SELECT model, run_id, status, iteration, total_iters, pct FROM training_status ORDER BY time DESC LIMIT 50"); err == nil {
		// Deduplicate: rows arrive newest first, so the first row seen per
		// run_id is its latest status.
		seen := map[string]bool{}
		for _, row := range rows {
			rid := jsonStr(row["run_id"])
			if rid == "" || seen[rid] {
				continue
			}
			seen[rid] = true
			rs := lab.TrainingRunStatus{
				Model:      jsonStr(row["model"]),
				RunID:      rid,
				Status:     jsonStr(row["status"]),
				Iteration:  jsonInt(row["iteration"]),
				TotalIters: jsonInt(row["total_iters"]),
				Pct:        jsonFloat(row["pct"]),
			}
			// Scan backwards through the already-collected loss points to
			// find the latest train and val loss for this run.
			if lossPoints, ok := data.Loss[rid]; ok {
				for j := len(lossPoints) - 1; j >= 0; j-- {
					if lossPoints[j].LossType == "train" && rs.LastLoss == 0 {
						rs.LastLoss = lossPoints[j].Loss
						rs.TokensSec = lossPoints[j].TokensPerSec
					}
					if lossPoints[j].LossType == "val" && rs.ValLoss == 0 {
						rs.ValLoss = lossPoints[j].Loss
					}
					if rs.LastLoss > 0 && rs.ValLoss > 0 {
						break
					}
				}
			}
			runStatuses = append(runStatuses, rs)
		}
	}
	i.store.SetTrainingRuns(runStatuses)
	// Golden set data explorer: query gold_gen (real-time per-generation records).
	gs := lab.GoldenSetSummary{TargetTotal: 15000, UpdatedAt: time.Now()}
	// Try real-time gold_gen first (populated by lem_generate.py directly).
	if rows, err := i.query(ctx, "SELECT count(DISTINCT i) AS total, count(DISTINCT d) AS domains, count(DISTINCT v) AS voices, avg(gen_time) AS avg_t, avg(chars) AS avg_c FROM gold_gen"); err == nil && len(rows) > 0 {
		r := rows[0]
		total := jsonInt(r["total"])
		if total > 0 {
			gs.Available = true
			gs.TotalExamples = total
			gs.Domains = jsonInt(r["domains"])
			gs.Voices = jsonInt(r["voices"])
			gs.AvgGenTime = jsonFloat(r["avg_t"])
			gs.AvgResponseChars = jsonFloat(r["avg_c"])
			gs.CompletionPct = float64(total) / float64(gs.TargetTotal) * 100
		}
	}
	// Fall back to pipeline.py metrics if gold_gen isn't populated.
	if !gs.Available {
		if rows, err := i.query(ctx, "SELECT total_examples, domains, voices, avg_gen_time, avg_response_chars, completion_pct FROM golden_set_stats ORDER BY time DESC LIMIT 1"); err == nil && len(rows) > 0 {
			r := rows[0]
			gs.Available = true
			gs.TotalExamples = jsonInt(r["total_examples"])
			gs.Domains = jsonInt(r["domains"])
			gs.Voices = jsonInt(r["voices"])
			gs.AvgGenTime = jsonFloat(r["avg_gen_time"])
			gs.AvgResponseChars = jsonFloat(r["avg_response_chars"])
			gs.CompletionPct = jsonFloat(r["completion_pct"])
		}
	}
	if gs.Available {
		// Per-domain stats from gold_gen.
		if rows, err := i.query(ctx, "SELECT d, count(DISTINCT i) AS n, avg(gen_time) AS avg_t FROM gold_gen GROUP BY d ORDER BY n DESC"); err == nil && len(rows) > 0 {
			for _, r := range rows {
				gs.DomainStats = append(gs.DomainStats, lab.DomainStat{
					Domain:     jsonStr(r["d"]),
					Count:      jsonInt(r["n"]),
					AvgGenTime: jsonFloat(r["avg_t"]),
				})
			}
		}
		// Fall back to pipeline stats.
		if len(gs.DomainStats) == 0 {
			if rows, err := i.query(ctx, "SELECT DISTINCT domain, count, avg_gen_time FROM golden_set_domain ORDER BY count DESC"); err == nil {
				for _, r := range rows {
					gs.DomainStats = append(gs.DomainStats, lab.DomainStat{
						Domain:     jsonStr(r["domain"]),
						Count:      jsonInt(r["count"]),
						AvgGenTime: jsonFloat(r["avg_gen_time"]),
					})
				}
			}
		}
		// Per-voice stats from gold_gen.
		if rows, err := i.query(ctx, "SELECT v, count(DISTINCT i) AS n, avg(chars) AS avg_c, avg(gen_time) AS avg_t FROM gold_gen GROUP BY v ORDER BY n DESC"); err == nil && len(rows) > 0 {
			for _, r := range rows {
				gs.VoiceStats = append(gs.VoiceStats, lab.VoiceStat{
					Voice:      jsonStr(r["v"]),
					Count:      jsonInt(r["n"]),
					AvgChars:   jsonFloat(r["avg_c"]),
					AvgGenTime: jsonFloat(r["avg_t"]),
				})
			}
		}
		// Fall back to pipeline stats.
		if len(gs.VoiceStats) == 0 {
			if rows, err := i.query(ctx, "SELECT DISTINCT voice, count, avg_chars, avg_gen_time FROM golden_set_voice ORDER BY count DESC"); err == nil {
				for _, r := range rows {
					gs.VoiceStats = append(gs.VoiceStats, lab.VoiceStat{
						Voice:      jsonStr(r["voice"]),
						Count:      jsonInt(r["count"]),
						AvgChars:   jsonFloat(r["avg_chars"]),
						AvgGenTime: jsonFloat(r["avg_gen_time"]),
					})
				}
			}
		}
	}
	// Worker activity. last_seen is queried but not yet mapped into WorkerStat.
	if rows, err := i.query(ctx, "SELECT w, count(DISTINCT i) AS n, max(time) AS last_seen FROM gold_gen GROUP BY w ORDER BY n DESC"); err == nil {
		for _, r := range rows {
			gs.Workers = append(gs.Workers, lab.WorkerStat{
				Worker: jsonStr(r["w"]),
				Count:  jsonInt(r["n"]),
			})
		}
	}
	i.store.SetGoldenSet(gs)
	// Dataset stats (from DuckDB, pushed as the dataset_stats measurement).
	ds := lab.DatasetSummary{UpdatedAt: time.Now()}
	if rows, err := i.query(ctx, "SELECT table, rows FROM dataset_stats ORDER BY rows DESC"); err == nil && len(rows) > 0 {
		ds.Available = true
		for _, r := range rows {
			ds.Tables = append(ds.Tables, lab.DatasetTable{
				Name: jsonStr(r["table"]),
				Rows: jsonInt(r["rows"]),
			})
		}
	}
	i.store.SetDataset(ds)
	i.store.SetError("influxdb", nil)
	return nil
}
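
// query posts one SQL statement to the InfluxDB 3 /api/v3/query_sql endpoint
// and decodes the response, a JSON array of row objects. Each query runs with
// a 10-second timeout. Transport errors and non-200 responses are additionally
// recorded in the store; other failures are only returned.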
func (i *InfluxDB) query(ctx context.Context, sql string) ([]map[string]any, error) {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	// Marshal the request body rather than hand-formatting it with %q:
	// Go string escaping is not always valid JSON.
	payload, err := json.Marshal(map[string]string{"db": i.cfg.InfluxDB, "q": sql})
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, i.cfg.InfluxURL+"/api/v3/query_sql", strings.NewReader(string(payload)))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+i.cfg.InfluxToken)
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		i.store.SetError("influxdb", err)
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		err := fmt.Errorf("influxdb query returned %d", resp.StatusCode)
		i.store.SetError("influxdb", err)
		return nil, err
	}
	var rows []map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&rows); err != nil {
		return nil, err
	}
	return rows, nil
}
// JSON value helpers. InfluxDB 3 returns typed JSON values.
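
// jsonStr returns v as a string; non-string values are formatted with %v.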
func jsonStr(v any) string {
	if v == nil {
		return ""
	}
	if s, ok := v.(string); ok {
		return s
	}
	return fmt.Sprintf("%v", v)
}
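
// jsonFloat returns v as a float64, accepting either a plain float64 or a
// json.Number; anything else yields 0.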
func jsonFloat(v any) float64 {
	if v == nil {
		return 0
	}
	switch n := v.(type) {
	case float64:
		return n
	case json.Number:
		f, _ := n.Float64()
		return f
	}
	return 0
}
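
// jsonInt returns v as an int by the same rules as jsonFloat.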
func jsonInt(v any) int {
	if v == nil {
		return 0
	}
	switch n := v.(type) {
	case float64:
		return int(n)
	case json.Number:
		i, _ := n.Int64()
		return int(i)
	}
	return 0
}