1
0
Fork 0
forked from lthn/LEM

feat: add zstd compress/decompress helpers for cold storage

Add compressFileZstd, decompressZstd, and walkZstFiles helpers
using klauspost/compress. Promote zstd from indirect to direct dep.

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-02-28 12:18:19 +00:00
parent d998dd252b
commit 1c7194176a
2 changed files with 135 additions and 0 deletions

77
pkg/lem/zstd.go Normal file
View file

@ -0,0 +1,77 @@
// zstd.go — zstd compression/decompression for cold storage JSONL files.
package lem
import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/klauspost/compress/zstd"
)
// decompressZstd reads a .zst file and writes the decompressed content to dst.
func decompressZstd(src, dst string) error {
f, err := os.Open(src)
if err != nil {
return fmt.Errorf("open %s: %w", src, err)
}
defer f.Close()
dec, err := zstd.NewReader(f)
if err != nil {
return fmt.Errorf("zstd reader: %w", err)
}
defer dec.Close()
out, err := os.Create(dst)
if err != nil {
return fmt.Errorf("create %s: %w", dst, err)
}
defer out.Close()
if _, err := io.Copy(out, dec); err != nil {
return fmt.Errorf("decompress %s: %w", src, err)
}
return nil
}
// compressFileZstd compresses src to dst with zstd level 3.
func compressFileZstd(src, dst string) error {
f, err := os.Open(src)
if err != nil {
return fmt.Errorf("open %s: %w", src, err)
}
defer f.Close()
out, err := os.Create(dst)
if err != nil {
return fmt.Errorf("create %s: %w", dst, err)
}
defer out.Close()
enc, err := zstd.NewWriter(out, zstd.WithEncoderLevel(zstd.SpeedDefault))
if err != nil {
return fmt.Errorf("zstd writer: %w", err)
}
if _, err := io.Copy(enc, f); err != nil {
enc.Close()
return fmt.Errorf("compress %s: %w", src, err)
}
return enc.Close()
}
// walkZstFiles walks a directory tree and calls fn for each .jsonl.zst file.
func walkZstFiles(root string, fn func(zstPath string) error) error {
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil || info.IsDir() {
return err
}
if strings.HasSuffix(path, ".jsonl.zst") {
return fn(path)
}
return nil
})
}

58
pkg/lem/zstd_test.go Normal file
View file

@ -0,0 +1,58 @@
package lem
import (
"os"
"path/filepath"
"testing"
)
func TestDecompressZstd(t *testing.T) {
// Create a temp .jsonl file and compress it
dir := t.TempDir()
src := filepath.Join(dir, "test.jsonl")
os.WriteFile(src, []byte(`{"messages":[{"role":"user","content":"hello"}]}`+"\n"), 0644)
// Compress with Go zstd
zstPath := src + ".zst"
if err := compressFileZstd(src, zstPath); err != nil {
t.Fatal(err)
}
os.Remove(src) // remove original
// Decompress
outPath := filepath.Join(dir, "out.jsonl")
if err := decompressZstd(zstPath, outPath); err != nil {
t.Fatal(err)
}
data, err := os.ReadFile(outPath)
if err != nil {
t.Fatal(err)
}
if string(data) != `{"messages":[{"role":"user","content":"hello"}]}`+"\n" {
t.Fatalf("unexpected content: %q", string(data))
}
}
func TestWalkZstFiles(t *testing.T) {
dir := t.TempDir()
// Create a nested .jsonl.zst
sub := filepath.Join(dir, "sub")
os.MkdirAll(sub, 0755)
src := filepath.Join(sub, "data.jsonl")
os.WriteFile(src, []byte("line1\nline2\n"), 0644)
compressFileZstd(src, src+".zst")
os.Remove(src)
// Also create a .json file (should be ignored)
os.WriteFile(filepath.Join(sub, "probes.json"), []byte("[]"), 0644)
var found []string
walkZstFiles(dir, func(zstPath string) error {
found = append(found, zstPath)
return nil
})
if len(found) != 1 {
t.Fatalf("expected 1 .zst file, got %d: %v", len(found), found)
}
}