diff --git a/pkg/mining/events.go b/pkg/mining/events.go index 8f06fbb..8e0af9c 100644 --- a/pkg/mining/events.go +++ b/pkg/mining/events.go @@ -114,29 +114,29 @@ func NewEventHubWithOptions(maxConnections int) *EventHub { // hub := NewEventHub() // go hub.Run() // blocks until hub.Stop() is called -func (h *EventHub) Run() { +func (hub *EventHub) Run() { for { select { - case <-h.stop: + case <-hub.stop: // Close all client connections - h.mutex.Lock() - for client := range h.clients { + hub.mutex.Lock() + for client := range hub.clients { client.safeClose() - delete(h.clients, client) + delete(hub.clients, client) } - h.mutex.Unlock() + hub.mutex.Unlock() return - case client := <-h.register: - h.mutex.Lock() - h.clients[client] = true - stateProvider := h.stateProvider - h.mutex.Unlock() - logging.Debug("client connected", logging.Fields{"total": len(h.clients)}) + case client := <-hub.register: + hub.mutex.Lock() + hub.clients[client] = true + stateProvider := hub.stateProvider + hub.mutex.Unlock() + logging.Debug("client connected", logging.Fields{"total": len(hub.clients)}) // Send initial state sync if provider is set if stateProvider != nil { - go func(c *wsClient) { + go func(wsconn *wsClient) { defer func() { if r := recover(); r != nil { logging.Error("panic in state sync goroutine", logging.Fields{"panic": r}) @@ -155,7 +155,7 @@ func (h *EventHub) Run() { return } select { - case c.send <- data: + case wsconn.send <- data: default: // Client buffer full } @@ -163,45 +163,45 @@ func (h *EventHub) Run() { }(client) } - case client := <-h.unregister: - h.mutex.Lock() - if _, ok := h.clients[client]; ok { - delete(h.clients, client) + case client := <-hub.unregister: + hub.mutex.Lock() + if _, ok := hub.clients[client]; ok { + delete(hub.clients, client) client.safeClose() // Decrement WebSocket connection metrics RecordWSConnection(false) } - h.mutex.Unlock() - logging.Debug("client disconnected", logging.Fields{"total": len(h.clients)}) + hub.mutex.Unlock() + logging.Debug("client disconnected", logging.Fields{"total": len(hub.clients)}) - case event := <-h.broadcast: + case event := <-hub.broadcast: data, err := MarshalJSON(event) if err != nil { logging.Error("failed to marshal event", logging.Fields{"error": err}) continue } - h.mutex.RLock() - for client := range h.clients { + hub.mutex.RLock() + for client := range hub.clients { // Check if client is subscribed to this miner - if h.shouldSendToClient(client, event) { + if hub.shouldSendToClient(client, event) { select { case client.send <- data: default: // Client buffer full, close connection - go func(c *wsClient) { - h.unregister <- c + go func(wsconn *wsClient) { + hub.unregister <- wsconn }(client) } } } - h.mutex.RUnlock() + hub.mutex.RUnlock() } } } -// if h.shouldSendToClient(client, event) { client.send <- data } -func (h *EventHub) shouldSendToClient(client *wsClient, event Event) bool { +// if hub.shouldSendToClient(client, event) { client.send <- data } +func (hub *EventHub) shouldSendToClient(client *wsClient, event Event) bool { // Always send pong and system events if event.Type == EventPong { return true @@ -243,36 +243,36 @@ func (h *EventHub) shouldSendToClient(client *wsClient, event Event) bool { } // hub.Stop() // safe to call multiple times; closes all client connections -func (h *EventHub) Stop() { - h.stopOnce.Do(func() { - close(h.stop) +func (hub *EventHub) Stop() { + hub.stopOnce.Do(func() { + close(hub.stop) }) } // hub.SetStateProvider(func() interface{} { return manager.GetState() }) -func (h *EventHub) SetStateProvider(provider StateProvider) { - h.mutex.Lock() - defer h.mutex.Unlock() - h.stateProvider = provider +func (hub *EventHub) SetStateProvider(provider StateProvider) { + hub.mutex.Lock() + defer hub.mutex.Unlock() + hub.stateProvider = provider } // hub.Broadcast(NewEvent(EventMinerStats, statsData)) -func (h *EventHub) Broadcast(event Event) { +func (hub *EventHub) Broadcast(event Event) { if event.Timestamp.IsZero() { event.Timestamp = time.Now() } select { - case h.broadcast <- event: + case hub.broadcast <- event: default: logging.Warn("broadcast channel full, dropping event", logging.Fields{"type": event.Type}) } } // if hub.ClientCount() == 0 { return } // skip broadcast when no listeners -func (h *EventHub) ClientCount() int { - h.mutex.RLock() - defer h.mutex.RUnlock() - return len(h.clients) +func (hub *EventHub) ClientCount() int { + hub.mutex.RLock() + defer hub.mutex.RUnlock() + return len(hub.clients) } // hub.Broadcast(NewEvent(EventMinerStarted, MinerEventData{Name: "xmrig"})) @@ -378,14 +378,14 @@ func (c *wsClient) readPump() { } // if !hub.ServeWs(conn) { c.JSON(http.StatusServiceUnavailable, gin.H{"error": "connection limit reached"}) } -func (h *EventHub) ServeWs(conn *websocket.Conn) bool { +func (hub *EventHub) ServeWs(conn *websocket.Conn) bool { // Check connection limit - h.mutex.RLock() - currentCount := len(h.clients) - h.mutex.RUnlock() + hub.mutex.RLock() + currentCount := len(hub.clients) + hub.mutex.RUnlock() - if currentCount >= h.maxConnections { - logging.Warn("connection rejected: limit reached", logging.Fields{"current": currentCount, "max": h.maxConnections}) + if currentCount >= hub.maxConnections { + logging.Warn("connection rejected: limit reached", logging.Fields{"current": currentCount, "max": hub.maxConnections}) conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseTryAgainLater, "connection limit reached")) conn.Close() @@ -395,11 +395,11 @@ func (h *EventHub) ServeWs(conn *websocket.Conn) bool { client := &wsClient{ conn: conn, send: make(chan []byte, 256), - hub: h, + hub: hub, miners: map[string]bool{"*": true}, // Subscribe to all by default } - h.register <- client + hub.register <- client // Start read/write pumps go client.writePump()