diff --git a/docs/architecture.md b/docs/architecture.md
index bb4b90c..8771504 100644
--- a/docs/architecture.md
+++ b/docs/architecture.md
@@ -60,7 +60,7 @@ The `Transport` manages a WebSocket server (gorilla/websocket) and outbound conn
 
 **Keepalive**: A goroutine per connection ticks at `PingInterval`. If `LastActivity` has not been updated within `PingInterval + PongTimeout`, the connection is removed.
 
-**Graceful close**: `GracefulClose` sends `MsgDisconnect` before closing the underlying WebSocket. Write deadlines are managed exclusively inside `Send()` under `writeMu` to prevent the race (P2P-RACE-1) where a bare `SetWriteDeadline` call could race with concurrent sends.
+**Graceful close**: `GracefulClose` sends `MsgDisconnect` before closing the underlying WebSocket. Write deadlines are managed exclusively inside `Send()` under `writeMutex` to prevent the race (P2P-RACE-1) where a bare `SetWriteDeadline` call could race with concurrent sends.
 
 **Buffer pool**: `MarshalJSON` uses a `sync.Pool` of `bytes.Buffer` (initial capacity 1 KB, maximum pooled size 64 KB) to reduce allocation pressure in the message serialisation hot path. HTML escaping is disabled to match `json.Marshal` semantics.
 
@@ -70,15 +70,15 @@ The `Transport` manages a WebSocket server (gorilla/websocket) and outbound conn
 **Peer fields persisted**:
 
 - `ID`, `Name`, `PublicKey`, `Address`, `Role`, `AddedAt`, `LastSeen`
-- `PingMS`, `Hops`, `GeoKM`, `Score` (float64, 0–100)
+- `PingMilliseconds`, `Hops`, `GeographicKilometres`, `Score` (float64, 0–100)
 
 **KD-tree dimensions** (lower is better in all axes):
 
 | Dimension | Weight | Rationale |
 |-----------|--------|-----------|
-| `PingMS` | 1.0 | Latency dominates interactive performance |
+| `PingMilliseconds` | 1.0 | Latency dominates interactive performance |
 | `Hops` | 0.7 | Network hop count (routing cost) |
-| `GeoKM` | 0.2 | Geographic distance (minor factor) |
+| `GeographicKilometres` | 0.2 | Geographic distance (minor factor) |
 | `100 - Score` | 1.2 | Reliability (inverted so lower = better peer) |
 
 `SelectOptimalPeer()` queries the tree for the point nearest to the origin (ideal: zero latency, zero hops, zero distance, maximum score). `SelectNearestPeers(n)` returns the n best.
@@ -236,7 +236,7 @@ A global logger instance is available via `logging.Debug(...)`, `logging.Info(..
 |----------|------------|
 | `Transport.conns` | `sync.RWMutex` |
 | `Transport.handler` | `sync.RWMutex` |
-| `PeerConnection` writes | `sync.Mutex` (`writeMu`) |
+| `PeerConnection` writes | `sync.Mutex` (`writeMutex`) |
 | `PeerConnection` close | `sync.Once` (`closeOnce`) |
 | `PeerRegistry.peers` + KD-tree | `sync.RWMutex` |
 | `PeerRegistry.allowedPublicKeys` | separate `sync.RWMutex` |
diff --git a/docs/discovery.md b/docs/discovery.md
index 3423165..452bb94 100644
--- a/docs/discovery.md
+++ b/docs/discovery.md
@@ -20,9 +20,9 @@ type Peer struct {
 	LastSeen time.Time `json:"lastSeen"`
 
 	// Poindexter metrics (updated dynamically)
-	PingMS float64 `json:"pingMs"` // Latency in milliseconds
+	PingMilliseconds float64 `json:"pingMs"` // Latency in milliseconds
 	Hops int `json:"hops"` // Network hop count
-	GeoKM float64 `json:"geoKm"` // Geographic distance in kilometres
+	GeographicKilometres float64 `json:"geoKm"` // Geographic distance in kilometres
 	Score float64 `json:"score"` // Reliability score 0--100
 	Connected bool `json:"-"` // Not persisted
 
@@ -83,9 +83,9 @@ The registry maintains a 4-dimensional KD-tree for optimal peer selection. Each
 
 | Dimension | Source | Weight | Direction |
 |-----------|--------|--------|-----------|
-| Latency | `PingMS` | 1.0 | Lower is better |
+| Latency | `PingMilliseconds` | 1.0 | Lower is better |
 | Hops | `Hops` | 0.7 | Lower is better |
-| Geographic distance | `GeoKM` | 0.2 | Lower is better |
+| Geographic distance | `GeographicKilometres` | 0.2 | Lower is better |
 | Reliability | `100 - Score` | 1.2 | Inverted so lower is better |
 
 The score dimension is inverted so that the "ideal peer" target point `[0, 0, 0, 0]` represents zero latency, zero hops, zero distance, and maximum reliability (score 100).
diff --git a/docs/history.md b/docs/history.md
index 5c42f56..5c42f07 100644
--- a/docs/history.md
+++ b/docs/history.md
@@ -31,7 +31,7 @@ Tests covered:
 - MaxConnections enforcement: 503 HTTP rejection when limit is reached
 - Keepalive timeout: connection cleaned up after `PingInterval + PongTimeout` elapses
 - Graceful close: `MsgDisconnect` sent before underlying WebSocket close
-- Concurrent sends: no data races under `go test -race` (`writeMu` protects all writes)
+- Concurrent sends: no data races under `go test -race` (`writeMutex` protects all writes)
 
 ### Phase 3 — Controller Tests
 
@@ -104,9 +104,9 @@ The originally identified risk — that `transport.OnMessage(c.handleResponse)`
 
 ### P2P-RACE-1 — GracefulClose Data Race (Phase 3)
 
-`GracefulClose` previously called `pc.Conn.SetWriteDeadline()` outside of `writeMu`, racing with concurrent `Send()` calls that also set the write deadline. Detected by `go test -race`.
+`GracefulClose` previously called `pc.Conn.SetWriteDeadline()` outside of `writeMutex`, racing with concurrent `Send()` calls that also set the write deadline. Detected by `go test -race`.
 
-Fix: removed the bare `SetWriteDeadline` call from `GracefulClose`. The method now relies entirely on `Send()`, which manages write deadlines under `writeMu`. This is documented in a comment in `transport.go` to prevent the pattern from being reintroduced.
+Fix: removed the bare `SetWriteDeadline` call from `GracefulClose`. The method now relies entirely on `Send()`, which manages write deadlines under `writeMutex`. This is documented in a comment in `transport.go` to prevent the pattern from being reintroduced.
 
 ## Wiki Corrections (19 February 2026)
 
diff --git a/docs/identity.md b/docs/identity.md
index b102683..05c1a58 100644
--- a/docs/identity.md
+++ b/docs/identity.md
@@ -39,13 +39,13 @@ Paths follow XDG base directories via `github.com/adrg/xdg`. The private key is
 ### Creating an Identity
 
 ```go
-nm, err := node.NewNodeManager()
+nodeManager, err := node.NewNodeManager()
 if err != nil {
 	log.Fatal(err)
 }
 
 // Generate a new identity (persists key and config to disk)
-err = nm.GenerateIdentity("eu-controller-01", node.RoleController)
+err = nodeManager.GenerateIdentity("eu-controller-01", node.RoleController)
 ```
 
 Internally this calls `stmf.GenerateKeyPair()` from the Borg library to produce the X25519 keypair.
@@ -53,7 +53,7 @@ Internally this calls `stmf.GenerateKeyPair()` from the Borg library to produce
 ### Custom Paths (Testing)
 
 ```go
