LEM/pkg/lem/ingest.go
Snider 56eda1a081 refactor: migrate all 25 commands from passthrough to cobra framework
Replace passthrough() + stdlib flag.FlagSet anti-pattern with proper
cobra integration. Every Run* function now takes a typed *Opts struct
and returns error. Flags registered via cli.StringFlag/IntFlag/etc.
Commands participate in Core lifecycle with full cobra flag parsing.

- 6 command groups: gen, score, data, export, infra, mon
- 25 commands converted, 0 passthrough() calls remain
- Delete passthrough() helper from lem.go
- Update export_test.go to use ExportOpts struct

Co-Authored-By: Virgil <virgil@lethean.io>
2026-02-23 03:32:53 +00:00

327 lines
9.1 KiB
Go

package lem
import (
"bufio"
"encoding/json"
"fmt"
"os"
"regexp"
"strconv"
"strings"
)
// IngestOpts holds configuration for the ingest command.
//
// Model is the only strictly required field; RunID defaults to Model
// and BatchSize defaults to 100 when unset (see RunIngest). At least
// one of Content, Capability, or TrainingLog must be provided.
type IngestOpts struct {
	Content     string // Content scores JSONL file (optional)
	Capability  string // Capability scores JSONL file (optional)
	TrainingLog string // MLX LoRA training log file (optional)
	Model       string // Model name tag (required)
	RunID       string // Run ID tag (defaults to model name)
	InfluxURL   string // InfluxDB URL
	InfluxDB    string // InfluxDB database name
	BatchSize   int    // Lines per InfluxDB write batch (defaults to 100)
}
// RunIngest is the CLI entry point for the ingest command.
// It reads benchmark JSONL files and training logs, then pushes
// the data into InfluxDB as line protocol for the lab dashboard.
//
// cfg.Model is required and at least one input file must be given.
// cfg.RunID defaults to cfg.Model; cfg.BatchSize defaults to 100.
func RunIngest(cfg IngestOpts) error {
	if cfg.Model == "" {
		return fmt.Errorf("--model is required")
	}
	if cfg.Content == "" && cfg.Capability == "" && cfg.TrainingLog == "" {
		return fmt.Errorf("at least one of --content, --capability, or --training-log is required")
	}
	if cfg.RunID == "" {
		cfg.RunID = cfg.Model
	}
	if cfg.BatchSize <= 0 {
		cfg.BatchSize = 100
	}
	influx := NewInfluxClient(cfg.InfluxURL, cfg.InfluxDB)

	// Every enabled source follows the same ingest/report pattern, so
	// drive them from a table instead of three copy-pasted blocks.
	sources := []struct {
		path   string // input file; empty means the source is disabled
		label  string // progress label printed per source
		errCtx string // context prefix for wrapped errors
		ingest func(*InfluxClient, string, string, string, int) (int, error)
	}{
		{cfg.Content, "Content scores", "ingest content scores", ingestContentScores},
		{cfg.Capability, "Capability scores", "ingest capability scores", ingestCapabilityScores},
		{cfg.TrainingLog, "Training curve", "ingest training curve", ingestTrainingCurve},
	}
	total := 0
	for _, src := range sources {
		if src.path == "" {
			continue
		}
		n, err := src.ingest(influx, src.path, cfg.Model, cfg.RunID, cfg.BatchSize)
		if err != nil {
			return fmt.Errorf("%s: %w", src.errCtx, err)
		}
		fmt.Printf(" %s: %d points\n", src.label, n)
		total += n
	}
	fmt.Printf("\nTotal: %d points ingested\n", total)
	return nil
}
// iterRe matches the "@<digits>" iteration marker inside a label.
var iterRe = regexp.MustCompile(`@(\d+)`)

// extractIteration pulls the iteration number from a label like "model@200".
// Labels without an "@<digits>" marker yield 0.
func extractIteration(label string) int {
	match := iterRe.FindStringSubmatch(label)
	if len(match) < 2 {
		return 0
	}
	iter, err := strconv.Atoi(match[1])
	if err != nil {
		// Unreachable for \d+ within int range; keep the zero fallback.
		return 0
	}
	return iter
}
// contentScoreEntry is one line from a content scores JSONL file.
type contentScoreEntry struct {
	Label      string                       `json:"label"`      // e.g. "model@200"; iteration is parsed from it
	Aggregates map[string]float64           `json:"aggregates"` // dimension -> aggregate score
	Probes     map[string]contentProbeEntry `json:"probes"`     // probe ID -> per-probe scores
}

// contentProbeEntry holds the per-dimension scores of a single probe.
type contentProbeEntry struct {
	// Scores maps dimension -> score value; values may be numeric or
	// free text ("notes" entries are skipped during ingest).
	Scores map[string]any `json:"scores"`
}
// ingestContentScores reads a content scores JSONL file and writes
// content_score and probe_score measurements to InfluxDB.
//
// Malformed or blank lines are skipped. Points are flushed in batches
// of batchSize; the count of emitted points is returned.
func ingestContentScores(influx *InfluxClient, filepath, model, runID string, batchSize int) (int, error) {
	file, err := os.Open(filepath)
	if err != nil {
		return 0, fmt.Errorf("open %s: %w", filepath, err)
	}
	defer file.Close()

	var pending []string
	points := 0
	sc := bufio.NewScanner(file)
	// Score lines can be large; allow up to 1 MiB per line.
	sc.Buffer(make([]byte, 1024*1024), 1024*1024)
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" {
			continue
		}
		var entry contentScoreEntry
		if json.Unmarshal([]byte(line), &entry) != nil {
			continue // tolerate malformed lines
		}
		label := entry.Label
		if label == "" {
			label = "unknown"
		}
		iter := extractIteration(label)
		kernel := strings.Contains(strings.ToLower(label), "kernel")

		// One point per aggregate dimension.
		for dim, score := range entry.Aggregates {
			pending = append(pending, fmt.Sprintf(
				"content_score,model=%s,run_id=%s,label=%s,dimension=%s,has_kernel=%t score=%f,iteration=%di",
				escapeLp(model), escapeLp(runID), escapeLp(label), escapeLp(dim), kernel, score, iter))
			points++
		}
		// One point per numeric probe dimension; "notes" is free text.
		for probeID, probe := range entry.Probes {
			for dim, val := range probe.Scores {
				if dim == "notes" {
					continue
				}
				score, ok := toFloat64(val)
				if !ok {
					continue
				}
				pending = append(pending, fmt.Sprintf(
					"probe_score,model=%s,run_id=%s,label=%s,probe=%s,dimension=%s,has_kernel=%t score=%f,iteration=%di",
					escapeLp(model), escapeLp(runID), escapeLp(label), escapeLp(probeID), escapeLp(dim), kernel, score, iter))
				points++
			}
		}
		if len(pending) >= batchSize {
			if err := influx.WriteLp(pending); err != nil {
				return points, fmt.Errorf("write content scores: %w", err)
			}
			pending = pending[:0]
		}
	}
	if len(pending) > 0 {
		if err := influx.WriteLp(pending); err != nil {
			return points, fmt.Errorf("flush content scores: %w", err)
		}
	}
	return points, sc.Err()
}
// capabilityScoreEntry is one line from a capability scores JSONL file.
type capabilityScoreEntry struct {
	Label      string                       `json:"label"`       // e.g. "model@200"; iteration is parsed from it
	Accuracy   float64                      `json:"accuracy"`    // overall accuracy as reported in the file
	Correct    int                          `json:"correct"`     // overall correct answers
	Total      int                          `json:"total"`       // overall questions asked
	ByCategory map[string]capabilityCatEntry `json:"by_category"` // category name -> per-category tally
}

