From 7d2d30952990c909b89c97f6fe39bd555d74ccae Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 10:52:30 +0000 Subject: [PATCH] feat(proxy): close RFC gaps Co-Authored-By: Virgil --- api/router.go | 33 +++++++++++++++++ miner.go | 1 + miner_methods.go | 45 ++++++++++++++++-------- miner_runtime.go | 49 +++++++++++++++++++++----- pool/client.go | 2 ++ pool/strategy.go | 8 +++-- proxy.go | 25 +++++++------ proxy_runtime.go | 45 ++++++++++++++++++------ runtime_support.go | 49 +++++++++++++++++++++++--- splitter/nicehash/mapper.go | 19 +++++++--- splitter/nicehash/storage.go | 19 +++++++++- splitter/simple/mapper.go | 68 +++++++++++++++++++++++++++--------- splitter/simple/splitter.go | 11 +++++- 13 files changed, 302 insertions(+), 72 deletions(-) diff --git a/api/router.go b/api/router.go index 92b2854..f300172 100644 --- a/api/router.go +++ b/api/router.go @@ -12,6 +12,7 @@ package api import ( "encoding/json" "net/http" + "strings" "dappco.re/go/core/proxy" ) @@ -86,6 +87,9 @@ func RegisterRoutes(router Router, proxyValue *proxy.Proxy) { } router.HandleFunc("/1/summary", func(writer http.ResponseWriter, request *http.Request) { + if !allowRequest(writer, request, proxyValue.HTTPConfig()) { + return + } summary := proxyValue.Summary() response := SummaryResponse{ Version: "1.0.0", @@ -127,6 +131,9 @@ func RegisterRoutes(router Router, proxyValue *proxy.Proxy) { }) router.HandleFunc("/1/workers", func(writer http.ResponseWriter, request *http.Request) { + if !allowRequest(writer, request, proxyValue.HTTPConfig()) { + return + } type responseBody struct { Mode string `json:"mode"` Workers [][]interface{} `json:"workers"` @@ -159,6 +166,9 @@ func RegisterRoutes(router Router, proxyValue *proxy.Proxy) { }) router.HandleFunc("/1/miners", func(writer http.ResponseWriter, request *http.Request) { + if !allowRequest(writer, request, proxyValue.HTTPConfig()) { + return + } miners := proxyValue.Miners() rows := make([][]interface{}, 0, len(miners)) for _, miner := range miners { @@ -187,6 +197,29 @@ func RegisterRoutes(router Router, proxyValue *proxy.Proxy) { }) } +func allowRequest(writer http.ResponseWriter, request *http.Request, config proxy.HTTPConfig) bool { + if request == nil { + return false + } + + if config.AccessToken != "" { + header := request.Header.Get("Authorization") + prefix := "Bearer " + if !strings.HasPrefix(header, prefix) || strings.TrimSpace(strings.TrimPrefix(header, prefix)) != config.AccessToken { + writer.Header().Set("WWW-Authenticate", "Bearer") + http.Error(writer, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return false + } + } + + if config.Restricted && request.Method != http.MethodGet { + http.Error(writer, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return false + } + + return true +} + func writeJSON(writer http.ResponseWriter, value interface{}) { writer.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(writer).Encode(value) diff --git a/miner.go b/miner.go index 0e43d31..5bb09f5 100644 --- a/miner.go +++ b/miner.go @@ -28,6 +28,7 @@ type Miner struct { id int64 // monotonically increasing per-process; atomic assignment rpcID string // UUID v4 sent to miner as session id state MinerState + stateMu sync.RWMutex extAlgo bool // miner sent algo list in login params extNH bool // NiceHash mode active (fixed byte splitting) ip string // remote IP (without port, for logging) diff --git a/miner_methods.go b/miner_methods.go index 72ce381..c8bd47b 100644 --- a/miner_methods.go +++ b/miner_methods.go @@ -37,14 +37,23 @@ func NewMiner(conn net.Conn, localPort uint16, tlsCfg *tls.Config) *Miner { return miner } -func (m *Miner) ID() int64 { return m.id } -func (m *Miner) RPCID() string { return m.rpcID } -func (m *Miner) User() string { return m.user } -func (m *Miner) Password() string { return m.password } -func (m *Miner) Agent() string { return m.agent } -func (m *Miner) RigID() string { return m.rigID } -func (m *Miner) IP() string { return m.ip } -func (m *Miner) State() MinerState { return m.state } +func (m *Miner) ID() int64 { return m.id } +func (m *Miner) RPCID() string { return m.rpcID } +func (m *Miner) User() string { return m.user } +func (m *Miner) Password() string { return m.password } +func (m *Miner) Agent() string { return m.agent } +func (m *Miner) RigID() string { return m.rigID } +func (m *Miner) IP() string { return m.ip } +func (m *Miner) State() MinerState { + if m == nil { + return MinerStateClosing + } + + m.stateMu.RLock() + state := m.state + m.stateMu.RUnlock() + return state +} func (m *Miner) Diff() uint64 { return m.diff } func (m *Miner) FixedByte() uint8 { return m.fixedByte } func (m *Miner) MapperID() int64 { return m.mapperID } @@ -54,12 +63,20 @@ func (m *Miner) TX() uint64 { return m.tx } func (m *Miner) RX() uint64 { return m.rx } func (m *Miner) LastActivityAt() time.Time { return m.lastActivityAt } -func (m *Miner) SetRPCID(value string) { m.rpcID = value } -func (m *Miner) SetUser(value string) { m.user = value } -func (m *Miner) SetPassword(value string) { m.password = value } -func (m *Miner) SetAgent(value string) { m.agent = value } -func (m *Miner) SetRigID(value string) { m.rigID = value } -func (m *Miner) SetState(value MinerState) { m.state = value } +func (m *Miner) SetRPCID(value string) { m.rpcID = value } +func (m *Miner) SetUser(value string) { m.user = value } +func (m *Miner) SetPassword(value string) { m.password = value } +func (m *Miner) SetAgent(value string) { m.agent = value } +func (m *Miner) SetRigID(value string) { m.rigID = value } +func (m *Miner) SetState(value MinerState) { + if m == nil { + return + } + + m.stateMu.Lock() + m.state = value + m.stateMu.Unlock() +} func (m *Miner) SetDiff(value uint64) { m.diff = value } func (m *Miner) SetFixedByte(value uint8) { m.fixedByte = value } func (m *Miner) SetMapperID(value int64) { m.mapperID = value } diff --git a/miner_runtime.go b/miner_runtime.go index 8f0c0d9..bad544d 100644 --- a/miner_runtime.go +++ b/miner_runtime.go @@ -7,6 +7,7 @@ import ( "encoding/json" "net" "strings" + "time" ) type minerRequest struct { @@ -26,6 +27,10 @@ func (m *Miner) Start() { go func() { reader := bufio.NewReaderSize(m.conn, len(m.buf)) for { + if errorValue := m.applyReadDeadline(); errorValue != nil { + m.Close() + return + } line, isPrefix, errorValue := reader.ReadLine() if errorValue != nil { m.Close() @@ -59,7 +64,7 @@ func (m *Miner) ForwardJob(job Job, algo string) { } m.diff = job.DifficultyFromTarget() - m.state = MinerStateReady + m.SetState(MinerStateReady) jobCopy := job m.currentJob = &jobCopy m.Touch() @@ -118,7 +123,7 @@ func (m *Miner) Close() { } m.closeOnce.Do(func() { - m.state = MinerStateClosing + m.SetState(MinerStateClosing) if m.events != nil { m.events.Dispatch(Event{Type: EventClose, Miner: m}) } @@ -208,7 +213,7 @@ func (m *Miner) handleLogin(request minerRequest) { m.events.Dispatch(Event{Type: EventLogin, Miner: m}) } - if m.state == MinerStateClosing { + if m.State() == MinerStateClosing { return } @@ -231,9 +236,9 @@ func (m *Miner) handleLogin(request minerRequest) { if m.extAlgo { result["extensions"] = []string{"algo"} } - m.state = MinerStateReady + m.SetState(MinerStateReady) } else { - m.state = MinerStateWaitReady + m.SetState(MinerStateWaitReady) if m.extAlgo { result["extensions"] = []string{"algo"} } @@ -248,7 +253,7 @@ func (m *Miner) handleLogin(request minerRequest) { } func (m *Miner) handleSubmit(request minerRequest) { - if m.state != MinerStateReady { + if m.State() != MinerStateReady { m.ReplyWithError(request.ID, "Unauthenticated") return } @@ -311,6 +316,34 @@ func (m *Miner) currentJobCopy() *Job { return &jobCopy } +func (m *Miner) applyReadDeadline() error { + if m == nil || m.conn == nil { + return nil + } + + deadline := m.readDeadline() + if deadline.IsZero() { + return nil + } + + return m.conn.SetReadDeadline(deadline) +} + +func (m *Miner) readDeadline() time.Time { + if m == nil { + return time.Time{} + } + + switch m.State() { + case MinerStateWaitLogin: + return m.lastActivityAt.Add(10 * time.Second) + case MinerStateWaitReady, MinerStateReady: + return m.lastActivityAt.Add(600 * time.Second) + default: + return time.Time{} + } +} + func (m *Miner) dispatchSubmitResult(eventType EventType, diff uint64, errorMessage string, requestID int64) { if m == nil || m.events == nil { return @@ -334,11 +367,11 @@ func (m *Miner) dispatchSubmitResult(eventType EventType, diff uint64, errorMess func (m *Miner) setStateFromJob(job Job) { m.currentJob = &job - m.state = MinerStateReady + m.SetState(MinerStateReady) } func (m *Miner) Expire() { - if m == nil || m.state == MinerStateClosing { + if m == nil || m.State() == MinerStateClosing { return } m.Close() diff --git a/pool/client.go b/pool/client.go index 20f73d6..0c74e6b 100644 --- a/pool/client.go +++ b/pool/client.go @@ -217,6 +217,7 @@ func (c *StratumClient) handleMessage(response jsonRPCResponse) { if json.Unmarshal(response.Result, &loginResult) == nil && loginResult.ID != "" { c.sessionID = loginResult.ID if loginResult.Job.IsValid() { + loginResult.Job.ClientID = c.sessionID c.active = true if c.listener != nil { c.listener.OnJob(loginResult.Job) @@ -239,6 +240,7 @@ func (c *StratumClient) handleMessage(response jsonRPCResponse) { if response.Method == "job" { var payload proxy.Job if json.Unmarshal(response.Params, &payload) == nil && payload.IsValid() { + payload.ClientID = c.sessionID c.active = true if c.listener != nil { c.listener.OnJob(payload) diff --git a/pool/strategy.go b/pool/strategy.go index 827bfe6..909737f 100644 --- a/pool/strategy.go +++ b/pool/strategy.go @@ -67,7 +67,11 @@ func (s *FailoverStrategy) Connect() { s.mu.Lock() defer s.mu.Unlock() - if len(s.pools) == 0 { + pools := s.pools + if s.cfg != nil { + pools = s.cfg.Pools + } + if len(pools) == 0 { return } @@ -83,7 +87,7 @@ func (s *FailoverStrategy) Connect() { } for attempt := 0; attempt < retries; attempt++ { - for index, poolConfig := range s.pools { + for index, poolConfig := range pools { if !poolConfig.Enabled { continue } diff --git a/proxy.go b/proxy.go index b555e20..c150c6a 100644 --- a/proxy.go +++ b/proxy.go @@ -21,17 +21,19 @@ import ( // p, result := proxy.New(cfg) // if result.OK { p.Start() } type Proxy struct { - config *Config - splitter Splitter - stats *Stats - workers *Workers - events *EventBus - miners map[int64]*Miner - minerMu sync.RWMutex - servers []*Server - ticker *time.Ticker - watcher *ConfigWatcher - done chan struct{} + config *Config + customDiff *CustomDiff + rateLimiter *RateLimiter + splitter Splitter + stats *Stats + workers *Workers + events *EventBus + miners map[int64]*Miner + minerMu sync.RWMutex + servers []*Server + ticker *time.Ticker + watcher *ConfigWatcher + done chan struct{} } // Splitter is the interface both NonceSplitter and SimpleSplitter satisfy. @@ -121,6 +123,7 @@ type tokenBucket struct { // bus.Subscribe(proxy.EventLogin, cd.OnLogin) type CustomDiff struct { globalDiff uint64 + mu sync.RWMutex } var splitterFactories = map[string]func(*Config, *EventBus) Splitter{ diff --git a/proxy_runtime.go b/proxy_runtime.go index 3d02c6f..f3be738 100644 --- a/proxy_runtime.go +++ b/proxy_runtime.go @@ -19,18 +19,21 @@ func New(cfg *Config) (*Proxy, error) { events := NewEventBus() stats := NewStats() - events.Subscribe(EventLogin, NewCustomDiff(cfg.CustomDiff).OnLogin) + customDiff := NewCustomDiff(cfg.CustomDiff) + events.Subscribe(EventLogin, customDiff.OnLogin) workers := NewWorkers(cfg.Workers, events) splitter := newSplitter(cfg, events) proxyValue := &Proxy{ - config: cfg, - splitter: splitter, - stats: stats, - workers: workers, - events: events, - miners: make(map[int64]*Miner), - done: make(chan struct{}), + config: cfg, + customDiff: customDiff, + splitter: splitter, + stats: stats, + workers: workers, + events: events, + miners: make(map[int64]*Miner), + rateLimiter: NewRateLimiter(cfg.RateLimit), + done: make(chan struct{}), } events.Subscribe(EventLogin, func(event Event) { @@ -93,7 +96,7 @@ func (p *Proxy) Start() { tlsConfig = &tls.Config{Certificates: []tls.Certificate{certificate}} } } - server, errorValue := NewServer(bind, tlsConfig, NewRateLimiter(p.config.RateLimit), p.acceptConn) + server, errorValue := NewServer(bind, tlsConfig, p.rateLimiter, p.acceptConn) if errorValue != nil { continue } @@ -109,6 +112,9 @@ func (p *Proxy) Start() { ticks++ p.stats.Tick() p.workers.Tick() + if p.rateLimiter != nil { + p.rateLimiter.Tick() + } if p.splitter != nil { p.splitter.Tick(ticks) } @@ -158,7 +164,19 @@ func (p *Proxy) Stop() { // p.Reload(newCfg) func (p *Proxy) Reload(cfg *Config) { if cfg != nil { - p.config = cfg + if p.config == nil { + p.config = cfg + } else { + sourcePath := p.config.sourcePath + *p.config = *cfg + p.config.sourcePath = sourcePath + } + if p.customDiff != nil { + p.customDiff.SetGlobalDiff(p.config.CustomDiff) + } + if p.rateLimiter != nil { + p.rateLimiter.SetConfig(p.config.RateLimit) + } } } @@ -209,6 +227,13 @@ func (p *Proxy) Mode() string { return p.config.Mode } +func (p *Proxy) HTTPConfig() HTTPConfig { + if p == nil || p.config == nil { + return HTTPConfig{} + } + return p.config.HTTP +} + func (p *Proxy) WorkersMode() string { if p == nil || p.config == nil { return "" diff --git a/runtime_support.go b/runtime_support.go index 0de48f1..9faaf4e 100644 --- a/runtime_support.go +++ b/runtime_support.go @@ -17,11 +17,24 @@ func NewRateLimiter(config RateLimit) *RateLimiter { } } +// SetConfig replaces the active rate-limit settings. +// +// rl.SetConfig(proxy.RateLimit{MaxConnectionsPerMinute: 30, BanDurationSeconds: 300}) +func (rl *RateLimiter) SetConfig(config RateLimit) { + if rl == nil { + return + } + + rl.mu.Lock() + rl.cfg = config + rl.mu.Unlock() +} + // Allow returns true if the IP address is permitted to open a new connection. Thread-safe. // // if rl.Allow(conn.RemoteAddr().String()) { proceed() } func (rl *RateLimiter) Allow(ip string) bool { - if rl == nil || rl.cfg.MaxConnectionsPerMinute <= 0 { + if rl == nil { return true } @@ -31,6 +44,10 @@ func (rl *RateLimiter) Allow(ip string) bool { rl.mu.Lock() defer rl.mu.Unlock() + if rl.cfg.MaxConnectionsPerMinute <= 0 { + return true + } + if bannedUntil, exists := rl.banned[host]; exists { if bannedUntil.After(now) { return false @@ -63,7 +80,7 @@ func (rl *RateLimiter) Allow(ip string) bool { // // rl.Tick() func (rl *RateLimiter) Tick() { - if rl == nil || rl.cfg.MaxConnectionsPerMinute <= 0 { + if rl == nil { return } @@ -71,6 +88,10 @@ func (rl *RateLimiter) Tick() { rl.mu.Lock() defer rl.mu.Unlock() + if rl.cfg.MaxConnectionsPerMinute <= 0 { + return + } + for host, bannedUntil := range rl.banned { if !bannedUntil.After(now) { delete(rl.banned, host) @@ -112,6 +133,19 @@ func NewCustomDiff(globalDiff uint64) *CustomDiff { return &CustomDiff{globalDiff: globalDiff} } +// SetGlobalDiff updates the default custom difficulty override. +// +// cd.SetGlobalDiff(100000) +func (cd *CustomDiff) SetGlobalDiff(globalDiff uint64) { + if cd == nil { + return + } + + cd.mu.Lock() + cd.globalDiff = globalDiff + cd.mu.Unlock() +} + // OnLogin parses miner.User for a "+{number}" suffix and sets miner.CustomDiff. // // cd.OnLogin(proxy.Event{Miner: miner}) @@ -130,7 +164,14 @@ func (cd *CustomDiff) OnLogin(event Event) { } } - if cd != nil && cd.globalDiff > 0 { - event.Miner.SetCustomDiff(cd.globalDiff) + if cd == nil { + return + } + + cd.mu.RLock() + globalDiff := cd.globalDiff + cd.mu.RUnlock() + if globalDiff > 0 { + event.Miner.SetCustomDiff(globalDiff) } } diff --git a/splitter/nicehash/mapper.go b/splitter/nicehash/mapper.go index 1525921..cb46909 100644 --- a/splitter/nicehash/mapper.go +++ b/splitter/nicehash/mapper.go @@ -31,6 +31,7 @@ type SubmitContext struct { RequestID int64 // JSON-RPC id from the miner's submit request MinerID int64 // miner that submitted JobID string + Expired bool } // NewNonceMapper creates one upstream pool mapper and its local slot table. @@ -59,6 +60,12 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { return } + valid, expired := m.storage.JobStatus(event.JobID) + if !valid { + event.Miner.ReplyWithError(event.RequestID, "Invalid job id") + return + } + sequence := m.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo) if sequence == 0 { if event.Miner != nil { @@ -71,6 +78,7 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { RequestID: event.RequestID, MinerID: event.Miner.ID(), JobID: event.JobID, + Expired: expired, } m.mu.Unlock() } @@ -118,11 +126,12 @@ func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessa if m.events != nil { jobCopy := m.storage.CurrentJob() m.events.Dispatch(proxy.Event{ - Type: eventType, - Miner: miner, - Job: jobCopy, - Diff: miner.Diff(), - Error: errorMessage, + Type: eventType, + Miner: miner, + Job: jobCopy, + Diff: miner.Diff(), + Error: errorMessage, + Expired: context.Expired, }) } diff --git a/splitter/nicehash/storage.go b/splitter/nicehash/storage.go index 4d80414..d44cee4 100644 --- a/splitter/nicehash/storage.go +++ b/splitter/nicehash/storage.go @@ -114,10 +114,27 @@ func (s *NonceStorage) SetJob(job proxy.Job) { // // if !storage.IsValidJobID(submitJobID) { reject } func (s *NonceStorage) IsValidJobID(id string) bool { + valid, _ := s.JobStatus(id) + return valid +} + +// JobStatus returns whether the job ID is current or stale-but-still-acceptable. +// +// valid, expired := storage.JobStatus(submitJobID) +func (s *NonceStorage) JobStatus(id string) (valid bool, expired bool) { s.mu.Lock() defer s.mu.Unlock() - return id != "" && (id == s.job.JobID || id == s.prevJob.JobID) + if id == "" { + return false, false + } + if id == s.job.JobID { + return true, false + } + if id == s.prevJob.JobID && id != "" { + return true, true + } + return false, false } // SlotCount returns free, dead, and active slot counts for monitoring output. diff --git a/splitter/simple/mapper.go b/splitter/simple/mapper.go index 544269d..e80beda 100644 --- a/splitter/simple/mapper.go +++ b/splitter/simple/mapper.go @@ -18,34 +18,66 @@ type SimpleMapper struct { miner *proxy.Miner // nil when idle strategy pool.Strategy events *proxy.EventBus - pending map[int64]int64 + pending map[int64]simpleSubmitContext + job proxy.Job + prevJob proxy.Job idleAt time.Time // zero when active stopped bool mu sync.Mutex } +type simpleSubmitContext struct { + RequestID int64 + Expired bool +} + // NewSimpleMapper stores the mapper ID and strategy. // // mapper := simple.NewSimpleMapper(1, strategy) func NewSimpleMapper(id int64, strategy pool.Strategy) *SimpleMapper { - return &SimpleMapper{id: id, strategy: strategy, pending: make(map[int64]int64)} + return &SimpleMapper{id: id, strategy: strategy, pending: make(map[int64]simpleSubmitContext)} } func (m *SimpleMapper) OnJob(job proxy.Job) { - if !job.IsValid() || m.miner == nil { - return - } - - m.miner.ForwardJob(job, job.Algo) -} - -func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) { - if m.miner == nil { + if !job.IsValid() { return } m.mu.Lock() - requestID, exists := m.pending[sequence] + if m.job.ClientID == job.ClientID || m.job.ClientID == "" { + m.prevJob = m.job + } else { + m.prevJob = proxy.Job{} + } + m.job = job + miner := m.miner + m.mu.Unlock() + + if miner != nil { + miner.ForwardJob(job, job.Algo) + } +} + +func (m *SimpleMapper) JobStatus(id string) (valid bool, expired bool) { + m.mu.Lock() + defer m.mu.Unlock() + + if id == "" { + return false, false + } + if id == m.job.JobID { + return true, false + } + if id == m.prevJob.JobID { + return true, true + } + return false, false +} + +func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) { + m.mu.Lock() + context, exists := m.pending[sequence] + miner := m.miner if !exists { m.mu.Unlock() return @@ -53,18 +85,22 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess delete(m.pending, sequence) m.mu.Unlock() + if miner == nil { + return + } + if accepted { if m.events != nil { - m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: m.miner, Diff: m.miner.Diff()}) + m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Diff: miner.Diff(), Expired: context.Expired}) } - m.miner.Success(requestID, "OK") + miner.Success(context.RequestID, "OK") return } if m.events != nil { - m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: m.miner, Diff: m.miner.Diff(), Error: errorMessage}) + m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Diff: miner.Diff(), Error: errorMessage, Expired: context.Expired}) } - m.miner.ReplyWithError(requestID, errorMessage) + miner.ReplyWithError(context.RequestID, errorMessage) } func (m *SimpleMapper) OnDisconnect() { diff --git a/splitter/simple/splitter.go b/splitter/simple/splitter.go index 1474607..c848970 100644 --- a/splitter/simple/splitter.go +++ b/splitter/simple/splitter.go @@ -92,6 +92,12 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) { return } + valid, expired := mapper.JobStatus(event.JobID) + if !valid { + event.Miner.ReplyWithError(event.RequestID, "Invalid job id") + return + } + sequence := mapper.strategy.Submit(event.JobID, event.Nonce, event.Result, event.Algo) if sequence == 0 { event.Miner.ReplyWithError(event.RequestID, "Pool unavailable") @@ -99,7 +105,10 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) { } mapper.mu.Lock() - mapper.pending[sequence] = event.RequestID + mapper.pending[sequence] = simpleSubmitContext{ + RequestID: event.RequestID, + Expired: expired, + } mapper.mu.Unlock() }