From 5e343a73546b396f3c4be1e6d8ae6fb857110aa2 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 21:55:49 +0000 Subject: [PATCH] fix(proxy): disconnect splitters on shutdown Co-Authored-By: Virgil --- splitter/nicehash/impl.go | 16 ++++++++++++++++ splitter/simple/impl.go | 21 +++++++++++++++++++++ state_impl.go | 3 +++ state_stop_test.go | 22 ++++++++++++++++++++-- 4 files changed, 60 insertions(+), 2 deletions(-) diff --git a/splitter/nicehash/impl.go b/splitter/nicehash/impl.go index 9d5d85d..654a484 100644 --- a/splitter/nicehash/impl.go +++ b/splitter/nicehash/impl.go @@ -152,6 +152,22 @@ func (s *NonceSplitter) Upstreams() proxy.UpstreamStats { return stats } +// Disconnect closes all upstream pool connections and forgets the current mapper set. +func (s *NonceSplitter) Disconnect() { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + for _, mapper := range s.mappers { + if mapper != nil && mapper.strategy != nil { + mapper.strategy.Disconnect() + } + } + s.mappers = nil + s.byID = make(map[int64]*NonceMapper) +} + func (s *NonceSplitter) addMapperLocked() *NonceMapper { id := s.seq s.seq++ diff --git a/splitter/simple/impl.go b/splitter/simple/impl.go index 4159918..9f7c7eb 100644 --- a/splitter/simple/impl.go +++ b/splitter/simple/impl.go @@ -195,6 +195,27 @@ func (s *SimpleSplitter) Upstreams() proxy.UpstreamStats { return stats } +// Disconnect closes every active or idle upstream connection and clears the mapper tables. +func (s *SimpleSplitter) Disconnect() { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + for _, mapper := range s.active { + if mapper != nil && mapper.strategy != nil { + mapper.strategy.Disconnect() + } + } + for _, mapper := range s.idle { + if mapper != nil && mapper.strategy != nil { + mapper.strategy.Disconnect() + } + } + s.active = make(map[int64]*SimpleMapper) + s.idle = make(map[int64]*SimpleMapper) +} + func (s *SimpleSplitter) newMapperLocked() *SimpleMapper { id := s.seq s.seq++ diff --git a/state_impl.go b/state_impl.go index 9011cec..8b4f0af 100644 --- a/state_impl.go +++ b/state_impl.go @@ -268,6 +268,9 @@ func (p *Proxy) Stop() { server.Stop() } p.closeAllMiners() + if splitter, ok := p.splitter.(interface{ Disconnect() }); ok { + splitter.Disconnect() + } if p.watcher != nil { p.watcher.Stop() } diff --git a/state_stop_test.go b/state_stop_test.go index 111ac73..944f3c1 100644 --- a/state_stop_test.go +++ b/state_stop_test.go @@ -11,9 +11,11 @@ func TestProxy_Stop_Good(t *testing.T) { defer serverConn.Close() miner := NewMiner(clientConn, 3333, nil) + splitter := &stubSplitter{} proxyInstance := &Proxy{ - done: make(chan struct{}), - miners: map[int64]*Miner{miner.ID(): miner}, + done: make(chan struct{}), + miners: map[int64]*Miner{miner.ID(): miner}, + splitter: splitter, } done := make(chan error, 1) @@ -34,6 +36,9 @@ func TestProxy_Stop_Good(t *testing.T) { case <-time.After(time.Second): t.Fatalf("expected miner connection to close during Stop") } + if !splitter.disconnected { + t.Fatalf("expected splitter to be disconnected during Stop") + } } func TestProxy_Stop_Bad(t *testing.T) { @@ -60,3 +65,16 @@ func TestProxy_Stop_Ugly(t *testing.T) { t.Fatalf("expected closed connection after repeated Stop calls") } } + +type stubSplitter struct { + disconnected bool +} + +func (s *stubSplitter) Connect() {} +func (s *stubSplitter) OnLogin(event *LoginEvent) {} +func (s *stubSplitter) OnSubmit(event *SubmitEvent) {} +func (s *stubSplitter) OnClose(event *CloseEvent) {} +func (s *stubSplitter) Tick(ticks uint64) {} +func (s *stubSplitter) GC() {} +func (s *stubSplitter) Upstreams() UpstreamStats { return UpstreamStats{} } +func (s *stubSplitter) Disconnect() { s.disconnected = true }