// capabilityCatEntry is the correct/total tally for one category.
type capabilityCatEntry struct {
	Correct int `json:"correct"`
	Total   int `json:"total"`
}
// ingestCapabilityScores reads a capability scores JSONL file and writes
// capability_score measurements to InfluxDB.
//
// Each entry yields one "overall" point plus one point per category
// with a nonzero total. The count of emitted points is returned.
func ingestCapabilityScores(influx *InfluxClient, filepath, model, runID string, batchSize int) (int, error) {
	file, err := os.Open(filepath)
	if err != nil {
		return 0, fmt.Errorf("open %s: %w", filepath, err)
	}
	defer file.Close()

	var pending []string
	points := 0
	sc := bufio.NewScanner(file)
	// Score lines can be large; allow up to 1 MiB per line.
	sc.Buffer(make([]byte, 1024*1024), 1024*1024)
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" {
			continue
		}
		var entry capabilityScoreEntry
		if json.Unmarshal([]byte(line), &entry) != nil {
			continue // tolerate malformed lines
		}
		label := entry.Label
		if label == "" {
			label = "unknown"
		}
		iter := extractIteration(label)

		// Overall accuracy exactly as reported in the entry.
		pending = append(pending, fmt.Sprintf(
			"capability_score,model=%s,run_id=%s,label=%s,category=overall accuracy=%f,correct=%di,total=%di,iteration=%di",
			escapeLp(model), escapeLp(runID), escapeLp(label), entry.Accuracy, entry.Correct, entry.Total, iter))
		points++

		// Per-category accuracy, recomputed as a percentage.
		for category, tally := range entry.ByCategory {
			if tally.Total <= 0 {
				continue
			}
			pct := float64(tally.Correct) / float64(tally.Total) * 100.0
			pending = append(pending, fmt.Sprintf(
				"capability_score,model=%s,run_id=%s,label=%s,category=%s accuracy=%f,correct=%di,total=%di,iteration=%di",
				escapeLp(model), escapeLp(runID), escapeLp(label), escapeLp(category), pct, tally.Correct, tally.Total, iter))
			points++
		}
		if len(pending) >= batchSize {
			if err := influx.WriteLp(pending); err != nil {
				return points, fmt.Errorf("write capability scores: %w", err)
			}
			pending = pending[:0]
		}
	}
	if len(pending) > 0 {
		if err := influx.WriteLp(pending); err != nil {
			return points, fmt.Errorf("flush capability scores: %w", err)
		}
	}
	return points, sc.Err()
}
var (
	// valLossRe matches mlx_lm validation lines, e.g. "Iter 200: Val loss 1.234".
	valLossRe = regexp.MustCompile(`Iter (\d+): Val loss ([\d.]+)`)
	// trainLossRe matches mlx_lm training lines carrying loss, LR, and throughput.
	trainLossRe = regexp.MustCompile(`Iter (\d+): Train loss ([\d.]+), Learning Rate ([\d.eE+-]+), It/sec ([\d.]+), Tokens/sec ([\d.]+)`)
)

// ingestTrainingCurve parses an mlx_lm training log and writes
// training_loss measurements to InfluxDB.
//
// Each matching log line becomes one point tagged loss_type=val or
// loss_type=train. The count of emitted points is returned.
func ingestTrainingCurve(influx *InfluxClient, filepath, model, runID string, batchSize int) (int, error) {
	file, err := os.Open(filepath)
	if err != nil {
		return 0, fmt.Errorf("open %s: %w", filepath, err)
	}
	defer file.Close()

	var pending []string
	points := 0
	sc := bufio.NewScanner(file)
	for sc.Scan() {
		line := sc.Text()
		if m := valLossRe.FindStringSubmatch(line); m != nil {
			iter, _ := strconv.Atoi(m[1])
			loss, _ := strconv.ParseFloat(m[2], 64)
			pending = append(pending, fmt.Sprintf(
				"training_loss,model=%s,run_id=%s,loss_type=val loss=%f,iteration=%di",
				escapeLp(model), escapeLp(runID), loss, iter))
			points++
		}
		if m := trainLossRe.FindStringSubmatch(line); m != nil {
			iter, _ := strconv.Atoi(m[1])
			loss, _ := strconv.ParseFloat(m[2], 64)
			lr, _ := strconv.ParseFloat(m[3], 64)
			itPerSec, _ := strconv.ParseFloat(m[4], 64)
			tokPerSec, _ := strconv.ParseFloat(m[5], 64)
			pending = append(pending, fmt.Sprintf(
				"training_loss,model=%s,run_id=%s,loss_type=train loss=%f,learning_rate=%f,iterations_per_sec=%f,tokens_per_sec=%f,iteration=%di",
				escapeLp(model), escapeLp(runID), loss, lr, itPerSec, tokPerSec, iter))
			points++
		}
		if len(pending) >= batchSize {
			if err := influx.WriteLp(pending); err != nil {
				return points, fmt.Errorf("write training curve: %w", err)
			}
			pending = pending[:0]
		}
	}
	if len(pending) > 0 {
		if err := influx.WriteLp(pending); err != nil {
			return points, fmt.Errorf("flush training curve: %w", err)
		}
	}
	return points, sc.Err()
}
// toFloat64 converts a JSON-decoded scalar to float64 if possible.
//
// encoding/json produces float64 (or json.Number when UseNumber is set);
// the extra integer and float32 cases cover values assembled in Go code.
// Returns (0, false) for anything non-numeric.
func toFloat64(v any) (float64, bool) {
	switch n := v.(type) {
	case float64:
		return n, true
	case float32:
		return float64(n), true
	case int:
		return float64(n), true
	case int32:
		return float64(n), true
	case int64:
		return float64(n), true
	case uint:
		return float64(n), true
	case uint64:
		return float64(n), true
	case json.Number:
		f, err := n.Float64()
		return f, err == nil
	default:
		return 0, false
	}
}