From 1badcf1877ce40d8b3bfedc0d83d3e636cf2a638 Mon Sep 17 00:00:00 2001 From: Virgil Date: Tue, 31 Mar 2026 05:44:02 +0000 Subject: [PATCH] fix(transport): expose accepted peers before handshake ack Co-Authored-By: Virgil --- node/transport.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/node/transport.go b/node/transport.go index 55871f5..595f0c4 100644 --- a/node/transport.go +++ b/node/transport.go @@ -416,6 +416,8 @@ func (t *Transport) OnMessage(handler MessageHandler) { } // Connect dials a peer, completes the handshake, and starts the session loops. +// +// pc, err := transport.Connect(&Peer{ID: "worker-1", Address: "127.0.0.1:9091"}) func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) { // Build WebSocket URL scheme := "ws" @@ -679,7 +681,6 @@ func (t *Transport) handleWebSocketUpgrade(w http.ResponseWriter, r *http.Reques rateLimiter: NewPeerRateLimiter(100, 50), // 100 burst, 50/sec refill } - // Send handshake acknowledgment identity := t.nodeManager.GetIdentity() if identity == nil { conn.Close() @@ -711,16 +712,18 @@ func (t *Transport) handleWebSocketUpgrade(w http.ResponseWriter, r *http.Reques return } - if err := conn.WriteMessage(websocket.TextMessage, ackData); err != nil { - conn.Close() - return - } - - // Store connection + // Make the accepted connection visible before the client reads the ack. + // Connect() returns only after that read completes, so this keeps the + // server registry aligned with the caller's view of the handshake. t.mutex.Lock() t.connections[peer.ID] = pc t.mutex.Unlock() + if err := conn.WriteMessage(websocket.TextMessage, ackData); err != nil { + t.removeConnection(pc) + return + } + // Update registry t.peerRegistry.SetConnected(peer.ID, true)