package proxy

import (
	"crypto/tls"
	"errors"
	"net"
	"time"
)

// splitterShutdown is the optional shutdown surface of a splitter. Stop uses
// it, when the configured splitter implements it, to wait for pending work
// and then disconnect.
type splitterShutdown interface {
	PendingCount() int
	Disconnect()
}

// New wires the proxy and returns a ready-to-start instance.
//
// It returns an error when config is nil or when config.Validate fails.
// Event subscriptions are registered in a fixed order: custom difficulty,
// access/share logs, miner bookkeeping, stats, and finally the splitter
// forwarding handlers.
//
//	p, err := proxy.New(config)
func New(config *Config) (*Proxy, error) {
	if config == nil {
		return nil, errors.New("config is nil")
	}
	if err := config.Validate(); err != nil {
		return nil, err
	}

	bus := NewEventBus()
	stats := NewStats()
	customDiff := NewCustomDiff(config.CustomDiff)
	bus.Subscribe(EventLogin, customDiff.OnLogin)
	workers := NewWorkers(config.Workers, bus)
	workers.SetCustomDiffStats(config.CustomDiffStats)
	splitter := newSplitter(config, bus)

	p := &Proxy{
		config:           config,
		customDifficulty: customDiff,
		splitter:         splitter,
		stats:            stats,
		workers:          workers,
		events:           bus,
		miners:           make(map[int64]*Miner),
		rateLimiter:      NewRateLimiter(config.RateLimit),
		done:             make(chan struct{}),
	}
	p.accessLogger = subscribeAccessLog(bus, config.AccessLogFile)
	p.shareLogger = subscribeShareLog(bus, config.ShareLogFile)

	// Track the miner on login and raise the high-water mark if needed.
	bus.Subscribe(EventLogin, func(event Event) {
		if event.Miner != nil {
			p.minerMu.Lock()
			p.miners[event.Miner.ID()] = event.Miner
			p.minerMu.Unlock()
		}
		current := p.currentMiners.Add(1)
		for {
			maximum := stats.maxMiners.Load()
			if current <= maximum || stats.maxMiners.CompareAndSwap(maximum, current) {
				break
			}
		}
	})

	// Forget the miner on close and lower the live counter.
	bus.Subscribe(EventClose, func(event Event) {
		if event.Miner != nil {
			p.minerMu.Lock()
			delete(p.miners, event.Miner.ID())
			p.minerMu.Unlock()
		}
		// Decrement with a compare-and-swap loop: a separate "load > 0, then
		// add -1" lets two concurrent close events both pass the check and
		// wrap the unsigned counter around to its maximum value.
		for {
			current := p.currentMiners.Load()
			if current == 0 || p.currentMiners.CompareAndSwap(current, current-1) {
				break
			}
		}
	})

	bus.Subscribe(EventAccept, stats.OnAccept)
	bus.Subscribe(EventReject, stats.OnReject)

	if splitter != nil {
		bus.Subscribe(EventSubmit, func(event Event) {
			splitter.OnSubmit(&SubmitEvent{
				Miner:     event.Miner,
				JobID:     event.JobID,
				Nonce:     event.Nonce,
				Result:    event.Result,
				Algo:      event.Algo,
				RequestID: event.RequestID,
			})
		})
		bus.Subscribe(EventLogin, func(event Event) {
			splitter.OnLogin(&LoginEvent{Miner: event.Miner})
		})
		bus.Subscribe(EventClose, func(event Event) {
			splitter.OnClose(&CloseEvent{Miner: event.Miner})
		})
	}

	// The watcher is only started when watching is enabled and the config
	// carries a source path.
	if config.Watch && config.sourcePath != "" {
		p.watcher = newConfigWatcher(config.sourcePath, p.Reload, config.Watch)
		p.watcher.Start()
	}

	return p, nil
}

// Start connects the pool, opens listeners, and blocks until `Stop()`.
//
// If a TLS key pair cannot be loaded or a listener cannot be created, Start
// calls Stop and returns without reporting the error to the caller.
//
//	p.Start()
func (p *Proxy) Start() {
	if p.splitter != nil {
		p.splitter.Connect()
	}
	p.ticker = time.NewTicker(time.Second)

	for _, bind := range p.config.Bind {
		var tlsConfig *tls.Config
		if bind.TLS && p.config.TLS.Enabled {
			certificate, err := tls.LoadX509KeyPair(p.config.TLS.CertFile, p.config.TLS.KeyFile)
			if err != nil {
				p.Stop()
				return
			}
			tlsConfig = buildTLSConfig(p.config.TLS)
			tlsConfig.Certificates = []tls.Certificate{certificate}
		}

		server, err := NewServer(bind, tlsConfig, p.rateLimiter, p.acceptConn)
		if err != nil {
			p.Stop()
			return
		}
		p.servers = append(p.servers, server)
		server.Start()
	}

	if p.config != nil && p.config.HTTP.Enabled {
		startHTTPServer(p)
	}

	// Once per ticker interval, drive the periodic work of every component
	// until the done channel is closed.
	go func() {
		var ticks uint64
		for {
			select {
			case <-p.ticker.C:
				ticks++
				p.stats.Tick()
				p.workers.Tick()
				if p.rateLimiter != nil {
					p.rateLimiter.Tick()
				}
				if p.splitter != nil {
					p.splitter.Tick(ticks)
				}
			case <-p.done:
				return
			}
		}
	}()

	<-p.done
}

// noopSplitter is a splitter whose methods all do nothing.
type noopSplitter struct{}

// Connect does nothing.
func (noopSplitter) Connect() {}

// OnLogin ignores the event.
func (noopSplitter) OnLogin(event *LoginEvent) {}

// OnSubmit ignores the event.
func (noopSplitter) OnSubmit(event *SubmitEvent) {}

// OnClose ignores the event.
func (noopSplitter) OnClose(event *CloseEvent) {}

// Tick does nothing.
func (noopSplitter) Tick(ticks uint64) {}

// GC does nothing.
func (noopSplitter) GC() {}

// Upstreams returns the zero UpstreamStats.
func (noopSplitter) Upstreams() UpstreamStats { return UpstreamStats{} }

// PendingCount always reports zero pending items.
func (noopSplitter) PendingCount() int { return 0 }

// Disconnect does nothing.
func (noopSplitter) Disconnect() {}

// noopSplitterFactory returns a noopSplitter; both arguments are ignored.
func noopSplitterFactory(cfg *Config, events *EventBus) Splitter {
	return noopSplitter{}
}

// Stop closes listeners, log files, watcher, miners, and pool connections.
//
// When the splitter implements splitterShutdown, Stop first waits up to five
// seconds for its pending count to reach zero, closes the miners, and then
// disconnects the splitter. Finally it closes the done channel, which
// unblocks Start.
//
//	p.Stop()
func (p *Proxy) Stop() {
	if p.ticker != nil {
		p.ticker.Stop()
	}
	for _, server := range p.servers {
		server.Stop()
	}
	stopHTTPServer(p)
	if p.accessLogger != nil {
		p.accessLogger.close()
	}
	if p.shareLogger != nil {
		p.shareLogger.close()
	}
	if p.watcher != nil {
		p.watcher.Stop()
	}

	shutdown, canShutdown := p.splitter.(splitterShutdown)
	if canShutdown {
		// Poll until pending work drains or the deadline passes.
		deadline := time.Now().Add(5 * time.Second)
		for shutdown.PendingCount() > 0 && time.Now().Before(deadline) {
			time.Sleep(50 * time.Millisecond)
		}
	}

	// Snapshot the miners under the read lock and close them after releasing
	// it, so Close is never called while minerMu is held.
	p.minerMu.RLock()
	miners := make([]*Miner, 0, len(p.miners))
	for _, miner := range p.miners {
		miners = append(miners, miner)
	}
	p.minerMu.RUnlock()
	for _, miner := range miners {
		if miner != nil {
			miner.Close()
		}
	}

	if canShutdown {
		shutdown.Disconnect()
	}

	// Skip the close when done is already closed, so a later Stop call does
	// not panic. NOTE(review): this check is not atomic; two Stop calls
	// running concurrently could still both reach close.
	select {
	case <-p.done:
	default:
		close(p.done)
	}
}

// Reload replaces the live config.
//
// A nil config is ignored. When a config is already installed it is
// overwritten in place, keeping the previous sourcePath and a copy of the
// previous Bind list. The custom difficulty filter, workers, rate limiter,
// and both loggers are then updated from the resulting config.
//
//	p.Reload(newCfg)
func (p *Proxy) Reload(config *Config) {
	if config == nil {
		return
	}

	if p.config == nil {
		p.config = config
	} else {
		sourcePath := p.config.sourcePath
		bind := append([]BindAddr(nil), p.config.Bind...)
		*p.config = *config
		p.config.sourcePath = sourcePath
		p.config.Bind = bind
	}

	if p.customDifficulty != nil {
		p.customDifficulty.SetGlobalDiff(p.config.CustomDiff)
	}
	if p.workers != nil {
		p.workers.SetCustomDiffStats(p.config.CustomDiffStats)
	}
	if p.rateLimiter != nil {
		p.rateLimiter.SetConfig(p.config.RateLimit)
	}
	if p.accessLogger != nil {
		p.accessLogger.setPath(p.config.AccessLogFile)
	}
	if p.shareLogger != nil {
		p.shareLogger.setPath(p.config.ShareLogFile)
	}
}

// Summary returns the stats summary, or the zero StatsSummary when the proxy
// or its stats are nil.
func (p *Proxy) Summary() StatsSummary {
	if p == nil || p.stats == nil {
		return StatsSummary{}
	}
	return p.stats.Summary()
}

// Workers returns the worker records, or nil when the proxy or its workers
// are nil.
func (p *Proxy) Workers() []WorkerRecord {
	if p == nil || p.workers == nil {
		return nil
	}
	return p.workers.List()
}

// Miners returns a snapshot slice of the currently registered miners, or nil
// when the proxy is nil.
func (p *Proxy) Miners() []*Miner {
	if p == nil {
		return nil
	}
	p.minerMu.RLock()
	defer p.minerMu.RUnlock()
	miners := make([]*Miner, 0, len(p.miners))
	for _, miner := range p.miners {
		miners = append(miners, miner)
	}
	return miners
}

// CurrentMiners returns the live miner counter, or 0 when the proxy is nil.
func (p *Proxy) CurrentMiners() uint64 {
	if p == nil {
		return 0
	}
	return p.currentMiners.Load()
}

// MaxMiners returns the highest miner count recorded in stats, or 0 when the
// proxy or its stats are nil.
func (p *Proxy) MaxMiners() uint64 {
	if p == nil || p.stats == nil {
		return 0
	}
	return p.stats.maxMiners.Load()
}

// Mode returns the configured mode, or "" when the proxy or its config is nil.
func (p *Proxy) Mode() string {
	if p == nil || p.config == nil {
		return ""
	}
	return p.config.Mode
}

// HTTPConfig returns the configured HTTP settings, or the zero HTTPConfig
// when the proxy or its config is nil.
func (p *Proxy) HTTPConfig() HTTPConfig {
	if p == nil || p.config == nil {
		return HTTPConfig{}
	}
	return p.config.HTTP
}

// WorkersMode returns the configured workers mode as a string, or "" when the
// proxy or its config is nil.
func (p *Proxy) WorkersMode() string {
	if p == nil || p.config == nil {
		return ""
	}
	return string(p.config.Workers)
}

// Upstreams returns the splitter's upstream stats, or the zero UpstreamStats
// when the proxy or its splitter is nil.
func (p *Proxy) Upstreams() UpstreamStats {
	if p == nil || p.splitter == nil {
		return UpstreamStats{}
	}
	return p.splitter.Upstreams()
}

// acceptConn counts the connection, wraps it in a Miner wired to the proxy's
// event bus and splitter, copies the access settings from the config, and
// starts the miner.
//
// NOTE(review): the nil check on p only protects the stats counter; p is
// dereferenced unconditionally below, so a nil receiver would still panic.
func (p *Proxy) acceptConn(conn net.Conn, localPort uint16) {
	if p != nil && p.stats != nil {
		p.stats.connections.Add(1)
	}

	// A non-nil (empty) TLS config is passed to the miner when the
	// connection is a TLS connection.
	var tlsConfig *tls.Config
	if _, ok := conn.(*tls.Conn); ok {
		tlsConfig = &tls.Config{}
	}

	miner := NewMiner(conn, localPort, tlsConfig)
	miner.events = p.events
	miner.splitter = p.splitter
	if p.config != nil {
		miner.accessPassword = p.config.AccessPassword
		miner.algoExtension = p.config.AlgoExtension
	}
	miner.Start()
}