go-store/medium.go
Snider 9df2291d28
Some checks are pending
Security Scan / security (push) Waiting to run
Test / test (push) Waiting to run
Align store options with RFC
2026-04-15 11:11:46 +01:00

362 lines
9.9 KiB
Go

// SPDX-License-Identifier: EUPL-1.2
package store
import (
"bytes"
goio "io"
core "dappco.re/go/core"
"dappco.re/go/core/io"
)
// Medium is the minimal storage transport used by the go-store workspace
// import and export helpers and by Compact when writing cold archives.
//
// Any `dappco.re/go/core/io.Medium` implementation (local, memory, S3, cube,
// sftp) satisfies this interface by structural typing — go-store only needs a
// handful of methods to ferry bytes between the workspace buffer and the
// underlying medium.
//
// Usage example: `medium, _ := local.New("/tmp/exports"); storeInstance, err := store.New(":memory:", store.WithMedium(medium))`
type Medium interface {
	// Read returns the payload stored at path as a string (Import uses this).
	Read(path string) (string, error)
	// Write stores content at path (Export uses this).
	Write(path, content string) error
	// EnsureDir makes sure the directory at path exists before a write.
	EnsureDir(path string) error
	// Create opens a streaming writer at path — NOTE(review): not exercised in
	// this file; presumably used by Compact for cold archives, and the caller
	// is expected to Close it. Confirm against io.Medium.
	Create(path string) (goio.WriteCloser, error)
	// Exists reports whether anything is stored at path.
	Exists(path string) bool
}
// staticMediumCheck documents that `dappco.re/go/core/io.Medium` satisfies the
// in-package `store.Medium` interface — agents pass an `io.Medium` directly to
// `store.WithMedium` without an adapter. The `io.Medium(nil)` conversion makes
// the compiler verify the method sets line up at build time; the blank
// identifier keeps the check cost-free at runtime.
var _ Medium = io.Medium(nil)
// Usage example: `medium, _ := local.New("/srv/core"); storeInstance, err := store.NewConfigured(store.StoreConfig{DatabasePath: ":memory:", Medium: medium})`
// WithMedium installs an io.Medium-compatible transport on the Store so that
// Compact archives and Import/Export helpers route through the medium instead
// of the raw filesystem.
func WithMedium(medium Medium) StoreOption {
	return func(storeInstance *Store) {
		// Silently skip nil stores so a misconfigured option chain cannot panic.
		if storeInstance != nil {
			storeInstance.medium = medium
		}
	}
}
// Usage example: `medium := storeInstance.Medium(); if medium != nil { _ = medium.EnsureDir("exports") }`
// Medium returns the transport installed via WithMedium, or nil when none is
// configured (or the receiver itself is nil).
func (storeInstance *Store) Medium() Medium {
	if storeInstance != nil {
		return storeInstance.medium
	}
	return nil
}
// Usage example: `err := store.Import(workspace, medium, "dataset.jsonl")`
// Import reads a JSON, JSONL, or CSV payload from the provided medium and
// appends each record to the workspace buffer as a `Put` entry. The file
// extension selects the parser: `.json` expects either a top-level array or
// an `{"entries":[...]}` shape, `.csv` uses the first row as the header, and
// `.jsonl`/`.ndjson` — or any unrecognized extension — parse line-by-line.
func Import(workspace *Workspace, medium Medium, path string) error {
	switch {
	case workspace == nil:
		return core.E("store.Import", "workspace is nil", nil)
	case medium == nil:
		return core.E("store.Import", "medium is nil", nil)
	case path == "":
		return core.E("store.Import", "path is empty", nil)
	}
	content, err := medium.Read(path)
	if err != nil {
		return core.E("store.Import", "read from medium", err)
	}
	// Entry kind is derived from the filename stem (e.g. "dataset.jsonl" → "dataset").
	kind := importEntryKind(path)
	switch lowercaseText(importExtension(path)) {
	case ".json":
		return importJSON(workspace, kind, content)
	case ".csv":
		return importCSV(workspace, kind, content)
	default:
		// Covers ".jsonl", ".ndjson", and anything unrecognized.
		return importJSONLines(workspace, kind, content)
	}
}
// Usage example: `err := store.Export(workspace, medium, "report.json")`
// Export writes the workspace aggregate summary to the medium at the given
// path. The extension picks the format: `.jsonl`/`.ndjson` write one record
// per query row, `.csv` writes header + rows, and everything else writes the
// aggregate as a single JSON document.
func Export(workspace *Workspace, medium Medium, path string) error {
	switch {
	case workspace == nil:
		return core.E("store.Export", "workspace is nil", nil)
	case medium == nil:
		return core.E("store.Export", "medium is nil", nil)
	case path == "":
		return core.E("store.Export", "path is empty", nil)
	}
	// The parent directory must exist on the medium before the write lands.
	if err := ensureMediumDir(medium, core.PathDir(path)); err != nil {
		return core.E("store.Export", "ensure directory", err)
	}
	switch lowercaseText(importExtension(path)) {
	case ".jsonl", ".ndjson":
		return exportJSONLines(workspace, medium, path)
	case ".csv":
		return exportCSV(workspace, medium, path)
	default:
		return exportJSON(workspace, medium, path)
	}
}
// ensureMediumDir creates directory on the medium, treating the root-like
// values "", ".", and "/" as already present.
func ensureMediumDir(medium Medium, directory string) error {
	switch directory {
	case "", ".", "/":
		return nil
	}
	err := medium.EnsureDir(directory)
	if err == nil {
		return nil
	}
	return core.E("store.ensureMediumDir", "ensure directory", err)
}
// importExtension returns the dotted extension of path's base name (including
// the dot), or "" when the base name has no dot.
func importExtension(path string) string {
	base := core.PathBase(path)
	lastDot := -1
	for position := 0; position < len(base); position++ {
		if base[position] == '.' {
			lastDot = position
		}
	}
	if lastDot < 0 {
		return ""
	}
	return base[lastDot:]
}
// importEntryKind derives the workspace entry kind from path: the base name
// with its final extension stripped, falling back to "entry" when nothing
// remains (e.g. a path like ".json").
func importEntryKind(path string) string {
	name := core.PathBase(path)
	lastDot := -1
	for position := 0; position < len(name); position++ {
		if name[position] == '.' {
			lastDot = position
		}
	}
	if lastDot >= 0 {
		name = name[:lastDot]
	}
	if name == "" {
		return "entry"
	}
	return name
}
// importJSONLines parses content one newline-delimited JSON object at a time
// and appends each as a Put entry of the given kind. Blank lines are skipped;
// the first malformed line or failed Put aborts the import.
func importJSONLines(workspace *Workspace, kind, content string) error {
	for _, rawLine := range core.Split(content, "\n") {
		trimmedLine := core.Trim(rawLine)
		if trimmedLine == "" {
			continue
		}
		record := map[string]any{}
		parsed := core.JSONUnmarshalString(trimmedLine, &record)
		if !parsed.OK {
			parseErr, _ := parsed.Value.(error)
			return core.E("store.Import", "parse jsonl line", parseErr)
		}
		if putErr := workspace.Put(kind, record); putErr != nil {
			return core.E("store.Import", "put jsonl record", putErr)
		}
	}
	return nil
}
// importJSON parses content as a single JSON document, extracts its object
// records via collectJSONRecords, and appends each as a Put entry of the
// given kind. Empty (whitespace-only) content is a no-op.
func importJSON(workspace *Workspace, kind, content string) error {
	body := core.Trim(content)
	if body == "" {
		return nil
	}
	var decoded any
	parsed := core.JSONUnmarshalString(body, &decoded)
	if !parsed.OK {
		parseErr, _ := parsed.Value.(error)
		return core.E("store.Import", "parse json", parseErr)
	}
	for _, record := range collectJSONRecords(decoded) {
		if putErr := workspace.Put(kind, record); putErr != nil {
			return core.E("store.Import", "put json record", putErr)
		}
	}
	return nil
}
// collectJSONRecords flattens a decoded JSON value into object records: a
// top-level array yields its object elements (non-objects are dropped), an
// object with an "entries", "records", or "data" array recurses into that
// array (checked in that order), any other object is returned as the single
// record, and every other shape yields nil.
func collectJSONRecords(value any) []map[string]any {
	if list, isList := value.([]any); isList {
		records := make([]map[string]any, 0, len(list))
		for _, item := range list {
			if record, isRecord := item.(map[string]any); isRecord {
				records = append(records, record)
			}
		}
		return records
	}
	object, isObject := value.(map[string]any)
	if !isObject {
		return nil
	}
	for _, wrapperKey := range []string{"entries", "records", "data"} {
		if nested, isArray := object[wrapperKey].([]any); isArray {
			return collectJSONRecords(nested)
		}
	}
	return []map[string]any{object}
}
// importCSV parses content as CSV with the first row as the header and
// appends each subsequent row as a Put entry of the given kind. Rows shorter
// than the header pad missing columns with ""; extra trailing fields are
// ignored. Blank rows are skipped.
func importCSV(workspace *Workspace, kind, content string) error {
	rows := core.Split(content, "\n")
	if len(rows) == 0 {
		return nil
	}
	columns := splitCSVLine(rows[0])
	if len(columns) == 0 {
		return nil
	}
	for _, rawRow := range rows[1:] {
		row := trimTrailingCarriageReturn(rawRow)
		if row == "" {
			continue
		}
		values := splitCSVLine(row)
		record := make(map[string]any, len(columns))
		for position, column := range columns {
			// Default to "" so short rows still produce every column.
			cell := ""
			if position < len(values) {
				cell = values[position]
			}
			record[column] = cell
		}
		if putErr := workspace.Put(kind, record); putErr != nil {
			return core.E("store.Import", "put csv record", putErr)
		}
	}
	return nil
}
// splitCSVLine splits one CSV row into its fields, honouring double-quoted
// fields and the RFC 4180 `""` escape for a literal quote inside them. A
// trailing carriage return is stripped first so CRLF input behaves like LF.
// An empty line yields a single empty field, matching the previous behavior.
//
// Fix: dropped the `wasEscaped` flag — it was written on two branches but
// never read (only kept alive by a blank assignment), i.e. dead state.
func splitCSVLine(line string) []string {
	line = trimTrailingCarriageReturn(line)
	var (
		fields   []string
		buffer   bytes.Buffer
		inQuotes bool
	)
	for index := 0; index < len(line); index++ {
		character := line[index]
		switch {
		case character == '"' && inQuotes && index+1 < len(line) && line[index+1] == '"':
			// Escaped quote inside a quoted field: emit one `"`, skip the pair.
			buffer.WriteByte('"')
			index++
		case character == '"':
			inQuotes = !inQuotes
		case character == ',' && !inQuotes:
			fields = append(fields, buffer.String())
			buffer.Reset()
		default:
			buffer.WriteByte(character)
		}
	}
	// The final field has no trailing comma; flush whatever accumulated.
	fields = append(fields, buffer.String())
	return fields
}
// exportJSON writes the workspace aggregate summary to path as one JSON
// document.
func exportJSON(workspace *Workspace, medium Medium, path string) error {
	payload := core.JSONMarshalString(workspace.Aggregate())
	if writeErr := medium.Write(path, payload); writeErr != nil {
		return core.E("store.Export", "write json", writeErr)
	}
	return nil
}
// exportJSONLines writes every workspace entry row to path as one JSON object
// per line, ordered by entry_id.
func exportJSONLines(workspace *Workspace, medium Medium, path string) error {
	queried := workspace.Query("SELECT entry_kind, entry_data, created_at FROM workspace_entries ORDER BY entry_id")
	if !queried.OK {
		queryErr, _ := queried.Value.(error)
		return core.E("store.Export", "query workspace", queryErr)
	}
	// A failed assertion leaves rows nil, which the loop handles naturally.
	rows, _ := queried.Value.([]map[string]any)
	output := core.NewBuilder()
	for _, row := range rows {
		output.WriteString(core.JSONMarshalString(row))
		output.WriteString("\n")
	}
	if writeErr := medium.Write(path, output.String()); writeErr != nil {
		return core.E("store.Export", "write jsonl", writeErr)
	}
	return nil
}
// exportCSV writes every workspace entry row to path as CSV with a fixed
// entry_kind,entry_data,created_at header, ordered by entry_id. Each cell is
// quoted via csvField when it needs escaping.
func exportCSV(workspace *Workspace, medium Medium, path string) error {
	queried := workspace.Query("SELECT entry_kind, entry_data, created_at FROM workspace_entries ORDER BY entry_id")
	if !queried.OK {
		queryErr, _ := queried.Value.(error)
		return core.E("store.Export", "query workspace", queryErr)
	}
	// A failed assertion leaves rows nil, which the loop handles naturally.
	rows, _ := queried.Value.([]map[string]any)
	output := core.NewBuilder()
	output.WriteString("entry_kind,entry_data,created_at\n")
	for _, row := range rows {
		for position, column := range []string{"entry_kind", "entry_data", "created_at"} {
			if position > 0 {
				output.WriteString(",")
			}
			output.WriteString(csvField(core.Sprint(row[column])))
		}
		output.WriteString("\n")
	}
	if writeErr := medium.Write(path, output.String()); writeErr != nil {
		return core.E("store.Export", "write csv", writeErr)
	}
	return nil
}
// trimTrailingCarriageReturn strips every trailing '\r' from value so that
// CRLF-delimited input behaves like plain LF input. Interior carriage
// returns are preserved.
func trimTrailingCarriageReturn(value string) string {
	end := len(value)
	for end > 0 && value[end-1] == '\r' {
		end--
	}
	return value[:end]
}
// csvField returns value ready for a CSV cell: unchanged when it contains no
// special byte, otherwise wrapped in double quotes with embedded quotes
// doubled (RFC 4180 style).
func csvField(value string) string {
	for index := 0; index < len(value); index++ {
		switch value[index] {
		case ',', '"', '\n', '\r':
			// First special byte found — quote the whole field.
			escaped := core.Replace(value, `"`, `""`)
			return core.Concat(`"`, escaped, `"`)
		}
	}
	return value
}