chore(proxy): clarify watcher and limiter names
This commit is contained in:
parent
8a9046356e
commit
ea378354de
6 changed files with 54 additions and 54 deletions
|
|
@ -18,7 +18,7 @@ func TestConfigWatcher_New_Good(t *testing.T) {
|
||||||
if watcher == nil {
|
if watcher == nil {
|
||||||
t.Fatal("expected watcher")
|
t.Fatal("expected watcher")
|
||||||
}
|
}
|
||||||
if watcher.lastMod.IsZero() {
|
if watcher.lastModifiedAt.IsZero() {
|
||||||
t.Fatal("expected last modification time to be initialised from the file")
|
t.Fatal("expected last modification time to be initialised from the file")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
78
core_impl.go
78
core_impl.go
|
|
@ -307,9 +307,9 @@ func (cd *CustomDiff) OnLogin(e Event) {
|
||||||
// }
|
// }
|
||||||
func NewRateLimiter(config RateLimit) *RateLimiter {
|
func NewRateLimiter(config RateLimit) *RateLimiter {
|
||||||
return &RateLimiter{
|
return &RateLimiter{
|
||||||
config: config,
|
limit: config,
|
||||||
buckets: make(map[string]*tokenBucket),
|
bucketByHost: make(map[string]*tokenBucket),
|
||||||
banned: make(map[string]time.Time),
|
banUntilByHost: make(map[string]time.Time),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -317,7 +317,7 @@ func NewRateLimiter(config RateLimit) *RateLimiter {
|
||||||
// // hostOnly("203.0.113.42:3333") == "203.0.113.42"
|
// // hostOnly("203.0.113.42:3333") == "203.0.113.42"
|
||||||
// }
|
// }
|
||||||
func (rl *RateLimiter) Allow(ip string) bool {
|
func (rl *RateLimiter) Allow(ip string) bool {
|
||||||
if rl == nil || rl.config.MaxConnectionsPerMinute <= 0 {
|
if rl == nil || rl.limit.MaxConnectionsPerMinute <= 0 {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
host := hostOnly(ip)
|
host := hostOnly(ip)
|
||||||
|
|
@ -326,23 +326,23 @@ func (rl *RateLimiter) Allow(ip string) bool {
|
||||||
rl.mu.Lock()
|
rl.mu.Lock()
|
||||||
defer rl.mu.Unlock()
|
defer rl.mu.Unlock()
|
||||||
|
|
||||||
if until, banned := rl.banned[host]; banned {
|
if until, banned := rl.banUntilByHost[host]; banned {
|
||||||
if now.Before(until) {
|
if now.Before(until) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
delete(rl.banned, host)
|
delete(rl.banUntilByHost, host)
|
||||||
}
|
}
|
||||||
|
|
||||||
bucket, ok := rl.buckets[host]
|
bucket, ok := rl.bucketByHost[host]
|
||||||
if !ok {
|
if !ok {
|
||||||
bucket = &tokenBucket{tokens: rl.config.MaxConnectionsPerMinute, lastRefill: now}
|
bucket = &tokenBucket{tokens: rl.limit.MaxConnectionsPerMinute, lastRefill: now}
|
||||||
rl.buckets[host] = bucket
|
rl.bucketByHost[host] = bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
refillBucket(bucket, rl.config.MaxConnectionsPerMinute, now)
|
refillBucket(bucket, rl.limit.MaxConnectionsPerMinute, now)
|
||||||
if bucket.tokens <= 0 {
|
if bucket.tokens <= 0 {
|
||||||
if rl.config.BanDurationSeconds > 0 {
|
if rl.limit.BanDurationSeconds > 0 {
|
||||||
rl.banned[host] = now.Add(time.Duration(rl.config.BanDurationSeconds) * time.Second)
|
rl.banUntilByHost[host] = now.Add(time.Duration(rl.limit.BanDurationSeconds) * time.Second)
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
@ -356,7 +356,7 @@ func (rl *RateLimiter) Allow(ip string) bool {
|
||||||
//
|
//
|
||||||
// limiter.Tick()
|
// limiter.Tick()
|
||||||
func (rl *RateLimiter) Tick() {
|
func (rl *RateLimiter) Tick() {
|
||||||
if rl == nil || rl.config.MaxConnectionsPerMinute <= 0 {
|
if rl == nil || rl.limit.MaxConnectionsPerMinute <= 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
@ -364,13 +364,13 @@ func (rl *RateLimiter) Tick() {
|
||||||
rl.mu.Lock()
|
rl.mu.Lock()
|
||||||
defer rl.mu.Unlock()
|
defer rl.mu.Unlock()
|
||||||
|
|
||||||
for host, until := range rl.banned {
|
for host, until := range rl.banUntilByHost {
|
||||||
if !now.Before(until) {
|
if !now.Before(until) {
|
||||||
delete(rl.banned, host)
|
delete(rl.banUntilByHost, host)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, bucket := range rl.buckets {
|
for _, bucket := range rl.bucketByHost {
|
||||||
refillBucket(bucket, rl.config.MaxConnectionsPerMinute, now)
|
refillBucket(bucket, rl.limit.MaxConnectionsPerMinute, now)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -381,19 +381,19 @@ func (rl *RateLimiter) Tick() {
|
||||||
// watcher.Start() // polls once per second and reloads after the file mtime changes
|
// watcher.Start() // polls once per second and reloads after the file mtime changes
|
||||||
func NewConfigWatcher(configPath string, onChange func(*Config)) *ConfigWatcher {
|
func NewConfigWatcher(configPath string, onChange func(*Config)) *ConfigWatcher {
|
||||||
watcher := &ConfigWatcher{
|
watcher := &ConfigWatcher{
|
||||||
path: configPath,
|
configPath: configPath,
|
||||||
onChange: onChange,
|
onConfigChange: onChange,
|
||||||
done: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
if info, err := os.Stat(configPath); err == nil {
|
if info, err := os.Stat(configPath); err == nil {
|
||||||
watcher.lastMod = info.ModTime()
|
watcher.lastModifiedAt = info.ModTime()
|
||||||
}
|
}
|
||||||
return watcher
|
return watcher
|
||||||
}
|
}
|
||||||
|
|
||||||
// watcher.Start()
|
// watcher.Start()
|
||||||
func (w *ConfigWatcher) Start() {
|
func (w *ConfigWatcher) Start() {
|
||||||
if w == nil || w.path == "" || w.onChange == nil {
|
if w == nil || w.configPath == "" || w.onConfigChange == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.mu.Lock()
|
w.mu.Lock()
|
||||||
|
|
@ -401,18 +401,18 @@ func (w *ConfigWatcher) Start() {
|
||||||
w.mu.Unlock()
|
w.mu.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if w.done == nil {
|
if w.stopCh == nil {
|
||||||
w.done = make(chan struct{})
|
w.stopCh = make(chan struct{})
|
||||||
} else {
|
} else {
|
||||||
select {
|
select {
|
||||||
case <-w.done:
|
case <-w.stopCh:
|
||||||
w.done = make(chan struct{})
|
w.stopCh = make(chan struct{})
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
done := w.done
|
stopCh := w.stopCh
|
||||||
path := w.path
|
configPath := w.configPath
|
||||||
onChange := w.onChange
|
onConfigChange := w.onConfigChange
|
||||||
w.started = true
|
w.started = true
|
||||||
w.mu.Unlock()
|
w.mu.Unlock()
|
||||||
|
|
||||||
|
|
@ -422,22 +422,22 @@ func (w *ConfigWatcher) Start() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if info, err := os.Stat(path); err == nil {
|
if info, err := os.Stat(configPath); err == nil {
|
||||||
w.mu.Lock()
|
w.mu.Lock()
|
||||||
changed := info.ModTime() != w.lastMod
|
changed := info.ModTime() != w.lastModifiedAt
|
||||||
if changed {
|
if changed {
|
||||||
w.lastMod = info.ModTime()
|
w.lastModifiedAt = info.ModTime()
|
||||||
}
|
}
|
||||||
w.mu.Unlock()
|
w.mu.Unlock()
|
||||||
if !changed {
|
if !changed {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
config, result := LoadConfig(path)
|
config, result := LoadConfig(configPath)
|
||||||
if result.OK && config != nil {
|
if result.OK && config != nil {
|
||||||
onChange(config)
|
onConfigChange(config)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case <-done:
|
case <-stopCh:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -450,16 +450,16 @@ func (w *ConfigWatcher) Stop() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.mu.Lock()
|
w.mu.Lock()
|
||||||
done := w.done
|
stopCh := w.stopCh
|
||||||
w.started = false
|
w.started = false
|
||||||
w.mu.Unlock()
|
w.mu.Unlock()
|
||||||
if done == nil {
|
if stopCh == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-stopCh:
|
||||||
default:
|
default:
|
||||||
close(done)
|
close(stopCh)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
20
proxy.go
20
proxy.go
|
|
@ -123,12 +123,12 @@ type CloseEvent struct {
|
||||||
// })
|
// })
|
||||||
// watcher.Start()
|
// watcher.Start()
|
||||||
type ConfigWatcher struct {
|
type ConfigWatcher struct {
|
||||||
path string
|
configPath string
|
||||||
onChange func(*Config)
|
onConfigChange func(*Config)
|
||||||
lastMod time.Time
|
lastModifiedAt time.Time
|
||||||
done chan struct{}
|
stopCh chan struct{}
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
started bool
|
started bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// RateLimiter throttles new connections per source IP.
|
// RateLimiter throttles new connections per source IP.
|
||||||
|
|
@ -141,10 +141,10 @@ type ConfigWatcher struct {
|
||||||
// // accept the socket
|
// // accept the socket
|
||||||
// }
|
// }
|
||||||
type RateLimiter struct {
|
type RateLimiter struct {
|
||||||
config RateLimit
|
limit RateLimit
|
||||||
buckets map[string]*tokenBucket
|
bucketByHost map[string]*tokenBucket
|
||||||
banned map[string]time.Time
|
banUntilByHost map[string]time.Time
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// tokenBucket is the per-IP refillable counter.
|
// tokenBucket is the per-IP refillable counter.
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ func TestRateLimiter_Allow(t *testing.T) {
|
||||||
func TestRateLimiter_Allow_ReplenishesHighLimits(t *testing.T) {
|
func TestRateLimiter_Allow_ReplenishesHighLimits(t *testing.T) {
|
||||||
rl := NewRateLimiter(RateLimit{MaxConnectionsPerMinute: 120, BanDurationSeconds: 1})
|
rl := NewRateLimiter(RateLimit{MaxConnectionsPerMinute: 120, BanDurationSeconds: 1})
|
||||||
rl.mu.Lock()
|
rl.mu.Lock()
|
||||||
rl.buckets["1.2.3.4"] = &tokenBucket{
|
rl.bucketByHost["1.2.3.4"] = &tokenBucket{
|
||||||
tokens: 0,
|
tokens: 0,
|
||||||
lastRefill: time.Now().Add(-30 * time.Second),
|
lastRefill: time.Now().Add(-30 * time.Second),
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -155,7 +155,7 @@ func TestProxy_Reload_WatchEnabled_Good(t *testing.T) {
|
||||||
if p.watcher == nil {
|
if p.watcher == nil {
|
||||||
t.Fatalf("expected reload to create a watcher when watch is enabled")
|
t.Fatalf("expected reload to create a watcher when watch is enabled")
|
||||||
}
|
}
|
||||||
if got := p.watcher.path; got != "/tmp/proxy.json" {
|
if got := p.watcher.configPath; got != "/tmp/proxy.json" {
|
||||||
t.Fatalf("expected watcher to keep the original config path, got %q", got)
|
t.Fatalf("expected watcher to keep the original config path, got %q", got)
|
||||||
}
|
}
|
||||||
p.watcher.Stop()
|
p.watcher.Stop()
|
||||||
|
|
@ -188,7 +188,7 @@ func TestProxy_Reload_WatchDisabled_Bad(t *testing.T) {
|
||||||
t.Fatalf("expected reload to stop and clear the watcher when watch is disabled")
|
t.Fatalf("expected reload to stop and clear the watcher when watch is disabled")
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-watcher.done:
|
case <-watcher.stopCh:
|
||||||
default:
|
default:
|
||||||
t.Fatalf("expected existing watcher to be stopped")
|
t.Fatalf("expected existing watcher to be stopped")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1829,7 +1829,7 @@ func (s *Server) listen() Result {
|
||||||
|
|
||||||
// IsActive reports whether the limiter has enabled rate limiting.
|
// IsActive reports whether the limiter has enabled rate limiting.
|
||||||
func (rl *RateLimiter) IsActive() bool {
|
func (rl *RateLimiter) IsActive() bool {
|
||||||
return rl != nil && rl.config.MaxConnectionsPerMinute > 0
|
return rl != nil && rl.limit.MaxConnectionsPerMinute > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func nextMinerID() int64 { return atomic.AddInt64(&minerSeq, 1) }
|
func nextMinerID() int64 { return atomic.AddInt64(&minerSeq, 1) }
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue