feat(api): drain SSE clients on shutdown
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
837a910148
commit
65ae0fca6d
2 changed files with 65 additions and 0 deletions
6
api.go
6
api.go
|
|
@ -123,6 +123,12 @@ func (e *Engine) Serve(ctx context.Context) error {
|
|||
// Block until context is cancelled.
|
||||
<-ctx.Done()
|
||||
|
||||
// Signal SSE clients first so their handlers can exit cleanly before the
|
||||
// HTTP server begins its own shutdown sequence.
|
||||
if e.sseBroker != nil {
|
||||
e.sseBroker.Drain()
|
||||
}
|
||||
|
||||
// Graceful shutdown with timeout.
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
|
||||
defer cancel()
|
||||
|
|
|
|||
59
sse_test.go
59
sse_test.go
|
|
@ -4,6 +4,8 @@ package api_test
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
|
|
@ -339,6 +341,63 @@ func TestNoSSEBroker_Good(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestWithSSE_Good_EngineShutdownDrainsClients(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
broker := api.NewSSEBroker()
|
||||
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to allocate listener: %v", err)
|
||||
}
|
||||
addr := ln.Addr().String()
|
||||
_ = ln.Close()
|
||||
|
||||
e, err := api.New(api.WithAddr(addr), api.WithSSE(broker))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- e.Serve(ctx)
|
||||
}()
|
||||
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond)
|
||||
if err == nil {
|
||||
_ = conn.Close()
|
||||
break
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
resp, err := http.Get("http://" + addr + "/events")
|
||||
if err != nil {
|
||||
t.Fatalf("request failed: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
waitForClients(t, broker, 1)
|
||||
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case serveErr := <-errCh:
|
||||
if serveErr != nil {
|
||||
t.Fatalf("Serve returned unexpected error: %v", serveErr)
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("Serve did not return within 5 seconds after context cancellation")
|
||||
}
|
||||
|
||||
if got := broker.ClientCount(); got != 0 {
|
||||
t.Fatalf("expected SSE broker to drain all clients, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// ── Helpers ──────────────────────────────────────────────────────────────
|
||||
|
||||
// waitForClients polls the broker until the expected number of clients
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue