// SPDX-License-Identifier: EUPL-1.2

package store

import (
	"bytes"

	core "dappco.re/go/core"
	"dappco.re/go/core/io"
)

// Medium is the minimal storage transport used by the go-store workspace
// import and export helpers and by Compact when writing cold archives.
//
// This is an alias of `dappco.re/go/core/io.Medium`, so callers can pass any
// upstream medium implementation directly without an adapter.
//
// Usage example: `medium, _ := local.New("/tmp/exports"); storeInstance, err := store.New(":memory:", store.WithMedium(medium))`
type Medium = io.Medium

// WithMedium installs an io.Medium-compatible transport on the Store so that
// Compact archives and Import/Export helpers route through the medium instead
// of the raw filesystem.
//
// Usage example: `medium, _ := local.New("/tmp/exports"); storeInstance, err := store.New(":memory:", store.WithMedium(medium))`
func WithMedium(medium Medium) StoreOption {
	return func(storeInstance *Store) {
		// Applying an option to a nil store is a silent no-op rather than a panic.
		if storeInstance == nil {
			return
		}
		storeInstance.medium = medium
	}
}

// Medium returns the transport installed via WithMedium, or nil when no
// medium is configured or the receiver itself is nil.
//
// Usage example: `medium := storeInstance.Medium(); if medium != nil { _ = medium.EnsureDir("exports") }`
func (storeInstance *Store) Medium() Medium {
	if storeInstance == nil {
		return nil
	}
	return storeInstance.medium
}

