refactor(node): align AX comments and peer copy semantics
Co-Authored-By: Virgil <virgil@lethean.io>
parent e3b66f7e8c · commit 194fe707de

11 changed files with 146 additions and 44 deletions
````diff
@@ -60,7 +60,7 @@ The `Transport` manages a WebSocket server (gorilla/websocket) and outbound conn
 **Keepalive**: A goroutine per connection ticks at `PingInterval`. If `LastActivity` has not been updated within `PingInterval + PongTimeout`, the connection is removed.
 
-**Graceful close**: `GracefulClose` sends `MsgDisconnect` before closing the underlying WebSocket. Write deadlines are managed exclusively inside `Send()` under `writeMu` to prevent the race (P2P-RACE-1) where a bare `SetWriteDeadline` call could race with concurrent sends.
+**Graceful close**: `GracefulClose` sends `MsgDisconnect` before closing the underlying WebSocket. Write deadlines are managed exclusively inside `Send()` under `writeMutex` to prevent the race (P2P-RACE-1) where a bare `SetWriteDeadline` call could race with concurrent sends.
 
 **Buffer pool**: `MarshalJSON` uses a `sync.Pool` of `bytes.Buffer` (initial capacity 1 KB, maximum pooled size 64 KB) to reduce allocation pressure in the message serialisation hot path. HTML escaping is disabled to match `json.Marshal` semantics.
````
````diff
@@ -70,15 +70,15 @@ The `Transport` manages a WebSocket server (gorilla/websocket) and outbound conn
 **Peer fields persisted**:
 - `ID`, `Name`, `PublicKey`, `Address`, `Role`, `AddedAt`, `LastSeen`
-- `PingMS`, `Hops`, `GeoKM`, `Score` (float64, 0–100)
+- `PingMilliseconds`, `Hops`, `GeographicKilometres`, `Score` (float64, 0–100)
 
 **KD-tree dimensions** (lower is better in all axes):
 
 | Dimension | Weight | Rationale |
 |-----------|--------|-----------|
-| `PingMS` | 1.0 | Latency dominates interactive performance |
+| `PingMilliseconds` | 1.0 | Latency dominates interactive performance |
 | `Hops` | 0.7 | Network hop count (routing cost) |
-| `GeoKM` | 0.2 | Geographic distance (minor factor) |
+| `GeographicKilometres` | 0.2 | Geographic distance (minor factor) |
 | `100 - Score` | 1.2 | Reliability (inverted so lower = better peer) |
 
 `SelectOptimalPeer()` queries the tree for the point nearest to the origin (ideal: zero latency, zero hops, zero distance, maximum score). `SelectNearestPeers(n)` returns the n best.
````
````diff
@@ -236,7 +236,7 @@ A global logger instance is available via `logging.Debug(...)`, `logging.Info(..
 |----------|------------|
 | `Transport.conns` | `sync.RWMutex` |
 | `Transport.handler` | `sync.RWMutex` |
-| `PeerConnection` writes | `sync.Mutex` (`writeMu`) |
+| `PeerConnection` writes | `sync.Mutex` (`writeMutex`) |
 | `PeerConnection` close | `sync.Once` (`closeOnce`) |
 | `PeerRegistry.peers` + KD-tree | `sync.RWMutex` |
 | `PeerRegistry.allowedPublicKeys` | separate `sync.RWMutex` |
````
````diff
@@ -20,9 +20,9 @@ type Peer struct {
 	LastSeen time.Time `json:"lastSeen"`
 
 	// Poindexter metrics (updated dynamically)
-	PingMS float64 `json:"pingMs"` // Latency in milliseconds
+	PingMilliseconds float64 `json:"pingMs"` // Latency in milliseconds
 	Hops int `json:"hops"` // Network hop count
-	GeoKM float64 `json:"geoKm"` // Geographic distance in kilometres
+	GeographicKilometres float64 `json:"geoKm"` // Geographic distance in kilometres
 	Score float64 `json:"score"` // Reliability score 0--100
 
 	Connected bool `json:"-"` // Not persisted
````
````diff
@@ -83,9 +83,9 @@ The registry maintains a 4-dimensional KD-tree for optimal peer selection. Each
 | Dimension | Source | Weight | Direction |
 |-----------|--------|--------|-----------|
-| Latency | `PingMS` | 1.0 | Lower is better |
+| Latency | `PingMilliseconds` | 1.0 | Lower is better |
 | Hops | `Hops` | 0.7 | Lower is better |
-| Geographic distance | `GeoKM` | 0.2 | Lower is better |
+| Geographic distance | `GeographicKilometres` | 0.2 | Lower is better |
 | Reliability | `100 - Score` | 1.2 | Inverted so lower is better |
 
 The score dimension is inverted so that the "ideal peer" target point `[0, 0, 0, 0]` represents zero latency, zero hops, zero distance, and maximum reliability (score 100).
````
````diff
@@ -31,7 +31,7 @@ Tests covered:
 - MaxConnections enforcement: 503 HTTP rejection when limit is reached
 - Keepalive timeout: connection cleaned up after `PingInterval + PongTimeout` elapses
 - Graceful close: `MsgDisconnect` sent before underlying WebSocket close
-- Concurrent sends: no data races under `go test -race` (`writeMu` protects all writes)
+- Concurrent sends: no data races under `go test -race` (`writeMutex` protects all writes)
 
 ### Phase 3 — Controller Tests
````
````diff
@@ -104,9 +104,9 @@ The originally identified risk — that `transport.OnMessage(c.handleResponse)`
 ### P2P-RACE-1 — GracefulClose Data Race (Phase 3)
 
-`GracefulClose` previously called `pc.Conn.SetWriteDeadline()` outside of `writeMu`, racing with concurrent `Send()` calls that also set the write deadline. Detected by `go test -race`.
+`GracefulClose` previously called `pc.Conn.SetWriteDeadline()` outside of `writeMutex`, racing with concurrent `Send()` calls that also set the write deadline. Detected by `go test -race`.
 
-Fix: removed the bare `SetWriteDeadline` call from `GracefulClose`. The method now relies entirely on `Send()`, which manages write deadlines under `writeMu`. This is documented in a comment in `transport.go` to prevent the pattern from being reintroduced.
+Fix: removed the bare `SetWriteDeadline` call from `GracefulClose`. The method now relies entirely on `Send()`, which manages write deadlines under `writeMutex`. This is documented in a comment in `transport.go` to prevent the pattern from being reintroduced.
 
 ## Wiki Corrections (19 February 2026)
````
````diff
@@ -39,13 +39,13 @@ Paths follow XDG base directories via `github.com/adrg/xdg`. The private key is
 ### Creating an Identity
 
 ```go
-nm, err := node.NewNodeManager()
+nodeManager, err := node.NewNodeManager()
 if err != nil {
 	log.Fatal(err)
 }
 
 // Generate a new identity (persists key and config to disk)
-err = nm.GenerateIdentity("eu-controller-01", node.RoleController)
+err = nodeManager.GenerateIdentity("eu-controller-01", node.RoleController)
 ```
 
 Internally this calls `stmf.GenerateKeyPair()` from the Borg library to produce the X25519 keypair.
````
````diff
@@ -53,7 +53,7 @@ Internally this calls `stmf.GenerateKeyPair()` from the Borg library to produce
 ### Custom Paths (Testing)
 
 ```go
-nm, err := node.NewNodeManagerFromPaths(
+nodeManager, err := node.NewNodeManagerFromPaths(
 	"/tmp/test/private.key",
 	"/tmp/test/node.json",
 )
````
````diff
@@ -62,8 +62,8 @@ nm, err := node.NewNodeManagerFromPaths(
 ### Checking and Retrieving Identity
 
 ```go
-if nm.HasIdentity() {
-	identity := nm.GetIdentity() // Returns a copy
+if nodeManager.HasIdentity() {
+	identity := nodeManager.GetIdentity() // Returns a copy
 	fmt.Println(identity.ID, identity.Name)
 }
 ```
````
````diff
@@ -73,7 +73,7 @@ if nm.HasIdentity() {
 ### Deriving Shared Secrets
 
 ```go
-sharedSecret, err := nm.DeriveSharedSecret(peerPublicKeyBase64)
+sharedSecret, err := nodeManager.DeriveSharedSecret(peerPublicKeyBase64)
 ```
 
 This performs X25519 ECDH with the peer's public key and hashes the result with SHA-256, producing a 32-byte symmetric key. The same shared secret is derived independently by both sides (no secret is transmitted).
````
````diff
@@ -81,7 +81,7 @@ This performs X25519 ECDH with the peer's public key and hashes the result with
 ### Deleting an Identity
 
 ```go
-err := nm.Delete() // Removes key and config from disk, clears in-memory state
+err := nodeManager.Delete() // Removes key and config from disk, clears in-memory state
 ```
 
 ## Challenge-Response Authentication
````
````diff
@@ -25,7 +25,7 @@ type TransportConfig struct {
 Sensible defaults via `DefaultTransportConfig()`:
 
 ```go
-cfg := node.DefaultTransportConfig()
+transportConfig := node.DefaultTransportConfig()
 // ListenAddr: ":9091", WebSocketPath: "/ws", MaxConnections: 100
 // MaxMessageSize: 1MB, PingInterval: 30s, PongTimeout: 10s
 ```
````
````diff
@@ -33,10 +33,10 @@ cfg := node.DefaultTransportConfig()
 ## Creating and Starting
 
 ```go
-transport := node.NewTransport(nodeManager, peerRegistry, cfg)
+transport := node.NewTransport(nodeManager, peerRegistry, transportConfig)
 
 // Set message handler before Start() to avoid races
-transport.OnMessage(func(conn *node.PeerConnection, msg *node.Message) {
+transport.OnMessage(func(peerConnection *node.PeerConnection, msg *node.Message) {
 	// Handle incoming messages
 })
````
````diff
@@ -96,15 +96,15 @@ type PeerConnection struct {
 ### Sending Messages
 
 ```go
-err := peerConn.Send(msg)
+err := peerConnection.Send(msg)
 ```
 
-`Send()` serialises the message to JSON, encrypts it with SMSG, sets a 10-second write deadline, and writes as a binary WebSocket frame. A `writeMu` mutex serialises concurrent writes.
+`Send()` serialises the message to JSON, encrypts it with SMSG, sets a 10-second write deadline, and writes as a binary WebSocket frame. A `writeMutex` serialises concurrent writes.
 
 ### Graceful Close
 
 ```go
-err := peerConn.GracefulClose("shutting down", node.DisconnectShutdown)
+err := peerConnection.GracefulClose("shutting down", node.DisconnectShutdown)
 ```
 
 Sends a `disconnect` message (best-effort) before closing the connection. Uses `sync.Once` to ensure the connection is only closed once.
````
````diff
@@ -347,9 +347,7 @@ func extractTarball(tarData []byte, destDir string) (string, error) {
 	return firstExecutable, nil
 }
 
 // StreamBundle writes a bundle to a writer (for large transfers).
-//
-// err := StreamBundle(bundle, writer)
+// err := StreamBundle(bundle, writer)
 func StreamBundle(bundle *Bundle, w io.Writer) error {
 	result := core.JSONMarshal(bundle)
 	if !result.OK {
````
````diff
@@ -359,9 +357,7 @@ func StreamBundle(bundle *Bundle, w io.Writer) error {
 	return err
 }
 
 // ReadBundle reads a bundle from a reader.
-//
-// bundle, err := ReadBundle(reader)
+// bundle, err := ReadBundle(reader)
 func ReadBundle(r io.Reader) (*Bundle, error) {
 	var buf bytes.Buffer
 	if _, err := io.Copy(&buf, r); err != nil {
````
````diff
@@ -87,10 +87,9 @@ func NewNodeManager() (*NodeManager, error) {
 	return NewNodeManagerFromPaths(keyPath, configPath)
 }
 
-// Missing files are treated as a fresh install; malformed or partial identity
-// state is returned as an error so callers can handle it explicitly.
-//
-// nodeManager, err := NewNodeManagerFromPaths("/srv/p2p/private.key", "/srv/p2p/node.json")
+// Missing files are treated as a fresh install; malformed or partial identity
+// state returns an error so callers can handle it explicitly.
+// nodeManager, err := NewNodeManagerFromPaths("/srv/p2p/private.key", "/srv/p2p/node.json")
 func NewNodeManagerFromPaths(keyPath, configPath string) (*NodeManager, error) {
 	nm := &NodeManager{
 		keyPath: keyPath,
````
node/peer.go (33)

````diff
@@ -131,10 +131,9 @@ func NewPeerRegistry() (*PeerRegistry, error) {
 	return NewPeerRegistryFromPath(peersPath)
 }
 
-// Missing files are treated as an empty registry; malformed registry files are
-// returned as errors so callers can repair the persisted state.
-//
-// peerRegistry, err := NewPeerRegistryFromPath("/srv/p2p/peers.json")
+// Missing files are treated as an empty registry; malformed registry files
+// return an error so callers can repair the persisted state.
+// peerRegistry, err := NewPeerRegistryFromPath("/srv/p2p/peers.json")
 func NewPeerRegistryFromPath(peersPath string) (*PeerRegistry, error) {
 	pr := &PeerRegistry{
 		peers: make(map[string]*Peer),
````
````diff
@@ -248,6 +247,13 @@ func (r *PeerRegistry) AllowedPublicKeys() iter.Seq[string] {
 //
 // err := registry.AddPeer(&Peer{ID: "worker-1", Address: "10.0.0.1:9091", Role: RoleWorker})
 func (r *PeerRegistry) AddPeer(peer *Peer) error {
+	if peer == nil {
+		return core.E("PeerRegistry.AddPeer", "peer is nil", nil)
+	}
+
+	peerCopy := *peer
+	peer = &peerCopy
+
 	r.mu.Lock()
 
 	if peer.ID == "" {
````
````diff
@@ -271,7 +277,7 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error {
 		peer.AddedAt = time.Now()
 	}
 	if peer.Score == 0 {
-		peer.Score = 50 // Default neutral score
+		peer.Score = ScoreDefault
 	}
 
 	r.peers[peer.ID] = peer
````
````diff
@@ -286,6 +292,17 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error {
 //
 // err := registry.UpdatePeer(&Peer{ID: "worker-1", Score: 90})
 func (r *PeerRegistry) UpdatePeer(peer *Peer) error {
+	if peer == nil {
+		return core.E("PeerRegistry.UpdatePeer", "peer is nil", nil)
+	}
+
+	if peer.ID == "" {
+		return core.E("PeerRegistry.UpdatePeer", "peer ID is required", nil)
+	}
+
+	peerCopy := *peer
+	peer = &peerCopy
+
 	r.mu.Lock()
 
 	if _, exists := r.peers[peer.ID]; !exists {
````
````diff
@@ -502,7 +519,13 @@ func (r *PeerRegistry) GetPeersByScore() []*Peer {
 		return 0
 	})
 
-	return peers
+	peerCopies := make([]*Peer, 0, len(peers))
+	for _, peer := range peers {
+		peerCopy := *peer
+		peerCopies = append(peerCopies, &peerCopy)
+	}
+
+	return peerCopies
 }
 
 // for peer := range registry.PeersByScore() {
````
````diff
@@ -65,6 +65,15 @@ func TestPeer_Registry_AddPeer_Good(t *testing.T) {
 		t.Errorf("expected 1 peer, got %d", pr.Count())
 	}
 
+	peer.Name = "Mutated after add"
+	stored := pr.GetPeer("test-peer-1")
+	if stored == nil {
+		t.Fatal("expected peer to exist after add")
+	}
+	if stored.Name != "Test Peer" {
+		t.Errorf("expected stored peer to remain unchanged, got %q", stored.Name)
+	}
+
 	// Try to add duplicate
 	err = pr.AddPeer(peer)
 	if err == nil {
````
````diff
@@ -634,6 +643,34 @@ func TestPeer_Registry_PeersSortedByScore_Good(t *testing.T) {
 	if sorted[2].ID != "low-score" {
 		t.Errorf("third peer should be low-score, got %s", sorted[2].ID)
 	}
+
+	sorted[0].Name = "Mutated"
+	restored := pr.GetPeer("high-score")
+	if restored == nil {
+		t.Fatal("expected high-score peer to still exist")
+	}
+	if restored.Name != "High" {
+		t.Errorf("expected registry peer to remain unchanged, got %q", restored.Name)
+	}
 }
 
+func TestPeer_Registry_NilPeerInputs_Bad(t *testing.T) {
+	pr, cleanup := setupTestPeerRegistry(t)
+	defer cleanup()
+
+	t.Run("AddPeer", func(t *testing.T) {
+		err := pr.AddPeer(nil)
+		if err == nil {
+			t.Fatal("expected error when adding nil peer")
+		}
+	})
+
+	t.Run("UpdatePeer", func(t *testing.T) {
+		err := pr.UpdatePeer(nil)
+		if err == nil {
+			t.Fatal("expected error when updating nil peer")
+		}
+	})
+}
+
 // --- Additional coverage tests for peer.go ---
````
````diff
@@ -736,6 +773,19 @@ func TestPeer_Registry_UpdatePeer_Good(t *testing.T) {
 	if updated.Score != 80 {
 		t.Errorf("expected score 80, got %f", updated.Score)
 	}
+
+	peer.Name = "Mutated after update"
+	peer.Score = 12
+	stored := pr.GetPeer("update-test")
+	if stored == nil {
+		t.Fatal("expected peer to exist after update mutation")
+	}
+	if stored.Name != "Updated" {
+		t.Errorf("expected stored peer name to remain Updated, got %q", stored.Name)
+	}
+	if stored.Score != 80 {
+		t.Errorf("expected stored peer score to remain 80, got %f", stored.Score)
+	}
 }
 
 func TestPeer_Registry_UpdateMetrics_NotFound_Bad(t *testing.T) {
````
````diff
@@ -107,22 +107,45 @@ func NewMessageDeduplicator(retentionWindow time.Duration) *MessageDeduplicator
 	return d
 }
 
+// IsDuplicate checks whether a message ID is still within the retention window.
+// duplicate := deduplicator.IsDuplicate(message.ID)
 func (d *MessageDeduplicator) IsDuplicate(msgID string) bool {
 	d.mutex.RLock()
-	_, exists := d.recentMessageTimes[msgID]
+	seenAt, exists := d.recentMessageTimes[msgID]
+	retentionWindow := d.timeToLive
 	d.mutex.RUnlock()
-	return exists
+
+	if !exists {
+		return false
+	}
+
+	if retentionWindow > 0 && time.Since(seenAt) <= retentionWindow {
+		return true
+	}
+
+	d.mutex.Lock()
+	defer d.mutex.Unlock()
+
+	seenAt, exists = d.recentMessageTimes[msgID]
+	if !exists {
+		return false
+	}
+
+	if retentionWindow <= 0 || time.Since(seenAt) > retentionWindow {
+		delete(d.recentMessageTimes, msgID)
+		return false
+	}
+
+	return true
 }
 
+// Mark records a message ID as recently seen.
+// deduplicator.Mark(message.ID)
 func (d *MessageDeduplicator) Mark(msgID string) {
 	d.mutex.Lock()
 	d.recentMessageTimes[msgID] = time.Now()
 	d.mutex.Unlock()
 }
 
+// Cleanup removes expired entries from the deduplicator.
+// deduplicator.Cleanup()
 func (d *MessageDeduplicator) Cleanup() {
 	d.mutex.Lock()
 	defer d.mutex.Unlock()
````
````diff
@@ -153,6 +153,17 @@ func TestTransport_MessageDeduplicator_Good(t *testing.T) {
 		}
 	})
 
+	t.Run("ExpiredEntriesDoNotLinger", func(t *testing.T) {
+		d := NewMessageDeduplicator(50 * time.Millisecond)
+		d.Mark("msg-1")
+
+		time.Sleep(75 * time.Millisecond)
+
+		if d.IsDuplicate("msg-1") {
+			t.Error("should not be duplicate after TTL even before cleanup runs")
+		}
+	})
+
 	t.Run("ConcurrentAccess", func(t *testing.T) {
 		d := NewMessageDeduplicator(5 * time.Minute)
 		var wg sync.WaitGroup
````