RedisBridge enables multiple Hub instances to coordinate broadcasts and channel-targeted messages across processes via Redis pub/sub. Uses envelope pattern with sourceID for infinite loop prevention. Phase 3 items 1-2 complete. 15 tests including cross-bridge messaging, loop prevention, concurrent publishes, and graceful shutdown. Race-free under -race. Co-Authored-By: Virgil <virgil@lethean.io>
226 lines
5.9 KiB
Go
226 lines
5.9 KiB
Go
// SPDX-Licence-Identifier: EUPL-1.2
|
|
|
|
package ws
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// RedisConfig configures the Redis pub/sub bridge.
|
|
type RedisConfig struct {
|
|
// Addr is the Redis server address (e.g. "10.69.69.87:6379").
|
|
Addr string
|
|
|
|
// Password is the optional Redis authentication password.
|
|
Password string
|
|
|
|
// DB is the Redis database number (default 0).
|
|
DB int
|
|
|
|
// Prefix is the key prefix for Redis channels (default "ws").
|
|
Prefix string
|
|
}
|
|
|
|
// redisEnvelope wraps a Message with a source identifier to prevent
|
|
// infinite echo loops between bridge instances.
|
|
type redisEnvelope struct {
|
|
SourceID string `json:"sourceId"`
|
|
Message Message `json:"message"`
|
|
}
|
|
|
|
// RedisBridge connects a Hub to Redis pub/sub for cross-instance messaging.
|
|
// Multiple Hub instances using the same Redis backend will coordinate
|
|
// broadcasts and channel messages transparently.
|
|
type RedisBridge struct {
|
|
hub *Hub
|
|
client *redis.Client
|
|
pubsub *redis.PubSub
|
|
prefix string
|
|
sourceID string
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// NewRedisBridge creates a Redis bridge for the given Hub.
|
|
// It establishes a connection to Redis and validates connectivity
|
|
// before returning. The bridge must be started with Start() to
|
|
// begin processing messages.
|
|
func NewRedisBridge(hub *Hub, cfg RedisConfig) (*RedisBridge, error) {
|
|
if hub == nil {
|
|
return nil, fmt.Errorf("hub must not be nil")
|
|
}
|
|
if cfg.Addr == "" {
|
|
return nil, fmt.Errorf("redis address must not be empty")
|
|
}
|
|
if cfg.Prefix == "" {
|
|
cfg.Prefix = "ws"
|
|
}
|
|
|
|
client := redis.NewClient(&redis.Options{
|
|
Addr: cfg.Addr,
|
|
Password: cfg.Password,
|
|
DB: cfg.DB,
|
|
})
|
|
|
|
// Verify connectivity.
|
|
if err := client.Ping(context.Background()).Err(); err != nil {
|
|
client.Close()
|
|
return nil, fmt.Errorf("redis ping failed: %w", err)
|
|
}
|
|
|
|
// Generate a unique source ID to prevent echo loops.
|
|
idBytes := make([]byte, 16)
|
|
if _, err := rand.Read(idBytes); err != nil {
|
|
client.Close()
|
|
return nil, fmt.Errorf("failed to generate source ID: %w", err)
|
|
}
|
|
sourceID := hex.EncodeToString(idBytes)
|
|
|
|
return &RedisBridge{
|
|
hub: hub,
|
|
client: client,
|
|
prefix: cfg.Prefix,
|
|
sourceID: sourceID,
|
|
}, nil
|
|
}
|
|
|
|
// Start begins listening for Redis messages and forwarding them to
|
|
// the local Hub's clients. It subscribes to the broadcast channel
|
|
// and uses pattern-subscribe for all channel-targeted messages.
|
|
// The bridge runs until Stop() is called or the provided context
|
|
// is cancelled.
|
|
func (rb *RedisBridge) Start(ctx context.Context) error {
|
|
rb.ctx, rb.cancel = context.WithCancel(ctx)
|
|
|
|
broadcastChan := rb.prefix + ":broadcast"
|
|
channelPattern := rb.prefix + ":channel:*"
|
|
|
|
rb.pubsub = rb.client.PSubscribe(rb.ctx, broadcastChan, channelPattern)
|
|
|
|
// Wait for the subscription confirmation.
|
|
_, err := rb.pubsub.Receive(rb.ctx)
|
|
if err != nil {
|
|
rb.pubsub.Close()
|
|
return fmt.Errorf("redis subscribe failed: %w", err)
|
|
}
|
|
|
|
rb.wg.Add(1)
|
|
go rb.listen()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop cleanly shuts down the Redis bridge. It cancels the listener
|
|
// goroutine, closes the pub/sub subscription, and closes the Redis
|
|
// client connection.
|
|
func (rb *RedisBridge) Stop() error {
|
|
if rb.cancel != nil {
|
|
rb.cancel()
|
|
}
|
|
|
|
// Wait for the listener goroutine to exit.
|
|
rb.wg.Wait()
|
|
|
|
var firstErr error
|
|
if rb.pubsub != nil {
|
|
if err := rb.pubsub.Close(); err != nil && firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
}
|
|
if rb.client != nil {
|
|
if err := rb.client.Close(); err != nil && firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
}
|
|
return firstErr
|
|
}
|
|
|
|
// PublishToChannel publishes a message to a specific channel via Redis.
|
|
// Other bridge instances subscribed to the same Redis will receive the
|
|
// message and deliver it to their local Hub clients on that channel.
|
|
func (rb *RedisBridge) PublishToChannel(channel string, msg Message) error {
|
|
redisChan := rb.prefix + ":channel:" + channel
|
|
return rb.publish(redisChan, msg)
|
|
}
|
|
|
|
// PublishBroadcast publishes a broadcast message via Redis. All bridge
|
|
// instances will receive it and deliver to all their local Hub clients.
|
|
func (rb *RedisBridge) PublishBroadcast(msg Message) error {
|
|
redisChan := rb.prefix + ":broadcast"
|
|
return rb.publish(redisChan, msg)
|
|
}
|
|
|
|
// publish serialises the envelope and publishes to the given Redis channel.
|
|
func (rb *RedisBridge) publish(redisChan string, msg Message) error {
|
|
env := redisEnvelope{
|
|
SourceID: rb.sourceID,
|
|
Message: msg,
|
|
}
|
|
|
|
data, err := json.Marshal(env)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal redis envelope: %w", err)
|
|
}
|
|
|
|
return rb.client.Publish(rb.ctx, redisChan, data).Err()
|
|
}
|
|
|
|
// listen runs in a goroutine, reading messages from the Redis pub/sub
|
|
// channel and forwarding them to the local Hub. Messages originating
|
|
// from this bridge instance (matching sourceID) are silently dropped
|
|
// to prevent infinite loops.
|
|
func (rb *RedisBridge) listen() {
|
|
defer rb.wg.Done()
|
|
|
|
ch := rb.pubsub.Channel()
|
|
broadcastChan := rb.prefix + ":broadcast"
|
|
channelPrefix := rb.prefix + ":channel:"
|
|
|
|
for {
|
|
select {
|
|
case <-rb.ctx.Done():
|
|
return
|
|
case redisMsg, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
var env redisEnvelope
|
|
if err := json.Unmarshal([]byte(redisMsg.Payload), &env); err != nil {
|
|
// Skip malformed messages.
|
|
continue
|
|
}
|
|
|
|
// Loop prevention: skip our own messages.
|
|
if env.SourceID == rb.sourceID {
|
|
continue
|
|
}
|
|
|
|
switch {
|
|
case redisMsg.Channel == broadcastChan:
|
|
// Deliver as a local broadcast.
|
|
_ = rb.hub.Broadcast(env.Message)
|
|
|
|
case strings.HasPrefix(redisMsg.Channel, channelPrefix):
|
|
// Extract the Hub channel name from the Redis channel.
|
|
hubChannel := strings.TrimPrefix(redisMsg.Channel, channelPrefix)
|
|
_ = rb.hub.SendToChannel(hubChannel, env.Message)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// SourceID returns the unique identifier for this bridge instance.
|
|
// Useful for testing and debugging.
|
|
func (rb *RedisBridge) SourceID() string {
|
|
return rb.sourceID
|
|
}
|