diff --git a/job.go b/job.go index f327c29..836d0e3 100644 --- a/job.go +++ b/job.go @@ -15,13 +15,13 @@ import ( // Algo: "cn/r", // } type Job struct { - Blob string // hex-encoded block template (160 hex chars = 80 bytes) - JobID string // pool-assigned identifier - Target string // 8-char hex little-endian uint32 difficulty target - Algo string // algorithm e.g. "cn/r", "rx/0"; "" if not negotiated - Height uint64 // block height (0 if pool did not provide) - SeedHash string // RandomX seed hash hex (empty if not RandomX) - ClientID string // pool session ID that issued this job (for stale detection) + Blob string `json:"blob"` // hex-encoded block template (160 hex chars = 80 bytes) + JobID string `json:"job_id"` // pool-assigned identifier + Target string `json:"target"` // 8-char hex little-endian uint32 difficulty target + Algo string `json:"algo"` // algorithm e.g. "cn/r", "rx/0"; "" if not negotiated + Height uint64 `json:"height"` // block height (0 if pool did not provide) + SeedHash string `json:"seed_hash"` // RandomX seed hash hex (empty if not RandomX) + ClientID string `json:"id"` // pool session ID that issued this job (for stale detection) } // IsValid returns true if Blob and JobID are non-empty. diff --git a/job_test.go b/job_test.go index 90c9f47..3bcba34 100644 --- a/job_test.go +++ b/job_test.go @@ -1,6 +1,9 @@ package proxy -import "testing" +import ( + "encoding/json" + "testing" +) func TestJob_IsValid_Good(t *testing.T) { job := Job{Blob: "abcd", JobID: "job-1"} @@ -70,3 +73,13 @@ func TestJob_DifficultyFromTarget_Ugly(t *testing.T) { t.Fatalf("expected maximum target to resolve to difficulty 1, got %d", got) } } + +func TestJob_JSON_Unmarshal_Good(t *testing.T) { + var job Job + if err := json.Unmarshal([]byte(`{"blob":"abcd","job_id":"job-1","target":"b88d0600","algo":"cn/r","height":42,"seed_hash":"seed","id":"session-1"}`), &job); err != nil { + t.Fatal(err) + } + if job.JobID != "job-1" || job.SeedHash != "seed" || job.ClientID != "session-1" { + t.Fatalf("unexpected decoded job: %+v", job) + } +} diff --git a/miner_methods.go b/miner_methods.go index c8bd47b..05905f0 100644 --- a/miner_methods.go +++ b/miner_methods.go @@ -84,6 +84,14 @@ func (m *Miner) SetRouteID(value int64) { m.routeID = value } func (m *Miner) SetCustomDiff(value uint64) { m.customDiff = value } func (m *Miner) SetNiceHashEnabled(value bool) { m.extNH = value } +func (m *Miner) PrimeJob(job Job) { + if m == nil || !job.IsValid() { + return + } + m.currentJob = &job + m.diff = job.DifficultyFromTarget() +} + func (m *Miner) Touch() { m.lastActivityAt = time.Now().UTC() } diff --git a/miner_runtime.go b/miner_runtime.go index 59f8d4e..07385df 100644 --- a/miner_runtime.go +++ b/miner_runtime.go @@ -223,8 +223,12 @@ func (m *Miner) handleLogin(request minerRequest) { } if m.currentJob != nil && m.currentJob.IsValid() { jobCopy := *m.currentJob + blob := jobCopy.Blob + if m.extNH { + blob = jobCopy.BlobWithFixedByte(m.fixedByte) + } jobResult := map[string]interface{}{ - "blob": jobCopy.Blob, + "blob": blob, "job_id": jobCopy.JobID, "target": jobCopy.Target, "id": m.rpcID, diff --git a/miner_runtime_test.go b/miner_runtime_test.go index 18ad9c8..7518e55 100644 --- a/miner_runtime_test.go +++ b/miner_runtime_test.go @@ -169,3 +169,59 @@ func TestMiner_Login_Ugly(t *testing.T) { t.Fatalf("expected algo extension to be advertised, got %#v", response) } } + +func TestMiner_Login_NiceHashPatchedJob_Good(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer clientConn.Close() + + miner := NewMiner(serverConn, 3333, nil) + miner.algoExtension = true + miner.events = NewEventBus() + miner.events.Subscribe(EventLogin, func(event Event) { + if event.Miner == nil { + return + } + event.Miner.SetNiceHashEnabled(true) + event.Miner.SetFixedByte(0x2a) + event.Miner.PrimeJob(Job{ + Blob: "000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + JobID: "job-1", + Target: "b88d0600", + Algo: "cn/r", + }) + }) + miner.Start() + defer miner.Close() + + encoder := json.NewEncoder(clientConn) + if err := encoder.Encode(map[string]interface{}{ + "id": 5, + "jsonrpc": "2.0", + "method": "login", + "params": map[string]interface{}{ + "login": "wallet", + "pass": "x", + "agent": "xmrig", + "algo": []string{"cn/r"}, + }, + }); err != nil { + t.Fatal(err) + } + + clientConn.SetReadDeadline(time.Now().Add(time.Second)) + line, err := bufio.NewReader(clientConn).ReadBytes('\n') + if err != nil { + t.Fatal(err) + } + + var response map[string]interface{} + if err := json.Unmarshal(line, &response); err != nil { + t.Fatal(err) + } + + result := response["result"].(map[string]interface{}) + job := result["job"].(map[string]interface{}) + if blob, _ := job["blob"].(string); blob[78:80] != "2a" { + t.Fatalf("expected patched NiceHash blob, got %q", blob) + } +} diff --git a/pool/strategy.go b/pool/strategy.go index 909737f..9fa6724 100644 --- a/pool/strategy.go +++ b/pool/strategy.go @@ -19,6 +19,8 @@ type FailoverStrategy struct { client *StratumClient listener StratumListener cfg *proxy.Config + closed bool + running bool mu sync.Mutex } @@ -65,10 +67,27 @@ func NewFailoverStrategy(pools []proxy.PoolConfig, listener StratumListener, cfg // strategy.Connect() func (s *FailoverStrategy) Connect() { s.mu.Lock() - defer s.mu.Unlock() + s.closed = false + s.mu.Unlock() + s.connectFrom(0) +} + +func (s *FailoverStrategy) connectFrom(start int) { + s.mu.Lock() + if s.running || s.closed { + s.mu.Unlock() + return + } + s.running = true + s.mu.Unlock() + defer func() { + s.mu.Lock() + s.running = false + s.mu.Unlock() + }() pools := s.pools - if s.cfg != nil { + if s.cfg != nil && len(s.cfg.Pools) > 0 { pools = s.cfg.Pools } if len(pools) == 0 { @@ -87,15 +106,24 @@ func (s *FailoverStrategy) Connect() { } for attempt := 0; attempt < retries; attempt++ { - for index, poolConfig := range pools { + for offset := 0; offset < len(pools); offset++ { + index := (start + offset) % len(pools) + poolConfig := pools[index] if !poolConfig.Enabled { continue } - client := NewStratumClient(poolConfig, s.listener) + client := NewStratumClient(poolConfig, s) if errorValue := client.Connect(); errorValue == nil { + s.mu.Lock() + if s.closed { + s.mu.Unlock() + client.Disconnect() + return + } s.client = client s.current = index + s.mu.Unlock() client.Login() return } @@ -119,6 +147,7 @@ func (s *FailoverStrategy) Submit(jobID string, nonce string, result string, alg func (s *FailoverStrategy) Disconnect() { s.mu.Lock() + s.closed = true client := s.client s.client = nil s.mu.Unlock() @@ -132,3 +161,37 @@ func (s *FailoverStrategy) IsActive() bool { defer s.mu.Unlock() return s.client != nil && s.client.active } + +func (s *FailoverStrategy) OnJob(job proxy.Job) { + if s.listener != nil { + s.listener.OnJob(job) + } +} + +func (s *FailoverStrategy) OnResultAccepted(sequence int64, accepted bool, errorMessage string) { + if s.listener != nil { + s.listener.OnResultAccepted(sequence, accepted, errorMessage) + } +} + +func (s *FailoverStrategy) OnDisconnect() { + s.mu.Lock() + client := s.client + s.client = nil + closed := s.closed + s.mu.Unlock() + + if s.listener != nil { + s.listener.OnDisconnect() + } + if closed { + return + } + if client != nil { + client.active = false + } + go func() { + time.Sleep(10 * time.Millisecond) + s.connectFrom(0) + }() +} diff --git a/pool/strategy_test.go b/pool/strategy_test.go new file mode 100644 index 0000000..0076c5c --- /dev/null +++ b/pool/strategy_test.go @@ -0,0 +1,110 @@ +package pool + +import ( + "bufio" + "encoding/json" + "net" + "sync" + "testing" + "time" + + "dappco.re/go/core/proxy" +) + +type strategyTestListener struct { + jobCh chan proxy.Job + disconnectMu sync.Mutex + disconnects int +} + +func (l *strategyTestListener) OnJob(job proxy.Job) { + l.jobCh <- job +} + +func (l *strategyTestListener) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {} + +func (l *strategyTestListener) OnDisconnect() { + l.disconnectMu.Lock() + l.disconnects++ + l.disconnectMu.Unlock() +} + +func (l *strategyTestListener) Disconnects() int { + l.disconnectMu.Lock() + defer l.disconnectMu.Unlock() + return l.disconnects +} + +func TestFailoverStrategy_Connect_Ugly(t *testing.T) { + primaryListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer primaryListener.Close() + + backupListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer backupListener.Close() + + go func() { + conn, acceptErr := primaryListener.Accept() + if acceptErr != nil { + return + } + _ = primaryListener.Close() + _ = conn.Close() + }() + + go func() { + conn, acceptErr := backupListener.Accept() + if acceptErr != nil { + return + } + defer conn.Close() + + reader := bufio.NewReader(conn) + if _, readErr := reader.ReadBytes('\n'); readErr != nil { + return + } + + _ = json.NewEncoder(conn).Encode(map[string]interface{}{ + "id": 1, + "jsonrpc": "2.0", + "error": nil, + "result": map[string]interface{}{ + "id": "session-1", + "job": map[string]interface{}{ + "blob": "abcd", + "job_id": "job-1", + "target": "b88d0600", + }, + }, + }) + }() + + listener := &strategyTestListener{ + jobCh: make(chan proxy.Job, 1), + } + strategy := NewFailoverStrategy([]proxy.PoolConfig{ + {URL: primaryListener.Addr().String(), Enabled: true}, + {URL: backupListener.Addr().String(), Enabled: true}, + }, listener, &proxy.Config{Retries: 2}) + + strategy.Connect() + defer strategy.Disconnect() + + select { + case job := <-listener.jobCh: + if job.JobID != "job-1" { + t.Fatalf("expected backup job, got %+v", job) + } + case <-time.After(3 * time.Second): + t.Fatal("expected failover job after primary disconnect") + } + + if listener.Disconnects() == 0 { + t.Fatal("expected disconnect callback before failover reconnect") + } +} diff --git a/splitter/nicehash/mapper.go b/splitter/nicehash/mapper.go index b532160..20d8eb5 100644 --- a/splitter/nicehash/mapper.go +++ b/splitter/nicehash/mapper.go @@ -30,10 +30,11 @@ type NonceMapper struct { // // ctx := SubmitContext{RequestID: 42, MinerID: 7} type SubmitContext struct { - RequestID int64 // JSON-RPC id from the miner's submit request - MinerID int64 // miner that submitted - JobID string - Expired bool + RequestID int64 // JSON-RPC id from the miner's submit request + MinerID int64 // miner that submitted + JobID string + Expired bool + SubmittedAt time.Time } // NewNonceMapper creates one upstream pool mapper and its local slot table. @@ -93,10 +94,11 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { } m.mu.Lock() m.pending[sequence] = SubmitContext{ - RequestID: event.RequestID, - MinerID: event.Miner.ID(), - JobID: event.JobID, - Expired: expired, + RequestID: event.RequestID, + MinerID: event.Miner.ID(), + JobID: event.JobID, + Expired: expired, + SubmittedAt: time.Now().UTC(), } m.mu.Unlock() } @@ -144,12 +146,24 @@ func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessa if m.events != nil { jobCopy := m.storage.CurrentJob() + latency := uint16(0) + if !context.SubmittedAt.IsZero() { + elapsed := time.Since(context.SubmittedAt).Milliseconds() + if elapsed > 0 { + if elapsed > int64(^uint16(0)) { + latency = ^uint16(0) + } else { + latency = uint16(elapsed) + } + } + } m.events.Dispatch(proxy.Event{ Type: eventType, Miner: miner, Job: jobCopy, Diff: miner.Diff(), Error: errorMessage, + Latency: latency, Expired: context.Expired, }) } diff --git a/splitter/nicehash/splitter.go b/splitter/nicehash/splitter.go index f4aa086..7a16a43 100644 --- a/splitter/nicehash/splitter.go +++ b/splitter/nicehash/splitter.go @@ -69,7 +69,7 @@ func (s *NonceSplitter) OnLogin(event *proxy.LoginEvent) { event.Miner.SetMapperID(mapper.id) event.Miner.SetNiceHashEnabled(true) if currentJob := mapper.storage.CurrentJob(); currentJob != nil && currentJob.IsValid() { - event.Miner.ForwardJob(*currentJob, currentJob.Algo) + event.Miner.PrimeJob(*currentJob) } return } @@ -83,7 +83,7 @@ func (s *NonceSplitter) OnLogin(event *proxy.LoginEvent) { event.Miner.SetMapperID(mapper.id) event.Miner.SetNiceHashEnabled(true) if currentJob := mapper.storage.CurrentJob(); currentJob != nil && currentJob.IsValid() { - event.Miner.ForwardJob(*currentJob, currentJob.Algo) + event.Miner.PrimeJob(*currentJob) } } } diff --git a/splitter/simple/mapper.go b/splitter/simple/mapper.go index e80beda..c736db2 100644 --- a/splitter/simple/mapper.go +++ b/splitter/simple/mapper.go @@ -27,8 +27,9 @@ type SimpleMapper struct { } type simpleSubmitContext struct { - RequestID int64 - Expired bool + RequestID int64 + Expired bool + SubmittedAt time.Time } // NewSimpleMapper stores the mapper ID and strategy. @@ -90,15 +91,17 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess } if accepted { + latency := shareLatency(context.SubmittedAt) if m.events != nil { - m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Diff: miner.Diff(), Expired: context.Expired}) + m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: m.currentJob(), Diff: miner.Diff(), Latency: latency, Expired: context.Expired}) } miner.Success(context.RequestID, "OK") return } + latency := shareLatency(context.SubmittedAt) if m.events != nil { - m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Diff: miner.Diff(), Error: errorMessage, Expired: context.Expired}) + m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: m.currentJob(), Diff: miner.Diff(), Error: errorMessage, Latency: latency, Expired: context.Expired}) } miner.ReplyWithError(context.RequestID, errorMessage) } @@ -106,3 +109,27 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess func (m *SimpleMapper) OnDisconnect() { m.stopped = true } + +func (m *SimpleMapper) currentJob() *proxy.Job { + m.mu.Lock() + defer m.mu.Unlock() + if !m.job.IsValid() { + return nil + } + job := m.job + return &job +} + +func shareLatency(submittedAt time.Time) uint16 { + if submittedAt.IsZero() { + return 0 + } + elapsed := time.Since(submittedAt).Milliseconds() + if elapsed <= 0 { + return 0 + } + if elapsed > int64(^uint16(0)) { + return ^uint16(0) + } + return uint16(elapsed) +} diff --git a/splitter/simple/splitter.go b/splitter/simple/splitter.go index 571a916..31a8ef2 100644 --- a/splitter/simple/splitter.go +++ b/splitter/simple/splitter.go @@ -90,7 +90,7 @@ func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) { currentJob := mapper.job mapper.mu.Unlock() if currentJob.IsValid() { - event.Miner.ForwardJob(currentJob, currentJob.Algo) + event.Miner.PrimeJob(currentJob) } } @@ -120,8 +120,9 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) { mapper.mu.Lock() mapper.pending[sequence] = simpleSubmitContext{ - RequestID: event.RequestID, - Expired: expired, + RequestID: event.RequestID, + Expired: expired, + SubmittedAt: time.Now().UTC(), } mapper.mu.Unlock() }