package simple import ( "sync" "time" "dappco.re/go/core/proxy" "dappco.re/go/core/proxy/pool" ) // SimpleMapper holds one outbound pool connection and serves at most one active miner // at a time. It becomes idle when the miner disconnects and may be reclaimed for the // next login. // // m := simple.NewSimpleMapper(id, strategy) type SimpleMapper struct { id int64 miner *proxy.Miner // nil when idle strategy pool.Strategy events *proxy.EventBus pending map[int64]simpleSubmitContext job proxy.Job prevJob proxy.Job idleAt time.Time // zero when active stopped bool mu sync.Mutex } type simpleSubmitContext struct { RequestID int64 Job proxy.Job Expired bool SubmittedAt time.Time } // NewSimpleMapper stores the mapper ID and strategy. // // mapper := simple.NewSimpleMapper(1, strategy) func NewSimpleMapper(id int64, strategy pool.Strategy) *SimpleMapper { return &SimpleMapper{id: id, strategy: strategy, pending: make(map[int64]simpleSubmitContext)} } func (m *SimpleMapper) OnJob(job proxy.Job) { if !job.IsValid() { return } m.mu.Lock() if m.job.IsValid() && m.job.ClientID != "" && m.job.ClientID == job.ClientID { m.prevJob = m.job } else { m.prevJob = proxy.Job{} } m.job = job miner := m.miner m.mu.Unlock() if miner != nil { miner.ForwardJob(job, job.Algo) } } func (m *SimpleMapper) JobStatus(id string) (valid bool, expired bool) { _, valid, expired = m.JobForID(id) return valid, expired } func (m *SimpleMapper) JobForID(id string) (proxy.Job, bool, bool) { m.mu.Lock() defer m.mu.Unlock() if id == "" { return proxy.Job{}, false, false } if id == m.job.JobID { return m.job, true, false } if m.prevJob.IsValid() && m.prevJob.ClientID != "" && id == m.prevJob.JobID { return m.prevJob, true, true } return proxy.Job{}, false, false } func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) { m.mu.Lock() context, exists := m.pending[sequence] miner := m.miner if !exists { m.mu.Unlock() return } delete(m.pending, sequence) m.mu.Unlock() if miner == nil { return } shareDifficulty := context.Job.DifficultyFromTarget() if shareDifficulty == 0 { shareDifficulty = miner.Diff() } if accepted { latency := shareLatency(context.SubmittedAt) if m.events != nil { m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: jobPointer(context.Job), Diff: shareDifficulty, Latency: latency, Expired: context.Expired}) } miner.Success(context.RequestID, "OK") return } latency := shareLatency(context.SubmittedAt) if m.events != nil { m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: jobPointer(context.Job), Diff: shareDifficulty, Error: errorMessage, Latency: latency, Expired: context.Expired}) } miner.ReplyWithError(context.RequestID, errorMessage) } func (m *SimpleMapper) OnDisconnect() { m.clearPending() m.stopped = true } func (m *SimpleMapper) clearPending() { m.mu.Lock() m.pending = make(map[int64]simpleSubmitContext) m.mu.Unlock() } func jobPointer(job proxy.Job) *proxy.Job { if !job.IsValid() { return nil } jobCopy := job return &jobCopy } func shareLatency(submittedAt time.Time) uint16 { if submittedAt.IsZero() { return 0 } elapsed := time.Since(submittedAt).Milliseconds() if elapsed <= 0 { return 0 } if elapsed > int64(^uint16(0)) { return ^uint16(0) } return uint16(elapsed) }