go-proxy/pool/client_test.go
Virgil 1d6176153c fix(pool): handle upstream error responses
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-04 17:07:26 +00:00

173 lines
3.8 KiB
Go

package pool
import (
"net"
"sync"
"testing"
"time"
"dappco.re/go/core/proxy"
)
// disconnectCountingListener is a listener stub for tests: it drops jobs and
// share results and only keeps a tally of OnDisconnect invocations.
type disconnectCountingListener struct {
	mu    sync.Mutex // guards count
	count int        // number of OnDisconnect calls seen so far
}

// OnJob is a no-op; jobs are not inspected by this listener.
func (l *disconnectCountingListener) OnJob(job proxy.Job) {}

// OnResultAccepted is a no-op; share results are not inspected by this listener.
func (l *disconnectCountingListener) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
}

// OnDisconnect bumps the disconnect tally under the mutex.
func (l *disconnectCountingListener) OnDisconnect() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.count++
}

// Count returns the number of disconnect notifications recorded so far.
func (l *disconnectCountingListener) Count() int {
	l.mu.Lock()
	seen := l.count
	l.mu.Unlock()
	return seen
}
// TestStratumClient_ReadLoop_Ugly feeds readLoop a single line of 16385 'a'
// bytes terminated by a newline and expects exactly one disconnect callback.
// The failure message treats this as an "oversized line"; the actual limit
// lives in readLoop and is not visible here.
func TestStratumClient_ReadLoop_Ugly(t *testing.T) {
	const lineLength = 16385

	serverConn, clientConn := net.Pipe()
	defer clientConn.Close()

	counter := &disconnectCountingListener{}
	client := &StratumClient{listener: counter, conn: serverConn}
	go client.readLoop()

	// lineLength filler bytes followed by the line terminator.
	oversized := make([]byte, lineLength+1)
	for i := 0; i < lineLength; i++ {
		oversized[i] = 'a'
	}
	oversized[lineLength] = '\n'

	// net.Pipe is unbuffered, so the write runs in its own goroutine. The
	// one-slot channel lets that goroutine finish even if the result is
	// never collected below.
	writeDone := make(chan error, 1)
	go func() {
		_, err := clientConn.Write(oversized)
		writeDone <- err
	}()

	// Give readLoop time to consume the line and notify the listener.
	time.Sleep(50 * time.Millisecond)

	if got := counter.Count(); got != 1 {
		t.Fatalf("expected oversized line to close the connection, got %d disconnect callbacks", got)
	}

	// Non-blocking check: only inspect the write result if it is already
	// available.
	select {
	case err := <-writeDone:
		if err != nil {
			t.Fatal(err)
		}
	default:
	}
}
// TestStratumClient_Disconnect_Good starts readLoop on one end of an
// in-memory pipe, calls Disconnect, and expects the listener to have been
// notified exactly once.
func TestStratumClient_Disconnect_Good(t *testing.T) {
	serverConn, clientConn := net.Pipe()
	defer clientConn.Close()

	counter := &disconnectCountingListener{}
	client := &StratumClient{listener: counter, conn: serverConn}
	go client.readLoop()

	// Short pause before disconnecting so the readLoop goroutine has a
	// chance to start.
	time.Sleep(10 * time.Millisecond)
	client.Disconnect()
	// Longer pause so any disconnect callbacks have been delivered before
	// they are counted.
	time.Sleep(50 * time.Millisecond)

	got := counter.Count()
	if got == 1 {
		return
	}
	t.Fatalf("expected one disconnect callback, got %d", got)
}
// resultCapturingListener is a listener stub for tests: it remembers the
// arguments of the most recent OnResultAccepted call and counts result and
// disconnect notifications. Jobs are ignored.
type resultCapturingListener struct {
	mu           sync.Mutex // guards every field below
	sequence     int64      // sequence from the latest OnResultAccepted call
	accepted     bool       // accepted flag from the latest OnResultAccepted call
	errorMessage string     // error message from the latest OnResultAccepted call
	results      int        // number of OnResultAccepted calls
	disconnects  int        // number of OnDisconnect calls
}

// OnJob is a no-op; jobs are not inspected by this listener.
func (l *resultCapturingListener) OnJob(job proxy.Job) {}

// OnResultAccepted stores the latest result arguments and bumps the result
// tally.
func (l *resultCapturingListener) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
	l.mu.Lock()
	defer l.mu.Unlock()

	l.sequence, l.accepted, l.errorMessage = sequence, accepted, errorMessage
	l.results++
}

// OnDisconnect bumps the disconnect tally.
func (l *resultCapturingListener) OnDisconnect() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.disconnects++
}

// Snapshot returns, in order: the last sequence, the last accepted flag, the
// last error message, the result count, and the disconnect count, all read
// under a single lock acquisition.
func (l *resultCapturingListener) Snapshot() (int64, bool, string, int, int) {
	l.mu.Lock()
	sequence, accepted, errorMessage := l.sequence, l.accepted, l.errorMessage
	results, disconnects := l.results, l.disconnects
	l.mu.Unlock()

	return sequence, accepted, errorMessage, results, disconnects
}
// TestStratumClient_HandleMessage_Bad hands handleMessage an error response
// with ID 7 on a client that already has a session ID, and expects the
// listener to receive one rejected result carrying the error message, with no
// disconnect notification.
func TestStratumClient_HandleMessage_Bad(t *testing.T) {
	capture := &resultCapturingListener{}
	client := &StratumClient{
		listener:  capture,
		sessionID: "session-1",
	}

	rejection := jsonRPCResponse{
		ID: 7,
		Error: &jsonRPCErrorBody{
			Code:    -1,
			Message: "Low difficulty share",
		},
	}
	client.handleMessage(rejection)

	sequence, accepted, errorMessage, results, disconnects := capture.Snapshot()

	// All four properties of the rejected submit must hold together.
	rejected := sequence == 7 && !accepted && errorMessage == "Low difficulty share" && results == 1
	if !rejected {
		t.Fatalf("expected rejected submit callback, got sequence=%d accepted=%v error=%q results=%d", sequence, accepted, errorMessage, results)
	}
	if disconnects > 0 || disconnects < 0 {
		t.Fatalf("expected no disconnect on submit rejection, got %d", disconnects)
	}
}
// TestStratumClient_HandleMessage_Ugly hands handleMessage an error response
// with ID 1 on a client that has a connection but no session ID. The failure
// messages treat this as a login rejection: it must not be reported as a
// share result and must produce exactly one disconnect notification.
func TestStratumClient_HandleMessage_Ugly(t *testing.T) {
	serverConn, clientConn := net.Pipe()
	defer clientConn.Close()

	capture := &resultCapturingListener{}
	client := &StratumClient{listener: capture, conn: serverConn}
	// Deferred so the client is disconnected when the test exits, whatever
	// handleMessage did.
	defer client.Disconnect()

	loginRejection := jsonRPCResponse{
		ID: 1,
		Error: &jsonRPCErrorBody{
			Code:    -1,
			Message: "Unauthenticated",
		},
	}
	client.handleMessage(loginRejection)

	// Only the counters matter here; the captured result fields are ignored.
	_, _, _, results, disconnects := capture.Snapshot()
	if results != 0 {
		t.Fatalf("expected login rejection not to be reported as a share result, got %d results", results)
	}
	if disconnects != 1 {
		t.Fatalf("expected login rejection to disconnect once, got %d", disconnects)
	}
}