---
title: Architecture
description: Internals of go-process — key types, data flow, and design decisions.
---

# Architecture

This document explains how `go-process` is structured, how data flows through
the system, and the role of each major component.

## Overview

The package is organised into four layers:

1. **Service** — The Core-integrated service that owns processes and broadcasts events.
2. **Process** — An individual managed process with output capture and lifecycle state.
3. **Runner** — A pipeline orchestrator that runs multiple processes with dependency resolution.
4. **Daemon** — A higher-level abstraction for long-running services with PID files, health checks, and registry integration.

A separate `exec/` sub-package provides a thin, fluent wrapper around `os/exec`
for simple one-shot commands.

## Key Types

### Status

Process lifecycle is tracked as a state machine:

```
pending -> running -> exited
                   -> failed
                   -> killed
```

```go
type Status string

const (
	StatusPending Status = "pending"
	StatusRunning Status = "running"
	StatusExited  Status = "exited"
	StatusFailed  Status = "failed"
	StatusKilled  Status = "killed"
)
```

- **pending** — queued but not yet started (currently unused by the service,
  reserved for future scheduling).
- **running** — actively executing.
- **exited** — completed; check `ExitCode` for success (0) or failure.
- **failed** — could not be started (e.g. binary not found).
- **killed** — terminated by signal or context cancellation.

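
Note that `exited` on its own does not mean success. As an illustration, the rules in the list above could be encoded as two small predicates; `isTerminal` and `succeeded` are hypothetical helpers written for this sketch, not functions the package is known to export.

```go
package main

import "fmt"

type Status string

const (
	StatusPending Status = "pending"
	StatusRunning Status = "running"
	StatusExited  Status = "exited"
	StatusFailed  Status = "failed"
	StatusKilled  Status = "killed"
)

// isTerminal reports whether a process in state s will not change state again
// (hypothetical helper).
func isTerminal(s Status) bool {
	switch s {
	case StatusExited, StatusFailed, StatusKilled:
		return true
	}
	return false
}

// succeeded reports whether the process ran to completion with exit code 0.
// StatusExited alone only means the process finished (hypothetical helper).
func succeeded(s Status, exitCode int) bool {
	return s == StatusExited && exitCode == 0
}

func main() {
	fmt.Println(isTerminal(StatusRunning), succeeded(StatusExited, 1), succeeded(StatusExited, 0)) // false false true
}
```
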
### Service

`Service` is the central type. It embeds `*core.ServiceRuntime[Options]` to
participate in the Core DI container and implements both the `Startable` and
`Stoppable` lifecycle interfaces.

```go
type Service struct {
	*core.ServiceRuntime[Options]
	processes map[string]*Process
	mu        sync.RWMutex
	bufSize   int
	idCounter atomic.Uint64
}
```

Key behaviours:

- **OnStartup** — currently a no-op; reserved for future initialisation.
- **OnShutdown** — iterates all running processes and calls `Kill()` on each,
  ensuring no orphaned child processes when the application exits.
- Process IDs are generated as `proc-N` using an atomic counter, guaranteeing
  uniqueness without locks.

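
The lock-free `proc-N` scheme can be sketched with `sync/atomic`. This is a minimal illustration assuming numbering starts at 1; the `idGen` type and its `next` method are hypothetical names, not the package's code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// idGen hands out process IDs of the form "proc-N" (hypothetical type).
type idGen struct {
	counter atomic.Uint64
}

// next atomically increments the counter, so concurrent callers never
// observe the same value and no mutex is needed.
func (g *idGen) next() string {
	return fmt.Sprintf("proc-%d", g.counter.Add(1))
}

func main() {
	var g idGen
	var wg sync.WaitGroup
	var mu sync.Mutex // protects the map below, not the counter
	seen := make(map[string]bool)

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			id := g.next()
			mu.Lock()
			seen[id] = true
			mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(len(seen)) // 100 distinct IDs
}
```
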
#### Registration

The service is registered with Core via a factory function:

```go
process.NewService(process.Options{BufferSize: 2 * 1024 * 1024})
```

`NewService` returns a `func(*core.Core) (any, error)` closure — the standard
Core service factory signature. The `Options` struct is captured by the closure
and applied when Core instantiates the service.

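
The closure pattern can be shown in isolation. In this sketch `Core`, `Options`, and `Service` are simplified local stand-ins (the real types live in Core and `go-process`), and falling back to 1 MB when `BufferSize` is unset is an assumption based on the default buffer size mentioned under **Process**.

```go
package main

import "fmt"

// Core is a stand-in for the Core DI container (hypothetical, simplified).
type Core struct{}

// Options mirrors the shape of process.Options for this example only.
type Options struct {
	BufferSize int
}

// Service is a simplified stand-in for process.Service.
type Service struct {
	core    *Core
	bufSize int
}

// NewService returns a factory with the standard signature. opts is captured
// by the closure and only applied when Core calls the factory.
func NewService(opts Options) func(*Core) (any, error) {
	return func(c *Core) (any, error) {
		size := opts.BufferSize
		if size <= 0 {
			size = 1024 * 1024 // assumed 1 MB default
		}
		return &Service{core: c, bufSize: size}, nil
	}
}

func main() {
	factory := NewService(Options{BufferSize: 2 * 1024 * 1024})
	svc, err := factory(&Core{}) // normally Core makes this call
	if err != nil {
		panic(err)
	}
	fmt.Println(svc.(*Service).bufSize) // 2097152
}
```
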
### Process

`Process` wraps an `os/exec.Cmd` with:

- Thread-safe state (`sync.RWMutex` guards all mutable fields).
- A `RingBuffer` for output capture (configurable size, default 1 MB).
- A `done` channel that closes when the process exits, enabling `select`-based
  coordination.
- Stdin pipe access via `SendInput()` and `CloseStdin()`.
- Context-based cancellation — cancelling the context kills the process.

#### Info Snapshot

`Process.Info()` returns an `Info` struct — a serialisable snapshot of the
process state, suitable for JSON APIs or UI display:

```go
type Info struct {
	ID        string        `json:"id"`
	Command   string        `json:"command"`
	Args      []string      `json:"args"`
	Dir       string        `json:"dir"`
	StartedAt time.Time     `json:"startedAt"`
	Status    Status        `json:"status"`
	ExitCode  int           `json:"exitCode"`
	Duration  time.Duration `json:"duration"`
	PID       int           `json:"pid"`
}
```

### RingBuffer

A fixed-size circular buffer that overwrites the oldest data when full.
Thread-safe for concurrent reads and writes.

```go
rb := process.NewRingBuffer(64 * 1024) // 64 KB
rb.Write([]byte("data"))
fmt.Println(rb.String()) // "data"
fmt.Println(rb.Len())    // 4
fmt.Println(rb.Cap())    // 65536
rb.Reset()
```

The ring buffer is used internally to capture process stdout and stderr. When
a process produces more output than the buffer capacity, the oldest data is
silently overwritten. This prevents unbounded memory growth for long-running
or verbose processes.

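
The overwrite behaviour can be illustrated with a minimal ring buffer. This is a sketch of the technique, not the package's implementation: `ring`, `newRing`, and its methods are hypothetical, and a single `sync.Mutex` is used for simplicity where the package uses `sync.RWMutex`.

```go
package main

import (
	"fmt"
	"sync"
)

// ring is a fixed-capacity byte buffer that overwrites the oldest data.
type ring struct {
	mu   sync.Mutex
	buf  []byte
	head int  // index of the next write
	full bool // true once the buffer has wrapped at least once
}

func newRing(size int) *ring { return &ring{buf: make([]byte, size)} }

// Write copies p into the buffer, wrapping at the end. It never fails and
// always reports len(p) bytes written, as io.Writer expects.
func (r *ring) Write(p []byte) (int, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, b := range p {
		r.buf[r.head] = b
		r.head = (r.head + 1) % len(r.buf)
		if r.head == 0 {
			r.full = true
		}
	}
	return len(p), nil
}

// String returns the retained bytes oldest-first.
func (r *ring) String() string {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !r.full {
		return string(r.buf[:r.head])
	}
	return string(r.buf[r.head:]) + string(r.buf[:r.head])
}

func main() {
	r := newRing(8)
	r.Write([]byte("hello "))
	r.Write([]byte("world"))
	fmt.Println(r.String()) // prints: lo world (the last 8 bytes)
}
```
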
### ACTION Messages

Four IPC message types are broadcast through `Core.ACTION()`:

| Type | When | Key Fields |
|------|------|------------|
| `ActionProcessStarted` | Process begins execution | `ID`, `Command`, `Args`, `Dir`, `PID` |
| `ActionProcessOutput` | Each line of stdout/stderr | `ID`, `Line`, `Stream` |
| `ActionProcessExited` | Process completes | `ID`, `ExitCode`, `Duration`, `Error` |
| `ActionProcessKilled` | Process is terminated | `ID`, `Signal` |

The `Stream` type distinguishes stdout from stderr:

```go
type Stream string

const (
	StreamStdout Stream = "stdout"
	StreamStderr Stream = "stderr"
)
```

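
Consumers typically tell these messages apart with a type switch. The sketch below defines local stand-in structs with a subset of the key fields from the table; the field types (for example `Signal` as a `string`), passing messages by value, and a handler that receives each message as `any` are all assumptions, and the actual Core handler signature may differ.

```go
package main

import (
	"fmt"
	"time"
)

type Stream string

// Local stand-ins for the package's message types (fields abridged).
type ActionProcessStarted struct {
	ID      string
	Command string
	PID     int
}

type ActionProcessOutput struct {
	ID     string
	Line   string
	Stream Stream
}

type ActionProcessExited struct {
	ID       string
	ExitCode int
	Duration time.Duration
}

type ActionProcessKilled struct {
	ID     string
	Signal string
}

// describe turns a broadcast message into a log line. Unknown types are
// ignored so actions from other services pass through harmlessly.
func describe(msg any) (string, bool) {
	switch m := msg.(type) {
	case ActionProcessStarted:
		return fmt.Sprintf("%s started %s (pid %d)", m.ID, m.Command, m.PID), true
	case ActionProcessOutput:
		return fmt.Sprintf("%s [%s] %s", m.ID, m.Stream, m.Line), true
	case ActionProcessExited:
		return fmt.Sprintf("%s exited %d after %s", m.ID, m.ExitCode, m.Duration), true
	case ActionProcessKilled:
		return fmt.Sprintf("%s killed (%s)", m.ID, m.Signal), true
	default:
		return "", false
	}
}

func main() {
	line, _ := describe(ActionProcessExited{ID: "proc-1", ExitCode: 0, Duration: 2 * time.Second})
	fmt.Println(line) // proc-1 exited 0 after 2s
}
```
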
## Data Flow

When `Service.StartWithOptions()` is called:

```
1. Generate unique ID (atomic counter)
2. Create context with cancel
3. Build os/exec.Cmd with dir, env, pipes
4. Create RingBuffer (unless DisableCapture is set)
5. cmd.Start()
6. Store process in map
7. Broadcast ActionProcessStarted via Core.ACTION
8. Spawn 2 goroutines to stream stdout and stderr
   - Each line is written to the RingBuffer
   - Each line is broadcast as ActionProcessOutput
9. Spawn 1 goroutine to wait for process exit
   - Waits for output goroutines to finish first
   - Calls cmd.Wait()
   - Updates process status and exit code
   - Closes the done channel
   - Broadcasts ActionProcessExited
```

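The output streaming goroutines use `bufio.Scanner` with a 1 MB line buffer,
raising the maximum line length from the scanner's 64 KB default
(`bufio.MaxScanTokenSize`). Note that a standard `bufio.Scanner` never
truncates: a line longer than the configured maximum makes `Scan` stop with
`bufio.ErrTooLong`.
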
## Runner

The `Runner` orchestrates multiple processes, defined as `RunSpec` values:

```go
type RunSpec struct {
	Name         string
	Command      string
	Args         []string
	Dir          string
	Env          []string
	After        []string // dependency names
	AllowFailure bool
}
```

Three execution strategies are available:

### RunAll (dependency graph)

Executes specs in dependency order, in waves. In each wave, all specs whose
dependencies are satisfied run in parallel. If a dependency fails (and
`AllowFailure` is false), its dependents are skipped. Circular dependencies are
detected and reported as skipped with an error.

```
Wave 1: [lint, vet]   (no dependencies)
Wave 2: [test]        (depends on lint, vet)
Wave 3: [build]       (depends on test)
```

### RunSequential

Executes specs one after another. Stops on the first failure unless
`AllowFailure` is set. Remaining specs are marked as skipped.

### RunParallel

Runs all specs concurrently, ignoring the `After` field entirely. Failures
do not affect other specs.

All three strategies return a `RunAllResult` with aggregate counts:

```go
type RunAllResult struct {
	Results  []RunResult
	Duration time.Duration
	Passed   int
	Failed   int
	Skipped  int
}
```

## Daemon

The `Daemon` type manages the full lifecycle of a long-running service:

```
NewDaemon(opts) -> Start() -> Run(ctx) -> Stop()
```

### PID File

`PIDFile` provides single-instance enforcement. `Acquire()` writes the current
process PID to a file; if the file already exists and the recorded PID is still
alive (verified by sending it signal 0, `syscall.Signal(0)`, which checks that
the process exists without delivering a signal), it returns an error. Stale PID
files from dead processes are automatically cleaned up.

```go
pid := process.NewPIDFile("/var/run/myapp.pid")
if err := pid.Acquire(); err != nil { // fails if another instance is live
	return err
}
defer pid.Release() // removes the file
```

### Health Server

`HealthServer` exposes two HTTP endpoints:

- **`/health`** — runs all registered `HealthCheck` functions. Returns 200 if
  all pass, 503 if any fail.
- **`/ready`** — returns 200 or 503 based on the readiness flag, toggled via
  `SetReady(bool)`.

The server binds to a configurable address (use port `0` for ephemeral port
allocation in tests). `WaitForHealth()` is a polling utility that waits for
`/health` to return 200 within a timeout.

### Registry

`Registry` tracks running daemons via JSON files in a directory (default:
`~/.core/daemons/`). Each daemon is a `DaemonEntry`:

```go
type DaemonEntry struct {
	Code    string    `json:"code"`
	Daemon  string    `json:"daemon"`
	PID     int       `json:"pid"`
	Health  string    `json:"health,omitempty"`
	Project string    `json:"project,omitempty"`
	Binary  string    `json:"binary,omitempty"`
	Started time.Time `json:"started"`
}
```

The registry automatically prunes entries with dead PIDs on `List()` and
`Get()`. When a `Daemon` is configured with a `Registry`, it auto-registers
on `Start()` and auto-unregisters on `Stop()`.

File naming convention: `{code}-{daemon}.json` (slashes replaced with dashes).

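
The naming rule, sketched with a hypothetical `entryFileName` helper. It assumes the replacement applies to both `code` and `daemon`, which keeps every entry a single file directly inside the registry directory.

```go
package main

import (
	"fmt"
	"strings"
)

// entryFileName returns "{code}-{daemon}.json" with every "/" replaced by
// "-", so names such as "core/api" cannot create subdirectories.
func entryFileName(code, daemon string) string {
	return strings.ReplaceAll(code+"-"+daemon, "/", "-") + ".json"
}

func main() {
	fmt.Println(entryFileName("core/api", "http")) // core-api-http.json
}
```
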
## exec Sub-Package

The `exec` package (`forge.lthn.ai/core/go-process/exec`) provides a fluent
wrapper around `os/exec` for simple, one-shot commands that do not need Core
integration:

```go
import "forge.lthn.ai/core/go-process/exec"

// ctx (a context.Context) and myLogger are assumed to be defined by the caller.

// Fluent API
err := exec.Command(ctx, "go", "build", "./...").
	WithDir("/path/to/project").
	WithEnv([]string{"CGO_ENABLED=0"}).
	WithLogger(myLogger).
	Run()

// Get output (out is new here, err is reused)
out, err := exec.Command(ctx, "git", "status").Output()

// Combined stdout + stderr
out, err = exec.Command(ctx, "make").CombinedOutput()

// Quiet mode (suppresses stdout, includes stderr in error)
err = exec.RunQuiet(ctx, "go", "vet", "./...")
```

### Logging

Commands are automatically logged at debug level before execution and at error
level on failure. The logger interface is minimal:

```go
type Logger interface {
	Debug(msg string, keyvals ...any)
	Error(msg string, keyvals ...any)
}
```

A `NopLogger` (the default) discards all messages. Use `SetDefaultLogger()` to
set a package-wide logger, or `WithLogger()` for per-command overrides.

## Thread Safety

All public types are safe for concurrent use:

- `Service` — `sync.RWMutex` protects the process map; atomic counter for IDs.
- `Process` — `sync.RWMutex` protects mutable state.
- `RingBuffer` — `sync.RWMutex` on all read/write operations.
- `PIDFile` — `sync.Mutex` on acquire/release.
- `HealthServer` — `sync.Mutex` on check list and readiness flag.
- `Registry` — filesystem-level isolation (one file per daemon).
- Global singleton — `atomic.Pointer` for lock-free reads.
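
The global-singleton pattern can be sketched with `atomic.Pointer` (Go 1.19+): readers call `Load` without taking a lock, and replacement is a single `Store`. The `Default`/`SetDefault` names and the `Service` stand-in are hypothetical, since the accessor functions are not named here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Service stands in for the package's service type.
type Service struct{ name string }

var global atomic.Pointer[Service]

// SetDefault publishes s to all goroutines; readers see either the old or
// the new pointer, never a partially written one.
func SetDefault(s *Service) { global.Store(s) }

// Default returns the current instance, or nil if none was set.
func Default() *Service { return global.Load() }

func main() {
	SetDefault(&Service{name: "first"})
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = Default() // lock-free read
		}()
	}
	SetDefault(&Service{name: "second"})
	wg.Wait()
	fmt.Println(Default().name) // second
}
```
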