go-proxy/splitter/nicehash/mapper.go
Virgil 07ff21aa67 fix(proxy): align job and splitter behaviour with RFC
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-04 11:07:41 +00:00

181 lines
3.8 KiB
Go

package nicehash
import (
"sync"
"time"
"dappco.re/go/core/proxy"
"dappco.re/go/core/proxy/pool"
)
// NonceMapper manages one outbound pool connection and the 256-slot NonceStorage.
// It implements pool.StratumListener to receive job and result events from the pool.
//
// m := nicehash.NewNonceMapper(id, cfg, strategy)
// m.Start()
type NonceMapper struct {
id int64
storage *NonceStorage
strategy pool.Strategy // manages pool client lifecycle and failover
pending map[int64]SubmitContext // sequence → {requestID, minerID, jobID}
cfg *proxy.Config
events *proxy.EventBus
active bool // true once pool has sent at least one job
suspended int // > 0 when pool connection is in error/reconnecting
idleAt time.Time
mu sync.Mutex
}
// SubmitContext tracks one in-flight share submission waiting for pool reply.
//
// ctx := SubmitContext{RequestID: 42, MinerID: 7}
type SubmitContext struct {
RequestID int64 // JSON-RPC id from the miner's submit request
MinerID int64 // miner that submitted
JobID string
Expired bool
}
// NewNonceMapper creates one upstream pool mapper and its local slot table.
//
// mapper := nicehash.NewNonceMapper(1, cfg, strategy)
func NewNonceMapper(id int64, cfg *proxy.Config, strategy pool.Strategy) *NonceMapper {
return &NonceMapper{
id: id,
storage: NewNonceStorage(),
strategy: strategy,
cfg: cfg,
pending: make(map[int64]SubmitContext),
}
}
func (m *NonceMapper) Add(miner *proxy.Miner) bool {
if !m.storage.Add(miner) {
return false
}
m.mu.Lock()
m.idleAt = time.Time{}
m.mu.Unlock()
return true
}
func (m *NonceMapper) Remove(miner *proxy.Miner) {
m.storage.Remove(miner)
_, _, active := m.storage.SlotCount()
if active == 0 {
m.mu.Lock()
if m.idleAt.IsZero() {
m.idleAt = time.Now().UTC()
}
m.mu.Unlock()
}
}
func (m *NonceMapper) Submit(event *proxy.SubmitEvent) {
if event == nil || event.Miner == nil || m.strategy == nil {
return
}
valid, expired := m.storage.JobStatus(event.JobID)
if !valid {
event.Miner.ReplyWithError(event.RequestID, "Invalid job id")
return
}
sequence := m.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo)
if sequence == 0 {
if event.Miner != nil {
event.Miner.ReplyWithError(event.RequestID, "Pool unavailable")
}
return
}
m.mu.Lock()
m.pending[sequence] = SubmitContext{
RequestID: event.RequestID,
MinerID: event.Miner.ID(),
JobID: event.JobID,
Expired: expired,
}
m.mu.Unlock()
}
func (m *NonceMapper) IsActive() bool {
if m.strategy == nil {
return false
}
return m.strategy.IsActive()
}
func (m *NonceMapper) OnJob(job proxy.Job) {
if !job.IsValid() {
return
}
m.mu.Lock()
m.active = true
m.suspended = 0
m.idleAt = time.Time{}
m.mu.Unlock()
m.storage.SetJob(job)
}
func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
m.mu.Lock()
context, exists := m.pending[sequence]
if exists {
delete(m.pending, sequence)
}
m.mu.Unlock()
if !exists {
return
}
miner := m.storage.Miners()[context.MinerID]
if miner == nil {
return
}
eventType := proxy.EventReject
if accepted {
eventType = proxy.EventAccept
}
if m.events != nil {
jobCopy := m.storage.CurrentJob()
m.events.Dispatch(proxy.Event{
Type: eventType,
Miner: miner,
Job: jobCopy,
Diff: miner.Diff(),
Error: errorMessage,
Expired: context.Expired,
})
}
if accepted {
miner.Success(context.RequestID, "OK")
return
}
miner.ReplyWithError(context.RequestID, errorMessage)
}
func (m *NonceMapper) OnDisconnect() {
m.mu.Lock()
m.active = false
m.suspended++
m.mu.Unlock()
}
func (m *NonceMapper) IdleDuration(now time.Time) time.Duration {
m.mu.Lock()
idleAt := m.idleAt
m.mu.Unlock()
if idleAt.IsZero() {
return 0
}
return now.Sub(idleAt)
}