From f0477b99800213c0bbab4289d489faf6cace4cf3 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 12:01:40 +0000 Subject: [PATCH] refactor(proxy): use semantic runtime names Co-Authored-By: Virgil --- proxy_http_runtime.go | 100 +++++++++++++++++++++++++------------- proxy_runtime.go | 108 +++++++++++++++++++++--------------------- runtime_support.go | 84 ++++++++++++++++---------------- server_runtime.go | 40 ++++++++-------- 4 files changed, 184 insertions(+), 148 deletions(-) diff --git a/proxy_http_runtime.go b/proxy_http_runtime.go index 4c0d545..b6bcb36 100644 --- a/proxy_http_runtime.go +++ b/proxy_http_runtime.go @@ -12,6 +12,44 @@ import ( const proxyAPIVersion = "1.0.0" +type summaryHTTPResponse struct { + Version string `json:"version"` + Mode string `json:"mode"` + Hashrate summaryHashrate `json:"hashrate"` + Miners summaryMinerCounts `json:"miners"` + Workers uint64 `json:"workers"` + Upstreams summaryUpstreams `json:"upstreams"` + Results summaryResults `json:"results"` +} + +type summaryHashrate struct { + Total [6]float64 `json:"total"` +} + +type summaryMinerCounts struct { + Now uint64 `json:"now"` + Max uint64 `json:"max"` +} + +type summaryUpstreams struct { + Active uint64 `json:"active"` + Sleep uint64 `json:"sleep"` + Error uint64 `json:"error"` + Total uint64 `json:"total"` + Ratio float64 `json:"ratio"` +} + +type summaryResults struct { + Accepted uint64 `json:"accepted"` + Rejected uint64 `json:"rejected"` + Invalid uint64 `json:"invalid"` + Expired uint64 `json:"expired"` + AvgTime uint32 `json:"avg_time"` + Latency uint32 `json:"latency"` + HashesTotal uint64 `json:"hashes_total"` + Best [10]uint64 `json:"best"` +} + func startHTTPServer(p *Proxy) { if p == nil || p.config == nil || !p.config.HTTP.Enabled || p.httpServer != nil { return @@ -59,40 +97,38 @@ func registerMonitoringRoutes(router *http.ServeMux, proxyValue *Proxy) { return } summary := proxyValue.Summary() - response := map[string]interface{}{ - "version": proxyAPIVersion, - "mode": proxyValue.Mode(), - "hashrate": map[string]interface{}{ - "total": summary.Hashrate, + upstreams := proxyValue.Upstreams() + ratio := 0.0 + if upstreams.Total > 0 { + ratio = float64(proxyValue.CurrentMiners()) / float64(upstreams.Total) + } + response := summaryHTTPResponse{ + Version: proxyAPIVersion, + Mode: proxyValue.Mode(), + Hashrate: summaryHashrate{ + Total: summary.Hashrate, }, - "miners": map[string]uint64{ - "now": proxyValue.CurrentMiners(), - "max": proxyValue.MaxMiners(), + Miners: summaryMinerCounts{ + Now: proxyValue.CurrentMiners(), + Max: proxyValue.MaxMiners(), }, - "workers": uint64(len(proxyValue.Workers())), - "upstreams": func() map[string]interface{} { - upstreams := proxyValue.Upstreams() - ratio := 0.0 - if upstreams.Total > 0 { - ratio = float64(proxyValue.CurrentMiners()) / float64(upstreams.Total) - } - return map[string]interface{}{ - "active": upstreams.Active, - "sleep": upstreams.Sleep, - "error": upstreams.Error, - "total": upstreams.Total, - "ratio": ratio, - } - }(), - "results": map[string]interface{}{ - "accepted": summary.Accepted, - "rejected": summary.Rejected, - "invalid": summary.Invalid, - "expired": summary.Expired, - "avg_time": summary.AvgTime, - "latency": summary.AvgLatency, - "hashes_total": summary.Hashes, - "best": summary.TopDiff, + Workers: uint64(len(proxyValue.Workers())), + Upstreams: summaryUpstreams{ + Active: upstreams.Active, + Sleep: upstreams.Sleep, + Error: upstreams.Error, + Total: upstreams.Total, + Ratio: ratio, + }, + Results: summaryResults{ + Accepted: summary.Accepted, + Rejected: summary.Rejected, + Invalid: summary.Invalid, + Expired: summary.Expired, + AvgTime: summary.AvgTime, + Latency: summary.AvgLatency, + HashesTotal: summary.Hashes, + Best: summary.TopDiff, }, } writeHTTPJSON(writer, response) diff --git a/proxy_runtime.go b/proxy_runtime.go index df339d2..531f8bd 100644 --- a/proxy_runtime.go +++ b/proxy_runtime.go @@ -13,65 +13,65 @@ type splitterShutdown interface { // New creates and wires all subsystems but does not start the tick loop or TCP listeners. // -// p, errorValue := proxy.New(cfg) -func New(cfg *Config) (*Proxy, error) { - if errorValue := cfg.Validate(); errorValue != nil { +// p, errorValue := proxy.New(config) +func New(config *Config) (*Proxy, error) { + if errorValue := config.Validate(); errorValue != nil { return nil, errorValue } - events := NewEventBus() - stats := NewStats() - customDiff := NewCustomDiff(cfg.CustomDiff) - events.Subscribe(EventLogin, customDiff.OnLogin) - workers := NewWorkers(cfg.Workers, events) - workers.SetCustomDiffStats(cfg.CustomDiffStats) - splitter := newSplitter(cfg, events) + eventBus := NewEventBus() + statsValue := NewStats() + customDiffFilter := NewCustomDiff(config.CustomDiff) + eventBus.Subscribe(EventLogin, customDiffFilter.OnLogin) + workersValue := NewWorkers(config.Workers, eventBus) + workersValue.SetCustomDiffStats(config.CustomDiffStats) + splitterValue := newSplitter(config, eventBus) - proxyValue := &Proxy{ - config: cfg, - customDiff: customDiff, - splitter: splitter, - stats: stats, - workers: workers, - events: events, + proxyInstance := &Proxy{ + config: config, + customDiff: customDiffFilter, + splitter: splitterValue, + stats: statsValue, + workers: workersValue, + events: eventBus, miners: make(map[int64]*Miner), - rateLimiter: NewRateLimiter(cfg.RateLimit), + rateLimiter: NewRateLimiter(config.RateLimit), done: make(chan struct{}), } - proxyValue.accessLogger = subscribeAccessLog(events, cfg.AccessLogFile) - proxyValue.shareLogger = subscribeShareLog(events, cfg.ShareLogFile) + proxyInstance.accessLogger = subscribeAccessLog(eventBus, config.AccessLogFile) + proxyInstance.shareLogger = subscribeShareLog(eventBus, config.ShareLogFile) - events.Subscribe(EventLogin, func(event Event) { + eventBus.Subscribe(EventLogin, func(event Event) { if event.Miner != nil { - proxyValue.minerMu.Lock() - proxyValue.miners[event.Miner.ID()] = event.Miner - proxyValue.minerMu.Unlock() + proxyInstance.minerMu.Lock() + proxyInstance.miners[event.Miner.ID()] = event.Miner + proxyInstance.minerMu.Unlock() } - stats.connections.Add(1) - current := proxyValue.currentMiners.Add(1) + statsValue.connections.Add(1) + current := proxyInstance.currentMiners.Add(1) for { - maximum := stats.maxMiners.Load() - if current <= maximum || stats.maxMiners.CompareAndSwap(maximum, current) { + maximum := statsValue.maxMiners.Load() + if current <= maximum || statsValue.maxMiners.CompareAndSwap(maximum, current) { break } } }) - events.Subscribe(EventClose, func(event Event) { + eventBus.Subscribe(EventClose, func(event Event) { if event.Miner != nil { - proxyValue.minerMu.Lock() - delete(proxyValue.miners, event.Miner.ID()) - proxyValue.minerMu.Unlock() + proxyInstance.minerMu.Lock() + delete(proxyInstance.miners, event.Miner.ID()) + proxyInstance.minerMu.Unlock() } - if proxyValue.currentMiners.Load() > 0 { - proxyValue.currentMiners.Add(^uint64(0)) + if proxyInstance.currentMiners.Load() > 0 { + proxyInstance.currentMiners.Add(^uint64(0)) } }) - events.Subscribe(EventAccept, stats.OnAccept) - events.Subscribe(EventReject, stats.OnReject) - if splitter != nil { - events.Subscribe(EventSubmit, func(event Event) { - splitter.OnSubmit(&SubmitEvent{ + eventBus.Subscribe(EventAccept, statsValue.OnAccept) + eventBus.Subscribe(EventReject, statsValue.OnReject) + if splitterValue != nil { + eventBus.Subscribe(EventSubmit, func(event Event) { + splitterValue.OnSubmit(&SubmitEvent{ Miner: event.Miner, JobID: event.JobID, Nonce: event.Nonce, @@ -80,19 +80,19 @@ func New(cfg *Config) (*Proxy, error) { RequestID: event.RequestID, }) }) - events.Subscribe(EventLogin, func(event Event) { - splitter.OnLogin(&LoginEvent{Miner: event.Miner}) + eventBus.Subscribe(EventLogin, func(event Event) { + splitterValue.OnLogin(&LoginEvent{Miner: event.Miner}) }) - events.Subscribe(EventClose, func(event Event) { - splitter.OnClose(&CloseEvent{Miner: event.Miner}) + eventBus.Subscribe(EventClose, func(event Event) { + splitterValue.OnClose(&CloseEvent{Miner: event.Miner}) }) } - if cfg.Watch && cfg.sourcePath != "" { - proxyValue.watcher = NewConfigWatcher(cfg.sourcePath, proxyValue.Reload) - proxyValue.watcher.Start() + if config.Watch && config.sourcePath != "" { + proxyInstance.watcher = NewConfigWatcher(config.sourcePath, proxyInstance.Reload) + proxyInstance.watcher.Start() } - return proxyValue, nil + return proxyInstance, nil } // Start begins the TCP listener(s), pool connections, and tick loop. @@ -222,14 +222,14 @@ func (p *Proxy) Stop() { // Reload replaces the live config. // // p.Reload(newCfg) -func (p *Proxy) Reload(cfg *Config) { - if cfg != nil { +func (p *Proxy) Reload(config *Config) { + if config != nil { if p.config == nil { - p.config = cfg + p.config = config } else { sourcePath := p.config.sourcePath bind := append([]BindAddr(nil), p.config.Bind...) - *p.config = *cfg + *p.config = *config p.config.sourcePath = sourcePath p.config.Bind = bind } @@ -317,11 +317,11 @@ func (p *Proxy) Upstreams() UpstreamStats { } func (p *Proxy) acceptConn(conn net.Conn, localPort uint16) { - var tlsCfg *tls.Config + var tlsConfig *tls.Config if _, ok := conn.(*tls.Conn); ok { - tlsCfg = &tls.Config{} + tlsConfig = &tls.Config{} } - miner := NewMiner(conn, localPort, tlsCfg) + miner := NewMiner(conn, localPort, tlsConfig) miner.events = p.events miner.splitter = p.splitter if p.config != nil { diff --git a/runtime_support.go b/runtime_support.go index fc86333..fca5ab2 100644 --- a/runtime_support.go +++ b/runtime_support.go @@ -20,54 +20,54 @@ func NewRateLimiter(config RateLimit) *RateLimiter { // SetConfig replaces the active rate-limit settings. // // rl.SetConfig(proxy.RateLimit{MaxConnectionsPerMinute: 30, BanDurationSeconds: 300}) -func (rl *RateLimiter) SetConfig(config RateLimit) { - if rl == nil { +func (rateLimiter *RateLimiter) SetConfig(config RateLimit) { + if rateLimiter == nil { return } - rl.mu.Lock() - rl.cfg = config - rl.mu.Unlock() + rateLimiter.mu.Lock() + rateLimiter.cfg = config + rateLimiter.mu.Unlock() } // Allow returns true if the IP address is permitted to open a new connection. Thread-safe. // // if rl.Allow(conn.RemoteAddr().String()) { proceed() } -func (rl *RateLimiter) Allow(ip string) bool { - if rl == nil { +func (rateLimiter *RateLimiter) Allow(ip string) bool { + if rateLimiter == nil { return true } host := remoteHost(ip) now := time.Now().UTC() - rl.mu.Lock() - defer rl.mu.Unlock() + rateLimiter.mu.Lock() + defer rateLimiter.mu.Unlock() - if rl.cfg.MaxConnectionsPerMinute <= 0 { + if rateLimiter.cfg.MaxConnectionsPerMinute <= 0 { return true } - if bannedUntil, exists := rl.banned[host]; exists { + if bannedUntil, exists := rateLimiter.banned[host]; exists { if bannedUntil.After(now) { return false } - delete(rl.banned, host) + delete(rateLimiter.banned, host) } - bucket, exists := rl.buckets[host] + bucket, exists := rateLimiter.buckets[host] if !exists { bucket = &tokenBucket{ - tokens: rl.cfg.MaxConnectionsPerMinute, + tokens: rateLimiter.cfg.MaxConnectionsPerMinute, lastRefill: now, } - rl.buckets[host] = bucket + rateLimiter.buckets[host] = bucket } - rl.refillBucket(bucket, now) + rateLimiter.refillBucket(bucket, now) if bucket.tokens <= 0 { - if rl.cfg.BanDurationSeconds > 0 { - rl.banned[host] = now.Add(time.Duration(rl.cfg.BanDurationSeconds) * time.Second) + if rateLimiter.cfg.BanDurationSeconds > 0 { + rateLimiter.banned[host] = now.Add(time.Duration(rateLimiter.cfg.BanDurationSeconds) * time.Second) } return false } @@ -79,36 +79,36 @@ func (rl *RateLimiter) Allow(ip string) bool { // Tick removes expired ban entries and refills all token buckets. Called every second. // // rl.Tick() -func (rl *RateLimiter) Tick() { - if rl == nil { +func (rateLimiter *RateLimiter) Tick() { + if rateLimiter == nil { return } now := time.Now().UTC() - rl.mu.Lock() - defer rl.mu.Unlock() + rateLimiter.mu.Lock() + defer rateLimiter.mu.Unlock() - if rl.cfg.MaxConnectionsPerMinute <= 0 { + if rateLimiter.cfg.MaxConnectionsPerMinute <= 0 { return } - for host, bannedUntil := range rl.banned { + for host, bannedUntil := range rateLimiter.banned { if !bannedUntil.After(now) { - delete(rl.banned, host) + delete(rateLimiter.banned, host) } } - for _, bucket := range rl.buckets { - rl.refillBucket(bucket, now) + for _, bucket := range rateLimiter.buckets { + rateLimiter.refillBucket(bucket, now) } } -func (rl *RateLimiter) refillBucket(bucket *tokenBucket, now time.Time) { - if bucket == nil || rl.cfg.MaxConnectionsPerMinute <= 0 { +func (rateLimiter *RateLimiter) refillBucket(bucket *tokenBucket, now time.Time) { + if bucket == nil || rateLimiter.cfg.MaxConnectionsPerMinute <= 0 { return } - refillEvery := time.Minute / time.Duration(rl.cfg.MaxConnectionsPerMinute) + refillEvery := time.Minute / time.Duration(rateLimiter.cfg.MaxConnectionsPerMinute) if refillEvery <= 0 { refillEvery = time.Second } @@ -120,8 +120,8 @@ func (rl *RateLimiter) refillBucket(bucket *tokenBucket, now time.Time) { tokensToAdd := int(elapsed / refillEvery) bucket.tokens += tokensToAdd - if bucket.tokens > rl.cfg.MaxConnectionsPerMinute { - bucket.tokens = rl.cfg.MaxConnectionsPerMinute + if bucket.tokens > rateLimiter.cfg.MaxConnectionsPerMinute { + bucket.tokens = rateLimiter.cfg.MaxConnectionsPerMinute } bucket.lastRefill = bucket.lastRefill.Add(time.Duration(tokensToAdd) * refillEvery) } @@ -136,20 +136,20 @@ func NewCustomDiff(globalDiff uint64) *CustomDiff { // SetGlobalDiff updates the default custom difficulty override. // // cd.SetGlobalDiff(100000) -func (cd *CustomDiff) SetGlobalDiff(globalDiff uint64) { - if cd == nil { +func (customDiff *CustomDiff) SetGlobalDiff(globalDiff uint64) { + if customDiff == nil { return } - cd.mu.Lock() - cd.globalDiff = globalDiff - cd.mu.Unlock() + customDiff.mu.Lock() + customDiff.globalDiff = globalDiff + customDiff.mu.Unlock() } // OnLogin parses miner.User for a "+{number}" suffix and sets miner.CustomDiff. // // cd.OnLogin(proxy.Event{Miner: miner}) -func (cd *CustomDiff) OnLogin(event Event) { +func (customDiff *CustomDiff) OnLogin(event Event) { if event.Miner == nil { return } @@ -165,13 +165,13 @@ func (cd *CustomDiff) OnLogin(event Event) { return } - if cd == nil { + if customDiff == nil { return } - cd.mu.RLock() - globalDiff := cd.globalDiff - cd.mu.RUnlock() + customDiff.mu.RLock() + globalDiff := customDiff.globalDiff + customDiff.mu.RUnlock() if globalDiff > 0 { event.Miner.SetCustomDiff(globalDiff) } diff --git a/server_runtime.go b/server_runtime.go index 6fae353..b17f56e 100644 --- a/server_runtime.go +++ b/server_runtime.go @@ -9,18 +9,18 @@ import ( // NewServer binds one miner-facing TCP listener. // -// srv, errorValue := proxy.NewServer(bind, nil, limiter, onAccept) -func NewServer(bind BindAddr, tlsCfg *tls.Config, limiter *RateLimiter, onAccept func(net.Conn, uint16)) (*Server, error) { - address := net.JoinHostPort(bind.Host, strconv.Itoa(int(bind.Port))) +// srv, errorValue := proxy.NewServer(bindAddress, nil, rateLimiter, onAccept) +func NewServer(bindAddress BindAddr, tlsConfig *tls.Config, rateLimiter *RateLimiter, onAccept func(net.Conn, uint16)) (*Server, error) { + address := net.JoinHostPort(bindAddress.Host, strconv.Itoa(int(bindAddress.Port))) listener, errorValue := net.Listen("tcp", address) if errorValue != nil { return nil, errorValue } return &Server{ - addr: bind, - tlsCfg: tlsCfg, - limiter: limiter, + addr: bindAddress, + tlsCfg: tlsConfig, + limiter: rateLimiter, onAccept: onAccept, listener: listener, done: make(chan struct{}), @@ -30,37 +30,37 @@ func NewServer(bind BindAddr, tlsCfg *tls.Config, limiter *RateLimiter, onAccept // Start begins accepting connections in a goroutine. // // srv.Start() -func (s *Server) Start() { - if s == nil || s.listener == nil { +func (server *Server) Start() { + if server == nil || server.listener == nil { return } go func() { for { - conn, errorValue := s.listener.Accept() + conn, errorValue := server.listener.Accept() if errorValue != nil { select { - case <-s.done: + case <-server.done: return default: continue } } - if s.limiter != nil && !s.limiter.Allow(conn.RemoteAddr().String()) { + if server.limiter != nil && !server.limiter.Allow(conn.RemoteAddr().String()) { _ = conn.Close() continue } - if s.tlsCfg != nil { - conn = tls.Server(conn, s.tlsCfg) + if server.tlsCfg != nil { + conn = tls.Server(conn, server.tlsCfg) } - if s.onAccept == nil { + if server.onAccept == nil { _ = conn.Close() continue } - s.onAccept(conn, s.addr.Port) + server.onAccept(conn, server.addr.Port) } }() } @@ -68,17 +68,17 @@ func (s *Server) Start() { // Stop closes the listener. // // srv.Stop() -func (s *Server) Stop() { - if s == nil || s.listener == nil { +func (server *Server) Stop() { + if server == nil || server.listener == nil { return } select { - case <-s.done: + case <-server.done: default: - close(s.done) + close(server.done) } - _ = s.listener.Close() + _ = server.listener.Close() } var errServerClosed = errors.New("server closed")