3 Process-Streaming
Virgil edited this page 2026-02-19 16:57:11 +00:00

Process Streaming

The go-ws package provides dedicated helpers for streaming process output and status changes over WebSocket. This is the primary use case the package was designed for -- giving web frontends real-time visibility into server-side processes.

Back to Home

Channel Convention

Process messages use the channel naming convention process:{id}, where {id} is your application's unique process identifier. Clients must subscribe to the specific process channel to receive its output.

Client A subscribes to "process:build-1"
Client B subscribes to "process:build-1"
Client C subscribes to "process:test-5"

hub.SendProcessOutput("build-1", "compiling...")
  → delivered to Client A and Client B only

hub.SendProcessOutput("test-5", "PASS")
  → delivered to Client C only

Server-Side API

SendProcessOutput

Streams a single line of process output to all subscribers of the process channel.

func (h *Hub) SendProcessOutput(processID string, output string) error

Usage:

// Stream stdout line by line
scanner := bufio.NewScanner(cmd.Stdout)
for scanner.Scan() {
    if err := hub.SendProcessOutput(processID, scanner.Text()); err != nil {
        log.Printf("failed to send output: %v", err)
    }
}

This sends a message with:

  • type: "process_output"
  • channel: "process:{processID}"
  • processId: the process ID
  • data: the output string

SendProcessStatus

Sends a process lifecycle status change with an exit code.

func (h *Hub) SendProcessStatus(processID string, status string, exitCode int) error

Usage:

// Process started
hub.SendProcessStatus(processID, "running", 0)

// Process completed successfully
hub.SendProcessStatus(processID, "exited", 0)

// Process failed
hub.SendProcessStatus(processID, "exited", 1)

// Process was killed
hub.SendProcessStatus(processID, "killed", -1)

This sends a message with:

  • type: "process_status"
  • channel: "process:{processID}"
  • processId: the process ID
  • data: {"status": "...", "exitCode": N}

Client-Side Integration

JavaScript Example

const socket = new WebSocket("ws://localhost:8080/ws");

const processId = "build-1";

socket.onopen = () => {
  // Subscribe to process output
  socket.send(JSON.stringify({
    type: "subscribe",
    data: `process:${processId}`
  }));
};

socket.onmessage = (event) => {
  const msg = JSON.parse(event.data);

  switch (msg.type) {
    case "process_output":
      // Append output to terminal
      terminal.write(msg.data + "\n");
      break;

    case "process_status":
      const { status, exitCode } = msg.data;
      if (status === "exited") {
        terminal.write(`\nProcess exited with code ${exitCode}\n`);
        // Unsubscribe now that the process is finished
        socket.send(JSON.stringify({
          type: "unsubscribe",
          data: `process:${processId}`
        }));
      }
      break;
  }
};

Complete Server Example

A full example showing process execution with WebSocket streaming:

package main

import (
    "bufio"
    "context"
    "net/http"
    "os/exec"

    ws "forge.lthn.ai/core/go-ws"
)

func main() {
    hub := ws.NewHub()
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go hub.Run(ctx)

    http.HandleFunc("/ws", hub.Handler())

    http.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
        processID := r.URL.Query().Get("id")
        if processID == "" {
            http.Error(w, "missing id parameter", http.StatusBadRequest)
            return
        }

        go runProcess(hub, processID)
        w.WriteHeader(http.StatusAccepted)
    })

    http.ListenAndServe(":8080", nil)
}

func runProcess(hub *ws.Hub, id string) {
    cmd := exec.Command("go", "build", "./...")

    stdout, _ := cmd.StdoutPipe()
    stderr, _ := cmd.StderrPipe()

    hub.SendProcessStatus(id, "running", 0)

    if err := cmd.Start(); err != nil {
        hub.SendProcessStatus(id, "failed", -1)
        hub.SendError("failed to start process: " + err.Error())
        return
    }

    // Stream stdout
    go func() {
        scanner := bufio.NewScanner(stdout)
        for scanner.Scan() {
            hub.SendProcessOutput(id, scanner.Text())
        }
    }()

    // Stream stderr
    go func() {
        scanner := bufio.NewScanner(stderr)
        for scanner.Scan() {
            hub.SendProcessOutput(id, scanner.Text())
        }
    }()

    exitCode := 0
    if err := cmd.Wait(); err != nil {
        if exitErr, ok := err.(*exec.ExitError); ok {
            exitCode = exitErr.ExitCode()
        } else {
            exitCode = -1
        }
    }

    hub.SendProcessStatus(id, "exited", exitCode)
}

Monitoring

Use the Hub's statistics methods to monitor process channel activity:

// Check if anyone is watching a process
if hub.ChannelSubscriberCount("process:"+processID) == 0 {
    // No subscribers -- consider skipping expensive output formatting
}

// Overall hub health
stats := hub.Stats()
log.Printf("clients=%d channels=%d", stats.Clients, stats.Channels)

See Also