feat(store): io.Medium-backed storage per RFC §9
Add a WithMedium option so that Store archives and the Import/Export helpers can route through any io.Medium implementation (local, memory, S3, cube, sftp) instead of the raw filesystem. The Medium transport is optional — when unset, the existing filesystem behaviour is preserved.

- medium.go exposes WithMedium, Import, and Export helpers plus a small Medium interface that any io.Medium satisfies structurally
- Compact honours the installed Medium for archive writes, falling back to the local filesystem when nil
- StoreConfig.Medium round-trips through Config()/WithMedium so callers can inspect and override the transport
- medium_test.go covers the happy-path JSONL/CSV/JSON imports, JSON and JSONL exports, nil-argument validation, missing-file errors, and the Compact medium route

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
eef4e737aa
commit
2d7fb951db
7 changed files with 876 additions and 14 deletions
54
compact.go
54
compact.go
|
|
@ -23,6 +23,12 @@ type CompactOptions struct {
|
|||
Output string
|
||||
// Usage example: `options := store.CompactOptions{Format: "zstd"}`
|
||||
Format string
|
||||
// Usage example: `medium, _ := s3.New(s3.Options{Bucket: "archive"}); options := store.CompactOptions{Before: time.Now().Add(-90 * 24 * time.Hour), Medium: medium}`
|
||||
// Medium routes the archive write through an io.Medium instead of the raw
|
||||
// filesystem. When set, Output is the path inside the medium; leave empty
|
||||
// to use `.core/archive/`. When nil, Compact falls back to the store-level
|
||||
// medium (if configured via WithMedium), then to the local filesystem.
|
||||
Medium Medium
|
||||
}
|
||||
|
||||
// Usage example: `normalisedOptions := (store.CompactOptions{Before: time.Date(2026, 3, 30, 0, 0, 0, 0, time.UTC)}).Normalised()`
|
||||
|
|
@ -89,17 +95,26 @@ func (storeInstance *Store) Compact(options CompactOptions) core.Result {
|
|||
return core.Result{Value: core.E("store.Compact", "validate options", err), OK: false}
|
||||
}
|
||||
|
||||
filesystem := (&core.Fs{}).NewUnrestricted()
|
||||
if result := filesystem.EnsureDir(options.Output); !result.OK {
|
||||
return core.Result{Value: core.E("store.Compact", "ensure archive directory", result.Value.(error)), OK: false}
|
||||
medium := options.Medium
|
||||
if medium == nil {
|
||||
medium = storeInstance.medium
|
||||
}
|
||||
|
||||
rows, err := storeInstance.sqliteDatabase.Query(
|
||||
filesystem := (&core.Fs{}).NewUnrestricted()
|
||||
if medium == nil {
|
||||
if result := filesystem.EnsureDir(options.Output); !result.OK {
|
||||
return core.Result{Value: core.E("store.Compact", "ensure archive directory", result.Value.(error)), OK: false}
|
||||
}
|
||||
} else if err := ensureMediumDir(medium, options.Output); err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "ensure medium archive directory", err), OK: false}
|
||||
}
|
||||
|
||||
rows, queryErr := storeInstance.sqliteDatabase.Query(
|
||||
"SELECT entry_id, bucket_name, measurement, fields_json, tags_json, committed_at FROM "+journalEntriesTableName+" WHERE archived_at IS NULL AND committed_at < ? ORDER BY committed_at, entry_id",
|
||||
options.Before.UnixMilli(),
|
||||
)
|
||||
if err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "query journal rows", err), OK: false}
|
||||
if queryErr != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "query journal rows", queryErr), OK: false}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
|
|
@ -126,14 +141,25 @@ func (storeInstance *Store) Compact(options CompactOptions) core.Result {
|
|||
}
|
||||
|
||||
outputPath := compactOutputPath(options.Output, options.Format)
|
||||
archiveFileResult := filesystem.Create(outputPath)
|
||||
if !archiveFileResult.OK {
|
||||
return core.Result{Value: core.E("store.Compact", "create archive file", archiveFileResult.Value.(error)), OK: false}
|
||||
}
|
||||
|
||||
file, ok := archiveFileResult.Value.(io.WriteCloser)
|
||||
if !ok {
|
||||
return core.Result{Value: core.E("store.Compact", "archive file is not writable", nil), OK: false}
|
||||
var (
|
||||
file io.WriteCloser
|
||||
createErr error
|
||||
)
|
||||
if medium != nil {
|
||||
file, createErr = medium.Create(outputPath)
|
||||
if createErr != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "create archive via medium", createErr), OK: false}
|
||||
}
|
||||
} else {
|
||||
archiveFileResult := filesystem.Create(outputPath)
|
||||
if !archiveFileResult.OK {
|
||||
return core.Result{Value: core.E("store.Compact", "create archive file", archiveFileResult.Value.(error)), OK: false}
|
||||
}
|
||||
existingFile, ok := archiveFileResult.Value.(io.WriteCloser)
|
||||
if !ok {
|
||||
return core.Result{Value: core.E("store.Compact", "archive file is not writable", nil), OK: false}
|
||||
}
|
||||
file = existingFile
|
||||
}
|
||||
archiveFileClosed := false
|
||||
defer func() {
|
||||
|
|
|
|||
1
go.mod
1
go.mod
|
|
@ -4,6 +4,7 @@ go 1.26.0
|
|||
|
||||
require (
|
||||
dappco.re/go/core v0.8.0-alpha.1
|
||||
dappco.re/go/core/io v0.4.2
|
||||
github.com/klauspost/compress v1.18.5
|
||||
github.com/stretchr/testify v1.11.1
|
||||
modernc.org/sqlite v1.47.0
|
||||
|
|
|
|||
2
go.sum
2
go.sum
|
|
@ -1,5 +1,7 @@
|
|||
dappco.re/go/core v0.8.0-alpha.1 h1:gj7+Scv+L63Z7wMxbJYHhaRFkHJo2u4MMPuUSv/Dhtk=
|
||||
dappco.re/go/core v0.8.0-alpha.1/go.mod h1:f2/tBZ3+3IqDrg2F5F598llv0nmb/4gJVCFzM5geE4A=
|
||||
dappco.re/go/core/io v0.4.2 h1:SHNF/xMPyFnKWWYoFW5Y56eiuGVL/mFa1lfIw/530ls=
|
||||
dappco.re/go/core/io v0.4.2/go.mod h1:w71dukyunczLb8frT9JOd5B78PjwWQD3YAXiCt3AcPA=
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
|
||||
github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY=
|
||||
|
|
|
|||
142
json.go
Normal file
142
json.go
Normal file
|
|
@ -0,0 +1,142 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
// JSON helpers for storage consumers.
|
||||
// Re-exports the minimum JSON surface needed by downstream users like
|
||||
// go-cache and go-tenant so they don't need to import encoding/json directly.
|
||||
// Internally uses core/go JSON primitives.
|
||||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
// RawMessage is a raw encoded JSON value.
// Use in structs where the JSON should be stored as-is without re-encoding.
//
// Usage example:
//
//	type CacheEntry struct {
//		Data store.RawMessage `json:"data"`
//	}
type RawMessage []byte

// MarshalJSON returns the raw bytes as-is. If empty, returns `null`.
//
// Usage example: `bytes, err := raw.MarshalJSON()`
func (raw RawMessage) MarshalJSON() ([]byte, error) {
	if len(raw) > 0 {
		return raw, nil
	}
	// An empty message still has to encode to valid JSON.
	return []byte("null"), nil
}
|
||||
|
||||
// UnmarshalJSON stores the raw JSON bytes without decoding them.
|
||||
//
|
||||
// Usage example: `var raw store.RawMessage; err := raw.UnmarshalJSON(data)`
|
||||
func (raw *RawMessage) UnmarshalJSON(data []byte) error {
|
||||
if raw == nil {
|
||||
return core.E("store.RawMessage.UnmarshalJSON", "nil receiver", nil)
|
||||
}
|
||||
*raw = append((*raw)[:0], data...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarshalIndent serialises a value to pretty-printed JSON bytes.
|
||||
// Uses core.JSONMarshal internally then applies prefix/indent formatting
|
||||
// so consumers get readable output without importing encoding/json.
|
||||
//
|
||||
// Usage example: `data, err := store.MarshalIndent(entry, "", " ")`
|
||||
func MarshalIndent(v any, prefix, indent string) ([]byte, error) {
|
||||
marshalled := core.JSONMarshal(v)
|
||||
if !marshalled.OK {
|
||||
if err, ok := marshalled.Value.(error); ok {
|
||||
return nil, core.E("store.MarshalIndent", "marshal", err)
|
||||
}
|
||||
return nil, core.E("store.MarshalIndent", "marshal", nil)
|
||||
}
|
||||
raw, ok := marshalled.Value.([]byte)
|
||||
if !ok {
|
||||
return nil, core.E("store.MarshalIndent", "non-bytes result", nil)
|
||||
}
|
||||
if prefix == "" && indent == "" {
|
||||
return raw, nil
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err := indentCompactJSON(&buf, raw, prefix, indent); err != nil {
|
||||
return nil, core.E("store.MarshalIndent", "indent", err)
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
// indentCompactJSON formats compact JSON bytes with prefix+indent.
|
||||
// Mirrors json.Indent's semantics without importing encoding/json.
|
||||
//
|
||||
// Usage example: `var buf bytes.Buffer; _ = indentCompactJSON(&buf, compact, "", " ")`
|
||||
func indentCompactJSON(buf *bytes.Buffer, src []byte, prefix, indent string) error {
|
||||
depth := 0
|
||||
inString := false
|
||||
escaped := false
|
||||
|
||||
writeNewlineIndent := func(level int) {
|
||||
buf.WriteByte('\n')
|
||||
buf.WriteString(prefix)
|
||||
for i := 0; i < level; i++ {
|
||||
buf.WriteString(indent)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < len(src); i++ {
|
||||
c := src[i]
|
||||
if inString {
|
||||
buf.WriteByte(c)
|
||||
if escaped {
|
||||
escaped = false
|
||||
continue
|
||||
}
|
||||
if c == '\\' {
|
||||
escaped = true
|
||||
continue
|
||||
}
|
||||
if c == '"' {
|
||||
inString = false
|
||||
}
|
||||
continue
|
||||
}
|
||||
switch c {
|
||||
case '"':
|
||||
inString = true
|
||||
buf.WriteByte(c)
|
||||
case '{', '[':
|
||||
buf.WriteByte(c)
|
||||
depth++
|
||||
// Look ahead for empty object/array.
|
||||
if i+1 < len(src) && (src[i+1] == '}' || src[i+1] == ']') {
|
||||
continue
|
||||
}
|
||||
writeNewlineIndent(depth)
|
||||
case '}', ']':
|
||||
// Only indent if previous byte wasn't the matching opener.
|
||||
if i > 0 && src[i-1] != '{' && src[i-1] != '[' {
|
||||
depth--
|
||||
writeNewlineIndent(depth)
|
||||
} else {
|
||||
depth--
|
||||
}
|
||||
buf.WriteByte(c)
|
||||
case ',':
|
||||
buf.WriteByte(c)
|
||||
writeNewlineIndent(depth)
|
||||
case ':':
|
||||
buf.WriteByte(c)
|
||||
buf.WriteByte(' ')
|
||||
case ' ', '\t', '\n', '\r':
|
||||
// Drop whitespace from compact source.
|
||||
default:
|
||||
buf.WriteByte(c)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
362
medium.go
Normal file
362
medium.go
Normal file
|
|
@ -0,0 +1,362 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
goio "io"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"dappco.re/go/core/io"
|
||||
)
|
||||
|
||||
// Medium is the minimal storage transport used by the go-store workspace
// import and export helpers and by Compact when writing cold archives.
//
// Any `dappco.re/go/core/io.Medium` implementation (local, memory, S3, cube,
// sftp) satisfies this interface by structural typing — go-store only needs a
// handful of methods to ferry bytes between the workspace buffer and the
// underlying medium.
//
// Usage example: `medium, _ := local.New("/tmp/exports"); storeInstance, err := store.New(":memory:", store.WithMedium(medium))`
type Medium interface {
	// Read returns the full contents of the file at path.
	Read(path string) (string, error)
	// Write replaces the file at path with content.
	Write(path, content string) error
	// EnsureDir makes sure the directory at path exists.
	EnsureDir(path string) error
	// Create opens a streaming writer for path; the caller must Close it.
	Create(path string) (goio.WriteCloser, error)
	// Exists reports whether a file is present at path.
	Exists(path string) bool
}

// The blank assignment below documents that `dappco.re/go/core/io.Medium`
// satisfies the in-package `store.Medium` interface — agents pass an
// `io.Medium` directly to `store.WithMedium` without an adapter.
var _ Medium = io.Medium(nil)
|
||||
|
||||
// WithMedium installs an io.Medium-compatible transport on the Store so that
// Compact archives and Import/Export helpers route through the medium instead
// of the raw filesystem.
//
// Usage example: `medium, _ := local.New("/srv/core"); storeInstance, err := store.NewConfigured(store.StoreConfig{DatabasePath: ":memory:", Medium: medium})`
func WithMedium(medium Medium) StoreOption {
	return func(storeConfig *StoreConfig) {
		if storeConfig == nil {
			// Defensive: an option applied to a nil config is a no-op.
			return
		}
		storeConfig.Medium = medium
	}
}
|
||||
|
||||
// Usage example: `medium := storeInstance.Medium(); if medium != nil { _ = medium.EnsureDir("exports") }`
|
||||
func (storeInstance *Store) Medium() Medium {
|
||||
if storeInstance == nil {
|
||||
return nil
|
||||
}
|
||||
return storeInstance.medium
|
||||
}
|
||||
|
||||
// Usage example: `err := store.Import(workspace, medium, "dataset.jsonl")`
|
||||
// Import reads a JSON, JSONL, or CSV payload from the provided medium and
|
||||
// appends each record to the workspace buffer as a `Put` entry. Format is
|
||||
// chosen from the file extension: `.json` expects either a top-level array or
|
||||
// `{"entries":[...]}` shape, `.jsonl`/`.ndjson` parse line-by-line, and `.csv`
|
||||
// uses the first row as the header.
|
||||
func Import(workspace *Workspace, medium Medium, path string) error {
|
||||
if workspace == nil {
|
||||
return core.E("store.Import", "workspace is nil", nil)
|
||||
}
|
||||
if medium == nil {
|
||||
return core.E("store.Import", "medium is nil", nil)
|
||||
}
|
||||
if path == "" {
|
||||
return core.E("store.Import", "path is empty", nil)
|
||||
}
|
||||
|
||||
content, err := medium.Read(path)
|
||||
if err != nil {
|
||||
return core.E("store.Import", "read from medium", err)
|
||||
}
|
||||
|
||||
kind := importEntryKind(path)
|
||||
switch lowercaseText(importExtension(path)) {
|
||||
case ".jsonl", ".ndjson":
|
||||
return importJSONLines(workspace, kind, content)
|
||||
case ".csv":
|
||||
return importCSV(workspace, kind, content)
|
||||
case ".json":
|
||||
return importJSON(workspace, kind, content)
|
||||
default:
|
||||
return importJSONLines(workspace, kind, content)
|
||||
}
|
||||
}
|
||||
|
||||
// Usage example: `err := store.Export(workspace, medium, "report.json")`
|
||||
// Export writes the workspace aggregate summary to the medium at the given
|
||||
// path. Format is chosen from the extension: `.jsonl` writes one record per
|
||||
// query row, `.csv` writes header + rows, everything else writes the
|
||||
// aggregate as JSON.
|
||||
func Export(workspace *Workspace, medium Medium, path string) error {
|
||||
if workspace == nil {
|
||||
return core.E("store.Export", "workspace is nil", nil)
|
||||
}
|
||||
if medium == nil {
|
||||
return core.E("store.Export", "medium is nil", nil)
|
||||
}
|
||||
if path == "" {
|
||||
return core.E("store.Export", "path is empty", nil)
|
||||
}
|
||||
|
||||
if err := ensureMediumDir(medium, core.PathDir(path)); err != nil {
|
||||
return core.E("store.Export", "ensure directory", err)
|
||||
}
|
||||
|
||||
switch lowercaseText(importExtension(path)) {
|
||||
case ".jsonl", ".ndjson":
|
||||
return exportJSONLines(workspace, medium, path)
|
||||
case ".csv":
|
||||
return exportCSV(workspace, medium, path)
|
||||
default:
|
||||
return exportJSON(workspace, medium, path)
|
||||
}
|
||||
}
|
||||
|
||||
func ensureMediumDir(medium Medium, directory string) error {
|
||||
if directory == "" || directory == "." || directory == "/" {
|
||||
return nil
|
||||
}
|
||||
if err := medium.EnsureDir(directory); err != nil {
|
||||
return core.E("store.ensureMediumDir", "ensure directory", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func importExtension(path string) string {
|
||||
base := core.PathBase(path)
|
||||
for i := len(base) - 1; i >= 0; i-- {
|
||||
if base[i] == '.' {
|
||||
return base[i:]
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func importEntryKind(path string) string {
|
||||
base := core.PathBase(path)
|
||||
for i := len(base) - 1; i >= 0; i-- {
|
||||
if base[i] == '.' {
|
||||
base = base[:i]
|
||||
break
|
||||
}
|
||||
}
|
||||
if base == "" {
|
||||
return "entry"
|
||||
}
|
||||
return base
|
||||
}
|
||||
|
||||
func importJSONLines(workspace *Workspace, kind, content string) error {
|
||||
scanner := core.Split(content, "\n")
|
||||
for _, rawLine := range scanner {
|
||||
line := core.Trim(rawLine)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
record := map[string]any{}
|
||||
if result := core.JSONUnmarshalString(line, &record); !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
return core.E("store.Import", "parse jsonl line", err)
|
||||
}
|
||||
if err := workspace.Put(kind, record); err != nil {
|
||||
return core.E("store.Import", "put jsonl record", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func importJSON(workspace *Workspace, kind, content string) error {
|
||||
trimmed := core.Trim(content)
|
||||
if trimmed == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
var topLevel any
|
||||
if result := core.JSONUnmarshalString(trimmed, &topLevel); !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
return core.E("store.Import", "parse json", err)
|
||||
}
|
||||
|
||||
records := collectJSONRecords(topLevel)
|
||||
for _, record := range records {
|
||||
if err := workspace.Put(kind, record); err != nil {
|
||||
return core.E("store.Import", "put json record", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// collectJSONRecords flattens a decoded JSON value into object records.
// Arrays keep only their object elements; objects unwrap a nested
// "entries"/"records"/"data" array when present and otherwise count as a
// single record; any other value yields nil.
func collectJSONRecords(value any) []map[string]any {
	if list, isList := value.([]any); isList {
		records := make([]map[string]any, 0, len(list))
		for _, element := range list {
			if record, isRecord := element.(map[string]any); isRecord {
				records = append(records, record)
			}
		}
		return records
	}

	object, isObject := value.(map[string]any)
	if !isObject {
		return nil
	}
	// Recognised wrapper keys, checked in priority order.
	for _, wrapper := range []string{"entries", "records", "data"} {
		if nested, hasNested := object[wrapper].([]any); hasNested {
			return collectJSONRecords(nested)
		}
	}
	return []map[string]any{object}
}
|
||||
|
||||
func importCSV(workspace *Workspace, kind, content string) error {
|
||||
lines := core.Split(content, "\n")
|
||||
if len(lines) == 0 {
|
||||
return nil
|
||||
}
|
||||
header := splitCSVLine(lines[0])
|
||||
if len(header) == 0 {
|
||||
return nil
|
||||
}
|
||||
for _, rawLine := range lines[1:] {
|
||||
line := trimTrailingCarriageReturn(rawLine)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
fields := splitCSVLine(line)
|
||||
record := make(map[string]any, len(header))
|
||||
for columnIndex, columnName := range header {
|
||||
if columnIndex < len(fields) {
|
||||
record[columnName] = fields[columnIndex]
|
||||
} else {
|
||||
record[columnName] = ""
|
||||
}
|
||||
}
|
||||
if err := workspace.Put(kind, record); err != nil {
|
||||
return core.E("store.Import", "put csv record", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func splitCSVLine(line string) []string {
|
||||
line = trimTrailingCarriageReturn(line)
|
||||
var (
|
||||
fields []string
|
||||
buffer bytes.Buffer
|
||||
inQuotes bool
|
||||
wasEscaped bool
|
||||
)
|
||||
for index := 0; index < len(line); index++ {
|
||||
character := line[index]
|
||||
switch {
|
||||
case character == '"' && inQuotes && index+1 < len(line) && line[index+1] == '"':
|
||||
buffer.WriteByte('"')
|
||||
index++
|
||||
wasEscaped = true
|
||||
case character == '"':
|
||||
inQuotes = !inQuotes
|
||||
case character == ',' && !inQuotes:
|
||||
fields = append(fields, buffer.String())
|
||||
buffer.Reset()
|
||||
wasEscaped = false
|
||||
default:
|
||||
buffer.WriteByte(character)
|
||||
}
|
||||
}
|
||||
fields = append(fields, buffer.String())
|
||||
_ = wasEscaped
|
||||
return fields
|
||||
}
|
||||
|
||||
func exportJSON(workspace *Workspace, medium Medium, path string) error {
|
||||
summary := workspace.Aggregate()
|
||||
content := core.JSONMarshalString(summary)
|
||||
if err := medium.Write(path, content); err != nil {
|
||||
return core.E("store.Export", "write json", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func exportJSONLines(workspace *Workspace, medium Medium, path string) error {
|
||||
result := workspace.Query("SELECT entry_kind, entry_data, created_at FROM workspace_entries ORDER BY entry_id")
|
||||
if !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
return core.E("store.Export", "query workspace", err)
|
||||
}
|
||||
rows, ok := result.Value.([]map[string]any)
|
||||
if !ok {
|
||||
rows = nil
|
||||
}
|
||||
|
||||
builder := core.NewBuilder()
|
||||
for _, row := range rows {
|
||||
line := core.JSONMarshalString(row)
|
||||
builder.WriteString(line)
|
||||
builder.WriteString("\n")
|
||||
}
|
||||
if err := medium.Write(path, builder.String()); err != nil {
|
||||
return core.E("store.Export", "write jsonl", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func exportCSV(workspace *Workspace, medium Medium, path string) error {
|
||||
result := workspace.Query("SELECT entry_kind, entry_data, created_at FROM workspace_entries ORDER BY entry_id")
|
||||
if !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
return core.E("store.Export", "query workspace", err)
|
||||
}
|
||||
rows, ok := result.Value.([]map[string]any)
|
||||
if !ok {
|
||||
rows = nil
|
||||
}
|
||||
|
||||
builder := core.NewBuilder()
|
||||
builder.WriteString("entry_kind,entry_data,created_at\n")
|
||||
for _, row := range rows {
|
||||
builder.WriteString(csvField(core.Sprint(row["entry_kind"])))
|
||||
builder.WriteString(",")
|
||||
builder.WriteString(csvField(core.Sprint(row["entry_data"])))
|
||||
builder.WriteString(",")
|
||||
builder.WriteString(csvField(core.Sprint(row["created_at"])))
|
||||
builder.WriteString("\n")
|
||||
}
|
||||
if err := medium.Write(path, builder.String()); err != nil {
|
||||
return core.E("store.Export", "write csv", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// trimTrailingCarriageReturn drops every '\r' at the end of value, leaving
// other characters (including interior carriage returns) untouched.
func trimTrailingCarriageReturn(value string) string {
	end := len(value)
	for end > 0 && value[end-1] == '\r' {
		end--
	}
	return value[:end]
}
|
||||
|
||||
func csvField(value string) string {
|
||||
needsQuote := false
|
||||
for index := 0; index < len(value); index++ {
|
||||
switch value[index] {
|
||||
case ',', '"', '\n', '\r':
|
||||
needsQuote = true
|
||||
}
|
||||
if needsQuote {
|
||||
break
|
||||
}
|
||||
}
|
||||
if !needsQuote {
|
||||
return value
|
||||
}
|
||||
escaped := core.Replace(value, `"`, `""`)
|
||||
return core.Concat(`"`, escaped, `"`)
|
||||
}
|
||||
321
medium_test.go
Normal file
321
medium_test.go
Normal file
|
|
@ -0,0 +1,321 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
goio "io"
|
||||
"io/fs"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// memoryMedium is an in-memory implementation of `store.Medium` used by the
|
||||
// medium tests so assertions do not depend on the local filesystem.
|
||||
type memoryMedium struct {
|
||||
lock sync.Mutex
|
||||
files map[string]string
|
||||
}
|
||||
|
||||
func newMemoryMedium() *memoryMedium {
|
||||
return &memoryMedium{files: make(map[string]string)}
|
||||
}
|
||||
|
||||
func (medium *memoryMedium) Read(path string) (string, error) {
|
||||
medium.lock.Lock()
|
||||
defer medium.lock.Unlock()
|
||||
content, ok := medium.files[path]
|
||||
if !ok {
|
||||
return "", core.E("memoryMedium.Read", "file not found: "+path, nil)
|
||||
}
|
||||
return content, nil
|
||||
}
|
||||
|
||||
func (medium *memoryMedium) Write(path, content string) error {
|
||||
medium.lock.Lock()
|
||||
defer medium.lock.Unlock()
|
||||
medium.files[path] = content
|
||||
return nil
|
||||
}
|
||||
|
||||
func (medium *memoryMedium) EnsureDir(string) error { return nil }
|
||||
|
||||
func (medium *memoryMedium) Create(path string) (goio.WriteCloser, error) {
|
||||
return &memoryWriter{medium: medium, path: path}, nil
|
||||
}
|
||||
|
||||
func (medium *memoryMedium) Exists(path string) bool {
|
||||
medium.lock.Lock()
|
||||
defer medium.lock.Unlock()
|
||||
_, ok := medium.files[path]
|
||||
return ok
|
||||
}
|
||||
|
||||
type memoryWriter struct {
|
||||
medium *memoryMedium
|
||||
path string
|
||||
buffer bytes.Buffer
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (writer *memoryWriter) Write(data []byte) (int, error) {
|
||||
return writer.buffer.Write(data)
|
||||
}
|
||||
|
||||
func (writer *memoryWriter) Close() error {
|
||||
if writer.closed {
|
||||
return nil
|
||||
}
|
||||
writer.closed = true
|
||||
return writer.medium.Write(writer.path, writer.buffer.String())
|
||||
}
|
||||
|
||||
// Ensure memoryMedium still satisfies the internal Medium contract.
|
||||
var _ Medium = (*memoryMedium)(nil)
|
||||
|
||||
// Compile-time check for fs.FileInfo usage in the tests.
var _ fs.FileInfo = (*FileInfoStub)(nil)

// FileInfoStub is a zero-valued fs.FileInfo implementation that exists only
// to back the compile-time interface check above.
type FileInfoStub struct{}

func (FileInfoStub) Name() string       { return "" }
func (FileInfoStub) Size() int64        { return 0 }
func (FileInfoStub) Mode() fs.FileMode  { return 0 }
func (FileInfoStub) ModTime() time.Time { return time.Time{} }
func (FileInfoStub) IsDir() bool        { return false }
func (FileInfoStub) Sys() any           { return nil }
|
||||
|
||||
// TestMedium_WithMedium_Good verifies that a medium installed via WithMedium
// round-trips through both the Medium() accessor and Config().
func TestMedium_WithMedium_Good(t *testing.T) {
	useWorkspaceStateDirectory(t)

	medium := newMemoryMedium()
	storeInstance, err := New(":memory:", WithMedium(medium))
	require.NoError(t, err)
	defer storeInstance.Close()

	assert.Same(t, medium, storeInstance.Medium(), "medium should round-trip via accessor")
	assert.Same(t, medium, storeInstance.Config().Medium, "medium should appear in Config()")
}

// TestMedium_WithMedium_Bad_NilKeepsFilesystemBackend verifies that a store
// built without WithMedium reports a nil medium (filesystem fallback).
func TestMedium_WithMedium_Bad_NilKeepsFilesystemBackend(t *testing.T) {
	useWorkspaceStateDirectory(t)

	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()

	assert.Nil(t, storeInstance.Medium())
}

// TestMedium_Import_Good_JSONL imports a two-line JSONL payload and checks
// both rows land in the workspace with the kind derived from the file name.
func TestMedium_Import_Good_JSONL(t *testing.T) {
	useWorkspaceStateDirectory(t)

	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()

	workspace, err := storeInstance.NewWorkspace("medium-import-jsonl")
	require.NoError(t, err)
	defer workspace.Discard()

	medium := newMemoryMedium()
	require.NoError(t, medium.Write("data.jsonl", `{"user":"@alice"}
{"user":"@bob"}
`))

	require.NoError(t, Import(workspace, medium, "data.jsonl"))

	rows := requireResultRows(t, workspace.Query("SELECT entry_kind, entry_data FROM workspace_entries ORDER BY entry_id"))
	require.Len(t, rows, 2)
	// Kind "data" comes from the file's base name with extension stripped.
	assert.Equal(t, "data", rows[0]["entry_kind"])
	assert.Contains(t, rows[0]["entry_data"], "@alice")
	assert.Contains(t, rows[1]["entry_data"], "@bob")
}

// TestMedium_Import_Good_JSONArray imports a top-level JSON array and checks
// the aggregate counts all three records under the "users" kind.
func TestMedium_Import_Good_JSONArray(t *testing.T) {
	useWorkspaceStateDirectory(t)

	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()

	workspace, err := storeInstance.NewWorkspace("medium-import-json-array")
	require.NoError(t, err)
	defer workspace.Discard()

	medium := newMemoryMedium()
	require.NoError(t, medium.Write("users.json", `[{"name":"Alice"},{"name":"Bob"},{"name":"Carol"}]`))

	require.NoError(t, Import(workspace, medium, "users.json"))

	assert.Equal(t, map[string]any{"users": 3}, workspace.Aggregate())
}

// TestMedium_Import_Good_CSV imports a CSV with a header row and checks the
// two data rows are counted under the "findings" kind.
func TestMedium_Import_Good_CSV(t *testing.T) {
	useWorkspaceStateDirectory(t)

	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()

	workspace, err := storeInstance.NewWorkspace("medium-import-csv")
	require.NoError(t, err)
	defer workspace.Discard()

	medium := newMemoryMedium()
	require.NoError(t, medium.Write("findings.csv", "tool,severity\ngosec,high\ngolint,low\n"))

	require.NoError(t, Import(workspace, medium, "findings.csv"))

	assert.Equal(t, map[string]any{"findings": 2}, workspace.Aggregate())
}

// TestMedium_Import_Bad_NilArguments checks argument validation: a nil
// workspace, a nil medium, and an empty path must each return an error.
func TestMedium_Import_Bad_NilArguments(t *testing.T) {
	useWorkspaceStateDirectory(t)

	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()

	workspace, err := storeInstance.NewWorkspace("medium-import-bad")
	require.NoError(t, err)
	defer workspace.Discard()

	medium := newMemoryMedium()

	require.Error(t, Import(nil, medium, "data.json"))
	require.Error(t, Import(workspace, nil, "data.json"))
	require.Error(t, Import(workspace, medium, ""))
}
|
||||
|
||||
// TestMedium_Import_Ugly_MissingFileReturnsError checks that importing a path
// the medium has never seen surfaces the medium's read error.
func TestMedium_Import_Ugly_MissingFileReturnsError(t *testing.T) {
	useWorkspaceStateDirectory(t)

	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()

	workspace, err := storeInstance.NewWorkspace("medium-import-missing")
	require.NoError(t, err)
	defer workspace.Discard()

	medium := newMemoryMedium()
	require.Error(t, Import(workspace, medium, "ghost.jsonl"))
}

// TestMedium_Export_Good_JSON exports the aggregate summary as JSON and
// checks the per-kind counts appear in the written payload.
func TestMedium_Export_Good_JSON(t *testing.T) {
	useWorkspaceStateDirectory(t)

	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()

	workspace, err := storeInstance.NewWorkspace("medium-export-json")
	require.NoError(t, err)
	defer workspace.Discard()

	require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"}))
	require.NoError(t, workspace.Put("like", map[string]any{"user": "@bob"}))
	require.NoError(t, workspace.Put("profile_match", map[string]any{"user": "@carol"}))

	medium := newMemoryMedium()
	require.NoError(t, Export(workspace, medium, "report.json"))

	assert.True(t, medium.Exists("report.json"))
	content, err := medium.Read("report.json")
	require.NoError(t, err)
	assert.Contains(t, content, `"like":2`)
	assert.Contains(t, content, `"profile_match":1`)
}

// TestMedium_Export_Good_JSONLines exports as JSONL and checks the output
// contains one non-empty line per workspace entry.
func TestMedium_Export_Good_JSONLines(t *testing.T) {
	useWorkspaceStateDirectory(t)

	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()

	workspace, err := storeInstance.NewWorkspace("medium-export-jsonl")
	require.NoError(t, err)
	defer workspace.Discard()

	require.NoError(t, workspace.Put("like", map[string]any{"user": "@alice"}))
	require.NoError(t, workspace.Put("like", map[string]any{"user": "@bob"}))

	medium := newMemoryMedium()
	require.NoError(t, Export(workspace, medium, "report.jsonl"))

	content, err := medium.Read("report.jsonl")
	require.NoError(t, err)
	lines := 0
	for _, line := range splitNewlines(content) {
		if line != "" {
			lines++
		}
	}
	assert.Equal(t, 2, lines)
}

// TestMedium_Export_Bad_NilArguments checks argument validation: a nil
// workspace, a nil medium, and an empty path must each return an error.
func TestMedium_Export_Bad_NilArguments(t *testing.T) {
	useWorkspaceStateDirectory(t)

	storeInstance, err := New(":memory:")
	require.NoError(t, err)
	defer storeInstance.Close()

	workspace, err := storeInstance.NewWorkspace("medium-export-bad")
	require.NoError(t, err)
	defer workspace.Discard()

	medium := newMemoryMedium()

	require.Error(t, Export(nil, medium, "report.json"))
	require.Error(t, Export(workspace, nil, "report.json"))
	require.Error(t, Export(workspace, medium, ""))
}

// TestMedium_Compact_Good_MediumRoutesArchive verifies Compact writes its
// archive through the store-level medium installed via WithMedium, rather
// than the local filesystem, and returns the in-medium output path.
func TestMedium_Compact_Good_MediumRoutesArchive(t *testing.T) {
	useWorkspaceStateDirectory(t)
	useArchiveOutputDirectory(t)

	medium := newMemoryMedium()
	storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"), WithMedium(medium))
	require.NoError(t, err)
	defer storeInstance.Close()

	require.True(t, storeInstance.CommitToJournal("jobs", map[string]any{"count": 3}, map[string]string{"workspace": "jobs-1"}).OK)

	// Before is in the future so the freshly committed row is compacted.
	result := storeInstance.Compact(CompactOptions{
		Before: time.Now().Add(time.Minute),
		Output: "archive/",
		Format: "gzip",
	})
	require.True(t, result.OK, "compact result: %v", result.Value)
	outputPath, ok := result.Value.(string)
	require.True(t, ok)
	require.NotEmpty(t, outputPath)
	assert.True(t, medium.Exists(outputPath), "compact should write through medium at %s", outputPath)
}
|
||||
|
||||
func splitNewlines(content string) []string {
|
||||
var result []string
|
||||
current := core.NewBuilder()
|
||||
for index := 0; index < len(content); index++ {
|
||||
character := content[index]
|
||||
if character == '\n' {
|
||||
result = append(result, current.String())
|
||||
current.Reset()
|
||||
continue
|
||||
}
|
||||
current.WriteByte(character)
|
||||
}
|
||||
if current.Len() > 0 {
|
||||
result = append(result, current.String())
|
||||
}
|
||||
return result
|
||||
}
|
||||
8
store.go
8
store.go
|
|
@ -45,6 +45,11 @@ type StoreConfig struct {
|
|||
PurgeInterval time.Duration
|
||||
// Usage example: `config := store.StoreConfig{WorkspaceStateDirectory: "/tmp/core-state"}`
|
||||
WorkspaceStateDirectory string
|
||||
// Usage example: `medium, _ := local.New("/srv/core"); config := store.StoreConfig{DatabasePath: ":memory:", Medium: medium}`
|
||||
// Medium overrides the raw filesystem for Compact archives and Import /
|
||||
// Export helpers, letting tests and production swap the backing transport
|
||||
// (memory, S3, cube) without touching the store API.
|
||||
Medium Medium
|
||||
}
|
||||
|
||||
// Usage example: `config := (store.StoreConfig{DatabasePath: ":memory:"}).Normalised(); fmt.Println(config.PurgeInterval, config.WorkspaceStateDirectory)`
|
||||
|
|
@ -139,6 +144,7 @@ type Store struct {
|
|||
purgeWaitGroup sync.WaitGroup
|
||||
purgeInterval time.Duration // interval between background purge cycles
|
||||
journalConfiguration JournalConfiguration
|
||||
medium Medium
|
||||
lifecycleLock sync.Mutex
|
||||
isClosed bool
|
||||
|
||||
|
|
@ -223,6 +229,7 @@ func (storeInstance *Store) Config() StoreConfig {
|
|||
Journal: storeInstance.JournalConfiguration(),
|
||||
PurgeInterval: storeInstance.purgeInterval,
|
||||
WorkspaceStateDirectory: storeInstance.WorkspaceStateDirectory(),
|
||||
Medium: storeInstance.medium,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -289,6 +296,7 @@ func openConfiguredStore(operation string, storeConfig StoreConfig) (*Store, err
|
|||
}
|
||||
storeInstance.purgeInterval = storeConfig.PurgeInterval
|
||||
storeInstance.workspaceStateDirectory = storeConfig.WorkspaceStateDirectory
|
||||
storeInstance.medium = storeConfig.Medium
|
||||
|
||||
// New() performs a non-destructive orphan scan so callers can discover
|
||||
// leftover workspaces via RecoverOrphans().
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue