fix(process): align service contract with RFC

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-29 14:55:24 +00:00
parent cca45eebcc
commit 94b99bfd18
8 changed files with 228 additions and 97 deletions

View file

@ -20,11 +20,11 @@ core go vet # Vet
The package has three layers, all in the root `process` package (plus a `exec` subpackage):
### Layer 1: Process Execution (service.go, process.go, process_global.go)
### Layer 1: Process Execution (service.go, process.go)
`Service` is a Core service (`*core.ServiceRuntime[Options]`) that manages all `Process` instances. It spawns subprocesses, pipes stdout/stderr through goroutines, captures output to a `RingBuffer`, and broadcasts IPC actions (`ActionProcessStarted`, `ActionProcessOutput`, `ActionProcessExited`, `ActionProcessKilled` — defined in actions.go).
`process_global.go` provides package-level convenience functions (`Start`, `Run`, `Kill`, `List`) that delegate to a global `Service` singleton initialized via `Init(core)`. Follows the same pattern as Go's `i18n` package.
During `OnStartup`, the service registers the named Core actions `process.run`, `process.start`, `process.kill`, `process.list`, and `process.get`. Core's `Process` primitive delegates to those actions, so `c.Process().Run(...)` only works when this service has been registered.
### Layer 2: Daemon Lifecycle (daemon.go, pidfile.go, health.go, registry.go)
@ -45,7 +45,7 @@ Builder-pattern wrapper around `os/exec` with structured logging via a pluggable
## Key Patterns
- **Core integration**: `Service` embeds `*core.ServiceRuntime[Options]` and uses `s.Core().ACTION(...)` to broadcast typed action messages. Tests create a Core instance via `framework.New(framework.WithName("process", NewService(...)))`.
- **Core integration**: `Service` embeds `*core.ServiceRuntime[Options]` and uses `s.Core().ACTION(...)` to broadcast typed action messages. Tests create a Core instance via `framework.New(framework.WithService(Register))`.
- **Output capture**: All process output goes through a fixed-size `RingBuffer` (default 1MB). Oldest data is silently overwritten when full. Set `RunOptions.DisableCapture` to skip buffering for long-running processes where output is only streamed via IPC.
- **Process lifecycle**: Status transitions are `StatusPending → StatusRunning → StatusExited|StatusFailed|StatusKilled`. The `done` channel closes on exit; use `<-proc.Done()` or `proc.Wait()`.
- **Detach / process group isolation**: Set `RunOptions.Detach = true` to run the subprocess in its own process group (`Setpgid`). Detached processes use `context.Background()` so they survive parent context cancellation and parent death.

View file

@ -60,32 +60,28 @@ participate in the Core DI container and implements both `Startable` and
```go
type Service struct {
*core.ServiceRuntime[Options]
processes map[string]*Process
mu sync.RWMutex
managed *core.Registry[*ManagedProcess]
bufSize int
idCounter atomic.Uint64
}
```
Key behaviours:
- **OnStartup**currently a no-op; reserved for future initialisation.
- **OnStartup**registers the named Core actions `process.run`, `process.start`, `process.kill`, `process.list`, and `process.get`.
- **OnShutdown** — iterates all running processes and calls `Kill()` on each,
ensuring no orphaned child processes when the application exits.
- Process IDs are generated as `proc-N` using an atomic counter, guaranteeing
uniqueness without locks.
- Process IDs are generated with `core.ID()` and stored in a Core registry.
#### Registration
The service is registered with Core via a factory function:
```go
process.NewService(process.Options{BufferSize: 2 * 1024 * 1024})
core.New(core.WithService(process.Register))
```
`NewService` returns a `func(*core.Core) (any, error)` closure — the standard
Core service factory signature. The `Options` struct is captured by the closure
and applied when Core instantiates the service.
`Register` returns `core.Result{Value: *Service, OK: true}` — the standard
Core `WithService` factory signature used by the v0.8.0 contract.
### Process
@ -163,12 +159,12 @@ const (
When `Service.StartWithOptions()` is called:
```
1. Generate unique ID (atomic counter)
1. Generate a unique ID with `core.ID()`
2. Create context with cancel
3. Build os/exec.Cmd with dir, env, pipes
4. Create RingBuffer (unless DisableCapture is set)
5. cmd.Start()
6. Store process in map
6. Store process in the Core registry
7. Broadcast ActionProcessStarted via Core.ACTION
8. Spawn 2 goroutines to stream stdout and stderr
- Each line is written to the RingBuffer
@ -176,8 +172,9 @@ When `Service.StartWithOptions()` is called:
9. Spawn 1 goroutine to wait for process exit
- Waits for output goroutines to finish first
- Calls cmd.Wait()
- Updates process status and exit code
- Classifies the exit as exited, failed, or killed
- Closes the done channel
- Broadcasts ActionProcessKilled when the process died from a signal
- Broadcasts ActionProcessExited
```
@ -296,12 +293,12 @@ File naming convention: `{code}-{daemon}.json` (slashes replaced with dashes).
## exec Sub-Package
The `exec` package (`forge.lthn.ai/core/go-process/exec`) provides a fluent
The `exec` package (`dappco.re/go/core/process/exec`) provides a fluent
wrapper around `os/exec` for simple, one-shot commands that do not need Core
integration:
```go
import "forge.lthn.ai/core/go-process/exec"
import "dappco.re/go/core/process/exec"
// Fluent API
err := exec.Command(ctx, "go", "build", "./...").

View file

@ -101,9 +101,7 @@ go-process/
pidfile.go # PID file single-instance lock
pidfile_test.go # PID file tests
process.go # Process type and methods
process_global.go # Global singleton and convenience API
process_test.go # Process tests
global_test.go # Global API tests (concurrency)
registry.go # Daemon registry (JSON file store)
registry_test.go # Registry tests
runner.go # Pipeline runner (sequential, parallel, DAG)
@ -142,8 +140,6 @@ go-process/
| `ErrProcessNotFound` | No process with the given ID exists in the service |
| `ErrProcessNotRunning` | Operation requires a running process (e.g. SendInput, Signal) |
| `ErrStdinNotAvailable` | Stdin pipe is nil (already closed or never created) |
| `ErrServiceNotInitialized` | Global convenience function called before `process.Init()` |
| `ServiceError` | Wraps service-level errors with a message string |
## Build Configuration

View file

@ -5,10 +5,10 @@ description: Process management with Core IPC integration for Go applications.
# go-process
`forge.lthn.ai/core/go-process` is a process management library that provides
`dappco.re/go/core/process` is a process management library that provides
spawning, monitoring, and controlling external processes with real-time output
streaming via the Core ACTION (IPC) system. It integrates directly with the
[Core DI framework](https://forge.lthn.ai/core/go) as a first-class service.
[Core DI framework](https://dappco.re/go/core) as a first-class service.
## Features
@ -28,22 +28,17 @@ streaming via the Core ACTION (IPC) system. It integrates directly with the
```go
import (
"context"
framework "forge.lthn.ai/core/go/pkg/core"
"forge.lthn.ai/core/go-process"
"dappco.re/go/core"
"dappco.re/go/core/process"
)
// Create a Core instance with the process service
c, err := framework.New(
framework.WithName("process", process.NewService(process.Options{})),
)
if err != nil {
log.Fatal(err)
}
// Create a Core instance with the process service registered.
c := core.New(core.WithService(process.Register))
// Retrieve the typed service
svc, err := framework.ServiceFor[*process.Service](c, "process")
if err != nil {
log.Fatal(err)
svc, ok := core.ServiceFor[*process.Service](c, "process")
if !ok {
panic("process service not registered")
}
```
@ -51,15 +46,19 @@ if err != nil {
```go
// Fire-and-forget (async)
proc, err := svc.Start(ctx, "go", "test", "./...")
if err != nil {
return err
start := svc.Start(ctx, "go", "test", "./...")
if !start.OK {
return start.Value.(error)
}
proc := start.Value.(*process.Process)
<-proc.Done()
fmt.Println(proc.Output())
// Synchronous convenience
output, err := svc.Run(ctx, "echo", "hello world")
run := svc.Run(ctx, "echo", "hello world")
if run.OK {
fmt.Println(run.Value.(string))
}
```
### Listen for Events
@ -67,7 +66,7 @@ output, err := svc.Run(ctx, "echo", "hello world")
Process lifecycle events are broadcast through Core's ACTION system:
```go
c.RegisterAction(func(c *framework.Core, msg framework.Message) error {
c.RegisterAction(func(c *core.Core, msg core.Message) core.Result {
switch m := msg.(type) {
case process.ActionProcessStarted:
fmt.Printf("Started: %s (PID %d)\n", m.Command, m.PID)
@ -78,24 +77,24 @@ c.RegisterAction(func(c *framework.Core, msg framework.Message) error {
case process.ActionProcessKilled:
fmt.Printf("Killed with %s\n", m.Signal)
}
return nil
return core.Result{OK: true}
})
```
### Global Convenience API
### Permission Model
For applications that only need a single process service, a global singleton
is available:
Core's process primitive delegates to named actions registered by this module.
Without `process.Register`, `c.Process().Run(...)` fails with `OK: false`.
```go
// Initialise once at startup
process.Init(coreInstance)
c := core.New()
r := c.Process().Run(ctx, "echo", "blocked")
fmt.Println(r.OK) // false
// Then use package-level functions anywhere
proc, _ := process.Start(ctx, "ls", "-la")
output, _ := process.Run(ctx, "date")
procs := process.List()
running := process.Running()
c = core.New(core.WithService(process.Register))
_ = c.ServiceStartup(ctx, nil)
r = c.Process().Run(ctx, "echo", "allowed")
fmt.Println(r.OK) // true
```
## Package Layout
@ -109,7 +108,7 @@ running := process.Running()
| Field | Value |
|-------|-------|
| Module path | `forge.lthn.ai/core/go-process` |
| Module path | `dappco.re/go/core/process` |
| Go version | 1.26.0 |
| Licence | EUPL-1.2 |
@ -117,7 +116,7 @@ running := process.Running()
| Module | Purpose |
|--------|---------|
| `forge.lthn.ai/core/go` | Core DI framework (`ServiceRuntime`, `Core.ACTION`, lifecycle interfaces) |
| `dappco.re/go/core` | Core DI framework (`ServiceRuntime`, `Core.ACTION`, lifecycle interfaces) |
| `github.com/stretchr/testify` | Test assertions (test-only) |
The package has no other runtime dependencies beyond the Go standard library

View file

@ -18,6 +18,7 @@ type processStdin interface {
// ManagedProcess represents a tracked external process started by the service.
type ManagedProcess struct {
ID string
PID int
Command string
Args []string
Dir string
@ -36,6 +37,7 @@ type ManagedProcess struct {
mu sync.RWMutex
gracePeriod time.Duration
killGroup bool
lastSignal string
}
// Process is kept as a compatibility alias for ManagedProcess.
@ -46,22 +48,17 @@ func (p *ManagedProcess) Info() ProcessInfo {
p.mu.RLock()
defer p.mu.RUnlock()
pid := 0
if p.cmd != nil && p.cmd.Process != nil {
pid = p.cmd.Process.Pid
}
return ProcessInfo{
ID: p.ID,
Command: p.Command,
Args: p.Args,
Args: append([]string(nil), p.Args...),
Dir: p.Dir,
StartedAt: p.StartedAt,
Running: p.Status == StatusRunning,
Status: p.Status,
ExitCode: p.ExitCode,
Duration: p.Duration,
PID: pid,
PID: p.PID,
}
}
@ -131,6 +128,7 @@ func (p *ManagedProcess) Kill() error {
return nil
}
p.lastSignal = "SIGKILL"
if p.killGroup {
// Kill entire process group (negative PID)
return syscall.Kill(-p.cmd.Process.Pid, syscall.SIGKILL)
@ -181,6 +179,7 @@ func (p *ManagedProcess) terminate() error {
if p.killGroup {
pid = -pid
}
p.lastSignal = "SIGTERM"
return syscall.Kill(pid, syscall.SIGTERM)
}
@ -214,3 +213,9 @@ func (p *ManagedProcess) CloseStdin() error {
p.stdin = nil
return err
}
func (p *ManagedProcess) requestedSignal() string {
p.mu.RLock()
defer p.mu.RUnlock()
return p.lastSignal
}

View file

@ -112,6 +112,8 @@ func TestProcess_Kill_Good(t *testing.T) {
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
assert.Equal(t, StatusKilled, proc.Status)
assert.Equal(t, -1, proc.ExitCode)
})
t.Run("noop on completed process", func(t *testing.T) {
@ -160,6 +162,7 @@ func TestProcess_Signal_Good(t *testing.T) {
case <-time.After(2 * time.Second):
t.Fatal("process should have been terminated by signal")
}
assert.Equal(t, StatusKilled, proc.Status)
})
t.Run("error on completed process", func(t *testing.T) {
@ -213,6 +216,7 @@ func TestProcess_Timeout_Good(t *testing.T) {
t.Fatal("process should have been killed by timeout")
}
assert.False(t, proc.IsRunning())
assert.Equal(t, StatusKilled, proc.Status)
})
t.Run("no timeout when zero", func(t *testing.T) {
@ -312,5 +316,6 @@ func TestProcess_TimeoutWithGrace_Good(t *testing.T) {
case <-time.After(5 * time.Second):
t.Fatal("process should have been killed by timeout")
}
assert.Equal(t, StatusKilled, proc.Status)
})
}

View file

@ -151,9 +151,9 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re
proc := &ManagedProcess{
ID: id,
Command: opts.Command,
Args: opts.Args,
Args: append([]string(nil), opts.Args...),
Dir: opts.Dir,
Env: opts.Env,
Env: append([]string(nil), opts.Env...),
StartedAt: time.Now(),
Status: StatusRunning,
cmd: cmd,
@ -171,6 +171,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re
cancel()
return core.Result{Value: core.E("process.start", core.Concat("command failed: ", opts.Command), err), OK: false}
}
proc.PID = cmd.Process.Pid
// Store process
if r := s.managed.Set(id, proc); !r.OK {
@ -214,36 +215,32 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re
// Wait for process completion
go func() {
wg.Wait()
err := cmd.Wait()
waitErr := cmd.Wait()
duration := time.Since(proc.StartedAt)
status, exitCode, actionErr, killedSignal := classifyProcessExit(proc, waitErr)
proc.mu.Lock()
proc.PID = cmd.Process.Pid
proc.Duration = duration
if err != nil {
var exitErr *exec.ExitError
if core.As(err, &exitErr) {
proc.ExitCode = exitErr.ExitCode()
proc.Status = StatusExited
} else {
proc.Status = StatusFailed
}
} else {
proc.ExitCode = 0
proc.Status = StatusExited
}
status := proc.Status
exitCode := proc.ExitCode
proc.ExitCode = exitCode
proc.Status = status
proc.mu.Unlock()
close(proc.done)
if status == StatusKilled {
_ = s.Core().ACTION(ActionProcessKilled{
ID: id,
Signal: killedSignal,
})
}
s.Core().ACTION(ActionProcessExited{
ID: id,
ExitCode: exitCode,
Duration: duration,
Error: actionErr,
})
_ = status
}()
return core.Result{Value: proc, OK: true}
@ -311,12 +308,6 @@ func (s *Service) Kill(id string) error {
if err := proc.Kill(); err != nil {
return err
}
_ = s.Core().ACTION(ActionProcessKilled{
ID: id,
Signal: "SIGKILL",
})
return nil
}
@ -392,14 +383,10 @@ func (s *Service) handleRun(ctx context.Context, opts core.Options) core.Result
Dir: opts.String("dir"),
}
if r := opts.Get("args"); r.OK {
if args, ok := r.Value.([]string); ok {
runOpts.Args = args
}
runOpts.Args = optionStrings(r.Value)
}
if r := opts.Get("env"); r.OK {
if env, ok := r.Value.([]string); ok {
runOpts.Env = env
}
runOpts.Env = optionStrings(r.Value)
}
return s.runCommand(ctx, runOpts)
@ -417,14 +404,10 @@ func (s *Service) handleStart(ctx context.Context, opts core.Options) core.Resul
Detach: true,
}
if r := opts.Get("args"); r.OK {
if args, ok := r.Value.([]string); ok {
runOpts.Args = args
}
runOpts.Args = optionStrings(r.Value)
}
if r := opts.Get("env"); r.OK {
if env, ok := r.Value.([]string); ok {
runOpts.Env = env
}
runOpts.Env = optionStrings(r.Value)
}
r := s.StartWithOptions(ctx, runOpts)
@ -501,6 +484,9 @@ func (p *ManagedProcess) Signal(sig os.Signal) error {
return nil
}
if signal, ok := sig.(syscall.Signal); ok {
p.lastSignal = normalizeSignalName(signal)
}
return p.cmd.Process.Signal(sig)
}
@ -543,3 +529,75 @@ func (s *Service) handleGet(ctx context.Context, opts core.Options) core.Result
}
return core.Result{Value: proc.Info(), OK: true}
}
func optionStrings(value any) []string {
switch typed := value.(type) {
case nil:
return nil
case []string:
return append([]string(nil), typed...)
case []any:
result := make([]string, 0, len(typed))
for _, item := range typed {
text, ok := item.(string)
if !ok {
return nil
}
result = append(result, text)
}
return result
default:
return nil
}
}
func classifyProcessExit(proc *ManagedProcess, err error) (Status, int, error, string) {
if err == nil {
return StatusExited, 0, nil, ""
}
if sig, ok := processExitSignal(err); ok {
return StatusKilled, -1, err, normalizeSignalName(sig)
}
if ctxErr := proc.ctx.Err(); ctxErr != nil {
signal := proc.requestedSignal()
if signal == "" {
signal = "SIGKILL"
}
return StatusKilled, -1, ctxErr, signal
}
var exitErr *exec.ExitError
if core.As(err, &exitErr) {
return StatusExited, exitErr.ExitCode(), err, ""
}
return StatusFailed, -1, err, ""
}
func processExitSignal(err error) (syscall.Signal, bool) {
var exitErr *exec.ExitError
if !core.As(err, &exitErr) || exitErr.ProcessState == nil {
return 0, false
}
waitStatus, ok := exitErr.ProcessState.Sys().(syscall.WaitStatus)
if !ok || !waitStatus.Signaled() {
return 0, false
}
return waitStatus.Signal(), true
}
func normalizeSignalName(sig syscall.Signal) string {
switch sig {
case syscall.SIGINT:
return "SIGINT"
case syscall.SIGKILL:
return "SIGKILL"
case syscall.SIGTERM:
return "SIGTERM"
default:
return sig.String()
}
}

View file

@ -175,6 +175,46 @@ func TestService_HandleList_Good(t *testing.T) {
}
}
func TestService_HandleGet_Good(t *testing.T) {
svc, c := newStartedTestService(t)
start := c.Action("process.start").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"60"}},
))
require.True(t, start.OK)
id := start.Value.(string)
r := c.Action("process.get").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "id", Value: id},
))
require.True(t, r.OK)
info := r.Value.(ProcessInfo)
assert.Equal(t, id, info.ID)
assert.Equal(t, "sleep", info.Command)
assert.True(t, info.Running)
assert.Equal(t, StatusRunning, info.Status)
assert.Positive(t, info.PID)
proc, err := svc.Get(id)
require.NoError(t, err)
_ = proc.Kill()
<-proc.Done()
}
func TestService_HandleGet_Bad(t *testing.T) {
_, c := newStartedTestService(t)
missingID := c.Action("process.get").Run(context.Background(), framework.NewOptions())
assert.False(t, missingID.OK)
missingProc := c.Action("process.get").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "id", Value: "missing"},
))
assert.False(t, missingProc.OK)
}
func TestService_Ugly_PermissionModel(t *testing.T) {
c := framework.New()
@ -208,6 +248,7 @@ func TestService_Start_Good(t *testing.T) {
proc := startProc(t, svc, context.Background(), "echo", "hello")
assert.NotEmpty(t, proc.ID)
assert.Positive(t, proc.PID)
assert.Equal(t, "echo", proc.Command)
assert.Equal(t, []string{"hello"}, proc.Args)
@ -388,6 +429,36 @@ func TestService_Actions_Good(t *testing.T) {
assert.Len(t, exited, 1)
assert.Equal(t, 0, exited[0].ExitCode)
})
t.Run("broadcasts killed event", func(t *testing.T) {
svc, c := newTestService(t)
var killed []ActionProcessKilled
var mu sync.Mutex
c.RegisterAction(func(cc *framework.Core, msg framework.Message) framework.Result {
mu.Lock()
defer mu.Unlock()
if m, ok := msg.(ActionProcessKilled); ok {
killed = append(killed, m)
}
return framework.Result{OK: true}
})
proc := startProc(t, svc, context.Background(), "sleep", "60")
err := svc.Kill(proc.ID)
require.NoError(t, err)
<-proc.Done()
time.Sleep(10 * time.Millisecond)
mu.Lock()
defer mu.Unlock()
require.Len(t, killed, 1)
assert.Equal(t, proc.ID, killed[0].ID)
assert.Equal(t, "SIGKILL", killed[0].Signal)
})
}
func TestService_List_Good(t *testing.T) {