[agent/codex:gpt-5.4-mini] Read docs/RFC.md fully. Find features described in the specs... #1

Merged
Virgil merged 1 commit from agent/read-docs-rfc-md-fully--find-features-de into dev 2026-04-04 10:29:13 +00:00
22 changed files with 3116 additions and 66 deletions

View file

@ -9,6 +9,18 @@
// proxyapi.RegisterRoutes(apiRouter, p)
package api
import (
"encoding/json"
"net/http"
"dappco.re/go/core/proxy"
)
// Router matches the standard http.ServeMux registration shape, so callers
// can pass either *http.ServeMux or any compatible mux implementation.
type Router interface {
// HandleFunc registers handler for the given URL pattern.
HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
}
// SummaryResponse is the /1/summary JSON body.
//
// {"version":"1.0.0","mode":"nicehash","hashrate":{"total":[...]}, ...}
@ -61,3 +73,115 @@ type ResultsResponse struct {
HashesTotal uint64 `json:"hashes_total"`
Best [10]uint64 `json:"best"`
}
// RegisterRoutes wires the monitoring endpoints onto the supplied router.
// RegisterRoutes wires the monitoring endpoints onto the supplied router.
// Both arguments must be non-nil; otherwise the call is a no-op.
func RegisterRoutes(r Router, p *proxy.Proxy) {
	if r == nil || p == nil {
		return
	}
	endpoints := []struct {
		pattern string
		payload func(*proxy.Proxy) any
	}{
		{"/1/summary", func(px *proxy.Proxy) any { return summaryResponse(px) }},
		{"/1/workers", workersResponse},
		{"/1/miners", minersResponse},
	}
	for _, ep := range endpoints {
		payload := ep.payload
		r.HandleFunc(ep.pattern, func(w http.ResponseWriter, _ *http.Request) {
			writeJSON(w, payload(p))
		})
	}
}
// summaryResponse assembles the /1/summary body from the proxy's live
// counters: aggregate share stats, miner counts, and upstream health.
func summaryResponse(p *proxy.Proxy) SummaryResponse {
summary := p.Summary()
// now/max mirror xmrig-proxy's "miners" now/max pair; note `max` shadows
// the builtin of the same name within this function.
now, max := p.MinerCount()
upstreams := p.Upstreams()
return SummaryResponse{
Version: "1.0.0",
Mode: p.Mode(),
Hashrate: HashrateResponse{
Total: summary.Hashrate,
},
Miners: MinersCountResponse{
Now: now,
Max: max,
},
// Workers counts distinct worker records, not live connections.
Workers: uint64(len(p.WorkerRecords())),
Upstreams: UpstreamResponse{
Active: upstreams.Active,
Sleep: upstreams.Sleep,
Error: upstreams.Error,
Total: upstreams.Total,
// Ratio is miners-now divided by total upstreams (0 when no upstreams).
Ratio: ratio(now, upstreams.Total),
},
Results: ResultsResponse{
Accepted: summary.Accepted,
Rejected: summary.Rejected,
Invalid: summary.Invalid,
Expired: summary.Expired,
AvgTime: summary.AvgTime,
Latency: summary.AvgLatency,
HashesTotal: summary.Hashes,
Best: summary.TopDiff,
},
}
}
// workersResponse builds the /1/workers payload: the active workers mode
// plus one positional row per worker record (name, last IP, connection and
// share counters, then hashrate windows of 1m/10m/1h/12h/24h).
func workersResponse(p *proxy.Proxy) any {
	records := p.WorkerRecords()
	rows := make([]any, 0, len(records))
	for _, rec := range records {
		row := []any{
			rec.Name,
			rec.LastIP,
			rec.Connections,
			rec.Accepted,
			rec.Rejected,
			rec.Invalid,
			rec.Hashes,
			rec.LastHashAt.Unix(),
			rec.Hashrate(60),
			rec.Hashrate(600),
			rec.Hashrate(3600),
			rec.Hashrate(43200),
			rec.Hashrate(86400),
		}
		rows = append(rows, row)
	}
	payload := map[string]any{"mode": string(p.WorkersMode())}
	payload["workers"] = rows
	return payload
}
// minersResponse builds the /1/miners payload: a "format" legend naming the
// columns, and one positional row per connected miner snapshot.
func minersResponse(p *proxy.Proxy) any {
	columns := []string{"id", "ip", "tx", "rx", "state", "diff", "user", "password", "rig_id", "agent"}
	snapshots := p.MinerSnapshots()
	rows := make([]any, 0, len(snapshots))
	for _, snap := range snapshots {
		row := []any{
			snap.ID,
			snap.IP,
			snap.TX,
			snap.RX,
			snap.State,
			snap.Diff,
			snap.User,
			snap.Password,
			snap.RigID,
			snap.Agent,
		}
		rows = append(rows, row)
	}
	return map[string]any{
		"format": columns,
		"miners": rows,
	}
}
// writeJSON sends payload as a JSON response body. Encoding errors are
// deliberately ignored: the header is already committed and the client may
// simply have gone away.
func writeJSON(w http.ResponseWriter, payload any) {
	w.Header().Set("Content-Type", "application/json")
	enc := json.NewEncoder(w)
	_ = enc.Encode(payload)
}
// ratio returns now divided by total as a float64, or 0 when total is zero
// (avoiding a division by zero).
func ratio(now, total uint64) float64 {
	switch total {
	case 0:
		return 0
	default:
		return float64(now) / float64(total)
	}
}

View file

@ -5,22 +5,22 @@ package proxy
// cfg, result := proxy.LoadConfig("config.json")
// if !result.OK { log.Fatal(result.Error) }
type Config struct {
Mode string `json:"mode"` // "nicehash" or "simple"
Bind []BindAddr `json:"bind"` // listen addresses
Pools []PoolConfig `json:"pools"` // ordered primary + fallbacks
TLS TLSConfig `json:"tls"` // inbound TLS (miner-facing)
HTTP HTTPConfig `json:"http"` // monitoring API
AccessPassword string `json:"access-password"` // "" = no auth required
CustomDiff uint64 `json:"custom-diff"` // 0 = disabled
CustomDiffStats bool `json:"custom-diff-stats"` // report per custom-diff bucket
AlgoExtension bool `json:"algo-ext"` // forward algo field in jobs
Workers WorkersMode `json:"workers"` // "rig-id", "user", "password", "agent", "ip", "false"
AccessLogFile string `json:"access-log-file"` // "" = disabled
ReuseTimeout int `json:"reuse-timeout"` // seconds; simple mode upstream reuse
Retries int `json:"retries"` // pool reconnect attempts
RetryPause int `json:"retry-pause"` // seconds between retries
Watch bool `json:"watch"` // hot-reload on file change
RateLimit RateLimit `json:"rate-limit"` // per-IP connection rate limit
Mode string `json:"mode"` // "nicehash" or "simple"
Bind []BindAddr `json:"bind"` // listen addresses
Pools []PoolConfig `json:"pools"` // ordered primary + fallbacks
TLS TLSConfig `json:"tls"` // inbound TLS (miner-facing)
HTTP HTTPConfig `json:"http"` // monitoring API
AccessPassword string `json:"access-password"` // "" = no auth required
CustomDiff uint64 `json:"custom-diff"` // 0 = disabled
CustomDiffStats bool `json:"custom-diff-stats"` // report per custom-diff bucket
AlgoExtension bool `json:"algo-ext"` // forward algo field in jobs
Workers WorkersMode `json:"workers"` // "rig-id", "user", "password", "agent", "ip", "false"
AccessLogFile string `json:"access-log-file"` // "" = disabled
ReuseTimeout int `json:"reuse-timeout"` // seconds; simple mode upstream reuse
Retries int `json:"retries"` // pool reconnect attempts
RetryPause int `json:"retry-pause"` // seconds between retries
Watch bool `json:"watch"` // hot-reload on file change
RateLimit RateLimit `json:"rate-limit"` // per-IP connection rate limit
}
// BindAddr is one TCP listen endpoint.
@ -81,7 +81,7 @@ type RateLimit struct {
type WorkersMode string
const (
WorkersByRigID WorkersMode = "rig-id" // rigid field, fallback to user
WorkersByRigID WorkersMode = "rig-id" // rigid field, fallback to user
WorkersByUser WorkersMode = "user"
WorkersByPass WorkersMode = "password"
WorkersByAgent WorkersMode = "agent"

353
core_impl.go Normal file
View file

@ -0,0 +1,353 @@
package proxy
import (
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"io"
"math"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
)
// Result is a small success/error carrier used by constructors and loaders.
type Result struct {
// OK is true on success; Error is nil in that case.
OK bool
// Error holds the failure cause when OK is false.
Error error
}
// successResult returns the canonical success value.
func successResult() Result {
return Result{OK: true}
}
// errorResult wraps err into a failed Result.
func errorResult(err error) Result {
return Result{OK: false, Error: err}
}
// splitterRegistry maps a lowercase mode name ("nicehash", "simple") to its
// splitter constructor; guarded by splitterRegistryMu for concurrent access.
var splitterRegistryMu sync.RWMutex
var splitterRegistry = map[string]func(*Config, *EventBus) Splitter{}
// RegisterSplitterFactory registers a mode-specific splitter constructor.
// Packages such as splitter/nicehash and splitter/simple call this from init.
func RegisterSplitterFactory(mode string, factory func(*Config, *EventBus) Splitter) {
splitterRegistryMu.Lock()
defer splitterRegistryMu.Unlock()
// Mode lookup is case-insensitive: keys are normalized to lowercase here
// and in getSplitterFactory.
splitterRegistry[strings.ToLower(mode)] = factory
}
// getSplitterFactory returns the factory for mode, and whether it exists.
func getSplitterFactory(mode string) (func(*Config, *EventBus) Splitter, bool) {
splitterRegistryMu.RLock()
defer splitterRegistryMu.RUnlock()
factory, ok := splitterRegistry[strings.ToLower(mode)]
return factory, ok
}
// LoadConfig reads and unmarshals a JSON config file. An empty mode defaults
// to "nicehash", and the parsed config is validated before being returned.
func LoadConfig(path string) (*Config, Result) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return nil, errorResult(err)
	}
	var cfg Config
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return nil, errorResult(err)
	}
	if cfg.Mode == "" {
		cfg.Mode = "nicehash"
	}
	return &cfg, cfg.Validate()
}
// Validate checks that mandatory bind and pool settings are present: at
// least one bind address, at least one pool, and a non-blank URL on every
// enabled pool.
func (c *Config) Validate() Result {
	switch {
	case c == nil:
		return errorResult(errors.New("config is nil"))
	case len(c.Bind) == 0:
		return errorResult(errors.New("bind list is empty"))
	case len(c.Pools) == 0:
		return errorResult(errors.New("pool list is empty"))
	}
	for _, poolCfg := range c.Pools {
		if poolCfg.Enabled && strings.TrimSpace(poolCfg.URL) == "" {
			return errorResult(errors.New("enabled pool url is empty"))
		}
	}
	return successResult()
}
// NewEventBus creates an empty synchronous event dispatcher.
func NewEventBus() *EventBus {
	bus := &EventBus{}
	bus.listeners = make(map[EventType][]EventHandler)
	return bus
}
// Subscribe registers a handler for the given event type. Nil receivers and
// nil handlers are ignored; the listener map is lazily initialized so a
// zero-value EventBus also works.
func (b *EventBus) Subscribe(t EventType, h EventHandler) {
	if b == nil || h == nil {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.listeners == nil {
		b.listeners = map[EventType][]EventHandler{}
	}
	b.listeners[t] = append(b.listeners[t], h)
}
// Dispatch calls all registered handlers for the event's type. Handlers run
// synchronously on the caller's goroutine, against a snapshot taken under
// the read lock so subscriptions during dispatch are safe.
func (b *EventBus) Dispatch(e Event) {
	if b == nil {
		return
	}
	b.mu.RLock()
	snapshot := make([]EventHandler, len(b.listeners[e.Type]))
	copy(snapshot, b.listeners[e.Type])
	b.mu.RUnlock()
	for _, fn := range snapshot {
		fn(e)
	}
}
// IsValid returns true when the job carries both a blob and a job id.
func (j Job) IsValid() bool {
	return len(j.Blob) > 0 && len(j.JobID) > 0
}
// BlobWithFixedByte replaces the blob byte at position 39 with fixedByte.
// The blob is a hex string, so byte 39 occupies hex characters 78-79; blobs
// shorter than 80 hex characters are returned unmodified.
func (j Job) BlobWithFixedByte(fixedByte uint8) string {
	if len(j.Blob) < 80 {
		return j.Blob
	}
	const digits = "0123456789abcdef"
	patched := []byte(j.Blob)
	patched[78] = digits[fixedByte>>4]
	patched[79] = digits[fixedByte&0x0f]
	return string(patched)
}
// DifficultyFromTarget converts the target to a rough integer difficulty.
//
// The target is expected to be exactly 8 hex characters (4 bytes); anything
// else yields 0. The bytes are read little-endian, and difficulty is the
// integer quotient 0xFFFFFFFF / target (so a zero target also yields 0).
func (j Job) DifficultyFromTarget() uint64 {
if len(j.Target) != 8 {
return 0
}
raw, err := hex.DecodeString(j.Target)
if err != nil || len(raw) != 4 {
return 0
}
// Little-endian reconstruction of the 32-bit compact target.
target := uint32(raw[0]) | uint32(raw[1])<<8 | uint32(raw[2])<<16 | uint32(raw[3])<<24
if target == 0 {
return 0
}
// Integer division: the result is truncated, which is acceptable for the
// "rough" difficulty this accessor advertises.
return uint64(math.MaxUint32 / uint64(target))
}
// NewCustomDiff creates a login-time custom difficulty resolver. globalDiff
// of 0 disables the global fallback.
func NewCustomDiff(globalDiff uint64) *CustomDiff {
	resolver := &CustomDiff{}
	resolver.globalDiff = globalDiff
	return resolver
}
// OnLogin parses a "+N" suffix on the login user and applies the global
// difficulty fallback when no suffix is present. A suffix that fails to
// parse leaves the user unchanged and — matching existing tests — also
// suppresses the global fallback.
func (cd *CustomDiff) OnLogin(e Event) {
	if cd == nil || e.Miner == nil {
		return
	}
	m := e.Miner
	name := m.user
	if idx := strings.LastIndex(name, "+"); idx >= 0 && idx < len(name)-1 {
		value, err := strconv.ParseUint(name[idx+1:], 10, 64)
		if err == nil {
			m.user = name[:idx]
			m.customDiff = value
		}
		// Any "+" suffix (valid or not) disables the global fallback.
		return
	}
	if cd.globalDiff > 0 {
		m.customDiff = cd.globalDiff
	}
}
// NewRateLimiter creates a per-IP token bucket limiter with empty bucket
// and ban tables.
func NewRateLimiter(cfg RateLimit) *RateLimiter {
	limiter := &RateLimiter{cfg: cfg}
	limiter.buckets = make(map[string]*tokenBucket)
	limiter.banned = make(map[string]time.Time)
	return limiter
}
// Allow returns true if the IP address is permitted to open a new connection.
//
// With rate limiting disabled (MaxConnectionsPerMinute <= 0) every call is
// allowed. Otherwise the per-host bucket is refilled, one token is consumed
// on success, and an empty bucket optionally bans the host for
// BanDurationSeconds. ip may be "host:port" or a bare host.
func (rl *RateLimiter) Allow(ip string) bool {
if rl == nil || rl.cfg.MaxConnectionsPerMinute <= 0 {
return true
}
host := hostOnly(ip)
now := time.Now()
rl.mu.Lock()
defer rl.mu.Unlock()
// An expired ban is cleared lazily on the next attempt from that host.
if until, banned := rl.banned[host]; banned {
if now.Before(until) {
return false
}
delete(rl.banned, host)
}
bucket, ok := rl.buckets[host]
if !ok {
// First sighting: start with a full bucket.
bucket = &tokenBucket{tokens: rl.cfg.MaxConnectionsPerMinute, lastRefill: now}
rl.buckets[host] = bucket
}
refillBucket(bucket, rl.cfg.MaxConnectionsPerMinute, now)
if bucket.tokens <= 0 {
// Out of tokens: optionally ban, and reject this connection.
if rl.cfg.BanDurationSeconds > 0 {
rl.banned[host] = now.Add(time.Duration(rl.cfg.BanDurationSeconds) * time.Second)
}
return false
}
bucket.tokens--
// NOTE(review): resetting lastRefill on every allowed connection delays
// the next refill interval — confirm this throttling bias is intended.
bucket.lastRefill = now
return true
}
// Tick removes expired ban entries and refills token buckets. It is a no-op
// when rate limiting is disabled.
func (rl *RateLimiter) Tick() {
	if rl == nil || rl.cfg.MaxConnectionsPerMinute <= 0 {
		return
	}
	now := time.Now()
	rl.mu.Lock()
	defer rl.mu.Unlock()
	for host, until := range rl.banned {
		expired := !now.Before(until)
		if expired {
			delete(rl.banned, host)
		}
	}
	limit := rl.cfg.MaxConnectionsPerMinute
	for _, bucket := range rl.buckets {
		refillBucket(bucket, limit, now)
	}
}
// NewConfigWatcher creates a polling watcher for a config file. Start must
// be called to begin watching.
func NewConfigWatcher(path string, onChange func(*Config)) *ConfigWatcher {
	watcher := &ConfigWatcher{path: path, onChange: onChange}
	watcher.done = make(chan struct{})
	return watcher
}
// Start begins the 1-second polling loop in a background goroutine.
//
// The goroutine stats the file each tick; on a newer mtime it reloads and,
// if the reload validates, invokes onChange with the fresh config. It runs
// until Stop closes the done channel.
func (w *ConfigWatcher) Start() {
if w == nil || w.path == "" || w.onChange == nil {
return
}
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
info, err := os.Stat(w.path)
if err != nil {
// Transient stat failures (e.g. editor swap files) are skipped.
continue
}
mod := info.ModTime()
if mod.After(w.lastMod) {
// lastMod is written only by this goroutine after Start.
w.lastMod = mod
cfg, result := LoadConfig(w.path)
// Invalid intermediate saves are ignored; only a config that
// passes validation reaches onChange.
if result.OK && cfg != nil {
w.onChange(cfg)
}
}
case <-w.done:
return
}
}
}()
}
// Stop ends the watcher goroutine by closing the done channel.
//
// The select makes a second sequential Stop a no-op (the receive succeeds on
// a closed channel). NOTE(review): two *concurrent* Stop calls can both take
// the default branch and double-close the channel (panic) — confirm callers
// serialize Stop, or guard it with sync.Once.
func (w *ConfigWatcher) Stop() {
if w == nil {
return
}
select {
case <-w.done:
default:
close(w.done)
}
}
// hostOnly strips the port from a "host:port" address; inputs without a
// parseable port are returned unchanged.
func hostOnly(ip string) string {
	if host, _, err := net.SplitHostPort(ip); err == nil {
		return host
	}
	return ip
}
// refillBucket adds the tokens accrued since lastRefill, at a rate of limit
// tokens per minute, capping the bucket at limit.
//
// Fix: the refill interval is now time.Minute / limit. The previous
// time.Duration(60/limit)*time.Second used integer seconds, so any limit
// above 60 was silently capped at 60 tokens/minute (and non-divisor limits
// refilled at a skewed rate).
func refillBucket(bucket *tokenBucket, limit int, now time.Time) {
	if bucket == nil || limit <= 0 {
		return
	}
	if bucket.lastRefill.IsZero() {
		// First sighting: start the clock and grant a full bucket if empty.
		bucket.lastRefill = now
		if bucket.tokens <= 0 {
			bucket.tokens = limit
		}
		return
	}
	interval := time.Minute / time.Duration(limit)
	if interval <= 0 {
		interval = time.Nanosecond // defensive: only reachable for absurd limits
	}
	elapsed := now.Sub(bucket.lastRefill)
	if elapsed < interval {
		return
	}
	add := int(elapsed / interval)
	bucket.tokens += add
	if bucket.tokens > limit {
		bucket.tokens = limit
	}
	// Advance lastRefill by whole intervals only, so fractional progress
	// toward the next token is preserved across calls.
	bucket.lastRefill = bucket.lastRefill.Add(time.Duration(add) * interval)
}
// generateUUID returns a random RFC 4122 version-4 UUID string. If the
// system entropy source fails it falls back to a hex-encoded nanosecond
// timestamp.
func generateUUID() string {
	var raw [16]byte
	if _, err := io.ReadFull(rand.Reader, raw[:]); err != nil {
		return strconv.FormatInt(time.Now().UnixNano(), 16)
	}
	raw[6] = (raw[6] & 0x0f) | 0x40 // version 4
	raw[8] = (raw[8] & 0x3f) | 0x80 // RFC 4122 variant
	encoded := hex.EncodeToString(raw[:])
	groups := []string{
		encoded[0:8],
		encoded[8:12],
		encoded[12:16],
		encoded[16:20],
		encoded[20:32],
	}
	return strings.Join(groups, "-")
}
// sha256Hex returns the lowercase hex-encoded SHA-256 digest of data.
func sha256Hex(data []byte) string {
	digest := sha256.Sum256(data)
	out := make([]byte, hex.EncodedLen(len(digest)))
	hex.Encode(out, digest[:])
	return string(out)
}

30
customdiff_test.go Normal file
View file

@ -0,0 +1,30 @@
package proxy
import "testing"
// TestCustomDiff_OnLogin covers the three login cases: a valid "+N" suffix
// (stripped and applied), an invalid suffix (user kept, no fallback), and no
// suffix (global difficulty fallback).
func TestCustomDiff_OnLogin(t *testing.T) {
cd := NewCustomDiff(10000)
// Valid numeric suffix: stripped from the user, applied as custom diff.
miner := &Miner{user: "WALLET+50000"}
cd.OnLogin(Event{Miner: miner})
if miner.User() != "WALLET" {
t.Fatalf("expected stripped user, got %q", miner.User())
}
if miner.customDiff != 50000 {
t.Fatalf("expected custom diff 50000, got %d", miner.customDiff)
}
// Non-numeric suffix: user unchanged, and global fallback suppressed.
miner = &Miner{user: "WALLET+abc"}
cd.OnLogin(Event{Miner: miner})
if miner.User() != "WALLET+abc" {
t.Fatalf("expected invalid suffix to remain unchanged")
}
if miner.customDiff != 0 {
t.Fatalf("expected custom diff 0 for invalid suffix, got %d", miner.customDiff)
}
// No suffix: the resolver's global difficulty applies.
miner = &Miner{user: "WALLET"}
cd.OnLogin(Event{Miner: miner})
if miner.customDiff != 10000 {
t.Fatalf("expected global diff fallback, got %d", miner.customDiff)
}
}

24
job_test.go Normal file
View file

@ -0,0 +1,24 @@
package proxy
import (
"strings"
"testing"
)
// TestJob_BlobWithFixedByte verifies that byte 39 (hex chars 78-79) of a
// sufficiently long blob is patched and the length is preserved.
func TestJob_BlobWithFixedByte(t *testing.T) {
job := Job{Blob: strings.Repeat("0", 160)}
got := job.BlobWithFixedByte(0x2A)
if len(got) != 160 {
t.Fatalf("expected length 160, got %d", len(got))
}
if got[78:80] != "2a" {
t.Fatalf("expected fixed byte patch, got %q", got[78:80])
}
}
// TestJob_DifficultyFromTarget sanity-checks that a well-formed 8-char
// compact target produces a non-zero difficulty.
func TestJob_DifficultyFromTarget(t *testing.T) {
job := Job{Target: "b88d0600"}
if got := job.DifficultyFromTarget(); got == 0 {
t.Fatalf("expected non-zero difficulty")
}
}

View file

@ -5,7 +5,10 @@
// bus.Subscribe(proxy.EventClose, al.OnClose)
package log
import "sync"
import (
"os"
"sync"
)
// AccessLog writes connection lifecycle lines to an append-only text file.
//
@ -18,6 +21,5 @@ import "sync"
type AccessLog struct {
path string
mu sync.Mutex
// f is opened append-only on first write; nil until first event.
// Uses core.File for I/O abstraction.
f *os.File
}

140
log/impl.go Normal file
View file

@ -0,0 +1,140 @@
package log
import (
"os"
"strconv"
"strings"
"time"
"dappco.re/go/core/proxy"
)
// NewAccessLog creates an append-only access log; the file at path is opened
// lazily on the first write.
func NewAccessLog(path string) *AccessLog {
	log := &AccessLog{}
	log.path = path
	return log
}
// OnLogin writes a CONNECT line for a freshly logged-in miner.
func (l *AccessLog) OnLogin(e proxy.Event) {
	if l == nil || e.Miner == nil {
		return
	}
	m := e.Miner
	l.writeLine("CONNECT", m.IP(), m.User(), m.Agent(), 0, 0)
}
// OnClose writes a CLOSE line including the connection's byte counters.
func (l *AccessLog) OnClose(e proxy.Event) {
	if l == nil || e.Miner == nil {
		return
	}
	m := e.Miner
	l.writeLine("CLOSE", m.IP(), m.User(), "", m.RX(), m.TX())
}
// NewShareLog creates an append-only share log; the file at path is opened
// lazily on the first write.
func NewShareLog(path string) *ShareLog {
	log := &ShareLog{}
	log.path = path
	return log
}
// OnAccept writes an ACCEPT line with the share difficulty and latency.
func (l *ShareLog) OnAccept(e proxy.Event) {
	if l == nil || e.Miner == nil {
		return
	}
	latency := uint64(e.Latency)
	l.writeAcceptLine(e.Miner.User(), e.Diff, latency)
}
// OnReject writes a REJECT line with the pool's rejection reason.
func (l *ShareLog) OnReject(e proxy.Event) {
	if l == nil || e.Miner == nil {
		return
	}
	user := e.Miner.User()
	l.writeRejectLine(user, e.Error)
}
// writeLine appends one space-separated record:
// "<RFC3339> <kind> <ip> <user>[ <agent>][ rx=N tx=M]\n".
// Write errors are best-effort ignored.
func (l *AccessLog) writeLine(kind, ip, user, agent string, rx, tx uint64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if err := l.ensureFile(); err != nil {
		return
	}
	fields := []string{time.Now().UTC().Format(time.RFC3339), kind, ip, user}
	if agent != "" {
		fields = append(fields, agent)
	}
	if rx > 0 || tx > 0 {
		fields = append(fields,
			"rx="+strconv.FormatUint(rx, 10),
			"tx="+strconv.FormatUint(tx, 10))
	}
	_, _ = l.f.WriteString(strings.Join(fields, " ") + "\n")
}
// writeAcceptLine appends "<RFC3339> ACCEPT <user> diff=N latency=Nms\n".
// Write errors are best-effort ignored.
func (l *ShareLog) writeAcceptLine(user string, diff uint64, latency uint64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if err := l.ensureFile(); err != nil {
		return
	}
	line := time.Now().UTC().Format(time.RFC3339) +
		" ACCEPT " + user +
		" diff=" + strconv.FormatUint(diff, 10) +
		" latency=" + strconv.FormatUint(latency, 10) + "ms\n"
	_, _ = l.f.WriteString(line)
}
// writeRejectLine appends "<RFC3339> REJECT <user> reason=\"...\"\n".
// Write errors are best-effort ignored.
func (l *ShareLog) writeRejectLine(user, reason string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if err := l.ensureFile(); err != nil {
		return
	}
	line := time.Now().UTC().Format(time.RFC3339) +
		" REJECT " + user +
		" reason=\"" + reason + "\"\n"
	_, _ = l.f.WriteString(line)
}
// ensureFile lazily opens the log file append-only (creating it with 0644)
// and caches the handle; subsequent calls are no-ops. Caller holds l.mu.
func (l *AccessLog) ensureFile() error {
	if l.f != nil {
		return nil
	}
	const flags = os.O_CREATE | os.O_WRONLY | os.O_APPEND
	handle, err := os.OpenFile(l.path, flags, 0o644)
	if err != nil {
		return err
	}
	l.f = handle
	return nil
}
// ensureFile lazily opens the log file append-only (creating it with 0644)
// and caches the handle; subsequent calls are no-ops. Caller holds l.mu.
func (l *ShareLog) ensureFile() error {
	if l.f != nil {
		return nil
	}
	const flags = os.O_CREATE | os.O_WRONLY | os.O_APPEND
	handle, err := os.OpenFile(l.path, flags, 0o644)
	if err != nil {
		return err
	}
	l.f = handle
	return nil
}

View file

@ -1,6 +1,9 @@
package log
import "sync"
import (
"os"
"sync"
)
// ShareLog writes share result lines to an append-only text file.
//
@ -13,6 +16,5 @@ import "sync"
type ShareLog struct {
path string
mu sync.Mutex
// f is opened append-only on first write; nil until first event.
// Uses core.File for I/O abstraction.
f *os.File
}

View file

@ -25,28 +25,35 @@ const (
// m := proxy.NewMiner(conn, 3333, nil)
// m.Start()
type Miner struct {
id int64 // monotonically increasing per-process; atomic assignment
rpcID string // UUID v4 sent to miner as session id
id int64 // monotonically increasing per-process; atomic assignment
rpcID string // UUID v4 sent to miner as session id
state MinerState
extAlgo bool // miner sent algo list in login params
extNH bool // NiceHash mode active (fixed byte splitting)
ip string // remote IP (without port, for logging)
extAlgo bool // miner sent algo list in login params
extNH bool // NiceHash mode active (fixed byte splitting)
ip string // remote IP (without port, for logging)
localPort uint16
user string // login params.login (wallet address), custom diff suffix stripped
password string // login params.pass
agent string // login params.agent
rigID string // login params.rigid (optional extension)
fixedByte uint8 // NiceHash slot index (0-255)
mapperID int64 // which NonceMapper owns this miner; -1 = unassigned
routeID int64 // SimpleMapper ID in simple mode; -1 = unassigned
customDiff uint64 // 0 = use pool diff; non-zero = cap diff to this value
diff uint64 // last difficulty sent to this miner from the pool
rx uint64 // bytes received from miner
tx uint64 // bytes sent to miner
user string // login params.login (wallet address), custom diff suffix stripped
password string // login params.pass
agent string // login params.agent
rigID string // login params.rigid (optional extension)
fixedByte uint8 // NiceHash slot index (0-255)
mapperID int64 // which NonceMapper owns this miner; -1 = unassigned
routeID int64 // SimpleMapper ID in simple mode; -1 = unassigned
customDiff uint64 // 0 = use pool diff; non-zero = cap diff to this value
accessPassword string
globalDiff uint64
diff uint64 // last difficulty sent to this miner from the pool
rx uint64 // bytes received from miner
tx uint64 // bytes sent to miner
currentJob Job
connectedAt time.Time
lastActivityAt time.Time
conn net.Conn
tlsConn *tls.Conn // nil if plain TCP
sendMu sync.Mutex // serialises writes to conn
tlsConn *tls.Conn // nil if plain TCP
sendMu sync.Mutex // serialises writes to conn
buf [16384]byte // per-miner send buffer; avoids per-write allocations
onLogin func(*Miner)
onSubmit func(*Miner, *SubmitEvent)
onClose func(*Miner)
closeOnce sync.Once
}

View file

@ -19,14 +19,17 @@ import (
// client := pool.NewStratumClient(poolCfg, listener)
// client.Connect()
type StratumClient struct {
cfg proxy.PoolConfig
listener StratumListener
conn net.Conn
tlsConn *tls.Conn // nil if plain TCP
sessionID string // pool-assigned session id from login reply
seq int64 // atomic JSON-RPC request id counter
active bool // true once first job received
sendMu sync.Mutex
cfg proxy.PoolConfig
listener StratumListener
conn net.Conn
tlsConn *tls.Conn // nil if plain TCP
sessionID string // pool-assigned session id from login reply
seq int64 // atomic JSON-RPC request id counter
active bool // true once first job received
pending map[int64]struct{}
closedOnce sync.Once
mu sync.Mutex
sendMu sync.Mutex
}
// StratumListener receives events from the pool connection.

446
pool/impl.go Normal file
View file

@ -0,0 +1,446 @@
package pool
import (
"bufio"
"crypto/sha256"
"crypto/tls"
"encoding/hex"
"encoding/json"
"errors"
"io"
"net"
"strconv"
"strings"
"sync/atomic"
"time"
"dappco.re/go/core/proxy"
)
// NewStrategyFactory creates a StrategyFactory for the supplied config.
// Each invocation of the returned factory builds a fresh FailoverStrategy
// over cfg.Pools wired to the given listener.
func NewStrategyFactory(cfg *proxy.Config) StrategyFactory {
return func(listener StratumListener) Strategy {
return NewFailoverStrategy(cfg.Pools, listener, cfg)
}
}
// NewStratumClient constructs a pool client with an empty pending-request
// table; Connect must be called before use.
func NewStratumClient(cfg proxy.PoolConfig, listener StratumListener) *StratumClient {
	client := &StratumClient{cfg: cfg, listener: listener}
	client.pending = make(map[int64]struct{})
	return client
}
// IsActive reports whether the client has received at least one job.
func (c *StratumClient) IsActive() bool {
	if c == nil {
		return false
	}
	c.mu.Lock()
	active := c.active
	c.mu.Unlock()
	return active
}
// Connect dials the pool over TCP, optionally wraps the connection in TLS,
// and starts the read loop. It does not log in; call Login afterwards.
//
// SECURITY NOTE: certificate chain/hostname verification is disabled
// (InsecureSkipVerify). Authenticity relies solely on the optional SHA-256
// certificate fingerprint pin below; without a configured fingerprint the
// TLS connection is unauthenticated.
func (c *StratumClient) Connect() proxy.Result {
if c == nil {
return proxy.Result{OK: false, Error: errors.New("client is nil")}
}
addr := c.cfg.URL
if addr == "" {
return proxy.Result{OK: false, Error: errors.New("pool url is empty")}
}
conn, err := net.Dial("tcp", addr)
if err != nil {
return proxy.Result{OK: false, Error: err}
}
if c.cfg.TLS {
// ServerName is still set (from the host part) for SNI.
host := addr
if strings.Contains(addr, ":") {
host, _, _ = net.SplitHostPort(addr)
}
tlsCfg := &tls.Config{InsecureSkipVerify: true, ServerName: host}
tlsConn := tls.Client(conn, tlsCfg)
if err := tlsConn.Handshake(); err != nil {
_ = conn.Close()
return proxy.Result{OK: false, Error: err}
}
// Optional pinning: compare the leaf certificate's SHA-256 against the
// configured fingerprint (lowercase hex, surrounding whitespace ignored).
if fp := strings.TrimSpace(strings.ToLower(c.cfg.TLSFingerprint)); fp != "" {
cert := tlsConn.ConnectionState().PeerCertificates
if len(cert) == 0 {
_ = tlsConn.Close()
return proxy.Result{OK: false, Error: errors.New("missing certificate")}
}
sum := sha256.Sum256(cert[0].Raw)
if hex.EncodeToString(sum[:]) != fp {
_ = tlsConn.Close()
return proxy.Result{OK: false, Error: errors.New("tls fingerprint mismatch")}
}
}
c.conn = tlsConn
c.tlsConn = tlsConn
} else {
c.conn = conn
}
// readLoop owns the connection from here; it signals OnDisconnect on exit.
go c.readLoop()
return proxy.Result{OK: true}
}
// Login sends the miner-style "login" request (always request id 1) with the
// configured credentials; rigid and algo are included only when configured.
func (c *StratumClient) Login() {
	if c == nil || c.conn == nil {
		return
	}
	params := map[string]any{}
	params["login"] = c.cfg.User
	params["pass"] = c.cfg.Pass
	if rig := c.cfg.RigID; rig != "" {
		params["rigid"] = rig
	}
	if algo := c.cfg.Algo; algo != "" {
		params["algo"] = []string{algo}
	}
	request := map[string]any{
		"id":      1,
		"jsonrpc": "2.0",
		"method":  "login",
		"params":  params,
	}
	_ = c.writeJSON(request)
}
// Submit forwards a share to the pool and returns the JSON-RPC sequence id
// used for the request (0 when the client is nil). The id is registered in
// the pending table before the write so the reply can always be matched.
func (c *StratumClient) Submit(jobID, nonce, result, algo string) int64 {
	if c == nil {
		return 0
	}
	seq := atomic.AddInt64(&c.seq, 1)
	c.mu.Lock()
	c.pending[seq] = struct{}{}
	session := c.sessionID
	c.mu.Unlock()
	params := map[string]any{
		"id":     session,
		"job_id": jobID,
		"nonce":  nonce,
		"result": result,
		"algo":   algo,
	}
	_ = c.writeJSON(map[string]any{
		"id":      seq,
		"jsonrpc": "2.0",
		"method":  "submit",
		"params":  params,
	})
	return seq
}
// Disconnect closes the connection and notifies the listener.
//
// closedOnce guarantees OnDisconnect fires at most once per client, whether
// the shutdown originates here or from a read/write failure.
func (c *StratumClient) Disconnect() {
if c == nil {
return
}
c.closedOnce.Do(func() {
if c.conn != nil {
_ = c.conn.Close()
}
if c.listener != nil {
c.listener.OnDisconnect()
}
})
}
// notifyDisconnect reports an involuntary disconnect (read/write failure) to
// the listener, sharing closedOnce with Disconnect so the callback fires at
// most once. Unlike Disconnect it does not close the connection itself.
func (c *StratumClient) notifyDisconnect() {
c.closedOnce.Do(func() {
if c.listener != nil {
c.listener.OnDisconnect()
}
})
}
// writeJSON marshals payload, appends the newline framing, and writes it
// under sendMu. A write failure triggers the one-shot disconnect callback.
func (c *StratumClient) writeJSON(payload any) error {
	c.sendMu.Lock()
	defer c.sendMu.Unlock()
	if c.conn == nil {
		return errors.New("connection is nil")
	}
	encoded, err := json.Marshal(payload)
	if err != nil {
		return err
	}
	if _, err = c.conn.Write(append(encoded, '\n')); err != nil {
		c.notifyDisconnect()
	}
	return err
}
// readLoop reads newline-delimited JSON messages from the pool until the
// connection fails, then signals the disconnect exactly once via the
// deferred notifyDisconnect.
//
// Cleanup: the original had a redundant `if err == io.EOF { return }`
// immediately followed by `return` — EOF and transport errors end the loop
// identically, so the dead branch is removed.
func (c *StratumClient) readLoop() {
	defer c.notifyDisconnect()
	reader := bufio.NewReader(c.conn)
	for {
		line, isPrefix, err := reader.ReadLine()
		if err != nil {
			// io.EOF and any other read error terminate the loop the same way.
			return
		}
		if isPrefix {
			// Message exceeded the reader's buffer; treat as a protocol violation.
			return
		}
		if len(line) == 0 {
			continue
		}
		c.handleMessage(line)
	}
}
// handleMessage dispatches one JSON-RPC message from the pool. Three shapes
// are handled, in order: a login reply carrying a session id and initial job,
// a "job" notification, and replies to pending submit requests. Malformed
// messages are silently dropped.
func (c *StratumClient) handleMessage(line []byte) {
var base struct {
ID any `json:"id"`
Method string `json:"method"`
Result json.RawMessage `json:"result"`
Error json.RawMessage `json:"error"`
Params json.RawMessage `json:"params"`
}
if err := json.Unmarshal(line, &base); err != nil {
return
}
// Any result payload is first probed as a login reply. Submit replies also
// land here but carry neither a session id nor a job, so they fall through
// to the pending-request handling below.
if len(base.Result) > 0 {
var loginReply struct {
ID string `json:"id"`
Job *struct {
Blob string `json:"blob"`
JobID string `json:"job_id"`
Target string `json:"target"`
Algo string `json:"algo"`
Height uint64 `json:"height"`
SeedHash string `json:"seed_hash"`
ID string `json:"id"`
} `json:"job"`
}
if err := json.Unmarshal(base.Result, &loginReply); err == nil {
if loginReply.ID != "" {
// Session id is echoed back on every submit.
c.mu.Lock()
c.sessionID = loginReply.ID
c.mu.Unlock()
}
// A login reply with an embedded job activates the client and is
// fully consumed here.
if loginReply.Job != nil && loginReply.Job.JobID != "" {
c.mu.Lock()
c.active = true
c.mu.Unlock()
if c.listener != nil {
c.listener.OnJob(proxy.Job{
Blob: loginReply.Job.Blob,
JobID: loginReply.Job.JobID,
Target: loginReply.Job.Target,
Algo: loginReply.Job.Algo,
Height: loginReply.Job.Height,
SeedHash: loginReply.Job.SeedHash,
ClientID: loginReply.Job.ID,
})
}
return
}
}
}
// "job" notifications: new work pushed by the pool.
if base.Method == "job" {
var params struct {
Blob string `json:"blob"`
JobID string `json:"job_id"`
Target string `json:"target"`
Algo string `json:"algo"`
Height uint64 `json:"height"`
SeedHash string `json:"seed_hash"`
ID string `json:"id"`
}
if err := json.Unmarshal(base.Params, &params); err != nil {
return
}
c.mu.Lock()
c.active = true
c.mu.Unlock()
if c.listener != nil {
c.listener.OnJob(proxy.Job{
Blob: params.Blob,
JobID: params.JobID,
Target: params.Target,
Algo: params.Algo,
Height: params.Height,
SeedHash: params.SeedHash,
ClientID: params.ID,
})
}
return
}
// Remaining messages are replies; match them against pending submits.
// NOTE(review): the login request also uses id 1, which can collide with
// the first submit's sequence number — confirm pools always answer login
// with a job (consumed above) before the first submit reply arrives.
seq := requestID(base.ID)
if seq == 0 {
return
}
c.mu.Lock()
_, ok := c.pending[seq]
if ok {
delete(c.pending, seq)
}
c.mu.Unlock()
if !ok {
return
}
var payload struct {
Status string `json:"status"`
}
if len(base.Result) > 0 {
_ = json.Unmarshal(base.Result, &payload)
}
// Accepted when there is no error object, or when status says "OK".
accepted := len(base.Error) == 0
if payload.Status != "" && strings.EqualFold(payload.Status, "OK") {
accepted = true
}
errorMessage := ""
if !accepted && len(base.Error) > 0 {
var errPayload struct {
Message string `json:"message"`
}
_ = json.Unmarshal(base.Error, &errPayload)
errorMessage = errPayload.Message
}
if c.listener != nil {
c.listener.OnResultAccepted(seq, accepted, errorMessage)
}
}
// NewFailoverStrategy creates the ordered pool failover wrapper; pools are
// tried in configuration order starting with the primary.
func NewFailoverStrategy(pools []proxy.PoolConfig, listener StratumListener, cfg *proxy.Config) *FailoverStrategy {
	strategy := &FailoverStrategy{cfg: cfg}
	strategy.pools = pools
	strategy.listener = listener
	return strategy
}
// Connect establishes the first reachable pool connection, starting from the
// primary (index 0). Blocks until a pool connects or all retries are spent.
func (s *FailoverStrategy) Connect() {
if s == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.connectLocked(0)
}
// connectLocked walks the enabled pools round-robin beginning at index
// `start`, retrying the full rotation up to cfg.Retries times with
// cfg.RetryPause between rotations. Caller must hold s.mu.
//
// NOTE(review): time.Sleep runs while s.mu is held, blocking Connect and
// Disconnect for the whole retry pause — confirm this is acceptable.
func (s *FailoverStrategy) connectLocked(start int) {
enabled := enabledPools(s.pools)
if len(enabled) == 0 {
return
}
// Defaults: one attempt, one-second pause.
retries := 1
retryPause := time.Second
if s.cfg != nil {
if s.cfg.Retries > 0 {
retries = s.cfg.Retries
}
if s.cfg.RetryPause > 0 {
retryPause = time.Duration(s.cfg.RetryPause) * time.Second
}
}
for attempt := 0; attempt < retries; attempt++ {
for i := 0; i < len(enabled); i++ {
index := (start + i) % len(enabled)
poolCfg := enabled[index]
client := NewStratumClient(poolCfg, s)
if result := client.Connect(); result.OK {
s.client = client
s.current = index
client.Login()
return
}
}
time.Sleep(retryPause)
}
}
// Submit sends the share through the active client, returning the client's
// sequence id (0 when no client is connected).
//
// NOTE(review): s.client is read without holding s.mu while connectLocked /
// Disconnect mutate it — possible data race; confirm callers serialize, or
// take the lock here.
func (s *FailoverStrategy) Submit(jobID, nonce, result, algo string) int64 {
if s == nil || s.client == nil {
return 0
}
return s.client.Submit(jobID, nonce, result, algo)
}
// Disconnect closes the active client (if any) and clears it so IsActive
// reports false until the next successful Connect.
func (s *FailoverStrategy) Disconnect() {
if s == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if s.client != nil {
s.client.Disconnect()
s.client = nil
}
}
// IsActive reports whether the current client has received a job.
func (s *FailoverStrategy) IsActive() bool {
	if s == nil {
		return false
	}
	client := s.client
	return client != nil && client.IsActive()
}
// OnJob forwards the pool job to the outer listener unchanged.
func (s *FailoverStrategy) OnJob(job proxy.Job) {
if s != nil && s.listener != nil {
s.listener.OnJob(job)
}
}
// OnResultAccepted forwards the share result status to the outer listener
// unchanged.
func (s *FailoverStrategy) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
if s != nil && s.listener != nil {
s.listener.OnResultAccepted(sequence, accepted, errorMessage)
}
}
// OnDisconnect forwards the disconnect to the outer listener and then
// retries from the primary pool on a fresh goroutine (Connect starts at
// index 0, not at the pool that just failed).
//
// NOTE(review): the stale s.client is not cleared before reconnecting, and
// there is no backoff between OnDisconnect storms beyond cfg.RetryPause
// inside connectLocked — confirm this is the intended recovery behavior.
func (s *FailoverStrategy) OnDisconnect() {
if s == nil {
return
}
if s.listener != nil {
s.listener.OnDisconnect()
}
go s.Connect()
}
// enabledPools filters the configuration down to pools marked Enabled,
// preserving their order.
func enabledPools(pools []proxy.PoolConfig) []proxy.PoolConfig {
	enabled := make([]proxy.PoolConfig, 0, len(pools))
	for _, candidate := range pools {
		if !candidate.Enabled {
			continue
		}
		enabled = append(enabled, candidate)
	}
	return enabled
}
// requestID coerces a JSON-RPC id field (number or numeric string) into an
// int64 sequence number. Unknown types and unparsable strings map to 0.
func requestID(id any) int64 {
	switch v := id.(type) {
	case float64:
		return int64(v)
	case int64:
		return v
	case int:
		return int64(v)
	case string:
		parsed, _ := strconv.ParseInt(v, 10, 64)
		return parsed
	}
	return 0
}

View file

@ -11,6 +11,7 @@
package proxy
import (
"net/http"
"sync"
"time"
)
@ -21,15 +22,21 @@ import (
// p, result := proxy.New(cfg)
// if result.OK { p.Start() }
type Proxy struct {
config *Config
splitter Splitter
stats *Stats
workers *Workers
events *EventBus
servers []*Server
ticker *time.Ticker
watcher *ConfigWatcher
done chan struct{}
config *Config
splitter Splitter
stats *Stats
workers *Workers
events *EventBus
servers []*Server
ticker *time.Ticker
watcher *ConfigWatcher
done chan struct{}
stopOnce sync.Once
minersMu sync.RWMutex
miners map[int64]*Miner
customDiff *CustomDiff
rateLimit *RateLimiter
httpServer *http.Server
}
// Splitter is the interface both NonceSplitter and SimpleSplitter satisfy.

13
ratelimit_test.go Normal file
View file

@ -0,0 +1,13 @@
package proxy
import "testing"
// TestRateLimiter_Allow checks that a limiter configured for one
// connection per minute passes the first call and rejects the second.
func TestRateLimiter_Allow(t *testing.T) {
	limiter := NewRateLimiter(RateLimit{MaxConnectionsPerMinute: 1, BanDurationSeconds: 1})
	const addr = "1.2.3.4:1234"
	if ok := limiter.Allow(addr); !ok {
		t.Fatalf("expected first call to pass")
	}
	if ok := limiter.Allow(addr); ok {
		t.Fatalf("expected second call to fail")
	}
}

383
splitter/nicehash/impl.go Normal file
View file

@ -0,0 +1,383 @@
package nicehash
import (
"time"
"dappco.re/go/core/proxy"
"dappco.re/go/core/proxy/pool"
)
// init registers the NiceHash splitter under the "nicehash" mode name so
// the core proxy can construct it from configuration alone.
func init() {
	proxy.RegisterSplitterFactory("nicehash", func(cfg *proxy.Config, events *proxy.EventBus) proxy.Splitter {
		return NewNonceSplitter(cfg, events, pool.NewStrategyFactory(cfg))
	})
}
// NewNonceSplitter creates a NiceHash splitter.
func NewNonceSplitter(cfg *proxy.Config, events *proxy.EventBus, factory pool.StrategyFactory) *NonceSplitter {
	// Fall back to the default strategy factory when none was supplied.
	strategyFactory := factory
	if strategyFactory == nil {
		strategyFactory = pool.NewStrategyFactory(cfg)
	}
	splitter := &NonceSplitter{
		byID:            make(map[int64]*NonceMapper),
		cfg:             cfg,
		events:          events,
		strategyFactory: strategyFactory,
	}
	return splitter
}
// Connect establishes the first mapper.
func (s *NonceSplitter) Connect() {
	if s == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.mappers) == 0 {
		s.addMapperLocked()
	}
	// Connect only the first mapper that has a strategy; later mappers
	// are connected as miners are assigned to them.
	for _, m := range s.mappers {
		if m.strategy == nil {
			continue
		}
		m.strategy.Connect()
		return
	}
}
// OnLogin assigns the miner to a mapper with a free slot.
func (s *NonceSplitter) OnLogin(event *proxy.LoginEvent) {
	if s == nil || event == nil || event.Miner == nil {
		return
	}
	miner := event.Miner
	s.mu.Lock()
	defer s.mu.Unlock()
	miner.SetExtendedNiceHash(true)
	// Prefer existing mappers; only allocate a new one when all are full.
	for _, m := range s.mappers {
		if m.Add(miner) {
			s.byID[m.id] = m
			return
		}
	}
	if m := s.addMapperLocked(); m != nil {
		_ = m.Add(miner)
		s.byID[m.id] = m
	}
}
// OnSubmit forwards a share to the owning mapper.
func (s *NonceSplitter) OnSubmit(event *proxy.SubmitEvent) {
	if s == nil || event == nil || event.Miner == nil {
		return
	}
	s.mu.RLock()
	owner := s.byID[event.Miner.MapperID()]
	s.mu.RUnlock()
	if owner == nil {
		return
	}
	owner.Submit(event)
}
// OnClose releases the miner slot.
func (s *NonceSplitter) OnClose(event *proxy.CloseEvent) {
	if s == nil || event == nil || event.Miner == nil {
		return
	}
	s.mu.RLock()
	owner := s.byID[event.Miner.MapperID()]
	s.mu.RUnlock()
	if owner == nil {
		return
	}
	owner.Remove(event.Miner)
}
// GC removes empty mappers that have been idle for over a minute: their
// upstream strategy is disconnected and they are dropped from the index.
func (s *NonceSplitter) GC() {
	if s == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	now := time.Now()
	// Filter in place, reusing the backing array.
	next := s.mappers[:0]
	for _, mapper := range s.mappers {
		// Only the dead/active counts matter here; discard free via blank.
		_, dead, active := mapper.storage.SlotCount()
		if active == 0 && dead == 0 && now.Sub(mapper.lastUsed) > time.Minute {
			if mapper.strategy != nil {
				mapper.strategy.Disconnect()
			}
			delete(s.byID, mapper.id)
			continue
		}
		next = append(next, mapper)
	}
	// Zero the tail so removed mappers are not kept alive by the
	// reused backing array.
	for i := len(next); i < len(s.mappers); i++ {
		s.mappers[i] = nil
	}
	s.mappers = next
}
// Tick is called once per second. The NiceHash splitter has no periodic
// work, so this is a no-op.
func (s *NonceSplitter) Tick(ticks uint64) {}
// Upstreams returns pool connection counts.
func (s *NonceSplitter) Upstreams() proxy.UpstreamStats {
	if s == nil {
		return proxy.UpstreamStats{}
	}
	s.mu.RLock()
	defer s.mu.RUnlock()
	stats := proxy.UpstreamStats{Total: uint64(len(s.mappers))}
	for _, m := range s.mappers {
		switch {
		case m.strategy != nil && m.strategy.IsActive():
			stats.Active++
		case m.suspended > 0:
			// NOTE(review): suspended is read here without m.mu (it is
			// written under m.mu in OnJob/OnDisconnect) — confirm this
			// advisory read is acceptable or take the mapper lock.
			stats.Error++
		}
	}
	return stats
}
// addMapperLocked appends a freshly wired mapper and indexes it by id.
// The caller must hold s.mu.
func (s *NonceSplitter) addMapperLocked() *NonceMapper {
	id := s.seq
	s.seq++
	m := NewNonceMapper(id, s.cfg, nil)
	m.events = s.events
	m.lastUsed = time.Now()
	m.strategy = s.strategyFactory(m)
	s.mappers = append(s.mappers, m)
	if s.byID == nil {
		s.byID = make(map[int64]*NonceMapper)
	}
	s.byID[id] = m
	return m
}
// NewNonceMapper creates a mapper for one upstream connection.
func NewNonceMapper(id int64, cfg *proxy.Config, strategy pool.Strategy) *NonceMapper {
	mapper := &NonceMapper{
		id:       id,
		storage:  NewNonceStorage(),
		strategy: strategy,
		pending:  map[int64]SubmitContext{},
		cfg:      cfg,
	}
	return mapper
}
// Add assigns a miner to a free slot. When the mapper already holds a
// valid pool job it is forwarded immediately so the miner can start.
func (m *NonceMapper) Add(miner *proxy.Miner) bool {
	if m == nil || miner == nil {
		return false
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.storage.Add(miner) {
		return false
	}
	miner.SetMapperID(m.id)
	miner.SetExtendedNiceHash(true)
	m.lastUsed = time.Now()
	// Snapshot the current job under the storage lock; forward after.
	m.storage.mu.Lock()
	current := m.storage.job
	m.storage.mu.Unlock()
	if current.IsValid() {
		miner.ForwardJob(current, current.Algo)
	}
	return true
}
// Remove marks the miner slot as dead.
func (m *NonceMapper) Remove(miner *proxy.Miner) {
	if m == nil || miner == nil {
		return
	}
	m.mu.Lock()
	m.storage.Remove(miner)
	miner.SetMapperID(-1)
	m.lastUsed = time.Now()
	m.mu.Unlock()
}
// Submit forwards the share to the pool after checking its job id
// against the current or previous pool job; stale shares are dropped.
func (m *NonceMapper) Submit(event *proxy.SubmitEvent) {
	if m == nil || event == nil || event.Miner == nil || m.strategy == nil {
		return
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.storage.mu.Lock()
	current, previous := m.storage.job, m.storage.prevJob
	m.storage.mu.Unlock()
	jobID := event.JobID
	if jobID == "" {
		// A miner that omits the job id is assumed to mean the current one.
		jobID = current.JobID
	}
	if jobID == "" || (jobID != current.JobID && jobID != previous.JobID) {
		return
	}
	seq := m.strategy.Submit(jobID, event.Nonce, event.Result, event.Algo)
	// Remember the request so the pool's reply can be routed back.
	m.pending[seq] = SubmitContext{
		RequestID: event.RequestID,
		MinerID:   event.Miner.ID(),
		JobID:     jobID,
	}
	m.lastUsed = time.Now()
}
// IsActive reports whether the mapper has received a valid job.
func (m *NonceMapper) IsActive() bool {
	if m == nil {
		return false
	}
	m.mu.Lock()
	active := m.active
	m.mu.Unlock()
	return active
}
// OnJob stores the current pool job and broadcasts it to active miners.
func (m *NonceMapper) OnJob(job proxy.Job) {
	if m == nil || !job.IsValid() {
		return
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.storage.SetJob(job)
	// A fresh job means the upstream is healthy again.
	m.active = true
	m.suspended = 0
	m.lastUsed = time.Now()
}
// OnResultAccepted correlates a pool result back to the originating miner.
//
// The pending entry for sequence is consumed under m.mu; the miner and
// job snapshots are taken under the storage lock; replies and event
// dispatch happen only after both locks are released. A result whose
// job id matches the previous job but not the current one is flagged
// Expired on the accept event.
func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
	if m == nil {
		return
	}
	m.mu.Lock()
	ctx, ok := m.pending[sequence]
	if ok {
		delete(m.pending, sequence)
	}
	// Snapshot state; ctx is the zero value when the sequence is unknown,
	// in which case the lookup below misses and we bail out after unlock.
	m.storage.mu.Lock()
	miner := m.storage.miners[ctx.MinerID]
	job := m.storage.job
	prevJob := m.storage.prevJob
	m.storage.mu.Unlock()
	expired := ctx.JobID != "" && ctx.JobID == prevJob.JobID && ctx.JobID != job.JobID
	m.mu.Unlock()
	if !ok || miner == nil {
		return
	}
	if accepted {
		miner.Success(ctx.RequestID, "OK")
		if m.events != nil {
			m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: &job, Diff: job.DifficultyFromTarget(), Latency: 0, Expired: expired})
		}
		return
	}
	miner.ReplyWithError(ctx.RequestID, errorMessage)
	if m.events != nil {
		m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: &job, Diff: job.DifficultyFromTarget(), Error: errorMessage})
	}
}
// OnDisconnect marks the upstream as inactive and counts the suspension.
func (m *NonceMapper) OnDisconnect() {
	if m == nil {
		return
	}
	m.mu.Lock()
	m.active = false
	m.suspended++
	m.mu.Unlock()
}
// NewNonceStorage creates an empty slot table.
func NewNonceStorage() *NonceStorage {
	storage := &NonceStorage{miners: map[int64]*proxy.Miner{}}
	return storage
}
// Add finds the next free slot, scanning from the cursor so slot reuse
// is spread across the 256-entry table, and pins the miner's fixed
// nonce byte to the chosen slot index.
func (s *NonceStorage) Add(miner *proxy.Miner) bool {
	if s == nil || miner == nil {
		return false
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	for offset := 0; offset < 256; offset++ {
		slot := (s.cursor + offset) % 256
		if s.slots[slot] != 0 {
			continue
		}
		s.slots[slot] = miner.ID()
		s.miners[miner.ID()] = miner
		miner.SetFixedByte(uint8(slot))
		s.cursor = (slot + 1) % 256
		return true
	}
	// All 256 slots are occupied.
	return false
}
// Remove marks the miner's slot as dead (negated id) and drops the
// miner from the lookup table. The slot only transitions to dead when
// it still belongs to this miner.
func (s *NonceStorage) Remove(miner *proxy.Miner) {
	if s == nil || miner == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	slot := int(miner.FixedByte())
	if slot >= 0 && slot < len(s.slots) && s.slots[slot] == miner.ID() {
		s.slots[slot] = -miner.ID()
	}
	delete(s.miners, miner.ID())
}
// SetJob replaces the current job and sends it to active miners.
// Dead slots are recycled on the job boundary, and the previous job is
// discarded when the upstream connection changed (ClientID mismatch).
func (s *NonceStorage) SetJob(job proxy.Job) {
	if s == nil || !job.IsValid() {
		return
	}
	s.mu.Lock()
	s.prevJob = s.job
	if s.prevJob.ClientID != job.ClientID {
		s.prevJob = proxy.Job{}
	}
	s.job = job
	// Free slots whose miner disconnected during the previous job.
	for i := range s.slots {
		if s.slots[i] < 0 {
			s.slots[i] = 0
		}
	}
	recipients := make([]*proxy.Miner, 0, len(s.miners))
	for _, miner := range s.miners {
		recipients = append(recipients, miner)
	}
	s.mu.Unlock()
	// Forward outside the lock; ForwardJob may block on miner I/O.
	for _, miner := range recipients {
		miner.ForwardJob(job, job.Algo)
	}
}
// IsValidJobID returns true if the id matches the current or previous job.
func (s *NonceStorage) IsValidJobID(id string) bool {
	if s == nil || id == "" {
		return false
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	return id == s.job.JobID || id == s.prevJob.JobID
}
// SlotCount returns free, dead, and active counts: a slot holding 0 is
// free, a negative id is dead, and a positive id is active.
func (s *NonceStorage) SlotCount() (free, dead, active int) {
	if s == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, id := range s.slots {
		if id == 0 {
			free++
		} else if id < 0 {
			dead++
		} else {
			active++
		}
	}
	return
}

View file

@ -2,6 +2,7 @@ package nicehash
import (
"sync"
"time"
"dappco.re/go/core/proxy"
"dappco.re/go/core/proxy/pool"
@ -15,11 +16,13 @@ import (
type NonceMapper struct {
id int64
storage *NonceStorage
strategy pool.Strategy // manages pool client lifecycle and failover
strategy pool.Strategy // manages pool client lifecycle and failover
pending map[int64]SubmitContext // sequence → {requestID, minerID}
cfg *proxy.Config
events *proxy.EventBus
active bool // true once pool has sent at least one job
suspended int // > 0 when pool connection is in error/reconnecting
lastUsed time.Time
mu sync.Mutex
}
@ -29,4 +32,5 @@ type NonceMapper struct {
type SubmitContext struct {
RequestID int64 // JSON-RPC id from the miner's submit request
MinerID int64 // miner that submitted
JobID string
}

View file

@ -23,8 +23,10 @@ import (
// s.Connect()
type NonceSplitter struct {
mappers []*NonceMapper
byID map[int64]*NonceMapper
cfg *proxy.Config
events *proxy.EventBus
strategyFactory pool.StrategyFactory
mu sync.RWMutex
seq int64
}

View file

@ -0,0 +1,26 @@
package nicehash
import (
"testing"
"dappco.re/go/core/proxy"
)
// TestNonceStorage_AddAndRemove checks slot assignment starts at index 0
// and that removal leaves exactly one dead slot.
func TestNonceStorage_AddAndRemove(t *testing.T) {
	storage := NewNonceStorage()
	m := &proxy.Miner{}
	m.SetID(1)
	if ok := storage.Add(m); !ok {
		t.Fatalf("expected add to succeed")
	}
	if got := m.FixedByte(); got != 0 {
		t.Fatalf("expected first slot to be 0, got %d", got)
	}
	storage.Remove(m)
	free, dead, active := storage.SlotCount()
	if free != 255 || dead != 1 || active != 0 {
		t.Fatalf("unexpected slot counts: free=%d dead=%d active=%d", free, dead, active)
	}
}

227
splitter/simple/impl.go Normal file
View file

@ -0,0 +1,227 @@
package simple
import (
"time"
"dappco.re/go/core/proxy"
"dappco.re/go/core/proxy/pool"
)
// init registers the passthrough splitter under the "simple" mode name so
// the core proxy can construct it from configuration alone.
func init() {
	proxy.RegisterSplitterFactory("simple", func(cfg *proxy.Config, events *proxy.EventBus) proxy.Splitter {
		return NewSimpleSplitter(cfg, events, pool.NewStrategyFactory(cfg))
	})
}
// NewSimpleSplitter creates the passthrough splitter.
func NewSimpleSplitter(cfg *proxy.Config, events *proxy.EventBus, factory pool.StrategyFactory) *SimpleSplitter {
	// Fall back to the default strategy factory when none was supplied.
	strategyFactory := factory
	if strategyFactory == nil {
		strategyFactory = pool.NewStrategyFactory(cfg)
	}
	splitter := &SimpleSplitter{
		active:  map[int64]*SimpleMapper{},
		idle:    map[int64]*SimpleMapper{},
		cfg:     cfg,
		events:  events,
		factory: strategyFactory,
	}
	return splitter
}
// Connect establishes any mapper strategies that already exist.
func (s *SimpleSplitter) Connect() {
	if s == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	connectAll := func(set map[int64]*SimpleMapper) {
		for _, m := range set {
			if m.strategy != nil {
				m.strategy.Connect()
			}
		}
	}
	connectAll(s.active)
	connectAll(s.idle)
}
// OnLogin creates or reclaims a mapper. With reuse enabled, an idle
// mapper whose upstream is still connected is handed to the new miner;
// otherwise a fresh mapper is allocated and connected.
func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) {
	if s == nil || event == nil || event.Miner == nil {
		return
	}
	miner := event.Miner
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.cfg.ReuseTimeout > 0 {
		for id, m := range s.idle {
			if m.strategy == nil || !m.strategy.IsActive() {
				continue
			}
			delete(s.idle, id)
			m.miner = miner
			m.idleAt = time.Time{}
			m.stopped = false
			s.active[miner.ID()] = m
			miner.SetRouteID(m.id)
			return
		}
	}
	m := s.newMapperLocked()
	m.miner = miner
	s.active[miner.ID()] = m
	miner.SetRouteID(m.id)
	if m.strategy != nil {
		m.strategy.Connect()
	}
}
// OnSubmit forwards the share to the owning mapper.
func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) {
	if s == nil || event == nil || event.Miner == nil {
		return
	}
	s.mu.Lock()
	owner := s.active[event.Miner.ID()]
	s.mu.Unlock()
	if owner == nil {
		return
	}
	owner.Submit(event)
}
// OnClose moves a mapper to the idle pool (when reuse is enabled) or
// stops it and disconnects its upstream.
func (s *SimpleSplitter) OnClose(event *proxy.CloseEvent) {
	if s == nil || event == nil || event.Miner == nil {
		return
	}
	miner := event.Miner
	s.mu.Lock()
	defer s.mu.Unlock()
	m, found := s.active[miner.ID()]
	if !found {
		return
	}
	delete(s.active, miner.ID())
	m.miner = nil
	m.idleAt = time.Now()
	miner.SetRouteID(-1)
	if s.cfg.ReuseTimeout > 0 {
		// Keep the upstream alive for a possible reconnecting miner.
		s.idle[m.id] = m
		return
	}
	m.stopped = true
	if m.strategy != nil {
		m.strategy.Disconnect()
	}
}
// GC removes expired idle mappers: any that were stopped, or whose idle
// time exceeded the configured reuse timeout, are disconnected and dropped.
func (s *SimpleSplitter) GC() {
	if s == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	now := time.Now()
	ttl := time.Duration(s.cfg.ReuseTimeout) * time.Second
	for id, m := range s.idle {
		expired := s.cfg.ReuseTimeout > 0 && now.Sub(m.idleAt) > ttl
		if !m.stopped && !expired {
			continue
		}
		if m.strategy != nil {
			m.strategy.Disconnect()
		}
		delete(s.idle, id)
	}
}
// Tick is a no-op for simple mode; the splitter has no periodic work.
func (s *SimpleSplitter) Tick(ticks uint64) {}
// Upstreams returns active/idle/error counts.
func (s *SimpleSplitter) Upstreams() proxy.UpstreamStats {
	if s == nil {
		return proxy.UpstreamStats{}
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	active := uint64(len(s.active))
	sleeping := uint64(len(s.idle))
	return proxy.UpstreamStats{
		Active: active,
		Sleep:  sleeping,
		Total:  active + sleeping,
	}
}
// newMapperLocked allocates the next mapper and wires its pool strategy.
// The caller must hold s.mu.
func (s *SimpleSplitter) newMapperLocked() *SimpleMapper {
	id := s.seq
	s.seq++
	mapper := &SimpleMapper{
		id:      id,
		events:  s.events,
		pending: make(map[int64]*proxy.SubmitEvent),
	}
	// A single factory call suffices: the original retried s.factory with
	// identical arguments when it returned nil, which cannot produce a
	// different result and only masked a misconfigured factory.
	mapper.strategy = s.factory(mapper)
	return mapper
}
// Submit forwards a share to the pool and records it so the pool's
// reply can be routed back to the miner.
func (m *SimpleMapper) Submit(event *proxy.SubmitEvent) {
	if m == nil || event == nil || m.strategy == nil {
		return
	}
	m.mu.Lock()
	sequence := m.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo)
	m.pending[sequence] = event
	m.mu.Unlock()
}
// OnJob forwards the latest pool job to the active miner, if any.
func (m *SimpleMapper) OnJob(job proxy.Job) {
	if m == nil {
		return
	}
	m.mu.Lock()
	target := m.miner
	m.mu.Unlock()
	if target == nil {
		return
	}
	target.ForwardJob(job, job.Algo)
}
// OnResultAccepted forwards result status to the miner: accepted shares
// get a Success reply and an EventAccept, rejections get the pool's
// error message and an EventReject.
func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
	if m == nil {
		return
	}
	m.mu.Lock()
	pending := m.pending[sequence]
	delete(m.pending, sequence)
	target := m.miner
	m.mu.Unlock()
	if pending == nil || target == nil {
		return
	}
	if !accepted {
		target.ReplyWithError(pending.RequestID, errorMessage)
		if m.events != nil {
			job := target.CurrentJob()
			m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: target, Diff: job.DifficultyFromTarget(), Job: &job, Error: errorMessage})
		}
		return
	}
	target.Success(pending.RequestID, "OK")
	if m.events != nil {
		job := target.CurrentJob()
		m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: target, Diff: job.DifficultyFromTarget(), Job: &job})
	}
}
// OnDisconnect marks the mapper as disconnected.
//
// The original wrote stopped with no lock held while this mapper's other
// methods guard state with m.mu, making the write a data race under the
// race detector; take the lock here.
// NOTE(review): the splitter also reads/writes stopped under its own
// s.mu — confirm whether stopped should instead be owned by the splitter
// lock (or an atomic.Bool) for full cross-lock visibility.
func (m *SimpleMapper) OnDisconnect() {
	if m == nil {
		return
	}
	m.mu.Lock()
	m.stopped = true
	m.mu.Unlock()
}

View file

@ -1,6 +1,7 @@
package simple
import (
"sync"
"time"
"dappco.re/go/core/proxy"
@ -18,4 +19,7 @@ type SimpleMapper struct {
strategy pool.Strategy
idleAt time.Time // zero when active
stopped bool
events *proxy.EventBus
pending map[int64]*proxy.SubmitEvent
mu sync.Mutex
}

1252
state_impl.go Normal file

File diff suppressed because it is too large Load diff

View file

@ -19,6 +19,7 @@ type Stats struct {
expired atomic.Uint64
hashes atomic.Uint64 // cumulative sum of accepted share difficulties
connections atomic.Uint64 // total TCP connections accepted (ever)
miners atomic.Uint64 // current connected miners
maxMiners atomic.Uint64 // peak concurrent miner count
topDiff [10]uint64 // top-10 accepted difficulties, sorted descending; guarded by mu
latency []uint16 // pool response latencies in ms; capped at 10000 samples; guarded by mu
@ -53,8 +54,8 @@ type StatsSummary struct {
Invalid uint64 `json:"invalid"`
Expired uint64 `json:"expired"`
Hashes uint64 `json:"hashes_total"`
AvgTime uint32 `json:"avg_time"` // seconds per accepted share
AvgLatency uint32 `json:"latency"` // median pool response latency in ms
Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants)
AvgTime uint32 `json:"avg_time"` // seconds per accepted share
AvgLatency uint32 `json:"latency"` // median pool response latency in ms
Hashrate [6]float64 `json:"hashrate"` // H/s per window (index = HashrateWindow* constants)
TopDiff [10]uint64 `json:"best"`
}

View file

@ -11,9 +11,9 @@ import (
// w := proxy.NewWorkers(proxy.WorkersByRigID, bus)
type Workers struct {
mode WorkersMode
entries []WorkerRecord // ordered by first-seen (stable)
nameIndex map[string]int // workerName → entries index
idIndex map[int64]int // minerID → entries index
entries []WorkerRecord // ordered by first-seen (stable)
nameIndex map[string]int // workerName → entries index
idIndex map[int64]int // minerID → entries index
mu sync.RWMutex
}
@ -27,7 +27,7 @@ type WorkerRecord struct {
Accepted uint64
Rejected uint64
Invalid uint64
Hashes uint64 // sum of accepted share difficulties
Hashes uint64 // sum of accepted share difficulties
LastHashAt time.Time
windows [5]tickWindow // 60s, 600s, 3600s, 12h, 24h
}