2026-03-30 20:46:43 +00:00
package store
import (
"compress/gzip"
"io"
"time"
2026-04-04 17:10:25 +00:00
"unicode"
2026-03-30 20:46:43 +00:00
core "dappco.re/go/core"
2026-03-30 20:53:12 +00:00
"github.com/klauspost/compress/zstd"
2026-03-30 20:46:43 +00:00
)
2026-04-04 09:01:23 +00:00
// defaultArchiveOutputDirectory is the directory Normalised substitutes when
// CompactOptions.Output is left empty.
var (
	defaultArchiveOutputDirectory = ".core/archive/"
)
2026-03-30 20:46:43 +00:00
2026-04-04 17:05:51 +00:00
// CompactOptions controls which journal rows Compact archives and how the
// resulting archive file is written.
//
// Usage example: `options := store.CompactOptions{Before: time.Date(2026, 3, 30, 0, 0, 0, 0, time.UTC), Output: "/tmp/archive", Format: "gzip"}`
// Usage example: `result := storeInstance.Compact(store.CompactOptions{Before: time.Now().Add(-90 * 24 * time.Hour)})`
//
// An empty Output falls back to `.core/archive/`, and an empty Format falls
// back to gzip (`.jsonl.gz`). Set Format to `zstd` when downstream tooling
// expects `.jsonl.zst`.
type CompactOptions struct {
	// Before is the exclusive cutoff: only rows committed strictly earlier
	// are archived. Validate rejects the zero time.
	//
	// Usage example: `options := store.CompactOptions{Before: time.Now().Add(-90 * 24 * time.Hour)}`
	Before time.Time

	// Output is the directory that receives the archive file.
	//
	// Usage example: `options := store.CompactOptions{Output: "/tmp/archive"}`
	Output string

	// Format selects the compression, "gzip" or "zstd". It is trimmed and
	// lowercased before it is checked.
	//
	// Usage example: `options := store.CompactOptions{Format: "zstd"}`
	Format string
}
2026-04-04 17:05:51 +00:00
// Usage example: `normalisedOptions := (store.CompactOptions{Before: time.Date(2026, 3, 30, 0, 0, 0, 0, time.UTC)}).Normalised()`
2026-04-04 15:47:56 +00:00
func ( compactOptions CompactOptions ) Normalised ( ) CompactOptions {
if compactOptions . Output == "" {
compactOptions . Output = defaultArchiveOutputDirectory
}
2026-04-04 21:29:27 +00:00
compactOptions . Format = lowercaseText ( core . Trim ( compactOptions . Format ) )
2026-04-04 15:47:56 +00:00
if compactOptions . Format == "" {
compactOptions . Format = "gzip"
}
return compactOptions
}
2026-04-04 17:05:51 +00:00
// Usage example: `if err := (store.CompactOptions{Before: time.Date(2026, 3, 30, 0, 0, 0, 0, time.UTC), Format: "gzip"}).Validate(); err != nil { return }`
2026-04-04 15:47:56 +00:00
func ( compactOptions CompactOptions ) Validate ( ) error {
2026-04-04 18:48:41 +00:00
if compactOptions . Before . IsZero ( ) {
return core . E (
"store.CompactOptions.Validate" ,
"before cutoff time is empty; use a value like time.Now().Add(-24 * time.Hour)" ,
nil ,
)
}
2026-04-04 21:29:27 +00:00
switch lowercaseText ( core . Trim ( compactOptions . Format ) ) {
2026-04-04 15:47:56 +00:00
case "" , "gzip" , "zstd" :
return nil
default :
return core . E (
"store.CompactOptions.Validate" ,
core . Concat ( ` format must be "gzip" or "zstd"; got ` , compactOptions . Format ) ,
nil ,
)
}
}
2026-04-04 21:29:27 +00:00
// lowercaseText returns text with every rune mapped through unicode.ToLower.
// Invalid UTF-8 bytes come out as U+FFFD, because the []rune conversion
// decodes them that way.
func lowercaseText(text string) string {
	runes := []rune(text)
	for index, character := range runes {
		runes[index] = unicode.ToLower(character)
	}
	return string(runes)
}
2026-03-30 20:46:43 +00:00
// compactArchiveEntry is one journal row as scanned by Compact. The fields and
// tags columns are kept as the raw JSON text read from the table; they are
// decoded later by archiveEntryLine.
type compactArchiveEntry struct {
	journalEntryID              int64  // entry_id column
	journalBucketName           string // bucket_name column
	journalMeasurementName      string // measurement column
	journalFieldsJSON           string // fields_json column, JSON text
	journalTagsJSON             string // tags_json column, JSON text
	journalCommittedAtUnixMilli int64  // committed_at column, Unix milliseconds
}
// Compact archives journal rows that were committed before options.Before and
// have not been archived yet. It writes them to one compressed JSON Lines file
// under options.Output, then stamps archived_at on every row it wrote inside a
// single transaction.
//
// Options are normalised (defaults applied) and then validated. On success
// Result.Value is the archive file path, or "" when no rows matched. In that
// case no archive file is created, although the output directory has already
// been ensured. On failure Result.OK is false and Result.Value holds the error.
//
// Usage example: `result := storeInstance.Compact(store.CompactOptions{Before: time.Now().Add(-30 * 24 * time.Hour), Output: "/tmp/archive", Format: "gzip"})`
func ( storeInstance * Store ) Compact ( options CompactOptions ) core . Result {
2026-04-03 06:31:35 +00:00
if err := storeInstance . ensureReady ( "store.Compact" ) ; err != nil {
return core . Result { Value : err , OK : false }
}
2026-04-04 09:04:56 +00:00
if err := ensureJournalSchema ( storeInstance . sqliteDatabase ) ; err != nil {
2026-03-30 20:46:43 +00:00
return core . Result { Value : core . E ( "store.Compact" , "ensure journal schema" , err ) , OK : false }
}
2026-04-04 15:47:56 +00:00
// Apply defaults (output directory, gzip) before validating, so empty
// Output/Format fields are accepted.
options = options . Normalised ( )
if err := options . Validate ( ) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "validate options" , err ) , OK : false }
2026-03-30 20:46:43 +00:00
}
filesystem := ( & core . Fs { } ) . NewUnrestricted ( )
2026-04-04 15:47:56 +00:00
// The directory is ensured even if no rows end up being archived.
if result := filesystem . EnsureDir ( options . Output ) ; ! result . OK {
2026-03-30 20:46:43 +00:00
return core . Result { Value : core . E ( "store.Compact" , "ensure archive directory" , result . Value . ( error ) ) , OK : false }
}
2026-04-04 09:04:56 +00:00
// Select unarchived rows strictly older than the cutoff (compared in Unix
// milliseconds), in commit order so archive lines follow journal order.
rows , err := storeInstance . sqliteDatabase . Query (
2026-04-04 09:54:03 +00:00
"SELECT entry_id, bucket_name, measurement, fields_json, tags_json, committed_at FROM " + journalEntriesTableName + " WHERE archived_at IS NULL AND committed_at < ? ORDER BY committed_at, entry_id" ,
2026-03-30 20:46:43 +00:00
options . Before . UnixMilli ( ) ,
)
if err != nil {
return core . Result { Value : core . E ( "store.Compact" , "query journal rows" , err ) , OK : false }
}
defer rows . Close ( )
// Buffer every matching row first; both the archive write and the UPDATE
// pass below iterate this slice rather than the live result set.
var archiveEntries [ ] compactArchiveEntry
for rows . Next ( ) {
var entry compactArchiveEntry
if err := rows . Scan (
2026-04-04 11:40:29 +00:00
& entry . journalEntryID ,
& entry . journalBucketName ,
& entry . journalMeasurementName ,
& entry . journalFieldsJSON ,
& entry . journalTagsJSON ,
& entry . journalCommittedAtUnixMilli ,
2026-03-30 20:46:43 +00:00
) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "scan journal row" , err ) , OK : false }
}
archiveEntries = append ( archiveEntries , entry )
}
if err := rows . Err ( ) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "iterate journal rows" , err ) , OK : false }
}
// Nothing to archive: report success with an empty path and create no file.
if len ( archiveEntries ) == 0 {
return core . Result { Value : "" , OK : true }
}
2026-04-04 15:47:56 +00:00
outputPath := compactOutputPath ( options . Output , options . Format )
2026-04-03 06:01:00 +00:00
archiveFileResult := filesystem . Create ( outputPath )
if ! archiveFileResult . OK {
return core . Result { Value : core . E ( "store.Compact" , "create archive file" , archiveFileResult . Value . ( error ) ) , OK : false }
2026-03-30 20:46:43 +00:00
}
2026-04-03 06:01:00 +00:00
file , ok := archiveFileResult . Value . ( io . WriteCloser )
2026-03-30 20:46:43 +00:00
if ! ok {
return core . Result { Value : core . E ( "store.Compact" , "archive file is not writable" , nil ) , OK : false }
}
2026-04-04 20:29:21 +00:00
// archiveFileClosed and archiveWriteFinished let the deferred cleanups skip
// a second Close once the explicit, error-checked Close calls below succeed.
// On early return the deferred closes run writer first, then file.
archiveFileClosed := false
2026-03-30 20:46:43 +00:00
defer func ( ) {
2026-04-04 20:29:21 +00:00
if ! archiveFileClosed {
2026-03-30 20:46:43 +00:00
_ = file . Close ( )
}
} ( )
2026-04-04 15:47:56 +00:00
writer , err := archiveWriter ( file , options . Format )
2026-03-30 20:53:12 +00:00
if err != nil {
return core . Result { Value : err , OK : false }
}
2026-04-04 20:29:21 +00:00
archiveWriteFinished := false
2026-03-30 20:46:43 +00:00
defer func ( ) {
2026-04-04 20:29:21 +00:00
if ! archiveWriteFinished {
2026-03-30 20:46:43 +00:00
_ = writer . Close ( )
}
} ( )
// Each entry becomes one JSON object followed by a newline (JSON Lines).
for _ , entry := range archiveEntries {
2026-04-04 10:33:25 +00:00
lineMap , err := archiveEntryLine ( entry )
2026-03-30 20:46:43 +00:00
if err != nil {
return core . Result { Value : err , OK : false }
}
2026-04-04 11:05:48 +00:00
lineJSON , err := marshalJSONText ( lineMap , "store.Compact" , "marshal archive line" )
2026-03-30 20:46:43 +00:00
if err != nil {
return core . Result { Value : err , OK : false }
}
if _ , err := io . WriteString ( writer , lineJSON + "\n" ) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "write archive line" , err ) , OK : false }
}
}
// Close the compressor before the file so its buffered data and trailer
// are written to the file before the file itself is closed.
if err := writer . Close ( ) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "close archive writer" , err ) , OK : false }
}
2026-04-04 20:29:21 +00:00
archiveWriteFinished = true
2026-03-30 20:46:43 +00:00
if err := file . Close ( ) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "close archive file" , err ) , OK : false }
}
2026-04-04 20:29:21 +00:00
archiveFileClosed = true
2026-03-30 20:46:43 +00:00
2026-04-04 09:04:56 +00:00
// The archive file is complete and closed before any row is marked
// archived, so a failure up to this point leaves the journal untouched.
// NOTE(review): if anything below fails, the archive file stays on disk
// while its rows remain unarchived. A later Compact run would then archive
// the same rows again into a new file. Consider removing the file on
// failure.
transaction , err := storeInstance . sqliteDatabase . Begin ( )
2026-03-30 20:46:43 +00:00
if err != nil {
return core . Result { Value : core . E ( "store.Compact" , "begin archive transaction" , err ) , OK : false }
}
committed := false
defer func ( ) {
if ! committed {
_ = transaction . Rollback ( )
}
} ( )
// Every row archived by this call shares one archived_at value (Unix ms).
archivedAt := time . Now ( ) . UnixMilli ( )
for _ , entry := range archiveEntries {
// NOTE(review): this matches on entry_id only and does not re-check
// archived_at IS NULL; confirm concurrent Compact calls cannot overlap.
if _ , err := transaction . Exec (
"UPDATE " + journalEntriesTableName + " SET archived_at = ? WHERE entry_id = ?" ,
archivedAt ,
2026-04-04 11:40:29 +00:00
entry . journalEntryID ,
2026-03-30 20:46:43 +00:00
) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "mark journal row archived" , err ) , OK : false }
}
}
if err := transaction . Commit ( ) ; err != nil {
return core . Result { Value : core . E ( "store.Compact" , "commit archive transaction" , err ) , OK : false }
}
committed = true
return core . Result { Value : outputPath , OK : true }
}
2026-04-04 10:33:25 +00:00
func archiveEntryLine ( entry compactArchiveEntry ) ( map [ string ] any , error ) {
2026-03-30 20:46:43 +00:00
fields := make ( map [ string ] any )
2026-04-04 11:40:29 +00:00
fieldsResult := core . JSONUnmarshalString ( entry . journalFieldsJSON , & fields )
2026-03-30 20:46:43 +00:00
if ! fieldsResult . OK {
return nil , core . E ( "store.Compact" , "unmarshal fields" , fieldsResult . Value . ( error ) )
}
tags := make ( map [ string ] string )
2026-04-04 11:40:29 +00:00
tagsResult := core . JSONUnmarshalString ( entry . journalTagsJSON , & tags )
2026-03-30 20:46:43 +00:00
if ! tagsResult . OK {
return nil , core . E ( "store.Compact" , "unmarshal tags" , tagsResult . Value . ( error ) )
}
return map [ string ] any {
2026-04-04 11:40:29 +00:00
"bucket" : entry . journalBucketName ,
"measurement" : entry . journalMeasurementName ,
2026-03-30 20:46:43 +00:00
"fields" : fields ,
"tags" : tags ,
2026-04-04 11:40:29 +00:00
"committed_at" : entry . journalCommittedAtUnixMilli ,
2026-03-30 20:46:43 +00:00
} , nil
}
2026-03-30 20:53:12 +00:00
func archiveWriter ( writer io . Writer , format string ) ( io . WriteCloser , error ) {
switch format {
case "gzip" :
return gzip . NewWriter ( writer ) , nil
case "zstd" :
zstdWriter , err := zstd . NewWriter ( writer )
if err != nil {
return nil , core . E ( "store.Compact" , "create zstd writer" , err )
}
return zstdWriter , nil
default :
return nil , core . E ( "store.Compact" , core . Concat ( "unsupported archive format: " , format ) , nil )
}
}
2026-03-30 20:46:43 +00:00
func compactOutputPath ( outputDirectory , format string ) string {
extension := ".jsonl"
if format == "gzip" {
extension = ".jsonl.gz"
}
2026-03-30 20:53:12 +00:00
if format == "zstd" {
extension = ".jsonl.zst"
}
2026-04-04 13:10:33 +00:00
// Include nanoseconds so two compactions in the same second never collide.
filename := core . Concat ( "journal-" , time . Now ( ) . UTC ( ) . Format ( "20060102-150405.000000000" ) , extension )
2026-03-30 20:46:43 +00:00
return joinPath ( outputDirectory , filename )
}