From 2eb17db21077232455b9ba0ae3660be9b19c8722 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 17 Feb 2026 19:53:07 +0000 Subject: [PATCH] feat(ml): add live progress and expand-status commands MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Final two Python pipeline gaps ported to Go — InfluxDB live generation progress and DuckDB expansion pipeline status. Python scripts can now be archived. Co-Authored-By: Claude Opus 4.6 --- cmd/ml/cmd_expand_status.go | 89 +++++++++++++++++++++++++++++++++++++ cmd/ml/cmd_live.go | 68 ++++++++++++++++++++++++++++ cmd/ml/cmd_ml.go | 4 ++ 3 files changed, 161 insertions(+) create mode 100644 cmd/ml/cmd_expand_status.go create mode 100644 cmd/ml/cmd_live.go diff --git a/cmd/ml/cmd_expand_status.go b/cmd/ml/cmd_expand_status.go new file mode 100644 index 00000000..5bd8b765 --- /dev/null +++ b/cmd/ml/cmd_expand_status.go @@ -0,0 +1,89 @@ +package ml + +import ( + "fmt" + "os" + + "forge.lthn.ai/core/go/pkg/cli" + "forge.lthn.ai/core/go-ai/ml" +) + +var expandStatusCmd = &cli.Command{ + Use: "expand-status", + Short: "Show expansion pipeline progress", + Long: "Queries DuckDB for expansion prompts, generated responses, scoring status, and overall pipeline progress.", + RunE: runExpandStatus, +} + +func runExpandStatus(cmd *cli.Command, args []string) error { + path := dbPath + if path == "" { + path = os.Getenv("LEM_DB") + } + if path == "" { + return fmt.Errorf("--db or LEM_DB required") + } + + db, err := ml.OpenDB(path) + if err != nil { + return fmt.Errorf("open db: %w", err) + } + defer db.Close() + + fmt.Fprintln(os.Stdout, "LEM Expansion Pipeline Status") + fmt.Fprintln(os.Stdout, "==================================================") + + // Expansion prompts + total, pending, err := db.ExpansionPromptCounts() + if err != nil { + fmt.Fprintln(os.Stdout, " Expansion prompts: not created (run: normalize)") + return nil + } + fmt.Fprintf(os.Stdout, " Expansion prompts: %d total, %d pending\n", total, pending) + + // Generated responses + generated, models, err := db.ExpansionRawCounts() + if err != nil { + generated = 0 + fmt.Fprintln(os.Stdout, " Generated: 0 (run: core ml expand)") + } else if len(models) > 0 { + modelStr := "" + for i, m := range models { + if i > 0 { + modelStr += ", " + } + modelStr += fmt.Sprintf("%s: %d", m.Name, m.Count) + } + fmt.Fprintf(os.Stdout, " Generated: %d (%s)\n", generated, modelStr) + } else { + fmt.Fprintf(os.Stdout, " Generated: %d\n", generated) + } + + // Scored + scored, hPassed, jScored, jPassed, err := db.ExpansionScoreCounts() + if err != nil { + fmt.Fprintln(os.Stdout, " Scored: 0 (run: score --tier 1)") + } else { + fmt.Fprintf(os.Stdout, " Heuristic scored: %d (%d passed)\n", scored, hPassed) + if jScored > 0 { + fmt.Fprintf(os.Stdout, " Judge scored: %d (%d passed)\n", jScored, jPassed) + } + } + + // Pipeline progress + if total > 0 && generated > 0 { + genPct := float64(generated) / float64(total) * 100 + fmt.Fprintf(os.Stdout, "\n Progress: %.1f%% generated\n", genPct) + } + + // Golden set context + golden, err := db.GoldenSetCount() + if err == nil && golden > 0 { + fmt.Fprintf(os.Stdout, "\n Golden set: %d / %d\n", golden, targetTotal) + if generated > 0 { + fmt.Fprintf(os.Stdout, " Combined: %d total examples\n", golden+generated) + } + } + + return nil +} diff --git a/cmd/ml/cmd_live.go b/cmd/ml/cmd_live.go new file mode 100644 index 00000000..994399f5 --- /dev/null +++ b/cmd/ml/cmd_live.go @@ -0,0 +1,68 @@ +package ml + +import ( + "fmt" + "os" + + "forge.lthn.ai/core/go/pkg/cli" + "forge.lthn.ai/core/go-ai/ml" +) + +const targetTotal = 15000 + +var liveCmd = &cli.Command{ + Use: "live", + Short: "Show live generation progress from InfluxDB", + Long: "Queries InfluxDB for real-time generation progress, worker breakdown, and domain/voice counts.", + RunE: runLive, +} + +func runLive(cmd *cli.Command, args []string) error { + influx := ml.NewInfluxClient(influxURL, influxDB) + + // Total completed generations + total, err := influx.QueryScalar("SELECT count(DISTINCT i) AS n FROM gold_gen") + if err != nil { + return fmt.Errorf("live: query total: %w", err) + } + + // Distinct domains and voices + domains, err := influx.QueryScalar("SELECT count(DISTINCT d) AS n FROM gold_gen") + if err != nil { + return fmt.Errorf("live: query domains: %w", err) + } + + voices, err := influx.QueryScalar("SELECT count(DISTINCT v) AS n FROM gold_gen") + if err != nil { + return fmt.Errorf("live: query voices: %w", err) + } + + // Per-worker breakdown + workers, err := influx.QueryRows("SELECT w, count(DISTINCT i) AS n FROM gold_gen GROUP BY w ORDER BY n DESC") + if err != nil { + return fmt.Errorf("live: query workers: %w", err) + } + + pct := float64(total) / float64(targetTotal) * 100 + remaining := targetTotal - total + + fmt.Fprintln(os.Stdout, "Golden Set Live Status (from InfluxDB)") + fmt.Fprintln(os.Stdout, "─────────────────────────────────────────────") + fmt.Fprintf(os.Stdout, " Total: %d / %d (%.1f%%)\n", total, targetTotal, pct) + fmt.Fprintf(os.Stdout, " Remaining: %d\n", remaining) + fmt.Fprintf(os.Stdout, " Domains: %d\n", domains) + fmt.Fprintf(os.Stdout, " Voices: %d\n", voices) + fmt.Fprintln(os.Stdout) + fmt.Fprintln(os.Stdout, " Workers:") + for _, w := range workers { + name := w["w"] + n := w["n"] + marker := "" + if name == "migration" { + marker = " (seed data)" + } + fmt.Fprintf(os.Stdout, " %-20s %6s generations%s\n", name, n, marker) + } + + return nil +} diff --git a/cmd/ml/cmd_ml.go b/cmd/ml/cmd_ml.go index cee1b052..f8dab296 100644 --- a/cmd/ml/cmd_ml.go +++ b/cmd/ml/cmd_ml.go @@ -22,6 +22,8 @@ // - core ml approve: Filter scored expansions into training JSONL // - core ml publish: Upload Parquet dataset to HuggingFace Hub // - core ml coverage: Analyze seed coverage by region and domain +// - core ml live: Show live generation progress from InfluxDB +// - core ml expand-status: Show expansion pipeline progress package ml import ( @@ -62,6 +64,8 @@ func AddMLCommands(root *cli.Command) { mlCmd.AddCommand(approveCmd) mlCmd.AddCommand(publishCmd) mlCmd.AddCommand(coverageCmd) + mlCmd.AddCommand(liveCmd) + mlCmd.AddCommand(expandStatusCmd) root.AddCommand(mlCmd) }