From 94b99bfd18a2b54aa702be7561a0bc5260de32f2 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sun, 29 Mar 2026 14:55:24 +0000 Subject: [PATCH] fix(process): align service contract with RFC Co-Authored-By: Virgil --- CLAUDE.md | 6 +- docs/architecture.md | 27 ++++----- docs/development.md | 4 -- docs/index.md | 63 +++++++++++---------- process.go | 19 ++++--- process_test.go | 5 ++ service.go | 130 +++++++++++++++++++++++++++++++------------ service_test.go | 71 +++++++++++++++++++++++ 8 files changed, 228 insertions(+), 97 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 5be5192..caa4aee 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -20,11 +20,11 @@ core go vet # Vet The package has three layers, all in the root `process` package (plus a `exec` subpackage): -### Layer 1: Process Execution (service.go, process.go, process_global.go) +### Layer 1: Process Execution (service.go, process.go) `Service` is a Core service (`*core.ServiceRuntime[Options]`) that manages all `Process` instances. It spawns subprocesses, pipes stdout/stderr through goroutines, captures output to a `RingBuffer`, and broadcasts IPC actions (`ActionProcessStarted`, `ActionProcessOutput`, `ActionProcessExited`, `ActionProcessKilled` — defined in actions.go). -`process_global.go` provides package-level convenience functions (`Start`, `Run`, `Kill`, `List`) that delegate to a global `Service` singleton initialized via `Init(core)`. Follows the same pattern as Go's `i18n` package. +During `OnStartup`, the service registers the named Core actions `process.run`, `process.start`, `process.kill`, `process.list`, and `process.get`. Core's `Process` primitive delegates to those actions, so `c.Process().Run(...)` only works when this service has been registered. ### Layer 2: Daemon Lifecycle (daemon.go, pidfile.go, health.go, registry.go) @@ -45,7 +45,7 @@ Builder-pattern wrapper around `os/exec` with structured logging via a pluggable ## Key Patterns -- **Core integration**: `Service` embeds `*core.ServiceRuntime[Options]` and uses `s.Core().ACTION(...)` to broadcast typed action messages. Tests create a Core instance via `framework.New(framework.WithName("process", NewService(...)))`. +- **Core integration**: `Service` embeds `*core.ServiceRuntime[Options]` and uses `s.Core().ACTION(...)` to broadcast typed action messages. Tests create a Core instance via `framework.New(framework.WithService(Register))`. - **Output capture**: All process output goes through a fixed-size `RingBuffer` (default 1MB). Oldest data is silently overwritten when full. Set `RunOptions.DisableCapture` to skip buffering for long-running processes where output is only streamed via IPC. - **Process lifecycle**: Status transitions are `StatusPending → StatusRunning → StatusExited|StatusFailed|StatusKilled`. The `done` channel closes on exit; use `<-proc.Done()` or `proc.Wait()`. - **Detach / process group isolation**: Set `RunOptions.Detach = true` to run the subprocess in its own process group (`Setpgid`). Detached processes use `context.Background()` so they survive parent context cancellation and parent death. diff --git a/docs/architecture.md b/docs/architecture.md index 4da33a4..9bb13cc 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -60,32 +60,28 @@ participate in the Core DI container and implements both `Startable` and ```go type Service struct { *core.ServiceRuntime[Options] - processes map[string]*Process - mu sync.RWMutex + managed *core.Registry[*ManagedProcess] bufSize int - idCounter atomic.Uint64 } ``` Key behaviours: -- **OnStartup** — currently a no-op; reserved for future initialisation. +- **OnStartup** — registers the named Core actions `process.run`, `process.start`, `process.kill`, `process.list`, and `process.get`. - **OnShutdown** — iterates all running processes and calls `Kill()` on each, ensuring no orphaned child processes when the application exits. -- Process IDs are generated as `proc-N` using an atomic counter, guaranteeing - uniqueness without locks. +- Process IDs are generated with `core.ID()` and stored in a Core registry. #### Registration The service is registered with Core via a factory function: ```go -process.NewService(process.Options{BufferSize: 2 * 1024 * 1024}) +core.New(core.WithService(process.Register)) ``` -`NewService` returns a `func(*core.Core) (any, error)` closure — the standard -Core service factory signature. The `Options` struct is captured by the closure -and applied when Core instantiates the service. +`Register` returns `core.Result{Value: *Service, OK: true}` — the standard +Core `WithService` factory signature used by the v0.8.0 contract. ### Process @@ -163,12 +159,12 @@ const ( When `Service.StartWithOptions()` is called: ``` -1. Generate unique ID (atomic counter) +1. Generate a unique ID with `core.ID()` 2. Create context with cancel 3. Build os/exec.Cmd with dir, env, pipes 4. Create RingBuffer (unless DisableCapture is set) 5. cmd.Start() -6. Store process in map +6. Store process in the Core registry 7. Broadcast ActionProcessStarted via Core.ACTION 8. Spawn 2 goroutines to stream stdout and stderr - Each line is written to the RingBuffer @@ -176,8 +172,9 @@ When `Service.StartWithOptions()` is called: 9. Spawn 1 goroutine to wait for process exit - Waits for output goroutines to finish first - Calls cmd.Wait() - - Updates process status and exit code + - Classifies the exit as exited, failed, or killed - Closes the done channel + - Broadcasts ActionProcessKilled when the process died from a signal - Broadcasts ActionProcessExited ``` @@ -296,12 +293,12 @@ File naming convention: `{code}-{daemon}.json` (slashes replaced with dashes). ## exec Sub-Package -The `exec` package (`forge.lthn.ai/core/go-process/exec`) provides a fluent +The `exec` package (`dappco.re/go/core/process/exec`) provides a fluent wrapper around `os/exec` for simple, one-shot commands that do not need Core integration: ```go -import "forge.lthn.ai/core/go-process/exec" +import "dappco.re/go/core/process/exec" // Fluent API err := exec.Command(ctx, "go", "build", "./..."). diff --git a/docs/development.md b/docs/development.md index d11384f..954bbd0 100644 --- a/docs/development.md +++ b/docs/development.md @@ -101,9 +101,7 @@ go-process/ pidfile.go # PID file single-instance lock pidfile_test.go # PID file tests process.go # Process type and methods - process_global.go # Global singleton and convenience API process_test.go # Process tests - global_test.go # Global API tests (concurrency) registry.go # Daemon registry (JSON file store) registry_test.go # Registry tests runner.go # Pipeline runner (sequential, parallel, DAG) @@ -142,8 +140,6 @@ go-process/ | `ErrProcessNotFound` | No process with the given ID exists in the service | | `ErrProcessNotRunning` | Operation requires a running process (e.g. SendInput, Signal) | | `ErrStdinNotAvailable` | Stdin pipe is nil (already closed or never created) | -| `ErrServiceNotInitialized` | Global convenience function called before `process.Init()` | -| `ServiceError` | Wraps service-level errors with a message string | ## Build Configuration diff --git a/docs/index.md b/docs/index.md index ddc0a5c..333a7ea 100644 --- a/docs/index.md +++ b/docs/index.md @@ -5,10 +5,10 @@ description: Process management with Core IPC integration for Go applications. # go-process -`forge.lthn.ai/core/go-process` is a process management library that provides +`dappco.re/go/core/process` is a process management library that provides spawning, monitoring, and controlling external processes with real-time output streaming via the Core ACTION (IPC) system. It integrates directly with the -[Core DI framework](https://forge.lthn.ai/core/go) as a first-class service. +[Core DI framework](https://dappco.re/go/core) as a first-class service. ## Features @@ -28,22 +28,17 @@ streaming via the Core ACTION (IPC) system. It integrates directly with the ```go import ( "context" - framework "forge.lthn.ai/core/go/pkg/core" - "forge.lthn.ai/core/go-process" + "dappco.re/go/core" + "dappco.re/go/core/process" ) -// Create a Core instance with the process service -c, err := framework.New( - framework.WithName("process", process.NewService(process.Options{})), -) -if err != nil { - log.Fatal(err) -} +// Create a Core instance with the process service registered. +c := core.New(core.WithService(process.Register)) // Retrieve the typed service -svc, err := framework.ServiceFor[*process.Service](c, "process") -if err != nil { - log.Fatal(err) +svc, ok := core.ServiceFor[*process.Service](c, "process") +if !ok { + panic("process service not registered") } ``` @@ -51,15 +46,19 @@ if err != nil { ```go // Fire-and-forget (async) -proc, err := svc.Start(ctx, "go", "test", "./...") -if err != nil { - return err +start := svc.Start(ctx, "go", "test", "./...") +if !start.OK { + return start.Value.(error) } +proc := start.Value.(*process.Process) <-proc.Done() fmt.Println(proc.Output()) // Synchronous convenience -output, err := svc.Run(ctx, "echo", "hello world") +run := svc.Run(ctx, "echo", "hello world") +if run.OK { + fmt.Println(run.Value.(string)) +} ``` ### Listen for Events @@ -67,7 +66,7 @@ output, err := svc.Run(ctx, "echo", "hello world") Process lifecycle events are broadcast through Core's ACTION system: ```go -c.RegisterAction(func(c *framework.Core, msg framework.Message) error { +c.RegisterAction(func(c *core.Core, msg core.Message) core.Result { switch m := msg.(type) { case process.ActionProcessStarted: fmt.Printf("Started: %s (PID %d)\n", m.Command, m.PID) @@ -78,24 +77,24 @@ c.RegisterAction(func(c *framework.Core, msg framework.Message) error { case process.ActionProcessKilled: fmt.Printf("Killed with %s\n", m.Signal) } - return nil + return core.Result{OK: true} }) ``` -### Global Convenience API +### Permission Model -For applications that only need a single process service, a global singleton -is available: +Core's process primitive delegates to named actions registered by this module. +Without `process.Register`, `c.Process().Run(...)` fails with `OK: false`. ```go -// Initialise once at startup -process.Init(coreInstance) +c := core.New() +r := c.Process().Run(ctx, "echo", "blocked") +fmt.Println(r.OK) // false -// Then use package-level functions anywhere -proc, _ := process.Start(ctx, "ls", "-la") -output, _ := process.Run(ctx, "date") -procs := process.List() -running := process.Running() +c = core.New(core.WithService(process.Register)) +_ = c.ServiceStartup(ctx, nil) +r = c.Process().Run(ctx, "echo", "allowed") +fmt.Println(r.OK) // true ``` ## Package Layout @@ -109,7 +108,7 @@ running := process.Running() | Field | Value | |-------|-------| -| Module path | `forge.lthn.ai/core/go-process` | +| Module path | `dappco.re/go/core/process` | | Go version | 1.26.0 | | Licence | EUPL-1.2 | @@ -117,7 +116,7 @@ running := process.Running() | Module | Purpose | |--------|---------| -| `forge.lthn.ai/core/go` | Core DI framework (`ServiceRuntime`, `Core.ACTION`, lifecycle interfaces) | +| `dappco.re/go/core` | Core DI framework (`ServiceRuntime`, `Core.ACTION`, lifecycle interfaces) | | `github.com/stretchr/testify` | Test assertions (test-only) | The package has no other runtime dependencies beyond the Go standard library diff --git a/process.go b/process.go index ad0748e..ced44e3 100644 --- a/process.go +++ b/process.go @@ -18,6 +18,7 @@ type processStdin interface { // ManagedProcess represents a tracked external process started by the service. type ManagedProcess struct { ID string + PID int Command string Args []string Dir string @@ -36,6 +37,7 @@ type ManagedProcess struct { mu sync.RWMutex gracePeriod time.Duration killGroup bool + lastSignal string } // Process is kept as a compatibility alias for ManagedProcess. @@ -46,22 +48,17 @@ func (p *ManagedProcess) Info() ProcessInfo { p.mu.RLock() defer p.mu.RUnlock() - pid := 0 - if p.cmd != nil && p.cmd.Process != nil { - pid = p.cmd.Process.Pid - } - return ProcessInfo{ ID: p.ID, Command: p.Command, - Args: p.Args, + Args: append([]string(nil), p.Args...), Dir: p.Dir, StartedAt: p.StartedAt, Running: p.Status == StatusRunning, Status: p.Status, ExitCode: p.ExitCode, Duration: p.Duration, - PID: pid, + PID: p.PID, } } @@ -131,6 +128,7 @@ func (p *ManagedProcess) Kill() error { return nil } + p.lastSignal = "SIGKILL" if p.killGroup { // Kill entire process group (negative PID) return syscall.Kill(-p.cmd.Process.Pid, syscall.SIGKILL) @@ -181,6 +179,7 @@ func (p *ManagedProcess) terminate() error { if p.killGroup { pid = -pid } + p.lastSignal = "SIGTERM" return syscall.Kill(pid, syscall.SIGTERM) } @@ -214,3 +213,9 @@ func (p *ManagedProcess) CloseStdin() error { p.stdin = nil return err } + +func (p *ManagedProcess) requestedSignal() string { + p.mu.RLock() + defer p.mu.RUnlock() + return p.lastSignal +} diff --git a/process_test.go b/process_test.go index 711b60b..51dac44 100644 --- a/process_test.go +++ b/process_test.go @@ -112,6 +112,8 @@ func TestProcess_Kill_Good(t *testing.T) { case <-time.After(2 * time.Second): t.Fatal("process should have been killed") } + assert.Equal(t, StatusKilled, proc.Status) + assert.Equal(t, -1, proc.ExitCode) }) t.Run("noop on completed process", func(t *testing.T) { @@ -160,6 +162,7 @@ func TestProcess_Signal_Good(t *testing.T) { case <-time.After(2 * time.Second): t.Fatal("process should have been terminated by signal") } + assert.Equal(t, StatusKilled, proc.Status) }) t.Run("error on completed process", func(t *testing.T) { @@ -213,6 +216,7 @@ func TestProcess_Timeout_Good(t *testing.T) { t.Fatal("process should have been killed by timeout") } assert.False(t, proc.IsRunning()) + assert.Equal(t, StatusKilled, proc.Status) }) t.Run("no timeout when zero", func(t *testing.T) { @@ -312,5 +316,6 @@ func TestProcess_TimeoutWithGrace_Good(t *testing.T) { case <-time.After(5 * time.Second): t.Fatal("process should have been killed by timeout") } + assert.Equal(t, StatusKilled, proc.Status) }) } diff --git a/service.go b/service.go index 619e225..5edbc49 100644 --- a/service.go +++ b/service.go @@ -151,9 +151,9 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re proc := &ManagedProcess{ ID: id, Command: opts.Command, - Args: opts.Args, + Args: append([]string(nil), opts.Args...), Dir: opts.Dir, - Env: opts.Env, + Env: append([]string(nil), opts.Env...), StartedAt: time.Now(), Status: StatusRunning, cmd: cmd, @@ -171,6 +171,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re cancel() return core.Result{Value: core.E("process.start", core.Concat("command failed: ", opts.Command), err), OK: false} } + proc.PID = cmd.Process.Pid // Store process if r := s.managed.Set(id, proc); !r.OK { @@ -214,36 +215,32 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re // Wait for process completion go func() { wg.Wait() - err := cmd.Wait() + waitErr := cmd.Wait() duration := time.Since(proc.StartedAt) + status, exitCode, actionErr, killedSignal := classifyProcessExit(proc, waitErr) proc.mu.Lock() + proc.PID = cmd.Process.Pid proc.Duration = duration - if err != nil { - var exitErr *exec.ExitError - if core.As(err, &exitErr) { - proc.ExitCode = exitErr.ExitCode() - proc.Status = StatusExited - } else { - proc.Status = StatusFailed - } - } else { - proc.ExitCode = 0 - proc.Status = StatusExited - } - status := proc.Status - exitCode := proc.ExitCode + proc.ExitCode = exitCode + proc.Status = status proc.mu.Unlock() close(proc.done) + if status == StatusKilled { + _ = s.Core().ACTION(ActionProcessKilled{ + ID: id, + Signal: killedSignal, + }) + } s.Core().ACTION(ActionProcessExited{ ID: id, ExitCode: exitCode, Duration: duration, + Error: actionErr, }) - _ = status }() return core.Result{Value: proc, OK: true} @@ -311,12 +308,6 @@ func (s *Service) Kill(id string) error { if err := proc.Kill(); err != nil { return err } - - _ = s.Core().ACTION(ActionProcessKilled{ - ID: id, - Signal: "SIGKILL", - }) - return nil } @@ -392,14 +383,10 @@ func (s *Service) handleRun(ctx context.Context, opts core.Options) core.Result Dir: opts.String("dir"), } if r := opts.Get("args"); r.OK { - if args, ok := r.Value.([]string); ok { - runOpts.Args = args - } + runOpts.Args = optionStrings(r.Value) } if r := opts.Get("env"); r.OK { - if env, ok := r.Value.([]string); ok { - runOpts.Env = env - } + runOpts.Env = optionStrings(r.Value) } return s.runCommand(ctx, runOpts) @@ -417,14 +404,10 @@ func (s *Service) handleStart(ctx context.Context, opts core.Options) core.Resul Detach: true, } if r := opts.Get("args"); r.OK { - if args, ok := r.Value.([]string); ok { - runOpts.Args = args - } + runOpts.Args = optionStrings(r.Value) } if r := opts.Get("env"); r.OK { - if env, ok := r.Value.([]string); ok { - runOpts.Env = env - } + runOpts.Env = optionStrings(r.Value) } r := s.StartWithOptions(ctx, runOpts) @@ -501,6 +484,9 @@ func (p *ManagedProcess) Signal(sig os.Signal) error { return nil } + if signal, ok := sig.(syscall.Signal); ok { + p.lastSignal = normalizeSignalName(signal) + } return p.cmd.Process.Signal(sig) } @@ -543,3 +529,75 @@ func (s *Service) handleGet(ctx context.Context, opts core.Options) core.Result } return core.Result{Value: proc.Info(), OK: true} } + +func optionStrings(value any) []string { + switch typed := value.(type) { + case nil: + return nil + case []string: + return append([]string(nil), typed...) + case []any: + result := make([]string, 0, len(typed)) + for _, item := range typed { + text, ok := item.(string) + if !ok { + return nil + } + result = append(result, text) + } + return result + default: + return nil + } +} + +func classifyProcessExit(proc *ManagedProcess, err error) (Status, int, error, string) { + if err == nil { + return StatusExited, 0, nil, "" + } + + if sig, ok := processExitSignal(err); ok { + return StatusKilled, -1, err, normalizeSignalName(sig) + } + + if ctxErr := proc.ctx.Err(); ctxErr != nil { + signal := proc.requestedSignal() + if signal == "" { + signal = "SIGKILL" + } + return StatusKilled, -1, ctxErr, signal + } + + var exitErr *exec.ExitError + if core.As(err, &exitErr) { + return StatusExited, exitErr.ExitCode(), err, "" + } + + return StatusFailed, -1, err, "" +} + +func processExitSignal(err error) (syscall.Signal, bool) { + var exitErr *exec.ExitError + if !core.As(err, &exitErr) || exitErr.ProcessState == nil { + return 0, false + } + + waitStatus, ok := exitErr.ProcessState.Sys().(syscall.WaitStatus) + if !ok || !waitStatus.Signaled() { + return 0, false + } + return waitStatus.Signal(), true +} + +func normalizeSignalName(sig syscall.Signal) string { + switch sig { + case syscall.SIGINT: + return "SIGINT" + case syscall.SIGKILL: + return "SIGKILL" + case syscall.SIGTERM: + return "SIGTERM" + default: + return sig.String() + } +} diff --git a/service_test.go b/service_test.go index d6a2ece..98ffbcf 100644 --- a/service_test.go +++ b/service_test.go @@ -175,6 +175,46 @@ func TestService_HandleList_Good(t *testing.T) { } } +func TestService_HandleGet_Good(t *testing.T) { + svc, c := newStartedTestService(t) + + start := c.Action("process.start").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "command", Value: "sleep"}, + framework.Option{Key: "args", Value: []string{"60"}}, + )) + require.True(t, start.OK) + + id := start.Value.(string) + r := c.Action("process.get").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "id", Value: id}, + )) + require.True(t, r.OK) + + info := r.Value.(ProcessInfo) + assert.Equal(t, id, info.ID) + assert.Equal(t, "sleep", info.Command) + assert.True(t, info.Running) + assert.Equal(t, StatusRunning, info.Status) + assert.Positive(t, info.PID) + + proc, err := svc.Get(id) + require.NoError(t, err) + _ = proc.Kill() + <-proc.Done() +} + +func TestService_HandleGet_Bad(t *testing.T) { + _, c := newStartedTestService(t) + + missingID := c.Action("process.get").Run(context.Background(), framework.NewOptions()) + assert.False(t, missingID.OK) + + missingProc := c.Action("process.get").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "id", Value: "missing"}, + )) + assert.False(t, missingProc.OK) +} + func TestService_Ugly_PermissionModel(t *testing.T) { c := framework.New() @@ -208,6 +248,7 @@ func TestService_Start_Good(t *testing.T) { proc := startProc(t, svc, context.Background(), "echo", "hello") assert.NotEmpty(t, proc.ID) + assert.Positive(t, proc.PID) assert.Equal(t, "echo", proc.Command) assert.Equal(t, []string{"hello"}, proc.Args) @@ -388,6 +429,36 @@ func TestService_Actions_Good(t *testing.T) { assert.Len(t, exited, 1) assert.Equal(t, 0, exited[0].ExitCode) }) + + t.Run("broadcasts killed event", func(t *testing.T) { + svc, c := newTestService(t) + + var killed []ActionProcessKilled + var mu sync.Mutex + + c.RegisterAction(func(cc *framework.Core, msg framework.Message) framework.Result { + mu.Lock() + defer mu.Unlock() + if m, ok := msg.(ActionProcessKilled); ok { + killed = append(killed, m) + } + return framework.Result{OK: true} + }) + + proc := startProc(t, svc, context.Background(), "sleep", "60") + err := svc.Kill(proc.ID) + require.NoError(t, err) + <-proc.Done() + + time.Sleep(10 * time.Millisecond) + + mu.Lock() + defer mu.Unlock() + + require.Len(t, killed, 1) + assert.Equal(t, proc.ID, killed[0].ID) + assert.Equal(t, "SIGKILL", killed[0].Signal) + }) } func TestService_List_Good(t *testing.T) {