go-proxy/splitter/simple/splitter.go
Virgil 5190caf9d6 refactor(ax): expand internal naming
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-04 14:33:44 +00:00

275 lines
6.3 KiB
Go

// Package simple implements the passthrough splitter mode.
//
// Simple mode creates one upstream pool connection per miner. When ReuseTimeout > 0,
// the upstream connection is held idle for that many seconds after the miner disconnects,
// allowing the next miner to inherit it and avoid reconnect latency.
//
// s := simple.NewSimpleSplitter(cfg, eventBus, strategyFactory)
package simple
import (
"sync"
"time"
"dappco.re/go/core/proxy"
"dappco.re/go/core/proxy/pool"
)
// SimpleSplitter is the Splitter implementation for simple (passthrough) mode.
//
// s := simple.NewSimpleSplitter(cfg, eventBus, strategyFactory)
type SimpleSplitter struct {
active map[int64]*SimpleMapper // minerID → mapper
idle map[int64]*SimpleMapper // mapperID → mapper (reuse pool, keyed by mapper seq)
config *proxy.Config
events *proxy.EventBus
strategyFactory pool.StrategyFactory
mu sync.Mutex
mapperSequence int64 // monotonic mapper sequence counter
}
// NewSimpleSplitter creates the passthrough splitter.
//
// s := simple.NewSimpleSplitter(cfg, bus, factory)
func NewSimpleSplitter(cfg *proxy.Config, events *proxy.EventBus, factory pool.StrategyFactory) *SimpleSplitter {
return &SimpleSplitter{
active: make(map[int64]*SimpleMapper),
idle: make(map[int64]*SimpleMapper),
config: cfg,
events: events,
strategyFactory: factory,
}
}
func (s *SimpleSplitter) Connect() {}
func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) {
if event == nil || event.Miner == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
timeout := time.Duration(0)
if s.config != nil && s.config.ReuseTimeout > 0 {
timeout = time.Duration(s.config.ReuseTimeout) * time.Second
}
var mapper *SimpleMapper
now := time.Now().UTC()
for mapperID, idleMapper := range s.idle {
if idleMapper == nil || idleMapper.stopped || idleMapper.strategy == nil || !idleMapper.strategy.IsActive() || (timeout > 0 && !idleMapper.idleAt.IsZero() && now.Sub(idleMapper.idleAt) > timeout) {
if idleMapper != nil && idleMapper.strategy != nil {
idleMapper.strategy.Disconnect()
}
delete(s.idle, mapperID)
continue
}
mapper = idleMapper
delete(s.idle, mapperID)
break
}
if mapper == nil {
s.mapperSequence++
var strategy pool.Strategy
mapper = NewSimpleMapper(s.mapperSequence, nil)
mapper.events = s.events
if s.strategyFactory != nil {
strategy = s.strategyFactory(mapper)
}
mapper.strategy = strategy
if mapper.strategy != nil {
mapper.strategy.Connect()
}
} else {
mapper.events = s.events
mapper.clearPending()
}
mapper.miner = event.Miner
mapper.idleAt = time.Time{}
event.Miner.SetRouteID(mapper.id)
s.active[event.Miner.ID()] = mapper
mapper.mu.Lock()
currentJob := mapper.job
mapper.mu.Unlock()
if currentJob.IsValid() {
event.Miner.PrimeJob(currentJob)
}
}
func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) {
if event == nil || event.Miner == nil {
return
}
s.mu.Lock()
mapper := s.active[event.Miner.ID()]
s.mu.Unlock()
if mapper == nil || mapper.strategy == nil {
return
}
job, valid, expired := mapper.JobForID(event.JobID)
if !valid {
event.Miner.ReplyWithError(event.RequestID, "Invalid job id")
return
}
sequence := mapper.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo)
if sequence == 0 {
event.Miner.ReplyWithError(event.RequestID, "Pool unavailable")
return
}
mapper.mu.Lock()
mapper.pending[sequence] = simpleSubmitContext{
RequestID: event.RequestID,
Job: job,
Expired: expired,
SubmittedAt: time.Now().UTC(),
}
mapper.mu.Unlock()
}
func (s *SimpleSplitter) OnClose(event *proxy.CloseEvent) {
if event == nil || event.Miner == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
mapper := s.active[event.Miner.ID()]
if mapper == nil {
return
}
delete(s.active, event.Miner.ID())
mapper.clearPending()
mapper.miner = nil
mapper.idleAt = time.Now().UTC()
if s.config != nil && s.config.ReuseTimeout > 0 {
s.idle[mapper.id] = mapper
return
}
mapper.stopped = true
if mapper.strategy != nil {
mapper.strategy.Disconnect()
}
}
func (s *SimpleSplitter) Tick(ticks uint64) {
if ticks%60 == 0 {
s.GC()
}
}
func (s *SimpleSplitter) GC() {
s.mu.Lock()
defer s.mu.Unlock()
timeout := time.Duration(0)
if s.config != nil && s.config.ReuseTimeout > 0 {
timeout = time.Duration(s.config.ReuseTimeout) * time.Second
}
now := time.Now().UTC()
for mapperID, mapper := range s.idle {
if mapper == nil {
delete(s.idle, mapperID)
continue
}
if mapper.stopped || mapper.strategy == nil || !mapper.strategy.IsActive() || timeout == 0 || (!mapper.idleAt.IsZero() && now.Sub(mapper.idleAt) > timeout) {
if mapper.strategy != nil {
mapper.strategy.Disconnect()
}
delete(s.idle, mapperID)
}
}
}
func (s *SimpleSplitter) Upstreams() proxy.UpstreamStats {
s.mu.Lock()
defer s.mu.Unlock()
stats := proxy.UpstreamStats{
Sleep: uint64(len(s.idle)),
}
for _, mapper := range s.active {
stats.Total++
if mapper.strategy != nil && mapper.strategy.IsActive() {
stats.Active++
} else {
stats.Error++
}
}
stats.Total += uint64(len(s.idle))
return stats
}
func (s *SimpleSplitter) PendingCount() int {
s.mu.Lock()
mapperList := make([]*SimpleMapper, 0, len(s.active)+len(s.idle))
for _, mapper := range s.active {
mapperList = append(mapperList, mapper)
}
for _, mapper := range s.idle {
mapperList = append(mapperList, mapper)
}
s.mu.Unlock()
pending := 0
for _, mapper := range mapperList {
if mapper == nil {
continue
}
mapper.mu.Lock()
pending += len(mapper.pending)
mapper.mu.Unlock()
}
return pending
}
func (s *SimpleSplitter) Disconnect() {
s.mu.Lock()
active := s.active
idle := s.idle
s.active = make(map[int64]*SimpleMapper)
s.idle = make(map[int64]*SimpleMapper)
s.mu.Unlock()
for _, mapper := range active {
if mapper == nil {
continue
}
mapper.mu.Lock()
mapper.stopped = true
strategy := mapper.strategy
mapper.strategy = nil
mapper.miner = nil
mapper.mu.Unlock()
if strategy != nil {
strategy.Disconnect()
}
}
for _, mapper := range idle {
if mapper == nil {
continue
}
mapper.mu.Lock()
mapper.stopped = true
strategy := mapper.strategy
mapper.strategy = nil
mapper.miner = nil
mapper.mu.Unlock()
if strategy != nil {
strategy.Disconnect()
}
}
}