feat(store): add zstd archive support
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
d9fad2d6be
commit
d983760445
4 changed files with 74 additions and 8 deletions
26
compact.go
26
compact.go
|
|
@ -6,6 +6,7 @@ import (
|
|||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/klauspost/compress/zstd"
|
||||
)
|
||||
|
||||
var defaultArchiveOutputDirectory = ".core/archive"
|
||||
|
|
@ -42,7 +43,7 @@ func (storeInstance *Store) Compact(options CompactOptions) core.Result {
|
|||
if format == "" {
|
||||
format = "gzip"
|
||||
}
|
||||
if format != "gzip" {
|
||||
if format != "gzip" && format != "zstd" {
|
||||
return core.Result{Value: core.E("store.Compact", core.Concat("unsupported archive format: ", format), nil), OK: false}
|
||||
}
|
||||
|
||||
|
|
@ -99,7 +100,10 @@ func (storeInstance *Store) Compact(options CompactOptions) core.Result {
|
|||
}
|
||||
}()
|
||||
|
||||
writer := gzip.NewWriter(file)
|
||||
writer, err := archiveWriter(file, format)
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
writeOK := false
|
||||
defer func() {
|
||||
if !writeOK {
|
||||
|
|
@ -181,11 +185,29 @@ func compactArchiveLine(entry compactArchiveEntry) (map[string]any, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func archiveWriter(writer io.Writer, format string) (io.WriteCloser, error) {
|
||||
switch format {
|
||||
case "gzip":
|
||||
return gzip.NewWriter(writer), nil
|
||||
case "zstd":
|
||||
zstdWriter, err := zstd.NewWriter(writer)
|
||||
if err != nil {
|
||||
return nil, core.E("store.Compact", "create zstd writer", err)
|
||||
}
|
||||
return zstdWriter, nil
|
||||
default:
|
||||
return nil, core.E("store.Compact", core.Concat("unsupported archive format: ", format), nil)
|
||||
}
|
||||
}
|
||||
|
||||
func compactOutputPath(outputDirectory, format string) string {
|
||||
extension := ".jsonl"
|
||||
if format == "gzip" {
|
||||
extension = ".jsonl.gz"
|
||||
}
|
||||
if format == "zstd" {
|
||||
extension = ".jsonl.zst"
|
||||
}
|
||||
filename := core.Concat("journal-", time.Now().UTC().Format("20060102-150405"), extension)
|
||||
return joinPath(outputDirectory, filename)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/klauspost/compress/zstd"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
|
@ -64,6 +65,52 @@ func TestCompact_Compact_Good_GzipArchive(t *testing.T) {
|
|||
assert.Equal(t, "session-b", remainingRows[0]["measurement"])
|
||||
}
|
||||
|
||||
func TestCompact_Compact_Good_ZstdArchive(t *testing.T) {
|
||||
outputDirectory := useArchiveOutputDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.True(t,
|
||||
storeInstance.CommitToJournal("session-a", map[string]any{"like": 1}, map[string]string{"workspace": "session-a"}).OK,
|
||||
)
|
||||
|
||||
_, err = storeInstance.database.Exec(
|
||||
"UPDATE "+journalEntriesTableName+" SET committed_at = ? WHERE measurement = ?",
|
||||
time.Now().Add(-48*time.Hour).UnixMilli(),
|
||||
"session-a",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
result := storeInstance.Compact(CompactOptions{
|
||||
Before: time.Now().Add(-24 * time.Hour),
|
||||
Output: outputDirectory,
|
||||
Format: "zstd",
|
||||
})
|
||||
require.True(t, result.OK, "compact failed: %v", result.Value)
|
||||
|
||||
archivePath, ok := result.Value.(string)
|
||||
require.True(t, ok, "unexpected archive path type: %T", result.Value)
|
||||
assert.True(t, testFilesystem().Exists(archivePath))
|
||||
assert.Contains(t, archivePath, ".jsonl.zst")
|
||||
|
||||
archiveData := requireCoreReadBytes(t, archivePath)
|
||||
reader, err := zstd.NewReader(bytes.NewReader(archiveData))
|
||||
require.NoError(t, err)
|
||||
defer reader.Close()
|
||||
|
||||
decompressedData, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
lines := core.Split(core.Trim(string(decompressedData)), "\n")
|
||||
require.Len(t, lines, 1)
|
||||
|
||||
archivedRow := make(map[string]any)
|
||||
unmarshalResult := core.JSONUnmarshalString(lines[0], &archivedRow)
|
||||
require.True(t, unmarshalResult.OK, "archive line unmarshal failed: %v", unmarshalResult.Value)
|
||||
assert.Equal(t, "session-a", archivedRow["measurement"])
|
||||
}
|
||||
|
||||
func TestCompact_Compact_Good_NoRows(t *testing.T) {
|
||||
outputDirectory := useArchiveOutputDirectory(t)
|
||||
|
||||
|
|
|
|||
1
go.mod
1
go.mod
|
|
@ -4,6 +4,7 @@ go 1.26.0
|
|||
|
||||
require (
|
||||
dappco.re/go/core v0.8.0-alpha.1
|
||||
github.com/klauspost/compress v1.18.5
|
||||
github.com/stretchr/testify v1.11.1
|
||||
modernc.org/sqlite v1.47.0
|
||||
)
|
||||
|
|
|
|||
8
go.sum
8
go.sum
|
|
@ -5,13 +5,14 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
|
|||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE=
|
||||
github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
|
|
@ -26,20 +27,15 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94
|
|||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70=
|
||||
golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI=
|
||||
golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY=
|
||||
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
|
||||
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
|
||||
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
|
||||
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c/go.mod h1:TpUTTEp9frx7rTdLpC9gFG9kdI7zVLFTFFlqaH2Cncw=
|
||||
golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s=
|
||||
golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue