feat(process): add readiness accessors and AX examples

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 00:39:27 +00:00
parent eeca66240a
commit cdc8bfe502
15 changed files with 488 additions and 19 deletions

View file

@ -6,6 +6,10 @@ import "time"
// TaskProcessRun requests synchronous command execution through Core.PERFORM.
// The handler returns the combined command output on success.
//
// Example:
//
// c.PERFORM(process.TaskProcessRun{Command: "echo", Args: []string{"hello"}})
type TaskProcessRun struct {
Command string
Args []string
@ -24,6 +28,10 @@ type TaskProcessRun struct {
}
// TaskProcessKill requests termination of a managed process by ID or PID.
//
// Example:
//
// c.PERFORM(process.TaskProcessKill{ID: "proc-1"})
type TaskProcessKill struct {
// ID identifies a managed process started by this service.
ID string
@ -32,6 +40,10 @@ type TaskProcessKill struct {
}
// ActionProcessStarted is broadcast when a process begins execution.
//
// Example:
//
// case process.ActionProcessStarted: fmt.Println("started", msg.ID)
type ActionProcessStarted struct {
ID string
Command string
@ -42,6 +54,10 @@ type ActionProcessStarted struct {
// ActionProcessOutput is broadcast for each line of output.
// Subscribe to this for real-time streaming.
//
// Example:
//
// case process.ActionProcessOutput: fmt.Println(msg.Line)
type ActionProcessOutput struct {
ID string
Line string
@ -50,6 +66,10 @@ type ActionProcessOutput struct {
// ActionProcessExited is broadcast when a process completes.
// Check ExitCode for success (0) or failure.
//
// Example:
//
// case process.ActionProcessExited: fmt.Println(msg.ExitCode)
type ActionProcessExited struct {
ID string
ExitCode int
@ -58,6 +78,10 @@ type ActionProcessExited struct {
}
// ActionProcessKilled is broadcast when a process is terminated.
//
// Example:
//
// case process.ActionProcessKilled: fmt.Println(msg.Signal)
type ActionProcessKilled struct {
ID string
Signal string

View file

@ -11,6 +11,13 @@ import (
)
// DaemonOptions configures daemon mode execution.
//
// Example:
//
// opts := process.DaemonOptions{
// PIDFile: "/var/run/myapp.pid",
// HealthAddr: "127.0.0.1:0",
// }
type DaemonOptions struct {
// PIDFile path for single-instance enforcement.
// Leave empty to skip PID file management.
@ -46,6 +53,10 @@ type Daemon struct {
}
// NewDaemon creates a daemon runner with the given options.
//
// Example:
//
// daemon := process.NewDaemon(process.DaemonOptions{HealthAddr: "127.0.0.1:0"})
func NewDaemon(opts DaemonOptions) *Daemon {
if opts.ShutdownTimeout == 0 {
opts.ShutdownTimeout = 30 * time.Second
@ -68,6 +79,10 @@ func NewDaemon(opts DaemonOptions) *Daemon {
}
// Start initialises the daemon (PID file, health server).
//
// Example:
//
// if err := daemon.Start(); err != nil { return err }
func (d *Daemon) Start() error {
d.mu.Lock()
defer d.mu.Unlock()
@ -114,6 +129,10 @@ func (d *Daemon) Start() error {
}
// Run blocks until the context is cancelled.
//
// Example:
//
// if err := daemon.Run(ctx); err != nil { return err }
func (d *Daemon) Run(ctx context.Context) error {
d.mu.Lock()
if !d.running {
@ -128,6 +147,10 @@ func (d *Daemon) Run(ctx context.Context) error {
}
// Stop performs graceful shutdown.
//
// Example:
//
// _ = daemon.Stop()
func (d *Daemon) Stop() error {
d.mu.Lock()
defer d.mu.Unlock()
@ -167,14 +190,36 @@ func (d *Daemon) Stop() error {
return nil
}
// SetReady sets the daemon readiness status for health checks.
// SetReady sets the daemon readiness status for `/ready`.
//
// Example:
//
// daemon.SetReady(false)
func (d *Daemon) SetReady(ready bool) {
if d.health != nil {
d.health.SetReady(ready)
}
}
// Ready reports whether the daemon is currently ready for traffic.
//
// Example:
//
// if daemon.Ready() {
// // expose the service to callers
// }
func (d *Daemon) Ready() bool {
if d.health != nil {
return d.health.Ready()
}
return false
}
// HealthAddr returns the health server address, or empty if disabled.
//
// Example:
//
// addr := daemon.HealthAddr()
func (d *Daemon) HealthAddr() string {
if d.health != nil {
return d.health.Addr()

View file

@ -75,14 +75,21 @@ func TestDaemon_SetReady(t *testing.T) {
resp, _ := http.Get("http://" + addr + "/ready")
assert.Equal(t, http.StatusOK, resp.StatusCode)
_ = resp.Body.Close()
assert.True(t, d.Ready())
d.SetReady(false)
assert.False(t, d.Ready())
resp, _ = http.Get("http://" + addr + "/ready")
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
_ = resp.Body.Close()
}
func TestDaemon_ReadyWithoutHealthServer(t *testing.T) {
d := NewDaemon(DaemonOptions{})
assert.False(t, d.Ready())
}
func TestDaemon_NoHealthAddrReturnsEmpty(t *testing.T) {
d := NewDaemon(DaemonOptions{})
assert.Empty(t, d.HealthAddr())

View file

@ -15,7 +15,7 @@ import (
// ErrCommandContextRequired is returned when a command is created without a context.
var ErrCommandContextRequired = coreerr.E("", "exec: command context is required", nil)
// Options configuration for command execution
// Options configures command execution.
type Options struct {
Dir string
Env []string
@ -26,7 +26,11 @@ type Options struct {
Background bool
}
// Command wraps os/exec.Command with logging and context
// Command wraps os/exec.Command with logging and context.
//
// Example:
//
// cmd := exec.Command(ctx, "go", "test", "./...")
func Command(ctx context.Context, name string, args ...string) *Cmd {
return &Cmd{
name: name,
@ -35,7 +39,7 @@ func Command(ctx context.Context, name string, args ...string) *Cmd {
}
}
// Cmd represents a wrapped command
// Cmd represents a wrapped command.
type Cmd struct {
name string
args []string
@ -45,31 +49,51 @@ type Cmd struct {
logger Logger
}
// WithDir sets the working directory
// WithDir sets the working directory.
//
// Example:
//
// cmd.WithDir("/tmp")
func (c *Cmd) WithDir(dir string) *Cmd {
c.opts.Dir = dir
return c
}
// WithEnv sets the environment variables
// WithEnv sets the environment variables.
//
// Example:
//
// cmd.WithEnv([]string{"CGO_ENABLED=0"})
func (c *Cmd) WithEnv(env []string) *Cmd {
c.opts.Env = env
return c
}
// WithStdin sets stdin
// WithStdin sets stdin.
//
// Example:
//
// cmd.WithStdin(strings.NewReader("input"))
func (c *Cmd) WithStdin(r io.Reader) *Cmd {
c.opts.Stdin = r
return c
}
// WithStdout sets stdout
// WithStdout sets stdout.
//
// Example:
//
// cmd.WithStdout(os.Stdout)
func (c *Cmd) WithStdout(w io.Writer) *Cmd {
c.opts.Stdout = w
return c
}
// WithStderr sets stderr
// WithStderr sets stderr.
//
// Example:
//
// cmd.WithStderr(os.Stderr)
func (c *Cmd) WithStderr(w io.Writer) *Cmd {
c.opts.Stderr = w
return c
@ -89,6 +113,10 @@ func (c *Cmd) WithBackground(background bool) *Cmd {
}
// Start launches the command.
//
// Example:
//
// if err := cmd.Start(); err != nil { return err }
func (c *Cmd) Start() error {
if err := c.prepare(); err != nil {
return err
@ -112,6 +140,10 @@ func (c *Cmd) Start() error {
// Run executes the command and waits for it to finish.
// It automatically logs the command execution at debug level.
//
// Example:
//
// if err := cmd.Run(); err != nil { return err }
func (c *Cmd) Run() error {
if c.opts.Background {
return c.Start()
@ -131,6 +163,10 @@ func (c *Cmd) Run() error {
}
// Output runs the command and returns its standard output.
//
// Example:
//
// out, err := cmd.Output()
func (c *Cmd) Output() ([]byte, error) {
if c.opts.Background {
return nil, coreerr.E("Cmd.Output", "background execution is incompatible with Output", nil)
@ -151,6 +187,10 @@ func (c *Cmd) Output() ([]byte, error) {
}
// CombinedOutput runs the command and returns its combined standard output and standard error.
//
// Example:
//
// out, err := cmd.CombinedOutput()
func (c *Cmd) CombinedOutput() ([]byte, error) {
if c.opts.Background {
return nil, coreerr.E("Cmd.CombinedOutput", "background execution is incompatible with CombinedOutput", nil)
@ -190,6 +230,10 @@ func (c *Cmd) prepare() error {
// RunQuiet executes the command suppressing stdout unless there is an error.
// Useful for internal commands.
//
// Example:
//
// err := exec.RunQuiet(ctx, "go", "vet", "./...")
func RunQuiet(ctx context.Context, name string, args ...string) error {
var stderr bytes.Buffer
cmd := Command(ctx, name, args...).WithStderr(&stderr)

View file

@ -4,8 +4,14 @@ package exec
// Compatible with pkg/log.Logger and other structured loggers.
type Logger interface {
// Debug logs a debug-level message with optional key-value pairs.
//
// Example:
// logger.Debug("starting", "cmd", "go")
Debug(msg string, keyvals ...any)
// Error logs an error-level message with optional key-value pairs.
//
// Example:
// logger.Error("failed", "cmd", "go", "err", err)
Error(msg string, keyvals ...any)
}
@ -22,6 +28,10 @@ var defaultLogger Logger = NopLogger{}
// SetDefaultLogger sets the package-level default logger.
// Commands without an explicit logger will use this.
//
// Example:
//
// exec.SetDefaultLogger(logger)
func SetDefaultLogger(l Logger) {
if l == nil {
l = NopLogger{}
@ -30,6 +40,10 @@ func SetDefaultLogger(l Logger) {
}
// DefaultLogger returns the current default logger.
//
// Example:
//
// logger := exec.DefaultLogger()
func DefaultLogger() Logger {
return defaultLogger
}

View file

@ -13,10 +13,10 @@ import (
coreerr "dappco.re/go/core/log"
)
// HealthCheck is a function that returns nil if healthy.
// HealthCheck is a function that returns nil when the service is healthy.
type HealthCheck func() error
// HealthServer provides HTTP /health and /ready endpoints for process monitoring.
// HealthServer provides HTTP `/health` and `/ready` endpoints for process monitoring.
type HealthServer struct {
addr string
server *http.Server
@ -27,6 +27,10 @@ type HealthServer struct {
}
// NewHealthServer creates a health check server on the given address.
//
// Example:
//
// server := process.NewHealthServer("127.0.0.1:0")
func NewHealthServer(addr string) *HealthServer {
return &HealthServer{
addr: addr,
@ -35,20 +39,45 @@ func NewHealthServer(addr string) *HealthServer {
}
// AddCheck registers a health check function.
//
// Example:
//
// server.AddCheck(func() error { return nil })
func (h *HealthServer) AddCheck(check HealthCheck) {
h.mu.Lock()
h.checks = append(h.checks, check)
h.mu.Unlock()
}
// SetReady sets the readiness status.
// SetReady sets the readiness status used by `/ready`.
//
// Example:
//
// server.SetReady(false)
func (h *HealthServer) SetReady(ready bool) {
h.mu.Lock()
h.ready = ready
h.mu.Unlock()
}
// Ready reports whether `/ready` currently returns HTTP 200.
//
// Example:
//
// if server.Ready() {
// // publish the service
// }
func (h *HealthServer) Ready() bool {
h.mu.Lock()
defer h.mu.Unlock()
return h.ready
}
// Start begins serving health check endpoints.
//
// Example:
//
// if err := server.Start(); err != nil { return err }
func (h *HealthServer) Start() error {
mux := http.NewServeMux()
@ -100,6 +129,10 @@ func (h *HealthServer) Start() error {
}
// Stop gracefully shuts down the health server.
//
// Example:
//
// _ = server.Stop(context.Background())
func (h *HealthServer) Stop(ctx context.Context) error {
if h.server == nil {
return nil
@ -108,6 +141,10 @@ func (h *HealthServer) Stop(ctx context.Context) error {
}
// Addr returns the actual address the server is listening on.
//
// Example:
//
// addr := server.Addr()
func (h *HealthServer) Addr() string {
if h.listener != nil {
return h.listener.Addr().String()
@ -115,16 +152,24 @@ func (h *HealthServer) Addr() string {
return h.addr
}
// WaitForHealth polls a health endpoint until it responds 200 or the timeout
// (in milliseconds) expires. Returns true if healthy, false on timeout.
// WaitForHealth polls `/health` until it responds 200 or the timeout expires.
//
// Example:
//
// if !process.WaitForHealth("127.0.0.1:8080", 5_000) {
// return errors.New("service did not become ready")
// }
func WaitForHealth(addr string, timeoutMs int) bool {
ok, _ := ProbeHealth(addr, timeoutMs)
return ok
}
// ProbeHealth polls a health endpoint until it responds 200 or the timeout
// (in milliseconds) expires. Returns the health status and the last observed
// failure reason if the endpoint never becomes healthy.
// ProbeHealth polls `/health` until it responds 200 or the timeout expires.
// It returns the health status and the last observed failure reason.
//
// Example:
//
// ok, reason := process.ProbeHealth("127.0.0.1:8080", 5_000)
func ProbeHealth(addr string, timeoutMs int) (bool, string) {
deadline := time.Now().Add(time.Duration(timeoutMs) * time.Millisecond)
url := fmt.Sprintf("http://%s/health", addr)

View file

@ -11,6 +11,7 @@ import (
func TestHealthServer_Endpoints(t *testing.T) {
hs := NewHealthServer("127.0.0.1:0")
assert.True(t, hs.Ready())
err := hs.Start()
require.NoError(t, err)
defer func() { _ = hs.Stop(context.Background()) }()
@ -29,6 +30,7 @@ func TestHealthServer_Endpoints(t *testing.T) {
_ = resp.Body.Close()
hs.SetReady(false)
assert.False(t, hs.Ready())
resp, err = http.Get("http://" + addr + "/ready")
require.NoError(t, err)
@ -36,6 +38,15 @@ func TestHealthServer_Endpoints(t *testing.T) {
_ = resp.Body.Close()
}
func TestHealthServer_Ready(t *testing.T) {
hs := NewHealthServer("127.0.0.1:0")
assert.True(t, hs.Ready())
hs.SetReady(false)
assert.False(t, hs.Ready())
}
func TestHealthServer_WithChecks(t *testing.T) {
hs := NewHealthServer("127.0.0.1:0")

View file

@ -14,18 +14,30 @@ import (
)
// PIDFile manages a process ID file for single-instance enforcement.
//
// Example:
//
// pidFile := process.NewPIDFile("/var/run/myapp.pid")
type PIDFile struct {
path string
mu sync.Mutex
}
// NewPIDFile creates a PID file manager.
//
// Example:
//
// pidFile := process.NewPIDFile("/var/run/myapp.pid")
func NewPIDFile(path string) *PIDFile {
return &PIDFile{path: path}
}
// Acquire writes the current PID to the file.
// Returns error if another instance is running.
//
// Example:
//
// if err := pidFile.Acquire(); err != nil { return err }
func (p *PIDFile) Acquire() error {
p.mu.Lock()
defer p.mu.Unlock()
@ -57,6 +69,10 @@ func (p *PIDFile) Acquire() error {
}
// Release removes the PID file.
//
// Example:
//
// _ = pidFile.Release()
func (p *PIDFile) Release() error {
p.mu.Lock()
defer p.mu.Unlock()
@ -67,6 +83,10 @@ func (p *PIDFile) Release() error {
}
// Path returns the PID file path.
//
// Example:
//
// path := pidFile.Path()
func (p *PIDFile) Path() string {
return p.path
}
@ -74,6 +94,10 @@ func (p *PIDFile) Path() string {
// ReadPID reads a PID file and checks if the process is still running.
// Returns (pid, true) if the process is alive, (pid, false) if dead/stale,
// or (0, false) if the file doesn't exist or is invalid.
//
// Example:
//
// pid, running := process.ReadPID("/var/run/myapp.pid")
func ReadPID(path string) (int, bool) {
data, err := coreio.Local.Read(path)
if err != nil {

View file

@ -14,6 +14,10 @@ import (
)
// Process represents a managed external process.
//
// Example:
//
// proc, err := svc.Start(ctx, "echo", "hello")
type Process struct {
ID string
Command string
@ -37,6 +41,10 @@ type Process struct {
}
// Info returns a snapshot of process state.
//
// Example:
//
// info := proc.Info()
func (p *Process) Info() Info {
p.mu.RLock()
defer p.mu.RUnlock()
@ -61,6 +69,10 @@ func (p *Process) Info() Info {
}
// Output returns the captured output as a string.
//
// Example:
//
// fmt.Println(proc.Output())
func (p *Process) Output() string {
p.mu.RLock()
defer p.mu.RUnlock()
@ -71,6 +83,10 @@ func (p *Process) Output() string {
}
// OutputBytes returns the captured output as bytes.
//
// Example:
//
// data := proc.OutputBytes()
func (p *Process) OutputBytes() []byte {
p.mu.RLock()
defer p.mu.RUnlock()
@ -88,6 +104,10 @@ func (p *Process) IsRunning() bool {
}
// Wait blocks until the process exits.
//
// Example:
//
// if err := proc.Wait(); err != nil { return err }
func (p *Process) Wait() error {
<-p.done
p.mu.RLock()
@ -105,12 +125,20 @@ func (p *Process) Wait() error {
}
// Done returns a channel that closes when the process exits.
//
// Example:
//
// <-proc.Done()
func (p *Process) Done() <-chan struct{} {
return p.done
}
// Kill forcefully terminates the process.
// If KillGroup is set, kills the entire process group.
//
// Example:
//
// _ = proc.Kill()
func (p *Process) Kill() error {
p.mu.Lock()
defer p.mu.Unlock()
@ -133,6 +161,10 @@ func (p *Process) Kill() error {
// Shutdown gracefully stops the process: SIGTERM, then SIGKILL after grace period.
// If GracePeriod was not set (zero), falls back to immediate Kill().
// If KillGroup is set, signals are sent to the entire process group.
//
// Example:
//
// _ = proc.Shutdown()
func (p *Process) Shutdown() error {
p.mu.RLock()
grace := p.gracePeriod
@ -177,6 +209,10 @@ func (p *Process) terminate() error {
}
// Signal sends a signal to the process.
//
// Example:
//
// _ = proc.Signal(os.Interrupt)
func (p *Process) Signal(sig os.Signal) error {
p.mu.Lock()
defer p.mu.Unlock()
@ -201,6 +237,10 @@ func (p *Process) Signal(sig os.Signal) error {
}
// SendInput writes to the process stdin.
//
// Example:
//
// _ = proc.SendInput("hello\n")
func (p *Process) SendInput(input string) error {
p.mu.RLock()
defer p.mu.RUnlock()
@ -218,6 +258,10 @@ func (p *Process) SendInput(input string) error {
}
// CloseStdin closes the process stdin pipe.
//
// Example:
//
// _ = proc.CloseStdin()
func (p *Process) CloseStdin() error {
p.mu.Lock()
defer p.mu.Unlock()

View file

@ -9,7 +9,7 @@ import (
coreerr "dappco.re/go/core/log"
)
// Global default service (follows i18n pattern).
// Global default service used by package-level helpers.
var (
defaultService atomic.Pointer[Service]
defaultOnce sync.Once
@ -18,12 +18,20 @@ var (
// Default returns the global process service.
// Returns nil if not initialized.
//
// Example:
//
// svc := process.Default()
func Default() *Service {
return defaultService.Load()
}
// SetDefault sets the global process service.
// Thread-safe: can be called concurrently with Default().
//
// Example:
//
// _ = process.SetDefault(svc)
func SetDefault(s *Service) error {
if s == nil {
return ErrSetDefaultNil
@ -34,6 +42,10 @@ func SetDefault(s *Service) error {
// Init initializes the default global service with a Core instance.
// This is typically called during application startup.
//
// Example:
//
// _ = process.Init(coreInstance)
func Init(c *core.Core) error {
defaultOnce.Do(func() {
factory := NewService(Options{})
@ -50,6 +62,10 @@ func Init(c *core.Core) error {
// --- Global convenience functions ---
// Start spawns a new process using the default service.
//
// Example:
//
// proc, err := process.Start(ctx, "echo", "hello")
func Start(ctx context.Context, command string, args ...string) (*Process, error) {
svc := Default()
if svc == nil {
@ -59,6 +75,10 @@ func Start(ctx context.Context, command string, args ...string) (*Process, error
}
// Run executes a command and waits for completion using the default service.
//
// Example:
//
// out, err := process.Run(ctx, "echo", "hello")
func Run(ctx context.Context, command string, args ...string) (string, error) {
svc := Default()
if svc == nil {
@ -68,6 +88,10 @@ func Run(ctx context.Context, command string, args ...string) (string, error) {
}
// Get returns a process by ID from the default service.
//
// Example:
//
// proc, err := process.Get("proc-1")
func Get(id string) (*Process, error) {
svc := Default()
if svc == nil {
@ -77,6 +101,10 @@ func Get(id string) (*Process, error) {
}
// List returns all processes from the default service.
//
// Example:
//
// procs := process.List()
func List() []*Process {
svc := Default()
if svc == nil {
@ -86,6 +114,10 @@ func List() []*Process {
}
// Kill terminates a process by ID using the default service.
//
// Example:
//
// _ = process.Kill("proc-1")
func Kill(id string) error {
svc := Default()
if svc == nil {
@ -95,6 +127,10 @@ func Kill(id string) error {
}
// KillPID terminates a process by operating-system PID using the default service.
//
// Example:
//
// _ = process.KillPID(1234)
func KillPID(pid int) error {
svc := Default()
if svc == nil {
@ -104,6 +140,10 @@ func KillPID(pid int) error {
}
// StartWithOptions spawns a process with full configuration using the default service.
//
// Example:
//
// proc, err := process.StartWithOptions(ctx, process.RunOptions{Command: "pwd", Dir: "/tmp"})
func StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) {
svc := Default()
if svc == nil {
@ -113,6 +153,10 @@ func StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) {
}
// RunWithOptions executes a command with options and waits using the default service.
//
// Example:
//
// out, err := process.RunWithOptions(ctx, process.RunOptions{Command: "echo", Args: []string{"hello"}})
func RunWithOptions(ctx context.Context, opts RunOptions) (string, error) {
svc := Default()
if svc == nil {
@ -122,6 +166,10 @@ func RunWithOptions(ctx context.Context, opts RunOptions) (string, error) {
}
// Running returns all currently running processes from the default service.
//
// Example:
//
// running := process.Running()
func Running() []*Process {
svc := Default()
if svc == nil {

View file

@ -21,17 +21,27 @@ var ErrProgramContextRequired = coreerr.E("", "program: command context is requi
var ErrProgramNameRequired = coreerr.E("", "program: program name is empty", nil)
// Program represents a named executable located on the system PATH.
// Create one with a Name, call Find to resolve its path, then Run or RunDir.
//
// Example:
//
// git := &process.Program{Name: "git"}
// if err := git.Find(); err != nil { return err }
// out, err := git.Run(ctx, "status")
type Program struct {
// Name is the binary name (e.g. "go", "node", "git").
Name string
// Path is the absolute path resolved by Find.
// Example: "/usr/bin/git"
// If empty, Run and RunDir fall back to Name for OS PATH resolution.
Path string
}
// Find resolves the program's absolute path using exec.LookPath.
// Returns ErrProgramNotFound (wrapped) if the binary is not on PATH.
//
// Example:
//
// if err := p.Find(); err != nil { return err }
func (p *Program) Find() error {
if p.Name == "" {
return coreerr.E("Program.Find", "program name is empty", nil)
@ -46,6 +56,10 @@ func (p *Program) Find() error {
// Run executes the program with args in the current working directory.
// Returns trimmed combined stdout+stderr output and any error.
//
// Example:
//
// out, err := p.Run(ctx, "hello")
func (p *Program) Run(ctx context.Context, args ...string) (string, error) {
return p.RunDir(ctx, "", args...)
}
@ -53,6 +67,10 @@ func (p *Program) Run(ctx context.Context, args ...string) (string, error) {
// RunDir executes the program with args in dir.
// Returns trimmed combined stdout+stderr output and any error.
// If dir is empty, the process inherits the caller's working directory.
//
// Example:
//
// out, err := p.RunDir(ctx, "/tmp", "pwd")
func (p *Program) RunDir(ctx context.Context, dir string, args ...string) (string, error) {
if ctx == nil {
return "", coreerr.E("Program.RunDir", "program: command context is required", ErrProgramContextRequired)

View file

@ -14,6 +14,10 @@ import (
)
// DaemonEntry records a running daemon in the registry.
//
// Example:
//
// entry := process.DaemonEntry{Code: "app", Daemon: "serve", PID: os.Getpid()}
type DaemonEntry struct {
Code string `json:"code"`
Daemon string `json:"daemon"`
@ -30,11 +34,19 @@ type Registry struct {
}
// NewRegistry creates a registry backed by the given directory.
//
// Example:
//
// reg := process.NewRegistry("/tmp/daemons")
func NewRegistry(dir string) *Registry {
return &Registry{dir: dir}
}
// DefaultRegistry returns a registry using ~/.core/daemons/.
//
// Example:
//
// reg := process.DefaultRegistry()
func DefaultRegistry() *Registry {
home, err := os.UserHomeDir()
if err != nil {
@ -46,6 +58,10 @@ func DefaultRegistry() *Registry {
// Register writes a daemon entry to the registry directory.
// If Started is zero, it is set to the current time.
// The directory is created if it does not exist.
//
// Example:
//
// _ = reg.Register(entry)
func (r *Registry) Register(entry DaemonEntry) error {
if entry.Started.IsZero() {
entry.Started = time.Now()
@ -67,6 +83,10 @@ func (r *Registry) Register(entry DaemonEntry) error {
}
// Unregister removes a daemon entry from the registry.
//
// Example:
//
// _ = reg.Unregister("app", "serve")
func (r *Registry) Unregister(code, daemon string) error {
if err := coreio.Local.Delete(r.entryPath(code, daemon)); err != nil {
return coreerr.E("Registry.Unregister", "failed to delete entry file", err)
@ -76,6 +96,10 @@ func (r *Registry) Unregister(code, daemon string) error {
// Get reads a single daemon entry and checks whether its process is alive.
// If the process is dead, the stale file is removed and (nil, false) is returned.
//
// Example:
//
// entry, ok := reg.Get("app", "serve")
func (r *Registry) Get(code, daemon string) (*DaemonEntry, bool) {
path := r.entryPath(code, daemon)
@ -99,6 +123,10 @@ func (r *Registry) Get(code, daemon string) (*DaemonEntry, bool) {
}
// List returns all alive daemon entries, pruning any with dead PIDs.
//
// Example:
//
// entries, err := reg.List()
func (r *Registry) List() ([]DaemonEntry, error) {
matches, err := filepath.Glob(filepath.Join(r.dir, "*.json"))
if err != nil {

View file

@ -20,11 +20,19 @@ var ErrRunnerNoService = coreerr.E("", "runner service is nil", nil)
var ErrRunnerInvalidSpecName = coreerr.E("", "runner spec names must be non-empty and unique", nil)
// NewRunner creates a runner for the given service.
//
// Example:
//
// runner := process.NewRunner(svc)
func NewRunner(svc *Service) *Runner {
return &Runner{service: svc}
}
// RunSpec defines a process to run with optional dependencies.
//
// Example:
//
// spec := process.RunSpec{Name: "test", Command: "go", Args: []string{"test", "./..."}}
type RunSpec struct {
// Name is a friendly identifier (e.g., "lint", "test").
Name string
@ -54,6 +62,10 @@ type RunResult struct {
}
// Passed returns true if the process succeeded.
//
// Example:
//
// if result.Passed() { ... }
func (r RunResult) Passed() bool {
return !r.Skipped && r.Error == nil && r.ExitCode == 0
}
@ -68,11 +80,19 @@ type RunAllResult struct {
}
// Success returns true if all non-skipped specs passed.
//
// Example:
//
// if result.Success() { ... }
func (r RunAllResult) Success() bool {
return r.Failed == 0
}
// RunAll executes specs respecting dependencies, parallelising where possible.
//
// Example:
//
// result, err := runner.RunAll(ctx, specs)
func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
if err := r.ensureService(); err != nil {
return nil, err
@ -241,6 +261,10 @@ func (r *Runner) runSpec(ctx context.Context, spec RunSpec) RunResult {
}
// RunSequential executes specs one after another, stopping on first failure.
//
// Example:
//
// result, err := runner.RunSequential(ctx, specs)
func (r *Runner) RunSequential(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
if err := r.ensureService(); err != nil {
return nil, err
@ -287,6 +311,10 @@ func (r *Runner) RunSequential(ctx context.Context, specs []RunSpec) (*RunAllRes
}
// RunParallel executes all specs concurrently, regardless of dependencies.
//
// Example:
//
// result, err := runner.RunParallel(ctx, specs)
func (r *Runner) RunParallel(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
if err := r.ensureService(); err != nil {
return nil, err

View file

@ -39,6 +39,10 @@ type Service struct {
}
// Options configures the process service.
//
// Example:
//
// svc := process.NewService(process.Options{BufferSize: 2 * 1024 * 1024})
type Options struct {
// BufferSize is the ring buffer size for output capture.
// Default: 1MB (1024 * 1024 bytes).
@ -50,6 +54,10 @@ type Options struct {
// core, _ := core.New(
// core.WithName("process", process.NewService(process.Options{})),
// )
//
// Example:
//
// factory := process.NewService(process.Options{})
func NewService(opts Options) func(*core.Core) (any, error) {
return func(c *core.Core) (any, error) {
if opts.BufferSize == 0 {
@ -65,6 +73,10 @@ func NewService(opts Options) func(*core.Core) (any, error) {
}
// OnStartup implements core.Startable.
//
// Example:
//
// _ = svc.OnStartup(ctx)
func (s *Service) OnStartup(ctx context.Context) error {
s.registrations.Do(func() {
if s.Core() != nil {
@ -76,6 +88,10 @@ func (s *Service) OnStartup(ctx context.Context) error {
// OnShutdown implements core.Stoppable.
// Gracefully shuts down all running processes (SIGTERM → SIGKILL).
//
// Example:
//
// _ = svc.OnShutdown(ctx)
func (s *Service) OnShutdown(ctx context.Context) error {
s.mu.RLock()
procs := make([]*Process, 0, len(s.processes))
@ -94,6 +110,10 @@ func (s *Service) OnShutdown(ctx context.Context) error {
}
// Start spawns a new process with the given command and args.
//
// Example:
//
// proc, err := svc.Start(ctx, "echo", "hello")
func (s *Service) Start(ctx context.Context, command string, args ...string) (*Process, error) {
return s.StartWithOptions(ctx, RunOptions{
Command: command,
@ -102,6 +122,10 @@ func (s *Service) Start(ctx context.Context, command string, args ...string) (*P
}
// StartWithOptions spawns a process with full configuration.
//
// Example:
//
// proc, err := svc.StartWithOptions(ctx, process.RunOptions{Command: "pwd", Dir: "/tmp"})
func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) {
if opts.Command == "" {
return nil, coreerr.E("Service.StartWithOptions", "command is required", nil)
@ -292,6 +316,10 @@ func (s *Service) streamOutput(proc *Process, r io.Reader, stream Stream) {
}
// Get returns a process by ID.
//
// Example:
//
// proc, err := svc.Get("proc-1")
func (s *Service) Get(id string) (*Process, error) {
s.mu.RLock()
defer s.mu.RUnlock()
@ -304,6 +332,10 @@ func (s *Service) Get(id string) (*Process, error) {
}
// List returns all processes.
//
// Example:
//
// for _, proc := range svc.List() { _ = proc }
func (s *Service) List() []*Process {
s.mu.RLock()
defer s.mu.RUnlock()
@ -317,6 +349,10 @@ func (s *Service) List() []*Process {
}
// Running returns all currently running processes.
//
// Example:
//
// running := svc.Running()
func (s *Service) Running() []*Process {
s.mu.RLock()
defer s.mu.RUnlock()
@ -332,6 +368,10 @@ func (s *Service) Running() []*Process {
}
// Kill terminates a process by ID.
//
// Example:
//
// _ = svc.Kill("proc-1")
func (s *Service) Kill(id string) error {
proc, err := s.Get(id)
if err != nil {
@ -342,6 +382,10 @@ func (s *Service) Kill(id string) error {
}
// KillPID terminates a process by operating-system PID.
//
// Example:
//
// _ = svc.KillPID(1234)
func (s *Service) KillPID(pid int) error {
if pid <= 0 {
return coreerr.E("Service.KillPID", "pid must be positive", nil)
@ -355,6 +399,10 @@ func (s *Service) KillPID(pid int) error {
}
// Remove removes a completed process from the list.
//
// Example:
//
// _ = svc.Remove("proc-1")
func (s *Service) Remove(id string) error {
s.mu.Lock()
defer s.mu.Unlock()
@ -373,6 +421,10 @@ func (s *Service) Remove(id string) error {
}
// Clear removes all completed processes.
//
// Example:
//
// svc.Clear()
func (s *Service) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
@ -385,6 +437,10 @@ func (s *Service) Clear() {
}
// Output returns the captured output of a process.
//
// Example:
//
// out, err := svc.Output("proc-1")
func (s *Service) Output(id string) (string, error) {
proc, err := s.Get(id)
if err != nil {
@ -395,6 +451,10 @@ func (s *Service) Output(id string) (string, error) {
// Run executes a command and waits for completion.
// Returns the combined output and any error.
//
// Example:
//
// out, err := svc.Run(ctx, "echo", "hello")
func (s *Service) Run(ctx context.Context, command string, args ...string) (string, error) {
proc, err := s.Start(ctx, command, args...)
if err != nil {
@ -414,6 +474,10 @@ func (s *Service) Run(ctx context.Context, command string, args ...string) (stri
}
// RunWithOptions executes a command with options and waits for completion.
//
// Example:
//
// out, err := svc.RunWithOptions(ctx, process.RunOptions{Command: "echo", Args: []string{"hello"}})
func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) (string, error) {
proc, err := s.StartWithOptions(ctx, opts)
if err != nil {

View file

@ -1,5 +1,10 @@
// Package process provides process management with Core IPC integration.
//
// Example:
//
// svc := process.NewService(process.Options{})
// proc, err := svc.Start(ctx, "echo", "hello")
//
// The process package enables spawning, monitoring, and controlling external
// processes with output streaming via the Core ACTION system.
//
@ -35,6 +40,10 @@ package process
import "time"
// Status represents the process lifecycle state.
//
// Example:
//
// if proc.Status == process.StatusKilled { return }
type Status string
const (
@ -51,6 +60,10 @@ const (
)
// Stream identifies the output source.
//
// Example:
//
// if event.Stream == process.StreamStdout { ... }
type Stream string
const (
@ -61,6 +74,13 @@ const (
)
// RunOptions configures process execution.
//
// Example:
//
// opts := process.RunOptions{
// Command: "go",
// Args: []string{"test", "./..."},
// }
type RunOptions struct {
// Command is the executable to run.
Command string
@ -92,6 +112,11 @@ type RunOptions struct {
}
// Info provides a snapshot of process state without internal fields.
//
// Example:
//
// info := proc.Info()
// fmt.Println(info.PID)
type Info struct {
ID string `json:"id"`
Command string `json:"command"`