From df8c05595488f89b7166d6ac24052dbf5b2e6d2b Mon Sep 17 00:00:00 2001 From: Virgil Date: Wed, 1 Apr 2026 06:23:52 +0000 Subject: [PATCH] feat(ansible): honour max_fail_percentage Co-Authored-By: Virgil --- executor.go | 119 +++++++++++++++++++++++++++++++++++++++-------- executor_test.go | 58 +++++++++++++++++++++++ 2 files changed, 157 insertions(+), 20 deletions(-) diff --git a/executor.go b/executor.go index 2be708b..259e550 100644 --- a/executor.go +++ b/executor.go @@ -2,6 +2,7 @@ package ansible import ( "context" + "errors" "regexp" "slices" "strconv" @@ -130,6 +131,7 @@ func (e *Executor) runPlay(ctx context.Context, play *Play) error { if len(hosts) == 0 { return nil // No hosts matched } + failedHosts := make(map[string]bool) // Merge play vars for k, v := range play.Vars { @@ -153,28 +155,28 @@ func (e *Executor) runPlay(ctx context.Context, play *Play) error { for _, batch := range e.hostBatches(hosts, play.Serial) { // Execute pre_tasks for _, task := range play.PreTasks { - if err := e.runTaskOnHosts(ctx, batch, &task, play); err != nil { + if err := e.runTaskOnHosts(ctx, batch, &task, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) { return err } } // Execute roles for _, roleRef := range play.Roles { - if err := e.runRole(ctx, batch, &roleRef, play); err != nil { + if err := e.runRole(ctx, batch, &roleRef, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) { return err } } // Execute tasks for _, task := range play.Tasks { - if err := e.runTaskOnHosts(ctx, batch, &task, play); err != nil { + if err := e.runTaskOnHosts(ctx, batch, &task, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) { return err } } // Execute post_tasks for _, task := range play.PostTasks { - if err := e.runTaskOnHosts(ctx, batch, &task, play); err != nil { + if err := e.runTaskOnHosts(ctx, batch, &task, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) { return err } } @@ -183,7 +185,7 @@ func (e *Executor) runPlay(ctx context.Context, play *Play) error { // Run notified handlers for _, handler := range play.Handlers { if e.notified[handler.Name] { - if err := e.runTaskOnHosts(ctx, hosts, &handler, play); err != nil { + if err := e.runTaskOnHosts(ctx, hosts, &handler, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) { return err } } @@ -241,7 +243,7 @@ func serialBatchSize(serial any, hostCount int) int { } // runRole executes a role on hosts. -func (e *Executor) runRole(ctx context.Context, hosts []string, roleRef *RoleRef, play *Play) error { +func (e *Executor) runRole(ctx context.Context, hosts []string, roleRef *RoleRef, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error { // Check when condition if roleRef.When != nil { if !e.evaluateWhen(roleRef.When, "", nil) { @@ -268,7 +270,10 @@ func (e *Executor) runRole(ctx context.Context, hosts []string, roleRef *RoleRef for _, task := range tasks { task := task task.Tags = append(append([]string{}, roleRef.Tags...), task.Tags...) - if err := e.runTaskOnHosts(ctx, hosts, &task, play); err != nil { + if err := e.runTaskOnHosts(ctx, hosts, &task, play, failedHosts, totalHosts, maxFailPercent); err != nil { + if isHostFailureError(err) { + continue + } // Restore vars e.vars = oldVars return err @@ -281,7 +286,12 @@ func (e *Executor) runRole(ctx context.Context, hosts []string, roleRef *RoleRef } // runTaskOnHosts runs a task on all hosts. -func (e *Executor) runTaskOnHosts(ctx context.Context, hosts []string, task *Task, play *Play) error { +func (e *Executor) runTaskOnHosts(ctx context.Context, hosts []string, task *Task, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error { + hosts = filterFailedHosts(hosts, failedHosts) + if len(hosts) == 0 { + return nil + } + // run_once executes the task only on the first host in the current batch. if task.RunOnce && len(hosts) > 1 { hosts = hosts[:1] @@ -295,28 +305,94 @@ func (e *Executor) runTaskOnHosts(ctx context.Context, hosts []string, task *Tas // Handle block tasks if len(task.Block) > 0 { - return e.runBlock(ctx, hosts, task, play) + return e.runBlock(ctx, hosts, task, play, failedHosts, totalHosts, maxFailPercent) } // Handle include/import if task.IncludeTasks != "" || task.ImportTasks != "" { - return e.runIncludeTasks(ctx, hosts, task, play) + return e.runIncludeTasks(ctx, hosts, task, play, failedHosts, totalHosts, maxFailPercent) } if task.IncludeRole != nil || task.ImportRole != nil { - return e.runIncludeRole(ctx, hosts, task, play) + return e.runIncludeRole(ctx, hosts, task, play, failedHosts, totalHosts, maxFailPercent) } + var ( + haveFailure bool + lastErr error + ) + for _, host := range hosts { + if failedHosts[host] { + continue + } if err := e.runTaskOnHost(ctx, host, task, play); err != nil { if !task.IgnoreErrors { - return err + failedHosts[host] = true + taskErr := coreerr.E("Executor.runTaskOnHosts", sprintf("task failed on %s: %s", host, err.Error()), err) + if maxFailPercent > 0 && exceedsMaxFailPercent(len(failedHosts), totalHosts, maxFailPercent) { + return coreerr.E("Executor.runTaskOnHosts", sprintf("max fail percentage exceeded: %d%% failed hosts of %d", len(failedHosts), totalHosts), taskErr) + } + if maxFailPercent > 0 { + haveFailure = true + lastErr = taskErr + continue + } + return taskErr } } } + if haveFailure { + return &hostFailureError{err: lastErr} + } + return nil } +func filterFailedHosts(hosts []string, failedHosts map[string]bool) []string { + if len(hosts) == 0 || len(failedHosts) == 0 { + return hosts + } + + filtered := make([]string, 0, len(hosts)) + for _, host := range hosts { + if !failedHosts[host] { + filtered = append(filtered, host) + } + } + return filtered +} + +func exceedsMaxFailPercent(failedHosts, totalHosts, maxFailPercent int) bool { + if maxFailPercent <= 0 || totalHosts <= 0 || failedHosts <= 0 { + return false + } + return failedHosts*100 > totalHosts*maxFailPercent +} + +type hostFailureError struct { + err error +} + +func (e *hostFailureError) Error() string { + if e == nil || e.err == nil { + return "host failure" + } + return e.err.Error() +} + +func (e *hostFailureError) Unwrap() error { + if e == nil { + return nil + } + return e.err +} + +func isHostFailureError(err error) bool { + var target *hostFailureError + return errors.As(err, &target) +} + // runTaskOnHost runs a task on a single host. func (e *Executor) runTaskOnHost(ctx context.Context, host string, task *Task, play *Play) error { start := time.Now() @@ -596,12 +672,12 @@ func (e *Executor) runLoop(ctx context.Context, host string, client *SSHClient, } // runBlock handles block/rescue/always. -func (e *Executor) runBlock(ctx context.Context, hosts []string, task *Task, play *Play) error { +func (e *Executor) runBlock(ctx context.Context, hosts []string, task *Task, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error { var blockErr error // Try block for _, t := range task.Block { - if err := e.runTaskOnHosts(ctx, hosts, &t, play); err != nil { + if err := e.runTaskOnHosts(ctx, hosts, &t, play, failedHosts, totalHosts, maxFailPercent); err != nil { blockErr = err break } @@ -610,7 +686,7 @@ func (e *Executor) runBlock(ctx context.Context, hosts []string, task *Task, pla // Run rescue if block failed if blockErr != nil && len(task.Rescue) > 0 { for _, t := range task.Rescue { - if err := e.runTaskOnHosts(ctx, hosts, &t, play); err != nil { + if err := e.runTaskOnHosts(ctx, hosts, &t, play, failedHosts, totalHosts, maxFailPercent); err != nil { // Rescue also failed break } @@ -619,7 +695,7 @@ func (e *Executor) runBlock(ctx context.Context, hosts []string, task *Task, pla // Always run always block for _, t := range task.Always { - if err := e.runTaskOnHosts(ctx, hosts, &t, play); err != nil { + if err := e.runTaskOnHosts(ctx, hosts, &t, play, failedHosts, totalHosts, maxFailPercent); err != nil { if blockErr == nil { blockErr = err } @@ -634,7 +710,7 @@ func (e *Executor) runBlock(ctx context.Context, hosts []string, task *Task, pla } // runIncludeTasks handles include_tasks/import_tasks. -func (e *Executor) runIncludeTasks(ctx context.Context, hosts []string, task *Task, play *Play) error { +func (e *Executor) runIncludeTasks(ctx context.Context, hosts []string, task *Task, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error { path := task.IncludeTasks if path == "" { path = task.ImportTasks @@ -649,7 +725,10 @@ func (e *Executor) runIncludeTasks(ctx context.Context, hosts []string, task *Ta } for _, t := range tasks { - if err := e.runTaskOnHosts(ctx, hosts, &t, play); err != nil { + if err := e.runTaskOnHosts(ctx, hosts, &t, play, failedHosts, totalHosts, maxFailPercent); err != nil { + if isHostFailureError(err) { + continue + } return err } } @@ -658,7 +737,7 @@ func (e *Executor) runIncludeTasks(ctx context.Context, hosts []string, task *Ta } // runIncludeRole handles include_role/import_role. -func (e *Executor) runIncludeRole(ctx context.Context, hosts []string, task *Task, play *Play) error { +func (e *Executor) runIncludeRole(ctx context.Context, hosts []string, task *Task, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error { var roleName, tasksFrom string var roleVars map[string]any @@ -678,7 +757,7 @@ func (e *Executor) runIncludeRole(ctx context.Context, hosts []string, task *Tas Vars: roleVars, } - return e.runRole(ctx, hosts, roleRef, play) + return e.runRole(ctx, hosts, roleRef, play, failedHosts, totalHosts, maxFailPercent) } // getHosts returns hosts matching the pattern. diff --git a/executor_test.go b/executor_test.go index 5d425c9..473030b 100644 --- a/executor_test.go +++ b/executor_test.go @@ -138,6 +138,64 @@ func TestExecutor_RunPlay_Good_SerialBatchesHosts(t *testing.T) { }, gathered) } +func TestExecutor_RunPlay_Good_MaxFailPercentStopsAfterThreshold(t *testing.T) { + e := NewExecutor("/tmp") + e.SetInventoryDirect(&Inventory{ + All: &InventoryGroup{ + Hosts: map[string]*Host{ + "host1": {Vars: map[string]any{"fail_first": true, "fail_second": false}}, + "host2": {Vars: map[string]any{"fail_first": false, "fail_second": true}}, + "host3": {Vars: map[string]any{"fail_first": false, "fail_second": false}}, + }, + }, + }) + + var executed []string + e.OnTaskStart = func(host string, task *Task) { + executed = append(executed, host+":"+task.Name) + } + + gatherFacts := false + play := &Play{ + Name: "max fail", + Hosts: "all", + GatherFacts: &gatherFacts, + MaxFailPercent: 50, + Tasks: []Task{ + { + Name: "first failure", + Module: "fail", + Args: map[string]any{"msg": "first"}, + When: "{{ fail_first }}", + }, + { + Name: "second failure", + Module: "fail", + Args: map[string]any{"msg": "second"}, + When: "{{ fail_second }}", + }, + { + Name: "final task", + Module: "debug", + Args: map[string]any{"msg": "ok"}, + }, + }, + } + + err := e.runPlay(context.Background(), play) + require.Error(t, err) + assert.Equal(t, []string{ + "host1:first failure", + "host2:first failure", + "host3:first failure", + "host2:second failure", + }, executed) + assert.NotContains(t, executed, "host3:second failure") + assert.NotContains(t, executed, "host1:final task") + assert.NotContains(t, executed, "host2:final task") + assert.NotContains(t, executed, "host3:final task") +} + func TestExecutor_RunPlay_Good_RunOnceTaskOnlyRunsOnFirstHost(t *testing.T) { e := NewExecutor("/tmp") e.SetInventoryDirect(&Inventory{