fix(pool): restore failover and stratum job handling

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 11:13:41 +00:00
parent 07ff21aa67
commit 3376cea600
11 changed files with 326 additions and 30 deletions

14
job.go
View file

@ -15,13 +15,13 @@ import (
// Algo: "cn/r",
// }
type Job struct {
Blob string // hex-encoded block template (160 hex chars = 80 bytes)
JobID string // pool-assigned identifier
Target string // 8-char hex little-endian uint32 difficulty target
Algo string // algorithm e.g. "cn/r", "rx/0"; "" if not negotiated
Height uint64 // block height (0 if pool did not provide)
SeedHash string // RandomX seed hash hex (empty if not RandomX)
ClientID string // pool session ID that issued this job (for stale detection)
Blob string `json:"blob"` // hex-encoded block template (160 hex chars = 80 bytes)
JobID string `json:"job_id"` // pool-assigned identifier
Target string `json:"target"` // 8-char hex little-endian uint32 difficulty target
Algo string `json:"algo"` // algorithm e.g. "cn/r", "rx/0"; "" if not negotiated
Height uint64 `json:"height"` // block height (0 if pool did not provide)
SeedHash string `json:"seed_hash"` // RandomX seed hash hex (empty if not RandomX)
ClientID string `json:"id"` // pool session ID that issued this job (for stale detection)
}
// IsValid returns true if Blob and JobID are non-empty.

View file

@ -1,6 +1,9 @@
package proxy
import "testing"
import (
"encoding/json"
"testing"
)
func TestJob_IsValid_Good(t *testing.T) {
job := Job{Blob: "abcd", JobID: "job-1"}
@ -70,3 +73,13 @@ func TestJob_DifficultyFromTarget_Ugly(t *testing.T) {
t.Fatalf("expected maximum target to resolve to difficulty 1, got %d", got)
}
}
// TestJob_JSON_Unmarshal_Good checks that every tagged Job field decodes from
// the pool's JSON wire names, including the session "id" -> ClientID mapping.
func TestJob_JSON_Unmarshal_Good(t *testing.T) {
	var job Job
	if err := json.Unmarshal([]byte(`{"blob":"abcd","job_id":"job-1","target":"b88d0600","algo":"cn/r","height":42,"seed_hash":"seed","id":"session-1"}`), &job); err != nil {
		t.Fatal(err)
	}
	// Assert all seven tagged fields, not just a subset, so a broken or
	// missing struct tag on any field fails this test.
	if job.Blob != "abcd" || job.Target != "b88d0600" || job.Algo != "cn/r" || job.Height != 42 {
		t.Fatalf("unexpected decoded job: %+v", job)
	}
	if job.JobID != "job-1" || job.SeedHash != "seed" || job.ClientID != "session-1" {
		t.Fatalf("unexpected decoded job: %+v", job)
	}
}

View file

@ -84,6 +84,14 @@ func (m *Miner) SetRouteID(value int64) { m.routeID = value }
// SetCustomDiff sets the miner's custom difficulty override.
func (m *Miner) SetCustomDiff(value uint64) { m.customDiff = value }
// SetNiceHashEnabled toggles the miner's NiceHash extension flag (extNH).
func (m *Miner) SetNiceHashEnabled(value bool) { m.extNH = value }
// PrimeJob stores job as the miner's current job and refreshes the cached
// difficulty derived from the job's target. Nil receivers and invalid jobs
// are ignored.
func (m *Miner) PrimeJob(job Job) {
	if m == nil {
		return
	}
	if !job.IsValid() {
		return
	}
	snapshot := job
	m.currentJob = &snapshot
	m.diff = snapshot.DifficultyFromTarget()
}
// Touch records the current UTC time as the miner's last activity timestamp.
func (m *Miner) Touch() {
	m.lastActivityAt = time.Now().UTC()
}

View file

@ -223,8 +223,12 @@ func (m *Miner) handleLogin(request minerRequest) {
}
if m.currentJob != nil && m.currentJob.IsValid() {
jobCopy := *m.currentJob
blob := jobCopy.Blob
if m.extNH {
blob = jobCopy.BlobWithFixedByte(m.fixedByte)
}
jobResult := map[string]interface{}{
"blob": jobCopy.Blob,
"blob": blob,
"job_id": jobCopy.JobID,
"target": jobCopy.Target,
"id": m.rpcID,

View file

@ -169,3 +169,59 @@ func TestMiner_Login_Ugly(t *testing.T) {
t.Fatalf("expected algo extension to be advertised, got %#v", response)
}
}
// TestMiner_Login_NiceHashPatchedJob_Good verifies that when NiceHash mode is
// enabled during login, the job blob in the login response has hex characters
// 78-80 patched to the miner's fixed byte (0x2a -> "2a").
func TestMiner_Login_NiceHashPatchedJob_Good(t *testing.T) {
	serverConn, clientConn := net.Pipe()
	defer clientConn.Close()
	miner := NewMiner(serverConn, 3333, nil)
	miner.algoExtension = true
	miner.events = NewEventBus()
	// On login, flip the miner into NiceHash mode and prime a job so the
	// login response carries a blob eligible for fixed-byte patching.
	miner.events.Subscribe(EventLogin, func(event Event) {
		if event.Miner == nil {
			return
		}
		event.Miner.SetNiceHashEnabled(true)
		event.Miner.SetFixedByte(0x2a)
		event.Miner.PrimeJob(Job{
			Blob:   "000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
			JobID:  "job-1",
			Target: "b88d0600",
			Algo:   "cn/r",
		})
	})
	miner.Start()
	defer miner.Close()
	encoder := json.NewEncoder(clientConn)
	if err := encoder.Encode(map[string]interface{}{
		"id":      5,
		"jsonrpc": "2.0",
		"method":  "login",
		"params": map[string]interface{}{
			"login": "wallet",
			"pass":  "x",
			"agent": "xmrig",
			"algo":  []string{"cn/r"},
		},
	}); err != nil {
		t.Fatal(err)
	}
	// Bound the read so a missing response fails the test instead of hanging.
	if err := clientConn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
		t.Fatal(err)
	}
	line, err := bufio.NewReader(clientConn).ReadBytes('\n')
	if err != nil {
		t.Fatal(err)
	}
	var response map[string]interface{}
	if err := json.Unmarshal(line, &response); err != nil {
		t.Fatal(err)
	}
	// Use checked type assertions so a malformed response reports a test
	// failure instead of panicking the test binary.
	result, ok := response["result"].(map[string]interface{})
	if !ok {
		t.Fatalf("login response missing result: %#v", response)
	}
	job, ok := result["job"].(map[string]interface{})
	if !ok {
		t.Fatalf("login result missing job: %#v", result)
	}
	blob, _ := job["blob"].(string)
	// Guard the length before slicing: blob[78:80] on a short blob would
	// panic with an index error rather than fail cleanly.
	if len(blob) < 80 {
		t.Fatalf("blob too short to carry a fixed byte: %q", blob)
	}
	if blob[78:80] != "2a" {
		t.Fatalf("expected patched NiceHash blob, got %q", blob)
	}
}

View file

@ -19,6 +19,8 @@ type FailoverStrategy struct {
client *StratumClient
listener StratumListener
cfg *proxy.Config
closed bool
running bool
mu sync.Mutex
}
@ -65,10 +67,27 @@ func NewFailoverStrategy(pools []proxy.PoolConfig, listener StratumListener, cfg
// strategy.Connect()
func (s *FailoverStrategy) Connect() {
	s.mu.Lock()
	// Clear closed so a previously Disconnect()ed strategy can be reused.
	s.closed = false
	s.mu.Unlock()
	// Start attempting pools from index 0; connectFrom refuses to run
	// concurrently via its running/closed checks under the same mutex.
	s.connectFrom(0)
}
func (s *FailoverStrategy) connectFrom(start int) {
s.mu.Lock()
if s.running || s.closed {
s.mu.Unlock()
return
}
s.running = true
s.mu.Unlock()
defer func() {
s.mu.Lock()
s.running = false
s.mu.Unlock()
}()
pools := s.pools
if s.cfg != nil {
if s.cfg != nil && len(s.cfg.Pools) > 0 {
pools = s.cfg.Pools
}
if len(pools) == 0 {
@ -87,15 +106,24 @@ func (s *FailoverStrategy) Connect() {
}
for attempt := 0; attempt < retries; attempt++ {
for index, poolConfig := range pools {
for offset := 0; offset < len(pools); offset++ {
index := (start + offset) % len(pools)
poolConfig := pools[index]
if !poolConfig.Enabled {
continue
}
client := NewStratumClient(poolConfig, s.listener)
client := NewStratumClient(poolConfig, s)
if errorValue := client.Connect(); errorValue == nil {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
client.Disconnect()
return
}
s.client = client
s.current = index
s.mu.Unlock()
client.Login()
return
}
@ -119,6 +147,7 @@ func (s *FailoverStrategy) Submit(jobID string, nonce string, result string, alg
func (s *FailoverStrategy) Disconnect() {
s.mu.Lock()
s.closed = true
client := s.client
s.client = nil
s.mu.Unlock()
@ -132,3 +161,37 @@ func (s *FailoverStrategy) IsActive() bool {
defer s.mu.Unlock()
return s.client != nil && s.client.active
}
// OnJob forwards a job notification from the upstream client to the
// downstream listener, if one was provided.
func (s *FailoverStrategy) OnJob(job proxy.Job) {
	downstream := s.listener
	if downstream == nil {
		return
	}
	downstream.OnJob(job)
}
// OnResultAccepted relays a share acceptance/rejection from the upstream
// client to the downstream listener, if one was provided.
func (s *FailoverStrategy) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
	downstream := s.listener
	if downstream == nil {
		return
	}
	downstream.OnResultAccepted(sequence, accepted, errorMessage)
}
// OnDisconnect handles loss of the upstream connection: it clears the active
// client, notifies the downstream listener, and — unless the strategy was
// explicitly closed — schedules a reconnect attempt starting from pool 0.
func (s *FailoverStrategy) OnDisconnect() {
	s.mu.Lock()
	client := s.client
	s.client = nil
	closed := s.closed
	s.mu.Unlock()
	if s.listener != nil {
		s.listener.OnDisconnect()
	}
	if closed {
		// Disconnect() was requested; do not auto-reconnect.
		return
	}
	if client != nil {
		// NOTE(review): active is written here outside the strategy mutex —
		// confirm StratumClient guards this field itself.
		client.active = false
	}
	// Small delay avoids a tight reconnect loop if the pool drops us
	// immediately; connectFrom re-checks closed/running under the lock.
	go func() {
		time.Sleep(10 * time.Millisecond)
		s.connectFrom(0)
	}()
}

110
pool/strategy_test.go Normal file
View file

@ -0,0 +1,110 @@
package pool
import (
"bufio"
"encoding/json"
"net"
"sync"
"testing"
"time"
"dappco.re/go/core/proxy"
)
// strategyTestListener is a StratumListener test double: it publishes jobs on
// jobCh and counts OnDisconnect callbacks under disconnectMu.
type strategyTestListener struct {
	jobCh        chan proxy.Job // receives jobs forwarded by OnJob
	disconnectMu sync.Mutex     // guards disconnects
	disconnects  int            // number of OnDisconnect calls observed
}
// OnJob forwards the job to jobCh. The send blocks when the channel buffer is
// full, so tests must size jobCh for the number of jobs they expect.
func (l *strategyTestListener) OnJob(job proxy.Job) {
	l.jobCh <- job
}
// OnResultAccepted is intentionally a no-op; this double only observes jobs
// and disconnects.
func (l *strategyTestListener) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {}
// OnDisconnect bumps the disconnect counter under the mutex.
func (l *strategyTestListener) OnDisconnect() {
	l.disconnectMu.Lock()
	defer l.disconnectMu.Unlock()
	l.disconnects++
}
// Disconnects reports how many times OnDisconnect has fired so far.
func (l *strategyTestListener) Disconnects() int {
	l.disconnectMu.Lock()
	count := l.disconnects
	l.disconnectMu.Unlock()
	return count
}
// TestFailoverStrategy_Connect_Ugly simulates a primary pool that drops the
// connection immediately and asserts the strategy fails over to the backup
// pool, delivering the backup's job and firing the disconnect callback.
func TestFailoverStrategy_Connect_Ugly(t *testing.T) {
	primaryListener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	defer primaryListener.Close()
	backupListener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	defer backupListener.Close()
	// Primary: accept one connection, then close both the listener and the
	// connection to force an upstream disconnect (and make retries fail).
	go func() {
		conn, acceptErr := primaryListener.Accept()
		if acceptErr != nil {
			return
		}
		_ = primaryListener.Close()
		_ = conn.Close()
	}()
	// Backup: behave like a minimal stratum pool — wait for the login line,
	// then reply with a successful login result that carries a job.
	go func() {
		conn, acceptErr := backupListener.Accept()
		if acceptErr != nil {
			return
		}
		defer conn.Close()
		reader := bufio.NewReader(conn)
		if _, readErr := reader.ReadBytes('\n'); readErr != nil {
			return
		}
		_ = json.NewEncoder(conn).Encode(map[string]interface{}{
			"id":      1,
			"jsonrpc": "2.0",
			"error":   nil,
			"result": map[string]interface{}{
				"id": "session-1",
				"job": map[string]interface{}{
					"blob":   "abcd",
					"job_id": "job-1",
					"target": "b88d0600",
				},
			},
		})
	}()
	listener := &strategyTestListener{
		jobCh: make(chan proxy.Job, 1),
	}
	strategy := NewFailoverStrategy([]proxy.PoolConfig{
		{URL: primaryListener.Addr().String(), Enabled: true},
		{URL: backupListener.Addr().String(), Enabled: true},
	}, listener, &proxy.Config{Retries: 2})
	strategy.Connect()
	defer strategy.Disconnect()
	// The job must come from the backup pool after the primary drops us.
	select {
	case job := <-listener.jobCh:
		if job.JobID != "job-1" {
			t.Fatalf("expected backup job, got %+v", job)
		}
	case <-time.After(3 * time.Second):
		t.Fatal("expected failover job after primary disconnect")
	}
	if listener.Disconnects() == 0 {
		t.Fatal("expected disconnect callback before failover reconnect")
	}
}

View file

@ -30,10 +30,11 @@ type NonceMapper struct {
//
// ctx := SubmitContext{RequestID: 42, MinerID: 7}
type SubmitContext struct {
RequestID int64 // JSON-RPC id from the miner's submit request
MinerID int64 // miner that submitted
JobID string
Expired bool
RequestID int64 // JSON-RPC id from the miner's submit request
MinerID int64 // miner that submitted
JobID string
Expired bool
SubmittedAt time.Time
}
// NewNonceMapper creates one upstream pool mapper and its local slot table.
@ -93,10 +94,11 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) {
}
m.mu.Lock()
m.pending[sequence] = SubmitContext{
RequestID: event.RequestID,
MinerID: event.Miner.ID(),
JobID: event.JobID,
Expired: expired,
RequestID: event.RequestID,
MinerID: event.Miner.ID(),
JobID: event.JobID,
Expired: expired,
SubmittedAt: time.Now().UTC(),
}
m.mu.Unlock()
}
@ -144,12 +146,24 @@ func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessa
if m.events != nil {
jobCopy := m.storage.CurrentJob()
latency := uint16(0)
if !context.SubmittedAt.IsZero() {
elapsed := time.Since(context.SubmittedAt).Milliseconds()
if elapsed > 0 {
if elapsed > int64(^uint16(0)) {
latency = ^uint16(0)
} else {
latency = uint16(elapsed)
}
}
}
m.events.Dispatch(proxy.Event{
Type: eventType,
Miner: miner,
Job: jobCopy,
Diff: miner.Diff(),
Error: errorMessage,
Latency: latency,
Expired: context.Expired,
})
}

View file

@ -69,7 +69,7 @@ func (s *NonceSplitter) OnLogin(event *proxy.LoginEvent) {
event.Miner.SetMapperID(mapper.id)
event.Miner.SetNiceHashEnabled(true)
if currentJob := mapper.storage.CurrentJob(); currentJob != nil && currentJob.IsValid() {
event.Miner.ForwardJob(*currentJob, currentJob.Algo)
event.Miner.PrimeJob(*currentJob)
}
return
}
@ -83,7 +83,7 @@ func (s *NonceSplitter) OnLogin(event *proxy.LoginEvent) {
event.Miner.SetMapperID(mapper.id)
event.Miner.SetNiceHashEnabled(true)
if currentJob := mapper.storage.CurrentJob(); currentJob != nil && currentJob.IsValid() {
event.Miner.ForwardJob(*currentJob, currentJob.Algo)
event.Miner.PrimeJob(*currentJob)
}
}
}

View file

@ -27,8 +27,9 @@ type SimpleMapper struct {
}
type simpleSubmitContext struct {
RequestID int64
Expired bool
RequestID int64
Expired bool
SubmittedAt time.Time
}
// NewSimpleMapper stores the mapper ID and strategy.
@ -90,15 +91,17 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess
}
if accepted {
latency := shareLatency(context.SubmittedAt)
if m.events != nil {
m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Diff: miner.Diff(), Expired: context.Expired})
m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: m.currentJob(), Diff: miner.Diff(), Latency: latency, Expired: context.Expired})
}
miner.Success(context.RequestID, "OK")
return
}
latency := shareLatency(context.SubmittedAt)
if m.events != nil {
m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Diff: miner.Diff(), Error: errorMessage, Expired: context.Expired})
m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: m.currentJob(), Diff: miner.Diff(), Error: errorMessage, Latency: latency, Expired: context.Expired})
}
miner.ReplyWithError(context.RequestID, errorMessage)
}
@ -106,3 +109,27 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess
// OnDisconnect marks the mapper as stopped after the upstream connection is
// lost.
// NOTE(review): stopped is written without holding m.mu — confirm every
// reader of stopped tolerates the unsynchronized write, or move it under the
// lock.
func (m *SimpleMapper) OnDisconnect() {
	m.stopped = true
}
// currentJob returns a pointer to a private copy of the mapper's stored job,
// or nil when no valid job is present. Returning a copy keeps callers from
// mutating shared state.
func (m *SimpleMapper) currentJob() *proxy.Job {
	m.mu.Lock()
	snapshot := m.job
	m.mu.Unlock()
	if !snapshot.IsValid() {
		return nil
	}
	return &snapshot
}
func shareLatency(submittedAt time.Time) uint16 {
if submittedAt.IsZero() {
return 0
}
elapsed := time.Since(submittedAt).Milliseconds()
if elapsed <= 0 {
return 0
}
if elapsed > int64(^uint16(0)) {
return ^uint16(0)
}
return uint16(elapsed)
}

View file

@ -90,7 +90,7 @@ func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) {
currentJob := mapper.job
mapper.mu.Unlock()
if currentJob.IsValid() {
event.Miner.ForwardJob(currentJob, currentJob.Algo)
event.Miner.PrimeJob(currentJob)
}
}
@ -120,8 +120,9 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) {
mapper.mu.Lock()
mapper.pending[sequence] = simpleSubmitContext{
RequestID: event.RequestID,
Expired: expired,
RequestID: event.RequestID,
Expired: expired,
SubmittedAt: time.Now().UTC(),
}
mapper.mu.Unlock()
}