diff --git a/splitter/nicehash/impl.go b/splitter/nicehash/impl.go index 3d27858..d32c2ab 100644 --- a/splitter/nicehash/impl.go +++ b/splitter/nicehash/impl.go @@ -300,7 +300,7 @@ func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessa prevJob := m.storage.prevJob m.storage.mu.Unlock() _ = m.storage.IsValidJobID(ctx.JobID) - expired := ctx.JobID != "" && ctx.JobID == prevJob.JobID && ctx.JobID != job.JobID + job, expired := resolveSubmissionJob(ctx.JobID, job, prevJob) m.mu.Unlock() if !ok || miner == nil { return @@ -327,6 +327,13 @@ func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessa } } +func resolveSubmissionJob(jobID string, currentJob, previousJob proxy.Job) (proxy.Job, bool) { + if jobID != "" && jobID == previousJob.JobID && jobID != currentJob.JobID { + return previousJob, true + } + return currentJob, false +} + func (m *NonceMapper) OnDisconnect() { if m == nil { return diff --git a/splitter/nicehash/mapper_start_test.go b/splitter/nicehash/mapper_start_test.go index c6905ef..189c0bc 100644 --- a/splitter/nicehash/mapper_start_test.go +++ b/splitter/nicehash/mapper_start_test.go @@ -6,6 +6,7 @@ import ( "net" "sync" "testing" + "time" "dappco.re/go/proxy" ) @@ -33,6 +34,17 @@ func (s *startCountingStrategy) IsActive() bool { return s.connect > 0 } +type discardConn struct{} + +func (discardConn) Read([]byte) (int, error) { return 0, nil } +func (discardConn) Write(p []byte) (int, error) { return len(p), nil } +func (discardConn) Close() error { return nil } +func (discardConn) LocalAddr() net.Addr { return nil } +func (discardConn) RemoteAddr() net.Addr { return nil } +func (discardConn) SetDeadline(time.Time) error { return nil } +func (discardConn) SetReadDeadline(time.Time) error { return nil } +func (discardConn) SetWriteDeadline(time.Time) error { return nil } + func TestMapper_Start_Good(t *testing.T) { strategy := &startCountingStrategy{} mapper := NewNonceMapper(1, &proxy.Config{}, strategy) @@ -110,3 +122,39 @@ func TestMapper_Submit_InvalidJob_Good(t *testing.T) { t.Fatalf("expected invalid submit not to create a pending entry") } } + +func TestMapper_OnResultAccepted_ExpiredUsesPreviousJob(t *testing.T) { + bus := proxy.NewEventBus() + events := make(chan proxy.Event, 1) + bus.Subscribe(proxy.EventAccept, func(e proxy.Event) { + events <- e + }) + + miner := proxy.NewMiner(discardConn{}, 3333, nil) + miner.SetID(7) + mapper := NewNonceMapper(1, &proxy.Config{}, &startCountingStrategy{}) + mapper.events = bus + mapper.storage.job = proxy.Job{JobID: "job-new", Blob: "blob-new", Target: "b88d0600"} + mapper.storage.prevJob = proxy.Job{JobID: "job-old", Blob: "blob-old", Target: "b88d0600"} + mapper.storage.miners[miner.ID()] = miner + mapper.pending[9] = SubmitContext{ + RequestID: 42, + MinerID: miner.ID(), + JobID: "job-old", + StartedAt: time.Now(), + } + + mapper.OnResultAccepted(9, true, "") + + select { + case event := <-events: + if !event.Expired { + t.Fatalf("expected expired share to be flagged") + } + if event.Job == nil || event.Job.JobID != "job-old" { + t.Fatalf("expected previous job to be attached, got %+v", event.Job) + } + case <-time.After(time.Second): + t.Fatal("expected accept event") + } +}