feat: restore agentci, git, and jobrunner packages

These packages are still imported by the CLI. Reverting the premature
extraction so forge consumers can resolve them.

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-02-21 19:27:35 +00:00
parent 8527302df1
commit 5bfafcd6fc
29 changed files with 4139 additions and 0 deletions

87
agentci/clotho.go Normal file
View file

@ -0,0 +1,87 @@
package agentci
import (
"context"
"strings"
"forge.lthn.ai/core/go-scm/jobrunner"
)
// RunMode determines the execution strategy for a dispatched task.
type RunMode string
const (
ModeStandard RunMode = "standard"
ModeDual RunMode = "dual" // The Clotho Protocol — dual-run verification
)
// Spinner is the Clotho orchestrator that determines the fate of each task.
type Spinner struct {
Config ClothoConfig
Agents map[string]AgentConfig
}
// NewSpinner creates a new Clotho orchestrator.
func NewSpinner(cfg ClothoConfig, agents map[string]AgentConfig) *Spinner {
return &Spinner{
Config: cfg,
Agents: agents,
}
}
// DeterminePlan decides if a signal requires dual-run verification based on
// the global strategy, agent configuration, and repository criticality.
func (s *Spinner) DeterminePlan(signal *jobrunner.PipelineSignal, agentName string) RunMode {
if s.Config.Strategy != "clotho-verified" {
return ModeStandard
}
agent, ok := s.Agents[agentName]
if !ok {
return ModeStandard
}
if agent.DualRun {
return ModeDual
}
// Protect critical repos with dual-run (Axiom 1).
if signal.RepoName == "core" || strings.Contains(signal.RepoName, "security") {
return ModeDual
}
return ModeStandard
}
// GetVerifierModel returns the model for the secondary "signed" verification run.
func (s *Spinner) GetVerifierModel(agentName string) string {
agent, ok := s.Agents[agentName]
if !ok || agent.VerifyModel == "" {
return "gemini-1.5-pro"
}
return agent.VerifyModel
}
// FindByForgejoUser resolves a Forgejo username to the agent config key and config.
// This decouples agent naming (mythological roles) from Forgejo identity.
func (s *Spinner) FindByForgejoUser(forgejoUser string) (string, AgentConfig, bool) {
if forgejoUser == "" {
return "", AgentConfig{}, false
}
// Direct match on config key first.
if agent, ok := s.Agents[forgejoUser]; ok {
return forgejoUser, agent, true
}
// Search by ForgejoUser field.
for name, agent := range s.Agents {
if agent.ForgejoUser != "" && agent.ForgejoUser == forgejoUser {
return name, agent, true
}
}
return "", AgentConfig{}, false
}
// Weave compares primary and verifier outputs. Returns true if they converge.
// This is a placeholder for future semantic diff logic.
func (s *Spinner) Weave(ctx context.Context, primaryOutput, signedOutput []byte) (bool, error) {
return string(primaryOutput) == string(signedOutput), nil
}

144
agentci/config.go Normal file
View file

@ -0,0 +1,144 @@
// Package agentci provides configuration, security, and orchestration for AgentCI dispatch targets.
package agentci
import (
"fmt"
"forge.lthn.ai/core/go/pkg/config"
)
// AgentConfig represents a single agent machine in the config file.
type AgentConfig struct {
Host string `yaml:"host" mapstructure:"host"`
QueueDir string `yaml:"queue_dir" mapstructure:"queue_dir"`
ForgejoUser string `yaml:"forgejo_user" mapstructure:"forgejo_user"`
Model string `yaml:"model" mapstructure:"model"` // primary AI model
Runner string `yaml:"runner" mapstructure:"runner"` // runner binary: claude, codex, gemini
VerifyModel string `yaml:"verify_model" mapstructure:"verify_model"` // secondary model for dual-run
SecurityLevel string `yaml:"security_level" mapstructure:"security_level"` // low, high
Roles []string `yaml:"roles" mapstructure:"roles"`
DualRun bool `yaml:"dual_run" mapstructure:"dual_run"`
Active bool `yaml:"active" mapstructure:"active"`
}
// ClothoConfig controls the orchestration strategy.
type ClothoConfig struct {
Strategy string `yaml:"strategy" mapstructure:"strategy"` // direct, clotho-verified
ValidationThreshold float64 `yaml:"validation_threshold" mapstructure:"validation_threshold"` // divergence limit (0.0-1.0)
SigningKeyPath string `yaml:"signing_key_path" mapstructure:"signing_key_path"`
}
// LoadAgents reads agent targets from config and returns a map of AgentConfig.
// Returns an empty map (not an error) if no agents are configured.
func LoadAgents(cfg *config.Config) (map[string]AgentConfig, error) {
var agents map[string]AgentConfig
if err := cfg.Get("agentci.agents", &agents); err != nil {
return map[string]AgentConfig{}, nil
}
// Validate and apply defaults.
for name, ac := range agents {
if !ac.Active {
continue
}
if ac.Host == "" {
return nil, fmt.Errorf("agent %q: host is required", name)
}
if ac.QueueDir == "" {
ac.QueueDir = "/home/claude/ai-work/queue"
}
if ac.Model == "" {
ac.Model = "sonnet"
}
if ac.Runner == "" {
ac.Runner = "claude"
}
agents[name] = ac
}
return agents, nil
}
// LoadActiveAgents returns only active agents.
func LoadActiveAgents(cfg *config.Config) (map[string]AgentConfig, error) {
all, err := LoadAgents(cfg)
if err != nil {
return nil, err
}
active := make(map[string]AgentConfig)
for name, ac := range all {
if ac.Active {
active[name] = ac
}
}
return active, nil
}
// LoadClothoConfig loads the Clotho orchestrator settings.
// Returns sensible defaults if no config is present.
func LoadClothoConfig(cfg *config.Config) (ClothoConfig, error) {
var cc ClothoConfig
if err := cfg.Get("agentci.clotho", &cc); err != nil {
return ClothoConfig{
Strategy: "direct",
ValidationThreshold: 0.85,
}, nil
}
if cc.Strategy == "" {
cc.Strategy = "direct"
}
if cc.ValidationThreshold == 0 {
cc.ValidationThreshold = 0.85
}
return cc, nil
}
// SaveAgent writes an agent config entry to the config file.
func SaveAgent(cfg *config.Config, name string, ac AgentConfig) error {
key := fmt.Sprintf("agentci.agents.%s", name)
data := map[string]any{
"host": ac.Host,
"queue_dir": ac.QueueDir,
"forgejo_user": ac.ForgejoUser,
"active": ac.Active,
"dual_run": ac.DualRun,
}
if ac.Model != "" {
data["model"] = ac.Model
}
if ac.Runner != "" {
data["runner"] = ac.Runner
}
if ac.VerifyModel != "" {
data["verify_model"] = ac.VerifyModel
}
if ac.SecurityLevel != "" {
data["security_level"] = ac.SecurityLevel
}
if len(ac.Roles) > 0 {
data["roles"] = ac.Roles
}
return cfg.Set(key, data)
}
// RemoveAgent removes an agent from the config file.
func RemoveAgent(cfg *config.Config, name string) error {
var agents map[string]AgentConfig
if err := cfg.Get("agentci.agents", &agents); err != nil {
return fmt.Errorf("no agents configured")
}
if _, ok := agents[name]; !ok {
return fmt.Errorf("agent %q not found", name)
}
delete(agents, name)
return cfg.Set("agentci.agents", agents)
}
// ListAgents returns all configured agents (active and inactive).
func ListAgents(cfg *config.Config) (map[string]AgentConfig, error) {
var agents map[string]AgentConfig
if err := cfg.Get("agentci.agents", &agents); err != nil {
return map[string]AgentConfig{}, nil
}
return agents, nil
}

329
agentci/config_test.go Normal file
View file

@ -0,0 +1,329 @@
package agentci
import (
"testing"
"forge.lthn.ai/core/go/pkg/config"
"forge.lthn.ai/core/go/pkg/io"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newTestConfig(t *testing.T, yaml string) *config.Config {
t.Helper()
m := io.NewMockMedium()
if yaml != "" {
m.Files["/tmp/test/config.yaml"] = yaml
}
cfg, err := config.New(config.WithMedium(m), config.WithPath("/tmp/test/config.yaml"))
require.NoError(t, err)
return cfg
}
func TestLoadAgents_Good(t *testing.T) {
cfg := newTestConfig(t, `
agentci:
agents:
darbs-claude:
host: claude@192.168.0.201
queue_dir: /home/claude/ai-work/queue
forgejo_user: darbs-claude
model: sonnet
runner: claude
active: true
`)
agents, err := LoadAgents(cfg)
require.NoError(t, err)
require.Len(t, agents, 1)
agent := agents["darbs-claude"]
assert.Equal(t, "claude@192.168.0.201", agent.Host)
assert.Equal(t, "/home/claude/ai-work/queue", agent.QueueDir)
assert.Equal(t, "sonnet", agent.Model)
assert.Equal(t, "claude", agent.Runner)
}
func TestLoadAgents_Good_MultipleAgents(t *testing.T) {
cfg := newTestConfig(t, `
agentci:
agents:
darbs-claude:
host: claude@192.168.0.201
queue_dir: /home/claude/ai-work/queue
active: true
local-codex:
host: localhost
queue_dir: /home/claude/ai-work/queue
runner: codex
active: true
`)
agents, err := LoadAgents(cfg)
require.NoError(t, err)
assert.Len(t, agents, 2)
assert.Contains(t, agents, "darbs-claude")
assert.Contains(t, agents, "local-codex")
}
func TestLoadAgents_Good_SkipsInactive(t *testing.T) {
cfg := newTestConfig(t, `
agentci:
agents:
active-agent:
host: claude@10.0.0.1
active: true
offline-agent:
host: claude@10.0.0.2
active: false
`)
agents, err := LoadAgents(cfg)
require.NoError(t, err)
// Both are returned, but only active-agent has defaults applied.
assert.Len(t, agents, 2)
assert.Contains(t, agents, "active-agent")
}
func TestLoadActiveAgents_Good(t *testing.T) {
cfg := newTestConfig(t, `
agentci:
agents:
active-agent:
host: claude@10.0.0.1
active: true
offline-agent:
host: claude@10.0.0.2
active: false
`)
active, err := LoadActiveAgents(cfg)
require.NoError(t, err)
assert.Len(t, active, 1)
assert.Contains(t, active, "active-agent")
}
func TestLoadAgents_Good_Defaults(t *testing.T) {
cfg := newTestConfig(t, `
agentci:
agents:
minimal:
host: claude@10.0.0.1
active: true
`)
agents, err := LoadAgents(cfg)
require.NoError(t, err)
require.Len(t, agents, 1)
agent := agents["minimal"]
assert.Equal(t, "/home/claude/ai-work/queue", agent.QueueDir)
assert.Equal(t, "sonnet", agent.Model)
assert.Equal(t, "claude", agent.Runner)
}
func TestLoadAgents_Good_NoConfig(t *testing.T) {
cfg := newTestConfig(t, "")
agents, err := LoadAgents(cfg)
require.NoError(t, err)
assert.Empty(t, agents)
}
func TestLoadAgents_Bad_MissingHost(t *testing.T) {
cfg := newTestConfig(t, `
agentci:
agents:
broken:
queue_dir: /tmp
active: true
`)
_, err := LoadAgents(cfg)
assert.Error(t, err)
assert.Contains(t, err.Error(), "host is required")
}
func TestLoadAgents_Good_WithDualRun(t *testing.T) {
cfg := newTestConfig(t, `
agentci:
agents:
gemini-agent:
host: localhost
runner: gemini
model: gemini-2.0-flash
verify_model: gemini-1.5-pro
dual_run: true
active: true
`)
agents, err := LoadAgents(cfg)
require.NoError(t, err)
agent := agents["gemini-agent"]
assert.Equal(t, "gemini", agent.Runner)
assert.Equal(t, "gemini-2.0-flash", agent.Model)
assert.Equal(t, "gemini-1.5-pro", agent.VerifyModel)
assert.True(t, agent.DualRun)
}
func TestLoadClothoConfig_Good(t *testing.T) {
cfg := newTestConfig(t, `
agentci:
clotho:
strategy: clotho-verified
validation_threshold: 0.9
signing_key_path: /etc/core/keys/clotho.pub
`)
cc, err := LoadClothoConfig(cfg)
require.NoError(t, err)
assert.Equal(t, "clotho-verified", cc.Strategy)
assert.Equal(t, 0.9, cc.ValidationThreshold)
assert.Equal(t, "/etc/core/keys/clotho.pub", cc.SigningKeyPath)
}
func TestLoadClothoConfig_Good_Defaults(t *testing.T) {
cfg := newTestConfig(t, "")
cc, err := LoadClothoConfig(cfg)
require.NoError(t, err)
assert.Equal(t, "direct", cc.Strategy)
assert.Equal(t, 0.85, cc.ValidationThreshold)
}
func TestSaveAgent_Good(t *testing.T) {
cfg := newTestConfig(t, "")
err := SaveAgent(cfg, "new-agent", AgentConfig{
Host: "claude@10.0.0.5",
QueueDir: "/home/claude/ai-work/queue",
ForgejoUser: "new-agent",
Model: "haiku",
Runner: "claude",
Active: true,
})
require.NoError(t, err)
agents, err := ListAgents(cfg)
require.NoError(t, err)
require.Contains(t, agents, "new-agent")
assert.Equal(t, "claude@10.0.0.5", agents["new-agent"].Host)
assert.Equal(t, "haiku", agents["new-agent"].Model)
}
func TestSaveAgent_Good_WithDualRun(t *testing.T) {
cfg := newTestConfig(t, "")
err := SaveAgent(cfg, "verified-agent", AgentConfig{
Host: "claude@10.0.0.5",
Model: "gemini-2.0-flash",
VerifyModel: "gemini-1.5-pro",
DualRun: true,
Active: true,
})
require.NoError(t, err)
agents, err := ListAgents(cfg)
require.NoError(t, err)
require.Contains(t, agents, "verified-agent")
assert.True(t, agents["verified-agent"].DualRun)
}
func TestSaveAgent_Good_OmitsEmptyOptionals(t *testing.T) {
cfg := newTestConfig(t, "")
err := SaveAgent(cfg, "minimal", AgentConfig{
Host: "claude@10.0.0.1",
Active: true,
})
require.NoError(t, err)
agents, err := ListAgents(cfg)
require.NoError(t, err)
assert.Contains(t, agents, "minimal")
}
func TestRemoveAgent_Good(t *testing.T) {
cfg := newTestConfig(t, `
agentci:
agents:
to-remove:
host: claude@10.0.0.1
active: true
to-keep:
host: claude@10.0.0.2
active: true
`)
err := RemoveAgent(cfg, "to-remove")
require.NoError(t, err)
agents, err := ListAgents(cfg)
require.NoError(t, err)
assert.NotContains(t, agents, "to-remove")
assert.Contains(t, agents, "to-keep")
}
func TestRemoveAgent_Bad_NotFound(t *testing.T) {
cfg := newTestConfig(t, `
agentci:
agents:
existing:
host: claude@10.0.0.1
active: true
`)
err := RemoveAgent(cfg, "nonexistent")
assert.Error(t, err)
assert.Contains(t, err.Error(), "not found")
}
func TestRemoveAgent_Bad_NoAgents(t *testing.T) {
cfg := newTestConfig(t, "")
err := RemoveAgent(cfg, "anything")
assert.Error(t, err)
assert.Contains(t, err.Error(), "no agents configured")
}
func TestListAgents_Good(t *testing.T) {
cfg := newTestConfig(t, `
agentci:
agents:
agent-a:
host: claude@10.0.0.1
active: true
agent-b:
host: claude@10.0.0.2
active: false
`)
agents, err := ListAgents(cfg)
require.NoError(t, err)
assert.Len(t, agents, 2)
assert.True(t, agents["agent-a"].Active)
assert.False(t, agents["agent-b"].Active)
}
func TestListAgents_Good_Empty(t *testing.T) {
cfg := newTestConfig(t, "")
agents, err := ListAgents(cfg)
require.NoError(t, err)
assert.Empty(t, agents)
}
func TestRoundTrip_SaveThenLoad(t *testing.T) {
cfg := newTestConfig(t, "")
err := SaveAgent(cfg, "alpha", AgentConfig{
Host: "claude@alpha",
QueueDir: "/home/claude/work/queue",
ForgejoUser: "alpha-bot",
Model: "opus",
Runner: "claude",
Active: true,
})
require.NoError(t, err)
err = SaveAgent(cfg, "beta", AgentConfig{
Host: "claude@beta",
ForgejoUser: "beta-bot",
Runner: "codex",
Active: true,
})
require.NoError(t, err)
agents, err := LoadActiveAgents(cfg)
require.NoError(t, err)
assert.Len(t, agents, 2)
assert.Equal(t, "claude@alpha", agents["alpha"].Host)
assert.Equal(t, "opus", agents["alpha"].Model)
assert.Equal(t, "codex", agents["beta"].Runner)
}

49
agentci/security.go Normal file
View file

@ -0,0 +1,49 @@
package agentci
import (
"fmt"
"os/exec"
"path/filepath"
"regexp"
"strings"
)
var safeNameRegex = regexp.MustCompile(`^[a-zA-Z0-9\-\_\.]+$`)
// SanitizePath ensures a filename or directory name is safe and prevents path traversal.
// Returns filepath.Base of the input after validation.
func SanitizePath(input string) (string, error) {
base := filepath.Base(input)
if !safeNameRegex.MatchString(base) {
return "", fmt.Errorf("invalid characters in path element: %s", input)
}
if base == "." || base == ".." || base == "/" {
return "", fmt.Errorf("invalid path element: %s", base)
}
return base, nil
}
// EscapeShellArg wraps a string in single quotes for safe remote shell insertion.
// Prefer exec.Command arguments over constructing shell strings where possible.
func EscapeShellArg(arg string) string {
return "'" + strings.ReplaceAll(arg, "'", "'\\''") + "'"
}
// SecureSSHCommand creates an SSH exec.Cmd with strict host key checking and batch mode.
func SecureSSHCommand(host string, remoteCmd string) *exec.Cmd {
return exec.Command("ssh",
"-o", "StrictHostKeyChecking=yes",
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=10",
host,
remoteCmd,
)
}
// MaskToken returns a masked version of a token for safe logging.
func MaskToken(token string) string {
if len(token) < 8 {
return "*****"
}
return token[:4] + "****" + token[len(token)-4:]
}

265
git/git.go Normal file
View file

@ -0,0 +1,265 @@
// Package git provides utilities for git operations across multiple repositories.
package git
import (
"bytes"
"context"
"io"
"os"
"os/exec"
"strconv"
"strings"
"sync"
)
// RepoStatus represents the git status of a single repository.
type RepoStatus struct {
Name string
Path string
Modified int
Untracked int
Staged int
Ahead int
Behind int
Branch string
Error error
}
// IsDirty returns true if there are uncommitted changes.
func (s *RepoStatus) IsDirty() bool {
return s.Modified > 0 || s.Untracked > 0 || s.Staged > 0
}
// HasUnpushed returns true if there are commits to push.
func (s *RepoStatus) HasUnpushed() bool {
return s.Ahead > 0
}
// HasUnpulled returns true if there are commits to pull.
func (s *RepoStatus) HasUnpulled() bool {
return s.Behind > 0
}
// StatusOptions configures the status check.
type StatusOptions struct {
// Paths is a list of repo paths to check
Paths []string
// Names maps paths to display names
Names map[string]string
}
// Status checks git status for multiple repositories in parallel.
func Status(ctx context.Context, opts StatusOptions) []RepoStatus {
var wg sync.WaitGroup
results := make([]RepoStatus, len(opts.Paths))
for i, path := range opts.Paths {
wg.Add(1)
go func(idx int, repoPath string) {
defer wg.Done()
name := opts.Names[repoPath]
if name == "" {
name = repoPath
}
results[idx] = getStatus(ctx, repoPath, name)
}(i, path)
}
wg.Wait()
return results
}
// getStatus gets the git status for a single repository.
func getStatus(ctx context.Context, path, name string) RepoStatus {
status := RepoStatus{
Name: name,
Path: path,
}
// Get current branch
branch, err := gitCommand(ctx, path, "rev-parse", "--abbrev-ref", "HEAD")
if err != nil {
status.Error = err
return status
}
status.Branch = strings.TrimSpace(branch)
// Get porcelain status
porcelain, err := gitCommand(ctx, path, "status", "--porcelain")
if err != nil {
status.Error = err
return status
}
// Parse status output
for _, line := range strings.Split(porcelain, "\n") {
if len(line) < 2 {
continue
}
x, y := line[0], line[1]
// Untracked
if x == '?' && y == '?' {
status.Untracked++
continue
}
// Staged (index has changes)
if x == 'A' || x == 'D' || x == 'R' || x == 'M' {
status.Staged++
}
// Modified in working tree
if y == 'M' || y == 'D' {
status.Modified++
}
}
// Get ahead/behind counts
ahead, behind := getAheadBehind(ctx, path)
status.Ahead = ahead
status.Behind = behind
return status
}
// getAheadBehind returns the number of commits ahead and behind upstream.
func getAheadBehind(ctx context.Context, path string) (ahead, behind int) {
// Try to get ahead count
aheadStr, err := gitCommand(ctx, path, "rev-list", "--count", "@{u}..HEAD")
if err == nil {
ahead, _ = strconv.Atoi(strings.TrimSpace(aheadStr))
}
// Try to get behind count
behindStr, err := gitCommand(ctx, path, "rev-list", "--count", "HEAD..@{u}")
if err == nil {
behind, _ = strconv.Atoi(strings.TrimSpace(behindStr))
}
return ahead, behind
}
// Push pushes commits for a single repository.
// Uses interactive mode to support SSH passphrase prompts.
func Push(ctx context.Context, path string) error {
return gitInteractive(ctx, path, "push")
}
// Pull pulls changes for a single repository.
// Uses interactive mode to support SSH passphrase prompts.
func Pull(ctx context.Context, path string) error {
return gitInteractive(ctx, path, "pull", "--rebase")
}
// IsNonFastForward checks if an error is a non-fast-forward rejection.
func IsNonFastForward(err error) bool {
if err == nil {
return false
}
msg := err.Error()
return strings.Contains(msg, "non-fast-forward") ||
strings.Contains(msg, "fetch first") ||
strings.Contains(msg, "tip of your current branch is behind")
}
// gitInteractive runs a git command with terminal attached for user interaction.
func gitInteractive(ctx context.Context, dir string, args ...string) error {
cmd := exec.CommandContext(ctx, "git", args...)
cmd.Dir = dir
// Connect to terminal for SSH passphrase prompts
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
// Capture stderr for error reporting while also showing it
var stderr bytes.Buffer
cmd.Stderr = io.MultiWriter(os.Stderr, &stderr)
if err := cmd.Run(); err != nil {
if stderr.Len() > 0 {
return &GitError{Err: err, Stderr: stderr.String()}
}
return err
}
return nil
}
// PushResult represents the result of a push operation.
type PushResult struct {
Name string
Path string
Success bool
Error error
}
// PushMultiple pushes multiple repositories sequentially.
// Sequential because SSH passphrase prompts need user interaction.
func PushMultiple(ctx context.Context, paths []string, names map[string]string) []PushResult {
results := make([]PushResult, len(paths))
for i, path := range paths {
name := names[path]
if name == "" {
name = path
}
result := PushResult{
Name: name,
Path: path,
}
err := Push(ctx, path)
if err != nil {
result.Error = err
} else {
result.Success = true
}
results[i] = result
}
return results
}
// gitCommand runs a git command and returns stdout.
func gitCommand(ctx context.Context, dir string, args ...string) (string, error) {
cmd := exec.CommandContext(ctx, "git", args...)
cmd.Dir = dir
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
// Include stderr in error message for better diagnostics
if stderr.Len() > 0 {
return "", &GitError{Err: err, Stderr: stderr.String()}
}
return "", err
}
return stdout.String(), nil
}
// GitError wraps a git command error with stderr output.
type GitError struct {
Err error
Stderr string
}
// Error returns the git error message, preferring stderr output.
func (e *GitError) Error() string {
// Return just the stderr message, trimmed
msg := strings.TrimSpace(e.Stderr)
if msg != "" {
return msg
}
return e.Err.Error()
}
// Unwrap returns the underlying error for error chain inspection.
func (e *GitError) Unwrap() error {
return e.Err
}

126
git/service.go Normal file
View file

@ -0,0 +1,126 @@
package git
import (
"context"
"forge.lthn.ai/core/go/pkg/framework"
)
// Queries for git service
// QueryStatus requests git status for paths.
type QueryStatus struct {
Paths []string
Names map[string]string
}
// QueryDirtyRepos requests repos with uncommitted changes.
type QueryDirtyRepos struct{}
// QueryAheadRepos requests repos with unpushed commits.
type QueryAheadRepos struct{}
// Tasks for git service
// TaskPush requests git push for a path.
type TaskPush struct {
Path string
Name string
}
// TaskPull requests git pull for a path.
type TaskPull struct {
Path string
Name string
}
// TaskPushMultiple requests git push for multiple paths.
type TaskPushMultiple struct {
Paths []string
Names map[string]string
}
// ServiceOptions for configuring the git service.
type ServiceOptions struct {
WorkDir string
}
// Service provides git operations as a Core service.
type Service struct {
*framework.ServiceRuntime[ServiceOptions]
lastStatus []RepoStatus
}
// NewService creates a git service factory.
func NewService(opts ServiceOptions) func(*framework.Core) (any, error) {
return func(c *framework.Core) (any, error) {
return &Service{
ServiceRuntime: framework.NewServiceRuntime(c, opts),
}, nil
}
}
// OnStartup registers query and task handlers.
func (s *Service) OnStartup(ctx context.Context) error {
s.Core().RegisterQuery(s.handleQuery)
s.Core().RegisterTask(s.handleTask)
return nil
}
func (s *Service) handleQuery(c *framework.Core, q framework.Query) (any, bool, error) {
switch m := q.(type) {
case QueryStatus:
statuses := Status(context.Background(), StatusOptions(m))
s.lastStatus = statuses
return statuses, true, nil
case QueryDirtyRepos:
return s.DirtyRepos(), true, nil
case QueryAheadRepos:
return s.AheadRepos(), true, nil
}
return nil, false, nil
}
func (s *Service) handleTask(c *framework.Core, t framework.Task) (any, bool, error) {
switch m := t.(type) {
case TaskPush:
err := Push(context.Background(), m.Path)
return nil, true, err
case TaskPull:
err := Pull(context.Background(), m.Path)
return nil, true, err
case TaskPushMultiple:
results := PushMultiple(context.Background(), m.Paths, m.Names)
return results, true, nil
}
return nil, false, nil
}
// Status returns last status result.
func (s *Service) Status() []RepoStatus { return s.lastStatus }
// DirtyRepos returns repos with uncommitted changes.
func (s *Service) DirtyRepos() []RepoStatus {
var dirty []RepoStatus
for _, st := range s.lastStatus {
if st.Error == nil && st.IsDirty() {
dirty = append(dirty, st)
}
}
return dirty
}
// AheadRepos returns repos with unpushed commits.
func (s *Service) AheadRepos() []RepoStatus {
var ahead []RepoStatus
for _, st := range s.lastStatus {
if st.Error == nil && st.HasUnpushed() {
ahead = append(ahead, st)
}
}
return ahead
}

View file

@ -0,0 +1,114 @@
package forgejo
import (
"regexp"
"strconv"
forgejosdk "codeberg.org/mvdkleijn/forgejo-sdk/forgejo/v2"
"forge.lthn.ai/core/go-scm/jobrunner"
)
// epicChildRe matches checklist items: - [ ] #42 or - [x] #42
var epicChildRe = regexp.MustCompile(`- \[([ x])\] #(\d+)`)
// parseEpicChildren extracts child issue numbers from an epic body's checklist.
func parseEpicChildren(body string) (unchecked []int, checked []int) {
matches := epicChildRe.FindAllStringSubmatch(body, -1)
for _, m := range matches {
num, err := strconv.Atoi(m[2])
if err != nil {
continue
}
if m[1] == "x" {
checked = append(checked, num)
} else {
unchecked = append(unchecked, num)
}
}
return unchecked, checked
}
// linkedPRRe matches "#N" references in PR bodies.
var linkedPRRe = regexp.MustCompile(`#(\d+)`)
// findLinkedPR finds the first PR whose body references the given issue number.
func findLinkedPR(prs []*forgejosdk.PullRequest, issueNumber int) *forgejosdk.PullRequest {
target := strconv.Itoa(issueNumber)
for _, pr := range prs {
matches := linkedPRRe.FindAllStringSubmatch(pr.Body, -1)
for _, m := range matches {
if m[1] == target {
return pr
}
}
}
return nil
}
// mapPRState maps Forgejo's PR state and merged flag to a canonical string.
func mapPRState(pr *forgejosdk.PullRequest) string {
if pr.HasMerged {
return "MERGED"
}
switch pr.State {
case forgejosdk.StateOpen:
return "OPEN"
case forgejosdk.StateClosed:
return "CLOSED"
default:
return "CLOSED"
}
}
// mapMergeable maps Forgejo's boolean Mergeable field to a canonical string.
func mapMergeable(pr *forgejosdk.PullRequest) string {
if pr.HasMerged {
return "UNKNOWN"
}
if pr.Mergeable {
return "MERGEABLE"
}
return "CONFLICTING"
}
// mapCombinedStatus maps a Forgejo CombinedStatus to SUCCESS/FAILURE/PENDING.
func mapCombinedStatus(cs *forgejosdk.CombinedStatus) string {
if cs == nil || cs.TotalCount == 0 {
return "PENDING"
}
switch cs.State {
case forgejosdk.StatusSuccess:
return "SUCCESS"
case forgejosdk.StatusFailure, forgejosdk.StatusError:
return "FAILURE"
default:
return "PENDING"
}
}
// buildSignal creates a PipelineSignal from Forgejo API data.
func buildSignal(
owner, repo string,
epicNumber, childNumber int,
pr *forgejosdk.PullRequest,
checkStatus string,
) *jobrunner.PipelineSignal {
sig := &jobrunner.PipelineSignal{
EpicNumber: epicNumber,
ChildNumber: childNumber,
PRNumber: int(pr.Index),
RepoOwner: owner,
RepoName: repo,
PRState: mapPRState(pr),
IsDraft: false, // SDK v2.2.0 doesn't expose Draft; treat as non-draft
Mergeable: mapMergeable(pr),
CheckStatus: checkStatus,
}
if pr.Head != nil {
sig.LastCommitSHA = pr.Head.Sha
}
return sig
}

173
jobrunner/forgejo/source.go Normal file
View file

@ -0,0 +1,173 @@
package forgejo
import (
"context"
"fmt"
"strings"
"forge.lthn.ai/core/go-scm/forge"
"forge.lthn.ai/core/go-scm/jobrunner"
"forge.lthn.ai/core/go/pkg/log"
)
// Config configures a ForgejoSource.
type Config struct {
Repos []string // "owner/repo" format
}
// ForgejoSource polls a Forgejo instance for pipeline signals from epic issues.
type ForgejoSource struct {
repos []string
forge *forge.Client
}
// New creates a ForgejoSource using the given forge client.
func New(cfg Config, client *forge.Client) *ForgejoSource {
return &ForgejoSource{
repos: cfg.Repos,
forge: client,
}
}
// Name returns the source identifier.
func (s *ForgejoSource) Name() string {
return "forgejo"
}
// Poll fetches epics and their linked PRs from all configured repositories,
// returning a PipelineSignal for each unchecked child that has a linked PR.
func (s *ForgejoSource) Poll(ctx context.Context) ([]*jobrunner.PipelineSignal, error) {
var signals []*jobrunner.PipelineSignal
for _, repoFull := range s.repos {
owner, repo, err := splitRepo(repoFull)
if err != nil {
log.Error("invalid repo format", "repo", repoFull, "err", err)
continue
}
repoSignals, err := s.pollRepo(ctx, owner, repo)
if err != nil {
log.Error("poll repo failed", "repo", repoFull, "err", err)
continue
}
signals = append(signals, repoSignals...)
}
return signals, nil
}
// Report posts the action result as a comment on the epic issue.
func (s *ForgejoSource) Report(ctx context.Context, result *jobrunner.ActionResult) error {
if result == nil {
return nil
}
status := "succeeded"
if !result.Success {
status = "failed"
}
body := fmt.Sprintf("**jobrunner** `%s` %s for #%d (PR #%d)", result.Action, status, result.ChildNumber, result.PRNumber)
if result.Error != "" {
body += fmt.Sprintf("\n\n```\n%s\n```", result.Error)
}
return s.forge.CreateIssueComment(result.RepoOwner, result.RepoName, int64(result.EpicNumber), body)
}
// pollRepo fetches epics and PRs for a single repository.
func (s *ForgejoSource) pollRepo(_ context.Context, owner, repo string) ([]*jobrunner.PipelineSignal, error) {
// Fetch epic issues (label=epic, state=open).
issues, err := s.forge.ListIssues(owner, repo, forge.ListIssuesOpts{State: "open"})
if err != nil {
return nil, log.E("forgejo.pollRepo", "fetch issues", err)
}
// Filter to epics only.
var epics []epicInfo
for _, issue := range issues {
for _, label := range issue.Labels {
if label.Name == "epic" {
epics = append(epics, epicInfo{
Number: int(issue.Index),
Body: issue.Body,
})
break
}
}
}
if len(epics) == 0 {
return nil, nil
}
// Fetch all open PRs (and also merged/closed to catch MERGED state).
prs, err := s.forge.ListPullRequests(owner, repo, "all")
if err != nil {
return nil, log.E("forgejo.pollRepo", "fetch PRs", err)
}
var signals []*jobrunner.PipelineSignal
for _, epic := range epics {
unchecked, _ := parseEpicChildren(epic.Body)
for _, childNum := range unchecked {
pr := findLinkedPR(prs, childNum)
if pr == nil {
// No PR yet — check if the child issue is assigned (needs coding).
childIssue, err := s.forge.GetIssue(owner, repo, int64(childNum))
if err != nil {
log.Error("fetch child issue failed", "repo", owner+"/"+repo, "issue", childNum, "err", err)
continue
}
if len(childIssue.Assignees) > 0 && childIssue.Assignees[0].UserName != "" {
sig := &jobrunner.PipelineSignal{
EpicNumber: epic.Number,
ChildNumber: childNum,
RepoOwner: owner,
RepoName: repo,
NeedsCoding: true,
Assignee: childIssue.Assignees[0].UserName,
IssueTitle: childIssue.Title,
IssueBody: childIssue.Body,
}
signals = append(signals, sig)
}
continue
}
// Get combined commit status for the PR's head SHA.
checkStatus := "PENDING"
if pr.Head != nil && pr.Head.Sha != "" {
cs, err := s.forge.GetCombinedStatus(owner, repo, pr.Head.Sha)
if err != nil {
log.Error("fetch combined status failed", "repo", owner+"/"+repo, "sha", pr.Head.Sha, "err", err)
} else {
checkStatus = mapCombinedStatus(cs)
}
}
sig := buildSignal(owner, repo, epic.Number, childNum, pr, checkStatus)
signals = append(signals, sig)
}
}
return signals, nil
}
type epicInfo struct {
Number int
Body string
}
// splitRepo parses "owner/repo" into its components.
func splitRepo(full string) (string, string, error) {
parts := strings.SplitN(full, "/", 2)
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return "", "", log.E("forgejo.splitRepo", fmt.Sprintf("expected owner/repo format, got %q", full), nil)
}
return parts[0], parts[1], nil
}

View file

@ -0,0 +1,177 @@
package forgejo
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"forge.lthn.ai/core/go-scm/forge"
"forge.lthn.ai/core/go-scm/jobrunner"
)
// withVersion wraps an HTTP handler to serve the Forgejo /api/v1/version
// endpoint that the SDK calls during NewClient initialization.
func withVersion(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, "/version") {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"version":"9.0.0"}`))
return
}
next.ServeHTTP(w, r)
})
}
func newTestClient(t *testing.T, url string) *forge.Client {
t.Helper()
client, err := forge.New(url, "test-token")
require.NoError(t, err)
return client
}
func TestForgejoSource_Name(t *testing.T) {
s := New(Config{}, nil)
assert.Equal(t, "forgejo", s.Name())
}
func TestForgejoSource_Poll_Good(t *testing.T) {
srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
w.Header().Set("Content-Type", "application/json")
switch {
// List issues — return one epic
case strings.Contains(path, "/issues"):
issues := []map[string]any{
{
"number": 10,
"body": "## Tasks\n- [ ] #11\n- [x] #12\n",
"labels": []map[string]string{{"name": "epic"}},
"state": "open",
},
}
_ = json.NewEncoder(w).Encode(issues)
// List PRs — return one open PR linked to #11
case strings.Contains(path, "/pulls"):
prs := []map[string]any{
{
"number": 20,
"body": "Fixes #11",
"state": "open",
"mergeable": true,
"merged": false,
"head": map[string]string{"sha": "abc123", "ref": "feature", "label": "feature"},
},
}
_ = json.NewEncoder(w).Encode(prs)
// Combined status
case strings.Contains(path, "/status"):
status := map[string]any{
"state": "success",
"total_count": 1,
"statuses": []map[string]any{{"status": "success", "context": "ci"}},
}
_ = json.NewEncoder(w).Encode(status)
default:
w.WriteHeader(http.StatusNotFound)
}
})))
defer srv.Close()
client := newTestClient(t, srv.URL)
s := New(Config{Repos: []string{"test-org/test-repo"}}, client)
signals, err := s.Poll(context.Background())
require.NoError(t, err)
require.Len(t, signals, 1)
sig := signals[0]
assert.Equal(t, 10, sig.EpicNumber)
assert.Equal(t, 11, sig.ChildNumber)
assert.Equal(t, 20, sig.PRNumber)
assert.Equal(t, "OPEN", sig.PRState)
assert.Equal(t, "MERGEABLE", sig.Mergeable)
assert.Equal(t, "SUCCESS", sig.CheckStatus)
assert.Equal(t, "test-org", sig.RepoOwner)
assert.Equal(t, "test-repo", sig.RepoName)
assert.Equal(t, "abc123", sig.LastCommitSHA)
}
func TestForgejoSource_Poll_NoEpics(t *testing.T) {
srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode([]any{})
})))
defer srv.Close()
client := newTestClient(t, srv.URL)
s := New(Config{Repos: []string{"test-org/test-repo"}}, client)
signals, err := s.Poll(context.Background())
require.NoError(t, err)
assert.Empty(t, signals)
}
func TestForgejoSource_Report_Good(t *testing.T) {
var capturedBody string
srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var body map[string]string
_ = json.NewDecoder(r.Body).Decode(&body)
capturedBody = body["body"]
_ = json.NewEncoder(w).Encode(map[string]any{"id": 1})
})))
defer srv.Close()
client := newTestClient(t, srv.URL)
s := New(Config{}, client)
result := &jobrunner.ActionResult{
Action: "enable_auto_merge",
RepoOwner: "test-org",
RepoName: "test-repo",
EpicNumber: 10,
ChildNumber: 11,
PRNumber: 20,
Success: true,
}
err := s.Report(context.Background(), result)
require.NoError(t, err)
assert.Contains(t, capturedBody, "enable_auto_merge")
assert.Contains(t, capturedBody, "succeeded")
}
func TestParseEpicChildren(t *testing.T) {
body := "## Tasks\n- [x] #1\n- [ ] #7\n- [ ] #8\n- [x] #3\n"
unchecked, checked := parseEpicChildren(body)
assert.Equal(t, []int{7, 8}, unchecked)
assert.Equal(t, []int{1, 3}, checked)
}
func TestFindLinkedPR(t *testing.T) {
assert.Nil(t, findLinkedPR(nil, 7))
}
func TestSplitRepo(t *testing.T) {
owner, repo, err := splitRepo("host-uk/core")
require.NoError(t, err)
assert.Equal(t, "host-uk", owner)
assert.Equal(t, "core", repo)
_, _, err = splitRepo("invalid")
assert.Error(t, err)
_, _, err = splitRepo("")
assert.Error(t, err)
}

View file

@ -0,0 +1,87 @@
package handlers
import (
"context"
"fmt"
"time"
"forge.lthn.ai/core/go-scm/forge"
"forge.lthn.ai/core/go-scm/jobrunner"
)
const (
ColorAgentComplete = "#0e8a16" // Green
)
// CompletionHandler manages issue state when an agent finishes work.
type CompletionHandler struct {
forge *forge.Client
}
// NewCompletionHandler creates a handler for agent completion events.
func NewCompletionHandler(client *forge.Client) *CompletionHandler {
return &CompletionHandler{
forge: client,
}
}
// Name returns the handler identifier.
func (h *CompletionHandler) Name() string {
return "completion"
}
// Match returns true if the signal indicates an agent has finished a task.
func (h *CompletionHandler) Match(signal *jobrunner.PipelineSignal) bool {
return signal.Type == "agent_completion"
}
// Execute updates the issue labels based on the completion status.
func (h *CompletionHandler) Execute(ctx context.Context, signal *jobrunner.PipelineSignal) (*jobrunner.ActionResult, error) {
start := time.Now()
// Remove in-progress label.
if inProgressLabel, err := h.forge.GetLabelByName(signal.RepoOwner, signal.RepoName, LabelInProgress); err == nil {
_ = h.forge.RemoveIssueLabel(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), inProgressLabel.ID)
}
if signal.Success {
completeLabel, err := h.forge.EnsureLabel(signal.RepoOwner, signal.RepoName, LabelAgentComplete, ColorAgentComplete)
if err != nil {
return nil, fmt.Errorf("ensure label %s: %w", LabelAgentComplete, err)
}
if err := h.forge.AddIssueLabels(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), []int64{completeLabel.ID}); err != nil {
return nil, fmt.Errorf("add completed label: %w", err)
}
if signal.Message != "" {
_ = h.forge.CreateIssueComment(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), signal.Message)
}
} else {
failedLabel, err := h.forge.EnsureLabel(signal.RepoOwner, signal.RepoName, LabelAgentFailed, ColorAgentFailed)
if err != nil {
return nil, fmt.Errorf("ensure label %s: %w", LabelAgentFailed, err)
}
if err := h.forge.AddIssueLabels(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), []int64{failedLabel.ID}); err != nil {
return nil, fmt.Errorf("add failed label: %w", err)
}
msg := "Agent reported failure."
if signal.Error != "" {
msg += fmt.Sprintf("\n\nError: %s", signal.Error)
}
_ = h.forge.CreateIssueComment(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), msg)
}
return &jobrunner.ActionResult{
Action: "completion",
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
EpicNumber: signal.EpicNumber,
ChildNumber: signal.ChildNumber,
Success: true,
Timestamp: time.Now(),
Duration: time.Since(start),
}, nil
}

View file

@ -0,0 +1,290 @@
package handlers
import (
"bytes"
"context"
"encoding/json"
"fmt"
"path/filepath"
"time"
"forge.lthn.ai/core/go-scm/agentci"
"forge.lthn.ai/core/go-scm/forge"
"forge.lthn.ai/core/go-scm/jobrunner"
"forge.lthn.ai/core/go/pkg/log"
)
const (
LabelAgentReady = "agent-ready"
LabelInProgress = "in-progress"
LabelAgentFailed = "agent-failed"
LabelAgentComplete = "agent-completed"
ColorInProgress = "#1d76db" // Blue
ColorAgentFailed = "#c0392b" // Red
)
// DispatchTicket is the JSON payload written to the agent's queue.
// The ForgeToken is transferred separately via a .env file with 0600 permissions.
type DispatchTicket struct {
ID string `json:"id"`
RepoOwner string `json:"repo_owner"`
RepoName string `json:"repo_name"`
IssueNumber int `json:"issue_number"`
IssueTitle string `json:"issue_title"`
IssueBody string `json:"issue_body"`
TargetBranch string `json:"target_branch"`
EpicNumber int `json:"epic_number"`
ForgeURL string `json:"forge_url"`
ForgeUser string `json:"forgejo_user"`
Model string `json:"model,omitempty"`
Runner string `json:"runner,omitempty"`
VerifyModel string `json:"verify_model,omitempty"`
DualRun bool `json:"dual_run"`
CreatedAt string `json:"created_at"`
}
// DispatchHandler dispatches coding work to remote agent machines via SSH.
type DispatchHandler struct {
forge *forge.Client
forgeURL string
token string
spinner *agentci.Spinner
}
// NewDispatchHandler creates a handler that dispatches tickets to agent machines.
func NewDispatchHandler(client *forge.Client, forgeURL, token string, spinner *agentci.Spinner) *DispatchHandler {
return &DispatchHandler{
forge: client,
forgeURL: forgeURL,
token: token,
spinner: spinner,
}
}
// Name returns the handler identifier.
func (h *DispatchHandler) Name() string {
return "dispatch"
}
// Match returns true for signals where a child issue needs coding (no PR yet)
// and the assignee is a known agent (by config key or Forgejo username).
func (h *DispatchHandler) Match(signal *jobrunner.PipelineSignal) bool {
if !signal.NeedsCoding {
return false
}
_, _, ok := h.spinner.FindByForgejoUser(signal.Assignee)
return ok
}
// Execute creates a ticket JSON and transfers it securely to the agent's queue directory.
func (h *DispatchHandler) Execute(ctx context.Context, signal *jobrunner.PipelineSignal) (*jobrunner.ActionResult, error) {
start := time.Now()
agentName, agent, ok := h.spinner.FindByForgejoUser(signal.Assignee)
if !ok {
return nil, fmt.Errorf("unknown agent: %s", signal.Assignee)
}
// Sanitize inputs to prevent path traversal.
safeOwner, err := agentci.SanitizePath(signal.RepoOwner)
if err != nil {
return nil, fmt.Errorf("invalid repo owner: %w", err)
}
safeRepo, err := agentci.SanitizePath(signal.RepoName)
if err != nil {
return nil, fmt.Errorf("invalid repo name: %w", err)
}
// Ensure in-progress label exists on repo.
inProgressLabel, err := h.forge.EnsureLabel(safeOwner, safeRepo, LabelInProgress, ColorInProgress)
if err != nil {
return nil, fmt.Errorf("ensure label %s: %w", LabelInProgress, err)
}
// Check if already in progress to prevent double-dispatch.
issue, err := h.forge.GetIssue(safeOwner, safeRepo, int64(signal.ChildNumber))
if err == nil {
for _, l := range issue.Labels {
if l.Name == LabelInProgress || l.Name == LabelAgentComplete {
log.Info("issue already processed, skipping", "issue", signal.ChildNumber, "label", l.Name)
return &jobrunner.ActionResult{
Action: "dispatch",
Success: true,
Timestamp: time.Now(),
Duration: time.Since(start),
}, nil
}
}
}
// Assign agent and add in-progress label.
if err := h.forge.AssignIssue(safeOwner, safeRepo, int64(signal.ChildNumber), []string{signal.Assignee}); err != nil {
log.Warn("failed to assign agent, continuing", "err", err)
}
if err := h.forge.AddIssueLabels(safeOwner, safeRepo, int64(signal.ChildNumber), []int64{inProgressLabel.ID}); err != nil {
return nil, fmt.Errorf("add in-progress label: %w", err)
}
// Remove agent-ready label if present.
if readyLabel, err := h.forge.GetLabelByName(safeOwner, safeRepo, LabelAgentReady); err == nil {
_ = h.forge.RemoveIssueLabel(safeOwner, safeRepo, int64(signal.ChildNumber), readyLabel.ID)
}
// Clotho planning — determine execution mode.
runMode := h.spinner.DeterminePlan(signal, agentName)
verifyModel := ""
if runMode == agentci.ModeDual {
verifyModel = h.spinner.GetVerifierModel(agentName)
}
// Build ticket.
targetBranch := "new" // TODO: resolve from epic or repo default
ticketID := fmt.Sprintf("%s-%s-%d-%d", safeOwner, safeRepo, signal.ChildNumber, time.Now().Unix())
ticket := DispatchTicket{
ID: ticketID,
RepoOwner: safeOwner,
RepoName: safeRepo,
IssueNumber: signal.ChildNumber,
IssueTitle: signal.IssueTitle,
IssueBody: signal.IssueBody,
TargetBranch: targetBranch,
EpicNumber: signal.EpicNumber,
ForgeURL: h.forgeURL,
ForgeUser: signal.Assignee,
Model: agent.Model,
Runner: agent.Runner,
VerifyModel: verifyModel,
DualRun: runMode == agentci.ModeDual,
CreatedAt: time.Now().UTC().Format(time.RFC3339),
}
ticketJSON, err := json.MarshalIndent(ticket, "", " ")
if err != nil {
h.failDispatch(signal, "Failed to marshal ticket JSON")
return nil, fmt.Errorf("marshal ticket: %w", err)
}
// Check if ticket already exists on agent (dedup).
ticketName := fmt.Sprintf("ticket-%s-%s-%d.json", safeOwner, safeRepo, signal.ChildNumber)
if h.ticketExists(ctx, agent, ticketName) {
log.Info("ticket already queued, skipping", "ticket", ticketName, "agent", signal.Assignee)
return &jobrunner.ActionResult{
Action: "dispatch",
RepoOwner: safeOwner,
RepoName: safeRepo,
EpicNumber: signal.EpicNumber,
ChildNumber: signal.ChildNumber,
Success: true,
Timestamp: time.Now(),
Duration: time.Since(start),
}, nil
}
// Transfer ticket JSON.
remoteTicketPath := filepath.Join(agent.QueueDir, ticketName)
if err := h.secureTransfer(ctx, agent, remoteTicketPath, ticketJSON, 0644); err != nil {
h.failDispatch(signal, fmt.Sprintf("Ticket transfer failed: %v", err))
return &jobrunner.ActionResult{
Action: "dispatch",
RepoOwner: safeOwner,
RepoName: safeRepo,
EpicNumber: signal.EpicNumber,
ChildNumber: signal.ChildNumber,
Success: false,
Error: fmt.Sprintf("transfer ticket: %v", err),
Timestamp: time.Now(),
Duration: time.Since(start),
}, nil
}
// Transfer token via separate .env file with 0600 permissions.
envContent := fmt.Sprintf("FORGE_TOKEN=%s\n", h.token)
remoteEnvPath := filepath.Join(agent.QueueDir, fmt.Sprintf(".env.%s", ticketID))
if err := h.secureTransfer(ctx, agent, remoteEnvPath, []byte(envContent), 0600); err != nil {
// Clean up the ticket if env transfer fails.
_ = h.runRemote(ctx, agent, fmt.Sprintf("rm -f %s", agentci.EscapeShellArg(remoteTicketPath)))
h.failDispatch(signal, fmt.Sprintf("Token transfer failed: %v", err))
return &jobrunner.ActionResult{
Action: "dispatch",
RepoOwner: safeOwner,
RepoName: safeRepo,
EpicNumber: signal.EpicNumber,
ChildNumber: signal.ChildNumber,
Success: false,
Error: fmt.Sprintf("transfer token: %v", err),
Timestamp: time.Now(),
Duration: time.Since(start),
}, nil
}
// Comment on issue.
modeStr := "Standard"
if runMode == agentci.ModeDual {
modeStr = "Clotho Verified (Dual Run)"
}
comment := fmt.Sprintf("Dispatched to **%s** agent queue.\nMode: **%s**", signal.Assignee, modeStr)
_ = h.forge.CreateIssueComment(safeOwner, safeRepo, int64(signal.ChildNumber), comment)
return &jobrunner.ActionResult{
Action: "dispatch",
RepoOwner: safeOwner,
RepoName: safeRepo,
EpicNumber: signal.EpicNumber,
ChildNumber: signal.ChildNumber,
Success: true,
Timestamp: time.Now(),
Duration: time.Since(start),
}, nil
}
// failDispatch handles cleanup when dispatch fails (adds failed label, removes in-progress).
func (h *DispatchHandler) failDispatch(signal *jobrunner.PipelineSignal, reason string) {
if failedLabel, err := h.forge.EnsureLabel(signal.RepoOwner, signal.RepoName, LabelAgentFailed, ColorAgentFailed); err == nil {
_ = h.forge.AddIssueLabels(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), []int64{failedLabel.ID})
}
if inProgressLabel, err := h.forge.GetLabelByName(signal.RepoOwner, signal.RepoName, LabelInProgress); err == nil {
_ = h.forge.RemoveIssueLabel(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), inProgressLabel.ID)
}
_ = h.forge.CreateIssueComment(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), fmt.Sprintf("Agent dispatch failed: %s", reason))
}
// secureTransfer writes data to a remote path via SSH stdin, preventing command injection.
func (h *DispatchHandler) secureTransfer(ctx context.Context, agent agentci.AgentConfig, remotePath string, data []byte, mode int) error {
safeRemotePath := agentci.EscapeShellArg(remotePath)
remoteCmd := fmt.Sprintf("cat > %s && chmod %o %s", safeRemotePath, mode, safeRemotePath)
cmd := agentci.SecureSSHCommand(agent.Host, remoteCmd)
cmd.Stdin = bytes.NewReader(data)
output, err := cmd.CombinedOutput()
if err != nil {
return log.E("dispatch.transfer", fmt.Sprintf("ssh to %s failed: %s", agent.Host, string(output)), err)
}
return nil
}
// runRemote executes a command on the agent via SSH.
func (h *DispatchHandler) runRemote(ctx context.Context, agent agentci.AgentConfig, cmdStr string) error {
cmd := agentci.SecureSSHCommand(agent.Host, cmdStr)
return cmd.Run()
}
// ticketExists checks if a ticket file already exists in queue, active, or done.
func (h *DispatchHandler) ticketExists(ctx context.Context, agent agentci.AgentConfig, ticketName string) bool {
safeTicket, err := agentci.SanitizePath(ticketName)
if err != nil {
return false
}
qDir := agent.QueueDir
checkCmd := fmt.Sprintf(
"test -f %s/%s || test -f %s/../active/%s || test -f %s/../done/%s",
qDir, safeTicket, qDir, safeTicket, qDir, safeTicket,
)
cmd := agentci.SecureSSHCommand(agent.Host, checkCmd)
return cmd.Run() == nil
}

View file

@ -0,0 +1,327 @@
package handlers
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"forge.lthn.ai/core/go-scm/agentci"
"forge.lthn.ai/core/go-scm/jobrunner"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// newTestSpinner creates a Spinner with the given agents for testing.
func newTestSpinner(agents map[string]agentci.AgentConfig) *agentci.Spinner {
return agentci.NewSpinner(agentci.ClothoConfig{Strategy: "direct"}, agents)
}
// --- Match tests ---
func TestDispatch_Match_Good_NeedsCoding(t *testing.T) {
spinner := newTestSpinner(map[string]agentci.AgentConfig{
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue", Active: true},
})
h := NewDispatchHandler(nil, "", "", spinner)
sig := &jobrunner.PipelineSignal{
NeedsCoding: true,
Assignee: "darbs-claude",
}
assert.True(t, h.Match(sig))
}
func TestDispatch_Match_Good_MultipleAgents(t *testing.T) {
spinner := newTestSpinner(map[string]agentci.AgentConfig{
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue", Active: true},
"local-codex": {Host: "localhost", QueueDir: "~/ai-work/queue", Active: true},
})
h := NewDispatchHandler(nil, "", "", spinner)
sig := &jobrunner.PipelineSignal{
NeedsCoding: true,
Assignee: "local-codex",
}
assert.True(t, h.Match(sig))
}
func TestDispatch_Match_Bad_HasPR(t *testing.T) {
spinner := newTestSpinner(map[string]agentci.AgentConfig{
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue", Active: true},
})
h := NewDispatchHandler(nil, "", "", spinner)
sig := &jobrunner.PipelineSignal{
NeedsCoding: false,
PRNumber: 7,
Assignee: "darbs-claude",
}
assert.False(t, h.Match(sig))
}
func TestDispatch_Match_Bad_UnknownAgent(t *testing.T) {
spinner := newTestSpinner(map[string]agentci.AgentConfig{
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue", Active: true},
})
h := NewDispatchHandler(nil, "", "", spinner)
sig := &jobrunner.PipelineSignal{
NeedsCoding: true,
Assignee: "unknown-user",
}
assert.False(t, h.Match(sig))
}
func TestDispatch_Match_Bad_NotAssigned(t *testing.T) {
spinner := newTestSpinner(map[string]agentci.AgentConfig{
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue", Active: true},
})
h := NewDispatchHandler(nil, "", "", spinner)
sig := &jobrunner.PipelineSignal{
NeedsCoding: true,
Assignee: "",
}
assert.False(t, h.Match(sig))
}
func TestDispatch_Match_Bad_EmptyAgentMap(t *testing.T) {
spinner := newTestSpinner(map[string]agentci.AgentConfig{})
h := NewDispatchHandler(nil, "", "", spinner)
sig := &jobrunner.PipelineSignal{
NeedsCoding: true,
Assignee: "darbs-claude",
}
assert.False(t, h.Match(sig))
}
// --- Name test ---
func TestDispatch_Name_Good(t *testing.T) {
spinner := newTestSpinner(nil)
h := NewDispatchHandler(nil, "", "", spinner)
assert.Equal(t, "dispatch", h.Name())
}
// --- Execute tests ---
func TestDispatch_Execute_Bad_UnknownAgent(t *testing.T) {
srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})))
defer srv.Close()
client := newTestForgeClient(t, srv.URL)
spinner := newTestSpinner(map[string]agentci.AgentConfig{
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue", Active: true},
})
h := NewDispatchHandler(client, srv.URL, "test-token", spinner)
sig := &jobrunner.PipelineSignal{
NeedsCoding: true,
Assignee: "nonexistent-agent",
RepoOwner: "host-uk",
RepoName: "core",
ChildNumber: 1,
}
_, err := h.Execute(context.Background(), sig)
require.Error(t, err)
assert.Contains(t, err.Error(), "unknown agent")
}
func TestDispatch_TicketJSON_Good(t *testing.T) {
ticket := DispatchTicket{
ID: "host-uk-core-5-1234567890",
RepoOwner: "host-uk",
RepoName: "core",
IssueNumber: 5,
IssueTitle: "Fix the thing",
IssueBody: "Please fix this bug",
TargetBranch: "new",
EpicNumber: 3,
ForgeURL: "https://forge.lthn.ai",
ForgeUser: "darbs-claude",
Model: "sonnet",
Runner: "claude",
DualRun: false,
CreatedAt: "2026-02-09T12:00:00Z",
}
data, err := json.MarshalIndent(ticket, "", " ")
require.NoError(t, err)
var decoded map[string]any
err = json.Unmarshal(data, &decoded)
require.NoError(t, err)
assert.Equal(t, "host-uk-core-5-1234567890", decoded["id"])
assert.Equal(t, "host-uk", decoded["repo_owner"])
assert.Equal(t, "core", decoded["repo_name"])
assert.Equal(t, float64(5), decoded["issue_number"])
assert.Equal(t, "Fix the thing", decoded["issue_title"])
assert.Equal(t, "Please fix this bug", decoded["issue_body"])
assert.Equal(t, "new", decoded["target_branch"])
assert.Equal(t, float64(3), decoded["epic_number"])
assert.Equal(t, "https://forge.lthn.ai", decoded["forge_url"])
assert.Equal(t, "darbs-claude", decoded["forgejo_user"])
assert.Equal(t, "sonnet", decoded["model"])
assert.Equal(t, "claude", decoded["runner"])
// Token should NOT be present in the ticket.
_, hasToken := decoded["forge_token"]
assert.False(t, hasToken, "forge_token must not be in ticket JSON")
}
func TestDispatch_TicketJSON_Good_DualRun(t *testing.T) {
ticket := DispatchTicket{
ID: "test-dual",
RepoOwner: "host-uk",
RepoName: "core",
IssueNumber: 1,
ForgeURL: "https://forge.lthn.ai",
Model: "gemini-2.0-flash",
VerifyModel: "gemini-1.5-pro",
DualRun: true,
}
data, err := json.Marshal(ticket)
require.NoError(t, err)
var roundtrip DispatchTicket
err = json.Unmarshal(data, &roundtrip)
require.NoError(t, err)
assert.True(t, roundtrip.DualRun)
assert.Equal(t, "gemini-1.5-pro", roundtrip.VerifyModel)
}
func TestDispatch_TicketJSON_Good_OmitsEmptyModelRunner(t *testing.T) {
ticket := DispatchTicket{
ID: "test-1",
RepoOwner: "host-uk",
RepoName: "core",
IssueNumber: 1,
TargetBranch: "new",
ForgeURL: "https://forge.lthn.ai",
}
data, err := json.MarshalIndent(ticket, "", " ")
require.NoError(t, err)
var decoded map[string]any
err = json.Unmarshal(data, &decoded)
require.NoError(t, err)
_, hasModel := decoded["model"]
_, hasRunner := decoded["runner"]
assert.False(t, hasModel, "model should be omitted when empty")
assert.False(t, hasRunner, "runner should be omitted when empty")
}
func TestDispatch_TicketJSON_Good_ModelRunnerVariants(t *testing.T) {
tests := []struct {
name string
model string
runner string
}{
{"claude-sonnet", "sonnet", "claude"},
{"claude-opus", "opus", "claude"},
{"codex-default", "", "codex"},
{"gemini-default", "", "gemini"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ticket := DispatchTicket{
ID: "test-" + tt.name,
RepoOwner: "host-uk",
RepoName: "core",
IssueNumber: 1,
TargetBranch: "new",
ForgeURL: "https://forge.lthn.ai",
Model: tt.model,
Runner: tt.runner,
}
data, err := json.Marshal(ticket)
require.NoError(t, err)
var roundtrip DispatchTicket
err = json.Unmarshal(data, &roundtrip)
require.NoError(t, err)
assert.Equal(t, tt.model, roundtrip.Model)
assert.Equal(t, tt.runner, roundtrip.Runner)
})
}
}
func TestDispatch_Execute_Good_PostsComment(t *testing.T) {
var commentPosted bool
var commentBody string
srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
switch {
case r.Method == http.MethodGet && r.URL.Path == "/api/v1/repos/host-uk/core/labels":
json.NewEncoder(w).Encode([]any{})
return
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/repos/host-uk/core/labels":
json.NewEncoder(w).Encode(map[string]any{"id": 1, "name": "in-progress", "color": "#1d76db"})
return
case r.Method == http.MethodGet && r.URL.Path == "/api/v1/repos/host-uk/core/issues/5":
json.NewEncoder(w).Encode(map[string]any{"id": 5, "number": 5, "labels": []any{}, "title": "Test"})
return
case r.Method == http.MethodPatch && r.URL.Path == "/api/v1/repos/host-uk/core/issues/5":
json.NewEncoder(w).Encode(map[string]any{"id": 5, "number": 5})
return
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/repos/host-uk/core/issues/5/labels":
json.NewEncoder(w).Encode([]any{map[string]any{"id": 1, "name": "in-progress"}})
return
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/repos/host-uk/core/issues/5/comments":
commentPosted = true
var body map[string]string
_ = json.NewDecoder(r.Body).Decode(&body)
commentBody = body["body"]
json.NewEncoder(w).Encode(map[string]any{"id": 1, "body": body["body"]})
return
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]any{})
})))
defer srv.Close()
client := newTestForgeClient(t, srv.URL)
spinner := newTestSpinner(map[string]agentci.AgentConfig{
"darbs-claude": {Host: "localhost", QueueDir: "/tmp/nonexistent-queue", Active: true},
})
h := NewDispatchHandler(client, srv.URL, "test-token", spinner)
sig := &jobrunner.PipelineSignal{
NeedsCoding: true,
Assignee: "darbs-claude",
RepoOwner: "host-uk",
RepoName: "core",
ChildNumber: 5,
EpicNumber: 3,
IssueTitle: "Test issue",
IssueBody: "Test body",
}
result, err := h.Execute(context.Background(), sig)
require.NoError(t, err)
assert.Equal(t, "dispatch", result.Action)
assert.Equal(t, "host-uk", result.RepoOwner)
assert.Equal(t, "core", result.RepoName)
assert.Equal(t, 3, result.EpicNumber)
assert.Equal(t, 5, result.ChildNumber)
if result.Success {
assert.True(t, commentPosted)
assert.Contains(t, commentBody, "darbs-claude")
}
}

View file

@ -0,0 +1,58 @@
package handlers
import (
"context"
"fmt"
"time"
"forge.lthn.ai/core/go-scm/forge"
"forge.lthn.ai/core/go-scm/jobrunner"
)
// EnableAutoMergeHandler merges a PR that is ready using squash strategy.
type EnableAutoMergeHandler struct {
forge *forge.Client
}
// NewEnableAutoMergeHandler creates a handler that merges ready PRs.
func NewEnableAutoMergeHandler(f *forge.Client) *EnableAutoMergeHandler {
return &EnableAutoMergeHandler{forge: f}
}
// Name returns the handler identifier.
func (h *EnableAutoMergeHandler) Name() string {
return "enable_auto_merge"
}
// Match returns true when the PR is open, not a draft, mergeable, checks
// are passing, and there are no unresolved review threads.
func (h *EnableAutoMergeHandler) Match(signal *jobrunner.PipelineSignal) bool {
return signal.PRState == "OPEN" &&
!signal.IsDraft &&
signal.Mergeable == "MERGEABLE" &&
signal.CheckStatus == "SUCCESS" &&
!signal.HasUnresolvedThreads()
}
// Execute merges the pull request with squash strategy.
func (h *EnableAutoMergeHandler) Execute(ctx context.Context, signal *jobrunner.PipelineSignal) (*jobrunner.ActionResult, error) {
start := time.Now()
err := h.forge.MergePullRequest(signal.RepoOwner, signal.RepoName, int64(signal.PRNumber), "squash")
result := &jobrunner.ActionResult{
Action: "enable_auto_merge",
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
PRNumber: signal.PRNumber,
Success: err == nil,
Timestamp: time.Now(),
Duration: time.Since(start),
}
if err != nil {
result.Error = fmt.Sprintf("merge failed: %v", err)
}
return result, nil
}

View file

@ -0,0 +1,105 @@
package handlers
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"forge.lthn.ai/core/go-scm/jobrunner"
)
func TestEnableAutoMerge_Match_Good(t *testing.T) {
h := NewEnableAutoMergeHandler(nil)
sig := &jobrunner.PipelineSignal{
PRState: "OPEN",
IsDraft: false,
Mergeable: "MERGEABLE",
CheckStatus: "SUCCESS",
ThreadsTotal: 0,
ThreadsResolved: 0,
}
assert.True(t, h.Match(sig))
}
func TestEnableAutoMerge_Match_Bad_Draft(t *testing.T) {
h := NewEnableAutoMergeHandler(nil)
sig := &jobrunner.PipelineSignal{
PRState: "OPEN",
IsDraft: true,
Mergeable: "MERGEABLE",
CheckStatus: "SUCCESS",
ThreadsTotal: 0,
ThreadsResolved: 0,
}
assert.False(t, h.Match(sig))
}
func TestEnableAutoMerge_Match_Bad_UnresolvedThreads(t *testing.T) {
h := NewEnableAutoMergeHandler(nil)
sig := &jobrunner.PipelineSignal{
PRState: "OPEN",
IsDraft: false,
Mergeable: "MERGEABLE",
CheckStatus: "SUCCESS",
ThreadsTotal: 5,
ThreadsResolved: 3,
}
assert.False(t, h.Match(sig))
}
func TestEnableAutoMerge_Execute_Good(t *testing.T) {
var capturedPath string
var capturedMethod string
srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
capturedMethod = r.Method
capturedPath = r.URL.Path
w.WriteHeader(http.StatusOK)
})))
defer srv.Close()
client := newTestForgeClient(t, srv.URL)
h := NewEnableAutoMergeHandler(client)
sig := &jobrunner.PipelineSignal{
RepoOwner: "host-uk",
RepoName: "core-php",
PRNumber: 55,
}
result, err := h.Execute(context.Background(), sig)
require.NoError(t, err)
assert.True(t, result.Success)
assert.Equal(t, "enable_auto_merge", result.Action)
assert.Equal(t, http.MethodPost, capturedMethod)
assert.Equal(t, "/api/v1/repos/host-uk/core-php/pulls/55/merge", capturedPath)
}
func TestEnableAutoMerge_Execute_Bad_MergeFailed(t *testing.T) {
srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusConflict)
_ = json.NewEncoder(w).Encode(map[string]string{"message": "merge conflict"})
})))
defer srv.Close()
client := newTestForgeClient(t, srv.URL)
h := NewEnableAutoMergeHandler(client)
sig := &jobrunner.PipelineSignal{
RepoOwner: "host-uk",
RepoName: "core-php",
PRNumber: 55,
}
result, err := h.Execute(context.Background(), sig)
require.NoError(t, err)
assert.False(t, result.Success)
assert.Contains(t, result.Error, "merge failed")
}

View file

@ -0,0 +1,55 @@
package handlers
import (
"context"
"fmt"
"time"
"forge.lthn.ai/core/go-scm/forge"
"forge.lthn.ai/core/go-scm/jobrunner"
)
// PublishDraftHandler marks a draft PR as ready for review once its checks pass.
type PublishDraftHandler struct {
forge *forge.Client
}
// NewPublishDraftHandler creates a handler that publishes draft PRs.
func NewPublishDraftHandler(f *forge.Client) *PublishDraftHandler {
return &PublishDraftHandler{forge: f}
}
// Name returns the handler identifier.
func (h *PublishDraftHandler) Name() string {
return "publish_draft"
}
// Match returns true when the PR is a draft, open, and all checks have passed.
func (h *PublishDraftHandler) Match(signal *jobrunner.PipelineSignal) bool {
return signal.IsDraft &&
signal.PRState == "OPEN" &&
signal.CheckStatus == "SUCCESS"
}
// Execute marks the PR as no longer a draft.
func (h *PublishDraftHandler) Execute(ctx context.Context, signal *jobrunner.PipelineSignal) (*jobrunner.ActionResult, error) {
start := time.Now()
err := h.forge.SetPRDraft(signal.RepoOwner, signal.RepoName, int64(signal.PRNumber), false)
result := &jobrunner.ActionResult{
Action: "publish_draft",
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
PRNumber: signal.PRNumber,
Success: err == nil,
Timestamp: time.Now(),
Duration: time.Since(start),
}
if err != nil {
result.Error = fmt.Sprintf("publish draft failed: %v", err)
}
return result, nil
}

View file

@ -0,0 +1,84 @@
package handlers
import (
"context"
"io"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"forge.lthn.ai/core/go-scm/jobrunner"
)
func TestPublishDraft_Match_Good(t *testing.T) {
h := NewPublishDraftHandler(nil)
sig := &jobrunner.PipelineSignal{
IsDraft: true,
PRState: "OPEN",
CheckStatus: "SUCCESS",
}
assert.True(t, h.Match(sig))
}
func TestPublishDraft_Match_Bad_NotDraft(t *testing.T) {
h := NewPublishDraftHandler(nil)
sig := &jobrunner.PipelineSignal{
IsDraft: false,
PRState: "OPEN",
CheckStatus: "SUCCESS",
}
assert.False(t, h.Match(sig))
}
func TestPublishDraft_Match_Bad_ChecksFailing(t *testing.T) {
h := NewPublishDraftHandler(nil)
sig := &jobrunner.PipelineSignal{
IsDraft: true,
PRState: "OPEN",
CheckStatus: "FAILURE",
}
assert.False(t, h.Match(sig))
}
func TestPublishDraft_Execute_Good(t *testing.T) {
var capturedMethod string
var capturedPath string
var capturedBody string
srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
capturedMethod = r.Method
capturedPath = r.URL.Path
b, _ := io.ReadAll(r.Body)
capturedBody = string(b)
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{}`))
})))
defer srv.Close()
client := newTestForgeClient(t, srv.URL)
h := NewPublishDraftHandler(client)
sig := &jobrunner.PipelineSignal{
RepoOwner: "host-uk",
RepoName: "core-php",
PRNumber: 42,
IsDraft: true,
PRState: "OPEN",
}
result, err := h.Execute(context.Background(), sig)
require.NoError(t, err)
assert.Equal(t, http.MethodPatch, capturedMethod)
assert.Equal(t, "/api/v1/repos/host-uk/core-php/pulls/42", capturedPath)
assert.Contains(t, capturedBody, `"draft":false`)
assert.True(t, result.Success)
assert.Equal(t, "publish_draft", result.Action)
assert.Equal(t, "host-uk", result.RepoOwner)
assert.Equal(t, "core-php", result.RepoName)
assert.Equal(t, 42, result.PRNumber)
}

View file

@ -0,0 +1,79 @@
package handlers
import (
"context"
"fmt"
"time"
forgejosdk "codeberg.org/mvdkleijn/forgejo-sdk/forgejo/v2"
"forge.lthn.ai/core/go-scm/forge"
"forge.lthn.ai/core/go-scm/jobrunner"
)
// DismissReviewsHandler dismisses stale "request changes" reviews on a PR.
// This replaces the GitHub-only ResolveThreadsHandler because Forgejo does
// not have a thread resolution API.
type DismissReviewsHandler struct {
forge *forge.Client
}
// NewDismissReviewsHandler creates a handler that dismisses stale reviews.
func NewDismissReviewsHandler(f *forge.Client) *DismissReviewsHandler {
return &DismissReviewsHandler{forge: f}
}
// Name returns the handler identifier.
func (h *DismissReviewsHandler) Name() string {
return "dismiss_reviews"
}
// Match returns true when the PR is open and has unresolved review threads.
func (h *DismissReviewsHandler) Match(signal *jobrunner.PipelineSignal) bool {
return signal.PRState == "OPEN" && signal.HasUnresolvedThreads()
}
// Execute dismisses stale "request changes" reviews on the PR.
func (h *DismissReviewsHandler) Execute(ctx context.Context, signal *jobrunner.PipelineSignal) (*jobrunner.ActionResult, error) {
start := time.Now()
reviews, err := h.forge.ListPRReviews(signal.RepoOwner, signal.RepoName, int64(signal.PRNumber))
if err != nil {
return nil, fmt.Errorf("dismiss_reviews: list reviews: %w", err)
}
var dismissErrors []string
dismissed := 0
for _, review := range reviews {
if review.State != forgejosdk.ReviewStateRequestChanges || review.Dismissed || !review.Stale {
continue
}
if err := h.forge.DismissReview(
signal.RepoOwner, signal.RepoName,
int64(signal.PRNumber), review.ID,
"Automatically dismissed: review is stale after new commits",
); err != nil {
dismissErrors = append(dismissErrors, err.Error())
} else {
dismissed++
}
}
result := &jobrunner.ActionResult{
Action: "dismiss_reviews",
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
PRNumber: signal.PRNumber,
Success: len(dismissErrors) == 0,
Timestamp: time.Now(),
Duration: time.Since(start),
}
if len(dismissErrors) > 0 {
result.Error = fmt.Sprintf("failed to dismiss %d review(s): %s",
len(dismissErrors), dismissErrors[0])
}
return result, nil
}

View file

@ -0,0 +1,91 @@
package handlers
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"forge.lthn.ai/core/go-scm/jobrunner"
)
func TestDismissReviews_Match_Good(t *testing.T) {
h := NewDismissReviewsHandler(nil)
sig := &jobrunner.PipelineSignal{
PRState: "OPEN",
ThreadsTotal: 4,
ThreadsResolved: 2,
}
assert.True(t, h.Match(sig))
}
func TestDismissReviews_Match_Bad_AllResolved(t *testing.T) {
h := NewDismissReviewsHandler(nil)
sig := &jobrunner.PipelineSignal{
PRState: "OPEN",
ThreadsTotal: 3,
ThreadsResolved: 3,
}
assert.False(t, h.Match(sig))
}
func TestDismissReviews_Execute_Good(t *testing.T) {
callCount := 0
srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
callCount++
w.Header().Set("Content-Type", "application/json")
// ListPullReviews (GET)
if r.Method == http.MethodGet {
reviews := []map[string]any{
{
"id": 1, "state": "REQUEST_CHANGES", "dismissed": false, "stale": true,
"body": "fix this", "commit_id": "abc123",
},
{
"id": 2, "state": "APPROVED", "dismissed": false, "stale": false,
"body": "looks good", "commit_id": "abc123",
},
{
"id": 3, "state": "REQUEST_CHANGES", "dismissed": false, "stale": true,
"body": "needs work", "commit_id": "abc123",
},
}
_ = json.NewEncoder(w).Encode(reviews)
return
}
// DismissPullReview (POST to dismissals endpoint)
w.WriteHeader(http.StatusOK)
})))
defer srv.Close()
client := newTestForgeClient(t, srv.URL)
h := NewDismissReviewsHandler(client)
sig := &jobrunner.PipelineSignal{
RepoOwner: "host-uk",
RepoName: "core-admin",
PRNumber: 33,
PRState: "OPEN",
ThreadsTotal: 3,
ThreadsResolved: 1,
}
result, err := h.Execute(context.Background(), sig)
require.NoError(t, err)
assert.True(t, result.Success)
assert.Equal(t, "dismiss_reviews", result.Action)
assert.Equal(t, "host-uk", result.RepoOwner)
assert.Equal(t, "core-admin", result.RepoName)
assert.Equal(t, 33, result.PRNumber)
// 1 list + 2 dismiss (reviews #1 and #3 are stale REQUEST_CHANGES)
assert.Equal(t, 3, callCount)
}

View file

@ -0,0 +1,74 @@
package handlers
import (
"context"
"fmt"
"time"
"forge.lthn.ai/core/go-scm/forge"
"forge.lthn.ai/core/go-scm/jobrunner"
)
// SendFixCommandHandler posts a comment on a PR asking for conflict or
// review fixes.
type SendFixCommandHandler struct {
forge *forge.Client
}
// NewSendFixCommandHandler creates a handler that posts fix commands.
func NewSendFixCommandHandler(f *forge.Client) *SendFixCommandHandler {
return &SendFixCommandHandler{forge: f}
}
// Name returns the handler identifier.
func (h *SendFixCommandHandler) Name() string {
return "send_fix_command"
}
// Match returns true when the PR is open and either has merge conflicts or
// has unresolved threads with failing checks.
func (h *SendFixCommandHandler) Match(signal *jobrunner.PipelineSignal) bool {
if signal.PRState != "OPEN" {
return false
}
if signal.Mergeable == "CONFLICTING" {
return true
}
if signal.HasUnresolvedThreads() && signal.CheckStatus == "FAILURE" {
return true
}
return false
}
// Execute posts a comment on the PR asking for a fix.
func (h *SendFixCommandHandler) Execute(ctx context.Context, signal *jobrunner.PipelineSignal) (*jobrunner.ActionResult, error) {
start := time.Now()
var message string
if signal.Mergeable == "CONFLICTING" {
message = "Can you fix the merge conflict?"
} else {
message = "Can you fix the code reviews?"
}
err := h.forge.CreateIssueComment(
signal.RepoOwner, signal.RepoName,
int64(signal.PRNumber), message,
)
result := &jobrunner.ActionResult{
Action: "send_fix_command",
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
PRNumber: signal.PRNumber,
Success: err == nil,
Timestamp: time.Now(),
Duration: time.Since(start),
}
if err != nil {
result.Error = fmt.Sprintf("post comment failed: %v", err)
}
return result, nil
}

View file

@ -0,0 +1,87 @@
package handlers
import (
"context"
"io"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"forge.lthn.ai/core/go-scm/jobrunner"
)
func TestSendFixCommand_Match_Good_Conflicting(t *testing.T) {
h := NewSendFixCommandHandler(nil)
sig := &jobrunner.PipelineSignal{
PRState: "OPEN",
Mergeable: "CONFLICTING",
}
assert.True(t, h.Match(sig))
}
func TestSendFixCommand_Match_Good_UnresolvedThreads(t *testing.T) {
h := NewSendFixCommandHandler(nil)
sig := &jobrunner.PipelineSignal{
PRState: "OPEN",
Mergeable: "MERGEABLE",
CheckStatus: "FAILURE",
ThreadsTotal: 3,
ThreadsResolved: 1,
}
assert.True(t, h.Match(sig))
}
func TestSendFixCommand_Match_Bad_Clean(t *testing.T) {
h := NewSendFixCommandHandler(nil)
sig := &jobrunner.PipelineSignal{
PRState: "OPEN",
Mergeable: "MERGEABLE",
CheckStatus: "SUCCESS",
ThreadsTotal: 2,
ThreadsResolved: 2,
}
assert.False(t, h.Match(sig))
}
func TestSendFixCommand_Execute_Good_Conflict(t *testing.T) {
var capturedMethod string
var capturedPath string
var capturedBody string
srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
capturedMethod = r.Method
capturedPath = r.URL.Path
b, _ := io.ReadAll(r.Body)
capturedBody = string(b)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(`{"id":1}`))
})))
defer srv.Close()
client := newTestForgeClient(t, srv.URL)
h := NewSendFixCommandHandler(client)
sig := &jobrunner.PipelineSignal{
RepoOwner: "host-uk",
RepoName: "core-tenant",
PRNumber: 17,
PRState: "OPEN",
Mergeable: "CONFLICTING",
}
result, err := h.Execute(context.Background(), sig)
require.NoError(t, err)
assert.Equal(t, http.MethodPost, capturedMethod)
assert.Equal(t, "/api/v1/repos/host-uk/core-tenant/issues/17/comments", capturedPath)
assert.Contains(t, capturedBody, "fix the merge conflict")
assert.True(t, result.Success)
assert.Equal(t, "send_fix_command", result.Action)
assert.Equal(t, "host-uk", result.RepoOwner)
assert.Equal(t, "core-tenant", result.RepoName)
assert.Equal(t, 17, result.PRNumber)
}

View file

@ -0,0 +1,35 @@
package handlers
import (
"net/http"
"strings"
"testing"
"github.com/stretchr/testify/require"
"forge.lthn.ai/core/go-scm/forge"
)
// forgejoVersionResponse is the JSON response for /api/v1/version.
const forgejoVersionResponse = `{"version":"9.0.0"}`
// withVersion wraps an HTTP handler to also serve the Forgejo version endpoint
// that the SDK calls during NewClient initialization.
func withVersion(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, "/version") {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(forgejoVersionResponse))
return
}
next.ServeHTTP(w, r)
})
}
// newTestForgeClient creates a forge.Client pointing at the given test server URL.
func newTestForgeClient(t *testing.T, url string) *forge.Client {
t.Helper()
client, err := forge.New(url, "test-token")
require.NoError(t, err)
return client
}

View file

@ -0,0 +1,100 @@
package handlers
import (
"context"
"fmt"
"strings"
"time"
forgejosdk "codeberg.org/mvdkleijn/forgejo-sdk/forgejo/v2"
"forge.lthn.ai/core/go-scm/forge"
"forge.lthn.ai/core/go-scm/jobrunner"
)
// TickParentHandler ticks a child checkbox in the parent epic issue body
// after the child's PR has been merged.
type TickParentHandler struct {
forge *forge.Client
}
// NewTickParentHandler creates a handler that ticks parent epic checkboxes.
func NewTickParentHandler(f *forge.Client) *TickParentHandler {
return &TickParentHandler{forge: f}
}
// Name returns the handler identifier.
func (h *TickParentHandler) Name() string {
return "tick_parent"
}
// Match returns true when the child PR has been merged.
func (h *TickParentHandler) Match(signal *jobrunner.PipelineSignal) bool {
return signal.PRState == "MERGED"
}
// Execute fetches the epic body, replaces the unchecked checkbox for the
// child issue with a checked one, updates the epic, and closes the child issue.
func (h *TickParentHandler) Execute(ctx context.Context, signal *jobrunner.PipelineSignal) (*jobrunner.ActionResult, error) {
start := time.Now()
// Fetch the epic issue body.
epic, err := h.forge.GetIssue(signal.RepoOwner, signal.RepoName, int64(signal.EpicNumber))
if err != nil {
return nil, fmt.Errorf("tick_parent: fetch epic: %w", err)
}
oldBody := epic.Body
unchecked := fmt.Sprintf("- [ ] #%d", signal.ChildNumber)
checked := fmt.Sprintf("- [x] #%d", signal.ChildNumber)
if !strings.Contains(oldBody, unchecked) {
// Already ticked or not found -- nothing to do.
return &jobrunner.ActionResult{
Action: "tick_parent",
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
PRNumber: signal.PRNumber,
Success: true,
Timestamp: time.Now(),
Duration: time.Since(start),
}, nil
}
newBody := strings.Replace(oldBody, unchecked, checked, 1)
// Update the epic body.
_, err = h.forge.EditIssue(signal.RepoOwner, signal.RepoName, int64(signal.EpicNumber), forgejosdk.EditIssueOption{
Body: &newBody,
})
if err != nil {
return &jobrunner.ActionResult{
Action: "tick_parent",
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
PRNumber: signal.PRNumber,
Error: fmt.Sprintf("edit epic failed: %v", err),
Timestamp: time.Now(),
Duration: time.Since(start),
}, nil
}
// Close the child issue.
err = h.forge.CloseIssue(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber))
result := &jobrunner.ActionResult{
Action: "tick_parent",
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
PRNumber: signal.PRNumber,
Success: err == nil,
Timestamp: time.Now(),
Duration: time.Since(start),
}
if err != nil {
result.Error = fmt.Sprintf("close child issue failed: %v", err)
}
return result, nil
}

View file

@ -0,0 +1,98 @@
package handlers
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"forge.lthn.ai/core/go-scm/jobrunner"
)
func TestTickParent_Match_Good(t *testing.T) {
h := NewTickParentHandler(nil)
sig := &jobrunner.PipelineSignal{
PRState: "MERGED",
}
assert.True(t, h.Match(sig))
}
func TestTickParent_Match_Bad_Open(t *testing.T) {
h := NewTickParentHandler(nil)
sig := &jobrunner.PipelineSignal{
PRState: "OPEN",
}
assert.False(t, h.Match(sig))
}
func TestTickParent_Execute_Good(t *testing.T) {
epicBody := "## Tasks\n- [x] #1\n- [ ] #7\n- [ ] #8\n"
var editBody string
var closeCalled bool
srv := httptest.NewServer(withVersion(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
method := r.Method
w.Header().Set("Content-Type", "application/json")
switch {
// GET issue (fetch epic)
case method == http.MethodGet && strings.Contains(path, "/issues/42"):
_ = json.NewEncoder(w).Encode(map[string]any{
"number": 42,
"body": epicBody,
"title": "Epic",
})
// PATCH issue (edit epic body)
case method == http.MethodPatch && strings.Contains(path, "/issues/42"):
b, _ := io.ReadAll(r.Body)
editBody = string(b)
_ = json.NewEncoder(w).Encode(map[string]any{
"number": 42,
"body": editBody,
"title": "Epic",
})
// PATCH issue (close child — state: closed)
case method == http.MethodPatch && strings.Contains(path, "/issues/7"):
closeCalled = true
_ = json.NewEncoder(w).Encode(map[string]any{
"number": 7,
"state": "closed",
})
default:
w.WriteHeader(http.StatusNotFound)
}
})))
defer srv.Close()
client := newTestForgeClient(t, srv.URL)
h := NewTickParentHandler(client)
sig := &jobrunner.PipelineSignal{
RepoOwner: "host-uk",
RepoName: "core-php",
EpicNumber: 42,
ChildNumber: 7,
PRNumber: 99,
PRState: "MERGED",
}
result, err := h.Execute(context.Background(), sig)
require.NoError(t, err)
assert.True(t, result.Success)
assert.Equal(t, "tick_parent", result.Action)
// Verify the edit body contains the checked checkbox.
assert.Contains(t, editBody, "- [x] #7")
assert.True(t, closeCalled, "expected child issue to be closed")
}

170
jobrunner/journal.go Normal file
View file

@ -0,0 +1,170 @@
package jobrunner
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
)
// validPathComponent matches safe repo owner/name characters (alphanumeric, hyphen, underscore, dot).
var validPathComponent = regexp.MustCompile(`^[a-zA-Z0-9][a-zA-Z0-9._-]*$`)
// JournalEntry is a single line in the JSONL audit log.
type JournalEntry struct {
Timestamp string `json:"ts"`
Epic int `json:"epic"`
Child int `json:"child"`
PR int `json:"pr"`
Repo string `json:"repo"`
Action string `json:"action"`
Signals SignalSnapshot `json:"signals"`
Result ResultSnapshot `json:"result"`
Cycle int `json:"cycle"`
}
// SignalSnapshot captures the structural state of a PR at the time of action.
type SignalSnapshot struct {
PRState string `json:"pr_state"`
IsDraft bool `json:"is_draft"`
CheckStatus string `json:"check_status"`
Mergeable string `json:"mergeable"`
ThreadsTotal int `json:"threads_total"`
ThreadsResolved int `json:"threads_resolved"`
}
// ResultSnapshot captures the outcome of an action.
type ResultSnapshot struct {
Success bool `json:"success"`
Error string `json:"error,omitempty"`
DurationMs int64 `json:"duration_ms"`
}
// Journal writes ActionResult entries to date-partitioned JSONL files.
type Journal struct {
baseDir string
mu sync.Mutex
}
// NewJournal creates a new Journal rooted at baseDir.
func NewJournal(baseDir string) (*Journal, error) {
if baseDir == "" {
return nil, fmt.Errorf("journal base directory is required")
}
return &Journal{baseDir: baseDir}, nil
}
// sanitizePathComponent validates a single path component (owner or repo name)
// to prevent path traversal attacks. It rejects "..", empty strings, paths
// containing separators, and any value outside the safe character set.
func sanitizePathComponent(name string) (string, error) {
// Reject empty or whitespace-only values.
if name == "" || strings.TrimSpace(name) == "" {
return "", fmt.Errorf("invalid path component: %q", name)
}
// Reject inputs containing path separators (directory traversal attempt).
if strings.ContainsAny(name, `/\`) {
return "", fmt.Errorf("path component contains directory separator: %q", name)
}
// Use filepath.Clean to normalize (e.g., collapse redundant dots).
clean := filepath.Clean(name)
// Reject traversal components.
if clean == "." || clean == ".." {
return "", fmt.Errorf("invalid path component: %q", name)
}
// Validate against the safe character set.
if !validPathComponent.MatchString(clean) {
return "", fmt.Errorf("path component contains invalid characters: %q", name)
}
return clean, nil
}
// Append writes a journal entry for the given signal and result.
func (j *Journal) Append(signal *PipelineSignal, result *ActionResult) error {
if signal == nil {
return fmt.Errorf("signal is required")
}
if result == nil {
return fmt.Errorf("result is required")
}
entry := JournalEntry{
Timestamp: result.Timestamp.UTC().Format("2006-01-02T15:04:05Z"),
Epic: signal.EpicNumber,
Child: signal.ChildNumber,
PR: signal.PRNumber,
Repo: signal.RepoFullName(),
Action: result.Action,
Signals: SignalSnapshot{
PRState: signal.PRState,
IsDraft: signal.IsDraft,
CheckStatus: signal.CheckStatus,
Mergeable: signal.Mergeable,
ThreadsTotal: signal.ThreadsTotal,
ThreadsResolved: signal.ThreadsResolved,
},
Result: ResultSnapshot{
Success: result.Success,
Error: result.Error,
DurationMs: result.Duration.Milliseconds(),
},
Cycle: result.Cycle,
}
data, err := json.Marshal(entry)
if err != nil {
return fmt.Errorf("marshal journal entry: %w", err)
}
data = append(data, '\n')
// Sanitize path components to prevent path traversal (CVE: issue #46).
owner, err := sanitizePathComponent(signal.RepoOwner)
if err != nil {
return fmt.Errorf("invalid repo owner: %w", err)
}
repo, err := sanitizePathComponent(signal.RepoName)
if err != nil {
return fmt.Errorf("invalid repo name: %w", err)
}
date := result.Timestamp.UTC().Format("2006-01-02")
dir := filepath.Join(j.baseDir, owner, repo)
// Resolve to absolute path and verify it stays within baseDir.
absBase, err := filepath.Abs(j.baseDir)
if err != nil {
return fmt.Errorf("resolve base directory: %w", err)
}
absDir, err := filepath.Abs(dir)
if err != nil {
return fmt.Errorf("resolve journal directory: %w", err)
}
if !strings.HasPrefix(absDir, absBase+string(filepath.Separator)) {
return fmt.Errorf("journal path %q escapes base directory %q", absDir, absBase)
}
j.mu.Lock()
defer j.mu.Unlock()
if err := os.MkdirAll(dir, 0o755); err != nil {
return fmt.Errorf("create journal directory: %w", err)
}
path := filepath.Join(dir, date+".jsonl")
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return fmt.Errorf("open journal file: %w", err)
}
defer func() { _ = f.Close() }()
_, err = f.Write(data)
return err
}

263
jobrunner/journal_test.go Normal file
View file

@ -0,0 +1,263 @@
package jobrunner
import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestJournal_Append_Good(t *testing.T) {
dir := t.TempDir()
j, err := NewJournal(dir)
require.NoError(t, err)
ts := time.Date(2026, 2, 5, 14, 30, 0, 0, time.UTC)
signal := &PipelineSignal{
EpicNumber: 10,
ChildNumber: 3,
PRNumber: 55,
RepoOwner: "host-uk",
RepoName: "core-tenant",
PRState: "OPEN",
IsDraft: false,
Mergeable: "MERGEABLE",
CheckStatus: "SUCCESS",
ThreadsTotal: 2,
ThreadsResolved: 1,
LastCommitSHA: "abc123",
LastCommitAt: ts,
LastReviewAt: ts,
}
result := &ActionResult{
Action: "merge",
RepoOwner: "host-uk",
RepoName: "core-tenant",
EpicNumber: 10,
ChildNumber: 3,
PRNumber: 55,
Success: true,
Timestamp: ts,
Duration: 1200 * time.Millisecond,
Cycle: 1,
}
err = j.Append(signal, result)
require.NoError(t, err)
// Read the file back.
expectedPath := filepath.Join(dir, "host-uk", "core-tenant", "2026-02-05.jsonl")
f, err := os.Open(expectedPath)
require.NoError(t, err)
defer func() { _ = f.Close() }()
scanner := bufio.NewScanner(f)
require.True(t, scanner.Scan(), "expected at least one line in JSONL file")
var entry JournalEntry
err = json.Unmarshal(scanner.Bytes(), &entry)
require.NoError(t, err)
assert.Equal(t, "2026-02-05T14:30:00Z", entry.Timestamp)
assert.Equal(t, 10, entry.Epic)
assert.Equal(t, 3, entry.Child)
assert.Equal(t, 55, entry.PR)
assert.Equal(t, "host-uk/core-tenant", entry.Repo)
assert.Equal(t, "merge", entry.Action)
assert.Equal(t, 1, entry.Cycle)
// Verify signal snapshot.
assert.Equal(t, "OPEN", entry.Signals.PRState)
assert.Equal(t, false, entry.Signals.IsDraft)
assert.Equal(t, "SUCCESS", entry.Signals.CheckStatus)
assert.Equal(t, "MERGEABLE", entry.Signals.Mergeable)
assert.Equal(t, 2, entry.Signals.ThreadsTotal)
assert.Equal(t, 1, entry.Signals.ThreadsResolved)
// Verify result snapshot.
assert.Equal(t, true, entry.Result.Success)
assert.Equal(t, "", entry.Result.Error)
assert.Equal(t, int64(1200), entry.Result.DurationMs)
// Append a second entry and verify two lines exist.
result2 := &ActionResult{
Action: "comment",
RepoOwner: "host-uk",
RepoName: "core-tenant",
Success: false,
Error: "rate limited",
Timestamp: ts,
Duration: 50 * time.Millisecond,
Cycle: 2,
}
err = j.Append(signal, result2)
require.NoError(t, err)
data, err := os.ReadFile(expectedPath)
require.NoError(t, err)
lines := 0
sc := bufio.NewScanner(strings.NewReader(string(data)))
for sc.Scan() {
lines++
}
assert.Equal(t, 2, lines, "expected two JSONL lines after two appends")
}
func TestJournal_Append_Bad_PathTraversal(t *testing.T) {
dir := t.TempDir()
j, err := NewJournal(dir)
require.NoError(t, err)
ts := time.Now()
tests := []struct {
name string
repoOwner string
repoName string
wantErr string
}{
{
name: "dotdot owner",
repoOwner: "..",
repoName: "core",
wantErr: "invalid repo owner",
},
{
name: "dotdot repo",
repoOwner: "host-uk",
repoName: "../../etc/cron.d",
wantErr: "invalid repo name",
},
{
name: "slash in owner",
repoOwner: "../etc",
repoName: "core",
wantErr: "invalid repo owner",
},
{
name: "absolute path in repo",
repoOwner: "host-uk",
repoName: "/etc/passwd",
wantErr: "invalid repo name",
},
{
name: "empty owner",
repoOwner: "",
repoName: "core",
wantErr: "invalid repo owner",
},
{
name: "empty repo",
repoOwner: "host-uk",
repoName: "",
wantErr: "invalid repo name",
},
{
name: "dot only owner",
repoOwner: ".",
repoName: "core",
wantErr: "invalid repo owner",
},
{
name: "spaces only owner",
repoOwner: " ",
repoName: "core",
wantErr: "invalid repo owner",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
signal := &PipelineSignal{
RepoOwner: tc.repoOwner,
RepoName: tc.repoName,
}
result := &ActionResult{
Action: "merge",
Timestamp: ts,
}
err := j.Append(signal, result)
require.Error(t, err)
assert.Contains(t, err.Error(), tc.wantErr)
})
}
}
func TestJournal_Append_Good_ValidNames(t *testing.T) {
dir := t.TempDir()
j, err := NewJournal(dir)
require.NoError(t, err)
ts := time.Date(2026, 2, 5, 14, 30, 0, 0, time.UTC)
// Verify valid names with dots, hyphens, underscores all work.
validNames := []struct {
owner string
repo string
}{
{"host-uk", "core"},
{"my_org", "my_repo"},
{"org.name", "repo.v2"},
{"a", "b"},
{"Org-123", "Repo_456.go"},
}
for _, vn := range validNames {
signal := &PipelineSignal{
RepoOwner: vn.owner,
RepoName: vn.repo,
}
result := &ActionResult{
Action: "test",
Timestamp: ts,
}
err := j.Append(signal, result)
assert.NoError(t, err, "expected valid name pair %s/%s to succeed", vn.owner, vn.repo)
}
}
func TestJournal_Append_Bad_NilSignal(t *testing.T) {
dir := t.TempDir()
j, err := NewJournal(dir)
require.NoError(t, err)
result := &ActionResult{
Action: "merge",
Timestamp: time.Now(),
}
err = j.Append(nil, result)
require.Error(t, err)
assert.Contains(t, err.Error(), "signal is required")
}
func TestJournal_Append_Bad_NilResult(t *testing.T) {
dir := t.TempDir()
j, err := NewJournal(dir)
require.NoError(t, err)
signal := &PipelineSignal{
RepoOwner: "host-uk",
RepoName: "core-php",
}
err = j.Append(signal, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "result is required")
}

195
jobrunner/poller.go Normal file
View file

@ -0,0 +1,195 @@
package jobrunner
import (
"context"
"sync"
"time"
"forge.lthn.ai/core/go/pkg/log"
)
// PollerConfig configures a Poller.
type PollerConfig struct {
Sources []JobSource
Handlers []JobHandler
Journal *Journal
PollInterval time.Duration
DryRun bool
}
// Poller discovers signals from sources and dispatches them to handlers.
type Poller struct {
mu sync.RWMutex
sources []JobSource
handlers []JobHandler
journal *Journal
interval time.Duration
dryRun bool
cycle int
}
// NewPoller creates a Poller from the given config.
func NewPoller(cfg PollerConfig) *Poller {
interval := cfg.PollInterval
if interval <= 0 {
interval = 60 * time.Second
}
return &Poller{
sources: cfg.Sources,
handlers: cfg.Handlers,
journal: cfg.Journal,
interval: interval,
dryRun: cfg.DryRun,
}
}
// Cycle returns the number of completed poll-dispatch cycles.
func (p *Poller) Cycle() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.cycle
}
// DryRun returns whether dry-run mode is enabled.
func (p *Poller) DryRun() bool {
p.mu.RLock()
defer p.mu.RUnlock()
return p.dryRun
}
// SetDryRun enables or disables dry-run mode.
func (p *Poller) SetDryRun(v bool) {
p.mu.Lock()
p.dryRun = v
p.mu.Unlock()
}
// AddSource appends a source to the poller.
func (p *Poller) AddSource(s JobSource) {
p.mu.Lock()
p.sources = append(p.sources, s)
p.mu.Unlock()
}
// AddHandler appends a handler to the poller.
func (p *Poller) AddHandler(h JobHandler) {
p.mu.Lock()
p.handlers = append(p.handlers, h)
p.mu.Unlock()
}
// Run starts a blocking poll-dispatch loop. It runs one cycle immediately,
// then repeats on each tick of the configured interval until the context
// is cancelled.
func (p *Poller) Run(ctx context.Context) error {
if err := p.RunOnce(ctx); err != nil {
return err
}
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := p.RunOnce(ctx); err != nil {
return err
}
}
}
}
// RunOnce performs a single poll-dispatch cycle: iterate sources, poll each,
// find the first matching handler for each signal, and execute it.
func (p *Poller) RunOnce(ctx context.Context) error {
p.mu.Lock()
p.cycle++
cycle := p.cycle
dryRun := p.dryRun
sources := make([]JobSource, len(p.sources))
copy(sources, p.sources)
handlers := make([]JobHandler, len(p.handlers))
copy(handlers, p.handlers)
p.mu.Unlock()
log.Info("poller cycle starting", "cycle", cycle, "sources", len(sources), "handlers", len(handlers))
for _, src := range sources {
signals, err := src.Poll(ctx)
if err != nil {
log.Error("poll failed", "source", src.Name(), "err", err)
continue
}
log.Info("polled source", "source", src.Name(), "signals", len(signals))
for _, sig := range signals {
handler := p.findHandler(handlers, sig)
if handler == nil {
log.Debug("no matching handler", "epic", sig.EpicNumber, "child", sig.ChildNumber)
continue
}
if dryRun {
log.Info("dry-run: would execute",
"handler", handler.Name(),
"epic", sig.EpicNumber,
"child", sig.ChildNumber,
"pr", sig.PRNumber,
)
continue
}
start := time.Now()
result, err := handler.Execute(ctx, sig)
elapsed := time.Since(start)
if err != nil {
log.Error("handler execution failed",
"handler", handler.Name(),
"epic", sig.EpicNumber,
"child", sig.ChildNumber,
"err", err,
)
continue
}
result.Cycle = cycle
result.EpicNumber = sig.EpicNumber
result.ChildNumber = sig.ChildNumber
result.Duration = elapsed
if p.journal != nil {
if jErr := p.journal.Append(sig, result); jErr != nil {
log.Error("journal append failed", "err", jErr)
}
}
if rErr := src.Report(ctx, result); rErr != nil {
log.Error("source report failed", "source", src.Name(), "err", rErr)
}
log.Info("handler executed",
"handler", handler.Name(),
"action", result.Action,
"success", result.Success,
"duration", elapsed,
)
}
}
return nil
}
// findHandler returns the first handler that matches the signal, or nil.
func (p *Poller) findHandler(handlers []JobHandler, sig *PipelineSignal) JobHandler {
for _, h := range handlers {
if h.Match(sig) {
return h
}
}
return nil
}

307
jobrunner/poller_test.go Normal file
View file

@ -0,0 +1,307 @@
package jobrunner
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// --- Mock source ---
type mockSource struct {
name string
signals []*PipelineSignal
reports []*ActionResult
mu sync.Mutex
}
func (m *mockSource) Name() string { return m.name }
func (m *mockSource) Poll(_ context.Context) ([]*PipelineSignal, error) {
m.mu.Lock()
defer m.mu.Unlock()
return m.signals, nil
}
func (m *mockSource) Report(_ context.Context, result *ActionResult) error {
m.mu.Lock()
defer m.mu.Unlock()
m.reports = append(m.reports, result)
return nil
}
// --- Mock handler ---
type mockHandler struct {
name string
matchFn func(*PipelineSignal) bool
executed []*PipelineSignal
mu sync.Mutex
}
func (m *mockHandler) Name() string { return m.name }
func (m *mockHandler) Match(sig *PipelineSignal) bool {
if m.matchFn != nil {
return m.matchFn(sig)
}
return true
}
func (m *mockHandler) Execute(_ context.Context, sig *PipelineSignal) (*ActionResult, error) {
m.mu.Lock()
defer m.mu.Unlock()
m.executed = append(m.executed, sig)
return &ActionResult{
Action: m.name,
RepoOwner: sig.RepoOwner,
RepoName: sig.RepoName,
PRNumber: sig.PRNumber,
Success: true,
Timestamp: time.Now(),
}, nil
}
func TestPoller_RunOnce_Good(t *testing.T) {
sig := &PipelineSignal{
EpicNumber: 1,
ChildNumber: 2,
PRNumber: 10,
RepoOwner: "host-uk",
RepoName: "core-php",
PRState: "OPEN",
CheckStatus: "SUCCESS",
Mergeable: "MERGEABLE",
}
src := &mockSource{
name: "test-source",
signals: []*PipelineSignal{sig},
}
handler := &mockHandler{
name: "test-handler",
matchFn: func(s *PipelineSignal) bool {
return s.PRNumber == 10
},
}
p := NewPoller(PollerConfig{
Sources: []JobSource{src},
Handlers: []JobHandler{handler},
})
err := p.RunOnce(context.Background())
require.NoError(t, err)
// Handler should have been called with our signal.
handler.mu.Lock()
defer handler.mu.Unlock()
require.Len(t, handler.executed, 1)
assert.Equal(t, 10, handler.executed[0].PRNumber)
// Source should have received a report.
src.mu.Lock()
defer src.mu.Unlock()
require.Len(t, src.reports, 1)
assert.Equal(t, "test-handler", src.reports[0].Action)
assert.True(t, src.reports[0].Success)
assert.Equal(t, 1, src.reports[0].Cycle)
assert.Equal(t, 1, src.reports[0].EpicNumber)
assert.Equal(t, 2, src.reports[0].ChildNumber)
// Cycle counter should have incremented.
assert.Equal(t, 1, p.Cycle())
}
func TestPoller_RunOnce_Good_NoSignals(t *testing.T) {
src := &mockSource{
name: "empty-source",
signals: nil,
}
handler := &mockHandler{
name: "unused-handler",
}
p := NewPoller(PollerConfig{
Sources: []JobSource{src},
Handlers: []JobHandler{handler},
})
err := p.RunOnce(context.Background())
require.NoError(t, err)
// Handler should not have been called.
handler.mu.Lock()
defer handler.mu.Unlock()
assert.Empty(t, handler.executed)
// Source should not have received reports.
src.mu.Lock()
defer src.mu.Unlock()
assert.Empty(t, src.reports)
assert.Equal(t, 1, p.Cycle())
}
func TestPoller_RunOnce_Good_NoMatchingHandler(t *testing.T) {
sig := &PipelineSignal{
EpicNumber: 5,
ChildNumber: 8,
PRNumber: 42,
RepoOwner: "host-uk",
RepoName: "core-tenant",
PRState: "OPEN",
}
src := &mockSource{
name: "test-source",
signals: []*PipelineSignal{sig},
}
handler := &mockHandler{
name: "picky-handler",
matchFn: func(s *PipelineSignal) bool {
return false // never matches
},
}
p := NewPoller(PollerConfig{
Sources: []JobSource{src},
Handlers: []JobHandler{handler},
})
err := p.RunOnce(context.Background())
require.NoError(t, err)
// Handler should not have been called.
handler.mu.Lock()
defer handler.mu.Unlock()
assert.Empty(t, handler.executed)
// Source should not have received reports (no action taken).
src.mu.Lock()
defer src.mu.Unlock()
assert.Empty(t, src.reports)
}
func TestPoller_RunOnce_Good_DryRun(t *testing.T) {
sig := &PipelineSignal{
EpicNumber: 1,
ChildNumber: 3,
PRNumber: 20,
RepoOwner: "host-uk",
RepoName: "core-admin",
PRState: "OPEN",
CheckStatus: "SUCCESS",
Mergeable: "MERGEABLE",
}
src := &mockSource{
name: "test-source",
signals: []*PipelineSignal{sig},
}
handler := &mockHandler{
name: "merge-handler",
matchFn: func(s *PipelineSignal) bool {
return true
},
}
p := NewPoller(PollerConfig{
Sources: []JobSource{src},
Handlers: []JobHandler{handler},
DryRun: true,
})
assert.True(t, p.DryRun())
err := p.RunOnce(context.Background())
require.NoError(t, err)
// Handler should NOT have been called in dry-run mode.
handler.mu.Lock()
defer handler.mu.Unlock()
assert.Empty(t, handler.executed)
// Source should not have received reports.
src.mu.Lock()
defer src.mu.Unlock()
assert.Empty(t, src.reports)
}
func TestPoller_SetDryRun_Good(t *testing.T) {
p := NewPoller(PollerConfig{})
assert.False(t, p.DryRun())
p.SetDryRun(true)
assert.True(t, p.DryRun())
p.SetDryRun(false)
assert.False(t, p.DryRun())
}
func TestPoller_AddSourceAndHandler_Good(t *testing.T) {
p := NewPoller(PollerConfig{})
sig := &PipelineSignal{
EpicNumber: 1,
ChildNumber: 1,
PRNumber: 5,
RepoOwner: "host-uk",
RepoName: "core-php",
PRState: "OPEN",
}
src := &mockSource{
name: "added-source",
signals: []*PipelineSignal{sig},
}
handler := &mockHandler{
name: "added-handler",
matchFn: func(s *PipelineSignal) bool { return true },
}
p.AddSource(src)
p.AddHandler(handler)
err := p.RunOnce(context.Background())
require.NoError(t, err)
handler.mu.Lock()
defer handler.mu.Unlock()
require.Len(t, handler.executed, 1)
assert.Equal(t, 5, handler.executed[0].PRNumber)
}
func TestPoller_Run_Good(t *testing.T) {
src := &mockSource{
name: "tick-source",
signals: nil,
}
p := NewPoller(PollerConfig{
Sources: []JobSource{src},
PollInterval: 50 * time.Millisecond,
})
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Millisecond)
defer cancel()
err := p.Run(ctx)
assert.ErrorIs(t, err, context.DeadlineExceeded)
// Should have completed at least 2 cycles (one immediate + at least one tick).
assert.GreaterOrEqual(t, p.Cycle(), 2)
}
func TestPoller_DefaultInterval_Good(t *testing.T) {
p := NewPoller(PollerConfig{})
assert.Equal(t, 60*time.Second, p.interval)
}

72
jobrunner/types.go Normal file
View file

@ -0,0 +1,72 @@
package jobrunner
import (
"context"
"time"
)
// PipelineSignal is the structural snapshot of a child issue/PR.
// Carries structural state plus issue title/body for dispatch prompts.
type PipelineSignal struct {
EpicNumber int
ChildNumber int
PRNumber int
RepoOwner string
RepoName string
PRState string // OPEN, MERGED, CLOSED
IsDraft bool
Mergeable string // MERGEABLE, CONFLICTING, UNKNOWN
CheckStatus string // SUCCESS, FAILURE, PENDING
ThreadsTotal int
ThreadsResolved int
LastCommitSHA string
LastCommitAt time.Time
LastReviewAt time.Time
NeedsCoding bool // true if child has no PR (work not started)
Assignee string // issue assignee username (for dispatch)
IssueTitle string // child issue title (for dispatch prompt)
IssueBody string // child issue body (for dispatch prompt)
Type string // signal type (e.g., "agent_completion")
Success bool // agent completion success flag
Error string // agent error message
Message string // agent completion message
}
// RepoFullName returns "owner/repo".
func (s *PipelineSignal) RepoFullName() string {
return s.RepoOwner + "/" + s.RepoName
}
// HasUnresolvedThreads returns true if there are unresolved review threads.
func (s *PipelineSignal) HasUnresolvedThreads() bool {
return s.ThreadsTotal > s.ThreadsResolved
}
// ActionResult carries the outcome of a handler execution.
type ActionResult struct {
Action string `json:"action"`
RepoOwner string `json:"repo_owner"`
RepoName string `json:"repo_name"`
EpicNumber int `json:"epic"`
ChildNumber int `json:"child"`
PRNumber int `json:"pr"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Timestamp time.Time `json:"ts"`
Duration time.Duration `json:"duration_ms"`
Cycle int `json:"cycle"`
}
// JobSource discovers actionable work from an external system.
type JobSource interface {
Name() string
Poll(ctx context.Context) ([]*PipelineSignal, error)
Report(ctx context.Context, result *ActionResult) error
}
// JobHandler processes a single pipeline signal.
type JobHandler interface {
Name() string
Match(signal *PipelineSignal) bool
Execute(ctx context.Context, signal *PipelineSignal) (*ActionResult, error)
}

98
jobrunner/types_test.go Normal file
View file

@ -0,0 +1,98 @@
package jobrunner
import (
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPipelineSignal_RepoFullName_Good(t *testing.T) {
sig := &PipelineSignal{
RepoOwner: "host-uk",
RepoName: "core-php",
}
assert.Equal(t, "host-uk/core-php", sig.RepoFullName())
}
func TestPipelineSignal_HasUnresolvedThreads_Good(t *testing.T) {
sig := &PipelineSignal{
ThreadsTotal: 5,
ThreadsResolved: 3,
}
assert.True(t, sig.HasUnresolvedThreads())
}
func TestPipelineSignal_HasUnresolvedThreads_Bad_AllResolved(t *testing.T) {
sig := &PipelineSignal{
ThreadsTotal: 4,
ThreadsResolved: 4,
}
assert.False(t, sig.HasUnresolvedThreads())
// Also verify zero threads is not unresolved.
sigZero := &PipelineSignal{
ThreadsTotal: 0,
ThreadsResolved: 0,
}
assert.False(t, sigZero.HasUnresolvedThreads())
}
func TestActionResult_JSON_Good(t *testing.T) {
ts := time.Date(2026, 2, 5, 12, 0, 0, 0, time.UTC)
result := &ActionResult{
Action: "merge",
RepoOwner: "host-uk",
RepoName: "core-tenant",
EpicNumber: 42,
ChildNumber: 7,
PRNumber: 99,
Success: true,
Timestamp: ts,
Duration: 1500 * time.Millisecond,
Cycle: 3,
}
data, err := json.Marshal(result)
require.NoError(t, err)
var decoded map[string]any
err = json.Unmarshal(data, &decoded)
require.NoError(t, err)
assert.Equal(t, "merge", decoded["action"])
assert.Equal(t, "host-uk", decoded["repo_owner"])
assert.Equal(t, "core-tenant", decoded["repo_name"])
assert.Equal(t, float64(42), decoded["epic"])
assert.Equal(t, float64(7), decoded["child"])
assert.Equal(t, float64(99), decoded["pr"])
assert.Equal(t, true, decoded["success"])
assert.Equal(t, float64(3), decoded["cycle"])
// Error field should be omitted when empty.
_, hasError := decoded["error"]
assert.False(t, hasError, "error field should be omitted when empty")
// Verify round-trip with error field present.
resultWithErr := &ActionResult{
Action: "merge",
RepoOwner: "host-uk",
RepoName: "core-tenant",
Success: false,
Error: "checks failing",
Timestamp: ts,
Duration: 200 * time.Millisecond,
Cycle: 1,
}
data2, err := json.Marshal(resultWithErr)
require.NoError(t, err)
var decoded2 map[string]any
err = json.Unmarshal(data2, &decoded2)
require.NoError(t, err)
assert.Equal(t, "checks failing", decoded2["error"])
assert.Equal(t, false, decoded2["success"])
}