refactor(ax): expand internal naming

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 14:33:44 +00:00
parent 8798eea2a0
commit 5190caf9d6
10 changed files with 111 additions and 111 deletions

View file

@ -82,7 +82,7 @@ func (w *ConfigWatcher) Start() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
if info, errorValue := os.Stat(w.path); errorValue == nil {
w.lastModified = info.ModTime()
w.lastModifiedAt = info.ModTime()
}
for {
@ -93,10 +93,10 @@ func (w *ConfigWatcher) Start() {
continue
}
if !info.ModTime().After(w.lastModified) {
if !info.ModTime().After(w.lastModifiedAt) {
continue
}
w.lastModified = info.ModTime()
w.lastModifiedAt = info.ModTime()
config, errorValue := LoadConfig(w.path)
if errorValue == nil && w.onChange != nil {

View file

@ -7,14 +7,14 @@ import (
"time"
)
var minerSequence atomic.Int64
var minerIDSequence atomic.Int64
// NewMiner creates a Miner for an accepted net.Conn. Does not start reading yet.
//
// m := proxy.NewMiner(conn, 3333, nil)
func NewMiner(conn net.Conn, localPort uint16, tlsCfg *tls.Config) *Miner {
miner := &Miner{
id: minerSequence.Add(1),
id: minerIDSequence.Add(1),
state: MinerStateWaitLogin,
localPort: localPort,
mapperID: -1,

View file

@ -27,15 +27,15 @@ import (
// client := pool.NewStratumClient(poolCfg, listener)
// client.Connect()
type StratumClient struct {
config proxy.PoolConfig
listener StratumListener
conn net.Conn
tlsConn *tls.Conn // nil if plain TCP
sessionID string // pool-assigned session id from login reply
sequence int64 // atomic JSON-RPC request id counter
active bool // true once first job received
disconnectOnce sync.Once
sendMu sync.Mutex
config proxy.PoolConfig
listener StratumListener
conn net.Conn
tlsConn *tls.Conn // nil if plain TCP
sessionID string // pool-assigned session id from login reply
requestSequence int64 // atomic JSON-RPC request id counter
active bool // true once first job received
disconnectOnce sync.Once
sendMu sync.Mutex
}
// StratumListener receives events from the pool connection.
@ -158,7 +158,7 @@ func (c *StratumClient) Login() {
//
// seq := client.Submit("job-1", "deadbeef", "HASH64HEX", "cn/r")
func (c *StratumClient) Submit(jobID string, nonce string, result string, algo string) int64 {
sequence := atomic.AddInt64(&c.sequence, 1)
requestID := atomic.AddInt64(&c.requestSequence, 1)
params := map[string]string{
"id": c.sessionID,
"job_id": jobID,
@ -170,12 +170,12 @@ func (c *StratumClient) Submit(jobID string, nonce string, result string, algo s
}
_ = c.writeJSON(jsonRPCRequest{
ID: sequence,
ID: requestID,
Method: "submit",
Params: params,
})
return sequence
return requestID
}
// Disconnect closes the connection cleanly. Triggers OnDisconnect on the listener.

View file

@ -23,23 +23,23 @@ import (
// p, result := proxy.New(cfg)
// if result.OK { p.Start() }
type Proxy struct {
config *Config
customDiff *CustomDiff
rateLimiter *RateLimiter
splitter Splitter
stats *Stats
workers *Workers
events *EventBus
currentMiners atomic.Uint64
miners map[int64]*Miner
minerMu sync.RWMutex
servers []*Server
httpServer *http.Server
accessLogger *appendLineLogger
shareLogger *appendLineLogger
ticker *time.Ticker
watcher *ConfigWatcher
done chan struct{}
config *Config
customDifficulty *CustomDiff
rateLimiter *RateLimiter
splitter Splitter
stats *Stats
workers *Workers
events *EventBus
currentMiners atomic.Uint64
miners map[int64]*Miner
minerMu sync.RWMutex
servers []*Server
httpServer *http.Server
accessLogger *appendLineLogger
shareLogger *appendLineLogger
ticker *time.Ticker
watcher *ConfigWatcher
done chan struct{}
}
// Splitter is the interface both NonceSplitter and SimpleSplitter satisfy.
@ -96,10 +96,10 @@ type CloseEvent struct {
// })
// w.Start()
type ConfigWatcher struct {
path string
onChange func(*Config)
lastModified time.Time
done chan struct{}
path string
onChange func(*Config)
lastModifiedAt time.Time
done chan struct{}
}
// RateLimiter implements per-IP token bucket connection rate limiting.
@ -110,10 +110,10 @@ type ConfigWatcher struct {
// rl := proxy.NewRateLimiter(cfg.RateLimit)
// if !rl.Allow("1.2.3.4") { conn.Close(); return }
type RateLimiter struct {
config RateLimit
buckets map[string]*tokenBucket
banned map[string]time.Time
mu sync.Mutex
limitConfig RateLimit
buckets map[string]*tokenBucket
banned map[string]time.Time
mu sync.Mutex
}
// tokenBucket is a simple token bucket for one IP.
@ -128,8 +128,8 @@ type tokenBucket struct {
// cd := proxy.NewCustomDiff(cfg.CustomDiff)
// bus.Subscribe(proxy.EventLogin, cd.OnLogin)
type CustomDiff struct {
defaultDifficulty uint64
mu sync.RWMutex
globalDifficulty uint64
mu sync.RWMutex
}
var splitterFactories = map[string]func(*Config, *EventBus) Splitter{

View file

@ -8,10 +8,10 @@ import (
)
type appendLineLogger struct {
path string
mu sync.Mutex
file *os.File
closed bool
path string
mu sync.Mutex
openFile *os.File
closed bool
}
func newAppendLineLogger(path string) *appendLineLogger {
@ -30,15 +30,15 @@ func (l *appendLineLogger) writeLine(line string) {
return
}
if l.file == nil {
if l.openFile == nil {
file, errorValue := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if errorValue != nil {
return
}
l.file = file
l.openFile = file
}
_, _ = l.file.WriteString(line)
_, _ = l.openFile.WriteString(line)
}
func (l *appendLineLogger) setPath(path string) {
@ -53,9 +53,9 @@ func (l *appendLineLogger) setPath(path string) {
return
}
if l.file != nil {
_ = l.file.Close()
l.file = nil
if l.openFile != nil {
_ = l.openFile.Close()
l.openFile = nil
}
l.closed = false
l.path = path
@ -73,9 +73,9 @@ func (l *appendLineLogger) close() {
return
}
l.closed = true
if l.file != nil {
_ = l.file.Close()
l.file = nil
if l.openFile != nil {
_ = l.openFile.Close()
l.openFile = nil
}
}

View file

@ -25,22 +25,22 @@ func New(config *Config) (*Proxy, error) {
eventBus := NewEventBus()
statsValue := NewStats()
customDiffFilter := NewCustomDiff(config.CustomDiff)
eventBus.Subscribe(EventLogin, customDiffFilter.OnLogin)
customDifficultyFilter := NewCustomDiff(config.CustomDiff)
eventBus.Subscribe(EventLogin, customDifficultyFilter.OnLogin)
workersValue := NewWorkers(config.Workers, eventBus)
workersValue.SetCustomDiffStats(config.CustomDiffStats)
splitterValue := newSplitter(config, eventBus)
proxyInstance := &Proxy{
config: config,
customDiff: customDiffFilter,
splitter: splitterValue,
stats: statsValue,
workers: workersValue,
events: eventBus,
miners: make(map[int64]*Miner),
rateLimiter: NewRateLimiter(config.RateLimit),
done: make(chan struct{}),
config: config,
customDifficulty: customDifficultyFilter,
splitter: splitterValue,
stats: statsValue,
workers: workersValue,
events: eventBus,
miners: make(map[int64]*Miner),
rateLimiter: NewRateLimiter(config.RateLimit),
done: make(chan struct{}),
}
proxyInstance.accessLogger = subscribeAccessLog(eventBus, config.AccessLogFile)
@ -236,8 +236,8 @@ func (p *Proxy) Reload(config *Config) {
p.config.sourcePath = sourcePath
p.config.Bind = bind
}
if p.customDiff != nil {
p.customDiff.SetGlobalDiff(p.config.CustomDiff)
if p.customDifficulty != nil {
p.customDifficulty.SetGlobalDiff(p.config.CustomDiff)
}
if p.workers != nil {
p.workers.SetCustomDiffStats(p.config.CustomDiffStats)

View file

@ -48,8 +48,8 @@ func TestProxy_Reload_Good(t *testing.T) {
if proxyValue.config.CustomDiff != 250 {
t.Fatalf("expected custom diff to reload, got %d", proxyValue.config.CustomDiff)
}
if proxyValue.customDiff == nil || proxyValue.customDiff.defaultDifficulty != 250 {
t.Fatalf("expected live custom diff to update, got %+v", proxyValue.customDiff)
if proxyValue.customDifficulty == nil || proxyValue.customDifficulty.globalDifficulty != 250 {
t.Fatalf("expected live custom diff to update, got %+v", proxyValue.customDifficulty)
}
proxyValue.events.Dispatch(Event{Type: EventLogin, Miner: miner})

View file

@ -11,9 +11,9 @@ import (
// rl := proxy.NewRateLimiter(cfg.RateLimit)
func NewRateLimiter(config RateLimit) *RateLimiter {
return &RateLimiter{
config: config,
buckets: make(map[string]*tokenBucket),
banned: make(map[string]time.Time),
limitConfig: config,
buckets: make(map[string]*tokenBucket),
banned: make(map[string]time.Time),
}
}
@ -26,7 +26,7 @@ func (rateLimiter *RateLimiter) SetConfig(config RateLimit) {
}
rateLimiter.mu.Lock()
rateLimiter.config = config
rateLimiter.limitConfig = config
rateLimiter.mu.Unlock()
}
@ -44,7 +44,7 @@ func (rateLimiter *RateLimiter) Allow(ip string) bool {
rateLimiter.mu.Lock()
defer rateLimiter.mu.Unlock()
if rateLimiter.config.MaxConnectionsPerMinute <= 0 {
if rateLimiter.limitConfig.MaxConnectionsPerMinute <= 0 {
return true
}
@ -58,7 +58,7 @@ func (rateLimiter *RateLimiter) Allow(ip string) bool {
bucket, exists := rateLimiter.buckets[host]
if !exists {
bucket = &tokenBucket{
tokens: rateLimiter.config.MaxConnectionsPerMinute,
tokens: rateLimiter.limitConfig.MaxConnectionsPerMinute,
lastRefill: now,
}
rateLimiter.buckets[host] = bucket
@ -66,8 +66,8 @@ func (rateLimiter *RateLimiter) Allow(ip string) bool {
rateLimiter.refillBucket(bucket, now)
if bucket.tokens <= 0 {
if rateLimiter.config.BanDurationSeconds > 0 {
rateLimiter.banned[host] = now.Add(time.Duration(rateLimiter.config.BanDurationSeconds) * time.Second)
if rateLimiter.limitConfig.BanDurationSeconds > 0 {
rateLimiter.banned[host] = now.Add(time.Duration(rateLimiter.limitConfig.BanDurationSeconds) * time.Second)
}
return false
}
@ -88,7 +88,7 @@ func (rateLimiter *RateLimiter) Tick() {
rateLimiter.mu.Lock()
defer rateLimiter.mu.Unlock()
if rateLimiter.config.MaxConnectionsPerMinute <= 0 {
if rateLimiter.limitConfig.MaxConnectionsPerMinute <= 0 {
return
}
@ -104,11 +104,11 @@ func (rateLimiter *RateLimiter) Tick() {
}
func (rateLimiter *RateLimiter) refillBucket(bucket *tokenBucket, now time.Time) {
if bucket == nil || rateLimiter.config.MaxConnectionsPerMinute <= 0 {
if bucket == nil || rateLimiter.limitConfig.MaxConnectionsPerMinute <= 0 {
return
}
refillEvery := time.Minute / time.Duration(rateLimiter.config.MaxConnectionsPerMinute)
refillEvery := time.Minute / time.Duration(rateLimiter.limitConfig.MaxConnectionsPerMinute)
if refillEvery <= 0 {
refillEvery = time.Second
}
@ -120,8 +120,8 @@ func (rateLimiter *RateLimiter) refillBucket(bucket *tokenBucket, now time.Time)
tokensToAdd := int(elapsed / refillEvery)
bucket.tokens += tokensToAdd
if bucket.tokens > rateLimiter.config.MaxConnectionsPerMinute {
bucket.tokens = rateLimiter.config.MaxConnectionsPerMinute
if bucket.tokens > rateLimiter.limitConfig.MaxConnectionsPerMinute {
bucket.tokens = rateLimiter.limitConfig.MaxConnectionsPerMinute
}
bucket.lastRefill = bucket.lastRefill.Add(time.Duration(tokensToAdd) * refillEvery)
}
@ -130,7 +130,7 @@ func (rateLimiter *RateLimiter) refillBucket(bucket *tokenBucket, now time.Time)
//
// cd := proxy.NewCustomDiff(50000)
func NewCustomDiff(globalDiff uint64) *CustomDiff {
return &CustomDiff{defaultDifficulty: globalDiff}
return &CustomDiff{globalDifficulty: globalDiff}
}
// SetGlobalDiff updates the default custom difficulty override.
@ -142,7 +142,7 @@ func (customDiff *CustomDiff) SetGlobalDiff(globalDiff uint64) {
}
customDiff.mu.Lock()
customDiff.defaultDifficulty = globalDiff
customDiff.globalDifficulty = globalDiff
customDiff.mu.Unlock()
}
@ -172,7 +172,7 @@ func (customDiff *CustomDiff) OnLogin(event Event) {
}
customDiff.mu.RLock()
globalDiff := customDiff.defaultDifficulty
globalDiff := customDiff.globalDifficulty
customDiff.mu.RUnlock()
if globalDiff > 0 {
event.Miner.SetCustomDiff(globalDiff)

View file

@ -25,7 +25,7 @@ type SimpleSplitter struct {
events *proxy.EventBus
strategyFactory pool.StrategyFactory
mu sync.Mutex
sequence int64 // monotonic mapper sequence counter
mapperSequence int64 // monotonic mapper sequence counter
}
// NewSimpleSplitter creates the passthrough splitter.
@ -72,9 +72,9 @@ func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) {
}
if mapper == nil {
s.sequence++
s.mapperSequence++
var strategy pool.Strategy
mapper = NewSimpleMapper(s.sequence, nil)
mapper = NewSimpleMapper(s.mapperSequence, nil)
mapper.events = s.events
if s.strategyFactory != nil {
strategy = s.strategyFactory(mapper)

View file

@ -15,18 +15,18 @@ import (
// bus.Subscribe(proxy.EventAccept, s.OnAccept)
// bus.Subscribe(proxy.EventReject, s.OnReject)
type Stats struct {
accepted atomic.Uint64
rejected atomic.Uint64
invalid atomic.Uint64
expired atomic.Uint64
hashes atomic.Uint64 // cumulative sum of accepted share difficulties
connections atomic.Uint64 // total TCP connections accepted (ever)
maxMiners atomic.Uint64 // peak concurrent miner count
topDiff [10]uint64 // top-10 accepted difficulties, sorted descending; guarded by mu
latency []uint16 // pool response latencies in ms; capped at 10000 samples; guarded by mu
windows [6]tickWindow // one per hashrate reporting period
startTime time.Time
mu sync.Mutex
accepted atomic.Uint64
rejected atomic.Uint64
invalid atomic.Uint64
expired atomic.Uint64
hashes atomic.Uint64 // cumulative sum of accepted share difficulties
connections atomic.Uint64 // total TCP connections accepted (ever)
maxMiners atomic.Uint64 // peak concurrent miner count
topDifficulties [10]uint64 // top-10 accepted difficulties, sorted descending; guarded by mu
latencySamples []uint16 // pool response latencies in ms; capped at 10000 samples; guarded by mu
windows [6]tickWindow // one per hashrate reporting period
startTime time.Time
mu sync.Mutex
}
// Hashrate window sizes in seconds. Index maps to Stats.windows and SummaryResponse.Hashrate.
@ -68,8 +68,8 @@ var hashrateWindowSizes = [5]int{60, 600, 3600, 43200, 86400}
// s := proxy.NewStats()
func NewStats() *Stats {
stats := &Stats{
startTime: time.Now().UTC(),
latency: make([]uint16, 0, 128),
startTime: time.Now().UTC(),
latencySamples: make([]uint16, 0, 128),
}
for index, size := range hashrateWindowSizes {
@ -96,9 +96,9 @@ func (s *Stats) OnAccept(event Event) {
for index := 0; index < HashrateWindowAll; index++ {
s.windows[index].buckets[s.windows[index].pos] += event.Diff
}
insertTopDiff(&s.topDiff, event.Diff)
insertTopDiff(&s.topDifficulties, event.Diff)
if event.Latency > 0 {
s.latency = appendCappedLatency(s.latency, event.Latency)
s.latencySamples = appendCappedLatency(s.latencySamples, event.Latency)
}
s.mu.Unlock()
}
@ -116,7 +116,7 @@ func (s *Stats) OnReject(event Event) {
}
if event.Latency > 0 {
s.mu.Lock()
s.latency = appendCappedLatency(s.latency, event.Latency)
s.latencySamples = appendCappedLatency(s.latencySamples, event.Latency)
s.mu.Unlock()
}
}
@ -148,7 +148,7 @@ func (s *Stats) Summary() StatsSummary {
summary.Invalid = s.invalid.Load()
summary.Expired = s.expired.Load()
summary.Hashes = s.hashes.Load()
summary.TopDiff = s.topDiff
summary.TopDiff = s.topDifficulties
for index := 0; index < HashrateWindowAll; index++ {
windowSize := hashrateWindowSizes[index]
@ -163,8 +163,8 @@ func (s *Stats) Summary() StatsSummary {
summary.AvgTime = uint32(uptimeSeconds / summary.Accepted)
}
if len(s.latency) > 0 {
values := slices.Clone(s.latency)
if len(s.latencySamples) > 0 {
values := slices.Clone(s.latencySamples)
sort.Slice(values, func(left int, right int) bool {
return values[left] < values[right]
})