feat(proxy): close RFC gaps

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 10:52:30 +00:00
parent 20f0626a19
commit 7d2d309529
13 changed files with 302 additions and 72 deletions

View file

@ -12,6 +12,7 @@ package api
import (
"encoding/json"
"net/http"
"strings"
"dappco.re/go/core/proxy"
)
@ -86,6 +87,9 @@ func RegisterRoutes(router Router, proxyValue *proxy.Proxy) {
}
router.HandleFunc("/1/summary", func(writer http.ResponseWriter, request *http.Request) {
if !allowRequest(writer, request, proxyValue.HTTPConfig()) {
return
}
summary := proxyValue.Summary()
response := SummaryResponse{
Version: "1.0.0",
@ -127,6 +131,9 @@ func RegisterRoutes(router Router, proxyValue *proxy.Proxy) {
})
router.HandleFunc("/1/workers", func(writer http.ResponseWriter, request *http.Request) {
if !allowRequest(writer, request, proxyValue.HTTPConfig()) {
return
}
type responseBody struct {
Mode string `json:"mode"`
Workers [][]interface{} `json:"workers"`
@ -159,6 +166,9 @@ func RegisterRoutes(router Router, proxyValue *proxy.Proxy) {
})
router.HandleFunc("/1/miners", func(writer http.ResponseWriter, request *http.Request) {
if !allowRequest(writer, request, proxyValue.HTTPConfig()) {
return
}
miners := proxyValue.Miners()
rows := make([][]interface{}, 0, len(miners))
for _, miner := range miners {
@ -187,6 +197,29 @@ func RegisterRoutes(router Router, proxyValue *proxy.Proxy) {
})
}
func allowRequest(writer http.ResponseWriter, request *http.Request, config proxy.HTTPConfig) bool {
if request == nil {
return false
}
if config.AccessToken != "" {
header := request.Header.Get("Authorization")
prefix := "Bearer "
if !strings.HasPrefix(header, prefix) || strings.TrimSpace(strings.TrimPrefix(header, prefix)) != config.AccessToken {
writer.Header().Set("WWW-Authenticate", "Bearer")
http.Error(writer, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
return false
}
}
if config.Restricted && request.Method != http.MethodGet {
http.Error(writer, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return false
}
return true
}
func writeJSON(writer http.ResponseWriter, value interface{}) {
writer.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(writer).Encode(value)

View file

@ -28,6 +28,7 @@ type Miner struct {
id int64 // monotonically increasing per-process; atomic assignment
rpcID string // UUID v4 sent to miner as session id
state MinerState
stateMu sync.RWMutex
extAlgo bool // miner sent algo list in login params
extNH bool // NiceHash mode active (fixed byte splitting)
ip string // remote IP (without port, for logging)

View file

@ -37,14 +37,23 @@ func NewMiner(conn net.Conn, localPort uint16, tlsCfg *tls.Config) *Miner {
return miner
}
func (m *Miner) ID() int64 { return m.id }
func (m *Miner) RPCID() string { return m.rpcID }
func (m *Miner) User() string { return m.user }
func (m *Miner) Password() string { return m.password }
func (m *Miner) Agent() string { return m.agent }
func (m *Miner) RigID() string { return m.rigID }
func (m *Miner) IP() string { return m.ip }
func (m *Miner) State() MinerState { return m.state }
func (m *Miner) ID() int64 { return m.id }
func (m *Miner) RPCID() string { return m.rpcID }
func (m *Miner) User() string { return m.user }
func (m *Miner) Password() string { return m.password }
func (m *Miner) Agent() string { return m.agent }
func (m *Miner) RigID() string { return m.rigID }
func (m *Miner) IP() string { return m.ip }
func (m *Miner) State() MinerState {
if m == nil {
return MinerStateClosing
}
m.stateMu.RLock()
state := m.state
m.stateMu.RUnlock()
return state
}
func (m *Miner) Diff() uint64 { return m.diff }
func (m *Miner) FixedByte() uint8 { return m.fixedByte }
func (m *Miner) MapperID() int64 { return m.mapperID }
@ -54,12 +63,20 @@ func (m *Miner) TX() uint64 { return m.tx }
func (m *Miner) RX() uint64 { return m.rx }
func (m *Miner) LastActivityAt() time.Time { return m.lastActivityAt }
func (m *Miner) SetRPCID(value string) { m.rpcID = value }
func (m *Miner) SetUser(value string) { m.user = value }
func (m *Miner) SetPassword(value string) { m.password = value }
func (m *Miner) SetAgent(value string) { m.agent = value }
func (m *Miner) SetRigID(value string) { m.rigID = value }
func (m *Miner) SetState(value MinerState) { m.state = value }
func (m *Miner) SetRPCID(value string) { m.rpcID = value }
func (m *Miner) SetUser(value string) { m.user = value }
func (m *Miner) SetPassword(value string) { m.password = value }
func (m *Miner) SetAgent(value string) { m.agent = value }
func (m *Miner) SetRigID(value string) { m.rigID = value }
func (m *Miner) SetState(value MinerState) {
if m == nil {
return
}
m.stateMu.Lock()
m.state = value
m.stateMu.Unlock()
}
func (m *Miner) SetDiff(value uint64) { m.diff = value }
func (m *Miner) SetFixedByte(value uint8) { m.fixedByte = value }
func (m *Miner) SetMapperID(value int64) { m.mapperID = value }

View file

@ -7,6 +7,7 @@ import (
"encoding/json"
"net"
"strings"
"time"
)
type minerRequest struct {
@ -26,6 +27,10 @@ func (m *Miner) Start() {
go func() {
reader := bufio.NewReaderSize(m.conn, len(m.buf))
for {
if errorValue := m.applyReadDeadline(); errorValue != nil {
m.Close()
return
}
line, isPrefix, errorValue := reader.ReadLine()
if errorValue != nil {
m.Close()
@ -59,7 +64,7 @@ func (m *Miner) ForwardJob(job Job, algo string) {
}
m.diff = job.DifficultyFromTarget()
m.state = MinerStateReady
m.SetState(MinerStateReady)
jobCopy := job
m.currentJob = &jobCopy
m.Touch()
@ -118,7 +123,7 @@ func (m *Miner) Close() {
}
m.closeOnce.Do(func() {
m.state = MinerStateClosing
m.SetState(MinerStateClosing)
if m.events != nil {
m.events.Dispatch(Event{Type: EventClose, Miner: m})
}
@ -208,7 +213,7 @@ func (m *Miner) handleLogin(request minerRequest) {
m.events.Dispatch(Event{Type: EventLogin, Miner: m})
}
if m.state == MinerStateClosing {
if m.State() == MinerStateClosing {
return
}
@ -231,9 +236,9 @@ func (m *Miner) handleLogin(request minerRequest) {
if m.extAlgo {
result["extensions"] = []string{"algo"}
}
m.state = MinerStateReady
m.SetState(MinerStateReady)
} else {
m.state = MinerStateWaitReady
m.SetState(MinerStateWaitReady)
if m.extAlgo {
result["extensions"] = []string{"algo"}
}
@ -248,7 +253,7 @@ func (m *Miner) handleLogin(request minerRequest) {
}
func (m *Miner) handleSubmit(request minerRequest) {
if m.state != MinerStateReady {
if m.State() != MinerStateReady {
m.ReplyWithError(request.ID, "Unauthenticated")
return
}
@ -311,6 +316,34 @@ func (m *Miner) currentJobCopy() *Job {
return &jobCopy
}
func (m *Miner) applyReadDeadline() error {
if m == nil || m.conn == nil {
return nil
}
deadline := m.readDeadline()
if deadline.IsZero() {
return nil
}
return m.conn.SetReadDeadline(deadline)
}
func (m *Miner) readDeadline() time.Time {
if m == nil {
return time.Time{}
}
switch m.State() {
case MinerStateWaitLogin:
return m.lastActivityAt.Add(10 * time.Second)
case MinerStateWaitReady, MinerStateReady:
return m.lastActivityAt.Add(600 * time.Second)
default:
return time.Time{}
}
}
func (m *Miner) dispatchSubmitResult(eventType EventType, diff uint64, errorMessage string, requestID int64) {
if m == nil || m.events == nil {
return
@ -334,11 +367,11 @@ func (m *Miner) dispatchSubmitResult(eventType EventType, diff uint64, errorMess
func (m *Miner) setStateFromJob(job Job) {
m.currentJob = &job
m.state = MinerStateReady
m.SetState(MinerStateReady)
}
func (m *Miner) Expire() {
if m == nil || m.state == MinerStateClosing {
if m == nil || m.State() == MinerStateClosing {
return
}
m.Close()

View file

@ -217,6 +217,7 @@ func (c *StratumClient) handleMessage(response jsonRPCResponse) {
if json.Unmarshal(response.Result, &loginResult) == nil && loginResult.ID != "" {
c.sessionID = loginResult.ID
if loginResult.Job.IsValid() {
loginResult.Job.ClientID = c.sessionID
c.active = true
if c.listener != nil {
c.listener.OnJob(loginResult.Job)
@ -239,6 +240,7 @@ func (c *StratumClient) handleMessage(response jsonRPCResponse) {
if response.Method == "job" {
var payload proxy.Job
if json.Unmarshal(response.Params, &payload) == nil && payload.IsValid() {
payload.ClientID = c.sessionID
c.active = true
if c.listener != nil {
c.listener.OnJob(payload)

View file

@ -67,7 +67,11 @@ func (s *FailoverStrategy) Connect() {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.pools) == 0 {
pools := s.pools
if s.cfg != nil {
pools = s.cfg.Pools
}
if len(pools) == 0 {
return
}
@ -83,7 +87,7 @@ func (s *FailoverStrategy) Connect() {
}
for attempt := 0; attempt < retries; attempt++ {
for index, poolConfig := range s.pools {
for index, poolConfig := range pools {
if !poolConfig.Enabled {
continue
}

View file

@ -21,17 +21,19 @@ import (
// p, result := proxy.New(cfg)
// if result.OK { p.Start() }
type Proxy struct {
config *Config
splitter Splitter
stats *Stats
workers *Workers
events *EventBus
miners map[int64]*Miner
minerMu sync.RWMutex
servers []*Server
ticker *time.Ticker
watcher *ConfigWatcher
done chan struct{}
config *Config
customDiff *CustomDiff
rateLimiter *RateLimiter
splitter Splitter
stats *Stats
workers *Workers
events *EventBus
miners map[int64]*Miner
minerMu sync.RWMutex
servers []*Server
ticker *time.Ticker
watcher *ConfigWatcher
done chan struct{}
}
// Splitter is the interface both NonceSplitter and SimpleSplitter satisfy.
@ -121,6 +123,7 @@ type tokenBucket struct {
// bus.Subscribe(proxy.EventLogin, cd.OnLogin)
type CustomDiff struct {
globalDiff uint64
mu sync.RWMutex
}
var splitterFactories = map[string]func(*Config, *EventBus) Splitter{

View file

@ -19,18 +19,21 @@ func New(cfg *Config) (*Proxy, error) {
events := NewEventBus()
stats := NewStats()
events.Subscribe(EventLogin, NewCustomDiff(cfg.CustomDiff).OnLogin)
customDiff := NewCustomDiff(cfg.CustomDiff)
events.Subscribe(EventLogin, customDiff.OnLogin)
workers := NewWorkers(cfg.Workers, events)
splitter := newSplitter(cfg, events)
proxyValue := &Proxy{
config: cfg,
splitter: splitter,
stats: stats,
workers: workers,
events: events,
miners: make(map[int64]*Miner),
done: make(chan struct{}),
config: cfg,
customDiff: customDiff,
splitter: splitter,
stats: stats,
workers: workers,
events: events,
miners: make(map[int64]*Miner),
rateLimiter: NewRateLimiter(cfg.RateLimit),
done: make(chan struct{}),
}
events.Subscribe(EventLogin, func(event Event) {
@ -93,7 +96,7 @@ func (p *Proxy) Start() {
tlsConfig = &tls.Config{Certificates: []tls.Certificate{certificate}}
}
}
server, errorValue := NewServer(bind, tlsConfig, NewRateLimiter(p.config.RateLimit), p.acceptConn)
server, errorValue := NewServer(bind, tlsConfig, p.rateLimiter, p.acceptConn)
if errorValue != nil {
continue
}
@ -109,6 +112,9 @@ func (p *Proxy) Start() {
ticks++
p.stats.Tick()
p.workers.Tick()
if p.rateLimiter != nil {
p.rateLimiter.Tick()
}
if p.splitter != nil {
p.splitter.Tick(ticks)
}
@ -158,7 +164,19 @@ func (p *Proxy) Stop() {
// p.Reload(newCfg)
func (p *Proxy) Reload(cfg *Config) {
if cfg != nil {
p.config = cfg
if p.config == nil {
p.config = cfg
} else {
sourcePath := p.config.sourcePath
*p.config = *cfg
p.config.sourcePath = sourcePath
}
if p.customDiff != nil {
p.customDiff.SetGlobalDiff(p.config.CustomDiff)
}
if p.rateLimiter != nil {
p.rateLimiter.SetConfig(p.config.RateLimit)
}
}
}
@ -209,6 +227,13 @@ func (p *Proxy) Mode() string {
return p.config.Mode
}
func (p *Proxy) HTTPConfig() HTTPConfig {
if p == nil || p.config == nil {
return HTTPConfig{}
}
return p.config.HTTP
}
func (p *Proxy) WorkersMode() string {
if p == nil || p.config == nil {
return ""

View file

@ -17,11 +17,24 @@ func NewRateLimiter(config RateLimit) *RateLimiter {
}
}
// SetConfig replaces the active rate-limit settings.
//
// rl.SetConfig(proxy.RateLimit{MaxConnectionsPerMinute: 30, BanDurationSeconds: 300})
func (rl *RateLimiter) SetConfig(config RateLimit) {
if rl == nil {
return
}
rl.mu.Lock()
rl.cfg = config
rl.mu.Unlock()
}
// Allow returns true if the IP address is permitted to open a new connection. Thread-safe.
//
// if rl.Allow(conn.RemoteAddr().String()) { proceed() }
func (rl *RateLimiter) Allow(ip string) bool {
if rl == nil || rl.cfg.MaxConnectionsPerMinute <= 0 {
if rl == nil {
return true
}
@ -31,6 +44,10 @@ func (rl *RateLimiter) Allow(ip string) bool {
rl.mu.Lock()
defer rl.mu.Unlock()
if rl.cfg.MaxConnectionsPerMinute <= 0 {
return true
}
if bannedUntil, exists := rl.banned[host]; exists {
if bannedUntil.After(now) {
return false
@ -63,7 +80,7 @@ func (rl *RateLimiter) Allow(ip string) bool {
//
// rl.Tick()
func (rl *RateLimiter) Tick() {
if rl == nil || rl.cfg.MaxConnectionsPerMinute <= 0 {
if rl == nil {
return
}
@ -71,6 +88,10 @@ func (rl *RateLimiter) Tick() {
rl.mu.Lock()
defer rl.mu.Unlock()
if rl.cfg.MaxConnectionsPerMinute <= 0 {
return
}
for host, bannedUntil := range rl.banned {
if !bannedUntil.After(now) {
delete(rl.banned, host)
@ -112,6 +133,19 @@ func NewCustomDiff(globalDiff uint64) *CustomDiff {
return &CustomDiff{globalDiff: globalDiff}
}
// SetGlobalDiff updates the default custom difficulty override.
//
// cd.SetGlobalDiff(100000)
func (cd *CustomDiff) SetGlobalDiff(globalDiff uint64) {
if cd == nil {
return
}
cd.mu.Lock()
cd.globalDiff = globalDiff
cd.mu.Unlock()
}
// OnLogin parses miner.User for a "+{number}" suffix and sets miner.CustomDiff.
//
// cd.OnLogin(proxy.Event{Miner: miner})
@ -130,7 +164,14 @@ func (cd *CustomDiff) OnLogin(event Event) {
}
}
if cd != nil && cd.globalDiff > 0 {
event.Miner.SetCustomDiff(cd.globalDiff)
if cd == nil {
return
}
cd.mu.RLock()
globalDiff := cd.globalDiff
cd.mu.RUnlock()
if globalDiff > 0 {
event.Miner.SetCustomDiff(globalDiff)
}
}

View file

@ -31,6 +31,7 @@ type SubmitContext struct {
RequestID int64 // JSON-RPC id from the miner's submit request
MinerID int64 // miner that submitted
JobID string
Expired bool
}
// NewNonceMapper creates one upstream pool mapper and its local slot table.
@ -59,6 +60,12 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) {
return
}
valid, expired := m.storage.JobStatus(event.JobID)
if !valid {
event.Miner.ReplyWithError(event.RequestID, "Invalid job id")
return
}
sequence := m.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo)
if sequence == 0 {
if event.Miner != nil {
@ -71,6 +78,7 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) {
RequestID: event.RequestID,
MinerID: event.Miner.ID(),
JobID: event.JobID,
Expired: expired,
}
m.mu.Unlock()
}
@ -118,11 +126,12 @@ func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessa
if m.events != nil {
jobCopy := m.storage.CurrentJob()
m.events.Dispatch(proxy.Event{
Type: eventType,
Miner: miner,
Job: jobCopy,
Diff: miner.Diff(),
Error: errorMessage,
Type: eventType,
Miner: miner,
Job: jobCopy,
Diff: miner.Diff(),
Error: errorMessage,
Expired: context.Expired,
})
}

View file

@ -114,10 +114,27 @@ func (s *NonceStorage) SetJob(job proxy.Job) {
//
// if !storage.IsValidJobID(submitJobID) { reject }
func (s *NonceStorage) IsValidJobID(id string) bool {
valid, _ := s.JobStatus(id)
return valid
}
// JobStatus returns whether the job ID is current or stale-but-still-acceptable.
//
// valid, expired := storage.JobStatus(submitJobID)
func (s *NonceStorage) JobStatus(id string) (valid bool, expired bool) {
s.mu.Lock()
defer s.mu.Unlock()
return id != "" && (id == s.job.JobID || id == s.prevJob.JobID)
if id == "" {
return false, false
}
if id == s.job.JobID {
return true, false
}
if id == s.prevJob.JobID && id != "" {
return true, true
}
return false, false
}
// SlotCount returns free, dead, and active slot counts for monitoring output.

View file

@ -18,34 +18,66 @@ type SimpleMapper struct {
miner *proxy.Miner // nil when idle
strategy pool.Strategy
events *proxy.EventBus
pending map[int64]int64
pending map[int64]simpleSubmitContext
job proxy.Job
prevJob proxy.Job
idleAt time.Time // zero when active
stopped bool
mu sync.Mutex
}
type simpleSubmitContext struct {
RequestID int64
Expired bool
}
// NewSimpleMapper stores the mapper ID and strategy.
//
// mapper := simple.NewSimpleMapper(1, strategy)
func NewSimpleMapper(id int64, strategy pool.Strategy) *SimpleMapper {
return &SimpleMapper{id: id, strategy: strategy, pending: make(map[int64]int64)}
return &SimpleMapper{id: id, strategy: strategy, pending: make(map[int64]simpleSubmitContext)}
}
func (m *SimpleMapper) OnJob(job proxy.Job) {
if !job.IsValid() || m.miner == nil {
return
}
m.miner.ForwardJob(job, job.Algo)
}
func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
if m.miner == nil {
if !job.IsValid() {
return
}
m.mu.Lock()
requestID, exists := m.pending[sequence]
if m.job.ClientID == job.ClientID || m.job.ClientID == "" {
m.prevJob = m.job
} else {
m.prevJob = proxy.Job{}
}
m.job = job
miner := m.miner
m.mu.Unlock()
if miner != nil {
miner.ForwardJob(job, job.Algo)
}
}
func (m *SimpleMapper) JobStatus(id string) (valid bool, expired bool) {
m.mu.Lock()
defer m.mu.Unlock()
if id == "" {
return false, false
}
if id == m.job.JobID {
return true, false
}
if id == m.prevJob.JobID {
return true, true
}
return false, false
}
func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
m.mu.Lock()
context, exists := m.pending[sequence]
miner := m.miner
if !exists {
m.mu.Unlock()
return
@ -53,18 +85,22 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess
delete(m.pending, sequence)
m.mu.Unlock()
if miner == nil {
return
}
if accepted {
if m.events != nil {
m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: m.miner, Diff: m.miner.Diff()})
m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Diff: miner.Diff(), Expired: context.Expired})
}
m.miner.Success(requestID, "OK")
miner.Success(context.RequestID, "OK")
return
}
if m.events != nil {
m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: m.miner, Diff: m.miner.Diff(), Error: errorMessage})
m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Diff: miner.Diff(), Error: errorMessage, Expired: context.Expired})
}
m.miner.ReplyWithError(requestID, errorMessage)
miner.ReplyWithError(context.RequestID, errorMessage)
}
func (m *SimpleMapper) OnDisconnect() {

View file

@ -92,6 +92,12 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) {
return
}
valid, expired := mapper.JobStatus(event.JobID)
if !valid {
event.Miner.ReplyWithError(event.RequestID, "Invalid job id")
return
}
sequence := mapper.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo)
if sequence == 0 {
event.Miner.ReplyWithError(event.RequestID, "Pool unavailable")
@ -99,7 +105,10 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) {
}
mapper.mu.Lock()
mapper.pending[sequence] = event.RequestID
mapper.pending[sequence] = simpleSubmitContext{
RequestID: event.RequestID,
Expired: expired,
}
mapper.mu.Unlock()
}