2026-03-30 20:46:43 +00:00
package store
import (
"compress/gzip"
"io"
"time"
core "dappco.re/go/core"
2026-03-30 20:53:12 +00:00
"github.com/klauspost/compress/zstd"
2026-03-30 20:46:43 +00:00
)
2026-04-04 09:01:23 +00:00
var defaultArchiveOutputDirectory = ".core/archive/"
2026-03-30 20:46:43 +00:00
2026-04-04 12:05:00 +00:00
// CompactOptions archives completed journal rows before a cutoff time to a
// compressed JSONL file.
2026-04-04 11:30:13 +00:00
//
2026-03-30 20:46:43 +00:00
// Usage example: `options := store.CompactOptions{Before: time.Now().Add(-90 * 24 * time.Hour), Output: "/tmp/archive", Format: "gzip"}`
2026-04-04 13:25:19 +00:00
// The default output directory is `.core/archive/`; the default format is
// `gzip`, and `zstd` is also supported.
2026-03-30 20:46:43 +00:00
type CompactOptions struct {
2026-04-04 09:48:02 +00:00
// Usage example: `options := store.CompactOptions{Before: time.Now().Add(-90 * 24 * time.Hour)}`
2026-03-30 20:46:43 +00:00
Before time . Time
2026-04-04 09:48:02 +00:00
// Usage example: `options := store.CompactOptions{Output: "/tmp/archive"}`
2026-03-30 20:46:43 +00:00
Output string
2026-04-04 09:48:02 +00:00
// Usage example: `options := store.CompactOptions{Format: "zstd"}`
2026-03-30 20:46:43 +00:00
Format string
}
type compactArchiveEntry struct {
2026-04-04 11:40:29 +00:00
journalEntryID int64
journalBucketName string
journalMeasurementName string
journalFieldsJSON string
journalTagsJSON string
journalCommittedAtUnixMilli int64
2026-03-30 20:46:43 +00:00
}
// Usage example: `result := storeInstance.Compact(store.CompactOptions{Before: time.Now().Add(-30 * 24 * time.Hour), Output: "/tmp/archive", Format: "gzip"})`
func ( storeInstance * Store ) Compact ( options CompactOptions ) core . Result {
2026-04-03 06:31:35 +00:00
if err := storeInstance . ensureReady ( "store.Compact" ) ; err != nil {
return core . Result { Value : err , OK : false }
}
2026-04-04 09:04:56 +00:00
if err := ensureJournalSchema ( storeInstance . sqliteDatabase ) ; err != nil {
2026-03-30 20:46:43 +00:00
return core . Result { Value : core . E ( "store.Compact" , "ensure journal schema" , err ) , OK : false }
}
outputDirectory := options . Output
if outputDirectory == "" {
outputDirectory = defaultArchiveOutputDirectory
}
format := options . Format
if format == "" {
format = "gzip"
}
2026-03-30 20:53:12 +00:00
if format != "gzip" && format != "zstd" {
2026-03-30 20:46:43 +00:00
return core . Result { Value : core . E ( "store.Compact" , core . Concat ( "unsupported archive format: " , format ) , nil ) , OK : false }
}
filesystem := ( & core . Fs { } ) . NewUnrestricted ( )
if result := filesystem . EnsureDir ( outputDirectory ) ; ! result . OK {
return core . Result { Value : core . E ( "store.Compact" , "ensure archive directory" , result . Value . ( error ) ) , OK : false }
}
2026-04-04 09:04:56 +00:00
rows , err := storeInstance . sqliteDatabase . Query (
2026-04-04 09:54:03 +00:00
"SELECT entry_id, bucket_name, measurement, fields_json, tags_json, committed_at FROM " + journalEntriesTableName + " WHERE archived_at IS NULL AND committed_at < ? ORDER BY committed_at, entry_id" ,
2026-03-30 20:46:43 +00:00
options . Before . UnixMilli ( ) ,
)
if err != nil {
return core . Result { Value : core . E ( "store.Compact" , "query journal rows" , err ) , OK : false }
}
defer rows . Close ( )
var archiveEntries [ ] compactArchiveEntry
for rows . Next ( ) {
var entry compactArchiveEntry
if err := rows . Scan (
2026-04-04 11:40:29 +00:00
& entry . journalEntryID ,
& entry . journalBucketName ,
& entry . journalMeasurementName ,
& entry . journalFieldsJSON ,
& entry . journalTagsJSON ,
& entry . journalCommittedAtUnixMilli ,
2026-03-30 20:46:43 +00:00
) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "scan journal row" , err ) , OK : false }
}
archiveEntries = append ( archiveEntries , entry )
}
if err := rows . Err ( ) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "iterate journal rows" , err ) , OK : false }
}
if len ( archiveEntries ) == 0 {
return core . Result { Value : "" , OK : true }
}
outputPath := compactOutputPath ( outputDirectory , format )
2026-04-03 06:01:00 +00:00
archiveFileResult := filesystem . Create ( outputPath )
if ! archiveFileResult . OK {
return core . Result { Value : core . E ( "store.Compact" , "create archive file" , archiveFileResult . Value . ( error ) ) , OK : false }
2026-03-30 20:46:43 +00:00
}
2026-04-03 06:01:00 +00:00
file , ok := archiveFileResult . Value . ( io . WriteCloser )
2026-03-30 20:46:43 +00:00
if ! ok {
return core . Result { Value : core . E ( "store.Compact" , "archive file is not writable" , nil ) , OK : false }
}
fileClosed := false
defer func ( ) {
if ! fileClosed {
_ = file . Close ( )
}
} ( )
2026-03-30 20:53:12 +00:00
writer , err := archiveWriter ( file , format )
if err != nil {
return core . Result { Value : err , OK : false }
}
2026-03-30 20:46:43 +00:00
writeOK := false
defer func ( ) {
if ! writeOK {
_ = writer . Close ( )
}
} ( )
for _ , entry := range archiveEntries {
2026-04-04 10:33:25 +00:00
lineMap , err := archiveEntryLine ( entry )
2026-03-30 20:46:43 +00:00
if err != nil {
return core . Result { Value : err , OK : false }
}
2026-04-04 11:05:48 +00:00
lineJSON , err := marshalJSONText ( lineMap , "store.Compact" , "marshal archive line" )
2026-03-30 20:46:43 +00:00
if err != nil {
return core . Result { Value : err , OK : false }
}
if _ , err := io . WriteString ( writer , lineJSON + "\n" ) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "write archive line" , err ) , OK : false }
}
}
if err := writer . Close ( ) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "close archive writer" , err ) , OK : false }
}
writeOK = true
if err := file . Close ( ) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "close archive file" , err ) , OK : false }
}
fileClosed = true
2026-04-04 09:04:56 +00:00
transaction , err := storeInstance . sqliteDatabase . Begin ( )
2026-03-30 20:46:43 +00:00
if err != nil {
return core . Result { Value : core . E ( "store.Compact" , "begin archive transaction" , err ) , OK : false }
}
committed := false
defer func ( ) {
if ! committed {
_ = transaction . Rollback ( )
}
} ( )
archivedAt := time . Now ( ) . UnixMilli ( )
for _ , entry := range archiveEntries {
if _ , err := transaction . Exec (
"UPDATE " + journalEntriesTableName + " SET archived_at = ? WHERE entry_id = ?" ,
archivedAt ,
2026-04-04 11:40:29 +00:00
entry . journalEntryID ,
2026-03-30 20:46:43 +00:00
) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "mark journal row archived" , err ) , OK : false }
}
}
if err := transaction . Commit ( ) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "commit archive transaction" , err ) , OK : false }
}
committed = true
return core . Result { Value : outputPath , OK : true }
}
2026-04-04 10:33:25 +00:00
func archiveEntryLine ( entry compactArchiveEntry ) ( map [ string ] any , error ) {
2026-03-30 20:46:43 +00:00
fields := make ( map [ string ] any )
2026-04-04 11:40:29 +00:00
fieldsResult := core . JSONUnmarshalString ( entry . journalFieldsJSON , & fields )
2026-03-30 20:46:43 +00:00
if ! fieldsResult . OK {
return nil , core . E ( "store.Compact" , "unmarshal fields" , fieldsResult . Value . ( error ) )
}
tags := make ( map [ string ] string )
2026-04-04 11:40:29 +00:00
tagsResult := core . JSONUnmarshalString ( entry . journalTagsJSON , & tags )
2026-03-30 20:46:43 +00:00
if ! tagsResult . OK {
return nil , core . E ( "store.Compact" , "unmarshal tags" , tagsResult . Value . ( error ) )
}
return map [ string ] any {
2026-04-04 11:40:29 +00:00
"bucket" : entry . journalBucketName ,
"measurement" : entry . journalMeasurementName ,
2026-03-30 20:46:43 +00:00
"fields" : fields ,
"tags" : tags ,
2026-04-04 11:40:29 +00:00
"committed_at" : entry . journalCommittedAtUnixMilli ,
2026-03-30 20:46:43 +00:00
} , nil
}
2026-03-30 20:53:12 +00:00
func archiveWriter ( writer io . Writer , format string ) ( io . WriteCloser , error ) {
switch format {
case "gzip" :
return gzip . NewWriter ( writer ) , nil
case "zstd" :
zstdWriter , err := zstd . NewWriter ( writer )
if err != nil {
return nil , core . E ( "store.Compact" , "create zstd writer" , err )
}
return zstdWriter , nil
default :
return nil , core . E ( "store.Compact" , core . Concat ( "unsupported archive format: " , format ) , nil )
}
}
2026-03-30 20:46:43 +00:00
func compactOutputPath ( outputDirectory , format string ) string {
extension := ".jsonl"
if format == "gzip" {
extension = ".jsonl.gz"
}
2026-03-30 20:53:12 +00:00
if format == "zstd" {
extension = ".jsonl.zst"
}
2026-04-04 13:10:33 +00:00
// Include nanoseconds so two compactions in the same second never collide.
filename := core . Concat ( "journal-" , time . Now ( ) . UTC ( ) . Format ( "20060102-150405.000000000" ) , extension )
2026-03-30 20:46:43 +00:00
return joinPath ( outputDirectory , filename )
}