From d47d89af7ac7b623e3a13fb7999db90cdb4f20c2 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 20:28:54 +0000 Subject: [PATCH] Fix submit job validation and custom diff fallback --- core_impl.go | 14 +------ customdiff_test.go | 4 +- splitter/nicehash/impl.go | 13 ++++++- splitter/nicehash/mapper_start_test.go | 52 +++++++++++++++++++++++++ splitter/simple/impl.go | 15 +++++++ splitter/simple/impl_test.go | 54 ++++++++++++++++++++++++++ state_impl.go | 1 - 7 files changed, 135 insertions(+), 18 deletions(-) diff --git a/core_impl.go b/core_impl.go index 3382d5e..c380e75 100644 --- a/core_impl.go +++ b/core_impl.go @@ -181,19 +181,7 @@ func (cd *CustomDiff) OnLogin(e Event) { if cd == nil || e.Miner == nil { return } - miner := e.Miner - user := miner.user - plus := strings.LastIndex(user, "+") - if plus >= 0 && plus < len(user)-1 { - if parsed, err := strconv.ParseUint(user[plus+1:], 10, 64); err == nil { - miner.user = user[:plus] - miner.customDiff = parsed - } - return - } - if cd.globalDiff > 0 { - miner.customDiff = cd.globalDiff - } + e.Miner.user, e.Miner.customDiff = parseLoginUser(e.Miner.user, cd.globalDiff) } // NewRateLimiter creates a per-IP token bucket limiter. diff --git a/customdiff_test.go b/customdiff_test.go index 0e40bbb..effe9d8 100644 --- a/customdiff_test.go +++ b/customdiff_test.go @@ -18,8 +18,8 @@ func TestCustomDiff_OnLogin(t *testing.T) { if miner.User() != "WALLET+abc" { t.Fatalf("expected invalid suffix to remain unchanged") } - if miner.customDiff != 0 { - t.Fatalf("expected custom diff 0 for invalid suffix, got %d", miner.customDiff) + if miner.customDiff != 10000 { + t.Fatalf("expected global diff fallback for invalid suffix, got %d", miner.customDiff) } miner = &Miner{user: "WALLET"} diff --git a/splitter/nicehash/impl.go b/splitter/nicehash/impl.go index 001f466..3d27858 100644 --- a/splitter/nicehash/impl.go +++ b/splitter/nicehash/impl.go @@ -234,12 +234,13 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { jobID := event.JobID m.storage.mu.Lock() job := m.storage.job - prevJob := m.storage.prevJob m.storage.mu.Unlock() if jobID == "" { jobID = job.JobID } - if jobID == "" || (jobID != job.JobID && jobID != prevJob.JobID) { + valid := m.storage.IsValidJobID(jobID) + if jobID == "" || !valid { + m.rejectInvalidJobLocked(event, job) return } seq := m.strategy.Submit(jobID, event.Nonce, event.Result, event.Algo) @@ -252,6 +253,14 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { m.lastUsed = time.Now() } +func (m *NonceMapper) rejectInvalidJobLocked(event *proxy.SubmitEvent, job proxy.Job) { + event.Miner.ReplyWithError(event.RequestID, "Invalid job id") + if m.events != nil { + jobCopy := job + m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: event.Miner, Job: &jobCopy, Error: "Invalid job id"}) + } +} + // IsActive reports whether the mapper has received a valid job. func (m *NonceMapper) IsActive() bool { if m == nil { diff --git a/splitter/nicehash/mapper_start_test.go b/splitter/nicehash/mapper_start_test.go index e5c5040..c6905ef 100644 --- a/splitter/nicehash/mapper_start_test.go +++ b/splitter/nicehash/mapper_start_test.go @@ -1,6 +1,9 @@ package nicehash import ( + "bufio" + "encoding/json" + "net" "sync" "testing" @@ -58,3 +61,52 @@ func TestMapper_Start_Ugly(t *testing.T) { t.Fatalf("expected Start to be idempotent, got %d connect calls", strategy.connect) } } + +func TestMapper_Submit_InvalidJob_Good(t *testing.T) { + minerConn, clientConn := net.Pipe() + defer minerConn.Close() + defer clientConn.Close() + + miner := proxy.NewMiner(minerConn, 3333, nil) + miner.SetID(7) + strategy := &startCountingStrategy{} + mapper := NewNonceMapper(1, &proxy.Config{}, strategy) + mapper.storage.job = proxy.Job{JobID: "job-1", Blob: "blob", Target: "b88d0600"} + + done := make(chan struct{}) + go func() { + mapper.Submit(&proxy.SubmitEvent{ + Miner: miner, + JobID: "job-missing", + Nonce: "deadbeef", + Result: "hash", + RequestID: 42, + }) + close(done) + }() + + line, err := bufio.NewReader(clientConn).ReadBytes('\n') + if err != nil { + t.Fatalf("read error reply: %v", err) + } + <-done + + var payload struct { + ID float64 `json:"id"` + Error struct { + Message string `json:"message"` + } `json:"error"` + } + if err := json.Unmarshal(line, &payload); err != nil { + t.Fatalf("unmarshal error reply: %v", err) + } + if payload.ID != 42 { + t.Fatalf("expected request id 42, got %v", payload.ID) + } + if payload.Error.Message != "Invalid job id" { + t.Fatalf("expected invalid job error, got %q", payload.Error.Message) + } + if len(mapper.pending) != 0 { + t.Fatalf("expected invalid submit not to create a pending entry") + } +} diff --git a/splitter/simple/impl.go b/splitter/simple/impl.go index c7c82cf..4159918 100644 --- a/splitter/simple/impl.go +++ b/splitter/simple/impl.go @@ -221,10 +221,25 @@ func (m *SimpleMapper) Submit(event *proxy.SubmitEvent) { if jobID == "" { jobID = m.currentJob.JobID } + if jobID == "" || (jobID != m.currentJob.JobID && jobID != m.prevJob.JobID) { + m.rejectInvalidJobLocked(event, m.currentJob) + return + } seq := m.strategy.Submit(jobID, event.Nonce, event.Result, event.Algo) m.pending[seq] = submitContext{RequestID: event.RequestID, StartedAt: time.Now(), JobID: jobID} } +func (m *SimpleMapper) rejectInvalidJobLocked(event *proxy.SubmitEvent, job proxy.Job) { + if event == nil || event.Miner == nil { + return + } + event.Miner.ReplyWithError(event.RequestID, "Invalid job id") + if m.events != nil { + jobCopy := job + m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: event.Miner, Job: &jobCopy, Error: "Invalid job id"}) + } +} + // OnJob forwards the latest pool job to the active miner. func (m *SimpleMapper) OnJob(job proxy.Job) { if m == nil { diff --git a/splitter/simple/impl_test.go b/splitter/simple/impl_test.go index 8bcffef..02ce74a 100644 --- a/splitter/simple/impl_test.go +++ b/splitter/simple/impl_test.go @@ -1,6 +1,8 @@ package simple import ( + "bufio" + "encoding/json" "io" "net" "sync" @@ -163,3 +165,55 @@ func TestSimpleMapper_OnResultAccepted_Expired(t *testing.T) { t.Fatal("expected accept event") } } + +func TestSimpleMapper_Submit_InvalidJob_Good(t *testing.T) { + minerConn, clientConn := net.Pipe() + defer minerConn.Close() + defer clientConn.Close() + + miner := proxy.NewMiner(minerConn, 3333, nil) + mapper := &SimpleMapper{ + miner: miner, + currentJob: proxy.Job{JobID: "job-1", Blob: "blob", Target: "b88d0600"}, + prevJob: proxy.Job{JobID: "job-0", Blob: "blob", Target: "b88d0600"}, + strategy: activeStrategy{}, + pending: make(map[int64]submitContext), + } + + done := make(chan struct{}) + go func() { + mapper.Submit(&proxy.SubmitEvent{ + Miner: miner, + JobID: "job-missing", + Nonce: "deadbeef", + Result: "hash", + RequestID: 9, + }) + close(done) + }() + + line, err := bufio.NewReader(clientConn).ReadBytes('\n') + if err != nil { + t.Fatalf("read error reply: %v", err) + } + <-done + + var payload struct { + ID float64 `json:"id"` + Error struct { + Message string `json:"message"` + } `json:"error"` + } + if err := json.Unmarshal(line, &payload); err != nil { + t.Fatalf("unmarshal error reply: %v", err) + } + if payload.ID != 9 { + t.Fatalf("expected request id 9, got %v", payload.ID) + } + if payload.Error.Message != "Invalid job id" { + t.Fatalf("expected invalid job error, got %q", payload.Error.Message) + } + if len(mapper.pending) != 0 { + t.Fatalf("expected invalid submit not to create a pending entry") + } +} diff --git a/state_impl.go b/state_impl.go index 75f7f6f..271a4dc 100644 --- a/state_impl.go +++ b/state_impl.go @@ -841,7 +841,6 @@ func parseLoginUser(login string, globalDiff uint64) (string, uint64) { if parsed, err := strconv.ParseUint(login[plus+1:], 10, 64); err == nil { return login[:plus], parsed } - return login, 0 } if globalDiff > 0 { return login, globalDiff