title: Architecture
description: Internals of go-process: key types, data flow, and design decisions.

Architecture

This document explains how go-process is structured, how data flows through the system, and the role of each major component.

Overview

The package is organised into four layers:

  1. Service — The Core-integrated service that owns processes and broadcasts events.
  2. Process — An individual managed process with output capture and lifecycle state.
  3. Runner — A pipeline orchestrator that runs multiple processes with dependency resolution.
  4. Daemon — A higher-level abstraction for long-running services with PID files, health checks, and registry integration.

A separate exec/ sub-package provides a thin, fluent wrapper around os/exec for simple one-shot commands.

Key Types

Status

Process lifecycle is tracked as a state machine:

pending -> running -> exited
                   -> failed
                   -> killed

type Status string

const (
    StatusPending Status = "pending"
    StatusRunning Status = "running"
    StatusExited  Status = "exited"
    StatusFailed  Status = "failed"
    StatusKilled  Status = "killed"
)
  • pending — queued but not yet started (currently unused by the service, reserved for future scheduling).
  • running — actively executing.
  • exited — completed; check ExitCode for success (0) or failure.
  • failed — could not be started (e.g. binary not found).
  • killed — terminated by signal or context cancellation.
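
As a minimal sketch of how a caller might interpret these states, the helpers below separate terminal states from in-progress ones and define success as `exited` with exit code 0. The `Status` type and constants are copied from above; `isTerminal` and `succeeded` are hypothetical names used for illustration and are not claimed to exist in go-process.

```go
package main

import "fmt"

// Status mirrors the type documented above.
type Status string

const (
	StatusPending Status = "pending"
	StatusRunning Status = "running"
	StatusExited  Status = "exited"
	StatusFailed  Status = "failed"
	StatusKilled  Status = "killed"
)

// isTerminal reports whether a process in state s will not change state again.
// Hypothetical helper, for illustration only.
func isTerminal(s Status) bool {
	switch s {
	case StatusExited, StatusFailed, StatusKilled:
		return true
	default:
		return false
	}
}

// succeeded reports whether the process ran to completion with exit code 0.
// Hypothetical helper, for illustration only.
func succeeded(s Status, exitCode int) bool {
	return s == StatusExited && exitCode == 0
}

func main() {
	fmt.Println(isTerminal(StatusRunning))  // false
	fmt.Println(succeeded(StatusExited, 0)) // true
	fmt.Println(succeeded(StatusKilled, 0)) // false
}
```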

Service

Service is the central type. It embeds core.ServiceRuntime[Options] to participate in the Core DI container and implements both Startable and Stoppable lifecycle interfaces.

type Service struct {
    *core.ServiceRuntime[Options]
    managed *core.Registry[*ManagedProcess]
    bufSize int
}

Key behaviours:

  • OnStartup — registers the named Core actions process.run, process.start, process.kill, process.list, and process.get.
  • OnShutdown — iterates all running processes and calls Kill() on each, ensuring no orphaned child processes when the application exits.
  • Process IDs are generated with core.ID() and stored in a Core registry.

Registration

The service is registered with Core via a factory function:

core.New(core.WithService(process.Register))

Register returns core.Result{Value: *Service, OK: true} — the standard Core WithService factory signature used by the v0.8.0 contract.

Process

Process wraps an os/exec.Cmd with:

  • Thread-safe state (sync.RWMutex guards all mutable fields).
  • A RingBuffer for output capture (configurable size, default 1 MB).
  • A done channel that closes when the process exits, enabling select-based coordination.
  • Stdin pipe access via SendInput() and CloseStdin().
  • Context-based cancellation — cancelling the context kills the process.

Info Snapshot

Process.Info() returns an Info struct — a serialisable snapshot of the process state, suitable for JSON APIs or UI display:

type Info struct {
    ID        string        `json:"id"`
    Command   string        `json:"command"`
    Args      []string      `json:"args"`
    Dir       string        `json:"dir"`
    StartedAt time.Time     `json:"startedAt"`
    Status    Status        `json:"status"`
    ExitCode  int           `json:"exitCode"`
    Duration  time.Duration `json:"duration"`
    PID       int           `json:"pid"`
}

RingBuffer

A fixed-size circular buffer that overwrites the oldest data when full. Thread-safe for concurrent reads and writes.

rb := process.NewRingBuffer(64 * 1024) // 64 KB
rb.Write([]byte("data"))
fmt.Println(rb.String()) // "data"
fmt.Println(rb.Len())    // 4
fmt.Println(rb.Cap())    // 65536
rb.Reset()

The ring buffer is used internally to capture process stdout and stderr. When a process produces more output than the buffer capacity, the oldest data is silently overwritten. This prevents unbounded memory growth for long-running or verbose processes.
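
The overwrite behaviour can be illustrated with a small standalone buffer. This is a sketch of the general technique, not the go-process implementation: the `ring` type, its fields, and `newRing` are hypothetical names, and the real `RingBuffer` may be structured differently.

```go
package main

import (
	"fmt"
	"sync"
)

// ring is a fixed-capacity byte buffer that overwrites its oldest data when full.
type ring struct {
	mu    sync.RWMutex
	data  []byte
	start int // index of the oldest byte
	size  int // number of valid bytes
}

func newRing(capacity int) *ring {
	return &ring{data: make([]byte, capacity)}
}

// Write appends p, discarding the oldest bytes once capacity is exceeded.
func (r *ring) Write(p []byte) (int, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.data) == 0 {
		return len(p), nil // zero-capacity buffer keeps nothing
	}
	for _, b := range p {
		end := (r.start + r.size) % len(r.data)
		r.data[end] = b
		if r.size < len(r.data) {
			r.size++
		} else {
			// The slot we just wrote held the oldest byte; advance past it.
			r.start = (r.start + 1) % len(r.data)
		}
	}
	return len(p), nil
}

// String returns the buffered bytes in oldest-to-newest order.
func (r *ring) String() string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]byte, 0, r.size)
	for i := 0; i < r.size; i++ {
		out = append(out, r.data[(r.start+i)%len(r.data)])
	}
	return string(out)
}

func main() {
	r := newRing(4)
	r.Write([]byte("abcdef"))
	fmt.Println(r.String()) // cdef: "ab" was overwritten
}
```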

ACTION Messages

Four IPC message types are broadcast through Core.ACTION():

| Type | When | Key Fields |
| --- | --- | --- |
| ActionProcessStarted | Process begins execution | ID, Command, Args, Dir, PID |
| ActionProcessOutput | Each line of stdout/stderr | ID, Line, Stream |
| ActionProcessExited | Process completes | ID, ExitCode, Duration, Error |
| ActionProcessKilled | Process is terminated | ID, Signal |

The Stream type distinguishes stdout from stderr:

type Stream string

const (
    StreamStdout Stream = "stdout"
    StreamStderr Stream = "stderr"
)

Data Flow

When Service.StartWithOptions() is called:

1. Generate a unique ID with `core.ID()`
2. Create context with cancel
3. Build os/exec.Cmd with dir, env, pipes
4. Create RingBuffer (unless DisableCapture is set)
5. cmd.Start()
6. Store process in the Core registry
7. Broadcast ActionProcessStarted via Core.ACTION
8. Spawn 2 goroutines to stream stdout and stderr
   - Each line is written to the RingBuffer
   - Each line is broadcast as ActionProcessOutput
9. Spawn 1 goroutine to wait for process exit
   - Waits for output goroutines to finish first
   - Calls cmd.Wait()
   - Classifies the exit as exited, failed, or killed
   - Closes the done channel
   - Broadcasts ActionProcessKilled when the process died from a signal
   - Broadcasts ActionProcessExited

The output streaming goroutines use bufio.Scanner with a 1 MB line buffer to handle long lines without truncation.

Runner

The Runner orchestrates multiple processes, defined as RunSpec values:

type RunSpec struct {
    Name         string
    Command      string
    Args         []string
    Dir          string
    Env          []string
    After        []string   // dependency names
    AllowFailure bool
}

Three execution strategies are available:

RunAll (dependency graph)

Runs specs in waves ordered by their After dependencies. In each wave, every spec whose dependencies have all completed runs in parallel. If a dependency fails and its AllowFailure is false, its dependents are skipped. Specs caught in a circular dependency are detected and reported as skipped with an error.

Wave 1: [lint, vet]        (no dependencies)
Wave 2: [test]             (depends on lint, vet)
Wave 3: [build]            (depends on test)

RunSequential

Executes specs one after another. Stops on the first failure unless AllowFailure is set. Remaining specs are marked as skipped.

RunParallel

Runs all specs concurrently, ignoring the After field entirely. Failures do not affect other specs.

All three strategies return a RunAllResult with aggregate counts:

type RunAllResult struct {
    Results  []RunResult
    Duration time.Duration
    Passed   int
    Failed   int
    Skipped  int
}

Daemon

The Daemon type manages the full lifecycle of a long-running service:

NewDaemon(opts) -> Start() -> Run(ctx) -> Stop()

PID File

PIDFile provides single-instance enforcement. Acquire() writes the current process PID to a file. If the file already exists and the recorded PID is still alive (checked by sending it signal 0, syscall.Signal(0)), Acquire() returns an error. Stale PID files left by dead processes are cleaned up automatically.

pid := process.NewPIDFile("/var/run/myapp.pid")
err := pid.Acquire()  // writes current PID, fails if another instance is live
defer pid.Release()   // removes the file

Health Server

HealthServer exposes two HTTP endpoints:

  • /health — runs all registered HealthCheck functions. Returns 200 if all pass, 503 if any fail.
  • /ready — returns 200 or 503 based on the readiness flag, toggled via SetReady(bool).

The server binds to a configurable address (use port 0 for ephemeral port allocation in tests). WaitForHealth() is a polling utility that waits for /health to return 200 within a timeout.

Registry

Registry tracks running daemons via JSON files in a directory (default: ~/.core/daemons/). Each daemon is a DaemonEntry:

type DaemonEntry struct {
    Code    string    `json:"code"`
    Daemon  string    `json:"daemon"`
    PID     int       `json:"pid"`
    Health  string    `json:"health,omitempty"`
    Project string    `json:"project,omitempty"`
    Binary  string    `json:"binary,omitempty"`
    Started time.Time `json:"started"`
}

The registry automatically prunes entries with dead PIDs on List() and Get(). When a Daemon is configured with a Registry, it auto-registers on Start() and auto-unregisters on Stop().

File naming convention: {code}-{daemon}.json (slashes replaced with dashes).
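
A sketch of the naming rule, assuming the slash replacement applies to both the code and the daemon name. `entryFileName` is a hypothetical helper, not a documented Registry function.

```go
package main

import (
	"fmt"
	"strings"
)

// entryFileName builds "{code}-{daemon}.json" with slashes replaced by dashes.
// Applying the replacement to both parts is an assumption.
func entryFileName(code, daemon string) string {
	clean := func(s string) string { return strings.ReplaceAll(s, "/", "-") }
	return clean(code) + "-" + clean(daemon) + ".json"
}

func main() {
	fmt.Println(entryFileName("core/api", "worker")) // core-api-worker.json
}
```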

exec Sub-Package

The exec package (dappco.re/go/core/process/exec) provides a fluent wrapper around os/exec for simple, one-shot commands that do not need Core integration:

import "dappco.re/go/core/process/exec"

// Fluent API
err := exec.Command(ctx, "go", "build", "./...").
    WithDir("/path/to/project").
    WithEnv([]string{"CGO_ENABLED=0"}).
    WithLogger(myLogger).
    Run()

// Get output
out, err := exec.Command(ctx, "git", "status").Output()

// Combined stdout + stderr (reuses out and err declared above)
out, err = exec.Command(ctx, "make").CombinedOutput()

// Quiet mode (suppresses stdout, includes stderr in error)
err = exec.RunQuiet(ctx, "go", "vet", "./...")

Logging

Commands are automatically logged at debug level before execution and at error level on failure. The logger interface is minimal:

type Logger interface {
    Debug(msg string, keyvals ...any)
    Error(msg string, keyvals ...any)
}

A NopLogger (the default) discards all messages. Use SetDefaultLogger() to set a package-wide logger, or WithLogger() for per-command overrides.
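
Because the interface is this small, adapters are short. The sketch below redeclares the interface locally so it compiles on its own. It shows that the standard library's `*slog.Logger` (Go 1.21+) already has matching method signatures, and adds a hypothetical `recordingLogger` of the kind one might pass to WithLogger() in tests.

```go
package main

import (
	"fmt"
	"log/slog"
	"sync"
)

// Logger is redeclared from the definition above so this sketch is self-contained.
type Logger interface {
	Debug(msg string, keyvals ...any)
	Error(msg string, keyvals ...any)
}

// Compile-time check: *slog.Logger has matching Debug and Error methods.
var _ Logger = (*slog.Logger)(nil)

// recordingLogger keeps messages in memory. Hypothetical helper for tests.
type recordingLogger struct {
	mu      sync.Mutex
	entries []string
}

func (l *recordingLogger) Debug(msg string, keyvals ...any) { l.add("DEBUG", msg, keyvals) }
func (l *recordingLogger) Error(msg string, keyvals ...any) { l.add("ERROR", msg, keyvals) }

// add formats one entry and appends it under the lock.
func (l *recordingLogger) add(level, msg string, keyvals []any) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.entries = append(l.entries, fmt.Sprintf("%s %s %v", level, msg, keyvals))
}

func main() {
	rec := &recordingLogger{}
	var l Logger = rec
	l.Debug("exec", "cmd", "go")
	l.Error("exec failed", "err", "exit status 1")
	fmt.Println(len(rec.entries), rec.entries[0]) // 2 DEBUG exec [cmd go]
}
```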

Thread Safety

All public types are safe for concurrent use:

  • Service: sync.RWMutex protects the process map; atomic counter for IDs.
  • Process: sync.RWMutex protects mutable state.
  • RingBuffer: sync.RWMutex on all read/write operations.
  • PIDFile: sync.Mutex on acquire/release.
  • HealthServer: sync.Mutex on check list and readiness flag.
  • Registry: filesystem-level isolation (one file per daemon).
  • Global singleton: atomic.Pointer for lock-free reads.
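
The lock-free singleton pattern in the last bullet can be sketched with `sync/atomic` (Go 1.19+). The `service` type and the `setDefault` and `getDefault` names are hypothetical; the sketch shows the technique, not the package's actual accessors.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// service is a placeholder for the type held by the global singleton.
type service struct{ name string }

// defaultService holds the current instance; its zero value holds nil.
var defaultService atomic.Pointer[service]

// setDefault atomically publishes s as the package-wide instance.
func setDefault(s *service) { defaultService.Store(s) }

// getDefault returns the current instance without taking a lock.
// It returns nil until setDefault has been called.
func getDefault() *service { return defaultService.Load() }

func main() {
	fmt.Println(getDefault() == nil) // true
	setDefault(&service{name: "process"})
	fmt.Println(getDefault().name) // process
}
```

Readers never block, which suits a value that is written rarely and read on many hot paths.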