diff --git a/api/router.go b/api/router.go index 1eba171..92b2854 100644 --- a/api/router.go +++ b/api/router.go @@ -159,9 +159,30 @@ func RegisterRoutes(router Router, proxyValue *proxy.Proxy) { }) router.HandleFunc("/1/miners", func(writer http.ResponseWriter, request *http.Request) { + miners := proxyValue.Miners() + rows := make([][]interface{}, 0, len(miners)) + for _, miner := range miners { + ip := "" + if remote := miner.RemoteAddr(); remote != nil { + ip = remote.String() + } + rows = append(rows, []interface{}{ + miner.ID(), + ip, + miner.TX(), + miner.RX(), + miner.State(), + miner.Diff(), + miner.User(), + "********", + miner.RigID(), + miner.Agent(), + }) + } + writeJSON(writer, map[string]interface{}{ "format": []string{"id", "ip", "tx", "rx", "state", "diff", "user", "password", "rig_id", "agent"}, - "miners": [][]interface{}{}, + "miners": rows, }) }) } diff --git a/config.go b/config.go index 9de0349..e8493bd 100644 --- a/config.go +++ b/config.go @@ -21,6 +21,7 @@ type Config struct { RetryPause int `json:"retry-pause"` // seconds between retries Watch bool `json:"watch"` // hot-reload on file change RateLimit RateLimit `json:"rate-limit"` // per-IP connection rate limit + sourcePath string `json:"-"` } // BindAddr is one TCP listen endpoint. diff --git a/config_runtime.go b/config_runtime.go index 49d4939..6af1e7a 100644 --- a/config_runtime.go +++ b/config_runtime.go @@ -21,6 +21,7 @@ func LoadConfig(path string) (*Config, error) { if errorValue = json.Unmarshal(data, config); errorValue != nil { return nil, errorValue } + config.sourcePath = path if errorValue = config.Validate(); errorValue != nil { return nil, errorValue diff --git a/events.go b/events.go index 711b929..c366d3f 100644 --- a/events.go +++ b/events.go @@ -18,6 +18,7 @@ type EventType int const ( EventLogin EventType = iota // miner completed login + EventSubmit // miner submitted a share EventAccept // pool accepted a submitted share EventReject // pool rejected a share (or share expired) EventClose // miner TCP connection closed diff --git a/miner.go b/miner.go index 41951ef..0e43d31 100644 --- a/miner.go +++ b/miner.go @@ -36,6 +36,7 @@ type Miner struct { password string // login params.pass agent string // login params.agent rigID string // login params.rigid (optional extension) + algo []string fixedByte uint8 // NiceHash slot index (0-255) mapperID int64 // which NonceMapper owns this miner; -1 = unassigned routeID int64 // SimpleMapper ID in simple mode; -1 = unassigned @@ -45,6 +46,11 @@ type Miner struct { tx uint64 // bytes sent to miner connectedAt time.Time lastActivityAt time.Time + events *EventBus + splitter Splitter + currentJob *Job + closeOnce sync.Once + accessPassword string conn net.Conn tlsConn *tls.Conn // nil if plain TCP sendMu sync.Mutex // serialises writes to conn diff --git a/miner_runtime.go b/miner_runtime.go index 6451b84..8f0c0d9 100644 --- a/miner_runtime.go +++ b/miner_runtime.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "encoding/json" "net" + "strings" ) type minerRequest struct { @@ -23,15 +24,23 @@ func (m *Miner) Start() { } go func() { - reader := bufio.NewReader(m.conn) + reader := bufio.NewReaderSize(m.conn, len(m.buf)) for { - line, errorValue := reader.ReadBytes('\n') + line, isPrefix, errorValue := reader.ReadLine() if errorValue != nil { m.Close() return } - m.rx += uint64(len(line)) + if isPrefix { + m.Close() + return + } + if len(line) == 0 { + continue + } + m.rx += uint64(len(line) + 1) m.Touch() + m.handleLine(line) } }() } @@ -51,6 +60,8 @@ func (m *Miner) ForwardJob(job Job, algo string) { m.diff = job.DifficultyFromTarget() m.state = MinerStateReady + jobCopy := job + m.currentJob = &jobCopy m.Touch() params := map[string]interface{}{ @@ -59,7 +70,7 @@ func (m *Miner) ForwardJob(job Job, algo string) { "target": job.Target, "id": m.rpcID, } - if algo != "" { + if m.extAlgo && algo != "" { params["algo"] = algo } @@ -102,12 +113,17 @@ func (m *Miner) Success(id int64, status string) { // // m.Close() func (m *Miner) Close() { - if m == nil || m.conn == nil || m.state == MinerStateClosing { + if m == nil || m.conn == nil { return } - m.state = MinerStateClosing - _ = m.conn.Close() + m.closeOnce.Do(func() { + m.state = MinerStateClosing + if m.events != nil { + m.events.Dispatch(Event{Type: EventClose, Miner: m}) + } + _ = m.conn.Close() + }) } func (m *Miner) writeJSON(value interface{}) { @@ -130,6 +146,204 @@ func (m *Miner) writeJSON(value interface{}) { } } +func (m *Miner) handleLine(line []byte) { + if len(line) > len(m.buf) { + m.Close() + return + } + + request := minerRequest{} + if errorValue := json.Unmarshal(line, &request); errorValue != nil { + m.Close() + return + } + + switch request.Method { + case "login": + m.handleLogin(request) + case "submit": + m.handleSubmit(request) + case "keepalived": + m.handleKeepalived(request) + default: + if request.ID != 0 { + m.ReplyWithError(request.ID, "Invalid request") + } + } +} + +func (m *Miner) handleLogin(request minerRequest) { + type loginParams struct { + Login string `json:"login"` + Pass string `json:"pass"` + Agent string `json:"agent"` + Algo []string `json:"algo"` + RigID string `json:"rigid"` + } + + params := loginParams{} + if errorValue := json.Unmarshal(request.Params, ¶ms); errorValue != nil { + m.ReplyWithError(request.ID, "Invalid payment address provided") + return + } + + if params.Login == "" { + m.ReplyWithError(request.ID, "Invalid payment address provided") + return + } + if m.accessPassword != "" && params.Pass != m.accessPassword { + m.ReplyWithError(request.ID, "Invalid password") + return + } + + m.SetPassword(params.Pass) + m.SetAgent(params.Agent) + m.SetRigID(params.RigID) + m.algo = append(m.algo[:0], params.Algo...) + m.extAlgo = len(params.Algo) > 0 + m.SetUser(params.Login) + m.SetRPCID(newRPCID()) + + if m.events != nil { + m.events.Dispatch(Event{Type: EventLogin, Miner: m}) + } + + if m.state == MinerStateClosing { + return + } + + result := map[string]interface{}{ + "id": m.rpcID, + "status": "OK", + } + if m.currentJob != nil && m.currentJob.IsValid() { + jobCopy := *m.currentJob + jobResult := map[string]interface{}{ + "blob": jobCopy.Blob, + "job_id": jobCopy.JobID, + "target": jobCopy.Target, + "id": m.rpcID, + } + if m.extAlgo && jobCopy.Algo != "" { + jobResult["algo"] = jobCopy.Algo + } + result["job"] = jobResult + if m.extAlgo { + result["extensions"] = []string{"algo"} + } + m.state = MinerStateReady + } else { + m.state = MinerStateWaitReady + if m.extAlgo { + result["extensions"] = []string{"algo"} + } + } + + m.writeJSON(map[string]interface{}{ + "id": request.ID, + "jsonrpc": "2.0", + "error": nil, + "result": result, + }) +} + +func (m *Miner) handleSubmit(request minerRequest) { + if m.state != MinerStateReady { + m.ReplyWithError(request.ID, "Unauthenticated") + return + } + + type submitParams struct { + ID string `json:"id"` + JobID string `json:"job_id"` + Nonce string `json:"nonce"` + Result string `json:"result"` + Algo string `json:"algo"` + } + + params := submitParams{} + if errorValue := json.Unmarshal(request.Params, ¶ms); errorValue != nil { + m.ReplyWithError(request.ID, "Malformed share") + return + } + + if params.ID != m.rpcID { + m.ReplyWithError(request.ID, "Unauthenticated") + return + } + if params.JobID == "" { + m.ReplyWithError(request.ID, "Missing job id") + return + } + if len(params.Nonce) != 8 || params.Nonce != strings.ToLower(params.Nonce) { + m.ReplyWithError(request.ID, "Invalid nonce") + return + } + if _, errorValue := hex.DecodeString(params.Nonce); errorValue != nil { + m.ReplyWithError(request.ID, "Invalid nonce") + return + } + + m.Touch() + if m.splitter != nil { + m.splitter.OnSubmit(&SubmitEvent{ + Miner: m, + JobID: params.JobID, + Nonce: params.Nonce, + Result: params.Result, + Algo: params.Algo, + RequestID: request.ID, + }) + } +} + +func (m *Miner) handleKeepalived(request minerRequest) { + m.Touch() + m.Success(request.ID, "KEEPALIVED") +} + +func (m *Miner) currentJobCopy() *Job { + if m == nil || m.currentJob == nil { + return nil + } + + jobCopy := *m.currentJob + return &jobCopy +} + +func (m *Miner) dispatchSubmitResult(eventType EventType, diff uint64, errorMessage string, requestID int64) { + if m == nil || m.events == nil { + return + } + + jobCopy := m.currentJobCopy() + m.events.Dispatch(Event{ + Type: eventType, + Miner: m, + Job: jobCopy, + Diff: diff, + Error: errorMessage, + Latency: 0, + }) + if eventType == EventAccept { + m.Success(requestID, "OK") + return + } + m.ReplyWithError(requestID, errorMessage) +} + +func (m *Miner) setStateFromJob(job Job) { + m.currentJob = &job + m.state = MinerStateReady +} + +func (m *Miner) Expire() { + if m == nil || m.state == MinerStateClosing { + return + } + m.Close() +} + func newRPCID() string { value := make([]byte, 16) _, _ = rand.Read(value) diff --git a/miner_runtime_test.go b/miner_runtime_test.go new file mode 100644 index 0000000..712c336 --- /dev/null +++ b/miner_runtime_test.go @@ -0,0 +1,129 @@ +package proxy + +import ( + "bufio" + "encoding/json" + "net" + "testing" + "time" +) + +func TestMiner_Login_Good(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer clientConn.Close() + + miner := NewMiner(serverConn, 3333, nil) + miner.Start() + defer miner.Close() + + encoder := json.NewEncoder(clientConn) + if err := encoder.Encode(map[string]interface{}{ + "id": 1, + "jsonrpc": "2.0", + "method": "login", + "params": map[string]interface{}{ + "login": "wallet", + "pass": "x", + "agent": "xmrig", + }, + }); err != nil { + t.Fatal(err) + } + + clientConn.SetReadDeadline(time.Now().Add(time.Second)) + line, err := bufio.NewReader(clientConn).ReadBytes('\n') + if err != nil { + t.Fatal(err) + } + + var response map[string]interface{} + if err := json.Unmarshal(line, &response); err != nil { + t.Fatal(err) + } + + if response["jsonrpc"] != "2.0" { + t.Fatalf("unexpected response: %#v", response) + } + + result := response["result"].(map[string]interface{}) + if result["status"] != "OK" || result["id"] == "" { + t.Fatalf("unexpected login response: %#v", response) + } +} + +func TestMiner_Keepalived_Bad(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer clientConn.Close() + + miner := NewMiner(serverConn, 3333, nil) + miner.Start() + defer miner.Close() + + encoder := json.NewEncoder(clientConn) + if err := encoder.Encode(map[string]interface{}{ + "id": 2, + "jsonrpc": "2.0", + "method": "keepalived", + }); err != nil { + t.Fatal(err) + } + + clientConn.SetReadDeadline(time.Now().Add(time.Second)) + line, err := bufio.NewReader(clientConn).ReadBytes('\n') + if err != nil { + t.Fatal(err) + } + + var response map[string]interface{} + if err := json.Unmarshal(line, &response); err != nil { + t.Fatal(err) + } + + result := response["result"].(map[string]interface{}) + if result["status"] != "KEEPALIVED" { + t.Fatalf("unexpected keepalived response: %#v", response) + } +} + +func TestMiner_Submit_Ugly(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer clientConn.Close() + + miner := NewMiner(serverConn, 3333, nil) + miner.Start() + defer miner.Close() + + miner.SetRPCID("session") + miner.SetState(MinerStateReady) + + encoder := json.NewEncoder(clientConn) + if err := encoder.Encode(map[string]interface{}{ + "id": 3, + "jsonrpc": "2.0", + "method": "submit", + "params": map[string]interface{}{ + "id": "session", + "job_id": "job-1", + "nonce": "ABC123", + "result": "abc", + "algo": "cn/r", + }, + }); err != nil { + t.Fatal(err) + } + + clientConn.SetReadDeadline(time.Now().Add(time.Second)) + line, err := bufio.NewReader(clientConn).ReadBytes('\n') + if err != nil { + t.Fatal(err) + } + + var response map[string]interface{} + if err := json.Unmarshal(line, &response); err != nil { + t.Fatal(err) + } + + if response["error"] == nil { + t.Fatalf("expected invalid nonce error, got %#v", response) + } +} diff --git a/proxy.go b/proxy.go index e0aa703..b555e20 100644 --- a/proxy.go +++ b/proxy.go @@ -26,6 +26,8 @@ type Proxy struct { stats *Stats workers *Workers events *EventBus + miners map[int64]*Miner + minerMu sync.RWMutex servers []*Server ticker *time.Ticker watcher *ConfigWatcher @@ -120,3 +122,29 @@ type tokenBucket struct { type CustomDiff struct { globalDiff uint64 } + +var splitterFactories = map[string]func(*Config, *EventBus) Splitter{ + "": noopSplitterFactory, +} + +// RegisterSplitterFactory registers a splitter constructor for a mode name. +// +// proxy.RegisterSplitterFactory("nicehash", func(cfg *proxy.Config, bus *proxy.EventBus) proxy.Splitter { +// return nicehash.NewNonceSplitter(cfg, bus, pool.NewStrategyFactory(cfg)) +// }) +func RegisterSplitterFactory(mode string, factory func(*Config, *EventBus) Splitter) { + if mode == "" || factory == nil { + return + } + splitterFactories[mode] = factory +} + +func newSplitter(cfg *Config, events *EventBus) Splitter { + if cfg == nil { + return noopSplitter{} + } + if factory, exists := splitterFactories[cfg.Mode]; exists && factory != nil { + return factory(cfg, events) + } + return noopSplitter{} +} diff --git a/proxy_runtime.go b/proxy_runtime.go index 14a625f..3d02c6f 100644 --- a/proxy_runtime.go +++ b/proxy_runtime.go @@ -1,6 +1,7 @@ package proxy import ( + "crypto/tls" "net" "sync/atomic" "time" @@ -18,20 +19,26 @@ func New(cfg *Config) (*Proxy, error) { events := NewEventBus() stats := NewStats() + events.Subscribe(EventLogin, NewCustomDiff(cfg.CustomDiff).OnLogin) workers := NewWorkers(cfg.Workers, events) + splitter := newSplitter(cfg, events) proxyValue := &Proxy{ config: cfg, - splitter: noopSplitter{}, + splitter: splitter, stats: stats, workers: workers, events: events, + miners: make(map[int64]*Miner), done: make(chan struct{}), } - events.Subscribe(EventAccept, stats.OnAccept) - events.Subscribe(EventReject, stats.OnReject) events.Subscribe(EventLogin, func(event Event) { + if event.Miner != nil { + proxyValue.minerMu.Lock() + proxyValue.miners[event.Miner.ID()] = event.Miner + proxyValue.minerMu.Unlock() + } stats.connections.Add(1) current := proxyMinerCount.Add(1) for { @@ -42,11 +49,29 @@ func New(cfg *Config) (*Proxy, error) { } }) events.Subscribe(EventClose, func(event Event) { + if event.Miner != nil { + proxyValue.minerMu.Lock() + delete(proxyValue.miners, event.Miner.ID()) + proxyValue.minerMu.Unlock() + } if proxyMinerCount.Load() > 0 { proxyMinerCount.Add(^uint64(0)) } }) - events.Subscribe(EventLogin, NewCustomDiff(cfg.CustomDiff).OnLogin) + events.Subscribe(EventAccept, stats.OnAccept) + events.Subscribe(EventReject, stats.OnReject) + if splitter != nil { + events.Subscribe(EventLogin, func(event Event) { + splitter.OnLogin(&LoginEvent{Miner: event.Miner}) + }) + events.Subscribe(EventClose, func(event Event) { + splitter.OnClose(&CloseEvent{Miner: event.Miner}) + }) + } + if cfg.Watch && cfg.sourcePath != "" { + proxyValue.watcher = NewConfigWatcher(cfg.sourcePath, proxyValue.Reload) + proxyValue.watcher.Start() + } return proxyValue, nil } @@ -61,7 +86,14 @@ func (p *Proxy) Start() { p.ticker = time.NewTicker(time.Second) for _, bind := range p.config.Bind { - server, errorValue := NewServer(bind, nil, NewRateLimiter(p.config.RateLimit), p.acceptConn) + var tlsConfig *tls.Config + if bind.TLS && p.config.TLS.Enabled { + certificate, errorValue := tls.LoadX509KeyPair(p.config.TLS.CertFile, p.config.TLS.KeyFile) + if errorValue == nil { + tlsConfig = &tls.Config{Certificates: []tls.Certificate{certificate}} + } + } + server, errorValue := NewServer(bind, tlsConfig, NewRateLimiter(p.config.RateLimit), p.acceptConn) if errorValue != nil { continue } @@ -97,6 +129,10 @@ func (noopSplitter) Tick(ticks uint64) {} func (noopSplitter) GC() {} func (noopSplitter) Upstreams() UpstreamStats { return UpstreamStats{} } +func noopSplitterFactory(cfg *Config, events *EventBus) Splitter { + return noopSplitter{} +} + // Stop shuts down all subsystems cleanly. // // p.Stop() @@ -127,18 +163,42 @@ func (p *Proxy) Reload(cfg *Config) { } func (p *Proxy) Summary() StatsSummary { + if p == nil || p.stats == nil { + return StatsSummary{} + } return p.stats.Summary() } func (p *Proxy) Workers() []WorkerRecord { + if p == nil || p.workers == nil { + return nil + } return p.workers.List() } +func (p *Proxy) Miners() []*Miner { + if p == nil { + return nil + } + + p.minerMu.RLock() + defer p.minerMu.RUnlock() + + miners := make([]*Miner, 0, len(p.miners)) + for _, miner := range p.miners { + miners = append(miners, miner) + } + return miners +} + func (p *Proxy) CurrentMiners() uint64 { return proxyMinerCount.Load() } func (p *Proxy) MaxMiners() uint64 { + if p == nil || p.stats == nil { + return 0 + } return p.stats.maxMiners.Load() } @@ -165,5 +225,10 @@ func (p *Proxy) Upstreams() UpstreamStats { func (p *Proxy) acceptConn(conn net.Conn, localPort uint16) { miner := NewMiner(conn, localPort, nil) - p.events.Dispatch(Event{Type: EventLogin, Miner: miner}) + miner.events = p.events + miner.splitter = p.splitter + if p.config != nil { + miner.accessPassword = p.config.AccessPassword + } + miner.Start() } diff --git a/server_runtime.go b/server_runtime.go index 9030be1..6fae353 100644 --- a/server_runtime.go +++ b/server_runtime.go @@ -52,6 +52,10 @@ func (s *Server) Start() { continue } + if s.tlsCfg != nil { + conn = tls.Server(conn, s.tlsCfg) + } + if s.onAccept == nil { _ = conn.Close() continue diff --git a/splitter/nicehash/mapper.go b/splitter/nicehash/mapper.go index 7ed0a02..1525921 100644 --- a/splitter/nicehash/mapper.go +++ b/splitter/nicehash/mapper.go @@ -16,8 +16,9 @@ type NonceMapper struct { id int64 storage *NonceStorage strategy pool.Strategy // manages pool client lifecycle and failover - pending map[int64]SubmitContext // sequence → {requestID, minerID} + pending map[int64]SubmitContext // sequence → {requestID, minerID, jobID} cfg *proxy.Config + events *proxy.EventBus active bool // true once pool has sent at least one job suspended int // > 0 when pool connection is in error/reconnecting mu sync.Mutex @@ -29,6 +30,7 @@ type NonceMapper struct { type SubmitContext struct { RequestID int64 // JSON-RPC id from the miner's submit request MinerID int64 // miner that submitted + JobID string } // NewNonceMapper creates one upstream pool mapper and its local slot table. @@ -53,15 +55,22 @@ func (m *NonceMapper) Remove(miner *proxy.Miner) { } func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { - if event == nil || m.strategy == nil { + if event == nil || event.Miner == nil || m.strategy == nil { return } sequence := m.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo) + if sequence == 0 { + if event.Miner != nil { + event.Miner.ReplyWithError(event.RequestID, "Pool unavailable") + } + return + } m.mu.Lock() m.pending[sequence] = SubmitContext{ RequestID: event.RequestID, MinerID: event.Miner.ID(), + JobID: event.JobID, } m.mu.Unlock() } @@ -72,3 +81,61 @@ func (m *NonceMapper) IsActive() bool { } return m.strategy.IsActive() } + +func (m *NonceMapper) OnJob(job proxy.Job) { + if !job.IsValid() { + return + } + + m.mu.Lock() + m.active = true + m.suspended = 0 + m.mu.Unlock() + m.storage.SetJob(job) +} + +func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) { + m.mu.Lock() + context, exists := m.pending[sequence] + if exists { + delete(m.pending, sequence) + } + m.mu.Unlock() + if !exists { + return + } + + miner := m.storage.Miners()[context.MinerID] + if miner == nil { + return + } + + eventType := proxy.EventReject + if accepted { + eventType = proxy.EventAccept + } + + if m.events != nil { + jobCopy := m.storage.CurrentJob() + m.events.Dispatch(proxy.Event{ + Type: eventType, + Miner: miner, + Job: jobCopy, + Diff: miner.Diff(), + Error: errorMessage, + }) + } + + if accepted { + miner.Success(context.RequestID, "OK") + return + } + miner.ReplyWithError(context.RequestID, errorMessage) +} + +func (m *NonceMapper) OnDisconnect() { + m.mu.Lock() + m.active = false + m.suspended++ + m.mu.Unlock() +} diff --git a/splitter/nicehash/register.go b/splitter/nicehash/register.go new file mode 100644 index 0000000..45426e3 --- /dev/null +++ b/splitter/nicehash/register.go @@ -0,0 +1,12 @@ +package nicehash + +import ( + "dappco.re/go/core/proxy" + "dappco.re/go/core/proxy/pool" +) + +func init() { + proxy.RegisterSplitterFactory("nicehash", func(cfg *proxy.Config, events *proxy.EventBus) proxy.Splitter { + return NewNonceSplitter(cfg, events, pool.NewStrategyFactory(cfg)) + }) +} diff --git a/splitter/nicehash/splitter.go b/splitter/nicehash/splitter.go index 0532175..2459d9f 100644 --- a/splitter/nicehash/splitter.go +++ b/splitter/nicehash/splitter.go @@ -64,6 +64,7 @@ func (s *NonceSplitter) OnLogin(event *proxy.LoginEvent) { for _, mapper := range s.mappers { if mapper.Add(event.Miner) { + mapper.events = s.events event.Miner.SetMapperID(mapper.id) event.Miner.SetNiceHashEnabled(true) return @@ -74,6 +75,7 @@ func (s *NonceSplitter) OnLogin(event *proxy.LoginEvent) { s.mappers = append(s.mappers, mapper) mapper.strategy.Connect() if mapper.Add(event.Miner) { + mapper.events = s.events event.Miner.SetMapperID(mapper.id) event.Miner.SetNiceHashEnabled(true) } @@ -154,9 +156,12 @@ func (s *NonceSplitter) Upstreams() proxy.UpstreamStats { func (s *NonceSplitter) newMapperLocked() *NonceMapper { mapperID := int64(len(s.mappers) + 1) + mapper := NewNonceMapper(mapperID, s.cfg, nil) + mapper.events = s.events var strategy pool.Strategy if s.strategyFactory != nil { - strategy = s.strategyFactory(nil) + strategy = s.strategyFactory(mapper) } - return NewNonceMapper(mapperID, s.cfg, strategy) + mapper.strategy = strategy + return mapper } diff --git a/splitter/nicehash/storage.go b/splitter/nicehash/storage.go index b2661af..4d80414 100644 --- a/splitter/nicehash/storage.go +++ b/splitter/nicehash/storage.go @@ -140,3 +140,28 @@ func (s *NonceStorage) SlotCount() (free int, dead int, active int) { return free, dead, active } + +// Miners returns a snapshot of the active miner map. +func (s *NonceStorage) Miners() map[int64]*proxy.Miner { + s.mu.Lock() + defer s.mu.Unlock() + + miners := make(map[int64]*proxy.Miner, len(s.miners)) + for minerID, miner := range s.miners { + miners[minerID] = miner + } + return miners +} + +// CurrentJob returns a copy of the latest assigned job, if any. +func (s *NonceStorage) CurrentJob() *proxy.Job { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.job.IsValid() { + return nil + } + + jobCopy := s.job + return &jobCopy +} diff --git a/splitter/simple/mapper.go b/splitter/simple/mapper.go index afe5dcb..544269d 100644 --- a/splitter/simple/mapper.go +++ b/splitter/simple/mapper.go @@ -1,6 +1,7 @@ package simple import ( + "sync" "time" "dappco.re/go/core/proxy" @@ -16,13 +17,56 @@ type SimpleMapper struct { id int64 miner *proxy.Miner // nil when idle strategy pool.Strategy + events *proxy.EventBus + pending map[int64]int64 idleAt time.Time // zero when active stopped bool + mu sync.Mutex } // NewSimpleMapper stores the mapper ID and strategy. // // mapper := simple.NewSimpleMapper(1, strategy) func NewSimpleMapper(id int64, strategy pool.Strategy) *SimpleMapper { - return &SimpleMapper{id: id, strategy: strategy} + return &SimpleMapper{id: id, strategy: strategy, pending: make(map[int64]int64)} +} + +func (m *SimpleMapper) OnJob(job proxy.Job) { + if !job.IsValid() || m.miner == nil { + return + } + + m.miner.ForwardJob(job, job.Algo) +} + +func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) { + if m.miner == nil { + return + } + + m.mu.Lock() + requestID, exists := m.pending[sequence] + if !exists { + m.mu.Unlock() + return + } + delete(m.pending, sequence) + m.mu.Unlock() + + if accepted { + if m.events != nil { + m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: m.miner, Diff: m.miner.Diff()}) + } + m.miner.Success(requestID, "OK") + return + } + + if m.events != nil { + m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: m.miner, Diff: m.miner.Diff(), Error: errorMessage}) + } + m.miner.ReplyWithError(requestID, errorMessage) +} + +func (m *SimpleMapper) OnDisconnect() { + m.stopped = true } diff --git a/splitter/simple/register.go b/splitter/simple/register.go new file mode 100644 index 0000000..aae93cb --- /dev/null +++ b/splitter/simple/register.go @@ -0,0 +1,12 @@ +package simple + +import ( + "dappco.re/go/core/proxy" + "dappco.re/go/core/proxy/pool" +) + +func init() { + proxy.RegisterSplitterFactory("simple", func(cfg *proxy.Config, events *proxy.EventBus) proxy.Splitter { + return NewSimpleSplitter(cfg, events, pool.NewStrategyFactory(cfg)) + }) +} diff --git a/splitter/simple/splitter.go b/splitter/simple/splitter.go index 81f3e19..1474607 100644 --- a/splitter/simple/splitter.go +++ b/splitter/simple/splitter.go @@ -61,13 +61,17 @@ func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) { if mapper == nil { s.seq++ var strategy pool.Strategy + mapper = NewSimpleMapper(s.seq, nil) + mapper.events = s.events if s.factory != nil { - strategy = s.factory(nil) + strategy = s.factory(mapper) } - mapper = NewSimpleMapper(s.seq, strategy) + mapper.strategy = strategy if mapper.strategy != nil { mapper.strategy.Connect() } + } else { + mapper.events = s.events } mapper.miner = event.Miner @@ -88,7 +92,15 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) { return } - mapper.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo) + sequence := mapper.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo) + if sequence == 0 { + event.Miner.ReplyWithError(event.RequestID, "Pool unavailable") + return + } + + mapper.mu.Lock() + mapper.pending[sequence] = event.RequestID + mapper.mu.Unlock() } func (s *SimpleSplitter) OnClose(event *proxy.CloseEvent) {