471 lines
11 KiB
Go
471 lines
11 KiB
Go
|
|
package process
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"fmt"
|
||
|
|
"log/slog"
|
||
|
|
"sync"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
// RestartPolicy describes how a supervised unit is restarted after it exits.
type RestartPolicy struct {
	// Delay is the pause inserted before each restart attempt.
	Delay time.Duration

	// MaxRestarts caps how many times the unit is restarted before the
	// supervisor gives up on it. Any negative value (conventionally -1)
	// removes the cap.
	MaxRestarts int
}
|
||
|
|
|
||
|
|
// DaemonSpec defines a long-running external process under supervision.
|
||
|
|
type DaemonSpec struct {
|
||
|
|
// Name identifies this daemon (must be unique within the supervisor).
|
||
|
|
Name string
|
||
|
|
// RunOptions defines the command, args, dir, env.
|
||
|
|
RunOptions
|
||
|
|
// Restart configures automatic restart behaviour.
|
||
|
|
Restart RestartPolicy
|
||
|
|
}
|
||
|
|
|
||
|
|
// GoSpec defines a supervised Go function that runs in a goroutine.
|
||
|
|
// The function should block until done or ctx is cancelled.
|
||
|
|
type GoSpec struct {
|
||
|
|
// Name identifies this task (must be unique within the supervisor).
|
||
|
|
Name string
|
||
|
|
// Func is the function to supervise. It receives a context that is
|
||
|
|
// cancelled when the supervisor stops or the task is explicitly stopped.
|
||
|
|
// If it returns an error or panics, the supervisor restarts it
|
||
|
|
// according to the restart policy.
|
||
|
|
Func func(ctx context.Context) error
|
||
|
|
// Restart configures automatic restart behaviour.
|
||
|
|
Restart RestartPolicy
|
||
|
|
}
|
||
|
|
|
||
|
|
// DaemonStatus is a point-in-time snapshot of one supervised unit.
type DaemonStatus struct {
	// Name is the unit's registered name.
	Name string `json:"name"`

	// Type is "process" or "goroutine".
	Type string `json:"type"`

	// Running reports whether the unit's supervision loop is active.
	Running bool `json:"running"`

	// PID is the process ID for process units; it stays 0 when the unit
	// has no process handle.
	PID int `json:"pid,omitempty"`

	// RestartCount is the unit's restart counter at snapshot time.
	RestartCount int `json:"restartCount"`

	// LastStart is when the unit was most recently (re)started.
	LastStart time.Time `json:"lastStart"`

	// Uptime is the time elapsed since LastStart; it is zero when the
	// unit is not running.
	Uptime time.Duration `json:"uptime"`

	// ExitCode is the exit code recorded for the unit's last completed run.
	ExitCode int `json:"exitCode,omitempty"`
}
|
||
|
|
|
||
|
|
// supervisedUnit is the internal state for any supervised unit.
|
||
|
|
type supervisedUnit struct {
|
||
|
|
name string
|
||
|
|
unitType string // "process" or "goroutine"
|
||
|
|
restart RestartPolicy
|
||
|
|
restartCount int
|
||
|
|
lastStart time.Time
|
||
|
|
running bool
|
||
|
|
exitCode int
|
||
|
|
|
||
|
|
// For process daemons
|
||
|
|
runOpts *RunOptions
|
||
|
|
proc *Process
|
||
|
|
|
||
|
|
// For go functions
|
||
|
|
goFunc func(ctx context.Context) error
|
||
|
|
|
||
|
|
cancel context.CancelFunc
|
||
|
|
done chan struct{} // closed when supervision goroutine exits
|
||
|
|
mu sync.Mutex
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *supervisedUnit) status() DaemonStatus {
|
||
|
|
u.mu.Lock()
|
||
|
|
defer u.mu.Unlock()
|
||
|
|
|
||
|
|
var uptime time.Duration
|
||
|
|
if u.running && !u.lastStart.IsZero() {
|
||
|
|
uptime = time.Since(u.lastStart)
|
||
|
|
}
|
||
|
|
|
||
|
|
pid := 0
|
||
|
|
if u.proc != nil {
|
||
|
|
info := u.proc.Info()
|
||
|
|
pid = info.PID
|
||
|
|
}
|
||
|
|
|
||
|
|
return DaemonStatus{
|
||
|
|
Name: u.name,
|
||
|
|
Type: u.unitType,
|
||
|
|
Running: u.running,
|
||
|
|
PID: pid,
|
||
|
|
RestartCount: u.restartCount,
|
||
|
|
LastStart: u.lastStart,
|
||
|
|
Uptime: uptime,
|
||
|
|
ExitCode: u.exitCode,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// ShutdownTimeout bounds how long Stop waits for supervised units to exit.
const ShutdownTimeout time.Duration = 15 * time.Second
|
||
|
|
|
||
|
|
// Supervisor manages long-running processes and goroutines with automatic restart.
|
||
|
|
//
|
||
|
|
// For external processes, it requires a Service instance.
|
||
|
|
// For Go functions, no Service is needed.
|
||
|
|
//
|
||
|
|
// sup := process.NewSupervisor(svc)
|
||
|
|
// sup.Register(process.DaemonSpec{
|
||
|
|
// Name: "worker",
|
||
|
|
// RunOptions: process.RunOptions{Command: "worker", Args: []string{"--port", "8080"}},
|
||
|
|
// Restart: process.RestartPolicy{Delay: 5 * time.Second, MaxRestarts: -1},
|
||
|
|
// })
|
||
|
|
// sup.RegisterFunc(process.GoSpec{
|
||
|
|
// Name: "health-check",
|
||
|
|
// Func: healthCheckLoop,
|
||
|
|
// Restart: process.RestartPolicy{Delay: time.Second, MaxRestarts: -1},
|
||
|
|
// })
|
||
|
|
// sup.Start()
|
||
|
|
// defer sup.Stop()
|
||
|
|
type Supervisor struct {
|
||
|
|
service *Service
|
||
|
|
units map[string]*supervisedUnit
|
||
|
|
ctx context.Context
|
||
|
|
cancel context.CancelFunc
|
||
|
|
wg sync.WaitGroup
|
||
|
|
mu sync.RWMutex
|
||
|
|
started bool
|
||
|
|
}
|
||
|
|
|
||
|
|
// NewSupervisor creates a supervisor.
|
||
|
|
// The Service parameter is optional (nil) if only supervising Go functions.
|
||
|
|
func NewSupervisor(svc *Service) *Supervisor {
|
||
|
|
ctx, cancel := context.WithCancel(context.Background())
|
||
|
|
return &Supervisor{
|
||
|
|
service: svc,
|
||
|
|
units: make(map[string]*supervisedUnit),
|
||
|
|
ctx: ctx,
|
||
|
|
cancel: cancel,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Register adds an external process daemon for supervision.
|
||
|
|
// Panics if no Service was provided to NewSupervisor.
|
||
|
|
func (s *Supervisor) Register(spec DaemonSpec) {
|
||
|
|
if s.service == nil {
|
||
|
|
panic("process: Supervisor.Register requires a Service (use NewSupervisor with non-nil service)")
|
||
|
|
}
|
||
|
|
|
||
|
|
s.mu.Lock()
|
||
|
|
defer s.mu.Unlock()
|
||
|
|
|
||
|
|
opts := spec.RunOptions
|
||
|
|
s.units[spec.Name] = &supervisedUnit{
|
||
|
|
name: spec.Name,
|
||
|
|
unitType: "process",
|
||
|
|
restart: spec.Restart,
|
||
|
|
runOpts: &opts,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// RegisterFunc adds a Go function for supervision.
|
||
|
|
func (s *Supervisor) RegisterFunc(spec GoSpec) {
|
||
|
|
s.mu.Lock()
|
||
|
|
defer s.mu.Unlock()
|
||
|
|
|
||
|
|
s.units[spec.Name] = &supervisedUnit{
|
||
|
|
name: spec.Name,
|
||
|
|
unitType: "goroutine",
|
||
|
|
restart: spec.Restart,
|
||
|
|
goFunc: spec.Func,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Start begins supervising all registered units.
|
||
|
|
// Safe to call once — subsequent calls are no-ops.
|
||
|
|
func (s *Supervisor) Start() {
|
||
|
|
s.mu.Lock()
|
||
|
|
if s.started {
|
||
|
|
s.mu.Unlock()
|
||
|
|
return
|
||
|
|
}
|
||
|
|
s.started = true
|
||
|
|
s.mu.Unlock()
|
||
|
|
|
||
|
|
s.mu.RLock()
|
||
|
|
for _, unit := range s.units {
|
||
|
|
s.startUnit(unit)
|
||
|
|
}
|
||
|
|
s.mu.RUnlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
// startUnit launches the supervision goroutine for a single unit.
|
||
|
|
func (s *Supervisor) startUnit(u *supervisedUnit) {
|
||
|
|
u.mu.Lock()
|
||
|
|
if u.running {
|
||
|
|
u.mu.Unlock()
|
||
|
|
return
|
||
|
|
}
|
||
|
|
u.running = true
|
||
|
|
u.lastStart = time.Now()
|
||
|
|
|
||
|
|
unitCtx, unitCancel := context.WithCancel(s.ctx)
|
||
|
|
u.cancel = unitCancel
|
||
|
|
u.done = make(chan struct{})
|
||
|
|
u.mu.Unlock()
|
||
|
|
|
||
|
|
s.wg.Add(1)
|
||
|
|
go func() {
|
||
|
|
defer s.wg.Done()
|
||
|
|
defer close(u.done)
|
||
|
|
s.superviseLoop(u, unitCtx)
|
||
|
|
}()
|
||
|
|
|
||
|
|
slog.Info("supervisor: started unit", "name", u.name, "type", u.unitType)
|
||
|
|
}
|
||
|
|
|
||
|
|
// superviseLoop is the core restart loop for a supervised unit.
|
||
|
|
// ctx is the unit's own context, derived from s.ctx. Cancelling either
|
||
|
|
// the supervisor or the unit's context exits this loop.
|
||
|
|
func (s *Supervisor) superviseLoop(u *supervisedUnit, ctx context.Context) {
|
||
|
|
for {
|
||
|
|
// Check if this unit's context is cancelled (covers both
|
||
|
|
// supervisor shutdown and manual restart/stop)
|
||
|
|
select {
|
||
|
|
case <-ctx.Done():
|
||
|
|
u.mu.Lock()
|
||
|
|
u.running = false
|
||
|
|
u.mu.Unlock()
|
||
|
|
return
|
||
|
|
default:
|
||
|
|
}
|
||
|
|
|
||
|
|
// Run the unit with panic recovery
|
||
|
|
exitCode := s.runUnit(u, ctx)
|
||
|
|
|
||
|
|
// If context was cancelled during run, exit the loop
|
||
|
|
if ctx.Err() != nil {
|
||
|
|
u.mu.Lock()
|
||
|
|
u.running = false
|
||
|
|
u.mu.Unlock()
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
u.mu.Lock()
|
||
|
|
u.exitCode = exitCode
|
||
|
|
u.restartCount++
|
||
|
|
shouldRestart := u.restart.MaxRestarts < 0 || u.restartCount <= u.restart.MaxRestarts
|
||
|
|
delay := u.restart.Delay
|
||
|
|
count := u.restartCount
|
||
|
|
u.mu.Unlock()
|
||
|
|
|
||
|
|
if !shouldRestart {
|
||
|
|
slog.Warn("supervisor: unit reached max restarts",
|
||
|
|
"name", u.name,
|
||
|
|
"maxRestarts", u.restart.MaxRestarts,
|
||
|
|
)
|
||
|
|
u.mu.Lock()
|
||
|
|
u.running = false
|
||
|
|
u.mu.Unlock()
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
// Wait before restarting, or exit if context is cancelled
|
||
|
|
select {
|
||
|
|
case <-ctx.Done():
|
||
|
|
u.mu.Lock()
|
||
|
|
u.running = false
|
||
|
|
u.mu.Unlock()
|
||
|
|
return
|
||
|
|
case <-time.After(delay):
|
||
|
|
slog.Info("supervisor: restarting unit",
|
||
|
|
"name", u.name,
|
||
|
|
"restartCount", count,
|
||
|
|
"exitCode", exitCode,
|
||
|
|
)
|
||
|
|
u.mu.Lock()
|
||
|
|
u.lastStart = time.Now()
|
||
|
|
u.mu.Unlock()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// runUnit executes a single run of the unit, returning exit code.
|
||
|
|
// Recovers from panics.
|
||
|
|
func (s *Supervisor) runUnit(u *supervisedUnit, ctx context.Context) (exitCode int) {
|
||
|
|
defer func() {
|
||
|
|
if r := recover(); r != nil {
|
||
|
|
slog.Error("supervisor: unit panicked",
|
||
|
|
"name", u.name,
|
||
|
|
"panic", fmt.Sprintf("%v", r),
|
||
|
|
)
|
||
|
|
exitCode = 1
|
||
|
|
}
|
||
|
|
}()
|
||
|
|
|
||
|
|
switch u.unitType {
|
||
|
|
case "process":
|
||
|
|
return s.runProcess(u, ctx)
|
||
|
|
case "goroutine":
|
||
|
|
return s.runGoFunc(u, ctx)
|
||
|
|
default:
|
||
|
|
slog.Error("supervisor: unknown unit type", "name", u.name, "type", u.unitType)
|
||
|
|
return 1
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// runProcess starts an external process and waits for it to exit.
|
||
|
|
func (s *Supervisor) runProcess(u *supervisedUnit, ctx context.Context) int {
|
||
|
|
proc, err := s.service.StartWithOptions(ctx, *u.runOpts)
|
||
|
|
if err != nil {
|
||
|
|
slog.Error("supervisor: failed to start process",
|
||
|
|
"name", u.name,
|
||
|
|
"error", err,
|
||
|
|
)
|
||
|
|
return 1
|
||
|
|
}
|
||
|
|
|
||
|
|
u.mu.Lock()
|
||
|
|
u.proc = proc
|
||
|
|
u.mu.Unlock()
|
||
|
|
|
||
|
|
// Wait for process to finish or context cancellation
|
||
|
|
select {
|
||
|
|
case <-proc.Done():
|
||
|
|
info := proc.Info()
|
||
|
|
return info.ExitCode
|
||
|
|
case <-ctx.Done():
|
||
|
|
// Context cancelled — kill the process
|
||
|
|
_ = proc.Kill()
|
||
|
|
<-proc.Done()
|
||
|
|
return -1
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// runGoFunc runs a Go function and returns 0 on success, 1 on error.
|
||
|
|
func (s *Supervisor) runGoFunc(u *supervisedUnit, ctx context.Context) int {
|
||
|
|
if err := u.goFunc(ctx); err != nil {
|
||
|
|
if ctx.Err() != nil {
|
||
|
|
// Context was cancelled, not a real error
|
||
|
|
return -1
|
||
|
|
}
|
||
|
|
slog.Error("supervisor: go function returned error",
|
||
|
|
"name", u.name,
|
||
|
|
"error", err,
|
||
|
|
)
|
||
|
|
return 1
|
||
|
|
}
|
||
|
|
return 0
|
||
|
|
}
|
||
|
|
|
||
|
|
// Stop gracefully shuts down all supervised units.
|
||
|
|
func (s *Supervisor) Stop() {
|
||
|
|
s.cancel()
|
||
|
|
|
||
|
|
// Wait with timeout
|
||
|
|
done := make(chan struct{})
|
||
|
|
go func() {
|
||
|
|
s.wg.Wait()
|
||
|
|
close(done)
|
||
|
|
}()
|
||
|
|
|
||
|
|
select {
|
||
|
|
case <-done:
|
||
|
|
slog.Info("supervisor: all units stopped")
|
||
|
|
case <-time.After(ShutdownTimeout):
|
||
|
|
slog.Warn("supervisor: shutdown timeout, some units may not have stopped")
|
||
|
|
}
|
||
|
|
|
||
|
|
s.mu.Lock()
|
||
|
|
s.started = false
|
||
|
|
s.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
// Restart stops and restarts a specific unit by name.
|
||
|
|
func (s *Supervisor) Restart(name string) error {
|
||
|
|
s.mu.RLock()
|
||
|
|
u, ok := s.units[name]
|
||
|
|
s.mu.RUnlock()
|
||
|
|
|
||
|
|
if !ok {
|
||
|
|
return fmt.Errorf("supervisor: unit not found: %s", name)
|
||
|
|
}
|
||
|
|
|
||
|
|
// Cancel the current run and wait for the supervision goroutine to exit
|
||
|
|
u.mu.Lock()
|
||
|
|
if u.cancel != nil {
|
||
|
|
u.cancel()
|
||
|
|
}
|
||
|
|
done := u.done
|
||
|
|
u.mu.Unlock()
|
||
|
|
|
||
|
|
// Wait for the old supervision goroutine to exit
|
||
|
|
if done != nil {
|
||
|
|
<-done
|
||
|
|
}
|
||
|
|
|
||
|
|
// Reset restart counter for the fresh start
|
||
|
|
u.mu.Lock()
|
||
|
|
u.restartCount = 0
|
||
|
|
u.mu.Unlock()
|
||
|
|
|
||
|
|
// Start fresh
|
||
|
|
s.startUnit(u)
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// StopUnit stops a specific unit without restarting it.
|
||
|
|
func (s *Supervisor) StopUnit(name string) error {
|
||
|
|
s.mu.RLock()
|
||
|
|
u, ok := s.units[name]
|
||
|
|
s.mu.RUnlock()
|
||
|
|
|
||
|
|
if !ok {
|
||
|
|
return fmt.Errorf("supervisor: unit not found: %s", name)
|
||
|
|
}
|
||
|
|
|
||
|
|
u.mu.Lock()
|
||
|
|
if u.cancel != nil {
|
||
|
|
u.cancel()
|
||
|
|
}
|
||
|
|
// Set max restarts to 0 to prevent the loop from restarting
|
||
|
|
u.restart.MaxRestarts = 0
|
||
|
|
u.restartCount = 1
|
||
|
|
u.mu.Unlock()
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// Status returns the status of a specific supervised unit.
|
||
|
|
func (s *Supervisor) Status(name string) (DaemonStatus, error) {
|
||
|
|
s.mu.RLock()
|
||
|
|
u, ok := s.units[name]
|
||
|
|
s.mu.RUnlock()
|
||
|
|
|
||
|
|
if !ok {
|
||
|
|
return DaemonStatus{}, fmt.Errorf("supervisor: unit not found: %s", name)
|
||
|
|
}
|
||
|
|
|
||
|
|
return u.status(), nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// Statuses returns the status of all supervised units.
|
||
|
|
func (s *Supervisor) Statuses() map[string]DaemonStatus {
|
||
|
|
s.mu.RLock()
|
||
|
|
defer s.mu.RUnlock()
|
||
|
|
|
||
|
|
result := make(map[string]DaemonStatus, len(s.units))
|
||
|
|
for name, u := range s.units {
|
||
|
|
result[name] = u.status()
|
||
|
|
}
|
||
|
|
return result
|
||
|
|
}
|
||
|
|
|
||
|
|
// UnitNames returns the names of all registered units.
|
||
|
|
func (s *Supervisor) UnitNames() []string {
|
||
|
|
s.mu.RLock()
|
||
|
|
defer s.mu.RUnlock()
|
||
|
|
|
||
|
|
names := make([]string, 0, len(s.units))
|
||
|
|
for name := range s.units {
|
||
|
|
names = append(names, name)
|
||
|
|
}
|
||
|
|
return names
|
||
|
|
}
|