// Usage example: `err := store.Import(workspace, medium, "dataset.jsonl")`
|
|
// Import reads a JSON, JSONL, or CSV payload from the provided medium and
|
|
// appends each record to the workspace buffer as a `Put` entry. Format is
|
|
// chosen from the file extension: `.json` expects either a top-level array or
|
|
// `{"entries":[...]}` shape, `.jsonl`/`.ndjson` parse line-by-line, and `.csv`
|
|
// uses the first row as the header.
|
|
func Import(workspace *Workspace, medium Medium, path string) error {
|
|
if workspace == nil {
|
|
return core.E("store.Import", "workspace is nil", nil)
|
|
}
|
|
if medium == nil {
|
|
return core.E("store.Import", "medium is nil", nil)
|
|
}
|
|
if path == "" {
|
|
return core.E("store.Import", "path is empty", nil)
|
|
}
|
|
|
|
content, err := medium.Read(path)
|
|
if err != nil {
|
|
return core.E("store.Import", "read from medium", err)
|
|
}
|
|
|
|
kind := importEntryKind(path)
|
|
switch lowercaseText(importExtension(path)) {
|
|
case ".jsonl", ".ndjson":
|
|
return importJSONLines(workspace, kind, content)
|
|
case ".csv":
|
|
return importCSV(workspace, kind, content)
|
|
case ".json":
|
|
return importJSON(workspace, kind, content)
|
|
default:
|
|
return importJSONLines(workspace, kind, content)
|
|
}
|
|
}
|
|
|
|
// Usage example: `err := store.Export(workspace, medium, "report.json")`
|
|
// Export writes the workspace aggregate summary to the medium at the given
|
|
// path. Format is chosen from the extension: `.jsonl` writes one record per
|
|
// query row, `.csv` writes header + rows, everything else writes the
|
|
// aggregate as JSON.
|
|
func Export(workspace *Workspace, medium Medium, path string) error {
|
|
if workspace == nil {
|
|
return core.E("store.Export", "workspace is nil", nil)
|
|
}
|
|
if medium == nil {
|
|
return core.E("store.Export", "medium is nil", nil)
|
|
}
|
|
if path == "" {
|
|
return core.E("store.Export", "path is empty", nil)
|
|
}
|
|
|
|
if err := ensureMediumDir(medium, core.PathDir(path)); err != nil {
|
|
return core.E("store.Export", "ensure directory", err)
|
|
}
|
|
|
|
switch lowercaseText(importExtension(path)) {
|
|
case ".jsonl", ".ndjson":
|
|
return exportJSONLines(workspace, medium, path)
|
|
case ".csv":
|
|
return exportCSV(workspace, medium, path)
|
|
default:
|
|
return exportJSON(workspace, medium, path)
|
|
}
|
|
}
|
|
|
|
func ensureMediumDir(medium Medium, directory string) error {
|
|
if directory == "" || directory == "." || directory == "/" {
|
|
return nil
|
|
}
|
|
if err := medium.EnsureDir(directory); err != nil {
|
|
return core.E("store.ensureMediumDir", "ensure directory", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func importExtension(path string) string {
|
|
base := core.PathBase(path)
|
|
for i := len(base) - 1; i >= 0; i-- {
|
|
if base[i] == '.' {
|
|
return base[i:]
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func importEntryKind(path string) string {
|
|
base := core.PathBase(path)
|
|
for i := len(base) - 1; i >= 0; i-- {
|
|
if base[i] == '.' {
|
|
base = base[:i]
|
|
break
|
|
}
|
|
}
|
|
if base == "" {
|
|
return "entry"
|
|
}
|
|
return base
|
|
}
|
|
|
|
func importJSONLines(workspace *Workspace, kind, content string) error {
|
|
scanner := core.Split(content, "\n")
|
|
for _, rawLine := range scanner {
|
|
line := core.Trim(rawLine)
|
|
if line == "" {
|
|
continue
|
|
}
|
|
record := map[string]any{}
|
|
if result := core.JSONUnmarshalString(line, &record); !result.OK {
|
|
err, _ := result.Value.(error)
|
|
return core.E("store.Import", "parse jsonl line", err)
|
|
}
|
|
if err := workspace.Put(kind, record); err != nil {
|
|
return core.E("store.Import", "put jsonl record", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func importJSON(workspace *Workspace, kind, content string) error {
|
|
trimmed := core.Trim(content)
|
|
if trimmed == "" {
|
|
return nil
|
|
}
|
|
|
|
var topLevel any
|
|
if result := core.JSONUnmarshalString(trimmed, &topLevel); !result.OK {
|
|
err, _ := result.Value.(error)
|
|
return core.E("store.Import", "parse json", err)
|
|
}
|
|
|
|
records := collectJSONRecords(topLevel)
|
|
for _, record := range records {
|
|
if err := workspace.Put(kind, record); err != nil {
|
|
return core.E("store.Import", "put json record", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// collectJSONRecords normalizes a decoded JSON value into a flat record list.
// A top-level array keeps only its object elements; an object is unwrapped
// through the first matching "entries"/"records"/"data" array key (in that
// priority order) or treated as a single record; anything else yields nil.
func collectJSONRecords(value any) []map[string]any {
	if list, ok := value.([]any); ok {
		records := make([]map[string]any, 0, len(list))
		for _, element := range list {
			if record, ok := element.(map[string]any); ok {
				records = append(records, record)
			}
		}
		return records
	}
	object, ok := value.(map[string]any)
	if !ok {
		return nil
	}
	for _, wrapper := range []string{"entries", "records", "data"} {
		if nested, ok := object[wrapper].([]any); ok {
			return collectJSONRecords(nested)
		}
	}
	return []map[string]any{object}
}

func importCSV(workspace *Workspace, kind, content string) error {
|
|
lines := core.Split(content, "\n")
|
|
if len(lines) == 0 {
|
|
return nil
|
|
}
|
|
header := splitCSVLine(lines[0])
|
|
if len(header) == 0 {
|
|
return nil
|
|
}
|
|
for _, rawLine := range lines[1:] {
|
|
line := trimTrailingCarriageReturn(rawLine)
|
|
if line == "" {
|
|
continue
|
|
}
|
|
fields := splitCSVLine(line)
|
|
record := make(map[string]any, len(header))
|
|
for columnIndex, columnName := range header {
|
|
if columnIndex < len(fields) {
|
|
record[columnName] = fields[columnIndex]
|
|
} else {
|
|
record[columnName] = ""
|
|
}
|
|
}
|
|
if err := workspace.Put(kind, record); err != nil {
|
|
return core.E("store.Import", "put csv record", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func splitCSVLine(line string) []string {
|
|
line = trimTrailingCarriageReturn(line)
|
|
var (
|
|
fields []string
|
|
buffer bytes.Buffer
|
|
inQuotes bool
|
|
wasEscaped bool
|
|
)
|
|
for index := 0; index < len(line); index++ {
|
|
character := line[index]
|
|
switch {
|
|
case character == '"' && inQuotes && index+1 < len(line) && line[index+1] == '"':
|
|
buffer.WriteByte('"')
|
|
index++
|
|
wasEscaped = true
|
|
case character == '"':
|
|
inQuotes = !inQuotes
|
|
case character == ',' && !inQuotes:
|
|
fields = append(fields, buffer.String())
|
|
buffer.Reset()
|
|
wasEscaped = false
|
|
default:
|
|
buffer.WriteByte(character)
|
|
}
|
|
}
|
|
fields = append(fields, buffer.String())
|
|
_ = wasEscaped
|
|
return fields
|
|
}
|
|
|
|
func exportJSON(workspace *Workspace, medium Medium, path string) error {
|
|
summary := workspace.Aggregate()
|
|
content := core.JSONMarshalString(summary)
|
|
if err := medium.Write(path, content); err != nil {
|
|
return core.E("store.Export", "write json", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func exportJSONLines(workspace *Workspace, medium Medium, path string) error {
|
|
result := workspace.Query("SELECT entry_kind, entry_data, created_at FROM workspace_entries ORDER BY entry_id")
|
|
if !result.OK {
|
|
err, _ := result.Value.(error)
|
|
return core.E("store.Export", "query workspace", err)
|
|
}
|
|
rows, ok := result.Value.([]map[string]any)
|
|
if !ok {
|
|
rows = nil
|
|
}
|
|
|
|
builder := core.NewBuilder()
|
|
for _, row := range rows {
|
|
line := core.JSONMarshalString(row)
|
|
builder.WriteString(line)
|
|
builder.WriteString("\n")
|
|
}
|
|
if err := medium.Write(path, builder.String()); err != nil {
|
|
return core.E("store.Export", "write jsonl", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func exportCSV(workspace *Workspace, medium Medium, path string) error {
|
|
result := workspace.Query("SELECT entry_kind, entry_data, created_at FROM workspace_entries ORDER BY entry_id")
|
|
if !result.OK {
|
|
err, _ := result.Value.(error)
|
|
return core.E("store.Export", "query workspace", err)
|
|
}
|
|
rows, ok := result.Value.([]map[string]any)
|
|
if !ok {
|
|
rows = nil
|
|
}
|
|
|
|
builder := core.NewBuilder()
|
|
builder.WriteString("entry_kind,entry_data,created_at\n")
|
|
for _, row := range rows {
|
|
builder.WriteString(csvField(core.Sprint(row["entry_kind"])))
|
|
builder.WriteString(",")
|
|
builder.WriteString(csvField(core.Sprint(row["entry_data"])))
|
|
builder.WriteString(",")
|
|
builder.WriteString(csvField(core.Sprint(row["created_at"])))
|
|
builder.WriteString("\n")
|
|
}
|
|
if err := medium.Write(path, builder.String()); err != nil {
|
|
return core.E("store.Export", "write csv", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// trimTrailingCarriageReturn strips any run of '\r' bytes from the end of
// value, normalizing CRLF line endings after content has been split on '\n'.
// Interior carriage returns are left untouched.
func trimTrailingCarriageReturn(value string) string {
	end := len(value)
	for end > 0 && value[end-1] == '\r' {
		end--
	}
	return value[:end]
}

func csvField(value string) string {
|
|
needsQuote := false
|
|
for index := 0; index < len(value); index++ {
|
|
switch value[index] {
|
|
case ',', '"', '\n', '\r':
|
|
needsQuote = true
|
|
}
|
|
if needsQuote {
|
|
break
|
|
}
|
|
}
|
|
if !needsQuote {
|
|
return value
|
|
}
|
|
escaped := core.Replace(value, `"`, `""`)
|
|
return core.Concat(`"`, escaped, `"`)
|
|
}
|