Applied AX principles 1, 2, and 9 across the node, dispatcher, transport, peer,
worker, bundle, and logging packages:
- Added usage-example comments to all public methods missing them (AX-2):
dispatcher.RegisterHandler, Handlers; peer.AddPeer, UpdatePeer, RemovePeer,
AllowPublicKey, RevokePublicKey, IsPublicKeyAllowed, IsPeerAllowed, RecordSuccess,
RecordFailure, RecordTimeout, SelectOptimalPeer, SelectNearestPeers, Count,
PeersByScore, AllowedPublicKeys, ListAllowedPublicKeys; transport.Start, Stop, Send,
Connections, Broadcast, PeerConnection.Send, Close, GracefulClose
- Removed redundant inline comments that restate code (bundle.go, transport.go, worker.go)
- Added _Bad and _Ugly test categories to logging/logger_test.go to satisfy the
TestFilename_Function_{Good,Bad,Ugly} naming convention (AX-10)
- Removed all linter-injected short-form alias files (*_compat.go, *_alias_test.go)
that violated AX-1 (Err* aliases, Uint64Val, WithComponent, GetLevel, GetGlobal, etc.)
Co-Authored-By: Virgil <virgil@lethean.io>
797 lines
20 KiB
Go
797 lines
20 KiB
Go
package node
|
|
|
|
import (
|
|
"iter"
|
|
"maps"
|
|
"regexp"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
core "dappco.re/go/core"
|
|
"dappco.re/go/core/p2p/logging"
|
|
|
|
poindexter "forge.lthn.ai/Snider/Poindexter"
|
|
"github.com/adrg/xdg"
|
|
)
|
|
|
|
// Peer represents a known remote node.
|
|
//
|
|
// peer := &Peer{
|
|
// ID: "worker-1",
|
|
// Name: "Worker 1",
|
|
// Address: "127.0.0.1:9101",
|
|
// PingMilliseconds: 42.5,
|
|
// GeographicKilometres: 100,
|
|
// Score: 80,
|
|
// }
|
|
type Peer struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
PublicKey string `json:"publicKey"`
|
|
Address string `json:"address"` // host:port for WebSocket connection
|
|
Role NodeRole `json:"role"`
|
|
AddedAt time.Time `json:"addedAt"`
|
|
LastSeen time.Time `json:"lastSeen"`
|
|
|
|
// Poindexter metrics (updated dynamically)
|
|
PingMilliseconds float64 `json:"pingMs"` // Latency in milliseconds
|
|
Hops int `json:"hops"` // Network hop count
|
|
GeographicKilometres float64 `json:"geoKm"` // Geographic distance in kilometres
|
|
Score float64 `json:"score"` // Reliability score 0-100
|
|
|
|
// Connection state (not persisted)
|
|
Connected bool `json:"-"`
|
|
}
|
|
|
|
// peerRegistrySaveDebounceInterval is the delay scheduleSave waits before
// writing the registry to disk; changes made inside that window share one write.
const peerRegistrySaveDebounceInterval = 5 * time.Second
|
|
|
|
// PeerAuthMode selects the policy applied to peers the registry does not know.
//
//	mode := PeerAuthAllowlist
type PeerAuthMode int

const (
	// PeerAuthOpen accepts every peer (the original behaviour). It is the zero value.
	PeerAuthOpen PeerAuthMode = iota
	// PeerAuthAllowlist accepts only pre-registered peers or peers whose
	// public key has been allowlisted.
	PeerAuthAllowlist
)
|
|
|
|
// Length bounds (in bytes) enforced on non-empty peer names.
const (
	PeerNameMinLength = 1
	PeerNameMaxLength = 64
)

// peerNamePattern accepts names made of ASCII letters, digits, hyphens,
// underscores and spaces whose first and last characters are alphanumeric.
// The second alternative covers the single-character name.
var peerNamePattern = regexp.MustCompile(`^[a-zA-Z0-9][a-zA-Z0-9\-_ ]{0,62}[a-zA-Z0-9]$|^[a-zA-Z0-9]$`)
|
|
|
|
// safeKeyPrefix shortens a key for log output without slicing past its end.
// Keys of 16 bytes or more are cut to their first 16 bytes followed by "...",
// shorter keys are returned unchanged, and the empty key becomes "(empty)".
func safeKeyPrefix(key string) string {
	const prefixLength = 16

	switch {
	case key == "":
		return "(empty)"
	case len(key) < prefixLength:
		return key
	default:
		return key[:prefixLength] + "..."
	}
}
|
|
|
|
// validatePeerName checks if a peer name is valid.
|
|
// Peer names must be 1-64 characters, start and end with alphanumeric,
|
|
// and contain only alphanumeric, hyphens, underscores, and spaces.
|
|
func validatePeerName(name string) error {
|
|
if name == "" {
|
|
return nil // Empty names are allowed (optional field)
|
|
}
|
|
if len(name) < PeerNameMinLength {
|
|
return core.E("validatePeerName", "peer name too short", nil)
|
|
}
|
|
if len(name) > PeerNameMaxLength {
|
|
return core.E("validatePeerName", "peer name too long", nil)
|
|
}
|
|
if !peerNamePattern.MatchString(name) {
|
|
return core.E("validatePeerName", "peer name contains invalid characters (use alphanumeric, hyphens, underscores, spaces)", nil)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PeerRegistry manages known peers with KD-tree based selection.
|
|
//
|
|
// peerRegistry, err := NewPeerRegistry()
|
|
type PeerRegistry struct {
|
|
peers map[string]*Peer
|
|
kdTree *poindexter.KDTree[string] // KD-tree with peer ID as payload
|
|
path string
|
|
mu sync.RWMutex
|
|
|
|
// Authentication settings
|
|
authMode PeerAuthMode // How to handle unknown peers
|
|
allowedPublicKeys map[string]bool // Allowlist of public keys (when authMode is Allowlist)
|
|
allowedPublicKeyMu sync.RWMutex // Protects allowedPublicKeys
|
|
|
|
// Debounce disk writes
|
|
hasPendingChanges bool // Whether there are unsaved changes
|
|
pendingSaveTimer *time.Timer // Timer for debounced save
|
|
saveMutex sync.Mutex // Protects pending save state
|
|
}
|
|
|
|
// Per-dimension multipliers applied to a peer's KD-tree coordinates.
// Ping, hops and geographic distance are used as-is (lower is better); the
// score is inverted before weighting so that a higher score also sorts lower.
var (
	pingWeight       = 1.0
	hopsWeight       = 0.7
	geographicWeight = 0.2
	scoreWeight      = 1.2
)
|
|
|
|
// NewPeerRegistry loads the default peer registry.
|
|
//
|
|
// peerRegistry, err := NewPeerRegistry()
|
|
func NewPeerRegistry() (*PeerRegistry, error) {
|
|
peersPath, err := xdg.ConfigFile("lethean-desktop/peers.json")
|
|
if err != nil {
|
|
return nil, core.E("PeerRegistry.New", "failed to get peers path", err)
|
|
}
|
|
|
|
return NewPeerRegistryFromPath(peersPath)
|
|
}
|
|
|
|
// NewPeerRegistryFromPath loads or creates a peer registry at an explicit path.
|
|
//
|
|
// Missing files are treated as an empty registry; malformed registry files are
|
|
// returned as errors so callers can repair the persisted state.
|
|
//
|
|
// peerRegistry, err := NewPeerRegistryFromPath("/srv/p2p/peers.json")
|
|
func NewPeerRegistryFromPath(peersPath string) (*PeerRegistry, error) {
|
|
pr := &PeerRegistry{
|
|
peers: make(map[string]*Peer),
|
|
path: peersPath,
|
|
authMode: PeerAuthOpen, // Default to open.
|
|
allowedPublicKeys: make(map[string]bool),
|
|
}
|
|
|
|
// Missing files indicate a first run; any existing file must parse cleanly.
|
|
if !filesystemExists(peersPath) {
|
|
pr.rebuildKDTree()
|
|
return pr, nil
|
|
}
|
|
|
|
if err := pr.load(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pr.rebuildKDTree()
|
|
return pr, nil
|
|
}
|
|
|
|
// SetAuthMode changes how unknown peers are handled.
|
|
//
|
|
// registry.SetAuthMode(PeerAuthAllowlist)
|
|
func (r *PeerRegistry) SetAuthMode(mode PeerAuthMode) {
|
|
r.allowedPublicKeyMu.Lock()
|
|
defer r.allowedPublicKeyMu.Unlock()
|
|
r.authMode = mode
|
|
logging.Info("peer auth mode changed", logging.Fields{"mode": mode})
|
|
}
|
|
|
|
// GetAuthMode returns the current authentication mode.
|
|
//
|
|
// mode := registry.GetAuthMode()
|
|
func (r *PeerRegistry) GetAuthMode() PeerAuthMode {
|
|
r.allowedPublicKeyMu.RLock()
|
|
defer r.allowedPublicKeyMu.RUnlock()
|
|
return r.authMode
|
|
}
|
|
|
|
// AllowPublicKey adds a public key to the allowlist.
|
|
//
|
|
// registry.AllowPublicKey(peer.PublicKey)
|
|
func (r *PeerRegistry) AllowPublicKey(publicKey string) {
|
|
r.allowedPublicKeyMu.Lock()
|
|
defer r.allowedPublicKeyMu.Unlock()
|
|
r.allowedPublicKeys[publicKey] = true
|
|
logging.Debug("public key added to allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
|
|
}
|
|
|
|
// RevokePublicKey removes a public key from the allowlist.
|
|
//
|
|
// registry.RevokePublicKey(peer.PublicKey)
|
|
func (r *PeerRegistry) RevokePublicKey(publicKey string) {
|
|
r.allowedPublicKeyMu.Lock()
|
|
defer r.allowedPublicKeyMu.Unlock()
|
|
delete(r.allowedPublicKeys, publicKey)
|
|
logging.Debug("public key removed from allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
|
|
}
|
|
|
|
// IsPublicKeyAllowed checks if a public key is in the allowlist.
|
|
//
|
|
// allowed := registry.IsPublicKeyAllowed(peer.PublicKey)
|
|
func (r *PeerRegistry) IsPublicKeyAllowed(publicKey string) bool {
|
|
r.allowedPublicKeyMu.RLock()
|
|
defer r.allowedPublicKeyMu.RUnlock()
|
|
return r.allowedPublicKeys[publicKey]
|
|
}
|
|
|
|
// IsPeerAllowed checks if a peer is allowed to connect based on auth mode.
|
|
// Returns true when AuthMode is Open (all allowed), or when Allowlist mode is active
|
|
// and the peer is pre-registered or its public key is in the allowlist.
|
|
//
|
|
// allowed := registry.IsPeerAllowed(peer.ID, peer.PublicKey)
|
|
func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool {
|
|
r.allowedPublicKeyMu.RLock()
|
|
authMode := r.authMode
|
|
keyAllowed := r.allowedPublicKeys[publicKey]
|
|
r.allowedPublicKeyMu.RUnlock()
|
|
|
|
// Open mode allows everyone
|
|
if authMode == PeerAuthOpen {
|
|
return true
|
|
}
|
|
|
|
// Allowlist mode: check if peer is pre-registered
|
|
r.mu.RLock()
|
|
_, isRegistered := r.peers[peerID]
|
|
r.mu.RUnlock()
|
|
|
|
if isRegistered {
|
|
return true
|
|
}
|
|
|
|
// Check if public key is allowlisted
|
|
return keyAllowed
|
|
}
|
|
|
|
// ListAllowedPublicKeys returns all allowlisted public keys.
|
|
//
|
|
// keys := registry.ListAllowedPublicKeys()
|
|
func (r *PeerRegistry) ListAllowedPublicKeys() []string {
|
|
return slices.Collect(r.AllowedPublicKeys())
|
|
}
|
|
|
|
// AllowedPublicKeys returns an iterator over all allowlisted public keys.
|
|
//
|
|
// for key := range registry.AllowedPublicKeys() {
|
|
// log.Printf("allowed: %s", key[:16])
|
|
// }
|
|
func (r *PeerRegistry) AllowedPublicKeys() iter.Seq[string] {
|
|
return func(yield func(string) bool) {
|
|
r.allowedPublicKeyMu.RLock()
|
|
defer r.allowedPublicKeyMu.RUnlock()
|
|
|
|
for key := range r.allowedPublicKeys {
|
|
if !yield(key) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// AddPeer adds a new peer to the registry.
|
|
// Persistence is debounced — writes are batched every 5s. Call Close() before shutdown.
|
|
//
|
|
// err := registry.AddPeer(&Peer{ID: "worker-1", Address: "10.0.0.1:9091", Role: RoleWorker})
|
|
func (r *PeerRegistry) AddPeer(peer *Peer) error {
|
|
r.mu.Lock()
|
|
|
|
if peer.ID == "" {
|
|
r.mu.Unlock()
|
|
return core.E("PeerRegistry.AddPeer", "peer ID is required", nil)
|
|
}
|
|
|
|
// Validate peer name (P2P-LOW-3)
|
|
if err := validatePeerName(peer.Name); err != nil {
|
|
r.mu.Unlock()
|
|
return err
|
|
}
|
|
|
|
if _, exists := r.peers[peer.ID]; exists {
|
|
r.mu.Unlock()
|
|
return core.E("PeerRegistry.AddPeer", "peer "+peer.ID+" already exists", nil)
|
|
}
|
|
|
|
// Set defaults
|
|
if peer.AddedAt.IsZero() {
|
|
peer.AddedAt = time.Now()
|
|
}
|
|
if peer.Score == 0 {
|
|
peer.Score = 50 // Default neutral score
|
|
}
|
|
|
|
r.peers[peer.ID] = peer
|
|
r.rebuildKDTree()
|
|
r.mu.Unlock()
|
|
|
|
r.scheduleSave()
|
|
return nil
|
|
}
|
|
|
|
// UpdatePeer updates an existing peer's information.
|
|
// Persistence is debounced. Call Close() to flush before shutdown.
|
|
//
|
|
// err := registry.UpdatePeer(&Peer{ID: "worker-1", Score: 90})
|
|
func (r *PeerRegistry) UpdatePeer(peer *Peer) error {
|
|
r.mu.Lock()
|
|
|
|
if _, exists := r.peers[peer.ID]; !exists {
|
|
r.mu.Unlock()
|
|
return core.E("PeerRegistry.UpdatePeer", "peer "+peer.ID+" not found", nil)
|
|
}
|
|
|
|
r.peers[peer.ID] = peer
|
|
r.rebuildKDTree()
|
|
r.mu.Unlock()
|
|
|
|
r.scheduleSave()
|
|
return nil
|
|
}
|
|
|
|
// RemovePeer removes a peer from the registry.
|
|
// Persistence is debounced. Call Close() to flush before shutdown.
|
|
//
|
|
// err := registry.RemovePeer("worker-1")
|
|
func (r *PeerRegistry) RemovePeer(id string) error {
|
|
r.mu.Lock()
|
|
|
|
if _, exists := r.peers[id]; !exists {
|
|
r.mu.Unlock()
|
|
return core.E("PeerRegistry.RemovePeer", "peer "+id+" not found", nil)
|
|
}
|
|
|
|
delete(r.peers, id)
|
|
r.rebuildKDTree()
|
|
r.mu.Unlock()
|
|
|
|
r.scheduleSave()
|
|
return nil
|
|
}
|
|
|
|
// GetPeer returns a copy of the peer with the supplied ID.
|
|
//
|
|
// peer := registry.GetPeer("worker-1")
|
|
func (r *PeerRegistry) GetPeer(id string) *Peer {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
peer, exists := r.peers[id]
|
|
if !exists {
|
|
return nil
|
|
}
|
|
|
|
// Return a copy
|
|
peerCopy := *peer
|
|
return &peerCopy
|
|
}
|
|
|
|
// ListPeers returns all registered peers.
|
|
func (r *PeerRegistry) ListPeers() []*Peer {
|
|
return slices.Collect(r.Peers())
|
|
}
|
|
|
|
// Peers returns an iterator over all registered peers.
|
|
// Each peer is a copy to prevent mutation.
|
|
func (r *PeerRegistry) Peers() iter.Seq[*Peer] {
|
|
return func(yield func(*Peer) bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
for _, peer := range r.peers {
|
|
peerCopy := *peer
|
|
if !yield(&peerCopy) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// UpdateMetrics updates a peer's performance metrics.
|
|
//
|
|
// registry.UpdateMetrics("worker-1", 42.5, 100, 3)
|
|
//
|
|
// Note: Persistence is debounced. Call Close() to flush before shutdown.
|
|
func (r *PeerRegistry) UpdateMetrics(id string, pingMilliseconds, geographicKilometres float64, hopCount int) error {
|
|
r.mu.Lock()
|
|
|
|
peer, exists := r.peers[id]
|
|
if !exists {
|
|
r.mu.Unlock()
|
|
return core.E("PeerRegistry.UpdateMetrics", "peer "+id+" not found", nil)
|
|
}
|
|
|
|
peer.PingMilliseconds = pingMilliseconds
|
|
peer.GeographicKilometres = geographicKilometres
|
|
peer.Hops = hopCount
|
|
peer.LastSeen = time.Now()
|
|
|
|
r.rebuildKDTree()
|
|
r.mu.Unlock()
|
|
|
|
r.scheduleSave()
|
|
return nil
|
|
}
|
|
|
|
// UpdateScore updates a peer's reliability score.
|
|
// Note: Persistence is debounced. Call Close() to flush before shutdown.
|
|
func (r *PeerRegistry) UpdateScore(id string, score float64) error {
|
|
r.mu.Lock()
|
|
|
|
peer, exists := r.peers[id]
|
|
if !exists {
|
|
r.mu.Unlock()
|
|
return core.E("PeerRegistry.UpdateScore", "peer "+id+" not found", nil)
|
|
}
|
|
|
|
// Clamp score to 0-100
|
|
score = max(0, min(score, 100))
|
|
|
|
peer.Score = score
|
|
r.rebuildKDTree()
|
|
r.mu.Unlock()
|
|
|
|
r.scheduleSave()
|
|
return nil
|
|
}
|
|
|
|
// SetConnected updates a peer's connection state.
|
|
//
|
|
// registry.SetConnected("worker-1", true)
|
|
func (r *PeerRegistry) SetConnected(id string, connected bool) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
if peer, exists := r.peers[id]; exists {
|
|
peer.Connected = connected
|
|
if connected {
|
|
peer.LastSeen = time.Now()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Reliability score tuning: step sizes applied by RecordSuccess,
// RecordFailure and RecordTimeout, and the bounds they clamp to.
const (
	ScoreSuccessIncrement = 1.0   // added after a successful interaction
	ScoreFailureDecrement = 5.0   // subtracted after a failed interaction
	ScoreTimeoutDecrement = 3.0   // subtracted after a timeout
	ScoreMinimum          = 0.0   // lower bound of the score
	ScoreMaximum          = 100.0 // upper bound of the score
	ScoreDefault          = 50.0  // neutral starting score for new peers
)
|
|
|
|
// RecordSuccess records a successful interaction with a peer, improving their score.
|
|
//
|
|
// registry.RecordSuccess("worker-1")
|
|
func (r *PeerRegistry) RecordSuccess(id string) {
|
|
r.mu.Lock()
|
|
peer, exists := r.peers[id]
|
|
if !exists {
|
|
r.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
peer.Score = min(peer.Score+ScoreSuccessIncrement, ScoreMaximum)
|
|
peer.LastSeen = time.Now()
|
|
r.mu.Unlock()
|
|
r.scheduleSave()
|
|
}
|
|
|
|
// RecordFailure records a failed interaction with a peer, reducing their score.
|
|
//
|
|
// registry.RecordFailure("worker-1")
|
|
func (r *PeerRegistry) RecordFailure(id string) {
|
|
r.mu.Lock()
|
|
peer, exists := r.peers[id]
|
|
if !exists {
|
|
r.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
peer.Score = max(peer.Score-ScoreFailureDecrement, ScoreMinimum)
|
|
newScore := peer.Score
|
|
r.mu.Unlock()
|
|
r.scheduleSave()
|
|
|
|
logging.Debug("peer score decreased", logging.Fields{
|
|
"peer_id": id,
|
|
"new_score": newScore,
|
|
"reason": "failure",
|
|
})
|
|
}
|
|
|
|
// RecordTimeout records a timeout when communicating with a peer.
|
|
//
|
|
// registry.RecordTimeout("worker-1")
|
|
func (r *PeerRegistry) RecordTimeout(id string) {
|
|
r.mu.Lock()
|
|
peer, exists := r.peers[id]
|
|
if !exists {
|
|
r.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
peer.Score = max(peer.Score-ScoreTimeoutDecrement, ScoreMinimum)
|
|
newScore := peer.Score
|
|
r.mu.Unlock()
|
|
r.scheduleSave()
|
|
|
|
logging.Debug("peer score decreased", logging.Fields{
|
|
"peer_id": id,
|
|
"new_score": newScore,
|
|
"reason": "timeout",
|
|
})
|
|
}
|
|
|
|
// GetPeersByScore returns peers sorted by score, highest first.
|
|
//
|
|
// peers := registry.GetPeersByScore()
|
|
func (r *PeerRegistry) GetPeersByScore() []*Peer {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
peers := slices.Collect(maps.Values(r.peers))
|
|
|
|
// Sort by score descending
|
|
slices.SortFunc(peers, func(a, b *Peer) int {
|
|
if b.Score > a.Score {
|
|
return 1
|
|
}
|
|
if b.Score < a.Score {
|
|
return -1
|
|
}
|
|
return 0
|
|
})
|
|
|
|
return peers
|
|
}
|
|
|
|
// PeersByScore returns an iterator over peers sorted by score (highest first).
|
|
//
|
|
// for peer := range registry.PeersByScore() {
|
|
// log.Printf("peer %s score=%.0f", peer.ID, peer.Score)
|
|
// }
|
|
func (r *PeerRegistry) PeersByScore() iter.Seq[*Peer] {
|
|
return func(yield func(*Peer) bool) {
|
|
peers := r.GetPeersByScore()
|
|
for _, p := range peers {
|
|
if !yield(p) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// SelectOptimalPeer returns the best peer based on multi-factor optimisation.
|
|
// Uses Poindexter KD-tree to find the peer closest to ideal metrics (low ping, low hops, high score).
|
|
//
|
|
// peer := registry.SelectOptimalPeer()
|
|
func (r *PeerRegistry) SelectOptimalPeer() *Peer {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
if r.kdTree == nil || len(r.peers) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Target: ideal peer (0 ping, 0 hops, 0 geographic distance, 100 score)
|
|
// Score is inverted (100 - score) so lower is better in the tree
|
|
target := []float64{0, 0, 0, 0}
|
|
|
|
result, _, found := r.kdTree.Nearest(target)
|
|
if !found {
|
|
return nil
|
|
}
|
|
|
|
peer, exists := r.peers[result.Value]
|
|
if !exists {
|
|
return nil
|
|
}
|
|
|
|
peerCopy := *peer
|
|
return &peerCopy
|
|
}
|
|
|
|
// SelectNearestPeers returns the n best peers based on multi-factor optimisation.
|
|
//
|
|
// peers := registry.SelectNearestPeers(3)
|
|
func (r *PeerRegistry) SelectNearestPeers(n int) []*Peer {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
if r.kdTree == nil || len(r.peers) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Target: ideal peer
|
|
target := []float64{0, 0, 0, 0}
|
|
|
|
results, _ := r.kdTree.KNearest(target, n)
|
|
|
|
peers := make([]*Peer, 0, len(results))
|
|
for _, result := range results {
|
|
if peer, exists := r.peers[result.Value]; exists {
|
|
peerCopy := *peer
|
|
peers = append(peers, &peerCopy)
|
|
}
|
|
}
|
|
|
|
return peers
|
|
}
|
|
|
|
// GetConnectedPeers returns all currently connected peers as a slice.
|
|
//
|
|
// connectedPeers := registry.GetConnectedPeers()
|
|
func (r *PeerRegistry) GetConnectedPeers() []*Peer {
|
|
return slices.Collect(r.ConnectedPeers())
|
|
}
|
|
|
|
// ConnectedPeers returns an iterator over all currently connected peers.
|
|
// Each peer is a copy to prevent mutation.
|
|
func (r *PeerRegistry) ConnectedPeers() iter.Seq[*Peer] {
|
|
return func(yield func(*Peer) bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
for _, peer := range r.peers {
|
|
if peer.Connected {
|
|
peerCopy := *peer
|
|
if !yield(&peerCopy) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Count returns the number of registered peers.
|
|
//
|
|
// n := registry.Count()
|
|
func (r *PeerRegistry) Count() int {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
return len(r.peers)
|
|
}
|
|
|
|
// rebuildKDTree rebuilds the KD-tree from current peers.
|
|
// Must be called with lock held.
|
|
func (r *PeerRegistry) rebuildKDTree() {
|
|
if len(r.peers) == 0 {
|
|
r.kdTree = nil
|
|
return
|
|
}
|
|
|
|
points := make([]poindexter.KDPoint[string], 0, len(r.peers))
|
|
for _, peer := range r.peers {
|
|
// Build a 4D point with weighted, normalised values.
|
|
// Invert score so that higher score = lower value (better)
|
|
point := poindexter.KDPoint[string]{
|
|
ID: peer.ID,
|
|
Coords: []float64{
|
|
peer.PingMilliseconds * pingWeight,
|
|
float64(peer.Hops) * hopsWeight,
|
|
peer.GeographicKilometres * geographicWeight,
|
|
(100 - peer.Score) * scoreWeight, // Invert score
|
|
},
|
|
Value: peer.ID,
|
|
}
|
|
points = append(points, point)
|
|
}
|
|
|
|
// Build KD-tree with Euclidean distance
|
|
tree, err := poindexter.NewKDTree(points, poindexter.WithMetric(poindexter.EuclideanDistance{}))
|
|
if err != nil {
|
|
// Log error but continue - worst case we don't have optimal selection
|
|
return
|
|
}
|
|
|
|
r.kdTree = tree
|
|
}
|
|
|
|
// scheduleSave schedules a debounced save operation.
|
|
// Multiple calls within peerRegistrySaveDebounceInterval will be coalesced into a single save.
|
|
// Call it after releasing r.mu so peer state and save state do not interleave.
|
|
func (r *PeerRegistry) scheduleSave() {
|
|
r.saveMutex.Lock()
|
|
defer r.saveMutex.Unlock()
|
|
|
|
r.hasPendingChanges = true
|
|
|
|
// If timer already running, let it handle the save
|
|
if r.pendingSaveTimer != nil {
|
|
return
|
|
}
|
|
|
|
// Start a new timer
|
|
r.pendingSaveTimer = time.AfterFunc(peerRegistrySaveDebounceInterval, func() {
|
|
r.saveMutex.Lock()
|
|
r.pendingSaveTimer = nil
|
|
shouldSave := r.hasPendingChanges
|
|
r.hasPendingChanges = false
|
|
r.saveMutex.Unlock()
|
|
|
|
if shouldSave {
|
|
r.mu.RLock()
|
|
err := r.saveNow()
|
|
r.mu.RUnlock()
|
|
if err != nil {
|
|
// Log error but continue - best effort persistence
|
|
logging.Warn("failed to save peer registry", logging.Fields{"error": err})
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// saveNow persists peers to disk immediately.
|
|
// Must be called with r.mu held (at least RLock).
|
|
func (r *PeerRegistry) saveNow() error {
|
|
// Ensure directory exists
|
|
dir := core.PathDir(r.path)
|
|
if err := filesystemEnsureDir(dir); err != nil {
|
|
return core.E("PeerRegistry.saveNow", "failed to create peers directory", err)
|
|
}
|
|
|
|
// Convert to slice for JSON
|
|
peers := slices.Collect(maps.Values(r.peers))
|
|
|
|
result := core.JSONMarshal(peers)
|
|
if !result.OK {
|
|
return core.E("PeerRegistry.saveNow", "failed to marshal peers", result.Value.(error))
|
|
}
|
|
data := result.Value.([]byte)
|
|
|
|
// Use atomic write pattern: write to temp file, then rename
|
|
tmpPath := r.path + ".tmp"
|
|
if err := filesystemWrite(tmpPath, string(data)); err != nil {
|
|
return core.E("PeerRegistry.saveNow", "failed to write peers temp file", err)
|
|
}
|
|
|
|
if err := filesystemRename(tmpPath, r.path); err != nil {
|
|
filesystemDelete(tmpPath) // Clean up temp file
|
|
return core.E("PeerRegistry.saveNow", "failed to rename peers file", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close flushes any pending changes and releases resources.
|
|
func (r *PeerRegistry) Close() error {
|
|
// Cancel any pending timer and save immediately if changes are queued.
|
|
r.saveMutex.Lock()
|
|
if r.pendingSaveTimer != nil {
|
|
r.pendingSaveTimer.Stop()
|
|
r.pendingSaveTimer = nil
|
|
}
|
|
shouldSave := r.hasPendingChanges
|
|
r.hasPendingChanges = false
|
|
r.saveMutex.Unlock()
|
|
|
|
if shouldSave {
|
|
r.mu.RLock()
|
|
err := r.saveNow()
|
|
r.mu.RUnlock()
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// load reads peers from disk.
|
|
func (r *PeerRegistry) load() error {
|
|
content, err := filesystemRead(r.path)
|
|
if err != nil {
|
|
return core.E("PeerRegistry.load", "failed to read peers", err)
|
|
}
|
|
|
|
var peers []*Peer
|
|
result := core.JSONUnmarshalString(content, &peers)
|
|
if !result.OK {
|
|
return core.E("PeerRegistry.load", "failed to unmarshal peers", result.Value.(error))
|
|
}
|
|
|
|
r.peers = make(map[string]*Peer)
|
|
for _, peer := range peers {
|
|
r.peers[peer.ID] = peer
|
|
}
|
|
|
|
return nil
|
|
}
|