From 65ae0fca6d77a8c68a5a5a1ecd76ad05d324f800 Mon Sep 17 00:00:00 2001 From: Virgil Date: Wed, 1 Apr 2026 09:45:30 +0000 Subject: [PATCH] feat(api): drain SSE clients on shutdown Co-Authored-By: Virgil --- api.go | 6 ++++++ sse_test.go | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/api.go b/api.go index d391726..6c9fc05 100644 --- a/api.go +++ b/api.go @@ -123,6 +123,12 @@ func (e *Engine) Serve(ctx context.Context) error { // Block until context is cancelled. <-ctx.Done() + // Signal SSE clients first so their handlers can exit cleanly before the + // HTTP server begins its own shutdown sequence. + if e.sseBroker != nil { + e.sseBroker.Drain() + } + // Graceful shutdown with timeout. shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() diff --git a/sse_test.go b/sse_test.go index a2769e3..741603f 100644 --- a/sse_test.go +++ b/sse_test.go @@ -4,6 +4,8 @@ package api_test import ( "bufio" + "context" + "net" "net/http" "net/http/httptest" "strings" @@ -339,6 +341,63 @@ func TestNoSSEBroker_Good(t *testing.T) { } } +func TestWithSSE_Good_EngineShutdownDrainsClients(t *testing.T) { + gin.SetMode(gin.TestMode) + + broker := api.NewSSEBroker() + + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to allocate listener: %v", err) + } + addr := ln.Addr().String() + _ = ln.Close() + + e, err := api.New(api.WithAddr(addr), api.WithSSE(broker)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error, 1) + go func() { + errCh <- e.Serve(ctx) + }() + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond) + if err == nil { + _ = conn.Close() + break + } + time.Sleep(50 * time.Millisecond) + } + + resp, err := http.Get("http://" + addr + "/events") + if err != nil { + t.Fatalf("request failed: %v", err) + } + defer resp.Body.Close() + + waitForClients(t, broker, 1) + + cancel() + + select { + case serveErr := <-errCh: + if serveErr != nil { + t.Fatalf("Serve returned unexpected error: %v", serveErr) + } + case <-time.After(5 * time.Second): + t.Fatal("Serve did not return within 5 seconds after context cancellation") + } + + if got := broker.ClientCount(); got != 0 { + t.Fatalf("expected SSE broker to drain all clients, got %d", got) + } +} + // ── Helpers ────────────────────────────────────────────────────────────── // waitForClients polls the broker until the expected number of clients