feat(proxy): wire worker bus and mapper startup

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 18:53:16 +00:00
parent bc67e73ca0
commit 1bcbb389e6
6 changed files with 141 additions and 15 deletions

View file

@ -37,10 +37,7 @@ func (s *NonceSplitter) Connect() {
s.addMapperLocked()
}
for _, mapper := range s.mappers {
if mapper.strategy != nil {
mapper.strategy.Connect()
return
}
mapper.Start()
}
}
@ -149,6 +146,7 @@ func (s *NonceSplitter) addMapperLocked() *NonceMapper {
s.byID = make(map[int64]*NonceMapper)
}
s.byID[mapper.id] = mapper
mapper.Start()
return mapper
}
@ -163,6 +161,17 @@ func NewNonceMapper(id int64, cfg *proxy.Config, strategy pool.Strategy) *NonceM
}
}
// Start connects the mapper's upstream strategy once.
func (m *NonceMapper) Start() {
if m == nil || m.strategy == nil {
return
}
m.startOnce.Do(func() {
m.lastUsed = time.Now()
m.strategy.Connect()
})
}
// Add assigns a miner to a free slot.
func (m *NonceMapper) Add(miner *proxy.Miner) bool {
if m == nil || miner == nil {

View file

@ -23,6 +23,7 @@ type NonceMapper struct {
active bool // true once pool has sent at least one job
suspended int // > 0 when pool connection is in error/reconnecting
lastUsed time.Time
startOnce sync.Once
mu sync.Mutex
}

View file

@ -0,0 +1,60 @@
package nicehash
import (
"sync"
"testing"
"dappco.re/go/proxy"
)
type startCountingStrategy struct {
mu sync.Mutex
connect int
}
func (s *startCountingStrategy) Connect() {
s.mu.Lock()
defer s.mu.Unlock()
s.connect++
}
func (s *startCountingStrategy) Submit(jobID, nonce, result, algo string) int64 {
return 0
}
func (s *startCountingStrategy) Disconnect() {}
func (s *startCountingStrategy) IsActive() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.connect > 0
}
func TestMapper_Start_Good(t *testing.T) {
strategy := &startCountingStrategy{}
mapper := NewNonceMapper(1, &proxy.Config{}, strategy)
mapper.Start()
if strategy.connect != 1 {
t.Fatalf("expected one connect call, got %d", strategy.connect)
}
}
func TestMapper_Start_Bad(t *testing.T) {
mapper := NewNonceMapper(1, &proxy.Config{}, nil)
mapper.Start()
}
func TestMapper_Start_Ugly(t *testing.T) {
strategy := &startCountingStrategy{}
mapper := NewNonceMapper(1, &proxy.Config{}, strategy)
mapper.Start()
mapper.Start()
if strategy.connect != 1 {
t.Fatalf("expected Start to be idempotent, got %d connect calls", strategy.connect)
}
}

View file

@ -57,13 +57,9 @@ func New(cfg *Config) (*Proxy, Result) {
p.events.Subscribe(EventClose, p.accessLog.OnClose)
}
p.events.Subscribe(EventLogin, p.stats.OnLogin)
p.events.Subscribe(EventLogin, p.workers.OnLogin)
p.events.Subscribe(EventClose, p.stats.OnClose)
p.events.Subscribe(EventClose, p.workers.OnClose)
p.events.Subscribe(EventAccept, p.stats.OnAccept)
p.events.Subscribe(EventAccept, p.workers.OnAccept)
p.events.Subscribe(EventReject, p.stats.OnReject)
p.events.Subscribe(EventReject, p.workers.OnReject)
if cfg.Watch && cfg.sourcePath != "" {
p.watcher = NewConfigWatcher(cfg.sourcePath, p.Reload)
}
@ -1164,12 +1160,14 @@ func insertTopDiff(top *[10]uint64, diff uint64) {
}
// NewWorkers creates a worker aggregate tracker.
func NewWorkers(mode WorkersMode, _ *EventBus) *Workers {
return &Workers{
func NewWorkers(mode WorkersMode, bus *EventBus) *Workers {
workers := &Workers{
mode: mode,
nameIndex: make(map[string]int),
idIndex: make(map[int64]int),
}
workers.bindEvents(bus)
return workers
}
func (w *Workers) bindEvents(bus *EventBus) {
@ -1178,6 +1176,14 @@ func (w *Workers) bindEvents(bus *EventBus) {
}
w.mu.Lock()
defer w.mu.Unlock()
if w.subscribed {
return
}
bus.Subscribe(EventLogin, w.OnLogin)
bus.Subscribe(EventAccept, w.OnAccept)
bus.Subscribe(EventReject, w.OnReject)
bus.Subscribe(EventClose, w.OnClose)
w.subscribed = true
}
func workerNameFor(mode WorkersMode, miner *Miner) string {

View file

@ -10,11 +10,12 @@ import (
//
// w := proxy.NewWorkers(proxy.WorkersByRigID, bus)
type Workers struct {
mode WorkersMode
entries []WorkerRecord // ordered by first-seen (stable)
nameIndex map[string]int // workerName → entries index
idIndex map[int64]int // minerID → entries index
mu sync.RWMutex
mode WorkersMode
entries []WorkerRecord // ordered by first-seen (stable)
nameIndex map[string]int // workerName → entries index
idIndex map[int64]int // minerID → entries index
subscribed bool
mu sync.RWMutex
}
// WorkerRecord is the per-identity aggregate.

49
worker_test.go Normal file
View file

@ -0,0 +1,49 @@
package proxy
import "testing"
func TestWorker_NewWorkers_Good(t *testing.T) {
bus := NewEventBus()
workers := NewWorkers(WorkersByRigID, bus)
miner := &Miner{id: 7, user: "wallet", rigID: "rig-1", ip: "10.0.0.1"}
bus.Dispatch(Event{Type: EventLogin, Miner: miner})
records := workers.List()
if len(records) != 1 {
t.Fatalf("expected one worker record, got %d", len(records))
}
if records[0].Name != "rig-1" {
t.Fatalf("expected rig id worker name, got %q", records[0].Name)
}
if records[0].Connections != 1 {
t.Fatalf("expected one connection, got %d", records[0].Connections)
}
}
func TestWorker_NewWorkers_Bad(t *testing.T) {
workers := NewWorkers(WorkersDisabled, nil)
if workers == nil {
t.Fatalf("expected workers instance")
}
if got := workers.List(); len(got) != 0 {
t.Fatalf("expected no worker records, got %d", len(got))
}
}
func TestWorker_NewWorkers_Ugly(t *testing.T) {
bus := NewEventBus()
workers := NewWorkers(WorkersByUser, bus)
workers.bindEvents(bus)
miner := &Miner{id: 11, user: "wallet", ip: "10.0.0.2"}
bus.Dispatch(Event{Type: EventLogin, Miner: miner})
records := workers.List()
if len(records) != 1 {
t.Fatalf("expected one worker record, got %d", len(records))
}
if records[0].Connections != 1 {
t.Fatalf("expected a single subscription path, got %d connections", records[0].Connections)
}
}