go-proxy/proxy_runtime.go
Virgil f0477b9980 refactor(proxy): use semantic runtime names
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-04 12:01:40 +00:00

332 lines
7.5 KiB
Go

package proxy
import (
"crypto/tls"
"net"
"time"
)
// splitterShutdown is an optional interface a Splitter may implement to
// support graceful shutdown: Stop polls PendingCount until it drains (or a
// deadline passes) and then calls Disconnect.
type splitterShutdown interface {
	PendingCount() int
	Disconnect()
}
// New creates and wires all subsystems but does not start the tick loop or TCP listeners.
//
// It validates config, builds the event bus, stats, custom-diff filter,
// worker tracker and splitter, subscribes the bookkeeping event handlers,
// and optionally starts a config watcher when config.Watch is enabled.
// Returns the validation error unchanged when config is invalid.
//
// p, errorValue := proxy.New(config)
func New(config *Config) (*Proxy, error) {
	if errorValue := config.Validate(); errorValue != nil {
		return nil, errorValue
	}
	eventBus := NewEventBus()
	statsValue := NewStats()
	customDiffFilter := NewCustomDiff(config.CustomDiff)
	eventBus.Subscribe(EventLogin, customDiffFilter.OnLogin)
	workersValue := NewWorkers(config.Workers, eventBus)
	workersValue.SetCustomDiffStats(config.CustomDiffStats)
	splitterValue := newSplitter(config, eventBus)
	proxyInstance := &Proxy{
		config:      config,
		customDiff:  customDiffFilter,
		splitter:    splitterValue,
		stats:       statsValue,
		workers:     workersValue,
		events:      eventBus,
		miners:      make(map[int64]*Miner),
		rateLimiter: NewRateLimiter(config.RateLimit),
		done:        make(chan struct{}),
	}
	proxyInstance.accessLogger = subscribeAccessLog(eventBus, config.AccessLogFile)
	proxyInstance.shareLogger = subscribeShareLog(eventBus, config.ShareLogFile)
	// Maintain the live miner map plus connection counters on login.
	eventBus.Subscribe(EventLogin, func(event Event) {
		if event.Miner != nil {
			proxyInstance.minerMu.Lock()
			proxyInstance.miners[event.Miner.ID()] = event.Miner
			proxyInstance.minerMu.Unlock()
		}
		statsValue.connections.Add(1)
		// CAS loop keeps maxMiners at the concurrent high-water mark.
		current := proxyInstance.currentMiners.Add(1)
		for {
			maximum := statsValue.maxMiners.Load()
			if current <= maximum || statsValue.maxMiners.CompareAndSwap(maximum, current) {
				break
			}
		}
	})
	eventBus.Subscribe(EventClose, func(event Event) {
		if event.Miner != nil {
			proxyInstance.minerMu.Lock()
			delete(proxyInstance.miners, event.Miner.ID())
			proxyInstance.minerMu.Unlock()
		}
		// Decrement via CAS: the previous load-then-Add(^uint64(0)) pair was
		// racy — two concurrent closes could both observe 1 and underflow the
		// unsigned counter to ~2^64. The CAS loop can never go below zero.
		for {
			current := proxyInstance.currentMiners.Load()
			if current == 0 {
				break
			}
			if proxyInstance.currentMiners.CompareAndSwap(current, current-1) {
				break
			}
		}
	})
	eventBus.Subscribe(EventAccept, statsValue.OnAccept)
	eventBus.Subscribe(EventReject, statsValue.OnReject)
	if splitterValue != nil {
		// Forward share submissions and miner lifecycle events upstream.
		eventBus.Subscribe(EventSubmit, func(event Event) {
			splitterValue.OnSubmit(&SubmitEvent{
				Miner:     event.Miner,
				JobID:     event.JobID,
				Nonce:     event.Nonce,
				Result:    event.Result,
				Algo:      event.Algo,
				RequestID: event.RequestID,
			})
		})
		eventBus.Subscribe(EventLogin, func(event Event) {
			splitterValue.OnLogin(&LoginEvent{Miner: event.Miner})
		})
		eventBus.Subscribe(EventClose, func(event Event) {
			splitterValue.OnClose(&CloseEvent{Miner: event.Miner})
		})
	}
	// Hot-reload the config file when watching is enabled and a path is known.
	if config.Watch && config.sourcePath != "" {
		proxyInstance.watcher = NewConfigWatcher(config.sourcePath, proxyInstance.Reload)
		proxyInstance.watcher.Start()
	}
	return proxyInstance, nil
}
// Start begins the TCP listener(s), pool connections, and tick loop.
//
// Start blocks until Stop closes the done channel. NOTE(review): TLS
// certificate and listener-creation errors are silently swallowed — on the
// first failure Start calls p.Stop and returns with no error surfaced to
// the caller; consider logging or returning them.
//
// p.Start()
func (p *Proxy) Start() {
	// Connect upstream pool(s) before accepting any miner connections.
	if p.splitter != nil {
		p.splitter.Connect()
	}
	// One-second heartbeat drives stats, workers, rate limiter and splitter.
	p.ticker = time.NewTicker(time.Second)
	for _, bind := range p.config.Bind {
		var tlsConfig *tls.Config
		if bind.TLS && p.config.TLS.Enabled {
			certificate, errorValue := tls.LoadX509KeyPair(p.config.TLS.CertFile, p.config.TLS.KeyFile)
			if errorValue == nil {
				tlsConfig = buildTLSConfig(p.config.TLS)
				tlsConfig.Certificates = []tls.Certificate{certificate}
			} else {
				// Certificate load failed: abort startup entirely (error dropped).
				p.Stop()
				return
			}
		}
		server, errorValue := NewServer(bind, tlsConfig, p.rateLimiter, p.acceptConn)
		if errorValue != nil {
			// Listener creation failed: abort startup entirely (error dropped).
			p.Stop()
			return
		}
		p.servers = append(p.servers, server)
		server.Start()
	}
	if p.config != nil && p.config.HTTP.Enabled {
		startHTTPServer(p)
	}
	// Tick loop: fan the ticker out to all subsystems until shutdown.
	go func() {
		var ticks uint64
		for {
			select {
			case <-p.ticker.C:
				ticks++
				p.stats.Tick()
				p.workers.Tick()
				if p.rateLimiter != nil {
					p.rateLimiter.Tick()
				}
				if p.splitter != nil {
					p.splitter.Tick(ticks)
				}
			case <-p.done:
				return
			}
		}
	}()
	// Block the caller until Stop is invoked.
	<-p.done
}
// noopSplitter is the do-nothing Splitter used when upstream splitting is
// not configured; every method is an inert stub.
type noopSplitter struct{}

func (noopSplitter) Connect()                 {}
func (noopSplitter) OnLogin(_ *LoginEvent)    {}
func (noopSplitter) OnSubmit(_ *SubmitEvent)  {}
func (noopSplitter) OnClose(_ *CloseEvent)    {}
func (noopSplitter) Tick(_ uint64)            {}
func (noopSplitter) GC()                      {}
func (noopSplitter) Upstreams() UpstreamStats { return UpstreamStats{} }
func (noopSplitter) PendingCount() int        { return 0 }
func (noopSplitter) Disconnect()              {}

// noopSplitterFactory satisfies the splitter-factory signature while
// ignoring both arguments and always returning the inert splitter.
func noopSplitterFactory(_ *Config, _ *EventBus) Splitter {
	return noopSplitter{}
}
// Stop shuts down all subsystems cleanly: listeners, HTTP server, loggers
// and watcher first, then drains the splitter, closes every live miner,
// disconnects upstream, and finally signals the done channel.
//
// p.Stop()
func (p *Proxy) Stop() {
	if p.ticker != nil {
		p.ticker.Stop()
	}
	for _, srv := range p.servers {
		srv.Stop()
	}
	stopHTTPServer(p)
	if p.accessLogger != nil {
		p.accessLogger.close()
	}
	if p.shareLogger != nil {
		p.shareLogger.close()
	}
	if p.watcher != nil {
		p.watcher.Stop()
	}
	// Assert the optional drain interface once; reused below for Disconnect.
	shutdown, drainable := p.splitter.(splitterShutdown)
	if drainable {
		// Poll pending upstream work for at most five seconds before teardown.
		deadline := time.Now().Add(5 * time.Second)
		for shutdown.PendingCount() > 0 && time.Now().Before(deadline) {
			time.Sleep(50 * time.Millisecond)
		}
	}
	// Snapshot miners under the read lock; close them outside it so Close
	// handlers that touch the map do not deadlock.
	p.minerMu.RLock()
	snapshot := make([]*Miner, 0, len(p.miners))
	for _, m := range p.miners {
		snapshot = append(snapshot, m)
	}
	p.minerMu.RUnlock()
	for _, m := range snapshot {
		if m != nil {
			m.Close()
		}
	}
	if drainable {
		shutdown.Disconnect()
	}
	// Close done only if it is not already closed (idempotent single call).
	select {
	case <-p.done:
	default:
		close(p.done)
	}
}
// Reload replaces the live config in place while preserving the original
// source path and bind list (listeners are not re-created on reload), then
// pushes the new settings into the diff filter, workers, and rate limiter.
//
// p.Reload(newCfg)
func (p *Proxy) Reload(config *Config) {
	if config == nil {
		return
	}
	if p.config == nil {
		p.config = config
	} else {
		// Keep sourcePath and Bind stable across reloads.
		sourcePath := p.config.sourcePath
		boundAddrs := append([]BindAddr(nil), p.config.Bind...)
		*p.config = *config
		p.config.sourcePath = sourcePath
		p.config.Bind = boundAddrs
	}
	if p.customDiff != nil {
		p.customDiff.SetGlobalDiff(p.config.CustomDiff)
	}
	if p.workers != nil {
		p.workers.SetCustomDiffStats(p.config.CustomDiffStats)
	}
	if p.rateLimiter != nil {
		p.rateLimiter.SetConfig(p.config.RateLimit)
	}
}
// Summary returns the current stats snapshot; zero value on a nil proxy.
func (p *Proxy) Summary() StatsSummary {
	if p != nil && p.stats != nil {
		return p.stats.Summary()
	}
	return StatsSummary{}
}
// Workers returns the tracked worker records; nil on a nil proxy.
func (p *Proxy) Workers() []WorkerRecord {
	if p != nil && p.workers != nil {
		return p.workers.List()
	}
	return nil
}
// Miners returns a snapshot slice of all currently connected miners,
// copied under the read lock so callers can iterate without holding it.
func (p *Proxy) Miners() []*Miner {
	if p == nil {
		return nil
	}
	p.minerMu.RLock()
	defer p.minerMu.RUnlock()
	snapshot := make([]*Miner, 0, len(p.miners))
	for _, m := range p.miners {
		snapshot = append(snapshot, m)
	}
	return snapshot
}
// CurrentMiners reports the number of currently connected miners.
func (p *Proxy) CurrentMiners() uint64 {
	if p != nil {
		return p.currentMiners.Load()
	}
	return 0
}
// MaxMiners reports the high-water mark of concurrent miners.
func (p *Proxy) MaxMiners() uint64 {
	if p != nil && p.stats != nil {
		return p.stats.maxMiners.Load()
	}
	return 0
}
// Mode reports the configured proxy mode, or "" when unconfigured.
func (p *Proxy) Mode() string {
	if p != nil && p.config != nil {
		return p.config.Mode
	}
	return ""
}
// HTTPConfig returns the HTTP API settings; zero value when unconfigured.
func (p *Proxy) HTTPConfig() HTTPConfig {
	if p != nil && p.config != nil {
		return p.config.HTTP
	}
	return HTTPConfig{}
}
// WorkersMode reports the configured workers mode, or "" when unconfigured.
func (p *Proxy) WorkersMode() string {
	if p != nil && p.config != nil {
		return string(p.config.Workers)
	}
	return ""
}
// Upstreams returns splitter upstream statistics; zero value when no
// splitter is wired up.
func (p *Proxy) Upstreams() UpstreamStats {
	if p != nil && p.splitter != nil {
		return p.splitter.Upstreams()
	}
	return UpstreamStats{}
}
// acceptConn wires a freshly accepted connection into a Miner and starts it.
func (p *Proxy) acceptConn(conn net.Conn, localPort uint16) {
	// A non-nil (empty) tls.Config is passed when the connection is already
	// TLS-wrapped — presumably just a flag for NewMiner; verify in NewMiner.
	var tlsConfig *tls.Config
	if _, isTLS := conn.(*tls.Conn); isTLS {
		tlsConfig = new(tls.Config)
	}
	miner := NewMiner(conn, localPort, tlsConfig)
	miner.events = p.events
	miner.splitter = p.splitter
	if cfg := p.config; cfg != nil {
		miner.accessPassword = cfg.AccessPassword
		miner.algoExtension = cfg.AlgoExtension
	}
	miner.Start()
}