feat(proxy): add custom diff stats and clean failover disconnects

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 19:20:29 +00:00
parent c250a4d6f2
commit e92c6070be
9 changed files with 327 additions and 47 deletions

View file

@ -25,13 +25,14 @@ type Router interface {
//
// {"version":"1.0.0","mode":"nicehash","hashrate":{"total":[...]}, ...}
type SummaryResponse struct {
Version string `json:"version"`
Mode string `json:"mode"`
Hashrate HashrateResponse `json:"hashrate"`
Miners MinersCountResponse `json:"miners"`
Workers uint64 `json:"workers"`
Upstreams UpstreamResponse `json:"upstreams"`
Results ResultsResponse `json:"results"`
Version string `json:"version"`
Mode string `json:"mode"`
Hashrate HashrateResponse `json:"hashrate"`
Miners MinersCountResponse `json:"miners"`
Workers uint64 `json:"workers"`
Upstreams UpstreamResponse `json:"upstreams"`
Results ResultsResponse `json:"results"`
CustomDiffStats map[uint64]proxy.CustomDiffBucketStats `json:"custom_diff_stats,omitempty"`
}
// HashrateResponse carries the per-window hashrate array.
@ -100,6 +101,7 @@ func summaryResponse(p *proxy.Proxy) SummaryResponse {
Hashrate: HashrateResponse{
Total: summary.Hashrate,
},
CustomDiffStats: summary.CustomDiffStats,
Miners: MinersCountResponse{
Now: now,
Max: max,

108
customdiffstats.go Normal file
View file

@ -0,0 +1,108 @@
package proxy
import (
"strings"
"sync"
)
// CustomDiffBucketStats tracks per-custom-difficulty share outcomes.
type CustomDiffBucketStats struct {
Accepted uint64 `json:"accepted"`
Rejected uint64 `json:"rejected"`
Invalid uint64 `json:"invalid"`
Expired uint64 `json:"expired"`
Hashes uint64 `json:"hashes_total"`
}
// CustomDiffBuckets records share totals grouped by miner custom difficulty.
type CustomDiffBuckets struct {
enabled bool
buckets map[uint64]*CustomDiffBucketStats
mu sync.Mutex
}
// NewCustomDiffBuckets creates a per-difficulty share tracker.
func NewCustomDiffBuckets(enabled bool) *CustomDiffBuckets {
return &CustomDiffBuckets{
enabled: enabled,
buckets: make(map[uint64]*CustomDiffBucketStats),
}
}
// SetEnabled toggles recording without discarding any collected buckets.
func (b *CustomDiffBuckets) SetEnabled(enabled bool) {
if b == nil {
return
}
b.mu.Lock()
defer b.mu.Unlock()
b.enabled = enabled
}
// OnAccept records an accepted share for the miner's custom difficulty bucket.
func (b *CustomDiffBuckets) OnAccept(e Event) {
if b == nil || !b.enabled || e.Miner == nil {
return
}
b.mu.Lock()
defer b.mu.Unlock()
bucket := b.bucketLocked(e.Miner.customDiff)
bucket.Accepted++
if e.Expired {
bucket.Expired++
}
if e.Diff > 0 {
bucket.Hashes += e.Diff
}
}
// OnReject records a rejected share for the miner's custom difficulty bucket.
func (b *CustomDiffBuckets) OnReject(e Event) {
if b == nil || !b.enabled || e.Miner == nil {
return
}
b.mu.Lock()
defer b.mu.Unlock()
bucket := b.bucketLocked(e.Miner.customDiff)
bucket.Rejected++
if isInvalidShareReason(e.Error) {
bucket.Invalid++
}
}
// Snapshot returns a copy of the current bucket totals.
func (b *CustomDiffBuckets) Snapshot() map[uint64]CustomDiffBucketStats {
if b == nil {
return nil
}
b.mu.Lock()
defer b.mu.Unlock()
if !b.enabled || len(b.buckets) == 0 {
return nil
}
out := make(map[uint64]CustomDiffBucketStats, len(b.buckets))
for diff, bucket := range b.buckets {
if bucket == nil {
continue
}
out[diff] = *bucket
}
return out
}
func (b *CustomDiffBuckets) bucketLocked(diff uint64) *CustomDiffBucketStats {
if b.buckets == nil {
b.buckets = make(map[uint64]*CustomDiffBucketStats)
}
bucket, ok := b.buckets[diff]
if !ok {
bucket = &CustomDiffBucketStats{}
b.buckets[diff] = bucket
}
return bucket
}
func isInvalidShareReason(reason string) bool {
reason = strings.ToLower(reason)
return strings.Contains(reason, "difficulty") || strings.Contains(reason, "invalid") || strings.Contains(reason, "nonce")
}

77
customdiffstats_test.go Normal file
View file

@ -0,0 +1,77 @@
package proxy
import "testing"
func TestProxy_CustomDiffStats_Good(t *testing.T) {
cfg := &Config{
Mode: "nicehash",
Workers: WorkersByRigID,
CustomDiffStats: true,
Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}},
Pools: []PoolConfig{{URL: "pool.example:3333", Enabled: true}},
}
p, result := New(cfg)
if !result.OK {
t.Fatalf("expected valid proxy, got error: %v", result.Error)
}
miner := &Miner{customDiff: 50000}
p.events.Dispatch(Event{Type: EventAccept, Miner: miner, Diff: 75, Expired: true})
summary := p.Summary()
bucket, ok := summary.CustomDiffStats[50000]
if !ok {
t.Fatalf("expected custom diff bucket 50000 to be present")
}
if bucket.Accepted != 1 || bucket.Expired != 1 || bucket.Hashes != 75 {
t.Fatalf("unexpected bucket totals: %+v", bucket)
}
}
func TestProxy_CustomDiffStats_Bad(t *testing.T) {
cfg := &Config{
Mode: "nicehash",
Workers: WorkersByRigID,
CustomDiffStats: true,
Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}},
Pools: []PoolConfig{{URL: "pool.example:3333", Enabled: true}},
}
p, result := New(cfg)
if !result.OK {
t.Fatalf("expected valid proxy, got error: %v", result.Error)
}
miner := &Miner{customDiff: 10000}
p.events.Dispatch(Event{Type: EventReject, Miner: miner, Error: "Invalid nonce"})
summary := p.Summary()
bucket, ok := summary.CustomDiffStats[10000]
if !ok {
t.Fatalf("expected custom diff bucket 10000 to be present")
}
if bucket.Rejected != 1 || bucket.Invalid != 1 {
t.Fatalf("unexpected bucket totals: %+v", bucket)
}
}
func TestProxy_CustomDiffStats_Ugly(t *testing.T) {
cfg := &Config{
Mode: "nicehash",
Workers: WorkersByRigID,
CustomDiffStats: false,
Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}},
Pools: []PoolConfig{{URL: "pool.example:3333", Enabled: true}},
}
p, result := New(cfg)
if !result.OK {
t.Fatalf("expected valid proxy, got error: %v", result.Error)
}
miner := &Miner{customDiff: 25000}
p.events.Dispatch(Event{Type: EventAccept, Miner: miner, Diff: 1})
summary := p.Summary()
if len(summary.CustomDiffStats) != 0 {
t.Fatalf("expected custom diff stats to remain disabled, got %+v", summary.CustomDiffStats)
}
}

View file

@ -350,6 +350,7 @@ func (s *FailoverStrategy) Connect() {
}
s.mu.Lock()
defer s.mu.Unlock()
s.closing = false
s.connectLocked(0)
}
@ -408,10 +409,12 @@ func (s *FailoverStrategy) Disconnect() {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if s.client != nil {
s.client.Disconnect()
s.client = nil
client := s.client
s.closing = true
s.client = nil
s.mu.Unlock()
if client != nil {
client.Disconnect()
}
}
@ -452,6 +455,15 @@ func (s *FailoverStrategy) OnDisconnect() {
if s == nil {
return
}
s.mu.Lock()
closing := s.closing
if closing {
s.closing = false
}
s.mu.Unlock()
if closing {
return
}
if s.listener != nil {
s.listener.OnDisconnect()
}

View file

@ -18,6 +18,7 @@ type FailoverStrategy struct {
client *StratumClient
listener StratumListener
cfg *proxy.Config
closing bool
mu sync.Mutex
}

View file

@ -0,0 +1,65 @@
package pool
import (
"sync/atomic"
"testing"
"time"
"dappco.re/go/proxy"
)
type disconnectSpy struct {
disconnects atomic.Int64
}
func (s *disconnectSpy) OnJob(proxy.Job) {}
func (s *disconnectSpy) OnResultAccepted(int64, bool, string) {}
func (s *disconnectSpy) OnDisconnect() {
s.disconnects.Add(1)
}
func TestFailoverStrategy_Disconnect_Good(t *testing.T) {
spy := &disconnectSpy{}
strategy := &FailoverStrategy{
listener: spy,
client: &StratumClient{listener: nil},
}
strategy.client.listener = strategy
strategy.Disconnect()
time.Sleep(10 * time.Millisecond)
if got := spy.disconnects.Load(); got != 0 {
t.Fatalf("expected intentional disconnect to suppress reconnect, got %d listener calls", got)
}
}
func TestFailoverStrategy_Disconnect_Bad(t *testing.T) {
spy := &disconnectSpy{}
strategy := &FailoverStrategy{listener: spy}
strategy.OnDisconnect()
if got := spy.disconnects.Load(); got != 1 {
t.Fatalf("expected external disconnect to notify listener once, got %d", got)
}
}
func TestFailoverStrategy_Disconnect_Ugly(t *testing.T) {
spy := &disconnectSpy{}
strategy := &FailoverStrategy{
listener: spy,
client: &StratumClient{listener: nil},
}
strategy.client.listener = strategy
strategy.Disconnect()
strategy.Disconnect()
time.Sleep(10 * time.Millisecond)
if got := spy.disconnects.Load(); got != 0 {
t.Fatalf("expected repeated intentional disconnects to remain silent, got %d listener calls", got)
}
}

View file

@ -23,23 +23,24 @@ import (
// p, result := proxy.New(cfg)
// if result.OK { p.Start() }
type Proxy struct {
config *Config
splitter Splitter
stats *Stats
workers *Workers
events *EventBus
servers []*Server
ticker *time.Ticker
watcher *ConfigWatcher
done chan struct{}
stopOnce sync.Once
minersMu sync.RWMutex
miners map[int64]*Miner
customDiff *CustomDiff
rateLimit *RateLimiter
httpServer *http.Server
accessLog *accessLogSink
submitCount atomic.Int64
config *Config
splitter Splitter
stats *Stats
workers *Workers
events *EventBus
servers []*Server
ticker *time.Ticker
watcher *ConfigWatcher
done chan struct{}
stopOnce sync.Once
minersMu sync.RWMutex
miners map[int64]*Miner
customDiff *CustomDiff
customDiffBuckets *CustomDiffBuckets
rateLimit *RateLimiter
httpServer *http.Server
accessLog *accessLogSink
submitCount atomic.Int64
}
// Splitter is the interface both NonceSplitter and SimpleSplitter satisfy.

View file

@ -39,15 +39,16 @@ func New(cfg *Config) (*Proxy, Result) {
}
p := &Proxy{
config: cfg,
events: NewEventBus(),
stats: NewStats(),
workers: NewWorkers(cfg.Workers, nil),
miners: make(map[int64]*Miner),
customDiff: NewCustomDiff(cfg.CustomDiff),
rateLimit: NewRateLimiter(cfg.RateLimit),
accessLog: newAccessLogSink(cfg.AccessLogFile),
done: make(chan struct{}),
config: cfg,
events: NewEventBus(),
stats: NewStats(),
workers: NewWorkers(cfg.Workers, nil),
miners: make(map[int64]*Miner),
customDiff: NewCustomDiff(cfg.CustomDiff),
customDiffBuckets: NewCustomDiffBuckets(cfg.CustomDiffStats),
rateLimit: NewRateLimiter(cfg.RateLimit),
accessLog: newAccessLogSink(cfg.AccessLogFile),
done: make(chan struct{}),
}
p.workers.bindEvents(p.events)
@ -60,6 +61,10 @@ func New(cfg *Config) (*Proxy, Result) {
p.events.Subscribe(EventClose, p.stats.OnClose)
p.events.Subscribe(EventAccept, p.stats.OnAccept)
p.events.Subscribe(EventReject, p.stats.OnReject)
if p.customDiffBuckets != nil {
p.events.Subscribe(EventAccept, p.customDiffBuckets.OnAccept)
p.events.Subscribe(EventReject, p.customDiffBuckets.OnReject)
}
if cfg.Watch && cfg.sourcePath != "" {
p.watcher = NewConfigWatcher(cfg.sourcePath, p.Reload)
}
@ -94,7 +99,11 @@ func (p *Proxy) Summary() StatsSummary {
if p == nil || p.stats == nil {
return StatsSummary{}
}
return p.stats.Summary()
summary := p.stats.Summary()
if p.customDiffBuckets != nil {
summary.CustomDiffStats = p.customDiffBuckets.Snapshot()
}
return summary
}
// WorkerRecords returns a stable snapshot of worker rows.
@ -274,6 +283,9 @@ func (p *Proxy) Reload(cfg *Config) {
if p.customDiff != nil {
p.customDiff.globalDiff = cfg.CustomDiff
}
if p.customDiffBuckets != nil {
p.customDiffBuckets.SetEnabled(cfg.CustomDiffStats)
}
p.rateLimit = NewRateLimiter(cfg.RateLimit)
for _, server := range p.servers {
if server != nil {
@ -499,6 +511,7 @@ func (p *Proxy) summaryDocument() any {
"hashrate": map[string]any{
"total": summary.Hashrate,
},
"custom_diff_stats": summary.CustomDiffStats,
"miners": map[string]any{
"now": now,
"max": max,

View file

@ -49,13 +49,14 @@ type tickWindow struct {
//
// summary := stats.Summary()
type StatsSummary struct {
Accepted uint64 `json:"accepted"`
Rejected uint64 `json:"rejected"`
Invalid uint64 `json:"invalid"`
Expired uint64 `json:"expired"`
Hashes uint64 `json:"hashes_total"`
AvgTime uint32 `json:"avg_time"` // seconds per accepted share
AvgLatency uint32 `json:"latency"` // median pool response latency in ms
Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants)
TopDiff [10]uint64 `json:"best"`
Accepted uint64 `json:"accepted"`
Rejected uint64 `json:"rejected"`
Invalid uint64 `json:"invalid"`
Expired uint64 `json:"expired"`
Hashes uint64 `json:"hashes_total"`
AvgTime uint32 `json:"avg_time"` // seconds per accepted share
AvgLatency uint32 `json:"latency"` // median pool response latency in ms
Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants)
TopDiff [10]uint64 `json:"best"`
CustomDiffStats map[uint64]CustomDiffBucketStats `json:"custom_diff_stats,omitempty"`
}