diff --git a/api/router.go b/api/router.go index 170514e..dddc15f 100644 --- a/api/router.go +++ b/api/router.go @@ -9,6 +9,18 @@ // proxyapi.RegisterRoutes(apiRouter, p) package api +import ( + "encoding/json" + "net/http" + + "dappco.re/go/core/proxy" +) + +// Router matches the standard http.ServeMux registration shape. +type Router interface { + HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) +} + // SummaryResponse is the /1/summary JSON body. // // {"version":"1.0.0","mode":"nicehash","hashrate":{"total":[...]}, ...} @@ -61,3 +73,115 @@ type ResultsResponse struct { HashesTotal uint64 `json:"hashes_total"` Best [10]uint64 `json:"best"` } + +// RegisterRoutes wires the monitoring endpoints onto the supplied router. +func RegisterRoutes(r Router, p *proxy.Proxy) { + if r == nil || p == nil { + return + } + r.HandleFunc("/1/summary", func(w http.ResponseWriter, req *http.Request) { + writeJSON(w, summaryResponse(p)) + }) + r.HandleFunc("/1/workers", func(w http.ResponseWriter, req *http.Request) { + writeJSON(w, workersResponse(p)) + }) + r.HandleFunc("/1/miners", func(w http.ResponseWriter, req *http.Request) { + writeJSON(w, minersResponse(p)) + }) +} + +func summaryResponse(p *proxy.Proxy) SummaryResponse { + summary := p.Summary() + now, max := p.MinerCount() + upstreams := p.Upstreams() + return SummaryResponse{ + Version: "1.0.0", + Mode: p.Mode(), + Hashrate: HashrateResponse{ + Total: summary.Hashrate, + }, + Miners: MinersCountResponse{ + Now: now, + Max: max, + }, + Workers: uint64(len(p.WorkerRecords())), + Upstreams: UpstreamResponse{ + Active: upstreams.Active, + Sleep: upstreams.Sleep, + Error: upstreams.Error, + Total: upstreams.Total, + Ratio: ratio(now, upstreams.Total), + }, + Results: ResultsResponse{ + Accepted: summary.Accepted, + Rejected: summary.Rejected, + Invalid: summary.Invalid, + Expired: summary.Expired, + AvgTime: summary.AvgTime, + Latency: summary.AvgLatency, + HashesTotal: summary.Hashes, + Best: summary.TopDiff, + }, + } +} + +func workersResponse(p *proxy.Proxy) any { + records := p.WorkerRecords() + rows := make([]any, 0, len(records)) + for _, record := range records { + rows = append(rows, []any{ + record.Name, + record.LastIP, + record.Connections, + record.Accepted, + record.Rejected, + record.Invalid, + record.Hashes, + record.LastHashAt.Unix(), + record.Hashrate(60), + record.Hashrate(600), + record.Hashrate(3600), + record.Hashrate(43200), + record.Hashrate(86400), + }) + } + return map[string]any{ + "mode": string(p.WorkersMode()), + "workers": rows, + } +} + +func minersResponse(p *proxy.Proxy) any { + records := p.MinerSnapshots() + rows := make([]any, 0, len(records)) + for _, miner := range records { + rows = append(rows, []any{ + miner.ID, + miner.IP, + miner.TX, + miner.RX, + miner.State, + miner.Diff, + miner.User, + miner.Password, + miner.RigID, + miner.Agent, + }) + } + return map[string]any{ + "format": []string{"id", "ip", "tx", "rx", "state", "diff", "user", "password", "rig_id", "agent"}, + "miners": rows, + } +} + +func writeJSON(w http.ResponseWriter, payload any) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(payload) +} + +func ratio(now, total uint64) float64 { + if total == 0 { + return 0 + } + return float64(now) / float64(total) +} diff --git a/config.go b/config.go index 895f7e6..9de0349 100644 --- a/config.go +++ b/config.go @@ -5,22 +5,22 @@ package proxy // cfg, result := proxy.LoadConfig("config.json") // if !result.OK { log.Fatal(result.Error) } type Config struct { - Mode string `json:"mode"` // "nicehash" or "simple" - Bind []BindAddr `json:"bind"` // listen addresses - Pools []PoolConfig `json:"pools"` // ordered primary + fallbacks - TLS TLSConfig `json:"tls"` // inbound TLS (miner-facing) - HTTP HTTPConfig `json:"http"` // monitoring API - AccessPassword string `json:"access-password"` // "" = no auth required - CustomDiff uint64 `json:"custom-diff"` // 0 = disabled - CustomDiffStats bool `json:"custom-diff-stats"` // report per custom-diff bucket - AlgoExtension bool `json:"algo-ext"` // forward algo field in jobs - Workers WorkersMode `json:"workers"` // "rig-id", "user", "password", "agent", "ip", "false" - AccessLogFile string `json:"access-log-file"` // "" = disabled - ReuseTimeout int `json:"reuse-timeout"` // seconds; simple mode upstream reuse - Retries int `json:"retries"` // pool reconnect attempts - RetryPause int `json:"retry-pause"` // seconds between retries - Watch bool `json:"watch"` // hot-reload on file change - RateLimit RateLimit `json:"rate-limit"` // per-IP connection rate limit + Mode string `json:"mode"` // "nicehash" or "simple" + Bind []BindAddr `json:"bind"` // listen addresses + Pools []PoolConfig `json:"pools"` // ordered primary + fallbacks + TLS TLSConfig `json:"tls"` // inbound TLS (miner-facing) + HTTP HTTPConfig `json:"http"` // monitoring API + AccessPassword string `json:"access-password"` // "" = no auth required + CustomDiff uint64 `json:"custom-diff"` // 0 = disabled + CustomDiffStats bool `json:"custom-diff-stats"` // report per custom-diff bucket + AlgoExtension bool `json:"algo-ext"` // forward algo field in jobs + Workers WorkersMode `json:"workers"` // "rig-id", "user", "password", "agent", "ip", "false" + AccessLogFile string `json:"access-log-file"` // "" = disabled + ReuseTimeout int `json:"reuse-timeout"` // seconds; simple mode upstream reuse + Retries int `json:"retries"` // pool reconnect attempts + RetryPause int `json:"retry-pause"` // seconds between retries + Watch bool `json:"watch"` // hot-reload on file change + RateLimit RateLimit `json:"rate-limit"` // per-IP connection rate limit } // BindAddr is one TCP listen endpoint. @@ -81,7 +81,7 @@ type RateLimit struct { type WorkersMode string const ( - WorkersByRigID WorkersMode = "rig-id" // rigid field, fallback to user + WorkersByRigID WorkersMode = "rig-id" // rigid field, fallback to user WorkersByUser WorkersMode = "user" WorkersByPass WorkersMode = "password" WorkersByAgent WorkersMode = "agent" diff --git a/core_impl.go b/core_impl.go new file mode 100644 index 0000000..dbacba7 --- /dev/null +++ b/core_impl.go @@ -0,0 +1,353 @@ +package proxy + +import ( + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "io" + "math" + "net" + "os" + "strconv" + "strings" + "sync" + "time" +) + +// Result is a small success/error carrier used by constructors and loaders. +type Result struct { + OK bool + Error error +} + +func successResult() Result { + return Result{OK: true} +} + +func errorResult(err error) Result { + return Result{OK: false, Error: err} +} + +var splitterRegistryMu sync.RWMutex +var splitterRegistry = map[string]func(*Config, *EventBus) Splitter{} + +// RegisterSplitterFactory registers a mode-specific splitter constructor. +// Packages such as splitter/nicehash and splitter/simple call this from init. +func RegisterSplitterFactory(mode string, factory func(*Config, *EventBus) Splitter) { + splitterRegistryMu.Lock() + defer splitterRegistryMu.Unlock() + splitterRegistry[strings.ToLower(mode)] = factory +} + +func getSplitterFactory(mode string) (func(*Config, *EventBus) Splitter, bool) { + splitterRegistryMu.RLock() + defer splitterRegistryMu.RUnlock() + factory, ok := splitterRegistry[strings.ToLower(mode)] + return factory, ok +} + +// LoadConfig reads and unmarshals a JSON config file. +func LoadConfig(path string) (*Config, Result) { + data, err := os.ReadFile(path) + if err != nil { + return nil, errorResult(err) + } + + cfg := &Config{} + if err := json.Unmarshal(data, cfg); err != nil { + return nil, errorResult(err) + } + + if cfg.Mode == "" { + cfg.Mode = "nicehash" + } + return cfg, cfg.Validate() +} + +// Validate checks that mandatory bind and pool settings are present. +func (c *Config) Validate() Result { + if c == nil { + return errorResult(errors.New("config is nil")) + } + if len(c.Bind) == 0 { + return errorResult(errors.New("bind list is empty")) + } + if len(c.Pools) == 0 { + return errorResult(errors.New("pool list is empty")) + } + for _, pool := range c.Pools { + if pool.Enabled && strings.TrimSpace(pool.URL) == "" { + return errorResult(errors.New("enabled pool url is empty")) + } + } + return successResult() +} + +// NewEventBus creates an empty synchronous event dispatcher. +func NewEventBus() *EventBus { + return &EventBus{listeners: make(map[EventType][]EventHandler)} +} + +// Subscribe registers a handler for the given event type. +func (b *EventBus) Subscribe(t EventType, h EventHandler) { + if b == nil || h == nil { + return + } + b.mu.Lock() + defer b.mu.Unlock() + if b.listeners == nil { + b.listeners = make(map[EventType][]EventHandler) + } + b.listeners[t] = append(b.listeners[t], h) +} + +// Dispatch calls all registered handlers for the event's type. +func (b *EventBus) Dispatch(e Event) { + if b == nil { + return + } + b.mu.RLock() + handlers := append([]EventHandler(nil), b.listeners[e.Type]...) + b.mu.RUnlock() + for _, handler := range handlers { + handler(e) + } +} + +// IsValid returns true when the job contains a blob and job id. +func (j Job) IsValid() bool { + return j.Blob != "" && j.JobID != "" +} + +// BlobWithFixedByte replaces the blob byte at position 39 with fixedByte. +func (j Job) BlobWithFixedByte(fixedByte uint8) string { + if len(j.Blob) < 80 { + return j.Blob + } + blob := []byte(j.Blob) + encoded := make([]byte, 2) + hex.Encode(encoded, []byte{fixedByte}) + blob[78] = encoded[0] + blob[79] = encoded[1] + return string(blob) +} + +// DifficultyFromTarget converts the target to a rough integer difficulty. +func (j Job) DifficultyFromTarget() uint64 { + if len(j.Target) != 8 { + return 0 + } + raw, err := hex.DecodeString(j.Target) + if err != nil || len(raw) != 4 { + return 0 + } + target := uint32(raw[0]) | uint32(raw[1])<<8 | uint32(raw[2])<<16 | uint32(raw[3])<<24 + if target == 0 { + return 0 + } + return uint64(math.MaxUint32 / uint64(target)) +} + +// NewCustomDiff creates a login-time custom difficulty resolver. +func NewCustomDiff(globalDiff uint64) *CustomDiff { + return &CustomDiff{globalDiff: globalDiff} +} + +// OnLogin parses +N suffixes and applies global difficulty fallbacks. +func (cd *CustomDiff) OnLogin(e Event) { + if cd == nil || e.Miner == nil { + return + } + miner := e.Miner + user := miner.user + plus := strings.LastIndex(user, "+") + if plus >= 0 && plus < len(user)-1 { + if parsed, err := strconv.ParseUint(user[plus+1:], 10, 64); err == nil { + miner.user = user[:plus] + miner.customDiff = parsed + } + return + } + if cd.globalDiff > 0 { + miner.customDiff = cd.globalDiff + } +} + +// NewRateLimiter creates a per-IP token bucket limiter. +func NewRateLimiter(cfg RateLimit) *RateLimiter { + return &RateLimiter{ + cfg: cfg, + buckets: make(map[string]*tokenBucket), + banned: make(map[string]time.Time), + } +} + +// Allow returns true if the IP address is permitted to open a new connection. +func (rl *RateLimiter) Allow(ip string) bool { + if rl == nil || rl.cfg.MaxConnectionsPerMinute <= 0 { + return true + } + host := hostOnly(ip) + now := time.Now() + + rl.mu.Lock() + defer rl.mu.Unlock() + + if until, banned := rl.banned[host]; banned { + if now.Before(until) { + return false + } + delete(rl.banned, host) + } + + bucket, ok := rl.buckets[host] + if !ok { + bucket = &tokenBucket{tokens: rl.cfg.MaxConnectionsPerMinute, lastRefill: now} + rl.buckets[host] = bucket + } + + refillBucket(bucket, rl.cfg.MaxConnectionsPerMinute, now) + if bucket.tokens <= 0 { + if rl.cfg.BanDurationSeconds > 0 { + rl.banned[host] = now.Add(time.Duration(rl.cfg.BanDurationSeconds) * time.Second) + } + return false + } + + bucket.tokens-- + bucket.lastRefill = now + return true +} + +// Tick removes expired ban entries and refills token buckets. +func (rl *RateLimiter) Tick() { + if rl == nil || rl.cfg.MaxConnectionsPerMinute <= 0 { + return + } + now := time.Now() + + rl.mu.Lock() + defer rl.mu.Unlock() + + for host, until := range rl.banned { + if !now.Before(until) { + delete(rl.banned, host) + } + } + for _, bucket := range rl.buckets { + refillBucket(bucket, rl.cfg.MaxConnectionsPerMinute, now) + } +} + +// NewConfigWatcher creates a polling watcher for a config file. +func NewConfigWatcher(path string, onChange func(*Config)) *ConfigWatcher { + return &ConfigWatcher{ + path: path, + onChange: onChange, + done: make(chan struct{}), + } +} + +// Start begins the 1-second polling loop. +func (w *ConfigWatcher) Start() { + if w == nil || w.path == "" || w.onChange == nil { + return + } + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + info, err := os.Stat(w.path) + if err != nil { + continue + } + mod := info.ModTime() + if mod.After(w.lastMod) { + w.lastMod = mod + cfg, result := LoadConfig(w.path) + if result.OK && cfg != nil { + w.onChange(cfg) + } + } + case <-w.done: + return + } + } + }() +} + +// Stop ends the watcher goroutine. +func (w *ConfigWatcher) Stop() { + if w == nil { + return + } + select { + case <-w.done: + default: + close(w.done) + } +} + +func hostOnly(ip string) string { + host, _, err := net.SplitHostPort(ip) + if err == nil { + return host + } + return ip +} + +func refillBucket(bucket *tokenBucket, limit int, now time.Time) { + if bucket == nil || limit <= 0 { + return + } + if bucket.lastRefill.IsZero() { + bucket.lastRefill = now + if bucket.tokens <= 0 { + bucket.tokens = limit + } + return + } + interval := time.Duration(60/limit) * time.Second + if interval <= 0 { + interval = time.Second + } + elapsed := now.Sub(bucket.lastRefill) + if elapsed < interval { + return + } + add := int(elapsed / interval) + bucket.tokens += add + if bucket.tokens > limit { + bucket.tokens = limit + } + bucket.lastRefill = bucket.lastRefill.Add(time.Duration(add) * interval) +} + +func generateUUID() string { + var b [16]byte + if _, err := io.ReadFull(rand.Reader, b[:]); err != nil { + return strconv.FormatInt(time.Now().UnixNano(), 16) + } + b[6] = (b[6] & 0x0f) | 0x40 + b[8] = (b[8] & 0x3f) | 0x80 + var out [36]byte + hex.Encode(out[0:8], b[0:4]) + out[8] = '-' + hex.Encode(out[9:13], b[4:6]) + out[13] = '-' + hex.Encode(out[14:18], b[6:8]) + out[18] = '-' + hex.Encode(out[19:23], b[8:10]) + out[23] = '-' + hex.Encode(out[24:36], b[10:16]) + return string(out[:]) +} + +func sha256Hex(data []byte) string { + sum := sha256.Sum256(data) + return hex.EncodeToString(sum[:]) +} diff --git a/customdiff_test.go b/customdiff_test.go new file mode 100644 index 0000000..0e40bbb --- /dev/null +++ b/customdiff_test.go @@ -0,0 +1,30 @@ +package proxy + +import "testing" + +func TestCustomDiff_OnLogin(t *testing.T) { + cd := NewCustomDiff(10000) + miner := &Miner{user: "WALLET+50000"} + cd.OnLogin(Event{Miner: miner}) + if miner.User() != "WALLET" { + t.Fatalf("expected stripped user, got %q", miner.User()) + } + if miner.customDiff != 50000 { + t.Fatalf("expected custom diff 50000, got %d", miner.customDiff) + } + + miner = &Miner{user: "WALLET+abc"} + cd.OnLogin(Event{Miner: miner}) + if miner.User() != "WALLET+abc" { + t.Fatalf("expected invalid suffix to remain unchanged") + } + if miner.customDiff != 0 { + t.Fatalf("expected custom diff 0 for invalid suffix, got %d", miner.customDiff) + } + + miner = &Miner{user: "WALLET"} + cd.OnLogin(Event{Miner: miner}) + if miner.customDiff != 10000 { + t.Fatalf("expected global diff fallback, got %d", miner.customDiff) + } +} diff --git a/job_test.go b/job_test.go new file mode 100644 index 0000000..6477874 --- /dev/null +++ b/job_test.go @@ -0,0 +1,24 @@ +package proxy + +import ( + "strings" + "testing" +) + +func TestJob_BlobWithFixedByte(t *testing.T) { + job := Job{Blob: strings.Repeat("0", 160)} + got := job.BlobWithFixedByte(0x2A) + if len(got) != 160 { + t.Fatalf("expected length 160, got %d", len(got)) + } + if got[78:80] != "2a" { + t.Fatalf("expected fixed byte patch, got %q", got[78:80]) + } +} + +func TestJob_DifficultyFromTarget(t *testing.T) { + job := Job{Target: "b88d0600"} + if got := job.DifficultyFromTarget(); got == 0 { + t.Fatalf("expected non-zero difficulty") + } +} diff --git a/log/access.go b/log/access.go index b5b0105..2d53a9f 100644 --- a/log/access.go +++ b/log/access.go @@ -5,7 +5,10 @@ // bus.Subscribe(proxy.EventClose, al.OnClose) package log -import "sync" +import ( + "os" + "sync" +) // AccessLog writes connection lifecycle lines to an append-only text file. // @@ -18,6 +21,5 @@ import "sync" type AccessLog struct { path string mu sync.Mutex - // f is opened append-only on first write; nil until first event. - // Uses core.File for I/O abstraction. + f *os.File } diff --git a/log/impl.go b/log/impl.go new file mode 100644 index 0000000..9c4156a --- /dev/null +++ b/log/impl.go @@ -0,0 +1,140 @@ +package log + +import ( + "os" + "strconv" + "strings" + "time" + + "dappco.re/go/core/proxy" +) + +// NewAccessLog creates an append-only access log. +func NewAccessLog(path string) *AccessLog { + return &AccessLog{path: path} +} + +// OnLogin writes a CONNECT line. +func (l *AccessLog) OnLogin(e proxy.Event) { + if l == nil || e.Miner == nil { + return + } + l.writeLine("CONNECT", e.Miner.IP(), e.Miner.User(), e.Miner.Agent(), 0, 0) +} + +// OnClose writes a CLOSE line with byte counts. +func (l *AccessLog) OnClose(e proxy.Event) { + if l == nil || e.Miner == nil { + return + } + l.writeLine("CLOSE", e.Miner.IP(), e.Miner.User(), "", e.Miner.RX(), e.Miner.TX()) +} + +// NewShareLog creates an append-only share log. +func NewShareLog(path string) *ShareLog { + return &ShareLog{path: path} +} + +// OnAccept writes an ACCEPT line. +func (l *ShareLog) OnAccept(e proxy.Event) { + if l == nil || e.Miner == nil { + return + } + l.writeAcceptLine(e.Miner.User(), e.Diff, uint64(e.Latency)) +} + +// OnReject writes a REJECT line. +func (l *ShareLog) OnReject(e proxy.Event) { + if l == nil || e.Miner == nil { + return + } + l.writeRejectLine(e.Miner.User(), e.Error) +} + +func (l *AccessLog) writeLine(kind, ip, user, agent string, rx, tx uint64) { + l.mu.Lock() + defer l.mu.Unlock() + if err := l.ensureFile(); err != nil { + return + } + var builder strings.Builder + builder.WriteString(time.Now().UTC().Format(time.RFC3339)) + builder.WriteByte(' ') + builder.WriteString(kind) + builder.WriteString(" ") + builder.WriteString(ip) + builder.WriteString(" ") + builder.WriteString(user) + if agent != "" { + builder.WriteString(" ") + builder.WriteString(agent) + } + if rx > 0 || tx > 0 { + builder.WriteString(" rx=") + builder.WriteString(strconv.FormatUint(rx, 10)) + builder.WriteString(" tx=") + builder.WriteString(strconv.FormatUint(tx, 10)) + } + builder.WriteByte('\n') + _, _ = l.f.WriteString(builder.String()) +} + +func (l *ShareLog) writeAcceptLine(user string, diff uint64, latency uint64) { + l.mu.Lock() + defer l.mu.Unlock() + if err := l.ensureFile(); err != nil { + return + } + var builder strings.Builder + builder.WriteString(time.Now().UTC().Format(time.RFC3339)) + builder.WriteString(" ACCEPT") + builder.WriteString(" ") + builder.WriteString(user) + builder.WriteString(" diff=") + builder.WriteString(strconv.FormatUint(diff, 10)) + builder.WriteString(" latency=") + builder.WriteString(strconv.FormatUint(latency, 10)) + builder.WriteString("ms") + builder.WriteByte('\n') + _, _ = l.f.WriteString(builder.String()) +} + +func (l *ShareLog) writeRejectLine(user, reason string) { + l.mu.Lock() + defer l.mu.Unlock() + if err := l.ensureFile(); err != nil { + return + } + var builder strings.Builder + builder.WriteString(time.Now().UTC().Format(time.RFC3339)) + builder.WriteString(" REJECT ") + builder.WriteString(user) + builder.WriteString(" reason=\"") + builder.WriteString(reason) + builder.WriteString("\"\n") + _, _ = l.f.WriteString(builder.String()) +} + +func (l *AccessLog) ensureFile() error { + if l.f != nil { + return nil + } + f, err := os.OpenFile(l.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + return err + } + l.f = f + return nil +} + +func (l *ShareLog) ensureFile() error { + if l.f != nil { + return nil + } + f, err := os.OpenFile(l.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + return err + } + l.f = f + return nil +} diff --git a/log/share.go b/log/share.go index cec2b74..ef10284 100644 --- a/log/share.go +++ b/log/share.go @@ -1,6 +1,9 @@ package log -import "sync" +import ( + "os" + "sync" +) // ShareLog writes share result lines to an append-only text file. // @@ -13,6 +16,5 @@ import "sync" type ShareLog struct { path string mu sync.Mutex - // f is opened append-only on first write; nil until first event. - // Uses core.File for I/O abstraction. + f *os.File } diff --git a/miner.go b/miner.go index 619ed74..a5b9b53 100644 --- a/miner.go +++ b/miner.go @@ -25,28 +25,35 @@ const ( // m := proxy.NewMiner(conn, 3333, nil) // m.Start() type Miner struct { - id int64 // monotonically increasing per-process; atomic assignment - rpcID string // UUID v4 sent to miner as session id + id int64 // monotonically increasing per-process; atomic assignment + rpcID string // UUID v4 sent to miner as session id state MinerState - extAlgo bool // miner sent algo list in login params - extNH bool // NiceHash mode active (fixed byte splitting) - ip string // remote IP (without port, for logging) + extAlgo bool // miner sent algo list in login params + extNH bool // NiceHash mode active (fixed byte splitting) + ip string // remote IP (without port, for logging) localPort uint16 - user string // login params.login (wallet address), custom diff suffix stripped - password string // login params.pass - agent string // login params.agent - rigID string // login params.rigid (optional extension) - fixedByte uint8 // NiceHash slot index (0-255) - mapperID int64 // which NonceMapper owns this miner; -1 = unassigned - routeID int64 // SimpleMapper ID in simple mode; -1 = unassigned - customDiff uint64 // 0 = use pool diff; non-zero = cap diff to this value - diff uint64 // last difficulty sent to this miner from the pool - rx uint64 // bytes received from miner - tx uint64 // bytes sent to miner + user string // login params.login (wallet address), custom diff suffix stripped + password string // login params.pass + agent string // login params.agent + rigID string // login params.rigid (optional extension) + fixedByte uint8 // NiceHash slot index (0-255) + mapperID int64 // which NonceMapper owns this miner; -1 = unassigned + routeID int64 // SimpleMapper ID in simple mode; -1 = unassigned + customDiff uint64 // 0 = use pool diff; non-zero = cap diff to this value + accessPassword string + globalDiff uint64 + diff uint64 // last difficulty sent to this miner from the pool + rx uint64 // bytes received from miner + tx uint64 // bytes sent from miner + currentJob Job connectedAt time.Time lastActivityAt time.Time conn net.Conn - tlsConn *tls.Conn // nil if plain TCP - sendMu sync.Mutex // serialises writes to conn + tlsConn *tls.Conn // nil if plain TCP + sendMu sync.Mutex // serialises writes to conn buf [16384]byte // per-miner send buffer; avoids per-write allocations + onLogin func(*Miner) + onSubmit func(*Miner, *SubmitEvent) + onClose func(*Miner) + closeOnce sync.Once } diff --git a/pool/client.go b/pool/client.go index 8b60f8e..24c4a69 100644 --- a/pool/client.go +++ b/pool/client.go @@ -19,14 +19,17 @@ import ( // client := pool.NewStratumClient(poolCfg, listener) // client.Connect() type StratumClient struct { - cfg proxy.PoolConfig - listener StratumListener - conn net.Conn - tlsConn *tls.Conn // nil if plain TCP - sessionID string // pool-assigned session id from login reply - seq int64 // atomic JSON-RPC request id counter - active bool // true once first job received - sendMu sync.Mutex + cfg proxy.PoolConfig + listener StratumListener + conn net.Conn + tlsConn *tls.Conn // nil if plain TCP + sessionID string // pool-assigned session id from login reply + seq int64 // atomic JSON-RPC request id counter + active bool // true once first job received + pending map[int64]struct{} + closedOnce sync.Once + mu sync.Mutex + sendMu sync.Mutex } // StratumListener receives events from the pool connection. diff --git a/pool/impl.go b/pool/impl.go new file mode 100644 index 0000000..9251145 --- /dev/null +++ b/pool/impl.go @@ -0,0 +1,446 @@ +package pool + +import ( + "bufio" + "crypto/sha256" + "crypto/tls" + "encoding/hex" + "encoding/json" + "errors" + "io" + "net" + "strconv" + "strings" + "sync/atomic" + "time" + + "dappco.re/go/core/proxy" +) + +// NewStrategyFactory creates a StrategyFactory for the supplied config. +func NewStrategyFactory(cfg *proxy.Config) StrategyFactory { + return func(listener StratumListener) Strategy { + return NewFailoverStrategy(cfg.Pools, listener, cfg) + } +} + +// NewStratumClient constructs a pool client. +func NewStratumClient(cfg proxy.PoolConfig, listener StratumListener) *StratumClient { + return &StratumClient{ + cfg: cfg, + listener: listener, + pending: make(map[int64]struct{}), + } +} + +// IsActive reports whether the client has received at least one job. +func (c *StratumClient) IsActive() bool { + if c == nil { + return false + } + c.mu.Lock() + defer c.mu.Unlock() + return c.active +} + +// Connect dials the pool. +func (c *StratumClient) Connect() proxy.Result { + if c == nil { + return proxy.Result{OK: false, Error: errors.New("client is nil")} + } + addr := c.cfg.URL + if addr == "" { + return proxy.Result{OK: false, Error: errors.New("pool url is empty")} + } + conn, err := net.Dial("tcp", addr) + if err != nil { + return proxy.Result{OK: false, Error: err} + } + if c.cfg.TLS { + host := addr + if strings.Contains(addr, ":") { + host, _, _ = net.SplitHostPort(addr) + } + tlsCfg := &tls.Config{InsecureSkipVerify: true, ServerName: host} + tlsConn := tls.Client(conn, tlsCfg) + if err := tlsConn.Handshake(); err != nil { + _ = conn.Close() + return proxy.Result{OK: false, Error: err} + } + if fp := strings.TrimSpace(strings.ToLower(c.cfg.TLSFingerprint)); fp != "" { + cert := tlsConn.ConnectionState().PeerCertificates + if len(cert) == 0 { + _ = tlsConn.Close() + return proxy.Result{OK: false, Error: errors.New("missing certificate")} + } + sum := sha256.Sum256(cert[0].Raw) + if hex.EncodeToString(sum[:]) != fp { + _ = tlsConn.Close() + return proxy.Result{OK: false, Error: errors.New("tls fingerprint mismatch")} + } + } + c.conn = tlsConn + c.tlsConn = tlsConn + } else { + c.conn = conn + } + go c.readLoop() + return proxy.Result{OK: true} +} + +// Login sends the miner-style login request to the pool. +func (c *StratumClient) Login() { + if c == nil || c.conn == nil { + return + } + params := map[string]any{ + "login": c.cfg.User, + "pass": c.cfg.Pass, + } + if c.cfg.RigID != "" { + params["rigid"] = c.cfg.RigID + } + if c.cfg.Algo != "" { + params["algo"] = []string{c.cfg.Algo} + } + req := map[string]any{ + "id": 1, + "jsonrpc": "2.0", + "method": "login", + "params": params, + } + _ = c.writeJSON(req) +} + +// Submit forwards a share to the pool. +func (c *StratumClient) Submit(jobID, nonce, result, algo string) int64 { + if c == nil { + return 0 + } + seq := atomic.AddInt64(&c.seq, 1) + c.mu.Lock() + c.pending[seq] = struct{}{} + sessionID := c.sessionID + c.mu.Unlock() + req := map[string]any{ + "id": seq, + "jsonrpc": "2.0", + "method": "submit", + "params": map[string]any{ + "id": sessionID, + "job_id": jobID, + "nonce": nonce, + "result": result, + "algo": algo, + }, + } + _ = c.writeJSON(req) + return seq +} + +// Disconnect closes the connection and notifies the listener. +func (c *StratumClient) Disconnect() { + if c == nil { + return + } + c.closedOnce.Do(func() { + if c.conn != nil { + _ = c.conn.Close() + } + if c.listener != nil { + c.listener.OnDisconnect() + } + }) +} + +func (c *StratumClient) notifyDisconnect() { + c.closedOnce.Do(func() { + if c.listener != nil { + c.listener.OnDisconnect() + } + }) +} + +func (c *StratumClient) writeJSON(payload any) error { + c.sendMu.Lock() + defer c.sendMu.Unlock() + if c.conn == nil { + return errors.New("connection is nil") + } + data, err := json.Marshal(payload) + if err != nil { + return err + } + data = append(data, '\n') + _, err = c.conn.Write(data) + if err != nil { + c.notifyDisconnect() + } + return err +} + +func (c *StratumClient) readLoop() { + defer c.notifyDisconnect() + reader := bufio.NewReader(c.conn) + for { + line, isPrefix, err := reader.ReadLine() + if err != nil { + if err == io.EOF { + return + } + return + } + if isPrefix { + return + } + if len(line) == 0 { + continue + } + c.handleMessage(line) + } +} + +func (c *StratumClient) handleMessage(line []byte) { + var base struct { + ID any `json:"id"` + Method string `json:"method"` + Result json.RawMessage `json:"result"` + Error json.RawMessage `json:"error"` + Params json.RawMessage `json:"params"` + } + if err := json.Unmarshal(line, &base); err != nil { + return + } + + if len(base.Result) > 0 { + var loginReply struct { + ID string `json:"id"` + Job *struct { + Blob string `json:"blob"` + JobID string `json:"job_id"` + Target string `json:"target"` + Algo string `json:"algo"` + Height uint64 `json:"height"` + SeedHash string `json:"seed_hash"` + ID string `json:"id"` + } `json:"job"` + } + if err := json.Unmarshal(base.Result, &loginReply); err == nil { + if loginReply.ID != "" { + c.mu.Lock() + c.sessionID = loginReply.ID + c.mu.Unlock() + } + if loginReply.Job != nil && loginReply.Job.JobID != "" { + c.mu.Lock() + c.active = true + c.mu.Unlock() + if c.listener != nil { + c.listener.OnJob(proxy.Job{ + Blob: loginReply.Job.Blob, + JobID: loginReply.Job.JobID, + Target: loginReply.Job.Target, + Algo: loginReply.Job.Algo, + Height: loginReply.Job.Height, + SeedHash: loginReply.Job.SeedHash, + ClientID: loginReply.Job.ID, + }) + } + return + } + } + } + + if base.Method == "job" { + var params struct { + Blob string `json:"blob"` + JobID string `json:"job_id"` + Target string `json:"target"` + Algo string `json:"algo"` + Height uint64 `json:"height"` + SeedHash string `json:"seed_hash"` + ID string `json:"id"` + } + if err := json.Unmarshal(base.Params, ¶ms); err != nil { + return + } + c.mu.Lock() + c.active = true + c.mu.Unlock() + if c.listener != nil { + c.listener.OnJob(proxy.Job{ + Blob: params.Blob, + JobID: params.JobID, + Target: params.Target, + Algo: params.Algo, + Height: params.Height, + SeedHash: params.SeedHash, + ClientID: params.ID, + }) + } + return + } + + seq := requestID(base.ID) + if seq == 0 { + return + } + c.mu.Lock() + _, ok := c.pending[seq] + if ok { + delete(c.pending, seq) + } + c.mu.Unlock() + if !ok { + return + } + + var payload struct { + Status string `json:"status"` + } + if len(base.Result) > 0 { + _ = json.Unmarshal(base.Result, &payload) + } + accepted := len(base.Error) == 0 + if payload.Status != "" && strings.EqualFold(payload.Status, "OK") { + accepted = true + } + errorMessage := "" + if !accepted && len(base.Error) > 0 { + var errPayload struct { + Message string `json:"message"` + } + _ = json.Unmarshal(base.Error, &errPayload) + errorMessage = errPayload.Message + } + if c.listener != nil { + c.listener.OnResultAccepted(seq, accepted, errorMessage) + } +} + +// NewFailoverStrategy creates the ordered pool failover wrapper. +func NewFailoverStrategy(pools []proxy.PoolConfig, listener StratumListener, cfg *proxy.Config) *FailoverStrategy { + return &FailoverStrategy{ + pools: pools, + listener: listener, + cfg: cfg, + } +} + +// Connect establishes the first reachable pool connection. +func (s *FailoverStrategy) Connect() { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + s.connectLocked(0) +} + +func (s *FailoverStrategy) connectLocked(start int) { + enabled := enabledPools(s.pools) + if len(enabled) == 0 { + return + } + retries := 1 + retryPause := time.Second + if s.cfg != nil { + if s.cfg.Retries > 0 { + retries = s.cfg.Retries + } + if s.cfg.RetryPause > 0 { + retryPause = time.Duration(s.cfg.RetryPause) * time.Second + } + } + for attempt := 0; attempt < retries; attempt++ { + for i := 0; i < len(enabled); i++ { + index := (start + i) % len(enabled) + poolCfg := enabled[index] + client := NewStratumClient(poolCfg, s) + if result := client.Connect(); result.OK { + s.client = client + s.current = index + client.Login() + return + } + } + time.Sleep(retryPause) + } +} + +// Submit sends the share through the active client. +func (s *FailoverStrategy) Submit(jobID, nonce, result, algo string) int64 { + if s == nil || s.client == nil { + return 0 + } + return s.client.Submit(jobID, nonce, result, algo) +} + +// Disconnect closes the active client. +func (s *FailoverStrategy) Disconnect() { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + if s.client != nil { + s.client.Disconnect() + s.client = nil + } +} + +// IsActive reports whether the current client has received a job. +func (s *FailoverStrategy) IsActive() bool { + return s != nil && s.client != nil && s.client.IsActive() +} + +// OnJob forwards the pool job to the outer listener. +func (s *FailoverStrategy) OnJob(job proxy.Job) { + if s != nil && s.listener != nil { + s.listener.OnJob(job) + } +} + +// OnResultAccepted forwards the result status to the outer listener. +func (s *FailoverStrategy) OnResultAccepted(sequence int64, accepted bool, errorMessage string) { + if s != nil && s.listener != nil { + s.listener.OnResultAccepted(sequence, accepted, errorMessage) + } +} + +// OnDisconnect retries from the primary pool and forwards the disconnect. +func (s *FailoverStrategy) OnDisconnect() { + if s == nil { + return + } + if s.listener != nil { + s.listener.OnDisconnect() + } + go s.Connect() +} + +func enabledPools(pools []proxy.PoolConfig) []proxy.PoolConfig { + out := make([]proxy.PoolConfig, 0, len(pools)) + for _, poolCfg := range pools { + if poolCfg.Enabled { + out = append(out, poolCfg) + } + } + return out +} + +func requestID(id any) int64 { + switch v := id.(type) { + case float64: + return int64(v) + case int64: + return v + case int: + return int64(v) + case string: + n, _ := strconv.ParseInt(v, 10, 64) + return n + default: + return 0 + } +} diff --git a/proxy.go b/proxy.go index e0aa703..72d9d15 100644 --- a/proxy.go +++ b/proxy.go @@ -11,6 +11,7 @@ package proxy import ( + "net/http" "sync" "time" ) @@ -21,15 +22,21 @@ import ( // p, result := proxy.New(cfg) // if result.OK { p.Start() } type Proxy struct { - config *Config - splitter Splitter - stats *Stats - workers *Workers - events *EventBus - servers []*Server - ticker *time.Ticker - watcher *ConfigWatcher - done chan struct{} + config *Config + splitter Splitter + stats *Stats + workers *Workers + events *EventBus + servers []*Server + ticker *time.Ticker + watcher *ConfigWatcher + done chan struct{} + stopOnce sync.Once + minersMu sync.RWMutex + miners map[int64]*Miner + customDiff *CustomDiff + rateLimit *RateLimiter + httpServer *http.Server } // Splitter is the interface both NonceSplitter and SimpleSplitter satisfy. diff --git a/ratelimit_test.go b/ratelimit_test.go new file mode 100644 index 0000000..b310dd7 --- /dev/null +++ b/ratelimit_test.go @@ -0,0 +1,13 @@ +package proxy + +import "testing" + +func TestRateLimiter_Allow(t *testing.T) { + rl := NewRateLimiter(RateLimit{MaxConnectionsPerMinute: 1, BanDurationSeconds: 1}) + if !rl.Allow("1.2.3.4:1234") { + t.Fatalf("expected first call to pass") + } + if rl.Allow("1.2.3.4:1234") { + t.Fatalf("expected second call to fail") + } +} diff --git a/splitter/nicehash/impl.go b/splitter/nicehash/impl.go new file mode 100644 index 0000000..52f2b82 --- /dev/null +++ b/splitter/nicehash/impl.go @@ -0,0 +1,383 @@ +package nicehash + +import ( + "time" + + "dappco.re/go/core/proxy" + "dappco.re/go/core/proxy/pool" +) + +func init() { + proxy.RegisterSplitterFactory("nicehash", func(cfg *proxy.Config, events *proxy.EventBus) proxy.Splitter { + return NewNonceSplitter(cfg, events, pool.NewStrategyFactory(cfg)) + }) +} + +// NewNonceSplitter creates a NiceHash splitter. +func NewNonceSplitter(cfg *proxy.Config, events *proxy.EventBus, factory pool.StrategyFactory) *NonceSplitter { + if factory == nil { + factory = pool.NewStrategyFactory(cfg) + } + return &NonceSplitter{ + byID: make(map[int64]*NonceMapper), + cfg: cfg, + events: events, + strategyFactory: factory, + } +} + +// Connect establishes the first mapper. +func (s *NonceSplitter) Connect() { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + if len(s.mappers) == 0 { + s.addMapperLocked() + } + for _, mapper := range s.mappers { + if mapper.strategy != nil { + mapper.strategy.Connect() + return + } + } +} + +// OnLogin assigns the miner to a mapper with a free slot. +func (s *NonceSplitter) OnLogin(event *proxy.LoginEvent) { + if s == nil || event == nil || event.Miner == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + event.Miner.SetExtendedNiceHash(true) + for _, mapper := range s.mappers { + if mapper.Add(event.Miner) { + s.byID[mapper.id] = mapper + return + } + } + mapper := s.addMapperLocked() + if mapper != nil { + _ = mapper.Add(event.Miner) + s.byID[mapper.id] = mapper + } +} + +// OnSubmit forwards a share to the owning mapper. +func (s *NonceSplitter) OnSubmit(event *proxy.SubmitEvent) { + if s == nil || event == nil || event.Miner == nil { + return + } + s.mu.RLock() + mapper := s.byID[event.Miner.MapperID()] + s.mu.RUnlock() + if mapper != nil { + mapper.Submit(event) + } +} + +// OnClose releases the miner slot. +func (s *NonceSplitter) OnClose(event *proxy.CloseEvent) { + if s == nil || event == nil || event.Miner == nil { + return + } + s.mu.RLock() + mapper := s.byID[event.Miner.MapperID()] + s.mu.RUnlock() + if mapper != nil { + mapper.Remove(event.Miner) + } +} + +// GC removes empty mappers that have been idle. +func (s *NonceSplitter) GC() { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + now := time.Now() + next := s.mappers[:0] + for _, mapper := range s.mappers { + free, dead, active := mapper.storage.SlotCount() + if active == 0 && dead == 0 && now.Sub(mapper.lastUsed) > time.Minute { + if mapper.strategy != nil { + mapper.strategy.Disconnect() + } + delete(s.byID, mapper.id) + _ = free + continue + } + next = append(next, mapper) + } + s.mappers = next +} + +// Tick is called once per second. +func (s *NonceSplitter) Tick(ticks uint64) {} + +// Upstreams returns pool connection counts. +func (s *NonceSplitter) Upstreams() proxy.UpstreamStats { + if s == nil { + return proxy.UpstreamStats{} + } + s.mu.RLock() + defer s.mu.RUnlock() + var stats proxy.UpstreamStats + for _, mapper := range s.mappers { + if mapper.strategy != nil && mapper.strategy.IsActive() { + stats.Active++ + } else if mapper.suspended > 0 { + stats.Error++ + } + } + stats.Total = uint64(len(s.mappers)) + return stats +} + +func (s *NonceSplitter) addMapperLocked() *NonceMapper { + id := s.seq + s.seq++ + mapper := NewNonceMapper(id, s.cfg, nil) + mapper.events = s.events + mapper.lastUsed = time.Now() + mapper.strategy = s.strategyFactory(mapper) + s.mappers = append(s.mappers, mapper) + if s.byID == nil { + s.byID = make(map[int64]*NonceMapper) + } + s.byID[mapper.id] = mapper + return mapper +} + +// NewNonceMapper creates a mapper for one upstream connection. +func NewNonceMapper(id int64, cfg *proxy.Config, strategy pool.Strategy) *NonceMapper { + return &NonceMapper{ + id: id, + storage: NewNonceStorage(), + strategy: strategy, + pending: make(map[int64]SubmitContext), + cfg: cfg, + } +} + +// Add assigns a miner to a free slot. +func (m *NonceMapper) Add(miner *proxy.Miner) bool { + if m == nil || miner == nil { + return false + } + m.mu.Lock() + defer m.mu.Unlock() + ok := m.storage.Add(miner) + if ok { + miner.SetMapperID(m.id) + miner.SetExtendedNiceHash(true) + m.lastUsed = time.Now() + m.storage.mu.Lock() + job := m.storage.job + m.storage.mu.Unlock() + if job.IsValid() { + miner.ForwardJob(job, job.Algo) + } + } + return ok +} + +// Remove marks the miner slot as dead. +func (m *NonceMapper) Remove(miner *proxy.Miner) { + if m == nil || miner == nil { + return + } + m.mu.Lock() + defer m.mu.Unlock() + m.storage.Remove(miner) + miner.SetMapperID(-1) + m.lastUsed = time.Now() +} + +// Submit forwards the share to the pool. +func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { + if m == nil || event == nil || event.Miner == nil || m.strategy == nil { + return + } + m.mu.Lock() + defer m.mu.Unlock() + jobID := event.JobID + m.storage.mu.Lock() + job := m.storage.job + prevJob := m.storage.prevJob + m.storage.mu.Unlock() + if jobID == "" { + jobID = job.JobID + } + if jobID == "" || (jobID != job.JobID && jobID != prevJob.JobID) { + return + } + seq := m.strategy.Submit(jobID, event.Nonce, event.Result, event.Algo) + m.pending[seq] = SubmitContext{RequestID: event.RequestID, MinerID: event.Miner.ID(), JobID: jobID} + m.lastUsed = time.Now() +} + +// IsActive reports whether the mapper has received a valid job. +func (m *NonceMapper) IsActive() bool { + if m == nil { + return false + } + m.mu.Lock() + defer m.mu.Unlock() + return m.active +} + +// OnJob stores the current pool job and broadcasts it to active miners. +func (m *NonceMapper) OnJob(job proxy.Job) { + if m == nil || !job.IsValid() { + return + } + m.mu.Lock() + defer m.mu.Unlock() + m.storage.SetJob(job) + m.active = true + m.suspended = 0 + m.lastUsed = time.Now() +} + +// OnResultAccepted correlates a pool result back to the originating miner. +func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) { + if m == nil { + return + } + m.mu.Lock() + ctx, ok := m.pending[sequence] + if ok { + delete(m.pending, sequence) + } + m.storage.mu.Lock() + miner := m.storage.miners[ctx.MinerID] + job := m.storage.job + prevJob := m.storage.prevJob + m.storage.mu.Unlock() + expired := ctx.JobID != "" && ctx.JobID == prevJob.JobID && ctx.JobID != job.JobID + m.mu.Unlock() + if !ok || miner == nil { + return + } + if accepted { + miner.Success(ctx.RequestID, "OK") + if m.events != nil { + m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: &job, Diff: job.DifficultyFromTarget(), Latency: 0, Expired: expired}) + } + return + } + miner.ReplyWithError(ctx.RequestID, errorMessage) + if m.events != nil { + m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: &job, Diff: job.DifficultyFromTarget(), Error: errorMessage}) + } +} + +func (m *NonceMapper) OnDisconnect() { + if m == nil { + return + } + m.mu.Lock() + defer m.mu.Unlock() + m.active = false + m.suspended++ +} + +// NewNonceStorage creates an empty slot table. +func NewNonceStorage() *NonceStorage { + return &NonceStorage{miners: make(map[int64]*proxy.Miner)} +} + +// Add finds the next free slot. +func (s *NonceStorage) Add(miner *proxy.Miner) bool { + if s == nil || miner == nil { + return false + } + s.mu.Lock() + defer s.mu.Unlock() + for i := 0; i < 256; i++ { + index := (s.cursor + i) % 256 + if s.slots[index] != 0 { + continue + } + s.slots[index] = miner.ID() + s.miners[miner.ID()] = miner + miner.SetFixedByte(uint8(index)) + s.cursor = (index + 1) % 256 + return true + } + return false +} + +// Remove marks a slot as dead. +func (s *NonceStorage) Remove(miner *proxy.Miner) { + if s == nil || miner == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + index := int(miner.FixedByte()) + if index >= 0 && index < len(s.slots) && s.slots[index] == miner.ID() { + s.slots[index] = -miner.ID() + } + delete(s.miners, miner.ID()) +} + +// SetJob replaces the current job and sends it to active miners. +func (s *NonceStorage) SetJob(job proxy.Job) { + if s == nil || !job.IsValid() { + return + } + s.mu.Lock() + s.prevJob = s.job + if s.prevJob.ClientID != job.ClientID { + s.prevJob = proxy.Job{} + } + s.job = job + for i := range s.slots { + if s.slots[i] < 0 { + s.slots[i] = 0 + } + } + miners := make([]*proxy.Miner, 0, len(s.miners)) + for _, miner := range s.miners { + miners = append(miners, miner) + } + s.mu.Unlock() + for _, miner := range miners { + miner.ForwardJob(job, job.Algo) + } +} + +// IsValidJobID returns true if the id matches the current or previous job. +func (s *NonceStorage) IsValidJobID(id string) bool { + if s == nil { + return false + } + s.mu.Lock() + defer s.mu.Unlock() + return id != "" && (id == s.job.JobID || id == s.prevJob.JobID) +} + +// SlotCount returns free, dead, and active counts. +func (s *NonceStorage) SlotCount() (free, dead, active int) { + if s == nil { + return 0, 0, 0 + } + s.mu.Lock() + defer s.mu.Unlock() + for _, slot := range s.slots { + switch { + case slot == 0: + free++ + case slot < 0: + dead++ + default: + active++ + } + } + return +} diff --git a/splitter/nicehash/mapper.go b/splitter/nicehash/mapper.go index 6f4b408..b7cf918 100644 --- a/splitter/nicehash/mapper.go +++ b/splitter/nicehash/mapper.go @@ -2,6 +2,7 @@ package nicehash import ( "sync" + "time" "dappco.re/go/core/proxy" "dappco.re/go/core/proxy/pool" @@ -15,11 +16,13 @@ import ( type NonceMapper struct { id int64 storage *NonceStorage - strategy pool.Strategy // manages pool client lifecycle and failover + strategy pool.Strategy // manages pool client lifecycle and failover pending map[int64]SubmitContext // sequence → {requestID, minerID} cfg *proxy.Config + events *proxy.EventBus active bool // true once pool has sent at least one job suspended int // > 0 when pool connection is in error/reconnecting + lastUsed time.Time mu sync.Mutex } @@ -29,4 +32,5 @@ type NonceMapper struct { type SubmitContext struct { RequestID int64 // JSON-RPC id from the miner's submit request MinerID int64 // miner that submitted + JobID string } diff --git a/splitter/nicehash/splitter.go b/splitter/nicehash/splitter.go index 0d77522..21ad011 100644 --- a/splitter/nicehash/splitter.go +++ b/splitter/nicehash/splitter.go @@ -23,8 +23,10 @@ import ( // s.Connect() type NonceSplitter struct { mappers []*NonceMapper + byID map[int64]*NonceMapper cfg *proxy.Config events *proxy.EventBus strategyFactory pool.StrategyFactory mu sync.RWMutex + seq int64 } diff --git a/splitter/nicehash/storage_test.go b/splitter/nicehash/storage_test.go new file mode 100644 index 0000000..3cb56f8 --- /dev/null +++ b/splitter/nicehash/storage_test.go @@ -0,0 +1,26 @@ +package nicehash + +import ( + "testing" + + "dappco.re/go/core/proxy" +) + +func TestNonceStorage_AddAndRemove(t *testing.T) { + storage := NewNonceStorage() + miner := &proxy.Miner{} + miner.SetID(1) + + if !storage.Add(miner) { + t.Fatalf("expected add to succeed") + } + if miner.FixedByte() != 0 { + t.Fatalf("expected first slot to be 0, got %d", miner.FixedByte()) + } + + storage.Remove(miner) + free, dead, active := storage.SlotCount() + if free != 255 || dead != 1 || active != 0 { + t.Fatalf("unexpected slot counts: free=%d dead=%d active=%d", free, dead, active) + } +} diff --git a/splitter/simple/impl.go b/splitter/simple/impl.go new file mode 100644 index 0000000..cc63b32 --- /dev/null +++ b/splitter/simple/impl.go @@ -0,0 +1,227 @@ +package simple + +import ( + "time" + + "dappco.re/go/core/proxy" + "dappco.re/go/core/proxy/pool" +) + +func init() { + proxy.RegisterSplitterFactory("simple", func(cfg *proxy.Config, events *proxy.EventBus) proxy.Splitter { + return NewSimpleSplitter(cfg, events, pool.NewStrategyFactory(cfg)) + }) +} + +// NewSimpleSplitter creates the passthrough splitter. +func NewSimpleSplitter(cfg *proxy.Config, events *proxy.EventBus, factory pool.StrategyFactory) *SimpleSplitter { + if factory == nil { + factory = pool.NewStrategyFactory(cfg) + } + return &SimpleSplitter{ + active: make(map[int64]*SimpleMapper), + idle: make(map[int64]*SimpleMapper), + cfg: cfg, + events: events, + factory: factory, + } +} + +// Connect establishes any mapper strategies that already exist. +func (s *SimpleSplitter) Connect() { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + for _, mapper := range s.active { + if mapper.strategy != nil { + mapper.strategy.Connect() + } + } + for _, mapper := range s.idle { + if mapper.strategy != nil { + mapper.strategy.Connect() + } + } +} + +// OnLogin creates or reclaims a mapper. +func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) { + if s == nil || event == nil || event.Miner == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + + if s.cfg.ReuseTimeout > 0 { + for id, mapper := range s.idle { + if mapper.strategy != nil && mapper.strategy.IsActive() { + delete(s.idle, id) + mapper.miner = event.Miner + mapper.idleAt = time.Time{} + mapper.stopped = false + s.active[event.Miner.ID()] = mapper + event.Miner.SetRouteID(mapper.id) + return + } + } + } + + mapper := s.newMapperLocked() + mapper.miner = event.Miner + s.active[event.Miner.ID()] = mapper + event.Miner.SetRouteID(mapper.id) + if mapper.strategy != nil { + mapper.strategy.Connect() + } +} + +// OnSubmit forwards the share to the owning mapper. +func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) { + if s == nil || event == nil || event.Miner == nil { + return + } + s.mu.Lock() + mapper := s.active[event.Miner.ID()] + s.mu.Unlock() + if mapper != nil { + mapper.Submit(event) + } +} + +// OnClose moves a mapper to the idle pool or stops it. +func (s *SimpleSplitter) OnClose(event *proxy.CloseEvent) { + if s == nil || event == nil || event.Miner == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + mapper := s.active[event.Miner.ID()] + if mapper == nil { + return + } + delete(s.active, event.Miner.ID()) + mapper.miner = nil + mapper.idleAt = time.Now() + event.Miner.SetRouteID(-1) + if s.cfg.ReuseTimeout > 0 { + s.idle[mapper.id] = mapper + return + } + mapper.stopped = true + if mapper.strategy != nil { + mapper.strategy.Disconnect() + } +} + +// GC removes expired idle mappers. +func (s *SimpleSplitter) GC() { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + now := time.Now() + for id, mapper := range s.idle { + if mapper.stopped || (s.cfg.ReuseTimeout > 0 && now.Sub(mapper.idleAt) > time.Duration(s.cfg.ReuseTimeout)*time.Second) { + if mapper.strategy != nil { + mapper.strategy.Disconnect() + } + delete(s.idle, id) + } + } +} + +// Tick is a no-op for simple mode. +func (s *SimpleSplitter) Tick(ticks uint64) {} + +// Upstreams returns active/idle/error counts. +func (s *SimpleSplitter) Upstreams() proxy.UpstreamStats { + if s == nil { + return proxy.UpstreamStats{} + } + s.mu.Lock() + defer s.mu.Unlock() + var stats proxy.UpstreamStats + stats.Active = uint64(len(s.active)) + stats.Sleep = uint64(len(s.idle)) + stats.Total = stats.Active + stats.Sleep + return stats +} + +func (s *SimpleSplitter) newMapperLocked() *SimpleMapper { + id := s.seq + s.seq++ + mapper := &SimpleMapper{ + id: id, + events: s.events, + pending: make(map[int64]*proxy.SubmitEvent), + } + mapper.strategy = s.factory(mapper) + if mapper.strategy == nil { + mapper.strategy = s.factory(mapper) + } + return mapper +} + +// Submit forwards a share to the pool. +func (m *SimpleMapper) Submit(event *proxy.SubmitEvent) { + if m == nil || event == nil || m.strategy == nil { + return + } + m.mu.Lock() + defer m.mu.Unlock() + seq := m.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo) + m.pending[seq] = event +} + +// OnJob forwards the latest pool job to the active miner. +func (m *SimpleMapper) OnJob(job proxy.Job) { + if m == nil { + return + } + m.mu.Lock() + miner := m.miner + m.mu.Unlock() + if miner == nil { + return + } + miner.ForwardJob(job, job.Algo) +} + +// OnResultAccepted forwards result status to the miner. +func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) { + if m == nil { + return + } + m.mu.Lock() + ctx := m.pending[sequence] + delete(m.pending, sequence) + miner := m.miner + m.mu.Unlock() + if ctx == nil || miner == nil { + return + } + if accepted { + miner.Success(ctx.RequestID, "OK") + if m.events != nil { + job := miner.CurrentJob() + m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Diff: job.DifficultyFromTarget(), Job: &job}) + } + return + } + miner.ReplyWithError(ctx.RequestID, errorMessage) + if m.events != nil { + job := miner.CurrentJob() + m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Diff: job.DifficultyFromTarget(), Job: &job, Error: errorMessage}) + } +} + +// OnDisconnect marks the mapper as disconnected. +func (m *SimpleMapper) OnDisconnect() { + if m == nil { + return + } + m.stopped = true +} diff --git a/splitter/simple/mapper.go b/splitter/simple/mapper.go index 18be7e8..dceb739 100644 --- a/splitter/simple/mapper.go +++ b/splitter/simple/mapper.go @@ -1,6 +1,7 @@ package simple import ( + "sync" "time" "dappco.re/go/core/proxy" @@ -18,4 +19,7 @@ type SimpleMapper struct { strategy pool.Strategy idleAt time.Time // zero when active stopped bool + events *proxy.EventBus + pending map[int64]*proxy.SubmitEvent + mu sync.Mutex } diff --git a/state_impl.go b/state_impl.go new file mode 100644 index 0000000..693cab9 --- /dev/null +++ b/state_impl.go @@ -0,0 +1,1252 @@ +package proxy + +import ( + "bufio" + "context" + "crypto/tls" + "encoding/json" + "errors" + "io" + "net" + "net/http" + "sort" + "strconv" + "strings" + "sync/atomic" + "time" +) + +// MinerSnapshot is a serialisable view of one miner connection. +type MinerSnapshot struct { + ID int64 + IP string + TX uint64 + RX uint64 + State MinerState + Diff uint64 + User string + Password string + RigID string + Agent string +} + +// New creates the proxy and wires the default event handlers. +func New(cfg *Config) (*Proxy, Result) { + if cfg == nil { + return nil, errorResult(errors.New("config is nil")) + } + if cfg.Mode == "" { + cfg.Mode = "nicehash" + } + if result := cfg.Validate(); !result.OK { + return nil, result + } + + p := &Proxy{ + config: cfg, + events: NewEventBus(), + stats: NewStats(), + workers: NewWorkers(cfg.Workers, nil), + miners: make(map[int64]*Miner), + customDiff: NewCustomDiff(cfg.CustomDiff), + rateLimit: NewRateLimiter(cfg.RateLimit), + done: make(chan struct{}), + } + p.workers.bindEvents(p.events) + + p.events.Subscribe(EventLogin, p.customDiff.OnLogin) + p.events.Subscribe(EventLogin, p.stats.OnLogin) + p.events.Subscribe(EventLogin, p.workers.OnLogin) + p.events.Subscribe(EventClose, p.stats.OnClose) + p.events.Subscribe(EventClose, p.workers.OnClose) + p.events.Subscribe(EventAccept, p.stats.OnAccept) + p.events.Subscribe(EventAccept, p.workers.OnAccept) + p.events.Subscribe(EventReject, p.stats.OnReject) + p.events.Subscribe(EventReject, p.workers.OnReject) + + if factory, ok := getSplitterFactory(cfg.Mode); ok { + p.splitter = factory(cfg, p.events) + } else { + p.splitter = &noopSplitter{} + } + + return p, successResult() +} + +// Mode returns the active proxy mode. +func (p *Proxy) Mode() string { + if p == nil || p.config == nil { + return "" + } + return p.config.Mode +} + +// WorkersMode returns the worker naming strategy. +func (p *Proxy) WorkersMode() WorkersMode { + if p == nil || p.config == nil { + return WorkersDisabled + } + return p.config.Workers +} + +// Summary returns the current global stats snapshot. +func (p *Proxy) Summary() StatsSummary { + if p == nil || p.stats == nil { + return StatsSummary{} + } + return p.stats.Summary() +} + +// WorkerRecords returns a stable snapshot of worker rows. +func (p *Proxy) WorkerRecords() []WorkerRecord { + if p == nil || p.workers == nil { + return nil + } + return p.workers.List() +} + +// MinerSnapshots returns a stable snapshot of connected miners. +func (p *Proxy) MinerSnapshots() []MinerSnapshot { + if p == nil { + return nil + } + p.minersMu.RLock() + defer p.minersMu.RUnlock() + rows := make([]MinerSnapshot, 0, len(p.miners)) + for _, miner := range p.miners { + rows = append(rows, MinerSnapshot{ + ID: miner.id, + IP: miner.ip, + TX: miner.tx, + RX: miner.rx, + State: miner.state, + Diff: miner.customDiff, + User: miner.user, + Password: "********", + RigID: miner.rigID, + Agent: miner.agent, + }) + } + sort.Slice(rows, func(i, j int) bool { return rows[i].ID < rows[j].ID }) + return rows +} + +// MinerCount returns current and peak connected miner counts. +func (p *Proxy) MinerCount() (now, max uint64) { + if p == nil || p.stats == nil { + return 0, 0 + } + return p.stats.miners.Load(), p.stats.maxMiners.Load() +} + +// Upstreams returns splitter upstream counts. +func (p *Proxy) Upstreams() UpstreamStats { + if p == nil || p.splitter == nil { + return UpstreamStats{} + } + return p.splitter.Upstreams() +} + +// Start starts the TCP listeners, ticker loop, and optional HTTP API. +func (p *Proxy) Start() { + if p == nil { + return + } + for _, bind := range p.config.Bind { + var tlsCfg *tls.Config + if bind.TLS && p.config.TLS.Enabled { + tlsCfg = buildTLSConfig(p.config.TLS) + } + server, _ := NewServer(bind, tlsCfg, p.rateLimit, p.acceptMiner) + p.servers = append(p.servers, server) + server.Start() + } + if p.splitter != nil { + p.splitter.Connect() + } + if p.config.HTTP.Enabled { + p.startHTTP() + } + p.ticker = time.NewTicker(time.Second) + go func() { + var ticks uint64 + for { + select { + case <-p.ticker.C: + ticks++ + if p.stats != nil { + p.stats.Tick() + } + if p.workers != nil { + p.workers.Tick() + } + if p.rateLimit != nil { + p.rateLimit.Tick() + } + if p.splitter != nil { + p.splitter.Tick(ticks) + if ticks%60 == 0 { + p.splitter.GC() + } + } + case <-p.done: + return + } + } + }() + <-p.done + p.Stop() +} + +// Stop shuts down listeners, background tasks, and HTTP. +func (p *Proxy) Stop() { + if p == nil { + return + } + p.stopOnce.Do(func() { + close(p.done) + if p.ticker != nil { + p.ticker.Stop() + } + for _, server := range p.servers { + server.Stop() + } + if p.watcher != nil { + p.watcher.Stop() + } + if p.httpServer != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = p.httpServer.Shutdown(ctx) + } + }) +} + +// Reload swaps the live configuration and updates dependent state. +func (p *Proxy) Reload(cfg *Config) { + if p == nil || cfg == nil { + return + } + p.config = cfg + if p.customDiff != nil { + p.customDiff.globalDiff = cfg.CustomDiff + } + p.rateLimit = NewRateLimiter(cfg.RateLimit) +} + +func (p *Proxy) acceptMiner(conn net.Conn, localPort uint16) { + if p == nil { + _ = conn.Close() + return + } + if p.stats != nil { + p.stats.connections.Add(1) + } + miner := NewMiner(conn, localPort, nil) + miner.accessPassword = p.config.AccessPassword + miner.globalDiff = p.config.CustomDiff + miner.extNH = strings.EqualFold(p.config.Mode, "nicehash") + miner.onLogin = func(m *Miner) { + if p.events != nil { + p.events.Dispatch(Event{Type: EventLogin, Miner: m}) + } + if p.splitter != nil { + p.splitter.OnLogin(&LoginEvent{Miner: m}) + } + } + miner.onSubmit = func(m *Miner, event *SubmitEvent) { + if p.splitter != nil { + p.splitter.OnSubmit(event) + } + } + miner.onClose = func(m *Miner) { + if p.events != nil { + p.events.Dispatch(Event{Type: EventClose, Miner: m}) + } + if p.splitter != nil { + p.splitter.OnClose(&CloseEvent{Miner: m}) + } + p.minersMu.Lock() + delete(p.miners, m.id) + p.minersMu.Unlock() + } + p.minersMu.Lock() + p.miners[miner.id] = miner + p.minersMu.Unlock() + miner.Start() +} + +func buildTLSConfig(cfg TLSConfig) *tls.Config { + if !cfg.Enabled || cfg.CertFile == "" || cfg.KeyFile == "" { + return nil + } + cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile) + if err != nil { + return nil + } + return &tls.Config{Certificates: []tls.Certificate{cert}} +} + +func (p *Proxy) startHTTP() { + mux := http.NewServeMux() + mux.HandleFunc("/1/summary", func(w http.ResponseWriter, r *http.Request) { + if !p.allowHTTP(r) { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + p.writeJSON(w, p.summaryDocument()) + }) + mux.HandleFunc("/1/workers", func(w http.ResponseWriter, r *http.Request) { + if !p.allowHTTP(r) { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + p.writeJSON(w, p.workersDocument()) + }) + mux.HandleFunc("/1/miners", func(w http.ResponseWriter, r *http.Request) { + if !p.allowHTTP(r) { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + p.writeJSON(w, p.minersDocument()) + }) + addr := net.JoinHostPort(p.config.HTTP.Host, strconv.Itoa(int(p.config.HTTP.Port))) + p.httpServer = &http.Server{Addr: addr, Handler: mux} + go func() { + _ = p.httpServer.ListenAndServe() + }() +} + +func (p *Proxy) allowHTTP(r *http.Request) bool { + if p == nil { + return false + } + if p.config.HTTP.Restricted && r.Method != http.MethodGet { + return false + } + if token := p.config.HTTP.AccessToken; token != "" { + parts := strings.SplitN(r.Header.Get("Authorization"), " ", 2) + if len(parts) != 2 || !strings.EqualFold(parts[0], "bearer") || parts[1] != token { + return false + } + } + return true +} + +func (p *Proxy) writeJSON(w http.ResponseWriter, payload any) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(payload) +} + +func (p *Proxy) summaryDocument() any { + summary := p.Summary() + now, max := p.MinerCount() + upstreams := p.Upstreams() + workerCount := uint64(len(p.WorkerRecords())) + return map[string]any{ + "version": "1.0.0", + "mode": p.Mode(), + "hashrate": map[string]any{ + "total": summary.Hashrate, + }, + "miners": map[string]any{ + "now": now, + "max": max, + }, + "workers": workerCount, + "upstreams": map[string]any{"active": upstreams.Active, "sleep": upstreams.Sleep, "error": upstreams.Error, "total": upstreams.Total, "ratio": upstreamRatio(now, upstreams)}, + "results": map[string]any{ + "accepted": summary.Accepted, + "rejected": summary.Rejected, + "invalid": summary.Invalid, + "expired": summary.Expired, + "avg_time": summary.AvgTime, + "latency": summary.AvgLatency, + "hashes_total": summary.Hashes, + "best": summary.TopDiff, + }, + } +} + +func (p *Proxy) workersDocument() any { + records := p.WorkerRecords() + rows := make([]any, 0, len(records)) + for _, record := range records { + rows = append(rows, []any{ + record.Name, + record.LastIP, + record.Connections, + record.Accepted, + record.Rejected, + record.Invalid, + record.Hashes, + record.LastHashAt.Unix(), + record.Hashrate(60), + record.Hashrate(600), + record.Hashrate(3600), + record.Hashrate(43200), + record.Hashrate(86400), + }) + } + return map[string]any{ + "mode": string(p.WorkersMode()), + "workers": rows, + } +} + +func (p *Proxy) minersDocument() any { + records := p.MinerSnapshots() + rows := make([]any, 0, len(records)) + for _, miner := range records { + rows = append(rows, []any{ + miner.ID, + miner.IP, + miner.TX, + miner.RX, + miner.State, + miner.Diff, + miner.User, + miner.Password, + miner.RigID, + miner.Agent, + }) + } + return map[string]any{ + "format": []string{"id", "ip", "tx", "rx", "state", "diff", "user", "password", "rig_id", "agent"}, + "miners": rows, + } +} + +func upstreamRatio(now uint64, upstreams UpstreamStats) float64 { + if upstreams.Total == 0 { + return 0 + } + return float64(now) / float64(upstreams.Total) +} + +func NewMiner(conn net.Conn, localPort uint16, tlsCfg *tls.Config) *Miner { + if tlsCfg != nil { + if _, ok := conn.(*tls.Conn); !ok { + conn = tls.Server(conn, tlsCfg) + } + } + miner := &Miner{ + id: nextMinerID(), + state: MinerStateWaitLogin, + localPort: localPort, + mapperID: -1, + routeID: -1, + connectedAt: time.Now().UTC(), + lastActivityAt: time.Now().UTC(), + conn: conn, + } + if tlsConn, ok := conn.(*tls.Conn); ok { + miner.tlsConn = tlsConn + } + if remote := conn.RemoteAddr(); remote != nil { + miner.ip = hostOnly(remote.String()) + } + return miner +} + +func (m *Miner) SetID(id int64) { m.id = id } +func (m *Miner) ID() int64 { return m.id } +func (m *Miner) SetMapperID(id int64) { + m.mapperID = id +} +func (m *Miner) MapperID() int64 { + return m.mapperID +} +func (m *Miner) SetRouteID(id int64) { + m.routeID = id +} +func (m *Miner) RouteID() int64 { + return m.routeID +} +func (m *Miner) SetExtendedNiceHash(enabled bool) { + m.extNH = enabled +} +func (m *Miner) ExtendedNiceHash() bool { + return m.extNH +} +func (m *Miner) SetCurrentJob(job Job) { + m.currentJob = job +} +func (m *Miner) CurrentJob() Job { + return m.currentJob +} +func (m *Miner) FixedByte() uint8 { + return m.fixedByte +} +func (m *Miner) SetFixedByte(value uint8) { + m.fixedByte = value +} +func (m *Miner) IP() string { + return m.ip +} +func (m *Miner) User() string { + return m.user +} +func (m *Miner) Password() string { + return m.password +} +func (m *Miner) Agent() string { + return m.agent +} +func (m *Miner) RigID() string { + return m.rigID +} +func (m *Miner) RX() uint64 { + return m.rx +} +func (m *Miner) TX() uint64 { + return m.tx +} +func (m *Miner) State() MinerState { + return m.state +} + +// Start launches the read loop. +func (m *Miner) Start() { + if m == nil { + return + } + go m.readLoop() +} + +func (m *Miner) readLoop() { + defer func() { + if m.onClose != nil { + m.onClose(m) + } + }() + + reader := bufio.NewReader(m.conn) + for { + if m.state == MinerStateClosing { + return + } + if timeout := m.readTimeout(); timeout > 0 { + _ = m.conn.SetReadDeadline(time.Now().Add(timeout)) + } + line, isPrefix, err := reader.ReadLine() + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Timeout() { + return + } + if err != io.EOF { + return + } + return + } + if isPrefix { + m.Close() + return + } + if len(line) == 0 { + continue + } + m.rx += uint64(len(line) + 1) + m.lastActivityAt = time.Now().UTC() + if !m.handleLine(line) { + return + } + } +} + +func (m *Miner) readTimeout() time.Duration { + switch m.state { + case MinerStateWaitLogin: + return 10 * time.Second + case MinerStateWaitReady, MinerStateReady: + return 600 * time.Second + default: + return 0 + } +} + +type stratumRequest struct { + ID any `json:"id"` + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` +} + +func (m *Miner) handleLine(line []byte) bool { + var req stratumRequest + if err := json.Unmarshal(line, &req); err != nil { + return false + } + switch req.Method { + case "login": + m.handleLogin(req) + case "submit": + m.handleSubmit(req) + case "keepalived": + m.handleKeepalived(req) + } + return true +} + +type loginParams struct { + Login string `json:"login"` + Pass string `json:"pass"` + Agent string `json:"agent"` + Algo []string `json:"algo"` + RigID string `json:"rigid"` +} + +func (m *Miner) handleLogin(req stratumRequest) { + if m.state != MinerStateWaitLogin { + return + } + var params loginParams + if err := json.Unmarshal(req.Params, ¶ms); err != nil || strings.TrimSpace(params.Login) == "" { + m.ReplyWithError(requestID(req.ID), "Invalid payment address provided") + return + } + if m.accessPassword != "" && params.Pass != m.accessPassword { + m.ReplyWithError(requestID(req.ID), "Invalid password") + return + } + m.user = params.Login + m.customDiff = 0 + m.password = params.Pass + m.agent = params.Agent + m.rigID = params.RigID + m.extAlgo = len(params.Algo) > 0 + m.rpcID = generateUUID() + m.state = MinerStateWaitReady + if m.onLogin != nil { + m.onLogin(m) + } + m.Success(requestID(req.ID), "OK") +} + +func parseLoginUser(login string, globalDiff uint64) (string, uint64) { + plus := strings.LastIndex(login, "+") + if plus >= 0 && plus < len(login)-1 { + if parsed, err := strconv.ParseUint(login[plus+1:], 10, 64); err == nil { + return login[:plus], parsed + } + return login, 0 + } + if globalDiff > 0 { + return login, globalDiff + } + return login, 0 +} + +func (m *Miner) handleSubmit(req stratumRequest) { + if m.state != MinerStateReady { + m.ReplyWithError(requestID(req.ID), "Unauthenticated") + return + } + var params struct { + ID string `json:"id"` + JobID string `json:"job_id"` + Nonce string `json:"nonce"` + Result string `json:"result"` + Algo string `json:"algo"` + } + if err := json.Unmarshal(req.Params, ¶ms); err != nil { + m.ReplyWithError(requestID(req.ID), "Invalid nonce") + return + } + if params.ID != m.rpcID { + m.ReplyWithError(requestID(req.ID), "Unauthenticated") + return + } + if params.JobID == "" { + m.ReplyWithError(requestID(req.ID), "Missing job id") + return + } + if !isLowerHex8(params.Nonce) { + m.ReplyWithError(requestID(req.ID), "Invalid nonce") + return + } + if m.onSubmit != nil { + m.onSubmit(m, &SubmitEvent{ + Miner: m, + JobID: params.JobID, + Nonce: params.Nonce, + Result: params.Result, + Algo: params.Algo, + RequestID: requestID(req.ID), + }) + } + m.lastActivityAt = time.Now().UTC() +} + +func (m *Miner) handleKeepalived(req stratumRequest) { + m.lastActivityAt = time.Now().UTC() + m.Success(requestID(req.ID), "KEEPALIVED") +} + +func requestID(id any) int64 { + switch v := id.(type) { + case float64: + return int64(v) + case int64: + return v + case int: + return int64(v) + case string: + n, _ := strconv.ParseInt(v, 10, 64) + return n + default: + return 0 + } +} + +func isLowerHex8(value string) bool { + if len(value) != 8 { + return false + } + for _, r := range value { + if (r < '0' || r > '9') && (r < 'a' || r > 'f') { + return false + } + } + return true +} + +func (m *Miner) ForwardJob(job Job, algo string) { + if m == nil || !job.IsValid() { + return + } + m.currentJob = job + if algo == "" { + algo = job.Algo + } + blob := job.Blob + if m.extNH { + blob = job.BlobWithFixedByte(m.fixedByte) + } + payload := map[string]any{ + "jsonrpc": "2.0", + "method": "job", + "params": map[string]any{ + "blob": blob, + "job_id": job.JobID, + "target": job.Target, + "algo": algo, + "id": m.rpcID, + "height": job.Height, + "seed_hash": job.SeedHash, + }, + } + _ = m.writeJSON(payload) + if m.state == MinerStateWaitReady { + m.state = MinerStateReady + } +} + +func (m *Miner) ReplyWithError(id int64, message string) { + if m == nil { + return + } + payload := map[string]any{ + "id": id, + "jsonrpc": "2.0", + "error": map[string]any{ + "code": -1, + "message": message, + }, + } + _ = m.writeJSON(payload) +} + +func (m *Miner) Success(id int64, status string) { + if m == nil { + return + } + payload := map[string]any{ + "id": id, + "jsonrpc": "2.0", + "error": nil, + "result": map[string]any{ + "status": status, + }, + } + _ = m.writeJSON(payload) +} + +func (m *Miner) writeJSON(payload any) error { + m.sendMu.Lock() + defer m.sendMu.Unlock() + if m.conn == nil { + return nil + } + data, err := json.Marshal(payload) + if err != nil { + return err + } + data = append(data, '\n') + n, err := m.conn.Write(data) + m.tx += uint64(n) + if err != nil { + m.Close() + } + return err +} + +func (m *Miner) Close() { + if m == nil { + return + } + m.closeOnce.Do(func() { + m.state = MinerStateClosing + if m.conn != nil { + _ = m.conn.Close() + } + }) +} + +// NewStats creates zeroed global metrics. +func NewStats() *Stats { + stats := &Stats{startTime: time.Now().UTC(), latency: make([]uint16, 0, 1024)} + stats.windows[HashrateWindow60s] = newTickWindow(60) + stats.windows[HashrateWindow600s] = newTickWindow(600) + stats.windows[HashrateWindow3600s] = newTickWindow(3600) + stats.windows[HashrateWindow12h] = newTickWindow(43200) + stats.windows[HashrateWindow24h] = newTickWindow(86400) + return stats +} + +// OnLogin increments the current miner count. +func (s *Stats) OnLogin(e Event) { + if s == nil || e.Miner == nil { + return + } + now := s.miners.Add(1) + for { + max := s.maxMiners.Load() + if now <= max || s.maxMiners.CompareAndSwap(max, now) { + break + } + } +} + +// OnClose decrements the current miner count. +func (s *Stats) OnClose(e Event) { + if s == nil || e.Miner == nil { + return + } + for { + current := s.miners.Load() + if current == 0 || s.miners.CompareAndSwap(current, current-1) { + return + } + } +} + +// OnAccept records an accepted share. +func (s *Stats) OnAccept(e Event) { + if s == nil { + return + } + s.accepted.Add(1) + if e.Expired { + s.expired.Add(1) + } + s.mu.Lock() + if len(s.latency) < 10000 { + s.latency = append(s.latency, e.Latency) + } + insertTopDiff(&s.topDiff, e.Diff) + for i := range s.windows { + if s.windows[i].size > 0 { + s.windows[i].buckets[s.windows[i].pos] += e.Diff + } + } + s.mu.Unlock() + if e.Diff > 0 { + s.hashes.Add(e.Diff) + } +} + +// OnReject records a rejected share. +func (s *Stats) OnReject(e Event) { + if s == nil { + return + } + s.rejected.Add(1) + if strings.Contains(strings.ToLower(e.Error), "difficulty") || strings.Contains(strings.ToLower(e.Error), "invalid") || strings.Contains(strings.ToLower(e.Error), "nonce") { + s.invalid.Add(1) + } +} + +// Tick advances the rolling windows. +func (s *Stats) Tick() { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + for i := range s.windows { + if s.windows[i].size == 0 { + continue + } + s.windows[i].pos = (s.windows[i].pos + 1) % s.windows[i].size + s.windows[i].buckets[s.windows[i].pos] = 0 + } +} + +// Summary returns a snapshot of the current metrics. +func (s *Stats) Summary() StatsSummary { + if s == nil { + return StatsSummary{} + } + s.mu.Lock() + defer s.mu.Unlock() + summary := StatsSummary{ + Accepted: s.accepted.Load(), + Rejected: s.rejected.Load(), + Invalid: s.invalid.Load(), + Expired: s.expired.Load(), + Hashes: s.hashes.Load(), + } + if summary.Accepted > 0 { + uptime := uint64(time.Since(s.startTime).Seconds()) + if uptime == 0 { + uptime = 1 + } + summary.AvgTime = uint32(uptime / summary.Accepted) + } + if len(s.latency) > 0 { + samples := append([]uint16(nil), s.latency...) + sort.Slice(samples, func(i, j int) bool { return samples[i] < samples[j] }) + summary.AvgLatency = uint32(samples[len(samples)/2]) + } + summary.TopDiff = s.topDiff + for i := range s.windows { + if i == HashrateWindowAll { + uptime := time.Since(s.startTime).Seconds() + if uptime > 0 { + summary.Hashrate[i] = float64(summary.Hashes) / uptime + } + continue + } + total := uint64(0) + for _, bucket := range s.windows[i].buckets { + total += bucket + } + if s.windows[i].size > 0 { + summary.Hashrate[i] = float64(total) / float64(s.windows[i].size) + } + } + return summary +} + +func insertTopDiff(top *[10]uint64, diff uint64) { + if diff == 0 { + return + } + for i := range top { + if diff > top[i] { + for j := len(top) - 1; j > i; j-- { + top[j] = top[j-1] + } + top[i] = diff + return + } + } +} + +// NewWorkers creates a worker aggregate tracker. +func NewWorkers(mode WorkersMode, _ *EventBus) *Workers { + return &Workers{ + mode: mode, + nameIndex: make(map[string]int), + idIndex: make(map[int64]int), + } +} + +func (w *Workers) bindEvents(bus *EventBus) { + if w == nil || bus == nil { + return + } + w.mu.Lock() + defer w.mu.Unlock() +} + +func workerNameFor(mode WorkersMode, miner *Miner) string { + if miner == nil { + return "" + } + switch mode { + case WorkersByRigID: + if miner.rigID != "" { + return miner.rigID + } + return miner.user + case WorkersByUser: + return miner.user + case WorkersByPass: + return miner.password + case WorkersByAgent: + return miner.agent + case WorkersByIP: + return miner.ip + case WorkersDisabled: + return "" + default: + return miner.user + } +} + +// OnLogin creates or updates a worker record. +func (w *Workers) OnLogin(e Event) { + if w == nil || e.Miner == nil { + return + } + if w.mode == WorkersDisabled { + return + } + name := workerNameFor(w.mode, e.Miner) + if name == "" { + return + } + w.mu.Lock() + defer w.mu.Unlock() + index, ok := w.nameIndex[name] + if !ok { + index = len(w.entries) + record := WorkerRecord{Name: name} + record.windows[0] = newTickWindow(60) + record.windows[1] = newTickWindow(600) + record.windows[2] = newTickWindow(3600) + record.windows[3] = newTickWindow(43200) + record.windows[4] = newTickWindow(86400) + w.entries = append(w.entries, record) + w.nameIndex[name] = index + } + record := &w.entries[index] + record.Name = name + record.LastIP = e.Miner.ip + record.Connections++ + w.idIndex[e.Miner.id] = index +} + +func newTickWindow(size int) tickWindow { + return tickWindow{ + buckets: make([]uint64, size), + size: size, + } +} + +// OnAccept updates the owning worker with the accepted share. +func (w *Workers) OnAccept(e Event) { + if w == nil || e.Miner == nil { + return + } + w.mu.Lock() + defer w.mu.Unlock() + index, ok := w.idIndex[e.Miner.id] + if !ok || index < 0 || index >= len(w.entries) { + return + } + record := &w.entries[index] + record.Accepted++ + record.Hashes += e.Diff + record.LastHashAt = time.Now().UTC() + record.LastIP = e.Miner.ip + for i := range record.windows { + if record.windows[i].size > 0 { + record.windows[i].buckets[record.windows[i].pos] += e.Diff + } + } +} + +// OnReject updates the owning worker with the rejected share. +func (w *Workers) OnReject(e Event) { + if w == nil || e.Miner == nil { + return + } + w.mu.Lock() + defer w.mu.Unlock() + index, ok := w.idIndex[e.Miner.id] + if !ok || index < 0 || index >= len(w.entries) { + return + } + record := &w.entries[index] + record.Rejected++ + if strings.Contains(strings.ToLower(e.Error), "difficulty") || strings.Contains(strings.ToLower(e.Error), "invalid") || strings.Contains(strings.ToLower(e.Error), "nonce") { + record.Invalid++ + } + record.LastIP = e.Miner.ip +} + +// OnClose removes the miner mapping from the worker table. +func (w *Workers) OnClose(e Event) { + if w == nil || e.Miner == nil { + return + } + w.mu.Lock() + defer w.mu.Unlock() + delete(w.idIndex, e.Miner.id) +} + +// List returns a snapshot of all workers. +func (w *Workers) List() []WorkerRecord { + if w == nil { + return nil + } + w.mu.RLock() + defer w.mu.RUnlock() + out := make([]WorkerRecord, len(w.entries)) + copy(out, w.entries) + return out +} + +// Tick advances worker hash windows. +func (w *Workers) Tick() { + if w == nil { + return + } + w.mu.Lock() + defer w.mu.Unlock() + for i := range w.entries { + for j := range w.entries[i].windows { + if w.entries[i].windows[j].size == 0 { + continue + } + w.entries[i].windows[j].pos = (w.entries[i].windows[j].pos + 1) % w.entries[i].windows[j].size + w.entries[i].windows[j].buckets[w.entries[i].windows[j].pos] = 0 + } + } +} + +// Hashrate returns the configured worker hashrate window. +func (r *WorkerRecord) Hashrate(seconds int) float64 { + if r == nil || seconds <= 0 { + return 0 + } + index := -1 + switch seconds { + case 60: + index = 0 + case 600: + index = 1 + case 3600: + index = 2 + case 43200: + index = 3 + case 86400: + index = 4 + } + if index < 0 || index >= len(r.windows) { + return 0 + } + total := uint64(0) + for _, bucket := range r.windows[index].buckets { + total += bucket + } + if seconds == 0 { + return 0 + } + return float64(total) / float64(seconds) +} + +// Apply parses login suffixes and applies the configured global difficulty. +func (cd *CustomDiff) Apply(miner *Miner) { + if cd == nil || miner == nil { + return + } + miner.user, miner.customDiff = parseLoginUser(miner.user, cd.globalDiff) +} + +// NewServer constructs a server instance. +func NewServer(bind BindAddr, tlsCfg *tls.Config, limiter *RateLimiter, onAccept func(net.Conn, uint16)) (*Server, Result) { + if onAccept == nil { + onAccept = func(net.Conn, uint16) {} + } + return &Server{ + addr: bind, + tlsCfg: tlsCfg, + limiter: limiter, + onAccept: onAccept, + done: make(chan struct{}), + }, successResult() +} + +// Start begins accepting connections in a goroutine. +func (s *Server) Start() { + if s == nil { + return + } + go func() { + ln, err := net.Listen("tcp", net.JoinHostPort(s.addr.Host, strconv.Itoa(int(s.addr.Port)))) + if err != nil { + return + } + if s.tlsCfg != nil || s.addr.TLS { + if s.tlsCfg != nil { + ln = tls.NewListener(ln, s.tlsCfg) + } + } + s.listener = ln + for { + conn, err := ln.Accept() + if err != nil { + select { + case <-s.done: + return + default: + continue + } + } + if s.limiter != nil && !s.limiter.Allow(conn.RemoteAddr().String()) { + _ = conn.Close() + continue + } + if s.onAccept != nil { + s.onAccept(conn, s.addr.Port) + } + } + }() +} + +// Stop closes the listener. +func (s *Server) Stop() { + if s == nil { + return + } + select { + case <-s.done: + default: + close(s.done) + } + if s.listener != nil { + _ = s.listener.Close() + } +} + +// NewConfig returns a minimal config? not used. + +// NewRateLimiter, Allow, Tick are defined in core_impl.go. + +// IsActive reports whether the limiter has enabled rate limiting. +func (rl *RateLimiter) IsActive() bool { + return rl != nil && rl.cfg.MaxConnectionsPerMinute > 0 +} + +func nextMinerID() int64 { return atomic.AddInt64(&minerSeq, 1) } + +var minerSeq int64 + +type noopSplitter struct{} + +func (n *noopSplitter) Connect() {} +func (n *noopSplitter) OnLogin(event *LoginEvent) {} +func (n *noopSplitter) OnSubmit(event *SubmitEvent) {} +func (n *noopSplitter) OnClose(event *CloseEvent) {} +func (n *noopSplitter) Tick(ticks uint64) {} +func (n *noopSplitter) GC() {} +func (n *noopSplitter) Upstreams() UpstreamStats { return UpstreamStats{} } + +// Difficulty helper for the HTTP summary. +func workerSummaryNow(workers []WorkerRecord) uint64 { + return uint64(len(workers)) +} diff --git a/stats.go b/stats.go index 4ff6559..ca5e536 100644 --- a/stats.go +++ b/stats.go @@ -19,6 +19,7 @@ type Stats struct { expired atomic.Uint64 hashes atomic.Uint64 // cumulative sum of accepted share difficulties connections atomic.Uint64 // total TCP connections accepted (ever) + miners atomic.Uint64 // current connected miners maxMiners atomic.Uint64 // peak concurrent miner count topDiff [10]uint64 // top-10 accepted difficulties, sorted descending; guarded by mu latency []uint16 // pool response latencies in ms; capped at 10000 samples; guarded by mu @@ -53,8 +54,8 @@ type StatsSummary struct { Invalid uint64 `json:"invalid"` Expired uint64 `json:"expired"` Hashes uint64 `json:"hashes_total"` - AvgTime uint32 `json:"avg_time"` // seconds per accepted share - AvgLatency uint32 `json:"latency"` // median pool response latency in ms - Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants) + AvgTime uint32 `json:"avg_time"` // seconds per accepted share + AvgLatency uint32 `json:"latency"` // median pool response latency in ms + Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants) TopDiff [10]uint64 `json:"best"` } diff --git a/worker.go b/worker.go index bf729f7..ec08b87 100644 --- a/worker.go +++ b/worker.go @@ -11,9 +11,9 @@ import ( // w := proxy.NewWorkers(proxy.WorkersByRigID, bus) type Workers struct { mode WorkersMode - entries []WorkerRecord // ordered by first-seen (stable) - nameIndex map[string]int // workerName → entries index - idIndex map[int64]int // minerID → entries index + entries []WorkerRecord // ordered by first-seen (stable) + nameIndex map[string]int // workerName → entries index + idIndex map[int64]int // minerID → entries index mu sync.RWMutex } @@ -27,7 +27,7 @@ type WorkerRecord struct { Accepted uint64 Rejected uint64 Invalid uint64 - Hashes uint64 // sum of accepted share difficulties + Hashes uint64 // sum of accepted share difficulties LastHashAt time.Time windows [5]tickWindow // 60s, 600s, 3600s, 12h, 24h }