fix(proxy): align pool failover and simple routing
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
b5e4a6499f
commit
a1f47f5792
4 changed files with 92 additions and 1 deletions
|
|
@ -297,6 +297,11 @@ func (c *StratumClient) handleMessage(line []byte) {
|
|||
}
|
||||
}
|
||||
|
||||
if len(base.Error) > 0 && requestID(base.ID) == 1 {
|
||||
c.notifyDisconnect()
|
||||
return
|
||||
}
|
||||
|
||||
if base.Method == "job" {
|
||||
var params struct {
|
||||
Blob string `json:"blob"`
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
|
@ -119,3 +120,29 @@ func TestFailoverStrategy_OnDisconnect_ClearsClient_Bad(t *testing.T) {
|
|||
t.Fatalf("expected one disconnect notification, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStratumClient_HandleMessage_LoginErrorDisconnects_Ugly(t *testing.T) {
|
||||
spy := &disconnectSpy{}
|
||||
client := &StratumClient{
|
||||
listener: spy,
|
||||
pending: make(map[int64]struct{}),
|
||||
}
|
||||
|
||||
payload, err := json.Marshal(map[string]any{
|
||||
"id": 1,
|
||||
"jsonrpc": "2.0",
|
||||
"error": map[string]any{
|
||||
"code": -1,
|
||||
"message": "Invalid payment address provided",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("marshal login error payload: %v", err)
|
||||
}
|
||||
|
||||
client.handleMessage(payload)
|
||||
|
||||
if got := spy.disconnects.Load(); got != 1 {
|
||||
t.Fatalf("expected login failure to disconnect upstream once, got %d", got)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -87,7 +87,7 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) {
|
|||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
mapper := s.active[event.Miner.ID()]
|
||||
mapper := s.activeMapperByRouteIDLocked(event.Miner.RouteID())
|
||||
s.mu.Unlock()
|
||||
if mapper != nil {
|
||||
mapper.Submit(event)
|
||||
|
|
@ -257,6 +257,18 @@ func (s *SimpleSplitter) newMapperLocked() *SimpleMapper {
|
|||
return mapper
|
||||
}
|
||||
|
||||
func (s *SimpleSplitter) activeMapperByRouteIDLocked(routeID int64) *SimpleMapper {
|
||||
if s == nil || routeID < 0 {
|
||||
return nil
|
||||
}
|
||||
for _, mapper := range s.active {
|
||||
if mapper != nil && mapper.id == routeID {
|
||||
return mapper
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Submit forwards a share to the pool.
|
||||
func (m *SimpleMapper) Submit(event *proxy.SubmitEvent) {
|
||||
if m == nil || event == nil || m.strategy == nil {
|
||||
|
|
|
|||
|
|
@ -20,6 +20,21 @@ func (a activeStrategy) Submit(string, string, string, string) int64 { return 0
|
|||
func (a activeStrategy) Disconnect() {}
|
||||
func (a activeStrategy) IsActive() bool { return true }
|
||||
|
||||
type submitRecordingStrategy struct {
|
||||
submits int
|
||||
}
|
||||
|
||||
func (s *submitRecordingStrategy) Connect() {}
|
||||
|
||||
func (s *submitRecordingStrategy) Submit(string, string, string, string) int64 {
|
||||
s.submits++
|
||||
return int64(s.submits)
|
||||
}
|
||||
|
||||
func (s *submitRecordingStrategy) Disconnect() {}
|
||||
|
||||
func (s *submitRecordingStrategy) IsActive() bool { return true }
|
||||
|
||||
func TestSimpleMapper_New_Good(t *testing.T) {
|
||||
strategy := activeStrategy{}
|
||||
mapper := NewSimpleMapper(7, strategy)
|
||||
|
|
@ -90,6 +105,38 @@ func TestSimpleSplitter_OnLogin_Ugly(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSimpleSplitter_OnSubmit_UsesRouteID_Good(t *testing.T) {
|
||||
strategy := &submitRecordingStrategy{}
|
||||
splitter := NewSimpleSplitter(&proxy.Config{ReuseTimeout: 30}, nil, nil)
|
||||
miner := proxy.NewMiner(discardConn{}, 3333, nil)
|
||||
miner.SetID(21)
|
||||
miner.SetRouteID(7)
|
||||
|
||||
mapper := &SimpleMapper{
|
||||
id: 7,
|
||||
miner: miner,
|
||||
currentJob: proxy.Job{JobID: "job-1", Blob: "blob", Target: "b88d0600"},
|
||||
strategy: strategy,
|
||||
pending: make(map[int64]submitContext),
|
||||
}
|
||||
splitter.active[99] = mapper
|
||||
|
||||
splitter.OnSubmit(&proxy.SubmitEvent{
|
||||
Miner: miner,
|
||||
JobID: "job-1",
|
||||
Nonce: "deadbeef",
|
||||
Result: "hash",
|
||||
RequestID: 11,
|
||||
})
|
||||
|
||||
if strategy.submits != 1 {
|
||||
t.Fatalf("expected one submit routed by route id, got %d", strategy.submits)
|
||||
}
|
||||
if len(mapper.pending) != 1 {
|
||||
t.Fatalf("expected routed submit to create one pending entry, got %d", len(mapper.pending))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSimpleSplitter_Upstreams_Good(t *testing.T) {
|
||||
splitter := NewSimpleSplitter(&proxy.Config{ReuseTimeout: 30}, nil, func(listener pool.StratumListener) pool.Strategy {
|
||||
return activeStrategy{}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue