Export distill_results from DuckDB back to compressed JSONL.zst files, completing the cold -> warm -> cold round-trip data pipeline. Co-Authored-By: Virgil <virgil@lethean.io>
107 lines
2.7 KiB
Go
107 lines
2.7 KiB
Go
// export_cold.go — Export warm DuckDB data back to cold JSONL.zst files.
|
|
package lem
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
)
|
|
|
|
// ExportColdOpts carries the settings for the data export-cold command.
type ExportColdOpts struct {
	// DB is the DuckDB path. RunExportCold falls back to the LEM_DB
	// environment variable when it is empty.
	DB string
	// Model restricts the export to rows of one model; empty exports all.
	Model string
	// OutputDir is the directory the files are written to. RunExportCold
	// uses the current directory when it is empty.
	OutputDir string
	// Compress requests .zst output. The Go zero value is false, so the
	// documented default of true has to be set by the caller (presumably
	// the CLI flag definition — confirm).
	Compress bool
}
|
|
|
|
// RunExportCold exports distill results from DuckDB to JSONL files, optionally compressed.
|
|
// Returns the total number of examples exported.
|
|
func RunExportCold(cfg ExportColdOpts) (int, error) {
|
|
dbPath := cfg.DB
|
|
if dbPath == "" {
|
|
dbPath = os.Getenv("LEM_DB")
|
|
}
|
|
if dbPath == "" {
|
|
return 0, fmt.Errorf("--db or LEM_DB required")
|
|
}
|
|
|
|
db, err := OpenDB(dbPath)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("open db: %w", err)
|
|
}
|
|
defer db.Close()
|
|
|
|
outDir := cfg.OutputDir
|
|
if outDir == "" {
|
|
outDir = "."
|
|
}
|
|
os.MkdirAll(outDir, 0755)
|
|
|
|
// Group results by source_file
|
|
query := "SELECT probe_id, model, phase, prompt, response, source_file FROM distill_results"
|
|
var args []any
|
|
if cfg.Model != "" {
|
|
query += " WHERE model = ?"
|
|
args = append(args, cfg.Model)
|
|
}
|
|
query += " ORDER BY source_file, probe_id"
|
|
|
|
rows, err := db.QueryRows(query, args...)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("query distill_results: %w", err)
|
|
}
|
|
|
|
// Group by source_file
|
|
grouped := make(map[string][]map[string]any)
|
|
for _, row := range rows {
|
|
sf := fmt.Sprintf("%v", row["source_file"])
|
|
grouped[sf] = append(grouped[sf], row)
|
|
}
|
|
|
|
total := 0
|
|
for sourceFile, records := range grouped {
|
|
jsonlName := sourceFile
|
|
if jsonlName == "" {
|
|
jsonlName = "export.jsonl"
|
|
}
|
|
jsonlPath := filepath.Join(outDir, jsonlName)
|
|
os.MkdirAll(filepath.Dir(jsonlPath), 0755)
|
|
|
|
f, err := os.Create(jsonlPath)
|
|
if err != nil {
|
|
log.Printf(" WARNING: create %s: %v", jsonlPath, err)
|
|
continue
|
|
}
|
|
|
|
for _, row := range records {
|
|
example := TrainingExample{
|
|
Messages: []ChatMessage{
|
|
{Role: "user", Content: fmt.Sprintf("%v", row["prompt"])},
|
|
{Role: "assistant", Content: fmt.Sprintf("%v", row["response"])},
|
|
},
|
|
}
|
|
line, _ := json.Marshal(example)
|
|
f.Write(append(line, '\n'))
|
|
total++
|
|
}
|
|
f.Close()
|
|
|
|
if cfg.Compress {
|
|
zstPath := jsonlPath + ".zst"
|
|
if err := compressFileZstd(jsonlPath, zstPath); err != nil {
|
|
log.Printf(" WARNING: compress %s: %v", jsonlPath, err)
|
|
continue
|
|
}
|
|
os.Remove(jsonlPath) // remove uncompressed
|
|
log.Printf(" %s -> %s (%d examples)", sourceFile, filepath.Base(zstPath), len(records))
|
|
} else {
|
|
log.Printf(" %s (%d examples)", jsonlPath, len(records))
|
|
}
|
|
}
|
|
|
|
fmt.Printf("Exported %d examples from %d files\n", total, len(grouped))
|
|
return total, nil
|
|
}
|