Honor worker mode changes on reload
This commit is contained in:
parent
a76e6be1c7
commit
bbdff60580
2 changed files with 101 additions and 35 deletions
|
|
@ -56,8 +56,8 @@ func TestProxy_Reload_Good(t *testing.T) {
|
|||
if p.config.Mode != "nicehash" {
|
||||
t.Fatalf("expected mode to remain unchanged, got %q", p.config.Mode)
|
||||
}
|
||||
if p.config.Workers != WorkersByRigID {
|
||||
t.Fatalf("expected workers mode to remain unchanged, got %q", p.config.Workers)
|
||||
if p.config.Workers != WorkersByUser {
|
||||
t.Fatalf("expected workers mode to reload, got %q", p.config.Workers)
|
||||
}
|
||||
if got := p.config.Pools[0].URL; got != "pool-b.example:4444" {
|
||||
t.Fatalf("expected pools to reload, got %q", got)
|
||||
|
|
@ -70,6 +70,41 @@ func TestProxy_Reload_Good(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestProxy_Reload_WorkersMode_Good(t *testing.T) {
|
||||
miner := &Miner{id: 7, user: "wallet-a", rigID: "rig-a", ip: "10.0.0.7"}
|
||||
workers := NewWorkers(WorkersByRigID, nil)
|
||||
workers.OnLogin(Event{Miner: miner})
|
||||
|
||||
p := &Proxy{
|
||||
config: &Config{
|
||||
Mode: "nicehash",
|
||||
Workers: WorkersByRigID,
|
||||
Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}},
|
||||
Pools: []PoolConfig{{URL: "pool-a.example:3333", Enabled: true}},
|
||||
},
|
||||
workers: workers,
|
||||
miners: map[int64]*Miner{miner.id: miner},
|
||||
}
|
||||
|
||||
p.Reload(&Config{
|
||||
Mode: "nicehash",
|
||||
Workers: WorkersByUser,
|
||||
Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}},
|
||||
Pools: []PoolConfig{{URL: "pool-a.example:3333", Enabled: true}},
|
||||
})
|
||||
|
||||
if got := p.WorkersMode(); got != WorkersByUser {
|
||||
t.Fatalf("expected proxy workers mode %q, got %q", WorkersByUser, got)
|
||||
}
|
||||
records := p.WorkerRecords()
|
||||
if len(records) != 1 {
|
||||
t.Fatalf("expected one rebuilt worker record, got %d", len(records))
|
||||
}
|
||||
if got := records[0].Name; got != "wallet-a" {
|
||||
t.Fatalf("expected worker record to rebuild using user mode, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxy_Reload_UpdatesServers(t *testing.T) {
|
||||
originalLimiter := NewRateLimiter(RateLimit{MaxConnectionsPerMinute: 1})
|
||||
p := &Proxy{
|
||||
|
|
|
|||
|
|
@ -297,19 +297,26 @@ func (p *Proxy) closeAllMiners() {
|
|||
if p == nil {
|
||||
return
|
||||
}
|
||||
p.minersMu.RLock()
|
||||
miners := make([]*Miner, 0, len(p.miners))
|
||||
for _, miner := range p.miners {
|
||||
miners = append(miners, miner)
|
||||
}
|
||||
p.minersMu.RUnlock()
|
||||
for _, miner := range miners {
|
||||
for _, miner := range p.activeMiners() {
|
||||
if miner != nil {
|
||||
miner.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Proxy) activeMiners() []*Miner {
|
||||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
p.minersMu.RLock()
|
||||
defer p.minersMu.RUnlock()
|
||||
miners := make([]*Miner, 0, len(p.miners))
|
||||
for _, miner := range p.miners {
|
||||
miners = append(miners, miner)
|
||||
}
|
||||
return miners
|
||||
}
|
||||
|
||||
// p.Reload(&proxy.Config{Mode: "simple", Pools: []proxy.PoolConfig{{URL: "pool.example:3333", Enabled: true}}})
|
||||
func (p *Proxy) Reload(config *Config) {
|
||||
if p == nil || config == nil {
|
||||
|
|
@ -319,19 +326,21 @@ func (p *Proxy) Reload(config *Config) {
|
|||
return
|
||||
}
|
||||
poolsChanged := p.config == nil || !reflect.DeepEqual(p.config.Pools, config.Pools)
|
||||
workersChanged := p.config == nil || p.config.Workers != config.Workers
|
||||
if p.config == nil {
|
||||
p.config = config
|
||||
} else {
|
||||
preservedBind := append([]BindAddr(nil), p.config.Bind...)
|
||||
preservedMode := p.config.Mode
|
||||
preservedWorkers := p.config.Workers
|
||||
preservedConfigPath := p.config.configPath
|
||||
*p.config = *config
|
||||
p.config.Bind = preservedBind
|
||||
p.config.Mode = preservedMode
|
||||
p.config.Workers = preservedWorkers
|
||||
p.config.configPath = preservedConfigPath
|
||||
}
|
||||
if workersChanged && p.workers != nil {
|
||||
p.workers.ResetMode(p.config.Workers, p.activeMiners())
|
||||
}
|
||||
if p.customDiff != nil {
|
||||
p.customDiff.globalDiff = config.CustomDiff
|
||||
}
|
||||
|
|
@ -1443,32 +1452,9 @@ func (w *Workers) OnLogin(e Event) {
|
|||
if w == nil || e.Miner == nil {
|
||||
return
|
||||
}
|
||||
if w.mode == WorkersDisabled {
|
||||
return
|
||||
}
|
||||
name := workerNameFor(w.mode, e.Miner)
|
||||
if name == "" {
|
||||
return
|
||||
}
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
index, ok := w.nameIndex[name]
|
||||
if !ok {
|
||||
index = len(w.entries)
|
||||
record := WorkerRecord{Name: name}
|
||||
record.windows[0] = newTickWindow(60)
|
||||
record.windows[1] = newTickWindow(600)
|
||||
record.windows[2] = newTickWindow(3600)
|
||||
record.windows[3] = newTickWindow(43200)
|
||||
record.windows[4] = newTickWindow(86400)
|
||||
w.entries = append(w.entries, record)
|
||||
w.nameIndex[name] = index
|
||||
}
|
||||
record := &w.entries[index]
|
||||
record.Name = name
|
||||
record.LastIP = e.Miner.ip
|
||||
record.Connections++
|
||||
w.idIndex[e.Miner.id] = index
|
||||
w.recordLoginLocked(e.Miner)
|
||||
}
|
||||
|
||||
func newTickWindow(size int) tickWindow {
|
||||
|
|
@ -1478,6 +1464,24 @@ func newTickWindow(size int) tickWindow {
|
|||
}
|
||||
}
|
||||
|
||||
// ResetMode switches the worker identity strategy and rebuilds the live worker index.
|
||||
//
|
||||
// workers.ResetMode(proxy.WorkersByUser, activeMiners)
|
||||
func (w *Workers) ResetMode(mode WorkersMode, miners []*Miner) {
|
||||
if w == nil {
|
||||
return
|
||||
}
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
w.mode = mode
|
||||
w.entries = nil
|
||||
w.nameIndex = make(map[string]int)
|
||||
w.idIndex = make(map[int64]int)
|
||||
for _, miner := range miners {
|
||||
w.recordLoginLocked(miner)
|
||||
}
|
||||
}
|
||||
|
||||
// OnAccept updates the owning worker with the accepted share.
|
||||
func (w *Workers) OnAccept(e Event) {
|
||||
if w == nil || e.Miner == nil {
|
||||
|
|
@ -1604,6 +1608,33 @@ func cloneWorkerRecord(record WorkerRecord) WorkerRecord {
|
|||
return cloned
|
||||
}
|
||||
|
||||
func (w *Workers) recordLoginLocked(miner *Miner) {
|
||||
if w == nil || miner == nil || w.mode == WorkersDisabled {
|
||||
return
|
||||
}
|
||||
name := workerNameFor(w.mode, miner)
|
||||
if name == "" {
|
||||
return
|
||||
}
|
||||
index, ok := w.nameIndex[name]
|
||||
if !ok {
|
||||
index = len(w.entries)
|
||||
record := WorkerRecord{Name: name}
|
||||
record.windows[0] = newTickWindow(60)
|
||||
record.windows[1] = newTickWindow(600)
|
||||
record.windows[2] = newTickWindow(3600)
|
||||
record.windows[3] = newTickWindow(43200)
|
||||
record.windows[4] = newTickWindow(86400)
|
||||
w.entries = append(w.entries, record)
|
||||
w.nameIndex[name] = index
|
||||
}
|
||||
record := &w.entries[index]
|
||||
record.Name = name
|
||||
record.LastIP = miner.ip
|
||||
record.Connections++
|
||||
w.idIndex[miner.id] = index
|
||||
}
|
||||
|
||||
// Apply normalises one miner login at the same point the handshake does.
|
||||
//
|
||||
// cd.Apply(&proxy.Miner{user: "WALLET+50000"})
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue