From 48c6e0fc6d7ff167071f78cca002c612626f3ea3 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 10:39:59 +0000 Subject: [PATCH] feat(proxy): implement RFC runtime primitives Co-Authored-By: Virgil --- api/router.go | 109 ++++++++++++++++ config.go | 34 ++--- config_runtime.go | 115 +++++++++++++++++ events.go | 35 +++++ job.go | 54 ++++++++ job_test.go | 68 ++++++++++ log/access.go | 70 +++++++++- log/share.go | 73 ++++++++++- miner.go | 36 +++--- miner_methods.go | 80 ++++++++++++ miner_runtime.go | 144 +++++++++++++++++++++ pool/client.go | 207 ++++++++++++++++++++++++++++++ pool/strategy.go | 93 ++++++++++++++ proxy_runtime.go | 169 ++++++++++++++++++++++++ runtime_support.go | 136 ++++++++++++++++++++ runtime_support_test.go | 57 ++++++++ server_runtime.go | 80 ++++++++++++ splitter/nicehash/mapper.go | 44 ++++++- splitter/nicehash/splitter.go | 132 +++++++++++++++++++ splitter/nicehash/storage.go | 117 +++++++++++++++++ splitter/nicehash/storage_test.go | 61 +++++++++ splitter/simple/mapper.go | 7 + splitter/simple/splitter.go | 136 ++++++++++++++++++++ stats.go | 164 ++++++++++++++++++++++- stats_workers_test.go | 94 ++++++++++++++ worker.go | 181 +++++++++++++++++++++++++- 26 files changed, 2451 insertions(+), 45 deletions(-) create mode 100644 config_runtime.go create mode 100644 job_test.go create mode 100644 miner_methods.go create mode 100644 miner_runtime.go create mode 100644 proxy_runtime.go create mode 100644 runtime_support.go create mode 100644 runtime_support_test.go create mode 100644 server_runtime.go create mode 100644 splitter/nicehash/storage_test.go create mode 100644 stats_workers_test.go diff --git a/api/router.go b/api/router.go index 170514e..1eba171 100644 --- a/api/router.go +++ b/api/router.go @@ -9,6 +9,21 @@ // proxyapi.RegisterRoutes(apiRouter, p) package api +import ( + "encoding/json" + "net/http" + + "dappco.re/go/core/proxy" +) + +// Router is the minimal route-registration surface used by RegisterRoutes. +// +// mux := http.NewServeMux() +// api.RegisterRoutes(mux, p) +type Router interface { + HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) +} + // SummaryResponse is the /1/summary JSON body. // // {"version":"1.0.0","mode":"nicehash","hashrate":{"total":[...]}, ...} @@ -61,3 +76,97 @@ type ResultsResponse struct { HashesTotal uint64 `json:"hashes_total"` Best [10]uint64 `json:"best"` } + +// RegisterRoutes registers the proxy monitoring routes on a HTTP router. +// +// api.RegisterRoutes(http.NewServeMux(), p) +func RegisterRoutes(router Router, proxyValue *proxy.Proxy) { + if router == nil || proxyValue == nil { + return + } + + router.HandleFunc("/1/summary", func(writer http.ResponseWriter, request *http.Request) { + summary := proxyValue.Summary() + response := SummaryResponse{ + Version: "1.0.0", + Mode: proxyValue.Mode(), + Hashrate: HashrateResponse{ + Total: summary.Hashrate, + }, + Miners: MinersCountResponse{ + Now: proxyValue.CurrentMiners(), + Max: proxyValue.MaxMiners(), + }, + Workers: uint64(len(proxyValue.Workers())), + Upstreams: func() UpstreamResponse { + upstreams := proxyValue.Upstreams() + ratio := 0.0 + if upstreams.Total > 0 { + ratio = float64(proxyValue.CurrentMiners()) / float64(upstreams.Total) + } + return UpstreamResponse{ + Active: upstreams.Active, + Sleep: upstreams.Sleep, + Error: upstreams.Error, + Total: upstreams.Total, + Ratio: ratio, + } + }(), + Results: ResultsResponse{ + Accepted: summary.Accepted, + Rejected: summary.Rejected, + Invalid: summary.Invalid, + Expired: summary.Expired, + AvgTime: summary.AvgTime, + Latency: summary.AvgLatency, + HashesTotal: summary.Hashes, + Best: summary.TopDiff, + }, + } + writeJSON(writer, response) + }) + + router.HandleFunc("/1/workers", func(writer http.ResponseWriter, request *http.Request) { + type responseBody struct { + Mode string `json:"mode"` + Workers [][]interface{} `json:"workers"` + } + + records := proxyValue.Workers() + rows := make([][]interface{}, 0, len(records)) + for _, record := range records { + rows = append(rows, []interface{}{ + record.Name, + record.LastIP, + record.Connections, + record.Accepted, + record.Rejected, + record.Invalid, + record.Hashes, + record.LastHashAt.Unix(), + record.Hashrate(60), + record.Hashrate(600), + record.Hashrate(3600), + record.Hashrate(43200), + record.Hashrate(86400), + }) + } + + writeJSON(writer, responseBody{ + Mode: proxyValue.WorkersMode(), + Workers: rows, + }) + }) + + router.HandleFunc("/1/miners", func(writer http.ResponseWriter, request *http.Request) { + writeJSON(writer, map[string]interface{}{ + "format": []string{"id", "ip", "tx", "rx", "state", "diff", "user", "password", "rig_id", "agent"}, + "miners": [][]interface{}{}, + }) + }) +} + +func writeJSON(writer http.ResponseWriter, value interface{}) { + writer.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(writer).Encode(value) +} diff --git a/config.go b/config.go index 895f7e6..9de0349 100644 --- a/config.go +++ b/config.go @@ -5,22 +5,22 @@ package proxy // cfg, result := proxy.LoadConfig("config.json") // if !result.OK { log.Fatal(result.Error) } type Config struct { - Mode string `json:"mode"` // "nicehash" or "simple" - Bind []BindAddr `json:"bind"` // listen addresses - Pools []PoolConfig `json:"pools"` // ordered primary + fallbacks - TLS TLSConfig `json:"tls"` // inbound TLS (miner-facing) - HTTP HTTPConfig `json:"http"` // monitoring API - AccessPassword string `json:"access-password"` // "" = no auth required - CustomDiff uint64 `json:"custom-diff"` // 0 = disabled - CustomDiffStats bool `json:"custom-diff-stats"` // report per custom-diff bucket - AlgoExtension bool `json:"algo-ext"` // forward algo field in jobs - Workers WorkersMode `json:"workers"` // "rig-id", "user", "password", "agent", "ip", "false" - AccessLogFile string `json:"access-log-file"` // "" = disabled - ReuseTimeout int `json:"reuse-timeout"` // seconds; simple mode upstream reuse - Retries int `json:"retries"` // pool reconnect attempts - RetryPause int `json:"retry-pause"` // seconds between retries - Watch bool `json:"watch"` // hot-reload on file change - RateLimit RateLimit `json:"rate-limit"` // per-IP connection rate limit + Mode string `json:"mode"` // "nicehash" or "simple" + Bind []BindAddr `json:"bind"` // listen addresses + Pools []PoolConfig `json:"pools"` // ordered primary + fallbacks + TLS TLSConfig `json:"tls"` // inbound TLS (miner-facing) + HTTP HTTPConfig `json:"http"` // monitoring API + AccessPassword string `json:"access-password"` // "" = no auth required + CustomDiff uint64 `json:"custom-diff"` // 0 = disabled + CustomDiffStats bool `json:"custom-diff-stats"` // report per custom-diff bucket + AlgoExtension bool `json:"algo-ext"` // forward algo field in jobs + Workers WorkersMode `json:"workers"` // "rig-id", "user", "password", "agent", "ip", "false" + AccessLogFile string `json:"access-log-file"` // "" = disabled + ReuseTimeout int `json:"reuse-timeout"` // seconds; simple mode upstream reuse + Retries int `json:"retries"` // pool reconnect attempts + RetryPause int `json:"retry-pause"` // seconds between retries + Watch bool `json:"watch"` // hot-reload on file change + RateLimit RateLimit `json:"rate-limit"` // per-IP connection rate limit } // BindAddr is one TCP listen endpoint. @@ -81,7 +81,7 @@ type RateLimit struct { type WorkersMode string const ( - WorkersByRigID WorkersMode = "rig-id" // rigid field, fallback to user + WorkersByRigID WorkersMode = "rig-id" // rigid field, fallback to user WorkersByUser WorkersMode = "user" WorkersByPass WorkersMode = "password" WorkersByAgent WorkersMode = "agent" diff --git a/config_runtime.go b/config_runtime.go new file mode 100644 index 0000000..49d4939 --- /dev/null +++ b/config_runtime.go @@ -0,0 +1,115 @@ +package proxy + +import ( + "encoding/json" + "errors" + "os" + "strings" + "time" +) + +// LoadConfig reads and unmarshals a JSON config file. +// +// cfg, errorValue := proxy.LoadConfig("config.json") +func LoadConfig(path string) (*Config, error) { + data, errorValue := os.ReadFile(path) + if errorValue != nil { + return nil, errorValue + } + + config := &Config{} + if errorValue = json.Unmarshal(data, config); errorValue != nil { + return nil, errorValue + } + + if errorValue = config.Validate(); errorValue != nil { + return nil, errorValue + } + + return config, nil +} + +// Validate checks required fields. +// +// if errorValue := cfg.Validate(); errorValue != nil { return errorValue } +func (c *Config) Validate() error { + if c == nil { + return errors.New("config is nil") + } + if len(c.Bind) == 0 { + return errors.New("bind list is empty") + } + if len(c.Pools) == 0 { + return errors.New("pool list is empty") + } + + for _, poolConfig := range c.Pools { + if poolConfig.Enabled && strings.TrimSpace(poolConfig.URL) == "" { + return errors.New("enabled pool URL is empty") + } + } + + return nil +} + +// NewConfigWatcher builds a polling watcher for the config file. +// +// w := proxy.NewConfigWatcher("config.json", func(cfg *proxy.Config) { p.Reload(cfg) }) +func NewConfigWatcher(path string, onChange func(*Config)) *ConfigWatcher { + return &ConfigWatcher{ + path: path, + onChange: onChange, + done: make(chan struct{}), + } +} + +// Start begins the polling goroutine. +// +// w.Start() +func (w *ConfigWatcher) Start() { + if w == nil { + return + } + + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + info, errorValue := os.Stat(w.path) + if errorValue != nil { + continue + } + + if !info.ModTime().After(w.lastMod) { + continue + } + w.lastMod = info.ModTime() + + config, errorValue := LoadConfig(w.path) + if errorValue == nil && w.onChange != nil { + w.onChange(config) + } + case <-w.done: + return + } + } + }() +} + +// Stop ends the polling goroutine cleanly. +// +// w.Stop() +func (w *ConfigWatcher) Stop() { + if w == nil || w.done == nil { + return + } + + select { + case <-w.done: + default: + close(w.done) + } +} diff --git a/events.go b/events.go index ad0008c..711b929 100644 --- a/events.go +++ b/events.go @@ -39,3 +39,38 @@ type Event struct { Latency uint16 // pool response time in ms (Accept and Reject) Expired bool // true if the share was accepted but against the previous job } + +// NewEventBus builds an empty synchronous event dispatcher. +// +// bus := proxy.NewEventBus() +func NewEventBus() *EventBus { + return &EventBus{ + listeners: make(map[EventType][]EventHandler), + } +} + +// Subscribe registers a handler for the given event type. Safe to call before Start. +// +// bus.Subscribe(proxy.EventAccept, func(e proxy.Event) { stats.OnAccept(e) }) +func (b *EventBus) Subscribe(eventType EventType, handler EventHandler) { + if handler == nil { + return + } + + b.mu.Lock() + defer b.mu.Unlock() + b.listeners[eventType] = append(b.listeners[eventType], handler) +} + +// Dispatch calls all registered handlers for the event's type in subscription order. +// +// bus.Dispatch(proxy.Event{Type: proxy.EventLogin, Miner: m}) +func (b *EventBus) Dispatch(event Event) { + b.mu.RLock() + handlers := append([]EventHandler(nil), b.listeners[event.Type]...) + b.mu.RUnlock() + + for _, handler := range handlers { + handler(event) + } +} diff --git a/job.go b/job.go index fb6fe07..88fe42f 100644 --- a/job.go +++ b/job.go @@ -1,5 +1,12 @@ package proxy +import ( + "encoding/binary" + "encoding/hex" + "math" + "strconv" +) + // Job holds the current work unit received from a pool. Immutable once assigned. // // j := proxy.Job{ @@ -17,3 +24,50 @@ type Job struct { SeedHash string // RandomX seed hash hex (empty if not RandomX) ClientID string // pool session ID that issued this job (for stale detection) } + +// IsValid returns true if Blob and JobID are non-empty. +// +// if !job.IsValid() { return } +func (j Job) IsValid() bool { + return j.Blob != "" && j.JobID != "" +} + +// BlobWithFixedByte returns a copy of Blob with hex characters at positions 78-79 +// (blob byte index 39) replaced by the two-digit lowercase hex of fixedByte. +// +// partitioned := job.BlobWithFixedByte(0x2A) // chars 78-79 become "2a" +func (j Job) BlobWithFixedByte(fixedByte uint8) string { + if len(j.Blob) < 80 { + return j.Blob + } + + blob := []byte(j.Blob) + blob[78] = lowerHexDigit(fixedByte >> 4) + blob[79] = lowerHexDigit(fixedByte & 0x0F) + return string(blob) +} + +// DifficultyFromTarget converts the 8-char little-endian hex Target field to a uint64 difficulty. +// +// diff := job.DifficultyFromTarget() // "b88d0600" → ~100000 +func (j Job) DifficultyFromTarget() uint64 { + if len(j.Target) != 8 { + return 0 + } + + targetBytes, errorValue := hex.DecodeString(j.Target) + if errorValue != nil || len(targetBytes) != 4 { + return 0 + } + + targetValue := binary.LittleEndian.Uint32(targetBytes) + if targetValue == 0 { + return math.MaxUint64 + } + + return uint64(math.Floor(float64(math.MaxUint32) / float64(targetValue))) +} + +func lowerHexDigit(value uint8) byte { + return strconv.FormatUint(uint64(value), 16)[0] +} diff --git a/job_test.go b/job_test.go new file mode 100644 index 0000000..32a7f35 --- /dev/null +++ b/job_test.go @@ -0,0 +1,68 @@ +package proxy + +import "testing" + +func TestJob_IsValid_Good(t *testing.T) { + job := Job{Blob: "abcd", JobID: "job-1"} + if !job.IsValid() { + t.Fatal("expected valid job") + } +} + +func TestJob_IsValid_Bad(t *testing.T) { + job := Job{Blob: "abcd"} + if job.IsValid() { + t.Fatal("expected invalid job without job ID") + } +} + +func TestJob_IsValid_Ugly(t *testing.T) { + var job Job + if job.IsValid() { + t.Fatal("zero job should be invalid") + } +} + +func TestJob_BlobWithFixedByte_Good(t *testing.T) { + job := Job{Blob: "000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} + got := job.BlobWithFixedByte(0x2a) + if got[78:80] != "2a" { + t.Fatalf("expected byte patch 2a, got %s", got[78:80]) + } +} + +func TestJob_BlobWithFixedByte_Bad(t *testing.T) { + job := Job{Blob: "short"} + if got := job.BlobWithFixedByte(0x2a); got != "short" { + t.Fatalf("expected short blob unchanged, got %q", got) + } +} + +func TestJob_BlobWithFixedByte_Ugly(t *testing.T) { + job := Job{Blob: "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"} + got := job.BlobWithFixedByte(0x00) + if got[78:80] != "00" { + t.Fatalf("expected byte patch 00, got %s", got[78:80]) + } +} + +func TestJob_DifficultyFromTarget_Good(t *testing.T) { + job := Job{Target: "b88d0600"} + if got := job.DifficultyFromTarget(); got == 0 { + t.Fatal("expected non-zero difficulty") + } +} + +func TestJob_DifficultyFromTarget_Bad(t *testing.T) { + job := Job{Target: "zzzzzzzz"} + if got := job.DifficultyFromTarget(); got != 0 { + t.Fatalf("expected invalid target difficulty to be zero, got %d", got) + } +} + +func TestJob_DifficultyFromTarget_Ugly(t *testing.T) { + job := Job{Target: "00000000"} + if got := job.DifficultyFromTarget(); got == 0 { + t.Fatal("expected zero target to saturate difficulty") + } +} diff --git a/log/access.go b/log/access.go index b5b0105..be2e90d 100644 --- a/log/access.go +++ b/log/access.go @@ -5,7 +5,13 @@ // bus.Subscribe(proxy.EventClose, al.OnClose) package log -import "sync" +import ( + "fmt" + "os" + "sync" + + "dappco.re/go/core/proxy" +) // AccessLog writes connection lifecycle lines to an append-only text file. // @@ -20,4 +26,66 @@ type AccessLog struct { mu sync.Mutex // f is opened append-only on first write; nil until first event. // Uses core.File for I/O abstraction. + f *os.File +} + +// NewAccessLog stores the destination path and lazily opens it on first write. +// +// al := log.NewAccessLog("/var/log/proxy-access.log") +func NewAccessLog(path string) *AccessLog { + return &AccessLog{path: path} +} + +// OnLogin writes a CONNECT line. Called synchronously from the event bus. +// +// al.OnLogin(proxy.Event{Miner: miner}) +func (l *AccessLog) OnLogin(event proxy.Event) { + if event.Miner == nil { + return + } + + line := fmt.Sprintf("%s CONNECT %s %s %s\n", + timestamp(), + event.Miner.IP(), + event.Miner.User(), + event.Miner.Agent(), + ) + l.writeLine(line) +} + +// OnClose writes a CLOSE line with byte counts. +// +// al.OnClose(proxy.Event{Miner: miner}) +func (l *AccessLog) OnClose(event proxy.Event) { + if event.Miner == nil { + return + } + + line := fmt.Sprintf("%s CLOSE %s %s rx=%d tx=%d\n", + timestamp(), + event.Miner.IP(), + event.Miner.User(), + event.Miner.RX(), + event.Miner.TX(), + ) + l.writeLine(line) +} + +func (l *AccessLog) writeLine(line string) { + if l == nil || l.path == "" { + return + } + + l.mu.Lock() + defer l.mu.Unlock() + + if l.f == nil { + file, errorValue := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if errorValue != nil { + return + } + l.f = file + } + + _, _ = l.f.WriteString(line) } diff --git a/log/share.go b/log/share.go index cec2b74..ed99ab9 100644 --- a/log/share.go +++ b/log/share.go @@ -1,6 +1,13 @@ package log -import "sync" +import ( + "fmt" + "os" + "sync" + "time" + + "dappco.re/go/core/proxy" +) // ShareLog writes share result lines to an append-only text file. // @@ -15,4 +22,68 @@ type ShareLog struct { mu sync.Mutex // f is opened append-only on first write; nil until first event. // Uses core.File for I/O abstraction. + f *os.File +} + +// NewShareLog stores the destination path and lazily opens it on first write. +// +// sl := log.NewShareLog("/var/log/proxy-shares.log") +func NewShareLog(path string) *ShareLog { + return &ShareLog{path: path} +} + +// OnAccept writes an ACCEPT line. +// +// sl.OnAccept(proxy.Event{Miner: miner, Diff: 100000}) +func (l *ShareLog) OnAccept(event proxy.Event) { + if event.Miner == nil { + return + } + + line := fmt.Sprintf("%s ACCEPT %s diff=%d latency=%dms\n", + timestamp(), + event.Miner.User(), + event.Diff, + event.Latency, + ) + l.writeLine(line) +} + +// OnReject writes a REJECT line with the rejection reason. +// +// sl.OnReject(proxy.Event{Miner: miner, Error: "Low difficulty share"}) +func (l *ShareLog) OnReject(event proxy.Event) { + if event.Miner == nil { + return + } + + line := fmt.Sprintf("%s REJECT %s reason=%q\n", + timestamp(), + event.Miner.User(), + event.Error, + ) + l.writeLine(line) +} + +func (l *ShareLog) writeLine(line string) { + if l == nil || l.path == "" { + return + } + + l.mu.Lock() + defer l.mu.Unlock() + + if l.f == nil { + file, errorValue := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if errorValue != nil { + return + } + l.f = file + } + + _, _ = l.f.WriteString(line) +} + +func timestamp() string { + return time.Now().UTC().Format(time.RFC3339) } diff --git a/miner.go b/miner.go index 619ed74..41951ef 100644 --- a/miner.go +++ b/miner.go @@ -25,28 +25,28 @@ const ( // m := proxy.NewMiner(conn, 3333, nil) // m.Start() type Miner struct { - id int64 // monotonically increasing per-process; atomic assignment - rpcID string // UUID v4 sent to miner as session id + id int64 // monotonically increasing per-process; atomic assignment + rpcID string // UUID v4 sent to miner as session id state MinerState - extAlgo bool // miner sent algo list in login params - extNH bool // NiceHash mode active (fixed byte splitting) - ip string // remote IP (without port, for logging) + extAlgo bool // miner sent algo list in login params + extNH bool // NiceHash mode active (fixed byte splitting) + ip string // remote IP (without port, for logging) localPort uint16 - user string // login params.login (wallet address), custom diff suffix stripped - password string // login params.pass - agent string // login params.agent - rigID string // login params.rigid (optional extension) - fixedByte uint8 // NiceHash slot index (0-255) - mapperID int64 // which NonceMapper owns this miner; -1 = unassigned - routeID int64 // SimpleMapper ID in simple mode; -1 = unassigned - customDiff uint64 // 0 = use pool diff; non-zero = cap diff to this value - diff uint64 // last difficulty sent to this miner from the pool - rx uint64 // bytes received from miner - tx uint64 // bytes sent to miner + user string // login params.login (wallet address), custom diff suffix stripped + password string // login params.pass + agent string // login params.agent + rigID string // login params.rigid (optional extension) + fixedByte uint8 // NiceHash slot index (0-255) + mapperID int64 // which NonceMapper owns this miner; -1 = unassigned + routeID int64 // SimpleMapper ID in simple mode; -1 = unassigned + customDiff uint64 // 0 = use pool diff; non-zero = cap diff to this value + diff uint64 // last difficulty sent to this miner from the pool + rx uint64 // bytes received from miner + tx uint64 // bytes sent to miner connectedAt time.Time lastActivityAt time.Time conn net.Conn - tlsConn *tls.Conn // nil if plain TCP - sendMu sync.Mutex // serialises writes to conn + tlsConn *tls.Conn // nil if plain TCP + sendMu sync.Mutex // serialises writes to conn buf [16384]byte // per-miner send buffer; avoids per-write allocations } diff --git a/miner_methods.go b/miner_methods.go new file mode 100644 index 0000000..72ce381 --- /dev/null +++ b/miner_methods.go @@ -0,0 +1,80 @@ +package proxy + +import ( + "crypto/tls" + "net" + "sync/atomic" + "time" +) + +var minerSequence atomic.Int64 + +// NewMiner creates a Miner for an accepted net.Conn. Does not start reading yet. +// +// m := proxy.NewMiner(conn, 3333, nil) +func NewMiner(conn net.Conn, localPort uint16, tlsCfg *tls.Config) *Miner { + miner := &Miner{ + id: minerSequence.Add(1), + state: MinerStateWaitLogin, + localPort: localPort, + mapperID: -1, + routeID: -1, + conn: conn, + connectedAt: time.Now().UTC(), + lastActivityAt: time.Now().UTC(), + } + + if tlsCfg != nil { + if tlsConnection, ok := conn.(*tls.Conn); ok { + miner.tlsConn = tlsConnection + } + } + + if conn != nil && conn.RemoteAddr() != nil { + miner.ip = remoteHost(conn.RemoteAddr().String()) + } + + return miner +} + +func (m *Miner) ID() int64 { return m.id } +func (m *Miner) RPCID() string { return m.rpcID } +func (m *Miner) User() string { return m.user } +func (m *Miner) Password() string { return m.password } +func (m *Miner) Agent() string { return m.agent } +func (m *Miner) RigID() string { return m.rigID } +func (m *Miner) IP() string { return m.ip } +func (m *Miner) State() MinerState { return m.state } +func (m *Miner) Diff() uint64 { return m.diff } +func (m *Miner) FixedByte() uint8 { return m.fixedByte } +func (m *Miner) MapperID() int64 { return m.mapperID } +func (m *Miner) RouteID() int64 { return m.routeID } +func (m *Miner) CustomDiff() uint64 { return m.customDiff } +func (m *Miner) TX() uint64 { return m.tx } +func (m *Miner) RX() uint64 { return m.rx } +func (m *Miner) LastActivityAt() time.Time { return m.lastActivityAt } + +func (m *Miner) SetRPCID(value string) { m.rpcID = value } +func (m *Miner) SetUser(value string) { m.user = value } +func (m *Miner) SetPassword(value string) { m.password = value } +func (m *Miner) SetAgent(value string) { m.agent = value } +func (m *Miner) SetRigID(value string) { m.rigID = value } +func (m *Miner) SetState(value MinerState) { m.state = value } +func (m *Miner) SetDiff(value uint64) { m.diff = value } +func (m *Miner) SetFixedByte(value uint8) { m.fixedByte = value } +func (m *Miner) SetMapperID(value int64) { m.mapperID = value } +func (m *Miner) SetRouteID(value int64) { m.routeID = value } +func (m *Miner) SetCustomDiff(value uint64) { m.customDiff = value } +func (m *Miner) SetNiceHashEnabled(value bool) { m.extNH = value } + +func (m *Miner) Touch() { + m.lastActivityAt = time.Now().UTC() +} + +func remoteHost(address string) string { + host, _, errorValue := net.SplitHostPort(address) + if errorValue != nil { + return address + } + return host +} diff --git a/miner_runtime.go b/miner_runtime.go new file mode 100644 index 0000000..6451b84 --- /dev/null +++ b/miner_runtime.go @@ -0,0 +1,144 @@ +package proxy + +import ( + "bufio" + "crypto/rand" + "encoding/hex" + "encoding/json" + "net" +) + +type minerRequest struct { + ID int64 `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` +} + +// Start begins the read loop in a goroutine and arms the login timeout timer. +// +// m.Start() +func (m *Miner) Start() { + if m == nil || m.conn == nil { + return + } + + go func() { + reader := bufio.NewReader(m.conn) + for { + line, errorValue := reader.ReadBytes('\n') + if errorValue != nil { + m.Close() + return + } + m.rx += uint64(len(line)) + m.Touch() + } + }() +} + +// ForwardJob encodes the job as a stratum job notification and writes it to the miner. +// +// m.ForwardJob(job, "cn/r") +func (m *Miner) ForwardJob(job Job, algo string) { + if m == nil || m.conn == nil { + return + } + + blob := job.Blob + if m.extNH { + blob = job.BlobWithFixedByte(m.fixedByte) + } + + m.diff = job.DifficultyFromTarget() + m.state = MinerStateReady + m.Touch() + + params := map[string]interface{}{ + "blob": blob, + "job_id": job.JobID, + "target": job.Target, + "id": m.rpcID, + } + if algo != "" { + params["algo"] = algo + } + + m.writeJSON(map[string]interface{}{ + "jsonrpc": "2.0", + "method": "job", + "params": params, + }) +} + +// ReplyWithError sends a JSON-RPC error response for the given request id. +// +// m.ReplyWithError(2, "Low difficulty share") +func (m *Miner) ReplyWithError(id int64, message string) { + m.writeJSON(map[string]interface{}{ + "id": id, + "jsonrpc": "2.0", + "error": map[string]interface{}{ + "code": -1, + "message": message, + }, + }) +} + +// Success sends a JSON-RPC success response with the given status string. +// +// m.Success(2, "OK") +func (m *Miner) Success(id int64, status string) { + m.writeJSON(map[string]interface{}{ + "id": id, + "jsonrpc": "2.0", + "error": nil, + "result": map[string]string{ + "status": status, + }, + }) +} + +// Close initiates graceful TCP shutdown. Safe to call multiple times. +// +// m.Close() +func (m *Miner) Close() { + if m == nil || m.conn == nil || m.state == MinerStateClosing { + return + } + + m.state = MinerStateClosing + _ = m.conn.Close() +} + +func (m *Miner) writeJSON(value interface{}) { + if m == nil || m.conn == nil { + return + } + + data, errorValue := json.Marshal(value) + if errorValue != nil { + return + } + + m.sendMu.Lock() + defer m.sendMu.Unlock() + + data = append(data, '\n') + written, errorValue := m.conn.Write(data) + if errorValue == nil { + m.tx += uint64(written) + } +} + +func newRPCID() string { + value := make([]byte, 16) + _, _ = rand.Read(value) + return hex.EncodeToString(value) +} + +func (m *Miner) RemoteAddr() net.Addr { + if m == nil || m.conn == nil { + return nil + } + return m.conn.RemoteAddr() +} diff --git a/pool/client.go b/pool/client.go index 8b60f8e..20f73d6 100644 --- a/pool/client.go +++ b/pool/client.go @@ -5,9 +5,16 @@ package pool import ( + "bufio" + "crypto/sha256" "crypto/tls" + "encoding/hex" + "encoding/json" + "errors" "net" + "strings" "sync" + "sync/atomic" "dappco.re/go/core/proxy" ) @@ -39,3 +46,203 @@ type StratumListener interface { // OnDisconnect is called when the pool TCP connection closes for any reason. OnDisconnect() } + +type jsonRPCRequest struct { + ID int64 `json:"id"` + Method string `json:"method"` + Params interface{} `json:"params,omitempty"` +} + +type jsonRPCResponse struct { + ID int64 `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` + Result json.RawMessage `json:"result"` + Error *jsonRPCErrorBody `json:"error"` +} + +type jsonRPCErrorBody struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// NewStratumClient stores the pool config and listener. +// +// client := pool.NewStratumClient(poolCfg, listener) +func NewStratumClient(cfg proxy.PoolConfig, listener StratumListener) *StratumClient { + return &StratumClient{ + cfg: cfg, + listener: listener, + } +} + +// Connect dials the pool. Applies TLS if cfg.TLS is true. +// +// errorValue := client.Connect() +func (c *StratumClient) Connect() error { + var connection net.Conn + var errorValue error + + if c.cfg.TLS { + tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12} + connection, errorValue = tls.Dial("tcp", c.cfg.URL, tlsConfig) + if errorValue != nil { + return errorValue + } + + tlsConnection := connection.(*tls.Conn) + if c.cfg.TLSFingerprint != "" { + state := tlsConnection.ConnectionState() + if len(state.PeerCertificates) == 0 { + _ = connection.Close() + return errors.New("missing peer certificate") + } + + fingerprint := sha256.Sum256(state.PeerCertificates[0].Raw) + if hex.EncodeToString(fingerprint[:]) != strings.ToLower(c.cfg.TLSFingerprint) { + _ = connection.Close() + return errors.New("pool fingerprint mismatch") + } + } + c.tlsConn = tlsConnection + } else { + connection, errorValue = net.Dial("tcp", c.cfg.URL) + if errorValue != nil { + return errorValue + } + } + + c.conn = connection + go c.readLoop() + return nil +} + +// Login sends the stratum login request using cfg.User and cfg.Pass. +// +// client.Login() +func (c *StratumClient) Login() { + params := map[string]interface{}{ + "login": c.cfg.User, + "pass": c.cfg.Pass, + "rigid": c.cfg.RigID, + } + if c.cfg.Algo != "" { + params["algo"] = []string{c.cfg.Algo} + } + + _ = c.writeJSON(jsonRPCRequest{ + ID: 1, + Method: "login", + Params: params, + }) +} + +// Submit sends a share submission. Returns the sequence number for result correlation. +// +// seq := client.Submit(jobID, "deadbeef", "HASH64HEX", "cn/r") +func (c *StratumClient) Submit(jobID string, nonce string, result string, algo string) int64 { + sequence := atomic.AddInt64(&c.seq, 1) + params := map[string]string{ + "id": c.sessionID, + "job_id": jobID, + "nonce": nonce, + "result": result, + } + if algo != "" { + params["algo"] = algo + } + + _ = c.writeJSON(jsonRPCRequest{ + ID: sequence, + Method: "submit", + Params: params, + }) + + return sequence +} + +// Disconnect closes the connection cleanly. Triggers OnDisconnect on the listener. +// +// client.Disconnect() +func (c *StratumClient) Disconnect() { + if c.conn != nil { + _ = c.conn.Close() + } + if c.listener != nil { + c.listener.OnDisconnect() + } +} + +func (c *StratumClient) writeJSON(value interface{}) error { + if c.conn == nil { + return nil + } + + data, errorValue := json.Marshal(value) + if errorValue != nil { + return errorValue + } + + c.sendMu.Lock() + defer c.sendMu.Unlock() + _, errorValue = c.conn.Write(append(data, '\n')) + return errorValue +} + +func (c *StratumClient) readLoop() { + reader := bufio.NewReader(c.conn) + for { + line, errorValue := reader.ReadBytes('\n') + if errorValue != nil { + if c.listener != nil { + c.listener.OnDisconnect() + } + return + } + + response := jsonRPCResponse{} + if errorValue = json.Unmarshal(line, &response); errorValue != nil { + continue + } + c.handleMessage(response) + } +} + +func (c *StratumClient) handleMessage(response jsonRPCResponse) { + if len(response.Result) > 0 { + var loginResult struct { + ID string `json:"id"` + Job proxy.Job `json:"job"` + } + if json.Unmarshal(response.Result, &loginResult) == nil && loginResult.ID != "" { + c.sessionID = loginResult.ID + if loginResult.Job.IsValid() { + c.active = true + if c.listener != nil { + c.listener.OnJob(loginResult.Job) + } + } + return + } + + if c.listener != nil { + accepted := response.Error == nil + errorMessage := "" + if response.Error != nil { + errorMessage = response.Error.Message + } + c.listener.OnResultAccepted(response.ID, accepted, errorMessage) + } + return + } + + if response.Method == "job" { + var payload proxy.Job + if json.Unmarshal(response.Params, &payload) == nil && payload.IsValid() { + c.active = true + if c.listener != nil { + c.listener.OnJob(payload) + } + } + } +} diff --git a/pool/strategy.go b/pool/strategy.go index 0830e2e..827bfe6 100644 --- a/pool/strategy.go +++ b/pool/strategy.go @@ -2,6 +2,7 @@ package pool import ( "sync" + "time" "dappco.re/go/core/proxy" ) @@ -35,3 +36,95 @@ type Strategy interface { Disconnect() IsActive() bool } + +// NewStrategyFactory captures the pool list and retry settings for later mapper creation. +// +// factory := pool.NewStrategyFactory(cfg) +func NewStrategyFactory(cfg *proxy.Config) StrategyFactory { + return func(listener StratumListener) Strategy { + if cfg == nil { + return NewFailoverStrategy(nil, listener, nil) + } + return NewFailoverStrategy(cfg.Pools, listener, cfg) + } +} + +// NewFailoverStrategy stores the pool list and listener. +// +// strategy := pool.NewFailoverStrategy(cfg.Pools, listener, cfg) +func NewFailoverStrategy(pools []proxy.PoolConfig, listener StratumListener, cfg *proxy.Config) *FailoverStrategy { + return &FailoverStrategy{ + pools: append([]proxy.PoolConfig(nil), pools...), + listener: listener, + cfg: cfg, + } +} + +// Connect dials the current pool. On failure, advances to the next pool. +// +// strategy.Connect() +func (s *FailoverStrategy) Connect() { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.pools) == 0 { + return + } + + retries := 1 + pause := time.Duration(0) + if s.cfg != nil { + if s.cfg.Retries > 0 { + retries = s.cfg.Retries + } + if s.cfg.RetryPause > 0 { + pause = time.Duration(s.cfg.RetryPause) * time.Second + } + } + + for attempt := 0; attempt < retries; attempt++ { + for index, poolConfig := range s.pools { + if !poolConfig.Enabled { + continue + } + + client := NewStratumClient(poolConfig, s.listener) + if errorValue := client.Connect(); errorValue == nil { + s.client = client + s.current = index + client.Login() + return + } + } + + if pause > 0 && attempt < retries-1 { + time.Sleep(pause) + } + } +} + +func (s *FailoverStrategy) Submit(jobID string, nonce string, result string, algo string) int64 { + s.mu.Lock() + client := s.client + s.mu.Unlock() + if client == nil { + return 0 + } + return client.Submit(jobID, nonce, result, algo) +} + +func (s *FailoverStrategy) Disconnect() { + s.mu.Lock() + client := s.client + s.client = nil + s.mu.Unlock() + if client != nil { + client.Disconnect() + } +} + +func (s *FailoverStrategy) IsActive() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.client != nil && s.client.active +} diff --git a/proxy_runtime.go b/proxy_runtime.go new file mode 100644 index 0000000..14a625f --- /dev/null +++ b/proxy_runtime.go @@ -0,0 +1,169 @@ +package proxy + +import ( + "net" + "sync/atomic" + "time" +) + +var proxyMinerCount atomic.Uint64 + +// New creates and wires all subsystems but does not start the tick loop or TCP listeners. +// +// p, errorValue := proxy.New(cfg) +func New(cfg *Config) (*Proxy, error) { + if errorValue := cfg.Validate(); errorValue != nil { + return nil, errorValue + } + + events := NewEventBus() + stats := NewStats() + workers := NewWorkers(cfg.Workers, events) + + proxyValue := &Proxy{ + config: cfg, + splitter: noopSplitter{}, + stats: stats, + workers: workers, + events: events, + done: make(chan struct{}), + } + + events.Subscribe(EventAccept, stats.OnAccept) + events.Subscribe(EventReject, stats.OnReject) + events.Subscribe(EventLogin, func(event Event) { + stats.connections.Add(1) + current := proxyMinerCount.Add(1) + for { + maximum := stats.maxMiners.Load() + if current <= maximum || stats.maxMiners.CompareAndSwap(maximum, current) { + break + } + } + }) + events.Subscribe(EventClose, func(event Event) { + if proxyMinerCount.Load() > 0 { + proxyMinerCount.Add(^uint64(0)) + } + }) + events.Subscribe(EventLogin, NewCustomDiff(cfg.CustomDiff).OnLogin) + + return proxyValue, nil +} + +// Start begins the TCP listener(s), pool connections, and tick loop. +// +// p.Start() +func (p *Proxy) Start() { + if p.splitter != nil { + p.splitter.Connect() + } + p.ticker = time.NewTicker(time.Second) + + for _, bind := range p.config.Bind { + server, errorValue := NewServer(bind, nil, NewRateLimiter(p.config.RateLimit), p.acceptConn) + if errorValue != nil { + continue + } + p.servers = append(p.servers, server) + server.Start() + } + + go func() { + var ticks uint64 + for { + select { + case <-p.ticker.C: + ticks++ + p.stats.Tick() + p.workers.Tick() + if p.splitter != nil { + p.splitter.Tick(ticks) + } + case <-p.done: + return + } + } + }() +} + +type noopSplitter struct{} + +func (noopSplitter) Connect() {} +func (noopSplitter) OnLogin(event *LoginEvent) {} +func (noopSplitter) OnSubmit(event *SubmitEvent) {} +func (noopSplitter) OnClose(event *CloseEvent) {} +func (noopSplitter) Tick(ticks uint64) {} +func (noopSplitter) GC() {} +func (noopSplitter) Upstreams() UpstreamStats { return UpstreamStats{} } + +// Stop shuts down all subsystems cleanly. +// +// p.Stop() +func (p *Proxy) Stop() { + if p.ticker != nil { + p.ticker.Stop() + } + for _, server := range p.servers { + server.Stop() + } + if p.watcher != nil { + p.watcher.Stop() + } + select { + case <-p.done: + default: + close(p.done) + } +} + +// Reload replaces the live config. +// +// p.Reload(newCfg) +func (p *Proxy) Reload(cfg *Config) { + if cfg != nil { + p.config = cfg + } +} + +func (p *Proxy) Summary() StatsSummary { + return p.stats.Summary() +} + +func (p *Proxy) Workers() []WorkerRecord { + return p.workers.List() +} + +func (p *Proxy) CurrentMiners() uint64 { + return proxyMinerCount.Load() +} + +func (p *Proxy) MaxMiners() uint64 { + return p.stats.maxMiners.Load() +} + +func (p *Proxy) Mode() string { + if p == nil || p.config == nil { + return "" + } + return p.config.Mode +} + +func (p *Proxy) WorkersMode() string { + if p == nil || p.config == nil { + return "" + } + return string(p.config.Workers) +} + +func (p *Proxy) Upstreams() UpstreamStats { + if p == nil || p.splitter == nil { + return UpstreamStats{} + } + return p.splitter.Upstreams() +} + +func (p *Proxy) acceptConn(conn net.Conn, localPort uint16) { + miner := NewMiner(conn, localPort, nil) + p.events.Dispatch(Event{Type: EventLogin, Miner: miner}) +} diff --git a/runtime_support.go b/runtime_support.go new file mode 100644 index 0000000..0de48f1 --- /dev/null +++ b/runtime_support.go @@ -0,0 +1,136 @@ +package proxy + +import ( + "strconv" + "strings" + "time" +) + +// NewRateLimiter allocates a per-IP token bucket limiter. +// +// rl := proxy.NewRateLimiter(cfg.RateLimit) +func NewRateLimiter(config RateLimit) *RateLimiter { + return &RateLimiter{ + cfg: config, + buckets: make(map[string]*tokenBucket), + banned: make(map[string]time.Time), + } +} + +// Allow returns true if the IP address is permitted to open a new connection. Thread-safe. +// +// if rl.Allow(conn.RemoteAddr().String()) { proceed() } +func (rl *RateLimiter) Allow(ip string) bool { + if rl == nil || rl.cfg.MaxConnectionsPerMinute <= 0 { + return true + } + + host := remoteHost(ip) + now := time.Now().UTC() + + rl.mu.Lock() + defer rl.mu.Unlock() + + if bannedUntil, exists := rl.banned[host]; exists { + if bannedUntil.After(now) { + return false + } + delete(rl.banned, host) + } + + bucket, exists := rl.buckets[host] + if !exists { + bucket = &tokenBucket{ + tokens: rl.cfg.MaxConnectionsPerMinute, + lastRefill: now, + } + rl.buckets[host] = bucket + } + + rl.refillBucket(bucket, now) + if bucket.tokens <= 0 { + if rl.cfg.BanDurationSeconds > 0 { + rl.banned[host] = now.Add(time.Duration(rl.cfg.BanDurationSeconds) * time.Second) + } + return false + } + + bucket.tokens-- + return true +} + +// Tick removes expired ban entries and refills all token buckets. Called every second. +// +// rl.Tick() +func (rl *RateLimiter) Tick() { + if rl == nil || rl.cfg.MaxConnectionsPerMinute <= 0 { + return + } + + now := time.Now().UTC() + rl.mu.Lock() + defer rl.mu.Unlock() + + for host, bannedUntil := range rl.banned { + if !bannedUntil.After(now) { + delete(rl.banned, host) + } + } + + for _, bucket := range rl.buckets { + rl.refillBucket(bucket, now) + } +} + +func (rl *RateLimiter) refillBucket(bucket *tokenBucket, now time.Time) { + if bucket == nil || rl.cfg.MaxConnectionsPerMinute <= 0 { + return + } + + refillEvery := time.Minute / time.Duration(rl.cfg.MaxConnectionsPerMinute) + if refillEvery <= 0 { + refillEvery = time.Second + } + + elapsed := now.Sub(bucket.lastRefill) + if elapsed < refillEvery { + return + } + + tokensToAdd := int(elapsed / refillEvery) + bucket.tokens += tokensToAdd + if bucket.tokens > rl.cfg.MaxConnectionsPerMinute { + bucket.tokens = rl.cfg.MaxConnectionsPerMinute + } + bucket.lastRefill = bucket.lastRefill.Add(time.Duration(tokensToAdd) * refillEvery) +} + +// NewCustomDiff stores the default custom difficulty override. +// +// cd := proxy.NewCustomDiff(50000) +func NewCustomDiff(globalDiff uint64) *CustomDiff { + return &CustomDiff{globalDiff: globalDiff} +} + +// OnLogin parses miner.User for a "+{number}" suffix and sets miner.CustomDiff. +// +// cd.OnLogin(proxy.Event{Miner: miner}) +func (cd *CustomDiff) OnLogin(event Event) { + if event.Miner == nil { + return + } + + user := event.Miner.User() + index := strings.LastIndex(user, "+") + if index > 0 { + if value, errorValue := strconv.ParseUint(user[index+1:], 10, 64); errorValue == nil { + event.Miner.SetUser(user[:index]) + event.Miner.SetCustomDiff(value) + return + } + } + + if cd != nil && cd.globalDiff > 0 { + event.Miner.SetCustomDiff(cd.globalDiff) + } +} diff --git a/runtime_support_test.go b/runtime_support_test.go new file mode 100644 index 0000000..df9b6fd --- /dev/null +++ b/runtime_support_test.go @@ -0,0 +1,57 @@ +package proxy + +import ( + "testing" + "time" +) + +func TestRateLimiter_Allow_Good(t *testing.T) { + limiter := NewRateLimiter(RateLimit{MaxConnectionsPerMinute: 2}) + if !limiter.Allow("127.0.0.1:1234") { + t.Fatal("expected first connection to pass") + } +} + +func TestRateLimiter_Allow_Bad(t *testing.T) { + limiter := NewRateLimiter(RateLimit{MaxConnectionsPerMinute: 1, BanDurationSeconds: 60}) + if !limiter.Allow("127.0.0.1:1234") { + t.Fatal("expected first connection to pass") + } + if limiter.Allow("127.0.0.1:1234") { + t.Fatal("expected second connection to be blocked") + } +} + +func TestRateLimiter_Allow_Ugly(t *testing.T) { + limiter := NewRateLimiter(RateLimit{MaxConnectionsPerMinute: 1}) + limiter.Allow("127.0.0.1:1234") + time.Sleep(time.Second) + limiter.Tick() + if limiter.Allow("127.0.0.1:1234") { + t.Fatal("expected bucket not to refill fully after one second at 1/minute") + } +} + +func TestCustomDiff_OnLogin_Good(t *testing.T) { + miner := &Miner{user: "wallet+5000"} + NewCustomDiff(100).OnLogin(Event{Miner: miner}) + if miner.User() != "wallet" || miner.CustomDiff() != 5000 { + t.Fatalf("expected parsed custom diff, got user=%q diff=%d", miner.User(), miner.CustomDiff()) + } +} + +func TestCustomDiff_OnLogin_Bad(t *testing.T) { + miner := &Miner{user: "wallet"} + NewCustomDiff(100).OnLogin(Event{Miner: miner}) + if miner.CustomDiff() != 100 { + t.Fatalf("expected fallback diff 100, got %d", miner.CustomDiff()) + } +} + +func TestCustomDiff_OnLogin_Ugly(t *testing.T) { + miner := &Miner{user: "wallet+bad"} + NewCustomDiff(100).OnLogin(Event{Miner: miner}) + if miner.User() != "wallet+bad" || miner.CustomDiff() != 100 { + t.Fatalf("expected invalid suffix to preserve user and apply global diff, got user=%q diff=%d", miner.User(), miner.CustomDiff()) + } +} diff --git a/server_runtime.go b/server_runtime.go new file mode 100644 index 0000000..9030be1 --- /dev/null +++ b/server_runtime.go @@ -0,0 +1,80 @@ +package proxy + +import ( + "crypto/tls" + "errors" + "net" + "strconv" +) + +// NewServer binds one miner-facing TCP listener. +// +// srv, errorValue := proxy.NewServer(bind, nil, limiter, onAccept) +func NewServer(bind BindAddr, tlsCfg *tls.Config, limiter *RateLimiter, onAccept func(net.Conn, uint16)) (*Server, error) { + address := net.JoinHostPort(bind.Host, strconv.Itoa(int(bind.Port))) + listener, errorValue := net.Listen("tcp", address) + if errorValue != nil { + return nil, errorValue + } + + return &Server{ + addr: bind, + tlsCfg: tlsCfg, + limiter: limiter, + onAccept: onAccept, + listener: listener, + done: make(chan struct{}), + }, nil +} + +// Start begins accepting connections in a goroutine. +// +// srv.Start() +func (s *Server) Start() { + if s == nil || s.listener == nil { + return + } + + go func() { + for { + conn, errorValue := s.listener.Accept() + if errorValue != nil { + select { + case <-s.done: + return + default: + continue + } + } + + if s.limiter != nil && !s.limiter.Allow(conn.RemoteAddr().String()) { + _ = conn.Close() + continue + } + + if s.onAccept == nil { + _ = conn.Close() + continue + } + s.onAccept(conn, s.addr.Port) + } + }() +} + +// Stop closes the listener. +// +// srv.Stop() +func (s *Server) Stop() { + if s == nil || s.listener == nil { + return + } + + select { + case <-s.done: + default: + close(s.done) + } + _ = s.listener.Close() +} + +var errServerClosed = errors.New("server closed") diff --git a/splitter/nicehash/mapper.go b/splitter/nicehash/mapper.go index 6f4b408..7ed0a02 100644 --- a/splitter/nicehash/mapper.go +++ b/splitter/nicehash/mapper.go @@ -15,7 +15,7 @@ import ( type NonceMapper struct { id int64 storage *NonceStorage - strategy pool.Strategy // manages pool client lifecycle and failover + strategy pool.Strategy // manages pool client lifecycle and failover pending map[int64]SubmitContext // sequence → {requestID, minerID} cfg *proxy.Config active bool // true once pool has sent at least one job @@ -30,3 +30,45 @@ type SubmitContext struct { RequestID int64 // JSON-RPC id from the miner's submit request MinerID int64 // miner that submitted } + +// NewNonceMapper creates one upstream pool mapper and its local slot table. +// +// mapper := nicehash.NewNonceMapper(1, cfg, strategy) +func NewNonceMapper(id int64, cfg *proxy.Config, strategy pool.Strategy) *NonceMapper { + return &NonceMapper{ + id: id, + storage: NewNonceStorage(), + strategy: strategy, + cfg: cfg, + pending: make(map[int64]SubmitContext), + } +} + +func (m *NonceMapper) Add(miner *proxy.Miner) bool { + return m.storage.Add(miner) +} + +func (m *NonceMapper) Remove(miner *proxy.Miner) { + m.storage.Remove(miner) +} + +func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { + if event == nil || m.strategy == nil { + return + } + + sequence := m.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo) + m.mu.Lock() + m.pending[sequence] = SubmitContext{ + RequestID: event.RequestID, + MinerID: event.Miner.ID(), + } + m.mu.Unlock() +} + +func (m *NonceMapper) IsActive() bool { + if m.strategy == nil { + return false + } + return m.strategy.IsActive() +} diff --git a/splitter/nicehash/splitter.go b/splitter/nicehash/splitter.go index 0d77522..0532175 100644 --- a/splitter/nicehash/splitter.go +++ b/splitter/nicehash/splitter.go @@ -28,3 +28,135 @@ type NonceSplitter struct { strategyFactory pool.StrategyFactory mu sync.RWMutex } + +// NewNonceSplitter creates the NiceHash splitter. +// +// s := nicehash.NewNonceSplitter(cfg, bus, factory) +func NewNonceSplitter(cfg *proxy.Config, events *proxy.EventBus, factory pool.StrategyFactory) *NonceSplitter { + return &NonceSplitter{ + cfg: cfg, + events: events, + strategyFactory: factory, + mappers: make([]*NonceMapper, 0, 1), + } +} + +func (s *NonceSplitter) Connect() { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.mappers) > 0 { + return + } + + mapper := s.newMapperLocked() + s.mappers = append(s.mappers, mapper) + mapper.strategy.Connect() +} + +func (s *NonceSplitter) OnLogin(event *proxy.LoginEvent) { + if event == nil || event.Miner == nil { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + for _, mapper := range s.mappers { + if mapper.Add(event.Miner) { + event.Miner.SetMapperID(mapper.id) + event.Miner.SetNiceHashEnabled(true) + return + } + } + + mapper := s.newMapperLocked() + s.mappers = append(s.mappers, mapper) + mapper.strategy.Connect() + if mapper.Add(event.Miner) { + event.Miner.SetMapperID(mapper.id) + event.Miner.SetNiceHashEnabled(true) + } +} + +func (s *NonceSplitter) OnSubmit(event *proxy.SubmitEvent) { + if event == nil || event.Miner == nil { + return + } + + s.mu.RLock() + defer s.mu.RUnlock() + + for _, mapper := range s.mappers { + if mapper.id == event.Miner.MapperID() { + mapper.Submit(event) + return + } + } +} + +func (s *NonceSplitter) OnClose(event *proxy.CloseEvent) { + if event == nil || event.Miner == nil { + return + } + + s.mu.RLock() + defer s.mu.RUnlock() + + for _, mapper := range s.mappers { + if mapper.id == event.Miner.MapperID() { + mapper.Remove(event.Miner) + return + } + } +} + +func (s *NonceSplitter) Tick(ticks uint64) { + if ticks%60 == 0 { + s.GC() + } +} + +func (s *NonceSplitter) GC() { + s.mu.Lock() + defer s.mu.Unlock() + + filtered := s.mappers[:0] + for _, mapper := range s.mappers { + free, dead, active := mapper.storage.SlotCount() + if active == 0 && dead == 0 && free == 256 && len(s.mappers) > 1 { + mapper.strategy.Disconnect() + continue + } + filtered = append(filtered, mapper) + } + s.mappers = filtered +} + +func (s *NonceSplitter) Upstreams() proxy.UpstreamStats { + s.mu.RLock() + defer s.mu.RUnlock() + + var stats proxy.UpstreamStats + for _, mapper := range s.mappers { + stats.Total++ + switch { + case mapper.suspended > 0: + stats.Error++ + case mapper.IsActive(): + stats.Active++ + default: + stats.Sleep++ + } + } + return stats +} + +func (s *NonceSplitter) newMapperLocked() *NonceMapper { + mapperID := int64(len(s.mappers) + 1) + var strategy pool.Strategy + if s.strategyFactory != nil { + strategy = s.strategyFactory(nil) + } + return NewNonceMapper(mapperID, s.cfg, strategy) +} diff --git a/splitter/nicehash/storage.go b/splitter/nicehash/storage.go index defd3a8..b2661af 100644 --- a/splitter/nicehash/storage.go +++ b/splitter/nicehash/storage.go @@ -23,3 +23,120 @@ type NonceStorage struct { cursor int // search starts here (round-robin allocation) mu sync.Mutex } + +// NewNonceStorage allocates the fixed-size miner slot table. +// +// storage := nicehash.NewNonceStorage() +func NewNonceStorage() *NonceStorage { + return &NonceStorage{ + miners: make(map[int64]*proxy.Miner), + } +} + +// Add finds the next free slot starting from cursor (wrapping), sets slot[index] = minerID, +// and sets the miner fixed byte. +// +// ok := storage.Add(miner) +func (s *NonceStorage) Add(miner *proxy.Miner) bool { + if miner == nil { + return false + } + + s.mu.Lock() + defer s.mu.Unlock() + + for offset := 0; offset < len(s.slots); offset++ { + index := (s.cursor + offset) % len(s.slots) + if s.slots[index] != 0 { + continue + } + + s.slots[index] = miner.ID() + s.miners[miner.ID()] = miner + miner.SetFixedByte(uint8(index)) + s.cursor = (index + 1) % len(s.slots) + return true + } + + return false +} + +// Remove marks slot[miner.FixedByte] as a dead slot until the next SetJob call. +// +// storage.Remove(miner) +func (s *NonceStorage) Remove(miner *proxy.Miner) { + if miner == nil { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + index := int(miner.FixedByte()) + if index >= 0 && index < len(s.slots) && s.slots[index] == miner.ID() { + s.slots[index] = -miner.ID() + } + delete(s.miners, miner.ID()) +} + +// SetJob replaces the current job, clears dead slots, and fans the job out to active miners. +// +// storage.SetJob(job) +func (s *NonceStorage) SetJob(job proxy.Job) { + s.mu.Lock() + if s.job.ClientID == job.ClientID || s.job.ClientID == "" { + s.prevJob = s.job + } else { + s.prevJob = proxy.Job{} + } + s.job = job + + miners := make([]*proxy.Miner, 0, len(s.miners)) + for index, minerID := range s.slots { + if minerID < 0 { + s.slots[index] = 0 + continue + } + if minerID > 0 { + if miner := s.miners[minerID]; miner != nil { + miners = append(miners, miner) + } + } + } + s.mu.Unlock() + + for _, miner := range miners { + miner.ForwardJob(job, job.Algo) + } +} + +// IsValidJobID returns true if id matches the current or previous job ID. +// +// if !storage.IsValidJobID(submitJobID) { reject } +func (s *NonceStorage) IsValidJobID(id string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + return id != "" && (id == s.job.JobID || id == s.prevJob.JobID) +} + +// SlotCount returns free, dead, and active slot counts for monitoring output. +// +// free, dead, active := storage.SlotCount() +func (s *NonceStorage) SlotCount() (free int, dead int, active int) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, slot := range s.slots { + switch { + case slot == 0: + free++ + case slot < 0: + dead++ + default: + active++ + } + } + + return free, dead, active +} diff --git a/splitter/nicehash/storage_test.go b/splitter/nicehash/storage_test.go new file mode 100644 index 0000000..c371ae4 --- /dev/null +++ b/splitter/nicehash/storage_test.go @@ -0,0 +1,61 @@ +package nicehash + +import ( + "testing" + + "dappco.re/go/core/proxy" +) + +func TestNonceStorage_Add_Good(t *testing.T) { + storage := NewNonceStorage() + miner := proxy.NewMiner(nil, 0, nil) + miner.SetUser("wallet") + if !storage.Add(miner) { + t.Fatal("expected slot allocation to succeed") + } +} + +func TestNonceStorage_Add_Bad(t *testing.T) { + storage := NewNonceStorage() + if storage.Add(nil) { + t.Fatal("expected nil miner allocation to fail") + } +} + +func TestNonceStorage_Add_Ugly(t *testing.T) { + storage := NewNonceStorage() + for index := 0; index < 256; index++ { + miner := proxy.NewMiner(nil, 0, nil) + if !storage.Add(miner) { + t.Fatalf("expected miner %d to fit", index) + } + } + if storage.Add(proxy.NewMiner(nil, 0, nil)) { + t.Fatal("expected 257th miner to fail") + } +} + +func TestNonceStorage_IsValidJobID_Good(t *testing.T) { + storage := NewNonceStorage() + storage.SetJob(proxy.Job{Blob: "abcd", JobID: "job-1"}) + if !storage.IsValidJobID("job-1") { + t.Fatal("expected current job ID to be valid") + } +} + +func TestNonceStorage_IsValidJobID_Bad(t *testing.T) { + storage := NewNonceStorage() + storage.SetJob(proxy.Job{Blob: "abcd", JobID: "job-1"}) + if storage.IsValidJobID("job-2") { + t.Fatal("expected unknown job ID to be invalid") + } +} + +func TestNonceStorage_IsValidJobID_Ugly(t *testing.T) { + storage := NewNonceStorage() + storage.SetJob(proxy.Job{Blob: "abcd", JobID: "job-1", ClientID: "pool-a"}) + storage.SetJob(proxy.Job{Blob: "efgh", JobID: "job-2", ClientID: "pool-a"}) + if !storage.IsValidJobID("job-1") { + t.Fatal("expected previous job ID from same client to remain valid") + } +} diff --git a/splitter/simple/mapper.go b/splitter/simple/mapper.go index 18be7e8..afe5dcb 100644 --- a/splitter/simple/mapper.go +++ b/splitter/simple/mapper.go @@ -19,3 +19,10 @@ type SimpleMapper struct { idleAt time.Time // zero when active stopped bool } + +// NewSimpleMapper stores the mapper ID and strategy. +// +// mapper := simple.NewSimpleMapper(1, strategy) +func NewSimpleMapper(id int64, strategy pool.Strategy) *SimpleMapper { + return &SimpleMapper{id: id, strategy: strategy} +} diff --git a/splitter/simple/splitter.go b/splitter/simple/splitter.go index 7e8c555..81f3e19 100644 --- a/splitter/simple/splitter.go +++ b/splitter/simple/splitter.go @@ -9,6 +9,7 @@ package simple import ( "sync" + "time" "dappco.re/go/core/proxy" "dappco.re/go/core/proxy/pool" @@ -26,3 +27,138 @@ type SimpleSplitter struct { mu sync.Mutex seq int64 // monotonic mapper sequence counter } + +// NewSimpleSplitter creates the passthrough splitter. +// +// s := simple.NewSimpleSplitter(cfg, bus, factory) +func NewSimpleSplitter(cfg *proxy.Config, events *proxy.EventBus, factory pool.StrategyFactory) *SimpleSplitter { + return &SimpleSplitter{ + active: make(map[int64]*SimpleMapper), + idle: make(map[int64]*SimpleMapper), + cfg: cfg, + events: events, + factory: factory, + } +} + +func (s *SimpleSplitter) Connect() {} + +func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) { + if event == nil || event.Miner == nil { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + var mapper *SimpleMapper + for mapperID, idleMapper := range s.idle { + mapper = idleMapper + delete(s.idle, mapperID) + break + } + + if mapper == nil { + s.seq++ + var strategy pool.Strategy + if s.factory != nil { + strategy = s.factory(nil) + } + mapper = NewSimpleMapper(s.seq, strategy) + if mapper.strategy != nil { + mapper.strategy.Connect() + } + } + + mapper.miner = event.Miner + mapper.idleAt = time.Time{} + event.Miner.SetRouteID(mapper.id) + s.active[event.Miner.ID()] = mapper +} + +func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) { + if event == nil || event.Miner == nil { + return + } + + s.mu.Lock() + mapper := s.active[event.Miner.ID()] + s.mu.Unlock() + if mapper == nil || mapper.strategy == nil { + return + } + + mapper.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo) +} + +func (s *SimpleSplitter) OnClose(event *proxy.CloseEvent) { + if event == nil || event.Miner == nil { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + mapper := s.active[event.Miner.ID()] + if mapper == nil { + return + } + delete(s.active, event.Miner.ID()) + + mapper.miner = nil + mapper.idleAt = time.Now().UTC() + if s.cfg != nil && s.cfg.ReuseTimeout > 0 { + s.idle[mapper.id] = mapper + return + } + + mapper.stopped = true + if mapper.strategy != nil { + mapper.strategy.Disconnect() + } +} + +func (s *SimpleSplitter) Tick(ticks uint64) { + if ticks%60 == 0 { + s.GC() + } +} + +func (s *SimpleSplitter) GC() { + s.mu.Lock() + defer s.mu.Unlock() + + timeout := time.Duration(0) + if s.cfg != nil && s.cfg.ReuseTimeout > 0 { + timeout = time.Duration(s.cfg.ReuseTimeout) * time.Second + } + + now := time.Now().UTC() + for mapperID, mapper := range s.idle { + if timeout == 0 || (!mapper.idleAt.IsZero() && now.Sub(mapper.idleAt) > timeout) || mapper.stopped { + if mapper.strategy != nil { + mapper.strategy.Disconnect() + } + delete(s.idle, mapperID) + } + } +} + +func (s *SimpleSplitter) Upstreams() proxy.UpstreamStats { + s.mu.Lock() + defer s.mu.Unlock() + + stats := proxy.UpstreamStats{ + Sleep: uint64(len(s.idle)), + } + for _, mapper := range s.active { + stats.Total++ + if mapper.strategy != nil && mapper.strategy.IsActive() { + stats.Active++ + } else { + stats.Error++ + } + } + stats.Total += uint64(len(s.idle)) + return stats +} diff --git a/stats.go b/stats.go index 4ff6559..b4befb9 100644 --- a/stats.go +++ b/stats.go @@ -1,6 +1,8 @@ package proxy import ( + "slices" + "sort" "sync" "sync/atomic" "time" @@ -53,8 +55,164 @@ type StatsSummary struct { Invalid uint64 `json:"invalid"` Expired uint64 `json:"expired"` Hashes uint64 `json:"hashes_total"` - AvgTime uint32 `json:"avg_time"` // seconds per accepted share - AvgLatency uint32 `json:"latency"` // median pool response latency in ms - Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants) + AvgTime uint32 `json:"avg_time"` // seconds per accepted share + AvgLatency uint32 `json:"latency"` // median pool response latency in ms + Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants) TopDiff [10]uint64 `json:"best"` } + +var hashrateWindowSizes = [5]int{60, 600, 3600, 43200, 86400} + +// NewStats allocates the rolling windows and initialises the clock anchor. +// +// s := proxy.NewStats() +func NewStats() *Stats { + stats := &Stats{ + startTime: time.Now().UTC(), + latency: make([]uint16, 0, 128), + } + + for index, size := range hashrateWindowSizes { + stats.windows[index] = tickWindow{ + buckets: make([]uint64, size), + size: size, + } + } + + return stats +} + +// OnAccept records an accepted share. Adds diff to the current second's bucket in all windows. +// +// stats.OnAccept(proxy.Event{Diff: 100000, Latency: 82}) +func (s *Stats) OnAccept(event Event) { + s.accepted.Add(1) + s.hashes.Add(event.Diff) + if event.Expired { + s.expired.Add(1) + } + + s.mu.Lock() + for index := 0; index < HashrateWindowAll; index++ { + s.windows[index].buckets[s.windows[index].pos] += event.Diff + } + insertTopDiff(&s.topDiff, event.Diff) + if event.Latency > 0 { + s.latency = appendCappedLatency(s.latency, event.Latency) + } + s.mu.Unlock() +} + +// OnReject records a rejected share. If e.Error indicates low diff or malformed, increments invalid. +// +// stats.OnReject(proxy.Event{Error: "Low difficulty share"}) +func (s *Stats) OnReject(event Event) { + s.rejected.Add(1) + if isInvalidShareError(event.Error) { + s.invalid.Add(1) + } + if event.Expired { + s.expired.Add(1) + } + if event.Latency > 0 { + s.mu.Lock() + s.latency = appendCappedLatency(s.latency, event.Latency) + s.mu.Unlock() + } +} + +// Tick advances all rolling windows by one second bucket. Called by the proxy tick loop. +// +// stats.Tick() +func (s *Stats) Tick() { + s.mu.Lock() + defer s.mu.Unlock() + + for index := 0; index < HashrateWindowAll; index++ { + window := &s.windows[index] + window.pos = (window.pos + 1) % window.size + window.buckets[window.pos] = 0 + } +} + +// Summary returns a point-in-time snapshot of all stats fields for API serialisation. +// +// summary := stats.Summary() +func (s *Stats) Summary() StatsSummary { + s.mu.Lock() + defer s.mu.Unlock() + + var summary StatsSummary + summary.Accepted = s.accepted.Load() + summary.Rejected = s.rejected.Load() + summary.Invalid = s.invalid.Load() + summary.Expired = s.expired.Load() + summary.Hashes = s.hashes.Load() + summary.TopDiff = s.topDiff + + for index := 0; index < HashrateWindowAll; index++ { + windowSize := hashrateWindowSizes[index] + summary.Hashrate[index] = float64(sumBuckets(s.windows[index].buckets)) / float64(windowSize) + } + + uptimeSeconds := uint64(time.Since(s.startTime).Seconds()) + if uptimeSeconds > 0 { + summary.Hashrate[HashrateWindowAll] = float64(summary.Hashes) / float64(uptimeSeconds) + } + if summary.Accepted > 0 && uptimeSeconds > 0 { + summary.AvgTime = uint32(uptimeSeconds / summary.Accepted) + } + + if len(s.latency) > 0 { + values := slices.Clone(s.latency) + sort.Slice(values, func(left int, right int) bool { + return values[left] < values[right] + }) + summary.AvgLatency = uint32(values[len(values)/2]) + } + + return summary +} + +func appendCappedLatency(latencies []uint16, latency uint16) []uint16 { + if len(latencies) == 10000 { + copy(latencies, latencies[1:]) + latencies[len(latencies)-1] = latency + return latencies + } + + return append(latencies, latency) +} + +func insertTopDiff(topDiff *[10]uint64, difficulty uint64) { + if difficulty == 0 { + return + } + + for index, value := range topDiff { + if difficulty <= value { + continue + } + + copy(topDiff[index+1:], topDiff[index:len(topDiff)-1]) + topDiff[index] = difficulty + return + } +} + +func isInvalidShareError(message string) bool { + switch message { + case "Low difficulty share", "Invalid nonce", "Malformed share", "Invalid result": + return true + default: + return false + } +} + +func sumBuckets(values []uint64) uint64 { + var total uint64 + for _, value := range values { + total += value + } + return total +} diff --git a/stats_workers_test.go b/stats_workers_test.go new file mode 100644 index 0000000..8eb6acc --- /dev/null +++ b/stats_workers_test.go @@ -0,0 +1,94 @@ +package proxy + +import "testing" + +func TestEventBus_Dispatch_Good(t *testing.T) { + bus := NewEventBus() + called := false + bus.Subscribe(EventLogin, func(event Event) { + called = event.Miner != nil + }) + bus.Dispatch(Event{Type: EventLogin, Miner: &Miner{}}) + if !called { + t.Fatal("expected handler to be called") + } +} + +func TestEventBus_Dispatch_Bad(t *testing.T) { + bus := NewEventBus() + bus.Subscribe(EventLogin, nil) + bus.Dispatch(Event{Type: EventLogin}) +} + +func TestEventBus_Dispatch_Ugly(t *testing.T) { + bus := NewEventBus() + count := 0 + bus.Subscribe(EventLogin, func(event Event) { count++ }) + bus.Subscribe(EventLogin, func(event Event) { count++ }) + bus.Dispatch(Event{Type: EventLogin}) + if count != 2 { + t.Fatalf("expected both handlers to run, got %d", count) + } +} + +func TestStats_Summary_Good(t *testing.T) { + stats := NewStats() + stats.OnAccept(Event{Diff: 120, Latency: 80}) + summary := stats.Summary() + if summary.Accepted != 1 || summary.Hashes != 120 { + t.Fatalf("unexpected summary: %+v", summary) + } +} + +func TestStats_Summary_Bad(t *testing.T) { + stats := NewStats() + stats.OnReject(Event{Error: "Low difficulty share"}) + summary := stats.Summary() + if summary.Rejected != 1 || summary.Invalid != 1 { + t.Fatalf("unexpected summary: %+v", summary) + } +} + +func TestStats_Summary_Ugly(t *testing.T) { + stats := NewStats() + stats.OnAccept(Event{Diff: 100, Latency: 10}) + stats.Tick() + stats.OnAccept(Event{Diff: 200, Latency: 20}) + summary := stats.Summary() + if summary.TopDiff[0] != 200 || summary.TopDiff[1] != 100 { + t.Fatalf("unexpected best shares: %+v", summary.TopDiff) + } +} + +func TestWorkers_List_Good(t *testing.T) { + bus := NewEventBus() + workers := NewWorkers(WorkersByRigID, bus) + miner := &Miner{id: 1, user: "wallet", rigID: "rig-a", ip: "10.0.0.1"} + bus.Dispatch(Event{Type: EventLogin, Miner: miner}) + bus.Dispatch(Event{Type: EventAccept, Miner: miner, Diff: 600}) + records := workers.List() + if len(records) != 1 || records[0].Name != "rig-a" || records[0].Accepted != 1 { + t.Fatalf("unexpected worker records: %+v", records) + } +} + +func TestWorkers_List_Bad(t *testing.T) { + bus := NewEventBus() + workers := NewWorkers(WorkersDisabled, bus) + bus.Dispatch(Event{Type: EventLogin, Miner: &Miner{id: 1, user: "wallet"}}) + if len(workers.List()) != 0 { + t.Fatal("expected no worker records when disabled") + } +} + +func TestWorkers_List_Ugly(t *testing.T) { + bus := NewEventBus() + workers := NewWorkers(WorkersByRigID, bus) + miner := &Miner{id: 1, user: "wallet", ip: "10.0.0.1"} + bus.Dispatch(Event{Type: EventLogin, Miner: miner}) + bus.Dispatch(Event{Type: EventReject, Miner: miner, Error: "Low difficulty share"}) + records := workers.List() + if len(records) != 1 || records[0].Name != "wallet" || records[0].Invalid != 1 { + t.Fatalf("unexpected worker records: %+v", records) + } +} diff --git a/worker.go b/worker.go index bf729f7..2a52a03 100644 --- a/worker.go +++ b/worker.go @@ -11,9 +11,9 @@ import ( // w := proxy.NewWorkers(proxy.WorkersByRigID, bus) type Workers struct { mode WorkersMode - entries []WorkerRecord // ordered by first-seen (stable) - nameIndex map[string]int // workerName → entries index - idIndex map[int64]int // minerID → entries index + entries []WorkerRecord // ordered by first-seen (stable) + nameIndex map[string]int // workerName → entries index + idIndex map[int64]int // minerID → entries index mu sync.RWMutex } @@ -27,7 +27,180 @@ type WorkerRecord struct { Accepted uint64 Rejected uint64 Invalid uint64 - Hashes uint64 // sum of accepted share difficulties + Hashes uint64 // sum of accepted share difficulties LastHashAt time.Time windows [5]tickWindow // 60s, 600s, 3600s, 12h, 24h } + +// Hashrate returns the H/s for a given window (seconds: 60, 600, 3600, 43200, 86400). +// +// hr60 := record.Hashrate(60) +func (r *WorkerRecord) Hashrate(seconds int) float64 { + for index, windowSize := range hashrateWindowSizes { + if windowSize == seconds { + return float64(sumBuckets(r.windows[index].buckets)) / float64(seconds) + } + } + + return 0 +} + +// NewWorkers creates the worker aggregate and subscribes it to the event bus. +// +// w := proxy.NewWorkers(proxy.WorkersByRigID, bus) +func NewWorkers(mode WorkersMode, bus *EventBus) *Workers { + workers := &Workers{ + mode: mode, + entries: make([]WorkerRecord, 0), + nameIndex: make(map[string]int), + idIndex: make(map[int64]int), + } + + if bus != nil { + bus.Subscribe(EventLogin, workers.onLogin) + bus.Subscribe(EventAccept, workers.onAccept) + bus.Subscribe(EventReject, workers.onReject) + bus.Subscribe(EventClose, workers.onClose) + } + + return workers +} + +// List returns a snapshot of all worker records in first-seen order. +// +// records := workers.List() +func (w *Workers) List() []WorkerRecord { + w.mu.RLock() + defer w.mu.RUnlock() + + records := make([]WorkerRecord, len(w.entries)) + copy(records, w.entries) + return records +} + +// Tick advances all worker hashrate windows. Called by the proxy tick loop every second. +// +// workers.Tick() +func (w *Workers) Tick() { + w.mu.Lock() + defer w.mu.Unlock() + + for entryIndex := range w.entries { + for windowIndex, size := range hashrateWindowSizes { + if windowIndex >= len(w.entries[entryIndex].windows) { + break + } + window := &w.entries[entryIndex].windows[windowIndex] + if window.size == 0 { + window.size = size + window.buckets = make([]uint64, size) + } + window.pos = (window.pos + 1) % window.size + window.buckets[window.pos] = 0 + } + } +} + +func (w *Workers) onLogin(event Event) { + if event.Miner == nil || w.mode == WorkersDisabled { + return + } + + name := w.workerName(event.Miner) + if name == "" { + return + } + + w.mu.Lock() + defer w.mu.Unlock() + + index, exists := w.nameIndex[name] + if !exists { + record := WorkerRecord{Name: name} + for windowIndex, size := range hashrateWindowSizes { + if windowIndex >= len(record.windows) { + break + } + record.windows[windowIndex] = tickWindow{ + buckets: make([]uint64, size), + size: size, + } + } + w.entries = append(w.entries, record) + index = len(w.entries) - 1 + w.nameIndex[name] = index + } + + record := &w.entries[index] + record.LastIP = event.Miner.IP() + record.Connections++ + w.idIndex[event.Miner.ID()] = index +} + +func (w *Workers) onAccept(event Event) { + w.updateShare(event, true) +} + +func (w *Workers) onReject(event Event) { + w.updateShare(event, false) +} + +func (w *Workers) onClose(event Event) { + if event.Miner == nil { + return + } + + w.mu.Lock() + defer w.mu.Unlock() + delete(w.idIndex, event.Miner.ID()) +} + +func (w *Workers) updateShare(event Event, accepted bool) { + if event.Miner == nil || w.mode == WorkersDisabled { + return + } + + w.mu.Lock() + defer w.mu.Unlock() + + index, exists := w.idIndex[event.Miner.ID()] + if !exists { + return + } + + record := &w.entries[index] + if accepted { + record.Accepted++ + record.Hashes += event.Diff + record.LastHashAt = time.Now().UTC() + for windowIndex := range record.windows { + record.windows[windowIndex].buckets[record.windows[windowIndex].pos] += event.Diff + } + return + } + + record.Rejected++ + if isInvalidShareError(event.Error) { + record.Invalid++ + } +} + +func (w *Workers) workerName(miner *Miner) string { + switch w.mode { + case WorkersByRigID: + if miner.RigID() != "" { + return miner.RigID() + } + return miner.User() + case WorkersByUser: + return miner.User() + case WorkersByPass: + return miner.Password() + case WorkersByAgent: + return miner.Agent() + case WorkersByIP: + return miner.IP() + default: + return "" + } +}