package pool

import (
	"bufio"
	"encoding/json"
	"net"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"dappco.re/go/core/proxy"
)

// strategyTestLoopback asks the OS for a free TCP port on the loopback
// interface.
const strategyTestLoopback = "127.0.0.1:0"

// strategyTestListener is the test double handed to NewFailoverStrategy. It
// forwards every received job on jobCh and counts disconnect notifications.
type strategyTestListener struct {
	jobCh        chan proxy.Job
	disconnectMu sync.Mutex // guards disconnects
	disconnects  int
}

// OnJob forwards job on jobCh. The send blocks once the channel buffer is
// full, so each test sizes the buffer for the jobs it expects.
func (l *strategyTestListener) OnJob(job proxy.Job) {
	l.jobCh <- job
}

// OnResultAccepted is intentionally a no-op; these tests never inspect
// result acknowledgements.
func (l *strategyTestListener) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
}

// OnDisconnect records one disconnect notification.
func (l *strategyTestListener) OnDisconnect() {
	l.disconnectMu.Lock()
	l.disconnects++
	l.disconnectMu.Unlock()
}

// Disconnects returns how many disconnect notifications have been recorded.
func (l *strategyTestListener) Disconnects() int {
	l.disconnectMu.Lock()
	defer l.disconnectMu.Unlock()
	return l.disconnects
}

// strategyTestListen opens a TCP listener on addr and fails the test
// immediately when that is not possible. Closing the listener is left to the
// caller.
func strategyTestListen(t *testing.T, addr string) net.Listener {
	t.Helper()
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		t.Fatal(err)
	}
	return ln
}

// strategyTestJobReply builds the JSON-RPC success reply (id 1, nil error)
// whose result carries sessionID and a job identified by jobID. Blob and
// target are fixed placeholder values shared by all tests in this file.
func strategyTestJobReply(sessionID, jobID string) map[string]any {
	return map[string]any{
		"id":      1,
		"jsonrpc": "2.0",
		"error":   nil,
		"result": map[string]any{
			"id": sessionID,
			"job": map[string]any{
				"blob":   "abcd",
				"job_id": jobID,
				"target": "b88d0600",
			},
		},
	}
}

// strategyTestAnswer waits for one newline-terminated request on conn and
// answers it with reply encoded as a single JSON line. When linger is
// positive it then sleeps for that long before returning. It returns early,
// without replying, if the request cannot be read. Closing conn is left to
// the caller.
func strategyTestAnswer(conn net.Conn, reply any, linger time.Duration) {
	if _, err := bufio.NewReader(conn).ReadBytes('\n'); err != nil {
		return
	}
	// Best effort: if the write fails the test times out waiting for the
	// job, which reports the problem from the test goroutine.
	_ = json.NewEncoder(conn).Encode(reply)
	if linger > 0 {
		time.Sleep(linger)
	}
}

// strategyTestServeOnce accepts a single connection on ln, answers its first
// request with reply via strategyTestAnswer, and closes the connection. It
// returns silently when Accept fails, e.g. because ln was closed.
func strategyTestServeOnce(ln net.Listener, reply any, linger time.Duration) {
	conn, err := ln.Accept()
	if err != nil {
		return
	}
	defer conn.Close()
	strategyTestAnswer(conn, reply, linger)
}

// TestFailoverStrategy_Connect_Ugly uses a primary pool that accepts one
// connection, stops listening and hangs up without replying. The test
// expects the backup pool's job to arrive and at least one disconnect
// notification to have been recorded.
func TestFailoverStrategy_Connect_Ugly(t *testing.T) {
	primaryListener := strategyTestListen(t, strategyTestLoopback)
	defer primaryListener.Close()

	backupListener := strategyTestListen(t, strategyTestLoopback)
	defer backupListener.Close()

	// Primary: accept once, then close the listener before the connection so
	// no further connection to the primary can be accepted.
	go func() {
		conn, acceptErr := primaryListener.Accept()
		if acceptErr != nil {
			return
		}
		_ = primaryListener.Close()
		_ = conn.Close()
	}()

	go strategyTestServeOnce(backupListener, strategyTestJobReply("session-1", "job-1"), 0)

	listener := &strategyTestListener{
		jobCh: make(chan proxy.Job, 1),
	}
	strategy := NewFailoverStrategy([]proxy.PoolConfig{
		{URL: primaryListener.Addr().String(), Enabled: true},
		{URL: backupListener.Addr().String(), Enabled: true},
	}, listener, &proxy.Config{Retries: 2})

	strategy.Connect()
	defer strategy.Disconnect()

	select {
	case job := <-listener.jobCh:
		if job.JobID != "job-1" {
			t.Fatalf("expected backup job, got %+v", job)
		}
	case <-time.After(3 * time.Second):
		t.Fatal("expected failover job after primary disconnect")
	}

	if listener.Disconnects() == 0 {
		t.Fatal("expected disconnect callback before failover reconnect")
	}
}

// TestFailoverStrategy_OnDisconnect_Good uses a primary pool that answers the
// first request with an "Unauthenticated" JSON-RPC error and then goes away.
// The test expects the backup pool's job to arrive and at least one
// disconnect notification to have been recorded.
func TestFailoverStrategy_OnDisconnect_Good(t *testing.T) {
	primaryListener := strategyTestListen(t, strategyTestLoopback)
	defer primaryListener.Close()

	backupListener := strategyTestListen(t, strategyTestLoopback)
	defer backupListener.Close()

	// Counted only so the timeout message can show whether the primary was
	// ever contacted.
	var primaryConnections atomic.Int32

	go func() {
		conn, acceptErr := primaryListener.Accept()
		if acceptErr != nil {
			return
		}
		primaryConnections.Add(1)
		// Deferred calls run in reverse order: the connection is closed
		// first, then the listener.
		defer primaryListener.Close()
		defer conn.Close()

		strategyTestAnswer(conn, map[string]any{
			"id":      1,
			"jsonrpc": "2.0",
			"error": map[string]any{
				"code":    -1,
				"message": "Unauthenticated",
			},
		}, 0)
	}()

	go strategyTestServeOnce(backupListener, strategyTestJobReply("session-1", "job-1"), 0)

	listener := &strategyTestListener{
		jobCh: make(chan proxy.Job, 1),
	}
	strategy := NewFailoverStrategy([]proxy.PoolConfig{
		{URL: primaryListener.Addr().String(), Enabled: true},
		{URL: backupListener.Addr().String(), Enabled: true},
	}, listener, &proxy.Config{Retries: 1})

	strategy.Connect()
	defer strategy.Disconnect()

	select {
	case job := <-listener.jobCh:
		if job.JobID != "job-1" {
			t.Fatalf("expected backup job, got %+v", job)
		}
	case <-time.After(3 * time.Second):
		t.Fatalf("expected backup job after primary disconnect, primary connections=%d", primaryConnections.Load())
	}

	if listener.Disconnects() == 0 {
		t.Fatal("expected disconnect callback before failover reconnect")
	}
}

// TestFailoverStrategy_OnDisconnect_PrimaryFirst starts with nothing
// listening on the primary address, so the first job has to come from the
// backup pool. The primary is then brought up on the same address and the
// backup drops its connection; the test expects the next job to come from
// the primary pool.
func TestFailoverStrategy_OnDisconnect_PrimaryFirst(t *testing.T) {
	// Reserve a port and release it again so the address is known but has no
	// listener behind it yet.
	primaryListener := strategyTestListen(t, strategyTestLoopback)
	primaryAddr := primaryListener.Addr().String()
	_ = primaryListener.Close()

	backupListener := strategyTestListen(t, strategyTestLoopback)
	defer backupListener.Close()

	// The backup holds its connection for 40ms after replying before closing
	// it — presumably to give the test time to re-open the primary below.
	go strategyTestServeOnce(
		backupListener,
		strategyTestJobReply("session-backup", "backup-job"),
		40*time.Millisecond,
	)

	listener := &strategyTestListener{
		jobCh: make(chan proxy.Job, 2),
	}
	strategy := NewFailoverStrategy([]proxy.PoolConfig{
		{URL: primaryAddr, Enabled: true},
		{URL: backupListener.Addr().String(), Enabled: true},
	}, listener, &proxy.Config{Retries: 2})

	strategy.Connect()
	defer strategy.Disconnect()

	select {
	case job := <-listener.jobCh:
		if job.JobID != "backup-job" {
			t.Fatalf("expected initial failover job, got %+v", job)
		}
	case <-time.After(3 * time.Second):
		t.Fatal("expected initial failover job")
	}

	// Bring the primary pool up on the address that was unreachable before.
	primaryListener = strategyTestListen(t, primaryAddr)
	defer primaryListener.Close()

	go strategyTestServeOnce(primaryListener, strategyTestJobReply("session-primary", "primary-job"), 0)

	select {
	case job := <-listener.jobCh:
		if job.JobID != "primary-job" {
			t.Fatalf("expected reconnect to prefer primary pool, got %+v", job)
		}
	case <-time.After(3 * time.Second):
		t.Fatal("expected reconnect job")
	}
}