go-process/service.go

862 lines
20 KiB
Go

package process
import (
"bufio"
"context"
"errors"
"fmt"
"os"
"os/exec"
"sort"
"sync"
"sync/atomic"
"syscall"
"time"
"dappco.re/go/core"
coreerr "dappco.re/go/core/log"
goio "io"
)
// DefaultBufferSize is the ring-buffer capacity, in bytes, used for captured
// process output when Options.BufferSize is zero (1 MiB).
const DefaultBufferSize = 1 << 20
// Sentinel errors returned by the process service.
var (
	// ErrProcessNotFound is returned when no process is registered under the
	// requested ID (for example by Get and Remove).
	ErrProcessNotFound = coreerr.E("", "process not found", nil)
	// ErrProcessNotRunning reports that an operation needs a running process.
	// NOTE(review): not referenced in this file; presumably returned by
	// Process methods defined elsewhere — confirm.
	ErrProcessNotRunning = coreerr.E("", "process is not running", nil)
	// ErrStdinNotAvailable reports that a process has no usable stdin pipe.
	// NOTE(review): not referenced in this file; presumably returned by the
	// Process stdin helpers (SendInput/CloseStdin) — confirm.
	ErrStdinNotAvailable = coreerr.E("", "stdin not available", nil)
	// ErrContextRequired is wrapped by StartWithOptions when it is called with
	// a nil context.
	ErrContextRequired = coreerr.E("", "context is required", nil)
)
// Service manages process execution with Core IPC integration.
//
// It keeps every process it starts in an ID-keyed registry and, when a Core
// runtime is attached, broadcasts lifecycle actions (started, output, killed,
// exited) through it.
type Service struct {
	*core.ServiceRuntime[Options]
	// processes maps process IDs ("proc-N") to their handles; guarded by mu.
	processes map[string]*Process
	// mu guards processes.
	mu sync.RWMutex
	// bufSize is the ring-buffer capacity used for captured output.
	bufSize int
	// idCounter supplies the increasing numeric suffix of process IDs.
	idCounter atomic.Uint64
	// registrations ensures the task handler is registered with Core only once.
	registrations sync.Once
}
// coreApp returns the Core runtime this service is attached to, or nil when
// either the service itself or its embedded runtime is nil.
func (s *Service) coreApp() *core.Core {
	if s != nil && s.ServiceRuntime != nil {
		return s.ServiceRuntime.Core()
	}
	return nil
}
// Options configures the process service.
//
// Example:
//
//	factory := process.NewService(process.Options{BufferSize: 2 * 1024 * 1024})
type Options struct {
	// BufferSize is the ring buffer size, in bytes, for output capture.
	// Default: 1MB (1024 * 1024 bytes), applied by NewService when zero.
	BufferSize int
}
// NewService creates a process service factory for Core registration.
//
// The returned factory builds a *Service bound to the Core it is given.
// A zero or negative BufferSize falls back to DefaultBufferSize, so the
// output ring buffer is never handed a non-positive capacity.
//
//	core, _ := core.New(
//		core.WithName("process", process.NewService(process.Options{})),
//	)
//
// Example:
//
//	factory := process.NewService(process.Options{})
func NewService(opts Options) func(*core.Core) (any, error) {
	return func(c *core.Core) (any, error) {
		// Work on a copy: mutating the captured opts would race if the
		// factory were ever invoked from more than one goroutine.
		cfg := opts
		if cfg.BufferSize <= 0 {
			cfg.BufferSize = DefaultBufferSize
		}
		svc := &Service{
			ServiceRuntime: core.NewServiceRuntime(c, cfg),
			processes:      make(map[string]*Process),
			bufSize:        cfg.BufferSize,
		}
		return svc, nil
	}
}
// OnStartup implements core.Startable.
//
// The first call registers the service's task handler with the attached Core
// runtime (if any); subsequent calls do nothing. It always returns nil.
//
// Example:
//
//	_ = svc.OnStartup(ctx)
func (s *Service) OnStartup(ctx context.Context) error {
	register := func() {
		app := s.coreApp()
		if app == nil {
			return
		}
		app.RegisterTask(s.handleTask)
	}
	s.registrations.Do(register)
	return nil
}
// OnShutdown implements core.Stoppable.
//
// Every process that is still running is killed immediately (its whole tree,
// via killTree) rather than asked to exit, so shutdown cannot stall. Kill
// failures are ignored and the method always returns nil.
//
// Example:
//
//	_ = svc.OnShutdown(ctx)
func (s *Service) OnShutdown(ctx context.Context) error {
	// Snapshot under the read lock so the kills run without holding s.mu.
	s.mu.RLock()
	live := make([]*Process, 0, len(s.processes))
	for _, proc := range s.processes {
		if !proc.IsRunning() {
			continue
		}
		live = append(live, proc)
	}
	s.mu.RUnlock()

	for i := range live {
		// Best effort: one failing kill must not block the rest of shutdown.
		_, _ = live[i].killTree()
	}
	return nil
}
// Start spawns a new process running command with args, using default
// options otherwise. It is shorthand for StartWithOptions.
//
// Example:
//
//	proc, err := svc.Start(ctx, "echo", "hello")
func (s *Service) Start(ctx context.Context, command string, args ...string) (*Process, error) {
	opts := RunOptions{Command: command, Args: args}
	return s.StartWithOptions(ctx, opts)
}
// StartWithOptions spawns a process with full configuration.
//
// All options are validated before anything is allocated (process ID,
// context, pipes), so a rejected request does not consume a proc-N ID.
// The child runs in its own process group. Each stdout/stderr line is
// written to the ring buffer (unless DisableCapture is set) and, when a Core
// runtime is attached, broadcast as ActionProcessOutput; ActionProcessStarted
// and ActionProcessExited are broadcast around the process lifetime.
//
// If the command cannot be started, the process is still registered with
// Status StatusFailed and ExitCode -1, and is returned together with the error.
//
// Example:
//
//	proc, err := svc.StartWithOptions(ctx, process.RunOptions{Command: "pwd", Dir: "/tmp"})
func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) {
	if opts.Command == "" {
		return nil, coreerr.E("Service.StartWithOptions", "command is required", nil)
	}
	if ctx == nil {
		return nil, coreerr.E("Service.StartWithOptions", "context is required", ErrContextRequired)
	}
	// Validate before allocating an ID so invalid requests leave no gap in
	// the proc-N sequence.
	if opts.KillGroup && !opts.Detach {
		return nil, coreerr.E("Service.StartWithOptions", "KillGroup requires Detach", nil)
	}
	id := fmt.Sprintf("proc-%d", s.idCounter.Add(1))
	startedAt := time.Now()

	// Detached processes use Background context so they survive parent death.
	parentCtx := ctx
	if opts.Detach {
		parentCtx = context.Background()
	}
	procCtx, cancel := context.WithCancel(parentCtx)
	cmd := exec.CommandContext(procCtx, opts.Command, opts.Args...)
	if opts.Dir != "" {
		cmd.Dir = opts.Dir
	}
	if len(opts.Env) > 0 {
		// Extra variables are appended to the inherited environment.
		cmd.Env = append(cmd.Environ(), opts.Env...)
	}
	// Put every subprocess in its own process group so shutdown can terminate
	// the full tree without affecting the parent process.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	// Set up pipes.
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		cancel()
		return nil, coreerr.E("Service.StartWithOptions", "failed to create stdout pipe", err)
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		cancel()
		return nil, coreerr.E("Service.StartWithOptions", "failed to create stderr pipe", err)
	}
	stdin, err := cmd.StdinPipe()
	if err != nil {
		cancel()
		return nil, coreerr.E("Service.StartWithOptions", "failed to create stdin pipe", err)
	}

	// Create output buffer (enabled by default).
	var output *RingBuffer
	if !opts.DisableCapture {
		output = NewRingBuffer(s.bufSize)
	}
	proc := &Process{
		ID:          id,
		Command:     opts.Command,
		Args:        append([]string(nil), opts.Args...),
		Dir:         opts.Dir,
		Env:         append([]string(nil), opts.Env...),
		StartedAt:   startedAt,
		Status:      StatusPending,
		cmd:         cmd,
		ctx:         procCtx,
		cancel:      cancel,
		output:      output,
		stdin:       stdin,
		done:        make(chan struct{}),
		gracePeriod: opts.GracePeriod,
		// Validated above: KillGroup is only accepted together with Detach.
		killGroup: opts.KillGroup,
	}

	// Start the process.
	if err := cmd.Start(); err != nil {
		proc.mu.Lock()
		proc.Status = StatusFailed
		proc.ExitCode = -1
		proc.Duration = time.Since(startedAt)
		proc.mu.Unlock()
		// Register the failed process so callers can still inspect it by ID.
		s.mu.Lock()
		s.processes[id] = proc
		s.mu.Unlock()
		close(proc.done)
		cancel()
		if c := s.coreApp(); c != nil {
			_ = c.ACTION(ActionProcessExited{
				ID:       id,
				ExitCode: -1,
				Duration: proc.Duration,
				Error:    nil,
			})
		}
		return proc, coreerr.E("Service.StartWithOptions", "failed to start process", err)
	}
	proc.mu.Lock()
	proc.Status = StatusRunning
	proc.mu.Unlock()

	// Store process.
	s.mu.Lock()
	s.processes[id] = proc
	s.mu.Unlock()

	// Start timeout watchdog if configured; it returns as soon as the process
	// finishes, whichever comes first.
	if opts.Timeout > 0 {
		go func() {
			select {
			case <-proc.done:
				// Process exited before timeout.
			case <-time.After(opts.Timeout):
				proc.Shutdown()
			}
		}()
	}

	// Broadcast start.
	if c := s.coreApp(); c != nil {
		_ = c.ACTION(ActionProcessStarted{
			ID:      id,
			Command: opts.Command,
			Args:    opts.Args,
			Dir:     opts.Dir,
			PID:     cmd.Process.Pid,
		})
	}

	// Stream output in goroutines.
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		s.streamOutput(proc, stdout, StreamStdout)
	}()
	go func() {
		defer wg.Done()
		s.streamOutput(proc, stderr, StreamStderr)
	}()

	// Wait for process completion.
	go func() {
		// os/exec requires all reads from the pipes to finish before Wait.
		wg.Wait()
		err := cmd.Wait()
		duration := time.Since(proc.StartedAt)
		status, exitCode, _, signalName := classifyProcessExit(err)

		proc.mu.Lock()
		proc.Duration = duration
		proc.ExitCode = exitCode
		proc.Status = status
		proc.mu.Unlock()
		close(proc.done)

		if status == StatusKilled {
			s.emitKilledAction(proc, signalName)
		}
		exitAction := ActionProcessExited{
			ID:       id,
			ExitCode: exitCode,
			Duration: duration,
			Error:    nil,
		}
		if c := s.coreApp(); c != nil {
			_ = c.ACTION(exitAction)
		}
	}()
	return proc, nil
}
// streamOutput reads from a pipe line by line. Each line is appended (with a
// trailing newline) to the process ring buffer when capture is enabled, and
// broadcast as ActionProcessOutput when a Core runtime is attached.
//
// Lines longer than 1 MiB make the scanner stop with an error
// (bufio.ErrTooLong). The remainder of the pipe is then drained and
// discarded. If it were left unread, a child that keeps writing would block
// on a full pipe, and the completion goroutine (which waits for streaming to
// finish before calling cmd.Wait) would never return.
func (s *Service) streamOutput(proc *Process, r goio.Reader, stream Stream) {
	scanner := bufio.NewScanner(r)
	// Increase buffer for long lines.
	scanner.Buffer(make([]byte, 64*1024), 1024*1024)
	for scanner.Scan() {
		line := scanner.Text()
		// Write to ring buffer.
		if proc.output != nil {
			_, _ = proc.output.Write([]byte(line + "\n"))
		}
		// Broadcast output.
		if c := s.coreApp(); c != nil {
			_ = c.ACTION(ActionProcessOutput{
				ID:     proc.ID,
				Line:   line,
				Stream: stream,
			})
		}
	}
	if scanner.Err() != nil {
		// Keep the pipe flowing until EOF so the child can never block on a
		// write; the overlong/unreadable remainder is dropped.
		_, _ = goio.Copy(goio.Discard, r)
	}
}
// Get looks up a registered process by its ID. It returns
// ErrProcessNotFound when no process is registered under id.
//
// Example:
//
//	proc, err := svc.Get("proc-1")
func (s *Service) Get(id string) (*Process, error) {
	s.mu.RLock()
	proc, found := s.processes[id]
	s.mu.RUnlock()
	if found {
		return proc, nil
	}
	return nil, ErrProcessNotFound
}
// List returns every registered process, running or not, ordered by start
// time and then ID. The result is never nil.
//
// Example:
//
//	for _, proc := range svc.List() { _ = proc }
func (s *Service) List() []*Process {
	s.mu.RLock()
	defer s.mu.RUnlock()
	all := make([]*Process, len(s.processes))
	i := 0
	for _, proc := range s.processes {
		all[i] = proc
		i++
	}
	sortProcesses(all)
	return all
}
// Running returns the registered processes that report IsRunning, ordered by
// start time and then ID. It returns nil when none are running.
//
// Example:
//
//	running := svc.Running()
func (s *Service) Running() []*Process {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var live []*Process
	for _, proc := range s.processes {
		if !proc.IsRunning() {
			continue
		}
		live = append(live, proc)
	}
	sortProcesses(live)
	return live
}
// Kill terminates the managed process registered under id.
//
// When the kill signal is actually delivered, a single ActionProcessKilled
// (signal "SIGKILL") is broadcast. Lookup and kill errors are returned as-is.
//
// Example:
//
//	_ = svc.Kill("proc-1")
func (s *Service) Kill(id string) error {
	proc, err := s.Get(id)
	if err != nil {
		return err
	}
	delivered, killErr := proc.kill()
	switch {
	case killErr != nil:
		return killErr
	case delivered:
		s.emitKilledAction(proc, "SIGKILL")
	}
	return nil
}
// KillPID terminates a process by operating-system PID.
//
// A PID belonging to a managed process is killed through that process (and a
// kill action is broadcast if the signal was delivered). Any other PID is
// sent SIGTERM directly. Non-positive PIDs are rejected.
//
// Example:
//
//	_ = svc.KillPID(1234)
func (s *Service) KillPID(pid int) error {
	if pid <= 0 {
		return coreerr.E("Service.KillPID", "pid must be positive", nil)
	}
	proc := s.findByPID(pid)
	if proc == nil {
		// Not one of ours: ask it to terminate.
		if err := syscall.Kill(pid, syscall.SIGTERM); err != nil {
			return coreerr.E("Service.KillPID", fmt.Sprintf("failed to signal pid %d", pid), err)
		}
		return nil
	}
	sent, err := proc.kill()
	if err != nil {
		return err
	}
	if sent {
		s.emitKilledAction(proc, "SIGKILL")
	}
	return nil
}
// Signal delivers sig to the managed process registered under id, returning
// ErrProcessNotFound for unknown IDs.
//
// Example:
//
//	_ = svc.Signal("proc-1", syscall.SIGTERM)
func (s *Service) Signal(id string, sig os.Signal) error {
	proc, lookupErr := s.Get(id)
	if lookupErr == nil {
		return proc.Signal(sig)
	}
	return lookupErr
}
// SignalPID delivers sig to a process identified by operating-system PID.
//
// Managed processes are signalled through their Process handle; any other
// PID is located with os.FindProcess and signalled directly. Non-positive
// PIDs are rejected.
//
// Example:
//
//	_ = svc.SignalPID(1234, syscall.SIGTERM)
func (s *Service) SignalPID(pid int, sig os.Signal) error {
	const op = "Service.SignalPID"
	if pid <= 0 {
		return coreerr.E(op, "pid must be positive", nil)
	}
	if managed := s.findByPID(pid); managed != nil {
		return managed.Signal(sig)
	}
	target, findErr := os.FindProcess(pid)
	if findErr != nil {
		return coreerr.E(op, fmt.Sprintf("failed to find pid %d", pid), findErr)
	}
	if sigErr := target.Signal(sig); sigErr != nil {
		return coreerr.E(op, fmt.Sprintf("failed to signal pid %d", pid), sigErr)
	}
	return nil
}
// Remove unregisters the process with the given id.
//
// It fails with ErrProcessNotFound for unknown IDs and refuses to remove a
// process that is still running.
//
// Example:
//
//	_ = svc.Remove("proc-1")
func (s *Service) Remove(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	proc, ok := s.processes[id]
	switch {
	case !ok:
		return ErrProcessNotFound
	case proc.IsRunning():
		return coreerr.E("Service.Remove", "cannot remove running process", nil)
	}
	delete(s.processes, id)
	return nil
}
// Clear unregisters every process that is no longer running; running
// processes are kept.
//
// Example:
//
//	svc.Clear()
func (s *Service) Clear() {
	s.mu.Lock()
	defer s.mu.Unlock()
	for id, proc := range s.processes {
		if proc.IsRunning() {
			continue
		}
		delete(s.processes, id)
	}
}
// Output returns the captured output of the process registered under id,
// or ErrProcessNotFound for unknown IDs.
//
// Example:
//
//	out, err := svc.Output("proc-1")
func (s *Service) Output(id string) (string, error) {
	proc, lookupErr := s.Get(id)
	if lookupErr == nil {
		return proc.Output(), nil
	}
	return "", lookupErr
}
// Input writes input to the stdin of the managed process registered under
// id. Unknown IDs yield ErrProcessNotFound; otherwise the result of
// Process.SendInput is returned.
//
// Example:
//
//	_ = svc.Input("proc-1", "hello\n")
func (s *Service) Input(id string, input string) error {
	proc, lookupErr := s.Get(id)
	if lookupErr == nil {
		return proc.SendInput(input)
	}
	return lookupErr
}
// CloseStdin closes the stdin pipe of the managed process registered under
// id. Unknown IDs yield ErrProcessNotFound; otherwise the result of
// Process.CloseStdin is returned.
//
// Example:
//
//	_ = svc.CloseStdin("proc-1")
func (s *Service) CloseStdin(id string) error {
	proc, lookupErr := s.Get(id)
	if lookupErr == nil {
		return proc.CloseStdin()
	}
	return lookupErr
}
// Wait blocks until the managed process registered under id exits and then
// returns its final Info snapshot together with any error from Process.Wait.
// Unknown IDs yield an empty Info and ErrProcessNotFound.
//
// Example:
//
//	info, err := svc.Wait("proc-1")
func (s *Service) Wait(id string) (Info, error) {
	proc, err := s.Get(id)
	if err != nil {
		return Info{}, err
	}
	waitErr := proc.Wait()
	// Snapshot only after Wait returns so the Info reflects the final state.
	return proc.Info(), waitErr
}
// findByPID returns the registered process whose started command has the
// given operating-system PID, or nil if none matches (including processes
// that never started).
func (s *Service) findByPID(pid int) *Process {
	s.mu.RLock()
	defer s.mu.RUnlock()
	hasPID := func(p *Process) bool {
		p.mu.RLock()
		defer p.mu.RUnlock()
		if p.cmd == nil || p.cmd.Process == nil {
			return false
		}
		return p.cmd.Process.Pid == pid
	}
	for _, candidate := range s.processes {
		if hasPID(candidate) {
			return candidate
		}
	}
	return nil
}
// Run executes a command and waits for completion.
// Returns the captured stdout/stderr output and any error: start failures,
// kills, wait failures and non-zero exit codes are all reported as errors.
//
// Example:
//
//	out, err := svc.Run(ctx, "echo", "hello")
func (s *Service) Run(ctx context.Context, command string, args ...string) (string, error) {
	proc, err := s.Start(ctx, command, args...)
	if err != nil {
		return "", err
	}
	return awaitRunResult("Service.Run", proc)
}

// RunWithOptions executes a command with options and waits for completion.
// Error reporting matches Run.
//
// Example:
//
//	out, err := svc.RunWithOptions(ctx, process.RunOptions{Command: "echo", Args: []string{"hello"}})
func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) (string, error) {
	proc, err := s.StartWithOptions(ctx, opts)
	if err != nil {
		return "", err
	}
	return awaitRunResult("Service.RunWithOptions", proc)
}

// awaitRunResult blocks until proc finishes and converts its final state into
// the (output, error) pair returned by Run and RunWithOptions, tagging errors
// with op.
func awaitRunResult(op string, proc *Process) (string, error) {
	<-proc.Done()
	output := proc.Output()

	// Read the final state under the process lock, like other readers do.
	proc.mu.RLock()
	status, exitCode := proc.Status, proc.ExitCode
	proc.mu.RUnlock()

	switch {
	case status == StatusKilled:
		return output, coreerr.E(op, "process was killed", nil)
	case status == StatusFailed:
		// A failed wait must never be mistaken for success, whatever
		// exit code was recorded.
		return output, coreerr.E(op, "process failed", nil)
	case exitCode != 0:
		return output, coreerr.E(op, fmt.Sprintf("process exited with code %d", exitCode), nil)
	}
	return output, nil
}
// handleTask dispatches Core.PERFORM messages for the process service.
//
// Each recognised task maps onto the corresponding Service method. Failures
// are reported as core.Result{Value: err, OK: false}; successful tasks
// return OK true with their payload (Info, []Info, or an output string) in
// Value, or no payload for pure commands. Unrecognised tasks yield the zero
// core.Result.
func (s *Service) handleTask(c *core.Core, task core.Task) core.Result {
	fail := func(err error) core.Result {
		return core.Result{Value: err, OK: false}
	}
	succeed := func(value any) core.Result {
		return core.Result{Value: value, OK: true}
	}
	// done converts a command's error into a result without payload.
	done := func(err error) core.Result {
		if err != nil {
			return fail(err)
		}
		return succeed(nil)
	}

	switch m := task.(type) {
	case TaskProcessStart:
		proc, err := s.StartWithOptions(c.Context(), RunOptions{
			Command:        m.Command,
			Args:           m.Args,
			Dir:            m.Dir,
			Env:            m.Env,
			DisableCapture: m.DisableCapture,
			Detach:         m.Detach,
			Timeout:        m.Timeout,
			GracePeriod:    m.GracePeriod,
			KillGroup:      m.KillGroup,
		})
		if err != nil {
			return fail(err)
		}
		return succeed(proc.Info())
	case TaskProcessRun:
		output, err := s.RunWithOptions(c.Context(), RunOptions{
			Command:        m.Command,
			Args:           m.Args,
			Dir:            m.Dir,
			Env:            m.Env,
			DisableCapture: m.DisableCapture,
			Detach:         m.Detach,
			Timeout:        m.Timeout,
			GracePeriod:    m.GracePeriod,
			KillGroup:      m.KillGroup,
		})
		if err != nil {
			return fail(err)
		}
		return succeed(output)
	case TaskProcessKill:
		// An ID takes precedence over a PID when both are supplied.
		switch {
		case m.ID != "":
			return done(s.Kill(m.ID))
		case m.PID > 0:
			return done(s.KillPID(m.PID))
		default:
			return fail(coreerr.E("Service.handleTask", "task process kill requires an id or pid", nil))
		}
	case TaskProcessSignal:
		switch {
		case m.ID != "":
			return done(s.Signal(m.ID, m.Signal))
		case m.PID > 0:
			return done(s.SignalPID(m.PID, m.Signal))
		default:
			return fail(coreerr.E("Service.handleTask", "task process signal requires an id or pid", nil))
		}
	case TaskProcessGet:
		if m.ID == "" {
			return fail(coreerr.E("Service.handleTask", "task process get requires an id", nil))
		}
		proc, err := s.Get(m.ID)
		if err != nil {
			return fail(err)
		}
		return succeed(proc.Info())
	case TaskProcessWait:
		if m.ID == "" {
			return fail(coreerr.E("Service.handleTask", "task process wait requires an id", nil))
		}
		info, err := s.Wait(m.ID)
		if err != nil {
			return fail(err)
		}
		return succeed(info)
	case TaskProcessOutput:
		if m.ID == "" {
			return fail(coreerr.E("Service.handleTask", "task process output requires an id", nil))
		}
		output, err := s.Output(m.ID)
		if err != nil {
			return fail(err)
		}
		return succeed(output)
	case TaskProcessInput:
		if m.ID == "" {
			return fail(coreerr.E("Service.handleTask", "task process input requires an id", nil))
		}
		return done(s.Input(m.ID, m.Input))
	case TaskProcessCloseStdin:
		if m.ID == "" {
			return fail(coreerr.E("Service.handleTask", "task process close stdin requires an id", nil))
		}
		return done(s.CloseStdin(m.ID))
	case TaskProcessList:
		// Build only the listing that was asked for.
		var procs []*Process
		if m.RunningOnly {
			procs = s.Running()
		} else {
			procs = s.List()
		}
		infos := make([]Info, 0, len(procs))
		for _, proc := range procs {
			infos = append(infos, proc.Info())
		}
		return succeed(infos)
	case TaskProcessRemove:
		if m.ID == "" {
			return fail(coreerr.E("Service.handleTask", "task process remove requires an id", nil))
		}
		return done(s.Remove(m.ID))
	case TaskProcessClear:
		s.Clear()
		return succeed(nil)
	default:
		return core.Result{}
	}
}
// classifyProcessExit maps the error returned by cmd.Wait to lifecycle state.
//
// It returns the status, the exit code, a descriptive error (nil on a clean
// exit), and, for signal deaths, the signal name:
//
//   - nil error:              StatusExited, 0
//   - terminated by a signal: StatusKilled, -1, signal name
//   - non-zero exit:          StatusExited, the process exit code
//   - any other failure:      StatusFailed, -1, the original error
//
// Wait failures report -1, the same code recorded when a process fails to
// start, so a caller that only inspects ExitCode never reads a failure as
// success (exit code 0).
func classifyProcessExit(err error) (Status, int, error, string) {
	if err == nil {
		return StatusExited, 0, nil, ""
	}
	var exitErr *exec.ExitError
	if !errors.As(err, &exitErr) {
		return StatusFailed, -1, err, ""
	}
	if ws, ok := exitErr.Sys().(syscall.WaitStatus); ok && ws.Signaled() {
		signalName := ws.Signal().String()
		if signalName == "" {
			signalName = "signal"
		}
		return StatusKilled, -1, coreerr.E("Service.StartWithOptions", "process was killed", nil), signalName
	}
	exitCode := exitErr.ExitCode()
	return StatusExited, exitCode, coreerr.E("Service.StartWithOptions", fmt.Sprintf("process exited with code %d", exitCode), nil), ""
}
// emitKilledAction broadcasts ActionProcessKilled at most once per process.
//
// The first caller records the signal name on the process: signalName when
// non-empty, otherwise any previously recorded name, otherwise "SIGKILL".
// Later calls are no-ops. The broadcast happens outside proc.mu. A nil proc
// is ignored.
func (s *Service) emitKilledAction(proc *Process, signalName string) {
	if proc == nil {
		return
	}
	// claim marks the process as notified and reports the signal to announce;
	// ok is false if another caller already did so.
	claim := func() (sig string, ok bool) {
		proc.mu.Lock()
		defer proc.mu.Unlock()
		if proc.killNotified {
			return "", false
		}
		proc.killNotified = true
		switch {
		case signalName != "":
			proc.killSignal = signalName
		case proc.killSignal == "":
			proc.killSignal = "SIGKILL"
		}
		return proc.killSignal, true
	}
	sig, ok := claim()
	if !ok {
		return
	}
	app := s.coreApp()
	if app == nil {
		return
	}
	_ = app.ACTION(ActionProcessKilled{
		ID:     proc.ID,
		Signal: sig,
	})
}
// sortProcesses sorts procs in place by ascending StartedAt, breaking ties
// by ID so listings come out in a deterministic order.
func sortProcesses(procs []*Process) {
	less := func(i, j int) bool {
		a, b := procs[i], procs[j]
		if !a.StartedAt.Equal(b.StartedAt) {
			return a.StartedAt.Before(b.StartedAt)
		}
		return a.ID < b.ID
	}
	sort.Slice(procs, less)
}