feat(api): add process signal endpoint
This commit is contained in:
parent
cf9291d095
commit
f9537fb24d
2 changed files with 146 additions and 1 deletions
|
|
@ -17,6 +17,7 @@ import (
|
|||
"dappco.re/go/core"
|
||||
"dappco.re/go/core/api"
|
||||
"dappco.re/go/core/api/pkg/provider"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
process "dappco.re/go/core/process"
|
||||
"dappco.re/go/core/ws"
|
||||
"github.com/gin-gonic/gin"
|
||||
|
|
@ -102,6 +103,7 @@ func (p *ProcessProvider) RegisterRoutes(rg *gin.RouterGroup) {
|
|||
rg.POST("/processes/:id/input", p.inputProcess)
|
||||
rg.POST("/processes/:id/close-stdin", p.closeProcessStdin)
|
||||
rg.POST("/processes/:id/kill", p.killProcess)
|
||||
rg.POST("/processes/:id/signal", p.signalProcess)
|
||||
rg.POST("/pipelines/run", p.runPipeline)
|
||||
}
|
||||
|
||||
|
|
@ -300,6 +302,26 @@ func (p *ProcessProvider) Describe() []api.RouteDescription {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Method: "POST",
|
||||
Path: "/processes/:id/signal",
|
||||
Summary: "Signal a managed process",
|
||||
Description: "Sends a Unix signal to the managed process identified by ID.",
|
||||
Tags: []string{"process"},
|
||||
RequestBody: map[string]any{
|
||||
"type": "object",
|
||||
"properties": map[string]any{
|
||||
"signal": map[string]any{"type": "string"},
|
||||
},
|
||||
"required": []string{"signal"},
|
||||
},
|
||||
Response: map[string]any{
|
||||
"type": "object",
|
||||
"properties": map[string]any{
|
||||
"signalled": map[string]any{"type": "boolean"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Method: "POST",
|
||||
Path: "/pipelines/run",
|
||||
|
|
@ -565,6 +587,40 @@ func (p *ProcessProvider) killProcess(c *gin.Context) {
|
|||
c.JSON(http.StatusOK, api.OK(map[string]any{"killed": true}))
|
||||
}
|
||||
|
||||
type processSignalRequest struct {
|
||||
Signal string `json:"signal"`
|
||||
}
|
||||
|
||||
func (p *ProcessProvider) signalProcess(c *gin.Context) {
|
||||
if p.service == nil {
|
||||
c.JSON(http.StatusServiceUnavailable, api.Fail("service_unavailable", "process service is not configured"))
|
||||
return
|
||||
}
|
||||
|
||||
var req processSignalRequest
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, api.Fail("invalid_request", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
sig, err := parseSignal(req.Signal)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, api.Fail("invalid_signal", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
if err := p.service.Signal(c.Param("id"), sig); err != nil {
|
||||
status := http.StatusInternalServerError
|
||||
if err == process.ErrProcessNotFound || err == process.ErrProcessNotRunning {
|
||||
status = http.StatusNotFound
|
||||
}
|
||||
c.JSON(status, api.Fail("signal_failed", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, api.OK(map[string]any{"signalled": true}))
|
||||
}
|
||||
|
||||
type pipelineRunRequest struct {
|
||||
Mode string `json:"mode"`
|
||||
Specs []process.RunSpec `json:"specs"`
|
||||
|
|
@ -664,6 +720,40 @@ func intParam(c *gin.Context, name string) int {
|
|||
return v
|
||||
}
|
||||
|
||||
func parseSignal(value string) (syscall.Signal, error) {
|
||||
trimmed := strings.TrimSpace(strings.ToUpper(value))
|
||||
if trimmed == "" {
|
||||
return 0, coreerr.E("ProcessProvider.parseSignal", "signal is required", nil)
|
||||
}
|
||||
|
||||
if n, err := strconv.Atoi(trimmed); err == nil {
|
||||
return syscall.Signal(n), nil
|
||||
}
|
||||
|
||||
switch trimmed {
|
||||
case "SIGTERM", "TERM":
|
||||
return syscall.SIGTERM, nil
|
||||
case "SIGKILL", "KILL":
|
||||
return syscall.SIGKILL, nil
|
||||
case "SIGINT", "INT":
|
||||
return syscall.SIGINT, nil
|
||||
case "SIGQUIT", "QUIT":
|
||||
return syscall.SIGQUIT, nil
|
||||
case "SIGHUP", "HUP":
|
||||
return syscall.SIGHUP, nil
|
||||
case "SIGSTOP", "STOP":
|
||||
return syscall.SIGSTOP, nil
|
||||
case "SIGCONT", "CONT":
|
||||
return syscall.SIGCONT, nil
|
||||
case "SIGUSR1", "USR1":
|
||||
return syscall.SIGUSR1, nil
|
||||
case "SIGUSR2", "USR2":
|
||||
return syscall.SIGUSR2, nil
|
||||
default:
|
||||
return 0, coreerr.E("ProcessProvider.parseSignal", "unsupported signal", nil)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ProcessProvider) registerProcessEvents() {
|
||||
if p == nil || p.hub == nil || p.service == nil {
|
||||
return
|
||||
|
|
|
|||
|
|
@ -59,13 +59,17 @@ func TestProcessProvider_Describe_Good(t *testing.T) {
|
|||
}
|
||||
|
||||
foundPipelineRoute := false
|
||||
foundSignalRoute := false
|
||||
for _, d := range descs {
|
||||
if d.Method == "POST" && d.Path == "/pipelines/run" {
|
||||
foundPipelineRoute = true
|
||||
break
|
||||
}
|
||||
if d.Method == "POST" && d.Path == "/processes/:id/signal" {
|
||||
foundSignalRoute = true
|
||||
}
|
||||
}
|
||||
assert.True(t, foundPipelineRoute, "pipeline route should be described")
|
||||
assert.True(t, foundSignalRoute, "signal route should be described")
|
||||
}
|
||||
|
||||
func TestProcessProvider_ListDaemons_Good(t *testing.T) {
|
||||
|
|
@ -438,6 +442,57 @@ func TestProcessProvider_KillProcess_Good(t *testing.T) {
|
|||
assert.Equal(t, process.StatusKilled, proc.Status)
|
||||
}
|
||||
|
||||
func TestProcessProvider_SignalProcess_Good(t *testing.T) {
|
||||
svc := newTestProcessService(t)
|
||||
proc, err := svc.Start(context.Background(), "sleep", "60")
|
||||
require.NoError(t, err)
|
||||
|
||||
p := processapi.NewProvider(nil, svc, nil)
|
||||
r := setupRouter(p)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/signal", strings.NewReader(`{"signal":"SIGTERM"}`))
|
||||
require.NoError(t, err)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
r.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var resp goapi.Response[map[string]any]
|
||||
err = json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
require.NoError(t, err)
|
||||
require.True(t, resp.Success)
|
||||
assert.Equal(t, true, resp.Data["signalled"])
|
||||
|
||||
select {
|
||||
case <-proc.Done():
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("process should have been signalled")
|
||||
}
|
||||
assert.Equal(t, process.StatusKilled, proc.Status)
|
||||
}
|
||||
|
||||
func TestProcessProvider_SignalProcess_InvalidSignal_Bad(t *testing.T) {
|
||||
svc := newTestProcessService(t)
|
||||
proc, err := svc.Start(context.Background(), "sleep", "60")
|
||||
require.NoError(t, err)
|
||||
|
||||
p := processapi.NewProvider(nil, svc, nil)
|
||||
r := setupRouter(p)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/signal", strings.NewReader(`{"signal":"NOPE"}`))
|
||||
require.NoError(t, err)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
r.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusBadRequest, w.Code)
|
||||
assert.True(t, proc.IsRunning())
|
||||
|
||||
require.NoError(t, svc.Kill(proc.ID))
|
||||
<-proc.Done()
|
||||
}
|
||||
|
||||
func TestProcessProvider_BroadcastsProcessEvents_Good(t *testing.T) {
|
||||
svc := newTestProcessService(t)
|
||||
hub := corews.NewHub()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue