From 5ccb169342a31f409ff11b9d04e3ff182f65d42d Mon Sep 17 00:00:00 2001 From: Snider Date: Fri, 20 Feb 2026 15:01:55 +0000 Subject: [PATCH] docs: graduate TODO/FINDINGS into production documentation Replace internal task tracking (TODO.md, FINDINGS.md) with structured documentation in docs/. Trim CLAUDE.md to agent instructions only. Co-Authored-By: Virgil --- CLAUDE.md | 45 +++--- FINDINGS.md | 60 -------- TODO.md | 65 --------- docs/architecture.md | 317 +++++++++++++++++++++++++++++++++++++++++++ docs/development.md | 185 +++++++++++++++++++++++++ docs/history.md | 112 +++++++++++++++ 6 files changed, 636 insertions(+), 148 deletions(-) delete mode 100644 FINDINGS.md delete mode 100644 TODO.md create mode 100644 docs/architecture.md create mode 100644 docs/development.md create mode 100644 docs/history.md diff --git a/CLAUDE.md b/CLAUDE.md index c0e66b6..fda88a9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,34 +1,33 @@ # CLAUDE.md -## What This Is - -WebSocket hub with channel-based pub/sub for real-time streaming. Module: `forge.lthn.ai/core/go-ws` +Module: `forge.lthn.ai/core/go-ws` ## Commands ```bash -go test ./... # Run all tests -go test -v -run Name # Run single test +go test ./... # Run all tests +go test -race ./... # Race detector (required before commit) +go test -v -run Name ./... # Run single test +go test -bench=. -benchmem ./... 
# Benchmarks ``` -## Architecture - -- `Hub` manages WebSocket connections and channel subscriptions -- Messages types: `process_output`, `process_status`, `event`, `error`, `ping/pong`, `subscribe/unsubscribe` -- `hub.SendProcessOutput(id, line)` broadcasts to subscribers -- `HubConfig` provides configurable heartbeat, pong timeout, write timeout, and connection callbacks -- `Authenticator` interface for token-based auth on upgrade — nil means all connections accepted (backward compat) -- `APIKeyAuthenticator` validates `Authorization: Bearer ` against a static key→userID map -- `AuthenticatorFunc` adapter lets plain functions satisfy the `Authenticator` interface -- `Client.UserID` and `Client.Claims` populated during authenticated upgrade -- `OnAuthFailure` callback on `HubConfig` for logging/metrics on rejected connections -- `ReconnectingClient` provides client-side reconnection with exponential backoff -- `ConnectionState`: `StateDisconnected`, `StateConnecting`, `StateConnected` -- Coverage: 98.5% - ## Coding Standards -- UK English -- `go test ./...` must pass before commit +- UK English throughout +- SPDX-Licence-Identifier: EUPL-1.2 header in every .go file +- `go vet ./...` and `go test -race ./...` must be clean before commit - Conventional commits: `type(scope): description` -- Co-Author: `Co-Authored-By: Virgil ` +- Co-Author trailer: `Co-Authored-By: Virgil ` + +## Key Interfaces + +- `Authenticator` — implement for custom auth (JWT, OAuth, etc.) 
+- `HubConfig` — configures heartbeat, pong timeout, write timeout, callbacks, authenticator +- `ReconnectConfig` — configures client-side backoff and callbacks + +## Documentation + +See `docs/` for full reference: +- `docs/architecture.md` — hub pattern, channels, auth, Redis bridge, envelope/loop-prevention +- `docs/development.md` — prerequisites, test patterns, coding standards +- `docs/history.md` — completed phases with commit hashes, known limitations diff --git a/FINDINGS.md b/FINDINGS.md deleted file mode 100644 index d4c75c6..0000000 --- a/FINDINGS.md +++ /dev/null @@ -1,60 +0,0 @@ -# FINDINGS.md -- go-ws - -## 2026-02-19: Split from core/go (Virgil) - -### Origin - -Extracted from `forge.lthn.ai/core/go` `pkg/ws/` on 19 Feb 2026. - -### Architecture - -- Hub pattern: central `Hub` manages client registration, unregistration, and message routing -- Channel-based subscriptions: clients subscribe to named channels for targeted messaging -- Broadcast support: send to all connected clients or to a specific channel -- Message types: `process_output`, `process_status`, `event`, `error`, `ping/pong`, `subscribe/unsubscribe` -- `writePump` batches outbound messages for efficiency (reduces syscall overhead) -- `readPump` handles inbound messages and automatic ping/pong keepalive - -### Dependencies - -- `github.com/gorilla/websocket` -- WebSocket server implementation - -### Notes - -- Hub must be started with `go hub.Run(ctx)` before accepting connections -- HTTP handler exposed via `hub.Handler()` for mounting on any router -- `hub.SendProcessOutput(processID, line)` is the primary API for streaming subprocess output - -## 2026-02-20: Phase 0 & Phase 1 (Charon) - -### Race condition fix - -- `SendToChannel` had a data race: it acquired `RLock`, read the channel's client map, released `RUnlock`, then iterated clients outside the lock. If `Hub.Run` processed an unregister concurrently, the map was modified during iteration. 
-- Fix: copy client pointers into a slice under `RLock`, then iterate the copy after releasing the lock. - -### Phase 0: Test coverage 88.4% to 98.5% - -- Added 16 new test functions covering: hub shutdown, broadcast overflow, channel send overflow, marshal errors, upgrade error, `Client.Close`, malformed JSON, non-string subscribe/unsubscribe data, unknown message types, writePump close/batch, concurrent subscribe/unsubscribe, multi-client channel delivery, end-to-end process output/status. -- Added `BenchmarkBroadcast` (100 clients) and `BenchmarkSendToChannel` (50 subscribers). -- `go vet ./...` clean; `go test -race ./...` clean. - -### Phase 1: Connection resilience - -- `HubConfig` struct: `HeartbeatInterval`, `PongTimeout`, `WriteTimeout`, `OnConnect`, `OnDisconnect` callbacks. -- `NewHubWithConfig(config)`: constructor with validation and defaults. -- `readPump`/`writePump` now use hub config values instead of hardcoded durations. -- `ReconnectingClient`: client-side reconnection with exponential backoff. - - `ReconnectConfig`: URL, InitialBackoff (1s), MaxBackoff (30s), BackoffMultiplier (2.0), MaxRetries, Dialer, Headers, OnConnect/OnDisconnect/OnReconnect/OnMessage callbacks. - - `Connect(ctx)`: blocking reconnect loop; returns on context cancel or max retries. - - `Send(msg)`: thread-safe message send; returns error if not connected. - - `State()`: returns `StateDisconnected`, `StateConnecting`, or `StateConnected`. - - `Close()`: cancels context and closes underlying connection. - - Exponential backoff: `calculateBackoff(attempt)` doubles each attempt, capped at MaxBackoff. 
- -### API surface additions - -- `HubConfig`, `DefaultHubConfig()`, `NewHubWithConfig()` -- `ConnectionState` enum: `StateDisconnected`, `StateConnecting`, `StateConnected` -- `ReconnectConfig`, `ReconnectingClient`, `NewReconnectingClient()` -- `DefaultHeartbeatInterval`, `DefaultPongTimeout`, `DefaultWriteTimeout` constants -- `NewHub()` still works unchanged (uses `DefaultHubConfig()` internally) diff --git a/TODO.md b/TODO.md deleted file mode 100644 index e9c85f5..0000000 --- a/TODO.md +++ /dev/null @@ -1,65 +0,0 @@ -# TODO.md — go-ws - -Dispatched from core/go orchestration. Pick up tasks in order. - ---- - -## Phase 0: Hardening & Test Coverage (complete) - -- [x] **Expand test coverage** — 67 test functions total across two sessions. Hub.Run lifecycle (register, broadcast delivery, shutdown, unregister, duplicate unregister), Subscribe/Unsubscribe (multiple channels, idempotent, partial leave, concurrent race), SendToChannel (no subscribers, multiple subscribers, buffer full), SendProcessOutput/SendProcessStatus (no subscribers, non-zero exit), readPump (subscribe, unsubscribe, ping, invalid JSON, non-string data, unknown types), writePump (batch sending, close-on-channel-close), buffer overflow disconnect, marshal errors, Handler upgrade error, Client.Close(), broadcast reaches all clients, disconnect cleans up everything. -- [x] **Integration test** — End-to-end tests using httptest.NewServer + gorilla/websocket Dial: connect-subscribe-send-receive, multiple clients on same channel, unsubscribe stops delivery, broadcast reaches all clients, process output/status streaming, disconnect cleanup. -- [x] **Benchmark** — 9 benchmarks in ws_bench_test.go: BenchmarkBroadcast_100 (100 clients), BenchmarkSendToChannel_50 (50 subscribers), parallel variants, message marshalling, WebSocket end-to-end, subscribe/unsubscribe cycle, multi-channel fanout, concurrent subscribers. All use b.ReportAllocs() and b.Loop() (Go 1.25+). Plus 2 inline benchmarks in ws_test.go. 
-- [x] **`go vet ./...` clean** — No warnings. Race-free under `go test -race`. -- [x] **Race condition fix** — Fixed data race in SendToChannel: clients now copied under lock before iteration. - -## Phase 1: Connection Resilience - -- [x] Add client-side reconnection support (exponential backoff) — `ReconnectingClient` with `ReconnectConfig`. Configurable initial backoff, max backoff, multiplier, max retries. -- [x] Tune heartbeat interval and pong timeout for flaky networks — `HubConfig` with `HeartbeatInterval`, `PongTimeout`, `WriteTimeout`. `NewHubWithConfig()` constructor. Defaults: 30s heartbeat, 60s pong timeout, 10s write timeout. -- [x] Add connection state callbacks (onConnect, onDisconnect, onReconnect) — Hub-level `OnConnect`/`OnDisconnect` callbacks in `HubConfig`. Client-level `OnConnect`/`OnDisconnect`/`OnReconnect` callbacks in `ReconnectConfig`. `ConnectionState` enum: `StateDisconnected`, `StateConnecting`, `StateConnected`. - -## Phase 2: Auth (complete) - -Token-based authentication on WebSocket upgrade handshake. Pure Go, no JWT library dependency — consumers bring their own validation logic via an interface. - -### 2.1 Authenticator Interface - -- [x] **Create `auth.go`** — Define the auth abstraction: - - `type AuthResult struct { Valid bool; UserID string; Claims map[string]any; Error error }` — result of authentication - - `type Authenticator interface { Authenticate(r *http.Request) AuthResult }` — validates the HTTP request during upgrade. Implementations can check headers (`Authorization: Bearer `), query params (`?token=xxx`), or cookies. - - `type AuthenticatorFunc func(r *http.Request) AuthResult` — adapter for using functions as Authenticators (implements the interface) - - `type APIKeyAuthenticator struct { Keys map[string]string }` — built-in authenticator that validates `Authorization: Bearer ` against a static key→userID map. Provided as a convenience; consumers can use their own JWT-based authenticator. 
- - `func NewAPIKeyAuth(keys map[string]string) *APIKeyAuthenticator` — constructor - -### 2.2 Wire Into Hub - -- [x] **Add `Authenticator` to `HubConfig`** — Optional field. When nil, all connections are accepted (backward compatible). When set, `Handler()` calls `Authenticate(r)` before upgrading. -- [x] **Update `Handler()`** — If `h.config.Authenticator != nil`, call `Authenticate(r)`. If `!result.Valid`, respond with `http.StatusUnauthorized` (or `http.StatusForbidden` if `result.Error` indicates a different status) and return without upgrading. If valid, store `result.UserID` and `result.Claims` on the `Client` struct. -- [x] **Add auth fields to `Client`** — `UserID string` and `Claims map[string]any` fields. Set during authenticated upgrade. Empty for unauthenticated hubs (nil authenticator). -- [x] **Expose `OnAuthFailure` callback** — Optional `OnAuthFailure func(r *http.Request, result AuthResult)` on `HubConfig` for logging/metrics on rejected connections. - -### 2.3 Tests - -- [x] **Unit tests** — (a) APIKeyAuthenticator valid key, (b) invalid key, (c) missing header, (d) malformed header ("Bearer" without token, wrong scheme), (e) AuthenticatorFunc adapter, (f) nil Authenticator (backward compat — all connections accepted) -- [x] **Integration tests** — Using httptest + gorilla/websocket Dial: - - (a) Authenticated connect with valid API key → upgrade succeeds, client.UserID set - - (b) Rejected connect with invalid key → HTTP 401, no WebSocket upgrade - - (c) Rejected connect with no auth header → HTTP 401 - - (d) Nil authenticator → all connections accepted (existing behaviour preserved) - - (e) OnAuthFailure callback fires on rejection - - (f) Multiple clients with different API keys → each gets correct UserID -- [x] **Existing tests still pass** — No authenticator set = backward compatible - -## Phase 3: Scaling - -- [x] Support Redis pub/sub as backend for multi-instance hub coordination -- [x] Broadcast messages across hub instances via Redis 
channels -- [ ] Add sticky sessions or connection-affinity documentation for load balancers - ---- - -## Workflow - -1. Virgil in core/go writes tasks here after research -2. This repo's dedicated session picks up tasks in phase order -3. Mark `[x]` when done, note commit hash diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 0000000..982394a --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,317 @@ +# go-ws Architecture + +Module: `forge.lthn.ai/core/go-ws` + +--- + +## Overview + +go-ws is a WebSocket hub for real-time streaming in Go. It implements the hub pattern for centralised connection management with channel-based pub/sub delivery, token-based authentication on upgrade, client-side reconnection with exponential backoff, and a Redis pub/sub bridge for coordinating multiple hub instances. + +--- + +## Hub Pattern + +The `Hub` struct is the central broker. It owns all connection state and serialises mutations through goroutine-safe channels: + +``` + ┌─────────────────────────────┐ + │ Hub │ + HTTP upgrade ──► │ register chan *Client │ + disconnect ──► │ unregister chan *Client │ + server send ──► │ broadcast chan []byte │ + │ │ + │ clients map[*Client]bool │ + │ channels map[string]map… │ + └─────────────────────────────┘ +``` + +`Hub.Run(ctx)` is a single select-loop goroutine that processes all state transitions. Mutations to `clients` and `channels` occur only inside `Run`, protected by `sync.RWMutex` for reads from concurrent senders. This eliminates the need for channel-specific mutexes on write paths, while `SendToChannel` uses `RLock` plus a client-slice copy to prevent iterator invalidation races. + +### Lifecycle + +1. Call `hub.Run(ctx)` in a goroutine before accepting connections. +2. Mount `hub.Handler()` on any `http.ServeMux` or router. +3. Cancel the context to shut down: `Run` closes all client send channels, which causes `writePump` goroutines to send a WebSocket close frame and exit. 
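The lifecycle steps above can be sketched with the standard library alone. This is a minimal illustration of a single select-loop goroutine that owns the client set, not the go-ws implementation: `miniHub`, `miniClient`, and `runDemo` are hypothetical names, channels are unbuffered for simplicity, and the real hub additionally guards its maps with a mutex and tracks channel subscriptions.

```go
package main

import (
	"context"
	"fmt"
)

// miniClient and miniHub are hypothetical stand-ins, not the go-ws types.
type miniClient struct {
	send chan []byte // outbound queue drained by a writer goroutine
}

type miniHub struct {
	register   chan *miniClient
	unregister chan *miniClient
	broadcast  chan []byte
	clients    map[*miniClient]bool
}

func newMiniHub() *miniHub {
	return &miniHub{
		register:   make(chan *miniClient),
		unregister: make(chan *miniClient),
		broadcast:  make(chan []byte),
		clients:    make(map[*miniClient]bool),
	}
}

// run is the only goroutine that touches h.clients.
func (h *miniHub) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			// Shutdown: closing send tells each writer to finish.
			for c := range h.clients {
				close(c.send)
				delete(h.clients, c)
			}
			return
		case c := <-h.register:
			h.clients[c] = true
		case c := <-h.unregister:
			if h.clients[c] {
				delete(h.clients, c)
				close(c.send)
			}
		case msg := <-h.broadcast:
			for c := range h.clients {
				select {
				case c.send <- msg:
				default: // full buffer: drop for this client
				}
			}
		}
	}
}

// runDemo registers one client, broadcasts once, then cancels the context.
// It returns the received payload and whether send was closed on shutdown.
func runDemo() (string, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	h := newMiniHub()
	done := make(chan struct{})
	go func() {
		defer close(done)
		h.run(ctx)
	}()

	c := &miniClient{send: make(chan []byte, 4)}
	h.register <- c
	h.broadcast <- []byte("hello")
	got := string(<-c.send)

	cancel()
	<-done
	_, open := <-c.send
	return got, !open
}

func main() {
	got, closed := runDemo()
	fmt.Println(got, closed)
}
```

Because every mutation happens inside `run`, the sketch needs no lock at all. The trade-off is that any code outside the loop that wants to read the client set must either go through a channel or, as go-ws does, take a read lock.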
+ +--- + +## Connection Management + +Each connected client is represented by a `Client`: + +```go +type Client struct { + hub *Hub + conn *websocket.Conn + send chan []byte // buffered, capacity 256 + subscriptions map[string]bool + mu sync.RWMutex + + UserID string + Claims map[string]any +} +``` + +On upgrade, `Handler` creates a `Client`, sends it to `hub.register`, then starts two goroutines: + +- `readPump` — reads inbound frames, dispatches subscribe/unsubscribe/ping, enforces pong timeout. +- `writePump` — drains `client.send`, batches queued frames into a single write, sends server-side ping on heartbeat tick. + +On disconnect (read error, write error, or context cancel), `readPump` sends the client to `hub.unregister`. `Run` removes it from `clients` and all channel maps, then closes `client.send`. Closing `client.send` is the signal that causes `writePump` to send a close frame and exit. + +### Buffer Overflow + +Each client's `send` channel has capacity 256. When `Broadcast` or `SendToChannel` cannot deliver to a full channel, the client is considered stalled. For broadcasts, `Run` schedules an unregister via a goroutine. For `SendToChannel`, the message is silently dropped for that client only. + +### Timing Defaults + +| Constant | Default | Purpose | +|---|---|---| +| `DefaultHeartbeatInterval` | 30 s | Server-side ping cadence | +| `DefaultPongTimeout` | 60 s | Read deadline after each pong | +| `DefaultWriteTimeout` | 10 s | Write deadline per frame | + +All three are configurable via `HubConfig`. + +--- + +## Channel Subscriptions + +Channels are named strings; there are no predefined names. Clients subscribe by sending a JSON message: + +```json +{"type": "subscribe", "data": "process:proc-1"} +``` + +`readPump` intercepts subscribe/unsubscribe frames and calls `hub.Subscribe` / `hub.Unsubscribe` directly. 
The hub maintains two parallel indices: + +- `hub.channels[channelName][*Client]` — for targeted send +- `client.subscriptions[channelName]` — for cleanup on disconnect + +Both indices are kept consistent. Unsubscribing from a channel with no remaining subscribers removes the channel entry entirely. + +### Sending to a Channel + +```go +hub.SendToChannel("process:proc-1", ws.Message{ + Type: ws.TypeProcessOutput, + Data: "output line", +}) +``` + +`SendToChannel` acquires `RLock`, copies the subscriber slice, releases the lock, then delivers to each client's `send` channel. The copy-under-lock pattern is critical: it prevents a data race with `hub.unregister` which mutates the map under write lock concurrently. + +### Process Helpers + +Two convenience methods wrap `SendToChannel` with idiomatic channel naming (`process:`): + +```go +hub.SendProcessOutput(processID, line) +hub.SendProcessStatus(processID, "exited", exitCode) +``` + +--- + +## Message Types + +All frames are JSON-encoded `Message` structs: + +```go +type Message struct { + Type MessageType `json:"type"` + Channel string `json:"channel,omitempty"` + ProcessID string `json:"processId,omitempty"` + Data any `json:"data,omitempty"` + Timestamp time.Time `json:"timestamp"` +} +``` + +| Type | Direction | Purpose | +|---|---|---| +| `process_output` | server → client | Real-time subprocess stdout/stderr line | +| `process_status` | server → client | Process state change (`running`, `exited`) with exit code | +| `event` | server → client | Generic application event | +| `error` | server → client | Error notification | +| `ping` | client → server | Keep-alive request | +| `pong` | server → client | Response to client ping | +| `subscribe` | client → server | Subscribe to a named channel | +| `unsubscribe` | client → server | Unsubscribe from a named channel | + +Server-side pings use the WebSocket protocol ping frame (not a JSON `ping` message). 
Client-sent `ping` messages result in a JSON `pong` response via `client.send`. + +--- + +## Authentication + +Authentication is optional and backward compatible. When `HubConfig.Authenticator` is nil, all connections are accepted unchanged. When set, `Handler` calls `Authenticate(r)` before the WebSocket upgrade: + +```go +auth := ws.NewAPIKeyAuth(map[string]string{ + "secret-key": "user-1", +}) +hub := ws.NewHubWithConfig(ws.HubConfig{Authenticator: auth}) +``` + +If authentication fails, the HTTP response is `401 Unauthorised` and no upgrade occurs. + +### Authenticator Interface + +```go +type Authenticator interface { + Authenticate(r *http.Request) AuthResult +} + +type AuthResult struct { + Valid bool + UserID string + Claims map[string]any + Error error +} +``` + +Implementations may inspect any part of the request: headers, query parameters, cookies. The `AuthenticatorFunc` adapter allows plain functions to satisfy the interface without defining a named type. + +### Built-in: APIKeyAuthenticator + +`APIKeyAuthenticator` validates `Authorization: Bearer ` against a static key-to-userID map. It is intended for simple deployments and internal tooling. For JWT or OAuth, implement the `Authenticator` interface directly. + +### Auth Fields on Client + +On successful authentication, `client.UserID` and `client.Claims` are populated. These are readable from `OnConnect` callbacks and any code that holds a reference to the client. + +### OnAuthFailure Callback + +`HubConfig.OnAuthFailure` fires on every rejected connection. Use it for logging, metrics, or rate-limit triggers. It receives the original `*http.Request` and the `AuthResult`. + +--- + +## Redis Pub/Sub Bridge + +`RedisBridge` connects a local `Hub` to Redis, enabling multiple hub instances (across multiple processes or servers) to coordinate broadcasts and channel messages. 
+
+```
+  Instance A                   Redis                 Instance B
+  ┌─────────┐    publish    ┌─────────┐   receive    ┌─────────┐
+  │  Hub A  │──────────────►│  ws:*   │─────────────►│  Hub B  │
+  │ Bridge A│               │ channels│              │ Bridge B│
+  └─────────┘               └─────────┘              └─────────┘
+```
+
+### Redis Channel Naming
+
+| Redis channel | Meaning |
+|---|---|
+| `{prefix}:broadcast` | Global broadcast to all clients across all instances |
+| `{prefix}:channel:{name}` | Targeted delivery to subscribers of `{name}` |
+
+The default prefix is `ws`. Override via `RedisConfig.Prefix`.
+
+### Usage
+
+```go
+bridge, err := ws.NewRedisBridge(hub, ws.RedisConfig{
+	Addr:   "10.69.69.87:6379",
+	Prefix: "ws",
+})
+if err != nil {
+	log.Fatalf("redis bridge: %v", err)
+}
+bridge.Start(ctx)
+defer bridge.Stop()
+
+// Publish to all instances
+bridge.PublishBroadcast(msg)
+
+// Publish to subscribers of "process:abc" on all instances
+bridge.PublishToChannel("process:abc", msg)
+```
+
+`NewRedisBridge` validates connectivity with a `PING` before returning. `Start` uses `PSubscribe` with a pattern that matches both the broadcast channel and all `{prefix}:channel:*` channels. The listener goroutine forwards received messages to the local hub via `hub.Broadcast` or `hub.SendToChannel` as appropriate.
+
+### Envelope Pattern and Loop Prevention
+
+Every published message is wrapped in a `redisEnvelope` before serialisation:
+
+```go
+type redisEnvelope struct {
+	SourceID string  `json:"sourceId"`
+	Message  Message `json:"message"`
+}
+```
+
+`SourceID` is a 16-byte cryptographically random hex string generated once per bridge instance at construction time. The listener goroutine drops any envelope whose `SourceID` matches the local bridge's own ID. This prevents a bridge from re-delivering its own published messages to its own hub.
+
+Without this guard, a single `PublishBroadcast` call would immediately echo back to the publishing instance, creating an infinite loop.
+
+---
+
+## ReconnectingClient
+
+`ReconnectingClient` provides client-side resilience.
It wraps a gorilla/websocket connection with an automatic reconnect loop: + +```go +client := ws.NewReconnectingClient(ws.ReconnectConfig{ + URL: "ws://server/ws", + InitialBackoff: 1 * time.Second, + MaxBackoff: 30 * time.Second, + BackoffMultiplier: 2.0, + MaxRetries: 0, // unlimited + OnConnect: func() { /* ... */ }, + OnDisconnect: func() { /* ... */ }, + OnReconnect: func(attempt int) { /* ... */ }, + OnMessage: func(msg ws.Message) { /* ... */ }, +}) + +go client.Connect(ctx) // blocks until context cancelled or max retries exceeded +``` + +### Backoff Calculation + +Backoff doubles on each failed attempt, capped at `MaxBackoff`: + +``` +attempt 1: InitialBackoff +attempt 2: InitialBackoff * Multiplier +attempt 3: InitialBackoff * Multiplier^2 +... +attempt N: min(InitialBackoff * Multiplier^(N-1), MaxBackoff) +``` + +The attempt counter resets to zero after a successful connection. + +### Connection States + +```go +const ( + StateDisconnected ConnectionState = iota + StateConnecting + StateConnected +) +``` + +`client.State()` returns the current state under a read lock. Useful for health checks and UI indicators. + +--- + +## Concurrency Model + +| Resource | Protection | +|---|---| +| `hub.clients`, `hub.channels` | `sync.RWMutex` on Hub | +| `client.subscriptions` | `sync.RWMutex` on Client | +| `ReconnectingClient.conn`, `.state` | `sync.RWMutex` on ReconnectingClient | +| Hub state transitions | Serialised through `hub.register`/`hub.unregister` channels in `Run` loop | + +The key invariant: `hub.clients` and `hub.channels` are mutated only from within `hub.Run` (via channel receive) or from `Subscribe`/`Unsubscribe` under write lock. `SendToChannel` copies the subscriber slice under read lock before releasing it, so iteration happens outside any lock without races. + +All code is clean under `go test -race ./...`. 
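The copy-under-lock invariant described above can be illustrated in isolation. The sketch below is a simplified model, not the go-ws source: `registry` is a hypothetical stand-in for the hub's channel index, and each subscriber is modelled as a buffered `chan string` rather than a `*Client`.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the hub's channel index.
type registry struct {
	mu       sync.RWMutex
	channels map[string]map[chan string]bool
}

func newRegistry() *registry {
	return &registry{channels: make(map[string]map[chan string]bool)}
}

// subscribe adds sub to the named channel under the write lock.
func (r *registry) subscribe(name string, sub chan string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.channels[name] == nil {
		r.channels[name] = make(map[chan string]bool)
	}
	r.channels[name][sub] = true
}

// unsubscribe removes sub and drops the channel entry once it is empty.
func (r *registry) unsubscribe(name string, sub chan string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.channels[name], sub)
	if len(r.channels[name]) == 0 {
		delete(r.channels, name)
	}
}

// sendToChannel snapshots the subscribers under RLock, then delivers
// without holding the lock. It returns how many subscribers accepted msg.
func (r *registry) sendToChannel(name, msg string) int {
	r.mu.RLock()
	subs := make([]chan string, 0, len(r.channels[name]))
	for sub := range r.channels[name] {
		subs = append(subs, sub)
	}
	r.mu.RUnlock()

	delivered := 0
	for _, sub := range subs {
		select {
		case sub <- msg:
			delivered++
		default: // full buffer: drop for this subscriber only
		}
	}
	return delivered
}

func main() {
	r := newRegistry()
	a, b := make(chan string, 1), make(chan string, 1)
	r.subscribe("process:proc-1", a)
	r.subscribe("process:proc-1", b)
	fmt.Println(r.sendToChannel("process:proc-1", "output line")) // 2

	r.unsubscribe("process:proc-1", a)
	<-b // drain so the next send has room
	fmt.Println(r.sendToChannel("process:proc-1", "next line")) // 1
}
```

The snapshot costs one slice allocation per send. In exchange, a slow or blocked subscriber can never hold the lock, and a concurrent unsubscribe cannot invalidate the loop that is delivering messages.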
+
+---
+
+## Dependency Graph
+
+```
+go-ws
+├── github.com/gorilla/websocket v1.5.3   (WebSocket server + client)
+└── github.com/redis/go-redis/v9 v9.18.0  (Redis bridge, optional at runtime)
+```
+
+The Redis dependency is a compile-time import but a runtime opt-in. Applications that do not create a `RedisBridge` incur no Redis connections.
diff --git a/docs/development.md b/docs/development.md
new file mode 100644
index 0000000..ee24e93
--- /dev/null
+++ b/docs/development.md
@@ -0,0 +1,185 @@
+# go-ws Development Guide
+
+Module: `forge.lthn.ai/core/go-ws`
+
+---
+
+## Prerequisites
+
+- Go 1.25 or later (benchmarks use `b.Loop()`, available since Go 1.24)
+- A running Redis instance if exercising `RedisBridge` integration tests (default: `10.69.69.87:6379`)
+- No CGO dependencies; builds on Linux, macOS, and Windows without toolchain additions
+
+---
+
+## Build and Test
+
+```bash
+# Run the full test suite
+go test ./...
+
+# Run with race detector (required before every commit)
+go test -race ./...
+
+# Run a single test by name
+go test -v -run TestHub_Run ./...
+
+# Run benchmarks
+go test -bench=. -benchmem ./...
+
+# Run a specific benchmark
+go test -bench=BenchmarkBroadcast_100 -benchmem ./...
+```
+
+There is no Taskfile in this module. All automation goes through the standard `go` toolchain.
+
+---
+
+## Test Organisation
+
+Tests live in the same package (`package ws`), giving direct access to unexported fields for white-box assertions.
+Four test files exist:
+
+| File | Coverage |
+|---|---|
+| `ws_test.go` | Hub lifecycle, broadcast, channel send, subscribe/unsubscribe, readPump, writePump, integration via httptest, auth integration |
+| `auth_test.go` | `APIKeyAuthenticator`, `AuthenticatorFunc`, nil authenticator, `OnAuthFailure` callback |
+| `redis_test.go` | `RedisBridge` unit and integration (skipped when Redis unavailable) |
+| `ws_bench_test.go` | 9 benchmarks: broadcast, channel send, parallel variants, marshal, WebSocket end-to-end, subscribe/unsubscribe cycle, multi-channel fanout |
+
+### Test Naming Convention
+
+Integration tests use `httptest.NewServer` + `gorilla/websocket` Dial for end-to-end coverage. Unit tests drive the Hub directly via channels and method calls.
+
+### Redis Tests
+
+Redis integration tests call `skipIfNoRedis(t)` at the top, which pings the configured address and skips if unreachable:
+
+```go
+func TestRedisBridge_PublishBroadcast(t *testing.T) {
+	client := skipIfNoRedis(t)
+	// ...
+}
+```
+
+Each Redis test uses a unique time-based prefix to avoid collisions between parallel runs. Cleanup removes any leftover keys in `t.Cleanup`.
+
+### Benchmark Pattern
+
+Benchmarks use `b.Loop()` (Go 1.24+) and `b.ReportAllocs()`:
+
+```go
+func BenchmarkBroadcast_100(b *testing.B) {
+	// setup ...
+	b.ResetTimer()
+	b.ReportAllocs()
+	for b.Loop() {
+		_ = hub.Broadcast(msg)
+	}
+	b.StopTimer()
+	// drain ...
+}
+```
+
+---
+
+## Coding Standards
+
+### Language
+
+UK English throughout: "initialise", "colour", "behaviour", "cancelled", "unauthorised". Never American spellings.
+
+### Go Style
+
+- `declare(strict_types=1)` is a PHP concept; in Go, enforce correctness via the type system and `go vet`.
+- All exported symbols must have doc comments.
+- Error strings are lowercase and do not end with punctuation (Go convention).
+- Use named return values only when they materially clarify intent.
+- Prefer `require.NoError` over `assert.NoError` when a test cannot continue after failure.
+
+### Licence Header
+
+Every `.go` source file begins with:
+
+```go
+// SPDX-Licence-Identifier: EUPL-1.2
+```
+
+This applies to all files including test files.
+
+### Formatting
+
+Standard `gofmt`. No additional linter configuration is committed. Run `go vet ./...` before committing; the suite must be clean.
+
+---
+
+## Commit Guidelines
+
+Commits follow Conventional Commits:
+
+```
+type(scope): description
+```
+
+Common types: `feat`, `fix`, `test`, `docs`, `refactor`, `bench`.
+
+Scope is the logical area: `ws`, `auth`, `redis`, `reconnect`.
+
+Every commit includes the co-author trailer:
+
+```
+Co-Authored-By: Virgil
+```
+
+Example:
+
+```
+feat(redis): add envelope pattern for loop prevention
+
+Generated a random sourceID per bridge instance at construction.
+Listener drops messages where sourceID matches local bridge.
+
+Co-Authored-By: Virgil
+```
+
+---
+
+## Adding a New Message Type
+
+1. Add a constant to the `MessageType` block in `ws.go`.
+2. Handle the new type in `readPump` if it is client-initiated.
+3. Add a helper method on `Hub` if it is server-initiated (following the pattern of `SendProcessOutput`).
+4. Add unit tests covering the new type in `ws_test.go`.
+5. Update `docs/architecture.md` message type table.
+
+---
+
+## Adding a New Authenticator
+
+Implement the `Authenticator` interface:
+
+```go
+type MyJWTAuth struct { /* ... */ }
+
+func (a *MyJWTAuth) Authenticate(r *http.Request) ws.AuthResult {
+	// Strip the "Bearer " scheme so only the raw token is validated.
+	token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
+	claims, err := validateJWT(token) // validateJWT stands in for your own JWT library call
+	if err != nil {
+		return ws.AuthResult{Valid: false, Error: err}
+	}
+	return ws.AuthResult{
+		Valid:  true,
+		UserID: claims.Subject,
+		Claims: map[string]any{"roles": claims.Roles},
+	}
+}
+```
+
+Pass it to `HubConfig.Authenticator`. The built-in `APIKeyAuthenticator` in `auth.go` serves as a reference implementation.
+JWT validation libraries are intentionally not imported; consumers bring their own.
+
+---
+
+## Repository
+
+- Forge: `ssh://git@forge.lthn.ai:2223/core/go-ws.git`
+- Push via SSH only; HTTPS authentication is not configured on Forge.
+- Licence: EUPL-1.2
diff --git a/docs/history.md b/docs/history.md
new file mode 100644
index 0000000..3390c81
--- /dev/null
+++ b/docs/history.md
@@ -0,0 +1,112 @@
+# go-ws Project History
+
+---
+
+## Origin
+
+go-ws was extracted from `forge.lthn.ai/core/go` on 19 February 2026. The original code lived at `pkg/ws/` within the core/go monorepo. It was split into a standalone module to allow independent versioning, dedicated test coverage, and direct use by consumers that do not need the full core/go framework.
+
+Extraction commit: `e942500 feat: extract go-ws from core/go pkg/ws`
+
+---
+
+## Phases
+
+### Phase 0: Hardening and Test Coverage
+
+Commit: `53d8a15 test: expand Phase 0 coverage — 16 new tests, 9 benchmarks, SPDX headers`
+Commit: `13d9422 feat(ws): phase 0 coverage (98.5%) + phase 1 connection resilience`
+
+Coverage rose from 88.4% to 98.5%. Work included:
+
+- 16 new test functions: hub shutdown, broadcast channel overflow, `SendToChannel` buffer full, marshal errors, upgrade error, `Client.Close`, malformed JSON, non-string subscribe/unsubscribe data, unknown message types, `writePump` close and batch paths, concurrent subscribe/unsubscribe race, multi-client channel delivery, end-to-end process output and status streaming.
+- 9 benchmarks in `ws_bench_test.go`: `BenchmarkBroadcast_100`, `BenchmarkSendToChannel_50`, parallel variants, message marshalling, WebSocket end-to-end, subscribe/unsubscribe cycle, multi-channel fanout, concurrent subscribers.
+- All benchmarks use `b.Loop()` (Go 1.24+) and `b.ReportAllocs()`.
+- Race condition fix: `SendToChannel` previously read the channel's client map under `RLock`, released the lock, then iterated. A concurrent `unregister` could mutate the map during iteration.
Fixed by copying client pointers to a slice under `RLock` and iterating the copy after releasing the lock. +- SPDX licence headers added to all source files. +- `go vet ./...` clean; `go test -race ./...` clean. + +### Phase 1: Connection Resilience + +Commit: `13d9422 feat(ws): phase 0 coverage (98.5%) + phase 1 connection resilience` + +- `HubConfig` struct with configurable `HeartbeatInterval`, `PongTimeout`, `WriteTimeout`, and `OnConnect`/`OnDisconnect` callbacks. +- `DefaultHubConfig()` and `NewHubWithConfig(config)` constructors. `NewHub()` delegates to `NewHubWithConfig(DefaultHubConfig())`. +- `readPump` and `writePump` use hub config values instead of hardcoded durations. +- `ReconnectingClient` with exponential backoff: + - `ReconnectConfig`: URL, `InitialBackoff` (1 s), `MaxBackoff` (30 s), `BackoffMultiplier` (2.0), `MaxRetries` (0 = unlimited), `Dialer`, `Headers`, `OnConnect`/`OnDisconnect`/`OnReconnect`/`OnMessage` callbacks. + - `Connect(ctx)`: blocking reconnect loop, returns on context cancel or max retries exceeded. + - `Send(msg)`: thread-safe write to server, returns error if not connected. + - `State()`: returns `StateDisconnected`, `StateConnecting`, or `StateConnected`. + - `Close()`: cancels context, closes underlying connection. +- `ConnectionState` enum: `StateDisconnected`, `StateConnecting`, `StateConnected`. + +### Phase 2: Authentication + +Commit: `534bbe5 docs: flesh out Phase 2 auth task specs` +Commit: `9e48f0b feat(auth): Phase 2 — token-based authentication on WebSocket upgrade` + +Token-based authentication at the HTTP upgrade handshake. No JWT library is imported; consumers provide their own validation logic via an interface. + +- `Authenticator` interface: `Authenticate(r *http.Request) AuthResult`. +- `AuthResult` struct: `Valid bool`, `UserID string`, `Claims map[string]any`, `Error error`. +- `AuthenticatorFunc` adapter for plain function use. 
+- `APIKeyAuthenticator`: validates `Authorization: Bearer ` against a static key-to-userID map. +- `NewAPIKeyAuth(keys map[string]string)` constructor. +- `HubConfig.Authenticator`: optional; nil preserves backward-compatible behaviour (all connections accepted). +- `HubConfig.OnAuthFailure`: optional callback for rejected connections. +- `Client.UserID` and `Client.Claims` populated on successful authentication. +- Auth errors defined in `errors.go`: `ErrMissingAuthHeader`, `ErrMalformedAuthHeader`, `ErrInvalidAPIKey`. +- Unit tests: valid key, invalid key, missing header, malformed header, `AuthenticatorFunc` adapter, nil authenticator. +- Integration tests via httptest: authenticated connect, rejected connect (401), nil authenticator backward compat, `OnAuthFailure` callback, multiple clients with distinct user IDs. + +### Phase 3: Redis Pub/Sub Bridge + +Commit: `da3df00 feat(redis): add Redis pub/sub bridge for multi-instance Hub coordination` + +Cross-instance coordination via Redis pub/sub. + +- `RedisBridge` struct: wraps a `Hub` with a Redis client and listener goroutine. +- `RedisConfig`: `Addr`, `Password`, `DB`, `Prefix` (default `ws`). +- `NewRedisBridge(hub, cfg)`: validates connectivity with `PING` before returning. +- `Start(ctx)`: subscribes via `PSubscribe` to `{prefix}:broadcast` and `{prefix}:channel:*`, spawns listener goroutine. +- `Stop()`: cancels listener, closes pub/sub subscription and Redis client connection. +- `PublishBroadcast(msg)`: publishes to `{prefix}:broadcast`; all bridge instances deliver to their local hub clients. +- `PublishToChannel(channel, msg)`: publishes to `{prefix}:channel:{channel}`; all bridge instances deliver to local subscribers of that channel. +- Envelope pattern (`redisEnvelope`): wraps every message with a `sourceID` (16-byte cryptographically random hex) to prevent echo loops. The listener silently drops messages whose `sourceID` matches the local bridge. 
+- `SourceID()`: returns the instance identifier, useful for debugging. +- Redis integration tests use `skipIfNoRedis(t)` and unique time-based prefixes to avoid collisions between parallel runs. + +--- + +## Known Limitations + +### Load Balancer Affinity + +The Redis bridge coordinates messages across hub instances but does not solve connection affinity. When a client connects to instance A and a message is published via instance B, the message reaches the client correctly through Redis. However, subscription state is local to each hub instance. A client subscribed on instance A is invisible to instance B's channel maps. + +Consequence: `hub.ChannelSubscriberCount` and `hub.Stats` reflect only local state. There is no global subscriber registry. For systems that need to know whether any instance has a subscriber before publishing, either publish unconditionally (relying on Redis to route) or implement a shared registry in Redis. + +Sticky sessions at the load balancer level (by client IP or cookie) eliminate the affinity problem entirely and are the recommended approach for most deployments. + +### Origin Check + +The WebSocket upgrader is configured with `CheckOrigin: func(*http.Request) bool { return true }`. This accepts connections from any origin, which is appropriate for local development and internal tooling. Production deployments behind a reverse proxy with strict origin control should override the upgrader or add origin validation in an `Authenticator` implementation. + +### Broadcast Buffer + +The hub's broadcast channel has a fixed capacity of 256. High-throughput broadcast workloads can saturate this buffer, causing `hub.Broadcast` to return an error. Callers should handle this error and consider whether dropping or queuing at the application level is appropriate. + +### No Global Subscriber Registry + +There is no mechanism to enumerate all connected clients across multiple hub instances. Redis holds no session state; it only relays messages. 
Applications requiring global presence information (e.g. "how many users are watching process X") must maintain their own counter, typically in Redis. + +--- + +## Future Considerations + +- **Sticky session documentation**: A deployment guide covering Nginx/Traefik IP-hash or cookie-based affinity for multi-instance setups. +- **Global subscriber registry**: Optional Redis-backed presence tracking to complement the bridge. +- **TLS for Redis**: `RedisConfig` currently supports `Addr`, `Password`, and `DB` only. Adding a `TLSConfig *tls.Config` field would support encrypted Redis connections without breaking the existing API. +- **Message acknowledgement**: The current model is fire-and-forget. A future phase could add client-side ack with server-side retry for guaranteed delivery on unreliable connections. +- **Per-channel access control**: The `Authenticator` interface gates connection upgrade but does not restrict which channels a client may subscribe to. A `ChannelAuthoriser` hook on `HubConfig` would allow per-subscription checks using `client.Claims`.
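
Reviewer illustration, not part of the patch: the envelope pattern described under Phase 3 can be sketched roughly as below. This is a minimal, standalone approximation, not the go-ws implementation. The `envelope` type, its field names and JSON tags, and the helpers `newSourceID`, `wrap` and `unwrap` are all hypothetical; only the idea (tag each published message with a 16-byte random hex `sourceID`, and drop messages whose `sourceID` matches the local instance) comes from the description above.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical stand-in for redisEnvelope.
// Field names and JSON tags are assumptions made for this sketch.
type envelope struct {
	SourceID string          `json:"sourceId"`
	Payload  json.RawMessage `json:"payload"`
}

// newSourceID returns 16 cryptographically random bytes as a
// 32-character hex string.
func newSourceID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// wrap marshals payload and tags it with the publisher's source ID.
func wrap(sourceID string, payload any) ([]byte, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	return json.Marshal(envelope{SourceID: sourceID, Payload: raw})
}

// unwrap decodes an envelope and reports whether the payload should be
// delivered locally. Envelopes that carry the local ID are dropped, which
// is what prevents an instance from echoing its own publications.
func unwrap(localID string, data []byte) (json.RawMessage, bool, error) {
	var env envelope
	if err := json.Unmarshal(data, &env); err != nil {
		return nil, false, err
	}
	if env.SourceID == localID {
		return nil, false, nil
	}
	return env.Payload, true, nil
}

func main() {
	instanceA, err := newSourceID()
	if err != nil {
		panic(err)
	}
	instanceB, err := newSourceID()
	if err != nil {
		panic(err)
	}

	// Instance A publishes; both instances receive the same bytes.
	data, err := wrap(instanceA, map[string]string{"type": "event"})
	if err != nil {
		panic(err)
	}

	_, deliverOnA, _ := unwrap(instanceA, data)
	payload, deliverOnB, _ := unwrap(instanceB, data)
	fmt.Println("deliver on A:", deliverOnA)
	fmt.Println("deliver on B:", deliverOnB, string(payload))
}
```

Comparing IDs on receipt keeps the publish path free of coordination: every instance can subscribe to the same Redis patterns, and the filter is a single string comparison per message.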