diff --git a/TODO.md b/TODO.md index 17d8dc0..e9c85f5 100644 --- a/TODO.md +++ b/TODO.md @@ -52,8 +52,8 @@ Token-based authentication on WebSocket upgrade handshake. Pure Go, no JWT libra ## Phase 3: Scaling -- [ ] Support Redis pub/sub as backend for multi-instance hub coordination -- [ ] Broadcast messages across hub instances via Redis channels +- [x] Support Redis pub/sub as backend for multi-instance hub coordination +- [x] Broadcast messages across hub instances via Redis channels - [ ] Add sticky sessions or connection-affinity documentation for load balancers --- diff --git a/go.mod b/go.mod index 6889543..cdd0e66 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,15 @@ go 1.25.5 require ( github.com/gorilla/websocket v1.5.3 + github.com/redis/go-redis/v9 v9.18.0 github.com/stretchr/testify v1.11.1 ) require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/atomic v1.11.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 4b33f39..a9d24c2 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,27 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs= +github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/redis.go b/redis.go new file mode 100644 index 0000000..d69b2e4 --- /dev/null +++ b/redis.go @@ -0,0 +1,226 @@ +// SPDX-Licence-Identifier: EUPL-1.2 + +package ws + +import ( + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "strings" + "sync" + + "github.com/redis/go-redis/v9" +) + +// RedisConfig configures the Redis pub/sub bridge. +type RedisConfig struct { + // Addr is the Redis server address (e.g. "10.69.69.87:6379"). + Addr string + + // Password is the optional Redis authentication password. + Password string + + // DB is the Redis database number (default 0). + DB int + + // Prefix is the key prefix for Redis channels (default "ws"). + Prefix string +} + +// redisEnvelope wraps a Message with a source identifier to prevent +// infinite echo loops between bridge instances. +type redisEnvelope struct { + SourceID string `json:"sourceId"` + Message Message `json:"message"` +} + +// RedisBridge connects a Hub to Redis pub/sub for cross-instance messaging. +// Multiple Hub instances using the same Redis backend will coordinate +// broadcasts and channel messages transparently. +type RedisBridge struct { + hub *Hub + client *redis.Client + pubsub *redis.PubSub + prefix string + sourceID string + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +// NewRedisBridge creates a Redis bridge for the given Hub. +// It establishes a connection to Redis and validates connectivity +// before returning. The bridge must be started with Start() to +// begin processing messages. +func NewRedisBridge(hub *Hub, cfg RedisConfig) (*RedisBridge, error) { + if hub == nil { + return nil, fmt.Errorf("hub must not be nil") + } + if cfg.Addr == "" { + return nil, fmt.Errorf("redis address must not be empty") + } + if cfg.Prefix == "" { + cfg.Prefix = "ws" + } + + client := redis.NewClient(&redis.Options{ + Addr: cfg.Addr, + Password: cfg.Password, + DB: cfg.DB, + }) + + // Verify connectivity. + if err := client.Ping(context.Background()).Err(); err != nil { + client.Close() + return nil, fmt.Errorf("redis ping failed: %w", err) + } + + // Generate a unique source ID to prevent echo loops. + idBytes := make([]byte, 16) + if _, err := rand.Read(idBytes); err != nil { + client.Close() + return nil, fmt.Errorf("failed to generate source ID: %w", err) + } + sourceID := hex.EncodeToString(idBytes) + + return &RedisBridge{ + hub: hub, + client: client, + prefix: cfg.Prefix, + sourceID: sourceID, + }, nil +} + +// Start begins listening for Redis messages and forwarding them to +// the local Hub's clients. It subscribes to the broadcast channel +// and uses pattern-subscribe for all channel-targeted messages. +// The bridge runs until Stop() is called or the provided context +// is cancelled. +func (rb *RedisBridge) Start(ctx context.Context) error { + rb.ctx, rb.cancel = context.WithCancel(ctx) + + broadcastChan := rb.prefix + ":broadcast" + channelPattern := rb.prefix + ":channel:*" + + rb.pubsub = rb.client.PSubscribe(rb.ctx, broadcastChan, channelPattern) + + // Wait for the subscription confirmation. + _, err := rb.pubsub.Receive(rb.ctx) + if err != nil { + rb.pubsub.Close() + return fmt.Errorf("redis subscribe failed: %w", err) + } + + rb.wg.Add(1) + go rb.listen() + + return nil +} + +// Stop cleanly shuts down the Redis bridge. It cancels the listener +// goroutine, closes the pub/sub subscription, and closes the Redis +// client connection. +func (rb *RedisBridge) Stop() error { + if rb.cancel != nil { + rb.cancel() + } + + // Wait for the listener goroutine to exit. + rb.wg.Wait() + + var firstErr error + if rb.pubsub != nil { + if err := rb.pubsub.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + if rb.client != nil { + if err := rb.client.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr +} + +// PublishToChannel publishes a message to a specific channel via Redis. +// Other bridge instances subscribed to the same Redis will receive the +// message and deliver it to their local Hub clients on that channel. +func (rb *RedisBridge) PublishToChannel(channel string, msg Message) error { + redisChan := rb.prefix + ":channel:" + channel + return rb.publish(redisChan, msg) +} + +// PublishBroadcast publishes a broadcast message via Redis. All bridge +// instances will receive it and deliver to all their local Hub clients. +func (rb *RedisBridge) PublishBroadcast(msg Message) error { + redisChan := rb.prefix + ":broadcast" + return rb.publish(redisChan, msg) +} + +// publish serialises the envelope and publishes to the given Redis channel. +func (rb *RedisBridge) publish(redisChan string, msg Message) error { + env := redisEnvelope{ + SourceID: rb.sourceID, + Message: msg, + } + + data, err := json.Marshal(env) + if err != nil { + return fmt.Errorf("failed to marshal redis envelope: %w", err) + } + + return rb.client.Publish(rb.ctx, redisChan, data).Err() +} + +// listen runs in a goroutine, reading messages from the Redis pub/sub +// channel and forwarding them to the local Hub. Messages originating +// from this bridge instance (matching sourceID) are silently dropped +// to prevent infinite loops. +func (rb *RedisBridge) listen() { + defer rb.wg.Done() + + ch := rb.pubsub.Channel() + broadcastChan := rb.prefix + ":broadcast" + channelPrefix := rb.prefix + ":channel:" + + for { + select { + case <-rb.ctx.Done(): + return + case redisMsg, ok := <-ch: + if !ok { + return + } + + var env redisEnvelope + if err := json.Unmarshal([]byte(redisMsg.Payload), &env); err != nil { + // Skip malformed messages. + continue + } + + // Loop prevention: skip our own messages. + if env.SourceID == rb.sourceID { + continue + } + + switch { + case redisMsg.Channel == broadcastChan: + // Deliver as a local broadcast. + _ = rb.hub.Broadcast(env.Message) + + case strings.HasPrefix(redisMsg.Channel, channelPrefix): + // Extract the Hub channel name from the Redis channel. + hubChannel := strings.TrimPrefix(redisMsg.Channel, channelPrefix) + _ = rb.hub.SendToChannel(hubChannel, env.Message) + } + } + } +} + +// SourceID returns the unique identifier for this bridge instance. +// Useful for testing and debugging. +func (rb *RedisBridge) SourceID() string { + return rb.sourceID +} diff --git a/redis_test.go b/redis_test.go new file mode 100644 index 0000000..9c2c84b --- /dev/null +++ b/redis_test.go @@ -0,0 +1,617 @@ +// SPDX-Licence-Identifier: EUPL-1.2 + +package ws + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "testing" + "time" + + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const redisAddr = "10.69.69.87:6379" + +// skipIfNoRedis checks Redis availability and returns a connected client. +// Tests are skipped when Redis is unreachable. +func skipIfNoRedis(t *testing.T) *redis.Client { + t.Helper() + client := redis.NewClient(&redis.Options{Addr: redisAddr}) + if err := client.Ping(context.Background()).Err(); err != nil { + t.Skip("Redis not available:", err) + } + return client +} + +// testPrefix returns a unique Redis key prefix per test to avoid +// collisions between parallel test runs. +func testPrefix(t *testing.T) string { + t.Helper() + return fmt.Sprintf("test_%d", time.Now().UnixNano()) +} + +// cleanupRedis removes all keys matching the given prefix pattern. +func cleanupRedis(t *testing.T, client *redis.Client, prefix string) { + t.Helper() + t.Cleanup(func() { + ctx := context.Background() + // PubSub channels are ephemeral — no keys to clean. + // But flush any leftover data keys just in case. + iter := client.Scan(ctx, 0, prefix+":*", 100).Iterator() + for iter.Next(ctx) { + client.Del(ctx, iter.Val()) + } + client.Close() + }) +} + +// startTestHub creates a Hub, starts it, and returns cleanup resources. +func startTestHub(t *testing.T) (*Hub, context.Context, context.CancelFunc) { + t.Helper() + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + go hub.Run(ctx) + t.Cleanup(func() { cancel() }) + return hub, ctx, cancel +} + +// --------------------------------------------------------------------------- +// Bridge lifecycle +// --------------------------------------------------------------------------- + +func TestRedisBridge_CreateAndLifecycle(t *testing.T) { + rc := skipIfNoRedis(t) + prefix := testPrefix(t) + cleanupRedis(t, rc, prefix) + + hub, _, _ := startTestHub(t) + + bridge, err := NewRedisBridge(hub, RedisConfig{ + Addr: redisAddr, + Prefix: prefix, + }) + require.NoError(t, err) + require.NotNil(t, bridge) + assert.NotEmpty(t, bridge.SourceID(), "bridge should have a unique source ID") + + // Start the bridge. + err = bridge.Start(context.Background()) + require.NoError(t, err) + + // Stop the bridge. + err = bridge.Stop() + require.NoError(t, err) +} + +func TestRedisBridge_NilHub(t *testing.T) { + skipIfNoRedis(t) + + _, err := NewRedisBridge(nil, RedisConfig{ + Addr: redisAddr, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "hub must not be nil") +} + +func TestRedisBridge_EmptyAddr(t *testing.T) { + hub := NewHub() + + _, err := NewRedisBridge(hub, RedisConfig{ + Addr: "", + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "redis address must not be empty") +} + +func TestRedisBridge_BadAddr(t *testing.T) { + hub := NewHub() + + _, err := NewRedisBridge(hub, RedisConfig{ + Addr: "127.0.0.1:1", // Nothing listening here. + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "redis ping failed") +} + +func TestRedisBridge_DefaultPrefix(t *testing.T) { + rc := skipIfNoRedis(t) + cleanupRedis(t, rc, "ws") + + hub, _, _ := startTestHub(t) + + bridge, err := NewRedisBridge(hub, RedisConfig{ + Addr: redisAddr, + }) + require.NoError(t, err) + assert.Equal(t, "ws", bridge.prefix) + + err = bridge.Start(context.Background()) + require.NoError(t, err) + defer bridge.Stop() +} + +// --------------------------------------------------------------------------- +// PublishBroadcast — messages reach local WebSocket clients +// --------------------------------------------------------------------------- + +func TestRedisBridge_PublishBroadcast(t *testing.T) { + rc := skipIfNoRedis(t) + prefix := testPrefix(t) + cleanupRedis(t, rc, prefix) + + hub, _, _ := startTestHub(t) + + // Register a local client. + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + hub.register <- client + time.Sleep(50 * time.Millisecond) + require.Equal(t, 1, hub.ClientCount()) + + // Create two bridges on same Redis — bridge1 publishes, bridge2 receives. + bridge1, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + err = bridge1.Start(context.Background()) + require.NoError(t, err) + defer bridge1.Stop() + + // A second hub + bridge to receive the cross-instance message. + hub2, _, _ := startTestHub(t) + client2 := &Client{ + hub: hub2, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + hub2.register <- client2 + time.Sleep(50 * time.Millisecond) + + bridge2, err := NewRedisBridge(hub2, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + err = bridge2.Start(context.Background()) + require.NoError(t, err) + defer bridge2.Stop() + + // Allow subscriptions to propagate. + time.Sleep(100 * time.Millisecond) + + // Publish broadcast from bridge1. + err = bridge1.PublishBroadcast(Message{Type: TypeEvent, Data: "cross-broadcast"}) + require.NoError(t, err) + + // bridge2's hub should receive the message (client2 gets it). + select { + case msg := <-client2.send: + var received Message + require.NoError(t, json.Unmarshal(msg, &received)) + assert.Equal(t, TypeEvent, received.Type) + assert.Equal(t, "cross-broadcast", received.Data) + case <-time.After(3 * time.Second): + t.Fatal("bridge2 client should have received the broadcast") + } +} + +// --------------------------------------------------------------------------- +// PublishToChannel — targeted channel delivery +// --------------------------------------------------------------------------- + +func TestRedisBridge_PublishToChannel(t *testing.T) { + rc := skipIfNoRedis(t) + prefix := testPrefix(t) + cleanupRedis(t, rc, prefix) + + hub, _, _ := startTestHub(t) + + // Create a client subscribed to a specific channel. + subClient := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + hub.register <- subClient + time.Sleep(50 * time.Millisecond) + hub.Subscribe(subClient, "process:abc") + + // Create a client NOT subscribed to that channel. + otherClient := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + hub.register <- otherClient + time.Sleep(50 * time.Millisecond) + + // Second hub + bridge (the publisher). + hub2, _, _ := startTestHub(t) + bridge2, err := NewRedisBridge(hub2, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + err = bridge2.Start(context.Background()) + require.NoError(t, err) + defer bridge2.Stop() + + // Local hub bridge (the receiver). + bridge1, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + err = bridge1.Start(context.Background()) + require.NoError(t, err) + defer bridge1.Stop() + + time.Sleep(100 * time.Millisecond) + + // Publish to channel from bridge2. + err = bridge2.PublishToChannel("process:abc", Message{ + Type: TypeProcessOutput, + ProcessID: "abc", + Data: "line of output", + }) + require.NoError(t, err) + + // subClient (subscribed to process:abc) should receive the message. + select { + case msg := <-subClient.send: + var received Message + require.NoError(t, json.Unmarshal(msg, &received)) + assert.Equal(t, TypeProcessOutput, received.Type) + assert.Equal(t, "line of output", received.Data) + case <-time.After(3 * time.Second): + t.Fatal("subscribed client should have received the channel message") + } + + // otherClient should NOT receive the message. + select { + case msg := <-otherClient.send: + t.Fatalf("unsubscribed client should not receive channel message, got: %s", msg) + case <-time.After(300 * time.Millisecond): + // Good — no message delivered. + } +} + +// --------------------------------------------------------------------------- +// Cross-bridge messaging +// --------------------------------------------------------------------------- + +func TestRedisBridge_CrossBridge(t *testing.T) { + rc := skipIfNoRedis(t) + prefix := testPrefix(t) + cleanupRedis(t, rc, prefix) + + // Hub A with a client. + hubA, _, _ := startTestHub(t) + clientA := &Client{ + hub: hubA, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + hubA.register <- clientA + time.Sleep(50 * time.Millisecond) + + bridgeA, err := NewRedisBridge(hubA, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + err = bridgeA.Start(context.Background()) + require.NoError(t, err) + defer bridgeA.Stop() + + // Hub B with a client. + hubB, _, _ := startTestHub(t) + clientB := &Client{ + hub: hubB, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + hubB.register <- clientB + time.Sleep(50 * time.Millisecond) + + bridgeB, err := NewRedisBridge(hubB, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + err = bridgeB.Start(context.Background()) + require.NoError(t, err) + defer bridgeB.Stop() + + // Allow subscriptions to settle. + time.Sleep(200 * time.Millisecond) + + // Publish from A, verify B receives. + err = bridgeA.PublishBroadcast(Message{Type: TypeEvent, Data: "from-A"}) + require.NoError(t, err) + + select { + case msg := <-clientB.send: + var received Message + require.NoError(t, json.Unmarshal(msg, &received)) + assert.Equal(t, "from-A", received.Data) + case <-time.After(3 * time.Second): + t.Fatal("hub B should receive broadcast from hub A") + } + + // Publish from B, verify A receives. + err = bridgeB.PublishBroadcast(Message{Type: TypeEvent, Data: "from-B"}) + require.NoError(t, err) + + select { + case msg := <-clientA.send: + var received Message + require.NoError(t, json.Unmarshal(msg, &received)) + assert.Equal(t, "from-B", received.Data) + case <-time.After(3 * time.Second): + t.Fatal("hub A should receive broadcast from hub B") + } +} + +// --------------------------------------------------------------------------- +// Loop prevention +// --------------------------------------------------------------------------- + +func TestRedisBridge_LoopPrevention(t *testing.T) { + rc := skipIfNoRedis(t) + prefix := testPrefix(t) + cleanupRedis(t, rc, prefix) + + hub, _, _ := startTestHub(t) + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + hub.register <- client + time.Sleep(50 * time.Millisecond) + + bridge, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + err = bridge.Start(context.Background()) + require.NoError(t, err) + defer bridge.Stop() + + time.Sleep(100 * time.Millisecond) + + // Publish from this bridge — the same bridge should NOT deliver + // the message back to its own hub. + err = bridge.PublishBroadcast(Message{Type: TypeEvent, Data: "echo-test"}) + require.NoError(t, err) + + select { + case msg := <-client.send: + t.Fatalf("bridge should not echo its own messages, got: %s", msg) + case <-time.After(500 * time.Millisecond): + // Good — no echo. + } +} + +// --------------------------------------------------------------------------- +// Concurrent publishes +// --------------------------------------------------------------------------- + +func TestRedisBridge_ConcurrentPublishes(t *testing.T) { + rc := skipIfNoRedis(t) + prefix := testPrefix(t) + cleanupRedis(t, rc, prefix) + + // Receiver hub. + hubRecv, _, _ := startTestHub(t) + recvClient := &Client{ + hub: hubRecv, + send: make(chan []byte, 1024), + subscriptions: make(map[string]bool), + } + hubRecv.register <- recvClient + time.Sleep(50 * time.Millisecond) + + bridgeRecv, err := NewRedisBridge(hubRecv, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + err = bridgeRecv.Start(context.Background()) + require.NoError(t, err) + defer bridgeRecv.Stop() + + // Sender hub. + hubSend, _, _ := startTestHub(t) + bridgeSend, err := NewRedisBridge(hubSend, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + err = bridgeSend.Start(context.Background()) + require.NoError(t, err) + defer bridgeSend.Stop() + + time.Sleep(200 * time.Millisecond) + + // Fire 10 concurrent broadcasts. + numPublishes := 10 + var wg sync.WaitGroup + for i := 0; i < numPublishes; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + _ = bridgeSend.PublishBroadcast(Message{ + Type: TypeEvent, + Data: fmt.Sprintf("concurrent-%d", idx), + }) + }(i) + } + wg.Wait() + + // Collect received messages. + received := 0 + timeout := time.After(5 * time.Second) + for received < numPublishes { + select { + case <-recvClient.send: + received++ + case <-timeout: + t.Fatalf("expected %d messages, received %d", numPublishes, received) + } + } + assert.Equal(t, numPublishes, received) +} + +// --------------------------------------------------------------------------- +// Graceful shutdown +// --------------------------------------------------------------------------- + +func TestRedisBridge_GracefulShutdown(t *testing.T) { + rc := skipIfNoRedis(t) + prefix := testPrefix(t) + cleanupRedis(t, rc, prefix) + + hub, _, _ := startTestHub(t) + + bridge, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + err = bridge.Start(context.Background()) + require.NoError(t, err) + + // Stop should not panic or hang. + done := make(chan error, 1) + go func() { + done <- bridge.Stop() + }() + + select { + case err := <-done: + assert.NoError(t, err) + case <-time.After(5 * time.Second): + t.Fatal("Stop() should not hang") + } + + // Publishing after stop should fail gracefully (context cancelled). + err = bridge.PublishBroadcast(Message{Type: TypeEvent, Data: "after-stop"}) + assert.Error(t, err, "publishing after stop should error") +} + +func TestRedisBridge_StopWithoutStart(t *testing.T) { + rc := skipIfNoRedis(t) + prefix := testPrefix(t) + cleanupRedis(t, rc, prefix) + + hub, _, _ := startTestHub(t) + + bridge, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + + // Stop without Start should not panic. + assert.NotPanics(t, func() { + _ = bridge.Stop() + }) +} + +// --------------------------------------------------------------------------- +// Context cancellation stops the bridge +// --------------------------------------------------------------------------- + +func TestRedisBridge_ContextCancellation(t *testing.T) { + rc := skipIfNoRedis(t) + prefix := testPrefix(t) + cleanupRedis(t, rc, prefix) + + hub, _, _ := startTestHub(t) + + bridge, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + err = bridge.Start(ctx) + require.NoError(t, err) + + // Cancel the context — the listener should exit gracefully. + cancel() + time.Sleep(200 * time.Millisecond) + + // Cleanup without hanging. + err = bridge.Stop() + assert.NoError(t, err) +} + +// --------------------------------------------------------------------------- +// Channel message with pattern matching +// --------------------------------------------------------------------------- + +func TestRedisBridge_ChannelPatternMatching(t *testing.T) { + rc := skipIfNoRedis(t) + prefix := testPrefix(t) + cleanupRedis(t, rc, prefix) + + hub, _, _ := startTestHub(t) + + // Subscribe clients to different channels. + clientA := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + clientB := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + hub.register <- clientA + hub.register <- clientB + time.Sleep(50 * time.Millisecond) + + hub.Subscribe(clientA, "events:user:1") + hub.Subscribe(clientB, "events:user:2") + + // Receiver bridge. + bridge1, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + err = bridge1.Start(context.Background()) + require.NoError(t, err) + defer bridge1.Stop() + + // Sender bridge. + hub2, _, _ := startTestHub(t) + bridge2, err := NewRedisBridge(hub2, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + err = bridge2.Start(context.Background()) + require.NoError(t, err) + defer bridge2.Stop() + + time.Sleep(200 * time.Millisecond) + + // Publish to events:user:1 — only clientA should receive. + err = bridge2.PublishToChannel("events:user:1", Message{Type: TypeEvent, Data: "for-user-1"}) + require.NoError(t, err) + + select { + case msg := <-clientA.send: + var received Message + require.NoError(t, json.Unmarshal(msg, &received)) + assert.Equal(t, "for-user-1", received.Data) + case <-time.After(3 * time.Second): + t.Fatal("clientA should receive the channel message") + } + + // clientB should NOT receive it. + select { + case msg := <-clientB.send: + t.Fatalf("clientB should not receive message for user:1, got: %s", msg) + case <-time.After(300 * time.Millisecond): + // Good. + } +} + +// --------------------------------------------------------------------------- +// Unique source IDs per bridge instance +// --------------------------------------------------------------------------- + +func TestRedisBridge_UniqueSourceIDs(t *testing.T) { + rc := skipIfNoRedis(t) + prefix := testPrefix(t) + cleanupRedis(t, rc, prefix) + + hub, _, _ := startTestHub(t) + + bridge1, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + + bridge2, err := NewRedisBridge(hub, RedisConfig{Addr: redisAddr, Prefix: prefix}) + require.NoError(t, err) + + assert.NotEqual(t, bridge1.SourceID(), bridge2.SourceID(), + "each bridge instance must have a unique source ID") + + _ = bridge1.Stop() + _ = bridge2.Stop() +}