fix(proxy): drain pending submits on stop
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
d42c21438a
commit
c7d688ccfa
2 changed files with 53 additions and 2 deletions
|
|
@ -75,6 +75,8 @@ func New(cfg *Config) (*Proxy, Result) {
|
||||||
p.events.Subscribe(EventAccept, p.customDiffBuckets.OnAccept)
|
p.events.Subscribe(EventAccept, p.customDiffBuckets.OnAccept)
|
||||||
p.events.Subscribe(EventReject, p.customDiffBuckets.OnReject)
|
p.events.Subscribe(EventReject, p.customDiffBuckets.OnReject)
|
||||||
}
|
}
|
||||||
|
p.events.Subscribe(EventAccept, p.onShareSettled)
|
||||||
|
p.events.Subscribe(EventReject, p.onShareSettled)
|
||||||
if cfg.Watch && cfg.sourcePath != "" {
|
if cfg.Watch && cfg.sourcePath != "" {
|
||||||
p.watcher = NewConfigWatcher(cfg.sourcePath, p.Reload)
|
p.watcher = NewConfigWatcher(cfg.sourcePath, p.Reload)
|
||||||
}
|
}
|
||||||
|
|
@ -339,6 +341,21 @@ func (p *Proxy) Reload(cfg *Config) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Proxy) onShareSettled(Event) {
|
||||||
|
if p == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
current := p.submitCount.Load()
|
||||||
|
if current == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if p.submitCount.CompareAndSwap(current, current-1) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Proxy) acceptMiner(conn net.Conn, localPort uint16) {
|
func (p *Proxy) acceptMiner(conn net.Conn, localPort uint16) {
|
||||||
if p == nil {
|
if p == nil {
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
|
|
@ -361,9 +378,10 @@ func (p *Proxy) acceptMiner(conn net.Conn, localPort uint16) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
miner.onSubmit = func(m *Miner, event *SubmitEvent) {
|
miner.onSubmit = func(m *Miner, event *SubmitEvent) {
|
||||||
p.submitCount.Add(1)
|
|
||||||
defer p.submitCount.Add(-1)
|
|
||||||
if p.splitter != nil {
|
if p.splitter != nil {
|
||||||
|
if _, ok := p.splitter.(*noopSplitter); !ok {
|
||||||
|
p.submitCount.Add(1)
|
||||||
|
}
|
||||||
p.splitter.OnSubmit(event)
|
p.splitter.OnSubmit(event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
33
state_submit_test.go
Normal file
33
state_submit_test.go
Normal file
|
|
@ -0,0 +1,33 @@
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestProxy_Stop_WaitsForSubmitDrain(t *testing.T) {
|
||||||
|
p := &Proxy{
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
p.submitCount.Store(1)
|
||||||
|
|
||||||
|
stopped := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
p.Stop()
|
||||||
|
close(stopped)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-stopped:
|
||||||
|
t.Fatalf("expected Stop to wait for pending submits")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
p.submitCount.Store(0)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-stopped:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatalf("expected Stop to finish after pending submits drain")
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Reference in a new issue