From a1f47f5792c06ff0cb36e7e97ff83e43c8202296 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sun, 5 Apr 2026 01:38:05 +0000 Subject: [PATCH] fix(proxy): align pool failover and simple routing Co-Authored-By: Virgil --- pool/impl.go | 5 ++++ pool/strategy_disconnect_test.go | 27 ++++++++++++++++++ splitter/simple/impl.go | 14 +++++++++- splitter/simple/impl_test.go | 47 ++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 1 deletion(-) diff --git a/pool/impl.go b/pool/impl.go index a5c62af..fb36eb3 100644 --- a/pool/impl.go +++ b/pool/impl.go @@ -297,6 +297,11 @@ func (c *StratumClient) handleMessage(line []byte) { } } + if len(base.Error) > 0 && requestID(base.ID) == 1 { + c.notifyDisconnect() + return + } + if base.Method == "job" { var params struct { Blob string `json:"blob"` diff --git a/pool/strategy_disconnect_test.go b/pool/strategy_disconnect_test.go index d3247dd..39958c6 100644 --- a/pool/strategy_disconnect_test.go +++ b/pool/strategy_disconnect_test.go @@ -1,6 +1,7 @@ package pool import ( + "encoding/json" "net" "sync/atomic" "testing" @@ -119,3 +120,29 @@ func TestFailoverStrategy_OnDisconnect_ClearsClient_Bad(t *testing.T) { t.Fatalf("expected one disconnect notification, got %d", got) } } + +func TestStratumClient_HandleMessage_LoginErrorDisconnects_Ugly(t *testing.T) { + spy := &disconnectSpy{} + client := &StratumClient{ + listener: spy, + pending: make(map[int64]struct{}), + } + + payload, err := json.Marshal(map[string]any{ + "id": 1, + "jsonrpc": "2.0", + "error": map[string]any{ + "code": -1, + "message": "Invalid payment address provided", + }, + }) + if err != nil { + t.Fatalf("marshal login error payload: %v", err) + } + + client.handleMessage(payload) + + if got := spy.disconnects.Load(); got != 1 { + t.Fatalf("expected login failure to disconnect upstream once, got %d", got) + } +} diff --git a/splitter/simple/impl.go b/splitter/simple/impl.go index 01839ec..11fd0e9 100644 --- a/splitter/simple/impl.go +++ b/splitter/simple/impl.go @@ -87,7 +87,7 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) { return } s.mu.Lock() - mapper := s.active[event.Miner.ID()] + mapper := s.activeMapperByRouteIDLocked(event.Miner.RouteID()) s.mu.Unlock() if mapper != nil { mapper.Submit(event) @@ -257,6 +257,18 @@ func (s *SimpleSplitter) newMapperLocked() *SimpleMapper { return mapper } +func (s *SimpleSplitter) activeMapperByRouteIDLocked(routeID int64) *SimpleMapper { + if s == nil || routeID < 0 { + return nil + } + for _, mapper := range s.active { + if mapper != nil && mapper.id == routeID { + return mapper + } + } + return nil +} + // Submit forwards a share to the pool. func (m *SimpleMapper) Submit(event *proxy.SubmitEvent) { if m == nil || event == nil || m.strategy == nil { diff --git a/splitter/simple/impl_test.go b/splitter/simple/impl_test.go index 8c62267..2f47e44 100644 --- a/splitter/simple/impl_test.go +++ b/splitter/simple/impl_test.go @@ -20,6 +20,21 @@ func (a activeStrategy) Submit(string, string, string, string) int64 { return 0 func (a activeStrategy) Disconnect() {} func (a activeStrategy) IsActive() bool { return true } +type submitRecordingStrategy struct { + submits int +} + +func (s *submitRecordingStrategy) Connect() {} + +func (s *submitRecordingStrategy) Submit(string, string, string, string) int64 { + s.submits++ + return int64(s.submits) +} + +func (s *submitRecordingStrategy) Disconnect() {} + +func (s *submitRecordingStrategy) IsActive() bool { return true } + func TestSimpleMapper_New_Good(t *testing.T) { strategy := activeStrategy{} mapper := NewSimpleMapper(7, strategy) @@ -90,6 +105,38 @@ func TestSimpleSplitter_OnLogin_Ugly(t *testing.T) { } } +func TestSimpleSplitter_OnSubmit_UsesRouteID_Good(t *testing.T) { + strategy := &submitRecordingStrategy{} + splitter := NewSimpleSplitter(&proxy.Config{ReuseTimeout: 30}, nil, nil) + miner := proxy.NewMiner(discardConn{}, 3333, nil) + miner.SetID(21) + miner.SetRouteID(7) + + mapper := &SimpleMapper{ + id: 7, + miner: miner, + currentJob: proxy.Job{JobID: "job-1", Blob: "blob", Target: "b88d0600"}, + strategy: strategy, + pending: make(map[int64]submitContext), + } + splitter.active[99] = mapper + + splitter.OnSubmit(&proxy.SubmitEvent{ + Miner: miner, + JobID: "job-1", + Nonce: "deadbeef", + Result: "hash", + RequestID: 11, + }) + + if strategy.submits != 1 { + t.Fatalf("expected one submit routed by route id, got %d", strategy.submits) + } + if len(mapper.pending) != 1 { + t.Fatalf("expected routed submit to create one pending entry, got %d", len(mapper.pending)) + } +} + func TestSimpleSplitter_Upstreams_Good(t *testing.T) { splitter := NewSimpleSplitter(&proxy.Config{ReuseTimeout: 30}, nil, func(listener pool.StratumListener) pool.Strategy { return activeStrategy{}