216 lines
4.5 KiB
Go
216 lines
4.5 KiB
Go
package nicehash
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"dappco.re/go/core/proxy"
|
|
"dappco.re/go/core/proxy/pool"
|
|
)
|
|
|
|
// NonceMapper manages one outbound pool connection and the 256-slot NonceStorage.
|
|
// It implements pool.StratumListener to receive job and result events from the pool.
|
|
//
|
|
// m := nicehash.NewNonceMapper(id, cfg, strategy)
|
|
// m.Start()
|
|
type NonceMapper struct {
|
|
id int64
|
|
storage *NonceStorage
|
|
strategy pool.Strategy // manages pool client lifecycle and failover
|
|
pending map[int64]SubmitContext // sequence → {requestID, minerID, jobID}
|
|
cfg *proxy.Config
|
|
events *proxy.EventBus
|
|
active bool // true once pool has sent at least one job
|
|
suspended int // > 0 when pool connection is in error/reconnecting
|
|
idleAt time.Time
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// SubmitContext tracks one in-flight share submission waiting for pool reply.
|
|
//
|
|
// ctx := SubmitContext{RequestID: 42, MinerID: 7}
|
|
type SubmitContext struct {
|
|
RequestID int64 // JSON-RPC id from the miner's submit request
|
|
MinerID int64 // miner that submitted
|
|
Job proxy.Job
|
|
JobID string
|
|
Expired bool
|
|
SubmittedAt time.Time
|
|
}
|
|
|
|
// NewNonceMapper creates one upstream pool mapper and its local slot table.
|
|
//
|
|
// mapper := nicehash.NewNonceMapper(1, cfg, strategy)
|
|
func NewNonceMapper(id int64, cfg *proxy.Config, strategy pool.Strategy) *NonceMapper {
|
|
return &NonceMapper{
|
|
id: id,
|
|
storage: NewNonceStorage(),
|
|
strategy: strategy,
|
|
cfg: cfg,
|
|
pending: make(map[int64]SubmitContext),
|
|
}
|
|
}
|
|
|
|
func (m *NonceMapper) Add(miner *proxy.Miner) bool {
|
|
if !m.storage.Add(miner) {
|
|
return false
|
|
}
|
|
|
|
m.mu.Lock()
|
|
m.idleAt = time.Time{}
|
|
m.mu.Unlock()
|
|
return true
|
|
}
|
|
|
|
func (m *NonceMapper) Remove(miner *proxy.Miner) {
|
|
m.storage.Remove(miner)
|
|
|
|
_, _, active := m.storage.SlotCount()
|
|
if active == 0 {
|
|
m.mu.Lock()
|
|
if m.idleAt.IsZero() {
|
|
m.idleAt = time.Now().UTC()
|
|
}
|
|
m.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
func (m *NonceMapper) Submit(event *proxy.SubmitEvent) {
|
|
if event == nil || event.Miner == nil || m.strategy == nil {
|
|
return
|
|
}
|
|
|
|
job, valid, expired := m.storage.JobForID(event.JobID)
|
|
if !valid {
|
|
event.Miner.ReplyWithError(event.RequestID, "Invalid job id")
|
|
return
|
|
}
|
|
|
|
sequence := m.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo)
|
|
if sequence == 0 {
|
|
if event.Miner != nil {
|
|
event.Miner.ReplyWithError(event.RequestID, "Pool unavailable")
|
|
}
|
|
return
|
|
}
|
|
m.mu.Lock()
|
|
m.pending[sequence] = SubmitContext{
|
|
RequestID: event.RequestID,
|
|
MinerID: event.Miner.ID(),
|
|
Job: job,
|
|
JobID: event.JobID,
|
|
Expired: expired,
|
|
SubmittedAt: time.Now().UTC(),
|
|
}
|
|
m.mu.Unlock()
|
|
}
|
|
|
|
func (m *NonceMapper) IsActive() bool {
|
|
if m.strategy == nil {
|
|
return false
|
|
}
|
|
return m.strategy.IsActive()
|
|
}
|
|
|
|
func (m *NonceMapper) OnJob(job proxy.Job) {
|
|
if !job.IsValid() {
|
|
return
|
|
}
|
|
|
|
m.mu.Lock()
|
|
m.active = true
|
|
m.suspended = 0
|
|
m.idleAt = time.Time{}
|
|
m.mu.Unlock()
|
|
m.storage.SetJob(job)
|
|
}
|
|
|
|
func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
|
|
m.mu.Lock()
|
|
context, exists := m.pending[sequence]
|
|
if exists {
|
|
delete(m.pending, sequence)
|
|
}
|
|
m.mu.Unlock()
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
miner := m.storage.Miners()[context.MinerID]
|
|
if miner == nil {
|
|
return
|
|
}
|
|
shareDifficulty := context.Job.DifficultyFromTarget()
|
|
if shareDifficulty == 0 {
|
|
shareDifficulty = miner.Diff()
|
|
}
|
|
|
|
eventType := proxy.EventReject
|
|
if accepted {
|
|
eventType = proxy.EventAccept
|
|
}
|
|
|
|
if m.events != nil {
|
|
latency := uint16(0)
|
|
if !context.SubmittedAt.IsZero() {
|
|
elapsed := time.Since(context.SubmittedAt).Milliseconds()
|
|
if elapsed > 0 {
|
|
if elapsed > int64(^uint16(0)) {
|
|
latency = ^uint16(0)
|
|
} else {
|
|
latency = uint16(elapsed)
|
|
}
|
|
}
|
|
}
|
|
m.events.Dispatch(proxy.Event{
|
|
Type: eventType,
|
|
Miner: miner,
|
|
Job: jobPointer(context.Job),
|
|
Diff: shareDifficulty,
|
|
Error: errorMessage,
|
|
Latency: latency,
|
|
Expired: context.Expired,
|
|
})
|
|
}
|
|
|
|
if accepted {
|
|
miner.Success(context.RequestID, "OK")
|
|
return
|
|
}
|
|
miner.ReplyWithError(context.RequestID, errorMessage)
|
|
}
|
|
|
|
func (m *NonceMapper) OnDisconnect() {
|
|
m.clearPending()
|
|
m.mu.Lock()
|
|
m.active = false
|
|
m.suspended++
|
|
m.mu.Unlock()
|
|
}
|
|
|
|
func (m *NonceMapper) IdleDuration(now time.Time) time.Duration {
|
|
m.mu.Lock()
|
|
idleAt := m.idleAt
|
|
m.mu.Unlock()
|
|
|
|
if idleAt.IsZero() {
|
|
return 0
|
|
}
|
|
|
|
return now.Sub(idleAt)
|
|
}
|
|
|
|
func (m *NonceMapper) clearPending() {
|
|
m.mu.Lock()
|
|
m.pending = make(map[int64]SubmitContext)
|
|
m.mu.Unlock()
|
|
}
|
|
|
|
func jobPointer(job proxy.Job) *proxy.Job {
|
|
if !job.IsValid() {
|
|
return nil
|
|
}
|
|
|
|
jobCopy := job
|
|
return &jobCopy
|
|
}
|