// SPDX-Licence-Identifier: EUPL-1.2 package api_test import ( "context" "encoding/json" "net/http" "net/http/httptest" "os" "os/exec" "strconv" "strings" "testing" "time" core "dappco.re/go/core" goapi "dappco.re/go/core/api" process "dappco.re/go/core/process" processapi "dappco.re/go/core/process/pkg/api" corews "dappco.re/go/core/ws" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func init() { gin.SetMode(gin.TestMode) } func TestProcessProvider_Name_Good(t *testing.T) { p := processapi.NewProvider(nil, nil, nil) assert.Equal(t, "process", p.Name()) } func TestProcessProvider_BasePath_Good(t *testing.T) { p := processapi.NewProvider(nil, nil, nil) assert.Equal(t, "/api/process", p.BasePath()) } func TestProcessProvider_Channels_Good(t *testing.T) { p := processapi.NewProvider(nil, nil, nil) channels := p.Channels() assert.Contains(t, channels, "process.daemon.started") assert.Contains(t, channels, "process.daemon.stopped") assert.Contains(t, channels, "process.daemon.health") } func TestProcessProvider_Describe_Good(t *testing.T) { p := processapi.NewProvider(nil, nil, nil) descs := p.Describe() assert.GreaterOrEqual(t, len(descs), 5) // Verify all descriptions have required fields for _, d := range descs { assert.NotEmpty(t, d.Method) assert.NotEmpty(t, d.Path) assert.NotEmpty(t, d.Summary) assert.NotEmpty(t, d.Tags) } foundPipelineRoute := false foundSignalRoute := false for _, d := range descs { if d.Method == "POST" && d.Path == "/pipelines/run" { foundPipelineRoute = true } if d.Method == "POST" && d.Path == "/processes/:id/signal" { foundSignalRoute = true } } assert.True(t, foundPipelineRoute, "pipeline route should be described") assert.True(t, foundSignalRoute, "signal route should be described") } func TestProcessProvider_ListDaemons_Good(t *testing.T) { // Use a temp directory so the registry has no daemons dir := t.TempDir() registry := newTestRegistry(dir) p := processapi.NewProvider(registry, nil, nil) r := setupRouter(p) w := httptest.NewRecorder() req, _ := http.NewRequest("GET", "/api/process/daemons", nil) r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[[]any] err := json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) assert.True(t, resp.Success) } func TestProcessProvider_ListDaemons_BroadcastsStarted_Good(t *testing.T) { dir := t.TempDir() registry := newTestRegistry(dir) require.NoError(t, registry.Register(process.DaemonEntry{ Code: "test", Daemon: "serve", PID: os.Getpid(), })) hub := corews.NewHub() ctx, cancel := context.WithCancel(context.Background()) defer cancel() go hub.Run(ctx) p := processapi.NewProvider(registry, nil, hub) server := httptest.NewServer(hub.Handler()) defer server.Close() conn := connectWS(t, server.URL) defer conn.Close() require.Eventually(t, func() bool { return hub.ClientCount() == 1 }, time.Second, 10*time.Millisecond) r := setupRouter(p) w := httptest.NewRecorder() req, _ := http.NewRequest("GET", "/api/process/daemons", nil) r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) events := readWSEvents(t, conn, "process.daemon.started") started := events["process.daemon.started"] require.NotNil(t, started) startedData := started.Data.(map[string]any) assert.Equal(t, "test", startedData["code"]) assert.Equal(t, "serve", startedData["daemon"]) assert.Equal(t, float64(os.Getpid()), startedData["pid"]) } func TestProcessProvider_GetDaemon_Bad(t *testing.T) { dir := t.TempDir() registry := newTestRegistry(dir) p := processapi.NewProvider(registry, nil, nil) r := setupRouter(p) w := httptest.NewRecorder() req, _ := http.NewRequest("GET", "/api/process/daemons/test/nonexistent", nil) r.ServeHTTP(w, req) assert.Equal(t, http.StatusNotFound, w.Code) } func TestProcessProvider_HealthCheck_Bad(t *testing.T) { dir := t.TempDir() registry := newTestRegistry(dir) healthSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) _, _ = w.Write([]byte("upstream health check failed")) })) defer healthSrv.Close() hostPort := strings.TrimPrefix(healthSrv.URL, "http://") require.NoError(t, registry.Register(process.DaemonEntry{ Code: "test", Daemon: "broken", PID: os.Getpid(), Health: hostPort, })) p := processapi.NewProvider(registry, nil, nil) r := setupRouter(p) w := httptest.NewRecorder() req, _ := http.NewRequest("GET", "/api/process/daemons/test/broken/health", nil) r.ServeHTTP(w, req) assert.Equal(t, http.StatusServiceUnavailable, w.Code) var resp goapi.Response[map[string]any] err := json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.True(t, resp.Success) assert.Equal(t, false, resp.Data["healthy"]) assert.Equal(t, hostPort, resp.Data["address"]) assert.Equal(t, "upstream health check failed", resp.Data["reason"]) } func TestProcessProvider_RegistersAsRouteGroup_Good(t *testing.T) { p := processapi.NewProvider(nil, nil, nil) engine, err := goapi.New() require.NoError(t, err) engine.Register(p) assert.Len(t, engine.Groups(), 1) assert.Equal(t, "process", engine.Groups()[0].Name()) } func TestProcessProvider_Channels_RegisterAsStreamGroup_Good(t *testing.T) { p := processapi.NewProvider(nil, nil, nil) engine, err := goapi.New() require.NoError(t, err) engine.Register(p) // Engine.Channels() discovers StreamGroups channels := engine.Channels() assert.Contains(t, channels, "process.daemon.started") } func TestProcessProvider_RunPipeline_Good(t *testing.T) { svc := newTestProcessService(t) p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) w := httptest.NewRecorder() body := strings.NewReader(`{ "mode": "parallel", "specs": [ {"name": "first", "command": "echo", "args": ["1"]}, {"name": "second", "command": "echo", "args": ["2"]} ] }`) req, err := http.NewRequest("POST", "/api/process/pipelines/run", body) require.NoError(t, err) req.Header.Set("Content-Type", "application/json") r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[process.RunAllResult] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) assert.True(t, resp.Success) assert.Equal(t, 2, resp.Data.Passed) assert.Len(t, resp.Data.Results, 2) } func TestProcessProvider_RunPipeline_Unavailable(t *testing.T) { p := processapi.NewProvider(nil, nil, nil) r := setupRouter(p) w := httptest.NewRecorder() req, err := http.NewRequest("POST", "/api/process/pipelines/run", strings.NewReader(`{"mode":"all","specs":[]}`)) require.NoError(t, err) req.Header.Set("Content-Type", "application/json") r.ServeHTTP(w, req) assert.Equal(t, http.StatusServiceUnavailable, w.Code) } func TestProcessProvider_ListProcesses_Good(t *testing.T) { svc := newTestProcessService(t) proc, err := svc.Start(context.Background(), "echo", "hello-api") require.NoError(t, err) <-proc.Done() p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) w := httptest.NewRecorder() req, err := http.NewRequest("GET", "/api/process/processes", nil) require.NoError(t, err) r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[[]process.Info] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.True(t, resp.Success) require.Len(t, resp.Data, 1) assert.Equal(t, proc.ID, resp.Data[0].ID) assert.Equal(t, "echo", resp.Data[0].Command) } func TestProcessProvider_ListProcesses_RunningOnly_Good(t *testing.T) { svc := newTestProcessService(t) runningProc, err := svc.Start(context.Background(), "sleep", "60") require.NoError(t, err) exitedProc, err := svc.Start(context.Background(), "echo", "done") require.NoError(t, err) <-exitedProc.Done() p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) w := httptest.NewRecorder() req, err := http.NewRequest("GET", "/api/process/processes?runningOnly=true", nil) require.NoError(t, err) r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[[]process.Info] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.True(t, resp.Success) require.Len(t, resp.Data, 1) assert.Equal(t, runningProc.ID, resp.Data[0].ID) assert.Equal(t, process.StatusRunning, resp.Data[0].Status) require.NoError(t, svc.Kill(runningProc.ID)) <-runningProc.Done() } func TestProcessProvider_StartProcess_Good(t *testing.T) { svc := newTestProcessService(t) p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) body := strings.NewReader(`{ "command": "sleep", "args": ["60"], "detach": true, "killGroup": true }`) req, err := http.NewRequest("POST", "/api/process/processes", body) require.NoError(t, err) req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[process.Info] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.True(t, resp.Success) assert.Equal(t, "sleep", resp.Data.Command) assert.Equal(t, process.StatusRunning, resp.Data.Status) assert.True(t, resp.Data.Running) assert.NotEmpty(t, resp.Data.ID) managed, err := svc.Get(resp.Data.ID) require.NoError(t, err) require.NoError(t, svc.Kill(managed.ID)) select { case <-managed.Done(): case <-time.After(5 * time.Second): t.Fatal("process should have been killed after start test") } } func TestProcessProvider_RunProcess_Good(t *testing.T) { svc := newTestProcessService(t) p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) body := strings.NewReader(`{ "command": "echo", "args": ["run-check"] }`) req, err := http.NewRequest("POST", "/api/process/processes/run", body) require.NoError(t, err) req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[string] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.True(t, resp.Success) assert.Contains(t, resp.Data, "run-check") } func TestProcessProvider_GetProcess_Good(t *testing.T) { svc := newTestProcessService(t) proc, err := svc.Start(context.Background(), "echo", "single") require.NoError(t, err) <-proc.Done() p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) w := httptest.NewRecorder() req, err := http.NewRequest("GET", "/api/process/processes/"+proc.ID, nil) require.NoError(t, err) r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[process.Info] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.True(t, resp.Success) assert.Equal(t, proc.ID, resp.Data.ID) assert.Equal(t, "echo", resp.Data.Command) } func TestProcessProvider_GetProcessOutput_Good(t *testing.T) { svc := newTestProcessService(t) proc, err := svc.Start(context.Background(), "echo", "output-check") require.NoError(t, err) <-proc.Done() p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) w := httptest.NewRecorder() req, err := http.NewRequest("GET", "/api/process/processes/"+proc.ID+"/output", nil) require.NoError(t, err) r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[string] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.True(t, resp.Success) assert.Contains(t, resp.Data, "output-check") } func TestProcessProvider_WaitProcess_Good(t *testing.T) { svc := newTestProcessService(t) proc, err := svc.Start(context.Background(), "echo", "wait-check") require.NoError(t, err) p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) w := httptest.NewRecorder() req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/wait", nil) require.NoError(t, err) r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[process.Info] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.True(t, resp.Success) assert.Equal(t, proc.ID, resp.Data.ID) assert.Equal(t, process.StatusExited, resp.Data.Status) assert.Equal(t, 0, resp.Data.ExitCode) } func TestProcessProvider_WaitProcess_NonZeroExit_Good(t *testing.T) { svc := newTestProcessService(t) proc, err := svc.Start(context.Background(), "sh", "-c", "exit 7") require.NoError(t, err) p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) w := httptest.NewRecorder() req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/wait", nil) require.NoError(t, err) r.ServeHTTP(w, req) assert.Equal(t, http.StatusConflict, w.Code) var resp goapi.Response[any] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.False(t, resp.Success) require.NotNil(t, resp.Error) assert.Equal(t, "wait_failed", resp.Error.Code) assert.Contains(t, resp.Error.Message, "process exited with code 7") details, ok := resp.Error.Details.(map[string]any) require.True(t, ok) assert.Equal(t, "exited", details["status"]) assert.Equal(t, float64(7), details["exitCode"]) assert.Equal(t, proc.ID, details["id"]) } func TestProcessProvider_InputAndCloseStdin_Good(t *testing.T) { svc := newTestProcessService(t) proc, err := svc.Start(context.Background(), "cat") require.NoError(t, err) p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) inputReq := strings.NewReader("{\"input\":\"hello-api\\n\"}") inputHTTPReq, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/input", inputReq) require.NoError(t, err) inputHTTPReq.Header.Set("Content-Type", "application/json") inputResp := httptest.NewRecorder() r.ServeHTTP(inputResp, inputHTTPReq) assert.Equal(t, http.StatusOK, inputResp.Code) closeReq, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/close-stdin", nil) require.NoError(t, err) closeResp := httptest.NewRecorder() r.ServeHTTP(closeResp, closeReq) assert.Equal(t, http.StatusOK, closeResp.Code) select { case <-proc.Done(): case <-time.After(5 * time.Second): t.Fatal("process should have exited after stdin was closed") } assert.Contains(t, proc.Output(), "hello-api") } func TestProcessProvider_KillProcess_Good(t *testing.T) { svc := newTestProcessService(t) proc, err := svc.Start(context.Background(), "sleep", "60") require.NoError(t, err) p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) w := httptest.NewRecorder() req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/kill", nil) require.NoError(t, err) r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[map[string]any] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.True(t, resp.Success) assert.Equal(t, true, resp.Data["killed"]) select { case <-proc.Done(): case <-time.After(5 * time.Second): t.Fatal("process should have been killed") } assert.Equal(t, process.StatusKilled, proc.Status) } func TestProcessProvider_KillProcess_ByPID_Good(t *testing.T) { svc := newTestProcessService(t) p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) cmd := exec.Command("sleep", "60") require.NoError(t, cmd.Start()) waitCh := make(chan error, 1) go func() { waitCh <- cmd.Wait() }() t.Cleanup(func() { if cmd.ProcessState == nil && cmd.Process != nil { _ = cmd.Process.Kill() } select { case <-waitCh: case <-time.After(2 * time.Second): } }) w := httptest.NewRecorder() req, err := http.NewRequest("POST", "/api/process/processes/"+strconv.Itoa(cmd.Process.Pid)+"/kill", nil) require.NoError(t, err) r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[map[string]any] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.True(t, resp.Success) assert.Equal(t, true, resp.Data["killed"]) select { case err := <-waitCh: require.Error(t, err) case <-time.After(5 * time.Second): t.Fatal("unmanaged process should have been killed by PID") } } func TestProcessProvider_SignalProcess_Good(t *testing.T) { svc := newTestProcessService(t) proc, err := svc.Start(context.Background(), "sleep", "60") require.NoError(t, err) p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) w := httptest.NewRecorder() req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/signal", strings.NewReader(`{"signal":"SIGTERM"}`)) require.NoError(t, err) req.Header.Set("Content-Type", "application/json") r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[map[string]any] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.True(t, resp.Success) assert.Equal(t, true, resp.Data["signalled"]) select { case <-proc.Done(): case <-time.After(5 * time.Second): t.Fatal("process should have been signalled") } assert.Equal(t, process.StatusKilled, proc.Status) } func TestProcessProvider_SignalProcess_ByPID_Good(t *testing.T) { svc := newTestProcessService(t) p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) cmd := exec.Command("sleep", "60") require.NoError(t, cmd.Start()) waitCh := make(chan error, 1) go func() { waitCh <- cmd.Wait() }() t.Cleanup(func() { if cmd.ProcessState == nil && cmd.Process != nil { _ = cmd.Process.Kill() } select { case <-waitCh: case <-time.After(2 * time.Second): } }) w := httptest.NewRecorder() req, err := http.NewRequest("POST", "/api/process/processes/"+strconv.Itoa(cmd.Process.Pid)+"/signal", strings.NewReader(`{"signal":"SIGTERM"}`)) require.NoError(t, err) req.Header.Set("Content-Type", "application/json") r.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) var resp goapi.Response[map[string]any] err = json.Unmarshal(w.Body.Bytes(), &resp) require.NoError(t, err) require.True(t, resp.Success) assert.Equal(t, true, resp.Data["signalled"]) select { case err := <-waitCh: require.Error(t, err) case <-time.After(5 * time.Second): t.Fatal("unmanaged process should have been signalled by PID") } } func TestProcessProvider_SignalProcess_InvalidSignal_Bad(t *testing.T) { svc := newTestProcessService(t) proc, err := svc.Start(context.Background(), "sleep", "60") require.NoError(t, err) p := processapi.NewProvider(nil, svc, nil) r := setupRouter(p) w := httptest.NewRecorder() req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/signal", strings.NewReader(`{"signal":"NOPE"}`)) require.NoError(t, err) req.Header.Set("Content-Type", "application/json") r.ServeHTTP(w, req) assert.Equal(t, http.StatusBadRequest, w.Code) assert.True(t, proc.IsRunning()) require.NoError(t, svc.Kill(proc.ID)) <-proc.Done() } func TestProcessProvider_BroadcastsProcessEvents_Good(t *testing.T) { svc := newTestProcessService(t) hub := corews.NewHub() ctx, cancel := context.WithCancel(context.Background()) defer cancel() go hub.Run(ctx) _ = processapi.NewProvider(nil, svc, hub) server := httptest.NewServer(hub.Handler()) defer server.Close() conn := connectWS(t, server.URL) defer conn.Close() require.Eventually(t, func() bool { return hub.ClientCount() == 1 }, time.Second, 10*time.Millisecond) proc, err := svc.Start(context.Background(), "sh", "-c", "echo live-event") require.NoError(t, err) <-proc.Done() events := readWSEvents(t, conn, "process.started", "process.output", "process.exited") started := events["process.started"] require.NotNil(t, started) startedData := started.Data.(map[string]any) assert.Equal(t, proc.ID, startedData["id"]) assert.Equal(t, "sh", startedData["command"]) assert.Equal(t, float64(proc.Info().PID), startedData["pid"]) output := events["process.output"] require.NotNil(t, output) outputData := output.Data.(map[string]any) assert.Equal(t, proc.ID, outputData["id"]) assert.Equal(t, "stdout", outputData["stream"]) assert.Contains(t, outputData["line"], "live-event") exited := events["process.exited"] require.NotNil(t, exited) exitedData := exited.Data.(map[string]any) assert.Equal(t, proc.ID, exitedData["id"]) assert.Equal(t, float64(0), exitedData["exitCode"]) } func TestProcessProvider_BroadcastsKilledEvents_Good(t *testing.T) { svc := newTestProcessService(t) hub := corews.NewHub() ctx, cancel := context.WithCancel(context.Background()) defer cancel() go hub.Run(ctx) _ = processapi.NewProvider(nil, svc, hub) server := httptest.NewServer(hub.Handler()) defer server.Close() conn := connectWS(t, server.URL) defer conn.Close() require.Eventually(t, func() bool { return hub.ClientCount() == 1 }, time.Second, 10*time.Millisecond) proc, err := svc.Start(context.Background(), "sleep", "60") require.NoError(t, err) require.NoError(t, svc.Kill(proc.ID)) select { case <-proc.Done(): case <-time.After(2 * time.Second): t.Fatal("process should have been killed") } events := readWSEvents(t, conn, "process.killed", "process.exited") killed := events["process.killed"] require.NotNil(t, killed) killedData := killed.Data.(map[string]any) assert.Equal(t, proc.ID, killedData["id"]) assert.Equal(t, "SIGKILL", killedData["signal"]) assert.Equal(t, float64(-1), killedData["exitCode"]) exited := events["process.exited"] require.NotNil(t, exited) exitedData := exited.Data.(map[string]any) assert.Equal(t, proc.ID, exitedData["id"]) assert.Equal(t, float64(-1), exitedData["exitCode"]) } func TestProcessProvider_ProcessRoutes_Unavailable(t *testing.T) { p := processapi.NewProvider(nil, nil, nil) r := setupRouter(p) cases := []string{ "/api/process/processes", "/api/process/processes/anything", "/api/process/processes/anything/output", "/api/process/processes/anything/wait", "/api/process/processes/anything/input", "/api/process/processes/anything/close-stdin", "/api/process/processes/anything/kill", } for _, path := range cases { w := httptest.NewRecorder() method := "GET" switch { case strings.HasSuffix(path, "/kill"), strings.HasSuffix(path, "/wait"), strings.HasSuffix(path, "/input"), strings.HasSuffix(path, "/close-stdin"): method = "POST" } req, err := http.NewRequest(method, path, nil) require.NoError(t, err) r.ServeHTTP(w, req) assert.Equal(t, http.StatusServiceUnavailable, w.Code) } } // -- Test helpers ------------------------------------------------------------- func setupRouter(p *processapi.ProcessProvider) *gin.Engine { r := gin.New() rg := r.Group(p.BasePath()) p.RegisterRoutes(rg) return r } // newTestRegistry creates a process.Registry backed by a test directory. func newTestRegistry(dir string) *process.Registry { return process.NewRegistry(dir) } func newTestProcessService(t *testing.T) *process.Service { t.Helper() c := core.New() factory := process.NewService(process.Options{}) raw, err := factory(c) require.NoError(t, err) return raw.(*process.Service) } func connectWS(t *testing.T, serverURL string) *websocket.Conn { t.Helper() wsURL := "ws" + strings.TrimPrefix(serverURL, "http") conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) require.NoError(t, err) return conn } func readWSEvents(t *testing.T, conn *websocket.Conn, channels ...string) map[string]corews.Message { t.Helper() want := make(map[string]struct{}, len(channels)) for _, channel := range channels { want[channel] = struct{}{} } events := make(map[string]corews.Message, len(channels)) deadline := time.Now().Add(3 * time.Second) for len(events) < len(channels) && time.Now().Before(deadline) { require.NoError(t, conn.SetReadDeadline(time.Now().Add(500*time.Millisecond))) _, payload, err := conn.ReadMessage() require.NoError(t, err) for _, line := range strings.Split(strings.TrimSpace(string(payload)), "\n") { if strings.TrimSpace(line) == "" { continue } var msg corews.Message require.NoError(t, json.Unmarshal([]byte(line), &msg)) if _, ok := want[msg.Channel]; ok { events[msg.Channel] = msg } } } require.Len(t, events, len(channels)) return events }