diff --git a/pkg/node/controller.go b/pkg/node/controller.go index 917afa0..f4ad02f 100644 --- a/pkg/node/controller.go +++ b/pkg/node/controller.go @@ -102,13 +102,13 @@ func (controller *Controller) sendRequest(peerID string, msg *Message, timeout t } // Wait for response - ctx, cancel := context.WithTimeout(context.Background(), timeout) + requestContext, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() select { case response := <-responseChannel: return response, nil - case <-ctx.Done(): + case <-requestContext.Done(): return nil, fmt.Errorf("request timeout") } } diff --git a/pkg/node/transport.go b/pkg/node/transport.go index 8e8f85a..b2e30bc 100644 --- a/pkg/node/transport.go +++ b/pkg/node/transport.go @@ -111,16 +111,16 @@ type Transport struct { config TransportConfig server *http.Server upgrader websocket.Upgrader - connections map[string]*PeerConnection // peer ID -> connection - pendingConns atomic.Int32 // tracks connections during handshake - node *NodeManager - registry *PeerRegistry - handler MessageHandler - deduplicator *MessageDeduplicator // Message deduplication - mutex sync.RWMutex - ctx context.Context - cancel context.CancelFunc - waitGroup sync.WaitGroup + connections map[string]*PeerConnection // peer ID -> connection + pendingConns atomic.Int32 // tracks connections during handshake + node *NodeManager + registry *PeerRegistry + handler MessageHandler + deduplicator *MessageDeduplicator // Message deduplication + mutex sync.RWMutex + cancelContext context.Context + cancel context.CancelFunc + waitGroup sync.WaitGroup } // limiter := node.NewPeerRateLimiter(100, 50) // 100 burst capacity, 50 tokens/sec refill @@ -185,13 +185,13 @@ type PeerConnection struct { // t := node.NewTransport(nodeManager, peerRegistry, cfg) // if err := t.Start(); err != nil { return err } func NewTransport(node *NodeManager, registry *PeerRegistry, config TransportConfig) *Transport { - ctx, cancel := context.WithCancel(context.Background()) + cancelContext, cancel := context.WithCancel(context.Background()) return &Transport{ config: config, node: node, registry: registry, - connections: make(map[string]*PeerConnection), + connections: make(map[string]*PeerConnection), deduplicator: NewMessageDeduplicator(5 * time.Minute), // 5 minute TTL for dedup upgrader: websocket.Upgrader{ ReadBufferSize: 1024, @@ -211,8 +211,8 @@ func NewTransport(node *NodeManager, registry *PeerRegistry, config TransportCon return host == "localhost" || host == "127.0.0.1" || host == "::1" }, }, - ctx: ctx, - cancel: cancel, + cancelContext: cancelContext, + cancel: cancel, } } @@ -277,7 +277,7 @@ func (transport *Transport) Start() error { defer ticker.Stop() for { select { - case <-transport.ctx.Done(): + case <-transport.cancelContext.Done(): return case <-ticker.C: transport.deduplicator.Cleanup() @@ -301,10 +301,10 @@ func (transport *Transport) Stop() error { // Shutdown HTTP server if it was started if transport.server != nil { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + shutdownContext, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := transport.server.Shutdown(ctx); err != nil { + if err := transport.server.Shutdown(shutdownContext); err != nil { return fmt.Errorf("server shutdown error: %w", err) } } @@ -735,7 +735,7 @@ func (transport *Transport) readLoop(pc *PeerConnection) { for { select { - case <-transport.ctx.Done(): + case <-transport.cancelContext.Done(): return default: } @@ -799,7 +799,7 @@ func (transport *Transport) keepalive(pc *PeerConnection) { for { select { - case <-transport.ctx.Done(): + case <-transport.cancelContext.Done(): return case <-ticker.C: // Check if connection is still alive