diff --git a/splitter/simple/impl.go b/splitter/simple/impl.go index e79dbcd..c7c82cf 100644 --- a/splitter/simple/impl.go +++ b/splitter/simple/impl.go @@ -217,8 +217,12 @@ func (m *SimpleMapper) Submit(event *proxy.SubmitEvent) { } m.mu.Lock() defer m.mu.Unlock() - seq := m.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo) - m.pending[seq] = submitContext{RequestID: event.RequestID, StartedAt: time.Now()} + jobID := event.JobID + if jobID == "" { + jobID = m.currentJob.JobID + } + seq := m.strategy.Submit(jobID, event.Nonce, event.Result, event.Algo) + m.pending[seq] = submitContext{RequestID: event.RequestID, StartedAt: time.Now(), JobID: jobID} } // OnJob forwards the latest pool job to the active miner. @@ -227,6 +231,7 @@ func (m *SimpleMapper) OnJob(job proxy.Job) { return } m.mu.Lock() + m.prevJob = m.currentJob m.currentJob = job miner := m.miner m.mu.Unlock() @@ -247,6 +252,8 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess delete(m.pending, sequence) } miner := m.miner + currentJob := m.currentJob + prevJob := m.prevJob m.mu.Unlock() if !ok || miner == nil { return @@ -260,17 +267,21 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess latency = uint16(elapsed) } } + job := currentJob + expired := false + if ctx.JobID != "" && ctx.JobID == prevJob.JobID && ctx.JobID != currentJob.JobID { + job = prevJob + expired = true + } if accepted { miner.Success(ctx.RequestID, "OK") if m.events != nil { - job := miner.CurrentJob() - m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Diff: job.DifficultyFromTarget(), Job: &job, Latency: latency}) + m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Diff: job.DifficultyFromTarget(), Job: &job, Latency: latency, Expired: expired}) } return } miner.ReplyWithError(ctx.RequestID, errorMessage) if m.events != nil { - job := miner.CurrentJob() m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Diff: job.DifficultyFromTarget(), Job: &job, Error: errorMessage, Latency: latency}) } } diff --git a/splitter/simple/impl_test.go b/splitter/simple/impl_test.go index 0ebfaef..8bcffef 100644 --- a/splitter/simple/impl_test.go +++ b/splitter/simple/impl_test.go @@ -1,6 +1,9 @@ package simple import ( + "io" + "net" + "sync" "testing" "time" @@ -112,3 +115,51 @@ func TestSimpleSplitter_Upstreams_Ugly(t *testing.T) { t.Fatalf("expected total upstreams to be 2, got %d", stats.Total) } } + +type discardConn struct{} + +func (discardConn) Read([]byte) (int, error) { return 0, io.EOF } +func (discardConn) Write(p []byte) (int, error) { return len(p), nil } +func (discardConn) Close() error { return nil } +func (discardConn) LocalAddr() net.Addr { return nil } +func (discardConn) RemoteAddr() net.Addr { return nil } +func (discardConn) SetDeadline(time.Time) error { return nil } +func (discardConn) SetReadDeadline(time.Time) error { return nil } +func (discardConn) SetWriteDeadline(time.Time) error { return nil } + +func TestSimpleMapper_OnResultAccepted_Expired(t *testing.T) { + bus := proxy.NewEventBus() + events := make(chan proxy.Event, 1) + var once sync.Once + bus.Subscribe(proxy.EventAccept, func(e proxy.Event) { + once.Do(func() { + events <- e + }) + }) + + miner := proxy.NewMiner(discardConn{}, 3333, nil) + miner.SetID(1) + mapper := &SimpleMapper{ + miner: miner, + currentJob: proxy.Job{JobID: "job-new", Blob: "blob-new", Target: "b88d0600"}, + prevJob: proxy.Job{JobID: "job-old", Blob: "blob-old", Target: "b88d0600"}, + events: bus, + pending: map[int64]submitContext{ + 7: {RequestID: 9, StartedAt: time.Now(), JobID: "job-old"}, + }, + } + + mapper.OnResultAccepted(7, true, "") + + select { + case event := <-events: + if !event.Expired { + t.Fatalf("expected expired share to be flagged") + } + if event.Job == nil || event.Job.JobID != "job-old" { + t.Fatalf("expected previous job to be attached, got %+v", event.Job) + } + case <-time.After(time.Second): + t.Fatal("expected accept event") + } +} diff --git a/splitter/simple/mapper.go b/splitter/simple/mapper.go index c3b3e40..a37ebca 100644 --- a/splitter/simple/mapper.go +++ b/splitter/simple/mapper.go @@ -17,6 +17,7 @@ type SimpleMapper struct { id int64 miner *proxy.Miner // nil when idle currentJob proxy.Job + prevJob proxy.Job strategy pool.Strategy idleAt time.Time // zero when active stopped bool @@ -28,4 +29,5 @@ type SimpleMapper struct { type submitContext struct { RequestID int64 StartedAt time.Time + JobID string }