diff --git a/splitter/nicehash/impl.go b/splitter/nicehash/impl.go index c88b90b..c745315 100644 --- a/splitter/nicehash/impl.go +++ b/splitter/nicehash/impl.go @@ -37,10 +37,7 @@ func (s *NonceSplitter) Connect() { s.addMapperLocked() } for _, mapper := range s.mappers { - if mapper.strategy != nil { - mapper.strategy.Connect() - return - } + mapper.Start() } } @@ -149,6 +146,7 @@ func (s *NonceSplitter) addMapperLocked() *NonceMapper { s.byID = make(map[int64]*NonceMapper) } s.byID[mapper.id] = mapper + mapper.Start() return mapper } @@ -163,6 +161,17 @@ func NewNonceMapper(id int64, cfg *proxy.Config, strategy pool.Strategy) *NonceM } } +// Start connects the mapper's upstream strategy once. +func (m *NonceMapper) Start() { + if m == nil || m.strategy == nil { + return + } + m.startOnce.Do(func() { + m.lastUsed = time.Now() + m.strategy.Connect() + }) +} + // Add assigns a miner to a free slot. func (m *NonceMapper) Add(miner *proxy.Miner) bool { if m == nil || miner == nil { diff --git a/splitter/nicehash/mapper.go b/splitter/nicehash/mapper.go index 8bfd0c3..1b51044 100644 --- a/splitter/nicehash/mapper.go +++ b/splitter/nicehash/mapper.go @@ -23,6 +23,7 @@ type NonceMapper struct { active bool // true once pool has sent at least one job suspended int // > 0 when pool connection is in error/reconnecting lastUsed time.Time + startOnce sync.Once mu sync.Mutex } diff --git a/splitter/nicehash/mapper_start_test.go b/splitter/nicehash/mapper_start_test.go new file mode 100644 index 0000000..e5c5040 --- /dev/null +++ b/splitter/nicehash/mapper_start_test.go @@ -0,0 +1,60 @@ +package nicehash + +import ( + "sync" + "testing" + + "dappco.re/go/proxy" +) + +type startCountingStrategy struct { + mu sync.Mutex + connect int +} + +func (s *startCountingStrategy) Connect() { + s.mu.Lock() + defer s.mu.Unlock() + s.connect++ +} + +func (s *startCountingStrategy) Submit(jobID, nonce, result, algo string) int64 { + return 0 +} + +func (s *startCountingStrategy) Disconnect() {} + +func (s *startCountingStrategy) IsActive() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.connect > 0 +} + +func TestMapper_Start_Good(t *testing.T) { + strategy := &startCountingStrategy{} + mapper := NewNonceMapper(1, &proxy.Config{}, strategy) + + mapper.Start() + + if strategy.connect != 1 { + t.Fatalf("expected one connect call, got %d", strategy.connect) + } +} + +func TestMapper_Start_Bad(t *testing.T) { + mapper := NewNonceMapper(1, &proxy.Config{}, nil) + + mapper.Start() +} + +func TestMapper_Start_Ugly(t *testing.T) { + strategy := &startCountingStrategy{} + mapper := NewNonceMapper(1, &proxy.Config{}, strategy) + + mapper.Start() + mapper.Start() + + if strategy.connect != 1 { + t.Fatalf("expected Start to be idempotent, got %d connect calls", strategy.connect) + } +} diff --git a/state_impl.go b/state_impl.go index 0d03c08..45febe1 100644 --- a/state_impl.go +++ b/state_impl.go @@ -57,13 +57,9 @@ func New(cfg *Config) (*Proxy, Result) { p.events.Subscribe(EventClose, p.accessLog.OnClose) } p.events.Subscribe(EventLogin, p.stats.OnLogin) - p.events.Subscribe(EventLogin, p.workers.OnLogin) p.events.Subscribe(EventClose, p.stats.OnClose) - p.events.Subscribe(EventClose, p.workers.OnClose) p.events.Subscribe(EventAccept, p.stats.OnAccept) - p.events.Subscribe(EventAccept, p.workers.OnAccept) p.events.Subscribe(EventReject, p.stats.OnReject) - p.events.Subscribe(EventReject, p.workers.OnReject) if cfg.Watch && cfg.sourcePath != "" { p.watcher = NewConfigWatcher(cfg.sourcePath, p.Reload) } @@ -1164,12 +1160,14 @@ func insertTopDiff(top *[10]uint64, diff uint64) { } // NewWorkers creates a worker aggregate tracker. -func NewWorkers(mode WorkersMode, _ *EventBus) *Workers { - return &Workers{ +func NewWorkers(mode WorkersMode, bus *EventBus) *Workers { + workers := &Workers{ mode: mode, nameIndex: make(map[string]int), idIndex: make(map[int64]int), } + workers.bindEvents(bus) + return workers } func (w *Workers) bindEvents(bus *EventBus) { @@ -1178,6 +1176,14 @@ func (w *Workers) bindEvents(bus *EventBus) { } w.mu.Lock() defer w.mu.Unlock() + if w.subscribed { + return + } + bus.Subscribe(EventLogin, w.OnLogin) + bus.Subscribe(EventAccept, w.OnAccept) + bus.Subscribe(EventReject, w.OnReject) + bus.Subscribe(EventClose, w.OnClose) + w.subscribed = true } func workerNameFor(mode WorkersMode, miner *Miner) string { diff --git a/worker.go b/worker.go index ec08b87..ba76ee7 100644 --- a/worker.go +++ b/worker.go @@ -10,11 +10,12 @@ import ( // // w := proxy.NewWorkers(proxy.WorkersByRigID, bus) type Workers struct { - mode WorkersMode - entries []WorkerRecord // ordered by first-seen (stable) - nameIndex map[string]int // workerName → entries index - idIndex map[int64]int // minerID → entries index - mu sync.RWMutex + mode WorkersMode + entries []WorkerRecord // ordered by first-seen (stable) + nameIndex map[string]int // workerName → entries index + idIndex map[int64]int // minerID → entries index + subscribed bool + mu sync.RWMutex } // WorkerRecord is the per-identity aggregate. diff --git a/worker_test.go b/worker_test.go new file mode 100644 index 0000000..cae232e --- /dev/null +++ b/worker_test.go @@ -0,0 +1,49 @@ +package proxy + +import "testing" + +func TestWorker_NewWorkers_Good(t *testing.T) { + bus := NewEventBus() + workers := NewWorkers(WorkersByRigID, bus) + miner := &Miner{id: 7, user: "wallet", rigID: "rig-1", ip: "10.0.0.1"} + + bus.Dispatch(Event{Type: EventLogin, Miner: miner}) + + records := workers.List() + if len(records) != 1 { + t.Fatalf("expected one worker record, got %d", len(records)) + } + if records[0].Name != "rig-1" { + t.Fatalf("expected rig id worker name, got %q", records[0].Name) + } + if records[0].Connections != 1 { + t.Fatalf("expected one connection, got %d", records[0].Connections) + } +} + +func TestWorker_NewWorkers_Bad(t *testing.T) { + workers := NewWorkers(WorkersDisabled, nil) + if workers == nil { + t.Fatalf("expected workers instance") + } + if got := workers.List(); len(got) != 0 { + t.Fatalf("expected no worker records, got %d", len(got)) + } +} + +func TestWorker_NewWorkers_Ugly(t *testing.T) { + bus := NewEventBus() + workers := NewWorkers(WorkersByUser, bus) + workers.bindEvents(bus) + + miner := &Miner{id: 11, user: "wallet", ip: "10.0.0.2"} + bus.Dispatch(Event{Type: EventLogin, Miner: miner}) + + records := workers.List() + if len(records) != 1 { + t.Fatalf("expected one worker record, got %d", len(records)) + } + if records[0].Connections != 1 { + t.Fatalf("expected a single subscription path, got %d connections", records[0].Connections) + } +}