Mining/pkg/node/peer.go
Claude 676d7b13a9
Some checks are pending
Security Scan / security (push) Waiting to run
Test / test (push) Waiting to run
ax(node): remove prose description lines from PeerRegistry method comments
AX Principle 2: comments show usage examples, not prose descriptions.
Five methods (AddPeer, UpdatePeer, RemovePeer, UpdateMetrics, UpdateScore)
had a redundant "Note: Persistence is debounced..." prose line appended
after the usage example. The usage example already communicates intent;
the prose line restated implementation detail without adding value.

Co-Authored-By: Charon <charon@lethean.io>
2026-04-02 16:33:20 +01:00

715 lines
20 KiB
Go

package node
import (
	"cmp"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path"
	"regexp"
	"slices"
	"sync"
	"time"

	"forge.lthn.ai/Snider/Mining/pkg/logging"
	"forge.lthn.ai/Snider/Poindexter"
	"github.com/adrg/xdg"
)
// Peer is one entry of the PeerRegistry: the identity, address and role of a
// remote node plus the metrics used to rank it during peer selection.
// All fields except Connected are written to the peers file as JSON.
//
//	peer := registry.GetPeer("abc123def456")
//	registry.AddPeer(&Peer{ID: "abc123", Name: "worker-1", Address: "10.0.0.2:9091", Role: RoleWorker})
type Peer struct {
// ID is the registry key; AddPeer rejects an empty ID.
ID string `json:"id"`
// Name is optional; AddPeer checks it with validatePeerName.
Name string `json:"name"`
PublicKey string `json:"publicKey"`
Address string `json:"address"` // host:port for WebSocket connection
Role NodeRole `json:"role"`
// AddedAt is set to time.Now() by AddPeer when left zero.
AddedAt time.Time `json:"addedAt"`
// LastSeen is refreshed by UpdateMetrics, RecordSuccess and SetConnected(id, true).
LastSeen time.Time `json:"lastSeen"`
// Poindexter metrics (updated dynamically)
PingMS float64 `json:"pingMs"` // Latency in milliseconds
Hops int `json:"hops"` // Network hop count
GeoKM float64 `json:"geoKm"` // Geographic distance in kilometers
Score float64 `json:"score"` // Reliability score 0-100
// Connection state (not persisted)
// Connected is excluded from JSON by the "-" tag and is changed through SetConnected.
Connected bool `json:"-"`
}
// saveDebounceInterval is the delay scheduleSave hands to time.AfterFunc.
// Changes made while a timer is pending are coalesced into that timer's
// single disk write.
//
//	registry.scheduleSave() // coalesces writes; disk flush at most once per saveDebounceInterval
const saveDebounceInterval = 5 * time.Second
// PeerAuthMode selects how IsPeerAllowed treats a connecting peer.
//
//	registry.SetAuthMode(PeerAuthAllowlist) // allow only pre-registered peers
//	registry.SetAuthMode(PeerAuthOpen) // allow any peer (default)
type PeerAuthMode int
const (
// PeerAuthOpen allows any peer to connect (original behavior).
// It is the zero value, so it is also the mode of a new registry.
PeerAuthOpen PeerAuthMode = iota
// PeerAuthAllowlist only allows pre-registered peers or those with allowed public keys
PeerAuthAllowlist
)
// Peer name validation constants.
// validatePeerName compares len(name), i.e. the length in bytes, against
// these bounds for every non-empty name.
const (
PeerNameMinLength = 1
PeerNameMaxLength = 64
)
// peerNamePattern is the regular expression validatePeerName matches names
// against. It accepts either a single ASCII letter or digit, or a name that
// starts and ends with a letter or digit and has up to 62 letters, digits,
// hyphens, underscores or spaces in between (64 characters at most).
//
//	validatePeerName("worker-1") // nil
//	validatePeerName("bad name!!!") // error: invalid characters
const peerNamePattern = `^[a-zA-Z0-9][a-zA-Z0-9\-_ ]{0,62}[a-zA-Z0-9]$|^[a-zA-Z0-9]$`
// safeKeyPrefix abbreviates a public key for use in log fields.
// An empty key becomes "(empty)", a key shorter than 16 bytes is returned
// unchanged, and anything else is cut to its first 16 bytes followed by "...".
//
//	safeKeyPrefix("abc123def456xyz789") // "abc123def456xyz7..."
//	safeKeyPrefix("") // "(empty)"
func safeKeyPrefix(key string) string {
	const prefixLength = 16

	switch {
	case key == "":
		return "(empty)"
	case len(key) < prefixLength:
		return key
	default:
		return key[:prefixLength] + "..."
	}
}
// validatePeerName("my-worker") // nil
// validatePeerName("bad name!!!") // error: invalid characters
// validatePeerName("") // nil (empty names are optional)
func validatePeerName(name string) error {
if name == "" {
return nil // Empty names are allowed (optional field)
}
if len(name) < PeerNameMinLength {
return fmt.Errorf("peer name too short (min %d characters)", PeerNameMinLength)
}
if len(name) > PeerNameMaxLength {
return fmt.Errorf("peer name too long (max %d characters)", PeerNameMaxLength)
}
nameRegex, err := regexp.Compile(peerNamePattern)
if err != nil {
return fmt.Errorf("invalid peer name pattern: %w", err)
}
if !nameRegex.MatchString(name) {
return fmt.Errorf("peer name contains invalid characters (use alphanumeric, hyphens, underscores, spaces)")
}
return nil
}
// PeerRegistry keeps the set of known peers in memory, indexes them in a
// KD-tree for selection, holds the peer authentication settings, and persists
// the peer list to peersFilePath with debounced writes.
//
//	registry, err := node.NewPeerRegistry()
//	peer := registry.SelectOptimalPeer()
//	registry.UpdateMetrics(peer.ID, rtt, peer.GeoKM, peer.Hops)
type PeerRegistry struct {
// peers maps peer ID to peer; kdTree is rebuilt from it by rebuildKDTree.
peers map[string]*Peer
kdTree *poindexter.KDTree[string] // KD-tree with peer ID as payload
peersFilePath string
// mutex guards peers and kdTree.
mutex sync.RWMutex
// Authentication settings
authMode PeerAuthMode // How to handle unknown peers
allowedPublicKeys map[string]bool // Allowlist of public keys (when authMode is Allowlist)
allowedPublicKeysMutex sync.RWMutex // Protects allowedPublicKeys and authMode
// Debounce disk writes
dirty bool // Whether there are unsaved changes
saveTimer *time.Timer // Timer for debounced save
saveMutex sync.Mutex // Protects dirty and saveTimer
// NOTE(review): stopChannel is closed by Close, but nothing in this file receives from it.
stopChannel chan struct{} // Signal to stop background save
saveStopOnce sync.Once // Ensure stopChannel is closed only once
}
// Dimension weights for peer selection.
// Lower ping, hops, geographic distance are better; higher score is better.
// rebuildKDTree multiplies each raw metric by its weight to form the four
// KD-tree coordinates (the score dimension uses 100 - Score).
const (
pingWeight = 1.0
hopsWeight = 0.7
geographicWeight = 0.2
scoreWeight = 1.2
)
// NewPeerRegistry creates a registry backed by the peers file at the XDG
// config location "lethean-desktop/peers.json". It returns an error only when
// that path cannot be resolved; everything else is delegated to
// NewPeerRegistryWithPath.
//
//	registry, err := node.NewPeerRegistry()
//	if err != nil { return err }
func NewPeerRegistry() (*PeerRegistry, error) {
	const relativePeersFile = "lethean-desktop/peers.json"

	resolvedPath, resolveErr := xdg.ConfigFile(relativePeersFile)
	if resolveErr != nil {
		return nil, fmt.Errorf("failed to get peers path: %w", resolveErr)
	}
	return NewPeerRegistryWithPath(resolvedPath)
}
// NewPeerRegistryWithPath creates a registry that persists to peersPath and
// loads any peers already stored there. The registry starts in PeerAuthOpen
// mode with an empty public-key allowlist.
//
// A load failure never fails construction: the registry simply starts empty.
// A missing file is the normal first-run case and stays silent. Any other
// failure (unreadable or corrupt file) is logged, because the next save will
// overwrite that file. The returned error is currently always nil.
//
//	registry, err := node.NewPeerRegistryWithPath("/tmp/test-peers.json")
//	// used in tests to avoid xdg path caching
func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) {
	registry := &PeerRegistry{
		peers:             make(map[string]*Peer),
		peersFilePath:     peersPath,
		stopChannel:       make(chan struct{}),
		authMode:          PeerAuthOpen, // Default to open for backward compatibility
		allowedPublicKeys: make(map[string]bool),
	}

	// load wraps the underlying error with %w, so errors.Is can still
	// recognize the "file does not exist" case.
	if err := registry.load(); err != nil && !errors.Is(err, os.ErrNotExist) {
		logging.Warn("failed to load peer registry, starting empty", logging.Fields{
			"path":  peersPath,
			"error": err,
		})
	}

	registry.rebuildKDTree()
	return registry, nil
}
// SetAuthMode switches how unknown peers are treated and logs the change.
// The mode is written under allowedPublicKeysMutex.
//
//	registry.SetAuthMode(PeerAuthAllowlist) // require pre-registration
//	registry.SetAuthMode(PeerAuthOpen) // allow any peer (default)
func (registry *PeerRegistry) SetAuthMode(mode PeerAuthMode) {
	registry.allowedPublicKeysMutex.Lock()
	registry.authMode = mode
	registry.allowedPublicKeysMutex.Unlock()

	logging.Info("peer auth mode changed", logging.Fields{"mode": mode})
}
// GetAuthMode returns the current authentication mode, read under
// allowedPublicKeysMutex.
//
//	if registry.GetAuthMode() == PeerAuthAllowlist { /* enforce allowlist */ }
func (registry *PeerRegistry) GetAuthMode() PeerAuthMode {
	registry.allowedPublicKeysMutex.RLock()
	currentMode := registry.authMode
	registry.allowedPublicKeysMutex.RUnlock()
	return currentMode
}
// AllowPublicKey adds publicKey to the allowlist consulted by
// IsPublicKeyAllowed and IsPeerAllowed. Only a shortened form of the key is
// logged.
//
//	registry.AllowPublicKey(peer.PublicKey) // permit this key without pre-registration
func (registry *PeerRegistry) AllowPublicKey(publicKey string) {
	loggedKey := safeKeyPrefix(publicKey)

	registry.allowedPublicKeysMutex.Lock()
	defer registry.allowedPublicKeysMutex.Unlock()

	registry.allowedPublicKeys[publicKey] = true
	logging.Debug("public key added to allowlist", logging.Fields{"key": loggedKey})
}
// RevokePublicKey removes publicKey from the allowlist; removing a key that is
// not present is a no-op. Only a shortened form of the key is logged.
//
//	registry.RevokePublicKey(peer.PublicKey) // block this key on next connect attempt
func (registry *PeerRegistry) RevokePublicKey(publicKey string) {
	loggedKey := safeKeyPrefix(publicKey)

	registry.allowedPublicKeysMutex.Lock()
	defer registry.allowedPublicKeysMutex.Unlock()

	delete(registry.allowedPublicKeys, publicKey)
	logging.Debug("public key removed from allowlist", logging.Fields{"key": loggedKey})
}
// IsPublicKeyAllowed reports whether publicKey is on the allowlist. It does
// not consider the auth mode; use IsPeerAllowed for the full decision.
//
//	if registry.IsPublicKeyAllowed(peer.PublicKey) { /* permit */ }
func (registry *PeerRegistry) IsPublicKeyAllowed(publicKey string) bool {
	registry.allowedPublicKeysMutex.RLock()
	allowed := registry.allowedPublicKeys[publicKey]
	registry.allowedPublicKeysMutex.RUnlock()
	return allowed
}
// IsPeerAllowed decides whether a peer may connect. In PeerAuthOpen mode
// every peer is allowed. In any other mode the peer is allowed when its ID is
// already registered or its public key is on the allowlist.
//
//	if !registry.IsPeerAllowed(payload.Identity.ID, payload.Identity.PublicKey) { conn.Close(); return }
func (registry *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool {
	// Snapshot both auth settings under one lock so they are consistent.
	registry.allowedPublicKeysMutex.RLock()
	mode, keyListed := registry.authMode, registry.allowedPublicKeys[publicKey]
	registry.allowedPublicKeysMutex.RUnlock()

	if mode == PeerAuthOpen {
		return true
	}

	registry.mutex.RLock()
	_, registered := registry.peers[peerID]
	registry.mutex.RUnlock()

	return registered || keyListed
}
// ListAllowedPublicKeys returns a new slice holding every allowlisted public
// key. The order follows map iteration and is therefore unspecified.
//
//	keys := registry.ListAllowedPublicKeys() // for display or export
func (registry *PeerRegistry) ListAllowedPublicKeys() []string {
	registry.allowedPublicKeysMutex.RLock()
	defer registry.allowedPublicKeysMutex.RUnlock()

	allowed := make([]string, len(registry.allowedPublicKeys))
	next := 0
	for publicKey := range registry.allowedPublicKeys {
		allowed[next] = publicKey
		next++
	}
	return allowed
}
// AddPeer registers a new peer and schedules a debounced save.
//
// It returns an error when peer is nil, has an empty ID, has a name rejected
// by validatePeerName, or when a peer with the same ID already exists.
// Defaults are filled in on the passed struct: AddedAt becomes time.Now() when
// zero, and a zero Score becomes ScoreDefault.
//
// The registry stores the pointer it is given rather than a copy, so callers
// should not modify the struct after a successful call.
//
//	registry.AddPeer(&Peer{ID: "abc123", Name: "worker-1", Address: "10.0.0.2:9090", Role: RoleWorker})
func (registry *PeerRegistry) AddPeer(peer *Peer) error {
	// Input validation touches only the caller's struct, so it runs before
	// the registry lock is taken.
	if peer == nil {
		return fmt.Errorf("peer is required")
	}
	if peer.ID == "" {
		return fmt.Errorf("peer ID is required")
	}
	// Validate peer name (P2P-LOW-3)
	if err := validatePeerName(peer.Name); err != nil {
		return err
	}

	registry.mutex.Lock()
	if _, exists := registry.peers[peer.ID]; exists {
		registry.mutex.Unlock()
		return fmt.Errorf("peer %s already exists", peer.ID)
	}

	// Set defaults
	if peer.AddedAt.IsZero() {
		peer.AddedAt = time.Now()
	}
	if peer.Score == 0 {
		peer.Score = ScoreDefault // Default neutral score
	}

	registry.peers[peer.ID] = peer
	registry.rebuildKDTree()
	registry.mutex.Unlock()

	// save must not be called while registry.mutex is held.
	return registry.save()
}
// UpdatePeer replaces the stored entry for peer.ID with peer and schedules a
// debounced save.
//
// It returns an error when peer is nil, when its name is rejected by
// validatePeerName (the same check AddPeer applies, so an update cannot
// introduce a name that registration would refuse), or when no peer with that
// ID is registered.
//
// The registry stores the pointer it is given rather than a copy.
//
//	registry.UpdatePeer(peer) // after handshake completes and real ID is known
func (registry *PeerRegistry) UpdatePeer(peer *Peer) error {
	if peer == nil {
		return fmt.Errorf("peer is required")
	}
	// Validate peer name (P2P-LOW-3)
	if err := validatePeerName(peer.Name); err != nil {
		return err
	}

	registry.mutex.Lock()
	if _, exists := registry.peers[peer.ID]; !exists {
		registry.mutex.Unlock()
		return fmt.Errorf("peer %s not found", peer.ID)
	}
	registry.peers[peer.ID] = peer
	registry.rebuildKDTree()
	registry.mutex.Unlock()

	// save must not be called while registry.mutex is held.
	return registry.save()
}
// RemovePeer deletes the peer with the given ID, rebuilds the KD-tree and
// schedules a debounced save. It returns an error when the ID is unknown.
//
//	registry.RemovePeer(peer.ID) // on manual disconnect or ban
func (registry *PeerRegistry) RemovePeer(id string) error {
	registry.mutex.Lock()
	_, known := registry.peers[id]
	if known {
		delete(registry.peers, id)
		registry.rebuildKDTree()
	}
	registry.mutex.Unlock()

	if !known {
		return fmt.Errorf("peer %s not found", id)
	}
	// The lock is released above; save must not run while it is held.
	return registry.save()
}
// GetPeer returns a copy of the peer registered under id, or nil when there
// is none. Changes to the returned struct do not affect the registry.
//
//	peer := registry.GetPeer("abc123def456")
//	if peer == nil { return fmt.Errorf("peer not found") }
func (registry *PeerRegistry) GetPeer(id string) *Peer {
	registry.mutex.RLock()
	defer registry.mutex.RUnlock()

	if stored, found := registry.peers[id]; found {
		snapshot := *stored
		return &snapshot
	}
	return nil
}
// ListPeers returns copies of all registered peers. The order follows map
// iteration and is therefore unspecified.
//
//	for _, peer := range registry.ListPeers() { log(peer.ID, peer.Role) }
func (registry *PeerRegistry) ListPeers() []*Peer {
	registry.mutex.RLock()
	defer registry.mutex.RUnlock()

	snapshots := make([]*Peer, len(registry.peers))
	next := 0
	for _, stored := range registry.peers {
		snapshot := *stored
		snapshots[next] = &snapshot
		next++
	}
	return snapshots
}
// UpdateMetrics stores fresh latency, distance and hop measurements for the
// peer, sets its LastSeen to now, rebuilds the KD-tree and schedules a
// debounced save. It returns an error when the ID is unknown.
//
//	registry.UpdateMetrics(peer.ID, rtt, peer.GeoKM, peer.Hops) // after PingPeer
func (registry *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int) error {
	registry.mutex.Lock()
	target, known := registry.peers[id]
	if known {
		target.PingMS, target.GeoKM, target.Hops = pingMS, geoKM, hops
		target.LastSeen = time.Now()
		registry.rebuildKDTree()
	}
	registry.mutex.Unlock()

	if !known {
		return fmt.Errorf("peer %s not found", id)
	}
	// The lock is released above; save must not run while it is held.
	return registry.save()
}
// UpdateScore sets the peer's score, limiting it to the range 0-100, then
// rebuilds the KD-tree and schedules a debounced save. It returns an error
// when the ID is unknown.
//
//	registry.UpdateScore(peer.ID, 75.0) // clamps to 0-100
func (registry *PeerRegistry) UpdateScore(id string, score float64) error {
	// Clamp first; this does not depend on registry state.
	clamped := score
	switch {
	case clamped < 0:
		clamped = 0
	case clamped > 100:
		clamped = 100
	}

	registry.mutex.Lock()
	target, known := registry.peers[id]
	if known {
		target.Score = clamped
		registry.rebuildKDTree()
	}
	registry.mutex.Unlock()

	if !known {
		return fmt.Errorf("peer %s not found", id)
	}
	// The lock is released above; save must not run while it is held.
	return registry.save()
}
// SetConnected records the connection state of a peer and, when it becomes
// connected, sets LastSeen to now. Unknown IDs are ignored. No save is
// scheduled by this method.
//
//	registry.SetConnected(peer.ID, true) // on connect
//	registry.SetConnected(peer.ID, false) // on disconnect or error
func (registry *PeerRegistry) SetConnected(id string, connected bool) {
	registry.mutex.Lock()
	defer registry.mutex.Unlock()

	target, known := registry.peers[id]
	if !known {
		return
	}
	target.Connected = connected
	if connected {
		target.LastSeen = time.Now()
	}
}
// Score adjustment constants.
// RecordSuccess, RecordFailure and RecordTimeout apply them as follows:
//
//	peer.Score = min(peer.Score+ScoreSuccessIncrement, ScoreMaximum)
//	peer.Score = max(peer.Score-ScoreFailureDecrement, ScoreMinimum)
//	peer.Score = max(peer.Score-ScoreTimeoutDecrement, ScoreMinimum)
const (
ScoreSuccessIncrement = 1.0 // added by RecordSuccess
ScoreFailureDecrement = 5.0 // subtracted by RecordFailure
ScoreTimeoutDecrement = 3.0 // subtracted by RecordTimeout
ScoreMinimum = 0.0 // lower bound applied after a decrement
ScoreMaximum = 100.0 // upper bound applied after an increment
ScoreDefault = 50.0 // neutral starting score, e.g. &Peer{Score: ScoreDefault}
)
// RecordSuccess rewards a peer for a successful interaction: its score rises
// by ScoreSuccessIncrement (capped at ScoreMaximum) and LastSeen is set to
// now. The KD-tree is rebuilt so that peer selection sees the new score, and
// a debounced save is scheduled. Unknown IDs are ignored.
//
//	registry.RecordSuccess(peer.ID) // after a successful response
func (registry *PeerRegistry) RecordSuccess(id string) {
	registry.mutex.Lock()
	peer, exists := registry.peers[id]
	if !exists {
		registry.mutex.Unlock()
		return
	}
	peer.Score = min(peer.Score+ScoreSuccessIncrement, ScoreMaximum)
	peer.LastSeen = time.Now()
	// The tree holds coordinates computed at build time, so it must be
	// rebuilt for the score change to influence selection.
	registry.rebuildKDTree()
	registry.mutex.Unlock()

	// save only schedules a write and always returns nil.
	_ = registry.save()
}
// RecordFailure penalizes a peer for a failed interaction: its score drops by
// ScoreFailureDecrement (floored at ScoreMinimum). The KD-tree is rebuilt so
// that peer selection sees the new score, a debounced save is scheduled, and
// the new score is logged. Unknown IDs are ignored.
//
//	registry.RecordFailure(peer.ID) // after a failed send or error response
func (registry *PeerRegistry) RecordFailure(id string) {
	registry.mutex.Lock()
	peer, exists := registry.peers[id]
	if !exists {
		registry.mutex.Unlock()
		return
	}
	peer.Score = max(peer.Score-ScoreFailureDecrement, ScoreMinimum)
	newScore := peer.Score
	// The tree holds coordinates computed at build time, so it must be
	// rebuilt for the score change to influence selection.
	registry.rebuildKDTree()
	registry.mutex.Unlock()

	// save only schedules a write and always returns nil.
	_ = registry.save()

	logging.Debug("peer score decreased", logging.Fields{
		"peer_id":   id,
		"new_score": newScore,
		"reason":    "failure",
	})
}
// RecordTimeout penalizes a peer for a timed-out request: its score drops by
// ScoreTimeoutDecrement (floored at ScoreMinimum). The KD-tree is rebuilt so
// that peer selection sees the new score, a debounced save is scheduled, and
// the new score is logged. Unknown IDs are ignored.
//
//	registry.RecordTimeout(peer.ID) // after a request deadline exceeded
func (registry *PeerRegistry) RecordTimeout(id string) {
	registry.mutex.Lock()
	peer, exists := registry.peers[id]
	if !exists {
		registry.mutex.Unlock()
		return
	}
	peer.Score = max(peer.Score-ScoreTimeoutDecrement, ScoreMinimum)
	newScore := peer.Score
	// The tree holds coordinates computed at build time, so it must be
	// rebuilt for the score change to influence selection.
	registry.rebuildKDTree()
	registry.mutex.Unlock()

	// save only schedules a write and always returns nil.
	_ = registry.save()

	logging.Debug("peer score decreased", logging.Fields{
		"peer_id":   id,
		"new_score": newScore,
		"reason":    "timeout",
	})
}
// GetPeersByScore returns copies of all registered peers ordered by Score,
// highest first. The relative order of peers with equal scores is
// unspecified. Changes to the returned structs do not affect the registry.
//
//	for _, peer := range registry.GetPeersByScore() { log(peer.ID, peer.Score) }
func (registry *PeerRegistry) GetPeersByScore() []*Peer {
	registry.mutex.RLock()
	peers := make([]*Peer, 0, len(registry.peers))
	for _, peer := range registry.peers {
		// Copy each entry, as the other getters do, so callers never hold
		// pointers into registry state that is guarded by the mutex.
		peerCopy := *peer
		peers = append(peers, &peerCopy)
	}
	registry.mutex.RUnlock()

	// Sort by score descending; the copies are private, so no lock is needed.
	slices.SortFunc(peers, func(a, b *Peer) int {
		return cmp.Compare(b.Score, a.Score)
	})
	return peers
}
// SelectOptimalPeer returns a copy of the peer whose weighted metrics lie
// closest to the ideal point in the KD-tree, or nil when the registry is
// empty, no tree has been built, or the lookup finds nothing.
//
//	peer := registry.SelectOptimalPeer()
//	if peer != nil { ctrl.ConnectToPeer(peer.ID) }
func (registry *PeerRegistry) SelectOptimalPeer() *Peer {
	registry.mutex.RLock()
	defer registry.mutex.RUnlock()

	if registry.kdTree == nil || len(registry.peers) == 0 {
		return nil
	}

	// The ideal peer has zero ping, hops and distance and a score of 100.
	// The tree stores (100 - score), so the ideal point is the origin.
	origin := make([]float64, 4)
	nearest, _, located := registry.kdTree.Nearest(origin)
	if !located {
		return nil
	}

	if stored, known := registry.peers[nearest.Value]; known {
		snapshot := *stored
		return &snapshot
	}
	return nil
}
// SelectNearestPeers returns copies of up to count peers, in the order the
// KD-tree reports them as nearest to the ideal point. It returns nil when the
// registry is empty or no tree has been built. Tree results whose ID is no
// longer registered are skipped.
// NOTE(review): count is passed straight to KNearest; behaviour for
// count <= 0 depends on poindexter — confirm.
//
//	peers := registry.SelectNearestPeers(3) // top 3 peers by ping, hops, geo, score
//	for _, peer := range peers { ctrl.ConnectToPeer(peer.ID) }
func (registry *PeerRegistry) SelectNearestPeers(count int) []*Peer {
	registry.mutex.RLock()
	defer registry.mutex.RUnlock()

	if registry.kdTree == nil || len(registry.peers) == 0 {
		return nil
	}

	// Same ideal point as SelectOptimalPeer: the origin of the weighted space.
	origin := make([]float64, 4)
	matches, _ := registry.kdTree.KNearest(origin, count)

	selected := make([]*Peer, 0, len(matches))
	for _, match := range matches {
		stored, known := registry.peers[match.Value]
		if !known {
			continue
		}
		snapshot := *stored
		selected = append(selected, &snapshot)
	}
	return selected
}
// GetConnectedPeers returns copies of all peers currently marked Connected.
// The result is an empty, non-nil slice when none are connected; the order is
// unspecified.
//
//	for _, peer := range registry.GetConnectedPeers() { ctrl.GetRemoteStats(peer.ID) }
func (registry *PeerRegistry) GetConnectedPeers() []*Peer {
	registry.mutex.RLock()
	defer registry.mutex.RUnlock()

	connected := []*Peer{}
	for _, stored := range registry.peers {
		if !stored.Connected {
			continue
		}
		snapshot := *stored
		connected = append(connected, &snapshot)
	}
	return connected
}
// Count returns the number of registered peers.
//
//	if registry.Count() == 0 { return ErrNoPeers }
func (registry *PeerRegistry) Count() int {
	registry.mutex.RLock()
	total := len(registry.peers)
	registry.mutex.RUnlock()
	return total
}
// rebuildKDTree regenerates the KD-tree from the current peer map. The caller
// must hold registry.mutex for writing.
//
// Each peer becomes a 4D point of weighted raw metrics: ping, hops,
// geographic distance and inverted score (100 - Score), so lower is better in
// every dimension. An empty registry clears the tree. If the tree cannot be
// built, the failure is logged and the previous tree is kept; selection may
// then be based on outdated data.
// NOTE(review): the metrics are weighted but not normalized, so dimensions
// with large raw values (e.g. GeoKM) can dominate the distance — confirm this
// is intended.
//
//	registry.rebuildKDTree() // called after any change to peers or their metrics — lock must be held
func (registry *PeerRegistry) rebuildKDTree() {
	if len(registry.peers) == 0 {
		registry.kdTree = nil
		return
	}

	points := make([]poindexter.KDPoint[string], 0, len(registry.peers))
	for _, peer := range registry.peers {
		// Build 4D point with weighted values.
		// Invert score so that higher score = lower value (better).
		point := poindexter.KDPoint[string]{
			ID: peer.ID,
			Coords: []float64{
				peer.PingMS * pingWeight,
				float64(peer.Hops) * hopsWeight,
				peer.GeoKM * geographicWeight,
				(100 - peer.Score) * scoreWeight, // Invert score
			},
			Value: peer.ID,
		}
		points = append(points, point)
	}

	// Build KD-tree with Euclidean distance
	tree, err := poindexter.NewKDTree(points, poindexter.WithMetric(poindexter.EuclideanDistance{}))
	if err != nil {
		// Best effort: report the failure and keep the previous tree.
		logging.Warn("failed to rebuild peer KD-tree", logging.Fields{"error": err})
		return
	}
	registry.kdTree = tree
}
// scheduleSave marks the registry dirty and, unless a timer is already
// pending, starts one that writes the peers file after saveDebounceInterval.
// Calls made while a timer is pending are coalesced into that timer's single
// disk write. Write errors are logged, not returned.
// Must NOT be called with registry.mutex held.
//
//	registry.scheduleSave() // reached through save() by every mutating method
func (registry *PeerRegistry) scheduleSave() {
	registry.saveMutex.Lock()
	defer registry.saveMutex.Unlock()

	registry.dirty = true

	// A pending timer will pick up the dirty flag; nothing more to do.
	if registry.saveTimer != nil {
		return
	}

	flush := func() {
		// Claim the pending work and clear the timer slot so the next
		// change can schedule a fresh timer.
		registry.saveMutex.Lock()
		registry.saveTimer = nil
		pending := registry.dirty
		registry.dirty = false
		registry.saveMutex.Unlock()

		if !pending {
			return
		}

		registry.mutex.RLock()
		saveErr := registry.saveNow()
		registry.mutex.RUnlock()
		if saveErr != nil {
			// Log error but continue - best effort persistence
			logging.Warn("failed to save peer registry", logging.Fields{"error": saveErr})
		}
	}
	registry.saveTimer = time.AfterFunc(saveDebounceInterval, flush)
}
// saveNow writes all peers to peersFilePath as indented JSON. It creates the
// parent directory if needed, writes to "<path>.tmp" and then renames that
// file over the target so the target is replaced in one step.
// Must be called with registry.mutex held (at least RLock).
// NOTE(review): path.Dir only understands forward slashes; filepath.Dir would
// be the OS-aware choice for a file system path.
//
//	registry.mutex.RLock(); err := registry.saveNow(); registry.mutex.RUnlock()
func (registry *PeerRegistry) saveNow() error {
	targetPath := registry.peersFilePath

	// Ensure directory exists
	if mkdirErr := os.MkdirAll(path.Dir(targetPath), 0755); mkdirErr != nil {
		return fmt.Errorf("failed to create peers directory: %w", mkdirErr)
	}

	// The file stores a JSON array, so flatten the map into a slice.
	snapshot := make([]*Peer, len(registry.peers))
	next := 0
	for _, stored := range registry.peers {
		snapshot[next] = stored
		next++
	}

	encoded, marshalErr := json.MarshalIndent(snapshot, "", " ")
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal peers: %w", marshalErr)
	}

	// Write to a temp file first, then rename it into place.
	stagingPath := targetPath + ".tmp"
	if writeErr := os.WriteFile(stagingPath, encoded, 0644); writeErr != nil {
		return fmt.Errorf("failed to write peers temp file: %w", writeErr)
	}
	if renameErr := os.Rename(stagingPath, targetPath); renameErr != nil {
		os.Remove(stagingPath) // Clean up temp file
		return fmt.Errorf("failed to rename peers file: %w", renameErr)
	}
	return nil
}
// Close closes stopChannel (once), cancels any pending debounce timer and, if
// there are unsaved changes, writes them to disk immediately. It returns the
// error from that write, or nil when nothing needed saving.
//
//	defer registry.Close() // flush pending writes on shutdown
func (registry *PeerRegistry) Close() error {
	registry.saveStopOnce.Do(func() { close(registry.stopChannel) })

	// Take over any pending work from the debounce timer.
	registry.saveMutex.Lock()
	if pendingTimer := registry.saveTimer; pendingTimer != nil {
		pendingTimer.Stop()
		registry.saveTimer = nil
	}
	unsaved := registry.dirty
	registry.dirty = false
	registry.saveMutex.Unlock()

	if !unsaved {
		return nil
	}

	registry.mutex.RLock()
	defer registry.mutex.RUnlock()
	return registry.saveNow()
}
// save requests persistence of the registry by calling scheduleSave. The
// write happens later on a timer, so this method always returns nil; write
// failures are logged by the timer callback instead.
// Must NOT be called with registry.mutex held.
//
//	registry.save() // schedules a debounced write; errors logged asynchronously
func (registry *PeerRegistry) save() error {
registry.scheduleSave()
return nil // Errors will be logged asynchronously
}
// load replaces registry.peers with the peers stored in peersFilePath, which
// must contain a JSON array of Peer objects. It returns a wrapped error when
// the file cannot be read or parsed; in that case registry.peers is left
// untouched. Array entries that are null or have an empty ID are skipped,
// since AddPeer would never have accepted them. It takes no lock; in this
// file it is called only while the registry is being constructed.
//
//	if err := registry.load(); err != nil { /* no existing peers, that's ok */ }
func (registry *PeerRegistry) load() error {
	data, err := os.ReadFile(registry.peersFilePath)
	if err != nil {
		return fmt.Errorf("failed to read peers: %w", err)
	}

	var peers []*Peer
	if err := json.Unmarshal(data, &peers); err != nil {
		return fmt.Errorf("failed to unmarshal peers: %w", err)
	}

	registry.peers = make(map[string]*Peer, len(peers))
	for _, peer := range peers {
		// A JSON null decodes to a nil pointer; dereferencing it would panic.
		if peer == nil || peer.ID == "" {
			continue
		}
		registry.peers[peer.ID] = peer
	}
	return nil
}