From 0afa5e9147ca48718c1d1e7025a8efe40f7fbc96 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 15 Feb 2026 16:55:17 +0000 Subject: [PATCH] feat: add `lem ingest` command + go-huggingface dependency Ingests benchmark data (content scores, capability scores, training curves) from JSONL files and mlx_lm logs into InfluxDB. Batched writes, iteration extraction from checkpoint labels. Also adds github.com/hupe1980/go-huggingface for future HF sync. Co-Authored-By: Claude Opus 4.6 --- go.mod | 1 + go.sum | 2 + main.go | 3 + pkg/lem/ingest.go | 331 +++++++++++++++++++++++++++++++++++++++++ pkg/lem/ingest_test.go | 223 +++++++++++++++++++++++++++ 5 files changed, 560 insertions(+) create mode 100644 pkg/lem/ingest.go create mode 100644 pkg/lem/ingest_test.go diff --git a/go.mod b/go.mod index 1fd424c..dd2f83d 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/goccy/go-json v0.10.5 // indirect github.com/google/flatbuffers v25.1.24+incompatible // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hupe1980/go-huggingface v0.0.15 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/klauspost/cpuid/v2 v2.2.9 // indirect github.com/marcboeker/go-duckdb v1.8.5 // indirect diff --git a/go.sum b/go.sum index e14faae..6c74149 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/google/flatbuffers v25.1.24+incompatible h1:4wPqL3K7GzBd1CwyhSd3usxLK github.com/google/flatbuffers v25.1.24+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hupe1980/go-huggingface v0.0.15 h1:tTWmUGGunC/BYz4hrwS8SSVtMYVYjceG2uhL8HxeXvw= +github.com/hupe1980/go-huggingface v0.0.15/go.mod h1:IRvsik3+b9BJyw9hCfw1arI6gDObcVto1UA8f3kt8mM= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress 
v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= diff --git a/main.go b/main.go index 55c741d..8b0d1f6 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,7 @@ Commands: export Export golden set to training-format JSONL splits expand Generate expansion responses via trained LEM model conv Generate conversational training data + ingest Ingest benchmark data into InfluxDB ` func main() { @@ -43,6 +44,8 @@ func main() { lem.RunExport(os.Args[2:]) case "conv": lem.RunConv(os.Args[2:]) + case "ingest": + lem.RunIngest(os.Args[2:]) default: fmt.Fprintf(os.Stderr, "unknown command: %s\n\n%s", os.Args[1], usage) os.Exit(1) diff --git a/pkg/lem/ingest.go b/pkg/lem/ingest.go new file mode 100644 index 0000000..f13ba3d --- /dev/null +++ b/pkg/lem/ingest.go @@ -0,0 +1,331 @@ +package lem + +import ( + "bufio" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "regexp" + "strconv" + "strings" +) + +// RunIngest is the CLI entry point for the ingest command. +// It reads benchmark JSONL files and training logs, then pushes +// the data into InfluxDB as line protocol for the lab dashboard. 
+//
+// Flags: --model is required, as is at least one of --content, --capability
+// or --training-log; --run-id falls back to the model name and --batch-size
+// must be at least 1. Invalid usage prints the flag help and exits with
+// status 1; any ingestion error aborts the process via log.Fatalf.
+func RunIngest(args []string) {
+	fs := flag.NewFlagSet("ingest", flag.ExitOnError)
+
+	contentFile := fs.String("content", "", "Content scores JSONL file")
+	capabilityFile := fs.String("capability", "", "Capability scores JSONL file")
+	trainingLog := fs.String("training-log", "", "MLX LoRA training log file")
+	model := fs.String("model", "", "Model name tag (required)")
+	runID := fs.String("run-id", "", "Run ID tag (defaults to model name)")
+	influxURL := fs.String("influx", "", "InfluxDB URL")
+	influxDB := fs.String("influx-db", "", "InfluxDB database name")
+	batchSize := fs.Int("batch-size", 100, "Lines per InfluxDB write batch")
+
+	// With flag.ExitOnError, Parse exits on bad flags by itself; the check
+	// is kept as a safety net should the error handling mode ever change.
+	if err := fs.Parse(args); err != nil {
+		log.Fatalf("parse flags: %v", err)
+	}
+
+	if *model == "" {
+		fmt.Fprintln(os.Stderr, "error: --model is required")
+		fs.Usage()
+		os.Exit(1)
+	}
+
+	if *contentFile == "" && *capabilityFile == "" && *trainingLog == "" {
+		fmt.Fprintln(os.Stderr, "error: at least one of --content, --capability, or --training-log is required")
+		fs.Usage()
+		os.Exit(1)
+	}
+
+	// A zero or negative batch size would make every "batch full" check
+	// succeed, including for empty batches.
+	if *batchSize < 1 {
+		fmt.Fprintln(os.Stderr, "error: --batch-size must be at least 1")
+		fs.Usage()
+		os.Exit(1)
+	}
+
+	if *runID == "" {
+		*runID = *model
+	}
+
+	influx := NewInfluxClient(*influxURL, *influxDB)
+	total := 0
+
+	if *contentFile != "" {
+		n, err := ingestContentScores(influx, *contentFile, *model, *runID, *batchSize)
+		if err != nil {
+			log.Fatalf("ingest content scores: %v", err)
+		}
+		fmt.Printf(" Content scores: %d points\n", n)
+		total += n
+	}
+
+	if *capabilityFile != "" {
+		n, err := ingestCapabilityScores(influx, *capabilityFile, *model, *runID, *batchSize)
+		if err != nil {
+			log.Fatalf("ingest capability scores: %v", err)
+		}
+		fmt.Printf(" Capability scores: %d points\n", n)
+		total += n
+	}
+
+	if *trainingLog != "" {
+		n, err := ingestTrainingCurve(influx, *trainingLog, *model, *runID, *batchSize)
+		if err != nil {
+			log.Fatalf("ingest training curve: %v", err)
+		}
+		fmt.Printf(" Training curve: %d points\n", n)
+		total += n
+	}
+
+	fmt.Printf("\nTotal: %d points ingested\n", total)
+}
+
+// iterRe matches the "@<digits>" iteration suffix of a checkpoint label.
+var iterRe = regexp.MustCompile(`@(\d+)`)
+
+// extractIteration pulls the iteration number from a label like "model@200".
+// It returns 0 when the label has no "@<digits>" part or when the digits do
+// not fit in an int.
+func extractIteration(label string) int {
+	m := iterRe.FindStringSubmatch(label)
+	if m == nil {
+		return 0
+	}
+	n, err := strconv.Atoi(m[1])
+	if err != nil {
+		// The pattern only admits ASCII digits, so this is an overflow;
+		// report "no iteration" rather than a clamped value.
+		return 0
+	}
+	return n
+}
+
+// contentScoreEntry is one line from a content scores JSONL file.
+type contentScoreEntry struct {
+	Label      string                       `json:"label"`
+	Aggregates map[string]float64           `json:"aggregates"`
+	Probes     map[string]contentProbeEntry `json:"probes"`
+}
+
+// contentProbeEntry holds the per-dimension scores of a single probe. Values
+// are decoded loosely because non-numeric entries (such as "notes") share
+// the same map.
+type contentProbeEntry struct {
+	Scores map[string]interface{} `json:"scores"`
+}
+
+// ingestContentScores reads a content scores JSONL file and writes
+// content_score and probe_score measurements to InfluxDB.
+//
+// Blank lines are ignored and lines that are not valid JSON are skipped with
+// a logged warning. Points are sent in batches of at least batchSize lines
+// (values below 1 are treated as 1). The returned count is the number of
+// points handed to InfluxDB successfully, so on error it excludes the batch
+// that failed.
+func ingestContentScores(influx *InfluxClient, filepath, model, runID string, batchSize int) (int, error) {
+	f, err := os.Open(filepath)
+	if err != nil {
+		return 0, fmt.Errorf("open %s: %w", filepath, err)
+	}
+	defer f.Close()
+
+	if batchSize < 1 {
+		batchSize = 1
+	}
+
+	// Shortest exact decimal form; "%f" would truncate to six decimals.
+	num := func(v float64) string { return strconv.FormatFloat(v, 'f', -1, 64) }
+
+	var lines []string
+	written := 0
+	lineNo := 0
+	scanner := bufio.NewScanner(f)
+	// Score entries can be long; allow lines of up to 1 MiB.
+	scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
+
+	for scanner.Scan() {
+		lineNo++
+		text := strings.TrimSpace(scanner.Text())
+		if text == "" {
+			continue
+		}
+
+		var entry contentScoreEntry
+		if err := json.Unmarshal([]byte(text), &entry); err != nil {
+			// Best effort: keep going, but leave a trace of what was dropped.
+			log.Printf("warning: %s:%d: skipping malformed JSON: %v", filepath, lineNo, err)
+			continue
+		}
+
+		label := entry.Label
+		if label == "" {
+			label = "unknown"
+		}
+		iteration := extractIteration(label)
+		hasKernel := strings.Contains(strings.ToLower(label), "kernel")
+
+		// Aggregate scores.
+		for dim, val := range entry.Aggregates {
+			lp := fmt.Sprintf("content_score,model=%s,run_id=%s,label=%s,dimension=%s,has_kernel=%t score=%s,iteration=%di",
+				escapeLp(model), escapeLp(runID), escapeLp(label), escapeLp(dim), hasKernel, num(val), iteration)
+			lines = append(lines, lp)
+		}
+
+		// Per-probe scores.
+		for probeID, probeData := range entry.Probes {
+			for dim, val := range probeData.Scores {
+				if dim == "notes" {
+					continue
+				}
+				fval, ok := toFloat64(val)
+				if !ok {
+					continue
+				}
+				lp := fmt.Sprintf("probe_score,model=%s,run_id=%s,label=%s,probe=%s,dimension=%s,has_kernel=%t score=%s,iteration=%di",
+					escapeLp(model), escapeLp(runID), escapeLp(label), escapeLp(probeID), escapeLp(dim), hasKernel, num(fval), iteration)
+				lines = append(lines, lp)
+			}
+		}
+
+		if len(lines) >= batchSize {
+			if err := influx.WriteLp(lines); err != nil {
+				return written, fmt.Errorf("write content scores: %w", err)
+			}
+			written += len(lines)
+			lines = lines[:0]
+		}
+	}
+
+	// Flush whatever is left, even if the scanner stopped early on an error.
+	if len(lines) > 0 {
+		if err := influx.WriteLp(lines); err != nil {
+			return written, fmt.Errorf("flush content scores: %w", err)
+		}
+		written += len(lines)
+	}
+
+	if err := scanner.Err(); err != nil {
+		return written, fmt.Errorf("read %s: %w", filepath, err)
+	}
+	return written, nil
+}
+
+// capabilityScoreEntry is one line from a capability scores JSONL file.
+type capabilityScoreEntry struct {
+	Label      string                        `json:"label"`
+	Accuracy   float64                       `json:"accuracy"`
+	Correct    int                           `json:"correct"`
+	Total      int                           `json:"total"`
+	ByCategory map[string]capabilityCatEntry `json:"by_category"`
+}
+
+// capabilityCatEntry is the correct/total tally of one capability category.
+type capabilityCatEntry struct {
+	Correct int `json:"correct"`
+	Total   int `json:"total"`
+}
+
+// ingestCapabilityScores reads a capability scores JSONL file and writes
+// capability_score measurements to InfluxDB.
+//
+// Each entry yields one "overall" point plus one point per category whose
+// total is greater than zero; category accuracy is computed as a percentage.
+// Blank lines are ignored and lines that are not valid JSON are skipped with
+// a logged warning. Points are sent in batches of at least batchSize lines
+// (values below 1 are treated as 1). The returned count is the number of
+// points handed to InfluxDB successfully, so on error it excludes the batch
+// that failed.
+func ingestCapabilityScores(influx *InfluxClient, filepath, model, runID string, batchSize int) (int, error) {
+	f, err := os.Open(filepath)
+	if err != nil {
+		return 0, fmt.Errorf("open %s: %w", filepath, err)
+	}
+	defer f.Close()
+
+	if batchSize < 1 {
+		batchSize = 1
+	}
+
+	// Shortest exact decimal form; "%f" would truncate to six decimals.
+	num := func(v float64) string { return strconv.FormatFloat(v, 'f', -1, 64) }
+
+	var lines []string
+	written := 0
+	lineNo := 0
+	scanner := bufio.NewScanner(f)
+	// Score entries can be long; allow lines of up to 1 MiB.
+	scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
+
+	for scanner.Scan() {
+		lineNo++
+		text := strings.TrimSpace(scanner.Text())
+		if text == "" {
+			continue
+		}
+
+		var entry capabilityScoreEntry
+		if err := json.Unmarshal([]byte(text), &entry); err != nil {
+			// Best effort: keep going, but leave a trace of what was dropped.
+			log.Printf("warning: %s:%d: skipping malformed JSON: %v", filepath, lineNo, err)
+			continue
+		}
+
+		label := entry.Label
+		if label == "" {
+			label = "unknown"
+		}
+		iteration := extractIteration(label)
+
+		// Overall score.
+		lines = append(lines, fmt.Sprintf("capability_score,model=%s,run_id=%s,label=%s,category=overall accuracy=%s,correct=%di,total=%di,iteration=%di",
+			escapeLp(model), escapeLp(runID), escapeLp(label), num(entry.Accuracy), entry.Correct, entry.Total, iteration))
+
+		// Per-category scores; empty categories are skipped to avoid a
+		// division by zero.
+		for cat, catData := range entry.ByCategory {
+			if catData.Total <= 0 {
+				continue
+			}
+			pct := float64(catData.Correct) / float64(catData.Total) * 100.0
+			lines = append(lines, fmt.Sprintf("capability_score,model=%s,run_id=%s,label=%s,category=%s accuracy=%s,correct=%di,total=%di,iteration=%di",
+				escapeLp(model), escapeLp(runID), escapeLp(label), escapeLp(cat), num(pct), catData.Correct, catData.Total, iteration))
+		}
+
+		if len(lines) >= batchSize {
+			if err := influx.WriteLp(lines); err != nil {
+				return written, fmt.Errorf("write capability scores: %w", err)
+			}
+			written += len(lines)
+			lines = lines[:0]
+		}
+	}
+
+	// Flush whatever is left, even if the scanner stopped early on an error.
+	if len(lines) > 0 {
+		if err := influx.WriteLp(lines); err != nil {
+			return written, fmt.Errorf("flush capability scores: %w", err)
+		}
+		written += len(lines)
+	}
+
+	if err := scanner.Err(); err != nil {
+		return written, fmt.Errorf("read %s: %w", filepath, err)
+	}
+	return written, nil
+}
+
+// Patterns for the two kinds of progress lines extracted from the training
+// log: validation loss reports and train loss reports with throughput.
+var (
+	valLossRe   = regexp.MustCompile(`Iter (\d+): Val loss ([\d.]+)`)
+	trainLossRe = regexp.MustCompile(`Iter (\d+): Train loss ([\d.]+), Learning Rate ([\d.eE+-]+), It/sec ([\d.]+), Tokens/sec ([\d.]+)`)
+)
+
+// ingestTrainingCurve parses an mlx_lm training log and writes
+// training_loss measurements to InfluxDB.
+//
+// Lines matching valLossRe become loss_type=val points and lines matching
+// trainLossRe become loss_type=train points; every other line is ignored.
+// A matching line whose numbers cannot be parsed is skipped with a logged
+// warning instead of being written as zeros. Points are sent in batches of
+// at least batchSize lines (values below 1 are treated as 1). The returned
+// count is the number of points handed to InfluxDB successfully, so on error
+// it excludes the batch that failed.
+//
+// NOTE(review): the points carry no timestamp and iteration is a field, so
+// all points of one loss_type share a series. Confirm that WriteLp or the
+// server does not assign a single timestamp to a whole batch, which would
+// make later points overwrite earlier ones.
+func ingestTrainingCurve(influx *InfluxClient, filepath, model, runID string, batchSize int) (int, error) {
+	f, err := os.Open(filepath)
+	if err != nil {
+		return 0, fmt.Errorf("open %s: %w", filepath, err)
+	}
+	defer f.Close()
+
+	if batchSize < 1 {
+		batchSize = 1
+	}
+
+	// Shortest exact decimal form. "%f" keeps only six decimals, which
+	// flattens small learning rates (anything below 5e-7 becomes 0.000000).
+	num := func(v float64) string { return strconv.FormatFloat(v, 'f', -1, 64) }
+
+	var lines []string
+	written := 0
+	lineNo := 0
+	scanner := bufio.NewScanner(f)
+
+	for scanner.Scan() {
+		lineNo++
+		text := scanner.Text()
+
+		if m := valLossRe.FindStringSubmatch(text); m != nil {
+			iteration, iterErr := strconv.Atoi(m[1])
+			valLoss, lossErr := strconv.ParseFloat(m[2], 64)
+			if iterErr != nil || lossErr != nil {
+				// The pattern is loose (e.g. "1.2.3" matches [\d.]+).
+				log.Printf("warning: %s:%d: skipping unparsable val loss line", filepath, lineNo)
+			} else {
+				lines = append(lines, fmt.Sprintf("training_loss,model=%s,run_id=%s,loss_type=val loss=%s,iteration=%di",
+					escapeLp(model), escapeLp(runID), num(valLoss), iteration))
+			}
+		}
+
+		if m := trainLossRe.FindStringSubmatch(text); m != nil {
+			iteration, parseErr := strconv.Atoi(m[1])
+			// vals holds, in capture order: train loss, learning rate,
+			// iterations/sec, tokens/sec.
+			var vals [4]float64
+			for i := 0; parseErr == nil && i < len(vals); i++ {
+				vals[i], parseErr = strconv.ParseFloat(m[i+2], 64)
+			}
+			if parseErr != nil {
+				log.Printf("warning: %s:%d: skipping unparsable train loss line: %v", filepath, lineNo, parseErr)
+			} else {
+				lines = append(lines, fmt.Sprintf("training_loss,model=%s,run_id=%s,loss_type=train loss=%s,learning_rate=%s,iterations_per_sec=%s,tokens_per_sec=%s,iteration=%di",
+					escapeLp(model), escapeLp(runID), num(vals[0]), num(vals[1]), num(vals[2]), num(vals[3]), iteration))
+			}
+		}
+
+		if len(lines) >= batchSize {
+			if err := influx.WriteLp(lines); err != nil {
+				return written, fmt.Errorf("write training curve: %w", err)
+			}
+			written += len(lines)
+			lines = lines[:0]
+		}
+	}
+
+	// Flush whatever is left, even if the scanner stopped early on an error.
+	if len(lines) > 0 {
+		if err := influx.WriteLp(lines); err != nil {
+			return written, fmt.Errorf("flush training curve: %w", err)
+		}
+		written += len(lines)
+	}
+
+	if err := scanner.Err(); err != nil {
+		return written, fmt.Errorf("read %s: %w", filepath, err)
+	}
+	return written, nil
+}
+
+// toFloat64 converts an interface{} to float64 if possible.
+//
+// Supported inputs are the built-in float and integer types listed in the
+// switch below and json.Number. Anything else (strings, nil, bools, ...)
+// and a json.Number that does not parse yield (0, false).
+func toFloat64(v interface{}) (float64, bool) {
+	switch n := v.(type) {
+	case float64:
+		return n, true
+	case float32:
+		return float64(n), true
+	case int:
+		return float64(n), true
+	case int32:
+		return float64(n), true
+	case int64:
+		return float64(n), true
+	case uint:
+		return float64(n), true
+	case uint32:
+		return float64(n), true
+	case uint64:
+		return float64(n), true
+	case json.Number:
+		f, err := n.Float64()
+		if err != nil {
+			return 0, false
+		}
+		return f, true
+	default:
+		return 0, false
+	}
+}
diff --git a/pkg/lem/ingest_test.go b/pkg/lem/ingest_test.go
new file mode 100644
index 0000000..f66962a
--- /dev/null
+++ b/pkg/lem/ingest_test.go
@@ -0,0 +1,223 @@
+package lem
+
+import (
+	"encoding/json"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+)
+
+// newIngestCaptureServer starts a test HTTP server that appends every
+// newline-separated line of each request body to *received and answers
+// 204 No Content. The server is closed when the test finishes.
+func newIngestCaptureServer(t *testing.T, received *[]string) *httptest.Server {
+	t.Helper()
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// ReadAll copes with short reads and with requests that carry no
+		// Content-Length, unlike a single Read into a presized buffer.
+		body, err := io.ReadAll(r.Body)
+		if err != nil {
+			t.Errorf("read request body: %v", err)
+			w.WriteHeader(http.StatusInternalServerError)
+			return
+		}
+		*received = append(*received, strings.Split(string(body), "\n")...)
+		w.WriteHeader(http.StatusNoContent)
+	}))
+	t.Cleanup(ts.Close)
+	return ts
+}
+
+// writeIngestJSONL marshals each entry onto its own line of the file at
+// path, failing the test on any marshal or write error.
+func writeIngestJSONL(t *testing.T, path string, entries ...interface{}) {
+	t.Helper()
+	var sb strings.Builder
+	for _, e := range entries {
+		data, err := json.Marshal(e)
+		if err != nil {
+			t.Fatalf("marshal entry: %v", err)
+		}
+		sb.Write(data)
+		sb.WriteByte('\n')
+	}
+	if err := os.WriteFile(path, []byte(sb.String()), 0644); err != nil {
+		t.Fatalf("write %s: %v", path, err)
+	}
+}
+
+func TestExtractIteration(t *testing.T) {
+	tests := []struct {
+		label string
+		want  int
+	}{
+		{"deepseek-r1@200", 200},
+		{"gemma12b@1600", 1600},
+		{"model@0", 0},
+		{"no-iteration", 0},
+		{"base", 0},
+		{"@50+kernel", 50},
+	}
+
+	for _, tt := range tests {
+		got := extractIteration(tt.label)
+		if got != tt.want {
+			t.Errorf("extractIteration(%q) = %d, want %d", tt.label, got, tt.want)
+		}
+	}
+}
+
+func TestIngestContentScores(t *testing.T) {
+	var receivedLines []string
+	ts := newIngestCaptureServer(t, &receivedLines)
+
+	// Create test JSONL.
+	path := filepath.Join(t.TempDir(), "content.jsonl")
+	writeIngestJSONL(t, path,
+		contentScoreEntry{
+			Label:      "gemma12b@200",
+			Aggregates: map[string]float64{"sovereignty": 7.5, "ethical_depth": 8.0},
+			Probes: map[string]contentProbeEntry{
+				"p01": {Scores: map[string]interface{}{"sovereignty": 8.0, "notes": "good"}},
+			},
+		},
+		contentScoreEntry{
+			Label:      "gemma12b@400+kernel",
+			Aggregates: map[string]float64{"sovereignty": 9.0},
+		},
+	)
+
+	influx := &InfluxClient{url: ts.URL, db: "test", token: "test"}
+	n, err := ingestContentScores(influx, path, "gemma3-12b", "test-run", 100)
+	if err != nil {
+		t.Fatalf("ingest: %v", err)
+	}
+
+	// 2 aggregates + 1 probe (notes skipped) + 1 aggregate = 4 points.
+	if n != 4 {
+		t.Errorf("expected 4 points, got %d", n)
+	}
+
+	// Verify line protocol content.
+	allLines := strings.Join(receivedLines, "\n")
+	if !strings.Contains(allLines, "content_score") {
+		t.Error("missing content_score measurement")
+	}
+	if !strings.Contains(allLines, "probe_score") {
+		t.Error("missing probe_score measurement")
+	}
+	if !strings.Contains(allLines, "has_kernel=true") {
+		t.Error("missing has_kernel=true for kernel label")
+	}
+	if !strings.Contains(allLines, "iteration=200i") {
+		t.Error("missing iteration=200i")
+	}
+}
+
+func TestIngestCapabilityScores(t *testing.T) {
+	var receivedLines []string
+	ts := newIngestCaptureServer(t, &receivedLines)
+
+	path := filepath.Join(t.TempDir(), "capability.jsonl")
+	writeIngestJSONL(t, path,
+		capabilityScoreEntry{
+			Label:    "deepseek@400",
+			Accuracy: 82.6,
+			Correct:  19,
+			Total:    23,
+			ByCategory: map[string]capabilityCatEntry{
+				"math":  {Correct: 7, Total: 8},
+				"logic": {Correct: 4, Total: 5},
+				"empty": {Correct: 0, Total: 0}, // Should be skipped.
+			},
+		},
+	)
+
+	influx := &InfluxClient{url: ts.URL, db: "test", token: "test"}
+	n, err := ingestCapabilityScores(influx, path, "deepseek-r1-7b", "test-run", 100)
+	if err != nil {
+		t.Fatalf("ingest: %v", err)
+	}
+
+	// 1 overall + 2 categories (empty skipped) = 3.
+	if n != 3 {
+		t.Errorf("expected 3 points, got %d", n)
+	}
+
+	allLines := strings.Join(receivedLines, "\n")
+	if !strings.Contains(allLines, "category=overall") {
+		t.Error("missing overall category")
+	}
+	if !strings.Contains(allLines, "category=math") {
+		t.Error("missing math category")
+	}
+	if !strings.Contains(allLines, "iteration=400i") {
+		t.Error("missing iteration=400i")
+	}
+}
+
+func TestIngestTrainingCurve(t *testing.T) {
+	var receivedLines []string
+	ts := newIngestCaptureServer(t, &receivedLines)
+
+	path := filepath.Join(t.TempDir(), "training.log")
+
+	logContent := `Loading model from mlx-community/gemma-3-1b-it-qat-4bit
+Starting training...
+Iter 10: Train loss 2.534, Learning Rate 1.000e-05, It/sec 3.21, Tokens/sec 1205.4
+Iter 20: Train loss 1.891, Learning Rate 1.000e-05, It/sec 3.18, Tokens/sec 1198.2
+Iter 25: Val loss 1.756
+Iter 30: Train loss 1.654, Learning Rate 1.000e-05, It/sec 3.22, Tokens/sec 1210.0
+Some random log line that should be ignored
+Iter 50: Val loss 1.523
+`
+	if err := os.WriteFile(path, []byte(logContent), 0644); err != nil {
+		t.Fatalf("write %s: %v", path, err)
+	}
+
+	influx := &InfluxClient{url: ts.URL, db: "test", token: "test"}
+	n, err := ingestTrainingCurve(influx, path, "gemma3-1b", "test-run", 100)
+	if err != nil {
+		t.Fatalf("ingest: %v", err)
+	}
+
+	// 3 train + 2 val = 5.
+	if n != 5 {
+		t.Errorf("expected 5 points, got %d", n)
+	}
+
+	allLines := strings.Join(receivedLines, "\n")
+	if !strings.Contains(allLines, "loss_type=val") {
+		t.Error("missing val loss")
+	}
+	if !strings.Contains(allLines, "loss_type=train") {
+		t.Error("missing train loss")
+	}
+	if !strings.Contains(allLines, "tokens_per_sec=") {
+		t.Error("missing tokens_per_sec field")
+	}
+}
+
+func TestToFloat64(t *testing.T) {
+	tests := []struct {
+		input interface{}
+		want  float64
+		ok    bool
+	}{
+		{7.5, 7.5, true},
+		{42, 42.0, true},
+		{float32(1.5), 1.5, true},
+		{int32(-3), -3.0, true},
+		{int64(7), 7.0, true},
+		{uint(3), 3.0, true},
+		{uint64(9), 9.0, true},
+		{json.Number("2.25"), 2.25, true},
+		{json.Number("not-a-number"), 0, false},
+		{"not a number", 0, false},
+		{nil, 0, false},
+	}
+
+	for _, tt := range tests {
+		got, ok := toFloat64(tt.input)
+		if ok != tt.ok {
+			t.Errorf("toFloat64(%v) ok=%v, want %v", tt.input, ok, tt.ok)
+		}
+		if ok && got != tt.want {
+			t.Errorf("toFloat64(%v) = %f, want %f", tt.input, got, tt.want)
+		}
+	}
+}