diff --git a/process.go b/process.go index 9a2ad0b..ec03f39 100644 --- a/process.go +++ b/process.go @@ -16,6 +16,7 @@ import ( // Process represents a managed external process. type Process struct { ID string + PID int Command string Args []string Dir string @@ -34,14 +35,18 @@ type Process struct { mu sync.RWMutex gracePeriod time.Duration killGroup bool + lastSignal string } +// ManagedProcess is kept as a compatibility alias for legacy references. +type ManagedProcess = Process + // Info returns a snapshot of process state. func (p *Process) Info() Info { p.mu.RLock() defer p.mu.RUnlock() - pid := 0 + pid := p.PID if p.cmd != nil && p.cmd.Process != nil { pid = p.cmd.Process.Pid } @@ -122,6 +127,8 @@ func (p *Process) Kill() error { return nil } + p.lastSignal = "SIGKILL" + if p.killGroup { // Kill entire process group (negative PID) return syscall.Kill(-p.cmd.Process.Pid, syscall.SIGKILL) @@ -169,6 +176,7 @@ func (p *Process) terminate() error { } pid := p.cmd.Process.Pid + p.lastSignal = "SIGTERM" if p.killGroup { pid = -pid } @@ -221,3 +229,9 @@ func (p *Process) CloseStdin() error { p.stdin = nil return err } + +func (p *Process) requestedSignal() string { + p.mu.RLock() + defer p.mu.RUnlock() + return p.lastSignal +} diff --git a/service.go b/service.go index 5fd1339..576b3ec 100644 --- a/service.go +++ b/service.go @@ -4,36 +4,38 @@ import ( "bufio" "context" "errors" - "fmt" - "io" + "os" "os/exec" "sync" - "sync/atomic" "syscall" "time" "dappco.re/go/core" - coreerr "dappco.re/go/core/log" ) +// execCmd is kept for backwards-compatible test stubbing/mocking. +type execCmd = exec.Cmd + +type streamReader interface { + Read(p []byte) (n int, err error) +} + // Default buffer size for process output (1MB). const DefaultBufferSize = 1024 * 1024 // Errors var ( - ErrProcessNotFound = coreerr.E("", "process not found", nil) - ErrProcessNotRunning = coreerr.E("", "process is not running", nil) - ErrStdinNotAvailable = coreerr.E("", "stdin not available", nil) + ErrProcessNotFound = core.E("", "process not found", nil) + ErrProcessNotRunning = core.E("", "process is not running", nil) + ErrStdinNotAvailable = core.E("", "stdin not available", nil) ) // Service manages process execution with Core IPC integration. type Service struct { *core.ServiceRuntime[Options] - processes map[string]*Process - mu sync.RWMutex - bufSize int - idCounter atomic.Uint64 + managed *core.Registry[*ManagedProcess] + bufSize int } // Options configures the process service. @@ -43,11 +45,9 @@ type Options struct { BufferSize int } -// NewService creates a process service factory for Core registration. +// NewService constructs a process service factory for Core registration. // -// core, _ := core.New( -// core.WithName("process", process.NewService(process.Options{})), -// ) +// c := framework.New(core.WithName("process", process.NewService(process.Options{}))) func NewService(opts Options) func(*core.Core) (any, error) { return func(c *core.Core) (any, error) { if opts.BufferSize == 0 { @@ -55,39 +55,49 @@ func NewService(opts Options) func(*core.Core) (any, error) { } svc := &Service{ ServiceRuntime: core.NewServiceRuntime(c, opts), - processes: make(map[string]*Process), + managed: core.NewRegistry[*ManagedProcess](), bufSize: opts.BufferSize, } return svc, nil } } -// OnStartup implements core.Startable. -func (s *Service) OnStartup(ctx context.Context) error { - return nil +// Register constructs a Service bound to the provided Core instance. +// +// c := core.New() +// svc := process.Register(c).Value.(*process.Service) +func Register(c *core.Core) core.Result { + r := NewService(Options{BufferSize: DefaultBufferSize})(c) + if r == nil { + return core.Result{Value: core.E("process.register", "factory returned nil service", nil), OK: false} + } + + return core.Result{Value: r, OK: true} } -// OnShutdown implements core.Stoppable. -// Gracefully shuts down all running processes (SIGTERM → SIGKILL). -func (s *Service) OnShutdown(ctx context.Context) error { - s.mu.RLock() - procs := make([]*Process, 0, len(s.processes)) - for _, p := range s.processes { - if p.IsRunning() { - procs = append(procs, p) - } - } - s.mu.RUnlock() +// OnStartup implements core.Startable. +func (s *Service) OnStartup(ctx context.Context) core.Result { + c := s.Core() + c.Action("process.run", s.handleRun) + c.Action("process.start", s.handleStart) + c.Action("process.kill", s.handleKill) + c.Action("process.list", s.handleList) + c.Action("process.get", s.handleGet) + return core.Result{OK: true} +} - for _, p := range procs { - _ = p.Shutdown() - } - - return nil +// OnShutdown implements core.Stoppable — kills all managed processes. +func (s *Service) OnShutdown(ctx context.Context) core.Result { + s.managed.Each(func(_ string, proc *ManagedProcess) { + _ = proc.Kill() + }) + return core.Result{OK: true} } // Start spawns a new process with the given command and args. -func (s *Service) Start(ctx context.Context, command string, args ...string) (*Process, error) { +// +// proc := svc.Start(ctx, "echo", "hello") +func (s *Service) Start(ctx context.Context, command string, args ...string) (*ManagedProcess, error) { return s.StartWithOptions(ctx, RunOptions{ Command: command, Args: args, @@ -95,8 +105,22 @@ func (s *Service) Start(ctx context.Context, command string, args ...string) (*P } // StartWithOptions spawns a process with full configuration. -func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) { - id := fmt.Sprintf("proc-%d", s.idCounter.Add(1)) +// +// proc := svc.StartWithOptions(ctx, process.RunOptions{Command: "go", Args: []string{"test", "./..."}}) +func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*ManagedProcess, error) { + return startResultToProcess(s.startWithOptions(ctx, opts), "process.start") +} + +// startWithOptions is the Result-form internal implementation for StartWithOptions. +func (s *Service) startWithOptions(ctx context.Context, opts RunOptions) core.Result { + if opts.Command == "" { + return core.Result{Value: core.E("process.start", "command is required", nil), OK: false} + } + if ctx == nil { + ctx = context.Background() + } + + id := core.ID() // Detached processes use Background context so they survive parent death parentCtx := ctx @@ -104,7 +128,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce parentCtx = context.Background() } procCtx, cancel := context.WithCancel(parentCtx) - cmd := exec.CommandContext(procCtx, opts.Command, opts.Args...) + cmd := execCommandContext(procCtx, opts.Command, opts.Args...) if opts.Dir != "" { cmd.Dir = opts.Dir @@ -113,42 +137,42 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce cmd.Env = append(cmd.Environ(), opts.Env...) } - // Detached processes get their own process group + // Detached processes get their own process group. if opts.Detach { cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} } - // Set up pipes + // Set up pipes. stdout, err := cmd.StdoutPipe() if err != nil { cancel() - return nil, coreerr.E("Service.StartWithOptions", "failed to create stdout pipe", err) + return core.Result{Value: core.E("process.start", core.Concat("stdout pipe failed: ", opts.Command), err), OK: false} } stderr, err := cmd.StderrPipe() if err != nil { cancel() - return nil, coreerr.E("Service.StartWithOptions", "failed to create stderr pipe", err) + return core.Result{Value: core.E("process.start", core.Concat("stderr pipe failed: ", opts.Command), err), OK: false} } stdin, err := cmd.StdinPipe() if err != nil { cancel() - return nil, coreerr.E("Service.StartWithOptions", "failed to create stdin pipe", err) + return core.Result{Value: core.E("process.start", core.Concat("stdin pipe failed: ", opts.Command), err), OK: false} } - // Create output buffer (enabled by default) + // Create output buffer (enabled by default). var output *RingBuffer if !opts.DisableCapture { output = NewRingBuffer(s.bufSize) } - proc := &Process{ + proc := &ManagedProcess{ ID: id, Command: opts.Command, - Args: opts.Args, + Args: append([]string(nil), opts.Args...), Dir: opts.Dir, - Env: opts.Env, + Env: append([]string(nil), opts.Env...), StartedAt: time.Now(), Status: StatusRunning, cmd: cmd, @@ -161,30 +185,32 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce killGroup: opts.KillGroup && opts.Detach, } - // Start the process + // Start the process. if err := cmd.Start(); err != nil { cancel() - return nil, coreerr.E("Service.StartWithOptions", "failed to start process", err) + return core.Result{Value: core.E("process.start", core.Concat("command failed: ", opts.Command), err), OK: false} + } + proc.PID = cmd.Process.Pid + + // Store process. + if r := s.managed.Set(id, proc); !r.OK { + cancel() + _ = cmd.Process.Kill() + return r } - // Store process - s.mu.Lock() - s.processes[id] = proc - s.mu.Unlock() - - // Start timeout watchdog if configured + // Start timeout watchdog if configured. if opts.Timeout > 0 { go func() { select { case <-proc.done: - // Process exited before timeout case <-time.After(opts.Timeout): - proc.Shutdown() + _ = proc.Shutdown() } }() } - // Broadcast start + // Broadcast start. _ = s.Core().ACTION(ActionProcessStarted{ ID: id, Command: opts.Command, @@ -193,7 +219,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce PID: cmd.Process.Pid, }) - // Stream output in goroutines + // Stream output in goroutines. var wg sync.WaitGroup wg.Add(2) go func() { @@ -205,67 +231,54 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce s.streamOutput(proc, stderr, StreamStderr) }() - // Wait for process completion + // Wait for process completion. go func() { - // Wait for output streaming to complete wg.Wait() - - // Wait for process exit - err := cmd.Wait() + waitErr := cmd.Wait() duration := time.Since(proc.StartedAt) + status, exitCode, actionErr, killedSignal := classifyProcessExit(proc, waitErr) proc.mu.Lock() proc.Duration = duration - if err != nil { - var exitErr *exec.ExitError - if errors.As(err, &exitErr) { - proc.ExitCode = exitErr.ExitCode() - proc.Status = StatusExited - } else { - proc.Status = StatusFailed - } - } else { - proc.ExitCode = 0 - proc.Status = StatusExited - } - status := proc.Status - exitCode := proc.ExitCode + proc.ExitCode = exitCode + proc.Status = status proc.mu.Unlock() close(proc.done) - // Broadcast exit - var exitErr error - if status == StatusFailed { - exitErr = err + if status == StatusKilled { + _ = s.Core().ACTION(ActionProcessKilled{ + ID: id, + Signal: killedSignal, + }) } _ = s.Core().ACTION(ActionProcessExited{ ID: id, ExitCode: exitCode, Duration: duration, - Error: exitErr, + Error: actionErr, }) }() - return proc, nil + return core.Result{Value: proc, OK: true} } // streamOutput reads from a pipe and broadcasts lines via ACTION. -func (s *Service) streamOutput(proc *Process, r io.Reader, stream Stream) { +func (s *Service) streamOutput(proc *ManagedProcess, r streamReader, stream Stream) { scanner := bufio.NewScanner(r) - // Increase buffer for long lines + // Increase buffer for long lines. scanner.Buffer(make([]byte, 64*1024), 1024*1024) for scanner.Scan() { line := scanner.Text() - // Write to ring buffer + // Write to ring buffer. if proc.output != nil { _, _ = proc.output.Write([]byte(line + "\n")) } - // Broadcast output + // Broadcast output. _ = s.Core().ACTION(ActionProcessOutput{ ID: proc.ID, Line: line, @@ -275,40 +288,31 @@ func (s *Service) streamOutput(proc *Process, r io.Reader, stream Stream) { } // Get returns a process by ID. -func (s *Service) Get(id string) (*Process, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - proc, ok := s.processes[id] - if !ok { +func (s *Service) Get(id string) (*ManagedProcess, error) { + r := s.managed.Get(id) + if !r.OK { return nil, ErrProcessNotFound } - return proc, nil + return r.Value, nil } // List returns all processes. -func (s *Service) List() []*Process { - s.mu.RLock() - defer s.mu.RUnlock() - - result := make([]*Process, 0, len(s.processes)) - for _, p := range s.processes { - result = append(result, p) - } +func (s *Service) List() []*ManagedProcess { + result := make([]*ManagedProcess, 0, s.managed.Len()) + s.managed.Each(func(_ string, proc *ManagedProcess) { + result = append(result, proc) + }) return result } // Running returns all currently running processes. -func (s *Service) Running() []*Process { - s.mu.RLock() - defer s.mu.RUnlock() - - var result []*Process - for _, p := range s.processes { - if p.IsRunning() { - result = append(result, p) +func (s *Service) Running() []*ManagedProcess { + result := make([]*ManagedProcess, 0, s.managed.Len()) + s.managed.Each(func(_ string, proc *ManagedProcess) { + if proc.IsRunning() { + result = append(result, proc) } - } + }) return result } @@ -322,42 +326,35 @@ func (s *Service) Kill(id string) error { if err := proc.Kill(); err != nil { return err } - - _ = s.Core().ACTION(ActionProcessKilled{ - ID: id, - Signal: "SIGKILL", - }) - return nil } // Remove removes a completed process from the list. func (s *Service) Remove(id string) error { - s.mu.Lock() - defer s.mu.Unlock() - - proc, ok := s.processes[id] - if !ok { + proc, err := s.Get(id) + if err != nil { + return err + } + if proc.IsRunning() { + return core.E("process.remove", core.Concat("cannot remove running process: ", id), nil) + } + r := s.managed.Delete(id) + if !r.OK { return ErrProcessNotFound } - - if proc.IsRunning() { - return coreerr.E("Service.Remove", "cannot remove running process", nil) - } - - delete(s.processes, id) return nil } // Clear removes all completed processes. func (s *Service) Clear() { - s.mu.Lock() - defer s.mu.Unlock() - - for id, p := range s.processes { - if !p.IsRunning() { - delete(s.processes, id) + ids := make([]string, 0) + s.managed.Each(func(id string, proc *ManagedProcess) { + if !proc.IsRunning() { + ids = append(ids, id) } + }) + for _, id := range ids { + s.managed.Delete(id) } } @@ -371,34 +368,259 @@ func (s *Service) Output(id string) (string, error) { } // Run executes a command and waits for completion. -// Returns the combined output and any error. func (s *Service) Run(ctx context.Context, command string, args ...string) (string, error) { - proc, err := s.Start(ctx, command, args...) - if err != nil { - return "", err - } - - <-proc.Done() - - output := proc.Output() - if proc.ExitCode != 0 { - return output, coreerr.E("Service.Run", fmt.Sprintf("process exited with code %d", proc.ExitCode), nil) - } - return output, nil + return s.RunWithOptions(ctx, RunOptions{ + Command: command, + Args: args, + }) } // RunWithOptions executes a command with options and waits for completion. func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) (string, error) { - proc, err := s.StartWithOptions(ctx, opts) + return runResultToString(s.runCommand(ctx, opts), "process.run") +} + +// --- Internal request helpers. --- + +func (s *Service) handleRun(ctx context.Context, opts core.Options) core.Result { + command := opts.String("command") + if command == "" { + return core.Result{Value: core.E("process.run", "command is required", nil), OK: false} + } + + runOpts := RunOptions{ + Command: command, + Dir: opts.String("dir"), + } + if r := opts.Get("args"); r.OK { + runOpts.Args = optionStrings(r.Value) + } + if r := opts.Get("env"); r.OK { + runOpts.Env = optionStrings(r.Value) + } + + result, err := s.runCommand(ctx, runOpts) if err != nil { + return core.Result{Value: err, OK: false} + } + return core.Result{Value: result, OK: true} +} + +func (s *Service) handleStart(ctx context.Context, opts core.Options) core.Result { + command := opts.String("command") + if command == "" { + return core.Result{Value: core.E("process.start", "command is required", nil), OK: false} + } + + startOpts := RunOptions{ + Command: command, + Dir: opts.String("dir"), + Detach: opts.Bool("detach"), + } + if r := opts.Get("args"); r.OK { + startOpts.Args = optionStrings(r.Value) + } + if r := opts.Get("env"); r.OK { + startOpts.Env = optionStrings(r.Value) + } + + proc, err := s.StartWithOptions(ctx, startOpts) + if err != nil { + return core.Result{Value: err, OK: false} + } + return core.Result{Value: proc.ID, OK: true} +} + +func (s *Service) handleKill(ctx context.Context, opts core.Options) core.Result { + id := opts.String("id") + if id != "" { + if err := s.Kill(id); err != nil { + if errors.Is(err, ErrProcessNotFound) { + return core.Result{Value: core.E("process.kill", core.Concat("not found: ", id), nil), OK: false} + } + return core.Result{Value: err, OK: false} + } + return core.Result{OK: true} + } + + pid := opts.Int("pid") + if pid > 0 { + proc, err := processHandle(pid) + if err != nil { + return core.Result{Value: core.E("process.kill", core.Concat("find pid failed: ", core.Sprintf("%d", pid)), err), OK: false} + } + if err := proc.Signal(syscall.SIGTERM); err != nil { + return core.Result{Value: core.E("process.kill", core.Concat("signal failed: ", core.Sprintf("%d", pid)), err), OK: false} + } + return core.Result{OK: true} + } + + return core.Result{Value: core.E("process.kill", "need id or pid", nil), OK: false} +} + +func (s *Service) handleList(ctx context.Context, opts core.Options) core.Result { + return core.Result{Value: s.managed.Names(), OK: true} +} + +func (s *Service) handleGet(ctx context.Context, opts core.Options) core.Result { + id := opts.String("id") + if id == "" { + return core.Result{Value: core.E("process.get", "id is required", nil), OK: false} + } + proc, err := s.Get(id) + if err != nil { + return core.Result{Value: core.E("process.get", core.Concat("not found: ", id), err), OK: false} + } + return core.Result{Value: proc.Info(), OK: true} +} + +func (s *Service) runCommand(ctx context.Context, opts RunOptions) (string, error) { + if opts.Command == "" { + return "", core.E("process.run", "command is required", nil) + } + if ctx == nil { + ctx = context.Background() + } + + cmd := execCommandContext(ctx, opts.Command, opts.Args...) + if opts.Dir != "" { + cmd.Dir = opts.Dir + } + if len(opts.Env) > 0 { + cmd.Env = append(cmd.Environ(), opts.Env...) + } + + output, err := cmd.CombinedOutput() + if err != nil { + return "", core.E("process.run", core.Concat("command failed: ", opts.Command), err) + } + return string(output), nil +} + +func classifyProcessExit(proc *ManagedProcess, err error) (Status, int, error, string) { + if err == nil { + return StatusExited, 0, nil, "" + } + + if sig, ok := processExitSignal(err); ok { + return StatusKilled, -1, err, normalizeSignalName(sig) + } + + if ctxErr := proc.ctx.Err(); ctxErr != nil { + signal := proc.requestedSignal() + if signal == "" { + signal = "SIGKILL" + } + return StatusKilled, -1, ctxErr, signal + } + + var exitErr *exec.ExitError + if errors.As(err, &exitErr) { + return StatusExited, exitErr.ExitCode(), err, "" + } + + return StatusFailed, -1, err, "" +} + +func processExitSignal(err error) (syscall.Signal, bool) { + var exitErr *exec.ExitError + if !errors.As(err, &exitErr) || exitErr.ProcessState == nil { + return 0, false + } + + waitStatus, ok := exitErr.ProcessState.Sys().(syscall.WaitStatus) + if !ok || !waitStatus.Signaled() { + return 0, false + } + return waitStatus.Signal(), true +} + +func startResultToProcess(r core.Result, operation string) (*ManagedProcess, error) { + if r.OK { + proc, ok := r.Value.(*ManagedProcess) + if !ok { + return nil, core.E(operation, "invalid process result type", nil) + } + return proc, nil + } + if err, ok := r.Value.(error); ok { + return nil, err + } + return nil, core.E(operation, "process start failed", nil) +} + +func runResultToString(r core.Result, operation string) (string, error) { + if r.OK { + output, ok := r.Value.(string) + if !ok { + return "", core.E(operation, "invalid run result type", nil) + } + return output, nil + } + if err, ok := r.Value.(error); ok { return "", err } - - <-proc.Done() - - output := proc.Output() - if proc.ExitCode != 0 { - return output, coreerr.E("Service.RunWithOptions", fmt.Sprintf("process exited with code %d", proc.ExitCode), nil) - } - return output, nil + return "", core.E(operation, "process run failed", nil) +} + +func normalizeSignalName(sig syscall.Signal) string { + switch sig { + case syscall.SIGINT: + return "SIGINT" + case syscall.SIGKILL: + return "SIGKILL" + case syscall.SIGTERM: + return "SIGTERM" + default: + return sig.String() + } +} + +func optionStrings(value any) []string { + switch typed := value.(type) { + case nil: + return nil + case []string: + return append([]string(nil), typed...) + case []any: + result := make([]string, 0, len(typed)) + for _, item := range typed { + text, ok := item.(string) + if !ok { + return nil + } + result = append(result, text) + } + return result + default: + return nil + } +} + +func execCommandContext(ctx context.Context, name string, args ...string) *exec.Cmd { + return exec.CommandContext(ctx, name, args...) +} + +func execLookPath(name string) (string, error) { + return exec.LookPath(name) +} + +func currentPID() int { + return os.Getpid() +} + +func processHandle(pid int) (*os.Process, error) { + return os.FindProcess(pid) +} + +func userHomeDir() (string, error) { + return os.UserHomeDir() +} + +func tempDir() string { + return os.TempDir() +} + +func isNotExist(err error) bool { + return os.IsNotExist(err) } diff --git a/service_test.go b/service_test.go index 868b7a3..d237857 100644 --- a/service_test.go +++ b/service_test.go @@ -374,8 +374,8 @@ func TestService_OnShutdown(t *testing.T) { assert.True(t, proc1.IsRunning()) assert.True(t, proc2.IsRunning()) - err = svc.OnShutdown(context.Background()) - assert.NoError(t, err) + r := svc.OnShutdown(context.Background()) + assert.True(t, r.OK) select { case <-proc1.Done(): @@ -393,8 +393,8 @@ func TestService_OnShutdown(t *testing.T) { func TestService_OnStartup(t *testing.T) { t.Run("returns nil", func(t *testing.T) { svc, _ := newTestService(t) - err := svc.OnStartup(context.Background()) - assert.NoError(t, err) + r := svc.OnStartup(context.Background()) + assert.True(t, r.OK) }) }