go-ws/docs/architecture.md
Snider 5ccb169342 docs: graduate TODO/FINDINGS into production documentation
Replace internal task tracking (TODO.md, FINDINGS.md) with structured
documentation in docs/. Trim CLAUDE.md to agent instructions only.

Co-Authored-By: Virgil <virgil@lethean.io>
2026-02-20 15:01:55 +00:00


# go-ws Architecture

Module: `forge.lthn.ai/core/go-ws`

---
## Overview
go-ws is a WebSocket hub for real-time streaming in Go. It implements the hub pattern for centralised connection management with channel-based pub/sub delivery, token-based authentication on upgrade, client-side reconnection with exponential backoff, and a Redis pub/sub bridge for coordinating multiple hub instances.

---
## Hub Pattern
The `Hub` struct is the central broker. It owns all connection state and serialises mutations through goroutine-safe channels:
```
                 ┌─────────────────────────────┐
                 │ Hub                         │
HTTP upgrade ──► │ register   chan *Client     │
disconnect   ──► │ unregister chan *Client     │
server send  ──► │ broadcast  chan []byte      │
                 │                             │
                 │ clients  map[*Client]bool   │
                 │ channels map[string]map…    │
                 └─────────────────────────────┘
```
`Hub.Run(ctx)` is a single select-loop goroutine that processes all connection state transitions. Mutations to `clients` and `channels` occur only inside `Run` or in `Subscribe`/`Unsubscribe`, and always under the write lock of the hub's `sync.RWMutex`, so concurrent senders can read safely under `RLock`. A single hub-level lock removes the need for per-channel mutexes, and `SendToChannel` copies the subscribers into a slice under `RLock` so that delivery never iterates a map that is being modified.
### Lifecycle
1. Call `hub.Run(ctx)` in a goroutine before accepting connections.
2. Mount `hub.Handler()` on any `http.ServeMux` or router.
3. Cancel the context to shut down: `Run` closes all client send channels, which causes `writePump` goroutines to send a WebSocket close frame and exit.
---
## Connection Management
Each connected client is represented by a `Client`:
```go
type Client struct {
	hub           *Hub
	conn          *websocket.Conn
	send          chan []byte // buffered, capacity 256
	subscriptions map[string]bool
	mu            sync.RWMutex
	UserID        string
	Claims        map[string]any
}
```
On upgrade, `Handler` creates a `Client`, sends it to `hub.register`, then starts two goroutines:
- `readPump`: reads inbound frames, dispatches subscribe/unsubscribe/ping, enforces pong timeout.
- `writePump`: drains `client.send`, batches queued frames into a single write, sends server-side ping on heartbeat tick.

On disconnect (read error, write error, or context cancel), `readPump` sends the client to `hub.unregister`. `Run` removes it from `clients` and all channel maps, then closes `client.send`. Closing `client.send` is the signal that causes `writePump` to send a close frame and exit.
### Buffer Overflow
Each client's `send` channel has capacity 256. When `Broadcast` or `SendToChannel` cannot deliver to a full channel, the client is considered stalled. For broadcasts, `Run` schedules an unregister via a goroutine. For `SendToChannel`, the message is silently dropped for that client only.
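
Detecting a full queue without blocking the sender is usually done with a `select` that has a `default` branch. The helper below is a hypothetical sketch of that idiom (the name `trySend` is not from go-ws), using a capacity of 2 instead of 256 so the overflow is easy to see.

```go
package main

import "fmt"

// trySend attempts a non-blocking delivery to a client's send queue.
// It reports false when the queue is full, which is the "stalled" case.
func trySend(send chan []byte, msg []byte) bool {
	select {
	case send <- msg:
		return true
	default:
		return false
	}
}

func main() {
	send := make(chan []byte, 2) // the hub uses capacity 256
	for i := 0; i < 3; i++ {
		fmt.Println(i, trySend(send, []byte("frame")))
	}
	// Prints:
	// 0 true
	// 1 true
	// 2 false
}
```

The caller decides what a `false` result means: unregistering the client for broadcasts, or dropping the message for channel sends, as described above.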
### Timing Defaults
| Constant | Default | Purpose |
|---|---|---|
| `DefaultHeartbeatInterval` | 30 s | Server-side ping cadence |
| `DefaultPongTimeout` | 60 s | Read deadline after each pong |
| `DefaultWriteTimeout` | 10 s | Write deadline per frame |

All three are configurable via `HubConfig`.

---
## Channel Subscriptions
Channels are named strings; there are no predefined names. Clients subscribe by sending a JSON message:
```json
{"type": "subscribe", "data": "process:proc-1"}
```
`readPump` intercepts subscribe/unsubscribe frames and calls `hub.Subscribe` / `hub.Unsubscribe` directly. The hub maintains two parallel indices:
- `hub.channels[channelName][*Client]`: for targeted send
- `client.subscriptions[channelName]`: for cleanup on disconnect

Both indices are kept consistent. When the last subscriber leaves a channel, the channel entry is removed entirely.
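
The two-index bookkeeping can be sketched as follows. This is an illustration only: the lowercase types and methods are hypothetical stand-ins, and locking is left out to keep the index logic visible.

```go
package main

import "fmt"

type client struct {
	subscriptions map[string]bool
}

type hub struct {
	channels map[string]map[*client]bool
}

// subscribe records the subscription in both indices.
func (h *hub) subscribe(c *client, name string) {
	if h.channels[name] == nil {
		h.channels[name] = make(map[*client]bool)
	}
	h.channels[name][c] = true
	c.subscriptions[name] = true
}

// unsubscribe removes the subscription from both indices and drops the
// channel entry once no subscribers remain.
func (h *hub) unsubscribe(c *client, name string) {
	delete(c.subscriptions, name)
	subs := h.channels[name]
	delete(subs, c) // deleting from a nil map is a no-op
	if len(subs) == 0 {
		delete(h.channels, name)
	}
}

func main() {
	h := &hub{channels: make(map[string]map[*client]bool)}
	c := &client{subscriptions: make(map[string]bool)}

	h.subscribe(c, "process:proc-1")
	fmt.Println(len(h.channels), len(c.subscriptions)) // 1 1

	h.unsubscribe(c, "process:proc-1")
	fmt.Println(len(h.channels), len(c.subscriptions)) // 0 0
}
```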
### Sending to a Channel
```go
hub.SendToChannel("process:proc-1", ws.Message{
	Type: ws.TypeProcessOutput,
	Data: "output line",
})
```
`SendToChannel` acquires `RLock`, copies the channel's subscribers into a slice, releases the lock, then delivers to each client's `send` channel. The copy-under-lock pattern is critical: without it, iterating the subscriber map would race with `Run` handling `hub.unregister`, which mutates that map under the write lock.
### Process Helpers
Two convenience methods wrap `SendToChannel` with idiomatic channel naming (`process:<id>`):
```go
hub.SendProcessOutput(processID, line)
hub.SendProcessStatus(processID, "exited", exitCode)
```
---
## Message Types
All frames are JSON-encoded `Message` structs:
```go
type Message struct {
	Type      MessageType `json:"type"`
	Channel   string      `json:"channel,omitempty"`
	ProcessID string      `json:"processId,omitempty"`
	Data      any         `json:"data,omitempty"`
	Timestamp time.Time   `json:"timestamp"`
}
```
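
As an illustration of the wire format, the sketch below decodes the subscribe frame shown earlier into a local copy of the struct. It assumes `MessageType` is a string type, which the source does not state; `decodeFrame` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// MessageType is assumed to be a string type for this sketch.
type MessageType string

// Message is a local copy of the struct shown above.
type Message struct {
	Type      MessageType `json:"type"`
	Channel   string      `json:"channel,omitempty"`
	ProcessID string      `json:"processId,omitempty"`
	Data      any         `json:"data,omitempty"`
	Timestamp time.Time   `json:"timestamp"`
}

// decodeFrame parses one JSON frame into a Message.
func decodeFrame(raw []byte) (Message, error) {
	var msg Message
	err := json.Unmarshal(raw, &msg)
	return msg, err
}

func main() {
	msg, err := decodeFrame([]byte(`{"type": "subscribe", "data": "process:proc-1"}`))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	// Data is typed any, so the channel name needs a type assertion.
	channel, ok := msg.Data.(string)
	// A frame without "timestamp" leaves the zero time in place.
	fmt.Println(msg.Type, channel, ok, msg.Timestamp.IsZero())
	// Prints: subscribe process:proc-1 true true
}
```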
| Type | Direction | Purpose |
|---|---|---|
| `process_output` | server → client | Real-time subprocess stdout/stderr line |
| `process_status` | server → client | Process state change (`running`, `exited`) with exit code |
| `event` | server → client | Generic application event |
| `error` | server → client | Error notification |
| `ping` | client → server | Keep-alive request |
| `pong` | server → client | Response to client ping |
| `subscribe` | client → server | Subscribe to a named channel |
| `unsubscribe` | client → server | Unsubscribe from a named channel |

Server-side pings use the WebSocket protocol ping frame (not a JSON `ping` message). Client-sent `ping` messages result in a JSON `pong` response via `client.send`.

---
## Authentication
Authentication is optional and backward compatible. When `HubConfig.Authenticator` is nil, all connections are accepted unchanged. When set, `Handler` calls `Authenticate(r)` before the WebSocket upgrade:
```go
auth := ws.NewAPIKeyAuth(map[string]string{
	"secret-key": "user-1",
})
hub := ws.NewHubWithConfig(ws.HubConfig{Authenticator: auth})
```
If authentication fails, the HTTP response is `401 Unauthorized` and no upgrade occurs.
### Authenticator Interface
```go
type Authenticator interface {
	Authenticate(r *http.Request) AuthResult
}

type AuthResult struct {
	Valid  bool
	UserID string
	Claims map[string]any
	Error  error
}
```
Implementations may inspect any part of the request: headers, query parameters, cookies. The `AuthenticatorFunc` adapter allows plain functions to satisfy the interface without defining a named type.
### Built-in: APIKeyAuthenticator
`APIKeyAuthenticator` validates `Authorization: Bearer <key>` against a static key-to-userID map. It is intended for simple deployments and internal tooling. For JWT or OAuth, implement the `Authenticator` interface directly.
### Auth Fields on Client
On successful authentication, `client.UserID` and `client.Claims` are populated. These are readable from `OnConnect` callbacks and any code that holds a reference to the client.
### OnAuthFailure Callback
`HubConfig.OnAuthFailure` fires on every rejected connection. Use it for logging, metrics, or rate-limit triggers. It receives the original `*http.Request` and the `AuthResult`.

---
## Redis Pub/Sub Bridge
`RedisBridge` connects a local `Hub` to Redis, enabling multiple hub instances (across multiple processes or servers) to coordinate broadcasts and channel messages.
```
 Instance A                  Redis                  Instance B
┌─────────┐    publish    ┌─────────┐    receive   ┌─────────┐
│ Hub A   │──────────────►│ ws:*    │─────────────►│ Hub B   │
│ Bridge A│               │ channels│              │ Bridge B│
└─────────┘               └─────────┘              └─────────┘
```
### Redis Channel Naming
| Redis channel | Meaning |
|---|---|
| `{prefix}:broadcast` | Global broadcast to all clients across all instances |
| `{prefix}:channel:{name}` | Targeted delivery to subscribers of `{name}` |

The default prefix is `ws`. Override via `RedisConfig.Prefix`.
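
The naming scheme can be expressed as a pair of key builders plus a classifier for incoming Redis channel names. The function names below are hypothetical and the sketch does not claim to match the bridge's internals.

```go
package main

import (
	"fmt"
	"strings"
)

func broadcastKey(prefix string) string     { return prefix + ":broadcast" }
func channelKey(prefix, name string) string { return prefix + ":channel:" + name }

// route classifies a Redis channel name. For targeted messages it returns
// the hub channel name; for the global channel it sets isBroadcast.
// ok is false when the name belongs to neither form.
func route(prefix, redisChannel string) (name string, isBroadcast, ok bool) {
	if redisChannel == broadcastKey(prefix) {
		return "", true, true
	}
	channelPrefix := prefix + ":channel:"
	if strings.HasPrefix(redisChannel, channelPrefix) && len(redisChannel) > len(channelPrefix) {
		// Strip only the leading prefix: hub channel names may contain colons.
		return redisChannel[len(channelPrefix):], false, true
	}
	return "", false, false
}

func main() {
	name, isBroadcast, ok := route("ws", channelKey("ws", "process:abc"))
	fmt.Println(name, isBroadcast, ok) // process:abc false true

	name, isBroadcast, ok = route("ws", "ws:broadcast")
	fmt.Println(name == "", isBroadcast, ok) // true true true
}
```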
### Usage
```go
bridge, err := ws.NewRedisBridge(hub, ws.RedisConfig{
	Addr:   "10.69.69.87:6379",
	Prefix: "ws",
})
if err != nil {
	// For example, the connectivity PING failed.
	log.Fatalf("redis bridge: %v", err)
}
bridge.Start(ctx)
defer bridge.Stop()

// Publish to all instances
bridge.PublishBroadcast(msg)

// Publish to subscribers of "process:abc" on all instances
bridge.PublishToChannel("process:abc", msg)
```
`NewRedisBridge` validates connectivity with a `PING` before returning. `Start` uses `PSubscribe` with a pattern that matches both the broadcast channel and all `{prefix}:channel:*` channels. The listener goroutine forwards received messages to the local hub via `hub.Broadcast` or `hub.SendToChannel` as appropriate.
### Envelope Pattern and Loop Prevention
Every published message is wrapped in a `redisEnvelope` before serialisation:
```go
type redisEnvelope struct {
	SourceID string  `json:"sourceId"`
	Message  Message `json:"message"`
}
```
`SourceID` is a 16-byte cryptographically random hex string generated once per bridge instance at construction time. The listener goroutine drops any envelope whose `SourceID` matches the local bridge's own ID. This prevents a bridge from re-delivering its own published messages to its own hub.
Without this guard, a single `PublishBroadcast` call would immediately echo back to the publishing instance, creating an infinite loop.

---
## ReconnectingClient
`ReconnectingClient` provides client-side resilience. It wraps a gorilla/websocket connection with an automatic reconnect loop:
```go
client := ws.NewReconnectingClient(ws.ReconnectConfig{
	URL:               "ws://server/ws",
	InitialBackoff:    1 * time.Second,
	MaxBackoff:        30 * time.Second,
	BackoffMultiplier: 2.0,
	MaxRetries:        0, // unlimited
	OnConnect:         func() { /* ... */ },
	OnDisconnect:      func() { /* ... */ },
	OnReconnect:       func(attempt int) { /* ... */ },
	OnMessage:         func(msg ws.Message) { /* ... */ },
})
go client.Connect(ctx) // blocks until context cancelled or max retries exceeded
```
### Backoff Calculation
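Backoff is multiplied by `BackoffMultiplier` after each failed attempt (doubling with the value 2.0 used above) and is capped at `MaxBackoff`. In the schedule below, `Multiplier` stands for `BackoffMultiplier`: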
```
attempt 1: InitialBackoff
attempt 2: InitialBackoff * Multiplier
attempt 3: InitialBackoff * Multiplier^2
...
attempt N: min(InitialBackoff * Multiplier^(N-1), MaxBackoff)
```
The attempt counter resets to zero after a successful connection.
### Connection States
```go
const (
	StateDisconnected ConnectionState = iota
	StateConnecting
	StateConnected
)
```
`client.State()` returns the current state under a read lock. This is useful for health checks and UI indicators.

---
## Concurrency Model
| Resource | Protection |
|---|---|
| `hub.clients`, `hub.channels` | `sync.RWMutex` on Hub |
| `client.subscriptions` | `sync.RWMutex` on Client |
| `ReconnectingClient.conn`, `.state` | `sync.RWMutex` on ReconnectingClient |
| Hub state transitions | Serialised through `hub.register`/`hub.unregister` channels in `Run` loop |

The key invariant: `hub.clients` and `hub.channels` are mutated only from within `hub.Run` (via channel receive) or from `Subscribe`/`Unsubscribe` under write lock. `SendToChannel` copies the subscribers into a slice under read lock before releasing it, so iteration happens outside any lock without races.

The test suite runs clean under the race detector (`go test -race ./...`).

---
## Dependency Graph
```
go-ws
├── github.com/gorilla/websocket v1.5.3 (WebSocket server + client)
└── github.com/redis/go-redis/v9 v9.18.0 (Redis bridge, optional at runtime)
```
The Redis dependency is a compile-time import but a runtime opt-in. Applications that do not create a `RedisBridge` incur no Redis connections.