feat(v0.8.0): Result-native process service

- Register factory returns core.Result
- OnStartup/OnShutdown return core.Result
- Start/StartWithOptions/Run/RunWithOptions all return core.Result
- 5 named Action handlers (process.run/start/kill/list/get)
- core.ID() replaces fmt.Sprintf for process IDs
- core.As replaces errors.As, core.Sprintf replaces fmt.Sprintf
- handleRun returns output as Value (string) always, OK = exit 0

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-26 01:28:03 +00:00
parent 5d316650e2
commit 1b4efe0f67
7 changed files with 332 additions and 291 deletions

View file

@ -11,7 +11,6 @@ import (
)
func TestGlobal_DefaultNotInitialized(t *testing.T) {
// Reset global state for this test
old := defaultService.Swap(nil)
defer func() {
if old != nil {
@ -21,13 +20,13 @@ func TestGlobal_DefaultNotInitialized(t *testing.T) {
assert.Nil(t, Default())
_, err := Start(context.Background(), "echo", "test")
assert.ErrorIs(t, err, ErrServiceNotInitialized)
r := Start(context.Background(), "echo", "test")
assert.False(t, r.OK)
_, err = Run(context.Background(), "echo", "test")
assert.ErrorIs(t, err, ErrServiceNotInitialized)
r = Run(context.Background(), "echo", "test")
assert.False(t, r.OK)
_, err = Get("proc-1")
_, err := Get("proc-1")
assert.ErrorIs(t, err, ErrServiceNotInitialized)
assert.Nil(t, List())
@ -36,11 +35,11 @@ func TestGlobal_DefaultNotInitialized(t *testing.T) {
err = Kill("proc-1")
assert.ErrorIs(t, err, ErrServiceNotInitialized)
_, err = StartWithOptions(context.Background(), RunOptions{Command: "echo"})
assert.ErrorIs(t, err, ErrServiceNotInitialized)
r = StartWithOptions(context.Background(), RunOptions{Command: "echo"})
assert.False(t, r.OK)
_, err = RunWithOptions(context.Background(), RunOptions{Command: "echo"})
assert.ErrorIs(t, err, ErrServiceNotInitialized)
r = RunWithOptions(context.Background(), RunOptions{Command: "echo"})
assert.False(t, r.OK)
}
func newGlobalTestService(t *testing.T) *Service {
@ -62,7 +61,6 @@ func TestGlobal_SetDefault(t *testing.T) {
}()
svc := newGlobalTestService(t)
err := SetDefault(svc)
require.NoError(t, err)
assert.Equal(t, svc, Default())
@ -83,7 +81,6 @@ func TestGlobal_ConcurrentDefault(t *testing.T) {
}()
svc := newGlobalTestService(t)
err := SetDefault(svc)
require.NoError(t, err)
@ -146,7 +143,6 @@ func TestGlobal_ConcurrentOperations(t *testing.T) {
}()
svc := newGlobalTestService(t)
err := SetDefault(svc)
require.NoError(t, err)
@ -158,10 +154,10 @@ func TestGlobal_ConcurrentOperations(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
proc, err := Start(context.Background(), "echo", "concurrent")
if err == nil {
r := Start(context.Background(), "echo", "concurrent")
if r.OK {
procMu.Lock()
processes = append(processes, proc)
processes = append(processes, r.Value.(*Process))
procMu.Unlock()
}
}()
@ -201,7 +197,6 @@ func TestGlobal_ConcurrentOperations(t *testing.T) {
func TestGlobal_StartWithOptions(t *testing.T) {
svc, _ := newTestService(t)
old := defaultService.Swap(svc)
defer func() {
if old != nil {
@ -209,21 +204,20 @@ func TestGlobal_StartWithOptions(t *testing.T) {
}
}()
proc, err := StartWithOptions(context.Background(), RunOptions{
r := StartWithOptions(context.Background(), RunOptions{
Command: "echo",
Args: []string{"with", "options"},
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
<-proc.Done()
assert.Equal(t, 0, proc.ExitCode)
assert.Contains(t, proc.Output(), "with options")
}
func TestGlobal_RunWithOptions(t *testing.T) {
svc, _ := newTestService(t)
old := defaultService.Swap(svc)
defer func() {
if old != nil {
@ -231,17 +225,16 @@ func TestGlobal_RunWithOptions(t *testing.T) {
}
}()
output, err := RunWithOptions(context.Background(), RunOptions{
r := RunWithOptions(context.Background(), RunOptions{
Command: "echo",
Args: []string{"run", "options"},
})
require.NoError(t, err)
assert.Contains(t, output, "run options")
assert.True(t, r.OK)
assert.Contains(t, r.Value.(string), "run options")
}
func TestGlobal_Running(t *testing.T) {
svc, _ := newTestService(t)
old := defaultService.Swap(svc)
defer func() {
if old != nil {
@ -252,8 +245,9 @@ func TestGlobal_Running(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, err := Start(ctx, "sleep", "60")
require.NoError(t, err)
r := Start(ctx, "sleep", "60")
require.True(t, r.OK)
proc := r.Value.(*Process)
running := Running()
assert.Len(t, running, 1)

20
go.mod
View file

@ -3,16 +3,21 @@ module dappco.re/go/core/process
go 1.26.0
require (
dappco.re/go/core v0.4.7
dappco.re/go/core/io v0.1.7
dappco.re/go/core/log v0.0.4
dappco.re/go/core/ws v0.2.4
dappco.re/go/core v0.5.0
dappco.re/go/core/io v0.2.0
dappco.re/go/core/log v0.1.0
dappco.re/go/core/ws v0.3.0
forge.lthn.ai/core/api v0.1.5
github.com/gin-gonic/gin v1.12.0
github.com/stretchr/testify v1.11.1
)
require (
dappco.re/go/core/api v0.2.0
dappco.re/go/core/i18n v0.2.0
dappco.re/go/core/process v0.3.0
dappco.re/go/core/scm v0.4.0
dappco.re/go/core/store v0.2.0
forge.lthn.ai/core/go-io v0.1.5 // indirect
forge.lthn.ai/core/go-log v0.0.4 // indirect
github.com/99designs/gqlgen v0.17.88 // indirect
@ -108,10 +113,3 @@ require (
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace (
dappco.re/go/core => ../go
dappco.re/go/core/io => ../go-io
dappco.re/go/core/log => ../go-log
dappco.re/go/core/ws => ../go-ws
)

View file

@ -50,19 +50,19 @@ func Init(c *core.Core) error {
// --- Global convenience functions ---
// Start spawns a new process using the default service.
func Start(ctx context.Context, command string, args ...string) (*Process, error) {
func Start(ctx context.Context, command string, args ...string) core.Result {
svc := Default()
if svc == nil {
return nil, ErrServiceNotInitialized
return core.Result{OK: false}
}
return svc.Start(ctx, command, args...)
}
// Run executes a command and waits for completion using the default service.
func Run(ctx context.Context, command string, args ...string) (string, error) {
func Run(ctx context.Context, command string, args ...string) core.Result {
svc := Default()
if svc == nil {
return "", ErrServiceNotInitialized
return core.Result{Value: "", OK: false}
}
return svc.Run(ctx, command, args...)
}
@ -95,19 +95,19 @@ func Kill(id string) error {
}
// StartWithOptions spawns a process with full configuration using the default service.
func StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) {
func StartWithOptions(ctx context.Context, opts RunOptions) core.Result {
svc := Default()
if svc == nil {
return nil, ErrServiceNotInitialized
return core.Result{OK: false}
}
return svc.StartWithOptions(ctx, opts)
}
// RunWithOptions executes a command with options and waits using the default service.
func RunWithOptions(ctx context.Context, opts RunOptions) (string, error) {
func RunWithOptions(ctx context.Context, opts RunOptions) core.Result {
svc := Default()
if svc == nil {
return "", ErrServiceNotInitialized
return core.Result{Value: "", OK: false}
}
return svc.RunWithOptions(ctx, opts)
}

View file

@ -13,8 +13,7 @@ import (
func TestProcess_Info(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "hello")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "hello")
<-proc.Done()
@ -30,24 +29,15 @@ func TestProcess_Info(t *testing.T) {
func TestProcess_Output(t *testing.T) {
t.Run("captures stdout", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "hello world")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "hello world")
<-proc.Done()
output := proc.Output()
assert.Contains(t, output, "hello world")
assert.Contains(t, proc.Output(), "hello world")
})
t.Run("OutputBytes returns copy", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "test")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "test")
<-proc.Done()
bytes := proc.OutputBytes()
assert.NotNil(t, bytes)
assert.Contains(t, string(bytes), "test")
@ -57,29 +47,21 @@ func TestProcess_Output(t *testing.T) {
func TestProcess_IsRunning(t *testing.T) {
t.Run("true while running", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, err := svc.Start(ctx, "sleep", "10")
require.NoError(t, err)
proc := startProc(t, svc, ctx, "sleep", "10")
assert.True(t, proc.IsRunning())
cancel()
<-proc.Done()
assert.False(t, proc.IsRunning())
})
t.Run("false after completion", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "done")
<-proc.Done()
assert.False(t, proc.IsRunning())
})
}
@ -87,21 +69,15 @@ func TestProcess_IsRunning(t *testing.T) {
func TestProcess_Wait(t *testing.T) {
t.Run("returns nil on success", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "ok")
require.NoError(t, err)
err = proc.Wait()
proc := startProc(t, svc, context.Background(), "echo", "ok")
err := proc.Wait()
assert.NoError(t, err)
})
t.Run("returns error on failure", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "sh", "-c", "exit 1")
require.NoError(t, err)
err = proc.Wait()
proc := startProc(t, svc, context.Background(), "sh", "-c", "exit 1")
err := proc.Wait()
assert.Error(t, err)
})
}
@ -109,13 +85,10 @@ func TestProcess_Wait(t *testing.T) {
func TestProcess_Done(t *testing.T) {
t.Run("channel closes on completion", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "test")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "test")
select {
case <-proc.Done():
// Success - channel closed
case <-time.After(5 * time.Second):
t.Fatal("Done channel should have closed")
}
@ -125,21 +98,17 @@ func TestProcess_Done(t *testing.T) {
func TestProcess_Kill(t *testing.T) {
t.Run("terminates running process", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
proc := startProc(t, svc, ctx, "sleep", "60")
assert.True(t, proc.IsRunning())
err = proc.Kill()
err := proc.Kill()
assert.NoError(t, err)
select {
case <-proc.Done():
// Good - process terminated
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
@ -147,13 +116,9 @@ func TestProcess_Kill(t *testing.T) {
t.Run("noop on completed process", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "done")
<-proc.Done()
err = proc.Kill()
err := proc.Kill()
assert.NoError(t, err)
})
}
@ -161,31 +126,21 @@ func TestProcess_Kill(t *testing.T) {
func TestProcess_SendInput(t *testing.T) {
t.Run("writes to stdin", func(t *testing.T) {
svc, _ := newTestService(t)
proc := startProc(t, svc, context.Background(), "cat")
// Use cat to echo back stdin
proc, err := svc.Start(context.Background(), "cat")
require.NoError(t, err)
err = proc.SendInput("hello\n")
err := proc.SendInput("hello\n")
assert.NoError(t, err)
err = proc.CloseStdin()
assert.NoError(t, err)
<-proc.Done()
assert.Contains(t, proc.Output(), "hello")
})
t.Run("error on completed process", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "done")
<-proc.Done()
err = proc.SendInput("test")
err := proc.SendInput("test")
assert.ErrorIs(t, err, ErrProcessNotRunning)
})
}
@ -193,19 +148,15 @@ func TestProcess_SendInput(t *testing.T) {
func TestProcess_Signal(t *testing.T) {
t.Run("sends signal to running process", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
err = proc.Signal(os.Interrupt)
proc := startProc(t, svc, ctx, "sleep", "60")
err := proc.Signal(os.Interrupt)
assert.NoError(t, err)
select {
case <-proc.Done():
// Process terminated by signal
case <-time.After(2 * time.Second):
t.Fatal("process should have been terminated by signal")
}
@ -213,12 +164,9 @@ func TestProcess_Signal(t *testing.T) {
t.Run("error on completed process", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "done")
<-proc.Done()
err = proc.Signal(os.Interrupt)
err := proc.Signal(os.Interrupt)
assert.ErrorIs(t, err, ErrProcessNotRunning)
})
}
@ -226,17 +174,12 @@ func TestProcess_Signal(t *testing.T) {
func TestProcess_CloseStdin(t *testing.T) {
t.Run("closes stdin pipe", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "cat")
require.NoError(t, err)
err = proc.CloseStdin()
proc := startProc(t, svc, context.Background(), "cat")
err := proc.CloseStdin()
assert.NoError(t, err)
// Process should exit now that stdin is closed
select {
case <-proc.Done():
// Good
case <-time.After(2 * time.Second):
t.Fatal("cat should exit when stdin is closed")
}
@ -244,17 +187,10 @@ func TestProcess_CloseStdin(t *testing.T) {
t.Run("double close is safe", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "cat")
require.NoError(t, err)
// First close
err = proc.CloseStdin()
proc := startProc(t, svc, context.Background(), "cat")
err := proc.CloseStdin()
assert.NoError(t, err)
<-proc.Done()
// Second close should be safe (stdin already nil)
err = proc.CloseStdin()
assert.NoError(t, err)
})
@ -263,34 +199,31 @@ func TestProcess_CloseStdin(t *testing.T) {
func TestProcess_Timeout(t *testing.T) {
t.Run("kills process after timeout", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "sleep",
Args: []string{"60"},
Timeout: 200 * time.Millisecond,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
select {
case <-proc.Done():
// Good — process was killed by timeout
case <-time.After(5 * time.Second):
t.Fatal("process should have been killed by timeout")
}
assert.False(t, proc.IsRunning())
})
t.Run("no timeout when zero", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "echo",
Args: []string{"fast"},
Timeout: 0,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
<-proc.Done()
assert.Equal(t, 0, proc.ExitCode)
})
@ -299,23 +232,20 @@ func TestProcess_Timeout(t *testing.T) {
func TestProcess_Shutdown(t *testing.T) {
t.Run("graceful with grace period", func(t *testing.T) {
svc, _ := newTestService(t)
// Use a process that traps SIGTERM
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "sleep",
Args: []string{"60"},
GracePeriod: 100 * time.Millisecond,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
assert.True(t, proc.IsRunning())
err = proc.Shutdown()
err := proc.Shutdown()
assert.NoError(t, err)
select {
case <-proc.Done():
// Good
case <-time.After(5 * time.Second):
t.Fatal("shutdown should have completed")
}
@ -323,19 +253,18 @@ func TestProcess_Shutdown(t *testing.T) {
t.Run("immediate kill without grace period", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "sleep",
Args: []string{"60"},
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
err = proc.Shutdown()
err := proc.Shutdown()
assert.NoError(t, err)
select {
case <-proc.Done():
// Good
case <-time.After(2 * time.Second):
t.Fatal("kill should be immediate")
}
@ -345,25 +274,21 @@ func TestProcess_Shutdown(t *testing.T) {
func TestProcess_KillGroup(t *testing.T) {
t.Run("kills child processes", func(t *testing.T) {
svc, _ := newTestService(t)
// Spawn a parent that spawns a child — KillGroup should kill both
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "sh",
Args: []string{"-c", "sleep 60 & wait"},
Detach: true,
KillGroup: true,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
// Give child time to spawn
time.Sleep(100 * time.Millisecond)
err = proc.Kill()
err := proc.Kill()
assert.NoError(t, err)
select {
case <-proc.Done():
// Good — whole group killed
case <-time.After(5 * time.Second):
t.Fatal("process group should have been killed")
}
@ -373,18 +298,17 @@ func TestProcess_KillGroup(t *testing.T) {
func TestProcess_TimeoutWithGrace(t *testing.T) {
t.Run("timeout triggers graceful shutdown", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "sleep",
Args: []string{"60"},
Timeout: 200 * time.Millisecond,
GracePeriod: 100 * time.Millisecond,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
select {
case <-proc.Done():
// Good — timeout + grace triggered
case <-time.After(5 * time.Second):
t.Fatal("process should have been killed by timeout")
}

View file

@ -193,21 +193,21 @@ func (r *Runner) canRun(spec RunSpec, completed map[string]*RunResult) bool {
func (r *Runner) runSpec(ctx context.Context, spec RunSpec) RunResult {
start := time.Now()
proc, err := r.service.StartWithOptions(ctx, RunOptions{
sr := r.service.StartWithOptions(ctx, RunOptions{
Command: spec.Command,
Args: spec.Args,
Dir: spec.Dir,
Env: spec.Env,
})
if err != nil {
if !sr.OK {
return RunResult{
Name: spec.Name,
Spec: spec,
Duration: time.Since(start),
Error: err,
}
}
proc := sr.Value.(*Process)
<-proc.Done()
return RunResult{

View file

@ -3,8 +3,6 @@ package process
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os/exec"
"sync"
@ -43,7 +41,22 @@ type Options struct {
BufferSize int
}
// Register is the WithService factory for go-process.
// Registers the process service with Core — OnStartup registers named Actions
// (process.run, process.start, process.kill, process.list, process.get).
//
// core.New(core.WithService(process.Register))
func Register(c *core.Core) core.Result {
svc := &Service{
ServiceRuntime: core.NewServiceRuntime(c, Options{BufferSize: DefaultBufferSize}),
processes: make(map[string]*Process),
bufSize: DefaultBufferSize,
}
return core.Result{Value: svc, OK: true}
}
// NewService creates a process service factory for Core registration.
// Deprecated: Use Register with core.WithService(process.Register) instead.
//
// core, _ := core.New(
// core.WithName("process", process.NewService(process.Options{})),
@ -62,14 +75,23 @@ func NewService(opts Options) func(*core.Core) (any, error) {
}
}
// OnStartup implements core.Startable.
func (s *Service) OnStartup(ctx context.Context) error {
return nil
// OnStartup implements core.Startable — registers named Actions.
//
// c.Process().Run(ctx, "git", "log") // → calls process.run Action
func (s *Service) OnStartup(ctx context.Context) core.Result {
c := s.Core()
c.Action("process.run", s.handleRun)
c.Action("process.start", s.handleStart)
c.Action("process.kill", s.handleKill)
c.Action("process.list", s.handleList)
c.Action("process.get", s.handleGet)
return core.Result{OK: true}
}
// OnShutdown implements core.Stoppable.
// Gracefully shuts down all running processes (SIGTERM → SIGKILL).
func (s *Service) OnShutdown(ctx context.Context) error {
// OnShutdown implements core.Stoppable — kills all managed processes.
//
// c.ServiceShutdown(ctx) // calls OnShutdown on all Stoppable services
func (s *Service) OnShutdown(ctx context.Context) core.Result {
s.mu.RLock()
procs := make([]*Process, 0, len(s.processes))
for _, p := range s.processes {
@ -83,11 +105,14 @@ func (s *Service) OnShutdown(ctx context.Context) error {
_ = p.Shutdown()
}
return nil
return core.Result{OK: true}
}
// Start spawns a new process with the given command and args.
func (s *Service) Start(ctx context.Context, command string, args ...string) (*Process, error) {
//
// r := svc.Start(ctx, "echo", "hello")
// if r.OK { proc := r.Value.(*Process) }
func (s *Service) Start(ctx context.Context, command string, args ...string) core.Result {
return s.StartWithOptions(ctx, RunOptions{
Command: command,
Args: args,
@ -95,8 +120,11 @@ func (s *Service) Start(ctx context.Context, command string, args ...string) (*P
}
// StartWithOptions spawns a process with full configuration.
func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) {
id := fmt.Sprintf("proc-%d", s.idCounter.Add(1))
//
// r := svc.StartWithOptions(ctx, process.RunOptions{Command: "go", Args: []string{"test", "./..."}})
// if r.OK { proc := r.Value.(*Process) }
func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Result {
id := core.ID()
// Detached processes use Background context so they survive parent death
parentCtx := ctx
@ -122,19 +150,19 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce
stdout, err := cmd.StdoutPipe()
if err != nil {
cancel()
return nil, coreerr.E("Service.StartWithOptions", "failed to create stdout pipe", err)
return core.Result{OK: false}
}
stderr, err := cmd.StderrPipe()
if err != nil {
cancel()
return nil, coreerr.E("Service.StartWithOptions", "failed to create stderr pipe", err)
return core.Result{OK: false}
}
stdin, err := cmd.StdinPipe()
if err != nil {
cancel()
return nil, coreerr.E("Service.StartWithOptions", "failed to create stdin pipe", err)
return core.Result{OK: false}
}
// Create output buffer (enabled by default)
@ -164,7 +192,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce
// Start the process
if err := cmd.Start(); err != nil {
cancel()
return nil, coreerr.E("Service.StartWithOptions", "failed to start process", err)
return core.Result{OK: false}
}
// Store process
@ -177,7 +205,6 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce
go func() {
select {
case <-proc.done:
// Process exited before timeout
case <-time.After(opts.Timeout):
proc.Shutdown()
}
@ -185,7 +212,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce
}
// Broadcast start
_ = s.Core().ACTION(ActionProcessStarted{
s.Core().ACTION(ActionProcessStarted{
ID: id,
Command: opts.Command,
Args: opts.Args,
@ -207,10 +234,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce
// Wait for process completion
go func() {
// Wait for output streaming to complete
wg.Wait()
// Wait for process exit
err := cmd.Wait()
duration := time.Since(proc.StartedAt)
@ -219,7 +243,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce
proc.Duration = duration
if err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
if core.As(err, &exitErr) {
proc.ExitCode = exitErr.ExitCode()
proc.Status = StatusExited
} else {
@ -235,20 +259,15 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce
close(proc.done)
// Broadcast exit
var exitErr error
if status == StatusFailed {
exitErr = err
}
_ = s.Core().ACTION(ActionProcessExited{
s.Core().ACTION(ActionProcessExited{
ID: id,
ExitCode: exitCode,
Duration: duration,
Error: exitErr,
})
_ = status
}()
return proc, nil
return core.Result{Value: proc, OK: true}
}
// streamOutput reads from a pipe and broadcasts lines via ACTION.
@ -371,34 +390,146 @@ func (s *Service) Output(id string) (string, error) {
}
// Run executes a command and waits for completion.
// Returns the combined output and any error.
func (s *Service) Run(ctx context.Context, command string, args ...string) (string, error) {
proc, err := s.Start(ctx, command, args...)
if err != nil {
return "", err
// Value is always the output string. OK is true if exit code is 0.
//
// r := svc.Run(ctx, "go", "test", "./...")
// output := r.Value.(string)
func (s *Service) Run(ctx context.Context, command string, args ...string) core.Result {
r := s.Start(ctx, command, args...)
if !r.OK {
return core.Result{Value: "", OK: false}
}
proc := r.Value.(*Process)
<-proc.Done()
output := proc.Output()
if proc.ExitCode != 0 {
return output, coreerr.E("Service.Run", fmt.Sprintf("process exited with code %d", proc.ExitCode), nil)
}
return output, nil
return core.Result{Value: proc.Output(), OK: proc.ExitCode == 0}
}
// RunWithOptions executes a command with options and waits for completion.
func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) (string, error) {
proc, err := s.StartWithOptions(ctx, opts)
if err != nil {
return "", err
// Value is always the output string. OK is true if exit code is 0.
//
// r := svc.RunWithOptions(ctx, process.RunOptions{Command: "go", Args: []string{"test"}})
// output := r.Value.(string)
func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) core.Result {
r := s.StartWithOptions(ctx, opts)
if !r.OK {
return core.Result{Value: "", OK: false}
}
proc := r.Value.(*Process)
<-proc.Done()
output := proc.Output()
if proc.ExitCode != 0 {
return output, coreerr.E("Service.RunWithOptions", fmt.Sprintf("process exited with code %d", proc.ExitCode), nil)
}
return output, nil
return core.Result{Value: proc.Output(), OK: proc.ExitCode == 0}
}
// --- Named Action Handlers ---
// These are registered during OnStartup and called via c.Process() sugar.
// c.Process().Run(ctx, "git", "log") → c.Action("process.run").Run(ctx, opts)
// handleRun executes a command synchronously and returns the output.
//
// r := c.Action("process.run").Run(ctx, core.NewOptions(
// core.Option{Key: "command", Value: "git"},
// core.Option{Key: "args", Value: []string{"log"}},
// core.Option{Key: "dir", Value: "/repo"},
// ))
func (s *Service) handleRun(ctx context.Context, opts core.Options) core.Result {
command := opts.String("command")
if command == "" {
return core.Result{Value: coreerr.E("process.run", "command is required", nil), OK: false}
}
runOpts := RunOptions{
Command: command,
Dir: opts.String("dir"),
}
if r := opts.Get("args"); r.OK {
if args, ok := r.Value.([]string); ok {
runOpts.Args = args
}
}
if r := opts.Get("env"); r.OK {
if env, ok := r.Value.([]string); ok {
runOpts.Env = env
}
}
return s.RunWithOptions(ctx, runOpts)
}
// handleStart spawns a detached/background process and returns the process ID.
//
// r := c.Action("process.start").Run(ctx, core.NewOptions(
// core.Option{Key: "command", Value: "docker"},
// core.Option{Key: "args", Value: []string{"run", "nginx"}},
// ))
// id := r.Value.(string)
func (s *Service) handleStart(ctx context.Context, opts core.Options) core.Result {
command := opts.String("command")
if command == "" {
return core.Result{Value: coreerr.E("process.start", "command is required", nil), OK: false}
}
runOpts := RunOptions{
Command: command,
Dir: opts.String("dir"),
}
if r := opts.Get("args"); r.OK {
if args, ok := r.Value.([]string); ok {
runOpts.Args = args
}
}
r := s.StartWithOptions(ctx, runOpts)
if !r.OK {
return r
}
return core.Result{Value: r.Value.(*Process).ID, OK: true}
}
// handleKill terminates a process by ID.
//
// r := c.Action("process.kill").Run(ctx, core.NewOptions(
// core.Option{Key: "id", Value: "id-42-a3f2b1"},
// ))
func (s *Service) handleKill(ctx context.Context, opts core.Options) core.Result {
id := opts.String("id")
if id != "" {
if err := s.Kill(id); err != nil {
return core.Result{Value: err, OK: false}
}
return core.Result{OK: true}
}
return core.Result{Value: coreerr.E("process.kill", "id is required", nil), OK: false}
}
// handleList returns the IDs of all managed processes.
//
// r := c.Action("process.list").Run(ctx, core.NewOptions())
// ids := r.Value.([]string)
func (s *Service) handleList(ctx context.Context, opts core.Options) core.Result {
s.mu.RLock()
defer s.mu.RUnlock()
ids := make([]string, 0, len(s.processes))
for id := range s.processes {
ids = append(ids, id)
}
return core.Result{Value: ids, OK: true}
}
// handleGet returns process info by ID.
//
// r := c.Action("process.get").Run(ctx, core.NewOptions(
// core.Option{Key: "id", Value: "id-42-a3f2b1"},
// ))
// info := r.Value.(process.Info)
func (s *Service) handleGet(ctx context.Context, opts core.Options) core.Result {
id := opts.String("id")
proc, err := s.Get(id)
if err != nil {
return core.Result{Value: err, OK: false}
}
return core.Result{Value: proc.Info(), OK: true}
}

View file

@ -24,19 +24,23 @@ func newTestService(t *testing.T) (*Service, *framework.Core) {
return svc, c
}
func startProc(t *testing.T, svc *Service, ctx context.Context, command string, args ...string) *Process {
t.Helper()
r := svc.Start(ctx, command, args...)
require.True(t, r.OK)
return r.Value.(*Process)
}
func TestService_Start(t *testing.T) {
t.Run("echo command", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "hello")
require.NoError(t, err)
require.NotNil(t, proc)
proc := startProc(t, svc, context.Background(), "echo", "hello")
assert.NotEmpty(t, proc.ID)
assert.Equal(t, "echo", proc.Command)
assert.Equal(t, []string{"hello"}, proc.Args)
// Wait for completion
<-proc.Done()
assert.Equal(t, StatusExited, proc.Status)
@ -47,8 +51,7 @@ func TestService_Start(t *testing.T) {
t.Run("failing command", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "sh", "-c", "exit 42")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "sh", "-c", "exit 42")
<-proc.Done()
@ -59,22 +62,22 @@ func TestService_Start(t *testing.T) {
t.Run("non-existent command", func(t *testing.T) {
svc, _ := newTestService(t)
_, err := svc.Start(context.Background(), "nonexistent_command_xyz")
assert.Error(t, err)
r := svc.Start(context.Background(), "nonexistent_command_xyz")
assert.False(t, r.OK)
})
t.Run("with working directory", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "pwd",
Dir: "/tmp",
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
<-proc.Done()
// On macOS /tmp is a symlink to /private/tmp
output := strings.TrimSpace(proc.Output())
assert.True(t, output == "/tmp" || output == "/private/tmp", "got: %s", output)
})
@ -83,15 +86,12 @@ func TestService_Start(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
proc, err := svc.Start(ctx, "sleep", "10")
require.NoError(t, err)
proc := startProc(t, svc, ctx, "sleep", "10")
// Cancel immediately
cancel()
select {
case <-proc.Done():
// Good - process was killed
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
@ -100,12 +100,13 @@ func TestService_Start(t *testing.T) {
t.Run("disable capture", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "echo",
Args: []string{"no-capture"},
DisableCapture: true,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
<-proc.Done()
assert.Equal(t, StatusExited, proc.Status)
@ -115,12 +116,13 @@ func TestService_Start(t *testing.T) {
t.Run("with environment variables", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "sh",
Args: []string{"-c", "echo $MY_TEST_VAR"},
Env: []string{"MY_TEST_VAR=hello_env"},
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
<-proc.Done()
assert.Contains(t, proc.Output(), "hello_env")
@ -131,17 +133,16 @@ func TestService_Start(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
proc, err := svc.StartWithOptions(ctx, RunOptions{
r := svc.StartWithOptions(ctx, RunOptions{
Command: "echo",
Args: []string{"detached"},
Detach: true,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
// Cancel the parent context
cancel()
// Detached process should still complete normally
select {
case <-proc.Done():
assert.Equal(t, StatusExited, proc.Status)
@ -156,17 +157,16 @@ func TestService_Run(t *testing.T) {
t.Run("returns output", func(t *testing.T) {
svc, _ := newTestService(t)
output, err := svc.Run(context.Background(), "echo", "hello world")
require.NoError(t, err)
assert.Contains(t, output, "hello world")
r := svc.Run(context.Background(), "echo", "hello world")
assert.True(t, r.OK)
assert.Contains(t, r.Value.(string), "hello world")
})
t.Run("returns error on failure", func(t *testing.T) {
t.Run("returns !OK on failure", func(t *testing.T) {
svc, _ := newTestService(t)
_, err := svc.Run(context.Background(), "sh", "-c", "exit 1")
assert.Error(t, err)
assert.Contains(t, err.Error(), "exited with code 1")
r := svc.Run(context.Background(), "sh", "-c", "exit 1")
assert.False(t, r.OK)
})
}
@ -174,7 +174,6 @@ func TestService_Actions(t *testing.T) {
t.Run("broadcasts events", func(t *testing.T) {
c := framework.New()
// Register process service on Core
factory := NewService(Options{})
raw, err := factory(c)
require.NoError(t, err)
@ -198,12 +197,10 @@ func TestService_Actions(t *testing.T) {
}
return framework.Result{OK: true}
})
proc, err := svc.Start(context.Background(), "echo", "test")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "test")
<-proc.Done()
// Give time for events to propagate
time.Sleep(10 * time.Millisecond)
mu.Lock()
@ -232,8 +229,8 @@ func TestService_List(t *testing.T) {
t.Run("tracks processes", func(t *testing.T) {
svc, _ := newTestService(t)
proc1, _ := svc.Start(context.Background(), "echo", "1")
proc2, _ := svc.Start(context.Background(), "echo", "2")
proc1 := startProc(t, svc, context.Background(), "echo", "1")
proc2 := startProc(t, svc, context.Background(), "echo", "2")
<-proc1.Done()
<-proc2.Done()
@ -245,7 +242,7 @@ func TestService_List(t *testing.T) {
t.Run("get by id", func(t *testing.T) {
svc, _ := newTestService(t)
proc, _ := svc.Start(context.Background(), "echo", "test")
proc := startProc(t, svc, context.Background(), "echo", "test")
<-proc.Done()
got, err := svc.Get(proc.ID)
@ -265,7 +262,7 @@ func TestService_Remove(t *testing.T) {
t.Run("removes completed process", func(t *testing.T) {
svc, _ := newTestService(t)
proc, _ := svc.Start(context.Background(), "echo", "test")
proc := startProc(t, svc, context.Background(), "echo", "test")
<-proc.Done()
err := svc.Remove(proc.ID)
@ -281,7 +278,7 @@ func TestService_Remove(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, _ := svc.Start(ctx, "sleep", "10")
proc := startProc(t, svc, ctx, "sleep", "10")
err := svc.Remove(proc.ID)
assert.Error(t, err)
@ -295,8 +292,8 @@ func TestService_Clear(t *testing.T) {
t.Run("clears completed processes", func(t *testing.T) {
svc, _ := newTestService(t)
proc1, _ := svc.Start(context.Background(), "echo", "1")
proc2, _ := svc.Start(context.Background(), "echo", "2")
proc1 := startProc(t, svc, context.Background(), "echo", "1")
proc2 := startProc(t, svc, context.Background(), "echo", "2")
<-proc1.Done()
<-proc2.Done()
@ -316,15 +313,13 @@ func TestService_Kill(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
proc := startProc(t, svc, ctx, "sleep", "60")
err = svc.Kill(proc.ID)
err := svc.Kill(proc.ID)
assert.NoError(t, err)
select {
case <-proc.Done():
// Process killed successfully
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
@ -342,8 +337,7 @@ func TestService_Output(t *testing.T) {
t.Run("returns captured output", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "captured")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "captured")
<-proc.Done()
output, err := svc.Output(proc.ID)
@ -366,16 +360,14 @@ func TestService_OnShutdown(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc1, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
proc2, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
proc1 := startProc(t, svc, ctx, "sleep", "60")
proc2 := startProc(t, svc, ctx, "sleep", "60")
assert.True(t, proc1.IsRunning())
assert.True(t, proc2.IsRunning())
err = svc.OnShutdown(context.Background())
assert.NoError(t, err)
r := svc.OnShutdown(context.Background())
assert.True(t, r.OK)
select {
case <-proc1.Done():
@ -391,10 +383,16 @@ func TestService_OnShutdown(t *testing.T) {
}
func TestService_OnStartup(t *testing.T) {
t.Run("returns nil", func(t *testing.T) {
svc, _ := newTestService(t)
err := svc.OnStartup(context.Background())
assert.NoError(t, err)
t.Run("registers named actions", func(t *testing.T) {
svc, c := newTestService(t)
r := svc.OnStartup(context.Background())
assert.True(t, r.OK)
assert.True(t, c.Action("process.run").Exists())
assert.True(t, c.Action("process.start").Exists())
assert.True(t, c.Action("process.kill").Exists())
assert.True(t, c.Action("process.list").Exists())
assert.True(t, c.Action("process.get").Exists())
})
}
@ -402,23 +400,22 @@ func TestService_RunWithOptions(t *testing.T) {
t.Run("returns output on success", func(t *testing.T) {
svc, _ := newTestService(t)
output, err := svc.RunWithOptions(context.Background(), RunOptions{
r := svc.RunWithOptions(context.Background(), RunOptions{
Command: "echo",
Args: []string{"opts-test"},
})
require.NoError(t, err)
assert.Contains(t, output, "opts-test")
assert.True(t, r.OK)
assert.Contains(t, r.Value.(string), "opts-test")
})
t.Run("returns error on failure", func(t *testing.T) {
t.Run("returns !OK on failure", func(t *testing.T) {
svc, _ := newTestService(t)
_, err := svc.RunWithOptions(context.Background(), RunOptions{
r := svc.RunWithOptions(context.Background(), RunOptions{
Command: "sh",
Args: []string{"-c", "exit 2"},
})
assert.Error(t, err)
assert.Contains(t, err.Error(), "exited with code 2")
assert.False(t, r.OK)
})
}
@ -429,11 +426,8 @@ func TestService_Running(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc1, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
proc2, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
proc1 := startProc(t, svc, ctx, "sleep", "60")
proc2 := startProc(t, svc, context.Background(), "echo", "done")
<-proc2.Done()
running := svc.Running()