go-proxy/splitter/simple/splitter.go
Virgil 20f0626a19 feat(proxy): wire miner runtime flow
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-04 10:47:58 +00:00

176 lines
4 KiB
Go

// Package simple implements the passthrough splitter mode.
//
// Simple mode creates one upstream pool connection per miner. When ReuseTimeout > 0,
// the upstream connection is held idle for that many seconds after the miner disconnects,
// allowing the next miner to inherit it and avoid reconnect latency.
//
// s := simple.NewSimpleSplitter(cfg, eventBus, strategyFactory)
package simple
import (
"sync"
"time"
"dappco.re/go/core/proxy"
"dappco.re/go/core/proxy/pool"
)
// SimpleSplitter is the Splitter implementation for simple (passthrough) mode.
//
// s := simple.NewSimpleSplitter(cfg, eventBus, strategyFactory)
type SimpleSplitter struct {
active map[int64]*SimpleMapper // minerID → mapper
idle map[int64]*SimpleMapper // mapperID → mapper (reuse pool, keyed by mapper seq)
cfg *proxy.Config
events *proxy.EventBus
factory pool.StrategyFactory
mu sync.Mutex
seq int64 // monotonic mapper sequence counter
}
// NewSimpleSplitter creates the passthrough splitter.
//
// s := simple.NewSimpleSplitter(cfg, bus, factory)
func NewSimpleSplitter(cfg *proxy.Config, events *proxy.EventBus, factory pool.StrategyFactory) *SimpleSplitter {
return &SimpleSplitter{
active: make(map[int64]*SimpleMapper),
idle: make(map[int64]*SimpleMapper),
cfg: cfg,
events: events,
factory: factory,
}
}
func (s *SimpleSplitter) Connect() {}
func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) {
if event == nil || event.Miner == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
var mapper *SimpleMapper
for mapperID, idleMapper := range s.idle {
mapper = idleMapper
delete(s.idle, mapperID)
break
}
if mapper == nil {
s.seq++
var strategy pool.Strategy
mapper = NewSimpleMapper(s.seq, nil)
mapper.events = s.events
if s.factory != nil {
strategy = s.factory(mapper)
}
mapper.strategy = strategy
if mapper.strategy != nil {
mapper.strategy.Connect()
}
} else {
mapper.events = s.events
}
mapper.miner = event.Miner
mapper.idleAt = time.Time{}
event.Miner.SetRouteID(mapper.id)
s.active[event.Miner.ID()] = mapper
}
func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) {
if event == nil || event.Miner == nil {
return
}
s.mu.Lock()
mapper := s.active[event.Miner.ID()]
s.mu.Unlock()
if mapper == nil || mapper.strategy == nil {
return
}
sequence := mapper.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo)
if sequence == 0 {
event.Miner.ReplyWithError(event.RequestID, "Pool unavailable")
return
}
mapper.mu.Lock()
mapper.pending[sequence] = event.RequestID
mapper.mu.Unlock()
}
func (s *SimpleSplitter) OnClose(event *proxy.CloseEvent) {
if event == nil || event.Miner == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
mapper := s.active[event.Miner.ID()]
if mapper == nil {
return
}
delete(s.active, event.Miner.ID())
mapper.miner = nil
mapper.idleAt = time.Now().UTC()
if s.cfg != nil && s.cfg.ReuseTimeout > 0 {
s.idle[mapper.id] = mapper
return
}
mapper.stopped = true
if mapper.strategy != nil {
mapper.strategy.Disconnect()
}
}
func (s *SimpleSplitter) Tick(ticks uint64) {
if ticks%60 == 0 {
s.GC()
}
}
func (s *SimpleSplitter) GC() {
s.mu.Lock()
defer s.mu.Unlock()
timeout := time.Duration(0)
if s.cfg != nil && s.cfg.ReuseTimeout > 0 {
timeout = time.Duration(s.cfg.ReuseTimeout) * time.Second
}
now := time.Now().UTC()
for mapperID, mapper := range s.idle {
if timeout == 0 || (!mapper.idleAt.IsZero() && now.Sub(mapper.idleAt) > timeout) || mapper.stopped {
if mapper.strategy != nil {
mapper.strategy.Disconnect()
}
delete(s.idle, mapperID)
}
}
}
func (s *SimpleSplitter) Upstreams() proxy.UpstreamStats {
s.mu.Lock()
defer s.mu.Unlock()
stats := proxy.UpstreamStats{
Sleep: uint64(len(s.idle)),
}
for _, mapper := range s.active {
stats.Total++
if mapper.strategy != nil && mapper.strategy.IsActive() {
stats.Active++
} else {
stats.Error++
}
}
stats.Total += uint64(len(s.idle))
return stats
}