go-ansible/executor.go

1954 lines
44 KiB
Go
Raw Normal View History

package ansible
import (
"context"
2026-04-01 20:14:40 +00:00
"encoding/base64"
"errors"
"io"
"io/fs"
"regexp"
"slices"
2026-04-01 19:54:13 +00:00
"strconv"
"strings"
"sync"
"text/template"
"time"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
)
// Sentinel errors used inside the executor to unwind early. The play and
// host loops match them with errors.Is instead of reporting them as failures.
var (
	// errEndPlay stops the remainder of the current play.
	errEndPlay = errors.New("end play")
	// errEndHost stops further processing of the current host.
	errEndHost = errors.New("end host")
)
// sshExecutorClient is the client contract used by the executor.
//
// getClient stores the value returned by NewSSHClient under this interface,
// and environmentSSHClient embeds it to decorate Run and RunScript.
type sshExecutorClient interface {
// Run executes cmd and returns its stdout, stderr and exit code.
Run(ctx context.Context, cmd string) (stdout, stderr string, exitCode int, err error)
// RunScript executes script and returns its stdout, stderr and exit code.
RunScript(ctx context.Context, script string) (stdout, stderr string, exitCode int, err error)
// Upload writes the content read from local to the path remote with mode.
Upload(ctx context.Context, local io.Reader, remote string, mode fs.FileMode) error
// Download returns the content of the file at remote.
Download(ctx context.Context, remote string) ([]byte, error)
// FileExists reports whether path exists.
FileExists(ctx context.Context, path string) (bool, error)
// Stat returns metadata for path as a generic map.
// NOTE(review): the map's key schema is not visible in this file.
Stat(ctx context.Context, path string) (map[string]any, error)
// BecomeState returns the current privilege-escalation settings.
BecomeState() (become bool, user, password string)
// SetBecome replaces the privilege-escalation settings.
SetBecome(become bool, user, password string)
// Close releases the client.
Close() error
}
2026-04-01 20:51:57 +00:00
// environmentSSHClient wraps another SSH client and prefixes commands with
// shell exports so play/task environment variables reach remote execution.
//
// Its Run and RunScript methods prepend prefix before delegating to the
// embedded client.
type environmentSSHClient struct {
// Embedded client that performs the actual work; its methods are promoted.
sshExecutorClient
// prefix is prepended verbatim to every command and script.
prefix string
}
// Run prepends the environment prefix to cmd and hands the combined command
// to the wrapped client, returning its results unchanged.
func (c *environmentSSHClient) Run(ctx context.Context, cmd string) (string, string, int, error) {
	prefixed := c.prefix + cmd
	return c.sshExecutorClient.Run(ctx, prefixed)
}
// RunScript prepends the environment prefix to script and hands the combined
// script to the wrapped client, returning its results unchanged.
func (c *environmentSSHClient) RunScript(ctx context.Context, script string) (string, string, int, error) {
	prefixed := c.prefix + script
	return c.sshExecutorClient.RunScript(ctx, prefixed)
}
// Executor runs Ansible playbooks.
//
// Example:
//
// exec := NewExecutor("/workspace/playbooks")
type Executor struct {
2026-04-01 20:17:29 +00:00
parser *Parser
inventory *Inventory
vars map[string]any
facts map[string]*Facts
results map[string]map[string]*TaskResult // host -> register_name -> result
handlers map[string][]Task
notified map[string]bool
clients map[string]sshExecutorClient
batchFailedHosts map[string]bool
endedHosts map[string]bool
2026-04-01 20:17:29 +00:00
mu sync.RWMutex
// Callbacks
OnPlayStart func(play *Play)
OnTaskStart func(host string, task *Task)
OnTaskEnd func(host string, task *Task, result *TaskResult)
OnPlayEnd func(play *Play)
// Options
Limit string
Tags []string
SkipTags []string
CheckMode bool
Diff bool
Verbose int
}
// NewExecutor builds an Executor whose parser is rooted at basePath and whose
// state maps are allocated and empty.
//
// Example:
//
//	exec := NewExecutor("/workspace/playbooks")
func NewExecutor(basePath string) *Executor {
	exec := new(Executor)
	exec.parser = NewParser(basePath)
	exec.vars = map[string]any{}
	exec.facts = map[string]*Facts{}
	exec.results = map[string]map[string]*TaskResult{}
	exec.handlers = map[string][]Task{}
	exec.notified = map[string]bool{}
	exec.clients = map[string]sshExecutorClient{}
	exec.endedHosts = map[string]bool{}
	return exec
}
// SetInventory parses the inventory file at path and installs it on the
// executor. On a parse error the current inventory is left untouched and the
// error is returned as-is.
//
// Example:
//
//	err := exec.SetInventory("/workspace/inventory.yml")
func (e *Executor) SetInventory(path string) error {
	parsed, err := e.parser.ParseInventory(path)
	if err == nil {
		e.inventory = parsed
	}
	return err
}
// SetInventoryDirect installs an already-built inventory, replacing any
// inventory set earlier. No validation is performed.
//
// Example:
//
//	exec.SetInventoryDirect(&Inventory{All: &InventoryGroup{}})
func (e *Executor) SetInventoryDirect(inv *Inventory) {
	e.inventory = inv
}
// SetVar stores value under key in the executor-wide variable map,
// overwriting any previous value. The write is made while holding the
// executor mutex.
//
// Example:
//
//	exec.SetVar("env", "prod")
func (e *Executor) SetVar(key string, value any) {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.vars[key] = value
}
// Run parses the playbook at playbookPath and executes its plays in order.
// It stops at the first play that returns an error and wraps that error with
// the play's index and name.
//
// Example:
//
//	err := exec.Run(context.Background(), "/workspace/playbooks/site.yml")
func (e *Executor) Run(ctx context.Context, playbookPath string) error {
	plays, parseErr := e.parser.ParsePlaybook(playbookPath)
	if parseErr != nil {
		return coreerr.E("Executor.Run", "parse playbook", parseErr)
	}

	for idx := 0; idx < len(plays); idx++ {
		current := &plays[idx]
		playErr := e.runPlay(ctx, current)
		if playErr == nil {
			continue
		}
		return coreerr.E("Executor.Run", sprintf("play %d (%s)", idx, current.Name), playErr)
	}
	return nil
}
// runPlay executes a single play.
func (e *Executor) runPlay(ctx context.Context, play *Play) error {
if e.OnPlayStart != nil {
e.OnPlayStart(play)
}
defer func() {
if e.OnPlayEnd != nil {
e.OnPlayEnd(play)
}
}()
// Get target hosts
hosts := e.getHosts(play.Hosts)
if len(hosts) == 0 {
return nil // No hosts matched
}
e.endedHosts = make(map[string]bool)
// Merge play vars
for k, v := range play.Vars {
e.vars[k] = v
}
2026-04-01 19:54:13 +00:00
for _, batch := range splitSerialHosts(hosts, play.Serial) {
if len(batch) == 0 {
continue
}
batch = e.filterActiveHosts(batch)
2026-04-01 19:54:13 +00:00
if len(batch) == 0 {
continue
}
2026-04-01 20:17:29 +00:00
e.batchFailedHosts = make(map[string]bool)
2026-04-01 19:54:13 +00:00
// Gather facts if needed
gatherFacts := play.GatherFacts == nil || *play.GatherFacts
if gatherFacts {
for _, host := range batch {
if err := e.gatherFacts(ctx, host, play); err != nil {
// Non-fatal
if e.Verbose > 0 {
coreerr.Warn("gather facts failed", "host", host, "err", err)
}
}
}
}
2026-04-01 19:54:13 +00:00
// Execute pre_tasks
for _, task := range play.PreTasks {
if err := e.runTaskOnHosts(ctx, batch, &task, play); err != nil {
if errors.Is(err, errEndPlay) {
return nil
}
2026-04-01 19:54:13 +00:00
return err
}
}
2026-04-01 19:54:13 +00:00
// Execute roles
for _, roleRef := range play.Roles {
if err := e.runRole(ctx, batch, &roleRef, play); err != nil {
if errors.Is(err, errEndPlay) {
return nil
}
2026-04-01 19:54:13 +00:00
return err
}
}
// Execute tasks
for _, task := range play.Tasks {
if err := e.runTaskOnHosts(ctx, batch, &task, play); err != nil {
if errors.Is(err, errEndPlay) {
return nil
}
2026-04-01 19:54:13 +00:00
return err
}
}
// Execute post_tasks
for _, task := range play.PostTasks {
if err := e.runTaskOnHosts(ctx, batch, &task, play); err != nil {
if errors.Is(err, errEndPlay) {
return nil
}
2026-04-01 19:54:13 +00:00
return err
}
}
// Run notified handlers for this batch.
if err := e.runNotifiedHandlers(ctx, batch, play); err != nil {
if errors.Is(err, errEndPlay) {
return nil
}
return err
}
}
2026-04-01 19:54:13 +00:00
return nil
}
// splitSerialHosts splits a host list into serial batches.
func splitSerialHosts(hosts []string, serial any) [][]string {
batchSize := resolveSerialBatchSize(serial, len(hosts))
if batchSize <= 0 || batchSize >= len(hosts) {
if len(hosts) == 0 {
return nil
}
2026-04-01 19:54:13 +00:00
return [][]string{hosts}
}
2026-04-01 19:54:13 +00:00
batches := make([][]string, 0, (len(hosts)+batchSize-1)/batchSize)
for len(hosts) > 0 {
size := batchSize
if size > len(hosts) {
size = len(hosts)
}
2026-04-01 19:54:13 +00:00
batch := append([]string(nil), hosts[:size]...)
batches = append(batches, batch)
hosts = hosts[size:]
}
2026-04-01 19:54:13 +00:00
return batches
}
2026-04-01 19:54:13 +00:00
// resolveSerialBatchSize converts a play serial value into a concrete batch size.
func resolveSerialBatchSize(serial any, total int) int {
if total <= 0 {
return 0
}
2026-04-01 19:54:13 +00:00
switch v := serial.(type) {
case nil:
return total
case int:
if v > 0 {
return v
}
case int8:
if v > 0 {
return int(v)
}
case int16:
if v > 0 {
return int(v)
}
case int32:
if v > 0 {
return int(v)
}
case int64:
if v > 0 {
return int(v)
}
case uint:
if v > 0 {
return int(v)
}
case uint8:
if v > 0 {
return int(v)
}
case uint16:
if v > 0 {
return int(v)
}
case uint32:
if v > 0 {
return int(v)
}
case uint64:
if v > 0 {
return int(v)
}
case string:
s := corexTrimSpace(v)
if s == "" {
return total
}
if corexHasSuffix(s, "%") {
percent, err := strconv.Atoi(strings.TrimSuffix(s, "%"))
if err == nil && percent > 0 {
size := (total*percent + 99) / 100
if size < 1 {
size = 1
}
if size > total {
size = total
}
return size
}
return total
}
if n, err := strconv.Atoi(s); err == nil && n > 0 {
return n
}
}
return total
}
// runRole executes a role on hosts.
func (e *Executor) runRole(ctx context.Context, hosts []string, roleRef *RoleRef, play *Play) error {
// Merge role vars
oldVars := make(map[string]any)
for k, v := range e.vars {
oldVars[k] = v
}
for k, v := range roleRef.Vars {
e.vars[k] = v
}
2026-04-01 20:32:50 +00:00
eligibleHosts := make([]string, 0, len(hosts))
for _, host := range hosts {
if roleRef.When == nil || e.evaluateWhen(roleRef.When, host, nil) {
eligibleHosts = append(eligibleHosts, host)
}
}
if len(eligibleHosts) == 0 {
e.vars = oldVars
return nil
}
// Parse role tasks
tasks, err := e.parser.ParseRole(roleRef.Role, roleRef.TasksFrom)
if err != nil {
e.vars = oldVars
return coreerr.E("executor.runRole", sprintf("parse role %s", roleRef.Role), err)
}
// Execute tasks
for _, task := range tasks {
2026-04-01 20:30:15 +00:00
effectiveTask := task
if len(roleRef.Tags) > 0 {
effectiveTask.Tags = append(append([]string(nil), roleRef.Tags...), task.Tags...)
}
2026-04-01 20:32:50 +00:00
if err := e.runTaskOnHosts(ctx, eligibleHosts, &effectiveTask, play); err != nil {
// Restore vars
e.vars = oldVars
return err
}
}
// Restore vars
e.vars = oldVars
return nil
}
// runTaskOnHosts runs a task on all hosts.
func (e *Executor) runTaskOnHosts(ctx context.Context, hosts []string, task *Task, play *Play) error {
// Check tags
if !e.matchesTags(task.Tags) {
return nil
}
// run_once executes the task against a single host, then shares any
// registered result with the rest of the host set.
if task.RunOnce && len(hosts) > 0 {
single := *task
single.RunOnce = false
if err := e.runTaskOnHosts(ctx, hosts[:1], &single, play); err != nil {
return err
}
e.copyRegisteredResultToHosts(hosts, hosts[0], task.Register)
return nil
}
// Handle block tasks
if len(task.Block) > 0 {
return e.runBlock(ctx, hosts, task, play)
}
// Handle include/import
if task.IncludeTasks != "" || task.ImportTasks != "" {
return e.runIncludeTasks(ctx, hosts, task, play)
}
if task.IncludeRole != nil || task.ImportRole != nil {
return e.runIncludeRole(ctx, hosts, task, play)
}
for _, host := range hosts {
if e.isHostEnded(host) {
continue
}
2026-04-01 19:51:23 +00:00
if err := e.runTaskOnHost(ctx, host, hosts, task, play); err != nil {
if errors.Is(err, errEndHost) {
continue
}
if !task.IgnoreErrors {
return err
}
}
2026-04-01 20:17:29 +00:00
if err := e.checkMaxFailPercentage(play, hosts); err != nil {
return err
}
}
return nil
}
// copyRegisteredResultToHosts gives every host in hosts, other than
// sourceHost, its own copy of the result sourceHost registered under
// register. The copy duplicates the Results slice and the Data map so later
// writes on one host do not show up on another. It does nothing when
// register is empty or the source host has no such result.
func (e *Executor) copyRegisteredResultToHosts(hosts []string, sourceHost, register string) {
	if register == "" {
		return
	}
	perHost := e.results[sourceHost]
	if perHost == nil {
		return
	}
	original, found := perHost[register]
	if !found || original == nil {
		return
	}

	// duplicate returns a copy of original with fresh Results/Data containers.
	duplicate := func() *TaskResult {
		dup := *original
		if original.Results != nil {
			dup.Results = append([]TaskResult(nil), original.Results...)
		}
		if original.Data != nil {
			dup.Data = make(map[string]any, len(original.Data))
			for key, val := range original.Data {
				dup.Data[key] = val
			}
		}
		return &dup
	}

	for _, target := range hosts {
		if target == sourceHost {
			continue
		}
		if e.results[target] == nil {
			e.results[target] = make(map[string]*TaskResult)
		}
		e.results[target][register] = duplicate()
	}
}
// runTaskOnHost runs a task on a single host.
2026-04-01 19:51:23 +00:00
func (e *Executor) runTaskOnHost(ctx context.Context, host string, hosts []string, task *Task, play *Play) error {
if e.isHostEnded(host) {
return nil
}
start := time.Now()
if e.OnTaskStart != nil {
e.OnTaskStart(host, task)
}
// Initialize host results
if e.results[host] == nil {
e.results[host] = make(map[string]*TaskResult)
}
// Check when condition
if task.When != nil {
if !e.evaluateWhen(task.When, host, task) {
result := &TaskResult{Skipped: true, Msg: "Skipped due to when condition"}
if task.Register != "" {
e.results[host][task.Register] = result
}
if e.OnTaskEnd != nil {
e.OnTaskEnd(host, task, result)
}
return nil
}
}
// Honour check mode for tasks that would mutate state.
if e.CheckMode && !isCheckModeSafeTask(task) {
result := &TaskResult{Skipped: true, Msg: "Skipped in check mode"}
if task.Register != "" {
e.results[host][task.Register] = result
}
if e.OnTaskEnd != nil {
e.OnTaskEnd(host, task, result)
}
return nil
}
// Get SSH client
executionHost := host
if task.Delegate != "" {
executionHost = e.templateString(task.Delegate, host, task)
if executionHost == "" {
executionHost = host
}
}
client, err := e.getClient(executionHost, play)
if err != nil {
return coreerr.E("Executor.runTaskOnHost", sprintf("get client for %s", executionHost), err)
}
// Handle loops
if task.Loop != nil {
return e.runLoop(ctx, host, client, task, play)
}
// Execute the task, honouring retries/until when configured.
result, err := e.runTaskWithRetries(ctx, host, task, play, func() (*TaskResult, error) {
return e.executeModule(ctx, host, client, task, play)
})
if err != nil {
result = &TaskResult{Failed: true, Msg: err.Error()}
}
result.Duration = time.Since(start)
// Store result
if task.Register != "" {
e.results[host][task.Register] = result
}
// Handle notify
if result.Changed && task.Notify != nil {
e.handleNotify(task.Notify)
}
if e.OnTaskEnd != nil {
e.OnTaskEnd(host, task, result)
}
2026-04-01 19:51:23 +00:00
if NormalizeModule(task.Module) == "ansible.builtin.meta" {
if err := e.handleMetaAction(ctx, host, hosts, play, result); err != nil {
2026-04-01 19:51:23 +00:00
return err
}
}
if result.Failed && !task.IgnoreErrors {
2026-04-01 20:17:29 +00:00
e.markBatchHostFailed(host)
return coreerr.E("Executor.runTaskOnHost", "task failed: "+result.Msg, nil)
}
2026-04-01 20:17:29 +00:00
if result.Failed {
e.markBatchHostFailed(host)
}
return nil
}
// markBatchHostFailed records host as failed for the current batch,
// allocating the tracking map on first use. An empty host name is ignored.
func (e *Executor) markBatchHostFailed(host string) {
	if host != "" {
		if e.batchFailedHosts == nil {
			e.batchFailedHosts = map[string]bool{}
		}
		e.batchFailedHosts[host] = true
	}
}
// checkMaxFailPercentage returns an error once the share of hosts marked as
// failed in the current batch is strictly greater than play.MaxFailPercent.
// The share uses integer division (rounds down). The check is disabled when
// play is nil, the threshold is not positive or hosts is empty.
func (e *Executor) checkMaxFailPercentage(play *Play, hosts []string) error {
	total := len(hosts)
	if play == nil || play.MaxFailPercent <= 0 || total == 0 {
		return nil
	}

	failedCount := 0
	for _, name := range hosts {
		// Indexing a nil map is safe and yields false.
		if e.batchFailedHosts[name] {
			failedCount++
		}
	}
	if failedCount == 0 {
		return nil
	}

	limit := play.MaxFailPercent
	if pct := (failedCount * 100) / total; pct > limit {
		return coreerr.E("Executor.runTaskOnHosts", sprintf("max fail percentage exceeded: %d%% failed (threshold %d%%)", pct, limit), nil)
	}
	return nil
}
// runTaskWithRetries calls execute up to a bounded number of times.
//
// The attempt budget is Retries+1 when Retries is positive, 4 when only
// Until is set, and 1 otherwise. After each attempt changed_when/failed_when
// are applied, then shouldRetryTask decides whether to go again. Between
// attempts it waits task.Delay seconds; if ctx is cancelled during that wait
// the latest result is returned together with ctx.Err(). An error from
// execute becomes a failed result rather than being returned.
func (e *Executor) runTaskWithRetries(ctx context.Context, host string, task *Task, play *Play, execute func() (*TaskResult, error)) (*TaskResult, error) {
	maxAttempts := 1
	if task != nil {
		switch {
		case task.Retries > 0:
			maxAttempts = task.Retries + 1
		case task.Until != "":
			maxAttempts = 4
		}
	}

	var last *TaskResult
	for n := 1; n <= maxAttempts; n++ {
		res, execErr := execute()
		switch {
		case execErr != nil:
			res = &TaskResult{Failed: true, Msg: execErr.Error()}
		case res == nil:
			res = &TaskResult{}
		}
		last = res

		e.applyTaskResultConditions(host, task, last)

		if !shouldRetryTask(task, host, e, last) || n == maxAttempts {
			break
		}

		if task == nil || task.Delay <= 0 {
			continue
		}
		wait := time.NewTimer(time.Duration(task.Delay) * time.Second)
		select {
		case <-wait.C:
		case <-ctx.Done():
			wait.Stop()
			return last, ctx.Err()
		}
	}
	return last, nil
}
// shouldRetryTask reports whether another attempt is warranted. With an
// until expression the task retries while that expression is false; the
// expression sees the result and its main fields as local variables. Without
// until, a task with retries configured retries while the result is failed.
// A nil task or result never retries.
func shouldRetryTask(task *Task, host string, e *Executor, result *TaskResult) bool {
	switch {
	case task == nil || result == nil:
		return false
	case task.Until != "":
		scope := map[string]any{
			"result":   result,
			"stdout":   result.Stdout,
			"stderr":   result.Stderr,
			"rc":       result.RC,
			"changed":  result.Changed,
			"failed":   result.Failed,
			"skipped":  result.Skipped,
			"msg":      result.Msg,
			"duration": result.Duration,
		}
		satisfied := e.evaluateWhenWithLocals(task.Until, host, task, scope)
		return !satisfied
	case task.Retries > 0:
		return result.Failed
	default:
		return false
	}
}
// runLoop handles task loops.
//
// Each resolved item is exposed through the loop variable ("item" unless
// loop_control.loop_var overrides it), optionally with an index variable and
// the extended "ansible_loop" metadata, and the module runs once per item
// with retry handling. Iteration stops at the first failed item unless the
// task ignores errors. loop_control.pause is observed between items. The
// per-item results are stored under task.Register as one combined result
// whose Changed/Failed flags are true if any item set them.
//
// Whatever the loop wrote into e.vars is restored on every exit path,
// including cancellation during a pause.
//
// NOTE(review): a failed item is not reported to the caller (the function
// returns nil) and the host is not marked as failed, unlike the non-loop
// path in runTaskOnHost — confirm whether that is intended.
func (e *Executor) runLoop(ctx context.Context, host string, client sshExecutorClient, task *Task, play *Play) error {
	items := e.resolveLoop(task.Loop, host)

	loopVar := "item"
	indexVar := ""
	extended := false
	pause := 0 * time.Second
	if lc := task.LoopControl; lc != nil {
		if lc.LoopVar != "" {
			loopVar = lc.LoopVar
		}
		indexVar = lc.IndexVar
		extended = lc.Extended
		if lc.Pause > 0 {
			pause = time.Duration(lc.Pause) * time.Second
		}
	}

	// Snapshot every variable the loop overwrites and restore it on return.
	type savedVar struct {
		value   any
		present bool
	}
	tracked := []string{loopVar}
	if indexVar != "" {
		tracked = append(tracked, indexVar)
	}
	if extended {
		tracked = append(tracked, "ansible_loop")
	}
	saved := make(map[string]savedVar, len(tracked))
	for _, name := range tracked {
		v, ok := e.vars[name]
		saved[name] = savedVar{value: v, present: ok}
	}
	defer func() {
		for name, s := range saved {
			if s.present {
				e.vars[name] = s.value
			} else {
				delete(e.vars, name)
			}
		}
	}()

	var results []TaskResult
	for i, item := range items {
		// Set loop variables
		e.vars[loopVar] = item
		if indexVar != "" {
			e.vars[indexVar] = i
		}
		if extended {
			loopMeta := map[string]any{
				"index":     i + 1,
				"index0":    i,
				"first":     i == 0,
				"last":      i == len(items)-1,
				"length":    len(items),
				"revindex":  len(items) - i,
				"revindex0": len(items) - i - 1,
			}
			if task.LoopControl.Label != "" {
				loopMeta["label"] = e.templateString(task.LoopControl.Label, host, task)
			}
			e.vars["ansible_loop"] = loopMeta
		}

		result, err := e.runTaskWithRetries(ctx, host, task, play, func() (*TaskResult, error) {
			return e.executeModule(ctx, host, client, task, play)
		})
		if err != nil {
			result = &TaskResult{Failed: true, Msg: err.Error()}
		}
		results = append(results, *result)

		// Stop before pausing: there is no point in waiting once the loop
		// is about to be aborted.
		if result.Failed && !task.IgnoreErrors {
			break
		}

		if pause > 0 && i < len(items)-1 {
			timer := time.NewTimer(pause)
			select {
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			case <-timer.C:
			}
		}
	}

	// Store combined result
	if task.Register != "" {
		combined := &TaskResult{Results: results}
		for i := range results {
			if results[i].Changed {
				combined.Changed = true
			}
			if results[i].Failed {
				combined.Failed = true
			}
		}
		e.results[host][task.Register] = combined
	}
	return nil
}
// isCheckModeSafeTask reports whether the task may run in check mode.
// Structural tasks (blocks, includes, imports) always qualify because their
// children are checked individually; otherwise only the listed builtin
// modules qualify. A nil task is considered safe.
func isCheckModeSafeTask(task *Task) bool {
	if task == nil {
		return true
	}

	structural := len(task.Block) > 0 || len(task.Rescue) > 0 || len(task.Always) > 0 ||
		task.IncludeTasks != "" || task.ImportTasks != "" ||
		task.IncludeRole != nil || task.ImportRole != nil
	if structural {
		return true
	}

	switch NormalizeModule(task.Module) {
	case "ansible.builtin.debug",
		"ansible.builtin.fail",
		"ansible.builtin.assert",
		"ansible.builtin.pause",
		"ansible.builtin.wait_for",
		"ansible.builtin.stat",
		"ansible.builtin.slurp",
		"ansible.builtin.include_vars",
		"ansible.builtin.meta",
		"ansible.builtin.set_fact",
		"ansible.builtin.add_host",
		"ansible.builtin.group_by",
		"ansible.builtin.setup":
		return true
	}
	return false
}
// runBlock handles block/rescue/always.
//
// The block tasks run until the first error. If that error is an ordinary
// failure and a rescue section exists, the rescue tasks run and the block
// counts as recovered unless a rescue task fails in turn. errEndPlay is
// control flow, not a failure: it never triggers rescue and is always
// propagated. The always section runs in every case, and all of its tasks
// are attempted even if one fails.
//
// The returned error is the outstanding block/rescue error if there is one,
// otherwise the first error raised by the always section, otherwise nil.
func (e *Executor) runBlock(ctx context.Context, hosts []string, task *Task, play *Play) error {
	// runUntilError runs tasks in order and stops at the first error.
	runUntilError := func(tasks []Task) error {
		for i := range tasks {
			t := tasks[i]
			if err := e.runTaskOnHosts(ctx, hosts, &t, play); err != nil {
				return err
			}
		}
		return nil
	}

	// Try block
	blockErr := runUntilError(task.Block)

	// Run rescue if block failed; a rescue that completes clears the failure.
	if blockErr != nil && !errors.Is(blockErr, errEndPlay) && len(task.Rescue) > 0 {
		blockErr = runUntilError(task.Rescue)
	}

	// Always run always block
	var alwaysErr error
	for i := range task.Always {
		t := task.Always[i]
		if err := e.runTaskOnHosts(ctx, hosts, &t, play); err != nil && alwaysErr == nil {
			alwaysErr = err
		}
	}

	if blockErr != nil {
		return blockErr
	}
	return alwaysErr
}
// runIncludeTasks handles include_tasks/import_tasks.
//
// A when condition on the include is evaluated per host and only hosts for
// which it holds receive the included tasks; if none qualifies the file is
// not parsed. The path is templated without host context, parsed, and the
// tasks are run in order; the first error aborts and is returned.
func (e *Executor) runIncludeTasks(ctx context.Context, hosts []string, task *Task, play *Play) error {
	// Honour `when` on the include itself.
	if task.When != nil {
		eligible := make([]string, 0, len(hosts))
		for _, host := range hosts {
			if e.evaluateWhen(task.When, host, task) {
				eligible = append(eligible, host)
			}
		}
		if len(eligible) == 0 {
			return nil
		}
		hosts = eligible
	}

	path := task.IncludeTasks
	if path == "" {
		path = task.ImportTasks
	}

	// Render template expressions in the path (no host or task context).
	path = e.templateString(path, "", nil)

	tasks, err := e.parser.ParseTasks(path)
	if err != nil {
		return coreerr.E("Executor.runIncludeTasks", "include_tasks "+path, err)
	}

	for i := range tasks {
		t := tasks[i]
		if err := e.runTaskOnHosts(ctx, hosts, &t, play); err != nil {
			return err
		}
	}
	return nil
}
// runIncludeRole handles include_role/import_role.
//
// It converts the task's include_role (preferred) or import_role settings
// into a RoleRef and delegates to runRole. The task's when condition is
// carried over so runRole can exclude hosts for which it does not hold.
func (e *Executor) runIncludeRole(ctx context.Context, hosts []string, task *Task, play *Play) error {
	spec := task.IncludeRole
	if spec == nil {
		spec = task.ImportRole
	}

	roleRef := &RoleRef{
		Role:      spec.Name,
		TasksFrom: spec.TasksFrom,
		Vars:      spec.Vars,
		When:      task.When,
	}
	return e.runRole(ctx, hosts, roleRef, play)
}
// getHosts resolves pattern against the inventory and applies the Limit
// option. With no inventory only the literal pattern "localhost" resolves.
// A host passes the limit when the limit pattern selects it, when its name
// equals the limit, or when its name contains the limit as a substring.
func (e *Executor) getHosts(pattern string) []string {
	if e.inventory == nil {
		if pattern != "localhost" {
			return nil
		}
		return []string{"localhost"}
	}

	matched := GetHosts(e.inventory, pattern)
	if e.Limit == "" {
		return matched
	}

	// Apply limit - keep hosts that are also selected by the limit.
	allowed := map[string]bool{}
	for _, name := range GetHosts(e.inventory, e.Limit) {
		allowed[name] = true
	}

	var kept []string
	for _, name := range matched {
		switch {
		case allowed[name], name == e.Limit, contains(name, e.Limit):
			kept = append(kept, name)
		}
	}
	return kept
}
// getClient returns or creates an SSH client for a host.
//
// Clients are cached per host under the executor mutex. A new client is
// configured from the host's inventory vars, with executor vars filling in
// keys the host does not define: ansible_host, ansible_port, ansible_user,
// ansible_password and ansible_ssh_private_key_file. Defaults are the host
// name, port 22 and user "root". ansible_port may be an integer, a float
// with an integral value or a numeric string. When the play enables become,
// the become password is ansible_become_password, falling back to the SSH
// password. A nil play is treated as "no become".
func (e *Executor) getClient(host string, play *Play) (sshExecutorClient, error) {
	e.mu.Lock()
	defer e.mu.Unlock()

	if client, ok := e.clients[host]; ok {
		return client, nil
	}

	// Get host vars
	var vars map[string]any
	if e.inventory != nil {
		vars = GetHostVars(e.inventory, host)
	}
	if vars == nil {
		// Writing to a nil map would panic below.
		vars = make(map[string]any)
	}

	// Merge with play vars; host vars win on conflicts.
	for k, v := range e.vars {
		if _, exists := vars[k]; !exists {
			vars[k] = v
		}
	}

	// Build SSH config
	cfg := SSHConfig{
		Host: host,
		Port: 22,
		User: "root",
	}

	if h, ok := vars["ansible_host"].(string); ok {
		cfg.Host = h
	}
	// Different decoders yield different numeric types, and a quoted port
	// arrives as a string.
	switch p := vars["ansible_port"].(type) {
	case int:
		cfg.Port = p
	case int64:
		cfg.Port = int(p)
	case float64:
		if p == float64(int(p)) {
			cfg.Port = int(p)
		}
	case string:
		if n, err := strconv.Atoi(corexTrimSpace(p)); err == nil {
			cfg.Port = n
		}
	}
	if u, ok := vars["ansible_user"].(string); ok {
		cfg.User = u
	}
	if p, ok := vars["ansible_password"].(string); ok {
		cfg.Password = p
	}
	if k, ok := vars["ansible_ssh_private_key_file"].(string); ok {
		cfg.KeyFile = k
	}

	// Apply play become settings
	if play != nil && play.Become {
		cfg.Become = true
		cfg.BecomeUser = play.BecomeUser
		if bp, ok := vars["ansible_become_password"].(string); ok {
			cfg.BecomePass = bp
		} else if cfg.Password != "" {
			// Use SSH password for sudo if no become password specified
			cfg.BecomePass = cfg.Password
		}
	}

	client, err := NewSSHClient(cfg)
	if err != nil {
		return nil, err
	}

	e.clients[host] = client
	return client, nil
}
// gatherFacts collects facts from a host.
//
// For a local connection or the host "localhost" a minimal fact set with
// Hostname "localhost" is stored without contacting the host. Otherwise the
// facts are collected through the host's SSH client. Both paths store the
// result while holding the executor mutex. Errors from obtaining the client
// or collecting facts are returned unchanged.
func (e *Executor) gatherFacts(ctx context.Context, host string, play *Play) error {
	if (play != nil && play.Connection == "local") || host == "localhost" {
		// Local facts
		e.mu.Lock()
		e.facts[host] = &Facts{
			Hostname: "localhost",
		}
		e.mu.Unlock()
		return nil
	}

	client, err := e.getClient(host, play)
	if err != nil {
		return err
	}

	facts, err := e.collectFacts(ctx, client)
	if err != nil {
		return err
	}

	e.mu.Lock()
	e.facts[host] = facts
	e.mu.Unlock()
	return nil
}
// evaluateWhen reports whether every condition in when holds for host. It is
// evaluateWhenWithLocals without any local variables.
func (e *Executor) evaluateWhen(when any, host string, task *Task) bool {
	var noLocals map[string]any
	return e.evaluateWhenWithLocals(when, host, task, noLocals)
}
// evaluateWhenWithLocals normalises when into a list of condition strings,
// renders template expressions in each one and evaluates them in order with
// locals available as extra variables. It returns false at the first
// condition that does not hold, and true when all hold or there are none.
func (e *Executor) evaluateWhenWithLocals(when any, host string, task *Task, locals map[string]any) bool {
	for _, raw := range normalizeConditions(when) {
		rendered := e.templateString(raw, host, task)
		if holds := e.evalConditionWithLocals(rendered, host, task, locals); !holds {
			return false
		}
	}
	return true
}
// normalizeConditions converts a when/until value into a flat list of
// condition strings.
//
// A string yields one condition and a []string is returned as-is. A bool is
// rendered as "true" or "false" so that a literal `when: false` decoded from
// YAML still disables the task instead of being dropped. In a []any, string
// and bool elements are kept in order and every other element is ignored.
// Any other input, including nil, yields nil.
func normalizeConditions(when any) []string {
	switch v := when.(type) {
	case string:
		return []string{v}
	case bool:
		return []string{strconv.FormatBool(v)}
	case []string:
		return v
	case []any:
		var conds []string
		for _, c := range v {
			switch item := c.(type) {
			case string:
				conds = append(conds, item)
			case bool:
				conds = append(conds, strconv.FormatBool(item))
			}
		}
		return conds
	}
	return nil
}
// evalCondition evaluates one condition string for host with neither task
// context nor local variables.
func (e *Executor) evalCondition(cond string, host string) bool {
	var (
		noTask   *Task
		noLocals map[string]any
	)
	return e.evalConditionWithLocals(cond, host, noTask, noLocals)
}
// defaultFilterPattern matches a `name | default(...)` expression. It is
// compiled once instead of on every condition evaluation.
var defaultFilterPattern = regexp.MustCompile(`(\w+)\s*\|\s*default\([^)]*\)`)

// evalConditionWithLocals evaluates a single, already templated condition.
//
// Supported forms, checked in this order:
//   - "not <cond>": negation of the rest
//   - "<a> == <b>" and "<a> != <b>": comparison of the operands' string
//     forms; if either operand cannot be resolved the result is true
//   - the literals true/True and false/False
//   - "<var> is <test>": defined, not defined/undefined for any variable,
//     plus success/succeeded, failed, changed and skipped for task results
//   - an expression using "| default(...)": always true
//   - a bare variable name: its truthiness
//
// Anything that cannot be interpreted evaluates to true (permissive).
func (e *Executor) evalConditionWithLocals(cond string, host string, task *Task, locals map[string]any) bool {
	cond = corexTrimSpace(cond)

	// Handle negation
	if corexHasPrefix(cond, "not ") {
		return !e.evalConditionWithLocals(corexTrimPrefix(cond, "not "), host, task, locals)
	}

	// Handle equality/inequality ("==" is checked before "!=").
	for _, op := range []string{"==", "!="} {
		if !contains(cond, op) {
			continue
		}
		parts := splitN(cond, op, 2)
		if len(parts) != 2 {
			continue
		}
		left, leftOK := e.resolveConditionOperand(parts[0], host, task, locals)
		right, rightOK := e.resolveConditionOperand(parts[1], host, task, locals)
		if !leftOK || !rightOK {
			return true
		}
		if op == "==" {
			return left == right
		}
		return left != right
	}

	// Handle boolean literals
	switch cond {
	case "true", "True":
		return true
	case "false", "False":
		return false
	}

	// Handle "<var> is <test>", e.g. "result is success".
	if contains(cond, " is ") {
		parts := splitN(cond, " is ", 2)
		varName := corexTrimSpace(parts[0])
		check := corexTrimSpace(parts[1])
		wantsUndefined := check == "not defined" || check == "undefined"

		value, found := e.lookupConditionValue(varName, host, task, locals)
		if !found {
			// Only the "undefined" tests hold for a missing variable.
			return wantsUndefined
		}
		if wantsUndefined {
			// The variable exists, whatever its type.
			return false
		}

		var res *TaskResult
		switch v := value.(type) {
		case *TaskResult:
			res = v
		case TaskResult:
			res = &v
		}
		if res != nil {
			switch check {
			case "success", "succeeded":
				return !res.Failed
			case "failed":
				return res.Failed
			case "changed":
				return res.Changed
			case "skipped":
				return res.Skipped
			}
		}
		// "defined", unknown tests and non-result values are accepted.
		return true
	}

	// An expression with a default filter always has a value.
	if contains(cond, " | default(") && defaultFilterPattern.MatchString(cond) {
		return true
	}

	// Check if it's a variable that should be truthy
	if result, ok := e.lookupConditionValue(cond, host, task, locals); ok {
		switch v := result.(type) {
		case *TaskResult:
			if v != nil {
				return !v.Failed && !v.Skipped
			}
		case TaskResult:
			return !v.Failed && !v.Skipped
		case bool:
			return v
		case string:
			return v != "" && v != "false" && v != "False"
		case int:
			return v != 0
		case int64:
			return v != 0
		case float64:
			return v != 0
		}
	}

	// Check vars
	if val, ok := e.vars[cond]; ok {
		switch v := val.(type) {
		case bool:
			return v
		case string:
			return v != "" && v != "false" && v != "False"
		case int:
			return v != 0
		}
	}

	// Default to true for unknown conditions (be permissive)
	return true
}
// lookupConditionValue resolves name to a value for condition evaluation and
// reports whether it was found. Sources are consulted in this order: locals
// (exact name, then "base.field" on a local), registered task results
// (optionally narrowed to a field), executor vars, task vars, inventory host
// vars and finally the gathered facts for host.
func (e *Executor) lookupConditionValue(name string, host string, task *Task, locals map[string]any) (any, bool) {
	name = corexTrimSpace(name)
	dotted := splitN(name, ".", 2)
	hasField := len(dotted) == 2

	if locals != nil {
		if direct, ok := locals[name]; ok {
			return direct, true
		}
		if hasField {
			if base, ok := locals[dotted[0]]; ok {
				if field, ok := taskResultField(base, dotted[1]); ok {
					return field, true
				}
			}
		}
	}

	if registered := e.getRegisteredVar(host, name); registered != nil {
		if hasField {
			if field, ok := taskResultField(registered, dotted[1]); ok {
				return field, true
			}
		}
		// Unknown field (or no field): hand back the whole result.
		return registered, true
	}

	if fromVars, ok := e.vars[name]; ok {
		return fromVars, true
	}

	if task != nil {
		if fromTask, ok := task.Vars[name]; ok {
			return fromTask, true
		}
	}

	if e.inventory != nil {
		if fromHost, ok := GetHostVars(e.inventory, host)[name]; ok {
			return fromHost, true
		}
	}

	hostFacts, haveFacts := e.facts[host]
	if !haveFacts {
		return nil, false
	}
	switch name {
	case "ansible_hostname":
		return hostFacts.Hostname, true
	case "ansible_fqdn":
		return hostFacts.FQDN, true
	case "ansible_os_family":
		return hostFacts.OS, true
	case "ansible_memtotal_mb":
		return hostFacts.Memory, true
	case "ansible_processor_vcpus":
		return hostFacts.CPUs, true
	case "ansible_default_ipv4_address":
		return hostFacts.IPv4, true
	case "ansible_distribution":
		return hostFacts.Distribution, true
	case "ansible_distribution_version":
		return hostFacts.Version, true
	case "ansible_architecture":
		return hostFacts.Architecture, true
	case "ansible_kernel":
		return hostFacts.Kernel, true
	}
	return nil, false
}
// taskResultField extracts a named field from a task result or a generic
// map and reports whether the field exists.
//
// For TaskResult (by value or pointer) the fields stdout, stderr, rc,
// changed, failed, skipped and msg are supported. For map[string]any the
// field is a plain key lookup. A nil *TaskResult and any other type yield
// (nil, false).
func taskResultField(value any, field string) (any, bool) {
	switch v := value.(type) {
	case *TaskResult:
		if v == nil {
			// A typed nil pointer must not be dereferenced.
			return nil, false
		}
		return taskResultField(*v, field)
	case TaskResult:
		switch field {
		case "stdout":
			return v.Stdout, true
		case "stderr":
			return v.Stderr, true
		case "rc":
			return v.RC, true
		case "changed":
			return v.Changed, true
		case "failed":
			return v.Failed, true
		case "skipped":
			return v.Skipped, true
		case "msg":
			return v.Msg, true
		}
	case map[string]any:
		if val, ok := v[field]; ok {
			return val, true
		}
	}
	return nil, false
}
// resolveConditionOperand turns one side of a comparison into a string and
// reports whether it could be resolved.
//
// Boolean literals and anything that starts with a digit, optionally
// preceded by a sign (so "-1" is a literal rather than an unknown variable),
// are returned verbatim. Single- or double-quoted strings are returned
// without their quotes. Everything else is looked up as a variable and
// formatted with %v. Unresolvable input is returned unchanged with false.
func (e *Executor) resolveConditionOperand(expr string, host string, task *Task, locals map[string]any) (string, bool) {
	expr = corexTrimSpace(expr)

	switch expr {
	case "true", "True", "false", "False":
		return expr, true
	}

	// Numeric literal, with optional leading sign.
	digits := expr
	if len(digits) > 1 && (digits[0] == '-' || digits[0] == '+') {
		digits = digits[1:]
	}
	if len(digits) > 0 && digits[0] >= '0' && digits[0] <= '9' {
		return expr, true
	}

	// Quoted string literal.
	if n := len(expr); n >= 2 {
		first, last := expr[0], expr[n-1]
		if (first == '\'' && last == '\'') || (first == '"' && last == '"') {
			return expr[1 : n-1], true
		}
	}

	if value, ok := e.lookupConditionValue(expr, host, task, locals); ok {
		return sprintf("%v", value), true
	}
	return expr, false
}
// applyTaskResultConditions overrides result.Changed and result.Failed from
// the task's changed_when and failed_when expressions, when present. The
// expressions see the result and its main fields as local variables;
// failed_when is evaluated after changed_when and sees the updated
// "changed" value. Nothing happens when task or result is nil.
func (e *Executor) applyTaskResultConditions(host string, task *Task, result *TaskResult) {
	if task == nil || result == nil {
		return
	}

	scope := map[string]any{
		"result":   result,
		"stdout":   result.Stdout,
		"stderr":   result.Stderr,
		"rc":       result.RC,
		"changed":  result.Changed,
		"failed":   result.Failed,
		"skipped":  result.Skipped,
		"msg":      result.Msg,
		"duration": result.Duration,
	}

	if cond := task.ChangedWhen; cond != nil {
		result.Changed = e.evaluateWhenWithLocals(cond, host, task, scope)
		scope["changed"] = result.Changed
	}
	if cond := task.FailedWhen; cond != nil {
		result.Failed = e.evaluateWhenWithLocals(cond, host, task, scope)
	}
}
// getRegisteredVar returns the result host registered under name, or nil if
// there is none. Only the part of name before the first "." is used, so
// "result.stdout" looks up "result". The lookup holds the read lock.
func (e *Executor) getRegisteredVar(host string, name string) *TaskResult {
	// Handle dotted access (e.g., "result.stdout")
	base := splitN(name, ".", 2)[0]

	e.mu.RLock()
	defer e.mu.RUnlock()

	// Indexing a missing host yields a nil map, which is safe to read.
	if found, ok := e.results[host][base]; ok {
		return found
	}
	return nil
}
// templateExprPattern matches one `{{ expression }}` placeholder. It is
// compiled once instead of on every templateString call.
var templateExprPattern = regexp.MustCompile(`\{\{\s*([^}]+)\s*\}\}`)

