diff --git a/api/router.go b/api/router.go index 41b54b5..8331a58 100644 --- a/api/router.go +++ b/api/router.go @@ -25,13 +25,14 @@ type Router interface { // // {"version":"1.0.0","mode":"nicehash","hashrate":{"total":[...]}, ...} type SummaryResponse struct { - Version string `json:"version"` - Mode string `json:"mode"` - Hashrate HashrateResponse `json:"hashrate"` - Miners MinersCountResponse `json:"miners"` - Workers uint64 `json:"workers"` - Upstreams UpstreamResponse `json:"upstreams"` - Results ResultsResponse `json:"results"` + Version string `json:"version"` + Mode string `json:"mode"` + Hashrate HashrateResponse `json:"hashrate"` + Miners MinersCountResponse `json:"miners"` + Workers uint64 `json:"workers"` + Upstreams UpstreamResponse `json:"upstreams"` + Results ResultsResponse `json:"results"` + CustomDiffStats map[uint64]proxy.CustomDiffBucketStats `json:"custom_diff_stats,omitempty"` } // HashrateResponse carries the per-window hashrate array. @@ -100,6 +101,7 @@ func summaryResponse(p *proxy.Proxy) SummaryResponse { Hashrate: HashrateResponse{ Total: summary.Hashrate, }, + CustomDiffStats: summary.CustomDiffStats, Miners: MinersCountResponse{ Now: now, Max: max, diff --git a/customdiffstats.go b/customdiffstats.go new file mode 100644 index 0000000..5ea9ed3 --- /dev/null +++ b/customdiffstats.go @@ -0,0 +1,108 @@ +package proxy + +import ( + "strings" + "sync" +) + +// CustomDiffBucketStats tracks per-custom-difficulty share outcomes. +type CustomDiffBucketStats struct { + Accepted uint64 `json:"accepted"` + Rejected uint64 `json:"rejected"` + Invalid uint64 `json:"invalid"` + Expired uint64 `json:"expired"` + Hashes uint64 `json:"hashes_total"` +} + +// CustomDiffBuckets records share totals grouped by miner custom difficulty. +type CustomDiffBuckets struct { + enabled bool + buckets map[uint64]*CustomDiffBucketStats + mu sync.Mutex +} + +// NewCustomDiffBuckets creates a per-difficulty share tracker. +func NewCustomDiffBuckets(enabled bool) *CustomDiffBuckets { + return &CustomDiffBuckets{ + enabled: enabled, + buckets: make(map[uint64]*CustomDiffBucketStats), + } +} + +// SetEnabled toggles recording without discarding any collected buckets. +func (b *CustomDiffBuckets) SetEnabled(enabled bool) { + if b == nil { + return + } + b.mu.Lock() + defer b.mu.Unlock() + b.enabled = enabled +} + +// OnAccept records an accepted share for the miner's custom difficulty bucket. +func (b *CustomDiffBuckets) OnAccept(e Event) { + if b == nil || !b.enabled || e.Miner == nil { + return + } + b.mu.Lock() + defer b.mu.Unlock() + bucket := b.bucketLocked(e.Miner.customDiff) + bucket.Accepted++ + if e.Expired { + bucket.Expired++ + } + if e.Diff > 0 { + bucket.Hashes += e.Diff + } +} + +// OnReject records a rejected share for the miner's custom difficulty bucket. +func (b *CustomDiffBuckets) OnReject(e Event) { + if b == nil || !b.enabled || e.Miner == nil { + return + } + b.mu.Lock() + defer b.mu.Unlock() + bucket := b.bucketLocked(e.Miner.customDiff) + bucket.Rejected++ + if isInvalidShareReason(e.Error) { + bucket.Invalid++ + } +} + +// Snapshot returns a copy of the current bucket totals. +func (b *CustomDiffBuckets) Snapshot() map[uint64]CustomDiffBucketStats { + if b == nil { + return nil + } + b.mu.Lock() + defer b.mu.Unlock() + if !b.enabled || len(b.buckets) == 0 { + return nil + } + out := make(map[uint64]CustomDiffBucketStats, len(b.buckets)) + for diff, bucket := range b.buckets { + if bucket == nil { + continue + } + out[diff] = *bucket + } + return out +} + +func (b *CustomDiffBuckets) bucketLocked(diff uint64) *CustomDiffBucketStats { + if b.buckets == nil { + b.buckets = make(map[uint64]*CustomDiffBucketStats) + } + bucket, ok := b.buckets[diff] + if !ok { + bucket = &CustomDiffBucketStats{} + b.buckets[diff] = bucket + } + return bucket +} + +func isInvalidShareReason(reason string) bool { + reason = strings.ToLower(reason) + return strings.Contains(reason, "difficulty") || strings.Contains(reason, "invalid") || strings.Contains(reason, "nonce") +} diff --git a/customdiffstats_test.go b/customdiffstats_test.go new file mode 100644 index 0000000..7b5aef8 --- /dev/null +++ b/customdiffstats_test.go @@ -0,0 +1,77 @@ +package proxy + +import "testing" + +func TestProxy_CustomDiffStats_Good(t *testing.T) { + cfg := &Config{ + Mode: "nicehash", + Workers: WorkersByRigID, + CustomDiffStats: true, + Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}}, + Pools: []PoolConfig{{URL: "pool.example:3333", Enabled: true}}, + } + p, result := New(cfg) + if !result.OK { + t.Fatalf("expected valid proxy, got error: %v", result.Error) + } + + miner := &Miner{customDiff: 50000} + p.events.Dispatch(Event{Type: EventAccept, Miner: miner, Diff: 75, Expired: true}) + + summary := p.Summary() + bucket, ok := summary.CustomDiffStats[50000] + if !ok { + t.Fatalf("expected custom diff bucket 50000 to be present") + } + if bucket.Accepted != 1 || bucket.Expired != 1 || bucket.Hashes != 75 { + t.Fatalf("unexpected bucket totals: %+v", bucket) + } +} + +func TestProxy_CustomDiffStats_Bad(t *testing.T) { + cfg := &Config{ + Mode: "nicehash", + Workers: WorkersByRigID, + CustomDiffStats: true, + Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}}, + Pools: []PoolConfig{{URL: "pool.example:3333", Enabled: true}}, + } + p, result := New(cfg) + if !result.OK { + t.Fatalf("expected valid proxy, got error: %v", result.Error) + } + + miner := &Miner{customDiff: 10000} + p.events.Dispatch(Event{Type: EventReject, Miner: miner, Error: "Invalid nonce"}) + + summary := p.Summary() + bucket, ok := summary.CustomDiffStats[10000] + if !ok { + t.Fatalf("expected custom diff bucket 10000 to be present") + } + if bucket.Rejected != 1 || bucket.Invalid != 1 { + t.Fatalf("unexpected bucket totals: %+v", bucket) + } +} + +func TestProxy_CustomDiffStats_Ugly(t *testing.T) { + cfg := &Config{ + Mode: "nicehash", + Workers: WorkersByRigID, + CustomDiffStats: false, + Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}}, + Pools: []PoolConfig{{URL: "pool.example:3333", Enabled: true}}, + } + p, result := New(cfg) + if !result.OK { + t.Fatalf("expected valid proxy, got error: %v", result.Error) + } + + miner := &Miner{customDiff: 25000} + p.events.Dispatch(Event{Type: EventAccept, Miner: miner, Diff: 1}) + + summary := p.Summary() + if len(summary.CustomDiffStats) != 0 { + t.Fatalf("expected custom diff stats to remain disabled, got %+v", summary.CustomDiffStats) + } +} diff --git a/pool/impl.go b/pool/impl.go index ef6e9c2..bf48b03 100644 --- a/pool/impl.go +++ b/pool/impl.go @@ -350,6 +350,7 @@ func (s *FailoverStrategy) Connect() { } s.mu.Lock() defer s.mu.Unlock() + s.closing = false s.connectLocked(0) } @@ -408,10 +409,12 @@ func (s *FailoverStrategy) Disconnect() { return } s.mu.Lock() - defer s.mu.Unlock() - if s.client != nil { - s.client.Disconnect() - s.client = nil + client := s.client + s.closing = true + s.client = nil + s.mu.Unlock() + if client != nil { + client.Disconnect() } } @@ -452,6 +455,15 @@ func (s *FailoverStrategy) OnDisconnect() { if s == nil { return } + s.mu.Lock() + closing := s.closing + if closing { + s.closing = false + } + s.mu.Unlock() + if closing { + return + } if s.listener != nil { s.listener.OnDisconnect() } diff --git a/pool/strategy.go b/pool/strategy.go index 92b2efb..c9e1986 100644 --- a/pool/strategy.go +++ b/pool/strategy.go @@ -18,6 +18,7 @@ type FailoverStrategy struct { client *StratumClient listener StratumListener cfg *proxy.Config + closing bool mu sync.Mutex } diff --git a/pool/strategy_disconnect_test.go b/pool/strategy_disconnect_test.go new file mode 100644 index 0000000..de9a173 --- /dev/null +++ b/pool/strategy_disconnect_test.go @@ -0,0 +1,65 @@ +package pool + +import ( + "sync/atomic" + "testing" + "time" + + "dappco.re/go/proxy" +) + +type disconnectSpy struct { + disconnects atomic.Int64 +} + +func (s *disconnectSpy) OnJob(proxy.Job) {} + +func (s *disconnectSpy) OnResultAccepted(int64, bool, string) {} + +func (s *disconnectSpy) OnDisconnect() { + s.disconnects.Add(1) +} + +func TestFailoverStrategy_Disconnect_Good(t *testing.T) { + spy := &disconnectSpy{} + strategy := &FailoverStrategy{ + listener: spy, + client: &StratumClient{listener: nil}, + } + strategy.client.listener = strategy + + strategy.Disconnect() + time.Sleep(10 * time.Millisecond) + + if got := spy.disconnects.Load(); got != 0 { + t.Fatalf("expected intentional disconnect to suppress reconnect, got %d listener calls", got) + } +} + +func TestFailoverStrategy_Disconnect_Bad(t *testing.T) { + spy := &disconnectSpy{} + strategy := &FailoverStrategy{listener: spy} + + strategy.OnDisconnect() + + if got := spy.disconnects.Load(); got != 1 { + t.Fatalf("expected external disconnect to notify listener once, got %d", got) + } +} + +func TestFailoverStrategy_Disconnect_Ugly(t *testing.T) { + spy := &disconnectSpy{} + strategy := &FailoverStrategy{ + listener: spy, + client: &StratumClient{listener: nil}, + } + strategy.client.listener = strategy + + strategy.Disconnect() + strategy.Disconnect() + time.Sleep(10 * time.Millisecond) + + if got := spy.disconnects.Load(); got != 0 { + t.Fatalf("expected repeated intentional disconnects to remain silent, got %d listener calls", got) + } +} diff --git a/proxy.go b/proxy.go index 35d994a..813911d 100644 --- a/proxy.go +++ b/proxy.go @@ -23,23 +23,24 @@ import ( // p, result := proxy.New(cfg) // if result.OK { p.Start() } type Proxy struct { - config *Config - splitter Splitter - stats *Stats - workers *Workers - events *EventBus - servers []*Server - ticker *time.Ticker - watcher *ConfigWatcher - done chan struct{} - stopOnce sync.Once - minersMu sync.RWMutex - miners map[int64]*Miner - customDiff *CustomDiff - rateLimit *RateLimiter - httpServer *http.Server - accessLog *accessLogSink - submitCount atomic.Int64 + config *Config + splitter Splitter + stats *Stats + workers *Workers + events *EventBus + servers []*Server + ticker *time.Ticker + watcher *ConfigWatcher + done chan struct{} + stopOnce sync.Once + minersMu sync.RWMutex + miners map[int64]*Miner + customDiff *CustomDiff + customDiffBuckets *CustomDiffBuckets + rateLimit *RateLimiter + httpServer *http.Server + accessLog *accessLogSink + submitCount atomic.Int64 } // Splitter is the interface both NonceSplitter and SimpleSplitter satisfy. diff --git a/state_impl.go b/state_impl.go index 45febe1..2835141 100644 --- a/state_impl.go +++ b/state_impl.go @@ -39,15 +39,16 @@ func New(cfg *Config) (*Proxy, Result) { } p := &Proxy{ - config: cfg, - events: NewEventBus(), - stats: NewStats(), - workers: NewWorkers(cfg.Workers, nil), - miners: make(map[int64]*Miner), - customDiff: NewCustomDiff(cfg.CustomDiff), - rateLimit: NewRateLimiter(cfg.RateLimit), - accessLog: newAccessLogSink(cfg.AccessLogFile), - done: make(chan struct{}), + config: cfg, + events: NewEventBus(), + stats: NewStats(), + workers: NewWorkers(cfg.Workers, nil), + miners: make(map[int64]*Miner), + customDiff: NewCustomDiff(cfg.CustomDiff), + customDiffBuckets: NewCustomDiffBuckets(cfg.CustomDiffStats), + rateLimit: NewRateLimiter(cfg.RateLimit), + accessLog: newAccessLogSink(cfg.AccessLogFile), + done: make(chan struct{}), } p.workers.bindEvents(p.events) @@ -60,6 +61,10 @@ func New(cfg *Config) (*Proxy, Result) { p.events.Subscribe(EventClose, p.stats.OnClose) p.events.Subscribe(EventAccept, p.stats.OnAccept) p.events.Subscribe(EventReject, p.stats.OnReject) + if p.customDiffBuckets != nil { + p.events.Subscribe(EventAccept, p.customDiffBuckets.OnAccept) + p.events.Subscribe(EventReject, p.customDiffBuckets.OnReject) + } if cfg.Watch && cfg.sourcePath != "" { p.watcher = NewConfigWatcher(cfg.sourcePath, p.Reload) } @@ -94,7 +99,11 @@ func (p *Proxy) Summary() StatsSummary { if p == nil || p.stats == nil { return StatsSummary{} } - return p.stats.Summary() + summary := p.stats.Summary() + if p.customDiffBuckets != nil { + summary.CustomDiffStats = p.customDiffBuckets.Snapshot() + } + return summary } // WorkerRecords returns a stable snapshot of worker rows. @@ -274,6 +283,9 @@ func (p *Proxy) Reload(cfg *Config) { if p.customDiff != nil { p.customDiff.globalDiff = cfg.CustomDiff } + if p.customDiffBuckets != nil { + p.customDiffBuckets.SetEnabled(cfg.CustomDiffStats) + } p.rateLimit = NewRateLimiter(cfg.RateLimit) for _, server := range p.servers { if server != nil { @@ -499,6 +511,7 @@ func (p *Proxy) summaryDocument() any { "hashrate": map[string]any{ "total": summary.Hashrate, }, + "custom_diff_stats": summary.CustomDiffStats, "miners": map[string]any{ "now": now, "max": max, diff --git a/stats.go b/stats.go index ca5e536..f516767 100644 --- a/stats.go +++ b/stats.go @@ -49,13 +49,14 @@ type tickWindow struct { // // summary := stats.Summary() type StatsSummary struct { - Accepted uint64 `json:"accepted"` - Rejected uint64 `json:"rejected"` - Invalid uint64 `json:"invalid"` - Expired uint64 `json:"expired"` - Hashes uint64 `json:"hashes_total"` - AvgTime uint32 `json:"avg_time"` // seconds per accepted share - AvgLatency uint32 `json:"latency"` // median pool response latency in ms - Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants) - TopDiff [10]uint64 `json:"best"` + Accepted uint64 `json:"accepted"` + Rejected uint64 `json:"rejected"` + Invalid uint64 `json:"invalid"` + Expired uint64 `json:"expired"` + Hashes uint64 `json:"hashes_total"` + AvgTime uint32 `json:"avg_time"` // seconds per accepted share + AvgLatency uint32 `json:"latency"` // median pool response latency in ms + Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants) + TopDiff [10]uint64 `json:"best"` + CustomDiffStats map[uint64]CustomDiffBucketStats `json:"custom_diff_stats,omitempty"` }