feat(api): add configurable SSE path

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-02 02:01:43 +00:00
parent b4d414b702
commit 39bf094b51
5 changed files with 123 additions and 5 deletions

5
api.go
View file

@ -54,6 +54,7 @@ type Engine struct {
swaggerExternalDocsURL string
pprofEnabled bool
expvarEnabled bool
ssePath string
graphql *graphqlConfig
}
@ -226,7 +227,7 @@ func (e *Engine) build() *gin.Engine {
// Mount SSE endpoint if configured.
if e.sseBroker != nil {
r.GET("/events", e.sseBroker.Handler())
r.GET(resolveSSEPath(e.ssePath), e.sseBroker.Handler())
}
// Mount GraphQL endpoint if configured.
@ -238,7 +239,7 @@ func (e *Engine) build() *gin.Engine {
if e.swaggerEnabled {
ssePath := ""
if e.sseBroker != nil {
ssePath = "/events"
ssePath = resolveSSEPath(e.ssePath)
}
registerSwagger(
r,

View file

@ -438,9 +438,10 @@ func WithHTTPSign(secrets httpsign.Secrets, opts ...httpsign.Option) Option {
}
}
// WithSSE registers a Server-Sent Events broker at GET /events.
// Clients connect to the endpoint and receive a streaming text/event-stream
// response. The broker manages client connections and broadcasts events
// WithSSE registers a Server-Sent Events broker at the configured path.
// By default the endpoint is mounted at GET /events; use WithSSEPath to
// customise the route. Clients receive a streaming text/event-stream
// response and the broker manages client connections and broadcasts events
// published via its Publish method.
//
// Example:
@ -453,6 +454,14 @@ func WithSSE(broker *SSEBroker) Option {
}
}
// WithSSEPath sets a custom URL path for the SSE endpoint.
// The default path is "/events".
func WithSSEPath(path string) Option {
return func(e *Engine) {
e.ssePath = normaliseSSEPath(path)
}
}
// WithLocation adds reverse proxy header detection middleware via
// gin-contrib/location. It inspects X-Forwarded-Proto and X-Forwarded-Host
// headers to determine the original scheme and host when the server runs

29
sse.go
View file

@ -6,11 +6,15 @@ import (
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"github.com/gin-gonic/gin"
)
// defaultSSEPath is the URL path where the SSE endpoint is mounted.
const defaultSSEPath = "/events"
// SSEBroker manages Server-Sent Events connections and broadcasts events
// to subscribed clients. Clients connect via a GET endpoint and receive
// a streaming text/event-stream response. Each client may optionally
@ -52,6 +56,31 @@ func NewSSEBroker() *SSEBroker {
}
}
// normaliseSSEPath coerces custom SSE paths into a stable form.
// The path always begins with a single slash and never ends with one.
func normaliseSSEPath(path string) string {
path = strings.TrimSpace(path)
if path == "" {
return defaultSSEPath
}
path = "/" + strings.Trim(path, "/")
if path == "/" {
return defaultSSEPath
}
return path
}
// resolveSSEPath returns the configured SSE path or the default path when
// no override has been provided.
func resolveSSEPath(path string) string {
if strings.TrimSpace(path) == "" {
return defaultSSEPath
}
return normaliseSSEPath(path)
}
// Publish sends an event to all clients subscribed to the given channel.
// Clients subscribed to an empty channel (no ?channel= param) receive
// events on every channel. The data value is JSON-encoded before sending.

View file

@ -48,6 +48,44 @@ func TestWithSSE_Good_EndpointExists(t *testing.T) {
}
}
func TestWithSSE_Good_CustomPath(t *testing.T) {
gin.SetMode(gin.TestMode)
broker := api.NewSSEBroker()
e, err := api.New(api.WithSSE(broker), api.WithSSEPath("/stream"))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
srv := httptest.NewServer(e.Handler())
defer srv.Close()
resp, err := http.Get(srv.URL + "/stream")
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("expected 200, got %d", resp.StatusCode)
}
ct := resp.Header.Get("Content-Type")
if !strings.HasPrefix(ct, "text/event-stream") {
t.Fatalf("expected Content-Type starting with text/event-stream, got %q", ct)
}
notFoundResp, err := http.Get(srv.URL + "/events")
if err != nil {
t.Fatalf("request to default SSE path failed: %v", err)
}
defer notFoundResp.Body.Close()
if notFoundResp.StatusCode != http.StatusNotFound {
t.Fatalf("expected 404 at default /events when custom path is configured, got %d", notFoundResp.StatusCode)
}
}
func TestWithSSE_Good_ReceivesPublishedEvent(t *testing.T) {
gin.SetMode(gin.TestMode)

View file

@ -240,6 +240,47 @@ func TestSwagger_Good_IncludesSSEEndpoint(t *testing.T) {
}
}
func TestSwagger_Good_UsesCustomSSEPath(t *testing.T) {
gin.SetMode(gin.TestMode)
broker := api.NewSSEBroker()
e, err := api.New(
api.WithSwagger("SSE API", "SSE test", "1.0.0"),
api.WithSSE(broker),
api.WithSSEPath("/stream"),
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
srv := httptest.NewServer(e.Handler())
defer srv.Close()
resp, err := http.Get(srv.URL + "/swagger/doc.json")
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("failed to read body: %v", err)
}
var doc map[string]any
if err := json.Unmarshal(body, &doc); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
paths := doc["paths"].(map[string]any)
if _, ok := paths["/stream"]; !ok {
t.Fatal("expected custom SSE path /stream in swagger doc")
}
if _, ok := paths["/events"]; ok {
t.Fatal("did not expect default /events path when custom SSE path is configured")
}
}
func TestSwagger_Good_CachesSpec(t *testing.T) {
spec := &swaggerSpecHelper{
title: "Cache Test",