feat(process): preserve runner result order
This commit is contained in:
parent
dcf058047e
commit
9457694e46
2 changed files with 61 additions and 10 deletions
37
runner.go
37
runner.go
|
|
@ -13,6 +13,9 @@ type Runner struct {
|
|||
service *Service
|
||||
}
|
||||
|
||||
// ErrRunnerNoService is returned when a runner was created without a service.
|
||||
var ErrRunnerNoService = coreerr.E("", "runner service is nil", nil)
|
||||
|
||||
// NewRunner creates a runner for the given service.
|
||||
func NewRunner(svc *Service) *Runner {
|
||||
return &Runner{service: svc}
|
||||
|
|
@ -68,20 +71,24 @@ func (r RunAllResult) Success() bool {
|
|||
|
||||
// RunAll executes specs respecting dependencies, parallelising where possible.
|
||||
func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
|
||||
if err := r.ensureService(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
start := time.Now()
|
||||
|
||||
// Build dependency graph
|
||||
specMap := make(map[string]RunSpec)
|
||||
indexMap := make(map[string]int, len(specs))
|
||||
for _, spec := range specs {
|
||||
specMap[spec.Name] = spec
|
||||
indexMap[spec.Name] = len(indexMap)
|
||||
}
|
||||
|
||||
// Track completion
|
||||
completed := make(map[string]*RunResult)
|
||||
var completedMu sync.Mutex
|
||||
|
||||
results := make([]RunResult, 0, len(specs))
|
||||
var resultsMu sync.Mutex
|
||||
results := make([]RunResult, len(specs))
|
||||
|
||||
// Process specs in waves
|
||||
remaining := make(map[string]RunSpec)
|
||||
|
|
@ -99,16 +106,15 @@ func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, er
|
|||
}
|
||||
|
||||
if len(ready) == 0 && len(remaining) > 0 {
|
||||
// Deadlock — circular dependency or missing specs. Report them as skipped
|
||||
// with an error so callers can distinguish dependency graph issues from
|
||||
// command execution failures.
|
||||
// Deadlock - circular dependency or missing specs.
|
||||
// Keep the output aligned with the input order.
|
||||
for name := range remaining {
|
||||
results = append(results, RunResult{
|
||||
results[indexMap[name]] = RunResult{
|
||||
Name: name,
|
||||
Spec: remaining[name],
|
||||
Skipped: true,
|
||||
Error: coreerr.E("Runner.RunAll", "circular dependency or missing dependency", nil),
|
||||
})
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
|
|
@ -149,9 +155,7 @@ func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, er
|
|||
completed[spec.Name] = &result
|
||||
completedMu.Unlock()
|
||||
|
||||
resultsMu.Lock()
|
||||
results = append(results, result)
|
||||
resultsMu.Unlock()
|
||||
results[indexMap[spec.Name]] = result
|
||||
}(spec)
|
||||
}
|
||||
wg.Wait()
|
||||
|
|
@ -181,6 +185,13 @@ func (r *Runner) RunAll(ctx context.Context, specs []RunSpec) (*RunAllResult, er
|
|||
return aggResult, nil
|
||||
}
|
||||
|
||||
func (r *Runner) ensureService() error {
|
||||
if r == nil || r.service == nil {
|
||||
return ErrRunnerNoService
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// canRun checks if all dependencies are completed.
|
||||
func (r *Runner) canRun(spec RunSpec, completed map[string]*RunResult) bool {
|
||||
for _, dep := range spec.After {
|
||||
|
|
@ -224,6 +235,9 @@ func (r *Runner) runSpec(ctx context.Context, spec RunSpec) RunResult {
|
|||
|
||||
// RunSequential executes specs one after another, stopping on first failure.
|
||||
func (r *Runner) RunSequential(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
|
||||
if err := r.ensureService(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
start := time.Now()
|
||||
results := make([]RunResult, 0, len(specs))
|
||||
|
||||
|
|
@ -264,6 +278,9 @@ func (r *Runner) RunSequential(ctx context.Context, specs []RunSpec) (*RunAllRes
|
|||
|
||||
// RunParallel executes all specs concurrently, regardless of dependencies.
|
||||
func (r *Runner) RunParallel(ctx context.Context, specs []RunSpec) (*RunAllResult, error) {
|
||||
if err := r.ensureService(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
start := time.Now()
|
||||
results := make([]RunResult, len(specs))
|
||||
|
||||
|
|
|
|||
|
|
@ -148,6 +148,24 @@ func TestRunner_RunAll(t *testing.T) {
|
|||
assert.True(t, result.Success())
|
||||
assert.Equal(t, 4, result.Passed)
|
||||
})
|
||||
|
||||
t.Run("preserves input order", func(t *testing.T) {
|
||||
runner := newTestRunner(t)
|
||||
|
||||
specs := []RunSpec{
|
||||
{Name: "third", Command: "echo", Args: []string{"3"}, After: []string{"second"}},
|
||||
{Name: "first", Command: "echo", Args: []string{"1"}},
|
||||
{Name: "second", Command: "echo", Args: []string{"2"}, After: []string{"first"}},
|
||||
}
|
||||
|
||||
result, err := runner.RunAll(context.Background(), specs)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, result.Results, len(specs))
|
||||
for i, res := range result.Results {
|
||||
assert.Equal(t, specs[i].Name, res.Name)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestRunner_RunAll_CircularDeps(t *testing.T) {
|
||||
|
|
@ -207,3 +225,19 @@ func TestRunResult_Passed(t *testing.T) {
|
|||
assert.False(t, r.Passed())
|
||||
})
|
||||
}
|
||||
|
||||
func TestRunner_NilService(t *testing.T) {
|
||||
runner := NewRunner(nil)
|
||||
|
||||
_, err := runner.RunAll(context.Background(), nil)
|
||||
require.Error(t, err)
|
||||
assert.ErrorIs(t, err, ErrRunnerNoService)
|
||||
|
||||
_, err = runner.RunSequential(context.Background(), nil)
|
||||
require.Error(t, err)
|
||||
assert.ErrorIs(t, err, ErrRunnerNoService)
|
||||
|
||||
_, err = runner.RunParallel(context.Background(), nil)
|
||||
require.Error(t, err)
|
||||
assert.ErrorIs(t, err, ErrRunnerNoService)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue