AX Principle 2: comments show HOW with real values, not prose restating what the name already says. Co-Authored-By: Charon <charon@lethean.io>
715 lines
18 KiB
Go
715 lines
18 KiB
Go
package node
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"regexp"
|
|
"sync"
|
|
"time"
|
|
|
|
"forge.lthn.ai/Snider/Mining/pkg/logging"
|
|
"forge.lthn.ai/Snider/Poindexter"
|
|
"github.com/adrg/xdg"
|
|
)
|
|
|
|
// Peer represents a known remote node.
|
|
type Peer struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
PublicKey string `json:"publicKey"`
|
|
Address string `json:"address"` // host:port for WebSocket connection
|
|
Role NodeRole `json:"role"`
|
|
AddedAt time.Time `json:"addedAt"`
|
|
LastSeen time.Time `json:"lastSeen"`
|
|
|
|
// Poindexter metrics (updated dynamically)
|
|
PingMS float64 `json:"pingMs"` // Latency in milliseconds
|
|
Hops int `json:"hops"` // Network hop count
|
|
GeoKM float64 `json:"geoKm"` // Geographic distance in kilometers
|
|
Score float64 `json:"score"` // Reliability score 0-100
|
|
|
|
// Connection state (not persisted)
|
|
Connected bool `json:"-"`
|
|
}
|
|
|
|
// saveDebounceInterval is the minimum time between disk writes.
|
|
const saveDebounceInterval = 5 * time.Second
|
|
|
|
// PeerAuthMode controls how unknown peers are handled
|
|
type PeerAuthMode int
|
|
|
|
const (
|
|
// PeerAuthOpen allows any peer to connect (original behavior)
|
|
PeerAuthOpen PeerAuthMode = iota
|
|
// PeerAuthAllowlist only allows pre-registered peers or those with allowed public keys
|
|
PeerAuthAllowlist
|
|
)
|
|
|
|
// Peer name validation constants
|
|
const (
|
|
PeerNameMinLength = 1
|
|
PeerNameMaxLength = 64
|
|
)
|
|
|
|
// peerNamePattern is the raw pattern used to validate peer names.
|
|
// validatePeerName compiles it via regexp.Compile — no package-level Must* panic.
|
|
const peerNamePattern = `^[a-zA-Z0-9][a-zA-Z0-9\-_ ]{0,62}[a-zA-Z0-9]$|^[a-zA-Z0-9]$`
|
|
|
|
// safeKeyPrefix("abc123def456xyz789") // "abc123def456xyz7..."
|
|
// safeKeyPrefix("") // "(empty)"
|
|
func safeKeyPrefix(key string) string {
|
|
if len(key) >= 16 {
|
|
return key[:16] + "..."
|
|
}
|
|
if len(key) == 0 {
|
|
return "(empty)"
|
|
}
|
|
return key
|
|
}
|
|
|
|
// validatePeerName("my-worker") // nil
|
|
// validatePeerName("bad name!!!") // error: invalid characters
|
|
// validatePeerName("") // nil (empty names are optional)
|
|
func validatePeerName(name string) error {
|
|
if name == "" {
|
|
return nil // Empty names are allowed (optional field)
|
|
}
|
|
if len(name) < PeerNameMinLength {
|
|
return fmt.Errorf("peer name too short (min %d characters)", PeerNameMinLength)
|
|
}
|
|
if len(name) > PeerNameMaxLength {
|
|
return fmt.Errorf("peer name too long (max %d characters)", PeerNameMaxLength)
|
|
}
|
|
nameRegex, err := regexp.Compile(peerNamePattern)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid peer name pattern: %w", err)
|
|
}
|
|
if !nameRegex.MatchString(name) {
|
|
return fmt.Errorf("peer name contains invalid characters (use alphanumeric, hyphens, underscores, spaces)")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PeerRegistry manages known peers with KD-tree based selection.
|
|
type PeerRegistry struct {
|
|
peers map[string]*Peer
|
|
kdTree *poindexter.KDTree[string] // KD-tree with peer ID as payload
|
|
path string
|
|
mutex sync.RWMutex
|
|
|
|
// Authentication settings
|
|
authMode PeerAuthMode // How to handle unknown peers
|
|
allowedPublicKeys map[string]bool // Allowlist of public keys (when authMode is Allowlist)
|
|
allowedPublicKeyMutex sync.RWMutex // Protects allowedPublicKeys
|
|
|
|
// Debounce disk writes
|
|
dirty bool // Whether there are unsaved changes
|
|
saveTimer *time.Timer // Timer for debounced save
|
|
saveMutex sync.Mutex // Protects dirty and saveTimer
|
|
stopChan chan struct{} // Signal to stop background save
|
|
saveStopOnce sync.Once // Ensure stopChan is closed only once
|
|
}
|
|
|
|
// Dimension weights for peer selection.
|
|
// Lower ping, hops, geographic distance are better; higher score is better.
|
|
const (
|
|
pingWeight = 1.0
|
|
hopsWeight = 0.7
|
|
geographicWeight = 0.2
|
|
scoreWeight = 1.2
|
|
)
|
|
|
|
// registry, err := node.NewPeerRegistry()
|
|
// if err != nil { return err }
|
|
func NewPeerRegistry() (*PeerRegistry, error) {
|
|
peersPath, err := xdg.ConfigFile("lethean-desktop/peers.json")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get peers path: %w", err)
|
|
}
|
|
|
|
return NewPeerRegistryWithPath(peersPath)
|
|
}
|
|
|
|
// registry, err := node.NewPeerRegistryWithPath("/tmp/test-peers.json")
|
|
// used in tests to avoid xdg path caching
|
|
func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) {
|
|
registry := &PeerRegistry{
|
|
peers: make(map[string]*Peer),
|
|
path: peersPath,
|
|
stopChan: make(chan struct{}),
|
|
authMode: PeerAuthOpen, // Default to open for backward compatibility
|
|
allowedPublicKeys: make(map[string]bool),
|
|
}
|
|
|
|
// Try to load existing peers
|
|
if err := registry.load(); err != nil {
|
|
// No existing peers, that's ok
|
|
registry.rebuildKDTree()
|
|
return registry, nil
|
|
}
|
|
|
|
registry.rebuildKDTree()
|
|
return registry, nil
|
|
}
|
|
|
|
// registry.SetAuthMode(PeerAuthAllowlist) // require pre-registration
|
|
// registry.SetAuthMode(PeerAuthOpen) // allow any peer (default)
|
|
func (r *PeerRegistry) SetAuthMode(mode PeerAuthMode) {
|
|
r.allowedPublicKeyMutex.Lock()
|
|
defer r.allowedPublicKeyMutex.Unlock()
|
|
r.authMode = mode
|
|
logging.Info("peer auth mode changed", logging.Fields{"mode": mode})
|
|
}
|
|
|
|
// if registry.GetAuthMode() == PeerAuthAllowlist { /* enforce allowlist */ }
|
|
func (r *PeerRegistry) GetAuthMode() PeerAuthMode {
|
|
r.allowedPublicKeyMutex.RLock()
|
|
defer r.allowedPublicKeyMutex.RUnlock()
|
|
return r.authMode
|
|
}
|
|
|
|
// registry.AllowPublicKey(peer.PublicKey) // permit this key without pre-registration
|
|
func (r *PeerRegistry) AllowPublicKey(publicKey string) {
|
|
r.allowedPublicKeyMutex.Lock()
|
|
defer r.allowedPublicKeyMutex.Unlock()
|
|
r.allowedPublicKeys[publicKey] = true
|
|
logging.Debug("public key added to allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
|
|
}
|
|
|
|
// registry.RevokePublicKey(peer.PublicKey) // block this key on next connect attempt
|
|
func (r *PeerRegistry) RevokePublicKey(publicKey string) {
|
|
r.allowedPublicKeyMutex.Lock()
|
|
defer r.allowedPublicKeyMutex.Unlock()
|
|
delete(r.allowedPublicKeys, publicKey)
|
|
logging.Debug("public key removed from allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
|
|
}
|
|
|
|
// if registry.IsPublicKeyAllowed(peer.PublicKey) { /* permit */ }
|
|
func (r *PeerRegistry) IsPublicKeyAllowed(publicKey string) bool {
|
|
r.allowedPublicKeyMutex.RLock()
|
|
defer r.allowedPublicKeyMutex.RUnlock()
|
|
return r.allowedPublicKeys[publicKey]
|
|
}
|
|
|
|
// if !registry.IsPeerAllowed(payload.Identity.ID, payload.Identity.PublicKey) { conn.Close(); return }
|
|
// true when Open mode, or Allowlist mode with pre-registered peer or allowlisted key
|
|
func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool {
|
|
r.allowedPublicKeyMutex.RLock()
|
|
authMode := r.authMode
|
|
keyAllowed := r.allowedPublicKeys[publicKey]
|
|
r.allowedPublicKeyMutex.RUnlock()
|
|
|
|
// Open mode allows everyone
|
|
if authMode == PeerAuthOpen {
|
|
return true
|
|
}
|
|
|
|
// Allowlist mode: check if peer is pre-registered
|
|
r.mutex.RLock()
|
|
_, isRegistered := r.peers[peerID]
|
|
r.mutex.RUnlock()
|
|
|
|
if isRegistered {
|
|
return true
|
|
}
|
|
|
|
// Check if public key is allowlisted
|
|
return keyAllowed
|
|
}
|
|
|
|
// keys := registry.ListAllowedPublicKeys() // for display or export
|
|
func (r *PeerRegistry) ListAllowedPublicKeys() []string {
|
|
r.allowedPublicKeyMutex.RLock()
|
|
defer r.allowedPublicKeyMutex.RUnlock()
|
|
|
|
keys := make([]string, 0, len(r.allowedPublicKeys))
|
|
for key := range r.allowedPublicKeys {
|
|
keys = append(keys, key)
|
|
}
|
|
return keys
|
|
}
|
|
|
|
// registry.AddPeer(&Peer{ID: "abc123", Name: "worker-1", Address: "10.0.0.2:9090", Role: RoleWorker})
|
|
// Persistence is debounced — call Close() before shutdown to flush all pending writes.
|
|
func (r *PeerRegistry) AddPeer(peer *Peer) error {
|
|
r.mutex.Lock()
|
|
|
|
if peer.ID == "" {
|
|
r.mutex.Unlock()
|
|
return fmt.Errorf("peer ID is required")
|
|
}
|
|
|
|
// Validate peer name (P2P-LOW-3)
|
|
if err := validatePeerName(peer.Name); err != nil {
|
|
r.mutex.Unlock()
|
|
return err
|
|
}
|
|
|
|
if _, exists := r.peers[peer.ID]; exists {
|
|
r.mutex.Unlock()
|
|
return fmt.Errorf("peer %s already exists", peer.ID)
|
|
}
|
|
|
|
// Set defaults
|
|
if peer.AddedAt.IsZero() {
|
|
peer.AddedAt = time.Now()
|
|
}
|
|
if peer.Score == 0 {
|
|
peer.Score = 50 // Default neutral score
|
|
}
|
|
|
|
r.peers[peer.ID] = peer
|
|
r.rebuildKDTree()
|
|
r.mutex.Unlock()
|
|
|
|
return r.save()
|
|
}
|
|
|
|
// registry.UpdatePeer(peer) // after handshake completes and real ID is known
|
|
// Note: Persistence is debounced. Call Close() to flush before shutdown.
|
|
func (r *PeerRegistry) UpdatePeer(peer *Peer) error {
|
|
r.mutex.Lock()
|
|
|
|
if _, exists := r.peers[peer.ID]; !exists {
|
|
r.mutex.Unlock()
|
|
return fmt.Errorf("peer %s not found", peer.ID)
|
|
}
|
|
|
|
r.peers[peer.ID] = peer
|
|
r.rebuildKDTree()
|
|
r.mutex.Unlock()
|
|
|
|
return r.save()
|
|
}
|
|
|
|
// registry.RemovePeer(peer.ID) // on manual disconnect or ban
|
|
// Note: Persistence is debounced. Call Close() to flush before shutdown.
|
|
func (r *PeerRegistry) RemovePeer(id string) error {
|
|
r.mutex.Lock()
|
|
|
|
if _, exists := r.peers[id]; !exists {
|
|
r.mutex.Unlock()
|
|
return fmt.Errorf("peer %s not found", id)
|
|
}
|
|
|
|
delete(r.peers, id)
|
|
r.rebuildKDTree()
|
|
r.mutex.Unlock()
|
|
|
|
return r.save()
|
|
}
|
|
|
|
// peer := registry.GetPeer("abc123def456")
|
|
// if peer == nil { return fmt.Errorf("peer not found") }
|
|
func (r *PeerRegistry) GetPeer(id string) *Peer {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
peer, exists := r.peers[id]
|
|
if !exists {
|
|
return nil
|
|
}
|
|
|
|
// Return a copy
|
|
peerCopy := *peer
|
|
return &peerCopy
|
|
}
|
|
|
|
// for _, peer := range registry.ListPeers() { log(peer.ID, peer.Role) }
|
|
func (r *PeerRegistry) ListPeers() []*Peer {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
peers := make([]*Peer, 0, len(r.peers))
|
|
for _, peer := range r.peers {
|
|
peerCopy := *peer
|
|
peers = append(peers, &peerCopy)
|
|
}
|
|
return peers
|
|
}
|
|
|
|
// registry.UpdateMetrics(peer.ID, rtt, peer.GeoKM, peer.Hops) // after PingPeer
|
|
// Note: Persistence is debounced. Call Close() to flush before shutdown.
|
|
func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int) error {
|
|
r.mutex.Lock()
|
|
|
|
peer, exists := r.peers[id]
|
|
if !exists {
|
|
r.mutex.Unlock()
|
|
return fmt.Errorf("peer %s not found", id)
|
|
}
|
|
|
|
peer.PingMS = pingMS
|
|
peer.GeoKM = geoKM
|
|
peer.Hops = hops
|
|
peer.LastSeen = time.Now()
|
|
|
|
r.rebuildKDTree()
|
|
r.mutex.Unlock()
|
|
|
|
return r.save()
|
|
}
|
|
|
|
// registry.UpdateScore(peer.ID, 75.0) // clamps to 0-100
|
|
// Note: Persistence is debounced. Call Close() to flush before shutdown.
|
|
func (r *PeerRegistry) UpdateScore(id string, score float64) error {
|
|
r.mutex.Lock()
|
|
|
|
peer, exists := r.peers[id]
|
|
if !exists {
|
|
r.mutex.Unlock()
|
|
return fmt.Errorf("peer %s not found", id)
|
|
}
|
|
|
|
// Clamp score to 0-100
|
|
if score < 0 {
|
|
score = 0
|
|
} else if score > 100 {
|
|
score = 100
|
|
}
|
|
|
|
peer.Score = score
|
|
r.rebuildKDTree()
|
|
r.mutex.Unlock()
|
|
|
|
return r.save()
|
|
}
|
|
|
|
// registry.SetConnected(peer.ID, true) // on connect
|
|
// registry.SetConnected(peer.ID, false) // on disconnect or error
|
|
func (r *PeerRegistry) SetConnected(id string, connected bool) {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
|
|
if peer, exists := r.peers[id]; exists {
|
|
peer.Connected = connected
|
|
if connected {
|
|
peer.LastSeen = time.Now()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Score adjustment constants
|
|
const (
|
|
ScoreSuccessIncrement = 1.0 // Increment for successful interaction
|
|
ScoreFailureDecrement = 5.0 // Decrement for failed interaction
|
|
ScoreTimeoutDecrement = 3.0 // Decrement for timeout
|
|
ScoreMinimum = 0.0 // Minimum score
|
|
ScoreMaximum = 100.0 // Maximum score
|
|
ScoreDefault = 50.0 // Default score for new peers
|
|
)
|
|
|
|
// registry.RecordSuccess(peer.ID) // after a successful response
|
|
func (r *PeerRegistry) RecordSuccess(id string) {
|
|
r.mutex.Lock()
|
|
peer, exists := r.peers[id]
|
|
if !exists {
|
|
r.mutex.Unlock()
|
|
return
|
|
}
|
|
|
|
peer.Score = min(peer.Score+ScoreSuccessIncrement, ScoreMaximum)
|
|
peer.LastSeen = time.Now()
|
|
r.mutex.Unlock()
|
|
r.save()
|
|
}
|
|
|
|
// registry.RecordFailure(peer.ID) // after a failed send or error response
|
|
func (r *PeerRegistry) RecordFailure(id string) {
|
|
r.mutex.Lock()
|
|
peer, exists := r.peers[id]
|
|
if !exists {
|
|
r.mutex.Unlock()
|
|
return
|
|
}
|
|
|
|
peer.Score = max(peer.Score-ScoreFailureDecrement, ScoreMinimum)
|
|
newScore := peer.Score
|
|
r.mutex.Unlock()
|
|
r.save()
|
|
|
|
logging.Debug("peer score decreased", logging.Fields{
|
|
"peer_id": id,
|
|
"new_score": newScore,
|
|
"reason": "failure",
|
|
})
|
|
}
|
|
|
|
// registry.RecordTimeout(peer.ID) // after a request deadline exceeded
|
|
func (r *PeerRegistry) RecordTimeout(id string) {
|
|
r.mutex.Lock()
|
|
peer, exists := r.peers[id]
|
|
if !exists {
|
|
r.mutex.Unlock()
|
|
return
|
|
}
|
|
|
|
peer.Score = max(peer.Score-ScoreTimeoutDecrement, ScoreMinimum)
|
|
newScore := peer.Score
|
|
r.mutex.Unlock()
|
|
r.save()
|
|
|
|
logging.Debug("peer score decreased", logging.Fields{
|
|
"peer_id": id,
|
|
"new_score": newScore,
|
|
"reason": "timeout",
|
|
})
|
|
}
|
|
|
|
// for _, peer := range registry.GetPeersByScore() { log(peer.ID, peer.Score) }
|
|
func (r *PeerRegistry) GetPeersByScore() []*Peer {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
peers := make([]*Peer, 0, len(r.peers))
|
|
for _, peer := range r.peers {
|
|
peers = append(peers, peer)
|
|
}
|
|
|
|
// Sort by score descending
|
|
for i := 0; i < len(peers)-1; i++ {
|
|
for j := i + 1; j < len(peers); j++ {
|
|
if peers[j].Score > peers[i].Score {
|
|
peers[i], peers[j] = peers[j], peers[i]
|
|
}
|
|
}
|
|
}
|
|
|
|
return peers
|
|
}
|
|
|
|
// peer := registry.SelectOptimalPeer()
|
|
// if peer != nil { ctrl.ConnectToPeer(peer.ID) }
|
|
func (r *PeerRegistry) SelectOptimalPeer() *Peer {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
if r.kdTree == nil || len(r.peers) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Target: ideal peer (0 ping, 0 hops, 0 geo, 100 score)
|
|
// Score is inverted (100 - score) so lower is better in the tree
|
|
target := []float64{0, 0, 0, 0}
|
|
|
|
result, _, found := r.kdTree.Nearest(target)
|
|
if !found {
|
|
return nil
|
|
}
|
|
|
|
peer, exists := r.peers[result.Value]
|
|
if !exists {
|
|
return nil
|
|
}
|
|
|
|
peerCopy := *peer
|
|
return &peerCopy
|
|
}
|
|
|
|
// peers := registry.SelectNearestPeers(3) // top 3 peers by ping, hops, geo, score
|
|
// for _, peer := range peers { ctrl.ConnectToPeer(peer.ID) }
|
|
func (r *PeerRegistry) SelectNearestPeers(count int) []*Peer {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
if r.kdTree == nil || len(r.peers) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Target: ideal peer
|
|
target := []float64{0, 0, 0, 0}
|
|
|
|
results, _ := r.kdTree.KNearest(target, count)
|
|
|
|
peers := make([]*Peer, 0, len(results))
|
|
for _, result := range results {
|
|
if peer, exists := r.peers[result.Value]; exists {
|
|
peerCopy := *peer
|
|
peers = append(peers, &peerCopy)
|
|
}
|
|
}
|
|
|
|
return peers
|
|
}
|
|
|
|
// for _, peer := range registry.GetConnectedPeers() { ctrl.GetRemoteStats(peer.ID) }
|
|
func (r *PeerRegistry) GetConnectedPeers() []*Peer {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
|
|
peers := make([]*Peer, 0)
|
|
for _, peer := range r.peers {
|
|
if peer.Connected {
|
|
peerCopy := *peer
|
|
peers = append(peers, &peerCopy)
|
|
}
|
|
}
|
|
return peers
|
|
}
|
|
|
|
// if registry.Count() == 0 { return ErrNoPeers }
|
|
func (r *PeerRegistry) Count() int {
|
|
r.mutex.RLock()
|
|
defer r.mutex.RUnlock()
|
|
return len(r.peers)
|
|
}
|
|
|
|
// rebuildKDTree rebuilds the KD-tree from current peers.
|
|
// Must be called with lock held.
|
|
func (r *PeerRegistry) rebuildKDTree() {
|
|
if len(r.peers) == 0 {
|
|
r.kdTree = nil
|
|
return
|
|
}
|
|
|
|
points := make([]poindexter.KDPoint[string], 0, len(r.peers))
|
|
for _, peer := range r.peers {
|
|
// Build 4D point with weighted, normalized values
|
|
// Invert score so that higher score = lower value (better)
|
|
point := poindexter.KDPoint[string]{
|
|
ID: peer.ID,
|
|
Coords: []float64{
|
|
peer.PingMS * pingWeight,
|
|
float64(peer.Hops) * hopsWeight,
|
|
peer.GeoKM * geographicWeight,
|
|
(100 - peer.Score) * scoreWeight, // Invert score
|
|
},
|
|
Value: peer.ID,
|
|
}
|
|
points = append(points, point)
|
|
}
|
|
|
|
// Build KD-tree with Euclidean distance
|
|
tree, err := poindexter.NewKDTree(points, poindexter.WithMetric(poindexter.EuclideanDistance{}))
|
|
if err != nil {
|
|
// Log error but continue - worst case we don't have optimal selection
|
|
return
|
|
}
|
|
|
|
r.kdTree = tree
|
|
}
|
|
|
|
// r.scheduleSave() // called by AddPeer, UpdatePeer, RemovePeer, UpdateMetrics, UpdateScore
|
|
// Multiple writes within saveDebounceInterval are coalesced into one disk write.
|
|
// Must NOT be called with r.mutex held.
|
|
func (r *PeerRegistry) scheduleSave() {
|
|
r.saveMutex.Lock()
|
|
defer r.saveMutex.Unlock()
|
|
|
|
r.dirty = true
|
|
|
|
// If timer already running, let it handle the save
|
|
if r.saveTimer != nil {
|
|
return
|
|
}
|
|
|
|
// Start a new timer
|
|
r.saveTimer = time.AfterFunc(saveDebounceInterval, func() {
|
|
r.saveMutex.Lock()
|
|
r.saveTimer = nil
|
|
shouldSave := r.dirty
|
|
r.dirty = false
|
|
r.saveMutex.Unlock()
|
|
|
|
if shouldSave {
|
|
r.mutex.RLock()
|
|
err := r.saveNow()
|
|
r.mutex.RUnlock()
|
|
if err != nil {
|
|
// Log error but continue - best effort persistence
|
|
logging.Warn("failed to save peer registry", logging.Fields{"error": err})
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// saveNow persists peers to disk immediately.
|
|
// Must be called with r.mutex held (at least RLock).
|
|
func (r *PeerRegistry) saveNow() error {
|
|
// Ensure directory exists
|
|
directoryPath := filepath.Dir(r.path)
|
|
if err := os.MkdirAll(directoryPath, 0755); err != nil {
|
|
return fmt.Errorf("failed to create peers directory: %w", err)
|
|
}
|
|
|
|
// Convert to slice for JSON
|
|
peers := make([]*Peer, 0, len(r.peers))
|
|
for _, peer := range r.peers {
|
|
peers = append(peers, peer)
|
|
}
|
|
|
|
data, err := json.MarshalIndent(peers, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal peers: %w", err)
|
|
}
|
|
|
|
// Use atomic write pattern: write to temp file, then rename
|
|
temporaryPath := r.path + ".tmp"
|
|
if err := os.WriteFile(temporaryPath, data, 0644); err != nil {
|
|
return fmt.Errorf("failed to write peers temp file: %w", err)
|
|
}
|
|
|
|
if err := os.Rename(temporaryPath, r.path); err != nil {
|
|
os.Remove(temporaryPath) // Clean up temp file
|
|
return fmt.Errorf("failed to rename peers file: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close flushes any pending changes and releases resources.
|
|
func (r *PeerRegistry) Close() error {
|
|
r.saveStopOnce.Do(func() {
|
|
close(r.stopChan)
|
|
})
|
|
|
|
// Cancel pending timer and save immediately if dirty
|
|
r.saveMutex.Lock()
|
|
if r.saveTimer != nil {
|
|
r.saveTimer.Stop()
|
|
r.saveTimer = nil
|
|
}
|
|
shouldSave := r.dirty
|
|
r.dirty = false
|
|
r.saveMutex.Unlock()
|
|
|
|
if shouldSave {
|
|
r.mutex.RLock()
|
|
err := r.saveNow()
|
|
r.mutex.RUnlock()
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// save is a helper that schedules a debounced save.
|
|
// Kept for backward compatibility but now debounces writes.
|
|
// Must NOT be called with r.mutex held.
|
|
func (r *PeerRegistry) save() error {
|
|
r.scheduleSave()
|
|
return nil // Errors will be logged asynchronously
|
|
}
|
|
|
|
// load reads peers from disk.
|
|
func (r *PeerRegistry) load() error {
|
|
data, err := os.ReadFile(r.path)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read peers: %w", err)
|
|
}
|
|
|
|
var peers []*Peer
|
|
if err := json.Unmarshal(data, &peers); err != nil {
|
|
return fmt.Errorf("failed to unmarshal peers: %w", err)
|
|
}
|
|
|
|
r.peers = make(map[string]*Peer)
|
|
for _, peer := range peers {
|
|
r.peers[peer.ID] = peer
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|