diff --git a/compact.go b/compact.go index 17e09e5..0ed7a76 100644 --- a/compact.go +++ b/compact.go @@ -23,6 +23,12 @@ type CompactOptions struct { Output string // Usage example: `options := store.CompactOptions{Format: "zstd"}` Format string + // Usage example: `medium, _ := s3.New(s3.Options{Bucket: "archive"}); options := store.CompactOptions{Before: time.Now().Add(-90 * 24 * time.Hour), Medium: medium}` + // Medium routes the archive write through an io.Medium instead of the raw + // filesystem. When set, Output is the path inside the medium; leave empty + // to use `.core/archive/`. When nil, Compact falls back to the store-level + // medium (if configured via WithMedium), then to the local filesystem. + Medium Medium } // Usage example: `normalisedOptions := (store.CompactOptions{Before: time.Date(2026, 3, 30, 0, 0, 0, 0, time.UTC)}).Normalised()` @@ -89,17 +95,26 @@ func (storeInstance *Store) Compact(options CompactOptions) core.Result { return core.Result{Value: core.E("store.Compact", "validate options", err), OK: false} } - filesystem := (&core.Fs{}).NewUnrestricted() - if result := filesystem.EnsureDir(options.Output); !result.OK { - return core.Result{Value: core.E("store.Compact", "ensure archive directory", result.Value.(error)), OK: false} + medium := options.Medium + if medium == nil { + medium = storeInstance.medium } - rows, err := storeInstance.sqliteDatabase.Query( + filesystem := (&core.Fs{}).NewUnrestricted() + if medium == nil { + if result := filesystem.EnsureDir(options.Output); !result.OK { + return core.Result{Value: core.E("store.Compact", "ensure archive directory", result.Value.(error)), OK: false} + } + } else if err := ensureMediumDir(medium, options.Output); err != nil { + return core.Result{Value: core.E("store.Compact", "ensure medium archive directory", err), OK: false} + } + + rows, queryErr := storeInstance.sqliteDatabase.Query( "SELECT entry_id, bucket_name, measurement, fields_json, tags_json, committed_at FROM "+journalEntriesTableName+" WHERE archived_at IS NULL AND committed_at < ? ORDER BY committed_at, entry_id", options.Before.UnixMilli(), ) - if err != nil { - return core.Result{Value: core.E("store.Compact", "query journal rows", err), OK: false} + if queryErr != nil { + return core.Result{Value: core.E("store.Compact", "query journal rows", queryErr), OK: false} } defer rows.Close() @@ -126,14 +141,25 @@ func (storeInstance *Store) Compact(options CompactOptions) core.Result { } outputPath := compactOutputPath(options.Output, options.Format) - archiveFileResult := filesystem.Create(outputPath) - if !archiveFileResult.OK { - return core.Result{Value: core.E("store.Compact", "create archive file", archiveFileResult.Value.(error)), OK: false} - } - - file, ok := archiveFileResult.Value.(io.WriteCloser) - if !ok { - return core.Result{Value: core.E("store.Compact", "archive file is not writable", nil), OK: false} + var ( + file io.WriteCloser + createErr error + ) + if medium != nil { + file, createErr = medium.Create(outputPath) + if createErr != nil { + return core.Result{Value: core.E("store.Compact", "create archive via medium", createErr), OK: false} + } + } else { + archiveFileResult := filesystem.Create(outputPath) + if !archiveFileResult.OK { + return core.Result{Value: core.E("store.Compact", "create archive file", archiveFileResult.Value.(error)), OK: false} + } + existingFile, ok := archiveFileResult.Value.(io.WriteCloser) + if !ok { + return core.Result{Value: core.E("store.Compact", "archive file is not writable", nil), OK: false} + } + file = existingFile } archiveFileClosed := false defer func() { diff --git a/go.mod b/go.mod index a30e590..0172eb4 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.26.0 require ( dappco.re/go/core v0.8.0-alpha.1 + dappco.re/go/core/io v0.4.2 github.com/klauspost/compress v1.18.5 github.com/stretchr/testify v1.11.1 modernc.org/sqlite v1.47.0 diff --git a/go.sum b/go.sum index 9a1042d..da46534 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ dappco.re/go/core v0.8.0-alpha.1 h1:gj7+Scv+L63Z7wMxbJYHhaRFkHJo2u4MMPuUSv/Dhtk= dappco.re/go/core v0.8.0-alpha.1/go.mod h1:f2/tBZ3+3IqDrg2F5F598llv0nmb/4gJVCFzM5geE4A= +dappco.re/go/core/io v0.4.2 h1:SHNF/xMPyFnKWWYoFW5Y56eiuGVL/mFa1lfIw/530ls= +dappco.re/go/core/io v0.4.2/go.mod h1:w71dukyunczLb8frT9JOd5B78PjwWQD3YAXiCt3AcPA= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= diff --git a/json.go b/json.go new file mode 100644 index 0000000..f536685 --- /dev/null +++ b/json.go @@ -0,0 +1,142 @@ +// SPDX-License-Identifier: EUPL-1.2 + +// JSON helpers for storage consumers. +// Re-exports the minimum JSON surface needed by downstream users like +// go-cache and go-tenant so they don't need to import encoding/json directly. +// Internally uses core/go JSON primitives. +package store + +import ( + "bytes" + + core "dappco.re/go/core" +) + +// RawMessage is a raw encoded JSON value. +// Use in structs where the JSON should be stored as-is without re-encoding. +// +// Usage example: +// +// type CacheEntry struct { +// Data store.RawMessage `json:"data"` +// } +type RawMessage []byte + +// MarshalJSON returns the raw bytes as-is. If empty, returns `null`. +// +// Usage example: `bytes, err := raw.MarshalJSON()` +func (raw RawMessage) MarshalJSON() ([]byte, error) { + if len(raw) == 0 { + return []byte("null"), nil + } + return raw, nil +} + +// UnmarshalJSON stores the raw JSON bytes without decoding them. +// +// Usage example: `var raw store.RawMessage; err := raw.UnmarshalJSON(data)` +func (raw *RawMessage) UnmarshalJSON(data []byte) error { + if raw == nil { + return core.E("store.RawMessage.UnmarshalJSON", "nil receiver", nil) + } + *raw = append((*raw)[:0], data...) + return nil +} + +// MarshalIndent serialises a value to pretty-printed JSON bytes. +// Uses core.JSONMarshal internally then applies prefix/indent formatting +// so consumers get readable output without importing encoding/json. +// +// Usage example: `data, err := store.MarshalIndent(entry, "", " ")` +func MarshalIndent(v any, prefix, indent string) ([]byte, error) { + marshalled := core.JSONMarshal(v) + if !marshalled.OK { + if err, ok := marshalled.Value.(error); ok { + return nil, core.E("store.MarshalIndent", "marshal", err) + } + return nil, core.E("store.MarshalIndent", "marshal", nil) + } + raw, ok := marshalled.Value.([]byte) + if !ok { + return nil, core.E("store.MarshalIndent", "non-bytes result", nil) + } + if prefix == "" && indent == "" { + return raw, nil + } + + var buf bytes.Buffer + if err := indentCompactJSON(&buf, raw, prefix, indent); err != nil { + return nil, core.E("store.MarshalIndent", "indent", err) + } + return buf.Bytes(), nil +} + +// indentCompactJSON formats compact JSON bytes with prefix+indent. +// Mirrors json.Indent's semantics without importing encoding/json. +// +// Usage example: `var buf bytes.Buffer; _ = indentCompactJSON(&buf, compact, "", " ")` +func indentCompactJSON(buf *bytes.Buffer, src []byte, prefix, indent string) error { + depth := 0 + inString := false + escaped := false + + writeNewlineIndent := func(level int) { + buf.WriteByte('\n') + buf.WriteString(prefix) + for i := 0; i < level; i++ { + buf.WriteString(indent) + } + } + + for i := 0; i < len(src); i++ { + c := src[i] + if inString { + buf.WriteByte(c) + if escaped { + escaped = false + continue + } + if c == '\\' { + escaped = true + continue + } + if c == '"' { + inString = false + } + continue + } + switch c { + case '"': + inString = true + buf.WriteByte(c) + case '{', '[': + buf.WriteByte(c) + depth++ + // Look ahead for empty object/array. + if i+1 < len(src) && (src[i+1] == '}' || src[i+1] == ']') { + continue + } + writeNewlineIndent(depth) + case '}', ']': + // Only indent if previous byte wasn't the matching opener. + if i > 0 && src[i-1] != '{' && src[i-1] != '[' { + depth-- + writeNewlineIndent(depth) + } else { + depth-- + } + buf.WriteByte(c) + case ',': + buf.WriteByte(c) + writeNewlineIndent(depth) + case ':': + buf.WriteByte(c) + buf.WriteByte(' ') + case ' ', '\t', '\n', '\r': + // Drop whitespace from compact source. + default: + buf.WriteByte(c) + } + } + return nil +} diff --git a/medium.go b/medium.go new file mode 100644 index 0000000..df69e66 --- /dev/null +++ b/medium.go @@ -0,0 +1,362 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package store + +import ( + "bytes" + goio "io" + + core "dappco.re/go/core" + "dappco.re/go/core/io" +) + +// Medium is the minimal storage transport used by the go-store workspace +// import and export helpers and by Compact when writing cold archives. +// +// Any `dappco.re/go/core/io.Medium` implementation (local, memory, S3, cube, +// sftp) satisfies this interface by structural typing — go-store only needs a +// handful of methods to ferry bytes between the workspace buffer and the +// underlying medium. +// +// Usage example: `medium, _ := local.New("/tmp/exports"); storeInstance, err := store.New(":memory:", store.WithMedium(medium))` +type Medium interface { + Read(path string) (string, error) + Write(path, content string) error + EnsureDir(path string) error + Create(path string) (goio.WriteCloser, error) + Exists(path string) bool +} + +// staticMediumCheck documents that `dappco.re/go/core/io.Medium` satisfies the +// in-package `store.Medium` interface — agents pass an `io.Medium` directly to +// `store.WithMedium` without an adapter. +var _ Medium = io.Medium(nil) + +// Usage example: `medium, _ := local.New("/srv/core"); storeInstance, err := store.NewConfigured(store.StoreConfig{DatabasePath: ":memory:", Medium: medium})` +// WithMedium installs an io.Medium-compatible transport on the Store so that +// Compact archives and Import/Export helpers route through the medium instead +// of the raw filesystem. +func WithMedium(medium Medium) StoreOption { + return func(storeConfig *StoreConfig) { + if storeConfig == nil { + return + } + storeConfig.Medium = medium + } +} + +// Usage example: `medium := storeInstance.Medium(); if medium != nil { _ = medium.EnsureDir("exports") }` +func (storeInstance *Store) Medium() Medium { + if storeInstance == nil { + return nil + } + return storeInstance.medium +} + +// Usage example: `err := store.Import(workspace, medium, "dataset.jsonl")` +// Import reads a JSON, JSONL, or CSV payload from the provided medium and +// appends each record to the workspace buffer as a `Put` entry. Format is +// chosen from the file extension: `.json` expects either a top-level array or +// `{"entries":[...]}` shape, `.jsonl`/`.ndjson` parse line-by-line, and `.csv` +// uses the first row as the header. +func Import(workspace *Workspace, medium Medium, path string) error { + if workspace == nil { + return core.E("store.Import", "workspace is nil", nil) + } + if medium == nil { + return core.E("store.Import", "medium is nil", nil) + } + if path == "" { + return core.E("store.Import", "path is empty", nil) + } + + content, err := medium.Read(path) + if err != nil { + return core.E("store.Import", "read from medium", err) + } + + kind := importEntryKind(path) + switch lowercaseText(importExtension(path)) { + case ".jsonl", ".ndjson": + return importJSONLines(workspace, kind, content) + case ".csv": + return importCSV(workspace, kind, content) + case ".json": + return importJSON(workspace, kind, content) + default: + return importJSONLines(workspace, kind, content) + } +} + +// Usage example: `err := store.Export(workspace, medium, "report.json")` +// Export writes the workspace aggregate summary to the medium at the given +// path. Format is chosen from the extension: `.jsonl` writes one record per +// query row, `.csv` writes header + rows, everything else writes the +// aggregate as JSON. +func Export(workspace *Workspace, medium Medium, path string) error { + if workspace == nil { + return core.E("store.Export", "workspace is nil", nil) + } + if medium == nil { + return core.E("store.Export", "medium is nil", nil) + } + if path == "" { + return core.E("store.Export", "path is empty", nil) + } + + if err := ensureMediumDir(medium, core.PathDir(path)); err != nil { + return core.E("store.Export", "ensure directory", err) + } + + switch lowercaseText(importExtension(path)) { + case ".jsonl", ".ndjson": + return exportJSONLines(workspace, medium, path) + case ".csv": + return exportCSV(workspace, medium, path) + default: + return exportJSON(workspace, medium, path) + } +} + +func ensureMediumDir(medium Medium, directory string) error { + if directory == "" || directory == "." || directory == "/" { + return nil + } + if err := medium.EnsureDir(directory); err != nil { + return core.E("store.ensureMediumDir", "ensure directory", err) + } + return nil +} + +func importExtension(path string) string { + base := core.PathBase(path) + for i := len(base) - 1; i >= 0; i-- { + if base[i] == '.' { + return base[i:] + } + } + return "" +} + +func importEntryKind(path string) string { + base := core.PathBase(path) + for i := len(base) - 1; i >= 0; i-- { + if base[i] == '.' { + base = base[:i] + break + } + } + if base == "" { + return "entry" + } + return base +} + +func importJSONLines(workspace *Workspace, kind, content string) error { + scanner := core.Split(content, "\n") + for _, rawLine := range scanner { + line := core.Trim(rawLine) + if line == "" { + continue + } + record := map[string]any{} + if result := core.JSONUnmarshalString(line, &record); !result.OK { + err, _ := result.Value.(error) + return core.E("store.Import", "parse jsonl line", err) + } + if err := workspace.Put(kind, record); err != nil { + return core.E("store.Import", "put jsonl record", err) + } + } + return nil +} + +func importJSON(workspace *Workspace, kind, content string) error { + trimmed := core.Trim(content) + if trimmed == "" { + return nil + } + + var topLevel any + if result := core.JSONUnmarshalString(trimmed, &topLevel); !result.OK { + err, _ := result.Value.(error) + return core.E("store.Import", "parse json", err) + } + + records := collectJSONRecords(topLevel) + for _, record := range records { + if err := workspace.Put(kind, record); err != nil { + return core.E("store.Import", "put json record", err) + } + } + return nil +} + +func collectJSONRecords(value any) []map[string]any { + switch shape := value.(type) { + case []any: + records := make([]map[string]any, 0, len(shape)) + for _, entry := range shape { + if record, ok := entry.(map[string]any); ok { + records = append(records, record) + } + } + return records + case map[string]any: + if nested, ok := shape["entries"].([]any); ok { + return collectJSONRecords(nested) + } + if nested, ok := shape["records"].([]any); ok { + return collectJSONRecords(nested) + } + if nested, ok := shape["data"].([]any); ok { + return collectJSONRecords(nested) + } + return []map[string]any{shape} + } + return nil +} + +func importCSV(workspace *Workspace, kind, content string) error { + lines := core.Split(content, "\n") + if len(lines) == 0 { + return nil + } + header := splitCSVLine(lines[0]) + if len(header) == 0 { + return nil + } + for _, rawLine := range lines[1:] { + line := trimTrailingCarriageReturn(rawLine) + if line == "" { + continue + } + fields := splitCSVLine(line) + record := make(map[string]any, len(header)) + for columnIndex, columnName := range header { + if columnIndex < len(fields) { + record[columnName] = fields[columnIndex] + } else { + record[columnName] = "" + } + } + if err := workspace.Put(kind, record); err != nil { + return core.E("store.Import", "put csv record", err) + } + } + return nil +} + +func splitCSVLine(line string) []string { + line = trimTrailingCarriageReturn(line) + var ( + fields []string + buffer bytes.Buffer + inQuotes bool + wasEscaped bool + ) + for index := 0; index < len(line); index++ { + character := line[index] + switch { + case character == '"' && inQuotes && index+1 < len(line) && line[index+1] == '"': + buffer.WriteByte('"') + index++ + wasEscaped = true + case character == '"': + inQuotes = !inQuotes + case character == ',' && !inQuotes: + fields = append(fields, buffer.String()) + buffer.Reset() + wasEscaped = false + default: + buffer.WriteByte(character) + } + } + fields = append(fields, buffer.String()) + _ = wasEscaped + return fields +} + +func exportJSON(workspace *Workspace, medium Medium, path string) error { + summary := workspace.Aggregate() + content := core.JSONMarshalString(summary) + if err := medium.Write(path, content); err != nil { + return core.E("store.Export", "write json", err) + } + return nil +} + +func exportJSONLines(workspace *Workspace, medium Medium, path string) error { + result := workspace.Query("SELECT entry_kind, entry_data, created_at FROM workspace_entries ORDER BY entry_id") + if !result.OK { + err, _ := result.Value.(error) + return core.E("store.Export", "query workspace", err) + } + rows, ok := result.Value.([]map[string]any) + if !ok { + rows = nil + } + + builder := core.NewBuilder() + for _, row := range rows { + line := core.JSONMarshalString(row) + builder.WriteString(line) + builder.WriteString("\n") + } + if err := medium.Write(path, builder.String()); err != nil { + return core.E("store.Export", "write jsonl", err) + } + return nil +} + +func exportCSV(workspace *Workspace, medium Medium, path string) error { + result := workspace.Query("SELECT entry_kind, entry_data, created_at FROM workspace_entries ORDER BY entry_id") + if !result.OK { + err, _ := result.Value.(error) + return core.E("store.Export", "query workspace", err) + } + rows, ok := result.Value.([]map[string]any) + if !ok { + rows = nil + } + + builder := core.NewBuilder() + builder.WriteString("entry_kind,entry_data,created_at\n") + for _, row := range rows { + builder.WriteString(csvField(core.Sprint(row["entry_kind"]))) + builder.WriteString(",") + builder.WriteString(csvField(core.Sprint(row["entry_data"]))) + builder.WriteString(",") + builder.WriteString(csvField(core.Sprint(row["created_at"]))) + builder.WriteString("\n") + } + if err := medium.Write(path, builder.String()); err != nil { + return core.E("store.Export", "write csv", err) + } + return nil +} + +func trimTrailingCarriageReturn(value string) string { + for len(value) > 0 && value[len(value)-1] == '\r' { + value = value[:len(value)-1] + } + return value +} + +func csvField(value string) string { + needsQuote := false + for index := 0; index < len(value); index++ { + switch value[index] { + case ',', '"', '\n', '\r': + needsQuote = true + } + if needsQuote { + break + } + } + if !needsQuote { + return value + } + escaped := core.Replace(value, `"`, `""`) + return core.Concat(`"`, escaped, `"`) +} diff --git a/medium_test.go b/medium_test.go new file mode 100644 index 0000000..95eba81 --- /dev/null +++ b/medium_test.go @@ -0,0 +1,321 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package store + +import ( + "bytes" + goio "io" + "io/fs" + "sync" + "testing" + "time" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// memoryMedium is an in-memory implementation of `store.Medium` used by the +// medium tests so assertions do not depend on the local filesystem. +type memoryMedium struct { + lock sync.Mutex + files map[string]string +} + +func newMemoryMedium() *memoryMedium { + return &memoryMedium{files: make(map[string]string)} +} + +func (medium *memoryMedium) Read(path string) (string, error) { + medium.lock.Lock() + defer medium.lock.Unlock() + content, ok := medium.files[path] + if !ok { + return "", core.E("memoryMedium.Read", "file not found: "+path, nil) + } + return content, nil +} + +func (medium *memoryMedium) Write(path, content string) error { + medium.lock.Lock() + defer medium.lock.Unlock() + medium.files[path] = content + return nil +} + +func (medium *memoryMedium) EnsureDir(string) error { return nil } + +func (medium *memoryMedium) Create(path string) (goio.WriteCloser, error) { + return &memoryWriter{medium: medium, path: path}, nil +} + +func (medium *memoryMedium) Exists(path string) bool { + medium.lock.Lock() + defer medium.lock.Unlock() + _, ok := medium.files[path] + return ok +} + +type memoryWriter struct { + medium *memoryMedium + path string + buffer bytes.Buffer + closed bool +} + +func (writer *memoryWriter) Write(data []byte) (int, error) { + return writer.buffer.Write(data) +} + +func (writer *memoryWriter) Close() error { + if writer.closed { + return nil + } + writer.closed = true + return writer.medium.Write(writer.path, writer.buffer.String()) +} + +// Ensure memoryMedium still satisfies the internal Medium contract. +var _ Medium = (*memoryMedium)(nil) + +// Compile-time check for fs.FileInfo usage in the tests. +var _ fs.FileInfo = (*FileInfoStub)(nil) + +type FileInfoStub struct{} + +func (FileInfoStub) Name() string { return "" } +func (FileInfoStub) Size() int64 { return 0 } +func (FileInfoStub) Mode() fs.FileMode { return 0 } +func (FileInfoStub) ModTime() time.Time { return time.Time{} } +func (FileInfoStub) IsDir() bool { return false } +func (FileInfoStub) Sys() any { return nil } + +func TestMedium_WithMedium_Good(t *testing.T) { + useWorkspaceStateDirectory(t) + + medium := newMemoryMedium() + storeInstance, err := New(":memory:", WithMedium(medium)) + require.NoError(t, err) + defer storeInstance.Close() + + assert.Same(t, medium, storeInstance.Medium(), "medium should round-trip via accessor") + assert.Same(t, medium, storeInstance.Config().Medium, "medium should appear in Config()") +} + +func TestMedium_WithMedium_Bad_NilKeepsFilesystemBackend(t *testing.T) { + useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + assert.Nil(t, storeInstance.Medium()) +} + +func TestMedium_Import_Good_JSONL(t *testing.T) { + useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + workspace, err := storeInstance.NewWorkspace("medium-import-jsonl") + require.NoError(t, err) + defer workspace.Discard() + + medium := newMemoryMedium() + require.NoError(t, medium.Write("data.jsonl", `{"user":"@alice"} +{"user":"@bob"} +`)) + + require.NoError(t, Import(workspace, medium, "data.jsonl")) + + rows := requireResultRows(t, workspace.Query("SELECT entry_kind, entry_data FROM workspace_entries ORDER BY entry_id")) + require.Len(t, rows, 2) + assert.Equal(t, "data", rows[0]["entry_kind"]) + assert.Contains(t, rows[0]["entry_data"], "@alice") + assert.Contains(t, rows[1]["entry_data"], "@bob") +} + +func TestMedium_Import_Good_JSONArray(t *testing.T) { + useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + workspace, err := storeInstance.NewWorkspace("medium-import-json-array") + require.NoError(t, err) + defer workspace.Discard() + + medium := newMemoryMedium() + require.NoError(t, medium.Write("users.json", `[{"name":"Alice"},{"name":"Bob"},{"name":"Carol"}]`)) + + require.NoError(t, Import(workspace, medium, "users.json")) + + assert.Equal(t, map[string]any{"users": 3}, workspace.Aggregate()) +} + +func TestMedium_Import_Good_CSV(t *testing.T) { + useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + workspace, err := storeInstance.NewWorkspace("medium-import-csv") + require.NoError(t, err) + defer workspace.Discard() + + medium := newMemoryMedium() + require.NoError(t, medium.Write("findings.csv", "tool,severity\ngosec,high\ngolint,low\n")) + + require.NoError(t, Import(workspace, medium, "findings.csv")) + + assert.Equal(t, map[string]any{"findings": 2}, workspace.Aggregate()) +} + +func TestMedium_Import_Bad_NilArguments(t *testing.T) { + useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + workspace, err := storeInstance.NewWorkspace("medium-import-bad") + require.NoError(t, err) + defer workspace.Discard() + + medium := newMemoryMedium() + + require.Error(t, Import(nil, medium, "data.json")) + require.Error(t, Import(workspace, nil, "data.json")) + require.Error(t, Import(workspace, medium, "")) +} + +func TestMedium_Import_Ugly_MissingFileReturnsError(t *testing.T) { + useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + workspace, err := storeInstance.NewWorkspace("medium-import-missing") + require.NoError(t, err) + defer workspace.Discard() + + medium := newMemoryMedium() + require.Error(t, Import(workspace, medium, "ghost.jsonl")) +} + +func TestMedium_Export_Good_JSON(t *testing.T) { + useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + workspace, err := storeInstance.NewWorkspace("medium-export-json") + require.NoError(t, err) + defer workspace.Discard() + + require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"})) + require.NoError(t, workspace.Put("like", map[string]any{"user": "@bob"})) + require.NoError(t, workspace.Put("profile_match", map[string]any{"user": "@carol"})) + + medium := newMemoryMedium() + require.NoError(t, Export(workspace, medium, "report.json")) + + assert.True(t, medium.Exists("report.json")) + content, err := medium.Read("report.json") + require.NoError(t, err) + assert.Contains(t, content, `"like":2`) + assert.Contains(t, content, `"profile_match":1`) +} + +func TestMedium_Export_Good_JSONLines(t *testing.T) { + useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + workspace, err := storeInstance.NewWorkspace("medium-export-jsonl") + require.NoError(t, err) + defer workspace.Discard() + + require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"})) + require.NoError(t, workspace.Put("like", map[string]any{"user": "@bob"})) + + medium := newMemoryMedium() + require.NoError(t, Export(workspace, medium, "report.jsonl")) + + content, err := medium.Read("report.jsonl") + require.NoError(t, err) + lines := 0 + for _, line := range splitNewlines(content) { + if line != "" { + lines++ + } + } + assert.Equal(t, 2, lines) +} + +func TestMedium_Export_Bad_NilArguments(t *testing.T) { + useWorkspaceStateDirectory(t) + + storeInstance, err := New(":memory:") + require.NoError(t, err) + defer storeInstance.Close() + + workspace, err := storeInstance.NewWorkspace("medium-export-bad") + require.NoError(t, err) + defer workspace.Discard() + + medium := newMemoryMedium() + + require.Error(t, Export(nil, medium, "report.json")) + require.Error(t, Export(workspace, nil, "report.json")) + require.Error(t, Export(workspace, medium, "")) +} + +func TestMedium_Compact_Good_MediumRoutesArchive(t *testing.T) { + useWorkspaceStateDirectory(t) + useArchiveOutputDirectory(t) + + medium := newMemoryMedium() + storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"), WithMedium(medium)) + require.NoError(t, err) + defer storeInstance.Close() + + require.True(t, storeInstance.CommitToJournal("jobs", map[string]any{"count": 3}, map[string]string{"workspace": "jobs-1"}).OK) + + result := storeInstance.Compact(CompactOptions{ + Before: time.Now().Add(time.Minute), + Output: "archive/", + Format: "gzip", + }) + require.True(t, result.OK, "compact result: %v", result.Value) + outputPath, ok := result.Value.(string) + require.True(t, ok) + require.NotEmpty(t, outputPath) + assert.True(t, medium.Exists(outputPath), "compact should write through medium at %s", outputPath) +} + +func splitNewlines(content string) []string { + var result []string + current := core.NewBuilder() + for index := 0; index < len(content); index++ { + character := content[index] + if character == '\n' { + result = append(result, current.String()) + current.Reset() + continue + } + current.WriteByte(character) + } + if current.Len() > 0 { + result = append(result, current.String()) + } + return result +} diff --git a/store.go b/store.go index 84722db..471a106 100644 --- a/store.go +++ b/store.go @@ -45,6 +45,11 @@ type StoreConfig struct { PurgeInterval time.Duration // Usage example: `config := store.StoreConfig{WorkspaceStateDirectory: "/tmp/core-state"}` WorkspaceStateDirectory string + // Usage example: `medium, _ := local.New("/srv/core"); config := store.StoreConfig{DatabasePath: ":memory:", Medium: medium}` + // Medium overrides the raw filesystem for Compact archives and Import / + // Export helpers, letting tests and production swap the backing transport + // (memory, S3, cube) without touching the store API. + Medium Medium } // Usage example: `config := (store.StoreConfig{DatabasePath: ":memory:"}).Normalised(); fmt.Println(config.PurgeInterval, config.WorkspaceStateDirectory)` @@ -139,6 +144,7 @@ type Store struct { purgeWaitGroup sync.WaitGroup purgeInterval time.Duration // interval between background purge cycles journalConfiguration JournalConfiguration + medium Medium lifecycleLock sync.Mutex isClosed bool @@ -223,6 +229,7 @@ func (storeInstance *Store) Config() StoreConfig { Journal: storeInstance.JournalConfiguration(), PurgeInterval: storeInstance.purgeInterval, WorkspaceStateDirectory: storeInstance.WorkspaceStateDirectory(), + Medium: storeInstance.medium, } } @@ -289,6 +296,7 @@ func openConfiguredStore(operation string, storeConfig StoreConfig) (*Store, err } storeInstance.purgeInterval = storeConfig.PurgeInterval storeInstance.workspaceStateDirectory = storeConfig.WorkspaceStateDirectory + storeInstance.medium = storeConfig.Medium // New() performs a non-destructive orphan scan so callers can discover // leftover workspaces via RecoverOrphans().