// Example: medium := datanode.New() // Example: _ = medium.Write("jobs/run.log", "started") // Example: snapshot, _ := medium.Snapshot() // Example: restored, _ := datanode.FromTar(snapshot) package datanode import ( "cmp" goio "io" "io/fs" "path" "slices" "sync" "time" core "dappco.re/go/core" borgdatanode "forge.lthn.ai/Snider/Borg/pkg/datanode" ) var ( dataNodeWalkDir = func(fileSystem fs.FS, root string, callback fs.WalkDirFunc) error { return fs.WalkDir(fileSystem, root, callback) } dataNodeOpen = func(dataNode *borgdatanode.DataNode, filePath string) (fs.File, error) { return dataNode.Open(filePath) } dataNodeReadAll = func(reader goio.Reader) ([]byte, error) { return goio.ReadAll(reader) } ) // Example: medium := datanode.New() // Example: _ = medium.Write("jobs/run.log", "started") // Example: snapshot, _ := medium.Snapshot() type Medium struct { dataNode *borgdatanode.DataNode directorySet map[string]bool lock sync.RWMutex } // Example: medium := datanode.New() // Example: _ = medium.Write("jobs/run.log", "started") func New() *Medium { return &Medium{ dataNode: borgdatanode.New(), directorySet: make(map[string]bool), } } // Example: sourceMedium := datanode.New() // Example: snapshot, _ := sourceMedium.Snapshot() // Example: restored, _ := datanode.FromTar(snapshot) func FromTar(data []byte) (*Medium, error) { dataNode, err := borgdatanode.FromTar(data) if err != nil { return nil, core.E("datanode.FromTar", "failed to restore", err) } return &Medium{ dataNode: dataNode, directorySet: make(map[string]bool), }, nil } // Example: snapshot, _ := medium.Snapshot() func (medium *Medium) Snapshot() ([]byte, error) { medium.lock.RLock() defer medium.lock.RUnlock() data, err := medium.dataNode.ToTar() if err != nil { return nil, core.E("datanode.Snapshot", "tar failed", err) } return data, nil } // Example: _ = medium.Restore(snapshot) func (medium *Medium) Restore(data []byte) error { dataNode, err := borgdatanode.FromTar(data) if err != nil { return core.E("datanode.Restore", "tar failed", err) } medium.lock.Lock() defer medium.lock.Unlock() medium.dataNode = dataNode medium.directorySet = make(map[string]bool) return nil } // Example: dataNode := medium.DataNode() func (medium *Medium) DataNode() *borgdatanode.DataNode { medium.lock.RLock() defer medium.lock.RUnlock() return medium.dataNode } func normaliseEntryPath(filePath string) string { filePath = core.TrimPrefix(filePath, "/") filePath = path.Clean(filePath) if filePath == "." { return "" } return filePath } func (medium *Medium) Read(filePath string) (string, error) { medium.lock.RLock() defer medium.lock.RUnlock() filePath = normaliseEntryPath(filePath) file, err := medium.dataNode.Open(filePath) if err != nil { return "", core.E("datanode.Read", core.Concat("not found: ", filePath), fs.ErrNotExist) } defer file.Close() info, err := file.Stat() if err != nil { return "", core.E("datanode.Read", core.Concat("stat failed: ", filePath), err) } if info.IsDir() { return "", core.E("datanode.Read", core.Concat("is a directory: ", filePath), fs.ErrInvalid) } data, err := goio.ReadAll(file) if err != nil { return "", core.E("datanode.Read", core.Concat("read failed: ", filePath), err) } return string(data), nil } func (medium *Medium) Write(filePath, content string) error { medium.lock.Lock() defer medium.lock.Unlock() filePath = normaliseEntryPath(filePath) if filePath == "" { return core.E("datanode.Write", "empty path", fs.ErrInvalid) } medium.dataNode.AddData(filePath, []byte(content)) medium.ensureDirsLocked(path.Dir(filePath)) return nil } func (medium *Medium) WriteMode(filePath, content string, mode fs.FileMode) error { return medium.Write(filePath, content) } func (medium *Medium) EnsureDir(filePath string) error { medium.lock.Lock() defer medium.lock.Unlock() filePath = normaliseEntryPath(filePath) if filePath == "" { return nil } medium.ensureDirsLocked(filePath) return nil } func (medium *Medium) ensureDirsLocked(directoryPath string) { for directoryPath != "" && directoryPath != "." { medium.directorySet[directoryPath] = true directoryPath = path.Dir(directoryPath) if directoryPath == "." { break } } } func (medium *Medium) IsFile(filePath string) bool { medium.lock.RLock() defer medium.lock.RUnlock() filePath = normaliseEntryPath(filePath) info, err := medium.dataNode.Stat(filePath) return err == nil && !info.IsDir() } func (medium *Medium) Delete(filePath string) error { medium.lock.Lock() defer medium.lock.Unlock() filePath = normaliseEntryPath(filePath) if filePath == "" { return core.E("datanode.Delete", "cannot delete root", fs.ErrPermission) } info, err := medium.dataNode.Stat(filePath) if err != nil { if medium.directorySet[filePath] { hasChildren, err := medium.hasPrefixLocked(filePath + "/") if err != nil { return core.E("datanode.Delete", core.Concat("failed to inspect directory: ", filePath), err) } if hasChildren { return core.E("datanode.Delete", core.Concat("directory not empty: ", filePath), fs.ErrExist) } delete(medium.directorySet, filePath) return nil } return core.E("datanode.Delete", core.Concat("not found: ", filePath), fs.ErrNotExist) } if info.IsDir() { hasChildren, err := medium.hasPrefixLocked(filePath + "/") if err != nil { return core.E("datanode.Delete", core.Concat("failed to inspect directory: ", filePath), err) } if hasChildren { return core.E("datanode.Delete", core.Concat("directory not empty: ", filePath), fs.ErrExist) } delete(medium.directorySet, filePath) return nil } if err := medium.removeFileLocked(filePath); err != nil { return core.E("datanode.Delete", core.Concat("failed to delete file: ", filePath), err) } return nil } func (medium *Medium) DeleteAll(filePath string) error { medium.lock.Lock() defer medium.lock.Unlock() filePath = normaliseEntryPath(filePath) if filePath == "" { return core.E("datanode.DeleteAll", "cannot delete root", fs.ErrPermission) } prefix := filePath + "/" found := false info, err := medium.dataNode.Stat(filePath) if err == nil && !info.IsDir() { if err := medium.removeFileLocked(filePath); err != nil { return core.E("datanode.DeleteAll", core.Concat("failed to delete file: ", filePath), err) } found = true } entries, err := medium.collectAllLocked() if err != nil { return core.E("datanode.DeleteAll", core.Concat("failed to inspect tree: ", filePath), err) } for _, name := range entries { if name == filePath || core.HasPrefix(name, prefix) { if err := medium.removeFileLocked(name); err != nil { return core.E("datanode.DeleteAll", core.Concat("failed to delete file: ", name), err) } found = true } } for directoryPath := range medium.directorySet { if directoryPath == filePath || core.HasPrefix(directoryPath, prefix) { delete(medium.directorySet, directoryPath) found = true } } if !found { return core.E("datanode.DeleteAll", core.Concat("not found: ", filePath), fs.ErrNotExist) } return nil } func (medium *Medium) Rename(oldPath, newPath string) error { medium.lock.Lock() defer medium.lock.Unlock() oldPath = normaliseEntryPath(oldPath) newPath = normaliseEntryPath(newPath) info, err := medium.dataNode.Stat(oldPath) if err != nil { return core.E("datanode.Rename", core.Concat("not found: ", oldPath), fs.ErrNotExist) } if !info.IsDir() { data, err := medium.readFileLocked(oldPath) if err != nil { return core.E("datanode.Rename", core.Concat("failed to read source file: ", oldPath), err) } medium.dataNode.AddData(newPath, data) medium.ensureDirsLocked(path.Dir(newPath)) if err := medium.removeFileLocked(oldPath); err != nil { return core.E("datanode.Rename", core.Concat("failed to remove source file: ", oldPath), err) } return nil } oldPrefix := oldPath + "/" newPrefix := newPath + "/" entries, err := medium.collectAllLocked() if err != nil { return core.E("datanode.Rename", core.Concat("failed to inspect tree: ", oldPath), err) } for _, name := range entries { if core.HasPrefix(name, oldPrefix) { newName := core.Concat(newPrefix, core.TrimPrefix(name, oldPrefix)) data, err := medium.readFileLocked(name) if err != nil { return core.E("datanode.Rename", core.Concat("failed to read source file: ", name), err) } medium.dataNode.AddData(newName, data) if err := medium.removeFileLocked(name); err != nil { return core.E("datanode.Rename", core.Concat("failed to remove source file: ", name), err) } } } dirsToMove := make(map[string]string) for directoryPath := range medium.directorySet { if directoryPath == oldPath || core.HasPrefix(directoryPath, oldPrefix) { newDirectoryPath := core.Concat(newPath, core.TrimPrefix(directoryPath, oldPath)) dirsToMove[directoryPath] = newDirectoryPath } } for oldDirectoryPath, newDirectoryPath := range dirsToMove { delete(medium.directorySet, oldDirectoryPath) medium.directorySet[newDirectoryPath] = true } return nil } func (medium *Medium) List(filePath string) ([]fs.DirEntry, error) { medium.lock.RLock() defer medium.lock.RUnlock() filePath = normaliseEntryPath(filePath) entries, err := medium.dataNode.ReadDir(filePath) if err != nil { if filePath == "" || medium.directorySet[filePath] { return []fs.DirEntry{}, nil } return nil, core.E("datanode.List", core.Concat("not found: ", filePath), fs.ErrNotExist) } prefix := filePath if prefix != "" { prefix += "/" } seen := make(map[string]bool) for _, entry := range entries { seen[entry.Name()] = true } for directoryPath := range medium.directorySet { if !core.HasPrefix(directoryPath, prefix) { continue } rest := core.TrimPrefix(directoryPath, prefix) if rest == "" { continue } first := core.SplitN(rest, "/", 2)[0] if !seen[first] { seen[first] = true entries = append(entries, &dirEntry{name: first}) } } slices.SortFunc(entries, func(a, b fs.DirEntry) int { return cmp.Compare(a.Name(), b.Name()) }) return entries, nil } func (medium *Medium) Stat(filePath string) (fs.FileInfo, error) { medium.lock.RLock() defer medium.lock.RUnlock() filePath = normaliseEntryPath(filePath) if filePath == "" { return &fileInfo{name: ".", isDir: true, mode: fs.ModeDir | 0755}, nil } info, err := medium.dataNode.Stat(filePath) if err == nil { return info, nil } if medium.directorySet[filePath] { return &fileInfo{name: path.Base(filePath), isDir: true, mode: fs.ModeDir | 0755}, nil } return nil, core.E("datanode.Stat", core.Concat("not found: ", filePath), fs.ErrNotExist) } func (medium *Medium) Open(filePath string) (fs.File, error) { medium.lock.RLock() defer medium.lock.RUnlock() filePath = normaliseEntryPath(filePath) return medium.dataNode.Open(filePath) } func (medium *Medium) Create(filePath string) (goio.WriteCloser, error) { filePath = normaliseEntryPath(filePath) if filePath == "" { return nil, core.E("datanode.Create", "empty path", fs.ErrInvalid) } return &writeCloser{medium: medium, path: filePath}, nil } func (medium *Medium) Append(filePath string) (goio.WriteCloser, error) { filePath = normaliseEntryPath(filePath) if filePath == "" { return nil, core.E("datanode.Append", "empty path", fs.ErrInvalid) } var existing []byte medium.lock.RLock() if medium.IsFile(filePath) { data, err := medium.readFileLocked(filePath) if err != nil { medium.lock.RUnlock() return nil, core.E("datanode.Append", core.Concat("failed to read existing content: ", filePath), err) } existing = data } medium.lock.RUnlock() return &writeCloser{medium: medium, path: filePath, buffer: existing}, nil } func (medium *Medium) ReadStream(filePath string) (goio.ReadCloser, error) { medium.lock.RLock() defer medium.lock.RUnlock() filePath = normaliseEntryPath(filePath) file, err := medium.dataNode.Open(filePath) if err != nil { return nil, core.E("datanode.ReadStream", core.Concat("not found: ", filePath), fs.ErrNotExist) } return file.(goio.ReadCloser), nil } func (medium *Medium) WriteStream(filePath string) (goio.WriteCloser, error) { return medium.Create(filePath) } func (medium *Medium) Exists(filePath string) bool { medium.lock.RLock() defer medium.lock.RUnlock() filePath = normaliseEntryPath(filePath) if filePath == "" { return true } _, err := medium.dataNode.Stat(filePath) if err == nil { return true } return medium.directorySet[filePath] } func (medium *Medium) IsDir(filePath string) bool { medium.lock.RLock() defer medium.lock.RUnlock() filePath = normaliseEntryPath(filePath) if filePath == "" { return true } info, err := medium.dataNode.Stat(filePath) if err == nil { return info.IsDir() } return medium.directorySet[filePath] } func (medium *Medium) hasPrefixLocked(prefix string) (bool, error) { entries, err := medium.collectAllLocked() if err != nil { return false, err } for _, name := range entries { if core.HasPrefix(name, prefix) { return true, nil } } for directoryPath := range medium.directorySet { if core.HasPrefix(directoryPath, prefix) { return true, nil } } return false, nil } func (medium *Medium) collectAllLocked() ([]string, error) { var names []string err := dataNodeWalkDir(medium.dataNode, ".", func(filePath string, entry fs.DirEntry, err error) error { if err != nil { return err } if !entry.IsDir() { names = append(names, filePath) } return nil }) return names, err } func (medium *Medium) readFileLocked(filePath string) ([]byte, error) { file, err := dataNodeOpen(medium.dataNode, filePath) if err != nil { return nil, err } data, readErr := dataNodeReadAll(file) closeErr := file.Close() if readErr != nil { return nil, readErr } if closeErr != nil { return nil, closeErr } return data, nil } func (medium *Medium) removeFileLocked(target string) error { entries, err := medium.collectAllLocked() if err != nil { return err } newDataNode := borgdatanode.New() for _, name := range entries { if name == target { continue } data, err := medium.readFileLocked(name) if err != nil { return err } newDataNode.AddData(name, data) } medium.dataNode = newDataNode return nil } type writeCloser struct { medium *Medium path string buffer []byte } func (writer *writeCloser) Write(data []byte) (int, error) { writer.buffer = append(writer.buffer, data...) return len(data), nil } func (writer *writeCloser) Close() error { writer.medium.lock.Lock() defer writer.medium.lock.Unlock() writer.medium.dataNode.AddData(writer.path, writer.buffer) writer.medium.ensureDirsLocked(path.Dir(writer.path)) return nil } type dirEntry struct { name string } func (entry *dirEntry) Name() string { return entry.name } func (entry *dirEntry) IsDir() bool { return true } func (entry *dirEntry) Type() fs.FileMode { return fs.ModeDir } func (entry *dirEntry) Info() (fs.FileInfo, error) { return &fileInfo{name: entry.name, isDir: true, mode: fs.ModeDir | 0755}, nil } type fileInfo struct { name string size int64 mode fs.FileMode modTime time.Time isDir bool } func (info *fileInfo) Name() string { return info.name } func (info *fileInfo) Size() int64 { return info.size } func (info *fileInfo) Mode() fs.FileMode { return info.mode } func (info *fileInfo) ModTime() time.Time { return info.modTime } func (info *fileInfo) IsDir() bool { return info.isDir } func (info *fileInfo) Sys() any { return nil }