ax(node): rename PeerRegistry receiver r to registry
AX Principle 1 — predictable names over short names. The receiver `r` on PeerRegistry is a single-letter abbreviation; all 29 methods now use `registry` for consistency with the rest of the node package (controller, worker, transport all use full names). Co-Authored-By: Charon <charon@lethean.io>
This commit is contained in:
parent
ee8b3e6499
commit
311e457e35
1 changed files with 183 additions and 183 deletions
366
pkg/node/peer.go
366
pkg/node/peer.go
|
|
@ -34,7 +34,7 @@ type Peer struct {
|
|||
Connected bool `json:"-"`
|
||||
}
|
||||
|
||||
// r.scheduleSave() // coalesces writes; disk flush at most once per saveDebounceInterval
|
||||
// registry.scheduleSave() // coalesces writes; disk flush at most once per saveDebounceInterval
|
||||
const saveDebounceInterval = 5 * time.Second
|
||||
|
||||
// registry.SetAuthMode(PeerAuthAllowlist) // allow only pre-registered peers
|
||||
|
|
@ -157,50 +157,50 @@ func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) {
|
|||
|
||||
// registry.SetAuthMode(PeerAuthAllowlist) // require pre-registration
|
||||
// registry.SetAuthMode(PeerAuthOpen) // allow any peer (default)
|
||||
func (r *PeerRegistry) SetAuthMode(mode PeerAuthMode) {
|
||||
r.allowedPublicKeyMutex.Lock()
|
||||
defer r.allowedPublicKeyMutex.Unlock()
|
||||
r.authMode = mode
|
||||
func (registry *PeerRegistry) SetAuthMode(mode PeerAuthMode) {
|
||||
registry.allowedPublicKeyMutex.Lock()
|
||||
defer registry.allowedPublicKeyMutex.Unlock()
|
||||
registry.authMode = mode
|
||||
logging.Info("peer auth mode changed", logging.Fields{"mode": mode})
|
||||
}
|
||||
|
||||
// if registry.GetAuthMode() == PeerAuthAllowlist { /* enforce allowlist */ }
|
||||
func (r *PeerRegistry) GetAuthMode() PeerAuthMode {
|
||||
r.allowedPublicKeyMutex.RLock()
|
||||
defer r.allowedPublicKeyMutex.RUnlock()
|
||||
return r.authMode
|
||||
func (registry *PeerRegistry) GetAuthMode() PeerAuthMode {
|
||||
registry.allowedPublicKeyMutex.RLock()
|
||||
defer registry.allowedPublicKeyMutex.RUnlock()
|
||||
return registry.authMode
|
||||
}
|
||||
|
||||
// registry.AllowPublicKey(peer.PublicKey) // permit this key without pre-registration
|
||||
func (r *PeerRegistry) AllowPublicKey(publicKey string) {
|
||||
r.allowedPublicKeyMutex.Lock()
|
||||
defer r.allowedPublicKeyMutex.Unlock()
|
||||
r.allowedPublicKeys[publicKey] = true
|
||||
func (registry *PeerRegistry) AllowPublicKey(publicKey string) {
|
||||
registry.allowedPublicKeyMutex.Lock()
|
||||
defer registry.allowedPublicKeyMutex.Unlock()
|
||||
registry.allowedPublicKeys[publicKey] = true
|
||||
logging.Debug("public key added to allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
|
||||
}
|
||||
|
||||
// registry.RevokePublicKey(peer.PublicKey) // block this key on next connect attempt
|
||||
func (r *PeerRegistry) RevokePublicKey(publicKey string) {
|
||||
r.allowedPublicKeyMutex.Lock()
|
||||
defer r.allowedPublicKeyMutex.Unlock()
|
||||
delete(r.allowedPublicKeys, publicKey)
|
||||
func (registry *PeerRegistry) RevokePublicKey(publicKey string) {
|
||||
registry.allowedPublicKeyMutex.Lock()
|
||||
defer registry.allowedPublicKeyMutex.Unlock()
|
||||
delete(registry.allowedPublicKeys, publicKey)
|
||||
logging.Debug("public key removed from allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
|
||||
}
|
||||
|
||||
// if registry.IsPublicKeyAllowed(peer.PublicKey) { /* permit */ }
|
||||
func (r *PeerRegistry) IsPublicKeyAllowed(publicKey string) bool {
|
||||
r.allowedPublicKeyMutex.RLock()
|
||||
defer r.allowedPublicKeyMutex.RUnlock()
|
||||
return r.allowedPublicKeys[publicKey]
|
||||
func (registry *PeerRegistry) IsPublicKeyAllowed(publicKey string) bool {
|
||||
registry.allowedPublicKeyMutex.RLock()
|
||||
defer registry.allowedPublicKeyMutex.RUnlock()
|
||||
return registry.allowedPublicKeys[publicKey]
|
||||
}
|
||||
|
||||
// if !registry.IsPeerAllowed(payload.Identity.ID, payload.Identity.PublicKey) { conn.Close(); return }
|
||||
// true when Open mode, or Allowlist mode with pre-registered peer or allowlisted key
|
||||
func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool {
|
||||
r.allowedPublicKeyMutex.RLock()
|
||||
authMode := r.authMode
|
||||
keyAllowed := r.allowedPublicKeys[publicKey]
|
||||
r.allowedPublicKeyMutex.RUnlock()
|
||||
func (registry *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool {
|
||||
registry.allowedPublicKeyMutex.RLock()
|
||||
authMode := registry.authMode
|
||||
keyAllowed := registry.allowedPublicKeys[publicKey]
|
||||
registry.allowedPublicKeyMutex.RUnlock()
|
||||
|
||||
// Open mode allows everyone
|
||||
if authMode == PeerAuthOpen {
|
||||
|
|
@ -208,9 +208,9 @@ func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool {
|
|||
}
|
||||
|
||||
// Allowlist mode: check if peer is pre-registered
|
||||
r.mutex.RLock()
|
||||
_, isRegistered := r.peers[peerID]
|
||||
r.mutex.RUnlock()
|
||||
registry.mutex.RLock()
|
||||
_, isRegistered := registry.peers[peerID]
|
||||
registry.mutex.RUnlock()
|
||||
|
||||
if isRegistered {
|
||||
return true
|
||||
|
|
@ -221,12 +221,12 @@ func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool {
|
|||
}
|
||||
|
||||
// keys := registry.ListAllowedPublicKeys() // for display or export
|
||||
func (r *PeerRegistry) ListAllowedPublicKeys() []string {
|
||||
r.allowedPublicKeyMutex.RLock()
|
||||
defer r.allowedPublicKeyMutex.RUnlock()
|
||||
func (registry *PeerRegistry) ListAllowedPublicKeys() []string {
|
||||
registry.allowedPublicKeyMutex.RLock()
|
||||
defer registry.allowedPublicKeyMutex.RUnlock()
|
||||
|
||||
keys := make([]string, 0, len(r.allowedPublicKeys))
|
||||
for key := range r.allowedPublicKeys {
|
||||
keys := make([]string, 0, len(registry.allowedPublicKeys))
|
||||
for key := range registry.allowedPublicKeys {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
return keys
|
||||
|
|
@ -234,22 +234,22 @@ func (r *PeerRegistry) ListAllowedPublicKeys() []string {
|
|||
|
||||
// registry.AddPeer(&Peer{ID: "abc123", Name: "worker-1", Address: "10.0.0.2:9090", Role: RoleWorker})
|
||||
// Persistence is debounced — call Close() before shutdown to flush all pending writes.
|
||||
func (r *PeerRegistry) AddPeer(peer *Peer) error {
|
||||
r.mutex.Lock()
|
||||
func (registry *PeerRegistry) AddPeer(peer *Peer) error {
|
||||
registry.mutex.Lock()
|
||||
|
||||
if peer.ID == "" {
|
||||
r.mutex.Unlock()
|
||||
registry.mutex.Unlock()
|
||||
return fmt.Errorf("peer ID is required")
|
||||
}
|
||||
|
||||
// Validate peer name (P2P-LOW-3)
|
||||
if err := validatePeerName(peer.Name); err != nil {
|
||||
r.mutex.Unlock()
|
||||
registry.mutex.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
if _, exists := r.peers[peer.ID]; exists {
|
||||
r.mutex.Unlock()
|
||||
if _, exists := registry.peers[peer.ID]; exists {
|
||||
registry.mutex.Unlock()
|
||||
return fmt.Errorf("peer %s already exists", peer.ID)
|
||||
}
|
||||
|
||||
|
|
@ -261,54 +261,54 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error {
|
|||
peer.Score = 50 // Default neutral score
|
||||
}
|
||||
|
||||
r.peers[peer.ID] = peer
|
||||
r.rebuildKDTree()
|
||||
r.mutex.Unlock()
|
||||
registry.peers[peer.ID] = peer
|
||||
registry.rebuildKDTree()
|
||||
registry.mutex.Unlock()
|
||||
|
||||
return r.save()
|
||||
return registry.save()
|
||||
}
|
||||
|
||||
// registry.UpdatePeer(peer) // after handshake completes and real ID is known
|
||||
// Note: Persistence is debounced. Call Close() to flush before shutdown.
|
||||
func (r *PeerRegistry) UpdatePeer(peer *Peer) error {
|
||||
r.mutex.Lock()
|
||||
func (registry *PeerRegistry) UpdatePeer(peer *Peer) error {
|
||||
registry.mutex.Lock()
|
||||
|
||||
if _, exists := r.peers[peer.ID]; !exists {
|
||||
r.mutex.Unlock()
|
||||
if _, exists := registry.peers[peer.ID]; !exists {
|
||||
registry.mutex.Unlock()
|
||||
return fmt.Errorf("peer %s not found", peer.ID)
|
||||
}
|
||||
|
||||
r.peers[peer.ID] = peer
|
||||
r.rebuildKDTree()
|
||||
r.mutex.Unlock()
|
||||
registry.peers[peer.ID] = peer
|
||||
registry.rebuildKDTree()
|
||||
registry.mutex.Unlock()
|
||||
|
||||
return r.save()
|
||||
return registry.save()
|
||||
}
|
||||
|
||||
// registry.RemovePeer(peer.ID) // on manual disconnect or ban
|
||||
// Note: Persistence is debounced. Call Close() to flush before shutdown.
|
||||
func (r *PeerRegistry) RemovePeer(id string) error {
|
||||
r.mutex.Lock()
|
||||
func (registry *PeerRegistry) RemovePeer(id string) error {
|
||||
registry.mutex.Lock()
|
||||
|
||||
if _, exists := r.peers[id]; !exists {
|
||||
r.mutex.Unlock()
|
||||
if _, exists := registry.peers[id]; !exists {
|
||||
registry.mutex.Unlock()
|
||||
return fmt.Errorf("peer %s not found", id)
|
||||
}
|
||||
|
||||
delete(r.peers, id)
|
||||
r.rebuildKDTree()
|
||||
r.mutex.Unlock()
|
||||
delete(registry.peers, id)
|
||||
registry.rebuildKDTree()
|
||||
registry.mutex.Unlock()
|
||||
|
||||
return r.save()
|
||||
return registry.save()
|
||||
}
|
||||
|
||||
// peer := registry.GetPeer("abc123def456")
|
||||
// if peer == nil { return fmt.Errorf("peer not found") }
|
||||
func (r *PeerRegistry) GetPeer(id string) *Peer {
|
||||
r.mutex.RLock()
|
||||
defer r.mutex.RUnlock()
|
||||
func (registry *PeerRegistry) GetPeer(id string) *Peer {
|
||||
registry.mutex.RLock()
|
||||
defer registry.mutex.RUnlock()
|
||||
|
||||
peer, exists := r.peers[id]
|
||||
peer, exists := registry.peers[id]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -319,12 +319,12 @@ func (r *PeerRegistry) GetPeer(id string) *Peer {
|
|||
}
|
||||
|
||||
// for _, peer := range registry.ListPeers() { log(peer.ID, peer.Role) }
|
||||
func (r *PeerRegistry) ListPeers() []*Peer {
|
||||
r.mutex.RLock()
|
||||
defer r.mutex.RUnlock()
|
||||
func (registry *PeerRegistry) ListPeers() []*Peer {
|
||||
registry.mutex.RLock()
|
||||
defer registry.mutex.RUnlock()
|
||||
|
||||
peers := make([]*Peer, 0, len(r.peers))
|
||||
for _, peer := range r.peers {
|
||||
peers := make([]*Peer, 0, len(registry.peers))
|
||||
for _, peer := range registry.peers {
|
||||
peerCopy := *peer
|
||||
peers = append(peers, &peerCopy)
|
||||
}
|
||||
|
|
@ -333,12 +333,12 @@ func (r *PeerRegistry) ListPeers() []*Peer {
|
|||
|
||||
// registry.UpdateMetrics(peer.ID, rtt, peer.GeoKM, peer.Hops) // after PingPeer
|
||||
// Note: Persistence is debounced. Call Close() to flush before shutdown.
|
||||
func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int) error {
|
||||
r.mutex.Lock()
|
||||
func (registry *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int) error {
|
||||
registry.mutex.Lock()
|
||||
|
||||
peer, exists := r.peers[id]
|
||||
peer, exists := registry.peers[id]
|
||||
if !exists {
|
||||
r.mutex.Unlock()
|
||||
registry.mutex.Unlock()
|
||||
return fmt.Errorf("peer %s not found", id)
|
||||
}
|
||||
|
||||
|
|
@ -347,20 +347,20 @@ func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int)
|
|||
peer.Hops = hops
|
||||
peer.LastSeen = time.Now()
|
||||
|
||||
r.rebuildKDTree()
|
||||
r.mutex.Unlock()
|
||||
registry.rebuildKDTree()
|
||||
registry.mutex.Unlock()
|
||||
|
||||
return r.save()
|
||||
return registry.save()
|
||||
}
|
||||
|
||||
// registry.UpdateScore(peer.ID, 75.0) // clamps to 0-100
|
||||
// Note: Persistence is debounced. Call Close() to flush before shutdown.
|
||||
func (r *PeerRegistry) UpdateScore(id string, score float64) error {
|
||||
r.mutex.Lock()
|
||||
func (registry *PeerRegistry) UpdateScore(id string, score float64) error {
|
||||
registry.mutex.Lock()
|
||||
|
||||
peer, exists := r.peers[id]
|
||||
peer, exists := registry.peers[id]
|
||||
if !exists {
|
||||
r.mutex.Unlock()
|
||||
registry.mutex.Unlock()
|
||||
return fmt.Errorf("peer %s not found", id)
|
||||
}
|
||||
|
||||
|
|
@ -372,19 +372,19 @@ func (r *PeerRegistry) UpdateScore(id string, score float64) error {
|
|||
}
|
||||
|
||||
peer.Score = score
|
||||
r.rebuildKDTree()
|
||||
r.mutex.Unlock()
|
||||
registry.rebuildKDTree()
|
||||
registry.mutex.Unlock()
|
||||
|
||||
return r.save()
|
||||
return registry.save()
|
||||
}
|
||||
|
||||
// registry.SetConnected(peer.ID, true) // on connect
|
||||
// registry.SetConnected(peer.ID, false) // on disconnect or error
|
||||
func (r *PeerRegistry) SetConnected(id string, connected bool) {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
func (registry *PeerRegistry) SetConnected(id string, connected bool) {
|
||||
registry.mutex.Lock()
|
||||
defer registry.mutex.Unlock()
|
||||
|
||||
if peer, exists := r.peers[id]; exists {
|
||||
if peer, exists := registry.peers[id]; exists {
|
||||
peer.Connected = connected
|
||||
if connected {
|
||||
peer.LastSeen = time.Now()
|
||||
|
|
@ -403,33 +403,33 @@ const (
|
|||
)
|
||||
|
||||
// registry.RecordSuccess(peer.ID) // after a successful response
|
||||
func (r *PeerRegistry) RecordSuccess(id string) {
|
||||
r.mutex.Lock()
|
||||
peer, exists := r.peers[id]
|
||||
func (registry *PeerRegistry) RecordSuccess(id string) {
|
||||
registry.mutex.Lock()
|
||||
peer, exists := registry.peers[id]
|
||||
if !exists {
|
||||
r.mutex.Unlock()
|
||||
registry.mutex.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
peer.Score = min(peer.Score+ScoreSuccessIncrement, ScoreMaximum)
|
||||
peer.LastSeen = time.Now()
|
||||
r.mutex.Unlock()
|
||||
r.save()
|
||||
registry.mutex.Unlock()
|
||||
registry.save()
|
||||
}
|
||||
|
||||
// registry.RecordFailure(peer.ID) // after a failed send or error response
|
||||
func (r *PeerRegistry) RecordFailure(id string) {
|
||||
r.mutex.Lock()
|
||||
peer, exists := r.peers[id]
|
||||
func (registry *PeerRegistry) RecordFailure(id string) {
|
||||
registry.mutex.Lock()
|
||||
peer, exists := registry.peers[id]
|
||||
if !exists {
|
||||
r.mutex.Unlock()
|
||||
registry.mutex.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
peer.Score = max(peer.Score-ScoreFailureDecrement, ScoreMinimum)
|
||||
newScore := peer.Score
|
||||
r.mutex.Unlock()
|
||||
r.save()
|
||||
registry.mutex.Unlock()
|
||||
registry.save()
|
||||
|
||||
logging.Debug("peer score decreased", logging.Fields{
|
||||
"peer_id": id,
|
||||
|
|
@ -439,18 +439,18 @@ func (r *PeerRegistry) RecordFailure(id string) {
|
|||
}
|
||||
|
||||
// registry.RecordTimeout(peer.ID) // after a request deadline exceeded
|
||||
func (r *PeerRegistry) RecordTimeout(id string) {
|
||||
r.mutex.Lock()
|
||||
peer, exists := r.peers[id]
|
||||
func (registry *PeerRegistry) RecordTimeout(id string) {
|
||||
registry.mutex.Lock()
|
||||
peer, exists := registry.peers[id]
|
||||
if !exists {
|
||||
r.mutex.Unlock()
|
||||
registry.mutex.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
peer.Score = max(peer.Score-ScoreTimeoutDecrement, ScoreMinimum)
|
||||
newScore := peer.Score
|
||||
r.mutex.Unlock()
|
||||
r.save()
|
||||
registry.mutex.Unlock()
|
||||
registry.save()
|
||||
|
||||
logging.Debug("peer score decreased", logging.Fields{
|
||||
"peer_id": id,
|
||||
|
|
@ -460,12 +460,12 @@ func (r *PeerRegistry) RecordTimeout(id string) {
|
|||
}
|
||||
|
||||
// for _, peer := range registry.GetPeersByScore() { log(peer.ID, peer.Score) }
|
||||
func (r *PeerRegistry) GetPeersByScore() []*Peer {
|
||||
r.mutex.RLock()
|
||||
defer r.mutex.RUnlock()
|
||||
func (registry *PeerRegistry) GetPeersByScore() []*Peer {
|
||||
registry.mutex.RLock()
|
||||
defer registry.mutex.RUnlock()
|
||||
|
||||
peers := make([]*Peer, 0, len(r.peers))
|
||||
for _, peer := range r.peers {
|
||||
peers := make([]*Peer, 0, len(registry.peers))
|
||||
for _, peer := range registry.peers {
|
||||
peers = append(peers, peer)
|
||||
}
|
||||
|
||||
|
|
@ -483,11 +483,11 @@ func (r *PeerRegistry) GetPeersByScore() []*Peer {
|
|||
|
||||
// peer := registry.SelectOptimalPeer()
|
||||
// if peer != nil { ctrl.ConnectToPeer(peer.ID) }
|
||||
func (r *PeerRegistry) SelectOptimalPeer() *Peer {
|
||||
r.mutex.RLock()
|
||||
defer r.mutex.RUnlock()
|
||||
func (registry *PeerRegistry) SelectOptimalPeer() *Peer {
|
||||
registry.mutex.RLock()
|
||||
defer registry.mutex.RUnlock()
|
||||
|
||||
if r.kdTree == nil || len(r.peers) == 0 {
|
||||
if registry.kdTree == nil || len(registry.peers) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -495,12 +495,12 @@ func (r *PeerRegistry) SelectOptimalPeer() *Peer {
|
|||
// Score is inverted (100 - score) so lower is better in the tree
|
||||
target := []float64{0, 0, 0, 0}
|
||||
|
||||
result, _, found := r.kdTree.Nearest(target)
|
||||
result, _, found := registry.kdTree.Nearest(target)
|
||||
if !found {
|
||||
return nil
|
||||
}
|
||||
|
||||
peer, exists := r.peers[result.Value]
|
||||
peer, exists := registry.peers[result.Value]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -511,22 +511,22 @@ func (r *PeerRegistry) SelectOptimalPeer() *Peer {
|
|||
|
||||
// peers := registry.SelectNearestPeers(3) // top 3 peers by ping, hops, geo, score
|
||||
// for _, peer := range peers { ctrl.ConnectToPeer(peer.ID) }
|
||||
func (r *PeerRegistry) SelectNearestPeers(count int) []*Peer {
|
||||
r.mutex.RLock()
|
||||
defer r.mutex.RUnlock()
|
||||
func (registry *PeerRegistry) SelectNearestPeers(count int) []*Peer {
|
||||
registry.mutex.RLock()
|
||||
defer registry.mutex.RUnlock()
|
||||
|
||||
if r.kdTree == nil || len(r.peers) == 0 {
|
||||
if registry.kdTree == nil || len(registry.peers) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Target: ideal peer
|
||||
target := []float64{0, 0, 0, 0}
|
||||
|
||||
results, _ := r.kdTree.KNearest(target, count)
|
||||
results, _ := registry.kdTree.KNearest(target, count)
|
||||
|
||||
peers := make([]*Peer, 0, len(results))
|
||||
for _, result := range results {
|
||||
if peer, exists := r.peers[result.Value]; exists {
|
||||
if peer, exists := registry.peers[result.Value]; exists {
|
||||
peerCopy := *peer
|
||||
peers = append(peers, &peerCopy)
|
||||
}
|
||||
|
|
@ -536,12 +536,12 @@ func (r *PeerRegistry) SelectNearestPeers(count int) []*Peer {
|
|||
}
|
||||
|
||||
// for _, peer := range registry.GetConnectedPeers() { ctrl.GetRemoteStats(peer.ID) }
|
||||
func (r *PeerRegistry) GetConnectedPeers() []*Peer {
|
||||
r.mutex.RLock()
|
||||
defer r.mutex.RUnlock()
|
||||
func (registry *PeerRegistry) GetConnectedPeers() []*Peer {
|
||||
registry.mutex.RLock()
|
||||
defer registry.mutex.RUnlock()
|
||||
|
||||
peers := make([]*Peer, 0)
|
||||
for _, peer := range r.peers {
|
||||
for _, peer := range registry.peers {
|
||||
if peer.Connected {
|
||||
peerCopy := *peer
|
||||
peers = append(peers, &peerCopy)
|
||||
|
|
@ -551,21 +551,21 @@ func (r *PeerRegistry) GetConnectedPeers() []*Peer {
|
|||
}
|
||||
|
||||
// if registry.Count() == 0 { return ErrNoPeers }
|
||||
func (r *PeerRegistry) Count() int {
|
||||
r.mutex.RLock()
|
||||
defer r.mutex.RUnlock()
|
||||
return len(r.peers)
|
||||
func (registry *PeerRegistry) Count() int {
|
||||
registry.mutex.RLock()
|
||||
defer registry.mutex.RUnlock()
|
||||
return len(registry.peers)
|
||||
}
|
||||
|
||||
// r.rebuildKDTree() // called after AddPeer, UpdatePeer, RemovePeer, UpdateMetrics — lock must be held
|
||||
func (r *PeerRegistry) rebuildKDTree() {
|
||||
if len(r.peers) == 0 {
|
||||
r.kdTree = nil
|
||||
// registry.rebuildKDTree() // called after AddPeer, UpdatePeer, RemovePeer, UpdateMetrics — lock must be held
|
||||
func (registry *PeerRegistry) rebuildKDTree() {
|
||||
if len(registry.peers) == 0 {
|
||||
registry.kdTree = nil
|
||||
return
|
||||
}
|
||||
|
||||
points := make([]poindexter.KDPoint[string], 0, len(r.peers))
|
||||
for _, peer := range r.peers {
|
||||
points := make([]poindexter.KDPoint[string], 0, len(registry.peers))
|
||||
for _, peer := range registry.peers {
|
||||
// Build 4D point with weighted, normalized values
|
||||
// Invert score so that higher score = lower value (better)
|
||||
point := poindexter.KDPoint[string]{
|
||||
|
|
@ -588,35 +588,35 @@ func (r *PeerRegistry) rebuildKDTree() {
|
|||
return
|
||||
}
|
||||
|
||||
r.kdTree = tree
|
||||
registry.kdTree = tree
|
||||
}
|
||||
|
||||
// r.scheduleSave() // called by AddPeer, UpdatePeer, RemovePeer, UpdateMetrics, UpdateScore
|
||||
// registry.scheduleSave() // called by AddPeer, UpdatePeer, RemovePeer, UpdateMetrics, UpdateScore
|
||||
// Multiple writes within saveDebounceInterval are coalesced into one disk write.
|
||||
// Must NOT be called with r.mutex held.
|
||||
func (r *PeerRegistry) scheduleSave() {
|
||||
r.saveMutex.Lock()
|
||||
defer r.saveMutex.Unlock()
|
||||
// Must NOT be called with registry.mutex held.
|
||||
func (registry *PeerRegistry) scheduleSave() {
|
||||
registry.saveMutex.Lock()
|
||||
defer registry.saveMutex.Unlock()
|
||||
|
||||
r.dirty = true
|
||||
registry.dirty = true
|
||||
|
||||
// If timer already running, let it handle the save
|
||||
if r.saveTimer != nil {
|
||||
if registry.saveTimer != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Start a new timer
|
||||
r.saveTimer = time.AfterFunc(saveDebounceInterval, func() {
|
||||
r.saveMutex.Lock()
|
||||
r.saveTimer = nil
|
||||
shouldSave := r.dirty
|
||||
r.dirty = false
|
||||
r.saveMutex.Unlock()
|
||||
registry.saveTimer = time.AfterFunc(saveDebounceInterval, func() {
|
||||
registry.saveMutex.Lock()
|
||||
registry.saveTimer = nil
|
||||
shouldSave := registry.dirty
|
||||
registry.dirty = false
|
||||
registry.saveMutex.Unlock()
|
||||
|
||||
if shouldSave {
|
||||
r.mutex.RLock()
|
||||
err := r.saveNow()
|
||||
r.mutex.RUnlock()
|
||||
registry.mutex.RLock()
|
||||
err := registry.saveNow()
|
||||
registry.mutex.RUnlock()
|
||||
if err != nil {
|
||||
// Log error but continue - best effort persistence
|
||||
logging.Warn("failed to save peer registry", logging.Fields{"error": err})
|
||||
|
|
@ -625,18 +625,18 @@ func (r *PeerRegistry) scheduleSave() {
|
|||
})
|
||||
}
|
||||
|
||||
// r.mutex.RLock(); err := r.saveNow(); r.mutex.RUnlock()
|
||||
// Must be called with r.mutex held (at least RLock).
|
||||
func (r *PeerRegistry) saveNow() error {
|
||||
// registry.mutex.RLock(); err := registry.saveNow(); registry.mutex.RUnlock()
|
||||
// Must be called with registry.mutex held (at least RLock).
|
||||
func (registry *PeerRegistry) saveNow() error {
|
||||
// Ensure directory exists
|
||||
directoryPath := filepath.Dir(r.path)
|
||||
directoryPath := filepath.Dir(registry.path)
|
||||
if err := os.MkdirAll(directoryPath, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create peers directory: %w", err)
|
||||
}
|
||||
|
||||
// Convert to slice for JSON
|
||||
peers := make([]*Peer, 0, len(r.peers))
|
||||
for _, peer := range r.peers {
|
||||
peers := make([]*Peer, 0, len(registry.peers))
|
||||
for _, peer := range registry.peers {
|
||||
peers = append(peers, peer)
|
||||
}
|
||||
|
||||
|
|
@ -646,12 +646,12 @@ func (r *PeerRegistry) saveNow() error {
|
|||
}
|
||||
|
||||
// Use atomic write pattern: write to temp file, then rename
|
||||
temporaryPath := r.path + ".tmp"
|
||||
temporaryPath := registry.path + ".tmp"
|
||||
if err := os.WriteFile(temporaryPath, data, 0644); err != nil {
|
||||
return fmt.Errorf("failed to write peers temp file: %w", err)
|
||||
}
|
||||
|
||||
if err := os.Rename(temporaryPath, r.path); err != nil {
|
||||
if err := os.Rename(temporaryPath, registry.path); err != nil {
|
||||
os.Remove(temporaryPath) // Clean up temp file
|
||||
return fmt.Errorf("failed to rename peers file: %w", err)
|
||||
}
|
||||
|
|
@ -660,41 +660,41 @@ func (r *PeerRegistry) saveNow() error {
|
|||
}
|
||||
|
||||
// defer registry.Close() // flush pending writes on shutdown
|
||||
func (r *PeerRegistry) Close() error {
|
||||
r.saveStopOnce.Do(func() {
|
||||
close(r.stopChan)
|
||||
func (registry *PeerRegistry) Close() error {
|
||||
registry.saveStopOnce.Do(func() {
|
||||
close(registry.stopChan)
|
||||
})
|
||||
|
||||
// Cancel pending timer and save immediately if dirty
|
||||
r.saveMutex.Lock()
|
||||
if r.saveTimer != nil {
|
||||
r.saveTimer.Stop()
|
||||
r.saveTimer = nil
|
||||
registry.saveMutex.Lock()
|
||||
if registry.saveTimer != nil {
|
||||
registry.saveTimer.Stop()
|
||||
registry.saveTimer = nil
|
||||
}
|
||||
shouldSave := r.dirty
|
||||
r.dirty = false
|
||||
r.saveMutex.Unlock()
|
||||
shouldSave := registry.dirty
|
||||
registry.dirty = false
|
||||
registry.saveMutex.Unlock()
|
||||
|
||||
if shouldSave {
|
||||
r.mutex.RLock()
|
||||
err := r.saveNow()
|
||||
r.mutex.RUnlock()
|
||||
registry.mutex.RLock()
|
||||
err := registry.saveNow()
|
||||
registry.mutex.RUnlock()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// r.save() // schedules a debounced write; errors logged asynchronously
|
||||
// Must NOT be called with r.mutex held.
|
||||
func (r *PeerRegistry) save() error {
|
||||
r.scheduleSave()
|
||||
// registry.save() // schedules a debounced write; errors logged asynchronously
|
||||
// Must NOT be called with registry.mutex held.
|
||||
func (registry *PeerRegistry) save() error {
|
||||
registry.scheduleSave()
|
||||
return nil // Errors will be logged asynchronously
|
||||
}
|
||||
|
||||
// if err := r.load(); err != nil { /* no existing peers, that's ok */ }
|
||||
func (r *PeerRegistry) load() error {
|
||||
data, err := os.ReadFile(r.path)
|
||||
// if err := registry.load(); err != nil { /* no existing peers, that's ok */ }
|
||||
func (registry *PeerRegistry) load() error {
|
||||
data, err := os.ReadFile(registry.path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read peers: %w", err)
|
||||
}
|
||||
|
|
@ -704,9 +704,9 @@ func (r *PeerRegistry) load() error {
|
|||
return fmt.Errorf("failed to unmarshal peers: %w", err)
|
||||
}
|
||||
|
||||
r.peers = make(map[string]*Peer)
|
||||
registry.peers = make(map[string]*Peer)
|
||||
for _, peer := range peers {
|
||||
r.peers[peer.ID] = peer
|
||||
registry.peers[peer.ID] = peer
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue