diff --git a/config.go b/config.go index e8493bd..f08f40d 100644 --- a/config.go +++ b/config.go @@ -16,6 +16,7 @@ type Config struct { AlgoExtension bool `json:"algo-ext"` // forward algo field in jobs Workers WorkersMode `json:"workers"` // "rig-id", "user", "password", "agent", "ip", "false" AccessLogFile string `json:"access-log-file"` // "" = disabled + ShareLogFile string `json:"share-log-file"` // "" = disabled ReuseTimeout int `json:"reuse-timeout"` // seconds; simple mode upstream reuse Retries int `json:"retries"` // pool reconnect attempts RetryPause int `json:"retry-pause"` // seconds between retries diff --git a/miner.go b/miner.go index 5bb09f5..480f947 100644 --- a/miner.go +++ b/miner.go @@ -30,6 +30,7 @@ type Miner struct { state MinerState stateMu sync.RWMutex extAlgo bool // miner sent algo list in login params + algoExtension bool // config allows forwarding algo negotiation extNH bool // NiceHash mode active (fixed byte splitting) ip string // remote IP (without port, for logging) localPort uint16 diff --git a/miner_runtime.go b/miner_runtime.go index bad544d..59f8d4e 100644 --- a/miner_runtime.go +++ b/miner_runtime.go @@ -75,7 +75,7 @@ func (m *Miner) ForwardJob(job Job, algo string) { "target": job.Target, "id": m.rpcID, } - if m.extAlgo && algo != "" { + if m.algoExtension && m.extAlgo && algo != "" { params["algo"] = algo } @@ -229,17 +229,17 @@ func (m *Miner) handleLogin(request minerRequest) { "target": jobCopy.Target, "id": m.rpcID, } - if m.extAlgo && jobCopy.Algo != "" { + if m.algoExtension && m.extAlgo && jobCopy.Algo != "" { jobResult["algo"] = jobCopy.Algo } result["job"] = jobResult - if m.extAlgo { + if m.algoExtension && m.extAlgo { result["extensions"] = []string{"algo"} } m.SetState(MinerStateReady) } else { m.SetState(MinerStateWaitReady) - if m.extAlgo { + if m.algoExtension && m.extAlgo { result["extensions"] = []string{"algo"} } } diff --git a/miner_runtime_test.go b/miner_runtime_test.go index 712c336..18ad9c8 100644 --- a/miner_runtime_test.go +++ b/miner_runtime_test.go @@ -127,3 +127,45 @@ func TestMiner_Submit_Ugly(t *testing.T) { t.Fatalf("expected invalid nonce error, got %#v", response) } } + +func TestMiner_Login_Ugly(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer clientConn.Close() + + miner := NewMiner(serverConn, 3333, nil) + miner.algoExtension = true + miner.Start() + defer miner.Close() + + encoder := json.NewEncoder(clientConn) + if err := encoder.Encode(map[string]interface{}{ + "id": 4, + "jsonrpc": "2.0", + "method": "login", + "params": map[string]interface{}{ + "login": "wallet", + "pass": "x", + "agent": "xmrig", + "algo": []string{"cn/r"}, + }, + }); err != nil { + t.Fatal(err) + } + + clientConn.SetReadDeadline(time.Now().Add(time.Second)) + line, err := bufio.NewReader(clientConn).ReadBytes('\n') + if err != nil { + t.Fatal(err) + } + + var response map[string]interface{} + if err := json.Unmarshal(line, &response); err != nil { + t.Fatal(err) + } + + result := response["result"].(map[string]interface{}) + extensions, ok := result["extensions"].([]interface{}) + if !ok || len(extensions) != 1 || extensions[0] != "algo" { + t.Fatalf("expected algo extension to be advertised, got %#v", response) + } +} diff --git a/proxy.go b/proxy.go index c150c6a..f5e60c6 100644 --- a/proxy.go +++ b/proxy.go @@ -11,6 +11,7 @@ package proxy import ( + "net/http" "sync" "time" ) @@ -31,6 +32,7 @@ type Proxy struct { miners map[int64]*Miner minerMu sync.RWMutex servers []*Server + httpServer *http.Server ticker *time.Ticker watcher *ConfigWatcher done chan struct{} diff --git a/proxy_http_runtime.go b/proxy_http_runtime.go new file mode 100644 index 0000000..4c0d545 --- /dev/null +++ b/proxy_http_runtime.go @@ -0,0 +1,191 @@ +package proxy + +import ( + "context" + "encoding/json" + "net" + "net/http" + "strconv" + "strings" + "time" +) + +const proxyAPIVersion = "1.0.0" + +func startHTTPServer(p *Proxy) { + if p == nil || p.config == nil || !p.config.HTTP.Enabled || p.httpServer != nil { + return + } + + mux := http.NewServeMux() + registerMonitoringRoutes(mux, p) + + address := net.JoinHostPort(p.config.HTTP.Host, strconv.Itoa(int(p.config.HTTP.Port))) + listener, errorValue := net.Listen("tcp", address) + if errorValue != nil { + return + } + + server := &http.Server{ + Handler: mux, + } + p.httpServer = server + + go func() { + _ = server.Serve(listener) + }() +} + +func stopHTTPServer(p *Proxy) { + if p == nil || p.httpServer == nil { + return + } + + server := p.httpServer + p.httpServer = nil + + shutdownContext, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = server.Shutdown(shutdownContext) +} + +func registerMonitoringRoutes(router *http.ServeMux, proxyValue *Proxy) { + if router == nil || proxyValue == nil { + return + } + + router.HandleFunc("/1/summary", func(writer http.ResponseWriter, request *http.Request) { + if !allowHTTPRequest(writer, request, proxyValue.HTTPConfig()) { + return + } + summary := proxyValue.Summary() + response := map[string]interface{}{ + "version": proxyAPIVersion, + "mode": proxyValue.Mode(), + "hashrate": map[string]interface{}{ + "total": summary.Hashrate, + }, + "miners": map[string]uint64{ + "now": proxyValue.CurrentMiners(), + "max": proxyValue.MaxMiners(), + }, + "workers": uint64(len(proxyValue.Workers())), + "upstreams": func() map[string]interface{} { + upstreams := proxyValue.Upstreams() + ratio := 0.0 + if upstreams.Total > 0 { + ratio = float64(proxyValue.CurrentMiners()) / float64(upstreams.Total) + } + return map[string]interface{}{ + "active": upstreams.Active, + "sleep": upstreams.Sleep, + "error": upstreams.Error, + "total": upstreams.Total, + "ratio": ratio, + } + }(), + "results": map[string]interface{}{ + "accepted": summary.Accepted, + "rejected": summary.Rejected, + "invalid": summary.Invalid, + "expired": summary.Expired, + "avg_time": summary.AvgTime, + "latency": summary.AvgLatency, + "hashes_total": summary.Hashes, + "best": summary.TopDiff, + }, + } + writeHTTPJSON(writer, response) + }) + + router.HandleFunc("/1/workers", func(writer http.ResponseWriter, request *http.Request) { + if !allowHTTPRequest(writer, request, proxyValue.HTTPConfig()) { + return + } + + records := proxyValue.Workers() + rows := make([][]interface{}, 0, len(records)) + for _, record := range records { + rows = append(rows, []interface{}{ + record.Name, + record.LastIP, + record.Connections, + record.Accepted, + record.Rejected, + record.Invalid, + record.Hashes, + record.LastHashAt.Unix(), + record.Hashrate(60), + record.Hashrate(600), + record.Hashrate(3600), + record.Hashrate(43200), + record.Hashrate(86400), + }) + } + + writeHTTPJSON(writer, map[string]interface{}{ + "mode": proxyValue.WorkersMode(), + "workers": rows, + }) + }) + + router.HandleFunc("/1/miners", func(writer http.ResponseWriter, request *http.Request) { + if !allowHTTPRequest(writer, request, proxyValue.HTTPConfig()) { + return + } + + miners := proxyValue.Miners() + rows := make([][]interface{}, 0, len(miners)) + for _, miner := range miners { + ip := "" + if remote := miner.RemoteAddr(); remote != nil { + ip = remote.String() + } + rows = append(rows, []interface{}{ + miner.ID(), + ip, + miner.TX(), + miner.RX(), + miner.State(), + miner.Diff(), + miner.User(), + "********", + miner.RigID(), + miner.Agent(), + }) + } + + writeHTTPJSON(writer, map[string]interface{}{ + "format": []string{"id", "ip", "tx", "rx", "state", "diff", "user", "password", "rig_id", "agent"}, + "miners": rows, + }) + }) +} + +func allowHTTPRequest(writer http.ResponseWriter, request *http.Request, config HTTPConfig) bool { + if request == nil { + return false + } + + if config.AccessToken != "" { + header := request.Header.Get("Authorization") + prefix := "Bearer " + if !strings.HasPrefix(header, prefix) || strings.TrimSpace(strings.TrimPrefix(header, prefix)) != config.AccessToken { + writer.Header().Set("WWW-Authenticate", "Bearer") + http.Error(writer, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return false + } + } + + if config.Restricted && request.Method != http.MethodGet { + http.Error(writer, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return false + } + + return true +} + +func writeHTTPJSON(writer http.ResponseWriter, value interface{}) { + writer.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(writer).Encode(value) +} diff --git a/proxy_logging_runtime.go b/proxy_logging_runtime.go new file mode 100644 index 0000000..e7f36cc --- /dev/null +++ b/proxy_logging_runtime.go @@ -0,0 +1,97 @@ +package proxy + +import ( + "fmt" + "os" + "sync" + "time" +) + +type appendLineLogger struct { + path string + mu sync.Mutex + file *os.File +} + +func newAppendLineLogger(path string) *appendLineLogger { + return &appendLineLogger{path: path} +} + +func (l *appendLineLogger) writeLine(line string) { + if l == nil || l.path == "" { + return + } + + l.mu.Lock() + defer l.mu.Unlock() + + if l.file == nil { + file, errorValue := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if errorValue != nil { + return + } + l.file = file + } + + _, _ = l.file.WriteString(line) +} + +func subscribeAccessLog(events *EventBus, path string) { + if events == nil || path == "" { + return + } + + logger := newAppendLineLogger(path) + events.Subscribe(EventLogin, func(event Event) { + if event.Miner == nil { + return + } + logger.writeLine(fmt.Sprintf("%s CONNECT %s %s %s\n", + time.Now().UTC().Format(time.RFC3339), + event.Miner.IP(), + event.Miner.User(), + event.Miner.Agent(), + )) + }) + events.Subscribe(EventClose, func(event Event) { + if event.Miner == nil { + return + } + logger.writeLine(fmt.Sprintf("%s CLOSE %s %s rx=%d tx=%d\n", + time.Now().UTC().Format(time.RFC3339), + event.Miner.IP(), + event.Miner.User(), + event.Miner.RX(), + event.Miner.TX(), + )) + }) +} + +func subscribeShareLog(events *EventBus, path string) { + if events == nil || path == "" { + return + } + + logger := newAppendLineLogger(path) + events.Subscribe(EventAccept, func(event Event) { + if event.Miner == nil { + return + } + logger.writeLine(fmt.Sprintf("%s ACCEPT %s diff=%d latency=%dms\n", + time.Now().UTC().Format(time.RFC3339), + event.Miner.User(), + event.Diff, + event.Latency, + )) + }) + events.Subscribe(EventReject, func(event Event) { + if event.Miner == nil { + return + } + logger.writeLine(fmt.Sprintf("%s REJECT %s reason=%q\n", + time.Now().UTC().Format(time.RFC3339), + event.Miner.User(), + event.Error, + )) + }) +} diff --git a/proxy_runtime.go b/proxy_runtime.go index f3be738..05461a0 100644 --- a/proxy_runtime.go +++ b/proxy_runtime.go @@ -36,6 +36,9 @@ func New(cfg *Config) (*Proxy, error) { done: make(chan struct{}), } + subscribeAccessLog(events, cfg.AccessLogFile) + subscribeShareLog(events, cfg.ShareLogFile) + events.Subscribe(EventLogin, func(event Event) { if event.Miner != nil { proxyValue.minerMu.Lock() @@ -104,6 +107,10 @@ func (p *Proxy) Start() { server.Start() } + if p.config != nil && p.config.HTTP.Enabled { + startHTTPServer(p) + } + go func() { var ticks uint64 for { @@ -123,6 +130,8 @@ func (p *Proxy) Start() { } } }() + + <-p.done } type noopSplitter struct{} @@ -149,6 +158,7 @@ func (p *Proxy) Stop() { for _, server := range p.servers { server.Stop() } + stopHTTPServer(p) if p.watcher != nil { p.watcher.Stop() } @@ -254,6 +264,7 @@ func (p *Proxy) acceptConn(conn net.Conn, localPort uint16) { miner.splitter = p.splitter if p.config != nil { miner.accessPassword = p.config.AccessPassword + miner.algoExtension = p.config.AlgoExtension } miner.Start() } diff --git a/runtime_support.go b/runtime_support.go index 9faaf4e..fc86333 100644 --- a/runtime_support.go +++ b/runtime_support.go @@ -156,12 +156,13 @@ func (cd *CustomDiff) OnLogin(event Event) { user := event.Miner.User() index := strings.LastIndex(user, "+") - if index > 0 { + if index > 0 && index < len(user)-1 { if value, errorValue := strconv.ParseUint(user[index+1:], 10, 64); errorValue == nil { event.Miner.SetUser(user[:index]) event.Miner.SetCustomDiff(value) return } + return } if cd == nil { diff --git a/runtime_support_test.go b/runtime_support_test.go index df9b6fd..907c6a1 100644 --- a/runtime_support_test.go +++ b/runtime_support_test.go @@ -51,7 +51,7 @@ func TestCustomDiff_OnLogin_Bad(t *testing.T) { func TestCustomDiff_OnLogin_Ugly(t *testing.T) { miner := &Miner{user: "wallet+bad"} NewCustomDiff(100).OnLogin(Event{Miner: miner}) - if miner.User() != "wallet+bad" || miner.CustomDiff() != 100 { - t.Fatalf("expected invalid suffix to preserve user and apply global diff, got user=%q diff=%d", miner.User(), miner.CustomDiff()) + if miner.User() != "wallet+bad" || miner.CustomDiff() != 0 { + t.Fatalf("expected invalid suffix to preserve user and leave diff unset, got user=%q diff=%d", miner.User(), miner.CustomDiff()) } } diff --git a/splitter/simple/splitter.go b/splitter/simple/splitter.go index c848970..4e0c925 100644 --- a/splitter/simple/splitter.go +++ b/splitter/simple/splitter.go @@ -53,6 +53,13 @@ func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) { var mapper *SimpleMapper for mapperID, idleMapper := range s.idle { + if idleMapper == nil || idleMapper.stopped || idleMapper.strategy == nil || !idleMapper.strategy.IsActive() { + if idleMapper != nil && idleMapper.strategy != nil { + idleMapper.strategy.Disconnect() + } + delete(s.idle, mapperID) + continue + } mapper = idleMapper delete(s.idle, mapperID) break @@ -156,7 +163,11 @@ func (s *SimpleSplitter) GC() { now := time.Now().UTC() for mapperID, mapper := range s.idle { - if timeout == 0 || (!mapper.idleAt.IsZero() && now.Sub(mapper.idleAt) > timeout) || mapper.stopped { + if mapper == nil { + delete(s.idle, mapperID) + continue + } + if mapper.stopped || mapper.strategy == nil || !mapper.strategy.IsActive() || timeout == 0 || (!mapper.idleAt.IsZero() && now.Sub(mapper.idleAt) > timeout) { if mapper.strategy != nil { mapper.strategy.Disconnect() } diff --git a/splitter/simple/splitter_test.go b/splitter/simple/splitter_test.go new file mode 100644 index 0000000..2a5ddd4 --- /dev/null +++ b/splitter/simple/splitter_test.go @@ -0,0 +1,61 @@ +package simple + +import ( + "testing" + "time" + + "dappco.re/go/core/proxy" + "dappco.re/go/core/proxy/pool" +) + +type fakeStrategy struct { + active bool + connects int + disconnects int +} + +func (s *fakeStrategy) Connect() {} + +func (s *fakeStrategy) Submit(jobID, nonce, result, algo string) int64 { return 1 } + +func (s *fakeStrategy) Disconnect() { + s.disconnects++ + s.active = false +} + +func (s *fakeStrategy) IsActive() bool { return s.active } + +func TestSimpleSplitter_OnLogin_Ugly(t *testing.T) { + deadStrategy := &fakeStrategy{active: false} + liveStrategy := &fakeStrategy{active: true} + splitter := &SimpleSplitter{ + active: make(map[int64]*SimpleMapper), + idle: map[int64]*SimpleMapper{ + 1: { + id: 1, + strategy: deadStrategy, + idleAt: time.Now().UTC(), + }, + }, + cfg: &proxy.Config{ReuseTimeout: 60}, + factory: func(listener pool.StratumListener) pool.Strategy { + return liveStrategy + }, + } + + miner := &proxy.Miner{} + splitter.OnLogin(&proxy.LoginEvent{Miner: miner}) + + if len(splitter.idle) != 0 { + t.Fatalf("expected dead idle mapper to be discarded, got %d idle mappers", len(splitter.idle)) + } + if len(splitter.active) != 1 { + t.Fatalf("expected one active mapper, got %d", len(splitter.active)) + } + if deadStrategy.disconnects != 1 { + t.Fatalf("expected dead mapper to be disconnected once, got %d", deadStrategy.disconnects) + } + if miner.RouteID() == 0 { + t.Fatal("expected miner to receive a route ID") + } +}