fix(proxy): disconnect splitters on shutdown

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 21:55:49 +00:00
parent 4c2a0ffab7
commit 5e343a7354
4 changed files with 60 additions and 2 deletions

View file

@ -152,6 +152,22 @@ func (s *NonceSplitter) Upstreams() proxy.UpstreamStats {
return stats
}
// Disconnect closes all upstream pool connections and forgets the current mapper set.
func (s *NonceSplitter) Disconnect() {
if s == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
for _, mapper := range s.mappers {
if mapper != nil && mapper.strategy != nil {
mapper.strategy.Disconnect()
}
}
s.mappers = nil
s.byID = make(map[int64]*NonceMapper)
}
func (s *NonceSplitter) addMapperLocked() *NonceMapper {
id := s.seq
s.seq++

View file

@ -195,6 +195,27 @@ func (s *SimpleSplitter) Upstreams() proxy.UpstreamStats {
return stats
}
// Disconnect closes every active or idle upstream connection and clears the mapper tables.
func (s *SimpleSplitter) Disconnect() {
if s == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
for _, mapper := range s.active {
if mapper != nil && mapper.strategy != nil {
mapper.strategy.Disconnect()
}
}
for _, mapper := range s.idle {
if mapper != nil && mapper.strategy != nil {
mapper.strategy.Disconnect()
}
}
s.active = make(map[int64]*SimpleMapper)
s.idle = make(map[int64]*SimpleMapper)
}
func (s *SimpleSplitter) newMapperLocked() *SimpleMapper {
id := s.seq
s.seq++

View file

@ -268,6 +268,9 @@ func (p *Proxy) Stop() {
server.Stop()
}
p.closeAllMiners()
if splitter, ok := p.splitter.(interface{ Disconnect() }); ok {
splitter.Disconnect()
}
if p.watcher != nil {
p.watcher.Stop()
}

View file

@ -11,9 +11,11 @@ func TestProxy_Stop_Good(t *testing.T) {
defer serverConn.Close()
miner := NewMiner(clientConn, 3333, nil)
splitter := &stubSplitter{}
proxyInstance := &Proxy{
done: make(chan struct{}),
miners: map[int64]*Miner{miner.ID(): miner},
done: make(chan struct{}),
miners: map[int64]*Miner{miner.ID(): miner},
splitter: splitter,
}
done := make(chan error, 1)
@ -34,6 +36,9 @@ func TestProxy_Stop_Good(t *testing.T) {
case <-time.After(time.Second):
t.Fatalf("expected miner connection to close during Stop")
}
if !splitter.disconnected {
t.Fatalf("expected splitter to be disconnected during Stop")
}
}
func TestProxy_Stop_Bad(t *testing.T) {
@ -60,3 +65,16 @@ func TestProxy_Stop_Ugly(t *testing.T) {
t.Fatalf("expected closed connection after repeated Stop calls")
}
}
type stubSplitter struct {
disconnected bool
}
func (s *stubSplitter) Connect() {}
func (s *stubSplitter) OnLogin(event *LoginEvent) {}
func (s *stubSplitter) OnSubmit(event *SubmitEvent) {}
func (s *stubSplitter) OnClose(event *CloseEvent) {}
func (s *stubSplitter) Tick(ticks uint64) {}
func (s *stubSplitter) GC() {}
func (s *stubSplitter) Upstreams() UpstreamStats { return UpstreamStats{} }
func (s *stubSplitter) Disconnect() { s.disconnected = true }