feat(redis): add Redis pub/sub bridge for multi-instance Hub coordination
RedisBridge enables multiple Hub instances to coordinate broadcasts and channel-targeted messages across processes via Redis pub/sub. Uses envelope pattern with sourceID for infinite loop prevention. Phase 3 items 1-2 complete. 15 tests including cross-bridge messaging, loop prevention, concurrent publishes, and graceful shutdown. Race-free under -race. Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
9e48f0b60d
commit
da3df0077d
5 changed files with 865 additions and 2 deletions
4
TODO.md
4
TODO.md
|
|
@ -52,8 +52,8 @@ Token-based authentication on WebSocket upgrade handshake. Pure Go, no JWT libra
|
|||
|
||||
## Phase 3: Scaling
|
||||
|
||||
- [ ] Support Redis pub/sub as backend for multi-instance hub coordination
|
||||
- [ ] Broadcast messages across hub instances via Redis channels
|
||||
- [x] Support Redis pub/sub as backend for multi-instance hub coordination
|
||||
- [x] Broadcast messages across hub instances via Redis channels
|
||||
- [ ] Add sticky sessions or connection-affinity documentation for load balancers
|
||||
|
||||
---
|
||||
|
|
|
|||
4
go.mod
4
go.mod
|
|
@ -4,11 +4,15 @@ go 1.25.5
|
|||
|
||||
require (
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/redis/go-redis/v9 v9.18.0
|
||||
github.com/stretchr/testify v1.11.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
go.uber.org/atomic v1.11.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
|
|
|||
16
go.sum
16
go.sum
|
|
@ -1,11 +1,27 @@
|
|||
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
|
||||
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
|
||||
github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
|
||||
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
|
||||
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
||||
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
|
|
|
|||
226
redis.go
Normal file
226
redis.go
Normal file
|
|
@ -0,0 +1,226 @@
|
|||
// SPDX-Licence-Identifier: EUPL-1.2
|
||||
|
||||
package ws
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// RedisConfig configures the Redis pub/sub bridge.
|
||||
type RedisConfig struct {
|
||||
// Addr is the Redis server address (e.g. "10.69.69.87:6379").
|
||||
Addr string
|
||||
|
||||
// Password is the optional Redis authentication password.
|
||||
Password string
|
||||
|
||||
// DB is the Redis database number (default 0).
|
||||
DB int
|
||||
|
||||
// Prefix is the key prefix for Redis channels (default "ws").
|
||||
Prefix string
|
||||
}
|
||||
|
||||
// redisEnvelope wraps a Message with a source identifier to prevent
|
||||
// infinite echo loops between bridge instances.
|
||||
type redisEnvelope struct {
|
||||
SourceID string `json:"sourceId"`
|
||||
Message Message `json:"message"`
|
||||
}
|
||||
|
||||
// RedisBridge connects a Hub to Redis pub/sub for cross-instance messaging.
|
||||
// Multiple Hub instances using the same Redis backend will coordinate
|
||||
// broadcasts and channel messages transparently.
|
||||
type RedisBridge struct {
|
||||
hub *Hub
|
||||
client *redis.Client
|
||||
pubsub *redis.PubSub
|
||||
prefix string
|
||||
sourceID string
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewRedisBridge creates a Redis bridge for the given Hub.
|
||||
// It establishes a connection to Redis and validates connectivity
|
||||
// before returning. The bridge must be started with Start() to
|
||||
// begin processing messages.
|
||||
func NewRedisBridge(hub *Hub, cfg RedisConfig) (*RedisBridge, error) {
|
||||
if hub == nil {
|
||||
return nil, fmt.Errorf("hub must not be nil")
|
||||
}
|
||||
if cfg.Addr == "" {
|
||||
return nil, fmt.Errorf("redis address must not be empty")
|
||||
}
|
||||
if cfg.Prefix == "" {
|
||||
cfg.Prefix = "ws"
|
||||
}
|
||||
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: cfg.Addr,
|
||||
Password: cfg.Password,
|
||||
DB: cfg.DB,
|
||||
})
|
||||
|
||||
// Verify connectivity.
|
||||
if err := client.Ping(context.Background()).Err(); err != nil {
|
||||
client.Close()
|
||||
return nil, fmt.Errorf("redis ping failed: %w", err)
|
||||
}
|
||||
|
||||
// Generate a unique source ID to prevent echo loops.
|
||||
idBytes := make([]byte, 16)
|
||||
if _, err := rand.Read(idBytes); err != nil {
|
||||
client.Close()
|
||||
return nil, fmt.Errorf("failed to generate source ID: %w", err)
|
||||
}
|
||||
sourceID := hex.EncodeToString(idBytes)
|
||||
|
||||
return &RedisBridge{
|
||||
hub: hub,
|
||||
client: client,
|
||||
prefix: cfg.Prefix,
|
||||
sourceID: sourceID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start begins listening for Redis messages and forwarding them to
|
||||
// the local Hub's clients. It subscribes to the broadcast channel
|
||||
// and uses pattern-subscribe for all channel-targeted messages.
|
||||
// The bridge runs until Stop() is called or the provided context
|
||||
// is cancelled.
|
||||
func (rb *RedisBridge) Start(ctx context.Context) error {
|
||||
rb.ctx, rb.cancel = context.WithCancel(ctx)
|
||||
|
||||
broadcastChan := rb.prefix + ":broadcast"
|
||||
channelPattern := rb.prefix + ":channel:*"
|
||||
|
||||
rb.pubsub = rb.client.PSubscribe(rb.ctx, broadcastChan, channelPattern)
|
||||
|
||||
// Wait for the subscription confirmation.
|
||||
_, err := rb.pubsub.Receive(rb.ctx)
|
||||
if err != nil {
|
||||
rb.pubsub.Close()
|
||||
return fmt.Errorf("redis subscribe failed: %w", err)
|
||||
}
|
||||
|
||||
rb.wg.Add(1)
|
||||
go rb.listen()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop cleanly shuts down the Redis bridge. It cancels the listener
|
||||
// goroutine, closes the pub/sub subscription, and closes the Redis
|
||||
// client connection.
|
||||
func (rb *RedisBridge) Stop() error {
|
||||
if rb.cancel != nil {
|
||||
rb.cancel()
|
||||
}
|
||||
|
||||
// Wait for the listener goroutine to exit.
|
||||
rb.wg.Wait()
|
||||
|
||||
var firstErr error
|
||||
if rb.pubsub != nil {
|
||||
if err := rb.pubsub.Close(); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
if rb.client != nil {
|
||||
if err := rb.client.Close(); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
return firstErr
|
||||
}
|
||||
|
||||
// PublishToChannel publishes a message to a specific channel via Redis.
|
||||
// Other bridge instances subscribed to the same Redis will receive the
|
||||
// message and deliver it to their local Hub clients on that channel.
|
||||
func (rb *RedisBridge) PublishToChannel(channel string, msg Message) error {
|
||||
redisChan := rb.prefix + ":channel:" + channel
|
||||
return rb.publish(redisChan, msg)
|
||||
}
|
||||
|
||||
// PublishBroadcast publishes a broadcast message via Redis. All bridge
|
||||
// instances will receive it and deliver to all their local Hub clients.
|
||||
func (rb *RedisBridge) PublishBroadcast(msg Message) error {
|
||||
redisChan := rb.prefix + ":broadcast"
|
||||
return rb.publish(redisChan, msg)
|
||||
}
|
||||
|
||||
// publish serialises the envelope and publishes to the given Redis channel.
|
||||
func (rb *RedisBridge) publish(redisChan string, msg Message) error {
|
||||
env := redisEnvelope{
|
||||
SourceID: rb.sourceID,
|
||||
Message: msg,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(env)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal redis envelope: %w", err)
|
||||
}
|
||||
|
||||
return rb.client.Publish(rb.ctx, redisChan, data).Err()
|
||||
}
|
||||
|
||||
// listen runs in a goroutine, reading messages from the Redis pub/sub
|
||||
// channel and forwarding them to the local Hub. Messages originating
|
||||
// from this bridge instance (matching sourceID) are silently dropped
|
||||
// to prevent infinite loops.
|
||||
func (rb *RedisBridge) listen() {
|
||||
defer rb.wg.Done()
|
||||
|
||||
ch := rb.pubsub.Channel()
|
||||
broadcastChan := rb.prefix + ":broadcast"
|
||||
channelPrefix := rb.prefix + ":channel:"
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-rb.ctx.Done():
|
||||
return
|
||||
case redisMsg, ok := <-ch:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
var env redisEnvelope
|
||||
if err := json.Unmarshal([]byte(redisMsg.Payload), &env); err != nil {
|
||||
// Skip malformed messages.
|
||||
continue
|
||||
}
|
||||
|
||||
// Loop prevention: skip our own messages.
|
||||
if env.SourceID == rb.sourceID {
|
||||
continue
|
||||
}
|
||||
|
||||
switch {
|
||||
case redisMsg.Channel == broadcastChan:
|
||||
// Deliver as a local broadcast.
|
||||
_ = rb.hub.Broadcast(env.Message)
|
||||
|
||||
case strings.HasPrefix(redisMsg.Channel, channelPrefix):
|
||||
// Extract the Hub channel name from the Redis channel.
|
||||
hubChannel := strings.TrimPrefix(redisMsg.Channel, channelPrefix)
|
||||
_ = rb.hub.SendToChannel(hubChannel, env.Message)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SourceID returns the unique identifier for this bridge instance.
|
||||
// Useful for testing and debugging.
|
||||
func (rb *RedisBridge) SourceID() string {
|
||||
return rb.sourceID
|
||||
}
|
||||
617
redis_test.go
Normal file
617
redis_test.go
Normal file
|
|
@ -0,0 +1,617 @@
|
|||
// SPDX-Licence-Identifier: EUPL-1.2
|
||||
|
||||
package ws
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const redisAddr = "10.69.69.87:6379"
|
||||
|
||||
// skipIfNoRedis checks Redis availability and returns a connected client.
|
||||
// Tests are skipped when Redis is unreachable.
|
||||
func skipIfNoRedis(t *testing.T) *redis.Client {
|
||||
t.Helper()
|
||||
client := redis.NewClient(&redis.Options{Addr: redisAddr})
|
||||
if err := client.Ping(context.Background()).Err(); err != nil {
|
||||
t.Skip("Redis not available:", err)
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
// testPrefix returns a unique Redis key prefix per test to avoid
|
||||
// collisions between parallel test runs.
|
||||
func testPrefix(t *testing.T) string {
|
||||
t.Helper()
|
||||
return fmt.Sprintf("test_%d", time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// cleanupRedis removes all keys matching the given prefix pattern.
|
||||
func cleanupRedis(t *testing.T, client *redis.Client, prefix string) {
|
||||
t.Helper()
|
||||
t.Cleanup(func() {
|
||||
ctx := context.Background()
|
||||
// PubSub channels are ephemeral — no keys to clean.
|
||||
// But flush any leftover data keys just in case.
|
||||
iter := client.Scan(ctx, 0, prefix+":*", 100).Iterator()
|
||||
for iter.Next(ctx) {
|
||||
client.Del(ctx, iter.Val())
|
||||
}
|
||||
client.Close()
|
||||
})
|
||||
}
|
||||
|
||||
// startTestHub creates a Hub, starts it, and returns cleanup resources.
|
||||
func startTestHub(t *testing.T) (*Hub, context.Context, context.CancelFunc) {
|
||||
t.Helper()
|
||||
hub := NewHub()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go hub.Run(ctx)
|
||||
t.Cleanup(func() { cancel() })
|
||||
return hub, ctx, cancel
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Bridge lifecycle
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRedisBridge_CreateAndLifecycle(t *testing.T) {
|
||||
rc := skipIfNoRedis(t)
|
||||
prefix := testPrefix(t)
|
||||
cleanupRedis(t, rc, prefix)
|
||||
|
||||
hub, _, _ := startTestHub(t)
|
||||
|
||||
bridge, err := NewRedisBridge(hub, RedisConfig{
|
||||
Addr: redisAddr,
|
||||
Prefix: prefix,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, bridge)
|
||||
assert.NotEmpty(t, bridge.SourceID(), "bridge should have a unique source ID")
|
||||
|
||||
// Start the bridge.
|
||||
err = bridge.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Stop the bridge.
|
||||
err = bridge.Stop()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestRedisBridge_NilHub(t *testing.T) {
|
||||
skipIfNoRedis(t)
|
||||
|
||||
_, err := NewRedisBridge(nil, RedisConfig{
|
||||
Addr: redisAddr,
|
||||
})
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "hub must not be nil")
|
||||
}
|
||||
|
||||
func TestRedisBridge_EmptyAddr(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
_, err := NewRedisBridge(hub, RedisConfig{
|
||||
Addr: "",
|
||||
})
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "redis address must not be empty")
|
||||
}
|
||||
|
||||
func TestRedisBridge_BadAddr(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
_, err := NewRedisBridge(hub, RedisConfig{
|
||||
Addr: "127.0.0.1:1", // Nothing listening here.
|
||||
})
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "redis ping failed")
|
||||
}
|
||||
|
||||
func TestRedisBridge_DefaultPrefix(t *testing.T) {
|
||||
rc := skipIfNoRedis(t)
|
||||
cleanupRedis(t, rc, "ws")
|
||||
|
||||
hub, _, _ := startTestHub(t)
|
||||
|
||||
bridge, err := NewRedisBridge(hub, RedisConfig{
|
||||
Addr: redisAddr,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "ws", bridge.prefix)
|
||||
|
||||
err = bridge.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer bridge.Stop()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// PublishBroadcast — messages reach local WebSocket clients
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRedisBridge_PublishBroadcast(t *testing.T) {
|
||||
rc := skipIfNoRedis(t)
|
||||
prefix := testPrefix(t)
|
||||
cleanupRedis(t, rc, prefix)
|
||||
|
||||
hub, _, _ := startTestHub(t)
|
||||
|
||||
// Register a local client.
|
||||
client := &Client{
|
||||
hub: hub,
|
||||
send: make(chan []byte, 256),
|
||||
subscriptions: make(map[string]bool),
|
||||
}
|
||||
hub.register <- client
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
require.Equal(t, 1, hub.ClientCount())
|
||||
|
||||
// Create two bridges on same Redis — bridge1 publishes, bridge2 receives.
|
||||
bridge1, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
err = bridge1.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer bridge1.Stop()
|
||||
|
||||
// A second hub + bridge to receive the cross-instance message.
|
||||
hub2, _, _ := startTestHub(t)
|
||||
client2 := &Client{
|
||||
hub: hub2,
|
||||
send: make(chan []byte, 256),
|
||||
subscriptions: make(map[string]bool),
|
||||
}
|
||||
hub2.register <- client2
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
bridge2, err := NewRedisBridge(hub2, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
err = bridge2.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer bridge2.Stop()
|
||||
|
||||
// Allow subscriptions to propagate.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Publish broadcast from bridge1.
|
||||
err = bridge1.PublishBroadcast(Message{Type: TypeEvent, Data: "cross-broadcast"})
|
||||
require.NoError(t, err)
|
||||
|
||||
// bridge2's hub should receive the message (client2 gets it).
|
||||
select {
|
||||
case msg := <-client2.send:
|
||||
var received Message
|
||||
require.NoError(t, json.Unmarshal(msg, &received))
|
||||
assert.Equal(t, TypeEvent, received.Type)
|
||||
assert.Equal(t, "cross-broadcast", received.Data)
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("bridge2 client should have received the broadcast")
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// PublishToChannel — targeted channel delivery
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRedisBridge_PublishToChannel(t *testing.T) {
|
||||
rc := skipIfNoRedis(t)
|
||||
prefix := testPrefix(t)
|
||||
cleanupRedis(t, rc, prefix)
|
||||
|
||||
hub, _, _ := startTestHub(t)
|
||||
|
||||
// Create a client subscribed to a specific channel.
|
||||
subClient := &Client{
|
||||
hub: hub,
|
||||
send: make(chan []byte, 256),
|
||||
subscriptions: make(map[string]bool),
|
||||
}
|
||||
hub.register <- subClient
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
hub.Subscribe(subClient, "process:abc")
|
||||
|
||||
// Create a client NOT subscribed to that channel.
|
||||
otherClient := &Client{
|
||||
hub: hub,
|
||||
send: make(chan []byte, 256),
|
||||
subscriptions: make(map[string]bool),
|
||||
}
|
||||
hub.register <- otherClient
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
// Second hub + bridge (the publisher).
|
||||
hub2, _, _ := startTestHub(t)
|
||||
bridge2, err := NewRedisBridge(hub2, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
err = bridge2.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer bridge2.Stop()
|
||||
|
||||
// Local hub bridge (the receiver).
|
||||
bridge1, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
err = bridge1.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer bridge1.Stop()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Publish to channel from bridge2.
|
||||
err = bridge2.PublishToChannel("process:abc", Message{
|
||||
Type: TypeProcessOutput,
|
||||
ProcessID: "abc",
|
||||
Data: "line of output",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// subClient (subscribed to process:abc) should receive the message.
|
||||
select {
|
||||
case msg := <-subClient.send:
|
||||
var received Message
|
||||
require.NoError(t, json.Unmarshal(msg, &received))
|
||||
assert.Equal(t, TypeProcessOutput, received.Type)
|
||||
assert.Equal(t, "line of output", received.Data)
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("subscribed client should have received the channel message")
|
||||
}
|
||||
|
||||
// otherClient should NOT receive the message.
|
||||
select {
|
||||
case msg := <-otherClient.send:
|
||||
t.Fatalf("unsubscribed client should not receive channel message, got: %s", msg)
|
||||
case <-time.After(300 * time.Millisecond):
|
||||
// Good — no message delivered.
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Cross-bridge messaging
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRedisBridge_CrossBridge(t *testing.T) {
|
||||
rc := skipIfNoRedis(t)
|
||||
prefix := testPrefix(t)
|
||||
cleanupRedis(t, rc, prefix)
|
||||
|
||||
// Hub A with a client.
|
||||
hubA, _, _ := startTestHub(t)
|
||||
clientA := &Client{
|
||||
hub: hubA,
|
||||
send: make(chan []byte, 256),
|
||||
subscriptions: make(map[string]bool),
|
||||
}
|
||||
hubA.register <- clientA
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
bridgeA, err := NewRedisBridge(hubA, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
err = bridgeA.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer bridgeA.Stop()
|
||||
|
||||
// Hub B with a client.
|
||||
hubB, _, _ := startTestHub(t)
|
||||
clientB := &Client{
|
||||
hub: hubB,
|
||||
send: make(chan []byte, 256),
|
||||
subscriptions: make(map[string]bool),
|
||||
}
|
||||
hubB.register <- clientB
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
bridgeB, err := NewRedisBridge(hubB, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
err = bridgeB.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer bridgeB.Stop()
|
||||
|
||||
// Allow subscriptions to settle.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Publish from A, verify B receives.
|
||||
err = bridgeA.PublishBroadcast(Message{Type: TypeEvent, Data: "from-A"})
|
||||
require.NoError(t, err)
|
||||
|
||||
select {
|
||||
case msg := <-clientB.send:
|
||||
var received Message
|
||||
require.NoError(t, json.Unmarshal(msg, &received))
|
||||
assert.Equal(t, "from-A", received.Data)
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("hub B should receive broadcast from hub A")
|
||||
}
|
||||
|
||||
// Publish from B, verify A receives.
|
||||
err = bridgeB.PublishBroadcast(Message{Type: TypeEvent, Data: "from-B"})
|
||||
require.NoError(t, err)
|
||||
|
||||
select {
|
||||
case msg := <-clientA.send:
|
||||
var received Message
|
||||
require.NoError(t, json.Unmarshal(msg, &received))
|
||||
assert.Equal(t, "from-B", received.Data)
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("hub A should receive broadcast from hub B")
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Loop prevention
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRedisBridge_LoopPrevention(t *testing.T) {
|
||||
rc := skipIfNoRedis(t)
|
||||
prefix := testPrefix(t)
|
||||
cleanupRedis(t, rc, prefix)
|
||||
|
||||
hub, _, _ := startTestHub(t)
|
||||
client := &Client{
|
||||
hub: hub,
|
||||
send: make(chan []byte, 256),
|
||||
subscriptions: make(map[string]bool),
|
||||
}
|
||||
hub.register <- client
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
bridge, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
err = bridge.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer bridge.Stop()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Publish from this bridge — the same bridge should NOT deliver
|
||||
// the message back to its own hub.
|
||||
err = bridge.PublishBroadcast(Message{Type: TypeEvent, Data: "echo-test"})
|
||||
require.NoError(t, err)
|
||||
|
||||
select {
|
||||
case msg := <-client.send:
|
||||
t.Fatalf("bridge should not echo its own messages, got: %s", msg)
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
// Good — no echo.
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Concurrent publishes
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRedisBridge_ConcurrentPublishes(t *testing.T) {
|
||||
rc := skipIfNoRedis(t)
|
||||
prefix := testPrefix(t)
|
||||
cleanupRedis(t, rc, prefix)
|
||||
|
||||
// Receiver hub.
|
||||
hubRecv, _, _ := startTestHub(t)
|
||||
recvClient := &Client{
|
||||
hub: hubRecv,
|
||||
send: make(chan []byte, 1024),
|
||||
subscriptions: make(map[string]bool),
|
||||
}
|
||||
hubRecv.register <- recvClient
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
bridgeRecv, err := NewRedisBridge(hubRecv, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
err = bridgeRecv.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer bridgeRecv.Stop()
|
||||
|
||||
// Sender hub.
|
||||
hubSend, _, _ := startTestHub(t)
|
||||
bridgeSend, err := NewRedisBridge(hubSend, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
err = bridgeSend.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer bridgeSend.Stop()
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Fire 10 concurrent broadcasts.
|
||||
numPublishes := 10
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < numPublishes; i++ {
|
||||
wg.Add(1)
|
||||
go func(idx int) {
|
||||
defer wg.Done()
|
||||
_ = bridgeSend.PublishBroadcast(Message{
|
||||
Type: TypeEvent,
|
||||
Data: fmt.Sprintf("concurrent-%d", idx),
|
||||
})
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Collect received messages.
|
||||
received := 0
|
||||
timeout := time.After(5 * time.Second)
|
||||
for received < numPublishes {
|
||||
select {
|
||||
case <-recvClient.send:
|
||||
received++
|
||||
case <-timeout:
|
||||
t.Fatalf("expected %d messages, received %d", numPublishes, received)
|
||||
}
|
||||
}
|
||||
assert.Equal(t, numPublishes, received)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Graceful shutdown
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRedisBridge_GracefulShutdown(t *testing.T) {
|
||||
rc := skipIfNoRedis(t)
|
||||
prefix := testPrefix(t)
|
||||
cleanupRedis(t, rc, prefix)
|
||||
|
||||
hub, _, _ := startTestHub(t)
|
||||
|
||||
bridge, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
err = bridge.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Stop should not panic or hang.
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- bridge.Stop()
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
assert.NoError(t, err)
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("Stop() should not hang")
|
||||
}
|
||||
|
||||
// Publishing after stop should fail gracefully (context cancelled).
|
||||
err = bridge.PublishBroadcast(Message{Type: TypeEvent, Data: "after-stop"})
|
||||
assert.Error(t, err, "publishing after stop should error")
|
||||
}
|
||||
|
||||
func TestRedisBridge_StopWithoutStart(t *testing.T) {
|
||||
rc := skipIfNoRedis(t)
|
||||
prefix := testPrefix(t)
|
||||
cleanupRedis(t, rc, prefix)
|
||||
|
||||
hub, _, _ := startTestHub(t)
|
||||
|
||||
bridge, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Stop without Start should not panic.
|
||||
assert.NotPanics(t, func() {
|
||||
_ = bridge.Stop()
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Context cancellation stops the bridge
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRedisBridge_ContextCancellation(t *testing.T) {
|
||||
rc := skipIfNoRedis(t)
|
||||
prefix := testPrefix(t)
|
||||
cleanupRedis(t, rc, prefix)
|
||||
|
||||
hub, _, _ := startTestHub(t)
|
||||
|
||||
bridge, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
err = bridge.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Cancel the context — the listener should exit gracefully.
|
||||
cancel()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Cleanup without hanging.
|
||||
err = bridge.Stop()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Channel message with pattern matching
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRedisBridge_ChannelPatternMatching(t *testing.T) {
|
||||
rc := skipIfNoRedis(t)
|
||||
prefix := testPrefix(t)
|
||||
cleanupRedis(t, rc, prefix)
|
||||
|
||||
hub, _, _ := startTestHub(t)
|
||||
|
||||
// Subscribe clients to different channels.
|
||||
clientA := &Client{
|
||||
hub: hub,
|
||||
send: make(chan []byte, 256),
|
||||
subscriptions: make(map[string]bool),
|
||||
}
|
||||
clientB := &Client{
|
||||
hub: hub,
|
||||
send: make(chan []byte, 256),
|
||||
subscriptions: make(map[string]bool),
|
||||
}
|
||||
hub.register <- clientA
|
||||
hub.register <- clientB
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
hub.Subscribe(clientA, "events:user:1")
|
||||
hub.Subscribe(clientB, "events:user:2")
|
||||
|
||||
// Receiver bridge.
|
||||
bridge1, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
err = bridge1.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer bridge1.Stop()
|
||||
|
||||
// Sender bridge.
|
||||
hub2, _, _ := startTestHub(t)
|
||||
bridge2, err := NewRedisBridge(hub2, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
err = bridge2.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer bridge2.Stop()
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Publish to events:user:1 — only clientA should receive.
|
||||
err = bridge2.PublishToChannel("events:user:1", Message{Type: TypeEvent, Data: "for-user-1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
select {
|
||||
case msg := <-clientA.send:
|
||||
var received Message
|
||||
require.NoError(t, json.Unmarshal(msg, &received))
|
||||
assert.Equal(t, "for-user-1", received.Data)
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("clientA should receive the channel message")
|
||||
}
|
||||
|
||||
// clientB should NOT receive it.
|
||||
select {
|
||||
case msg := <-clientB.send:
|
||||
t.Fatalf("clientB should not receive message for user:1, got: %s", msg)
|
||||
case <-time.After(300 * time.Millisecond):
|
||||
// Good.
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Unique source IDs per bridge instance
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRedisBridge_UniqueSourceIDs(t *testing.T) {
|
||||
rc := skipIfNoRedis(t)
|
||||
prefix := testPrefix(t)
|
||||
cleanupRedis(t, rc, prefix)
|
||||
|
||||
hub, _, _ := startTestHub(t)
|
||||
|
||||
bridge1, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
|
||||
bridge2, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix})
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.NotEqual(t, bridge1.SourceID(), bridge2.SourceID(),
|
||||
"each bridge instance must have a unique source ID")
|
||||
|
||||
_ = bridge1.Stop()
|
||||
_ = bridge2.Stop()
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue