From 6f4d7019e20fed1e1eef9f19d0ec5a24d3acccb6 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 18:38:28 +0000 Subject: [PATCH] fix(proxy): align runtime with RFC Co-Authored-By: Virgil --- splitter/nicehash/impl.go | 20 +++++- splitter/nicehash/mapper.go | 1 + splitter/simple/impl.go | 31 ++++++--- splitter/simple/mapper.go | 7 +- state_impl.go | 132 +++++++++++++++++++++++++++++++++--- 5 files changed, 167 insertions(+), 24 deletions(-) diff --git a/splitter/nicehash/impl.go b/splitter/nicehash/impl.go index 88da577..c88b90b 100644 --- a/splitter/nicehash/impl.go +++ b/splitter/nicehash/impl.go @@ -216,7 +216,12 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { return } seq := m.strategy.Submit(jobID, event.Nonce, event.Result, event.Algo) - m.pending[seq] = SubmitContext{RequestID: event.RequestID, MinerID: event.Miner.ID(), JobID: jobID} + m.pending[seq] = SubmitContext{ + RequestID: event.RequestID, + MinerID: event.Miner.ID(), + JobID: jobID, + StartedAt: time.Now(), + } m.lastUsed = time.Now() } @@ -263,16 +268,25 @@ func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessa if !ok || miner == nil { return } + latency := uint16(0) + if !ctx.StartedAt.IsZero() { + elapsed := time.Since(ctx.StartedAt).Milliseconds() + if elapsed > int64(^uint16(0)) { + latency = ^uint16(0) + } else { + latency = uint16(elapsed) + } + } if accepted { miner.Success(ctx.RequestID, "OK") if m.events != nil { - m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: &job, Diff: job.DifficultyFromTarget(), Latency: 0, Expired: expired}) + m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: &job, Diff: job.DifficultyFromTarget(), Latency: latency, Expired: expired}) } return } miner.ReplyWithError(ctx.RequestID, errorMessage) if m.events != nil { - m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: &job, Diff: job.DifficultyFromTarget(), Error: errorMessage}) + m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: &job, Diff: job.DifficultyFromTarget(), Error: errorMessage, Latency: latency}) } } diff --git a/splitter/nicehash/mapper.go b/splitter/nicehash/mapper.go index 5636c05..8bfd0c3 100644 --- a/splitter/nicehash/mapper.go +++ b/splitter/nicehash/mapper.go @@ -33,4 +33,5 @@ type SubmitContext struct { RequestID int64 // JSON-RPC id from the miner's submit request MinerID int64 // miner that submitted JobID string + StartedAt time.Time } diff --git a/splitter/simple/impl.go b/splitter/simple/impl.go index d110a40..19e79f3 100644 --- a/splitter/simple/impl.go +++ b/splitter/simple/impl.go @@ -136,8 +136,10 @@ func (s *SimpleSplitter) GC() { } } -// Tick is a no-op for simple mode. -func (s *SimpleSplitter) Tick(ticks uint64) {} +// Tick advances timeout checks in simple mode. +func (s *SimpleSplitter) Tick(ticks uint64) { + s.GC() +} // Upstreams returns active/idle/error counts. func (s *SimpleSplitter) Upstreams() proxy.UpstreamStats { @@ -159,7 +161,7 @@ func (s *SimpleSplitter) newMapperLocked() *SimpleMapper { mapper := &SimpleMapper{ id: id, events: s.events, - pending: make(map[int64]*proxy.SubmitEvent), + pending: make(map[int64]submitContext), } mapper.strategy = s.factory(mapper) if mapper.strategy == nil { @@ -176,7 +178,7 @@ func (m *SimpleMapper) Submit(event *proxy.SubmitEvent) { m.mu.Lock() defer m.mu.Unlock() seq := m.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo) - m.pending[seq] = event + m.pending[seq] = submitContext{RequestID: event.RequestID, StartedAt: time.Now()} } // OnJob forwards the latest pool job to the active miner. @@ -200,25 +202,36 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess return } m.mu.Lock() - ctx := m.pending[sequence] - delete(m.pending, sequence) + ctx, ok := m.pending[sequence] + if ok { + delete(m.pending, sequence) + } miner := m.miner m.mu.Unlock() - if ctx == nil || miner == nil { + if !ok || miner == nil { return } + latency := uint16(0) + if !ctx.StartedAt.IsZero() { + elapsed := time.Since(ctx.StartedAt).Milliseconds() + if elapsed > int64(^uint16(0)) { + latency = ^uint16(0) + } else { + latency = uint16(elapsed) + } + } if accepted { miner.Success(ctx.RequestID, "OK") if m.events != nil { job := miner.CurrentJob() - m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Diff: job.DifficultyFromTarget(), Job: &job}) + m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Diff: job.DifficultyFromTarget(), Job: &job, Latency: latency}) } return } miner.ReplyWithError(ctx.RequestID, errorMessage) if m.events != nil { job := miner.CurrentJob() - m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Diff: job.DifficultyFromTarget(), Job: &job, Error: errorMessage}) + m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Diff: job.DifficultyFromTarget(), Job: &job, Error: errorMessage, Latency: latency}) } } diff --git a/splitter/simple/mapper.go b/splitter/simple/mapper.go index 212ccdf..c3b3e40 100644 --- a/splitter/simple/mapper.go +++ b/splitter/simple/mapper.go @@ -21,6 +21,11 @@ type SimpleMapper struct { idleAt time.Time // zero when active stopped bool events *proxy.EventBus - pending map[int64]*proxy.SubmitEvent + pending map[int64]submitContext mu sync.Mutex } + +type submitContext struct { + RequestID int64 + StartedAt time.Time +} diff --git a/state_impl.go b/state_impl.go index 5a5541f..f528051 100644 --- a/state_impl.go +++ b/state_impl.go @@ -6,7 +6,6 @@ import ( "crypto/tls" "encoding/json" "errors" - "io" "net" "net/http" "sort" @@ -120,7 +119,7 @@ func (p *Proxy) MinerSnapshots() []MinerSnapshot { TX: miner.tx, RX: miner.rx, State: miner.state, - Diff: miner.customDiff, + Diff: miner.diff, User: miner.user, Password: "********", RigID: miner.rigID, @@ -147,6 +146,14 @@ func (p *Proxy) Upstreams() UpstreamStats { return p.splitter.Upstreams() } +// Events returns the proxy event bus for external composition. +func (p *Proxy) Events() *EventBus { + if p == nil { + return nil + } + return p.events +} + // Start starts the TCP listeners, ticker loop, and optional HTTP API. func (p *Proxy) Start() { if p == nil { @@ -300,7 +307,100 @@ func buildTLSConfig(cfg TLSConfig) *tls.Config { if err != nil { return nil } - return &tls.Config{Certificates: []tls.Certificate{cert}} + tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert}} + applyTLSProtocols(tlsConfig, cfg.Protocols) + applyTLSCiphers(tlsConfig, cfg.Ciphers) + return tlsConfig +} + +func applyTLSProtocols(tlsConfig *tls.Config, protocols string) { + if tlsConfig == nil || strings.TrimSpace(protocols) == "" { + return + } + parts := splitTLSConfigList(protocols) + minVersion := uint16(0) + maxVersion := uint16(0) + for _, part := range parts { + if part == "" { + continue + } + if strings.Contains(part, "-") { + bounds := strings.SplitN(part, "-", 2) + low := parseTLSVersion(bounds[0]) + high := parseTLSVersion(bounds[1]) + if low == 0 || high == 0 { + continue + } + if minVersion == 0 || low < minVersion { + minVersion = low + } + if high > maxVersion { + maxVersion = high + } + continue + } + version := parseTLSVersion(part) + if version == 0 { + continue + } + if minVersion == 0 || version < minVersion { + minVersion = version + } + if version > maxVersion { + maxVersion = version + } + } + if minVersion != 0 { + tlsConfig.MinVersion = minVersion + } + if maxVersion != 0 { + tlsConfig.MaxVersion = maxVersion + } +} + +func applyTLSCiphers(tlsConfig *tls.Config, ciphers string) { + if tlsConfig == nil || strings.TrimSpace(ciphers) == "" { + return + } + allowed := map[string]uint16{} + for _, suite := range tls.CipherSuites() { + allowed[strings.ToLower(suite.Name)] = suite.ID + } + for _, suite := range tls.InsecureCipherSuites() { + allowed[strings.ToLower(suite.Name)] = suite.ID + } + parts := splitTLSConfigList(ciphers) + for _, part := range parts { + if id, ok := allowed[strings.ToLower(part)]; ok { + tlsConfig.CipherSuites = append(tlsConfig.CipherSuites, id) + } + } +} + +func splitTLSConfigList(value string) []string { + return strings.FieldsFunc(value, func(r rune) bool { + switch r { + case ',', ';', ':', '|', ' ': + return true + default: + return false + } + }) +} + +func parseTLSVersion(value string) uint16 { + switch strings.ToLower(strings.TrimSpace(value)) { + case "tls1.0", "tlsv1.0", "tls1", "tlsv1", "1.0", "1", "tls10", "tlsv10": + return tls.VersionTLS10 + case "tls1.1", "tlsv1.1", "1.1", "tls11", "tlsv11": + return tls.VersionTLS11 + case "tls1.2", "tlsv1.2", "1.2", "tls12", "tlsv12": + return tls.VersionTLS12 + case "tls1.3", "tlsv1.3", "1.3", "tls13", "tlsv13": + return tls.VersionTLS13 + default: + return 0 + } } func (p *Proxy) startHTTP() { @@ -551,12 +651,7 @@ func (m *Miner) readLoop() { } line, isPrefix, err := reader.ReadLine() if err != nil { - if ne, ok := err.(net.Error); ok && ne.Timeout() { - return - } - if err != io.EOF { - return - } + m.Close() return } if isPrefix { @@ -569,6 +664,7 @@ func (m *Miner) readLoop() { m.rx += uint64(len(line) + 1) m.lastActivityAt = time.Now().UTC() if !m.handleLine(line) { + m.Close() return } } @@ -695,11 +791,11 @@ func (m *Miner) handleSubmit(req stratumRequest) { RequestID: requestID(req.ID), }) } - m.lastActivityAt = time.Now().UTC() + m.touchActivity() } func (m *Miner) handleKeepalived(req stratumRequest) { - m.lastActivityAt = time.Now().UTC() + m.touchActivity() m.Success(requestID(req.ID), "KEEPALIVED") } @@ -736,6 +832,7 @@ func (m *Miner) ForwardJob(job Job, algo string) { return } m.currentJob = job + m.diff = job.DifficultyFromTarget() if algo == "" { algo = job.Algo } @@ -759,6 +856,7 @@ func (m *Miner) ForwardJob(job Job, algo string) { payload["params"].(map[string]any)["algo"] = algo } _ = m.writeJSON(payload) + m.touchActivity() if m.state == MinerStateWaitReady { m.state = MinerStateReady } @@ -780,6 +878,7 @@ func (m *Miner) replyLoginSuccess(id int64) { if m.extNH { blob = job.BlobWithFixedByte(m.fixedByte) } + m.diff = job.DifficultyFromTarget() jobPayload := map[string]any{ "blob": blob, "job_id": job.JobID, @@ -792,6 +891,7 @@ func (m *Miner) replyLoginSuccess(id int64) { jobPayload["algo"] = job.Algo } result["job"] = jobPayload + m.touchActivity() m.state = MinerStateReady } payload := map[string]any{ @@ -833,6 +933,16 @@ func (m *Miner) Success(id int64, status string) { _ = m.writeJSON(payload) } +func (m *Miner) touchActivity() { + if m == nil { + return + } + m.lastActivityAt = time.Now().UTC() + if m.conn != nil { + _ = m.conn.SetReadDeadline(time.Now().Add(600 * time.Second)) + } +} + func (m *Miner) writeJSON(payload any) error { m.sendMu.Lock() defer m.sendMu.Unlock()