From 1c7194176aab9aaff20d526afb045a95331c50d7 Mon Sep 17 00:00:00 2001
From: Snider
Date: Sat, 28 Feb 2026 12:18:19 +0000
Subject: [PATCH] feat: add zstd compress/decompress helpers for cold storage

Add compressFileZstd, decompressZstd, and walkZstFiles helpers using
klauspost/compress. Promote zstd from indirect to direct dep.

Co-Authored-By: Virgil
---
 pkg/lem/zstd.go      | 113 +++++++++++++++++++++++++++++++++++++++++++
 pkg/lem/zstd_test.go |  87 +++++++++++++++++++++++++++++++++
 2 files changed, 200 insertions(+)
 create mode 100644 pkg/lem/zstd.go
 create mode 100644 pkg/lem/zstd_test.go

diff --git a/pkg/lem/zstd.go b/pkg/lem/zstd.go
new file mode 100644
index 0000000..30eaced
--- /dev/null
+++ b/pkg/lem/zstd.go
@@ -0,0 +1,113 @@
+// zstd.go — zstd compression/decompression for cold storage JSONL files.
+package lem
+
+import (
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/klauspost/compress/zstd"
+)
+
+// decompressZstd reads the zstd-compressed file at src and writes the
+// decompressed content to dst, creating or truncating dst.
+//
+// Every failure is returned wrapped with context, including a failed close of
+// dst. Once dst has been created, any failure removes it again (best effort)
+// so that a truncated file is never mistaken for a complete one.
+func decompressZstd(src, dst string) (err error) {
+	f, err := os.Open(src)
+	if err != nil {
+		return fmt.Errorf("open %s: %w", src, err)
+	}
+	defer f.Close()
+
+	dec, err := zstd.NewReader(f)
+	if err != nil {
+		return fmt.Errorf("zstd reader: %w", err)
+	}
+	defer dec.Close()
+
+	out, err := os.Create(dst)
+	if err != nil {
+		return fmt.Errorf("create %s: %w", dst, err)
+	}
+	defer func() {
+		// Close can be the first place a write error surfaces, so report it
+		// unless an earlier error already explains the failure.
+		if cerr := out.Close(); cerr != nil && err == nil {
+			err = fmt.Errorf("close %s: %w", dst, cerr)
+		}
+		if err != nil {
+			_ = os.Remove(dst) // best effort; the original error is what matters
+		}
+	}()
+
+	if _, err := io.Copy(out, dec); err != nil {
+		return fmt.Errorf("decompress %s: %w", src, err)
+	}
+	return nil
+}
+
+// compressFileZstd compresses src to dst using zstd.SpeedDefault (roughly
+// equivalent to zstd level 3), creating or truncating dst.
+//
+// Every failure is returned wrapped with context, including a failed flush of
+// the final frame or a failed close of dst. Once dst has been created, any
+// failure removes it again (best effort) so no truncated archive is left behind.
+func compressFileZstd(src, dst string) (err error) {
+	f, err := os.Open(src)
+	if err != nil {
+		return fmt.Errorf("open %s: %w", src, err)
+	}
+	defer f.Close()
+
+	out, err := os.Create(dst)
+	if err != nil {
+		return fmt.Errorf("create %s: %w", dst, err)
+	}
+	defer func() {
+		// Close can be the first place a write error surfaces, so report it
+		// unless an earlier error already explains the failure.
+		if cerr := out.Close(); cerr != nil && err == nil {
+			err = fmt.Errorf("close %s: %w", dst, cerr)
+		}
+		if err != nil {
+			_ = os.Remove(dst) // best effort; the original error is what matters
+		}
+	}()
+
+	enc, err := zstd.NewWriter(out, zstd.WithEncoderLevel(zstd.SpeedDefault))
+	if err != nil {
+		return fmt.Errorf("zstd writer: %w", err)
+	}
+
+	if _, err := io.Copy(enc, f); err != nil {
+		_ = enc.Close() // release the encoder; the copy error is what matters
+		return fmt.Errorf("compress %s: %w", src, err)
+	}
+	// Close flushes the final frame, so a failure here means dst is incomplete.
+	if err := enc.Close(); err != nil {
+		return fmt.Errorf("finish %s: %w", dst, err)
+	}
+	return nil
+}
+
+// walkZstFiles walks the directory tree rooted at root and calls fn with the
+// path of every non-directory entry whose path ends in ".jsonl.zst".
+//
+// An error reported by the walk, or returned by fn, is handed back to
+// filepath.Walk as the callback result; nothing is wrapped or swallowed here.
+func walkZstFiles(root string, fn func(zstPath string) error) error {
+	return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if info.IsDir() || !strings.HasSuffix(path, ".jsonl.zst") {
+			return nil
+		}
+		return fn(path)
+	})
+}
diff --git a/pkg/lem/zstd_test.go b/pkg/lem/zstd_test.go
new file mode 100644
index 0000000..13a899c
--- /dev/null
+++ b/pkg/lem/zstd_test.go
@@ -0,0 +1,87 @@
+package lem
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+)
+
+// TestDecompressZstd round-trips a small JSONL payload through
+// compressFileZstd and decompressZstd and checks the bytes survive intact.
+func TestDecompressZstd(t *testing.T) {
+	const want = `{"messages":[{"role":"user","content":"hello"}]}` + "\n"
+
+	// Create a temp .jsonl file and compress it
+	dir := t.TempDir()
+	src := filepath.Join(dir, "test.jsonl")
+	if err := os.WriteFile(src, []byte(want), 0644); err != nil {
+		t.Fatal(err)
+	}
+
+	// Compress with Go zstd
+	zstPath := src + ".zst"
+	if err := compressFileZstd(src, zstPath); err != nil {
+		t.Fatal(err)
+	}
+	// Remove the original so the content can only come from the archive.
+	if err := os.Remove(src); err != nil {
+		t.Fatal(err)
+	}
+
+	// Decompress
+	outPath := filepath.Join(dir, "out.jsonl")
+	if err := decompressZstd(zstPath, outPath); err != nil {
+		t.Fatal(err)
+	}
+
+	data, err := os.ReadFile(outPath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if string(data) != want {
+		t.Fatalf("unexpected content: %q", string(data))
+	}
+}
+
+// TestWalkZstFiles checks that walkZstFiles reports a nested .jsonl.zst file
+// and ignores a sibling file with a different extension.
+func TestWalkZstFiles(t *testing.T) {
+	dir := t.TempDir()
+
+	// Create a nested .jsonl.zst
+	sub := filepath.Join(dir, "sub")
+	if err := os.MkdirAll(sub, 0755); err != nil {
+		t.Fatal(err)
+	}
+	src := filepath.Join(sub, "data.jsonl")
+	if err := os.WriteFile(src, []byte("line1\nline2\n"), 0644); err != nil {
+		t.Fatal(err)
+	}
+	zstPath := src + ".zst"
+	if err := compressFileZstd(src, zstPath); err != nil {
+		t.Fatal(err)
+	}
+	if err := os.Remove(src); err != nil {
+		t.Fatal(err)
+	}
+
+	// Also create a .json file (should be ignored)
+	if err := os.WriteFile(filepath.Join(sub, "probes.json"), []byte("[]"), 0644); err != nil {
+		t.Fatal(err)
+	}
+
+	var found []string
+	err := walkZstFiles(dir, func(p string) error {
+		found = append(found, p)
+		return nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(found) != 1 {
+		t.Fatalf("expected 1 .zst file, got %d: %v", len(found), found)
+	}
+	if found[0] != zstPath {
+		t.Fatalf("found %q, want %q", found[0], zstPath)
+	}
+}