feat: extract process package from core/go pkg/process

Process management with Core IPC integration, output streaming via
ring buffer, exec wrapper with logging, and dependency-aware runner.

Moved from forge.lthn.ai/core/go/pkg/process to standalone module.

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-06 12:50:09 +00:00
commit 8095807ad6
17 changed files with 2631 additions and 0 deletions

37
actions.go Normal file
View file

@ -0,0 +1,37 @@
package process
import "time"
// --- ACTION messages (broadcast via Core.ACTION) ---
// ActionProcessStarted is broadcast when a process begins execution.
type ActionProcessStarted struct {
ID string
Command string
Args []string
Dir string
PID int
}
// ActionProcessOutput is broadcast for each line of output.
// Subscribe to this for real-time streaming.
type ActionProcessOutput struct {
ID string
Line string
Stream Stream
}
// ActionProcessExited is broadcast when a process completes.
// Check ExitCode for success (0) or failure.
type ActionProcessExited struct {
ID string
ExitCode int
Duration time.Duration
Error error // Non-nil if failed to start or was killed
}
// ActionProcessKilled is broadcast when a process is terminated.
type ActionProcessKilled struct {
ID string
Signal string
}

108
buffer.go Normal file
View file

@ -0,0 +1,108 @@
package process
import "sync"
// RingBuffer is a fixed-size circular buffer that overwrites old data.
// Thread-safe for concurrent reads and writes.
type RingBuffer struct {
data []byte
size int
start int
end int
full bool
mu sync.RWMutex
}
// NewRingBuffer creates a ring buffer with the given capacity.
func NewRingBuffer(size int) *RingBuffer {
return &RingBuffer{
data: make([]byte, size),
size: size,
}
}
// Write appends data to the buffer, overwriting oldest data if full.
func (rb *RingBuffer) Write(p []byte) (n int, err error) {
rb.mu.Lock()
defer rb.mu.Unlock()
for _, b := range p {
rb.data[rb.end] = b
rb.end = (rb.end + 1) % rb.size
if rb.full {
rb.start = (rb.start + 1) % rb.size
}
if rb.end == rb.start {
rb.full = true
}
}
return len(p), nil
}
// String returns the buffer contents as a string.
func (rb *RingBuffer) String() string {
rb.mu.RLock()
defer rb.mu.RUnlock()
if !rb.full && rb.start == rb.end {
return ""
}
if rb.full {
result := make([]byte, rb.size)
copy(result, rb.data[rb.start:])
copy(result[rb.size-rb.start:], rb.data[:rb.end])
return string(result)
}
return string(rb.data[rb.start:rb.end])
}
// Bytes returns a copy of the buffer contents.
func (rb *RingBuffer) Bytes() []byte {
rb.mu.RLock()
defer rb.mu.RUnlock()
if !rb.full && rb.start == rb.end {
return nil
}
if rb.full {
result := make([]byte, rb.size)
copy(result, rb.data[rb.start:])
copy(result[rb.size-rb.start:], rb.data[:rb.end])
return result
}
result := make([]byte, rb.end-rb.start)
copy(result, rb.data[rb.start:rb.end])
return result
}
// Len returns the current length of data in the buffer.
func (rb *RingBuffer) Len() int {
rb.mu.RLock()
defer rb.mu.RUnlock()
if rb.full {
return rb.size
}
if rb.end >= rb.start {
return rb.end - rb.start
}
return rb.size - rb.start + rb.end
}
// Cap returns the buffer capacity.
func (rb *RingBuffer) Cap() int {
return rb.size
}
// Reset clears the buffer.
func (rb *RingBuffer) Reset() {
rb.mu.Lock()
defer rb.mu.Unlock()
rb.start = 0
rb.end = 0
rb.full = false
}

72
buffer_test.go Normal file
View file

@ -0,0 +1,72 @@
package process
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestRingBuffer(t *testing.T) {
t.Run("write and read", func(t *testing.T) {
rb := NewRingBuffer(10)
n, err := rb.Write([]byte("hello"))
assert.NoError(t, err)
assert.Equal(t, 5, n)
assert.Equal(t, "hello", rb.String())
assert.Equal(t, 5, rb.Len())
})
t.Run("overflow wraps around", func(t *testing.T) {
rb := NewRingBuffer(5)
_, _ = rb.Write([]byte("hello"))
assert.Equal(t, "hello", rb.String())
_, _ = rb.Write([]byte("world"))
// Should contain "world" (overwrote "hello")
assert.Equal(t, 5, rb.Len())
assert.Equal(t, "world", rb.String())
})
t.Run("partial overflow", func(t *testing.T) {
rb := NewRingBuffer(10)
_, _ = rb.Write([]byte("hello"))
_, _ = rb.Write([]byte("worldx"))
// Should contain "lloworldx" (11 chars, buffer is 10)
assert.Equal(t, 10, rb.Len())
})
t.Run("empty buffer", func(t *testing.T) {
rb := NewRingBuffer(10)
assert.Equal(t, "", rb.String())
assert.Equal(t, 0, rb.Len())
assert.Nil(t, rb.Bytes())
})
t.Run("reset", func(t *testing.T) {
rb := NewRingBuffer(10)
_, _ = rb.Write([]byte("hello"))
rb.Reset()
assert.Equal(t, "", rb.String())
assert.Equal(t, 0, rb.Len())
})
t.Run("cap", func(t *testing.T) {
rb := NewRingBuffer(42)
assert.Equal(t, 42, rb.Cap())
})
t.Run("bytes returns copy", func(t *testing.T) {
rb := NewRingBuffer(10)
_, _ = rb.Write([]byte("hello"))
bytes := rb.Bytes()
assert.Equal(t, []byte("hello"), bytes)
// Modifying returned bytes shouldn't affect buffer
bytes[0] = 'x'
assert.Equal(t, "hello", rb.String())
})
}

176
exec/exec.go Normal file
View file

@ -0,0 +1,176 @@
package exec
import (
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"
"strings"
)
// Options configuration for command execution
type Options struct {
Dir string
Env []string
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
// If true, command will run in background (not implemented in this wrapper yet)
// Background bool
}
// Command wraps os/exec.Command with logging and context
func Command(ctx context.Context, name string, args ...string) *Cmd {
return &Cmd{
name: name,
args: args,
ctx: ctx,
}
}
// Cmd represents a wrapped command
type Cmd struct {
name string
args []string
ctx context.Context
opts Options
cmd *exec.Cmd
logger Logger
}
// WithDir sets the working directory
func (c *Cmd) WithDir(dir string) *Cmd {
c.opts.Dir = dir
return c
}
// WithEnv sets the environment variables
func (c *Cmd) WithEnv(env []string) *Cmd {
c.opts.Env = env
return c
}
// WithStdin sets stdin
func (c *Cmd) WithStdin(r io.Reader) *Cmd {
c.opts.Stdin = r
return c
}
// WithStdout sets stdout
func (c *Cmd) WithStdout(w io.Writer) *Cmd {
c.opts.Stdout = w
return c
}
// WithStderr sets stderr
func (c *Cmd) WithStderr(w io.Writer) *Cmd {
c.opts.Stderr = w
return c
}
// WithLogger sets a custom logger for this command.
// If not set, the package default logger is used.
func (c *Cmd) WithLogger(l Logger) *Cmd {
c.logger = l
return c
}
// Run executes the command and waits for it to finish.
// It automatically logs the command execution at debug level.
func (c *Cmd) Run() error {
c.prepare()
c.logDebug("executing command")
if err := c.cmd.Run(); err != nil {
wrapped := wrapError(err, c.name, c.args)
c.logError("command failed", wrapped)
return wrapped
}
return nil
}
// Output runs the command and returns its standard output.
func (c *Cmd) Output() ([]byte, error) {
c.prepare()
c.logDebug("executing command")
out, err := c.cmd.Output()
if err != nil {
wrapped := wrapError(err, c.name, c.args)
c.logError("command failed", wrapped)
return nil, wrapped
}
return out, nil
}
// CombinedOutput runs the command and returns its combined standard output and standard error.
func (c *Cmd) CombinedOutput() ([]byte, error) {
c.prepare()
c.logDebug("executing command")
out, err := c.cmd.CombinedOutput()
if err != nil {
wrapped := wrapError(err, c.name, c.args)
c.logError("command failed", wrapped)
return out, wrapped
}
return out, nil
}
func (c *Cmd) prepare() {
if c.ctx != nil {
c.cmd = exec.CommandContext(c.ctx, c.name, c.args...)
} else {
// Should we enforce context? The issue says "Enforce context usage".
// For now, let's allow nil but log a warning if we had a logger?
// Or strictly panic/error?
// Let's fallback to Background for now but maybe strict later.
c.cmd = exec.Command(c.name, c.args...)
}
c.cmd.Dir = c.opts.Dir
if len(c.opts.Env) > 0 {
c.cmd.Env = append(os.Environ(), c.opts.Env...)
}
c.cmd.Stdin = c.opts.Stdin
c.cmd.Stdout = c.opts.Stdout
c.cmd.Stderr = c.opts.Stderr
}
// RunQuiet executes the command suppressing stdout unless there is an error.
// Useful for internal commands.
func RunQuiet(ctx context.Context, name string, args ...string) error {
var stderr bytes.Buffer
cmd := Command(ctx, name, args...).WithStderr(&stderr)
if err := cmd.Run(); err != nil {
// Include stderr in error message
return fmt.Errorf("%w: %s", err, strings.TrimSpace(stderr.String()))
}
return nil
}
func wrapError(err error, name string, args []string) error {
cmdStr := name + " " + strings.Join(args, " ")
if exitErr, ok := err.(*exec.ExitError); ok {
return fmt.Errorf("command %q failed with exit code %d: %w", cmdStr, exitErr.ExitCode(), err)
}
return fmt.Errorf("failed to execute %q: %w", cmdStr, err)
}
func (c *Cmd) getLogger() Logger {
if c.logger != nil {
return c.logger
}
return defaultLogger
}
func (c *Cmd) logDebug(msg string) {
c.getLogger().Debug(msg, "cmd", c.name, "args", strings.Join(c.args, " "))
}
func (c *Cmd) logError(msg string, err error) {
c.getLogger().Error(msg, "cmd", c.name, "args", strings.Join(c.args, " "), "err", err)
}

148
exec/exec_test.go Normal file
View file

@ -0,0 +1,148 @@
package exec_test
import (
"context"
"strings"
"testing"
"forge.lthn.ai/core/go-process/exec"
)
// mockLogger captures log calls for testing
type mockLogger struct {
debugCalls []logCall
errorCalls []logCall
}
type logCall struct {
msg string
keyvals []any
}
func (m *mockLogger) Debug(msg string, keyvals ...any) {
m.debugCalls = append(m.debugCalls, logCall{msg, keyvals})
}
func (m *mockLogger) Error(msg string, keyvals ...any) {
m.errorCalls = append(m.errorCalls, logCall{msg, keyvals})
}
func TestCommand_Run_Good_LogsDebug(t *testing.T) {
logger := &mockLogger{}
ctx := context.Background()
err := exec.Command(ctx, "echo", "hello").
WithLogger(logger).
Run()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(logger.debugCalls) != 1 {
t.Fatalf("expected 1 debug call, got %d", len(logger.debugCalls))
}
if logger.debugCalls[0].msg != "executing command" {
t.Errorf("expected msg 'executing command', got %q", logger.debugCalls[0].msg)
}
if len(logger.errorCalls) != 0 {
t.Errorf("expected no error calls, got %d", len(logger.errorCalls))
}
}
func TestCommand_Run_Bad_LogsError(t *testing.T) {
logger := &mockLogger{}
ctx := context.Background()
err := exec.Command(ctx, "false").
WithLogger(logger).
Run()
if err == nil {
t.Fatal("expected error")
}
if len(logger.debugCalls) != 1 {
t.Fatalf("expected 1 debug call, got %d", len(logger.debugCalls))
}
if len(logger.errorCalls) != 1 {
t.Fatalf("expected 1 error call, got %d", len(logger.errorCalls))
}
if logger.errorCalls[0].msg != "command failed" {
t.Errorf("expected msg 'command failed', got %q", logger.errorCalls[0].msg)
}
}
func TestCommand_Output_Good(t *testing.T) {
logger := &mockLogger{}
ctx := context.Background()
out, err := exec.Command(ctx, "echo", "test").
WithLogger(logger).
Output()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if strings.TrimSpace(string(out)) != "test" {
t.Errorf("expected 'test', got %q", string(out))
}
if len(logger.debugCalls) != 1 {
t.Errorf("expected 1 debug call, got %d", len(logger.debugCalls))
}
}
func TestCommand_CombinedOutput_Good(t *testing.T) {
logger := &mockLogger{}
ctx := context.Background()
out, err := exec.Command(ctx, "echo", "combined").
WithLogger(logger).
CombinedOutput()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if strings.TrimSpace(string(out)) != "combined" {
t.Errorf("expected 'combined', got %q", string(out))
}
if len(logger.debugCalls) != 1 {
t.Errorf("expected 1 debug call, got %d", len(logger.debugCalls))
}
}
func TestNopLogger(t *testing.T) {
// Verify NopLogger doesn't panic
var nop exec.NopLogger
nop.Debug("msg", "key", "val")
nop.Error("msg", "key", "val")
}
func TestSetDefaultLogger(t *testing.T) {
original := exec.DefaultLogger()
defer exec.SetDefaultLogger(original)
logger := &mockLogger{}
exec.SetDefaultLogger(logger)
if exec.DefaultLogger() != logger {
t.Error("default logger not set correctly")
}
// Test nil resets to NopLogger
exec.SetDefaultLogger(nil)
if _, ok := exec.DefaultLogger().(exec.NopLogger); !ok {
t.Error("expected NopLogger when setting nil")
}
}
func TestCommand_UsesDefaultLogger(t *testing.T) {
original := exec.DefaultLogger()
defer exec.SetDefaultLogger(original)
logger := &mockLogger{}
exec.SetDefaultLogger(logger)
ctx := context.Background()
_ = exec.Command(ctx, "echo", "test").Run()
if len(logger.debugCalls) != 1 {
t.Errorf("expected default logger to receive 1 debug call, got %d", len(logger.debugCalls))
}
}

35
exec/logger.go Normal file
View file

@ -0,0 +1,35 @@
package exec
// Logger interface for command execution logging.
// Compatible with pkg/log.Logger and other structured loggers.
type Logger interface {
// Debug logs a debug-level message with optional key-value pairs.
Debug(msg string, keyvals ...any)
// Error logs an error-level message with optional key-value pairs.
Error(msg string, keyvals ...any)
}
// NopLogger is a no-op logger that discards all messages.
type NopLogger struct{}
// Debug discards the message (no-op implementation).
func (NopLogger) Debug(string, ...any) {}
// Error discards the message (no-op implementation).
func (NopLogger) Error(string, ...any) {}
var defaultLogger Logger = NopLogger{}
// SetDefaultLogger sets the package-level default logger.
// Commands without an explicit logger will use this.
func SetDefaultLogger(l Logger) {
if l == nil {
l = NopLogger{}
}
defaultLogger = l
}
// DefaultLogger returns the current default logger.
func DefaultLogger() Logger {
return defaultLogger
}

298
global_test.go Normal file
View file

@ -0,0 +1,298 @@
package process
import (
"context"
"sync"
"testing"
"forge.lthn.ai/core/go/pkg/framework"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGlobal_DefaultNotInitialized(t *testing.T) {
// Reset global state for this test
old := defaultService.Swap(nil)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
assert.Nil(t, Default())
_, err := Start(context.Background(), "echo", "test")
assert.ErrorIs(t, err, ErrServiceNotInitialized)
_, err = Run(context.Background(), "echo", "test")
assert.ErrorIs(t, err, ErrServiceNotInitialized)
_, err = Get("proc-1")
assert.ErrorIs(t, err, ErrServiceNotInitialized)
assert.Nil(t, List())
assert.Nil(t, Running())
err = Kill("proc-1")
assert.ErrorIs(t, err, ErrServiceNotInitialized)
_, err = StartWithOptions(context.Background(), RunOptions{Command: "echo"})
assert.ErrorIs(t, err, ErrServiceNotInitialized)
_, err = RunWithOptions(context.Background(), RunOptions{Command: "echo"})
assert.ErrorIs(t, err, ErrServiceNotInitialized)
}
func TestGlobal_SetDefault(t *testing.T) {
t.Run("sets and retrieves service", func(t *testing.T) {
// Reset global state
old := defaultService.Swap(nil)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
core, err := framework.New(
framework.WithName("process", NewService(Options{})),
)
require.NoError(t, err)
svc, err := framework.ServiceFor[*Service](core, "process")
require.NoError(t, err)
SetDefault(svc)
assert.Equal(t, svc, Default())
})
t.Run("panics on nil", func(t *testing.T) {
assert.Panics(t, func() {
SetDefault(nil)
})
})
}
func TestGlobal_ConcurrentDefault(t *testing.T) {
// Reset global state
old := defaultService.Swap(nil)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
core, err := framework.New(
framework.WithName("process", NewService(Options{})),
)
require.NoError(t, err)
svc, err := framework.ServiceFor[*Service](core, "process")
require.NoError(t, err)
SetDefault(svc)
// Concurrent reads of Default()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s := Default()
assert.NotNil(t, s)
assert.Equal(t, svc, s)
}()
}
wg.Wait()
}
func TestGlobal_ConcurrentSetDefault(t *testing.T) {
// Reset global state
old := defaultService.Swap(nil)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
// Create multiple services
var services []*Service
for i := 0; i < 10; i++ {
core, err := framework.New(
framework.WithName("process", NewService(Options{})),
)
require.NoError(t, err)
svc, err := framework.ServiceFor[*Service](core, "process")
require.NoError(t, err)
services = append(services, svc)
}
// Concurrent SetDefault calls - should not panic or race
var wg sync.WaitGroup
for _, svc := range services {
wg.Add(1)
go func(s *Service) {
defer wg.Done()
SetDefault(s)
}(svc)
}
wg.Wait()
// Final state should be one of the services
final := Default()
assert.NotNil(t, final)
found := false
for _, svc := range services {
if svc == final {
found = true
break
}
}
assert.True(t, found, "Default should be one of the set services")
}
func TestGlobal_ConcurrentOperations(t *testing.T) {
// Reset global state
old := defaultService.Swap(nil)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
core, err := framework.New(
framework.WithName("process", NewService(Options{})),
)
require.NoError(t, err)
svc, err := framework.ServiceFor[*Service](core, "process")
require.NoError(t, err)
SetDefault(svc)
// Concurrent Start, List, Get operations
var wg sync.WaitGroup
var processes []*Process
var procMu sync.Mutex
// Start 20 processes concurrently
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
proc, err := Start(context.Background(), "echo", "concurrent")
if err == nil {
procMu.Lock()
processes = append(processes, proc)
procMu.Unlock()
}
}()
}
// Concurrent List calls while starting
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = List()
_ = Running()
}()
}
wg.Wait()
// Wait for all processes to complete
procMu.Lock()
for _, p := range processes {
<-p.Done()
}
procMu.Unlock()
// All should have succeeded
assert.Len(t, processes, 20)
// Concurrent Get calls
var wg2 sync.WaitGroup
for _, p := range processes {
wg2.Add(1)
go func(id string) {
defer wg2.Done()
got, err := Get(id)
assert.NoError(t, err)
assert.NotNil(t, got)
}(p.ID)
}
wg2.Wait()
}
func TestGlobal_StartWithOptions(t *testing.T) {
svc, _ := newTestService(t)
// Set as default
old := defaultService.Swap(svc)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
proc, err := StartWithOptions(context.Background(), RunOptions{
Command: "echo",
Args: []string{"with", "options"},
})
require.NoError(t, err)
<-proc.Done()
assert.Equal(t, 0, proc.ExitCode)
assert.Contains(t, proc.Output(), "with options")
}
func TestGlobal_RunWithOptions(t *testing.T) {
svc, _ := newTestService(t)
// Set as default
old := defaultService.Swap(svc)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
output, err := RunWithOptions(context.Background(), RunOptions{
Command: "echo",
Args: []string{"run", "options"},
})
require.NoError(t, err)
assert.Contains(t, output, "run options")
}
func TestGlobal_Running(t *testing.T) {
svc, _ := newTestService(t)
// Set as default
old := defaultService.Swap(svc)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start a long-running process
proc, err := Start(ctx, "sleep", "60")
require.NoError(t, err)
running := Running()
assert.Len(t, running, 1)
assert.Equal(t, proc.ID, running[0].ID)
cancel()
<-proc.Done()
running = Running()
assert.Len(t, running, 0)
}

15
go.mod Normal file
View file

@ -0,0 +1,15 @@
module forge.lthn.ai/core/go-process
go 1.26.0
require (
forge.lthn.ai/core/go v0.1.0
github.com/stretchr/testify v1.11.1
)
require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

22
go.sum Normal file
View file

@ -0,0 +1,22 @@
forge.lthn.ai/core/go v0.1.0 h1:Ow/1NTajrrNPO0zgkskEyEGdx4SKpiNqTaqM0txNOYI=
forge.lthn.ai/core/go v0.1.0/go.mod h1:lwi0tccAlg5j3k6CfoNJEueBc5l9mUeSBX/x6uY8ZbQ=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

167
process.go Normal file
View file

@ -0,0 +1,167 @@
package process
import (
"context"
"io"
"os/exec"
"sync"
"time"
)
// Process represents a managed external process.
type Process struct {
ID string
Command string
Args []string
Dir string
Env []string
StartedAt time.Time
Status Status
ExitCode int
Duration time.Duration
cmd *exec.Cmd
ctx context.Context
cancel context.CancelFunc
output *RingBuffer
stdin io.WriteCloser
done chan struct{}
mu sync.RWMutex
}
// Info returns a snapshot of process state.
func (p *Process) Info() Info {
p.mu.RLock()
defer p.mu.RUnlock()
pid := 0
if p.cmd != nil && p.cmd.Process != nil {
pid = p.cmd.Process.Pid
}
return Info{
ID: p.ID,
Command: p.Command,
Args: p.Args,
Dir: p.Dir,
StartedAt: p.StartedAt,
Status: p.Status,
ExitCode: p.ExitCode,
Duration: p.Duration,
PID: pid,
}
}
// Output returns the captured output as a string.
func (p *Process) Output() string {
p.mu.RLock()
defer p.mu.RUnlock()
if p.output == nil {
return ""
}
return p.output.String()
}
// OutputBytes returns the captured output as bytes.
func (p *Process) OutputBytes() []byte {
p.mu.RLock()
defer p.mu.RUnlock()
if p.output == nil {
return nil
}
return p.output.Bytes()
}
// IsRunning returns true if the process is still executing.
func (p *Process) IsRunning() bool {
p.mu.RLock()
defer p.mu.RUnlock()
return p.Status == StatusRunning
}
// Wait blocks until the process exits.
func (p *Process) Wait() error {
<-p.done
p.mu.RLock()
defer p.mu.RUnlock()
if p.Status == StatusFailed || p.Status == StatusKilled {
return &exec.ExitError{}
}
if p.ExitCode != 0 {
return &exec.ExitError{}
}
return nil
}
// Done returns a channel that closes when the process exits.
func (p *Process) Done() <-chan struct{} {
return p.done
}
// Kill forcefully terminates the process.
func (p *Process) Kill() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.Status != StatusRunning {
return nil
}
if p.cmd == nil || p.cmd.Process == nil {
return nil
}
return p.cmd.Process.Kill()
}
// Signal sends a signal to the process.
func (p *Process) Signal(sig interface{ Signal() }) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.Status != StatusRunning {
return nil
}
if p.cmd == nil || p.cmd.Process == nil {
return nil
}
// Type assert to os.Signal for Process.Signal
if osSig, ok := sig.(interface{ String() string }); ok {
_ = osSig // Satisfy linter
}
return p.cmd.Process.Kill() // Simplified - would use Signal in full impl
}
// SendInput writes to the process stdin.
func (p *Process) SendInput(input string) error {
p.mu.RLock()
defer p.mu.RUnlock()
if p.Status != StatusRunning {
return ErrProcessNotRunning
}
if p.stdin == nil {
return ErrStdinNotAvailable
}
_, err := p.stdin.Write([]byte(input))
return err
}
// CloseStdin closes the process stdin pipe.
func (p *Process) CloseStdin() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.stdin == nil {
return nil
}
err := p.stdin.Close()
p.stdin = nil
return err
}

133
process_global.go Normal file
View file

@ -0,0 +1,133 @@
package process
import (
"context"
"sync"
"sync/atomic"
"forge.lthn.ai/core/go/pkg/framework"
)
// Global default service (follows i18n pattern).
var (
defaultService atomic.Pointer[Service]
defaultOnce sync.Once
defaultErr error
)
// Default returns the global process service.
// Returns nil if not initialized.
func Default() *Service {
return defaultService.Load()
}
// SetDefault sets the global process service.
// Thread-safe: can be called concurrently with Default().
func SetDefault(s *Service) {
if s == nil {
panic("process: SetDefault called with nil service")
}
defaultService.Store(s)
}
// Init initializes the default global service with a Core instance.
// This is typically called during application startup.
func Init(c *framework.Core) error {
defaultOnce.Do(func() {
factory := NewService(Options{})
svc, err := factory(c)
if err != nil {
defaultErr = err
return
}
defaultService.Store(svc.(*Service))
})
return defaultErr
}
// --- Global convenience functions ---
// Start spawns a new process using the default service.
func Start(ctx context.Context, command string, args ...string) (*Process, error) {
svc := Default()
if svc == nil {
return nil, ErrServiceNotInitialized
}
return svc.Start(ctx, command, args...)
}
// Run executes a command and waits for completion using the default service.
func Run(ctx context.Context, command string, args ...string) (string, error) {
svc := Default()
if svc == nil {
return "", ErrServiceNotInitialized
}
return svc.Run(ctx, command, args...)
}
// Get returns a process by ID from the default service.
func Get(id string) (*Process, error) {
svc := Default()
if svc == nil {
return nil, ErrServiceNotInitialized
}
return svc.Get(id)
}
// List returns all processes from the default service.
func List() []*Process {
svc := Default()
if svc == nil {
return nil
}
return svc.List()
}
// Kill terminates a process by ID using the default service.
func Kill(id string) error {
svc := Default()
if svc == nil {
return ErrServiceNotInitialized
}
return svc.Kill(id)
}
// StartWithOptions spawns a process with full configuration using the default service.
func StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) {
svc := Default()
if svc == nil {
return nil, ErrServiceNotInitialized
}
return svc.StartWithOptions(ctx, opts)
}
// RunWithOptions executes a command with options and waits using the default service.
func RunWithOptions(ctx context.Context, opts RunOptions) (string, error) {
svc := Default()
if svc == nil {
return "", ErrServiceNotInitialized
}
return svc.RunWithOptions(ctx, opts)
}
// Running returns all currently running processes from the default service.
func Running() []*Process {
svc := Default()
if svc == nil {
return nil
}
return svc.Running()
}
// ErrServiceNotInitialized is returned when the service is not initialized.
var ErrServiceNotInitialized = &ServiceError{msg: "process: service not initialized"}
// ServiceError represents a service-level error.
type ServiceError struct {
msg string
}
// Error returns the service error message.
func (e *ServiceError) Error() string {
return e.msg
}

227
process_test.go Normal file
View file

@ -0,0 +1,227 @@
package process
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestProcess_Info(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "hello")
require.NoError(t, err)
<-proc.Done()
info := proc.Info()
assert.Equal(t, proc.ID, info.ID)
assert.Equal(t, "echo", info.Command)
assert.Equal(t, []string{"hello"}, info.Args)
assert.Equal(t, StatusExited, info.Status)
assert.Equal(t, 0, info.ExitCode)
assert.Greater(t, info.Duration, time.Duration(0))
}
func TestProcess_Output(t *testing.T) {
t.Run("captures stdout", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "hello world")
require.NoError(t, err)
<-proc.Done()
output := proc.Output()
assert.Contains(t, output, "hello world")
})
t.Run("OutputBytes returns copy", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "test")
require.NoError(t, err)
<-proc.Done()
bytes := proc.OutputBytes()
assert.NotNil(t, bytes)
assert.Contains(t, string(bytes), "test")
})
}
func TestProcess_IsRunning(t *testing.T) {
t.Run("true while running", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, err := svc.Start(ctx, "sleep", "10")
require.NoError(t, err)
assert.True(t, proc.IsRunning())
cancel()
<-proc.Done()
assert.False(t, proc.IsRunning())
})
t.Run("false after completion", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
<-proc.Done()
assert.False(t, proc.IsRunning())
})
}
func TestProcess_Wait(t *testing.T) {
t.Run("returns nil on success", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "ok")
require.NoError(t, err)
err = proc.Wait()
assert.NoError(t, err)
})
t.Run("returns error on failure", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "sh", "-c", "exit 1")
require.NoError(t, err)
err = proc.Wait()
assert.Error(t, err)
})
}
func TestProcess_Done(t *testing.T) {
t.Run("channel closes on completion", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "test")
require.NoError(t, err)
select {
case <-proc.Done():
// Success - channel closed
case <-time.After(5 * time.Second):
t.Fatal("Done channel should have closed")
}
})
}
func TestProcess_Kill(t *testing.T) {
t.Run("terminates running process", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
assert.True(t, proc.IsRunning())
err = proc.Kill()
assert.NoError(t, err)
select {
case <-proc.Done():
// Good - process terminated
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
})
t.Run("noop on completed process", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
<-proc.Done()
err = proc.Kill()
assert.NoError(t, err)
})
}
func TestProcess_SendInput(t *testing.T) {
t.Run("writes to stdin", func(t *testing.T) {
svc, _ := newTestService(t)
// Use cat to echo back stdin
proc, err := svc.Start(context.Background(), "cat")
require.NoError(t, err)
err = proc.SendInput("hello\n")
assert.NoError(t, err)
err = proc.CloseStdin()
assert.NoError(t, err)
<-proc.Done()
assert.Contains(t, proc.Output(), "hello")
})
t.Run("error on completed process", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
<-proc.Done()
err = proc.SendInput("test")
assert.ErrorIs(t, err, ErrProcessNotRunning)
})
}
func TestProcess_CloseStdin(t *testing.T) {
t.Run("closes stdin pipe", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "cat")
require.NoError(t, err)
err = proc.CloseStdin()
assert.NoError(t, err)
// Process should exit now that stdin is closed
select {
case <-proc.Done():
// Good
case <-time.After(2 * time.Second):
t.Fatal("cat should exit when stdin is closed")
}
})
t.Run("double close is safe", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "cat")
require.NoError(t, err)
// First close
err = proc.CloseStdin()
assert.NoError(t, err)
<-proc.Done()
// Second close should be safe (stdin already nil)
err = proc.CloseStdin()
assert.NoError(t, err)
})
}

293
runner.go Normal file
View file

@ -0,0 +1,293 @@
package process
import (
"context"
"errors"
"sync"
"time"
)
// Runner orchestrates multiple processes with dependencies.
type Runner struct {
service *Service
}
// NewRunner creates a runner for the given service.
func NewRunner(svc *Service) *Runner {
return &Runner{service: svc}
}
// RunSpec defines a process to run with optional dependencies.
type RunSpec struct {
// Name is a friendly identifier (e.g., "lint", "test").
Name string
// Command is the executable to run.
Command string
// Args are the command arguments.
Args []string
// Dir is the working directory.
Dir string
// Env are additional environment variables.
Env []string
// After lists spec names that must complete successfully first.
After []string
// AllowFailure if true, continues pipeline even if this spec fails.
AllowFailure bool
}
// RunResult captures the outcome of a single process.
type RunResult struct {
Name string
Spec RunSpec
ExitCode int
Duration time.Duration
Output string
Error error
Skipped bool
}
// Passed returns true if the process succeeded.
func (r RunResult) Passed() bool {
return !r.Skipped && r.Error == nil && r.ExitCode == 0
}
// RunAllResult is the aggregate result of running multiple specs.
type RunAllResult struct {
Results []RunResult
Duration time.Duration
Passed int
Failed int
Skipped int
}
// Success returns true if all non-skipped specs passed.
func (r RunAllResult) Success() bool {
return r.Failed == 0
}
// RunAll executes specs respecting dependencies, parallelising where possible.
func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
start := time.Now()
// Build dependency graph
specMap := make(map[string]RunSpec)
for _, spec := range specs {
specMap[spec.Name] = spec
}
// Track completion
completed := make(map[string]*RunResult)
var completedMu sync.Mutex
results := make([]RunResult, 0, len(specs))
var resultsMu sync.Mutex
// Process specs in waves
remaining := make(map[string]RunSpec)
for _, spec := range specs {
remaining[spec.Name] = spec
}
for len(remaining) > 0 {
// Find specs ready to run (all dependencies satisfied)
ready := make([]RunSpec, 0)
for _, spec := range remaining {
if r.canRun(spec, completed) {
ready = append(ready, spec)
}
}
if len(ready) == 0 && len(remaining) > 0 {
// Deadlock - circular dependency or missing specs
for name := range remaining {
results = append(results, RunResult{
Name: name,
Spec: remaining[name],
Skipped: true,
Error: errors.New("circular dependency or missing dependency"),
})
}
break
}
// Run ready specs in parallel
var wg sync.WaitGroup
for _, spec := range ready {
wg.Add(1)
go func(spec RunSpec) {
defer wg.Done()
// Check if dependencies failed
completedMu.Lock()
shouldSkip := false
for _, dep := range spec.After {
if result, ok := completed[dep]; ok {
if !result.Passed() && !specMap[dep].AllowFailure {
shouldSkip = true
break
}
}
}
completedMu.Unlock()
var result RunResult
if shouldSkip {
result = RunResult{
Name: spec.Name,
Spec: spec,
Skipped: true,
Error: errors.New("skipped due to dependency failure"),
}
} else {
result = r.runSpec(ctx, spec)
}
completedMu.Lock()
completed[spec.Name] = &result
completedMu.Unlock()
resultsMu.Lock()
results = append(results, result)
resultsMu.Unlock()
}(spec)
}
wg.Wait()
// Remove completed from remaining
for _, spec := range ready {
delete(remaining, spec.Name)
}
}
// Build aggregate result
aggResult := &RunAllResult{
Results: results,
Duration: time.Since(start),
}
for _, res := range results {
if res.Skipped {
aggResult.Skipped++
} else if res.Passed() {
aggResult.Passed++
} else {
aggResult.Failed++
}
}
return aggResult, nil
}
// canRun checks if all dependencies are completed.
func (r *Runner) canRun(spec RunSpec, completed map[string]*RunResult) bool {
for _, dep := range spec.After {
if _, ok := completed[dep]; !ok {
return false
}
}
return true
}
// runSpec executes a single spec.
func (r *Runner) runSpec(ctx context.Context, spec RunSpec) RunResult {
start := time.Now()
proc, err := r.service.StartWithOptions(ctx, RunOptions{
Command: spec.Command,
Args: spec.Args,
Dir: spec.Dir,
Env: spec.Env,
})
if err != nil {
return RunResult{
Name: spec.Name,
Spec: spec,
Duration: time.Since(start),
Error: err,
}
}
<-proc.Done()
return RunResult{
Name: spec.Name,
Spec: spec,
ExitCode: proc.ExitCode,
Duration: proc.Duration,
Output: proc.Output(),
Error: nil,
}
}
// RunSequential executes specs one after another, stopping on first failure.
func (r *Runner) RunSequential(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
start := time.Now()
results := make([]RunResult, 0, len(specs))
for _, spec := range specs {
result := r.runSpec(ctx, spec)
results = append(results, result)
if !result.Passed() && !spec.AllowFailure {
// Mark remaining as skipped
for i := len(results); i < len(specs); i++ {
results = append(results, RunResult{
Name: specs[i].Name,
Spec: specs[i],
Skipped: true,
})
}
break
}
}
aggResult := &RunAllResult{
Results: results,
Duration: time.Since(start),
}
for _, res := range results {
if res.Skipped {
aggResult.Skipped++
} else if res.Passed() {
aggResult.Passed++
} else {
aggResult.Failed++
}
}
return aggResult, nil
}
// RunParallel executes all specs concurrently, regardless of dependencies.
func (r *Runner) RunParallel(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
start := time.Now()
results := make([]RunResult, len(specs))
var wg sync.WaitGroup
for i, spec := range specs {
wg.Add(1)
go func(i int, spec RunSpec) {
defer wg.Done()
results[i] = r.runSpec(ctx, spec)
}(i, spec)
}
wg.Wait()
aggResult := &RunAllResult{
Results: results,
Duration: time.Since(start),
}
for _, res := range results {
if res.Skipped {
aggResult.Skipped++
} else if res.Passed() {
aggResult.Passed++
} else {
aggResult.Failed++
}
}
return aggResult, nil
}

176
runner_test.go Normal file
View file

@ -0,0 +1,176 @@
package process
import (
"context"
"testing"
"forge.lthn.ai/core/go/pkg/framework"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newTestRunner(t *testing.T) *Runner {
t.Helper()
core, err := framework.New(
framework.WithName("process", NewService(Options{})),
)
require.NoError(t, err)
svc, err := framework.ServiceFor[*Service](core, "process")
require.NoError(t, err)
return NewRunner(svc)
}
func TestRunner_RunSequential(t *testing.T) {
t.Run("all pass", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunSequential(context.Background(), []RunSpec{
{Name: "first", Command: "echo", Args: []string{"1"}},
{Name: "second", Command: "echo", Args: []string{"2"}},
{Name: "third", Command: "echo", Args: []string{"3"}},
})
require.NoError(t, err)
assert.True(t, result.Success())
assert.Equal(t, 3, result.Passed)
assert.Equal(t, 0, result.Failed)
assert.Equal(t, 0, result.Skipped)
})
t.Run("stops on failure", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunSequential(context.Background(), []RunSpec{
{Name: "first", Command: "echo", Args: []string{"1"}},
{Name: "fails", Command: "sh", Args: []string{"-c", "exit 1"}},
{Name: "third", Command: "echo", Args: []string{"3"}},
})
require.NoError(t, err)
assert.False(t, result.Success())
assert.Equal(t, 1, result.Passed)
assert.Equal(t, 1, result.Failed)
assert.Equal(t, 1, result.Skipped)
})
t.Run("allow failure continues", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunSequential(context.Background(), []RunSpec{
{Name: "first", Command: "echo", Args: []string{"1"}},
{Name: "fails", Command: "sh", Args: []string{"-c", "exit 1"}, AllowFailure: true},
{Name: "third", Command: "echo", Args: []string{"3"}},
})
require.NoError(t, err)
// Still counts as failed but pipeline continues
assert.Equal(t, 2, result.Passed)
assert.Equal(t, 1, result.Failed)
assert.Equal(t, 0, result.Skipped)
})
}
func TestRunner_RunParallel(t *testing.T) {
t.Run("all run concurrently", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunParallel(context.Background(), []RunSpec{
{Name: "first", Command: "echo", Args: []string{"1"}},
{Name: "second", Command: "echo", Args: []string{"2"}},
{Name: "third", Command: "echo", Args: []string{"3"}},
})
require.NoError(t, err)
assert.True(t, result.Success())
assert.Equal(t, 3, result.Passed)
assert.Len(t, result.Results, 3)
})
t.Run("failure doesnt stop others", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunParallel(context.Background(), []RunSpec{
{Name: "first", Command: "echo", Args: []string{"1"}},
{Name: "fails", Command: "sh", Args: []string{"-c", "exit 1"}},
{Name: "third", Command: "echo", Args: []string{"3"}},
})
require.NoError(t, err)
assert.False(t, result.Success())
assert.Equal(t, 2, result.Passed)
assert.Equal(t, 1, result.Failed)
})
}
func TestRunner_RunAll(t *testing.T) {
t.Run("respects dependencies", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunAll(context.Background(), []RunSpec{
{Name: "third", Command: "echo", Args: []string{"3"}, After: []string{"second"}},
{Name: "first", Command: "echo", Args: []string{"1"}},
{Name: "second", Command: "echo", Args: []string{"2"}, After: []string{"first"}},
})
require.NoError(t, err)
assert.True(t, result.Success())
assert.Equal(t, 3, result.Passed)
})
t.Run("skips dependents on failure", func(t *testing.T) {
runner := newTestRunner(t)
result, err := runner.RunAll(context.Background(), []RunSpec{
{Name: "first", Command: "sh", Args: []string{"-c", "exit 1"}},
{Name: "second", Command: "echo", Args: []string{"2"}, After: []string{"first"}},
{Name: "third", Command: "echo", Args: []string{"3"}, After: []string{"second"}},
})
require.NoError(t, err)
assert.False(t, result.Success())
assert.Equal(t, 0, result.Passed)
assert.Equal(t, 1, result.Failed)
assert.Equal(t, 2, result.Skipped)
})
t.Run("parallel independent specs", func(t *testing.T) {
runner := newTestRunner(t)
// These should run in parallel since they have no dependencies
result, err := runner.RunAll(context.Background(), []RunSpec{
{Name: "a", Command: "echo", Args: []string{"a"}},
{Name: "b", Command: "echo", Args: []string{"b"}},
{Name: "c", Command: "echo", Args: []string{"c"}},
{Name: "final", Command: "echo", Args: []string{"done"}, After: []string{"a", "b", "c"}},
})
require.NoError(t, err)
assert.True(t, result.Success())
assert.Equal(t, 4, result.Passed)
})
}
func TestRunResult_Passed(t *testing.T) {
t.Run("success", func(t *testing.T) {
r := RunResult{ExitCode: 0}
assert.True(t, r.Passed())
})
t.Run("non-zero exit", func(t *testing.T) {
r := RunResult{ExitCode: 1}
assert.False(t, r.Passed())
})
t.Run("skipped", func(t *testing.T) {
r := RunResult{ExitCode: 0, Skipped: true}
assert.False(t, r.Passed())
})
t.Run("error", func(t *testing.T) {
r := RunResult{ExitCode: 0, Error: assert.AnError}
assert.False(t, r.Passed())
})
}

378
service.go Normal file
View file

@ -0,0 +1,378 @@
package process
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os/exec"
"sync"
"sync/atomic"
"time"
"forge.lthn.ai/core/go/pkg/framework"
)
// Default buffer size for process output (1MB).
const DefaultBufferSize = 1024 * 1024
// Errors
var (
ErrProcessNotFound = errors.New("process not found")
ErrProcessNotRunning = errors.New("process is not running")
ErrStdinNotAvailable = errors.New("stdin not available")
)
// Service manages process execution with Core IPC integration.
type Service struct {
*framework.ServiceRuntime[Options]
processes map[string]*Process
mu sync.RWMutex
bufSize int
idCounter atomic.Uint64
}
// Options configures the process service.
type Options struct {
// BufferSize is the ring buffer size for output capture.
// Default: 1MB (1024 * 1024 bytes).
BufferSize int
}
// NewService creates a process service factory for Core registration.
//
// core, _ := framework.New(
// framework.WithName("process", process.NewService(process.Options{})),
// )
func NewService(opts Options) func(*framework.Core) (any, error) {
return func(c *framework.Core) (any, error) {
if opts.BufferSize == 0 {
opts.BufferSize = DefaultBufferSize
}
svc := &Service{
ServiceRuntime: framework.NewServiceRuntime(c, opts),
processes: make(map[string]*Process),
bufSize: opts.BufferSize,
}
return svc, nil
}
}
// OnStartup implements framework.Startable.
func (s *Service) OnStartup(ctx context.Context) error {
return nil
}
// OnShutdown implements framework.Stoppable.
// Kills all running processes on shutdown.
func (s *Service) OnShutdown(ctx context.Context) error {
s.mu.RLock()
procs := make([]*Process, 0, len(s.processes))
for _, p := range s.processes {
if p.IsRunning() {
procs = append(procs, p)
}
}
s.mu.RUnlock()
for _, p := range procs {
_ = p.Kill()
}
return nil
}
// Start spawns a new process with the given command and args.
func (s *Service) Start(ctx context.Context, command string, args ...string) (*Process, error) {
return s.StartWithOptions(ctx, RunOptions{
Command: command,
Args: args,
})
}
// StartWithOptions spawns a process with full configuration.
func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) {
id := fmt.Sprintf("proc-%d", s.idCounter.Add(1))
procCtx, cancel := context.WithCancel(ctx)
cmd := exec.CommandContext(procCtx, opts.Command, opts.Args...)
if opts.Dir != "" {
cmd.Dir = opts.Dir
}
if len(opts.Env) > 0 {
cmd.Env = append(cmd.Environ(), opts.Env...)
}
// Set up pipes
stdout, err := cmd.StdoutPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create stdout pipe: %w", err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create stderr pipe: %w", err)
}
stdin, err := cmd.StdinPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create stdin pipe: %w", err)
}
// Create output buffer (enabled by default)
var output *RingBuffer
if !opts.DisableCapture {
output = NewRingBuffer(s.bufSize)
}
proc := &Process{
ID: id,
Command: opts.Command,
Args: opts.Args,
Dir: opts.Dir,
Env: opts.Env,
StartedAt: time.Now(),
Status: StatusRunning,
cmd: cmd,
ctx: procCtx,
cancel: cancel,
output: output,
stdin: stdin,
done: make(chan struct{}),
}
// Start the process
if err := cmd.Start(); err != nil {
cancel()
return nil, fmt.Errorf("failed to start process: %w", err)
}
// Store process
s.mu.Lock()
s.processes[id] = proc
s.mu.Unlock()
// Broadcast start
_ = s.Core().ACTION(ActionProcessStarted{
ID: id,
Command: opts.Command,
Args: opts.Args,
Dir: opts.Dir,
PID: cmd.Process.Pid,
})
// Stream output in goroutines
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
s.streamOutput(proc, stdout, StreamStdout)
}()
go func() {
defer wg.Done()
s.streamOutput(proc, stderr, StreamStderr)
}()
// Wait for process completion
go func() {
// Wait for output streaming to complete
wg.Wait()
// Wait for process exit
err := cmd.Wait()
duration := time.Since(proc.StartedAt)
proc.mu.Lock()
proc.Duration = duration
if err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
proc.ExitCode = exitErr.ExitCode()
proc.Status = StatusExited
} else {
proc.Status = StatusFailed
}
} else {
proc.ExitCode = 0
proc.Status = StatusExited
}
status := proc.Status
exitCode := proc.ExitCode
proc.mu.Unlock()
close(proc.done)
// Broadcast exit
var exitErr error
if status == StatusFailed {
exitErr = err
}
_ = s.Core().ACTION(ActionProcessExited{
ID: id,
ExitCode: exitCode,
Duration: duration,
Error: exitErr,
})
}()
return proc, nil
}
// streamOutput reads from a pipe and broadcasts lines via ACTION.
func (s *Service) streamOutput(proc *Process, r io.Reader, stream Stream) {
scanner := bufio.NewScanner(r)
// Increase buffer for long lines
scanner.Buffer(make([]byte, 64*1024), 1024*1024)
for scanner.Scan() {
line := scanner.Text()
// Write to ring buffer
if proc.output != nil {
_, _ = proc.output.Write([]byte(line + "\n"))
}
// Broadcast output
_ = s.Core().ACTION(ActionProcessOutput{
ID: proc.ID,
Line: line,
Stream: stream,
})
}
}
// Get returns a process by ID.
func (s *Service) Get(id string) (*Process, error) {
s.mu.RLock()
defer s.mu.RUnlock()
proc, ok := s.processes[id]
if !ok {
return nil, ErrProcessNotFound
}
return proc, nil
}
// List returns all processes.
func (s *Service) List() []*Process {
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]*Process, 0, len(s.processes))
for _, p := range s.processes {
result = append(result, p)
}
return result
}
// Running returns all currently running processes.
func (s *Service) Running() []*Process {
s.mu.RLock()
defer s.mu.RUnlock()
var result []*Process
for _, p := range s.processes {
if p.IsRunning() {
result = append(result, p)
}
}
return result
}
// Kill terminates a process by ID.
func (s *Service) Kill(id string) error {
proc, err := s.Get(id)
if err != nil {
return err
}
if err := proc.Kill(); err != nil {
return err
}
_ = s.Core().ACTION(ActionProcessKilled{
ID: id,
Signal: "SIGKILL",
})
return nil
}
// Remove removes a completed process from the list.
func (s *Service) Remove(id string) error {
s.mu.Lock()
defer s.mu.Unlock()
proc, ok := s.processes[id]
if !ok {
return ErrProcessNotFound
}
if proc.IsRunning() {
return errors.New("cannot remove running process")
}
delete(s.processes, id)
return nil
}
// Clear removes all completed processes.
func (s *Service) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
for id, p := range s.processes {
if !p.IsRunning() {
delete(s.processes, id)
}
}
}
// Output returns the captured output of a process.
func (s *Service) Output(id string) (string, error) {
proc, err := s.Get(id)
if err != nil {
return "", err
}
return proc.Output(), nil
}
// Run executes a command and waits for completion.
// Returns the combined output and any error.
func (s *Service) Run(ctx context.Context, command string, args ...string) (string, error) {
proc, err := s.Start(ctx, command, args...)
if err != nil {
return "", err
}
<-proc.Done()
output := proc.Output()
if proc.ExitCode != 0 {
return output, fmt.Errorf("process exited with code %d", proc.ExitCode)
}
return output, nil
}
// RunWithOptions executes a command with options and waits for completion.
func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) (string, error) {
proc, err := s.StartWithOptions(ctx, opts)
if err != nil {
return "", err
}
<-proc.Done()
output := proc.Output()
if proc.ExitCode != 0 {
return output, fmt.Errorf("process exited with code %d", proc.ExitCode)
}
return output, nil
}

257
service_test.go Normal file
View file

@ -0,0 +1,257 @@
package process
import (
"context"
"strings"
"sync"
"testing"
"time"
"forge.lthn.ai/core/go/pkg/framework"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newTestService(t *testing.T) (*Service, *framework.Core) {
t.Helper()
core, err := framework.New(
framework.WithName("process", NewService(Options{BufferSize: 1024})),
)
require.NoError(t, err)
svc, err := framework.ServiceFor[*Service](core, "process")
require.NoError(t, err)
return svc, core
}
func TestService_Start(t *testing.T) {
t.Run("echo command", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "hello")
require.NoError(t, err)
require.NotNil(t, proc)
assert.NotEmpty(t, proc.ID)
assert.Equal(t, "echo", proc.Command)
assert.Equal(t, []string{"hello"}, proc.Args)
// Wait for completion
<-proc.Done()
assert.Equal(t, StatusExited, proc.Status)
assert.Equal(t, 0, proc.ExitCode)
assert.Contains(t, proc.Output(), "hello")
})
t.Run("failing command", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "sh", "-c", "exit 42")
require.NoError(t, err)
<-proc.Done()
assert.Equal(t, StatusExited, proc.Status)
assert.Equal(t, 42, proc.ExitCode)
})
t.Run("non-existent command", func(t *testing.T) {
svc, _ := newTestService(t)
_, err := svc.Start(context.Background(), "nonexistent_command_xyz")
assert.Error(t, err)
})
t.Run("with working directory", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
Command: "pwd",
Dir: "/tmp",
})
require.NoError(t, err)
<-proc.Done()
// On macOS /tmp is a symlink to /private/tmp
output := strings.TrimSpace(proc.Output())
assert.True(t, output == "/tmp" || output == "/private/tmp", "got: %s", output)
})
t.Run("context cancellation", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
proc, err := svc.Start(ctx, "sleep", "10")
require.NoError(t, err)
// Cancel immediately
cancel()
select {
case <-proc.Done():
// Good - process was killed
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
})
}
func TestService_Run(t *testing.T) {
t.Run("returns output", func(t *testing.T) {
svc, _ := newTestService(t)
output, err := svc.Run(context.Background(), "echo", "hello world")
require.NoError(t, err)
assert.Contains(t, output, "hello world")
})
t.Run("returns error on failure", func(t *testing.T) {
svc, _ := newTestService(t)
_, err := svc.Run(context.Background(), "sh", "-c", "exit 1")
assert.Error(t, err)
assert.Contains(t, err.Error(), "exited with code 1")
})
}
func TestService_Actions(t *testing.T) {
t.Run("broadcasts events", func(t *testing.T) {
core, err := framework.New(
framework.WithName("process", NewService(Options{})),
)
require.NoError(t, err)
var started []ActionProcessStarted
var outputs []ActionProcessOutput
var exited []ActionProcessExited
var mu sync.Mutex
core.RegisterAction(func(c *framework.Core, msg framework.Message) error {
mu.Lock()
defer mu.Unlock()
switch m := msg.(type) {
case ActionProcessStarted:
started = append(started, m)
case ActionProcessOutput:
outputs = append(outputs, m)
case ActionProcessExited:
exited = append(exited, m)
}
return nil
})
svc, _ := framework.ServiceFor[*Service](core, "process")
proc, err := svc.Start(context.Background(), "echo", "test")
require.NoError(t, err)
<-proc.Done()
// Give time for events to propagate
time.Sleep(10 * time.Millisecond)
mu.Lock()
defer mu.Unlock()
assert.Len(t, started, 1)
assert.Equal(t, "echo", started[0].Command)
assert.Equal(t, []string{"test"}, started[0].Args)
assert.NotEmpty(t, outputs)
foundTest := false
for _, o := range outputs {
if strings.Contains(o.Line, "test") {
foundTest = true
break
}
}
assert.True(t, foundTest, "should have output containing 'test'")
assert.Len(t, exited, 1)
assert.Equal(t, 0, exited[0].ExitCode)
})
}
func TestService_List(t *testing.T) {
t.Run("tracks processes", func(t *testing.T) {
svc, _ := newTestService(t)
proc1, _ := svc.Start(context.Background(), "echo", "1")
proc2, _ := svc.Start(context.Background(), "echo", "2")
<-proc1.Done()
<-proc2.Done()
list := svc.List()
assert.Len(t, list, 2)
})
t.Run("get by id", func(t *testing.T) {
svc, _ := newTestService(t)
proc, _ := svc.Start(context.Background(), "echo", "test")
<-proc.Done()
got, err := svc.Get(proc.ID)
require.NoError(t, err)
assert.Equal(t, proc.ID, got.ID)
})
t.Run("get not found", func(t *testing.T) {
svc, _ := newTestService(t)
_, err := svc.Get("nonexistent")
assert.ErrorIs(t, err, ErrProcessNotFound)
})
}
func TestService_Remove(t *testing.T) {
t.Run("removes completed process", func(t *testing.T) {
svc, _ := newTestService(t)
proc, _ := svc.Start(context.Background(), "echo", "test")
<-proc.Done()
err := svc.Remove(proc.ID)
require.NoError(t, err)
_, err = svc.Get(proc.ID)
assert.ErrorIs(t, err, ErrProcessNotFound)
})
t.Run("cannot remove running process", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, _ := svc.Start(ctx, "sleep", "10")
err := svc.Remove(proc.ID)
assert.Error(t, err)
cancel()
<-proc.Done()
})
}
func TestService_Clear(t *testing.T) {
t.Run("clears completed processes", func(t *testing.T) {
svc, _ := newTestService(t)
proc1, _ := svc.Start(context.Background(), "echo", "1")
proc2, _ := svc.Start(context.Background(), "echo", "2")
<-proc1.Done()
<-proc2.Done()
assert.Len(t, svc.List(), 2)
svc.Clear()
assert.Len(t, svc.List(), 0)
})
}

89
types.go Normal file
View file

@ -0,0 +1,89 @@
// Package process provides process management with Core IPC integration.
//
// The process package enables spawning, monitoring, and controlling external
// processes with output streaming via the Core ACTION system.
//
// # Getting Started
//
// // Register with Core
// core, _ := framework.New(
// framework.WithName("process", process.NewService(process.Options{})),
// )
//
// // Get service and run a process
// svc, err := framework.ServiceFor[*process.Service](core, "process")
// if err != nil {
// return err
// }
// proc, err := svc.Start(ctx, "go", "test", "./...")
//
// # Listening for Events
//
// Process events are broadcast via Core.ACTION:
//
// core.RegisterAction(func(c *framework.Core, msg framework.Message) error {
// switch m := msg.(type) {
// case process.ActionProcessOutput:
// fmt.Print(m.Line)
// case process.ActionProcessExited:
// fmt.Printf("Exit code: %d\n", m.ExitCode)
// }
// return nil
// })
package process
import "time"
// Status represents the process lifecycle state.
type Status string
const (
// StatusPending indicates the process is queued but not yet started.
StatusPending Status = "pending"
// StatusRunning indicates the process is actively executing.
StatusRunning Status = "running"
// StatusExited indicates the process completed (check ExitCode).
StatusExited Status = "exited"
// StatusFailed indicates the process could not be started.
StatusFailed Status = "failed"
// StatusKilled indicates the process was terminated by signal.
StatusKilled Status = "killed"
)
// Stream identifies the output source.
type Stream string
const (
// StreamStdout is standard output.
StreamStdout Stream = "stdout"
// StreamStderr is standard error.
StreamStderr Stream = "stderr"
)
// RunOptions configures process execution.
type RunOptions struct {
// Command is the executable to run.
Command string
// Args are the command arguments.
Args []string
// Dir is the working directory (empty = current).
Dir string
// Env are additional environment variables (KEY=VALUE format).
Env []string
// DisableCapture disables output buffering.
// By default, output is captured to a ring buffer.
DisableCapture bool
}
// Info provides a snapshot of process state without internal fields.
type Info struct {
ID string `json:"id"`
Command string `json:"command"`
Args []string `json:"args"`
Dir string `json:"dir"`
StartedAt time.Time `json:"startedAt"`
Status Status `json:"status"`
ExitCode int `json:"exitCode"`
Duration time.Duration `json:"duration"`
PID int `json:"pid"`
}