From 040500f3e1c641b183290d58c468ce3251f7f08a Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 06:09:01 +0000 Subject: [PATCH] feat(process): broadcast provider process ws events --- pkg/api/provider.go | 93 +++++++++++++++++++++++++- pkg/api/provider_test.go | 138 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 230 insertions(+), 1 deletion(-) diff --git a/pkg/api/provider.go b/pkg/api/provider.go index 59c01e7..7f4e2ad 100644 --- a/pkg/api/provider.go +++ b/pkg/api/provider.go @@ -10,8 +10,11 @@ import ( "os" "strconv" "strings" + "sync" "syscall" + "time" + "dappco.re/go/core" "dappco.re/go/core/api" "dappco.re/go/core/api/pkg/provider" process "dappco.re/go/core/process" @@ -27,6 +30,7 @@ type ProcessProvider struct { service *process.Service runner *process.Runner hub *ws.Hub + actions sync.Once } // compile-time interface checks @@ -54,6 +58,7 @@ func NewProvider(registry *process.Registry, service *process.Service, hub *ws.H if service != nil { p.runner = process.NewRunner(service) } + p.registerProcessEvents() return p } @@ -486,10 +491,16 @@ func (p *ProcessProvider) emitEvent(channel string, data any) { if p.hub == nil { return } - _ = p.hub.SendToChannel(channel, ws.Message{ + msg := ws.Message{ Type: ws.TypeEvent, Data: data, + } + _ = p.hub.Broadcast(ws.Message{ + Type: msg.Type, + Channel: channel, + Data: data, }) + _ = p.hub.SendToChannel(channel, msg) } // PIDAlive checks whether a PID is still running. Exported for use by @@ -510,3 +521,83 @@ func intParam(c *gin.Context, name string) int { v, _ := strconv.Atoi(c.Param(name)) return v } + +func (p *ProcessProvider) registerProcessEvents() { + if p == nil || p.hub == nil || p.service == nil { + return + } + + coreApp := p.service.Core() + if coreApp == nil { + return + } + + p.actions.Do(func() { + coreApp.RegisterAction(func(_ *core.Core, msg core.Message) core.Result { + p.forwardProcessEvent(msg) + return core.Result{OK: true} + }) + }) +} + +func (p *ProcessProvider) forwardProcessEvent(msg core.Message) { + switch m := msg.(type) { + case process.ActionProcessStarted: + payload := p.processEventPayload(m.ID) + payload["id"] = m.ID + payload["command"] = m.Command + payload["args"] = append([]string(nil), m.Args...) + payload["dir"] = m.Dir + payload["pid"] = m.PID + if _, ok := payload["startedAt"]; !ok { + payload["startedAt"] = time.Now().UTC() + } + p.emitEvent("process.started", payload) + case process.ActionProcessOutput: + p.emitEvent("process.output", map[string]any{ + "id": m.ID, + "line": m.Line, + "stream": m.Stream, + }) + case process.ActionProcessExited: + payload := p.processEventPayload(m.ID) + payload["id"] = m.ID + payload["exitCode"] = m.ExitCode + payload["duration"] = m.Duration + if m.Error != nil { + payload["error"] = m.Error.Error() + } + p.emitEvent("process.exited", payload) + case process.ActionProcessKilled: + payload := p.processEventPayload(m.ID) + payload["id"] = m.ID + payload["signal"] = m.Signal + payload["exitCode"] = -1 + p.emitEvent("process.killed", payload) + } +} + +func (p *ProcessProvider) processEventPayload(id string) map[string]any { + if p == nil || p.service == nil || id == "" { + return map[string]any{} + } + + proc, err := p.service.Get(id) + if err != nil { + return map[string]any{} + } + + info := proc.Info() + return map[string]any{ + "id": info.ID, + "command": info.Command, + "args": append([]string(nil), info.Args...), + "dir": info.Dir, + "startedAt": info.StartedAt, + "running": info.Running, + "status": info.Status, + "exitCode": info.ExitCode, + "duration": info.Duration, + "pid": info.PID, + } +} diff --git a/pkg/api/provider_test.go b/pkg/api/provider_test.go index 4787372..c8ee7c7 100644 --- a/pkg/api/provider_test.go +++ b/pkg/api/provider_test.go @@ -16,7 +16,9 @@ import ( goapi "dappco.re/go/core/api" process "dappco.re/go/core/process" processapi "dappco.re/go/core/process/pkg/api" + corews "dappco.re/go/core/ws" "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -305,6 +307,98 @@ func TestProcessProvider_KillProcess_Good(t *testing.T) { assert.Equal(t, process.StatusKilled, proc.Status) } +func TestProcessProvider_BroadcastsProcessEvents_Good(t *testing.T) { + svc := newTestProcessService(t) + hub := corews.NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + _ = processapi.NewProvider(nil, svc, hub) + + server := httptest.NewServer(hub.Handler()) + defer server.Close() + + conn := connectWS(t, server.URL) + defer conn.Close() + + require.Eventually(t, func() bool { + return hub.ClientCount() == 1 + }, time.Second, 10*time.Millisecond) + + proc, err := svc.Start(context.Background(), "sh", "-c", "echo live-event") + require.NoError(t, err) + <-proc.Done() + + events := readWSEvents(t, conn, "process.started", "process.output", "process.exited") + + started := events["process.started"] + require.NotNil(t, started) + startedData := started.Data.(map[string]any) + assert.Equal(t, proc.ID, startedData["id"]) + assert.Equal(t, "sh", startedData["command"]) + assert.Equal(t, float64(proc.Info().PID), startedData["pid"]) + + output := events["process.output"] + require.NotNil(t, output) + outputData := output.Data.(map[string]any) + assert.Equal(t, proc.ID, outputData["id"]) + assert.Equal(t, "stdout", outputData["stream"]) + assert.Contains(t, outputData["line"], "live-event") + + exited := events["process.exited"] + require.NotNil(t, exited) + exitedData := exited.Data.(map[string]any) + assert.Equal(t, proc.ID, exitedData["id"]) + assert.Equal(t, float64(0), exitedData["exitCode"]) +} + +func TestProcessProvider_BroadcastsKilledEvents_Good(t *testing.T) { + svc := newTestProcessService(t) + hub := corews.NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + _ = processapi.NewProvider(nil, svc, hub) + + server := httptest.NewServer(hub.Handler()) + defer server.Close() + + conn := connectWS(t, server.URL) + defer conn.Close() + + require.Eventually(t, func() bool { + return hub.ClientCount() == 1 + }, time.Second, 10*time.Millisecond) + + proc, err := svc.Start(context.Background(), "sleep", "60") + require.NoError(t, err) + + require.NoError(t, svc.Kill(proc.ID)) + + select { + case <-proc.Done(): + case <-time.After(2 * time.Second): + t.Fatal("process should have been killed") + } + + events := readWSEvents(t, conn, "process.killed", "process.exited") + + killed := events["process.killed"] + require.NotNil(t, killed) + killedData := killed.Data.(map[string]any) + assert.Equal(t, proc.ID, killedData["id"]) + assert.Equal(t, "SIGKILL", killedData["signal"]) + assert.Equal(t, float64(-1), killedData["exitCode"]) + + exited := events["process.exited"] + require.NotNil(t, exited) + exitedData := exited.Data.(map[string]any) + assert.Equal(t, proc.ID, exitedData["id"]) + assert.Equal(t, float64(-1), exitedData["exitCode"]) +} + func TestProcessProvider_ProcessRoutes_Unavailable(t *testing.T) { p := processapi.NewProvider(nil, nil, nil) r := setupRouter(p) @@ -352,3 +446,47 @@ func newTestProcessService(t *testing.T) *process.Service { return raw.(*process.Service) } + +func connectWS(t *testing.T, serverURL string) *websocket.Conn { + t.Helper() + + wsURL := "ws" + strings.TrimPrefix(serverURL, "http") + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + require.NoError(t, err) + return conn +} + +func readWSEvents(t *testing.T, conn *websocket.Conn, channels ...string) map[string]corews.Message { + t.Helper() + + want := make(map[string]struct{}, len(channels)) + for _, channel := range channels { + want[channel] = struct{}{} + } + + events := make(map[string]corews.Message, len(channels)) + deadline := time.Now().Add(3 * time.Second) + + for len(events) < len(channels) && time.Now().Before(deadline) { + require.NoError(t, conn.SetReadDeadline(time.Now().Add(500*time.Millisecond))) + + _, payload, err := conn.ReadMessage() + require.NoError(t, err) + + for _, line := range strings.Split(strings.TrimSpace(string(payload)), "\n") { + if strings.TrimSpace(line) == "" { + continue + } + + var msg corews.Message + require.NoError(t, json.Unmarshal([]byte(line), &msg)) + + if _, ok := want[msg.Channel]; ok { + events[msg.Channel] = msg + } + } + } + + require.Len(t, events, len(channels)) + return events +}