feat(ansible): honour max_fail_percentage
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
9246a91fec
commit
df8c055954
2 changed files with 157 additions and 20 deletions
119
executor.go
119
executor.go
|
|
@ -2,6 +2,7 @@ package ansible
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"regexp"
|
||||
"slices"
|
||||
"strconv"
|
||||
|
|
@ -130,6 +131,7 @@ func (e *Executor) runPlay(ctx context.Context, play *Play) error {
|
|||
if len(hosts) == 0 {
|
||||
return nil // No hosts matched
|
||||
}
|
||||
failedHosts := make(map[string]bool)
|
||||
|
||||
// Merge play vars
|
||||
for k, v := range play.Vars {
|
||||
|
|
@ -153,28 +155,28 @@ func (e *Executor) runPlay(ctx context.Context, play *Play) error {
|
|||
for _, batch := range e.hostBatches(hosts, play.Serial) {
|
||||
// Execute pre_tasks
|
||||
for _, task := range play.PreTasks {
|
||||
if err := e.runTaskOnHosts(ctx, batch, &task, play); err != nil {
|
||||
if err := e.runTaskOnHosts(ctx, batch, &task, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Execute roles
|
||||
for _, roleRef := range play.Roles {
|
||||
if err := e.runRole(ctx, batch, &roleRef, play); err != nil {
|
||||
if err := e.runRole(ctx, batch, &roleRef, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Execute tasks
|
||||
for _, task := range play.Tasks {
|
||||
if err := e.runTaskOnHosts(ctx, batch, &task, play); err != nil {
|
||||
if err := e.runTaskOnHosts(ctx, batch, &task, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Execute post_tasks
|
||||
for _, task := range play.PostTasks {
|
||||
if err := e.runTaskOnHosts(ctx, batch, &task, play); err != nil {
|
||||
if err := e.runTaskOnHosts(ctx, batch, &task, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -183,7 +185,7 @@ func (e *Executor) runPlay(ctx context.Context, play *Play) error {
|
|||
// Run notified handlers
|
||||
for _, handler := range play.Handlers {
|
||||
if e.notified[handler.Name] {
|
||||
if err := e.runTaskOnHosts(ctx, hosts, &handler, play); err != nil {
|
||||
if err := e.runTaskOnHosts(ctx, hosts, &handler, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -241,7 +243,7 @@ func serialBatchSize(serial any, hostCount int) int {
|
|||
}
|
||||
|
||||
// runRole executes a role on hosts.
|
||||
func (e *Executor) runRole(ctx context.Context, hosts []string, roleRef *RoleRef, play *Play) error {
|
||||
func (e *Executor) runRole(ctx context.Context, hosts []string, roleRef *RoleRef, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error {
|
||||
// Check when condition
|
||||
if roleRef.When != nil {
|
||||
if !e.evaluateWhen(roleRef.When, "", nil) {
|
||||
|
|
@ -268,7 +270,10 @@ func (e *Executor) runRole(ctx context.Context, hosts []string, roleRef *RoleRef
|
|||
for _, task := range tasks {
|
||||
task := task
|
||||
task.Tags = append(append([]string{}, roleRef.Tags...), task.Tags...)
|
||||
if err := e.runTaskOnHosts(ctx, hosts, &task, play); err != nil {
|
||||
if err := e.runTaskOnHosts(ctx, hosts, &task, play, failedHosts, totalHosts, maxFailPercent); err != nil {
|
||||
if isHostFailureError(err) {
|
||||
continue
|
||||
}
|
||||
// Restore vars
|
||||
e.vars = oldVars
|
||||
return err
|
||||
|
|
@ -281,7 +286,12 @@ func (e *Executor) runRole(ctx context.Context, hosts []string, roleRef *RoleRef
|
|||
}
|
||||
|
||||
// runTaskOnHosts runs a task on all hosts.
|
||||
func (e *Executor) runTaskOnHosts(ctx context.Context, hosts []string, task *Task, play *Play) error {
|
||||
func (e *Executor) runTaskOnHosts(ctx context.Context, hosts []string, task *Task, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error {
|
||||
hosts = filterFailedHosts(hosts, failedHosts)
|
||||
if len(hosts) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// run_once executes the task only on the first host in the current batch.
|
||||
if task.RunOnce && len(hosts) > 1 {
|
||||
hosts = hosts[:1]
|
||||
|
|
@ -295,28 +305,94 @@ func (e *Executor) runTaskOnHosts(ctx context.Context, hosts []string, task *Tas
|
|||
|
||||
// Handle block tasks
|
||||
if len(task.Block) > 0 {
|
||||
return e.runBlock(ctx, hosts, task, play)
|
||||
return e.runBlock(ctx, hosts, task, play, failedHosts, totalHosts, maxFailPercent)
|
||||
}
|
||||
|
||||
// Handle include/import
|
||||
if task.IncludeTasks != "" || task.ImportTasks != "" {
|
||||
return e.runIncludeTasks(ctx, hosts, task, play)
|
||||
return e.runIncludeTasks(ctx, hosts, task, play, failedHosts, totalHosts, maxFailPercent)
|
||||
}
|
||||
if task.IncludeRole != nil || task.ImportRole != nil {
|
||||
return e.runIncludeRole(ctx, hosts, task, play)
|
||||
return e.runIncludeRole(ctx, hosts, task, play, failedHosts, totalHosts, maxFailPercent)
|
||||
}
|
||||
|
||||
var (
|
||||
haveFailure bool
|
||||
lastErr error
|
||||
)
|
||||
|
||||
for _, host := range hosts {
|
||||
if failedHosts[host] {
|
||||
continue
|
||||
}
|
||||
if err := e.runTaskOnHost(ctx, host, task, play); err != nil {
|
||||
if !task.IgnoreErrors {
|
||||
return err
|
||||
failedHosts[host] = true
|
||||
taskErr := coreerr.E("Executor.runTaskOnHosts", sprintf("task failed on %s: %s", host, err.Error()), err)
|
||||
if maxFailPercent > 0 && exceedsMaxFailPercent(len(failedHosts), totalHosts, maxFailPercent) {
|
||||
return coreerr.E("Executor.runTaskOnHosts", sprintf("max fail percentage exceeded: %d%% failed hosts of %d", len(failedHosts), totalHosts), taskErr)
|
||||
}
|
||||
if maxFailPercent > 0 {
|
||||
haveFailure = true
|
||||
lastErr = taskErr
|
||||
continue
|
||||
}
|
||||
return taskErr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if haveFailure {
|
||||
return &hostFailureError{err: lastErr}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func filterFailedHosts(hosts []string, failedHosts map[string]bool) []string {
|
||||
if len(hosts) == 0 || len(failedHosts) == 0 {
|
||||
return hosts
|
||||
}
|
||||
|
||||
filtered := make([]string, 0, len(hosts))
|
||||
for _, host := range hosts {
|
||||
if !failedHosts[host] {
|
||||
filtered = append(filtered, host)
|
||||
}
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
func exceedsMaxFailPercent(failedHosts, totalHosts, maxFailPercent int) bool {
|
||||
if maxFailPercent <= 0 || totalHosts <= 0 || failedHosts <= 0 {
|
||||
return false
|
||||
}
|
||||
return failedHosts*100 > totalHosts*maxFailPercent
|
||||
}
|
||||
|
||||
type hostFailureError struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (e *hostFailureError) Error() string {
|
||||
if e == nil || e.err == nil {
|
||||
return "host failure"
|
||||
}
|
||||
return e.err.Error()
|
||||
}
|
||||
|
||||
func (e *hostFailureError) Unwrap() error {
|
||||
if e == nil {
|
||||
return nil
|
||||
}
|
||||
return e.err
|
||||
}
|
||||
|
||||
func isHostFailureError(err error) bool {
|
||||
var target *hostFailureError
|
||||
return errors.As(err, &target)
|
||||
}
|
||||
|
||||
// runTaskOnHost runs a task on a single host.
|
||||
func (e *Executor) runTaskOnHost(ctx context.Context, host string, task *Task, play *Play) error {
|
||||
start := time.Now()
|
||||
|
|
@ -596,12 +672,12 @@ func (e *Executor) runLoop(ctx context.Context, host string, client *SSHClient,
|
|||
}
|
||||
|
||||
// runBlock handles block/rescue/always.
|
||||
func (e *Executor) runBlock(ctx context.Context, hosts []string, task *Task, play *Play) error {
|
||||
func (e *Executor) runBlock(ctx context.Context, hosts []string, task *Task, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error {
|
||||
var blockErr error
|
||||
|
||||
// Try block
|
||||
for _, t := range task.Block {
|
||||
if err := e.runTaskOnHosts(ctx, hosts, &t, play); err != nil {
|
||||
if err := e.runTaskOnHosts(ctx, hosts, &t, play, failedHosts, totalHosts, maxFailPercent); err != nil {
|
||||
blockErr = err
|
||||
break
|
||||
}
|
||||
|
|
@ -610,7 +686,7 @@ func (e *Executor) runBlock(ctx context.Context, hosts []string, task *Task, pla
|
|||
// Run rescue if block failed
|
||||
if blockErr != nil && len(task.Rescue) > 0 {
|
||||
for _, t := range task.Rescue {
|
||||
if err := e.runTaskOnHosts(ctx, hosts, &t, play); err != nil {
|
||||
if err := e.runTaskOnHosts(ctx, hosts, &t, play, failedHosts, totalHosts, maxFailPercent); err != nil {
|
||||
// Rescue also failed
|
||||
break
|
||||
}
|
||||
|
|
@ -619,7 +695,7 @@ func (e *Executor) runBlock(ctx context.Context, hosts []string, task *Task, pla
|
|||
|
||||
// Always run always block
|
||||
for _, t := range task.Always {
|
||||
if err := e.runTaskOnHosts(ctx, hosts, &t, play); err != nil {
|
||||
if err := e.runTaskOnHosts(ctx, hosts, &t, play, failedHosts, totalHosts, maxFailPercent); err != nil {
|
||||
if blockErr == nil {
|
||||
blockErr = err
|
||||
}
|
||||
|
|
@ -634,7 +710,7 @@ func (e *Executor) runBlock(ctx context.Context, hosts []string, task *Task, pla
|
|||
}
|
||||
|
||||
// runIncludeTasks handles include_tasks/import_tasks.
|
||||
func (e *Executor) runIncludeTasks(ctx context.Context, hosts []string, task *Task, play *Play) error {
|
||||
func (e *Executor) runIncludeTasks(ctx context.Context, hosts []string, task *Task, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error {
|
||||
path := task.IncludeTasks
|
||||
if path == "" {
|
||||
path = task.ImportTasks
|
||||
|
|
@ -649,7 +725,10 @@ func (e *Executor) runIncludeTasks(ctx context.Context, hosts []string, task *Ta
|
|||
}
|
||||
|
||||
for _, t := range tasks {
|
||||
if err := e.runTaskOnHosts(ctx, hosts, &t, play); err != nil {
|
||||
if err := e.runTaskOnHosts(ctx, hosts, &t, play, failedHosts, totalHosts, maxFailPercent); err != nil {
|
||||
if isHostFailureError(err) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -658,7 +737,7 @@ func (e *Executor) runIncludeTasks(ctx context.Context, hosts []string, task *Ta
|
|||
}
|
||||
|
||||
// runIncludeRole handles include_role/import_role.
|
||||
func (e *Executor) runIncludeRole(ctx context.Context, hosts []string, task *Task, play *Play) error {
|
||||
func (e *Executor) runIncludeRole(ctx context.Context, hosts []string, task *Task, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error {
|
||||
var roleName, tasksFrom string
|
||||
var roleVars map[string]any
|
||||
|
||||
|
|
@ -678,7 +757,7 @@ func (e *Executor) runIncludeRole(ctx context.Context, hosts []string, task *Tas
|
|||
Vars: roleVars,
|
||||
}
|
||||
|
||||
return e.runRole(ctx, hosts, roleRef, play)
|
||||
return e.runRole(ctx, hosts, roleRef, play, failedHosts, totalHosts, maxFailPercent)
|
||||
}
|
||||
|
||||
// getHosts returns hosts matching the pattern.
|
||||
|
|
|
|||
|
|
@ -138,6 +138,64 @@ func TestExecutor_RunPlay_Good_SerialBatchesHosts(t *testing.T) {
|
|||
}, gathered)
|
||||
}
|
||||
|
||||
func TestExecutor_RunPlay_Good_MaxFailPercentStopsAfterThreshold(t *testing.T) {
|
||||
e := NewExecutor("/tmp")
|
||||
e.SetInventoryDirect(&Inventory{
|
||||
All: &InventoryGroup{
|
||||
Hosts: map[string]*Host{
|
||||
"host1": {Vars: map[string]any{"fail_first": true, "fail_second": false}},
|
||||
"host2": {Vars: map[string]any{"fail_first": false, "fail_second": true}},
|
||||
"host3": {Vars: map[string]any{"fail_first": false, "fail_second": false}},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
var executed []string
|
||||
e.OnTaskStart = func(host string, task *Task) {
|
||||
executed = append(executed, host+":"+task.Name)
|
||||
}
|
||||
|
||||
gatherFacts := false
|
||||
play := &Play{
|
||||
Name: "max fail",
|
||||
Hosts: "all",
|
||||
GatherFacts: &gatherFacts,
|
||||
MaxFailPercent: 50,
|
||||
Tasks: []Task{
|
||||
{
|
||||
Name: "first failure",
|
||||
Module: "fail",
|
||||
Args: map[string]any{"msg": "first"},
|
||||
When: "{{ fail_first }}",
|
||||
},
|
||||
{
|
||||
Name: "second failure",
|
||||
Module: "fail",
|
||||
Args: map[string]any{"msg": "second"},
|
||||
When: "{{ fail_second }}",
|
||||
},
|
||||
{
|
||||
Name: "final task",
|
||||
Module: "debug",
|
||||
Args: map[string]any{"msg": "ok"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := e.runPlay(context.Background(), play)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, []string{
|
||||
"host1:first failure",
|
||||
"host2:first failure",
|
||||
"host3:first failure",
|
||||
"host2:second failure",
|
||||
}, executed)
|
||||
assert.NotContains(t, executed, "host3:second failure")
|
||||
assert.NotContains(t, executed, "host1:final task")
|
||||
assert.NotContains(t, executed, "host2:final task")
|
||||
assert.NotContains(t, executed, "host3:final task")
|
||||
}
|
||||
|
||||
func TestExecutor_RunPlay_Good_RunOnceTaskOnlyRunsOnFirstHost(t *testing.T) {
|
||||
e := NewExecutor("/tmp")
|
||||
e.SetInventoryDirect(&Inventory{
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue