10 KiB
| title | description |
|---|---|
| Architecture | Internals of go-process — key types, data flow, and design decisions. |
Architecture
This document explains how go-process is structured, how data flows through
the system, and the role of each major component.
Overview
The package is organised into four layers:
- Service — The Core-integrated service that owns processes and broadcasts events.
- Process — An individual managed process with output capture and lifecycle state.
- Runner — A pipeline orchestrator that runs multiple processes with dependency resolution.
- Daemon — A higher-level abstraction for long-running services with PID files, health checks, and registry integration.
A separate exec/ sub-package provides a thin, fluent wrapper around os/exec
for simple one-shot commands.
Key Types
Status
Process lifecycle is tracked as a state machine:
pending -> running -> exited
-> failed
-> killed
type Status string
const (
StatusPending Status = "pending"
StatusRunning Status = "running"
StatusExited Status = "exited"
StatusFailed Status = "failed"
StatusKilled Status = "killed"
)
- pending — queued but not yet started (currently unused by the service, reserved for future scheduling).
- running — actively executing.
- exited — completed; check
ExitCodefor success (0) or failure. - failed — could not be started (e.g. binary not found).
- killed — terminated by signal or context cancellation.
Service
Service is the central type. It embeds core.ServiceRuntime[Options] to
participate in the Core DI container and implements both Startable and
Stoppable lifecycle interfaces.
type Service struct {
*core.ServiceRuntime[Options]
processes map[string]*Process
mu sync.RWMutex
bufSize int
idCounter atomic.Uint64
}
Key behaviours:
- OnStartup — currently a no-op; reserved for future initialisation.
- OnShutdown — iterates all running processes and calls
Kill()on each, ensuring no orphaned child processes when the application exits. - Process IDs are generated as
proc-Nusing an atomic counter, guaranteeing uniqueness without locks.
Registration
The service is registered with Core via a factory function:
process.NewService(process.Options{BufferSize: 2 * 1024 * 1024})
NewService returns a func(*core.Core) (any, error) closure — the standard
Core service factory signature. The Options struct is captured by the closure
and applied when Core instantiates the service.
Process
Process wraps an os/exec.Cmd with:
- Thread-safe state (
sync.RWMutexguards all mutable fields). - A
RingBufferfor output capture (configurable size, default 1 MB). - A
donechannel that closes when the process exits, enablingselect-based coordination. - Stdin pipe access via
SendInput()andCloseStdin(). - Context-based cancellation — cancelling the context kills the process.
Info Snapshot
Process.Info() returns an Info struct — a serialisable snapshot of the
process state, suitable for JSON APIs or UI display:
type Info struct {
ID string `json:"id"`
Command string `json:"command"`
Args []string `json:"args"`
Dir string `json:"dir"`
StartedAt time.Time `json:"startedAt"`
Status Status `json:"status"`
ExitCode int `json:"exitCode"`
Duration time.Duration `json:"duration"`
PID int `json:"pid"`
}
RingBuffer
A fixed-size circular buffer that overwrites the oldest data when full. Thread-safe for concurrent reads and writes.
rb := process.NewRingBuffer(64 * 1024) // 64 KB
rb.Write([]byte("data"))
fmt.Println(rb.String()) // "data"
fmt.Println(rb.Len()) // 4
fmt.Println(rb.Cap()) // 65536
rb.Reset()
The ring buffer is used internally to capture process stdout and stderr. When a process produces more output than the buffer capacity, the oldest data is silently overwritten. This prevents unbounded memory growth for long-running or verbose processes.
ACTION Messages
Four IPC message types are broadcast through Core.ACTION():
| Type | When | Key Fields |
|---|---|---|
ActionProcessStarted |
Process begins execution | ID, Command, Args, Dir, PID |
ActionProcessOutput |
Each line of stdout/stderr | ID, Line, Stream |
ActionProcessExited |
Process completes | ID, ExitCode, Duration, Error |
ActionProcessKilled |
Process is terminated | ID, Signal |
The Stream type distinguishes stdout from stderr:
type Stream string
const (
StreamStdout Stream = "stdout"
StreamStderr Stream = "stderr"
)
Data Flow
When Service.StartWithOptions() is called:
1. Generate unique ID (atomic counter)
2. Create context with cancel
3. Build os/exec.Cmd with dir, env, pipes
4. Create RingBuffer (unless DisableCapture is set)
5. cmd.Start()
6. Store process in map
7. Broadcast ActionProcessStarted via Core.ACTION
8. Spawn 2 goroutines to stream stdout and stderr
- Each line is written to the RingBuffer
- Each line is broadcast as ActionProcessOutput
9. Spawn 1 goroutine to wait for process exit
- Waits for output goroutines to finish first
- Calls cmd.Wait()
- Updates process status and exit code
- Closes the done channel
- Broadcasts ActionProcessExited
The output streaming goroutines use bufio.Scanner with a 1 MB line buffer
to handle long lines without truncation.
Runner
The Runner orchestrates multiple processes, defined as RunSpec values:
type RunSpec struct {
Name string
Command string
Args []string
Dir string
Env []string
After []string // dependency names
AllowFailure bool
}
Three execution strategies are available:
RunAll (dependency graph)
Processes dependencies in waves. In each wave, all specs whose dependencies
are satisfied run in parallel. If a dependency fails (and AllowFailure is
false), its dependents are skipped. Circular dependencies are detected and
reported as skipped with an error.
Wave 1: [lint, vet] (no dependencies)
Wave 2: [test] (depends on lint, vet)
Wave 3: [build] (depends on test)
RunSequential
Executes specs one after another. Stops on the first failure unless
AllowFailure is set. Remaining specs are marked as skipped.
RunParallel
Runs all specs concurrently, ignoring the After field entirely. Failures
do not affect other specs.
All three strategies return a RunAllResult with aggregate counts:
type RunAllResult struct {
Results []RunResult
Duration time.Duration
Passed int
Failed int
Skipped int
}
Daemon
The Daemon type manages the full lifecycle of a long-running service:
NewDaemon(opts) -> Start() -> Run(ctx) -> Stop()
PID File
PIDFile provides single-instance enforcement. Acquire() writes the current
process PID to a file; if the file already exists and the recorded PID is still
alive (verified via syscall.Signal(0)), it returns an error. Stale PID files
from dead processes are automatically cleaned up.
pid := process.NewPIDFile("/var/run/myapp.pid")
err := pid.Acquire() // writes current PID, fails if another instance is live
defer pid.Release() // removes the file
Health Server
HealthServer exposes two HTTP endpoints:
/health— runs all registeredHealthCheckfunctions. Returns 200 if all pass, 503 if any fail./ready— returns 200 or 503 based on the readiness flag, toggled viaSetReady(bool).
The server binds to a configurable address (use port 0 for ephemeral port
allocation in tests). WaitForHealth() is a polling utility that waits for
/health to return 200 within a timeout.
Registry
Registry tracks running daemons via JSON files in a directory (default:
~/.core/daemons/). Each daemon is a DaemonEntry:
type DaemonEntry struct {
Code string `json:"code"`
Daemon string `json:"daemon"`
PID int `json:"pid"`
Health string `json:"health,omitempty"`
Project string `json:"project,omitempty"`
Binary string `json:"binary,omitempty"`
Started time.Time `json:"started"`
}
The registry automatically prunes entries with dead PIDs on List() and
Get(). When a Daemon is configured with a Registry, it auto-registers
on Start() and auto-unregisters on Stop().
File naming convention: {code}-{daemon}.json (slashes replaced with dashes).
exec Sub-Package
The exec package (forge.lthn.ai/core/go-process/exec) provides a fluent
wrapper around os/exec for simple, one-shot commands that do not need Core
integration:
import "forge.lthn.ai/core/go-process/exec"
// Fluent API
err := exec.Command(ctx, "go", "build", "./...").
WithDir("/path/to/project").
WithEnv([]string{"CGO_ENABLED=0"}).
WithLogger(myLogger).
Run()
// Get output
out, err := exec.Command(ctx, "git", "status").Output()
// Combined stdout + stderr
out, err := exec.Command(ctx, "make").CombinedOutput()
// Quiet mode (suppresses stdout, includes stderr in error)
err := exec.RunQuiet(ctx, "go", "vet", "./...")
Logging
Commands are automatically logged at debug level before execution and at error level on failure. The logger interface is minimal:
type Logger interface {
Debug(msg string, keyvals ...any)
Error(msg string, keyvals ...any)
}
A NopLogger (the default) discards all messages. Use SetDefaultLogger() to
set a package-wide logger, or WithLogger() for per-command overrides.
Thread Safety
All public types are safe for concurrent use:
Service—sync.RWMutexprotects the process map; atomic counter for IDs.Process—sync.RWMutexprotects mutable state.RingBuffer—sync.RWMutexon all read/write operations.PIDFile—sync.Mutexon acquire/release.HealthServer—sync.Mutexon check list and readiness flag.Registry— filesystem-level isolation (one file per daemon).- Global singleton —
atomic.Pointerfor lock-free reads.