Add "Process-Streaming"

Virgil 2026-02-19 16:57:11 +00:00
parent def31d9c4f
commit 4530dfc59d

222
Process-Streaming.-.md Normal file

@ -0,0 +1,222 @@
# Process Streaming
The `go-ws` package provides dedicated helpers for streaming process output and status changes over WebSocket. This is the primary use case the package was designed for -- giving web frontends real-time visibility into server-side processes.
Back to [[Home]]
## Channel Convention
Process messages use the channel naming convention `process:{id}`, where `{id}` is your application's unique process identifier. Clients must subscribe to the specific process channel to receive its output.
```
Client A subscribes to "process:build-1"
Client B subscribes to "process:build-1"
Client C subscribes to "process:test-5"
hub.SendProcessOutput("build-1", "compiling...")
→ delivered to Client A and Client B only
hub.SendProcessOutput("test-5", "PASS")
→ delivered to Client C only
```
## Server-Side API
### SendProcessOutput
Streams a single line of process output to all subscribers of the process channel.
```go
func (h *Hub) SendProcessOutput(processID string, output string) error
```
Usage:
```go
// Stream stdout line by line
scanner := bufio.NewScanner(cmd.Stdout)
for scanner.Scan() {
if err := hub.SendProcessOutput(processID, scanner.Text()); err != nil {
log.Printf("failed to send output: %v", err)
}
}
```
This sends a message with:
- `type`: `"process_output"`
- `channel`: `"process:{processID}"`
- `processId`: the process ID
- `data`: the output string
### SendProcessStatus
Sends a process lifecycle status change with an exit code.
```go
func (h *Hub) SendProcessStatus(processID string, status string, exitCode int) error
```
Usage:
```go
// Process started
hub.SendProcessStatus(processID, "running", 0)
// Process completed successfully
hub.SendProcessStatus(processID, "exited", 0)
// Process failed
hub.SendProcessStatus(processID, "exited", 1)
// Process was killed
hub.SendProcessStatus(processID, "killed", -1)
```
This sends a message with:
- `type`: `"process_status"`
- `channel`: `"process:{processID}"`
- `processId`: the process ID
- `data`: `{"status": "...", "exitCode": N}`
## Client-Side Integration
### JavaScript Example
```js
const socket = new WebSocket("ws://localhost:8080/ws");
const processId = "build-1";
socket.onopen = () => {
// Subscribe to process output
socket.send(JSON.stringify({
type: "subscribe",
data: `process:${processId}`
}));
};
socket.onmessage = (event) => {
const msg = JSON.parse(event.data);
switch (msg.type) {
case "process_output":
// Append output to terminal
terminal.write(msg.data + "\n");
break;
case "process_status":
const { status, exitCode } = msg.data;
if (status === "exited") {
terminal.write(`\nProcess exited with code ${exitCode}\n`);
// Unsubscribe now that the process is finished
socket.send(JSON.stringify({
type: "unsubscribe",
data: `process:${processId}`
}));
}
break;
}
};
```
## Complete Server Example
A full example showing process execution with WebSocket streaming:
```go
package main
import (
"bufio"
"context"
"net/http"
"os/exec"
ws "forge.lthn.ai/core/go-ws"
)
func main() {
hub := ws.NewHub()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go hub.Run(ctx)
http.HandleFunc("/ws", hub.Handler())
http.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
processID := r.URL.Query().Get("id")
if processID == "" {
http.Error(w, "missing id parameter", http.StatusBadRequest)
return
}
go runProcess(hub, processID)
w.WriteHeader(http.StatusAccepted)
})
http.ListenAndServe(":8080", nil)
}
func runProcess(hub *ws.Hub, id string) {
cmd := exec.Command("go", "build", "./...")
stdout, _ := cmd.StdoutPipe()
stderr, _ := cmd.StderrPipe()
hub.SendProcessStatus(id, "running", 0)
if err := cmd.Start(); err != nil {
hub.SendProcessStatus(id, "failed", -1)
hub.SendError("failed to start process: " + err.Error())
return
}
// Stream stdout
go func() {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
hub.SendProcessOutput(id, scanner.Text())
}
}()
// Stream stderr
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
hub.SendProcessOutput(id, scanner.Text())
}
}()
exitCode := 0
if err := cmd.Wait(); err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
exitCode = exitErr.ExitCode()
} else {
exitCode = -1
}
}
hub.SendProcessStatus(id, "exited", exitCode)
}
```
## Monitoring
Use the Hub's statistics methods to monitor process channel activity:
```go
// Check if anyone is watching a process
if hub.ChannelSubscriberCount("process:"+processID) == 0 {
// No subscribers -- consider skipping expensive output formatting
}
// Overall hub health
stats := hub.Stats()
log.Printf("clients=%d channels=%d", stats.Clients, stats.Channels)
```
## See Also
- [[Channel-Subscriptions]] -- How the `process:{id}` channels work under the hood
- [[Message-Types]] -- Wire format for `process_output` and `process_status`
- [[Architecture]] -- Goroutine model and back-pressure handling