fix(simple): track expired shares
Propagate the submitted job id through simple-mode share handling so accepted shares can be flagged expired when a reply lands after a job rollover. Add coverage for the expired accept event path.\n\nCo-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
3efa7f34d0
commit
b66739b64f
3 changed files with 69 additions and 5 deletions
|
|
@ -217,8 +217,12 @@ func (m *SimpleMapper) Submit(event *proxy.SubmitEvent) {
|
|||
}
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
seq := m.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo)
|
||||
m.pending[seq] = submitContext{RequestID: event.RequestID, StartedAt: time.Now()}
|
||||
jobID := event.JobID
|
||||
if jobID == "" {
|
||||
jobID = m.currentJob.JobID
|
||||
}
|
||||
seq := m.strategy.Submit(jobID, event.Nonce, event.Result, event.Algo)
|
||||
m.pending[seq] = submitContext{RequestID: event.RequestID, StartedAt: time.Now(), JobID: jobID}
|
||||
}
|
||||
|
||||
// OnJob forwards the latest pool job to the active miner.
|
||||
|
|
@ -227,6 +231,7 @@ func (m *SimpleMapper) OnJob(job proxy.Job) {
|
|||
return
|
||||
}
|
||||
m.mu.Lock()
|
||||
m.prevJob = m.currentJob
|
||||
m.currentJob = job
|
||||
miner := m.miner
|
||||
m.mu.Unlock()
|
||||
|
|
@ -247,6 +252,8 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess
|
|||
delete(m.pending, sequence)
|
||||
}
|
||||
miner := m.miner
|
||||
currentJob := m.currentJob
|
||||
prevJob := m.prevJob
|
||||
m.mu.Unlock()
|
||||
if !ok || miner == nil {
|
||||
return
|
||||
|
|
@ -260,17 +267,21 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess
|
|||
latency = uint16(elapsed)
|
||||
}
|
||||
}
|
||||
job := currentJob
|
||||
expired := false
|
||||
if ctx.JobID != "" && ctx.JobID == prevJob.JobID && ctx.JobID != currentJob.JobID {
|
||||
job = prevJob
|
||||
expired = true
|
||||
}
|
||||
if accepted {
|
||||
miner.Success(ctx.RequestID, "OK")
|
||||
if m.events != nil {
|
||||
job := miner.CurrentJob()
|
||||
m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Diff: job.DifficultyFromTarget(), Job: &job, Latency: latency})
|
||||
m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Diff: job.DifficultyFromTarget(), Job: &job, Latency: latency, Expired: expired})
|
||||
}
|
||||
return
|
||||
}
|
||||
miner.ReplyWithError(ctx.RequestID, errorMessage)
|
||||
if m.events != nil {
|
||||
job := miner.CurrentJob()
|
||||
m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Diff: job.DifficultyFromTarget(), Job: &job, Error: errorMessage, Latency: latency})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
package simple
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -112,3 +115,51 @@ func TestSimpleSplitter_Upstreams_Ugly(t *testing.T) {
|
|||
t.Fatalf("expected total upstreams to be 2, got %d", stats.Total)
|
||||
}
|
||||
}
|
||||
|
||||
type discardConn struct{}
|
||||
|
||||
func (discardConn) Read([]byte) (int, error) { return 0, io.EOF }
|
||||
func (discardConn) Write(p []byte) (int, error) { return len(p), nil }
|
||||
func (discardConn) Close() error { return nil }
|
||||
func (discardConn) LocalAddr() net.Addr { return nil }
|
||||
func (discardConn) RemoteAddr() net.Addr { return nil }
|
||||
func (discardConn) SetDeadline(time.Time) error { return nil }
|
||||
func (discardConn) SetReadDeadline(time.Time) error { return nil }
|
||||
func (discardConn) SetWriteDeadline(time.Time) error { return nil }
|
||||
|
||||
func TestSimpleMapper_OnResultAccepted_Expired(t *testing.T) {
|
||||
bus := proxy.NewEventBus()
|
||||
events := make(chan proxy.Event, 1)
|
||||
var once sync.Once
|
||||
bus.Subscribe(proxy.EventAccept, func(e proxy.Event) {
|
||||
once.Do(func() {
|
||||
events <- e
|
||||
})
|
||||
})
|
||||
|
||||
miner := proxy.NewMiner(discardConn{}, 3333, nil)
|
||||
miner.SetID(1)
|
||||
mapper := &SimpleMapper{
|
||||
miner: miner,
|
||||
currentJob: proxy.Job{JobID: "job-new", Blob: "blob-new", Target: "b88d0600"},
|
||||
prevJob: proxy.Job{JobID: "job-old", Blob: "blob-old", Target: "b88d0600"},
|
||||
events: bus,
|
||||
pending: map[int64]submitContext{
|
||||
7: {RequestID: 9, StartedAt: time.Now(), JobID: "job-old"},
|
||||
},
|
||||
}
|
||||
|
||||
mapper.OnResultAccepted(7, true, "")
|
||||
|
||||
select {
|
||||
case event := <-events:
|
||||
if !event.Expired {
|
||||
t.Fatalf("expected expired share to be flagged")
|
||||
}
|
||||
if event.Job == nil || event.Job.JobID != "job-old" {
|
||||
t.Fatalf("expected previous job to be attached, got %+v", event.Job)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("expected accept event")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ type SimpleMapper struct {
|
|||
id int64
|
||||
miner *proxy.Miner // nil when idle
|
||||
currentJob proxy.Job
|
||||
prevJob proxy.Job
|
||||
strategy pool.Strategy
|
||||
idleAt time.Time // zero when active
|
||||
stopped bool
|
||||
|
|
@ -28,4 +29,5 @@ type SimpleMapper struct {
|
|||
type submitContext struct {
|
||||
RequestID int64
|
||||
StartedAt time.Time
|
||||
JobID string
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue