From 64443c41f61d1af09009196fbaeed5c71b857b87 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 18:25:36 +0000 Subject: [PATCH] feat(proxy): fill RFC login and watch gaps Co-Authored-By: Virgil --- config.go | 1 + core_impl.go | 1 + miner.go | 1 + miner_login_test.go | 105 +++++++++++++++++++++++++++++++++++ splitter/nicehash/impl.go | 2 +- splitter/simple/impl.go | 4 ++ splitter/simple/impl_test.go | 38 +++++++++++++ splitter/simple/mapper.go | 17 +++--- state_impl.go | 58 ++++++++++++++++++- 9 files changed, 216 insertions(+), 11 deletions(-) create mode 100644 miner_login_test.go create mode 100644 splitter/simple/impl_test.go diff --git a/config.go b/config.go index 9de0349..2a31c86 100644 --- a/config.go +++ b/config.go @@ -21,6 +21,7 @@ type Config struct { RetryPause int `json:"retry-pause"` // seconds between retries Watch bool `json:"watch"` // hot-reload on file change RateLimit RateLimit `json:"rate-limit"` // per-IP connection rate limit + sourcePath string } // BindAddr is one TCP listen endpoint. diff --git a/core_impl.go b/core_impl.go index 2a86054..6943672 100644 --- a/core_impl.go +++ b/core_impl.go @@ -59,6 +59,7 @@ func LoadConfig(path string) (*Config, Result) { if err := json.Unmarshal(data, cfg); err != nil { return nil, errorResult(err) } + cfg.sourcePath = path return cfg, cfg.Validate() } diff --git a/miner.go b/miner.go index a5b9b53..d8384fe 100644 --- a/miner.go +++ b/miner.go @@ -30,6 +30,7 @@ type Miner struct { state MinerState extAlgo bool // miner sent algo list in login params extNH bool // NiceHash mode active (fixed byte splitting) + algoEnabled bool // proxy is configured to negotiate the algo extension ip string // remote IP (without port, for logging) localPort uint16 user string // login params.login (wallet address), custom diff suffix stripped diff --git a/miner_login_test.go b/miner_login_test.go new file mode 100644 index 0000000..17dbc9f --- /dev/null +++ b/miner_login_test.go @@ -0,0 +1,105 @@ +package proxy + +import ( + "bufio" + "encoding/json" + "net" + "strings" + "testing" +) + +func TestMiner_HandleLogin_Good(t *testing.T) { + minerConn, clientConn := net.Pipe() + defer minerConn.Close() + defer clientConn.Close() + + miner := NewMiner(minerConn, 3333, nil) + miner.algoEnabled = true + miner.extNH = true + miner.fixedByte = 0x2a + miner.currentJob = Job{ + Blob: strings.Repeat("0", 160), + JobID: "job-1", + Target: "b88d0600", + Algo: "cn/r", + Height: 7, + SeedHash: "seed", + } + + params, err := json.Marshal(loginParams{ + Login: "wallet", + Pass: "x", + Agent: "xmrig", + Algo: []string{"cn/r"}, + RigID: "rig-1", + }) + if err != nil { + t.Fatalf("marshal login params: %v", err) + } + + done := make(chan struct{}) + go func() { + miner.handleLogin(stratumRequest{ID: 1, Method: "login", Params: params}) + close(done) + }() + + line, err := bufio.NewReader(clientConn).ReadBytes('\n') + if err != nil { + t.Fatalf("read login response: %v", err) + } + <-done + + var payload struct { + Result struct { + ID string `json:"id"` + Status string `json:"status"` + Extensions []string `json:"extensions"` + Job map[string]any `json:"job"` + } `json:"result"` + } + if err := json.Unmarshal(line, &payload); err != nil { + t.Fatalf("unmarshal login response: %v", err) + } + + if payload.Result.Status != "OK" { + t.Fatalf("expected login success, got %q", payload.Result.Status) + } + if payload.Result.ID == "" { + t.Fatalf("expected rpc id in login response") + } + if len(payload.Result.Extensions) != 1 || payload.Result.Extensions[0] != "algo" { + t.Fatalf("expected algo extension, got %#v", payload.Result.Extensions) + } + if got := payload.Result.Job["job_id"]; got != "job-1" { + t.Fatalf("expected embedded job, got %#v", got) + } + if got := payload.Result.Job["algo"]; got != "cn/r" { + t.Fatalf("expected embedded algo, got %#v", got) + } + blob, _ := payload.Result.Job["blob"].(string) + if blob[78:80] != "2a" { + t.Fatalf("expected fixed-byte patched blob, got %q", blob[78:80]) + } + if miner.State() != MinerStateReady { + t.Fatalf("expected miner ready after login reply with job, got %d", miner.State()) + } +} + +func TestProxy_New_Watch_Good(t *testing.T) { + cfg := &Config{ + Mode: "nicehash", + Workers: WorkersByRigID, + Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}}, + Pools: []PoolConfig{{URL: "pool.example:3333", Enabled: true}}, + Watch: true, + sourcePath: "/tmp/proxy.json", + } + + proxyInstance, result := New(cfg) + if !result.OK { + t.Fatalf("expected valid proxy, got error: %v", result.Error) + } + if proxyInstance.watcher == nil { + t.Fatalf("expected config watcher when watch is enabled and source path is known") + } +} diff --git a/splitter/nicehash/impl.go b/splitter/nicehash/impl.go index b1211f6..88da577 100644 --- a/splitter/nicehash/impl.go +++ b/splitter/nicehash/impl.go @@ -179,7 +179,7 @@ func (m *NonceMapper) Add(miner *proxy.Miner) bool { job := m.storage.job m.storage.mu.Unlock() if job.IsValid() { - miner.ForwardJob(job, job.Algo) + miner.SetCurrentJob(job) } } return ok diff --git a/splitter/simple/impl.go b/splitter/simple/impl.go index 759d993..d110a40 100644 --- a/splitter/simple/impl.go +++ b/splitter/simple/impl.go @@ -63,6 +63,9 @@ func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) { mapper.stopped = false s.active[event.Miner.ID()] = mapper event.Miner.SetRouteID(mapper.id) + if mapper.currentJob.IsValid() { + event.Miner.SetCurrentJob(mapper.currentJob) + } return } } @@ -182,6 +185,7 @@ func (m *SimpleMapper) OnJob(job proxy.Job) { return } m.mu.Lock() + m.currentJob = job miner := m.miner m.mu.Unlock() if miner == nil { diff --git a/splitter/simple/impl_test.go b/splitter/simple/impl_test.go new file mode 100644 index 0000000..78e3557 --- /dev/null +++ b/splitter/simple/impl_test.go @@ -0,0 +1,38 @@ +package simple + +import ( + "testing" + + "dappco.re/go/proxy" + "dappco.re/go/proxy/pool" +) + +type activeStrategy struct{} + +func (a activeStrategy) Connect() {} +func (a activeStrategy) Submit(string, string, string, string) int64 { return 0 } +func (a activeStrategy) Disconnect() {} +func (a activeStrategy) IsActive() bool { return true } + +func TestSimpleSplitter_OnLogin_Good(t *testing.T) { + splitter := NewSimpleSplitter(&proxy.Config{ReuseTimeout: 30}, nil, func(listener pool.StratumListener) pool.Strategy { + return activeStrategy{} + }) + miner := &proxy.Miner{} + job := proxy.Job{JobID: "job-1", Blob: "blob"} + mapper := &SimpleMapper{ + id: 7, + strategy: activeStrategy{}, + currentJob: job, + } + splitter.idle[mapper.id] = mapper + + splitter.OnLogin(&proxy.LoginEvent{Miner: miner}) + + if miner.RouteID() != mapper.id { + t.Fatalf("expected reclaimed mapper route id %d, got %d", mapper.id, miner.RouteID()) + } + if got := miner.CurrentJob().JobID; got != job.JobID { + t.Fatalf("expected current job to be restored on reuse, got %q", got) + } +} diff --git a/splitter/simple/mapper.go b/splitter/simple/mapper.go index b9c16d7..212ccdf 100644 --- a/splitter/simple/mapper.go +++ b/splitter/simple/mapper.go @@ -14,12 +14,13 @@ import ( // // m := simple.NewSimpleMapper(id, strategy) type SimpleMapper struct { - id int64 - miner *proxy.Miner // nil when idle - strategy pool.Strategy - idleAt time.Time // zero when active - stopped bool - events *proxy.EventBus - pending map[int64]*proxy.SubmitEvent - mu sync.Mutex + id int64 + miner *proxy.Miner // nil when idle + currentJob proxy.Job + strategy pool.Strategy + idleAt time.Time // zero when active + stopped bool + events *proxy.EventBus + pending map[int64]*proxy.SubmitEvent + mu sync.Mutex } diff --git a/state_impl.go b/state_impl.go index 7393db2..5a5541f 100644 --- a/state_impl.go +++ b/state_impl.go @@ -60,6 +60,9 @@ func New(cfg *Config) (*Proxy, Result) { p.events.Subscribe(EventAccept, p.workers.OnAccept) p.events.Subscribe(EventReject, p.stats.OnReject) p.events.Subscribe(EventReject, p.workers.OnReject) + if cfg.Watch && cfg.sourcePath != "" { + p.watcher = NewConfigWatcher(cfg.sourcePath, p.Reload) + } if factory, ok := getSplitterFactory(cfg.Mode); ok { p.splitter = factory(cfg, p.events) @@ -161,6 +164,9 @@ func (p *Proxy) Start() { if p.splitter != nil { p.splitter.Connect() } + if p.watcher != nil { + p.watcher.Start() + } if p.config.HTTP.Enabled { p.startHTTP() } @@ -230,10 +236,12 @@ func (p *Proxy) Reload(cfg *Config) { preservedBind := append([]BindAddr(nil), p.config.Bind...) preservedMode := p.config.Mode preservedWorkers := p.config.Workers + preservedSourcePath := p.config.sourcePath *p.config = *cfg p.config.Bind = preservedBind p.config.Mode = preservedMode p.config.Workers = preservedWorkers + p.config.sourcePath = preservedSourcePath } if p.customDiff != nil { p.customDiff.globalDiff = cfg.CustomDiff @@ -251,6 +259,7 @@ func (p *Proxy) acceptMiner(conn net.Conn, localPort uint16) { } miner := NewMiner(conn, localPort, nil) miner.accessPassword = p.config.AccessPassword + miner.algoEnabled = p.config.AlgoExtension miner.globalDiff = p.config.CustomDiff miner.extNH = strings.EqualFold(p.config.Mode, "nicehash") miner.onLogin = func(m *Miner) { @@ -513,6 +522,10 @@ func (m *Miner) State() MinerState { return m.state } +func (m *Miner) supportsAlgoExtension() bool { + return m != nil && m.algoEnabled && m.extAlgo +} + // Start launches the read loop. func (m *Miner) Start() { if m == nil { @@ -627,7 +640,7 @@ func (m *Miner) handleLogin(req stratumRequest) { if m.onLogin != nil { m.onLogin(m) } - m.Success(requestID(req.ID), "OK") + m.replyLoginSuccess(requestID(req.ID)) } func parseLoginUser(login string, globalDiff uint64) (string, uint64) { @@ -737,18 +750,59 @@ func (m *Miner) ForwardJob(job Job, algo string) { "blob": blob, "job_id": job.JobID, "target": job.Target, - "algo": algo, "id": m.rpcID, "height": job.Height, "seed_hash": job.SeedHash, }, } + if m.supportsAlgoExtension() && algo != "" { + payload["params"].(map[string]any)["algo"] = algo + } _ = m.writeJSON(payload) if m.state == MinerStateWaitReady { m.state = MinerStateReady } } +func (m *Miner) replyLoginSuccess(id int64) { + if m == nil { + return + } + result := map[string]any{ + "id": m.rpcID, + "status": "OK", + } + if m.supportsAlgoExtension() { + result["extensions"] = []string{"algo"} + } + if job := m.CurrentJob(); job.IsValid() { + blob := job.Blob + if m.extNH { + blob = job.BlobWithFixedByte(m.fixedByte) + } + jobPayload := map[string]any{ + "blob": blob, + "job_id": job.JobID, + "target": job.Target, + "id": m.rpcID, + "height": job.Height, + "seed_hash": job.SeedHash, + } + if m.supportsAlgoExtension() && job.Algo != "" { + jobPayload["algo"] = job.Algo + } + result["job"] = jobPayload + m.state = MinerStateReady + } + payload := map[string]any{ + "id": id, + "jsonrpc": "2.0", + "error": nil, + "result": result, + } + _ = m.writeJSON(payload) +} + func (m *Miner) ReplyWithError(id int64, message string) { if m == nil { return