fix(login): defer login events until assignment succeeds

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-05 02:48:03 +00:00
parent bc6113c80d
commit 1f8ff58b20
5 changed files with 120 additions and 4 deletions

View file

@ -57,6 +57,7 @@ type Miner struct {
sendMu sync.Mutex // serialises writes to conn
buf [16384]byte // per-miner send buffer; avoids per-write allocations
onLogin func(*Miner)
onLoginAccepted func(*Miner)
onSubmit func(*Miner, *SubmitEvent)
onClose func(*Miner)
closeOnce sync.Once

View file

@ -175,6 +175,68 @@ func TestMiner_HandleLogin_Ugly(t *testing.T) {
}
}
func TestMiner_HandleLogin_FailedAssignmentDoesNotDispatchLoginEvent(t *testing.T) {
minerConn, clientConn := net.Pipe()
defer minerConn.Close()
defer clientConn.Close()
proxyInstance := &Proxy{
config: &Config{
Mode: "nicehash",
Workers: WorkersByUser,
Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}},
Pools: []PoolConfig{{URL: "pool.example:3333", Enabled: true}},
},
events: NewEventBus(),
stats: NewStats(),
workers: NewWorkers(WorkersByUser, nil),
miners: make(map[int64]*Miner),
}
proxyInstance.events.Subscribe(EventLogin, proxyInstance.stats.OnLogin)
proxyInstance.workers.bindEvents(proxyInstance.events)
miner := NewMiner(minerConn, 3333, nil)
miner.extNH = true
miner.onLogin = func(*Miner) {}
miner.onLoginAccepted = func(m *Miner) {
proxyInstance.events.Dispatch(Event{Type: EventLogin, Miner: m})
}
proxyInstance.miners[miner.ID()] = miner
params, err := json.Marshal(loginParams{
Login: "wallet",
Pass: "x",
})
if err != nil {
t.Fatalf("marshal login params: %v", err)
}
go miner.handleLogin(stratumRequest{ID: 12, Method: "login", Params: params})
line, err := bufio.NewReader(clientConn).ReadBytes('\n')
if err != nil {
t.Fatalf("read login rejection: %v", err)
}
var payload struct {
Error struct {
Message string `json:"message"`
} `json:"error"`
}
if err := json.Unmarshal(line, &payload); err != nil {
t.Fatalf("unmarshal login rejection: %v", err)
}
if payload.Error.Message != "Proxy is full, try again later" {
t.Fatalf("expected full-table rejection, got %q", payload.Error.Message)
}
if now, max := proxyInstance.MinerCount(); now != 0 || max != 0 {
t.Fatalf("expected failed login not to affect miner counts, got now=%d max=%d", now, max)
}
if records := proxyInstance.WorkerRecords(); len(records) != 0 {
t.Fatalf("expected failed login not to create worker records, got %d", len(records))
}
}
func TestMiner_HandleLogin_CustomDiffCap_Good(t *testing.T) {
minerConn, clientConn := net.Pipe()
defer minerConn.Close()

View file

@ -277,6 +277,7 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) {
jobID := event.JobID
m.storage.mu.Lock()
job := m.storage.job
prevJob := m.storage.prevJob
m.storage.mu.Unlock()
if jobID == "" {
jobID = job.JobID
@ -286,12 +287,16 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) {
m.rejectInvalidJobLocked(event, job)
return
}
submissionJob := job
if jobID == prevJob.JobID && prevJob.JobID != "" {
submissionJob = prevJob
}
seq := m.strategy.Submit(jobID, event.Nonce, event.Result, event.Algo)
m.pending[seq] = SubmitContext{
RequestID: event.RequestID,
MinerID: event.Miner.ID(),
JobID: jobID,
Diff: proxy.EffectiveShareDifficulty(job, event.Miner),
Diff: proxy.EffectiveShareDifficulty(submissionJob, event.Miner),
StartedAt: time.Now(),
}
m.lastUsed = time.Now()

View file

@ -166,6 +166,49 @@ func TestMapper_OnResultAccepted_ExpiredUsesPreviousJob(t *testing.T) {
}
}
func TestMapper_Submit_ExpiredJobUsesPreviousDifficulty(t *testing.T) {
miner := proxy.NewMiner(discardConn{}, 3333, nil)
miner.SetID(9)
strategy := &submitCaptureStrategy{}
mapper := NewNonceMapper(1, &proxy.Config{}, strategy)
mapper.storage.job = proxy.Job{JobID: "job-new", Blob: "blob-new", Target: "ffffffff"}
mapper.storage.prevJob = proxy.Job{JobID: "job-old", Blob: "blob-old", Target: "b88d0600"}
mapper.storage.miners[miner.ID()] = miner
mapper.Submit(&proxy.SubmitEvent{
Miner: miner,
JobID: "job-old",
Nonce: "deadbeef",
Result: "hash",
RequestID: 88,
})
ctx, ok := mapper.pending[strategy.seq]
if !ok {
t.Fatal("expected pending submit context for expired job")
}
want := mapper.storage.prevJob.DifficultyFromTarget()
if ctx.Diff != want {
t.Fatalf("expected previous-job difficulty %d, got %d", want, ctx.Diff)
}
}
type submitCaptureStrategy struct {
seq int64
}
func (s *submitCaptureStrategy) Connect() {}
func (s *submitCaptureStrategy) Submit(jobID, nonce, result, algo string) int64 {
s.seq++
return s.seq
}
func (s *submitCaptureStrategy) Disconnect() {}
func (s *submitCaptureStrategy) IsActive() bool { return true }
func TestMapper_OnResultAccepted_CustomDiffUsesEffectiveDifficulty(t *testing.T) {
bus := proxy.NewEventBus()
events := make(chan proxy.Event, 1)

View file

@ -452,13 +452,15 @@ func (p *Proxy) acceptMiner(conn net.Conn, localPort uint16) {
miner.globalDiff = customDiff
miner.extNH = strings.EqualFold(mode, "nicehash")
miner.onLogin = func(m *Miner) {
if p.events != nil {
p.events.Dispatch(Event{Type: EventLogin, Miner: m})
}
if p.splitter != nil {
p.splitter.OnLogin(&LoginEvent{Miner: m})
}
}
miner.onLoginAccepted = func(m *Miner) {
if p.events != nil {
p.events.Dispatch(Event{Type: EventLogin, Miner: m})
}
}
miner.onSubmit = func(m *Miner, event *SubmitEvent) {
if p.splitter != nil {
if _, ok := p.splitter.(*noopSplitter); !ok {
@ -1030,6 +1032,9 @@ func (m *Miner) handleLogin(request stratumRequest) {
m.ReplyWithError(requestID(request.ID), "Proxy is unavailable, try again later")
return
}
if m.onLoginAccepted != nil {
m.onLoginAccepted(m)
}
m.replyLoginSuccess(requestID(request.ID))
}