diff --git a/pool/impl.go b/pool/impl.go index 02cbe9c..ef6e9c2 100644 --- a/pool/impl.go +++ b/pool/impl.go @@ -138,6 +138,22 @@ func (c *StratumClient) Submit(jobID, nonce, result, algo string) int64 { return seq } +// Keepalive sends a lightweight keepalived request to the pool when enabled. +func (c *StratumClient) Keepalive() { + if c == nil || c.conn == nil || !c.IsActive() { + return + } + req := map[string]any{ + "id": atomic.AddInt64(&c.seq, 1), + "jsonrpc": "2.0", + "method": "keepalived", + "params": map[string]any{ + "id": c.sessionID, + }, + } + _ = c.writeJSON(req) +} + // Disconnect closes the connection and notifies the listener. func (c *StratumClient) Disconnect() { if c == nil { @@ -404,6 +420,19 @@ func (s *FailoverStrategy) IsActive() bool { return s != nil && s.client != nil && s.client.IsActive() } +// Tick keeps an active pool connection alive when configured. +func (s *FailoverStrategy) Tick(ticks uint64) { + if s == nil || ticks == 0 || ticks%60 != 0 { + return + } + s.mu.Lock() + client := s.client + s.mu.Unlock() + if client != nil && client.cfg.Keepalive { + client.Keepalive() + } +} + // OnJob forwards the pool job to the outer listener. func (s *FailoverStrategy) OnJob(job proxy.Job) { if s != nil && s.listener != nil { diff --git a/pool/keepalive_test.go b/pool/keepalive_test.go new file mode 100644 index 0000000..ddd19c9 --- /dev/null +++ b/pool/keepalive_test.go @@ -0,0 +1,112 @@ +package pool + +import ( + "bufio" + "encoding/json" + "net" + "testing" + "time" +) + +func TestStratumClient_Keepalive_Good(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer serverConn.Close() + defer clientConn.Close() + + client := &StratumClient{ + conn: clientConn, + active: true, + sessionID: "session-1", + } + + done := make(chan struct{}) + go func() { + client.Keepalive() + close(done) + }() + + line, err := bufio.NewReader(serverConn).ReadBytes('\n') + if err != nil { + t.Fatalf("read keepalive request: %v", err) + } + <-done + + var payload map[string]any + if err := json.Unmarshal(line, &payload); err != nil { + t.Fatalf("unmarshal keepalive request: %v", err) + } + if got := payload["method"]; got != "keepalived" { + t.Fatalf("expected keepalived method, got %#v", got) + } + params, ok := payload["params"].(map[string]any) + if !ok { + t.Fatalf("expected params object, got %#v", payload["params"]) + } + if got := params["id"]; got != "session-1" { + t.Fatalf("expected session id in keepalive payload, got %#v", got) + } +} + +func TestStratumClient_Keepalive_Bad(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer serverConn.Close() + defer clientConn.Close() + + client := &StratumClient{ + conn: clientConn, + active: false, + } + + client.Keepalive() + + if err := serverConn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)); err != nil { + t.Fatalf("set deadline: %v", err) + } + buf := make([]byte, 1) + if _, err := serverConn.Read(buf); err == nil { + t.Fatalf("expected no keepalive data while inactive") + } +} + +func TestStratumClient_Keepalive_Ugly(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer serverConn.Close() + defer clientConn.Close() + + client := &StratumClient{ + conn: clientConn, + active: true, + sessionID: "session-2", + } + + reader := bufio.NewReader(serverConn) + done := make(chan struct{}) + go func() { + client.Keepalive() + client.Keepalive() + close(done) + }() + + first, err := reader.ReadBytes('\n') + if err != nil { + t.Fatalf("read first keepalive request: %v", err) + } + second, err := reader.ReadBytes('\n') + if err != nil { + t.Fatalf("read second keepalive request: %v", err) + } + <-done + + var firstPayload map[string]any + if err := json.Unmarshal(first, &firstPayload); err != nil { + t.Fatalf("unmarshal first keepalive request: %v", err) + } + var secondPayload map[string]any + if err := json.Unmarshal(second, &secondPayload); err != nil { + t.Fatalf("unmarshal second keepalive request: %v", err) + } + + if firstPayload["id"] == secondPayload["id"] { + t.Fatalf("expected keepalive request ids to be unique") + } +} diff --git a/splitter/nicehash/impl.go b/splitter/nicehash/impl.go index c745315..a96f7a8 100644 --- a/splitter/nicehash/impl.go +++ b/splitter/nicehash/impl.go @@ -113,7 +113,25 @@ func (s *NonceSplitter) GC() { } // Tick is called once per second. -func (s *NonceSplitter) Tick(ticks uint64) {} +func (s *NonceSplitter) Tick(ticks uint64) { + if s == nil { + return + } + strategies := make([]pool.Strategy, 0, len(s.mappers)) + s.mu.RLock() + for _, mapper := range s.mappers { + if mapper == nil || mapper.strategy == nil { + continue + } + strategies = append(strategies, mapper.strategy) + } + s.mu.RUnlock() + for _, strategy := range strategies { + if ticker, ok := strategy.(interface{ Tick(uint64) }); ok { + ticker.Tick(ticks) + } + } +} // Upstreams returns pool connection counts. func (s *NonceSplitter) Upstreams() proxy.UpstreamStats { diff --git a/splitter/simple/impl.go b/splitter/simple/impl.go index 19e79f3..f76fdd8 100644 --- a/splitter/simple/impl.go +++ b/splitter/simple/impl.go @@ -138,6 +138,27 @@ func (s *SimpleSplitter) GC() { // Tick advances timeout checks in simple mode. func (s *SimpleSplitter) Tick(ticks uint64) { + if s == nil { + return + } + strategies := make([]pool.Strategy, 0, len(s.active)+len(s.idle)) + s.mu.Lock() + for _, mapper := range s.active { + if mapper != nil && mapper.strategy != nil { + strategies = append(strategies, mapper.strategy) + } + } + for _, mapper := range s.idle { + if mapper != nil && mapper.strategy != nil { + strategies = append(strategies, mapper.strategy) + } + } + s.mu.Unlock() + for _, strategy := range strategies { + if ticker, ok := strategy.(interface{ Tick(uint64) }); ok { + ticker.Tick(ticks) + } + } s.GC() }