diff --git a/compact.go b/compact.go index 9af6b8c..a837831 100644 --- a/compact.go +++ b/compact.go @@ -6,6 +6,7 @@ import ( "time" core "dappco.re/go/core" + "github.com/klauspost/compress/zstd" ) var defaultArchiveOutputDirectory = ".core/archive" @@ -42,7 +43,7 @@ func (storeInstance *Store) Compact(options CompactOptions) core.Result { if format == "" { format = "gzip" } - if format != "gzip" { + if format != "gzip" && format != "zstd" { return core.Result{Value: core.E("store.Compact", core.Concat("unsupported archive format: ", format), nil), OK: false} } @@ -99,7 +100,10 @@ func (storeInstance *Store) Compact(options CompactOptions) core.Result { } }() - writer := gzip.NewWriter(file) + writer, err := archiveWriter(file, format) + if err != nil { + return core.Result{Value: err, OK: false} + } writeOK := false defer func() { if !writeOK { @@ -181,11 +185,29 @@ func compactArchiveLine(entry compactArchiveEntry) (map[string]any, error) { }, nil } +func archiveWriter(writer io.Writer, format string) (io.WriteCloser, error) { + switch format { + case "gzip": + return gzip.NewWriter(writer), nil + case "zstd": + zstdWriter, err := zstd.NewWriter(writer) + if err != nil { + return nil, core.E("store.Compact", "create zstd writer", err) + } + return zstdWriter, nil + default: + return nil, core.E("store.Compact", core.Concat("unsupported archive format: ", format), nil) + } +} + func compactOutputPath(outputDirectory, format string) string { extension := ".jsonl" if format == "gzip" { extension = ".jsonl.gz" } + if format == "zstd" { + extension = ".jsonl.zst" + } filename := core.Concat("journal-", time.Now().UTC().Format("20060102-150405"), extension) return joinPath(outputDirectory, filename) } diff --git a/compact_test.go b/compact_test.go index 0cb1889..978998c 100644 --- a/compact_test.go +++ b/compact_test.go @@ -8,6 +8,7 @@ import ( "time" core "dappco.re/go/core" + "github.com/klauspost/compress/zstd" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -64,6 +65,52 @@ func TestCompact_Compact_Good_GzipArchive(t *testing.T) { assert.Equal(t, "session-b", remainingRows[0]["measurement"]) } +func TestCompact_Compact_Good_ZstdArchive(t *testing.T) { + outputDirectory := useArchiveOutputDirectory(t) + + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events")) + require.NoError(t, err) + defer storeInstance.Close() + + require.True(t, + storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK, + ) + + _, err = storeInstance.database.Exec( + "UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?", + time.Now().Add(-48*time.Hour).UnixMilli(), + "session-a", + ) + require.NoError(t, err) + + result := storeInstance.Compact(CompactOptions{ + Before: time.Now().Add(-24 * time.Hour), + Output: outputDirectory, + Format: "zstd", + }) + require.True(t, result.OK, "compact failed: %v", result.Value) + + archivePath, ok := result.Value.(string) + require.True(t, ok, "unexpected archive path type: %T", result.Value) + assert.True(t, testFilesystem().Exists(archivePath)) + assert.Contains(t, archivePath, ".jsonl.zst") + + archiveData := requireCoreReadBytes(t, archivePath) + reader, err := zstd.NewReader(bytes.NewReader(archiveData)) + require.NoError(t, err) + defer reader.Close() + + decompressedData, err := io.ReadAll(reader) + require.NoError(t, err) + lines := core.Split(core.Trim(string(decompressedData)), "\n") + require.Len(t, lines, 1) + + archivedRow := make(map[string]any) + unmarshalResult := core.JSONUnmarshalString(lines[0], &archivedRow) + require.True(t, unmarshalResult.OK, "archive line unmarshal failed: %v", unmarshalResult.Value) + assert.Equal(t, "session-a", archivedRow["measurement"]) +} + func TestCompact_Compact_Good_NoRows(t *testing.T) { outputDirectory := useArchiveOutputDirectory(t) diff --git a/go.mod b/go.mod index c9e5c0c..befc5c4 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.26.0 require ( dappco.re/go/core v0.8.0-alpha.1 + github.com/klauspost/compress v1.18.5 github.com/stretchr/testify v1.11.1 modernc.org/sqlite v1.47.0 ) diff --git a/go.sum b/go.sum index a347d7d..731c6e5 100644 --- a/go.sum +++ b/go.sum @@ -5,13 +5,14 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= +github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -26,20 +27,15 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= -github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI= golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY= -golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c/go.mod h1:TpUTTEp9frx7rTdLpC9gFG9kdI7zVLFTFFlqaH2Cncw= golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s= golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=