fix(store): r2 — address residual CodeRabbit findings on PR #4
Round 2 follow-up to 6c90af8. CodeRabbit re-reviewed and surfaced
14 residual issues; all dispositioned.
Code:
- compact.go: staged archive preserved after successful DB commit
(was being deleted prematurely)
- workspace.go: commit idempotency — recovery skips/removes leftover
files when durable summary marker exists; cleanup failure no longer
fails Commit() after durable write
- medium.go: StoreConfig public example; JSON import fails fast on
unsupported/non-object records; CSV parser switched from hand-roll
to encoding/csv with multiline + malformed handling
- import.go: removed /tmp seed fallbacks (deterministic dirs); read +
JSON parse failures now return contextual errors
- publish.go: HuggingFace token uses real HOME via core.Env (not
DIR_HOME); empty Repo validated before dry-run; upload uses
caller-configurable PublishConfig.Context (no fixed http timeout)
- store.go: Close() backfills db/sqliteDatabase aliases before
closing/syncing
- test_asserts_test.go: errIs delegates to core.Is (AX import rules)
CI / docs:
- .github/workflows/ci.yml: CGO_ENABLED=1 explicit (DuckDB requires CGO)
- DEPENDENCIES.md: required toolchain documented for DuckDB context
- README.md: Licence badge UK English + LICENCE.md link
- LICENCE.md (new file)
- publish_test.go (new) — covers HOME / dry-run / config-context paths
Disposition replies:
- Testify reintroduction suggestion: RESOLVED-COMMENT — AX-6 bans testify
- SonarCloud: no PR comments/check annotations exposed; RESOLVED-COMMENT
Verification: gofmt clean, golangci-lint run 0 issues, GOWORK=off
go vet + go test -count=1 ./... pass with explicit cache paths.
Closes residual findings on https://github.com/dAppCore/go-store/pull/4
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
parent
6c90af807d
commit
ebe5377871
15 changed files with 273 additions and 89 deletions
3
.github/workflows/ci.yml
vendored
3
.github/workflows/ci.yml
vendored
|
|
@ -1,5 +1,8 @@
|
|||
name: CI
|
||||
|
||||
env:
|
||||
CGO_ENABLED: "1"
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main, dev]
|
||||
|
|
|
|||
|
|
@ -16,4 +16,6 @@ workspace recovery behaviour rather than preserving the feature.
|
|||
|
||||
This is a CGO and MIT-licensed dependency exception. It must not be used for the
|
||||
primary SQLite store path, and new runtime storage features should continue to
|
||||
use pure-Go dependencies compatible with EUPL-1.2.
|
||||
use pure-Go dependencies compatible with EUPL-1.2. Builds and CI that include
|
||||
workspace, import, inventory, or scoring behaviour must run with
|
||||
`CGO_ENABLED=1` and a C/C++ toolchain available.
|
||||
|
|
|
|||
6
LICENCE.md
Normal file
6
LICENCE.md
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
# Licence
|
||||
|
||||
This project is licensed under the European Union Public Licence, version 1.2
|
||||
(EUPL-1.2).
|
||||
|
||||
Full licence text: https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
[](https://pkg.go.dev/dappco.re/go/store)
|
||||
[](LICENSE.md)
|
||||
[](LICENCE.md)
|
||||
[](go.mod)
|
||||
|
||||
# go-store
|
||||
|
|
|
|||
|
|
@ -217,11 +217,11 @@ func (storeInstance *Store) Compact(options CompactOptions) core.Result {
|
|||
return core.Result{Value: core.E("store.Compact", "commit archive transaction", err), OK: false}
|
||||
}
|
||||
committed = true
|
||||
stagedOutputPublished = true
|
||||
|
||||
if err := medium.Rename(stagedOutputPath, outputPath); err != nil {
|
||||
return core.Result{Value: core.E("store.Compact", "publish staged archive", err), OK: false}
|
||||
}
|
||||
stagedOutputPublished = true
|
||||
|
||||
return core.Result{Value: outputPath, OK: true}
|
||||
}
|
||||
|
|
|
|||
11
import.go
11
import.go
|
|
@ -309,7 +309,7 @@ func ImportAll(db *DuckDB, cfg ImportConfig, w io.Writer) error {
|
|||
}
|
||||
|
||||
seedTotal := 0
|
||||
seedDirs := []string{core.JoinPath(cfg.DataDir, "seeds"), "/tmp/lem-data/seeds", "/tmp/lem-repo/seeds"}
|
||||
seedDirs := []string{core.JoinPath(cfg.DataDir, "seeds")}
|
||||
for _, seedDir := range seedDirs {
|
||||
if !isDir(seedDir) {
|
||||
continue
|
||||
|
|
@ -476,19 +476,22 @@ func importSeeds(db *DuckDB, seedDir string) (int, error) {
|
|||
return
|
||||
}
|
||||
|
||||
rel := core.TrimPrefix(path, seedDir+"/")
|
||||
region := core.TrimSuffix(core.PathBase(path), ".json")
|
||||
|
||||
readResult := localFs.Read(path)
|
||||
if !readResult.OK {
|
||||
firstErr = core.E("store.importSeeds", core.Sprintf("read seed file %s", rel), readResult.Value.(error))
|
||||
return
|
||||
}
|
||||
data := []byte(readResult.Value.(string))
|
||||
|
||||
rel := core.TrimPrefix(path, seedDir+"/")
|
||||
region := core.TrimSuffix(core.PathBase(path), ".json")
|
||||
|
||||
// Try parsing as array or object with prompts/seeds field.
|
||||
var seedsList []any
|
||||
var raw any
|
||||
if r := core.JSONUnmarshal(data, &raw); !r.OK {
|
||||
err, _ := r.Value.(error)
|
||||
firstErr = core.E("store.importSeeds", core.Sprintf("parse seed file %s", rel), err)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
74
medium.go
74
medium.go
|
|
@ -4,6 +4,7 @@ package store
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/csv"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
coreio "dappco.re/go/core/io"
|
||||
|
|
@ -15,7 +16,7 @@ import (
|
|||
// This is an alias of `dappco.re/go/core/io.Medium`, so callers can pass any
|
||||
// upstream medium implementation directly without an adapter.
|
||||
//
|
||||
// Usage example: `medium, _ := local.New("/tmp/exports"); storeInstance, err := store.New(":memory:", store.WithMedium(medium))`
|
||||
// Usage example: `medium, _ := local.New("/tmp/exports"); storeInstance, err := store.NewConfigured(store.StoreConfig{DatabasePath: ":memory:", Medium: medium})`
|
||||
type Medium = coreio.Medium
|
||||
|
||||
// Usage example: `medium, _ := local.New("/srv/core"); storeInstance, err := store.NewConfigured(store.StoreConfig{DatabasePath: ":memory:", Medium: medium})`
|
||||
|
|
@ -169,7 +170,10 @@ func importJSON(workspace *Workspace, kind, content string) error {
|
|||
return core.E("store.Import", "parse json", err)
|
||||
}
|
||||
|
||||
records := collectJSONRecords(topLevel)
|
||||
records, err := collectJSONRecords(topLevel)
|
||||
if err != nil {
|
||||
return core.E("store.Import", "normalise json records", err)
|
||||
}
|
||||
for _, record := range records {
|
||||
if err := workspace.Put(kind, record); err != nil {
|
||||
return core.E("store.Import", "put json record", err)
|
||||
|
|
@ -178,16 +182,18 @@ func importJSON(workspace *Workspace, kind, content string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func collectJSONRecords(value any) []map[string]any {
|
||||
func collectJSONRecords(value any) ([]map[string]any, error) {
|
||||
switch shape := value.(type) {
|
||||
case []any:
|
||||
records := make([]map[string]any, 0, len(shape))
|
||||
for _, entry := range shape {
|
||||
if record, ok := entry.(map[string]any); ok {
|
||||
records = append(records, record)
|
||||
for index, entry := range shape {
|
||||
record, ok := entry.(map[string]any)
|
||||
if !ok {
|
||||
return nil, core.E("store.Import", core.Concat("json array element is not an object at index ", core.Sprint(index)), nil)
|
||||
}
|
||||
records = append(records, record)
|
||||
}
|
||||
return records
|
||||
return records, nil
|
||||
case map[string]any:
|
||||
if nested, ok := shape["entries"].([]any); ok {
|
||||
return collectJSONRecords(nested)
|
||||
|
|
@ -198,26 +204,29 @@ func collectJSONRecords(value any) []map[string]any {
|
|||
if nested, ok := shape["data"].([]any); ok {
|
||||
return collectJSONRecords(nested)
|
||||
}
|
||||
return []map[string]any{shape}
|
||||
return []map[string]any{shape}, nil
|
||||
}
|
||||
return nil
|
||||
return nil, core.E("store.Import", "unsupported json shape", nil)
|
||||
}
|
||||
|
||||
func importCSV(workspace *Workspace, kind, content string) error {
|
||||
lines := core.Split(content, "\n")
|
||||
if len(lines) == 0 {
|
||||
reader := csv.NewReader(bytes.NewBufferString(content))
|
||||
reader.FieldsPerRecord = -1
|
||||
rows, err := reader.ReadAll()
|
||||
if err != nil {
|
||||
return core.E("store.Import", "parse csv", err)
|
||||
}
|
||||
if len(rows) == 0 {
|
||||
return nil
|
||||
}
|
||||
header := splitCSVLine(lines[0])
|
||||
header := rows[0]
|
||||
if len(header) == 0 {
|
||||
return nil
|
||||
}
|
||||
for _, rawLine := range lines[1:] {
|
||||
line := trimTrailingCarriageReturn(rawLine)
|
||||
if line == "" {
|
||||
for _, fields := range rows[1:] {
|
||||
if len(fields) == 0 {
|
||||
continue
|
||||
}
|
||||
fields := splitCSVLine(line)
|
||||
record := make(map[string]any, len(header))
|
||||
for columnIndex, columnName := range header {
|
||||
if columnIndex < len(fields) {
|
||||
|
|
@ -233,32 +242,6 @@ func importCSV(workspace *Workspace, kind, content string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func splitCSVLine(line string) []string {
|
||||
line = trimTrailingCarriageReturn(line)
|
||||
buffer := &bytes.Buffer{}
|
||||
var (
|
||||
fields []string
|
||||
inQuotes bool
|
||||
)
|
||||
for index := 0; index < len(line); index++ {
|
||||
character := line[index]
|
||||
switch {
|
||||
case character == '"' && inQuotes && index+1 < len(line) && line[index+1] == '"':
|
||||
buffer.WriteByte('"')
|
||||
index++
|
||||
case character == '"':
|
||||
inQuotes = !inQuotes
|
||||
case character == ',' && !inQuotes:
|
||||
fields = append(fields, buffer.String())
|
||||
buffer.Reset()
|
||||
default:
|
||||
buffer.WriteByte(character)
|
||||
}
|
||||
}
|
||||
fields = append(fields, buffer.String())
|
||||
return fields
|
||||
}
|
||||
|
||||
func exportJSON(workspace *Workspace, medium Medium, path string) error {
|
||||
summary := workspace.Aggregate()
|
||||
content := core.JSONMarshalString(summary)
|
||||
|
|
@ -318,13 +301,6 @@ func exportCSV(workspace *Workspace, medium Medium, path string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func trimTrailingCarriageReturn(value string) string {
|
||||
for len(value) > 0 && value[len(value)-1] == '\r' {
|
||||
value = value[:len(value)-1]
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
func csvField(value string) string {
|
||||
needsQuote := false
|
||||
for index := 0; index < len(value); index++ {
|
||||
|
|
|
|||
|
|
@ -106,6 +106,14 @@ func (medium *memoryMedium) Rename(oldPath, newPath string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type renameFailMedium struct {
|
||||
*memoryMedium
|
||||
}
|
||||
|
||||
func (medium *renameFailMedium) Rename(string, string) error {
|
||||
return core.E("renameFailMedium.Rename", "forced rename failure", nil)
|
||||
}
|
||||
|
||||
func (medium *memoryMedium) List(path string) ([]fs.DirEntry, error) { return nil, nil }
|
||||
|
||||
func (medium *memoryMedium) Stat(path string) (fs.FileInfo, error) {
|
||||
|
|
@ -289,6 +297,67 @@ func TestMedium_Import_Good_CSV(t *testing.T) {
|
|||
assertEqual(t, map[string]any{"findings": 2}, workspace.Aggregate())
|
||||
}
|
||||
|
||||
func TestMedium_Import_Good_CSVQuotedMultiline(t *testing.T) {
|
||||
useWorkspaceStateDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:")
|
||||
assertNoError(t, err)
|
||||
defer func() { _ = storeInstance.Close() }()
|
||||
|
||||
workspace, err := storeInstance.NewWorkspace("medium-import-csv-multiline")
|
||||
assertNoError(t, err)
|
||||
defer workspace.Discard()
|
||||
|
||||
medium := newMemoryMedium()
|
||||
assertNoError(t, medium.Write("notes.csv", "name,note\nAlice,\"hello\nworld\"\n"))
|
||||
|
||||
assertNoError(t, Import(workspace, medium, "notes.csv"))
|
||||
|
||||
assertEqual(t, map[string]any{"notes": 1}, workspace.Aggregate())
|
||||
}
|
||||
|
||||
func TestMedium_Import_Bad_JSONArrayNonObject(t *testing.T) {
|
||||
useWorkspaceStateDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:")
|
||||
assertNoError(t, err)
|
||||
defer func() { _ = storeInstance.Close() }()
|
||||
|
||||
workspace, err := storeInstance.NewWorkspace("medium-import-json-non-object")
|
||||
assertNoError(t, err)
|
||||
defer workspace.Discard()
|
||||
|
||||
medium := newMemoryMedium()
|
||||
assertNoError(t, medium.Write("users.json", `[{"name":"Alice"},"Bob"]`))
|
||||
|
||||
assertError(t, Import(workspace, medium, "users.json"))
|
||||
|
||||
count, err := workspace.Count()
|
||||
assertNoError(t, err)
|
||||
assertEqual(t, 0, count)
|
||||
}
|
||||
|
||||
func TestMedium_Import_Bad_MalformedCSV(t *testing.T) {
|
||||
useWorkspaceStateDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:")
|
||||
assertNoError(t, err)
|
||||
defer func() { _ = storeInstance.Close() }()
|
||||
|
||||
workspace, err := storeInstance.NewWorkspace("medium-import-csv-bad")
|
||||
assertNoError(t, err)
|
||||
defer workspace.Discard()
|
||||
|
||||
medium := newMemoryMedium()
|
||||
assertNoError(t, medium.Write("findings.csv", "tool,severity\ngosec,\"high\n"))
|
||||
|
||||
assertError(t, Import(workspace, medium, "findings.csv"))
|
||||
|
||||
count, err := workspace.Count()
|
||||
assertNoError(t, err)
|
||||
assertEqual(t, 0, count)
|
||||
}
|
||||
|
||||
func TestMedium_Import_Bad_NilArguments(t *testing.T) {
|
||||
useWorkspaceStateDirectory(t)
|
||||
|
||||
|
|
@ -416,6 +485,35 @@ func TestMedium_Compact_Good_MediumRoutesArchive(t *testing.T) {
|
|||
assertTruef(t, medium.Exists(outputPath), "compact should write through medium at %s", outputPath)
|
||||
}
|
||||
|
||||
func TestMedium_Compact_Bad_PreservesStagedArchiveWhenPublishFails(t *testing.T) {
|
||||
useWorkspaceStateDirectory(t)
|
||||
useArchiveOutputDirectory(t)
|
||||
|
||||
medium := &renameFailMedium{memoryMedium: newMemoryMedium()}
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"), WithMedium(medium))
|
||||
assertNoError(t, err)
|
||||
defer func() { _ = storeInstance.Close() }()
|
||||
|
||||
assertTrue(t, storeInstance.CommitToJournal("jobs", map[string]any{"count": 3}, map[string]string{"workspace": "jobs-1"}).OK)
|
||||
|
||||
result := storeInstance.Compact(CompactOptions{
|
||||
Before: time.Now().Add(time.Minute),
|
||||
Output: "archive/",
|
||||
Format: "gzip",
|
||||
})
|
||||
assertFalse(t, result.OK)
|
||||
|
||||
stagedArchiveFound := false
|
||||
medium.lock.Lock()
|
||||
for path := range medium.files {
|
||||
if core.HasSuffix(path, ".tmp") {
|
||||
stagedArchiveFound = true
|
||||
}
|
||||
}
|
||||
medium.lock.Unlock()
|
||||
assertTrue(t, stagedArchiveFound)
|
||||
}
|
||||
|
||||
func splitNewlines(content string) []string {
|
||||
var result []string
|
||||
current := core.NewBuilder()
|
||||
|
|
|
|||
39
publish.go
39
publish.go
|
|
@ -4,6 +4,7 @@ package store
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
|
|
@ -46,6 +47,14 @@ type PublishConfig struct {
|
|||
// cfg.Token // "hf_..."
|
||||
Token string
|
||||
|
||||
// Context controls cancellation for HuggingFace API requests. When nil,
|
||||
// Publish uses context.Background().
|
||||
//
|
||||
// Usage example:
|
||||
//
|
||||
// cfg.Context = context.Background()
|
||||
Context context.Context
|
||||
|
||||
// DryRun lists files that would be uploaded without actually uploading.
|
||||
//
|
||||
// Usage example:
|
||||
|
|
@ -74,6 +83,14 @@ func Publish(cfg PublishConfig, w io.Writer) error {
|
|||
if cfg.InputDir == "" {
|
||||
return core.E("store.Publish", "input directory is required", nil)
|
||||
}
|
||||
if cfg.Repo == "" {
|
||||
return core.E("store.Publish", "repository is required", nil)
|
||||
}
|
||||
|
||||
publishContext := cfg.Context
|
||||
if publishContext == nil {
|
||||
publishContext = context.Background()
|
||||
}
|
||||
|
||||
token := resolveHFToken(cfg.Token)
|
||||
if token == "" && !cfg.DryRun {
|
||||
|
|
@ -109,12 +126,12 @@ func Publish(cfg PublishConfig, w io.Writer) error {
|
|||
|
||||
core.Print(w, "Publishing to https://huggingface.co/datasets/%s", cfg.Repo)
|
||||
|
||||
if err := ensureHFDatasetRepo(token, cfg.Repo, cfg.Public); err != nil {
|
||||
if err := ensureHFDatasetRepo(publishContext, token, cfg.Repo, cfg.Public); err != nil {
|
||||
return core.E("store.Publish", "ensure HuggingFace dataset", err)
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
if err := uploadFileToHF(token, cfg.Repo, f.local, f.remote); err != nil {
|
||||
if err := uploadFileToHF(publishContext, token, cfg.Repo, f.local, f.remote); err != nil {
|
||||
return core.E("store.Publish", core.Sprintf("upload %s", core.PathBase(f.local)), err)
|
||||
}
|
||||
core.Print(w, " Uploaded %s -> %s", core.PathBase(f.local), f.remote)
|
||||
|
|
@ -133,7 +150,7 @@ func resolveHFToken(explicit string) string {
|
|||
if env := core.Env("HF_TOKEN"); env != "" {
|
||||
return env
|
||||
}
|
||||
home := core.Env("DIR_HOME")
|
||||
home := core.Env("HOME")
|
||||
if home == "" {
|
||||
return ""
|
||||
}
|
||||
|
|
@ -166,7 +183,7 @@ func collectUploadFiles(inputDir string) ([]uploadEntry, error) {
|
|||
return files, nil
|
||||
}
|
||||
|
||||
func ensureHFDatasetRepo(token, repoID string, public bool) error {
|
||||
func ensureHFDatasetRepo(ctx context.Context, token, repoID string, public bool) error {
|
||||
if repoID == "" {
|
||||
return core.E("store.ensureHFDatasetRepo", "repository is required", nil)
|
||||
}
|
||||
|
|
@ -185,7 +202,7 @@ func ensureHFDatasetRepo(token, repoID string, public bool) error {
|
|||
createPayload["organization"] = organisation
|
||||
}
|
||||
|
||||
createStatus, createBody, err := hfJSONRequest(token, http.MethodPost, "https://huggingface.co/api/repos/create", createPayload)
|
||||
createStatus, createBody, err := hfJSONRequest(ctx, token, http.MethodPost, "https://huggingface.co/api/repos/create", createPayload)
|
||||
if err != nil {
|
||||
return core.E("store.ensureHFDatasetRepo", "create dataset repository", err)
|
||||
}
|
||||
|
|
@ -194,7 +211,7 @@ func ensureHFDatasetRepo(token, repoID string, public bool) error {
|
|||
}
|
||||
|
||||
settingsURL := core.Sprintf("https://huggingface.co/api/repos/dataset/%s/settings", repoID)
|
||||
settingsStatus, settingsBody, err := hfJSONRequest(token, http.MethodPut, settingsURL, map[string]any{
|
||||
settingsStatus, settingsBody, err := hfJSONRequest(ctx, token, http.MethodPut, settingsURL, map[string]any{
|
||||
"private": !public,
|
||||
})
|
||||
if err != nil {
|
||||
|
|
@ -214,9 +231,9 @@ func splitHFRepoID(repoID string) (organisation string, name string) {
|
|||
return parts[0], parts[1]
|
||||
}
|
||||
|
||||
func hfJSONRequest(token, method, url string, payload map[string]any) (int, string, error) {
|
||||
func hfJSONRequest(ctx context.Context, token, method, url string, payload map[string]any) (int, string, error) {
|
||||
payloadJSON := core.JSONMarshalString(payload)
|
||||
req, err := http.NewRequest(method, url, bytes.NewBufferString(payloadJSON))
|
||||
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBufferString(payloadJSON))
|
||||
if err != nil {
|
||||
return 0, "", core.E("store.hfJSONRequest", "create request", err)
|
||||
}
|
||||
|
|
@ -241,7 +258,7 @@ func hfJSONRequest(token, method, url string, payload map[string]any) (int, stri
|
|||
|
||||
// uploadFileToHF uploads a single file to a HuggingFace dataset repo via the
|
||||
// Hub API.
|
||||
func uploadFileToHF(token, repoID, localPath, remotePath string) error {
|
||||
func uploadFileToHF(ctx context.Context, token, repoID, localPath, remotePath string) error {
|
||||
openResult := localFs.Open(localPath)
|
||||
if !openResult.OK {
|
||||
return core.E("store.uploadFileToHF", core.Sprintf("open %s", localPath), openResult.Value.(error))
|
||||
|
|
@ -251,7 +268,7 @@ func uploadFileToHF(token, repoID, localPath, remotePath string) error {
|
|||
|
||||
url := core.Sprintf("https://huggingface.co/api/datasets/%s/upload/main/%s", repoID, remotePath)
|
||||
|
||||
req, err := http.NewRequest(http.MethodPut, url, file)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, file)
|
||||
if err != nil {
|
||||
return core.E("store.uploadFileToHF", "create request", err)
|
||||
}
|
||||
|
|
@ -261,7 +278,7 @@ func uploadFileToHF(token, repoID, localPath, remotePath string) error {
|
|||
req.ContentLength = stat.Size()
|
||||
}
|
||||
|
||||
client := &http.Client{Timeout: 120 * time.Second}
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return core.E("store.uploadFileToHF", "upload request", err)
|
||||
|
|
|
|||
29
publish_test.go
Normal file
29
publish_test.go
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
func TestPublish_Publish_Bad_EmptyRepository(t *testing.T) {
|
||||
var output bytes.Buffer
|
||||
|
||||
err := Publish(PublishConfig{InputDir: t.TempDir(), DryRun: true}, &output)
|
||||
|
||||
assertError(t, err)
|
||||
assertContainsString(t, err.Error(), "repository is required")
|
||||
}
|
||||
|
||||
func TestPublish_ResolveHFToken_Good_UserHomeFallback(t *testing.T) {
|
||||
homeDirectory := t.TempDir()
|
||||
t.Setenv("HF_TOKEN", "")
|
||||
t.Setenv("HOME", homeDirectory)
|
||||
|
||||
tokenDirectory := core.JoinPath(homeDirectory, ".huggingface")
|
||||
requireCoreOK(t, testFilesystem().EnsureDir(tokenDirectory))
|
||||
requireCoreWriteBytes(t, core.JoinPath(tokenDirectory, "token"), []byte(" hf_file_token \n"))
|
||||
|
||||
assertEqual(t, "hf_file_token", resolveHFToken(""))
|
||||
}
|
||||
6
store.go
6
store.go
|
|
@ -463,6 +463,12 @@ func (storeInstance *Store) Close() error {
|
|||
storeInstance.cachedOrphanWorkspaces = nil
|
||||
storeInstance.orphanWorkspaceLock.Unlock()
|
||||
|
||||
if storeInstance.db == nil {
|
||||
storeInstance.db = storeInstance.sqliteDatabase
|
||||
}
|
||||
if storeInstance.sqliteDatabase == nil {
|
||||
storeInstance.sqliteDatabase = storeInstance.db
|
||||
}
|
||||
if storeInstance.sqliteDatabase == nil {
|
||||
return orphanCleanupErr
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1026,6 +1026,23 @@ func TestStore_Close_Good_Idempotent(t *testing.T) {
|
|||
assertNoError(t, storeInstance.Close())
|
||||
}
|
||||
|
||||
func TestStore_Close_Good_BackfillsDatabaseAlias(t *testing.T) {
|
||||
database, err := sql.Open("sqlite", ":memory:")
|
||||
assertNoError(t, err)
|
||||
|
||||
storeInstance := &Store{
|
||||
db: database,
|
||||
cancelPurge: func() {},
|
||||
purgeContext: context.Background(),
|
||||
}
|
||||
|
||||
assertNoError(t, storeInstance.Close())
|
||||
|
||||
_, err = database.Exec("SELECT 1")
|
||||
assertError(t, err)
|
||||
assertContainsString(t, err.Error(), "closed")
|
||||
}
|
||||
|
||||
func TestStore_Close_Good_OperationsFailAfterClose(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
assertNoError(t, storeInstance.Close())
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@ import (
|
|||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
func assertNoError(t testing.TB, err error) {
|
||||
|
|
@ -187,26 +189,7 @@ func assertNotPanics(t testing.TB, fn func()) {
|
|||
}
|
||||
|
||||
func errIs(err, target error) bool {
|
||||
for err != nil {
|
||||
if err == target {
|
||||
return true
|
||||
}
|
||||
multiUnwrapper, ok := err.(interface{ Unwrap() []error })
|
||||
if ok {
|
||||
for _, childErr := range multiUnwrapper.Unwrap() {
|
||||
if errIs(childErr, target) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
unwrapper, ok := err.(interface{ Unwrap() error })
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
err = unwrapper.Unwrap()
|
||||
}
|
||||
return false
|
||||
return core.Is(err, target)
|
||||
}
|
||||
|
||||
func isNil(value any) bool {
|
||||
|
|
|
|||
26
workspace.go
26
workspace.go
|
|
@ -189,13 +189,18 @@ func loadRecoveredWorkspaces(stateDirectory string, store *Store) []*Workspace {
|
|||
filesystem := (&core.Fs{}).NewUnrestricted()
|
||||
orphanWorkspaces := make([]*Workspace, 0)
|
||||
for _, databasePath := range discoverOrphanWorkspacePaths(stateDirectory) {
|
||||
workspaceName := workspaceNameFromPath(stateDirectory, databasePath)
|
||||
if workspaceCommitMarkerExists(store, workspaceName) {
|
||||
removeWorkspaceDatabaseFiles(filesystem, databasePath)
|
||||
continue
|
||||
}
|
||||
database, err := openWorkspaceDatabase(databasePath)
|
||||
if err != nil {
|
||||
quarantineOrphanWorkspaceFiles(filesystem, stateDirectory, databasePath)
|
||||
continue
|
||||
}
|
||||
orphanWorkspace := &Workspace{
|
||||
name: workspaceNameFromPath(stateDirectory, databasePath),
|
||||
name: workspaceName,
|
||||
store: store,
|
||||
db: database,
|
||||
databasePath: databasePath,
|
||||
|
|
@ -326,7 +331,7 @@ func (workspace *Workspace) Commit() core.Result {
|
|||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
if err := workspace.closeAndRemoveFiles(); err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
return core.Result{Value: cloneAnyMap(fields), OK: true}
|
||||
}
|
||||
return core.Result{Value: cloneAnyMap(fields), OK: true}
|
||||
}
|
||||
|
|
@ -596,6 +601,23 @@ func workspaceQuarantinePathExists(filesystem *core.Fs, databasePath string) boo
|
|||
return false
|
||||
}
|
||||
|
||||
func workspaceCommitMarkerExists(storeInstance *Store, workspaceName string) bool {
|
||||
if storeInstance == nil || workspaceName == "" {
|
||||
return false
|
||||
}
|
||||
exists, err := storeInstance.Exists(workspaceSummaryGroup(workspaceName), "summary")
|
||||
return err == nil && exists
|
||||
}
|
||||
|
||||
func removeWorkspaceDatabaseFiles(filesystem *core.Fs, databasePath string) {
|
||||
if filesystem == nil || databasePath == "" {
|
||||
return
|
||||
}
|
||||
for _, path := range workspaceDatabaseFilePaths(databasePath) {
|
||||
_ = filesystem.Delete(path)
|
||||
}
|
||||
}
|
||||
|
||||
func workspaceDatabaseFilePaths(databasePath string) []string {
|
||||
if core.HasSuffix(databasePath, ".duckdb") {
|
||||
return []string{databasePath, databasePath + ".wal"}
|
||||
|
|
|
|||
|
|
@ -233,6 +233,28 @@ func TestWorkspace_Commit_Good_EmitsSummaryEvent(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestWorkspace_RecoverOrphans_Good_SkipsAlreadyCommittedWorkspaceFile(t *testing.T) {
|
||||
stateDirectory := useWorkspaceStateDirectory(t)
|
||||
|
||||
storeInstance, err := New(":memory:", WithJournal("http://127.0.0.1:8086", "core", "events"))
|
||||
assertNoError(t, err)
|
||||
defer func() { _ = storeInstance.Close() }()
|
||||
|
||||
workspace, err := storeInstance.NewWorkspace("committed-leftover")
|
||||
assertNoError(t, err)
|
||||
|
||||
assertNoError(t, workspace.Put("like", map[string]any{"user": "@alice"}))
|
||||
fields, err := workspace.aggregateFields()
|
||||
assertNoError(t, err)
|
||||
assertNoError(t, storeInstance.commitWorkspaceAggregate(workspace.Name(), fields))
|
||||
assertNoError(t, workspace.closeWithoutRemovingFiles())
|
||||
assertTrue(t, testFilesystem().Exists(workspace.databasePath))
|
||||
|
||||
orphans := storeInstance.RecoverOrphans(stateDirectory)
|
||||
assertLen(t, orphans, 0)
|
||||
assertFalse(t, testFilesystem().Exists(workspace.databasePath))
|
||||
}
|
||||
|
||||
func TestWorkspace_Discard_Good_Idempotent(t *testing.T) {
|
||||
useWorkspaceStateDirectory(t)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue