ax(mining): rename EventHub receiver from h to hub for AX Principle 1 compliance
Single-letter receiver `h` on all EventHub methods is a predictability violation — `hub` carries semantic meaning across the file. Co-Authored-By: Charon <charon@lethean.io>
This commit is contained in:
parent
0ef766ee36
commit
e560e93ef3
1 changed file with 50 additions and 50 deletions
|
|
@ -114,29 +114,29 @@ func NewEventHubWithOptions(maxConnections int) *EventHub {
|
|||
|
||||
// hub := NewEventHub()
|
||||
// go hub.Run() // blocks until hub.Stop() is called
|
||||
func (h *EventHub) Run() {
|
||||
func (hub *EventHub) Run() {
|
||||
for {
|
||||
select {
|
||||
case <-h.stop:
|
||||
case <-hub.stop:
|
||||
// Close all client connections
|
||||
h.mutex.Lock()
|
||||
for client := range h.clients {
|
||||
hub.mutex.Lock()
|
||||
for client := range hub.clients {
|
||||
client.safeClose()
|
||||
delete(h.clients, client)
|
||||
delete(hub.clients, client)
|
||||
}
|
||||
h.mutex.Unlock()
|
||||
hub.mutex.Unlock()
|
||||
return
|
||||
|
||||
case client := <-h.register:
|
||||
h.mutex.Lock()
|
||||
h.clients[client] = true
|
||||
stateProvider := h.stateProvider
|
||||
h.mutex.Unlock()
|
||||
logging.Debug("client connected", logging.Fields{"total": len(h.clients)})
|
||||
case client := <-hub.register:
|
||||
hub.mutex.Lock()
|
||||
hub.clients[client] = true
|
||||
stateProvider := hub.stateProvider
|
||||
hub.mutex.Unlock()
|
||||
logging.Debug("client connected", logging.Fields{"total": len(hub.clients)})
|
||||
|
||||
// Send initial state sync if provider is set
|
||||
if stateProvider != nil {
|
||||
go func(c *wsClient) {
|
||||
go func(wsconn *wsClient) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
logging.Error("panic in state sync goroutine", logging.Fields{"panic": r})
|
||||
|
|
@ -155,7 +155,7 @@ func (h *EventHub) Run() {
|
|||
return
|
||||
}
|
||||
select {
|
||||
case c.send <- data:
|
||||
case wsconn.send <- data:
|
||||
default:
|
||||
// Client buffer full
|
||||
}
|
||||
|
|
@ -163,45 +163,45 @@ func (h *EventHub) Run() {
|
|||
}(client)
|
||||
}
|
||||
|
||||
case client := <-h.unregister:
|
||||
h.mutex.Lock()
|
||||
if _, ok := h.clients[client]; ok {
|
||||
delete(h.clients, client)
|
||||
case client := <-hub.unregister:
|
||||
hub.mutex.Lock()
|
||||
if _, ok := hub.clients[client]; ok {
|
||||
delete(hub.clients, client)
|
||||
client.safeClose()
|
||||
// Decrement WebSocket connection metrics
|
||||
RecordWSConnection(false)
|
||||
}
|
||||
h.mutex.Unlock()
|
||||
logging.Debug("client disconnected", logging.Fields{"total": len(h.clients)})
|
||||
hub.mutex.Unlock()
|
||||
logging.Debug("client disconnected", logging.Fields{"total": len(hub.clients)})
|
||||
|
||||
case event := <-h.broadcast:
|
||||
case event := <-hub.broadcast:
|
||||
data, err := MarshalJSON(event)
|
||||
if err != nil {
|
||||
logging.Error("failed to marshal event", logging.Fields{"error": err})
|
||||
continue
|
||||
}
|
||||
|
||||
h.mutex.RLock()
|
||||
for client := range h.clients {
|
||||
hub.mutex.RLock()
|
||||
for client := range hub.clients {
|
||||
// Check if client is subscribed to this miner
|
||||
if h.shouldSendToClient(client, event) {
|
||||
if hub.shouldSendToClient(client, event) {
|
||||
select {
|
||||
case client.send <- data:
|
||||
default:
|
||||
// Client buffer full, close connection
|
||||
go func(c *wsClient) {
|
||||
h.unregister <- c
|
||||
go func(wsconn *wsClient) {
|
||||
hub.unregister <- wsconn
|
||||
}(client)
|
||||
}
|
||||
}
|
||||
}
|
||||
h.mutex.RUnlock()
|
||||
hub.mutex.RUnlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if h.shouldSendToClient(client, event) { client.send <- data }
|
||||
func (h *EventHub) shouldSendToClient(client *wsClient, event Event) bool {
|
||||
// if hub.shouldSendToClient(client, event) { client.send <- data }
|
||||
func (hub *EventHub) shouldSendToClient(client *wsClient, event Event) bool {
|
||||
// Always send pong and system events
|
||||
if event.Type == EventPong {
|
||||
return true
|
||||
|
|
@ -243,36 +243,36 @@ func (h *EventHub) shouldSendToClient(client *wsClient, event Event) bool {
|
|||
}
|
||||
|
||||
// hub.Stop() // safe to call multiple times; closes all client connections
|
||||
func (h *EventHub) Stop() {
|
||||
h.stopOnce.Do(func() {
|
||||
close(h.stop)
|
||||
func (hub *EventHub) Stop() {
|
||||
hub.stopOnce.Do(func() {
|
||||
close(hub.stop)
|
||||
})
|
||||
}
|
||||
|
||||
// hub.SetStateProvider(func() interface{} { return manager.GetState() })
|
||||
func (h *EventHub) SetStateProvider(provider StateProvider) {
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
h.stateProvider = provider
|
||||
func (hub *EventHub) SetStateProvider(provider StateProvider) {
|
||||
hub.mutex.Lock()
|
||||
defer hub.mutex.Unlock()
|
||||
hub.stateProvider = provider
|
||||
}
|
||||
|
||||
// hub.Broadcast(NewEvent(EventMinerStats, statsData))
|
||||
func (h *EventHub) Broadcast(event Event) {
|
||||
func (hub *EventHub) Broadcast(event Event) {
|
||||
if event.Timestamp.IsZero() {
|
||||
event.Timestamp = time.Now()
|
||||
}
|
||||
select {
|
||||
case h.broadcast <- event:
|
||||
case hub.broadcast <- event:
|
||||
default:
|
||||
logging.Warn("broadcast channel full, dropping event", logging.Fields{"type": event.Type})
|
||||
}
|
||||
}
|
||||
|
||||
// if hub.ClientCount() == 0 { return } // skip broadcast when no listeners
|
||||
func (h *EventHub) ClientCount() int {
|
||||
h.mutex.RLock()
|
||||
defer h.mutex.RUnlock()
|
||||
return len(h.clients)
|
||||
func (hub *EventHub) ClientCount() int {
|
||||
hub.mutex.RLock()
|
||||
defer hub.mutex.RUnlock()
|
||||
return len(hub.clients)
|
||||
}
|
||||
|
||||
// hub.Broadcast(NewEvent(EventMinerStarted, MinerEventData{Name: "xmrig"}))
|
||||
|
|
@ -378,14 +378,14 @@ func (c *wsClient) readPump() {
|
|||
}
|
||||
|
||||
// if !hub.ServeWs(conn) { c.JSON(http.StatusServiceUnavailable, gin.H{"error": "connection limit reached"}) }
|
||||
func (h *EventHub) ServeWs(conn *websocket.Conn) bool {
|
||||
func (hub *EventHub) ServeWs(conn *websocket.Conn) bool {
|
||||
// Check connection limit
|
||||
h.mutex.RLock()
|
||||
currentCount := len(h.clients)
|
||||
h.mutex.RUnlock()
|
||||
hub.mutex.RLock()
|
||||
currentCount := len(hub.clients)
|
||||
hub.mutex.RUnlock()
|
||||
|
||||
if currentCount >= h.maxConnections {
|
||||
logging.Warn("connection rejected: limit reached", logging.Fields{"current": currentCount, "max": h.maxConnections})
|
||||
if currentCount >= hub.maxConnections {
|
||||
logging.Warn("connection rejected: limit reached", logging.Fields{"current": currentCount, "max": hub.maxConnections})
|
||||
conn.WriteMessage(websocket.CloseMessage,
|
||||
websocket.FormatCloseMessage(websocket.CloseTryAgainLater, "connection limit reached"))
|
||||
conn.Close()
|
||||
|
|
@ -395,11 +395,11 @@ func (h *EventHub) ServeWs(conn *websocket.Conn) bool {
|
|||
client := &wsClient{
|
||||
conn: conn,
|
||||
send: make(chan []byte, 256),
|
||||
hub: h,
|
||||
hub: hub,
|
||||
miners: map[string]bool{"*": true}, // Subscribe to all by default
|
||||
}
|
||||
|
||||
h.register <- client
|
||||
hub.register <- client
|
||||
|
||||
// Start read/write pumps
|
||||
go client.writePump()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue