refactor(node): align AX comments and peer copy semantics

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-31 14:19:03 +00:00
parent 1ee54add39
commit e5953e4b86
11 changed files with 146 additions and 44 deletions


@@ -60,7 +60,7 @@ The `Transport` manages a WebSocket server (gorilla/websocket) and outbound conn
**Keepalive**: A goroutine per connection ticks at `PingInterval`. If `LastActivity` has not been updated within `PingInterval + PongTimeout`, the connection is removed.
**Graceful close**: `GracefulClose` sends `MsgDisconnect` before closing the underlying WebSocket. Write deadlines are managed exclusively inside `Send()` under `writeMu` to prevent the race (P2P-RACE-1) where a bare `SetWriteDeadline` call could race with concurrent sends.
**Graceful close**: `GracefulClose` sends `MsgDisconnect` before closing the underlying WebSocket. Write deadlines are managed exclusively inside `Send()` under `writeMutex` to prevent the race (P2P-RACE-1) where a bare `SetWriteDeadline` call could race with concurrent sends.
**Buffer pool**: `MarshalJSON` uses a `sync.Pool` of `bytes.Buffer` (initial capacity 1 KB, maximum pooled size 64 KB) to reduce allocation pressure in the message serialisation hot path. HTML escaping is disabled to match `json.Marshal` semantics.
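A minimal sketch of the pooled-buffer pattern described above (the constants and helper name are illustrative, not the actual `MarshalJSON` implementation):

```go
package main

import (
	"bytes"
	"encoding/json"
	"sync"
)

const (
	initialBufferSize = 1 << 10  // 1 KB initial capacity
	maxPooledSize     = 64 << 10 // buffers grown past 64 KB are not returned to the pool
)

var bufferPool = sync.Pool{
	New: func() any { return bytes.NewBuffer(make([]byte, 0, initialBufferSize)) },
}

// marshalPooled serialises v using a pooled buffer to reduce allocation
// pressure; HTML escaping is switched off, as in the transport docs.
func marshalPooled(v any) ([]byte, error) {
	buf := bufferPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer func() {
		if buf.Cap() <= maxPooledSize { // drop oversized buffers
			bufferPool.Put(buf)
		}
	}()
	enc := json.NewEncoder(buf)
	enc.SetEscapeHTML(false)
	if err := enc.Encode(v); err != nil {
		return nil, err
	}
	// Encoder.Encode appends '\n'; copy the payload out of the pooled buffer.
	out := make([]byte, buf.Len()-1)
	copy(out, buf.Bytes())
	return out, nil
}
```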
@@ -70,15 +70,15 @@ The `Transport` manages a WebSocket server (gorilla/websocket) and outbound conn
**Peer fields persisted**:
- `ID`, `Name`, `PublicKey`, `Address`, `Role`, `AddedAt`, `LastSeen`
- `PingMS`, `Hops`, `GeoKM`, `Score` (float64, 0–100)
- `PingMilliseconds`, `Hops`, `GeographicKilometres`, `Score` (float64, 0–100)
**KD-tree dimensions** (lower is better in all axes):
| Dimension | Weight | Rationale |
|-----------|--------|-----------|
| `PingMS` | 1.0 | Latency dominates interactive performance |
| `PingMilliseconds` | 1.0 | Latency dominates interactive performance |
| `Hops` | 0.7 | Network hop count (routing cost) |
| `GeoKM` | 0.2 | Geographic distance (minor factor) |
| `GeographicKilometres` | 0.2 | Geographic distance (minor factor) |
| `100 - Score` | 1.2 | Reliability (inverted so lower = better peer) |
`SelectOptimalPeer()` queries the tree for the point nearest to the origin (ideal: zero latency, zero hops, zero distance, maximum score). `SelectNearestPeers(n)` returns the n best.
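The mapping from a peer to its weighted 4-D point can be sketched as below (type and helper names are hypothetical; only the weights come from the table above):

```go
package main

// Weights from the KD-tree dimension table; lower is better in every axis.
const (
	weightLatency     = 1.0
	weightHops        = 0.7
	weightGeo         = 0.2
	weightReliability = 1.2
)

// peerMetrics holds the fields the registry feeds into the KD-tree.
type peerMetrics struct {
	PingMilliseconds     float64
	Hops                 int
	GeographicKilometres float64
	Score                float64 // 0-100, higher is better
}

// kdPoint maps a peer onto the weighted 4-D space. The ideal peer sits at
// the origin [0, 0, 0, 0]: zero latency, zero hops, zero distance, score 100.
func kdPoint(p peerMetrics) [4]float64 {
	return [4]float64{
		p.PingMilliseconds * weightLatency,
		float64(p.Hops) * weightHops,
		p.GeographicKilometres * weightGeo,
		(100 - p.Score) * weightReliability, // invert so lower = better
	}
}
```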
@@ -236,7 +236,7 @@ A global logger instance is available via `logging.Debug(...)`, `logging.Info(..
|----------|------------|
| `Transport.conns` | `sync.RWMutex` |
| `Transport.handler` | `sync.RWMutex` |
| `PeerConnection` writes | `sync.Mutex` (`writeMu`) |
| `PeerConnection` writes | `sync.Mutex` (`writeMutex`) |
| `PeerConnection` close | `sync.Once` (`closeOnce`) |
| `PeerRegistry.peers` + KD-tree | `sync.RWMutex` |
| `PeerRegistry.allowedPublicKeys` | separate `sync.RWMutex` |


@@ -20,9 +20,9 @@ type Peer struct {
LastSeen time.Time `json:"lastSeen"`
// Poindexter metrics (updated dynamically)
PingMS float64 `json:"pingMs"` // Latency in milliseconds
PingMilliseconds float64 `json:"pingMs"` // Latency in milliseconds
Hops int `json:"hops"` // Network hop count
GeoKM float64 `json:"geoKm"` // Geographic distance in kilometres
GeographicKilometres float64 `json:"geoKm"` // Geographic distance in kilometres
Score float64 `json:"score"` // Reliability score 0--100
Connected bool `json:"-"` // Not persisted
@@ -83,9 +83,9 @@ The registry maintains a 4-dimensional KD-tree for optimal peer selection. Each
| Dimension | Source | Weight | Direction |
|-----------|--------|--------|-----------|
| Latency | `PingMS` | 1.0 | Lower is better |
| Latency | `PingMilliseconds` | 1.0 | Lower is better |
| Hops | `Hops` | 0.7 | Lower is better |
| Geographic distance | `GeoKM` | 0.2 | Lower is better |
| Geographic distance | `GeographicKilometres` | 0.2 | Lower is better |
| Reliability | `100 - Score` | 1.2 | Inverted so lower is better |
The score dimension is inverted so that the "ideal peer" target point `[0, 0, 0, 0]` represents zero latency, zero hops, zero distance, and maximum reliability (score 100).


@@ -31,7 +31,7 @@ Tests covered:
- MaxConnections enforcement: 503 HTTP rejection when limit is reached
- Keepalive timeout: connection cleaned up after `PingInterval + PongTimeout` elapses
- Graceful close: `MsgDisconnect` sent before underlying WebSocket close
- Concurrent sends: no data races under `go test -race` (`writeMu` protects all writes)
- Concurrent sends: no data races under `go test -race` (`writeMutex` protects all writes)
### Phase 3 — Controller Tests
@@ -104,9 +104,9 @@ The originally identified risk — that `transport.OnMessage(c.handleResponse)`
### P2P-RACE-1 — GracefulClose Data Race (Phase 3)
`GracefulClose` previously called `pc.Conn.SetWriteDeadline()` outside of `writeMu`, racing with concurrent `Send()` calls that also set the write deadline. Detected by `go test -race`.
`GracefulClose` previously called `pc.Conn.SetWriteDeadline()` outside of `writeMutex`, racing with concurrent `Send()` calls that also set the write deadline. Detected by `go test -race`.
Fix: removed the bare `SetWriteDeadline` call from `GracefulClose`. The method now relies entirely on `Send()`, which manages write deadlines under `writeMu`. This is documented in a comment in `transport.go` to prevent the pattern from being reintroduced.
Fix: removed the bare `SetWriteDeadline` call from `GracefulClose`. The method now relies entirely on `Send()`, which manages write deadlines under `writeMutex`. This is documented in a comment in `transport.go` to prevent the pattern from being reintroduced.
## Wiki Corrections (19 February 2026)


@@ -39,13 +39,13 @@ Paths follow XDG base directories via `github.com/adrg/xdg`. The private key is
### Creating an Identity
```go
nm, err := node.NewNodeManager()
nodeManager, err := node.NewNodeManager()
if err != nil {
log.Fatal(err)
}
// Generate a new identity (persists key and config to disk)
err = nm.GenerateIdentity("eu-controller-01", node.RoleController)
err = nodeManager.GenerateIdentity("eu-controller-01", node.RoleController)
```
Internally this calls `stmf.GenerateKeyPair()` from the Borg library to produce the X25519 keypair.
@@ -53,7 +53,7 @@ Internally this calls `stmf.GenerateKeyPair()` from the Borg library to produce
### Custom Paths (Testing)
```go
nm, err := node.NewNodeManagerFromPaths(
nodeManager, err := node.NewNodeManagerFromPaths(
"/tmp/test/private.key",
"/tmp/test/node.json",
)
@@ -62,8 +62,8 @@ nm, err := node.NewNodeManagerFromPaths(
### Checking and Retrieving Identity
```go
if nm.HasIdentity() {
identity := nm.GetIdentity() // Returns a copy
if nodeManager.HasIdentity() {
identity := nodeManager.GetIdentity() // Returns a copy
fmt.Println(identity.ID, identity.Name)
}
```
@@ -73,7 +73,7 @@ if nm.HasIdentity() {
### Deriving Shared Secrets
```go
sharedSecret, err := nm.DeriveSharedSecret(peerPublicKeyBase64)
sharedSecret, err := nodeManager.DeriveSharedSecret(peerPublicKeyBase64)
```
This performs X25519 ECDH with the peer's public key and hashes the result with SHA-256, producing a 32-byte symmetric key. The same shared secret is derived independently by both sides (no secret is transmitted).
@@ -81,7 +81,7 @@ This performs X25519 ECDH with the peer's public key and hashes the result with
### Deleting an Identity
```go
err := nm.Delete() // Removes key and config from disk, clears in-memory state
err := nodeManager.Delete() // Removes key and config from disk, clears in-memory state
```
## Challenge-Response Authentication


@@ -25,7 +25,7 @@ type TransportConfig struct {
Sensible defaults via `DefaultTransportConfig()`:
```go
cfg := node.DefaultTransportConfig()
transportConfig := node.DefaultTransportConfig()
// ListenAddr: ":9091", WebSocketPath: "/ws", MaxConnections: 100
// MaxMessageSize: 1MB, PingInterval: 30s, PongTimeout: 10s
```
@@ -33,10 +33,10 @@ cfg := node.DefaultTransportConfig()
## Creating and Starting
```go
transport := node.NewTransport(nodeManager, peerRegistry, cfg)
transport := node.NewTransport(nodeManager, peerRegistry, transportConfig)
// Set message handler before Start() to avoid races
transport.OnMessage(func(conn *node.PeerConnection, msg *node.Message) {
transport.OnMessage(func(peerConnection *node.PeerConnection, msg *node.Message) {
// Handle incoming messages
})
@@ -96,15 +96,15 @@ type PeerConnection struct {
### Sending Messages
```go
err := peerConn.Send(msg)
err := peerConnection.Send(msg)
```
`Send()` serialises the message to JSON, encrypts it with SMSG, sets a 10-second write deadline, and writes as a binary WebSocket frame. A `writeMu` mutex serialises concurrent writes.
`Send()` serialises the message to JSON, encrypts it with SMSG, sets a 10-second write deadline, and writes as a binary WebSocket frame. A `writeMutex` serialises concurrent writes.
### Graceful Close
```go
err := peerConn.GracefulClose("shutting down", node.DisconnectShutdown)
err := peerConnection.GracefulClose("shutting down", node.DisconnectShutdown)
```
Sends a `disconnect` message (best-effort) before closing the connection. Uses `sync.Once` to ensure the connection is only closed once.
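The close-once guarantee can be sketched as follows (an illustrative shape, not the real `PeerConnection`):

```go
package main

import "sync"

// closer sketches the GracefulClose guarantee: the disconnect message is
// best-effort, and sync.Once ensures the underlying close runs exactly
// once even if GracefulClose and the keepalive reaper race.
type closer struct {
	closeOnce  sync.Once
	closeCount int
}

func (c *closer) gracefulClose(sendDisconnect func() error) {
	_ = sendDisconnect() // best-effort; errors are ignored on shutdown
	c.closeOnce.Do(func() {
		c.closeCount++ // the real code closes the WebSocket here
	})
}
```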


@@ -347,9 +347,7 @@ func extractTarball(tarData []byte, destDir string) (string, error) {
return firstExecutable, nil
}
// StreamBundle writes a bundle to a writer (for large transfers).
//
// err := StreamBundle(bundle, writer)
// err := StreamBundle(bundle, writer)
func StreamBundle(bundle *Bundle, w io.Writer) error {
result := core.JSONMarshal(bundle)
if !result.OK {
@@ -359,9 +359,7 @@ func StreamBundle(bundle *Bundle, w io.Writer) error {
return err
}
// ReadBundle reads a bundle from a reader.
//
// bundle, err := ReadBundle(reader)
// bundle, err := ReadBundle(reader)
func ReadBundle(r io.Reader) (*Bundle, error) {
var buf bytes.Buffer
if _, err := io.Copy(&buf, r); err != nil {


@@ -87,10 +87,9 @@ func NewNodeManager() (*NodeManager, error) {
return NewNodeManagerFromPaths(keyPath, configPath)
}
// Missing files are treated as a fresh install; malformed or partial identity
// state is returned as an error so callers can handle it explicitly.
//
// nodeManager, err := NewNodeManagerFromPaths("/srv/p2p/private.key", "/srv/p2p/node.json")
// Missing files are treated as a fresh install; malformed or partial identity
// state returns an error so callers can handle it explicitly.
func NewNodeManagerFromPaths(keyPath, configPath string) (*NodeManager, error) {
nm := &NodeManager{
keyPath: keyPath,


@@ -131,10 +131,9 @@ func NewPeerRegistry() (*PeerRegistry, error) {
return NewPeerRegistryFromPath(peersPath)
}
// Missing files are treated as an empty registry; malformed registry files are
// returned as errors so callers can repair the persisted state.
//
// peerRegistry, err := NewPeerRegistryFromPath("/srv/p2p/peers.json")
// Missing files are treated as an empty registry; malformed registry files
// return an error so callers can repair the persisted state.
func NewPeerRegistryFromPath(peersPath string) (*PeerRegistry, error) {
pr := &PeerRegistry{
peers: make(map[string]*Peer),
@@ -248,6 +247,13 @@ func (r *PeerRegistry) AllowedPublicKeys() iter.Seq[string] {
//
// err := registry.AddPeer(&Peer{ID: "worker-1", Address: "10.0.0.1:9091", Role: RoleWorker})
func (r *PeerRegistry) AddPeer(peer *Peer) error {
if peer == nil {
return core.E("PeerRegistry.AddPeer", "peer is nil", nil)
}
peerCopy := *peer
peer = &peerCopy
r.mu.Lock()
if peer.ID == "" {
@@ -271,7 +277,7 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error {
peer.AddedAt = time.Now()
}
if peer.Score == 0 {
peer.Score = 50 // Default neutral score
peer.Score = ScoreDefault
}
r.peers[peer.ID] = peer
@@ -286,6 +292,17 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error {
//
// err := registry.UpdatePeer(&Peer{ID: "worker-1", Score: 90})
func (r *PeerRegistry) UpdatePeer(peer *Peer) error {
if peer == nil {
return core.E("PeerRegistry.UpdatePeer", "peer is nil", nil)
}
if peer.ID == "" {
return core.E("PeerRegistry.UpdatePeer", "peer ID is required", nil)
}
peerCopy := *peer
peer = &peerCopy
r.mu.Lock()
if _, exists := r.peers[peer.ID]; !exists {
@@ -502,7 +519,13 @@ func (r *PeerRegistry) GetPeersByScore() []*Peer {
return 0
})
return peers
peerCopies := make([]*Peer, 0, len(peers))
for _, peer := range peers {
peerCopy := *peer
peerCopies = append(peerCopies, &peerCopy)
}
return peerCopies
}
// for peer := range registry.PeersByScore() {


@@ -65,6 +65,15 @@ func TestPeer_Registry_AddPeer_Good(t *testing.T) {
t.Errorf("expected 1 peer, got %d", pr.Count())
}
peer.Name = "Mutated after add"
stored := pr.GetPeer("test-peer-1")
if stored == nil {
t.Fatal("expected peer to exist after add")
}
if stored.Name != "Test Peer" {
t.Errorf("expected stored peer to remain unchanged, got %q", stored.Name)
}
// Try to add duplicate
err = pr.AddPeer(peer)
if err == nil {
@@ -634,6 +643,34 @@ func TestPeer_Registry_PeersSortedByScore_Good(t *testing.T) {
if sorted[2].ID != "low-score" {
t.Errorf("third peer should be low-score, got %s", sorted[2].ID)
}
sorted[0].Name = "Mutated"
restored := pr.GetPeer("high-score")
if restored == nil {
t.Fatal("expected high-score peer to still exist")
}
if restored.Name != "High" {
t.Errorf("expected registry peer to remain unchanged, got %q", restored.Name)
}
}
func TestPeer_Registry_NilPeerInputs_Bad(t *testing.T) {
pr, cleanup := setupTestPeerRegistry(t)
defer cleanup()
t.Run("AddPeer", func(t *testing.T) {
err := pr.AddPeer(nil)
if err == nil {
t.Fatal("expected error when adding nil peer")
}
})
t.Run("UpdatePeer", func(t *testing.T) {
err := pr.UpdatePeer(nil)
if err == nil {
t.Fatal("expected error when updating nil peer")
}
})
}
// --- Additional coverage tests for peer.go ---
@@ -736,6 +773,19 @@ func TestPeer_Registry_UpdatePeer_Good(t *testing.T) {
if updated.Score != 80 {
t.Errorf("expected score 80, got %f", updated.Score)
}
peer.Name = "Mutated after update"
peer.Score = 12
stored := pr.GetPeer("update-test")
if stored == nil {
t.Fatal("expected peer to exist after update mutation")
}
if stored.Name != "Updated" {
t.Errorf("expected stored peer name to remain Updated, got %q", stored.Name)
}
if stored.Score != 80 {
t.Errorf("expected stored peer score to remain 80, got %f", stored.Score)
}
}
func TestPeer_Registry_UpdateMetrics_NotFound_Bad(t *testing.T) {


@@ -107,22 +107,45 @@ func NewMessageDeduplicator(retentionWindow time.Duration) *MessageDeduplicator
return d
}
// IsDuplicate checks whether a message ID is still within the retention window.
// duplicate := deduplicator.IsDuplicate(message.ID)
func (d *MessageDeduplicator) IsDuplicate(msgID string) bool {
d.mutex.RLock()
_, exists := d.recentMessageTimes[msgID]
seenAt, exists := d.recentMessageTimes[msgID]
retentionWindow := d.timeToLive
d.mutex.RUnlock()
return exists
if !exists {
return false
}
if retentionWindow > 0 && time.Since(seenAt) <= retentionWindow {
return true
}
d.mutex.Lock()
defer d.mutex.Unlock()
seenAt, exists = d.recentMessageTimes[msgID]
if !exists {
return false
}
if retentionWindow <= 0 || time.Since(seenAt) > retentionWindow {
delete(d.recentMessageTimes, msgID)
return false
}
return true
}
// Mark records a message ID as recently seen.
// deduplicator.Mark(message.ID)
func (d *MessageDeduplicator) Mark(msgID string) {
d.mutex.Lock()
d.recentMessageTimes[msgID] = time.Now()
d.mutex.Unlock()
}
// Cleanup removes expired entries from the deduplicator.
// deduplicator.Cleanup()
func (d *MessageDeduplicator) Cleanup() {
d.mutex.Lock()
defer d.mutex.Unlock()


@@ -153,6 +153,17 @@ func TestTransport_MessageDeduplicator_Good(t *testing.T) {
}
})
t.Run("ExpiredEntriesDoNotLinger", func(t *testing.T) {
d := NewMessageDeduplicator(50 * time.Millisecond)
d.Mark("msg-1")
time.Sleep(75 * time.Millisecond)
if d.IsDuplicate("msg-1") {
t.Error("should not be duplicate after TTL even before cleanup runs")
}
})
t.Run("ConcurrentAccess", func(t *testing.T) {
d := NewMessageDeduplicator(5 * time.Minute)
var wg sync.WaitGroup