// Source path: go/pkg/process/supervisor.go

package process
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
)
// RestartPolicy configures automatic restart behaviour for supervised units.
// The supervision loop consults it every time a run of the unit ends while
// the unit's context is still live.
type RestartPolicy struct {
// Delay is how long the supervision loop waits after a run ends before
// starting the next one.
Delay time.Duration
// MaxRestarts is the maximum number of restarts before giving up.
// Any negative value (conventionally -1) means unlimited restarts;
// 0 means the unit is run once and never restarted.
MaxRestarts int
}
// DaemonSpec defines a long-running external process under supervision.
// It is the argument to Supervisor.Register.
type DaemonSpec struct {
// Name identifies this daemon (must be unique within the supervisor).
// It is the key the unit is stored under, so registering another spec
// with the same name replaces the earlier registration.
Name string
// RunOptions defines the command, args, dir, env.
// A copy taken at registration time is handed to
// Service.StartWithOptions for every run of the daemon.
RunOptions
// Restart configures automatic restart behaviour.
Restart RestartPolicy
}
// GoSpec defines a supervised Go function that runs in a goroutine.
// The function should block until done or ctx is cancelled.
// It is the argument to Supervisor.RegisterFunc.
type GoSpec struct {
// Name identifies this task (must be unique within the supervisor).
// Registering another spec with the same name replaces the earlier one.
Name string
// Func is the function to supervise. It receives a context that is
// cancelled when the supervisor stops or the task is explicitly stopped.
// Whenever it returns (with or without an error) or panics while that
// context is still live, the supervisor restarts it according to the
// restart policy. A nil return is recorded as exit code 0, an error or
// a panic as exit code 1.
Func func(ctx context.Context) error
// Restart configures automatic restart behaviour.
Restart RestartPolicy
}
// DaemonStatus contains a snapshot of a supervised unit's state.
// It is produced by Supervisor.Status and Supervisor.Statuses.
type DaemonStatus struct {
// Name is the name the unit was registered under.
Name string `json:"name"`
Type string `json:"type"` // "process" or "goroutine"
// Running is true from the moment the unit is started until its
// supervision loop exits; it stays true while the loop is waiting out a
// restart delay.
Running bool `json:"running"`
// PID is the PID reported by the most recently started process. It is 0
// when no process has been started, which is always the case for
// goroutine units.
PID int `json:"pid,omitempty"`
// RestartCount is the number of runs that have ended while the unit's
// context was still live. Supervisor.Restart resets it to 0.
RestartCount int `json:"restartCount"`
// LastStart is when the unit was last started or restarted.
LastStart time.Time `json:"lastStart"`
// Uptime is the time elapsed since LastStart while Running is true,
// and zero otherwise.
Uptime time.Duration `json:"uptime"`
// ExitCode is the exit code of the most recent run recorded by the
// supervision loop.
ExitCode int `json:"exitCode,omitempty"`
}
// supervisedUnit is the internal state for any supervised unit.
// One value is created per Register/RegisterFunc call and shared between
// the supervision goroutine and the Supervisor's public methods.
type supervisedUnit struct {
// name is the registration name; used as the map key and in log output.
name string
unitType string // "process" or "goroutine"
// restart is the policy supplied at registration.
restart RestartPolicy
// restartCount is incremented each time a run ends while the unit's
// context is still live.
restartCount int
// lastStart is the time of the most recent start or restart.
lastStart time.Time
// running is true while a supervision goroutine owns this unit.
running bool
// exitCode is the result of the most recent recorded run.
exitCode int
// For process daemons
// runOpts is the private copy of the options passed to each process start.
runOpts *RunOptions
// proc is the most recently started process; nil until the first start.
proc *Process
// For go functions
goFunc func(ctx context.Context) error
// cancel cancels the context of the current supervision goroutine;
// nil until the unit has been started once.
cancel context.CancelFunc
done chan struct{} // closed when supervision goroutine exits
// mu is taken around updates to the fields that change after
// registration (running, lastStart, restartCount, exitCode, proc,
// cancel, done).
mu sync.Mutex
}
// status returns a point-in-time DaemonStatus for the unit. The whole
// snapshot is taken while holding the unit's mutex.
func (u *supervisedUnit) status() DaemonStatus {
	u.mu.Lock()
	defer u.mu.Unlock()

	snap := DaemonStatus{
		Name:         u.name,
		Type:         u.unitType,
		Running:      u.running,
		RestartCount: u.restartCount,
		LastStart:    u.lastStart,
		ExitCode:     u.exitCode,
	}

	// Uptime is only reported for a running unit that has a start time.
	if snap.Running && !snap.LastStart.IsZero() {
		snap.Uptime = time.Since(snap.LastStart)
	}

	// PID stays 0 until a process has been started for this unit.
	if u.proc != nil {
		snap.PID = u.proc.Info().PID
	}

	return snap
}
// ShutdownTimeout is the maximum time to wait for supervised units during shutdown.
// Supervisor.Stop logs a warning and returns once it has elapsed, even if
// some supervision goroutines are still running.
const ShutdownTimeout = 15 * time.Second
// Supervisor manages long-running processes and goroutines with automatic restart.
//
// For external processes, it requires a Service instance.
// For Go functions, no Service is needed.
//
//	sup := process.NewSupervisor(svc)
//	sup.Register(process.DaemonSpec{
//		Name:       "worker",
//		RunOptions: process.RunOptions{Command: "worker", Args: []string{"--port", "8080"}},
//		Restart:    process.RestartPolicy{Delay: 5 * time.Second, MaxRestarts: -1},
//	})
//	sup.RegisterFunc(process.GoSpec{
//		Name:    "health-check",
//		Func:    healthCheckLoop,
//		Restart: process.RestartPolicy{Delay: time.Second, MaxRestarts: -1},
//	})
//	sup.Start()
//	defer sup.Stop()
type Supervisor struct {
// service starts external processes; nil when only Go functions are supervised.
service *Service
// units holds every registered unit, keyed by its name.
units map[string]*supervisedUnit
// ctx is the root context created by NewSupervisor; each unit's context
// is derived from it.
ctx context.Context
// cancel cancels ctx; Stop calls it to shut every unit down.
cancel context.CancelFunc
// wg tracks the supervision goroutines so Stop can wait for them.
wg sync.WaitGroup
// mu guards units and started.
mu sync.RWMutex
// started is set by Start and cleared again by Stop.
started bool
}
// NewSupervisor builds an empty Supervisor with its own cancellable root
// context.
//
// svc is used to start external processes; pass nil when only Go functions
// will be supervised (Register panics in that case, RegisterFunc does not).
func NewSupervisor(svc *Service) *Supervisor {
	sup := &Supervisor{
		service: svc,
		units:   map[string]*supervisedUnit{},
	}
	sup.ctx, sup.cancel = context.WithCancel(context.Background())
	return sup
}
// Register adds an external process daemon for supervision.
//
// The unit is stored under spec.Name, replacing any unit already registered
// with that name. Register only records the unit; it does not start it.
//
// Panics if no Service was provided to NewSupervisor.
func (s *Supervisor) Register(spec DaemonSpec) {
	if s.service == nil {
		panic("process: Supervisor.Register requires a Service (use NewSupervisor with non-nil service)")
	}

	// The unit keeps a pointer to its own copy of the run options rather
	// than to the caller's spec.
	runOpts := spec.RunOptions
	unit := &supervisedUnit{
		name:     spec.Name,
		unitType: "process",
		restart:  spec.Restart,
		runOpts:  &runOpts,
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.units[spec.Name] = unit
}
// RegisterFunc adds a Go function for supervision.
//
// The unit is stored under spec.Name, replacing any unit already registered
// with that name. RegisterFunc only records the unit; it does not start it.
func (s *Supervisor) RegisterFunc(spec GoSpec) {
	unit := &supervisedUnit{
		name:     spec.Name,
		unitType: "goroutine",
		restart:  spec.Restart,
		goFunc:   spec.Func,
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.units[spec.Name] = unit
}
// Start begins supervising all registered units.
//
// Calls made while the supervisor is already started return immediately.
// Stop clears the started flag, so Start will run again after a Stop, but
// the root context is created once in NewSupervisor and stays cancelled
// after Stop, so units launched at that point see a cancelled context.
func (s *Supervisor) Start() {
	// Claim the started flag under the write lock; only the caller that
	// flips it from false to true goes on to launch units.
	s.mu.Lock()
	alreadyStarted := s.started
	s.started = true
	s.mu.Unlock()

	if alreadyStarted {
		return
	}

	s.mu.RLock()
	for name := range s.units {
		s.startUnit(s.units[name])
	}
	s.mu.RUnlock()
}
// startUnit launches the supervision goroutine for a single unit.
//
// It is a no-op when the unit is already marked running. Otherwise it gives
// the unit a fresh context derived from the supervisor's root context and a
// fresh done channel, then runs superviseLoop in a goroutine tracked by the
// supervisor's WaitGroup.
func (s *Supervisor) startUnit(u *supervisedUnit) {
	u.mu.Lock()
	if u.running {
		u.mu.Unlock()
		return
	}
	loopCtx, stopLoop := context.WithCancel(s.ctx)
	finished := make(chan struct{})
	u.running = true
	u.lastStart = time.Now()
	u.cancel = stopLoop
	u.done = finished
	u.mu.Unlock()

	s.wg.Add(1)
	go func() {
		// Deferred calls run last-in first-out: the done channel is closed
		// first, then the WaitGroup is released.
		defer s.wg.Done()
		defer close(finished)
		s.superviseLoop(u, loopCtx)
	}()

	slog.Info("supervisor: started unit", "name", u.name, "type", u.unitType)
}
// superviseLoop is the core restart loop for a supervised unit.
//
// ctx is the unit's own context, derived from s.ctx. Cancelling either
// the supervisor or the unit's context exits this loop.
//
// Each iteration runs the unit once via runUnit. If the context is still
// live afterwards, the exit code is recorded, restartCount is incremented
// and the restart policy decides whether to run again after policy.Delay.
// restartCount counts finished runs, so it ends one above MaxRestarts once
// the unit has given up.
//
// On every exit path the unit is marked as not running before the function
// returns.
func (s *Supervisor) superviseLoop(u *supervisedUnit, ctx context.Context) {
	// Single place that clears the running flag, whatever the exit reason.
	defer func() {
		u.mu.Lock()
		u.running = false
		u.mu.Unlock()
	}()

	for {
		// Covers both supervisor shutdown and a manual restart/stop that
		// happened before this iteration began.
		if ctx.Err() != nil {
			return
		}

		// Run the unit with panic recovery.
		exitCode := s.runUnit(u, ctx)

		// A run that ended because of cancellation is not counted or
		// recorded; the loop simply exits.
		if ctx.Err() != nil {
			return
		}

		// Read the whole policy under the lock so that later use (including
		// logging) never touches u.restart unsynchronised.
		u.mu.Lock()
		u.exitCode = exitCode
		u.restartCount++
		count := u.restartCount
		delay := u.restart.Delay
		maxRestarts := u.restart.MaxRestarts
		u.mu.Unlock()

		// A negative maxRestarts means unlimited restarts.
		if maxRestarts >= 0 && count > maxRestarts {
			slog.Warn("supervisor: unit reached max restarts",
				"name", u.name,
				"maxRestarts", maxRestarts,
			)
			return
		}

		// Wait before restarting, or exit if the context is cancelled.
		// An explicit timer lets us release it promptly on cancellation
		// instead of leaving it pending for the rest of the delay.
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}

		slog.Info("supervisor: restarting unit",
			"name", u.name,
			"restartCount", count,
			"exitCode", exitCode,
		)
		u.mu.Lock()
		u.lastStart = time.Now()
		u.mu.Unlock()
	}
}
// runUnit performs one run of the unit and reports its exit code.
//
// Process units are delegated to runProcess and goroutine units to
// runGoFunc; any other unit type is logged and reported as exit code 1.
// A panic raised during the run is recovered, logged, and also reported as
// exit code 1 through the named result.
func (s *Supervisor) runUnit(u *supervisedUnit, ctx context.Context) (exitCode int) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		slog.Error("supervisor: unit panicked",
			"name", u.name,
			"panic", fmt.Sprintf("%v", r),
		)
		exitCode = 1
	}()

	if u.unitType == "process" {
		return s.runProcess(u, ctx)
	}
	if u.unitType == "goroutine" {
		return s.runGoFunc(u, ctx)
	}

	slog.Error("supervisor: unknown unit type", "name", u.name, "type", u.unitType)
	return 1
}
// runProcess starts the unit's external process through the Service and
// blocks until it exits or ctx is cancelled.
//
// It returns 1 when the process cannot be started, the process's reported
// exit code when it finishes on its own, and -1 when it was killed because
// ctx was cancelled.
func (s *Supervisor) runProcess(u *supervisedUnit, ctx context.Context) int {
	started, startErr := s.service.StartWithOptions(ctx, *u.runOpts)
	if startErr != nil {
		slog.Error("supervisor: failed to start process",
			"name", u.name,
			"error", startErr,
		)
		return 1
	}

	// Publish the handle so status() can report its PID.
	u.mu.Lock()
	u.proc = started
	u.mu.Unlock()

	select {
	case <-started.Done():
		return started.Info().ExitCode
	case <-ctx.Done():
	}

	// Context cancelled: kill the process and wait for it to finish. The
	// Kill error is deliberately ignored; we wait for Done either way.
	_ = started.Kill()
	<-started.Done()
	return -1
}
// runGoFunc calls the unit's Go function once and maps its result to an
// exit code: 0 for a nil error, -1 for an error returned after ctx was
// cancelled, and 1 (with an error log) for any other error.
func (s *Supervisor) runGoFunc(u *supervisedUnit, ctx context.Context) int {
	runErr := u.goFunc(ctx)

	switch {
	case runErr == nil:
		return 0
	case ctx.Err() != nil:
		// The context was cancelled, so this is not treated as a real error.
		return -1
	default:
		slog.Error("supervisor: go function returned error",
			"name", u.name,
			"error", runErr,
		)
		return 1
	}
}
// Stop gracefully shuts down all supervised units.
//
// It cancels the supervisor's root context and then waits for every
// supervision goroutine to finish, for at most ShutdownTimeout. When the
// timeout elapses a warning is logged and Stop returns anyway. In both
// cases the started flag is cleared.
//
// The root context is not recreated, so it remains cancelled after Stop.
func (s *Supervisor) Stop() {
	s.cancel()

	// Turn the WaitGroup wait into a channel so it can be raced against
	// the shutdown deadline.
	allStopped := make(chan struct{})
	go func() {
		defer close(allStopped)
		s.wg.Wait()
	}()

	deadline := time.After(ShutdownTimeout)
	select {
	case <-allStopped:
		slog.Info("supervisor: all units stopped")
	case <-deadline:
		slog.Warn("supervisor: shutdown timeout, some units may not have stopped")
	}

	s.mu.Lock()
	s.started = false
	s.mu.Unlock()
}
// Restart stops and restarts a specific unit by name.
//
// It cancels the unit's current context, blocks until the previous
// supervision goroutine (if there was one) has exited, resets the unit's
// restart counter to zero and starts the unit again.
//
// Returns an error when no unit is registered under name.
func (s *Supervisor) Restart(name string) error {
	s.mu.RLock()
	unit, known := s.units[name]
	s.mu.RUnlock()
	if !known {
		return fmt.Errorf("supervisor: unit not found: %s", name)
	}

	// Signal the current run to stop and pick up its completion channel in
	// the same critical section.
	unit.mu.Lock()
	if stop := unit.cancel; stop != nil {
		stop()
	}
	finished := unit.done
	unit.mu.Unlock()

	// A nil channel means the unit has never been started, so there is
	// nothing to wait for.
	if finished != nil {
		<-finished
	}

	// The fresh start begins with a clean restart counter.
	unit.mu.Lock()
	unit.restartCount = 0
	unit.mu.Unlock()

	s.startUnit(unit)
	return nil
}
// StopUnit stops a specific unit without restarting it.
//
// It cancels the unit's context. The supervision loop checks that context
// before each run, after each run and while waiting out the restart delay,
// so cancellation alone is enough to make it exit without another restart.
// The unit's restart policy and restart counter are left untouched, so a
// later Restart brings the unit back with the policy it was registered with.
//
// StopUnit does not wait for the unit to finish; use Status to observe
// Running becoming false. Calling it for a unit that has never been started
// is a no-op.
//
// Returns an error when no unit is registered under name.
func (s *Supervisor) StopUnit(name string) error {
	s.mu.RLock()
	u, ok := s.units[name]
	s.mu.RUnlock()
	if !ok {
		return fmt.Errorf("supervisor: unit not found: %s", name)
	}

	u.mu.Lock()
	cancel := u.cancel
	u.mu.Unlock()

	// cancel is nil until the unit has been started at least once.
	if cancel != nil {
		cancel()
	}
	return nil
}
// Status returns a snapshot of the unit registered under name.
//
// When no such unit exists it returns a zero DaemonStatus and an error.
func (s *Supervisor) Status(name string) (DaemonStatus, error) {
	s.mu.RLock()
	unit, known := s.units[name]
	s.mu.RUnlock()

	if !known {
		var none DaemonStatus
		return none, fmt.Errorf("supervisor: unit not found: %s", name)
	}
	return unit.status(), nil
}
// Statuses returns a snapshot of every registered unit, keyed by unit name.
// The returned map is freshly allocated on each call.
func (s *Supervisor) Statuses() map[string]DaemonStatus {
	s.mu.RLock()
	defer s.mu.RUnlock()

	snapshot := make(map[string]DaemonStatus, len(s.units))
	for key := range s.units {
		snapshot[key] = s.units[key].status()
	}
	return snapshot
}
// UnitNames returns the names of all registered units.
// The order follows map iteration and is therefore unspecified.
func (s *Supervisor) UnitNames() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()

	list := make([]string, len(s.units))
	next := 0
	for unitName := range s.units {
		list[next] = unitName
		next++
	}
	return list
}