Mining/pkg/node/peer.go
Claude 836fc18ff3
ax(node): replace prose comments with usage examples (AX Principle 2)
Comments on saveNow, save, load, Close, and DefaultResponseHandler
restated what the signature already said. Replace with concrete call
examples showing how each is actually invoked.

Co-Authored-By: Charon <charon@lethean.io>
2026-04-02 12:30:42 +01:00

713 lines
18 KiB
Go

package node
import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"sync"
	"time"

	"forge.lthn.ai/Snider/Mining/pkg/logging"
	"forge.lthn.ai/Snider/Poindexter"
	"github.com/adrg/xdg"
)
// Peer represents a known remote node.
//
// All tagged fields are persisted to peers.json; Connected is runtime-only
// state (json:"-") and starts false after every load.
type Peer struct {
	ID        string    `json:"id"`
	Name      string    `json:"name"`
	PublicKey string    `json:"publicKey"`
	Address   string    `json:"address"` // host:port for WebSocket connection
	Role      NodeRole  `json:"role"`
	AddedAt   time.Time `json:"addedAt"`
	LastSeen  time.Time `json:"lastSeen"`
	// Poindexter metrics (updated dynamically by UpdateMetrics / score methods)
	PingMS float64 `json:"pingMs"` // Latency in milliseconds
	Hops   int     `json:"hops"`   // Network hop count
	GeoKM  float64 `json:"geoKm"`  // Geographic distance in kilometers
	Score  float64 `json:"score"`  // Reliability score 0-100
	// Connection state (not persisted)
	Connected bool `json:"-"`
}
// saveDebounceInterval bounds how often peer state is flushed to disk:
// scheduleSave coalesces writes so at most one save happens per interval.
const saveDebounceInterval = 5 * time.Second
// PeerAuthMode controls how unknown peers are handled.
type PeerAuthMode int

const (
	// PeerAuthOpen allows any peer to connect (original behavior, the default).
	PeerAuthOpen PeerAuthMode = iota
	// PeerAuthAllowlist only allows pre-registered peers or those whose
	// public key was added via AllowPublicKey.
	PeerAuthAllowlist
)
// Peer name validation bounds, enforced by validatePeerName.
const (
	PeerNameMinLength = 1
	PeerNameMaxLength = 64
)
// peerNamePattern is the raw pattern used to validate peer names:
// 1-64 characters, alphanumeric at both ends, with hyphens, underscores
// and spaces allowed in between.
const peerNamePattern = `^[a-zA-Z0-9][a-zA-Z0-9\-_ ]{0,62}[a-zA-Z0-9]$|^[a-zA-Z0-9]$`

// compilePeerNameRegex compiles peerNamePattern exactly once, on first use.
// It deliberately avoids a package-level regexp.MustCompile so a bad pattern
// surfaces as a returned error instead of an init-time panic, while still
// not recompiling on every validation call.
var compilePeerNameRegex = sync.OnceValues(func() (*regexp.Regexp, error) {
	return regexp.Compile(peerNamePattern)
})

// safeKeyPrefix("abc123def456xyz789") // "abc123def456xyz7..."
// safeKeyPrefix("") // "(empty)"
func safeKeyPrefix(key string) string {
	switch {
	case len(key) >= 16:
		return key[:16] + "..."
	case len(key) == 0:
		return "(empty)"
	default:
		return key
	}
}

