diff --git a/internal/cmd/lab/cmd_lab.go b/internal/cmd/lab/cmd_lab.go new file mode 100644 index 00000000..89cb270b --- /dev/null +++ b/internal/cmd/lab/cmd_lab.go @@ -0,0 +1,138 @@ +package lab + +import ( + "context" + "log/slog" + "net/http" + "os" + "os/signal" + "time" + + "forge.lthn.ai/core/cli/pkg/cli" + "forge.lthn.ai/core/cli/pkg/lab" + "forge.lthn.ai/core/cli/pkg/lab/collector" + "forge.lthn.ai/core/cli/pkg/lab/handler" +) + +func init() { + cli.RegisterCommands(AddLabCommands) +} + +var labCmd = &cli.Command{ + Use: "lab", + Short: "Homelab monitoring dashboard", + Long: "Lab dashboard with real-time monitoring of machines, training runs, models, and services.", +} + +var ( + labBind string +) + +var serveCmd = &cli.Command{ + Use: "serve", + Short: "Start the lab dashboard web server", + Long: "Starts the lab dashboard HTTP server with live-updating collectors for system stats, Docker, Forgejo, HuggingFace, InfluxDB, and more.", + RunE: runServe, +} + +func init() { + serveCmd.Flags().StringVar(&labBind, "bind", ":8080", "HTTP listen address") +} + +// AddLabCommands registers the 'lab' command and subcommands. +func AddLabCommands(root *cli.Command) { + labCmd.AddCommand(serveCmd) + root.AddCommand(labCmd) +} + +func runServe(cmd *cli.Command, args []string) error { + cfg := lab.LoadConfig() + cfg.Addr = labBind + + store := lab.NewStore() + logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + + // Setup collectors. 
+ reg := collector.NewRegistry(logger) + reg.Register(collector.NewSystem(cfg, store), 60*time.Second) + reg.Register(collector.NewPrometheus(cfg.PrometheusURL, store), + time.Duration(cfg.PrometheusInterval)*time.Second) + reg.Register(collector.NewHuggingFace(cfg.HFAuthor, store), + time.Duration(cfg.HFInterval)*time.Second) + reg.Register(collector.NewDocker(store), + time.Duration(cfg.DockerInterval)*time.Second) + + if cfg.ForgeToken != "" { + reg.Register(collector.NewForgejo(cfg.ForgeURL, cfg.ForgeToken, store), + time.Duration(cfg.ForgeInterval)*time.Second) + } + + reg.Register(collector.NewTraining(cfg, store), + time.Duration(cfg.TrainingInterval)*time.Second) + reg.Register(collector.NewServices(store), 60*time.Second) + + if cfg.InfluxToken != "" { + reg.Register(collector.NewInfluxDB(cfg, store), + time.Duration(cfg.InfluxInterval)*time.Second) + } + + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + reg.Start(ctx) + defer reg.Stop() + + // Setup HTTP handlers. + web := handler.NewWebHandler(store) + api := handler.NewAPIHandler(store) + + mux := http.NewServeMux() + + // Web pages. + mux.HandleFunc("GET /", web.Dashboard) + mux.HandleFunc("GET /models", web.Models) + mux.HandleFunc("GET /training", web.Training) + mux.HandleFunc("GET /dataset", web.Dataset) + mux.HandleFunc("GET /golden-set", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/dataset", http.StatusMovedPermanently) + }) + mux.HandleFunc("GET /runs", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/training", http.StatusMovedPermanently) + }) + mux.HandleFunc("GET /agents", web.Agents) + mux.HandleFunc("GET /services", web.Services) + + // SSE for live updates. + mux.HandleFunc("GET /events", web.Events) + + // JSON API. 
+ mux.HandleFunc("GET /api/status", api.Status) + mux.HandleFunc("GET /api/models", api.Models) + mux.HandleFunc("GET /api/training", api.Training) + mux.HandleFunc("GET /api/dataset", api.GoldenSet) + mux.HandleFunc("GET /api/golden-set", api.GoldenSet) + mux.HandleFunc("GET /api/runs", api.Runs) + mux.HandleFunc("GET /api/agents", api.Agents) + mux.HandleFunc("GET /api/services", api.Services) + mux.HandleFunc("GET /health", api.Health) + + srv := &http.Server{ + Addr: cfg.Addr, + Handler: mux, + ReadTimeout: 5 * time.Second, + WriteTimeout: 10 * time.Second, + } + + go func() { + <-ctx.Done() + logger.Info("shutting down") + shutCtx, shutCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutCancel() + srv.Shutdown(shutCtx) + }() + + logger.Info("lab dashboard starting", "addr", cfg.Addr) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + return err + } + return nil +} diff --git a/internal/variants/full.go b/internal/variants/full.go index 1c1d8330..83ec73da 100644 --- a/internal/variants/full.go +++ b/internal/variants/full.go @@ -44,6 +44,7 @@ import ( _ "forge.lthn.ai/core/cli/internal/cmd/gitcmd" _ "forge.lthn.ai/core/cli/internal/cmd/go" _ "forge.lthn.ai/core/cli/internal/cmd/help" + _ "forge.lthn.ai/core/cli/internal/cmd/lab" _ "forge.lthn.ai/core/cli/internal/cmd/mcpcmd" _ "forge.lthn.ai/core/cli/internal/cmd/ml" _ "forge.lthn.ai/core/cli/internal/cmd/monitor" diff --git a/pkg/lab/collector/collector.go b/pkg/lab/collector/collector.go new file mode 100644 index 00000000..9796bc41 --- /dev/null +++ b/pkg/lab/collector/collector.go @@ -0,0 +1,82 @@ +package collector + +import ( + "context" + "log/slog" + "sync" + "time" +) + +type Collector interface { + Name() string + Collect(ctx context.Context) error +} + +type Registry struct { + mu sync.Mutex + entries []entry + logger *slog.Logger +} + +type entry struct { + c Collector + interval time.Duration + cancel context.CancelFunc +} + +func 
NewRegistry(logger *slog.Logger) *Registry { + return &Registry{logger: logger} +} + +func (r *Registry) Register(c Collector, interval time.Duration) { + r.mu.Lock() + defer r.mu.Unlock() + r.entries = append(r.entries, entry{c: c, interval: interval}) +} + +func (r *Registry) Start(ctx context.Context) { + r.mu.Lock() + defer r.mu.Unlock() + + for i := range r.entries { + e := &r.entries[i] + cctx, cancel := context.WithCancel(ctx) + e.cancel = cancel + go r.run(cctx, e.c, e.interval) + } +} + +func (r *Registry) run(ctx context.Context, c Collector, interval time.Duration) { + r.logger.Info("collector started", "name", c.Name(), "interval", interval) + + // Run immediately on start. + if err := c.Collect(ctx); err != nil { + r.logger.Warn("collector error", "name", c.Name(), "err", err) + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + r.logger.Info("collector stopped", "name", c.Name()) + return + case <-ticker.C: + if err := c.Collect(ctx); err != nil { + r.logger.Warn("collector error", "name", c.Name(), "err", err) + } + } + } +} + +func (r *Registry) Stop() { + r.mu.Lock() + defer r.mu.Unlock() + + for _, e := range r.entries { + if e.cancel != nil { + e.cancel() + } + } +} diff --git a/pkg/lab/collector/docker.go b/pkg/lab/collector/docker.go new file mode 100644 index 00000000..0d930544 --- /dev/null +++ b/pkg/lab/collector/docker.go @@ -0,0 +1,94 @@ +package collector + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "time" + + "forge.lthn.ai/core/cli/pkg/lab" +) + +type Docker struct { + store *lab.Store +} + +func NewDocker(s *lab.Store) *Docker { + return &Docker{store: s} +} + +func (d *Docker) Name() string { return "docker" } + +func (d *Docker) Collect(ctx context.Context) error { + client := &http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", 
"/var/run/docker.sock") + }, + }, + } + + req, err := http.NewRequestWithContext(ctx, "GET", "http://docker/containers/json?all=true", nil) + if err != nil { + return err + } + + resp, err := client.Do(req) + if err != nil { + d.store.SetError("docker", err) + return err + } + defer resp.Body.Close() + + var containers []struct { + Names []string `json:"Names"` + Image string `json:"Image"` + State string `json:"State"` + Status string `json:"Status"` + Created int64 `json:"Created"` + } + + if err := json.NewDecoder(resp.Body).Decode(&containers); err != nil { + d.store.SetError("docker", err) + return err + } + + var result []lab.Container + for _, c := range containers { + name := "" + if len(c.Names) > 0 { + name = c.Names[0] + if len(name) > 0 && name[0] == '/' { + name = name[1:] + } + } + + created := time.Unix(c.Created, 0) + uptime := "" + if c.State == "running" { + d := time.Since(created) + days := int(d.Hours()) / 24 + hours := int(d.Hours()) % 24 + if days > 0 { + uptime = fmt.Sprintf("%dd %dh", days, hours) + } else { + uptime = fmt.Sprintf("%dh %dm", hours, int(d.Minutes())%60) + } + } + + result = append(result, lab.Container{ + Name: name, + Status: c.State, + Image: c.Image, + Uptime: uptime, + Created: created, + }) + } + + d.store.SetContainers(result) + d.store.SetError("docker", nil) + return nil +} diff --git a/pkg/lab/collector/forgejo.go b/pkg/lab/collector/forgejo.go new file mode 100644 index 00000000..3d06cd1a --- /dev/null +++ b/pkg/lab/collector/forgejo.go @@ -0,0 +1,130 @@ +package collector + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "forge.lthn.ai/core/cli/pkg/lab" +) + +type Forgejo struct { + url string + token string + store *lab.Store +} + +func NewForgejo(forgeURL, token string, s *lab.Store) *Forgejo { + return &Forgejo{url: forgeURL, token: token, store: s} +} + +func (f *Forgejo) Name() string { return "forgejo" } + +func (f *Forgejo) Collect(ctx context.Context) error { + if f.token == "" { 
+ return nil + } + + commits, err := f.recentActivity(ctx) + if err != nil { + f.store.SetError("forgejo", err) + return err + } + + f.store.SetCommits(commits) + f.store.SetError("forgejo", nil) + return nil +} + +type forgeRepo struct { + FullName string `json:"full_name"` + UpdatedAt time.Time `json:"updated_at"` +} + +type forgeCommit struct { + SHA string `json:"sha"` + Commit struct { + Message string `json:"message"` + Author struct { + Name string `json:"name"` + Date time.Time `json:"date"` + } `json:"author"` + } `json:"commit"` +} + +func (f *Forgejo) recentActivity(ctx context.Context) ([]lab.Commit, error) { + // Get recently updated repos + repos, err := f.apiGet(ctx, "/api/v1/repos/search?sort=updated&order=desc&limit=5") + if err != nil { + return nil, err + } + + var repoList []forgeRepo + if err := json.Unmarshal(repos, &repoList); err != nil { + // The search API wraps in {"data": [...], "ok": true} + var wrapped struct { + Data []forgeRepo `json:"data"` + } + if err2 := json.Unmarshal(repos, &wrapped); err2 != nil { + return nil, err + } + repoList = wrapped.Data + } + + var commits []lab.Commit + for _, repo := range repoList { + if len(commits) >= 10 { + break + } + data, err := f.apiGet(ctx, fmt.Sprintf("/api/v1/repos/%s/commits?limit=2", repo.FullName)) + if err != nil { + continue + } + var fc []forgeCommit + if err := json.Unmarshal(data, &fc); err != nil { + continue + } + for _, c := range fc { + msg := c.Commit.Message + if len(msg) > 80 { + msg = msg[:77] + "..." 
+		}
+		// Guard the abbreviation: a malformed or truncated API response can
+		// carry a SHA shorter than 8 characters, and c.SHA[:8] would panic
+		// the whole collector goroutine.
+		sha := c.SHA
+		if len(sha) > 8 {
+			sha = sha[:8]
+		}
+		commits = append(commits, lab.Commit{
+			SHA:       sha,
+			Message:   msg,
+			Author:    c.Commit.Author.Name,
+			Repo:      repo.FullName,
+			Timestamp: c.Commit.Author.Date,
+		})
+		}
+	}
+
+	return commits, nil
+}
+
+// apiGet performs an authenticated GET against the Forgejo API and returns
+// the raw JSON payload for the caller to decode.
+func (f *Forgejo) apiGet(ctx context.Context, path string) (json.RawMessage, error) {
+	req, err := http.NewRequestWithContext(ctx, "GET", f.url+path, nil)
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Authorization", "token "+f.token)
+
+	client := &http.Client{Timeout: 10 * time.Second}
+	resp, err := client.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return nil, fmt.Errorf("forgejo %s returned %d", path, resp.StatusCode)
+	}
+
+	var raw json.RawMessage
+	if err := json.NewDecoder(resp.Body).Decode(&raw); err != nil {
+		return nil, err
+	}
+	return raw, nil
+}
diff --git a/pkg/lab/collector/huggingface.go b/pkg/lab/collector/huggingface.go
new file mode 100644
index 00000000..0fd9e640
--- /dev/null
+++ b/pkg/lab/collector/huggingface.go
@@ -0,0 +1,55 @@
+package collector
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"time"
+
+	"forge.lthn.ai/core/cli/pkg/lab"
+)
+
+// HuggingFace polls the public HuggingFace hub API for models by author.
+type HuggingFace struct {
+	author string
+	store  *lab.Store
+}
+
+func NewHuggingFace(author string, s *lab.Store) *HuggingFace {
+	return &HuggingFace{author: author, store: s}
+}
+
+func (h *HuggingFace) Name() string { return "huggingface" }
+
+// Collect fetches the author's top models (by downloads) and stores them.
+func (h *HuggingFace) Collect(ctx context.Context) error {
+	u := fmt.Sprintf("https://huggingface.co/api/models?author=%s&sort=downloads&direction=-1&limit=20", h.author)
+
+	req, err := http.NewRequestWithContext(ctx, "GET", u, nil)
+	if err != nil {
+		return err
+	}
+
+	client := &http.Client{Timeout: 10 * time.Second}
+	resp, err := client.Do(req)
+	if err != nil {
+		h.store.SetError("huggingface", err)
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		err := fmt.Errorf("HuggingFace API returned %d", resp.StatusCode)
+
h.store.SetError("huggingface", err) + return err + } + + var models []lab.HFModel + if err := json.NewDecoder(resp.Body).Decode(&models); err != nil { + h.store.SetError("huggingface", err) + return err + } + + h.store.SetModels(models) + h.store.SetError("huggingface", nil) + return nil +} diff --git a/pkg/lab/collector/influxdb.go b/pkg/lab/collector/influxdb.go new file mode 100644 index 00000000..c578d8a0 --- /dev/null +++ b/pkg/lab/collector/influxdb.go @@ -0,0 +1,354 @@ +package collector + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sort" + "strings" + "time" + + "forge.lthn.ai/core/cli/pkg/lab" +) + +type InfluxDB struct { + cfg *lab.Config + store *lab.Store +} + +func NewInfluxDB(cfg *lab.Config, s *lab.Store) *InfluxDB { + return &InfluxDB{cfg: cfg, store: s} +} + +func (i *InfluxDB) Name() string { return "influxdb" } + +func (i *InfluxDB) Collect(ctx context.Context) error { + if i.cfg.InfluxURL == "" || i.cfg.InfluxToken == "" { + return nil + } + + data := lab.BenchmarkData{ + Loss: make(map[string][]lab.LossPoint), + Content: make(map[string][]lab.ContentPoint), + Capability: make(map[string][]lab.CapabilityPoint), + CapabilityJudge: make(map[string][]lab.CapabilityJudgePoint), + UpdatedAt: time.Now(), + } + + // Collect all run identifiers from each measurement. + runSet := map[string]lab.BenchmarkRun{} + + // Training loss data. 
+ if rows, err := i.query(ctx, "SELECT run_id, model, iteration, loss, loss_type, learning_rate, iterations_per_sec, tokens_per_sec FROM training_loss ORDER BY run_id, iteration"); err == nil { + for _, row := range rows { + rid := jsonStr(row["run_id"]) + mdl := jsonStr(row["model"]) + if rid == "" { + continue + } + runSet[rid] = lab.BenchmarkRun{RunID: rid, Model: mdl, Type: "training"} + data.Loss[rid] = append(data.Loss[rid], lab.LossPoint{ + Iteration: jsonInt(row["iteration"]), + Loss: jsonFloat(row["loss"]), + LossType: jsonStr(row["loss_type"]), + LearningRate: jsonFloat(row["learning_rate"]), + TokensPerSec: jsonFloat(row["tokens_per_sec"]), + }) + } + } + + // Content scores. + if rows, err := i.query(ctx, "SELECT run_id, model, label, dimension, score, iteration, has_kernel FROM content_score ORDER BY run_id, iteration, dimension"); err == nil { + for _, row := range rows { + rid := jsonStr(row["run_id"]) + mdl := jsonStr(row["model"]) + if rid == "" { + continue + } + if _, ok := runSet[rid]; !ok { + runSet[rid] = lab.BenchmarkRun{RunID: rid, Model: mdl, Type: "content"} + } + hk := jsonStr(row["has_kernel"]) + data.Content[rid] = append(data.Content[rid], lab.ContentPoint{ + Label: jsonStr(row["label"]), + Dimension: jsonStr(row["dimension"]), + Score: jsonFloat(row["score"]), + Iteration: jsonInt(row["iteration"]), + HasKernel: hk == "true" || hk == "True", + }) + } + } + + // Capability scores. 
+ if rows, err := i.query(ctx, "SELECT run_id, model, label, category, accuracy, correct, total, iteration FROM capability_score ORDER BY run_id, iteration, category"); err == nil { + for _, row := range rows { + rid := jsonStr(row["run_id"]) + mdl := jsonStr(row["model"]) + if rid == "" { + continue + } + if _, ok := runSet[rid]; !ok { + runSet[rid] = lab.BenchmarkRun{RunID: rid, Model: mdl, Type: "capability"} + } + data.Capability[rid] = append(data.Capability[rid], lab.CapabilityPoint{ + Label: jsonStr(row["label"]), + Category: jsonStr(row["category"]), + Accuracy: jsonFloat(row["accuracy"]), + Correct: jsonInt(row["correct"]), + Total: jsonInt(row["total"]), + Iteration: jsonInt(row["iteration"]), + }) + } + } + + // Capability judge scores (0-10 per probe). + if rows, err := i.query(ctx, "SELECT run_id, model, label, probe_id, category, reasoning, correctness, clarity, avg, iteration FROM capability_judge ORDER BY run_id, iteration, probe_id"); err == nil { + for _, row := range rows { + rid := jsonStr(row["run_id"]) + if rid == "" { + continue + } + data.CapabilityJudge[rid] = append(data.CapabilityJudge[rid], lab.CapabilityJudgePoint{ + Label: jsonStr(row["label"]), + ProbeID: jsonStr(row["probe_id"]), + Category: jsonStr(row["category"]), + Reasoning: jsonFloat(row["reasoning"]), + Correctness: jsonFloat(row["correctness"]), + Clarity: jsonFloat(row["clarity"]), + Avg: jsonFloat(row["avg"]), + Iteration: jsonInt(row["iteration"]), + }) + } + } + + // Build sorted runs list. + for _, r := range runSet { + data.Runs = append(data.Runs, r) + } + sort.Slice(data.Runs, func(i, j int) bool { + return data.Runs[i].Model < data.Runs[j].Model || (data.Runs[i].Model == data.Runs[j].Model && data.Runs[i].RunID < data.Runs[j].RunID) + }) + + i.store.SetBenchmarks(data) + + // Live training run statuses. 
+ var runStatuses []lab.TrainingRunStatus + if rows, err := i.query(ctx, "SELECT model, run_id, status, iteration, total_iters, pct FROM training_status ORDER BY time DESC LIMIT 50"); err == nil { + // Deduplicate: keep only the latest status per run_id. + seen := map[string]bool{} + for _, row := range rows { + rid := jsonStr(row["run_id"]) + if rid == "" || seen[rid] { + continue + } + seen[rid] = true + rs := lab.TrainingRunStatus{ + Model: jsonStr(row["model"]), + RunID: rid, + Status: jsonStr(row["status"]), + Iteration: jsonInt(row["iteration"]), + TotalIters: jsonInt(row["total_iters"]), + Pct: jsonFloat(row["pct"]), + } + // Find latest loss for this run from already-collected data. + if lossPoints, ok := data.Loss[rid]; ok { + for j := len(lossPoints) - 1; j >= 0; j-- { + if lossPoints[j].LossType == "train" && rs.LastLoss == 0 { + rs.LastLoss = lossPoints[j].Loss + rs.TokensSec = lossPoints[j].TokensPerSec + } + if lossPoints[j].LossType == "val" && rs.ValLoss == 0 { + rs.ValLoss = lossPoints[j].Loss + } + if rs.LastLoss > 0 && rs.ValLoss > 0 { + break + } + } + } + runStatuses = append(runStatuses, rs) + } + } + i.store.SetTrainingRuns(runStatuses) + + // Golden set data explorer — query gold_gen (real-time per-generation records). + gs := lab.GoldenSetSummary{TargetTotal: 15000, UpdatedAt: time.Now()} + + // Try real-time gold_gen first (populated by lem_generate.py directly). 
+ if rows, err := i.query(ctx, "SELECT count(DISTINCT i) AS total, count(DISTINCT d) AS domains, count(DISTINCT v) AS voices, avg(gen_time) AS avg_t, avg(chars) AS avg_c FROM gold_gen"); err == nil && len(rows) > 0 { + r := rows[0] + total := jsonInt(r["total"]) + if total > 0 { + gs.Available = true + gs.TotalExamples = total + gs.Domains = jsonInt(r["domains"]) + gs.Voices = jsonInt(r["voices"]) + gs.AvgGenTime = jsonFloat(r["avg_t"]) + gs.AvgResponseChars = jsonFloat(r["avg_c"]) + gs.CompletionPct = float64(total) / float64(gs.TargetTotal) * 100 + } + } + + // Fallback to pipeline.py metrics if gold_gen isn't populated. + if !gs.Available { + if rows, err := i.query(ctx, "SELECT total_examples, domains, voices, avg_gen_time, avg_response_chars, completion_pct FROM golden_set_stats ORDER BY time DESC LIMIT 1"); err == nil && len(rows) > 0 { + r := rows[0] + gs.Available = true + gs.TotalExamples = jsonInt(r["total_examples"]) + gs.Domains = jsonInt(r["domains"]) + gs.Voices = jsonInt(r["voices"]) + gs.AvgGenTime = jsonFloat(r["avg_gen_time"]) + gs.AvgResponseChars = jsonFloat(r["avg_response_chars"]) + gs.CompletionPct = jsonFloat(r["completion_pct"]) + } + } + + if gs.Available { + // Per-domain from gold_gen. + if rows, err := i.query(ctx, "SELECT d, count(DISTINCT i) AS n, avg(gen_time) AS avg_t FROM gold_gen GROUP BY d ORDER BY n DESC"); err == nil && len(rows) > 0 { + for _, r := range rows { + gs.DomainStats = append(gs.DomainStats, lab.DomainStat{ + Domain: jsonStr(r["d"]), + Count: jsonInt(r["n"]), + AvgGenTime: jsonFloat(r["avg_t"]), + }) + } + } + // Fallback to pipeline stats. 
+ if len(gs.DomainStats) == 0 { + if rows, err := i.query(ctx, "SELECT DISTINCT domain, count, avg_gen_time FROM golden_set_domain ORDER BY count DESC"); err == nil { + for _, r := range rows { + gs.DomainStats = append(gs.DomainStats, lab.DomainStat{ + Domain: jsonStr(r["domain"]), + Count: jsonInt(r["count"]), + AvgGenTime: jsonFloat(r["avg_gen_time"]), + }) + } + } + } + + // Per-voice from gold_gen. + if rows, err := i.query(ctx, "SELECT v, count(DISTINCT i) AS n, avg(chars) AS avg_c, avg(gen_time) AS avg_t FROM gold_gen GROUP BY v ORDER BY n DESC"); err == nil && len(rows) > 0 { + for _, r := range rows { + gs.VoiceStats = append(gs.VoiceStats, lab.VoiceStat{ + Voice: jsonStr(r["v"]), + Count: jsonInt(r["n"]), + AvgChars: jsonFloat(r["avg_c"]), + AvgGenTime: jsonFloat(r["avg_t"]), + }) + } + } + // Fallback. + if len(gs.VoiceStats) == 0 { + if rows, err := i.query(ctx, "SELECT DISTINCT voice, count, avg_chars, avg_gen_time FROM golden_set_voice ORDER BY count DESC"); err == nil { + for _, r := range rows { + gs.VoiceStats = append(gs.VoiceStats, lab.VoiceStat{ + Voice: jsonStr(r["voice"]), + Count: jsonInt(r["count"]), + AvgChars: jsonFloat(r["avg_chars"]), + AvgGenTime: jsonFloat(r["avg_gen_time"]), + }) + } + } + } + } + // Worker activity. + if rows, err := i.query(ctx, "SELECT w, count(DISTINCT i) AS n, max(time) AS last_seen FROM gold_gen GROUP BY w ORDER BY n DESC"); err == nil { + for _, r := range rows { + gs.Workers = append(gs.Workers, lab.WorkerStat{ + Worker: jsonStr(r["w"]), + Count: jsonInt(r["n"]), + }) + } + } + + i.store.SetGoldenSet(gs) + + // Dataset stats (from DuckDB, pushed as dataset_stats measurement). 
+ ds := lab.DatasetSummary{UpdatedAt: time.Now()} + if rows, err := i.query(ctx, "SELECT table, rows FROM dataset_stats ORDER BY rows DESC"); err == nil && len(rows) > 0 { + ds.Available = true + for _, r := range rows { + ds.Tables = append(ds.Tables, lab.DatasetTable{ + Name: jsonStr(r["table"]), + Rows: jsonInt(r["rows"]), + }) + } + } + i.store.SetDataset(ds) + + i.store.SetError("influxdb", nil) + return nil +} + +func (i *InfluxDB) query(ctx context.Context, sql string) ([]map[string]any, error) { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + body := fmt.Sprintf(`{"db":%q,"q":%q}`, i.cfg.InfluxDB, sql) + req, err := http.NewRequestWithContext(ctx, "POST", i.cfg.InfluxURL+"/api/v3/query_sql", strings.NewReader(body)) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+i.cfg.InfluxToken) + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + i.store.SetError("influxdb", err) + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + err := fmt.Errorf("influxdb query returned %d", resp.StatusCode) + i.store.SetError("influxdb", err) + return nil, err + } + + var rows []map[string]any + if err := json.NewDecoder(resp.Body).Decode(&rows); err != nil { + return nil, err + } + return rows, nil +} + +// JSON value helpers — InfluxDB 3 returns typed JSON values. 
+ +func jsonStr(v any) string { + if v == nil { + return "" + } + if s, ok := v.(string); ok { + return s + } + return fmt.Sprintf("%v", v) +} + +func jsonFloat(v any) float64 { + if v == nil { + return 0 + } + switch n := v.(type) { + case float64: + return n + case json.Number: + f, _ := n.Float64() + return f + } + return 0 +} + +func jsonInt(v any) int { + if v == nil { + return 0 + } + switch n := v.(type) { + case float64: + return int(n) + case json.Number: + i, _ := n.Int64() + return int(i) + } + return 0 +} diff --git a/pkg/lab/collector/prometheus.go b/pkg/lab/collector/prometheus.go new file mode 100644 index 00000000..6b4d324f --- /dev/null +++ b/pkg/lab/collector/prometheus.go @@ -0,0 +1,104 @@ +package collector + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strconv" + "time" + + "forge.lthn.ai/core/cli/pkg/lab" +) + +type Prometheus struct { + url string + store *lab.Store +} + +func NewPrometheus(promURL string, s *lab.Store) *Prometheus { + return &Prometheus{url: promURL, store: s} +} + +func (p *Prometheus) Name() string { return "prometheus" } + +func (p *Prometheus) Collect(ctx context.Context) error { + // Machine stats are handled by the system collector (direct /proc + SSH). + // This collector only queries agent metrics from Prometheus. 
+ agents := lab.AgentSummary{} + if v, err := p.query(ctx, "agents_registered_total"); err == nil && v != nil { + agents.RegisteredTotal = int(*v) + agents.Available = true + } + if v, err := p.query(ctx, "agents_queue_pending"); err == nil && v != nil { + agents.QueuePending = int(*v) + } + if v, err := p.query(ctx, "agents_tasks_completed_total"); err == nil && v != nil { + agents.TasksCompleted = int(*v) + } + if v, err := p.query(ctx, "agents_tasks_failed_total"); err == nil && v != nil { + agents.TasksFailed = int(*v) + } + if v, err := p.query(ctx, "agents_capabilities_count"); err == nil && v != nil { + agents.Capabilities = int(*v) + } + if v, err := p.query(ctx, "agents_heartbeat_age_seconds"); err == nil && v != nil { + agents.HeartbeatAge = *v + } + if v, err := p.query(ctx, "agents_exporter_up"); err == nil && v != nil { + agents.ExporterUp = *v > 0 + } + + p.store.SetAgents(agents) + p.store.SetError("prometheus", nil) + return nil +} + +type promResponse struct { + Status string `json:"status"` + Data struct { + ResultType string `json:"resultType"` + Result []struct { + Value [2]json.RawMessage `json:"value"` + } `json:"result"` + } `json:"data"` +} + +func (p *Prometheus) query(ctx context.Context, promql string) (*float64, error) { + u := fmt.Sprintf("%s/api/v1/query?query=%s", p.url, url.QueryEscape(promql)) + + req, err := http.NewRequestWithContext(ctx, "GET", u, nil) + if err != nil { + return nil, err + } + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Do(req) + if err != nil { + p.store.SetError("prometheus", err) + return nil, err + } + defer resp.Body.Close() + + var pr promResponse + if err := json.NewDecoder(resp.Body).Decode(&pr); err != nil { + return nil, err + } + + if pr.Status != "success" || len(pr.Data.Result) == 0 { + return nil, nil + } + + var valStr string + if err := json.Unmarshal(pr.Data.Result[0].Value[1], &valStr); err != nil { + return nil, err + } + + val, err := strconv.ParseFloat(valStr, 64) 
+ if err != nil { + return nil, err + } + + return &val, nil +} diff --git a/pkg/lab/collector/services.go b/pkg/lab/collector/services.go new file mode 100644 index 00000000..63d96589 --- /dev/null +++ b/pkg/lab/collector/services.go @@ -0,0 +1,107 @@ +package collector + +import ( + "context" + "net/http" + "time" + + "forge.lthn.ai/core/cli/pkg/lab" +) + +type Services struct { + store *lab.Store + services []lab.Service +} + +func NewServices(s *lab.Store) *Services { + return &Services{ + store: s, + services: []lab.Service{ + // Source Control + {Name: "Forgejo (primary)", URL: "https://forge.lthn.io", Category: "Source Control", Machine: "m3-ultra", Icon: "git"}, + {Name: "Forgejo (dev)", URL: "https://dev.lthn.io", Category: "Source Control", Machine: "snider-linux", Icon: "git"}, + {Name: "Forgejo (QA)", URL: "https://qa.lthn.io", Category: "Source Control", Machine: "gateway", Icon: "git"}, + {Name: "Forgejo (devops)", URL: "https://devops.lthn.io", Category: "Source Control", Machine: "gateway", Icon: "git"}, + {Name: "Forgejo Pages", URL: "https://host-uk.pages.lthn.io", Category: "Source Control", Machine: "snider-linux", Icon: "web"}, + + // CI/CD + {Name: "Woodpecker CI", URL: "https://ci.lthn.io", Category: "CI/CD", Machine: "snider-linux", Icon: "ci"}, + + // Monitoring + {Name: "Grafana", URL: "https://grafana.lthn.io", Category: "Monitoring", Machine: "snider-linux", Icon: "chart"}, + {Name: "Traefik Dashboard", URL: "https://traefik.lthn.io", Category: "Monitoring", Machine: "snider-linux", Icon: "route"}, + {Name: "Portainer", URL: "https://portainer.lthn.io", Category: "Monitoring", Machine: "snider-linux", Icon: "container"}, + {Name: "MantisBT", URL: "https://bugs.lthn.io", Category: "Monitoring", Machine: "snider-linux", Icon: "bug"}, + + // AI & Models + {Name: "Ollama API", URL: "https://ollama.lthn.io", Category: "AI", Machine: "snider-linux", Icon: "ai"}, + {Name: "AnythingLLM", URL: "https://anythingllm.lthn.io", Category: "AI", 
Machine: "snider-linux", Icon: "ai"}, + {Name: "Argilla", URL: "https://argilla.lthn.io", Category: "AI", Machine: "snider-linux", Icon: "data"}, + {Name: "Lab Helper API", URL: "http://10.69.69.108:9800", Category: "AI", Machine: "m3-ultra", Icon: "api"}, + {Name: "Lab Dashboard", URL: "https://lab.lthn.io", Category: "AI", Machine: "snider-linux", Icon: "web"}, + + // Media & Content + {Name: "Jellyfin", URL: "https://media.lthn.io", Category: "Media", Machine: "m3-ultra", Icon: "media"}, + {Name: "Immich Photos", URL: "https://photos.lthn.io", Category: "Media", Machine: "m3-ultra", Icon: "photo"}, + + // Social + {Name: "Mastodon", URL: "https://fedi.lthn.io", Category: "Social", Machine: "snider-linux", Icon: "social"}, + {Name: "Mixpost", URL: "https://social.lthn.io", Category: "Social", Machine: "snider-linux", Icon: "social"}, + + // i18n + {Name: "Weblate", URL: "https://i18n.lthn.io", Category: "Translation", Machine: "snider-linux", Icon: "i18n"}, + + // Infra + {Name: "dAppCo.re CDN", URL: "https://dappco.re", Category: "Infrastructure", Machine: "snider-linux", Icon: "cdn"}, + {Name: "lthn.ai Landing", URL: "https://lthn.ai", Category: "Infrastructure", Machine: "snider-linux", Icon: "web"}, + }, + } +} + +func (s *Services) Name() string { return "services" } + +func (s *Services) Collect(ctx context.Context) error { + client := &http.Client{ + Timeout: 5 * time.Second, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse // don't follow redirects + }, + } + + for i := range s.services { + s.services[i].Status = checkHealth(ctx, client, s.services[i].URL) + } + + result := make([]lab.Service, len(s.services)) + copy(result, s.services) + s.store.SetServices(result) + s.store.SetError("services", nil) + return nil +} + +func checkHealth(ctx context.Context, client *http.Client, url string) string { + // Try HEAD first, fall back to GET if HEAD fails. 
+ req, err := http.NewRequestWithContext(ctx, "HEAD", url, nil) + if err != nil { + return "unavailable" + } + + resp, err := client.Do(req) + if err != nil { + // Retry with GET (some servers reject HEAD). + req2, _ := http.NewRequestWithContext(ctx, "GET", url, nil) + if req2 == nil { + return "unavailable" + } + resp, err = client.Do(req2) + if err != nil { + return "unavailable" + } + } + resp.Body.Close() + + if resp.StatusCode < 500 { + return "ok" + } + return "unavailable" +} diff --git a/pkg/lab/collector/system.go b/pkg/lab/collector/system.go new file mode 100644 index 00000000..170ead99 --- /dev/null +++ b/pkg/lab/collector/system.go @@ -0,0 +1,374 @@ +package collector + +import ( + "bufio" + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "runtime" + "strconv" + "strings" + "time" + + "forge.lthn.ai/core/cli/pkg/lab" +) + +type System struct { + store *lab.Store + cfg *lab.Config +} + +func NewSystem(cfg *lab.Config, s *lab.Store) *System { + return &System{store: s, cfg: cfg} +} + +func (s *System) Name() string { return "system" } + +func (s *System) Collect(ctx context.Context) error { + var machines []lab.Machine + + // Collect local machine stats. + local := s.collectLocal() + machines = append(machines, local) + + // Collect M3 Ultra stats via SSH. + if s.cfg.M3Host != "" { + m3 := s.collectM3(ctx) + machines = append(machines, m3) + } + + s.store.SetMachines(machines) + s.store.SetError("system", nil) + return nil +} + +// --------------------------------------------------------------------------- +// Local (snider-linux) +// --------------------------------------------------------------------------- + +// procPath returns the path to a proc file, preferring /host/proc (Docker mount) over /proc. 
+func procPath(name string) string { + hp := "/host/proc/" + name + if _, err := os.Stat(hp); err == nil { + return hp + } + return "/proc/" + name +} + +func (s *System) collectLocal() lab.Machine { + m := lab.Machine{ + Name: "snider-linux", + Host: "localhost", + Status: lab.StatusOK, + CPUCores: runtime.NumCPU(), + } + + // Load average + if data, err := os.ReadFile(procPath("loadavg")); err == nil { + fields := strings.Fields(string(data)) + if len(fields) > 0 { + m.Load1, _ = strconv.ParseFloat(fields[0], 64) + } + } + + // Memory from host /proc/meminfo + if f, err := os.Open(procPath("meminfo")); err == nil { + defer f.Close() + var memTotal, memAvail float64 + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "MemTotal:") { + memTotal = parseMemInfoKB(line) + } else if strings.HasPrefix(line, "MemAvailable:") { + memAvail = parseMemInfoKB(line) + } + } + if memTotal > 0 { + m.MemTotalGB = memTotal / 1024 / 1024 + m.MemUsedGB = (memTotal - memAvail) / 1024 / 1024 + m.MemUsedPct = (1.0 - memAvail/memTotal) * 100 + } + } + + // Disk — use host root mount if available + diskTarget := "/" + if _, err := os.Stat("/host/root"); err == nil { + diskTarget = "/host/root" + } + if out, err := exec.Command("df", "-BG", diskTarget).Output(); err == nil { + lines := strings.Split(strings.TrimSpace(string(out)), "\n") + if len(lines) >= 2 { + fields := strings.Fields(lines[1]) + if len(fields) >= 5 { + m.DiskTotalGB = parseGB(fields[1]) + m.DiskUsedGB = parseGB(fields[2]) + pct := strings.TrimSuffix(fields[4], "%") + m.DiskUsedPct, _ = strconv.ParseFloat(pct, 64) + } + } + } + + // GPU via sysfs (works inside Docker with /host/drm mount) + s.collectGPUSysfs(&m) + + // Uptime + if data, err := os.ReadFile(procPath("uptime")); err == nil { + fields := strings.Fields(string(data)) + if len(fields) > 0 { + if secs, err := strconv.ParseFloat(fields[0], 64); err == nil { + m.Uptime = formatDuration(time.Duration(secs * 
float64(time.Second))) + } + } + } + + return m +} + +func (s *System) collectGPUSysfs(m *lab.Machine) { + // Try sysfs paths: /host/sys (Docker mount of /sys) or /sys (native) + drmBase := "/host/sys/class/drm" + if _, err := os.Stat(drmBase); err != nil { + drmBase = "/sys/class/drm" + } + + // Find the discrete GPU (largest VRAM) — card0 may be integrated + gpuDev := "" + var bestTotal float64 + for _, card := range []string{"card0", "card1", "card2"} { + p := fmt.Sprintf("%s/%s/device/mem_info_vram_total", drmBase, card) + if data, err := os.ReadFile(p); err == nil { + val, _ := strconv.ParseFloat(strings.TrimSpace(string(data)), 64) + if val > bestTotal { + bestTotal = val + gpuDev = fmt.Sprintf("%s/%s/device", drmBase, card) + } + } + } + if gpuDev == "" { + return + } + + m.GPUName = "AMD Radeon RX 7800 XT" + m.GPUVRAMTotal = bestTotal / 1024 / 1024 / 1024 + + if data, err := os.ReadFile(gpuDev + "/mem_info_vram_used"); err == nil { + val, _ := strconv.ParseFloat(strings.TrimSpace(string(data)), 64) + m.GPUVRAMUsed = val / 1024 / 1024 / 1024 + } + if m.GPUVRAMTotal > 0 { + m.GPUVRAMPct = m.GPUVRAMUsed / m.GPUVRAMTotal * 100 + } + + // Temperature — find hwmon under the device + matches, _ := filepath.Glob(gpuDev + "/hwmon/hwmon*/temp1_input") + if len(matches) > 0 { + if data, err := os.ReadFile(matches[0]); err == nil { + val, _ := strconv.ParseFloat(strings.TrimSpace(string(data)), 64) + m.GPUTemp = int(val / 1000) // millidegrees to degrees + } + } +} + +// --------------------------------------------------------------------------- +// M3 Ultra (via SSH) +// --------------------------------------------------------------------------- + +func (s *System) collectM3(ctx context.Context) lab.Machine { + m := lab.Machine{ + Name: "m3-ultra", + Host: s.cfg.M3Host, + Status: lab.StatusUnavailable, + GPUName: "Apple M3 Ultra (80 cores)", + } + + cmd := exec.CommandContext(ctx, "ssh", + "-o", "ConnectTimeout=5", + "-o", "BatchMode=yes", + "-i", s.cfg.M3SSHKey, + 
fmt.Sprintf("%s@%s", s.cfg.M3User, s.cfg.M3Host), + "printf '===CPU===\\n'; sysctl -n hw.ncpu; sysctl -n vm.loadavg; printf '===MEM===\\n'; sysctl -n hw.memsize; vm_stat; printf '===DISK===\\n'; df -k /; printf '===UPTIME===\\n'; uptime", + ) + + out, err := cmd.Output() + if err != nil { + return m + } + + m.Status = lab.StatusOK + s.parseM3Output(&m, string(out)) + return m +} + +func (s *System) parseM3Output(m *lab.Machine, output string) { + sections := splitSections(output) + + // CPU + if cpu, ok := sections["CPU"]; ok { + lines := strings.Split(strings.TrimSpace(cpu), "\n") + if len(lines) >= 1 { + m.CPUCores, _ = strconv.Atoi(strings.TrimSpace(lines[0])) + } + if len(lines) >= 2 { + // "{ 8.22 4.56 4.00 }" + loadStr := strings.Trim(strings.TrimSpace(lines[1]), "{ }") + fields := strings.Fields(loadStr) + if len(fields) >= 1 { + m.Load1, _ = strconv.ParseFloat(fields[0], 64) + } + } + } + + // Memory + if mem, ok := sections["MEM"]; ok { + lines := strings.Split(strings.TrimSpace(mem), "\n") + if len(lines) >= 1 { + bytes, _ := strconv.ParseFloat(strings.TrimSpace(lines[0]), 64) + m.MemTotalGB = bytes / 1024 / 1024 / 1024 + } + // Parse vm_stat: page size 16384, look for free/active/inactive/wired/speculative/compressor + var pageSize float64 = 16384 + var free, active, inactive, speculative, wired, compressor float64 + for _, line := range lines[1:] { + if strings.Contains(line, "page size of") { + // "Mach Virtual Memory Statistics: (page size of 16384 bytes)" + for _, word := range strings.Fields(line) { + if v, err := strconv.ParseFloat(word, 64); err == nil && v > 1000 { + pageSize = v + break + } + } + } + val := parseVMStatLine(line) + switch { + case strings.HasPrefix(line, "Pages free:"): + free = val + case strings.HasPrefix(line, "Pages active:"): + active = val + case strings.HasPrefix(line, "Pages inactive:"): + inactive = val + case strings.HasPrefix(line, "Pages speculative:"): + speculative = val + case strings.HasPrefix(line, "Pages 
wired"): + wired = val + case strings.HasPrefix(line, "Pages occupied by compressor:"): + compressor = val + } + } + usedPages := active + wired + compressor + totalPages := free + active + inactive + speculative + wired + compressor + if totalPages > 0 && m.MemTotalGB > 0 { + m.MemUsedGB = usedPages * pageSize / 1024 / 1024 / 1024 + m.MemUsedPct = m.MemUsedGB / m.MemTotalGB * 100 + } + } + + // Disk + if disk, ok := sections["DISK"]; ok { + lines := strings.Split(strings.TrimSpace(disk), "\n") + if len(lines) >= 2 { + fields := strings.Fields(lines[1]) + if len(fields) >= 5 { + totalKB, _ := strconv.ParseFloat(fields[1], 64) + usedKB, _ := strconv.ParseFloat(fields[2], 64) + m.DiskTotalGB = totalKB / 1024 / 1024 + m.DiskUsedGB = usedKB / 1024 / 1024 + if m.DiskTotalGB > 0 { + m.DiskUsedPct = m.DiskUsedGB / m.DiskTotalGB * 100 + } + } + } + } + + // Uptime — "13:20 up 3 days, 1:09, 3 users, load averages: ..." + if up, ok := sections["UPTIME"]; ok { + line := strings.TrimSpace(up) + if idx := strings.Index(line, "up "); idx >= 0 { + rest := line[idx+3:] + // Split on ", " and take parts until we hit one containing "user" + parts := strings.Split(rest, ", ") + var uptimeParts []string + for _, p := range parts { + if strings.Contains(p, "user") || strings.Contains(p, "load") { + break + } + uptimeParts = append(uptimeParts, p) + } + m.Uptime = strings.TrimSpace(strings.Join(uptimeParts, ", ")) + } + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func splitSections(output string) map[string]string { + sections := make(map[string]string) + var current string + var buf strings.Builder + for _, line := range strings.Split(output, "\n") { + if strings.HasPrefix(line, "===") && strings.HasSuffix(line, "===") { + if current != "" { + sections[current] = buf.String() + buf.Reset() + } + current = strings.Trim(line, "=") + } else if current 
!= "" { + buf.WriteString(line) + buf.WriteByte('\n') + } + } + if current != "" { + sections[current] = buf.String() + } + return sections +} + +func parseVMStatLine(line string) float64 { + // "Pages free: 2266867." + parts := strings.SplitN(line, ":", 2) + if len(parts) < 2 { + return 0 + } + val := strings.TrimSpace(strings.TrimSuffix(strings.TrimSpace(parts[1]), ".")) + f, _ := strconv.ParseFloat(val, 64) + return f +} + +func parseMemInfoKB(line string) float64 { + fields := strings.Fields(line) + if len(fields) < 2 { + return 0 + } + v, _ := strconv.ParseFloat(fields[1], 64) + return v +} + +func parseGB(s string) float64 { + s = strings.TrimSuffix(s, "G") + v, _ := strconv.ParseFloat(s, 64) + return v +} + +func parseBytesGB(line string) float64 { + // "GPU[0] : VRAM Total Memory (B): 17163091968" + parts := strings.Split(line, ":") + if len(parts) < 3 { + return 0 + } + val := strings.TrimSpace(parts[len(parts)-1]) + bytes, _ := strconv.ParseFloat(val, 64) + return bytes / 1024 / 1024 / 1024 +} + +func formatDuration(d time.Duration) string { + days := int(d.Hours()) / 24 + hours := int(d.Hours()) % 24 + if days > 0 { + return fmt.Sprintf("%dd %dh", days, hours) + } + return fmt.Sprintf("%dh %dm", hours, int(d.Minutes())%60) +} diff --git a/pkg/lab/collector/training.go b/pkg/lab/collector/training.go new file mode 100644 index 00000000..983d5ff9 --- /dev/null +++ b/pkg/lab/collector/training.go @@ -0,0 +1,123 @@ +package collector + +import ( + "bufio" + "context" + "encoding/json" + "net/http" + "os" + "path/filepath" + "time" + + "forge.lthn.ai/core/cli/pkg/lab" +) + +type Training struct { + cfg *lab.Config + store *lab.Store +} + +func NewTraining(cfg *lab.Config, s *lab.Store) *Training { + return &Training{cfg: cfg, store: s} +} + +func (t *Training) Name() string { return "training" } + +func (t *Training) Collect(ctx context.Context) error { + summary := lab.TrainingSummary{ + GoldTarget: 15000, + } + + // Fetch from M3 lab-helper API + if 
t.cfg.M3APIURL != "" { + t.fetchM3API(ctx, &summary) + } + + // Parse local intercept JSONL files + interceptDir := t.cfg.TrainingDataDir + if interceptDir != "" { + count, lastTime := countJSONLFiles(filepath.Join(interceptDir, "command-intercepts")) + summary.InterceptCount = count + summary.LastIntercept = lastTime + } + + // Count QA sessions + sessDir := filepath.Join(t.cfg.TrainingDataDir, "qa-epic-verification", "sessions") + if entries, err := os.ReadDir(sessDir); err == nil { + summary.SessionCount = len(entries) + } + + t.store.SetTraining(summary) + t.store.SetError("training", nil) + return nil +} + +type m3TrainingResponse struct { + GoldGenerated int `json:"gold_generated"` + GoldTarget int `json:"gold_target"` + GoldPercent float64 `json:"gold_percent"` + SeedsComplete int `json:"seeds_complete"` + GGUFCount int `json:"gguf_count"` + GGUFFiles []string `json:"gguf_files"` + AdapterCount int `json:"adapter_count"` +} + +func (t *Training) fetchM3API(ctx context.Context, summary *lab.TrainingSummary) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", t.cfg.M3APIURL+"/api/training", nil) + if err != nil { + return + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.store.SetError("m3-api", err) + return + } + defer resp.Body.Close() + + var data m3TrainingResponse + if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { + return + } + + summary.GoldGenerated = data.GoldGenerated + summary.GoldAvailable = true + summary.GoldPercent = data.GoldPercent + summary.GGUFCount = data.GGUFCount + summary.GGUFFiles = data.GGUFFiles + summary.AdapterCount = data.AdapterCount + t.store.SetError("m3-api", nil) +} + +func countJSONLFiles(dir string) (int, time.Time) { + var total int + var lastTime time.Time + + files, err := filepath.Glob(filepath.Join(dir, "*.jsonl")) + if err != nil { + return 0, lastTime + } + + for _, f := range files { + file, err := 
os.Open(f) + if err != nil { + continue + } + scanner := bufio.NewScanner(file) + for scanner.Scan() { + total++ + var ev struct { + Timestamp time.Time `json:"timestamp"` + } + if json.Unmarshal(scanner.Bytes(), &ev) == nil && ev.Timestamp.After(lastTime) { + lastTime = ev.Timestamp + } + } + file.Close() + } + + return total, lastTime +} diff --git a/pkg/lab/config.go b/pkg/lab/config.go new file mode 100644 index 00000000..4f3dcbfd --- /dev/null +++ b/pkg/lab/config.go @@ -0,0 +1,84 @@ +package lab + +import ( + "os" + "strconv" +) + +type Config struct { + Addr string + + PrometheusURL string + PrometheusInterval int + + ForgeURL string + ForgeToken string + ForgeInterval int + + HFAuthor string + HFInterval int + + M3Host string + M3User string + M3SSHKey string + M3APIURL string + M3Interval int + + TrainingDataDir string + TrainingInterval int + + DockerInterval int + + InfluxURL string + InfluxToken string + InfluxDB string + InfluxInterval int +} + +func LoadConfig() *Config { + return &Config{ + Addr: env("ADDR", ":8080"), + + PrometheusURL: env("PROMETHEUS_URL", "http://prometheus:9090"), + PrometheusInterval: envInt("PROMETHEUS_INTERVAL", 15), + + ForgeURL: env("FORGE_URL", "https://forge.lthn.io"), + ForgeToken: env("FORGE_TOKEN", ""), + ForgeInterval: envInt("FORGE_INTERVAL", 60), + + HFAuthor: env("HF_AUTHOR", "lthn"), + HFInterval: envInt("HF_INTERVAL", 300), + + M3Host: env("M3_HOST", "10.69.69.108"), + M3User: env("M3_USER", "claude"), + M3SSHKey: env("M3_SSH_KEY", "/root/.ssh/id_ed25519"), + M3APIURL: env("M3_API_URL", "http://10.69.69.108:9800"), + M3Interval: envInt("M3_INTERVAL", 30), + + TrainingDataDir: env("TRAINING_DATA_DIR", "/data/training"), + TrainingInterval: envInt("TRAINING_INTERVAL", 60), + + DockerInterval: envInt("DOCKER_INTERVAL", 30), + + InfluxURL: env("INFLUX_URL", "http://localhost:8181"), + InfluxToken: env("INFLUX_TOKEN", ""), + InfluxDB: env("INFLUX_DB", "training"), + InfluxInterval: envInt("INFLUX_INTERVAL", 60), + } +} 
+
+// env returns the value of the environment variable key, or fallback when it
+// is unset or empty.
+func env(key, fallback string) string {
+	if v := os.Getenv(key); v != "" {
+		return v
+	}
+	return fallback
+}
+
+// envInt returns the environment variable key parsed as an int, or fallback
+// when it is unset, empty, or not a valid integer.
+func envInt(key string, fallback int) int {
+	if v := os.Getenv(key); v != "" {
+		if n, err := strconv.Atoi(v); err == nil {
+			return n
+		}
+	}
+	return fallback
+}
diff --git a/pkg/lab/handler/api.go b/pkg/lab/handler/api.go
new file mode 100644
index 00000000..f4ea9b24
--- /dev/null
+++ b/pkg/lab/handler/api.go
@@ -0,0 +1,65 @@
+package handler
+
+import (
+	"encoding/json"
+	"net/http"
+	"time"
+
+	"forge.lthn.ai/core/cli/pkg/lab"
+)
+
+// APIHandler serves the read-only JSON API backed by the lab store.
+type APIHandler struct {
+	store *lab.Store
+}
+
+// NewAPIHandler returns an APIHandler that reads from s.
+func NewAPIHandler(s *lab.Store) *APIHandler {
+	return &APIHandler{store: s}
+}
+
+// apiResponse is the standard envelope every JSON endpoint responds with.
+type apiResponse struct {
+	Status    string    `json:"status"`
+	UpdatedAt time.Time `json:"updated_at"`
+	Data      any       `json:"data"`
+}
+
+// writeJSON wraps data in the standard envelope and writes it as JSON with
+// status "ok" and the current time.
+func (h *APIHandler) writeJSON(w http.ResponseWriter, data any) {
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(apiResponse{
+		Status:    "ok",
+		UpdatedAt: time.Now(),
+		Data:      data,
+	})
+}
+
+// Status serves the dashboard overview snapshot.
+func (h *APIHandler) Status(w http.ResponseWriter, r *http.Request) {
+	h.writeJSON(w, h.store.Overview())
+}
+
+// Models serves the HuggingFace model list.
+func (h *APIHandler) Models(w http.ResponseWriter, r *http.Request) {
+	h.writeJSON(w, h.store.GetModels())
+}
+
+// Training serves the training summary.
+func (h *APIHandler) Training(w http.ResponseWriter, r *http.Request) {
+	h.writeJSON(w, h.store.GetTraining())
+}
+
+// Agents serves the agent metrics.
+func (h *APIHandler) Agents(w http.ResponseWriter, r *http.Request) {
+	h.writeJSON(w, h.store.GetAgents())
+}
+
+// Services serves the service health list.
+func (h *APIHandler) Services(w http.ResponseWriter, r *http.Request) {
+	h.writeJSON(w, h.store.GetServices())
+}
+
+// GoldenSet serves the golden-set dataset stats.
+func (h *APIHandler) GoldenSet(w http.ResponseWriter, r *http.Request) {
+	h.writeJSON(w, h.store.GetGoldenSet())
+}
+
+// Runs serves the benchmark run data.
+func (h *APIHandler) Runs(w http.ResponseWriter, r *http.Request) {
+	h.writeJSON(w, h.store.GetBenchmarks())
+}
+
+// Health is a liveness probe; it always reports ok without consulting the store.
+func (h *APIHandler) Health(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) +} diff --git a/pkg/lab/handler/chart.go b/pkg/lab/handler/chart.go new file mode 100644 index 00000000..a60f4d6f --- /dev/null +++ b/pkg/lab/handler/chart.go @@ -0,0 +1,626 @@ +package handler + +import ( + "fmt" + "html/template" + "math" + "sort" + "strings" + + "forge.lthn.ai/core/cli/pkg/lab" +) + +const ( + chartW = 760 + chartH = 280 + marginTop = 25 + marginRight = 20 + marginBot = 35 + marginLeft = 55 + plotW = chartW - marginLeft - marginRight + plotH = chartH - marginTop - marginBot +) + +var dimensionColors = map[string]string{ + "ccp_compliance": "#f87171", + "truth_telling": "#4ade80", + "engagement": "#fbbf24", + "axiom_integration": "#60a5fa", + "sovereignty_reasoning": "#c084fc", + "emotional_register": "#fb923c", +} + +func getDimColor(dim string) string { + if c, ok := dimensionColors[dim]; ok { + return c + } + return "#8888a0" +} + +// LossChart generates an SVG line chart for training loss data. +func LossChart(points []lab.LossPoint) template.HTML { + if len(points) == 0 { + return template.HTML(`
| Run | `) + for _, cat := range cats { + icon := catIcon(cat) + sb.WriteString(fmt.Sprintf(``, cat, icon)) + } + sb.WriteString(` | ||
|---|---|---|---|
%s | `, short))
+ for _, cat := range cats {
+ jc, jok := judgeCells[key{cat, l}]
+ bc, bok := binaryCells[key{cat, l}]
+
+ if hasJudge && jok && jc.count > 0 {
+ // Show judge score (0-10 average).
+ avg := jc.sum / float64(jc.count)
+ color := "var(--red)"
+ if avg >= 7.0 {
+ color = "var(--green)"
+ } else if avg >= 4.0 {
+ color = "var(--yellow)"
+ }
+ passInfo := ""
+ if bok {
+ passInfo = fmt.Sprintf(" (%d/%d pass)", bc.correct, bc.total)
+ }
+ sb.WriteString(fmt.Sprintf(`%.1f | `, + color, cat, avg, passInfo, avg)) + } else if bok { + // Fall back to binary. + icon := "fa-circle-xmark" + color := "var(--red)" + if bc.accuracy >= 80 { + icon = "fa-circle-check" + color = "var(--green)" + } else if bc.accuracy >= 50 { + icon = "fa-triangle-exclamation" + color = "var(--yellow)" + } + sb.WriteString(fmt.Sprintf(`%d/%d | `, + color, cat, bc.correct, bc.total, bc.accuracy, icon, bc.correct, bc.total)) + } else { + sb.WriteString(``) + } + } + sb.WriteString(` |
Agent metrics not available. The Prometheus agent exporter may be offline.
+Expected at: localhost:9402/metrics
| Repo | Message | Author | Time |
|---|---|---|---|
{{.Repo}} |
+ {{shortMsg .Message}} | +{{.Author}} | +{{timeAgo .Timestamp}} | +
| Table | Rows | Size |
|---|---|---|
{{.Name}} |
+ {{fmtInt .Rows}} | ++ + | +
No golden set data available.
| Worker | Generations |
|---|---|
{{.Worker}} |
+ {{fmtInt .Count}} | +
dataset_statsSeed browser coming soon. Use lem export --seeds to explore locally.
| Domain | Count | Avg Gen Time | Coverage |
|---|---|---|---|
{{.Domain}} |
+ {{.Count}} | +{{pct .AvgGenTime}}s | ++ + | +
No domain data available.
| Voice | Count | Avg Chars | Avg Gen Time |
|---|---|---|---|
{{.Voice}} |
+ {{.Count}} | +{{pct .AvgChars}} | +{{pct .AvgGenTime}}s | +
No voice data available.
dataset_statsExpansion pipeline: use lem expand to generate responses from trained models, then lem score to filter by quality.
Export formats:
+| Format | Command | Use |
|---|---|---|
JSONL (MLX) |
+ lem export --format jsonl |
+ MLX LoRA training (train/valid/test splits) | +
Parquet |
+ lem export --format parquet |
+ HuggingFace dataset upload | +
CSV |
+ lem export --format csv |
+ Spreadsheet analysis | +
pipeline.py metrics to push stats to InfluxDB.| Worker | Generations |
|---|---|
{{.Worker}} |
+ {{.Count}} | +
| Domain | Count | Avg Gen Time | Coverage |
|---|---|---|---|
{{.Domain}} |
+ {{.Count}} | +{{pct .AvgGenTime}}s | ++ + | +
| Voice | Count | Avg Chars | Avg Gen Time |
|---|---|---|---|
{{.Voice}} |
+ {{.Count}} | +{{pct .AvgChars}} | +{{pct .AvgGenTime}}s | +
| Model | Downloads | Likes | Pipeline | Updated |
|---|---|---|---|---|
| {{.ModelID}} | +{{.Downloads}} | +{{.Likes}} | +{{if .PipelineTag}}{{.PipelineTag}}{{else}}-{{end}} | +{{timeAgo .LastModified}} | +
No models loaded yet. HuggingFace data refreshes every 5 minutes.
+No benchmark data available. InfluxDB data refreshes every 60 seconds.
+No training or benchmark data. InfluxDB refreshes every 60 seconds.
+No data for this model yet.