From 90ce26a1b73ae5c5bcb605c2ad33718598e66dc0 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 01:03:42 +0000 Subject: [PATCH] feat(api): expose managed process routes Co-Authored-By: Virgil --- pkg/api/provider.go | 111 +++++++++++++++++++++++++++++++++++++++ pkg/api/provider_test.go | 102 +++++++++++++++++++++++++++++++++++ process.go | 21 ++++---- 3 files changed, 224 insertions(+), 10 deletions(-) diff --git a/pkg/api/provider.go b/pkg/api/provider.go index ec28250..a3adbcd 100644 --- a/pkg/api/provider.go +++ b/pkg/api/provider.go @@ -90,6 +90,9 @@ func (p *ProcessProvider) RegisterRoutes(rg *gin.RouterGroup) { rg.GET("/daemons/:code/:daemon", p.getDaemon) rg.POST("/daemons/:code/:daemon/stop", p.stopDaemon) rg.GET("/daemons/:code/:daemon/health", p.healthCheck) + rg.GET("/processes", p.listProcesses) + rg.GET("/processes/:id", p.getProcess) + rg.POST("/processes/:id/kill", p.killProcess) rg.POST("/pipelines/run", p.runPipeline) } @@ -163,6 +166,66 @@ func (p *ProcessProvider) Describe() []api.RouteDescription { }, }, }, + { + Method: "GET", + Path: "/processes", + Summary: "List managed processes", + Description: "Returns the current process service snapshot as serialisable process info entries.", + Tags: []string{"process"}, + Response: map[string]any{ + "type": "array", + "items": map[string]any{ + "type": "object", + "properties": map[string]any{ + "id": map[string]any{"type": "string"}, + "command": map[string]any{"type": "string"}, + "args": map[string]any{"type": "array"}, + "dir": map[string]any{"type": "string"}, + "startedAt": map[string]any{"type": "string", "format": "date-time"}, + "running": map[string]any{"type": "boolean"}, + "status": map[string]any{"type": "string"}, + "exitCode": map[string]any{"type": "integer"}, + "duration": map[string]any{"type": "integer"}, + "pid": map[string]any{"type": "integer"}, + }, + }, + }, + }, + { + Method: "GET", + Path: "/processes/:id", + Summary: "Get a managed process", + Description: "Returns a single managed process by ID as a process info snapshot.", + Tags: []string{"process"}, + Response: map[string]any{ + "type": "object", + "properties": map[string]any{ + "id": map[string]any{"type": "string"}, + "command": map[string]any{"type": "string"}, + "args": map[string]any{"type": "array"}, + "dir": map[string]any{"type": "string"}, + "startedAt": map[string]any{"type": "string", "format": "date-time"}, + "running": map[string]any{"type": "boolean"}, + "status": map[string]any{"type": "string"}, + "exitCode": map[string]any{"type": "integer"}, + "duration": map[string]any{"type": "integer"}, + "pid": map[string]any{"type": "integer"}, + }, + }, + }, + { + Method: "POST", + Path: "/processes/:id/kill", + Summary: "Kill a managed process", + Description: "Sends SIGKILL to the managed process identified by ID.", + Tags: []string{"process"}, + Response: map[string]any{ + "type": "object", + "properties": map[string]any{ + "killed": map[string]any{"type": "boolean"}, + }, + }, + }, { Method: "POST", Path: "/pipelines/run", @@ -289,6 +352,54 @@ func (p *ProcessProvider) healthCheck(c *gin.Context) { c.JSON(statusCode, api.OK(result)) } +func (p *ProcessProvider) listProcesses(c *gin.Context) { + if p.service == nil { + c.JSON(http.StatusServiceUnavailable, api.Fail("service_unavailable", "process service is not configured")) + return + } + + procs := p.service.List() + infos := make([]process.Info, 0, len(procs)) + for _, proc := range procs { + infos = append(infos, proc.Info()) + } + + c.JSON(http.StatusOK, api.OK(infos)) +} + +func (p *ProcessProvider) getProcess(c *gin.Context) { + if p.service == nil { + c.JSON(http.StatusServiceUnavailable, api.Fail("service_unavailable", "process service is not configured")) + return + } + + proc, err := p.service.Get(c.Param("id")) + if err != nil { + c.JSON(http.StatusNotFound, api.Fail("not_found", err.Error())) + return + } + + c.JSON(http.StatusOK, api.OK(proc.Info())) +} + +func (p *ProcessProvider) killProcess(c *gin.Context) { + if p.service == nil { + c.JSON(http.StatusServiceUnavailable, api.Fail("service_unavailable", "process service is not configured")) + return + } + + if err := p.service.Kill(c.Param("id")); err != nil { + status := http.StatusInternalServerError + if err == process.ErrProcessNotFound { + status = http.StatusNotFound + } + c.JSON(status, api.Fail("kill_failed", err.Error())) + return + } + + c.JSON(http.StatusOK, api.OK(map[string]any{"killed": true})) +} + type pipelineRunRequest struct { Mode string `json:"mode"` Specs []process.RunSpec `json:"specs"` diff --git a/pkg/api/provider_test.go b/pkg/api/provider_test.go index a2e6f54..34de0af 100644 --- a/pkg/api/provider_test.go +++ b/pkg/api/provider_test.go @@ -3,12 +3,14 @@ package api_test import ( + "context" "encoding/json" "net/http" "net/http/httptest" "os" "strings" "testing" + "time" core "dappco.re/go/core" goapi "dappco.re/go/core/api" @@ -202,6 +204,106 @@ func TestProcessProvider_RunPipeline_Unavailable(t *testing.T) { assert.Equal(t, http.StatusServiceUnavailable, w.Code) } +func TestProcessProvider_ListProcesses_Good(t *testing.T) { + svc := newTestProcessService(t) + proc, err := svc.Start(context.Background(), "echo", "hello-api") + require.NoError(t, err) + <-proc.Done() + + p := processapi.NewProvider(nil, svc, nil) + r := setupRouter(p) + w := httptest.NewRecorder() + + req, err := http.NewRequest("GET", "/api/process/processes", nil) + require.NoError(t, err) + r.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var resp goapi.Response[[]process.Info] + err = json.Unmarshal(w.Body.Bytes(), &resp) + require.NoError(t, err) + require.True(t, resp.Success) + require.Len(t, resp.Data, 1) + assert.Equal(t, proc.ID, resp.Data[0].ID) + assert.Equal(t, "echo", resp.Data[0].Command) +} + +func TestProcessProvider_GetProcess_Good(t *testing.T) { + svc := newTestProcessService(t) + proc, err := svc.Start(context.Background(), "echo", "single") + require.NoError(t, err) + <-proc.Done() + + p := processapi.NewProvider(nil, svc, nil) + r := setupRouter(p) + w := httptest.NewRecorder() + + req, err := http.NewRequest("GET", "/api/process/processes/"+proc.ID, nil) + require.NoError(t, err) + r.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var resp goapi.Response[process.Info] + err = json.Unmarshal(w.Body.Bytes(), &resp) + require.NoError(t, err) + require.True(t, resp.Success) + assert.Equal(t, proc.ID, resp.Data.ID) + assert.Equal(t, "echo", resp.Data.Command) +} + +func TestProcessProvider_KillProcess_Good(t *testing.T) { + svc := newTestProcessService(t) + proc, err := svc.Start(context.Background(), "sleep", "60") + require.NoError(t, err) + + p := processapi.NewProvider(nil, svc, nil) + r := setupRouter(p) + w := httptest.NewRecorder() + + req, err := http.NewRequest("POST", "/api/process/processes/"+proc.ID+"/kill", nil) + require.NoError(t, err) + r.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var resp goapi.Response[map[string]any] + err = json.Unmarshal(w.Body.Bytes(), &resp) + require.NoError(t, err) + require.True(t, resp.Success) + assert.Equal(t, true, resp.Data["killed"]) + + select { + case <-proc.Done(): + case <-time.After(5 * time.Second): + t.Fatal("process should have been killed") + } + assert.Equal(t, process.StatusKilled, proc.Status) +} + +func TestProcessProvider_ProcessRoutes_Unavailable(t *testing.T) { + p := processapi.NewProvider(nil, nil, nil) + r := setupRouter(p) + + cases := []string{ + "/api/process/processes", + "/api/process/processes/anything", + "/api/process/processes/anything/kill", + } + + for _, path := range cases { + w := httptest.NewRecorder() + req, err := http.NewRequest("GET", path, nil) + if strings.HasSuffix(path, "/kill") { + req, err = http.NewRequest("POST", path, nil) + } + require.NoError(t, err) + r.ServeHTTP(w, req) + assert.Equal(t, http.StatusServiceUnavailable, w.Code) + } +} + // -- Test helpers ------------------------------------------------------------- func setupRouter(p *processapi.ProcessProvider) *gin.Engine { diff --git a/process.go b/process.go index 1b2fc65..94ba6f5 100644 --- a/process.go +++ b/process.go @@ -250,19 +250,20 @@ func (p *Process) Signal(sig os.Signal) error { } // Some shells briefly ignore or defer the signal while they are still - // initialising child jobs. Retry once after a short delay so the whole - // process group is more reliably terminated. + // initialising child jobs. Retry a few times after short delays so the + // whole process group is more reliably terminated. go func(pid int, sig syscall.Signal, done <-chan struct{}) { - timer := time.NewTimer(50 * time.Millisecond) - defer timer.Stop() + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() - select { - case <-done: - return - case <-timer.C: + for i := 0; i < 5; i++ { + select { + case <-done: + return + case <-ticker.C: + _ = syscall.Kill(-pid, sig) + } } - - _ = syscall.Kill(-pid, sig) }(cmd.Process.Pid, sysSig, p.done) return nil