237 lines
5.1 KiB
Go
237 lines
5.1 KiB
Go
package proxy
|
|
|
|
import (
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Workers maintains per-worker aggregate stats. Workers are identified by name,
|
|
// derived from the miner's login fields per WorkersMode.
|
|
//
|
|
// w := proxy.NewWorkers(proxy.WorkersByRigID, bus)
|
|
type Workers struct {
|
|
mode WorkersMode
|
|
customDiffStats bool
|
|
entries []WorkerRecord // ordered by first-seen (stable)
|
|
nameIndex map[string]int // workerName → entries index
|
|
idIndex map[int64]int // minerID → entries index
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// WorkerRecord is the per-identity aggregate.
|
|
//
|
|
// hr60 := record.Hashrate(60)
|
|
type WorkerRecord struct {
|
|
Name string
|
|
LastIP string
|
|
Connections uint64
|
|
Accepted uint64
|
|
Rejected uint64
|
|
Invalid uint64
|
|
Hashes uint64 // sum of accepted share difficulties
|
|
LastHashAt time.Time
|
|
windows [5]tickWindow // 60s, 600s, 3600s, 12h, 24h
|
|
}
|
|
|
|
// Hashrate returns the H/s for a given window (seconds: 60, 600, 3600, 43200, 86400).
|
|
//
|
|
// hr60 := record.Hashrate(60)
|
|
func (r *WorkerRecord) Hashrate(seconds int) float64 {
|
|
for index, windowSize := range hashrateWindowSizes {
|
|
if windowSize == seconds {
|
|
return float64(sumBuckets(r.windows[index].buckets)) / float64(seconds)
|
|
}
|
|
}
|
|
|
|
return 0
|
|
}
|
|
|
|
// NewWorkers creates the worker aggregate and subscribes it to the event bus.
|
|
//
|
|
// w := proxy.NewWorkers(proxy.WorkersByRigID, bus)
|
|
func NewWorkers(mode WorkersMode, bus *EventBus) *Workers {
|
|
workers := &Workers{
|
|
mode: mode,
|
|
entries: make([]WorkerRecord, 0),
|
|
nameIndex: make(map[string]int),
|
|
idIndex: make(map[int64]int),
|
|
}
|
|
|
|
if bus != nil {
|
|
bus.Subscribe(EventLogin, workers.onLogin)
|
|
bus.Subscribe(EventAccept, workers.onAccept)
|
|
bus.Subscribe(EventReject, workers.onReject)
|
|
bus.Subscribe(EventClose, workers.onClose)
|
|
}
|
|
|
|
return workers
|
|
}
|
|
|
|
// SetCustomDiffStats toggles per-custom-difficulty worker bucketing.
|
|
//
|
|
// workers.SetCustomDiffStats(true)
|
|
func (w *Workers) SetCustomDiffStats(enabled bool) {
|
|
if w == nil {
|
|
return
|
|
}
|
|
|
|
w.mu.Lock()
|
|
w.customDiffStats = enabled
|
|
w.mu.Unlock()
|
|
}
|
|
|
|
// List returns a snapshot of all worker records in first-seen order.
|
|
//
|
|
// records := workers.List()
|
|
func (w *Workers) List() []WorkerRecord {
|
|
w.mu.RLock()
|
|
defer w.mu.RUnlock()
|
|
|
|
records := make([]WorkerRecord, len(w.entries))
|
|
copy(records, w.entries)
|
|
return records
|
|
}
|
|
|
|
// Tick advances all worker hashrate windows. Called by the proxy tick loop every second.
|
|
//
|
|
// workers.Tick()
|
|
func (w *Workers) Tick() {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
for entryIndex := range w.entries {
|
|
for windowIndex, size := range hashrateWindowSizes {
|
|
if windowIndex >= len(w.entries[entryIndex].windows) {
|
|
break
|
|
}
|
|
window := &w.entries[entryIndex].windows[windowIndex]
|
|
if window.size == 0 {
|
|
window.size = size
|
|
window.buckets = make([]uint64, size)
|
|
}
|
|
window.pos = (window.pos + 1) % window.size
|
|
window.buckets[window.pos] = 0
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *Workers) onLogin(event Event) {
|
|
if event.Miner == nil || w.mode == WorkersDisabled {
|
|
return
|
|
}
|
|
|
|
name := w.workerName(event.Miner)
|
|
if name == "" {
|
|
return
|
|
}
|
|
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
index, exists := w.nameIndex[name]
|
|
if !exists {
|
|
record := WorkerRecord{Name: name}
|
|
for windowIndex, size := range hashrateWindowSizes {
|
|
if windowIndex >= len(record.windows) {
|
|
break
|
|
}
|
|
record.windows[windowIndex] = tickWindow{
|
|
buckets: make([]uint64, size),
|
|
size: size,
|
|
}
|
|
}
|
|
w.entries = append(w.entries, record)
|
|
index = len(w.entries) - 1
|
|
w.nameIndex[name] = index
|
|
}
|
|
|
|
record := &w.entries[index]
|
|
record.LastIP = event.Miner.IP()
|
|
record.Connections++
|
|
w.idIndex[event.Miner.ID()] = index
|
|
}
|
|
|
|
func (w *Workers) onAccept(event Event) {
|
|
w.updateShare(event, true)
|
|
}
|
|
|
|
func (w *Workers) onReject(event Event) {
|
|
w.updateShare(event, false)
|
|
}
|
|
|
|
func (w *Workers) onClose(event Event) {
|
|
if event.Miner == nil {
|
|
return
|
|
}
|
|
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
delete(w.idIndex, event.Miner.ID())
|
|
}
|
|
|
|
func (w *Workers) updateShare(event Event, accepted bool) {
|
|
if event.Miner == nil || w.mode == WorkersDisabled {
|
|
return
|
|
}
|
|
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
index, exists := w.idIndex[event.Miner.ID()]
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
record := &w.entries[index]
|
|
if accepted {
|
|
record.Accepted++
|
|
record.Hashes += event.Diff
|
|
record.LastHashAt = time.Now().UTC()
|
|
for windowIndex := range record.windows {
|
|
record.windows[windowIndex].buckets[record.windows[windowIndex].pos] += event.Diff
|
|
}
|
|
return
|
|
}
|
|
|
|
record.Rejected++
|
|
if isInvalidShareError(event.Error) {
|
|
record.Invalid++
|
|
}
|
|
}
|
|
|
|
func (w *Workers) workerName(miner *Miner) string {
|
|
if miner == nil {
|
|
return ""
|
|
}
|
|
|
|
w.mu.RLock()
|
|
customDiffStats := w.customDiffStats
|
|
w.mu.RUnlock()
|
|
|
|
name := ""
|
|
switch w.mode {
|
|
case WorkersByRigID:
|
|
if miner.RigID() != "" {
|
|
name = miner.RigID()
|
|
} else {
|
|
name = miner.User()
|
|
}
|
|
case WorkersByUser:
|
|
name = miner.User()
|
|
case WorkersByPass:
|
|
name = miner.Password()
|
|
case WorkersByAgent:
|
|
name = miner.Agent()
|
|
case WorkersByIP:
|
|
name = miner.IP()
|
|
default:
|
|
return ""
|
|
}
|
|
|
|
if !customDiffStats || miner.CustomDiff() == 0 || name == "" {
|
|
return name
|
|
}
|
|
|
|
return name + "+cd" + strconv.FormatUint(miner.CustomDiff(), 10)
|
|
}
|