// AX Principle 1 — predictable names over short names. `msg` required reading
// the struct to understand what it contained; `clientMessage` is
// self-describing at the call site.
package mining
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"forge.lthn.ai/Snider/Mining/pkg/logging"
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
// EventType names the kind of event broadcast over the WebSocket hub.
//
//	hub.Broadcast(NewEvent(EventMinerStarted, MinerEventData{Name: "xmrig"}))
//	hub.Broadcast(NewEvent(EventMinerStats, MinerStatsData{Name: "xmrig", Hashrate: 1200}))
type EventType string
|
|
|
|
const (
	// Miner lifecycle events
	EventMinerStarting  EventType = "miner.starting"
	EventMinerStarted   EventType = "miner.started"
	EventMinerStopping  EventType = "miner.stopping"
	EventMinerStopped   EventType = "miner.stopped"
	EventMinerStats     EventType = "miner.stats"
	EventMinerError     EventType = "miner.error"
	EventMinerConnected EventType = "miner.connected"

	// System events
	EventPong      EventType = "pong"       // reply to a client "ping" message; bypasses subscription filters
	EventStateSync EventType = "state.sync" // Initial state on connect/reconnect
)
|
|
|
|
// Event is the JSON envelope broadcast to WebSocket clients.
//
//	hub.Broadcast(Event{Type: EventMinerStarted, Data: MinerEventData{Name: "xmrig"}})
type Event struct {
	Type      EventType   `json:"type"`
	Timestamp time.Time   `json:"timestamp"`      // filled in by NewEvent, or by Broadcast when left zero
	Data      interface{} `json:"data,omitempty"` // payload, e.g. MinerEventData or MinerStatsData
}
|
|
|
|
// MinerStatsData is the payload for EventMinerStats events.
//
//	hub.Broadcast(NewEvent(EventMinerStats, MinerStatsData{Name: "xmrig", Hashrate: 1200, Shares: 5}))
type MinerStatsData struct {
	Name        string `json:"name"`
	Hashrate    int    `json:"hashrate"` // units not stated here — presumably H/s; confirm against the producer
	Shares      int    `json:"shares"`
	Rejected    int    `json:"rejected"`
	Uptime      int    `json:"uptime"` // units not stated here — presumably seconds; confirm against the producer
	Algorithm   string `json:"algorithm,omitempty"`
	DiffCurrent int    `json:"diffCurrent,omitempty"`
}
|
|
|
|
// MinerEventData is the payload for miner lifecycle events
// (started/stopped/error/connected).
//
//	hub.Broadcast(NewEvent(EventMinerStarted, MinerEventData{Name: "xmrig", ProfileID: "default"}))
type MinerEventData struct {
	Name      string `json:"name"`
	ProfileID string `json:"profileId,omitempty"`
	Reason    string `json:"reason,omitempty"`
	Error     string `json:"error,omitempty"`
	Pool      string `json:"pool,omitempty"`
}
|
|
|
|
// wsClient is one WebSocket subscriber managed by the hub.
//
//	hub.ServeWs(conn) // upgrades conn, creates wsClient, registers with hub
type wsClient struct {
	conn        *websocket.Conn
	send        chan []byte // outbound frames, drained by writePump; closed via safeClose
	hub         *EventHub
	miners      map[string]bool // subscribed miners, "*" for all
	minersMutex sync.RWMutex    // protects miners map from concurrent access
	closeOnce   sync.Once       // guarantees send is closed exactly once
}
|
|
|
|
// client.safeClose() // safe to call from multiple goroutines; channel closed exactly once
|
|
func (client *wsClient) safeClose() {
|
|
client.closeOnce.Do(func() {
|
|
close(client.send)
|
|
})
|
|
}
|
|
|
|
// StateProvider returns a snapshot of current state, sent as a state.sync
// event to each newly registered client. A nil return skips the sync.
//
//	hub.SetStateProvider(func() interface{} { return manager.GetState() })
type StateProvider func() interface{}
|
|
|
|
// EventHub fans events out to connected WebSocket clients. All mutations of
// the clients map are serialized through the Run loop; mutex guards reads
// from other goroutines (ClientCount, ServeWs).
//
//	hub := NewEventHub(); go hub.Run(); hub.Broadcast(NewEvent(EventMinerStarted, data))
type EventHub struct {
	clients        map[*wsClient]bool
	broadcast      chan Event     // queued events, drained by Run
	register       chan *wsClient // new clients from ServeWs
	unregister     chan *wsClient // clients to drop (read error, full send buffer)
	mutex          sync.RWMutex   // guards clients and stateProvider
	stop           chan struct{}  // closed by Stop to terminate Run
	stopOnce       sync.Once
	maxConnections int           // soft cap enforced by ServeWs
	stateProvider  StateProvider // optional initial-state snapshot for new clients
}
|
|
|
|
// DefaultMaxConnections caps concurrent WebSocket clients when no explicit
// limit is given.
//
//	hub := NewEventHubWithOptions(50) // cap connections; 100 is the default
const DefaultMaxConnections = 100
|
|
|
|
// NewEventHub creates a hub with the default connection limit. The caller
// must start the event loop:
//
//	hub := NewEventHub()
//	go hub.Run()
func NewEventHub() *EventHub {
	return NewEventHubWithOptions(DefaultMaxConnections)
}
|
|
|
|
// hub := NewEventHubWithOptions(50)
|
|
// go hub.Run()
|
|
func NewEventHubWithOptions(maxConnections int) *EventHub {
|
|
if maxConnections <= 0 {
|
|
maxConnections = DefaultMaxConnections
|
|
}
|
|
return &EventHub{
|
|
clients: make(map[*wsClient]bool),
|
|
broadcast: make(chan Event, 256),
|
|
register: make(chan *wsClient, 16),
|
|
unregister: make(chan *wsClient, 16), // Buffered to prevent goroutine leaks on shutdown
|
|
stop: make(chan struct{}),
|
|
maxConnections: maxConnections,
|
|
}
|
|
}
|
|
|
|
// Run is the hub's event loop and the sole writer of hub.clients. It blocks
// until Stop is called, so start it exactly once:
//
//	hub := NewEventHub()
//	go hub.Run() // blocks until hub.Stop() is called
func (hub *EventHub) Run() {
	for {
		select {
		case <-hub.stop:
			// Close all client connections: closing each send channel makes
			// its writePump exit and close the underlying conn.
			hub.mutex.Lock()
			for client := range hub.clients {
				client.safeClose()
				delete(hub.clients, client)
			}
			hub.mutex.Unlock()
			return

		case client := <-hub.register:
			hub.mutex.Lock()
			hub.clients[client] = true
			// Snapshot the provider under the lock; SetStateProvider may swap
			// it concurrently.
			stateProvider := hub.stateProvider
			hub.mutex.Unlock()
			logging.Debug("client connected", logging.Fields{"total": len(hub.clients)})

			// Send initial state sync if provider is set. Done in a goroutine
			// so a slow provider cannot stall the event loop.
			if stateProvider != nil {
				go func(wsconn *wsClient) {
					defer func() {
						if r := recover(); r != nil {
							logging.Error("panic in state sync goroutine", logging.Fields{"panic": r})
						}
					}()
					state := stateProvider()
					if state != nil {
						event := Event{
							Type:      EventStateSync,
							Timestamp: time.Now(),
							Data:      state,
						}
						data, err := MarshalJSON(event)
						if err != nil {
							logging.Error("failed to marshal state sync", logging.Fields{"error": err})
							return
						}
						select {
						case wsconn.send <- data:
						default:
							// Client buffer full; drop the sync rather than block.
						}
					}
				}(client)
			}

		case client := <-hub.unregister:
			hub.mutex.Lock()
			if _, ok := hub.clients[client]; ok {
				delete(hub.clients, client)
				client.safeClose()
				// Decrement WebSocket connection metrics.
				// NOTE(review): no matching RecordWSConnection(true) is visible
				// on the register path in this file — confirm the increment
				// happens elsewhere, otherwise the gauge drifts negative.
				RecordWSConnection(false)
			}
			hub.mutex.Unlock()
			logging.Debug("client disconnected", logging.Fields{"total": len(hub.clients)})

		case event := <-hub.broadcast:
			// Marshal once, then fan the same bytes out to every subscriber.
			data, err := MarshalJSON(event)
			if err != nil {
				logging.Error("failed to marshal event", logging.Fields{"error": err})
				continue
			}

			hub.mutex.RLock()
			for client := range hub.clients {
				// Check if client is subscribed to this miner
				if hub.shouldSendToClient(client, event) {
					select {
					case client.send <- data:
					default:
						// Client buffer full, close connection. Hand off via
						// hub.unregister from a goroutine: we hold the read
						// lock here, and the unregister case needs the write
						// lock, so an inline send could deadlock.
						go func(wsconn *wsClient) {
							hub.unregister <- wsconn
						}(client)
					}
				}
			}
			hub.mutex.RUnlock()
		}
	}
}
|
|
|
|
// if hub.shouldSendToClient(client, event) { client.send <- data }
|
|
func (hub *EventHub) shouldSendToClient(client *wsClient, event Event) bool {
|
|
// Always send pong and system events
|
|
if event.Type == EventPong {
|
|
return true
|
|
}
|
|
|
|
// Check miner subscription for miner events (protected by mutex)
|
|
client.minersMutex.RLock()
|
|
defer client.minersMutex.RUnlock()
|
|
|
|
if client.miners == nil || len(client.miners) == 0 {
|
|
// No subscription filter, send all
|
|
return true
|
|
}
|
|
|
|
// Check for wildcard subscription
|
|
if client.miners["*"] {
|
|
return true
|
|
}
|
|
|
|
// Extract miner name from event data
|
|
minerName := ""
|
|
switch data := event.Data.(type) {
|
|
case MinerStatsData:
|
|
minerName = data.Name
|
|
case MinerEventData:
|
|
minerName = data.Name
|
|
case map[string]interface{}:
|
|
if name, ok := data["name"].(string); ok {
|
|
minerName = name
|
|
}
|
|
}
|
|
|
|
if minerName == "" {
|
|
// Non-miner event, send to all
|
|
return true
|
|
}
|
|
|
|
return client.miners[minerName]
|
|
}
|
|
|
|
// Stop shuts the hub down: Run observes the closed stop channel, closes all
// client connections, and returns. Safe to call multiple times; only the
// first call has any effect.
//
//	hub.Stop() // safe to call multiple times; closes all client connections
func (hub *EventHub) Stop() {
	hub.stopOnce.Do(func() {
		close(hub.stop)
	})
}
|
|
|
|
// hub.SetStateProvider(func() interface{} { return manager.GetState() })
|
|
func (hub *EventHub) SetStateProvider(provider StateProvider) {
|
|
hub.mutex.Lock()
|
|
defer hub.mutex.Unlock()
|
|
hub.stateProvider = provider
|
|
}
|
|
|
|
// hub.Broadcast(NewEvent(EventMinerStats, statsData))
|
|
func (hub *EventHub) Broadcast(event Event) {
|
|
if event.Timestamp.IsZero() {
|
|
event.Timestamp = time.Now()
|
|
}
|
|
select {
|
|
case hub.broadcast <- event:
|
|
default:
|
|
logging.Warn("broadcast channel full, dropping event", logging.Fields{"type": event.Type})
|
|
}
|
|
}
|
|
|
|
// ClientCount returns the number of currently registered clients.
//
//	if hub.ClientCount() == 0 { return } // skip broadcast when no listeners
func (hub *EventHub) ClientCount() int {
	hub.mutex.RLock()
	defer hub.mutex.RUnlock()
	return len(hub.clients)
}
|
|
|
|
// hub.Broadcast(NewEvent(EventMinerStarted, MinerEventData{Name: "xmrig"}))
|
|
func NewEvent(eventType EventType, data interface{}) Event {
|
|
return Event{
|
|
Type: eventType,
|
|
Timestamp: time.Now(),
|
|
Data: data,
|
|
}
|
|
}
|
|
|
|
// writePump drains client.send onto the WebSocket connection and keeps it
// alive with a ping every 30s. Started by ServeWs in its own goroutine; it
// exits — closing the connection — when the hub closes the send channel or
// any write fails. Each write carries a 10s deadline.
//
//	go client.writePump() // started by ServeWs; writes hub events to conn, sends ping every 30s
func (client *wsClient) writePump() {
	ticker := time.NewTicker(30 * time.Second)
	defer func() {
		ticker.Stop()
		client.conn.Close()
	}()

	for {
		select {
		case message, ok := <-client.send:
			client.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if !ok {
				// Hub closed the channel
				client.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			w, err := client.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			if _, err := w.Write(message); err != nil {
				logging.Debug("WebSocket write error", logging.Fields{"error": err})
				return
			}

			// Close flushes the frame to the wire; a failure here also means
			// the connection is unusable.
			if err := w.Close(); err != nil {
				return
			}

		case <-ticker.C:
			client.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if err := client.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}
|
|
|
|
// readPump reads control messages ("subscribe", "ping") from the connection
// until it errors or closes, then unregisters the client. Started by ServeWs
// in its own goroutine. The 60s read deadline is refreshed on every pong
// (writePump pings every 30s), and inbound frames are capped at 512 bytes.
//
//	go client.readPump() // started by ServeWs; reads subscribe/ping messages from conn, unregisters on close
func (client *wsClient) readPump() {
	defer func() {
		client.hub.unregister <- client
		client.conn.Close()
	}()

	client.conn.SetReadLimit(512)
	client.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
	client.conn.SetPongHandler(func(string) error {
		client.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
		return nil
	})

	for {
		_, message, err := client.conn.ReadMessage()
		if err != nil {
			// Only unexpected closes are worth logging; normal going-away /
			// abnormal closes are routine client disconnects.
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				logging.Debug("WebSocket error", logging.Fields{"error": err})
			}
			break
		}

		// Parse client message; malformed JSON is ignored rather than
		// terminating the connection.
		var clientMessage struct {
			Type   string   `json:"type"`
			Miners []string `json:"miners,omitempty"`
		}
		if err := UnmarshalJSON(message, &clientMessage); err != nil {
			continue
		}

		switch clientMessage.Type {
		case "subscribe":
			// Update miner subscription (protected by mutex); replaces any
			// previous filter wholesale.
			client.minersMutex.Lock()
			client.miners = make(map[string]bool)
			for _, m := range clientMessage.Miners {
				client.miners[m] = true
			}
			client.minersMutex.Unlock()
			logging.Debug("client subscribed to miners", logging.Fields{"miners": clientMessage.Miners})

		case "ping":
			// Respond with pong.
			// NOTE(review): this goes through hub.Broadcast, and EventPong
			// bypasses subscription filters, so every connected client
			// receives the pong — confirm fan-out is intended rather than a
			// reply to just this client.
			client.hub.Broadcast(Event{
				Type:      EventPong,
				Timestamp: time.Now(),
			})
		}
	}
}
|
|
|
|
// ServeWs registers an already-upgraded WebSocket connection with the hub and
// starts its read/write pumps. Returns false — after sending a
// CloseTryAgainLater frame and closing conn — when the hub is at its
// connection limit.
//
//	if !hub.ServeWs(conn) { c.JSON(http.StatusServiceUnavailable, gin.H{"error": "connection limit reached"}) }
func (hub *EventHub) ServeWs(conn *websocket.Conn) bool {
	// Check connection limit.
	// NOTE(review): check-then-register is not atomic — two concurrent calls
	// can both observe a count below the limit and briefly exceed
	// maxConnections. Fine as a soft cap; move the check into Run's register
	// case if a hard cap is required.
	hub.mutex.RLock()
	currentCount := len(hub.clients)
	hub.mutex.RUnlock()

	if currentCount >= hub.maxConnections {
		logging.Warn("connection rejected: limit reached", logging.Fields{"current": currentCount, "max": hub.maxConnections})
		conn.WriteMessage(websocket.CloseMessage,
			websocket.FormatCloseMessage(websocket.CloseTryAgainLater, "connection limit reached"))
		conn.Close()
		return false
	}

	client := &wsClient{
		conn:   conn,
		send:   make(chan []byte, 256), // buffered; slow clients get dropped when this fills (see Run)
		hub:    hub,
		miners: map[string]bool{"*": true}, // Subscribe to all by default
	}

	hub.register <- client

	// Start read/write pumps
	go client.writePump()
	go client.readPump()
	return true
}
|