147 lines
3.3 KiB
Go
147 lines
3.3 KiB
Go
package simple
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"dappco.re/go/core/proxy"
|
|
"dappco.re/go/core/proxy/pool"
|
|
)
|
|
|
|
// SimpleMapper holds one outbound pool connection and serves at most one active miner
|
|
// at a time. It becomes idle when the miner disconnects and may be reclaimed for the
|
|
// next login.
|
|
//
|
|
// m := simple.NewSimpleMapper(id, strategy)
|
|
type SimpleMapper struct {
|
|
id int64
|
|
miner *proxy.Miner // nil when idle
|
|
strategy pool.Strategy
|
|
events *proxy.EventBus
|
|
pending map[int64]simpleSubmitContext
|
|
job proxy.Job
|
|
prevJob proxy.Job
|
|
idleAt time.Time // zero when active
|
|
stopped bool
|
|
mu sync.Mutex
|
|
}
|
|
|
|
type simpleSubmitContext struct {
|
|
RequestID int64
|
|
Job proxy.Job
|
|
Expired bool
|
|
SubmittedAt time.Time
|
|
}
|
|
|
|
// NewSimpleMapper stores the mapper ID and strategy.
|
|
//
|
|
// mapper := simple.NewSimpleMapper(1, strategy)
|
|
func NewSimpleMapper(id int64, strategy pool.Strategy) *SimpleMapper {
|
|
return &SimpleMapper{id: id, strategy: strategy, pending: make(map[int64]simpleSubmitContext)}
|
|
}
|
|
|
|
func (m *SimpleMapper) OnJob(job proxy.Job) {
|
|
if !job.IsValid() {
|
|
return
|
|
}
|
|
|
|
m.mu.Lock()
|
|
if m.job.IsValid() && m.job.ClientID != "" && m.job.ClientID == job.ClientID {
|
|
m.prevJob = m.job
|
|
} else {
|
|
m.prevJob = proxy.Job{}
|
|
}
|
|
m.job = job
|
|
miner := m.miner
|
|
m.mu.Unlock()
|
|
|
|
if miner != nil {
|
|
miner.ForwardJob(job, job.Algo)
|
|
}
|
|
}
|
|
|
|
func (m *SimpleMapper) JobStatus(id string) (valid bool, expired bool) {
|
|
_, valid, expired = m.JobForID(id)
|
|
return valid, expired
|
|
}
|
|
|
|
func (m *SimpleMapper) JobForID(id string) (proxy.Job, bool, bool) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
if id == "" {
|
|
return proxy.Job{}, false, false
|
|
}
|
|
if id == m.job.JobID {
|
|
return m.job, true, false
|
|
}
|
|
if m.prevJob.IsValid() && m.prevJob.ClientID != "" && id == m.prevJob.JobID {
|
|
return m.prevJob, true, true
|
|
}
|
|
return proxy.Job{}, false, false
|
|
}
|
|
|
|
func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
|
|
m.mu.Lock()
|
|
context, exists := m.pending[sequence]
|
|
miner := m.miner
|
|
if !exists {
|
|
m.mu.Unlock()
|
|
return
|
|
}
|
|
delete(m.pending, sequence)
|
|
m.mu.Unlock()
|
|
|
|
if miner == nil {
|
|
return
|
|
}
|
|
|
|
if accepted {
|
|
latency := shareLatency(context.SubmittedAt)
|
|
if m.events != nil {
|
|
m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: jobPointer(context.Job), Diff: miner.Diff(), Latency: latency, Expired: context.Expired})
|
|
}
|
|
miner.Success(context.RequestID, "OK")
|
|
return
|
|
}
|
|
|
|
latency := shareLatency(context.SubmittedAt)
|
|
if m.events != nil {
|
|
m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: jobPointer(context.Job), Diff: miner.Diff(), Error: errorMessage, Latency: latency, Expired: context.Expired})
|
|
}
|
|
miner.ReplyWithError(context.RequestID, errorMessage)
|
|
}
|
|
|
|
func (m *SimpleMapper) OnDisconnect() {
|
|
m.clearPending()
|
|
m.stopped = true
|
|
}
|
|
|
|
func (m *SimpleMapper) clearPending() {
|
|
m.mu.Lock()
|
|
m.pending = make(map[int64]simpleSubmitContext)
|
|
m.mu.Unlock()
|
|
}
|
|
|
|
func jobPointer(job proxy.Job) *proxy.Job {
|
|
if !job.IsValid() {
|
|
return nil
|
|
}
|
|
|
|
jobCopy := job
|
|
return &jobCopy
|
|
}
|
|
|
|
func shareLatency(submittedAt time.Time) uint16 {
|
|
if submittedAt.IsZero() {
|
|
return 0
|
|
}
|
|
elapsed := time.Since(submittedAt).Milliseconds()
|
|
if elapsed <= 0 {
|
|
return 0
|
|
}
|
|
if elapsed > int64(^uint16(0)) {
|
|
return ^uint16(0)
|
|
}
|
|
return uint16(elapsed)
|
|
}
|