go/pkg/process/runner.go

294 lines
6.2 KiB
Go
Raw Permalink Normal View History

package process
import (
"context"
"fmt"
"sync"
"time"
)
// Runner orchestrates multiple processes with dependencies.
// Every process is started through the Service the Runner was created with.
type Runner struct {
// service is the process service used to start each spec's command.
service *Service
}
// NewRunner returns a freshly allocated Runner that starts its processes
// through svc. The service pointer is stored as given; it is not validated.
func NewRunner(svc *Service) *Runner {
	runner := new(Runner)
	runner.service = svc
	return runner
}
// RunSpec defines a process to run with optional dependencies.
// Command, Args, Dir and Env are forwarded unchanged to the service when the
// spec is started; Name, After and AllowFailure only steer scheduling.
type RunSpec struct {
// Name is a friendly identifier (e.g., "lint", "test").
// RunAll indexes specs by Name and other specs refer to it in After, so
// names should be unique: in RunAll a later spec with the same Name
// replaces an earlier one.
Name string
// Command is the executable to run.
Command string
// Args are the command arguments.
Args []string
// Dir is the working directory.
Dir string
// Env are additional environment variables.
// NOTE(review): the expected entry format is defined by RunOptions, which
// is not visible in this file.
Env []string
// After lists spec names that must complete successfully first.
// RunAll waits until every listed spec has a result and then skips this
// spec if any of them did not pass, unless that dependency has
// AllowFailure set. RunSequential and RunParallel do not consult After.
After []string
// AllowFailure if true, continues pipeline even if this spec fails.
// In RunAll dependents of this spec still run; in RunSequential the
// remaining specs still run. The failure is still counted in
// RunAllResult.Failed.
AllowFailure bool
}
// RunResult captures the outcome of a single process.
// A result is either produced by actually starting the spec's command or
// synthesised with Skipped set when the spec was never started.
type RunResult struct {
// Name is the Name of the spec this result belongs to.
Name string
// Spec is a copy of the spec that produced this result.
Spec RunSpec
// ExitCode is the exit code reported by the process once it is done.
// It stays zero when the process could not be started or was skipped.
ExitCode int
// Duration is the duration reported by the process, or the time spent
// before the start attempt failed. It stays zero for skipped specs.
Duration time.Duration
// Output is whatever the process's Output method returned after it
// finished; empty when the process was never started.
Output string
// Error is set when the process could not be started, and by RunAll to
// explain why a spec was skipped. A non-zero exit code alone does not
// set Error.
Error error
// Skipped reports that the spec was never started.
Skipped bool
}
// Passed reports whether the process ran and succeeded: the result must not
// be skipped, must carry no error, and must have a zero exit code.
func (r RunResult) Passed() bool {
	if r.Skipped || r.Error != nil {
		return false
	}
	return r.ExitCode == 0
}
// RunAllResult is the aggregate result of running multiple specs.
// It is returned by RunAll, RunSequential and RunParallel.
type RunAllResult struct {
// Results holds one entry per spec that was run or skipped.
Results []RunResult
// Duration is the wall-clock time of the whole run, measured from the
// start of the call.
Duration time.Duration
// Passed is the number of results for which RunResult.Passed is true.
Passed int
// Failed is the number of results that were neither skipped nor passed.
Failed int
// Skipped is the number of results with Skipped set; these are counted
// here only, even when they carry an Error.
Skipped int
}
// Success reports whether the run finished without any failed spec.
// Skipped specs do not count against success; only the Failed counter is
// consulted.
func (r RunAllResult) Success() bool {
	failures := r.Failed
	return failures == 0
}
// RunAll executes specs respecting dependencies, parallelising where possible.
//
// Specs are processed in waves. A wave consists of every not-yet-finished
// spec whose After entries all have a result; the specs of a wave run
// concurrently and the next wave starts once all of them are done. A spec is
// not started, and is reported as skipped, when one of its dependencies did
// not pass and that dependency does not have AllowFailure set.
//
// If no remaining spec can run (circular dependency, or a dependency name
// that is not among specs), all remaining specs are reported as skipped.
//
// Results are ordered deterministically: wave by wave, and within a wave in
// the order the specs appear in the input. When several specs share a Name
// only the last definition is run, at the position of the first occurrence.
//
// The returned error is always nil; per-spec failures are reported through
// the individual results and the Failed counter.
func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
	start := time.Now()

	// Index specs by name and remember the input order of the names, so that
	// scheduling never depends on map iteration order.
	specMap := make(map[string]RunSpec, len(specs))
	pending := make([]string, 0, len(specs))
	for _, spec := range specs {
		if _, seen := specMap[spec.Name]; !seen {
			pending = append(pending, spec.Name)
		}
		specMap[spec.Name] = spec
	}

	// completed is only touched between waves, never by the worker
	// goroutines, so it needs no locking.
	completed := make(map[string]*RunResult, len(pending))
	results := make([]RunResult, 0, len(specs))

	// blockedByFailure reports whether a finished dependency of spec failed
	// (or was skipped) without being allowed to fail.
	blockedByFailure := func(spec RunSpec) bool {
		for _, dep := range spec.After {
			if res, ok := completed[dep]; ok && !res.Passed() && !specMap[dep].AllowFailure {
				return true
			}
		}
		return false
	}

	for len(pending) > 0 {
		// Split the pending specs into those that can run now and the rest.
		ready := make([]RunSpec, 0, len(pending))
		blocked := make([]string, 0, len(pending))
		for _, name := range pending {
			spec := specMap[name]
			if r.canRun(spec, completed) {
				ready = append(ready, spec)
			} else {
				blocked = append(blocked, name)
			}
		}

		if len(ready) == 0 {
			// Deadlock - circular dependency or missing specs.
			for _, name := range blocked {
				results = append(results, RunResult{
					Name:    name,
					Spec:    specMap[name],
					Skipped: true,
					Error:   fmt.Errorf("circular dependency or missing dependency"),
				})
			}
			break
		}

		// Run the wave. Each spec owns exactly one slot of wave, so the
		// goroutines never write to shared memory; wg.Wait publishes the
		// slots back to this goroutine.
		wave := make([]RunResult, len(ready))
		var wg sync.WaitGroup
		for i, spec := range ready {
			if blockedByFailure(spec) {
				wave[i] = RunResult{
					Name:    spec.Name,
					Spec:    spec,
					Skipped: true,
					Error:   fmt.Errorf("skipped due to dependency failure"),
				}
				continue
			}
			wg.Add(1)
			go func(i int, spec RunSpec) {
				defer wg.Done()
				wave[i] = r.runSpec(ctx, spec)
			}(i, spec)
		}
		wg.Wait()

		for i := range wave {
			completed[ready[i].Name] = &wave[i]
		}
		results = append(results, wave...)
		pending = blocked
	}

	// Build aggregate result.
	aggResult := &RunAllResult{
		Results:  results,
		Duration: time.Since(start),
	}
	for _, res := range results {
		switch {
		case res.Skipped:
			aggResult.Skipped++
		case res.Passed():
			aggResult.Passed++
		default:
			aggResult.Failed++
		}
	}
	return aggResult, nil
}
// canRun reports whether spec is ready to be scheduled: every name listed in
// spec.After must already have an entry in completed. Only the presence of
// the entry matters here, not whether that dependency passed.
func (r *Runner) canRun(spec RunSpec, completed map[string]*RunResult) bool {
	for i := range spec.After {
		_, done := completed[spec.After[i]]
		if !done {
			return false
		}
	}
	return true
}
// runSpec starts the command described by spec through the runner's service
// and blocks until the process signals that it is done.
//
// If the process cannot be started, the result carries the start error and
// the time spent on the attempt. Otherwise the result carries the exit code,
// duration and output reported by the process, with a nil Error; a non-zero
// exit code is not turned into an error here.
func (r *Runner) runSpec(ctx context.Context, spec RunSpec) RunResult {
	began := time.Now()

	opts := RunOptions{
		Command: spec.Command,
		Args:    spec.Args,
		Dir:     spec.Dir,
		Env:     spec.Env,
	}
	outcome := RunResult{Name: spec.Name, Spec: spec}

	proc, err := r.service.StartWithOptions(ctx, opts)
	if err != nil {
		outcome.Duration = time.Since(began)
		outcome.Error = err
		return outcome
	}

	// Wait for the process to finish before reading its final state.
	<-proc.Done()

	outcome.ExitCode = proc.ExitCode
	outcome.Duration = proc.Duration
	outcome.Output = proc.Output()
	return outcome
}
// RunSequential runs specs one at a time in the given order and stops at the
// first spec that does not pass, unless that spec has AllowFailure set. Every
// spec after the stopping point is reported as skipped (without an Error) and
// is never started. After is not consulted. The returned error is always nil.
func (r *Runner) RunSequential(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
	began := time.Now()
	collected := make([]RunResult, 0, len(specs))

	for idx, spec := range specs {
		outcome := r.runSpec(ctx, spec)
		collected = append(collected, outcome)
		if outcome.Passed() || spec.AllowFailure {
			continue
		}
		// Hard failure: record everything that follows as skipped and stop.
		for _, rest := range specs[idx+1:] {
			collected = append(collected, RunResult{
				Name:    rest.Name,
				Spec:    rest,
				Skipped: true,
			})
		}
		break
	}

	summary := &RunAllResult{
		Results:  collected,
		Duration: time.Since(began),
	}
	for _, item := range collected {
		switch {
		case item.Skipped:
			summary.Skipped++
		case item.Passed():
			summary.Passed++
		default:
			summary.Failed++
		}
	}
	return summary, nil
}
// RunParallel starts every spec at once, one goroutine per spec, and waits
// for all of them to finish. After and AllowFailure are not consulted.
// Results are returned in the same order as specs. The returned error is
// always nil.
func (r *Runner) RunParallel(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
	began := time.Now()
	collected := make([]RunResult, len(specs))

	var pendingRuns sync.WaitGroup
	pendingRuns.Add(len(specs))
	for idx := range specs {
		// Each goroutine owns exactly one slot of collected, so no locking
		// is needed.
		go func(slot *RunResult, spec RunSpec) {
			defer pendingRuns.Done()
			*slot = r.runSpec(ctx, spec)
		}(&collected[idx], specs[idx])
	}
	pendingRuns.Wait()

	summary := &RunAllResult{
		Results:  collected,
		Duration: time.Since(began),
	}
	for _, item := range collected {
		switch {
		case item.Skipped:
			summary.Skipped++
		case item.Passed():
			summary.Passed++
		default:
			summary.Failed++
		}
	}
	return summary, nil
}