LEM/pkg/lem/export_cold.go
Snider 1b570b8229 feat: add 'lem data export-cold' for warm DuckDB -> cold JSONL.zst export
Export distill_results from DuckDB back to compressed JSONL.zst files,
completing the cold -> warm -> cold round-trip data pipeline.

Co-Authored-By: Virgil <virgil@lethean.io>
2026-02-28 12:21:36 +00:00

107 lines
2.7 KiB
Go

// export_cold.go — Export warm DuckDB data back to cold JSONL.zst files.
package lem
import (
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
)
// ExportColdOpts carries the settings for the "data export-cold" command.
type ExportColdOpts struct {
	// DB is the path to the DuckDB database file.
	DB string
	// Model restricts the export to a single model; empty exports every model.
	Model string
	// OutputDir is the directory the exported files are written under.
	OutputDir string
	// Compress selects .zst compression of each exported file. The intended
	// default is true, but the zero value is false, so callers must set it.
	Compress bool
}
// RunExportCold exports distill results from DuckDB to JSONL files, optionally compressed.
//
// The database path is cfg.DB, falling back to the LEM_DB environment
// variable. Rows of distill_results (filtered by cfg.Model when it is set) are
// grouped by their source_file column, and each group is written to
// <OutputDir>/<source_file> as one user/assistant TrainingExample per line.
// Rows whose source_file is empty or NULL go to "export.jsonl". When
// cfg.Compress is set, each file is then compressed to <name>.zst and the
// uncompressed copy is removed.
//
// A failure that affects a single output file is logged as a warning and the
// export continues with the next file. An error is returned only when no
// database path is configured, the database cannot be opened or queried, or
// the output directory cannot be created.
//
// Returns the total number of examples written to disk.
func RunExportCold(cfg ExportColdOpts) (int, error) {
	dbPath := cfg.DB
	if dbPath == "" {
		dbPath = os.Getenv("LEM_DB")
	}
	if dbPath == "" {
		return 0, fmt.Errorf("--db or LEM_DB required")
	}

	db, err := OpenDB(dbPath)
	if err != nil {
		return 0, fmt.Errorf("open db: %w", err)
	}
	defer db.Close()

	outDir := cfg.OutputDir
	if outDir == "" {
		outDir = "."
	}
	if err := os.MkdirAll(outDir, 0755); err != nil {
		return 0, fmt.Errorf("create output dir %s: %w", outDir, err)
	}

	query := "SELECT probe_id, model, phase, prompt, response, source_file FROM distill_results"
	var args []any
	if cfg.Model != "" {
		query += " WHERE model = ?"
		args = append(args, cfg.Model)
	}
	query += " ORDER BY source_file, probe_id"

	rows, err := db.QueryRows(query, args...)
	if err != nil {
		return 0, fmt.Errorf("query distill_results: %w", err)
	}

	// Group by source_file. The first-seen order of the keys is recorded so
	// files are processed in query order rather than random map order.
	grouped := make(map[string][]map[string]any)
	var order []string
	for _, row := range rows {
		sf := exportColdString(row["source_file"])
		if _, seen := grouped[sf]; !seen {
			order = append(order, sf)
		}
		grouped[sf] = append(grouped[sf], row)
	}

	total, files := 0, 0
	for _, sourceFile := range order {
		records := grouped[sourceFile]

		jsonlName := sourceFile
		if jsonlName == "" {
			jsonlName = "export.jsonl"
		}
		// NOTE(review): source_file is joined under outDir as-is, so a value
		// containing ".." would resolve outside it — confirm the column is trusted.
		jsonlPath := filepath.Join(outDir, jsonlName)

		if err := os.MkdirAll(filepath.Dir(jsonlPath), 0755); err != nil {
			log.Printf(" WARNING: mkdir %s: %v", filepath.Dir(jsonlPath), err)
			continue
		}
		if err := writeExportColdFile(jsonlPath, records); err != nil {
			log.Printf(" WARNING: %v", err)
			continue
		}

		// Only files that were fully written count towards the summary.
		total += len(records)
		files++

		if !cfg.Compress {
			log.Printf(" %s (%d examples)", jsonlPath, len(records))
			continue
		}

		zstPath := jsonlPath + ".zst"
		if err := compressFileZstd(jsonlPath, zstPath); err != nil {
			// The uncompressed file is left in place, so the data is still exported.
			log.Printf(" WARNING: compress %s: %v", jsonlPath, err)
			continue
		}
		if err := os.Remove(jsonlPath); err != nil {
			log.Printf(" WARNING: remove %s: %v", jsonlPath, err)
		}
		log.Printf(" %s -> %s (%d examples)", sourceFile, filepath.Base(zstPath), len(records))
	}

	fmt.Printf("Exported %d examples from %d files\n", total, files)
	return total, nil
}

// writeExportColdFile creates path and writes records to it as JSONL, one
// user/assistant TrainingExample per line, reporting any create, write or
// close failure.
func writeExportColdFile(path string, records []map[string]any) error {
	f, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("create %s: %w", path, err)
	}

	// Encode emits the JSON value followed by a newline, i.e. one JSONL line.
	enc := json.NewEncoder(f)
	for _, row := range records {
		example := TrainingExample{
			Messages: []ChatMessage{
				{Role: "user", Content: exportColdString(row["prompt"])},
				{Role: "assistant", Content: exportColdString(row["response"])},
			},
		}
		if err := enc.Encode(example); err != nil {
			_ = f.Close() // the write error is the one worth reporting
			return fmt.Errorf("write %s: %w", path, err)
		}
	}

	if err := f.Close(); err != nil {
		return fmt.Errorf("close %s: %w", path, err)
	}
	return nil
}

// exportColdString renders a column value as text, mapping nil (SQL NULL) to
// the empty string instead of the literal "<nil>" that %v would produce.
func exportColdString(v any) string {
	switch s := v.(type) {
	case nil:
		return ""
	case string:
		return s
	default:
		return fmt.Sprintf("%v", v)
	}
}