go/pkg/ml/ingest.go
Claude 1f3a1bcc47 feat: port 11 LEM data management commands into core ml
Ports all remaining LEM pipeline commands from pkg/lem into core ml,
eliminating the standalone LEM CLI dependency. Each command is split
into reusable business logic (pkg/ml/) and a thin cobra wrapper
(internal/cmd/ml/).

New commands: query, inventory, metrics, ingest, normalize, seed-influx,
consolidate, import-all, approve, publish, coverage.

Adds Path(), Exec(), QueryRowScan() convenience methods to DB type.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 05:53:52 +00:00

384 lines
11 KiB
Go

package ml
import (
"bufio"
"encoding/json"
"fmt"
"io"
"os"
"regexp"
"strconv"
"strings"
"time"
)
// IngestConfig holds the configuration for a benchmark/training ingest run.
//
// At least one of ContentFile, CapabilityFile, or TrainingLog must be set,
// and Model is required; Ingest defaults RunID to Model and BatchSize to 100.
type IngestConfig struct {
// ContentFile is the path to a content scores JSONL file (optional).
ContentFile string
// CapabilityFile is the path to a capability scores JSONL file (optional).
CapabilityFile string
// TrainingLog is the path to an MLX LoRA training log file (optional).
TrainingLog string
// Model is the model name, used as an InfluxDB tag (required).
Model string
// RunID tags the ingest run; defaults to Model when empty.
RunID string
// BatchSize is the number of line-protocol points per InfluxDB write;
// values <= 0 default to 100.
BatchSize int
}
// contentScoreLine is the JSON structure for a content scores JSONL line.
type contentScoreLine struct {
// Label identifies the evaluated model/checkpoint; an optional "@N"
// suffix encodes the iteration number (see extractIteration).
Label string `json:"label"`
// Aggregates maps dimension name -> score. Values are decoded loosely
// (number, json.Number, or string) and converted via toFloat64.
Aggregates map[string]interface{} `json:"aggregates"`
// Probes maps probe ID -> per-probe score block.
Probes map[string]contentScoreProbe `json:"probes"`
}
// contentScoreProbe is the per-probe block within a content score line.
type contentScoreProbe struct {
// Scores maps dimension name -> score, converted via toFloat64.
Scores map[string]interface{} `json:"scores"`
}
// capabilityScoreLine is the JSON structure for a capability scores JSONL line.
type capabilityScoreLine struct {
// Label identifies the evaluated model/checkpoint; an optional "@N"
// suffix encodes the iteration number (see extractIteration).
Label string `json:"label"`
// Accuracy is the overall accuracy as reported in the file (not recomputed).
Accuracy float64 `json:"accuracy"`
// Correct is the overall number of correct answers.
Correct int `json:"correct"`
// Total is the overall number of questions.
Total int `json:"total"`
// ByCategory maps category name -> per-category correct/total counts.
ByCategory map[string]capabilityCatBlock `json:"by_category"`
}
// capabilityCatBlock is the per-category block within a capability score line.
type capabilityCatBlock struct {
Correct int `json:"correct"`
Total int `json:"total"`
}
// Training log regexes, matching the MLX LoRA training log format
// (see ingestTrainingLog), e.g.:
//
//	Iter 100: Val loss 1.234
//	Iter 100: Train loss 1.234, Learning Rate 1.0e-05, It/sec 2.50, Tokens/sec 1000.0
var (
// reValLoss captures (iteration, validation loss).
reValLoss = regexp.MustCompile(`Iter (\d+): Val loss ([\d.]+)`)
// reTrainLoss captures (iteration, train loss, learning rate, it/sec, tokens/sec).
reTrainLoss = regexp.MustCompile(`Iter (\d+): Train loss ([\d.]+), Learning Rate ([\d.eE+-]+), It/sec ([\d.]+), Tokens/sec ([\d.]+)`)
)
// Ingest reads benchmark scores and/or training logs and writes them to
// InfluxDB, printing a per-source summary and a grand total to w.
//
// At least one of cfg.ContentFile, cfg.CapabilityFile, or cfg.TrainingLog
// must be set, and cfg.Model is required. cfg.RunID defaults to cfg.Model
// and cfg.BatchSize defaults to 100.
func Ingest(influx *InfluxClient, cfg IngestConfig, w io.Writer) error {
	// Validate required inputs before touching anything.
	switch {
	case cfg.ContentFile == "" && cfg.CapabilityFile == "" && cfg.TrainingLog == "":
		return fmt.Errorf("at least one of --content, --capability, or --training-log is required")
	case cfg.Model == "":
		return fmt.Errorf("--model is required")
	}

	// Apply defaults.
	if cfg.RunID == "" {
		cfg.RunID = cfg.Model
	}
	if cfg.BatchSize <= 0 {
		cfg.BatchSize = 100
	}

	// Run each enabled ingest stage in a fixed order, accumulating points.
	stages := []struct {
		enabled bool
		what    string
		run     func(*InfluxClient, IngestConfig, io.Writer) (int, error)
	}{
		{cfg.ContentFile != "", "content scores", ingestContentScores},
		{cfg.CapabilityFile != "", "capability scores", ingestCapabilityScores},
		{cfg.TrainingLog != "", "training log", ingestTrainingLog},
	}
	total := 0
	for _, stage := range stages {
		if !stage.enabled {
			continue
		}
		n, err := stage.run(influx, cfg, w)
		if err != nil {
			return fmt.Errorf("ingest %s: %w", stage.what, err)
		}
		total += n
	}

	fmt.Fprintf(w, "Ingested %d total points into InfluxDB\n", total)
	return nil
}
// ingestContentScores streams a content scores JSONL file and writes
// content_score (per aggregate dimension) and probe_score (per probe, per
// dimension) measurements to InfluxDB, batching writes by cfg.BatchSize.
// It returns the number of points emitted and prints a summary to w.
func ingestContentScores(influx *InfluxClient, cfg IngestConfig, w io.Writer) (int, error) {
	f, err := os.Open(cfg.ContentFile)
	if err != nil {
		return 0, fmt.Errorf("open %s: %w", cfg.ContentFile, err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	// Allow JSONL records up to 1 MiB.
	sc.Buffer(make([]byte, 1024*1024), 1024*1024)

	var (
		batch   []string
		written int
		lineNum int
	)
	for sc.Scan() {
		lineNum++
		text := strings.TrimSpace(sc.Text())
		if text == "" {
			continue
		}
		var rec contentScoreLine
		if err := json.Unmarshal([]byte(text), &rec); err != nil {
			return written, fmt.Errorf("line %d: parse json: %w", lineNum, err)
		}

		iteration := extractIteration(rec.Label)
		// Tag runs whose label indicates the kernel was included.
		hasKernel := "false"
		if strings.Contains(strings.ToLower(rec.Label), "kernel") || strings.Contains(rec.Label, "LEK") {
			hasKernel = "true"
		}
		now := time.Now().UnixNano()

		// One content_score point per aggregate dimension.
		for dim, raw := range rec.Aggregates {
			score, ok := toFloat64(raw)
			if !ok {
				continue
			}
			batch = append(batch, fmt.Sprintf(
				"content_score,model=%s,run_id=%s,label=%s,dimension=%s,has_kernel=%s score=%.6f,iteration=%di %d",
				EscapeLp(cfg.Model), EscapeLp(cfg.RunID), EscapeLp(rec.Label),
				EscapeLp(dim), hasKernel, score, iteration, now,
			))
			written++
		}

		// One probe_score point per probe per dimension.
		for probeID, probe := range rec.Probes {
			for dim, raw := range probe.Scores {
				score, ok := toFloat64(raw)
				if !ok {
					continue
				}
				batch = append(batch, fmt.Sprintf(
					"probe_score,model=%s,run_id=%s,label=%s,probe_id=%s,dimension=%s,has_kernel=%s score=%.6f,iteration=%di %d",
					EscapeLp(cfg.Model), EscapeLp(cfg.RunID), EscapeLp(rec.Label),
					EscapeLp(probeID), EscapeLp(dim), hasKernel, score, iteration, now,
				))
				written++
			}
		}

		// Flush once the batch reaches the configured size.
		if len(batch) >= cfg.BatchSize {
			if err := influx.WriteLp(batch); err != nil {
				return written, fmt.Errorf("write batch: %w", err)
			}
			batch = batch[:0]
		}
	}
	if err := sc.Err(); err != nil {
		return written, fmt.Errorf("scan %s: %w", cfg.ContentFile, err)
	}

	// Flush whatever is left over.
	if len(batch) > 0 {
		if err := influx.WriteLp(batch); err != nil {
			return written, fmt.Errorf("write final batch: %w", err)
		}
	}

	fmt.Fprintf(w, " content scores: %d points from %d lines\n", written, lineNum)
	return written, nil
}
// ingestCapabilityScores streams a capability scores JSONL file and writes
// capability_score measurements to InfluxDB: one category=overall point per
// line plus one point per category breakdown, batching writes by
// cfg.BatchSize. It returns the number of points emitted and prints a
// summary to w.
func ingestCapabilityScores(influx *InfluxClient, cfg IngestConfig, w io.Writer) (int, error) {
	f, err := os.Open(cfg.CapabilityFile)
	if err != nil {
		return 0, fmt.Errorf("open %s: %w", cfg.CapabilityFile, err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	// Allow JSONL records up to 1 MiB.
	sc.Buffer(make([]byte, 1024*1024), 1024*1024)

	var (
		batch   []string
		written int
		lineNum int
	)
	for sc.Scan() {
		lineNum++
		text := strings.TrimSpace(sc.Text())
		if text == "" {
			continue
		}
		var rec capabilityScoreLine
		if err := json.Unmarshal([]byte(text), &rec); err != nil {
			return written, fmt.Errorf("line %d: parse json: %w", lineNum, err)
		}

		iteration := extractIteration(rec.Label)
		now := time.Now().UnixNano()

		// Overall accuracy as reported in the file.
		batch = append(batch, fmt.Sprintf(
			"capability_score,model=%s,run_id=%s,label=%s,category=overall accuracy=%.6f,correct=%di,total=%di,iteration=%di %d",
			EscapeLp(cfg.Model), EscapeLp(cfg.RunID), EscapeLp(rec.Label),
			rec.Accuracy, rec.Correct, rec.Total, iteration, now,
		))
		written++

		// Per-category accuracy, recomputed from correct/total (0 when empty).
		for cat, blk := range rec.ByCategory {
			acc := 0.0
			if blk.Total > 0 {
				acc = float64(blk.Correct) / float64(blk.Total)
			}
			batch = append(batch, fmt.Sprintf(
				"capability_score,model=%s,run_id=%s,label=%s,category=%s accuracy=%.6f,correct=%di,total=%di,iteration=%di %d",
				EscapeLp(cfg.Model), EscapeLp(cfg.RunID), EscapeLp(rec.Label),
				EscapeLp(cat), acc, blk.Correct, blk.Total, iteration, now,
			))
			written++
		}

		// Flush once the batch reaches the configured size.
		if len(batch) >= cfg.BatchSize {
			if err := influx.WriteLp(batch); err != nil {
				return written, fmt.Errorf("write batch: %w", err)
			}
			batch = batch[:0]
		}
	}
	if err := sc.Err(); err != nil {
		return written, fmt.Errorf("scan %s: %w", cfg.CapabilityFile, err)
	}

	// Flush whatever is left over.
	if len(batch) > 0 {
		if err := influx.WriteLp(batch); err != nil {
			return written, fmt.Errorf("write final batch: %w", err)
		}
	}

	fmt.Fprintf(w, " capability scores: %d points from %d lines\n", written, lineNum)
	return written, nil
}
// ingestTrainingLog parses an MLX LoRA training log and writes training_loss
// measurements to InfluxDB for both train and validation loss entries,
// batching writes by cfg.BatchSize. Lines matching neither regex are
// skipped. It returns the number of points emitted and prints a summary to w.
func ingestTrainingLog(influx *InfluxClient, cfg IngestConfig, w io.Writer) (int, error) {
	f, err := os.Open(cfg.TrainingLog)
	if err != nil {
		return 0, fmt.Errorf("open %s: %w", cfg.TrainingLog, err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	// Allow log lines up to 1 MiB.
	sc.Buffer(make([]byte, 1024*1024), 1024*1024)

	var (
		batch   []string
		written int
		lineNum int
	)
	for sc.Scan() {
		lineNum++
		text := sc.Text()

		// Validation loss entries: "Iter N: Val loss X".
		if m := reValLoss.FindStringSubmatch(text); m != nil {
			iter, _ := strconv.Atoi(m[1])
			loss, _ := strconv.ParseFloat(m[2], 64)
			batch = append(batch, fmt.Sprintf(
				"training_loss,model=%s,run_id=%s,loss_type=val loss=%.6f,iteration=%di %d",
				EscapeLp(cfg.Model), EscapeLp(cfg.RunID), loss, iter, time.Now().UnixNano(),
			))
			written++
		}
		// Training loss entries, including learning rate and throughput.
		if m := reTrainLoss.FindStringSubmatch(text); m != nil {
			iter, _ := strconv.Atoi(m[1])
			loss, _ := strconv.ParseFloat(m[2], 64)
			lr, _ := strconv.ParseFloat(m[3], 64)
			ips, _ := strconv.ParseFloat(m[4], 64)
			tps, _ := strconv.ParseFloat(m[5], 64)
			batch = append(batch, fmt.Sprintf(
				"training_loss,model=%s,run_id=%s,loss_type=train loss=%.6f,iteration=%di,learning_rate=%.10f,it_per_sec=%.4f,tokens_per_sec=%.2f %d",
				EscapeLp(cfg.Model), EscapeLp(cfg.RunID), loss, iter, lr, ips, tps, time.Now().UnixNano(),
			))
			written++
		}

		// Flush once the batch reaches the configured size.
		if len(batch) >= cfg.BatchSize {
			if err := influx.WriteLp(batch); err != nil {
				return written, fmt.Errorf("write batch: %w", err)
			}
			batch = batch[:0]
		}
	}
	if err := sc.Err(); err != nil {
		return written, fmt.Errorf("scan %s: %w", cfg.TrainingLog, err)
	}

	// Flush whatever is left over.
	if len(batch) > 0 {
		if err := influx.WriteLp(batch); err != nil {
			return written, fmt.Errorf("write final batch: %w", err)
		}
	}

	fmt.Fprintf(w, " training log: %d points from %d lines\n", written, lineNum)
	return written, nil
}
// extractIteration extracts an iteration number from a label of the form
// "name@N", using the last "@" in the label. It returns 0 when there is no
// "@", nothing follows it, or the suffix is not a valid integer.
func extractIteration(label string) int {
	if at := strings.LastIndex(label, "@"); at >= 0 && at+1 < len(label) {
		if n, err := strconv.Atoi(label[at+1:]); err == nil {
			return n
		}
	}
	return 0
}
// toFloat64 converts a JSON-decoded interface{} value to float64.
// Handles float64 (standard json.Unmarshal), json.Number, and string values.
func toFloat64(v interface{}) (float64, bool) {
switch val := v.(type) {
case float64:
return val, true
case int:
return float64(val), true
case int64:
return float64(val), true
case json.Number:
f, err := val.Float64()
return f, err == nil
case string:
f, err := strconv.ParseFloat(val, 64)
return f, err == nil
default:
return 0, false
}
}