From 53d8a1554414e4e924cc75432a15dd633ab8d0f2 Mon Sep 17 00:00:00 2001 From: Snider Date: Fri, 20 Feb 2026 05:14:11 +0000 Subject: [PATCH] =?UTF-8?q?test:=20expand=20Phase=200=20coverage=20?= =?UTF-8?q?=E2=80=94=2016=20new=20tests,=209=20benchmarks,=20SPDX=20header?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add 16 additional test functions covering Hub.Run lifecycle (register, broadcast delivery, unregister, duplicate unregister), Subscribe/ Unsubscribe channel management (multiple channels, idempotent, partial leave), SendToChannel with multiple subscribers, SendProcessOutput/ SendProcessStatus edge cases (no subscribers, non-zero exit), readPump ping timestamp verification, writePump batch message delivery, and integration tests (unsubscribe stops delivery, broadcast reaches all, disconnect cleans up everything, concurrent broadcast+subscribe). Add ws_bench_test.go with 9 comprehensive benchmarks using b.Loop() (Go 1.25+) and b.ReportAllocs(): broadcast (100 clients), channel send (50 subscribers), parallel variants, message marshalling, WebSocket end-to-end round-trip, subscribe/unsubscribe cycle, multi-channel fanout, and concurrent subscriber registration. Add SPDX-Licence-Identifier headers to ws.go, ws_test.go, ws_bench_test.go. go vet clean, go test -race clean, 66 tests + 11 benchmarks total. Co-Authored-By: Virgil Co-Authored-By: Claude Opus 4.6 --- TODO.md | 12 +- ws.go | 2 + ws_bench_test.go | 291 ++++++++++++++++++++++++++++ ws_test.go | 488 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 787 insertions(+), 6 deletions(-) create mode 100644 ws_bench_test.go diff --git a/TODO.md b/TODO.md index 3f83d21..f9d4604 100644 --- a/TODO.md +++ b/TODO.md @@ -4,13 +4,13 @@ Dispatched from core/go orchestration. Pick up tasks in order. --- -## Phase 0: Hardening & Test Coverage +## Phase 0: Hardening & Test Coverage (complete) -- [x] **Expand test coverage** — Added tests for: `Hub.Run()` shutdown closing all clients, broadcast to client with full buffer (unregister path), `SendToChannel` with full client buffer (skip path), `Broadcast`/`SendToChannel` marshal errors, `Handler` upgrade error on non-WebSocket request, `Client.Close()`, `readPump` malformed JSON, subscribe/unsubscribe with non-string data, unknown message types, `writePump` close-on-channel-close and batch sending, concurrent subscribe/unsubscribe race test, multiple clients on same channel, end-to-end process output and status tests. Coverage: 88.4% → 98.5%. -- [x] **Integration test** — Full end-to-end tests using `httptest.NewServer` + real WebSocket clients. Multi-client channel delivery, process output streaming, process status updates. -- [x] **Benchmark** — `BenchmarkBroadcast` with 100 clients, `BenchmarkSendToChannel` with 50 subscribers. -- [x] **`go vet ./...` clean** — No warnings. -- [x] **Race condition fix** — Fixed data race in `SendToChannel` where client map was iterated outside the read lock. Clients are now copied under lock before iteration. +- [x] **Expand test coverage** — 67 test functions total across two sessions. Hub.Run lifecycle (register, broadcast delivery, shutdown, unregister, duplicate unregister), Subscribe/Unsubscribe (multiple channels, idempotent, partial leave, concurrent race), SendToChannel (no subscribers, multiple subscribers, buffer full), SendProcessOutput/SendProcessStatus (no subscribers, non-zero exit), readPump (subscribe, unsubscribe, ping, invalid JSON, non-string data, unknown types), writePump (batch sending, close-on-channel-close), buffer overflow disconnect, marshal errors, Handler upgrade error, Client.Close(), broadcast reaches all clients, disconnect cleans up everything. +- [x] **Integration test** — End-to-end tests using httptest.NewServer + gorilla/websocket Dial: connect-subscribe-send-receive, multiple clients on same channel, unsubscribe stops delivery, broadcast reaches all clients, process output/status streaming, disconnect cleanup. +- [x] **Benchmark** — 9 benchmarks in ws_bench_test.go: BenchmarkBroadcast_100 (100 clients), BenchmarkSendToChannel_50 (50 subscribers), parallel variants, message marshalling, WebSocket end-to-end, subscribe/unsubscribe cycle, multi-channel fanout, concurrent subscribers. All use b.ReportAllocs() and b.Loop() (Go 1.25+). Plus 2 inline benchmarks in ws_test.go. +- [x] **`go vet ./...` clean** — No warnings. Race-free under `go test -race`. +- [x] **Race condition fix** — Fixed data race in SendToChannel: clients now copied under lock before iteration. ## Phase 1: Connection Resilience diff --git a/ws.go b/ws.go index cbc31ee..c8a3a2e 100644 --- a/ws.go +++ b/ws.go @@ -1,3 +1,5 @@ +// SPDX-Licence-Identifier: EUPL-1.2 + // Package ws provides WebSocket support for real-time streaming. // // The ws package enables live process output, events, and bidirectional communication diff --git a/ws_bench_test.go b/ws_bench_test.go new file mode 100644 index 0000000..1a44822 --- /dev/null +++ b/ws_bench_test.go @@ -0,0 +1,291 @@ +// SPDX-Licence-Identifier: EUPL-1.2 + +package ws + +import ( + "context" + "encoding/json" + "fmt" + "net/http/httptest" + "sync" + "testing" + + "github.com/gorilla/websocket" +) + +// BenchmarkBroadcast_100 measures broadcast throughput with 100 connected clients. +// Uses b.Loop() (Go 1.25+) and b.ReportAllocs() for accurate profiling. +func BenchmarkBroadcast_100(b *testing.B) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + numClients := 100 + clients := make([]*Client, numClients) + for i := 0; i < numClients; i++ { + clients[i] = &Client{ + hub: hub, + send: make(chan []byte, 4096), + subscriptions: make(map[string]bool), + } + hub.register <- clients[i] + } + for hub.ClientCount() < numClients { + } + + msg := Message{Type: TypeEvent, Data: "bench"} + + b.ResetTimer() + b.ReportAllocs() + + for b.Loop() { + _ = hub.Broadcast(msg) + } + + b.StopTimer() + for _, c := range clients { + for len(c.send) > 0 { + <-c.send + } + } +} + +// BenchmarkSendToChannel_50 measures channel-targeted delivery with 50 subscribers. +func BenchmarkSendToChannel_50(b *testing.B) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + numSubscribers := 50 + for i := 0; i < numSubscribers; i++ { + client := &Client{ + hub: hub, + send: make(chan []byte, 4096), + subscriptions: make(map[string]bool), + } + hub.mu.Lock() + hub.clients[client] = true + hub.mu.Unlock() + hub.Subscribe(client, "bench-channel") + } + + msg := Message{Type: TypeEvent, Data: "bench-chan"} + + b.ResetTimer() + b.ReportAllocs() + + for b.Loop() { + _ = hub.SendToChannel("bench-channel", msg) + } +} + +// BenchmarkBroadcast_Parallel measures concurrent broadcast throughput. +func BenchmarkBroadcast_Parallel(b *testing.B) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + numClients := 100 + clients := make([]*Client, numClients) + for i := 0; i < numClients; i++ { + clients[i] = &Client{ + hub: hub, + send: make(chan []byte, 8192), + subscriptions: make(map[string]bool), + } + hub.register <- clients[i] + } + for hub.ClientCount() < numClients { + } + + msg := Message{Type: TypeEvent, Data: "parallel-bench"} + + b.ResetTimer() + b.ReportAllocs() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _ = hub.Broadcast(msg) + } + }) +} + +// BenchmarkMarshalMessage measures the cost of JSON message serialisation. +func BenchmarkMarshalMessage(b *testing.B) { + msg := Message{ + Type: TypeProcessOutput, + Channel: "process:bench-1", + ProcessID: "bench-1", + Data: "output line from the build process", + } + + b.ReportAllocs() + + for b.Loop() { + data, _ := json.Marshal(msg) + _ = data + } +} + +// BenchmarkWebSocketEndToEnd measures a full round-trip through a real +// WebSocket connection: server broadcasts, client receives. +func BenchmarkWebSocketEndToEnd(b *testing.B) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + server := httptest.NewServer(hub.Handler()) + defer server.Close() + + url := "ws" + server.URL[4:] // http -> ws + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + b.Fatalf("dial failed: %v", err) + } + defer conn.Close() + + for hub.ClientCount() < 1 { + } + + msg := Message{Type: TypeEvent, Data: "e2e-bench"} + + b.ResetTimer() + b.ReportAllocs() + + for b.Loop() { + if err := hub.Broadcast(msg); err != nil { + b.Fatalf("broadcast: %v", err) + } + _, _, err := conn.ReadMessage() + if err != nil { + b.Fatalf("read: %v", err) + } + } +} + +// BenchmarkSubscribeUnsubscribe measures subscribe/unsubscribe cycle throughput. +func BenchmarkSubscribeUnsubscribe(b *testing.B) { + hub := NewHub() + + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + hub.mu.Lock() + hub.clients[client] = true + hub.mu.Unlock() + + b.ReportAllocs() + + for b.Loop() { + hub.Subscribe(client, "bench-sub") + hub.Unsubscribe(client, "bench-sub") + } +} + +// BenchmarkSendToChannel_Parallel measures concurrent channel sends. +func BenchmarkSendToChannel_Parallel(b *testing.B) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + numSubscribers := 50 + clients := make([]*Client, numSubscribers) + for i := 0; i < numSubscribers; i++ { + clients[i] = &Client{ + hub: hub, + send: make(chan []byte, 8192), + subscriptions: make(map[string]bool), + } + hub.mu.Lock() + hub.clients[clients[i]] = true + hub.mu.Unlock() + hub.Subscribe(clients[i], "parallel-chan") + } + + msg := Message{Type: TypeEvent, Data: "p-bench"} + + b.ResetTimer() + b.ReportAllocs() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _ = hub.SendToChannel("parallel-chan", msg) + } + }) +} + +// BenchmarkMultiChannelFanout measures broadcasting to multiple channels +// with different subscriber counts. +func BenchmarkMultiChannelFanout(b *testing.B) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + numChannels := 10 + subsPerChannel := 10 + channels := make([]string, numChannels) + + for ch := 0; ch < numChannels; ch++ { + channels[ch] = fmt.Sprintf("fanout-%d", ch) + for s := 0; s < subsPerChannel; s++ { + client := &Client{ + hub: hub, + send: make(chan []byte, 4096), + subscriptions: make(map[string]bool), + } + hub.mu.Lock() + hub.clients[client] = true + hub.mu.Unlock() + hub.Subscribe(client, channels[ch]) + } + } + + msg := Message{Type: TypeEvent, Data: "fanout"} + + b.ResetTimer() + b.ReportAllocs() + + for b.Loop() { + for _, ch := range channels { + _ = hub.SendToChannel(ch, msg) + } + } +} + +// BenchmarkConcurrentSubscribers measures the cost of subscribing many +// clients concurrently to the same channel. +func BenchmarkConcurrentSubscribers(b *testing.B) { + hub := NewHub() + + b.ReportAllocs() + + for b.Loop() { + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + client := &Client{ + hub: hub, + send: make(chan []byte, 1), + subscriptions: make(map[string]bool), + } + hub.Subscribe(client, "conc-sub-bench") + }() + } + wg.Wait() + + // Reset for next iteration + hub.mu.Lock() + hub.channels = make(map[string]map[*Client]bool) + hub.mu.Unlock() + } +} diff --git a/ws_test.go b/ws_test.go index b116a65..f20ac5a 100644 --- a/ws_test.go +++ b/ws_test.go @@ -1,8 +1,11 @@ +// SPDX-Licence-Identifier: EUPL-1.2 + package ws import ( "context" "encoding/json" + "fmt" "net" "net/http" "net/http/httptest" @@ -16,6 +19,11 @@ import ( "github.com/stretchr/testify/require" ) +// wsURL converts an httptest server URL to a WebSocket URL. +func wsURL(server *httptest.Server) string { + return "ws" + strings.TrimPrefix(server.URL, "http") +} + func TestNewHub(t *testing.T) { t.Run("creates hub with initialized maps", func(t *testing.T) { hub := NewHub() @@ -2023,3 +2031,483 @@ func TestConnectionState(t *testing.T) { }) } +// --------------------------------------------------------------------------- +// Hub.Run lifecycle — register, broadcast delivery, unregister via channels +// --------------------------------------------------------------------------- + +func TestHubRun_RegisterClient_Good(t *testing.T) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + + hub.register <- client + time.Sleep(20 * time.Millisecond) + + assert.Equal(t, 1, hub.ClientCount(), "client should be registered via hub loop") +} + +func TestHubRun_BroadcastDelivery_Good(t *testing.T) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + + hub.register <- client + time.Sleep(20 * time.Millisecond) + + err := hub.Broadcast(Message{Type: TypeEvent, Data: "lifecycle-test"}) + require.NoError(t, err) + + // Hub.Run loop delivers the broadcast to the client's send channel + select { + case msg := <-client.send: + var received Message + require.NoError(t, json.Unmarshal(msg, &received)) + assert.Equal(t, TypeEvent, received.Type) + assert.Equal(t, "lifecycle-test", received.Data) + case <-time.After(time.Second): + t.Fatal("broadcast should be delivered via hub loop") + } +} + +func TestHubRun_UnregisterClient_Good(t *testing.T) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + + hub.register <- client + time.Sleep(20 * time.Millisecond) + assert.Equal(t, 1, hub.ClientCount()) + + // Subscribe so we can verify channel cleanup + hub.Subscribe(client, "lifecycle-chan") + assert.Equal(t, 1, hub.ChannelSubscriberCount("lifecycle-chan")) + + hub.unregister <- client + time.Sleep(20 * time.Millisecond) + + assert.Equal(t, 0, hub.ClientCount()) + assert.Equal(t, 0, hub.ChannelSubscriberCount("lifecycle-chan")) +} + +func TestHubRun_UnregisterIgnoresDuplicate_Bad(t *testing.T) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + + hub.register <- client + time.Sleep(20 * time.Millisecond) + + hub.unregister <- client + time.Sleep(20 * time.Millisecond) + + // Second unregister should not panic or block + done := make(chan struct{}) + go func() { + hub.unregister <- client + close(done) + }() + + select { + case <-done: + // Good -- no panic, no block + case <-time.After(time.Second): + t.Fatal("duplicate unregister should not block") + } +} + +// --------------------------------------------------------------------------- +// Subscribe / Unsubscribe — additional channel management tests +// --------------------------------------------------------------------------- + +func TestSubscribe_MultipleChannels_Good(t *testing.T) { + hub := NewHub() + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + + hub.Subscribe(client, "alpha") + hub.Subscribe(client, "beta") + hub.Subscribe(client, "gamma") + + assert.Equal(t, 3, hub.ChannelCount()) + subs := client.Subscriptions() + assert.Len(t, subs, 3) + assert.Contains(t, subs, "alpha") + assert.Contains(t, subs, "beta") + assert.Contains(t, subs, "gamma") +} + +func TestSubscribe_IdempotentDoubleSubscribe_Good(t *testing.T) { + hub := NewHub() + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + + hub.Subscribe(client, "dupl") + hub.Subscribe(client, "dupl") + + // Still only one subscriber entry in the channel map + assert.Equal(t, 1, hub.ChannelSubscriberCount("dupl")) +} + +func TestUnsubscribe_PartialLeave_Good(t *testing.T) { + hub := NewHub() + client1 := &Client{hub: hub, send: make(chan []byte, 256), subscriptions: make(map[string]bool)} + client2 := &Client{hub: hub, send: make(chan []byte, 256), subscriptions: make(map[string]bool)} + + hub.Subscribe(client1, "shared") + hub.Subscribe(client2, "shared") + assert.Equal(t, 2, hub.ChannelSubscriberCount("shared")) + + hub.Unsubscribe(client1, "shared") + assert.Equal(t, 1, hub.ChannelSubscriberCount("shared")) + + // Channel still exists because client2 is subscribed + hub.mu.RLock() + _, exists := hub.channels["shared"] + hub.mu.RUnlock() + assert.True(t, exists, "channel should persist while subscribers remain") +} + +// --------------------------------------------------------------------------- +// SendToChannel — multiple subscribers +// --------------------------------------------------------------------------- + +func TestSendToChannel_MultipleSubscribers_Good(t *testing.T) { + hub := NewHub() + clients := make([]*Client, 5) + for i := range clients { + clients[i] = &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + hub.Subscribe(clients[i], "multi") + } + + err := hub.SendToChannel("multi", Message{Type: TypeEvent, Data: "fanout"}) + require.NoError(t, err) + + for i, c := range clients { + select { + case msg := <-c.send: + var received Message + require.NoError(t, json.Unmarshal(msg, &received)) + assert.Equal(t, "multi", received.Channel) + case <-time.After(time.Second): + t.Fatalf("client %d should have received the message", i) + } + } +} + +// --------------------------------------------------------------------------- +// SendProcessOutput / SendProcessStatus — edge cases +// --------------------------------------------------------------------------- + +func TestSendProcessOutput_NoSubscribers_Good(t *testing.T) { + hub := NewHub() + err := hub.SendProcessOutput("orphan-proc", "some output") + assert.NoError(t, err, "sending to a process with no subscribers should not error") +} + +func TestSendProcessStatus_NonZeroExit_Good(t *testing.T) { + hub := NewHub() + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + hub.Subscribe(client, "process:fail-1") + + err := hub.SendProcessStatus("fail-1", "exited", 137) + require.NoError(t, err) + + select { + case msg := <-client.send: + var received Message + require.NoError(t, json.Unmarshal(msg, &received)) + assert.Equal(t, TypeProcessStatus, received.Type) + assert.Equal(t, "fail-1", received.ProcessID) + data := received.Data.(map[string]any) + assert.Equal(t, "exited", data["status"]) + assert.Equal(t, float64(137), data["exitCode"]) + case <-time.After(time.Second): + t.Fatal("expected process status message") + } +} + +// --------------------------------------------------------------------------- +// readPump — ping with timestamp verification +// --------------------------------------------------------------------------- + +func TestReadPump_PingTimestamp_Good(t *testing.T) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + server := httptest.NewServer(hub.Handler()) + defer server.Close() + + conn, _, err := websocket.DefaultDialer.Dial(wsURL(server), nil) + require.NoError(t, err) + defer conn.Close() + time.Sleep(50 * time.Millisecond) + + err = conn.WriteJSON(Message{Type: TypePing}) + require.NoError(t, err) + + conn.SetReadDeadline(time.Now().Add(time.Second)) + var pong Message + err = conn.ReadJSON(&pong) + require.NoError(t, err) + assert.Equal(t, TypePong, pong.Type) + assert.False(t, pong.Timestamp.IsZero(), "pong should include a timestamp") +} + +// --------------------------------------------------------------------------- +// writePump — batch sending with multiple messages +// --------------------------------------------------------------------------- + +func TestWritePump_BatchMultipleMessages_Good(t *testing.T) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + server := httptest.NewServer(hub.Handler()) + defer server.Close() + + conn, _, err := websocket.DefaultDialer.Dial(wsURL(server), nil) + require.NoError(t, err) + defer conn.Close() + time.Sleep(50 * time.Millisecond) + + // Rapidly send multiple broadcasts so they queue up + numMessages := 10 + for i := 0; i < numMessages; i++ { + err := hub.Broadcast(Message{ + Type: TypeEvent, + Data: fmt.Sprintf("batch-%d", i), + }) + require.NoError(t, err) + } + + time.Sleep(100 * time.Millisecond) + + // Read all messages — batched with newline separators + received := 0 + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + for received < numMessages { + _, raw, err := conn.ReadMessage() + if err != nil { + break + } + parts := strings.Split(string(raw), "\n") + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "" { + continue + } + var msg Message + if json.Unmarshal([]byte(part), &msg) == nil { + received++ + } + } + } + + assert.Equal(t, numMessages, received, "all batched messages should be received") +} + +// --------------------------------------------------------------------------- +// Integration — unsubscribe stops delivery +// --------------------------------------------------------------------------- + +func TestIntegration_UnsubscribeStopsDelivery_Good(t *testing.T) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + server := httptest.NewServer(hub.Handler()) + defer server.Close() + + conn, _, err := websocket.DefaultDialer.Dial(wsURL(server), nil) + require.NoError(t, err) + defer conn.Close() + time.Sleep(50 * time.Millisecond) + + // Subscribe + err = conn.WriteJSON(Message{Type: TypeSubscribe, Data: "temp:feed"}) + require.NoError(t, err) + time.Sleep(50 * time.Millisecond) + + // Verify we receive messages on the channel + err = hub.SendToChannel("temp:feed", Message{Type: TypeEvent, Data: "before-unsub"}) + require.NoError(t, err) + + conn.SetReadDeadline(time.Now().Add(time.Second)) + var msg1 Message + err = conn.ReadJSON(&msg1) + require.NoError(t, err) + assert.Equal(t, "before-unsub", msg1.Data) + + // Unsubscribe + err = conn.WriteJSON(Message{Type: TypeUnsubscribe, Data: "temp:feed"}) + require.NoError(t, err) + time.Sleep(50 * time.Millisecond) + + // Send another message -- client should NOT receive it + err = hub.SendToChannel("temp:feed", Message{Type: TypeEvent, Data: "after-unsub"}) + require.NoError(t, err) + + // Try to read -- should timeout (no message delivered) + conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + var msg2 Message + err = conn.ReadJSON(&msg2) + assert.Error(t, err, "should not receive messages after unsubscribing") +} + +// --------------------------------------------------------------------------- +// Integration — broadcast reaches all clients (no channel subscription) +// --------------------------------------------------------------------------- + +func TestIntegration_BroadcastReachesAllClients_Good(t *testing.T) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + server := httptest.NewServer(hub.Handler()) + defer server.Close() + + numClients := 3 + conns := make([]*websocket.Conn, numClients) + for i := 0; i < numClients; i++ { + conn, _, err := websocket.DefaultDialer.Dial(wsURL(server), nil) + require.NoError(t, err) + defer conn.Close() + conns[i] = conn + } + + time.Sleep(100 * time.Millisecond) + assert.Equal(t, numClients, hub.ClientCount()) + + // Broadcast -- no channel subscription needed + err := hub.Broadcast(Message{Type: TypeError, Data: "global-alert"}) + require.NoError(t, err) + + for i, conn := range conns { + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + var received Message + err := conn.ReadJSON(&received) + require.NoError(t, err, "client %d should receive broadcast", i) + assert.Equal(t, TypeError, received.Type) + assert.Equal(t, "global-alert", received.Data) + } +} + +// --------------------------------------------------------------------------- +// Integration — disconnect cleans up all subscriptions +// --------------------------------------------------------------------------- + +func TestIntegration_DisconnectCleansUpEverything_Good(t *testing.T) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + server := httptest.NewServer(hub.Handler()) + defer server.Close() + + conn, _, err := websocket.DefaultDialer.Dial(wsURL(server), nil) + require.NoError(t, err) + + // Subscribe to multiple channels + err = conn.WriteJSON(Message{Type: TypeSubscribe, Data: "ch-a"}) + require.NoError(t, err) + err = conn.WriteJSON(Message{Type: TypeSubscribe, Data: "ch-b"}) + require.NoError(t, err) + time.Sleep(50 * time.Millisecond) + + assert.Equal(t, 1, hub.ClientCount()) + assert.Equal(t, 1, hub.ChannelSubscriberCount("ch-a")) + assert.Equal(t, 1, hub.ChannelSubscriberCount("ch-b")) + + // Disconnect + conn.Close() + time.Sleep(100 * time.Millisecond) + + assert.Equal(t, 0, hub.ClientCount()) + assert.Equal(t, 0, hub.ChannelSubscriberCount("ch-a")) + assert.Equal(t, 0, hub.ChannelSubscriberCount("ch-b")) + assert.Equal(t, 0, hub.ChannelCount(), "empty channels should be cleaned up") +} + +// --------------------------------------------------------------------------- +// Concurrent broadcast + subscribe via hub loop (race test) +// --------------------------------------------------------------------------- + +func TestConcurrentSubscribeAndBroadcast_Good(t *testing.T) { + hub := NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + var wg sync.WaitGroup + + for i := 0; i < 50; i++ { + wg.Add(2) + go func(id int) { + defer wg.Done() + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + subscriptions: make(map[string]bool), + } + hub.register <- client + }(i) + go func(id int) { + defer wg.Done() + _ = hub.Broadcast(Message{Type: TypeEvent, Data: id}) + }(i) + } + + wg.Wait() + time.Sleep(100 * time.Millisecond) + + assert.Equal(t, 50, hub.ClientCount()) +} +