go-proxy/pool/strategy_test.go
Virgil 69eb908fe8 fix(pool): retry failover from primary
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-04 17:23:15 +00:00

306 lines
6.6 KiB
Go

package pool
import (
"bufio"
"encoding/json"
"net"
"sync"
"sync/atomic"
"testing"
"time"
"dappco.re/go/core/proxy"
)
// strategyTestListener is the callback sink handed to NewFailoverStrategy in
// these tests. It forwards received jobs to a channel and counts disconnect
// notifications so the tests can assert on both.
type strategyTestListener struct {
	// jobCh receives every job passed to OnJob.
	jobCh chan proxy.Job

	// disconnectMu guards disconnects.
	disconnectMu sync.Mutex
	// disconnects is the number of OnDisconnect calls observed so far.
	disconnects int
}
// OnJob forwards job to jobCh for the test goroutine to pick up.
//
// The send is a plain channel send, so it blocks while jobCh is full; tests
// size the channel buffer for the number of jobs they expect.
func (l *strategyTestListener) OnJob(job proxy.Job) {
	l.jobCh <- job
}
// OnResultAccepted is intentionally a no-op: none of the tests in this file
// inspect share results. It presumably exists only so the type satisfies the
// listener interface expected by NewFailoverStrategy (verify against that API).
func (l *strategyTestListener) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
}
// OnDisconnect records one disconnect notification. The counter is updated
// under disconnectMu so it can be read safely via Disconnects from another
// goroutine.
func (l *strategyTestListener) OnDisconnect() {
	l.disconnectMu.Lock()
	defer l.disconnectMu.Unlock()

	l.disconnects++
}
// Disconnects returns how many times OnDisconnect has been called so far.
// The value is read under disconnectMu.
func (l *strategyTestListener) Disconnects() int {
	l.disconnectMu.Lock()
	count := l.disconnects
	l.disconnectMu.Unlock()

	return count
}
// TestFailoverStrategy_Connect_Ugly runs a failover strategy against two mock
// pools on loopback: a primary that accepts a single connection and drops it
// straight away, and a backup that answers the first request line with a
// result carrying job "job-1". The test passes when that job reaches the
// listener within three seconds and at least one disconnect was reported.
func TestFailoverStrategy_Connect_Ugly(t *testing.T) {
	primary, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	defer primary.Close()

	backup, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	defer backup.Close()

	// Mock primary: accept once, then close the listener and the connection
	// without ever replying.
	go func() {
		c, acceptErr := primary.Accept()
		if acceptErr != nil {
			return
		}
		_ = primary.Close()
		_ = c.Close()
	}()

	// Mock backup: wait for one newline-terminated request (presumably the
	// client's login) and reply with a session that includes a job.
	go func() {
		c, acceptErr := backup.Accept()
		if acceptErr != nil {
			return
		}
		defer c.Close()

		if _, readErr := bufio.NewReader(c).ReadBytes('\n'); readErr != nil {
			return
		}

		reply := map[string]any{
			"id":      1,
			"jsonrpc": "2.0",
			"error":   nil,
			"result": map[string]any{
				"id": "session-1",
				"job": map[string]any{
					"blob":   "abcd",
					"job_id": "job-1",
					"target": "b88d0600",
				},
			},
		}
		// Best effort: a write failure surfaces as the timeout below.
		_ = json.NewEncoder(c).Encode(reply)
	}()

	listener := &strategyTestListener{jobCh: make(chan proxy.Job, 1)}
	pools := []proxy.PoolConfig{
		{URL: primary.Addr().String(), Enabled: true},
		{URL: backup.Addr().String(), Enabled: true},
	}
	strategy := NewFailoverStrategy(pools, listener, &proxy.Config{Retries: 2})
	strategy.Connect()
	defer strategy.Disconnect()

	var got proxy.Job
	select {
	case got = <-listener.jobCh:
	case <-time.After(3 * time.Second):
		t.Fatal("expected failover job after primary disconnect")
	}
	if got.JobID != "job-1" {
		t.Fatalf("expected backup job, got %+v", got)
	}

	if count := listener.Disconnects(); count == 0 {
		t.Fatal("expected disconnect callback before failover reconnect")
	}
}
// TestFailoverStrategy_OnDisconnect_Good runs a failover strategy against a
// primary mock pool that rejects the first request with an "Unauthenticated"
// JSON-RPC error and then goes away, and a backup mock pool that replies with
// job "job-1". The test passes when the backup job reaches the listener within
// three seconds and at least one disconnect was reported.
func TestFailoverStrategy_OnDisconnect_Good(t *testing.T) {
	primary, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	defer primary.Close()

	backup, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	defer backup.Close()

	// Number of connections the primary accepted; reported on timeout only.
	var primaryConnections atomic.Int32

	// Mock primary: accept once, answer the first request line with an error
	// reply, then close the connection followed by the listener.
	go func() {
		c, acceptErr := primary.Accept()
		if acceptErr != nil {
			return
		}
		primaryConnections.Add(1)
		// Deferred calls run in reverse order: connection first, listener last.
		defer primary.Close()
		defer c.Close()

		if _, readErr := bufio.NewReader(c).ReadBytes('\n'); readErr != nil {
			return
		}

		rejection := map[string]any{
			"id":      1,
			"jsonrpc": "2.0",
			"error": map[string]any{
				"code":    -1,
				"message": "Unauthenticated",
			},
		}
		_ = json.NewEncoder(c).Encode(rejection)
	}()

	// Mock backup: answer the first request line with a session and a job.
	go func() {
		c, acceptErr := backup.Accept()
		if acceptErr != nil {
			return
		}
		defer c.Close()

		if _, readErr := bufio.NewReader(c).ReadBytes('\n'); readErr != nil {
			return
		}

		reply := map[string]any{
			"id":      1,
			"jsonrpc": "2.0",
			"error":   nil,
			"result": map[string]any{
				"id": "session-1",
				"job": map[string]any{
					"blob":   "abcd",
					"job_id": "job-1",
					"target": "b88d0600",
				},
			},
		}
		_ = json.NewEncoder(c).Encode(reply)
	}()

	listener := &strategyTestListener{jobCh: make(chan proxy.Job, 1)}
	pools := []proxy.PoolConfig{
		{URL: primary.Addr().String(), Enabled: true},
		{URL: backup.Addr().String(), Enabled: true},
	}
	strategy := NewFailoverStrategy(pools, listener, &proxy.Config{Retries: 1})
	strategy.Connect()
	defer strategy.Disconnect()

	var got proxy.Job
	select {
	case got = <-listener.jobCh:
	case <-time.After(3 * time.Second):
		t.Fatalf("expected backup job after primary disconnect, primary connections=%d", primaryConnections.Load())
	}
	if got.JobID != "job-1" {
		t.Fatalf("expected backup job, got %+v", got)
	}

	if count := listener.Disconnects(); count == 0 {
		t.Fatal("expected disconnect callback before failover reconnect")
	}
}
// TestFailoverStrategy_OnDisconnect_PrimaryFirst checks that a reconnect after
// the backup pool drops the connection goes to the primary pool again.
//
// Sequence:
//  1. The primary address is reserved and immediately closed, so nothing is
//     listening there when the strategy first connects; the test expects the
//     first job to be "backup-job" from the backup mock.
//  2. The test re-opens a listener on the primary address and serves
//     "primary-job" from it.
//  3. The backup mock drops its connection; the test expects the next job to
//     be "primary-job".
//
// The backup mock does not drop its connection until the primary listener has
// been re-opened. Relying on a fixed sleep alone would race the test
// goroutine's net.Listen call and could make the reconnect hit a still-closed
// primary on a slow machine.
func TestFailoverStrategy_OnDisconnect_PrimaryFirst(t *testing.T) {
	// Reserve an ephemeral port for the primary, then release it so the first
	// dial to that address fails.
	primaryListener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	primaryAddr := primaryListener.Addr().String()
	_ = primaryListener.Close()

	backupListener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	defer backupListener.Close()

	// primaryReady is closed once the primary listener is accepting again.
	primaryReady := make(chan struct{})

	// Mock backup: answer the first request line with a session and a job,
	// then hold the connection open until the primary is back.
	go func() {
		conn, acceptErr := backupListener.Accept()
		if acceptErr != nil {
			return
		}
		defer conn.Close()

		reader := bufio.NewReader(conn)
		if _, readErr := reader.ReadBytes('\n'); readErr != nil {
			return
		}

		_ = json.NewEncoder(conn).Encode(map[string]any{
			"id":      1,
			"jsonrpc": "2.0",
			"error":   nil,
			"result": map[string]any{
				"id": "session-backup",
				"job": map[string]any{
					"blob":   "abcd",
					"job_id": "backup-job",
					"target": "b88d0600",
				},
			},
		})

		// Keep the original minimum hold time before disconnecting.
		time.Sleep(40 * time.Millisecond)

		// Do not disconnect before the primary is listening. The timeout only
		// bounds this goroutine's lifetime if the test aborts early.
		select {
		case <-primaryReady:
		case <-time.After(3 * time.Second):
		}
	}()

	listener := &strategyTestListener{
		jobCh: make(chan proxy.Job, 2),
	}
	strategy := NewFailoverStrategy([]proxy.PoolConfig{
		{URL: primaryAddr, Enabled: true},
		{URL: backupListener.Addr().String(), Enabled: true},
	}, listener, &proxy.Config{Retries: 2})
	strategy.Connect()
	defer strategy.Disconnect()

	select {
	case job := <-listener.jobCh:
		if job.JobID != "backup-job" {
			t.Fatalf("expected initial failover job, got %+v", job)
		}
	case <-time.After(3 * time.Second):
		t.Fatal("expected initial failover job")
	}

	// Bring the primary back on the same address it had before.
	primaryListener, err = net.Listen("tcp", primaryAddr)
	if err != nil {
		t.Fatal(err)
	}
	defer primaryListener.Close()

	// Mock primary: answer the first request line with a session and a job.
	go func() {
		conn, acceptErr := primaryListener.Accept()
		if acceptErr != nil {
			return
		}
		defer conn.Close()

		reader := bufio.NewReader(conn)
		if _, readErr := reader.ReadBytes('\n'); readErr != nil {
			return
		}

		_ = json.NewEncoder(conn).Encode(map[string]any{
			"id":      1,
			"jsonrpc": "2.0",
			"error":   nil,
			"result": map[string]any{
				"id": "session-primary",
				"job": map[string]any{
					"blob":   "abcd",
					"job_id": "primary-job",
					"target": "b88d0600",
				},
			},
		})
	}()

	// The listening socket queues incoming connections from this point on, so
	// the backup may now drop its connection.
	close(primaryReady)

	select {
	case job := <-listener.jobCh:
		if job.JobID != "primary-job" {
			t.Fatalf("expected reconnect to prefer primary pool, got %+v", job)
		}
	case <-time.After(3 * time.Second):
		t.Fatal("expected reconnect job")
	}
}