feat: modernise to Go 1.26 iterators and stdlib helpers
Add Peers, AllowedPublicKeys, ConnectedPeers, PeersByScore iterators on PeerManager. Add Handlers on Dispatcher, Connections on Transport. Use slices.Collect, maps.Keys/Values, slices.SortFunc internally. Co-Authored-By: Gemini <noreply@google.com> Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
e3a8d5b2d8
commit
50a6e12cf4
6 changed files with 132 additions and 51 deletions
2
go.mod
2
go.mod
|
|
@ -12,8 +12,8 @@ require (
|
|||
)
|
||||
|
||||
require (
|
||||
github.com/ProtonMail/go-crypto v1.3.0 // indirect
|
||||
forge.lthn.ai/Snider/Enchantrix v0.0.4 // indirect
|
||||
github.com/ProtonMail/go-crypto v1.3.0 // indirect
|
||||
github.com/cloudflare/circl v1.6.3 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/klauspost/compress v1.18.4 // indirect
|
||||
|
|
|
|||
21
go.sum
21
go.sum
|
|
@ -1,26 +1,45 @@
|
|||
forge.lthn.ai/Snider/Borg v0.2.1 h1:tsbbLQukDm4fyTkBDi98cwzoWkcCVXBOl9lhoxNDWJ4=
|
||||
forge.lthn.ai/Snider/Borg v0.2.1 h1:Uf/YtUJLL8jlxTCjvP4J+5GHe3LLeALGtbh7zj8d8Qc=
|
||||
forge.lthn.ai/Snider/Borg v0.2.1/go.mod h1:MVfolb7F6/A2LOIijcbBhWImu5db5NSMcSjvShMoMCA=
|
||||
forge.lthn.ai/Snider/Enchantrix v0.0.4 h1:biwpix/bdedfyc0iVeK15awhhJKH6TEMYOTXzHXx5TI=
|
||||
forge.lthn.ai/Snider/Enchantrix v0.0.4/go.mod h1:OGCwuVeZPq3OPe2h6TX/ZbgEjHU6B7owpIBeXQGbSe0=
|
||||
forge.lthn.ai/Snider/Poindexter v0.0.2 h1:XXzSKFjO6MeftQAnB9qR+IkOTp9f57Tg4sIx8Qzi/II=
|
||||
forge.lthn.ai/Snider/Poindexter v0.0.2/go.mod h1:ddzGia98k3HKkR0gl58IDzqz+MmgW2cQJOCNLfuWPpo=
|
||||
github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBiRGFrw=
|
||||
github.com/ProtonMail/go-crypto v1.3.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE=
|
||||
github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78=
|
||||
github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ=
|
||||
github.com/cloudflare/circl v1.6.3 h1:9GPOhQGF9MCYUeXyMYlqTR6a5gTrgR/fBLXvUgtVcg8=
|
||||
github.com/cloudflare/circl v1.6.3/go.mod h1:2eXP6Qfat4O/Yhh8BznvKnJ+uzEoTQ6jVKJRn81BiS4=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
|
||||
github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
||||
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
|
||||
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
|
||||
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
|
||||
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
|
|
|||
|
|
@ -239,12 +239,11 @@ func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]strin
|
|||
|
||||
// GetAllStats fetches stats from all connected peers.
|
||||
func (c *Controller) GetAllStats() map[string]*StatsPayload {
|
||||
peers := c.peers.GetConnectedPeers()
|
||||
results := make(map[string]*StatsPayload)
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, peer := range peers {
|
||||
for peer := range c.peers.ConnectedPeers() {
|
||||
wg.Add(1)
|
||||
go func(p *Peer) {
|
||||
defer wg.Done()
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package node
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"iter"
|
||||
"sync"
|
||||
|
||||
"forge.lthn.ai/core/go-p2p/logging"
|
||||
|
|
@ -70,6 +71,20 @@ func (d *Dispatcher) RegisterHandler(intentID byte, handler IntentHandler) {
|
|||
})
|
||||
}
|
||||
|
||||
// Handlers returns an iterator over all registered intent handlers.
|
||||
func (d *Dispatcher) Handlers() iter.Seq2[byte, IntentHandler] {
|
||||
return func(yield func(byte, IntentHandler) bool) {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
for intentID, handler := range d.handlers {
|
||||
if !yield(intentID, handler) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch routes a parsed UEPS packet through the threat circuit breaker
|
||||
// and then to the appropriate intent handler.
|
||||
//
|
||||
|
|
|
|||
104
node/peer.go
104
node/peer.go
|
|
@ -3,9 +3,12 @@ package node
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"iter"
|
||||
"maps"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
|
@ -215,14 +218,21 @@ func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool {
|
|||
|
||||
// ListAllowedPublicKeys returns all allowlisted public keys.
|
||||
func (r *PeerRegistry) ListAllowedPublicKeys() []string {
|
||||
r.allowedPublicKeyMu.RLock()
|
||||
defer r.allowedPublicKeyMu.RUnlock()
|
||||
return slices.Collect(r.AllowedPublicKeys())
|
||||
}
|
||||
|
||||
keys := make([]string, 0, len(r.allowedPublicKeys))
|
||||
for key := range r.allowedPublicKeys {
|
||||
keys = append(keys, key)
|
||||
// AllowedPublicKeys returns an iterator over all allowlisted public keys.
|
||||
func (r *PeerRegistry) AllowedPublicKeys() iter.Seq[string] {
|
||||
return func(yield func(string) bool) {
|
||||
r.allowedPublicKeyMu.RLock()
|
||||
defer r.allowedPublicKeyMu.RUnlock()
|
||||
|
||||
for key := range r.allowedPublicKeys {
|
||||
if !yield(key) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
// AddPeer adds a new peer to the registry.
|
||||
|
|
@ -313,15 +323,23 @@ func (r *PeerRegistry) GetPeer(id string) *Peer {
|
|||
|
||||
// ListPeers returns all registered peers.
|
||||
func (r *PeerRegistry) ListPeers() []*Peer {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
return slices.Collect(r.Peers())
|
||||
}
|
||||
|
||||
peers := make([]*Peer, 0, len(r.peers))
|
||||
for _, peer := range r.peers {
|
||||
peerCopy := *peer
|
||||
peers = append(peers, &peerCopy)
|
||||
// Peers returns an iterator over all registered peers.
|
||||
// Each peer is a copy to prevent mutation.
|
||||
func (r *PeerRegistry) Peers() iter.Seq[*Peer] {
|
||||
return func(yield func(*Peer) bool) {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
for _, peer := range r.peers {
|
||||
peerCopy := *peer
|
||||
if !yield(&peerCopy) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return peers
|
||||
}
|
||||
|
||||
// UpdateMetrics updates a peer's performance metrics.
|
||||
|
|
@ -456,21 +474,32 @@ func (r *PeerRegistry) GetPeersByScore() []*Peer {
|
|||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
peers := make([]*Peer, 0, len(r.peers))
|
||||
for _, p := range r.peers {
|
||||
peers = append(peers, p)
|
||||
}
|
||||
peers := slices.Collect(maps.Values(r.peers))
|
||||
|
||||
// Sort by score descending
|
||||
for i := 0; i < len(peers)-1; i++ {
|
||||
for j := i + 1; j < len(peers); j++ {
|
||||
if peers[j].Score > peers[i].Score {
|
||||
peers[i], peers[j] = peers[j], peers[i]
|
||||
slices.SortFunc(peers, func(a, b *Peer) int {
|
||||
if b.Score > a.Score {
|
||||
return 1
|
||||
}
|
||||
if b.Score < a.Score {
|
||||
return -1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
|
||||
return peers
|
||||
}
|
||||
|
||||
// PeersByScore returns an iterator over peers sorted by score (highest first).
|
||||
func (r *PeerRegistry) PeersByScore() iter.Seq[*Peer] {
|
||||
return func(yield func(*Peer) bool) {
|
||||
peers := r.GetPeersByScore()
|
||||
for _, p := range peers {
|
||||
if !yield(p) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return peers
|
||||
}
|
||||
|
||||
// SelectOptimalPeer returns the best peer based on multi-factor optimization.
|
||||
|
|
@ -528,17 +557,25 @@ func (r *PeerRegistry) SelectNearestPeers(n int) []*Peer {
|
|||
|
||||
// GetConnectedPeers returns all currently connected peers.
|
||||
func (r *PeerRegistry) GetConnectedPeers() []*Peer {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
return slices.Collect(r.ConnectedPeers())
|
||||
}
|
||||
|
||||
peers := make([]*Peer, 0)
|
||||
for _, peer := range r.peers {
|
||||
if peer.Connected {
|
||||
peerCopy := *peer
|
||||
peers = append(peers, &peerCopy)
|
||||
// ConnectedPeers returns an iterator over all currently connected peers.
|
||||
// Each peer is a copy to prevent mutation.
|
||||
func (r *PeerRegistry) ConnectedPeers() iter.Seq[*Peer] {
|
||||
return func(yield func(*Peer) bool) {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
for _, peer := range r.peers {
|
||||
if peer.Connected {
|
||||
peerCopy := *peer
|
||||
if !yield(&peerCopy) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return peers
|
||||
}
|
||||
|
||||
// Count returns the number of registered peers.
|
||||
|
|
@ -627,10 +664,7 @@ func (r *PeerRegistry) saveNow() error {
|
|||
}
|
||||
|
||||
// Convert to slice for JSON
|
||||
peers := make([]*Peer, 0, len(r.peers))
|
||||
for _, peer := range r.peers {
|
||||
peers = append(peers, peer)
|
||||
}
|
||||
peers := slices.Collect(maps.Values(r.peers))
|
||||
|
||||
data, err := json.MarshalIndent(peers, "", " ")
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -6,8 +6,11 @@ import (
|
|||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"iter"
|
||||
"maps"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"slices"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
|
@ -271,11 +274,13 @@ func (t *Transport) Stop() error {
|
|||
t.cancel()
|
||||
|
||||
// Gracefully close all connections with shutdown message
|
||||
t.mu.Lock()
|
||||
for _, pc := range t.conns {
|
||||
t.mu.RLock()
|
||||
conns := slices.Collect(maps.Values(t.conns))
|
||||
t.mu.RUnlock()
|
||||
|
||||
for _, pc := range conns {
|
||||
pc.GracefulClose("server shutdown", DisconnectShutdown)
|
||||
}
|
||||
t.mu.Unlock()
|
||||
|
||||
// Shutdown HTTP server if it was started
|
||||
if t.server != nil {
|
||||
|
|
@ -368,22 +373,31 @@ func (t *Transport) Send(peerID string, msg *Message) error {
|
|||
return pc.Send(msg)
|
||||
}
|
||||
|
||||
// Connections returns an iterator over all active peer connections.
|
||||
func (t *Transport) Connections() iter.Seq[*PeerConnection] {
|
||||
return func(yield func(*PeerConnection) bool) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
for _, pc := range t.conns {
|
||||
if !yield(pc) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast sends a message to all connected peers except the sender.
|
||||
// The sender is identified by msg.From and excluded to prevent echo.
|
||||
func (t *Transport) Broadcast(msg *Message) error {
|
||||
t.mu.RLock()
|
||||
conns := make([]*PeerConnection, 0, len(t.conns))
|
||||
for _, pc := range t.conns {
|
||||
conns := slices.Collect(t.Connections())
|
||||
|
||||
var lastErr error
|
||||
for _, pc := range conns {
|
||||
// Exclude sender from broadcast to prevent echo (P2P-MED-6)
|
||||
if pc.Peer != nil && pc.Peer.ID == msg.From {
|
||||
continue
|
||||
}
|
||||
conns = append(conns, pc)
|
||||
}
|
||||
t.mu.RUnlock()
|
||||
|
||||
var lastErr error
|
||||
for _, pc := range conns {
|
||||
if err := pc.Send(msg); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue