go-proxy/worker.go
Virgil 0a195f7962 feat(proxy): add UUID session ids and custom diff buckets
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-04 11:53:03 +00:00

237 lines
5.1 KiB
Go

package proxy
import (
"strconv"
"sync"
"time"
)
// Workers maintains per-worker aggregate stats. Workers are identified by name,
// derived from the miner's login fields per WorkersMode.
//
// The methods visible in this file only ever append to entries, which is what
// keeps the positions stored in nameIndex and idIndex valid.
//
//	w := proxy.NewWorkers(proxy.WorkersByRigID, bus)
type Workers struct {
	// mode selects which miner field names a worker. It is assigned in
	// NewWorkers and read without holding mu by the event handlers.
	mode WorkersMode
	// customDiffStats, when true, makes workerName append a "+cd<diff>" suffix
	// for miners that report a non-zero custom difficulty.
	customDiffStats bool
	entries []WorkerRecord // ordered by first-seen (stable)
	nameIndex map[string]int // workerName → entries index
	idIndex map[int64]int // minerID → entries index; entry is removed in onClose
	// mu guards customDiffStats, entries, nameIndex and idIndex.
	mu sync.RWMutex
}
// WorkerRecord is the per-identity aggregate.
//
// The windows field holds slices, so copying a WorkerRecord by value leaves the
// copy sharing bucket storage with the original.
//
//	hr60 := record.Hashrate(60)
type WorkerRecord struct {
	Name string // worker name as produced by Workers.workerName
	LastIP string // IP reported by the miner at its most recent login
	Connections uint64 // incremented on every login; not decremented on close
	Accepted uint64 // number of accepted shares
	Rejected uint64 // number of rejected shares, invalid ones included
	Invalid uint64 // rejected shares for which isInvalidShareError returned true
	Hashes uint64 // sum of accepted share difficulties
	LastHashAt time.Time // UTC time of the most recent accepted share
	windows [5]tickWindow // 60s, 600s, 3600s, 12h, 24h
}
// Hashrate returns the H/s for a given window (seconds: 60, 600, 3600, 43200, 86400).
//
// The rate is the sum of the window's buckets divided by the window length.
// It returns 0 for a nil record, a non-positive window length, or a length
// that is not listed in hashrateWindowSizes.
//
//	hr60 := record.Hashrate(60)
func (r *WorkerRecord) Hashrate(seconds int) float64 {
	if r == nil || seconds <= 0 {
		return 0
	}
	for index, windowSize := range hashrateWindowSizes {
		// windows is a fixed-size array; stop before indexing past it should
		// hashrateWindowSizes ever list more sizes than there are windows.
		// Tick and onLogin apply the same guard.
		if index >= len(r.windows) {
			break
		}
		if windowSize != seconds {
			continue
		}
		return float64(sumBuckets(r.windows[index].buckets)) / float64(seconds)
	}
	return 0
}
// NewWorkers builds an empty worker aggregate for the given naming mode. When
// bus is non-nil the aggregate registers its handlers for login, accept,
// reject and close events, in that order; with a nil bus nothing is subscribed.
//
//	w := proxy.NewWorkers(proxy.WorkersByRigID, bus)
func NewWorkers(mode WorkersMode, bus *EventBus) *Workers {
	w := new(Workers)
	w.mode = mode
	w.entries = []WorkerRecord{}
	w.nameIndex = map[string]int{}
	w.idIndex = map[int64]int{}

	if bus == nil {
		return w
	}

	bus.Subscribe(EventLogin, w.onLogin)
	bus.Subscribe(EventAccept, w.onAccept)
	bus.Subscribe(EventReject, w.onReject)
	bus.Subscribe(EventClose, w.onClose)
	return w
}
// SetCustomDiffStats switches per-custom-difficulty worker bucketing on or
// off. The flag is written under the write lock. Calling it on a nil *Workers
// is a no-op.
//
//	workers.SetCustomDiffStats(true)
func (w *Workers) SetCustomDiffStats(enabled bool) {
	if w != nil {
		w.mu.Lock()
		defer w.mu.Unlock()
		w.customDiffStats = enabled
	}
}
// List returns a snapshot of all worker records in first-seen order.
//
// The snapshot is fully detached from the live state: besides the records
// themselves, every window's bucket slice is copied. A plain struct copy would
// leave the returned records pointing at the same bucket storage that Tick and
// the share handlers keep writing to, so calling Hashrate on the result after
// the lock is released would be a data race. The copy costs memory
// proportional to the total bucket count per worker.
//
// A nil receiver yields nil.
//
//	records := workers.List()
func (w *Workers) List() []WorkerRecord {
	if w == nil {
		return nil
	}
	w.mu.RLock()
	defer w.mu.RUnlock()

	records := make([]WorkerRecord, len(w.entries))
	copy(records, w.entries)
	for i := range records {
		for j := range records[i].windows {
			live := records[i].windows[j].buckets
			if live == nil {
				continue
			}
			// Give the snapshot its own backing array.
			detached := make([]uint64, len(live))
			copy(detached, live)
			records[i].windows[j].buckets = detached
		}
	}
	return records
}
// Tick advances all worker hashrate windows. Called by the proxy tick loop every second.
//
// For each record and each configured window size it moves the write position
// one bucket forward (wrapping at the window size) and clears the bucket it
// lands on. A window whose size is still zero is allocated first.
//
//	workers.Tick()
func (w *Workers) Tick() {
	w.mu.Lock()
	defer w.mu.Unlock()

	for i := range w.entries {
		windows := &w.entries[i].windows
		// Walk only the indexes present in both the size table and the array.
		for k := 0; k < len(hashrateWindowSizes) && k < len(windows); k++ {
			win := &windows[k]
			if win.size == 0 {
				span := hashrateWindowSizes[k]
				win.size = span
				win.buckets = make([]uint64, span)
			}
			win.pos = (win.pos + 1) % win.size
			win.buckets[win.pos] = 0
		}
	}
}
// onLogin handles a login event. It resolves the worker name for the miner,
// creates the record (with freshly allocated hashrate windows) the first time
// that name is seen, then stores the miner's IP, bumps the connection count
// and maps the miner ID to the record. Events without a miner, events received
// while the mode is WorkersDisabled, and miners that resolve to an empty name
// are ignored.
func (w *Workers) onLogin(event Event) {
	miner := event.Miner
	if miner == nil || w.mode == WorkersDisabled {
		return
	}

	// workerName takes the read lock itself, so call it before locking.
	name := w.workerName(miner)
	if name == "" {
		return
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	slot, known := w.nameIndex[name]
	if !known {
		fresh := WorkerRecord{Name: name}
		for k := 0; k < len(hashrateWindowSizes) && k < len(fresh.windows); k++ {
			span := hashrateWindowSizes[k]
			fresh.windows[k] = tickWindow{
				buckets: make([]uint64, span),
				size:    span,
			}
		}
		slot = len(w.entries)
		w.entries = append(w.entries, fresh)
		w.nameIndex[name] = slot
	}

	w.entries[slot].LastIP = miner.IP()
	w.entries[slot].Connections++
	w.idIndex[miner.ID()] = slot
}
// onAccept handles an accept event by recording the share as accepted.
func (w *Workers) onAccept(event Event) {
	const accepted = true
	w.updateShare(event, accepted)
}
// onReject handles a reject event by recording the share as not accepted.
func (w *Workers) onReject(event Event) {
	const accepted = false
	w.updateShare(event, accepted)
}
// onClose handles a close event by dropping the miner-ID → record mapping.
// The worker record itself stays in place. Events without a miner are ignored.
func (w *Workers) onClose(event Event) {
	miner := event.Miner
	if miner == nil {
		return
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	id := miner.ID()
	delete(w.idIndex, id)
}
// updateShare applies one share result to the record mapped to the event's
// miner ID.
//
// An accepted share increments Accepted, adds event.Diff to Hashes and to the
// current bucket of every hashrate window, and stamps LastHashAt with the
// current UTC time. A rejected share increments Rejected, and also Invalid
// when isInvalidShareError reports true for event.Error.
//
// The event is ignored when it carries no miner, when the mode is
// WorkersDisabled, or when the miner ID has no mapped record.
func (w *Workers) updateShare(event Event, accepted bool) {
	if event.Miner == nil || w.mode == WorkersDisabled {
		return
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	index, exists := w.idIndex[event.Miner.ID()]
	if !exists {
		return
	}
	record := &w.entries[index]

	if !accepted {
		record.Rejected++
		if isInvalidShareError(event.Error) {
			record.Invalid++
		}
		return
	}

	record.Accepted++
	record.Hashes += event.Diff
	record.LastHashAt = time.Now().UTC()
	for windowIndex := range record.windows {
		window := &record.windows[windowIndex]
		// A window whose buckets were never allocated (the case Tick repairs
		// lazily) has no slot to write to. Skip it rather than panic inside
		// the event handler; the lifetime counters above still record the
		// share.
		if window.pos < 0 || window.pos >= len(window.buckets) {
			continue
		}
		window.buckets[window.pos] += event.Diff
	}
}
// workerName derives the aggregate key for a miner according to the configured
// mode: rig ID (falling back to user when the rig ID is empty), user,
// password, agent or IP. Any other mode, or a nil miner, yields "".
//
// When custom-diff stats are enabled and the miner reports a non-zero custom
// difficulty, a non-empty name gets the suffix "+cd<difficulty>".
func (w *Workers) workerName(miner *Miner) string {
	if miner == nil {
		return ""
	}

	w.mu.RLock()
	splitByDiff := w.customDiffStats
	w.mu.RUnlock()

	var base string
	switch w.mode {
	case WorkersByUser:
		base = miner.User()
	case WorkersByPass:
		base = miner.Password()
	case WorkersByAgent:
		base = miner.Agent()
	case WorkersByIP:
		base = miner.IP()
	case WorkersByRigID:
		base = miner.RigID()
		if base == "" {
			base = miner.User()
		}
	default:
		return ""
	}

	if splitByDiff && miner.CustomDiff() != 0 && base != "" {
		return base + "+cd" + strconv.FormatUint(miner.CustomDiff(), 10)
	}
	return base
}