From 1f8ff58b20bb1f60736190c395e5d22edd2e2c36 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sun, 5 Apr 2026 02:48:03 +0000 Subject: [PATCH] fix(login): defer login events until assignment succeeds Co-Authored-By: Virgil --- miner.go | 1 + miner_login_test.go | 62 ++++++++++++++++++++++++++ splitter/nicehash/impl.go | 7 ++- splitter/nicehash/mapper_start_test.go | 43 ++++++++++++++++++ state_impl.go | 11 +++-- 5 files changed, 120 insertions(+), 4 deletions(-) diff --git a/miner.go b/miner.go index 52d7e31..982bbae 100644 --- a/miner.go +++ b/miner.go @@ -57,6 +57,7 @@ type Miner struct { sendMu sync.Mutex // serialises writes to conn buf [16384]byte // per-miner send buffer; avoids per-write allocations onLogin func(*Miner) + onLoginAccepted func(*Miner) onSubmit func(*Miner, *SubmitEvent) onClose func(*Miner) closeOnce sync.Once diff --git a/miner_login_test.go b/miner_login_test.go index 26b76b3..72e10dc 100644 --- a/miner_login_test.go +++ b/miner_login_test.go @@ -175,6 +175,68 @@ func TestMiner_HandleLogin_Ugly(t *testing.T) { } } +func TestMiner_HandleLogin_FailedAssignmentDoesNotDispatchLoginEvent(t *testing.T) { + minerConn, clientConn := net.Pipe() + defer minerConn.Close() + defer clientConn.Close() + + proxyInstance := &Proxy{ + config: &Config{ + Mode: "nicehash", + Workers: WorkersByUser, + Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}}, + Pools: []PoolConfig{{URL: "pool.example:3333", Enabled: true}}, + }, + events: NewEventBus(), + stats: NewStats(), + workers: NewWorkers(WorkersByUser, nil), + miners: make(map[int64]*Miner), + } + proxyInstance.events.Subscribe(EventLogin, proxyInstance.stats.OnLogin) + proxyInstance.workers.bindEvents(proxyInstance.events) + + miner := NewMiner(minerConn, 3333, nil) + miner.extNH = true + miner.onLogin = func(*Miner) {} + miner.onLoginAccepted = func(m *Miner) { + proxyInstance.events.Dispatch(Event{Type: EventLogin, Miner: m}) + } + proxyInstance.miners[miner.ID()] = miner + + params, err := json.Marshal(loginParams{ + Login: "wallet", + Pass: "x", + }) + if err != nil { + t.Fatalf("marshal login params: %v", err) + } + + go miner.handleLogin(stratumRequest{ID: 12, Method: "login", Params: params}) + + line, err := bufio.NewReader(clientConn).ReadBytes('\n') + if err != nil { + t.Fatalf("read login rejection: %v", err) + } + + var payload struct { + Error struct { + Message string `json:"message"` + } `json:"error"` + } + if err := json.Unmarshal(line, &payload); err != nil { + t.Fatalf("unmarshal login rejection: %v", err) + } + if payload.Error.Message != "Proxy is full, try again later" { + t.Fatalf("expected full-table rejection, got %q", payload.Error.Message) + } + if now, max := proxyInstance.MinerCount(); now != 0 || max != 0 { + t.Fatalf("expected failed login not to affect miner counts, got now=%d max=%d", now, max) + } + if records := proxyInstance.WorkerRecords(); len(records) != 0 { + t.Fatalf("expected failed login not to create worker records, got %d", len(records)) + } +} + func TestMiner_HandleLogin_CustomDiffCap_Good(t *testing.T) { minerConn, clientConn := net.Pipe() defer minerConn.Close() diff --git a/splitter/nicehash/impl.go b/splitter/nicehash/impl.go index c678434..72dc707 100644 --- a/splitter/nicehash/impl.go +++ b/splitter/nicehash/impl.go @@ -277,6 +277,7 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { jobID := event.JobID m.storage.mu.Lock() job := m.storage.job + prevJob := m.storage.prevJob m.storage.mu.Unlock() if jobID == "" { jobID = job.JobID @@ -286,12 +287,16 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { m.rejectInvalidJobLocked(event, job) return } + submissionJob := job + if jobID == prevJob.JobID && prevJob.JobID != "" { + submissionJob = prevJob + } seq := m.strategy.Submit(jobID, event.Nonce, event.Result, event.Algo) m.pending[seq] = SubmitContext{ RequestID: event.RequestID, MinerID: event.Miner.ID(), JobID: jobID, - Diff: proxy.EffectiveShareDifficulty(job, event.Miner), + Diff: proxy.EffectiveShareDifficulty(submissionJob, event.Miner), StartedAt: time.Now(), } m.lastUsed = time.Now() diff --git a/splitter/nicehash/mapper_start_test.go b/splitter/nicehash/mapper_start_test.go index c9f128b..8498358 100644 --- a/splitter/nicehash/mapper_start_test.go +++ b/splitter/nicehash/mapper_start_test.go @@ -166,6 +166,49 @@ func TestMapper_OnResultAccepted_ExpiredUsesPreviousJob(t *testing.T) { } } +func TestMapper_Submit_ExpiredJobUsesPreviousDifficulty(t *testing.T) { + miner := proxy.NewMiner(discardConn{}, 3333, nil) + miner.SetID(9) + + strategy := &submitCaptureStrategy{} + mapper := NewNonceMapper(1, &proxy.Config{}, strategy) + mapper.storage.job = proxy.Job{JobID: "job-new", Blob: "blob-new", Target: "ffffffff"} + mapper.storage.prevJob = proxy.Job{JobID: "job-old", Blob: "blob-old", Target: "b88d0600"} + mapper.storage.miners[miner.ID()] = miner + + mapper.Submit(&proxy.SubmitEvent{ + Miner: miner, + JobID: "job-old", + Nonce: "deadbeef", + Result: "hash", + RequestID: 88, + }) + + ctx, ok := mapper.pending[strategy.seq] + if !ok { + t.Fatal("expected pending submit context for expired job") + } + want := mapper.storage.prevJob.DifficultyFromTarget() + if ctx.Diff != want { + t.Fatalf("expected previous-job difficulty %d, got %d", want, ctx.Diff) + } +} + +type submitCaptureStrategy struct { + seq int64 +} + +func (s *submitCaptureStrategy) Connect() {} + +func (s *submitCaptureStrategy) Submit(jobID, nonce, result, algo string) int64 { + s.seq++ + return s.seq +} + +func (s *submitCaptureStrategy) Disconnect() {} + +func (s *submitCaptureStrategy) IsActive() bool { return true } + func TestMapper_OnResultAccepted_CustomDiffUsesEffectiveDifficulty(t *testing.T) { bus := proxy.NewEventBus() events := make(chan proxy.Event, 1) diff --git a/state_impl.go b/state_impl.go index 26c8636..9d35b5b 100644 --- a/state_impl.go +++ b/state_impl.go @@ -452,13 +452,15 @@ func (p *Proxy) acceptMiner(conn net.Conn, localPort uint16) { miner.globalDiff = customDiff miner.extNH = strings.EqualFold(mode, "nicehash") miner.onLogin = func(m *Miner) { - if p.events != nil { - p.events.Dispatch(Event{Type: EventLogin, Miner: m}) - } if p.splitter != nil { p.splitter.OnLogin(&LoginEvent{Miner: m}) } } + miner.onLoginAccepted = func(m *Miner) { + if p.events != nil { + p.events.Dispatch(Event{Type: EventLogin, Miner: m}) + } + } miner.onSubmit = func(m *Miner, event *SubmitEvent) { if p.splitter != nil { if _, ok := p.splitter.(*noopSplitter); !ok { @@ -1030,6 +1032,9 @@ func (m *Miner) handleLogin(request stratumRequest) { m.ReplyWithError(requestID(request.ID), "Proxy is unavailable, try again later") return } + if m.onLoginAccepted != nil { + m.onLoginAccepted(m) + } m.replyLoginSuccess(requestID(request.ID)) }