// validatePeerName("my-worker") // nil
// validatePeerName("bad name!!!") // error: invalid characters
// validatePeerName("") // nil (empty names are optional)
func validatePeerName(name string) error {
	if name == "" {
		return nil // Empty names are allowed (optional field)
	}
	if len(name) < PeerNameMinLength {
		return fmt.Errorf("peer name too short (min %d characters)", PeerNameMinLength)
	}
	if len(name) > PeerNameMaxLength {
		return fmt.Errorf("peer name too long (max %d characters)", PeerNameMaxLength)
	}
	nameRegex, err := compilePeerNameRegex()
	if err != nil {
		return fmt.Errorf("invalid peer name pattern: %w", err)
	}
	if !nameRegex.MatchString(name) {
		return fmt.Errorf("peer name contains invalid characters (use alphanumeric, hyphens, underscores, spaces)")
	}
	return nil
}
// PeerRegistry manages known peers with KD-tree based selection.
//
// All exported methods are safe for concurrent use. Mutations are persisted
// to disk with a debounce (see scheduleSave); call Close before shutdown to
// flush any pending write.
type PeerRegistry struct {
	peers  map[string]*Peer
	kdTree *poindexter.KDTree[string] // KD-tree with peer ID as payload
	path   string                     // on-disk location of peers.json
	mutex  sync.RWMutex               // protects peers and kdTree

	// Authentication settings
	authMode              PeerAuthMode    // How to handle unknown peers
	allowedPublicKeys     map[string]bool // Allowlist of public keys (when authMode is Allowlist)
	allowedPublicKeyMutex sync.RWMutex    // Protects authMode and allowedPublicKeys

	// Debounce disk writes
	dirty        bool          // Whether there are unsaved changes
	saveTimer    *time.Timer   // Timer for debounced save; nil when no save is pending
	saveMutex    sync.Mutex    // Protects dirty and saveTimer
	stopChan     chan struct{} // Closed by Close to signal shutdown
	saveStopOnce sync.Once     // Ensure stopChan is closed only once
}
// Dimension weights for peer selection.
// Each KD-tree coordinate is scaled by its weight so more important
// dimensions dominate the Euclidean distance. Lower ping, hops, and
// geographic distance are better; higher score is better (rebuildKDTree
// inverts score so that lower coordinate = better on every axis).
const (
	pingWeight       = 1.0
	hopsWeight       = 0.7
	geographicWeight = 0.2
	scoreWeight      = 1.2
)
// registry, err := node.NewPeerRegistry()
// if err != nil { return err }
//
// Peers are stored at the XDG config location lethean-desktop/peers.json.
func NewPeerRegistry() (*PeerRegistry, error) {
	path, err := xdg.ConfigFile("lethean-desktop/peers.json")
	if err != nil {
		return nil, fmt.Errorf("failed to get peers path: %w", err)
	}
	return NewPeerRegistryWithPath(path)
}
// registry, err := node.NewPeerRegistryWithPath("/tmp/test-peers.json")
// used in tests to avoid xdg path caching
//
// A missing peers file is treated as an empty registry. Any other load
// failure (e.g. corrupt JSON or an unreadable file) is returned instead of
// being silently discarded, so a damaged peers file is never clobbered by
// the next debounced save.
func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) {
	registry := &PeerRegistry{
		peers:             make(map[string]*Peer),
		path:              peersPath,
		stopChan:          make(chan struct{}),
		authMode:          PeerAuthOpen, // Default to open for backward compatibility
		allowedPublicKeys: make(map[string]bool),
	}
	// Load existing peers if the file is present; a not-exist error is fine.
	if err := registry.load(); err != nil && !errors.Is(err, os.ErrNotExist) {
		return nil, err
	}
	registry.rebuildKDTree()
	return registry, nil
}
// registry.SetAuthMode(PeerAuthAllowlist) // require pre-registration
// registry.SetAuthMode(PeerAuthOpen) // allow any peer (default)
func (r *PeerRegistry) SetAuthMode(mode PeerAuthMode) {
	r.allowedPublicKeyMutex.Lock()
	r.authMode = mode
	// Log while still holding the lock so mode-change entries stay ordered.
	logging.Info("peer auth mode changed", logging.Fields{"mode": mode})
	r.allowedPublicKeyMutex.Unlock()
}
// if registry.GetAuthMode() == PeerAuthAllowlist { /* enforce allowlist */ }
func (r *PeerRegistry) GetAuthMode() PeerAuthMode {
	r.allowedPublicKeyMutex.RLock()
	mode := r.authMode
	r.allowedPublicKeyMutex.RUnlock()
	return mode
}
// registry.AllowPublicKey(peer.PublicKey) // permit this key without pre-registration
func (r *PeerRegistry) AllowPublicKey(publicKey string) {
	r.allowedPublicKeyMutex.Lock()
	r.allowedPublicKeys[publicKey] = true
	// Only the key prefix is logged to avoid leaking full key material.
	logging.Debug("public key added to allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
	r.allowedPublicKeyMutex.Unlock()
}
// registry.RevokePublicKey(peer.PublicKey) // block this key on next connect attempt
func (r *PeerRegistry) RevokePublicKey(publicKey string) {
	r.allowedPublicKeyMutex.Lock()
	delete(r.allowedPublicKeys, publicKey)
	// Only the key prefix is logged to avoid leaking full key material.
	logging.Debug("public key removed from allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
	r.allowedPublicKeyMutex.Unlock()
}
// if registry.IsPublicKeyAllowed(peer.PublicKey) { /* permit */ }
func (r *PeerRegistry) IsPublicKeyAllowed(publicKey string) bool {
	r.allowedPublicKeyMutex.RLock()
	allowed := r.allowedPublicKeys[publicKey]
	r.allowedPublicKeyMutex.RUnlock()
	return allowed
}
// if !registry.IsPeerAllowed(payload.Identity.ID, payload.Identity.PublicKey) { conn.Close(); return }
// true when Open mode, or Allowlist mode with pre-registered peer or allowlisted key
func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool {
	// Snapshot the auth state under the key mutex.
	r.allowedPublicKeyMutex.RLock()
	mode := r.authMode
	keyAllowed := r.allowedPublicKeys[publicKey]
	r.allowedPublicKeyMutex.RUnlock()

	// Open mode allows everyone.
	if mode == PeerAuthOpen {
		return true
	}

	// Allowlist mode: pre-registered peers are always allowed.
	r.mutex.RLock()
	_, registered := r.peers[peerID]
	r.mutex.RUnlock()
	if registered {
		return true
	}

	// Otherwise fall back to the public-key allowlist.
	return keyAllowed
}
// keys := registry.ListAllowedPublicKeys() // for display or export
func (r *PeerRegistry) ListAllowedPublicKeys() []string {
	r.allowedPublicKeyMutex.RLock()
	defer r.allowedPublicKeyMutex.RUnlock()
	out := make([]string, 0, len(r.allowedPublicKeys))
	for k := range r.allowedPublicKeys {
		out = append(out, k)
	}
	return out
}
// registry.AddPeer(&Peer{ID: "abc123", Name: "worker-1", Address: "10.0.0.2:9090", Role: RoleWorker})
// Persistence is debounced — call Close() before shutdown to flush all pending writes.
func (r *PeerRegistry) AddPeer(peer *Peer) error {
	// Validate input before taking the lock; neither check needs registry state.
	if peer.ID == "" {
		return fmt.Errorf("peer ID is required")
	}
	// Validate peer name (P2P-LOW-3)
	if err := validatePeerName(peer.Name); err != nil {
		return err
	}

	r.mutex.Lock()
	if _, exists := r.peers[peer.ID]; exists {
		r.mutex.Unlock()
		return fmt.Errorf("peer %s already exists", peer.ID)
	}
	// Set defaults
	if peer.AddedAt.IsZero() {
		peer.AddedAt = time.Now()
	}
	if peer.Score == 0 {
		peer.Score = ScoreDefault // neutral starting score (was hard-coded 50)
	}
	r.peers[peer.ID] = peer
	r.rebuildKDTree()
	r.mutex.Unlock()
	// save schedules a debounced write and must not run under r.mutex.
	return r.save()
}
// registry.UpdatePeer(peer) // after handshake completes and real ID is known
// Note: Persistence is debounced. Call Close() to flush before shutdown.
func (r *PeerRegistry) UpdatePeer(peer *Peer) error {
	r.mutex.Lock()
	if _, ok := r.peers[peer.ID]; !ok {
		r.mutex.Unlock()
		return fmt.Errorf("peer %s not found", peer.ID)
	}
	// Replace the stored record wholesale and refresh the selection tree.
	r.peers[peer.ID] = peer
	r.rebuildKDTree()
	r.mutex.Unlock()
	return r.save()
}
// registry.RemovePeer(peer.ID) // on manual disconnect or ban
// Note: Persistence is debounced. Call Close() to flush before shutdown.
func (r *PeerRegistry) RemovePeer(id string) error {
	r.mutex.Lock()
	if _, ok := r.peers[id]; !ok {
		r.mutex.Unlock()
		return fmt.Errorf("peer %s not found", id)
	}
	delete(r.peers, id)
	r.rebuildKDTree()
	r.mutex.Unlock()
	return r.save()
}
// peer := registry.GetPeer("abc123def456")
// if peer == nil { return fmt.Errorf("peer not found") }
func (r *PeerRegistry) GetPeer(id string) *Peer {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	if peer, ok := r.peers[id]; ok {
		// Hand back a copy so callers can't mutate registry state.
		c := *peer
		return &c
	}
	return nil
}
// for _, peer := range registry.ListPeers() { log(peer.ID, peer.Role) }
func (r *PeerRegistry) ListPeers() []*Peer {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	out := make([]*Peer, 0, len(r.peers))
	for _, p := range r.peers {
		// Copies only; callers never see internal pointers.
		c := *p
		out = append(out, &c)
	}
	return out
}
// registry.UpdateMetrics(peer.ID, rtt, peer.GeoKM, peer.Hops) // after PingPeer
// Note: Persistence is debounced. Call Close() to flush before shutdown.
func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int) error {
	r.mutex.Lock()
	peer, ok := r.peers[id]
	if !ok {
		r.mutex.Unlock()
		return fmt.Errorf("peer %s not found", id)
	}
	// Record the fresh measurements and bump liveness.
	peer.PingMS = pingMS
	peer.GeoKM = geoKM
	peer.Hops = hops
	peer.LastSeen = time.Now()
	r.rebuildKDTree()
	r.mutex.Unlock()
	return r.save()
}
// registry.UpdateScore(peer.ID, 75.0) // clamps to 0-100
// Note: Persistence is debounced. Call Close() to flush before shutdown.
func (r *PeerRegistry) UpdateScore(id string, score float64) error {
	r.mutex.Lock()
	peer, exists := r.peers[id]
	if !exists {
		r.mutex.Unlock()
		return fmt.Errorf("peer %s not found", id)
	}
	// Clamp to the valid range using the shared score constants,
	// consistent with RecordSuccess/RecordFailure/RecordTimeout.
	peer.Score = min(max(score, ScoreMinimum), ScoreMaximum)
	r.rebuildKDTree()
	r.mutex.Unlock()
	return r.save()
}
// registry.SetConnected(peer.ID, true) // on connect
// registry.SetConnected(peer.ID, false) // on disconnect or error
func (r *PeerRegistry) SetConnected(id string, connected bool) {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	peer, ok := r.peers[id]
	if !ok {
		return // unknown peer: nothing to record
	}
	peer.Connected = connected
	if connected {
		peer.LastSeen = time.Now()
	}
}
// Score adjustment constants. Scores always stay within
// [ScoreMinimum, ScoreMaximum]; new peers start at ScoreDefault.
const (
	ScoreSuccessIncrement = 1.0   // Increment for successful interaction
	ScoreFailureDecrement = 5.0   // Decrement for failed interaction
	ScoreTimeoutDecrement = 3.0   // Decrement for timeout
	ScoreMinimum          = 0.0   // Minimum score
	ScoreMaximum          = 100.0 // Maximum score
	ScoreDefault          = 50.0  // Default score for new peers
)
// registry.RecordSuccess(peer.ID) // after a successful response
func (r *PeerRegistry) RecordSuccess(id string) {
	r.mutex.Lock()
	peer, ok := r.peers[id]
	if !ok {
		r.mutex.Unlock()
		return
	}
	// Reward capped at the maximum score; success also proves liveness.
	peer.Score = min(peer.Score+ScoreSuccessIncrement, ScoreMaximum)
	peer.LastSeen = time.Now()
	r.mutex.Unlock()
	// Best-effort debounced persistence.
	r.save()
}
// registry.RecordFailure(peer.ID) // after a failed send or error response
func (r *PeerRegistry) RecordFailure(id string) {
	r.mutex.Lock()
	peer, ok := r.peers[id]
	if !ok {
		r.mutex.Unlock()
		return
	}
	// Penalize, flooring at the minimum score.
	peer.Score = max(peer.Score-ScoreFailureDecrement, ScoreMinimum)
	score := peer.Score
	r.mutex.Unlock()
	r.save()
	logging.Debug("peer score decreased", logging.Fields{
		"peer_id":   id,
		"new_score": score,
		"reason":    "failure",
	})
}
// registry.RecordTimeout(peer.ID) // after a request deadline exceeded
func (r *PeerRegistry) RecordTimeout(id string) {
	r.mutex.Lock()
	peer, ok := r.peers[id]
	if !ok {
		r.mutex.Unlock()
		return
	}
	// Timeouts are penalized less than hard failures.
	peer.Score = max(peer.Score-ScoreTimeoutDecrement, ScoreMinimum)
	score := peer.Score
	r.mutex.Unlock()
	r.save()
	logging.Debug("peer score decreased", logging.Fields{
		"peer_id":   id,
		"new_score": score,
		"reason":    "timeout",
	})
}
// for _, peer := range registry.GetPeersByScore() { log(peer.ID, peer.Score) }
//
// Returns copies sorted by score, highest first.
func (r *PeerRegistry) GetPeersByScore() []*Peer {
	r.mutex.RLock()
	peers := make([]*Peer, 0, len(r.peers))
	for _, peer := range r.peers {
		// Copy each peer so callers never hold pointers into live registry
		// state (consistent with GetPeer, ListPeers, GetConnectedPeers).
		c := *peer
		peers = append(peers, &c)
	}
	r.mutex.RUnlock()
	// Sort by score descending; stdlib sort replaces the former O(n²) loop.
	sort.Slice(peers, func(i, j int) bool {
		return peers[i].Score > peers[j].Score
	})
	return peers
}
// peer := registry.SelectOptimalPeer()
// if peer != nil { ctrl.ConnectToPeer(peer.ID) }
func (r *PeerRegistry) SelectOptimalPeer() *Peer {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	if r.kdTree == nil || len(r.peers) == 0 {
		return nil
	}
	// The ideal peer sits at the origin: 0 ping, 0 hops, 0 geo distance,
	// and 100 score (score is stored inverted, so 100 maps to 0).
	ideal := []float64{0, 0, 0, 0}
	nearest, _, ok := r.kdTree.Nearest(ideal)
	if !ok {
		return nil
	}
	peer, ok := r.peers[nearest.Value]
	if !ok {
		return nil
	}
	c := *peer
	return &c
}
// peers := registry.SelectNearestPeers(3) // top 3 peers by ping, hops, geo, score
// for _, peer := range peers { ctrl.ConnectToPeer(peer.ID) }
func (r *PeerRegistry) SelectNearestPeers(count int) []*Peer {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	if r.kdTree == nil || len(r.peers) == 0 {
		return nil
	}
	// Query outward from the ideal-peer origin point.
	ideal := []float64{0, 0, 0, 0}
	matches, _ := r.kdTree.KNearest(ideal, count)
	out := make([]*Peer, 0, len(matches))
	for _, m := range matches {
		peer, ok := r.peers[m.Value]
		if !ok {
			continue // tree entry with no live peer record
		}
		c := *peer
		out = append(out, &c)
	}
	return out
}
// for _, peer := range registry.GetConnectedPeers() { ctrl.GetRemoteStats(peer.ID) }
func (r *PeerRegistry) GetConnectedPeers() []*Peer {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	// Non-nil even when empty, matching the original behavior.
	out := make([]*Peer, 0)
	for _, p := range r.peers {
		if !p.Connected {
			continue
		}
		c := *p
		out = append(out, &c)
	}
	return out
}
// if registry.Count() == 0 { return ErrNoPeers }
func (r *PeerRegistry) Count() int {
	r.mutex.RLock()
	n := len(r.peers)
	r.mutex.RUnlock()
	return n
}
// r.rebuildKDTree() // called after AddPeer, UpdatePeer, RemovePeer, UpdateMetrics — lock must be held
func (r *PeerRegistry) rebuildKDTree() {
	if len(r.peers) == 0 {
		r.kdTree = nil
		return
	}
	points := make([]poindexter.KDPoint[string], 0, len(r.peers))
	for _, peer := range r.peers {
		// Build 4D point with weighted, normalized values.
		// Invert score so that higher score = lower value (better).
		points = append(points, poindexter.KDPoint[string]{
			ID: peer.ID,
			Coords: []float64{
				peer.PingMS * pingWeight,
				float64(peer.Hops) * hopsWeight,
				peer.GeoKM * geographicWeight,
				(100 - peer.Score) * scoreWeight, // Invert score
			},
			Value: peer.ID,
		})
	}
	// Build KD-tree with Euclidean distance.
	tree, err := poindexter.NewKDTree(points, poindexter.WithMetric(poindexter.EuclideanDistance{}))
	if err != nil {
		// Keep the previous tree so selection degrades gracefully, but make
		// the failure visible (the old comment promised a log that never
		// happened).
		logging.Warn("failed to rebuild peer KD-tree", logging.Fields{"error": err})
		return
	}
	r.kdTree = tree
}
// r.scheduleSave() // called by AddPeer, UpdatePeer, RemovePeer, UpdateMetrics, UpdateScore
// Multiple writes within saveDebounceInterval are coalesced into one disk write.
// Must NOT be called with r.mutex held.
//
// NOTE(review): the timer callback does not consult stopChan, so a save
// scheduled just before Close may still fire after shutdown — confirm this
// overlap is acceptable (Close itself flushes dirty state synchronously).
func (r *PeerRegistry) scheduleSave() {
	r.saveMutex.Lock()
	defer r.saveMutex.Unlock()
	r.dirty = true
	// If timer already running, let it handle the save.
	if r.saveTimer != nil {
		return
	}
	// Start a new timer.
	r.saveTimer = time.AfterFunc(saveDebounceInterval, func() {
		// Claim the dirty flag atomically with clearing the timer slot so a
		// concurrent scheduleSave starts a fresh timer rather than being lost.
		r.saveMutex.Lock()
		r.saveTimer = nil
		shouldSave := r.dirty
		r.dirty = false
		r.saveMutex.Unlock()
		if shouldSave {
			// saveNow requires at least a read lock on r.mutex.
			r.mutex.RLock()
			err := r.saveNow()
			r.mutex.RUnlock()
			if err != nil {
				// Log error but continue - best effort persistence.
				logging.Warn("failed to save peer registry", logging.Fields{"error": err})
			}
		}
	})
}
// r.mutex.RLock(); err := r.saveNow(); r.mutex.RUnlock()
// Must be called with r.mutex held (at least RLock).
func (r *PeerRegistry) saveNow() error {
	// Make sure the parent directory exists.
	if err := os.MkdirAll(filepath.Dir(r.path), 0755); err != nil {
		return fmt.Errorf("failed to create peers directory: %w", err)
	}
	// Serialize the peer map as a JSON array.
	list := make([]*Peer, 0, len(r.peers))
	for _, p := range r.peers {
		list = append(list, p)
	}
	data, err := json.MarshalIndent(list, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal peers: %w", err)
	}
	// Atomic write pattern: temp file first, then rename into place.
	tmp := r.path + ".tmp"
	if err := os.WriteFile(tmp, data, 0644); err != nil {
		return fmt.Errorf("failed to write peers temp file: %w", err)
	}
	if err := os.Rename(tmp, r.path); err != nil {
		os.Remove(tmp) // best-effort cleanup of the orphaned temp file
		return fmt.Errorf("failed to rename peers file: %w", err)
	}
	return nil
}
// defer registry.Close() // flush pending writes on shutdown
func (r *PeerRegistry) Close() error {
	// Signal shutdown exactly once.
	r.saveStopOnce.Do(func() { close(r.stopChan) })

	// Cancel any pending debounce timer and claim the dirty flag.
	r.saveMutex.Lock()
	if r.saveTimer != nil {
		r.saveTimer.Stop()
		r.saveTimer = nil
	}
	pending := r.dirty
	r.dirty = false
	r.saveMutex.Unlock()

	if !pending {
		return nil
	}
	// Flush synchronously under a read lock, as saveNow requires.
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	return r.saveNow()
}
// r.save() // schedules a debounced write; errors logged asynchronously
// Must NOT be called with r.mutex held.
func (r *PeerRegistry) save() error {
	r.scheduleSave()
	// The actual disk write happens later; its errors are logged, not returned.
	return nil
}
// if err := r.load(); err != nil { /* no existing peers, that's ok */ }
//
// A read failure wraps the underlying os error (so callers can test for
// os.ErrNotExist via errors.Is); a decode failure means the file exists
// but is not valid JSON.
func (r *PeerRegistry) load() error {
	data, err := os.ReadFile(r.path)
	if err != nil {
		return fmt.Errorf("failed to read peers: %w", err)
	}
	var peers []*Peer
	if err := json.Unmarshal(data, &peers); err != nil {
		return fmt.Errorf("failed to unmarshal peers: %w", err)
	}
	r.peers = make(map[string]*Peer, len(peers))
	for _, peer := range peers {
		// Skip null array entries so a hand-edited or partially written
		// file cannot cause a nil-pointer panic on peer.ID.
		if peer == nil {
			continue
		}
		r.peers[peer.ID] = peer
	}
	return nil
}