go-proxy/splitter/simple/mapper.go
Virgil 259f7e80c8 fix(proxy): reset custom diff and preserve share difficulty
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-04 13:24:17 +00:00

151 lines
3.4 KiB
Go

package simple
import (
"sync"
"time"
"dappco.re/go/core/proxy"
"dappco.re/go/core/proxy/pool"
)
// SimpleMapper holds one outbound pool connection and serves at most one active miner
// at a time. It becomes idle when the miner disconnects and may be reclaimed for the
// next login.
//
// m := simple.NewSimpleMapper(id, strategy)
type SimpleMapper struct {
id int64
miner *proxy.Miner // nil when idle
strategy pool.Strategy
events *proxy.EventBus
pending map[int64]simpleSubmitContext
job proxy.Job
prevJob proxy.Job
idleAt time.Time // zero when active
stopped bool
mu sync.Mutex
}
type simpleSubmitContext struct {
RequestID int64
Job proxy.Job
Expired bool
SubmittedAt time.Time
}
// NewSimpleMapper stores the mapper ID and strategy.
//
// mapper := simple.NewSimpleMapper(1, strategy)
func NewSimpleMapper(id int64, strategy pool.Strategy) *SimpleMapper {
return &SimpleMapper{id: id, strategy: strategy, pending: make(map[int64]simpleSubmitContext)}
}
func (m *SimpleMapper) OnJob(job proxy.Job) {
if !job.IsValid() {
return
}
m.mu.Lock()
if m.job.IsValid() && m.job.ClientID != "" && m.job.ClientID == job.ClientID {
m.prevJob = m.job
} else {
m.prevJob = proxy.Job{}
}
m.job = job
miner := m.miner
m.mu.Unlock()
if miner != nil {
miner.ForwardJob(job, job.Algo)
}
}
func (m *SimpleMapper) JobStatus(id string) (valid bool, expired bool) {
_, valid, expired = m.JobForID(id)
return valid, expired
}
func (m *SimpleMapper) JobForID(id string) (proxy.Job, bool, bool) {
m.mu.Lock()
defer m.mu.Unlock()
if id == "" {
return proxy.Job{}, false, false
}
if id == m.job.JobID {
return m.job, true, false
}
if m.prevJob.IsValid() && m.prevJob.ClientID != "" && id == m.prevJob.JobID {
return m.prevJob, true, true
}
return proxy.Job{}, false, false
}
func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
m.mu.Lock()
context, exists := m.pending[sequence]
miner := m.miner
if !exists {
m.mu.Unlock()
return
}
delete(m.pending, sequence)
m.mu.Unlock()
if miner == nil {
return
}
shareDifficulty := context.Job.DifficultyFromTarget()
if shareDifficulty == 0 {
shareDifficulty = miner.Diff()
}
if accepted {
latency := shareLatency(context.SubmittedAt)
if m.events != nil {
m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: jobPointer(context.Job), Diff: shareDifficulty, Latency: latency, Expired: context.Expired})
}
miner.Success(context.RequestID, "OK")
return
}
latency := shareLatency(context.SubmittedAt)
if m.events != nil {
m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: jobPointer(context.Job), Diff: shareDifficulty, Error: errorMessage, Latency: latency, Expired: context.Expired})
}
miner.ReplyWithError(context.RequestID, errorMessage)
}
func (m *SimpleMapper) OnDisconnect() {
m.clearPending()
m.stopped = true
}
func (m *SimpleMapper) clearPending() {
m.mu.Lock()
m.pending = make(map[int64]simpleSubmitContext)
m.mu.Unlock()
}
func jobPointer(job proxy.Job) *proxy.Job {
if !job.IsValid() {
return nil
}
jobCopy := job
return &jobCopy
}
func shareLatency(submittedAt time.Time) uint16 {
if submittedAt.IsZero() {
return 0
}
elapsed := time.Since(submittedAt).Milliseconds()
if elapsed <= 0 {
return 0
}
if elapsed > int64(^uint16(0)) {
return ^uint16(0)
}
return uint16(elapsed)
}