1323 lines
30 KiB
Go
1323 lines
30 KiB
Go
package ansible
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"regexp"
|
|
"slices"
|
|
"strconv"
|
|
"sync"
|
|
"text/template"
|
|
"time"
|
|
|
|
coreerr "dappco.re/go/core/log"
|
|
)
|
|
|
|
// Executor runs Ansible playbooks.
|
|
//
|
|
// Example:
|
|
//
|
|
// exec := NewExecutor("/workspace/playbooks")
|
|
type Executor struct {
|
|
parser *Parser
|
|
inventory *Inventory
|
|
vars map[string]any
|
|
facts map[string]*Facts
|
|
results map[string]map[string]*TaskResult // host -> register_name -> result
|
|
handlers map[string][]Task
|
|
notified map[string]bool
|
|
clients map[string]*SSHClient
|
|
mu sync.RWMutex
|
|
|
|
// Callbacks
|
|
OnPlayStart func(play *Play)
|
|
OnTaskStart func(host string, task *Task)
|
|
OnTaskEnd func(host string, task *Task, result *TaskResult)
|
|
OnPlayEnd func(play *Play)
|
|
|
|
// Options
|
|
Limit string
|
|
Tags []string
|
|
SkipTags []string
|
|
CheckMode bool
|
|
Diff bool
|
|
Verbose int
|
|
}
|
|
|
|
// NewExecutor creates a new playbook executor.
|
|
//
|
|
// Example:
|
|
//
|
|
// exec := NewExecutor("/workspace/playbooks")
|
|
func NewExecutor(basePath string) *Executor {
|
|
return &Executor{
|
|
parser: NewParser(basePath),
|
|
vars: make(map[string]any),
|
|
facts: make(map[string]*Facts),
|
|
results: make(map[string]map[string]*TaskResult),
|
|
handlers: make(map[string][]Task),
|
|
notified: make(map[string]bool),
|
|
clients: make(map[string]*SSHClient),
|
|
}
|
|
}
|
|
|
|
// SetInventory loads inventory from a file.
|
|
//
|
|
// Example:
|
|
//
|
|
// err := exec.SetInventory("/workspace/inventory.yml")
|
|
func (e *Executor) SetInventory(path string) error {
|
|
inv, err := e.parser.ParseInventory(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
e.inventory = inv
|
|
return nil
|
|
}
|
|
|
|
// SetInventoryDirect sets inventory directly.
|
|
//
|
|
// Example:
|
|
//
|
|
// exec.SetInventoryDirect(&Inventory{All: &InventoryGroup{}})
|
|
func (e *Executor) SetInventoryDirect(inv *Inventory) {
|
|
e.inventory = inv
|
|
}
|
|
|
|
// SetVar sets a variable.
|
|
//
|
|
// Example:
|
|
//
|
|
// exec.SetVar("env", "prod")
|
|
func (e *Executor) SetVar(key string, value any) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
e.vars[key] = value
|
|
}
|
|
|
|
// Run executes a playbook.
|
|
//
|
|
// Example:
|
|
//
|
|
// err := exec.Run(context.Background(), "/workspace/playbooks/site.yml")
|
|
func (e *Executor) Run(ctx context.Context, playbookPath string) error {
|
|
plays, err := e.parser.ParsePlaybook(playbookPath)
|
|
if err != nil {
|
|
return coreerr.E("Executor.Run", "parse playbook", err)
|
|
}
|
|
|
|
for i := range plays {
|
|
if err := e.runPlay(ctx, &plays[i]); err != nil {
|
|
return coreerr.E("Executor.Run", sprintf("play %d (%s)", i, plays[i].Name), err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// runPlay executes a single play.
|
|
func (e *Executor) runPlay(ctx context.Context, play *Play) error {
|
|
if e.OnPlayStart != nil {
|
|
e.OnPlayStart(play)
|
|
}
|
|
defer func() {
|
|
if e.OnPlayEnd != nil {
|
|
e.OnPlayEnd(play)
|
|
}
|
|
}()
|
|
|
|
// Get target hosts
|
|
hosts := e.getHosts(play.Hosts)
|
|
if len(hosts) == 0 {
|
|
return nil // No hosts matched
|
|
}
|
|
failedHosts := make(map[string]bool)
|
|
|
|
// Merge play vars
|
|
for k, v := range play.Vars {
|
|
e.vars[k] = v
|
|
}
|
|
|
|
// Gather facts if needed
|
|
gatherFacts := play.GatherFacts == nil || *play.GatherFacts
|
|
if gatherFacts {
|
|
for _, host := range hosts {
|
|
if err := e.gatherFacts(ctx, host, play); err != nil {
|
|
// Non-fatal
|
|
if e.Verbose > 0 {
|
|
coreerr.Warn("gather facts failed", "host", host, "err", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Execute hosts in serial batches when requested.
|
|
for _, batch := range e.hostBatches(hosts, play.Serial) {
|
|
// Execute pre_tasks
|
|
for _, task := range play.PreTasks {
|
|
if err := e.runTaskOnHosts(ctx, batch, &task, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Execute roles
|
|
for _, roleRef := range play.Roles {
|
|
if err := e.runRole(ctx, batch, &roleRef, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Execute tasks
|
|
for _, task := range play.Tasks {
|
|
if err := e.runTaskOnHosts(ctx, batch, &task, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Execute post_tasks
|
|
for _, task := range play.PostTasks {
|
|
if err := e.runTaskOnHosts(ctx, batch, &task, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Run notified handlers
|
|
for _, handler := range play.Handlers {
|
|
if e.notified[handler.Name] {
|
|
if err := e.runTaskOnHosts(ctx, hosts, &handler, play, failedHosts, len(hosts), play.MaxFailPercent); err != nil && !isHostFailureError(err) {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// hostBatches returns the host list split into serial batches.
|
|
func (e *Executor) hostBatches(hosts []string, serial any) [][]string {
|
|
batchSize := serialBatchSize(serial, len(hosts))
|
|
if batchSize >= len(hosts) {
|
|
return [][]string{hosts}
|
|
}
|
|
|
|
batches := make([][]string, 0, (len(hosts)+batchSize-1)/batchSize)
|
|
for start := 0; start < len(hosts); start += batchSize {
|
|
end := start + batchSize
|
|
if end > len(hosts) {
|
|
end = len(hosts)
|
|
}
|
|
batch := make([]string, end-start)
|
|
copy(batch, hosts[start:end])
|
|
batches = append(batches, batch)
|
|
}
|
|
return batches
|
|
}
|
|
|
|
// serialBatchSize normalises the play serial value to a usable batch size.
|
|
func serialBatchSize(serial any, hostCount int) int {
|
|
switch v := serial.(type) {
|
|
case int:
|
|
if v > 0 {
|
|
if v > hostCount {
|
|
return hostCount
|
|
}
|
|
return v
|
|
}
|
|
case int64:
|
|
if v > 0 {
|
|
if int(v) > hostCount {
|
|
return hostCount
|
|
}
|
|
return int(v)
|
|
}
|
|
case string:
|
|
if n, err := strconv.Atoi(corexTrimSpace(v)); err == nil && n > 0 {
|
|
if n > hostCount {
|
|
return hostCount
|
|
}
|
|
return n
|
|
}
|
|
}
|
|
return hostCount
|
|
}
|
|
|
|
// runRole executes a role on hosts.
|
|
func (e *Executor) runRole(ctx context.Context, hosts []string, roleRef *RoleRef, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error {
|
|
// Check when condition
|
|
if roleRef.When != nil {
|
|
if !e.evaluateWhen(roleRef.When, "", nil) {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Parse role tasks
|
|
tasks, err := e.parser.ParseRole(roleRef.Role, roleRef.TasksFrom)
|
|
if err != nil {
|
|
return coreerr.E("executor.runRole", sprintf("parse role %s", roleRef.Role), err)
|
|
}
|
|
|
|
// Merge role vars
|
|
oldVars := make(map[string]any)
|
|
for k, v := range e.vars {
|
|
oldVars[k] = v
|
|
}
|
|
for k, v := range roleRef.Vars {
|
|
e.vars[k] = v
|
|
}
|
|
|
|
// Execute tasks
|
|
for _, task := range tasks {
|
|
task := task
|
|
task.Tags = append(append([]string{}, roleRef.Tags...), task.Tags...)
|
|
if err := e.runTaskOnHosts(ctx, hosts, &task, play, failedHosts, totalHosts, maxFailPercent); err != nil {
|
|
if isHostFailureError(err) {
|
|
continue
|
|
}
|
|
// Restore vars
|
|
e.vars = oldVars
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Restore vars
|
|
e.vars = oldVars
|
|
return nil
|
|
}
|
|
|
|
// runTaskOnHosts runs a task on all hosts.
|
|
func (e *Executor) runTaskOnHosts(ctx context.Context, hosts []string, task *Task, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error {
|
|
hosts = filterFailedHosts(hosts, failedHosts)
|
|
if len(hosts) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// run_once executes the task only on the first host in the current batch.
|
|
if task.RunOnce && len(hosts) > 1 {
|
|
hosts = hosts[:1]
|
|
}
|
|
|
|
// Check tags
|
|
tags := append(append([]string{}, play.Tags...), task.Tags...)
|
|
if !e.matchesTags(tags) {
|
|
return nil
|
|
}
|
|
|
|
// Handle block tasks
|
|
if len(task.Block) > 0 {
|
|
return e.runBlock(ctx, hosts, task, play, failedHosts, totalHosts, maxFailPercent)
|
|
}
|
|
|
|
// Handle include/import
|
|
if task.IncludeTasks != "" || task.ImportTasks != "" {
|
|
return e.runIncludeTasks(ctx, hosts, task, play, failedHosts, totalHosts, maxFailPercent)
|
|
}
|
|
if task.IncludeRole != nil || task.ImportRole != nil {
|
|
return e.runIncludeRole(ctx, hosts, task, play, failedHosts, totalHosts, maxFailPercent)
|
|
}
|
|
|
|
var (
|
|
haveFailure bool
|
|
lastErr error
|
|
)
|
|
|
|
for _, host := range hosts {
|
|
if failedHosts[host] {
|
|
continue
|
|
}
|
|
if err := e.runTaskOnHost(ctx, host, task, play); err != nil {
|
|
if !task.IgnoreErrors {
|
|
failedHosts[host] = true
|
|
taskErr := coreerr.E("Executor.runTaskOnHosts", sprintf("task failed on %s: %s", host, err.Error()), err)
|
|
if maxFailPercent > 0 && exceedsMaxFailPercent(len(failedHosts), totalHosts, maxFailPercent) {
|
|
return coreerr.E("Executor.runTaskOnHosts", sprintf("max fail percentage exceeded: %d%% failed hosts of %d", len(failedHosts), totalHosts), taskErr)
|
|
}
|
|
if maxFailPercent > 0 {
|
|
haveFailure = true
|
|
lastErr = taskErr
|
|
continue
|
|
}
|
|
return taskErr
|
|
}
|
|
}
|
|
}
|
|
|
|
if haveFailure {
|
|
return &hostFailureError{err: lastErr}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// filterFailedHosts returns the hosts that are not marked true in failedHosts,
// preserving order. When either argument is empty the input slice is returned
// unchanged; otherwise a new slice is allocated.
func filterFailedHosts(hosts []string, failedHosts map[string]bool) []string {
	if len(failedHosts) == 0 || len(hosts) == 0 {
		return hosts
	}

	healthy := make([]string, 0, len(hosts))
	for i := range hosts {
		if failedHosts[hosts[i]] {
			continue
		}
		healthy = append(healthy, hosts[i])
	}
	return healthy
}
|
|
|
|
// exceedsMaxFailPercent reports whether failedHosts out of totalHosts is
// strictly more than maxFailPercent percent. It is false whenever any of the
// three arguments is zero or negative.
func exceedsMaxFailPercent(failedHosts, totalHosts, maxFailPercent int) bool {
	applicable := maxFailPercent > 0 && totalHosts > 0 && failedHosts > 0
	return applicable && failedHosts*100 > totalHosts*maxFailPercent
}
|
|
|
|
// hostFailureError marks an error as a per-host task failure that should not
// abort the whole play. It wraps the underlying task error.
type hostFailureError struct {
	err error
}

// Error returns the wrapped error's message, or "host failure" when the
// receiver or its wrapped error is nil.
func (e *hostFailureError) Error() string {
	if e != nil && e.err != nil {
		return e.err.Error()
	}
	return "host failure"
}

// Unwrap exposes the wrapped error to errors.Is / errors.As; it returns nil
// for a nil receiver.
func (e *hostFailureError) Unwrap() error {
	if e != nil {
		return e.err
	}
	return nil
}

// isHostFailureError reports whether err, or any error in its chain, is a
// *hostFailureError.
func isHostFailureError(err error) bool {
	var hostFailure *hostFailureError
	return errors.As(err, &hostFailure)
}
|
|
|
|
// runTaskOnHost runs a task on a single host.
|
|
func (e *Executor) runTaskOnHost(ctx context.Context, host string, task *Task, play *Play) error {
|
|
start := time.Now()
|
|
|
|
if e.OnTaskStart != nil {
|
|
e.OnTaskStart(host, task)
|
|
}
|
|
|
|
// Initialise host results
|
|
if e.results[host] == nil {
|
|
e.results[host] = make(map[string]*TaskResult)
|
|
}
|
|
|
|
// Check when condition
|
|
if task.When != nil {
|
|
if !e.evaluateWhen(task.When, host, task) {
|
|
result := &TaskResult{Skipped: true, Msg: "Skipped due to when condition"}
|
|
if task.Register != "" {
|
|
e.results[host][task.Register] = result
|
|
}
|
|
if e.OnTaskEnd != nil {
|
|
e.OnTaskEnd(host, task, result)
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Get SSH client
|
|
client, err := e.getClient(host, play)
|
|
if err != nil {
|
|
return coreerr.E("Executor.runTaskOnHost", sprintf("get client for %s", host), err)
|
|
}
|
|
|
|
// Handle loops
|
|
if task.Loop != nil {
|
|
return e.runLoop(ctx, host, client, task, play)
|
|
}
|
|
|
|
// Execute the task, retrying when an until condition is present.
|
|
result, err := e.runTaskWithUntil(ctx, host, client, task, play)
|
|
if err != nil {
|
|
result = &TaskResult{Failed: true, Msg: err.Error()}
|
|
}
|
|
result.Duration = time.Since(start)
|
|
|
|
// Store result
|
|
if task.Register != "" {
|
|
e.results[host][task.Register] = result
|
|
}
|
|
|
|
// Handle notify
|
|
if result.Changed && task.Notify != nil {
|
|
e.handleNotify(task.Notify)
|
|
}
|
|
|
|
if e.OnTaskEnd != nil {
|
|
e.OnTaskEnd(host, task, result)
|
|
}
|
|
|
|
if result.Failed && !task.IgnoreErrors {
|
|
return coreerr.E("Executor.runTaskOnHost", "task failed: "+result.Msg, nil)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// runTaskWithUntil executes a task once or retries it until the until
|
|
// condition evaluates to true.
|
|
func (e *Executor) runTaskWithUntil(ctx context.Context, host string, client *SSHClient, task *Task, play *Play) (*TaskResult, error) {
|
|
if task.Until == "" {
|
|
return e.executeModule(ctx, host, client, task, play)
|
|
}
|
|
|
|
retries := task.Retries
|
|
if retries <= 0 {
|
|
retries = 3
|
|
}
|
|
|
|
delay := task.Delay
|
|
if delay <= 0 {
|
|
delay = 1
|
|
}
|
|
|
|
restoreAlias := task.Register != "result"
|
|
var (
|
|
previousAlias *TaskResult
|
|
hadAlias bool
|
|
)
|
|
if restoreAlias {
|
|
e.mu.RLock()
|
|
if hostResults, ok := e.results[host]; ok {
|
|
previousAlias, hadAlias = hostResults["result"]
|
|
}
|
|
e.mu.RUnlock()
|
|
defer func() {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
if e.results[host] == nil {
|
|
e.results[host] = make(map[string]*TaskResult)
|
|
}
|
|
if hadAlias {
|
|
e.results[host]["result"] = previousAlias
|
|
} else {
|
|
delete(e.results[host], "result")
|
|
}
|
|
}()
|
|
}
|
|
|
|
lastResult, lastErr := retryTask(ctx, retries, delay, func() (*TaskResult, error) {
|
|
result, err := e.executeModule(ctx, host, client, task, play)
|
|
if err != nil {
|
|
result = &TaskResult{Failed: true, Msg: err.Error()}
|
|
}
|
|
if result == nil {
|
|
result = &TaskResult{}
|
|
}
|
|
e.setTempResult(host, task.Register, result)
|
|
return result, nil
|
|
}, func(result *TaskResult) bool {
|
|
return e.evaluateWhen(task.Until, host, task)
|
|
})
|
|
|
|
if lastErr != nil {
|
|
return lastResult, lastErr
|
|
}
|
|
|
|
if lastResult == nil {
|
|
lastResult = &TaskResult{}
|
|
}
|
|
lastResult.Failed = true
|
|
if lastResult.Msg == "" {
|
|
lastResult.Msg = sprintf("until condition not met: %s", task.Until)
|
|
}
|
|
return lastResult, nil
|
|
}
|
|
|
|
// setTempResult exposes the latest task result for until evaluation without
|
|
// leaving stale state behind when a different register name is used.
|
|
func (e *Executor) setTempResult(host string, register string, result *TaskResult) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
if e.results[host] == nil {
|
|
e.results[host] = make(map[string]*TaskResult)
|
|
}
|
|
e.results[host]["result"] = result
|
|
|
|
if register != "" {
|
|
e.results[host][register] = result
|
|
}
|
|
}
|
|
|
|
// retryTask runs fn until done returns true or the retry budget is exhausted.
|
|
func retryTask(ctx context.Context, retries, delay int, fn func() (*TaskResult, error), done func(*TaskResult) bool) (*TaskResult, error) {
|
|
if retries < 0 {
|
|
retries = 0
|
|
}
|
|
if delay < 0 {
|
|
delay = 0
|
|
}
|
|
|
|
var lastResult *TaskResult
|
|
var lastErr error
|
|
|
|
for attempt := 0; attempt <= retries; attempt++ {
|
|
lastResult, lastErr = fn()
|
|
if lastErr != nil {
|
|
return lastResult, lastErr
|
|
}
|
|
if lastResult == nil {
|
|
lastResult = &TaskResult{}
|
|
}
|
|
if done == nil || done(lastResult) {
|
|
return lastResult, nil
|
|
}
|
|
if attempt < retries && delay > 0 {
|
|
if err := sleepWithContext(ctx, time.Duration(delay)*time.Second); err != nil {
|
|
return lastResult, err
|
|
}
|
|
}
|
|
}
|
|
|
|
return lastResult, nil
|
|
}
|
|
|
|
// sleepWithContext blocks until d has elapsed, returning nil, or until ctx is
// done, returning ctx.Err(). The timer is always stopped before returning.
func sleepWithContext(ctx context.Context, d time.Duration) error {
	wait := time.NewTimer(d)
	defer wait.Stop()

	select {
	case <-wait.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
|
|
|
|
// runLoop handles task loops.
|
|
func (e *Executor) runLoop(ctx context.Context, host string, client *SSHClient, task *Task, play *Play) error {
|
|
items := e.resolveLoop(task.Loop, host)
|
|
|
|
loopVar := "item"
|
|
if task.LoopControl != nil && task.LoopControl.LoopVar != "" {
|
|
loopVar = task.LoopControl.LoopVar
|
|
}
|
|
|
|
// Save loop state to restore after loop
|
|
savedVars := make(map[string]any)
|
|
if v, ok := e.vars[loopVar]; ok {
|
|
savedVars[loopVar] = v
|
|
}
|
|
indexVar := ""
|
|
if task.LoopControl != nil && task.LoopControl.IndexVar != "" {
|
|
indexVar = task.LoopControl.IndexVar
|
|
if v, ok := e.vars[indexVar]; ok {
|
|
savedVars[indexVar] = v
|
|
}
|
|
}
|
|
|
|
var results []TaskResult
|
|
for i, item := range items {
|
|
// Set loop variables
|
|
e.vars[loopVar] = item
|
|
if indexVar != "" {
|
|
e.vars[indexVar] = i
|
|
}
|
|
|
|
result, err := e.executeModule(ctx, host, client, task, play)
|
|
if err != nil {
|
|
result = &TaskResult{Failed: true, Msg: err.Error()}
|
|
}
|
|
results = append(results, *result)
|
|
|
|
if result.Failed && !task.IgnoreErrors {
|
|
break
|
|
}
|
|
|
|
if task.LoopControl != nil && task.LoopControl.Pause > 0 && i < len(items)-1 {
|
|
time.Sleep(time.Duration(task.LoopControl.Pause) * time.Second)
|
|
}
|
|
}
|
|
|
|
// Restore loop variables
|
|
if v, ok := savedVars[loopVar]; ok {
|
|
e.vars[loopVar] = v
|
|
} else {
|
|
delete(e.vars, loopVar)
|
|
}
|
|
if indexVar != "" {
|
|
if v, ok := savedVars[indexVar]; ok {
|
|
e.vars[indexVar] = v
|
|
} else {
|
|
delete(e.vars, indexVar)
|
|
}
|
|
}
|
|
|
|
// Store combined result
|
|
if task.Register != "" {
|
|
combined := &TaskResult{
|
|
Results: results,
|
|
Changed: false,
|
|
}
|
|
for _, r := range results {
|
|
if r.Changed {
|
|
combined.Changed = true
|
|
}
|
|
if r.Failed {
|
|
combined.Failed = true
|
|
}
|
|
}
|
|
e.results[host][task.Register] = combined
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// runBlock handles block/rescue/always.
|
|
func (e *Executor) runBlock(ctx context.Context, hosts []string, task *Task, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error {
|
|
var blockErr error
|
|
|
|
// Try block
|
|
for _, t := range task.Block {
|
|
if err := e.runTaskOnHosts(ctx, hosts, &t, play, failedHosts, totalHosts, maxFailPercent); err != nil {
|
|
blockErr = err
|
|
break
|
|
}
|
|
}
|
|
|
|
// Run rescue if block failed
|
|
if blockErr != nil && len(task.Rescue) > 0 {
|
|
for _, t := range task.Rescue {
|
|
if err := e.runTaskOnHosts(ctx, hosts, &t, play, failedHosts, totalHosts, maxFailPercent); err != nil {
|
|
// Rescue also failed
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Always run always block
|
|
for _, t := range task.Always {
|
|
if err := e.runTaskOnHosts(ctx, hosts, &t, play, failedHosts, totalHosts, maxFailPercent); err != nil {
|
|
if blockErr == nil {
|
|
blockErr = err
|
|
}
|
|
}
|
|
}
|
|
|
|
if blockErr != nil && len(task.Rescue) == 0 {
|
|
return blockErr
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// runIncludeTasks handles include_tasks/import_tasks.
|
|
func (e *Executor) runIncludeTasks(ctx context.Context, hosts []string, task *Task, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error {
|
|
path := task.IncludeTasks
|
|
if path == "" {
|
|
path = task.ImportTasks
|
|
}
|
|
|
|
// Resolve path relative to playbook
|
|
path = e.templateString(path, "", nil)
|
|
|
|
tasks, err := e.parser.ParseTasks(path)
|
|
if err != nil {
|
|
return coreerr.E("Executor.runIncludeTasks", "include_tasks "+path, err)
|
|
}
|
|
|
|
for _, t := range tasks {
|
|
if err := e.runTaskOnHosts(ctx, hosts, &t, play, failedHosts, totalHosts, maxFailPercent); err != nil {
|
|
if isHostFailureError(err) {
|
|
continue
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// runIncludeRole handles include_role/import_role.
|
|
func (e *Executor) runIncludeRole(ctx context.Context, hosts []string, task *Task, play *Play, failedHosts map[string]bool, totalHosts int, maxFailPercent int) error {
|
|
var roleName, tasksFrom string
|
|
var roleVars map[string]any
|
|
|
|
if task.IncludeRole != nil {
|
|
roleName = task.IncludeRole.Name
|
|
tasksFrom = task.IncludeRole.TasksFrom
|
|
roleVars = task.IncludeRole.Vars
|
|
} else {
|
|
roleName = task.ImportRole.Name
|
|
tasksFrom = task.ImportRole.TasksFrom
|
|
roleVars = task.ImportRole.Vars
|
|
}
|
|
|
|
roleRef := &RoleRef{
|
|
Role: roleName,
|
|
TasksFrom: tasksFrom,
|
|
Vars: roleVars,
|
|
}
|
|
|
|
return e.runRole(ctx, hosts, roleRef, play, failedHosts, totalHosts, maxFailPercent)
|
|
}
|
|
|
|
// getHosts returns hosts matching the pattern.
|
|
func (e *Executor) getHosts(pattern string) []string {
|
|
if e.inventory == nil {
|
|
if pattern == "localhost" {
|
|
return []string{"localhost"}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
hosts := GetHosts(e.inventory, pattern)
|
|
|
|
// Apply limit - filter to hosts that are also in the limit group
|
|
if e.Limit != "" {
|
|
limitHosts := GetHosts(e.inventory, e.Limit)
|
|
limitSet := make(map[string]bool)
|
|
for _, h := range limitHosts {
|
|
limitSet[h] = true
|
|
}
|
|
|
|
var filtered []string
|
|
for _, h := range hosts {
|
|
if limitSet[h] || h == e.Limit || contains(h, e.Limit) {
|
|
filtered = append(filtered, h)
|
|
}
|
|
}
|
|
hosts = filtered
|
|
}
|
|
|
|
return hosts
|
|
}
|
|
|
|
// getClient returns or creates an SSH client for a host.
|
|
func (e *Executor) getClient(host string, play *Play) (*SSHClient, error) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
if client, ok := e.clients[host]; ok {
|
|
return client, nil
|
|
}
|
|
|
|
// Get host vars
|
|
vars := make(map[string]any)
|
|
if e.inventory != nil {
|
|
vars = GetHostVars(e.inventory, host)
|
|
}
|
|
|
|
// Merge with play vars
|
|
for k, v := range e.vars {
|
|
if _, exists := vars[k]; !exists {
|
|
vars[k] = v
|
|
}
|
|
}
|
|
|
|
// Build SSH config
|
|
cfg := SSHConfig{
|
|
Host: host,
|
|
Port: 22,
|
|
User: "root",
|
|
}
|
|
|
|
if h, ok := vars["ansible_host"].(string); ok {
|
|
cfg.Host = h
|
|
}
|
|
if p, ok := vars["ansible_port"].(int); ok {
|
|
cfg.Port = p
|
|
}
|
|
if u, ok := vars["ansible_user"].(string); ok {
|
|
cfg.User = u
|
|
}
|
|
if p, ok := vars["ansible_password"].(string); ok {
|
|
cfg.Password = p
|
|
}
|
|
if k, ok := vars["ansible_ssh_private_key_file"].(string); ok {
|
|
cfg.KeyFile = k
|
|
}
|
|
|
|
// Apply play become settings
|
|
if play.Become {
|
|
cfg.Become = true
|
|
cfg.BecomeUser = play.BecomeUser
|
|
if bp, ok := vars["ansible_become_password"].(string); ok {
|
|
cfg.BecomePass = bp
|
|
} else if cfg.Password != "" {
|
|
// Use SSH password for sudo if no become password specified
|
|
cfg.BecomePass = cfg.Password
|
|
}
|
|
}
|
|
|
|
client, err := NewSSHClient(cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
e.clients[host] = client
|
|
return client, nil
|
|
}
|
|
|
|
// gatherFacts collects facts from a host.
|
|
func (e *Executor) gatherFacts(ctx context.Context, host string, play *Play) error {
|
|
if play.Connection == "local" || host == "localhost" {
|
|
// Local facts
|
|
e.facts[host] = &Facts{
|
|
Hostname: "localhost",
|
|
}
|
|
return nil
|
|
}
|
|
|
|
client, err := e.getClient(host, play)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Gather basic facts
|
|
facts := &Facts{}
|
|
|
|
// Hostname
|
|
stdout, _, _, err := client.Run(ctx, "hostname -f 2>/dev/null || hostname")
|
|
if err == nil {
|
|
facts.FQDN = corexTrimSpace(stdout)
|
|
}
|
|
|
|
stdout, _, _, err = client.Run(ctx, "hostname -s 2>/dev/null || hostname")
|
|
if err == nil {
|
|
facts.Hostname = corexTrimSpace(stdout)
|
|
}
|
|
|
|
// OS info
|
|
stdout, _, _, _ = client.Run(ctx, "cat /etc/os-release 2>/dev/null | grep -E '^(ID|VERSION_ID)=' | head -2")
|
|
for _, line := range split(stdout, "\n") {
|
|
if corexHasPrefix(line, "ID=") {
|
|
facts.Distribution = trimCutset(corexTrimPrefix(line, "ID="), "\"")
|
|
}
|
|
if corexHasPrefix(line, "VERSION_ID=") {
|
|
facts.Version = trimCutset(corexTrimPrefix(line, "VERSION_ID="), "\"")
|
|
}
|
|
}
|
|
|
|
// Architecture
|
|
stdout, _, _, _ = client.Run(ctx, "uname -m")
|
|
facts.Architecture = corexTrimSpace(stdout)
|
|
|
|
// Kernel
|
|
stdout, _, _, _ = client.Run(ctx, "uname -r")
|
|
facts.Kernel = corexTrimSpace(stdout)
|
|
|
|
e.mu.Lock()
|
|
e.facts[host] = facts
|
|
e.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// evaluateWhen evaluates a when condition.
|
|
func (e *Executor) evaluateWhen(when any, host string, task *Task) bool {
|
|
conditions := normalizeConditions(when)
|
|
|
|
for _, cond := range conditions {
|
|
cond = e.templateString(cond, host, task)
|
|
if !e.evalCondition(cond, host) {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// normalizeConditions converts a when value into a list of condition strings.
// A string becomes a one-element list, a []string is returned as-is, and a
// []any keeps only its string elements. Any other type yields nil.
func normalizeConditions(when any) []string {
	if single, ok := when.(string); ok {
		return []string{single}
	}
	if list, ok := when.([]string); ok {
		return list
	}

	mixed, ok := when.([]any)
	if !ok {
		return nil
	}

	var out []string
	for _, entry := range mixed {
		if text, isString := entry.(string); isString {
			out = append(out, text)
		}
	}
	return out
}
|
|
|
|
// evalCondition evaluates a single condition.
|
|
func (e *Executor) evalCondition(cond string, host string) bool {
|
|
cond = corexTrimSpace(cond)
|
|
|
|
// Handle negation
|
|
if corexHasPrefix(cond, "not ") {
|
|
return !e.evalCondition(corexTrimPrefix(cond, "not "), host)
|
|
}
|
|
|
|
// Handle boolean literals
|
|
if cond == "true" || cond == "True" {
|
|
return true
|
|
}
|
|
if cond == "false" || cond == "False" {
|
|
return false
|
|
}
|
|
|
|
// Handle registered variable checks
|
|
// e.g., "result is success", "result.rc == 0"
|
|
if contains(cond, " is ") {
|
|
parts := splitN(cond, " is ", 2)
|
|
varName := corexTrimSpace(parts[0])
|
|
check := corexTrimSpace(parts[1])
|
|
|
|
result := e.getRegisteredVar(host, varName)
|
|
if result == nil {
|
|
return check == "not defined" || check == "undefined"
|
|
}
|
|
|
|
switch check {
|
|
case "defined":
|
|
return true
|
|
case "not defined", "undefined":
|
|
return false
|
|
case "success", "succeeded":
|
|
return !result.Failed
|
|
case "failed":
|
|
return result.Failed
|
|
case "changed":
|
|
return result.Changed
|
|
case "skipped":
|
|
return result.Skipped
|
|
}
|
|
}
|
|
|
|
// Handle simple var checks
|
|
if contains(cond, " | default(") {
|
|
// Extract var name and check if defined
|
|
re := regexp.MustCompile(`(\w+)\s*\|\s*default\([^)]*\)`)
|
|
if match := re.FindStringSubmatch(cond); len(match) > 1 {
|
|
// Has default, so condition is satisfied
|
|
return true
|
|
}
|
|
}
|
|
|
|
// Check if it's a variable that should be truthy
|
|
if result := e.getRegisteredVar(host, cond); result != nil {
|
|
return !result.Failed && !result.Skipped
|
|
}
|
|
|
|
// Check vars
|
|
if val, ok := e.vars[cond]; ok {
|
|
switch v := val.(type) {
|
|
case bool:
|
|
return v
|
|
case string:
|
|
return v != "" && v != "false" && v != "False"
|
|
case int:
|
|
return v != 0
|
|
}
|
|
}
|
|
|
|
// Default to true for unknown conditions (be permissive)
|
|
return true
|
|
}
|
|
|
|
// getRegisteredVar gets a registered task result.
|
|
func (e *Executor) getRegisteredVar(host string, name string) *TaskResult {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
|
|
// Handle dotted access (e.g., "result.stdout")
|
|
parts := splitN(name, ".", 2)
|
|
varName := parts[0]
|
|
|
|
if hostResults, ok := e.results[host]; ok {
|
|
if result, ok := hostResults[varName]; ok {
|
|
return result
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// templateString applies Jinja2-like templating.
|
|
func (e *Executor) templateString(s string, host string, task *Task) string {
|
|
// Handle {{ var }} syntax
|
|
re := regexp.MustCompile(`\{\{\s*([^}]+)\s*\}\}`)
|
|
|
|
return re.ReplaceAllStringFunc(s, func(match string) string {
|
|
expr := corexTrimSpace(match[2 : len(match)-2])
|
|
return e.resolveExpr(expr, host, task)
|
|
})
|
|
}
|
|
|
|
// resolveExpr resolves a template expression.
|
|
func (e *Executor) resolveExpr(expr string, host string, task *Task) string {
|
|
// Handle filters
|
|
if contains(expr, " | ") {
|
|
parts := splitN(expr, " | ", 2)
|
|
value := e.resolveExpr(parts[0], host, task)
|
|
return e.applyFilter(value, parts[1])
|
|
}
|
|
|
|
// Handle lookups
|
|
if corexHasPrefix(expr, "lookup(") {
|
|
return e.handleLookup(expr)
|
|
}
|
|
|
|
// Handle registered vars
|
|
if contains(expr, ".") {
|
|
parts := splitN(expr, ".", 2)
|
|
if result := e.getRegisteredVar(host, parts[0]); result != nil {
|
|
switch parts[1] {
|
|
case "stdout":
|
|
return result.Stdout
|
|
case "stderr":
|
|
return result.Stderr
|
|
case "rc":
|
|
return sprintf("%d", result.RC)
|
|
case "changed":
|
|
return sprintf("%t", result.Changed)
|
|
case "failed":
|
|
return sprintf("%t", result.Failed)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check vars
|
|
if val, ok := e.vars[expr]; ok {
|
|
return sprintf("%v", val)
|
|
}
|
|
|
|
// Check task vars
|
|
if task != nil {
|
|
if val, ok := task.Vars[expr]; ok {
|
|
return sprintf("%v", val)
|
|
}
|
|
}
|
|
|
|
// Check host vars
|
|
if e.inventory != nil {
|
|
hostVars := GetHostVars(e.inventory, host)
|
|
if val, ok := hostVars[expr]; ok {
|
|
return sprintf("%v", val)
|
|
}
|
|
}
|
|
|
|
// Check facts
|
|
if facts, ok := e.facts[host]; ok {
|
|
switch expr {
|
|
case "ansible_hostname":
|
|
return facts.Hostname
|
|
case "ansible_fqdn":
|
|
return facts.FQDN
|
|
case "ansible_distribution":
|
|
return facts.Distribution
|
|
case "ansible_distribution_version":
|
|
return facts.Version
|
|
case "ansible_architecture":
|
|
return facts.Architecture
|
|
case "ansible_kernel":
|
|
return facts.Kernel
|
|
}
|
|
}
|
|
|
|
return "{{ " + expr + " }}" // Return as-is if unresolved
|
|
}
|
|
|
|
// applyFilter applies a Jinja2 filter.
|
|
func (e *Executor) applyFilter(value, filter string) string {
|
|
filter = corexTrimSpace(filter)
|
|
|
|
// Handle default filter
|
|
if corexHasPrefix(filter, "default(") {
|
|
if value == "" || value == "{{ "+filter+" }}" {
|
|
// Extract default value
|
|
re := regexp.MustCompile(`default\(([^)]*)\)`)
|
|
if match := re.FindStringSubmatch(filter); len(match) > 1 {
|
|
return trimCutset(match[1], "'\"")
|
|
}
|
|
}
|
|
return value
|
|
}
|
|
|
|
// Handle bool filter
|
|
if filter == "bool" {
|
|
lowered := lower(value)
|
|
if lowered == "true" || lowered == "yes" || lowered == "1" {
|
|
return "true"
|
|
}
|
|
return "false"
|
|
}
|
|
|
|
// Handle trim
|
|
if filter == "trim" {
|
|
return corexTrimSpace(value)
|
|
}
|
|
|
|
// Handle b64decode
|
|
if filter == "b64decode" {
|
|
// Would need base64 decode
|
|
return value
|
|
}
|
|
|
|
return value
|
|
}
|
|
|
|
// handleLookup handles lookup() expressions.
|
|
func (e *Executor) handleLookup(expr string) string {
|
|
// Parse lookup('type', 'arg')
|
|
re := regexp.MustCompile(`lookup\s*\(\s*['"](\w+)['"]\s*,\s*['"]([^'"]+)['"]\s*`)
|
|
match := re.FindStringSubmatch(expr)
|
|
if len(match) < 3 {
|
|
return ""
|
|
}
|
|
|
|
lookupType := match[1]
|
|
arg := match[2]
|
|
|
|
switch lookupType {
|
|
case "env":
|
|
return env(arg)
|
|
case "file":
|
|
if data, err := localFS.Read(arg); err == nil {
|
|
return data
|
|
}
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
// resolveLoop resolves loop items.
|
|
func (e *Executor) resolveLoop(loop any, host string) []any {
|
|
switch v := loop.(type) {
|
|
case []any:
|
|
return v
|
|
case []string:
|
|
items := make([]any, len(v))
|
|
for i, s := range v {
|
|
items[i] = s
|
|
}
|
|
return items
|
|
case string:
|
|
// Template the string and see if it's a var reference
|
|
resolved := e.templateString(v, host, nil)
|
|
if val, ok := e.vars[resolved]; ok {
|
|
if items, ok := val.([]any); ok {
|
|
return items
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// matchesTags checks if task tags match execution tags.
|
|
func (e *Executor) matchesTags(taskTags []string) bool {
|
|
// If no tags specified, run all
|
|
if len(e.Tags) == 0 && len(e.SkipTags) == 0 {
|
|
return true
|
|
}
|
|
|
|
// Check skip tags
|
|
for _, skip := range e.SkipTags {
|
|
if slices.Contains(taskTags, skip) {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Check include tags
|
|
if len(e.Tags) > 0 {
|
|
for _, tag := range e.Tags {
|
|
if tag == "all" || slices.Contains(taskTags, tag) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// handleNotify marks handlers as notified.
|
|
func (e *Executor) handleNotify(notify any) {
|
|
switch v := notify.(type) {
|
|
case string:
|
|
e.notified[v] = true
|
|
case []any:
|
|
for _, n := range v {
|
|
if s, ok := n.(string); ok {
|
|
e.notified[s] = true
|
|
}
|
|
}
|
|
case []string:
|
|
for _, s := range v {
|
|
e.notified[s] = true
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close closes all SSH connections.
|
|
//
|
|
// Example:
|
|
//
|
|
// exec.Close()
|
|
func (e *Executor) Close() {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
for _, client := range e.clients {
|
|
_ = client.Close()
|
|
}
|
|
e.clients = make(map[string]*SSHClient)
|
|
}
|
|
|
|
// TemplateFile processes a template file.
|
|
//
|
|
// Example:
|
|
//
|
|
// content, err := exec.TemplateFile("/workspace/templates/app.conf.j2", "web1", &Task{})
|
|
func (e *Executor) TemplateFile(src, host string, task *Task) (string, error) {
|
|
content, err := localFS.Read(src)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// Convert Jinja2 to Go template syntax (basic conversion)
|
|
tmplContent := content
|
|
tmplContent = replaceAll(tmplContent, "{{", "{{ .")
|
|
tmplContent = replaceAll(tmplContent, "{%", "{{")
|
|
tmplContent = replaceAll(tmplContent, "%}", "}}")
|
|
|
|
tmpl, err := template.New("template").Parse(tmplContent)
|
|
if err != nil {
|
|
// Fall back to simple replacement
|
|
return e.templateString(content, host, task), nil
|
|
}
|
|
|
|
// Build context map
|
|
context := make(map[string]any)
|
|
for k, v := range e.vars {
|
|
context[k] = v
|
|
}
|
|
// Add host vars
|
|
if e.inventory != nil {
|
|
hostVars := GetHostVars(e.inventory, host)
|
|
for k, v := range hostVars {
|
|
context[k] = v
|
|
}
|
|
}
|
|
// Add facts
|
|
if facts, ok := e.facts[host]; ok {
|
|
context["ansible_hostname"] = facts.Hostname
|
|
context["ansible_fqdn"] = facts.FQDN
|
|
context["ansible_distribution"] = facts.Distribution
|
|
context["ansible_distribution_version"] = facts.Version
|
|
context["ansible_architecture"] = facts.Architecture
|
|
context["ansible_kernel"] = facts.Kernel
|
|
}
|
|
|
|
buf := newBuilder()
|
|
if err := tmpl.Execute(buf, context); err != nil {
|
|
return e.templateString(content, host, task), nil
|
|
}
|
|
|
|
return buf.String(), nil
|
|
}
|