fix(proxy): keep runtime state local
Preserve bind addresses on reload and track active miners per Proxy instance.

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
e41ad7ef2e
commit
9e906d11f9
3 changed files with 96 additions and 21 deletions
30
proxy.go
30
proxy.go
|
|
@ -13,6 +13,7 @@ package proxy
|
|||
import (
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
|
@ -22,20 +23,21 @@ import (
|
|||
// p, result := proxy.New(cfg)
|
||||
// if result.OK { p.Start() }
|
||||
type Proxy struct {
|
||||
config *Config
|
||||
customDiff *CustomDiff
|
||||
rateLimiter *RateLimiter
|
||||
splitter Splitter
|
||||
stats *Stats
|
||||
workers *Workers
|
||||
events *EventBus
|
||||
miners map[int64]*Miner
|
||||
minerMu sync.RWMutex
|
||||
servers []*Server
|
||||
httpServer *http.Server
|
||||
ticker *time.Ticker
|
||||
watcher *ConfigWatcher
|
||||
done chan struct{}
|
||||
config *Config
|
||||
customDiff *CustomDiff
|
||||
rateLimiter *RateLimiter
|
||||
splitter Splitter
|
||||
stats *Stats
|
||||
workers *Workers
|
||||
events *EventBus
|
||||
currentMiners atomic.Uint64
|
||||
miners map[int64]*Miner
|
||||
minerMu sync.RWMutex
|
||||
servers []*Server
|
||||
httpServer *http.Server
|
||||
ticker *time.Ticker
|
||||
watcher *ConfigWatcher
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// Splitter is the interface both NonceSplitter and SimpleSplitter satisfy.
|
||||
|
|
|
|||
|
|
@ -3,12 +3,9 @@ package proxy
|
|||
import (
|
||||
"crypto/tls"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
var proxyMinerCount atomic.Uint64
|
||||
|
||||
type splitterShutdown interface {
|
||||
PendingCount() int
|
||||
Disconnect()
|
||||
|
|
@ -51,7 +48,7 @@ func New(cfg *Config) (*Proxy, error) {
|
|||
proxyValue.minerMu.Unlock()
|
||||
}
|
||||
stats.connections.Add(1)
|
||||
current := proxyMinerCount.Add(1)
|
||||
current := proxyValue.currentMiners.Add(1)
|
||||
for {
|
||||
maximum := stats.maxMiners.Load()
|
||||
if current <= maximum || stats.maxMiners.CompareAndSwap(maximum, current) {
|
||||
|
|
@ -65,8 +62,8 @@ func New(cfg *Config) (*Proxy, error) {
|
|||
delete(proxyValue.miners, event.Miner.ID())
|
||||
proxyValue.minerMu.Unlock()
|
||||
}
|
||||
if proxyMinerCount.Load() > 0 {
|
||||
proxyMinerCount.Add(^uint64(0))
|
||||
if proxyValue.currentMiners.Load() > 0 {
|
||||
proxyValue.currentMiners.Add(^uint64(0))
|
||||
}
|
||||
})
|
||||
events.Subscribe(EventAccept, stats.OnAccept)
|
||||
|
|
@ -224,8 +221,10 @@ func (p *Proxy) Reload(cfg *Config) {
|
|||
p.config = cfg
|
||||
} else {
|
||||
sourcePath := p.config.sourcePath
|
||||
bind := append([]BindAddr(nil), p.config.Bind...)
|
||||
*p.config = *cfg
|
||||
p.config.sourcePath = sourcePath
|
||||
p.config.Bind = bind
|
||||
}
|
||||
if p.customDiff != nil {
|
||||
p.customDiff.SetGlobalDiff(p.config.CustomDiff)
|
||||
|
|
@ -266,7 +265,10 @@ func (p *Proxy) Miners() []*Miner {
|
|||
}
|
||||
|
||||
func (p *Proxy) CurrentMiners() uint64 {
|
||||
return proxyMinerCount.Load()
|
||||
if p == nil {
|
||||
return 0
|
||||
}
|
||||
return p.currentMiners.Load()
|
||||
}
|
||||
|
||||
func (p *Proxy) MaxMiners() uint64 {
|
||||
|
|
|
|||
71
proxy_runtime_test.go
Normal file
71
proxy_runtime_test.go
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
package proxy
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestProxy_Reload_Good(t *testing.T) {
|
||||
cfg := &Config{
|
||||
Mode: "noop",
|
||||
Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}},
|
||||
Pools: []PoolConfig{{URL: "pool-a:3333", Enabled: true}},
|
||||
CustomDiff: 100,
|
||||
Workers: WorkersDisabled,
|
||||
}
|
||||
proxyValue, errorValue := New(cfg)
|
||||
if errorValue != nil {
|
||||
t.Fatal(errorValue)
|
||||
}
|
||||
|
||||
reloadCfg := &Config{
|
||||
Mode: "simple",
|
||||
Bind: []BindAddr{{Host: "0.0.0.0", Port: 4444}},
|
||||
Pools: []PoolConfig{{URL: "pool-b:4444", Enabled: true}},
|
||||
CustomDiff: 250,
|
||||
Workers: WorkersByUser,
|
||||
}
|
||||
|
||||
proxyValue.Reload(reloadCfg)
|
||||
|
||||
if len(proxyValue.config.Bind) != 1 || proxyValue.config.Bind[0].Port != 3333 {
|
||||
t.Fatalf("expected bind addresses to remain unchanged, got %+v", proxyValue.config.Bind)
|
||||
}
|
||||
if len(proxyValue.config.Pools) != 1 || proxyValue.config.Pools[0].URL != "pool-b:4444" {
|
||||
t.Fatalf("expected pools to reload, got %+v", proxyValue.config.Pools)
|
||||
}
|
||||
if proxyValue.config.CustomDiff != 250 {
|
||||
t.Fatalf("expected custom diff to reload, got %d", proxyValue.config.CustomDiff)
|
||||
}
|
||||
if proxyValue.customDiff == nil || proxyValue.customDiff.globalDiff != 250 {
|
||||
t.Fatalf("expected live custom diff to update, got %+v", proxyValue.customDiff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxy_CurrentMiners_Good(t *testing.T) {
|
||||
cfg := &Config{
|
||||
Mode: "noop",
|
||||
Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}},
|
||||
Pools: []PoolConfig{{URL: "pool-a:3333", Enabled: true}},
|
||||
Workers: WorkersDisabled,
|
||||
}
|
||||
firstProxy, errorValue := New(cfg)
|
||||
if errorValue != nil {
|
||||
t.Fatal(errorValue)
|
||||
}
|
||||
secondProxy, errorValue := New(cfg)
|
||||
if errorValue != nil {
|
||||
t.Fatal(errorValue)
|
||||
}
|
||||
|
||||
miner := &Miner{}
|
||||
firstProxy.events.Dispatch(Event{Type: EventLogin, Miner: miner})
|
||||
if got := firstProxy.CurrentMiners(); got != 1 {
|
||||
t.Fatalf("expected first proxy miner count 1, got %d", got)
|
||||
}
|
||||
if got := secondProxy.CurrentMiners(); got != 0 {
|
||||
t.Fatalf("expected second proxy miner count 0, got %d", got)
|
||||
}
|
||||
|
||||
firstProxy.events.Dispatch(Event{Type: EventClose, Miner: miner})
|
||||
if got := firstProxy.CurrentMiners(); got != 0 {
|
||||
t.Fatalf("expected first proxy miner count to return to 0, got %d", got)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue