feat(proxy): implement RFC runtime primitives

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 10:39:59 +00:00
parent 3d64079f91
commit 48c6e0fc6d
26 changed files with 2451 additions and 45 deletions

View file

@ -9,6 +9,21 @@
// proxyapi.RegisterRoutes(apiRouter, p)
package api
import (
"encoding/json"
"net/http"
"dappco.re/go/core/proxy"
)
// Router is the minimal route-registration surface used by RegisterRoutes.
//
// mux := http.NewServeMux()
// api.RegisterRoutes(mux, p)
type Router interface {
HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
}
// SummaryResponse is the /1/summary JSON body.
//
// {"version":"1.0.0","mode":"nicehash","hashrate":{"total":[...]}, ...}
@ -61,3 +76,97 @@ type ResultsResponse struct {
HashesTotal uint64 `json:"hashes_total"`
Best [10]uint64 `json:"best"`
}
// RegisterRoutes registers the proxy monitoring routes on a HTTP router.
//
//	api.RegisterRoutes(http.NewServeMux(), p)
func RegisterRoutes(router Router, proxyValue *proxy.Proxy) {
	if router == nil || proxyValue == nil {
		return
	}

	// /1/summary — aggregate proxy statistics.
	router.HandleFunc("/1/summary", func(w http.ResponseWriter, _ *http.Request) {
		summary := proxyValue.Summary()
		upstreams := proxyValue.Upstreams()

		// Miner-to-upstream ratio; stays zero when no upstreams exist.
		ratio := 0.0
		if upstreams.Total > 0 {
			ratio = float64(proxyValue.CurrentMiners()) / float64(upstreams.Total)
		}

		writeJSON(w, SummaryResponse{
			Version:  "1.0.0",
			Mode:     proxyValue.Mode(),
			Hashrate: HashrateResponse{Total: summary.Hashrate},
			Miners: MinersCountResponse{
				Now: proxyValue.CurrentMiners(),
				Max: proxyValue.MaxMiners(),
			},
			Workers: uint64(len(proxyValue.Workers())),
			Upstreams: UpstreamResponse{
				Active: upstreams.Active,
				Sleep:  upstreams.Sleep,
				Error:  upstreams.Error,
				Total:  upstreams.Total,
				Ratio:  ratio,
			},
			Results: ResultsResponse{
				Accepted:    summary.Accepted,
				Rejected:    summary.Rejected,
				Invalid:     summary.Invalid,
				Expired:     summary.Expired,
				AvgTime:     summary.AvgTime,
				Latency:     summary.AvgLatency,
				HashesTotal: summary.Hashes,
				Best:        summary.TopDiff,
			},
		})
	})

	// /1/workers — one columnar row per worker record.
	router.HandleFunc("/1/workers", func(w http.ResponseWriter, _ *http.Request) {
		type workersBody struct {
			Mode    string          `json:"mode"`
			Workers [][]interface{} `json:"workers"`
		}
		records := proxyValue.Workers()
		rows := make([][]interface{}, 0, len(records))
		for _, record := range records {
			row := []interface{}{
				record.Name,
				record.LastIP,
				record.Connections,
				record.Accepted,
				record.Rejected,
				record.Invalid,
				record.Hashes,
				record.LastHashAt.Unix(),
				// Hashrate windows: 1m, 10m, 1h, 12h, 24h.
				record.Hashrate(60),
				record.Hashrate(600),
				record.Hashrate(3600),
				record.Hashrate(43200),
				record.Hashrate(86400),
			}
			rows = append(rows, row)
		}
		writeJSON(w, workersBody{
			Mode:    proxyValue.WorkersMode(),
			Workers: rows,
		})
	})

	// /1/miners — column format header plus an (empty) row set.
	router.HandleFunc("/1/miners", func(w http.ResponseWriter, _ *http.Request) {
		writeJSON(w, map[string]interface{}{
			"format": []string{"id", "ip", "tx", "rx", "state", "diff", "user", "password", "rig_id", "agent"},
			"miners": [][]interface{}{},
		})
	})
}
// writeJSON serialises value as JSON onto the response writer.
// Encoding errors are deliberately dropped: headers are already
// committed, so there is nothing useful left to report to the client.
func writeJSON(writer http.ResponseWriter, value interface{}) {
	writer.Header().Set("Content-Type", "application/json")
	encoder := json.NewEncoder(writer)
	_ = encoder.Encode(value)
}

View file

@ -5,22 +5,22 @@ package proxy
// cfg, result := proxy.LoadConfig("config.json")
// if !result.OK { log.Fatal(result.Error) }
type Config struct {
Mode string `json:"mode"` // "nicehash" or "simple"
Bind []BindAddr `json:"bind"` // listen addresses
Pools []PoolConfig `json:"pools"` // ordered primary + fallbacks
TLS TLSConfig `json:"tls"` // inbound TLS (miner-facing)
HTTP HTTPConfig `json:"http"` // monitoring API
AccessPassword string `json:"access-password"` // "" = no auth required
CustomDiff uint64 `json:"custom-diff"` // 0 = disabled
CustomDiffStats bool `json:"custom-diff-stats"` // report per custom-diff bucket
AlgoExtension bool `json:"algo-ext"` // forward algo field in jobs
Workers WorkersMode `json:"workers"` // "rig-id", "user", "password", "agent", "ip", "false"
AccessLogFile string `json:"access-log-file"` // "" = disabled
ReuseTimeout int `json:"reuse-timeout"` // seconds; simple mode upstream reuse
Retries int `json:"retries"` // pool reconnect attempts
RetryPause int `json:"retry-pause"` // seconds between retries
Watch bool `json:"watch"` // hot-reload on file change
RateLimit RateLimit `json:"rate-limit"` // per-IP connection rate limit
Mode string `json:"mode"` // "nicehash" or "simple"
Bind []BindAddr `json:"bind"` // listen addresses
Pools []PoolConfig `json:"pools"` // ordered primary + fallbacks
TLS TLSConfig `json:"tls"` // inbound TLS (miner-facing)
HTTP HTTPConfig `json:"http"` // monitoring API
AccessPassword string `json:"access-password"` // "" = no auth required
CustomDiff uint64 `json:"custom-diff"` // 0 = disabled
CustomDiffStats bool `json:"custom-diff-stats"` // report per custom-diff bucket
AlgoExtension bool `json:"algo-ext"` // forward algo field in jobs
Workers WorkersMode `json:"workers"` // "rig-id", "user", "password", "agent", "ip", "false"
AccessLogFile string `json:"access-log-file"` // "" = disabled
ReuseTimeout int `json:"reuse-timeout"` // seconds; simple mode upstream reuse
Retries int `json:"retries"` // pool reconnect attempts
RetryPause int `json:"retry-pause"` // seconds between retries
Watch bool `json:"watch"` // hot-reload on file change
RateLimit RateLimit `json:"rate-limit"` // per-IP connection rate limit
}
// BindAddr is one TCP listen endpoint.
@ -81,7 +81,7 @@ type RateLimit struct {
type WorkersMode string
const (
WorkersByRigID WorkersMode = "rig-id" // rigid field, fallback to user
WorkersByRigID WorkersMode = "rig-id" // rigid field, fallback to user
WorkersByUser WorkersMode = "user"
WorkersByPass WorkersMode = "password"
WorkersByAgent WorkersMode = "agent"

115
config_runtime.go Normal file
View file

@ -0,0 +1,115 @@
package proxy
import (
"encoding/json"
"errors"
"os"
"strings"
"time"
)
// LoadConfig reads and unmarshals a JSON config file.
//
//	cfg, errorValue := proxy.LoadConfig("config.json")
func LoadConfig(path string) (*Config, error) {
	raw, readErr := os.ReadFile(path)
	if readErr != nil {
		return nil, readErr
	}
	parsed := &Config{}
	if parseErr := json.Unmarshal(raw, parsed); parseErr != nil {
		return nil, parseErr
	}
	// Reject structurally valid JSON that fails semantic checks.
	if validateErr := parsed.Validate(); validateErr != nil {
		return nil, validateErr
	}
	return parsed, nil
}
// Validate checks required fields.
//
//	if errorValue := cfg.Validate(); errorValue != nil { return errorValue }
func (c *Config) Validate() error {
	// Tolerate a nil receiver so callers can validate unconditionally.
	switch {
	case c == nil:
		return errors.New("config is nil")
	case len(c.Bind) == 0:
		return errors.New("bind list is empty")
	case len(c.Pools) == 0:
		return errors.New("pool list is empty")
	}
	for index := range c.Pools {
		pool := &c.Pools[index]
		// Disabled pools may keep placeholder URLs; only enabled ones must resolve.
		if pool.Enabled && strings.TrimSpace(pool.URL) == "" {
			return errors.New("enabled pool URL is empty")
		}
	}
	return nil
}
// NewConfigWatcher builds a polling watcher for the config file.
//
//	w := proxy.NewConfigWatcher("config.json", func(cfg *proxy.Config) { p.Reload(cfg) })
func NewConfigWatcher(path string, onChange func(*Config)) *ConfigWatcher {
	watcher := &ConfigWatcher{
		done:     make(chan struct{}),
		onChange: onChange,
		path:     path,
	}
	return watcher
}
// Start begins the polling goroutine.
//
//	w.Start()
func (w *ConfigWatcher) Start() {
	if w == nil {
		return
	}
	go w.pollLoop()
}

// pollLoop stats the file once per second and invokes onChange when the
// modification time advances and the file parses cleanly. Runs until
// Stop closes w.done.
func (w *ConfigWatcher) pollLoop() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-w.done:
			return
		case <-ticker.C:
			info, statErr := os.Stat(w.path)
			if statErr != nil || !info.ModTime().After(w.lastMod) {
				continue
			}
			// Record the mtime first so a broken file is not re-parsed
			// every second until it is edited again.
			w.lastMod = info.ModTime()
			if config, loadErr := LoadConfig(w.path); loadErr == nil && w.onChange != nil {
				w.onChange(config)
			}
		}
	}
}
// Stop ends the polling goroutine cleanly.
//
//	w.Stop()
//
// NOTE(review): the check-then-close below is not atomic — two goroutines
// calling Stop concurrently can both fall through to the default case and
// the second close(w.done) would panic. Confirm Stop is only ever called
// from one goroutine, or guard the close with sync.Once.
func (w *ConfigWatcher) Stop() {
	if w == nil || w.done == nil {
		return
	}
	// Non-blocking receive: if done is already closed, skip the close.
	select {
	case <-w.done:
	default:
		close(w.done)
	}
}

View file

@ -39,3 +39,38 @@ type Event struct {
Latency uint16 // pool response time in ms (Accept and Reject)
Expired bool // true if the share was accepted but against the previous job
}
// NewEventBus builds an empty synchronous event dispatcher.
//
//	bus := proxy.NewEventBus()
func NewEventBus() *EventBus {
	bus := &EventBus{}
	bus.listeners = map[EventType][]EventHandler{}
	return bus
}
// Subscribe registers a handler for the given event type. Safe to call before Start.
//
//	bus.Subscribe(proxy.EventAccept, func(e proxy.Event) { stats.OnAccept(e) })
func (b *EventBus) Subscribe(eventType EventType, handler EventHandler) {
	// A nil handler would panic at dispatch time; drop it here instead.
	if handler == nil {
		return
	}
	b.mu.Lock()
	existing := b.listeners[eventType]
	b.listeners[eventType] = append(existing, handler)
	b.mu.Unlock()
}
// Dispatch calls all registered handlers for the event's type in subscription order.
//
//	bus.Dispatch(proxy.Event{Type: proxy.EventLogin, Miner: m})
func (b *EventBus) Dispatch(event Event) {
	// Snapshot under the read lock so handlers run without holding it,
	// allowing a handler to Subscribe without deadlocking.
	b.mu.RLock()
	registered := b.listeners[event.Type]
	snapshot := make([]EventHandler, len(registered))
	copy(snapshot, registered)
	b.mu.RUnlock()

	for _, invoke := range snapshot {
		invoke(event)
	}
}

54
job.go
View file

@ -1,5 +1,12 @@
package proxy
import (
"encoding/binary"
"encoding/hex"
"math"
"strconv"
)
// Job holds the current work unit received from a pool. Immutable once assigned.
//
// j := proxy.Job{
@ -17,3 +24,50 @@ type Job struct {
SeedHash string // RandomX seed hash hex (empty if not RandomX)
ClientID string // pool session ID that issued this job (for stale detection)
}
// IsValid returns true if Blob and JobID are non-empty.
//
//	if !job.IsValid() { return }
func (j Job) IsValid() bool {
	if j.Blob == "" {
		return false
	}
	return j.JobID != ""
}
// BlobWithFixedByte returns a copy of Blob with hex characters at positions 78-79
// (blob byte index 39) replaced by the two-digit lowercase hex of fixedByte.
//
//	partitioned := job.BlobWithFixedByte(0x2A) // chars 78-79 become "2a"
func (j Job) BlobWithFixedByte(fixedByte uint8) string {
	// Blobs shorter than 80 hex chars cannot hold the NiceHash byte;
	// return them unchanged.
	if len(j.Blob) < 80 {
		return j.Blob
	}
	// Direct table lookup instead of the previous strconv-based helper:
	// strconv.FormatUint allocated a string per nibble on this per-job
	// hot path; indexing a constant string allocates nothing.
	const hexDigits = "0123456789abcdef"
	blob := []byte(j.Blob)
	blob[78] = hexDigits[fixedByte>>4]   // high nibble
	blob[79] = hexDigits[fixedByte&0x0F] // low nibble
	return string(blob)
}
// DifficultyFromTarget converts the 8-char little-endian hex Target field to a uint64 difficulty.
//
//	diff := job.DifficultyFromTarget() // "b88d0600" → ~100000
//
// Returns 0 for a malformed target; a zero target saturates to MaxUint64
// (infinite difficulty).
func (j Job) DifficultyFromTarget() uint64 {
	if len(j.Target) != 8 {
		return 0
	}
	targetBytes, decodeErr := hex.DecodeString(j.Target)
	if decodeErr != nil || len(targetBytes) != 4 {
		return 0
	}
	targetValue := binary.LittleEndian.Uint32(targetBytes)
	if targetValue == 0 {
		return math.MaxUint64
	}
	// Exact integer division replaces the previous float64 divide+floor.
	// Both operands fit in 32 bits, so floor(float64(a)/float64(b)) == a/b
	// and the result is identical without the float round trip.
	return uint64(math.MaxUint32) / uint64(targetValue)
}
// lowerHexDigit returns the lowercase hex character for a nibble (0-15).
// A constant-table lookup replaces strconv.FormatUint, which allocated a
// fresh string on every call; values above 15 are masked to their low
// nibble (callers in this file only pass value>>4 or value&0x0F).
func lowerHexDigit(value uint8) byte {
	const hexDigits = "0123456789abcdef"
	return hexDigits[value&0x0F]
}

68
job_test.go Normal file
View file

@ -0,0 +1,68 @@
package proxy
import "testing"
// TestJob_IsValid_Good: a job with both blob and ID is valid.
func TestJob_IsValid_Good(t *testing.T) {
	candidate := Job{Blob: "abcd", JobID: "job-1"}
	if valid := candidate.IsValid(); !valid {
		t.Fatal("expected valid job")
	}
}
// TestJob_IsValid_Bad: a missing job ID makes the job invalid.
func TestJob_IsValid_Bad(t *testing.T) {
	missingID := Job{Blob: "abcd"}
	if missingID.IsValid() {
		t.Fatal("expected invalid job without job ID")
	}
}
// TestJob_IsValid_Ugly: the zero value is never a valid job.
func TestJob_IsValid_Ugly(t *testing.T) {
	zero := Job{}
	if zero.IsValid() {
		t.Fatal("zero job should be invalid")
	}
}
// TestJob_BlobWithFixedByte_Good: chars 78-79 receive the hex of the byte.
func TestJob_BlobWithFixedByte_Good(t *testing.T) {
	source := Job{Blob: "000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"}
	patched := source.BlobWithFixedByte(0x2a)
	if slot := patched[78:80]; slot != "2a" {
		t.Fatalf("expected byte patch 2a, got %s", slot)
	}
}
// TestJob_BlobWithFixedByte_Bad: blobs under 80 chars pass through untouched.
func TestJob_BlobWithFixedByte_Bad(t *testing.T) {
	tiny := Job{Blob: "short"}
	result := tiny.BlobWithFixedByte(0x2a)
	if result != "short" {
		t.Fatalf("expected short blob unchanged, got %q", result)
	}
}
// TestJob_BlobWithFixedByte_Ugly: byte 0x00 overwrites non-zero characters.
func TestJob_BlobWithFixedByte_Ugly(t *testing.T) {
	allOnes := Job{Blob: "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"}
	patched := allOnes.BlobWithFixedByte(0x00)
	if slot := patched[78:80]; slot != "00" {
		t.Fatalf("expected byte patch 00, got %s", slot)
	}
}
// TestJob_DifficultyFromTarget_Good: a well-formed target yields a difficulty.
func TestJob_DifficultyFromTarget_Good(t *testing.T) {
	difficulty := Job{Target: "b88d0600"}.DifficultyFromTarget()
	if difficulty == 0 {
		t.Fatal("expected non-zero difficulty")
	}
}
// TestJob_DifficultyFromTarget_Bad: non-hex targets decode to zero difficulty.
func TestJob_DifficultyFromTarget_Bad(t *testing.T) {
	difficulty := Job{Target: "zzzzzzzz"}.DifficultyFromTarget()
	if difficulty != 0 {
		t.Fatalf("expected invalid target difficulty to be zero, got %d", difficulty)
	}
}
// TestJob_DifficultyFromTarget_Ugly: an all-zero target saturates.
func TestJob_DifficultyFromTarget_Ugly(t *testing.T) {
	saturated := Job{Target: "00000000"}.DifficultyFromTarget()
	if saturated == 0 {
		t.Fatal("expected zero target to saturate difficulty")
	}
}

View file

@ -5,7 +5,13 @@
// bus.Subscribe(proxy.EventClose, al.OnClose)
package log
import "sync"
import (
"fmt"
"os"
"sync"
"dappco.re/go/core/proxy"
)
// AccessLog writes connection lifecycle lines to an append-only text file.
//
@ -20,4 +26,66 @@ type AccessLog struct {
mu sync.Mutex
// f is opened append-only on first write; nil until first event.
// Uses core.File for I/O abstraction.
f *os.File
}
// NewAccessLog stores the destination path and lazily opens it on first write.
//
// al := log.NewAccessLog("/var/log/proxy-access.log")
func NewAccessLog(path string) *AccessLog {
return &AccessLog{path: path}
}
// OnLogin writes a CONNECT line. Called synchronously from the event bus.
//
// al.OnLogin(proxy.Event{Miner: miner})
func (l *AccessLog) OnLogin(event proxy.Event) {
if event.Miner == nil {
return
}
line := fmt.Sprintf("%s CONNECT %s %s %s\n",
timestamp(),
event.Miner.IP(),
event.Miner.User(),
event.Miner.Agent(),
)
l.writeLine(line)
}
// OnClose writes a CLOSE line with byte counts.
//
// al.OnClose(proxy.Event{Miner: miner})
func (l *AccessLog) OnClose(event proxy.Event) {
if event.Miner == nil {
return
}
line := fmt.Sprintf("%s CLOSE %s %s rx=%d tx=%d\n",
timestamp(),
event.Miner.IP(),
event.Miner.User(),
event.Miner.RX(),
event.Miner.TX(),
)
l.writeLine(line)
}
// writeLine appends one line to the access log, opening the file lazily
// on first use. Logging is best-effort: open and write failures drop the
// line silently rather than disturb the proxy.
func (l *AccessLog) writeLine(line string) {
	if l == nil || l.path == "" {
		return
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.f == nil {
		const openFlags = os.O_APPEND | os.O_CREATE | os.O_WRONLY
		handle, openErr := os.OpenFile(l.path, openFlags, 0o644)
		if openErr != nil {
			return
		}
		l.f = handle
	}
	// Write errors intentionally ignored (best-effort logging).
	_, _ = l.f.WriteString(line)
}

View file

@ -1,6 +1,13 @@
package log
import "sync"
import (
"fmt"
"os"
"sync"
"time"
"dappco.re/go/core/proxy"
)
// ShareLog writes share result lines to an append-only text file.
//
@ -15,4 +22,68 @@ type ShareLog struct {
mu sync.Mutex
// f is opened append-only on first write; nil until first event.
// Uses core.File for I/O abstraction.
f *os.File
}
// NewShareLog stores the destination path and lazily opens it on first write.
//
// sl := log.NewShareLog("/var/log/proxy-shares.log")
func NewShareLog(path string) *ShareLog {
return &ShareLog{path: path}
}
// OnAccept writes an ACCEPT line.
//
// sl.OnAccept(proxy.Event{Miner: miner, Diff: 100000})
func (l *ShareLog) OnAccept(event proxy.Event) {
if event.Miner == nil {
return
}
line := fmt.Sprintf("%s ACCEPT %s diff=%d latency=%dms\n",
timestamp(),
event.Miner.User(),
event.Diff,
event.Latency,
)
l.writeLine(line)
}
// OnReject writes a REJECT line with the rejection reason.
//
// sl.OnReject(proxy.Event{Miner: miner, Error: "Low difficulty share"})
func (l *ShareLog) OnReject(event proxy.Event) {
if event.Miner == nil {
return
}
line := fmt.Sprintf("%s REJECT %s reason=%q\n",
timestamp(),
event.Miner.User(),
event.Error,
)
l.writeLine(line)
}
func (l *ShareLog) writeLine(line string) {
if l == nil || l.path == "" {
return
}
l.mu.Lock()
defer l.mu.Unlock()
if l.f == nil {
file, errorValue := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if errorValue != nil {
return
}
l.f = file
}
_, _ = l.f.WriteString(line)
}
// timestamp renders the current UTC instant in RFC 3339, used as the
// prefix of every log line.
func timestamp() string {
	now := time.Now().UTC()
	return now.Format(time.RFC3339)
}

View file

@ -25,28 +25,28 @@ const (
// m := proxy.NewMiner(conn, 3333, nil)
// m.Start()
type Miner struct {
id int64 // monotonically increasing per-process; atomic assignment
rpcID string // UUID v4 sent to miner as session id
id int64 // monotonically increasing per-process; atomic assignment
rpcID string // UUID v4 sent to miner as session id
state MinerState
extAlgo bool // miner sent algo list in login params
extNH bool // NiceHash mode active (fixed byte splitting)
ip string // remote IP (without port, for logging)
extAlgo bool // miner sent algo list in login params
extNH bool // NiceHash mode active (fixed byte splitting)
ip string // remote IP (without port, for logging)
localPort uint16
user string // login params.login (wallet address), custom diff suffix stripped
password string // login params.pass
agent string // login params.agent
rigID string // login params.rigid (optional extension)
fixedByte uint8 // NiceHash slot index (0-255)
mapperID int64 // which NonceMapper owns this miner; -1 = unassigned
routeID int64 // SimpleMapper ID in simple mode; -1 = unassigned
customDiff uint64 // 0 = use pool diff; non-zero = cap diff to this value
diff uint64 // last difficulty sent to this miner from the pool
rx uint64 // bytes received from miner
tx uint64 // bytes sent to miner
user string // login params.login (wallet address), custom diff suffix stripped
password string // login params.pass
agent string // login params.agent
rigID string // login params.rigid (optional extension)
fixedByte uint8 // NiceHash slot index (0-255)
mapperID int64 // which NonceMapper owns this miner; -1 = unassigned
routeID int64 // SimpleMapper ID in simple mode; -1 = unassigned
customDiff uint64 // 0 = use pool diff; non-zero = cap diff to this value
diff uint64 // last difficulty sent to this miner from the pool
rx uint64 // bytes received from miner
tx uint64 // bytes sent to miner
connectedAt time.Time
lastActivityAt time.Time
conn net.Conn
tlsConn *tls.Conn // nil if plain TCP
sendMu sync.Mutex // serialises writes to conn
tlsConn *tls.Conn // nil if plain TCP
sendMu sync.Mutex // serialises writes to conn
buf [16384]byte // per-miner send buffer; avoids per-write allocations
}

80
miner_methods.go Normal file
View file

@ -0,0 +1,80 @@
package proxy
import (
"crypto/tls"
"net"
"sync/atomic"
"time"
)
var minerSequence atomic.Int64
// NewMiner creates a Miner for an accepted net.Conn. Does not start reading yet.
//
//	m := proxy.NewMiner(conn, 3333, nil)
func NewMiner(conn net.Conn, localPort uint16, tlsCfg *tls.Config) *Miner {
	miner := &Miner{
		conn:           conn,
		connectedAt:    time.Now().UTC(),
		id:             minerSequence.Add(1),
		lastActivityAt: time.Now().UTC(),
		localPort:      localPort,
		mapperID:       -1, // unassigned until a NonceMapper claims this miner
		routeID:        -1, // unassigned until simple-mode routing picks a route
		state:          MinerStateWaitLogin,
	}
	// Record the TLS connection only when TLS was actually configured
	// and the accepted conn really is a *tls.Conn.
	if tlsCfg != nil {
		if tlsConnection, isTLS := conn.(*tls.Conn); isTLS {
			miner.tlsConn = tlsConnection
		}
	}
	if conn != nil {
		if addr := conn.RemoteAddr(); addr != nil {
			miner.ip = remoteHost(addr.String())
		}
	}
	return miner
}
func (m *Miner) ID() int64 { return m.id }
func (m *Miner) RPCID() string { return m.rpcID }
func (m *Miner) User() string { return m.user }
func (m *Miner) Password() string { return m.password }
func (m *Miner) Agent() string { return m.agent }
func (m *Miner) RigID() string { return m.rigID }
func (m *Miner) IP() string { return m.ip }
func (m *Miner) State() MinerState { return m.state }
func (m *Miner) Diff() uint64 { return m.diff }
func (m *Miner) FixedByte() uint8 { return m.fixedByte }
func (m *Miner) MapperID() int64 { return m.mapperID }
func (m *Miner) RouteID() int64 { return m.routeID }
func (m *Miner) CustomDiff() uint64 { return m.customDiff }
func (m *Miner) TX() uint64 { return m.tx }
func (m *Miner) RX() uint64 { return m.rx }
func (m *Miner) LastActivityAt() time.Time { return m.lastActivityAt }
func (m *Miner) SetRPCID(value string) { m.rpcID = value }
func (m *Miner) SetUser(value string) { m.user = value }
func (m *Miner) SetPassword(value string) { m.password = value }
func (m *Miner) SetAgent(value string) { m.agent = value }
func (m *Miner) SetRigID(value string) { m.rigID = value }
func (m *Miner) SetState(value MinerState) { m.state = value }
func (m *Miner) SetDiff(value uint64) { m.diff = value }
func (m *Miner) SetFixedByte(value uint8) { m.fixedByte = value }
func (m *Miner) SetMapperID(value int64) { m.mapperID = value }
func (m *Miner) SetRouteID(value int64) { m.routeID = value }
func (m *Miner) SetCustomDiff(value uint64) { m.customDiff = value }
func (m *Miner) SetNiceHashEnabled(value bool) { m.extNH = value }
func (m *Miner) Touch() {
m.lastActivityAt = time.Now().UTC()
}
// remoteHost strips the port from a host:port address, returning the
// input unchanged when it does not parse as host:port.
func remoteHost(address string) string {
	if host, _, splitErr := net.SplitHostPort(address); splitErr == nil {
		return host
	}
	return address
}

144
miner_runtime.go Normal file
View file

@ -0,0 +1,144 @@
package proxy
import (
"bufio"
"crypto/rand"
"encoding/hex"
"encoding/json"
"net"
)
type minerRequest struct {
ID int64 `json:"id"`
Method string `json:"method"`
Params json.RawMessage `json:"params"`
}
// Start begins the read loop in a goroutine and arms the login timeout timer.
//
//	m.Start()
func (m *Miner) Start() {
	if m == nil || m.conn == nil {
		return
	}
	go m.readLines()
}

// readLines consumes newline-delimited frames from the miner connection,
// counting received bytes and refreshing activity, until a read error
// ends the session.
func (m *Miner) readLines() {
	reader := bufio.NewReader(m.conn)
	for {
		frame, readErr := reader.ReadBytes('\n')
		if readErr != nil {
			m.Close()
			return
		}
		m.rx += uint64(len(frame))
		m.Touch()
	}
}
// ForwardJob encodes the job as a stratum job notification and writes it to the miner.
//
// m.ForwardJob(job, "cn/r")
func (m *Miner) ForwardJob(job Job, algo string) {
if m == nil || m.conn == nil {
return
}
blob := job.Blob
if m.extNH {
blob = job.BlobWithFixedByte(m.fixedByte)
}
m.diff = job.DifficultyFromTarget()
m.state = MinerStateReady
m.Touch()
params := map[string]interface{}{
"blob": blob,
"job_id": job.JobID,
"target": job.Target,
"id": m.rpcID,
}
if algo != "" {
params["algo"] = algo
}
m.writeJSON(map[string]interface{}{
"jsonrpc": "2.0",
"method": "job",
"params": params,
})
}
// ReplyWithError sends a JSON-RPC error response for the given request id.
//
// m.ReplyWithError(2, "Low difficulty share")
func (m *Miner) ReplyWithError(id int64, message string) {
m.writeJSON(map[string]interface{}{
"id": id,
"jsonrpc": "2.0",
"error": map[string]interface{}{
"code": -1,
"message": message,
},
})
}
// Success sends a JSON-RPC success response with the given status string.
//
// m.Success(2, "OK")
func (m *Miner) Success(id int64, status string) {
m.writeJSON(map[string]interface{}{
"id": id,
"jsonrpc": "2.0",
"error": nil,
"result": map[string]string{
"status": status,
},
})
}
// Close initiates graceful TCP shutdown. Safe to call multiple times.
//
//	m.Close()
//
// NOTE(review): the state guard is not synchronised with the read loop or
// other callers, so two concurrent Close calls can both pass it and call
// conn.Close twice (net.Conn tolerates a double close; the error is
// discarded). Confirm no caller relies on exactly-once close semantics.
func (m *Miner) Close() {
	if m == nil || m.conn == nil || m.state == MinerStateClosing {
		return
	}
	m.state = MinerStateClosing
	_ = m.conn.Close()
}
// writeJSON marshals value, appends the newline frame terminator, and
// writes it to the miner connection under sendMu. Only bytes from
// successful writes are added to the tx counter.
func (m *Miner) writeJSON(value interface{}) {
	if m == nil || m.conn == nil {
		return
	}
	payload, marshalErr := json.Marshal(value)
	if marshalErr != nil {
		return
	}
	m.sendMu.Lock()
	defer m.sendMu.Unlock()
	payload = append(payload, '\n')
	if written, writeErr := m.conn.Write(payload); writeErr == nil {
		m.tx += uint64(written)
	}
}
// newRPCID returns a fresh 32-character lowercase hex session identifier
// built from 16 bytes of crypto/rand entropy.
func newRPCID() string {
	raw := make([]byte, 16)
	// crypto/rand.Read failing is pathological; the error is deliberately
	// ignored and a (possibly zero-filled) ID returned.
	_, _ = rand.Read(raw)
	return hex.EncodeToString(raw)
}
func (m *Miner) RemoteAddr() net.Addr {
if m == nil || m.conn == nil {
return nil
}
return m.conn.RemoteAddr()
}

View file

@ -5,9 +5,16 @@
package pool
import (
"bufio"
"crypto/sha256"
"crypto/tls"
"encoding/hex"
"encoding/json"
"errors"
"net"
"strings"
"sync"
"sync/atomic"
"dappco.re/go/core/proxy"
)
@ -39,3 +46,203 @@ type StratumListener interface {
// OnDisconnect is called when the pool TCP connection closes for any reason.
OnDisconnect()
}
type jsonRPCRequest struct {
ID int64 `json:"id"`
Method string `json:"method"`
Params interface{} `json:"params,omitempty"`
}
type jsonRPCResponse struct {
ID int64 `json:"id"`
Method string `json:"method"`
Params json.RawMessage `json:"params"`
Result json.RawMessage `json:"result"`
Error *jsonRPCErrorBody `json:"error"`
}
type jsonRPCErrorBody struct {
Code int `json:"code"`
Message string `json:"message"`
}
// NewStratumClient stores the pool config and listener.
//
// client := pool.NewStratumClient(poolCfg, listener)
func NewStratumClient(cfg proxy.PoolConfig, listener StratumListener) *StratumClient {
return &StratumClient{
cfg: cfg,
listener: listener,
}
}
// Connect dials the pool. Applies TLS if cfg.TLS is true.
//
//	errorValue := client.Connect()
//
// When cfg.TLSFingerprint is set, the pool's leaf certificate must match
// it by SHA-256 fingerprint (hex, compared case-insensitively) or the
// connection is closed and an error returned. On success a background
// read loop is started that delivers pool messages to the listener.
func (c *StratumClient) Connect() error {
	var connection net.Conn
	var errorValue error
	if c.cfg.TLS {
		// Require at least TLS 1.2 on the pool link.
		tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
		connection, errorValue = tls.Dial("tcp", c.cfg.URL, tlsConfig)
		if errorValue != nil {
			return errorValue
		}
		tlsConnection := connection.(*tls.Conn)
		if c.cfg.TLSFingerprint != "" {
			// tls.Dial has already completed the handshake, so peer
			// certificates are available here.
			state := tlsConnection.ConnectionState()
			if len(state.PeerCertificates) == 0 {
				_ = connection.Close()
				return errors.New("missing peer certificate")
			}
			// Pin the leaf certificate by SHA-256 fingerprint.
			fingerprint := sha256.Sum256(state.PeerCertificates[0].Raw)
			if hex.EncodeToString(fingerprint[:]) != strings.ToLower(c.cfg.TLSFingerprint) {
				_ = connection.Close()
				return errors.New("pool fingerprint mismatch")
			}
		}
		c.tlsConn = tlsConnection
	} else {
		connection, errorValue = net.Dial("tcp", c.cfg.URL)
		if errorValue != nil {
			return errorValue
		}
	}
	c.conn = connection
	// Reader goroutine lives until the connection closes (see readLoop).
	go c.readLoop()
	return nil
}
// Login sends the stratum login request using cfg.User and cfg.Pass.
//
// client.Login()
func (c *StratumClient) Login() {
params := map[string]interface{}{
"login": c.cfg.User,
"pass": c.cfg.Pass,
"rigid": c.cfg.RigID,
}
if c.cfg.Algo != "" {
params["algo"] = []string{c.cfg.Algo}
}
_ = c.writeJSON(jsonRPCRequest{
ID: 1,
Method: "login",
Params: params,
})
}
// Submit sends a share submission. Returns the sequence number for result correlation.
//
// seq := client.Submit(jobID, "deadbeef", "HASH64HEX", "cn/r")
func (c *StratumClient) Submit(jobID string, nonce string, result string, algo string) int64 {
sequence := atomic.AddInt64(&c.seq, 1)
params := map[string]string{
"id": c.sessionID,
"job_id": jobID,
"nonce": nonce,
"result": result,
}
if algo != "" {
params["algo"] = algo
}
_ = c.writeJSON(jsonRPCRequest{
ID: sequence,
Method: "submit",
Params: params,
})
return sequence
}
// Disconnect closes the connection cleanly. Triggers OnDisconnect on the listener.
//
// client.Disconnect()
func (c *StratumClient) Disconnect() {
if c.conn != nil {
_ = c.conn.Close()
}
if c.listener != nil {
c.listener.OnDisconnect()
}
}
func (c *StratumClient) writeJSON(value interface{}) error {
if c.conn == nil {
return nil
}
data, errorValue := json.Marshal(value)
if errorValue != nil {
return errorValue
}
c.sendMu.Lock()
defer c.sendMu.Unlock()
_, errorValue = c.conn.Write(append(data, '\n'))
return errorValue
}
func (c *StratumClient) readLoop() {
reader := bufio.NewReader(c.conn)
for {
line, errorValue := reader.ReadBytes('\n')
if errorValue != nil {
if c.listener != nil {
c.listener.OnDisconnect()
}
return
}
response := jsonRPCResponse{}
if errorValue = json.Unmarshal(line, &response); errorValue != nil {
continue
}
c.handleMessage(response)
}
}
// handleMessage routes one decoded pool message.
//
// Messages carrying a non-empty Result are responses. A result that
// decodes with a non-empty "id" string is treated as the login reply
// (session ID plus initial job); everything else is assumed to be a
// submit result and forwarded to OnResultAccepted.
//
// NOTE(review): the login detection is a heuristic — any response whose
// result happens to contain a non-empty "id" field would match it.
// Confirm the pool protocol cannot produce such a submit response.
// Messages with Method == "job" are server-pushed job notifications.
func (c *StratumClient) handleMessage(response jsonRPCResponse) {
	if len(response.Result) > 0 {
		// Try to decode as the login result: {"id": ..., "job": {...}}.
		var loginResult struct {
			ID  string    `json:"id"`
			Job proxy.Job `json:"job"`
		}
		if json.Unmarshal(response.Result, &loginResult) == nil && loginResult.ID != "" {
			c.sessionID = loginResult.ID
			if loginResult.Job.IsValid() {
				c.active = true
				if c.listener != nil {
					c.listener.OnJob(loginResult.Job)
				}
			}
			return
		}
		// Otherwise a submit response: error == nil means accepted.
		if c.listener != nil {
			accepted := response.Error == nil
			errorMessage := ""
			if response.Error != nil {
				errorMessage = response.Error.Message
			}
			c.listener.OnResultAccepted(response.ID, accepted, errorMessage)
		}
		return
	}
	if response.Method == "job" {
		// Server-push notification carrying a new job in Params.
		var payload proxy.Job
		if json.Unmarshal(response.Params, &payload) == nil && payload.IsValid() {
			c.active = true
			if c.listener != nil {
				c.listener.OnJob(payload)
			}
		}
	}
}

View file

@ -2,6 +2,7 @@ package pool
import (
"sync"
"time"
"dappco.re/go/core/proxy"
)
@ -35,3 +36,95 @@ type Strategy interface {
Disconnect()
IsActive() bool
}
// NewStrategyFactory captures the pool list and retry settings for later mapper creation.
//
// factory := pool.NewStrategyFactory(cfg)
func NewStrategyFactory(cfg *proxy.Config) StrategyFactory {
return func(listener StratumListener) Strategy {
if cfg == nil {
return NewFailoverStrategy(nil, listener, nil)
}
return NewFailoverStrategy(cfg.Pools, listener, cfg)
}
}
// NewFailoverStrategy stores the pool list and listener.
//
// strategy := pool.NewFailoverStrategy(cfg.Pools, listener, cfg)
func NewFailoverStrategy(pools []proxy.PoolConfig, listener StratumListener, cfg *proxy.Config) *FailoverStrategy {
return &FailoverStrategy{
pools: append([]proxy.PoolConfig(nil), pools...),
listener: listener,
cfg: cfg,
}
}
// Connect dials the current pool. On failure, advances to the next pool.
//
//	strategy.Connect()
//
// Tries each enabled pool in order, making up to cfg.Retries passes with
// cfg.RetryPause seconds between passes; the first successful dial is
// kept and a login request is sent immediately.
//
// NOTE(review): the whole retry loop — including time.Sleep — runs while
// holding s.mu, so Submit/Disconnect/IsActive block for the full retry
// window. Confirm callers accept that, or move the sleep outside the lock.
func (s *FailoverStrategy) Connect() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.pools) == 0 {
		return
	}
	// Defaults: a single pass and no pause, unless overridden by config.
	retries := 1
	pause := time.Duration(0)
	if s.cfg != nil {
		if s.cfg.Retries > 0 {
			retries = s.cfg.Retries
		}
		if s.cfg.RetryPause > 0 {
			pause = time.Duration(s.cfg.RetryPause) * time.Second
		}
	}
	for attempt := 0; attempt < retries; attempt++ {
		for index, poolConfig := range s.pools {
			if !poolConfig.Enabled {
				continue
			}
			client := NewStratumClient(poolConfig, s.listener)
			if errorValue := client.Connect(); errorValue == nil {
				s.client = client
				s.current = index
				client.Login()
				return
			}
		}
		// Pause between passes, but not after the final one.
		if pause > 0 && attempt < retries-1 {
			time.Sleep(pause)
		}
	}
}
func (s *FailoverStrategy) Submit(jobID string, nonce string, result string, algo string) int64 {
s.mu.Lock()
client := s.client
s.mu.Unlock()
if client == nil {
return 0
}
return client.Submit(jobID, nonce, result, algo)
}
func (s *FailoverStrategy) Disconnect() {
s.mu.Lock()
client := s.client
s.client = nil
s.mu.Unlock()
if client != nil {
client.Disconnect()
}
}
// IsActive reports whether a pool client exists and has produced a job.
//
// NOTE(review): client.active is written by the client's read goroutine
// without synchronisation, so this read is a data race under -race.
// Confirm whether the flag should become an atomic on StratumClient.
func (s *FailoverStrategy) IsActive() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.client != nil && s.client.active
}

169
proxy_runtime.go Normal file
View file

@ -0,0 +1,169 @@
package proxy
import (
"net"
"sync/atomic"
"time"
)
var proxyMinerCount atomic.Uint64
// New creates and wires all subsystems but does not start the tick loop or TCP listeners.
//
//	p, errorValue := proxy.New(cfg)
//
// Returns an error when the configuration fails validation (Validate
// tolerates a nil receiver, so New(nil) errors instead of panicking).
func New(cfg *Config) (*Proxy, error) {
	if errorValue := cfg.Validate(); errorValue != nil {
		return nil, errorValue
	}
	events := NewEventBus()
	stats := NewStats()
	workers := NewWorkers(cfg.Workers, events)
	proxyValue := &Proxy{
		config:   cfg,
		splitter: noopSplitter{},
		stats:    stats,
		workers:  workers,
		events:   events,
		done:     make(chan struct{}),
	}
	events.Subscribe(EventAccept, stats.OnAccept)
	events.Subscribe(EventReject, stats.OnReject)
	events.Subscribe(EventLogin, func(event Event) {
		stats.connections.Add(1)
		current := proxyMinerCount.Add(1)
		// CAS loop: lift the high-water mark without losing a concurrent update.
		for {
			maximum := stats.maxMiners.Load()
			if current <= maximum || stats.maxMiners.CompareAndSwap(maximum, current) {
				break
			}
		}
	})
	events.Subscribe(EventClose, func(event Event) {
		// Decrement via CAS so two concurrent closes cannot both observe a
		// non-zero count and drive it below zero. The previous
		// load-then-Add(^uint64(0)) pair was not atomic and could
		// underflow the counter to MaxUint64 under contention.
		for {
			current := proxyMinerCount.Load()
			if current == 0 {
				return
			}
			if proxyMinerCount.CompareAndSwap(current, current-1) {
				return
			}
		}
	})
	events.Subscribe(EventLogin, NewCustomDiff(cfg.CustomDiff).OnLogin)
	return proxyValue, nil
}
// Start begins the TCP listener(s), pool connections, and tick loop.
//
//	p.Start()
func (p *Proxy) Start() {
	if p.splitter != nil {
		p.splitter.Connect()
	}
	p.ticker = time.NewTicker(time.Second)
	for _, bind := range p.config.Bind {
		server, listenErr := NewServer(bind, nil, NewRateLimiter(p.config.RateLimit), p.acceptConn)
		if listenErr != nil {
			// Addresses that fail to bind are skipped; the rest still serve.
			continue
		}
		p.servers = append(p.servers, server)
		server.Start()
	}
	go p.tickLoop()
}

// tickLoop drives per-second maintenance of stats, workers, and the
// splitter until Stop closes p.done.
func (p *Proxy) tickLoop() {
	var ticks uint64
	for {
		select {
		case <-p.done:
			return
		case <-p.ticker.C:
			ticks++
			p.stats.Tick()
			p.workers.Tick()
			if p.splitter != nil {
				p.splitter.Tick(ticks)
			}
		}
	}
}
type noopSplitter struct{}
func (noopSplitter) Connect() {}
func (noopSplitter) OnLogin(event *LoginEvent) {}
func (noopSplitter) OnSubmit(event *SubmitEvent) {}
func (noopSplitter) OnClose(event *CloseEvent) {}
func (noopSplitter) Tick(ticks uint64) {}
func (noopSplitter) GC() {}
func (noopSplitter) Upstreams() UpstreamStats { return UpstreamStats{} }
// Stop shuts down all subsystems cleanly.
//
//	p.Stop()
//
// NOTE(review): the select-then-close on p.done is not atomic; two
// concurrent Stop calls can both take the default branch and the second
// close(p.done) would panic. Confirm Stop is single-caller, or guard the
// close with sync.Once.
func (p *Proxy) Stop() {
	if p.ticker != nil {
		p.ticker.Stop()
	}
	for _, server := range p.servers {
		server.Stop()
	}
	if p.watcher != nil {
		p.watcher.Stop()
	}
	// Signal the tick goroutine to exit, unless already signalled.
	select {
	case <-p.done:
	default:
		close(p.done)
	}
}
// Reload replaces the live config. A nil config is ignored.
//
// p.Reload(newCfg)
//
// NOTE(review): p.config is swapped without synchronisation — confirm callers
// only Reload from the same goroutine that reads it.
func (p *Proxy) Reload(cfg *Config) {
	if cfg == nil {
		return
	}
	p.config = cfg
}
// Summary returns a point-in-time snapshot of share statistics. A nil proxy
// or absent stats subsystem yields the zero summary, matching the nil-safety
// of Mode, WorkersMode, and Upstreams.
func (p *Proxy) Summary() StatsSummary {
	if p == nil || p.stats == nil {
		return StatsSummary{}
	}
	return p.stats.Summary()
}
// Workers returns a snapshot of all tracked worker records. Nil-safe for
// consistency with Mode and Upstreams; returns nil when untracked.
func (p *Proxy) Workers() []WorkerRecord {
	if p == nil || p.workers == nil {
		return nil
	}
	return p.workers.List()
}
// CurrentMiners returns the number of currently connected miners, read from
// the package-level proxyMinerCount counter (the receiver is unused).
func (p *Proxy) CurrentMiners() uint64 {
	return proxyMinerCount.Load()
}
// MaxMiners returns the high-water mark of concurrent miners. Nil-safe for
// consistency with Mode and Upstreams; returns 0 when stats are absent.
func (p *Proxy) MaxMiners() uint64 {
	if p == nil || p.stats == nil {
		return 0
	}
	return p.stats.maxMiners.Load()
}
// Mode reports the configured proxy mode, or "" when unconfigured.
func (p *Proxy) Mode() string {
	if p != nil && p.config != nil {
		return p.config.Mode
	}
	return ""
}
// WorkersMode reports the configured worker-grouping mode, or "" when
// unconfigured.
func (p *Proxy) WorkersMode() string {
	if p != nil && p.config != nil {
		return string(p.config.Workers)
	}
	return ""
}
// Upstreams returns the splitter's upstream connection stats, or the zero
// value when no splitter is installed.
func (p *Proxy) Upstreams() UpstreamStats {
	if p != nil && p.splitter != nil {
		return p.splitter.Upstreams()
	}
	return UpstreamStats{}
}
// acceptConn is the Server accept callback: it wraps the raw connection in a
// Miner and announces it on the event bus.
// NOTE(review): EventLogin is dispatched immediately on accept, before any
// protocol handshake — confirm this is intended rather than deferred to the
// miner's actual login message.
func (p *Proxy) acceptConn(conn net.Conn, localPort uint16) {
	miner := NewMiner(conn, localPort, nil)
	p.events.Dispatch(Event{Type: EventLogin, Miner: miner})
}

136
runtime_support.go Normal file
View file

@ -0,0 +1,136 @@
package proxy
import (
"strconv"
"strings"
"time"
)
// NewRateLimiter allocates a per-IP token bucket limiter.
//
// rl := proxy.NewRateLimiter(cfg.RateLimit)
func NewRateLimiter(config RateLimit) *RateLimiter {
	limiter := &RateLimiter{cfg: config}
	limiter.buckets = make(map[string]*tokenBucket)
	limiter.banned = make(map[string]time.Time)
	return limiter
}
// Allow returns true if the IP address is permitted to open a new connection. Thread-safe.
//
// if rl.Allow(conn.RemoteAddr().String()) { proceed() }
func (rl *RateLimiter) Allow(ip string) bool {
	// Disabled limiter admits everything.
	if rl == nil || rl.cfg.MaxConnectionsPerMinute <= 0 {
		return true
	}
	host := remoteHost(ip)
	now := time.Now().UTC()
	rl.mu.Lock()
	defer rl.mu.Unlock()
	// An active ban wins; an expired one is cleaned up and ignored.
	if until, isBanned := rl.banned[host]; isBanned {
		if until.After(now) {
			return false
		}
		delete(rl.banned, host)
	}
	bucket := rl.buckets[host]
	if bucket == nil {
		// New hosts start with a full bucket.
		bucket = &tokenBucket{tokens: rl.cfg.MaxConnectionsPerMinute, lastRefill: now}
		rl.buckets[host] = bucket
	}
	rl.refillBucket(bucket, now)
	if bucket.tokens > 0 {
		bucket.tokens--
		return true
	}
	// Out of tokens: optionally ban the host for the configured duration.
	if rl.cfg.BanDurationSeconds > 0 {
		rl.banned[host] = now.Add(time.Duration(rl.cfg.BanDurationSeconds) * time.Second)
	}
	return false
}
// Tick removes expired ban entries, refills all token buckets, and evicts
// buckets that have refilled to full capacity. Called every second.
//
// rl.Tick()
func (rl *RateLimiter) Tick() {
	if rl == nil || rl.cfg.MaxConnectionsPerMinute <= 0 {
		return
	}
	now := time.Now().UTC()
	rl.mu.Lock()
	defer rl.mu.Unlock()
	for host, bannedUntil := range rl.banned {
		if !bannedUntil.After(now) {
			delete(rl.banned, host)
		}
	}
	for host, bucket := range rl.buckets {
		rl.refillBucket(bucket, now)
		// A full bucket behaves identically to no bucket at all (Allow
		// creates missing buckets at full capacity), so drop it. Without
		// this the map grows without bound across distinct client IPs.
		if bucket.tokens >= rl.cfg.MaxConnectionsPerMinute {
			delete(rl.buckets, host)
		}
	}
}
// refillBucket credits a bucket with one token per refill interval elapsed
// since its last refill, capped at the configured per-minute maximum.
// Caller must hold rl.mu.
func (rl *RateLimiter) refillBucket(bucket *tokenBucket, now time.Time) {
	if bucket == nil || rl.cfg.MaxConnectionsPerMinute <= 0 {
		return
	}
	// One token accrues every (minute / max) — i.e. max tokens per minute.
	refillEvery := time.Minute / time.Duration(rl.cfg.MaxConnectionsPerMinute)
	if refillEvery <= 0 {
		refillEvery = time.Second
	}
	elapsed := now.Sub(bucket.lastRefill)
	if elapsed < refillEvery {
		return
	}
	tokensToAdd := int(elapsed / refillEvery)
	bucket.tokens += tokensToAdd
	if bucket.tokens > rl.cfg.MaxConnectionsPerMinute {
		bucket.tokens = rl.cfg.MaxConnectionsPerMinute
	}
	// Advance by whole intervals (not to `now`) so fractional remainders
	// keep accruing toward the next token.
	bucket.lastRefill = bucket.lastRefill.Add(time.Duration(tokensToAdd) * refillEvery)
}
// NewCustomDiff stores the default custom difficulty override. A globalDiff
// of 0 disables the fallback that OnLogin applies.
//
// cd := proxy.NewCustomDiff(50000)
func NewCustomDiff(globalDiff uint64) *CustomDiff {
	return &CustomDiff{globalDiff: globalDiff}
}
// OnLogin parses miner.User for a "+{number}" suffix and sets miner.CustomDiff.
//
// cd.OnLogin(proxy.Event{Miner: miner})
func (cd *CustomDiff) OnLogin(event Event) {
	miner := event.Miner
	if miner == nil {
		return
	}
	user := miner.User()
	// A "+N" suffix after a non-empty user overrides difficulty per-miner;
	// index 0 (user begins with "+") is deliberately not treated as a suffix.
	if plus := strings.LastIndex(user, "+"); plus > 0 {
		if diff, parseErr := strconv.ParseUint(user[plus+1:], 10, 64); parseErr == nil {
			miner.SetUser(user[:plus])
			miner.SetCustomDiff(diff)
			return
		}
	}
	// No valid suffix: fall back to the global override if configured.
	if cd != nil && cd.globalDiff > 0 {
		miner.SetCustomDiff(cd.globalDiff)
	}
}

57
runtime_support_test.go Normal file
View file

@ -0,0 +1,57 @@
package proxy
import (
"testing"
"time"
)
func TestRateLimiter_Allow_Good(t *testing.T) {
	rl := NewRateLimiter(RateLimit{MaxConnectionsPerMinute: 2})
	allowed := rl.Allow("127.0.0.1:1234")
	if !allowed {
		t.Fatal("expected first connection to pass")
	}
}
func TestRateLimiter_Allow_Bad(t *testing.T) {
	rl := NewRateLimiter(RateLimit{MaxConnectionsPerMinute: 1, BanDurationSeconds: 60})
	first := rl.Allow("127.0.0.1:1234")
	second := rl.Allow("127.0.0.1:1234")
	if !first {
		t.Fatal("expected first connection to pass")
	}
	if second {
		t.Fatal("expected second connection to be blocked")
	}
}
// TestRateLimiter_Allow_Ugly verifies that at 1 connection/minute a token
// does NOT come back after only one second. The real one-second sleep is
// deliberate: the refill interval is 60s, so Tick after 1s must not credit
// a token.
func TestRateLimiter_Allow_Ugly(t *testing.T) {
	limiter := NewRateLimiter(RateLimit{MaxConnectionsPerMinute: 1})
	limiter.Allow("127.0.0.1:1234")
	time.Sleep(time.Second)
	limiter.Tick()
	if limiter.Allow("127.0.0.1:1234") {
		t.Fatal("expected bucket not to refill fully after one second at 1/minute")
	}
}
func TestCustomDiff_OnLogin_Good(t *testing.T) {
	m := &Miner{user: "wallet+5000"}
	NewCustomDiff(100).OnLogin(Event{Miner: m})
	if m.User() != "wallet" || m.CustomDiff() != 5000 {
		t.Fatalf("expected parsed custom diff, got user=%q diff=%d", m.User(), m.CustomDiff())
	}
}
func TestCustomDiff_OnLogin_Bad(t *testing.T) {
	m := &Miner{user: "wallet"}
	NewCustomDiff(100).OnLogin(Event{Miner: m})
	got := m.CustomDiff()
	if got != 100 {
		t.Fatalf("expected fallback diff 100, got %d", got)
	}
}
func TestCustomDiff_OnLogin_Ugly(t *testing.T) {
	m := &Miner{user: "wallet+bad"}
	NewCustomDiff(100).OnLogin(Event{Miner: m})
	if m.User() != "wallet+bad" || m.CustomDiff() != 100 {
		t.Fatalf("expected invalid suffix to preserve user and apply global diff, got user=%q diff=%d", m.User(), m.CustomDiff())
	}
}

80
server_runtime.go Normal file
View file

@ -0,0 +1,80 @@
package proxy
import (
"crypto/tls"
"errors"
"net"
"strconv"
)
// NewServer binds one miner-facing TCP listener.
//
// srv, errorValue := proxy.NewServer(bind, nil, limiter, onAccept)
func NewServer(bind BindAddr, tlsCfg *tls.Config, limiter *RateLimiter, onAccept func(net.Conn, uint16)) (*Server, error) {
	endpoint := net.JoinHostPort(bind.Host, strconv.Itoa(int(bind.Port)))
	listener, listenErr := net.Listen("tcp", endpoint)
	if listenErr != nil {
		return nil, listenErr
	}
	server := &Server{
		addr: bind,
		// NOTE(review): tlsCfg is stored but the listener is not wrapped
		// here — confirm TLS is applied elsewhere.
		tlsCfg:   tlsCfg,
		limiter:  limiter,
		onAccept: onAccept,
		listener: listener,
		done:     make(chan struct{}),
	}
	return server, nil
}
// Start begins accepting connections in a goroutine. The loop exits when
// Stop closes the done channel or the listener is permanently closed.
//
// srv.Start()
func (s *Server) Start() {
	if s == nil || s.listener == nil {
		return
	}
	go func() {
		for {
			conn, acceptErr := s.listener.Accept()
			if acceptErr != nil {
				select {
				case <-s.done:
					return
				default:
				}
				// A closed listener returns net.ErrClosed forever;
				// bail out instead of hot-spinning on `continue`.
				if errors.Is(acceptErr, net.ErrClosed) {
					return
				}
				continue
			}
			// Rate-limit by remote address before doing any work.
			if s.limiter != nil && !s.limiter.Allow(conn.RemoteAddr().String()) {
				_ = conn.Close()
				continue
			}
			if s.onAccept == nil {
				_ = conn.Close()
				continue
			}
			s.onAccept(conn, s.addr.Port)
		}
	}()
}
// Stop closes the listener and signals the accept loop to exit. Safe to call
// more than once.
//
// srv.Stop()
func (s *Server) Stop() {
	if s == nil || s.listener == nil {
		return
	}
	// Signal first so the accept goroutine interprets the listener error as
	// an orderly shutdown; close done at most once.
	select {
	case <-s.done:
		// already stopped
	default:
		close(s.done)
	}
	_ = s.listener.Close()
}
// errServerClosed reports an orderly server shutdown.
// NOTE(review): currently unreferenced in this file — confirm it is used by
// callers elsewhere before removing.
var errServerClosed = errors.New("server closed")

View file

@ -15,7 +15,7 @@ import (
type NonceMapper struct {
id int64
storage *NonceStorage
strategy pool.Strategy // manages pool client lifecycle and failover
strategy pool.Strategy // manages pool client lifecycle and failover
pending map[int64]SubmitContext // sequence → {requestID, minerID}
cfg *proxy.Config
active bool // true once pool has sent at least one job
@ -30,3 +30,45 @@ type SubmitContext struct {
RequestID int64 // JSON-RPC id from the miner's submit request
MinerID int64 // miner that submitted
}
// NewNonceMapper creates one upstream pool mapper and its local slot table.
//
// mapper := nicehash.NewNonceMapper(1, cfg, strategy)
func NewNonceMapper(id int64, cfg *proxy.Config, strategy pool.Strategy) *NonceMapper {
	mapper := &NonceMapper{
		id:       id,
		cfg:      cfg,
		strategy: strategy,
	}
	mapper.storage = NewNonceStorage()
	mapper.pending = make(map[int64]SubmitContext)
	return mapper
}
// Add tries to allocate a nonce slot for the miner; false when the table is full.
func (m *NonceMapper) Add(miner *proxy.Miner) bool {
	return m.storage.Add(miner)
}
// Remove releases the miner's nonce slot (marked dead until the next job).
func (m *NonceMapper) Remove(miner *proxy.Miner) {
	m.storage.Remove(miner)
}
// Submit forwards a share to the pool strategy and records the pending
// sequence → (requestID, minerID) mapping for the eventual pool reply.
func (m *NonceMapper) Submit(event *proxy.SubmitEvent) {
	// Guard event.Miner too: it is dereferenced below, and every sibling
	// handler (OnLogin/OnSubmit/OnClose) nil-checks the miner.
	if event == nil || event.Miner == nil || m.strategy == nil {
		return
	}
	sequence := m.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo)
	m.mu.Lock()
	m.pending[sequence] = SubmitContext{
		RequestID: event.RequestID,
		MinerID:   event.Miner.ID(),
	}
	m.mu.Unlock()
}
// IsActive reports whether the underlying pool strategy has a live job.
func (m *NonceMapper) IsActive() bool {
	return m.strategy != nil && m.strategy.IsActive()
}

View file

@ -28,3 +28,135 @@ type NonceSplitter struct {
strategyFactory pool.StrategyFactory
mu sync.RWMutex
}
// NewNonceSplitter creates the NiceHash splitter.
//
// s := nicehash.NewNonceSplitter(cfg, bus, factory)
func NewNonceSplitter(cfg *proxy.Config, events *proxy.EventBus, factory pool.StrategyFactory) *NonceSplitter {
	splitter := &NonceSplitter{
		cfg:             cfg,
		events:          events,
		strategyFactory: factory,
	}
	splitter.mappers = make([]*NonceMapper, 0, 1)
	return splitter
}
// Connect lazily creates the first upstream mapper. Subsequent calls are
// no-ops once a mapper exists.
func (s *NonceSplitter) Connect() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.mappers) > 0 {
		return
	}
	mapper := s.newMapperLocked()
	s.mappers = append(s.mappers, mapper)
	// newMapperLocked returns a nil strategy when no factory is configured;
	// guard the call the same way SimpleSplitter does to avoid a panic.
	if mapper.strategy != nil {
		mapper.strategy.Connect()
	}
}
// OnLogin assigns the miner a nonce slot on an existing mapper, or creates a
// new upstream mapper when all are full.
func (s *NonceSplitter) OnLogin(event *proxy.LoginEvent) {
	if event == nil || event.Miner == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, mapper := range s.mappers {
		if mapper.Add(event.Miner) {
			event.Miner.SetMapperID(mapper.id)
			event.Miner.SetNiceHashEnabled(true)
			return
		}
	}
	// All mappers full (or none yet): spin up a fresh upstream.
	mapper := s.newMapperLocked()
	s.mappers = append(s.mappers, mapper)
	// Guard against a nil strategy (nil factory) to avoid a panic, matching
	// SimpleSplitter's handling.
	if mapper.strategy != nil {
		mapper.strategy.Connect()
	}
	if mapper.Add(event.Miner) {
		event.Miner.SetMapperID(mapper.id)
		event.Miner.SetNiceHashEnabled(true)
	}
}
// OnSubmit routes the share to the mapper the miner was assigned at login.
func (s *NonceSplitter) OnSubmit(event *proxy.SubmitEvent) {
	if event == nil || event.Miner == nil {
		return
	}
	target := event.Miner.MapperID()
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, mapper := range s.mappers {
		if mapper.id != target {
			continue
		}
		mapper.Submit(event)
		return
	}
}
// OnClose releases the miner's slot on its assigned mapper.
func (s *NonceSplitter) OnClose(event *proxy.CloseEvent) {
	if event == nil || event.Miner == nil {
		return
	}
	target := event.Miner.MapperID()
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, mapper := range s.mappers {
		if mapper.id != target {
			continue
		}
		mapper.Remove(event.Miner)
		return
	}
}
// Tick runs garbage collection once per minute of ticks.
func (s *NonceSplitter) Tick(ticks uint64) {
	if ticks%60 != 0 {
		return
	}
	s.GC()
}
// GC disconnects and removes mappers whose slot table is fully free, while
// always keeping at least one mapper alive for the next login.
func (s *NonceSplitter) GC() {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Partition first: the original in-place filter compared against the
	// pre-GC count (len(s.mappers) > 1), so when two or more mappers were
	// ALL empty it disconnected every one of them and left zero behind.
	kept := make([]*NonceMapper, 0, len(s.mappers))
	var empty []*NonceMapper
	for _, mapper := range s.mappers {
		free, dead, active := mapper.storage.SlotCount()
		if active == 0 && dead == 0 && free == 256 {
			empty = append(empty, mapper)
			continue
		}
		kept = append(kept, mapper)
	}
	// Keep one empty mapper when nothing else survives.
	if len(kept) == 0 && len(empty) > 0 {
		kept = append(kept, empty[0])
		empty = empty[1:]
	}
	for _, mapper := range empty {
		if mapper.strategy != nil {
			mapper.strategy.Disconnect()
		}
	}
	s.mappers = kept
}
// Upstreams summarises mapper states: suspended → Error, live job → Active,
// otherwise Sleep.
func (s *NonceSplitter) Upstreams() proxy.UpstreamStats {
	s.mu.RLock()
	defer s.mu.RUnlock()
	stats := proxy.UpstreamStats{Total: uint64(len(s.mappers))}
	for _, mapper := range s.mappers {
		if mapper.suspended > 0 {
			stats.Error++
		} else if mapper.IsActive() {
			stats.Active++
		} else {
			stats.Sleep++
		}
	}
	return stats
}
// newMapperLocked builds a mapper with a unique ID. Caller must hold s.mu.
func (s *NonceSplitter) newMapperLocked() *NonceMapper {
	// len(s.mappers)+1 can collide with a surviving mapper's ID after GC
	// removes one from the middle of the slice; since miners are routed by
	// MapperID, derive the next ID from the live maximum instead.
	nextID := int64(1)
	for _, mapper := range s.mappers {
		if mapper.id >= nextID {
			nextID = mapper.id + 1
		}
	}
	var strategy pool.Strategy
	if s.strategyFactory != nil {
		strategy = s.strategyFactory(nil)
	}
	return NewNonceMapper(nextID, s.cfg, strategy)
}

View file

@ -23,3 +23,120 @@ type NonceStorage struct {
cursor int // search starts here (round-robin allocation)
mu sync.Mutex
}
// NewNonceStorage allocates the fixed-size miner slot table.
//
// storage := nicehash.NewNonceStorage()
func NewNonceStorage() *NonceStorage {
	storage := &NonceStorage{}
	storage.miners = make(map[int64]*proxy.Miner)
	return storage
}
// Add finds the next free slot starting from cursor (wrapping), sets slot[index] = minerID,
// and sets the miner fixed byte.
//
// ok := storage.Add(miner)
func (s *NonceStorage) Add(miner *proxy.Miner) bool {
	if miner == nil {
		return false
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	total := len(s.slots)
	for step := 0; step < total; step++ {
		slot := (s.cursor + step) % total
		if s.slots[slot] != 0 {
			continue
		}
		// Claim the slot: record the miner and hand it its fixed nonce byte.
		s.slots[slot] = miner.ID()
		s.miners[miner.ID()] = miner
		miner.SetFixedByte(uint8(slot))
		s.cursor = (slot + 1) % total
		return true
	}
	// Table full.
	return false
}
// Remove marks slot[miner.FixedByte] as a dead slot until the next SetJob call.
//
// storage.Remove(miner)
func (s *NonceStorage) Remove(miner *proxy.Miner) {
	if miner == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	slot := int(miner.FixedByte())
	// Only negate the slot if it still belongs to this miner.
	if slot >= 0 && slot < len(s.slots) && s.slots[slot] == miner.ID() {
		s.slots[slot] = -miner.ID()
	}
	delete(s.miners, miner.ID())
}
// SetJob replaces the current job, clears dead slots, and fans the job out to active miners.
//
// storage.SetJob(job)
func (s *NonceStorage) SetJob(job proxy.Job) {
	s.mu.Lock()
	// Keep the outgoing job as prevJob only when the new job comes from the
	// same upstream client (or the table is fresh); a client switch
	// invalidates the previous job for IsValidJobID.
	if s.job.ClientID == job.ClientID || s.job.ClientID == "" {
		s.prevJob = s.job
	} else {
		s.prevJob = proxy.Job{}
	}
	s.job = job
	// Collect live miners under the lock; dead slots (negative IDs) are
	// reclaimed here, on job rotation.
	miners := make([]*proxy.Miner, 0, len(s.miners))
	for index, minerID := range s.slots {
		if minerID < 0 {
			s.slots[index] = 0
			continue
		}
		if minerID > 0 {
			if miner := s.miners[minerID]; miner != nil {
				miners = append(miners, miner)
			}
		}
	}
	s.mu.Unlock()
	// Fan out AFTER releasing the lock so a slow miner write cannot block
	// slot allocation or submits.
	for _, miner := range miners {
		miner.ForwardJob(job, job.Algo)
	}
}
// IsValidJobID returns true if id matches the current or previous job ID.
//
// if !storage.IsValidJobID(submitJobID) { reject }
func (s *NonceStorage) IsValidJobID(id string) bool {
	if id == "" {
		return false
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	return id == s.job.JobID || id == s.prevJob.JobID
}
// SlotCount returns free, dead, and active slot counts for monitoring output.
//
// free, dead, active := storage.SlotCount()
func (s *NonceStorage) SlotCount() (free int, dead int, active int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, minerID := range s.slots {
		if minerID == 0 {
			free++
		} else if minerID < 0 {
			dead++
		} else {
			active++
		}
	}
	return free, dead, active
}

View file

@ -0,0 +1,61 @@
package nicehash
import (
"testing"
"dappco.re/go/core/proxy"
)
func TestNonceStorage_Add_Good(t *testing.T) {
	store := NewNonceStorage()
	m := proxy.NewMiner(nil, 0, nil)
	m.SetUser("wallet")
	if !store.Add(m) {
		t.Fatal("expected slot allocation to succeed")
	}
}
func TestNonceStorage_Add_Bad(t *testing.T) {
	store := NewNonceStorage()
	ok := store.Add(nil)
	if ok {
		t.Fatal("expected nil miner allocation to fail")
	}
}
func TestNonceStorage_Add_Ugly(t *testing.T) {
	store := NewNonceStorage()
	for slot := 0; slot < 256; slot++ {
		if !store.Add(proxy.NewMiner(nil, 0, nil)) {
			t.Fatalf("expected miner %d to fit", slot)
		}
	}
	if store.Add(proxy.NewMiner(nil, 0, nil)) {
		t.Fatal("expected 257th miner to fail")
	}
}
func TestNonceStorage_IsValidJobID_Good(t *testing.T) {
	store := NewNonceStorage()
	store.SetJob(proxy.Job{Blob: "abcd", JobID: "job-1"})
	valid := store.IsValidJobID("job-1")
	if !valid {
		t.Fatal("expected current job ID to be valid")
	}
}
func TestNonceStorage_IsValidJobID_Bad(t *testing.T) {
	store := NewNonceStorage()
	store.SetJob(proxy.Job{Blob: "abcd", JobID: "job-1"})
	valid := store.IsValidJobID("job-2")
	if valid {
		t.Fatal("expected unknown job ID to be invalid")
	}
}
func TestNonceStorage_IsValidJobID_Ugly(t *testing.T) {
	store := NewNonceStorage()
	first := proxy.Job{Blob: "abcd", JobID: "job-1", ClientID: "pool-a"}
	second := proxy.Job{Blob: "efgh", JobID: "job-2", ClientID: "pool-a"}
	store.SetJob(first)
	store.SetJob(second)
	if !store.IsValidJobID("job-1") {
		t.Fatal("expected previous job ID from same client to remain valid")
	}
}

View file

@ -19,3 +19,10 @@ type SimpleMapper struct {
idleAt time.Time // zero when active
stopped bool
}
// NewSimpleMapper stores the mapper ID and strategy. The zero idleAt marks
// the mapper as active; miner is attached later by the splitter.
//
// mapper := simple.NewSimpleMapper(1, strategy)
func NewSimpleMapper(id int64, strategy pool.Strategy) *SimpleMapper {
	return &SimpleMapper{id: id, strategy: strategy}
}

View file

@ -9,6 +9,7 @@ package simple
import (
"sync"
"time"
"dappco.re/go/core/proxy"
"dappco.re/go/core/proxy/pool"
@ -26,3 +27,138 @@ type SimpleSplitter struct {
mu sync.Mutex
seq int64 // monotonic mapper sequence counter
}
// NewSimpleSplitter creates the passthrough splitter.
//
// s := simple.NewSimpleSplitter(cfg, bus, factory)
func NewSimpleSplitter(cfg *proxy.Config, events *proxy.EventBus, factory pool.StrategyFactory) *SimpleSplitter {
	splitter := &SimpleSplitter{
		cfg:     cfg,
		events:  events,
		factory: factory,
	}
	splitter.active = make(map[int64]*SimpleMapper)
	splitter.idle = make(map[int64]*SimpleMapper)
	return splitter
}
// Connect is a no-op: simple-mode upstreams are created lazily per login.
func (s *SimpleSplitter) Connect() {}
// OnLogin binds the miner to a reused idle mapper, or creates and connects a
// fresh one when no idle mapper is available.
func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) {
	if event == nil || event.Miner == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	mapper := s.takeIdleLocked()
	if mapper == nil {
		s.seq++
		var strategy pool.Strategy
		if s.factory != nil {
			strategy = s.factory(nil)
		}
		mapper = NewSimpleMapper(s.seq, strategy)
		if mapper.strategy != nil {
			mapper.strategy.Connect()
		}
	}
	mapper.miner = event.Miner
	mapper.idleAt = time.Time{}
	event.Miner.SetRouteID(mapper.id)
	s.active[event.Miner.ID()] = mapper
}

// takeIdleLocked pops an arbitrary idle mapper, or nil when none exist.
// Caller must hold s.mu.
func (s *SimpleSplitter) takeIdleLocked() *SimpleMapper {
	for mapperID, mapper := range s.idle {
		delete(s.idle, mapperID)
		return mapper
	}
	return nil
}
// OnSubmit forwards a share to the miner's bound upstream strategy.
func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) {
	if event == nil || event.Miner == nil {
		return
	}
	s.mu.Lock()
	mapper := s.active[event.Miner.ID()]
	s.mu.Unlock()
	// Submit outside the lock; the mapper's strategy does its own blocking I/O.
	if mapper != nil && mapper.strategy != nil {
		mapper.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo)
	}
}
// OnClose detaches the miner's mapper: parked for reuse when ReuseTimeout is
// configured, otherwise stopped and disconnected immediately.
func (s *SimpleSplitter) OnClose(event *proxy.CloseEvent) {
	if event == nil || event.Miner == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	mapper, bound := s.active[event.Miner.ID()]
	if !bound || mapper == nil {
		return
	}
	delete(s.active, event.Miner.ID())
	mapper.miner = nil
	mapper.idleAt = time.Now().UTC()
	if s.cfg != nil && s.cfg.ReuseTimeout > 0 {
		// Keep the upstream warm for a future login.
		s.idle[mapper.id] = mapper
		return
	}
	mapper.stopped = true
	if mapper.strategy != nil {
		mapper.strategy.Disconnect()
	}
}
// Tick runs garbage collection once per minute of ticks.
func (s *SimpleSplitter) Tick(ticks uint64) {
	if ticks%60 != 0 {
		return
	}
	s.GC()
}
// GC disconnects idle mappers that are stopped, have outlived ReuseTimeout,
// or belong to a config with reuse disabled.
func (s *SimpleSplitter) GC() {
	s.mu.Lock()
	defer s.mu.Unlock()
	var timeout time.Duration
	if s.cfg != nil && s.cfg.ReuseTimeout > 0 {
		timeout = time.Duration(s.cfg.ReuseTimeout) * time.Second
	}
	now := time.Now().UTC()
	for mapperID, mapper := range s.idle {
		expired := !mapper.idleAt.IsZero() && now.Sub(mapper.idleAt) > timeout
		if timeout != 0 && !expired && !mapper.stopped {
			continue
		}
		if mapper.strategy != nil {
			mapper.strategy.Disconnect()
		}
		delete(s.idle, mapperID)
	}
}
// Upstreams summarises upstream state: idle mappers count as Sleep; active
// mappers with a live strategy count as Active, otherwise Error.
func (s *SimpleSplitter) Upstreams() proxy.UpstreamStats {
	s.mu.Lock()
	defer s.mu.Unlock()
	idleCount := uint64(len(s.idle))
	stats := proxy.UpstreamStats{
		Sleep: idleCount,
		Total: idleCount,
	}
	for _, mapper := range s.active {
		stats.Total++
		live := mapper.strategy != nil && mapper.strategy.IsActive()
		if live {
			stats.Active++
		} else {
			stats.Error++
		}
	}
	return stats
}

164
stats.go
View file

@ -1,6 +1,8 @@
package proxy
import (
"slices"
"sort"
"sync"
"sync/atomic"
"time"
@ -53,8 +55,164 @@ type StatsSummary struct {
Invalid uint64 `json:"invalid"`
Expired uint64 `json:"expired"`
Hashes uint64 `json:"hashes_total"`
AvgTime uint32 `json:"avg_time"` // seconds per accepted share
AvgLatency uint32 `json:"latency"` // median pool response latency in ms
Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants)
AvgTime uint32 `json:"avg_time"` // seconds per accepted share
AvgLatency uint32 `json:"latency"` // median pool response latency in ms
Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants)
TopDiff [10]uint64 `json:"best"`
}
// hashrateWindowSizes are the rolling-window lengths in seconds: 1m, 10m,
// 1h, 12h, 24h — indexed by the HashrateWindow* constants.
var hashrateWindowSizes = [5]int{60, 600, 3600, 43200, 86400}
// NewStats allocates the rolling windows and initialises the clock anchor.
//
// s := proxy.NewStats()
func NewStats() *Stats {
	s := &Stats{
		startTime: time.Now().UTC(),
		latency:   make([]uint16, 0, 128),
	}
	for index := range hashrateWindowSizes {
		size := hashrateWindowSizes[index]
		s.windows[index] = tickWindow{buckets: make([]uint64, size), size: size}
	}
	return s
}
// OnAccept records an accepted share. Adds diff to the current second's bucket in all windows.
//
// stats.OnAccept(proxy.Event{Diff: 100000, Latency: 82})
func (s *Stats) OnAccept(event Event) {
	s.accepted.Add(1)
	s.hashes.Add(event.Diff)
	if event.Expired {
		s.expired.Add(1)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	for index := range s.windows[:HashrateWindowAll] {
		window := &s.windows[index]
		window.buckets[window.pos] += event.Diff
	}
	insertTopDiff(&s.topDiff, event.Diff)
	if event.Latency > 0 {
		s.latency = appendCappedLatency(s.latency, event.Latency)
	}
}
// OnReject records a rejected share. If e.Error indicates low diff or malformed, increments invalid.
//
// stats.OnReject(proxy.Event{Error: "Low difficulty share"})
func (s *Stats) OnReject(event Event) {
	s.rejected.Add(1)
	if isInvalidShareError(event.Error) {
		s.invalid.Add(1)
	}
	if event.Expired {
		s.expired.Add(1)
	}
	if event.Latency == 0 {
		return
	}
	s.mu.Lock()
	s.latency = appendCappedLatency(s.latency, event.Latency)
	s.mu.Unlock()
}
// Tick advances all rolling windows by one second bucket. Called by the proxy tick loop.
//
// stats.Tick()
func (s *Stats) Tick() {
	s.mu.Lock()
	defer s.mu.Unlock()
	for index := range s.windows[:HashrateWindowAll] {
		window := &s.windows[index]
		window.pos = (window.pos + 1) % window.size
		window.buckets[window.pos] = 0
	}
}
// Summary returns a point-in-time snapshot of all stats fields for API serialisation.
//
// summary := stats.Summary()
func (s *Stats) Summary() StatsSummary {
	s.mu.Lock()
	defer s.mu.Unlock()
	summary := StatsSummary{
		Accepted: s.accepted.Load(),
		Rejected: s.rejected.Load(),
		Invalid:  s.invalid.Load(),
		Expired:  s.expired.Load(),
		Hashes:   s.hashes.Load(),
		TopDiff:  s.topDiff,
	}
	for index := 0; index < HashrateWindowAll; index++ {
		divisor := float64(hashrateWindowSizes[index])
		summary.Hashrate[index] = float64(sumBuckets(s.windows[index].buckets)) / divisor
	}
	uptime := uint64(time.Since(s.startTime).Seconds())
	if uptime > 0 {
		// Lifetime hashrate and average seconds per accepted share.
		summary.Hashrate[HashrateWindowAll] = float64(summary.Hashes) / float64(uptime)
		if summary.Accepted > 0 {
			summary.AvgTime = uint32(uptime / summary.Accepted)
		}
	}
	if len(s.latency) > 0 {
		// Median of a sorted copy; s.latency itself stays insertion-ordered.
		sorted := slices.Clone(s.latency)
		sort.Slice(sorted, func(a int, b int) bool {
			return sorted[a] < sorted[b]
		})
		summary.AvgLatency = uint32(sorted[len(sorted)/2])
	}
	return summary
}
// appendCappedLatency appends latency to latencies, keeping at most
// maxLatencySamples entries by shifting out the oldest sample once the cap
// is reached. The backing array is reused at the cap (no allocation).
func appendCappedLatency(latencies []uint16, latency uint16) []uint16 {
	// Bounds the sample buffer and therefore the cost of Summary's median sort.
	const maxLatencySamples = 10000
	// `>=` (rather than the original `==`) also contains a buffer that has
	// somehow grown past the cap instead of letting it grow further.
	if len(latencies) >= maxLatencySamples {
		copy(latencies, latencies[1:])
		latencies[len(latencies)-1] = latency
		return latencies
	}
	return append(latencies, latency)
}
// insertTopDiff inserts difficulty into the descending top-10 list, shifting
// lower entries down. Zero difficulties are ignored; values below the
// current minimum fall off the end.
func insertTopDiff(topDiff *[10]uint64, difficulty uint64) {
	if difficulty == 0 {
		return
	}
	for index := range topDiff {
		if topDiff[index] >= difficulty {
			continue
		}
		// Shift the tail down one position and claim this slot.
		copy(topDiff[index+1:], topDiff[index:len(topDiff)-1])
		topDiff[index] = difficulty
		return
	}
}
// isInvalidShareError reports whether a pool rejection message counts as an
// invalid (malformed/low-diff) share rather than a merely stale one.
func isInvalidShareError(message string) bool {
	invalidMessages := [...]string{
		"Low difficulty share",
		"Invalid nonce",
		"Malformed share",
		"Invalid result",
	}
	for _, known := range invalidMessages {
		if message == known {
			return true
		}
	}
	return false
}
// sumBuckets returns the sum of all bucket values (0 for a nil/empty slice).
func sumBuckets(values []uint64) uint64 {
	total := uint64(0)
	for index := range values {
		total += values[index]
	}
	return total
}

94
stats_workers_test.go Normal file
View file

@ -0,0 +1,94 @@
package proxy
import "testing"
func TestEventBus_Dispatch_Good(t *testing.T) {
	eventBus := NewEventBus()
	var sawMiner bool
	eventBus.Subscribe(EventLogin, func(event Event) {
		sawMiner = event.Miner != nil
	})
	eventBus.Dispatch(Event{Type: EventLogin, Miner: &Miner{}})
	if !sawMiner {
		t.Fatal("expected handler to be called")
	}
}
// TestEventBus_Dispatch_Bad verifies that subscribing a nil handler and then
// dispatching does not panic (the test passes by completing).
func TestEventBus_Dispatch_Bad(t *testing.T) {
	bus := NewEventBus()
	bus.Subscribe(EventLogin, nil)
	bus.Dispatch(Event{Type: EventLogin})
}
func TestEventBus_Dispatch_Ugly(t *testing.T) {
	eventBus := NewEventBus()
	calls := 0
	increment := func(event Event) { calls++ }
	eventBus.Subscribe(EventLogin, increment)
	eventBus.Subscribe(EventLogin, increment)
	eventBus.Dispatch(Event{Type: EventLogin})
	if calls != 2 {
		t.Fatalf("expected both handlers to run, got %d", calls)
	}
}
func TestStats_Summary_Good(t *testing.T) {
	s := NewStats()
	s.OnAccept(Event{Diff: 120, Latency: 80})
	snapshot := s.Summary()
	if snapshot.Accepted != 1 || snapshot.Hashes != 120 {
		t.Fatalf("unexpected summary: %+v", snapshot)
	}
}
func TestStats_Summary_Bad(t *testing.T) {
	s := NewStats()
	s.OnReject(Event{Error: "Low difficulty share"})
	snapshot := s.Summary()
	if snapshot.Rejected != 1 || snapshot.Invalid != 1 {
		t.Fatalf("unexpected summary: %+v", snapshot)
	}
}
func TestStats_Summary_Ugly(t *testing.T) {
	s := NewStats()
	s.OnAccept(Event{Diff: 100, Latency: 10})
	s.Tick()
	s.OnAccept(Event{Diff: 200, Latency: 20})
	snapshot := s.Summary()
	if snapshot.TopDiff[0] != 200 || snapshot.TopDiff[1] != 100 {
		t.Fatalf("unexpected best shares: %+v", snapshot.TopDiff)
	}
}
func TestWorkers_List_Good(t *testing.T) {
	eventBus := NewEventBus()
	tracked := NewWorkers(WorkersByRigID, eventBus)
	m := &Miner{id: 1, user: "wallet", rigID: "rig-a", ip: "10.0.0.1"}
	eventBus.Dispatch(Event{Type: EventLogin, Miner: m})
	eventBus.Dispatch(Event{Type: EventAccept, Miner: m, Diff: 600})
	list := tracked.List()
	if len(list) != 1 || list[0].Name != "rig-a" || list[0].Accepted != 1 {
		t.Fatalf("unexpected worker records: %+v", list)
	}
}
func TestWorkers_List_Bad(t *testing.T) {
	eventBus := NewEventBus()
	tracked := NewWorkers(WorkersDisabled, eventBus)
	eventBus.Dispatch(Event{Type: EventLogin, Miner: &Miner{id: 1, user: "wallet"}})
	list := tracked.List()
	if len(list) != 0 {
		t.Fatal("expected no worker records when disabled")
	}
}
func TestWorkers_List_Ugly(t *testing.T) {
	eventBus := NewEventBus()
	tracked := NewWorkers(WorkersByRigID, eventBus)
	m := &Miner{id: 1, user: "wallet", ip: "10.0.0.1"}
	eventBus.Dispatch(Event{Type: EventLogin, Miner: m})
	eventBus.Dispatch(Event{Type: EventReject, Miner: m, Error: "Low difficulty share"})
	list := tracked.List()
	if len(list) != 1 || list[0].Name != "wallet" || list[0].Invalid != 1 {
		t.Fatalf("unexpected worker records: %+v", list)
	}
}

181
worker.go
View file

@ -11,9 +11,9 @@ import (
// w := proxy.NewWorkers(proxy.WorkersByRigID, bus)
type Workers struct {
mode WorkersMode
entries []WorkerRecord // ordered by first-seen (stable)
nameIndex map[string]int // workerName → entries index
idIndex map[int64]int // minerID → entries index
entries []WorkerRecord // ordered by first-seen (stable)
nameIndex map[string]int // workerName → entries index
idIndex map[int64]int // minerID → entries index
mu sync.RWMutex
}
@ -27,7 +27,180 @@ type WorkerRecord struct {
Accepted uint64
Rejected uint64
Invalid uint64
Hashes uint64 // sum of accepted share difficulties
Hashes uint64 // sum of accepted share difficulties
LastHashAt time.Time
windows [5]tickWindow // 60s, 600s, 3600s, 12h, 24h
}
// Hashrate returns the H/s for a given window (seconds: 60, 600, 3600, 43200, 86400).
// Unknown window lengths yield 0.
//
// hr60 := record.Hashrate(60)
func (r *WorkerRecord) Hashrate(seconds int) float64 {
	for index := range hashrateWindowSizes {
		if hashrateWindowSizes[index] != seconds {
			continue
		}
		return float64(sumBuckets(r.windows[index].buckets)) / float64(seconds)
	}
	return 0
}
// NewWorkers creates the worker aggregate and subscribes it to the event bus.
//
// w := proxy.NewWorkers(proxy.WorkersByRigID, bus)
func NewWorkers(mode WorkersMode, bus *EventBus) *Workers {
	w := &Workers{mode: mode}
	w.entries = make([]WorkerRecord, 0)
	w.nameIndex = make(map[string]int)
	w.idIndex = make(map[int64]int)
	if bus == nil {
		return w
	}
	bus.Subscribe(EventLogin, w.onLogin)
	bus.Subscribe(EventAccept, w.onAccept)
	bus.Subscribe(EventReject, w.onReject)
	bus.Subscribe(EventClose, w.onClose)
	return w
}
// List returns a snapshot of all worker records in first-seen order.
//
// records := workers.List()
func (w *Workers) List() []WorkerRecord {
	w.mu.RLock()
	defer w.mu.RUnlock()
	snapshot := make([]WorkerRecord, len(w.entries))
	copy(snapshot, w.entries)
	return snapshot
}
// Tick advances all worker hashrate windows. Called by the proxy tick loop every second.
// Windows with zero size are lazily initialised here as a safety net for
// records created without onLogin's window setup.
//
// workers.Tick()
func (w *Workers) Tick() {
	w.mu.Lock()
	defer w.mu.Unlock()
	for entryIndex := range w.entries {
		for windowIndex, size := range hashrateWindowSizes {
			// Defensive bound: windows is a fixed [5] array matching
			// hashrateWindowSizes; guard anyway in case they diverge.
			if windowIndex >= len(w.entries[entryIndex].windows) {
				break
			}
			window := &w.entries[entryIndex].windows[windowIndex]
			if window.size == 0 {
				// Lazy init: zero size means the window was never allocated.
				window.size = size
				window.buckets = make([]uint64, size)
			}
			window.pos = (window.pos + 1) % window.size
			window.buckets[window.pos] = 0
		}
	}
}
// onLogin creates (or finds) the worker record for the miner's derived name
// and binds the miner ID to it for subsequent share events.
func (w *Workers) onLogin(event Event) {
	if event.Miner == nil || w.mode == WorkersDisabled {
		return
	}
	name := w.workerName(event.Miner)
	if name == "" {
		return
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	index, known := w.nameIndex[name]
	if !known {
		fresh := WorkerRecord{Name: name}
		for windowIndex := range fresh.windows {
			if windowIndex >= len(hashrateWindowSizes) {
				break
			}
			size := hashrateWindowSizes[windowIndex]
			fresh.windows[windowIndex] = tickWindow{buckets: make([]uint64, size), size: size}
		}
		w.entries = append(w.entries, fresh)
		index = len(w.entries) - 1
		w.nameIndex[name] = index
	}
	entry := &w.entries[index]
	entry.LastIP = event.Miner.IP()
	entry.Connections++
	w.idIndex[event.Miner.ID()] = index
}
// onAccept routes an accepted-share event into the worker record.
func (w *Workers) onAccept(event Event) {
	w.updateShare(event, true)
}
// onReject routes a rejected-share event into the worker record.
func (w *Workers) onReject(event Event) {
	w.updateShare(event, false)
}
// onClose drops the miner→record binding; the record itself (and its stats)
// is retained for the API.
func (w *Workers) onClose(event Event) {
	if event.Miner == nil {
		return
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	delete(w.idIndex, event.Miner.ID())
}
// updateShare applies an accepted or rejected share to the worker record
// bound to the event's miner; unknown miners are ignored.
func (w *Workers) updateShare(event Event, accepted bool) {
	if event.Miner == nil || w.mode == WorkersDisabled {
		return
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	index, tracked := w.idIndex[event.Miner.ID()]
	if !tracked {
		return
	}
	record := &w.entries[index]
	if !accepted {
		record.Rejected++
		if isInvalidShareError(event.Error) {
			record.Invalid++
		}
		return
	}
	record.Accepted++
	record.Hashes += event.Diff
	record.LastHashAt = time.Now().UTC()
	for windowIndex := range record.windows {
		window := &record.windows[windowIndex]
		window.buckets[window.pos] += event.Diff
	}
}
// workerName derives the grouping key for a miner from the configured mode.
// Returns "" for WorkersDisabled or an unknown mode (the miner is not tracked).
func (w *Workers) workerName(miner *Miner) string {
	switch w.mode {
	case WorkersByRigID:
		// Fall back to the wallet user when the rig ID is empty.
		if rig := miner.RigID(); rig != "" {
			return rig
		}
		return miner.User()
	case WorkersByUser:
		return miner.User()
	case WorkersByPass:
		return miner.Password()
	case WorkersByAgent:
		return miner.Agent()
	case WorkersByIP:
		return miner.IP()
	}
	return ""
}