From 01a0cc5907c6168ce1113ca9ebcf2a28b6bc17ab Mon Sep 17 00:00:00 2001 From: Virgil Date: Sun, 5 Apr 2026 03:15:17 +0000 Subject: [PATCH] fix(proxy): drain submits before shutdown Co-Authored-By: Virgil --- state_impl.go | 8 +++---- state_stop_test.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/state_impl.go b/state_impl.go index dccfa22..2105f07 100644 --- a/state_impl.go +++ b/state_impl.go @@ -283,10 +283,6 @@ func (p *Proxy) Stop() { for _, server := range p.servers { server.Stop() } - p.closeAllMiners() - if splitter, ok := p.splitter.(interface{ Disconnect() }); ok { - splitter.Disconnect() - } if p.watcher != nil { p.watcher.Stop() } @@ -299,6 +295,10 @@ func (p *Proxy) Stop() { for p.submitCount.Load() > 0 && time.Now().Before(deadline) { time.Sleep(10 * time.Millisecond) } + p.closeAllMiners() + if splitter, ok := p.splitter.(interface{ Disconnect() }); ok { + splitter.Disconnect() + } if p.accessLog != nil { p.accessLog.Close() } diff --git a/state_stop_test.go b/state_stop_test.go index 944f3c1..6637ba7 100644 --- a/state_stop_test.go +++ b/state_stop_test.go @@ -66,6 +66,48 @@ func TestProxy_Stop_Ugly(t *testing.T) { } } +func TestProxy_Stop_WaitsBeforeDisconnectingSubmitPaths(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer serverConn.Close() + + miner := NewMiner(clientConn, 3333, nil) + splitter := &blockingStopSplitter{disconnectedCh: make(chan struct{})} + proxyInstance := &Proxy{ + done: make(chan struct{}), + miners: map[int64]*Miner{miner.ID(): miner}, + splitter: splitter, + } + proxyInstance.submitCount.Store(1) + + stopped := make(chan struct{}) + go func() { + proxyInstance.Stop() + close(stopped) + }() + + select { + case <-splitter.disconnectedCh: + t.Fatalf("expected splitter disconnect to wait for submit drain") + case <-stopped: + t.Fatalf("expected Stop to keep waiting while submits are in flight") + case <-time.After(50 * time.Millisecond): + } + + proxyInstance.submitCount.Store(0) + + select { + case <-splitter.disconnectedCh: + case <-time.After(time.Second): + t.Fatalf("expected splitter disconnect after submit drain") + } + + select { + case <-stopped: + case <-time.After(time.Second): + t.Fatalf("expected Stop to finish after submit drain") + } +} + type stubSplitter struct { disconnected bool } @@ -78,3 +120,18 @@ func (s *stubSplitter) Tick(ticks uint64) {} func (s *stubSplitter) GC() {} func (s *stubSplitter) Upstreams() UpstreamStats { return UpstreamStats{} } func (s *stubSplitter) Disconnect() { s.disconnected = true } + +type blockingStopSplitter struct { + disconnectedCh chan struct{} +} + +func (s *blockingStopSplitter) Connect() {} +func (s *blockingStopSplitter) OnLogin(event *LoginEvent) {} +func (s *blockingStopSplitter) OnSubmit(event *SubmitEvent) {} +func (s *blockingStopSplitter) OnClose(event *CloseEvent) {} +func (s *blockingStopSplitter) Tick(ticks uint64) {} +func (s *blockingStopSplitter) GC() {} +func (s *blockingStopSplitter) Upstreams() UpstreamStats { return UpstreamStats{} } +func (s *blockingStopSplitter) Disconnect() { + close(s.disconnectedCh) +}