fix(proxy): drain submits before shutdown
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
031f0c0f17
commit
01a0cc5907
2 changed files with 61 additions and 4 deletions
|
|
@ -283,10 +283,6 @@ func (p *Proxy) Stop() {
|
||||||
for _, server := range p.servers {
|
for _, server := range p.servers {
|
||||||
server.Stop()
|
server.Stop()
|
||||||
}
|
}
|
||||||
p.closeAllMiners()
|
|
||||||
if splitter, ok := p.splitter.(interface{ Disconnect() }); ok {
|
|
||||||
splitter.Disconnect()
|
|
||||||
}
|
|
||||||
if p.watcher != nil {
|
if p.watcher != nil {
|
||||||
p.watcher.Stop()
|
p.watcher.Stop()
|
||||||
}
|
}
|
||||||
|
|
@ -299,6 +295,10 @@ func (p *Proxy) Stop() {
|
||||||
for p.submitCount.Load() > 0 && time.Now().Before(deadline) {
|
for p.submitCount.Load() > 0 && time.Now().Before(deadline) {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
p.closeAllMiners()
|
||||||
|
if splitter, ok := p.splitter.(interface{ Disconnect() }); ok {
|
||||||
|
splitter.Disconnect()
|
||||||
|
}
|
||||||
if p.accessLog != nil {
|
if p.accessLog != nil {
|
||||||
p.accessLog.Close()
|
p.accessLog.Close()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -66,6 +66,48 @@ func TestProxy_Stop_Ugly(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProxy_Stop_WaitsBeforeDisconnectingSubmitPaths(t *testing.T) {
|
||||||
|
serverConn, clientConn := net.Pipe()
|
||||||
|
defer serverConn.Close()
|
||||||
|
|
||||||
|
miner := NewMiner(clientConn, 3333, nil)
|
||||||
|
splitter := &blockingStopSplitter{disconnectedCh: make(chan struct{})}
|
||||||
|
proxyInstance := &Proxy{
|
||||||
|
done: make(chan struct{}),
|
||||||
|
miners: map[int64]*Miner{miner.ID(): miner},
|
||||||
|
splitter: splitter,
|
||||||
|
}
|
||||||
|
proxyInstance.submitCount.Store(1)
|
||||||
|
|
||||||
|
stopped := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
proxyInstance.Stop()
|
||||||
|
close(stopped)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-splitter.disconnectedCh:
|
||||||
|
t.Fatalf("expected splitter disconnect to wait for submit drain")
|
||||||
|
case <-stopped:
|
||||||
|
t.Fatalf("expected Stop to keep waiting while submits are in flight")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
proxyInstance.submitCount.Store(0)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-splitter.disconnectedCh:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatalf("expected splitter disconnect after submit drain")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-stopped:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatalf("expected Stop to finish after submit drain")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type stubSplitter struct {
|
type stubSplitter struct {
|
||||||
disconnected bool
|
disconnected bool
|
||||||
}
|
}
|
||||||
|
|
@ -78,3 +120,18 @@ func (s *stubSplitter) Tick(ticks uint64) {}
|
||||||
func (s *stubSplitter) GC() {}
|
func (s *stubSplitter) GC() {}
|
||||||
func (s *stubSplitter) Upstreams() UpstreamStats { return UpstreamStats{} }
|
func (s *stubSplitter) Upstreams() UpstreamStats { return UpstreamStats{} }
|
||||||
func (s *stubSplitter) Disconnect() { s.disconnected = true }
|
func (s *stubSplitter) Disconnect() { s.disconnected = true }
|
||||||
|
|
||||||
|
type blockingStopSplitter struct {
|
||||||
|
disconnectedCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *blockingStopSplitter) Connect() {}
|
||||||
|
func (s *blockingStopSplitter) OnLogin(event *LoginEvent) {}
|
||||||
|
func (s *blockingStopSplitter) OnSubmit(event *SubmitEvent) {}
|
||||||
|
func (s *blockingStopSplitter) OnClose(event *CloseEvent) {}
|
||||||
|
func (s *blockingStopSplitter) Tick(ticks uint64) {}
|
||||||
|
func (s *blockingStopSplitter) GC() {}
|
||||||
|
func (s *blockingStopSplitter) Upstreams() UpstreamStats { return UpstreamStats{} }
|
||||||
|
func (s *blockingStopSplitter) Disconnect() {
|
||||||
|
close(s.disconnectedCh)
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue