218 lines
5.9 KiB
Go
218 lines
5.9 KiB
Go
package proxy
|
|
|
|
import (
|
|
"slices"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Stats tracks global proxy metrics. Hot-path counters are atomic. Hashrate windows
|
|
// use a ring buffer per window size, advanced by Tick().
|
|
//
|
|
// s := proxy.NewStats()
|
|
// bus.Subscribe(proxy.EventAccept, s.OnAccept)
|
|
// bus.Subscribe(proxy.EventReject, s.OnReject)
|
|
type Stats struct {
|
|
accepted atomic.Uint64
|
|
rejected atomic.Uint64
|
|
invalid atomic.Uint64
|
|
expired atomic.Uint64
|
|
hashes atomic.Uint64 // cumulative sum of accepted share difficulties
|
|
connections atomic.Uint64 // total TCP connections accepted (ever)
|
|
maxMiners atomic.Uint64 // peak concurrent miner count
|
|
topDifficulties [10]uint64 // top-10 accepted difficulties, sorted descending; guarded by mu
|
|
latencySamples []uint16 // pool response latencies in ms; capped at 10000 samples; guarded by mu
|
|
windows [6]tickWindow // one per hashrate reporting period
|
|
startTime time.Time
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// Hashrate reporting periods. Each constant is an index (not a duration) into
// Stats.windows, StatsSummary.Hashrate and SummaryResponse.Hashrate.
const (
	HashrateWindow60s   = iota // 1 minute
	HashrateWindow600s         // 10 minutes
	HashrateWindow3600s        // 1 hour
	HashrateWindow12h          // 12 hours
	HashrateWindow24h          // 24 hours
	HashrateWindowAll          // all-time (single accumulator, no window)
)
|
|
|
|
// tickWindow is one rolling hashrate window: a fixed-capacity ring of
// per-second difficulty sums.
type tickWindow struct {
	buckets []uint64 // one slot per second of the window
	pos     int      // index of the slot that currently receives accepted shares
	size    int      // window size in seconds = len(buckets)
}
|
|
|
|
// StatsSummary is the JSON-serialisable snapshot produced by Summary().
//
//	summary := stats.Summary()
type StatsSummary struct {
	// Share counters.
	Accepted uint64 `json:"accepted"`
	Rejected uint64 `json:"rejected"`
	Invalid  uint64 `json:"invalid"`
	Expired  uint64 `json:"expired"`
	Hashes   uint64 `json:"hashes_total"`

	// Derived values.
	AvgTime    uint32      `json:"avg_time"` // seconds per accepted share
	AvgLatency uint32      `json:"latency"`  // median pool response latency in ms
	Hashrate   [6]float64  `json:"hashrate"` // H/s per window (index = HashrateWindow* constants)
	TopDiff    [10]uint64  `json:"best"`
}
|
|
|
|
// hashrateWindowSizes is the length in seconds of each rolling hashrate
// window, in the same order as the windows NewStats allocates. The all-time
// rate is not windowed and therefore has no entry here.
var hashrateWindowSizes = [5]int{
	60,           // 1 minute
	10 * 60,      // 10 minutes
	60 * 60,      // 1 hour
	12 * 60 * 60, // 12 hours
	24 * 60 * 60, // 24 hours
}
|
|
|
|
// NewStats allocates the rolling windows and initialises the clock anchor.
|
|
//
|
|
// s := proxy.NewStats()
|
|
func NewStats() *Stats {
|
|
stats := &Stats{
|
|
startTime: time.Now().UTC(),
|
|
latencySamples: make([]uint16, 0, 128),
|
|
}
|
|
|
|
for index, size := range hashrateWindowSizes {
|
|
stats.windows[index] = tickWindow{
|
|
buckets: make([]uint64, size),
|
|
size: size,
|
|
}
|
|
}
|
|
|
|
return stats
|
|
}
|
|
|
|
// OnAccept records an accepted share. Adds diff to the current second's bucket in all windows.
|
|
//
|
|
// stats.OnAccept(proxy.Event{Diff: 100000, Latency: 82})
|
|
func (s *Stats) OnAccept(event Event) {
|
|
s.accepted.Add(1)
|
|
s.hashes.Add(event.Diff)
|
|
if event.Expired {
|
|
s.expired.Add(1)
|
|
}
|
|
|
|
s.mu.Lock()
|
|
for index := 0; index < HashrateWindowAll; index++ {
|
|
s.windows[index].buckets[s.windows[index].pos] += event.Diff
|
|
}
|
|
insertTopDiff(&s.topDifficulties, event.Diff)
|
|
if event.Latency > 0 {
|
|
s.latencySamples = appendCappedLatency(s.latencySamples, event.Latency)
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// OnReject records a rejected share. If e.Error indicates low diff or malformed, increments invalid.
|
|
//
|
|
// stats.OnReject(proxy.Event{Error: "Low difficulty share"})
|
|
func (s *Stats) OnReject(event Event) {
|
|
s.rejected.Add(1)
|
|
if isInvalidShareError(event.Error) {
|
|
s.invalid.Add(1)
|
|
}
|
|
if event.Expired {
|
|
s.expired.Add(1)
|
|
}
|
|
if event.Latency > 0 {
|
|
s.mu.Lock()
|
|
s.latencySamples = appendCappedLatency(s.latencySamples, event.Latency)
|
|
s.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// Tick advances all rolling windows by one second bucket. Called by the proxy tick loop.
|
|
//
|
|
// stats.Tick()
|
|
func (s *Stats) Tick() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
for index := 0; index < HashrateWindowAll; index++ {
|
|
window := &s.windows[index]
|
|
window.pos = (window.pos + 1) % window.size
|
|
window.buckets[window.pos] = 0
|
|
}
|
|
}
|
|
|
|
// Summary returns a point-in-time snapshot of all stats fields for API serialisation.
|
|
//
|
|
// summary := stats.Summary()
|
|
func (s *Stats) Summary() StatsSummary {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
var summary StatsSummary
|
|
summary.Accepted = s.accepted.Load()
|
|
summary.Rejected = s.rejected.Load()
|
|
summary.Invalid = s.invalid.Load()
|
|
summary.Expired = s.expired.Load()
|
|
summary.Hashes = s.hashes.Load()
|
|
summary.TopDiff = s.topDifficulties
|
|
|
|
for index := 0; index < HashrateWindowAll; index++ {
|
|
windowSize := hashrateWindowSizes[index]
|
|
summary.Hashrate[index] = float64(sumBuckets(s.windows[index].buckets)) / float64(windowSize)
|
|
}
|
|
|
|
uptimeSeconds := uint64(time.Since(s.startTime).Seconds())
|
|
if uptimeSeconds > 0 {
|
|
summary.Hashrate[HashrateWindowAll] = float64(summary.Hashes) / float64(uptimeSeconds)
|
|
}
|
|
if summary.Accepted > 0 && uptimeSeconds > 0 {
|
|
summary.AvgTime = uint32(uptimeSeconds / summary.Accepted)
|
|
}
|
|
|
|
if len(s.latencySamples) > 0 {
|
|
values := slices.Clone(s.latencySamples)
|
|
sort.Slice(values, func(left int, right int) bool {
|
|
return values[left] < values[right]
|
|
})
|
|
summary.AvgLatency = uint32(values[len(values)/2])
|
|
}
|
|
|
|
return summary
|
|
}
|
|
|
|
// appendCappedLatency adds latency to latencies and returns the resulting
// slice. While the slice holds fewer than 10000 samples it simply grows; once
// it holds exactly 10000, the oldest sample is dropped by shifting every
// element one place towards the front (in place) and the new sample takes the
// last slot, so the length stays at 10000.
func appendCappedLatency(latencies []uint16, latency uint16) []uint16 {
	if len(latencies) != 10000 {
		return append(latencies, latency)
	}

	last := len(latencies) - 1
	copy(latencies[:last], latencies[1:])
	latencies[last] = latency
	return latencies
}
|
|
|
|
// insertTopDiff offers difficulty to the descending top-10 table. A zero
// difficulty is ignored. Otherwise the value is inserted in front of the first
// entry that is strictly smaller, later entries move down one place and the
// last entry falls off. If no entry is smaller, the table is left unchanged.
func insertTopDiff(topDiff *[10]uint64, difficulty uint64) {
	if difficulty == 0 {
		return
	}

	// Skip past every entry that is at least as large as the candidate.
	slot := 0
	for slot < len(topDiff) && topDiff[slot] >= difficulty {
		slot++
	}
	if slot == len(topDiff) {
		return
	}

	// copy stops at the shorter operand (the destination), so the final entry
	// is dropped rather than written past the end of the array.
	copy(topDiff[slot+1:], topDiff[slot:])
	topDiff[slot] = difficulty
}
|
|
|
|
// isInvalidShareError reports whether message is exactly one of the reject
// reasons counted as an invalid share. The comparison is case-sensitive and
// does not trim or match substrings.
func isInvalidShareError(message string) bool {
	return message == "Low difficulty share" ||
		message == "Invalid nonce" ||
		message == "Malformed share" ||
		message == "Invalid result"
}
|
|
|
|
// sumBuckets returns the sum of all entries in values (0 for an empty or nil
// slice). The addition is plain uint64 arithmetic and wraps on overflow.
func sumBuckets(values []uint64) uint64 {
	sum := uint64(0)
	for i := range values {
		sum += values[i]
	}
	return sum
}
|