cli/pkg/ml/status.go
Claude 548256312d feat: add ML inference, scoring, and training pipeline (pkg/ml)
Port LEM scoring/training pipeline into CoreGo as pkg/ml with:
- Inference abstraction with HTTP, llama-server, and Ollama backends
- 3-tier scoring engine (heuristic, exact, LLM judge)
- Capability and content probes for model evaluation
- GGUF/safetensors format converters, MLX to PEFT adapter conversion
- DuckDB integration for training data pipeline
- InfluxDB metrics for lab dashboard
- Training data export (JSONL + Parquet)
- Expansion generation pipeline with distributed workers
- 10 CLI commands under 'core ml' (score, probe, export, expand, status, gguf, convert, agent, worker)
- 5 MCP tools (ml_generate, ml_score, ml_probe, ml_status, ml_backends)

All 37 ML tests passing. Binary builds at 138MB with all commands.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 05:53:52 +00:00

212 lines
4.9 KiB
Go

package ml
import (
"fmt"
"io"
"sort"
)
// trainingRow holds deduplicated training status + loss for a single model.
type trainingRow struct {
	model      string  // model name; the dedupe key
	status     string  // latest reported status (value of the "status" column)
	iteration  int     // current training iteration
	totalIters int     // total planned iterations
	pct        float64 // completion percentage as reported by the source
	loss       float64 // latest training loss; only meaningful when hasLoss is true
	hasLoss    bool    // true when a matching training_loss row was found for this model
}
// genRow holds deduplicated generation progress for a single worker.
type genRow struct {
	worker    string  // worker identifier; the dedupe key
	completed int     // items completed so far
	target    int     // total items targeted
	pct       float64 // completion percentage as reported by the source
}
// PrintStatus queries InfluxDB for training and generation progress and writes
// a formatted summary to w.
//
// All queries are best-effort: a failed query is treated as "no rows" so its
// section renders as "(no data)" instead of aborting the whole summary. As a
// result the function currently always returns nil; the error return is kept
// for interface stability.
func PrintStatus(influx *InfluxClient, w io.Writer) error {
	// query runs one SQL query, mapping any failure to "no rows".
	query := func(sql string) []map[string]interface{} {
		rows, err := influx.QuerySQL(sql)
		if err != nil {
			return nil
		}
		return rows
	}

	statusRows := query("SELECT model, run_id, status, iteration, total_iters, pct FROM training_status ORDER BY time DESC LIMIT 10")
	lossRows := query("SELECT model, loss_type, loss, iteration, tokens_per_sec FROM training_loss WHERE loss_type = 'train' ORDER BY time DESC LIMIT 10")
	goldenRows := query("SELECT worker, completed, target, pct FROM golden_gen_progress ORDER BY time DESC LIMIT 5")
	expansionRows := query("SELECT worker, completed, target, pct FROM expansion_progress ORDER BY time DESC LIMIT 5")

	training := dedupeTraining(statusRows, lossRows)
	golden := dedupeGeneration(goldenRows)
	expansion := dedupeGeneration(expansionRows)

	fmt.Fprintln(w, "Training:")
	if len(training) == 0 {
		fmt.Fprintln(w, " (no data)")
	} else {
		for _, tr := range training {
			progress := fmt.Sprintf("%d/%d", tr.iteration, tr.totalIters)
			pct := fmt.Sprintf("%.1f%%", tr.pct)
			if tr.hasLoss {
				fmt.Fprintf(w, " %-13s %-9s %9s %7s loss=%.3f\n",
					tr.model, tr.status, progress, pct, tr.loss)
			} else {
				fmt.Fprintf(w, " %-13s %-9s %9s %7s\n",
					tr.model, tr.status, progress, pct)
			}
		}
	}

	fmt.Fprintln(w)
	fmt.Fprintln(w, "Generation:")

	// printGen renders one labeled set of generation rows and reports
	// whether anything was printed.
	printGen := func(label string, rows []genRow) bool {
		for _, g := range rows {
			progress := fmt.Sprintf("%d/%d", g.completed, g.target)
			pct := fmt.Sprintf("%.1f%%", g.pct)
			fmt.Fprintf(w, " %-13s %11s %7s (%s)\n", label, progress, pct, g.worker)
		}
		return len(rows) > 0
	}

	// Evaluate both unconditionally so golden and expansion each print.
	goldenShown := printGen("golden", golden)
	expansionShown := printGen("expansion", expansion)
	if !goldenShown && !expansionShown {
		fmt.Fprintln(w, " (no data)")
	}
	return nil
}
// dedupeTraining merges training status and loss rows, keeping only the first
// (latest) row per model. Rows are assumed newest-first (the callers order by
// time DESC), so "first seen" means "most recent". The result is sorted by
// model name for deterministic output.
func dedupeTraining(statusRows, lossRows []map[string]interface{}) []trainingRow {
	// Latest loss per model. Map membership doubles as the "already seen"
	// marker (values are stored unconditionally on first sight), so the
	// separate seen-set the original kept is unnecessary.
	lossMap := make(map[string]float64, len(lossRows))
	for _, row := range lossRows {
		model := strVal(row, "model")
		if model == "" {
			continue
		}
		if _, ok := lossMap[model]; ok {
			continue
		}
		lossMap[model] = floatVal(row, "loss")
	}
	seen := make(map[string]bool, len(statusRows))
	var rows []trainingRow
	for _, row := range statusRows {
		model := strVal(row, "model")
		if model == "" || seen[model] {
			continue
		}
		seen[model] = true
		tr := trainingRow{
			model:      model,
			status:     strVal(row, "status"),
			iteration:  intVal(row, "iteration"),
			totalIters: intVal(row, "total_iters"),
			pct:        floatVal(row, "pct"),
		}
		// Attach the model's latest loss when one was reported.
		if loss, ok := lossMap[model]; ok {
			tr.loss = loss
			tr.hasLoss = true
		}
		rows = append(rows, tr)
	}
	sort.Slice(rows, func(i, j int) bool {
		return rows[i].model < rows[j].model
	})
	return rows
}
// dedupeGeneration deduplicates generation progress rows by worker, keeping
// only the first row per worker and returning the result sorted by worker
// name for deterministic output.
func dedupeGeneration(rows []map[string]interface{}) []genRow {
	taken := make(map[string]bool)
	var out []genRow
	for _, r := range rows {
		name := strVal(r, "worker")
		// Skip rows with no worker tag and repeats of one already kept.
		if name == "" {
			continue
		}
		if taken[name] {
			continue
		}
		taken[name] = true
		g := genRow{
			worker:    name,
			completed: intVal(r, "completed"),
			target:    intVal(r, "target"),
			pct:       floatVal(r, "pct"),
		}
		out = append(out, g)
	}
	sort.Slice(out, func(a, b int) bool { return out[a].worker < out[b].worker })
	return out
}
// strVal extracts a string value from a row map. A missing key or a value of
// any other type yields "".
func strVal(row map[string]interface{}, key string) string {
	// A missing key gives a nil interface, for which the assertion
	// fails and s keeps its zero value — same result as the explicit
	// two-step lookup.
	s, _ := row[key].(string)
	return s
}
// floatVal extracts a float64 value from a row map. A missing key or a value
// of any other type yields 0.
func floatVal(row map[string]interface{}, key string) float64 {
	// Single type assertion covers both the missing-key case (nil
	// interface) and the wrong-type case: f stays 0 in either.
	f, _ := row[key].(float64)
	return f
}
// intVal extracts an integer value from a row map. InfluxDB JSON returns all
// numbers as float64, so the value is read as float64 and truncated to int.
// A missing key or a non-float64 value yields 0.
func intVal(row map[string]interface{}, key string) int {
	f, _ := row[key].(float64)
	return int(f)
}