package proxy import ( "slices" "sort" "sync" "sync/atomic" "time" ) // Stats tracks global proxy metrics. Hot-path counters are atomic. Hashrate windows // use a ring buffer per window size, advanced by Tick(). // // s := proxy.NewStats() // bus.Subscribe(proxy.EventAccept, s.OnAccept) // bus.Subscribe(proxy.EventReject, s.OnReject) type Stats struct { accepted atomic.Uint64 rejected atomic.Uint64 invalid atomic.Uint64 expired atomic.Uint64 hashes atomic.Uint64 // cumulative sum of accepted share difficulties connections atomic.Uint64 // total TCP connections accepted (ever) maxMiners atomic.Uint64 // peak concurrent miner count topDifficulties [10]uint64 // top-10 accepted difficulties, sorted descending; guarded by mu latencySamples []uint16 // pool response latencies in ms; capped at 10000 samples; guarded by mu windows [6]tickWindow // one per hashrate reporting period startTime time.Time mu sync.Mutex } // Hashrate window sizes in seconds. Index maps to Stats.windows and SummaryResponse.Hashrate. const ( HashrateWindow60s = 0 // 1 minute HashrateWindow600s = 1 // 10 minutes HashrateWindow3600s = 2 // 1 hour HashrateWindow12h = 3 // 12 hours HashrateWindow24h = 4 // 24 hours HashrateWindowAll = 5 // all-time (single accumulator, no window) ) // tickWindow is a fixed-capacity ring buffer of per-second difficulty sums. type tickWindow struct { buckets []uint64 pos int size int // window size in seconds = len(buckets) } // StatsSummary is the serialisable snapshot returned by Summary(). // // summary := stats.Summary() type StatsSummary struct { Accepted uint64 `json:"accepted"` Rejected uint64 `json:"rejected"` Invalid uint64 `json:"invalid"` Expired uint64 `json:"expired"` Hashes uint64 `json:"hashes_total"` AvgTime uint32 `json:"avg_time"` // seconds per accepted share AvgLatency uint32 `json:"latency"` // median pool response latency in ms Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants) TopDiff [10]uint64 `json:"best"` } var hashrateWindowSizes = [5]int{60, 600, 3600, 43200, 86400} // NewStats allocates the rolling windows and initialises the clock anchor. // // s := proxy.NewStats() func NewStats() *Stats { stats := &Stats{ startTime: time.Now().UTC(), latencySamples: make([]uint16, 0, 128), } for index, size := range hashrateWindowSizes { stats.windows[index] = tickWindow{ buckets: make([]uint64, size), size: size, } } return stats } // OnAccept records an accepted share. Adds diff to the current second's bucket in all windows. // // stats.OnAccept(proxy.Event{Diff: 100000, Latency: 82}) func (s *Stats) OnAccept(event Event) { s.accepted.Add(1) s.hashes.Add(event.Diff) if event.Expired { s.expired.Add(1) } s.mu.Lock() for index := 0; index < HashrateWindowAll; index++ { s.windows[index].buckets[s.windows[index].pos] += event.Diff } insertTopDiff(&s.topDifficulties, event.Diff) if event.Latency > 0 { s.latencySamples = appendCappedLatency(s.latencySamples, event.Latency) } s.mu.Unlock() } // OnReject records a rejected share. If e.Error indicates low diff or malformed, increments invalid. // // stats.OnReject(proxy.Event{Error: "Low difficulty share"}) func (s *Stats) OnReject(event Event) { s.rejected.Add(1) if isInvalidShareError(event.Error) { s.invalid.Add(1) } if event.Expired { s.expired.Add(1) } if event.Latency > 0 { s.mu.Lock() s.latencySamples = appendCappedLatency(s.latencySamples, event.Latency) s.mu.Unlock() } } // Tick advances all rolling windows by one second bucket. Called by the proxy tick loop. // // stats.Tick() func (s *Stats) Tick() { s.mu.Lock() defer s.mu.Unlock() for index := 0; index < HashrateWindowAll; index++ { window := &s.windows[index] window.pos = (window.pos + 1) % window.size window.buckets[window.pos] = 0 } } // Summary returns a point-in-time snapshot of all stats fields for API serialisation. // // summary := stats.Summary() func (s *Stats) Summary() StatsSummary { s.mu.Lock() defer s.mu.Unlock() var summary StatsSummary summary.Accepted = s.accepted.Load() summary.Rejected = s.rejected.Load() summary.Invalid = s.invalid.Load() summary.Expired = s.expired.Load() summary.Hashes = s.hashes.Load() summary.TopDiff = s.topDifficulties for index := 0; index < HashrateWindowAll; index++ { windowSize := hashrateWindowSizes[index] summary.Hashrate[index] = float64(sumBuckets(s.windows[index].buckets)) / float64(windowSize) } uptimeSeconds := uint64(time.Since(s.startTime).Seconds()) if uptimeSeconds > 0 { summary.Hashrate[HashrateWindowAll] = float64(summary.Hashes) / float64(uptimeSeconds) } if summary.Accepted > 0 && uptimeSeconds > 0 { summary.AvgTime = uint32(uptimeSeconds / summary.Accepted) } if len(s.latencySamples) > 0 { values := slices.Clone(s.latencySamples) sort.Slice(values, func(left int, right int) bool { return values[left] < values[right] }) summary.AvgLatency = uint32(values[len(values)/2]) } return summary } func appendCappedLatency(latencies []uint16, latency uint16) []uint16 { if len(latencies) == 10000 { copy(latencies, latencies[1:]) latencies[len(latencies)-1] = latency return latencies } return append(latencies, latency) } func insertTopDiff(topDiff *[10]uint64, difficulty uint64) { if difficulty == 0 { return } for index, value := range topDiff { if difficulty <= value { continue } copy(topDiff[index+1:], topDiff[index:len(topDiff)-1]) topDiff[index] = difficulty return } } func isInvalidShareError(message string) bool { switch message { case "Low difficulty share", "Invalid nonce", "Malformed share", "Invalid result": return true default: return false } } func sumBuckets(values []uint64) uint64 { var total uint64 for _, value := range values { total += value } return total }