Replace internal task tracking (TODO.md, FINDINGS.md) with structured documentation in docs/. Trim CLAUDE.md to agent instructions only. Co-Authored-By: Virgil <virgil@lethean.io>
go-ws Architecture
Module: forge.lthn.ai/core/go-ws
Overview
go-ws is a WebSocket hub for real-time streaming in Go. It implements the hub pattern for centralised connection management with channel-based pub/sub delivery, token-based authentication on upgrade, client-side reconnection with exponential backoff, and a Redis pub/sub bridge for coordinating multiple hub instances.
Hub Pattern
The Hub struct is the central broker. It owns all connection state and serialises mutations through goroutine-safe channels:
                 ┌─────────────────────────────┐
                 │            Hub              │
HTTP upgrade ──► │  register   chan *Client    │
disconnect   ──► │  unregister chan *Client    │
server send  ──► │  broadcast  chan []byte     │
                 │                             │
                 │  clients  map[*Client]bool  │
                 │  channels map[string]map…   │
                 └─────────────────────────────┘
Hub.Run(ctx) is a single select-loop goroutine that processes all connection state transitions. Registration and unregistration are applied only inside Run; Subscribe and Unsubscribe update the channel maps directly. Every mutation takes the write side of the hub's sync.RWMutex, so concurrent senders can read safely under RLock. This removes the need for per-channel mutexes on write paths, while SendToChannel uses RLock plus a copy of the subscriber set into a slice, so delivery never iterates a map that is being modified.
Lifecycle
- Call hub.Run(ctx) in a goroutine before accepting connections.
- Mount hub.Handler() on any http.ServeMux or router.
- Cancel the context to shut down: Run closes all client send channels, which causes writePump goroutines to send a WebSocket close frame and exit.
Connection Management
Each connected client is represented by a Client:
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte // buffered, capacity 256
subscriptions map[string]bool
mu sync.RWMutex
UserID string
Claims map[string]any
}
On upgrade, Handler creates a Client, sends it to hub.register, then starts two goroutines:
- readPump: reads inbound frames, dispatches subscribe/unsubscribe/ping, enforces pong timeout.
- writePump: drains client.send, batches queued frames into a single write, sends server-side ping on heartbeat tick.
On disconnect (read error, write error, or context cancel), readPump sends the client to hub.unregister. Run removes it from clients and all channel maps, then closes client.send. Closing client.send is the signal that causes writePump to send a close frame and exit.
Buffer Overflow
Each client's send channel has capacity 256. When Broadcast or SendToChannel cannot deliver to a full channel, the client is considered stalled. For broadcasts, Run schedules an unregister via a goroutine. For SendToChannel, the message is silently dropped for that client only.
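The stalled-client check relies on a non-blocking channel send. A minimal sketch of that idiom is shown here; trySend is a hypothetical helper name, and the capacity of 2 is chosen only to keep the example short (the document's default is 256).

```go
package main

import "fmt"

// trySend attempts a non-blocking delivery to a client's buffered send
// channel. It reports false when the buffer is full, which is the
// condition under which a client is treated as stalled.
func trySend(send chan []byte, msg []byte) bool {
	select {
	case send <- msg:
		return true
	default:
		return false
	}
}

func main() {
	send := make(chan []byte, 2) // small capacity for illustration only
	fmt.Println(trySend(send, []byte("a")))
	fmt.Println(trySend(send, []byte("b")))
	fmt.Println(trySend(send, []byte("c"))) // buffer is full at this point
}
```

What the caller does with a false result is a policy decision: per the text above, a broadcast schedules an unregister, whereas a channel send just drops the message for that client.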
Timing Defaults
| Constant | Default | Purpose |
|---|---|---|
| DefaultHeartbeatInterval | 30 s | Server-side ping cadence |
| DefaultPongTimeout | 60 s | Read deadline after each pong |
| DefaultWriteTimeout | 10 s | Write deadline per frame |
All three are configurable via HubConfig.
Channel Subscriptions
Channels are named strings; there are no predefined names. Clients subscribe by sending a JSON message:
{"type": "subscribe", "data": "process:proc-1"}
readPump intercepts subscribe/unsubscribe frames and calls hub.Subscribe / hub.Unsubscribe directly. The hub maintains two parallel indices:
- hub.channels[channelName][*Client]: for targeted send
- client.subscriptions[channelName]: for cleanup on disconnect
Both indices are kept consistent. Unsubscribing from a channel with no remaining subscribers removes the channel entry entirely.
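One way to keep the two indices in step, including removal of empty channel entries, is sketched below. The index and member types are hypothetical stand-ins for Hub and Client, reduced to the maps involved; the real Subscribe and Unsubscribe may differ in detail.

```go
package main

import (
	"fmt"
	"sync"
)

// member is a hypothetical stand-in for Client, reduced to its subscription set.
type member struct {
	mu            sync.RWMutex
	subscriptions map[string]bool
}

// index is a hypothetical stand-in for the hub's channel map.
type index struct {
	mu       sync.RWMutex
	channels map[string]map[*member]bool
}

// subscribe records the membership in both indices.
func (ix *index) subscribe(m *member, name string) {
	ix.mu.Lock()
	if ix.channels[name] == nil {
		ix.channels[name] = make(map[*member]bool)
	}
	ix.channels[name][m] = true
	ix.mu.Unlock()

	m.mu.Lock()
	m.subscriptions[name] = true
	m.mu.Unlock()
}

// unsubscribe removes the membership from both indices and deletes the
// channel entry once its last subscriber is gone.
func (ix *index) unsubscribe(m *member, name string) {
	ix.mu.Lock()
	if subs, ok := ix.channels[name]; ok {
		delete(subs, m)
		if len(subs) == 0 {
			delete(ix.channels, name)
		}
	}
	ix.mu.Unlock()

	m.mu.Lock()
	delete(m.subscriptions, name)
	m.mu.Unlock()
}

func main() {
	ix := &index{channels: make(map[string]map[*member]bool)}
	m := &member{subscriptions: make(map[string]bool)}

	ix.subscribe(m, "process:proc-1")
	fmt.Println(len(ix.channels), len(m.subscriptions))

	ix.unsubscribe(m, "process:proc-1")
	fmt.Println(len(ix.channels), len(m.subscriptions))
}
```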
Sending to a Channel
hub.SendToChannel("process:proc-1", ws.Message{
Type: ws.TypeProcessOutput,
Data: "output line",
})
SendToChannel acquires RLock, copies the channel's subscribers into a slice, releases the lock, then delivers to each client's send channel. The copy-under-lock pattern is critical: it prevents a data race with unregister handling in Run, which mutates the same map concurrently under the write lock.
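The copy-under-lock pattern can be illustrated with a small, self-contained sketch. The registry and subscriber types and the sendToChannel function are hypothetical; the sketch also does not handle a send channel being closed between the snapshot and the delivery, which a real implementation has to account for.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical stand-in for Client.
type subscriber struct{ send chan []byte }

// registry is a hypothetical stand-in for the hub's channel index.
type registry struct {
	mu       sync.RWMutex
	channels map[string]map[*subscriber]bool
}

// sendToChannel snapshots the subscribers under RLock, releases the lock,
// and only then delivers. It returns how many subscribers accepted the
// payload; full buffers are skipped.
func (r *registry) sendToChannel(name string, payload []byte) int {
	r.mu.RLock()
	subs := make([]*subscriber, 0, len(r.channels[name]))
	for s := range r.channels[name] {
		subs = append(subs, s)
	}
	r.mu.RUnlock()

	delivered := 0
	for _, s := range subs {
		select {
		case s.send <- payload:
			delivered++
		default: // full buffer: drop for this subscriber only
		}
	}
	return delivered
}

func main() {
	fast := &subscriber{send: make(chan []byte, 1)}
	stalled := &subscriber{send: make(chan []byte, 1)}
	stalled.send <- []byte("backlog") // fill the buffer

	r := &registry{channels: map[string]map[*subscriber]bool{
		"process:proc-1": {fast: true, stalled: true},
	}}
	fmt.Println("delivered:", r.sendToChannel("process:proc-1", []byte("output line")))
}
```

Holding the lock only for the copy keeps slow deliveries from blocking writers, at the cost of one small slice allocation per send.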
Process Helpers
Two convenience methods wrap SendToChannel with idiomatic channel naming (process:<id>):
hub.SendProcessOutput(processID, line)
hub.SendProcessStatus(processID, "exited", exitCode)
Message Types
All frames are JSON-encoded Message structs:
type Message struct {
Type MessageType `json:"type"`
Channel string `json:"channel,omitempty"`
ProcessID string `json:"processId,omitempty"`
Data any `json:"data,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
| Type | Direction | Purpose |
|---|---|---|
| process_output | server → client | Real-time subprocess stdout/stderr line |
| process_status | server → client | Process state change (running, exited) with exit code |
| event | server → client | Generic application event |
| error | server → client | Error notification |
| ping | client → server | Keep-alive request |
| pong | server → client | Response to client ping |
| subscribe | client → server | Subscribe to a named channel |
| unsubscribe | client → server | Unsubscribe from a named channel |
Server-side pings use the WebSocket protocol ping frame (not a JSON ping message). Client-sent ping messages result in a JSON pong response via client.send.
Authentication
Authentication is optional and backward compatible. When HubConfig.Authenticator is nil, all connections are accepted unchanged. When set, Handler calls Authenticate(r) before the WebSocket upgrade:
auth := ws.NewAPIKeyAuth(map[string]string{
"secret-key": "user-1",
})
hub := ws.NewHubWithConfig(ws.HubConfig{Authenticator: auth})
If authentication fails, the HTTP response is 401 Unauthorized and no upgrade occurs.
Authenticator Interface
type Authenticator interface {
Authenticate(r *http.Request) AuthResult
}
type AuthResult struct {
Valid bool
UserID string
Claims map[string]any
Error error
}
Implementations may inspect any part of the request: headers, query parameters, cookies. The AuthenticatorFunc adapter allows plain functions to satisfy the interface without defining a named type.
Built-in: APIKeyAuthenticator
APIKeyAuthenticator validates Authorization: Bearer <key> against a static key-to-userID map. It is intended for simple deployments and internal tooling. For JWT or OAuth, implement the Authenticator interface directly.
Auth Fields on Client
On successful authentication, client.UserID and client.Claims are populated. These are readable from OnConnect callbacks and any code that holds a reference to the client.
OnAuthFailure Callback
HubConfig.OnAuthFailure fires on every rejected connection. Use it for logging, metrics, or rate-limit triggers. It receives the original *http.Request and the AuthResult.
Redis Pub/Sub Bridge
RedisBridge connects a local Hub to Redis, enabling multiple hub instances (across multiple processes or servers) to coordinate broadcasts and channel messages.
Instance A                   Redis                  Instance B
┌─────────┐    publish    ┌─────────┐   receive    ┌─────────┐
│ Hub A   │──────────────►│ ws:*    │─────────────►│ Hub B   │
│ Bridge A│               │ channels│              │ Bridge B│
└─────────┘               └─────────┘              └─────────┘
Redis Channel Naming
| Redis channel | Meaning |
|---|---|
| {prefix}:broadcast | Global broadcast to all clients across all instances |
| {prefix}:channel:{name} | Targeted delivery to subscribers of {name} |
The default prefix is ws. Override via RedisConfig.Prefix.
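The naming scheme in the table can be expressed as a pair of builders plus a parser that a listener could use to route an incoming Redis channel name. All three function names are hypothetical and chosen for this sketch only.

```go
package main

import (
	"fmt"
	"strings"
)

// broadcastChannel builds the {prefix}:broadcast name.
func broadcastChannel(prefix string) string {
	return prefix + ":broadcast"
}

// targetedChannel builds the {prefix}:channel:{name} name.
func targetedChannel(prefix, name string) string {
	return prefix + ":channel:" + name
}

// parseRedisChannel classifies an incoming Redis channel name. For a
// targeted channel it returns the hub channel name; ok is false when the
// name matches neither form.
func parseRedisChannel(prefix, redisChannel string) (name string, broadcast bool, ok bool) {
	if redisChannel == broadcastChannel(prefix) {
		return "", true, true
	}
	targeted := prefix + ":channel:"
	if strings.HasPrefix(redisChannel, targeted) {
		return redisChannel[len(targeted):], false, true
	}
	return "", false, false
}

func main() {
	fmt.Println(broadcastChannel("ws"))
	fmt.Println(targetedChannel("ws", "process:abc"))

	name, broadcast, ok := parseRedisChannel("ws", "ws:channel:process:abc")
	fmt.Println(name, broadcast, ok)
}
```

Since hub channel names such as process:abc contain colons themselves, the parser strips the known prefix rather than splitting on ":".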
Usage
bridge, err := ws.NewRedisBridge(hub, ws.RedisConfig{
Addr: "10.69.69.87:6379",
Prefix: "ws",
})
bridge.Start(ctx)
defer bridge.Stop()
// Publish to all instances
bridge.PublishBroadcast(msg)
// Publish to subscribers of "process:abc" on all instances
bridge.PublishToChannel("process:abc", msg)
NewRedisBridge validates connectivity with a PING before returning. Start uses PSubscribe with a pattern that matches both the broadcast channel and all {prefix}:channel:* channels. The listener goroutine forwards received messages to the local hub via hub.Broadcast or hub.SendToChannel as appropriate.
Envelope Pattern and Loop Prevention
Every published message is wrapped in a redisEnvelope before serialisation:
type redisEnvelope struct {
SourceID string `json:"sourceId"`
Message Message `json:"message"`
}
SourceID is a 16-byte cryptographically random hex string generated once per bridge instance at construction time. The listener goroutine drops any envelope whose SourceID matches the local bridge's own ID. This prevents a bridge from re-delivering its own published messages to its own hub.
Without this guard, a single PublishBroadcast call would immediately echo back to the publishing instance, creating an infinite loop.
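A reduced sketch of the envelope guard follows. It assumes that "16-byte" means 16 random bytes hex-encoded to 32 characters, uses a cut-down Message type, and introduces hypothetical helpers newSourceID, wrap, and unwrap; none of these names are confirmed by the source.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// Message is a reduced stand-in for the library's Message.
type Message struct {
	Type string `json:"type"`
	Data any    `json:"data,omitempty"`
}

// redisEnvelope mirrors the struct shown above.
type redisEnvelope struct {
	SourceID string  `json:"sourceId"`
	Message  Message `json:"message"`
}

// newSourceID returns 16 cryptographically random bytes as hex
// (assumed interpretation of the ID format).
func newSourceID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// wrap serialises a message together with the publisher's ID.
func wrap(sourceID string, msg Message) ([]byte, error) {
	return json.Marshal(redisEnvelope{SourceID: sourceID, Message: msg})
}

// unwrap decodes a payload and reports false when it is malformed or was
// published by the local bridge itself.
func unwrap(localID string, payload []byte) (Message, bool) {
	var env redisEnvelope
	if err := json.Unmarshal(payload, &env); err != nil {
		return Message{}, false
	}
	if env.SourceID == localID {
		return Message{}, false
	}
	return env.Message, true
}

func main() {
	idA, err := newSourceID()
	if err != nil {
		panic(err)
	}
	idB, err := newSourceID()
	if err != nil {
		panic(err)
	}

	payload, err := wrap(idA, Message{Type: "event", Data: "hello"})
	if err != nil {
		panic(err)
	}

	_, forwardedByA := unwrap(idA, payload) // own message: dropped
	msg, forwardedByB := unwrap(idB, payload)
	fmt.Println(len(idA), forwardedByA, forwardedByB, msg.Type)
}
```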
ReconnectingClient
ReconnectingClient provides client-side resilience. It wraps a gorilla/websocket connection with an automatic reconnect loop:
client := ws.NewReconnectingClient(ws.ReconnectConfig{
URL: "ws://server/ws",
InitialBackoff: 1 * time.Second,
MaxBackoff: 30 * time.Second,
BackoffMultiplier: 2.0,
MaxRetries: 0, // unlimited
OnConnect: func() { /* ... */ },
OnDisconnect: func() { /* ... */ },
OnReconnect: func(attempt int) { /* ... */ },
OnMessage: func(msg ws.Message) { /* ... */ },
})
go client.Connect(ctx) // blocks until context cancelled or max retries exceeded
Backoff Calculation
Backoff is multiplied by BackoffMultiplier on each failed attempt (doubling when the multiplier is 2.0), capped at MaxBackoff:
attempt 1: InitialBackoff
attempt 2: InitialBackoff * Multiplier
attempt 3: InitialBackoff * Multiplier^2
...
attempt N: min(InitialBackoff * Multiplier^(N-1), MaxBackoff)
The attempt counter resets to zero after a successful connection.
Connection States
const (
StateDisconnected ConnectionState = iota
StateConnecting
StateConnected
)
client.State() returns the current state under a read lock. Useful for health checks and UI indicators.
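A state accessor guarded by a read lock might look like the sketch below. The constants are taken from this section; the stateHolder type, setState, and the String method are hypothetical additions for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

type ConnectionState int

const (
	StateDisconnected ConnectionState = iota
	StateConnecting
	StateConnected
)

// String is a hypothetical helper for logs and UI indicators.
func (s ConnectionState) String() string {
	switch s {
	case StateDisconnected:
		return "disconnected"
	case StateConnecting:
		return "connecting"
	case StateConnected:
		return "connected"
	default:
		return "unknown"
	}
}

// stateHolder is a hypothetical stand-in for the state-carrying part of
// ReconnectingClient.
type stateHolder struct {
	mu    sync.RWMutex
	state ConnectionState
}

// State returns the current state under a read lock.
func (h *stateHolder) State() ConnectionState {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.state
}

// setState updates the state under the write lock.
func (h *stateHolder) setState(s ConnectionState) {
	h.mu.Lock()
	h.state = s
	h.mu.Unlock()
}

func main() {
	var h stateHolder
	fmt.Println(h.State())
	h.setState(StateConnected)
	fmt.Println(h.State())
}
```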
Concurrency Model
| Resource | Protection |
|---|---|
| hub.clients, hub.channels | sync.RWMutex on Hub |
| client.subscriptions | sync.RWMutex on Client |
| ReconnectingClient.conn, .state | sync.RWMutex on ReconnectingClient |
| Hub state transitions | Serialised through hub.register/hub.unregister channels in Run loop |
The key invariant: hub.clients and hub.channels are mutated only from within hub.Run (via channel receive) or from Subscribe/Unsubscribe under write lock. SendToChannel copies the subscriber slice under read lock before releasing it, so iteration happens outside any lock without races.
All code is clean under go test -race ./....
Dependency Graph
go-ws
├── github.com/gorilla/websocket v1.5.3 (WebSocket server + client)
└── github.com/redis/go-redis/v9 v9.18.0 (Redis bridge, optional at runtime)
The Redis dependency is a compile-time import but a runtime opt-in. Applications that do not create a RedisBridge incur no Redis connections.