feat(ws): phase 0 coverage (98.5%) + phase 1 connection resilience

Phase 0: Fix SendToChannel data race (client map iterated outside lock),
add 16 test functions covering all edge cases, benchmarks, and integration
tests. Coverage 88.4% -> 98.5%. go vet clean, race detector clean.

Phase 1: Add HubConfig with configurable heartbeat/pong/write timeouts
and OnConnect/OnDisconnect callbacks. Add ReconnectingClient with
exponential backoff, max retries, and OnConnect/OnDisconnect/OnReconnect
state callbacks. Full test coverage for all resilience features.

Co-Authored-By: Charon <developers@lethean.io>
Claude 2026-02-20 01:17:56 +00:00
parent 51f9c59fab
commit 13d9422b74
GPG key ID: AF404715446AEB41
5 changed files with 1602 additions and 16 deletions


@ -16,6 +16,10 @@ go test -v -run Name # Run single test
- `Hub` manages WebSocket connections and channel subscriptions
- Message types: `process_output`, `process_status`, `event`, `error`, `ping/pong`, `subscribe/unsubscribe`
- `hub.SendProcessOutput(id, line)` broadcasts to subscribers
- `HubConfig` provides configurable heartbeat, pong timeout, write timeout, and connection callbacks
- `ReconnectingClient` provides client-side reconnection with exponential backoff
- `ConnectionState`: `StateDisconnected`, `StateConnecting`, `StateConnected`
- Coverage: 98.5%
## Coding Standards


@ -24,3 +24,37 @@ Extracted from `forge.lthn.ai/core/go` `pkg/ws/` on 19 Feb 2026.
- Hub must be started with `go hub.Run(ctx)` before accepting connections
- HTTP handler exposed via `hub.Handler()` for mounting on any router
- `hub.SendProcessOutput(processID, line)` is the primary API for streaming subprocess output
## 2026-02-20: Phase 0 & Phase 1 (Charon)
### Race condition fix
- `SendToChannel` had a data race: it acquired `RLock`, read the channel's client map, released `RUnlock`, then iterated clients outside the lock. If `Hub.Run` processed an unregister concurrently, the map was modified during iteration.
- Fix: copy client pointers into a slice under `RLock`, then iterate the copy after releasing the lock.
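The copy-then-iterate fix can be illustrated in isolation. The following is a minimal sketch, not the package's code: `registry`, `snapshot`, and `send` are hypothetical names, and plain channels stand in for `*Client` values. It assumes only the standard library.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the hub's channel map.
type registry struct {
	mu       sync.RWMutex
	channels map[string]map[chan []byte]bool
}

// snapshot copies the subscribers of one channel while holding the read
// lock, so the caller can iterate without touching the shared map.
func (r *registry) snapshot(channel string) []chan []byte {
	r.mu.RLock()
	defer r.mu.RUnlock()
	subs := r.channels[channel]
	targets := make([]chan []byte, 0, len(subs))
	for s := range subs {
		targets = append(targets, s)
	}
	return targets
}

// send delivers data to every subscriber in the snapshot and reports how
// many accepted it. A full buffer is skipped rather than blocking the sender.
func (r *registry) send(channel string, data []byte) int {
	delivered := 0
	for _, s := range r.snapshot(channel) {
		select {
		case s <- data:
			delivered++
		default: // buffer full: skip this subscriber
		}
	}
	return delivered
}

func main() {
	fast := make(chan []byte, 1)
	stuck := make(chan []byte) // unbuffered with no reader: never accepts
	r := &registry{channels: map[string]map[chan []byte]bool{
		"logs": {fast: true, stuck: true},
	}}
	fmt.Println(r.send("logs", []byte("hello"))) // only fast accepts
	fmt.Println(r.send("none", []byte("hello"))) // no subscribers
}
```

The trade-off of this pattern is that a subscriber removed after the snapshot may still be offered one more message, which the non-blocking send tolerates.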
### Phase 0: Test coverage 88.4% to 98.5%
- Added 16 new test functions covering: hub shutdown, broadcast overflow, channel send overflow, marshal errors, upgrade error, `Client.Close`, malformed JSON, non-string subscribe/unsubscribe data, unknown message types, writePump close/batch, concurrent subscribe/unsubscribe, multi-client channel delivery, end-to-end process output/status.
- Added `BenchmarkBroadcast` (100 clients) and `BenchmarkSendToChannel` (50 subscribers).
- `go vet ./...` clean; `go test -race ./...` clean.
### Phase 1: Connection resilience
- `HubConfig` struct: `HeartbeatInterval`, `PongTimeout`, `WriteTimeout`, `OnConnect`, `OnDisconnect` callbacks.
- `NewHubWithConfig(config)`: constructor; non-positive durations fall back to the defaults. It does not check that `PongTimeout` exceeds `HeartbeatInterval`.
- `readPump`/`writePump` now use hub config values instead of hardcoded durations.
- `ReconnectingClient`: client-side reconnection with exponential backoff.
- `ReconnectConfig`: URL, InitialBackoff (1s), MaxBackoff (30s), BackoffMultiplier (2.0), MaxRetries, Dialer, Headers, OnConnect/OnDisconnect/OnReconnect/OnMessage callbacks.
- `Connect(ctx)`: blocking reconnect loop; returns on context cancel or max retries.
- `Send(msg)`: thread-safe message send; returns error if not connected.
- `State()`: returns `StateDisconnected`, `StateConnecting`, or `StateConnected`.
- `Close()`: cancels context and closes underlying connection.
- Exponential backoff: `calculateBackoff(attempt)` multiplies the delay by `BackoffMultiplier` (2.0 by default, i.e. doubling) for each attempt after the first, capped at MaxBackoff.
### API surface additions
- `HubConfig`, `DefaultHubConfig()`, `NewHubWithConfig()`
- `ConnectionState` enum: `StateDisconnected`, `StateConnecting`, `StateConnected`
- `ReconnectConfig`, `ReconnectingClient`, `NewReconnectingClient()`
- `DefaultHeartbeatInterval`, `DefaultPongTimeout`, `DefaultWriteTimeout` constants
- `NewHub()` still works unchanged (uses `DefaultHubConfig()` internally)

15
TODO.md

@ -6,16 +6,17 @@ Dispatched from core/go orchestration. Pick up tasks in order.
## Phase 0: Hardening & Test Coverage
- - [ ] **Expand test coverage** — `ws_test.go` exists. Add tests for: `Hub.Run()` lifecycle (start, register client, broadcast, shutdown), `Subscribe`/`Unsubscribe` channel management, `SendToChannel` with no subscribers (should not error), `SendProcessOutput`/`SendProcessStatus` helpers, client `readPump` message parsing (subscribe, unsubscribe, ping), client `writePump` batch sending, client buffer overflow (send channel full → client disconnected), concurrent broadcast + subscribe (race test).
- - [ ] **Integration test** — Use `httptest.NewServer` + real WebSocket client. Connect, subscribe to channel, send message, verify receipt. Test multiple clients on same channel.
- - [ ] **Benchmark** — `BenchmarkBroadcast` with 100 connected clients. `BenchmarkSendToChannel` with 50 subscribers. Measure message throughput.
- - [ ] **`go vet ./...` clean** — Fix any warnings.
+ - [x] **Expand test coverage** — Added tests for: `Hub.Run()` shutdown closing all clients, broadcast to client with full buffer (unregister path), `SendToChannel` with full client buffer (skip path), `Broadcast`/`SendToChannel` marshal errors, `Handler` upgrade error on non-WebSocket request, `Client.Close()`, `readPump` malformed JSON, subscribe/unsubscribe with non-string data, unknown message types, `writePump` close-on-channel-close and batch sending, concurrent subscribe/unsubscribe race test, multiple clients on same channel, end-to-end process output and status tests. Coverage: 88.4% → 98.5%.
+ - [x] **Integration test** — Full end-to-end tests using `httptest.NewServer` + real WebSocket clients. Multi-client channel delivery, process output streaming, process status updates.
+ - [x] **Benchmark** — `BenchmarkBroadcast` with 100 clients, `BenchmarkSendToChannel` with 50 subscribers.
+ - [x] **`go vet ./...` clean** — No warnings.
+ - [x] **Race condition fix** — Fixed data race in `SendToChannel` where client map was iterated outside the read lock. Clients are now copied under lock before iteration.
## Phase 1: Connection Resilience
- - [ ] Add client-side reconnection support (exponential backoff)
- - [ ] Tune heartbeat interval and pong timeout for flaky networks
- - [ ] Add connection state callbacks (onConnect, onDisconnect, onReconnect)
+ - [x] Add client-side reconnection support (exponential backoff) — `ReconnectingClient` with `ReconnectConfig`. Configurable initial backoff, max backoff, multiplier, max retries.
+ - [x] Tune heartbeat interval and pong timeout for flaky networks — `HubConfig` with `HeartbeatInterval`, `PongTimeout`, `WriteTimeout`. `NewHubWithConfig()` constructor. Defaults: 30s heartbeat, 60s pong timeout, 10s write timeout.
+ - [x] Add connection state callbacks (onConnect, onDisconnect, onReconnect) — Hub-level `OnConnect`/`OnDisconnect` callbacks in `HubConfig`. Client-level `OnConnect`/`OnDisconnect`/`OnReconnect` callbacks in `ReconnectConfig`. `ConnectionState` enum: `StateDisconnected`, `StateConnecting`, `StateConnected`.
## Phase 2: Auth

332
ws.go

@ -63,6 +63,56 @@ var upgrader = websocket.Upgrader{
},
}
// Default timing values for heartbeat interval, pong timeout, and write timeout.
const (
DefaultHeartbeatInterval = 30 * time.Second
DefaultPongTimeout = 60 * time.Second
DefaultWriteTimeout = 10 * time.Second
)
// ConnectionState represents the current state of a reconnecting client.
type ConnectionState int
const (
// StateDisconnected indicates the client is not connected.
StateDisconnected ConnectionState = iota
// StateConnecting indicates the client is attempting to connect.
StateConnecting
// StateConnected indicates the client has an active connection.
StateConnected
)
// HubConfig holds configuration for the Hub and its managed connections.
type HubConfig struct {
// HeartbeatInterval is the interval between server-side ping messages.
// Defaults to 30 seconds.
HeartbeatInterval time.Duration
// PongTimeout is how long the server waits for a pong before
// considering the connection dead. Must be greater than HeartbeatInterval.
// Defaults to 60 seconds.
PongTimeout time.Duration
// WriteTimeout is the deadline for write operations.
// Defaults to 10 seconds.
WriteTimeout time.Duration
// OnConnect is called when a client connects to the hub.
OnConnect func(client *Client)
// OnDisconnect is called when a client disconnects from the hub.
OnDisconnect func(client *Client)
}
// DefaultHubConfig returns a HubConfig with sensible defaults.
func DefaultHubConfig() HubConfig {
return HubConfig{
HeartbeatInterval: DefaultHeartbeatInterval,
PongTimeout: DefaultPongTimeout,
WriteTimeout: DefaultWriteTimeout,
}
}
// MessageType identifies the type of WebSocket message.
type MessageType string
@ -110,17 +160,33 @@ type Hub struct {
register chan *Client
unregister chan *Client
channels map[string]map[*Client]bool
config HubConfig
mu sync.RWMutex
}
- // NewHub creates a new WebSocket hub.
+ // NewHub creates a new WebSocket hub with default configuration.
func NewHub() *Hub {
return NewHubWithConfig(DefaultHubConfig())
}
// NewHubWithConfig creates a new WebSocket hub with the given configuration.
func NewHubWithConfig(config HubConfig) *Hub {
if config.HeartbeatInterval <= 0 {
config.HeartbeatInterval = DefaultHeartbeatInterval
}
if config.PongTimeout <= 0 {
config.PongTimeout = DefaultPongTimeout
}
if config.WriteTimeout <= 0 {
config.WriteTimeout = DefaultWriteTimeout
}
return &Hub{
clients: make(map[*Client]bool),
broadcast: make(chan []byte, 256),
register: make(chan *Client),
unregister: make(chan *Client),
channels: make(map[string]map[*Client]bool),
config: config,
}
}
@ -142,6 +208,9 @@ func (h *Hub) Run(ctx context.Context) {
h.mu.Lock()
h.clients[client] = true
h.mu.Unlock()
if h.config.OnConnect != nil {
h.config.OnConnect(client)
}
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client]; ok {
@ -157,8 +226,13 @@ func (h *Hub) Run(ctx context.Context) {
}
}
}
+ h.mu.Unlock()
+ if h.config.OnDisconnect != nil {
+ h.config.OnDisconnect(client)
+ }
+ } else {
+ h.mu.Unlock()
  }
- h.mu.Unlock()
case message := <-h.broadcast:
h.mu.RLock()
for client := range h.clients {
@ -236,13 +310,19 @@ func (h *Hub) SendToChannel(channel string, msg Message) error {
  h.mu.RLock()
  clients, ok := h.channels[channel]
- h.mu.RUnlock()
  if !ok {
+ h.mu.RUnlock()
  return nil // No subscribers, not an error
  }
+ // Copy client references under lock to avoid races during iteration
+ targets := make([]*Client, 0, len(clients))
  for client := range clients {
+ targets = append(targets, client)
+ }
+ h.mu.RUnlock()
+ for _, client := range targets {
select {
case client.send <- data:
default:
@ -366,10 +446,11 @@ func (c *Client) readPump() {
c.conn.Close()
}()
+ pongTimeout := c.hub.config.PongTimeout
  c.conn.SetReadLimit(65536)
- c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
+ c.conn.SetReadDeadline(time.Now().Add(pongTimeout))
  c.conn.SetPongHandler(func(string) error {
- c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
+ c.conn.SetReadDeadline(time.Now().Add(pongTimeout))
return nil
})
@ -401,7 +482,9 @@ func (c *Client) readPump() {
// writePump sends messages to the client.
func (c *Client) writePump() {
- ticker := time.NewTicker(30 * time.Second)
+ heartbeat := c.hub.config.HeartbeatInterval
+ writeTimeout := c.hub.config.WriteTimeout
+ ticker := time.NewTicker(heartbeat)
defer func() {
ticker.Stop()
c.conn.Close()
@ -410,7 +493,7 @@ func (c *Client) writePump() {
for {
select {
case message, ok := <-c.send:
- c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
+ c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
@ -433,7 +516,7 @@ func (c *Client) writePump() {
return
}
case <-ticker.C:
- c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
+ c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
@ -463,3 +546,234 @@ func (c *Client) Close() error {
c.hub.unregister <- c
return c.conn.Close()
}
// ReconnectConfig holds configuration for the reconnecting WebSocket client.
type ReconnectConfig struct {
// URL is the WebSocket server URL to connect to.
URL string
// InitialBackoff is the delay before the first reconnection attempt.
// Defaults to 1 second.
InitialBackoff time.Duration
// MaxBackoff is the maximum delay between reconnection attempts.
// Defaults to 30 seconds.
MaxBackoff time.Duration
// BackoffMultiplier controls exponential growth of the backoff.
// Defaults to 2.0.
BackoffMultiplier float64
// MaxRetries is the maximum number of consecutive reconnection attempts.
// Zero means unlimited retries.
MaxRetries int
// OnConnect is called when the client successfully connects.
OnConnect func()
// OnDisconnect is called when the client loses its connection.
OnDisconnect func()
// OnReconnect is called when the client successfully reconnects
// after a disconnection. The attempt count is passed in.
OnReconnect func(attempt int)
// OnMessage is called when a message is received from the server.
OnMessage func(msg Message)
// Dialer is the WebSocket dialer to use. Defaults to websocket.DefaultDialer.
Dialer *websocket.Dialer
// Headers are additional HTTP headers to send during the handshake.
Headers http.Header
}
// ReconnectingClient is a WebSocket client that automatically reconnects
// with exponential backoff when the connection drops.
type ReconnectingClient struct {
config ReconnectConfig
conn *websocket.Conn
send chan []byte
state ConnectionState
mu sync.RWMutex
done chan struct{}
ctx context.Context
cancel context.CancelFunc
}
// NewReconnectingClient creates a new reconnecting WebSocket client.
func NewReconnectingClient(config ReconnectConfig) *ReconnectingClient {
if config.InitialBackoff <= 0 {
config.InitialBackoff = 1 * time.Second
}
if config.MaxBackoff <= 0 {
config.MaxBackoff = 30 * time.Second
}
if config.BackoffMultiplier <= 0 {
config.BackoffMultiplier = 2.0
}
if config.Dialer == nil {
config.Dialer = websocket.DefaultDialer
}
return &ReconnectingClient{
config: config,
send: make(chan []byte, 256),
state: StateDisconnected,
done: make(chan struct{}),
}
}
// Connect starts the reconnecting client. It blocks until the context is
// cancelled or, when MaxRetries is set, until the retry limit is exceeded.
// The client reconnects automatically on connection loss.
func (rc *ReconnectingClient) Connect(ctx context.Context) error {
rc.ctx, rc.cancel = context.WithCancel(ctx)
defer rc.cancel()
attempt := 0
wasConnected := false
for {
select {
case <-rc.ctx.Done():
rc.setState(StateDisconnected)
return rc.ctx.Err()
default:
}
rc.setState(StateConnecting)
attempt++
conn, _, err := rc.config.Dialer.DialContext(rc.ctx, rc.config.URL, rc.config.Headers)
if err != nil {
if rc.config.MaxRetries > 0 && attempt > rc.config.MaxRetries {
rc.setState(StateDisconnected)
return fmt.Errorf("max retries (%d) exceeded: %w", rc.config.MaxRetries, err)
}
backoff := rc.calculateBackoff(attempt)
select {
case <-rc.ctx.Done():
rc.setState(StateDisconnected)
return rc.ctx.Err()
case <-time.After(backoff):
continue
}
}
// Connected successfully
rc.mu.Lock()
rc.conn = conn
rc.mu.Unlock()
rc.setState(StateConnected)
if wasConnected {
if rc.config.OnReconnect != nil {
rc.config.OnReconnect(attempt)
}
} else {
if rc.config.OnConnect != nil {
rc.config.OnConnect()
}
}
// Reset attempt counter after a successful connection
attempt = 0
wasConnected = true
// Run the read loop — blocks until connection drops
rc.readLoop()
// Connection lost
rc.mu.Lock()
rc.conn = nil
rc.mu.Unlock()
if rc.config.OnDisconnect != nil {
rc.config.OnDisconnect()
}
}
}
// Send sends a message to the server. Returns an error if not connected.
func (rc *ReconnectingClient) Send(msg Message) error {
msg.Timestamp = time.Now()
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
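// Check and write under one lock. Connect sets rc.conn to nil after a
// drop, so a nil check made under a separate read lock could be stale
// by the time of the write and dereference a nil connection.
rc.mu.Lock()
defer rc.mu.Unlock()
if rc.conn == nil {
return fmt.Errorf("not connected")
}
return rc.conn.WriteMessage(websocket.TextMessage, data)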
}
// State returns the current connection state.
func (rc *ReconnectingClient) State() ConnectionState {
rc.mu.RLock()
defer rc.mu.RUnlock()
return rc.state
}
// Close gracefully shuts down the reconnecting client.
func (rc *ReconnectingClient) Close() error {
if rc.cancel != nil {
rc.cancel()
}
rc.mu.RLock()
conn := rc.conn
rc.mu.RUnlock()
if conn != nil {
return conn.Close()
}
return nil
}
func (rc *ReconnectingClient) setState(state ConnectionState) {
rc.mu.Lock()
rc.state = state
rc.mu.Unlock()
}
func (rc *ReconnectingClient) calculateBackoff(attempt int) time.Duration {
backoff := rc.config.InitialBackoff
for i := 1; i < attempt; i++ {
backoff = time.Duration(float64(backoff) * rc.config.BackoffMultiplier)
if backoff > rc.config.MaxBackoff {
backoff = rc.config.MaxBackoff
break
}
}
return backoff
}
func (rc *ReconnectingClient) readLoop() {
rc.mu.RLock()
conn := rc.conn
rc.mu.RUnlock()
if conn == nil {
return
}
for {
_, data, err := conn.ReadMessage()
if err != nil {
return
}
if rc.config.OnMessage != nil {
var msg Message
if jsonErr := json.Unmarshal(data, &msg); jsonErr == nil {
rc.config.OnMessage(msg)
}
}
}
}

1233
ws_test.go

File diff suppressed because it is too large