feat(api): expose managed process routes
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
eb6a7819e7
commit
90ce26a1b7
3 changed files with 224 additions and 10 deletions
|
|
@ -90,6 +90,9 @@ func (p *ProcessProvider) RegisterRoutes(rg *gin.RouterGroup) {
|
|||
rg.GET("/daemons/:code/:daemon", p.getDaemon)
|
||||
rg.POST("/daemons/:code/:daemon/stop", p.stopDaemon)
|
||||
rg.GET("/daemons/:code/:daemon/health", p.healthCheck)
|
||||
rg.GET("/processes", p.listProcesses)
|
||||
rg.GET("/processes/:id", p.getProcess)
|
||||
rg.POST("/processes/:id/kill", p.killProcess)
|
||||
rg.POST("/pipelines/run", p.runPipeline)
|
||||
}
|
||||
|
||||
|
|
@ -163,6 +166,66 @@ func (p *ProcessProvider) Describe() []api.RouteDescription {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Method: "GET",
|
||||
Path: "/processes",
|
||||
Summary: "List managed processes",
|
||||
Description: "Returns the current process service snapshot as serialisable process info entries.",
|
||||
Tags: []string{"process"},
|
||||
Response: map[string]any{
|
||||
"type": "array",
|
||||
"items": map[string]any{
|
||||
"type": "object",
|
||||
"properties": map[string]any{
|
||||
"id": map[string]any{"type": "string"},
|
||||
"command": map[string]any{"type": "string"},
|
||||
"args": map[string]any{"type": "array"},
|
||||
"dir": map[string]any{"type": "string"},
|
||||
"startedAt": map[string]any{"type": "string", "format": "date-time"},
|
||||
"running": map[string]any{"type": "boolean"},
|
||||
"status": map[string]any{"type": "string"},
|
||||
"exitCode": map[string]any{"type": "integer"},
|
||||
"duration": map[string]any{"type": "integer"},
|
||||
"pid": map[string]any{"type": "integer"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Method: "GET",
|
||||
Path: "/processes/:id",
|
||||
Summary: "Get a managed process",
|
||||
Description: "Returns a single managed process by ID as a process info snapshot.",
|
||||
Tags: []string{"process"},
|
||||
Response: map[string]any{
|
||||
"type": "object",
|
||||
"properties": map[string]any{
|
||||
"id": map[string]any{"type": "string"},
|
||||
"command": map[string]any{"type": "string"},
|
||||
"args": map[string]any{"type": "array"},
|
||||
"dir": map[string]any{"type": "string"},
|
||||
"startedAt": map[string]any{"type": "string", "format": "date-time"},
|
||||
"running": map[string]any{"type": "boolean"},
|
||||
"status": map[string]any{"type": "string"},
|
||||
"exitCode": map[string]any{"type": "integer"},
|
||||
"duration": map[string]any{"type": "integer"},
|
||||
"pid": map[string]any{"type": "integer"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Method: "POST",
|
||||
Path: "/processes/:id/kill",
|
||||
Summary: "Kill a managed process",
|
||||
Description: "Sends SIGKILL to the managed process identified by ID.",
|
||||
Tags: []string{"process"},
|
||||
Response: map[string]any{
|
||||
"type": "object",
|
||||
"properties": map[string]any{
|
||||
"killed": map[string]any{"type": "boolean"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Method: "POST",
|
||||
Path: "/pipelines/run",
|
||||
|
|
@ -289,6 +352,54 @@ func (p *ProcessProvider) healthCheck(c *gin.Context) {
|
|||
c.JSON(statusCode, api.OK(result))
|
||||
}
|
||||
|
||||
func (p *ProcessProvider) listProcesses(c *gin.Context) {
|
||||
if p.service == nil {
|
||||
c.JSON(http.StatusServiceUnavailable, api.Fail("service_unavailable", "process service is not configured"))
|
||||
return
|
||||
}
|
||||
|
||||
procs := p.service.List()
|
||||
infos := make([]process.Info, 0, len(procs))
|
||||
for _, proc := range procs {
|
||||
infos = append(infos, proc.Info())
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, api.OK(infos))
|
||||
}
|
||||
|
||||
func (p *ProcessProvider) getProcess(c *gin.Context) {
|
||||
if p.service == nil {
|
||||
c.JSON(http.StatusServiceUnavailable, api.Fail("service_unavailable", "process service is not configured"))
|
||||
return
|
||||
}
|
||||
|
||||
proc, err := p.service.Get(c.Param("id"))
|
||||
if err != nil {
|
||||
c.JSON(http.StatusNotFound, api.Fail("not_found", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, api.OK(proc.Info()))
|
||||
}
|
||||
|
||||
func (p *ProcessProvider) killProcess(c *gin.Context) {
|
||||
if p.service == nil {
|
||||
c.JSON(http.StatusServiceUnavailable, api.Fail("service_unavailable", "process service is not configured"))
|
||||
return
|
||||
}
|
||||
|
||||
if err := p.service.Kill(c.Param("id")); err != nil {
|
||||
status := http.StatusInternalServerError
|
||||
if err == process.ErrProcessNotFound {
|
||||
status = http.StatusNotFound
|
||||
}
|
||||
c.JSON(status, api.Fail("kill_failed", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, api.OK(map[string]any{"killed": true}))
|
||||
}
|
||||
|
||||
type pipelineRunRequest struct {
|
||||
Mode string `json:"mode"`
|
||||
Specs []process.RunSpec `json:"specs"`
|
||||
|
|
|
|||
|
|
@ -3,12 +3,14 @@
|
|||
package api_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
goapi "dappco.re/go/core/api"
|
||||
|
|
@ -202,6 +204,106 @@ func TestProcessProvider_RunPipeline_Unavailable(t *testing.T) {
|
|||
assert.Equal(t, http.StatusServiceUnavailable, w.Code)
|
||||
}
|
||||
|
||||
func TestProcessProvider_ListProcesses_Good(t *testing.T) {
|
||||
svc := newTestProcessService(t)
|
||||
proc, err := svc.Start(context.Background(), "echo", "hello-api")
|
||||
require.NoError(t, err)
|
||||
<-proc.Done()
|
||||
|
||||
p := processapi.NewProvider(nil, svc, nil)
|
||||
r := setupRouter(p)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
req, err := http.NewRequest("GET", "/api/process/processes", nil)
|
||||
require.NoError(t, err)
|
||||
r.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var resp goapi.Response[[]process.Info]
|
||||
err = json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
require.NoError(t, err)
|
||||
require.True(t, resp.Success)
|
||||
require.Len(t, resp.Data, 1)
|
||||
assert.Equal(t, proc.ID, resp.Data[0].ID)
|
||||
assert.Equal(t, "echo", resp.Data[0].Command)
|
||||
}
|
||||
|
||||
func TestProcessProvider_GetProcess_Good(t *testing.T) {
|
||||
svc := newTestProcessService(t)
|
||||
proc, err := svc.Start(context.Background(), "echo", "single")
|
||||
require.NoError(t, err)
|
||||
<-proc.Done()
|
||||
|
||||
p := processapi.NewProvider(nil, svc, nil)
|
||||
r := setupRouter(p)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
req, err := http.NewRequest("GET", "/api/process/processes/"+proc.ID, nil)
|
||||
require.NoError(t, err)
|
||||
r.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var resp goapi.Response[process.Info]
|
||||
err = json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
require.NoError(t, err)
|
||||
require.True(t, resp.Success)
|
||||
assert.Equal(t, proc.ID, resp.Data.ID)
|
||||
assert.Equal(t, "echo", resp.Data.Command)
|
||||
}
|
||||
|
||||
func TestProcessProvider_KillProcess_Good(t *testing.T) {
|
||||
svc := newTestProcessService(t)
|
||||
proc, err := svc.Start(context.Background(), "sleep", "60")
|
||||
require.NoError(t, err)
|
||||
|
||||
p := processapi.NewProvider(nil, svc, nil)
|
||||
r := setupRouter(p)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/kill", nil)
|
||||
require.NoError(t, err)
|
||||
r.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var resp goapi.Response[map[string]any]
|
||||
err = json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
require.NoError(t, err)
|
||||
require.True(t, resp.Success)
|
||||
assert.Equal(t, true, resp.Data["killed"])
|
||||
|
||||
select {
|
||||
case <-proc.Done():
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("process should have been killed")
|
||||
}
|
||||
assert.Equal(t, process.StatusKilled, proc.Status)
|
||||
}
|
||||
|
||||
func TestProcessProvider_ProcessRoutes_Unavailable(t *testing.T) {
|
||||
p := processapi.NewProvider(nil, nil, nil)
|
||||
r := setupRouter(p)
|
||||
|
||||
cases := []string{
|
||||
"/api/process/processes",
|
||||
"/api/process/processes/anything",
|
||||
"/api/process/processes/anything/kill",
|
||||
}
|
||||
|
||||
for _, path := range cases {
|
||||
w := httptest.NewRecorder()
|
||||
req, err := http.NewRequest("GET", path, nil)
|
||||
if strings.HasSuffix(path, "/kill") {
|
||||
req, err = http.NewRequest("POST", path, nil)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
r.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusServiceUnavailable, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// -- Test helpers -------------------------------------------------------------
|
||||
|
||||
func setupRouter(p *processapi.ProcessProvider) *gin.Engine {
|
||||
|
|
|
|||
21
process.go
21
process.go
|
|
@ -250,19 +250,20 @@ func (p *Process) Signal(sig os.Signal) error {
|
|||
}
|
||||
|
||||
// Some shells briefly ignore or defer the signal while they are still
|
||||
// initialising child jobs. Retry once after a short delay so the whole
|
||||
// process group is more reliably terminated.
|
||||
// initialising child jobs. Retry a few times after short delays so the
|
||||
// whole process group is more reliably terminated.
|
||||
go func(pid int, sig syscall.Signal, done <-chan struct{}) {
|
||||
timer := time.NewTimer(50 * time.Millisecond)
|
||||
defer timer.Stop()
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-timer.C:
|
||||
for i := 0; i < 5; i++ {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
_ = syscall.Kill(-pid, sig)
|
||||
}
|
||||
}
|
||||
|
||||
_ = syscall.Kill(-pid, sig)
|
||||
}(cmd.Process.Pid, sysSig, p.done)
|
||||
|
||||
return nil
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue