diff --git a/Process-Streaming.-.md b/Process-Streaming.-.md new file mode 100644 index 0000000..974ea01 --- /dev/null +++ b/Process-Streaming.-.md @@ -0,0 +1,222 @@ +# Process Streaming + +The `go-ws` package provides dedicated helpers for streaming process output and status changes over WebSocket. This is the primary use case the package was designed for -- giving web frontends real-time visibility into server-side processes. + +Back to [[Home]] + +## Channel Convention + +Process messages use the channel naming convention `process:{id}`, where `{id}` is your application's unique process identifier. Clients must subscribe to the specific process channel to receive its output. + +``` +Client A subscribes to "process:build-1" +Client B subscribes to "process:build-1" +Client C subscribes to "process:test-5" + +hub.SendProcessOutput("build-1", "compiling...") + → delivered to Client A and Client B only + +hub.SendProcessOutput("test-5", "PASS") + → delivered to Client C only +``` + +## Server-Side API + +### SendProcessOutput + +Streams a single line of process output to all subscribers of the process channel. + +```go +func (h *Hub) SendProcessOutput(processID string, output string) error +``` + +Usage: + +```go +// Stream stdout line by line +scanner := bufio.NewScanner(cmd.Stdout) +for scanner.Scan() { + if err := hub.SendProcessOutput(processID, scanner.Text()); err != nil { + log.Printf("failed to send output: %v", err) + } +} +``` + +This sends a message with: +- `type`: `"process_output"` +- `channel`: `"process:{processID}"` +- `processId`: the process ID +- `data`: the output string + +### SendProcessStatus + +Sends a process lifecycle status change with an exit code. + +```go +func (h *Hub) SendProcessStatus(processID string, status string, exitCode int) error +``` + +Usage: + +```go +// Process started +hub.SendProcessStatus(processID, "running", 0) + +// Process completed successfully +hub.SendProcessStatus(processID, "exited", 0) + +// Process failed +hub.SendProcessStatus(processID, "exited", 1) + +// Process was killed +hub.SendProcessStatus(processID, "killed", -1) +``` + +This sends a message with: +- `type`: `"process_status"` +- `channel`: `"process:{processID}"` +- `processId`: the process ID +- `data`: `{"status": "...", "exitCode": N}` + +## Client-Side Integration + +### JavaScript Example + +```js +const socket = new WebSocket("ws://localhost:8080/ws"); + +const processId = "build-1"; + +socket.onopen = () => { + // Subscribe to process output + socket.send(JSON.stringify({ + type: "subscribe", + data: `process:${processId}` + })); +}; + +socket.onmessage = (event) => { + const msg = JSON.parse(event.data); + + switch (msg.type) { + case "process_output": + // Append output to terminal + terminal.write(msg.data + "\n"); + break; + + case "process_status": + const { status, exitCode } = msg.data; + if (status === "exited") { + terminal.write(`\nProcess exited with code ${exitCode}\n`); + // Unsubscribe now that the process is finished + socket.send(JSON.stringify({ + type: "unsubscribe", + data: `process:${processId}` + })); + } + break; + } +}; +``` + +## Complete Server Example + +A full example showing process execution with WebSocket streaming: + +```go +package main + +import ( + "bufio" + "context" + "net/http" + "os/exec" + + ws "forge.lthn.ai/core/go-ws" +) + +func main() { + hub := ws.NewHub() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + http.HandleFunc("/ws", hub.Handler()) + + http.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { + processID := r.URL.Query().Get("id") + if processID == "" { + http.Error(w, "missing id parameter", http.StatusBadRequest) + return + } + + go runProcess(hub, processID) + w.WriteHeader(http.StatusAccepted) + }) + + http.ListenAndServe(":8080", nil) +} + +func runProcess(hub *ws.Hub, id string) { + cmd := exec.Command("go", "build", "./...") + + stdout, _ := cmd.StdoutPipe() + stderr, _ := cmd.StderrPipe() + + hub.SendProcessStatus(id, "running", 0) + + if err := cmd.Start(); err != nil { + hub.SendProcessStatus(id, "failed", -1) + hub.SendError("failed to start process: " + err.Error()) + return + } + + // Stream stdout + go func() { + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + hub.SendProcessOutput(id, scanner.Text()) + } + }() + + // Stream stderr + go func() { + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + hub.SendProcessOutput(id, scanner.Text()) + } + }() + + exitCode := 0 + if err := cmd.Wait(); err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + exitCode = exitErr.ExitCode() + } else { + exitCode = -1 + } + } + + hub.SendProcessStatus(id, "exited", exitCode) +} +``` + +## Monitoring + +Use the Hub's statistics methods to monitor process channel activity: + +```go +// Check if anyone is watching a process +if hub.ChannelSubscriberCount("process:"+processID) == 0 { + // No subscribers -- consider skipping expensive output formatting +} + +// Overall hub health +stats := hub.Stats() +log.Printf("clients=%d channels=%d", stats.Clients, stats.Channels) +``` + +## See Also + +- [[Channel-Subscriptions]] -- How the `process:{id}` channels work under the hood +- [[Message-Types]] -- Wire format for `process_output` and `process_status` +- [[Architecture]] -- Goroutine model and back-pressure handling