diff --git a/config_runtime.go b/config_runtime.go index 0465ea4..fbf81a1 100644 --- a/config_runtime.go +++ b/config_runtime.go @@ -82,7 +82,7 @@ func (w *ConfigWatcher) Start() { ticker := time.NewTicker(time.Second) defer ticker.Stop() if info, errorValue := os.Stat(w.path); errorValue == nil { - w.lastModified = info.ModTime() + w.lastModifiedAt = info.ModTime() } for { @@ -93,10 +93,10 @@ func (w *ConfigWatcher) Start() { continue } - if !info.ModTime().After(w.lastModified) { + if !info.ModTime().After(w.lastModifiedAt) { continue } - w.lastModified = info.ModTime() + w.lastModifiedAt = info.ModTime() config, errorValue := LoadConfig(w.path) if errorValue == nil && w.onChange != nil { diff --git a/miner_methods.go b/miner_methods.go index aa3e138..4024ab8 100644 --- a/miner_methods.go +++ b/miner_methods.go @@ -7,14 +7,14 @@ import ( "time" ) -var minerSequence atomic.Int64 +var minerIDSequence atomic.Int64 // NewMiner creates a Miner for an accepted net.Conn. Does not start reading yet. // // m := proxy.NewMiner(conn, 3333, nil) func NewMiner(conn net.Conn, localPort uint16, tlsCfg *tls.Config) *Miner { miner := &Miner{ - id: minerSequence.Add(1), + id: minerIDSequence.Add(1), state: MinerStateWaitLogin, localPort: localPort, mapperID: -1, diff --git a/pool/client.go b/pool/client.go index f0110aa..7d094d8 100644 --- a/pool/client.go +++ b/pool/client.go @@ -27,15 +27,15 @@ import ( // client := pool.NewStratumClient(poolCfg, listener) // client.Connect() type StratumClient struct { - config proxy.PoolConfig - listener StratumListener - conn net.Conn - tlsConn *tls.Conn // nil if plain TCP - sessionID string // pool-assigned session id from login reply - sequence int64 // atomic JSON-RPC request id counter - active bool // true once first job received - disconnectOnce sync.Once - sendMu sync.Mutex + config proxy.PoolConfig + listener StratumListener + conn net.Conn + tlsConn *tls.Conn // nil if plain TCP + sessionID string // pool-assigned session id from login reply + requestSequence int64 // atomic JSON-RPC request id counter + active bool // true once first job received + disconnectOnce sync.Once + sendMu sync.Mutex } // StratumListener receives events from the pool connection. @@ -158,7 +158,7 @@ func (c *StratumClient) Login() { // // seq := client.Submit("job-1", "deadbeef", "HASH64HEX", "cn/r") func (c *StratumClient) Submit(jobID string, nonce string, result string, algo string) int64 { - sequence := atomic.AddInt64(&c.sequence, 1) + requestID := atomic.AddInt64(&c.requestSequence, 1) params := map[string]string{ "id": c.sessionID, "job_id": jobID, @@ -170,12 +170,12 @@ func (c *StratumClient) Submit(jobID string, nonce string, result string, algo s } _ = c.writeJSON(jsonRPCRequest{ - ID: sequence, + ID: requestID, Method: "submit", Params: params, }) - return sequence + return requestID } // Disconnect closes the connection cleanly. Triggers OnDisconnect on the listener. diff --git a/proxy.go b/proxy.go index 9d6310d..b4c7d47 100644 --- a/proxy.go +++ b/proxy.go @@ -23,23 +23,23 @@ import ( // p, result := proxy.New(cfg) // if result.OK { p.Start() } type Proxy struct { - config *Config - customDiff *CustomDiff - rateLimiter *RateLimiter - splitter Splitter - stats *Stats - workers *Workers - events *EventBus - currentMiners atomic.Uint64 - miners map[int64]*Miner - minerMu sync.RWMutex - servers []*Server - httpServer *http.Server - accessLogger *appendLineLogger - shareLogger *appendLineLogger - ticker *time.Ticker - watcher *ConfigWatcher - done chan struct{} + config *Config + customDifficulty *CustomDiff + rateLimiter *RateLimiter + splitter Splitter + stats *Stats + workers *Workers + events *EventBus + currentMiners atomic.Uint64 + miners map[int64]*Miner + minerMu sync.RWMutex + servers []*Server + httpServer *http.Server + accessLogger *appendLineLogger + shareLogger *appendLineLogger + ticker *time.Ticker + watcher *ConfigWatcher + done chan struct{} } // Splitter is the interface both NonceSplitter and SimpleSplitter satisfy. @@ -96,10 +96,10 @@ type CloseEvent struct { // }) // w.Start() type ConfigWatcher struct { - path string - onChange func(*Config) - lastModified time.Time - done chan struct{} + path string + onChange func(*Config) + lastModifiedAt time.Time + done chan struct{} } // RateLimiter implements per-IP token bucket connection rate limiting. @@ -110,10 +110,10 @@ type ConfigWatcher struct { // rl := proxy.NewRateLimiter(cfg.RateLimit) // if !rl.Allow("1.2.3.4") { conn.Close(); return } type RateLimiter struct { - config RateLimit - buckets map[string]*tokenBucket - banned map[string]time.Time - mu sync.Mutex + limitConfig RateLimit + buckets map[string]*tokenBucket + banned map[string]time.Time + mu sync.Mutex } // tokenBucket is a simple token bucket for one IP. @@ -128,8 +128,8 @@ type tokenBucket struct { // cd := proxy.NewCustomDiff(cfg.CustomDiff) // bus.Subscribe(proxy.EventLogin, cd.OnLogin) type CustomDiff struct { - defaultDifficulty uint64 - mu sync.RWMutex + globalDifficulty uint64 + mu sync.RWMutex } var splitterFactories = map[string]func(*Config, *EventBus) Splitter{ diff --git a/proxy_logging_runtime.go b/proxy_logging_runtime.go index 4d137d4..4e93d4d 100644 --- a/proxy_logging_runtime.go +++ b/proxy_logging_runtime.go @@ -8,10 +8,10 @@ import ( ) type appendLineLogger struct { - path string - mu sync.Mutex - file *os.File - closed bool + path string + mu sync.Mutex + openFile *os.File + closed bool } func newAppendLineLogger(path string) *appendLineLogger { @@ -30,15 +30,15 @@ func (l *appendLineLogger) writeLine(line string) { return } - if l.file == nil { + if l.openFile == nil { file, errorValue := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) if errorValue != nil { return } - l.file = file + l.openFile = file } - _, _ = l.file.WriteString(line) + _, _ = l.openFile.WriteString(line) } func (l *appendLineLogger) setPath(path string) { @@ -53,9 +53,9 @@ func (l *appendLineLogger) setPath(path string) { return } - if l.file != nil { - _ = l.file.Close() - l.file = nil + if l.openFile != nil { + _ = l.openFile.Close() + l.openFile = nil } l.closed = false l.path = path @@ -73,9 +73,9 @@ func (l *appendLineLogger) close() { return } l.closed = true - if l.file != nil { - _ = l.file.Close() - l.file = nil + if l.openFile != nil { + _ = l.openFile.Close() + l.openFile = nil } } diff --git a/proxy_runtime.go b/proxy_runtime.go index ea84d18..578d13f 100644 --- a/proxy_runtime.go +++ b/proxy_runtime.go @@ -25,22 +25,22 @@ func New(config *Config) (*Proxy, error) { eventBus := NewEventBus() statsValue := NewStats() - customDiffFilter := NewCustomDiff(config.CustomDiff) - eventBus.Subscribe(EventLogin, customDiffFilter.OnLogin) + customDifficultyFilter := NewCustomDiff(config.CustomDiff) + eventBus.Subscribe(EventLogin, customDifficultyFilter.OnLogin) workersValue := NewWorkers(config.Workers, eventBus) workersValue.SetCustomDiffStats(config.CustomDiffStats) splitterValue := newSplitter(config, eventBus) proxyInstance := &Proxy{ - config: config, - customDiff: customDiffFilter, - splitter: splitterValue, - stats: statsValue, - workers: workersValue, - events: eventBus, - miners: make(map[int64]*Miner), - rateLimiter: NewRateLimiter(config.RateLimit), - done: make(chan struct{}), + config: config, + customDifficulty: customDifficultyFilter, + splitter: splitterValue, + stats: statsValue, + workers: workersValue, + events: eventBus, + miners: make(map[int64]*Miner), + rateLimiter: NewRateLimiter(config.RateLimit), + done: make(chan struct{}), } proxyInstance.accessLogger = subscribeAccessLog(eventBus, config.AccessLogFile) @@ -236,8 +236,8 @@ func (p *Proxy) Reload(config *Config) { p.config.sourcePath = sourcePath p.config.Bind = bind } - if p.customDiff != nil { - p.customDiff.SetGlobalDiff(p.config.CustomDiff) + if p.customDifficulty != nil { + p.customDifficulty.SetGlobalDiff(p.config.CustomDiff) } if p.workers != nil { p.workers.SetCustomDiffStats(p.config.CustomDiffStats) diff --git a/proxy_runtime_test.go b/proxy_runtime_test.go index 92253b5..11dbb46 100644 --- a/proxy_runtime_test.go +++ b/proxy_runtime_test.go @@ -48,8 +48,8 @@ func TestProxy_Reload_Good(t *testing.T) { if proxyValue.config.CustomDiff != 250 { t.Fatalf("expected custom diff to reload, got %d", proxyValue.config.CustomDiff) } - if proxyValue.customDiff == nil || proxyValue.customDiff.defaultDifficulty != 250 { - t.Fatalf("expected live custom diff to update, got %+v", proxyValue.customDiff) + if proxyValue.customDifficulty == nil || proxyValue.customDifficulty.globalDifficulty != 250 { + t.Fatalf("expected live custom diff to update, got %+v", proxyValue.customDifficulty) } proxyValue.events.Dispatch(Event{Type: EventLogin, Miner: miner}) diff --git a/runtime_support.go b/runtime_support.go index 369cc50..20a5066 100644 --- a/runtime_support.go +++ b/runtime_support.go @@ -11,9 +11,9 @@ import ( // rl := proxy.NewRateLimiter(cfg.RateLimit) func NewRateLimiter(config RateLimit) *RateLimiter { return &RateLimiter{ - config: config, - buckets: make(map[string]*tokenBucket), - banned: make(map[string]time.Time), + limitConfig: config, + buckets: make(map[string]*tokenBucket), + banned: make(map[string]time.Time), } } @@ -26,7 +26,7 @@ func (rateLimiter *RateLimiter) SetConfig(config RateLimit) { } rateLimiter.mu.Lock() - rateLimiter.config = config + rateLimiter.limitConfig = config rateLimiter.mu.Unlock() } @@ -44,7 +44,7 @@ func (rateLimiter *RateLimiter) Allow(ip string) bool { rateLimiter.mu.Lock() defer rateLimiter.mu.Unlock() - if rateLimiter.config.MaxConnectionsPerMinute <= 0 { + if rateLimiter.limitConfig.MaxConnectionsPerMinute <= 0 { return true } @@ -58,7 +58,7 @@ func (rateLimiter *RateLimiter) Allow(ip string) bool { bucket, exists := rateLimiter.buckets[host] if !exists { bucket = &tokenBucket{ - tokens: rateLimiter.config.MaxConnectionsPerMinute, + tokens: rateLimiter.limitConfig.MaxConnectionsPerMinute, lastRefill: now, } rateLimiter.buckets[host] = bucket @@ -66,8 +66,8 @@ func (rateLimiter *RateLimiter) Allow(ip string) bool { rateLimiter.refillBucket(bucket, now) if bucket.tokens <= 0 { - if rateLimiter.config.BanDurationSeconds > 0 { - rateLimiter.banned[host] = now.Add(time.Duration(rateLimiter.config.BanDurationSeconds) * time.Second) + if rateLimiter.limitConfig.BanDurationSeconds > 0 { + rateLimiter.banned[host] = now.Add(time.Duration(rateLimiter.limitConfig.BanDurationSeconds) * time.Second) } return false } @@ -88,7 +88,7 @@ func (rateLimiter *RateLimiter) Tick() { rateLimiter.mu.Lock() defer rateLimiter.mu.Unlock() - if rateLimiter.config.MaxConnectionsPerMinute <= 0 { + if rateLimiter.limitConfig.MaxConnectionsPerMinute <= 0 { return } @@ -104,11 +104,11 @@ func (rateLimiter *RateLimiter) Tick() { } func (rateLimiter *RateLimiter) refillBucket(bucket *tokenBucket, now time.Time) { - if bucket == nil || rateLimiter.config.MaxConnectionsPerMinute <= 0 { + if bucket == nil || rateLimiter.limitConfig.MaxConnectionsPerMinute <= 0 { return } - refillEvery := time.Minute / time.Duration(rateLimiter.config.MaxConnectionsPerMinute) + refillEvery := time.Minute / time.Duration(rateLimiter.limitConfig.MaxConnectionsPerMinute) if refillEvery <= 0 { refillEvery = time.Second } @@ -120,8 +120,8 @@ func (rateLimiter *RateLimiter) refillBucket(bucket *tokenBucket, now time.Time) tokensToAdd := int(elapsed / refillEvery) bucket.tokens += tokensToAdd - if bucket.tokens > rateLimiter.config.MaxConnectionsPerMinute { - bucket.tokens = rateLimiter.config.MaxConnectionsPerMinute + if bucket.tokens > rateLimiter.limitConfig.MaxConnectionsPerMinute { + bucket.tokens = rateLimiter.limitConfig.MaxConnectionsPerMinute } bucket.lastRefill = bucket.lastRefill.Add(time.Duration(tokensToAdd) * refillEvery) } @@ -130,7 +130,7 @@ func (rateLimiter *RateLimiter) refillBucket(bucket *tokenBucket, now time.Time) // // cd := proxy.NewCustomDiff(50000) func NewCustomDiff(globalDiff uint64) *CustomDiff { - return &CustomDiff{defaultDifficulty: globalDiff} + return &CustomDiff{globalDifficulty: globalDiff} } // SetGlobalDiff updates the default custom difficulty override. @@ -142,7 +142,7 @@ func (customDiff *CustomDiff) SetGlobalDiff(globalDiff uint64) { } customDiff.mu.Lock() - customDiff.defaultDifficulty = globalDiff + customDiff.globalDifficulty = globalDiff customDiff.mu.Unlock() } @@ -172,7 +172,7 @@ func (customDiff *CustomDiff) OnLogin(event Event) { } customDiff.mu.RLock() - globalDiff := customDiff.defaultDifficulty + globalDiff := customDiff.globalDifficulty customDiff.mu.RUnlock() if globalDiff > 0 { event.Miner.SetCustomDiff(globalDiff) diff --git a/splitter/simple/splitter.go b/splitter/simple/splitter.go index baaff38..d96f706 100644 --- a/splitter/simple/splitter.go +++ b/splitter/simple/splitter.go @@ -25,7 +25,7 @@ type SimpleSplitter struct { events *proxy.EventBus strategyFactory pool.StrategyFactory mu sync.Mutex - sequence int64 // monotonic mapper sequence counter + mapperSequence int64 // monotonic mapper sequence counter } // NewSimpleSplitter creates the passthrough splitter. @@ -72,9 +72,9 @@ func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) { } if mapper == nil { - s.sequence++ + s.mapperSequence++ var strategy pool.Strategy - mapper = NewSimpleMapper(s.sequence, nil) + mapper = NewSimpleMapper(s.mapperSequence, nil) mapper.events = s.events if s.strategyFactory != nil { strategy = s.strategyFactory(mapper) diff --git a/stats.go b/stats.go index b4befb9..65d073b 100644 --- a/stats.go +++ b/stats.go @@ -15,18 +15,18 @@ import ( // bus.Subscribe(proxy.EventAccept, s.OnAccept) // bus.Subscribe(proxy.EventReject, s.OnReject) type Stats struct { - accepted atomic.Uint64 - rejected atomic.Uint64 - invalid atomic.Uint64 - expired atomic.Uint64 - hashes atomic.Uint64 // cumulative sum of accepted share difficulties - connections atomic.Uint64 // total TCP connections accepted (ever) - maxMiners atomic.Uint64 // peak concurrent miner count - topDiff [10]uint64 // top-10 accepted difficulties, sorted descending; guarded by mu - latency []uint16 // pool response latencies in ms; capped at 10000 samples; guarded by mu - windows [6]tickWindow // one per hashrate reporting period - startTime time.Time - mu sync.Mutex + accepted atomic.Uint64 + rejected atomic.Uint64 + invalid atomic.Uint64 + expired atomic.Uint64 + hashes atomic.Uint64 // cumulative sum of accepted share difficulties + connections atomic.Uint64 // total TCP connections accepted (ever) + maxMiners atomic.Uint64 // peak concurrent miner count + topDifficulties [10]uint64 // top-10 accepted difficulties, sorted descending; guarded by mu + latencySamples []uint16 // pool response latencies in ms; capped at 10000 samples; guarded by mu + windows [6]tickWindow // one per hashrate reporting period + startTime time.Time + mu sync.Mutex } // Hashrate window sizes in seconds. Index maps to Stats.windows and SummaryResponse.Hashrate. @@ -68,8 +68,8 @@ var hashrateWindowSizes = [5]int{60, 600, 3600, 43200, 86400} // s := proxy.NewStats() func NewStats() *Stats { stats := &Stats{ - startTime: time.Now().UTC(), - latency: make([]uint16, 0, 128), + startTime: time.Now().UTC(), + latencySamples: make([]uint16, 0, 128), } for index, size := range hashrateWindowSizes { @@ -96,9 +96,9 @@ func (s *Stats) OnAccept(event Event) { for index := 0; index < HashrateWindowAll; index++ { s.windows[index].buckets[s.windows[index].pos] += event.Diff } - insertTopDiff(&s.topDiff, event.Diff) + insertTopDiff(&s.topDifficulties, event.Diff) if event.Latency > 0 { - s.latency = appendCappedLatency(s.latency, event.Latency) + s.latencySamples = appendCappedLatency(s.latencySamples, event.Latency) } s.mu.Unlock() } @@ -116,7 +116,7 @@ func (s *Stats) OnReject(event Event) { } if event.Latency > 0 { s.mu.Lock() - s.latency = appendCappedLatency(s.latency, event.Latency) + s.latencySamples = appendCappedLatency(s.latencySamples, event.Latency) s.mu.Unlock() } } @@ -148,7 +148,7 @@ func (s *Stats) Summary() StatsSummary { summary.Invalid = s.invalid.Load() summary.Expired = s.expired.Load() summary.Hashes = s.hashes.Load() - summary.TopDiff = s.topDiff + summary.TopDiff = s.topDifficulties for index := 0; index < HashrateWindowAll; index++ { windowSize := hashrateWindowSizes[index] @@ -163,8 +163,8 @@ func (s *Stats) Summary() StatsSummary { summary.AvgTime = uint32(uptimeSeconds / summary.Accepted) } - if len(s.latency) > 0 { - values := slices.Clone(s.latency) + if len(s.latencySamples) > 0 { + values := slices.Clone(s.latencySamples) sort.Slice(values, func(left int, right int) bool { return values[left] < values[right] })