// templateString applies Jinja2-like templating.
//
// Every `{{ expr }}` placeholder in s is replaced by the result of
// resolveExpr for the trimmed expression; text outside placeholders is
// copied unchanged.
func (e *Executor) templateString(s string, host string, task *Task) string {
	return templateExprPattern.ReplaceAllStringFunc(s, func(match string) string {
		// Strip the surrounding "{{" and "}}" before trimming whitespace.
		expr := corexTrimSpace(match[2 : len(match)-2])
		return e.resolveExpr(expr, host, task)
	})
}
// resolveExpr renders one template expression as a string.
//
// Resolution order: filter chain ("value | filter"), lookup(...) call,
// dotted access into a registered result (stdout, stderr, rc, changed,
// failed), dotted access into a nested value, executor vars, task vars,
// inventory host vars, gathered facts. An expression that resolves nowhere
// is returned re-wrapped as "{{ expr }}".
func (e *Executor) resolveExpr(expr string, host string, task *Task) string {
	// Handle filters
	if contains(expr, " | ") {
		pieces := splitN(expr, " | ", 2)
		input := e.resolveExpr(pieces[0], host, task)
		return e.applyFilter(input, pieces[1])
	}

	// Handle lookups
	if corexHasPrefix(expr, "lookup(") {
		return e.handleLookup(expr)
	}

	if contains(expr, ".") {
		pieces := splitN(expr, ".", 2)
		base, field := pieces[0], pieces[1]

		// Handle registered vars
		if registered := e.getRegisteredVar(host, base); registered != nil {
			switch field {
			case "stdout":
				return registered.Stdout
			case "stderr":
				return registered.Stderr
			case "rc":
				return sprintf("%d", registered.RC)
			case "changed":
				return sprintf("%t", registered.Changed)
			case "failed":
				return sprintf("%t", registered.Failed)
			}
		}

		// Resolve nested maps from vars, task vars, or host vars.
		if root, ok := e.lookupExprValue(base, host, task); ok {
			if nested, ok := lookupNestedValue(root, field); ok {
				return sprintf("%v", nested)
			}
		}
	}

	// Check vars
	if fromVars, ok := e.vars[expr]; ok {
		return sprintf("%v", fromVars)
	}

	// Check task vars
	if task != nil {
		if fromTask, ok := task.Vars[expr]; ok {
			return sprintf("%v", fromTask)
		}
	}

	// Check host vars
	if e.inventory != nil {
		if fromHost, ok := GetHostVars(e.inventory, host)[expr]; ok {
			return sprintf("%v", fromHost)
		}
	}

	// Check facts
	if hostFacts, ok := e.facts[host]; ok {
		switch expr {
		case "ansible_hostname":
			return hostFacts.Hostname
		case "ansible_fqdn":
			return hostFacts.FQDN
		case "ansible_os_family":
			return hostFacts.OS
		case "ansible_memtotal_mb":
			return sprintf("%d", hostFacts.Memory)
		case "ansible_processor_vcpus":
			return sprintf("%d", hostFacts.CPUs)
		case "ansible_default_ipv4_address":
			return hostFacts.IPv4
		case "ansible_distribution":
			return hostFacts.Distribution
		case "ansible_distribution_version":
			return hostFacts.Version
		case "ansible_architecture":
			return hostFacts.Architecture
		case "ansible_kernel":
			return hostFacts.Kernel
		}
	}

	return "{{ " + expr + " }}" // Return as-is if unresolved
}
2026-04-01 20:51:57 +00:00
// buildEnvironmentPrefix turns the environment of play and task into a string
// of shell "export NAME='value'; " statements suitable for prepending to a
// command. Task entries override play entries with the same name. Names and
// values are both templated; entries whose templated name is empty are
// dropped. Statements are emitted in sorted order of the untemplated names,
// and the result is "" when there is nothing to export.
func (e *Executor) buildEnvironmentPrefix(host string, task *Task, play *Play) string {
	merged := map[string]string{}
	if play != nil {
		for name, raw := range play.Environment {
			merged[name] = raw
		}
	}
	if task != nil {
		for name, raw := range task.Environment {
			merged[name] = raw
		}
	}
	if len(merged) == 0 {
		return ""
	}

	// Map iteration order is random; sort so the prefix is deterministic.
	names := make([]string, 0, len(merged))
	for name := range merged {
		names = append(names, name)
	}
	slices.Sort(names)

	exports := make([]string, 0, len(names))
	for _, name := range names {
		renderedName := e.templateString(name, host, task)
		if renderedName != "" {
			quoted := shellQuote(e.templateString(merged[name], host, task))
			exports = append(exports, sprintf("export %s=%s", renderedName, quoted))
		}
	}
	if len(exports) == 0 {
		return ""
	}
	return join("; ", exports) + "; "
}
// shellQuote returns value enclosed in single quotes for use in a shell
// command. Each embedded single quote is rewritten as '\'' (close the quoted
// string, emit an escaped quote, reopen the quoted string).
func shellQuote(value string) string {
	const quote = "'"
	escaped := replaceAll(value, quote, "'\\''")
	return quote + escaped + quote
}
// lookupExprValue looks name up in the executor vars, then the task vars
// (when task is non-nil), then the inventory host vars for host (when an
// inventory is set). It returns the first value found and true, or nil and
// false when no scope defines name.
func (e *Executor) lookupExprValue(name string, host string, task *Task) (any, bool) {
	if found, ok := e.vars[name]; ok {
		return found, true
	}
	if task != nil {
		if found, ok := task.Vars[name]; ok {
			return found, true
		}
	}
	if e.inventory == nil {
		return nil, false
	}
	found, ok := GetHostVars(e.inventory, host)[name]
	if !ok {
		return nil, false
	}
	return found, true
}
// lookupNestedValue follows the dot-separated path through value, which must
// be a map[string]any at every level that is descended into. It returns the
// value at the end of the path and true, or nil and false when a segment is
// missing or an intermediate value is not a map[string]any. An empty path
// returns value itself.
func lookupNestedValue(value any, path string) (any, bool) {
	if path == "" {
		return value, true
	}
	node := value
	for _, key := range split(path, ".") {
		container, isMap := node.(map[string]any)
		if !isMap {
			return nil, false
		}
		child, present := container[key]
		if !present {
			return nil, false
		}
		node = child
	}
	return node, true
}
// applyFilter applies a Jinja2 filter.
func (e *Executor) applyFilter(value, filter string) string {
filter = corexTrimSpace(filter)
// Handle default filter
if corexHasPrefix(filter, "default(") {
if value == "" || value == "{{ "+filter+" }}" {
// Extract default value
re := regexp.MustCompile(`default\(([^)]*)\)`)
if match := re.FindStringSubmatch(filter); len(match) > 1 {
return trimCutset(match[1], "'\"")
}
}
return value
}
// Handle bool filter
if filter == "bool" {
lowered := lower(value)
if lowered == "true" || lowered == "yes" || lowered == "1" {
return "true"
}
return "false"
}
// Handle trim
if filter == "trim" {
return corexTrimSpace(value)
}
// Handle b64decode
if filter == "b64decode" {
2026-04-01 20:14:40 +00:00
decoded, err := base64.StdEncoding.DecodeString(value)
if err == nil {
return string(decoded)
}
if decoded, err := base64.RawStdEncoding.DecodeString(value); err == nil {
return string(decoded)
}
return value
}
return value
}
// lookupCallPattern parses lookup('type', 'arg'), capturing the lookup type
// and its single quoted argument. It is compiled once at package scope
// instead of on every handleLookup call.
var lookupCallPattern = regexp.MustCompile(`lookup\s*\(\s*['"](\w+)['"]\s*,\s*['"]([^'"]+)['"]\s*`)

