diff --git a/pool/impl.go b/pool/impl.go index 8ef2184..7ad2dc8 100644 --- a/pool/impl.go +++ b/pool/impl.go @@ -418,6 +418,20 @@ func (s *FailoverStrategy) Disconnect() { } } +// ReloadPools reconnects against the latest pool configuration. +// +// strategy.ReloadPools() +func (s *FailoverStrategy) ReloadPools() { + if s == nil { + return + } + s.mu.Lock() + s.current = 0 + s.mu.Unlock() + s.Disconnect() + s.Connect() +} + // IsActive reports whether the current client has received a job. func (s *FailoverStrategy) IsActive() bool { return s != nil && s.client != nil && s.client.IsActive() diff --git a/pool/strategy.go b/pool/strategy.go index c2ed1df..c3a36a1 100644 --- a/pool/strategy.go +++ b/pool/strategy.go @@ -36,3 +36,10 @@ type Strategy interface { Disconnect() IsActive() bool } + +// ReloadableStrategy re-establishes an upstream connection after config changes. +// +// strategy.ReloadPools() +type ReloadableStrategy interface { + ReloadPools() +} diff --git a/reload_test.go b/reload_test.go index 8e0ebe2..f5b2cb9 100644 --- a/reload_test.go +++ b/reload_test.go @@ -2,6 +2,19 @@ package proxy import "testing" +type reloadableSplitter struct { + reloads int +} + +func (s *reloadableSplitter) Connect() {} +func (s *reloadableSplitter) OnLogin(event *LoginEvent) {} +func (s *reloadableSplitter) OnSubmit(event *SubmitEvent) {} +func (s *reloadableSplitter) OnClose(event *CloseEvent) {} +func (s *reloadableSplitter) Tick(ticks uint64) {} +func (s *reloadableSplitter) GC() {} +func (s *reloadableSplitter) Upstreams() UpstreamStats { return UpstreamStats{} } +func (s *reloadableSplitter) ReloadPools() { s.reloads++ } + func TestProxy_Reload_Good(t *testing.T) { original := &Config{ Mode: "nicehash", @@ -145,3 +158,51 @@ func TestProxy_Reload_WatchDisabled_Bad(t *testing.T) { t.Fatalf("expected existing watcher to be stopped") } } + +func TestProxy_Reload_PoolsChanged_ReloadsSplitter_Good(t *testing.T) { + splitter := &reloadableSplitter{} + p := &Proxy{ + config: &Config{ + Mode: "nicehash", + Workers: WorkersByRigID, + Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}}, + Pools: []PoolConfig{{URL: "pool-a.example:3333", Enabled: true}}, + }, + splitter: splitter, + } + + p.Reload(&Config{ + Mode: "nicehash", + Workers: WorkersByRigID, + Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}}, + Pools: []PoolConfig{{URL: "pool-b.example:3333", Enabled: true}}, + }) + + if splitter.reloads != 1 { + t.Fatalf("expected pool reload to reconnect upstreams once, got %d", splitter.reloads) + } +} + +func TestProxy_Reload_PoolsUnchanged_DoesNotReloadSplitter_Ugly(t *testing.T) { + splitter := &reloadableSplitter{} + p := &Proxy{ + config: &Config{ + Mode: "nicehash", + Workers: WorkersByRigID, + Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}}, + Pools: []PoolConfig{{URL: "pool-a.example:3333", Enabled: true}}, + }, + splitter: splitter, + } + + p.Reload(&Config{ + Mode: "nicehash", + Workers: WorkersByRigID, + Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}}, + Pools: []PoolConfig{{URL: "pool-a.example:3333", Enabled: true}}, + }) + + if splitter.reloads != 0 { + t.Fatalf("expected unchanged pool config to skip reconnect, got %d", splitter.reloads) + } +} diff --git a/splitter/nicehash/impl.go b/splitter/nicehash/impl.go index 4e999c3..e160dd2 100644 --- a/splitter/nicehash/impl.go +++ b/splitter/nicehash/impl.go @@ -172,6 +172,29 @@ func (s *NonceSplitter) Disconnect() { s.byID = make(map[int64]*NonceMapper) } +// ReloadPools reconnects each mapper strategy using the updated pool list. +// +// s.ReloadPools() +func (s *NonceSplitter) ReloadPools() { + if s == nil { + return + } + strategies := make([]pool.Strategy, 0, len(s.mappers)) + s.mu.RLock() + for _, mapper := range s.mappers { + if mapper == nil || mapper.strategy == nil { + continue + } + strategies = append(strategies, mapper.strategy) + } + s.mu.RUnlock() + for _, strategy := range strategies { + if reloadable, ok := strategy.(pool.ReloadableStrategy); ok { + reloadable.ReloadPools() + } + } +} + func (s *NonceSplitter) addMapperLocked() *NonceMapper { id := s.seq s.seq++ diff --git a/splitter/nicehash/reload_test.go b/splitter/nicehash/reload_test.go new file mode 100644 index 0000000..77cd001 --- /dev/null +++ b/splitter/nicehash/reload_test.go @@ -0,0 +1,67 @@ +package nicehash + +import ( + "testing" + + "dappco.re/go/proxy" + "dappco.re/go/proxy/pool" +) + +type reloadableStrategy struct { + reloads int +} + +func (s *reloadableStrategy) Connect() {} +func (s *reloadableStrategy) Submit(jobID, nonce, result, algo string) int64 { return 0 } +func (s *reloadableStrategy) Disconnect() {} +func (s *reloadableStrategy) IsActive() bool { return true } +func (s *reloadableStrategy) ReloadPools() { s.reloads++ } + +var _ pool.ReloadableStrategy = (*reloadableStrategy)(nil) + +func TestNonceSplitter_ReloadPools_Good(t *testing.T) { + strategy := &reloadableStrategy{} + splitter := &NonceSplitter{ + mappers: []*NonceMapper{ + {strategy: strategy}, + }, + } + + splitter.ReloadPools() + + if strategy.reloads != 1 { + t.Fatalf("expected mapper strategy to reload once, got %d", strategy.reloads) + } +} + +func TestNonceSplitter_ReloadPools_Bad(t *testing.T) { + splitter := &NonceSplitter{ + mappers: []*NonceMapper{ + {strategy: nil}, + }, + } + + splitter.ReloadPools() +} + +func TestNonceSplitter_ReloadPools_Ugly(t *testing.T) { + splitter := NewNonceSplitter(&proxy.Config{}, proxy.NewEventBus(), func(listener pool.StratumListener) pool.Strategy { + return &reloadableStrategy{} + }) + splitter.mappers = []*NonceMapper{ + {strategy: &reloadableStrategy{}}, + {strategy: &reloadableStrategy{}}, + } + + splitter.ReloadPools() + + for index, mapper := range splitter.mappers { + strategy, ok := mapper.strategy.(*reloadableStrategy) + if !ok { + t.Fatalf("expected reloadable strategy at mapper %d", index) + } + if strategy.reloads != 1 { + t.Fatalf("expected mapper %d to reload once, got %d", index, strategy.reloads) + } + } +} diff --git a/splitter/simple/impl.go b/splitter/simple/impl.go index 36ea906..b50b205 100644 --- a/splitter/simple/impl.go +++ b/splitter/simple/impl.go @@ -216,6 +216,35 @@ func (s *SimpleSplitter) Disconnect() { s.idle = make(map[int64]*SimpleMapper) } +// ReloadPools reconnects each active or idle mapper using the updated pool list. +// +// s.ReloadPools() +func (s *SimpleSplitter) ReloadPools() { + if s == nil { + return + } + strategies := make([]pool.Strategy, 0, len(s.active)+len(s.idle)) + s.mu.Lock() + for _, mapper := range s.active { + if mapper == nil || mapper.strategy == nil { + continue + } + strategies = append(strategies, mapper.strategy) + } + for _, mapper := range s.idle { + if mapper == nil || mapper.strategy == nil { + continue + } + strategies = append(strategies, mapper.strategy) + } + s.mu.Unlock() + for _, strategy := range strategies { + if reloadable, ok := strategy.(pool.ReloadableStrategy); ok { + reloadable.ReloadPools() + } + } +} + func (s *SimpleSplitter) newMapperLocked() *SimpleMapper { id := s.seq s.seq++ diff --git a/splitter/simple/reload_test.go b/splitter/simple/reload_test.go new file mode 100644 index 0000000..b6aae88 --- /dev/null +++ b/splitter/simple/reload_test.go @@ -0,0 +1,68 @@ +package simple + +import ( + "testing" + + "dappco.re/go/proxy/pool" +) + +type reloadableStrategy struct { + reloads int +} + +func (s *reloadableStrategy) Connect() {} +func (s *reloadableStrategy) Submit(jobID, nonce, result, algo string) int64 { return 0 } +func (s *reloadableStrategy) Disconnect() {} +func (s *reloadableStrategy) IsActive() bool { return true } +func (s *reloadableStrategy) ReloadPools() { s.reloads++ } + +var _ pool.ReloadableStrategy = (*reloadableStrategy)(nil) + +func TestSimpleSplitter_ReloadPools_Good(t *testing.T) { + strategy := &reloadableStrategy{} + splitter := &SimpleSplitter{ + active: map[int64]*SimpleMapper{ + 1: {strategy: strategy}, + }, + idle: map[int64]*SimpleMapper{}, + } + + splitter.ReloadPools() + + if strategy.reloads != 1 { + t.Fatalf("expected active mapper strategy to reload once, got %d", strategy.reloads) + } +} + +func TestSimpleSplitter_ReloadPools_Bad(t *testing.T) { + splitter := &SimpleSplitter{ + active: map[int64]*SimpleMapper{ + 1: {strategy: nil}, + }, + idle: map[int64]*SimpleMapper{}, + } + + splitter.ReloadPools() +} + +func TestSimpleSplitter_ReloadPools_Ugly(t *testing.T) { + active := &reloadableStrategy{} + idle := &reloadableStrategy{} + splitter := &SimpleSplitter{ + active: map[int64]*SimpleMapper{ + 1: {strategy: active}, + }, + idle: map[int64]*SimpleMapper{ + 2: {strategy: idle}, + }, + } + + splitter.ReloadPools() + + if active.reloads != 1 { + t.Fatalf("expected active mapper reload, got %d", active.reloads) + } + if idle.reloads != 1 { + t.Fatalf("expected idle mapper reload, got %d", idle.reloads) + } +} diff --git a/state_impl.go b/state_impl.go index bf7bd92..3874e73 100644 --- a/state_impl.go +++ b/state_impl.go @@ -8,6 +8,7 @@ import ( "errors" "net" "net/http" + "reflect" "sort" "strconv" "strings" @@ -319,6 +320,7 @@ func (p *Proxy) Reload(config *Config) { if result := config.Validate(); !result.OK { return } + poolsChanged := p.config == nil || !reflect.DeepEqual(p.config.Pools, config.Pools) if p.config == nil { p.config = config } else { @@ -351,6 +353,11 @@ func (p *Proxy) Reload(config *Config) { p.shareLog.SetPath(config.ShareLogFile) } p.reloadWatcher(config.Watch) + if poolsChanged { + if reloadable, ok := p.splitter.(interface{ ReloadPools() }); ok { + reloadable.ReloadPools() + } + } } func (p *Proxy) reloadWatcher(enabled bool) {