package process import ( "context" "fmt" "log/slog" "sync" "time" ) // RestartPolicy configures automatic restart behaviour for supervised units. type RestartPolicy struct { // Delay between restart attempts. Delay time.Duration // MaxRestarts is the maximum number of restarts before giving up. // Use -1 for unlimited restarts. MaxRestarts int } // DaemonSpec defines a long-running external process under supervision. type DaemonSpec struct { // Name identifies this daemon (must be unique within the supervisor). Name string // RunOptions defines the command, args, dir, env. RunOptions // Restart configures automatic restart behaviour. Restart RestartPolicy } // GoSpec defines a supervised Go function that runs in a goroutine. // The function should block until done or ctx is cancelled. type GoSpec struct { // Name identifies this task (must be unique within the supervisor). Name string // Func is the function to supervise. It receives a context that is // cancelled when the supervisor stops or the task is explicitly stopped. // If it returns an error or panics, the supervisor restarts it // according to the restart policy. Func func(ctx context.Context) error // Restart configures automatic restart behaviour. Restart RestartPolicy } // DaemonStatus contains a snapshot of a supervised unit's state. type DaemonStatus struct { Name string `json:"name"` Type string `json:"type"` // "process" or "goroutine" Running bool `json:"running"` PID int `json:"pid,omitempty"` RestartCount int `json:"restartCount"` LastStart time.Time `json:"lastStart"` Uptime time.Duration `json:"uptime"` ExitCode int `json:"exitCode,omitempty"` } // supervisedUnit is the internal state for any supervised unit. type supervisedUnit struct { name string unitType string // "process" or "goroutine" restart RestartPolicy restartCount int lastStart time.Time running bool exitCode int // For process daemons runOpts *RunOptions proc *Process // For go functions goFunc func(ctx context.Context) error cancel context.CancelFunc done chan struct{} // closed when supervision goroutine exits mu sync.Mutex } func (u *supervisedUnit) status() DaemonStatus { u.mu.Lock() defer u.mu.Unlock() var uptime time.Duration if u.running && !u.lastStart.IsZero() { uptime = time.Since(u.lastStart) } pid := 0 if u.proc != nil { info := u.proc.Info() pid = info.PID } return DaemonStatus{ Name: u.name, Type: u.unitType, Running: u.running, PID: pid, RestartCount: u.restartCount, LastStart: u.lastStart, Uptime: uptime, ExitCode: u.exitCode, } } // ShutdownTimeout is the maximum time to wait for supervised units during shutdown. const ShutdownTimeout = 15 * time.Second // Supervisor manages long-running processes and goroutines with automatic restart. // // For external processes, it requires a Service instance. // For Go functions, no Service is needed. // // sup := process.NewSupervisor(svc) // sup.Register(process.DaemonSpec{ // Name: "worker", // RunOptions: process.RunOptions{Command: "worker", Args: []string{"--port", "8080"}}, // Restart: process.RestartPolicy{Delay: 5 * time.Second, MaxRestarts: -1}, // }) // sup.RegisterFunc(process.GoSpec{ // Name: "health-check", // Func: healthCheckLoop, // Restart: process.RestartPolicy{Delay: time.Second, MaxRestarts: -1}, // }) // sup.Start() // defer sup.Stop() type Supervisor struct { service *Service units map[string]*supervisedUnit ctx context.Context cancel context.CancelFunc wg sync.WaitGroup mu sync.RWMutex started bool } // NewSupervisor creates a supervisor. // The Service parameter is optional (nil) if only supervising Go functions. func NewSupervisor(svc *Service) *Supervisor { ctx, cancel := context.WithCancel(context.Background()) return &Supervisor{ service: svc, units: make(map[string]*supervisedUnit), ctx: ctx, cancel: cancel, } } // Register adds an external process daemon for supervision. // Panics if no Service was provided to NewSupervisor. func (s *Supervisor) Register(spec DaemonSpec) { if s.service == nil { panic("process: Supervisor.Register requires a Service (use NewSupervisor with non-nil service)") } s.mu.Lock() defer s.mu.Unlock() opts := spec.RunOptions s.units[spec.Name] = &supervisedUnit{ name: spec.Name, unitType: "process", restart: spec.Restart, runOpts: &opts, } } // RegisterFunc adds a Go function for supervision. func (s *Supervisor) RegisterFunc(spec GoSpec) { s.mu.Lock() defer s.mu.Unlock() s.units[spec.Name] = &supervisedUnit{ name: spec.Name, unitType: "goroutine", restart: spec.Restart, goFunc: spec.Func, } } // Start begins supervising all registered units. // Safe to call once — subsequent calls are no-ops. func (s *Supervisor) Start() { s.mu.Lock() if s.started { s.mu.Unlock() return } s.started = true s.mu.Unlock() s.mu.RLock() for _, unit := range s.units { s.startUnit(unit) } s.mu.RUnlock() } // startUnit launches the supervision goroutine for a single unit. func (s *Supervisor) startUnit(u *supervisedUnit) { u.mu.Lock() if u.running { u.mu.Unlock() return } u.running = true u.lastStart = time.Now() unitCtx, unitCancel := context.WithCancel(s.ctx) u.cancel = unitCancel u.done = make(chan struct{}) u.mu.Unlock() s.wg.Add(1) go func() { defer s.wg.Done() defer close(u.done) s.superviseLoop(u, unitCtx) }() slog.Info("supervisor: started unit", "name", u.name, "type", u.unitType) } // superviseLoop is the core restart loop for a supervised unit. // ctx is the unit's own context, derived from s.ctx. Cancelling either // the supervisor or the unit's context exits this loop. func (s *Supervisor) superviseLoop(u *supervisedUnit, ctx context.Context) { for { // Check if this unit's context is cancelled (covers both // supervisor shutdown and manual restart/stop) select { case <-ctx.Done(): u.mu.Lock() u.running = false u.mu.Unlock() return default: } // Run the unit with panic recovery exitCode := s.runUnit(u, ctx) // If context was cancelled during run, exit the loop if ctx.Err() != nil { u.mu.Lock() u.running = false u.mu.Unlock() return } u.mu.Lock() u.exitCode = exitCode u.restartCount++ shouldRestart := u.restart.MaxRestarts < 0 || u.restartCount <= u.restart.MaxRestarts delay := u.restart.Delay count := u.restartCount u.mu.Unlock() if !shouldRestart { slog.Warn("supervisor: unit reached max restarts", "name", u.name, "maxRestarts", u.restart.MaxRestarts, ) u.mu.Lock() u.running = false u.mu.Unlock() return } // Wait before restarting, or exit if context is cancelled select { case <-ctx.Done(): u.mu.Lock() u.running = false u.mu.Unlock() return case <-time.After(delay): slog.Info("supervisor: restarting unit", "name", u.name, "restartCount", count, "exitCode", exitCode, ) u.mu.Lock() u.lastStart = time.Now() u.mu.Unlock() } } } // runUnit executes a single run of the unit, returning exit code. // Recovers from panics. func (s *Supervisor) runUnit(u *supervisedUnit, ctx context.Context) (exitCode int) { defer func() { if r := recover(); r != nil { slog.Error("supervisor: unit panicked", "name", u.name, "panic", fmt.Sprintf("%v", r), ) exitCode = 1 } }() switch u.unitType { case "process": return s.runProcess(u, ctx) case "goroutine": return s.runGoFunc(u, ctx) default: slog.Error("supervisor: unknown unit type", "name", u.name, "type", u.unitType) return 1 } } // runProcess starts an external process and waits for it to exit. func (s *Supervisor) runProcess(u *supervisedUnit, ctx context.Context) int { proc, err := s.service.StartWithOptions(ctx, *u.runOpts) if err != nil { slog.Error("supervisor: failed to start process", "name", u.name, "error", err, ) return 1 } u.mu.Lock() u.proc = proc u.mu.Unlock() // Wait for process to finish or context cancellation select { case <-proc.Done(): info := proc.Info() return info.ExitCode case <-ctx.Done(): // Context cancelled — kill the process _ = proc.Kill() <-proc.Done() return -1 } } // runGoFunc runs a Go function and returns 0 on success, 1 on error. func (s *Supervisor) runGoFunc(u *supervisedUnit, ctx context.Context) int { if err := u.goFunc(ctx); err != nil { if ctx.Err() != nil { // Context was cancelled, not a real error return -1 } slog.Error("supervisor: go function returned error", "name", u.name, "error", err, ) return 1 } return 0 } // Stop gracefully shuts down all supervised units. func (s *Supervisor) Stop() { s.cancel() // Wait with timeout done := make(chan struct{}) go func() { s.wg.Wait() close(done) }() select { case <-done: slog.Info("supervisor: all units stopped") case <-time.After(ShutdownTimeout): slog.Warn("supervisor: shutdown timeout, some units may not have stopped") } s.mu.Lock() s.started = false s.mu.Unlock() } // Restart stops and restarts a specific unit by name. func (s *Supervisor) Restart(name string) error { s.mu.RLock() u, ok := s.units[name] s.mu.RUnlock() if !ok { return fmt.Errorf("supervisor: unit not found: %s", name) } // Cancel the current run and wait for the supervision goroutine to exit u.mu.Lock() if u.cancel != nil { u.cancel() } done := u.done u.mu.Unlock() // Wait for the old supervision goroutine to exit if done != nil { <-done } // Reset restart counter for the fresh start u.mu.Lock() u.restartCount = 0 u.mu.Unlock() // Start fresh s.startUnit(u) return nil } // StopUnit stops a specific unit without restarting it. func (s *Supervisor) StopUnit(name string) error { s.mu.RLock() u, ok := s.units[name] s.mu.RUnlock() if !ok { return fmt.Errorf("supervisor: unit not found: %s", name) } u.mu.Lock() if u.cancel != nil { u.cancel() } // Set max restarts to 0 to prevent the loop from restarting u.restart.MaxRestarts = 0 u.restartCount = 1 u.mu.Unlock() return nil } // Status returns the status of a specific supervised unit. func (s *Supervisor) Status(name string) (DaemonStatus, error) { s.mu.RLock() u, ok := s.units[name] s.mu.RUnlock() if !ok { return DaemonStatus{}, fmt.Errorf("supervisor: unit not found: %s", name) } return u.status(), nil } // Statuses returns the status of all supervised units. func (s *Supervisor) Statuses() map[string]DaemonStatus { s.mu.RLock() defer s.mu.RUnlock() result := make(map[string]DaemonStatus, len(s.units)) for name, u := range s.units { result[name] = u.status() } return result } // UnitNames returns the names of all registered units. func (s *Supervisor) UnitNames() []string { s.mu.RLock() defer s.mu.RUnlock() names := make([]string, 0, len(s.units)) for name := range s.units { names = append(names, name) } return names }