go-proxy/splitter/simple/impl.go
Virgil b3fd1fef61 refactor(splitter): clarify mapper ownership names
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-05 02:40:42 +00:00

383 lines
9.1 KiB
Go

package simple
import (
"time"
"dappco.re/go/proxy"
"dappco.re/go/proxy/pool"
)
// init registers the "simple" splitter under its factory name so the
// proxy core can instantiate it from configuration.
func init() {
	build := func(cfg *proxy.Config, bus *proxy.EventBus) proxy.Splitter {
		return NewSimpleSplitter(cfg, bus, pool.NewStrategyFactory(cfg))
	}
	proxy.RegisterSplitterFactory("simple", build)
}
// NewSimpleSplitter creates the passthrough splitter. A nil factory is
// replaced with the default strategy factory built from config, so the
// returned splitter always has a usable factory.
func NewSimpleSplitter(config *proxy.Config, eventBus *proxy.EventBus, factory pool.StrategyFactory) *SimpleSplitter {
	f := factory
	if f == nil {
		f = pool.NewStrategyFactory(config)
	}
	s := &SimpleSplitter{
		config:  config,
		events:  eventBus,
		factory: f,
	}
	s.active = make(map[int64]*SimpleMapper)
	s.idle = make(map[int64]*SimpleMapper)
	return s
}
// Connect establishes any mapper strategies that already exist, covering
// both the active and idle tables under the splitter lock.
func (s *SimpleSplitter) Connect() {
	if s == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, table := range []map[int64]*SimpleMapper{s.active, s.idle} {
		for _, m := range table {
			if m.strategy != nil {
				m.strategy.Connect()
			}
		}
	}
}
// OnLogin creates or reclaims a mapper.
//
// When ReuseTimeout is configured, the first idle mapper whose upstream
// strategy is still active and whose idle window has not expired is handed
// to the new miner, so the pool connection survives a brief miner
// reconnect. Otherwise a fresh mapper is created and its strategy
// connected.
func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) {
if s == nil || event == nil || event.Miner == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
if s.config.ReuseTimeout > 0 {
// Scan for a reclaimable idle mapper; ReuseTimeout is in seconds.
for id, mapper := range s.idle {
if mapper.strategy != nil && mapper.strategy.IsActive() && !mapper.idleAt.IsZero() && now.Sub(mapper.idleAt) <= time.Duration(s.config.ReuseTimeout)*time.Second {
delete(s.idle, id)
// NOTE(review): mapper fields are mutated here under s.mu while
// SimpleMapper's own methods guard the same fields with m.mu —
// confirm the two locking schemes cannot interleave.
mapper.miner = event.Miner
mapper.idleAt = time.Time{}
mapper.stopped = false
s.active[event.Miner.ID()] = mapper
event.Miner.SetRouteID(mapper.id)
// Replay the last pool job so the reclaimed miner can start
// working immediately instead of waiting for the next broadcast.
if mapper.currentJob.IsValid() {
event.Miner.SetCurrentJob(mapper.currentJob)
}
return
}
}
}
// No reusable mapper: build a fresh one and connect its upstream.
mapper := s.newMapperLocked()
mapper.miner = event.Miner
s.active[event.Miner.ID()] = mapper
event.Miner.SetRouteID(mapper.id)
if mapper.strategy != nil {
mapper.strategy.Connect()
}
}
// OnSubmit forwards the share to the owning mapper, resolved by the
// miner's route ID. The lookup happens under the splitter lock; the
// actual submit runs outside it.
func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) {
	if s == nil || event == nil || event.Miner == nil {
		return
	}
	s.mu.Lock()
	target := s.activeMapperByRouteIDLocked(event.Miner.RouteID())
	s.mu.Unlock()
	if target == nil {
		return
	}
	target.Submit(event)
}
// OnClose moves a mapper to the idle pool or stops it.
//
// With ReuseTimeout enabled the mapper is parked in the idle table (keyed
// by mapper ID, not miner ID) so a reconnecting miner can reclaim it;
// otherwise the mapper is stopped and its upstream disconnected.
func (s *SimpleSplitter) OnClose(event *proxy.CloseEvent) {
if s == nil || event == nil || event.Miner == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
mapper := s.active[event.Miner.ID()]
if mapper == nil {
return
}
delete(s.active, event.Miner.ID())
// Detach the miner and timestamp the idle transition; idleAt drives
// both reclaim (OnLogin) and expiry (GC).
mapper.miner = nil
mapper.idleAt = time.Now()
event.Miner.SetRouteID(-1)
if s.config.ReuseTimeout > 0 {
s.idle[mapper.id] = mapper
return
}
// Reuse disabled: tear the upstream down immediately.
mapper.stopped = true
if mapper.strategy != nil {
mapper.strategy.Disconnect()
}
}
// GC removes expired idle mappers, disconnecting their strategies before
// dropping them from the idle table.
func (s *SimpleSplitter) GC() {
	if s == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	window := time.Duration(s.config.ReuseTimeout) * time.Second
	now := time.Now()
	for id, m := range s.idle {
		expired := s.config.ReuseTimeout > 0 && now.Sub(m.idleAt) > window
		if !m.stopped && !expired {
			continue
		}
		if m.strategy != nil {
			m.strategy.Disconnect()
		}
		delete(s.idle, id)
	}
}
// Tick advances timeout checks in simple mode.
//
// Strategies are snapshotted under the lock and ticked outside it so a
// slow strategy Tick cannot stall other splitter operations. Strategies
// that do not implement Tick(uint64) are skipped. Ends with a GC pass to
// drop expired idle mappers.
func (s *SimpleSplitter) Tick(ticks uint64) {
	if s == nil {
		return
	}
	s.mu.Lock()
	// Size the snapshot while holding the lock: reading the map lengths
	// before Lock() raced with concurrent OnLogin/OnClose mutations.
	strategies := make([]pool.Strategy, 0, len(s.active)+len(s.idle))
	for _, mapper := range s.active {
		if mapper != nil && mapper.strategy != nil {
			strategies = append(strategies, mapper.strategy)
		}
	}
	for _, mapper := range s.idle {
		if mapper != nil && mapper.strategy != nil {
			strategies = append(strategies, mapper.strategy)
		}
	}
	s.mu.Unlock()
	for _, strategy := range strategies {
		if ticker, ok := strategy.(interface{ Tick(uint64) }); ok {
			ticker.Tick(ticks)
		}
	}
	s.GC()
}
// Upstreams returns active/idle/error counts. A mapper counts as an error
// when it is stopped, has no strategy, or its strategy reports inactive;
// otherwise it counts as Active (active table) or Sleep (idle table).
func (s *SimpleSplitter) Upstreams() proxy.UpstreamStats {
	if s == nil {
		return proxy.UpstreamStats{}
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	stats := proxy.UpstreamStats{}
	healthy := func(m *SimpleMapper) bool {
		return !m.stopped && m.strategy != nil && m.strategy.IsActive()
	}
	for _, m := range s.active {
		switch {
		case m == nil:
		case healthy(m):
			stats.Active++
		default:
			stats.Error++
		}
	}
	for _, m := range s.idle {
		switch {
		case m == nil:
		case healthy(m):
			stats.Sleep++
		default:
			stats.Error++
		}
	}
	stats.Total = stats.Active + stats.Sleep + stats.Error
	return stats
}
// Disconnect closes every active or idle upstream connection and clears the mapper tables.
func (s *SimpleSplitter) Disconnect() {
	if s == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, table := range []map[int64]*SimpleMapper{s.active, s.idle} {
		for _, m := range table {
			if m != nil && m.strategy != nil {
				m.strategy.Disconnect()
			}
		}
	}
	// Fresh tables: all mapper references are dropped together.
	s.active = map[int64]*SimpleMapper{}
	s.idle = map[int64]*SimpleMapper{}
}
// ReloadPools reconnects each active or idle mapper using the updated pool
// list. Strategies are collected under the lock and reloaded outside it;
// strategies that do not implement pool.ReloadableStrategy are skipped.
func (s *SimpleSplitter) ReloadPools() {
	if s == nil {
		return
	}
	s.mu.Lock()
	// Size the snapshot while holding the lock: reading the map lengths
	// before Lock() raced with concurrent OnLogin/OnClose mutations.
	strategies := make([]pool.Strategy, 0, len(s.active)+len(s.idle))
	for _, mapper := range s.active {
		if mapper == nil || mapper.strategy == nil {
			continue
		}
		strategies = append(strategies, mapper.strategy)
	}
	for _, mapper := range s.idle {
		if mapper == nil || mapper.strategy == nil {
			continue
		}
		strategies = append(strategies, mapper.strategy)
	}
	s.mu.Unlock()
	for _, strategy := range strategies {
		if reloadable, ok := strategy.(pool.ReloadableStrategy); ok {
			reloadable.ReloadPools()
		}
	}
}
// newMapperLocked allocates the next mapper ID and builds a mapper whose
// strategy comes from the splitter's factory. Caller must hold s.mu.
func (s *SimpleSplitter) newMapperLocked() *SimpleMapper {
	id := s.nextMapperID
	s.nextMapperID++
	mapper := NewSimpleMapper(id, nil)
	mapper.events = s.events
	// A single factory call suffices: the previous nil-retry invoked the
	// factory again with identical arguments, which cannot yield a
	// different result, so it was dead code and has been removed.
	mapper.strategy = s.factory(mapper)
	return mapper
}
// activeMapperByRouteIDLocked finds the active mapper carrying routeID.
// Caller must hold s.mu. Returns nil for negative or unknown route IDs.
func (s *SimpleSplitter) activeMapperByRouteIDLocked(routeID int64) *SimpleMapper {
	if s == nil || routeID < 0 {
		return nil
	}
	var found *SimpleMapper
	for _, m := range s.active {
		if m != nil && m.id == routeID {
			found = m
			break
		}
	}
	return found
}
// Submit forwards a share to the pool.
//
// The share's job ID must match the current or the previous job (a short
// grace window across job changes); anything else is rejected without
// touching the upstream. Accepted shares are recorded in m.pending, keyed
// by the strategy's submission sequence, so OnResultAccepted can reply to
// the right miner request later.
func (m *SimpleMapper) Submit(event *proxy.SubmitEvent) {
if m == nil || event == nil || m.strategy == nil {
return
}
m.mu.Lock()
defer m.mu.Unlock()
jobID := event.JobID
if jobID == "" {
// An empty submit job ID falls back to the current job's ID.
jobID = m.currentJob.JobID
}
if jobID == "" || (jobID != m.currentJob.JobID && jobID != m.prevJob.JobID) {
m.rejectInvalidJobLocked(event, m.currentJob)
return
}
// Difficulty is taken from whichever job the share was actually mined
// against, so shares for the previous job keep their old difficulty.
submissionJob := m.currentJob
if jobID == m.prevJob.JobID && m.prevJob.JobID != "" {
submissionJob = m.prevJob
}
seq := m.strategy.Submit(jobID, event.Nonce, event.Result, event.Algo)
// NOTE(review): assumes m.pending is initialized by NewSimpleMapper —
// a nil map write would panic here; confirm against the constructor.
m.pending[seq] = submitContext{
RequestID: event.RequestID,
Diff: proxy.EffectiveShareDifficulty(submissionJob, event.Miner),
StartedAt: time.Now(),
JobID: jobID,
}
}
// rejectInvalidJobLocked replies to the miner with a stale-job error and
// dispatches a reject event carrying a copy of the given job. Caller must
// hold m.mu.
func (m *SimpleMapper) rejectInvalidJobLocked(event *proxy.SubmitEvent, job proxy.Job) {
	if event == nil || event.Miner == nil {
		return
	}
	const reason = "Invalid job id"
	event.Miner.ReplyWithError(event.RequestID, reason)
	if m.events == nil {
		return
	}
	jobCopy := job
	m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: event.Miner, Job: &jobCopy, Error: reason})
}
// OnJob forwards the latest pool job to the active miner. The outgoing
// current job is rotated into prevJob (cleared when the pool client
// changed, since old job IDs are then meaningless), and the mapper is
// revived from any stopped/idle state. The miner call happens outside
// the lock.
func (m *SimpleMapper) OnJob(job proxy.Job) {
	if m == nil {
		return
	}
	m.mu.Lock()
	prev := m.currentJob
	if prev.ClientID != job.ClientID {
		prev = proxy.Job{}
	}
	m.prevJob = prev
	m.currentJob = job
	m.stopped = false
	m.idleAt = time.Time{}
	target := m.miner
	m.mu.Unlock()
	if target != nil {
		target.ForwardJob(job, job.Algo)
	}
}
// OnResultAccepted forwards result status to the miner.
//
// The pending submission context is looked up (and removed) by the
// strategy's sequence number; unknown sequences and mapper states without
// a miner are silently dropped. Round-trip latency is clamped into uint16
// milliseconds. A share that was mined against the now-rotated previous
// job is flagged Expired on its accept event.
func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
if m == nil {
return
}
m.mu.Lock()
ctx, ok := m.pending[sequence]
if ok {
delete(m.pending, sequence)
}
// Snapshot everything needed after the lock is released.
miner := m.miner
currentJob := m.currentJob
prevJob := m.prevJob
m.mu.Unlock()
if !ok || miner == nil {
return
}
// Milliseconds since the share was submitted, saturating at 65535.
latency := uint16(0)
if !ctx.StartedAt.IsZero() {
elapsed := time.Since(ctx.StartedAt).Milliseconds()
if elapsed > int64(^uint16(0)) {
latency = ^uint16(0)
} else {
latency = uint16(elapsed)
}
}
// Attribute the result to the job it was actually submitted against;
// if that job has since rotated out, mark the share as expired.
job := currentJob
expired := false
if ctx.JobID != "" && ctx.JobID == prevJob.JobID && ctx.JobID != currentJob.JobID {
job = prevJob
expired = true
}
if accepted {
miner.Success(ctx.RequestID, "OK")
if m.events != nil {
m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Diff: ctx.Diff, Job: &job, Latency: latency, Expired: expired})
}
return
}
miner.ReplyWithError(ctx.RequestID, errorMessage)
if m.events != nil {
m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Diff: ctx.Diff, Job: &job, Error: errorMessage, Latency: latency})
}
}
// OnDisconnect marks the mapper as disconnected.
//
// stopped is written under m.mu because OnJob clears the same flag while
// holding the lock; the previous unguarded write was a data race that
// go test -race would flag.
func (m *SimpleMapper) OnDisconnect() {
	if m == nil {
		return
	}
	m.mu.Lock()
	m.stopped = true
	m.mu.Unlock()
}