fix(process): align service APIs with AX-compatible error boundaries
This commit is contained in:
parent
4ff0d0b745
commit
15e4c8ddeb
3 changed files with 399 additions and 163 deletions
16
process.go
16
process.go
|
|
@ -16,6 +16,7 @@ import (
|
|||
// Process represents a managed external process.
|
||||
type Process struct {
|
||||
ID string
|
||||
PID int
|
||||
Command string
|
||||
Args []string
|
||||
Dir string
|
||||
|
|
@ -34,14 +35,18 @@ type Process struct {
|
|||
mu sync.RWMutex
|
||||
gracePeriod time.Duration
|
||||
killGroup bool
|
||||
lastSignal string
|
||||
}
|
||||
|
||||
// ManagedProcess is kept as a compatibility alias for legacy references.
|
||||
type ManagedProcess = Process
|
||||
|
||||
// Info returns a snapshot of process state.
|
||||
func (p *Process) Info() Info {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
pid := 0
|
||||
pid := p.PID
|
||||
if p.cmd != nil && p.cmd.Process != nil {
|
||||
pid = p.cmd.Process.Pid
|
||||
}
|
||||
|
|
@ -122,6 +127,8 @@ func (p *Process) Kill() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
p.lastSignal = "SIGKILL"
|
||||
|
||||
if p.killGroup {
|
||||
// Kill entire process group (negative PID)
|
||||
return syscall.Kill(-p.cmd.Process.Pid, syscall.SIGKILL)
|
||||
|
|
@ -169,6 +176,7 @@ func (p *Process) terminate() error {
|
|||
}
|
||||
|
||||
pid := p.cmd.Process.Pid
|
||||
p.lastSignal = "SIGTERM"
|
||||
if p.killGroup {
|
||||
pid = -pid
|
||||
}
|
||||
|
|
@ -221,3 +229,9 @@ func (p *Process) CloseStdin() error {
|
|||
p.stdin = nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *Process) requestedSignal() string {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return p.lastSignal
|
||||
}
|
||||
|
|
|
|||
538
service.go
538
service.go
|
|
@ -4,36 +4,38 @@ import (
|
|||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"dappco.re/go/core"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
)
|
||||
|
||||
// execCmd is kept for backwards-compatible test stubbing/mocking.
|
||||
type execCmd = exec.Cmd
|
||||
|
||||
type streamReader interface {
|
||||
Read(p []byte) (n int, err error)
|
||||
}
|
||||
|
||||
// Default buffer size for process output (1MB).
|
||||
const DefaultBufferSize = 1024 * 1024
|
||||
|
||||
// Errors
|
||||
var (
|
||||
ErrProcessNotFound = coreerr.E("", "process not found", nil)
|
||||
ErrProcessNotRunning = coreerr.E("", "process is not running", nil)
|
||||
ErrStdinNotAvailable = coreerr.E("", "stdin not available", nil)
|
||||
ErrProcessNotFound = core.E("", "process not found", nil)
|
||||
ErrProcessNotRunning = core.E("", "process is not running", nil)
|
||||
ErrStdinNotAvailable = core.E("", "stdin not available", nil)
|
||||
)
|
||||
|
||||
// Service manages process execution with Core IPC integration.
|
||||
type Service struct {
|
||||
*core.ServiceRuntime[Options]
|
||||
|
||||
processes map[string]*Process
|
||||
mu sync.RWMutex
|
||||
bufSize int
|
||||
idCounter atomic.Uint64
|
||||
managed *core.Registry[*ManagedProcess]
|
||||
bufSize int
|
||||
}
|
||||
|
||||
// Options configures the process service.
|
||||
|
|
@ -43,11 +45,9 @@ type Options struct {
|
|||
BufferSize int
|
||||
}
|
||||
|
||||
// NewService creates a process service factory for Core registration.
|
||||
// NewService constructs a process service factory for Core registration.
|
||||
//
|
||||
// core, _ := core.New(
|
||||
// core.WithName("process", process.NewService(process.Options{})),
|
||||
// )
|
||||
// c := framework.New(core.WithName("process", process.NewService(process.Options{})))
|
||||
func NewService(opts Options) func(*core.Core) (any, error) {
|
||||
return func(c *core.Core) (any, error) {
|
||||
if opts.BufferSize == 0 {
|
||||
|
|
@ -55,39 +55,49 @@ func NewService(opts Options) func(*core.Core) (any, error) {
|
|||
}
|
||||
svc := &Service{
|
||||
ServiceRuntime: core.NewServiceRuntime(c, opts),
|
||||
processes: make(map[string]*Process),
|
||||
managed: core.NewRegistry[*ManagedProcess](),
|
||||
bufSize: opts.BufferSize,
|
||||
}
|
||||
return svc, nil
|
||||
}
|
||||
}
|
||||
|
||||
// OnStartup implements core.Startable.
|
||||
func (s *Service) OnStartup(ctx context.Context) error {
|
||||
return nil
|
||||
// Register constructs a Service bound to the provided Core instance.
|
||||
//
|
||||
// c := core.New()
|
||||
// svc := process.Register(c).Value.(*process.Service)
|
||||
func Register(c *core.Core) core.Result {
|
||||
r := NewService(Options{BufferSize: DefaultBufferSize})(c)
|
||||
if r == nil {
|
||||
return core.Result{Value: core.E("process.register", "factory returned nil service", nil), OK: false}
|
||||
}
|
||||
|
||||
return core.Result{Value: r, OK: true}
|
||||
}
|
||||
|
||||
// OnShutdown implements core.Stoppable.
|
||||
// Gracefully shuts down all running processes (SIGTERM → SIGKILL).
|
||||
func (s *Service) OnShutdown(ctx context.Context) error {
|
||||
s.mu.RLock()
|
||||
procs := make([]*Process, 0, len(s.processes))
|
||||
for _, p := range s.processes {
|
||||
if p.IsRunning() {
|
||||
procs = append(procs, p)
|
||||
}
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
// OnStartup implements core.Startable.
|
||||
func (s *Service) OnStartup(ctx context.Context) core.Result {
|
||||
c := s.Core()
|
||||
c.Action("process.run", s.handleRun)
|
||||
c.Action("process.start", s.handleStart)
|
||||
c.Action("process.kill", s.handleKill)
|
||||
c.Action("process.list", s.handleList)
|
||||
c.Action("process.get", s.handleGet)
|
||||
return core.Result{OK: true}
|
||||
}
|
||||
|
||||
for _, p := range procs {
|
||||
_ = p.Shutdown()
|
||||
}
|
||||
|
||||
return nil
|
||||
// OnShutdown implements core.Stoppable — kills all managed processes.
|
||||
func (s *Service) OnShutdown(ctx context.Context) core.Result {
|
||||
s.managed.Each(func(_ string, proc *ManagedProcess) {
|
||||
_ = proc.Kill()
|
||||
})
|
||||
return core.Result{OK: true}
|
||||
}
|
||||
|
||||
// Start spawns a new process with the given command and args.
|
||||
func (s *Service) Start(ctx context.Context, command string, args ...string) (*Process, error) {
|
||||
//
|
||||
// proc := svc.Start(ctx, "echo", "hello")
|
||||
func (s *Service) Start(ctx context.Context, command string, args ...string) (*ManagedProcess, error) {
|
||||
return s.StartWithOptions(ctx, RunOptions{
|
||||
Command: command,
|
||||
Args: args,
|
||||
|
|
@ -95,8 +105,22 @@ func (s *Service) Start(ctx context.Context, command string, args ...string) (*P
|
|||
}
|
||||
|
||||
// StartWithOptions spawns a process with full configuration.
|
||||
func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Process, error) {
|
||||
id := fmt.Sprintf("proc-%d", s.idCounter.Add(1))
|
||||
//
|
||||
// proc := svc.StartWithOptions(ctx, process.RunOptions{Command: "go", Args: []string{"test", "./..."}})
|
||||
func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*ManagedProcess, error) {
|
||||
return startResultToProcess(s.startWithOptions(ctx, opts), "process.start")
|
||||
}
|
||||
|
||||
// startWithOptions is the Result-form internal implementation for StartWithOptions.
|
||||
func (s *Service) startWithOptions(ctx context.Context, opts RunOptions) core.Result {
|
||||
if opts.Command == "" {
|
||||
return core.Result{Value: core.E("process.start", "command is required", nil), OK: false}
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
id := core.ID()
|
||||
|
||||
// Detached processes use Background context so they survive parent death
|
||||
parentCtx := ctx
|
||||
|
|
@ -104,7 +128,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce
|
|||
parentCtx = context.Background()
|
||||
}
|
||||
procCtx, cancel := context.WithCancel(parentCtx)
|
||||
cmd := exec.CommandContext(procCtx, opts.Command, opts.Args...)
|
||||
cmd := execCommandContext(procCtx, opts.Command, opts.Args...)
|
||||
|
||||
if opts.Dir != "" {
|
||||
cmd.Dir = opts.Dir
|
||||
|
|
@ -113,42 +137,42 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce
|
|||
cmd.Env = append(cmd.Environ(), opts.Env...)
|
||||
}
|
||||
|
||||
// Detached processes get their own process group
|
||||
// Detached processes get their own process group.
|
||||
if opts.Detach {
|
||||
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
||||
}
|
||||
|
||||
// Set up pipes
|
||||
// Set up pipes.
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, coreerr.E("Service.StartWithOptions", "failed to create stdout pipe", err)
|
||||
return core.Result{Value: core.E("process.start", core.Concat("stdout pipe failed: ", opts.Command), err), OK: false}
|
||||
}
|
||||
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, coreerr.E("Service.StartWithOptions", "failed to create stderr pipe", err)
|
||||
return core.Result{Value: core.E("process.start", core.Concat("stderr pipe failed: ", opts.Command), err), OK: false}
|
||||
}
|
||||
|
||||
stdin, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, coreerr.E("Service.StartWithOptions", "failed to create stdin pipe", err)
|
||||
return core.Result{Value: core.E("process.start", core.Concat("stdin pipe failed: ", opts.Command), err), OK: false}
|
||||
}
|
||||
|
||||
// Create output buffer (enabled by default)
|
||||
// Create output buffer (enabled by default).
|
||||
var output *RingBuffer
|
||||
if !opts.DisableCapture {
|
||||
output = NewRingBuffer(s.bufSize)
|
||||
}
|
||||
|
||||
proc := &Process{
|
||||
proc := &ManagedProcess{
|
||||
ID: id,
|
||||
Command: opts.Command,
|
||||
Args: opts.Args,
|
||||
Args: append([]string(nil), opts.Args...),
|
||||
Dir: opts.Dir,
|
||||
Env: opts.Env,
|
||||
Env: append([]string(nil), opts.Env...),
|
||||
StartedAt: time.Now(),
|
||||
Status: StatusRunning,
|
||||
cmd: cmd,
|
||||
|
|
@ -161,30 +185,32 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce
|
|||
killGroup: opts.KillGroup && opts.Detach,
|
||||
}
|
||||
|
||||
// Start the process
|
||||
// Start the process.
|
||||
if err := cmd.Start(); err != nil {
|
||||
cancel()
|
||||
return nil, coreerr.E("Service.StartWithOptions", "failed to start process", err)
|
||||
return core.Result{Value: core.E("process.start", core.Concat("command failed: ", opts.Command), err), OK: false}
|
||||
}
|
||||
proc.PID = cmd.Process.Pid
|
||||
|
||||
// Store process.
|
||||
if r := s.managed.Set(id, proc); !r.OK {
|
||||
cancel()
|
||||
_ = cmd.Process.Kill()
|
||||
return r
|
||||
}
|
||||
|
||||
// Store process
|
||||
s.mu.Lock()
|
||||
s.processes[id] = proc
|
||||
s.mu.Unlock()
|
||||
|
||||
// Start timeout watchdog if configured
|
||||
// Start timeout watchdog if configured.
|
||||
if opts.Timeout > 0 {
|
||||
go func() {
|
||||
select {
|
||||
case <-proc.done:
|
||||
// Process exited before timeout
|
||||
case <-time.After(opts.Timeout):
|
||||
proc.Shutdown()
|
||||
_ = proc.Shutdown()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Broadcast start
|
||||
// Broadcast start.
|
||||
_ = s.Core().ACTION(ActionProcessStarted{
|
||||
ID: id,
|
||||
Command: opts.Command,
|
||||
|
|
@ -193,7 +219,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce
|
|||
PID: cmd.Process.Pid,
|
||||
})
|
||||
|
||||
// Stream output in goroutines
|
||||
// Stream output in goroutines.
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
|
|
@ -205,67 +231,54 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*Proce
|
|||
s.streamOutput(proc, stderr, StreamStderr)
|
||||
}()
|
||||
|
||||
// Wait for process completion
|
||||
// Wait for process completion.
|
||||
go func() {
|
||||
// Wait for output streaming to complete
|
||||
wg.Wait()
|
||||
|
||||
// Wait for process exit
|
||||
err := cmd.Wait()
|
||||
waitErr := cmd.Wait()
|
||||
|
||||
duration := time.Since(proc.StartedAt)
|
||||
status, exitCode, actionErr, killedSignal := classifyProcessExit(proc, waitErr)
|
||||
|
||||
proc.mu.Lock()
|
||||
proc.Duration = duration
|
||||
if err != nil {
|
||||
var exitErr *exec.ExitError
|
||||
if errors.As(err, &exitErr) {
|
||||
proc.ExitCode = exitErr.ExitCode()
|
||||
proc.Status = StatusExited
|
||||
} else {
|
||||
proc.Status = StatusFailed
|
||||
}
|
||||
} else {
|
||||
proc.ExitCode = 0
|
||||
proc.Status = StatusExited
|
||||
}
|
||||
status := proc.Status
|
||||
exitCode := proc.ExitCode
|
||||
proc.ExitCode = exitCode
|
||||
proc.Status = status
|
||||
proc.mu.Unlock()
|
||||
|
||||
close(proc.done)
|
||||
|
||||
// Broadcast exit
|
||||
var exitErr error
|
||||
if status == StatusFailed {
|
||||
exitErr = err
|
||||
if status == StatusKilled {
|
||||
_ = s.Core().ACTION(ActionProcessKilled{
|
||||
ID: id,
|
||||
Signal: killedSignal,
|
||||
})
|
||||
}
|
||||
_ = s.Core().ACTION(ActionProcessExited{
|
||||
ID: id,
|
||||
ExitCode: exitCode,
|
||||
Duration: duration,
|
||||
Error: exitErr,
|
||||
Error: actionErr,
|
||||
})
|
||||
}()
|
||||
|
||||
return proc, nil
|
||||
return core.Result{Value: proc, OK: true}
|
||||
}
|
||||
|
||||
// streamOutput reads from a pipe and broadcasts lines via ACTION.
|
||||
func (s *Service) streamOutput(proc *Process, r io.Reader, stream Stream) {
|
||||
func (s *Service) streamOutput(proc *ManagedProcess, r streamReader, stream Stream) {
|
||||
scanner := bufio.NewScanner(r)
|
||||
// Increase buffer for long lines
|
||||
// Increase buffer for long lines.
|
||||
scanner.Buffer(make([]byte, 64*1024), 1024*1024)
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
|
||||
// Write to ring buffer
|
||||
// Write to ring buffer.
|
||||
if proc.output != nil {
|
||||
_, _ = proc.output.Write([]byte(line + "\n"))
|
||||
}
|
||||
|
||||
// Broadcast output
|
||||
// Broadcast output.
|
||||
_ = s.Core().ACTION(ActionProcessOutput{
|
||||
ID: proc.ID,
|
||||
Line: line,
|
||||
|
|
@ -275,40 +288,31 @@ func (s *Service) streamOutput(proc *Process, r io.Reader, stream Stream) {
|
|||
}
|
||||
|
||||
// Get returns a process by ID.
|
||||
func (s *Service) Get(id string) (*Process, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
proc, ok := s.processes[id]
|
||||
if !ok {
|
||||
func (s *Service) Get(id string) (*ManagedProcess, error) {
|
||||
r := s.managed.Get(id)
|
||||
if !r.OK {
|
||||
return nil, ErrProcessNotFound
|
||||
}
|
||||
return proc, nil
|
||||
return r.Value, nil
|
||||
}
|
||||
|
||||
// List returns all processes.
|
||||
func (s *Service) List() []*Process {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
result := make([]*Process, 0, len(s.processes))
|
||||
for _, p := range s.processes {
|
||||
result = append(result, p)
|
||||
}
|
||||
func (s *Service) List() []*ManagedProcess {
|
||||
result := make([]*ManagedProcess, 0, s.managed.Len())
|
||||
s.managed.Each(func(_ string, proc *ManagedProcess) {
|
||||
result = append(result, proc)
|
||||
})
|
||||
return result
|
||||
}
|
||||
|
||||
// Running returns all currently running processes.
|
||||
func (s *Service) Running() []*Process {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
var result []*Process
|
||||
for _, p := range s.processes {
|
||||
if p.IsRunning() {
|
||||
result = append(result, p)
|
||||
func (s *Service) Running() []*ManagedProcess {
|
||||
result := make([]*ManagedProcess, 0, s.managed.Len())
|
||||
s.managed.Each(func(_ string, proc *ManagedProcess) {
|
||||
if proc.IsRunning() {
|
||||
result = append(result, proc)
|
||||
}
|
||||
}
|
||||
})
|
||||
return result
|
||||
}
|
||||
|
||||
|
|
@ -322,42 +326,35 @@ func (s *Service) Kill(id string) error {
|
|||
if err := proc.Kill(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_ = s.Core().ACTION(ActionProcessKilled{
|
||||
ID: id,
|
||||
Signal: "SIGKILL",
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove removes a completed process from the list.
|
||||
func (s *Service) Remove(id string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
proc, ok := s.processes[id]
|
||||
if !ok {
|
||||
proc, err := s.Get(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if proc.IsRunning() {
|
||||
return core.E("process.remove", core.Concat("cannot remove running process: ", id), nil)
|
||||
}
|
||||
r := s.managed.Delete(id)
|
||||
if !r.OK {
|
||||
return ErrProcessNotFound
|
||||
}
|
||||
|
||||
if proc.IsRunning() {
|
||||
return coreerr.E("Service.Remove", "cannot remove running process", nil)
|
||||
}
|
||||
|
||||
delete(s.processes, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Clear removes all completed processes.
|
||||
func (s *Service) Clear() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
for id, p := range s.processes {
|
||||
if !p.IsRunning() {
|
||||
delete(s.processes, id)
|
||||
ids := make([]string, 0)
|
||||
s.managed.Each(func(id string, proc *ManagedProcess) {
|
||||
if !proc.IsRunning() {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
})
|
||||
for _, id := range ids {
|
||||
s.managed.Delete(id)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -371,34 +368,259 @@ func (s *Service) Output(id string) (string, error) {
|
|||
}
|
||||
|
||||
// Run executes a command and waits for completion.
|
||||
// Returns the combined output and any error.
|
||||
func (s *Service) Run(ctx context.Context, command string, args ...string) (string, error) {
|
||||
proc, err := s.Start(ctx, command, args...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
<-proc.Done()
|
||||
|
||||
output := proc.Output()
|
||||
if proc.ExitCode != 0 {
|
||||
return output, coreerr.E("Service.Run", fmt.Sprintf("process exited with code %d", proc.ExitCode), nil)
|
||||
}
|
||||
return output, nil
|
||||
return s.RunWithOptions(ctx, RunOptions{
|
||||
Command: command,
|
||||
Args: args,
|
||||
})
|
||||
}
|
||||
|
||||
// RunWithOptions executes a command with options and waits for completion.
|
||||
func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) (string, error) {
|
||||
proc, err := s.StartWithOptions(ctx, opts)
|
||||
return runResultToString(s.runCommand(ctx, opts), "process.run")
|
||||
}
|
||||
|
||||
// --- Internal request helpers. ---
|
||||
|
||||
func (s *Service) handleRun(ctx context.Context, opts core.Options) core.Result {
|
||||
command := opts.String("command")
|
||||
if command == "" {
|
||||
return core.Result{Value: core.E("process.run", "command is required", nil), OK: false}
|
||||
}
|
||||
|
||||
runOpts := RunOptions{
|
||||
Command: command,
|
||||
Dir: opts.String("dir"),
|
||||
}
|
||||
if r := opts.Get("args"); r.OK {
|
||||
runOpts.Args = optionStrings(r.Value)
|
||||
}
|
||||
if r := opts.Get("env"); r.OK {
|
||||
runOpts.Env = optionStrings(r.Value)
|
||||
}
|
||||
|
||||
result, err := s.runCommand(ctx, runOpts)
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{Value: result, OK: true}
|
||||
}
|
||||
|
||||
func (s *Service) handleStart(ctx context.Context, opts core.Options) core.Result {
|
||||
command := opts.String("command")
|
||||
if command == "" {
|
||||
return core.Result{Value: core.E("process.start", "command is required", nil), OK: false}
|
||||
}
|
||||
|
||||
startOpts := RunOptions{
|
||||
Command: command,
|
||||
Dir: opts.String("dir"),
|
||||
Detach: opts.Bool("detach"),
|
||||
}
|
||||
if r := opts.Get("args"); r.OK {
|
||||
startOpts.Args = optionStrings(r.Value)
|
||||
}
|
||||
if r := opts.Get("env"); r.OK {
|
||||
startOpts.Env = optionStrings(r.Value)
|
||||
}
|
||||
|
||||
proc, err := s.StartWithOptions(ctx, startOpts)
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{Value: proc.ID, OK: true}
|
||||
}
|
||||
|
||||
func (s *Service) handleKill(ctx context.Context, opts core.Options) core.Result {
|
||||
id := opts.String("id")
|
||||
if id != "" {
|
||||
if err := s.Kill(id); err != nil {
|
||||
if errors.Is(err, ErrProcessNotFound) {
|
||||
return core.Result{Value: core.E("process.kill", core.Concat("not found: ", id), nil), OK: false}
|
||||
}
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
}
|
||||
|
||||
pid := opts.Int("pid")
|
||||
if pid > 0 {
|
||||
proc, err := processHandle(pid)
|
||||
if err != nil {
|
||||
return core.Result{Value: core.E("process.kill", core.Concat("find pid failed: ", core.Sprintf("%d", pid)), err), OK: false}
|
||||
}
|
||||
if err := proc.Signal(syscall.SIGTERM); err != nil {
|
||||
return core.Result{Value: core.E("process.kill", core.Concat("signal failed: ", core.Sprintf("%d", pid)), err), OK: false}
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
}
|
||||
|
||||
return core.Result{Value: core.E("process.kill", "need id or pid", nil), OK: false}
|
||||
}
|
||||
|
||||
func (s *Service) handleList(ctx context.Context, opts core.Options) core.Result {
|
||||
return core.Result{Value: s.managed.Names(), OK: true}
|
||||
}
|
||||
|
||||
func (s *Service) handleGet(ctx context.Context, opts core.Options) core.Result {
|
||||
id := opts.String("id")
|
||||
if id == "" {
|
||||
return core.Result{Value: core.E("process.get", "id is required", nil), OK: false}
|
||||
}
|
||||
proc, err := s.Get(id)
|
||||
if err != nil {
|
||||
return core.Result{Value: core.E("process.get", core.Concat("not found: ", id), err), OK: false}
|
||||
}
|
||||
return core.Result{Value: proc.Info(), OK: true}
|
||||
}
|
||||
|
||||
func (s *Service) runCommand(ctx context.Context, opts RunOptions) (string, error) {
|
||||
if opts.Command == "" {
|
||||
return "", core.E("process.run", "command is required", nil)
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
cmd := execCommandContext(ctx, opts.Command, opts.Args...)
|
||||
if opts.Dir != "" {
|
||||
cmd.Dir = opts.Dir
|
||||
}
|
||||
if len(opts.Env) > 0 {
|
||||
cmd.Env = append(cmd.Environ(), opts.Env...)
|
||||
}
|
||||
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return "", core.E("process.run", core.Concat("command failed: ", opts.Command), err)
|
||||
}
|
||||
return string(output), nil
|
||||
}
|
||||
|
||||
func classifyProcessExit(proc *ManagedProcess, err error) (Status, int, error, string) {
|
||||
if err == nil {
|
||||
return StatusExited, 0, nil, ""
|
||||
}
|
||||
|
||||
if sig, ok := processExitSignal(err); ok {
|
||||
return StatusKilled, -1, err, normalizeSignalName(sig)
|
||||
}
|
||||
|
||||
if ctxErr := proc.ctx.Err(); ctxErr != nil {
|
||||
signal := proc.requestedSignal()
|
||||
if signal == "" {
|
||||
signal = "SIGKILL"
|
||||
}
|
||||
return StatusKilled, -1, ctxErr, signal
|
||||
}
|
||||
|
||||
var exitErr *exec.ExitError
|
||||
if errors.As(err, &exitErr) {
|
||||
return StatusExited, exitErr.ExitCode(), err, ""
|
||||
}
|
||||
|
||||
return StatusFailed, -1, err, ""
|
||||
}
|
||||
|
||||
func processExitSignal(err error) (syscall.Signal, bool) {
|
||||
var exitErr *exec.ExitError
|
||||
if !errors.As(err, &exitErr) || exitErr.ProcessState == nil {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
waitStatus, ok := exitErr.ProcessState.Sys().(syscall.WaitStatus)
|
||||
if !ok || !waitStatus.Signaled() {
|
||||
return 0, false
|
||||
}
|
||||
return waitStatus.Signal(), true
|
||||
}
|
||||
|
||||
func startResultToProcess(r core.Result, operation string) (*ManagedProcess, error) {
|
||||
if r.OK {
|
||||
proc, ok := r.Value.(*ManagedProcess)
|
||||
if !ok {
|
||||
return nil, core.E(operation, "invalid process result type", nil)
|
||||
}
|
||||
return proc, nil
|
||||
}
|
||||
if err, ok := r.Value.(error); ok {
|
||||
return nil, err
|
||||
}
|
||||
return nil, core.E(operation, "process start failed", nil)
|
||||
}
|
||||
|
||||
func runResultToString(r core.Result, operation string) (string, error) {
|
||||
if r.OK {
|
||||
output, ok := r.Value.(string)
|
||||
if !ok {
|
||||
return "", core.E(operation, "invalid run result type", nil)
|
||||
}
|
||||
return output, nil
|
||||
}
|
||||
if err, ok := r.Value.(error); ok {
|
||||
return "", err
|
||||
}
|
||||
|
||||
<-proc.Done()
|
||||
|
||||
output := proc.Output()
|
||||
if proc.ExitCode != 0 {
|
||||
return output, coreerr.E("Service.RunWithOptions", fmt.Sprintf("process exited with code %d", proc.ExitCode), nil)
|
||||
}
|
||||
return output, nil
|
||||
return "", core.E(operation, "process run failed", nil)
|
||||
}
|
||||
|
||||
func normalizeSignalName(sig syscall.Signal) string {
|
||||
switch sig {
|
||||
case syscall.SIGINT:
|
||||
return "SIGINT"
|
||||
case syscall.SIGKILL:
|
||||
return "SIGKILL"
|
||||
case syscall.SIGTERM:
|
||||
return "SIGTERM"
|
||||
default:
|
||||
return sig.String()
|
||||
}
|
||||
}
|
||||
|
||||
func optionStrings(value any) []string {
|
||||
switch typed := value.(type) {
|
||||
case nil:
|
||||
return nil
|
||||
case []string:
|
||||
return append([]string(nil), typed...)
|
||||
case []any:
|
||||
result := make([]string, 0, len(typed))
|
||||
for _, item := range typed {
|
||||
text, ok := item.(string)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
result = append(result, text)
|
||||
}
|
||||
return result
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func execCommandContext(ctx context.Context, name string, args ...string) *exec.Cmd {
|
||||
return exec.CommandContext(ctx, name, args...)
|
||||
}
|
||||
|
||||
func execLookPath(name string) (string, error) {
|
||||
return exec.LookPath(name)
|
||||
}
|
||||
|
||||
func currentPID() int {
|
||||
return os.Getpid()
|
||||
}
|
||||
|
||||
func processHandle(pid int) (*os.Process, error) {
|
||||
return os.FindProcess(pid)
|
||||
}
|
||||
|
||||
func userHomeDir() (string, error) {
|
||||
return os.UserHomeDir()
|
||||
}
|
||||
|
||||
func tempDir() string {
|
||||
return os.TempDir()
|
||||
}
|
||||
|
||||
func isNotExist(err error) bool {
|
||||
return os.IsNotExist(err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -374,8 +374,8 @@ func TestService_OnShutdown(t *testing.T) {
|
|||
assert.True(t, proc1.IsRunning())
|
||||
assert.True(t, proc2.IsRunning())
|
||||
|
||||
err = svc.OnShutdown(context.Background())
|
||||
assert.NoError(t, err)
|
||||
r := svc.OnShutdown(context.Background())
|
||||
assert.True(t, r.OK)
|
||||
|
||||
select {
|
||||
case <-proc1.Done():
|
||||
|
|
@ -393,8 +393,8 @@ func TestService_OnShutdown(t *testing.T) {
|
|||
func TestService_OnStartup(t *testing.T) {
|
||||
t.Run("returns nil", func(t *testing.T) {
|
||||
svc, _ := newTestService(t)
|
||||
err := svc.OnStartup(context.Background())
|
||||
assert.NoError(t, err)
|
||||
r := svc.OnStartup(context.Background())
|
||||
assert.True(t, r.OK)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue