ax(node): rename Transport.mu to mutex (AX Principle 1)
Abbreviated field name `mu` is ambiguous — rename to `mutex` to satisfy AX-025 §1 (predictable names over short names). Co-Authored-By: Charon <charon@lethean.io>
This commit is contained in:
parent
df80dbddb5
commit
2ea46d8ec0
1 changed files with 23 additions and 23 deletions
|
|
@ -109,7 +109,7 @@ type Transport struct {
|
|||
registry *PeerRegistry
|
||||
handler MessageHandler
|
||||
dedup *MessageDeduplicator // Message deduplication
|
||||
mu sync.RWMutex
|
||||
mutex sync.RWMutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
waitGroup sync.WaitGroup
|
||||
|
|
@ -277,11 +277,11 @@ func (t *Transport) Stop() error {
|
|||
t.cancel()
|
||||
|
||||
// Gracefully close all connections with shutdown message
|
||||
t.mu.Lock()
|
||||
t.mutex.Lock()
|
||||
for _, pc := range t.conns {
|
||||
pc.GracefulClose("server shutdown", DisconnectShutdown)
|
||||
}
|
||||
t.mu.Unlock()
|
||||
t.mutex.Unlock()
|
||||
|
||||
// Shutdown HTTP server if it was started
|
||||
if t.server != nil {
|
||||
|
|
@ -300,8 +300,8 @@ func (t *Transport) Stop() error {
|
|||
// OnMessage sets the handler for incoming messages.
|
||||
// Must be called before Start() to avoid races.
|
||||
func (t *Transport) OnMessage(handler MessageHandler) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
t.handler = handler
|
||||
}
|
||||
|
||||
|
|
@ -339,9 +339,9 @@ func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) {
|
|||
}
|
||||
|
||||
// Store connection using the real peer ID from handshake
|
||||
t.mu.Lock()
|
||||
t.mutex.Lock()
|
||||
t.conns[pc.Peer.ID] = pc
|
||||
t.mu.Unlock()
|
||||
t.mutex.Unlock()
|
||||
|
||||
logging.Debug("connected to peer", logging.Fields{"peer_id": pc.Peer.ID, "secret_len": len(pc.SharedSecret)})
|
||||
|
||||
|
|
@ -363,9 +363,9 @@ func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) {
|
|||
|
||||
// if err := transport.Send(peer.ID, msg); err != nil { return err }
|
||||
func (t *Transport) Send(peerID string, msg *Message) error {
|
||||
t.mu.RLock()
|
||||
t.mutex.RLock()
|
||||
pc, exists := t.conns[peerID]
|
||||
t.mu.RUnlock()
|
||||
t.mutex.RUnlock()
|
||||
|
||||
if !exists {
|
||||
return fmt.Errorf("peer %s not connected", peerID)
|
||||
|
|
@ -377,7 +377,7 @@ func (t *Transport) Send(peerID string, msg *Message) error {
|
|||
// Broadcast sends a message to all connected peers except the sender.
|
||||
// The sender is identified by msg.From and excluded to prevent echo.
|
||||
func (t *Transport) Broadcast(msg *Message) error {
|
||||
t.mu.RLock()
|
||||
t.mutex.RLock()
|
||||
conns := make([]*PeerConnection, 0, len(t.conns))
|
||||
for _, pc := range t.conns {
|
||||
// Exclude sender from broadcast to prevent echo (P2P-MED-6)
|
||||
|
|
@ -386,7 +386,7 @@ func (t *Transport) Broadcast(msg *Message) error {
|
|||
}
|
||||
conns = append(conns, pc)
|
||||
}
|
||||
t.mu.RUnlock()
|
||||
t.mutex.RUnlock()
|
||||
|
||||
var lastErr error
|
||||
for _, pc := range conns {
|
||||
|
|
@ -399,17 +399,17 @@ func (t *Transport) Broadcast(msg *Message) error {
|
|||
|
||||
// GetConnection returns an active connection to a peer.
|
||||
func (t *Transport) GetConnection(peerID string) *PeerConnection {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
t.mutex.RLock()
|
||||
defer t.mutex.RUnlock()
|
||||
return t.conns[peerID]
|
||||
}
|
||||
|
||||
// handleWSUpgrade handles incoming WebSocket connections.
|
||||
func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
|
||||
// Enforce MaxConns limit (including pending connections during handshake)
|
||||
t.mu.RLock()
|
||||
t.mutex.RLock()
|
||||
currentConns := len(t.conns)
|
||||
t.mu.RUnlock()
|
||||
t.mutex.RUnlock()
|
||||
pendingConns := int(t.pendingConns.Load())
|
||||
|
||||
totalConns := currentConns + pendingConns
|
||||
|
|
@ -583,9 +583,9 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// Store connection
|
||||
t.mu.Lock()
|
||||
t.mutex.Lock()
|
||||
t.conns[peer.ID] = pc
|
||||
t.mu.Unlock()
|
||||
t.mutex.Unlock()
|
||||
|
||||
// Update registry
|
||||
t.registry.SetConnected(peer.ID, true)
|
||||
|
|
@ -764,9 +764,9 @@ func (t *Transport) readLoop(pc *PeerConnection) {
|
|||
}
|
||||
|
||||
// Dispatch to handler (read handler under lock to avoid race)
|
||||
t.mu.RLock()
|
||||
t.mutex.RLock()
|
||||
handler := t.handler
|
||||
t.mu.RUnlock()
|
||||
t.mutex.RUnlock()
|
||||
if handler != nil {
|
||||
handler(pc, msg)
|
||||
}
|
||||
|
|
@ -810,9 +810,9 @@ func (t *Transport) keepalive(pc *PeerConnection) {
|
|||
|
||||
// removeConnection removes and cleans up a connection.
|
||||
func (t *Transport) removeConnection(pc *PeerConnection) {
|
||||
t.mu.Lock()
|
||||
t.mutex.Lock()
|
||||
delete(t.conns, pc.Peer.ID)
|
||||
t.mu.Unlock()
|
||||
t.mutex.Unlock()
|
||||
|
||||
t.registry.SetConnected(pc.Peer.ID, false)
|
||||
pc.Close()
|
||||
|
|
@ -931,7 +931,7 @@ func (t *Transport) decryptMessage(data []byte, sharedSecret []byte) (*Message,
|
|||
// n := transport.ConnectedPeers()
|
||||
// if n == 0 { return ErrNoPeers }
|
||||
func (t *Transport) ConnectedPeers() int {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
t.mutex.RLock()
|
||||
defer t.mutex.RUnlock()
|
||||
return len(t.conns)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue