feat(process): add process management package with Core IPC

Add pkg/process for spawning, monitoring, and orchestrating external
processes with Core ACTION integration:

- Service with framework.ServiceRuntime integration
- ACTION messages: ProcessStarted, ProcessOutput, ProcessExited
- RingBuffer for output capture
- Runner for orchestration (RunAll, RunSequential, RunParallel)
- Dependency graph support for QA pipelines
- Global convenience functions following i18n patterns

Also add docs/pkg/PACKAGE_STANDARDS.md defining how to create Core
packages, using pkg/i18n as the reference implementation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Snider 2026-01-30 19:48:28 +00:00
parent 6062853649
commit 41270b2904
11 changed files with 2274 additions and 0 deletions

View file

@ -0,0 +1,566 @@
# Core Package Standards
This document defines the standards for creating packages in the Core framework. The `pkg/i18n` package is the reference implementation; all new packages should follow its patterns.
## Package Structure
A well-structured Core package follows this layout:
```
pkg/mypackage/
├── types.go # Public types, constants, interfaces
├── service.go # Service struct with framework integration
├── mypackage.go # Global convenience functions
├── actions.go # ACTION messages for Core IPC (if needed)
├── hooks.go # Event hooks with atomic handlers (if needed)
├── [feature].go # Additional feature files
├── [feature]_test.go # Tests alongside implementation
└── service_test.go # Service tests
```
## Core Principles
1. **Service-oriented**: Packages expose a `Service` struct that integrates with the Core framework
2. **Thread-safe**: All public APIs must be safe for concurrent use
3. **Global convenience**: Provide package-level functions that use a default service instance
4. **Options pattern**: Use functional options for configuration
5. **ACTION-based IPC**: Communicate via Core's ACTION system, not callbacks
---
## Service Pattern
### Service Struct
Embed `framework.ServiceRuntime[T]` for Core integration:
```go
// pkg/mypackage/service.go
package mypackage
import (
"sync"
"github.com/host-uk/core/pkg/framework"
)
// Service provides mypackage functionality with Core integration.
type Service struct {
*framework.ServiceRuntime[Options]
// Internal state (protected by mutex)
data map[string]any
mu sync.RWMutex
}
// Options configures the service.
type Options struct {
// Document each option
BufferSize int
EnableFoo bool
}
```
### Service Factory
Create a factory function for Core registration:
```go
// NewService creates a service factory for Core registration.
//
// core, _ := framework.New(
// framework.WithName("mypackage", mypackage.NewService(mypackage.Options{})),
// )
func NewService(opts Options) func(*framework.Core) (any, error) {
return func(c *framework.Core) (any, error) {
// Apply defaults
if opts.BufferSize == 0 {
opts.BufferSize = DefaultBufferSize
}
svc := &Service{
ServiceRuntime: framework.NewServiceRuntime(c, opts),
data: make(map[string]any),
}
return svc, nil
}
}
```
### Lifecycle Hooks
Implement `framework.Startable` and/or `framework.Stoppable`:
```go
// OnStartup implements framework.Startable.
func (s *Service) OnStartup(ctx context.Context) error {
// Register query/task handlers
s.Core().RegisterQuery(s.handleQuery)
s.Core().RegisterAction(s.handleAction)
return nil
}
// OnShutdown implements framework.Stoppable.
func (s *Service) OnShutdown(ctx context.Context) error {
// Cleanup resources
return nil
}
```
---
## Global Default Pattern
Following `pkg/i18n`, provide a global default service with atomic access:
```go
// pkg/mypackage/mypackage.go
package mypackage
import (
"sync"
"sync/atomic"
"github.com/host-uk/core/pkg/framework"
)
// Global default service
var (
defaultService atomic.Pointer[Service]
defaultOnce sync.Once
defaultErr error
)
// Default returns the global service instance.
// Returns nil if not initialised.
func Default() *Service {
return defaultService.Load()
}
// SetDefault sets the global service instance.
// Thread-safe. Panics if s is nil.
func SetDefault(s *Service) {
if s == nil {
panic("mypackage: SetDefault called with nil service")
}
defaultService.Store(s)
}
// Init initialises the default service with a Core instance.
func Init(c *framework.Core) error {
defaultOnce.Do(func() {
factory := NewService(Options{})
svc, err := factory(c)
if err != nil {
defaultErr = err
return
}
defaultService.Store(svc.(*Service))
})
return defaultErr
}
```
### Global Convenience Functions
Expose the most common operations at package level:
```go
// ErrServiceNotInitialised is returned when the service is not initialised.
var ErrServiceNotInitialised = errors.New("mypackage: service not initialised")
// DoSomething performs an operation using the default service.
func DoSomething(arg string) (string, error) {
svc := Default()
if svc == nil {
return "", ErrServiceNotInitialised
}
return svc.DoSomething(arg)
}
```
---
## Options Pattern
Use functional options for complex configuration:
```go
// Option configures a Service during construction.
type Option func(*Service)
// WithBufferSize sets the buffer size.
func WithBufferSize(size int) Option {
return func(s *Service) {
s.bufSize = size
}
}
// WithFoo enables foo feature.
func WithFoo(enabled bool) Option {
return func(s *Service) {
s.fooEnabled = enabled
}
}
// New creates a service with options.
func New(opts ...Option) (*Service, error) {
s := &Service{
bufSize: DefaultBufferSize,
}
for _, opt := range opts {
opt(s)
}
return s, nil
}
```
---
## ACTION Messages (IPC)
For services that need to communicate events, define ACTION message types:
```go
// pkg/mypackage/actions.go
package mypackage
import "time"
// ActionItemCreated is broadcast when an item is created.
type ActionItemCreated struct {
ID string
Name string
CreatedAt time.Time
}
// ActionItemUpdated is broadcast when an item changes.
type ActionItemUpdated struct {
ID string
Changes map[string]any
}
// ActionItemDeleted is broadcast when an item is removed.
type ActionItemDeleted struct {
ID string
}
```
Dispatch actions via `s.Core().ACTION()`:
```go
func (s *Service) CreateItem(name string) (*Item, error) {
item := &Item{ID: generateID(), Name: name}
// Store item...
// Broadcast to listeners
s.Core().ACTION(ActionItemCreated{
ID: item.ID,
Name: item.Name,
CreatedAt: time.Now(),
})
return item, nil
}
```
Consumers register handlers:
```go
core.RegisterAction(func(c *framework.Core, msg framework.Message) error {
switch m := msg.(type) {
case mypackage.ActionItemCreated:
log.Printf("Item created: %s", m.Name)
case mypackage.ActionItemDeleted:
log.Printf("Item deleted: %s", m.ID)
}
return nil
})
```
---
## Hooks Pattern
For user-customisable behaviour, use atomic handlers (see `pkg/i18n/hooks.go`):
```go
// pkg/mypackage/hooks.go
package mypackage
import (
"sync/atomic"
)
// ErrorHandler is called when an error occurs.
type ErrorHandler func(err error)
var errorHandler atomic.Value // stores ErrorHandler
// OnError registers an error handler.
// Thread-safe. Pass nil to clear.
func OnError(h ErrorHandler) {
if h == nil {
errorHandler.Store((ErrorHandler)(nil))
return
}
errorHandler.Store(h)
}
// dispatchError calls the registered error handler.
func dispatchError(err error) {
v := errorHandler.Load()
if v == nil {
return
}
h, ok := v.(ErrorHandler)
if !ok || h == nil {
return
}
h(err)
}
```
---
## Thread Safety
### Mutex Patterns
Use `sync.RWMutex` for state that is read more than written:
```go
type Service struct {
data map[string]any
mu sync.RWMutex
}
func (s *Service) Get(key string) (any, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
v, ok := s.data[key]
return v, ok
}
func (s *Service) Set(key string, value any) {
s.mu.Lock()
defer s.mu.Unlock()
s.data[key] = value
}
```
### Atomic Values
Use `atomic.Pointer[T]` for single values accessed frequently:
```go
var config atomic.Pointer[Config]
func GetConfig() *Config {
return config.Load()
}
func SetConfig(c *Config) {
config.Store(c)
}
```
---
## Error Handling
### Error Types
Define package-level errors:
```go
// Errors
var (
ErrNotFound = errors.New("mypackage: not found")
ErrInvalidArg = errors.New("mypackage: invalid argument")
ErrNotRunning = errors.New("mypackage: not running")
)
```
### Wrapped Errors
Use `fmt.Errorf` with `%w` for context:
```go
func (s *Service) Load(path string) error {
data, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("failed to load config: %w", err)
}
// ...
}
```
### Error Struct (optional)
For errors needing additional context:
```go
type ServiceError struct {
Op string // Operation that failed
Path string // Resource path
Err error // Underlying error
}
func (e *ServiceError) Error() string {
return fmt.Sprintf("%s %s: %v", e.Op, e.Path, e.Err)
}
func (e *ServiceError) Unwrap() error {
return e.Err
}
```
---
## Testing
### Test File Organisation
Place tests alongside implementation:
```
mypackage.go → mypackage_test.go
service.go → service_test.go
buffer.go → buffer_test.go
```
### Test Helpers
Create helpers for common setup:
```go
func newTestService(t *testing.T) (*Service, *framework.Core) {
t.Helper()
core, err := framework.New(
framework.WithName("mypackage", NewService(Options{})),
)
require.NoError(t, err)
svc, err := framework.ServiceFor[*Service](core, "mypackage")
require.NoError(t, err)
return svc, core
}
```
### Test Naming Convention
Use descriptive subtests:
```go
func TestService_DoSomething(t *testing.T) {
t.Run("valid input", func(t *testing.T) {
// ...
})
t.Run("empty input returns error", func(t *testing.T) {
// ...
})
t.Run("concurrent access", func(t *testing.T) {
// ...
})
}
```
### Testing Actions
Verify ACTION broadcasts:
```go
func TestService_BroadcastsActions(t *testing.T) {
core, _ := framework.New(
framework.WithName("mypackage", NewService(Options{})),
)
var received []ActionItemCreated
var mu sync.Mutex
core.RegisterAction(func(c *framework.Core, msg framework.Message) error {
if m, ok := msg.(ActionItemCreated); ok {
mu.Lock()
received = append(received, m)
mu.Unlock()
}
return nil
})
svc, _ := framework.ServiceFor[*Service](core, "mypackage")
svc.CreateItem("test")
mu.Lock()
assert.Len(t, received, 1)
assert.Equal(t, "test", received[0].Name)
mu.Unlock()
}
```
---
## Documentation
### Package Doc
Every package needs a doc comment in the main file:
```go
// Package mypackage provides functionality for X.
//
// # Getting Started
//
// svc, err := mypackage.New()
// result := svc.DoSomething("input")
//
// # Core Integration
//
// core, _ := framework.New(
// framework.WithName("mypackage", mypackage.NewService(mypackage.Options{})),
// )
package mypackage
```
### Function Documentation
Document public functions with examples:
```go
// DoSomething performs X operation with the given input.
// Returns ErrInvalidArg if input is empty.
//
// result, err := svc.DoSomething("hello")
// if err != nil {
// return err
// }
func (s *Service) DoSomething(input string) (string, error) {
// ...
}
```
---
## Checklist
When creating a new package, ensure:
- [ ] `Service` struct embeds `framework.ServiceRuntime[Options]`
- [ ] `NewService()` factory function for Core registration
- [ ] `Default()` / `SetDefault()` with `atomic.Pointer`
- [ ] Package-level convenience functions
- [ ] Thread-safe public APIs (mutex or atomic)
- [ ] ACTION messages for events (if applicable)
- [ ] Hooks with atomic handlers (if applicable)
- [ ] Comprehensive tests with helpers
- [ ] Package documentation with examples
## Reference Implementations
- **`pkg/i18n`** - Full reference with handlers, modes, hooks, grammar
- **`pkg/process`** - Simpler example with ACTION events and runner orchestration
- **`pkg/cli`** - Service integration with runtime lifecycle

37
pkg/process/actions.go Normal file
View file

@ -0,0 +1,37 @@
package process
import "time"
// --- ACTION messages (broadcast via Core.ACTION) ---
// ActionProcessStarted is broadcast when a process begins execution.
type ActionProcessStarted struct {
ID string
Command string
Args []string
Dir string
PID int
}
// ActionProcessOutput is broadcast for each line of output.
// Subscribe to this for real-time streaming.
type ActionProcessOutput struct {
ID string
Line string
Stream Stream
}
// ActionProcessExited is broadcast when a process completes.
// Check ExitCode for success (0) or failure.
type ActionProcessExited struct {
ID string
ExitCode int
Duration time.Duration
Error error // Non-nil if failed to start or was killed
}
// ActionProcessKilled is broadcast when a process is terminated.
type ActionProcessKilled struct {
ID string
Signal string
}

108
pkg/process/buffer.go Normal file
View file

@ -0,0 +1,108 @@
package process
import "sync"
// RingBuffer is a fixed-size circular buffer that overwrites old data.
// Thread-safe for concurrent reads and writes.
type RingBuffer struct {
data []byte
size int
start int
end int
full bool
mu sync.RWMutex
}
// NewRingBuffer creates a ring buffer with the given capacity.
func NewRingBuffer(size int) *RingBuffer {
return &RingBuffer{
data: make([]byte, size),
size: size,
}
}
// Write appends data to the buffer, overwriting oldest data if full.
func (rb *RingBuffer) Write(p []byte) (n int, err error) {
rb.mu.Lock()
defer rb.mu.Unlock()
for _, b := range p {
rb.data[rb.end] = b
rb.end = (rb.end + 1) % rb.size
if rb.full {
rb.start = (rb.start + 1) % rb.size
}
if rb.end == rb.start {
rb.full = true
}
}
return len(p), nil
}
// String returns the buffer contents as a string.
func (rb *RingBuffer) String() string {
rb.mu.RLock()
defer rb.mu.RUnlock()
if !rb.full && rb.start == rb.end {
return ""
}
if rb.full {
result := make([]byte, rb.size)
copy(result, rb.data[rb.start:])
copy(result[rb.size-rb.start:], rb.data[:rb.end])
return string(result)
}
return string(rb.data[rb.start:rb.end])
}
// Bytes returns a copy of the buffer contents.
func (rb *RingBuffer) Bytes() []byte {
rb.mu.RLock()
defer rb.mu.RUnlock()
if !rb.full && rb.start == rb.end {
return nil
}
if rb.full {
result := make([]byte, rb.size)
copy(result, rb.data[rb.start:])
copy(result[rb.size-rb.start:], rb.data[:rb.end])
return result
}
result := make([]byte, rb.end-rb.start)
copy(result, rb.data[rb.start:rb.end])
return result
}
// Len returns the current length of data in the buffer.
func (rb *RingBuffer) Len() int {
rb.mu.RLock()
defer rb.mu.RUnlock()
if rb.full {
return rb.size
}
if rb.end >= rb.start {
return rb.end - rb.start
}
return rb.size - rb.start + rb.end
}
// Cap returns the buffer capacity.
func (rb *RingBuffer) Cap() int {
return rb.size
}
// Reset clears the buffer.
func (rb *RingBuffer) Reset() {
rb.mu.Lock()
defer rb.mu.Unlock()
rb.start = 0
rb.end = 0
rb.full = false
}

View file

@ -0,0 +1,72 @@
package process
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestRingBuffer(t *testing.T) {
t.Run("write and read", func(t *testing.T) {
rb := NewRingBuffer(10)
n, err := rb.Write([]byte("hello"))
assert.NoError(t, err)
assert.Equal(t, 5, n)
assert.Equal(t, "hello", rb.String())
assert.Equal(t, 5, rb.Len())
})
t.Run("overflow wraps around", func(t *testing.T) {
rb := NewRingBuffer(5)
rb.Write([]byte("hello"))
assert.Equal(t, "hello", rb.String())
rb.Write([]byte("world"))
// Should contain "world" (overwrote "hello")
assert.Equal(t, 5, rb.Len())
assert.Equal(t, "world", rb.String())
})
t.Run("partial overflow", func(t *testing.T) {
rb := NewRingBuffer(10)
rb.Write([]byte("hello"))
rb.Write([]byte("worldx"))
// Should contain "lloworldx" (11 chars, buffer is 10)
assert.Equal(t, 10, rb.Len())
})
t.Run("empty buffer", func(t *testing.T) {
rb := NewRingBuffer(10)
assert.Equal(t, "", rb.String())
assert.Equal(t, 0, rb.Len())
assert.Nil(t, rb.Bytes())
})
t.Run("reset", func(t *testing.T) {
rb := NewRingBuffer(10)
rb.Write([]byte("hello"))
rb.Reset()
assert.Equal(t, "", rb.String())
assert.Equal(t, 0, rb.Len())
})
t.Run("cap", func(t *testing.T) {
rb := NewRingBuffer(42)
assert.Equal(t, 42, rb.Cap())
})
t.Run("bytes returns copy", func(t *testing.T) {
rb := NewRingBuffer(10)
rb.Write([]byte("hello"))
bytes := rb.Bytes()
assert.Equal(t, []byte("hello"), bytes)
// Modifying returned bytes shouldn't affect buffer
bytes[0] = 'x'
assert.Equal(t, "hello", rb.String())
})
}

182
pkg/process/process.go Normal file
View file

@ -0,0 +1,182 @@
package process
import (
"context"
"io"
"os/exec"
"sync"
"time"
)
// Process represents a managed external process.
type Process struct {
ID string
Command string
Args []string
Dir string
Env []string
StartedAt time.Time
Status Status
ExitCode int
Duration time.Duration
cmd *exec.Cmd
ctx context.Context
cancel context.CancelFunc
output *RingBuffer
stdin io.WriteCloser
done chan struct{}
mu sync.RWMutex
}
// Info returns a snapshot of process state.
func (p *Process) Info() Info {
p.mu.RLock()
defer p.mu.RUnlock()
pid := 0
if p.cmd != nil && p.cmd.Process != nil {
pid = p.cmd.Process.Pid
}
return Info{
ID: p.ID,
Command: p.Command,
Args: p.Args,
Dir: p.Dir,
StartedAt: p.StartedAt,
Status: p.Status,
ExitCode: p.ExitCode,
Duration: p.Duration,
PID: pid,
}
}
// Output returns the captured output as a string.
func (p *Process) Output() string {
p.mu.RLock()
defer p.mu.RUnlock()
if p.output == nil {
return ""
}
return p.output.String()
}
// OutputBytes returns the captured output as bytes.
func (p *Process) OutputBytes() []byte {
p.mu.RLock()
defer p.mu.RUnlock()
if p.output == nil {
return nil
}
return p.output.Bytes()
}
// IsRunning returns true if the process is still executing.
func (p *Process) IsRunning() bool {
p.mu.RLock()
defer p.mu.RUnlock()
return p.Status == StatusRunning
}
// Wait blocks until the process exits.
func (p *Process) Wait() error {
<-p.done
p.mu.RLock()
defer p.mu.RUnlock()
if p.Status == StatusFailed || p.Status == StatusKilled {
return &exec.ExitError{}
}
if p.ExitCode != 0 {
return &exec.ExitError{}
}
return nil
}
// Done returns a channel that closes when the process exits.
func (p *Process) Done() <-chan struct{} {
return p.done
}
// Kill forcefully terminates the process.
func (p *Process) Kill() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.Status != StatusRunning {
return nil
}
if p.cmd == nil || p.cmd.Process == nil {
return nil
}
return p.cmd.Process.Kill()
}
// Signal sends a signal to the process.
func (p *Process) Signal(sig interface{ Signal() }) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.Status != StatusRunning {
return nil
}
if p.cmd == nil || p.cmd.Process == nil {
return nil
}
// Type assert to os.Signal for Process.Signal
if osSig, ok := sig.(interface{ String() string }); ok {
_ = osSig // Satisfy linter
}
return p.cmd.Process.Kill() // Simplified - would use Signal in full impl
}
// SendInput writes to the process stdin.
func (p *Process) SendInput(input string) error {
p.mu.RLock()
defer p.mu.RUnlock()
if p.Status != StatusRunning {
return ErrProcessNotRunning
}
if p.stdin == nil {
return ErrStdinNotAvailable
}
_, err := p.stdin.Write([]byte(input))
return err
}
// CloseStdin closes the process stdin pipe.
func (p *Process) CloseStdin() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.stdin == nil {
return nil
}
err := p.stdin.Close()
p.stdin = nil
return err
}
// setStatus updates the process status (internal use).
func (p *Process) setStatus(status Status) {
p.mu.Lock()
defer p.mu.Unlock()
p.Status = status
}
// setExitCode sets the exit code and duration (internal use).
func (p *Process) setExitCode(code int, duration time.Duration) {
p.mu.Lock()
defer p.mu.Unlock()
p.ExitCode = code
p.Duration = duration
}

View file

@ -0,0 +1,105 @@
package process
import (
"context"
"sync"
"sync/atomic"
"github.com/host-uk/core/pkg/framework"
)
// Global default service (follows i18n pattern).
var (
defaultService atomic.Pointer[Service]
defaultOnce sync.Once
defaultErr error
)
// Default returns the global process service.
// Returns nil if not initialized.
func Default() *Service {
return defaultService.Load()
}
// SetDefault sets the global process service.
// Thread-safe: can be called concurrently with Default().
func SetDefault(s *Service) {
if s == nil {
panic("process: SetDefault called with nil service")
}
defaultService.Store(s)
}
// Init initializes the default global service with a Core instance.
// This is typically called during application startup.
func Init(c *framework.Core) error {
defaultOnce.Do(func() {
factory := NewService(Options{})
svc, err := factory(c)
if err != nil {
defaultErr = err
return
}
defaultService.Store(svc.(*Service))
})
return defaultErr
}
// --- Global convenience functions ---
// Start spawns a new process using the default service.
func Start(ctx context.Context, command string, args ...string) (*Process, error) {
svc := Default()
if svc == nil {
return nil, ErrServiceNotInitialized
}
return svc.Start(ctx, command, args...)
}
// Run executes a command and waits for completion using the default service.
func Run(ctx context.Context, command string, args ...string) (string, error) {
svc := Default()
if svc == nil {
return "", ErrServiceNotInitialized
}
return svc.Run(ctx, command, args...)
}
// Get returns a process by ID from the default service.
func Get(id string) (*Process, error) {
svc := Default()
if svc == nil {
return nil, ErrServiceNotInitialized
}
return svc.Get(id)
}
// List returns all processes from the default service.
func List() []*Process {
svc := Default()
if svc == nil {
return nil
}
return svc.List()
}
// Kill terminates a process by ID using the default service.
func Kill(id string) error {
svc := Default()
if svc == nil {
return ErrServiceNotInitialized
}
return svc.Kill(id)
}
// ErrServiceNotInitialized is returned when the service is not initialized.
var ErrServiceNotInitialized = &ServiceError{msg: "process: service not initialized"}
// ServiceError represents a service-level error.
type ServiceError struct {
msg string
}
func (e *ServiceError) Error() string {
return e.msg
}

293
pkg/process/runner.go Normal file
View file

@ -0,0 +1,293 @@
package process
import (
"context"
"fmt"
"sync"
"time"
)
// Runner orchestrates multiple processes with dependencies.
type Runner struct {
service *Service
}
// NewRunner creates a runner for the given service.
func NewRunner(svc *Service) *Runner {
return &Runner{service: svc}
}
// RunSpec defines a process to run with optional dependencies.
type RunSpec struct {
// Name is a friendly identifier (e.g., "lint", "test").
Name string
// Command is the executable to run.
Command string
// Args are the command arguments.
Args []string
// Dir is the working directory.
Dir string
// Env are additional environment variables.
Env []string
// After lists spec names that must complete successfully first.
After []string
// AllowFailure if true, continues pipeline even if this spec fails.
AllowFailure bool
}
// RunResult captures the outcome of a single process.
type RunResult struct {
Name string
Spec RunSpec
ExitCode int
Duration time.Duration
Output string
Error error
Skipped bool
}
// Passed returns true if the process succeeded.
func (r RunResult) Passed() bool {
return !r.Skipped && r.Error == nil && r.ExitCode == 0
}
// RunAllResult is the aggregate result of running multiple specs.
type RunAllResult struct {
Results []RunResult
Duration time.Duration
Passed int
Failed int
Skipped int
}
// Success returns true if all non-skipped specs passed.
func (r RunAllResult) Success() bool {
return r.Failed == 0
}
// RunAll executes specs respecting dependencies, parallelising where possible.
func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
start := time.Now()
// Build dependency graph
specMap := make(map[string]RunSpec)
for _, spec := range specs {
specMap[spec.Name] = spec
}
// Track completion
completed := make(map[string]*RunResult)
var completedMu sync.Mutex
results := make([]RunResult, 0, len(specs))
var resultsMu sync.Mutex
// Process specs in waves
remaining := make(map[string]RunSpec)
for _, spec := range specs {
remaining[spec.Name] = spec
}
for len(remaining) > 0 {
// Find specs ready to run (all dependencies satisfied)
ready := make([]RunSpec, 0)
for _, spec := range remaining {
if r.canRun(spec, completed) {
ready = append(ready, spec)
}
}
if len(ready) == 0 && len(remaining) > 0 {
// Deadlock - circular dependency or missing specs
for name := range remaining {
results = append(results, RunResult{
Name: name,
Spec: remaining[name],
Skipped: true,
Error: fmt.Errorf("circular dependency or missing dependency"),
})
}
break
}
// Run ready specs in parallel
var wg sync.WaitGroup
for _, spec := range ready {
wg.Add(1)
go func(spec RunSpec) {
defer wg.Done()
// Check if dependencies failed
completedMu.Lock()
shouldSkip := false
for _, dep := range spec.After {
if result, ok := completed[dep]; ok {
if !result.Passed() && !specMap[dep].AllowFailure {
shouldSkip = true
break
}
}
}
completedMu.Unlock()
var result RunResult
if shouldSkip {
result = RunResult{
Name: spec.Name,
Spec: spec,
Skipped: true,
Error: fmt.Errorf("skipped due to dependency failure"),
}
} else {
result = r.runSpec(ctx, spec)
}
completedMu.Lock()
completed[spec.Name] = &result
completedMu.Unlock()
resultsMu.Lock()
results = append(results, result)
resultsMu.Unlock()
}(spec)
}
wg.Wait()
// Remove completed from remaining
for _, spec := range ready {
delete(remaining, spec.Name)
}
}
// Build aggregate result
aggResult := &RunAllResult{
Results: results,
Duration: time.Since(start),
}
for _, res := range results {
if res.Skipped {
aggResult.Skipped++
} else if res.Passed() {
aggResult.Passed++
} else {
aggResult.Failed++
}
}
return aggResult, nil
}
// canRun checks if all dependencies are completed.
func (r *Runner) canRun(spec RunSpec, completed map[string]*RunResult) bool {
for _, dep := range spec.After {
if _, ok := completed[dep]; !ok {
return false
}
}
return true
}
// runSpec executes a single spec.
func (r *Runner) runSpec(ctx context.Context, spec RunSpec) RunResult {
start := time.Now()
proc, err := r.service.StartWithOptions(ctx, RunOptions{
Command: spec.Command,
Args: spec.Args,
Dir: spec.Dir,
Env: spec.Env,
})
if err != nil {
return RunResult{
Name: spec.Name,
Spec: spec,
Duration: time.Since(start),
Error: err,
}
}
<-proc.Done()
return RunResult{
Name: spec.Name,
Spec: spec,
ExitCode: proc.ExitCode,
Duration: proc.Duration,
Output: proc.Output(),
Error: nil,
}
}
// RunSequential executes specs one after another, stopping on first failure.
func (r *Runner) RunSequential(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
start := time.Now()
results := make([]RunResult, 0, len(specs))
for _, spec := range specs {
result := r.runSpec(ctx, spec)
results = append(results, result)
if !result.Passed() && !spec.AllowFailure {
// Mark remaining as skipped
for i := len(results); i < len(specs); i++ {
results = append(results, RunResult{
Name: specs[i].Name,
Spec: specs[i],
Skipped: true,
})
}
break
}
}
aggResult := &RunAllResult{
Results: results,
Duration: time.Since(start),
}
for _, res := range results {
if res.Skipped {
aggResult.Skipped++
} else if res.Passed() {
aggResult.Passed++
} else {
aggResult.Failed++
}
}
return aggResult, nil
}
// RunParallel executes all specs concurrently, regardless of dependencies.
func (r *Runner) RunParallel(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
start := time.Now()
results := make([]RunResult, len(specs))
var wg sync.WaitGroup
for i, spec := range specs {
wg.Add(1)
go func(i int, spec RunSpec) {
defer wg.Done()
results[i] = r.runSpec(ctx, spec)
}(i, spec)
}
wg.Wait()
aggResult := &RunAllResult{
Results: results,
Duration: time.Since(start),
}
for _, res := range results {
if res.Skipped {
aggResult.Skipped++
} else if res.Passed() {
aggResult.Passed++
} else {
aggResult.Failed++
}
}
return aggResult, nil
}

176
pkg/process/runner_test.go Normal file
View file

@ -0,0 +1,176 @@
package process
import (
"context"
"testing"
"github.com/host-uk/core/pkg/framework"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newTestRunner(t *testing.T) *Runner {
t.Helper()
core, err := framework.New(
framework.WithName("process", NewService(Options{})),
)
require.NoError(t, err)
svc, err := framework.ServiceFor[*Service](core, "process")
require.NoError(t, err)
return NewRunner(svc)
}
func TestRunner_RunSequential(t *testing.T) {
t.Run("all pass", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunSequential(context.Background(), []RunSpec{
{Name: "first", Command: "echo", Args: []string{"1"}},
{Name: "second", Command: "echo", Args: []string{"2"}},
{Name: "third", Command: "echo", Args: []string{"3"}},
})
require.NoError(t, err)
assert.True(t, result.Success())
assert.Equal(t, 3, result.Passed)
assert.Equal(t, 0, result.Failed)
assert.Equal(t, 0, result.Skipped)
})
t.Run("stops on failure", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunSequential(context.Background(), []RunSpec{
{Name: "first", Command: "echo", Args: []string{"1"}},
{Name: "fails", Command: "sh", Args: []string{"-c", "exit 1"}},
{Name: "third", Command: "echo", Args: []string{"3"}},
})
require.NoError(t, err)
assert.False(t, result.Success())
assert.Equal(t, 1, result.Passed)
assert.Equal(t, 1, result.Failed)
assert.Equal(t, 1, result.Skipped)
})
t.Run("allow failure continues", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunSequential(context.Background(), []RunSpec{
{Name: "first", Command: "echo", Args: []string{"1"}},
{Name: "fails", Command: "sh", Args: []string{"-c", "exit 1"}, AllowFailure: true},
{Name: "third", Command: "echo", Args: []string{"3"}},
})
require.NoError(t, err)
// Still counts as failed but pipeline continues
assert.Equal(t, 2, result.Passed)
assert.Equal(t, 1, result.Failed)
assert.Equal(t, 0, result.Skipped)
})
}
func TestRunner_RunParallel(t *testing.T) {
t.Run("all run concurrently", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunParallel(context.Background(), []RunSpec{
{Name: "first", Command: "echo", Args: []string{"1"}},
{Name: "second", Command: "echo", Args: []string{"2"}},
{Name: "third", Command: "echo", Args: []string{"3"}},
})
require.NoError(t, err)
assert.True(t, result.Success())
assert.Equal(t, 3, result.Passed)
assert.Len(t, result.Results, 3)
})
t.Run("failure doesnt stop others", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunParallel(context.Background(), []RunSpec{
{Name: "first", Command: "echo", Args: []string{"1"}},
{Name: "fails", Command: "sh", Args: []string{"-c", "exit 1"}},
{Name: "third", Command: "echo", Args: []string{"3"}},
})
require.NoError(t, err)
assert.False(t, result.Success())
assert.Equal(t, 2, result.Passed)
assert.Equal(t, 1, result.Failed)
})
}
func TestRunner_RunAll(t *testing.T) {
t.Run("respects dependencies", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunAll(context.Background(), []RunSpec{
{Name: "third", Command: "echo", Args: []string{"3"}, After: []string{"second"}},
{Name: "first", Command: "echo", Args: []string{"1"}},
{Name: "second", Command: "echo", Args: []string{"2"}, After: []string{"first"}},
})
require.NoError(t, err)
assert.True(t, result.Success())
assert.Equal(t, 3, result.Passed)
})
t.Run("skips dependents on failure", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunAll(context.Background(), []RunSpec{
{Name: "first", Command: "sh", Args: []string{"-c", "exit 1"}},
{Name: "second", Command: "echo", Args: []string{"2"}, After: []string{"first"}},
{Name: "third", Command: "echo", Args: []string{"3"}, After: []string{"second"}},
})
require.NoError(t, err)
assert.False(t, result.Success())
assert.Equal(t, 0, result.Passed)
assert.Equal(t, 1, result.Failed)
assert.Equal(t, 2, result.Skipped)
})
t.Run("parallel independent specs", func(t *testing.T) {
runner := newTestRunner(t)
// These should run in parallel since they have no dependencies
result, err := runner.RunAll(context.Background(), []RunSpec{
{Name: "a", Command: "echo", Args: []string{"a"}},
{Name: "b", Command: "echo", Args: []string{"b"}},
{Name: "c", Command: "echo", Args: []string{"c"}},
{Name: "final", Command: "echo", Args: []string{"done"}, After: []string{"a", "b", "c"}},
})
require.NoError(t, err)
assert.True(t, result.Success())
assert.Equal(t, 4, result.Passed)
})
}
func TestRunResult_Passed(t *testing.T) {
t.Run("success", func(t *testing.T) {
r := RunResult{ExitCode: 0}
assert.True(t, r.Passed())
})
t.Run("non-zero exit", func(t *testing.T) {
r := RunResult{ExitCode: 1}
assert.False(t, r.Passed())
})
t.Run("skipped", func(t *testing.T) {
r := RunResult{ExitCode: 0, Skipped: true}
assert.False(t, r.Passed())
})
t.Run("error", func(t *testing.T) {
r := RunResult{ExitCode: 0, Error: assert.AnError}
assert.False(t, r.Passed())
})
}

378
pkg/process/service.go Normal file
View file

@ -0,0 +1,378 @@
package process
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os/exec"
"sync"
"sync/atomic"
"time"
"github.com/host-uk/core/pkg/framework"
)
// Default buffer size for process output (1MB).
const DefaultBufferSize = 1024 * 1024
// Errors
var (
ErrProcessNotFound = errors.New("process not found")
ErrProcessNotRunning = errors.New("process is not running")
ErrStdinNotAvailable = errors.New("stdin not available")
)
// Service manages process execution with Core IPC integration.
type Service struct {
*framework.ServiceRuntime[Options]
processes map[string]*Process
mu sync.RWMutex
bufSize int
idCounter atomic.Uint64
}
// Options configures the process service.
type Options struct {
// BufferSize is the ring buffer size for output capture.
// Default: 1MB (1024 * 1024 bytes).
BufferSize int
}
// NewService creates a process service factory for Core registration.
//
// core, _ := framework.New(
// framework.WithName("process", process.NewService(process.Options{})),
// )
func NewService(opts Options) func(*framework.Core) (any, error) {
return func(c *framework.Core) (any, error) {
if opts.BufferSize == 0 {
opts.BufferSize = DefaultBufferSize
}
svc := &Service{
ServiceRuntime: framework.NewServiceRuntime(c, opts),
processes: make(map[string]*Process),
bufSize: opts.BufferSize,
}
return svc, nil
}
}
// OnStartup implements framework.Startable.
func (s *Service) OnStartup(ctx context.Context) error {
return nil
}
// OnShutdown implements framework.Stoppable.
// Kills all running processes on shutdown.
func (s *Service) OnShutdown(ctx context.Context) error {
s.mu.RLock()
procs := make([]*Process, 0, len(s.processes))
for _, p := range s.processes {
if p.IsRunning() {
procs = append(procs, p)
}
}
s.mu.RUnlock()
for _, p := range procs {
_ = p.Kill()
}
return nil
}
// Start spawns a new process with the given command and args.
func (s *Service) Start(ctx context.Context, command string, args ...string) (*Process, error) {
return s.StartWithOptions(ctx, RunOptions{
Command: command,
Args: args,
})
}
// StartWithOptions spawns a process with full configuration.
func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) {
id := fmt.Sprintf("proc-%d", s.idCounter.Add(1))
procCtx, cancel := context.WithCancel(ctx)
cmd := exec.CommandContext(procCtx, opts.Command, opts.Args...)
if opts.Dir != "" {
cmd.Dir = opts.Dir
}
if len(opts.Env) > 0 {
cmd.Env = append(cmd.Environ(), opts.Env...)
}
// Set up pipes
stdout, err := cmd.StdoutPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create stdout pipe: %w", err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create stderr pipe: %w", err)
}
stdin, err := cmd.StdinPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create stdin pipe: %w", err)
}
// Create output buffer (enabled by default)
var output *RingBuffer
if !opts.DisableCapture {
output = NewRingBuffer(s.bufSize)
}
proc := &Process{
ID: id,
Command: opts.Command,
Args: opts.Args,
Dir: opts.Dir,
Env: opts.Env,
StartedAt: time.Now(),
Status: StatusRunning,
cmd: cmd,
ctx: procCtx,
cancel: cancel,
output: output,
stdin: stdin,
done: make(chan struct{}),
}
// Start the process
if err := cmd.Start(); err != nil {
cancel()
return nil, fmt.Errorf("failed to start process: %w", err)
}
// Store process
s.mu.Lock()
s.processes[id] = proc
s.mu.Unlock()
// Broadcast start
s.Core().ACTION(ActionProcessStarted{
ID: id,
Command: opts.Command,
Args: opts.Args,
Dir: opts.Dir,
PID: cmd.Process.Pid,
})
// Stream output in goroutines
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
s.streamOutput(proc, stdout, StreamStdout)
}()
go func() {
defer wg.Done()
s.streamOutput(proc, stderr, StreamStderr)
}()
// Wait for process completion
go func() {
// Wait for output streaming to complete
wg.Wait()
// Wait for process exit
err := cmd.Wait()
duration := time.Since(proc.StartedAt)
proc.mu.Lock()
proc.Duration = duration
if err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
proc.ExitCode = exitErr.ExitCode()
proc.Status = StatusExited
} else {
proc.Status = StatusFailed
}
} else {
proc.ExitCode = 0
proc.Status = StatusExited
}
status := proc.Status
exitCode := proc.ExitCode
proc.mu.Unlock()
close(proc.done)
// Broadcast exit
var exitErr error
if status == StatusFailed {
exitErr = err
}
s.Core().ACTION(ActionProcessExited{
ID: id,
ExitCode: exitCode,
Duration: duration,
Error: exitErr,
})
}()
return proc, nil
}
// streamOutput reads from a pipe and broadcasts lines via ACTION.
func (s *Service) streamOutput(proc *Process, r io.Reader, stream Stream) {
scanner := bufio.NewScanner(r)
// Increase buffer for long lines
scanner.Buffer(make([]byte, 64*1024), 1024*1024)
for scanner.Scan() {
line := scanner.Text()
// Write to ring buffer
if proc.output != nil {
proc.output.Write([]byte(line + "\n"))
}
// Broadcast output
s.Core().ACTION(ActionProcessOutput{
ID: proc.ID,
Line: line,
Stream: stream,
})
}
}
// Get returns a process by ID.
func (s *Service) Get(id string) (*Process, error) {
s.mu.RLock()
defer s.mu.RUnlock()
proc, ok := s.processes[id]
if !ok {
return nil, ErrProcessNotFound
}
return proc, nil
}
// List returns all processes.
func (s *Service) List() []*Process {
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]*Process, 0, len(s.processes))
for _, p := range s.processes {
result = append(result, p)
}
return result
}
// Running returns all currently running processes.
func (s *Service) Running() []*Process {
s.mu.RLock()
defer s.mu.RUnlock()
var result []*Process
for _, p := range s.processes {
if p.IsRunning() {
result = append(result, p)
}
}
return result
}
// Kill terminates a process by ID.
func (s *Service) Kill(id string) error {
proc, err := s.Get(id)
if err != nil {
return err
}
if err := proc.Kill(); err != nil {
return err
}
s.Core().ACTION(ActionProcessKilled{
ID: id,
Signal: "SIGKILL",
})
return nil
}
// Remove removes a completed process from the list.
func (s *Service) Remove(id string) error {
s.mu.Lock()
defer s.mu.Unlock()
proc, ok := s.processes[id]
if !ok {
return ErrProcessNotFound
}
if proc.IsRunning() {
return errors.New("cannot remove running process")
}
delete(s.processes, id)
return nil
}
// Clear removes all completed processes.
func (s *Service) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
for id, p := range s.processes {
if !p.IsRunning() {
delete(s.processes, id)
}
}
}
// Output returns the captured output of a process.
func (s *Service) Output(id string) (string, error) {
proc, err := s.Get(id)
if err != nil {
return "", err
}
return proc.Output(), nil
}
// Run executes a command and waits for completion.
// Returns the combined output and any error.
func (s *Service) Run(ctx context.Context, command string, args ...string) (string, error) {
proc, err := s.Start(ctx, command, args...)
if err != nil {
return "", err
}
<-proc.Done()
output := proc.Output()
if proc.ExitCode != 0 {
return output, fmt.Errorf("process exited with code %d", proc.ExitCode)
}
return output, nil
}
// RunWithOptions executes a command with options and waits for completion.
func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) (string, error) {
proc, err := s.StartWithOptions(ctx, opts)
if err != nil {
return "", err
}
<-proc.Done()
output := proc.Output()
if proc.ExitCode != 0 {
return output, fmt.Errorf("process exited with code %d", proc.ExitCode)
}
return output, nil
}

271
pkg/process/service_test.go Normal file
View file

@ -0,0 +1,271 @@
package process
import (
"context"
"strings"
"sync"
"testing"
"time"
"github.com/host-uk/core/pkg/framework"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newTestService(t *testing.T) (*Service, *framework.Core) {
t.Helper()
core, err := framework.New(
framework.WithName("process", NewService(Options{BufferSize: 1024})),
)
require.NoError(t, err)
svc, err := framework.ServiceFor[*Service](core, "process")
require.NoError(t, err)
return svc, core
}
func TestService_Start(t *testing.T) {
t.Run("echo command", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "hello")
require.NoError(t, err)
require.NotNil(t, proc)
assert.NotEmpty(t, proc.ID)
assert.Equal(t, "echo", proc.Command)
assert.Equal(t, []string{"hello"}, proc.Args)
// Wait for completion
<-proc.Done()
assert.Equal(t, StatusExited, proc.Status)
assert.Equal(t, 0, proc.ExitCode)
assert.Contains(t, proc.Output(), "hello")
})
t.Run("failing command", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "sh", "-c", "exit 42")
require.NoError(t, err)
<-proc.Done()
assert.Equal(t, StatusExited, proc.Status)
assert.Equal(t, 42, proc.ExitCode)
})
t.Run("non-existent command", func(t *testing.T) {
svc, _ := newTestService(t)
_, err := svc.Start(context.Background(), "nonexistent_command_xyz")
assert.Error(t, err)
})
t.Run("with working directory", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
Command: "pwd",
Dir: "/tmp",
})
require.NoError(t, err)
<-proc.Done()
// On macOS /tmp is a symlink to /private/tmp
output := strings.TrimSpace(proc.Output())
assert.True(t, output == "/tmp" || output == "/private/tmp", "got: %s", output)
})
t.Run("context cancellation", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
proc, err := svc.Start(ctx, "sleep", "10")
require.NoError(t, err)
// Cancel immediately
cancel()
select {
case <-proc.Done():
// Good - process was killed
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
})
}
func TestService_Run(t *testing.T) {
t.Run("returns output", func(t *testing.T) {
svc, _ := newTestService(t)
output, err := svc.Run(context.Background(), "echo", "hello world")
require.NoError(t, err)
assert.Contains(t, output, "hello world")
})
t.Run("returns error on failure", func(t *testing.T) {
svc, _ := newTestService(t)
_, err := svc.Run(context.Background(), "sh", "-c", "exit 1")
assert.Error(t, err)
assert.Contains(t, err.Error(), "exited with code 1")
})
}
func TestService_Actions(t *testing.T) {
t.Run("broadcasts events", func(t *testing.T) {
core, err := framework.New(
framework.WithName("process", NewService(Options{})),
)
require.NoError(t, err)
var started []ActionProcessStarted
var outputs []ActionProcessOutput
var exited []ActionProcessExited
var mu sync.Mutex
core.RegisterAction(func(c *framework.Core, msg framework.Message) error {
mu.Lock()
defer mu.Unlock()
switch m := msg.(type) {
case ActionProcessStarted:
started = append(started, m)
case ActionProcessOutput:
outputs = append(outputs, m)
case ActionProcessExited:
exited = append(exited, m)
}
return nil
})
svc, _ := framework.ServiceFor[*Service](core, "process")
proc, err := svc.Start(context.Background(), "echo", "test")
require.NoError(t, err)
<-proc.Done()
// Give time for events to propagate
time.Sleep(10 * time.Millisecond)
mu.Lock()
defer mu.Unlock()
assert.Len(t, started, 1)
assert.Equal(t, "echo", started[0].Command)
assert.Equal(t, []string{"test"}, started[0].Args)
assert.NotEmpty(t, outputs)
foundTest := false
for _, o := range outputs {
if strings.Contains(o.Line, "test") {
foundTest = true
break
}
}
assert.True(t, foundTest, "should have output containing 'test'")
assert.Len(t, exited, 1)
assert.Equal(t, 0, exited[0].ExitCode)
})
}
func TestService_List(t *testing.T) {
t.Run("tracks processes", func(t *testing.T) {
svc, _ := newTestService(t)
proc1, _ := svc.Start(context.Background(), "echo", "1")
proc2, _ := svc.Start(context.Background(), "echo", "2")
<-proc1.Done()
<-proc2.Done()
list := svc.List()
assert.Len(t, list, 2)
})
t.Run("get by id", func(t *testing.T) {
svc, _ := newTestService(t)
proc, _ := svc.Start(context.Background(), "echo", "test")
<-proc.Done()
got, err := svc.Get(proc.ID)
require.NoError(t, err)
assert.Equal(t, proc.ID, got.ID)
})
t.Run("get not found", func(t *testing.T) {
svc, _ := newTestService(t)
_, err := svc.Get("nonexistent")
assert.ErrorIs(t, err, ErrProcessNotFound)
})
}
func TestService_Remove(t *testing.T) {
t.Run("removes completed process", func(t *testing.T) {
svc, _ := newTestService(t)
proc, _ := svc.Start(context.Background(), "echo", "test")
<-proc.Done()
err := svc.Remove(proc.ID)
require.NoError(t, err)
_, err = svc.Get(proc.ID)
assert.ErrorIs(t, err, ErrProcessNotFound)
})
t.Run("cannot remove running process", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, _ := svc.Start(ctx, "sleep", "10")
err := svc.Remove(proc.ID)
assert.Error(t, err)
cancel()
<-proc.Done()
})
}
func TestService_Clear(t *testing.T) {
t.Run("clears completed processes", func(t *testing.T) {
svc, _ := newTestService(t)
proc1, _ := svc.Start(context.Background(), "echo", "1")
proc2, _ := svc.Start(context.Background(), "echo", "2")
<-proc1.Done()
<-proc2.Done()
assert.Len(t, svc.List(), 2)
svc.Clear()
assert.Len(t, svc.List(), 0)
})
}
func TestProcess_Info(t *testing.T) {
svc, _ := newTestService(t)
proc, _ := svc.Start(context.Background(), "echo", "hello")
<-proc.Done()
info := proc.Info()
assert.Equal(t, proc.ID, info.ID)
assert.Equal(t, "echo", info.Command)
assert.Equal(t, []string{"hello"}, info.Args)
assert.Equal(t, StatusExited, info.Status)
assert.Equal(t, 0, info.ExitCode)
}

86
pkg/process/types.go Normal file
View file

@ -0,0 +1,86 @@
// Package process provides process management with Core IPC integration.
//
// The process package enables spawning, monitoring, and controlling external
// processes with output streaming via the Core ACTION system.
//
// # Getting Started
//
// // Register with Core
// core, _ := framework.New(
// framework.WithName("process", process.NewService(process.Options{})),
// )
//
// // Get service and run a process
// svc := framework.MustServiceFor[*process.Service](core, "process")
// proc, _ := svc.Start(ctx, "go", "test", "./...")
//
// # Listening for Events
//
// Process events are broadcast via Core.ACTION:
//
// core.RegisterAction(func(c *framework.Core, msg framework.Message) error {
// switch m := msg.(type) {
// case process.ActionProcessOutput:
// fmt.Print(m.Line)
// case process.ActionProcessExited:
// fmt.Printf("Exit code: %d\n", m.ExitCode)
// }
// return nil
// })
package process
import "time"
// Status represents the process lifecycle state.
type Status string
const (
// StatusPending indicates the process is queued but not yet started.
StatusPending Status = "pending"
// StatusRunning indicates the process is actively executing.
StatusRunning Status = "running"
// StatusExited indicates the process completed (check ExitCode).
StatusExited Status = "exited"
// StatusFailed indicates the process could not be started.
StatusFailed Status = "failed"
// StatusKilled indicates the process was terminated by signal.
StatusKilled Status = "killed"
)
// Stream identifies the output source.
type Stream string
const (
// StreamStdout is standard output.
StreamStdout Stream = "stdout"
// StreamStderr is standard error.
StreamStderr Stream = "stderr"
)
// RunOptions configures process execution.
type RunOptions struct {
// Command is the executable to run.
Command string
// Args are the command arguments.
Args []string
// Dir is the working directory (empty = current).
Dir string
// Env are additional environment variables (KEY=VALUE format).
Env []string
// DisableCapture disables output buffering.
// By default, output is captured to a ring buffer.
DisableCapture bool
}
// Info provides a snapshot of process state without internal fields.
type Info struct {
ID string `json:"id"`
Command string `json:"command"`
Args []string `json:"args"`
Dir string `json:"dir"`
StartedAt time.Time `json:"startedAt"`
Status Status `json:"status"`
ExitCode int `json:"exitCode"`
Duration time.Duration `json:"duration"`
PID int `json:"pid"`
}