diff --git a/pkg/node/peer.go b/pkg/node/peer.go index 6aa20cb..fdd8828 100644 --- a/pkg/node/peer.go +++ b/pkg/node/peer.go @@ -34,7 +34,7 @@ type Peer struct { Connected bool `json:"-"` } -// r.scheduleSave() // coalesces writes; disk flush at most once per saveDebounceInterval +// registry.scheduleSave() // coalesces writes; disk flush at most once per saveDebounceInterval const saveDebounceInterval = 5 * time.Second // registry.SetAuthMode(PeerAuthAllowlist) // allow only pre-registered peers @@ -157,50 +157,50 @@ func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) { // registry.SetAuthMode(PeerAuthAllowlist) // require pre-registration // registry.SetAuthMode(PeerAuthOpen) // allow any peer (default) -func (r *PeerRegistry) SetAuthMode(mode PeerAuthMode) { - r.allowedPublicKeyMutex.Lock() - defer r.allowedPublicKeyMutex.Unlock() - r.authMode = mode +func (registry *PeerRegistry) SetAuthMode(mode PeerAuthMode) { + registry.allowedPublicKeyMutex.Lock() + defer registry.allowedPublicKeyMutex.Unlock() + registry.authMode = mode logging.Info("peer auth mode changed", logging.Fields{"mode": mode}) } // if registry.GetAuthMode() == PeerAuthAllowlist { /* enforce allowlist */ } -func (r *PeerRegistry) GetAuthMode() PeerAuthMode { - r.allowedPublicKeyMutex.RLock() - defer r.allowedPublicKeyMutex.RUnlock() - return r.authMode +func (registry *PeerRegistry) GetAuthMode() PeerAuthMode { + registry.allowedPublicKeyMutex.RLock() + defer registry.allowedPublicKeyMutex.RUnlock() + return registry.authMode } // registry.AllowPublicKey(peer.PublicKey) // permit this key without pre-registration -func (r *PeerRegistry) AllowPublicKey(publicKey string) { - r.allowedPublicKeyMutex.Lock() - defer r.allowedPublicKeyMutex.Unlock() - r.allowedPublicKeys[publicKey] = true +func (registry *PeerRegistry) AllowPublicKey(publicKey string) { + registry.allowedPublicKeyMutex.Lock() + defer registry.allowedPublicKeyMutex.Unlock() + registry.allowedPublicKeys[publicKey] = true logging.Debug("public key added to allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)}) } // registry.RevokePublicKey(peer.PublicKey) // block this key on next connect attempt -func (r *PeerRegistry) RevokePublicKey(publicKey string) { - r.allowedPublicKeyMutex.Lock() - defer r.allowedPublicKeyMutex.Unlock() - delete(r.allowedPublicKeys, publicKey) +func (registry *PeerRegistry) RevokePublicKey(publicKey string) { + registry.allowedPublicKeyMutex.Lock() + defer registry.allowedPublicKeyMutex.Unlock() + delete(registry.allowedPublicKeys, publicKey) logging.Debug("public key removed from allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)}) } // if registry.IsPublicKeyAllowed(peer.PublicKey) { /* permit */ } -func (r *PeerRegistry) IsPublicKeyAllowed(publicKey string) bool { - r.allowedPublicKeyMutex.RLock() - defer r.allowedPublicKeyMutex.RUnlock() - return r.allowedPublicKeys[publicKey] +func (registry *PeerRegistry) IsPublicKeyAllowed(publicKey string) bool { + registry.allowedPublicKeyMutex.RLock() + defer registry.allowedPublicKeyMutex.RUnlock() + return registry.allowedPublicKeys[publicKey] } // if !registry.IsPeerAllowed(payload.Identity.ID, payload.Identity.PublicKey) { conn.Close(); return } // true when Open mode, or Allowlist mode with pre-registered peer or allowlisted key -func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool { - r.allowedPublicKeyMutex.RLock() - authMode := r.authMode - keyAllowed := r.allowedPublicKeys[publicKey] - r.allowedPublicKeyMutex.RUnlock() +func (registry *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool { + registry.allowedPublicKeyMutex.RLock() + authMode := registry.authMode + keyAllowed := registry.allowedPublicKeys[publicKey] + registry.allowedPublicKeyMutex.RUnlock() // Open mode allows everyone if authMode == PeerAuthOpen { @@ -208,9 +208,9 @@ func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool { } // Allowlist mode: check if peer is pre-registered - r.mutex.RLock() - _, isRegistered := r.peers[peerID] - r.mutex.RUnlock() + registry.mutex.RLock() + _, isRegistered := registry.peers[peerID] + registry.mutex.RUnlock() if isRegistered { return true @@ -221,12 +221,12 @@ func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool { } // keys := registry.ListAllowedPublicKeys() // for display or export -func (r *PeerRegistry) ListAllowedPublicKeys() []string { - r.allowedPublicKeyMutex.RLock() - defer r.allowedPublicKeyMutex.RUnlock() +func (registry *PeerRegistry) ListAllowedPublicKeys() []string { + registry.allowedPublicKeyMutex.RLock() + defer registry.allowedPublicKeyMutex.RUnlock() - keys := make([]string, 0, len(r.allowedPublicKeys)) - for key := range r.allowedPublicKeys { + keys := make([]string, 0, len(registry.allowedPublicKeys)) + for key := range registry.allowedPublicKeys { keys = append(keys, key) } return keys @@ -234,22 +234,22 @@ func (r *PeerRegistry) ListAllowedPublicKeys() []string { // registry.AddPeer(&Peer{ID: "abc123", Name: "worker-1", Address: "10.0.0.2:9090", Role: RoleWorker}) // Persistence is debounced — call Close() before shutdown to flush all pending writes. -func (r *PeerRegistry) AddPeer(peer *Peer) error { - r.mutex.Lock() +func (registry *PeerRegistry) AddPeer(peer *Peer) error { + registry.mutex.Lock() if peer.ID == "" { - r.mutex.Unlock() + registry.mutex.Unlock() return fmt.Errorf("peer ID is required") } // Validate peer name (P2P-LOW-3) if err := validatePeerName(peer.Name); err != nil { - r.mutex.Unlock() + registry.mutex.Unlock() return err } - if _, exists := r.peers[peer.ID]; exists { - r.mutex.Unlock() + if _, exists := registry.peers[peer.ID]; exists { + registry.mutex.Unlock() return fmt.Errorf("peer %s already exists", peer.ID) } @@ -261,54 +261,54 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error { peer.Score = 50 // Default neutral score } - r.peers[peer.ID] = peer - r.rebuildKDTree() - r.mutex.Unlock() + registry.peers[peer.ID] = peer + registry.rebuildKDTree() + registry.mutex.Unlock() - return r.save() + return registry.save() } // registry.UpdatePeer(peer) // after handshake completes and real ID is known // Note: Persistence is debounced. Call Close() to flush before shutdown. -func (r *PeerRegistry) UpdatePeer(peer *Peer) error { - r.mutex.Lock() +func (registry *PeerRegistry) UpdatePeer(peer *Peer) error { + registry.mutex.Lock() - if _, exists := r.peers[peer.ID]; !exists { - r.mutex.Unlock() + if _, exists := registry.peers[peer.ID]; !exists { + registry.mutex.Unlock() return fmt.Errorf("peer %s not found", peer.ID) } - r.peers[peer.ID] = peer - r.rebuildKDTree() - r.mutex.Unlock() + registry.peers[peer.ID] = peer + registry.rebuildKDTree() + registry.mutex.Unlock() - return r.save() + return registry.save() } // registry.RemovePeer(peer.ID) // on manual disconnect or ban // Note: Persistence is debounced. Call Close() to flush before shutdown. -func (r *PeerRegistry) RemovePeer(id string) error { - r.mutex.Lock() +func (registry *PeerRegistry) RemovePeer(id string) error { + registry.mutex.Lock() - if _, exists := r.peers[id]; !exists { - r.mutex.Unlock() + if _, exists := registry.peers[id]; !exists { + registry.mutex.Unlock() return fmt.Errorf("peer %s not found", id) } - delete(r.peers, id) - r.rebuildKDTree() - r.mutex.Unlock() + delete(registry.peers, id) + registry.rebuildKDTree() + registry.mutex.Unlock() - return r.save() + return registry.save() } // peer := registry.GetPeer("abc123def456") // if peer == nil { return fmt.Errorf("peer not found") } -func (r *PeerRegistry) GetPeer(id string) *Peer { - r.mutex.RLock() - defer r.mutex.RUnlock() +func (registry *PeerRegistry) GetPeer(id string) *Peer { + registry.mutex.RLock() + defer registry.mutex.RUnlock() - peer, exists := r.peers[id] + peer, exists := registry.peers[id] if !exists { return nil } @@ -319,12 +319,12 @@ func (r *PeerRegistry) GetPeer(id string) *Peer { } // for _, peer := range registry.ListPeers() { log(peer.ID, peer.Role) } -func (r *PeerRegistry) ListPeers() []*Peer { - r.mutex.RLock() - defer r.mutex.RUnlock() +func (registry *PeerRegistry) ListPeers() []*Peer { + registry.mutex.RLock() + defer registry.mutex.RUnlock() - peers := make([]*Peer, 0, len(r.peers)) - for _, peer := range r.peers { + peers := make([]*Peer, 0, len(registry.peers)) + for _, peer := range registry.peers { peerCopy := *peer peers = append(peers, &peerCopy) } @@ -333,12 +333,12 @@ func (r *PeerRegistry) ListPeers() []*Peer { // registry.UpdateMetrics(peer.ID, rtt, peer.GeoKM, peer.Hops) // after PingPeer // Note: Persistence is debounced. Call Close() to flush before shutdown. -func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int) error { - r.mutex.Lock() +func (registry *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int) error { + registry.mutex.Lock() - peer, exists := r.peers[id] + peer, exists := registry.peers[id] if !exists { - r.mutex.Unlock() + registry.mutex.Unlock() return fmt.Errorf("peer %s not found", id) } @@ -347,20 +347,20 @@ func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int) peer.Hops = hops peer.LastSeen = time.Now() - r.rebuildKDTree() - r.mutex.Unlock() + registry.rebuildKDTree() + registry.mutex.Unlock() - return r.save() + return registry.save() } // registry.UpdateScore(peer.ID, 75.0) // clamps to 0-100 // Note: Persistence is debounced. Call Close() to flush before shutdown. -func (r *PeerRegistry) UpdateScore(id string, score float64) error { - r.mutex.Lock() +func (registry *PeerRegistry) UpdateScore(id string, score float64) error { + registry.mutex.Lock() - peer, exists := r.peers[id] + peer, exists := registry.peers[id] if !exists { - r.mutex.Unlock() + registry.mutex.Unlock() return fmt.Errorf("peer %s not found", id) } @@ -372,19 +372,19 @@ func (r *PeerRegistry) UpdateScore(id string, score float64) error { } peer.Score = score - r.rebuildKDTree() - r.mutex.Unlock() + registry.rebuildKDTree() + registry.mutex.Unlock() - return r.save() + return registry.save() } // registry.SetConnected(peer.ID, true) // on connect // registry.SetConnected(peer.ID, false) // on disconnect or error -func (r *PeerRegistry) SetConnected(id string, connected bool) { - r.mutex.Lock() - defer r.mutex.Unlock() +func (registry *PeerRegistry) SetConnected(id string, connected bool) { + registry.mutex.Lock() + defer registry.mutex.Unlock() - if peer, exists := r.peers[id]; exists { + if peer, exists := registry.peers[id]; exists { peer.Connected = connected if connected { peer.LastSeen = time.Now() @@ -403,33 +403,33 @@ const ( ) // registry.RecordSuccess(peer.ID) // after a successful response -func (r *PeerRegistry) RecordSuccess(id string) { - r.mutex.Lock() - peer, exists := r.peers[id] +func (registry *PeerRegistry) RecordSuccess(id string) { + registry.mutex.Lock() + peer, exists := registry.peers[id] if !exists { - r.mutex.Unlock() + registry.mutex.Unlock() return } peer.Score = min(peer.Score+ScoreSuccessIncrement, ScoreMaximum) peer.LastSeen = time.Now() - r.mutex.Unlock() - r.save() + registry.mutex.Unlock() + registry.save() } // registry.RecordFailure(peer.ID) // after a failed send or error response -func (r *PeerRegistry) RecordFailure(id string) { - r.mutex.Lock() - peer, exists := r.peers[id] +func (registry *PeerRegistry) RecordFailure(id string) { + registry.mutex.Lock() + peer, exists := registry.peers[id] if !exists { - r.mutex.Unlock() + registry.mutex.Unlock() return } peer.Score = max(peer.Score-ScoreFailureDecrement, ScoreMinimum) newScore := peer.Score - r.mutex.Unlock() - r.save() + registry.mutex.Unlock() + registry.save() logging.Debug("peer score decreased", logging.Fields{ "peer_id": id, @@ -439,18 +439,18 @@ func (r *PeerRegistry) RecordFailure(id string) { } // registry.RecordTimeout(peer.ID) // after a request deadline exceeded -func (r *PeerRegistry) RecordTimeout(id string) { - r.mutex.Lock() - peer, exists := r.peers[id] +func (registry *PeerRegistry) RecordTimeout(id string) { + registry.mutex.Lock() + peer, exists := registry.peers[id] if !exists { - r.mutex.Unlock() + registry.mutex.Unlock() return } peer.Score = max(peer.Score-ScoreTimeoutDecrement, ScoreMinimum) newScore := peer.Score - r.mutex.Unlock() - r.save() + registry.mutex.Unlock() + registry.save() logging.Debug("peer score decreased", logging.Fields{ "peer_id": id, @@ -460,12 +460,12 @@ func (r *PeerRegistry) RecordTimeout(id string) { } // for _, peer := range registry.GetPeersByScore() { log(peer.ID, peer.Score) } -func (r *PeerRegistry) GetPeersByScore() []*Peer { - r.mutex.RLock() - defer r.mutex.RUnlock() +func (registry *PeerRegistry) GetPeersByScore() []*Peer { + registry.mutex.RLock() + defer registry.mutex.RUnlock() - peers := make([]*Peer, 0, len(r.peers)) - for _, peer := range r.peers { + peers := make([]*Peer, 0, len(registry.peers)) + for _, peer := range registry.peers { peers = append(peers, peer) } @@ -483,11 +483,11 @@ func (r *PeerRegistry) GetPeersByScore() []*Peer { // peer := registry.SelectOptimalPeer() // if peer != nil { ctrl.ConnectToPeer(peer.ID) } -func (r *PeerRegistry) SelectOptimalPeer() *Peer { - r.mutex.RLock() - defer r.mutex.RUnlock() +func (registry *PeerRegistry) SelectOptimalPeer() *Peer { + registry.mutex.RLock() + defer registry.mutex.RUnlock() - if r.kdTree == nil || len(r.peers) == 0 { + if registry.kdTree == nil || len(registry.peers) == 0 { return nil } @@ -495,12 +495,12 @@ func (r *PeerRegistry) SelectOptimalPeer() *Peer { // Score is inverted (100 - score) so lower is better in the tree target := []float64{0, 0, 0, 0} - result, _, found := r.kdTree.Nearest(target) + result, _, found := registry.kdTree.Nearest(target) if !found { return nil } - peer, exists := r.peers[result.Value] + peer, exists := registry.peers[result.Value] if !exists { return nil } @@ -511,22 +511,22 @@ func (r *PeerRegistry) SelectOptimalPeer() *Peer { // peers := registry.SelectNearestPeers(3) // top 3 peers by ping, hops, geo, score // for _, peer := range peers { ctrl.ConnectToPeer(peer.ID) } -func (r *PeerRegistry) SelectNearestPeers(count int) []*Peer { - r.mutex.RLock() - defer r.mutex.RUnlock() +func (registry *PeerRegistry) SelectNearestPeers(count int) []*Peer { + registry.mutex.RLock() + defer registry.mutex.RUnlock() - if r.kdTree == nil || len(r.peers) == 0 { + if registry.kdTree == nil || len(registry.peers) == 0 { return nil } // Target: ideal peer target := []float64{0, 0, 0, 0} - results, _ := r.kdTree.KNearest(target, count) + results, _ := registry.kdTree.KNearest(target, count) peers := make([]*Peer, 0, len(results)) for _, result := range results { - if peer, exists := r.peers[result.Value]; exists { + if peer, exists := registry.peers[result.Value]; exists { peerCopy := *peer peers = append(peers, &peerCopy) } @@ -536,12 +536,12 @@ func (r *PeerRegistry) SelectNearestPeers(count int) []*Peer { } // for _, peer := range registry.GetConnectedPeers() { ctrl.GetRemoteStats(peer.ID) } -func (r *PeerRegistry) GetConnectedPeers() []*Peer { - r.mutex.RLock() - defer r.mutex.RUnlock() +func (registry *PeerRegistry) GetConnectedPeers() []*Peer { + registry.mutex.RLock() + defer registry.mutex.RUnlock() peers := make([]*Peer, 0) - for _, peer := range r.peers { + for _, peer := range registry.peers { if peer.Connected { peerCopy := *peer peers = append(peers, &peerCopy) @@ -551,21 +551,21 @@ func (r *PeerRegistry) GetConnectedPeers() []*Peer { } // if registry.Count() == 0 { return ErrNoPeers } -func (r *PeerRegistry) Count() int { - r.mutex.RLock() - defer r.mutex.RUnlock() - return len(r.peers) +func (registry *PeerRegistry) Count() int { + registry.mutex.RLock() + defer registry.mutex.RUnlock() + return len(registry.peers) } -// r.rebuildKDTree() // called after AddPeer, UpdatePeer, RemovePeer, UpdateMetrics — lock must be held -func (r *PeerRegistry) rebuildKDTree() { - if len(r.peers) == 0 { - r.kdTree = nil +// registry.rebuildKDTree() // called after AddPeer, UpdatePeer, RemovePeer, UpdateMetrics — lock must be held +func (registry *PeerRegistry) rebuildKDTree() { + if len(registry.peers) == 0 { + registry.kdTree = nil return } - points := make([]poindexter.KDPoint[string], 0, len(r.peers)) - for _, peer := range r.peers { + points := make([]poindexter.KDPoint[string], 0, len(registry.peers)) + for _, peer := range registry.peers { // Build 4D point with weighted, normalized values // Invert score so that higher score = lower value (better) point := poindexter.KDPoint[string]{ @@ -588,35 +588,35 @@ func (r *PeerRegistry) rebuildKDTree() { return } - r.kdTree = tree + registry.kdTree = tree } -// r.scheduleSave() // called by AddPeer, UpdatePeer, RemovePeer, UpdateMetrics, UpdateScore +// registry.scheduleSave() // called by AddPeer, UpdatePeer, RemovePeer, UpdateMetrics, UpdateScore // Multiple writes within saveDebounceInterval are coalesced into one disk write. -// Must NOT be called with r.mutex held. -func (r *PeerRegistry) scheduleSave() { - r.saveMutex.Lock() - defer r.saveMutex.Unlock() +// Must NOT be called with registry.mutex held. +func (registry *PeerRegistry) scheduleSave() { + registry.saveMutex.Lock() + defer registry.saveMutex.Unlock() - r.dirty = true + registry.dirty = true // If timer already running, let it handle the save - if r.saveTimer != nil { + if registry.saveTimer != nil { return } // Start a new timer - r.saveTimer = time.AfterFunc(saveDebounceInterval, func() { - r.saveMutex.Lock() - r.saveTimer = nil - shouldSave := r.dirty - r.dirty = false - r.saveMutex.Unlock() + registry.saveTimer = time.AfterFunc(saveDebounceInterval, func() { + registry.saveMutex.Lock() + registry.saveTimer = nil + shouldSave := registry.dirty + registry.dirty = false + registry.saveMutex.Unlock() if shouldSave { - r.mutex.RLock() - err := r.saveNow() - r.mutex.RUnlock() + registry.mutex.RLock() + err := registry.saveNow() + registry.mutex.RUnlock() if err != nil { // Log error but continue - best effort persistence logging.Warn("failed to save peer registry", logging.Fields{"error": err}) @@ -625,18 +625,18 @@ func (r *PeerRegistry) scheduleSave() { }) } -// r.mutex.RLock(); err := r.saveNow(); r.mutex.RUnlock() -// Must be called with r.mutex held (at least RLock). -func (r *PeerRegistry) saveNow() error { +// registry.mutex.RLock(); err := registry.saveNow(); registry.mutex.RUnlock() +// Must be called with registry.mutex held (at least RLock). +func (registry *PeerRegistry) saveNow() error { // Ensure directory exists - directoryPath := filepath.Dir(r.path) + directoryPath := filepath.Dir(registry.path) if err := os.MkdirAll(directoryPath, 0755); err != nil { return fmt.Errorf("failed to create peers directory: %w", err) } // Convert to slice for JSON - peers := make([]*Peer, 0, len(r.peers)) - for _, peer := range r.peers { + peers := make([]*Peer, 0, len(registry.peers)) + for _, peer := range registry.peers { peers = append(peers, peer) } @@ -646,12 +646,12 @@ func (r *PeerRegistry) saveNow() error { } // Use atomic write pattern: write to temp file, then rename - temporaryPath := r.path + ".tmp" + temporaryPath := registry.path + ".tmp" if err := os.WriteFile(temporaryPath, data, 0644); err != nil { return fmt.Errorf("failed to write peers temp file: %w", err) } - if err := os.Rename(temporaryPath, r.path); err != nil { + if err := os.Rename(temporaryPath, registry.path); err != nil { os.Remove(temporaryPath) // Clean up temp file return fmt.Errorf("failed to rename peers file: %w", err) } @@ -660,41 +660,41 @@ func (r *PeerRegistry) saveNow() error { } // defer registry.Close() // flush pending writes on shutdown -func (r *PeerRegistry) Close() error { - r.saveStopOnce.Do(func() { - close(r.stopChan) +func (registry *PeerRegistry) Close() error { + registry.saveStopOnce.Do(func() { + close(registry.stopChan) }) // Cancel pending timer and save immediately if dirty - r.saveMutex.Lock() - if r.saveTimer != nil { - r.saveTimer.Stop() - r.saveTimer = nil + registry.saveMutex.Lock() + if registry.saveTimer != nil { + registry.saveTimer.Stop() + registry.saveTimer = nil } - shouldSave := r.dirty - r.dirty = false - r.saveMutex.Unlock() + shouldSave := registry.dirty + registry.dirty = false + registry.saveMutex.Unlock() if shouldSave { - r.mutex.RLock() - err := r.saveNow() - r.mutex.RUnlock() + registry.mutex.RLock() + err := registry.saveNow() + registry.mutex.RUnlock() return err } return nil } -// r.save() // schedules a debounced write; errors logged asynchronously -// Must NOT be called with r.mutex held. -func (r *PeerRegistry) save() error { - r.scheduleSave() +// registry.save() // schedules a debounced write; errors logged asynchronously +// Must NOT be called with registry.mutex held. +func (registry *PeerRegistry) save() error { + registry.scheduleSave() return nil // Errors will be logged asynchronously } -// if err := r.load(); err != nil { /* no existing peers, that's ok */ } -func (r *PeerRegistry) load() error { - data, err := os.ReadFile(r.path) +// if err := registry.load(); err != nil { /* no existing peers, that's ok */ } +func (registry *PeerRegistry) load() error { + data, err := os.ReadFile(registry.path) if err != nil { return fmt.Errorf("failed to read peers: %w", err) } @@ -704,9 +704,9 @@ func (r *PeerRegistry) load() error { return fmt.Errorf("failed to unmarshal peers: %w", err) } - r.peers = make(map[string]*Peer) + registry.peers = make(map[string]*Peer) for _, peer := range peers { - r.peers[peer.ID] = peer + registry.peers[peer.ID] = peer } return nil