refactor(node): replace stdlib helpers with core primitives
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
be55b2499b
commit
04ae11da43
12 changed files with 118 additions and 111 deletions
|
|
@ -2,11 +2,9 @@
|
|||
package logging
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"maps"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
|
@ -115,7 +113,7 @@ func (l *Logger) log(level Level, msg string, fields Fields) {
|
|||
}
|
||||
|
||||
// Build the log line
|
||||
var sb strings.Builder
|
||||
sb := core.NewBuilder()
|
||||
timestamp := time.Now().Format("2006/01/02 15:04:05")
|
||||
sb.WriteString(timestamp)
|
||||
sb.WriteString(" [")
|
||||
|
|
@ -138,12 +136,12 @@ func (l *Logger) log(level Level, msg string, fields Fields) {
|
|||
sb.WriteString(" ")
|
||||
sb.WriteString(k)
|
||||
sb.WriteString("=")
|
||||
sb.WriteString(fmt.Sprintf("%v", v))
|
||||
sb.WriteString(core.Sprint(v))
|
||||
}
|
||||
}
|
||||
|
||||
sb.WriteString("\n")
|
||||
fmt.Fprint(l.output, sb.String())
|
||||
_, _ = l.output.Write([]byte(sb.String()))
|
||||
}
|
||||
|
||||
// Debug logs a debug message.
|
||||
|
|
@ -168,22 +166,22 @@ func (l *Logger) Error(msg string, fields ...Fields) {
|
|||
|
||||
// Debugf logs a formatted debug message.
|
||||
func (l *Logger) Debugf(format string, args ...any) {
|
||||
l.log(LevelDebug, fmt.Sprintf(format, args...), nil)
|
||||
l.log(LevelDebug, core.Sprintf(format, args...), nil)
|
||||
}
|
||||
|
||||
// Infof logs a formatted informational message.
|
||||
func (l *Logger) Infof(format string, args ...any) {
|
||||
l.log(LevelInfo, fmt.Sprintf(format, args...), nil)
|
||||
l.log(LevelInfo, core.Sprintf(format, args...), nil)
|
||||
}
|
||||
|
||||
// Warnf logs a formatted warning message.
|
||||
func (l *Logger) Warnf(format string, args ...any) {
|
||||
l.log(LevelWarn, fmt.Sprintf(format, args...), nil)
|
||||
l.log(LevelWarn, core.Sprintf(format, args...), nil)
|
||||
}
|
||||
|
||||
// Errorf logs a formatted error message.
|
||||
func (l *Logger) Errorf(format string, args ...any) {
|
||||
l.log(LevelError, fmt.Sprintf(format, args...), nil)
|
||||
l.log(LevelError, core.Sprintf(format, args...), nil)
|
||||
}
|
||||
|
||||
// mergeFields combines multiple Fields maps into one.
|
||||
|
|
@ -270,7 +268,7 @@ func Errorf(format string, args ...any) {
|
|||
|
||||
// ParseLevel parses a string into a log level.
|
||||
func ParseLevel(s string) (Level, error) {
|
||||
switch strings.ToUpper(s) {
|
||||
switch core.Upper(s) {
|
||||
case "DEBUG":
|
||||
return LevelDebug, nil
|
||||
case "INFO":
|
||||
|
|
|
|||
|
|
@ -2,8 +2,9 @@ package node
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"sync"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
// bufferPool provides reusable byte buffers for JSON encoding.
|
||||
|
|
@ -29,27 +30,22 @@ func putBuffer(buf *bytes.Buffer) {
|
|||
}
|
||||
}
|
||||
|
||||
// MarshalJSON encodes a value to JSON using a pooled buffer.
|
||||
// MarshalJSON encodes a value to JSON using Core's JSON primitive and then
|
||||
// restores the historical no-EscapeHTML behaviour expected by the node package.
|
||||
// Returns a copy of the encoded bytes (safe to use after the function returns).
|
||||
func MarshalJSON(v any) ([]byte, error) {
|
||||
buf := getBuffer()
|
||||
defer putBuffer(buf)
|
||||
|
||||
enc := json.NewEncoder(buf)
|
||||
// Don't escape HTML characters (matches json.Marshal behavior for these use cases)
|
||||
enc.SetEscapeHTML(false)
|
||||
if err := enc.Encode(v); err != nil {
|
||||
return nil, err
|
||||
encoded := core.JSONMarshal(v)
|
||||
if !encoded.OK {
|
||||
return nil, encoded.Value.(error)
|
||||
}
|
||||
data := encoded.Value.([]byte)
|
||||
|
||||
// json.Encoder.Encode adds a newline; remove it to match json.Marshal
|
||||
data := buf.Bytes()
|
||||
if len(data) > 0 && data[len(data)-1] == '\n' {
|
||||
data = data[:len(data)-1]
|
||||
}
|
||||
data = bytes.ReplaceAll(data, []byte(`\u003c`), []byte("<"))
|
||||
data = bytes.ReplaceAll(data, []byte(`\u003e`), []byte(">"))
|
||||
data = bytes.ReplaceAll(data, []byte(`\u0026`), []byte("&"))
|
||||
|
||||
// Return a copy since the buffer will be reused
|
||||
result := make([]byte, len(data))
|
||||
copy(result, data)
|
||||
return result, nil
|
||||
// Return a copy since callers may retain the slice after subsequent calls.
|
||||
out := make([]byte, len(data))
|
||||
copy(out, data)
|
||||
return out, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,11 +5,8 @@ import (
|
|||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
|
||||
|
|
@ -93,7 +90,7 @@ func CreateMinerBundle(minerPath string, profileJSON []byte, name string, passwo
|
|||
|
||||
// Create a tarball with the miner binary
|
||||
tarData, err := createTarball(map[string][]byte{
|
||||
filepath.Base(minerPath): minerData,
|
||||
core.PathBase(minerPath): minerData,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, core.E("CreateMinerBundle", "failed to create tarball", err)
|
||||
|
|
@ -211,7 +208,7 @@ func createTarball(files map[string][]byte) ([]byte, error) {
|
|||
|
||||
for name, content := range files {
|
||||
// Create parent directories if needed
|
||||
dir := filepath.Dir(name)
|
||||
dir := core.PathDir(name)
|
||||
if dir != "." && !dirs[dir] {
|
||||
hdr := &tar.Header{
|
||||
Name: dir + "/",
|
||||
|
|
@ -226,7 +223,7 @@ func createTarball(files map[string][]byte) ([]byte, error) {
|
|||
|
||||
// Determine file mode (executable for binaries in miners/)
|
||||
mode := int64(0644)
|
||||
if filepath.Dir(name) == "miners" || !isJSON(content) {
|
||||
if core.PathDir(name) == "miners" || !isJSON(content) {
|
||||
mode = 0755
|
||||
}
|
||||
|
||||
|
|
@ -253,11 +250,16 @@ func createTarball(files map[string][]byte) ([]byte, error) {
|
|||
// extractTarball extracts a tar archive to a directory, returns first executable found.
|
||||
func extractTarball(tarData []byte, destDir string) (string, error) {
|
||||
// Ensure destDir is an absolute, clean path for security checks
|
||||
absDestDir, err := filepath.Abs(destDir)
|
||||
if err != nil {
|
||||
return "", core.E("extractTarball", "failed to resolve destination directory", err)
|
||||
absDestDir := destDir
|
||||
if !core.PathIsAbs(absDestDir) {
|
||||
cwd, err := os.Getwd()
|
||||
if err != nil {
|
||||
return "", core.E("extractTarball", "failed to resolve destination directory", err)
|
||||
}
|
||||
absDestDir = core.CleanPath(core.Concat(cwd, string(os.PathSeparator), absDestDir), string(os.PathSeparator))
|
||||
} else {
|
||||
absDestDir = core.CleanPath(absDestDir, string(os.PathSeparator))
|
||||
}
|
||||
absDestDir = filepath.Clean(absDestDir)
|
||||
|
||||
if err := fsEnsureDir(absDestDir); err != nil {
|
||||
return "", err
|
||||
|
|
@ -276,24 +278,27 @@ func extractTarball(tarData []byte, destDir string) (string, error) {
|
|||
}
|
||||
|
||||
// Security: Sanitize the tar entry name to prevent path traversal (Zip Slip)
|
||||
cleanName := filepath.Clean(hdr.Name)
|
||||
cleanName := core.CleanPath(hdr.Name, "/")
|
||||
|
||||
// Reject absolute paths
|
||||
if filepath.IsAbs(cleanName) {
|
||||
if core.PathIsAbs(cleanName) {
|
||||
return "", core.E("extractTarball", "invalid tar entry: absolute path not allowed: "+hdr.Name, nil)
|
||||
}
|
||||
|
||||
// Reject paths that escape the destination directory
|
||||
if strings.HasPrefix(cleanName, ".."+string(os.PathSeparator)) || cleanName == ".." {
|
||||
if core.HasPrefix(cleanName, "../") || cleanName == ".." {
|
||||
return "", core.E("extractTarball", "invalid tar entry: path traversal attempt: "+hdr.Name, nil)
|
||||
}
|
||||
|
||||
// Build the full path and verify it's within destDir
|
||||
fullPath := filepath.Join(absDestDir, cleanName)
|
||||
fullPath = filepath.Clean(fullPath)
|
||||
fullPath := core.CleanPath(core.Concat(absDestDir, string(os.PathSeparator), cleanName), string(os.PathSeparator))
|
||||
|
||||
// Final security check: ensure the path is still within destDir
|
||||
if !strings.HasPrefix(fullPath, absDestDir+string(os.PathSeparator)) && fullPath != absDestDir {
|
||||
allowedPrefix := core.Concat(absDestDir, string(os.PathSeparator))
|
||||
if absDestDir == string(os.PathSeparator) {
|
||||
allowedPrefix = absDestDir
|
||||
}
|
||||
if !core.HasPrefix(fullPath, allowedPrefix) && fullPath != absDestDir {
|
||||
return "", core.E("extractTarball", "invalid tar entry: path escape attempt: "+hdr.Name, nil)
|
||||
}
|
||||
|
||||
|
|
@ -304,7 +309,7 @@ func extractTarball(tarData []byte, destDir string) (string, error) {
|
|||
}
|
||||
case tar.TypeReg:
|
||||
// Ensure parent directory exists
|
||||
if err := fsEnsureDir(filepath.Dir(fullPath)); err != nil {
|
||||
if err := fsEnsureDir(core.PathDir(fullPath)); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
|
|
@ -345,16 +350,25 @@ func extractTarball(tarData []byte, destDir string) (string, error) {
|
|||
|
||||
// StreamBundle writes a bundle to a writer (for large transfers).
|
||||
func StreamBundle(bundle *Bundle, w io.Writer) error {
|
||||
encoder := json.NewEncoder(w)
|
||||
return encoder.Encode(bundle)
|
||||
result := core.JSONMarshal(bundle)
|
||||
if !result.OK {
|
||||
return result.Value.(error)
|
||||
}
|
||||
_, err := w.Write(result.Value.([]byte))
|
||||
return err
|
||||
}
|
||||
|
||||
// ReadBundle reads a bundle from a reader.
|
||||
func ReadBundle(r io.Reader) (*Bundle, error) {
|
||||
var bundle Bundle
|
||||
decoder := json.NewDecoder(r)
|
||||
if err := decoder.Decode(&bundle); err != nil {
|
||||
var buf bytes.Buffer
|
||||
if _, err := io.Copy(&buf, r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var bundle Bundle
|
||||
result := core.JSONUnmarshal(buf.Bytes(), &bundle)
|
||||
if !result.OK {
|
||||
return nil, result.Value.(error)
|
||||
}
|
||||
return &bundle, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package node
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
|
@ -137,7 +136,7 @@ func (c *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) {
|
|||
}
|
||||
|
||||
// StartRemoteMiner requests a remote peer to start a miner with a given profile.
|
||||
func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride json.RawMessage) error {
|
||||
func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride RawMessage) error {
|
||||
identity := c.node.GetIdentity()
|
||||
if identity == nil {
|
||||
return ErrIdentityNotInitialized
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package node
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"iter"
|
||||
"sync"
|
||||
|
||||
|
|
@ -69,7 +68,7 @@ func (d *Dispatcher) RegisterHandler(intentID byte, handler IntentHandler) {
|
|||
defer d.mu.Unlock()
|
||||
d.handlers[intentID] = handler
|
||||
d.log.Debug("handler registered", logging.Fields{
|
||||
"intent_id": fmt.Sprintf("0x%02X", intentID),
|
||||
"intent_id": core.Sprintf("0x%02X", intentID),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -108,7 +107,7 @@ func (d *Dispatcher) Dispatch(pkt *ueps.ParsedPacket) error {
|
|||
d.log.Warn("packet dropped: threat score exceeds safety threshold", logging.Fields{
|
||||
"threat_score": pkt.Header.ThreatScore,
|
||||
"threshold": ThreatScoreThreshold,
|
||||
"intent_id": fmt.Sprintf("0x%02X", pkt.Header.IntentID),
|
||||
"intent_id": core.Sprintf("0x%02X", pkt.Header.IntentID),
|
||||
"version": pkt.Header.Version,
|
||||
})
|
||||
return ErrThreatScoreExceeded
|
||||
|
|
@ -121,7 +120,7 @@ func (d *Dispatcher) Dispatch(pkt *ueps.ParsedPacket) error {
|
|||
|
||||
if !exists {
|
||||
d.log.Warn("packet dropped: unknown intent", logging.Fields{
|
||||
"intent_id": fmt.Sprintf("0x%02X", pkt.Header.IntentID),
|
||||
"intent_id": core.Sprintf("0x%02X", pkt.Header.IntentID),
|
||||
"version": pkt.Header.Version,
|
||||
})
|
||||
return ErrUnknownIntent
|
||||
|
|
@ -134,7 +133,7 @@ func (d *Dispatcher) Dispatch(pkt *ueps.ParsedPacket) error {
|
|||
var (
|
||||
// ErrThreatScoreExceeded is returned when a packet's ThreatScore exceeds
|
||||
// the safety threshold.
|
||||
ErrThreatScoreExceeded = core.E("Dispatcher.Dispatch", fmt.Sprintf("packet rejected: threat score exceeds safety threshold (%d)", ThreatScoreThreshold), nil)
|
||||
ErrThreatScoreExceeded = core.E("Dispatcher.Dispatch", core.Sprintf("packet rejected: threat score exceeds safety threshold (%d)", ThreatScoreThreshold), nil)
|
||||
|
||||
// ErrUnknownIntent is returned when no handler is registered for the
|
||||
// packet's IntentID.
|
||||
|
|
|
|||
|
|
@ -7,8 +7,6 @@ import (
|
|||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
|
@ -202,7 +200,7 @@ func (n *NodeManager) DeriveSharedSecret(peerPubKeyBase64 string) ([]byte, error
|
|||
// savePrivateKey saves the private key to disk with restricted permissions.
|
||||
func (n *NodeManager) savePrivateKey() error {
|
||||
// Ensure directory exists
|
||||
dir := filepath.Dir(n.keyPath)
|
||||
dir := core.PathDir(n.keyPath)
|
||||
if err := fsEnsureDir(dir); err != nil {
|
||||
return core.E("NodeManager.savePrivateKey", "failed to create key directory", err)
|
||||
}
|
||||
|
|
@ -218,15 +216,16 @@ func (n *NodeManager) savePrivateKey() error {
|
|||
// saveIdentity saves the public identity to the config file.
|
||||
func (n *NodeManager) saveIdentity() error {
|
||||
// Ensure directory exists
|
||||
dir := filepath.Dir(n.configPath)
|
||||
dir := core.PathDir(n.configPath)
|
||||
if err := fsEnsureDir(dir); err != nil {
|
||||
return core.E("NodeManager.saveIdentity", "failed to create config directory", err)
|
||||
}
|
||||
|
||||
data, err := json.MarshalIndent(n.identity, "", " ")
|
||||
if err != nil {
|
||||
return core.E("NodeManager.saveIdentity", "failed to marshal identity", err)
|
||||
result := core.JSONMarshal(n.identity)
|
||||
if !result.OK {
|
||||
return core.E("NodeManager.saveIdentity", "failed to marshal identity", result.Value.(error))
|
||||
}
|
||||
data := result.Value.([]byte)
|
||||
|
||||
if err := fsWrite(n.configPath, string(data)); err != nil {
|
||||
return core.E("NodeManager.saveIdentity", "failed to write identity", err)
|
||||
|
|
@ -244,8 +243,9 @@ func (n *NodeManager) loadIdentity() error {
|
|||
}
|
||||
|
||||
var identity NodeIdentity
|
||||
if err := json.Unmarshal([]byte(content), &identity); err != nil {
|
||||
return core.E("NodeManager.loadIdentity", "failed to unmarshal identity", err)
|
||||
result := core.JSONUnmarshalString(content, &identity)
|
||||
if !result.OK {
|
||||
return core.E("NodeManager.loadIdentity", "failed to unmarshal identity", result.Value.(error))
|
||||
}
|
||||
|
||||
// Load private key
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ package levin
|
|||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"maps"
|
||||
"math"
|
||||
"slices"
|
||||
|
|
@ -394,7 +393,7 @@ func encodeValue(buf []byte, v Value) ([]byte, error) {
|
|||
return encodeSection(buf, v.objectVal)
|
||||
|
||||
default:
|
||||
return nil, core.E("levin.encodeValue", fmt.Sprintf("unknown type tag: 0x%02x", v.Type), ErrStorageUnknownType)
|
||||
return nil, core.E("levin.encodeValue", core.Sprintf("unknown type tag: 0x%02x", v.Type), ErrStorageUnknownType)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -441,7 +440,7 @@ func encodeArray(buf []byte, v Value) ([]byte, error) {
|
|||
return buf, nil
|
||||
|
||||
default:
|
||||
return nil, core.E("levin.encodeArray", fmt.Sprintf("unknown type tag: array of 0x%02x", elemType), ErrStorageUnknownType)
|
||||
return nil, core.E("levin.encodeArray", core.Sprintf("unknown type tag: array of 0x%02x", elemType), ErrStorageUnknownType)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -613,7 +612,7 @@ func decodeValue(buf []byte, tag uint8) (Value, int, error) {
|
|||
return Value{Type: TypeObject, objectVal: sec}, consumed, nil
|
||||
|
||||
default:
|
||||
return Value{}, 0, core.E("levin.decodeValue", fmt.Sprintf("unknown type tag: 0x%02x", tag), ErrStorageUnknownType)
|
||||
return Value{}, 0, core.E("levin.decodeValue", core.Sprintf("unknown type tag: 0x%02x", tag), ErrStorageUnknownType)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -681,6 +680,6 @@ func decodeArray(buf []byte, tag uint8) (Value, int, error) {
|
|||
return Value{Type: tag, objectArray: arr}, off, nil
|
||||
|
||||
default:
|
||||
return Value{}, 0, core.E("levin.decodeArray", fmt.Sprintf("unknown type tag: array of 0x%02x", elemType), ErrStorageUnknownType)
|
||||
return Value{}, 0, core.E("levin.decodeArray", core.Sprintf("unknown type tag: array of 0x%02x", elemType), ErrStorageUnknownType)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"slices"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
|
|
@ -20,6 +21,9 @@ const (
|
|||
// Used for version negotiation during handshake.
|
||||
var SupportedProtocolVersions = []string{"1.0"}
|
||||
|
||||
// RawMessage is the message payload byte slice used for deferred JSON decoding.
|
||||
type RawMessage = json.RawMessage
|
||||
|
||||
// IsProtocolVersionSupported checks if a given version is supported.
|
||||
func IsProtocolVersionSupported(version string) bool {
|
||||
return slices.Contains(SupportedProtocolVersions, version)
|
||||
|
|
@ -57,18 +61,18 @@ const (
|
|||
|
||||
// Message represents a P2P message between nodes.
|
||||
type Message struct {
|
||||
ID string `json:"id"` // UUID
|
||||
Type MessageType `json:"type"`
|
||||
From string `json:"from"` // Sender node ID
|
||||
To string `json:"to"` // Recipient node ID (empty for broadcast)
|
||||
Timestamp time.Time `json:"ts"`
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
ReplyTo string `json:"replyTo,omitempty"` // ID of message being replied to
|
||||
ID string `json:"id"` // UUID
|
||||
Type MessageType `json:"type"`
|
||||
From string `json:"from"` // Sender node ID
|
||||
To string `json:"to"` // Recipient node ID (empty for broadcast)
|
||||
Timestamp time.Time `json:"ts"`
|
||||
Payload RawMessage `json:"payload"`
|
||||
ReplyTo string `json:"replyTo,omitempty"` // ID of message being replied to
|
||||
}
|
||||
|
||||
// NewMessage creates a new message with a generated ID and timestamp.
|
||||
func NewMessage(msgType MessageType, from, to string, payload any) (*Message, error) {
|
||||
var payloadBytes json.RawMessage
|
||||
var payloadBytes RawMessage
|
||||
if payload != nil {
|
||||
data, err := MarshalJSON(payload)
|
||||
if err != nil {
|
||||
|
|
@ -102,7 +106,11 @@ func (m *Message) ParsePayload(v any) error {
|
|||
if m.Payload == nil {
|
||||
return nil
|
||||
}
|
||||
return json.Unmarshal(m.Payload, v)
|
||||
result := core.JSONUnmarshal(m.Payload, v)
|
||||
if !result.OK {
|
||||
return result.Value.(error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// --- Payload Types ---
|
||||
|
|
@ -135,9 +143,9 @@ type PongPayload struct {
|
|||
|
||||
// StartMinerPayload requests starting a miner.
|
||||
type StartMinerPayload struct {
|
||||
MinerType string `json:"minerType"` // Required: miner type (e.g., "xmrig", "tt-miner")
|
||||
ProfileID string `json:"profileId,omitempty"`
|
||||
Config json.RawMessage `json:"config,omitempty"` // Override profile config
|
||||
MinerType string `json:"minerType"` // Required: miner type (e.g., "xmrig", "tt-miner")
|
||||
ProfileID string `json:"profileId,omitempty"`
|
||||
Config RawMessage `json:"config,omitempty"` // Override profile config
|
||||
}
|
||||
|
||||
// StopMinerPayload requests stopping a miner.
|
||||
|
|
|
|||
16
node/peer.go
16
node/peer.go
|
|
@ -1,10 +1,8 @@
|
|||
package node
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"iter"
|
||||
"maps"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"slices"
|
||||
"sync"
|
||||
|
|
@ -654,7 +652,7 @@ func (r *PeerRegistry) scheduleSave() {
|
|||
// Must be called with r.mu held (at least RLock).
|
||||
func (r *PeerRegistry) saveNow() error {
|
||||
// Ensure directory exists
|
||||
dir := filepath.Dir(r.path)
|
||||
dir := core.PathDir(r.path)
|
||||
if err := fsEnsureDir(dir); err != nil {
|
||||
return core.E("PeerRegistry.saveNow", "failed to create peers directory", err)
|
||||
}
|
||||
|
|
@ -662,10 +660,11 @@ func (r *PeerRegistry) saveNow() error {
|
|||
// Convert to slice for JSON
|
||||
peers := slices.Collect(maps.Values(r.peers))
|
||||
|
||||
data, err := json.MarshalIndent(peers, "", " ")
|
||||
if err != nil {
|
||||
return core.E("PeerRegistry.saveNow", "failed to marshal peers", err)
|
||||
result := core.JSONMarshal(peers)
|
||||
if !result.OK {
|
||||
return core.E("PeerRegistry.saveNow", "failed to marshal peers", result.Value.(error))
|
||||
}
|
||||
data := result.Value.([]byte)
|
||||
|
||||
// Use atomic write pattern: write to temp file, then rename
|
||||
tmpPath := r.path + ".tmp"
|
||||
|
|
@ -723,8 +722,9 @@ func (r *PeerRegistry) load() error {
|
|||
}
|
||||
|
||||
var peers []*Peer
|
||||
if err := json.Unmarshal([]byte(content), &peers); err != nil {
|
||||
return core.E("PeerRegistry.load", "failed to unmarshal peers", err)
|
||||
result := core.JSONUnmarshalString(content, &peers)
|
||||
if !result.OK {
|
||||
return core.E("PeerRegistry.load", "failed to unmarshal peers", result.Value.(error))
|
||||
}
|
||||
|
||||
r.peers = make(map[string]*Peer)
|
||||
|
|
|
|||
|
|
@ -1,8 +1,6 @@
|
|||
package node
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
|
|
@ -13,7 +11,7 @@ type ProtocolError struct {
|
|||
}
|
||||
|
||||
func (e *ProtocolError) Error() string {
|
||||
return fmt.Sprintf("remote error (%d): %s", e.Code, e.Message)
|
||||
return core.Sprintf("remote error (%d): %s", e.Code, e.Message)
|
||||
}
|
||||
|
||||
// ResponseHandler provides helpers for handling protocol responses.
|
||||
|
|
|
|||
|
|
@ -4,8 +4,6 @@ import (
|
|||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"iter"
|
||||
"maps"
|
||||
"net/http"
|
||||
|
|
@ -457,7 +455,7 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
// Decode handshake message (not encrypted yet, contains public key)
|
||||
var msg Message
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
if result := core.JSONUnmarshal(data, &msg); !result.OK {
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
|
@ -485,7 +483,7 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
|
|||
rejectPayload := HandshakeAckPayload{
|
||||
Identity: *identity,
|
||||
Accepted: false,
|
||||
Reason: fmt.Sprintf("incompatible protocol version %s, supported: %v", payload.Version, SupportedProtocolVersions),
|
||||
Reason: core.Sprintf("incompatible protocol version %s, supported: %v", payload.Version, SupportedProtocolVersions),
|
||||
}
|
||||
rejectMsg, _ := NewMessage(MsgHandshakeAck, identity.ID, payload.Identity.ID, rejectPayload)
|
||||
if rejectData, err := MarshalJSON(rejectMsg); err == nil {
|
||||
|
|
@ -660,8 +658,8 @@ func (t *Transport) performHandshake(pc *PeerConnection) error {
|
|||
}
|
||||
|
||||
var ackMsg Message
|
||||
if err := json.Unmarshal(ackData, &ackMsg); err != nil {
|
||||
return core.E("Transport.performHandshake", "unmarshal handshake ack", err)
|
||||
if result := core.JSONUnmarshal(ackData, &ackMsg); !result.OK {
|
||||
return core.E("Transport.performHandshake", "unmarshal handshake ack", result.Value.(error))
|
||||
}
|
||||
|
||||
if ackMsg.Type != MsgHandshakeAck {
|
||||
|
|
@ -932,8 +930,8 @@ func (t *Transport) decryptMessage(data []byte, sharedSecret []byte) (*Message,
|
|||
|
||||
// Parse message from JSON
|
||||
var msg Message
|
||||
if err := json.Unmarshal([]byte(smsgMsg.Body), &msg); err != nil {
|
||||
return nil, err
|
||||
if result := core.JSONUnmarshalString(smsgMsg.Body, &msg); !result.OK {
|
||||
return nil, result.Value.(error)
|
||||
}
|
||||
|
||||
return &msg, nil
|
||||
|
|
|
|||
|
|
@ -2,8 +2,6 @@ package node
|
|||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
|
|
@ -331,8 +329,8 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err
|
|||
|
||||
// Unmarshal into interface{} to pass to ProfileManager
|
||||
var profile any
|
||||
if err := json.Unmarshal(profileData, &profile); err != nil {
|
||||
return nil, core.E("Worker.handleDeploy", "invalid profile data JSON", err)
|
||||
if result := core.JSONUnmarshal(profileData, &profile); !result.OK {
|
||||
return nil, core.E("Worker.handleDeploy", "invalid profile data JSON", result.Value.(error))
|
||||
}
|
||||
|
||||
if err := w.profileManager.SaveProfile(profile); err != nil {
|
||||
|
|
@ -353,8 +351,8 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err
|
|||
case BundleMiner, BundleFull:
|
||||
// Determine installation directory
|
||||
// We use w.DataDir/lethean-desktop/miners/<bundle_name>
|
||||
minersDir := filepath.Join(w.DataDir, "lethean-desktop", "miners")
|
||||
installDir := filepath.Join(minersDir, payload.Name)
|
||||
minersDir := core.JoinPath(w.DataDir, "lethean-desktop", "miners")
|
||||
installDir := core.JoinPath(minersDir, payload.Name)
|
||||
|
||||
logging.Info("deploying miner bundle", logging.Fields{
|
||||
"name": payload.Name,
|
||||
|
|
@ -371,8 +369,8 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err
|
|||
// If the bundle contained a profile config, save it
|
||||
if len(profileData) > 0 && w.profileManager != nil {
|
||||
var profile any
|
||||
if err := json.Unmarshal(profileData, &profile); err != nil {
|
||||
logging.Warn("failed to parse profile from miner bundle", logging.Fields{"error": err})
|
||||
if result := core.JSONUnmarshal(profileData, &profile); !result.OK {
|
||||
logging.Warn("failed to parse profile from miner bundle", logging.Fields{"error": result.Value.(error)})
|
||||
} else {
|
||||
if err := w.profileManager.SaveProfile(profile); err != nil {
|
||||
logging.Warn("failed to save profile from miner bundle", logging.Fields{"error": err})
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue