fix(ax): complete Agent Experience service alignment

This commit is contained in:
Virgil 2026-03-30 00:45:36 +00:00
parent e75cb1fc97
commit 8a85c3cd86
25 changed files with 1141 additions and 688 deletions

View file

@ -4,6 +4,8 @@ import "sync"
// RingBuffer is a fixed-size circular buffer that overwrites old data.
// Thread-safe for concurrent reads and writes.
//
// rb := process.NewRingBuffer(1024)
type RingBuffer struct {
data []byte
size int
@ -14,6 +16,8 @@ type RingBuffer struct {
}
// NewRingBuffer creates a ring buffer with the given capacity.
//
// rb := process.NewRingBuffer(256)
func NewRingBuffer(size int) *RingBuffer {
return &RingBuffer{
data: make([]byte, size),

View file

@ -6,7 +6,7 @@ import (
"github.com/stretchr/testify/assert"
)
func TestRingBuffer(t *testing.T) {
func TestRingBuffer_Basics_Good(t *testing.T) {
t.Run("write and read", func(t *testing.T) {
rb := NewRingBuffer(10)

View file

@ -2,15 +2,15 @@ package process
import (
"context"
"errors"
"os"
"sync"
"time"
coreerr "dappco.re/go/core/log"
"dappco.re/go/core"
)
// DaemonOptions configures daemon mode execution.
//
// opts := process.DaemonOptions{PIDFile: "/tmp/process.pid", HealthAddr: "127.0.0.1:0"}
type DaemonOptions struct {
// PIDFile path for single-instance enforcement.
// Leave empty to skip PID file management.
@ -37,6 +37,8 @@ type DaemonOptions struct {
}
// Daemon manages daemon lifecycle: PID file, health server, graceful shutdown.
//
// daemon := process.NewDaemon(process.DaemonOptions{HealthAddr: "127.0.0.1:0"})
type Daemon struct {
opts DaemonOptions
pid *PIDFile
@ -46,6 +48,8 @@ type Daemon struct {
}
// NewDaemon creates a daemon runner with the given options.
//
// daemon := process.NewDaemon(process.DaemonOptions{PIDFile: "/tmp/process.pid"})
func NewDaemon(opts DaemonOptions) *Daemon {
if opts.ShutdownTimeout == 0 {
opts.ShutdownTimeout = 30 * time.Second
@ -73,7 +77,7 @@ func (d *Daemon) Start() error {
defer d.mu.Unlock()
if d.running {
return coreerr.E("Daemon.Start", "daemon already running", nil)
return core.E("daemon.start", "daemon already running", nil)
}
if d.pid != nil {
@ -96,12 +100,21 @@ func (d *Daemon) Start() error {
// Auto-register if registry is set
if d.opts.Registry != nil {
entry := d.opts.RegistryEntry
entry.PID = os.Getpid()
entry.PID = currentPID()
if d.health != nil {
entry.Health = d.health.Addr()
}
if err := d.opts.Registry.Register(entry); err != nil {
return coreerr.E("Daemon.Start", "registry", err)
if d.health != nil {
shutdownCtx, cancel := context.WithTimeout(context.Background(), d.opts.ShutdownTimeout)
_ = d.health.Stop(shutdownCtx)
cancel()
}
if d.pid != nil {
_ = d.pid.Release()
}
d.running = false
return core.E("daemon.start", "registry", err)
}
}
@ -113,7 +126,7 @@ func (d *Daemon) Run(ctx context.Context) error {
d.mu.Lock()
if !d.running {
d.mu.Unlock()
return coreerr.E("Daemon.Run", "daemon not started - call Start() first", nil)
return core.E("daemon.run", "daemon not started - call Start() first", nil)
}
d.mu.Unlock()
@ -139,13 +152,13 @@ func (d *Daemon) Stop() error {
if d.health != nil {
d.health.SetReady(false)
if err := d.health.Stop(shutdownCtx); err != nil {
errs = append(errs, coreerr.E("Daemon.Stop", "health server", err))
errs = append(errs, core.E("daemon.stop", "health server", err))
}
}
if d.pid != nil {
if err := d.pid.Release(); err != nil && !os.IsNotExist(err) {
errs = append(errs, coreerr.E("Daemon.Stop", "pid file", err))
if err := d.pid.Release(); err != nil && !isNotExist(err) {
errs = append(errs, core.E("daemon.stop", "pid file", err))
}
}
@ -157,7 +170,7 @@ func (d *Daemon) Stop() error {
d.running = false
if len(errs) > 0 {
return errors.Join(errs...)
return core.ErrorJoin(errs...)
}
return nil
}

View file

@ -4,16 +4,16 @@ import (
"context"
"net/http"
"os"
"path/filepath"
"testing"
"time"
"dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDaemon_StartAndStop(t *testing.T) {
pidPath := filepath.Join(t.TempDir(), "test.pid")
func TestDaemon_Lifecycle_Good(t *testing.T) {
pidPath := core.JoinPath(t.TempDir(), "test.pid")
d := NewDaemon(DaemonOptions{
PIDFile: pidPath,
@ -36,7 +36,7 @@ func TestDaemon_StartAndStop(t *testing.T) {
require.NoError(t, err)
}
func TestDaemon_DoubleStartFails(t *testing.T) {
func TestDaemon_AlreadyRunning_Bad(t *testing.T) {
d := NewDaemon(DaemonOptions{
HealthAddr: "127.0.0.1:0",
})
@ -50,7 +50,7 @@ func TestDaemon_DoubleStartFails(t *testing.T) {
assert.Contains(t, err.Error(), "already running")
}
func TestDaemon_RunWithoutStartFails(t *testing.T) {
func TestDaemon_RunUnstarted_Bad(t *testing.T) {
d := NewDaemon(DaemonOptions{})
ctx, cancel := context.WithCancel(context.Background())
@ -61,7 +61,7 @@ func TestDaemon_RunWithoutStartFails(t *testing.T) {
assert.Contains(t, err.Error(), "not started")
}
func TestDaemon_SetReady(t *testing.T) {
func TestDaemon_SetReady_Good(t *testing.T) {
d := NewDaemon(DaemonOptions{
HealthAddr: "127.0.0.1:0",
})
@ -83,17 +83,17 @@ func TestDaemon_SetReady(t *testing.T) {
_ = resp.Body.Close()
}
func TestDaemon_NoHealthAddrReturnsEmpty(t *testing.T) {
func TestDaemon_HealthAddrDisabled_Good(t *testing.T) {
d := NewDaemon(DaemonOptions{})
assert.Empty(t, d.HealthAddr())
}
func TestDaemon_DefaultShutdownTimeout(t *testing.T) {
func TestDaemon_DefaultTimeout_Good(t *testing.T) {
d := NewDaemon(DaemonOptions{})
assert.Equal(t, 30*time.Second, d.opts.ShutdownTimeout)
}
func TestDaemon_RunBlocksUntilCancelled(t *testing.T) {
func TestDaemon_RunBlocking_Good(t *testing.T) {
d := NewDaemon(DaemonOptions{
HealthAddr: "127.0.0.1:0",
})
@ -126,7 +126,7 @@ func TestDaemon_RunBlocksUntilCancelled(t *testing.T) {
}
}
func TestDaemon_StopIdempotent(t *testing.T) {
func TestDaemon_StopIdempotent_Good(t *testing.T) {
d := NewDaemon(DaemonOptions{})
// Stop without Start should be a no-op
@ -134,9 +134,9 @@ func TestDaemon_StopIdempotent(t *testing.T) {
assert.NoError(t, err)
}
func TestDaemon_AutoRegisters(t *testing.T) {
func TestDaemon_AutoRegister_Good(t *testing.T) {
dir := t.TempDir()
reg := NewRegistry(filepath.Join(dir, "daemons"))
reg := NewRegistry(core.JoinPath(dir, "daemons"))
d := NewDaemon(DaemonOptions{
HealthAddr: "127.0.0.1:0",

View file

@ -3,27 +3,27 @@ package exec
import (
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"
"strings"
coreerr "dappco.re/go/core/log"
"dappco.re/go/core"
)
// Options configuration for command execution
// Options configures command execution.
//
// opts := exec.Options{Dir: "/workspace", Env: []string{"CI=1"}}
type Options struct {
Dir string
Env []string
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
// If true, command will run in background (not implemented in this wrapper yet)
// Background bool
}
// Command wraps os/exec.Command with logging and context
// Command wraps `os/exec.Command` with logging and context.
//
// cmd := exec.Command(ctx, "git", "status").WithDir("/workspace")
func Command(ctx context.Context, name string, args ...string) *Cmd {
return &Cmd{
name: name,
@ -32,7 +32,7 @@ func Command(ctx context.Context, name string, args ...string) *Cmd {
}
}
// Cmd represents a wrapped command
// Cmd represents a wrapped command.
type Cmd struct {
name string
args []string
@ -42,31 +42,31 @@ type Cmd struct {
logger Logger
}
// WithDir sets the working directory
// WithDir sets the working directory.
func (c *Cmd) WithDir(dir string) *Cmd {
c.opts.Dir = dir
return c
}
// WithEnv sets the environment variables
// WithEnv sets the environment variables.
func (c *Cmd) WithEnv(env []string) *Cmd {
c.opts.Env = env
return c
}
// WithStdin sets stdin
// WithStdin sets stdin.
func (c *Cmd) WithStdin(r io.Reader) *Cmd {
c.opts.Stdin = r
return c
}
// WithStdout sets stdout
// WithStdout sets stdout.
func (c *Cmd) WithStdout(w io.Writer) *Cmd {
c.opts.Stdout = w
return c
}
// WithStderr sets stderr
// WithStderr sets stderr.
func (c *Cmd) WithStderr(w io.Writer) *Cmd {
c.opts.Stderr = w
return c
@ -122,16 +122,13 @@ func (c *Cmd) CombinedOutput() ([]byte, error) {
}
func (c *Cmd) prepare() {
if c.ctx != nil {
c.cmd = exec.CommandContext(c.ctx, c.name, c.args...)
} else {
// Should we enforce context? The issue says "Enforce context usage".
// For now, let's allow nil but log a warning if we had a logger?
// Or strictly panic/error?
// Let's fallback to Background for now but maybe strict later.
c.cmd = exec.Command(c.name, c.args...)
ctx := c.ctx
if ctx == nil {
ctx = context.Background()
}
c.cmd = exec.CommandContext(ctx, c.name, c.args...)
c.cmd.Dir = c.opts.Dir
if len(c.opts.Env) > 0 {
c.cmd.Env = append(os.Environ(), c.opts.Env...)
@ -144,22 +141,23 @@ func (c *Cmd) prepare() {
// RunQuiet executes the command suppressing stdout unless there is an error.
// Useful for internal commands.
//
// _ = exec.RunQuiet(ctx, "go", "test", "./...")
func RunQuiet(ctx context.Context, name string, args ...string) error {
var stderr bytes.Buffer
cmd := Command(ctx, name, args...).WithStderr(&stderr)
if err := cmd.Run(); err != nil {
// Include stderr in error message
return coreerr.E("RunQuiet", strings.TrimSpace(stderr.String()), err)
return core.E("RunQuiet", core.Trim(stderr.String()), err)
}
return nil
}
func wrapError(caller string, err error, name string, args []string) error {
cmdStr := name + " " + strings.Join(args, " ")
cmdStr := commandString(name, args)
if exitErr, ok := err.(*exec.ExitError); ok {
return coreerr.E(caller, fmt.Sprintf("command %q failed with exit code %d", cmdStr, exitErr.ExitCode()), err)
return core.E(caller, core.Sprintf("command %q failed with exit code %d", cmdStr, exitErr.ExitCode()), err)
}
return coreerr.E(caller, fmt.Sprintf("failed to execute %q", cmdStr), err)
return core.E(caller, core.Sprintf("failed to execute %q", cmdStr), err)
}
func (c *Cmd) getLogger() Logger {
@ -170,9 +168,17 @@ func (c *Cmd) getLogger() Logger {
}
func (c *Cmd) logDebug(msg string) {
c.getLogger().Debug(msg, "cmd", c.name, "args", strings.Join(c.args, " "))
c.getLogger().Debug(msg, "cmd", c.name, "args", core.Join(" ", c.args...))
}
func (c *Cmd) logError(msg string, err error) {
c.getLogger().Error(msg, "cmd", c.name, "args", strings.Join(c.args, " "), "err", err)
c.getLogger().Error(msg, "cmd", c.name, "args", core.Join(" ", c.args...), "err", err)
}
func commandString(name string, args []string) string {
if len(args) == 0 {
return name
}
parts := append([]string{name}, args...)
return core.Join(" ", parts...)
}

View file

@ -2,9 +2,9 @@ package exec_test
import (
"context"
"strings"
"testing"
"dappco.re/go/core"
"dappco.re/go/core/process/exec"
)
@ -27,7 +27,7 @@ func (m *mockLogger) Error(msg string, keyvals ...any) {
m.errorCalls = append(m.errorCalls, logCall{msg, keyvals})
}
func TestCommand_Run_Good_LogsDebug(t *testing.T) {
func TestCommand_Run_Good(t *testing.T) {
logger := &mockLogger{}
ctx := context.Background()
@ -49,7 +49,7 @@ func TestCommand_Run_Good_LogsDebug(t *testing.T) {
}
}
func TestCommand_Run_Bad_LogsError(t *testing.T) {
func TestCommand_Run_Bad(t *testing.T) {
logger := &mockLogger{}
ctx := context.Background()
@ -71,6 +71,14 @@ func TestCommand_Run_Bad_LogsError(t *testing.T) {
}
}
func TestCommand_Run_WithNilContext_Good(t *testing.T) {
var ctx context.Context
if err := exec.Command(ctx, "echo", "hello").Run(); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestCommand_Output_Good(t *testing.T) {
logger := &mockLogger{}
ctx := context.Background()
@ -81,7 +89,7 @@ func TestCommand_Output_Good(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if strings.TrimSpace(string(out)) != "test" {
if core.Trim(string(out)) != "test" {
t.Errorf("expected 'test', got %q", string(out))
}
if len(logger.debugCalls) != 1 {
@ -99,7 +107,7 @@ func TestCommand_CombinedOutput_Good(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if strings.TrimSpace(string(out)) != "combined" {
if core.Trim(string(out)) != "combined" {
t.Errorf("expected 'combined', got %q", string(out))
}
if len(logger.debugCalls) != 1 {
@ -107,14 +115,14 @@ func TestCommand_CombinedOutput_Good(t *testing.T) {
}
}
func TestNopLogger(t *testing.T) {
func TestNopLogger_Methods_Good(t *testing.T) {
// Verify NopLogger doesn't panic
var nop exec.NopLogger
nop.Debug("msg", "key", "val")
nop.Error("msg", "key", "val")
}
func TestSetDefaultLogger(t *testing.T) {
func TestLogger_SetDefault_Good(t *testing.T) {
original := exec.DefaultLogger()
defer exec.SetDefaultLogger(original)
@ -132,7 +140,7 @@ func TestSetDefaultLogger(t *testing.T) {
}
}
func TestCommand_UsesDefaultLogger(t *testing.T) {
func TestCommand_UsesDefaultLogger_Good(t *testing.T) {
original := exec.DefaultLogger()
defer exec.SetDefaultLogger(original)
@ -147,7 +155,7 @@ func TestCommand_UsesDefaultLogger(t *testing.T) {
}
}
func TestCommand_WithDir(t *testing.T) {
func TestCommand_WithDir_Good(t *testing.T) {
ctx := context.Background()
out, err := exec.Command(ctx, "pwd").
WithDir("/tmp").
@ -156,13 +164,13 @@ func TestCommand_WithDir(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
trimmed := strings.TrimSpace(string(out))
trimmed := core.Trim(string(out))
if trimmed != "/tmp" && trimmed != "/private/tmp" {
t.Errorf("expected /tmp or /private/tmp, got %q", trimmed)
}
}
func TestCommand_WithEnv(t *testing.T) {
func TestCommand_WithEnv_Good(t *testing.T) {
ctx := context.Background()
out, err := exec.Command(ctx, "sh", "-c", "echo $TEST_EXEC_VAR").
WithEnv([]string{"TEST_EXEC_VAR=exec_val"}).
@ -171,31 +179,32 @@ func TestCommand_WithEnv(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if strings.TrimSpace(string(out)) != "exec_val" {
if core.Trim(string(out)) != "exec_val" {
t.Errorf("expected 'exec_val', got %q", string(out))
}
}
func TestCommand_WithStdinStdoutStderr(t *testing.T) {
func TestCommand_WithStdinStdoutStderr_Good(t *testing.T) {
ctx := context.Background()
input := strings.NewReader("piped input\n")
var stdout, stderr strings.Builder
input := core.NewReader("piped input\n")
stdout := core.NewBuilder()
stderr := core.NewBuilder()
err := exec.Command(ctx, "cat").
WithStdin(input).
WithStdout(&stdout).
WithStderr(&stderr).
WithStdout(stdout).
WithStderr(stderr).
WithLogger(&mockLogger{}).
Run()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if strings.TrimSpace(stdout.String()) != "piped input" {
if core.Trim(stdout.String()) != "piped input" {
t.Errorf("expected 'piped input', got %q", stdout.String())
}
}
func TestRunQuiet_Good(t *testing.T) {
func TestRunQuiet_Command_Good(t *testing.T) {
ctx := context.Background()
err := exec.RunQuiet(ctx, "echo", "quiet")
if err != nil {
@ -203,7 +212,7 @@ func TestRunQuiet_Good(t *testing.T) {
}
}
func TestRunQuiet_Bad(t *testing.T) {
func TestRunQuiet_Command_Bad(t *testing.T) {
ctx := context.Background()
err := exec.RunQuiet(ctx, "sh", "-c", "echo fail >&2; exit 1")
if err == nil {

View file

@ -2,6 +2,8 @@ package exec
// Logger interface for command execution logging.
// Compatible with pkg/log.Logger and other structured loggers.
//
// exec.SetDefaultLogger(myLogger)
type Logger interface {
// Debug logs a debug-level message with optional key-value pairs.
Debug(msg string, keyvals ...any)
@ -10,6 +12,8 @@ type Logger interface {
}
// NopLogger is a no-op logger that discards all messages.
//
// var logger exec.NopLogger
type NopLogger struct{}
// Debug discards the message (no-op implementation).
@ -22,6 +26,8 @@ var defaultLogger Logger = NopLogger{}
// SetDefaultLogger sets the package-level default logger.
// Commands without an explicit logger will use this.
//
// exec.SetDefaultLogger(myLogger)
func SetDefaultLogger(l Logger) {
if l == nil {
l = NopLogger{}
@ -30,6 +36,8 @@ func SetDefaultLogger(l Logger) {
}
// DefaultLogger returns the current default logger.
//
// logger := exec.DefaultLogger()
func DefaultLogger() Logger {
return defaultLogger
}

15
go.mod
View file

@ -3,16 +3,16 @@ module dappco.re/go/core/process
go 1.26.0
require (
dappco.re/go/core v0.4.7
dappco.re/go/core/io v0.1.7
dappco.re/go/core/log v0.0.4
dappco.re/go/core/ws v0.2.4
dappco.re/go/core v0.8.0-alpha.1
dappco.re/go/core/io v0.2.0
dappco.re/go/core/ws v0.3.0
forge.lthn.ai/core/api v0.1.5
github.com/gin-gonic/gin v1.12.0
github.com/stretchr/testify v1.11.1
)
require (
dappco.re/go/core/log v0.1.0 // indirect
forge.lthn.ai/core/go-io v0.1.5 // indirect
forge.lthn.ai/core/go-log v0.0.4 // indirect
github.com/99designs/gqlgen v0.17.88 // indirect
@ -108,10 +108,3 @@ require (
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace (
dappco.re/go/core => ../go
dappco.re/go/core/io => ../go-io
dappco.re/go/core/log => ../go-log
dappco.re/go/core/ws => ../go-ws
)

8
go.sum
View file

@ -1,3 +1,11 @@
dappco.re/go/core v0.8.0-alpha.1 h1:gj7+Scv+L63Z7wMxbJYHhaRFkHJo2u4MMPuUSv/Dhtk=
dappco.re/go/core v0.8.0-alpha.1/go.mod h1:f2/tBZ3+3IqDrg2F5F598llv0nmb/4gJVCFzM5geE4A=
dappco.re/go/core/io v0.2.0 h1:zuudgIiTsQQ5ipVt97saWdGLROovbEB/zdVyy9/l+I4=
dappco.re/go/core/io v0.2.0/go.mod h1:1QnQV6X9LNgFKfm8SkOtR9LLaj3bDcsOIeJOOyjbL5E=
dappco.re/go/core/log v0.1.0 h1:pa71Vq2TD2aoEUQWFKwNcaJ3GBY8HbaNGqtE688Unyc=
dappco.re/go/core/log v0.1.0/go.mod h1:Nkqb8gsXhZAO8VLpx7B8i1iAmohhzqA20b9Zr8VUcJs=
dappco.re/go/core/ws v0.3.0 h1:ZxR8y5pfrWvnCHVN7qExXz7fdP5a063uNqyqE0Ab8pQ=
dappco.re/go/core/ws v0.3.0/go.mod h1:aLyXrJnbCOGL0SW9rC1EHAAIS83w3djO374gHIz4Nic=
forge.lthn.ai/core/api v0.1.5 h1:NwZrcOyBjaiz5/cn0n0tnlMUodi8Or6FHMx59C7Kv2o=
forge.lthn.ai/core/api v0.1.5/go.mod h1:PBnaWyOVXSOGy+0x2XAPUFMYJxQ2CNhppia/D06ZPII=
forge.lthn.ai/core/go-io v0.1.5 h1:+XJ1YhaGGFLGtcNbPtVlndTjk+pO0Ydi2hRDj5/cHOM=

View file

@ -2,19 +2,22 @@ package process
import (
"context"
"fmt"
"net"
"net/http"
"sync"
"time"
coreerr "dappco.re/go/core/log"
"dappco.re/go/core"
)
// HealthCheck is a function that returns nil if healthy.
//
// check := process.HealthCheck(func() error { return nil })
type HealthCheck func() error
// HealthServer provides HTTP /health and /ready endpoints for process monitoring.
//
// hs := process.NewHealthServer("127.0.0.1:0")
type HealthServer struct {
addr string
server *http.Server
@ -25,6 +28,8 @@ type HealthServer struct {
}
// NewHealthServer creates a health check server on the given address.
//
// hs := process.NewHealthServer("127.0.0.1:0")
func NewHealthServer(addr string) *HealthServer {
return &HealthServer{
addr: addr,
@ -58,13 +63,13 @@ func (h *HealthServer) Start() error {
for _, check := range checks {
if err := check(); err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = fmt.Fprintf(w, "unhealthy: %v\n", err)
_, _ = w.Write([]byte("unhealthy: " + err.Error() + "\n"))
return
}
}
w.WriteHeader(http.StatusOK)
_, _ = fmt.Fprintln(w, "ok")
_, _ = w.Write([]byte("ok\n"))
})
mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
@ -74,17 +79,17 @@ func (h *HealthServer) Start() error {
if !ready {
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = fmt.Fprintln(w, "not ready")
_, _ = w.Write([]byte("not ready\n"))
return
}
w.WriteHeader(http.StatusOK)
_, _ = fmt.Fprintln(w, "ready")
_, _ = w.Write([]byte("ready\n"))
})
listener, err := net.Listen("tcp", h.addr)
if err != nil {
return coreerr.E("HealthServer.Start", fmt.Sprintf("failed to listen on %s", h.addr), err)
return core.E("health.start", core.Concat("failed to listen on ", h.addr), err)
}
h.listener = listener
@ -115,9 +120,11 @@ func (h *HealthServer) Addr() string {
// WaitForHealth polls a health endpoint until it responds 200 or the timeout
// (in milliseconds) expires. Returns true if healthy, false on timeout.
//
// ok := process.WaitForHealth("127.0.0.1:9000", 2_000)
func WaitForHealth(addr string, timeoutMs int) bool {
deadline := time.Now().Add(time.Duration(timeoutMs) * time.Millisecond)
url := fmt.Sprintf("http://%s/health", addr)
url := core.Concat("http://", addr, "/health")
client := &http.Client{Timeout: 2 * time.Second}

View file

@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require"
)
func TestHealthServer_Endpoints(t *testing.T) {
func TestHealthServer_Endpoints_Good(t *testing.T) {
hs := NewHealthServer("127.0.0.1:0")
err := hs.Start()
require.NoError(t, err)
@ -36,7 +36,7 @@ func TestHealthServer_Endpoints(t *testing.T) {
_ = resp.Body.Close()
}
func TestHealthServer_WithChecks(t *testing.T) {
func TestHealthServer_WithChecks_Good(t *testing.T) {
hs := NewHealthServer("127.0.0.1:0")
healthy := true
@ -66,7 +66,7 @@ func TestHealthServer_WithChecks(t *testing.T) {
_ = resp.Body.Close()
}
func TestWaitForHealth_Reachable(t *testing.T) {
func TestWaitForHealth_Reachable_Good(t *testing.T) {
hs := NewHealthServer("127.0.0.1:0")
require.NoError(t, hs.Start())
defer func() { _ = hs.Stop(context.Background()) }()
@ -75,7 +75,7 @@ func TestWaitForHealth_Reachable(t *testing.T) {
assert.True(t, ok)
}
func TestWaitForHealth_Unreachable(t *testing.T) {
func TestWaitForHealth_Unreachable_Bad(t *testing.T) {
ok := WaitForHealth("127.0.0.1:19999", 500)
assert.False(t, ok)
}

View file

@ -1,16 +1,14 @@
package process
import (
"fmt"
"os"
"path/filepath"
"bytes"
"path"
"strconv"
"strings"
"sync"
"syscall"
"dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
)
// PIDFile manages a process ID file for single-instance enforcement.
@ -31,26 +29,26 @@ func (p *PIDFile) Acquire() error {
defer p.mu.Unlock()
if data, err := coreio.Local.Read(p.path); err == nil {
pid, err := strconv.Atoi(strings.TrimSpace(data))
pid, err := strconv.Atoi(string(bytes.TrimSpace([]byte(data))))
if err == nil && pid > 0 {
if proc, err := os.FindProcess(pid); err == nil {
if proc, err := processHandle(pid); err == nil {
if err := proc.Signal(syscall.Signal(0)); err == nil {
return coreerr.E("PIDFile.Acquire", fmt.Sprintf("another instance is running (PID %d)", pid), nil)
return core.E("pidfile.acquire", core.Concat("another instance is running (PID ", strconv.Itoa(pid), ")"), nil)
}
}
}
_ = coreio.Local.Delete(p.path)
}
if dir := filepath.Dir(p.path); dir != "." {
if dir := path.Dir(p.path); dir != "." {
if err := coreio.Local.EnsureDir(dir); err != nil {
return coreerr.E("PIDFile.Acquire", "failed to create PID directory", err)
return core.E("pidfile.acquire", "failed to create PID directory", err)
}
}
pid := os.Getpid()
pid := currentPID()
if err := coreio.Local.Write(p.path, strconv.Itoa(pid)); err != nil {
return coreerr.E("PIDFile.Acquire", "failed to write PID file", err)
return core.E("pidfile.acquire", "failed to write PID file", err)
}
return nil
@ -61,7 +59,7 @@ func (p *PIDFile) Release() error {
p.mu.Lock()
defer p.mu.Unlock()
if err := coreio.Local.Delete(p.path); err != nil {
return coreerr.E("PIDFile.Release", "failed to remove PID file", err)
return core.E("pidfile.release", "failed to remove PID file", err)
}
return nil
}
@ -80,12 +78,12 @@ func ReadPID(path string) (int, bool) {
return 0, false
}
pid, err := strconv.Atoi(strings.TrimSpace(data))
pid, err := strconv.Atoi(string(bytes.TrimSpace([]byte(data))))
if err != nil || pid <= 0 {
return 0, false
}
proc, err := os.FindProcess(pid)
proc, err := processHandle(pid)
if err != nil {
return pid, false
}

View file

@ -2,15 +2,15 @@ package process
import (
"os"
"path/filepath"
"testing"
"dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPIDFile_AcquireAndRelease(t *testing.T) {
pidPath := filepath.Join(t.TempDir(), "test.pid")
func TestPIDFile_Acquire_Good(t *testing.T) {
pidPath := core.JoinPath(t.TempDir(), "test.pid")
pid := NewPIDFile(pidPath)
err := pid.Acquire()
require.NoError(t, err)
@ -23,8 +23,8 @@ func TestPIDFile_AcquireAndRelease(t *testing.T) {
assert.True(t, os.IsNotExist(err))
}
func TestPIDFile_StalePID(t *testing.T) {
pidPath := filepath.Join(t.TempDir(), "stale.pid")
func TestPIDFile_AcquireStale_Good(t *testing.T) {
pidPath := core.JoinPath(t.TempDir(), "stale.pid")
require.NoError(t, os.WriteFile(pidPath, []byte("999999999"), 0644))
pid := NewPIDFile(pidPath)
err := pid.Acquire()
@ -33,8 +33,8 @@ func TestPIDFile_StalePID(t *testing.T) {
require.NoError(t, err)
}
func TestPIDFile_CreatesParentDirectory(t *testing.T) {
pidPath := filepath.Join(t.TempDir(), "subdir", "nested", "test.pid")
func TestPIDFile_CreateDirectory_Good(t *testing.T) {
pidPath := core.JoinPath(t.TempDir(), "subdir", "nested", "test.pid")
pid := NewPIDFile(pidPath)
err := pid.Acquire()
require.NoError(t, err)
@ -42,27 +42,27 @@ func TestPIDFile_CreatesParentDirectory(t *testing.T) {
require.NoError(t, err)
}
func TestPIDFile_Path(t *testing.T) {
func TestPIDFile_Path_Good(t *testing.T) {
pid := NewPIDFile("/tmp/test.pid")
assert.Equal(t, "/tmp/test.pid", pid.Path())
}
func TestReadPID_Missing(t *testing.T) {
func TestReadPID_Missing_Bad(t *testing.T) {
pid, running := ReadPID("/nonexistent/path.pid")
assert.Equal(t, 0, pid)
assert.False(t, running)
}
func TestReadPID_InvalidContent(t *testing.T) {
path := filepath.Join(t.TempDir(), "bad.pid")
func TestReadPID_Invalid_Bad(t *testing.T) {
path := core.JoinPath(t.TempDir(), "bad.pid")
require.NoError(t, os.WriteFile(path, []byte("notanumber"), 0644))
pid, running := ReadPID(path)
assert.Equal(t, 0, pid)
assert.False(t, running)
}
func TestReadPID_StalePID(t *testing.T) {
path := filepath.Join(t.TempDir(), "stale.pid")
func TestReadPID_Stale_Bad(t *testing.T) {
path := core.JoinPath(t.TempDir(), "stale.pid")
require.NoError(t, os.WriteFile(path, []byte("999999999"), 0644))
pid, running := ReadPID(path)
assert.Equal(t, 999999999, pid)

View file

@ -3,14 +3,13 @@
package api_test
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
goapi "forge.lthn.ai/core/api"
process "dappco.re/go/core/process"
processapi "dappco.re/go/core/process/pkg/api"
goapi "forge.lthn.ai/core/api"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -65,10 +64,8 @@ func TestProcessProvider_ListDaemons_Good(t *testing.T) {
assert.Equal(t, http.StatusOK, w.Code)
var resp goapi.Response[[]any]
err := json.Unmarshal(w.Body.Bytes(), &resp)
require.NoError(t, err)
assert.True(t, resp.Success)
body := w.Body.String()
assert.NotEmpty(t, body)
}
func TestProcessProvider_GetDaemon_Bad(t *testing.T) {
@ -95,7 +92,7 @@ func TestProcessProvider_RegistersAsRouteGroup_Good(t *testing.T) {
assert.Equal(t, "process", engine.Groups()[0].Name())
}
func TestProcessProvider_Channels_RegisterAsStreamGroup_Good(t *testing.T) {
func TestProcessProvider_StreamGroup_Good(t *testing.T) {
p := processapi.NewProvider(nil, nil)
engine, err := goapi.New()

View file

@ -2,19 +2,21 @@ package process
import (
"context"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"sync"
"syscall"
"time"
coreerr "dappco.re/go/core/log"
"dappco.re/go/core"
)
// Process represents a managed external process.
type Process struct {
type processStdin interface {
Write(p []byte) (n int, err error)
Close() error
}
// ManagedProcess represents a tracked external process started by the service.
type ManagedProcess struct {
ID string
PID int
Command string
@ -26,11 +28,11 @@ type Process struct {
ExitCode int
Duration time.Duration
cmd *exec.Cmd
cmd *execCmd
ctx context.Context
cancel context.CancelFunc
output *RingBuffer
stdin io.WriteCloser
stdin processStdin
done chan struct{}
mu sync.RWMutex
gracePeriod time.Duration
@ -38,34 +40,30 @@ type Process struct {
lastSignal string
}
// ManagedProcess is kept as a compatibility alias for legacy references.
type ManagedProcess = Process
// Process is kept as a compatibility alias for ManagedProcess.
type Process = ManagedProcess
// Info returns a snapshot of process state.
func (p *Process) Info() Info {
func (p *ManagedProcess) Info() ProcessInfo {
p.mu.RLock()
defer p.mu.RUnlock()
pid := p.PID
if p.cmd != nil && p.cmd.Process != nil {
pid = p.cmd.Process.Pid
}
return Info{
return ProcessInfo{
ID: p.ID,
Command: p.Command,
Args: p.Args,
Args: append([]string(nil), p.Args...),
Dir: p.Dir,
StartedAt: p.StartedAt,
Running: p.Status == StatusRunning,
Status: p.Status,
ExitCode: p.ExitCode,
Duration: p.Duration,
PID: pid,
PID: p.PID,
}
}
// Output returns the captured output as a string.
func (p *Process) Output() string {
func (p *ManagedProcess) Output() string {
p.mu.RLock()
defer p.mu.RUnlock()
if p.output == nil {
@ -75,7 +73,7 @@ func (p *Process) Output() string {
}
// OutputBytes returns the captured output as bytes.
func (p *Process) OutputBytes() []byte {
func (p *ManagedProcess) OutputBytes() []byte {
p.mu.RLock()
defer p.mu.RUnlock()
if p.output == nil {
@ -85,37 +83,40 @@ func (p *Process) OutputBytes() []byte {
}
// IsRunning returns true if the process is still executing.
func (p *Process) IsRunning() bool {
p.mu.RLock()
defer p.mu.RUnlock()
return p.Status == StatusRunning
func (p *ManagedProcess) IsRunning() bool {
select {
case <-p.done:
return false
default:
return true
}
}
// Wait blocks until the process exits.
func (p *Process) Wait() error {
func (p *ManagedProcess) Wait() error {
<-p.done
p.mu.RLock()
defer p.mu.RUnlock()
if p.Status == StatusFailed {
return coreerr.E("Process.Wait", fmt.Sprintf("process failed to start: %s", p.ID), nil)
return core.E("process.wait", core.Concat("process failed to start: ", p.ID), nil)
}
if p.Status == StatusKilled {
return coreerr.E("Process.Wait", fmt.Sprintf("process was killed: %s", p.ID), nil)
return core.E("process.wait", core.Concat("process was killed: ", p.ID), nil)
}
if p.ExitCode != 0 {
return coreerr.E("Process.Wait", fmt.Sprintf("process exited with code %d", p.ExitCode), nil)
return core.E("process.wait", core.Concat("process exited with code ", strconv.Itoa(p.ExitCode)), nil)
}
return nil
}
// Done returns a channel that closes when the process exits.
func (p *Process) Done() <-chan struct{} {
func (p *ManagedProcess) Done() <-chan struct{} {
return p.done
}
// Kill forcefully terminates the process.
// If KillGroup is set, kills the entire process group.
func (p *Process) Kill() error {
func (p *ManagedProcess) Kill() error {
p.mu.Lock()
defer p.mu.Unlock()
@ -128,7 +129,6 @@ func (p *Process) Kill() error {
}
p.lastSignal = "SIGKILL"
if p.killGroup {
// Kill entire process group (negative PID)
return syscall.Kill(-p.cmd.Process.Pid, syscall.SIGKILL)
@ -139,7 +139,7 @@ func (p *Process) Kill() error {
// Shutdown gracefully stops the process: SIGTERM, then SIGKILL after grace period.
// If GracePeriod was not set (zero), falls back to immediate Kill().
// If KillGroup is set, signals are sent to the entire process group.
func (p *Process) Shutdown() error {
func (p *ManagedProcess) Shutdown() error {
p.mu.RLock()
grace := p.gracePeriod
p.mu.RUnlock()
@ -163,7 +163,7 @@ func (p *Process) Shutdown() error {
}
// terminate sends SIGTERM to the process (or process group if KillGroup is set).
func (p *Process) terminate() error {
func (p *ManagedProcess) terminate() error {
p.mu.Lock()
defer p.mu.Unlock()
@ -176,31 +176,15 @@ func (p *Process) terminate() error {
}
pid := p.cmd.Process.Pid
p.lastSignal = "SIGTERM"
if p.killGroup {
pid = -pid
}
p.lastSignal = "SIGTERM"
return syscall.Kill(pid, syscall.SIGTERM)
}
// Signal sends a signal to the process.
func (p *Process) Signal(sig os.Signal) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.Status != StatusRunning {
return ErrProcessNotRunning
}
if p.cmd == nil || p.cmd.Process == nil {
return nil
}
return p.cmd.Process.Signal(sig)
}
// SendInput writes to the process stdin.
func (p *Process) SendInput(input string) error {
func (p *ManagedProcess) SendInput(input string) error {
p.mu.RLock()
defer p.mu.RUnlock()
@ -217,7 +201,7 @@ func (p *Process) SendInput(input string) error {
}
// CloseStdin closes the process stdin pipe.
func (p *Process) CloseStdin() error {
func (p *ManagedProcess) CloseStdin() error {
p.mu.Lock()
defer p.mu.Unlock()
@ -230,7 +214,7 @@ func (p *Process) CloseStdin() error {
return err
}
func (p *Process) requestedSignal() string {
func (p *ManagedProcess) requestedSignal() string {
p.mu.RLock()
defer p.mu.RUnlock()
return p.lastSignal

View file

@ -10,11 +10,10 @@ import (
"github.com/stretchr/testify/require"
)
func TestProcess_Info(t *testing.T) {
func TestProcess_Info_Good(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "hello")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "hello")
<-proc.Done()
@ -27,216 +26,163 @@ func TestProcess_Info(t *testing.T) {
assert.Greater(t, info.Duration, time.Duration(0))
}
func TestProcess_Output(t *testing.T) {
func TestProcess_Output_Good(t *testing.T) {
t.Run("captures stdout", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "hello world")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "hello world")
<-proc.Done()
output := proc.Output()
assert.Contains(t, output, "hello world")
assert.Contains(t, proc.Output(), "hello world")
})
t.Run("OutputBytes returns copy", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "test")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "test")
<-proc.Done()
bytes := proc.OutputBytes()
assert.NotNil(t, bytes)
assert.Contains(t, string(bytes), "test")
})
}
func TestProcess_IsRunning(t *testing.T) {
func TestProcess_IsRunning_Good(t *testing.T) {
t.Run("true while running", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, err := svc.Start(ctx, "sleep", "10")
require.NoError(t, err)
proc := startProc(t, svc, ctx, "sleep", "10")
assert.True(t, proc.IsRunning())
cancel()
<-proc.Done()
assert.False(t, proc.IsRunning())
})
t.Run("false after completion", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "done")
<-proc.Done()
assert.False(t, proc.IsRunning())
})
}
func TestProcess_Wait(t *testing.T) {
func TestProcess_Wait_Good(t *testing.T) {
t.Run("returns nil on success", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "ok")
require.NoError(t, err)
err = proc.Wait()
proc := startProc(t, svc, context.Background(), "echo", "ok")
err := proc.Wait()
assert.NoError(t, err)
})
t.Run("returns error on failure", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "sh", "-c", "exit 1")
require.NoError(t, err)
err = proc.Wait()
proc := startProc(t, svc, context.Background(), "sh", "-c", "exit 1")
err := proc.Wait()
assert.Error(t, err)
})
}
func TestProcess_Done(t *testing.T) {
func TestProcess_Done_Good(t *testing.T) {
t.Run("channel closes on completion", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "test")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "test")
select {
case <-proc.Done():
// Success - channel closed
case <-time.After(5 * time.Second):
t.Fatal("Done channel should have closed")
}
})
}
func TestProcess_Kill(t *testing.T) {
func TestProcess_Kill_Good(t *testing.T) {
t.Run("terminates running process", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
proc := startProc(t, svc, ctx, "sleep", "60")
assert.True(t, proc.IsRunning())
err = proc.Kill()
err := proc.Kill()
assert.NoError(t, err)
select {
case <-proc.Done():
// Good - process terminated
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
assert.Equal(t, StatusKilled, proc.Status)
assert.Equal(t, -1, proc.ExitCode)
})
t.Run("noop on completed process", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "done")
<-proc.Done()
err = proc.Kill()
err := proc.Kill()
assert.NoError(t, err)
})
}
func TestProcess_SendInput(t *testing.T) {
func TestProcess_SendInput_Good(t *testing.T) {
t.Run("writes to stdin", func(t *testing.T) {
svc, _ := newTestService(t)
proc := startProc(t, svc, context.Background(), "cat")
// Use cat to echo back stdin
proc, err := svc.Start(context.Background(), "cat")
require.NoError(t, err)
err = proc.SendInput("hello\n")
err := proc.SendInput("hello\n")
assert.NoError(t, err)
err = proc.CloseStdin()
assert.NoError(t, err)
<-proc.Done()
assert.Contains(t, proc.Output(), "hello")
})
t.Run("error on completed process", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "done")
<-proc.Done()
err = proc.SendInput("test")
err := proc.SendInput("test")
assert.ErrorIs(t, err, ErrProcessNotRunning)
})
}
func TestProcess_Signal(t *testing.T) {
func TestProcess_Signal_Good(t *testing.T) {
t.Run("sends signal to running process", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
err = proc.Signal(os.Interrupt)
proc := startProc(t, svc, ctx, "sleep", "60")
err := proc.Signal(os.Interrupt)
assert.NoError(t, err)
select {
case <-proc.Done():
// Process terminated by signal
case <-time.After(2 * time.Second):
t.Fatal("process should have been terminated by signal")
}
assert.Equal(t, StatusKilled, proc.Status)
})
t.Run("error on completed process", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "done")
<-proc.Done()
err = proc.Signal(os.Interrupt)
err := proc.Signal(os.Interrupt)
assert.ErrorIs(t, err, ErrProcessNotRunning)
})
}
func TestProcess_CloseStdin(t *testing.T) {
func TestProcess_CloseStdin_Good(t *testing.T) {
t.Run("closes stdin pipe", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "cat")
require.NoError(t, err)
err = proc.CloseStdin()
proc := startProc(t, svc, context.Background(), "cat")
err := proc.CloseStdin()
assert.NoError(t, err)
// Process should exit now that stdin is closed
select {
case <-proc.Done():
// Good
case <-time.After(2 * time.Second):
t.Fatal("cat should exit when stdin is closed")
}
@ -244,78 +190,66 @@ func TestProcess_CloseStdin(t *testing.T) {
t.Run("double close is safe", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "cat")
require.NoError(t, err)
// First close
err = proc.CloseStdin()
proc := startProc(t, svc, context.Background(), "cat")
err := proc.CloseStdin()
assert.NoError(t, err)
<-proc.Done()
// Second close should be safe (stdin already nil)
err = proc.CloseStdin()
assert.NoError(t, err)
})
}
func TestProcess_Timeout(t *testing.T) {
func TestProcess_Timeout_Good(t *testing.T) {
t.Run("kills process after timeout", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "sleep",
Args: []string{"60"},
Timeout: 200 * time.Millisecond,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
select {
case <-proc.Done():
// Good — process was killed by timeout
case <-time.After(5 * time.Second):
t.Fatal("process should have been killed by timeout")
}
assert.False(t, proc.IsRunning())
assert.Equal(t, StatusKilled, proc.Status)
})
t.Run("no timeout when zero", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "echo",
Args: []string{"fast"},
Timeout: 0,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
<-proc.Done()
assert.Equal(t, 0, proc.ExitCode)
})
}
func TestProcess_Shutdown(t *testing.T) {
func TestProcess_Shutdown_Good(t *testing.T) {
t.Run("graceful with grace period", func(t *testing.T) {
svc, _ := newTestService(t)
// Use a process that traps SIGTERM
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "sleep",
Args: []string{"60"},
GracePeriod: 100 * time.Millisecond,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
assert.True(t, proc.IsRunning())
err = proc.Shutdown()
err := proc.Shutdown()
assert.NoError(t, err)
select {
case <-proc.Done():
// Good
case <-time.After(5 * time.Second):
t.Fatal("shutdown should have completed")
}
@ -323,70 +257,65 @@ func TestProcess_Shutdown(t *testing.T) {
t.Run("immediate kill without grace period", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "sleep",
Args: []string{"60"},
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
err = proc.Shutdown()
err := proc.Shutdown()
assert.NoError(t, err)
select {
case <-proc.Done():
// Good
case <-time.After(2 * time.Second):
t.Fatal("kill should be immediate")
}
})
}
func TestProcess_KillGroup(t *testing.T) {
func TestProcess_KillGroup_Good(t *testing.T) {
t.Run("kills child processes", func(t *testing.T) {
svc, _ := newTestService(t)
// Spawn a parent that spawns a child — KillGroup should kill both
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "sh",
Args: []string{"-c", "sleep 60 & wait"},
Detach: true,
KillGroup: true,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
// Give child time to spawn
time.Sleep(100 * time.Millisecond)
err = proc.Kill()
err := proc.Kill()
assert.NoError(t, err)
select {
case <-proc.Done():
// Good — whole group killed
case <-time.After(5 * time.Second):
t.Fatal("process group should have been killed")
}
})
}
func TestProcess_TimeoutWithGrace(t *testing.T) {
func TestProcess_TimeoutWithGrace_Good(t *testing.T) {
t.Run("timeout triggers graceful shutdown", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "sleep",
Args: []string{"60"},
Timeout: 200 * time.Millisecond,
GracePeriod: 100 * time.Millisecond,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
select {
case <-proc.Done():
// Good — timeout + grace triggered
case <-time.After(5 * time.Second):
t.Fatal("process should have been killed by timeout")
}
assert.Equal(t, StatusKilled, proc.Status)
})
}

View file

@ -3,19 +3,19 @@ package process
import (
"bytes"
"context"
"fmt"
"os/exec"
"strings"
"strconv"
coreerr "dappco.re/go/core/log"
"dappco.re/go/core"
)
// ErrProgramNotFound is returned when Find cannot locate the binary on PATH.
// Callers may use errors.Is to detect this condition.
var ErrProgramNotFound = coreerr.E("", "program: binary not found in PATH", nil)
// Callers may use core.Is to detect this condition.
var ErrProgramNotFound = core.E("", "program: binary not found in PATH", nil)
// Program represents a named executable located on the system PATH.
// Create one with a Name, call Find to resolve its path, then Run or RunDir.
//
// p := &process.Program{Name: "go"}
type Program struct {
// Name is the binary name (e.g. "go", "node", "git").
Name string
@ -26,13 +26,15 @@ type Program struct {
// Find resolves the program's absolute path using exec.LookPath.
// Returns ErrProgramNotFound (wrapped) if the binary is not on PATH.
//
// err := p.Find()
func (p *Program) Find() error {
if p.Name == "" {
return coreerr.E("Program.Find", "program name is empty", nil)
return core.E("program.find", "program name is empty", nil)
}
path, err := exec.LookPath(p.Name)
path, err := execLookPath(p.Name)
if err != nil {
return coreerr.E("Program.Find", fmt.Sprintf("%q: not found in PATH", p.Name), ErrProgramNotFound)
return core.E("program.find", core.Concat(strconv.Quote(p.Name), ": not found in PATH"), ErrProgramNotFound)
}
p.Path = path
return nil
@ -40,6 +42,8 @@ func (p *Program) Find() error {
// Run executes the program with args in the current working directory.
// Returns trimmed combined stdout+stderr output and any error.
//
// out, err := p.Run(ctx, "version")
func (p *Program) Run(ctx context.Context, args ...string) (string, error) {
return p.RunDir(ctx, "", args...)
}
@ -47,6 +51,8 @@ func (p *Program) Run(ctx context.Context, args ...string) (string, error) {
// RunDir executes the program with args in dir.
// Returns trimmed combined stdout+stderr output and any error.
// If dir is empty, the process inherits the caller's working directory.
//
// out, err := p.RunDir(ctx, "/workspace", "test", "./...")
func (p *Program) RunDir(ctx context.Context, dir string, args ...string) (string, error) {
binary := p.Path
if binary == "" {
@ -54,7 +60,7 @@ func (p *Program) RunDir(ctx context.Context, dir string, args ...string) (strin
}
var out bytes.Buffer
cmd := exec.CommandContext(ctx, binary, args...)
cmd := execCommandContext(ctx, binary, args...)
cmd.Stdout = &out
cmd.Stderr = &out
if dir != "" {
@ -62,7 +68,7 @@ func (p *Program) RunDir(ctx context.Context, dir string, args ...string) (strin
}
if err := cmd.Run(); err != nil {
return strings.TrimSpace(out.String()), coreerr.E("Program.RunDir", fmt.Sprintf("%q: command failed", p.Name), err)
return string(bytes.TrimSpace(out.Bytes())), core.E("program.run", core.Concat(strconv.Quote(p.Name), ": command failed"), err)
}
return strings.TrimSpace(out.String()), nil
return string(bytes.TrimSpace(out.Bytes())), nil
}

View file

@ -2,10 +2,11 @@ package process_test
import (
"context"
"path/filepath"
"os"
"testing"
"time"
"dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -19,25 +20,25 @@ func testCtx(t *testing.T) context.Context {
return ctx
}
func TestProgram_Find_KnownBinary(t *testing.T) {
func TestProgram_Find_Good(t *testing.T) {
p := &process.Program{Name: "echo"}
require.NoError(t, p.Find())
assert.NotEmpty(t, p.Path)
}
func TestProgram_Find_UnknownBinary(t *testing.T) {
func TestProgram_FindUnknown_Bad(t *testing.T) {
p := &process.Program{Name: "no-such-binary-xyzzy-42"}
err := p.Find()
require.Error(t, err)
assert.ErrorIs(t, err, process.ErrProgramNotFound)
}
func TestProgram_Find_EmptyName(t *testing.T) {
func TestProgram_FindEmpty_Bad(t *testing.T) {
p := &process.Program{}
require.Error(t, p.Find())
}
func TestProgram_Run_ReturnsOutput(t *testing.T) {
func TestProgram_Run_Good(t *testing.T) {
p := &process.Program{Name: "echo"}
require.NoError(t, p.Find())
@ -46,7 +47,7 @@ func TestProgram_Run_ReturnsOutput(t *testing.T) {
assert.Equal(t, "hello", out)
}
func TestProgram_Run_WithoutFind_FallsBackToName(t *testing.T) {
func TestProgram_RunFallback_Good(t *testing.T) {
// Path is empty; RunDir should fall back to Name for OS PATH resolution.
p := &process.Program{Name: "echo"}
@ -55,7 +56,7 @@ func TestProgram_Run_WithoutFind_FallsBackToName(t *testing.T) {
assert.Equal(t, "fallback", out)
}
func TestProgram_RunDir_UsesDirectory(t *testing.T) {
func TestProgram_RunDir_Good(t *testing.T) {
p := &process.Program{Name: "pwd"}
require.NoError(t, p.Find())
@ -63,15 +64,14 @@ func TestProgram_RunDir_UsesDirectory(t *testing.T) {
out, err := p.RunDir(testCtx(t), dir)
require.NoError(t, err)
// Resolve symlinks on both sides for portability (macOS uses /private/ prefix).
canonicalDir, err := filepath.EvalSymlinks(dir)
dirInfo, err := os.Stat(dir)
require.NoError(t, err)
canonicalOut, err := filepath.EvalSymlinks(out)
outInfo, err := os.Stat(core.Trim(out))
require.NoError(t, err)
assert.Equal(t, canonicalDir, canonicalOut)
assert.True(t, os.SameFile(dirInfo, outInfo))
}
func TestProgram_Run_FailingCommand(t *testing.T) {
func TestProgram_RunFailure_Bad(t *testing.T) {
p := &process.Program{Name: "false"}
require.NoError(t, p.Find())

View file

@ -1,18 +1,18 @@
package process
import (
"encoding/json"
"os"
"path/filepath"
"strings"
"path"
"strconv"
"syscall"
"time"
"dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
)
// DaemonEntry records a running daemon in the registry.
//
// entry := process.DaemonEntry{Code: "myapp", Daemon: "serve", PID: 1234}
type DaemonEntry struct {
Code string `json:"code"`
Daemon string `json:"daemon"`
@ -24,22 +24,28 @@ type DaemonEntry struct {
}
// Registry tracks running daemons via JSON files in a directory.
//
// reg := process.NewRegistry("/tmp/process-daemons")
type Registry struct {
dir string
}
// NewRegistry creates a registry backed by the given directory.
//
// reg := process.NewRegistry("/tmp/process-daemons")
func NewRegistry(dir string) *Registry {
return &Registry{dir: dir}
}
// DefaultRegistry returns a registry using ~/.core/daemons/.
//
// reg := process.DefaultRegistry()
func DefaultRegistry() *Registry {
home, err := os.UserHomeDir()
home, err := userHomeDir()
if err != nil {
home = os.TempDir()
home = tempDir()
}
return NewRegistry(filepath.Join(home, ".core", "daemons"))
return NewRegistry(path.Join(home, ".core", "daemons"))
}
// Register writes a daemon entry to the registry directory.
@ -51,16 +57,16 @@ func (r *Registry) Register(entry DaemonEntry) error {
}
if err := coreio.Local.EnsureDir(r.dir); err != nil {
return coreerr.E("Registry.Register", "failed to create registry directory", err)
return core.E("registry.register", "failed to create registry directory", err)
}
data, err := json.MarshalIndent(entry, "", " ")
data, err := marshalDaemonEntry(entry)
if err != nil {
return coreerr.E("Registry.Register", "failed to marshal entry", err)
return core.E("registry.register", "failed to marshal entry", err)
}
if err := coreio.Local.Write(r.entryPath(entry.Code, entry.Daemon), string(data)); err != nil {
return coreerr.E("Registry.Register", "failed to write entry file", err)
if err := coreio.Local.Write(r.entryPath(entry.Code, entry.Daemon), data); err != nil {
return core.E("registry.register", "failed to write entry file", err)
}
return nil
}
@ -68,7 +74,7 @@ func (r *Registry) Register(entry DaemonEntry) error {
// Unregister removes a daemon entry from the registry.
func (r *Registry) Unregister(code, daemon string) error {
if err := coreio.Local.Delete(r.entryPath(code, daemon)); err != nil {
return coreerr.E("Registry.Unregister", "failed to delete entry file", err)
return core.E("registry.unregister", "failed to delete entry file", err)
}
return nil
}
@ -83,8 +89,8 @@ func (r *Registry) Get(code, daemon string) (*DaemonEntry, bool) {
return nil, false
}
var entry DaemonEntry
if err := json.Unmarshal([]byte(data), &entry); err != nil {
entry, err := unmarshalDaemonEntry(data)
if err != nil {
_ = coreio.Local.Delete(path)
return nil, false
}
@ -99,20 +105,28 @@ func (r *Registry) Get(code, daemon string) (*DaemonEntry, bool) {
// List returns all alive daemon entries, pruning any with dead PIDs.
func (r *Registry) List() ([]DaemonEntry, error) {
matches, err := filepath.Glob(filepath.Join(r.dir, "*.json"))
if !coreio.Local.Exists(r.dir) {
return nil, nil
}
entries, err := coreio.Local.List(r.dir)
if err != nil {
return nil, err
return nil, core.E("registry.list", "failed to list registry directory", err)
}
var alive []DaemonEntry
for _, path := range matches {
for _, entryFile := range entries {
if entryFile.IsDir() || !core.HasSuffix(entryFile.Name(), ".json") {
continue
}
path := path.Join(r.dir, entryFile.Name())
data, err := coreio.Local.Read(path)
if err != nil {
continue
}
var entry DaemonEntry
if err := json.Unmarshal([]byte(data), &entry); err != nil {
entry, err := unmarshalDaemonEntry(data)
if err != nil {
_ = coreio.Local.Delete(path)
continue
}
@ -130,8 +144,8 @@ func (r *Registry) List() ([]DaemonEntry, error) {
// entryPath returns the filesystem path for a daemon entry.
func (r *Registry) entryPath(code, daemon string) string {
name := strings.ReplaceAll(code, "/", "-") + "-" + strings.ReplaceAll(daemon, "/", "-") + ".json"
return filepath.Join(r.dir, name)
name := sanitizeRegistryComponent(code) + "-" + sanitizeRegistryComponent(daemon) + ".json"
return path.Join(r.dir, name)
}
// isAlive checks whether a process with the given PID is running.
@ -139,9 +153,263 @@ func isAlive(pid int) bool {
if pid <= 0 {
return false
}
proc, err := os.FindProcess(pid)
proc, err := processHandle(pid)
if err != nil {
return false
}
return proc.Signal(syscall.Signal(0)) == nil
}
func sanitizeRegistryComponent(value string) string {
buf := make([]byte, len(value))
for i := 0; i < len(value); i++ {
if value[i] == '/' {
buf[i] = '-'
continue
}
buf[i] = value[i]
}
return string(buf)
}
func marshalDaemonEntry(entry DaemonEntry) (string, error) {
fields := []struct {
key string
value string
}{
{key: "code", value: quoteJSONString(entry.Code)},
{key: "daemon", value: quoteJSONString(entry.Daemon)},
{key: "pid", value: strconv.Itoa(entry.PID)},
}
if entry.Health != "" {
fields = append(fields, struct {
key string
value string
}{key: "health", value: quoteJSONString(entry.Health)})
}
if entry.Project != "" {
fields = append(fields, struct {
key string
value string
}{key: "project", value: quoteJSONString(entry.Project)})
}
if entry.Binary != "" {
fields = append(fields, struct {
key string
value string
}{key: "binary", value: quoteJSONString(entry.Binary)})
}
fields = append(fields, struct {
key string
value string
}{
key: "started",
value: quoteJSONString(entry.Started.Format(time.RFC3339Nano)),
})
builder := core.NewBuilder()
builder.WriteString("{\n")
for i, field := range fields {
builder.WriteString(core.Concat(" ", quoteJSONString(field.key), ": ", field.value))
if i < len(fields)-1 {
builder.WriteString(",")
}
builder.WriteString("\n")
}
builder.WriteString("}")
return builder.String(), nil
}
func unmarshalDaemonEntry(data string) (DaemonEntry, error) {
values, err := parseJSONObject(data)
if err != nil {
return DaemonEntry{}, err
}
entry := DaemonEntry{
Code: values["code"],
Daemon: values["daemon"],
Health: values["health"],
Project: values["project"],
Binary: values["binary"],
}
pidValue, ok := values["pid"]
if !ok {
return DaemonEntry{}, core.E("Registry.unmarshalDaemonEntry", "missing pid", nil)
}
entry.PID, err = strconv.Atoi(pidValue)
if err != nil {
return DaemonEntry{}, core.E("Registry.unmarshalDaemonEntry", "invalid pid", err)
}
startedValue, ok := values["started"]
if !ok {
return DaemonEntry{}, core.E("Registry.unmarshalDaemonEntry", "missing started", nil)
}
entry.Started, err = time.Parse(time.RFC3339Nano, startedValue)
if err != nil {
return DaemonEntry{}, core.E("Registry.unmarshalDaemonEntry", "invalid started timestamp", err)
}
return entry, nil
}
func parseJSONObject(data string) (map[string]string, error) {
trimmed := core.Trim(data)
if trimmed == "" {
return nil, core.E("Registry.parseJSONObject", "empty JSON object", nil)
}
if trimmed[0] != '{' || trimmed[len(trimmed)-1] != '}' {
return nil, core.E("Registry.parseJSONObject", "invalid JSON object", nil)
}
values := make(map[string]string)
index := skipJSONSpace(trimmed, 1)
for index < len(trimmed) {
if trimmed[index] == '}' {
return values, nil
}
key, next, err := parseJSONString(trimmed, index)
if err != nil {
return nil, err
}
index = skipJSONSpace(trimmed, next)
if index >= len(trimmed) || trimmed[index] != ':' {
return nil, core.E("Registry.parseJSONObject", "missing key separator", nil)
}
index = skipJSONSpace(trimmed, index+1)
if index >= len(trimmed) {
return nil, core.E("Registry.parseJSONObject", "missing value", nil)
}
var value string
if trimmed[index] == '"' {
value, index, err = parseJSONString(trimmed, index)
if err != nil {
return nil, err
}
} else {
start := index
for index < len(trimmed) && trimmed[index] != ',' && trimmed[index] != '}' {
index++
}
value = core.Trim(trimmed[start:index])
}
values[key] = value
index = skipJSONSpace(trimmed, index)
if index >= len(trimmed) {
break
}
if trimmed[index] == ',' {
index = skipJSONSpace(trimmed, index+1)
continue
}
if trimmed[index] == '}' {
return values, nil
}
return nil, core.E("Registry.parseJSONObject", "invalid object separator", nil)
}
return nil, core.E("Registry.parseJSONObject", "unterminated JSON object", nil)
}
func parseJSONString(data string, start int) (string, int, error) {
if start >= len(data) || data[start] != '"' {
return "", 0, core.E("Registry.parseJSONString", "expected quoted string", nil)
}
builder := core.NewBuilder()
for index := start + 1; index < len(data); index++ {
ch := data[index]
if ch == '"' {
return builder.String(), index + 1, nil
}
if ch != '\\' {
builder.WriteByte(ch)
continue
}
index++
if index >= len(data) {
return "", 0, core.E("Registry.parseJSONString", "unterminated escape sequence", nil)
}
switch data[index] {
case '"', '\\', '/':
builder.WriteByte(data[index])
case 'b':
builder.WriteByte('\b')
case 'f':
builder.WriteByte('\f')
case 'n':
builder.WriteByte('\n')
case 'r':
builder.WriteByte('\r')
case 't':
builder.WriteByte('\t')
case 'u':
if index+4 >= len(data) {
return "", 0, core.E("Registry.parseJSONString", "short unicode escape", nil)
}
r, err := strconv.ParseInt(data[index+1:index+5], 16, 32)
if err != nil {
return "", 0, core.E("Registry.parseJSONString", "invalid unicode escape", err)
}
builder.WriteRune(rune(r))
index += 4
default:
return "", 0, core.E("Registry.parseJSONString", "invalid escape sequence", nil)
}
}
return "", 0, core.E("Registry.parseJSONString", "unterminated string", nil)
}
func skipJSONSpace(data string, index int) int {
for index < len(data) {
switch data[index] {
case ' ', '\n', '\r', '\t':
index++
default:
return index
}
}
return index
}
func quoteJSONString(value string) string {
builder := core.NewBuilder()
builder.WriteByte('"')
for i := 0; i < len(value); i++ {
switch value[i] {
case '\\', '"':
builder.WriteByte('\\')
builder.WriteByte(value[i])
case '\b':
builder.WriteString(`\b`)
case '\f':
builder.WriteString(`\f`)
case '\n':
builder.WriteString(`\n`)
case '\r':
builder.WriteString(`\r`)
case '\t':
builder.WriteString(`\t`)
default:
if value[i] < 0x20 {
builder.WriteString(core.Sprintf("\\u%04x", value[i]))
continue
}
builder.WriteByte(value[i])
}
}
builder.WriteByte('"')
return builder.String()
}

View file

@ -2,15 +2,15 @@ package process
import (
"os"
"path/filepath"
"testing"
"time"
"dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestRegistry_RegisterAndGet(t *testing.T) {
func TestRegistry_Register_Good(t *testing.T) {
dir := t.TempDir()
reg := NewRegistry(dir)
@ -39,7 +39,7 @@ func TestRegistry_RegisterAndGet(t *testing.T) {
assert.Equal(t, started, got.Started)
}
func TestRegistry_Unregister(t *testing.T) {
func TestRegistry_Unregister_Good(t *testing.T) {
dir := t.TempDir()
reg := NewRegistry(dir)
@ -53,7 +53,7 @@ func TestRegistry_Unregister(t *testing.T) {
require.NoError(t, err)
// File should exist
path := filepath.Join(dir, "myapp-server.json")
path := core.JoinPath(dir, "myapp-server.json")
_, err = os.Stat(path)
require.NoError(t, err)
@ -65,7 +65,7 @@ func TestRegistry_Unregister(t *testing.T) {
assert.True(t, os.IsNotExist(err))
}
func TestRegistry_List(t *testing.T) {
func TestRegistry_List_Good(t *testing.T) {
dir := t.TempDir()
reg := NewRegistry(dir)
@ -79,7 +79,7 @@ func TestRegistry_List(t *testing.T) {
assert.Len(t, entries, 2)
}
func TestRegistry_List_PrunesStale(t *testing.T) {
func TestRegistry_PruneStale_Good(t *testing.T) {
dir := t.TempDir()
reg := NewRegistry(dir)
@ -87,7 +87,7 @@ func TestRegistry_List_PrunesStale(t *testing.T) {
require.NoError(t, err)
// File should exist before listing
path := filepath.Join(dir, "dead-proc.json")
path := core.JoinPath(dir, "dead-proc.json")
_, err = os.Stat(path)
require.NoError(t, err)
@ -100,7 +100,7 @@ func TestRegistry_List_PrunesStale(t *testing.T) {
assert.True(t, os.IsNotExist(err))
}
func TestRegistry_Get_NotFound(t *testing.T) {
func TestRegistry_GetMissing_Bad(t *testing.T) {
dir := t.TempDir()
reg := NewRegistry(dir)
@ -109,8 +109,8 @@ func TestRegistry_Get_NotFound(t *testing.T) {
assert.False(t, ok)
}
func TestRegistry_CreatesDirectory(t *testing.T) {
dir := filepath.Join(t.TempDir(), "nested", "deep", "daemons")
func TestRegistry_CreateDirectory_Good(t *testing.T) {
dir := core.JoinPath(t.TempDir(), "nested", "deep", "daemons")
reg := NewRegistry(dir)
err := reg.Register(DaemonEntry{Code: "app", Daemon: "srv", PID: os.Getpid()})
@ -121,7 +121,7 @@ func TestRegistry_CreatesDirectory(t *testing.T) {
assert.True(t, info.IsDir())
}
func TestDefaultRegistry(t *testing.T) {
func TestRegistry_Default_Good(t *testing.T) {
reg := DefaultRegistry()
assert.NotNil(t, reg)
}

View file

@ -5,7 +5,7 @@ import (
"sync"
"time"
coreerr "dappco.re/go/core/log"
"dappco.re/go/core"
)
// Runner orchestrates multiple processes with dependencies.
@ -105,7 +105,7 @@ func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, er
Name: name,
Spec: remaining[name],
ExitCode: 1,
Error: coreerr.E("Runner.RunAll", "circular dependency or missing dependency", nil),
Error: core.E("runner.run_all", "circular dependency or missing dependency", nil),
})
}
break
@ -137,7 +137,7 @@ func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, er
Name: spec.Name,
Spec: spec,
Skipped: true,
Error: coreerr.E("Runner.RunAll", "skipped due to dependency failure", nil),
Error: core.E("runner.run_all", "skipped due to dependency failure", nil),
}
} else {
result = r.runSpec(ctx, spec)
@ -193,13 +193,17 @@ func (r *Runner) canRun(spec RunSpec, completed map[string]*RunResult) bool {
func (r *Runner) runSpec(ctx context.Context, spec RunSpec) RunResult {
start := time.Now()
proc, err := r.service.StartWithOptions(ctx, RunOptions{
sr := r.service.StartWithOptions(ctx, RunOptions{
Command: spec.Command,
Args: spec.Args,
Dir: spec.Dir,
Env: spec.Env,
})
if err != nil {
if !sr.OK {
err, _ := sr.Value.(error)
if err == nil {
err = core.E("runner.run_spec", core.Concat("failed to start: ", spec.Name), nil)
}
return RunResult{
Name: spec.Name,
Spec: spec,
@ -208,6 +212,7 @@ func (r *Runner) runSpec(ctx context.Context, spec RunSpec) RunResult {
}
}
proc := sr.Value.(*Process)
<-proc.Done()
return RunResult{

View file

@ -13,14 +13,12 @@ func newTestRunner(t *testing.T) *Runner {
t.Helper()
c := framework.New()
factory := NewService(Options{})
raw, err := factory(c)
require.NoError(t, err)
return NewRunner(raw.(*Service))
r := Register(c)
require.True(t, r.OK)
return NewRunner(r.Value.(*Service))
}
func TestRunner_RunSequential(t *testing.T) {
func TestRunner_RunSequential_Good(t *testing.T) {
t.Run("all pass", func(t *testing.T) {
runner := newTestRunner(t)
@ -70,7 +68,7 @@ func TestRunner_RunSequential(t *testing.T) {
})
}
func TestRunner_RunParallel(t *testing.T) {
func TestRunner_RunParallel_Good(t *testing.T) {
t.Run("all run concurrently", func(t *testing.T) {
runner := newTestRunner(t)
@ -102,7 +100,7 @@ func TestRunner_RunParallel(t *testing.T) {
})
}
func TestRunner_RunAll(t *testing.T) {
func TestRunner_RunAll_Good(t *testing.T) {
t.Run("respects dependencies", func(t *testing.T) {
runner := newTestRunner(t)
@ -150,7 +148,7 @@ func TestRunner_RunAll(t *testing.T) {
})
}
func TestRunner_RunAll_CircularDeps(t *testing.T) {
func TestRunner_CircularDeps_Bad(t *testing.T) {
t.Run("circular dependency counts as failed", func(t *testing.T) {
runner := newTestRunner(t)
@ -166,7 +164,7 @@ func TestRunner_RunAll_CircularDeps(t *testing.T) {
})
}
func TestRunResult_Passed(t *testing.T) {
func TestRunResult_Passed_Good(t *testing.T) {
t.Run("success", func(t *testing.T) {
r := RunResult{ExitCode: 0}
assert.True(t, r.Passed())

View file

@ -3,7 +3,6 @@ package process
import (
"bufio"
"context"
"errors"
"os"
"os/exec"
"sync"
@ -13,7 +12,6 @@ import (
"dappco.re/go/core"
)
// execCmd is kept for backwards-compatible test stubbing/mocking.
type execCmd = exec.Cmd
type streamReader interface {
@ -45,34 +43,18 @@ type Options struct {
BufferSize int
}
// NewService constructs a process service factory for Core registration.
//
// c := framework.New(core.WithName("process", process.NewService(process.Options{})))
func NewService(opts Options) func(*core.Core) (any, error) {
return func(c *core.Core) (any, error) {
if opts.BufferSize == 0 {
opts.BufferSize = DefaultBufferSize
}
svc := &Service{
ServiceRuntime: core.NewServiceRuntime(c, opts),
managed: core.NewRegistry[*ManagedProcess](),
bufSize: opts.BufferSize,
}
return svc, nil
}
}
// Register constructs a Service bound to the provided Core instance.
//
// c := core.New()
// svc := process.Register(c).Value.(*process.Service)
func Register(c *core.Core) core.Result {
r := NewService(Options{BufferSize: DefaultBufferSize})(c)
if r == nil {
return core.Result{Value: core.E("process.register", "factory returned nil service", nil), OK: false}
opts := Options{BufferSize: DefaultBufferSize}
svc := &Service{
ServiceRuntime: core.NewServiceRuntime(c, opts),
managed: core.NewRegistry[*ManagedProcess](),
bufSize: opts.BufferSize,
}
return core.Result{Value: r, OK: true}
return core.Result{Value: svc, OK: true}
}
// OnStartup implements core.Startable.
@ -87,6 +69,8 @@ func (s *Service) OnStartup(ctx context.Context) core.Result {
}
// OnShutdown implements core.Stoppable — kills all managed processes.
//
// c.ServiceShutdown(ctx) // calls OnShutdown on all Stoppable services
func (s *Service) OnShutdown(ctx context.Context) core.Result {
s.managed.Each(func(_ string, proc *ManagedProcess) {
_ = proc.Kill()
@ -96,8 +80,9 @@ func (s *Service) OnShutdown(ctx context.Context) core.Result {
// Start spawns a new process with the given command and args.
//
// proc := svc.Start(ctx, "echo", "hello")
func (s *Service) Start(ctx context.Context, command string, args ...string) (*ManagedProcess, error) {
// r := svc.Start(ctx, "echo", "hello")
// if r.OK { proc := r.Value.(*Process) }
func (s *Service) Start(ctx context.Context, command string, args ...string) core.Result {
return s.StartWithOptions(ctx, RunOptions{
Command: command,
Args: args,
@ -106,13 +91,9 @@ func (s *Service) Start(ctx context.Context, command string, args ...string) (*M
// StartWithOptions spawns a process with full configuration.
//
// proc := svc.StartWithOptions(ctx, process.RunOptions{Command: "go", Args: []string{"test", "./..."}})
func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) (*ManagedProcess, error) {
return startResultToProcess(s.startWithOptions(ctx, opts), "process.start")
}
// startWithOptions is the Result-form internal implementation for StartWithOptions.
func (s *Service) startWithOptions(ctx context.Context, opts RunOptions) core.Result {
// r := svc.StartWithOptions(ctx, process.RunOptions{Command: "go", Args: []string{"test", "./..."}})
// if r.OK { proc := r.Value.(*Process) }
func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Result {
if opts.Command == "" {
return core.Result{Value: core.E("process.start", "command is required", nil), OK: false}
}
@ -137,12 +118,12 @@ func (s *Service) startWithOptions(ctx context.Context, opts RunOptions) core.Re
cmd.Env = append(cmd.Environ(), opts.Env...)
}
// Detached processes get their own process group.
// Detached processes get their own process group
if opts.Detach {
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
}
// Set up pipes.
// Set up pipes
stdout, err := cmd.StdoutPipe()
if err != nil {
cancel()
@ -161,7 +142,7 @@ func (s *Service) startWithOptions(ctx context.Context, opts RunOptions) core.Re
return core.Result{Value: core.E("process.start", core.Concat("stdin pipe failed: ", opts.Command), err), OK: false}
}
// Create output buffer (enabled by default).
// Create output buffer (enabled by default)
var output *RingBuffer
if !opts.DisableCapture {
output = NewRingBuffer(s.bufSize)
@ -185,33 +166,33 @@ func (s *Service) startWithOptions(ctx context.Context, opts RunOptions) core.Re
killGroup: opts.KillGroup && opts.Detach,
}
// Start the process.
// Start the process
if err := cmd.Start(); err != nil {
cancel()
return core.Result{Value: core.E("process.start", core.Concat("command failed: ", opts.Command), err), OK: false}
}
proc.PID = cmd.Process.Pid
// Store process.
// Store process
if r := s.managed.Set(id, proc); !r.OK {
cancel()
_ = cmd.Process.Kill()
return r
}
// Start timeout watchdog if configured.
// Start timeout watchdog if configured
if opts.Timeout > 0 {
go func() {
select {
case <-proc.done:
case <-time.After(opts.Timeout):
_ = proc.Shutdown()
proc.Shutdown()
}
}()
}
// Broadcast start.
_ = s.Core().ACTION(ActionProcessStarted{
// Broadcast start
s.Core().ACTION(ActionProcessStarted{
ID: id,
Command: opts.Command,
Args: opts.Args,
@ -219,7 +200,7 @@ func (s *Service) startWithOptions(ctx context.Context, opts RunOptions) core.Re
PID: cmd.Process.Pid,
})
// Stream output in goroutines.
// Stream output in goroutines
var wg sync.WaitGroup
wg.Add(2)
go func() {
@ -231,7 +212,7 @@ func (s *Service) startWithOptions(ctx context.Context, opts RunOptions) core.Re
s.streamOutput(proc, stderr, StreamStderr)
}()
// Wait for process completion.
// Wait for process completion
go func() {
wg.Wait()
waitErr := cmd.Wait()
@ -240,6 +221,7 @@ func (s *Service) startWithOptions(ctx context.Context, opts RunOptions) core.Re
status, exitCode, actionErr, killedSignal := classifyProcessExit(proc, waitErr)
proc.mu.Lock()
proc.PID = cmd.Process.Pid
proc.Duration = duration
proc.ExitCode = exitCode
proc.Status = status
@ -253,7 +235,7 @@ func (s *Service) startWithOptions(ctx context.Context, opts RunOptions) core.Re
Signal: killedSignal,
})
}
_ = s.Core().ACTION(ActionProcessExited{
s.Core().ACTION(ActionProcessExited{
ID: id,
ExitCode: exitCode,
Duration: duration,
@ -267,18 +249,18 @@ func (s *Service) startWithOptions(ctx context.Context, opts RunOptions) core.Re
// streamOutput reads from a pipe and broadcasts lines via ACTION.
func (s *Service) streamOutput(proc *ManagedProcess, r streamReader, stream Stream) {
scanner := bufio.NewScanner(r)
// Increase buffer for long lines.
// Increase buffer for long lines
scanner.Buffer(make([]byte, 64*1024), 1024*1024)
for scanner.Scan() {
line := scanner.Text()
// Write to ring buffer.
// Write to ring buffer
if proc.output != nil {
_, _ = proc.output.Write([]byte(line + "\n"))
}
// Broadcast output.
// Broadcast output
_ = s.Core().ACTION(ActionProcessOutput{
ID: proc.ID,
Line: line,
@ -293,7 +275,7 @@ func (s *Service) Get(id string) (*ManagedProcess, error) {
if !r.OK {
return nil, ErrProcessNotFound
}
return r.Value, nil
return r.Value.(*ManagedProcess), nil
}
// List returns all processes.
@ -368,7 +350,11 @@ func (s *Service) Output(id string) (string, error) {
}
// Run executes a command and waits for completion.
func (s *Service) Run(ctx context.Context, command string, args ...string) (string, error) {
// Value is always the output string. OK is true if exit code is 0.
//
// r := svc.Run(ctx, "go", "test", "./...")
// output := r.Value.(string)
func (s *Service) Run(ctx context.Context, command string, args ...string) core.Result {
return s.RunWithOptions(ctx, RunOptions{
Command: command,
Args: args,
@ -376,11 +362,15 @@ func (s *Service) Run(ctx context.Context, command string, args ...string) (stri
}
// RunWithOptions executes a command with options and waits for completion.
func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) (string, error) {
return runResultToString(s.runCommand(ctx, opts), "process.run")
// Value is always the output string. OK is true if exit code is 0.
//
// r := svc.RunWithOptions(ctx, process.RunOptions{Command: "go", Args: []string{"test"}})
// output := r.Value.(string)
func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) core.Result {
return s.runCommand(ctx, opts)
}
// --- Internal request helpers. ---
// --- Internal Request Helpers ---
func (s *Service) handleRun(ctx context.Context, opts core.Options) core.Result {
command := opts.String("command")
@ -399,11 +389,7 @@ func (s *Service) handleRun(ctx context.Context, opts core.Options) core.Result
runOpts.Env = optionStrings(r.Value)
}
result, err := s.runCommand(ctx, runOpts)
if err != nil {
return core.Result{Value: err, OK: false}
}
return core.Result{Value: result, OK: true}
return s.runCommand(ctx, runOpts)
}
func (s *Service) handleStart(ctx context.Context, opts core.Options) core.Result {
@ -412,30 +398,35 @@ func (s *Service) handleStart(ctx context.Context, opts core.Options) core.Resul
return core.Result{Value: core.E("process.start", "command is required", nil), OK: false}
}
startOpts := RunOptions{
Command: command,
Dir: opts.String("dir"),
Detach: opts.Bool("detach"),
}
if r := opts.Get("args"); r.OK {
startOpts.Args = optionStrings(r.Value)
}
if r := opts.Get("env"); r.OK {
startOpts.Env = optionStrings(r.Value)
detach := true
if opts.Has("detach") {
detach = opts.Bool("detach")
}
proc, err := s.StartWithOptions(ctx, startOpts)
if err != nil {
return core.Result{Value: err, OK: false}
runOpts := RunOptions{
Command: command,
Dir: opts.String("dir"),
Detach: detach,
}
return core.Result{Value: proc.ID, OK: true}
if r := opts.Get("args"); r.OK {
runOpts.Args = optionStrings(r.Value)
}
if r := opts.Get("env"); r.OK {
runOpts.Env = optionStrings(r.Value)
}
r := s.StartWithOptions(ctx, runOpts)
if !r.OK {
return r
}
return core.Result{Value: r.Value.(*ManagedProcess).ID, OK: true}
}
func (s *Service) handleKill(ctx context.Context, opts core.Options) core.Result {
id := opts.String("id")
if id != "" {
if err := s.Kill(id); err != nil {
if errors.Is(err, ErrProcessNotFound) {
if core.Is(err, ErrProcessNotFound) {
return core.Result{Value: core.E("process.kill", core.Concat("not found: ", id), nil), OK: false}
}
return core.Result{Value: err, OK: false}
@ -462,21 +453,9 @@ func (s *Service) handleList(ctx context.Context, opts core.Options) core.Result
return core.Result{Value: s.managed.Names(), OK: true}
}
func (s *Service) handleGet(ctx context.Context, opts core.Options) core.Result {
id := opts.String("id")
if id == "" {
return core.Result{Value: core.E("process.get", "id is required", nil), OK: false}
}
proc, err := s.Get(id)
if err != nil {
return core.Result{Value: core.E("process.get", core.Concat("not found: ", id), err), OK: false}
}
return core.Result{Value: proc.Info(), OK: true}
}
func (s *Service) runCommand(ctx context.Context, opts RunOptions) (string, error) {
func (s *Service) runCommand(ctx context.Context, opts RunOptions) core.Result {
if opts.Command == "" {
return "", core.E("process.run", "command is required", nil)
return core.Result{Value: core.E("process.run", "command is required", nil), OK: false}
}
if ctx == nil {
ctx = context.Background()
@ -492,109 +471,28 @@ func (s *Service) runCommand(ctx context.Context, opts RunOptions) (string, erro
output, err := cmd.CombinedOutput()
if err != nil {
return "", core.E("process.run", core.Concat("command failed: ", opts.Command), err)
return core.Result{Value: core.E("process.run", core.Concat("command failed: ", opts.Command), err), OK: false}
}
return string(output), nil
return core.Result{Value: string(output), OK: true}
}
func classifyProcessExit(proc *ManagedProcess, err error) (Status, int, error, string) {
if err == nil {
return StatusExited, 0, nil, ""
// Signal sends a signal to the process.
func (p *ManagedProcess) Signal(sig os.Signal) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.Status != StatusRunning {
return ErrProcessNotRunning
}
if sig, ok := processExitSignal(err); ok {
return StatusKilled, -1, err, normalizeSignalName(sig)
}
if ctxErr := proc.ctx.Err(); ctxErr != nil {
signal := proc.requestedSignal()
if signal == "" {
signal = "SIGKILL"
}
return StatusKilled, -1, ctxErr, signal
}
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
return StatusExited, exitErr.ExitCode(), err, ""
}
return StatusFailed, -1, err, ""
}
func processExitSignal(err error) (syscall.Signal, bool) {
var exitErr *exec.ExitError
if !errors.As(err, &exitErr) || exitErr.ProcessState == nil {
return 0, false
}
waitStatus, ok := exitErr.ProcessState.Sys().(syscall.WaitStatus)
if !ok || !waitStatus.Signaled() {
return 0, false
}
return waitStatus.Signal(), true
}
func startResultToProcess(r core.Result, operation string) (*ManagedProcess, error) {
if r.OK {
proc, ok := r.Value.(*ManagedProcess)
if !ok {
return nil, core.E(operation, "invalid process result type", nil)
}
return proc, nil
}
if err, ok := r.Value.(error); ok {
return nil, err
}
return nil, core.E(operation, "process start failed", nil)
}
func runResultToString(r core.Result, operation string) (string, error) {
if r.OK {
output, ok := r.Value.(string)
if !ok {
return "", core.E(operation, "invalid run result type", nil)
}
return output, nil
}
if err, ok := r.Value.(error); ok {
return "", err
}
return "", core.E(operation, "process run failed", nil)
}
func normalizeSignalName(sig syscall.Signal) string {
switch sig {
case syscall.SIGINT:
return "SIGINT"
case syscall.SIGKILL:
return "SIGKILL"
case syscall.SIGTERM:
return "SIGTERM"
default:
return sig.String()
}
}
func optionStrings(value any) []string {
switch typed := value.(type) {
case nil:
return nil
case []string:
return append([]string(nil), typed...)
case []any:
result := make([]string, 0, len(typed))
for _, item := range typed {
text, ok := item.(string)
if !ok {
return nil
}
result = append(result, text)
}
return result
default:
if p.cmd == nil || p.cmd.Process == nil {
return nil
}
if signal, ok := sig.(syscall.Signal); ok {
p.lastSignal = normalizeSignalName(signal)
}
return p.cmd.Process.Signal(sig)
}
func execCommandContext(ctx context.Context, name string, args ...string) *exec.Cmd {
@ -624,3 +522,87 @@ func tempDir() string {
func isNotExist(err error) bool {
return os.IsNotExist(err)
}
func (s *Service) handleGet(ctx context.Context, opts core.Options) core.Result {
id := opts.String("id")
if id == "" {
return core.Result{Value: core.E("process.get", "id is required", nil), OK: false}
}
proc, err := s.Get(id)
if err != nil {
return core.Result{Value: core.E("process.get", core.Concat("not found: ", id), err), OK: false}
}
return core.Result{Value: proc.Info(), OK: true}
}
func optionStrings(value any) []string {
switch typed := value.(type) {
case nil:
return nil
case []string:
return append([]string(nil), typed...)
case []any:
result := make([]string, 0, len(typed))
for _, item := range typed {
text, ok := item.(string)
if !ok {
return nil
}
result = append(result, text)
}
return result
default:
return nil
}
}
func classifyProcessExit(proc *ManagedProcess, err error) (Status, int, error, string) {
if err == nil {
return StatusExited, 0, nil, ""
}
if sig, ok := processExitSignal(err); ok {
return StatusKilled, -1, err, normalizeSignalName(sig)
}
if ctxErr := proc.ctx.Err(); ctxErr != nil {
signal := proc.requestedSignal()
if signal == "" {
signal = "SIGKILL"
}
return StatusKilled, -1, ctxErr, signal
}
var exitErr *exec.ExitError
if core.As(err, &exitErr) {
return StatusExited, exitErr.ExitCode(), err, ""
}
return StatusFailed, -1, err, ""
}
func processExitSignal(err error) (syscall.Signal, bool) {
var exitErr *exec.ExitError
if !core.As(err, &exitErr) || exitErr.ProcessState == nil {
return 0, false
}
waitStatus, ok := exitErr.ProcessState.Sys().(syscall.WaitStatus)
if !ok || !waitStatus.Signaled() {
return 0, false
}
return waitStatus.Signal(), true
}
func normalizeSignalName(sig syscall.Signal) string {
switch sig {
case syscall.SIGINT:
return "SIGINT"
case syscall.SIGKILL:
return "SIGKILL"
case syscall.SIGTERM:
return "SIGTERM"
default:
return sig.String()
}
}

View file

@ -2,7 +2,6 @@ package process
import (
"context"
"strings"
"sync"
"testing"
"time"
@ -16,27 +15,267 @@ func newTestService(t *testing.T) (*Service, *framework.Core) {
t.Helper()
c := framework.New()
factory := NewService(Options{BufferSize: 1024})
raw, err := factory(c)
require.NoError(t, err)
r := Register(c)
require.True(t, r.OK)
return r.Value.(*Service), c
}
svc := raw.(*Service)
func newStartedTestService(t *testing.T) (*Service, *framework.Core) {
t.Helper()
svc, c := newTestService(t)
r := svc.OnStartup(context.Background())
require.True(t, r.OK)
return svc, c
}
func TestService_Start(t *testing.T) {
func TestService_Register_Good(t *testing.T) {
c := framework.New(framework.WithService(Register))
svc, ok := framework.ServiceFor[*Service](c, "process")
require.True(t, ok)
assert.NotNil(t, svc)
}
func TestService_OnStartup_Good(t *testing.T) {
svc, c := newTestService(t)
r := svc.OnStartup(context.Background())
require.True(t, r.OK)
assert.True(t, c.Action("process.run").Exists())
assert.True(t, c.Action("process.start").Exists())
assert.True(t, c.Action("process.kill").Exists())
assert.True(t, c.Action("process.list").Exists())
assert.True(t, c.Action("process.get").Exists())
}
func TestService_HandleRun_Good(t *testing.T) {
_, c := newStartedTestService(t)
r := c.Action("process.run").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "echo"},
framework.Option{Key: "args", Value: []string{"hello"}},
))
require.True(t, r.OK)
assert.Contains(t, r.Value.(string), "hello")
}
func TestService_HandleRun_Bad(t *testing.T) {
_, c := newStartedTestService(t)
r := c.Action("process.run").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "nonexistent_command_xyz"},
))
assert.False(t, r.OK)
}
func TestService_HandleRun_Ugly(t *testing.T) {
_, c := newStartedTestService(t)
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
r := c.Action("process.run").Run(ctx, framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"1"}},
))
assert.False(t, r.OK)
}
func TestService_HandleStart_Good(t *testing.T) {
svc, c := newStartedTestService(t)
r := c.Action("process.start").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"60"}},
))
require.True(t, r.OK)
id := r.Value.(string)
proc, err := svc.Get(id)
require.NoError(t, err)
assert.True(t, proc.IsRunning())
kill := c.Action("process.kill").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "id", Value: id},
))
require.True(t, kill.OK)
<-proc.Done()
t.Run("respects detach=false", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
start := c.Action("process.start").Run(ctx, framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"60"}},
framework.Option{Key: "detach", Value: false},
))
require.True(t, start.OK)
id := start.Value.(string)
proc, err := svc.Get(id)
require.NoError(t, err)
cancel()
select {
case <-proc.Done():
case <-time.After(2 * time.Second):
t.Fatal("process should honor detached=false context cancellation")
}
})
}
func TestService_HandleStart_Bad(t *testing.T) {
_, c := newStartedTestService(t)
r := c.Action("process.start").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "nonexistent_command_xyz"},
))
assert.False(t, r.OK)
}
func TestService_HandleKill_Good(t *testing.T) {
svc, c := newStartedTestService(t)
start := c.Action("process.start").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"60"}},
))
require.True(t, start.OK)
id := start.Value.(string)
proc, err := svc.Get(id)
require.NoError(t, err)
kill := c.Action("process.kill").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "id", Value: id},
))
require.True(t, kill.OK)
select {
case <-proc.Done():
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
}
func TestService_HandleKill_Bad(t *testing.T) {
_, c := newStartedTestService(t)
r := c.Action("process.kill").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "id", Value: "missing"},
))
assert.False(t, r.OK)
}
func TestService_HandleList_Good(t *testing.T) {
svc, c := newStartedTestService(t)
startOne := c.Action("process.start").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"60"}},
))
require.True(t, startOne.OK)
startTwo := c.Action("process.start").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"60"}},
))
require.True(t, startTwo.OK)
r := c.Action("process.list").Run(context.Background(), framework.NewOptions())
require.True(t, r.OK)
ids := r.Value.([]string)
assert.Len(t, ids, 2)
for _, id := range ids {
proc, err := svc.Get(id)
require.NoError(t, err)
_ = proc.Kill()
<-proc.Done()
}
}
func TestService_HandleGet_Good(t *testing.T) {
svc, c := newStartedTestService(t)
start := c.Action("process.start").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"60"}},
))
require.True(t, start.OK)
id := start.Value.(string)
r := c.Action("process.get").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "id", Value: id},
))
require.True(t, r.OK)
info := r.Value.(ProcessInfo)
assert.Equal(t, id, info.ID)
assert.Equal(t, "sleep", info.Command)
assert.True(t, info.Running)
assert.Equal(t, StatusRunning, info.Status)
assert.Positive(t, info.PID)
proc, err := svc.Get(id)
require.NoError(t, err)
_ = proc.Kill()
<-proc.Done()
}
func TestService_HandleGet_Bad(t *testing.T) {
_, c := newStartedTestService(t)
missingID := c.Action("process.get").Run(context.Background(), framework.NewOptions())
assert.False(t, missingID.OK)
missingProc := c.Action("process.get").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "id", Value: "missing"},
))
assert.False(t, missingProc.OK)
}
func TestService_Ugly_PermissionModel(t *testing.T) {
c := framework.New()
r := c.Process().Run(context.Background(), "echo", "blocked")
assert.False(t, r.OK)
c = framework.New(framework.WithService(Register))
startup := c.ServiceStartup(context.Background(), nil)
require.True(t, startup.OK)
defer func() {
shutdown := c.ServiceShutdown(context.Background())
assert.True(t, shutdown.OK)
}()
r = c.Process().Run(context.Background(), "echo", "allowed")
require.True(t, r.OK)
assert.Contains(t, r.Value.(string), "allowed")
}
func startProc(t *testing.T, svc *Service, ctx context.Context, command string, args ...string) *Process {
t.Helper()
r := svc.Start(ctx, command, args...)
require.True(t, r.OK)
return r.Value.(*Process)
}
func TestService_Start_Good(t *testing.T) {
t.Run("echo command", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "hello")
require.NoError(t, err)
require.NotNil(t, proc)
proc := startProc(t, svc, context.Background(), "echo", "hello")
assert.NotEmpty(t, proc.ID)
assert.Positive(t, proc.PID)
assert.Equal(t, "echo", proc.Command)
assert.Equal(t, []string{"hello"}, proc.Args)
// Wait for completion
<-proc.Done()
assert.Equal(t, StatusExited, proc.Status)
@ -47,8 +286,7 @@ func TestService_Start(t *testing.T) {
t.Run("failing command", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "sh", "-c", "exit 42")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "sh", "-c", "exit 42")
<-proc.Done()
@ -59,23 +297,23 @@ func TestService_Start(t *testing.T) {
t.Run("non-existent command", func(t *testing.T) {
svc, _ := newTestService(t)
_, err := svc.Start(context.Background(), "nonexistent_command_xyz")
assert.Error(t, err)
r := svc.Start(context.Background(), "nonexistent_command_xyz")
assert.False(t, r.OK)
})
t.Run("with working directory", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "pwd",
Dir: "/tmp",
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
<-proc.Done()
// On macOS /tmp is a symlink to /private/tmp
output := strings.TrimSpace(proc.Output())
output := framework.Trim(proc.Output())
assert.True(t, output == "/tmp" || output == "/private/tmp", "got: %s", output)
})
@ -83,15 +321,12 @@ func TestService_Start(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
proc, err := svc.Start(ctx, "sleep", "10")
require.NoError(t, err)
proc := startProc(t, svc, ctx, "sleep", "10")
// Cancel immediately
cancel()
select {
case <-proc.Done():
// Good - process was killed
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
@ -100,12 +335,13 @@ func TestService_Start(t *testing.T) {
t.Run("disable capture", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "echo",
Args: []string{"no-capture"},
DisableCapture: true,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
<-proc.Done()
assert.Equal(t, StatusExited, proc.Status)
@ -115,12 +351,13 @@ func TestService_Start(t *testing.T) {
t.Run("with environment variables", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.StartWithOptions(context.Background(), RunOptions{
r := svc.StartWithOptions(context.Background(), RunOptions{
Command: "sh",
Args: []string{"-c", "echo $MY_TEST_VAR"},
Env: []string{"MY_TEST_VAR=hello_env"},
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
<-proc.Done()
assert.Contains(t, proc.Output(), "hello_env")
@ -131,17 +368,16 @@ func TestService_Start(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
proc, err := svc.StartWithOptions(ctx, RunOptions{
r := svc.StartWithOptions(ctx, RunOptions{
Command: "echo",
Args: []string{"detached"},
Detach: true,
})
require.NoError(t, err)
require.True(t, r.OK)
proc := r.Value.(*Process)
// Cancel the parent context
cancel()
// Detached process should still complete normally
select {
case <-proc.Done():
assert.Equal(t, StatusExited, proc.Status)
@ -152,33 +388,26 @@ func TestService_Start(t *testing.T) {
})
}
func TestService_Run(t *testing.T) {
func TestService_Run_Good(t *testing.T) {
t.Run("returns output", func(t *testing.T) {
svc, _ := newTestService(t)
output, err := svc.Run(context.Background(), "echo", "hello world")
require.NoError(t, err)
assert.Contains(t, output, "hello world")
r := svc.Run(context.Background(), "echo", "hello world")
assert.True(t, r.OK)
assert.Contains(t, r.Value.(string), "hello world")
})
t.Run("returns error on failure", func(t *testing.T) {
t.Run("returns !OK on failure", func(t *testing.T) {
svc, _ := newTestService(t)
_, err := svc.Run(context.Background(), "sh", "-c", "exit 1")
assert.Error(t, err)
assert.Contains(t, err.Error(), "exited with code 1")
r := svc.Run(context.Background(), "sh", "-c", "exit 1")
assert.False(t, r.OK)
})
}
func TestService_Actions(t *testing.T) {
func TestService_Actions_Good(t *testing.T) {
t.Run("broadcasts events", func(t *testing.T) {
c := framework.New()
// Register process service on Core
factory := NewService(Options{})
raw, err := factory(c)
require.NoError(t, err)
svc := raw.(*Service)
svc, c := newTestService(t)
var started []ActionProcessStarted
var outputs []ActionProcessOutput
@ -198,12 +427,10 @@ func TestService_Actions(t *testing.T) {
}
return framework.Result{OK: true}
})
proc, err := svc.Start(context.Background(), "echo", "test")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "test")
<-proc.Done()
// Give time for events to propagate
time.Sleep(10 * time.Millisecond)
mu.Lock()
@ -216,7 +443,7 @@ func TestService_Actions(t *testing.T) {
assert.NotEmpty(t, outputs)
foundTest := false
for _, o := range outputs {
if strings.Contains(o.Line, "test") {
if framework.Contains(o.Line, "test") {
foundTest = true
break
}
@ -226,14 +453,44 @@ func TestService_Actions(t *testing.T) {
assert.Len(t, exited, 1)
assert.Equal(t, 0, exited[0].ExitCode)
})
t.Run("broadcasts killed event", func(t *testing.T) {
svc, c := newTestService(t)
var killed []ActionProcessKilled
var mu sync.Mutex
c.RegisterAction(func(cc *framework.Core, msg framework.Message) framework.Result {
mu.Lock()
defer mu.Unlock()
if m, ok := msg.(ActionProcessKilled); ok {
killed = append(killed, m)
}
return framework.Result{OK: true}
})
proc := startProc(t, svc, context.Background(), "sleep", "60")
err := svc.Kill(proc.ID)
require.NoError(t, err)
<-proc.Done()
time.Sleep(10 * time.Millisecond)
mu.Lock()
defer mu.Unlock()
require.Len(t, killed, 1)
assert.Equal(t, proc.ID, killed[0].ID)
assert.Equal(t, "SIGKILL", killed[0].Signal)
})
}
func TestService_List(t *testing.T) {
func TestService_List_Good(t *testing.T) {
t.Run("tracks processes", func(t *testing.T) {
svc, _ := newTestService(t)
proc1, _ := svc.Start(context.Background(), "echo", "1")
proc2, _ := svc.Start(context.Background(), "echo", "2")
proc1 := startProc(t, svc, context.Background(), "echo", "1")
proc2 := startProc(t, svc, context.Background(), "echo", "2")
<-proc1.Done()
<-proc2.Done()
@ -245,7 +502,7 @@ func TestService_List(t *testing.T) {
t.Run("get by id", func(t *testing.T) {
svc, _ := newTestService(t)
proc, _ := svc.Start(context.Background(), "echo", "test")
proc := startProc(t, svc, context.Background(), "echo", "test")
<-proc.Done()
got, err := svc.Get(proc.ID)
@ -261,11 +518,11 @@ func TestService_List(t *testing.T) {
})
}
func TestService_Remove(t *testing.T) {
func TestService_Remove_Good(t *testing.T) {
t.Run("removes completed process", func(t *testing.T) {
svc, _ := newTestService(t)
proc, _ := svc.Start(context.Background(), "echo", "test")
proc := startProc(t, svc, context.Background(), "echo", "test")
<-proc.Done()
err := svc.Remove(proc.ID)
@ -281,7 +538,7 @@ func TestService_Remove(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, _ := svc.Start(ctx, "sleep", "10")
proc := startProc(t, svc, ctx, "sleep", "10")
err := svc.Remove(proc.ID)
assert.Error(t, err)
@ -291,12 +548,12 @@ func TestService_Remove(t *testing.T) {
})
}
func TestService_Clear(t *testing.T) {
func TestService_Clear_Good(t *testing.T) {
t.Run("clears completed processes", func(t *testing.T) {
svc, _ := newTestService(t)
proc1, _ := svc.Start(context.Background(), "echo", "1")
proc2, _ := svc.Start(context.Background(), "echo", "2")
proc1 := startProc(t, svc, context.Background(), "echo", "1")
proc2 := startProc(t, svc, context.Background(), "echo", "2")
<-proc1.Done()
<-proc2.Done()
@ -309,22 +566,20 @@ func TestService_Clear(t *testing.T) {
})
}
func TestService_Kill(t *testing.T) {
func TestService_Kill_Good(t *testing.T) {
t.Run("kills running process", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
proc := startProc(t, svc, ctx, "sleep", "60")
err = svc.Kill(proc.ID)
err := svc.Kill(proc.ID)
assert.NoError(t, err)
select {
case <-proc.Done():
// Process killed successfully
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
@ -338,12 +593,11 @@ func TestService_Kill(t *testing.T) {
})
}
func TestService_Output(t *testing.T) {
func TestService_Output_Good(t *testing.T) {
t.Run("returns captured output", func(t *testing.T) {
svc, _ := newTestService(t)
proc, err := svc.Start(context.Background(), "echo", "captured")
require.NoError(t, err)
proc := startProc(t, svc, context.Background(), "echo", "captured")
<-proc.Done()
output, err := svc.Output(proc.ID)
@ -359,17 +613,15 @@ func TestService_Output(t *testing.T) {
})
}
func TestService_OnShutdown(t *testing.T) {
func TestService_OnShutdown_Good(t *testing.T) {
t.Run("kills all running processes", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc1, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
proc2, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
proc1 := startProc(t, svc, ctx, "sleep", "60")
proc2 := startProc(t, svc, ctx, "sleep", "60")
assert.True(t, proc1.IsRunning())
assert.True(t, proc2.IsRunning())
@ -390,50 +642,38 @@ func TestService_OnShutdown(t *testing.T) {
})
}
func TestService_OnStartup(t *testing.T) {
t.Run("returns nil", func(t *testing.T) {
svc, _ := newTestService(t)
r := svc.OnStartup(context.Background())
assert.True(t, r.OK)
})
}
func TestService_RunWithOptions(t *testing.T) {
func TestService_RunWithOptions_Good(t *testing.T) {
t.Run("returns output on success", func(t *testing.T) {
svc, _ := newTestService(t)
output, err := svc.RunWithOptions(context.Background(), RunOptions{
r := svc.RunWithOptions(context.Background(), RunOptions{
Command: "echo",
Args: []string{"opts-test"},
})
require.NoError(t, err)
assert.Contains(t, output, "opts-test")
assert.True(t, r.OK)
assert.Contains(t, r.Value.(string), "opts-test")
})
t.Run("returns error on failure", func(t *testing.T) {
t.Run("returns !OK on failure", func(t *testing.T) {
svc, _ := newTestService(t)
_, err := svc.RunWithOptions(context.Background(), RunOptions{
r := svc.RunWithOptions(context.Background(), RunOptions{
Command: "sh",
Args: []string{"-c", "exit 2"},
})
assert.Error(t, err)
assert.Contains(t, err.Error(), "exited with code 2")
assert.False(t, r.OK)
})
}
func TestService_Running(t *testing.T) {
func TestService_Running_Good(t *testing.T) {
t.Run("returns only running processes", func(t *testing.T) {
svc, _ := newTestService(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proc1, err := svc.Start(ctx, "sleep", "60")
require.NoError(t, err)
proc2, err := svc.Start(context.Background(), "echo", "done")
require.NoError(t, err)
proc1 := startProc(t, svc, ctx, "sleep", "60")
proc2 := startProc(t, svc, context.Background(), "echo", "done")
<-proc2.Done()
running := svc.Running()

View file

@ -5,30 +5,24 @@
//
// # Getting Started
//
// // Register with Core
// core, _ := framework.New(
// framework.WithName("process", process.NewService(process.Options{})),
// )
// c := core.New(core.WithService(process.Register))
// _ = c.ServiceStartup(ctx, nil)
//
// // Get service and run a process
// svc, err := framework.ServiceFor[*process.Service](core, "process")
// if err != nil {
// return err
// }
// proc, err := svc.Start(ctx, "go", "test", "./...")
// r := c.Process().Run(ctx, "go", "test", "./...")
// output := r.Value.(string)
//
// # Listening for Events
//
// Process events are broadcast via Core.ACTION:
//
// core.RegisterAction(func(c *framework.Core, msg framework.Message) error {
// c.RegisterAction(func(c *core.Core, msg core.Message) core.Result {
// switch m := msg.(type) {
// case process.ActionProcessOutput:
// fmt.Print(m.Line)
// case process.ActionProcessExited:
// fmt.Printf("Exit code: %d\n", m.ExitCode)
// }
// return nil
// return core.Result{OK: true}
// })
package process
@ -91,15 +85,19 @@ type RunOptions struct {
KillGroup bool
}
// Info provides a snapshot of process state without internal fields.
type Info struct {
// ProcessInfo provides a snapshot of process state without internal fields.
type ProcessInfo struct {
ID string `json:"id"`
Command string `json:"command"`
Args []string `json:"args"`
Dir string `json:"dir"`
StartedAt time.Time `json:"startedAt"`
Running bool `json:"running"`
Status Status `json:"status"`
ExitCode int `json:"exitCode"`
Duration time.Duration `json:"duration"`
PID int `json:"pid"`
}
// Info is kept as a compatibility alias for ProcessInfo.
type Info = ProcessInfo