feat(proxy): fill RFC login and watch gaps

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 18:25:36 +00:00
parent 7f44596858
commit 64443c41f6
9 changed files with 216 additions and 11 deletions

View file

@ -21,6 +21,7 @@ type Config struct {
RetryPause int `json:"retry-pause"` // seconds between retries
Watch bool `json:"watch"` // hot-reload on file change
RateLimit RateLimit `json:"rate-limit"` // per-IP connection rate limit
sourcePath string
}
// BindAddr is one TCP listen endpoint.

View file

@ -59,6 +59,7 @@ func LoadConfig(path string) (*Config, Result) {
if err := json.Unmarshal(data, cfg); err != nil {
return nil, errorResult(err)
}
cfg.sourcePath = path
return cfg, cfg.Validate()
}

View file

@ -30,6 +30,7 @@ type Miner struct {
state MinerState
extAlgo bool // miner sent algo list in login params
extNH bool // NiceHash mode active (fixed byte splitting)
algoEnabled bool // proxy is configured to negotiate the algo extension
ip string // remote IP (without port, for logging)
localPort uint16
user string // login params.login (wallet address), custom diff suffix stripped

105
miner_login_test.go Normal file
View file

@ -0,0 +1,105 @@
package proxy
import (
"bufio"
"encoding/json"
"net"
"strings"
"testing"
)
func TestMiner_HandleLogin_Good(t *testing.T) {
minerConn, clientConn := net.Pipe()
defer minerConn.Close()
defer clientConn.Close()
miner := NewMiner(minerConn, 3333, nil)
miner.algoEnabled = true
miner.extNH = true
miner.fixedByte = 0x2a
miner.currentJob = Job{
Blob: strings.Repeat("0", 160),
JobID: "job-1",
Target: "b88d0600",
Algo: "cn/r",
Height: 7,
SeedHash: "seed",
}
params, err := json.Marshal(loginParams{
Login: "wallet",
Pass: "x",
Agent: "xmrig",
Algo: []string{"cn/r"},
RigID: "rig-1",
})
if err != nil {
t.Fatalf("marshal login params: %v", err)
}
done := make(chan struct{})
go func() {
miner.handleLogin(stratumRequest{ID: 1, Method: "login", Params: params})
close(done)
}()
line, err := bufio.NewReader(clientConn).ReadBytes('\n')
if err != nil {
t.Fatalf("read login response: %v", err)
}
<-done
var payload struct {
Result struct {
ID string `json:"id"`
Status string `json:"status"`
Extensions []string `json:"extensions"`
Job map[string]any `json:"job"`
} `json:"result"`
}
if err := json.Unmarshal(line, &payload); err != nil {
t.Fatalf("unmarshal login response: %v", err)
}
if payload.Result.Status != "OK" {
t.Fatalf("expected login success, got %q", payload.Result.Status)
}
if payload.Result.ID == "" {
t.Fatalf("expected rpc id in login response")
}
if len(payload.Result.Extensions) != 1 || payload.Result.Extensions[0] != "algo" {
t.Fatalf("expected algo extension, got %#v", payload.Result.Extensions)
}
if got := payload.Result.Job["job_id"]; got != "job-1" {
t.Fatalf("expected embedded job, got %#v", got)
}
if got := payload.Result.Job["algo"]; got != "cn/r" {
t.Fatalf("expected embedded algo, got %#v", got)
}
blob, _ := payload.Result.Job["blob"].(string)
if blob[78:80] != "2a" {
t.Fatalf("expected fixed-byte patched blob, got %q", blob[78:80])
}
if miner.State() != MinerStateReady {
t.Fatalf("expected miner ready after login reply with job, got %d", miner.State())
}
}
func TestProxy_New_Watch_Good(t *testing.T) {
cfg := &Config{
Mode: "nicehash",
Workers: WorkersByRigID,
Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}},
Pools: []PoolConfig{{URL: "pool.example:3333", Enabled: true}},
Watch: true,
sourcePath: "/tmp/proxy.json",
}
proxyInstance, result := New(cfg)
if !result.OK {
t.Fatalf("expected valid proxy, got error: %v", result.Error)
}
if proxyInstance.watcher == nil {
t.Fatalf("expected config watcher when watch is enabled and source path is known")
}
}

View file

@ -179,7 +179,7 @@ func (m *NonceMapper) Add(miner *proxy.Miner) bool {
job := m.storage.job
m.storage.mu.Unlock()
if job.IsValid() {
miner.ForwardJob(job, job.Algo)
miner.SetCurrentJob(job)
}
}
return ok

View file

@ -63,6 +63,9 @@ func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) {
mapper.stopped = false
s.active[event.Miner.ID()] = mapper
event.Miner.SetRouteID(mapper.id)
if mapper.currentJob.IsValid() {
event.Miner.SetCurrentJob(mapper.currentJob)
}
return
}
}
@ -182,6 +185,7 @@ func (m *SimpleMapper) OnJob(job proxy.Job) {
return
}
m.mu.Lock()
m.currentJob = job
miner := m.miner
m.mu.Unlock()
if miner == nil {

View file

@ -0,0 +1,38 @@
package simple
import (
"testing"
"dappco.re/go/proxy"
"dappco.re/go/proxy/pool"
)
type activeStrategy struct{}
func (a activeStrategy) Connect() {}
func (a activeStrategy) Submit(string, string, string, string) int64 { return 0 }
func (a activeStrategy) Disconnect() {}
func (a activeStrategy) IsActive() bool { return true }
func TestSimpleSplitter_OnLogin_Good(t *testing.T) {
splitter := NewSimpleSplitter(&proxy.Config{ReuseTimeout: 30}, nil, func(listener pool.StratumListener) pool.Strategy {
return activeStrategy{}
})
miner := &proxy.Miner{}
job := proxy.Job{JobID: "job-1", Blob: "blob"}
mapper := &SimpleMapper{
id: 7,
strategy: activeStrategy{},
currentJob: job,
}
splitter.idle[mapper.id] = mapper
splitter.OnLogin(&proxy.LoginEvent{Miner: miner})
if miner.RouteID() != mapper.id {
t.Fatalf("expected reclaimed mapper route id %d, got %d", mapper.id, miner.RouteID())
}
if got := miner.CurrentJob().JobID; got != job.JobID {
t.Fatalf("expected current job to be restored on reuse, got %q", got)
}
}

View file

@ -14,12 +14,13 @@ import (
//
// m := simple.NewSimpleMapper(id, strategy)
type SimpleMapper struct {
id int64
miner *proxy.Miner // nil when idle
strategy pool.Strategy
idleAt time.Time // zero when active
stopped bool
events *proxy.EventBus
pending map[int64]*proxy.SubmitEvent
mu sync.Mutex
id int64
miner *proxy.Miner // nil when idle
currentJob proxy.Job
strategy pool.Strategy
idleAt time.Time // zero when active
stopped bool
events *proxy.EventBus
pending map[int64]*proxy.SubmitEvent
mu sync.Mutex
}

View file

@ -60,6 +60,9 @@ func New(cfg *Config) (*Proxy, Result) {
p.events.Subscribe(EventAccept, p.workers.OnAccept)
p.events.Subscribe(EventReject, p.stats.OnReject)
p.events.Subscribe(EventReject, p.workers.OnReject)
if cfg.Watch && cfg.sourcePath != "" {
p.watcher = NewConfigWatcher(cfg.sourcePath, p.Reload)
}
if factory, ok := getSplitterFactory(cfg.Mode); ok {
p.splitter = factory(cfg, p.events)
@ -161,6 +164,9 @@ func (p *Proxy) Start() {
if p.splitter != nil {
p.splitter.Connect()
}
if p.watcher != nil {
p.watcher.Start()
}
if p.config.HTTP.Enabled {
p.startHTTP()
}
@ -230,10 +236,12 @@ func (p *Proxy) Reload(cfg *Config) {
preservedBind := append([]BindAddr(nil), p.config.Bind...)
preservedMode := p.config.Mode
preservedWorkers := p.config.Workers
preservedSourcePath := p.config.sourcePath
*p.config = *cfg
p.config.Bind = preservedBind
p.config.Mode = preservedMode
p.config.Workers = preservedWorkers
p.config.sourcePath = preservedSourcePath
}
if p.customDiff != nil {
p.customDiff.globalDiff = cfg.CustomDiff
@ -251,6 +259,7 @@ func (p *Proxy) acceptMiner(conn net.Conn, localPort uint16) {
}
miner := NewMiner(conn, localPort, nil)
miner.accessPassword = p.config.AccessPassword
miner.algoEnabled = p.config.AlgoExtension
miner.globalDiff = p.config.CustomDiff
miner.extNH = strings.EqualFold(p.config.Mode, "nicehash")
miner.onLogin = func(m *Miner) {
@ -513,6 +522,10 @@ func (m *Miner) State() MinerState {
return m.state
}
func (m *Miner) supportsAlgoExtension() bool {
return m != nil && m.algoEnabled && m.extAlgo
}
// Start launches the read loop.
func (m *Miner) Start() {
if m == nil {
@ -627,7 +640,7 @@ func (m *Miner) handleLogin(req stratumRequest) {
if m.onLogin != nil {
m.onLogin(m)
}
m.Success(requestID(req.ID), "OK")
m.replyLoginSuccess(requestID(req.ID))
}
func parseLoginUser(login string, globalDiff uint64) (string, uint64) {
@ -737,18 +750,59 @@ func (m *Miner) ForwardJob(job Job, algo string) {
"blob": blob,
"job_id": job.JobID,
"target": job.Target,
"algo": algo,
"id": m.rpcID,
"height": job.Height,
"seed_hash": job.SeedHash,
},
}
if m.supportsAlgoExtension() && algo != "" {
payload["params"].(map[string]any)["algo"] = algo
}
_ = m.writeJSON(payload)
if m.state == MinerStateWaitReady {
m.state = MinerStateReady
}
}
func (m *Miner) replyLoginSuccess(id int64) {
if m == nil {
return
}
result := map[string]any{
"id": m.rpcID,
"status": "OK",
}
if m.supportsAlgoExtension() {
result["extensions"] = []string{"algo"}
}
if job := m.CurrentJob(); job.IsValid() {
blob := job.Blob
if m.extNH {
blob = job.BlobWithFixedByte(m.fixedByte)
}
jobPayload := map[string]any{
"blob": blob,
"job_id": job.JobID,
"target": job.Target,
"id": m.rpcID,
"height": job.Height,
"seed_hash": job.SeedHash,
}
if m.supportsAlgoExtension() && job.Algo != "" {
jobPayload["algo"] = job.Algo
}
result["job"] = jobPayload
m.state = MinerStateReady
}
payload := map[string]any{
"id": id,
"jsonrpc": "2.0",
"error": nil,
"result": result,
}
_ = m.writeJSON(payload)
}
func (m *Miner) ReplyWithError(id int64, message string) {
if m == nil {
return