commit 8095807ad603bbf989abe2756a0b6240aad35907 Author: Snider Date: Fri Mar 6 12:50:09 2026 +0000 feat: extract process package from core/go pkg/process Process management with Core IPC integration, output streaming via ring buffer, exec wrapper with logging, and dependency-aware runner. Moved from forge.lthn.ai/core/go/pkg/process to standalone module. Co-Authored-By: Virgil diff --git a/actions.go b/actions.go new file mode 100644 index 0000000..7f33cf8 --- /dev/null +++ b/actions.go @@ -0,0 +1,37 @@ +package process + +import "time" + +// --- ACTION messages (broadcast via Core.ACTION) --- + +// ActionProcessStarted is broadcast when a process begins execution. +type ActionProcessStarted struct { + ID string + Command string + Args []string + Dir string + PID int +} + +// ActionProcessOutput is broadcast for each line of output. +// Subscribe to this for real-time streaming. +type ActionProcessOutput struct { + ID string + Line string + Stream Stream +} + +// ActionProcessExited is broadcast when a process completes. +// Check ExitCode for success (0) or failure. +type ActionProcessExited struct { + ID string + ExitCode int + Duration time.Duration + Error error // Non-nil if failed to start or was killed +} + +// ActionProcessKilled is broadcast when a process is terminated. +type ActionProcessKilled struct { + ID string + Signal string +} diff --git a/buffer.go b/buffer.go new file mode 100644 index 0000000..bf02f59 --- /dev/null +++ b/buffer.go @@ -0,0 +1,108 @@ +package process + +import "sync" + +// RingBuffer is a fixed-size circular buffer that overwrites old data. +// Thread-safe for concurrent reads and writes. +type RingBuffer struct { + data []byte + size int + start int + end int + full bool + mu sync.RWMutex +} + +// NewRingBuffer creates a ring buffer with the given capacity. +func NewRingBuffer(size int) *RingBuffer { + return &RingBuffer{ + data: make([]byte, size), + size: size, + } +} + +// Write appends data to the buffer, overwriting oldest data if full. +func (rb *RingBuffer) Write(p []byte) (n int, err error) { + rb.mu.Lock() + defer rb.mu.Unlock() + + for _, b := range p { + rb.data[rb.end] = b + rb.end = (rb.end + 1) % rb.size + if rb.full { + rb.start = (rb.start + 1) % rb.size + } + if rb.end == rb.start { + rb.full = true + } + } + return len(p), nil +} + +// String returns the buffer contents as a string. +func (rb *RingBuffer) String() string { + rb.mu.RLock() + defer rb.mu.RUnlock() + + if !rb.full && rb.start == rb.end { + return "" + } + + if rb.full { + result := make([]byte, rb.size) + copy(result, rb.data[rb.start:]) + copy(result[rb.size-rb.start:], rb.data[:rb.end]) + return string(result) + } + + return string(rb.data[rb.start:rb.end]) +} + +// Bytes returns a copy of the buffer contents. +func (rb *RingBuffer) Bytes() []byte { + rb.mu.RLock() + defer rb.mu.RUnlock() + + if !rb.full && rb.start == rb.end { + return nil + } + + if rb.full { + result := make([]byte, rb.size) + copy(result, rb.data[rb.start:]) + copy(result[rb.size-rb.start:], rb.data[:rb.end]) + return result + } + + result := make([]byte, rb.end-rb.start) + copy(result, rb.data[rb.start:rb.end]) + return result +} + +// Len returns the current length of data in the buffer. +func (rb *RingBuffer) Len() int { + rb.mu.RLock() + defer rb.mu.RUnlock() + + if rb.full { + return rb.size + } + if rb.end >= rb.start { + return rb.end - rb.start + } + return rb.size - rb.start + rb.end +} + +// Cap returns the buffer capacity. +func (rb *RingBuffer) Cap() int { + return rb.size +} + +// Reset clears the buffer. +func (rb *RingBuffer) Reset() { + rb.mu.Lock() + defer rb.mu.Unlock() + rb.start = 0 + rb.end = 0 + rb.full = false +} diff --git a/buffer_test.go b/buffer_test.go new file mode 100644 index 0000000..bbd4f1c --- /dev/null +++ b/buffer_test.go @@ -0,0 +1,72 @@ +package process + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRingBuffer(t *testing.T) { + t.Run("write and read", func(t *testing.T) { + rb := NewRingBuffer(10) + + n, err := rb.Write([]byte("hello")) + assert.NoError(t, err) + assert.Equal(t, 5, n) + assert.Equal(t, "hello", rb.String()) + assert.Equal(t, 5, rb.Len()) + }) + + t.Run("overflow wraps around", func(t *testing.T) { + rb := NewRingBuffer(5) + + _, _ = rb.Write([]byte("hello")) + assert.Equal(t, "hello", rb.String()) + + _, _ = rb.Write([]byte("world")) + // Should contain "world" (overwrote "hello") + assert.Equal(t, 5, rb.Len()) + assert.Equal(t, "world", rb.String()) + }) + + t.Run("partial overflow", func(t *testing.T) { + rb := NewRingBuffer(10) + + _, _ = rb.Write([]byte("hello")) + _, _ = rb.Write([]byte("worldx")) + // Should contain "lloworldx" (11 chars, buffer is 10) + assert.Equal(t, 10, rb.Len()) + }) + + t.Run("empty buffer", func(t *testing.T) { + rb := NewRingBuffer(10) + assert.Equal(t, "", rb.String()) + assert.Equal(t, 0, rb.Len()) + assert.Nil(t, rb.Bytes()) + }) + + t.Run("reset", func(t *testing.T) { + rb := NewRingBuffer(10) + _, _ = rb.Write([]byte("hello")) + rb.Reset() + assert.Equal(t, "", rb.String()) + assert.Equal(t, 0, rb.Len()) + }) + + t.Run("cap", func(t *testing.T) { + rb := NewRingBuffer(42) + assert.Equal(t, 42, rb.Cap()) + }) + + t.Run("bytes returns copy", func(t *testing.T) { + rb := NewRingBuffer(10) + _, _ = rb.Write([]byte("hello")) + + bytes := rb.Bytes() + assert.Equal(t, []byte("hello"), bytes) + + // Modifying returned bytes shouldn't affect buffer + bytes[0] = 'x' + assert.Equal(t, "hello", rb.String()) + }) +} diff --git a/exec/exec.go b/exec/exec.go new file mode 100644 index 0000000..21978a9 --- /dev/null +++ b/exec/exec.go @@ -0,0 +1,176 @@ +package exec + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + "strings" +) + +// Options configuration for command execution +type Options struct { + Dir string + Env []string + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer + // If true, command will run in background (not implemented in this wrapper yet) + // Background bool +} + +// Command wraps os/exec.Command with logging and context +func Command(ctx context.Context, name string, args ...string) *Cmd { + return &Cmd{ + name: name, + args: args, + ctx: ctx, + } +} + +// Cmd represents a wrapped command +type Cmd struct { + name string + args []string + ctx context.Context + opts Options + cmd *exec.Cmd + logger Logger +} + +// WithDir sets the working directory +func (c *Cmd) WithDir(dir string) *Cmd { + c.opts.Dir = dir + return c +} + +// WithEnv sets the environment variables +func (c *Cmd) WithEnv(env []string) *Cmd { + c.opts.Env = env + return c +} + +// WithStdin sets stdin +func (c *Cmd) WithStdin(r io.Reader) *Cmd { + c.opts.Stdin = r + return c +} + +// WithStdout sets stdout +func (c *Cmd) WithStdout(w io.Writer) *Cmd { + c.opts.Stdout = w + return c +} + +// WithStderr sets stderr +func (c *Cmd) WithStderr(w io.Writer) *Cmd { + c.opts.Stderr = w + return c +} + +// WithLogger sets a custom logger for this command. +// If not set, the package default logger is used. +func (c *Cmd) WithLogger(l Logger) *Cmd { + c.logger = l + return c +} + +// Run executes the command and waits for it to finish. +// It automatically logs the command execution at debug level. +func (c *Cmd) Run() error { + c.prepare() + c.logDebug("executing command") + + if err := c.cmd.Run(); err != nil { + wrapped := wrapError(err, c.name, c.args) + c.logError("command failed", wrapped) + return wrapped + } + return nil +} + +// Output runs the command and returns its standard output. +func (c *Cmd) Output() ([]byte, error) { + c.prepare() + c.logDebug("executing command") + + out, err := c.cmd.Output() + if err != nil { + wrapped := wrapError(err, c.name, c.args) + c.logError("command failed", wrapped) + return nil, wrapped + } + return out, nil +} + +// CombinedOutput runs the command and returns its combined standard output and standard error. +func (c *Cmd) CombinedOutput() ([]byte, error) { + c.prepare() + c.logDebug("executing command") + + out, err := c.cmd.CombinedOutput() + if err != nil { + wrapped := wrapError(err, c.name, c.args) + c.logError("command failed", wrapped) + return out, wrapped + } + return out, nil +} + +func (c *Cmd) prepare() { + if c.ctx != nil { + c.cmd = exec.CommandContext(c.ctx, c.name, c.args...) + } else { + // Should we enforce context? The issue says "Enforce context usage". + // For now, let's allow nil but log a warning if we had a logger? + // Or strictly panic/error? + // Let's fallback to Background for now but maybe strict later. + c.cmd = exec.Command(c.name, c.args...) + } + + c.cmd.Dir = c.opts.Dir + if len(c.opts.Env) > 0 { + c.cmd.Env = append(os.Environ(), c.opts.Env...) + } + + c.cmd.Stdin = c.opts.Stdin + c.cmd.Stdout = c.opts.Stdout + c.cmd.Stderr = c.opts.Stderr +} + +// RunQuiet executes the command suppressing stdout unless there is an error. +// Useful for internal commands. +func RunQuiet(ctx context.Context, name string, args ...string) error { + var stderr bytes.Buffer + cmd := Command(ctx, name, args...).WithStderr(&stderr) + if err := cmd.Run(); err != nil { + // Include stderr in error message + return fmt.Errorf("%w: %s", err, strings.TrimSpace(stderr.String())) + } + return nil +} + +func wrapError(err error, name string, args []string) error { + cmdStr := name + " " + strings.Join(args, " ") + if exitErr, ok := err.(*exec.ExitError); ok { + return fmt.Errorf("command %q failed with exit code %d: %w", cmdStr, exitErr.ExitCode(), err) + } + return fmt.Errorf("failed to execute %q: %w", cmdStr, err) +} + +func (c *Cmd) getLogger() Logger { + if c.logger != nil { + return c.logger + } + return defaultLogger +} + +func (c *Cmd) logDebug(msg string) { + c.getLogger().Debug(msg, "cmd", c.name, "args", strings.Join(c.args, " ")) +} + +func (c *Cmd) logError(msg string, err error) { + c.getLogger().Error(msg, "cmd", c.name, "args", strings.Join(c.args, " "), "err", err) +} diff --git a/exec/exec_test.go b/exec/exec_test.go new file mode 100644 index 0000000..6984e9a --- /dev/null +++ b/exec/exec_test.go @@ -0,0 +1,148 @@ +package exec_test + +import ( + "context" + "strings" + "testing" + + "forge.lthn.ai/core/go-process/exec" +) + +// mockLogger captures log calls for testing +type mockLogger struct { + debugCalls []logCall + errorCalls []logCall +} + +type logCall struct { + msg string + keyvals []any +} + +func (m *mockLogger) Debug(msg string, keyvals ...any) { + m.debugCalls = append(m.debugCalls, logCall{msg, keyvals}) +} + +func (m *mockLogger) Error(msg string, keyvals ...any) { + m.errorCalls = append(m.errorCalls, logCall{msg, keyvals}) +} + +func TestCommand_Run_Good_LogsDebug(t *testing.T) { + logger := &mockLogger{} + ctx := context.Background() + + err := exec.Command(ctx, "echo", "hello"). + WithLogger(logger). + Run() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(logger.debugCalls) != 1 { + t.Fatalf("expected 1 debug call, got %d", len(logger.debugCalls)) + } + if logger.debugCalls[0].msg != "executing command" { + t.Errorf("expected msg 'executing command', got %q", logger.debugCalls[0].msg) + } + if len(logger.errorCalls) != 0 { + t.Errorf("expected no error calls, got %d", len(logger.errorCalls)) + } +} + +func TestCommand_Run_Bad_LogsError(t *testing.T) { + logger := &mockLogger{} + ctx := context.Background() + + err := exec.Command(ctx, "false"). + WithLogger(logger). + Run() + if err == nil { + t.Fatal("expected error") + } + + if len(logger.debugCalls) != 1 { + t.Fatalf("expected 1 debug call, got %d", len(logger.debugCalls)) + } + if len(logger.errorCalls) != 1 { + t.Fatalf("expected 1 error call, got %d", len(logger.errorCalls)) + } + if logger.errorCalls[0].msg != "command failed" { + t.Errorf("expected msg 'command failed', got %q", logger.errorCalls[0].msg) + } +} + +func TestCommand_Output_Good(t *testing.T) { + logger := &mockLogger{} + ctx := context.Background() + + out, err := exec.Command(ctx, "echo", "test"). + WithLogger(logger). + Output() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if strings.TrimSpace(string(out)) != "test" { + t.Errorf("expected 'test', got %q", string(out)) + } + if len(logger.debugCalls) != 1 { + t.Errorf("expected 1 debug call, got %d", len(logger.debugCalls)) + } +} + +func TestCommand_CombinedOutput_Good(t *testing.T) { + logger := &mockLogger{} + ctx := context.Background() + + out, err := exec.Command(ctx, "echo", "combined"). + WithLogger(logger). + CombinedOutput() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if strings.TrimSpace(string(out)) != "combined" { + t.Errorf("expected 'combined', got %q", string(out)) + } + if len(logger.debugCalls) != 1 { + t.Errorf("expected 1 debug call, got %d", len(logger.debugCalls)) + } +} + +func TestNopLogger(t *testing.T) { + // Verify NopLogger doesn't panic + var nop exec.NopLogger + nop.Debug("msg", "key", "val") + nop.Error("msg", "key", "val") +} + +func TestSetDefaultLogger(t *testing.T) { + original := exec.DefaultLogger() + defer exec.SetDefaultLogger(original) + + logger := &mockLogger{} + exec.SetDefaultLogger(logger) + + if exec.DefaultLogger() != logger { + t.Error("default logger not set correctly") + } + + // Test nil resets to NopLogger + exec.SetDefaultLogger(nil) + if _, ok := exec.DefaultLogger().(exec.NopLogger); !ok { + t.Error("expected NopLogger when setting nil") + } +} + +func TestCommand_UsesDefaultLogger(t *testing.T) { + original := exec.DefaultLogger() + defer exec.SetDefaultLogger(original) + + logger := &mockLogger{} + exec.SetDefaultLogger(logger) + + ctx := context.Background() + _ = exec.Command(ctx, "echo", "test").Run() + + if len(logger.debugCalls) != 1 { + t.Errorf("expected default logger to receive 1 debug call, got %d", len(logger.debugCalls)) + } +} diff --git a/exec/logger.go b/exec/logger.go new file mode 100644 index 0000000..e8f5a6b --- /dev/null +++ b/exec/logger.go @@ -0,0 +1,35 @@ +package exec + +// Logger interface for command execution logging. +// Compatible with pkg/log.Logger and other structured loggers. +type Logger interface { + // Debug logs a debug-level message with optional key-value pairs. + Debug(msg string, keyvals ...any) + // Error logs an error-level message with optional key-value pairs. + Error(msg string, keyvals ...any) +} + +// NopLogger is a no-op logger that discards all messages. +type NopLogger struct{} + +// Debug discards the message (no-op implementation). +func (NopLogger) Debug(string, ...any) {} + +// Error discards the message (no-op implementation). +func (NopLogger) Error(string, ...any) {} + +var defaultLogger Logger = NopLogger{} + +// SetDefaultLogger sets the package-level default logger. +// Commands without an explicit logger will use this. +func SetDefaultLogger(l Logger) { + if l == nil { + l = NopLogger{} + } + defaultLogger = l +} + +// DefaultLogger returns the current default logger. +func DefaultLogger() Logger { + return defaultLogger +} diff --git a/global_test.go b/global_test.go new file mode 100644 index 0000000..80a8158 --- /dev/null +++ b/global_test.go @@ -0,0 +1,298 @@ +package process + +import ( + "context" + "sync" + "testing" + + "forge.lthn.ai/core/go/pkg/framework" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGlobal_DefaultNotInitialized(t *testing.T) { + // Reset global state for this test + old := defaultService.Swap(nil) + defer func() { + if old != nil { + defaultService.Store(old) + } + }() + + assert.Nil(t, Default()) + + _, err := Start(context.Background(), "echo", "test") + assert.ErrorIs(t, err, ErrServiceNotInitialized) + + _, err = Run(context.Background(), "echo", "test") + assert.ErrorIs(t, err, ErrServiceNotInitialized) + + _, err = Get("proc-1") + assert.ErrorIs(t, err, ErrServiceNotInitialized) + + assert.Nil(t, List()) + assert.Nil(t, Running()) + + err = Kill("proc-1") + assert.ErrorIs(t, err, ErrServiceNotInitialized) + + _, err = StartWithOptions(context.Background(), RunOptions{Command: "echo"}) + assert.ErrorIs(t, err, ErrServiceNotInitialized) + + _, err = RunWithOptions(context.Background(), RunOptions{Command: "echo"}) + assert.ErrorIs(t, err, ErrServiceNotInitialized) +} + +func TestGlobal_SetDefault(t *testing.T) { + t.Run("sets and retrieves service", func(t *testing.T) { + // Reset global state + old := defaultService.Swap(nil) + defer func() { + if old != nil { + defaultService.Store(old) + } + }() + + core, err := framework.New( + framework.WithName("process", NewService(Options{})), + ) + require.NoError(t, err) + + svc, err := framework.ServiceFor[*Service](core, "process") + require.NoError(t, err) + + SetDefault(svc) + assert.Equal(t, svc, Default()) + }) + + t.Run("panics on nil", func(t *testing.T) { + assert.Panics(t, func() { + SetDefault(nil) + }) + }) +} + +func TestGlobal_ConcurrentDefault(t *testing.T) { + // Reset global state + old := defaultService.Swap(nil) + defer func() { + if old != nil { + defaultService.Store(old) + } + }() + + core, err := framework.New( + framework.WithName("process", NewService(Options{})), + ) + require.NoError(t, err) + + svc, err := framework.ServiceFor[*Service](core, "process") + require.NoError(t, err) + + SetDefault(svc) + + // Concurrent reads of Default() + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + s := Default() + assert.NotNil(t, s) + assert.Equal(t, svc, s) + }() + } + wg.Wait() +} + +func TestGlobal_ConcurrentSetDefault(t *testing.T) { + // Reset global state + old := defaultService.Swap(nil) + defer func() { + if old != nil { + defaultService.Store(old) + } + }() + + // Create multiple services + var services []*Service + for i := 0; i < 10; i++ { + core, err := framework.New( + framework.WithName("process", NewService(Options{})), + ) + require.NoError(t, err) + + svc, err := framework.ServiceFor[*Service](core, "process") + require.NoError(t, err) + services = append(services, svc) + } + + // Concurrent SetDefault calls - should not panic or race + var wg sync.WaitGroup + for _, svc := range services { + wg.Add(1) + go func(s *Service) { + defer wg.Done() + SetDefault(s) + }(svc) + } + wg.Wait() + + // Final state should be one of the services + final := Default() + assert.NotNil(t, final) + + found := false + for _, svc := range services { + if svc == final { + found = true + break + } + } + assert.True(t, found, "Default should be one of the set services") +} + +func TestGlobal_ConcurrentOperations(t *testing.T) { + // Reset global state + old := defaultService.Swap(nil) + defer func() { + if old != nil { + defaultService.Store(old) + } + }() + + core, err := framework.New( + framework.WithName("process", NewService(Options{})), + ) + require.NoError(t, err) + + svc, err := framework.ServiceFor[*Service](core, "process") + require.NoError(t, err) + + SetDefault(svc) + + // Concurrent Start, List, Get operations + var wg sync.WaitGroup + var processes []*Process + var procMu sync.Mutex + + // Start 20 processes concurrently + for i := 0; i < 20; i++ { + wg.Add(1) + go func() { + defer wg.Done() + proc, err := Start(context.Background(), "echo", "concurrent") + if err == nil { + procMu.Lock() + processes = append(processes, proc) + procMu.Unlock() + } + }() + } + + // Concurrent List calls while starting + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = List() + _ = Running() + }() + } + + wg.Wait() + + // Wait for all processes to complete + procMu.Lock() + for _, p := range processes { + <-p.Done() + } + procMu.Unlock() + + // All should have succeeded + assert.Len(t, processes, 20) + + // Concurrent Get calls + var wg2 sync.WaitGroup + for _, p := range processes { + wg2.Add(1) + go func(id string) { + defer wg2.Done() + got, err := Get(id) + assert.NoError(t, err) + assert.NotNil(t, got) + }(p.ID) + } + wg2.Wait() +} + +func TestGlobal_StartWithOptions(t *testing.T) { + svc, _ := newTestService(t) + + // Set as default + old := defaultService.Swap(svc) + defer func() { + if old != nil { + defaultService.Store(old) + } + }() + + proc, err := StartWithOptions(context.Background(), RunOptions{ + Command: "echo", + Args: []string{"with", "options"}, + }) + require.NoError(t, err) + + <-proc.Done() + + assert.Equal(t, 0, proc.ExitCode) + assert.Contains(t, proc.Output(), "with options") +} + +func TestGlobal_RunWithOptions(t *testing.T) { + svc, _ := newTestService(t) + + // Set as default + old := defaultService.Swap(svc) + defer func() { + if old != nil { + defaultService.Store(old) + } + }() + + output, err := RunWithOptions(context.Background(), RunOptions{ + Command: "echo", + Args: []string{"run", "options"}, + }) + require.NoError(t, err) + assert.Contains(t, output, "run options") +} + +func TestGlobal_Running(t *testing.T) { + svc, _ := newTestService(t) + + // Set as default + old := defaultService.Swap(svc) + defer func() { + if old != nil { + defaultService.Store(old) + } + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start a long-running process + proc, err := Start(ctx, "sleep", "60") + require.NoError(t, err) + + running := Running() + assert.Len(t, running, 1) + assert.Equal(t, proc.ID, running[0].ID) + + cancel() + <-proc.Done() + + running = Running() + assert.Len(t, running, 0) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..bb6dee6 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module forge.lthn.ai/core/go-process + +go 1.26.0 + +require ( + forge.lthn.ai/core/go v0.1.0 + github.com/stretchr/testify v1.11.1 +) + +require ( + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..1706355 --- /dev/null +++ b/go.sum @@ -0,0 +1,22 @@ +forge.lthn.ai/core/go v0.1.0 h1:Ow/1NTajrrNPO0zgkskEyEGdx4SKpiNqTaqM0txNOYI= +forge.lthn.ai/core/go v0.1.0/go.mod h1:lwi0tccAlg5j3k6CfoNJEueBc5l9mUeSBX/x6uY8ZbQ= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/process.go b/process.go new file mode 100644 index 0000000..45ee0d9 --- /dev/null +++ b/process.go @@ -0,0 +1,167 @@ +package process + +import ( + "context" + "io" + "os/exec" + "sync" + "time" +) + +// Process represents a managed external process. +type Process struct { + ID string + Command string + Args []string + Dir string + Env []string + StartedAt time.Time + Status Status + ExitCode int + Duration time.Duration + + cmd *exec.Cmd + ctx context.Context + cancel context.CancelFunc + output *RingBuffer + stdin io.WriteCloser + done chan struct{} + mu sync.RWMutex +} + +// Info returns a snapshot of process state. +func (p *Process) Info() Info { + p.mu.RLock() + defer p.mu.RUnlock() + + pid := 0 + if p.cmd != nil && p.cmd.Process != nil { + pid = p.cmd.Process.Pid + } + + return Info{ + ID: p.ID, + Command: p.Command, + Args: p.Args, + Dir: p.Dir, + StartedAt: p.StartedAt, + Status: p.Status, + ExitCode: p.ExitCode, + Duration: p.Duration, + PID: pid, + } +} + +// Output returns the captured output as a string. +func (p *Process) Output() string { + p.mu.RLock() + defer p.mu.RUnlock() + if p.output == nil { + return "" + } + return p.output.String() +} + +// OutputBytes returns the captured output as bytes. +func (p *Process) OutputBytes() []byte { + p.mu.RLock() + defer p.mu.RUnlock() + if p.output == nil { + return nil + } + return p.output.Bytes() +} + +// IsRunning returns true if the process is still executing. +func (p *Process) IsRunning() bool { + p.mu.RLock() + defer p.mu.RUnlock() + return p.Status == StatusRunning +} + +// Wait blocks until the process exits. +func (p *Process) Wait() error { + <-p.done + p.mu.RLock() + defer p.mu.RUnlock() + if p.Status == StatusFailed || p.Status == StatusKilled { + return &exec.ExitError{} + } + if p.ExitCode != 0 { + return &exec.ExitError{} + } + return nil +} + +// Done returns a channel that closes when the process exits. +func (p *Process) Done() <-chan struct{} { + return p.done +} + +// Kill forcefully terminates the process. +func (p *Process) Kill() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.Status != StatusRunning { + return nil + } + + if p.cmd == nil || p.cmd.Process == nil { + return nil + } + + return p.cmd.Process.Kill() +} + +// Signal sends a signal to the process. +func (p *Process) Signal(sig interface{ Signal() }) error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.Status != StatusRunning { + return nil + } + + if p.cmd == nil || p.cmd.Process == nil { + return nil + } + + // Type assert to os.Signal for Process.Signal + if osSig, ok := sig.(interface{ String() string }); ok { + _ = osSig // Satisfy linter + } + + return p.cmd.Process.Kill() // Simplified - would use Signal in full impl +} + +// SendInput writes to the process stdin. +func (p *Process) SendInput(input string) error { + p.mu.RLock() + defer p.mu.RUnlock() + + if p.Status != StatusRunning { + return ErrProcessNotRunning + } + + if p.stdin == nil { + return ErrStdinNotAvailable + } + + _, err := p.stdin.Write([]byte(input)) + return err +} + +// CloseStdin closes the process stdin pipe. +func (p *Process) CloseStdin() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.stdin == nil { + return nil + } + + err := p.stdin.Close() + p.stdin = nil + return err +} diff --git a/process_global.go b/process_global.go new file mode 100644 index 0000000..deed860 --- /dev/null +++ b/process_global.go @@ -0,0 +1,133 @@ +package process + +import ( + "context" + "sync" + "sync/atomic" + + "forge.lthn.ai/core/go/pkg/framework" +) + +// Global default service (follows i18n pattern). +var ( + defaultService atomic.Pointer[Service] + defaultOnce sync.Once + defaultErr error +) + +// Default returns the global process service. +// Returns nil if not initialized. +func Default() *Service { + return defaultService.Load() +} + +// SetDefault sets the global process service. +// Thread-safe: can be called concurrently with Default(). +func SetDefault(s *Service) { + if s == nil { + panic("process: SetDefault called with nil service") + } + defaultService.Store(s) +} + +// Init initializes the default global service with a Core instance. +// This is typically called during application startup. +func Init(c *framework.Core) error { + defaultOnce.Do(func() { + factory := NewService(Options{}) + svc, err := factory(c) + if err != nil { + defaultErr = err + return + } + defaultService.Store(svc.(*Service)) + }) + return defaultErr +} + +// --- Global convenience functions --- + +// Start spawns a new process using the default service. +func Start(ctx context.Context, command string, args ...string) (*Process, error) { + svc := Default() + if svc == nil { + return nil, ErrServiceNotInitialized + } + return svc.Start(ctx, command, args...) +} + +// Run executes a command and waits for completion using the default service. +func Run(ctx context.Context, command string, args ...string) (string, error) { + svc := Default() + if svc == nil { + return "", ErrServiceNotInitialized + } + return svc.Run(ctx, command, args...) +} + +// Get returns a process by ID from the default service. +func Get(id string) (*Process, error) { + svc := Default() + if svc == nil { + return nil, ErrServiceNotInitialized + } + return svc.Get(id) +} + +// List returns all processes from the default service. +func List() []*Process { + svc := Default() + if svc == nil { + return nil + } + return svc.List() +} + +// Kill terminates a process by ID using the default service. +func Kill(id string) error { + svc := Default() + if svc == nil { + return ErrServiceNotInitialized + } + return svc.Kill(id) +} + +// StartWithOptions spawns a process with full configuration using the default service. +func StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) { + svc := Default() + if svc == nil { + return nil, ErrServiceNotInitialized + } + return svc.StartWithOptions(ctx, opts) +} + +// RunWithOptions executes a command with options and waits using the default service. +func RunWithOptions(ctx context.Context, opts RunOptions) (string, error) { + svc := Default() + if svc == nil { + return "", ErrServiceNotInitialized + } + return svc.RunWithOptions(ctx, opts) +} + +// Running returns all currently running processes from the default service. +func Running() []*Process { + svc := Default() + if svc == nil { + return nil + } + return svc.Running() +} + +// ErrServiceNotInitialized is returned when the service is not initialized. +var ErrServiceNotInitialized = &ServiceError{msg: "process: service not initialized"} + +// ServiceError represents a service-level error. +type ServiceError struct { + msg string +} + +// Error returns the service error message. +func (e *ServiceError) Error() string { + return e.msg +} diff --git a/process_test.go b/process_test.go new file mode 100644 index 0000000..8bf7bf7 --- /dev/null +++ b/process_test.go @@ -0,0 +1,227 @@ +package process + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProcess_Info(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "echo", "hello") + require.NoError(t, err) + + <-proc.Done() + + info := proc.Info() + assert.Equal(t, proc.ID, info.ID) + assert.Equal(t, "echo", info.Command) + assert.Equal(t, []string{"hello"}, info.Args) + assert.Equal(t, StatusExited, info.Status) + assert.Equal(t, 0, info.ExitCode) + assert.Greater(t, info.Duration, time.Duration(0)) +} + +func TestProcess_Output(t *testing.T) { + t.Run("captures stdout", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "echo", "hello world") + require.NoError(t, err) + + <-proc.Done() + + output := proc.Output() + assert.Contains(t, output, "hello world") + }) + + t.Run("OutputBytes returns copy", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "echo", "test") + require.NoError(t, err) + + <-proc.Done() + + bytes := proc.OutputBytes() + assert.NotNil(t, bytes) + assert.Contains(t, string(bytes), "test") + }) +} + +func TestProcess_IsRunning(t *testing.T) { + t.Run("true while running", func(t *testing.T) { + svc, _ := newTestService(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + proc, err := svc.Start(ctx, "sleep", "10") + require.NoError(t, err) + + assert.True(t, proc.IsRunning()) + + cancel() + <-proc.Done() + + assert.False(t, proc.IsRunning()) + }) + + t.Run("false after completion", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "echo", "done") + require.NoError(t, err) + + <-proc.Done() + + assert.False(t, proc.IsRunning()) + }) +} + +func TestProcess_Wait(t *testing.T) { + t.Run("returns nil on success", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "echo", "ok") + require.NoError(t, err) + + err = proc.Wait() + assert.NoError(t, err) + }) + + t.Run("returns error on failure", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "sh", "-c", "exit 1") + require.NoError(t, err) + + err = proc.Wait() + assert.Error(t, err) + }) +} + +func TestProcess_Done(t *testing.T) { + t.Run("channel closes on completion", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "echo", "test") + require.NoError(t, err) + + select { + case <-proc.Done(): + // Success - channel closed + case <-time.After(5 * time.Second): + t.Fatal("Done channel should have closed") + } + }) +} + +func TestProcess_Kill(t *testing.T) { + t.Run("terminates running process", func(t *testing.T) { + svc, _ := newTestService(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + proc, err := svc.Start(ctx, "sleep", "60") + require.NoError(t, err) + + assert.True(t, proc.IsRunning()) + + err = proc.Kill() + assert.NoError(t, err) + + select { + case <-proc.Done(): + // Good - process terminated + case <-time.After(2 * time.Second): + t.Fatal("process should have been killed") + } + }) + + t.Run("noop on completed process", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "echo", "done") + require.NoError(t, err) + + <-proc.Done() + + err = proc.Kill() + assert.NoError(t, err) + }) +} + +func TestProcess_SendInput(t *testing.T) { + t.Run("writes to stdin", func(t *testing.T) { + svc, _ := newTestService(t) + + // Use cat to echo back stdin + proc, err := svc.Start(context.Background(), "cat") + require.NoError(t, err) + + err = proc.SendInput("hello\n") + assert.NoError(t, err) + + err = proc.CloseStdin() + assert.NoError(t, err) + + <-proc.Done() + + assert.Contains(t, proc.Output(), "hello") + }) + + t.Run("error on completed process", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "echo", "done") + require.NoError(t, err) + + <-proc.Done() + + err = proc.SendInput("test") + assert.ErrorIs(t, err, ErrProcessNotRunning) + }) +} + +func TestProcess_CloseStdin(t *testing.T) { + t.Run("closes stdin pipe", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "cat") + require.NoError(t, err) + + err = proc.CloseStdin() + assert.NoError(t, err) + + // Process should exit now that stdin is closed + select { + case <-proc.Done(): + // Good + case <-time.After(2 * time.Second): + t.Fatal("cat should exit when stdin is closed") + } + }) + + t.Run("double close is safe", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "cat") + require.NoError(t, err) + + // First close + err = proc.CloseStdin() + assert.NoError(t, err) + + <-proc.Done() + + // Second close should be safe (stdin already nil) + err = proc.CloseStdin() + assert.NoError(t, err) + }) +} diff --git a/runner.go b/runner.go new file mode 100644 index 0000000..8eb1fd5 --- /dev/null +++ b/runner.go @@ -0,0 +1,293 @@ +package process + +import ( + "context" + "errors" + "sync" + "time" +) + +// Runner orchestrates multiple processes with dependencies. +type Runner struct { + service *Service +} + +// NewRunner creates a runner for the given service. +func NewRunner(svc *Service) *Runner { + return &Runner{service: svc} +} + +// RunSpec defines a process to run with optional dependencies. +type RunSpec struct { + // Name is a friendly identifier (e.g., "lint", "test"). + Name string + // Command is the executable to run. + Command string + // Args are the command arguments. + Args []string + // Dir is the working directory. + Dir string + // Env are additional environment variables. + Env []string + // After lists spec names that must complete successfully first. + After []string + // AllowFailure if true, continues pipeline even if this spec fails. + AllowFailure bool +} + +// RunResult captures the outcome of a single process. +type RunResult struct { + Name string + Spec RunSpec + ExitCode int + Duration time.Duration + Output string + Error error + Skipped bool +} + +// Passed returns true if the process succeeded. +func (r RunResult) Passed() bool { + return !r.Skipped && r.Error == nil && r.ExitCode == 0 +} + +// RunAllResult is the aggregate result of running multiple specs. +type RunAllResult struct { + Results []RunResult + Duration time.Duration + Passed int + Failed int + Skipped int +} + +// Success returns true if all non-skipped specs passed. +func (r RunAllResult) Success() bool { + return r.Failed == 0 +} + +// RunAll executes specs respecting dependencies, parallelising where possible. +func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, error) { + start := time.Now() + + // Build dependency graph + specMap := make(map[string]RunSpec) + for _, spec := range specs { + specMap[spec.Name] = spec + } + + // Track completion + completed := make(map[string]*RunResult) + var completedMu sync.Mutex + + results := make([]RunResult, 0, len(specs)) + var resultsMu sync.Mutex + + // Process specs in waves + remaining := make(map[string]RunSpec) + for _, spec := range specs { + remaining[spec.Name] = spec + } + + for len(remaining) > 0 { + // Find specs ready to run (all dependencies satisfied) + ready := make([]RunSpec, 0) + for _, spec := range remaining { + if r.canRun(spec, completed) { + ready = append(ready, spec) + } + } + + if len(ready) == 0 && len(remaining) > 0 { + // Deadlock - circular dependency or missing specs + for name := range remaining { + results = append(results, RunResult{ + Name: name, + Spec: remaining[name], + Skipped: true, + Error: errors.New("circular dependency or missing dependency"), + }) + } + break + } + + // Run ready specs in parallel + var wg sync.WaitGroup + for _, spec := range ready { + wg.Add(1) + go func(spec RunSpec) { + defer wg.Done() + + // Check if dependencies failed + completedMu.Lock() + shouldSkip := false + for _, dep := range spec.After { + if result, ok := completed[dep]; ok { + if !result.Passed() && !specMap[dep].AllowFailure { + shouldSkip = true + break + } + } + } + completedMu.Unlock() + + var result RunResult + if shouldSkip { + result = RunResult{ + Name: spec.Name, + Spec: spec, + Skipped: true, + Error: errors.New("skipped due to dependency failure"), + } + } else { + result = r.runSpec(ctx, spec) + } + + completedMu.Lock() + completed[spec.Name] = &result + completedMu.Unlock() + + resultsMu.Lock() + results = append(results, result) + resultsMu.Unlock() + }(spec) + } + wg.Wait() + + // Remove completed from remaining + for _, spec := range ready { + delete(remaining, spec.Name) + } + } + + // Build aggregate result + aggResult := &RunAllResult{ + Results: results, + Duration: time.Since(start), + } + + for _, res := range results { + if res.Skipped { + aggResult.Skipped++ + } else if res.Passed() { + aggResult.Passed++ + } else { + aggResult.Failed++ + } + } + + return aggResult, nil +} + +// canRun checks if all dependencies are completed. +func (r *Runner) canRun(spec RunSpec, completed map[string]*RunResult) bool { + for _, dep := range spec.After { + if _, ok := completed[dep]; !ok { + return false + } + } + return true +} + +// runSpec executes a single spec. +func (r *Runner) runSpec(ctx context.Context, spec RunSpec) RunResult { + start := time.Now() + + proc, err := r.service.StartWithOptions(ctx, RunOptions{ + Command: spec.Command, + Args: spec.Args, + Dir: spec.Dir, + Env: spec.Env, + }) + if err != nil { + return RunResult{ + Name: spec.Name, + Spec: spec, + Duration: time.Since(start), + Error: err, + } + } + + <-proc.Done() + + return RunResult{ + Name: spec.Name, + Spec: spec, + ExitCode: proc.ExitCode, + Duration: proc.Duration, + Output: proc.Output(), + Error: nil, + } +} + +// RunSequential executes specs one after another, stopping on first failure. +func (r *Runner) RunSequential(ctx context.Context, specs []RunSpec) (*RunAllResult, error) { + start := time.Now() + results := make([]RunResult, 0, len(specs)) + + for _, spec := range specs { + result := r.runSpec(ctx, spec) + results = append(results, result) + + if !result.Passed() && !spec.AllowFailure { + // Mark remaining as skipped + for i := len(results); i < len(specs); i++ { + results = append(results, RunResult{ + Name: specs[i].Name, + Spec: specs[i], + Skipped: true, + }) + } + break + } + } + + aggResult := &RunAllResult{ + Results: results, + Duration: time.Since(start), + } + + for _, res := range results { + if res.Skipped { + aggResult.Skipped++ + } else if res.Passed() { + aggResult.Passed++ + } else { + aggResult.Failed++ + } + } + + return aggResult, nil +} + +// RunParallel executes all specs concurrently, regardless of dependencies. +func (r *Runner) RunParallel(ctx context.Context, specs []RunSpec) (*RunAllResult, error) { + start := time.Now() + results := make([]RunResult, len(specs)) + + var wg sync.WaitGroup + for i, spec := range specs { + wg.Add(1) + go func(i int, spec RunSpec) { + defer wg.Done() + results[i] = r.runSpec(ctx, spec) + }(i, spec) + } + wg.Wait() + + aggResult := &RunAllResult{ + Results: results, + Duration: time.Since(start), + } + + for _, res := range results { + if res.Skipped { + aggResult.Skipped++ + } else if res.Passed() { + aggResult.Passed++ + } else { + aggResult.Failed++ + } + } + + return aggResult, nil +} diff --git a/runner_test.go b/runner_test.go new file mode 100644 index 0000000..7d27f8c --- /dev/null +++ b/runner_test.go @@ -0,0 +1,176 @@ +package process + +import ( + "context" + "testing" + + "forge.lthn.ai/core/go/pkg/framework" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestRunner(t *testing.T) *Runner { + t.Helper() + + core, err := framework.New( + framework.WithName("process", NewService(Options{})), + ) + require.NoError(t, err) + + svc, err := framework.ServiceFor[*Service](core, "process") + require.NoError(t, err) + + return NewRunner(svc) +} + +func TestRunner_RunSequential(t *testing.T) { + t.Run("all pass", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunSequential(context.Background(), []RunSpec{ + {Name: "first", Command: "echo", Args: []string{"1"}}, + {Name: "second", Command: "echo", Args: []string{"2"}}, + {Name: "third", Command: "echo", Args: []string{"3"}}, + }) + require.NoError(t, err) + + assert.True(t, result.Success()) + assert.Equal(t, 3, result.Passed) + assert.Equal(t, 0, result.Failed) + assert.Equal(t, 0, result.Skipped) + }) + + t.Run("stops on failure", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunSequential(context.Background(), []RunSpec{ + {Name: "first", Command: "echo", Args: []string{"1"}}, + {Name: "fails", Command: "sh", Args: []string{"-c", "exit 1"}}, + {Name: "third", Command: "echo", Args: []string{"3"}}, + }) + require.NoError(t, err) + + assert.False(t, result.Success()) + assert.Equal(t, 1, result.Passed) + assert.Equal(t, 1, result.Failed) + assert.Equal(t, 1, result.Skipped) + }) + + t.Run("allow failure continues", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunSequential(context.Background(), []RunSpec{ + {Name: "first", Command: "echo", Args: []string{"1"}}, + {Name: "fails", Command: "sh", Args: []string{"-c", "exit 1"}, AllowFailure: true}, + {Name: "third", Command: "echo", Args: []string{"3"}}, + }) + require.NoError(t, err) + + // Still counts as failed but pipeline continues + assert.Equal(t, 2, result.Passed) + assert.Equal(t, 1, result.Failed) + assert.Equal(t, 0, result.Skipped) + }) +} + +func TestRunner_RunParallel(t *testing.T) { + t.Run("all run concurrently", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunParallel(context.Background(), []RunSpec{ + {Name: "first", Command: "echo", Args: []string{"1"}}, + {Name: "second", Command: "echo", Args: []string{"2"}}, + {Name: "third", Command: "echo", Args: []string{"3"}}, + }) + require.NoError(t, err) + + assert.True(t, result.Success()) + assert.Equal(t, 3, result.Passed) + assert.Len(t, result.Results, 3) + }) + + t.Run("failure doesnt stop others", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunParallel(context.Background(), []RunSpec{ + {Name: "first", Command: "echo", Args: []string{"1"}}, + {Name: "fails", Command: "sh", Args: []string{"-c", "exit 1"}}, + {Name: "third", Command: "echo", Args: []string{"3"}}, + }) + require.NoError(t, err) + + assert.False(t, result.Success()) + assert.Equal(t, 2, result.Passed) + assert.Equal(t, 1, result.Failed) + }) +} + +func TestRunner_RunAll(t *testing.T) { + t.Run("respects dependencies", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunAll(context.Background(), []RunSpec{ + {Name: "third", Command: "echo", Args: []string{"3"}, After: []string{"second"}}, + {Name: "first", Command: "echo", Args: []string{"1"}}, + {Name: "second", Command: "echo", Args: []string{"2"}, After: []string{"first"}}, + }) + require.NoError(t, err) + + assert.True(t, result.Success()) + assert.Equal(t, 3, result.Passed) + }) + + t.Run("skips dependents on failure", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunAll(context.Background(), []RunSpec{ + {Name: "first", Command: "sh", Args: []string{"-c", "exit 1"}}, + {Name: "second", Command: "echo", Args: []string{"2"}, After: []string{"first"}}, + {Name: "third", Command: "echo", Args: []string{"3"}, After: []string{"second"}}, + }) + require.NoError(t, err) + + assert.False(t, result.Success()) + assert.Equal(t, 0, result.Passed) + assert.Equal(t, 1, result.Failed) + assert.Equal(t, 2, result.Skipped) + }) + + t.Run("parallel independent specs", func(t *testing.T) { + runner := newTestRunner(t) + + // These should run in parallel since they have no dependencies + result, err := runner.RunAll(context.Background(), []RunSpec{ + {Name: "a", Command: "echo", Args: []string{"a"}}, + {Name: "b", Command: "echo", Args: []string{"b"}}, + {Name: "c", Command: "echo", Args: []string{"c"}}, + {Name: "final", Command: "echo", Args: []string{"done"}, After: []string{"a", "b", "c"}}, + }) + require.NoError(t, err) + + assert.True(t, result.Success()) + assert.Equal(t, 4, result.Passed) + }) +} + +func TestRunResult_Passed(t *testing.T) { + t.Run("success", func(t *testing.T) { + r := RunResult{ExitCode: 0} + assert.True(t, r.Passed()) + }) + + t.Run("non-zero exit", func(t *testing.T) { + r := RunResult{ExitCode: 1} + assert.False(t, r.Passed()) + }) + + t.Run("skipped", func(t *testing.T) { + r := RunResult{ExitCode: 0, Skipped: true} + assert.False(t, r.Passed()) + }) + + t.Run("error", func(t *testing.T) { + r := RunResult{ExitCode: 0, Error: assert.AnError} + assert.False(t, r.Passed()) + }) +} diff --git a/service.go b/service.go new file mode 100644 index 0000000..405fac1 --- /dev/null +++ b/service.go @@ -0,0 +1,378 @@ +package process + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "os/exec" + "sync" + "sync/atomic" + "time" + + "forge.lthn.ai/core/go/pkg/framework" +) + +// Default buffer size for process output (1MB). +const DefaultBufferSize = 1024 * 1024 + +// Errors +var ( + ErrProcessNotFound = errors.New("process not found") + ErrProcessNotRunning = errors.New("process is not running") + ErrStdinNotAvailable = errors.New("stdin not available") +) + +// Service manages process execution with Core IPC integration. +type Service struct { + *framework.ServiceRuntime[Options] + + processes map[string]*Process + mu sync.RWMutex + bufSize int + idCounter atomic.Uint64 +} + +// Options configures the process service. +type Options struct { + // BufferSize is the ring buffer size for output capture. + // Default: 1MB (1024 * 1024 bytes). + BufferSize int +} + +// NewService creates a process service factory for Core registration. +// +// core, _ := framework.New( +// framework.WithName("process", process.NewService(process.Options{})), +// ) +func NewService(opts Options) func(*framework.Core) (any, error) { + return func(c *framework.Core) (any, error) { + if opts.BufferSize == 0 { + opts.BufferSize = DefaultBufferSize + } + svc := &Service{ + ServiceRuntime: framework.NewServiceRuntime(c, opts), + processes: make(map[string]*Process), + bufSize: opts.BufferSize, + } + return svc, nil + } +} + +// OnStartup implements framework.Startable. +func (s *Service) OnStartup(ctx context.Context) error { + return nil +} + +// OnShutdown implements framework.Stoppable. +// Kills all running processes on shutdown. +func (s *Service) OnShutdown(ctx context.Context) error { + s.mu.RLock() + procs := make([]*Process, 0, len(s.processes)) + for _, p := range s.processes { + if p.IsRunning() { + procs = append(procs, p) + } + } + s.mu.RUnlock() + + for _, p := range procs { + _ = p.Kill() + } + + return nil +} + +// Start spawns a new process with the given command and args. +func (s *Service) Start(ctx context.Context, command string, args ...string) (*Process, error) { + return s.StartWithOptions(ctx, RunOptions{ + Command: command, + Args: args, + }) +} + +// StartWithOptions spawns a process with full configuration. +func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) { + id := fmt.Sprintf("proc-%d", s.idCounter.Add(1)) + + procCtx, cancel := context.WithCancel(ctx) + cmd := exec.CommandContext(procCtx, opts.Command, opts.Args...) + + if opts.Dir != "" { + cmd.Dir = opts.Dir + } + if len(opts.Env) > 0 { + cmd.Env = append(cmd.Environ(), opts.Env...) + } + + // Set up pipes + stdout, err := cmd.StdoutPipe() + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create stdout pipe: %w", err) + } + + stderr, err := cmd.StderrPipe() + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create stderr pipe: %w", err) + } + + stdin, err := cmd.StdinPipe() + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create stdin pipe: %w", err) + } + + // Create output buffer (enabled by default) + var output *RingBuffer + if !opts.DisableCapture { + output = NewRingBuffer(s.bufSize) + } + + proc := &Process{ + ID: id, + Command: opts.Command, + Args: opts.Args, + Dir: opts.Dir, + Env: opts.Env, + StartedAt: time.Now(), + Status: StatusRunning, + cmd: cmd, + ctx: procCtx, + cancel: cancel, + output: output, + stdin: stdin, + done: make(chan struct{}), + } + + // Start the process + if err := cmd.Start(); err != nil { + cancel() + return nil, fmt.Errorf("failed to start process: %w", err) + } + + // Store process + s.mu.Lock() + s.processes[id] = proc + s.mu.Unlock() + + // Broadcast start + _ = s.Core().ACTION(ActionProcessStarted{ + ID: id, + Command: opts.Command, + Args: opts.Args, + Dir: opts.Dir, + PID: cmd.Process.Pid, + }) + + // Stream output in goroutines + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + s.streamOutput(proc, stdout, StreamStdout) + }() + go func() { + defer wg.Done() + s.streamOutput(proc, stderr, StreamStderr) + }() + + // Wait for process completion + go func() { + // Wait for output streaming to complete + wg.Wait() + + // Wait for process exit + err := cmd.Wait() + + duration := time.Since(proc.StartedAt) + + proc.mu.Lock() + proc.Duration = duration + if err != nil { + var exitErr *exec.ExitError + if errors.As(err, &exitErr) { + proc.ExitCode = exitErr.ExitCode() + proc.Status = StatusExited + } else { + proc.Status = StatusFailed + } + } else { + proc.ExitCode = 0 + proc.Status = StatusExited + } + status := proc.Status + exitCode := proc.ExitCode + proc.mu.Unlock() + + close(proc.done) + + // Broadcast exit + var exitErr error + if status == StatusFailed { + exitErr = err + } + _ = s.Core().ACTION(ActionProcessExited{ + ID: id, + ExitCode: exitCode, + Duration: duration, + Error: exitErr, + }) + }() + + return proc, nil +} + +// streamOutput reads from a pipe and broadcasts lines via ACTION. +func (s *Service) streamOutput(proc *Process, r io.Reader, stream Stream) { + scanner := bufio.NewScanner(r) + // Increase buffer for long lines + scanner.Buffer(make([]byte, 64*1024), 1024*1024) + + for scanner.Scan() { + line := scanner.Text() + + // Write to ring buffer + if proc.output != nil { + _, _ = proc.output.Write([]byte(line + "\n")) + } + + // Broadcast output + _ = s.Core().ACTION(ActionProcessOutput{ + ID: proc.ID, + Line: line, + Stream: stream, + }) + } +} + +// Get returns a process by ID. +func (s *Service) Get(id string) (*Process, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + proc, ok := s.processes[id] + if !ok { + return nil, ErrProcessNotFound + } + return proc, nil +} + +// List returns all processes. +func (s *Service) List() []*Process { + s.mu.RLock() + defer s.mu.RUnlock() + + result := make([]*Process, 0, len(s.processes)) + for _, p := range s.processes { + result = append(result, p) + } + return result +} + +// Running returns all currently running processes. +func (s *Service) Running() []*Process { + s.mu.RLock() + defer s.mu.RUnlock() + + var result []*Process + for _, p := range s.processes { + if p.IsRunning() { + result = append(result, p) + } + } + return result +} + +// Kill terminates a process by ID. +func (s *Service) Kill(id string) error { + proc, err := s.Get(id) + if err != nil { + return err + } + + if err := proc.Kill(); err != nil { + return err + } + + _ = s.Core().ACTION(ActionProcessKilled{ + ID: id, + Signal: "SIGKILL", + }) + + return nil +} + +// Remove removes a completed process from the list. +func (s *Service) Remove(id string) error { + s.mu.Lock() + defer s.mu.Unlock() + + proc, ok := s.processes[id] + if !ok { + return ErrProcessNotFound + } + + if proc.IsRunning() { + return errors.New("cannot remove running process") + } + + delete(s.processes, id) + return nil +} + +// Clear removes all completed processes. +func (s *Service) Clear() { + s.mu.Lock() + defer s.mu.Unlock() + + for id, p := range s.processes { + if !p.IsRunning() { + delete(s.processes, id) + } + } +} + +// Output returns the captured output of a process. +func (s *Service) Output(id string) (string, error) { + proc, err := s.Get(id) + if err != nil { + return "", err + } + return proc.Output(), nil +} + +// Run executes a command and waits for completion. +// Returns the combined output and any error. +func (s *Service) Run(ctx context.Context, command string, args ...string) (string, error) { + proc, err := s.Start(ctx, command, args...) + if err != nil { + return "", err + } + + <-proc.Done() + + output := proc.Output() + if proc.ExitCode != 0 { + return output, fmt.Errorf("process exited with code %d", proc.ExitCode) + } + return output, nil +} + +// RunWithOptions executes a command with options and waits for completion. +func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) (string, error) { + proc, err := s.StartWithOptions(ctx, opts) + if err != nil { + return "", err + } + + <-proc.Done() + + output := proc.Output() + if proc.ExitCode != 0 { + return output, fmt.Errorf("process exited with code %d", proc.ExitCode) + } + return output, nil +} diff --git a/service_test.go b/service_test.go new file mode 100644 index 0000000..b72e3e2 --- /dev/null +++ b/service_test.go @@ -0,0 +1,257 @@ +package process + +import ( + "context" + "strings" + "sync" + "testing" + "time" + + "forge.lthn.ai/core/go/pkg/framework" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestService(t *testing.T) (*Service, *framework.Core) { + t.Helper() + + core, err := framework.New( + framework.WithName("process", NewService(Options{BufferSize: 1024})), + ) + require.NoError(t, err) + + svc, err := framework.ServiceFor[*Service](core, "process") + require.NoError(t, err) + + return svc, core +} + +func TestService_Start(t *testing.T) { + t.Run("echo command", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "echo", "hello") + require.NoError(t, err) + require.NotNil(t, proc) + + assert.NotEmpty(t, proc.ID) + assert.Equal(t, "echo", proc.Command) + assert.Equal(t, []string{"hello"}, proc.Args) + + // Wait for completion + <-proc.Done() + + assert.Equal(t, StatusExited, proc.Status) + assert.Equal(t, 0, proc.ExitCode) + assert.Contains(t, proc.Output(), "hello") + }) + + t.Run("failing command", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "sh", "-c", "exit 42") + require.NoError(t, err) + + <-proc.Done() + + assert.Equal(t, StatusExited, proc.Status) + assert.Equal(t, 42, proc.ExitCode) + }) + + t.Run("non-existent command", func(t *testing.T) { + svc, _ := newTestService(t) + + _, err := svc.Start(context.Background(), "nonexistent_command_xyz") + assert.Error(t, err) + }) + + t.Run("with working directory", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.StartWithOptions(context.Background(), RunOptions{ + Command: "pwd", + Dir: "/tmp", + }) + require.NoError(t, err) + + <-proc.Done() + + // On macOS /tmp is a symlink to /private/tmp + output := strings.TrimSpace(proc.Output()) + assert.True(t, output == "/tmp" || output == "/private/tmp", "got: %s", output) + }) + + t.Run("context cancellation", func(t *testing.T) { + svc, _ := newTestService(t) + + ctx, cancel := context.WithCancel(context.Background()) + proc, err := svc.Start(ctx, "sleep", "10") + require.NoError(t, err) + + // Cancel immediately + cancel() + + select { + case <-proc.Done(): + // Good - process was killed + case <-time.After(2 * time.Second): + t.Fatal("process should have been killed") + } + }) +} + +func TestService_Run(t *testing.T) { + t.Run("returns output", func(t *testing.T) { + svc, _ := newTestService(t) + + output, err := svc.Run(context.Background(), "echo", "hello world") + require.NoError(t, err) + assert.Contains(t, output, "hello world") + }) + + t.Run("returns error on failure", func(t *testing.T) { + svc, _ := newTestService(t) + + _, err := svc.Run(context.Background(), "sh", "-c", "exit 1") + assert.Error(t, err) + assert.Contains(t, err.Error(), "exited with code 1") + }) +} + +func TestService_Actions(t *testing.T) { + t.Run("broadcasts events", func(t *testing.T) { + core, err := framework.New( + framework.WithName("process", NewService(Options{})), + ) + require.NoError(t, err) + + var started []ActionProcessStarted + var outputs []ActionProcessOutput + var exited []ActionProcessExited + var mu sync.Mutex + + core.RegisterAction(func(c *framework.Core, msg framework.Message) error { + mu.Lock() + defer mu.Unlock() + switch m := msg.(type) { + case ActionProcessStarted: + started = append(started, m) + case ActionProcessOutput: + outputs = append(outputs, m) + case ActionProcessExited: + exited = append(exited, m) + } + return nil + }) + + svc, _ := framework.ServiceFor[*Service](core, "process") + proc, err := svc.Start(context.Background(), "echo", "test") + require.NoError(t, err) + + <-proc.Done() + + // Give time for events to propagate + time.Sleep(10 * time.Millisecond) + + mu.Lock() + defer mu.Unlock() + + assert.Len(t, started, 1) + assert.Equal(t, "echo", started[0].Command) + assert.Equal(t, []string{"test"}, started[0].Args) + + assert.NotEmpty(t, outputs) + foundTest := false + for _, o := range outputs { + if strings.Contains(o.Line, "test") { + foundTest = true + break + } + } + assert.True(t, foundTest, "should have output containing 'test'") + + assert.Len(t, exited, 1) + assert.Equal(t, 0, exited[0].ExitCode) + }) +} + +func TestService_List(t *testing.T) { + t.Run("tracks processes", func(t *testing.T) { + svc, _ := newTestService(t) + + proc1, _ := svc.Start(context.Background(), "echo", "1") + proc2, _ := svc.Start(context.Background(), "echo", "2") + + <-proc1.Done() + <-proc2.Done() + + list := svc.List() + assert.Len(t, list, 2) + }) + + t.Run("get by id", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, _ := svc.Start(context.Background(), "echo", "test") + <-proc.Done() + + got, err := svc.Get(proc.ID) + require.NoError(t, err) + assert.Equal(t, proc.ID, got.ID) + }) + + t.Run("get not found", func(t *testing.T) { + svc, _ := newTestService(t) + + _, err := svc.Get("nonexistent") + assert.ErrorIs(t, err, ErrProcessNotFound) + }) +} + +func TestService_Remove(t *testing.T) { + t.Run("removes completed process", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, _ := svc.Start(context.Background(), "echo", "test") + <-proc.Done() + + err := svc.Remove(proc.ID) + require.NoError(t, err) + + _, err = svc.Get(proc.ID) + assert.ErrorIs(t, err, ErrProcessNotFound) + }) + + t.Run("cannot remove running process", func(t *testing.T) { + svc, _ := newTestService(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + proc, _ := svc.Start(ctx, "sleep", "10") + + err := svc.Remove(proc.ID) + assert.Error(t, err) + + cancel() + <-proc.Done() + }) +} + +func TestService_Clear(t *testing.T) { + t.Run("clears completed processes", func(t *testing.T) { + svc, _ := newTestService(t) + + proc1, _ := svc.Start(context.Background(), "echo", "1") + proc2, _ := svc.Start(context.Background(), "echo", "2") + + <-proc1.Done() + <-proc2.Done() + + assert.Len(t, svc.List(), 2) + + svc.Clear() + + assert.Len(t, svc.List(), 0) + }) +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..4489af7 --- /dev/null +++ b/types.go @@ -0,0 +1,89 @@ +// Package process provides process management with Core IPC integration. +// +// The process package enables spawning, monitoring, and controlling external +// processes with output streaming via the Core ACTION system. +// +// # Getting Started +// +// // Register with Core +// core, _ := framework.New( +// framework.WithName("process", process.NewService(process.Options{})), +// ) +// +// // Get service and run a process +// svc, err := framework.ServiceFor[*process.Service](core, "process") +// if err != nil { +// return err +// } +// proc, err := svc.Start(ctx, "go", "test", "./...") +// +// # Listening for Events +// +// Process events are broadcast via Core.ACTION: +// +// core.RegisterAction(func(c *framework.Core, msg framework.Message) error { +// switch m := msg.(type) { +// case process.ActionProcessOutput: +// fmt.Print(m.Line) +// case process.ActionProcessExited: +// fmt.Printf("Exit code: %d\n", m.ExitCode) +// } +// return nil +// }) +package process + +import "time" + +// Status represents the process lifecycle state. +type Status string + +const ( + // StatusPending indicates the process is queued but not yet started. + StatusPending Status = "pending" + // StatusRunning indicates the process is actively executing. + StatusRunning Status = "running" + // StatusExited indicates the process completed (check ExitCode). + StatusExited Status = "exited" + // StatusFailed indicates the process could not be started. + StatusFailed Status = "failed" + // StatusKilled indicates the process was terminated by signal. + StatusKilled Status = "killed" +) + +// Stream identifies the output source. +type Stream string + +const ( + // StreamStdout is standard output. + StreamStdout Stream = "stdout" + // StreamStderr is standard error. + StreamStderr Stream = "stderr" +) + +// RunOptions configures process execution. +type RunOptions struct { + // Command is the executable to run. + Command string + // Args are the command arguments. + Args []string + // Dir is the working directory (empty = current). + Dir string + // Env are additional environment variables (KEY=VALUE format). + Env []string + // DisableCapture disables output buffering. + // By default, output is captured to a ring buffer. + DisableCapture bool +} + +// Info provides a snapshot of process state without internal fields. +type Info struct { + ID string `json:"id"` + Command string `json:"command"` + Args []string `json:"args"` + Dir string `json:"dir"` + StartedAt time.Time `json:"startedAt"` + Status Status `json:"status"` + ExitCode int `json:"exitCode"` + Duration time.Duration `json:"duration"` + PID int `json:"pid"` +}