go-process/pkg/api/provider_test.go

873 lines
24 KiB
Go

// SPDX-Licence-Identifier: EUPL-1.2
package api_test
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"strconv"
"strings"
"testing"
"time"
core "dappco.re/go/core"
goapi "dappco.re/go/core/api"
process "dappco.re/go/core/process"
processapi "dappco.re/go/core/process/pkg/api"
corews "dappco.re/go/core/ws"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func init() {
gin.SetMode(gin.TestMode)
}
func TestProcessProvider_Name_Good(t *testing.T) {
p := processapi.NewProvider(nil, nil, nil)
assert.Equal(t, "process", p.Name())
}
func TestProcessProvider_BasePath_Good(t *testing.T) {
p := processapi.NewProvider(nil, nil, nil)
assert.Equal(t, "/api/process", p.BasePath())
}
func TestProcessProvider_Channels_Good(t *testing.T) {
p := processapi.NewProvider(nil, nil, nil)
channels := p.Channels()
assert.Contains(t, channels, "process.daemon.started")
assert.Contains(t, channels, "process.daemon.stopped")
assert.Contains(t, channels, "process.daemon.health")
}
func TestProcessProvider_Describe_Good(t *testing.T) {
p := processapi.NewProvider(nil, nil, nil)
descs := p.Describe()
assert.GreaterOrEqual(t, len(descs), 5)
// Verify all descriptions have required fields
for _, d := range descs {
assert.NotEmpty(t, d.Method)
assert.NotEmpty(t, d.Path)
assert.NotEmpty(t, d.Summary)
assert.NotEmpty(t, d.Tags)
}
foundPipelineRoute := false
foundSignalRoute := false
for _, d := range descs {
if d.Method == "POST" && d.Path == "/pipelines/run" {
foundPipelineRoute = true
}
if d.Method == "POST" && d.Path == "/processes/:id/signal" {
foundSignalRoute = true
}
}
assert.True(t, foundPipelineRoute, "pipeline route should be described")
assert.True(t, foundSignalRoute, "signal route should be described")
}
func TestProcessProvider_ListDaemons_Good(t *testing.T) {
// Use a temp directory so the registry has no daemons
dir := t.TempDir()
registry := newTestRegistry(dir)
p := processapi.NewProvider(registry, nil, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/api/process/daemons", nil)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[[]any]
err := json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
assert.True(t, resp.Success)
}
func TestProcessProvider_ListDaemons_BroadcastsStarted_Good(t *testing.T) {
dir := t.TempDir()
registry := newTestRegistry(dir)
require.NoError(t, registry.Register(process.DaemonEntry{
Code: "test",
Daemon: "serve",
PID: os.Getpid(),
}))
hub := corews.NewHub()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go hub.Run(ctx)
p := processapi.NewProvider(registry, nil, hub)
server := httptest.NewServer(hub.Handler())
defer server.Close()
conn := connectWS(t, server.URL)
defer conn.Close()
require.Eventually(t, func() bool {
return hub.ClientCount() == 1
}, time.Second, 10*time.Millisecond)
r := setupRouter(p)
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/api/process/daemons", nil)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
events := readWSEvents(t, conn, "process.daemon.started")
started := events["process.daemon.started"]
require.NotNil(t, started)
startedData := started.Data.(map[string]any)
assert.Equal(t, "test", startedData["code"])
assert.Equal(t, "serve", startedData["daemon"])
assert.Equal(t, float64(os.Getpid()), startedData["pid"])
}
func TestProcessProvider_GetDaemon_Bad(t *testing.T) {
dir := t.TempDir()
registry := newTestRegistry(dir)
p := processapi.NewProvider(registry, nil, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/api/process/daemons/test/nonexistent", nil)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusNotFound, w.Code)
}
func TestProcessProvider_HealthCheck_Bad(t *testing.T) {
dir := t.TempDir()
registry := newTestRegistry(dir)
healthSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte("upstream health check failed"))
}))
defer healthSrv.Close()
hostPort := strings.TrimPrefix(healthSrv.URL, "http://")
require.NoError(t, registry.Register(process.DaemonEntry{
Code: "test",
Daemon: "broken",
PID: os.Getpid(),
Health: hostPort,
}))
p := processapi.NewProvider(registry, nil, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/api/process/daemons/test/broken/health", nil)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusServiceUnavailable, w.Code)
var resp goapi.Response[map[string]any]
err := json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.True(t, resp.Success)
assert.Equal(t, false, resp.Data["healthy"])
assert.Equal(t, hostPort, resp.Data["address"])
assert.Equal(t, "upstream health check failed", resp.Data["reason"])
}
func TestProcessProvider_RegistersAsRouteGroup_Good(t *testing.T) {
p := processapi.NewProvider(nil, nil, nil)
engine, err := goapi.New()
require.NoError(t, err)
engine.Register(p)
assert.Len(t, engine.Groups(), 1)
assert.Equal(t, "process", engine.Groups()[0].Name())
}
func TestProcessProvider_Channels_RegisterAsStreamGroup_Good(t *testing.T) {
p := processapi.NewProvider(nil, nil, nil)
engine, err := goapi.New()
require.NoError(t, err)
engine.Register(p)
// Engine.Channels() discovers StreamGroups
channels := engine.Channels()
assert.Contains(t, channels, "process.daemon.started")
}
func TestProcessProvider_RunPipeline_Good(t *testing.T) {
svc := newTestProcessService(t)
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
body := strings.NewReader(`{
"mode": "parallel",
"specs": [
{"name": "first", "command": "echo", "args": ["1"]},
{"name": "second", "command": "echo", "args": ["2"]}
]
}`)
req, err := http.NewRequest("POST", "/api/process/pipelines/run", body)
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[process.RunAllResult]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
assert.True(t, resp.Success)
assert.Equal(t, 2, resp.Data.Passed)
assert.Len(t, resp.Data.Results, 2)
}
func TestProcessProvider_RunPipeline_Unavailable(t *testing.T) {
p := processapi.NewProvider(nil, nil, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, err := http.NewRequest("POST", "/api/process/pipelines/run", strings.NewReader(`{"mode":"all","specs":[]}`))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusServiceUnavailable, w.Code)
}
func TestProcessProvider_ListProcesses_Good(t *testing.T) {
svc := newTestProcessService(t)
proc, err := svc.Start(context.Background(), "echo", "hello-api")
require.NoError(t, err)
<-proc.Done()
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, err := http.NewRequest("GET", "/api/process/processes", nil)
require.NoError(t, err)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[[]process.Info]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.True(t, resp.Success)
require.Len(t, resp.Data, 1)
assert.Equal(t, proc.ID, resp.Data[0].ID)
assert.Equal(t, "echo", resp.Data[0].Command)
}
func TestProcessProvider_ListProcesses_RunningOnly_Good(t *testing.T) {
svc := newTestProcessService(t)
runningProc, err := svc.Start(context.Background(), "sleep", "60")
require.NoError(t, err)
exitedProc, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
<-exitedProc.Done()
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, err := http.NewRequest("GET", "/api/process/processes?runningOnly=true", nil)
require.NoError(t, err)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[[]process.Info]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.True(t, resp.Success)
require.Len(t, resp.Data, 1)
assert.Equal(t, runningProc.ID, resp.Data[0].ID)
assert.Equal(t, process.StatusRunning, resp.Data[0].Status)
require.NoError(t, svc.Kill(runningProc.ID))
<-runningProc.Done()
}
func TestProcessProvider_StartProcess_Good(t *testing.T) {
svc := newTestProcessService(t)
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
body := strings.NewReader(`{
"command": "sleep",
"args": ["60"],
"detach": true,
"killGroup": true
}`)
req, err := http.NewRequest("POST", "/api/process/processes", body)
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[process.Info]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.True(t, resp.Success)
assert.Equal(t, "sleep", resp.Data.Command)
assert.Equal(t, process.StatusRunning, resp.Data.Status)
assert.True(t, resp.Data.Running)
assert.NotEmpty(t, resp.Data.ID)
managed, err := svc.Get(resp.Data.ID)
require.NoError(t, err)
require.NoError(t, svc.Kill(managed.ID))
select {
case <-managed.Done():
case <-time.After(5 * time.Second):
t.Fatal("process should have been killed after start test")
}
}
func TestProcessProvider_RunProcess_Good(t *testing.T) {
svc := newTestProcessService(t)
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
body := strings.NewReader(`{
"command": "echo",
"args": ["run-check"]
}`)
req, err := http.NewRequest("POST", "/api/process/processes/run", body)
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[string]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.True(t, resp.Success)
assert.Contains(t, resp.Data, "run-check")
}
func TestProcessProvider_GetProcess_Good(t *testing.T) {
svc := newTestProcessService(t)
proc, err := svc.Start(context.Background(), "echo", "single")
require.NoError(t, err)
<-proc.Done()
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, err := http.NewRequest("GET", "/api/process/processes/"+proc.ID, nil)
require.NoError(t, err)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[process.Info]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.True(t, resp.Success)
assert.Equal(t, proc.ID, resp.Data.ID)
assert.Equal(t, "echo", resp.Data.Command)
}
func TestProcessProvider_GetProcessOutput_Good(t *testing.T) {
svc := newTestProcessService(t)
proc, err := svc.Start(context.Background(), "echo", "output-check")
require.NoError(t, err)
<-proc.Done()
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, err := http.NewRequest("GET", "/api/process/processes/"+proc.ID+"/output", nil)
require.NoError(t, err)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[string]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.True(t, resp.Success)
assert.Contains(t, resp.Data, "output-check")
}
func TestProcessProvider_WaitProcess_Good(t *testing.T) {
svc := newTestProcessService(t)
proc, err := svc.Start(context.Background(), "echo", "wait-check")
require.NoError(t, err)
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/wait", nil)
require.NoError(t, err)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[process.Info]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.True(t, resp.Success)
assert.Equal(t, proc.ID, resp.Data.ID)
assert.Equal(t, process.StatusExited, resp.Data.Status)
assert.Equal(t, 0, resp.Data.ExitCode)
}
func TestProcessProvider_WaitProcess_NonZeroExit_Good(t *testing.T) {
svc := newTestProcessService(t)
proc, err := svc.Start(context.Background(), "sh", "-c", "exit 7")
require.NoError(t, err)
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/wait", nil)
require.NoError(t, err)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusConflict, w.Code)
var resp goapi.Response[any]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.False(t, resp.Success)
require.NotNil(t, resp.Error)
assert.Equal(t, "wait_failed", resp.Error.Code)
assert.Contains(t, resp.Error.Message, "process exited with code 7")
details, ok := resp.Error.Details.(map[string]any)
require.True(t, ok)
assert.Equal(t, "exited", details["status"])
assert.Equal(t, float64(7), details["exitCode"])
assert.Equal(t, proc.ID, details["id"])
}
func TestProcessProvider_InputAndCloseStdin_Good(t *testing.T) {
svc := newTestProcessService(t)
proc, err := svc.Start(context.Background(), "cat")
require.NoError(t, err)
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
inputReq := strings.NewReader("{\"input\":\"hello-api\\n\"}")
inputHTTPReq, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/input", inputReq)
require.NoError(t, err)
inputHTTPReq.Header.Set("Content-Type", "application/json")
inputResp := httptest.NewRecorder()
r.ServeHTTP(inputResp, inputHTTPReq)
assert.Equal(t, http.StatusOK, inputResp.Code)
closeReq, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/close-stdin", nil)
require.NoError(t, err)
closeResp := httptest.NewRecorder()
r.ServeHTTP(closeResp, closeReq)
assert.Equal(t, http.StatusOK, closeResp.Code)
select {
case <-proc.Done():
case <-time.After(5 * time.Second):
t.Fatal("process should have exited after stdin was closed")
}
assert.Contains(t, proc.Output(), "hello-api")
}
func TestProcessProvider_KillProcess_Good(t *testing.T) {
svc := newTestProcessService(t)
proc, err := svc.Start(context.Background(), "sleep", "60")
require.NoError(t, err)
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/kill", nil)
require.NoError(t, err)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[map[string]any]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.True(t, resp.Success)
assert.Equal(t, true, resp.Data["killed"])
select {
case <-proc.Done():
case <-time.After(5 * time.Second):
t.Fatal("process should have been killed")
}
assert.Equal(t, process.StatusKilled, proc.Status)
}
func TestProcessProvider_KillProcess_ByPID_Good(t *testing.T) {
svc := newTestProcessService(t)
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
cmd := exec.Command("sleep", "60")
require.NoError(t, cmd.Start())
waitCh := make(chan error, 1)
go func() {
waitCh <- cmd.Wait()
}()
t.Cleanup(func() {
if cmd.ProcessState == nil && cmd.Process != nil {
_ = cmd.Process.Kill()
}
select {
case <-waitCh:
case <-time.After(2 * time.Second):
}
})
w := httptest.NewRecorder()
req, err := http.NewRequest("POST", "/api/process/processes/"+strconv.Itoa(cmd.Process.Pid)+"/kill", nil)
require.NoError(t, err)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[map[string]any]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.True(t, resp.Success)
assert.Equal(t, true, resp.Data["killed"])
select {
case err := <-waitCh:
require.Error(t, err)
case <-time.After(5 * time.Second):
t.Fatal("unmanaged process should have been killed by PID")
}
}
func TestProcessProvider_SignalProcess_Good(t *testing.T) {
svc := newTestProcessService(t)
proc, err := svc.Start(context.Background(), "sleep", "60")
require.NoError(t, err)
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/signal", strings.NewReader(`{"signal":"SIGTERM"}`))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[map[string]any]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.True(t, resp.Success)
assert.Equal(t, true, resp.Data["signalled"])
select {
case <-proc.Done():
case <-time.After(5 * time.Second):
t.Fatal("process should have been signalled")
}
assert.Equal(t, process.StatusKilled, proc.Status)
}
func TestProcessProvider_SignalProcess_ByPID_Good(t *testing.T) {
svc := newTestProcessService(t)
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
cmd := exec.Command("sleep", "60")
require.NoError(t, cmd.Start())
waitCh := make(chan error, 1)
go func() {
waitCh <- cmd.Wait()
}()
t.Cleanup(func() {
if cmd.ProcessState == nil && cmd.Process != nil {
_ = cmd.Process.Kill()
}
select {
case <-waitCh:
case <-time.After(2 * time.Second):
}
})
w := httptest.NewRecorder()
req, err := http.NewRequest("POST", "/api/process/processes/"+strconv.Itoa(cmd.Process.Pid)+"/signal", strings.NewReader(`{"signal":"SIGTERM"}`))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[map[string]any]
err = json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
require.True(t, resp.Success)
assert.Equal(t, true, resp.Data["signalled"])
select {
case err := <-waitCh:
require.Error(t, err)
case <-time.After(5 * time.Second):
t.Fatal("unmanaged process should have been signalled by PID")
}
}
func TestProcessProvider_SignalProcess_InvalidSignal_Bad(t *testing.T) {
svc := newTestProcessService(t)
proc, err := svc.Start(context.Background(), "sleep", "60")
require.NoError(t, err)
p := processapi.NewProvider(nil, svc, nil)
r := setupRouter(p)
w := httptest.NewRecorder()
req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/signal", strings.NewReader(`{"signal":"NOPE"}`))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusBadRequest, w.Code)
assert.True(t, proc.IsRunning())
require.NoError(t, svc.Kill(proc.ID))
<-proc.Done()
}
func TestProcessProvider_BroadcastsProcessEvents_Good(t *testing.T) {
svc := newTestProcessService(t)
hub := corews.NewHub()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go hub.Run(ctx)
_ = processapi.NewProvider(nil, svc, hub)
server := httptest.NewServer(hub.Handler())
defer server.Close()
conn := connectWS(t, server.URL)
defer conn.Close()
require.Eventually(t, func() bool {
return hub.ClientCount() == 1
}, time.Second, 10*time.Millisecond)
proc, err := svc.Start(context.Background(), "sh", "-c", "echo live-event")
require.NoError(t, err)
<-proc.Done()
events := readWSEvents(t, conn, "process.started", "process.output", "process.exited")
started := events["process.started"]
require.NotNil(t, started)
startedData := started.Data.(map[string]any)
assert.Equal(t, proc.ID, startedData["id"])
assert.Equal(t, "sh", startedData["command"])
assert.Equal(t, float64(proc.Info().PID), startedData["pid"])
output := events["process.output"]
require.NotNil(t, output)
outputData := output.Data.(map[string]any)
assert.Equal(t, proc.ID, outputData["id"])
assert.Equal(t, "stdout", outputData["stream"])
assert.Contains(t, outputData["line"], "live-event")
exited := events["process.exited"]
require.NotNil(t, exited)
exitedData := exited.Data.(map[string]any)
assert.Equal(t, proc.ID, exitedData["id"])
assert.Equal(t, float64(0), exitedData["exitCode"])
}
func TestProcessProvider_BroadcastsKilledEvents_Good(t *testing.T) {
svc := newTestProcessService(t)
hub := corews.NewHub()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go hub.Run(ctx)
_ = processapi.NewProvider(nil, svc, hub)
server := httptest.NewServer(hub.Handler())
defer server.Close()
conn := connectWS(t, server.URL)
defer conn.Close()
require.Eventually(t, func() bool {
return hub.ClientCount() == 1
}, time.Second, 10*time.Millisecond)
proc, err := svc.Start(context.Background(), "sleep", "60")
require.NoError(t, err)
require.NoError(t, svc.Kill(proc.ID))
select {
case <-proc.Done():
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
events := readWSEvents(t, conn, "process.killed", "process.exited")
killed := events["process.killed"]
require.NotNil(t, killed)
killedData := killed.Data.(map[string]any)
assert.Equal(t, proc.ID, killedData["id"])
assert.Equal(t, "SIGKILL", killedData["signal"])
assert.Equal(t, float64(-1), killedData["exitCode"])
exited := events["process.exited"]
require.NotNil(t, exited)
exitedData := exited.Data.(map[string]any)
assert.Equal(t, proc.ID, exitedData["id"])
assert.Equal(t, float64(-1), exitedData["exitCode"])
}
func TestProcessProvider_ProcessRoutes_Unavailable(t *testing.T) {
p := processapi.NewProvider(nil, nil, nil)
r := setupRouter(p)
cases := []string{
"/api/process/processes",
"/api/process/processes/anything",
"/api/process/processes/anything/output",
"/api/process/processes/anything/wait",
"/api/process/processes/anything/input",
"/api/process/processes/anything/close-stdin",
"/api/process/processes/anything/kill",
}
for _, path := range cases {
w := httptest.NewRecorder()
method := "GET"
switch {
case strings.HasSuffix(path, "/kill"),
strings.HasSuffix(path, "/wait"),
strings.HasSuffix(path, "/input"),
strings.HasSuffix(path, "/close-stdin"):
method = "POST"
}
req, err := http.NewRequest(method, path, nil)
require.NoError(t, err)
r.ServeHTTP(w, req)
assert.Equal(t, http.StatusServiceUnavailable, w.Code)
}
}
// -- Test helpers -------------------------------------------------------------
func setupRouter(p *processapi.ProcessProvider) *gin.Engine {
r := gin.New()
rg := r.Group(p.BasePath())
p.RegisterRoutes(rg)
return r
}
// newTestRegistry creates a process.Registry backed by a test directory.
func newTestRegistry(dir string) *process.Registry {
return process.NewRegistry(dir)
}
func newTestProcessService(t *testing.T) *process.Service {
t.Helper()
c := core.New()
factory := process.NewService(process.Options{})
raw, err := factory(c)
require.NoError(t, err)
return raw.(*process.Service)
}
func connectWS(t *testing.T, serverURL string) *websocket.Conn {
t.Helper()
wsURL := "ws" + strings.TrimPrefix(serverURL, "http")
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
require.NoError(t, err)
return conn
}
func readWSEvents(t *testing.T, conn *websocket.Conn, channels ...string) map[string]corews.Message {
t.Helper()
want := make(map[string]struct{}, len(channels))
for _, channel := range channels {
want[channel] = struct{}{}
}
events := make(map[string]corews.Message, len(channels))
deadline := time.Now().Add(3 * time.Second)
for len(events) < len(channels) && time.Now().Before(deadline) {
require.NoError(t, conn.SetReadDeadline(time.Now().Add(500*time.Millisecond)))
_, payload, err := conn.ReadMessage()
require.NoError(t, err)
for _, line := range strings.Split(strings.TrimSpace(string(payload)), "\n") {
if strings.TrimSpace(line) == "" {
continue
}
var msg corews.Message
require.NoError(t, json.Unmarshal([]byte(line), &msg))
if _, ok := want[msg.Channel]; ok {
events[msg.Channel] = msg
}
}
}
require.Len(t, events, len(channels))
return events
}