// handleLookup evaluates a lookup('type', 'arg') expression and returns the
// looked-up text. Supported types are "env", resolved through env(arg), and
// "file", which returns the content read through coreio.Local.
//
// It returns "" when the expression does not parse, the type is unsupported,
// or the file cannot be read; read errors are deliberately not propagated.
func (e *Executor) handleLookup(expr string) string {
	match := lookupCallPattern.FindStringSubmatch(expr)
	if len(match) < 3 {
		return ""
	}

	switch kind, arg := match[1], match[2]; kind {
	case "env":
		return env(arg)
	case "file":
		if data, err := coreio.Local.Read(arg); err == nil {
			return data
		}
	}
	return ""
}
// resolveLoop expands a task's loop value into the items to iterate over.
//
// A []any is returned as-is and a []string is converted element by element.
// A string is treated as a reference to a list variable, either a bare
// variable name or a single "{{ name }}" / "{{ parent.child }}" expression.
// Any other input, or a reference that does not resolve to a list, yields
// nil.
func (e *Executor) resolveLoop(loop any, host string) []any {
	switch v := loop.(type) {
	case []any:
		return v
	case []string:
		return loopItemsFromValue(v)
	case string:
		// Template the string and use the result as a variable name.
		resolved := e.templateString(v, host, nil)
		if val, ok := e.vars[resolved]; ok {
			if items := loopItemsFromValue(val); items != nil {
				return items
			}
		}
		// Templating "{{ name }}" stringifies the list, so the lookup above
		// cannot match it; resolve the referenced variable directly instead.
		return e.resolveLoopReference(v, host)
	}
	return nil
}

// resolveLoopReference resolves a loop string consisting of exactly one
// "{{ name }}" expression to the list it names, searching the scopes covered
// by lookupExprValue and following dotted paths through nested maps. It
// returns nil when ref has another shape or does not name a list.
func (e *Executor) resolveLoopReference(ref string, host string) []any {
	ref = corexTrimSpace(ref)
	if len(ref) < 4 || !strings.HasPrefix(ref, "{{") || !strings.HasSuffix(ref, "}}") {
		return nil
	}
	name := corexTrimSpace(ref[2 : len(ref)-2])
	if name == "" || strings.Contains(name, "{{") || strings.Contains(name, "}}") {
		return nil
	}

	if val, ok := e.lookupExprValue(name, host, nil); ok {
		return loopItemsFromValue(val)
	}

	root, rest, dotted := strings.Cut(name, ".")
	if !dotted {
		return nil
	}
	base, ok := e.lookupExprValue(root, host, nil)
	if !ok {
		return nil
	}
	nested, ok := lookupNestedValue(base, rest)
	if !ok {
		return nil
	}
	return loopItemsFromValue(nested)
}

// loopItemsFromValue converts a list-typed variable value into loop items.
// It returns nil when val is neither a []any nor a []string.
func loopItemsFromValue(val any) []any {
	switch list := val.(type) {
	case []any:
		return list
	case []string:
		items := make([]any, len(list))
		for i, s := range list {
			items[i] = s
		}
		return items
	}
	return nil
}
// matchesTags reports whether a task carrying taskTags should run under the
// executor's tag selection. A task is rejected when any of its tags appears
// in SkipTags. Otherwise it runs when Tags is empty, when Tags contains
// "all", or when Tags and taskTags share at least one tag.
func (e *Executor) matchesTags(taskTags []string) bool {
	// Skip tags take priority over include tags.
	skipped := slices.ContainsFunc(e.SkipTags, func(skip string) bool {
		return slices.Contains(taskTags, skip)
	})
	if skipped {
		return false
	}

	// With no include tags, everything that was not skipped runs.
	if len(e.Tags) == 0 {
		return true
	}
	return slices.ContainsFunc(e.Tags, func(tag string) bool {
		return tag == "all" || slices.Contains(taskTags, tag)
	})
}
// handleNotify records every handler named by notify as notified. notify may
// be a single name (string), a []string, or a []any; non-string elements of
// a []any are ignored, as is any other type.
func (e *Executor) handleNotify(notify any) {
	var names []string
	switch v := notify.(type) {
	case string:
		names = []string{v}
	case []string:
		names = v
	case []any:
		for _, item := range v {
			if name, isString := item.(string); isString {
				names = append(names, name)
			}
		}
	}
	for _, name := range names {
		e.notified[name] = true
	}
}
2026-04-01 19:51:23 +00:00
// runNotifiedHandlers runs, in the order they are declared on play, every
// handler whose name is currently marked as notified, and resets those marks
// before running anything. It stops at and returns the first error from
// runTaskOnHosts. It is a no-op when play is nil or declares no handlers.
func (e *Executor) runNotifiedHandlers(ctx context.Context, hosts []string, play *Play) error {
	if play == nil || len(play.Handlers) == 0 {
		return nil
	}

	// Snapshot and clear the notifications up front.
	due := map[string]bool{}
	for name := range e.notified {
		if !e.notified[name] {
			continue
		}
		due[name] = true
		e.notified[name] = false
	}
	if len(due) == 0 {
		return nil
	}

	for _, handler := range play.Handlers {
		if !due[handler.Name] {
			continue
		}
		if err := e.runTaskOnHosts(ctx, hosts, &handler, play); err != nil {
			return err
		}
	}
	return nil
}
// handleMetaAction applies module meta side effects after the task result has
// been recorded and callbacks have fired.
func (e *Executor) handleMetaAction(ctx context.Context, host string, hosts []string, play *Play, result *TaskResult) error {
2026-04-01 19:51:23 +00:00
if result == nil || result.Data == nil {
return nil
}
action, _ := result.Data["action"].(string)
switch action {
case "flush_handlers":
return e.runNotifiedHandlers(ctx, hosts, play)
2026-04-01 20:03:11 +00:00
case "clear_facts":
e.clearFacts(hosts)
return nil
case "clear_host_errors":
e.clearHostErrors()
return nil
case "end_play":
return errEndPlay
case "end_host":
e.markHostEnded(host)
return errEndHost
case "reset_connection":
e.resetConnection(host)
return nil
2026-04-01 19:51:23 +00:00
default:
return nil
}
}
2026-04-01 20:03:11 +00:00
// clearFacts drops the cached facts of every host in hosts while holding the
// executor's write lock.
func (e *Executor) clearFacts(hosts []string) {
	e.mu.Lock()
	defer e.mu.Unlock()

	for i := range hosts {
		delete(e.facts, hosts[i])
	}
}
// clearHostErrors replaces the batch failure tracking map with an empty one,
// so that later tasks can proceed after a meta clear_host_errors action.
func (e *Executor) clearHostErrors() {
	fresh := make(map[string]bool)

	e.mu.Lock()
	e.batchFailedHosts = fresh
	e.mu.Unlock()
}
// markHostEnded flags host as ended so it is skipped for the rest of the
// play. The ended-hosts map is created on first use. An empty host name is
// ignored.
func (e *Executor) markHostEnded(host string) {
	if host == "" {
		return
	}

	e.mu.Lock()
	if e.endedHosts == nil {
		e.endedHosts = map[string]bool{}
	}
	e.endedHosts[host] = true
	e.mu.Unlock()
}
// isHostEnded reports whether host has been marked as ended. An empty host
// name is never considered ended.
func (e *Executor) isHostEnded(host string) bool {
	if host == "" {
		return false
	}

	e.mu.RLock()
	ended := e.endedHosts[host]
	e.mu.RUnlock()
	return ended
}
// filterActiveHosts returns the hosts that have not been marked as ended,
// preserving their order. An empty input is returned unchanged; otherwise
// the result is a newly allocated slice.
func (e *Executor) filterActiveHosts(hosts []string) []string {
	if len(hosts) == 0 {
		return hosts
	}

	active := make([]string, 0, len(hosts))
	for i := range hosts {
		if e.isHostEnded(hosts[i]) {
			continue
		}
		active = append(active, hosts[i])
	}
	return active
}
// resetConnection removes the cached SSH client for host and closes it. The
// client is closed after the executor lock has been released, and any error
// from Close is discarded. An empty host name, or a host without a cached
// client, is a no-op.
func (e *Executor) resetConnection(host string) {
	if host == "" {
		return
	}

	e.mu.Lock()
	conn, cached := e.clients[host]
	delete(e.clients, host) // no-op when the host has no cached client
	e.mu.Unlock()

	if !cached {
		return
	}
	_ = conn.Close()
}
// Close closes every cached SSH client and resets the client cache to an
// empty map. Errors returned by individual clients are discarded.
//
// Example:
//
//	exec.Close()
func (e *Executor) Close() {
	e.mu.Lock()
	defer e.mu.Unlock()

	for host := range e.clients {
		_ = e.clients[host].Close()
	}
	e.clients = map[string]sshExecutorClient{}
}
// TemplateFile reads the template at src and renders it for host.
//
// The content is first rewritten from Jinja2 to Go template syntax with a
// basic textual conversion ("{{" becomes "{{ .", "{%" becomes "{{", "%}"
// becomes "}}") and executed with text/template. When the converted template
// does not parse, or execution fails (including a reference to an undefined
// variable), the original content is rendered with templateString instead.
//
// Template data is layered with the same precedence resolveExpr uses:
// executor vars override task vars, which override inventory host vars,
// which override gathered facts.
//
// It returns an error only when src cannot be read.
//
// Example:
//
//	content, err := exec.TemplateFile("/workspace/templates/app.conf.j2", "web1", &Task{})
func (e *Executor) TemplateFile(src, host string, task *Task) (string, error) {
	content, err := coreio.Local.Read(src)
	if err != nil {
		return "", err
	}

	// Convert Jinja2 to Go template syntax (basic conversion).
	// NOTE(review): "{{ name }}" becomes "{{ . name }}", which presumably
	// fails to parse and therefore takes the templateString fallback; confirm
	// whether the conversion should also strip the padding.
	tmplContent := replaceAll(content, "{{", "{{ .")
	tmplContent = replaceAll(tmplContent, "{%", "{{")
	tmplContent = replaceAll(tmplContent, "%}", "}}")

	// missingkey=error makes an undefined variable an execution error, which
	// triggers the fallback below rather than writing "<no value>" into the
	// rendered file.
	tmpl, err := template.New("template").Option("missingkey=error").Parse(tmplContent)
	if err != nil {
		// Fall back to simple replacement.
		return e.templateString(content, host, task), nil
	}

	// Build the data map from lowest to highest precedence so that later
	// layers overwrite earlier ones.
	data := make(map[string]any)
	if facts, ok := e.facts[host]; ok {
		data["ansible_hostname"] = facts.Hostname
		data["ansible_fqdn"] = facts.FQDN
		data["ansible_os_family"] = facts.OS
		data["ansible_memtotal_mb"] = facts.Memory
		data["ansible_processor_vcpus"] = facts.CPUs
		data["ansible_default_ipv4_address"] = facts.IPv4
		data["ansible_distribution"] = facts.Distribution
		data["ansible_distribution_version"] = facts.Version
		data["ansible_architecture"] = facts.Architecture
		data["ansible_kernel"] = facts.Kernel
	}
	if e.inventory != nil {
		for k, v := range GetHostVars(e.inventory, host) {
			data[k] = v
		}
	}
	if task != nil {
		for k, v := range task.Vars {
			data[k] = v
		}
	}
	for k, v := range e.vars {
		data[k] = v
	}

	buf := newBuilder()
	if err := tmpl.Execute(buf, data); err != nil {
		// Discard any partial output and fall back to simple replacement.
		return e.templateString(content, host, task), nil
	}
	return buf.String(), nil
}