go-proxy/stats.go
Virgil 48c6e0fc6d feat(proxy): implement RFC runtime primitives
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-04 10:39:59 +00:00

218 lines
5.8 KiB
Go

package proxy
import (
"slices"
"sort"
"sync"
"sync/atomic"
"time"
)
// Stats tracks global proxy metrics. Hot-path counters are atomic and may be
// updated without holding mu; topDiff, latency and windows are guarded by mu.
// Hashrate windows use a ring buffer per window size, advanced by Tick().
//
// s := proxy.NewStats()
// bus.Subscribe(proxy.EventAccept, s.OnAccept)
// bus.Subscribe(proxy.EventReject, s.OnReject)
type Stats struct {
	accepted    atomic.Uint64
	rejected    atomic.Uint64
	invalid     atomic.Uint64 // subset of rejected whose error text matches isInvalidShareError
	expired     atomic.Uint64 // incremented for expired shares on both the accept and reject paths
	hashes      atomic.Uint64 // cumulative sum of accepted share difficulties
	connections atomic.Uint64 // total TCP connections accepted (ever)
	maxMiners   atomic.Uint64 // peak concurrent miner count
	topDiff     [10]uint64    // top-10 accepted difficulties, sorted descending; guarded by mu
	latency     []uint16      // pool response latencies in ms; capped at 10000 samples; guarded by mu
	windows     [6]tickWindow // one per hashrate reporting period; index HashrateWindowAll has no ring (all-time rate is derived from hashes/uptime)
	startTime   time.Time     // UTC anchor for uptime, avg share time and the all-time hashrate
	mu          sync.Mutex
}
// Hashrate window sizes in seconds. Index maps to Stats.windows and SummaryResponse.Hashrate.
// The first five indices select a ring buffer in Stats.windows (see
// hashrateWindowSizes); HashrateWindowAll has no ring and is computed from the
// cumulative hash count divided by uptime in Summary().
const (
	HashrateWindow60s   = 0 // 1 minute
	HashrateWindow600s  = 1 // 10 minutes
	HashrateWindow3600s = 2 // 1 hour
	HashrateWindow12h   = 3 // 12 hours
	HashrateWindow24h   = 4 // 24 hours
	HashrateWindowAll   = 5 // all-time (single accumulator, no window)
)
// tickWindow is a fixed-capacity ring buffer of per-second difficulty sums.
// OnAccept adds into buckets[pos]; Tick advances pos and zeroes the slot it
// lands on, so the buffer always holds the most recent `size` seconds.
type tickWindow struct {
	buckets []uint64
	pos     int // index of the bucket accumulating the current second
	size    int // window size in seconds = len(buckets)
}
// StatsSummary is the serialisable snapshot returned by Summary().
//
// summary := stats.Summary()
type StatsSummary struct {
	Accepted   uint64     `json:"accepted"`
	Rejected   uint64     `json:"rejected"`
	Invalid    uint64     `json:"invalid"`
	Expired    uint64     `json:"expired"`
	Hashes     uint64     `json:"hashes_total"`
	AvgTime    uint32     `json:"avg_time"` // seconds per accepted share
	AvgLatency uint32     `json:"latency"`  // median pool response latency in ms
	Hashrate   [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants)
	TopDiff    [10]uint64 `json:"best"`
}
var hashrateWindowSizes = [5]int{60, 600, 3600, 43200, 86400}
// NewStats returns a Stats with every rolling window allocated and the uptime
// clock anchored to the current UTC instant.
//
// s := proxy.NewStats()
func NewStats() *Stats {
	s := &Stats{
		startTime: time.Now().UTC(),
		latency:   make([]uint16, 0, 128),
	}
	for i, seconds := range hashrateWindowSizes {
		s.windows[i] = tickWindow{
			buckets: make([]uint64, seconds),
			size:    seconds,
		}
	}
	return s
}
// OnAccept records an accepted share: bumps the accepted/hash counters, marks
// expiry when flagged, and folds the difficulty into every rolling window's
// current-second bucket plus the top-10 table. A positive latency is sampled.
//
// stats.OnAccept(proxy.Event{Diff: 100000, Latency: 82})
func (s *Stats) OnAccept(event Event) {
	s.accepted.Add(1)
	s.hashes.Add(event.Diff)
	if event.Expired {
		s.expired.Add(1)
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	for index := range s.windows[:HashrateWindowAll] {
		window := &s.windows[index]
		window.buckets[window.pos] += event.Diff
	}
	insertTopDiff(&s.topDiff, event.Diff)
	if event.Latency > 0 {
		s.latency = appendCappedLatency(s.latency, event.Latency)
	}
}
// OnReject records a rejected share. When the pool's error text marks the
// share as invalid (see isInvalidShareError) the invalid counter is bumped as
// well; expired shares and positive latencies are tracked just like accepts.
//
// stats.OnReject(proxy.Event{Error: "Low difficulty share"})
func (s *Stats) OnReject(event Event) {
	s.rejected.Add(1)
	if isInvalidShareError(event.Error) {
		s.invalid.Add(1)
	}
	if event.Expired {
		s.expired.Add(1)
	}
	if event.Latency == 0 {
		return
	}
	s.mu.Lock()
	s.latency = appendCappedLatency(s.latency, event.Latency)
	s.mu.Unlock()
}
// Tick advances every rolling window by one second: the ring position moves
// forward and the slot it lands on is cleared so stale data from a full lap
// ago never leaks into the sum. Called once per second by the proxy tick loop.
//
// stats.Tick()
func (s *Stats) Tick() {
	s.mu.Lock()
	for index := range s.windows[:HashrateWindowAll] {
		ring := &s.windows[index]
		ring.pos = (ring.pos + 1) % ring.size
		ring.buckets[ring.pos] = 0
	}
	s.mu.Unlock()
}
// Summary returns a point-in-time snapshot of all stats fields for API
// serialisation. Rolling hashrates divide each window's bucket sum by the full
// window length; the all-time rate and average share time use wall-clock
// uptime. AvgLatency is the (upper) median of the recorded latency samples.
//
// summary := stats.Summary()
func (s *Stats) Summary() StatsSummary {
	s.mu.Lock()
	defer s.mu.Unlock()

	summary := StatsSummary{
		Accepted: s.accepted.Load(),
		Rejected: s.rejected.Load(),
		Invalid:  s.invalid.Load(),
		Expired:  s.expired.Load(),
		Hashes:   s.hashes.Load(),
		TopDiff:  s.topDiff,
	}
	for index, windowSeconds := range hashrateWindowSizes {
		summary.Hashrate[index] = float64(sumBuckets(s.windows[index].buckets)) / float64(windowSeconds)
	}

	uptime := uint64(time.Since(s.startTime).Seconds())
	if uptime > 0 {
		summary.Hashrate[HashrateWindowAll] = float64(summary.Hashes) / float64(uptime)
		if summary.Accepted > 0 {
			summary.AvgTime = uint32(uptime / summary.Accepted)
		}
	}

	if samples := len(s.latency); samples > 0 {
		sorted := slices.Clone(s.latency)
		sort.Slice(sorted, func(a int, b int) bool {
			return sorted[a] < sorted[b]
		})
		summary.AvgLatency = uint32(sorted[samples/2])
	}
	return summary
}
// appendCappedLatency appends latency to latencies while keeping at most
// maxLatencySamples entries: once full, the oldest sample is shifted out so
// the slice behaves as a sliding window without reallocating.
func appendCappedLatency(latencies []uint16, latency uint16) []uint16 {
	const maxLatencySamples = 10000
	// >= rather than == so a slice that somehow exceeded the cap can never
	// keep growing without bound.
	if len(latencies) >= maxLatencySamples {
		copy(latencies, latencies[1:])
		latencies[len(latencies)-1] = latency
		return latencies
	}
	return append(latencies, latency)
}
// insertTopDiff places difficulty into the descending top-10 table, shifting
// lower entries down and dropping the smallest. Zero difficulties are ignored;
// an equal difficulty is inserted below existing entries of the same value, so
// duplicates can occupy multiple slots.
func insertTopDiff(topDiff *[10]uint64, difficulty uint64) {
	if difficulty == 0 {
		return
	}
	for slot := 0; slot < len(topDiff); slot++ {
		if difficulty > topDiff[slot] {
			copy(topDiff[slot+1:], topDiff[slot:len(topDiff)-1])
			topDiff[slot] = difficulty
			return
		}
	}
}
// isInvalidShareError reports whether a pool reject message describes an
// invalid share (as opposed to e.g. a stale or unauthorized one). Matching is
// exact and case-sensitive.
func isInvalidShareError(message string) bool {
	invalidMessages := [...]string{
		"Low difficulty share",
		"Invalid nonce",
		"Malformed share",
		"Invalid result",
	}
	return slices.Contains(invalidMessages[:], message)
}
// sumBuckets returns the sum of all values; a nil or empty slice sums to zero.
// uint64 wrap-around is accepted, matching Go's default integer semantics.
func sumBuckets(values []uint64) uint64 {
	total := uint64(0)
	for index := range values {
		total += values[index]
	}
	return total
}