package proxy import ( "strconv" "sync" "time" ) // Workers maintains per-worker aggregate stats. Workers are identified by name, // derived from the miner's login fields per WorkersMode. // // w := proxy.NewWorkers(proxy.WorkersByRigID, bus) type Workers struct { mode WorkersMode customDiffStats bool entries []WorkerRecord // ordered by first-seen (stable) nameIndex map[string]int // workerName → entries index idIndex map[int64]int // minerID → entries index mu sync.RWMutex } // WorkerRecord is the per-identity aggregate. // // hr60 := record.Hashrate(60) type WorkerRecord struct { Name string LastIP string Connections uint64 Accepted uint64 Rejected uint64 Invalid uint64 Hashes uint64 // sum of accepted share difficulties LastHashAt time.Time windows [5]tickWindow // 60s, 600s, 3600s, 12h, 24h } // Hashrate returns the H/s for a given window (seconds: 60, 600, 3600, 43200, 86400). // // hr60 := record.Hashrate(60) func (r *WorkerRecord) Hashrate(seconds int) float64 { for index, windowSize := range hashrateWindowSizes { if windowSize == seconds { return float64(sumBuckets(r.windows[index].buckets)) / float64(seconds) } } return 0 } // NewWorkers creates the worker aggregate and subscribes it to the event bus. // // w := proxy.NewWorkers(proxy.WorkersByRigID, bus) func NewWorkers(mode WorkersMode, bus *EventBus) *Workers { workers := &Workers{ mode: mode, entries: make([]WorkerRecord, 0), nameIndex: make(map[string]int), idIndex: make(map[int64]int), } if bus != nil { bus.Subscribe(EventLogin, workers.onLogin) bus.Subscribe(EventAccept, workers.onAccept) bus.Subscribe(EventReject, workers.onReject) bus.Subscribe(EventClose, workers.onClose) } return workers } // SetCustomDiffStats toggles per-custom-difficulty worker bucketing. // // workers.SetCustomDiffStats(true) func (w *Workers) SetCustomDiffStats(enabled bool) { if w == nil { return } w.mu.Lock() w.customDiffStats = enabled w.mu.Unlock() } // List returns a snapshot of all worker records in first-seen order. // // records := workers.List() func (w *Workers) List() []WorkerRecord { w.mu.RLock() defer w.mu.RUnlock() records := make([]WorkerRecord, len(w.entries)) copy(records, w.entries) return records } // Tick advances all worker hashrate windows. Called by the proxy tick loop every second. // // workers.Tick() func (w *Workers) Tick() { w.mu.Lock() defer w.mu.Unlock() for entryIndex := range w.entries { for windowIndex, size := range hashrateWindowSizes { if windowIndex >= len(w.entries[entryIndex].windows) { break } window := &w.entries[entryIndex].windows[windowIndex] if window.size == 0 { window.size = size window.buckets = make([]uint64, size) } window.pos = (window.pos + 1) % window.size window.buckets[window.pos] = 0 } } } func (w *Workers) onLogin(event Event) { if event.Miner == nil || w.mode == WorkersDisabled { return } name := w.workerName(event.Miner) if name == "" { return } w.mu.Lock() defer w.mu.Unlock() index, exists := w.nameIndex[name] if !exists { record := WorkerRecord{Name: name} for windowIndex, size := range hashrateWindowSizes { if windowIndex >= len(record.windows) { break } record.windows[windowIndex] = tickWindow{ buckets: make([]uint64, size), size: size, } } w.entries = append(w.entries, record) index = len(w.entries) - 1 w.nameIndex[name] = index } record := &w.entries[index] record.LastIP = event.Miner.IP() record.Connections++ w.idIndex[event.Miner.ID()] = index } func (w *Workers) onAccept(event Event) { w.updateShare(event, true) } func (w *Workers) onReject(event Event) { w.updateShare(event, false) } func (w *Workers) onClose(event Event) { if event.Miner == nil { return } w.mu.Lock() defer w.mu.Unlock() delete(w.idIndex, event.Miner.ID()) } func (w *Workers) updateShare(event Event, accepted bool) { if event.Miner == nil || w.mode == WorkersDisabled { return } w.mu.Lock() defer w.mu.Unlock() index, exists := w.idIndex[event.Miner.ID()] if !exists { return } record := &w.entries[index] if accepted { record.Accepted++ record.Hashes += event.Diff record.LastHashAt = time.Now().UTC() for windowIndex := range record.windows { record.windows[windowIndex].buckets[record.windows[windowIndex].pos] += event.Diff } return } record.Rejected++ if isInvalidShareError(event.Error) { record.Invalid++ } } func (w *Workers) workerName(miner *Miner) string { if miner == nil { return "" } w.mu.RLock() customDiffStats := w.customDiffStats w.mu.RUnlock() name := "" switch w.mode { case WorkersByRigID: if miner.RigID() != "" { name = miner.RigID() } else { name = miner.User() } case WorkersByUser: name = miner.User() case WorkersByPass: name = miner.Password() case WorkersByAgent: name = miner.Agent() case WorkersByIP: name = miner.IP() default: return "" } if !customDiffStats || miner.CustomDiff() == 0 || name == "" { return name } return name + "+cd" + strconv.FormatUint(miner.CustomDiff(), 10) }