fix(ax): complete v0.8.0 process compliance

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-27 05:16:27 +00:00
parent 1425023862
commit 2ccd84b87a
15 changed files with 388 additions and 614 deletions

View file

@ -5,7 +5,7 @@ import (
"sync"
"time"
coreerr "dappco.re/go/core/log"
"dappco.re/go/core"
)
// DaemonOptions configures daemon mode execution.
@ -77,7 +77,7 @@ func (d *Daemon) Start() error {
defer d.mu.Unlock()
if d.running {
return coreerr.E("Daemon.Start", "daemon already running", nil)
return core.E("daemon.start", "daemon already running", nil)
}
if d.pid != nil {
@ -105,7 +105,16 @@ func (d *Daemon) Start() error {
entry.Health = d.health.Addr()
}
if err := d.opts.Registry.Register(entry); err != nil {
return coreerr.E("Daemon.Start", "registry", err)
if d.health != nil {
shutdownCtx, cancel := context.WithTimeout(context.Background(), d.opts.ShutdownTimeout)
_ = d.health.Stop(shutdownCtx)
cancel()
}
if d.pid != nil {
_ = d.pid.Release()
}
d.running = false
return core.E("daemon.start", "registry", err)
}
}
@ -117,7 +126,7 @@ func (d *Daemon) Run(ctx context.Context) error {
d.mu.Lock()
if !d.running {
d.mu.Unlock()
return coreerr.E("Daemon.Run", "daemon not started - call Start() first", nil)
return core.E("daemon.run", "daemon not started - call Start() first", nil)
}
d.mu.Unlock()
@ -143,13 +152,13 @@ func (d *Daemon) Stop() error {
if d.health != nil {
d.health.SetReady(false)
if err := d.health.Stop(shutdownCtx); err != nil {
errs = append(errs, coreerr.E("Daemon.Stop", "health server", err))
errs = append(errs, core.E("daemon.stop", "health server", err))
}
}
if d.pid != nil {
if err := d.pid.Release(); err != nil && !isNotExist(err) {
errs = append(errs, coreerr.E("Daemon.Stop", "pid file", err))
errs = append(errs, core.E("daemon.stop", "pid file", err))
}
}
@ -161,7 +170,7 @@ func (d *Daemon) Stop() error {
d.running = false
if len(errs) > 0 {
return coreerr.Join(errs...)
return core.ErrorJoin(errs...)
}
return nil
}

View file

@ -1,261 +0,0 @@
package process
import (
"context"
"sync"
"testing"
framework "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGlobal_NotInitialized_Bad(t *testing.T) {
old := defaultService.Swap(nil)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
assert.Nil(t, Default())
r := Start(context.Background(), "echo", "test")
assert.False(t, r.OK)
r = Run(context.Background(), "echo", "test")
assert.False(t, r.OK)
_, err := Get("proc-1")
assert.ErrorIs(t, err, ErrServiceNotInitialized)
assert.Nil(t, List())
assert.Nil(t, Running())
err = Kill("proc-1")
assert.ErrorIs(t, err, ErrServiceNotInitialized)
r = StartWithOptions(context.Background(), RunOptions{Command: "echo"})
assert.False(t, r.OK)
r = RunWithOptions(context.Background(), RunOptions{Command: "echo"})
assert.False(t, r.OK)
}
func newGlobalTestService(t *testing.T) *Service {
t.Helper()
c := framework.New()
factory := NewService(Options{})
raw, err := factory(c)
require.NoError(t, err)
return raw.(*Service)
}
func TestGlobal_SetDefault_Good(t *testing.T) {
t.Run("sets and retrieves service", func(t *testing.T) {
old := defaultService.Swap(nil)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
svc := newGlobalTestService(t)
err := SetDefault(svc)
require.NoError(t, err)
assert.Equal(t, svc, Default())
})
t.Run("errors on nil", func(t *testing.T) {
err := SetDefault(nil)
assert.Error(t, err)
})
}
func TestGlobal_DefaultConcurrent_Good(t *testing.T) {
old := defaultService.Swap(nil)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
svc := newGlobalTestService(t)
err := SetDefault(svc)
require.NoError(t, err)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s := Default()
assert.NotNil(t, s)
assert.Equal(t, svc, s)
}()
}
wg.Wait()
}
func TestGlobal_SetDefaultConcurrent_Good(t *testing.T) {
old := defaultService.Swap(nil)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
var services []*Service
for i := 0; i < 10; i++ {
svc := newGlobalTestService(t)
services = append(services, svc)
}
var wg sync.WaitGroup
for _, svc := range services {
wg.Add(1)
go func(s *Service) {
defer wg.Done()
_ = SetDefault(s)
}(svc)
}
wg.Wait()
final := Default()
assert.NotNil(t, final)
found := false
for _, svc := range services {
if svc == final {
found = true
break
}
}
assert.True(t, found, "Default should be one of the set services")
}
func TestGlobal_ConcurrentOps_Good(t *testing.T) {
old := defaultService.Swap(nil)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
svc := newGlobalTestService(t)
err := SetDefault(svc)
require.NoError(t, err)
var wg sync.WaitGroup
var processes []*Process
var procMu sync.Mutex
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
r := Start(context.Background(), "echo", "concurrent")
if r.OK {
procMu.Lock()
processes = append(processes, r.Value.(*Process))
procMu.Unlock()
}
}()
}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = List()
_ = Running()
}()
}
wg.Wait()
procMu.Lock()
for _, p := range processes {
<-p.Done()
}
procMu.Unlock()
assert.Len(t, processes, 20)
var wg2 sync.WaitGroup
for _, p := range processes {
wg2.Add(1)
go func(id string) {
defer wg2.Done()
got, err := Get(id)
assert.NoError(t, err)
assert.NotNil(t, got)
}(p.ID)
}
wg2.Wait()
}
func TestGlobal_StartWithOptions_Good(t *testing.T) {
svc, _ := newTestService(t)
old := defaultService.Swap(svc)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
r := StartWithOptions(context.Background(), RunOptions{
Command: "echo",
Args: []string{"with", "options"},
})
require.True(t, r.OK)
proc := r.Value.(*Process)
<-proc.Done()
assert.Equal(t, 0, proc.ExitCode)
assert.Contains(t, proc.Output(), "with options")
}
func TestGlobal_RunWithOptions_Good(t *testing.T) {
svc, _ := newTestService(t)
old := defaultService.Swap(svc)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
r := RunWithOptions(context.Background(), RunOptions{
Command: "echo",
Args: []string{"run", "options"},
})
assert.True(t, r.OK)
assert.Contains(t, r.Value.(string), "run options")
}
func TestGlobal_Running_Good(t *testing.T) {
svc, _ := newTestService(t)
old := defaultService.Swap(svc)
defer func() {
if old != nil {
defaultService.Store(old)
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r := Start(ctx, "sleep", "60")
require.True(t, r.OK)
proc := r.Value.(*Process)
running := Running()
assert.Len(t, running, 1)
assert.Equal(t, proc.ID, running[0].ID)
cancel()
<-proc.Done()
running = Running()
assert.Len(t, running, 0)
}

4
go.mod
View file

@ -3,9 +3,8 @@ module dappco.re/go/core/process
go 1.26.0
require (
dappco.re/go/core v0.5.0
dappco.re/go/core v0.8.0-alpha.1
dappco.re/go/core/io v0.2.0
dappco.re/go/core/log v0.1.0
dappco.re/go/core/ws v0.3.0
forge.lthn.ai/core/api v0.1.5
github.com/gin-gonic/gin v1.12.0
@ -13,6 +12,7 @@ require (
)
require (
dappco.re/go/core/log v0.1.0 // indirect
forge.lthn.ai/core/go-io v0.1.5 // indirect
forge.lthn.ai/core/go-log v0.0.4 // indirect
github.com/99designs/gqlgen v0.17.88 // indirect

4
go.sum
View file

@ -1,5 +1,5 @@
dappco.re/go/core v0.5.0 h1:P5DJoaCiK5Q+af5UiTdWqUIW4W4qYKzpgGK50thm21U=
dappco.re/go/core v0.5.0/go.mod h1:f2/tBZ3+3IqDrg2F5F598llv0nmb/4gJVCFzM5geE4A=
dappco.re/go/core v0.8.0-alpha.1 h1:gj7+Scv+L63Z7wMxbJYHhaRFkHJo2u4MMPuUSv/Dhtk=
dappco.re/go/core v0.8.0-alpha.1/go.mod h1:f2/tBZ3+3IqDrg2F5F598llv0nmb/4gJVCFzM5geE4A=
dappco.re/go/core/io v0.2.0 h1:zuudgIiTsQQ5ipVt97saWdGLROovbEB/zdVyy9/l+I4=
dappco.re/go/core/io v0.2.0/go.mod h1:1QnQV6X9LNgFKfm8SkOtR9LLaj3bDcsOIeJOOyjbL5E=
dappco.re/go/core/log v0.1.0 h1:pa71Vq2TD2aoEUQWFKwNcaJ3GBY8HbaNGqtE688Unyc=

View file

@ -8,7 +8,6 @@ import (
"time"
"dappco.re/go/core"
coreerr "dappco.re/go/core/log"
)
// HealthCheck is a function that returns nil if healthy.
@ -90,7 +89,7 @@ func (h *HealthServer) Start() error {
listener, err := net.Listen("tcp", h.addr)
if err != nil {
return coreerr.E("HealthServer.Start", "failed to listen on "+h.addr, err)
return core.E("health.start", core.Concat("failed to listen on ", h.addr), err)
}
h.listener = listener

View file

@ -7,8 +7,8 @@ import (
"sync"
"syscall"
"dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
)
// PIDFile manages a process ID file for single-instance enforcement.
@ -33,7 +33,7 @@ func (p *PIDFile) Acquire() error {
if err == nil && pid > 0 {
if proc, err := processHandle(pid); err == nil {
if err := proc.Signal(syscall.Signal(0)); err == nil {
return coreerr.E("PIDFile.Acquire", "another instance is running (PID "+strconv.Itoa(pid)+")", nil)
return core.E("pidfile.acquire", core.Concat("another instance is running (PID ", strconv.Itoa(pid), ")"), nil)
}
}
}
@ -42,13 +42,13 @@ func (p *PIDFile) Acquire() error {
if dir := path.Dir(p.path); dir != "." {
if err := coreio.Local.EnsureDir(dir); err != nil {
return coreerr.E("PIDFile.Acquire", "failed to create PID directory", err)
return core.E("pidfile.acquire", "failed to create PID directory", err)
}
}
pid := currentPID()
if err := coreio.Local.Write(p.path, strconv.Itoa(pid)); err != nil {
return coreerr.E("PIDFile.Acquire", "failed to write PID file", err)
return core.E("pidfile.acquire", "failed to write PID file", err)
}
return nil
@ -59,7 +59,7 @@ func (p *PIDFile) Release() error {
p.mu.Lock()
defer p.mu.Unlock()
if err := coreio.Local.Delete(p.path); err != nil {
return coreerr.E("PIDFile.Release", "failed to remove PID file", err)
return core.E("pidfile.release", "failed to remove PID file", err)
}
return nil
}

View file

@ -7,7 +7,7 @@ import (
"syscall"
"time"
coreerr "dappco.re/go/core/log"
"dappco.re/go/core"
)
type processStdin interface {
@ -15,8 +15,8 @@ type processStdin interface {
Close() error
}
// Process represents a managed external process.
type Process struct {
// ManagedProcess represents a tracked external process started by the service.
type ManagedProcess struct {
ID string
Command string
Args []string
@ -38,8 +38,11 @@ type Process struct {
killGroup bool
}
// Process is kept as a compatibility alias for ManagedProcess.
type Process = ManagedProcess
// Info returns a snapshot of process state.
func (p *Process) Info() Info {
func (p *ManagedProcess) Info() ProcessInfo {
p.mu.RLock()
defer p.mu.RUnlock()
@ -48,12 +51,13 @@ func (p *Process) Info() Info {
pid = p.cmd.Process.Pid
}
return Info{
return ProcessInfo{
ID: p.ID,
Command: p.Command,
Args: p.Args,
Dir: p.Dir,
StartedAt: p.StartedAt,
Running: p.Status == StatusRunning,
Status: p.Status,
ExitCode: p.ExitCode,
Duration: p.Duration,
@ -62,7 +66,7 @@ func (p *Process) Info() Info {
}
// Output returns the captured output as a string.
func (p *Process) Output() string {
func (p *ManagedProcess) Output() string {
p.mu.RLock()
defer p.mu.RUnlock()
if p.output == nil {
@ -72,7 +76,7 @@ func (p *Process) Output() string {
}
// OutputBytes returns the captured output as bytes.
func (p *Process) OutputBytes() []byte {
func (p *ManagedProcess) OutputBytes() []byte {
p.mu.RLock()
defer p.mu.RUnlock()
if p.output == nil {
@ -82,37 +86,40 @@ func (p *Process) OutputBytes() []byte {
}
// IsRunning returns true if the process is still executing.
func (p *Process) IsRunning() bool {
p.mu.RLock()
defer p.mu.RUnlock()
return p.Status == StatusRunning
func (p *ManagedProcess) IsRunning() bool {
select {
case <-p.done:
return false
default:
return true
}
}
// Wait blocks until the process exits.
func (p *Process) Wait() error {
func (p *ManagedProcess) Wait() error {
<-p.done
p.mu.RLock()
defer p.mu.RUnlock()
if p.Status == StatusFailed {
return coreerr.E("Process.Wait", "process failed to start: "+p.ID, nil)
return core.E("process.wait", core.Concat("process failed to start: ", p.ID), nil)
}
if p.Status == StatusKilled {
return coreerr.E("Process.Wait", "process was killed: "+p.ID, nil)
return core.E("process.wait", core.Concat("process was killed: ", p.ID), nil)
}
if p.ExitCode != 0 {
return coreerr.E("Process.Wait", "process exited with code "+strconv.Itoa(p.ExitCode), nil)
return core.E("process.wait", core.Concat("process exited with code ", strconv.Itoa(p.ExitCode)), nil)
}
return nil
}
// Done returns a channel that closes when the process exits.
func (p *Process) Done() <-chan struct{} {
func (p *ManagedProcess) Done() <-chan struct{} {
return p.done
}
// Kill forcefully terminates the process.
// If KillGroup is set, kills the entire process group.
func (p *Process) Kill() error {
func (p *ManagedProcess) Kill() error {
p.mu.Lock()
defer p.mu.Unlock()
@ -134,7 +141,7 @@ func (p *Process) Kill() error {
// Shutdown gracefully stops the process: SIGTERM, then SIGKILL after grace period.
// If GracePeriod was not set (zero), falls back to immediate Kill().
// If KillGroup is set, signals are sent to the entire process group.
func (p *Process) Shutdown() error {
func (p *ManagedProcess) Shutdown() error {
p.mu.RLock()
grace := p.gracePeriod
p.mu.RUnlock()
@ -158,7 +165,7 @@ func (p *Process) Shutdown() error {
}
// terminate sends SIGTERM to the process (or process group if KillGroup is set).
func (p *Process) terminate() error {
func (p *ManagedProcess) terminate() error {
p.mu.Lock()
defer p.mu.Unlock()
@ -178,7 +185,7 @@ func (p *Process) terminate() error {
}
// SendInput writes to the process stdin.
func (p *Process) SendInput(input string) error {
func (p *ManagedProcess) SendInput(input string) error {
p.mu.RLock()
defer p.mu.RUnlock()
@ -195,7 +202,7 @@ func (p *Process) SendInput(input string) error {
}
// CloseStdin closes the process stdin pipe.
func (p *Process) CloseStdin() error {
func (p *ManagedProcess) CloseStdin() error {
p.mu.Lock()
defer p.mu.Unlock()

View file

@ -1,130 +0,0 @@
package process
import (
"context"
"sync"
"sync/atomic"
"dappco.re/go/core"
coreerr "dappco.re/go/core/log"
)
// Global default service (follows i18n pattern).
var (
defaultService atomic.Pointer[Service]
defaultOnce sync.Once
defaultErr error
)
// Default returns the global process service.
// Returns nil if not initialized.
func Default() *Service {
return defaultService.Load()
}
// SetDefault sets the global process service.
// Thread-safe: can be called concurrently with Default().
func SetDefault(s *Service) error {
if s == nil {
return ErrSetDefaultNil
}
defaultService.Store(s)
return nil
}
// Init initializes the default global service with a Core instance.
// This is typically called during application startup.
func Init(c *core.Core) error {
defaultOnce.Do(func() {
factory := NewService(Options{})
svc, err := factory(c)
if err != nil {
defaultErr = err
return
}
defaultService.Store(svc.(*Service))
})
return defaultErr
}
// --- Global convenience functions ---
// Start spawns a new process using the default service.
func Start(ctx context.Context, command string, args ...string) core.Result {
svc := Default()
if svc == nil {
return core.Result{OK: false}
}
return svc.Start(ctx, command, args...)
}
// Run executes a command and waits for completion using the default service.
func Run(ctx context.Context, command string, args ...string) core.Result {
svc := Default()
if svc == nil {
return core.Result{Value: "", OK: false}
}
return svc.Run(ctx, command, args...)
}
// Get returns a process by ID from the default service.
func Get(id string) (*Process, error) {
svc := Default()
if svc == nil {
return nil, ErrServiceNotInitialized
}
return svc.Get(id)
}
// List returns all processes from the default service.
func List() []*Process {
svc := Default()
if svc == nil {
return nil
}
return svc.List()
}
// Kill terminates a process by ID using the default service.
func Kill(id string) error {
svc := Default()
if svc == nil {
return ErrServiceNotInitialized
}
return svc.Kill(id)
}
// StartWithOptions spawns a process with full configuration using the default service.
func StartWithOptions(ctx context.Context, opts RunOptions) core.Result {
svc := Default()
if svc == nil {
return core.Result{OK: false}
}
return svc.StartWithOptions(ctx, opts)
}
// RunWithOptions executes a command with options and waits using the default service.
func RunWithOptions(ctx context.Context, opts RunOptions) core.Result {
svc := Default()
if svc == nil {
return core.Result{Value: "", OK: false}
}
return svc.RunWithOptions(ctx, opts)
}
// Running returns all currently running processes from the default service.
func Running() []*Process {
svc := Default()
if svc == nil {
return nil
}
return svc.Running()
}
// Errors
var (
// ErrServiceNotInitialized is returned when the service is not initialized.
ErrServiceNotInitialized = coreerr.E("", "process: service not initialized; call process.Init(core) first", nil)
// ErrSetDefaultNil is returned when SetDefault is called with nil.
ErrSetDefaultNil = coreerr.E("", "process: SetDefault called with nil service", nil)
)

View file

@ -5,12 +5,12 @@ import (
"context"
"strconv"
coreerr "dappco.re/go/core/log"
"dappco.re/go/core"
)
// ErrProgramNotFound is returned when Find cannot locate the binary on PATH.
// Callers may use core.Is to detect this condition.
var ErrProgramNotFound = coreerr.E("", "program: binary not found in PATH", nil)
var ErrProgramNotFound = core.E("", "program: binary not found in PATH", nil)
// Program represents a named executable located on the system PATH.
// Create one with a Name, call Find to resolve its path, then Run or RunDir.
@ -30,11 +30,11 @@ type Program struct {
// err := p.Find()
func (p *Program) Find() error {
if p.Name == "" {
return coreerr.E("Program.Find", "program name is empty", nil)
return core.E("program.find", "program name is empty", nil)
}
path, err := execLookPath(p.Name)
if err != nil {
return coreerr.E("Program.Find", strconv.Quote(p.Name)+": not found in PATH", ErrProgramNotFound)
return core.E("program.find", core.Concat(strconv.Quote(p.Name), ": not found in PATH"), ErrProgramNotFound)
}
p.Path = path
return nil
@ -68,7 +68,7 @@ func (p *Program) RunDir(ctx context.Context, dir string, args ...string) (strin
}
if err := cmd.Run(); err != nil {
return string(bytes.TrimSpace(out.Bytes())), coreerr.E("Program.RunDir", strconv.Quote(p.Name)+": command failed", err)
return string(bytes.TrimSpace(out.Bytes())), core.E("program.run", core.Concat(strconv.Quote(p.Name), ": command failed"), err)
}
return string(bytes.TrimSpace(out.Bytes())), nil
}

View file

@ -8,7 +8,6 @@ import (
"dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
)
// DaemonEntry records a running daemon in the registry.
@ -58,16 +57,16 @@ func (r *Registry) Register(entry DaemonEntry) error {
}
if err := coreio.Local.EnsureDir(r.dir); err != nil {
return coreerr.E("Registry.Register", "failed to create registry directory", err)
return core.E("registry.register", "failed to create registry directory", err)
}
data, err := marshalDaemonEntry(entry)
if err != nil {
return coreerr.E("Registry.Register", "failed to marshal entry", err)
return core.E("registry.register", "failed to marshal entry", err)
}
if err := coreio.Local.Write(r.entryPath(entry.Code, entry.Daemon), data); err != nil {
return coreerr.E("Registry.Register", "failed to write entry file", err)
return core.E("registry.register", "failed to write entry file", err)
}
return nil
}
@ -75,7 +74,7 @@ func (r *Registry) Register(entry DaemonEntry) error {
// Unregister removes a daemon entry from the registry.
func (r *Registry) Unregister(code, daemon string) error {
if err := coreio.Local.Delete(r.entryPath(code, daemon)); err != nil {
return coreerr.E("Registry.Unregister", "failed to delete entry file", err)
return core.E("registry.unregister", "failed to delete entry file", err)
}
return nil
}
@ -112,7 +111,7 @@ func (r *Registry) List() ([]DaemonEntry, error) {
entries, err := coreio.Local.List(r.dir)
if err != nil {
return nil, coreerr.E("Registry.List", "failed to list registry directory", err)
return nil, core.E("registry.list", "failed to list registry directory", err)
}
var alive []DaemonEntry

View file

@ -5,7 +5,7 @@ import (
"sync"
"time"
coreerr "dappco.re/go/core/log"
"dappco.re/go/core"
)
// Runner orchestrates multiple processes with dependencies.
@ -105,7 +105,7 @@ func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, er
Name: name,
Spec: remaining[name],
ExitCode: 1,
Error: coreerr.E("Runner.RunAll", "circular dependency or missing dependency", nil),
Error: core.E("runner.run_all", "circular dependency or missing dependency", nil),
})
}
break
@ -137,7 +137,7 @@ func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, er
Name: spec.Name,
Spec: spec,
Skipped: true,
Error: coreerr.E("Runner.RunAll", "skipped due to dependency failure", nil),
Error: core.E("runner.run_all", "skipped due to dependency failure", nil),
}
} else {
result = r.runSpec(ctx, spec)
@ -200,10 +200,15 @@ func (r *Runner) runSpec(ctx context.Context, spec RunSpec) RunResult {
Env: spec.Env,
})
if !sr.OK {
err, _ := sr.Value.(error)
if err == nil {
err = core.E("runner.run_spec", core.Concat("failed to start: ", spec.Name), nil)
}
return RunResult{
Name: spec.Name,
Spec: spec,
Duration: time.Since(start),
Error: err,
}
}

View file

@ -13,11 +13,9 @@ func newTestRunner(t *testing.T) *Runner {
t.Helper()
c := framework.New()
factory := NewService(Options{})
raw, err := factory(c)
require.NoError(t, err)
return NewRunner(raw.(*Service))
r := Register(c)
require.True(t, r.OK)
return NewRunner(r.Value.(*Service))
}
func TestRunner_RunSequential_Good(t *testing.T) {

View file

@ -5,14 +5,11 @@ import (
"context"
"os"
"os/exec"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
"dappco.re/go/core"
coreerr "dappco.re/go/core/log"
)
type execCmd = exec.Cmd
@ -26,19 +23,17 @@ const DefaultBufferSize = 1024 * 1024
// Errors
var (
ErrProcessNotFound = coreerr.E("", "process not found", nil)
ErrProcessNotRunning = coreerr.E("", "process is not running", nil)
ErrStdinNotAvailable = coreerr.E("", "stdin not available", nil)
ErrProcessNotFound = core.E("", "process not found", nil)
ErrProcessNotRunning = core.E("", "process is not running", nil)
ErrStdinNotAvailable = core.E("", "stdin not available", nil)
)
// Service manages process execution with Core IPC integration.
type Service struct {
*core.ServiceRuntime[Options]
processes map[string]*Process
mu sync.RWMutex
bufSize int
idCounter atomic.Uint64
managed *core.Registry[*ManagedProcess]
bufSize int
}
// Options configures the process service.
@ -53,40 +48,23 @@ type Options struct {
// c := core.New()
// svc := process.Register(c).Value.(*process.Service)
func Register(c *core.Core) core.Result {
opts := Options{BufferSize: DefaultBufferSize}
svc := &Service{
ServiceRuntime: core.NewServiceRuntime(c, Options{BufferSize: DefaultBufferSize}),
processes: make(map[string]*Process),
bufSize: DefaultBufferSize,
ServiceRuntime: core.NewServiceRuntime(c, opts),
managed: core.NewRegistry[*ManagedProcess](),
bufSize: opts.BufferSize,
}
return core.Result{Value: svc, OK: true}
}
// NewService creates a process service factory for Core registration.
// Deprecated: Use Register(c) to construct a Service directly.
//
// c := core.New()
// factory := process.NewService(process.Options{})
// raw, err := factory(c)
// if err != nil {
// return nil, err
// }
// svc := raw.(*process.Service)
func NewService(opts Options) func(*core.Core) (any, error) {
return func(c *core.Core) (any, error) {
if opts.BufferSize == 0 {
opts.BufferSize = DefaultBufferSize
}
svc := &Service{
ServiceRuntime: core.NewServiceRuntime(c, opts),
processes: make(map[string]*Process),
bufSize: opts.BufferSize,
}
return svc, nil
}
}
// OnStartup implements core.Startable.
func (s *Service) OnStartup(ctx context.Context) core.Result {
c := s.Core()
c.Action("process.run", s.handleRun)
c.Action("process.start", s.handleStart)
c.Action("process.kill", s.handleKill)
c.Action("process.list", s.handleList)
c.Action("process.get", s.handleGet)
return core.Result{OK: true}
}
@ -94,19 +72,9 @@ func (s *Service) OnStartup(ctx context.Context) core.Result {
//
// c.ServiceShutdown(ctx) // calls OnShutdown on all Stoppable services
func (s *Service) OnShutdown(ctx context.Context) core.Result {
s.mu.RLock()
procs := make([]*Process, 0, len(s.processes))
for _, p := range s.processes {
if p.IsRunning() {
procs = append(procs, p)
}
}
s.mu.RUnlock()
for _, p := range procs {
_ = p.Shutdown()
}
s.managed.Each(func(_ string, proc *ManagedProcess) {
_ = proc.Kill()
})
return core.Result{OK: true}
}
@ -126,7 +94,14 @@ func (s *Service) Start(ctx context.Context, command string, args ...string) cor
// r := svc.StartWithOptions(ctx, process.RunOptions{Command: "go", Args: []string{"test", "./..."}})
// if r.OK { proc := r.Value.(*Process) }
func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Result {
id := s.nextProcessID()
if opts.Command == "" {
return core.Result{Value: core.E("process.start", "command is required", nil), OK: false}
}
if ctx == nil {
ctx = context.Background()
}
id := core.ID()
// Detached processes use Background context so they survive parent death
parentCtx := ctx
@ -152,19 +127,19 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re
stdout, err := cmd.StdoutPipe()
if err != nil {
cancel()
return core.Result{OK: false}
return core.Result{Value: core.E("process.start", core.Concat("stdout pipe failed: ", opts.Command), err), OK: false}
}
stderr, err := cmd.StderrPipe()
if err != nil {
cancel()
return core.Result{OK: false}
return core.Result{Value: core.E("process.start", core.Concat("stderr pipe failed: ", opts.Command), err), OK: false}
}
stdin, err := cmd.StdinPipe()
if err != nil {
cancel()
return core.Result{OK: false}
return core.Result{Value: core.E("process.start", core.Concat("stdin pipe failed: ", opts.Command), err), OK: false}
}
// Create output buffer (enabled by default)
@ -173,7 +148,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re
output = NewRingBuffer(s.bufSize)
}
proc := &Process{
proc := &ManagedProcess{
ID: id,
Command: opts.Command,
Args: opts.Args,
@ -194,13 +169,15 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re
// Start the process
if err := cmd.Start(); err != nil {
cancel()
return core.Result{OK: false}
return core.Result{Value: core.E("process.start", core.Concat("command failed: ", opts.Command), err), OK: false}
}
// Store process
s.mu.Lock()
s.processes[id] = proc
s.mu.Unlock()
if r := s.managed.Set(id, proc); !r.OK {
cancel()
_ = cmd.Process.Kill()
return r
}
// Start timeout watchdog if configured
if opts.Timeout > 0 {
@ -273,7 +250,7 @@ func (s *Service) StartWithOptions(ctx context.Context, opts RunOptions) core.Re
}
// streamOutput reads from a pipe and broadcasts lines via ACTION.
func (s *Service) streamOutput(proc *Process, r streamReader, stream Stream) {
func (s *Service) streamOutput(proc *ManagedProcess, r streamReader, stream Stream) {
scanner := bufio.NewScanner(r)
// Increase buffer for long lines
scanner.Buffer(make([]byte, 64*1024), 1024*1024)
@ -296,40 +273,31 @@ func (s *Service) streamOutput(proc *Process, r streamReader, stream Stream) {
}
// Get returns a process by ID.
func (s *Service) Get(id string) (*Process, error) {
s.mu.RLock()
defer s.mu.RUnlock()
proc, ok := s.processes[id]
if !ok {
func (s *Service) Get(id string) (*ManagedProcess, error) {
r := s.managed.Get(id)
if !r.OK {
return nil, ErrProcessNotFound
}
return proc, nil
return r.Value.(*ManagedProcess), nil
}
// List returns all processes.
func (s *Service) List() []*Process {
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]*Process, 0, len(s.processes))
for _, p := range s.processes {
result = append(result, p)
}
func (s *Service) List() []*ManagedProcess {
result := make([]*ManagedProcess, 0, s.managed.Len())
s.managed.Each(func(_ string, proc *ManagedProcess) {
result = append(result, proc)
})
return result
}
// Running returns all currently running processes.
func (s *Service) Running() []*Process {
s.mu.RLock()
defer s.mu.RUnlock()
var result []*Process
for _, p := range s.processes {
if p.IsRunning() {
result = append(result, p)
func (s *Service) Running() []*ManagedProcess {
result := make([]*ManagedProcess, 0, s.managed.Len())
s.managed.Each(func(_ string, proc *ManagedProcess) {
if proc.IsRunning() {
result = append(result, proc)
}
}
})
return result
}
@ -354,31 +322,30 @@ func (s *Service) Kill(id string) error {
// Remove removes a completed process from the list.
func (s *Service) Remove(id string) error {
s.mu.Lock()
defer s.mu.Unlock()
proc, ok := s.processes[id]
if !ok {
proc, err := s.Get(id)
if err != nil {
return err
}
if proc.IsRunning() {
return core.E("process.remove", core.Concat("cannot remove running process: ", id), nil)
}
r := s.managed.Delete(id)
if !r.OK {
return ErrProcessNotFound
}
if proc.IsRunning() {
return coreerr.E("Service.Remove", "cannot remove running process", nil)
}
delete(s.processes, id)
return nil
}
// Clear removes all completed processes.
func (s *Service) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
for id, p := range s.processes {
if !p.IsRunning() {
delete(s.processes, id)
ids := make([]string, 0)
s.managed.Each(func(id string, proc *ManagedProcess) {
if !proc.IsRunning() {
ids = append(ids, id)
}
})
for _, id := range ids {
s.managed.Delete(id)
}
}
@ -397,15 +364,10 @@ func (s *Service) Output(id string) (string, error) {
// r := svc.Run(ctx, "go", "test", "./...")
// output := r.Value.(string)
func (s *Service) Run(ctx context.Context, command string, args ...string) core.Result {
r := s.Start(ctx, command, args...)
if !r.OK {
return core.Result{Value: "", OK: false}
}
proc := r.Value.(*Process)
<-proc.Done()
return core.Result{Value: proc.Output(), OK: proc.ExitCode == 0}
return s.RunWithOptions(ctx, RunOptions{
Command: command,
Args: args,
})
}
// RunWithOptions executes a command with options and waits for completion.
@ -414,15 +376,7 @@ func (s *Service) Run(ctx context.Context, command string, args ...string) core.
// r := svc.RunWithOptions(ctx, process.RunOptions{Command: "go", Args: []string{"test"}})
// output := r.Value.(string)
func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) core.Result {
r := s.StartWithOptions(ctx, opts)
if !r.OK {
return core.Result{Value: "", OK: false}
}
proc := r.Value.(*Process)
<-proc.Done()
return core.Result{Value: proc.Output(), OK: proc.ExitCode == 0}
return s.runCommand(ctx, opts)
}
// --- Internal Request Helpers ---
@ -430,7 +384,7 @@ func (s *Service) RunWithOptions(ctx context.Context, opts RunOptions) core.Resu
func (s *Service) handleRun(ctx context.Context, opts core.Options) core.Result {
command := opts.String("command")
if command == "" {
return core.Result{Value: coreerr.E("process.run", "command is required", nil), OK: false}
return core.Result{Value: core.E("process.run", "command is required", nil), OK: false}
}
runOpts := RunOptions{
@ -448,56 +402,94 @@ func (s *Service) handleRun(ctx context.Context, opts core.Options) core.Result
}
}
return s.RunWithOptions(ctx, runOpts)
return s.runCommand(ctx, runOpts)
}
func (s *Service) handleStart(ctx context.Context, opts core.Options) core.Result {
command := opts.String("command")
if command == "" {
return core.Result{Value: coreerr.E("process.start", "command is required", nil), OK: false}
return core.Result{Value: core.E("process.start", "command is required", nil), OK: false}
}
runOpts := RunOptions{
Command: command,
Dir: opts.String("dir"),
Detach: true,
}
if r := opts.Get("args"); r.OK {
if args, ok := r.Value.([]string); ok {
runOpts.Args = args
}
}
if r := opts.Get("env"); r.OK {
if env, ok := r.Value.([]string); ok {
runOpts.Env = env
}
}
r := s.StartWithOptions(ctx, runOpts)
if !r.OK {
return r
}
return core.Result{Value: r.Value.(*Process).ID, OK: true}
return core.Result{Value: r.Value.(*ManagedProcess).ID, OK: true}
}
func (s *Service) handleKill(ctx context.Context, opts core.Options) core.Result {
id := opts.String("id")
if id != "" {
if err := s.Kill(id); err != nil {
if core.Is(err, ErrProcessNotFound) {
return core.Result{Value: core.E("process.kill", core.Concat("not found: ", id), nil), OK: false}
}
return core.Result{Value: err, OK: false}
}
return core.Result{OK: true}
}
return core.Result{Value: coreerr.E("process.kill", "id is required", nil), OK: false}
pid := opts.Int("pid")
if pid > 0 {
proc, err := processHandle(pid)
if err != nil {
return core.Result{Value: core.E("process.kill", core.Concat("find pid failed: ", core.Sprintf("%d", pid)), err), OK: false}
}
if err := proc.Signal(syscall.SIGTERM); err != nil {
return core.Result{Value: core.E("process.kill", core.Concat("signal failed: ", core.Sprintf("%d", pid)), err), OK: false}
}
return core.Result{OK: true}
}
return core.Result{Value: core.E("process.kill", "need id or pid", nil), OK: false}
}
func (s *Service) handleList(ctx context.Context, opts core.Options) core.Result {
s.mu.RLock()
defer s.mu.RUnlock()
return core.Result{Value: s.managed.Names(), OK: true}
}
ids := make([]string, 0, len(s.processes))
for id := range s.processes {
ids = append(ids, id)
func (s *Service) runCommand(ctx context.Context, opts RunOptions) core.Result {
if opts.Command == "" {
return core.Result{Value: core.E("process.run", "command is required", nil), OK: false}
}
return core.Result{Value: ids, OK: true}
if ctx == nil {
ctx = context.Background()
}
cmd := execCommandContext(ctx, opts.Command, opts.Args...)
if opts.Dir != "" {
cmd.Dir = opts.Dir
}
if len(opts.Env) > 0 {
cmd.Env = append(cmd.Environ(), opts.Env...)
}
output, err := cmd.CombinedOutput()
if err != nil {
return core.Result{Value: core.E("process.run", core.Concat("command failed: ", opts.Command), err), OK: false}
}
return core.Result{Value: string(output), OK: true}
}
// Signal sends a signal to the process.
func (p *Process) Signal(sig os.Signal) error {
func (p *ManagedProcess) Signal(sig os.Signal) error {
p.mu.Lock()
defer p.mu.Unlock()
@ -542,13 +534,12 @@ func isNotExist(err error) bool {
func (s *Service) handleGet(ctx context.Context, opts core.Options) core.Result {
id := opts.String("id")
if id == "" {
return core.Result{Value: core.E("process.get", "id is required", nil), OK: false}
}
proc, err := s.Get(id)
if err != nil {
return core.Result{Value: err, OK: false}
return core.Result{Value: core.E("process.get", core.Concat("not found: ", id), err), OK: false}
}
return core.Result{Value: proc.Info(), OK: true}
}
func (s *Service) nextProcessID() string {
return "proc-" + strconv.FormatUint(s.idCounter.Add(1), 10)
}

View file

@ -15,12 +15,183 @@ func newTestService(t *testing.T) (*Service, *framework.Core) {
t.Helper()
c := framework.New()
factory := NewService(Options{BufferSize: 1024})
raw, err := factory(c)
r := Register(c)
require.True(t, r.OK)
return r.Value.(*Service), c
}
func newStartedTestService(t *testing.T) (*Service, *framework.Core) {
t.Helper()
svc, c := newTestService(t)
r := svc.OnStartup(context.Background())
require.True(t, r.OK)
return svc, c
}
func TestService_Register_Good(t *testing.T) {
c := framework.New(framework.WithService(Register))
svc, ok := framework.ServiceFor[*Service](c, "process")
require.True(t, ok)
assert.NotNil(t, svc)
}
func TestService_OnStartup_Good(t *testing.T) {
svc, c := newTestService(t)
r := svc.OnStartup(context.Background())
require.True(t, r.OK)
assert.True(t, c.Action("process.run").Exists())
assert.True(t, c.Action("process.start").Exists())
assert.True(t, c.Action("process.kill").Exists())
assert.True(t, c.Action("process.list").Exists())
assert.True(t, c.Action("process.get").Exists())
}
func TestService_HandleRun_Good(t *testing.T) {
_, c := newStartedTestService(t)
r := c.Action("process.run").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "echo"},
framework.Option{Key: "args", Value: []string{"hello"}},
))
require.True(t, r.OK)
assert.Contains(t, r.Value.(string), "hello")
}
func TestService_HandleRun_Bad(t *testing.T) {
_, c := newStartedTestService(t)
r := c.Action("process.run").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "nonexistent_command_xyz"},
))
assert.False(t, r.OK)
}
func TestService_HandleRun_Ugly(t *testing.T) {
_, c := newStartedTestService(t)
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
r := c.Action("process.run").Run(ctx, framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"1"}},
))
assert.False(t, r.OK)
}
func TestService_HandleStart_Good(t *testing.T) {
svc, c := newStartedTestService(t)
r := c.Action("process.start").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"60"}},
))
require.True(t, r.OK)
id := r.Value.(string)
proc, err := svc.Get(id)
require.NoError(t, err)
assert.True(t, proc.IsRunning())
kill := c.Action("process.kill").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "id", Value: id},
))
require.True(t, kill.OK)
<-proc.Done()
}
func TestService_HandleStart_Bad(t *testing.T) {
_, c := newStartedTestService(t)
r := c.Action("process.start").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "nonexistent_command_xyz"},
))
assert.False(t, r.OK)
}
func TestService_HandleKill_Good(t *testing.T) {
svc, c := newStartedTestService(t)
start := c.Action("process.start").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"60"}},
))
require.True(t, start.OK)
id := start.Value.(string)
proc, err := svc.Get(id)
require.NoError(t, err)
svc := raw.(*Service)
return svc, c
kill := c.Action("process.kill").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "id", Value: id},
))
require.True(t, kill.OK)
select {
case <-proc.Done():
case <-time.After(2 * time.Second):
t.Fatal("process should have been killed")
}
}
func TestService_HandleKill_Bad(t *testing.T) {
_, c := newStartedTestService(t)
r := c.Action("process.kill").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "id", Value: "missing"},
))
assert.False(t, r.OK)
}
func TestService_HandleList_Good(t *testing.T) {
svc, c := newStartedTestService(t)
startOne := c.Action("process.start").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"60"}},
))
require.True(t, startOne.OK)
startTwo := c.Action("process.start").Run(context.Background(), framework.NewOptions(
framework.Option{Key: "command", Value: "sleep"},
framework.Option{Key: "args", Value: []string{"60"}},
))
require.True(t, startTwo.OK)
r := c.Action("process.list").Run(context.Background(), framework.NewOptions())
require.True(t, r.OK)
ids := r.Value.([]string)
assert.Len(t, ids, 2)
for _, id := range ids {
proc, err := svc.Get(id)
require.NoError(t, err)
_ = proc.Kill()
<-proc.Done()
}
}
func TestService_Ugly_PermissionModel(t *testing.T) {
c := framework.New()
r := c.Process().Run(context.Background(), "echo", "blocked")
assert.False(t, r.OK)
c = framework.New(framework.WithService(Register))
startup := c.ServiceStartup(context.Background(), nil)
require.True(t, startup.OK)
defer func() {
shutdown := c.ServiceShutdown(context.Background())
assert.True(t, shutdown.OK)
}()
r = c.Process().Run(context.Background(), "echo", "allowed")
require.True(t, r.OK)
assert.Contains(t, r.Value.(string), "allowed")
}
func startProc(t *testing.T, svc *Service, ctx context.Context, command string, args ...string) *Process {
@ -171,12 +342,7 @@ func TestService_Run_Good(t *testing.T) {
func TestService_Actions_Good(t *testing.T) {
t.Run("broadcasts events", func(t *testing.T) {
c := framework.New()
factory := NewService(Options{})
raw, err := factory(c)
require.NoError(t, err)
svc := raw.(*Service)
svc, c := newTestService(t)
var started []ActionProcessStarted
var outputs []ActionProcessOutput
@ -381,14 +547,6 @@ func TestService_OnShutdown_Good(t *testing.T) {
})
}
func TestService_OnStartup_Good(t *testing.T) {
t.Run("returns ok", func(t *testing.T) {
svc, _ := newTestService(t)
r := svc.OnStartup(context.Background())
assert.True(t, r.OK)
})
}
func TestService_RunWithOptions_Good(t *testing.T) {
t.Run("returns output on success", func(t *testing.T) {
svc, _ := newTestService(t)

View file

@ -5,16 +5,11 @@
//
// # Getting Started
//
// c := core.New()
// factory := process.NewService(process.Options{})
// raw, err := factory(c)
// if err != nil {
// return err
// }
// c := core.New(core.WithService(process.Register))
// _ = c.ServiceStartup(ctx, nil)
//
// svc := raw.(*process.Service)
// r := svc.Start(ctx, "go", "test", "./...")
// proc := r.Value.(*process.Process)
// r := c.Process().Run(ctx, "go", "test", "./...")
// output := r.Value.(string)
//
// # Listening for Events
//
@ -90,15 +85,19 @@ type RunOptions struct {
KillGroup bool
}
// Info provides a snapshot of process state without internal fields.
type Info struct {
// ProcessInfo provides a snapshot of process state without internal fields.
type ProcessInfo struct {
ID string `json:"id"`
Command string `json:"command"`
Args []string `json:"args"`
Dir string `json:"dir"`
StartedAt time.Time `json:"startedAt"`
Running bool `json:"running"`
Status Status `json:"status"`
ExitCode int `json:"exitCode"`
Duration time.Duration `json:"duration"`
PID int `json:"pid"`
}
// Info is kept as a compatibility alias for ProcessInfo.
type Info = ProcessInfo