- Add // Example: usage comments to all Medium interface methods in io.go - Add // Example: comments to local, s3, sqlite, store, datanode, node medium methods - Rename short variable `n` → `nodeTree` throughout node/node_test.go - Rename short variable `s` → `keyValueStore` in store/store_test.go - Rename counter variable `n` → `count` in store/store_test.go - Rename `m` → `medium` in store/medium_test.go helper - Remove redundant prose comments replaced by usage examples Co-Authored-By: Virgil <virgil@lethean.io>
597 lines
15 KiB
Go
597 lines
15 KiB
Go
// Example: medium := datanode.New()
|
|
// Example: _ = medium.Write("jobs/run.log", "started")
|
|
// Example: snapshot, _ := medium.Snapshot()
|
|
// Example: restored, _ := datanode.FromTar(snapshot)
|
|
package datanode
|
|
|
|
import (
|
|
"cmp"
|
|
goio "io"
|
|
"io/fs"
|
|
"path"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
core "dappco.re/go/core"
|
|
borgdatanode "forge.lthn.ai/Snider/Borg/pkg/datanode"
|
|
)
|
|
|
|
var (
|
|
dataNodeWalkDir = func(fileSystem fs.FS, root string, callback fs.WalkDirFunc) error {
|
|
return fs.WalkDir(fileSystem, root, callback)
|
|
}
|
|
dataNodeOpen = func(dataNode *borgdatanode.DataNode, filePath string) (fs.File, error) {
|
|
return dataNode.Open(filePath)
|
|
}
|
|
dataNodeReadAll = func(reader goio.Reader) ([]byte, error) {
|
|
return goio.ReadAll(reader)
|
|
}
|
|
)
|
|
|
|
// Example: medium := datanode.New()
|
|
// Example: _ = medium.Write("jobs/run.log", "started")
|
|
// Example: snapshot, _ := medium.Snapshot()
|
|
type Medium struct {
|
|
dataNode *borgdatanode.DataNode
|
|
directorySet map[string]bool
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
// Example: medium := datanode.New()
|
|
// Example: _ = medium.Write("jobs/run.log", "started")
|
|
func New() *Medium {
|
|
return &Medium{
|
|
dataNode: borgdatanode.New(),
|
|
directorySet: make(map[string]bool),
|
|
}
|
|
}
|
|
|
|
// Example: sourceMedium := datanode.New()
|
|
// Example: snapshot, _ := sourceMedium.Snapshot()
|
|
// Example: restored, _ := datanode.FromTar(snapshot)
|
|
func FromTar(data []byte) (*Medium, error) {
|
|
dataNode, err := borgdatanode.FromTar(data)
|
|
if err != nil {
|
|
return nil, core.E("datanode.FromTar", "failed to restore", err)
|
|
}
|
|
return &Medium{
|
|
dataNode: dataNode,
|
|
directorySet: make(map[string]bool),
|
|
}, nil
|
|
}
|
|
|
|
// Example: snapshot, _ := medium.Snapshot()
|
|
func (medium *Medium) Snapshot() ([]byte, error) {
|
|
medium.lock.RLock()
|
|
defer medium.lock.RUnlock()
|
|
data, err := medium.dataNode.ToTar()
|
|
if err != nil {
|
|
return nil, core.E("datanode.Snapshot", "tar failed", err)
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
// Example: _ = medium.Restore(snapshot)
|
|
func (medium *Medium) Restore(data []byte) error {
|
|
dataNode, err := borgdatanode.FromTar(data)
|
|
if err != nil {
|
|
return core.E("datanode.Restore", "tar failed", err)
|
|
}
|
|
medium.lock.Lock()
|
|
defer medium.lock.Unlock()
|
|
medium.dataNode = dataNode
|
|
medium.directorySet = make(map[string]bool)
|
|
return nil
|
|
}
|
|
|
|
// Example: dataNode := medium.DataNode()
|
|
func (medium *Medium) DataNode() *borgdatanode.DataNode {
|
|
medium.lock.RLock()
|
|
defer medium.lock.RUnlock()
|
|
return medium.dataNode
|
|
}
|
|
|
|
// normaliseEntryPath converts a caller-supplied path into the relative,
// slash-separated form used as a key inside the medium.
//
// Every leading slash is removed (not just the first), so inputs such as
// "//a" can never come back as the rooted path "/a". The remainder is cleaned
// with path.Clean, and the root (".", "/", or "") is reported as "".
//
// Example: normaliseEntryPath("/jobs/./run.log") // "jobs/run.log"
func normaliseEntryPath(filePath string) string {
	// Drop all leading slashes: path.Clean keeps a rooted path rooted, and a
	// rooted result would break callers that walk up with path.Dir.
	for len(filePath) > 0 && filePath[0] == '/' {
		filePath = filePath[1:]
	}
	filePath = path.Clean(filePath)
	if filePath == "." {
		return ""
	}
	return filePath
}
|
|
|
|
func (medium *Medium) Read(filePath string) (string, error) {
|
|
medium.lock.RLock()
|
|
defer medium.lock.RUnlock()
|
|
|
|
filePath = normaliseEntryPath(filePath)
|
|
file, err := medium.dataNode.Open(filePath)
|
|
if err != nil {
|
|
return "", core.E("datanode.Read", core.Concat("not found: ", filePath), fs.ErrNotExist)
|
|
}
|
|
defer file.Close()
|
|
|
|
info, err := file.Stat()
|
|
if err != nil {
|
|
return "", core.E("datanode.Read", core.Concat("stat failed: ", filePath), err)
|
|
}
|
|
if info.IsDir() {
|
|
return "", core.E("datanode.Read", core.Concat("is a directory: ", filePath), fs.ErrInvalid)
|
|
}
|
|
|
|
data, err := goio.ReadAll(file)
|
|
if err != nil {
|
|
return "", core.E("datanode.Read", core.Concat("read failed: ", filePath), err)
|
|
}
|
|
return string(data), nil
|
|
}
|
|
|
|
func (medium *Medium) Write(filePath, content string) error {
|
|
medium.lock.Lock()
|
|
defer medium.lock.Unlock()
|
|
|
|
filePath = normaliseEntryPath(filePath)
|
|
if filePath == "" {
|
|
return core.E("datanode.Write", "empty path", fs.ErrInvalid)
|
|
}
|
|
medium.dataNode.AddData(filePath, []byte(content))
|
|
|
|
medium.ensureDirsLocked(path.Dir(filePath))
|
|
return nil
|
|
}
|
|
|
|
func (medium *Medium) WriteMode(filePath, content string, mode fs.FileMode) error {
|
|
return medium.Write(filePath, content)
|
|
}
|
|
|
|
func (medium *Medium) EnsureDir(filePath string) error {
|
|
medium.lock.Lock()
|
|
defer medium.lock.Unlock()
|
|
|
|
filePath = normaliseEntryPath(filePath)
|
|
if filePath == "" {
|
|
return nil
|
|
}
|
|
medium.ensureDirsLocked(filePath)
|
|
return nil
|
|
}
|
|
|
|
func (medium *Medium) ensureDirsLocked(directoryPath string) {
|
|
for directoryPath != "" && directoryPath != "." {
|
|
medium.directorySet[directoryPath] = true
|
|
directoryPath = path.Dir(directoryPath)
|
|
if directoryPath == "." {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (medium *Medium) IsFile(filePath string) bool {
|
|
medium.lock.RLock()
|
|
defer medium.lock.RUnlock()
|
|
|
|
filePath = normaliseEntryPath(filePath)
|
|
info, err := medium.dataNode.Stat(filePath)
|
|
return err == nil && !info.IsDir()
|
|
}
|
|
|
|
func (medium *Medium) Delete(filePath string) error {
|
|
medium.lock.Lock()
|
|
defer medium.lock.Unlock()
|
|
|
|
filePath = normaliseEntryPath(filePath)
|
|
if filePath == "" {
|
|
return core.E("datanode.Delete", "cannot delete root", fs.ErrPermission)
|
|
}
|
|
|
|
info, err := medium.dataNode.Stat(filePath)
|
|
if err != nil {
|
|
if medium.directorySet[filePath] {
|
|
hasChildren, err := medium.hasPrefixLocked(filePath + "/")
|
|
if err != nil {
|
|
return core.E("datanode.Delete", core.Concat("failed to inspect directory: ", filePath), err)
|
|
}
|
|
if hasChildren {
|
|
return core.E("datanode.Delete", core.Concat("directory not empty: ", filePath), fs.ErrExist)
|
|
}
|
|
delete(medium.directorySet, filePath)
|
|
return nil
|
|
}
|
|
return core.E("datanode.Delete", core.Concat("not found: ", filePath), fs.ErrNotExist)
|
|
}
|
|
|
|
if info.IsDir() {
|
|
hasChildren, err := medium.hasPrefixLocked(filePath + "/")
|
|
if err != nil {
|
|
return core.E("datanode.Delete", core.Concat("failed to inspect directory: ", filePath), err)
|
|
}
|
|
if hasChildren {
|
|
return core.E("datanode.Delete", core.Concat("directory not empty: ", filePath), fs.ErrExist)
|
|
}
|
|
delete(medium.directorySet, filePath)
|
|
return nil
|
|
}
|
|
|
|
if err := medium.removeFileLocked(filePath); err != nil {
|
|
return core.E("datanode.Delete", core.Concat("failed to delete file: ", filePath), err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (medium *Medium) DeleteAll(filePath string) error {
|
|
medium.lock.Lock()
|
|
defer medium.lock.Unlock()
|
|
|
|
filePath = normaliseEntryPath(filePath)
|
|
if filePath == "" {
|
|
return core.E("datanode.DeleteAll", "cannot delete root", fs.ErrPermission)
|
|
}
|
|
|
|
prefix := filePath + "/"
|
|
found := false
|
|
|
|
info, err := medium.dataNode.Stat(filePath)
|
|
if err == nil && !info.IsDir() {
|
|
if err := medium.removeFileLocked(filePath); err != nil {
|
|
return core.E("datanode.DeleteAll", core.Concat("failed to delete file: ", filePath), err)
|
|
}
|
|
found = true
|
|
}
|
|
|
|
entries, err := medium.collectAllLocked()
|
|
if err != nil {
|
|
return core.E("datanode.DeleteAll", core.Concat("failed to inspect tree: ", filePath), err)
|
|
}
|
|
for _, name := range entries {
|
|
if name == filePath || core.HasPrefix(name, prefix) {
|
|
if err := medium.removeFileLocked(name); err != nil {
|
|
return core.E("datanode.DeleteAll", core.Concat("failed to delete file: ", name), err)
|
|
}
|
|
found = true
|
|
}
|
|
}
|
|
|
|
for directoryPath := range medium.directorySet {
|
|
if directoryPath == filePath || core.HasPrefix(directoryPath, prefix) {
|
|
delete(medium.directorySet, directoryPath)
|
|
found = true
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
return core.E("datanode.DeleteAll", core.Concat("not found: ", filePath), fs.ErrNotExist)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (medium *Medium) Rename(oldPath, newPath string) error {
|
|
medium.lock.Lock()
|
|
defer medium.lock.Unlock()
|
|
|
|
oldPath = normaliseEntryPath(oldPath)
|
|
newPath = normaliseEntryPath(newPath)
|
|
|
|
info, err := medium.dataNode.Stat(oldPath)
|
|
if err != nil {
|
|
return core.E("datanode.Rename", core.Concat("not found: ", oldPath), fs.ErrNotExist)
|
|
}
|
|
|
|
if !info.IsDir() {
|
|
data, err := medium.readFileLocked(oldPath)
|
|
if err != nil {
|
|
return core.E("datanode.Rename", core.Concat("failed to read source file: ", oldPath), err)
|
|
}
|
|
medium.dataNode.AddData(newPath, data)
|
|
medium.ensureDirsLocked(path.Dir(newPath))
|
|
if err := medium.removeFileLocked(oldPath); err != nil {
|
|
return core.E("datanode.Rename", core.Concat("failed to remove source file: ", oldPath), err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
oldPrefix := oldPath + "/"
|
|
newPrefix := newPath + "/"
|
|
|
|
entries, err := medium.collectAllLocked()
|
|
if err != nil {
|
|
return core.E("datanode.Rename", core.Concat("failed to inspect tree: ", oldPath), err)
|
|
}
|
|
for _, name := range entries {
|
|
if core.HasPrefix(name, oldPrefix) {
|
|
newName := core.Concat(newPrefix, core.TrimPrefix(name, oldPrefix))
|
|
data, err := medium.readFileLocked(name)
|
|
if err != nil {
|
|
return core.E("datanode.Rename", core.Concat("failed to read source file: ", name), err)
|
|
}
|
|
medium.dataNode.AddData(newName, data)
|
|
if err := medium.removeFileLocked(name); err != nil {
|
|
return core.E("datanode.Rename", core.Concat("failed to remove source file: ", name), err)
|
|
}
|
|
}
|
|
}
|
|
|
|
dirsToMove := make(map[string]string)
|
|
for directoryPath := range medium.directorySet {
|
|
if directoryPath == oldPath || core.HasPrefix(directoryPath, oldPrefix) {
|
|
newDirectoryPath := core.Concat(newPath, core.TrimPrefix(directoryPath, oldPath))
|
|
dirsToMove[directoryPath] = newDirectoryPath
|
|
}
|
|
}
|
|
for oldDirectoryPath, newDirectoryPath := range dirsToMove {
|
|
delete(medium.directorySet, oldDirectoryPath)
|
|
medium.directorySet[newDirectoryPath] = true
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (medium *Medium) List(filePath string) ([]fs.DirEntry, error) {
|
|
medium.lock.RLock()
|
|
defer medium.lock.RUnlock()
|
|
|
|
filePath = normaliseEntryPath(filePath)
|
|
|
|
entries, err := medium.dataNode.ReadDir(filePath)
|
|
if err != nil {
|
|
if filePath == "" || medium.directorySet[filePath] {
|
|
return []fs.DirEntry{}, nil
|
|
}
|
|
return nil, core.E("datanode.List", core.Concat("not found: ", filePath), fs.ErrNotExist)
|
|
}
|
|
|
|
prefix := filePath
|
|
if prefix != "" {
|
|
prefix += "/"
|
|
}
|
|
seen := make(map[string]bool)
|
|
for _, e := range entries {
|
|
seen[e.Name()] = true
|
|
}
|
|
|
|
for directoryPath := range medium.directorySet {
|
|
if !core.HasPrefix(directoryPath, prefix) {
|
|
continue
|
|
}
|
|
rest := core.TrimPrefix(directoryPath, prefix)
|
|
if rest == "" {
|
|
continue
|
|
}
|
|
first := core.SplitN(rest, "/", 2)[0]
|
|
if !seen[first] {
|
|
seen[first] = true
|
|
entries = append(entries, &dirEntry{name: first})
|
|
}
|
|
}
|
|
|
|
slices.SortFunc(entries, func(a, b fs.DirEntry) int {
|
|
return cmp.Compare(a.Name(), b.Name())
|
|
})
|
|
|
|
return entries, nil
|
|
}
|
|
|
|
func (medium *Medium) Stat(filePath string) (fs.FileInfo, error) {
|
|
medium.lock.RLock()
|
|
defer medium.lock.RUnlock()
|
|
|
|
filePath = normaliseEntryPath(filePath)
|
|
if filePath == "" {
|
|
return &fileInfo{name: ".", isDir: true, mode: fs.ModeDir | 0755}, nil
|
|
}
|
|
|
|
info, err := medium.dataNode.Stat(filePath)
|
|
if err == nil {
|
|
return info, nil
|
|
}
|
|
|
|
if medium.directorySet[filePath] {
|
|
return &fileInfo{name: path.Base(filePath), isDir: true, mode: fs.ModeDir | 0755}, nil
|
|
}
|
|
return nil, core.E("datanode.Stat", core.Concat("not found: ", filePath), fs.ErrNotExist)
|
|
}
|
|
|
|
func (medium *Medium) Open(filePath string) (fs.File, error) {
|
|
medium.lock.RLock()
|
|
defer medium.lock.RUnlock()
|
|
|
|
filePath = normaliseEntryPath(filePath)
|
|
return medium.dataNode.Open(filePath)
|
|
}
|
|
|
|
func (medium *Medium) Create(filePath string) (goio.WriteCloser, error) {
|
|
filePath = normaliseEntryPath(filePath)
|
|
if filePath == "" {
|
|
return nil, core.E("datanode.Create", "empty path", fs.ErrInvalid)
|
|
}
|
|
return &writeCloser{medium: medium, path: filePath}, nil
|
|
}
|
|
|
|
func (medium *Medium) Append(filePath string) (goio.WriteCloser, error) {
|
|
filePath = normaliseEntryPath(filePath)
|
|
if filePath == "" {
|
|
return nil, core.E("datanode.Append", "empty path", fs.ErrInvalid)
|
|
}
|
|
|
|
var existing []byte
|
|
medium.lock.RLock()
|
|
if medium.IsFile(filePath) {
|
|
data, err := medium.readFileLocked(filePath)
|
|
if err != nil {
|
|
medium.lock.RUnlock()
|
|
return nil, core.E("datanode.Append", core.Concat("failed to read existing content: ", filePath), err)
|
|
}
|
|
existing = data
|
|
}
|
|
medium.lock.RUnlock()
|
|
|
|
return &writeCloser{medium: medium, path: filePath, buf: existing}, nil
|
|
}
|
|
|
|
func (medium *Medium) ReadStream(filePath string) (goio.ReadCloser, error) {
|
|
medium.lock.RLock()
|
|
defer medium.lock.RUnlock()
|
|
|
|
filePath = normaliseEntryPath(filePath)
|
|
file, err := medium.dataNode.Open(filePath)
|
|
if err != nil {
|
|
return nil, core.E("datanode.ReadStream", core.Concat("not found: ", filePath), fs.ErrNotExist)
|
|
}
|
|
return file.(goio.ReadCloser), nil
|
|
}
|
|
|
|
func (medium *Medium) WriteStream(filePath string) (goio.WriteCloser, error) {
|
|
return medium.Create(filePath)
|
|
}
|
|
|
|
func (medium *Medium) Exists(filePath string) bool {
|
|
medium.lock.RLock()
|
|
defer medium.lock.RUnlock()
|
|
|
|
filePath = normaliseEntryPath(filePath)
|
|
if filePath == "" {
|
|
return true
|
|
}
|
|
_, err := medium.dataNode.Stat(filePath)
|
|
if err == nil {
|
|
return true
|
|
}
|
|
return medium.directorySet[filePath]
|
|
}
|
|
|
|
func (medium *Medium) IsDir(filePath string) bool {
|
|
medium.lock.RLock()
|
|
defer medium.lock.RUnlock()
|
|
|
|
filePath = normaliseEntryPath(filePath)
|
|
if filePath == "" {
|
|
return true
|
|
}
|
|
info, err := medium.dataNode.Stat(filePath)
|
|
if err == nil {
|
|
return info.IsDir()
|
|
}
|
|
return medium.directorySet[filePath]
|
|
}
|
|
|
|
func (medium *Medium) hasPrefixLocked(prefix string) (bool, error) {
|
|
entries, err := medium.collectAllLocked()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, name := range entries {
|
|
if core.HasPrefix(name, prefix) {
|
|
return true, nil
|
|
}
|
|
}
|
|
for directoryPath := range medium.directorySet {
|
|
if core.HasPrefix(directoryPath, prefix) {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func (medium *Medium) collectAllLocked() ([]string, error) {
|
|
var names []string
|
|
err := dataNodeWalkDir(medium.dataNode, ".", func(filePath string, entry fs.DirEntry, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !entry.IsDir() {
|
|
names = append(names, filePath)
|
|
}
|
|
return nil
|
|
})
|
|
return names, err
|
|
}
|
|
|
|
func (medium *Medium) readFileLocked(filePath string) ([]byte, error) {
|
|
file, err := dataNodeOpen(medium.dataNode, filePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
data, readErr := dataNodeReadAll(file)
|
|
closeErr := file.Close()
|
|
if readErr != nil {
|
|
return nil, readErr
|
|
}
|
|
if closeErr != nil {
|
|
return nil, closeErr
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
func (medium *Medium) removeFileLocked(target string) error {
|
|
entries, err := medium.collectAllLocked()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newDataNode := borgdatanode.New()
|
|
for _, name := range entries {
|
|
if name == target {
|
|
continue
|
|
}
|
|
data, err := medium.readFileLocked(name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newDataNode.AddData(name, data)
|
|
}
|
|
medium.dataNode = newDataNode
|
|
return nil
|
|
}
|
|
|
|
type writeCloser struct {
|
|
medium *Medium
|
|
path string
|
|
buf []byte
|
|
}
|
|
|
|
func (writer *writeCloser) Write(data []byte) (int, error) {
|
|
writer.buf = append(writer.buf, data...)
|
|
return len(data), nil
|
|
}
|
|
|
|
func (writer *writeCloser) Close() error {
|
|
writer.medium.lock.Lock()
|
|
defer writer.medium.lock.Unlock()
|
|
|
|
writer.medium.dataNode.AddData(writer.path, writer.buf)
|
|
writer.medium.ensureDirsLocked(path.Dir(writer.path))
|
|
return nil
|
|
}
|
|
|
|
type dirEntry struct {
|
|
name string
|
|
}
|
|
|
|
func (entry *dirEntry) Name() string { return entry.name }
|
|
|
|
func (entry *dirEntry) IsDir() bool { return true }
|
|
|
|
func (entry *dirEntry) Type() fs.FileMode { return fs.ModeDir }
|
|
|
|
func (entry *dirEntry) Info() (fs.FileInfo, error) {
|
|
return &fileInfo{name: entry.name, isDir: true, mode: fs.ModeDir | 0755}, nil
|
|
}
|
|
|
|
// fileInfo is a plain-value fs.FileInfo used for synthetic entries such as
// the root and explicit directories.
type fileInfo struct {
	name    string      // base name reported by Name
	size    int64       // length in bytes reported by Size
	mode    fs.FileMode // mode bits reported by Mode
	modTime time.Time   // timestamp reported by ModTime
	isDir   bool        // value reported by IsDir
}

// Name returns the stored base name.
func (info *fileInfo) Name() string {
	return info.name
}

// Size returns the stored size in bytes.
func (info *fileInfo) Size() int64 {
	return info.size
}

// Mode returns the stored mode bits.
func (info *fileInfo) Mode() fs.FileMode {
	return info.mode
}

// ModTime returns the stored modification time.
func (info *fileInfo) ModTime() time.Time {
	return info.modTime
}

// IsDir returns the stored directory flag (independent of Mode).
func (info *fileInfo) IsDir() bool {
	return info.isDir
}

// Sys always returns nil; there is no underlying data source.
func (info *fileInfo) Sys() any {
	return nil
}
|