Adds a Supervisor layer to pkg/process that manages long-running processes and goroutines with automatic restart, panic recovery, and graceful shutdown. Supports both external processes (DaemonSpec) and Go functions (GoSpec) with configurable restart policies. Also exposes AddHealthCheck on the Daemon struct so supervised services can wire their status into the daemon health endpoint. Co-Authored-By: Virgil <virgil@lethean.io>
470 lines
11 KiB
Go
470 lines
11 KiB
Go
package process
|
|
|
|
import (
	"context"
	"fmt"
	"log/slog"
	"runtime/debug"
	"sync"
	"time"
)
|
|
|
|
// RestartPolicy controls whether and how a supervised unit is restarted
// after it exits.
//
// The policy applies to every exit that is not caused by cancellation,
// including a clean exit (exit code 0 / nil return).
type RestartPolicy struct {
	// Delay is the pause before each restart; zero restarts immediately.
	Delay time.Duration

	// MaxRestarts caps how many restarts are attempted before the supervisor
	// gives up. Zero disables restarts entirely; any negative value
	// (conventionally -1) means restart without limit.
	MaxRestarts int
}
|
|
|
|
// DaemonSpec defines a long-running external process under supervision.
|
|
type DaemonSpec struct {
|
|
// Name identifies this daemon (must be unique within the supervisor).
|
|
Name string
|
|
// RunOptions defines the command, args, dir, env.
|
|
RunOptions
|
|
// Restart configures automatic restart behaviour.
|
|
Restart RestartPolicy
|
|
}
|
|
|
|
// GoSpec defines a supervised Go function that runs in a goroutine.
|
|
// The function should block until done or ctx is cancelled.
|
|
type GoSpec struct {
|
|
// Name identifies this task (must be unique within the supervisor).
|
|
Name string
|
|
// Func is the function to supervise. It receives a context that is
|
|
// cancelled when the supervisor stops or the task is explicitly stopped.
|
|
// If it returns an error or panics, the supervisor restarts it
|
|
// according to the restart policy.
|
|
Func func(ctx context.Context) error
|
|
// Restart configures automatic restart behaviour.
|
|
Restart RestartPolicy
|
|
}
|
|
|
|
// DaemonStatus is a point-in-time snapshot of one supervised unit.
// Field order and JSON tags define the JSON encoding.
type DaemonStatus struct {
	Name         string        `json:"name"`
	Type         string        `json:"type"` // "process" or "goroutine"
	Running      bool          `json:"running"`
	PID          int           `json:"pid,omitempty"`   // most recently started process; 0 for goroutines
	RestartCount int           `json:"restartCount"`    // restart counter; reset by Supervisor.Restart
	LastStart    time.Time     `json:"lastStart"`       // when the current run started
	Uptime       time.Duration `json:"uptime"`          // time since LastStart while running, else 0
	ExitCode     int           `json:"exitCode,omitempty"` // result of the last run that was not cancelled
}
|
|
|
|
// supervisedUnit is the internal state for any supervised unit.
|
|
type supervisedUnit struct {
|
|
name string
|
|
unitType string // "process" or "goroutine"
|
|
restart RestartPolicy
|
|
restartCount int
|
|
lastStart time.Time
|
|
running bool
|
|
exitCode int
|
|
|
|
// For process daemons
|
|
runOpts *RunOptions
|
|
proc *Process
|
|
|
|
// For go functions
|
|
goFunc func(ctx context.Context) error
|
|
|
|
cancel context.CancelFunc
|
|
done chan struct{} // closed when supervision goroutine exits
|
|
mu sync.Mutex
|
|
}
|
|
|
|
func (u *supervisedUnit) status() DaemonStatus {
|
|
u.mu.Lock()
|
|
defer u.mu.Unlock()
|
|
|
|
var uptime time.Duration
|
|
if u.running && !u.lastStart.IsZero() {
|
|
uptime = time.Since(u.lastStart)
|
|
}
|
|
|
|
pid := 0
|
|
if u.proc != nil {
|
|
info := u.proc.Info()
|
|
pid = info.PID
|
|
}
|
|
|
|
return DaemonStatus{
|
|
Name: u.name,
|
|
Type: u.unitType,
|
|
Running: u.running,
|
|
PID: pid,
|
|
RestartCount: u.restartCount,
|
|
LastStart: u.lastStart,
|
|
Uptime: uptime,
|
|
ExitCode: u.exitCode,
|
|
}
|
|
}
|
|
|
|
// ShutdownTimeout bounds how long Supervisor.Stop waits for supervised units
// to exit before giving up and returning.
const ShutdownTimeout time.Duration = 15 * time.Second
|
|
|
|
// Supervisor manages long-running processes and goroutines with automatic restart.
|
|
//
|
|
// For external processes, it requires a Service instance.
|
|
// For Go functions, no Service is needed.
|
|
//
|
|
// sup := process.NewSupervisor(svc)
|
|
// sup.Register(process.DaemonSpec{
|
|
// Name: "worker",
|
|
// RunOptions: process.RunOptions{Command: "worker", Args: []string{"--port", "8080"}},
|
|
// Restart: process.RestartPolicy{Delay: 5 * time.Second, MaxRestarts: -1},
|
|
// })
|
|
// sup.RegisterFunc(process.GoSpec{
|
|
// Name: "health-check",
|
|
// Func: healthCheckLoop,
|
|
// Restart: process.RestartPolicy{Delay: time.Second, MaxRestarts: -1},
|
|
// })
|
|
// sup.Start()
|
|
// defer sup.Stop()
|
|
type Supervisor struct {
|
|
service *Service
|
|
units map[string]*supervisedUnit
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
mu sync.RWMutex
|
|
started bool
|
|
}
|
|
|
|
// NewSupervisor creates a supervisor.
|
|
// The Service parameter is optional (nil) if only supervising Go functions.
|
|
func NewSupervisor(svc *Service) *Supervisor {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &Supervisor{
|
|
service: svc,
|
|
units: make(map[string]*supervisedUnit),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
}
|
|
|
|
// Register adds an external process daemon for supervision.
|
|
// Panics if no Service was provided to NewSupervisor.
|
|
func (s *Supervisor) Register(spec DaemonSpec) {
|
|
if s.service == nil {
|
|
panic("process: Supervisor.Register requires a Service (use NewSupervisor with non-nil service)")
|
|
}
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
opts := spec.RunOptions
|
|
s.units[spec.Name] = &supervisedUnit{
|
|
name: spec.Name,
|
|
unitType: "process",
|
|
restart: spec.Restart,
|
|
runOpts: &opts,
|
|
}
|
|
}
|
|
|
|
// RegisterFunc adds a Go function for supervision.
|
|
func (s *Supervisor) RegisterFunc(spec GoSpec) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
s.units[spec.Name] = &supervisedUnit{
|
|
name: spec.Name,
|
|
unitType: "goroutine",
|
|
restart: spec.Restart,
|
|
goFunc: spec.Func,
|
|
}
|
|
}
|
|
|
|
// Start begins supervising all registered units.
|
|
// Safe to call once — subsequent calls are no-ops.
|
|
func (s *Supervisor) Start() {
|
|
s.mu.Lock()
|
|
if s.started {
|
|
s.mu.Unlock()
|
|
return
|
|
}
|
|
s.started = true
|
|
s.mu.Unlock()
|
|
|
|
s.mu.RLock()
|
|
for _, unit := range s.units {
|
|
s.startUnit(unit)
|
|
}
|
|
s.mu.RUnlock()
|
|
}
|
|
|
|
// startUnit launches the supervision goroutine for a single unit.
|
|
func (s *Supervisor) startUnit(u *supervisedUnit) {
|
|
u.mu.Lock()
|
|
if u.running {
|
|
u.mu.Unlock()
|
|
return
|
|
}
|
|
u.running = true
|
|
u.lastStart = time.Now()
|
|
|
|
unitCtx, unitCancel := context.WithCancel(s.ctx)
|
|
u.cancel = unitCancel
|
|
u.done = make(chan struct{})
|
|
u.mu.Unlock()
|
|
|
|
s.wg.Add(1)
|
|
go func() {
|
|
defer s.wg.Done()
|
|
defer close(u.done)
|
|
s.superviseLoop(u, unitCtx)
|
|
}()
|
|
|
|
slog.Info("supervisor: started unit", "name", u.name, "type", u.unitType)
|
|
}
|
|
|
|
// superviseLoop is the core restart loop for a supervised unit.
|
|
// ctx is the unit's own context, derived from s.ctx. Cancelling either
|
|
// the supervisor or the unit's context exits this loop.
|
|
func (s *Supervisor) superviseLoop(u *supervisedUnit, ctx context.Context) {
|
|
for {
|
|
// Check if this unit's context is cancelled (covers both
|
|
// supervisor shutdown and manual restart/stop)
|
|
select {
|
|
case <-ctx.Done():
|
|
u.mu.Lock()
|
|
u.running = false
|
|
u.mu.Unlock()
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Run the unit with panic recovery
|
|
exitCode := s.runUnit(u, ctx)
|
|
|
|
// If context was cancelled during run, exit the loop
|
|
if ctx.Err() != nil {
|
|
u.mu.Lock()
|
|
u.running = false
|
|
u.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
u.mu.Lock()
|
|
u.exitCode = exitCode
|
|
u.restartCount++
|
|
shouldRestart := u.restart.MaxRestarts < 0 || u.restartCount <= u.restart.MaxRestarts
|
|
delay := u.restart.Delay
|
|
count := u.restartCount
|
|
u.mu.Unlock()
|
|
|
|
if !shouldRestart {
|
|
slog.Warn("supervisor: unit reached max restarts",
|
|
"name", u.name,
|
|
"maxRestarts", u.restart.MaxRestarts,
|
|
)
|
|
u.mu.Lock()
|
|
u.running = false
|
|
u.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
// Wait before restarting, or exit if context is cancelled
|
|
select {
|
|
case <-ctx.Done():
|
|
u.mu.Lock()
|
|
u.running = false
|
|
u.mu.Unlock()
|
|
return
|
|
case <-time.After(delay):
|
|
slog.Info("supervisor: restarting unit",
|
|
"name", u.name,
|
|
"restartCount", count,
|
|
"exitCode", exitCode,
|
|
)
|
|
u.mu.Lock()
|
|
u.lastStart = time.Now()
|
|
u.mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// runUnit executes a single run of the unit, returning exit code.
|
|
// Recovers from panics.
|
|
func (s *Supervisor) runUnit(u *supervisedUnit, ctx context.Context) (exitCode int) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
slog.Error("supervisor: unit panicked",
|
|
"name", u.name,
|
|
"panic", fmt.Sprintf("%v", r),
|
|
)
|
|
exitCode = 1
|
|
}
|
|
}()
|
|
|
|
switch u.unitType {
|
|
case "process":
|
|
return s.runProcess(u, ctx)
|
|
case "goroutine":
|
|
return s.runGoFunc(u, ctx)
|
|
default:
|
|
slog.Error("supervisor: unknown unit type", "name", u.name, "type", u.unitType)
|
|
return 1
|
|
}
|
|
}
|
|
|
|
// runProcess starts an external process and waits for it to exit.
|
|
func (s *Supervisor) runProcess(u *supervisedUnit, ctx context.Context) int {
|
|
proc, err := s.service.StartWithOptions(ctx, *u.runOpts)
|
|
if err != nil {
|
|
slog.Error("supervisor: failed to start process",
|
|
"name", u.name,
|
|
"error", err,
|
|
)
|
|
return 1
|
|
}
|
|
|
|
u.mu.Lock()
|
|
u.proc = proc
|
|
u.mu.Unlock()
|
|
|
|
// Wait for process to finish or context cancellation
|
|
select {
|
|
case <-proc.Done():
|
|
info := proc.Info()
|
|
return info.ExitCode
|
|
case <-ctx.Done():
|
|
// Context cancelled — kill the process
|
|
_ = proc.Kill()
|
|
<-proc.Done()
|
|
return -1
|
|
}
|
|
}
|
|
|
|
// runGoFunc runs a Go function and returns 0 on success, 1 on error.
|
|
func (s *Supervisor) runGoFunc(u *supervisedUnit, ctx context.Context) int {
|
|
if err := u.goFunc(ctx); err != nil {
|
|
if ctx.Err() != nil {
|
|
// Context was cancelled, not a real error
|
|
return -1
|
|
}
|
|
slog.Error("supervisor: go function returned error",
|
|
"name", u.name,
|
|
"error", err,
|
|
)
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// Stop gracefully shuts down all supervised units.
|
|
func (s *Supervisor) Stop() {
|
|
s.cancel()
|
|
|
|
// Wait with timeout
|
|
done := make(chan struct{})
|
|
go func() {
|
|
s.wg.Wait()
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
slog.Info("supervisor: all units stopped")
|
|
case <-time.After(ShutdownTimeout):
|
|
slog.Warn("supervisor: shutdown timeout, some units may not have stopped")
|
|
}
|
|
|
|
s.mu.Lock()
|
|
s.started = false
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// Restart stops and restarts a specific unit by name.
|
|
func (s *Supervisor) Restart(name string) error {
|
|
s.mu.RLock()
|
|
u, ok := s.units[name]
|
|
s.mu.RUnlock()
|
|
|
|
if !ok {
|
|
return fmt.Errorf("supervisor: unit not found: %s", name)
|
|
}
|
|
|
|
// Cancel the current run and wait for the supervision goroutine to exit
|
|
u.mu.Lock()
|
|
if u.cancel != nil {
|
|
u.cancel()
|
|
}
|
|
done := u.done
|
|
u.mu.Unlock()
|
|
|
|
// Wait for the old supervision goroutine to exit
|
|
if done != nil {
|
|
<-done
|
|
}
|
|
|
|
// Reset restart counter for the fresh start
|
|
u.mu.Lock()
|
|
u.restartCount = 0
|
|
u.mu.Unlock()
|
|
|
|
// Start fresh
|
|
s.startUnit(u)
|
|
return nil
|
|
}
|
|
|
|
// StopUnit stops a specific unit without restarting it.
|
|
func (s *Supervisor) StopUnit(name string) error {
|
|
s.mu.RLock()
|
|
u, ok := s.units[name]
|
|
s.mu.RUnlock()
|
|
|
|
if !ok {
|
|
return fmt.Errorf("supervisor: unit not found: %s", name)
|
|
}
|
|
|
|
u.mu.Lock()
|
|
if u.cancel != nil {
|
|
u.cancel()
|
|
}
|
|
// Set max restarts to 0 to prevent the loop from restarting
|
|
u.restart.MaxRestarts = 0
|
|
u.restartCount = 1
|
|
u.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Status returns the status of a specific supervised unit.
|
|
func (s *Supervisor) Status(name string) (DaemonStatus, error) {
|
|
s.mu.RLock()
|
|
u, ok := s.units[name]
|
|
s.mu.RUnlock()
|
|
|
|
if !ok {
|
|
return DaemonStatus{}, fmt.Errorf("supervisor: unit not found: %s", name)
|
|
}
|
|
|
|
return u.status(), nil
|
|
}
|
|
|
|
// Statuses returns the status of all supervised units.
|
|
func (s *Supervisor) Statuses() map[string]DaemonStatus {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
result := make(map[string]DaemonStatus, len(s.units))
|
|
for name, u := range s.units {
|
|
result[name] = u.status()
|
|
}
|
|
return result
|
|
}
|
|
|
|
// UnitNames returns the names of all registered units.
|
|
func (s *Supervisor) UnitNames() []string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
names := make([]string, 0, len(s.units))
|
|
for name := range s.units {
|
|
names = append(names, name)
|
|
}
|
|
return names
|
|
}
|