docs: graduate TODO/FINDINGS into production documentation

Replace internal task tracking (TODO.md, FINDINGS.md) with structured
documentation in docs/. Trim CLAUDE.md to agent instructions only.

Co-Authored-By: Virgil <virgil@lethean.io>
Snider 2026-02-20 15:01:55 +00:00
parent da3df0077d
commit 5ccb169342
6 changed files with 636 additions and 148 deletions


@ -1,34 +1,33 @@
# CLAUDE.md
## What This Is
WebSocket hub with channel-based pub/sub for real-time streaming. Module: `forge.lthn.ai/core/go-ws`
## Commands
```bash
go test ./... # Run all tests
go test -race ./... # Race detector (required before commit)
go test -v -run Name ./... # Run single test
go test -bench=. -benchmem ./... # Benchmarks
```
## Architecture
- `Hub` manages WebSocket connections and channel subscriptions
- Message types: `process_output`, `process_status`, `event`, `error`, `ping/pong`, `subscribe/unsubscribe`
- `hub.SendProcessOutput(id, line)` broadcasts to subscribers
- `HubConfig` provides configurable heartbeat, pong timeout, write timeout, and connection callbacks
- `Authenticator` interface for token-based auth on upgrade — nil means all connections accepted (backward compat)
- `APIKeyAuthenticator` validates `Authorization: Bearer <key>` against a static key→userID map
- `AuthenticatorFunc` adapter lets plain functions satisfy the `Authenticator` interface
- `Client.UserID` and `Client.Claims` populated during authenticated upgrade
- `OnAuthFailure` callback on `HubConfig` for logging/metrics on rejected connections
- `ReconnectingClient` provides client-side reconnection with exponential backoff
- `ConnectionState`: `StateDisconnected`, `StateConnecting`, `StateConnected`
- Coverage: 98.5%
## Coding Standards
- UK English throughout
- SPDX-Licence-Identifier: EUPL-1.2 header in every .go file
- `go vet ./...` and `go test -race ./...` must be clean before commit
- Conventional commits: `type(scope): description`
- Co-Author trailer: `Co-Authored-By: Virgil <virgil@lethean.io>`
## Key Interfaces
- `Authenticator` — implement for custom auth (JWT, OAuth, etc.)
- `HubConfig` — configures heartbeat, pong timeout, write timeout, callbacks, authenticator
- `ReconnectConfig` — configures client-side backoff and callbacks
## Documentation
See `docs/` for full reference:
- `docs/architecture.md` — hub pattern, channels, auth, Redis bridge, envelope/loop-prevention
- `docs/development.md` — prerequisites, test patterns, coding standards
- `docs/history.md` — completed phases with commit hashes, known limitations


@ -1,60 +0,0 @@
# FINDINGS.md -- go-ws
## 2026-02-19: Split from core/go (Virgil)
### Origin
Extracted from `forge.lthn.ai/core/go` `pkg/ws/` on 19 Feb 2026.
### Architecture
- Hub pattern: central `Hub` manages client registration, unregistration, and message routing
- Channel-based subscriptions: clients subscribe to named channels for targeted messaging
- Broadcast support: send to all connected clients or to a specific channel
- Message types: `process_output`, `process_status`, `event`, `error`, `ping/pong`, `subscribe/unsubscribe`
- `writePump` batches outbound messages for efficiency (reduces syscall overhead)
- `readPump` handles inbound messages and automatic ping/pong keepalive
### Dependencies
- `github.com/gorilla/websocket` -- WebSocket server implementation
### Notes
- Hub must be started with `go hub.Run(ctx)` before accepting connections
- HTTP handler exposed via `hub.Handler()` for mounting on any router
- `hub.SendProcessOutput(processID, line)` is the primary API for streaming subprocess output
## 2026-02-20: Phase 0 & Phase 1 (Charon)
### Race condition fix
- `SendToChannel` had a data race: it acquired `RLock`, read the channel's client map, released `RUnlock`, then iterated clients outside the lock. If `Hub.Run` processed an unregister concurrently, the map was modified during iteration.
- Fix: copy client pointers into a slice under `RLock`, then iterate the copy after releasing the lock.
### Phase 0: Test coverage 88.4% to 98.5%
- Added 16 new test functions covering: hub shutdown, broadcast overflow, channel send overflow, marshal errors, upgrade error, `Client.Close`, malformed JSON, non-string subscribe/unsubscribe data, unknown message types, writePump close/batch, concurrent subscribe/unsubscribe, multi-client channel delivery, end-to-end process output/status.
- Added `BenchmarkBroadcast` (100 clients) and `BenchmarkSendToChannel` (50 subscribers).
- `go vet ./...` clean; `go test -race ./...` clean.
### Phase 1: Connection resilience
- `HubConfig` struct: `HeartbeatInterval`, `PongTimeout`, `WriteTimeout`, `OnConnect`, `OnDisconnect` callbacks.
- `NewHubWithConfig(config)`: constructor with validation and defaults.
- `readPump`/`writePump` now use hub config values instead of hardcoded durations.
- `ReconnectingClient`: client-side reconnection with exponential backoff.
- `ReconnectConfig`: URL, InitialBackoff (1s), MaxBackoff (30s), BackoffMultiplier (2.0), MaxRetries, Dialer, Headers, OnConnect/OnDisconnect/OnReconnect/OnMessage callbacks.
- `Connect(ctx)`: blocking reconnect loop; returns on context cancel or max retries.
- `Send(msg)`: thread-safe message send; returns error if not connected.
- `State()`: returns `StateDisconnected`, `StateConnecting`, or `StateConnected`.
- `Close()`: cancels context and closes underlying connection.
- Exponential backoff: `calculateBackoff(attempt)` doubles each attempt, capped at MaxBackoff.
### API surface additions
- `HubConfig`, `DefaultHubConfig()`, `NewHubWithConfig()`
- `ConnectionState` enum: `StateDisconnected`, `StateConnecting`, `StateConnected`
- `ReconnectConfig`, `ReconnectingClient`, `NewReconnectingClient()`
- `DefaultHeartbeatInterval`, `DefaultPongTimeout`, `DefaultWriteTimeout` constants
- `NewHub()` still works unchanged (uses `DefaultHubConfig()` internally)

TODO.md

@ -1,65 +0,0 @@
# TODO.md — go-ws
Dispatched from core/go orchestration. Pick up tasks in order.
---
## Phase 0: Hardening & Test Coverage (complete)
- [x] **Expand test coverage** — 67 test functions total across two sessions. Hub.Run lifecycle (register, broadcast delivery, shutdown, unregister, duplicate unregister), Subscribe/Unsubscribe (multiple channels, idempotent, partial leave, concurrent race), SendToChannel (no subscribers, multiple subscribers, buffer full), SendProcessOutput/SendProcessStatus (no subscribers, non-zero exit), readPump (subscribe, unsubscribe, ping, invalid JSON, non-string data, unknown types), writePump (batch sending, close-on-channel-close), buffer overflow disconnect, marshal errors, Handler upgrade error, Client.Close(), broadcast reaches all clients, disconnect cleans up everything.
- [x] **Integration test** — End-to-end tests using httptest.NewServer + gorilla/websocket Dial: connect-subscribe-send-receive, multiple clients on same channel, unsubscribe stops delivery, broadcast reaches all clients, process output/status streaming, disconnect cleanup.
- [x] **Benchmark** — 9 benchmarks in ws_bench_test.go: BenchmarkBroadcast_100 (100 clients), BenchmarkSendToChannel_50 (50 subscribers), parallel variants, message marshalling, WebSocket end-to-end, subscribe/unsubscribe cycle, multi-channel fanout, concurrent subscribers. All use b.ReportAllocs() and b.Loop() (Go 1.25+). Plus 2 inline benchmarks in ws_test.go.
- [x] **`go vet ./...` clean** — No warnings. Race-free under `go test -race`.
- [x] **Race condition fix** — Fixed data race in SendToChannel: clients now copied under lock before iteration.
## Phase 1: Connection Resilience
- [x] Add client-side reconnection support (exponential backoff) — `ReconnectingClient` with `ReconnectConfig`. Configurable initial backoff, max backoff, multiplier, max retries.
- [x] Tune heartbeat interval and pong timeout for flaky networks — `HubConfig` with `HeartbeatInterval`, `PongTimeout`, `WriteTimeout`. `NewHubWithConfig()` constructor. Defaults: 30s heartbeat, 60s pong timeout, 10s write timeout.
- [x] Add connection state callbacks (onConnect, onDisconnect, onReconnect) — Hub-level `OnConnect`/`OnDisconnect` callbacks in `HubConfig`. Client-level `OnConnect`/`OnDisconnect`/`OnReconnect` callbacks in `ReconnectConfig`. `ConnectionState` enum: `StateDisconnected`, `StateConnecting`, `StateConnected`.
## Phase 2: Auth (complete)
Token-based authentication on WebSocket upgrade handshake. Pure Go, no JWT library dependency — consumers bring their own validation logic via an interface.
### 2.1 Authenticator Interface
- [x] **Create `auth.go`** — Define the auth abstraction:
- `type AuthResult struct { Valid bool; UserID string; Claims map[string]any; Error error }` — result of authentication
- `type Authenticator interface { Authenticate(r *http.Request) AuthResult }` — validates the HTTP request during upgrade. Implementations can check headers (`Authorization: Bearer <token>`), query params (`?token=xxx`), or cookies.
- `type AuthenticatorFunc func(r *http.Request) AuthResult` — adapter for using functions as Authenticators (implements the interface)
- `type APIKeyAuthenticator struct { Keys map[string]string }` — built-in authenticator that validates `Authorization: Bearer <key>` against a static key→userID map. Provided as a convenience; consumers can use their own JWT-based authenticator.
- `func NewAPIKeyAuth(keys map[string]string) *APIKeyAuthenticator` — constructor
### 2.2 Wire Into Hub
- [x] **Add `Authenticator` to `HubConfig`** — Optional field. When nil, all connections are accepted (backward compatible). When set, `Handler()` calls `Authenticate(r)` before upgrading.
- [x] **Update `Handler()`** — If `h.config.Authenticator != nil`, call `Authenticate(r)`. If `!result.Valid`, respond with `http.StatusUnauthorized` (or `http.StatusForbidden` if `result.Error` indicates a different status) and return without upgrading. If valid, store `result.UserID` and `result.Claims` on the `Client` struct.
- [x] **Add auth fields to `Client`** — `UserID string` and `Claims map[string]any` fields. Set during authenticated upgrade. Empty for unauthenticated hubs (nil authenticator).
- [x] **Expose `OnAuthFailure` callback** — Optional `OnAuthFailure func(r *http.Request, result AuthResult)` on `HubConfig` for logging/metrics on rejected connections.
### 2.3 Tests
- [x] **Unit tests** — (a) APIKeyAuthenticator valid key, (b) invalid key, (c) missing header, (d) malformed header ("Bearer" without token, wrong scheme), (e) AuthenticatorFunc adapter, (f) nil Authenticator (backward compat — all connections accepted)
- [x] **Integration tests** — Using httptest + gorilla/websocket Dial:
- (a) Authenticated connect with valid API key → upgrade succeeds, client.UserID set
- (b) Rejected connect with invalid key → HTTP 401, no WebSocket upgrade
- (c) Rejected connect with no auth header → HTTP 401
- (d) Nil authenticator → all connections accepted (existing behaviour preserved)
- (e) OnAuthFailure callback fires on rejection
- (f) Multiple clients with different API keys → each gets correct UserID
- [x] **Existing tests still pass** — No authenticator set = backward compatible
## Phase 3: Scaling
- [x] Support Redis pub/sub as backend for multi-instance hub coordination
- [x] Broadcast messages across hub instances via Redis channels
- [ ] Add sticky sessions or connection-affinity documentation for load balancers
---
## Workflow
1. Virgil in core/go writes tasks here after research
2. This repo's dedicated session picks up tasks in phase order
3. Mark `[x]` when done, note commit hash

docs/architecture.md

@ -0,0 +1,317 @@
# go-ws Architecture
Module: `forge.lthn.ai/core/go-ws`
---
## Overview
go-ws is a WebSocket hub for real-time streaming in Go. It implements the hub pattern for centralised connection management with channel-based pub/sub delivery, token-based authentication on upgrade, client-side reconnection with exponential backoff, and a Redis pub/sub bridge for coordinating multiple hub instances.
---
## Hub Pattern
The `Hub` struct is the central broker. It owns all connection state and serialises mutations through goroutine-safe channels:
```
┌─────────────────────────────┐
│ Hub │
HTTP upgrade ──► │ register chan *Client │
disconnect ──► │ unregister chan *Client │
server send ──► │ broadcast chan []byte │
│ │
│ clients map[*Client]bool │
│ channels map[string]map… │
└─────────────────────────────┘
```
`Hub.Run(ctx)` is a single select-loop goroutine that processes all state transitions. `clients` and `channels` are mutated only inside `Run` (via channel receives) or in `Subscribe`/`Unsubscribe` under the write lock; a `sync.RWMutex` guards reads from concurrent senders. This removes the need for per-channel mutexes on the write path, while `SendToChannel` takes `RLock` and copies the subscriber set before iterating, preventing iterator-invalidation races.
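In miniature, the select-loop ownership model looks like this. It is a simplified sketch with illustrative names, not the library's actual code: all map mutations are funnelled through channels into one goroutine.

```go
package main

import "fmt"

type client struct{ id string }

// miniHub models the ownership pattern: register, unregister, and
// broadcast requests arrive on channels, and only Run touches the map.
type miniHub struct {
	register   chan *client
	unregister chan *client
	broadcast  chan string
	clients    map[*client]bool
	done       chan struct{}
}

func newMiniHub() *miniHub {
	return &miniHub{
		register:   make(chan *client),
		unregister: make(chan *client),
		broadcast:  make(chan string),
		clients:    make(map[*client]bool),
		done:       make(chan struct{}),
	}
}

// Run serialises every state transition through a single goroutine,
// so access to the clients map never races.
func (h *miniHub) Run() {
	for {
		select {
		case c := <-h.register:
			h.clients[c] = true
		case c := <-h.unregister:
			delete(h.clients, c)
		case msg := <-h.broadcast:
			for c := range h.clients {
				fmt.Printf("deliver %q to %s\n", msg, c.id)
			}
		case <-h.done:
			return
		}
	}
}

func main() {
	h := newMiniHub()
	finished := make(chan struct{})
	go func() { h.Run(); close(finished) }()

	c := &client{id: "c1"}
	h.register <- c // unbuffered: returns once Run has taken ownership
	h.broadcast <- "hello"
	h.unregister <- c

	close(h.done)
	<-finished
	fmt.Println("clients remaining:", len(h.clients))
}
```

Because the channels are unbuffered, each send returns only after `Run` has received the request, which is what makes the loop a serialisation point.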
### Lifecycle
1. Call `hub.Run(ctx)` in a goroutine before accepting connections.
2. Mount `hub.Handler()` on any `http.ServeMux` or router.
3. Cancel the context to shut down: `Run` closes all client send channels, which causes `writePump` goroutines to send a WebSocket close frame and exit.
---
## Connection Management
Each connected client is represented by a `Client`:
```go
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte // buffered, capacity 256
subscriptions map[string]bool
mu sync.RWMutex
UserID string
Claims map[string]any
}
```
On upgrade, `Handler` creates a `Client`, sends it to `hub.register`, then starts two goroutines:
- `readPump` — reads inbound frames, dispatches subscribe/unsubscribe/ping, enforces pong timeout.
- `writePump` — drains `client.send`, batches queued frames into a single write, sends server-side ping on heartbeat tick.
On disconnect (read error, write error, or context cancel), `readPump` sends the client to `hub.unregister`. `Run` removes it from `clients` and all channel maps, then closes `client.send`. Closing `client.send` is the signal that causes `writePump` to send a close frame and exit.
### Buffer Overflow
Each client's `send` channel has capacity 256. When `Broadcast` or `SendToChannel` cannot deliver to a full channel, the client is considered stalled. For broadcasts, `Run` schedules an unregister via a goroutine. For `SendToChannel`, the message is silently dropped for that client only.
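The full-channel behaviour is the classic non-blocking send. A minimal sketch of the pattern (illustrative, not the library's exact code):

```go
package main

import "fmt"

// trySend models non-blocking delivery: if the client's buffered
// channel is full, the message is dropped for that client rather than
// blocking the hub. For broadcasts, the caller would additionally
// schedule an unregister for the stalled client.
func trySend(send chan []byte, msg []byte) bool {
	select {
	case send <- msg:
		return true
	default:
		return false // buffer full: drop
	}
}

func main() {
	send := make(chan []byte, 1)
	fmt.Println(trySend(send, []byte("a"))) // true: buffer has room
	fmt.Println(trySend(send, []byte("b"))) // false: buffer full, dropped
}
```

The `select` with a `default` arm is what keeps one slow consumer from stalling delivery to every other client.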
### Timing Defaults
| Constant | Default | Purpose |
|---|---|---|
| `DefaultHeartbeatInterval` | 30 s | Server-side ping cadence |
| `DefaultPongTimeout` | 60 s | Read deadline after each pong |
| `DefaultWriteTimeout` | 10 s | Write deadline per frame |
All three are configurable via `HubConfig`.
---
## Channel Subscriptions
Channels are named strings; there are no predefined names. Clients subscribe by sending a JSON message:
```json
{"type": "subscribe", "data": "process:proc-1"}
```
`readPump` intercepts subscribe/unsubscribe frames and calls `hub.Subscribe` / `hub.Unsubscribe` directly. The hub maintains two parallel indices:
- `hub.channels[channelName][*Client]` — for targeted send
- `client.subscriptions[channelName]` — for cleanup on disconnect
Both indices are kept consistent. Unsubscribing from a channel with no remaining subscribers removes the channel entry entirely.
### Sending to a Channel
```go
hub.SendToChannel("process:proc-1", ws.Message{
Type: ws.TypeProcessOutput,
Data: "output line",
})
```
`SendToChannel` acquires `RLock`, copies the subscriber slice, releases the lock, then delivers to each client's `send` channel. The copy-under-lock pattern is critical: it prevents a data race with `hub.unregister` which mutates the map under write lock concurrently.
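The copy-under-lock pattern can be sketched with illustrative names (the real hub stores `*Client` pointers rather than string IDs):

```go
package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu       sync.RWMutex
	channels map[string]map[string]bool // channel name -> set of client IDs
}

// subscribers returns a snapshot of a channel's members. Copying under
// RLock and iterating the copy afterwards is the race fix described
// above: delivery happens outside any lock.
func (r *registry) subscribers(channel string) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	clients := make([]string, 0, len(r.channels[channel]))
	for id := range r.channels[channel] {
		clients = append(clients, id)
	}
	return clients
}

func main() {
	r := &registry{channels: map[string]map[string]bool{
		"process:proc-1": {"c1": true, "c2": true},
	}}
	snap := r.subscribers("process:proc-1")
	// A concurrent unregister can now mutate the map without
	// invalidating this iteration.
	fmt.Println(len(snap)) // prints 2
}
```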
### Process Helpers
Two convenience methods wrap `SendToChannel` with idiomatic channel naming (`process:<id>`):
```go
hub.SendProcessOutput(processID, line)
hub.SendProcessStatus(processID, "exited", exitCode)
```
---
## Message Types
All frames are JSON-encoded `Message` structs:
```go
type Message struct {
Type MessageType `json:"type"`
Channel string `json:"channel,omitempty"`
ProcessID string `json:"processId,omitempty"`
Data any `json:"data,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
```
| Type | Direction | Purpose |
|---|---|---|
| `process_output` | server → client | Real-time subprocess stdout/stderr line |
| `process_status` | server → client | Process state change (`running`, `exited`) with exit code |
| `event` | server → client | Generic application event |
| `error` | server → client | Error notification |
| `ping` | client → server | Keep-alive request |
| `pong` | server → client | Response to client ping |
| `subscribe` | client → server | Subscribe to a named channel |
| `unsubscribe` | client → server | Unsubscribe from a named channel |
Server-side pings use the WebSocket protocol ping frame (not a JSON `ping` message). Client-sent `ping` messages result in a JSON `pong` response via `client.send`.
---
## Authentication
Authentication is optional and backward compatible. When `HubConfig.Authenticator` is nil, all connections are accepted unchanged. When set, `Handler` calls `Authenticate(r)` before the WebSocket upgrade:
```go
auth := ws.NewAPIKeyAuth(map[string]string{
"secret-key": "user-1",
})
hub := ws.NewHubWithConfig(ws.HubConfig{Authenticator: auth})
```
If authentication fails, the HTTP response is `401 Unauthorised` and no upgrade occurs.
### Authenticator Interface
```go
type Authenticator interface {
Authenticate(r *http.Request) AuthResult
}
type AuthResult struct {
Valid bool
UserID string
Claims map[string]any
Error error
}
```
Implementations may inspect any part of the request: headers, query parameters, cookies. The `AuthenticatorFunc` adapter allows plain functions to satisfy the interface without defining a named type.
### Built-in: APIKeyAuthenticator
`APIKeyAuthenticator` validates `Authorization: Bearer <key>` against a static key-to-userID map. It is intended for simple deployments and internal tooling. For JWT or OAuth, implement the `Authenticator` interface directly.
### Auth Fields on Client
On successful authentication, `client.UserID` and `client.Claims` are populated. These are readable from `OnConnect` callbacks and any code that holds a reference to the client.
### OnAuthFailure Callback
`HubConfig.OnAuthFailure` fires on every rejected connection. Use it for logging, metrics, or rate-limit triggers. It receives the original `*http.Request` and the `AuthResult`.
---
## Redis Pub/Sub Bridge
`RedisBridge` connects a local `Hub` to Redis, enabling multiple hub instances (across multiple processes or servers) to coordinate broadcasts and channel messages.
```
Instance A Redis Instance B
┌─────────┐ publish ┌─────────┐ receive ┌─────────┐
│ Hub A │──────────────►│ ws:* │─────────────►│ Hub B │
│ Bridge A│ │ channels│ │ Bridge B│
└─────────┘ └─────────┘ └─────────┘
```
### Redis Channel Naming
| Redis channel | Meaning |
|---|---|
| `{prefix}:broadcast` | Global broadcast to all clients across all instances |
| `{prefix}:channel:{name}` | Targeted delivery to subscribers of `{name}` |
The default prefix is `ws`. Override via `RedisConfig.Prefix`.
### Usage
```go
bridge, err := ws.NewRedisBridge(hub, ws.RedisConfig{
Addr: "10.69.69.87:6379",
Prefix: "ws",
})
bridge.Start(ctx)
defer bridge.Stop()
// Publish to all instances
bridge.PublishBroadcast(msg)
// Publish to subscribers of "process:abc" on all instances
bridge.PublishToChannel("process:abc", msg)
```
`NewRedisBridge` validates connectivity with a `PING` before returning. `Start` uses `PSubscribe` with a pattern that matches both the broadcast channel and all `{prefix}:channel:*` channels. The listener goroutine forwards received messages to the local hub via `hub.Broadcast` or `hub.SendToChannel` as appropriate.
### Envelope Pattern and Loop Prevention
Every published message is wrapped in a `redisEnvelope` before serialisation:
```go
type redisEnvelope struct {
SourceID string `json:"sourceId"`
Message Message `json:"message"`
}
```
`SourceID` is a 16-byte cryptographically random hex string generated once per bridge instance at construction time. The listener goroutine drops any envelope whose `SourceID` matches the local bridge's own ID. This prevents a bridge from re-delivering its own published messages to its own hub.
Without this guard, a single `PublishBroadcast` call would immediately echo back to the publishing instance, creating an infinite loop.
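A minimal sketch of the guard, with illustrative function names (the real bridge embeds this logic in its listener goroutine):

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newSourceID mirrors the per-bridge identifier described above:
// 16 random bytes, hex-encoded, generated once at construction time.
func newSourceID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

type envelope struct {
	SourceID string `json:"sourceId"`
	Payload  string `json:"message"`
}

// shouldDeliver is the loop-prevention check: drop anything this
// instance published itself.
func shouldDeliver(localID string, env envelope) bool {
	return env.SourceID != localID
}

func main() {
	local := newSourceID()
	fmt.Println(len(local))                                        // 32 hex characters
	fmt.Println(shouldDeliver(local, envelope{SourceID: local}))   // false: own message, dropped
	fmt.Println(shouldDeliver(local, envelope{SourceID: "other"})) // true: remote message, delivered
}
```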
---
## ReconnectingClient
`ReconnectingClient` provides client-side resilience. It wraps a gorilla/websocket connection with an automatic reconnect loop:
```go
client := ws.NewReconnectingClient(ws.ReconnectConfig{
URL: "ws://server/ws",
InitialBackoff: 1 * time.Second,
MaxBackoff: 30 * time.Second,
BackoffMultiplier: 2.0,
MaxRetries: 0, // unlimited
OnConnect: func() { /* ... */ },
OnDisconnect: func() { /* ... */ },
OnReconnect: func(attempt int) { /* ... */ },
OnMessage: func(msg ws.Message) { /* ... */ },
})
go client.Connect(ctx) // blocks until context cancelled or max retries exceeded
```
### Backoff Calculation
Backoff doubles on each failed attempt, capped at `MaxBackoff`:
```
attempt 1: InitialBackoff
attempt 2: InitialBackoff * Multiplier
attempt 3: InitialBackoff * Multiplier^2
...
attempt N: min(InitialBackoff * Multiplier^(N-1), MaxBackoff)
```
The attempt counter resets to zero after a successful connection.
### Connection States
```go
const (
StateDisconnected ConnectionState = iota
StateConnecting
StateConnected
)
```
`client.State()` returns the current state under a read lock. Useful for health checks and UI indicators.
---
## Concurrency Model
| Resource | Protection |
|---|---|
| `hub.clients`, `hub.channels` | `sync.RWMutex` on Hub |
| `client.subscriptions` | `sync.RWMutex` on Client |
| `ReconnectingClient.conn`, `.state` | `sync.RWMutex` on ReconnectingClient |
| Hub state transitions | Serialised through `hub.register`/`hub.unregister` channels in `Run` loop |
The key invariant: `hub.clients` and `hub.channels` are mutated only from within `hub.Run` (via channel receive) or from `Subscribe`/`Unsubscribe` under write lock. `SendToChannel` copies the subscriber slice under read lock before releasing it, so iteration happens outside any lock without races.
All code is clean under `go test -race ./...`.
---
## Dependency Graph
```
go-ws
├── github.com/gorilla/websocket v1.5.3 (WebSocket server + client)
└── github.com/redis/go-redis/v9 v9.18.0 (Redis bridge, optional at runtime)
```
The Redis dependency is a compile-time import but a runtime opt-in. Applications that do not create a `RedisBridge` incur no Redis connections.

docs/development.md

@ -0,0 +1,185 @@
# go-ws Development Guide
Module: `forge.lthn.ai/core/go-ws`
---
## Prerequisites
- Go 1.25 or later (benchmarks use `b.Loop()`, which requires 1.25+)
- A running Redis instance if exercising `RedisBridge` integration tests (default: `10.69.69.87:6379`)
- No CGO dependencies; builds on Linux, macOS, and Windows without toolchain additions
---
## Build and Test
```bash
# Run the full test suite
go test ./...
# Run with race detector (required before every commit)
go test -race ./...
# Run a single test by name
go test -v -run TestHub_Run ./...
# Run benchmarks
go test -bench=. -benchmem ./...
# Run a specific benchmark
go test -bench=BenchmarkBroadcast_100 -benchmem ./...
```
There is no Taskfile in this module. All automation goes through the standard `go` toolchain.
---
## Test Organisation
Tests live in the same package (`package ws`), giving direct access to unexported fields for white-box assertions. Four test files exist:
| File | Coverage |
|---|---|
| `ws_test.go` | Hub lifecycle, broadcast, channel send, subscribe/unsubscribe, readPump, writePump, integration via httptest, auth integration |
| `auth_test.go` | `APIKeyAuthenticator`, `AuthenticatorFunc`, nil authenticator, `OnAuthFailure` callback |
| `redis_test.go` | `RedisBridge` unit and integration (skipped when Redis unavailable) |
| `ws_bench_test.go` | 9 benchmarks: broadcast, channel send, parallel variants, marshal, WebSocket end-to-end, subscribe/unsubscribe cycle, multi-channel fanout |
### Test Naming Convention
Integration tests use `httptest.NewServer` + `gorilla/websocket` Dial for end-to-end coverage. Unit tests drive the Hub directly via channels and method calls.
### Redis Tests
Redis integration tests call `skipIfNoRedis(t)` at the top, which pings the configured address and skips if unreachable:
```go
func TestRedisBridge_PublishBroadcast(t *testing.T) {
client := skipIfNoRedis(t)
// ...
}
```
Each Redis test uses a unique time-based prefix to avoid collisions between parallel runs. Cleanup removes any leftover keys in `t.Cleanup`.
### Benchmark Pattern
Benchmarks use `b.Loop()` (Go 1.25+) and `b.ReportAllocs()`:
```go
func BenchmarkBroadcast_100(b *testing.B) {
// setup ...
b.ResetTimer()
b.ReportAllocs()
for b.Loop() {
_ = hub.Broadcast(msg)
}
b.StopTimer()
// drain ...
}
```
---
## Coding Standards
### Language
UK English throughout: "initialise", "colour", "behaviour", "cancelled", "unauthorised". Never American spellings.
### Go Style
- Enforce correctness through the type system and `go vet`; Go has no runtime strict-types directive.
- All exported symbols must have doc comments.
- Error strings are lowercase and do not end with punctuation (Go convention).
- Use named return values only when they materially clarify intent.
- Prefer `require.NoError` over `assert.NoError` when a test cannot continue after failure.
### Licence Header
Every `.go` source file begins with:
```go
// SPDX-Licence-Identifier: EUPL-1.2
```
This applies to all files including test files.
### Formatting
Standard `gofmt`. No additional linter configuration is committed. Run `go vet ./...` before committing; the suite must be clean.
---
## Commit Guidelines
Commits follow Conventional Commits:
```
type(scope): description
```
Common types: `feat`, `fix`, `test`, `docs`, `refactor`, `bench`.
Scope is the logical area: `ws`, `auth`, `redis`, `reconnect`.
Every commit includes the co-author trailer:
```
Co-Authored-By: Virgil <virgil@lethean.io>
```
Example:
```
feat(redis): add envelope pattern for loop prevention
Generated a random sourceID per bridge instance at construction.
Listener drops messages where sourceID matches local bridge.
Co-Authored-By: Virgil <virgil@lethean.io>
```
---
## Adding a New Message Type
1. Add a constant to the `MessageType` block in `ws.go`.
2. Handle the new type in `readPump` if it is client-initiated.
3. Add a helper method on `Hub` if it is server-initiated (following the pattern of `SendProcessOutput`).
4. Add unit tests covering the new type in `ws_test.go`.
5. Update `docs/architecture.md` message type table.
---
## Adding a New Authenticator
Implement the `Authenticator` interface:
```go
type MyJWTAuth struct { /* ... */ }
func (a *MyJWTAuth) Authenticate(r *http.Request) ws.AuthResult {
token := r.Header.Get("Authorization")
claims, err := validateJWT(token)
if err != nil {
return ws.AuthResult{Valid: false, Error: err}
}
return ws.AuthResult{
Valid: true,
UserID: claims.Subject,
Claims: map[string]any{"roles": claims.Roles},
}
}
```
Pass it to `HubConfig.Authenticator`. The built-in `APIKeyAuthenticator` in `auth.go` serves as a reference implementation. JWT validation libraries are intentionally not imported; consumers bring their own.
---
## Repository
- Forge: `ssh://git@forge.lthn.ai:2223/core/go-ws.git`
- Push via SSH only; HTTPS authentication is not configured on Forge.
- Licence: EUPL-1.2

docs/history.md

@ -0,0 +1,112 @@
# go-ws Project History
---
## Origin
go-ws was extracted from `forge.lthn.ai/core/go` on 19 February 2026. The original code lived at `pkg/ws/` within the core/go monorepo. It was split into a standalone module to allow independent versioning, dedicated test coverage, and direct use by consumers that do not need the full core/go framework.
Extraction commit: `e942500 feat: extract go-ws from core/go pkg/ws`
---
## Phases
### Phase 0: Hardening and Test Coverage
Commit: `53d8a15 test: expand Phase 0 coverage — 16 new tests, 9 benchmarks, SPDX headers`
Commit: `13d9422 feat(ws): phase 0 coverage (98.5%) + phase 1 connection resilience`
Coverage rose from 88.4% to 98.5%. Work included:
- 16 new test functions: hub shutdown, broadcast channel overflow, `SendToChannel` buffer full, marshal errors, upgrade error, `Client.Close`, malformed JSON, non-string subscribe/unsubscribe data, unknown message types, `writePump` close and batch paths, concurrent subscribe/unsubscribe race, multi-client channel delivery, end-to-end process output and status streaming.
- 9 benchmarks in `ws_bench_test.go`: `BenchmarkBroadcast_100`, `BenchmarkSendToChannel_50`, parallel variants, message marshalling, WebSocket end-to-end, subscribe/unsubscribe cycle, multi-channel fanout, concurrent subscribers.
- All benchmarks use `b.Loop()` (Go 1.25+) and `b.ReportAllocs()`.
- Race condition fix: `SendToChannel` previously read the channel's client map under `RLock`, released the lock, then iterated. A concurrent `unregister` could mutate the map during iteration. Fixed by copying client pointers to a slice under `RLock` and iterating the copy after releasing the lock.
- SPDX licence headers added to all source files.
- `go vet ./...` clean; `go test -race ./...` clean.
### Phase 1: Connection Resilience
Commit: `13d9422 feat(ws): phase 0 coverage (98.5%) + phase 1 connection resilience`
- `HubConfig` struct with configurable `HeartbeatInterval`, `PongTimeout`, `WriteTimeout`, and `OnConnect`/`OnDisconnect` callbacks.
- `DefaultHubConfig()` and `NewHubWithConfig(config)` constructors. `NewHub()` delegates to `NewHubWithConfig(DefaultHubConfig())`.
- `readPump` and `writePump` use hub config values instead of hardcoded durations.
- `ReconnectingClient` with exponential backoff:
- `ReconnectConfig`: URL, `InitialBackoff` (1 s), `MaxBackoff` (30 s), `BackoffMultiplier` (2.0), `MaxRetries` (0 = unlimited), `Dialer`, `Headers`, `OnConnect`/`OnDisconnect`/`OnReconnect`/`OnMessage` callbacks.
- `Connect(ctx)`: blocking reconnect loop, returns on context cancel or max retries exceeded.
- `Send(msg)`: thread-safe write to server, returns error if not connected.
- `State()`: returns `StateDisconnected`, `StateConnecting`, or `StateConnected`.
- `Close()`: cancels context, closes underlying connection.
- `ConnectionState` enum: `StateDisconnected`, `StateConnecting`, `StateConnected`.
### Phase 2: Authentication
Commit: `534bbe5 docs: flesh out Phase 2 auth task specs`
Commit: `9e48f0b feat(auth): Phase 2 — token-based authentication on WebSocket upgrade`
Token-based authentication at the HTTP upgrade handshake. No JWT library is imported; consumers provide their own validation logic via an interface.
- `Authenticator` interface: `Authenticate(r *http.Request) AuthResult`.
- `AuthResult` struct: `Valid bool`, `UserID string`, `Claims map[string]any`, `Error error`.
- `AuthenticatorFunc` adapter for plain function use.
- `APIKeyAuthenticator`: validates `Authorization: Bearer <key>` against a static key-to-userID map.
- `NewAPIKeyAuth(keys map[string]string)` constructor.
- `HubConfig.Authenticator`: optional; nil preserves backward-compatible behaviour (all connections accepted).
- `HubConfig.OnAuthFailure`: optional callback for rejected connections.
- `Client.UserID` and `Client.Claims` populated on successful authentication.
- Auth errors defined in `errors.go`: `ErrMissingAuthHeader`, `ErrMalformedAuthHeader`, `ErrInvalidAPIKey`.
- Unit tests: valid key, invalid key, missing header, malformed header, `AuthenticatorFunc` adapter, nil authenticator.
- Integration tests via httptest: authenticated connect, rejected connect (401), nil authenticator backward compat, `OnAuthFailure` callback, multiple clients with distinct user IDs.
### Phase 3: Redis Pub/Sub Bridge
Commit: `da3df00 feat(redis): add Redis pub/sub bridge for multi-instance Hub coordination`
Cross-instance coordination via Redis pub/sub.
- `RedisBridge` struct: wraps a `Hub` with a Redis client and listener goroutine.
- `RedisConfig`: `Addr`, `Password`, `DB`, `Prefix` (default `ws`).
- `NewRedisBridge(hub, cfg)`: validates connectivity with `PING` before returning.
- `Start(ctx)`: subscribes via `PSubscribe` to `{prefix}:broadcast` and `{prefix}:channel:*`, spawns listener goroutine.
- `Stop()`: cancels listener, closes pub/sub subscription and Redis client connection.
- `PublishBroadcast(msg)`: publishes to `{prefix}:broadcast`; all bridge instances deliver to their local hub clients.
- `PublishToChannel(channel, msg)`: publishes to `{prefix}:channel:{channel}`; all bridge instances deliver to local subscribers of that channel.
- Envelope pattern (`redisEnvelope`): wraps every message with a `sourceID` (16-byte cryptographically random hex) to prevent echo loops. The listener silently drops messages whose `sourceID` matches the local bridge.
- `SourceID()`: returns the instance identifier, useful for debugging.
- Redis integration tests use `skipIfNoRedis(t)` and unique time-based prefixes to avoid collisions between parallel runs.
---
## Known Limitations
### Load Balancer Affinity
The Redis bridge coordinates messages across hub instances but does not solve connection affinity. When a client connects to instance A and a message is published via instance B, the message reaches the client correctly through Redis. However, subscription state is local to each hub instance. A client subscribed on instance A is invisible to instance B's channel maps.
Consequence: `hub.ChannelSubscriberCount` and `hub.Stats` reflect only local state. There is no global subscriber registry. For systems that need to know whether any instance has a subscriber before publishing, either publish unconditionally (relying on Redis to route) or implement a shared registry in Redis.
Sticky sessions at the load balancer level (by client IP or cookie) eliminate the affinity problem entirely and are the recommended approach for most deployments.
### Origin Check
The WebSocket upgrader is configured with `CheckOrigin: func(*http.Request) bool { return true }`. This accepts connections from any origin, which is appropriate for local development and internal tooling. Production deployments behind a reverse proxy with strict origin control should override the upgrader or add origin validation in an `Authenticator` implementation.
### Broadcast Buffer
The hub's broadcast channel has a fixed capacity of 256. High-throughput broadcast workloads can saturate this buffer, causing `hub.Broadcast` to return an error. Callers should handle this error and consider whether dropping or queuing at the application level is appropriate.
### No Global Subscriber Registry
There is no mechanism to enumerate all connected clients across multiple hub instances. Redis holds no session state; it only relays messages. Applications requiring global presence information (e.g. "how many users are watching process X") must maintain their own counter, typically in Redis.
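An application-level presence counter might look like the sketch below. A Redis-backed implementation would issue `INCR`/`DECR` on a key such as `ws:presence:{channel}` (key scheme assumed, not part of the library); the in-memory version here is a stand-in so the example runs self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// Presence counts subscribers per channel. Call Incr on subscribe and Decr
// on unsubscribe; a Redis implementation makes the count global across
// hub instances.
type Presence interface {
	Incr(channel string) int64
	Decr(channel string) int64
}

// memPresence is a single-instance stand-in for the Redis-backed version.
type memPresence struct {
	mu sync.Mutex
	n  map[string]int64
}

func newMemPresence() *memPresence { return &memPresence{n: make(map[string]int64)} }

func (p *memPresence) Incr(ch string) int64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.n[ch]++
	return p.n[ch]
}

func (p *memPresence) Decr(ch string) int64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.n[ch]--
	return p.n[ch]
}

func main() {
	var reg Presence = newMemPresence()
	reg.Incr("process:42")
	reg.Incr("process:42")
	fmt.Println(reg.Decr("process:42")) // 1
}
```

Hooking the calls into `HubConfig`'s connection callbacks keeps the counter in step with subscription churn.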
---
## Future Considerations
- **Sticky session documentation**: A deployment guide covering Nginx/Traefik IP-hash or cookie-based affinity for multi-instance setups.
- **Global subscriber registry**: Optional Redis-backed presence tracking to complement the bridge.
- **TLS for Redis**: `RedisConfig` currently supports `Addr`, `Password`, and `DB` only. Adding a `TLSConfig *tls.Config` field would support encrypted Redis connections without breaking the existing API.
- **Message acknowledgement**: The current model is fire-and-forget. A future phase could add client-side ack with server-side retry for guaranteed delivery on unreliable connections.
- **Per-channel access control**: The `Authenticator` interface gates connection upgrade but does not restrict which channels a client may subscribe to. A `ChannelAuthoriser` hook on `HubConfig` would allow per-subscription checks using `client.Claims`.