package proxy import ( "bufio" "context" "crypto/tls" "encoding/json" "net" "net/http" "reflect" "sort" "strconv" "strings" "sync/atomic" "time" ) const ( maxStratumLineLength = 16384 minerLoginTimeout = 10 * time.Second minerReadyTimeout = 600 * time.Second submitDrainTimeout = 5 * time.Second maskedPassword = "********" ) // MinerSnapshot is a serialisable view of one miner connection. // // snapshots := p.MinerSnapshots() // for _, s := range snapshots { // _ = s.ID // 1 // _ = s.IP // "10.0.0.1:49152" // _ = s.Diff // 100000 // } type MinerSnapshot struct { ID int64 IP string TX uint64 RX uint64 State MinerState Diff uint64 User string Password string RigID string Agent string } // cfg := &proxy.Config{Mode: "nicehash", Bind: []proxy.BindAddr{{Host: "0.0.0.0", Port: 3333}}, Pools: []proxy.PoolConfig{{URL: "pool.example:3333", Enabled: true}}} // p, result := proxy.New(cfg) // // if !result.OK { // return result.Error // } func New(config *Config) (*Proxy, Result) { if config == nil { return nil, newErrorResult(NewScopedError("proxy", "config is nil", nil)) } if result := config.Validate(); !result.OK { return nil, result } p := &Proxy{ config: config, events: NewEventBus(), stats: NewStats(), workers: NewWorkers(config.Workers, nil), miners: make(map[int64]*Miner), customDiff: NewCustomDiff(config.CustomDiff), customDiffBuckets: NewCustomDiffBuckets(config.CustomDiffStats), rateLimit: NewRateLimiter(config.RateLimit), accessLog: newAccessLogSink(config.AccessLogFile), shareLog: newShareLogSink(config.ShareLogFile), done: make(chan struct{}), } p.events.Subscribe(EventLogin, p.customDiff.OnLogin) p.workers.bindEvents(p.events) if p.accessLog != nil { p.events.Subscribe(EventLogin, p.accessLog.OnLogin) p.events.Subscribe(EventClose, p.accessLog.OnClose) } p.events.Subscribe(EventLogin, p.stats.OnLogin) p.events.Subscribe(EventClose, p.stats.OnClose) p.events.Subscribe(EventAccept, p.stats.OnAccept) p.events.Subscribe(EventReject, p.stats.OnReject) shareSinks := make([]ShareSink, 0, 2) if p.shareLog != nil { shareSinks = append(shareSinks, p.shareLog) } if p.customDiffBuckets != nil { shareSinks = append(shareSinks, p.customDiffBuckets) } if len(shareSinks) > 0 { p.shareSink = newShareSinkGroup(shareSinks...) p.events.Subscribe(EventAccept, p.shareSink.OnAccept) p.events.Subscribe(EventReject, p.shareSink.OnReject) } p.events.Subscribe(EventAccept, p.onShareSettled) p.events.Subscribe(EventReject, p.onShareSettled) if config.Watch && config.configPath != "" { p.watcher = NewConfigWatcher(config.configPath, p.Reload) } if factory, ok := splitterFactoryForMode(config.Mode); ok { p.splitter = factory(config, p.events) } else { p.splitter = &noopSplitter{} } return p, newSuccessResult() } // Mode returns the runtime mode, for example "nicehash" or "simple". // // mode := p.Mode() // "nicehash" func (p *Proxy) Mode() string { if p == nil || p.config == nil { return "" } p.configMu.RLock() defer p.configMu.RUnlock() return p.config.Mode } // WorkersMode returns the active worker identity strategy, for example proxy.WorkersByRigID. // // mode := p.WorkersMode() // proxy.WorkersByRigID func (p *Proxy) WorkersMode() WorkersMode { if p == nil || p.config == nil { return WorkersDisabled } p.configMu.RLock() defer p.configMu.RUnlock() return p.config.Workers } // Summary returns a snapshot of the current global metrics. // // summary := p.Summary() // _ = summary.Accepted func (p *Proxy) Summary() StatsSummary { if p == nil || p.stats == nil { return StatsSummary{} } summary := p.stats.Summary() if p.customDiffBuckets != nil { summary.CustomDiffStats = p.customDiffBuckets.Snapshot() } return summary } // WorkerRecords returns a snapshot of the current worker aggregates. // // records := p.WorkerRecords() // for _, r := range records { _ = r.Name } func (p *Proxy) WorkerRecords() []WorkerRecord { if p == nil || p.workers == nil { return nil } return p.workers.List() } // MinerSnapshots returns a snapshot of the live miner connections. // // snapshots := p.MinerSnapshots() // for _, s := range snapshots { _ = s.IP } func (p *Proxy) MinerSnapshots() []MinerSnapshot { if p == nil { return nil } p.minersMu.RLock() defer p.minersMu.RUnlock() rows := make([]MinerSnapshot, 0, len(p.miners)) for _, miner := range p.miners { ip := miner.RemoteAddr() if ip == "" { ip = miner.IP() } rows = append(rows, MinerSnapshot{ ID: miner.id, IP: ip, TX: miner.tx, RX: miner.rx, State: miner.state, Diff: miner.diff, User: miner.user, Password: maskedPassword, RigID: miner.rigID, Agent: miner.agent, }) } sort.Slice(rows, func(i, j int) bool { return rows[i].ID < rows[j].ID }) return rows } // MinerCount returns the current and peak miner counts. // // now, max := p.MinerCount() // 142, 200 func (p *Proxy) MinerCount() (now, max uint64) { if p == nil || p.stats == nil { return 0, 0 } return p.stats.miners.Load(), p.stats.maxMiners.Load() } // Upstreams returns the current upstream connection counts. // // stats := p.Upstreams() // _ = stats.Active // 1 func (p *Proxy) Upstreams() UpstreamStats { if p == nil || p.splitter == nil { return UpstreamStats{} } return p.splitter.Upstreams() } // Events returns the proxy event bus for subscription. // // bus := p.Events() // bus.Subscribe(proxy.EventAccept, handler) func (p *Proxy) Events() *EventBus { if p == nil { return nil } return p.events } // p.Start() // // go func() { // time.Sleep(30 * time.Second) // p.Stop() // }() // p.Start() func (p *Proxy) Start() { if p == nil { return } if p.config == nil { return } for _, bind := range p.config.Bind { var tlsConfig *tls.Config if bind.TLS { if !p.config.TLS.Enabled { p.Stop() return } var result Result tlsConfig, result = buildTLSConfig(p.config.TLS) if !result.OK { p.Stop() return } } server, result := NewServer(bind, tlsConfig, p.rateLimit, p.acceptMiner) if !result.OK { p.Stop() return } p.servers = append(p.servers, server) server.Start() } if p.splitter != nil { p.splitter.Connect() } if p.watcher != nil { p.watcher.Start() } if p.config.HTTP.Enabled { if !p.startMonitoringServer() { p.Stop() return } } p.ticker = time.NewTicker(time.Second) go func() { var ticks uint64 for { select { case <-p.ticker.C: ticks++ if p.stats != nil { p.stats.Tick() } if p.workers != nil { p.workers.Tick() } if p.rateLimit != nil { p.rateLimit.Tick() } if p.splitter != nil { p.splitter.Tick(ticks) if ticks%60 == 0 { p.splitter.GC() } } case <-p.done: return } } }() <-p.done p.Stop() } // Stop shuts down listeners, pool connections, the config watcher, and log sinks. // // p.Stop() func (p *Proxy) Stop() { if p == nil { return } p.stopOnce.Do(func() { close(p.done) if p.ticker != nil { p.ticker.Stop() } for _, server := range p.servers { server.Stop() } if p.watcher != nil { p.watcher.Stop() } if p.httpServer != nil { ctx, cancel := context.WithTimeout(context.Background(), submitDrainTimeout) defer cancel() _ = p.httpServer.Shutdown(ctx) } deadline := time.Now().Add(submitDrainTimeout) for p.submitCount.Load() > 0 && time.Now().Before(deadline) { time.Sleep(10 * time.Millisecond) } p.closeAllMiners() if splitter, ok := p.splitter.(interface{ Disconnect() }); ok { splitter.Disconnect() } if p.accessLog != nil { p.accessLog.Close() } if p.shareLog != nil { p.shareLog.Close() } }) } func (p *Proxy) closeAllMiners() { if p == nil { return } for _, miner := range p.activeMiners() { if miner != nil { miner.Close() } } } func (p *Proxy) activeMiners() []*Miner { if p == nil { return nil } p.minersMu.RLock() defer p.minersMu.RUnlock() miners := make([]*Miner, 0, len(p.miners)) for _, miner := range p.miners { miners = append(miners, miner) } return miners } // p.Reload(&proxy.Config{Mode: "simple", Pools: []proxy.PoolConfig{{URL: "pool.example:3333", Enabled: true}}}) // // p.Reload(&proxy.Config{ // Mode: "simple", // Pools: []proxy.PoolConfig{{URL: "pool.example:3333", Enabled: true}}, // }) func (p *Proxy) Reload(config *Config) { if p == nil || config == nil { return } if result := config.Validate(); !result.OK { return } p.configMu.Lock() poolsChanged := p.config == nil || !reflect.DeepEqual(p.config.Pools, config.Pools) workersChanged := p.config == nil || p.config.Workers != config.Workers nextWorkersMode := config.Workers if p.config == nil { p.config = config } else { preservedBind := append([]BindAddr(nil), p.config.Bind...) // Splitter wiring is established at start-up, so reload only swaps the // knobs that live subsystems can absorb without reconnecting listeners. preservedMode := p.config.Mode preservedConfigPath := p.config.configPath *p.config = *config p.config.Bind = preservedBind p.config.Mode = preservedMode p.config.configPath = preservedConfigPath } p.configMu.Unlock() if workersChanged && p.workers != nil { p.workers.ResetMode(nextWorkersMode, p.activeMiners()) } if p.customDiff != nil { p.customDiff.globalDiff.Store(config.CustomDiff) } p.reloadCustomDiff(config.CustomDiff) if p.customDiffBuckets != nil { p.customDiffBuckets.SetEnabled(config.CustomDiffStats) } p.rateLimit = NewRateLimiter(config.RateLimit) for _, server := range p.servers { if server != nil { server.limiter = p.rateLimit } } if p.accessLog != nil { p.accessLog.SetPath(config.AccessLogFile) } if p.shareLog != nil { p.shareLog.SetPath(config.ShareLogFile) } p.reloadWatcher(config.Watch) if poolsChanged { if reloadable, ok := p.splitter.(interface{ ReloadPools() }); ok { reloadable.ReloadPools() } } } func (p *Proxy) reloadCustomDiff(globalDiff uint64) { if p == nil { return } for _, miner := range p.activeMiners() { if miner == nil { continue } miner.globalDiff = globalDiff if miner.customDiffFromLogin { continue } miner.customDiff = globalDiff switch miner.state { case MinerStateWaitReady, MinerStateReady: job := miner.CurrentJob() if job.IsValid() { miner.ForwardJob(job, job.Algo) } } } } func (p *Proxy) reloadWatcher(enabled bool) { if p == nil || p.config == nil || p.config.configPath == "" { return } if enabled { if p.watcher != nil { return } p.watcher = NewConfigWatcher(p.config.configPath, p.Reload) p.watcher.Start() return } if p.watcher == nil { return } p.watcher.Stop() p.watcher = nil } func (p *Proxy) onShareSettled(Event) { if p == nil { return } for { current := p.submitCount.Load() if current == 0 { return } if p.submitCount.CompareAndSwap(current, current-1) { return } } } func (p *Proxy) acceptMiner(conn net.Conn, localPort uint16) { if p == nil { _ = conn.Close() return } p.configMu.RLock() accessPassword := "" algoExtension := false customDiff := uint64(0) mode := "" if p.config != nil { accessPassword = p.config.AccessPassword algoExtension = p.config.AlgoExtension customDiff = p.config.CustomDiff mode = p.config.Mode } p.configMu.RUnlock() if p.stats != nil { p.stats.connections.Add(1) } miner := NewMiner(conn, localPort, nil) miner.accessPassword = accessPassword miner.algoEnabled = algoExtension miner.globalDiff = customDiff miner.extNH = strings.EqualFold(mode, "nicehash") miner.onLogin = func(m *Miner) { if p.splitter != nil { p.splitter.OnLogin(&LoginEvent{Miner: m}) } } miner.onLoginReady = func(m *Miner) { if p.events != nil { p.events.Dispatch(Event{Type: EventLogin, Miner: m}) } } miner.onSubmit = func(m *Miner, event *SubmitEvent) { if p.splitter != nil { if _, ok := p.splitter.(*noopSplitter); !ok { p.submitCount.Add(1) } p.splitter.OnSubmit(event) } } miner.onClose = func(m *Miner) { if p.events != nil { p.events.Dispatch(Event{Type: EventClose, Miner: m}) } if p.splitter != nil { p.splitter.OnClose(&CloseEvent{Miner: m}) } p.minersMu.Lock() delete(p.miners, m.id) p.minersMu.Unlock() } p.minersMu.Lock() p.miners[miner.id] = miner p.minersMu.Unlock() miner.Start() } func buildTLSConfig(cfg TLSConfig) (*tls.Config, Result) { if !cfg.Enabled { return nil, newSuccessResult() } if cfg.CertFile == "" || cfg.KeyFile == "" { return nil, newErrorResult(NewScopedError("proxy.tls", "tls certificate or key path is empty", nil)) } cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile) if err != nil { return nil, newErrorResult(NewScopedError("proxy.tls", "load certificate failed", err)) } tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert}} applyTLSProtocols(tlsConfig, cfg.Protocols) applyTLSCiphers(tlsConfig, cfg.Ciphers) return tlsConfig, newSuccessResult() } func applyTLSProtocols(tlsConfig *tls.Config, protocols string) { if tlsConfig == nil || strings.TrimSpace(protocols) == "" { return } parts := splitTLSConfigList(protocols) minVersion := uint16(0) maxVersion := uint16(0) for _, part := range parts { if part == "" { continue } if strings.Contains(part, "-") { bounds := strings.SplitN(part, "-", 2) low := parseTLSVersion(bounds[0]) high := parseTLSVersion(bounds[1]) if low == 0 || high == 0 { continue } if minVersion == 0 || low < minVersion { minVersion = low } if high > maxVersion { maxVersion = high } continue } version := parseTLSVersion(part) if version == 0 { continue } if minVersion == 0 || version < minVersion { minVersion = version } if version > maxVersion { maxVersion = version } } if minVersion != 0 { tlsConfig.MinVersion = minVersion } if maxVersion != 0 { tlsConfig.MaxVersion = maxVersion } } func applyTLSCiphers(tlsConfig *tls.Config, ciphers string) { if tlsConfig == nil || strings.TrimSpace(ciphers) == "" { return } parts := splitTLSConfigList(ciphers) for _, part := range parts { if id, ok := lookupTLSCipherSuite(part); ok { tlsConfig.CipherSuites = append(tlsConfig.CipherSuites, id) } } } func lookupTLSCipherSuite(value string) (uint16, bool) { name := strings.ToLower(strings.TrimSpace(value)) if name == "" { return 0, false } allowed := map[string]uint16{} for _, suite := range tls.CipherSuites() { allowed[strings.ToLower(suite.Name)] = suite.ID } for _, suite := range tls.InsecureCipherSuites() { allowed[strings.ToLower(suite.Name)] = suite.ID } if id, ok := allowed[name]; ok { return id, true } if alias, ok := tlsCipherSuiteAliases[name]; ok { if id, ok := allowed[strings.ToLower(alias)]; ok { return id, true } } return 0, false } var tlsCipherSuiteAliases = map[string]string{ "ecdhe-ecdsa-aes128-gcm-sha256": "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", "ecdhe-rsa-aes128-gcm-sha256": "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "ecdhe-ecdsa-aes256-gcm-sha384": "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", "ecdhe-rsa-aes256-gcm-sha384": "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", "ecdhe-ecdsa-chacha20-poly1305": "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305", "ecdhe-rsa-chacha20-poly1305": "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305", "dhe-rsa-aes128-gcm-sha256": "TLS_DHE_RSA_WITH_AES_128_GCM_SHA256", "dhe-rsa-aes256-gcm-sha384": "TLS_DHE_RSA_WITH_AES_256_GCM_SHA384", "aes128-gcm-sha256": "TLS_RSA_WITH_AES_128_GCM_SHA256", "aes256-gcm-sha384": "TLS_RSA_WITH_AES_256_GCM_SHA384", "aes128-sha": "TLS_RSA_WITH_AES_128_CBC_SHA", "aes256-sha": "TLS_RSA_WITH_AES_256_CBC_SHA", "ecdhe-ecdsa-aes128-sha256": "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256", "ecdhe-rsa-aes128-sha256": "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256", "ecdhe-ecdsa-aes256-sha384": "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384", "ecdhe-rsa-aes256-sha384": "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384", } func splitTLSConfigList(value string) []string { return strings.FieldsFunc(value, func(r rune) bool { switch r { case ',', ';', ':', '|', ' ': return true default: return false } }) } func parseTLSVersion(value string) uint16 { switch strings.ToLower(strings.TrimSpace(value)) { case "tls1.0", "tlsv1.0", "tls1", "tlsv1", "1.0", "1", "tls10", "tlsv10": return tls.VersionTLS10 case "tls1.1", "tlsv1.1", "1.1", "tls11", "tlsv11": return tls.VersionTLS11 case "tls1.2", "tlsv1.2", "1.2", "tls12", "tlsv12": return tls.VersionTLS12 case "tls1.3", "tlsv1.3", "1.3", "tls13", "tlsv13": return tls.VersionTLS13 default: return 0 } } func (p *Proxy) startMonitoringServer() bool { if p == nil || p.config == nil || !p.config.HTTP.Enabled { return false } mux := http.NewServeMux() p.registerMonitoringRoute(mux, MonitoringRouteSummary, func() any { return p.SummaryDocument() }) p.registerMonitoringRoute(mux, MonitoringRouteWorkers, func() any { return p.WorkersDocument() }) p.registerMonitoringRoute(mux, MonitoringRouteMiners, func() any { return p.MinersDocument() }) addr := net.JoinHostPort(p.config.HTTP.Host, strconv.Itoa(int(p.config.HTTP.Port))) listener, err := net.Listen("tcp", addr) if err != nil { return false } p.httpServer = &http.Server{Addr: addr, Handler: mux} go func() { err := p.httpServer.Serve(listener) if err != nil && err != http.ErrServerClosed { p.Stop() } }() return true } func (p *Proxy) registerMonitoringRoute(mux *http.ServeMux, pattern string, renderDocument func() any) { if p == nil || mux == nil || renderDocument == nil { return } mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) { if status, ok := p.AllowMonitoringRequest(r); !ok { switch status { case http.StatusUnauthorized: w.Header().Set("WWW-Authenticate", "Bearer") case http.StatusMethodNotAllowed: w.Header().Set("Allow", http.MethodGet) } w.WriteHeader(status) return } p.writeJSONResponse(w, renderDocument()) }) } // AllowMonitoringRequest applies the configured monitoring API access checks. // // status, ok := p.AllowMonitoringRequest(request) func (p *Proxy) AllowMonitoringRequest(r *http.Request) (int, bool) { if p == nil || p.config == nil { return http.StatusServiceUnavailable, false } if p.config.HTTP.Restricted && r.Method != http.MethodGet { return http.StatusMethodNotAllowed, false } if token := p.config.HTTP.AccessToken; token != "" { parts := strings.SplitN(r.Header.Get("Authorization"), " ", 2) if len(parts) != 2 || !strings.EqualFold(parts[0], "bearer") || parts[1] != token { return http.StatusUnauthorized, false } } return http.StatusOK, true } func (p *Proxy) writeJSONResponse(w http.ResponseWriter, payload any) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(payload) } // SummaryDocument builds the RFC-shaped /1/summary response body. // // doc := p.SummaryDocument() // _ = doc.Results.Accepted func (p *Proxy) SummaryDocument() SummaryDocument { summary := p.Summary() now, max := p.MinerCount() upstreams := p.Upstreams() return SummaryDocument{ Version: SummaryDocumentVersion, Mode: p.Mode(), Hashrate: HashrateDocument{ Total: summary.Hashrate, }, CustomDiffStats: summary.CustomDiffStats, Miners: MinersCountDocument{ Now: now, Max: max, }, Workers: uint64(len(p.WorkerRecords())), Upstreams: UpstreamDocument{Active: upstreams.Active, Sleep: upstreams.Sleep, Error: upstreams.Error, Total: upstreams.Total, Ratio: upstreamRatio(now, upstreams)}, Results: ResultsDocument{ Accepted: summary.Accepted, Rejected: summary.Rejected, Invalid: summary.Invalid, Expired: summary.Expired, AvgTime: summary.AvgTime, Latency: summary.AvgLatency, HashesTotal: summary.Hashes, Best: summary.TopDiff, }, } } // WorkersDocument builds the RFC-shaped /1/workers response body. // // doc := p.WorkersDocument() // _ = doc.Workers[0][0] func (p *Proxy) WorkersDocument() WorkersDocument { records := p.WorkerRecords() rows := make([]WorkerRow, 0, len(records)) for _, record := range records { hashrates := make([]float64, len(workerHashrateWindows)) for index, seconds := range workerHashrateWindows { hashrates[index] = record.Hashrate(seconds) } rows = append(rows, WorkerRow{ record.Name, record.LastIP, record.Connections, record.Accepted, record.Rejected, record.Invalid, record.Hashes, unixOrZero(record.LastHashAt), hashrates[0], hashrates[1], hashrates[2], hashrates[3], hashrates[4], }) } return WorkersDocument{ Mode: string(p.WorkersMode()), Workers: rows, } } // MinersDocument builds the RFC-shaped /1/miners response body. // // doc := p.MinersDocument() // _ = doc.Miners[0][7] func (p *Proxy) MinersDocument() MinersDocument { records := p.MinerSnapshots() rows := make([]MinerRow, 0, len(records)) for _, miner := range records { rows = append(rows, MinerRow{ miner.ID, miner.IP, miner.TX, miner.RX, miner.State, miner.Diff, miner.User, "********", miner.RigID, miner.Agent, }) } return MinersDocument{ Format: append([]string(nil), MinersDocumentFormat...), Miners: rows, } } func upstreamRatio(now uint64, upstreams UpstreamStats) float64 { if upstreams.Total == 0 { return 0 } return float64(now) / float64(upstreams.Total) } func unixOrZero(value time.Time) int64 { if value.IsZero() { return 0 } return value.Unix() } func NewMiner(conn net.Conn, localPort uint16, tlsCfg *tls.Config) *Miner { if tlsCfg != nil { if _, ok := conn.(*tls.Conn); !ok { conn = tls.Server(conn, tlsCfg) } } miner := &Miner{ id: nextMinerID(), state: MinerStateWaitLogin, localPort: localPort, mapperID: -1, routeID: -1, connectedAt: time.Now().UTC(), lastActivityAt: time.Now().UTC(), conn: conn, } if tlsConn, ok := conn.(*tls.Conn); ok { miner.tlsConn = tlsConn } if remote := conn.RemoteAddr(); remote != nil { miner.remoteAddr = remote.String() miner.ip = hostOnly(miner.remoteAddr) } return miner } // SetID assigns the miner's internal ID. Used by NonceStorage tests. // // m.SetID(42) func (m *Miner) SetID(id int64) { m.id = id } // ID returns the miner's monotonically increasing per-process identifier. // // id := m.ID() // 42 func (m *Miner) ID() int64 { return m.id } // SetMapperID assigns which NonceMapper owns this miner in NiceHash mode. // // m.SetMapperID(0) // assigned to mapper 0 func (m *Miner) SetMapperID(id int64) { m.mapperID = id } // MapperID returns the owning NonceMapper's ID, or -1 if unassigned. // // if m.MapperID() < 0 { /* miner not assigned to any mapper */ } func (m *Miner) MapperID() int64 { return m.mapperID } // SetRouteID assigns the SimpleMapper ID in simple mode. // // m.SetRouteID(3) func (m *Miner) SetRouteID(id int64) { m.routeID = id } // RouteID returns the SimpleMapper ID, or -1 if unassigned. // // if m.RouteID() < 0 { /* miner not routed */ } func (m *Miner) RouteID() int64 { return m.routeID } // SetExtendedNiceHash enables or disables NiceHash nonce-splitting mode for this miner. // // m.SetExtendedNiceHash(true) func (m *Miner) SetExtendedNiceHash(enabled bool) { m.extNH = enabled } // ExtendedNiceHash reports whether this miner is in NiceHash nonce-splitting mode. // // if m.ExtendedNiceHash() { /* blob byte 39 is patched */ } func (m *Miner) ExtendedNiceHash() bool { return m.extNH } // SetCurrentJob assigns the current pool work unit to this miner. // // m.SetCurrentJob(proxy.Job{Blob: "...", JobID: "job-1"}) func (m *Miner) SetCurrentJob(job Job) { m.currentJob = job } // CurrentJob returns the last job forwarded to this miner. // // job := m.CurrentJob() // if job.IsValid() { /* miner has a valid job */ } func (m *Miner) CurrentJob() Job { return m.currentJob } // LoginAlgos returns the algorithm list sent by the miner during login, or nil if empty. // // algos := m.LoginAlgos() // ["cn/r", "rx/0"] func (m *Miner) LoginAlgos() []string { if m == nil || len(m.loginAlgos) == 0 { return nil } return append([]string(nil), m.loginAlgos...) } // FixedByte returns the NiceHash slot index (0-255) assigned to this miner. // // slot := m.FixedByte() // 0x2A func (m *Miner) FixedByte() uint8 { return m.fixedByte } // SetFixedByte assigns the NiceHash slot index for this miner. // // m.SetFixedByte(0x2A) func (m *Miner) SetFixedByte(value uint8) { m.fixedByte = value } // IP returns the remote IP address (without port) for logging. // // ip := m.IP() // "10.0.0.1" func (m *Miner) IP() string { return m.ip } // RemoteAddr returns the full remote address including port. // // addr := m.RemoteAddr() // "10.0.0.1:49152" func (m *Miner) RemoteAddr() string { if m == nil { return "" } return m.remoteAddr } // User returns the wallet address from login params, with any custom diff suffix stripped. // // user := m.User() // "WALLET" (even if login was "WALLET+50000") func (m *Miner) User() string { return m.user } // Password returns the login params.pass value. // // pass := m.Password() // "x" func (m *Miner) Password() string { return m.password } // Agent returns the mining software identifier from login params. // // agent := m.Agent() // "XMRig/6.21.0" func (m *Miner) Agent() string { return m.agent } // RigID returns the optional rigid extension field from login params. // // rigid := m.RigID() // "rig-alpha" func (m *Miner) RigID() string { return m.rigID } // RX returns the total bytes received from this miner. // // rx := m.RX() // 4096 func (m *Miner) RX() uint64 { return m.rx } // TX returns the total bytes sent to this miner. // // tx := m.TX() // 8192 func (m *Miner) TX() uint64 { return m.tx } // Diff returns the last difficulty sent to this miner from the pool. // // diff := m.Diff() // 100000 func (m *Miner) Diff() uint64 { return m.diff } // State returns the current lifecycle state of this miner connection. // // if m.State() == proxy.MinerStateReady { /* miner is active */ } func (m *Miner) State() MinerState { return m.state } func (m *Miner) supportsAlgoExtension() bool { return m != nil && m.algoEnabled && m.extAlgo } // Start launches the read loop. func (m *Miner) Start() { if m == nil { return } go m.readLoop() } func (m *Miner) readLoop() { defer func() { if m.onClose != nil { m.onClose(m) } }() reader := bufio.NewReaderSize(m.conn, maxStratumLineLength+1) for { if m.state == MinerStateClosing { return } if timeout := m.readTimeout(); timeout > 0 { _ = m.conn.SetReadDeadline(time.Now().Add(timeout)) } line, isPrefix, err := reader.ReadLine() if err != nil { m.Close() return } if isPrefix || len(line) > maxStratumLineLength { m.Close() return } if len(line) == 0 { continue } m.rx += uint64(len(line) + 1) m.lastActivityAt = time.Now().UTC() if !m.handleLine(line) { m.Close() return } } } func (m *Miner) readTimeout() time.Duration { switch m.state { case MinerStateWaitLogin: return minerLoginTimeout case MinerStateWaitReady, MinerStateReady: return minerReadyTimeout default: return 0 } } type stratumRequest struct { ID any `json:"id"` JSONRPC string `json:"jsonrpc"` Method string `json:"method"` Params json.RawMessage `json:"params"` } func (m *Miner) handleLine(line []byte) bool { var request stratumRequest if err := json.Unmarshal(line, &request); err != nil { return false } switch request.Method { case "login": m.handleLogin(request) case "submit": m.handleSubmit(request) case "keepalived": m.handleKeepalived(request) } return true } type loginParams struct { Login string `json:"login"` Pass string `json:"pass"` Agent string `json:"agent"` Algo []string `json:"algo"` RigID string `json:"rigid"` } func (m *Miner) handleLogin(request stratumRequest) { if m.state != MinerStateWaitLogin { return } var params loginParams if err := json.Unmarshal(request.Params, ¶ms); err != nil || strings.TrimSpace(params.Login) == "" { m.ReplyWithError(requestID(request.ID), "Invalid payment address provided") return } if m.accessPassword != "" && params.Pass != m.accessPassword { m.ReplyWithError(requestID(request.ID), "Invalid password") return } resolved := resolveLoginCustomDiff(params.Login, m.globalDiff) m.user = resolved.user m.customDiff = resolved.diff m.customDiffFromLogin = resolved.fromLogin m.customDiffResolved = true m.password = params.Pass m.agent = params.Agent m.rigID = params.RigID m.loginAlgos = append([]string(nil), params.Algo...) m.extAlgo = len(m.loginAlgos) > 0 m.rpcID = generateUUID() if m.onLogin != nil { m.onLogin(m) } if m.state == MinerStateClosing { return } m.state = MinerStateWaitReady if m.extNH { if m.MapperID() < 0 { m.state = MinerStateWaitLogin m.rpcID = "" m.ReplyWithError(requestID(request.ID), "Proxy is full, try again later") return } } else if m.RouteID() < 0 { m.state = MinerStateWaitLogin m.rpcID = "" m.ReplyWithError(requestID(request.ID), "Proxy is unavailable, try again later") return } if m.onLoginReady != nil { m.onLoginReady(m) } m.replyLoginSuccess(requestID(request.ID)) } type resolvedCustomDiff struct { user string diff uint64 fromLogin bool } func resolveLoginCustomDiff(login string, globalDiff uint64) resolvedCustomDiff { plus := strings.LastIndex(login, "+") if plus >= 0 && plus < len(login)-1 { suffix := login[plus+1:] if isDecimalDigits(suffix) { if parsed, err := strconv.ParseUint(suffix, 10, 64); err == nil { return resolvedCustomDiff{user: login[:plus], diff: parsed, fromLogin: true} } } return resolvedCustomDiff{user: login} } if globalDiff > 0 { return resolvedCustomDiff{user: login, diff: globalDiff} } return resolvedCustomDiff{user: login} } func isDecimalDigits(value string) bool { if value == "" { return false } for _, r := range value { if r < '0' || r > '9' { return false } } return true } func (m *Miner) handleSubmit(request stratumRequest) { if m.state != MinerStateReady { m.ReplyWithError(requestID(request.ID), "Unauthenticated") return } var params struct { ID string `json:"id"` JobID string `json:"job_id"` Nonce string `json:"nonce"` Result string `json:"result"` Algo string `json:"algo"` } if err := json.Unmarshal(request.Params, ¶ms); err != nil { m.ReplyWithError(requestID(request.ID), "Invalid nonce") return } if params.ID != m.rpcID { m.ReplyWithError(requestID(request.ID), "Unauthenticated") return } if params.JobID == "" { m.ReplyWithError(requestID(request.ID), "Missing job id") return } if !isLowerHex8(params.Nonce) { m.ReplyWithError(requestID(request.ID), "Invalid nonce") return } if m.onSubmit != nil { m.onSubmit(m, &SubmitEvent{ Miner: m, JobID: params.JobID, Nonce: params.Nonce, Result: params.Result, Algo: params.Algo, RequestID: requestID(request.ID), }) } m.touchActivity() } func (m *Miner) handleKeepalived(request stratumRequest) { m.touchActivity() m.Success(requestID(request.ID), "KEEPALIVED") } func requestID(id any) int64 { switch v := id.(type) { case float64: return int64(v) case int64: return v case int: return int64(v) case string: n, _ := strconv.ParseInt(v, 10, 64) return n default: return 0 } } func isLowerHex8(value string) bool { if len(value) != 8 { return false } for _, r := range value { if (r < '0' || r > '9') && (r < 'a' || r > 'f') { return false } } return true } func (m *Miner) ForwardJob(job Job, algo string) { if m == nil || !job.IsValid() { return } m.currentJob = job renderedJob, effectiveAlgo := m.renderJob(job, algo) payload := map[string]any{ "jsonrpc": "2.0", "method": "job", "params": map[string]any{ "blob": renderedJob.Blob, "job_id": renderedJob.JobID, "target": renderedJob.Target, "id": m.rpcID, "height": renderedJob.Height, "seed_hash": renderedJob.SeedHash, }, } if m.supportsAlgoExtension() && effectiveAlgo != "" { payload["params"].(map[string]any)["algo"] = effectiveAlgo } _ = m.writeJSON(payload) m.touchActivity() if m.state == MinerStateWaitReady { m.state = MinerStateReady } } func (m *Miner) replyLoginSuccess(id int64) { if m == nil { return } result := map[string]any{ "id": m.rpcID, "status": "OK", } if m.supportsAlgoExtension() { result["extensions"] = []string{"algo"} } if job := m.CurrentJob(); job.IsValid() { renderedJob, effectiveAlgo := m.renderJob(job, job.Algo) jobPayload := map[string]any{ "blob": renderedJob.Blob, "job_id": renderedJob.JobID, "target": renderedJob.Target, "id": m.rpcID, "height": renderedJob.Height, "seed_hash": renderedJob.SeedHash, } if m.supportsAlgoExtension() && effectiveAlgo != "" { jobPayload["algo"] = effectiveAlgo } result["job"] = jobPayload m.touchActivity() m.state = MinerStateReady } payload := map[string]any{ "id": id, "jsonrpc": "2.0", "error": nil, "result": result, } _ = m.writeJSON(payload) } func (m *Miner) renderJob(job Job, algo string) (Job, string) { if m == nil { return job, algo } rendered := job if algo == "" { algo = job.Algo } if m.extNH { rendered.Blob = job.BlobWithFixedByte(m.fixedByte) } effectiveDiff := job.DifficultyFromTarget() if m.customDiff > 0 && effectiveDiff > 0 && effectiveDiff > m.customDiff { rendered.Target = targetFromDifficulty(m.customDiff) effectiveDiff = rendered.DifficultyFromTarget() } m.diff = effectiveDiff return rendered, algo } func (m *Miner) ReplyWithError(id int64, message string) { if m == nil { return } payload := map[string]any{ "id": id, "jsonrpc": "2.0", "error": map[string]any{ "code": -1, "message": message, }, } _ = m.writeJSON(payload) } func (m *Miner) Success(id int64, status string) { if m == nil { return } payload := map[string]any{ "id": id, "jsonrpc": "2.0", "error": nil, "result": map[string]any{ "status": status, }, } _ = m.writeJSON(payload) } func (m *Miner) touchActivity() { if m == nil { return } m.lastActivityAt = time.Now().UTC() if m.conn != nil { _ = m.conn.SetReadDeadline(time.Now().Add(minerReadyTimeout)) } } func (m *Miner) writeJSON(payload any) error { m.sendMu.Lock() defer m.sendMu.Unlock() if m.conn == nil { return nil } data, err := json.Marshal(payload) if err != nil { return err } data = append(data, '\n') var written int if len(data) <= len(m.buf) { copy(m.buf[:], data) written, err = m.conn.Write(m.buf[:len(data)]) } else { written, err = m.conn.Write(data) } m.tx += uint64(written) if err != nil { m.Close() } return err } func (m *Miner) Close() { if m == nil { return } m.closeOnce.Do(func() { m.state = MinerStateClosing if m.conn != nil { _ = m.conn.Close() } }) } // NewStats creates zeroed global metrics. // // stats := proxy.NewStats() // _ = stats.Summary() func NewStats() *Stats { stats := &Stats{startTime: time.Now().UTC(), latency: make([]uint16, 0, 1024)} stats.windows[HashrateWindow60s] = newTickWindow(60) stats.windows[HashrateWindow600s] = newTickWindow(600) stats.windows[HashrateWindow3600s] = newTickWindow(3600) stats.windows[HashrateWindow12h] = newTickWindow(43200) stats.windows[HashrateWindow24h] = newTickWindow(86400) return stats } // OnLogin increments the current miner count. func (s *Stats) OnLogin(e Event) { if s == nil || e.Miner == nil { return } now := s.miners.Add(1) for { max := s.maxMiners.Load() if now <= max || s.maxMiners.CompareAndSwap(max, now) { break } } } // OnClose decrements the current miner count. func (s *Stats) OnClose(e Event) { if s == nil || e.Miner == nil { return } for { current := s.miners.Load() if current == 0 || s.miners.CompareAndSwap(current, current-1) { return } } } // OnAccept records an accepted share. // // stats.OnAccept(proxy.Event{Diff: 100000, Latency: 82}) func (s *Stats) OnAccept(e Event) { if s == nil { return } s.accepted.Add(1) if e.Expired { s.expired.Add(1) } s.mu.Lock() if len(s.latency) < 10000 { s.latency = append(s.latency, e.Latency) } insertTopDiff(&s.topDiff, e.Diff) for i := range s.windows { if s.windows[i].size > 0 { s.windows[i].buckets[s.windows[i].pos] += e.Diff } } s.mu.Unlock() if e.Diff > 0 { s.hashes.Add(e.Diff) } } // OnReject records a rejected share. // // stats.OnReject(proxy.Event{Error: "Low difficulty share"}) func (s *Stats) OnReject(e Event) { if s == nil { return } s.rejected.Add(1) if isInvalidShareReason(e.Error) { s.invalid.Add(1) } } // Tick advances the rolling windows. // // stats.Tick() func (s *Stats) Tick() { if s == nil { return } s.mu.Lock() defer s.mu.Unlock() for i := range s.windows { if s.windows[i].size == 0 { continue } s.windows[i].pos = (s.windows[i].pos + 1) % s.windows[i].size s.windows[i].buckets[s.windows[i].pos] = 0 } } // Summary returns a snapshot of the current metrics. // // summary := stats.Summary() func (s *Stats) Summary() StatsSummary { if s == nil { return StatsSummary{} } s.mu.Lock() defer s.mu.Unlock() summary := StatsSummary{ Accepted: s.accepted.Load(), Rejected: s.rejected.Load(), Invalid: s.invalid.Load(), Expired: s.expired.Load(), Hashes: s.hashes.Load(), } if summary.Accepted > 0 { uptime := uint64(time.Since(s.startTime).Seconds()) if uptime == 0 { uptime = 1 } summary.AvgTime = uint32(uptime / summary.Accepted) } if len(s.latency) > 0 { samples := append([]uint16(nil), s.latency...) sort.Slice(samples, func(i, j int) bool { return samples[i] < samples[j] }) summary.AvgLatency = uint32(samples[len(samples)/2]) } summary.TopDiff = s.topDiff for i := range s.windows { if i == HashrateWindowAll { uptime := time.Since(s.startTime).Seconds() if uptime > 0 { summary.Hashrate[i] = float64(summary.Hashes) / uptime } continue } total := uint64(0) for _, bucket := range s.windows[i].buckets { total += bucket } if s.windows[i].size > 0 { summary.Hashrate[i] = float64(total) / float64(s.windows[i].size) } } return summary } func insertTopDiff(top *[10]uint64, diff uint64) { if diff == 0 { return } for i := range top { if diff > top[i] { for j := len(top) - 1; j > i; j-- { top[j] = top[j-1] } top[i] = diff return } } } // NewWorkers creates a worker aggregate tracker. // // workers := proxy.NewWorkers(proxy.WorkersByRigID, bus) // workers.OnLogin(proxy.Event{Miner: miner}) func NewWorkers(mode WorkersMode, eventBus *EventBus) *Workers { workers := &Workers{ mode: mode, nameIndex: make(map[string]int), idIndex: make(map[int64]int), } workers.bindEvents(eventBus) return workers } func (w *Workers) bindEvents(eventBus *EventBus) { if w == nil || eventBus == nil { return } w.mu.Lock() defer w.mu.Unlock() if w.subscribed { return } eventBus.Subscribe(EventLogin, w.OnLogin) eventBus.Subscribe(EventAccept, w.OnAccept) eventBus.Subscribe(EventReject, w.OnReject) eventBus.Subscribe(EventClose, w.OnClose) w.subscribed = true } func workerNameFor(mode WorkersMode, miner *Miner) string { if miner == nil { return "" } switch mode { case WorkersByRigID: if miner.rigID != "" { return miner.rigID } return miner.user case WorkersByUser: return miner.user case WorkersByPass: return miner.password case WorkersByAgent: return miner.agent case WorkersByIP: return miner.ip case WorkersDisabled: return "" default: return miner.user } } // OnLogin creates or updates a worker record. // // workers.OnLogin(proxy.Event{Miner: miner}) func (w *Workers) OnLogin(e Event) { if w == nil || e.Miner == nil { return } w.mu.Lock() defer w.mu.Unlock() w.recordLoginLocked(e.Miner) } func newTickWindow(size int) tickWindow { return tickWindow{ buckets: make([]uint64, size), size: size, } } // ResetMode switches the worker identity strategy and rebuilds the live worker index. // // workers.ResetMode(proxy.WorkersByUser, activeMiners) func (w *Workers) ResetMode(mode WorkersMode, miners []*Miner) { if w == nil { return } w.mu.Lock() defer w.mu.Unlock() w.mode = mode w.entries = nil w.nameIndex = make(map[string]int) w.idIndex = make(map[int64]int) for _, miner := range miners { w.recordLoginLocked(miner) } } // OnAccept updates the owning worker with the accepted share. // // workers.OnAccept(proxy.Event{Miner: miner, Diff: 100000}) func (w *Workers) OnAccept(e Event) { if w == nil || e.Miner == nil { return } w.mu.Lock() defer w.mu.Unlock() index, ok := w.idIndex[e.Miner.id] if !ok || index < 0 || index >= len(w.entries) { return } record := &w.entries[index] record.Accepted++ record.Hashes += e.Diff record.LastHashAt = time.Now().UTC() record.LastIP = e.Miner.ip for i := range record.windows { if record.windows[i].size > 0 { record.windows[i].buckets[record.windows[i].pos] += e.Diff } } } // OnReject updates the owning worker with the rejected share. // // workers.OnReject(proxy.Event{Miner: miner, Error: "Low difficulty share"}) func (w *Workers) OnReject(e Event) { if w == nil || e.Miner == nil { return } w.mu.Lock() defer w.mu.Unlock() index, ok := w.idIndex[e.Miner.id] if !ok || index < 0 || index >= len(w.entries) { return } record := &w.entries[index] record.Rejected++ if isInvalidShareReason(e.Error) { record.Invalid++ } record.LastIP = e.Miner.ip } // OnClose removes the miner mapping from the worker table. func (w *Workers) OnClose(e Event) { if w == nil || e.Miner == nil { return } w.mu.Lock() defer w.mu.Unlock() delete(w.idIndex, e.Miner.id) } // List returns a snapshot of all workers. // // records := workers.List() func (w *Workers) List() []WorkerRecord { if w == nil { return nil } w.mu.RLock() defer w.mu.RUnlock() out := make([]WorkerRecord, len(w.entries)) for i := range w.entries { out[i] = cloneWorkerRecord(w.entries[i]) } return out } // Tick advances worker hash windows. // // workers.Tick() func (w *Workers) Tick() { if w == nil { return } w.mu.Lock() defer w.mu.Unlock() for i := range w.entries { for j := range w.entries[i].windows { if w.entries[i].windows[j].size == 0 { continue } w.entries[i].windows[j].pos = (w.entries[i].windows[j].pos + 1) % w.entries[i].windows[j].size w.entries[i].windows[j].buckets[w.entries[i].windows[j].pos] = 0 } } } // Hashrate returns the configured worker hashrate window. // // hr60 := record.Hashrate(60) func (r *WorkerRecord) Hashrate(seconds int) float64 { if r == nil || seconds <= 0 { return 0 } index := -1 switch seconds { case 60: index = 0 case 600: index = 1 case 3600: index = 2 case 43200: index = 3 case 86400: index = 4 } if index < 0 || index >= len(r.windows) { return 0 } total := uint64(0) for _, bucket := range r.windows[index].buckets { total += bucket } if seconds == 0 { return 0 } return float64(total) / float64(seconds) } func cloneWorkerRecord(record WorkerRecord) WorkerRecord { cloned := record for i := range record.windows { if len(record.windows[i].buckets) == 0 { continue } cloned.windows[i].buckets = append([]uint64(nil), record.windows[i].buckets...) } return cloned } func (w *Workers) recordLoginLocked(miner *Miner) { if w == nil || miner == nil || w.mode == WorkersDisabled { return } name := workerNameFor(w.mode, miner) if name == "" { return } index, ok := w.nameIndex[name] if !ok { index = len(w.entries) record := WorkerRecord{Name: name} record.windows[0] = newTickWindow(60) record.windows[1] = newTickWindow(600) record.windows[2] = newTickWindow(3600) record.windows[3] = newTickWindow(43200) record.windows[4] = newTickWindow(86400) w.entries = append(w.entries, record) w.nameIndex[name] = index } record := &w.entries[index] record.Name = name record.LastIP = miner.ip record.Connections++ w.idIndex[miner.id] = index } // Apply normalises one miner login at the same point the handshake does. // // cd.Apply(&proxy.Miner{user: "WALLET+50000"}) func (cd *CustomDiff) Apply(miner *Miner) { if cd == nil || miner == nil { return } cd.OnLogin(Event{Miner: miner}) } // NewServer constructs a server instance. // // server, result := proxy.NewServer(bind, tlsConfig, limiter, func(conn net.Conn, port uint16) { // _ = conn // _ = port // }) func NewServer(bind BindAddr, tlsConfig *tls.Config, limiter *RateLimiter, onAccept func(net.Conn, uint16)) (*Server, Result) { if onAccept == nil { onAccept = func(net.Conn, uint16) {} } server := &Server{ addr: bind, tlsConfig: tlsConfig, limiter: limiter, onAccept: onAccept, done: make(chan struct{}), } if result := server.listen(); !result.OK { return nil, result } return server, newSuccessResult() } // Start begins accepting connections in a goroutine. // // server.Start() func (s *Server) Start() { if s == nil { return } if result := s.listen(); !result.OK { return } go func() { for { conn, err := s.listener.Accept() if err != nil { select { case <-s.done: return default: continue } } if s.limiter != nil && !s.limiter.Allow(conn.RemoteAddr().String()) { _ = conn.Close() continue } if s.onAccept != nil { s.onAccept(conn, s.addr.Port) } } }() } // Stop closes the listener. // // server.Stop() func (s *Server) Stop() { if s == nil { return } select { case <-s.done: default: close(s.done) } if s.listener != nil { _ = s.listener.Close() } } func (s *Server) listen() Result { if s == nil { return newErrorResult(NewScopedError("proxy.server", "server is nil", nil)) } if s.listener != nil { return newSuccessResult() } if s.addr.TLS && s.tlsConfig == nil { return newErrorResult(NewScopedError("proxy.server", "tls listener requires a tls config", nil)) } ln, err := net.Listen("tcp", net.JoinHostPort(s.addr.Host, strconv.Itoa(int(s.addr.Port)))) if err != nil { return newErrorResult(NewScopedError("proxy.server", "listen failed", err)) } if s.tlsConfig != nil { ln = tls.NewListener(ln, s.tlsConfig) } s.listener = ln return newSuccessResult() } // IsActive reports whether the limiter has enabled rate limiting. // // if rl.IsActive() { /* rate limiting is enabled */ } func (rl *RateLimiter) IsActive() bool { return rl != nil && rl.limit.MaxConnectionsPerMinute > 0 } func nextMinerID() int64 { return atomic.AddInt64(&minerSeq, 1) } var minerSeq int64 type noopSplitter struct{} func (n *noopSplitter) Connect() {} func (n *noopSplitter) OnLogin(event *LoginEvent) {} func (n *noopSplitter) OnSubmit(event *SubmitEvent) {} func (n *noopSplitter) OnClose(event *CloseEvent) {} func (n *noopSplitter) Tick(ticks uint64) {} func (n *noopSplitter) GC() {} func (n *noopSplitter) Upstreams() UpstreamStats { return UpstreamStats{} }