diff --git a/CLAUDE.md b/CLAUDE.md index c134314..75db9f9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -46,13 +46,19 @@ Builder-pattern wrapper around `os/exec` with structured logging via a pluggable ## Key Patterns - **Core integration**: `Service` embeds `*core.ServiceRuntime[Options]` and uses `s.Core().ACTION(...)` to broadcast typed action messages. Tests create a Core instance via `framework.New(framework.WithName("process", NewService(...)))`. -- **Output capture**: All process output goes through a fixed-size `RingBuffer` (default 1MB). Oldest data is silently overwritten when full. +- **Output capture**: All process output goes through a fixed-size `RingBuffer` (default 1MB). Oldest data is silently overwritten when full. Set `RunOptions.DisableCapture` to skip buffering for long-running processes where output is only streamed via IPC. - **Process lifecycle**: Status transitions are `StatusPending → StatusRunning → StatusExited|StatusFailed|StatusKilled`. The `done` channel closes on exit; use `<-proc.Done()` or `proc.Wait()`. +- **Detach / process group isolation**: Set `RunOptions.Detach = true` to run the subprocess in its own process group (`Setpgid`). Detached processes use `context.Background()` so they survive parent context cancellation and parent death. +- **Graceful shutdown**: `Service.OnShutdown` kills all running processes. `Daemon.Stop()` performs ordered teardown: sets health to not-ready → shuts down health server → releases PID file → unregisters from registry. `DaemonOptions.ShutdownTimeout` (default 30 s) bounds the shutdown context. +- **Auto-registration**: Pass a `Registry` and `RegistryEntry` in `DaemonOptions` to automatically register the daemon on `Start()` and unregister on `Stop()`. - **PID liveness checks**: Both `PIDFile` and `Registry` use `proc.Signal(syscall.Signal(0))` to check if a PID is alive before trusting stored state. +- **Error handling**: All errors MUST use `coreerr.E()` from `go-log` (imported as `coreerr`), never `fmt.Errorf` or `errors.New`. Sentinel errors are package-level vars created with `coreerr.E("", "message", nil)`. ## Dependencies - `forge.lthn.ai/core/go/pkg/core` — Core DI framework, IPC actions, `ServiceRuntime` +- `forge.lthn.ai/core/go-log` — Structured error constructor (`coreerr.E()`) +- `forge.lthn.ai/core/go-io` — Filesystem abstraction (`coreio.Local`) used by PIDFile and Registry - `github.com/stretchr/testify` — test assertions (require/assert) ## Testing diff --git a/daemon_test.go b/daemon_test.go index 6d958ff..4e641d4 100644 --- a/daemon_test.go +++ b/daemon_test.go @@ -93,6 +93,47 @@ func TestDaemon_DefaultShutdownTimeout(t *testing.T) { assert.Equal(t, 30*time.Second, d.opts.ShutdownTimeout) } +func TestDaemon_RunBlocksUntilCancelled(t *testing.T) { + d := NewDaemon(DaemonOptions{ + HealthAddr: "127.0.0.1:0", + }) + + err := d.Start() + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + + done := make(chan error, 1) + go func() { + done <- d.Run(ctx) + }() + + // Run should be blocking + select { + case <-done: + t.Fatal("Run should block until context is cancelled") + case <-time.After(50 * time.Millisecond): + // Expected — still blocking + } + + cancel() + + select { + case err := <-done: + assert.NoError(t, err) + case <-time.After(2 * time.Second): + t.Fatal("Run should return after context cancellation") + } +} + +func TestDaemon_StopIdempotent(t *testing.T) { + d := NewDaemon(DaemonOptions{}) + + // Stop without Start should be a no-op + err := d.Stop() + assert.NoError(t, err) +} + func TestDaemon_AutoRegisters(t *testing.T) { dir := t.TempDir() reg := NewRegistry(filepath.Join(dir, "daemons")) diff --git a/exec/exec_test.go b/exec/exec_test.go index 6984e9a..a636c51 100644 --- a/exec/exec_test.go +++ b/exec/exec_test.go @@ -146,3 +146,67 @@ func TestCommand_UsesDefaultLogger(t *testing.T) { t.Errorf("expected default logger to receive 1 debug call, got %d", len(logger.debugCalls)) } } + +func TestCommand_WithDir(t *testing.T) { + ctx := context.Background() + out, err := exec.Command(ctx, "pwd"). + WithDir("/tmp"). + WithLogger(&mockLogger{}). + Output() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + trimmed := strings.TrimSpace(string(out)) + if trimmed != "/tmp" && trimmed != "/private/tmp" { + t.Errorf("expected /tmp or /private/tmp, got %q", trimmed) + } +} + +func TestCommand_WithEnv(t *testing.T) { + ctx := context.Background() + out, err := exec.Command(ctx, "sh", "-c", "echo $TEST_EXEC_VAR"). + WithEnv([]string{"TEST_EXEC_VAR=exec_val"}). + WithLogger(&mockLogger{}). + Output() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if strings.TrimSpace(string(out)) != "exec_val" { + t.Errorf("expected 'exec_val', got %q", string(out)) + } +} + +func TestCommand_WithStdinStdoutStderr(t *testing.T) { + ctx := context.Background() + input := strings.NewReader("piped input\n") + var stdout, stderr strings.Builder + + err := exec.Command(ctx, "cat"). + WithStdin(input). + WithStdout(&stdout). + WithStderr(&stderr). + WithLogger(&mockLogger{}). + Run() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if strings.TrimSpace(stdout.String()) != "piped input" { + t.Errorf("expected 'piped input', got %q", stdout.String()) + } +} + +func TestRunQuiet_Good(t *testing.T) { + ctx := context.Background() + err := exec.RunQuiet(ctx, "echo", "quiet") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestRunQuiet_Bad(t *testing.T) { + ctx := context.Background() + err := exec.RunQuiet(ctx, "sh", "-c", "echo fail >&2; exit 1") + if err == nil { + t.Fatal("expected error") + } +} diff --git a/pidfile.go b/pidfile.go index 240c44c..8f62843 100644 --- a/pidfile.go +++ b/pidfile.go @@ -60,7 +60,10 @@ func (p *PIDFile) Acquire() error { func (p *PIDFile) Release() error { p.mu.Lock() defer p.mu.Unlock() - return coreio.Local.Delete(p.path) + if err := coreio.Local.Delete(p.path); err != nil { + return coreerr.E("PIDFile.Release", "failed to remove PID file", err) + } + return nil } // Path returns the PID file path. diff --git a/process_global.go b/process_global.go index 162b8d8..c09c6ba 100644 --- a/process_global.go +++ b/process_global.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "forge.lthn.ai/core/go/pkg/core" + coreerr "forge.lthn.ai/core/go-log" ) // Global default service (follows i18n pattern). @@ -25,7 +26,7 @@ func Default() *Service { // Thread-safe: can be called concurrently with Default(). func SetDefault(s *Service) error { if s == nil { - return &ServiceError{msg: "process: SetDefault called with nil service"} + return ErrSetDefaultNil } defaultService.Store(s) return nil @@ -120,15 +121,10 @@ func Running() []*Process { return svc.Running() } -// ErrServiceNotInitialized is returned when the service is not initialized. -var ErrServiceNotInitialized = &ServiceError{msg: "process: service not initialized; call process.Init(core) first"} - -// ServiceError represents a service-level error. -type ServiceError struct { - msg string -} - -// Error returns the service error message. -func (e *ServiceError) Error() string { - return e.msg -} +// Errors +var ( + // ErrServiceNotInitialized is returned when the service is not initialized. + ErrServiceNotInitialized = coreerr.E("", "process: service not initialized; call process.Init(core) first", nil) + // ErrSetDefaultNil is returned when SetDefault is called with nil. + ErrSetDefaultNil = coreerr.E("", "process: SetDefault called with nil service", nil) +) diff --git a/process_test.go b/process_test.go index 8bf7bf7..cafb900 100644 --- a/process_test.go +++ b/process_test.go @@ -2,6 +2,7 @@ package process import ( "context" + "os" "testing" "time" @@ -189,6 +190,39 @@ func TestProcess_SendInput(t *testing.T) { }) } +func TestProcess_Signal(t *testing.T) { + t.Run("sends signal to running process", func(t *testing.T) { + svc, _ := newTestService(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + proc, err := svc.Start(ctx, "sleep", "60") + require.NoError(t, err) + + err = proc.Signal(os.Interrupt) + assert.NoError(t, err) + + select { + case <-proc.Done(): + // Process terminated by signal + case <-time.After(2 * time.Second): + t.Fatal("process should have been terminated by signal") + } + }) + + t.Run("error on completed process", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "echo", "done") + require.NoError(t, err) + <-proc.Done() + + err = proc.Signal(os.Interrupt) + assert.ErrorIs(t, err, ErrProcessNotRunning) + }) +} + func TestProcess_CloseStdin(t *testing.T) { t.Run("closes stdin pipe", func(t *testing.T) { svc, _ := newTestService(t) diff --git a/registry.go b/registry.go index fca2399..347035f 100644 --- a/registry.go +++ b/registry.go @@ -9,6 +9,7 @@ import ( "time" coreio "forge.lthn.ai/core/go-io" + coreerr "forge.lthn.ai/core/go-log" ) // DaemonEntry records a running daemon in the registry. @@ -50,20 +51,26 @@ func (r *Registry) Register(entry DaemonEntry) error { } if err := coreio.Local.EnsureDir(r.dir); err != nil { - return err + return coreerr.E("Registry.Register", "failed to create registry directory", err) } data, err := json.MarshalIndent(entry, "", " ") if err != nil { - return err + return coreerr.E("Registry.Register", "failed to marshal entry", err) } - return coreio.Local.Write(r.entryPath(entry.Code, entry.Daemon), string(data)) + if err := coreio.Local.Write(r.entryPath(entry.Code, entry.Daemon), string(data)); err != nil { + return coreerr.E("Registry.Register", "failed to write entry file", err) + } + return nil } // Unregister removes a daemon entry from the registry. func (r *Registry) Unregister(code, daemon string) error { - return coreio.Local.Delete(r.entryPath(code, daemon)) + if err := coreio.Local.Delete(r.entryPath(code, daemon)); err != nil { + return coreerr.E("Registry.Unregister", "failed to delete entry file", err) + } + return nil } // Get reads a single daemon entry and checks whether its process is alive. diff --git a/service_test.go b/service_test.go index dc8c85b..7627ce3 100644 --- a/service_test.go +++ b/service_test.go @@ -98,6 +98,60 @@ func TestService_Start(t *testing.T) { t.Fatal("process should have been killed") } }) + + t.Run("disable capture", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.StartWithOptions(context.Background(), RunOptions{ + Command: "echo", + Args: []string{"no-capture"}, + DisableCapture: true, + }) + require.NoError(t, err) + <-proc.Done() + + assert.Equal(t, StatusExited, proc.Status) + assert.Equal(t, "", proc.Output(), "output should be empty when capture is disabled") + }) + + t.Run("with environment variables", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.StartWithOptions(context.Background(), RunOptions{ + Command: "sh", + Args: []string{"-c", "echo $MY_TEST_VAR"}, + Env: []string{"MY_TEST_VAR=hello_env"}, + }) + require.NoError(t, err) + <-proc.Done() + + assert.Contains(t, proc.Output(), "hello_env") + }) + + t.Run("detach survives parent context", func(t *testing.T) { + svc, _ := newTestService(t) + + ctx, cancel := context.WithCancel(context.Background()) + + proc, err := svc.StartWithOptions(ctx, RunOptions{ + Command: "echo", + Args: []string{"detached"}, + Detach: true, + }) + require.NoError(t, err) + + // Cancel the parent context + cancel() + + // Detached process should still complete normally + select { + case <-proc.Done(): + assert.Equal(t, StatusExited, proc.Status) + assert.Equal(t, 0, proc.ExitCode) + case <-time.After(2 * time.Second): + t.Fatal("detached process should have completed") + } + }) } func TestService_Run(t *testing.T) { @@ -255,3 +309,139 @@ func TestService_Clear(t *testing.T) { assert.Len(t, svc.List(), 0) }) } + +func TestService_Kill(t *testing.T) { + t.Run("kills running process", func(t *testing.T) { + svc, _ := newTestService(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + proc, err := svc.Start(ctx, "sleep", "60") + require.NoError(t, err) + + err = svc.Kill(proc.ID) + assert.NoError(t, err) + + select { + case <-proc.Done(): + // Process killed successfully + case <-time.After(2 * time.Second): + t.Fatal("process should have been killed") + } + }) + + t.Run("error on unknown id", func(t *testing.T) { + svc, _ := newTestService(t) + + err := svc.Kill("nonexistent") + assert.ErrorIs(t, err, ErrProcessNotFound) + }) +} + +func TestService_Output(t *testing.T) { + t.Run("returns captured output", func(t *testing.T) { + svc, _ := newTestService(t) + + proc, err := svc.Start(context.Background(), "echo", "captured") + require.NoError(t, err) + <-proc.Done() + + output, err := svc.Output(proc.ID) + require.NoError(t, err) + assert.Contains(t, output, "captured") + }) + + t.Run("error on unknown id", func(t *testing.T) { + svc, _ := newTestService(t) + + _, err := svc.Output("nonexistent") + assert.ErrorIs(t, err, ErrProcessNotFound) + }) +} + +func TestService_OnShutdown(t *testing.T) { + t.Run("kills all running processes", func(t *testing.T) { + svc, _ := newTestService(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + proc1, err := svc.Start(ctx, "sleep", "60") + require.NoError(t, err) + proc2, err := svc.Start(ctx, "sleep", "60") + require.NoError(t, err) + + assert.True(t, proc1.IsRunning()) + assert.True(t, proc2.IsRunning()) + + err = svc.OnShutdown(context.Background()) + assert.NoError(t, err) + + select { + case <-proc1.Done(): + case <-time.After(2 * time.Second): + t.Fatal("proc1 should have been killed") + } + select { + case <-proc2.Done(): + case <-time.After(2 * time.Second): + t.Fatal("proc2 should have been killed") + } + }) +} + +func TestService_OnStartup(t *testing.T) { + t.Run("returns nil", func(t *testing.T) { + svc, _ := newTestService(t) + err := svc.OnStartup(context.Background()) + assert.NoError(t, err) + }) +} + +func TestService_RunWithOptions(t *testing.T) { + t.Run("returns output on success", func(t *testing.T) { + svc, _ := newTestService(t) + + output, err := svc.RunWithOptions(context.Background(), RunOptions{ + Command: "echo", + Args: []string{"opts-test"}, + }) + require.NoError(t, err) + assert.Contains(t, output, "opts-test") + }) + + t.Run("returns error on failure", func(t *testing.T) { + svc, _ := newTestService(t) + + _, err := svc.RunWithOptions(context.Background(), RunOptions{ + Command: "sh", + Args: []string{"-c", "exit 2"}, + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "exited with code 2") + }) +} + +func TestService_Running(t *testing.T) { + t.Run("returns only running processes", func(t *testing.T) { + svc, _ := newTestService(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + proc1, err := svc.Start(ctx, "sleep", "60") + require.NoError(t, err) + + proc2, err := svc.Start(context.Background(), "echo", "done") + require.NoError(t, err) + <-proc2.Done() + + running := svc.Running() + assert.Len(t, running, 1) + assert.Equal(t, proc1.ID, running[0].ID) + + cancel() + <-proc1.Done() + }) +}