-nm, err := node.NewNodeManagerFromPaths(
+nodeManager, err := node.NewNodeManagerFromPaths(
 	"/tmp/test/private.key",
 	"/tmp/test/node.json",
 )
@@ -62,8 +62,8 @@ nm, err := node.NewNodeManagerFromPaths(
 ### Checking and Retrieving Identity
 
 ```go
-if nm.HasIdentity() {
-	identity := nm.GetIdentity() // Returns a copy
+if nodeManager.HasIdentity() {
+	identity := nodeManager.GetIdentity() // Returns a copy
 	fmt.Println(identity.ID, identity.Name)
 }
 ```
@@ -73,7 +73,7 @@ if nm.HasIdentity() {
 ### Deriving Shared Secrets
 
 ```go
-sharedSecret, err := nm.DeriveSharedSecret(peerPublicKeyBase64)
+sharedSecret, err := nodeManager.DeriveSharedSecret(peerPublicKeyBase64)
 ```
 
 This performs X25519 ECDH with the peer's public key and hashes the result with SHA-256, producing a 32-byte symmetric key. The same shared secret is derived independently by both sides (no secret is transmitted).
@@ -81,7 +81,7 @@ This performs X25519 ECDH with the peer's public key and hashes the result with
 ### Deleting an Identity
 
 ```go
-err := nm.Delete() // Removes key and config from disk, clears in-memory state
+err := nodeManager.Delete() // Removes key and config from disk, clears in-memory state
 ```
 
 ## Challenge-Response Authentication
diff --git a/docs/transport.md b/docs/transport.md
index 5aad542..07a25e6 100644
--- a/docs/transport.md
+++ b/docs/transport.md
@@ -25,7 +25,7 @@ type TransportConfig struct {
 Sensible defaults via `DefaultTransportConfig()`:
 
 ```go
-cfg := node.DefaultTransportConfig()
+transportConfig := node.DefaultTransportConfig()
 // ListenAddr: ":9091", WebSocketPath: "/ws", MaxConnections: 100
 // MaxMessageSize: 1MB, PingInterval: 30s, PongTimeout: 10s
 ```
@@ -33,10 +33,10 @@ cfg := node.DefaultTransportConfig()
 ## Creating and Starting
 
 ```go
-transport := node.NewTransport(nodeManager, peerRegistry, cfg)
+transport := node.NewTransport(nodeManager, peerRegistry, transportConfig)
 
 // Set message handler before Start() to avoid races
-transport.OnMessage(func(conn *node.PeerConnection, msg *node.Message) {
+transport.OnMessage(func(peerConnection *node.PeerConnection, msg *node.Message) {
 	// Handle incoming messages
 })
 
@@ -96,15 +96,15 @@ type PeerConnection struct {
 ### Sending Messages
 
 ```go
-err := peerConn.Send(msg)
+err := peerConnection.Send(msg)
 ```
 
-`Send()` serialises the message to JSON, encrypts it with SMSG, sets a 10-second write deadline, and writes as a binary WebSocket frame. A `writeMu` mutex serialises concurrent writes.
+`Send()` serialises the message to JSON, encrypts it with SMSG, sets a 10-second write deadline, and writes as a binary WebSocket frame. A `writeMutex` serialises concurrent writes.
 
 ### Graceful Close
 
 ```go
-err := peerConn.GracefulClose("shutting down", node.DisconnectShutdown)
+err := peerConnection.GracefulClose("shutting down", node.DisconnectShutdown)
 ```
 
 Sends a `disconnect` message (best-effort) before closing the connection. Uses `sync.Once` to ensure the connection is only closed once.
diff --git a/node/bundle.go b/node/bundle.go
index f1c7342..30e409b 100644
--- a/node/bundle.go
+++ b/node/bundle.go
@@ -347,9 +347,7 @@ func extractTarball(tarData []byte, destDir string) (string, error) {
 	return firstExecutable, nil
 }
 
-// StreamBundle writes a bundle to a writer (for large transfers).
-//
-// err := StreamBundle(bundle, writer)
+// err := StreamBundle(bundle, writer)
 func StreamBundle(bundle *Bundle, w io.Writer) error {
 	result := core.JSONMarshal(bundle)
 	if !result.OK {
@@ -359,9 +357,7 @@ func StreamBundle(bundle *Bundle, w io.Writer) error {
 	return err
 }
 
-// ReadBundle reads a bundle from a reader.
-//
-// bundle, err := ReadBundle(reader)
+// bundle, err := ReadBundle(reader)
 func ReadBundle(r io.Reader) (*Bundle, error) {
 	var buf bytes.Buffer
 	if _, err := io.Copy(&buf, r); err != nil {
diff --git a/node/identity.go b/node/identity.go
index ae5d1f1..7bb3a1d 100644
--- a/node/identity.go
+++ b/node/identity.go
@@ -87,10 +87,9 @@ func NewNodeManager() (*NodeManager, error) {
 	return NewNodeManagerFromPaths(keyPath, configPath)
 }
 
-// Missing files are treated as a fresh install; malformed or partial identity
-// state is returned as an error so callers can handle it explicitly.
-//
 // nodeManager, err := NewNodeManagerFromPaths("/srv/p2p/private.key", "/srv/p2p/node.json")
+// Missing files are treated as a fresh install; malformed or partial identity
+// state returns an error so callers can handle it explicitly.
 func NewNodeManagerFromPaths(keyPath, configPath string) (*NodeManager, error) {
 	nm := &NodeManager{
 		keyPath: keyPath,
diff --git a/node/peer.go b/node/peer.go
index 42f8801..27a80d7 100644
--- a/node/peer.go
+++ b/node/peer.go
@@ -131,10 +131,9 @@ func NewPeerRegistry() (*PeerRegistry, error) {
 	return NewPeerRegistryFromPath(peersPath)
 }
 
-// Missing files are treated as an empty registry; malformed registry files are
-// returned as errors so callers can repair the persisted state.
-//
 // peerRegistry, err := NewPeerRegistryFromPath("/srv/p2p/peers.json")
+// Missing files are treated as an empty registry; malformed registry files
+// return an error so callers can repair the persisted state.
 func NewPeerRegistryFromPath(peersPath string) (*PeerRegistry, error) {
 	pr := &PeerRegistry{
 		peers: make(map[string]*Peer),
@@ -248,6 +247,13 @@ func (r *PeerRegistry) AllowedPublicKeys() iter.Seq[string] {
 //
 // err := registry.AddPeer(&Peer{ID: "worker-1", Address: "10.0.0.1:9091", Role: RoleWorker})
 func (r *PeerRegistry) AddPeer(peer *Peer) error {
+	if peer == nil {
+		return core.E("PeerRegistry.AddPeer", "peer is nil", nil)
+	}
+
+	peerCopy := *peer
+	peer = &peerCopy
+
 	r.mu.Lock()
 
 	if peer.ID == "" {
@@ -271,7 +277,7 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error {
 		peer.AddedAt = time.Now()
 	}
 	if peer.Score == 0 {
-		peer.Score = 50 // Default neutral score
+		peer.Score = ScoreDefault
 	}
 
 	r.peers[peer.ID] = peer
@@ -286,6 +292,17 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error {
 //
 // err := registry.UpdatePeer(&Peer{ID: "worker-1", Score: 90})
 func (r *PeerRegistry) UpdatePeer(peer *Peer) error {
+	if peer == nil {
+		return core.E("PeerRegistry.UpdatePeer", "peer is nil", nil)
+	}
+
+	if peer.ID == "" {
+		return core.E("PeerRegistry.UpdatePeer", "peer ID is required", nil)
+	}
+
+	peerCopy := *peer
+	peer = &peerCopy
+
 	r.mu.Lock()
 
 	if _, exists := r.peers[peer.ID]; !exists {
@@ -502,7 +519,13 @@ func (r *PeerRegistry) GetPeersByScore() []*Peer {
 		return 0
 	})
 
-	return peers
+	peerCopies := make([]*Peer, 0, len(peers))
+	for _, peer := range peers {
+		peerCopy := *peer
+		peerCopies = append(peerCopies, &peerCopy)
+	}
+
+	return peerCopies
 }
 
 // for peer := range registry.PeersByScore() {
diff --git a/node/peer_test.go b/node/peer_test.go
index b10979c..45d6f0f 100644
--- a/node/peer_test.go
+++ b/node/peer_test.go
@@ -65,6 +65,15 @@ func TestPeer_Registry_AddPeer_Good(t *testing.T) {
 		t.Errorf("expected 1 peer, got %d", pr.Count())
 	}
 
+	peer.Name = "Mutated after add"
+	stored := pr.GetPeer("test-peer-1")
+	if stored == nil {
+		t.Fatal("expected peer to exist after add")
+	}
+	if stored.Name != "Test Peer" {
+		t.Errorf("expected stored peer to remain unchanged, got %q", stored.Name)
+	}
+
 	// Try to add duplicate
 	err = pr.AddPeer(peer)
 	if err == nil {
@@ -634,6 +643,34 @@ func TestPeer_Registry_PeersSortedByScore_Good(t *testing.T) {
 	if sorted[2].ID != "low-score" {
 		t.Errorf("third peer should be low-score, got %s", sorted[2].ID)
 	}
+
+	sorted[0].Name = "Mutated"
+	restored := pr.GetPeer("high-score")
+	if restored == nil {
+		t.Fatal("expected high-score peer to still exist")
+	}
+	if restored.Name != "High" {
+		t.Errorf("expected registry peer to remain unchanged, got %q", restored.Name)
+	}
+}
+
+func TestPeer_Registry_NilPeerInputs_Bad(t *testing.T) {
+	pr, cleanup := setupTestPeerRegistry(t)
+	defer cleanup()
+
+	t.Run("AddPeer", func(t *testing.T) {
+		err := pr.AddPeer(nil)
+		if err == nil {
+			t.Fatal("expected error when adding nil peer")
+		}
+	})
+
+	t.Run("UpdatePeer", func(t *testing.T) {
+		err := pr.UpdatePeer(nil)
+		if err == nil {
+			t.Fatal("expected error when updating nil peer")
+		}
+	})
 }
 
 // --- Additional coverage tests for peer.go ---
@@ -736,6 +773,19 @@ func TestPeer_Registry_UpdatePeer_Good(t *testing.T) {
 	if updated.Score != 80 {
 		t.Errorf("expected score 80, got %f", updated.Score)
 	}
+
+	peer.Name = "Mutated after update"
+	peer.Score = 12
+	stored := pr.GetPeer("update-test")
+	if stored == nil {
+		t.Fatal("expected peer to exist after update mutation")
+	}
+	if stored.Name != "Updated" {
+		t.Errorf("expected stored peer name to remain Updated, got %q", stored.Name)
+	}
+	if stored.Score != 80 {
+		t.Errorf("expected stored peer score to remain 80, got %f", stored.Score)
+	}
 }
 
 func TestPeer_Registry_UpdateMetrics_NotFound_Bad(t *testing.T) {
diff --git a/node/transport.go b/node/transport.go
index f537787..9c93165 100644
--- a/node/transport.go
+++ b/node/transport.go
@@ -107,22 +107,45 @@ func NewMessageDeduplicator(retentionWindow time.Duration) *MessageDeduplicator
 	return d
 }
 
-// IsDuplicate checks whether a message ID is still within the retention window.
+// duplicate := deduplicator.IsDuplicate(message.ID)
 func (d *MessageDeduplicator) IsDuplicate(msgID string) bool {
 	d.mutex.RLock()
-	_, exists := d.recentMessageTimes[msgID]
+	seenAt, exists := d.recentMessageTimes[msgID]
+	retentionWindow := d.timeToLive
 	d.mutex.RUnlock()
-	return exists
+
+	if !exists {
+		return false
+	}
+
+	if retentionWindow > 0 && time.Since(seenAt) <= retentionWindow {
+		return true
+	}
+
+	d.mutex.Lock()
+	defer d.mutex.Unlock()
+
+	seenAt, exists = d.recentMessageTimes[msgID]
+	if !exists {
+		return false
+	}
+
+	if retentionWindow <= 0 || time.Since(seenAt) > retentionWindow {
+		delete(d.recentMessageTimes, msgID)
+		return false
+	}
+
+	return true
 }
 
-// Mark records a message ID as recently seen.
+// deduplicator.Mark(message.ID)
 func (d *MessageDeduplicator) Mark(msgID string) {
 	d.mutex.Lock()
 	d.recentMessageTimes[msgID] = time.Now()
 	d.mutex.Unlock()
 }
 
-// Cleanup removes expired entries from the deduplicator.
+// deduplicator.Cleanup()
 func (d *MessageDeduplicator) Cleanup() {
 	d.mutex.Lock()
 	defer d.mutex.Unlock()
diff --git a/node/transport_test.go b/node/transport_test.go
index 24969b1..16acd46 100644
--- a/node/transport_test.go
+++ b/node/transport_test.go
@@ -153,6 +153,17 @@ func TestTransport_MessageDeduplicator_Good(t *testing.T) {
 		}
 	})
 
+	t.Run("ExpiredEntriesDoNotLinger", func(t *testing.T) {
+		d := NewMessageDeduplicator(50 * time.Millisecond)
+		d.Mark("msg-1")
+
+		time.Sleep(75 * time.Millisecond)
+
+		if d.IsDuplicate("msg-1") {
+			t.Error("should not be duplicate after TTL even before cleanup runs")
+		}
+	})
+
 	t.Run("ConcurrentAccess", func(t *testing.T) {
 		d := NewMessageDeduplicator(5 * time.Minute)
 		var wg sync.WaitGroup