diff --git a/configwatcher_test.go b/configwatcher_test.go index 5207694..76518f5 100644 --- a/configwatcher_test.go +++ b/configwatcher_test.go @@ -18,7 +18,7 @@ func TestConfigWatcher_New_Good(t *testing.T) { if watcher == nil { t.Fatal("expected watcher") } - if watcher.lastMod.IsZero() { + if watcher.lastModifiedAt.IsZero() { t.Fatal("expected last modification time to be initialised from the file") } } diff --git a/core_impl.go b/core_impl.go index cdc0428..60d2067 100644 --- a/core_impl.go +++ b/core_impl.go @@ -307,9 +307,9 @@ func (cd *CustomDiff) OnLogin(e Event) { // } func NewRateLimiter(config RateLimit) *RateLimiter { return &RateLimiter{ - config: config, - buckets: make(map[string]*tokenBucket), - banned: make(map[string]time.Time), + limit: config, + bucketByHost: make(map[string]*tokenBucket), + banUntilByHost: make(map[string]time.Time), } } @@ -317,7 +317,7 @@ func NewRateLimiter(config RateLimit) *RateLimiter { // // hostOnly("203.0.113.42:3333") == "203.0.113.42" // } func (rl *RateLimiter) Allow(ip string) bool { - if rl == nil || rl.config.MaxConnectionsPerMinute <= 0 { + if rl == nil || rl.limit.MaxConnectionsPerMinute <= 0 { return true } host := hostOnly(ip) @@ -326,23 +326,23 @@ func (rl *RateLimiter) Allow(ip string) bool { rl.mu.Lock() defer rl.mu.Unlock() - if until, banned := rl.banned[host]; banned { + if until, banned := rl.banUntilByHost[host]; banned { if now.Before(until) { return false } - delete(rl.banned, host) + delete(rl.banUntilByHost, host) } - bucket, ok := rl.buckets[host] + bucket, ok := rl.bucketByHost[host] if !ok { - bucket = &tokenBucket{tokens: rl.config.MaxConnectionsPerMinute, lastRefill: now} - rl.buckets[host] = bucket + bucket = &tokenBucket{tokens: rl.limit.MaxConnectionsPerMinute, lastRefill: now} + rl.bucketByHost[host] = bucket } - refillBucket(bucket, rl.config.MaxConnectionsPerMinute, now) + refillBucket(bucket, rl.limit.MaxConnectionsPerMinute, now) if bucket.tokens <= 0 { - if rl.config.BanDurationSeconds > 0 { - rl.banned[host] = now.Add(time.Duration(rl.config.BanDurationSeconds) * time.Second) + if rl.limit.BanDurationSeconds > 0 { + rl.banUntilByHost[host] = now.Add(time.Duration(rl.limit.BanDurationSeconds) * time.Second) } return false } @@ -356,7 +356,7 @@ func (rl *RateLimiter) Allow(ip string) bool { // // limiter.Tick() func (rl *RateLimiter) Tick() { - if rl == nil || rl.config.MaxConnectionsPerMinute <= 0 { + if rl == nil || rl.limit.MaxConnectionsPerMinute <= 0 { return } now := time.Now() @@ -364,13 +364,13 @@ func (rl *RateLimiter) Tick() { rl.mu.Lock() defer rl.mu.Unlock() - for host, until := range rl.banned { + for host, until := range rl.banUntilByHost { if !now.Before(until) { - delete(rl.banned, host) + delete(rl.banUntilByHost, host) } } - for _, bucket := range rl.buckets { - refillBucket(bucket, rl.config.MaxConnectionsPerMinute, now) + for _, bucket := range rl.bucketByHost { + refillBucket(bucket, rl.limit.MaxConnectionsPerMinute, now) } } @@ -381,19 +381,19 @@ func (rl *RateLimiter) Tick() { // watcher.Start() // polls once per second and reloads after the file mtime changes func NewConfigWatcher(configPath string, onChange func(*Config)) *ConfigWatcher { watcher := &ConfigWatcher{ - path: configPath, - onChange: onChange, - done: make(chan struct{}), + configPath: configPath, + onConfigChange: onChange, + stopCh: make(chan struct{}), } if info, err := os.Stat(configPath); err == nil { - watcher.lastMod = info.ModTime() + watcher.lastModifiedAt = info.ModTime() } return watcher } // watcher.Start() func (w *ConfigWatcher) Start() { - if w == nil || w.path == "" || w.onChange == nil { + if w == nil || w.configPath == "" || w.onConfigChange == nil { return } w.mu.Lock() @@ -401,18 +401,18 @@ func (w *ConfigWatcher) Start() { w.mu.Unlock() return } - if w.done == nil { - w.done = make(chan struct{}) + if w.stopCh == nil { + w.stopCh = make(chan struct{}) } else { select { - case <-w.done: - w.done = make(chan struct{}) + case <-w.stopCh: + w.stopCh = make(chan struct{}) default: } } - done := w.done - path := w.path - onChange := w.onChange + stopCh := w.stopCh + configPath := w.configPath + onConfigChange := w.onConfigChange w.started = true w.mu.Unlock() @@ -422,22 +422,22 @@ func (w *ConfigWatcher) Start() { for { select { case <-ticker.C: - if info, err := os.Stat(path); err == nil { + if info, err := os.Stat(configPath); err == nil { w.mu.Lock() - changed := info.ModTime() != w.lastMod + changed := info.ModTime() != w.lastModifiedAt if changed { - w.lastMod = info.ModTime() + w.lastModifiedAt = info.ModTime() } w.mu.Unlock() if !changed { continue } - config, result := LoadConfig(path) + config, result := LoadConfig(configPath) if result.OK && config != nil { - onChange(config) + onConfigChange(config) } } - case <-done: + case <-stopCh: return } } @@ -450,16 +450,16 @@ func (w *ConfigWatcher) Stop() { return } w.mu.Lock() - done := w.done + stopCh := w.stopCh w.started = false w.mu.Unlock() - if done == nil { + if stopCh == nil { return } select { - case <-done: + case <-stopCh: default: - close(done) + close(stopCh) } } diff --git a/proxy.go b/proxy.go index 6f2dc37..02f9a2a 100644 --- a/proxy.go +++ b/proxy.go @@ -123,12 +123,12 @@ type CloseEvent struct { // }) // watcher.Start() type ConfigWatcher struct { - path string - onChange func(*Config) - lastMod time.Time - done chan struct{} - mu sync.Mutex - started bool + configPath string + onConfigChange func(*Config) + lastModifiedAt time.Time + stopCh chan struct{} + mu sync.Mutex + started bool } // RateLimiter throttles new connections per source IP. @@ -141,10 +141,10 @@ type ConfigWatcher struct { // // accept the socket // } type RateLimiter struct { - config RateLimit - buckets map[string]*tokenBucket - banned map[string]time.Time - mu sync.Mutex + limit RateLimit + bucketByHost map[string]*tokenBucket + banUntilByHost map[string]time.Time + mu sync.Mutex } // tokenBucket is the per-IP refillable counter. diff --git a/ratelimit_test.go b/ratelimit_test.go index 6f10f00..c5f06ed 100644 --- a/ratelimit_test.go +++ b/ratelimit_test.go @@ -18,7 +18,7 @@ func TestRateLimiter_Allow(t *testing.T) { func TestRateLimiter_Allow_ReplenishesHighLimits(t *testing.T) { rl := NewRateLimiter(RateLimit{MaxConnectionsPerMinute: 120, BanDurationSeconds: 1}) rl.mu.Lock() - rl.buckets["1.2.3.4"] = &tokenBucket{ + rl.bucketByHost["1.2.3.4"] = &tokenBucket{ tokens: 0, lastRefill: time.Now().Add(-30 * time.Second), } diff --git a/reload_test.go b/reload_test.go index db47b18..6961f3e 100644 --- a/reload_test.go +++ b/reload_test.go @@ -155,7 +155,7 @@ func TestProxy_Reload_WatchEnabled_Good(t *testing.T) { if p.watcher == nil { t.Fatalf("expected reload to create a watcher when watch is enabled") } - if got := p.watcher.path; got != "/tmp/proxy.json" { + if got := p.watcher.configPath; got != "/tmp/proxy.json" { t.Fatalf("expected watcher to keep the original config path, got %q", got) } p.watcher.Stop() @@ -188,7 +188,7 @@ func TestProxy_Reload_WatchDisabled_Bad(t *testing.T) { t.Fatalf("expected reload to stop and clear the watcher when watch is disabled") } select { - case <-watcher.done: + case <-watcher.stopCh: default: t.Fatalf("expected existing watcher to be stopped") } diff --git a/state_impl.go b/state_impl.go index a672eac..e23b981 100644 --- a/state_impl.go +++ b/state_impl.go @@ -1829,7 +1829,7 @@ func (s *Server) listen() Result { // IsActive reports whether the limiter has enabled rate limiting. func (rl *RateLimiter) IsActive() bool { - return rl != nil && rl.config.MaxConnectionsPerMinute > 0 + return rl != nil && rl.limit.MaxConnectionsPerMinute > 0 } func nextMinerID() int64 { return atomic.AddInt64(&minerSeq, 1) }