feat(process): broadcast provider process ws events
This commit is contained in:
parent
c7542939c7
commit
040500f3e1
2 changed files with 230 additions and 1 deletions
|
|
@ -10,8 +10,11 @@ import (
|
|||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"dappco.re/go/core"
|
||||
"dappco.re/go/core/api"
|
||||
"dappco.re/go/core/api/pkg/provider"
|
||||
process "dappco.re/go/core/process"
|
||||
|
|
@ -27,6 +30,7 @@ type ProcessProvider struct {
|
|||
service *process.Service
|
||||
runner *process.Runner
|
||||
hub *ws.Hub
|
||||
actions sync.Once
|
||||
}
|
||||
|
||||
// compile-time interface checks
|
||||
|
|
@ -54,6 +58,7 @@ func NewProvider(registry *process.Registry, service *process.Service, hub *ws.H
|
|||
if service != nil {
|
||||
p.runner = process.NewRunner(service)
|
||||
}
|
||||
p.registerProcessEvents()
|
||||
return p
|
||||
}
|
||||
|
||||
|
|
@ -486,10 +491,16 @@ func (p *ProcessProvider) emitEvent(channel string, data any) {
|
|||
if p.hub == nil {
|
||||
return
|
||||
}
|
||||
_ = p.hub.SendToChannel(channel, ws.Message{
|
||||
msg := ws.Message{
|
||||
Type: ws.TypeEvent,
|
||||
Data: data,
|
||||
}
|
||||
_ = p.hub.Broadcast(ws.Message{
|
||||
Type: msg.Type,
|
||||
Channel: channel,
|
||||
Data: data,
|
||||
})
|
||||
_ = p.hub.SendToChannel(channel, msg)
|
||||
}
|
||||
|
||||
// PIDAlive checks whether a PID is still running. Exported for use by
|
||||
|
|
@ -510,3 +521,83 @@ func intParam(c *gin.Context, name string) int {
|
|||
v, _ := strconv.Atoi(c.Param(name))
|
||||
return v
|
||||
}
|
||||
|
||||
func (p *ProcessProvider) registerProcessEvents() {
|
||||
if p == nil || p.hub == nil || p.service == nil {
|
||||
return
|
||||
}
|
||||
|
||||
coreApp := p.service.Core()
|
||||
if coreApp == nil {
|
||||
return
|
||||
}
|
||||
|
||||
p.actions.Do(func() {
|
||||
coreApp.RegisterAction(func(_ *core.Core, msg core.Message) core.Result {
|
||||
p.forwardProcessEvent(msg)
|
||||
return core.Result{OK: true}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (p *ProcessProvider) forwardProcessEvent(msg core.Message) {
|
||||
switch m := msg.(type) {
|
||||
case process.ActionProcessStarted:
|
||||
payload := p.processEventPayload(m.ID)
|
||||
payload["id"] = m.ID
|
||||
payload["command"] = m.Command
|
||||
payload["args"] = append([]string(nil), m.Args...)
|
||||
payload["dir"] = m.Dir
|
||||
payload["pid"] = m.PID
|
||||
if _, ok := payload["startedAt"]; !ok {
|
||||
payload["startedAt"] = time.Now().UTC()
|
||||
}
|
||||
p.emitEvent("process.started", payload)
|
||||
case process.ActionProcessOutput:
|
||||
p.emitEvent("process.output", map[string]any{
|
||||
"id": m.ID,
|
||||
"line": m.Line,
|
||||
"stream": m.Stream,
|
||||
})
|
||||
case process.ActionProcessExited:
|
||||
payload := p.processEventPayload(m.ID)
|
||||
payload["id"] = m.ID
|
||||
payload["exitCode"] = m.ExitCode
|
||||
payload["duration"] = m.Duration
|
||||
if m.Error != nil {
|
||||
payload["error"] = m.Error.Error()
|
||||
}
|
||||
p.emitEvent("process.exited", payload)
|
||||
case process.ActionProcessKilled:
|
||||
payload := p.processEventPayload(m.ID)
|
||||
payload["id"] = m.ID
|
||||
payload["signal"] = m.Signal
|
||||
payload["exitCode"] = -1
|
||||
p.emitEvent("process.killed", payload)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ProcessProvider) processEventPayload(id string) map[string]any {
|
||||
if p == nil || p.service == nil || id == "" {
|
||||
return map[string]any{}
|
||||
}
|
||||
|
||||
proc, err := p.service.Get(id)
|
||||
if err != nil {
|
||||
return map[string]any{}
|
||||
}
|
||||
|
||||
info := proc.Info()
|
||||
return map[string]any{
|
||||
"id": info.ID,
|
||||
"command": info.Command,
|
||||
"args": append([]string(nil), info.Args...),
|
||||
"dir": info.Dir,
|
||||
"startedAt": info.StartedAt,
|
||||
"running": info.Running,
|
||||
"status": info.Status,
|
||||
"exitCode": info.ExitCode,
|
||||
"duration": info.Duration,
|
||||
"pid": info.PID,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,9 @@ import (
|
|||
goapi "dappco.re/go/core/api"
|
||||
process "dappco.re/go/core/process"
|
||||
processapi "dappco.re/go/core/process/pkg/api"
|
||||
corews "dappco.re/go/core/ws"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
|
@ -305,6 +307,98 @@ func TestProcessProvider_KillProcess_Good(t *testing.T) {
|
|||
assert.Equal(t, process.StatusKilled, proc.Status)
|
||||
}
|
||||
|
||||
func TestProcessProvider_BroadcastsProcessEvents_Good(t *testing.T) {
|
||||
svc := newTestProcessService(t)
|
||||
hub := corews.NewHub()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go hub.Run(ctx)
|
||||
|
||||
_ = processapi.NewProvider(nil, svc, hub)
|
||||
|
||||
server := httptest.NewServer(hub.Handler())
|
||||
defer server.Close()
|
||||
|
||||
conn := connectWS(t, server.URL)
|
||||
defer conn.Close()
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return hub.ClientCount() == 1
|
||||
}, time.Second, 10*time.Millisecond)
|
||||
|
||||
proc, err := svc.Start(context.Background(), "sh", "-c", "echo live-event")
|
||||
require.NoError(t, err)
|
||||
<-proc.Done()
|
||||
|
||||
events := readWSEvents(t, conn, "process.started", "process.output", "process.exited")
|
||||
|
||||
started := events["process.started"]
|
||||
require.NotNil(t, started)
|
||||
startedData := started.Data.(map[string]any)
|
||||
assert.Equal(t, proc.ID, startedData["id"])
|
||||
assert.Equal(t, "sh", startedData["command"])
|
||||
assert.Equal(t, float64(proc.Info().PID), startedData["pid"])
|
||||
|
||||
output := events["process.output"]
|
||||
require.NotNil(t, output)
|
||||
outputData := output.Data.(map[string]any)
|
||||
assert.Equal(t, proc.ID, outputData["id"])
|
||||
assert.Equal(t, "stdout", outputData["stream"])
|
||||
assert.Contains(t, outputData["line"], "live-event")
|
||||
|
||||
exited := events["process.exited"]
|
||||
require.NotNil(t, exited)
|
||||
exitedData := exited.Data.(map[string]any)
|
||||
assert.Equal(t, proc.ID, exitedData["id"])
|
||||
assert.Equal(t, float64(0), exitedData["exitCode"])
|
||||
}
|
||||
|
||||
func TestProcessProvider_BroadcastsKilledEvents_Good(t *testing.T) {
|
||||
svc := newTestProcessService(t)
|
||||
hub := corews.NewHub()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go hub.Run(ctx)
|
||||
|
||||
_ = processapi.NewProvider(nil, svc, hub)
|
||||
|
||||
server := httptest.NewServer(hub.Handler())
|
||||
defer server.Close()
|
||||
|
||||
conn := connectWS(t, server.URL)
|
||||
defer conn.Close()
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return hub.ClientCount() == 1
|
||||
}, time.Second, 10*time.Millisecond)
|
||||
|
||||
proc, err := svc.Start(context.Background(), "sleep", "60")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, svc.Kill(proc.ID))
|
||||
|
||||
select {
|
||||
case <-proc.Done():
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("process should have been killed")
|
||||
}
|
||||
|
||||
events := readWSEvents(t, conn, "process.killed", "process.exited")
|
||||
|
||||
killed := events["process.killed"]
|
||||
require.NotNil(t, killed)
|
||||
killedData := killed.Data.(map[string]any)
|
||||
assert.Equal(t, proc.ID, killedData["id"])
|
||||
assert.Equal(t, "SIGKILL", killedData["signal"])
|
||||
assert.Equal(t, float64(-1), killedData["exitCode"])
|
||||
|
||||
exited := events["process.exited"]
|
||||
require.NotNil(t, exited)
|
||||
exitedData := exited.Data.(map[string]any)
|
||||
assert.Equal(t, proc.ID, exitedData["id"])
|
||||
assert.Equal(t, float64(-1), exitedData["exitCode"])
|
||||
}
|
||||
|
||||
func TestProcessProvider_ProcessRoutes_Unavailable(t *testing.T) {
|
||||
p := processapi.NewProvider(nil, nil, nil)
|
||||
r := setupRouter(p)
|
||||
|
|
@ -352,3 +446,47 @@ func newTestProcessService(t *testing.T) *process.Service {
|
|||
|
||||
return raw.(*process.Service)
|
||||
}
|
||||
|
||||
func connectWS(t *testing.T, serverURL string) *websocket.Conn {
|
||||
t.Helper()
|
||||
|
||||
wsURL := "ws" + strings.TrimPrefix(serverURL, "http")
|
||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
require.NoError(t, err)
|
||||
return conn
|
||||
}
|
||||
|
||||
func readWSEvents(t *testing.T, conn *websocket.Conn, channels ...string) map[string]corews.Message {
|
||||
t.Helper()
|
||||
|
||||
want := make(map[string]struct{}, len(channels))
|
||||
for _, channel := range channels {
|
||||
want[channel] = struct{}{}
|
||||
}
|
||||
|
||||
events := make(map[string]corews.Message, len(channels))
|
||||
deadline := time.Now().Add(3 * time.Second)
|
||||
|
||||
for len(events) < len(channels) && time.Now().Before(deadline) {
|
||||
require.NoError(t, conn.SetReadDeadline(time.Now().Add(500*time.Millisecond)))
|
||||
|
||||
_, payload, err := conn.ReadMessage()
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, line := range strings.Split(strings.TrimSpace(string(payload)), "\n") {
|
||||
if strings.TrimSpace(line) == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
var msg corews.Message
|
||||
require.NoError(t, json.Unmarshal([]byte(line), &msg))
|
||||
|
||||
if _, ok := want[msg.Channel]; ok {
|
||||
events[msg.Channel] = msg
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
require.Len(t, events, len(channels))
|
||||
return events
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue