Process Streaming
The go-ws package provides dedicated helpers for streaming process output and status changes over WebSocket. This is the primary use case the package was designed for -- giving web frontends real-time visibility into server-side processes.
Back to Home
Channel Convention
Process messages use the channel naming convention process:{id}, where {id} is your application's unique process identifier. Clients must subscribe to the specific process channel to receive its output.
Client A subscribes to "process:build-1"
Client B subscribes to "process:build-1"
Client C subscribes to "process:test-5"
hub.SendProcessOutput("build-1", "compiling...")
→ delivered to Client A and Client B only
hub.SendProcessOutput("test-5", "PASS")
→ delivered to Client C only
Server-Side API
SendProcessOutput
Streams a single line of process output to all subscribers of the process channel.
func (h *Hub) SendProcessOutput(processID string, output string) error
Usage:
// Stream stdout line by line
scanner := bufio.NewScanner(cmd.Stdout)
for scanner.Scan() {
if err := hub.SendProcessOutput(processID, scanner.Text()); err != nil {
log.Printf("failed to send output: %v", err)
}
}
This sends a message with:
type:"process_output"channel:"process:{processID}"processId: the process IDdata: the output string
SendProcessStatus
Sends a process lifecycle status change with an exit code.
func (h *Hub) SendProcessStatus(processID string, status string, exitCode int) error
Usage:
// Process started
hub.SendProcessStatus(processID, "running", 0)
// Process completed successfully
hub.SendProcessStatus(processID, "exited", 0)
// Process failed
hub.SendProcessStatus(processID, "exited", 1)
// Process was killed
hub.SendProcessStatus(processID, "killed", -1)
This sends a message with:
type:"process_status"channel:"process:{processID}"processId: the process IDdata:{"status": "...", "exitCode": N}
Client-Side Integration
JavaScript Example
const socket = new WebSocket("ws://localhost:8080/ws");
const processId = "build-1";
socket.onopen = () => {
// Subscribe to process output
socket.send(JSON.stringify({
type: "subscribe",
data: `process:${processId}`
}));
};
socket.onmessage = (event) => {
const msg = JSON.parse(event.data);
switch (msg.type) {
case "process_output":
// Append output to terminal
terminal.write(msg.data + "\n");
break;
case "process_status":
const { status, exitCode } = msg.data;
if (status === "exited") {
terminal.write(`\nProcess exited with code ${exitCode}\n`);
// Unsubscribe now that the process is finished
socket.send(JSON.stringify({
type: "unsubscribe",
data: `process:${processId}`
}));
}
break;
}
};
Complete Server Example
A full example showing process execution with WebSocket streaming:
package main
import (
"bufio"
"context"
"net/http"
"os/exec"
ws "forge.lthn.ai/core/go-ws"
)
func main() {
hub := ws.NewHub()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go hub.Run(ctx)
http.HandleFunc("/ws", hub.Handler())
http.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
processID := r.URL.Query().Get("id")
if processID == "" {
http.Error(w, "missing id parameter", http.StatusBadRequest)
return
}
go runProcess(hub, processID)
w.WriteHeader(http.StatusAccepted)
})
http.ListenAndServe(":8080", nil)
}
func runProcess(hub *ws.Hub, id string) {
cmd := exec.Command("go", "build", "./...")
stdout, _ := cmd.StdoutPipe()
stderr, _ := cmd.StderrPipe()
hub.SendProcessStatus(id, "running", 0)
if err := cmd.Start(); err != nil {
hub.SendProcessStatus(id, "failed", -1)
hub.SendError("failed to start process: " + err.Error())
return
}
// Stream stdout
go func() {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
hub.SendProcessOutput(id, scanner.Text())
}
}()
// Stream stderr
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
hub.SendProcessOutput(id, scanner.Text())
}
}()
exitCode := 0
if err := cmd.Wait(); err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
exitCode = exitErr.ExitCode()
} else {
exitCode = -1
}
}
hub.SendProcessStatus(id, "exited", exitCode)
}
Monitoring
Use the Hub's statistics methods to monitor process channel activity:
// Check if anyone is watching a process
if hub.ChannelSubscriberCount("process:"+processID) == 0 {
// No subscribers -- consider skipping expensive output formatting
}
// Overall hub health
stats := hub.Stats()
log.Printf("clients=%d channels=%d", stats.Clients, stats.Channels)
See Also
- Channel-Subscriptions -- How the
process:{id}channels work under the hood - Message-Types -- Wire format for
process_outputandprocess_status - Architecture -- Goroutine model and back-pressure handling