diff --git a/daemon.go b/daemon.go index c06e8db..a60c4ef 100644 --- a/daemon.go +++ b/daemon.go @@ -5,7 +5,7 @@ import ( "sync" "time" - coreerr "dappco.re/go/core/log" + "dappco.re/go/core" ) // DaemonOptions configures daemon mode execution. @@ -77,7 +77,7 @@ func (d *Daemon) Start() error { defer d.mu.Unlock() if d.running { - return coreerr.E("Daemon.Start", "daemon already running", nil) + return core.E("daemon.start", "daemon already running", nil) } if d.pid != nil { @@ -105,7 +105,16 @@ func (d *Daemon) Start() error { entry.Health = d.health.Addr() } if err := d.opts.Registry.Register(entry); err != nil { - return coreerr.E("Daemon.Start", "registry", err) + if d.health != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), d.opts.ShutdownTimeout) + _ = d.health.Stop(shutdownCtx) + cancel() + } + if d.pid != nil { + _ = d.pid.Release() + } + d.running = false + return core.E("daemon.start", "registry", err) } } @@ -117,7 +126,7 @@ func (d *Daemon) Run(ctx context.Context) error { d.mu.Lock() if !d.running { d.mu.Unlock() - return coreerr.E("Daemon.Run", "daemon not started - call Start() first", nil) + return core.E("daemon.run", "daemon not started - call Start() first", nil) } d.mu.Unlock() @@ -143,13 +152,13 @@ func (d *Daemon) Stop() error { if d.health != nil { d.health.SetReady(false) if err := d.health.Stop(shutdownCtx); err != nil { - errs = append(errs, coreerr.E("Daemon.Stop", "health server", err)) + errs = append(errs, core.E("daemon.stop", "health server", err)) } } if d.pid != nil { if err := d.pid.Release(); err != nil && !isNotExist(err) { - errs = append(errs, coreerr.E("Daemon.Stop", "pid file", err)) + errs = append(errs, core.E("daemon.stop", "pid file", err)) } } @@ -161,7 +170,7 @@ func (d *Daemon) Stop() error { d.running = false if len(errs) > 0 { - return coreerr.Join(errs...) + return core.ErrorJoin(errs...) } return nil } diff --git a/global_test.go b/global_test.go deleted file mode 100644 index d1bd44a..0000000 --- a/global_test.go +++ /dev/null @@ -1,261 +0,0 @@ -package process - -import ( - "context" - "sync" - "testing" - - framework "dappco.re/go/core" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestGlobal_NotInitialized_Bad(t *testing.T) { - old := defaultService.Swap(nil) - defer func() { - if old != nil { - defaultService.Store(old) - } - }() - - assert.Nil(t, Default()) - - r := Start(context.Background(), "echo", "test") - assert.False(t, r.OK) - - r = Run(context.Background(), "echo", "test") - assert.False(t, r.OK) - - _, err := Get("proc-1") - assert.ErrorIs(t, err, ErrServiceNotInitialized) - - assert.Nil(t, List()) - assert.Nil(t, Running()) - - err = Kill("proc-1") - assert.ErrorIs(t, err, ErrServiceNotInitialized) - - r = StartWithOptions(context.Background(), RunOptions{Command: "echo"}) - assert.False(t, r.OK) - - r = RunWithOptions(context.Background(), RunOptions{Command: "echo"}) - assert.False(t, r.OK) -} - -func newGlobalTestService(t *testing.T) *Service { - t.Helper() - c := framework.New() - factory := NewService(Options{}) - raw, err := factory(c) - require.NoError(t, err) - return raw.(*Service) -} - -func TestGlobal_SetDefault_Good(t *testing.T) { - t.Run("sets and retrieves service", func(t *testing.T) { - old := defaultService.Swap(nil) - defer func() { - if old != nil { - defaultService.Store(old) - } - }() - - svc := newGlobalTestService(t) - err := SetDefault(svc) - require.NoError(t, err) - assert.Equal(t, svc, Default()) - }) - - t.Run("errors on nil", func(t *testing.T) { - err := SetDefault(nil) - assert.Error(t, err) - }) -} - -func TestGlobal_DefaultConcurrent_Good(t *testing.T) { - old := defaultService.Swap(nil) - defer func() { - if old != nil { - defaultService.Store(old) - } - }() - - svc := newGlobalTestService(t) - err := SetDefault(svc) - require.NoError(t, err) - - var wg sync.WaitGroup - for i := 0; i < 100; i++ { - wg.Add(1) - go func() { - defer wg.Done() - s := Default() - assert.NotNil(t, s) - assert.Equal(t, svc, s) - }() - } - wg.Wait() -} - -func TestGlobal_SetDefaultConcurrent_Good(t *testing.T) { - old := defaultService.Swap(nil) - defer func() { - if old != nil { - defaultService.Store(old) - } - }() - - var services []*Service - for i := 0; i < 10; i++ { - svc := newGlobalTestService(t) - services = append(services, svc) - } - - var wg sync.WaitGroup - for _, svc := range services { - wg.Add(1) - go func(s *Service) { - defer wg.Done() - _ = SetDefault(s) - }(svc) - } - wg.Wait() - - final := Default() - assert.NotNil(t, final) - - found := false - for _, svc := range services { - if svc == final { - found = true - break - } - } - assert.True(t, found, "Default should be one of the set services") -} - -func TestGlobal_ConcurrentOps_Good(t *testing.T) { - old := defaultService.Swap(nil) - defer func() { - if old != nil { - defaultService.Store(old) - } - }() - - svc := newGlobalTestService(t) - err := SetDefault(svc) - require.NoError(t, err) - - var wg sync.WaitGroup - var processes []*Process - var procMu sync.Mutex - - for i := 0; i < 20; i++ { - wg.Add(1) - go func() { - defer wg.Done() - r := Start(context.Background(), "echo", "concurrent") - if r.OK { - procMu.Lock() - processes = append(processes, r.Value.(*Process)) - procMu.Unlock() - } - }() - } - - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - _ = List() - _ = Running() - }() - } - - wg.Wait() - - procMu.Lock() - for _, p := range processes { - <-p.Done() - } - procMu.Unlock() - - assert.Len(t, processes, 20) - - var wg2 sync.WaitGroup - for _, p := range processes { - wg2.Add(1) - go func(id string) { - defer wg2.Done() - got, err := Get(id) - assert.NoError(t, err) - assert.NotNil(t, got) - }(p.ID) - } - wg2.Wait() -} - -func TestGlobal_StartWithOptions_Good(t *testing.T) { - svc, _ := newTestService(t) - old := defaultService.Swap(svc) - defer func() { - if old != nil { - defaultService.Store(old) - } - }() - - r := StartWithOptions(context.Background(), RunOptions{ - Command: "echo", - Args: []string{"with", "options"}, - }) - require.True(t, r.OK) - proc := r.Value.(*Process) - - <-proc.Done() - assert.Equal(t, 0, proc.ExitCode) - assert.Contains(t, proc.Output(), "with options") -} - -func TestGlobal_RunWithOptions_Good(t *testing.T) { - svc, _ := newTestService(t) - old := defaultService.Swap(svc) - defer func() { - if old != nil { - defaultService.Store(old) - } - }() - - r := RunWithOptions(context.Background(), RunOptions{ - Command: "echo", - Args: []string{"run", "options"}, - }) - assert.True(t, r.OK) - assert.Contains(t, r.Value.(string), "run options") -} - -func TestGlobal_Running_Good(t *testing.T) { - svc, _ := newTestService(t) - old := defaultService.Swap(svc) - defer func() { - if old != nil { - defaultService.Store(old) - } - }() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - r := Start(ctx, "sleep", "60") - require.True(t, r.OK) - proc := r.Value.(*Process) - - running := Running() - assert.Len(t, running, 1) - assert.Equal(t, proc.ID, running[0].ID) - - cancel() - <-proc.Done() - - running = Running() - assert.Len(t, running, 0) -} diff --git a/go.mod b/go.mod index cedb725..21177fe 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,8 @@ module dappco.re/go/core/process go 1.26.0 require ( - dappco.re/go/core v0.5.0 + dappco.re/go/core v0.8.0-alpha.1 dappco.re/go/core/io v0.2.0 - dappco.re/go/core/log v0.1.0 dappco.re/go/core/ws v0.3.0 forge.lthn.ai/core/api v0.1.5 github.com/gin-gonic/gin v1.12.0 @@ -13,6 +12,7 @@ require ( ) require ( + dappco.re/go/core/log v0.1.0 // indirect forge.lthn.ai/core/go-io v0.1.5 // indirect forge.lthn.ai/core/go-log v0.0.4 // indirect github.com/99designs/gqlgen v0.17.88 // indirect diff --git a/go.sum b/go.sum index 3fc5cfc..5cf7b04 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -dappco.re/go/core v0.5.0 h1:P5DJoaCiK5Q+af5UiTdWqUIW4W4qYKzpgGK50thm21U= -dappco.re/go/core v0.5.0/go.mod h1:f2/tBZ3+3IqDrg2F5F598llv0nmb/4gJVCFzM5geE4A= +dappco.re/go/core v0.8.0-alpha.1 h1:gj7+Scv+L63Z7wMxbJYHhaRFkHJo2u4MMPuUSv/Dhtk= +dappco.re/go/core v0.8.0-alpha.1/go.mod h1:f2/tBZ3+3IqDrg2F5F598llv0nmb/4gJVCFzM5geE4A= dappco.re/go/core/io v0.2.0 h1:zuudgIiTsQQ5ipVt97saWdGLROovbEB/zdVyy9/l+I4= dappco.re/go/core/io v0.2.0/go.mod h1:1QnQV6X9LNgFKfm8SkOtR9LLaj3bDcsOIeJOOyjbL5E= dappco.re/go/core/log v0.1.0 h1:pa71Vq2TD2aoEUQWFKwNcaJ3GBY8HbaNGqtE688Unyc= diff --git a/health.go b/health.go index e52dd94..00093ed 100644 --- a/health.go +++ b/health.go @@ -8,7 +8,6 @@ import ( "time" "dappco.re/go/core" - coreerr "dappco.re/go/core/log" ) // HealthCheck is a function that returns nil if healthy. @@ -90,7 +89,7 @@ func (h *HealthServer) Start() error { listener, err := net.Listen("tcp", h.addr) if err != nil { - return coreerr.E("HealthServer.Start", "failed to listen on "+h.addr, err) + return core.E("health.start", core.Concat("failed to listen on ", h.addr), err) } h.listener = listener diff --git a/pidfile.go b/pidfile.go index 4ac94f2..6db566f 100644 --- a/pidfile.go +++ b/pidfile.go @@ -7,8 +7,8 @@ import ( "sync" "syscall" + "dappco.re/go/core" coreio "dappco.re/go/core/io" - coreerr "dappco.re/go/core/log" ) // PIDFile manages a process ID file for single-instance enforcement. @@ -33,7 +33,7 @@ func (p *PIDFile) Acquire() error { if err == nil && pid > 0 { if proc, err := processHandle(pid); err == nil { if err := proc.Signal(syscall.Signal(0)); err == nil { - return coreerr.E("PIDFile.Acquire", "another instance is running (PID "+strconv.Itoa(pid)+")", nil) + return core.E("pidfile.acquire", core.Concat("another instance is running (PID ", strconv.Itoa(pid), ")"), nil) } } } @@ -42,13 +42,13 @@ func (p *PIDFile) Acquire() error { if dir := path.Dir(p.path); dir != "." { if err := coreio.Local.EnsureDir(dir); err != nil { - return coreerr.E("PIDFile.Acquire", "failed to create PID directory", err) + return core.E("pidfile.acquire", "failed to create PID directory", err) } } pid := currentPID() if err := coreio.Local.Write(p.path, strconv.Itoa(pid)); err != nil { - return coreerr.E("PIDFile.Acquire", "failed to write PID file", err) + return core.E("pidfile.acquire", "failed to write PID file", err) } return nil @@ -59,7 +59,7 @@ func (p *PIDFile) Release() error { p.mu.Lock() defer p.mu.Unlock() if err := coreio.Local.Delete(p.path); err != nil { - return coreerr.E("PIDFile.Release", "failed to remove PID file", err) + return core.E("pidfile.release", "failed to remove PID file", err) } return nil } diff --git a/process.go b/process.go index c1240c4..ad0748e 100644 --- a/process.go +++ b/process.go @@ -7,7 +7,7 @@ import ( "syscall" "time" - coreerr "dappco.re/go/core/log" + "dappco.re/go/core" ) type processStdin interface { @@ -15,8 +15,8 @@ type processStdin interface { Close() error } -// Process represents a managed external process. -type Process struct { +// ManagedProcess represents a tracked external process started by the service. +type ManagedProcess struct { ID string Command string Args []string @@ -38,8 +38,11 @@ type Process struct { killGroup bool } +// Process is kept as a compatibility alias for ManagedProcess. +type Process = ManagedProcess + // Info returns a snapshot of process state. -func (p *Process) Info() Info { +func (p *ManagedProcess) Info() ProcessInfo { p.mu.RLock() defer p.mu.RUnlock() @@ -48,12 +51,13 @@ func (p *Process) Info() Info { pid = p.cmd.Process.Pid } - return Info{ + return ProcessInfo{ ID: p.ID, Command: p.Command, Args: p.Args, Dir: p.Dir, StartedAt: p.StartedAt, + Running: p.Status == StatusRunning, Status: p.Status, ExitCode: p.ExitCode, Duration: p.Duration, @@ -62,7 +66,7 @@ func (p *Process) Info() Info { } // Output returns the captured output as a string. -func (p *Process) Output() string { +func (p *ManagedProcess) Output() string { p.mu.RLock() defer p.mu.RUnlock() if p.output == nil { @@ -72,7 +76,7 @@ func (p *Process) Output() string { } // OutputBytes returns the captured output as bytes. -func (p *Process) OutputBytes() []byte { +func (p *ManagedProcess) OutputBytes() []byte { p.mu.RLock() defer p.mu.RUnlock() if p.output == nil { @@ -82,37 +86,40 @@ func (p *Process) OutputBytes() []byte { } // IsRunning returns true if the process is still executing. -func (p *Process) IsRunning() bool { - p.mu.RLock() - defer p.mu.RUnlock() - return p.Status == StatusRunning +func (p *ManagedProcess) IsRunning() bool { + select { + case <-p.done: + return false + default: + return true + } } // Wait blocks until the process exits. -func (p *Process) Wait() error { +func (p *ManagedProcess) Wait() error { <-p.done p.mu.RLock() defer p.mu.RUnlock() if p.Status == StatusFailed { - return coreerr.E("Process.Wait", "process failed to start: "+p.ID, nil) + return core.E("process.wait", core.Concat("process failed to start: ", p.ID), nil) } if p.Status == StatusKilled { - return coreerr.E("Process.Wait", "process was killed: "+p.ID, nil) + return core.E("process.wait", core.Concat("process was killed: ", p.ID), nil) } if p.ExitCode != 0 { - return coreerr.E("Process.Wait", "process exited with code "+strconv.Itoa(p.ExitCode), nil) + return core.E("process.wait", core.Concat("process exited with code ", strconv.Itoa(p.ExitCode)), nil) } return nil } // Done returns a channel that closes when the process exits. -func (p *Process) Done() <-chan struct{} { +func (p *ManagedProcess) Done() <-chan struct{} { return p.done } // Kill forcefully terminates the process. // If KillGroup is set, kills the entire process group. -func (p *Process) Kill() error { +func (p *ManagedProcess) Kill() error { p.mu.Lock() defer p.mu.Unlock() @@ -134,7 +141,7 @@ func (p *Process) Kill() error { // Shutdown gracefully stops the process: SIGTERM, then SIGKILL after grace period. // If GracePeriod was not set (zero), falls back to immediate Kill(). // If KillGroup is set, signals are sent to the entire process group. -func (p *Process) Shutdown() error { +func (p *ManagedProcess) Shutdown() error { p.mu.RLock() grace := p.gracePeriod p.mu.RUnlock() @@ -158,7 +165,7 @@ func (p *Process) Shutdown() error { } // terminate sends SIGTERM to the process (or process group if KillGroup is set). -func (p *Process) terminate() error { +func (p *ManagedProcess) terminate() error { p.mu.Lock() defer p.mu.Unlock() @@ -178,7 +185,7 @@ func (p *Process) terminate() error { } // SendInput writes to the process stdin. -func (p *Process) SendInput(input string) error { +func (p *ManagedProcess) SendInput(input string) error { p.mu.RLock() defer p.mu.RUnlock() @@ -195,7 +202,7 @@ func (p *Process) SendInput(input string) error { } // CloseStdin closes the process stdin pipe. -func (p *Process) CloseStdin() error { +func (p *ManagedProcess) CloseStdin() error { p.mu.Lock() defer p.mu.Unlock() diff --git a/process_global.go b/process_global.go deleted file mode 100644 index dc30b2e..0000000 --- a/process_global.go +++ /dev/null @@ -1,130 +0,0 @@ -package process - -import ( - "context" - "sync" - "sync/atomic" - - "dappco.re/go/core" - coreerr "dappco.re/go/core/log" -) - -// Global default service (follows i18n pattern). -var ( - defaultService atomic.Pointer[Service] - defaultOnce sync.Once - defaultErr error -) - -// Default returns the global process service. -// Returns nil if not initialized. -func Default() *Service { - return defaultService.Load() -} - -// SetDefault sets the global process service. -// Thread-safe: can be called concurrently with Default(). -func SetDefault(s *Service) error { - if s == nil { - return ErrSetDefaultNil - } - defaultService.Store(s) - return nil -} - -// Init initializes the default global service with a Core instance. -// This is typically called during application startup. -func Init(c *core.Core) error { - defaultOnce.Do(func() { - factory := NewService(Options{}) - svc, err := factory(c) - if err != nil { - defaultErr = err - return - } - defaultService.Store(svc.(*Service)) - }) - return defaultErr -} - -// --- Global convenience functions --- - -// Start spawns a new process using the default service. -func Start(ctx context.Context, command string, args ...string) core.Result { - svc := Default() - if svc == nil { - return core.Result{OK: false} - } - return svc.Start(ctx, command, args...) -} - -// Run executes a command and waits for completion using the default service. -func Run(ctx context.Context, command string, args ...string) core.Result { - svc := Default() - if svc == nil { - return core.Result{Value: "", OK: false} - } - return svc.Run(ctx, command, args...) -} - -// Get returns a process by ID from the default service. -func Get(id string) (*Process, error) { - svc := Default() - if svc == nil { - return nil, ErrServiceNotInitialized - } - return svc.Get(id) -} - -// List returns all processes from the default service. -func List() []*Process { - svc := Default() - if svc == nil { - return nil - } - return svc.List() -} - -// Kill terminates a process by ID using the default service. -func Kill(id string) error { - svc := Default() - if svc == nil { - return ErrServiceNotInitialized - } - return svc.Kill(id) -} - -// StartWithOptions spawns a process with full configuration using the default service. -func StartWithOptions(ctx context.Context, opts RunOptions) core.Result { - svc := Default() - if svc == nil { - return core.Result{OK: false} - } - return svc.StartWithOptions(ctx, opts) -} - -// RunWithOptions executes a command with options and waits using the default service. -func RunWithOptions(ctx context.Context, opts RunOptions) core.Result { - svc := Default() - if svc == nil { - return core.Result{Value: "", OK: false} - } - return svc.RunWithOptions(ctx, opts) -} - -// Running returns all currently running processes from the default service. -func Running() []*Process { - svc := Default() - if svc == nil { - return nil - } - return svc.Running() -} - -// Errors -var ( - // ErrServiceNotInitialized is returned when the service is not initialized. - ErrServiceNotInitialized = coreerr.E("", "process: service not initialized; call process.Init(core) first", nil) - // ErrSetDefaultNil is returned when SetDefault is called with nil. - ErrSetDefaultNil = coreerr.E("", "process: SetDefault called with nil service", nil) -) diff --git a/program.go b/program.go index 97ce842..ab40876 100644 --- a/program.go +++ b/program.go @@ -5,12 +5,12 @@ import ( "context" "strconv" - coreerr "dappco.re/go/core/log" + "dappco.re/go/core" ) // ErrProgramNotFound is returned when Find cannot locate the binary on PATH. // Callers may use core.Is to detect this condition. -var ErrProgramNotFound = coreerr.E("", "program: binary not found in PATH", nil) +var ErrProgramNotFound = core.E("", "program: binary not found in PATH", nil) // Program represents a named executable located on the system PATH. // Create one with a Name, call Find to resolve its path, then Run or RunDir. @@ -30,11 +30,11 @@ type Program struct { // err := p.Find() func (p *Program) Find() error { if p.Name == "" { - return coreerr.E("Program.Find", "program name is empty", nil) + return core.E("program.find", "program name is empty", nil) } path, err := execLookPath(p.Name) if err != nil { - return coreerr.E("Program.Find", strconv.Quote(p.Name)+": not found in PATH", ErrProgramNotFound) + return core.E("program.find", core.Concat(strconv.Quote(p.Name), ": not found in PATH"), ErrProgramNotFound) } p.Path = path return nil @@ -68,7 +68,7 @@ func (p *Program) RunDir(ctx context.Context, dir string, args ...string) (strin } if err := cmd.Run(); err != nil { - return string(bytes.TrimSpace(out.Bytes())), coreerr.E("Program.RunDir", strconv.Quote(p.Name)+": command failed", err) + return string(bytes.TrimSpace(out.Bytes())), core.E("program.run", core.Concat(strconv.Quote(p.Name), ": command failed"), err) } return string(bytes.TrimSpace(out.Bytes())), nil } diff --git a/registry.go b/registry.go index 8740a89..e5f96e0 100644 --- a/registry.go +++ b/registry.go @@ -8,7 +8,6 @@ import ( "dappco.re/go/core" coreio "dappco.re/go/core/io" - coreerr "dappco.re/go/core/log" ) // DaemonEntry records a running daemon in the registry. @@ -58,16 +57,16 @@ func (r *Registry) Register(entry DaemonEntry) error { } if err := coreio.Local.EnsureDir(r.dir); err != nil { - return coreerr.E("Registry.Register", "failed to create registry directory", err) + return core.E("registry.register", "failed to create registry directory", err) } data, err := marshalDaemonEntry(entry) if err != nil { - return coreerr.E("Registry.Register", "failed to marshal entry", err) + return core.E("registry.register", "failed to marshal entry", err) } if err := coreio.Local.Write(r.entryPath(entry.Code, entry.Daemon), data); err != nil { - return coreerr.E("Registry.Register", "failed to write entry file", err) + return core.E("registry.register", "failed to write entry file", err) } return nil } @@ -75,7 +74,7 @@ func (r *Registry) Register(entry DaemonEntry) error { // Unregister removes a daemon entry from the registry. func (r *Registry) Unregister(code, daemon string) error { if err := coreio.Local.Delete(r.entryPath(code, daemon)); err != nil { - return coreerr.E("Registry.Unregister", "failed to delete entry file", err) + return core.E("registry.unregister", "failed to delete entry file", err) } return nil } @@ -112,7 +111,7 @@ func (r *Registry) List() ([]DaemonEntry, error) { entries, err := coreio.Local.List(r.dir) if err != nil { - return nil, coreerr.E("Registry.List", "failed to list registry directory", err) + return nil, core.E("registry.list", "failed to list registry directory", err) } var alive []DaemonEntry diff --git a/runner.go b/runner.go index 80fec17..d6cb443 100644 --- a/runner.go +++ b/runner.go @@ -5,7 +5,7 @@ import ( "sync" "time" - coreerr "dappco.re/go/core/log" + "dappco.re/go/core" ) // Runner orchestrates multiple processes with dependencies. @@ -105,7 +105,7 @@ func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, er Name: name, Spec: remaining[name], ExitCode: 1, - Error: coreerr.E("Runner.RunAll", "circular dependency or missing dependency", nil), + Error: core.E("runner.run_all", "circular dependency or missing dependency", nil), }) } break @@ -137,7 +137,7 @@ func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, er Name: spec.Name, Spec: spec, Skipped: true, - Error: coreerr.E("Runner.RunAll", "skipped due to dependency failure", nil), + Error: core.E("runner.run_all", "skipped due to dependency failure", nil), } } else { result = r.runSpec(ctx, spec) @@ -200,10 +200,15 @@ func (r *Runner) runSpec(ctx context.Context, spec RunSpec) RunResult { Env: spec.Env, }) if !sr.OK { + err, _ := sr.Value.(error) + if err == nil { + err = core.E("runner.run_spec", core.Concat("failed to start: ", spec.Name), nil) + } return RunResult{ Name: spec.Name, Spec: spec, Duration: time.Since(start), + Error: err, } } diff --git a/runner_test.go b/runner_test.go index efdde44..0afa3ba 100644 --- a/runner_test.go +++ b/runner_test.go @@ -13,11 +13,9 @@ func newTestRunner(t *testing.T) *Runner { t.Helper() c := framework.New() - factory := NewService(Options{}) - raw, err := factory(c) - require.NoError(t, err) - - return NewRunner(raw.(*Service)) + r := Register(c) + require.True(t, r.OK) + return NewRunner(r.Value.(*Service)) } func TestRunner_RunSequential_Good(t *testing.T) { diff --git a/service.go b/service.go index 1b66c33..619e225 100644 --- a/service.go +++ b/service.go @@ -5,14 +5,11 @@ import ( "context" "os" "os/exec" - "strconv" "sync" - "sync/atomic" "syscall" "time" "dappco.re/go/core" - coreerr "dappco.re/go/core/log" ) type execCmd = exec.Cmd @@ -26,19 +23,17 @@ const DefaultBufferSize = 1024 * 1024 // Errors var ( - ErrProcessNotFound = coreerr.E("", "process not found", nil) - ErrProcessNotRunning = coreerr.E("", "process is not running", nil) - ErrStdinNotAvailable = coreerr.E("", "stdin not available", nil) + ErrProcessNotFound = core.E("", "process not found", nil) + ErrProcessNotRunning = core.E("", "process is not running", nil) + ErrStdinNotAvailable = core.E("", "stdin not available", nil) ) // Service manages process execution with Core IPC integration. type Service struct { *core.ServiceRuntime[Options] - processes map[string]*Process - mu sync.RWMutex - bufSize int - idCounter atomic.Uint64 + managed *core.Registry[*ManagedProcess] + bufSize int } // Options configures the process service. @@ -53,40 +48,23 @@ type Options struct { // c := core.New() // svc := process.Register(c).Value.(*process.Service) func Register(c *core.Core) core.Result { + opts := Options{BufferSize: DefaultBufferSize} svc := &Service{ - ServiceRuntime: core.NewServiceRuntime(c, Options{BufferSize: DefaultBufferSize}), - processes: make(map[string]*Process), - bufSize: DefaultBufferSize, + ServiceRuntime: core.NewServiceRuntime(c, opts), + managed: core.NewRegistry[*ManagedProcess](), + bufSize: opts.BufferSize, } return core.Result{Value: svc, OK: true} } -// NewService creates a process service factory for Core registration. -// Deprecated: Use Register(c) to construct a Service directly. -// -// c := core.New() -// factory := process.NewService(process.Options{}) -// raw, err := factory(c) -// if err != nil { -// return nil, err -// } -// svc := raw.(*process.Service) -func NewService(opts Options) func(*core.Core) (any, error) { - return func(c *core.Core) (any, error) { - if opts.BufferSize == 0 { - opts.BufferSize = DefaultBufferSize - } - svc := &Service{ - ServiceRuntime: core.NewServiceRuntime(c, opts), - processes: make(map[string]*Process), - bufSize: opts.BufferSize, - } - return svc, nil - } -} - // OnStartup implements core.Startable. func (s *Service) OnStartup(ctx context.Context) core.Result { + c := s.Core() + c.Action("process.run", s.handleRun) + c.Action("process.start", s.handleStart) + c.Action("process.kill", s.handleKill) + c.Action("process.list", s.handleList) + c.Action("process.get", s.handleGet) return core.Result{OK: true} } @@ -94,19 +72,9 @@ func (s *Service) OnStartup(ctx context.Context) core.Result { // // c.ServiceShutdown(ctx) // calls OnShutdown on all Stoppable services func (s *Service) OnShutdown(ctx context.Context) core.Result { - s.mu.RLock() - procs := make([]*Process, 0, len(s.processes)) - for _, p := range s.processes { - if p.IsRunning() { - procs = append(procs, p) - } - } - s.mu.RUnlock() - - for _, p := range procs { - _ = p.Shutdown() - } - + s.managed.Each(func(_ string, proc *ManagedProcess) { + _ = proc.Kill() + }) return core.Result{OK: true} } @@ -126,7 +94,14 @@ func (s *Service) Start(ctx context.Context, command string, args ...string) cor // r := svc.StartWithOptions(ctx, process.RunOptions{Command: "go", Args: []string{"test", "./..."}}) // if r.OK { proc := r.Value.(*Process) } func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Result { - id := s.nextProcessID() + if opts.Command == "" { + return core.Result{Value: core.E("process.start", "command is required", nil), OK: false} + } + if ctx == nil { + ctx = context.Background() + } + + id := core.ID() // Detached processes use Background context so they survive parent death parentCtx := ctx @@ -152,19 +127,19 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re stdout, err := cmd.StdoutPipe() if err != nil { cancel() - return core.Result{OK: false} + return core.Result{Value: core.E("process.start", core.Concat("stdout pipe failed: ", opts.Command), err), OK: false} } stderr, err := cmd.StderrPipe() if err != nil { cancel() - return core.Result{OK: false} + return core.Result{Value: core.E("process.start", core.Concat("stderr pipe failed: ", opts.Command), err), OK: false} } stdin, err := cmd.StdinPipe() if err != nil { cancel() - return core.Result{OK: false} + return core.Result{Value: core.E("process.start", core.Concat("stdin pipe failed: ", opts.Command), err), OK: false} } // Create output buffer (enabled by default) @@ -173,7 +148,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re output = NewRingBuffer(s.bufSize) } - proc := &Process{ + proc := &ManagedProcess{ ID: id, Command: opts.Command, Args: opts.Args, @@ -194,13 +169,15 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re // Start the process if err := cmd.Start(); err != nil { cancel() - return core.Result{OK: false} + return core.Result{Value: core.E("process.start", core.Concat("command failed: ", opts.Command), err), OK: false} } // Store process - s.mu.Lock() - s.processes[id] = proc - s.mu.Unlock() + if r := s.managed.Set(id, proc); !r.OK { + cancel() + _ = cmd.Process.Kill() + return r + } // Start timeout watchdog if configured if opts.Timeout > 0 { @@ -273,7 +250,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re } // streamOutput reads from a pipe and broadcasts lines via ACTION. -func (s *Service) streamOutput(proc *Process, r streamReader, stream Stream) { +func (s *Service) streamOutput(proc *ManagedProcess, r streamReader, stream Stream) { scanner := bufio.NewScanner(r) // Increase buffer for long lines scanner.Buffer(make([]byte, 64*1024), 1024*1024) @@ -296,40 +273,31 @@ func (s *Service) streamOutput(proc *Process, r streamReader, stream Stream) { } // Get returns a process by ID. -func (s *Service) Get(id string) (*Process, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - proc, ok := s.processes[id] - if !ok { +func (s *Service) Get(id string) (*ManagedProcess, error) { + r := s.managed.Get(id) + if !r.OK { return nil, ErrProcessNotFound } - return proc, nil + return r.Value.(*ManagedProcess), nil } // List returns all processes. -func (s *Service) List() []*Process { - s.mu.RLock() - defer s.mu.RUnlock() - - result := make([]*Process, 0, len(s.processes)) - for _, p := range s.processes { - result = append(result, p) - } +func (s *Service) List() []*ManagedProcess { + result := make([]*ManagedProcess, 0, s.managed.Len()) + s.managed.Each(func(_ string, proc *ManagedProcess) { + result = append(result, proc) + }) return result } // Running returns all currently running processes. -func (s *Service) Running() []*Process { - s.mu.RLock() - defer s.mu.RUnlock() - - var result []*Process - for _, p := range s.processes { - if p.IsRunning() { - result = append(result, p) +func (s *Service) Running() []*ManagedProcess { + result := make([]*ManagedProcess, 0, s.managed.Len()) + s.managed.Each(func(_ string, proc *ManagedProcess) { + if proc.IsRunning() { + result = append(result, proc) } - } + }) return result } @@ -354,31 +322,30 @@ func (s *Service) Kill(id string) error { // Remove removes a completed process from the list. func (s *Service) Remove(id string) error { - s.mu.Lock() - defer s.mu.Unlock() - - proc, ok := s.processes[id] - if !ok { + proc, err := s.Get(id) + if err != nil { + return err + } + if proc.IsRunning() { + return core.E("process.remove", core.Concat("cannot remove running process: ", id), nil) + } + r := s.managed.Delete(id) + if !r.OK { return ErrProcessNotFound } - - if proc.IsRunning() { - return coreerr.E("Service.Remove", "cannot remove running process", nil) - } - - delete(s.processes, id) return nil } // Clear removes all completed processes. func (s *Service) Clear() { - s.mu.Lock() - defer s.mu.Unlock() - - for id, p := range s.processes { - if !p.IsRunning() { - delete(s.processes, id) + ids := make([]string, 0) + s.managed.Each(func(id string, proc *ManagedProcess) { + if !proc.IsRunning() { + ids = append(ids, id) } + }) + for _, id := range ids { + s.managed.Delete(id) } } @@ -397,15 +364,10 @@ func (s *Service) Output(id string) (string, error) { // r := svc.Run(ctx, "go", "test", "./...") // output := r.Value.(string) func (s *Service) Run(ctx context.Context, command string, args ...string) core.Result { - r := s.Start(ctx, command, args...) - if !r.OK { - return core.Result{Value: "", OK: false} - } - - proc := r.Value.(*Process) - <-proc.Done() - - return core.Result{Value: proc.Output(), OK: proc.ExitCode == 0} + return s.RunWithOptions(ctx, RunOptions{ + Command: command, + Args: args, + }) } // RunWithOptions executes a command with options and waits for completion. @@ -414,15 +376,7 @@ func (s *Service) Run(ctx context.Context, command string, args ...string) core. // r := svc.RunWithOptions(ctx, process.RunOptions{Command: "go", Args: []string{"test"}}) // output := r.Value.(string) func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) core.Result { - r := s.StartWithOptions(ctx, opts) - if !r.OK { - return core.Result{Value: "", OK: false} - } - - proc := r.Value.(*Process) - <-proc.Done() - - return core.Result{Value: proc.Output(), OK: proc.ExitCode == 0} + return s.runCommand(ctx, opts) } // --- Internal Request Helpers --- @@ -430,7 +384,7 @@ func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) core.Resu func (s *Service) handleRun(ctx context.Context, opts core.Options) core.Result { command := opts.String("command") if command == "" { - return core.Result{Value: coreerr.E("process.run", "command is required", nil), OK: false} + return core.Result{Value: core.E("process.run", "command is required", nil), OK: false} } runOpts := RunOptions{ @@ -448,56 +402,94 @@ func (s *Service) handleRun(ctx context.Context, opts core.Options) core.Result } } - return s.RunWithOptions(ctx, runOpts) + return s.runCommand(ctx, runOpts) } func (s *Service) handleStart(ctx context.Context, opts core.Options) core.Result { command := opts.String("command") if command == "" { - return core.Result{Value: coreerr.E("process.start", "command is required", nil), OK: false} + return core.Result{Value: core.E("process.start", "command is required", nil), OK: false} } runOpts := RunOptions{ Command: command, Dir: opts.String("dir"), + Detach: true, } if r := opts.Get("args"); r.OK { if args, ok := r.Value.([]string); ok { runOpts.Args = args } } + if r := opts.Get("env"); r.OK { + if env, ok := r.Value.([]string); ok { + runOpts.Env = env + } + } r := s.StartWithOptions(ctx, runOpts) if !r.OK { return r } - return core.Result{Value: r.Value.(*Process).ID, OK: true} + return core.Result{Value: r.Value.(*ManagedProcess).ID, OK: true} } func (s *Service) handleKill(ctx context.Context, opts core.Options) core.Result { id := opts.String("id") if id != "" { if err := s.Kill(id); err != nil { + if core.Is(err, ErrProcessNotFound) { + return core.Result{Value: core.E("process.kill", core.Concat("not found: ", id), nil), OK: false} + } return core.Result{Value: err, OK: false} } return core.Result{OK: true} } - return core.Result{Value: coreerr.E("process.kill", "id is required", nil), OK: false} + + pid := opts.Int("pid") + if pid > 0 { + proc, err := processHandle(pid) + if err != nil { + return core.Result{Value: core.E("process.kill", core.Concat("find pid failed: ", core.Sprintf("%d", pid)), err), OK: false} + } + if err := proc.Signal(syscall.SIGTERM); err != nil { + return core.Result{Value: core.E("process.kill", core.Concat("signal failed: ", core.Sprintf("%d", pid)), err), OK: false} + } + return core.Result{OK: true} + } + + return core.Result{Value: core.E("process.kill", "need id or pid", nil), OK: false} } func (s *Service) handleList(ctx context.Context, opts core.Options) core.Result { - s.mu.RLock() - defer s.mu.RUnlock() + return core.Result{Value: s.managed.Names(), OK: true} +} - ids := make([]string, 0, len(s.processes)) - for id := range s.processes { - ids = append(ids, id) +func (s *Service) runCommand(ctx context.Context, opts RunOptions) core.Result { + if opts.Command == "" { + return core.Result{Value: core.E("process.run", "command is required", nil), OK: false} } - return core.Result{Value: ids, OK: true} + if ctx == nil { + ctx = context.Background() + } + + cmd := execCommandContext(ctx, opts.Command, opts.Args...) + if opts.Dir != "" { + cmd.Dir = opts.Dir + } + if len(opts.Env) > 0 { + cmd.Env = append(cmd.Environ(), opts.Env...) + } + + output, err := cmd.CombinedOutput() + if err != nil { + return core.Result{Value: core.E("process.run", core.Concat("command failed: ", opts.Command), err), OK: false} + } + return core.Result{Value: string(output), OK: true} } // Signal sends a signal to the process. -func (p *Process) Signal(sig os.Signal) error { +func (p *ManagedProcess) Signal(sig os.Signal) error { p.mu.Lock() defer p.mu.Unlock() @@ -542,13 +534,12 @@ func isNotExist(err error) bool { func (s *Service) handleGet(ctx context.Context, opts core.Options) core.Result { id := opts.String("id") + if id == "" { + return core.Result{Value: core.E("process.get", "id is required", nil), OK: false} + } proc, err := s.Get(id) if err != nil { - return core.Result{Value: err, OK: false} + return core.Result{Value: core.E("process.get", core.Concat("not found: ", id), err), OK: false} } return core.Result{Value: proc.Info(), OK: true} } - -func (s *Service) nextProcessID() string { - return "proc-" + strconv.FormatUint(s.idCounter.Add(1), 10) -} diff --git a/service_test.go b/service_test.go index 7bd12b8..d6a2ece 100644 --- a/service_test.go +++ b/service_test.go @@ -15,12 +15,183 @@ func newTestService(t *testing.T) (*Service, *framework.Core) { t.Helper() c := framework.New() - factory := NewService(Options{BufferSize: 1024}) - raw, err := factory(c) + r := Register(c) + require.True(t, r.OK) + return r.Value.(*Service), c +} + +func newStartedTestService(t *testing.T) (*Service, *framework.Core) { + t.Helper() + + svc, c := newTestService(t) + r := svc.OnStartup(context.Background()) + require.True(t, r.OK) + return svc, c +} + +func TestService_Register_Good(t *testing.T) { + c := framework.New(framework.WithService(Register)) + + svc, ok := framework.ServiceFor[*Service](c, "process") + require.True(t, ok) + assert.NotNil(t, svc) +} + +func TestService_OnStartup_Good(t *testing.T) { + svc, c := newTestService(t) + + r := svc.OnStartup(context.Background()) + require.True(t, r.OK) + + assert.True(t, c.Action("process.run").Exists()) + assert.True(t, c.Action("process.start").Exists()) + assert.True(t, c.Action("process.kill").Exists()) + assert.True(t, c.Action("process.list").Exists()) + assert.True(t, c.Action("process.get").Exists()) +} + +func TestService_HandleRun_Good(t *testing.T) { + _, c := newStartedTestService(t) + + r := c.Action("process.run").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "command", Value: "echo"}, + framework.Option{Key: "args", Value: []string{"hello"}}, + )) + require.True(t, r.OK) + assert.Contains(t, r.Value.(string), "hello") +} + +func TestService_HandleRun_Bad(t *testing.T) { + _, c := newStartedTestService(t) + + r := c.Action("process.run").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "command", Value: "nonexistent_command_xyz"}, + )) + assert.False(t, r.OK) +} + +func TestService_HandleRun_Ugly(t *testing.T) { + _, c := newStartedTestService(t) + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + r := c.Action("process.run").Run(ctx, framework.NewOptions( + framework.Option{Key: "command", Value: "sleep"}, + framework.Option{Key: "args", Value: []string{"1"}}, + )) + assert.False(t, r.OK) +} + +func TestService_HandleStart_Good(t *testing.T) { + svc, c := newStartedTestService(t) + + r := c.Action("process.start").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "command", Value: "sleep"}, + framework.Option{Key: "args", Value: []string{"60"}}, + )) + require.True(t, r.OK) + + id := r.Value.(string) + proc, err := svc.Get(id) + require.NoError(t, err) + assert.True(t, proc.IsRunning()) + + kill := c.Action("process.kill").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "id", Value: id}, + )) + require.True(t, kill.OK) + <-proc.Done() +} + +func TestService_HandleStart_Bad(t *testing.T) { + _, c := newStartedTestService(t) + + r := c.Action("process.start").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "command", Value: "nonexistent_command_xyz"}, + )) + assert.False(t, r.OK) +} + +func TestService_HandleKill_Good(t *testing.T) { + svc, c := newStartedTestService(t) + + start := c.Action("process.start").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "command", Value: "sleep"}, + framework.Option{Key: "args", Value: []string{"60"}}, + )) + require.True(t, start.OK) + + id := start.Value.(string) + proc, err := svc.Get(id) require.NoError(t, err) - svc := raw.(*Service) - return svc, c + kill := c.Action("process.kill").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "id", Value: id}, + )) + require.True(t, kill.OK) + + select { + case <-proc.Done(): + case <-time.After(2 * time.Second): + t.Fatal("process should have been killed") + } +} + +func TestService_HandleKill_Bad(t *testing.T) { + _, c := newStartedTestService(t) + + r := c.Action("process.kill").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "id", Value: "missing"}, + )) + assert.False(t, r.OK) +} + +func TestService_HandleList_Good(t *testing.T) { + svc, c := newStartedTestService(t) + + startOne := c.Action("process.start").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "command", Value: "sleep"}, + framework.Option{Key: "args", Value: []string{"60"}}, + )) + require.True(t, startOne.OK) + startTwo := c.Action("process.start").Run(context.Background(), framework.NewOptions( + framework.Option{Key: "command", Value: "sleep"}, + framework.Option{Key: "args", Value: []string{"60"}}, + )) + require.True(t, startTwo.OK) + + r := c.Action("process.list").Run(context.Background(), framework.NewOptions()) + require.True(t, r.OK) + + ids := r.Value.([]string) + assert.Len(t, ids, 2) + + for _, id := range ids { + proc, err := svc.Get(id) + require.NoError(t, err) + _ = proc.Kill() + <-proc.Done() + } +} + +func TestService_Ugly_PermissionModel(t *testing.T) { + c := framework.New() + + r := c.Process().Run(context.Background(), "echo", "blocked") + assert.False(t, r.OK) + + c = framework.New(framework.WithService(Register)) + startup := c.ServiceStartup(context.Background(), nil) + require.True(t, startup.OK) + defer func() { + shutdown := c.ServiceShutdown(context.Background()) + assert.True(t, shutdown.OK) + }() + + r = c.Process().Run(context.Background(), "echo", "allowed") + require.True(t, r.OK) + assert.Contains(t, r.Value.(string), "allowed") } func startProc(t *testing.T, svc *Service, ctx context.Context, command string, args ...string) *Process { @@ -171,12 +342,7 @@ func TestService_Run_Good(t *testing.T) { func TestService_Actions_Good(t *testing.T) { t.Run("broadcasts events", func(t *testing.T) { - c := framework.New() - - factory := NewService(Options{}) - raw, err := factory(c) - require.NoError(t, err) - svc := raw.(*Service) + svc, c := newTestService(t) var started []ActionProcessStarted var outputs []ActionProcessOutput @@ -381,14 +547,6 @@ func TestService_OnShutdown_Good(t *testing.T) { }) } -func TestService_OnStartup_Good(t *testing.T) { - t.Run("returns ok", func(t *testing.T) { - svc, _ := newTestService(t) - r := svc.OnStartup(context.Background()) - assert.True(t, r.OK) - }) -} - func TestService_RunWithOptions_Good(t *testing.T) { t.Run("returns output on success", func(t *testing.T) { svc, _ := newTestService(t) diff --git a/types.go b/types.go index 0fe5651..822d858 100644 --- a/types.go +++ b/types.go @@ -5,16 +5,11 @@ // // # Getting Started // -// c := core.New() -// factory := process.NewService(process.Options{}) -// raw, err := factory(c) -// if err != nil { -// return err -// } +// c := core.New(core.WithService(process.Register)) +// _ = c.ServiceStartup(ctx, nil) // -// svc := raw.(*process.Service) -// r := svc.Start(ctx, "go", "test", "./...") -// proc := r.Value.(*process.Process) +// r := c.Process().Run(ctx, "go", "test", "./...") +// output := r.Value.(string) // // # Listening for Events // @@ -90,15 +85,19 @@ type RunOptions struct { KillGroup bool } -// Info provides a snapshot of process state without internal fields. -type Info struct { +// ProcessInfo provides a snapshot of process state without internal fields. +type ProcessInfo struct { ID string `json:"id"` Command string `json:"command"` Args []string `json:"args"` Dir string `json:"dir"` StartedAt time.Time `json:"startedAt"` + Running bool `json:"running"` Status Status `json:"status"` ExitCode int `json:"exitCode"` Duration time.Duration `json:"duration"` PID int `json:"pid"` } + +// Info is kept as a compatibility alias for ProcessInfo. +type Info = ProcessInfo