diff --git a/events.go b/events.go index c366d3f..e12058b 100644 --- a/events.go +++ b/events.go @@ -32,13 +32,18 @@ type EventHandler func(Event) // // bus.Dispatch(proxy.Event{Type: proxy.EventLogin, Miner: m}) type Event struct { - Type EventType - Miner *Miner // always set - Job *Job // set for Accept and Reject events - Diff uint64 // effective difficulty of the share (Accept and Reject) - Error string // rejection reason (Reject only) - Latency uint16 // pool response time in ms (Accept and Reject) - Expired bool // true if the share was accepted but against the previous job + Type EventType + Miner *Miner // always set + Job *Job // set for Accept and Reject events + JobID string // set for Submit events + Nonce string // set for Submit events + Result string // set for Submit events + Algo string // set for Submit events + RequestID int64 // set for Submit events + Diff uint64 // effective difficulty of the share (Accept and Reject) + Error string // rejection reason (Reject only) + Latency uint16 // pool response time in ms (Accept and Reject) + Expired bool // true if the share was accepted but against the previous job } // NewEventBus builds an empty synchronous event dispatcher. diff --git a/miner_runtime.go b/miner_runtime.go index 07385df..a85ab0d 100644 --- a/miner_runtime.go +++ b/miner_runtime.go @@ -294,6 +294,19 @@ func (m *Miner) handleSubmit(request minerRequest) { } m.Touch() + if m.events != nil { + m.events.Dispatch(Event{ + Type: EventSubmit, + Miner: m, + JobID: params.JobID, + Nonce: params.Nonce, + Result: params.Result, + Algo: params.Algo, + RequestID: request.ID, + }) + return + } + if m.splitter != nil { m.splitter.OnSubmit(&SubmitEvent{ Miner: m, diff --git a/miner_runtime_test.go b/miner_runtime_test.go index 7518e55..57a73cc 100644 --- a/miner_runtime_test.go +++ b/miner_runtime_test.go @@ -225,3 +225,62 @@ func TestMiner_Login_NiceHashPatchedJob_Good(t *testing.T) { t.Fatalf("expected patched NiceHash blob, got %q", blob) } } + +func TestMiner_Submit_Good(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer clientConn.Close() + + miner := NewMiner(serverConn, 3333, nil) + miner.events = NewEventBus() + miner.SetRPCID("session") + miner.SetState(MinerStateReady) + + submitSeen := make(chan Event, 1) + miner.events.Subscribe(EventSubmit, func(event Event) { + submitSeen <- event + miner.Success(event.RequestID, "OK") + }) + + miner.Start() + defer miner.Close() + + encoder := json.NewEncoder(clientConn) + if err := encoder.Encode(map[string]interface{}{ + "id": 6, + "jsonrpc": "2.0", + "method": "submit", + "params": map[string]interface{}{ + "id": "session", + "job_id": "job-1", + "nonce": "deadbeef", + "result": "abc", + "algo": "cn/r", + }, + }); err != nil { + t.Fatal(err) + } + + select { + case event := <-submitSeen: + if event.JobID != "job-1" || event.Nonce != "deadbeef" || event.Algo != "cn/r" { + t.Fatalf("unexpected submit event: %+v", event) + } + case <-time.After(time.Second): + t.Fatal("expected submit event to be dispatched") + } + + clientConn.SetReadDeadline(time.Now().Add(time.Second)) + line, err := bufio.NewReader(clientConn).ReadBytes('\n') + if err != nil { + t.Fatal(err) + } + + var response map[string]interface{} + if err := json.Unmarshal(line, &response); err != nil { + t.Fatal(err) + } + result := response["result"].(map[string]interface{}) + if result["status"] != "OK" { + t.Fatalf("unexpected submit response: %#v", response) + } +} diff --git a/proxy_runtime.go b/proxy_runtime.go index f940b17..d4059b5 100644 --- a/proxy_runtime.go +++ b/proxy_runtime.go @@ -9,6 +9,11 @@ import ( var proxyMinerCount atomic.Uint64 +type splitterShutdown interface { + PendingCount() int + Disconnect() +} + // New creates and wires all subsystems but does not start the tick loop or TCP listeners. // // p, errorValue := proxy.New(cfg) @@ -67,6 +72,16 @@ func New(cfg *Config) (*Proxy, error) { events.Subscribe(EventAccept, stats.OnAccept) events.Subscribe(EventReject, stats.OnReject) if splitter != nil { + events.Subscribe(EventSubmit, func(event Event) { + splitter.OnSubmit(&SubmitEvent{ + Miner: event.Miner, + JobID: event.JobID, + Nonce: event.Nonce, + Result: event.Result, + Algo: event.Algo, + RequestID: event.RequestID, + }) + }) events.Subscribe(EventLogin, func(event Event) { splitter.OnLogin(&LoginEvent{Miner: event.Miner}) }) @@ -148,6 +163,8 @@ func (noopSplitter) OnClose(event *CloseEvent) {} func (noopSplitter) Tick(ticks uint64) {} func (noopSplitter) GC() {} func (noopSplitter) Upstreams() UpstreamStats { return UpstreamStats{} } +func (noopSplitter) PendingCount() int { return 0 } +func (noopSplitter) Disconnect() {} func noopSplitterFactory(cfg *Config, events *EventBus) Splitter { return noopSplitter{} @@ -167,6 +184,30 @@ func (p *Proxy) Stop() { if p.watcher != nil { p.watcher.Stop() } + + if shutdown, ok := p.splitter.(splitterShutdown); ok { + deadline := time.Now().Add(5 * time.Second) + for shutdown.PendingCount() > 0 && time.Now().Before(deadline) { + time.Sleep(50 * time.Millisecond) + } + } + + p.minerMu.RLock() + miners := make([]*Miner, 0, len(p.miners)) + for _, miner := range p.miners { + miners = append(miners, miner) + } + p.minerMu.RUnlock() + for _, miner := range miners { + if miner != nil { + miner.Close() + } + } + + if shutdown, ok := p.splitter.(splitterShutdown); ok { + shutdown.Disconnect() + } + select { case <-p.done: default: diff --git a/splitter/nicehash/splitter.go b/splitter/nicehash/splitter.go index 7a16a43..d194395 100644 --- a/splitter/nicehash/splitter.go +++ b/splitter/nicehash/splitter.go @@ -164,6 +164,45 @@ func (s *NonceSplitter) Upstreams() proxy.UpstreamStats { return stats } +func (s *NonceSplitter) PendingCount() int { + s.mu.RLock() + mappers := append([]*NonceMapper(nil), s.mappers...) + s.mu.RUnlock() + + pending := 0 + for _, mapper := range mappers { + if mapper == nil { + continue + } + mapper.mu.Lock() + pending += len(mapper.pending) + mapper.mu.Unlock() + } + return pending +} + +func (s *NonceSplitter) Disconnect() { + s.mu.Lock() + mappers := s.mappers + s.mappers = nil + s.mu.Unlock() + + for _, mapper := range mappers { + if mapper == nil { + continue + } + mapper.mu.Lock() + strategy := mapper.strategy + mapper.strategy = nil + mapper.active = false + mapper.suspended = 0 + mapper.mu.Unlock() + if strategy != nil { + strategy.Disconnect() + } + } +} + func (s *NonceSplitter) newMapperLocked() *NonceMapper { mapperID := int64(len(s.mappers) + 1) mapper := NewNonceMapper(mapperID, s.cfg, nil) diff --git a/splitter/simple/splitter.go b/splitter/simple/splitter.go index 31a8ef2..c7bcc33 100644 --- a/splitter/simple/splitter.go +++ b/splitter/simple/splitter.go @@ -51,9 +51,15 @@ func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) { s.mu.Lock() defer s.mu.Unlock() + timeout := time.Duration(0) + if s.cfg != nil && s.cfg.ReuseTimeout > 0 { + timeout = time.Duration(s.cfg.ReuseTimeout) * time.Second + } + var mapper *SimpleMapper + now := time.Now().UTC() for mapperID, idleMapper := range s.idle { - if idleMapper == nil || idleMapper.stopped || idleMapper.strategy == nil || !idleMapper.strategy.IsActive() { + if idleMapper == nil || idleMapper.stopped || idleMapper.strategy == nil || !idleMapper.strategy.IsActive() || (timeout > 0 && !idleMapper.idleAt.IsZero() && now.Sub(idleMapper.idleAt) > timeout) { if idleMapper != nil && idleMapper.strategy != nil { idleMapper.strategy.Disconnect() } @@ -202,3 +208,65 @@ func (s *SimpleSplitter) Upstreams() proxy.UpstreamStats { stats.Total += uint64(len(s.idle)) return stats } + +func (s *SimpleSplitter) PendingCount() int { + s.mu.Lock() + mapperList := make([]*SimpleMapper, 0, len(s.active)+len(s.idle)) + for _, mapper := range s.active { + mapperList = append(mapperList, mapper) + } + for _, mapper := range s.idle { + mapperList = append(mapperList, mapper) + } + s.mu.Unlock() + + pending := 0 + for _, mapper := range mapperList { + if mapper == nil { + continue + } + mapper.mu.Lock() + pending += len(mapper.pending) + mapper.mu.Unlock() + } + return pending +} + +func (s *SimpleSplitter) Disconnect() { + s.mu.Lock() + active := s.active + idle := s.idle + s.active = make(map[int64]*SimpleMapper) + s.idle = make(map[int64]*SimpleMapper) + s.mu.Unlock() + + for _, mapper := range active { + if mapper == nil { + continue + } + mapper.mu.Lock() + mapper.stopped = true + strategy := mapper.strategy + mapper.strategy = nil + mapper.miner = nil + mapper.mu.Unlock() + if strategy != nil { + strategy.Disconnect() + } + } + + for _, mapper := range idle { + if mapper == nil { + continue + } + mapper.mu.Lock() + mapper.stopped = true + strategy := mapper.strategy + mapper.strategy = nil + mapper.miner = nil + mapper.mu.Unlock() + if strategy != nil { + strategy.Disconnect() + } + } +} diff --git a/splitter/simple/splitter_test.go b/splitter/simple/splitter_test.go index 2a5ddd4..e28a555 100644 --- a/splitter/simple/splitter_test.go +++ b/splitter/simple/splitter_test.go @@ -59,3 +59,31 @@ func TestSimpleSplitter_OnLogin_Ugly(t *testing.T) { t.Fatal("expected miner to receive a route ID") } } + +func TestSimpleSplitter_OnLogin_Bad(t *testing.T) { + activeStrategy := &fakeStrategy{active: true} + splitter := &SimpleSplitter{ + active: make(map[int64]*SimpleMapper), + idle: map[int64]*SimpleMapper{ + 1: { + id: 1, + strategy: activeStrategy, + idleAt: time.Now().UTC().Add(-2 * time.Minute), + }, + }, + cfg: &proxy.Config{ReuseTimeout: 60}, + factory: func(listener pool.StratumListener) pool.Strategy { + return activeStrategy + }, + } + + miner := &proxy.Miner{} + splitter.OnLogin(&proxy.LoginEvent{Miner: miner}) + + if len(splitter.idle) != 0 { + t.Fatalf("expected stale idle mapper to be discarded, got %d idle mappers", len(splitter.idle)) + } + if len(splitter.active) != 1 { + t.Fatalf("expected one active mapper, got %d active mappers", len(splitter.active)) + } +}