diff --git a/internal/cmd/ai/cmd_agent.go b/internal/cmd/ai/cmd_agent.go index 2c99a4a1..48489d6c 100644 --- a/internal/cmd/ai/cmd_agent.go +++ b/internal/cmd/ai/cmd_agent.go @@ -36,7 +36,7 @@ func loadConfig() (*config.Config, error) { func agentAddCmd() *cli.Command { cmd := &cli.Command{ Use: "add ", - Short: "Add an agent to the config", + Short: "Add an agent to the config and verify SSH", Args: cli.ExactArgs(2), RunE: func(cmd *cli.Command, args []string) error { name := args[0] @@ -50,14 +50,38 @@ func agentAddCmd() *cli.Command { if queueDir == "" { queueDir = "/home/claude/ai-work/queue" } + model, _ := cmd.Flags().GetString("model") + dualRun, _ := cmd.Flags().GetBool("dual-run") - // Test SSH connectivity. - // TODO: Replace exec ssh with charmbracelet/ssh native Go client + keygen. + // Scan and add host key to known_hosts. + parts := strings.Split(host, "@") + hostname := parts[len(parts)-1] + + fmt.Printf("Scanning host key for %s... ", hostname) + scanCmd := exec.Command("ssh-keyscan", "-H", hostname) + keys, err := scanCmd.Output() + if err != nil { + fmt.Println(errorStyle.Render("FAILED")) + return fmt.Errorf("failed to scan host keys: %w", err) + } + + home, _ := os.UserHomeDir() + knownHostsPath := filepath.Join(home, ".ssh", "known_hosts") + f, err := os.OpenFile(knownHostsPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) + if err != nil { + return fmt.Errorf("failed to open known_hosts: %w", err) + } + if _, err := f.Write(keys); err != nil { + f.Close() + return fmt.Errorf("failed to write known_hosts: %w", err) + } + f.Close() + fmt.Println(successStyle.Render("OK")) + + // Test SSH with strict host key checking. fmt.Printf("Testing SSH to %s... 
", host) - out, err := exec.Command("ssh", - "-o", "StrictHostKeyChecking=accept-new", - "-o", "ConnectTimeout=10", - host, "echo ok").CombinedOutput() + testCmd := agentci.SecureSSHCommand(host, "echo ok") + out, err := testCmd.CombinedOutput() if err != nil { fmt.Println(errorStyle.Render("FAILED")) return fmt.Errorf("SSH failed: %s", strings.TrimSpace(string(out))) @@ -73,6 +97,8 @@ func agentAddCmd() *cli.Command { Host: host, QueueDir: queueDir, ForgejoUser: forgejoUser, + Model: model, + DualRun: dualRun, Active: true, } if err := agentci.SaveAgent(cfg, name, ac); err != nil { @@ -85,6 +111,8 @@ func agentAddCmd() *cli.Command { } cmd.Flags().String("forgejo-user", "", "Forgejo username (defaults to agent name)") cmd.Flags().String("queue-dir", "", "Remote queue directory (default: /home/claude/ai-work/queue)") + cmd.Flags().String("model", "sonnet", "Primary AI model") + cmd.Flags().Bool("dual-run", false, "Enable Clotho dual-run verification") return cmd } @@ -108,22 +136,21 @@ func agentListCmd() *cli.Command { return nil } - table := cli.NewTable("NAME", "HOST", "FORGEJO USER", "ACTIVE", "QUEUE") + table := cli.NewTable("NAME", "HOST", "MODEL", "DUAL", "ACTIVE", "QUEUE") for name, ac := range agents { active := dimStyle.Render("no") if ac.Active { active = successStyle.Render("yes") } + dual := dimStyle.Render("no") + if ac.DualRun { + dual = successStyle.Render("yes") + } // Quick SSH check for queue depth. - // TODO: Replace exec ssh with charmbracelet/ssh native Go client. 
queue := dimStyle.Render("-") - out, err := exec.Command("ssh", - "-o", "StrictHostKeyChecking=accept-new", - "-o", "ConnectTimeout=5", - ac.Host, - fmt.Sprintf("ls %s/ticket-*.json 2>/dev/null | wc -l", ac.QueueDir), - ).Output() + checkCmd := agentci.SecureSSHCommand(ac.Host, fmt.Sprintf("ls %s/ticket-*.json 2>/dev/null | wc -l", ac.QueueDir)) + out, err := checkCmd.Output() if err == nil { n := strings.TrimSpace(string(out)) if n != "0" { @@ -133,7 +160,7 @@ func agentListCmd() *cli.Command { } } - table.AddRow(name, ac.Host, ac.ForgejoUser, active, queue) + table.AddRow(name, ac.Host, ac.Model, dual, active, queue) } table.Render() return nil @@ -182,11 +209,7 @@ func agentStatusCmd() *cli.Command { fi ` - // TODO: Replace exec ssh with charmbracelet/ssh native Go client. - sshCmd := exec.Command("ssh", - "-o", "StrictHostKeyChecking=accept-new", - "-o", "ConnectTimeout=10", - ac.Host, script) + sshCmd := agentci.SecureSSHCommand(ac.Host, script) sshCmd.Stdout = os.Stdout sshCmd.Stderr = os.Stderr return sshCmd.Run() @@ -218,19 +241,12 @@ func agentLogsCmd() *cli.Command { return fmt.Errorf("agent %q not found", name) } - // TODO: Replace exec ssh with charmbracelet/ssh native Go client. - tailArgs := []string{ - "-o", "StrictHostKeyChecking=accept-new", - "-o", "ConnectTimeout=10", - ac.Host, - } + remoteCmd := fmt.Sprintf("tail -n %d ~/ai-work/logs/runner.log", lines) if follow { - tailArgs = append(tailArgs, fmt.Sprintf("tail -f -n %d ~/ai-work/logs/runner.log", lines)) - } else { - tailArgs = append(tailArgs, fmt.Sprintf("tail -n %d ~/ai-work/logs/runner.log", lines)) + remoteCmd = fmt.Sprintf("tail -f -n %d ~/ai-work/logs/runner.log", lines) } - sshCmd := exec.Command("ssh", tailArgs...) + sshCmd := agentci.SecureSSHCommand(ac.Host, remoteCmd) sshCmd.Stdout = os.Stdout sshCmd.Stderr = os.Stderr sshCmd.Stdin = os.Stdin @@ -307,7 +323,6 @@ func agentRemoveCmd() *cli.Command { // findSetupScript looks for agent-setup.sh in common locations. 
func findSetupScript() string { - // Relative to executable. exe, _ := os.Executable() if exe != "" { dir := filepath.Dir(exe) @@ -322,7 +337,6 @@ func findSetupScript() string { } } - // Working directory. cwd, _ := os.Getwd() if cwd != "" { p := filepath.Join(cwd, "scripts", "agent-setup.sh") @@ -333,4 +347,3 @@ func findSetupScript() string { return "" } - diff --git a/internal/core-ide/headless.go b/internal/core-ide/headless.go index 1ac22efa..cd2619a8 100644 --- a/internal/core-ide/headless.go +++ b/internal/core-ide/headless.go @@ -67,17 +67,23 @@ func startHeadless() { enableAutoMerge := handlers.NewEnableAutoMergeHandler(forgeClient) tickParent := handlers.NewTickParentHandler(forgeClient) - // Agent dispatch — load targets from ~/.core/config.yaml + // Agent dispatch — Clotho integration cfg, cfgErr := config.New() - var agentTargets map[string]handlers.AgentTarget + var agentTargets map[string]agentci.AgentConfig + var clothoCfg agentci.ClothoConfig + if cfgErr == nil { - agentTargets, _ = agentci.LoadAgents(cfg) + agentTargets, _ = agentci.LoadActiveAgents(cfg) + clothoCfg, _ = agentci.LoadClothoConfig(cfg) } if agentTargets == nil { - agentTargets = map[string]handlers.AgentTarget{} + agentTargets = map[string]agentci.AgentConfig{} } - log.Printf("Loaded %d agent targets", len(agentTargets)) - dispatch := handlers.NewDispatchHandler(forgeClient, forgeURL, forgeToken, agentTargets) + + spinner := agentci.NewSpinner(clothoCfg, agentTargets) + log.Printf("Loaded %d agent targets. 
Strategy: %s", len(agentTargets), clothoCfg.Strategy) + + dispatch := handlers.NewDispatchHandler(forgeClient, forgeURL, forgeToken, spinner) // Build poller poller := jobrunner.NewPoller(jobrunner.PollerConfig{ diff --git a/pkg/agentci/clotho.go b/pkg/agentci/clotho.go new file mode 100644 index 00000000..f8b25b37 --- /dev/null +++ b/pkg/agentci/clotho.go @@ -0,0 +1,68 @@ +package agentci + +import ( + "context" + "strings" + + "github.com/host-uk/core/pkg/jobrunner" +) + +// RunMode determines the execution strategy for a dispatched task. +type RunMode string + +const ( + ModeStandard RunMode = "standard" + ModeDual RunMode = "dual" // The Clotho Protocol — dual-run verification +) + +// Spinner is the Clotho orchestrator that determines the fate of each task. +type Spinner struct { + Config ClothoConfig + Agents map[string]AgentConfig +} + +// NewSpinner creates a new Clotho orchestrator. +func NewSpinner(cfg ClothoConfig, agents map[string]AgentConfig) *Spinner { + return &Spinner{ + Config: cfg, + Agents: agents, + } +} + +// DeterminePlan decides if a signal requires dual-run verification based on +// the global strategy, agent configuration, and repository criticality. +func (s *Spinner) DeterminePlan(signal *jobrunner.PipelineSignal, agentName string) RunMode { + if s.Config.Strategy != "clotho-verified" { + return ModeStandard + } + + agent, ok := s.Agents[agentName] + if !ok { + return ModeStandard + } + if agent.DualRun { + return ModeDual + } + + // Protect critical repos with dual-run (Axiom 1). + if signal.RepoName == "core" || strings.Contains(signal.RepoName, "security") { + return ModeDual + } + + return ModeStandard +} + +// GetVerifierModel returns the model for the secondary "signed" verification run. +func (s *Spinner) GetVerifierModel(agentName string) string { + agent, ok := s.Agents[agentName] + if !ok || agent.VerifyModel == "" { + return "gemini-1.5-pro" + } + return agent.VerifyModel +} + +// Weave compares primary and verifier outputs. 
Returns true if they converge. +// This is a placeholder for future semantic diff logic. +func (s *Spinner) Weave(ctx context.Context, primaryOutput, signedOutput []byte) (bool, error) { + return string(primaryOutput) == string(signedOutput), nil +} diff --git a/pkg/agentci/config.go b/pkg/agentci/config.go index f0d39c29..1c3c0544 100644 --- a/pkg/agentci/config.go +++ b/pkg/agentci/config.go @@ -1,62 +1,96 @@ -// Package agentci provides configuration and management for AgentCI dispatch targets. +// Package agentci provides configuration, security, and orchestration for AgentCI dispatch targets. package agentci import ( "fmt" "github.com/host-uk/core/pkg/config" - "github.com/host-uk/core/pkg/jobrunner/handlers" - "github.com/host-uk/core/pkg/log" ) // AgentConfig represents a single agent machine in the config file. type AgentConfig struct { - Host string `yaml:"host" mapstructure:"host"` - QueueDir string `yaml:"queue_dir" mapstructure:"queue_dir"` - ForgejoUser string `yaml:"forgejo_user" mapstructure:"forgejo_user"` - Model string `yaml:"model" mapstructure:"model"` // claude model: sonnet, haiku, opus (default: sonnet) - Runner string `yaml:"runner" mapstructure:"runner"` // runner binary: claude, codex (default: claude) - Active bool `yaml:"active" mapstructure:"active"` + Host string `yaml:"host" mapstructure:"host"` + QueueDir string `yaml:"queue_dir" mapstructure:"queue_dir"` + ForgejoUser string `yaml:"forgejo_user" mapstructure:"forgejo_user"` + Model string `yaml:"model" mapstructure:"model"` // primary AI model + Runner string `yaml:"runner" mapstructure:"runner"` // runner binary: claude, codex, gemini + VerifyModel string `yaml:"verify_model" mapstructure:"verify_model"` // secondary model for dual-run + SecurityLevel string `yaml:"security_level" mapstructure:"security_level"` // low, high + Roles []string `yaml:"roles" mapstructure:"roles"` + DualRun bool `yaml:"dual_run" mapstructure:"dual_run"` + Active bool `yaml:"active" 
mapstructure:"active"` } -// LoadAgents reads agent targets from config and returns a map suitable for the dispatch handler. +// ClothoConfig controls the orchestration strategy. +type ClothoConfig struct { + Strategy string `yaml:"strategy" mapstructure:"strategy"` // direct, clotho-verified + ValidationThreshold float64 `yaml:"validation_threshold" mapstructure:"validation_threshold"` // divergence limit (0.0-1.0) + SigningKeyPath string `yaml:"signing_key_path" mapstructure:"signing_key_path"` +} + +// LoadAgents reads agent targets from config and returns a map of AgentConfig. // Returns an empty map (not an error) if no agents are configured. -func LoadAgents(cfg *config.Config) (map[string]handlers.AgentTarget, error) { +func LoadAgents(cfg *config.Config) (map[string]AgentConfig, error) { var agents map[string]AgentConfig if err := cfg.Get("agentci.agents", &agents); err != nil { - // No config is fine — just no agents. - return map[string]handlers.AgentTarget{}, nil + return map[string]AgentConfig{}, nil } - targets := make(map[string]handlers.AgentTarget) + // Validate and apply defaults. for name, ac := range agents { if !ac.Active { continue } if ac.Host == "" { - return nil, log.E("agentci.LoadAgents", fmt.Sprintf("agent %q: host is required", name), nil) + return nil, fmt.Errorf("agent %q: host is required", name) } - queueDir := ac.QueueDir - if queueDir == "" { - queueDir = "/home/claude/ai-work/queue" + if ac.QueueDir == "" { + ac.QueueDir = "/home/claude/ai-work/queue" } - model := ac.Model - if model == "" { - model = "sonnet" + if ac.Model == "" { + ac.Model = "sonnet" } - runner := ac.Runner - if runner == "" { - runner = "claude" - } - targets[name] = handlers.AgentTarget{ - Host: ac.Host, - QueueDir: queueDir, - Model: model, - Runner: runner, + if ac.Runner == "" { + ac.Runner = "claude" } + agents[name] = ac } - return targets, nil + return agents, nil +} + +// LoadActiveAgents returns only active agents. 
+func LoadActiveAgents(cfg *config.Config) (map[string]AgentConfig, error) { + all, err := LoadAgents(cfg) + if err != nil { + return nil, err + } + active := make(map[string]AgentConfig) + for name, ac := range all { + if ac.Active { + active[name] = ac + } + } + return active, nil +} + +// LoadClothoConfig loads the Clotho orchestrator settings. +// Returns sensible defaults if no config is present. +func LoadClothoConfig(cfg *config.Config) (ClothoConfig, error) { + var cc ClothoConfig + if err := cfg.Get("agentci.clotho", &cc); err != nil { + return ClothoConfig{ + Strategy: "direct", + ValidationThreshold: 0.85, + }, nil + } + if cc.Strategy == "" { + cc.Strategy = "direct" + } + if cc.ValidationThreshold == 0 { + cc.ValidationThreshold = 0.85 + } + return cc, nil } // SaveAgent writes an agent config entry to the config file. @@ -67,6 +101,7 @@ func SaveAgent(cfg *config.Config, name string, ac AgentConfig) error { "queue_dir": ac.QueueDir, "forgejo_user": ac.ForgejoUser, "active": ac.Active, + "dual_run": ac.DualRun, } if ac.Model != "" { data["model"] = ac.Model @@ -74,6 +109,15 @@ func SaveAgent(cfg *config.Config, name string, ac AgentConfig) error { if ac.Runner != "" { data["runner"] = ac.Runner } + if ac.VerifyModel != "" { + data["verify_model"] = ac.VerifyModel + } + if ac.SecurityLevel != "" { + data["security_level"] = ac.SecurityLevel + } + if len(ac.Roles) > 0 { + data["roles"] = ac.Roles + } return cfg.Set(key, data) } @@ -81,10 +125,10 @@ func SaveAgent(cfg *config.Config, name string, ac AgentConfig) error { func RemoveAgent(cfg *config.Config, name string) error { var agents map[string]AgentConfig if err := cfg.Get("agentci.agents", &agents); err != nil { - return log.E("agentci.RemoveAgent", "no agents configured", err) + return fmt.Errorf("no agents configured") } if _, ok := agents[name]; !ok { - return log.E("agentci.RemoveAgent", fmt.Sprintf("agent %q not found", name), nil) + return fmt.Errorf("agent %q not found", name) } 
delete(agents, name) return cfg.Set("agentci.agents", agents) diff --git a/pkg/agentci/config_test.go b/pkg/agentci/config_test.go index 6295b04c..4867457b 100644 --- a/pkg/agentci/config_test.go +++ b/pkg/agentci/config_test.go @@ -5,7 +5,6 @@ import ( "github.com/host-uk/core/pkg/config" "github.com/host-uk/core/pkg/io" - "github.com/host-uk/core/pkg/jobrunner/handlers" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -33,11 +32,11 @@ agentci: runner: claude active: true `) - targets, err := LoadAgents(cfg) + agents, err := LoadAgents(cfg) require.NoError(t, err) - require.Len(t, targets, 1) + require.Len(t, agents, 1) - agent := targets["darbs-claude"] + agent := agents["darbs-claude"] assert.Equal(t, "claude@192.168.0.201", agent.Host) assert.Equal(t, "/home/claude/ai-work/queue", agent.QueueDir) assert.Equal(t, "sonnet", agent.Model) @@ -58,11 +57,11 @@ agentci: runner: codex active: true `) - targets, err := LoadAgents(cfg) + agents, err := LoadAgents(cfg) require.NoError(t, err) - assert.Len(t, targets, 2) - assert.Contains(t, targets, "darbs-claude") - assert.Contains(t, targets, "local-codex") + assert.Len(t, agents, 2) + assert.Contains(t, agents, "darbs-claude") + assert.Contains(t, agents, "local-codex") } func TestLoadAgents_Good_SkipsInactive(t *testing.T) { @@ -76,10 +75,28 @@ agentci: host: claude@10.0.0.2 active: false `) - targets, err := LoadAgents(cfg) + agents, err := LoadAgents(cfg) require.NoError(t, err) - assert.Len(t, targets, 1) - assert.Contains(t, targets, "active-agent") + // Both are returned, but only active-agent has defaults applied. 
+ assert.Len(t, agents, 2) + assert.Contains(t, agents, "active-agent") +} + +func TestLoadActiveAgents_Good(t *testing.T) { + cfg := newTestConfig(t, ` +agentci: + agents: + active-agent: + host: claude@10.0.0.1 + active: true + offline-agent: + host: claude@10.0.0.2 + active: false +`) + active, err := LoadActiveAgents(cfg) + require.NoError(t, err) + assert.Len(t, active, 1) + assert.Contains(t, active, "active-agent") } func TestLoadAgents_Good_Defaults(t *testing.T) { @@ -90,11 +107,11 @@ agentci: host: claude@10.0.0.1 active: true `) - targets, err := LoadAgents(cfg) + agents, err := LoadAgents(cfg) require.NoError(t, err) - require.Len(t, targets, 1) + require.Len(t, agents, 1) - agent := targets["minimal"] + agent := agents["minimal"] assert.Equal(t, "/home/claude/ai-work/queue", agent.QueueDir) assert.Equal(t, "sonnet", agent.Model) assert.Equal(t, "claude", agent.Runner) @@ -102,9 +119,9 @@ agentci: func TestLoadAgents_Good_NoConfig(t *testing.T) { cfg := newTestConfig(t, "") - targets, err := LoadAgents(cfg) + agents, err := LoadAgents(cfg) require.NoError(t, err) - assert.Empty(t, targets) + assert.Empty(t, agents) } func TestLoadAgents_Bad_MissingHost(t *testing.T) { @@ -120,24 +137,49 @@ agentci: assert.Contains(t, err.Error(), "host is required") } -func TestLoadAgents_Good_ReturnsAgentTargets(t *testing.T) { +func TestLoadAgents_Good_WithDualRun(t *testing.T) { cfg := newTestConfig(t, ` agentci: agents: gemini-agent: host: localhost runner: gemini - model: "" + model: gemini-2.0-flash + verify_model: gemini-1.5-pro + dual_run: true active: true `) - targets, err := LoadAgents(cfg) + agents, err := LoadAgents(cfg) require.NoError(t, err) - agent := targets["gemini-agent"] - // Verify it returns the handlers.AgentTarget type. 
- var _ handlers.AgentTarget = agent + agent := agents["gemini-agent"] assert.Equal(t, "gemini", agent.Runner) - assert.Equal(t, "sonnet", agent.Model) // default when empty + assert.Equal(t, "gemini-2.0-flash", agent.Model) + assert.Equal(t, "gemini-1.5-pro", agent.VerifyModel) + assert.True(t, agent.DualRun) +} + +func TestLoadClothoConfig_Good(t *testing.T) { + cfg := newTestConfig(t, ` +agentci: + clotho: + strategy: clotho-verified + validation_threshold: 0.9 + signing_key_path: /etc/core/keys/clotho.pub +`) + cc, err := LoadClothoConfig(cfg) + require.NoError(t, err) + assert.Equal(t, "clotho-verified", cc.Strategy) + assert.Equal(t, 0.9, cc.ValidationThreshold) + assert.Equal(t, "/etc/core/keys/clotho.pub", cc.SigningKeyPath) +} + +func TestLoadClothoConfig_Good_Defaults(t *testing.T) { + cfg := newTestConfig(t, "") + cc, err := LoadClothoConfig(cfg) + require.NoError(t, err) + assert.Equal(t, "direct", cc.Strategy) + assert.Equal(t, 0.85, cc.ValidationThreshold) } func TestSaveAgent_Good(t *testing.T) { @@ -153,7 +195,6 @@ func TestSaveAgent_Good(t *testing.T) { }) require.NoError(t, err) - // Verify we can load it back. 
agents, err := ListAgents(cfg) require.NoError(t, err) require.Contains(t, agents, "new-agent") @@ -161,6 +202,24 @@ func TestSaveAgent_Good(t *testing.T) { assert.Equal(t, "haiku", agents["new-agent"].Model) } +func TestSaveAgent_Good_WithDualRun(t *testing.T) { + cfg := newTestConfig(t, "") + + err := SaveAgent(cfg, "verified-agent", AgentConfig{ + Host: "claude@10.0.0.5", + Model: "gemini-2.0-flash", + VerifyModel: "gemini-1.5-pro", + DualRun: true, + Active: true, + }) + require.NoError(t, err) + + agents, err := ListAgents(cfg) + require.NoError(t, err) + require.Contains(t, agents, "verified-agent") + assert.True(t, agents["verified-agent"].DualRun) +} + func TestSaveAgent_Good_OmitsEmptyOptionals(t *testing.T) { cfg := newTestConfig(t, "") @@ -243,7 +302,6 @@ func TestListAgents_Good_Empty(t *testing.T) { func TestRoundTrip_SaveThenLoad(t *testing.T) { cfg := newTestConfig(t, "") - // Save two agents. err := SaveAgent(cfg, "alpha", AgentConfig{ Host: "claude@alpha", QueueDir: "/home/claude/work/queue", @@ -262,11 +320,10 @@ func TestRoundTrip_SaveThenLoad(t *testing.T) { }) require.NoError(t, err) - // Load as AgentTargets (what the dispatch handler uses). 
- targets, err := LoadAgents(cfg) + agents, err := LoadActiveAgents(cfg) require.NoError(t, err) - assert.Len(t, targets, 2) - assert.Equal(t, "claude@alpha", targets["alpha"].Host) - assert.Equal(t, "opus", targets["alpha"].Model) - assert.Equal(t, "codex", targets["beta"].Runner) + assert.Len(t, agents, 2) + assert.Equal(t, "claude@alpha", agents["alpha"].Host) + assert.Equal(t, "opus", agents["alpha"].Model) + assert.Equal(t, "codex", agents["beta"].Runner) } diff --git a/pkg/agentci/security.go b/pkg/agentci/security.go new file mode 100644 index 00000000..f917b3f2 --- /dev/null +++ b/pkg/agentci/security.go @@ -0,0 +1,49 @@ +package agentci + +import ( + "fmt" + "os/exec" + "path/filepath" + "regexp" + "strings" +) + +var safeNameRegex = regexp.MustCompile(`^[a-zA-Z0-9\-\_\.]+$`) + +// SanitizePath ensures a filename or directory name is safe and prevents path traversal. +// Returns filepath.Base of the input after validation. +func SanitizePath(input string) (string, error) { + base := filepath.Base(input) + if !safeNameRegex.MatchString(base) { + return "", fmt.Errorf("invalid characters in path element: %s", input) + } + if base == "." || base == ".." || base == "/" { + return "", fmt.Errorf("invalid path element: %s", base) + } + return base, nil +} + +// EscapeShellArg wraps a string in single quotes for safe remote shell insertion. +// Prefer exec.Command arguments over constructing shell strings where possible. +func EscapeShellArg(arg string) string { + return "'" + strings.ReplaceAll(arg, "'", "'\\''") + "'" +} + +// SecureSSHCommand creates an SSH exec.Cmd with strict host key checking and batch mode. +func SecureSSHCommand(host string, remoteCmd string) *exec.Cmd { + return exec.Command("ssh", + "-o", "StrictHostKeyChecking=yes", + "-o", "BatchMode=yes", + "-o", "ConnectTimeout=10", + host, + remoteCmd, + ) +} + +// MaskToken returns a masked version of a token for safe logging. 
+func MaskToken(token string) string { + if len(token) < 12 { + return "*****" + } + return token[:4] + "****" + token[len(token)-4:] +} diff --git a/pkg/forge/issues.go b/pkg/forge/issues.go index 1321f2d4..55fa2e3f 100644 --- a/pkg/forge/issues.go +++ b/pkg/forge/issues.go @@ -75,6 +75,17 @@ func (c *Client) EditIssue(owner, repo string, number int64, opts forgejo.EditIs return issue, nil } +// AssignIssue assigns an issue to the specified users. +func (c *Client) AssignIssue(owner, repo string, number int64, assignees []string) error { + _, _, err := c.api.EditIssue(owner, repo, number, forgejo.EditIssueOption{ + Assignees: assignees, + }) + if err != nil { + return log.E("forge.AssignIssue", "failed to assign issue", err) + } + return nil +} + // ListPullRequests returns pull requests for the given repository. func (c *Client) ListPullRequests(owner, repo string, state string) ([]*forgejo.PullRequest, error) { st := forgejo.StateOpen diff --git a/pkg/forge/labels.go b/pkg/forge/labels.go index 89f2de28..d97fb62f 100644 --- a/pkg/forge/labels.go +++ b/pkg/forge/labels.go @@ -1,6 +1,9 @@ package forge import ( + "fmt" + "strings" + forgejo "codeberg.org/mvdkleijn/forgejo-sdk/forgejo/v2" "github.com/host-uk/core/pkg/log" @@ -58,3 +61,52 @@ func (c *Client) CreateRepoLabel(owner, repo string, opts forgejo.CreateLabelOpt return label, nil } + +// GetLabelByName retrieves a specific label by name from a repository. +func (c *Client) GetLabelByName(owner, repo, name string) (*forgejo.Label, error) { + labels, err := c.ListRepoLabels(owner, repo) + if err != nil { + return nil, err + } + + for _, l := range labels { + if strings.EqualFold(l.Name, name) { + return l, nil + } + } + + return nil, fmt.Errorf("label %s not found in %s/%s", name, owner, repo) +} + +// EnsureLabel checks if a label exists, and creates it if it doesn't. 
+func (c *Client) EnsureLabel(owner, repo, name, color string) (*forgejo.Label, error) { + label, err := c.GetLabelByName(owner, repo, name) + if err == nil { + return label, nil + } + + return c.CreateRepoLabel(owner, repo, forgejo.CreateLabelOption{ + Name: name, + Color: color, + }) +} + +// AddIssueLabels adds labels to an issue. +func (c *Client) AddIssueLabels(owner, repo string, number int64, labelIDs []int64) error { + _, _, err := c.api.AddIssueLabels(owner, repo, number, forgejo.IssueLabelsOption{ + Labels: labelIDs, + }) + if err != nil { + return log.E("forge.AddIssueLabels", "failed to add labels to issue", err) + } + return nil +} + +// RemoveIssueLabel removes a label from an issue. +func (c *Client) RemoveIssueLabel(owner, repo string, number int64, labelID int64) error { + _, err := c.api.DeleteIssueLabel(owner, repo, number, labelID) + if err != nil { + return log.E("forge.RemoveIssueLabel", "failed to remove label from issue", err) + } + return nil +} diff --git a/pkg/jobrunner/handlers/dispatch.go b/pkg/jobrunner/handlers/dispatch.go index 886f96c2..de2ae7d2 100644 --- a/pkg/jobrunner/handlers/dispatch.go +++ b/pkg/jobrunner/handlers/dispatch.go @@ -1,28 +1,31 @@ package handlers import ( + "bytes" "context" "encoding/json" "fmt" - "os/exec" "path/filepath" - "strings" "time" + "github.com/host-uk/core/pkg/agentci" "github.com/host-uk/core/pkg/forge" "github.com/host-uk/core/pkg/jobrunner" "github.com/host-uk/core/pkg/log" ) -// AgentTarget maps a Forgejo username to an SSH-reachable agent machine. 
-type AgentTarget struct { - Host string // SSH destination (e.g., "claude@192.168.0.201") - QueueDir string // Remote queue directory (e.g., "~/ai-work/queue") - Model string // AI model: sonnet, haiku, opus (default: sonnet) - Runner string // Runner binary: claude, codex (default: claude) -} +const ( + LabelAgentReady = "agent-ready" + LabelInProgress = "in-progress" + LabelAgentFailed = "agent-failed" + LabelAgentComplete = "agent-completed" + + ColorInProgress = "#1d76db" // Blue + ColorAgentFailed = "#c0392b" // Red +) // DispatchTicket is the JSON payload written to the agent's queue. +// The ForgeToken is transferred separately via a .env file with 0600 permissions. type DispatchTicket struct { ID string `json:"id"` RepoOwner string `json:"repo_owner"` @@ -33,28 +36,29 @@ type DispatchTicket struct { TargetBranch string `json:"target_branch"` EpicNumber int `json:"epic_number"` ForgeURL string `json:"forge_url"` - ForgeToken string `json:"forge_token"` ForgeUser string `json:"forgejo_user"` Model string `json:"model,omitempty"` Runner string `json:"runner,omitempty"` + VerifyModel string `json:"verify_model,omitempty"` + DualRun bool `json:"dual_run"` CreatedAt string `json:"created_at"` } -// DispatchHandler dispatches coding work to remote agent machines via SSH/SCP. +// DispatchHandler dispatches coding work to remote agent machines via SSH. type DispatchHandler struct { forge *forge.Client forgeURL string token string - agents map[string]AgentTarget + spinner *agentci.Spinner } // NewDispatchHandler creates a handler that dispatches tickets to agent machines. 
-func NewDispatchHandler(client *forge.Client, forgeURL, token string, agents map[string]AgentTarget) *DispatchHandler { +func NewDispatchHandler(client *forge.Client, forgeURL, token string, spinner *agentci.Spinner) *DispatchHandler { return &DispatchHandler{ forge: client, forgeURL: forgeURL, token: token, - agents: agents, + spinner: spinner, } } @@ -69,62 +73,108 @@ func (h *DispatchHandler) Match(signal *jobrunner.PipelineSignal) bool { if !signal.NeedsCoding { return false } - _, ok := h.agents[signal.Assignee] + _, ok := h.spinner.Agents[signal.Assignee] return ok } -// Execute creates a ticket JSON and SCPs it to the agent's queue directory. +// Execute creates a ticket JSON and transfers it securely to the agent's queue directory. func (h *DispatchHandler) Execute(ctx context.Context, signal *jobrunner.PipelineSignal) (*jobrunner.ActionResult, error) { start := time.Now() - agent, ok := h.agents[signal.Assignee] + agent, ok := h.spinner.Agents[signal.Assignee] if !ok { - return nil, log.E("dispatch.Execute", fmt.Sprintf("unknown agent: %s", signal.Assignee), nil) + return nil, fmt.Errorf("unknown agent: %s", signal.Assignee) } - // Determine target branch (default to repo default). + // Sanitize inputs to prevent path traversal. + safeOwner, err := agentci.SanitizePath(signal.RepoOwner) + if err != nil { + return nil, fmt.Errorf("invalid repo owner: %w", err) + } + safeRepo, err := agentci.SanitizePath(signal.RepoName) + if err != nil { + return nil, fmt.Errorf("invalid repo name: %w", err) + } + + // Ensure in-progress label exists on repo. + inProgressLabel, err := h.forge.EnsureLabel(safeOwner, safeRepo, LabelInProgress, ColorInProgress) + if err != nil { + return nil, fmt.Errorf("ensure label %s: %w", LabelInProgress, err) + } + + // Check if already in progress to prevent double-dispatch. 
+ issue, err := h.forge.GetIssue(safeOwner, safeRepo, int64(signal.ChildNumber)) + if err == nil { + for _, l := range issue.Labels { + if l.Name == LabelInProgress || l.Name == LabelAgentComplete { + log.Info("issue already processed, skipping", "issue", signal.ChildNumber, "label", l.Name) + return &jobrunner.ActionResult{ + Action: "dispatch", + Success: true, + Timestamp: time.Now(), + Duration: time.Since(start), + }, nil + } + } + } + + // Assign agent and add in-progress label. + if err := h.forge.AssignIssue(safeOwner, safeRepo, int64(signal.ChildNumber), []string{signal.Assignee}); err != nil { + log.Warn("failed to assign agent, continuing", "err", err) + } + + if err := h.forge.AddIssueLabels(safeOwner, safeRepo, int64(signal.ChildNumber), []int64{inProgressLabel.ID}); err != nil { + return nil, fmt.Errorf("add in-progress label: %w", err) + } + + // Remove agent-ready label if present. + if readyLabel, err := h.forge.GetLabelByName(safeOwner, safeRepo, LabelAgentReady); err == nil { + _ = h.forge.RemoveIssueLabel(safeOwner, safeRepo, int64(signal.ChildNumber), readyLabel.ID) + } + + // Clotho planning — determine execution mode. + runMode := h.spinner.DeterminePlan(signal, signal.Assignee) + verifyModel := "" + if runMode == agentci.ModeDual { + verifyModel = h.spinner.GetVerifierModel(signal.Assignee) + } + + // Build ticket. 
targetBranch := "new" // TODO: resolve from epic or repo default + ticketID := fmt.Sprintf("%s-%s-%d-%d", safeOwner, safeRepo, signal.ChildNumber, time.Now().Unix()) ticket := DispatchTicket{ - ID: fmt.Sprintf("%s-%s-%d-%d", signal.RepoOwner, signal.RepoName, signal.ChildNumber, time.Now().Unix()), - RepoOwner: signal.RepoOwner, - RepoName: signal.RepoName, + ID: ticketID, + RepoOwner: safeOwner, + RepoName: safeRepo, IssueNumber: signal.ChildNumber, IssueTitle: signal.IssueTitle, IssueBody: signal.IssueBody, TargetBranch: targetBranch, EpicNumber: signal.EpicNumber, ForgeURL: h.forgeURL, - ForgeToken: h.token, ForgeUser: signal.Assignee, Model: agent.Model, Runner: agent.Runner, + VerifyModel: verifyModel, + DualRun: runMode == agentci.ModeDual, CreatedAt: time.Now().UTC().Format(time.RFC3339), } ticketJSON, err := json.MarshalIndent(ticket, "", " ") if err != nil { - return &jobrunner.ActionResult{ - Action: "dispatch", - RepoOwner: signal.RepoOwner, - RepoName: signal.RepoName, - EpicNumber: signal.EpicNumber, - ChildNumber: signal.ChildNumber, - Success: false, - Error: fmt.Sprintf("marshal ticket: %v", err), - Timestamp: time.Now(), - Duration: time.Since(start), - }, nil + h.failDispatch(signal, "Failed to marshal ticket JSON") + return nil, fmt.Errorf("marshal ticket: %w", err) } // Check if ticket already exists on agent (dedup). 
- ticketName := fmt.Sprintf("ticket-%s-%s-%d.json", signal.RepoOwner, signal.RepoName, signal.ChildNumber) - if h.ticketExists(agent, ticketName) { + ticketName := fmt.Sprintf("ticket-%s-%s-%d.json", safeOwner, safeRepo, signal.ChildNumber) + if h.ticketExists(ctx, agent, ticketName) { log.Info("ticket already queued, skipping", "ticket", ticketName, "agent", signal.Assignee) return &jobrunner.ActionResult{ Action: "dispatch", - RepoOwner: signal.RepoOwner, - RepoName: signal.RepoName, + RepoOwner: safeOwner, + RepoName: safeRepo, EpicNumber: signal.EpicNumber, ChildNumber: signal.ChildNumber, Success: true, @@ -133,30 +183,55 @@ func (h *DispatchHandler) Execute(ctx context.Context, signal *jobrunner.Pipelin }, nil } - // SCP ticket to agent queue. - remotePath := filepath.Join(agent.QueueDir, ticketName) - if err := h.scpTicket(ctx, agent.Host, remotePath, ticketJSON); err != nil { + // Transfer ticket JSON. + remoteTicketPath := filepath.Join(agent.QueueDir, ticketName) + if err := h.secureTransfer(ctx, agent, remoteTicketPath, ticketJSON, 0644); err != nil { + h.failDispatch(signal, fmt.Sprintf("Ticket transfer failed: %v", err)) return &jobrunner.ActionResult{ Action: "dispatch", - RepoOwner: signal.RepoOwner, - RepoName: signal.RepoName, + RepoOwner: safeOwner, + RepoName: safeRepo, EpicNumber: signal.EpicNumber, ChildNumber: signal.ChildNumber, Success: false, - Error: fmt.Sprintf("scp ticket: %v", err), + Error: fmt.Sprintf("transfer ticket: %v", err), + Timestamp: time.Now(), + Duration: time.Since(start), + }, nil + } + + // Transfer token via separate .env file with 0600 permissions. + envContent := fmt.Sprintf("FORGE_TOKEN=%s\n", h.token) + remoteEnvPath := filepath.Join(agent.QueueDir, fmt.Sprintf(".env.%s", ticketID)) + if err := h.secureTransfer(ctx, agent, remoteEnvPath, []byte(envContent), 0600); err != nil { + // Clean up the ticket if env transfer fails. 
+ _ = h.runRemote(ctx, agent, fmt.Sprintf("rm -f %s", agentci.EscapeShellArg(remoteTicketPath))) + h.failDispatch(signal, fmt.Sprintf("Token transfer failed: %v", err)) + return &jobrunner.ActionResult{ + Action: "dispatch", + RepoOwner: safeOwner, + RepoName: safeRepo, + EpicNumber: signal.EpicNumber, + ChildNumber: signal.ChildNumber, + Success: false, + Error: fmt.Sprintf("transfer token: %v", err), Timestamp: time.Now(), Duration: time.Since(start), }, nil } // Comment on issue. - comment := fmt.Sprintf("Dispatched to **%s** agent queue.", signal.Assignee) - _ = h.forge.CreateIssueComment(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), comment) + modeStr := "Standard" + if runMode == agentci.ModeDual { + modeStr = "Clotho Verified (Dual Run)" + } + comment := fmt.Sprintf("Dispatched to **%s** agent queue.\nMode: **%s**", signal.Assignee, modeStr) + _ = h.forge.CreateIssueComment(safeOwner, safeRepo, int64(signal.ChildNumber), comment) return &jobrunner.ActionResult{ Action: "dispatch", - RepoOwner: signal.RepoOwner, - RepoName: signal.RepoName, + RepoOwner: safeOwner, + RepoName: safeRepo, EpicNumber: signal.EpicNumber, ChildNumber: signal.ChildNumber, Success: true, @@ -165,37 +240,51 @@ func (h *DispatchHandler) Execute(ctx context.Context, signal *jobrunner.Pipelin }, nil } -// scpTicket writes ticket data to a remote path via SSH. -// TODO: Replace exec ssh+cat with charmbracelet/ssh for native Go SSH. -func (h *DispatchHandler) scpTicket(ctx context.Context, host, remotePath string, data []byte) error { - // Use ssh + cat instead of scp for piping stdin. - // TODO: Use charmbracelet/keygen for key management, native Go SSH client for transport. 
-	cmd := exec.CommandContext(ctx, "ssh",
-		"-o", "StrictHostKeyChecking=accept-new",
-		"-o", "ConnectTimeout=10",
-		host,
-		fmt.Sprintf("cat > %s", remotePath),
-	)
-	cmd.Stdin = strings.NewReader(string(data))
+// failDispatch handles cleanup when dispatch fails (adds failed label, removes in-progress).
+func (h *DispatchHandler) failDispatch(signal *jobrunner.PipelineSignal, reason string) {
+	if failedLabel, err := h.forge.EnsureLabel(signal.RepoOwner, signal.RepoName, LabelAgentFailed, ColorAgentFailed); err == nil {
+		_ = h.forge.AddIssueLabels(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), []int64{failedLabel.ID})
+	}
+
+	if inProgressLabel, err := h.forge.GetLabelByName(signal.RepoOwner, signal.RepoName, LabelInProgress); err == nil {
+		_ = h.forge.RemoveIssueLabel(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), inProgressLabel.ID)
+	}
+
+	_ = h.forge.CreateIssueComment(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), fmt.Sprintf("Agent dispatch failed: %s", reason))
+}
+
+// secureTransfer writes data to a remote path via SSH stdin, preventing command injection.
+func (h *DispatchHandler) secureTransfer(ctx context.Context, agent agentci.AgentConfig, remotePath string, data []byte, mode int) error {
+	safeRemotePath := agentci.EscapeShellArg(remotePath)
+	remoteCmd := fmt.Sprintf("umask 077 && cat > %s && chmod %o %s", safeRemotePath, mode, safeRemotePath) // umask 077 first: the token file must never be world-readable between creation and chmod
+
+	cmd := agentci.SecureSSHCommand(agent.Host, remoteCmd)
+	cmd.Stdin = bytes.NewReader(data)
 	output, err := cmd.CombinedOutput()
 	if err != nil {
-		return log.E("dispatch.scp", fmt.Sprintf("ssh to %s failed: %s", host, string(output)), err)
+		return log.E("dispatch.transfer", fmt.Sprintf("ssh to %s failed: %s", agent.Host, string(output)), err)
 	}
 	return nil
 }
 
+// runRemote executes a command on the agent via SSH.
+func (h *DispatchHandler) runRemote(ctx context.Context, agent agentci.AgentConfig, cmdStr string) error { + cmd := agentci.SecureSSHCommand(agent.Host, cmdStr) + return cmd.Run() +} + // ticketExists checks if a ticket file already exists in queue, active, or done. -// TODO: Replace exec ssh with native Go SSH client (charmbracelet/ssh). -func (h *DispatchHandler) ticketExists(agent AgentTarget, ticketName string) bool { - cmd := exec.Command("ssh", - "-o", "StrictHostKeyChecking=accept-new", - "-o", "ConnectTimeout=10", - agent.Host, - fmt.Sprintf("test -f %s/%s || test -f %s/../active/%s || test -f %s/../done/%s", - agent.QueueDir, ticketName, - agent.QueueDir, ticketName, - agent.QueueDir, ticketName), +func (h *DispatchHandler) ticketExists(ctx context.Context, agent agentci.AgentConfig, ticketName string) bool { + safeTicket, err := agentci.SanitizePath(ticketName) + if err != nil { + return false + } + qDir := agent.QueueDir + checkCmd := fmt.Sprintf( + "test -f %s/%s || test -f %s/../active/%s || test -f %s/../done/%s", + qDir, safeTicket, qDir, safeTicket, qDir, safeTicket, ) + cmd := agentci.SecureSSHCommand(agent.Host, checkCmd) return cmd.Run() == nil } diff --git a/pkg/jobrunner/handlers/dispatch_test.go b/pkg/jobrunner/handlers/dispatch_test.go index 66e82474..f91f312b 100644 --- a/pkg/jobrunner/handlers/dispatch_test.go +++ b/pkg/jobrunner/handlers/dispatch_test.go @@ -7,17 +7,24 @@ import ( "net/http/httptest" "testing" + "github.com/host-uk/core/pkg/agentci" "github.com/host-uk/core/pkg/jobrunner" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +// newTestSpinner creates a Spinner with the given agents for testing. 
+func newTestSpinner(agents map[string]agentci.AgentConfig) *agentci.Spinner { + return agentci.NewSpinner(agentci.ClothoConfig{Strategy: "direct"}, agents) +} + // --- Match tests --- func TestDispatch_Match_Good_NeedsCoding(t *testing.T) { - h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{ - "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"}, + spinner := newTestSpinner(map[string]agentci.AgentConfig{ + "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue", Active: true}, }) + h := NewDispatchHandler(nil, "", "", spinner) sig := &jobrunner.PipelineSignal{ NeedsCoding: true, Assignee: "darbs-claude", @@ -26,10 +33,11 @@ func TestDispatch_Match_Good_NeedsCoding(t *testing.T) { } func TestDispatch_Match_Good_MultipleAgents(t *testing.T) { - h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{ - "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"}, - "local-codex": {Host: "localhost", QueueDir: "~/ai-work/queue"}, + spinner := newTestSpinner(map[string]agentci.AgentConfig{ + "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue", Active: true}, + "local-codex": {Host: "localhost", QueueDir: "~/ai-work/queue", Active: true}, }) + h := NewDispatchHandler(nil, "", "", spinner) sig := &jobrunner.PipelineSignal{ NeedsCoding: true, Assignee: "local-codex", @@ -38,9 +46,10 @@ func TestDispatch_Match_Good_MultipleAgents(t *testing.T) { } func TestDispatch_Match_Bad_HasPR(t *testing.T) { - h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{ - "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"}, + spinner := newTestSpinner(map[string]agentci.AgentConfig{ + "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue", Active: true}, }) + h := NewDispatchHandler(nil, "", "", spinner) sig := &jobrunner.PipelineSignal{ NeedsCoding: false, PRNumber: 7, @@ -50,9 +59,10 @@ func TestDispatch_Match_Bad_HasPR(t *testing.T) { } func 
TestDispatch_Match_Bad_UnknownAgent(t *testing.T) { - h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{ - "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"}, + spinner := newTestSpinner(map[string]agentci.AgentConfig{ + "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue", Active: true}, }) + h := NewDispatchHandler(nil, "", "", spinner) sig := &jobrunner.PipelineSignal{ NeedsCoding: true, Assignee: "unknown-user", @@ -61,9 +71,10 @@ func TestDispatch_Match_Bad_UnknownAgent(t *testing.T) { } func TestDispatch_Match_Bad_NotAssigned(t *testing.T) { - h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{ - "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"}, + spinner := newTestSpinner(map[string]agentci.AgentConfig{ + "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue", Active: true}, }) + h := NewDispatchHandler(nil, "", "", spinner) sig := &jobrunner.PipelineSignal{ NeedsCoding: true, Assignee: "", @@ -72,7 +83,8 @@ func TestDispatch_Match_Bad_NotAssigned(t *testing.T) { } func TestDispatch_Match_Bad_EmptyAgentMap(t *testing.T) { - h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{}) + spinner := newTestSpinner(map[string]agentci.AgentConfig{}) + h := NewDispatchHandler(nil, "", "", spinner) sig := &jobrunner.PipelineSignal{ NeedsCoding: true, Assignee: "darbs-claude", @@ -83,13 +95,12 @@ func TestDispatch_Match_Bad_EmptyAgentMap(t *testing.T) { // --- Name test --- func TestDispatch_Name_Good(t *testing.T) { - h := NewDispatchHandler(nil, "", "", nil) + spinner := newTestSpinner(nil) + h := NewDispatchHandler(nil, "", "", spinner) assert.Equal(t, "dispatch", h.Name()) } // --- Execute tests --- -// Execute calls SSH/SCP which can't be tested in unit tests without the remote. -// These tests verify the ticket construction and error paths that don't need SSH. 
func TestDispatch_Execute_Bad_UnknownAgent(t *testing.T) { srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -98,9 +109,10 @@ func TestDispatch_Execute_Bad_UnknownAgent(t *testing.T) { defer srv.Close() client := newTestForgeClient(t, srv.URL) - h := NewDispatchHandler(client, srv.URL, "test-token", map[string]AgentTarget{ - "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"}, + spinner := newTestSpinner(map[string]agentci.AgentConfig{ + "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue", Active: true}, }) + h := NewDispatchHandler(client, srv.URL, "test-token", spinner) sig := &jobrunner.PipelineSignal{ NeedsCoding: true, @@ -116,7 +128,6 @@ func TestDispatch_Execute_Bad_UnknownAgent(t *testing.T) { } func TestDispatch_TicketJSON_Good(t *testing.T) { - // Verify DispatchTicket serializes correctly with all fields. ticket := DispatchTicket{ ID: "host-uk-core-5-1234567890", RepoOwner: "host-uk", @@ -127,17 +138,16 @@ func TestDispatch_TicketJSON_Good(t *testing.T) { TargetBranch: "new", EpicNumber: 3, ForgeURL: "https://forge.lthn.ai", - ForgeToken: "test-token-123", ForgeUser: "darbs-claude", Model: "sonnet", Runner: "claude", + DualRun: false, CreatedAt: "2026-02-09T12:00:00Z", } data, err := json.MarshalIndent(ticket, "", " ") require.NoError(t, err) - // Verify JSON field names. var decoded map[string]any err = json.Unmarshal(data, &decoded) require.NoError(t, err) @@ -151,10 +161,34 @@ func TestDispatch_TicketJSON_Good(t *testing.T) { assert.Equal(t, "new", decoded["target_branch"]) assert.Equal(t, float64(3), decoded["epic_number"]) assert.Equal(t, "https://forge.lthn.ai", decoded["forge_url"]) - assert.Equal(t, "test-token-123", decoded["forge_token"]) assert.Equal(t, "darbs-claude", decoded["forgejo_user"]) assert.Equal(t, "sonnet", decoded["model"]) assert.Equal(t, "claude", decoded["runner"]) + // Token should NOT be present in the ticket. 
+ _, hasToken := decoded["forge_token"] + assert.False(t, hasToken, "forge_token must not be in ticket JSON") +} + +func TestDispatch_TicketJSON_Good_DualRun(t *testing.T) { + ticket := DispatchTicket{ + ID: "test-dual", + RepoOwner: "host-uk", + RepoName: "core", + IssueNumber: 1, + ForgeURL: "https://forge.lthn.ai", + Model: "gemini-2.0-flash", + VerifyModel: "gemini-1.5-pro", + DualRun: true, + } + + data, err := json.Marshal(ticket) + require.NoError(t, err) + + var roundtrip DispatchTicket + err = json.Unmarshal(data, &roundtrip) + require.NoError(t, err) + assert.True(t, roundtrip.DualRun) + assert.Equal(t, "gemini-1.5-pro", roundtrip.VerifyModel) } func TestDispatch_TicketJSON_Good_OmitsEmptyModelRunner(t *testing.T) { @@ -165,13 +199,11 @@ func TestDispatch_TicketJSON_Good_OmitsEmptyModelRunner(t *testing.T) { IssueNumber: 1, TargetBranch: "new", ForgeURL: "https://forge.lthn.ai", - ForgeToken: "tok", } data, err := json.MarshalIndent(ticket, "", " ") require.NoError(t, err) - // Model and runner should be omitted when empty (omitempty tag). var decoded map[string]any err = json.Unmarshal(data, &decoded) require.NoError(t, err) @@ -203,7 +235,6 @@ func TestDispatch_TicketJSON_Good_ModelRunnerVariants(t *testing.T) { IssueNumber: 1, TargetBranch: "new", ForgeURL: "https://forge.lthn.ai", - ForgeToken: "tok", Model: tt.model, Runner: tt.runner, } @@ -221,30 +252,53 @@ func TestDispatch_TicketJSON_Good_ModelRunnerVariants(t *testing.T) { } func TestDispatch_Execute_Good_PostsComment(t *testing.T) { - // This test verifies that Execute attempts to post a comment to the issue. - // SSH/SCP will fail (no remote), but we can verify the comment API call - // by checking if the Forgejo API was hit. 
var commentPosted bool var commentBody string srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method == http.MethodPost && r.URL.Path == "/api/v1/repos/host-uk/core/issues/5/comments" { + w.Header().Set("Content-Type", "application/json") + + switch { + case r.Method == http.MethodGet && r.URL.Path == "/api/v1/repos/host-uk/core/labels": + json.NewEncoder(w).Encode([]any{}) + return + + case r.Method == http.MethodPost && r.URL.Path == "/api/v1/repos/host-uk/core/labels": + json.NewEncoder(w).Encode(map[string]any{"id": 1, "name": "in-progress", "color": "#1d76db"}) + return + + case r.Method == http.MethodGet && r.URL.Path == "/api/v1/repos/host-uk/core/issues/5": + json.NewEncoder(w).Encode(map[string]any{"id": 5, "number": 5, "labels": []any{}, "title": "Test"}) + return + + case r.Method == http.MethodPatch && r.URL.Path == "/api/v1/repos/host-uk/core/issues/5": + json.NewEncoder(w).Encode(map[string]any{"id": 5, "number": 5}) + return + + case r.Method == http.MethodPost && r.URL.Path == "/api/v1/repos/host-uk/core/issues/5/labels": + json.NewEncoder(w).Encode([]any{map[string]any{"id": 1, "name": "in-progress"}}) + return + + case r.Method == http.MethodPost && r.URL.Path == "/api/v1/repos/host-uk/core/issues/5/comments": commentPosted = true var body map[string]string _ = json.NewDecoder(r.Body).Decode(&body) commentBody = body["body"] + json.NewEncoder(w).Encode(map[string]any{"id": 1, "body": body["body"]}) + return } + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]any{}) }))) defer srv.Close() client := newTestForgeClient(t, srv.URL) - // Use localhost as agent host — ticketExists and scpTicket will fail - // via SSH but we're testing the flow up to the SCP step. 
- h := NewDispatchHandler(client, srv.URL, "test-token", map[string]AgentTarget{ - "darbs-claude": {Host: "localhost", QueueDir: "/tmp/nonexistent-queue"}, + spinner := newTestSpinner(map[string]agentci.AgentConfig{ + "darbs-claude": {Host: "localhost", QueueDir: "/tmp/nonexistent-queue", Active: true}, }) + h := NewDispatchHandler(client, srv.URL, "test-token", spinner) sig := &jobrunner.PipelineSignal{ NeedsCoding: true, @@ -260,9 +314,6 @@ func TestDispatch_Execute_Good_PostsComment(t *testing.T) { result, err := h.Execute(context.Background(), sig) require.NoError(t, err) - // SSH may fail (no remote), so check for either: - // 1. Success (if SSH happened to work, e.g. localhost) - // 2. SCP error with correct metadata assert.Equal(t, "dispatch", result.Action) assert.Equal(t, "host-uk", result.RepoOwner) assert.Equal(t, "core", result.RepoName) @@ -270,7 +321,6 @@ func TestDispatch_Execute_Good_PostsComment(t *testing.T) { assert.Equal(t, 5, result.ChildNumber) if result.Success { - // If SCP succeeded, comment should have been posted. assert.True(t, commentPosted) assert.Contains(t, commentBody, "darbs-claude") } diff --git a/pkg/jobrunner/types.go b/pkg/jobrunner/types.go index e8d0bd2d..ce51cafe 100644 --- a/pkg/jobrunner/types.go +++ b/pkg/jobrunner/types.go @@ -26,6 +26,10 @@ type PipelineSignal struct { Assignee string // issue assignee username (for dispatch) IssueTitle string // child issue title (for dispatch prompt) IssueBody string // child issue body (for dispatch prompt) + Type string // signal type (e.g., "agent_completion") + Success bool // agent completion success flag + Error string // agent error message + Message string // agent completion message } // RepoFullName returns "owner/repo". diff --git a/scripts/agent-runner.sh b/scripts/agent-runner.sh index 06c99bc7..8f1b3643 100755 --- a/scripts/agent-runner.sh +++ b/scripts/agent-runner.sh @@ -1,5 +1,5 @@ #!/bin/bash -# agent-runner.sh — One-at-a-time queue runner for Claude Code agents. 
+# agent-runner.sh — Clotho-Verified Queue Runner for AgentCI. # Deployed to agent machines, triggered by cron every 5 minutes. # # Usage: */5 * * * * ~/ai-work/agent-runner.sh >> ~/ai-work/logs/runner.log 2>&1 @@ -26,14 +26,7 @@ if [ -f "$LOCK_FILE" ]; then rm -f "$LOCK_FILE" fi -# --- 2. Check credits --- -# Parse remaining usage from claude. If under 5% remaining, skip. -if command -v claude &>/dev/null; then - USAGE_OUTPUT=$(claude --output-format json -p "Reply with just the word OK" 2>/dev/null | head -1 || echo "") - # Fallback: if we can't check, proceed anyway. -fi - -# --- 3. Pick oldest ticket --- +# --- 2. Pick oldest ticket --- TICKET=$(find "$QUEUE_DIR" -name 'ticket-*.json' -type f 2>/dev/null | sort | head -1) if [ -z "$TICKET" ]; then exit 0 # No work @@ -42,19 +35,24 @@ fi TICKET_BASENAME=$(basename "$TICKET") echo "$(date -Iseconds) Processing ticket: $TICKET_BASENAME" -# --- 4. Lock --- +# --- 3. Lock --- echo $$ > "$LOCK_FILE" cleanup() { rm -f "$LOCK_FILE" + # Secure cleanup of env file if it still exists. + if [ -n "${ENV_FILE:-}" ] && [ -f "$ENV_FILE" ]; then + rm -f "$ENV_FILE" + fi echo "$(date -Iseconds) Lock released." } trap cleanup EXIT -# --- 5. Move to active --- +# --- 4. Move to active --- mv "$TICKET" "$ACTIVE_DIR/" TICKET_FILE="$ACTIVE_DIR/$TICKET_BASENAME" -# --- 6. Extract ticket data --- +# --- 5. 
Extract ticket data --- +ID=$(jq -r .id "$TICKET_FILE") REPO_OWNER=$(jq -r .repo_owner "$TICKET_FILE") REPO_NAME=$(jq -r .repo_name "$TICKET_FILE") ISSUE_NUM=$(jq -r .issue_number "$TICKET_FILE") @@ -62,10 +60,30 @@ ISSUE_TITLE=$(jq -r .issue_title "$TICKET_FILE") ISSUE_BODY=$(jq -r .issue_body "$TICKET_FILE") TARGET_BRANCH=$(jq -r .target_branch "$TICKET_FILE") FORGE_URL=$(jq -r .forge_url "$TICKET_FILE") -FORGE_TOKEN=$(jq -r .forge_token "$TICKET_FILE") +DUAL_RUN=$(jq -r '.dual_run // false' "$TICKET_FILE") +MODEL=$(jq -r '.model // "sonnet"' "$TICKET_FILE") +RUNNER=$(jq -r '.runner // "claude"' "$TICKET_FILE") +VERIFY_MODEL=$(jq -r '.verify_model // ""' "$TICKET_FILE") echo "$(date -Iseconds) Issue: ${REPO_OWNER}/${REPO_NAME}#${ISSUE_NUM} - ${ISSUE_TITLE}" +# --- 6. Load secure token from .env file --- +ENV_FILE="$QUEUE_DIR/.env.$ID" +if [ -f "$ENV_FILE" ]; then + source "$ENV_FILE" + rm -f "$ENV_FILE" # Delete immediately after sourcing +else + echo "$(date -Iseconds) ERROR: Token file not found for ticket $ID" + mv "$TICKET_FILE" "$DONE_DIR/" + exit 1 +fi + +if [ -z "${FORGE_TOKEN:-}" ]; then + echo "$(date -Iseconds) ERROR: FORGE_TOKEN missing from env file." + mv "$TICKET_FILE" "$DONE_DIR/" + exit 1 +fi + # --- 7. Clone or update repo --- JOB_DIR="$WORK_DIR/jobs/${REPO_OWNER}-${REPO_NAME}-${ISSUE_NUM}" REPO_DIR="$JOB_DIR/$REPO_NAME" @@ -90,8 +108,11 @@ else cd "$REPO_DIR" fi -# --- 8. Build prompt --- -PROMPT="You are working on issue #${ISSUE_NUM} in ${REPO_OWNER}/${REPO_NAME}. +# --- 8. Agent execution function --- +run_agent() { + local model="$1" + local log_suffix="$2" + local prompt="You are working on issue #${ISSUE_NUM} in ${REPO_OWNER}/${REPO_NAME}. Title: ${ISSUE_TITLE} @@ -102,46 +123,76 @@ The repo is cloned at the current directory on branch '${TARGET_BRANCH}'. Create a feature branch from '${TARGET_BRANCH}', make minimal targeted changes, commit referencing #${ISSUE_NUM}, and push. 
Then create a PR targeting '${TARGET_BRANCH}' using the forgejo MCP tools or git push." -# --- 9. Run AI agent --- -MODEL=$(jq -r '.model // "sonnet"' "$TICKET_FILE") -RUNNER=$(jq -r '.runner // "claude"' "$TICKET_FILE") -LOG_FILE="$LOG_DIR/${REPO_OWNER}-${REPO_NAME}-${ISSUE_NUM}.log" + local log_file="$LOG_DIR/${ID}-${log_suffix}.log" + echo "$(date -Iseconds) Running ${RUNNER} (model: ${model}, suffix: ${log_suffix})..." -echo "$(date -Iseconds) Running ${RUNNER} (model: ${MODEL})..." + case "$RUNNER" in + codex) + codex exec --full-auto "$prompt" > "$log_file" 2>&1 + ;; + gemini) + local model_flag="" + if [ -n "$model" ] && [ "$model" != "sonnet" ]; then + model_flag="-m $model" + fi + echo "$prompt" | gemini -p - -y $model_flag > "$log_file" 2>&1 + ;; + *) + echo "$prompt" | claude -p \ + --model "$model" \ + --dangerously-skip-permissions \ + --output-format text \ + > "$log_file" 2>&1 + ;; + esac + return $? +} -case "$RUNNER" in - codex) - codex exec --full-auto \ - "$PROMPT" \ - > "$LOG_FILE" 2>&1 - ;; - gemini) - MODEL_FLAG="" - if [ -n "$MODEL" ] && [ "$MODEL" != "sonnet" ]; then - MODEL_FLAG="-m $MODEL" - fi - echo "$PROMPT" | gemini -p - -y $MODEL_FLAG \ - > "$LOG_FILE" 2>&1 - ;; - *) - echo "$PROMPT" | claude -p \ - --model "$MODEL" \ - --dangerously-skip-permissions \ - --output-format text \ - > "$LOG_FILE" 2>&1 - ;; -esac -EXIT_CODE=$? -echo "$(date -Iseconds) ${RUNNER} exited with code: $EXIT_CODE" +# --- 9. Execute --- +run_agent "$MODEL" "primary" +EXIT_CODE_A=$? + +FINAL_EXIT=$EXIT_CODE_A +COMMENT="" + +if [ "$DUAL_RUN" = "true" ] && [ -n "$VERIFY_MODEL" ]; then + echo "$(date -Iseconds) Clotho Dual Run: resetting for verifier..." + HASH_A=$(git rev-parse HEAD) + git checkout "$TARGET_BRANCH" 2>/dev/null || true + + run_agent "$VERIFY_MODEL" "verifier" + EXIT_CODE_B=$? + HASH_B=$(git rev-parse HEAD) + + # Compare the two runs. + echo "$(date -Iseconds) Comparing threads..." 
+    # git diff --quiet exits 0 only when the two trees are identical; errors count as divergence.
+
+    if git diff --quiet "$HASH_A" "$HASH_B" 2>/dev/null && [ "$EXIT_CODE_A" -eq 0 ] && [ "$EXIT_CODE_B" -eq 0 ]; then
+        echo "$(date -Iseconds) Clotho Verification: Threads converged."
+        FINAL_EXIT=0
+        git checkout "$HASH_A" 2>/dev/null
+        git push origin "HEAD:refs/heads/feat/issue-${ISSUE_NUM}"
+    else
+        echo "$(date -Iseconds) Clotho Verification: Divergence detected."
+        FINAL_EXIT=1
+        COMMENT=$(printf '**Clotho Verification Failed**\n\nPrimary (%s) and Verifier (%s) produced divergent results.\nPrimary Exit: %s | Verifier Exit: %s' "$MODEL" "$VERIFY_MODEL" "$EXIT_CODE_A" "$EXIT_CODE_B")
+    fi
+else
+    # Standard single run — push if successful.
+    if [ $FINAL_EXIT -eq 0 ]; then
+        git push origin "HEAD:refs/heads/feat/issue-${ISSUE_NUM}" 2>/dev/null || true
+    fi
+fi
 
 # --- 10. Move to done ---
 mv "$TICKET_FILE" "$DONE_DIR/"
 
 # --- 11. Report result back to Forgejo ---
-if [ $EXIT_CODE -eq 0 ]; then
+if [ $FINAL_EXIT -eq 0 ] && [ -z "$COMMENT" ]; then
     COMMENT="Agent completed work on #${ISSUE_NUM}. Exit code: 0."
-else
-    COMMENT="Agent failed on #${ISSUE_NUM} (exit code: ${EXIT_CODE}). Check logs on agent machine."
+elif [ -z "$COMMENT" ]; then
+    COMMENT="Agent failed on #${ISSUE_NUM} (exit code: ${FINAL_EXIT}). Check logs on agent machine."
fi curl -s -X POST "${FORGE_URL}/api/v1/repos/${REPO_OWNER}/${REPO_NAME}/issues/${ISSUE_NUM}/comments" \ @@ -150,4 +201,4 @@ curl -s -X POST "${FORGE_URL}/api/v1/repos/${REPO_OWNER}/${REPO_NAME}/issues/${I -d "$(jq -n --arg body "$COMMENT" '{body: $body}')" \ > /dev/null 2>&1 || true -echo "$(date -Iseconds) Done: $TICKET_BASENAME (exit: $EXIT_CODE)" +echo "$(date -Iseconds) Done: $TICKET_BASENAME (exit: $FINAL_EXIT)" diff --git a/scripts/agent-setup.sh b/scripts/agent-setup.sh index 70ecacc7..a2ec4032 100755 --- a/scripts/agent-setup.sh +++ b/scripts/agent-setup.sh @@ -8,7 +8,7 @@ set -euo pipefail HOST="${1:?Usage: agent-setup.sh }" -SSH_OPTS="-o StrictHostKeyChecking=accept-new -o ConnectTimeout=10" +SSH_OPTS="-o StrictHostKeyChecking=yes -o BatchMode=yes -o ConnectTimeout=10" SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" RUNNER_SCRIPT="${SCRIPT_DIR}/agent-runner.sh"