fix(transport): expose accepted peers before handshake ack

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-31 05:44:02 +00:00
parent 1c8592a5a0
commit 1badcf1877

View file

@ -416,6 +416,8 @@ func (t *Transport) OnMessage(handler MessageHandler) {
}
// Connect dials a peer, completes the handshake, and starts the session loops.
//
// pc, err := transport.Connect(&Peer{ID: "worker-1", Address: "127.0.0.1:9091"})
func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) {
// Build WebSocket URL
scheme := "ws"
@ -679,7 +681,6 @@ func (t *Transport) handleWebSocketUpgrade(w http.ResponseWriter, r *http.Reques
rateLimiter: NewPeerRateLimiter(100, 50), // 100 burst, 50/sec refill
}
// Send handshake acknowledgment
identity := t.nodeManager.GetIdentity()
if identity == nil {
conn.Close()
@ -711,16 +712,18 @@ func (t *Transport) handleWebSocketUpgrade(w http.ResponseWriter, r *http.Reques
return
}
if err := conn.WriteMessage(websocket.TextMessage, ackData); err != nil {
conn.Close()
return
}
// Store connection
// Make the accepted connection visible before the client reads the ack.
// Connect() returns only after that read completes, so this keeps the
// server registry aligned with the caller's view of the handshake.
t.mutex.Lock()
t.connections[peer.ID] = pc
t.mutex.Unlock()
if err := conn.WriteMessage(websocket.TextMessage, ackData); err != nil {
t.removeConnection(pc)
return
}
// Update registry
t.peerRegistry.SetConnected(peer.ID, true)