feat(proxy): dispatch submits and drain shutdown

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 11:18:35 +00:00
parent 465ea38308
commit e41ad7ef2e
7 changed files with 261 additions and 8 deletions

View file

@ -32,13 +32,18 @@ type EventHandler func(Event)
//
// bus.Dispatch(proxy.Event{Type: proxy.EventLogin, Miner: m})
type Event struct {
Type EventType
Miner *Miner // always set
Job *Job // set for Accept and Reject events
Diff uint64 // effective difficulty of the share (Accept and Reject)
Error string // rejection reason (Reject only)
Latency uint16 // pool response time in ms (Accept and Reject)
Expired bool // true if the share was accepted but against the previous job
Type EventType
Miner *Miner // always set
Job *Job // set for Accept and Reject events
JobID string // set for Submit events
Nonce string // set for Submit events
Result string // set for Submit events
Algo string // set for Submit events
RequestID int64 // set for Submit events
Diff uint64 // effective difficulty of the share (Accept and Reject)
Error string // rejection reason (Reject only)
Latency uint16 // pool response time in ms (Accept and Reject)
Expired bool // true if the share was accepted but against the previous job
}
// NewEventBus builds an empty synchronous event dispatcher.

View file

@ -294,6 +294,19 @@ func (m *Miner) handleSubmit(request minerRequest) {
}
m.Touch()
if m.events != nil {
m.events.Dispatch(Event{
Type: EventSubmit,
Miner: m,
JobID: params.JobID,
Nonce: params.Nonce,
Result: params.Result,
Algo: params.Algo,
RequestID: request.ID,
})
return
}
if m.splitter != nil {
m.splitter.OnSubmit(&SubmitEvent{
Miner: m,

View file

@ -225,3 +225,62 @@ func TestMiner_Login_NiceHashPatchedJob_Good(t *testing.T) {
t.Fatalf("expected patched NiceHash blob, got %q", blob)
}
}
func TestMiner_Submit_Good(t *testing.T) {
serverConn, clientConn := net.Pipe()
defer clientConn.Close()
miner := NewMiner(serverConn, 3333, nil)
miner.events = NewEventBus()
miner.SetRPCID("session")
miner.SetState(MinerStateReady)
submitSeen := make(chan Event, 1)
miner.events.Subscribe(EventSubmit, func(event Event) {
submitSeen <- event
miner.Success(event.RequestID, "OK")
})
miner.Start()
defer miner.Close()
encoder := json.NewEncoder(clientConn)
if err := encoder.Encode(map[string]interface{}{
"id": 6,
"jsonrpc": "2.0",
"method": "submit",
"params": map[string]interface{}{
"id": "session",
"job_id": "job-1",
"nonce": "deadbeef",
"result": "abc",
"algo": "cn/r",
},
}); err != nil {
t.Fatal(err)
}
select {
case event := <-submitSeen:
if event.JobID != "job-1" || event.Nonce != "deadbeef" || event.Algo != "cn/r" {
t.Fatalf("unexpected submit event: %+v", event)
}
case <-time.After(time.Second):
t.Fatal("expected submit event to be dispatched")
}
clientConn.SetReadDeadline(time.Now().Add(time.Second))
line, err := bufio.NewReader(clientConn).ReadBytes('\n')
if err != nil {
t.Fatal(err)
}
var response map[string]interface{}
if err := json.Unmarshal(line, &response); err != nil {
t.Fatal(err)
}
result := response["result"].(map[string]interface{})
if result["status"] != "OK" {
t.Fatalf("unexpected submit response: %#v", response)
}
}

View file

@ -9,6 +9,11 @@ import (
var proxyMinerCount atomic.Uint64
type splitterShutdown interface {
PendingCount() int
Disconnect()
}
// New creates and wires all subsystems but does not start the tick loop or TCP listeners.
//
// p, errorValue := proxy.New(cfg)
@ -67,6 +72,16 @@ func New(cfg *Config) (*Proxy, error) {
events.Subscribe(EventAccept, stats.OnAccept)
events.Subscribe(EventReject, stats.OnReject)
if splitter != nil {
events.Subscribe(EventSubmit, func(event Event) {
splitter.OnSubmit(&SubmitEvent{
Miner: event.Miner,
JobID: event.JobID,
Nonce: event.Nonce,
Result: event.Result,
Algo: event.Algo,
RequestID: event.RequestID,
})
})
events.Subscribe(EventLogin, func(event Event) {
splitter.OnLogin(&LoginEvent{Miner: event.Miner})
})
@ -148,6 +163,8 @@ func (noopSplitter) OnClose(event *CloseEvent) {}
func (noopSplitter) Tick(ticks uint64) {}
func (noopSplitter) GC() {}
func (noopSplitter) Upstreams() UpstreamStats { return UpstreamStats{} }
func (noopSplitter) PendingCount() int { return 0 }
func (noopSplitter) Disconnect() {}
func noopSplitterFactory(cfg *Config, events *EventBus) Splitter {
return noopSplitter{}
@ -167,6 +184,30 @@ func (p *Proxy) Stop() {
if p.watcher != nil {
p.watcher.Stop()
}
if shutdown, ok := p.splitter.(splitterShutdown); ok {
deadline := time.Now().Add(5 * time.Second)
for shutdown.PendingCount() > 0 && time.Now().Before(deadline) {
time.Sleep(50 * time.Millisecond)
}
}
p.minerMu.RLock()
miners := make([]*Miner, 0, len(p.miners))
for _, miner := range p.miners {
miners = append(miners, miner)
}
p.minerMu.RUnlock()
for _, miner := range miners {
if miner != nil {
miner.Close()
}
}
if shutdown, ok := p.splitter.(splitterShutdown); ok {
shutdown.Disconnect()
}
select {
case <-p.done:
default:

View file

@ -164,6 +164,45 @@ func (s *NonceSplitter) Upstreams() proxy.UpstreamStats {
return stats
}
func (s *NonceSplitter) PendingCount() int {
s.mu.RLock()
mappers := append([]*NonceMapper(nil), s.mappers...)
s.mu.RUnlock()
pending := 0
for _, mapper := range mappers {
if mapper == nil {
continue
}
mapper.mu.Lock()
pending += len(mapper.pending)
mapper.mu.Unlock()
}
return pending
}
func (s *NonceSplitter) Disconnect() {
s.mu.Lock()
mappers := s.mappers
s.mappers = nil
s.mu.Unlock()
for _, mapper := range mappers {
if mapper == nil {
continue
}
mapper.mu.Lock()
strategy := mapper.strategy
mapper.strategy = nil
mapper.active = false
mapper.suspended = 0
mapper.mu.Unlock()
if strategy != nil {
strategy.Disconnect()
}
}
}
func (s *NonceSplitter) newMapperLocked() *NonceMapper {
mapperID := int64(len(s.mappers) + 1)
mapper := NewNonceMapper(mapperID, s.cfg, nil)

View file

@ -51,9 +51,15 @@ func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) {
s.mu.Lock()
defer s.mu.Unlock()
timeout := time.Duration(0)
if s.cfg != nil && s.cfg.ReuseTimeout > 0 {
timeout = time.Duration(s.cfg.ReuseTimeout) * time.Second
}
var mapper *SimpleMapper
now := time.Now().UTC()
for mapperID, idleMapper := range s.idle {
if idleMapper == nil || idleMapper.stopped || idleMapper.strategy == nil || !idleMapper.strategy.IsActive() {
if idleMapper == nil || idleMapper.stopped || idleMapper.strategy == nil || !idleMapper.strategy.IsActive() || (timeout > 0 && !idleMapper.idleAt.IsZero() && now.Sub(idleMapper.idleAt) > timeout) {
if idleMapper != nil && idleMapper.strategy != nil {
idleMapper.strategy.Disconnect()
}
@ -202,3 +208,65 @@ func (s *SimpleSplitter) Upstreams() proxy.UpstreamStats {
stats.Total += uint64(len(s.idle))
return stats
}
func (s *SimpleSplitter) PendingCount() int {
s.mu.Lock()
mapperList := make([]*SimpleMapper, 0, len(s.active)+len(s.idle))
for _, mapper := range s.active {
mapperList = append(mapperList, mapper)
}
for _, mapper := range s.idle {
mapperList = append(mapperList, mapper)
}
s.mu.Unlock()
pending := 0
for _, mapper := range mapperList {
if mapper == nil {
continue
}
mapper.mu.Lock()
pending += len(mapper.pending)
mapper.mu.Unlock()
}
return pending
}
func (s *SimpleSplitter) Disconnect() {
s.mu.Lock()
active := s.active
idle := s.idle
s.active = make(map[int64]*SimpleMapper)
s.idle = make(map[int64]*SimpleMapper)
s.mu.Unlock()
for _, mapper := range active {
if mapper == nil {
continue
}
mapper.mu.Lock()
mapper.stopped = true
strategy := mapper.strategy
mapper.strategy = nil
mapper.miner = nil
mapper.mu.Unlock()
if strategy != nil {
strategy.Disconnect()
}
}
for _, mapper := range idle {
if mapper == nil {
continue
}
mapper.mu.Lock()
mapper.stopped = true
strategy := mapper.strategy
mapper.strategy = nil
mapper.miner = nil
mapper.mu.Unlock()
if strategy != nil {
strategy.Disconnect()
}
}
}

View file

@ -59,3 +59,31 @@ func TestSimpleSplitter_OnLogin_Ugly(t *testing.T) {
t.Fatal("expected miner to receive a route ID")
}
}
func TestSimpleSplitter_OnLogin_Bad(t *testing.T) {
activeStrategy := &fakeStrategy{active: true}
splitter := &SimpleSplitter{
active: make(map[int64]*SimpleMapper),
idle: map[int64]*SimpleMapper{
1: {
id: 1,
strategy: activeStrategy,
idleAt: time.Now().UTC().Add(-2 * time.Minute),
},
},
cfg: &proxy.Config{ReuseTimeout: 60},
factory: func(listener pool.StratumListener) pool.Strategy {
return activeStrategy
},
}
miner := &proxy.Miner{}
splitter.OnLogin(&proxy.LoginEvent{Miner: miner})
if len(splitter.idle) != 0 {
t.Fatalf("expected stale idle mapper to be discarded, got %d idle mappers", len(splitter.idle))
}
if len(splitter.active) != 1 {
t.Fatalf("expected one active mapper, got %d active mappers", len(splitter.active))
}
}