From 41270b2904edf765abba1202ca70e0b824eaad55 Mon Sep 17 00:00:00 2001 From: Snider Date: Fri, 30 Jan 2026 19:48:28 +0000 Subject: [PATCH] feat(process): add process management package with Core IPC Add pkg/process for spawning, monitoring, and orchestrating external processes with Core ACTION integration: - Service with framework.ServiceRuntime integration - ACTION messages: ProcessStarted, ProcessOutput, ProcessExited - RingBuffer for output capture - Runner for orchestration (RunAll, RunSequential, RunParallel) - Dependency graph support for QA pipelines - Global convenience functions following i18n patterns Also add docs/pkg/PACKAGE_STANDARDS.md defining how to create Core packages, using pkg/i18n as the reference implementation. Co-Authored-By: Claude Opus 4.5 --- docs/pkg/PACKAGE_STANDARDS.md | 566 ++++++++++++++++++++++++++++++++++ pkg/process/actions.go | 37 +++ pkg/process/buffer.go | 108 +++++++ pkg/process/buffer_test.go | 72 +++++ pkg/process/process.go | 182 +++++++++++ pkg/process/process_global.go | 105 +++++++ pkg/process/runner.go | 293 ++++++++++++++++++ pkg/process/runner_test.go | 176 +++++++++++ pkg/process/service.go | 378 +++++++++++++++++++++++ pkg/process/service_test.go | 271 ++++++++++++++++ pkg/process/types.go | 86 ++++++ 11 files changed, 2274 insertions(+) create mode 100644 docs/pkg/PACKAGE_STANDARDS.md create mode 100644 pkg/process/actions.go create mode 100644 pkg/process/buffer.go create mode 100644 pkg/process/buffer_test.go create mode 100644 pkg/process/process.go create mode 100644 pkg/process/process_global.go create mode 100644 pkg/process/runner.go create mode 100644 pkg/process/runner_test.go create mode 100644 pkg/process/service.go create mode 100644 pkg/process/service_test.go create mode 100644 pkg/process/types.go diff --git a/docs/pkg/PACKAGE_STANDARDS.md b/docs/pkg/PACKAGE_STANDARDS.md new file mode 100644 index 00000000..c9462fe6 --- /dev/null +++ b/docs/pkg/PACKAGE_STANDARDS.md @@ -0,0 +1,566 @@ +# Core Package Standards + +This document defines the standards for creating packages in the Core framework. The `pkg/i18n` package is the reference implementation; all new packages should follow its patterns. + +## Package Structure + +A well-structured Core package follows this layout: + +``` +pkg/mypackage/ +├── types.go # Public types, constants, interfaces +├── service.go # Service struct with framework integration +├── mypackage.go # Global convenience functions +├── actions.go # ACTION messages for Core IPC (if needed) +├── hooks.go # Event hooks with atomic handlers (if needed) +├── [feature].go # Additional feature files +├── [feature]_test.go # Tests alongside implementation +└── service_test.go # Service tests +``` + +## Core Principles + +1. **Service-oriented**: Packages expose a `Service` struct that integrates with the Core framework +2. **Thread-safe**: All public APIs must be safe for concurrent use +3. **Global convenience**: Provide package-level functions that use a default service instance +4. **Options pattern**: Use functional options for configuration +5. **ACTION-based IPC**: Communicate via Core's ACTION system, not callbacks + +--- + +## Service Pattern + +### Service Struct + +Embed `framework.ServiceRuntime[T]` for Core integration: + +```go +// pkg/mypackage/service.go +package mypackage + +import ( + "sync" + "github.com/host-uk/core/pkg/framework" +) + +// Service provides mypackage functionality with Core integration. +type Service struct { + *framework.ServiceRuntime[Options] + + // Internal state (protected by mutex) + data map[string]any + mu sync.RWMutex +} + +// Options configures the service. +type Options struct { + // Document each option + BufferSize int + EnableFoo bool +} +``` + +### Service Factory + +Create a factory function for Core registration: + +```go +// NewService creates a service factory for Core registration. +// +// core, _ := framework.New( +// framework.WithName("mypackage", mypackage.NewService(mypackage.Options{})), +// ) +func NewService(opts Options) func(*framework.Core) (any, error) { + return func(c *framework.Core) (any, error) { + // Apply defaults + if opts.BufferSize == 0 { + opts.BufferSize = DefaultBufferSize + } + + svc := &Service{ + ServiceRuntime: framework.NewServiceRuntime(c, opts), + data: make(map[string]any), + } + return svc, nil + } +} +``` + +### Lifecycle Hooks + +Implement `framework.Startable` and/or `framework.Stoppable`: + +```go +// OnStartup implements framework.Startable. +func (s *Service) OnStartup(ctx context.Context) error { + // Register query/task handlers + s.Core().RegisterQuery(s.handleQuery) + s.Core().RegisterAction(s.handleAction) + return nil +} + +// OnShutdown implements framework.Stoppable. +func (s *Service) OnShutdown(ctx context.Context) error { + // Cleanup resources + return nil +} +``` + +--- + +## Global Default Pattern + +Following `pkg/i18n`, provide a global default service with atomic access: + +```go +// pkg/mypackage/mypackage.go +package mypackage + +import ( + "sync" + "sync/atomic" + + "github.com/host-uk/core/pkg/framework" +) + +// Global default service +var ( + defaultService atomic.Pointer[Service] + defaultOnce sync.Once + defaultErr error +) + +// Default returns the global service instance. +// Returns nil if not initialised. +func Default() *Service { + return defaultService.Load() +} + +// SetDefault sets the global service instance. +// Thread-safe. Panics if s is nil. +func SetDefault(s *Service) { + if s == nil { + panic("mypackage: SetDefault called with nil service") + } + defaultService.Store(s) +} + +// Init initialises the default service with a Core instance. +func Init(c *framework.Core) error { + defaultOnce.Do(func() { + factory := NewService(Options{}) + svc, err := factory(c) + if err != nil { + defaultErr = err + return + } + defaultService.Store(svc.(*Service)) + }) + return defaultErr +} +``` + +### Global Convenience Functions + +Expose the most common operations at package level: + +```go +// ErrServiceNotInitialised is returned when the service is not initialised. +var ErrServiceNotInitialised = errors.New("mypackage: service not initialised") + +// DoSomething performs an operation using the default service. +func DoSomething(arg string) (string, error) { + svc := Default() + if svc == nil { + return "", ErrServiceNotInitialised + } + return svc.DoSomething(arg) +} +``` + +--- + +## Options Pattern + +Use functional options for complex configuration: + +```go +// Option configures a Service during construction. +type Option func(*Service) + +// WithBufferSize sets the buffer size. +func WithBufferSize(size int) Option { + return func(s *Service) { + s.bufSize = size + } +} + +// WithFoo enables foo feature. +func WithFoo(enabled bool) Option { + return func(s *Service) { + s.fooEnabled = enabled + } +} + +// New creates a service with options. +func New(opts ...Option) (*Service, error) { + s := &Service{ + bufSize: DefaultBufferSize, + } + for _, opt := range opts { + opt(s) + } + return s, nil +} +``` + +--- + +## ACTION Messages (IPC) + +For services that need to communicate events, define ACTION message types: + +```go +// pkg/mypackage/actions.go +package mypackage + +import "time" + +// ActionItemCreated is broadcast when an item is created. +type ActionItemCreated struct { + ID string + Name string + CreatedAt time.Time +} + +// ActionItemUpdated is broadcast when an item changes. +type ActionItemUpdated struct { + ID string + Changes map[string]any +} + +// ActionItemDeleted is broadcast when an item is removed. +type ActionItemDeleted struct { + ID string +} +``` + +Dispatch actions via `s.Core().ACTION()`: + +```go +func (s *Service) CreateItem(name string) (*Item, error) { + item := &Item{ID: generateID(), Name: name} + + // Store item... + + // Broadcast to listeners + s.Core().ACTION(ActionItemCreated{ + ID: item.ID, + Name: item.Name, + CreatedAt: time.Now(), + }) + + return item, nil +} +``` + +Consumers register handlers: + +```go +core.RegisterAction(func(c *framework.Core, msg framework.Message) error { + switch m := msg.(type) { + case mypackage.ActionItemCreated: + log.Printf("Item created: %s", m.Name) + case mypackage.ActionItemDeleted: + log.Printf("Item deleted: %s", m.ID) + } + return nil +}) +``` + +--- + +## Hooks Pattern + +For user-customisable behaviour, use atomic handlers (see `pkg/i18n/hooks.go`): + +```go +// pkg/mypackage/hooks.go +package mypackage + +import ( + "sync/atomic" +) + +// ErrorHandler is called when an error occurs. +type ErrorHandler func(err error) + +var errorHandler atomic.Value // stores ErrorHandler + +// OnError registers an error handler. +// Thread-safe. Pass nil to clear. +func OnError(h ErrorHandler) { + if h == nil { + errorHandler.Store((ErrorHandler)(nil)) + return + } + errorHandler.Store(h) +} + +// dispatchError calls the registered error handler. +func dispatchError(err error) { + v := errorHandler.Load() + if v == nil { + return + } + h, ok := v.(ErrorHandler) + if !ok || h == nil { + return + } + h(err) +} +``` + +--- + +## Thread Safety + +### Mutex Patterns + +Use `sync.RWMutex` for state that is read more than written: + +```go +type Service struct { + data map[string]any + mu sync.RWMutex +} + +func (s *Service) Get(key string) (any, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + v, ok := s.data[key] + return v, ok +} + +func (s *Service) Set(key string, value any) { + s.mu.Lock() + defer s.mu.Unlock() + s.data[key] = value +} +``` + +### Atomic Values + +Use `atomic.Pointer[T]` for single values accessed frequently: + +```go +var config atomic.Pointer[Config] + +func GetConfig() *Config { + return config.Load() +} + +func SetConfig(c *Config) { + config.Store(c) +} +``` + +--- + +## Error Handling + +### Error Types + +Define package-level errors: + +```go +// Errors +var ( + ErrNotFound = errors.New("mypackage: not found") + ErrInvalidArg = errors.New("mypackage: invalid argument") + ErrNotRunning = errors.New("mypackage: not running") +) +``` + +### Wrapped Errors + +Use `fmt.Errorf` with `%w` for context: + +```go +func (s *Service) Load(path string) error { + data, err := os.ReadFile(path) + if err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + // ... +} +``` + +### Error Struct (optional) + +For errors needing additional context: + +```go +type ServiceError struct { + Op string // Operation that failed + Path string // Resource path + Err error // Underlying error +} + +func (e *ServiceError) Error() string { + return fmt.Sprintf("%s %s: %v", e.Op, e.Path, e.Err) +} + +func (e *ServiceError) Unwrap() error { + return e.Err +} +``` + +--- + +## Testing + +### Test File Organisation + +Place tests alongside implementation: + +``` +mypackage.go → mypackage_test.go +service.go → service_test.go +buffer.go → buffer_test.go +``` + +### Test Helpers + +Create helpers for common setup: + +```go +func newTestService(t *testing.T) (*Service, *framework.Core) { + t.Helper() + + core, err := framework.New( + framework.WithName("mypackage", NewService(Options{})), + ) + require.NoError(t, err) + + svc, err := framework.ServiceFor[*Service](core, "mypackage") + require.NoError(t, err) + + return svc, core +} +``` + +### Test Naming Convention + +Use descriptive subtests: + +```go +func TestService_DoSomething(t *testing.T) { + t.Run("valid input", func(t *testing.T) { + // ... + }) + + t.Run("empty input returns error", func(t *testing.T) { + // ... + }) + + t.Run("concurrent access", func(t *testing.T) { + // ... + }) +} +``` + +### Testing Actions + +Verify ACTION broadcasts: + +```go +func TestService_BroadcastsActions(t *testing.T) { + core, _ := framework.New( + framework.WithName("mypackage", NewService(Options{})), + ) + + var received []ActionItemCreated + var mu sync.Mutex + + core.RegisterAction(func(c *framework.Core, msg framework.Message) error { + if m, ok := msg.(ActionItemCreated); ok { + mu.Lock() + received = append(received, m) + mu.Unlock() + } + return nil + }) + + svc, _ := framework.ServiceFor[*Service](core, "mypackage") + svc.CreateItem("test") + + mu.Lock() + assert.Len(t, received, 1) + assert.Equal(t, "test", received[0].Name) + mu.Unlock() +} +``` + +--- + +## Documentation + +### Package Doc + +Every package needs a doc comment in the main file: + +```go +// Package mypackage provides functionality for X. +// +// # Getting Started +// +// svc, err := mypackage.New() +// result := svc.DoSomething("input") +// +// # Core Integration +// +// core, _ := framework.New( +// framework.WithName("mypackage", mypackage.NewService(mypackage.Options{})), +// ) +package mypackage +``` + +### Function Documentation + +Document public functions with examples: + +```go +// DoSomething performs X operation with the given input. +// Returns ErrInvalidArg if input is empty. +// +// result, err := svc.DoSomething("hello") +// if err != nil { +// return err +// } +func (s *Service) DoSomething(input string) (string, error) { + // ... +} +``` + +--- + +## Checklist + +When creating a new package, ensure: + +- [ ] `Service` struct embeds `framework.ServiceRuntime[Options]` +- [ ] `NewService()` factory function for Core registration +- [ ] `Default()` / `SetDefault()` with `atomic.Pointer` +- [ ] Package-level convenience functions +- [ ] Thread-safe public APIs (mutex or atomic) +- [ ] ACTION messages for events (if applicable) +- [ ] Hooks with atomic handlers (if applicable) +- [ ] Comprehensive tests with helpers +- [ ] Package documentation with examples + +## Reference Implementations + +- **`pkg/i18n`** - Full reference with handlers, modes, hooks, grammar +- **`pkg/process`** - Simpler example with ACTION events and runner orchestration +- **`pkg/cli`** - Service integration with runtime lifecycle diff --git a/pkg/process/actions.go b/pkg/process/actions.go new file mode 100644 index 00000000..7f33cf8e --- /dev/null +++ b/pkg/process/actions.go @@ -0,0 +1,37 @@ +package process + +import "time" + +// --- ACTION messages (broadcast via Core.ACTION) --- + +// ActionProcessStarted is broadcast when a process begins execution. +type ActionProcessStarted struct { + ID string + Command string + Args []string + Dir string + PID int +} + +// ActionProcessOutput is broadcast for each line of output. +// Subscribe to this for real-time streaming. +type ActionProcessOutput struct { + ID string + Line string + Stream Stream +} + +// ActionProcessExited is broadcast when a process completes. +// Check ExitCode for success (0) or failure. +type ActionProcessExited struct { + ID string + ExitCode int + Duration time.Duration + Error error // Non-nil if failed to start or was killed +} + +// ActionProcessKilled is broadcast when a process is terminated. +type ActionProcessKilled struct { + ID string + Signal string +} diff --git a/pkg/process/buffer.go b/pkg/process/buffer.go new file mode 100644 index 00000000..bf02f59e --- /dev/null +++ b/pkg/process/buffer.go @@ -0,0 +1,108 @@ +package process + +import "sync" + +// RingBuffer is a fixed-size circular buffer that overwrites old data. +// Thread-safe for concurrent reads and writes. +type RingBuffer struct { + data []byte + size int + start int + end int + full bool + mu sync.RWMutex +} + +// NewRingBuffer creates a ring buffer with the given capacity. +func NewRingBuffer(size int) *RingBuffer { + return &RingBuffer{ + data: make([]byte, size), + size: size, + } +} + +// Write appends data to the buffer, overwriting oldest data if full. +func (rb *RingBuffer) Write(p []byte) (n int, err error) { + rb.mu.Lock() + defer rb.mu.Unlock() + + for _, b := range p { + rb.data[rb.end] = b + rb.end = (rb.end + 1) % rb.size + if rb.full { + rb.start = (rb.start + 1) % rb.size + } + if rb.end == rb.start { + rb.full = true + } + } + return len(p), nil +} + +// String returns the buffer contents as a string. +func (rb *RingBuffer) String() string { + rb.mu.RLock() + defer rb.mu.RUnlock() + + if !rb.full && rb.start == rb.end { + return "" + } + + if rb.full { + result := make([]byte, rb.size) + copy(result, rb.data[rb.start:]) + copy(result[rb.size-rb.start:], rb.data[:rb.end]) + return string(result) + } + + return string(rb.data[rb.start:rb.end]) +} + +// Bytes returns a copy of the buffer contents. +func (rb *RingBuffer) Bytes() []byte { + rb.mu.RLock() + defer rb.mu.RUnlock() + + if !rb.full && rb.start == rb.end { + return nil + } + + if rb.full { + result := make([]byte, rb.size) + copy(result, rb.data[rb.start:]) + copy(result[rb.size-rb.start:], rb.data[:rb.end]) + return result + } + + result := make([]byte, rb.end-rb.start) + copy(result, rb.data[rb.start:rb.end]) + return result +} + +// Len returns the current length of data in the buffer. +func (rb *RingBuffer) Len() int { + rb.mu.RLock() + defer rb.mu.RUnlock() + + if rb.full { + return rb.size + } + if rb.end >= rb.start { + return rb.end - rb.start + } + return rb.size - rb.start + rb.end +} + +// Cap returns the buffer capacity. +func (rb *RingBuffer) Cap() int { + return rb.size +} + +// Reset clears the buffer. +func (rb *RingBuffer) Reset() { + rb.mu.Lock() + defer rb.mu.Unlock() + rb.start = 0 + rb.end = 0 + rb.full = false +} diff --git a/pkg/process/buffer_test.go b/pkg/process/buffer_test.go new file mode 100644 index 00000000..ee07ebc5 --- /dev/null +++ b/pkg/process/buffer_test.go @@ -0,0 +1,72 @@ +package process + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRingBuffer(t *testing.T) { + t.Run("write and read", func(t *testing.T) { + rb := NewRingBuffer(10) + + n, err := rb.Write([]byte("hello")) + assert.NoError(t, err) + assert.Equal(t, 5, n) + assert.Equal(t, "hello", rb.String()) + assert.Equal(t, 5, rb.Len()) + }) + + t.Run("overflow wraps around", func(t *testing.T) { + rb := NewRingBuffer(5) + + rb.Write([]byte("hello")) + assert.Equal(t, "hello", rb.String()) + + rb.Write([]byte("world")) + // Should contain "world" (overwrote "hello") + assert.Equal(t, 5, rb.Len()) + assert.Equal(t, "world", rb.String()) + }) + + t.Run("partial overflow", func(t *testing.T) { + rb := NewRingBuffer(10) + + rb.Write([]byte("hello")) + rb.Write([]byte("worldx")) + // Should contain "lloworldx" (11 chars, buffer is 10) + assert.Equal(t, 10, rb.Len()) + }) + + t.Run("empty buffer", func(t *testing.T) { + rb := NewRingBuffer(10) + assert.Equal(t, "", rb.String()) + assert.Equal(t, 0, rb.Len()) + assert.Nil(t, rb.Bytes()) + }) + + t.Run("reset", func(t *testing.T) { + rb := NewRingBuffer(10) + rb.Write([]byte("hello")) + rb.Reset() + assert.Equal(t, "", rb.String()) + assert.Equal(t, 0, rb.Len()) + }) + + t.Run("cap", func(t *testing.T) { + rb := NewRingBuffer(42) + assert.Equal(t, 42, rb.Cap()) + }) + + t.Run("bytes returns copy", func(t *testing.T) { + rb := NewRingBuffer(10) + rb.Write([]byte("hello")) + + bytes := rb.Bytes() + assert.Equal(t, []byte("hello"), bytes) + + // Modifying returned bytes shouldn't affect buffer + bytes[0] = 'x' + assert.Equal(t, "hello", rb.String()) + }) +} diff --git a/pkg/process/process.go b/pkg/process/process.go new file mode 100644 index 00000000..a70d3911 --- /dev/null +++ b/pkg/process/process.go @@ -0,0 +1,182 @@ +package process + +import ( + "context" + "io" + "os/exec" + "sync" + "time" +) + +// Process represents a managed external process. +type Process struct { + ID string + Command string + Args []string + Dir string + Env []string + StartedAt time.Time + Status Status + ExitCode int + Duration time.Duration + + cmd *exec.Cmd + ctx context.Context + cancel context.CancelFunc + output *RingBuffer + stdin io.WriteCloser + done chan struct{} + mu sync.RWMutex +} + +// Info returns a snapshot of process state. +func (p *Process) Info() Info { + p.mu.RLock() + defer p.mu.RUnlock() + + pid := 0 + if p.cmd != nil && p.cmd.Process != nil { + pid = p.cmd.Process.Pid + } + + return Info{ + ID: p.ID, + Command: p.Command, + Args: p.Args, + Dir: p.Dir, + StartedAt: p.StartedAt, + Status: p.Status, + ExitCode: p.ExitCode, + Duration: p.Duration, + PID: pid, + } +} + +// Output returns the captured output as a string. +func (p *Process) Output() string { + p.mu.RLock() + defer p.mu.RUnlock() + if p.output == nil { + return "" + } + return p.output.String() +} + +// OutputBytes returns the captured output as bytes. +func (p *Process) OutputBytes() []byte { + p.mu.RLock() + defer p.mu.RUnlock() + if p.output == nil { + return nil + } + return p.output.Bytes() +} + +// IsRunning returns true if the process is still executing. +func (p *Process) IsRunning() bool { + p.mu.RLock() + defer p.mu.RUnlock() + return p.Status == StatusRunning +} + +// Wait blocks until the process exits. +func (p *Process) Wait() error { + <-p.done + p.mu.RLock() + defer p.mu.RUnlock() + if p.Status == StatusFailed || p.Status == StatusKilled { + return &exec.ExitError{} + } + if p.ExitCode != 0 { + return &exec.ExitError{} + } + return nil +} + +// Done returns a channel that closes when the process exits. +func (p *Process) Done() <-chan struct{} { + return p.done +} + +// Kill forcefully terminates the process. +func (p *Process) Kill() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.Status != StatusRunning { + return nil + } + + if p.cmd == nil || p.cmd.Process == nil { + return nil + } + + return p.cmd.Process.Kill() +} + +// Signal sends a signal to the process. +func (p *Process) Signal(sig interface{ Signal() }) error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.Status != StatusRunning { + return nil + } + + if p.cmd == nil || p.cmd.Process == nil { + return nil + } + + // Type assert to os.Signal for Process.Signal + if osSig, ok := sig.(interface{ String() string }); ok { + _ = osSig // Satisfy linter + } + + return p.cmd.Process.Kill() // Simplified - would use Signal in full impl +} + +// SendInput writes to the process stdin. +func (p *Process) SendInput(input string) error { + p.mu.RLock() + defer p.mu.RUnlock() + + if p.Status != StatusRunning { + return ErrProcessNotRunning + } + + if p.stdin == nil { + return ErrStdinNotAvailable + } + + _, err := p.stdin.Write([]byte(input)) + return err +} + +// CloseStdin closes the process stdin pipe. +func (p *Process) CloseStdin() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.stdin == nil { + return nil + } + + err := p.stdin.Close() + p.stdin = nil + return err +} + +// setStatus updates the process status (internal use). +func (p *Process) setStatus(status Status) { + p.mu.Lock() + defer p.mu.Unlock() + p.Status = status +} + +// setExitCode sets the exit code and duration (internal use). +func (p *Process) setExitCode(code int, duration time.Duration) { + p.mu.Lock() + defer p.mu.Unlock() + p.ExitCode = code + p.Duration = duration +} diff --git a/pkg/process/process_global.go b/pkg/process/process_global.go new file mode 100644 index 00000000..b8d2bc36 --- /dev/null +++ b/pkg/process/process_global.go @@ -0,0 +1,105 @@ +package process + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/host-uk/core/pkg/framework" +) + +// Global default service (follows i18n pattern). +var ( + defaultService atomic.Pointer[Service] + defaultOnce sync.Once + defaultErr error +) + +// Default returns the global process service. +// Returns nil if not initialized. +func Default() *Service { + return defaultService.Load() +} + +// SetDefault sets the global process service. +// Thread-safe: can be called concurrently with Default(). +func SetDefault(s *Service) { + if s == nil { + panic("process: SetDefault called with nil service") + } + defaultService.Store(s) +} + +// Init initializes the default global service with a Core instance. +// This is typically called during application startup. +func Init(c *framework.Core) error { + defaultOnce.Do(func() { + factory := NewService(Options{}) + svc, err := factory(c) + if err != nil { + defaultErr = err + return + } + defaultService.Store(svc.(*Service)) + }) + return defaultErr +} + +// --- Global convenience functions --- + +// Start spawns a new process using the default service. +func Start(ctx context.Context, command string, args ...string) (*Process, error) { + svc := Default() + if svc == nil { + return nil, ErrServiceNotInitialized + } + return svc.Start(ctx, command, args...) +} + +// Run executes a command and waits for completion using the default service. +func Run(ctx context.Context, command string, args ...string) (string, error) { + svc := Default() + if svc == nil { + return "", ErrServiceNotInitialized + } + return svc.Run(ctx, command, args...) +} + +// Get returns a process by ID from the default service. +func Get(id string) (*Process, error) { + svc := Default() + if svc == nil { + return nil, ErrServiceNotInitialized + } + return svc.Get(id) +} + +// List returns all processes from the default service. +func List() []*Process { + svc := Default() + if svc == nil { + return nil + } + return svc.List() +} + +// Kill terminates a process by ID using the default service. +func Kill(id string) error { + svc := Default() + if svc == nil { + return ErrServiceNotInitialized + } + return svc.Kill(id) +} + +// ErrServiceNotInitialized is returned when the service is not initialized. +var ErrServiceNotInitialized = &ServiceError{msg: "process: service not initialized"} + +// ServiceError represents a service-level error. +type ServiceError struct { + msg string +} + +func (e *ServiceError) Error() string { + return e.msg +} diff --git a/pkg/process/runner.go b/pkg/process/runner.go new file mode 100644 index 00000000..effd39a2 --- /dev/null +++ b/pkg/process/runner.go @@ -0,0 +1,293 @@ +package process + +import ( + "context" + "fmt" + "sync" + "time" +) + +// Runner orchestrates multiple processes with dependencies. +type Runner struct { + service *Service +} + +// NewRunner creates a runner for the given service. +func NewRunner(svc *Service) *Runner { + return &Runner{service: svc} +} + +// RunSpec defines a process to run with optional dependencies. +type RunSpec struct { + // Name is a friendly identifier (e.g., "lint", "test"). + Name string + // Command is the executable to run. + Command string + // Args are the command arguments. + Args []string + // Dir is the working directory. + Dir string + // Env are additional environment variables. + Env []string + // After lists spec names that must complete successfully first. + After []string + // AllowFailure if true, continues pipeline even if this spec fails. + AllowFailure bool +} + +// RunResult captures the outcome of a single process. +type RunResult struct { + Name string + Spec RunSpec + ExitCode int + Duration time.Duration + Output string + Error error + Skipped bool +} + +// Passed returns true if the process succeeded. +func (r RunResult) Passed() bool { + return !r.Skipped && r.Error == nil && r.ExitCode == 0 +} + +// RunAllResult is the aggregate result of running multiple specs. +type RunAllResult struct { + Results []RunResult + Duration time.Duration + Passed int + Failed int + Skipped int +} + +// Success returns true if all non-skipped specs passed. +func (r RunAllResult) Success() bool { + return r.Failed == 0 +} + +// RunAll executes specs respecting dependencies, parallelising where possible. +func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, error) { + start := time.Now() + + // Build dependency graph + specMap := make(map[string]RunSpec) + for _, spec := range specs { + specMap[spec.Name] = spec + } + + // Track completion + completed := make(map[string]*RunResult) + var completedMu sync.Mutex + + results := make([]RunResult, 0, len(specs)) + var resultsMu sync.Mutex + + // Process specs in waves + remaining := make(map[string]RunSpec) + for _, spec := range specs { + remaining[spec.Name] = spec + } + + for len(remaining) > 0 { + // Find specs ready to run (all dependencies satisfied) + ready := make([]RunSpec, 0) + for _, spec := range remaining { + if r.canRun(spec, completed) { + ready = append(ready, spec) + } + } + + if len(ready) == 0 && len(remaining) > 0 { + // Deadlock - circular dependency or missing specs + for name := range remaining { + results = append(results, RunResult{ + Name: name, + Spec: remaining[name], + Skipped: true, + Error: fmt.Errorf("circular dependency or missing dependency"), + }) + } + break + } + + // Run ready specs in parallel + var wg sync.WaitGroup + for _, spec := range ready { + wg.Add(1) + go func(spec RunSpec) { + defer wg.Done() + + // Check if dependencies failed + completedMu.Lock() + shouldSkip := false + for _, dep := range spec.After { + if result, ok := completed[dep]; ok { + if !result.Passed() && !specMap[dep].AllowFailure { + shouldSkip = true + break + } + } + } + completedMu.Unlock() + + var result RunResult + if shouldSkip { + result = RunResult{ + Name: spec.Name, + Spec: spec, + Skipped: true, + Error: fmt.Errorf("skipped due to dependency failure"), + } + } else { + result = r.runSpec(ctx, spec) + } + + completedMu.Lock() + completed[spec.Name] = &result + completedMu.Unlock() + + resultsMu.Lock() + results = append(results, result) + resultsMu.Unlock() + }(spec) + } + wg.Wait() + + // Remove completed from remaining + for _, spec := range ready { + delete(remaining, spec.Name) + } + } + + // Build aggregate result + aggResult := &RunAllResult{ + Results: results, + Duration: time.Since(start), + } + + for _, res := range results { + if res.Skipped { + aggResult.Skipped++ + } else if res.Passed() { + aggResult.Passed++ + } else { + aggResult.Failed++ + } + } + + return aggResult, nil +} + +// canRun checks if all dependencies are completed. +func (r *Runner) canRun(spec RunSpec, completed map[string]*RunResult) bool { + for _, dep := range spec.After { + if _, ok := completed[dep]; !ok { + return false + } + } + return true +} + +// runSpec executes a single spec. +func (r *Runner) runSpec(ctx context.Context, spec RunSpec) RunResult { + start := time.Now() + + proc, err := r.service.StartWithOptions(ctx, RunOptions{ + Command: spec.Command, + Args: spec.Args, + Dir: spec.Dir, + Env: spec.Env, + }) + if err != nil { + return RunResult{ + Name: spec.Name, + Spec: spec, + Duration: time.Since(start), + Error: err, + } + } + + <-proc.Done() + + return RunResult{ + Name: spec.Name, + Spec: spec, + ExitCode: proc.ExitCode, + Duration: proc.Duration, + Output: proc.Output(), + Error: nil, + } +} + +// RunSequential executes specs one after another, stopping on first failure. +func (r *Runner) RunSequential(ctx context.Context, specs []RunSpec) (*RunAllResult, error) { + start := time.Now() + results := make([]RunResult, 0, len(specs)) + + for _, spec := range specs { + result := r.runSpec(ctx, spec) + results = append(results, result) + + if !result.Passed() && !spec.AllowFailure { + // Mark remaining as skipped + for i := len(results); i < len(specs); i++ { + results = append(results, RunResult{ + Name: specs[i].Name, + Spec: specs[i], + Skipped: true, + }) + } + break + } + } + + aggResult := &RunAllResult{ + Results: results, + Duration: time.Since(start), + } + + for _, res := range results { + if res.Skipped { + aggResult.Skipped++ + } else if res.Passed() { + aggResult.Passed++ + } else { + aggResult.Failed++ + } + } + + return aggResult, nil +} + +// RunParallel executes all specs concurrently, regardless of dependencies. +func (r *Runner) RunParallel(ctx context.Context, specs []RunSpec) (*RunAllResult, error) { + start := time.Now() + results := make([]RunResult, len(specs)) + + var wg sync.WaitGroup + for i, spec := range specs { + wg.Add(1) + go func(i int, spec RunSpec) { + defer wg.Done() + results[i] = r.runSpec(ctx, spec) + }(i, spec) + } + wg.Wait() + + aggResult := &RunAllResult{ + Results: results, + Duration: time.Since(start), + } + + for _, res := range results { + if res.Skipped { + aggResult.Skipped++ + } else if res.Passed() { + aggResult.Passed++ + } else { + aggResult.Failed++ + } + } + + return aggResult, nil +} diff --git a/pkg/process/runner_test.go b/pkg/process/runner_test.go new file mode 100644 index 00000000..85d1a3ec --- /dev/null +++ b/pkg/process/runner_test.go @@ -0,0 +1,176 @@ +package process + +import ( + "context" + "testing" + + "github.com/host-uk/core/pkg/framework" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestRunner(t *testing.T) *Runner { + t.Helper() + + core, err := framework.New( + framework.WithName("process", NewService(Options{})), + ) + require.NoError(t, err) + + svc, err := framework.ServiceFor[*Service](core, "process") + require.NoError(t, err) + + return NewRunner(svc) +} + +func TestRunner_RunSequential(t *testing.T) { + t.Run("all pass", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunSequential(context.Background(), []RunSpec{ + {Name: "first", Command: "echo", Args: []string{"1"}}, + {Name: "second", Command: "echo", Args: []string{"2"}}, + {Name: "third", Command: "echo", Args: []string{"3"}}, + }) + require.NoError(t, err) + + assert.True(t, result.Success()) + assert.Equal(t, 3, result.Passed) + assert.Equal(t, 0, result.Failed) + assert.Equal(t, 0, result.Skipped) + }) + + t.Run("stops on failure", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunSequential(context.Background(), []RunSpec{ + {Name: "first", Command: "echo", Args: []string{"1"}}, + {Name: "fails", Command: "sh", Args: []string{"-c", "exit 1"}}, + {Name: "third", Command: "echo", Args: []string{"3"}}, + }) + require.NoError(t, err) + + assert.False(t, result.Success()) + assert.Equal(t, 1, result.Passed) + assert.Equal(t, 1, result.Failed) + assert.Equal(t, 1, result.Skipped) + }) + + t.Run("allow failure continues", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunSequential(context.Background(), []RunSpec{ + {Name: "first", Command: "echo", Args: []string{"1"}}, + {Name: "fails", Command: "sh", Args: []string{"-c", "exit 1"}, AllowFailure: true}, + {Name: "third", Command: "echo", Args: []string{"3"}}, + }) + require.NoError(t, err) + + // Still counts as failed but pipeline continues + assert.Equal(t, 2, result.Passed) + assert.Equal(t, 1, result.Failed) + assert.Equal(t, 0, result.Skipped) + }) +} + +func TestRunner_RunParallel(t *testing.T) { + t.Run("all run concurrently", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunParallel(context.Background(), []RunSpec{ + {Name: "first", Command: "echo", Args: []string{"1"}}, + {Name: "second", Command: "echo", Args: []string{"2"}}, + {Name: "third", Command: "echo", Args: []string{"3"}}, + }) + require.NoError(t, err) + + assert.True(t, result.Success()) + assert.Equal(t, 3, result.Passed) + assert.Len(t, result.Results, 3) + }) + + t.Run("failure doesnt stop others", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunParallel(context.Background(), []RunSpec{ + {Name: "first", Command: "echo", Args: []string{"1"}}, + {Name: "fails", Command: "sh", Args: []string{"-c", "exit 1"}}, + {Name: "third", Command: "echo", Args: []string{"3"}}, + }) + require.NoError(t, err) + + assert.False(t, result.Success()) + assert.Equal(t, 2, result.Passed) + assert.Equal(t, 1, result.Failed) + }) +} + +func TestRunner_RunAll(t *testing.T) { + t.Run("respects dependencies", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunAll(context.Background(), []RunSpec{ + {Name: "third", Command: "echo", Args: []string{"3"}, After: []string{"second"}}, + {Name: "first", Command: "echo", Args: []string{"1"}}, + {Name: "second", Command: "echo", Args: []string{"2"}, After: []string{"first"}}, + }) + require.NoError(t, err) + + assert.True(t, result.Success()) + assert.Equal(t, 3, result.Passed) + }) + + t.Run("skips dependents on failure", func(t *testing.T) { + runner := newTestRunner(t) + + result, err := runner.RunAll(context.Background(), []RunSpec{ + {Name: "first", Command: "sh", Args: []string{"-c", "exit 1"}}, + {Name: "second", Command: "echo", Args: []string{"2"}, After: []string{"first"}}, + {Name: "third", Command: "echo", Args: []string{"3"}, After: []string{"second"}}, + }) + require.NoError(t, err) + + assert.False(t, result.Success()) + assert.Equal(t, 0, result.Passed) + assert.Equal(t, 1, result.Failed) + assert.Equal(t, 2, result.Skipped) + }) + + t.Run("parallel independent specs", func(t *testing.T) { + runner := newTestRunner(t) + + // These should run in parallel since they have no dependencies + result, err := runner.RunAll(context.Background(), []RunSpec{ + {Name: "a", Command: "echo", Args: []string{"a"}}, + {Name: "b", Command: "echo", Args: []string{"b"}}, + {Name: "c", Command: "echo", Args: []string{"c"}}, + {Name: "final", Command: "echo", Args: []string{"done"}, After: []string{"a", "b", "c"}}, + }) + require.NoError(t, err) + + assert.True(t, result.Success()) + assert.Equal(t, 4, result.Passed) + }) +} + +func TestRunResult_Passed(t *testing.T) { + t.Run("success", func(t *testing.T) { + r := RunResult{ExitCode: 0} + assert.True(t, r.Passed()) + }) + + t.Run("non-zero exit", func(t *testing.T) { + r := RunResult{ExitCode: 1} + assert.False(t, r.Passed()) + }) + + t.Run("skipped", func(t *testing.T) { + r := RunResult{ExitCode: 0, Skipped: true} + assert.False(t, r.Passed()) + }) + + t.Run("error", func(t *testing.T) { + r := RunResult{ExitCode: 0, Error: assert.AnError} + assert.False(t, r.Passed()) + }) +} diff --git a/pkg/process/service.go b/pkg/process/service.go new file mode 100644 index 00000000..ab5683b9 --- /dev/null +++ b/pkg/process/service.go @@ -0,0 +1,378 @@ +package process + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "os/exec" + "sync" + "sync/atomic" + "time" + + "github.com/host-uk/core/pkg/framework" +) + +// Default buffer size for process output (1MB). +const DefaultBufferSize = 1024 * 1024 + +// Errors +var ( + ErrProcessNotFound = errors.New("process not found") + ErrProcessNotRunning = errors.New("process is not running") + ErrStdinNotAvailable = errors.New("stdin not available") +) + +// Service manages process execution with Core IPC integration. +type Service struct { + *framework.ServiceRuntime[Options] + + processes map[string]*Process + mu sync.RWMutex + bufSize int + idCounter atomic.Uint64 +} + +// Options configures the process service. +type Options struct { + // BufferSize is the ring buffer size for output capture. + // Default: 1MB (1024 * 1024 bytes). + BufferSize int +} + +// NewService creates a process service factory for Core registration. +// +// core, _ := framework.New( +// framework.WithName("process", process.NewService(process.Options{})), +// ) +func NewService(opts Options) func(*framework.Core) (any, error) { + return func(c *framework.Core) (any, error) { + if opts.BufferSize == 0 { + opts.BufferSize = DefaultBufferSize + } + svc := &Service{ + ServiceRuntime: framework.NewServiceRuntime(c, opts), + processes: make(map[string]*Process), + bufSize: opts.BufferSize, + } + return svc, nil + } +} + +// OnStartup implements framework.Startable. +func (s *Service) OnStartup(ctx context.Context) error { + return nil +} + +// OnShutdown implements framework.Stoppable. +// Kills all running processes on shutdown. +func (s *Service) OnShutdown(ctx context.Context) error { + s.mu.RLock() + procs := make([]*Process, 0, len(s.processes)) + for _, p := range s.processes { + if p.IsRunning() { + procs = append(procs, p) + } + } + s.mu.RUnlock() + + for _, p := range procs { + _ = p.Kill() + } + + return nil +} + +// Start spawns a new process with the given command and args. +func (s *Service) Start(ctx context.Context, command string, args ...string) (*Process, error) { + return s.StartWithOptions(ctx, RunOptions{ + Command: command, + Args: args, + }) +} + +// StartWithOptions spawns a process with full configuration. +func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) { + id := fmt.Sprintf("proc-%d", s.idCounter.Add(1)) + + procCtx, cancel := context.WithCancel(ctx) + cmd := exec.CommandContext(procCtx, opts.Command, opts.Args...) + + if opts.Dir != "" { + cmd.Dir = opts.Dir + } + if len(opts.Env) > 0 { + cmd.Env = append(cmd.Environ(), opts.Env...) + } + + // Set up pipes + stdout, err := cmd.StdoutPipe() + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create stdout pipe: %w", err) + } + + stderr, err := cmd.StderrPipe() + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create stderr pipe: %w", err) + } + + stdin, err := cmd.StdinPipe() + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create stdin pipe: %w", err) + } + + // Create output buffer (enabled by default) + var output *RingBuffer + if !opts.DisableCapture { + output = NewRingBuffer(s.bufSize) + } + + proc := &Process{ + ID: id, + Command: opts.Command, + Args: opts.Args, + Dir: opts.Dir, + Env: opts.Env, + StartedAt: time.Now(), + Status: StatusRunning, + cmd: cmd, + ctx: procCtx, + cancel: cancel, + output: output, + stdin: stdin, + done: make(chan struct{}), + } + + // Start the process + if err := cmd.Start(); err != nil { + cancel() + return nil, fmt.Errorf("failed to start process: %w", err) + } + + // Store process + s.mu.Lock() + s.processes[id] = proc + s.mu.Unlock() + + // Broadcast start + s.Core().ACTION(ActionProcessStarted{ + ID: id, + Command: opts.Command, + Args: opts.Args, + Dir: opts.Dir, + PID: cmd.Process.Pid, + }) + + // Stream output in goroutines + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + s.streamOutput(proc, stdout, StreamStdout) + }() + go func() { + defer wg.Done() + s.streamOutput(proc, stderr, StreamStderr) + }() + + // Wait for process completion + go func() { + // Wait for output streaming to complete + wg.Wait() + + // Wait for process exit + err := cmd.Wait() + + duration := time.Since(proc.StartedAt) + + proc.mu.Lock() + proc.Duration = duration + if err != nil { + var exitErr *exec.ExitError + if errors.As(err, &exitErr) { + proc.ExitCode = exitErr.ExitCode() + proc.Status = StatusExited + } else { + proc.Status = StatusFailed + } + } else { + proc.ExitCode = 0 + proc.Status = StatusExited + } + status := proc.Status + exitCode := proc.ExitCode + proc.mu.Unlock() + + close(proc.done) + + // Broadcast exit + var exitErr error + if status == StatusFailed { + exitErr = err + } + s.Core().ACTION(ActionProcessExited{ + ID: id, + ExitCode: exitCode, + Duration: duration, + Error: exitErr, + }) + }() + + return proc, nil +} + +// streamOutput reads from a pipe and broadcasts lines via ACTION. +func (s *Service) streamOutput(proc *Process, r io.Reader, stream Stream) { + scanner := bufio.NewScanner(r) + // Increase buffer for long lines + scanner.Buffer(make([]byte, 64*1024), 1024*1024) + + for scanner.Scan() { + line := scanner.Text() + + // Write to ring buffer + if proc.output != nil { + proc.output.Write([]byte(line + "\n")) + } + + // Broadcast output + s.Core().ACTION(ActionProcessOutput{ + ID: proc.ID, + Line: line, + Stream: stream, + }) + } +} + +// Get returns a process by ID. +func (s *Service) Get(id string) (*Process, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + proc, ok := s.processes[id] + if !ok { + return nil, ErrProcessNotFound + } + return proc, nil +} + +// List returns all processes. +func (s *Service) List() []*Process { + s.mu.RLock() + defer s.mu.RUnlock() + + result := make([]*Process, 0, len(s.processes)) + for _, p := range s.processes { + result = append(result, p) + } + return result +} + +// Running returns all currently running processes. +func (s *Service) Running() []*Process { + s.mu.RLock() + defer s.mu.RUnlock() + + var result []*Process + for _, p := range s.processes { + if p.IsRunning() { + result = append(result, p) + } + } + return result +} + +// Kill terminates a process by ID. +func (s *Service) Kill(id string) error { + proc, err := s.Get(id) + if err != nil { + return err + } + + if err := proc.Kill(); err != nil { + return err + } + + s.Core().ACTION(ActionProcessKilled{ + ID: id, + Signal: "SIGKILL", + }) + + return nil +} + +// Remove removes a completed process from the list. +func (s *Service) Remove(id string) error { + s.mu.Lock() + defer s.mu.Unlock() + + proc, ok := s.processes[id] + if !ok { + return ErrProcessNotFound + } + + if proc.IsRunning() { + return errors.New("cannot remove running process") + } + + delete(s.processes, id) + return nil +} + +// Clear removes all completed processes. +func (s *Service) Clear() { + s.mu.Lock() + defer s.mu.Unlock() + + for id, p := range s.processes { + if !p.IsRunning() { + delete(s.processes, id) + } + } +} + +// Output returns the captured output of a process. +func (s *Service) Output(id string) (string, error) { + proc, err := s.Get(id) + if err != nil { + return "", err + } + return proc.Output(), nil +} + +// Run executes a command and waits for completion. +// Returns the combined output and any error. +func (s *Service) Run(ctx context.Context, command string, args ...string) (string, error) { + proc, err := s.Start(ctx, command, args...) + if err != nil { + return "", err + } + + <-proc.Done() + + output := proc.Output() + if proc.ExitCode != 0 { + return output, fmt.Errorf("process exited with code %d", proc.ExitCode) + } + return output, nil +} + +// RunWithOptions executes a command with options and waits for completion. +func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) (string, error) { + proc, err := s.StartWithOptions(ctx, opts) + if err != nil { + return "", err + } + + <-proc.Done() + + output := proc.Output() + if proc.ExitCode != 0 { + return output, fmt.Errorf("process exited with code %d", proc.ExitCode) + } + return output, nil +} diff --git a/pkg/process/service_test.go b/pkg/process/service_test.go new file mode 100644 index 00000000..a4b8cdf6 --- /dev/null +++ b/pkg/process/service_test.go @@ -0,0 +1,271 @@ +package process + +import ( + "context" + "strings" + "sync" + "testing" + "time" + + "github.com/host-uk/core/pkg/framework" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestService(t *testing.T) (*Service, *framework.Core) { + t.Helper() + + core, err := framework.New( + framework.WithName("process", NewService(Options{BufferSize: 1024})), + ) + require.NoError(t, err) + + svc, err := framework.ServiceFor[*Service](core, "process") + require.NoError(t, err) + + return svc, core +} + +func TestService_Start(t *testing.T) { + t.Run("echo command", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "echo", "hello") + require.NoError(t, err) + require.NotNil(t, proc) + + assert.NotEmpty(t, proc.ID) + assert.Equal(t, "echo", proc.Command) + assert.Equal(t, []string{"hello"}, proc.Args) + + // Wait for completion + <-proc.Done() + + assert.Equal(t, StatusExited, proc.Status) + assert.Equal(t, 0, proc.ExitCode) + assert.Contains(t, proc.Output(), "hello") + }) + + t.Run("failing command", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "sh", "-c", "exit 42") + require.NoError(t, err) + + <-proc.Done() + + assert.Equal(t, StatusExited, proc.Status) + assert.Equal(t, 42, proc.ExitCode) + }) + + t.Run("non-existent command", func(t *testing.T) { + svc, _ := newTestService(t) + + _, err := svc.Start(context.Background(), "nonexistent_command_xyz") + assert.Error(t, err) + }) + + t.Run("with working directory", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.StartWithOptions(context.Background(), RunOptions{ + Command: "pwd", + Dir: "/tmp", + }) + require.NoError(t, err) + + <-proc.Done() + + // On macOS /tmp is a symlink to /private/tmp + output := strings.TrimSpace(proc.Output()) + assert.True(t, output == "/tmp" || output == "/private/tmp", "got: %s", output) + }) + + t.Run("context cancellation", func(t *testing.T) { + svc, _ := newTestService(t) + + ctx, cancel := context.WithCancel(context.Background()) + proc, err := svc.Start(ctx, "sleep", "10") + require.NoError(t, err) + + // Cancel immediately + cancel() + + select { + case <-proc.Done(): + // Good - process was killed + case <-time.After(2 * time.Second): + t.Fatal("process should have been killed") + } + }) +} + +func TestService_Run(t *testing.T) { + t.Run("returns output", func(t *testing.T) { + svc, _ := newTestService(t) + + output, err := svc.Run(context.Background(), "echo", "hello world") + require.NoError(t, err) + assert.Contains(t, output, "hello world") + }) + + t.Run("returns error on failure", func(t *testing.T) { + svc, _ := newTestService(t) + + _, err := svc.Run(context.Background(), "sh", "-c", "exit 1") + assert.Error(t, err) + assert.Contains(t, err.Error(), "exited with code 1") + }) +} + +func TestService_Actions(t *testing.T) { + t.Run("broadcasts events", func(t *testing.T) { + core, err := framework.New( + framework.WithName("process", NewService(Options{})), + ) + require.NoError(t, err) + + var started []ActionProcessStarted + var outputs []ActionProcessOutput + var exited []ActionProcessExited + var mu sync.Mutex + + core.RegisterAction(func(c *framework.Core, msg framework.Message) error { + mu.Lock() + defer mu.Unlock() + switch m := msg.(type) { + case ActionProcessStarted: + started = append(started, m) + case ActionProcessOutput: + outputs = append(outputs, m) + case ActionProcessExited: + exited = append(exited, m) + } + return nil + }) + + svc, _ := framework.ServiceFor[*Service](core, "process") + proc, err := svc.Start(context.Background(), "echo", "test") + require.NoError(t, err) + + <-proc.Done() + + // Give time for events to propagate + time.Sleep(10 * time.Millisecond) + + mu.Lock() + defer mu.Unlock() + + assert.Len(t, started, 1) + assert.Equal(t, "echo", started[0].Command) + assert.Equal(t, []string{"test"}, started[0].Args) + + assert.NotEmpty(t, outputs) + foundTest := false + for _, o := range outputs { + if strings.Contains(o.Line, "test") { + foundTest = true + break + } + } + assert.True(t, foundTest, "should have output containing 'test'") + + assert.Len(t, exited, 1) + assert.Equal(t, 0, exited[0].ExitCode) + }) +} + +func TestService_List(t *testing.T) { + t.Run("tracks processes", func(t *testing.T) { + svc, _ := newTestService(t) + + proc1, _ := svc.Start(context.Background(), "echo", "1") + proc2, _ := svc.Start(context.Background(), "echo", "2") + + <-proc1.Done() + <-proc2.Done() + + list := svc.List() + assert.Len(t, list, 2) + }) + + t.Run("get by id", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, _ := svc.Start(context.Background(), "echo", "test") + <-proc.Done() + + got, err := svc.Get(proc.ID) + require.NoError(t, err) + assert.Equal(t, proc.ID, got.ID) + }) + + t.Run("get not found", func(t *testing.T) { + svc, _ := newTestService(t) + + _, err := svc.Get("nonexistent") + assert.ErrorIs(t, err, ErrProcessNotFound) + }) +} + +func TestService_Remove(t *testing.T) { + t.Run("removes completed process", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, _ := svc.Start(context.Background(), "echo", "test") + <-proc.Done() + + err := svc.Remove(proc.ID) + require.NoError(t, err) + + _, err = svc.Get(proc.ID) + assert.ErrorIs(t, err, ErrProcessNotFound) + }) + + t.Run("cannot remove running process", func(t *testing.T) { + svc, _ := newTestService(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + proc, _ := svc.Start(ctx, "sleep", "10") + + err := svc.Remove(proc.ID) + assert.Error(t, err) + + cancel() + <-proc.Done() + }) +} + +func TestService_Clear(t *testing.T) { + t.Run("clears completed processes", func(t *testing.T) { + svc, _ := newTestService(t) + + proc1, _ := svc.Start(context.Background(), "echo", "1") + proc2, _ := svc.Start(context.Background(), "echo", "2") + + <-proc1.Done() + <-proc2.Done() + + assert.Len(t, svc.List(), 2) + + svc.Clear() + + assert.Len(t, svc.List(), 0) + }) +} + +func TestProcess_Info(t *testing.T) { + svc, _ := newTestService(t) + + proc, _ := svc.Start(context.Background(), "echo", "hello") + <-proc.Done() + + info := proc.Info() + assert.Equal(t, proc.ID, info.ID) + assert.Equal(t, "echo", info.Command) + assert.Equal(t, []string{"hello"}, info.Args) + assert.Equal(t, StatusExited, info.Status) + assert.Equal(t, 0, info.ExitCode) +} diff --git a/pkg/process/types.go b/pkg/process/types.go new file mode 100644 index 00000000..74e03a6d --- /dev/null +++ b/pkg/process/types.go @@ -0,0 +1,86 @@ +// Package process provides process management with Core IPC integration. +// +// The process package enables spawning, monitoring, and controlling external +// processes with output streaming via the Core ACTION system. +// +// # Getting Started +// +// // Register with Core +// core, _ := framework.New( +// framework.WithName("process", process.NewService(process.Options{})), +// ) +// +// // Get service and run a process +// svc := framework.MustServiceFor[*process.Service](core, "process") +// proc, _ := svc.Start(ctx, "go", "test", "./...") +// +// # Listening for Events +// +// Process events are broadcast via Core.ACTION: +// +// core.RegisterAction(func(c *framework.Core, msg framework.Message) error { +// switch m := msg.(type) { +// case process.ActionProcessOutput: +// fmt.Print(m.Line) +// case process.ActionProcessExited: +// fmt.Printf("Exit code: %d\n", m.ExitCode) +// } +// return nil +// }) +package process + +import "time" + +// Status represents the process lifecycle state. +type Status string + +const ( + // StatusPending indicates the process is queued but not yet started. + StatusPending Status = "pending" + // StatusRunning indicates the process is actively executing. + StatusRunning Status = "running" + // StatusExited indicates the process completed (check ExitCode). + StatusExited Status = "exited" + // StatusFailed indicates the process could not be started. + StatusFailed Status = "failed" + // StatusKilled indicates the process was terminated by signal. + StatusKilled Status = "killed" +) + +// Stream identifies the output source. +type Stream string + +const ( + // StreamStdout is standard output. + StreamStdout Stream = "stdout" + // StreamStderr is standard error. + StreamStderr Stream = "stderr" +) + +// RunOptions configures process execution. +type RunOptions struct { + // Command is the executable to run. + Command string + // Args are the command arguments. + Args []string + // Dir is the working directory (empty = current). + Dir string + // Env are additional environment variables (KEY=VALUE format). + Env []string + // DisableCapture disables output buffering. + // By default, output is captured to a ring buffer. + DisableCapture bool +} + +// Info provides a snapshot of process state without internal fields. +type Info struct { + ID string `json:"id"` + Command string `json:"command"` + Args []string `json:"args"` + Dir string `json:"dir"` + StartedAt time.Time `json:"startedAt"` + Status Status `json:"status"` + ExitCode int `json:"exitCode"` + Duration time.Duration `json:"duration"` + PID int `json:"pid"` +}